NIO based file transport implementation #1

Merged
merged 12 commits into from
Aug 22, 2019

Conversation

isururanawaka
Contributor

No description provided.

Contributor

@DImuthuUpe DImuthuUpe left a comment

Hi @isururanawaka, this is a really great and well-thought-out implementation. I have a few comments, though.

/**
* Includes constants related to S3 SDK
*/
public interface S3Constants {
Contributor

Can't we use a constants class for this instead of an interface, and use static imports? Using an interface just to hold constants seems like a misuse of the class hierarchy.
https://veerasundar.com/blog/2012/04/java-constants-using-class-interface-static-imports/

Contributor Author

Yes. We can use a constant class.
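
For illustration, a constants class along the lines suggested above might look like the sketch below. The names `S3Constants` and `CONNECTION_EXPIRE_TIME` come from the diff; the one-hour value and the package-private visibility are assumptions made only for this example.

```java
// Sketch only: the value of the constant is assumed, not taken from the PR.
final class S3Constants {

    /** Lifetime of a pre-signed URL in milliseconds (assumed value: one hour). */
    static final long CONNECTION_EXPIRE_TIME = 60L * 60L * 1000L;

    // Private constructor: this class only holds constants and is never instantiated.
    private S3Constants() {
        throw new AssertionError("no instances");
    }
}
```

Because the class is final and cannot be instantiated, nothing can "implement" it the way a class could implement a constants interface, and callers can still reference the values unqualified through a static import.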

public static void copyData(ReadableByteChannel src, WritableByteChannel dest) throws IOException {
final ByteBuffer buffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
int count = 0;
while ((count =src.read(buffer)) != -1) {
Contributor

What happens if the src is slow and bytes are not coming as fast as expected? Should the loop wait for some time or continuously retry?

Contributor Author

The thread will wait until data is available on the channel. If there is an error, e.g. the connection is closed, it will return with an error.
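
A minimal sketch of that behaviour, assuming both channels are in blocking mode. The class name `ChannelCopy` and the 8 KiB buffer size are placeholders for this example (the PR reads the size from `Constants.BUFFER_SIZE`), and the loop body is a guess at the part of `copyData` the diff hunk cuts off.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

final class ChannelCopy {

    // Assumed buffer size for this sketch.
    private static final int BUFFER_SIZE = 8 * 1024;

    /** Copies src to dest until end of stream and returns the number of bytes written. */
    static long copyData(ReadableByteChannel src, WritableByteChannel dest) throws IOException {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
        long total = 0;
        // On a blocking channel, read() waits until at least one byte is available,
        // returns -1 at end of stream, and throws IOException if the channel fails.
        while (src.read(buffer) != -1) {
            buffer.flip();                    // switch the buffer from filling to draining
            while (buffer.hasRemaining()) {   // write() may consume only part of the buffer
                total += dest.write(buffer);
            }
            buffer.clear();                   // make the whole buffer available for the next read
        }
        return total;
    }
}
```

With a non-blocking source, `read()` can return 0 and this loop would spin, so the sketch only applies to blocking channels.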

ReadableByteChannel rChannel = (ReadableByteChannel) src.getChannel();
WritableByteChannel dChannel = (WritableByteChannel) dst.getChannel();
try {
if (rChannel instanceof FileChannel) {
Contributor

Are these the only 3 cases? What if both the src and dst channels are local file channels?

Contributor Author

That case is also handled. Only one of the two channels needs to be a FileChannel, so a file-to-file transfer is handled by the first case.
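
One way such a three-way dispatch could be written is sketched below. Only the first `instanceof FileChannel` check is visible in the diff, so the second and third branches, the class name `ChannelTransfer`, and the chunk size are assumptions for this example rather than the PR's actual code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

final class ChannelTransfer {

    private static final int CHUNK = 64 * 1024; // assumed chunk size

    static long transfer(ReadableByteChannel src, WritableByteChannel dst) throws IOException {
        if (src instanceof FileChannel) {
            // Case 1: the source is a file. This also covers file-to-file transfers.
            FileChannel file = (FileChannel) src;
            long start = file.position();
            long pos = start;
            long size = file.size();
            while (pos < size) {
                pos += file.transferTo(pos, size - pos, dst);
            }
            return pos - start;
        }
        if (dst instanceof FileChannel) {
            // Case 2: only the destination is a file.
            FileChannel file = (FileChannel) dst;
            long start = file.position();
            long pos = start;
            long n;
            // For a blocking source, a transfer of 0 bytes means end of stream.
            while ((n = file.transferFrom(src, pos, CHUNK)) > 0) {
                pos += n;
            }
            return pos - start;
        }
        // Case 3: neither side is a file, so fall back to a buffered copy.
        ByteBuffer buffer = ByteBuffer.allocateDirect(CHUNK);
        long total = 0;
        while (src.read(buffer) != -1) {
            buffer.flip();
            while (buffer.hasRemaining()) {
                total += dst.write(buffer);
            }
            buffer.clear();
        }
        return total;
    }
}
```

Because the source is checked first, a transfer where both ends are `FileChannel` instances never reaches the second branch.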

* @param channel
* @param obj
*/
public void cacheChannel(Channel channel, Object obj) {
Contributor

What is the main intention behind caching the channel? Do you think that all channels will be implemented using NIO Channels?

* @param channel
* @return
*/
public Object getConnectorChannel(Channel channel) {
Contributor

What could be the edge cases of channels getting invalidated / timed out?
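
The thread does not answer this question. As one possible illustration of the concern, the sketch below shows a cache that refuses to hand back an entry whose channel has already been closed. The method names mirror `cacheChannel` and `getConnectorChannel` from the diff; the class name `ChannelCache`, the map-based storage, and the eviction rule are all assumptions, not the PR's behaviour.

```java
import java.nio.channels.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ChannelCache {

    // Hypothetical storage: maps an NIO channel to the connector object behind it.
    private final Map<Channel, Object> cache = new ConcurrentHashMap<>();

    void cacheChannel(Channel channel, Object connectorChannel) {
        cache.put(channel, connectorChannel);
    }

    /** Returns the cached object, or null if the channel is unknown or already closed. */
    Object getConnectorChannel(Channel channel) {
        if (!channel.isOpen()) {
            cache.remove(channel); // evict entries for closed channels
            return null;
        }
        return cache.get(channel);
    }

    int size() {
        return cache.size();
    }
}
```

`isOpen()` only reflects a local close. A remote timeout or a dropped connection usually surfaces as an `IOException` on the next read or write, so a real implementation would likely need to evict on I/O failure as well.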

* This represents the output connector, where to write
* data from the application.
*/
public interface SinkConnector extends Connector{
Contributor

Nit: add a space before the opening brace, i.e. Connector {

* This represents the input connector, where to read data
* from the application
*/
public interface SourceConnector extends Connector{
Contributor

Nit: add a space before the opening brace, i.e. Connector {

expTimeMillis += S3Constants.CONNECTION_EXPIRE_TIME;
expiration.setTime(expTimeMillis);

GeneratePresignedUrlRequest generatePresignedUrlRequest = new GeneratePresignedUrlRequest
Contributor

Why can't we use the s3client.putObject method instead of this approach?

Contributor Author

@isururanawaka isururanawaka Aug 16, 2019

With the pre-signed URL method, we can provide IAM roles, which may reduce security issues.
https://medium.com/@aidan.hallett/securing-aws-s3-uploads-using-presigned-urls-aa821c13ae8d

The other reason is that the putObject method requires an InputStream. For that we would need to copy data from an OutputStream to an InputStream; using the pre-signed URL method avoids that.
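
A rough sketch of the second point, using only the JDK: once a pre-signed PUT URL exists, the upload side is a plain HTTP connection whose OutputStream can be wrapped in a WritableByteChannel, so no InputStream is needed. The URL is assumed to come from the `GeneratePresignedUrlRequest` in the diff; the class name `PresignedUrlUpload`, the method names, and the buffer size are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

final class PresignedUrlUpload {

    /** Streams src to the pre-signed URL with HTTP PUT and returns the response status. */
    static int put(URL presignedUrl, ReadableByteChannel src, long contentLength) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) presignedUrl.openConnection();
        conn.setDoOutput(true);
        conn.setRequestMethod("PUT");
        // Send a known Content-Length and stream the body instead of buffering it.
        conn.setFixedLengthStreamingMode(contentLength);
        try (OutputStream out = conn.getOutputStream()) {
            copy(src, Channels.newChannel(out));
        }
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    /** Buffered channel-to-channel copy; returns the number of bytes written. */
    static long copy(ReadableByteChannel src, WritableByteChannel dst) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); // assumed buffer size
        long total = 0;
        while (src.read(buffer) != -1) {
            buffer.flip();
            while (buffer.hasRemaining()) {
                total += dst.write(buffer);
            }
            buffer.clear();
        }
        return total;
    }
}
```

The caller has to know the content length up front in this sketch, which may or may not match how the PR's sink connector is used.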

private AmazonS3 s3Client;

@Override
public boolean initiate(ConnectorConfig connectorConfig) {
Contributor

I understand that you are trying to make the APIs as generic as possible. But in some cases, it might make the API less readable. For an outsider, it would be hard to understand which properties should be passed into connectorConfig, as it contains generic key-value pairs. Instead of using that, why don't we come up with configs specific to each connector, with clearly defined properties, say S3ConnectorConfig? Then the user knows exactly what to pass. That will reduce the reusability a bit but will improve the clarity of the API. What do you think?

Contributor Author

Yes, I agree. My thinking was that a Connector implementation is specific to its transport attributes, which are local to it. Hence, anyone reading the code would already need to know the transport-specific attributes, so I thought it wouldn't be a problem. But a connector-specific config decouples the Connector from implementation details.
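
To make the proposal concrete, a connector-specific config could be as small as the sketch below. Only the name `S3ConnectorConfig` comes from the discussion; the three properties (`bucketName`, `objectKey`, `region`) are hypothetical and chosen only to show the shape of a typed config.

```java
import java.util.Objects;

// Hypothetical typed config: the fields are illustrative, not the PR's actual properties.
final class S3ConnectorConfig {

    private final String bucketName;
    private final String objectKey;
    private final String region;

    S3ConnectorConfig(String bucketName, String objectKey, String region) {
        // Fail fast on missing values instead of discovering them at transfer time.
        this.bucketName = Objects.requireNonNull(bucketName, "bucketName");
        this.objectKey = Objects.requireNonNull(objectKey, "objectKey");
        this.region = Objects.requireNonNull(region, "region");
    }

    String getBucketName() {
        return bucketName;
    }

    String getObjectKey() {
        return objectKey;
    }

    String getRegion() {
        return region;
    }
}
```

Compared with a generic key-value config, the constructor signature documents what the connector needs, and a missing property fails at construction rather than as a lookup miss inside `initiate`.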

@DImuthuUpe
Contributor

Looks good to me. I'm going to merge this. Thanks @isururanawaka

@DImuthuUpe DImuthuUpe merged commit c2f432e into apache:master Aug 22, 2019
DImuthuUpe pushed a commit that referenced this pull request Apr 23, 2020
GCS transport implementation