diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md new file mode 100644 index 0000000000..1fbdefe20a --- /dev/null +++ b/data-prepper-plugins/s3-sink/README.md @@ -0,0 +1,9 @@ +Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml + +Functional Requirements +1 Provide a mechanism to received events from buffer then process and write to s3. +2 Codecs encode the events into the desired format based on the configuration. +3 Flush the encoded events into s3 bucket as objects. +4 Object name based on the key-pattern. +5 Object length depends on the thresholds provided in the configuration. +6 The Thresholds such as events count, bytes capacity and data collection duration. diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle new file mode 100644 index 0000000000..acfa7bfff6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':data-prepper-api') + implementation 'io.micrometer:micrometer-core' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:sqs' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.apache.commons:commons-compress:1.21' + implementation 'joda-time:joda-time:2.11.1' + implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' + implementation 'org.mapdb:mapdb:3.0.8' + testImplementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation 
/**
 * Utility methods for building S3 object key names ("indexes") that may embed a
 * single {@code ${...}} date-time pattern as a suffix, e.g. {@code "logs-${yyyy-MM-dd}"}.
 */
public class S3ObjectIndex {

    // Literal marker used to detect a nested "${" inside an already-extracted
    // time pattern. NOTE: must be the plain characters "${" (no regex escaping)
    // because it is compared with String.contains, not a regex — the previous
    // value "\\${" contained a literal backslash and could never match.
    private static final String TIME_PATTERN_STARTING_SYMBOLS = "${";

