From 515b7b60cb71ce42a33b3deed65eab9677adea04 Mon Sep 17 00:00:00 2001 From: deepaksahu562 Date: Thu, 2 Mar 2023 15:26:51 +0530 Subject: [PATCH] s3-sink - Code refactor (#1048) Signed-off-by: Deepak Sahu --- .../dataprepper/plugins/sink/S3Sink.java | 44 +++- .../plugins/sink/S3SinkConfig.java | 13 +- .../plugins/sink/S3SinkWorker.java | 78 +++--- .../ClosableQueue.java | 2 +- .../ConvertibleOutputStream.java | 8 +- .../ExecutorServiceResultsHandler.java | 2 +- .../InMemoryAccumulator.java | 16 +- .../IntegrityCheckException.java | 2 +- .../LocalFileAccumulator.java | 45 ++-- .../MultiPartOutputStream.java | 2 +- .../{ => accumulator}/SinkAccumulator.java | 2 +- .../{stream => accumulator}/StreamPart.java | 2 +- .../StreamTransferManager.java | 4 +- .../sink/{stream => accumulator}/Utils.java | 5 +- .../dataprepper/plugins/sink/codec/Codec.java | 7 +- .../plugins/sink/codec/JsonCodec.java | 13 +- .../AwsAuthenticationOptions.java | 52 ++-- .../sink/configuration/ObjectOptions.java | 3 +- .../sink/configuration/ThresholdOptions.java | 7 +- .../plugins/sink/S3ObjectIndexTests.java | 89 +++++++ .../plugins/sink/S3SinkConfigTest.java | 85 +++++++ .../plugins/sink/codec/JsonCodecTest.java | 136 +++++++++++ .../AwsAuthenticationOptionsTest.java | 230 ++++++++++++++++++ 23 files changed, 701 insertions(+), 146 deletions(-) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/ClosableQueue.java (92%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/ConvertibleOutputStream.java (97%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/ExecutorServiceResultsHandler.java (98%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/InMemoryAccumulator.java (84%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/IntegrityCheckException.java (87%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/LocalFileAccumulator.java (80%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/MultiPartOutputStream.java (99%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{ => accumulator}/SinkAccumulator.java (66%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/StreamPart.java (95%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/StreamTransferManager.java (98%) rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/{stream => accumulator}/Utils.java (91%) create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndexTests.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index b2cd509a5f..70c6978a25 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -18,12 +18,11 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator; import org.opensearch.dataprepper.plugins.sink.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - /** * Implementation class of s3-sink plugin * @@ -33,13 +32,19 @@ public class S3Sink extends AbstractSink> { private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); private static final int EVENT_QUEUE_SIZE = 100000; - + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + private final S3SinkConfig s3SinkConfig; + private S3SinkWorker worker; + private SinkAccumulator accumulator; + private final Codec codec; private volatile boolean initialized; private static BlockingQueue eventQueue; private static boolean isStopRequested; + private Thread workerThread; + - private final Codec codec; /** * @@ -78,8 +83,10 @@ public void doInitialize() { private void doInitializeInternal() { eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); - S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec); - new Thread(worker).start(); + worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec); + S3SinkWorkerRunner runner = new S3SinkWorkerRunner(); + workerThread = new Thread(runner); + workerThread.start(); initialized = Boolean.TRUE; } @@ -91,10 +98,8 @@ public void doOutput(final Collection> records) { } for (final Record recordData : records) { - Event event = recordData.getData(); getEventQueue().add(event); - } } @@ -102,6 +107,9 @@ public void doOutput(final Collection> records) { public void shutdown() { super.shutdown(); isStopRequested = Boolean.TRUE; + if (workerThread.isAlive()) { + workerThread.stop(); + } LOG.info("s3-sink sutdonwn completed"); } @@ -112,4 +120,24 @@ public static BlockingQueue getEventQueue() { public static boolean isStopRequested() { return isStopRequested; } + + private class S3SinkWorkerRunner implements Runnable { + @Override + public void run() { + try { + while (!S3Sink.isStopRequested()) { + if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + accumulator = worker.inMemmoryAccumulator(); + } else { + accumulator = worker.localFileAccumulator(); + } + accumulator.doAccumulate(); + } + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Exception in S3Sink : \n Error message {} \n Exception cause {}", e.getMessage(), + e.getCause(), e); + } + } + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java index 280c03b358..923964f349 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -13,11 +13,16 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; -/** +/* An implementation class of s3 sink configuration */ public class S3SinkConfig { + static final String DEFAULT_BUCKET_NAME = "dataprepper"; + static final String DEFAULT_PATH_PREFIX = "logdata"; + + static final String DEFAULT_TEMP_STORAGE = "local_file"; + @JsonProperty("aws") @NotNull @Valid @@ -37,15 +42,15 @@ public class S3SinkConfig { @JsonProperty("temporary_storage") @NotNull - private String temporaryStorage; + private String temporaryStorage = DEFAULT_TEMP_STORAGE; @JsonProperty("bucket") @NotNull - private String bucketName; + private String bucketName = DEFAULT_BUCKET_NAME; @JsonProperty("key_path_prefix") @NotNull - private String keyPathPrefix; + private String keyPathPrefix = DEFAULT_PATH_PREFIX; /* Aws Authentication configuration Options diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java index 41b1b22901..46474773d6 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -4,6 +4,8 @@ */ package org.opensearch.dataprepper.plugins.sink; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; import java.util.HashMap; import java.util.HashSet; import java.util.NavigableSet; @@ -14,9 +16,10 @@ import org.mapdb.DBMaker; import org.mapdb.Serializer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryAccumulator; +import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileAccumulator; +import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator; import org.opensearch.dataprepper.plugins.sink.codec.Codec; -import org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator; -import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,60 +35,42 @@ * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 */ -public class S3SinkWorker implements Runnable { +public class S3SinkWorker { private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); private static final float LOAD_FACTOR = 0.02f; - private static final String IN_MEMORY = "in_memory"; - private static final String LOCAL_FILE = "local_file"; - private final int numEvents; - private int numStreams; - private final int eventsPerChunk; private final S3SinkService s3SinkService; private final S3SinkConfig s3SinkConfig; private final Codec codec; - + private SinkAccumulator accumulator; + private final int numEvents; + private int numStreams; + private final int eventsPerChunk; + /** * * @param s3SinkService * @param s3SinkConfig */ - public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig, Codec codec) { + public S3SinkWorker(final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig, final Codec codec) { this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); - this.numStreams = (int) (numEvents * LOAD_FACTOR); - this.eventsPerChunk = numEvents / numStreams; this.s3SinkService = s3SinkService; this.s3SinkConfig = s3SinkConfig; this.codec = codec; - } - - @Override - public void run() { - try { - while (!S3Sink.isStopRequested()) { - if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { - inMemmoryAccumulator(); - } else { - localFileAccumulator(); - } - } - } catch (Exception e) { - e.printStackTrace(); - LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(), - e.getCause(), e); - } + numStreams = (int) (numEvents * LOAD_FACTOR); + eventsPerChunk = numEvents / numStreams; } /** * Accumulates data from buffer and store into in memory */ - public void inMemmoryAccumulator() { - HashSet inMemoryEventSet = null; - HashMap> inMemoryEventMap = null; + public SinkAccumulator inMemmoryAccumulator() { + HashSet inMemoryEventSet = null; + HashMap> inMemoryEventMap = null; + int streamCount = 0; try { StopWatch watch = new StopWatch(); watch.start(); - int streamCount = 0; int byteCount = 0; int eventCount = 0; long eventCollectionDuration = 0; @@ -96,7 +81,9 @@ public void inMemmoryAccumulator() { for (int data = 0; data < eventsPerChunk && thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) { Event event = S3Sink.getEventQueue().take(); - inMemoryEventSet.add(event); + OutputStream outPutStream = new ByteArrayOutputStream(); + codec.parse(outPutStream, event); + inMemoryEventSet.add(outPutStream.toString()); byteCount += event.toJsonString().getBytes().length; flag = Boolean.TRUE; eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); @@ -115,16 +102,18 @@ public void inMemmoryAccumulator() { "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", byteCount, eventCount, eventCollectionDuration, streamCount); - //new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); + accumulator = new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig); } catch (Exception e) { LOG.error("Exception while storing recoreds into In-Memory", e); } + + return accumulator; } /** * Accumulates data from buffer and store in local file */ - public void localFileAccumulator() { + public SinkAccumulator localFileAccumulator() { DB db = null; NavigableSet localFileEventSet = null; int byteCount = 0; @@ -136,9 +125,11 @@ public void localFileAccumulator() { db = DBMaker.memoryDB().make(); localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen(); for (int data = 0; thresholdsCheck(data, watch, byteCount); data++) { - String event = S3Sink.getEventQueue().take().toJsonString(); - byteCount += event.getBytes().length; - localFileEventSet.add(event); + Event event = S3Sink.getEventQueue().take(); + OutputStream outPutStream = new ByteArrayOutputStream(); + codec.parse(outPutStream, event); + byteCount += event.toJsonString().getBytes().length; + localFileEventSet.add(outPutStream.toString()); eventCount++; eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); } @@ -146,16 +137,11 @@ public void localFileAccumulator() { LOG.info( "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", byteCount, eventCount, eventCollectionDuration); - - //new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); - + accumulator = new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig, db); } catch (Exception e) { LOG.error("Exception while storing recoreds into Local-file", e); - } finally { - if (db !=null && !db.isClosed()) { - db.close(); - } } + return accumulator; } /** diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ClosableQueue.java similarity index 92% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ClosableQueue.java index 3c017ca350..68d0732913 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ClosableQueue.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ConvertibleOutputStream.java similarity index 97% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ConvertibleOutputStream.java index 76dfdd71be..b26ab84bcf 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ConvertibleOutputStream.java @@ -2,10 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -13,6 +10,9 @@ import java.io.InputStream; import java.security.MessageDigest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A ByteArrayOutputStream with some useful additional functionality. */ diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ExecutorServiceResultsHandler.java similarity index 98% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ExecutorServiceResultsHandler.java index 40609ead9f..b94b41c6e7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ExecutorServiceResultsHandler.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.util.Iterator; import java.util.concurrent.Callable; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryAccumulator.java similarity index 84% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryAccumulator.java index 095a77ffd4..c8eb881f2a 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryAccumulator.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.util.HashSet; import java.util.List; @@ -11,22 +11,20 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex; import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; import org.opensearch.dataprepper.plugins.sink.S3SinkService; -import org.opensearch.dataprepper.plugins.sink.SinkAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; /** - * Accumulates data from buffer and store into in memory + * Upload accumulated data(in-memory) to amazon s3 */ public class InMemoryAccumulator implements SinkAccumulator { private static final Logger LOG = LoggerFactory.getLogger(InMemoryAccumulator.class); - final Map> inMemoryEventMap; + final Map> inMemoryEventMap; private final int numStreams; private final S3SinkService s3SinkService; private final S3SinkConfig s3SinkConfig; @@ -41,7 +39,7 @@ public class InMemoryAccumulator implements SinkAccumulator { * @param s3SinkService * @param s3SinkConfig */ - public InMemoryAccumulator(final Map> inMemoryEventMap, final int numStreams, + public InMemoryAccumulator(final Map> inMemoryEventMap, final int numStreams, final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig) { this.inMemoryEventMap = inMemoryEventMap; this.numStreams = numStreams; @@ -72,14 +70,14 @@ public void doAccumulate() { ExecutorService pool = Executors.newFixedThreadPool(numStreams); for (int streamsInput = 0; streamsInput < numStreams; streamsInput++) { final int streamIndex = streamsInput; - HashSet eventSet = inMemoryEventMap.get(streamsInput); + HashSet eventSet = inMemoryEventMap.get(streamsInput); pool.submit(new Runnable() { public void run() { try { MultiPartOutputStream outputStream = streams.get(streamIndex); if (eventSet != null) { - for (Event event : eventSet) { - outputStream.write(event.toJsonString().getBytes()); + for (String event : eventSet) { + outputStream.write(event.getBytes()); } } // The stream must be closed once all the data has been written diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/IntegrityCheckException.java similarity index 87% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/IntegrityCheckException.java index ccaf095c7c..38d0ca9a65 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/IntegrityCheckException.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; /** * Thrown when final integrity check fails. It suggests that the multipart upload failed diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileAccumulator.java similarity index 80% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileAccumulator.java index 0f75b1f4ba..e4effde647 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileAccumulator.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.io.BufferedWriter; import java.io.File; @@ -13,54 +13,49 @@ import java.util.NavigableSet; import java.util.Optional; +import org.mapdb.DB; import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex; import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; import org.opensearch.dataprepper.plugins.sink.S3SinkService; -import org.opensearch.dataprepper.plugins.sink.SinkAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.core.waiters.WaiterResponse; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; -import software.amazon.awssdk.services.s3.model.NoSuchUploadException; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.waiters.S3Waiter; /** - * - * @author de20436406 - * + * Upload accumulated data(local-file) to amazon s3 */ public class LocalFileAccumulator implements SinkAccumulator { private static final Logger LOG = LoggerFactory.getLogger(LocalFileAccumulator.class); private static final int MAX_RETRY = 3; private final S3SinkService s3SinkService; private final S3SinkConfig s3SinkConfig; - private final NavigableSet localFileEventSet; private String fileAbsolutePath = null; private String localFileName = null; + private DB db = null; /** * * @param localFileEventSet * @param s3SinkService * @param s3SinkConfig + * */ public LocalFileAccumulator(final NavigableSet localFileEventSet, final S3SinkService s3SinkService, - final S3SinkConfig s3SinkConfig) { + final S3SinkConfig s3SinkConfig, DB db) { this.localFileEventSet = localFileEventSet; this.s3SinkService = s3SinkService; this.s3SinkConfig = s3SinkConfig; + this.db = db; } @Override @@ -93,20 +88,28 @@ public void doAccumulate() { } catch (Exception e) { LOG.error("Unable to store {} into Amazon S3", localFileName, e); } finally { - try { - boolean isLocalFileDeleted = Files.deleteIfExists(Paths.get(fileAbsolutePath)); - if (isLocalFileDeleted) { - LOG.info("Local file deleted successfully {}", fileAbsolutePath); - } else { - LOG.warn("Local file not deleted {}", fileAbsolutePath); + if (fileAbsolutePath != null) { + try { + boolean isLocalFileDeleted = Files.deleteIfExists(Paths.get(fileAbsolutePath)); + if (isLocalFileDeleted) { + LOG.info("Local file deleted successfully {}", fileAbsolutePath); + } else { + LOG.warn("Local file not deleted {}", fileAbsolutePath); + } + } catch (IOException e) { + LOG.error("Local file unable to deleted {}", fileAbsolutePath); + e.printStackTrace(); } - } catch (IOException e) { - LOG.error("Local file unable to deleted {}", fileAbsolutePath); - e.printStackTrace(); + } + if (db != null && !db.isClosed()) { + db.close(); } } } + /** + * Upload accumulated data(local-file) to amazon s3 + */ @SuppressWarnings("finally") private boolean fileSaveToS3() { final String bucketName = s3SinkConfig.getBucketName(); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/MultiPartOutputStream.java similarity index 99% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/MultiPartOutputStream.java index 3ceb04fd99..7a8fedb925 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/MultiPartOutputStream.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.io.OutputStream; import java.util.concurrent.BlockingQueue; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java similarity index 66% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java index cdd5f4a669..fff05ff275 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink; +package org.opensearch.dataprepper.plugins.sink.accumulator; public interface SinkAccumulator { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/StreamPart.java similarity index 95% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/StreamPart.java index 45e231565f..4ff9d16f84 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/StreamPart.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.io.InputStream; import java.util.Base64; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/StreamTransferManager.java similarity index 98% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/StreamTransferManager.java index d84c48285a..3a7032dabd 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/StreamTransferManager.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; import java.util.ArrayList; import java.util.Collections; @@ -23,8 +23,6 @@ import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; -import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Utils.java similarity index 91% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Utils.java index 7cfbcba90e..1bfcd6f554 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Utils.java @@ -2,11 +2,8 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.stream; +package org.opensearch.dataprepper.plugins.sink.accumulator; -import java.lang.InterruptedException; -import java.lang.RuntimeException; -import java.lang.Thread; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java index 488adffb1d..1f08f2c491 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java @@ -5,13 +5,13 @@ package org.opensearch.dataprepper.plugins.sink.codec; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; - import java.io.IOException; import java.io.OutputStream; import java.util.Collection; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + /** * A codec parsing data through an output stream. Each implementation of this class should * support parsing a specific type or format of data. See sub-classes for examples. @@ -29,4 +29,5 @@ public interface Codec { void parse(OutputStream outputStream, Record eventCollection) throws IOException; void parse(OutputStream outputStream, Event event) throws IOException; + } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java index cb13088b39..6f3762cdf1 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -5,16 +5,17 @@ package org.opensearch.dataprepper.plugins.sink.codec; -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.event.Event; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.opensearch.dataprepper.model.record.Record; - import java.io.IOException; import java.io.OutputStream; import java.util.Collection; import java.util.Objects; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import com.fasterxml.jackson.databind.ObjectMapper; + /** * An implementation of {@link Codec} which serializes to JSON. */ @@ -61,6 +62,6 @@ public void parse(OutputStream outputStream, Event event) throws IOException Objects.requireNonNull(event); objectMapper.writeValue(outputStream, event.toJsonString()); - + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java index d42f8502f0..3ac3e19cbe 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -5,7 +5,11 @@ package org.opensearch.dataprepper.plugins.sink.configuration; +import java.util.Map; +import java.util.UUID; + import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.Size; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -15,9 +19,6 @@ import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -import java.util.Map; -import java.util.UUID; - /** An implementation class AWS Authentication configuration */ @@ -47,32 +48,27 @@ public Region getAwsRegion() { public AwsCredentialsProvider authenticateAwsConfiguration() { final AwsCredentialsProvider awsCredentialsProvider; - if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { - try { - Arn.fromString(awsStsRoleArn); - } catch (final Exception e) { - throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); - } + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + try { + Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); + } + + final StsClient stsClient = StsClient.builder().region(getAwsRegion()).build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()).roleArn(awsStsRoleArn); + + if (awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder.overrideConfiguration( + configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()).build(); - final StsClient stsClient = StsClient.builder() - .region(getAwsRegion()) - .build(); - - AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() - .roleSessionName("S3-Sink-" + UUID.randomUUID()) - .roleArn(awsStsRoleArn); - - if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { - assumeRoleRequestBuilder = assumeRoleRequestBuilder - .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); - } - - awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() - .stsClient(stsClient) - .refreshRequest(assumeRoleRequestBuilder.build()) - .build(); - - } else { + } else { // use default credential provider awsCredentialsProvider = DefaultCredentialsProvider.create(); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java index 540c066929..9f63bb31a8 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -6,13 +6,14 @@ package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.NotNull; /** An implementation class of Threshold configuration Options */ public class ObjectOptions { - private static final String DEFAULT_KEY_PATTERN = "logs-${YYYY-MM-DD hh:mm:ss}"; + private static final String DEFAULT_KEY_PATTERN = "logs-${yyyy-MM-dd hh:mm:ss}"; @JsonProperty("file_pattern") @NotNull diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java index 513a47aab1..b67bf21fd5 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -6,15 +6,16 @@ package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.NotNull; /** An implementation class of s3 index configuration Options */ public class ThresholdOptions { - static final int DEFAULT_EVENT_COUNT = 1000; - private static final long DEFAULT_BYTE_CAPACITY = 5000000; - private static final long DEFAULT_TIMEOUT = 60; + static final int DEFAULT_EVENT_COUNT = 200; + private static final long DEFAULT_BYTE_CAPACITY = 2500; + private static final long DEFAULT_TIMEOUT = 20; @JsonProperty("event_count") @NotNull diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndexTests.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndexTests.java new file mode 100644 index 0000000000..26c98e6e79 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndexTests.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import org.junit.Before; +import org.junit.jupiter.api.Test; +import java.io.IOException; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import static org.junit.Assert.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class S3ObjectIndexTests { + + @Before + public void setup() throws IOException { + initMocks(this); + + } + @Test + public void testIndexTimePatterns_not_equal() throws IllegalArgumentException{ + + String expectedIndex = S3ObjectIndex.getIndexAliasWithDate("logs-${yyyy-MM-dd}"); + String actualIndex = S3ObjectIndex.getIndexAliasWithDate("logs-${yyyy-MM-dd}"); + assertFalse(actualIndex.contains(expectedIndex)); + } + + @Test + public void testIndexTimePattern_Exceptional_time_TooGranular() throws IllegalArgumentException{ + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("logs-${yyyy-AA-dd}"); + }) ; + } + + @Test + public void testIndexTimePatterns_equal() throws IllegalArgumentException{ + + DateTimeFormatter expectedIndex = S3ObjectIndex.getDatePatternFormatter("logs-${yyyy-MM-dd}"); + DateTimeFormatter actualIndex = S3ObjectIndex.getDatePatternFormatter("logs-${yyyy-MM-dd}"); + assertEquals(actualIndex.toString(),expectedIndex.toString()); + } + + @Test + public void test_utc_current_time() throws IllegalArgumentException{ + + ZonedDateTime expectedIndex = S3ObjectIndex.getCurrentUtcTime(); + ZonedDateTime actualIndex = S3ObjectIndex.getCurrentUtcTime(); + + assertEquals(expectedIndex.getDayOfYear(),actualIndex.getDayOfYear()); + assertEquals(expectedIndex.getDayOfMonth(),actualIndex.getDayOfMonth()); + assertEquals(expectedIndex.getDayOfWeek(),actualIndex.getDayOfWeek()); + } + + @Test + public void testIndexTimePattern_Exceptional_TooGranular() throws IllegalArgumentException{ + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("logs-${yyyy-AA-dd hh:mm}"); + }) ; + } + + @Test + public void testIndexTimePattern_Exceptional_at_theEnd() throws IllegalArgumentException{ + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("test-${yyy{MM}dd}"); + }) ; + } + + @Test + public void testIndex_allows_one_date_time_pattern_Exceptional() throws IllegalArgumentException{ + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("logs-${yyyy-MM-dd}-${yyyy-MM-dd}"); + }) ; + } + + @Test + public void testIndex_nested_pattern_Exceptional() throws IllegalArgumentException{ + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("data-prepper-\\${\\${yyyy.MM.dd}}"); + }) ; + } + + @Test + public void testIndex_null_time_pattern() throws NullPointerException{ + assertNull(S3ObjectIndex.getDatePatternFormatter("data-prepper")); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java new file mode 100644 index 0000000000..a41e78bfa8 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class S3SinkConfigTest { + + static final String DEFAULT_BUCKET_NAME = "dataprepper"; + static final String DEFAULT_PATH_PREFIX = "logdata"; + + static final String DEFAULT_TEMP_STORAGE = "local_file"; + + public static int DEFAULT_EVENT_COUNT = 200; + public static final long DEFAULT_BYTE_CAPACITY = 2500; + public static final long DEFAULT_TIMEOUT = 20; + + public final String DEFAULT_KEY_PATTERN = "logs-${yyyy-MM-dd hh:mm:ss}"; + + @Test + void default_key_pattern_test() { + assertThat(new ObjectOptions().getFilePattern(), equalTo(DEFAULT_KEY_PATTERN)); + } + + @Test + void default_event_count_duration_test() { + assertThat(new ThresholdOptions().getEventCollectionDuration(), equalTo(DEFAULT_TIMEOUT)); + } + + @Test + void default_byte_capacity_test() { + assertThat(new ThresholdOptions().getByteCapacity(), equalTo(DEFAULT_BYTE_CAPACITY)); + } + + @Test + void default_request_timeout_test() { + assertThat(new ThresholdOptions().getEeventCount(), equalTo(DEFAULT_EVENT_COUNT)); + } + + @Test + void default_get_bucket_name_test() { + assertThat(new S3SinkConfig().getBucketName(), equalTo(DEFAULT_BUCKET_NAME)); + } + + @Test + void default_get_key_path_prefix_test() { + assertThat(new S3SinkConfig().getKeyPathPrefix(), equalTo(DEFAULT_PATH_PREFIX)); + } + + @Test + void default_get_temporary_storage() { + assertThat(new S3SinkConfig().getTemporaryStorage(), equalTo(DEFAULT_TEMP_STORAGE)); + } + + @Test + void default_get_threshold_options_in_sinkconfig_exception() { + assertNull(new S3SinkConfig().getThresholdOptions()); + } + + @Test + void default_get_AWS_Auth_options_in_sinkconfig_exception() { + assertNull(new S3SinkConfig().getAwsAuthenticationOptions()); + } + + @Test + void default_get_object_options_in_sinkconfig_exception() { + assertNull(new S3SinkConfig().getObjectOptions()); + } + + void default_get_plugin_model_options_in_sinkconfig_exception() { + assertNull(new S3SinkConfig().getCodec()); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java new file mode 100644 index 0000000000..7b495b34d1 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.*; +import java.util.*; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +class JsonCodecTest { + + private ObjectMapper objectMapper; + private Collection> eventCollection; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + eventCollection = mock(Collection.class); + } + + private JsonCodec createObjectUnderTest() { + return new JsonCodec(); + } + + @Test + void parse_with_null_OutputStream_throws() { + final JsonCodec objectUnderTest = createObjectUnderTest(); + + assertThrows(NullPointerException.class, () -> + objectUnderTest.parse(null, eventCollection)); + + verifyNoInteractions(eventCollection); + } + + @Test + void parse_with_null_record_event_collection_throws() { + final JsonCodec objectUnderTest = createObjectUnderTest(); + final Collection> jsonObjects = null; + final OutputStream outputStream = mock(OutputStream.class); + assertThrows(NullPointerException.class, () -> + objectUnderTest.parse(outputStream, jsonObjects)); + + verifyNoInteractions(outputStream); + } + + + @ParameterizedTest + @ValueSource(ints = {2}) + void parse_with_record_events_collection_outputstream_json_codec(final int numberOfObjects) throws IOException { + + final JsonCodec objectUnderTest = createObjectUnderTest(); + final Collection> jsonObjects = new LinkedList>(); + + for (int i = 0; i < numberOfObjects; i++) + jsonObjects.add(createRecord()); + + /* OutputStream outputStream = new ByteArrayOutputStream(); + createObjectUnderTest().parse(outputStream, jsonObjects); + assertNotNull(outputStream);*/ + + OutputStream outputStream = new ByteArrayOutputStream(); + createObjectUnderTest().parse(outputStream, jsonObjects); + + /* try { + OutputStream outputStream = new FileOutputStream("C:\\Wipro\\OpenSearch\\venkatfile.json",true); + createObjectUnderTest().parse(outputStream, jsonObjects); + outputStream.write(2048); + assertNotNull(outputStream); + } + catch(Exception ex){}*/ + } + + + @ParameterizedTest + @ValueSource(ints = {2}) + void parse_with_record_events_outputstream_json_codec(final int numberOfObjects) throws IOException { + + final JsonCodec objectUnderTest = createObjectUnderTest(); + final Record jsonObjects = createRecord(); + OutputStream outputStream = new ByteArrayOutputStream(); + + createObjectUnderTest().parse(outputStream, jsonObjects); + assertNotNull(outputStream); + } + + @ParameterizedTest + @ValueSource(ints = {2}) + void parse_with_events_outputstream_json_codec(final int numberOfObjects) throws IOException { + + final JsonCodec objectUnderTest = createObjectUnderTest(); + final Map eventData = new HashMap<>(); + eventData.put("key1", "value"); + eventData.put("key2", "value"); + final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventType("LOG").build(); + + OutputStream outputStream = new ByteArrayOutputStream(); + createObjectUnderTest().parse(outputStream, event); + assertNotNull(outputStream); + } + + private static Record createRecord() { + + Map json = generateJson(); + final JacksonEvent event = JacksonLog.builder() + .withData(json) + .build(); + + return new Record<>(event); + } + + private static Map generateJson() { + final Map jsonObject = new LinkedHashMap<>(); + for (int i = 0; i < 7; i++) { + jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString())); + + return jsonObject; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java new file mode 100644 index 0000000000..f08ab9a7e2 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java @@ -0,0 +1,230 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.StsClientBuilder; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +class AwsAuthenticationOptionsTest { + + private AwsAuthenticationOptions awsAuthenticationOptions; + + @BeforeEach + void setUp() { + awsAuthenticationOptions = new AwsAuthenticationOptions(); + } + + @Test + void getAwsRegion_returns_Region_of() throws NoSuchFieldException, IllegalAccessException { + final String regionString = UUID.randomUUID().toString(); + final Region expectedRegionObject = mock(Region.class); + reflectivelySetField(awsAuthenticationOptions, "awsRegion", regionString); + final Region actualRegion; + try(final MockedStatic regionMockedStatic = mockStatic(Region.class)) { + regionMockedStatic.when(() -> Region.of(regionString)).thenReturn(expectedRegionObject); + actualRegion = awsAuthenticationOptions.getAwsRegion(); + } + assertThat(actualRegion, equalTo(expectedRegionObject)); + } + + @Test + void getAwsRegion_returns_null_when_region_is_null() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", null); + assertThat(awsAuthenticationOptions.getAwsRegion(), nullValue()); + } + + @Test + void authenticateAWSConfiguration_should_return_s3Client_without_sts_role_arn() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", null); + + final DefaultCredentialsProvider mockedCredentialsProvider = mock(DefaultCredentialsProvider.class); + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic defaultCredentialsProviderMockedStatic = mockStatic(DefaultCredentialsProvider.class)) { + defaultCredentialsProviderMockedStatic.when(DefaultCredentialsProvider::create) + .thenReturn(mockedCredentialsProvider); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, sameInstance(mockedCredentialsProvider)); + } + + @Nested + class WithSts { + private StsClient stsClient; + private StsClientBuilder stsClientBuilder; + + @BeforeEach + void setUp() { + stsClient = mock(StsClient.class); + stsClientBuilder = mock(StsClientBuilder.class); + + when(stsClientBuilder.build()).thenReturn(stsClient); + } + + @Test + void authenticateAWSConfiguration_should_return_s3Client_with_sts_role_arn() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", "arn:aws:iam::123456789012:iam-role"); + + when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); + final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); + when(assumeRoleRequestBuilder.roleSessionName(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.roleArn(anyString())) + .thenReturn(assumeRoleRequestBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); + final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + + verify(assumeRoleRequestBuilder).roleArn("arn:aws:iam::123456789012:iam-role"); + verify(assumeRoleRequestBuilder).roleSessionName(anyString()); + verify(assumeRoleRequestBuilder).build(); + verifyNoMoreInteractions(assumeRoleRequestBuilder); + } + + @Test + void authenticateAWSConfiguration_should_return_s3Client_with_sts_role_arn_when_no_region() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", null); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", "arn:aws:iam::123456789012:iam-role"); + assertThat(awsAuthenticationOptions.getAwsRegion(), equalTo(null)); + + when(stsClientBuilder.region(null)).thenReturn(stsClientBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + } + + @Test + void authenticateAWSConfiguration_should_override_STS_Headers_when_HeaderOverrides_when_set() throws NoSuchFieldException, IllegalAccessException { + final String headerName1 = UUID.randomUUID().toString(); + final String headerValue1 = UUID.randomUUID().toString(); + final String headerName2 = UUID.randomUUID().toString(); + final String headerValue2 = UUID.randomUUID().toString(); + final Map overrideHeaders = Map.of(headerName1, headerValue1, headerName2, headerValue2); + + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", "arn:aws:iam::123456789012:iam-role"); + reflectivelySetField(awsAuthenticationOptions, "awsStsHeaderOverrides", overrideHeaders); + + when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); + + final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); + when(assumeRoleRequestBuilder.roleSessionName(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.roleArn(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.overrideConfiguration(any(Consumer.class))) + .thenReturn(assumeRoleRequestBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); + final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + + final ArgumentCaptor> configurationCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(assumeRoleRequestBuilder).roleArn("arn:aws:iam::123456789012:iam-role"); + verify(assumeRoleRequestBuilder).roleSessionName(anyString()); + verify(assumeRoleRequestBuilder).overrideConfiguration(configurationCaptor.capture()); + verify(assumeRoleRequestBuilder).build(); + verifyNoMoreInteractions(assumeRoleRequestBuilder); + + final Consumer actualOverride = configurationCaptor.getValue(); + + final AwsRequestOverrideConfiguration.Builder configurationBuilder = mock(AwsRequestOverrideConfiguration.Builder.class); + actualOverride.accept(configurationBuilder); + verify(configurationBuilder).putHeader(headerName1, headerValue1); + verify(configurationBuilder).putHeader(headerName2, headerValue2); + verifyNoMoreInteractions(configurationBuilder); + } + + @Test + void authenticateAWSConfiguration_should_not_override_STS_Headers_when_HeaderOverrides_are_empty() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", "arn:aws:iam::123456789012:iam-role"); + reflectivelySetField(awsAuthenticationOptions, "awsStsHeaderOverrides", Collections.emptyMap()); + + when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); + final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); + when(assumeRoleRequestBuilder.roleSessionName(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.roleArn(anyString())) + .thenReturn(assumeRoleRequestBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); + final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + + verify(assumeRoleRequestBuilder).roleArn("arn:aws:iam::123456789012:iam-role"); + verify(assumeRoleRequestBuilder).roleSessionName(anyString()); + verify(assumeRoleRequestBuilder).build(); + verifyNoMoreInteractions(assumeRoleRequestBuilder); + } + } + + private void reflectivelySetField(final AwsAuthenticationOptions awsAuthenticationOptions, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { + final Field field = AwsAuthenticationOptions.class.getDeclaredField(fieldName); + try { + field.setAccessible(true); + field.set(awsAuthenticationOptions, value); + } finally { + field.setAccessible(false); + } + } +} \ No newline at end of file