From 121195818e39fe16014b87ba6b9707c662fc8e74 Mon Sep 17 00:00:00 2001 From: DE20436406 Date: Tue, 28 Feb 2023 19:56:12 +0530 Subject: [PATCH] Signed-off-by: deepaksahu562 deepak.sahu562@gmail.com Description Created "s3-sink" plugin. Github issue : #1048 Added Functionality Configurations for the bucket name, key path and key pattern. The key pattern support timestamps such as logs-${YYYY.mm}-${uniqueId}. Collection of objects from Buffer and store it in RAM/Local file. Check List New functionality s3-sink plugin. New functionality has been documented. New functionality has javadoc added. Commits are signed per the DCO using --signoff By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md --- data-prepper-plugins/s3-sink/README.md | 9 + data-prepper-plugins/s3-sink/build.gradle | 67 +++++++ .../plugins/sink/S3ObjectIndex.java | 111 +++++++++++ .../dataprepper/plugins/sink/S3Sink.java | 116 ++++++++++++ .../plugins/sink/S3SinkConfig.java | 96 ++++++++++ .../plugins/sink/S3SinkService.java | 42 +++++ .../plugins/sink/S3SinkWorker.java | 174 ++++++++++++++++++ .../AwsAuthenticationOptions.java | 82 +++++++++ .../sink/configuration/ObjectOptions.java | 27 +++ .../sink/configuration/ThresholdOptions.java | 50 +++++ .../s3-sink/src/main/resources/pipelines.yaml | 21 +++ settings.gradle | 1 + 12 files changed, 796 insertions(+) create mode 100644 data-prepper-plugins/s3-sink/README.md create mode 100644 data-prepper-plugins/s3-sink/build.gradle create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java create mode 100644 
data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md new file mode 100644 index 0000000000..1fbdefe20a --- /dev/null +++ b/data-prepper-plugins/s3-sink/README.md @@ -0,0 +1,9 @@ +Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml + +Functional Requirements +1 Provide a mechanism to receive events from the buffer, then process and write them to S3. +2 Codecs encode the events into the desired format based on the configuration. +3 Flush the encoded events into s3 bucket as objects. +4 Object name based on the key-pattern. +5 Object length depends on the thresholds provided in the configuration. +6 Thresholds include event count, byte capacity and data collection duration. 
diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle new file mode 100644 index 0000000000..acfa7bfff6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':data-prepper-api') + implementation 'io.micrometer:micrometer-core' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:sqs' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.apache.commons:commons-compress:1.21' + implementation 'joda-time:joda-time:2.11.1' + implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' + implementation 'org.mapdb:mapdb:3.0.8' + testImplementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation project(':data-prepper-test-common') +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket') + systemProperty 'tests.s3source.region', 
System.getProperty('tests.s3source.region') + systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url') + + filter { + includeTestsMatching '*IT' + } +} + diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java new file mode 100644 index 0000000000..31a173a621 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Utility for building and validating S3 object key names from index patterns. + */ + +public class S3ObjectIndex { + + private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\${"; + + //For matching a string that begins with a "${" and ends with a "}". + //For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched. + private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}"; + + //For matching a string enclosed by "${" and "}". + //For a string like "data-prepper-${yyyy-MM}", "yyyy-MM" is matched. + private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\$\\{(.*?)\\}"; + + private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID()); + + S3ObjectIndex() { } + + /* + Create an index name with the formatted date/time and a unique ID appended. 
+ */ + public static String getIndexAliasWithDate(final String indexAlias) { + DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); + String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; + return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID(); + } + + /* + Validate the index with the regular expression pattern. Throws exception if validation fails + */ + public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { + final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); + final Matcher timePatternMatcher = pattern.matcher(indexAlias); + if (timePatternMatcher.find()) { + final String timePattern = timePatternMatcher.group(1); + if (timePatternMatcher.find()) { // check if there is a one more match. + throw new IllegalArgumentException("An index only allows one date-time pattern."); + } + if(timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)){ //check if it is a nested pattern such as "data-prepper-%{%{yyyy.MM.dd}}" + throw new IllegalArgumentException("An index doesn't allow nested date-time patterns."); + } + validateTimePatternIsAtTheEnd(indexAlias, timePattern); + validateNoSpecialCharsInTimePattern(timePattern); + validateTimePatternGranularity(timePattern); + return DateTimeFormatter.ofPattern(timePattern); + } + return null; + } + + /* + Data Prepper only allows time pattern as a suffix. + */ + private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { + if (!indexAlias.endsWith(timePattern + "}")) { + throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); + } + } + + /* + * Special characters can cause failures in creating indexes. 
+ * */ + private static final Set INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); + public static void validateNoSpecialCharsInTimePattern(String timePattern) { + boolean containsInvalidCharacter = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> INVALID_CHARS.contains(character)); + if (containsInvalidCharacter) { + throw new IllegalArgumentException("Index time pattern contains one or multiple special characters: " + INVALID_CHARS); + } + } + + /* + * Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second + */ + private static final Set UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); + public static void validateTimePatternGranularity(String timePattern) { + boolean containsUnsupportedTimeSymbol = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character)); + if (containsUnsupportedTimeSymbol) { + throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: " + + UNSUPPORTED_TIME_GRANULARITY_CHARS); + } + } + + /* + Returns the current UTC Date and Time + */ + public static ZonedDateTime getCurrentUtcTime() { + return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java new file mode 100644 index 0000000000..9708283648 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import 
java.util.concurrent.BlockingQueue; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +//import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Implementation class of s3-sink plugin + * + */ +@DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class) +public class S3Sink extends AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private static final int EVENT_QUEUE_SIZE = 100000; + + private final S3SinkConfig s3SinkConfig; + private volatile boolean initialized; + private static BlockingQueue eventQueue; + private static boolean isStopRequested; + + //private final Codec codec; + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * + * @param pluginSetting + * @param s3SinkConfig + * @param pluginFactory + */ + @DataPrepperPluginConstructor + public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { + super(pluginSetting); + this.s3SinkConfig = s3SinkConfig; + final PluginModel codecConfiguration = s3SinkConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + //codec = 
pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + initialized = Boolean.FALSE; + } + + @Override + public boolean isReady() { + return initialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Failed to initialize S3-Sink."); + this.shutdown(); + throw new RuntimeException(e.getMessage(), e); + } catch (Exception e) { + LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause()); + } + } + + private void doInitializeInternal() { + eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig); + new Thread(worker).start(); + initialized = Boolean.TRUE; + } + + @Override + public void doOutput(final Collection> records) { + LOG.debug("Records size : {}", records.size()); + if (records.isEmpty()) { + return; + } + + for (final Record recordData : records) { + + Event event = recordData.getData(); + getEventQueue().add(event); + + } + } + + @Override + public void shutdown() { + super.shutdown(); + isStopRequested = Boolean.TRUE; + LOG.info("s3-sink shutdown completed"); + } + + public static BlockingQueue getEventQueue() { + return eventQueue; + } + + public static boolean isStopRequested() { + return isStopRequested; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java new file mode 100644 index 0000000000..718ce504b2 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; +import 
org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 sink configuration + */ +public class S3SinkConfig { + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("threshold") + @NotNull + private ThresholdOptions thresholdOptions; + + @JsonProperty("object") + @NotNull + private ObjectOptions objectOptions; + + @JsonProperty("codec") + @NotNull + private PluginModel codec; + + @JsonProperty("temporary_storage") + @NotNull + private String temporaryStorage; + + @JsonProperty("bucket") + @NotNull + private String bucketName; + + @JsonProperty("key_path_prefix") + @NotNull + private String keyPathPrefix; + + /* + Aws Authentication configuration Options + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + /* + Threshold configuration Options + */ + public ThresholdOptions getThresholdOptions() { + return thresholdOptions; + } + + /* + s3 index configuration Options + */ + public ObjectOptions getObjectOptions() { + return objectOptions; + } + + /* + sink codec configuration Options + */ + public PluginModel getCodec() { return codec; } + + /* + s3 index path configuration Option + */ + public String getKeyPathPrefix() { + return keyPathPrefix; + } + + /* + s3 bucket name configuration Option + */ + public String getBucketName() { + return bucketName; + } + + /* + Temporary storage location configuration Options + */ + public String getTemporaryStorage() { + return temporaryStorage; + } +} diff --git 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java new file mode 100644 index 0000000000..711a03d621 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.s3.S3Client; + +public class S3SinkService { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class); + + private final S3SinkConfig s3SinkConfig; + private final S3Client s3Client; + + S3SinkService(final S3SinkConfig s3SinkConfig){ + this.s3SinkConfig = s3SinkConfig; + this.s3Client = createS3Client(); + } + + + S3Client createS3Client() { + LOG.info("Creating S3 client"); + return S3Client.builder() + .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(RetryPolicy.builder().numRetries(5).build()) + .build()) + .build(); + } + + public S3Client getS3Client() { + return s3Client; + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java new file mode 100644 index 0000000000..7840e4224d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -0,0 +1,174 @@ +/* + * 
Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.time.StopWatch; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.opensearch.dataprepper.model.event.Event; +//import org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator; +//import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * In order to process bulk records, records are split into numStreams & + * eventsPerChunk. numStreams & eventsPerChunk depend on the numEvents provided by + * the user in pipelines.yaml. eventsPerChunk will always be 50; only numStreams will + * vary based on numEvents. + * + * numEvents(event_count) must always be divisible by 100 without any + * remainder. + * + * Ex. 
1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 + * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 + */ +public class S3SinkWorker implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + private static final float LOAD_FACTOR = 0.02f; + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + private final int numEvents; + private int numStreams; + private final int eventsPerChunk; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + + /** + * + * @param s3SinkService + * @param s3SinkConfig + */ + public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig) { + this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); + this.numStreams = (int) (numEvents * LOAD_FACTOR); + this.eventsPerChunk = numEvents / numStreams; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + } + + @Override + public void run() { + try { + while (!S3Sink.isStopRequested()) { + if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + inMemmoryAccumulator(); + } else { + localFileAccumulator(); + } + } + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(), + e.getCause(), e); + } + } + + /** + * Accumulates data from buffer and store into in memory + */ + public void inMemmoryAccumulator() { + HashSet inMemoryEventSet = null; + HashMap> inMemoryEventMap = null; + try { + StopWatch watch = new StopWatch(); + watch.start(); + int streamCount = 0; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + inMemoryEventMap = new HashMap<>(numStreams); + for (int stream = 0; stream < numStreams; stream++) { + inMemoryEventSet = new HashSet<>(eventsPerChunk); + boolean flag = Boolean.FALSE; + for (int data = 0; data < eventsPerChunk + && 
thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) { + Event event = S3Sink.getEventQueue().take(); + inMemoryEventSet.add(event); + byteCount += event.toJsonString().getBytes().length; + flag = Boolean.TRUE; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + if (flag) { + inMemoryEventMap.put(stream, inMemoryEventSet); + streamCount++; + } else { + // Once threshold reached then No more streaming required per snapshot, hence + // terminate the streaming(outer) loop + break; + } + } + + LOG.info( + "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", + byteCount, eventCount, eventCollectionDuration, streamCount); + + //new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); + } catch (Exception e) { + LOG.error("Exception while storing recoreds into In-Memory", e); + } + } + + /** + * Accumulates data from buffer and store in local file + */ + public void localFileAccumulator() { + DB db = null; + NavigableSet localFileEventSet = null; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + try { + StopWatch watch = new StopWatch(); + watch.start(); + db = DBMaker.memoryDB().make(); + localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen(); + for (int data = 0; thresholdsCheck(data, watch, byteCount); data++) { + String event = S3Sink.getEventQueue().take().toJsonString(); + byteCount += event.getBytes().length; + localFileEventSet.add(event); + eventCount++; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + db.commit(); + LOG.info( + "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", + byteCount, eventCount, eventCollectionDuration); + + //new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); + + } catch (Exception e) { + 
LOG.error("Exception while storing records into Local-file", e); + } finally { + if (db !=null && !db.isClosed()) { + db.close(); + } + } + } + + /** + * Checks whether the event batch thresholds set in the configuration have been + * reached. Thresholds include event count, byte capacity and data collection duration. + * + * @param eventCount + * @param watch + * @param byteCount + * @return + */ + private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { + boolean flag = Boolean.FALSE; + flag = eventCount < numEvents + && watch.getTime(TimeUnit.SECONDS) < s3SinkConfig.getThresholdOptions().getEventCollectionDuration() + && byteCount < s3SinkConfig.getThresholdOptions().getByteCapacity(); + return flag; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..73d83d668d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +import java.util.Map; +import java.util.UUID; + +/* + An implementation class AWS Authentication 
configuration + */ +public class AwsAuthenticationOptions { + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + /* + AWS Region configuration + */ + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + /* + Aws Credentials Provider configuration + */ + public AwsCredentialsProvider authenticateAwsConfiguration() { + + final AwsCredentialsProvider awsCredentialsProvider; + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + try { + Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); + } + + final StsClient stsClient = StsClient.builder() + .region(getAwsRegion()) + .build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()) + .roleArn(awsStsRoleArn); + + if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder + .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() + .stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()) + .build(); + + } else { + // use default credential provider + awsCredentialsProvider = DefaultCredentialsProvider.create(); + } + + return awsCredentialsProvider; + } +} \ No newline at end of file diff --git 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java new file mode 100644 index 0000000000..8501f70daa --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of Threshold configuration Options + */ +public class ObjectOptions { + private static final String DEFAULT_KEY_PATTERN = "logs-${YYYY-MM-DD hh:mm:ss}"; + + @JsonProperty("file_pattern") + @NotNull + private String filePattern = DEFAULT_KEY_PATTERN; + + /* + Read s3 object index file patten configuration + */ + public String getFilePattern() { + return filePattern; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java new file mode 100644 index 0000000000..b50d1219f5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 index configuration Options + */ +public class ThresholdOptions { + static final int DEFAULT_EVENT_COUNT = 1000; + private static final long 
DEFAULT_BYTE_CAPACITY = 5000000; + private static final long DEFAULT_TIMEOUT = 60; + + @JsonProperty("event_count") + @NotNull + private int eventCount = DEFAULT_EVENT_COUNT; + + @JsonProperty("byte_capacity") + @NotNull + private long byteCapacity = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("event_collection_duration") + private long eventCollectionDuration = DEFAULT_TIMEOUT; + + /* + Read event collection duration configuration + */ + public long getEventCollectionDuration() { + return eventCollectionDuration; + } + + /* + Read byte capacity configuration + */ + public long getByteCapacity() { + return byteCapacity; + } + + /* + Read the event count configuration + */ + public int getEeventCount() { + return eventCount; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml new file mode 100644 index 0000000000..6e51f4a11d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml @@ -0,0 +1,21 @@ +simple-sample-pipeline: + workers: 4 + delay: "5000" + source: + random: + sink: + - s3: + aws: + region: us-east-1 + sts_role_arn: arn:aws:iam::701869769844 :role/s3-full-access + bucket: dataprepper + key_path_prefix: logdata + object: + file_pattern: logs-${yyyy-MM-dd} + threshold: + event_count: 200 + byte_capacity: 2500 + event_collection_duration: 20 + codec: + json: + temporary_storage: local_file \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 751567838a..a3c749d922 100644 --- a/settings.gradle +++ b/settings.gradle @@ -91,4 +91,5 @@ include 'release:docker' include 'release:maven' include 'e2e-test:peerforwarder' include 'rss-source' +include 'data-prepper-plugins:s3-sink'