    // Matches a "${...}" group.
    // For "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched.
    private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}";

    // Same shape as above but captures the inner pattern:
    // for "data-prepper-${yyyy-MM}", "yyyy-MM" is captured.
    // Compiled once and reused; Pattern compilation is comparatively expensive.
    private static final Pattern TIME_PATTERN_INTERNAL_EXTRACTOR_PATTERN =
            Pattern.compile("\\$\\{(.*?)\\}");

    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    // Special characters that can cause failures in creating indexes.
    private static final Set<Character> INVALID_CHARS =
            Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':');

    // Time-pattern letters that would produce sub-hour granularity; unsupported.
    private static final Set<Character> UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N');

    S3ObjectIndex() { }

    /**
     * Builds a unique object key from the alias: the alias with its "${...}"
     * pattern removed, followed by the formatted current UTC time (empty when
     * the alias has no pattern) and a random UUID.
     *
     * @param indexAlias alias that may contain one "${...}" date-time suffix
     * @return unique object key name
     */
    public static String getIndexAliasWithDate(final String indexAlias) {
        final DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
        final String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : "";
        return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID();
    }

    /**
     * Extracts and validates the alias' date-time pattern.
     *
     * @param indexAlias alias to inspect
     * @return a formatter for the embedded pattern, or {@code null} when the alias has none
     * @throws IllegalArgumentException when the alias has multiple patterns, a nested
     *         pattern, a pattern that is not a suffix, special characters, or
     *         sub-hour granularity letters
     */
    public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
        final Matcher timePatternMatcher = TIME_PATTERN_INTERNAL_EXTRACTOR_PATTERN.matcher(indexAlias);
        if (timePatternMatcher.find()) {
            final String timePattern = timePatternMatcher.group(1);
            if (timePatternMatcher.find()) { // a second match means more than one pattern
                throw new IllegalArgumentException("An index only allows one date-time pattern.");
            }
            if (timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)) {
                // nested pattern such as "data-prepper-${${yyyy.MM.dd}}"
                throw new IllegalArgumentException("An index doesn't allow nested date-time patterns.");
            }
            validateTimePatternIsAtTheEnd(indexAlias, timePattern);
            validateNoSpecialCharsInTimePattern(timePattern);
            validateTimePatternGranularity(timePattern);
            return DateTimeFormatter.ofPattern(timePattern);
        }
        return null;
    }

    /*
     * Only a suffix time pattern is allowed.
     */
    private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) {
        if (!indexAlias.endsWith(timePattern + "}")) {
            throw new IllegalArgumentException("Time pattern can only be a suffix of an index.");
        }
    }

    /**
     * Rejects patterns containing characters that are invalid in object keys.
     */
    public static void validateNoSpecialCharsInTimePattern(String timePattern) {
        final boolean containsInvalidCharacter = timePattern.chars()
                .mapToObj(c -> (char) c)
                .anyMatch(INVALID_CHARS::contains);
        if (containsInvalidCharacter) {
            throw new IllegalArgumentException(
                    "Index time pattern contains one or multiple special characters: " + INVALID_CHARS);
        }
    }

    /**
     * Rejects patterns whose letters imply granularity finer than one hour.
     */
    public static void validateTimePatternGranularity(String timePattern) {
        final boolean containsUnsupportedTimeSymbol = timePattern.chars()
                .mapToObj(c -> (char) c)
                .anyMatch(UNSUPPORTED_TIME_GRANULARITY_CHARS::contains);
        if (containsUnsupportedTimeSymbol) {
            throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: "
                    + UNSUPPORTED_TIME_GRANULARITY_CHARS);
        }
    }

    /**
     * @return the current date-time converted to UTC
     */
    public static ZonedDateTime getCurrentUtcTime() {
        return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
    }
}
AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private static final int EVENT_QUEUE_SIZE = 100000; + + private final S3SinkConfig s3SinkConfig; + private volatile boolean initialized; + private static BlockingQueue eventQueue; + private static boolean isStopRequested; + + //private final Codec codec; + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * + * @param pluginSetting + * @param s3SinkConfig + * @param pluginFactory + */ + @DataPrepperPluginConstructor + public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { + super(pluginSetting); + this.s3SinkConfig = s3SinkConfig; + final PluginModel codecConfiguration = s3SinkConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + //codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + initialized = Boolean.FALSE; + } + + @Override + public boolean isReady() { + return initialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Failed to initialize S3-Sink."); + this.shutdown(); + throw new RuntimeException(e.getMessage(), e); + } catch (Exception e) { + LOG.warn("Failed to initialize S3-Sink, retrying. 
Error - {} \n {}", e.getMessage(), e.getCause()); + } + } + + private void doInitializeInternal() { + eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig); + new Thread(worker).start(); + initialized = Boolean.TRUE; + } + + @Override + public void doOutput(final Collection> records) { + LOG.debug("Records size : {}", records.size()); + if (records.isEmpty()) { + return; + } + + for (final Record recordData : records) { + + Event event = recordData.getData(); + getEventQueue().add(event); + + } + } + + @Override + public void shutdown() { + super.shutdown(); + isStopRequested = Boolean.TRUE; + LOG.info("s3-sink sutdonwn completed"); + } + + public static BlockingQueue getEventQueue() { + return eventQueue; + } + + public static boolean isStopRequested() { + return isStopRequested; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java new file mode 100644 index 0000000000..718ce504b2 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 sink configuration + */ 
+public class S3SinkConfig { + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("threshold") + @NotNull + private ThresholdOptions thresholdOptions; + + @JsonProperty("object") + @NotNull + private ObjectOptions objectOptions; + + @JsonProperty("codec") + @NotNull + private PluginModel codec; + + @JsonProperty("temporary_storage") + @NotNull + private String temporaryStorage; + + @JsonProperty("bucket") + @NotNull + private String bucketName; + + @JsonProperty("key_path_prefix") + @NotNull + private String keyPathPrefix; + + /* + Aws Authentication configuration Options + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + /* + Threshold configuration Options + */ + public ThresholdOptions getThresholdOptions() { + return thresholdOptions; + } + + /* + s3 index configuration Options + */ + public ObjectOptions getObjectOptions() { + return objectOptions; + } + + /* + sink codec configuration Options + */ + public PluginModel getCodec() { return codec; } + + /* + s3 index path configuration Option + */ + public String getKeyPathPrefix() { + return keyPathPrefix; + } + + /* + s3 bucket name configuration Option + */ + public String getBucketName() { + return bucketName; + } + + /* + Temporary storage location configuration Options + */ + public String getTemporaryStorage() { + return temporaryStorage; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java new file mode 100644 index 0000000000..711a03d621 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package 
org.opensearch.dataprepper.plugins.sink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.s3.S3Client; + +public class S3SinkService { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class); + + private final S3SinkConfig s3SinkConfig; + private final S3Client s3Client; + + S3SinkService(final S3SinkConfig s3SinkConfig){ + this.s3SinkConfig = s3SinkConfig; + this.s3Client = createS3Client(); + } + + + S3Client createS3Client() { + LOG.info("Creating S3 client"); + return S3Client.builder() + .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(RetryPolicy.builder().numRetries(5).build()) + .build()) + .build(); + } + + public S3Client getS3Client() { + return s3Client; + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java new file mode 100644 index 0000000000..7840e4224d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -0,0 +1,174 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.time.StopWatch; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.opensearch.dataprepper.model.event.Event; +//import 
org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator; +//import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * In-order to process bulk records, records splits into numStreams & + * eventsPerChunk. numStreams & eventsPerChunk depends on numEvents provided by + * user in pipelines.yaml eventsPerChunk will be always 20, only numStreams will + * be vary based on numEvents. + * + * numEvents(event_count) must be always divided by 100 completely without any + * remnant. + * + * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 + * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 + */ +public class S3SinkWorker implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + private static final float LOAD_FACTOR = 0.02f; + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + private final int numEvents; + private int numStreams; + private final int eventsPerChunk; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + + /** + * + * @param s3SinkService + * @param s3SinkConfig + */ + public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig) { + this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); + this.numStreams = (int) (numEvents * LOAD_FACTOR); + this.eventsPerChunk = numEvents / numStreams; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + } + + @Override + public void run() { + try { + while (!S3Sink.isStopRequested()) { + if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + inMemmoryAccumulator(); + } else { + localFileAccumulator(); + } + } + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(), + e.getCause(), e); + 
} + } + + /** + * Accumulates data from buffer and store into in memory + */ + public void inMemmoryAccumulator() { + HashSet inMemoryEventSet = null; + HashMap> inMemoryEventMap = null; + try { + StopWatch watch = new StopWatch(); + watch.start(); + int streamCount = 0; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + inMemoryEventMap = new HashMap<>(numStreams); + for (int stream = 0; stream < numStreams; stream++) { + inMemoryEventSet = new HashSet<>(eventsPerChunk); + boolean flag = Boolean.FALSE; + for (int data = 0; data < eventsPerChunk + && thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) { + Event event = S3Sink.getEventQueue().take(); + inMemoryEventSet.add(event); + byteCount += event.toJsonString().getBytes().length; + flag = Boolean.TRUE; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + if (flag) { + inMemoryEventMap.put(stream, inMemoryEventSet); + streamCount++; + } else { + // Once threshold reached then No more streaming required per snapshot, hence + // terminate the streaming(outer) loop + break; + } + } + + LOG.info( + "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", + byteCount, eventCount, eventCollectionDuration, streamCount); + + //new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); + } catch (Exception e) { + LOG.error("Exception while storing recoreds into In-Memory", e); + } + } + + /** + * Accumulates data from buffer and store in local file + */ + public void localFileAccumulator() { + DB db = null; + NavigableSet localFileEventSet = null; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + try { + StopWatch watch = new StopWatch(); + watch.start(); + db = DBMaker.memoryDB().make(); + localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen(); + for (int data = 0; 
thresholdsCheck(data, watch, byteCount); data++) { + String event = S3Sink.getEventQueue().take().toJsonString(); + byteCount += event.getBytes().length; + localFileEventSet.add(event); + eventCount++; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + db.commit(); + LOG.info( + "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", + byteCount, eventCount, eventCollectionDuration); + + //new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); + + } catch (Exception e) { + LOG.error("Exception while storing recoreds into Local-file", e); + } finally { + if (db !=null && !db.isClosed()) { + db.close(); + } + } + } + + /** + * Bunch of events based on thresholds set in the configuration. The Thresholds + * such as events count, bytes capacity and data collection duration. + * + * @param i + * @param watch + * @param byteCount + * @return + */ + private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { + boolean flag = Boolean.FALSE; + flag = eventCount < numEvents + && watch.getTime(TimeUnit.SECONDS) < s3SinkConfig.getThresholdOptions().getEventCollectionDuration() + && byteCount < s3SinkConfig.getThresholdOptions().getByteCapacity(); + return flag; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..73d83d668d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import 
com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +import java.util.Map; +import java.util.UUID; + +/* + An implementation class AWS Authentication configuration + */ +public class AwsAuthenticationOptions { + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + /* + AWS Region configuration + */ + public Region getAwsRegion() { + return awsRegion != null ? 
Region.of(awsRegion) : null; + } + + /* + Aws Credentials Provider configuration + */ + public AwsCredentialsProvider authenticateAwsConfiguration() { + + final AwsCredentialsProvider awsCredentialsProvider; + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + try { + Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); + } + + final StsClient stsClient = StsClient.builder() + .region(getAwsRegion()) + .build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()) + .roleArn(awsStsRoleArn); + + if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder + .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() + .stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()) + .build(); + + } else { + // use default credential provider + awsCredentialsProvider = DefaultCredentialsProvider.create(); + } + + return awsCredentialsProvider; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java new file mode 100644 index 0000000000..8501f70daa --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of 
Threshold configuration Options + */ +public class ObjectOptions { + private static final String DEFAULT_KEY_PATTERN = "logs-${YYYY-MM-DD hh:mm:ss}"; + + @JsonProperty("file_pattern") + @NotNull + private String filePattern = DEFAULT_KEY_PATTERN; + + /* + Read s3 object index file patten configuration + */ + public String getFilePattern() { + return filePattern; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java new file mode 100644 index 0000000000..b50d1219f5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 index configuration Options + */ +public class ThresholdOptions { + static final int DEFAULT_EVENT_COUNT = 1000; + private static final long DEFAULT_BYTE_CAPACITY = 5000000; + private static final long DEFAULT_TIMEOUT = 60; + + @JsonProperty("event_count") + @NotNull + private int eventCount = DEFAULT_EVENT_COUNT; + + @JsonProperty("byte_capacity") + @NotNull + private long byteCapacity = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("event_collection_duration") + private long eventCollectionDuration = DEFAULT_TIMEOUT; + + /* + Read event collection duration configuration + */ + public long getEventCollectionDuration() { + return eventCollectionDuration; + } + + /* + Read byte capacity configuration + */ + public long getByteCapacity() { + return byteCapacity; + } + + /* + Read the event count configuration + */ + public int getEeventCount() { + return eventCount; 
+ } +} diff --git a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml new file mode 100644 index 0000000000..6e51f4a11d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml @@ -0,0 +1,21 @@ +simple-sample-pipeline: + workers: 4 + delay: "5000" + source: + random: + sink: + - s3: + aws: + region: us-east-1 + sts_role_arn: arn:aws:iam::701869769844 :role/s3-full-access + bucket: dataprepper + key_path_prefix: logdata + object: + file_pattern: logs-${yyyy-MM-dd} + threshold: + event_count: 200 + byte_capacity: 2500 + event_collection_duration: 20 + codec: + json: + temporary_storage: local_file \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 751567838a..a3c749d922 100644 --- a/settings.gradle +++ b/settings.gradle @@ -91,4 +91,5 @@ include 'release:docker' include 'release:maven' include 'e2e-test:peerforwarder' include 'rss-source' +include 'data-prepper-plugins:s3-sink'