From a5e8fa3facca750f5d7402c2c29e7cbabe53bd9e Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 30 Nov 2016 10:47:36 +0530 Subject: [PATCH] APEXMALHAR-2022 Development of S3 Output Module --- .../lib/fs/s3/S3BlockUploadOperator.java | 538 ++++++++++++++++++ .../apex/malhar/lib/fs/s3/S3FileMerger.java | 302 ++++++++++ .../fs/s3/S3InitiateFileUploadOperator.java | 378 ++++++++++++ .../apex/malhar/lib/fs/s3/S3OutputModule.java | 325 +++++++++++ .../s3/S3InitiateFileUploadOperatorTest.java | 119 ++++ .../lib/fs/s3/S3OutputModuleMockTest.java | 171 ++++++ .../malhar/lib/fs/s3/S3OutputTestModule.java | 72 +++ 7 files changed, 1905 insertions(+) create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java create mode 100644 library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java create mode 100644 library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java create mode 100644 library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java new file mode 100644 index 0000000000..aafa3a75ba --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.hadoop.classification.InterfaceStability; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.io.block.AbstractBlockReader; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.netlet.util.Slice; + +/** + * This operator can be used to upload the block into S3 bucket using multi-part feature or putObject API. + * Upload the block into S3 using multi-part feature only if the number of blocks of a file is > 1. + * This operator is useful in context of S3 Output Module. + */ + +@InterfaceStability.Evolving +public class S3BlockUploadOperator implements Operator, Operator.CheckpointNotificationListener, Operator.IdleTimeHandler +{ + private static final Logger LOG = LoggerFactory.getLogger(S3BlockUploadOperator.class); + @NotNull + private String bucketName; + @NotNull + private String accessKey; + @NotNull + private String secretAccessKey; + private String endPoint; + private Map blockInfo = new HashMap<>(); + private transient Map blockIdToFilePath = new HashMap<>(); + private WindowDataManager windowDataManager = new FSWindowDataManager(); + protected transient AmazonS3 s3Client; + private transient long currentWindowId; + private transient List> waitingTuples; + private transient Map currentWindowRecoveryState; + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); + + /** + * This input port receives incoming tuple's(Block data). + */ + public final transient DefaultInputPort> blockInput = new DefaultInputPort>() + { + @Override + public void process(AbstractBlockReader.ReaderRecord tuple) + { + uploadBlockIntoS3(tuple); + } + }; + + /** + * Input port to receive Block meta data + */ + public final transient DefaultInputPort blockMetadataInput = new DefaultInputPort() + { + @Override + public void process(BlockMetadata.FileBlockMetadata blockMetadata) + { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { + return; + } + blockIdToFilePath.put(blockMetadata.getBlockId(), blockMetadata.getFilePath()); + LOG.debug("received blockId {} for file {} ", blockMetadata.getBlockId(), blockMetadata.getFilePath()); + } + }; + + /** + * Input port to receive upload file meta data. 
+ */ + public final transient DefaultInputPort uploadMetadataInput = new DefaultInputPort() + { + @Override + public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple) + { + processUploadFileMetadata(tuple); + } + }; + + /** + * Convert each block of a given file into S3BlockMetaData + * @param tuple UploadFileMetadata + */ + protected void processUploadFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata tuple) + { + long[] blocks = tuple.getFileMetadata().getBlockIds(); + String filePath = tuple.getFileMetadata().getFilePath(); + for (int i = 0; i < blocks.length; i++) { + String blockId = getUniqueBlockIdFromFile(blocks[i], filePath); + if (blockInfo.get(blockId) != null) { + break; + } + blockInfo.put(blockId, new S3BlockMetaData(tuple.getKeyName(), tuple.getUploadId(), i + 1)); + } + if (blocks.length > 0) { + blockInfo.get(getUniqueBlockIdFromFile(blocks[blocks.length - 1], filePath)).setLastBlock(true); + } + } + + /** + * Construct the unique block id from the given block id and file path. + * @param blockId Id of the block + * @param filepath given filepath + * @return unique block id + */ + public static String getUniqueBlockIdFromFile(long blockId, String filepath) + { + return Long.toString(blockId) + "|" + filepath; + } + + @Override + public void setup(Context.OperatorContext context) + { + waitingTuples = new ArrayList<>(); + currentWindowRecoveryState = new HashMap<>(); + windowDataManager.setup(context); + s3Client = createClient(); + } + + /** + * Create AmazonS3 client using AWS credentials + * @return AmazonS3 + */ + protected AmazonS3 createClient() + { + AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey)); + if (endPoint != null) { + client.setEndpoint(endPoint); + } + return client; + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + if (windowId <= windowDataManager.getLargestCompletedWindow()) { + replay(windowId); + } + } + + /** + * Replay the state. + * @param windowId replay window Id + */ + protected void replay(long windowId) + { + try { + @SuppressWarnings("unchecked") + Map recoveredData = (Map)windowDataManager.retrieve(windowId); + if (recoveredData == null) { + return; + } + for (Map.Entry uploadBlockMetadata: recoveredData.entrySet()) { + output.emit(uploadBlockMetadata.getValue()); + blockInfo.remove(uploadBlockMetadata.getKey()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void endWindow() + { + if (waitingTuples.size() > 0) { + processWaitBlocks(); + } + + for (String uniqueblockId : currentWindowRecoveryState.keySet()) { + long blockId = Long.parseLong(uniqueblockId.substring(0, uniqueblockId.indexOf("|"))); + LOG.debug("Successfully uploaded {} block", blockId); + blockIdToFilePath.remove(blockId); + blockInfo.remove(uniqueblockId); + } + + if (blockIdToFilePath.size() > 0) { + for (Long blockId : blockIdToFilePath.keySet()) { + LOG.info("Unable to uploaded {} block", blockId); + } + blockIdToFilePath.clear(); + } + + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { + try { + windowDataManager.save(currentWindowRecoveryState, currentWindowId); + } catch (IOException e) { + throw new RuntimeException("Unable to save recovery", e); + } + } + currentWindowRecoveryState.clear(); + } + + @Override + public void teardown() + { + windowDataManager.teardown(); + } + + /** + * Process the blocks which are in wait state. 
+ */ + private void processWaitBlocks() + { + Iterator> waitIterator = waitingTuples.iterator(); + + while (waitIterator.hasNext()) { + AbstractBlockReader.ReaderRecord blockData = waitIterator.next(); + String filePath = blockIdToFilePath.get(blockData.getBlockId()); + if (filePath != null && blockInfo.get(getUniqueBlockIdFromFile(blockData.getBlockId(), filePath)) != null) { + uploadBlockIntoS3(blockData); + waitIterator.remove(); + } + } + } + + /** + * Upload the block into S3 bucket. + * @param tuple block data + */ + protected void uploadBlockIntoS3(AbstractBlockReader.ReaderRecord tuple) + { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { + return; + } + // Check whether the block metadata is present for this block + if (blockIdToFilePath.get(tuple.getBlockId()) == null) { + if (!waitingTuples.contains(tuple)) { + waitingTuples.add(tuple); + } + return; + } + String uniqueBlockId = getUniqueBlockIdFromFile(tuple.getBlockId(), blockIdToFilePath.get(tuple.getBlockId())); + S3BlockMetaData metaData = blockInfo.get(uniqueBlockId); + // Check whether the file metadata is received + if (metaData == null) { + if (!waitingTuples.contains(tuple)) { + waitingTuples.add(tuple); + } + return; + } + long partSize = tuple.getRecord().length; + PartETag partETag = null; + ByteArrayInputStream bis = new ByteArrayInputStream(tuple.getRecord().buffer); + // Check if it is a Single block of a file + if (metaData.isLastBlock && metaData.partNo == 1) { + ObjectMetadata omd = createObjectMetadata(); + omd.setContentLength(partSize); + PutObjectResult result = s3Client.putObject(new PutObjectRequest(bucketName, metaData.getKeyName(), bis, omd)); + partETag = new PartETag(1, result.getETag()); + } else { + // Else upload use multi-part feature + try { + // Create request to upload a part. + UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(bucketName).withKey(metaData.getKeyName()) + .withUploadId(metaData.getUploadId()).withPartNumber(metaData.getPartNo()).withInputStream(bis).withPartSize(partSize); + partETag = s3Client.uploadPart(uploadRequest).getPartETag(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + UploadBlockMetadata uploadmetadata = new UploadBlockMetadata(partETag, metaData.getKeyName()); + output.emit(uploadmetadata); + currentWindowRecoveryState.put(uniqueBlockId, uploadmetadata); + try { + bis.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Creates the empty object metadata for initiate multipart upload request. + * @return the ObjectMetadata + */ + public ObjectMetadata createObjectMetadata() + { + return new ObjectMetadata(); + } + + @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + try { + windowDataManager.committed(windowId); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void handleIdleTime() + { + if (waitingTuples.size() > 0) { + processWaitBlocks(); + } + } + + /** + * Upload block metadata consists of partETag and key name. 
+ */ + public static class UploadBlockMetadata + { + @FieldSerializer.Bind(JavaSerializer.class) + private PartETag partETag; + private String keyName; + + // For Kryo + public UploadBlockMetadata() + { + } + + public UploadBlockMetadata(PartETag partETag, String keyName) + { + this.partETag = partETag; + this.keyName = keyName; + } + + /** + * Get the partETag of the block + * @return the partETag + */ + public PartETag getPartETag() + { + return partETag; + } + + /** + * Return the key name of the file + * @return key name + */ + public String getKeyName() + { + return keyName; + } + + @Override + public int hashCode() + { + return keyName.hashCode(); + } + } + + /** + * S3 Block meta data consists of keyname, upload Id, part number and whether the block is last block or not. + */ + public static class S3BlockMetaData + { + private String keyName; + private String uploadId; + private Integer partNo; + private boolean isLastBlock; + + // For Kryo Serialization + public S3BlockMetaData() + { + } + + public S3BlockMetaData(String keyName, String uploadId, Integer partNo) + { + this.keyName = keyName; + this.uploadId = uploadId; + this.partNo = partNo; + this.isLastBlock = false; + } + + /** + * Return the key name of the file + * @return key name + */ + public String getKeyName() + { + return keyName; + } + + /** + * Return the upload id of the block + * @return the uplaod id + */ + public String getUploadId() + { + return uploadId; + } + + /** + * Return the part number of the block + * @return the part number + */ + public Integer getPartNo() + { + return partNo; + } + + /** + * Specifies whether the block is last or not. + * @return isLastBlock + */ + public boolean isLastBlock() + { + return isLastBlock; + } + + /** + * Sets the block is last or not. + * @param lastBlock Specifies whether the block is last or not + */ + public void setLastBlock(boolean lastBlock) + { + isLastBlock = lastBlock; + } + } + + /** + * Returns the name of the bucket in which to upload the blocks. + * @return bucket name + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Sets the name of the bucket in which to upload the blocks. 
+ * @param bucketName bucket name + */ + public void setBucketName(@NotNull String bucketName) + { + this.bucketName = Preconditions.checkNotNull(bucketName); + } + + /** + * Return the AWS access key + * @return access key + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Sets the AWS access key + * @param accessKey access key + */ + public void setAccessKey(@NotNull String accessKey) + { + this.accessKey = Preconditions.checkNotNull(accessKey); + } + + /** + * Return the AWS access key + * @return access key + */ + public String getSecretAccessKey() + { + return secretAccessKey; + } + + /** + * Sets the AWS access key + * @param secretAccessKey access key + */ + public void setSecretAccessKey(@NotNull String secretAccessKey) + { + this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey); + } + + /** + * Return the AWS S3 end point + * @return S3 end point + */ + public String getEndPoint() + { + return endPoint; + } + + /** + * Sets the AWS S3 end point + * @param endPoint S3 end point + */ + public void setEndPoint(String endPoint) + { + this.endPoint = endPoint; + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java new file mode 100644 index 0000000000..96fbc29254 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.hadoop.classification.InterfaceStability; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.PartETag; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +/** + * This operator can be used to merge the S3 blocks into a file. 
This operator will request for + * S3 CompleteMultipartUploadRequest once all the blocks are uploaded using multi-part feature. + * This operator is useful in context of S3 Output Module. + */ + +@InterfaceStability.Evolving +public class S3FileMerger implements Operator, Operator.CheckpointNotificationListener +{ + private static final Logger LOG = LoggerFactory.getLogger(S3FileMerger.class); + @NotNull + private String bucketName; + @NotNull + private String accessKey; + @NotNull + private String secretAccessKey; + private String endPoint; + protected transient List uploadedFiles = new ArrayList<>(); + private WindowDataManager windowDataManager = new FSWindowDataManager(); + @FieldSerializer.Bind(JavaSerializer.class) + private Map> uploadParts = new HashMap<>(); + private Map fileMetadatas = new HashMap<>(); + protected transient long currentWindowId; + protected transient AmazonS3 s3Client; + + /** + * Input port to receive UploadBlockMetadata + */ + public final transient DefaultInputPort uploadMetadataInput = new DefaultInputPort() + { + @Override + public void process(S3BlockUploadOperator.UploadBlockMetadata tuple) + { + processUploadBlock(tuple); + } + }; + + /** + * Process to merge the uploaded block into a file. + * @param tuple uploaded block meta data + */ + protected void processUploadBlock(S3BlockUploadOperator.UploadBlockMetadata tuple) + { + List listOfUploads = uploadParts.get(tuple.getKeyName()); + if (listOfUploads == null) { + listOfUploads = new ArrayList<>(); + uploadParts.put(tuple.getKeyName(), listOfUploads); + } + listOfUploads.add(tuple.getPartETag()); + if (fileMetadatas.get(tuple.getKeyName()) != null) { + verifyAndEmitFileMerge(tuple.getKeyName()); + } + } + + /** + * Input port to receive UploadFileMetadata + */ + public final transient DefaultInputPort filesMetadataInput = new DefaultInputPort() + { + @Override + public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple) + { + processFileMetadata(tuple); + } + }; + + /** + * Process to merge the uploaded blocks for the given file metadata. + * @param tuple file metadata + */ + protected void processFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata tuple) + { + String keyName = tuple.getKeyName(); + fileMetadatas.put(keyName, tuple); + if (uploadParts.get(keyName) != null) { + verifyAndEmitFileMerge(keyName); + } + } + + @Override + public void setup(Context.OperatorContext context) + { + windowDataManager.setup(context); + s3Client = createClient(); + } + + /** + * Create AmazonS3 client using AWS credentials + * @return AmazonS3 + */ + protected AmazonS3 createClient() + { + AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey)); + if (endPoint != null) { + client.setEndpoint(endPoint); + } + return client; + } + + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + } + + @Override + public void endWindow() + { + if (uploadedFiles.size() > 0) { + for (String keyName: uploadedFiles) { + uploadParts.remove(keyName); + fileMetadatas.remove(keyName); + } + uploadedFiles.clear(); + } + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { + try { + windowDataManager.save("Uploaded Files", currentWindowId); + } catch (IOException e) { + throw new RuntimeException("Unable to save recovery", e); + } + } + } + + @Override + public void teardown() + { + windowDataManager.teardown(); + } + + /** + * Send the CompleteMultipartUploadRequest to S3 if all the blocks of a file are uploaded into S3. 
+ * @param keyName file to upload into S3 + */ + private void verifyAndEmitFileMerge(String keyName) + { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { + return; + } + S3InitiateFileUploadOperator.UploadFileMetadata uploadFileMetadata = fileMetadatas.get(keyName); + List partETags = uploadParts.get(keyName); + if (partETags == null || uploadFileMetadata == null || + uploadFileMetadata.getFileMetadata().getNumberOfBlocks() != partETags.size()) { + return; + } + + if (partETags.size() <= 1) { + uploadedFiles.add(keyName); + LOG.debug("Uploaded file {} successfully", keyName); + return; + } + + CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, + keyName, uploadFileMetadata.getUploadId(), partETags); + CompleteMultipartUploadResult result = s3Client.completeMultipartUpload(compRequest); + if (result.getETag() != null) { + uploadedFiles.add(keyName); + LOG.debug("Uploaded file {} successfully", keyName); + } + } + + @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + try { + windowDataManager.committed(windowId); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Return the name of the bucket in which to upload the files + * @return name of the bucket + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Sets the name of the bucket in which to upload the files. + * @param bucketName name of the bucket + */ + public void setBucketName(@NotNull String bucketName) + { + this.bucketName = Preconditions.checkNotNull(bucketName); + } + + /** + * Return the AWS access key + * @return the access key + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Sets the AWS access key + * @param accessKey AWS access key + */ + public void setAccessKey(@NotNull String accessKey) + { + this.accessKey = Preconditions.checkNotNull(accessKey); + } + + /** + * Returns the AWS secret access key + * @return AWS secret access key + */ + public String getSecretAccessKey() + { + return secretAccessKey; + } + + /** + * Sets the AWS secret access key + * @param secretAccessKey AWS secret access key + */ + public void setSecretAccessKey(@NotNull String secretAccessKey) + { + this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey); + } + + /** + * Get the AWS S3 end point + * @return the AWS S3 end point + */ + public String getEndPoint() + { + return endPoint; + } + + /** + * Set the S3 end point + * @param endPoint end point + */ + public void setEndPoint(String endPoint) + { + this.endPoint = endPoint; + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java new file mode 100644 index 0000000000..3a38265af3 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.io.fs.AbstractFileSplitter; + +/** + * This is an S3 Initiate file upload operator which can be used to initiate file upload and emits the upload id. + * Initiate the given file for upload only if the file contains more than one block. + * This operator is useful in context of S3 Output Module. + */ +@InterfaceStability.Evolving +public class S3InitiateFileUploadOperator implements Operator, Operator.CheckpointNotificationListener +{ + @NotNull + private String bucketName; + @NotNull + private String accessKey; + @NotNull + private String secretAccessKey; + private String endPoint; + @NotNull + private String outputDirectoryPath; + private WindowDataManager windowDataManager = new FSWindowDataManager(); + protected transient AmazonS3 s3Client; + protected transient long currentWindowId; + protected transient List currentWindowRecoveryState; + + public final transient DefaultOutputPort fileMetadataOutput = new DefaultOutputPort<>(); + + public final transient DefaultOutputPort uploadMetadataOutput = new DefaultOutputPort<>(); + + /** + * This input port receive file metadata and those files will be upload into S3. + */ + public final transient DefaultInputPort filesMetadataInput = new DefaultInputPort() + { + @Override + public void process(AbstractFileSplitter.FileMetadata tuple) + { + processTuple(tuple); + } + }; + + /** + * For the input file, initiate the upload and emit the UploadFileMetadata through the fileMetadataOutput, + * uploadMetadataOutput ports. 
+ * @param tuple given tuple + */ + protected void processTuple(AbstractFileSplitter.FileMetadata tuple) + { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { + return; + } + String keyName = getKeyName(tuple.getFilePath()); + String uploadId = ""; + if (tuple.getNumberOfBlocks() > 1) { + InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, keyName); + initRequest.setObjectMetadata(createObjectMetadata()); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + uploadId = initResponse.getUploadId(); + } + UploadFileMetadata uploadFileMetadata = new UploadFileMetadata(tuple, uploadId, keyName); + fileMetadataOutput.emit(uploadFileMetadata); + uploadMetadataOutput.emit(uploadFileMetadata); + currentWindowRecoveryState.add(uploadFileMetadata); + } + + /** + * Creates the empty object metadata for initiate multipart upload request. + * @return the ObjectMetadata + */ + public ObjectMetadata createObjectMetadata() + { + return new ObjectMetadata(); + } + + @Override + public void setup(Context.OperatorContext context) + { + outputDirectoryPath = StringUtils.removeEnd(outputDirectoryPath, Path.SEPARATOR); + currentWindowRecoveryState = new ArrayList<>(); + windowDataManager.setup(context); + s3Client = createClient(); + } + + /** + * Create AmazonS3 client using AWS credentials + * @return AmazonS3 + */ + protected AmazonS3 createClient() + { + AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey)); + if (endPoint != null) { + client.setEndpoint(endPoint); + } + return client; + } + + /** + * Generates the key name from the given file path and output directory path. + * @param filePath file path to upload + * @return key name for the given file + */ + private String getKeyName(String filePath) + { + return outputDirectoryPath + Path.SEPARATOR + StringUtils.removeStart(filePath, Path.SEPARATOR); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + if (windowId <= windowDataManager.getLargestCompletedWindow()) { + replay(windowId); + } + } + + @Override + public void endWindow() + { + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { + try { + windowDataManager.save(currentWindowRecoveryState, currentWindowId); + } catch (IOException e) { + throw new RuntimeException("Unable to save recovery", e); + } + } + currentWindowRecoveryState.clear(); + } + + @Override + public void teardown() + { + windowDataManager.teardown(); + } + + protected void replay(long windowId) + { + try { + @SuppressWarnings("unchecked") + List recoveredData = (List)windowDataManager.retrieve(windowId); + if (recoveredData != null) { + for (UploadFileMetadata upfm : recoveredData) { + uploadMetadataOutput.emit(upfm); + fileMetadataOutput.emit(upfm); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void beforeCheckpoint(long windowId) + { + + } + + @Override + public void checkpointed(long windowId) + { + + } + + @Override + public void committed(long windowId) + { + try { + windowDataManager.committed(windowId); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Return the name of the bucket in which to create the multipart upload. + * @return bucket name + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set the name of the bucket in which to create the multipart upload. 
+ * @param bucketName bucket name + */ + public void setBucketName(@NotNull String bucketName) + { + this.bucketName = Preconditions.checkNotNull(bucketName); + } + + /** + * Return the AWS access key + * @return AWS access key + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Sets the AWS access key + * @param accessKey given access key + */ + public void setAccessKey(@NotNull String accessKey) + { + this.accessKey = Preconditions.checkNotNull(accessKey); + } + + /** + * Return the AWS secret access key + * @return the AWS secret access key + */ + public String getSecretAccessKey() + { + return secretAccessKey; + } + + /** + * Sets the AWS secret access key + * @param secretAccessKey secret access key + */ + public void setSecretAccessKey(@NotNull String secretAccessKey) + { + this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey); + } + + /** + * Output directory path for the files to upload + * @return the output directory path + */ + public String getOutputDirectoryPath() + { + return outputDirectoryPath; + } + + /** + * Sets the output directory path for uploading new files. + * @param outputDirectoryPath output directory path + */ + public void setOutputDirectoryPath(@NotNull String outputDirectoryPath) + { + this.outputDirectoryPath = Preconditions.checkNotNull(outputDirectoryPath); + } + + /** + * Returns the window data manager. + * @return the windowDataManager + */ + public WindowDataManager getWindowDataManager() + { + return windowDataManager; + } + + /** + * Sets the window data manager + * @param windowDataManager given windowDataManager + */ + public void setWindowDataManager(@NotNull WindowDataManager windowDataManager) + { + this.windowDataManager = Preconditions.checkNotNull(windowDataManager); + } + + /** + * Returns the AWS S3 end point + * @return the S3 end point + */ + public String getEndPoint() + { + return endPoint; + } + + /** + * Sets the AWS S3 end point + * @param endPoint S3 end point + */ + public void setEndPoint(String endPoint) + { + this.endPoint = endPoint; + } + + /** + * A file upload metadata which contains file metadata, upload id, key name. + */ + public static class UploadFileMetadata + { + private AbstractFileSplitter.FileMetadata fileMetadata; + private String uploadId; + private String keyName; + + // For Kryo + public UploadFileMetadata() + { + } + + public UploadFileMetadata(AbstractFileSplitter.FileMetadata fileMetadata, String uploadId, String keyName) + { + this.fileMetadata = fileMetadata; + this.uploadId = uploadId; + this.keyName = keyName; + } + + @Override + public int hashCode() + { + return keyName.hashCode(); + } + + /** + * Returns the name of the key generated from file path. + * @return the key name + */ + public String getKeyName() + { + return keyName; + } + + /** + * Return the file metadata of a file. + * @return the fileMetadata + */ + public AbstractFileSplitter.FileMetadata getFileMetadata() + { + return fileMetadata; + } + + /** + * Returns the unique upload id of a file. 
+ * @return the upload Id of a file + */ + public String getUploadId() + { + return uploadId; + } + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java new file mode 100644 index 0000000000..6c3d8bed0d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.fs.s3; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.io.block.AbstractBlockReader; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.fs.AbstractFileSplitter; +import com.datatorrent.netlet.util.Slice; + +import static com.datatorrent.api.Context.OperatorContext.TIMEOUT_WINDOW_COUNT; + +/** + * S3OutputModule can be used to upload the files/directory into S3. This module supports + * parallel uploads of multiple blocks of the same file and merge those blocks in sequence. + * + * Below operators are wrapped into single component using Module API + * - S3InitiateFileUploadOperator + * - S3BlockUploadOperator + * - S3FileMerger + * + * Initial BenchMark Results + * ------------------------- + * The Module writes 18 MB/s to S3 using multi part upload feature with the following configuration + * + * File Size = 1 GB + * Partition count of S3BlockUploadOperator = 6 + * Partition count of S3FileMerger = 1 + * Container memory size of this module as follows: + * S3InitiateFileUploadOperator = 1 GB + * S3BlockUploadOperator = 2.5 GB + * S3FileMerger = 2 GB + * + * + * @displayName S3 Output Module + * @tags S3, Output + */ +@InterfaceStability.Evolving +public class S3OutputModule implements Module +{ + /** + * AWS access key + */ + @NotNull + private String accessKey; + /** + * AWS secret access key + */ + @NotNull + private String secretAccessKey; + + /** + * S3 End point + */ + private String endPoint; + /** + * Name of the bucket in which to upload the files + */ + @NotNull + private String bucketName; + + /** + * Path of the output directory. Relative path of the files copied will be + * maintained w.r.t. source directory and output directory + */ + @NotNull + private String outputDirectoryPath; + + /** + * Specified as count of streaming windows. 
This value will set to the operators in this module because + * the operators in this module is mostly interacts with the Amazon S3. + * Due to this reason, window id of these operators might be lag behind with the upstream operators. + */ + @Min(120) + private int timeOutWindowCount = 6000; + + /** + * Creates the number of instances of S3FileMerger operator. + */ + @Min(1) + private int mergerCount = 1; + /** + * Input port for files metadata. + */ + public final transient ProxyInputPort filesMetadataInput = new ProxyInputPort(); + + /** + * Input port for blocks metadata + */ + public final transient ProxyInputPort blocksMetadataInput = new ProxyInputPort(); + + /** + * Input port for blocks data + */ + public final transient ProxyInputPort> blockData = new ProxyInputPort>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // DAG for S3 Output Module as follows: + // ---- S3InitiateFileUploadOperator -----| + // | S3FileMerger + // ---- S3BlockUploadOperator ------------| + + S3InitiateFileUploadOperator initiateUpload = dag.addOperator("InitiateUpload", createS3InitiateUpload()); + initiateUpload.setAccessKey(accessKey); + initiateUpload.setSecretAccessKey(secretAccessKey); + initiateUpload.setBucketName(bucketName); + initiateUpload.setOutputDirectoryPath(outputDirectoryPath); + + S3BlockUploadOperator blockUploader = dag.addOperator("BlockUpload", createS3BlockUpload()); + blockUploader.setAccessKey(accessKey); + blockUploader.setSecretAccessKey(secretAccessKey); + blockUploader.setBucketName(bucketName); + + S3FileMerger fileMerger = dag.addOperator("FileMerger", createS3FileMerger()); + fileMerger.setAccessKey(accessKey); + fileMerger.setSecretAccessKey(secretAccessKey); + fileMerger.setBucketName(bucketName); + + if (endPoint != null) { + initiateUpload.setEndPoint(endPoint); + blockUploader.setEndPoint(endPoint); + fileMerger.setEndPoint(endPoint); + } + + dag.setInputPortAttribute(blockUploader.blockInput, Context.PortContext.PARTITION_PARALLEL, true); + dag.setInputPortAttribute(blockUploader.blockMetadataInput, Context.PortContext.PARTITION_PARALLEL, true); + + dag.setAttribute(initiateUpload, TIMEOUT_WINDOW_COUNT, timeOutWindowCount); + dag.setAttribute(blockUploader, TIMEOUT_WINDOW_COUNT, timeOutWindowCount); + dag.setAttribute(fileMerger, TIMEOUT_WINDOW_COUNT, timeOutWindowCount); + dag.setUnifierAttribute(blockUploader.output, TIMEOUT_WINDOW_COUNT, timeOutWindowCount); + + dag.setAttribute(fileMerger,Context.OperatorContext.PARTITIONER, new StatelessPartitioner(mergerCount)); + // Add Streams + dag.addStream("InitiateUploadIDToMerger", initiateUpload.fileMetadataOutput, fileMerger.filesMetadataInput); + dag.addStream("InitiateUploadIDToWriter", initiateUpload.uploadMetadataOutput, blockUploader.uploadMetadataInput); + dag.addStream("WriterToMerger", blockUploader.output, fileMerger.uploadMetadataInput); + + // Set the proxy ports + filesMetadataInput.set(initiateUpload.filesMetadataInput); + blocksMetadataInput.set(blockUploader.blockMetadataInput); + blockData.set(blockUploader.blockInput); + } + + /** + * Create the S3InitiateFileUploadOperator for initiate upload + * @return S3InitiateFileUploadOperator + */ + protected S3InitiateFileUploadOperator createS3InitiateUpload() + { + return new S3InitiateFileUploadOperator(); + } + + /** + * Create the S3BlockUploadOperator for block upload into S3 bucket + * @return S3BlockUploadOperator + */ + protected S3BlockUploadOperator createS3BlockUpload() + { + return new S3BlockUploadOperator(); + } + + 
/** + * Create the S3FileMerger for sending complete request + * @return S3FileMerger + */ + protected S3FileMerger createS3FileMerger() + { + return new S3FileMerger(); + } + + /** + * Get the AWS access key + * @return AWS access key + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Set the AWS access key + * @param accessKey access key + */ + public void setAccessKey(String accessKey) + { + this.accessKey = accessKey; + } + + /** + * Return the AWS secret access key + * @return AWS secret access key + */ + public String getSecretAccessKey() + { + return secretAccessKey; + } + + /** + * Set the AWS secret access key + * @param secretAccessKey AWS secret access key + */ + public void setSecretAccessKey(String secretAccessKey) + { + this.secretAccessKey = secretAccessKey; + } + + + /** + * Get the name of the bucket in which to upload the files + * @return bucket name + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set the name of the bucket in which to upload the files + * @param bucketName name of the bucket + */ + public void setBucketName(String bucketName) + { + this.bucketName = bucketName; + } + + /** + * Return the S3 End point + * @return S3 End point + */ + public String getEndPoint() + { + return endPoint; + } + + /** + * Set the S3 End point + * @param endPoint S3 end point + */ + public void setEndPoint(String endPoint) + { + this.endPoint = endPoint; + } + + /** + * Get the path of the output directory. + * @return path of output directory + */ + public String getOutputDirectoryPath() + { + return outputDirectoryPath; + } + + /** + * Set the path of the output directory. + * @param outputDirectoryPath path of output directory + */ + public void setOutputDirectoryPath(String outputDirectoryPath) + { + this.outputDirectoryPath = outputDirectoryPath; + } + + /** + * Get the number of streaming windows for the operators which have stalled processing. + * @return the number of streaming windows + */ + public int getTimeOutWindowCount() + { + return timeOutWindowCount; + } + + /** + * Set the number of streaming windows. + * @param timeOutWindowCount given number of streaming windows for time out. + */ + public void setTimeOutWindowCount(int timeOutWindowCount) + { + this.timeOutWindowCount = timeOutWindowCount; + } + + /** + * Get the partition count of S3FileMerger operator + * @return the partition count + */ + public int getMergerCount() + { + return mergerCount; + } + + /** + * Set the partition count of S3FileMerger Operator + * @param mergerCount given partition count + */ + public void setMergerCount(int mergerCount) + { + this.mergerCount = mergerCount; + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java new file mode 100644 index 0000000000..a077fb9ad6 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.fs.AbstractFileSplitter; +import com.datatorrent.lib.testbench.CollectorTestSink; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.when; + +/** + * Testing the S3InitiateFileUploadOperator operator. It verifies the generated upload id by S3InitiateFileUploadOperator + * and client through the mock are same or not. + */ +public class S3InitiateFileUploadOperatorTest +{ + private String uploadId = "uploadfile1"; + private static final String APPLICATION_PATH_PREFIX = "target/s3outputtest/"; + private String applicationPath; + private Attribute.AttributeMap.DefaultAttributeMap attributes; + private Context.OperatorContext context; + @Mock + public AmazonS3 client; + @Mock + public AbstractFileSplitter.FileMetadata fileMetadata; + + public class S3InitiateFileUploadTest extends S3InitiateFileUploadOperator + { + @Override + protected AmazonS3 createClient() + { + return client; + } + } + + @Before + public void beforeTest() + { + applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX); + attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + } + + @After + public void afterTest() + { + Path root = new Path(applicationPath); + try { + FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration()); + fs.delete(root, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testInitiateUpload() + { + InitiateMultipartUploadResult result = new InitiateMultipartUploadResult(); + result.setUploadId(uploadId); + + MockitoAnnotations.initMocks(this); + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result); + when(fileMetadata.getFilePath()).thenReturn("/tmp/file1.txt"); + when(fileMetadata.getNumberOfBlocks()).thenReturn(4); + + S3InitiateFileUploadTest operator = new S3InitiateFileUploadTest(); + operator.setBucketName("testbucket"); + operator.setup(context); + + CollectorTestSink fileSink = new CollectorTestSink<>(); + CollectorTestSink tmp = (CollectorTestSink)fileSink; + operator.fileMetadataOutput.setSink(tmp); + operator.beginWindow(0); + operator.processTuple(fileMetadata); + operator.endWindow(); + + 
S3InitiateFileUploadOperator.UploadFileMetadata emitted = (S3InitiateFileUploadOperator.UploadFileMetadata)tmp.collectedTuples.get(0); + Assert.assertEquals("Upload ID :", uploadId, emitted.getUploadId()); + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java new file mode 100644 index 0000000000..8fe5ef92a1 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Callable; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.fs.FSInputModule; +import com.datatorrent.stram.StramLocalCluster; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +/** + * Verifies the S3OutputModule using the application. This reads the data from local file system + * "input" directory and uploads the files into "output" directory. + */ +public class S3OutputModuleMockTest +{ + private String uploadId = "uploadfile"; + private static final String APPLICATION_PATH_PREFIX = "target/s3outputmocktest/"; + private static final String FILE_DATA = "Testing the S3OutputModule. 
This File has more data hence more blocks."; + private static final String FILE = "file.txt"; + private String inputDir; + private String outputDir; + private String applicationPath; + private File inputFile; + @Mock + public static AmazonS3 client; + + @Before + public void beforeTest() throws IOException + { + applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX); + inputDir = applicationPath + File.separator + "input"; + outputDir = applicationPath + File.separator + "output"; + inputFile = new File(inputDir + File.separator + FILE); + FileUtils.writeStringToFile(inputFile, FILE_DATA); + } + + @After + public void afterTest() + { + Path root = new Path(applicationPath); + try { + FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration()); + fs.delete(root, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private CompleteMultipartUploadResult completeMultiPart() throws IOException + { + FileUtils.copyFile(inputFile, new File(outputDir + File.separator + FILE)); + CompleteMultipartUploadResult result = new CompleteMultipartUploadResult(); + result.setETag(outputDir); + return result; + } + + @Test + public void testS3OutputModule() throws Exception + { + InitiateMultipartUploadResult result = new InitiateMultipartUploadResult(); + result.setUploadId(uploadId); + + PutObjectResult objResult = new PutObjectResult(); + objResult.setETag("SuccessFullyUploaded"); + + UploadPartResult partResult = new UploadPartResult(); + partResult.setPartNumber(1); + partResult.setETag("SuccessFullyPartUploaded"); + + MockitoAnnotations.initMocks(this); + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result); + when(client.putObject(any(PutObjectRequest.class))).thenReturn(objResult); + when(client.uploadPart(any(UploadPartRequest.class))).thenReturn(partResult); + when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(completeMultiPart()); + + Application app = new S3OutputModuleMockTest.Application(); + Configuration conf = new Configuration(); + conf.set("dt.operator.HDFSInputModule.prop.files", inputDir); + conf.set("dt.operator.HDFSInputModule.prop.blockSize", "10"); + conf.set("dt.operator.HDFSInputModule.prop.blocksThreshold", "1"); + conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","20"); + + conf.set("dt.operator.S3OutputModule.prop.accessKey", "accessKey"); + conf.set("dt.operator.S3OutputModule.prop.secretAccessKey", "secretKey"); + conf.set("dt.operator.S3OutputModule.prop.bucketName", "bucketKey"); + conf.set("dt.operator.S3OutputModule.prop.outputDirectoryPath", outputDir); + + Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath()); + final Path outputFilePath = new Path(outDir.toString() + File.separator + FILE); + final FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration()); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + + ((StramLocalCluster)lc).setExitCondition(new Callable() + { + @Override + public Boolean call() throws Exception + { + return fs.exists(outputFilePath); + } + }); + lc.run(10000); + + Assert.assertTrue("output file exist", fs.exists(outputFilePath)); + } + + private static class Application implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + FSInputModule inputModule = dag.addModule("HDFSInputModule", new 
FSInputModule()); + S3OutputTestModule outputModule = dag.addModule("S3OutputModule", new S3OutputTestModule()); + + dag.addStream("FileMetaData", inputModule.filesMetadataOutput, outputModule.filesMetadataInput); + dag.addStream("BlocksMetaData", inputModule.blocksMetadataOutput, outputModule.blocksMetadataInput) + .setLocality(DAG.Locality.CONTAINER_LOCAL); + dag.addStream("BlocksData", inputModule.messages, outputModule.blockData).setLocality(DAG.Locality.CONTAINER_LOCAL); + + } + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java new file mode 100644 index 0000000000..f1cb291637 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.fs.s3; + +import com.amazonaws.services.s3.AmazonS3; + +import static org.apache.apex.malhar.lib.fs.s3.S3OutputModuleMockTest.client; + +public class S3OutputTestModule extends S3OutputModule +{ + + public static class S3InitiateFileUploadTest extends S3InitiateFileUploadOperator + { + @Override + protected AmazonS3 createClient() + { + return client; + } + } + + public static class S3BlockUploadTest extends S3BlockUploadOperator + { + @Override + protected AmazonS3 createClient() + { + return client; + } + } + + public static class S3FileMergerTest extends S3FileMerger + { + @Override + protected AmazonS3 createClient() + { + return client; + } + } + + @Override + protected S3InitiateFileUploadOperator createS3InitiateUpload() + { + return new S3InitiateFileUploadTest(); + } + + @Override + protected S3BlockUploadOperator createS3BlockUpload() + { + return new S3BlockUploadTest(); + } + + @Override + protected S3FileMerger createS3FileMerger() + { + return new S3FileMergerTest(); + } +}
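
For reference, a minimal wiring sketch (not part of this patch) showing how an application can feed this module: it mirrors the mock test's Application class, replacing the mocked S3OutputTestModule with the real S3OutputModule. The application class name is illustrative.

package org.apache.apex.malhar.lib.fs.s3;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.fs.FSInputModule;

public class S3CopyApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Reads files from HDFS/local FS and splits them into blocks.
    FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
    // Uploads the blocks to S3 and merges them back into files.
    S3OutputModule outputModule = dag.addModule("S3OutputModule", new S3OutputModule());

    dag.addStream("FileMetaData", inputModule.filesMetadataOutput, outputModule.filesMetadataInput);
    dag.addStream("BlocksMetaData", inputModule.blocksMetadataOutput, outputModule.blocksMetadataInput)
        .setLocality(DAG.Locality.CONTAINER_LOCAL);
    dag.addStream("BlocksData", inputModule.messages, outputModule.blockData)
        .setLocality(DAG.Locality.CONTAINER_LOCAL);
  }
}

The module's credentials and destination are then supplied through properties, exactly as the mock test does: dt.operator.S3OutputModule.prop.accessKey, dt.operator.S3OutputModule.prop.secretAccessKey, dt.operator.S3OutputModule.prop.bucketName and dt.operator.S3OutputModule.prop.outputDirectoryPath, plus dt.operator.HDFSInputModule.prop.files for the source directory.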
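
The three operators together implement the standard S3 multipart protocol: S3InitiateFileUploadOperator obtains the upload id, S3BlockUploadOperator uploads one part per block, and S3FileMerger completes the upload once every PartETag has arrived. Below is a condensed, single-threaded sketch (not part of this patch) of those same SDK calls, which may help when reviewing the operator logic. Bucket, key and credential values are placeholders; note that S3 requires every part except the last to be at least 5 MB, so the upstream block size should be chosen accordingly.

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class MultipartUploadSketch
{
  public static void main(String[] args)
  {
    AmazonS3 s3Client = new AmazonS3Client(new BasicAWSCredentials("accessKey", "secretAccessKey"));
    String bucketName = "testbucket";
    String keyName = "output/file.txt";
    // Stand-ins for the file blocks that S3BlockUploadOperator receives on its blockInput port.
    byte[][] blocks = {"block one of the file".getBytes(), "block two of the file".getBytes()};

    // S3InitiateFileUploadOperator: initiate the multipart upload and keep the upload id.
    String uploadId = s3Client.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucketName, keyName)).getUploadId();

    // S3BlockUploadOperator: upload each block as one part (part numbers start at 1)
    // and collect its PartETag.
    List<PartETag> partETags = new ArrayList<>();
    for (int i = 0; i < blocks.length; i++) {
      UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(bucketName)
          .withKey(keyName).withUploadId(uploadId).withPartNumber(i + 1)
          .withInputStream(new ByteArrayInputStream(blocks[i])).withPartSize(blocks[i].length);
      partETags.add(s3Client.uploadPart(uploadRequest).getPartETag());
    }

    // S3FileMerger: complete the upload once all PartETags are available; S3 then assembles the object.
    s3Client.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucketName, keyName, uploadId, partETags));
  }
}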