diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md new file mode 100644 index 0000000000..1fbdefe20a --- /dev/null +++ b/data-prepper-plugins/s3-sink/README.md @@ -0,0 +1,9 @@ +Attached sample pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml + +Functional Requirements +1 Provide a mechanism to receive events from the buffer, then process and write them to S3. +2 Codecs encode the events into the desired format based on the configuration. +3 Flush the encoded events into the S3 bucket as objects. +4 Object names are based on the key-pattern. +5 Object length depends on the thresholds provided in the configuration. +6 The thresholds are event count, byte capacity and data collection duration. diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle new file mode 100644 index 0000000000..acfa7bfff6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':data-prepper-api') + implementation 'io.micrometer:micrometer-core' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:sqs' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.apache.commons:commons-compress:1.21' + implementation 'joda-time:joda-time:2.11.1' + implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' + implementation 'org.mapdb:mapdb:3.0.8' + testImplementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation project(':data-prepper-test-common') +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket') + systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region') + systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url') + + filter { + includeTestsMatching '*IT' + } +} + diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java new file mode 100644 index 0000000000..31a173a621 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import 
java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Reference to an S3 object key Index patterns. + */ + +public class S3ObjectIndex { + + private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\${"; + + //For matching a string that begins with a "${" and ends with a "}". + //For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched. + private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}"; + + //For matching a string enclosed by "%{" and "}". + //For a string like "data-prepper-${yyyy-MM}", "yyyy-MM" is matched. + private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\$\\{(.*?)\\}"; + + private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID()); + + S3ObjectIndex() { } + + /* + Create Index with date,time with UniqueID prepended. + */ + public static String getIndexAliasWithDate(final String indexAlias) { + DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); + String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; + return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID(); + } + + /* + Validate the index with the regular expression pattern. Throws exception if validation fails + */ + public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { + final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); + final Matcher timePatternMatcher = pattern.matcher(indexAlias); + if (timePatternMatcher.find()) { + final String timePattern = timePatternMatcher.group(1); + if (timePatternMatcher.find()) { // check if there is a one more match. + throw new IllegalArgumentException("An index only allows one date-time pattern."); + } + if(timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)){ //check if it is a nested pattern such as "data-prepper-%{%{yyyy.MM.dd}}" + throw new IllegalArgumentException("An index doesn't allow nested date-time patterns."); + } + validateTimePatternIsAtTheEnd(indexAlias, timePattern); + validateNoSpecialCharsInTimePattern(timePattern); + validateTimePatternGranularity(timePattern); + return DateTimeFormatter.ofPattern(timePattern); + } + return null; + } + + /* + Data Prepper only allows time pattern as a suffix. + */ + private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { + if (!indexAlias.endsWith(timePattern + "}")) { + throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); + } + } + + /* + * Special characters can cause failures in creating indexes. 
+ * */ + private static final Set INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); + public static void validateNoSpecialCharsInTimePattern(String timePattern) { + boolean containsInvalidCharacter = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> INVALID_CHARS.contains(character)); + if (containsInvalidCharacter) { + throw new IllegalArgumentException("Index time pattern contains one or multiple special characters: " + INVALID_CHARS); + } + } + + /* + * Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second + */ + private static final Set UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); + public static void validateTimePatternGranularity(String timePattern) { + boolean containsUnsupportedTimeSymbol = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character)); + if (containsUnsupportedTimeSymbol) { + throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: " + + UNSUPPORTED_TIME_GRANULARITY_CHARS); + } + } + + /* + Returns the current UTC Date and Time + */ + public static ZonedDateTime getCurrentUtcTime() { + return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 6c643268e9..70ea4a4dd7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -1,48 +1,116 @@ -/** - * +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.dataprepper.plugins.sink; import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Implementation class of s3-sink plugin + * + */ @DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class) -public class S3Sink implements Sink> { - +public class S3Sink extends AbstractSink> { + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private static final int EVENT_QUEUE_SIZE = 100000; + private final S3SinkConfig s3SinkConfig; + private volatile boolean initialized; + private static BlockingQueue eventQueue; + private static boolean isStopRequested; - private final String outputS3Path; - private static final String 
SAMPLE_S3_PATH = "src/resources/"; - public static final String PATH = "path"; - - + private final Codec codec; + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * + * @param pluginSetting + * @param s3SinkConfig + * @param pluginFactory + */ @DataPrepperPluginConstructor - public S3Sink(final S3SinkConfig s3SinkConfig, final PluginSetting pluginSetting) { + public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { + super(pluginSetting); this.s3SinkConfig = s3SinkConfig; - final String outputS3 = (String) pluginSetting.getAttributeFromSettings(PATH); - outputS3Path = outputS3 == null ? SAMPLE_S3_PATH : outputS3; - + final PluginModel codecConfiguration = s3SinkConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + initialized = Boolean.FALSE; } @Override - public void output(Collection> records) { - - final S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + public boolean isReady() { + return initialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Failed to initialize S3-Sink."); + this.shutdown(); + throw new RuntimeException(e.getMessage(), e); + } catch (Exception e) { + LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause()); + } + } + + private void doInitializeInternal() { + eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec); + new Thread(worker).start(); + initialized = Boolean.TRUE; + } + + @Override + public void doOutput(final Collection> records) { + LOG.debug("Records size : {}", records.size()); + if (records.isEmpty()) { + return; + } + for (final Record recordData : records) { + + Event event = recordData.getData(); + getEventQueue().add(event); + + } } @Override public void shutdown() { - // TODO Auto-generated method stub - + super.shutdown(); + isStopRequested = Boolean.TRUE; + LOG.info("s3-sink sutdonwn completed"); } + public static BlockingQueue getEventQueue() { + return eventQueue; + } + + public static boolean isStopRequested() { + return isStopRequested; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java index fcf21d475e..718ce504b2 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -1,21 +1,96 @@ -package org.opensearch.dataprepper.plugins.sink; +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ -import org.opensearch.dataprepper.plugins.sink.configuration.SinkAwsAuthenticationOptions; +package org.opensearch.dataprepper.plugins.sink; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectOptions; +import 
org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; +/* + An implementation class of s3 sink configuration + */ public class S3SinkConfig { - @JsonProperty("sink_aws") + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("threshold") + @NotNull + private ThresholdOptions thresholdOptions; + + @JsonProperty("object") + @NotNull + private ObjectOptions objectOptions; + + @JsonProperty("codec") @NotNull - @Valid - private SinkAwsAuthenticationOptions sinkAwsAuthenticationOptions; + private PluginModel codec; - public SinkAwsAuthenticationOptions getSinkAwsAuthenticationOptions() { - return sinkAwsAuthenticationOptions; - } + @JsonProperty("temporary_storage") + @NotNull + private String temporaryStorage; + + @JsonProperty("bucket") + @NotNull + private String bucketName; + @JsonProperty("key_path_prefix") + @NotNull + private String keyPathPrefix; + + /* + Aws Authentication configuration Options + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + /* + Threshold configuration Options + */ + public ThresholdOptions getThresholdOptions() { + return thresholdOptions; + } + + /* + s3 index configuration Options + */ + public ObjectOptions getObjectOptions() { + return objectOptions; + } + + /* + sink codec configuration Options + */ + public PluginModel getCodec() { return codec; } + + /* + s3 index path configuration Option + */ + public String getKeyPathPrefix() { + return keyPathPrefix; + } + + /* + s3 bucket name configuration Option + */ + public String getBucketName() { + return bucketName; + } + + /* + Temporary storage location configuration Options + */ + public String getTemporaryStorage() { + return temporaryStorage; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 77b7e77c11..711a03d621 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -1,3 +1,7 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ package org.opensearch.dataprepper.plugins.sink; import org.slf4j.Logger; @@ -19,15 +23,20 @@ public class S3SinkService { this.s3Client = createS3Client(); } + S3Client createS3Client() { LOG.info("Creating S3 client"); return S3Client.builder() - .region(s3SinkConfig.getSinkAwsAuthenticationOptions().getAwsRegion()) - .credentialsProvider(s3SinkConfig.getSinkAwsAuthenticationOptions().authenticateAwsConfiguration()) + .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(RetryPolicy.builder().numRetries(5).build()) .build()) .build(); } + + public S3Client getS3Client() { + return s3Client; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java new file mode 100644 index 
0000000000..e1597aa631 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.time.StopWatch; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator; +import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * To process bulk records, incoming events are split into numStreams chunks of + * eventsPerChunk events each. Both numStreams and eventsPerChunk are derived from + * the numEvents (event_count) value provided by the user in pipelines.yaml: + * eventsPerChunk is always 50, and only numStreams varies with numEvents. + * + * numEvents (event_count) must always be exactly divisible by 100. + * + * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 + * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 + */ +public class S3SinkWorker implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + private static final float LOAD_FACTOR = 0.02f; + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + private final int numEvents; + private int numStreams; + private final int eventsPerChunk; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + private final Codec codec; + + /** + * + * @param s3SinkService + * @param s3SinkConfig + * @param codec + */ + public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig, Codec codec) { + this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); + this.numStreams = (int) (numEvents * LOAD_FACTOR); + this.eventsPerChunk = numEvents / numStreams; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + this.codec = codec; + } + + @Override + public void run() { + try { + while (!S3Sink.isStopRequested()) { + if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + inMemmoryAccumulator(); + } else { + localFileAccumulator(); + } + } + } catch (Exception e) { + LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(), + e.getCause(), e); + } + } + + /** + * Accumulates data from the buffer and stores it in memory. + */ + public void inMemmoryAccumulator() { + HashSet<Event> inMemoryEventSet = null; + HashMap<Integer, HashSet<Event>> inMemoryEventMap = null; + try { + StopWatch watch = new StopWatch(); + watch.start(); + int streamCount = 0; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + inMemoryEventMap = new HashMap<>(numStreams); + for (int stream = 0; stream < numStreams; stream++) { + inMemoryEventSet = new HashSet<>(eventsPerChunk); + boolean flag = Boolean.FALSE; + for (int data = 0; data < eventsPerChunk + && thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) { + Event event = S3Sink.getEventQueue().take(); + inMemoryEventSet.add(event); + byteCount += event.toJsonString().getBytes().length; + flag = Boolean.TRUE;
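+ // Track that this chunk received at least one event; the elapsed collection time below is recorded only for the snapshot log.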
+ eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + if (flag) { + inMemoryEventMap.put(stream, inMemoryEventSet); + streamCount++; + } else { + // Once a threshold is reached, no more streaming is required for this snapshot, hence + // terminate the streaming (outer) loop + break; + } + } + + LOG.info( + "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", + byteCount, eventCount, eventCollectionDuration, streamCount); + + new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); + } catch (Exception e) { + LOG.error("Exception while storing records into In-Memory", e); + } + } + + /** + * Accumulates data from the buffer and stores it in a local file. + */ + public void localFileAccumulator() { + DB db = null; + NavigableSet<String> localFileEventSet = null; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + try { + StopWatch watch = new StopWatch(); + watch.start(); + db = DBMaker.memoryDB().make(); + localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen(); + for (int data = 0; thresholdsCheck(data, watch, byteCount); data++) { + String event = S3Sink.getEventQueue().take().toJsonString(); + byteCount += event.getBytes().length; + localFileEventSet.add(event); + eventCount++; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + db.commit(); + LOG.info( + "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", + byteCount, eventCount, eventCollectionDuration); + + new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); + + } catch (Exception e) { + LOG.error("Exception while storing records into Local-file", e); + } finally { + if (db != null && !db.isClosed()) { + db.close(); + } + } + } + + /** + * Checks whether more events can be collected, based on the thresholds set in the + * configuration: event count, byte capacity and data collection duration.
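+ * For example, with the default configuration the check holds until 1,000 events, 5,000,000 bytes or 60 seconds of collection time are reached, whichever comes first.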
+ * + * @param i + * @param watch + * @param byteCount + * @return + */ + private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { + boolean flag = Boolean.FALSE; + flag = eventCount < numEvents + && watch.getTime(TimeUnit.SECONDS) < s3SinkConfig.getThresholdOptions().getEventCollectionDuration() + && byteCount < s3SinkConfig.getThresholdOptions().getByteCapacity(); + return flag; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java new file mode 100644 index 0000000000..cdd5f4a669 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SinkAccumulator.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +public interface SinkAccumulator { + + void doAccumulate(); +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java new file mode 100644 index 0000000000..488adffb1d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +/** + * A codec parsing data through an output stream. Each implementation of this class should + * support parsing a specific type or format of data. See sub-classes for examples. + */ +public interface Codec { + /** + * Parses an {@link OutputStream}. Implementors should call the {@link Collection} for each + * {@link Record} loaded from the {@link OutputStream}. + * + * @param outputStream The output stream for the json data + * @param eventCollection The collection which holds record events + */ + void parse(OutputStream outputStream, Collection> eventCollection) throws IOException; + + void parse(OutputStream outputStream, Record eventCollection) throws IOException; + + void parse(OutputStream outputStream, Event event) throws IOException; +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java new file mode 100644 index 0000000000..cb13088b39 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.event.Event; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Objects; + +/** + * An implementation of {@link Codec} which serializes to JSON. 
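+ * Each event is serialized with {@code Event.toJsonString()} and the result is written to the supplied output stream.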
+ */ +@DataPrepperPlugin(name = "json", pluginType = Codec.class) +public class JsonCodec implements Codec { + private final ObjectMapper objectMapper = new ObjectMapper(); + + /* + * Generates a serialized json string of the Events + */ + + @Override + public void parse(final OutputStream outputStream, final Collection> eventCollection) throws IOException { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(eventCollection); + + StringBuilder recordEventData = new StringBuilder(); + for (final Record recordEvent : eventCollection) { + recordEventData.append(recordEvent.getData().toJsonString()); + + } + objectMapper.writeValue(outputStream, recordEventData.toString()); + } + + /* + * Generates a serialized json string of the Events + */ + @Override + public void parse(OutputStream outputStream, Record eventCollection) throws IOException + { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(eventCollection); + + objectMapper.writeValue(outputStream, eventCollection.getData().toJsonString()); + + } + /* + * Generates a serialized json string of the Event + */ + @Override + public void parse(OutputStream outputStream, Event event) throws IOException + { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(event); + + objectMapper.writeValue(outputStream, event.toJsonString()); + + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/SinkAwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java similarity index 61% rename from data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/SinkAwsAuthenticationOptions.java rename to data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java index 334128bbc2..73d83d668d 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/SinkAwsAuthenticationOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -1,9 +1,11 @@ -package org.opensearch.dataprepper.plugins.sink.configuration; +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ -import java.util.UUID; +package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.constraints.Size; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -13,7 +15,13 @@ import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -public class SinkAwsAuthenticationOptions { +import java.util.Map; +import java.util.UUID; + +/* + An implementation class AWS Authentication configuration + */ +public class AwsAuthenticationOptions { @JsonProperty("region") @Size(min = 1, message = "Region cannot be empty string") private String awsRegion; @@ -21,11 +29,21 @@ public class SinkAwsAuthenticationOptions { @JsonProperty("sts_role_arn") @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to 
override") + private Map awsStsHeaderOverrides; + /* + AWS Region configuration + */ public Region getAwsRegion() { return awsRegion != null ? Region.of(awsRegion) : null; } + /* + Aws Credentials Provider configuration + */ public AwsCredentialsProvider authenticateAwsConfiguration() { final AwsCredentialsProvider awsCredentialsProvider; @@ -39,15 +57,21 @@ public AwsCredentialsProvider authenticateAwsConfiguration() { final StsClient stsClient = StsClient.builder() .region(getAwsRegion()) .build(); - + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()) + .roleArn(awsStsRoleArn); + + if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder + .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() .stsClient(stsClient) - .refreshRequest(AssumeRoleRequest.builder() - .roleSessionName("S3-Sink-" + UUID.randomUUID()) - .roleArn(awsStsRoleArn) - .build()) + .refreshRequest(assumeRoleRequestBuilder.build()) .build(); - + } else { // use default credential provider awsCredentialsProvider = DefaultCredentialsProvider.create(); @@ -55,4 +79,4 @@ public AwsCredentialsProvider authenticateAwsConfiguration() { return awsCredentialsProvider; } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java new file mode 100644 index 0000000000..8501f70daa --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of Threshold configuration Options + */ +public class ObjectOptions { + private static final String DEFAULT_KEY_PATTERN = "logs-${YYYY-MM-DD hh:mm:ss}"; + + @JsonProperty("file_pattern") + @NotNull + private String filePattern = DEFAULT_KEY_PATTERN; + + /* + Read s3 object index file patten configuration + */ + public String getFilePattern() { + return filePattern; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java new file mode 100644 index 0000000000..b50d1219f5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 index configuration Options + */ +public class ThresholdOptions { + static final int DEFAULT_EVENT_COUNT = 1000; + private static final long DEFAULT_BYTE_CAPACITY = 5000000; + private static final long DEFAULT_TIMEOUT = 60; + + 
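// An object is flushed to S3 once any one of these thresholds (event count, byte capacity or event collection duration) is reached. +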
@JsonProperty("event_count") + @NotNull + private int eventCount = DEFAULT_EVENT_COUNT; + + @JsonProperty("byte_capacity") + @NotNull + private long byteCapacity = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("event_collection_duration") + private long eventCollectionDuration = DEFAULT_TIMEOUT; + + /* + Read event collection duration configuration + */ + public long getEventCollectionDuration() { + return eventCollectionDuration; + } + + /* + Read byte capacity configuration + */ + public long getByteCapacity() { + return byteCapacity; + } + + /* + Read the event count configuration + */ + public int getEeventCount() { + return eventCount; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java new file mode 100644 index 0000000000..3c017ca350 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ClosableQueue.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class ClosableQueue extends ArrayBlockingQueue { + private volatile boolean closed = false; + + public ClosableQueue(int capacity) { + super(capacity); + } + + public void close() { + closed = true; + } + + @Override + public void put(T t) throws InterruptedException { + while (!offer(t, 1, TimeUnit.SECONDS)) { + if (closed) { + throw new IllegalStateException( + "The queue is now closed due to an error elsewhere" + ); + } + } + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java new file mode 100644 index 0000000000..76dfdd71be --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ConvertibleOutputStream.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; + +/** + * A ByteArrayOutputStream with some useful additional functionality. + */ +class ConvertibleOutputStream extends ByteArrayOutputStream { + + private static final Logger log = LoggerFactory.getLogger(ConvertibleOutputStream.class); + + public ConvertibleOutputStream(int initialCapacity) { + super(initialCapacity); + } + + /** + * Creates an InputStream sharing the same underlying byte array, reducing memory usage and copying time. + */ + public InputStream toInputStream() { + return new ByteArrayInputStream(buf, 0, count); + } + + /** + * Truncates this stream to a given size and returns a new stream containing a copy of the remaining data. + * + * @param countToKeep number of bytes to keep in this stream, starting from the first written byte. 
+ * @param initialCapacityForNewStream buffer capacity to construct the new stream (NOT the number of bytes + * that the new stream will take from this one) + * @return a new stream containing all the bytes previously contained in this one, i.e. from countToKeep + 1 onwards. + */ + public ConvertibleOutputStream split(int countToKeep, int initialCapacityForNewStream) { + int newCount = count - countToKeep; + log.debug("Splitting stream of size {} into parts with sizes {} and {}", count, countToKeep, newCount); + initialCapacityForNewStream = Math.max(initialCapacityForNewStream, newCount); + ConvertibleOutputStream newStream = new ConvertibleOutputStream(initialCapacityForNewStream); + newStream.write(buf, countToKeep, newCount); + count = countToKeep; + return newStream; + } + + /** + * Concatenates the given stream to this stream. + */ + public void append(ConvertibleOutputStream otherStream) { + try { + otherStream.writeTo(this); + } catch (IOException e) { + + // Should never happen because these are all ByteArrayOutputStreams + throw new AssertionError(e); + } + } + + public byte[] getMD5Digest() { + MessageDigest md = Utils.md5(); + md.update(buf, 0, count); + return md.digest(); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java new file mode 100644 index 0000000000..40609ead9f --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/ExecutorServiceResultsHandler.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Wrapper around an ExecutorService that allows you to easily submit {@link Callable}s, get results via iteration, + * and handle failure quickly. When a submitted callable throws an exception in its thread this + * will result in a {@code RuntimeException} when iterating over results. Typical usage is as follows: + *

* 1. Create an ExecutorService and pass it to the constructor.
+ * 2. Create Callables and ensure that they respond to interruption, e.g. regularly call: {@code
+ *     if (Thread.currentThread().isInterrupted()) {
+ *         throw new RuntimeException("The thread was interrupted, likely indicating failure in a sibling thread.");
+ *     }}
+ * 3. Pass the callables to the {@code submit()} method.
+ * 4. Call {@code finishedSubmitting()}.
+ * 5. Iterate over this object (e.g. with a foreach loop) to get results from the callables.
+ * Each iteration will block waiting for the next result.
+ * If one of the callables throws an unhandled exception or the thread is interrupted during iteration
+ * then {@link ExecutorService#shutdownNow()} will be called, resulting in all still running callables being interrupted,
+ * and a {@code RuntimeException} will be thrown.
+ *
+ * You can also call {@code abort()} to shut down the threads yourself. + */ +public class ExecutorServiceResultsHandler implements Iterable { + + private ExecutorCompletionService completionService; + private ExecutorService executorService; + private AtomicInteger taskCount = new AtomicInteger(0); + + public ExecutorServiceResultsHandler(ExecutorService executorService) { + this.executorService = executorService; + completionService = new ExecutorCompletionService(executorService); + } + + public void submit(Callable task) { + completionService.submit(task); + taskCount.incrementAndGet(); + } + + public void finishedSubmitting() { + executorService.shutdown(); + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return taskCount.getAndDecrement() > 0; + } + + @Override + public V next() { + Exception exception; + try { + return completionService.take().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + exception = e; + } catch (ExecutionException e) { + exception = e; + } + abort(); + throw new RuntimeException(exception); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + + }; + } + + public void abort() { + if (executorService != null) { + executorService.shutdownNow(); + } + } + + /** + * Convenience method to wait for the callables to finish for when you don't care about the results. + */ + public void awaitCompletion() { + //noinspection StatementWithEmptyBody + for (V ignored : this) { + // do nothing + } + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java new file mode 100644 index 0000000000..095a77ffd4 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/InMemoryAccumulator.java @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.S3SinkService; +import org.opensearch.dataprepper.plugins.sink.SinkAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.services.s3.S3Client; +/** + * Accumulates data from buffer and store into in memory + */ +public class InMemoryAccumulator implements SinkAccumulator { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryAccumulator.class); + final Map> inMemoryEventMap; + private final int numStreams; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + private boolean retry = Boolean.FALSE; + private static final int MAX_RETRY = 3; + private static final int PART_SIZE_COUNT = 5; + + /** + * + * @param inMemoryEventMap + * @param numStreams + * @param s3SinkService + * @param s3SinkConfig + */ + public InMemoryAccumulator(final Map> inMemoryEventMap, final int numStreams, + final S3SinkService s3SinkService, final 
S3SinkConfig s3SinkConfig) { + this.inMemoryEventMap = inMemoryEventMap; + this.numStreams = numStreams; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + } + + @Override + public void doAccumulate() { + + String bucket = s3SinkConfig.getBucketName(); + String path = s3SinkConfig.getKeyPathPrefix(); + String index = S3ObjectIndex.getIndexAliasWithDate(s3SinkConfig.getObjectOptions().getFilePattern()); + String key = path + "/" + index; + + S3Client client = s3SinkService.getS3Client(); + int retryCount = MAX_RETRY; + + LOG.info("S3-Sink will caeate Amazon S3 object : {}", key); + + do { + try { + // Setting up + final StreamTransferManager manager = new StreamTransferManager(bucket, key, client, path) + .numStreams(numStreams).numUploadThreads(2).queueCapacity(2).partSize(PART_SIZE_COUNT); + final List streams = manager.getMultiPartOutputStreams(); + + ExecutorService pool = Executors.newFixedThreadPool(numStreams); + for (int streamsInput = 0; streamsInput < numStreams; streamsInput++) { + final int streamIndex = streamsInput; + HashSet eventSet = inMemoryEventMap.get(streamsInput); + pool.submit(new Runnable() { + public void run() { + try { + MultiPartOutputStream outputStream = streams.get(streamIndex); + if (eventSet != null) { + for (Event event : eventSet) { + outputStream.write(event.toJsonString().getBytes()); + } + } + // The stream must be closed once all the data has been written + outputStream.close(); + } catch (Exception e) { + // Aborts all uploads + retry = Boolean.TRUE; + manager.abort(e); + LOG.error("Aborts all uploads = {}", manager, e); + } + } + }); + } + pool.shutdown(); + sleep(pool); + // Finishing off + manager.complete(); + + } catch (Exception e) { + retry = Boolean.TRUE; + LOG.error("Issue with the streaming recoreds via s3 client : \n Error message {} \n Exception cause {}", e.getMessage(), e.getCause(), e); + } + + if (retryCount == 0) { + retry = Boolean.FALSE; + LOG.warn("Maximum retry count 3 reached, Unable to store {} into Amazon S3", key); + } + if (retry) { + LOG.info("Retry : {}", (MAX_RETRY - --retryCount)); + sleep(null); + } + } while (retry); + } + + private void sleep(ExecutorService pool) { + try { + if (pool != null) { + pool.awaitTermination(5, TimeUnit.SECONDS); + } else { + Thread.sleep(5000); + } + + } catch (InterruptedException e) { + LOG.error("InterruptedException - \n Error message {} \n Exception cause {}", e.getMessage(), e.getCause()); + } + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java new file mode 100644 index 0000000000..ccaf095c7c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/IntegrityCheckException.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +/** + * Thrown when final integrity check fails. It suggests that the multipart upload failed + * due to data corruption. See {@link StreamTransferManager#checkIntegrity(boolean)} for details. 
+ */ +public class IntegrityCheckException extends RuntimeException { + + public IntegrityCheckException(String message) { + super(message); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java new file mode 100644 index 0000000000..0f75b1f4ba --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/LocalFileAccumulator.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.NavigableSet; +import java.util.Optional; + +import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.S3SinkService; +import org.opensearch.dataprepper.plugins.sink.SinkAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchUploadException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.waiters.S3Waiter; + +/** + * + * @author de20436406 + * + */ +public class LocalFileAccumulator implements SinkAccumulator { + private static final Logger LOG = LoggerFactory.getLogger(LocalFileAccumulator.class); + private static final int MAX_RETRY = 3; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + + private final NavigableSet localFileEventSet; + private String fileAbsolutePath = null; + private String localFileName = null; + + /** + * + * @param localFileEventSet + * @param s3SinkService + * @param s3SinkConfig + */ + public LocalFileAccumulator(final NavigableSet localFileEventSet, final S3SinkService s3SinkService, + final S3SinkConfig s3SinkConfig) { + this.localFileEventSet = localFileEventSet; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + } + + @Override + public void doAccumulate() { + boolean retry = Boolean.FALSE; + localFileName = S3ObjectIndex.getIndexAliasWithDate(s3SinkConfig.getObjectOptions().getFilePattern()); + File file = new File(localFileName); + int retryCount = MAX_RETRY; + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(localFileName))) { + for (String event : localFileEventSet) { + writer.write(event); + } + fileAbsolutePath = file.getAbsoluteFile().toString(); + writer.flush(); + LOG.info("data stored in local file {}", fileAbsolutePath); + do { + 
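+ // Attempt the upload, retrying on failure up to MAX_RETRY times with a short delay between attempts. +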
retry = !fileSaveToS3(); + if (retryCount == 0) { + retry = Boolean.FALSE; + LOG.warn("Maximum retry count 3 reached, Unable to store {} into Amazon S3", localFileName); + } + if (retry) { + LOG.info("Retry : {}", (MAX_RETRY - --retryCount)); + Thread.sleep(5000); + } + } while (retry); + } catch (IOException e) { + LOG.error("Events unable to save into local file : {}", localFileName, e); + } catch (Exception e) { + LOG.error("Unable to store {} into Amazon S3", localFileName, e); + } finally { + try { + boolean isLocalFileDeleted = Files.deleteIfExists(Paths.get(fileAbsolutePath)); + if (isLocalFileDeleted) { + LOG.info("Local file deleted successfully {}", fileAbsolutePath); + } else { + LOG.warn("Local file not deleted {}", fileAbsolutePath); + } + } catch (IOException e) { + LOG.error("Local file unable to deleted {}", fileAbsolutePath); + e.printStackTrace(); + } + } + } + + @SuppressWarnings("finally") + private boolean fileSaveToS3() { + final String bucketName = s3SinkConfig.getBucketName(); + final String path = s3SinkConfig.getKeyPathPrefix(); + final String key = path + "/" + localFileName; + boolean isFileSaveToS3 = Boolean.FALSE; + try { + S3Client client = s3SinkService.getS3Client(); + PutObjectRequest request = PutObjectRequest.builder().bucket(bucketName).key(key).acl("public-read") + .build(); + client.putObject(request, RequestBody.fromFile(new File(fileAbsolutePath))); + S3Waiter waiter = client.waiter(); + HeadObjectRequest requestWait = HeadObjectRequest.builder().bucket(bucketName).key(key).build(); + WaiterResponse waiterResponse = waiter.waitUntilObjectExists(requestWait); + Optional response = waiterResponse.matched().response(); + isFileSaveToS3 = response.isPresent(); + } catch (AwsServiceException | SdkClientException e) { + LOG.error("Amazon s3 client Exception : ", e); + } finally { + if (isFileSaveToS3) { + LOG.info("File {} was uploaded..... Success !!", localFileName); + } else { + LOG.info("File {} was uploaded..... Failed !!", localFileName); + } + return isFileSaveToS3; + } + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java new file mode 100644 index 0000000000..3ceb04fd99 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/MultiPartOutputStream.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.stream; + +import java.io.OutputStream; +import java.util.concurrent.BlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +/** + * An {@code OutputStream} which packages data written to it into discrete {@link StreamPart}s which can be obtained + * in a separate thread via iteration and uploaded to S3. + *

+ * A single {@code MultiPartOutputStream} is allocated a range of part numbers it can assign to the {@code StreamPart}s + * it produces, which is determined at construction. + *

+ * It's essential to call + * {@link MultiPartOutputStream#close()} when finished so that it can create the final {@code StreamPart} and consumers + * can finish. + *

+ * Writing to the stream may lead to trying to place a completed part on a queue, + * which will block if the queue is full and may lead to an {@code InterruptedException}. + */ +public class MultiPartOutputStream extends OutputStream { + + private static final Logger log = LoggerFactory.getLogger(MultiPartOutputStream.class); + + public static final int KB = 1024; + /** Megabytes */ + public static final int MB = 1024 * KB; + + private ConvertibleOutputStream currentStream; + + public static final int S3_MIN_PART_SIZE = 5 * MB; + private static final int STREAM_EXTRA_ROOM = MB; + + private BlockingQueue queue; + + private final int partNumberStart; + private final int partNumberEnd; + private final int partSize; + private int currentPartNumber; + + /** + * Creates a new stream that will produce parts of the given size with part numbers in the given range. + * + * @param partNumberStart the part number of the first part the stream will produce. Minimum 1. + * @param partNumberEnd 1 more than the last part number that the parts are allowed to have. Maximum 10 001. + * @param partSize the minimum size in bytes of parts to be produced. + * @param queue where stream parts are put on production. + */ + MultiPartOutputStream(int partNumberStart, int partNumberEnd, int partSize, BlockingQueue queue) { + if (partNumberStart < 1) { + throw new IndexOutOfBoundsException("The lowest allowed part number is 1. The value given was " + partNumberStart); + } + if (partNumberEnd > 10001) { + throw new IndexOutOfBoundsException( + "The highest allowed part number is 10 000, so partNumberEnd must be at most 10 001. The value given was " + partNumberEnd); + } + if (partNumberEnd <= partNumberStart) { + throw new IndexOutOfBoundsException( + String.format("The part number end (%d) must be greater than the part number start (%d).", partNumberEnd, partNumberStart)); + } + if (partSize < S3_MIN_PART_SIZE) { + throw new IllegalArgumentException(String.format( + "The given part size (%d) is less than 5 MB.", partSize)); + } + + this.partNumberStart = partNumberStart; + this.partNumberEnd = partNumberEnd; + this.queue = queue; + this.partSize = partSize; + + log.debug("Creating {}", this); + + currentPartNumber = partNumberStart; + currentStream = new ConvertibleOutputStream(getStreamAllocatedSize()); + } + + /** + * Returns the initial capacity in bytes of the {@code ByteArrayOutputStream} that a part uses. + */ + private int getStreamAllocatedSize() { + /* + This consists of the size that the user asks for, the extra 5 MB to avoid small parts (see the comment in + checkSize()), and some extra space to make resizing and copying unlikely. + */ + return partSize + S3_MIN_PART_SIZE + STREAM_EXTRA_ROOM; + } + + /** + * Checks if the stream currently contains enough data to create a new part. + */ + private void checkSize() { + /* + This class avoids producing parts < 5 MB if possible by only producing a part when it has an extra 5 MB to spare + for the next part. For example, suppose the following. A stream is producing parts of 10 MB. Someone writes + 10 MB and then calls this method, and then writes only 3 MB more before closing. If the initial 10 MB were + immediately packaged into a StreamPart and a new ConvertibleOutputStream was started on for the rest, it would + end up producing a part with just 3 MB. So instead the class waits until it contains 15 MB of data and then it + splits the stream into two: one with 10 MB that gets produced as a part, and one with 5 MB that it continues with. 
+        In this way users of the class are less likely to encounter parts < 5 MB which cause trouble: see the caveat
+        on order in the StreamTransferManager and the way it handles these small parts, referred to as 'leftover'.
+        Such parts are only produced when the user closes a stream that never had more than 5 MB written to it.
+        */
+        if (currentStream == null) {
+            throw new IllegalStateException("The stream is closed and cannot be written to.");
+        }
+        if (currentStream.size() > partSize + S3_MIN_PART_SIZE) {
+            ConvertibleOutputStream newStream = currentStream.split(
+                    currentStream.size() - S3_MIN_PART_SIZE,
+                    getStreamAllocatedSize());
+            putCurrentStream();
+            currentStream = newStream;
+        }
+    }
+
+    private void putCurrentStream() {
+        if (currentStream.size() == 0) {
+            return;
+        }
+        if (currentPartNumber >= partNumberEnd) {
+            throw new IndexOutOfBoundsException(
+                    String.format("This stream was allocated the part numbers from %d (inclusive) to %d (exclusive) "
+                            + "and it has gone beyond the end.",
+                            partNumberStart, partNumberEnd));
+        }
+        StreamPart streamPart = new StreamPart(currentStream, currentPartNumber++);
+        log.debug("Putting {} on queue", streamPart);
+        try {
+            queue.put(streamPart);
+        } catch (InterruptedException e) {
+            throw Utils.runtimeInterruptedException(e);
+        }
+    }
+
+    @Override
+    public void write(int b) {
+        currentStream.write(b);
+        checkSize();
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+        currentStream.write(b, off, len);
+        checkSize();
+    }
+
+    @Override
+    public void write(byte[] b) {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * Packages any remaining data into a {@link StreamPart} and signals to the {@code StreamTransferManager} that there are no more parts
+     * afterwards. You cannot write to the stream after it has been closed.
+     */
+    @Override
+    public void close() {
+        if (currentStream == null) {
+            log.warn("{} is already closed", this);
+            return;
+        }
+        try {
+            putCurrentStream();
+            log.debug("Placing poison pill on queue for {}", this);
+            queue.put(StreamPart.POISON);
+        } catch (InterruptedException e) {
+            log.error("Interrupted while closing {}", this);
+            throw Utils.runtimeInterruptedException(e);
+        }
+        currentStream = null;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[MultiPartOutputStream for parts %d - %d]", partNumberStart, partNumberEnd - 1);
+    }
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java
new file mode 100644
index 0000000000..45e231565f
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamPart.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.stream;
+
+import java.io.InputStream;
+import java.util.Base64;
+
+/**
+ * A simple class which holds some data which can be uploaded to S3 as part of a multipart upload and a part number
+ * identifying it.
+ */
+class StreamPart {
+
+    private ConvertibleOutputStream stream;
+    private int partNumber;
+
+    /**
+     * A 'poison pill' placed on the queue to indicate that there are no further parts from a stream.
+     */
+    static final StreamPart POISON = new StreamPart(null, -1);
+
+    public StreamPart(ConvertibleOutputStream stream, int partNumber) {
+        this.stream = stream;
+        this.partNumber = partNumber;
+    }
+
+    public int getPartNumber() {
+        return partNumber;
+    }
+
+    public ConvertibleOutputStream getOutputStream() {
+        return stream;
+    }
+
+    public InputStream getInputStream() {
+        return stream.toInputStream();
+    }
+
+    public long size() {
+        return stream.size();
+    }
+
+    public String getMD5Digest() {
+        return Base64.getEncoder().encodeToString(stream.getMD5Digest());
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[Part number %d %s]", partNumber,
+                stream == null ?
+                        "with null stream" :
+                        String.format("containing %.2f MB", size() / (1024 * 1024.0)));
+    }
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java
new file mode 100644
index 0000000000..d84c48285a
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/StreamTransferManager.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.stream;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+
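+/**
+ * Manages a streaming multipart upload to S3. Callers obtain one or more {@link MultiPartOutputStream}s,
+ * write their data to them, close them, and then call {@link #complete()}. If anything goes wrong,
+ * {@link #abort()} (or {@code abort(Throwable)}) cancels the multipart upload so that incomplete parts are
+ * not left behind in the bucket.
+ *
+ * <p>A minimal usage sketch (bucket name, key and {@code data} are illustrative; in the s3-sink pipeline these
+ * values come from the sink configuration):
+ * <pre>{@code
+ * S3Client s3Client = S3Client.create();
+ * StreamTransferManager manager = new StreamTransferManager("my-bucket", "my/object/key", s3Client, null)
+ *         .numUploadThreads(2)
+ *         .queueCapacity(2)
+ *         .partSize(10);
+ * MultiPartOutputStream outputStream = manager.getMultiPartOutputStreams().get(0);
+ * try {
+ *     outputStream.write(data);
+ *     outputStream.close();
+ *     manager.complete();
+ * } catch (Exception e) {
+ *     throw manager.abort(e);
+ * }
+ * }</pre>
+ */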
+public class StreamTransferManager {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamTransferManager.class);
+    /** Kilobytes */
+    public static final int KB = 1024;
+    /** Megabytes */
+    public static final int MB = 1024 * KB;
+
+    protected final String bucketName;
+    protected final String putKey;
+    protected final String path;
+    protected final S3Client s3Client;
+    protected String uploadId;
+    protected int numStreams = 1;
+    protected int numUploadThreads = 1;
+    protected int queueCapacity = 1;
+    protected int partSize = 5 * MB;
+    protected boolean checkIntegrity = false;
+    private final List<CompletedPart> partETags = Collections.synchronizedList(new ArrayList<>());
+    private List<MultiPartOutputStream> multiPartOutputStreams;
+    private ExecutorServiceResultsHandler executorServiceResultsHandler;
+    private ClosableQueue queue;
+    private int finishedCount = 0;
+    private StreamPart leftoverStreamPart = null;
+    private final Object leftoverStreamPartLock = new Object();
+    private boolean isAborting = false;
+    private static final int MAX_PART_NUMBER = 10000;
+
+    public StreamTransferManager(String bucketName, String putKey, S3Client s3Client, String path) {
+        this.bucketName = bucketName;
+        this.putKey = putKey;
+        this.s3Client = s3Client;
+        this.path = path;
+    }
+
+    public StreamTransferManager numStreams(int numStreams) {
+        ensureCanSet();
+        if (numStreams < 1) {
+            throw new IllegalArgumentException("There must be at least one stream");
+        }
+        this.numStreams = numStreams;
+        return this;
+    }
+
+    public StreamTransferManager numUploadThreads(int numUploadThreads) {
+        ensureCanSet();
+        if (numUploadThreads < 1) {
+            throw new IllegalArgumentException("There must be at least one upload thread");
+        }
+        this.numUploadThreads = numUploadThreads;
+        return this;
+    }
+
+    public StreamTransferManager queueCapacity(int queueCapacity) {
+        ensureCanSet();
+        if (queueCapacity < 1) {
+            throw new IllegalArgumentException("The queue capacity must be at least 1");
+        }
+        this.queueCapacity = queueCapacity;
+        return this;
+    }
+
+    public StreamTransferManager partSize(long partSize) {
+        ensureCanSet();
+        partSize *= MB;
+        if (partSize < MultiPartOutputStream.S3_MIN_PART_SIZE) {
+            throw new IllegalArgumentException(String.format("The given part size (%d) is less than 5 MB.", partSize));
+        }
+        if (partSize > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(String
+                    .format("The given part size (%d) is too large as it does not fit in a 32 bit int", partSize));
+        }
+        this.partSize = (int) partSize;
+        return this;
+    }
+
+    public StreamTransferManager checkIntegrity(boolean checkIntegrity) {
+        ensureCanSet();
+        if (checkIntegrity) {
+            Utils.md5(); // check that algorithm is available
+        }
+        this.checkIntegrity = checkIntegrity;
+        return this;
+    }
+
+    private void ensureCanSet() {
+        if (queue != null) {
+            abort();
+            throw new IllegalStateException("Setters cannot be called after getMultiPartOutputStreams");
+        }
+    }
+
+    public List<MultiPartOutputStream> getMultiPartOutputStreams() {
+        if (multiPartOutputStreams != null) {
+            return multiPartOutputStreams;
+        }
+
+        queue = new ClosableQueue(queueCapacity);
+        log.debug("Initiating multipart upload to {}/{}", bucketName, putKey);
+
+        CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
+                .bucket(bucketName).key(putKey).build();
+
+        CreateMultipartUploadResponse response = s3Client.createMultipartUpload(createMultipartUploadRequest);
+        uploadId = response.uploadId();
+
+        log.info("Initiated multipart upload to {}/{} with full ID {}", bucketName, putKey, uploadId);
+        try {
+            multiPartOutputStreams = new ArrayList<>();
+            ExecutorService threadPool = Executors.newFixedThreadPool(numUploadThreads);
+
+            int partNumberStart = 1;
+
+            for (int i = 0; i < numStreams; i++) {
+                int partNumberEnd = (i + 1) * MAX_PART_NUMBER / numStreams + 1;
+                MultiPartOutputStream multiPartOutputStream = new MultiPartOutputStream(partNumberStart, partNumberEnd,
+                        partSize, queue);
+                partNumberStart = partNumberEnd;
+                multiPartOutputStreams.add(multiPartOutputStream);
+            }
+
+            executorServiceResultsHandler = new ExecutorServiceResultsHandler(threadPool);
+            for (int i = 0; i < numUploadThreads; i++) {
+                executorServiceResultsHandler.submit(new UploadTask());
+            }
+            executorServiceResultsHandler.finishedSubmitting();
+        } catch (Exception e) {
+            throw abort(e);
+        }
+
+        return multiPartOutputStreams;
+    }
+
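+    /**
+     * Blocks until every upload thread has finished, uploads any remaining 'leftover' part, and then asks S3 to
+     * assemble the uploaded parts into the final object. The parts are sorted by part number first because S3
+     * requires them to be listed in ascending order when completing a multipart upload.
+     */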
+    public void complete() {
+        try {
+            log.debug("{}: Waiting for pool termination", this);
+            executorServiceResultsHandler.awaitCompletion();
+            log.debug("{}: Pool terminated", this);
+            if (leftoverStreamPart != null) {
+                log.info("{}: Uploading leftover stream {}", this, leftoverStreamPart);
+                uploadStreamPart(leftoverStreamPart);
+                log.debug("{}: Leftover uploaded", this);
+            }
+            log.debug("{}: Completing", this);
+
+            // S3 expects the parts in ascending part number order when completing the upload.
+            List<CompletedPart> sortedParts = new ArrayList<>(partETags);
+            sortedParts.sort(Comparator.comparing(CompletedPart::partNumber));
+
+            CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
+                    .parts(sortedParts)
+                    .build();
+
+            CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
+                    .bucket(bucketName).key(putKey).uploadId(uploadId).multipartUpload(completedMultipartUpload)
+                    .build();
+
+            s3Client.completeMultipartUpload(completeMultipartUploadRequest);
+
+            log.info("{}: Completed", this);
+        } catch (IntegrityCheckException e) {
+            // Nothing to abort. Upload has already finished.
+            throw e;
+        } catch (Exception e) {
+            throw abort(e);
+        }
+    }
+
+    /**
+     * Aborts the upload and rethrows the argument, wrapped in a RuntimeException if
+     * necessary. Write {@code throw abort(e)} to make it clear to the compiler and
+     * readers that the code stops here.
+     */
+    public RuntimeException abort(Throwable t) {
+        if (!isAborting) {
+            log.error("Aborting {} due to error: {}", this, t.toString());
+        }
+        abort();
+        if (t instanceof Error) {
+            throw (Error) t;
+        } else if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        } else if (t instanceof InterruptedException) {
+            throw Utils.runtimeInterruptedException((InterruptedException) t);
+        } else {
+            throw new RuntimeException(t);
+        }
+    }
+
+    /**
+     * Aborts the upload. Repeated calls have no effect.
+     */
+    public void abort() {
+        synchronized (this) {
+            if (isAborting) {
+                return;
+            }
+            isAborting = true;
+        }
+        if (executorServiceResultsHandler != null) {
+            executorServiceResultsHandler.abort();
+        }
+        if (queue != null) {
+            queue.close();
+        }
+        if (uploadId != null) {
+            log.debug("{}: Aborting", this);
+            AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
+                    .bucket(bucketName).key(putKey).uploadId(uploadId).build();
+
+            s3Client.abortMultipartUpload(abortMultipartUploadRequest);
+            log.info("{}: Aborted", this);
+        }
+    }
+
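+    /**
+     * Consumer side of the part queue: repeatedly takes {@link StreamPart}s off the shared queue and uploads them,
+     * stopping once a poison pill has been seen for every stream. Parts smaller than 5 MB are not uploaded
+     * immediately; they are merged into a single 'leftover' part that is either uploaded here once it grows past
+     * 5 MB or uploaded last by {@link #complete()}.
+     */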
+    private class UploadTask implements Callable<Void> {
+
+        @Override
+        public Void call() {
+            try {
+                while (true) {
+                    StreamPart part;
+                    // noinspection SynchronizeOnNonFinalField
+                    synchronized (queue) {
+                        if (finishedCount < multiPartOutputStreams.size()) {
+                            part = queue.take();
+                            if (part == StreamPart.POISON) {
+                                finishedCount++;
+                                continue;
+                            }
+                        } else {
+                            break;
+                        }
+                    }
+                    if (part.size() < MultiPartOutputStream.S3_MIN_PART_SIZE) {
+                        /*
+                         * Each stream does its best to avoid producing parts smaller than 5 MB, but if
+                         * a user doesn't write that much data there's nothing that can be done. These
+                         * are considered 'leftover' parts, and must be merged with other leftovers to
+                         * try producing a part bigger than 5 MB which can be uploaded without problems.
+                         * After the threads have completed there may be at most one leftover part
+                         * remaining, which S3 can accept. It is uploaded in the complete() method.
+                         */
+                        log.debug("{}: Received part {} < 5 MB that needs to be handled as 'leftover'", this, part);
+                        StreamPart originalPart = part;
+                        part = null;
+                        synchronized (leftoverStreamPartLock) {
+                            if (leftoverStreamPart == null) {
+                                leftoverStreamPart = originalPart;
+                                log.debug("{}: Created new leftover part {}", this, leftoverStreamPart);
+                            } else {
+                                /*
+                                 * Try to preserve order within the data by appending the part with the higher
+                                 * number to the part with the lower number. This is not meant to produce a
+                                 * perfect solution: if the client is producing multiple leftover parts all bets
+                                 * are off on order.
+                                 */
+                                if (leftoverStreamPart.getPartNumber() > originalPart.getPartNumber()) {
+                                    StreamPart temp = originalPart;
+                                    originalPart = leftoverStreamPart;
+                                    leftoverStreamPart = temp;
+                                }
+                                leftoverStreamPart.getOutputStream().append(originalPart.getOutputStream());
+                                log.debug("{}: Merged with existing leftover part to create {}", this,
+                                        leftoverStreamPart);
+                                if (leftoverStreamPart.size() >= MultiPartOutputStream.S3_MIN_PART_SIZE) {
+                                    log.debug("{}: Leftover part can now be uploaded as normal and reset", this);
+                                    part = leftoverStreamPart;
+                                    leftoverStreamPart = null;
+                                }
+                            }
+                        }
+                    }
+                    if (part != null) {
+                        uploadStreamPart(part);
+                    }
+                }
+            } catch (Exception t) {
+                throw abort(t);
+            }
+
+            return null;
+        }
+    }
+
+    private void uploadStreamPart(StreamPart streamPart) {
+        log.debug("{}: Uploading {}", this, streamPart);
+
+        UploadPartRequest.Builder uploadRequestBuilder = UploadPartRequest.builder()
+                .bucket(bucketName)
+                .key(putKey)
+                .uploadId(uploadId)
+                .partNumber(streamPart.getPartNumber());
+        if (checkIntegrity) {
+            // Send the locally computed MD5 so S3 can reject the part if it was corrupted in transit.
+            uploadRequestBuilder.contentMD5(streamPart.getMD5Digest());
+        }
+
+        UploadPartResponse response = s3Client.uploadPart(uploadRequestBuilder.build(), RequestBody
+                .fromInputStream(streamPart.getInputStream(), streamPart.size()));
+        String eTag = response.eTag();
+        CompletedPart part = CompletedPart.builder().partNumber(streamPart.getPartNumber()).eTag(eTag).build();
+
+        partETags.add(part);
+
+        log.info("{}: Finished uploading {}", this, streamPart);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[Manager uploading to %s/%s with id %s]", bucketName, putKey,
+                Utils.skipMiddle(String.valueOf(uploadId), 21));
+    }
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java
new file mode 100644
index 0000000000..7cfbcba90e
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/stream/Utils.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.stream;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Miscellaneous useful functions.
+ */
+class Utils {
+
+    /**
+     * Lets you avoid dealing with {@code InterruptedException} as a checked exception and ensures
+     * that the interrupted status of the thread is not lost.
+     * Write {@code throw runtimeInterruptedException(e)} to make it clear to the
+     * compiler and readers that the code stops here.
+     */
+    public static RuntimeException runtimeInterruptedException(InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return new RuntimeException(e);
+    }
+
+    /**
+     * Shortens the given string to the given length by replacing the middle with "...",
+     * unless the string is already short enough or almost short enough, in which case it is returned unmodified.
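+     * For example, {@code skipMiddle("0123456789abcdef", 11)} keeps the first and last four characters and
+     * returns {@code "0123...cdef"} (illustrative values).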
+     */
+    public static String skipMiddle(String string, int length) {
+        int inputLength = string.length();
+        if (inputLength < length * 1.1) {
+            return string;
+        }
+        int sideLength = (length - 3) / 2;
+        StringBuilder builder = new StringBuilder(length);
+        builder.append(string, 0, sideLength);
+        builder.append("...");
+        builder.append(string, inputLength - sideLength, inputLength);
+        return builder.toString();
+    }
+
+    public static MessageDigest md5() {
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            md.reset();
+            return md;
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml
new file mode 100644
index 0000000000..6e51f4a11d
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml
@@ -0,0 +1,21 @@
+simple-sample-pipeline:
+  workers: 4
+  delay: "5000"
+  source:
+    random:
+  sink:
+    - s3:
+        aws:
+          region: us-east-1
+          sts_role_arn: arn:aws:iam::701869769844:role/s3-full-access
+        bucket: dataprepper
+        key_path_prefix: logdata
+        object:
+          file_pattern: logs-${yyyy-MM-dd}
+        threshold:
+          event_count: 200
+          byte_capacity: 2500
+          event_collection_duration: 20
+        codec:
+          json:
+        temporary_storage: local_file
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 402748b71d..a3c749d922 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -10,8 +10,7 @@ pluginManagement {
     }
 }
 
-//rootProject.name = 'opensearch-data-prepper'
-rootProject.name = 'data-prepper'
+rootProject.name = 'opensearch-data-prepper'
 
 dependencyResolutionManagement {
     versionCatalogs {
@@ -93,5 +92,4 @@ include 'release:maven'
 include 'e2e-test:peerforwarder'
 include 'rss-source'
 include 'data-prepper-plugins:s3-sink'
-findProject(':data-prepper-plugins:s3-sink')?.name = 's3-sink'