From 8669138f0cae0ef1632a4d9bd95c24a074158516 Mon Sep 17 00:00:00 2001
From: de20436406
Date: Tue, 28 Feb 2023 14:29:41 +0530
Subject: [PATCH]
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Description

Created "s3-sink" plugin. Github issue : #1048

Added Functionality

A new s3 sink plugin that receives events from the Data Prepper buffer, accumulates them in memory or in a local file until the configured thresholds are reached (event count, byte capacity, or collection duration), and then writes the accumulated data to an Amazon S3 bucket as objects named from a configurable key pattern.
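As a quick orientation for reviewers, the sketch below illustrates the accumulate-and-flush flow described above: the sink enqueues incoming events, and a background worker drains the queue until the event-count, byte-capacity, or collection-duration threshold trips, then writes the batch as a single S3 object. This is an editor-added, simplified illustration; the class and constant names here are hypothetical, and the actual implementation is in S3Sink, S3SinkWorker, and ThresholdOptions in the diff below.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThresholdFlushSketch {
    private static final int MAX_EVENTS = 1000;        // threshold: event_count
    private static final long MAX_BYTES = 5_000_000L;  // threshold: byte_capacity
    private static final long MAX_SECONDS = 60L;       // threshold: event_collection_duration

    public static void main(String[] args) throws InterruptedException {
        // The sink's doOutput() hands events to a bounded queue; a few fake JSON events stand in here.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100_000);
        for (int i = 0; i < 10; i++) {
            queue.add("{\"message\":\"event-" + i + "\"}");
        }

        // A worker drains the queue until any one threshold trips, then flushes the batch.
        List<String> batch = new ArrayList<>();
        long bytes = 0;
        long start = System.nanoTime();
        while (batch.size() < MAX_EVENTS
                && bytes < MAX_BYTES
                && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < MAX_SECONDS) {
            String event = queue.poll(1, TimeUnit.SECONDS);
            if (event == null) {
                break; // queue drained in this sketch; the real worker keeps polling
            }
            batch.add(event);
            bytes += event.getBytes().length;
        }
        System.out.println("Would flush " + batch.size() + " events (" + bytes + " bytes) as one S3 object");
    }
}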

Check List

New functionality: s3-sink plugin.
New functionality has been documented.
New functionality has javadoc added.
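One more implementation note on the added functionality: object keys are built by extracting the ${...} date-time pattern from the configured file_pattern, formatting it against the current UTC time, and appending a random UUID. The following is a rough, editor-added sketch of that resolution under those assumptions; it is simplified and is not the S3ObjectIndex class from this patch.

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ObjectNameSketch {
    private static final Pattern TIME_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");

    // "logs-${yyyy-MM-dd}" -> "logs-" + formatted UTC date + random UUID suffix
    static String resolve(String filePattern) {
        Matcher matcher = TIME_PATTERN.matcher(filePattern);
        String suffix = "";
        if (matcher.find()) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(matcher.group(1));
            suffix = formatter.format(ZonedDateTime.now(ZoneOffset.UTC));
        }
        return filePattern.replaceAll("\\$\\{.*?\\}", "") + suffix + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(resolve("logs-${yyyy-MM-dd}"));
    }
}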

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here --- data-prepper-plugins/s3-sink/README.md | 9 + data-prepper-plugins/s3-sink/build.gradle | 67 ++++ .../plugins/sink/S3ObjectIndex.java | 111 ++++++ .../dataprepper/plugins/sink/S3Sink.java | 104 ++++- .../plugins/sink/S3SinkConfig.java | 93 ++++- .../plugins/sink/S3SinkService.java | 13 +- .../plugins/sink/S3SinkWorker.java | 177 +++++++++ .../plugins/sink/SinkAccumulator.java | 10 + .../dataprepper/plugins/sink/codec/Codec.java | 32 ++ .../plugins/sink/codec/JsonCodec.java | 66 ++++ ...ons.java => AwsAuthenticationOptions.java} | 46 ++- .../sink/configuration/ObjectOptions.java | 27 ++ .../sink/configuration/ThresholdOptions.java | 50 +++ .../plugins/sink/stream/ClosableQueue.java | 31 ++ .../sink/stream/ConvertibleOutputStream.java | 71 ++++ .../stream/ExecutorServiceResultsHandler.java | 103 +++++ .../sink/stream/InMemoryAccumulator.java | 130 ++++++ .../sink/stream/IntegrityCheckException.java | 16 + .../sink/stream/LocalFileAccumulator.java | 137 +++++++ .../sink/stream/MultiPartOutputStream.java | 186 +++++++++ .../plugins/sink/stream/StreamPart.java | 56 +++ .../sink/stream/StreamTransferManager.java | 372 ++++++++++++++++++ .../plugins/sink/stream/Utils.java | 55 +++ .../s3-sink/src/main/resources/pipelines.yaml | 21 + settings.gradle | 4 +- 25 files changed, 1944 insertions(+), 43 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/README.md create mode 100644 data-prepper-plugins/s3-sink/build.gradle create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java rename data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/{SinkAwsAuthenticationOptions.java => AwsAuthenticationOptions.java} (61%) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java create mode 100644 
data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java create mode 100644 data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md new file mode 100644 index 0000000000..1fbdefe20a --- /dev/null +++ b/data-prepper-plugins/s3-sink/README.md @@ -0,0 +1,9 @@ +Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml + +Functional Requirements +1 Provide a mechanism to received events from buffer then process and write to s3. +2 Codecs encode the events into the desired format based on the configuration. +3 Flush the encoded events into s3 bucket as objects. +4 Object name based on the key-pattern. +5 Object length depends on the thresholds provided in the configuration. +6 The Thresholds such as events count, bytes capacity and data collection duration. diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle new file mode 100644 index 0000000000..acfa7bfff6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':data-prepper-api') + implementation 'io.micrometer:micrometer-core' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:sqs' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.apache.commons:commons-compress:1.21' + implementation 'joda-time:joda-time:2.11.1' + implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' + implementation 'org.mapdb:mapdb:3.0.8' + testImplementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation project(':data-prepper-test-common') +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket') + systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region') + systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url') + + filter { + includeTestsMatching '*IT' 
+ } +} + diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java new file mode 100644 index 0000000000..31a173a621 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Reference to an S3 object key Index patterns. + */ + +public class S3ObjectIndex { + + private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\${"; + + //For matching a string that begins with a "${" and ends with a "}". + //For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched. + private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}"; + + //For matching a string enclosed by "%{" and "}". + //For a string like "data-prepper-${yyyy-MM}", "yyyy-MM" is matched. + private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\$\\{(.*?)\\}"; + + private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID()); + + S3ObjectIndex() { } + + /* + Create Index with date,time with UniqueID prepended. + */ + public static String getIndexAliasWithDate(final String indexAlias) { + DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); + String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; + return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID(); + } + + /* + Validate the index with the regular expression pattern. Throws exception if validation fails + */ + public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { + final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); + final Matcher timePatternMatcher = pattern.matcher(indexAlias); + if (timePatternMatcher.find()) { + final String timePattern = timePatternMatcher.group(1); + if (timePatternMatcher.find()) { // check if there is a one more match. + throw new IllegalArgumentException("An index only allows one date-time pattern."); + } + if(timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)){ //check if it is a nested pattern such as "data-prepper-%{%{yyyy.MM.dd}}" + throw new IllegalArgumentException("An index doesn't allow nested date-time patterns."); + } + validateTimePatternIsAtTheEnd(indexAlias, timePattern); + validateNoSpecialCharsInTimePattern(timePattern); + validateTimePatternGranularity(timePattern); + return DateTimeFormatter.ofPattern(timePattern); + } + return null; + } + + /* + Data Prepper only allows time pattern as a suffix. + */ + private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { + if (!indexAlias.endsWith(timePattern + "}")) { + throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); + } + } + + /* + * Special characters can cause failures in creating indexes. 
+ * */ + private static final Set INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); + public static void validateNoSpecialCharsInTimePattern(String timePattern) { + boolean containsInvalidCharacter = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> INVALID_CHARS.contains(character)); + if (containsInvalidCharacter) { + throw new IllegalArgumentException("Index time pattern contains one or multiple special characters: " + INVALID_CHARS); + } + } + + /* + * Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second + */ + private static final Set UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); + public static void validateTimePatternGranularity(String timePattern) { + boolean containsUnsupportedTimeSymbol = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character)); + if (containsUnsupportedTimeSymbol) { + throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: " + + UNSUPPORTED_TIME_GRANULARITY_CHARS); + } + } + + /* + Returns the current UTC Date and Time + */ + public static ZonedDateTime getCurrentUtcTime() { + return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 6c643268e9..70ea4a4dd7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -1,48 +1,116 @@ -/** - * +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.dataprepper.plugins.sink; import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Implementation class of s3-sink plugin + * + */ @DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class) -public class S3Sink implements Sink> { - +public class S3Sink extends AbstractSink> { + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private static final int EVENT_QUEUE_SIZE = 100000; + private final S3SinkConfig s3SinkConfig; + private volatile boolean initialized; + private static BlockingQueue eventQueue; + private static boolean isStopRequested; - private final String outputS3Path; - private static final String 
SAMPLE_S3_PATH = "src/resources/"; - public static final String PATH = "path"; - - + private final Codec codec; + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * + * @param pluginSetting + * @param s3SinkConfig + * @param pluginFactory + */ @DataPrepperPluginConstructor - public S3Sink(final S3SinkConfig s3SinkConfig, final PluginSetting pluginSetting) { + public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { + super(pluginSetting); this.s3SinkConfig = s3SinkConfig; - final String outputS3 = (String) pluginSetting.getAttributeFromSettings(PATH); - outputS3Path = outputS3 == null ? SAMPLE_S3_PATH : outputS3; - + final PluginModel codecConfiguration = s3SinkConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + initialized = Boolean.FALSE; } @Override - public void output(Collection> records) { - - final S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + public boolean isReady() { + return initialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Failed to initialize S3-Sink."); + this.shutdown(); + throw new RuntimeException(e.getMessage(), e); + } catch (Exception e) { + LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause()); + } + } + + private void doInitializeInternal() { + eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec); + new Thread(worker).start(); + initialized = Boolean.TRUE; + } + + @Override + public void doOutput(final Collection> records) { + LOG.debug("Records size : {}", records.size()); + if (records.isEmpty()) { + return; + } + for (final Record recordData : records) { + + Event event = recordData.getData(); + getEventQueue().add(event); + + } } @Override public void shutdown() { - // TODO Auto-generated method stub - + super.shutdown(); + isStopRequested = Boolean.TRUE; + LOG.info("s3-sink sutdonwn completed"); } + public static BlockingQueue getEventQueue() { + return eventQueue; + } + + public static boolean isStopRequested() { + return isStopRequested; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java index fcf21d475e..718ce504b2 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -1,21 +1,96 @@ -package org.opensearch.dataprepper.plugins.sink; +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ -import org.opensearch.dataprepper.plugins.sink.configuration.SinkAwsAuthenticationOptions; +package org.opensearch.dataprepper.plugins.sink; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectOptions; +import 
org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; +/* + An implementation class of s3 sink configuration + */ public class S3SinkConfig { - @JsonProperty("sink_aws") + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("threshold") + @NotNull + private ThresholdOptions thresholdOptions; + + @JsonProperty("object") + @NotNull + private ObjectOptions objectOptions; + + @JsonProperty("codec") @NotNull - @Valid - private SinkAwsAuthenticationOptions sinkAwsAuthenticationOptions; + private PluginModel codec; - public SinkAwsAuthenticationOptions getSinkAwsAuthenticationOptions() { - return sinkAwsAuthenticationOptions; - } + @JsonProperty("temporary_storage") + @NotNull + private String temporaryStorage; + + @JsonProperty("bucket") + @NotNull + private String bucketName; + @JsonProperty("key_path_prefix") + @NotNull + private String keyPathPrefix; + + /* + Aws Authentication configuration Options + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + /* + Threshold configuration Options + */ + public ThresholdOptions getThresholdOptions() { + return thresholdOptions; + } + + /* + s3 index configuration Options + */ + public ObjectOptions getObjectOptions() { + return objectOptions; + } + + /* + sink codec configuration Options + */ + public PluginModel getCodec() { return codec; } + + /* + s3 index path configuration Option + */ + public String getKeyPathPrefix() { + return keyPathPrefix; + } + + /* + s3 bucket name configuration Option + */ + public String getBucketName() { + return bucketName; + } + + /* + Temporary storage location configuration Options + */ + public String getTemporaryStorage() { + return temporaryStorage; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 77b7e77c11..711a03d621 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -1,3 +1,7 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ package org.opensearch.dataprepper.plugins.sink; import org.slf4j.Logger; @@ -19,15 +23,20 @@ public class S3SinkService { this.s3Client = createS3Client(); } + S3Client createS3Client() { LOG.info("Creating S3 client"); return S3Client.builder() - .region(s3SinkConfig.getSinkAwsAuthenticationOptions().getAwsRegion()) - .credentialsProvider(s3SinkConfig.getSinkAwsAuthenticationOptions().authenticateAwsConfiguration()) + .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(RetryPolicy.builder().numRetries(5).build()) .build()) .build(); } + + public S3Client getS3Client() { + return s3Client; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java new file mode 100644 index 
0000000000..e1597aa631 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.time.StopWatch; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator; +import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * In-order to process bulk records, records splits into numStreams & + * eventsPerChunk. numStreams & eventsPerChunk depends on numEvents provided by + * user in pipelines.yaml eventsPerChunk will be always 20, only numStreams will + * be vary based on numEvents. + * + * numEvents(event_count) must be always divided by 100 completely without any + * remnant. + * + * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 + * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 + */ +public class S3SinkWorker implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + private static final float LOAD_FACTOR = 0.02f; + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + private final int numEvents; + private int numStreams; + private final int eventsPerChunk; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + private final Codec codec; + + /** + * + * @param s3SinkService + * @param s3SinkConfig + */ + public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig, Codec codec) { + this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); + this.numStreams = (int) (numEvents * LOAD_FACTOR); + this.eventsPerChunk = numEvents / numStreams; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + this.codec = codec; + } + + @Override + public void run() { + try { + while (!S3Sink.isStopRequested()) { + if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + inMemmoryAccumulator(); + } else { + localFileAccumulator(); + } + } + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(), + e.getCause(), e); + } + } + + /** + * Accumulates data from buffer and store into in memory + */ + public void inMemmoryAccumulator() { + HashSet inMemoryEventSet = null; + HashMap> inMemoryEventMap = null; + try { + StopWatch watch = new StopWatch(); + watch.start(); + int streamCount = 0; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + inMemoryEventMap = new HashMap<>(numStreams); + for (int stream = 0; stream < numStreams; stream++) { + inMemoryEventSet = new HashSet<>(eventsPerChunk); + boolean flag = Boolean.FALSE; + for (int data = 0; data < eventsPerChunk + && thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) { + Event event = S3Sink.getEventQueue().take(); + inMemoryEventSet.add(event); + byteCount += event.toJsonString().getBytes().length; + flag = Boolean.TRUE; 
+ eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + if (flag) { + inMemoryEventMap.put(stream, inMemoryEventSet); + streamCount++; + } else { + // Once threshold reached then No more streaming required per snapshot, hence + // terminate the streaming(outer) loop + break; + } + } + + LOG.info( + "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", + byteCount, eventCount, eventCollectionDuration, streamCount); + + new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); + } catch (Exception e) { + LOG.error("Exception while storing recoreds into In-Memory", e); + } + } + + /** + * Accumulates data from buffer and store in local file + */ + public void localFileAccumulator() { + DB db = null; + NavigableSet localFileEventSet = null; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + try { + StopWatch watch = new StopWatch(); + watch.start(); + db = DBMaker.memoryDB().make(); + localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen(); + for (int data = 0; thresholdsCheck(data, watch, byteCount); data++) { + String event = S3Sink.getEventQueue().take().toJsonString(); + byteCount += event.getBytes().length; + localFileEventSet.add(event); + eventCount++; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + db.commit(); + LOG.info( + "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", + byteCount, eventCount, eventCollectionDuration); + + new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); + + } catch (Exception e) { + LOG.error("Exception while storing recoreds into Local-file", e); + } finally { + if (db !=null && !db.isClosed()) { + db.close(); + } + } + } + + /** + * Bunch of events based on thresholds set in the configuration. The Thresholds + * such as events count, bytes capacity and data collection duration. 
+ * + * @param i + * @param watch + * @param byteCount + * @return + */ + private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { + boolean flag = Boolean.FALSE; + flag = eventCount < numEvents + && watch.getTime(TimeUnit.SECONDS) < s3SinkConfig.getThresholdOptions().getEventCollectionDuration() + && byteCount < s3SinkConfig.getThresholdOptions().getByteCapacity(); + return flag; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java new file mode 100644 index 0000000000..cdd5f4a669 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +public interface SinkAccumulator { + + void doAccumulate(); +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java new file mode 100644 index 0000000000..488adffb1d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +/** + * A codec parsing data through an output stream. Each implementation of this class should + * support parsing a specific type or format of data. See sub-classes for examples. + */ +public interface Codec { + /** + * Parses an {@link OutputStream}. Implementors should call the {@link Collection} for each + * {@link Record} loaded from the {@link OutputStream}. + * + * @param outputStream The output stream for the json data + * @param eventCollection The collection which holds record events + */ + void parse(OutputStream outputStream, Collection> eventCollection) throws IOException; + + void parse(OutputStream outputStream, Record eventCollection) throws IOException; + + void parse(OutputStream outputStream, Event event) throws IOException; +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java new file mode 100644 index 0000000000..cb13088b39 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.event.Event; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Objects; + +/** + * An implementation of {@link Codec} which serializes to JSON. 
+ */ +@DataPrepperPlugin(name = "json", pluginType = Codec.class) +public class JsonCodec implements Codec { + private final ObjectMapper objectMapper = new ObjectMapper(); + + /* + * Generates a serialized json string of the Events + */ + + @Override + public void parse(final OutputStream outputStream, final Collection> eventCollection) throws IOException { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(eventCollection); + + StringBuilder recordEventData = new StringBuilder(); + for (final Record recordEvent : eventCollection) { + recordEventData.append(recordEvent.getData().toJsonString()); + + } + objectMapper.writeValue(outputStream, recordEventData.toString()); + } + + /* + * Generates a serialized json string of the Events + */ + @Override + public void parse(OutputStream outputStream, Record eventCollection) throws IOException + { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(eventCollection); + + objectMapper.writeValue(outputStream, eventCollection.getData().toJsonString()); + + } + /* + * Generates a serialized json string of the Event + */ + @Override + public void parse(OutputStream outputStream, Event event) throws IOException + { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(event); + + objectMapper.writeValue(outputStream, event.toJsonString()); + + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/SinkAwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java similarity index 61% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/SinkAwsAuthenticationOptions.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java index 334128bbc2..73d83d668d 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/SinkAwsAuthenticationOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -1,9 +1,11 @@ -package org.opensearch.dataprepper.plugins.sink.configuration; +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ -import java.util.UUID; +package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.constraints.Size; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -13,7 +15,13 @@ import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -public class SinkAwsAuthenticationOptions { +import java.util.Map; +import java.util.UUID; + +/* + An implementation class AWS Authentication configuration + */ +public class AwsAuthenticationOptions { @JsonProperty("region") @Size(min = 1, message = "Region cannot be empty string") private String awsRegion; @@ -21,11 +29,21 @@ public class SinkAwsAuthenticationOptions { @JsonProperty("sts_role_arn") @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to 
override") + private Map awsStsHeaderOverrides; + /* + AWS Region configuration + */ public Region getAwsRegion() { return awsRegion != null ? Region.of(awsRegion) : null; } + /* + Aws Credentials Provider configuration + */ public AwsCredentialsProvider authenticateAwsConfiguration() { final AwsCredentialsProvider awsCredentialsProvider; @@ -39,15 +57,21 @@ public AwsCredentialsProvider authenticateAwsConfiguration() { final StsClient stsClient = StsClient.builder() .region(getAwsRegion()) .build(); - + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()) + .roleArn(awsStsRoleArn); + + if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder + .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() .stsClient(stsClient) - .refreshRequest(AssumeRoleRequest.builder() - .roleSessionName("S3-Sink-" + UUID.randomUUID()) - .roleArn(awsStsRoleArn) - .build()) + .refreshRequest(assumeRoleRequestBuilder.build()) .build(); - + } else { // use default credential provider awsCredentialsProvider = DefaultCredentialsProvider.create(); @@ -55,4 +79,4 @@ public AwsCredentialsProvider authenticateAwsConfiguration() { return awsCredentialsProvider; } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java new file mode 100644 index 0000000000..8501f70daa --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of Threshold configuration Options + */ +public class ObjectOptions { + private static final String DEFAULT_KEY_PATTERN = "logs-${YYYY-MM-DD hh:mm:ss}"; + + @JsonProperty("file_pattern") + @NotNull + private String filePattern = DEFAULT_KEY_PATTERN; + + /* + Read s3 object index file patten configuration + */ + public String getFilePattern() { + return filePattern; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java new file mode 100644 index 0000000000..b50d1219f5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 index configuration Options + */ +public class ThresholdOptions { + static final int DEFAULT_EVENT_COUNT = 1000; + private static final long DEFAULT_BYTE_CAPACITY = 5000000; + private static final long DEFAULT_TIMEOUT = 60; + + 
@JsonProperty("event_count") + @NotNull + private int eventCount = DEFAULT_EVENT_COUNT; + + @JsonProperty("byte_capacity") + @NotNull + private long byteCapacity = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("event_collection_duration") + private long eventCollectionDuration = DEFAULT_TIMEOUT; + + /* + Read event collection duration configuration + */ + public long getEventCollectionDuration() { + return eventCollectionDuration; + } + + /* + Read byte capacity configuration + */ + public long getByteCapacity() { + return byteCapacity; + } + + /* + Read the event count configuration + */ + public int getEeventCount() { + return eventCount; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java new file mode 100644 index 0000000000..3c017ca350 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class ClosableQueue extends ArrayBlockingQueue { + private volatile boolean closed = false; + + public ClosableQueue(int capacity) { + super(capacity); + } + + public void close() { + closed = true; + } + + @Override + public void put(T t) throws InterruptedException { + while (!offer(t, 1, TimeUnit.SECONDS)) { + if (closed) { + throw new IllegalStateException( + "The queue is now closed due to an error elsewhere" + ); + } + } + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java new file mode 100644 index 0000000000..76dfdd71be --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; + +/** + * A ByteArrayOutputStream with some useful additional functionality. + */ +class ConvertibleOutputStream extends ByteArrayOutputStream { + + private static final Logger log = LoggerFactory.getLogger(ConvertibleOutputStream.class); + + public ConvertibleOutputStream(int initialCapacity) { + super(initialCapacity); + } + + /** + * Creates an InputStream sharing the same underlying byte array, reducing memory usage and copying time. + */ + public InputStream toInputStream() { + return new ByteArrayInputStream(buf, 0, count); + } + + /** + * Truncates this stream to a given size and returns a new stream containing a copy of the remaining data. + * + * @param countToKeep number of bytes to keep in this stream, starting from the first written byte. 
+ * @param initialCapacityForNewStream buffer capacity to construct the new stream (NOT the number of bytes + * that the new stream will take from this one) + * @return a new stream containing all the bytes previously contained in this one, i.e. from countToKeep + 1 onwards. + */ + public ConvertibleOutputStream split(int countToKeep, int initialCapacityForNewStream) { + int newCount = count - countToKeep; + log.debug("Splitting stream of size {} into parts with sizes {} and {}", count, countToKeep, newCount); + initialCapacityForNewStream = Math.max(initialCapacityForNewStream, newCount); + ConvertibleOutputStream newStream = new ConvertibleOutputStream(initialCapacityForNewStream); + newStream.write(buf, countToKeep, newCount); + count = countToKeep; + return newStream; + } + + /** + * Concatenates the given stream to this stream. + */ + public void append(ConvertibleOutputStream otherStream) { + try { + otherStream.writeTo(this); + } catch (IOException e) { + + // Should never happen because these are all ByteArrayOutputStreams + throw new AssertionError(e); + } + } + + public byte[] getMD5Digest() { + MessageDigest md = Utils.md5(); + md.update(buf, 0, count); + return md.digest(); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java new file mode 100644 index 0000000000..40609ead9f --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Wrapper around an ExecutorService that allows you to easily submit {@link Callable}s, get results via iteration, + * and handle failure quickly. When a submitted callable throws an exception in its thread this + * will result in a {@code RuntimeException} when iterating over results. Typical usage is as follows: + *

+ * <ol>
+ * <li>Create an ExecutorService and pass it to the constructor.</li>
+ * <li>Create Callables and ensure that they respond to interruption, e.g. regularly call:
+ * <pre>{@code
+ *     if (Thread.currentThread().isInterrupted()) {
+ *         throw new RuntimeException("The thread was interrupted, likely indicating failure in a sibling thread.");
+ *     }}</pre></li>
+ * <li>Pass the callables to the {@code submit()} method.</li>
+ * <li>Call {@code finishedSubmitting()}.</li>
+ * <li>Iterate over this object (e.g. with a foreach loop) to get results from the callables.
+ * Each iteration will block waiting for the next result.
+ * If one of the callables throws an unhandled exception or the thread is interrupted during iteration
+ * then {@link ExecutorService#shutdownNow()} will be called resulting in all still running callables being interrupted,
+ * and a {@code RuntimeException} will be thrown.</li>
+ * </ol>
+ * You can also call {@code abort()} to shut down the threads yourself. + */ +public class ExecutorServiceResultsHandler implements Iterable { + + private ExecutorCompletionService completionService; + private ExecutorService executorService; + private AtomicInteger taskCount = new AtomicInteger(0); + + public ExecutorServiceResultsHandler(ExecutorService executorService) { + this.executorService = executorService; + completionService = new ExecutorCompletionService(executorService); + } + + public void submit(Callable task) { + completionService.submit(task); + taskCount.incrementAndGet(); + } + + public void finishedSubmitting() { + executorService.shutdown(); + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return taskCount.getAndDecrement() > 0; + } + + @Override + public V next() { + Exception exception; + try { + return completionService.take().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + exception = e; + } catch (ExecutionException e) { + exception = e; + } + abort(); + throw new RuntimeException(exception); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + + }; + } + + public void abort() { + if (executorService != null) { + executorService.shutdownNow(); + } + } + + /** + * Convenience method to wait for the callables to finish for when you don't care about the results. + */ + public void awaitCompletion() { + //noinspection StatementWithEmptyBody + for (V ignored : this) { + // do nothing + } + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java new file mode 100644 index 0000000000..095a77ffd4 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.S3SinkService; +import org.opensearch.dataprepper.plugins.sink.SinkAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.services.s3.S3Client; +/** + * Accumulates data from buffer and store into in memory + */ +public class InMemoryAccumulator implements SinkAccumulator { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryAccumulator.class); + final Map> inMemoryEventMap; + private final int numStreams; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + private boolean retry = Boolean.FALSE; + private static final int MAX_RETRY = 3; + private static final int PART_SIZE_COUNT = 5; + + /** + * + * @param inMemoryEventMap + * @param numStreams + * @param s3SinkService + * @param s3SinkConfig + */ + public InMemoryAccumulator(final Map> inMemoryEventMap, final int numStreams, + final S3SinkService s3SinkService, final 
S3SinkConfig s3SinkConfig) { + this.inMemoryEventMap = inMemoryEventMap; + this.numStreams = numStreams; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + } + + @Override + public void doAccumulate() { + + String bucket = s3SinkConfig.getBucketName(); + String path = s3SinkConfig.getKeyPathPrefix(); + String index = S3ObjectIndex.getIndexAliasWithDate(s3SinkConfig.getObjectOptions().getFilePattern()); + String key = path + "/" + index; + + S3Client client = s3SinkService.getS3Client(); + int retryCount = MAX_RETRY; + + LOG.info("S3-Sink will caeate Amazon S3 object : {}", key); + + do { + try { + // Setting up + final StreamTransferManager manager = new StreamTransferManager(bucket, key, client, path) + .numStreams(numStreams).numUploadThreads(2).queueCapacity(2).partSize(PART_SIZE_COUNT); + final List streams = manager.getMultiPartOutputStreams(); + + ExecutorService pool = Executors.newFixedThreadPool(numStreams); + for (int streamsInput = 0; streamsInput < numStreams; streamsInput++) { + final int streamIndex = streamsInput; + HashSet eventSet = inMemoryEventMap.get(streamsInput); + pool.submit(new Runnable() { + public void run() { + try { + MultiPartOutputStream outputStream = streams.get(streamIndex); + if (eventSet != null) { + for (Event event : eventSet) { + outputStream.write(event.toJsonString().getBytes()); + } + } + // The stream must be closed once all the data has been written + outputStream.close(); + } catch (Exception e) { + // Aborts all uploads + retry = Boolean.TRUE; + manager.abort(e); + LOG.error("Aborts all uploads = {}", manager, e); + } + } + }); + } + pool.shutdown(); + sleep(pool); + // Finishing off + manager.complete(); + + } catch (Exception e) { + retry = Boolean.TRUE; + LOG.error("Issue with the streaming recoreds via s3 client : \n Error message {} \n Exception cause {}", e.getMessage(), e.getCause(), e); + } + + if (retryCount == 0) { + retry = Boolean.FALSE; + LOG.warn("Maximum retry count 3 reached, Unable to store {} into Amazon S3", key); + } + if (retry) { + LOG.info("Retry : {}", (MAX_RETRY - --retryCount)); + sleep(null); + } + } while (retry); + } + + private void sleep(ExecutorService pool) { + try { + if (pool != null) { + pool.awaitTermination(5, TimeUnit.SECONDS); + } else { + Thread.sleep(5000); + } + + } catch (InterruptedException e) { + LOG.error("InterruptedException - \n Error message {} \n Exception cause {}", e.getMessage(), e.getCause()); + } + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java new file mode 100644 index 0000000000..ccaf095c7c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +/** + * Thrown when final integrity check fails. It suggests that the multipart upload failed + * due to data corruption. See {@link StreamTransferManager#checkIntegrity(boolean)} for details. 
+ */ +public class IntegrityCheckException extends RuntimeException { + + public IntegrityCheckException(String message) { + super(message); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java new file mode 100644 index 0000000000..0f75b1f4ba --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.NavigableSet; +import java.util.Optional; + +import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.S3SinkService; +import org.opensearch.dataprepper.plugins.sink.SinkAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchUploadException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.waiters.S3Waiter; + +/** + * + * @author de20436406 + * + */ +public class LocalFileAccumulator implements SinkAccumulator { + private static final Logger LOG = LoggerFactory.getLogger(LocalFileAccumulator.class); + private static final int MAX_RETRY = 3; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + + private final NavigableSet localFileEventSet; + private String fileAbsolutePath = null; + private String localFileName = null; + + /** + * + * @param localFileEventSet + * @param s3SinkService + * @param s3SinkConfig + */ + public LocalFileAccumulator(final NavigableSet localFileEventSet, final S3SinkService s3SinkService, + final S3SinkConfig s3SinkConfig) { + this.localFileEventSet = localFileEventSet; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + } + + @Override + public void doAccumulate() { + boolean retry = Boolean.FALSE; + localFileName = S3ObjectIndex.getIndexAliasWithDate(s3SinkConfig.getObjectOptions().getFilePattern()); + File file = new File(localFileName); + int retryCount = MAX_RETRY; + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(localFileName))) { + for (String event : localFileEventSet) { + writer.write(event); + } + fileAbsolutePath = file.getAbsoluteFile().toString(); + writer.flush(); + LOG.info("data stored in local file {}", fileAbsolutePath); + do { + 
retry = !fileSaveToS3(); + if (retryCount == 0) { + retry = Boolean.FALSE; + LOG.warn("Maximum retry count 3 reached, Unable to store {} into Amazon S3", localFileName); + } + if (retry) { + LOG.info("Retry : {}", (MAX_RETRY - --retryCount)); + Thread.sleep(5000); + } + } while (retry); + } catch (IOException e) { + LOG.error("Events unable to save into local file : {}", localFileName, e); + } catch (Exception e) { + LOG.error("Unable to store {} into Amazon S3", localFileName, e); + } finally { + try { + boolean isLocalFileDeleted = Files.deleteIfExists(Paths.get(fileAbsolutePath)); + if (isLocalFileDeleted) { + LOG.info("Local file deleted successfully {}", fileAbsolutePath); + } else { + LOG.warn("Local file not deleted {}", fileAbsolutePath); + } + } catch (IOException e) { + LOG.error("Local file unable to deleted {}", fileAbsolutePath); + e.printStackTrace(); + } + } + } + + @SuppressWarnings("finally") + private boolean fileSaveToS3() { + final String bucketName = s3SinkConfig.getBucketName(); + final String path = s3SinkConfig.getKeyPathPrefix(); + final String key = path + "/" + localFileName; + boolean isFileSaveToS3 = Boolean.FALSE; + try { + S3Client client = s3SinkService.getS3Client(); + PutObjectRequest request = PutObjectRequest.builder().bucket(bucketName).key(key).acl("public-read") + .build(); + client.putObject(request, RequestBody.fromFile(new File(fileAbsolutePath))); + S3Waiter waiter = client.waiter(); + HeadObjectRequest requestWait = HeadObjectRequest.builder().bucket(bucketName).key(key).build(); + WaiterResponse waiterResponse = waiter.waitUntilObjectExists(requestWait); + Optional response = waiterResponse.matched().response(); + isFileSaveToS3 = response.isPresent(); + } catch (AwsServiceException | SdkClientException e) { + LOG.error("Amazon s3 client Exception : ", e); + } finally { + if (isFileSaveToS3) { + LOG.info("File {} was uploaded..... Success !!", localFileName); + } else { + LOG.info("File {} was uploaded..... Failed !!", localFileName); + } + return isFileSaveToS3; + } + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java new file mode 100644 index 0000000000..3ceb04fd99 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.io.OutputStream; +import java.util.concurrent.BlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +/** + * An {@code OutputStream} which packages data written to it into discrete {@link StreamPart}s which can be obtained + * in a separate thread via iteration and uploaded to S3. + *

+ * A single {@code MultiPartOutputStream} is allocated a range of part numbers, fixed at construction, which it can
+ * assign to the {@code StreamPart}s it produces.
+ *

+ * It's essential to call + * {@link MultiPartOutputStream#close()} when finished so that it can create the final {@code StreamPart} and consumers + * can finish. + *
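+ * A minimal usage sketch, assuming the default of one stream and one upload thread, an existing {@code s3Client}
+ * and a {@code byte[] data} to upload (bucket, key and path names below are illustrative only):
+ * <pre>
+ * StreamTransferManager manager = new StreamTransferManager("my-bucket", "my-key", s3Client, "logdata");
+ * MultiPartOutputStream outputStream = manager.getMultiPartOutputStreams().get(0);
+ * outputStream.write(data);
+ * outputStream.close();
+ * manager.complete();
+ * </pre>
+ *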

+ * Writing to the stream may lead to trying to place a completed part on a queue, + * which will block if the queue is full and may lead to an {@code InterruptedException}. + */ +public class MultiPartOutputStream extends OutputStream { + + private static final Logger log = LoggerFactory.getLogger(MultiPartOutputStream.class); + + public static final int KB = 1024; + /** Megabytes */ + public static final int MB = 1024 * KB; + + private ConvertibleOutputStream currentStream; + + public static final int S3_MIN_PART_SIZE = 5 * MB; + private static final int STREAM_EXTRA_ROOM = MB; + + private BlockingQueue queue; + + private final int partNumberStart; + private final int partNumberEnd; + private final int partSize; + private int currentPartNumber; + + /** + * Creates a new stream that will produce parts of the given size with part numbers in the given range. + * + * @param partNumberStart the part number of the first part the stream will produce. Minimum 1. + * @param partNumberEnd 1 more than the last part number that the parts are allowed to have. Maximum 10 001. + * @param partSize the minimum size in bytes of parts to be produced. + * @param queue where stream parts are put on production. + */ + MultiPartOutputStream(int partNumberStart, int partNumberEnd, int partSize, BlockingQueue queue) { + if (partNumberStart < 1) { + throw new IndexOutOfBoundsException("The lowest allowed part number is 1. The value given was " + partNumberStart); + } + if (partNumberEnd > 10001) { + throw new IndexOutOfBoundsException( + "The highest allowed part number is 10 000, so partNumberEnd must be at most 10 001. The value given was " + partNumberEnd); + } + if (partNumberEnd <= partNumberStart) { + throw new IndexOutOfBoundsException( + String.format("The part number end (%d) must be greater than the part number start (%d).", partNumberEnd, partNumberStart)); + } + if (partSize < S3_MIN_PART_SIZE) { + throw new IllegalArgumentException(String.format( + "The given part size (%d) is less than 5 MB.", partSize)); + } + + this.partNumberStart = partNumberStart; + this.partNumberEnd = partNumberEnd; + this.queue = queue; + this.partSize = partSize; + + log.debug("Creating {}", this); + + currentPartNumber = partNumberStart; + currentStream = new ConvertibleOutputStream(getStreamAllocatedSize()); + } + + /** + * Returns the initial capacity in bytes of the {@code ByteArrayOutputStream} that a part uses. + */ + private int getStreamAllocatedSize() { + /* + This consists of the size that the user asks for, the extra 5 MB to avoid small parts (see the comment in + checkSize()), and some extra space to make resizing and copying unlikely. + */ + return partSize + S3_MIN_PART_SIZE + STREAM_EXTRA_ROOM; + } + + /** + * Checks if the stream currently contains enough data to create a new part. + */ + private void checkSize() { + /* + This class avoids producing parts < 5 MB if possible by only producing a part when it has an extra 5 MB to spare + for the next part. For example, suppose the following. A stream is producing parts of 10 MB. Someone writes + 10 MB and then calls this method, and then writes only 3 MB more before closing. If the initial 10 MB were + immediately packaged into a StreamPart and a new ConvertibleOutputStream was started on for the rest, it would + end up producing a part with just 3 MB. So instead the class waits until it contains 15 MB of data and then it + splits the stream into two: one with 10 MB that gets produced as a part, and one with 5 MB that it continues with. 
+ In this way users of the class are less likely to encounter parts < 5 MB which cause trouble: see the caveat + on order in the StreamTransferManager and the way it handles these small parts, referred to as 'leftover'. + Such parts are only produced when the user closes a stream that never had more than 5 MB written to it. + */ + if (currentStream == null) { + throw new IllegalStateException("The stream is closed and cannot be written to."); + } + if (currentStream.size() > partSize + S3_MIN_PART_SIZE) { + ConvertibleOutputStream newStream = currentStream.split( + currentStream.size() - S3_MIN_PART_SIZE, + getStreamAllocatedSize()); + putCurrentStream(); + currentStream = newStream; + } + } + + private void putCurrentStream() { + if (currentStream.size() == 0) { + return; + } + if (currentPartNumber >= partNumberEnd) { + throw new IndexOutOfBoundsException( + String.format("This stream was allocated the part numbers from %d (inclusive) to %d (exclusive)" + + "and it has gone beyond the end.", + partNumberStart, partNumberEnd)); + } + StreamPart streamPart = new StreamPart(currentStream, currentPartNumber++); + log.debug("Putting {} on queue", streamPart); + try { + queue.put(streamPart); + } catch (InterruptedException e) { + throw Utils.runtimeInterruptedException(e); + } + } + + @Override + public void write(int b) { + currentStream.write(b); + checkSize(); + } + + @Override + public void write(byte b[], int off, int len) { + currentStream.write(b, off, len); + checkSize(); + } + + @Override + public void write(byte b[]) { + write(b, 0, b.length); + checkSize(); + } + + /** + * Packages any remaining data into a {@link StreamPart} and signals to the {@code StreamTransferManager} that there are no more parts + * afterwards. You cannot write to the stream after it has been closed. + */ + @Override + public void close() { + //log.info("Called close() on {}", this); + if (currentStream == null) { + log.warn("{} is already closed", this); + return; + } + try { + putCurrentStream(); + log.debug("Placing poison pill on queue for {}", this); + queue.put(StreamPart.POISON); + } catch (InterruptedException e) { + log.error("Interrupted while closing {}", this); + throw Utils.runtimeInterruptedException(e); + } + currentStream = null; + } + + @Override + public String toString() { + return String.format("[MultipartOutputStream for parts %d - %d]", partNumberStart, partNumberEnd - 1); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java new file mode 100644 index 0000000000..45e231565f --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.io.InputStream; +import java.util.Base64; + +/** + * A simple class which holds some data which can be uploaded to S3 as part of a multipart upload and a part number + * identifying it. + */ +class StreamPart { + + private ConvertibleOutputStream stream; + private int partNumber; + + /** + * A 'poison pill' placed on the queue to indicate that there are no further parts from a stream. 
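+     * Consumers detect it by reference comparison ({@code part == StreamPart.POISON}).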
+ */ + static final StreamPart POISON = new StreamPart(null, -1); + + public StreamPart(ConvertibleOutputStream stream, int partNumber) { + this.stream = stream; + this.partNumber = partNumber; + } + + public int getPartNumber() { + return partNumber; + } + + public ConvertibleOutputStream getOutputStream() { + return stream; + } + + public InputStream getInputStream() { + return stream.toInputStream(); + } + + public long size() { + return stream.size(); + } + + public String getMD5Digest() { + return Base64.getEncoder().encodeToString(stream.getMD5Digest()); + } + + @Override + public String toString() { + return String.format("[Part number %d %s]", partNumber, + stream == null ? + "with null stream" : + String.format("containing %.2f MB", size() / (1024 * 1024.0))); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java new file mode 100644 index 0000000000..d84c48285a --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java @@ -0,0 +1,372 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +public class StreamTransferManager { + + private static final Logger log = LoggerFactory.getLogger(StreamTransferManager.class); + /** Kilobytes */ + public static final int KB = 1024; + /** Megabytes */ + public static final int MB = 1024 * KB; + + protected final String bucketName; + protected final String putKey; + protected final String path; + protected final S3Client s3Client; + protected String uploadId; + protected int numStreams = 1; + protected int numUploadThreads = 1; + protected int queueCapacity = 1; + protected int partSize = 5 * MB; + protected boolean checkIntegrity = false; + private final List partETags = Collections.synchronizedList(new ArrayList()); + private final List partETagss = Collections.synchronizedList(new ArrayList()); + private List multiPartOutputStreams; + private ExecutorServiceResultsHandler executorServiceResultsHandler; + private ClosableQueue queue; + private int finishedCount = 0; + private StreamPart 
leftoverStreamPart = null; + private final Object leftoverStreamPartLock = new Object(); + private boolean isAborting = false; + private static final int MAX_PART_NUMBER = 10000; + + public StreamTransferManager(String bucketName, String putKey, S3Client s3Client, String path) { + this.bucketName = bucketName; + this.putKey = putKey; + this.s3Client = s3Client; + this.path = path; + } + + public StreamTransferManager numStreams(int numStreams) { + ensureCanSet(); + if (numStreams < 1) { + throw new IllegalArgumentException("There must be at least one stream"); + } + this.numStreams = numStreams; + return this; + } + + public StreamTransferManager numUploadThreads(int numUploadThreads) { + ensureCanSet(); + if (numUploadThreads < 1) { + throw new IllegalArgumentException("There must be at least one upload thread"); + } + this.numUploadThreads = numUploadThreads; + return this; + } + + public StreamTransferManager queueCapacity(int queueCapacity) { + ensureCanSet(); + if (queueCapacity < 1) { + throw new IllegalArgumentException("The queue capacity must be at least 1"); + } + this.queueCapacity = queueCapacity; + return this; + } + + public StreamTransferManager partSize(long partSize) { + ensureCanSet(); + partSize *= MB; + if (partSize < MultiPartOutputStream.S3_MIN_PART_SIZE) { + throw new IllegalArgumentException(String.format("The given part size (%d) is less than 5 MB.", partSize)); + } + if (partSize > Integer.MAX_VALUE) { + throw new IllegalArgumentException(String + .format("The given part size (%d) is too large as it does not fit in a 32 bit int", partSize)); + } + this.partSize = (int) partSize; + return this; + } + + public StreamTransferManager checkIntegrity(boolean checkIntegrity) { + ensureCanSet(); + if (checkIntegrity) { + Utils.md5(); // check that algorithm is available + } + this.checkIntegrity = checkIntegrity; + return this; + } + + private void ensureCanSet() { + if (queue != null) { + abort(); + throw new IllegalStateException("Setters cannot be called after getMultiPartOutputStreams"); + } + + } + + public List getMultiPartOutputStreams() { + if (multiPartOutputStreams != null) { + return multiPartOutputStreams; + } + + queue = new ClosableQueue(queueCapacity); + log.debug("Initiating multipart upload to {}/{}", bucketName, putKey); + + CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder() + .bucket(bucketName).key(putKey).build(); + + CreateMultipartUploadResponse response = s3Client.createMultipartUpload(createMultipartUploadRequest); + uploadId = response.uploadId(); + + log.info("Initiated multipart upload to {}/{} with full ID {}", bucketName, putKey, uploadId); + try { + multiPartOutputStreams = new ArrayList(); + ExecutorService threadPool = Executors.newFixedThreadPool(numUploadThreads); + + int partNumberStart = 1; + + for (int i = 0; i < numStreams; i++) { + int partNumberEnd = (i + 1) * MAX_PART_NUMBER / numStreams + 1; + MultiPartOutputStream multiPartOutputStream = new MultiPartOutputStream(partNumberStart, partNumberEnd, + partSize, queue); + partNumberStart = partNumberEnd; + multiPartOutputStreams.add(multiPartOutputStream); + } + + executorServiceResultsHandler = new ExecutorServiceResultsHandler(threadPool); + for (int i = 0; i < numUploadThreads; i++) { + executorServiceResultsHandler.submit(new UploadTask()); + } + executorServiceResultsHandler.finishedSubmitting(); + } catch (Exception e) { + throw abort(e); + } + + return multiPartOutputStreams; + } + + //TODO needs to refactor this code + 
+    public void complete() {
+        try {
+            log.debug("{}: Waiting for pool termination", this);
+            executorServiceResultsHandler.awaitCompletion();
+            log.debug("{}: Pool terminated", this);
+            if (leftoverStreamPart != null) {
+                log.info("{}: Uploading leftover stream {}", this, leftoverStreamPart);
+                uploadStreamPart(leftoverStreamPart);
+                log.debug("{}: Leftover uploaded", this);
+            }
+            log.debug("{}: Completing", this);
+
+            if (partETags.isEmpty()) {
+                log.error("{}: No parts were uploaded, so the multipart upload cannot be completed", this);
+            }
+
+            // CompletedMultipartUpload accepts the whole list of completed parts, so no special-casing by count is needed.
+            CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
+                    .parts(partETags).build();
+
+            CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
+                    .bucket(bucketName).key(putKey).uploadId(uploadId).multipartUpload(completedMultipartUpload)
+                    .build();
+
+            CompleteMultipartUploadResponse completeMultipartUploadResponse = s3Client
+                    .completeMultipartUpload(completeMultipartUploadRequest);
+
+            log.info("{}: Completed with ETag {}", this, completeMultipartUploadResponse.eTag());
+        } catch (IntegrityCheckException e) {
+            // Nothing to abort. Upload has already finished.
+            throw e;
+        } catch (Exception e) {
+            throw abort(e);
+        }
+    }
+
+    /**
+     * Aborts the upload and rethrows the argument, wrapped in a RuntimeException if
+     * necessary. Write {@code throw abort(e)} to make it clear to the compiler and
+     * readers that the code stops here.
+     */
+    public RuntimeException abort(Throwable t) {
+        if (!isAborting) {
+            log.error("Aborting {} due to error: {}", this, t.toString());
+        }
+        abort();
+        if (t instanceof Error) {
+            throw (Error) t;
+        } else if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        } else if (t instanceof InterruptedException) {
+            throw Utils.runtimeInterruptedException((InterruptedException) t);
+        } else {
+            throw new RuntimeException(t);
+        }
+    }
+
+    /**
+     * Aborts the upload. Repeated calls have no effect.
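+     * Closes the part queue, stops the upload workers and, if a multipart upload was initiated, aborts it on S3.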
+ */ + public void abort() { + synchronized (this) { + if (isAborting) { + return; + } + isAborting = true; + } + if (executorServiceResultsHandler != null) { + executorServiceResultsHandler.abort(); + } + if (queue != null) { + queue.close(); + } + if (uploadId != null) { + log.debug("{}: Aborting", this); + /* + * AbortMultipartUploadRequest abortMultipartUploadRequest = new + * AbortMultipartUploadRequest(bucketName, putKey, uploadId); + */ + + AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder() + .bucket(bucketName).key(putKey).uploadId(uploadId).build(); + + s3Client.abortMultipartUpload(abortMultipartUploadRequest); + log.info("{}: Aborted", this); + } + } + + private class UploadTask implements Callable { + + @Override + public Void call() { + try { + while (true) { + StreamPart part; + // noinspection SynchronizeOnNonFinalField + synchronized (queue) { + if (finishedCount < multiPartOutputStreams.size()) { + part = queue.take(); + if (part == StreamPart.POISON) { + finishedCount++; + continue; + } + } else { + break; + } + } + if (part.size() < MultiPartOutputStream.S3_MIN_PART_SIZE) { + /* + * Each stream does its best to avoid producing parts smaller than 5 MB, but if + * a user doesn't write that much data there's nothing that can be done. These + * are considered 'leftover' parts, and must be merged with other leftovers to + * try producing a part bigger than 5 MB which can be uploaded without problems. + * After the threads have completed there may be at most one leftover part + * remaining, which S3 can accept. It is uploaded in the complete() method. + */ + log.debug("{}: Received part {} < 5 MB that needs to be handled as 'leftover'", this, part); + StreamPart originalPart = part; + part = null; + synchronized (leftoverStreamPartLock) { + if (leftoverStreamPart == null) { + leftoverStreamPart = originalPart; + log.debug("{}: Created new leftover part {}", this, leftoverStreamPart); + } else { + /* + * Try to preserve order within the data by appending the part with the higher + * number to the part with the lower number. This is not meant to produce a + * perfect solution: if the client is producing multiple leftover parts all bets + * are off on order. 
+ */ + if (leftoverStreamPart.getPartNumber() > originalPart.getPartNumber()) { + StreamPart temp = originalPart; + originalPart = leftoverStreamPart; + leftoverStreamPart = temp; + } + leftoverStreamPart.getOutputStream().append(originalPart.getOutputStream()); + log.debug("{}: Merged with existing leftover part to create {}", this, + leftoverStreamPart); + if (leftoverStreamPart.size() >= MultiPartOutputStream.S3_MIN_PART_SIZE) { + log.debug("{}: Leftover part can now be uploaded as normal and reset", this); + part = leftoverStreamPart; + leftoverStreamPart = null; + } + } + } + } + if (part != null) { + uploadStreamPart(part); + } + } + } catch (Exception t) { + throw abort(t); + } + + return null; + } + + } + + private void uploadStreamPart(StreamPart streamPart) { + log.debug("{}: Uploading {}", this, streamPart); + + UploadPartRequest uploadRequest = UploadPartRequest.builder() + .bucket(bucketName) + .key(putKey) + .uploadId(uploadId) + .partNumber(streamPart.getPartNumber()).build(); + + UploadPartResponse response = s3Client.uploadPart(uploadRequest, RequestBody + .fromInputStream(streamPart.getInputStream(), streamPart.size())); + String eTag = response.eTag(); + CompletedPart part = CompletedPart.builder().partNumber(streamPart.getPartNumber()).eTag(eTag).build(); + + partETags.add(part); + + log.info("{}: Finished uploading {}", this, streamPart); + } + + @Override + public String toString() { + return String.format("[Manager uploading to %s/%s with id %s]", bucketName, putKey, + Utils.skipMiddle(String.valueOf(uploadId), 21)); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java new file mode 100644 index 0000000000..7cfbcba90e --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.lang.InterruptedException; +import java.lang.RuntimeException; +import java.lang.Thread; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * Miscellaneous useful functions. + */ +class Utils { + + /** + * Lets you avoid dealing with {@code InterruptedException} as a checked exception and ensures + * that the interrupted status of the thread is not lost. + * Write {@code throw runtimeInterruptedException(e)} to make it clear to the + * compiler and readers that the code stops here. + */ + public static RuntimeException runtimeInterruptedException(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + /** + * Shortens the given string to the given length by replacing the middle with ..., + * unless the string is already short enough or almost short enough in which case it is returned unmodified. 
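+     * For example, {@code skipMiddle("abcdefghijklmnop", 9)} returns {@code "abc...nop"}.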
+ */ + public static String skipMiddle(String string, int length) { + int inputLength = string.length(); + if (inputLength < length * 1.1) { + return string; + } + int sideLength = (length - 3) / 2; + StringBuilder builder = new StringBuilder(length); + builder.append(string, 0, sideLength); + builder.append("..."); + builder.append(string, inputLength - sideLength, inputLength); + return builder.toString(); + } + + public static MessageDigest md5() { + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + md.reset(); + return md; + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml new file mode 100644 index 0000000000..6e51f4a11d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml @@ -0,0 +1,21 @@ +simple-sample-pipeline: + workers: 4 + delay: "5000" + source: + random: + sink: + - s3: + aws: + region: us-east-1 + sts_role_arn: arn:aws:iam::701869769844 :role/s3-full-access + bucket: dataprepper + key_path_prefix: logdata + object: + file_pattern: logs-${yyyy-MM-dd} + threshold: + event_count: 200 + byte_capacity: 2500 + event_collection_duration: 20 + codec: + json: + temporary_storage: local_file \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 402748b71d..a3c749d922 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,8 +10,7 @@ pluginManagement { } } -//rootProject.name = 'opensearch-data-prepper' -rootProject.name = 'data-prepper' +rootProject.name = 'opensearch-data-prepper' dependencyResolutionManagement { versionCatalogs { @@ -93,5 +92,4 @@ include 'release:maven' include 'e2e-test:peerforwarder' include 'rss-source' include 'data-prepper-plugins:s3-sink' -findProject(':data-prepper-plugins:s3-sink')?.name = 's3-sink'