-
Notifications
You must be signed in to change notification settings - Fork 152
APEXMALHAR-2022 Developed S3 Output Module #483
APEXMALHAR-2022 Developed S3 Output Module #483
Conversation
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { | ||
return; | ||
} | ||
S3BlockMetaData metaData = blockInfo.get(tuple.getBlockId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having just blockId as the key here might cause problems since the way block ids are computed, blocks in different files may have the same block id. I'll suggest to have something like filename as well, as part of the key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. You are right. I made the changes as you suggested.
} | ||
processedBlocks.add(tuple.getBlockId()); | ||
long partSize = tuple.getRecord().length; | ||
PartETag pt = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename variable => partETag?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
* Send the CompleteMultipartUploadRequest to S3 if all the blocks of a file are uploaded into S3. | ||
* @param keyName file to upload into S3 | ||
*/ | ||
private void emitFileMerge(String keyName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name seems misleading. Can we rename it to something like: verifyAndEmitFileMerge
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
return; | ||
} | ||
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { | ||
uploadedFiles.add(keyName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files can get uploaded across windows. We cannot assume this keyName is uploaded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I fixed this.
uploadFileMetadata.getFileMetadata().getNumberOfBlocks() != partETags.size()) { | ||
return; | ||
} | ||
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During recovery, no need to do anything as the window is already processed.
We can move this if block to the beginning of the method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
|
||
@Override | ||
public void committed(long windowId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we save the uploadedFiles
in the WAL every window? This will help us to clear up uploadParts
and fileMetaDatas
in the end window.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you missed this..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I made the changes.
for (int i = 0; i < blocks.length; i++) { | ||
blockInfo.put(blocks[i], new S3BlockMetaData(tuple.getKeyName(), tuple.getUploadId(), i + 1)); | ||
} | ||
if (blocks.length < 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor?
if (blocks.length > 0) { blockInfo.get(blocks[blocks.length - 1]).setLastBlock(true); }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
protected transient AmazonS3 s3Client; | ||
private transient Set<Long> processedBlocks; | ||
private transient long currentWindowId; | ||
private transient List<AbstractBlockReader.ReaderRecord<Slice>> recordData; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to waitingTuples
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
/** | ||
* Process the blocks which are in wait state. | ||
*/ | ||
private void processWaitBlocks() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method seems idempotent. Can we call this during handleIdleTime()
as well? This may shorten the time spent in endWindow()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocks has to wait till it receives the file meta data. I added this method call after receives the file meta data of the waited block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think we should add it to handleIdleTime()
and remove it from the processUploadFileMetadata()
call.
Most of the times, it will have the meta data before it receives the actual data right? So only in some cases will we have the blocks waiting for meta data. Better to do it in the handleIdleTime
and endWindow
calls than blocking the operator thread to process waiting blocks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
if (processedBlocks.contains(tuple.getBlockId())) { | ||
return; | ||
} | ||
processedBlocks.add(tuple.getBlockId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need processedBlocks
. Can't we directly make changes to blockInfo
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we can do. I added this because of there is an issue in FSInputModule that the same block id is emitting. PR for this fix is merged. I removed the "processedBlocks" as it is not needed.
a77a280
to
dc945a4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a few more comments. Please check.
* Creates the empty object metadata for initiate multipart upload request. | ||
* @return the ObjectMetadata | ||
*/ | ||
public ObjectMetadata createObjectMetadata() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. By default it creates empty objectMetadata. User can override this and set the properties like encryption algorithm, etc. For more details, please refer below link:
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/model/ObjectMetadata.html
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.apex.malhar.lib.fs.s3output; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this to package: org.apache.apex.malhar.lib.io.fs.s3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chaithu14 should we have a separate maven module for this similar to kafka?
@tweise Suggestions?
private String secretAccessKey; | ||
private String endPoint; | ||
private Map<String, S3BlockMetaData> blockInfo = new HashMap<>(); | ||
private Map<Long, String> blockId2FilePath = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to blockIdToFilePath
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of this seems to be for tracking whether the block has been received from the ``BlockMetaDatainput port. If so, can we change it to a
Set``` instead of a ```Map```?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. We need filepath of the received block to construct unique block id.
public final transient DefaultOutputPort<UploadBlockMetadata> output = new DefaultOutputPort<>(); | ||
|
||
/** | ||
* This input port receives incoming tuples. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incoming tuples.
=> incoming tuples (Block Data)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* @param tuple UploadFileMetadata | ||
*/ | ||
protected void processUploadFileMetadata(S3InitiateFileUpload.UploadFileMetadata tuple) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we skip processing this if currentWindowId <= windowDataManager.getLargestCompletedWindow()
? Similar to other process calls..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constructs the blockInfo Map and clearing this in replay call. So, I think it's better to process even if currentWindowId <= windowDataManager.getLargestCompletedWindow().
if (partETags.size() <= 1) { | ||
uploadedFiles.add(keyName); | ||
currentWindowRecoveryState.add(keyName); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG.debug("Uploaded file {} successfully", keyName);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
*/ | ||
protected void processUploadBlock(S3BlockUpload.UploadBlockMetadata tuple) | ||
{ | ||
List<PartETag> listOfUploads = uploadParts.get(tuple.getKeyName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skip processing if windowId <= windowDataManager.getLargestCompletedWindow()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. Need to process this because I am not saving the uploadParts in WAL.
*/ | ||
protected void processFileMetadata(S3InitiateFileUpload.UploadFileMetadata tuple) | ||
{ | ||
String keyName = tuple.getKeyName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skip processing if windowId <= windowDataManager.getLargestCompletedWindow()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
} | ||
|
||
@Override | ||
public void committed(long windowId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you missed this..
@@ -0,0 +1,115 @@ | |||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests seem to be excluded from the build.
Can you try looking for some embedded S3 server for testing? Or write mock tests which simulate the behavior of S3 in order to have the unit testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added mock test
dc945a4
to
3fb799f
Compare
@NotNull | ||
private String secretAccessKey; | ||
private String endPoint; | ||
protected List<String> uploadedFiles = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this e transient?
@@ -193,6 +193,7 @@ | |||
<configuration> | |||
<excludes> | |||
<exclude>**/S3InputModuleAppTest.java</exclude> | |||
<exclude>**/S3OutputModuleAppTest.java</exclude> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have the mock tests now, we can remove this test as it will not run anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
private static final String APPLICATION_PATH_PREFIX = "target/s3outputtest/"; | ||
private String applicationPath; | ||
private Attribute.AttributeMap.DefaultAttributeMap attributes; | ||
Context.OperatorContext context; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
PutObjectResult objResult = new PutObjectResult(); | ||
objResult.setETag("SuccessFullyUploaded"); | ||
|
||
UploadPartResult partResult = new UploadPartResult(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you try it out using multiple parts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be difficult to be done. Because it depends on UploadPartRequest and this is the passed argument to client.uploadPart().
LocalMode.Controller lc = lma.getController(); | ||
lc.setHeartbeatMonitoringEnabled(true); | ||
lc.runAsync(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try using setExitCondition() as in https://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -0,0 +1,54 @@ | |||
package org.apache.apex.malhar.lib.fs.s3output; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Headers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
import static org.mockito.Matchers.any; | ||
import static org.mockito.Mockito.when; | ||
|
||
public class S3OutputModuleMockTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some docs on what is being validated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
import static org.mockito.Mockito.any; | ||
import static org.mockito.Mockito.when; | ||
|
||
public class S3InitiateUploadTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some docs on what is being validated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
*/ | ||
private void verifyAndEmitFileMerge(String keyName) | ||
{ | ||
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be moved to the processFileMetadata
and processBlockUpload
methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. This can't be moved to processFileMetadata and processBlockUpload methods. Because I am not saving the uploadParts and fileMetadatas in WAL.
/** | ||
* This operator can be used to upload the block into S3 bucket using multi-part feature or putObject API. | ||
* Upload the block into S3 using multi-part feature only if the number of blocks of a file is > 1. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to mention that this is useful in context of the S3 Output Module. Same for all operators in this module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Also check the build - some rat check failure.. |
b57099e
to
91bb9f6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a couple of minor comments...
|
||
import static org.apache.apex.malhar.lib.fs.s3.S3OutputModuleMockTest.client; | ||
|
||
public class S3OutputModuleTest extends S3OutputModule |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can you rename this class to S3OutputTestModule
?
Otherwise this seems to be a test class without any tests :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
}); | ||
lc.run(10000); | ||
|
||
lc.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need of shutdown()
; lc.run()
will shutdown automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Updated
91bb9f6
to
6ab63bd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good to me...Will merge soon if there are no other comments.
*/ | ||
|
||
@InterfaceStability.Evolving | ||
public class S3BlockUpload implements Operator, Operator.CheckpointNotificationListener, Operator.IdleTimeHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please change file name to S3BlockUploadOperator or S3BlockUploader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to S3BlockUploadOperator.
* This operator is useful in context of S3 Output Module. | ||
*/ | ||
@InterfaceStability.Evolving | ||
public class S3InitiateFileUpload implements Operator, Operator.CheckpointNotificationListener |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please change filename to S3FileUploadInitiator or S3InitiateFileUploadOperator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to S3InitiateFileUploadOperator.
* - S3BlockUpload | ||
* - S3FileMerger | ||
* | ||
* Initial BenchMark Results |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be very useful. Excellent.
* AWS access key | ||
*/ | ||
@NotNull | ||
private String accessKey; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we make properties for S3OutputModule consistent with S3InputModule.
Either we should take accessKey, secretAccessKey seperately for both the modules.
OR take URL input for both the modules.
There could be discussion around pros, cons of both the approaches. Thus, you might tackle it as separate JIRA.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I will create the JIRA for the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Request you to put link to new JIRA in reply to this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created JIRA for this task: https://issues.apache.org/jira/browse/APEXMALHAR-2360
* @tags S3, Output | ||
*/ | ||
@InterfaceStability.Evolving | ||
public class S3OutputModule implements Module |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend to move this as a separate pom project under malhar. Just like we have malhar-kafka, malhar-hive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this under malhar library because the existing S3InputModule is under malhar library.
* Creates the number of instances of S3FileMerger operator. | ||
*/ | ||
@Min(1) | ||
private int noOfFileMergers = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the input module we have property called readerCount.
To make it consistent, we should rename this property to mergerCount.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
return; | ||
} | ||
for (Map.Entry<String, UploadBlockMetadata> ubm: recoveredData.entrySet()) { | ||
output.emit(ubm.getValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid abbreviations in the variable names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
*/ | ||
private void processWaitBlocks() | ||
{ | ||
Iterator<AbstractBlockReader.ReaderRecord<Slice>> ite = waitingTuples.iterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid abbreviations in the variable names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
376d1eb
to
cb28681
Compare
Changes looks good to me. |
cb28681
to
a5e8fa3
Compare
No description provided.