diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java index 940d356d3b..bcc6f7c06e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java @@ -17,7 +17,6 @@ /** * Reference to an S3 object key Index patterns. - * */ public class S3ObjectIndex { @@ -36,8 +35,8 @@ public class S3ObjectIndex { S3ObjectIndex() { } - /* - Create Index with date,time with UniqueID prepended. + /** + * Create Index with date,time with UniqueID prepended. */ public static String getIndexAliasWithDate(final String indexAlias) { DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); @@ -45,8 +44,8 @@ public static String getIndexAliasWithDate(final String indexAlias) { return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID(); } - /* - Validate the index with the regular expression pattern. Throws exception if validation fails + /** + * Validate the index with the regular expression pattern. Throws exception if validation fails */ public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); @@ -67,18 +66,18 @@ public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) return null; } - /* - Data Prepper only allows time pattern as a suffix. - */ + /** + * Data Prepper only allows time pattern as a suffix. + */ private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { if (!indexAlias.endsWith(timePattern + "}")) { throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); } } - /* + /** * Special characters can cause failures in creating indexes. - * */ + */ private static final Set INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); public static void validateNoSpecialCharsInTimePattern(String timePattern) { boolean containsInvalidCharacter = timePattern.chars() @@ -89,7 +88,7 @@ public static void validateNoSpecialCharsInTimePattern(String timePattern) { } } - /* + /** * Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second */ private static final Set UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); @@ -103,8 +102,8 @@ public static void validateTimePatternGranularity(String timePattern) { } } - /* - Returns the current UTC Date and Time + /** + * Returns the current UTC Date and Time */ public static ZonedDateTime getCurrentUtcTime() { return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 70c6978a25..3aeb40922a 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -25,58 +25,57 @@ /** * Implementation class of s3-sink plugin - * */ @DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class) public class S3Sink extends AbstractSink> { - private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); - private static final int EVENT_QUEUE_SIZE = 100000; - private static final String IN_MEMORY = "in_memory"; - private static final String LOCAL_FILE = "local_file"; - - private final S3SinkConfig s3SinkConfig; - private S3SinkWorker worker; - private SinkAccumulator accumulator; + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private static final int EVENT_QUEUE_SIZE = 100000; + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + + private final S3SinkConfig s3SinkConfig; + private S3SinkWorker worker; + private SinkAccumulator accumulator; private final Codec codec; + private final String storageType; private volatile boolean initialized; private static BlockingQueue eventQueue; - private static boolean isStopRequested; + private static volatile boolean isStopRequested; private Thread workerThread; - - /** - * * @param pluginSetting * @param s3SinkConfig * @param pluginFactory */ @DataPrepperPluginConstructor - public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { + public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { super(pluginSetting); this.s3SinkConfig = s3SinkConfig; + storageType = s3SinkConfig.getTemporaryStorage(); final PluginModel codecConfiguration = s3SinkConfig.getCodec(); - final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), + codecConfiguration.getPluginSettings()); + codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); initialized = Boolean.FALSE; } @Override - public boolean isReady() { + public final boolean isReady() { return initialized; } @Override - public void doInitialize() { + public final void doInitialize() { try { doInitializeInternal(); } catch (InvalidPluginConfigurationException e) { - LOG.error("Failed to initialize S3-Sink."); + LOG.error("Failed to initialize S3-Sink.", e); this.shutdown(); throw new RuntimeException(e.getMessage(), e); } catch (Exception e) { - LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause()); + LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} ", e.getMessage(), e); } } @@ -91,12 +90,12 @@ private void doInitializeInternal() { } @Override - public void doOutput(final Collection> records) { + public final void doOutput(final Collection> records) { LOG.debug("Records size : {}", records.size()); if (records.isEmpty()) { return; } - + for (final Record recordData : records) { Event event = recordData.getData(); getEventQueue().add(event); @@ -104,7 +103,7 @@ public void doOutput(final Collection> records) { } @Override - public void shutdown() { + public final void shutdown() { super.shutdown(); isStopRequested = Boolean.TRUE; if (workerThread.isAlive()) { @@ -120,13 +119,16 @@ public static BlockingQueue getEventQueue() { public static boolean isStopRequested() { return isStopRequested; } - + + /** + * This {@link S3SinkWorkerRunner} keep listing event from {@link doOutput} + */ private class S3SinkWorkerRunner implements Runnable { @Override public void run() { try { while (!S3Sink.isStopRequested()) { - if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + if (storageType.equalsIgnoreCase(IN_MEMORY)) { accumulator = worker.inMemmoryAccumulator(); } else { accumulator = worker.localFileAccumulator(); @@ -134,9 +136,7 @@ public void run() { accumulator.doAccumulate(); } } catch (Exception e) { - e.printStackTrace(); - LOG.error("Exception in S3Sink : \n Error message {} \n Exception cause {}", e.getMessage(), - e.getCause(), e); + LOG.error("Exception while runing S3SinkWorkerRunner : ", e); } } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java index 923964f349..1ee49be362 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -13,15 +13,12 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; -/* - An implementation class of s3 sink configuration +/** + * An implementation class of s3 sink configuration */ public class S3SinkConfig { - static final String DEFAULT_BUCKET_NAME = "dataprepper"; - static final String DEFAULT_PATH_PREFIX = "logdata"; - - static final String DEFAULT_TEMP_STORAGE = "local_file"; + static final String DEFAULT_TEMP_STORAGE = "local_file"; @JsonProperty("aws") @NotNull @@ -46,54 +43,54 @@ public class S3SinkConfig { @JsonProperty("bucket") @NotNull - private String bucketName = DEFAULT_BUCKET_NAME; + private String bucketName; @JsonProperty("key_path_prefix") @NotNull - private String keyPathPrefix = DEFAULT_PATH_PREFIX; + private String keyPathPrefix; - /* - Aws Authentication configuration Options + /** + * Aws Authentication configuration Options */ public AwsAuthenticationOptions getAwsAuthenticationOptions() { return awsAuthenticationOptions; } - /* - Threshold configuration Options + /** + * Threshold configuration Options */ public ThresholdOptions getThresholdOptions() { return thresholdOptions; } - /* - s3 index configuration Options + /** + * s3 index configuration Options */ public ObjectOptions getObjectOptions() { return objectOptions; } - /* - sink codec configuration Options + /** + * sink codec configuration Options */ public PluginModel getCodec() { return codec; } - /* - s3 index path configuration Option + /** + * s3 index path configuration Option */ public String getKeyPathPrefix() { return keyPathPrefix; } - /* - s3 bucket name configuration Option + /** + * s3 bucket name configuration Option */ public String getBucketName() { return bucketName; } - /* - Temporary storage location configuration Options + /** + * Temporary storage location configuration Options */ public String getTemporaryStorage() { return temporaryStorage; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index d897e2be46..6b79188994 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -20,6 +20,9 @@ public class S3SinkService { private final S3SinkConfig s3SinkConfig; private final S3Client s3Client; + /** + * @param s3SinkConfig + */ S3SinkService(final S3SinkConfig s3SinkConfig){ this.s3SinkConfig = s3SinkConfig; this.s3Client = createS3Client(); @@ -37,6 +40,9 @@ S3Client createS3Client() { .build(); } + /** + * @return s3Client object + */ public S3Client getS3Client() { return s3Client; } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java index 46474773d6..a8d25cbb70 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -16,6 +16,7 @@ import org.mapdb.DBMaker; import org.mapdb.Serializer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryAccumulator; import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileAccumulator; import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator; @@ -32,31 +33,41 @@ * numEvents(event_count) must be always divided by 100 completely without any * remnant. * - * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 + * {@code LOAD_FACTOR} required to divide collections of records (numEvents) + * into streams + * + * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 */ public class S3SinkWorker { private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + /** + * {@code LOAD_FACTOR} required to divide collections of records into streams + */ private static final float LOAD_FACTOR = 0.02f; private final S3SinkService s3SinkService; private final S3SinkConfig s3SinkConfig; private final Codec codec; private SinkAccumulator accumulator; private final int numEvents; - private int numStreams; + private final ByteCount byteCapacity; + private final long duration; + private final int numStreams; private final int eventsPerChunk; /** - * * @param s3SinkService * @param s3SinkConfig */ public S3SinkWorker(final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig, final Codec codec) { - this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); this.s3SinkService = s3SinkService; this.s3SinkConfig = s3SinkConfig; this.codec = codec; + numEvents = s3SinkConfig.getThresholdOptions().getEventCount(); + byteCapacity = s3SinkConfig.getThresholdOptions().getByteCapacity(); + duration = s3SinkConfig.getThresholdOptions().getEventCollectionDuration().getSeconds(); + numStreams = (int) (numEvents * LOAD_FACTOR); eventsPerChunk = numEvents / numStreams; } @@ -93,13 +104,16 @@ public SinkAccumulator inMemmoryAccumulator() { streamCount++; } else { // Once threshold reached then No more streaming required per snapshot, hence - // terminate the streaming(outer) loop + // stop the streaming(outer) loop break; } } LOG.info( - "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", + "In-Memory snapshot info : Byte_count = {} Bytes " + + "\t Event_count = {} Records " + + "\t Event_collection_duration = {} sec & " + + "\t Number of stream {}", byteCount, eventCount, eventCollectionDuration, streamCount); accumulator = new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig); @@ -135,7 +149,9 @@ public SinkAccumulator localFileAccumulator() { } db.commit(); LOG.info( - "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", + "Local-File snapshot info : Byte_count = {} Bytes, " + + "\t Event_count = {} Records " + + "\t & Event_collection_duration = {} Sec", byteCount, eventCount, eventCollectionDuration); accumulator = new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig, db); } catch (Exception e) { @@ -148,7 +164,7 @@ public SinkAccumulator localFileAccumulator() { * Bunch of events based on thresholds set in the configuration. The Thresholds * such as events count, bytes capacity and data collection duration. * - * @param i + * @param eventCount * @param watch * @param byteCount * @return @@ -156,8 +172,8 @@ public SinkAccumulator localFileAccumulator() { private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { boolean flag = Boolean.FALSE; flag = eventCount < numEvents - && watch.getTime(TimeUnit.SECONDS) < s3SinkConfig.getThresholdOptions().getEventCollectionDuration() - && byteCount < s3SinkConfig.getThresholdOptions().getByteCapacity(); + && watch.getTime(TimeUnit.SECONDS) < duration + && byteCount < byteCapacity.getBytes(); return flag; } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryAccumulator.java index c8eb881f2a..11dfe16261 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryAccumulator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryAccumulator.java @@ -19,6 +19,7 @@ import software.amazon.awssdk.services.s3.S3Client; /** + * An implementation of {@link SinkAccumulator}. * Upload accumulated data(in-memory) to amazon s3 */ public class InMemoryAccumulator implements SinkAccumulator { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileAccumulator.java index e4effde647..6d1c96d1ec 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileAccumulator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileAccumulator.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.services.s3.waiters.S3Waiter; /** + * An implementation of {@link SinkAccumulator}. * Upload accumulated data(local-file) to amazon s3 */ public class LocalFileAccumulator implements SinkAccumulator { @@ -44,11 +45,9 @@ public class LocalFileAccumulator implements SinkAccumulator { private DB db = null; /** - * * @param localFileEventSet * @param s3SinkService * @param s3SinkConfig - * */ public LocalFileAccumulator(final NavigableSet localFileEventSet, final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig, DB db) { @@ -97,8 +96,7 @@ public void doAccumulate() { LOG.warn("Local file not deleted {}", fileAbsolutePath); } } catch (IOException e) { - LOG.error("Local file unable to deleted {}", fileAbsolutePath); - e.printStackTrace(); + LOG.error("Local file unable to deleted {}", fileAbsolutePath, e); } } if (db != null && !db.isClosed()) { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java index fff05ff275..5def7e4e06 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java @@ -4,6 +4,9 @@ */ package org.opensearch.dataprepper.plugins.sink.accumulator; +/** + * {@link SinkAccumulator} Accumulate buffer records + */ public interface SinkAccumulator { void doAccumulate(); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java index 6f3762cdf1..486d98279b 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -23,7 +23,7 @@ public class JsonCodec implements Codec { private final ObjectMapper objectMapper = new ObjectMapper(); - /* + /** * Generates a serialized json string of the Events */ @@ -40,7 +40,7 @@ public void parse(final OutputStream outputStream, final Collection awsStsHeaderOverrides; - /* - AWS Region configuration + /** + * AWS Region configuration */ public Region getAwsRegion() { return awsRegion != null ? Region.of(awsRegion) : null; } - /* - Aws Credentials Provider configuration + /** + * Aws Credentials Provider configuration */ public AwsCredentialsProvider authenticateAwsConfiguration() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java index 9f63bb31a8..e5b99726ef 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -6,11 +6,10 @@ package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.constraints.NotNull; /** - An implementation class of Threshold configuration Options + * An implementation class of Threshold configuration Options */ public class ObjectOptions { private static final String DEFAULT_KEY_PATTERN = "logs-${yyyy-MM-dd hh:mm:ss}"; @@ -19,8 +18,8 @@ public class ObjectOptions { @NotNull private String filePattern = DEFAULT_KEY_PATTERN; - /* - Read s3 object index file patten configuration + /** + * Read s3 object index file patten configuration */ public String getFilePattern() { return filePattern; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java index b67bf21fd5..331a4a9d73 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -5,17 +5,25 @@ package org.opensearch.dataprepper.plugins.sink.configuration; +import java.time.Duration; +import java.time.format.DateTimeParseException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.opensearch.dataprepper.model.types.ByteCount; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.constraints.NotNull; /** - An implementation class of s3 index configuration Options + * An implementation class of s3 index configuration Options */ public class ThresholdOptions { - static final int DEFAULT_EVENT_COUNT = 200; - private static final long DEFAULT_BYTE_CAPACITY = 2500; - private static final long DEFAULT_TIMEOUT = 20; + + private static final String SIMPLE_DURATION_REGEX = "^(0|[1-9]\\d*)(s|ms)$"; + private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(SIMPLE_DURATION_REGEX); + + static final int DEFAULT_EVENT_COUNT = 2000; + private static final String DEFAULT_BYTE_CAPACITY = "50mb"; + private static final String DEFAULT_TIMEOUT = "180s"; @JsonProperty("event_count") @NotNull @@ -23,29 +31,68 @@ public class ThresholdOptions { @JsonProperty("byte_capacity") @NotNull - private long byteCapacity = DEFAULT_BYTE_CAPACITY; + private String byteCapacity = DEFAULT_BYTE_CAPACITY; @JsonProperty("event_collection_duration") - private long eventCollectionDuration = DEFAULT_TIMEOUT; + private String eventCollectionDuration = DEFAULT_TIMEOUT; + + /** + * Read event collection duration configuration + */ + public Duration getEventCollectionDuration() { - /* - Read event collection duration configuration - */ - public long getEventCollectionDuration() { - return eventCollectionDuration; + Duration duration; + try { + duration = Duration.parse(eventCollectionDuration); + } catch (final DateTimeParseException e) { + duration = parseSimpleDuration(eventCollectionDuration); + if (duration == null) { + throw new IllegalArgumentException("Durations must use either ISO 8601 notation or simple notations for seconds (60s) or milliseconds (100ms). Whitespace is ignored."); + } + } + return duration; } - /* - Read byte capacity configuration - */ - public long getByteCapacity() { - return byteCapacity; + /** + * Read byte capacity configuration + */ + public ByteCount getByteCapacity() { + return ByteCount.parse(byteCapacity); } - /* - Read the event count configuration - */ - public int getEeventCount() { + /** + * Read the event count configuration + */ + public int getEventCount() { return eventCount; } + + /** + * parse event duration configuration + */ + private Duration parseSimpleDuration(final String durationString) throws IllegalArgumentException { + final String durationStringNoSpaces = durationString.replaceAll("\\s", ""); + final Matcher matcher = SIMPLE_DURATION_PATTERN.matcher(durationStringNoSpaces); + if (!matcher.find()) { + return null; + } + + final long durationNumber = Long.parseLong(matcher.group(1)); + final String durationUnit = matcher.group(2); + + return getDurationFromUnitAndNumber(durationNumber, durationUnit); + } + + /** + * Return Duration in seconds/milliseconds of configuration Event Collection Duration + */ + private Duration getDurationFromUnitAndNumber(final long durationNumber, final String durationUnit) { + switch (durationUnit) { + case "s": + return Duration.ofSeconds(durationNumber); + case "ms": + return Duration.ofMillis(durationNumber); + } + return null; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml index 6e51f4a11d..1b2d0409db 100644 --- a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml +++ b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml @@ -13,9 +13,9 @@ simple-sample-pipeline: object: file_pattern: logs-${yyyy-MM-dd} threshold: - event_count: 200 - byte_capacity: 2500 - event_collection_duration: 20 + event_count: 2000 + byte_capacity: 50mb + event_collection_duration: PT2M codec: json: temporary_storage: local_file \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java index a41e78bfa8..7ac4bfead1 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java @@ -46,7 +46,7 @@ void default_byte_capacity_test() { @Test void default_request_timeout_test() { - assertThat(new ThresholdOptions().getEeventCount(), equalTo(DEFAULT_EVENT_COUNT)); + assertThat(new ThresholdOptions().getEventCount(), equalTo(DEFAULT_EVENT_COUNT)); } @Test diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorkerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorkerTest.java new file mode 100644 index 0000000000..3b604c449f --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorkerTest.java @@ -0,0 +1,63 @@ +package org.opensearch.dataprepper.plugins.sink; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; + +import software.amazon.awssdk.services.sqs.SqsClient; + +public class S3SinkWorkerTest { + private S3SinkWorker s3SinkWorker; + private S3SinkService s3SinkService; + private S3SinkConfig s3SinkConfig; + private Codec codec; + + @BeforeEach + void setUp() { + s3SinkService = mock(S3SinkService.class); + s3SinkConfig = mock(S3SinkConfig.class); + codec = mock(Codec.class); + + //when(s3SourceConfig.getSqsOptions()).thenReturn(sqsOptions); + when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(100); + + s3SinkWorker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec); + } + + @Test + void inMemmoryAccumulatorTest() { + SinkAccumulator sinkAccumulator = s3SinkWorker.inMemmoryAccumulator(); + } + + private void addRecord() { + + Runnable runner = new Runnable() { + + @Override + public void run() { + + for (int i = 0; i < 200; i++) { + + final Map eventData = new HashMap<>(); + eventData.put("key1", "value"); + eventData.put("key2", "value"); + final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventType("LOG").build(); + + //S3Sink.getEventQueue().add(); + + } + + } + }; + } +}