From be5a4f526d5bffb4c91c0435bdd1d208f64ab586 Mon Sep 17 00:00:00 2001 From: Deepak Sahu Date: Wed, 19 Apr 2023 20:04:15 +0530 Subject: [PATCH] Adding the S3 sink plugin. Contributes to #1048 Signed-off-by: Deepak Sahu --- .../plugins/s3keyindex/S3ObjectIndex.java | 135 ++++++++++++++ .../plugins/s3keyindex/S3ObjectIndexTest.java | 116 ++++++++++++ data-prepper-plugins/s3-sink/README.md | 65 +++++++ data-prepper-plugins/s3-sink/build.gradle | 66 +++++++ .../dataprepper/plugins/sink/S3Sink.java | 91 ++++++++++ .../plugins/sink/S3SinkConfig.java | 99 +++++++++++ .../plugins/sink/S3SinkService.java | 78 +++++++++ .../plugins/sink/S3SinkWorker.java | 129 ++++++++++++++ .../plugins/sink/accumulator/BufferType.java | 61 +++++++ .../sink/accumulator/BufferTypeOptions.java | 39 +++++ .../sink/accumulator/InMemoryBuffer.java | 46 +++++ .../sink/accumulator/LocalFileBuffer.java | 84 +++++++++ .../plugins/sink/accumulator/ObjectKey.java | 61 +++++++ .../dataprepper/plugins/sink/codec/Codec.java | 20 +++ .../plugins/sink/codec/JsonCodec.java | 26 +++ .../AwsAuthenticationOptions.java | 74 ++++++++ .../sink/configuration/BucketOptions.java | 39 +++++ .../sink/configuration/ObjectKeyOptions.java | 32 ++++ .../sink/configuration/ThresholdOptions.java | 57 ++++++ .../plugins/sink/S3SinkConfigTest.java | 53 ++++++ .../plugins/sink/S3SinkServiceTest.java | 116 ++++++++++++ .../dataprepper/plugins/sink/S3SinkTest.java | 165 ++++++++++++++++++ .../plugins/sink/S3SinkWorkerTest.java | 154 ++++++++++++++++ .../accumulator/BufferTypeOptionsTest.java | 17 ++ .../sink/accumulator/BufferTypeTest.java | 89 ++++++++++ .../sink/accumulator/InMemoryBufferTest.java | 156 +++++++++++++++++ .../sink/accumulator/LocalFileBufferTest.java | 120 +++++++++++++ .../sink/accumulator/ObjectKeyTest.java | 84 +++++++++ .../plugins/sink/codec/JsonCodecTest.java | 47 +++++ .../AwsAuthenticationOptionsTest.java | 62 +++++++ .../sink/configuration/BucketOptionsTest.java | 24 +++ .../configuration/ObjectKeyOptionsTest.java | 25 +++ .../configuration/ThresholdOptionsTest.java | 32 ++++ settings.gradle | 3 +- 34 files changed, 2464 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndex.java create mode 100644 data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexTest.java create mode 100644 data-prepper-plugins/s3-sink/README.md create mode 100644 data-prepper-plugins/s3-sink/build.gradle create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferType.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKey.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorkerTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptionsTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBufferTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptionsTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptionsTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptionsTest.java diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndex.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndex.java new file mode 100644 index 0000000000..84a6cd3c9a --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndex.java @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.s3keyindex; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Class responsible for creation of s3 key pattern based on date time stamp + */ +public class S3ObjectIndex { + + private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\%{"; + + // For matching a string that begins with a "%{" and ends with a "}". + // For a string like "data-prepper-%{yyyy-MM-dd}", "%{yyyy-MM-dd}" is matched. + private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}"; + + // For matching a string enclosed by "%{" and "}". + // For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched. + private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}"; + + private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID()); + + S3ObjectIndex() { + } + + /** + * Create Object Name with date,time and UniqueID prepended. + */ + public static String getObjectNameWithDateTimeId(final String indexAlias) { + DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); + String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; + return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + "-" + getTimeNanos() + "-" + + UUID.randomUUID(); + } + + /** + * Create Object path prefix. + */ + public static String getObjectPathPrefix(final String indexAlias) { + DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); + String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; + return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix; + } + + /** + * Creates epoch seconds. + */ + public static long getTimeNanos() { + Instant time = Instant.now(); + final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000; + long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano(); + return currentTimeNanos; + } + + /** + * Validate the index with the regular expression pattern. Throws exception if validation fails + */ + public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { + final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); + final Matcher timePatternMatcher = pattern.matcher(indexAlias); + if (timePatternMatcher.find()) { + final String timePattern = timePatternMatcher.group(1); + if (timePatternMatcher.find()) { // check if there is a one more match. + throw new IllegalArgumentException("An index only allows one date-time pattern."); + } + if (timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)) { // check if it is a nested pattern such as + // "data-prepper-%{%{yyyy.MM.dd}}" + throw new IllegalArgumentException("An index doesn't allow nested date-time patterns."); + } + validateTimePatternIsAtTheEnd(indexAlias, timePattern); + validateNoSpecialCharsInTimePattern(timePattern); + validateTimePatternGranularity(timePattern); + return DateTimeFormatter.ofPattern(timePattern); + } + return null; + } + + /** + * Data Prepper only allows time pattern as a suffix. + */ + private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { + if (!indexAlias.endsWith(timePattern + "}")) { + throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); + } + } + + /** + * Special characters can cause failures in creating indexes. + */ + private static final Set INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); + + public static void validateNoSpecialCharsInTimePattern(String timePattern) { + boolean containsInvalidCharacter = timePattern.chars().mapToObj(c -> (char) c) + .anyMatch(character -> INVALID_CHARS.contains(character)); + if (containsInvalidCharacter) { + throw new IllegalArgumentException( + "Index time pattern contains one or multiple special characters: " + INVALID_CHARS); + } + } + + /** + * Validates the time pattern, support creating indexes with time patterns that are too granular + * hour, minute and second + */ + private static final Set UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); + + public static void validateTimePatternGranularity(String timePattern) { + boolean containsUnsupportedTimeSymbol = timePattern.chars().mapToObj(c -> (char) c) + .anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character)); + if (containsUnsupportedTimeSymbol) { + throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: " + + UNSUPPORTED_TIME_GRANULARITY_CHARS); + } + } + + /** + * Returns the current UTC Date and Time + */ + public static ZonedDateTime getCurrentUtcTime() { + return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexTest.java new file mode 100644 index 0000000000..402ae84ffc --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexTest.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.s3keyindex; + +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import org.junit.jupiter.api.Test; + +class S3ObjectIndexTest { + + @Test + void testObjectDateTimePatterns_not_equal() throws IllegalArgumentException { + + String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); + String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); + assertFalse(actualIndex.contains(expectedIndex)); + } + + @Test + void testgetObjectPathPrefix_not_equal() throws IllegalArgumentException { + + String expectedIndex = S3ObjectIndex.getObjectPathPrefix("events-%{yyyy}"); + String actualIndex = S3ObjectIndex.getObjectPathPrefix("events-%{yyyy}"); + assertTrue(actualIndex.contains(expectedIndex)); + } + + @Test + void testObjectTimePattern_Exceptional_time_TooGranular() throws IllegalArgumentException { + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-AA-dd}"); + }); + } + + @Test + void testObjectTimePatterns_equal() throws IllegalArgumentException { + + DateTimeFormatter expectedIndex = S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}"); + DateTimeFormatter actualIndex = S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}"); + assertEquals(actualIndex.toString(), expectedIndex.toString()); + } + + @Test + void test_utc_current_time() throws IllegalArgumentException { + + ZonedDateTime expectedIndex = S3ObjectIndex.getCurrentUtcTime(); + ZonedDateTime actualIndex = S3ObjectIndex.getCurrentUtcTime(); + + assertEquals(expectedIndex.getDayOfYear(), actualIndex.getDayOfYear()); + assertEquals(expectedIndex.getDayOfMonth(), actualIndex.getDayOfMonth()); + assertEquals(expectedIndex.getDayOfWeek(), actualIndex.getDayOfWeek()); + } + + @Test + void testObjectTimePattern_Exceptional_TooGranular() { + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-AA-ddThh:mm}"); + }); + } + + @Test + void testObjectTimePattern_Exceptional_at_theEnd() { + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("events-%{yyy{MM}dd}"); + }); + } + + @Test + void testObject_allows_one_date_time_pattern_Exceptional() { + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}-%{yyyy-MM-dd}"); + }); + } + + @Test + void testObject_nested_pattern_Exceptional() { + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getDatePatternFormatter("bucket-name-\\%{\\%{yyyy.MM.dd}}"); + }); + } + + @Test + void testObject_null_time_pattern() throws NullPointerException { + assertNull(S3ObjectIndex.getDatePatternFormatter("bucket-name")); + } + + @Test + void testObjectAliasWithDatePrefix_Exceptional_time_TooGranular() throws IllegalArgumentException { + assertThrows(IllegalArgumentException.class, () -> { + S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-AA-dd}"); + }); + } + + @Test + void testObjectAliasWithDatePrefix_equal() throws IllegalArgumentException { + + String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); + String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); + assertNotEquals(actualIndex.toString(), expectedIndex.toString()); + } + + @Test + void test_default_constructor() { + S3ObjectIndex object = new S3ObjectIndex(); + assertNotNull(object); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md new file mode 100644 index 0000000000..17a5b0b4e2 --- /dev/null +++ b/data-prepper-plugins/s3-sink/README.md @@ -0,0 +1,65 @@ +# S3 Sink + +This is the Data Prepper S3 sink plugin that sends records to an S3 bucket via S3Client. + +The S3 sink plugin supports OpenSearch 2.0.0 and greater. + +## Usages + +The s3 sink should be configured as part of Data Prepper pipeline yaml file. + +## Configuration Options + +``` +pipeline: + ... + sink: + - s3: + aws: + region: us-east-1 + sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper + sts_header_overrides: + max_retries: 5 + bucket: + name: bucket_name + object_key: + path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/ + threshold: + event_count: 2000 + maximum_size: 50mb + event_collect: 15s + codec: + ndjson: +``` + +## Configuration + +- `aws_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). Defaults to `none`. + +- `aws_sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). Defaults to `none`. + +- `aws_sts_header_overrides` (Optional) : An optional map of header overrides to make when assuming the IAM role for the sink plugin. Defaults to `none`. + +- `max_retries` (Optional) : An integer value indicates the maximum number of times that single request should be retired in-order to ingest data to amazon s3. Defaults to `5`. + +- `bucket` (Required) : Object storage built to store and retrieve any amount of data from anywhere, User must provide bucket name. + +- `object_key` (Optional) : It contains `path_prefix` and `file_pattern`. Defaults to s3 object `events-%{yyyy-MM-dd'T'hh-mm-ss}` inside bucket root directory. + +- `path_prefix` (Optional) : path_prefix nothing but directory structure inside bucket in-order to store objects. Defaults to `none`. + +- `event_count` (Required) : An integer value indicates the maximum number of events required to ingest into s3-bucket as part of threshold. + +- `maximum_size` (Optional) : A String representing the count or size of bytes required to ingest into s3-bucket as part of threshold. Defaults to `50mb`. + +- `event_collect` (Required) : A String representing how long events should be collected before ingest into s3-bucket as part of threshold. All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms"). + +- `buffer_type` (Optional) : Records stored temporary before flushing into s3 bucket. Possible values are `local_file` and `in_memory`. Defaults to `in_memory`. + + +## Developer Guide + +This plugin is compatible with Java 8. See + +- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) +- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle new file mode 100644 index 0000000000..eb80b6175a --- /dev/null +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(path: ':data-prepper-plugins:common') + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.apache.commons:commons-compress:1.21' + implementation 'joda-time:joda-time:2.11.1' + implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' + implementation 'org.mapdb:mapdb:3.0.8' + implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.7.10' + implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.7.10' + implementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation project(':data-prepper-test-common') +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket') + systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region') + systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url') + + filter { + includeTestsMatching '*IT' + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java new file mode 100644 index 0000000000..8ca5f51ac9 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import java.util.Collection; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class of s3-sink plugin. It is responsible for receive the collection of + * {@link Event} and upload to amazon s3 based on thresholds configured. + */ +@DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class) +public class S3Sink extends AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private final S3SinkConfig s3SinkConfig; + private final Codec codec; + private S3SinkWorker sinkWorker; + private volatile boolean sinkInitialized; + private S3SinkService s3SinkService; + + /** + * @param pluginSetting + * @param s3SinkConfig + * @param pluginFactory + */ + @DataPrepperPluginConstructor + public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, + final PluginFactory pluginFactory) { + super(pluginSetting); + this.s3SinkConfig = s3SinkConfig; + final PluginModel codecConfiguration = s3SinkConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), + codecConfiguration.getPluginSettings()); + codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + sinkInitialized = Boolean.FALSE; + } + + @Override + public boolean isReady() { + return sinkInitialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Invalid plugin configuration, Hence failed to initialize s3-sink plugin."); + this.shutdown(); + throw e; + } catch (Exception e) { + LOG.error("Failed to initialize s3-sink plugin."); + this.shutdown(); + throw e; + } + } + + /** + * Initialize {@link S3SinkService} and {@link S3SinkWorker} + */ + private void doInitializeInternal() { + s3SinkService = new S3SinkService(s3SinkConfig); + sinkWorker = new S3SinkWorker(s3SinkService.createS3Client(), s3SinkConfig, codec); + sinkInitialized = Boolean.TRUE; + } + + @Override + public void doOutput(final Collection> records) { + if (records.isEmpty()) { + return; + } + s3SinkService.processRecords(records); + s3SinkService.accumulateBufferEvents(sinkWorker); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java new file mode 100644 index 0000000000..ecb3ababcd --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +/** + * An implementation class of s3 sink configuration + */ +public class S3SinkConfig { + + private static final int DEFAULT_CONNECTION_RETRIES = 5; + private static final int DEFAULT_UPLOAD_RETRIES = 5; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("bucket") + @NotNull + @Valid + private BucketOptions bucketOptions; + + @JsonProperty("threshold") + @NotNull + private ThresholdOptions thresholdOptions; + + @JsonProperty("codec") + @NotNull + private PluginModel codec; + + @JsonProperty("buffer_type") + private BufferTypeOptions bufferType = BufferTypeOptions.INMEMORY; + + private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; + + @JsonProperty("max_retries") + private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES; + + /** + * Aws Authentication configuration Options + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + /** + * Threshold configuration Options + */ + public ThresholdOptions getThresholdOptions() { + return thresholdOptions; + } + + /** + * S3 bucket configuration Options + */ + public BucketOptions getBucketOptions() { + return bucketOptions; + } + + /** + * Sink codec configuration Options + */ + public PluginModel getCodec() { + return codec; + } + + /** + * Buffer type configuration Options + */ + public BufferTypeOptions getBufferType() { + return bufferType; + } + + /** + * S3 client connection retries configuration Options + */ + public int getMaxConnectionRetries() { + return maxConnectionRetries; + } + + /** + * S3 object upload retries configuration Options + */ + public int getMaxUploadRetries() { + return maxUploadRetries; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java new file mode 100644 index 0000000000..b79dc6a934 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * Class responsible for taking an {@link S3SinkConfig} and creating all the necessary + * {@link S3Client} and event accumulate operations. + */ +public class S3SinkService { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class); + private final S3SinkConfig s3SinkConfig; + private static final int EVENT_QUEUE_SIZE = 10000000; + private BlockingQueue eventQueue; + private final Lock reentrantLock; + + /** + * @param s3SinkConfig s3 sink related configuration. + */ + public S3SinkService(final S3SinkConfig s3SinkConfig) { + this.s3SinkConfig = s3SinkConfig; + eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); + reentrantLock = new ReentrantLock(); + } + + /** + * @param records received buffered records add into queue. + */ + public void processRecords(Collection> records) { + for (final Record recordData : records) { + Event event = recordData.getData(); + eventQueue.add(event); + } + } + + /** + * @param worker {@link S3SinkWorker} accumulate buffer events + */ + public void accumulateBufferEvents(S3SinkWorker worker) { + reentrantLock.lock(); + try { + worker.bufferAccumulator(eventQueue); + } catch (Exception e) { + LOG.error("Exception while accumulate buffer events: ", e); + } finally { + reentrantLock.unlock(); + } + } + + /** + * @return {@link S3Client} + */ + public S3Client createS3Client() { + LOG.info("Creating S3 client"); + return S3Client.builder().region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(RetryPolicy.builder().numRetries(s3SinkConfig.getMaxConnectionRetries()).build()) + .build()) + .build(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java new file mode 100644 index 0000000000..a7b57c00c4 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import java.io.IOException; +import java.util.NavigableSet; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.time.StopWatch; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferType; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileBuffer; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.services.s3.S3Client; + +/** + * Class responsible for threshold check and instantiation of {@link BufferType} + */ +public class S3SinkWorker { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + private final S3Client s3Client; + private final S3SinkConfig s3SinkConfig; + private final Codec codec; + private final int numEvents; + private final ByteCount byteCapacity; + private final long duration; + + /** + * @param s3Client + * @param s3SinkConfig + * @param codec + */ + public S3SinkWorker(final S3Client s3Client, final S3SinkConfig s3SinkConfig, final Codec codec) { + this.s3Client = s3Client; + this.s3SinkConfig = s3SinkConfig; + this.codec = codec; + numEvents = s3SinkConfig.getThresholdOptions().getEventCount(); + byteCapacity = s3SinkConfig.getThresholdOptions().getMaximumSize(); + duration = s3SinkConfig.getThresholdOptions().getEventCollect().getSeconds(); + } + + /** + * Accumulates data from buffer and store in local-file/in-memory. + * + * @param eventQueue + */ + public void bufferAccumulator(BlockingQueue eventQueue) { + boolean isFileUploadedToS3 = Boolean.FALSE; + DB eventDb = null; + NavigableSet bufferedEventSet = null; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + try { + StopWatch watch = new StopWatch(); + watch.start(); + eventDb = DBMaker.memoryDB().make(); + bufferedEventSet = eventDb.treeSet("set").serializer(Serializer.STRING).createOrOpen(); + int data = 0; + while (thresholdsCheck(data, watch, byteCount)) { + if (!eventQueue.isEmpty()) { + Event event = eventQueue.take(); + String jsonSerEvent = codec.parse(event); + byteCount += jsonSerEvent.getBytes().length; + bufferedEventSet.add(codec.parse(event).concat(System.lineSeparator())); + eventCount++; + data++; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + } + eventDb.commit(); + LOG.info( + "Snapshot info : Byte_capacity = {} Bytes, Event_count = {} Records & Event_collection_duration = {} Sec", + byteCount, eventCount, eventCollectionDuration); + + if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) { + isFileUploadedToS3 = new LocalFileBuffer(s3Client, s3SinkConfig).localFileAccumulate(bufferedEventSet); + } else { + isFileUploadedToS3 = new InMemoryBuffer(s3Client, s3SinkConfig).inMemoryAccumulate(bufferedEventSet); + } + + if (isFileUploadedToS3) { + LOG.info("Snapshot uploaded successfully"); + } else { + LOG.info("Snapshot upload failed"); + } + + } catch (InterruptedException e) { + LOG.error("Exception while storing recoreds to buffer, or upload object to Amazon s3 bucket", e); + Thread.currentThread().interrupt(); + } catch (IOException e) { + LOG.error("Exception while json serialization", e); + } finally { + if (eventDb != null && !eventDb.isClosed()) { + eventDb.close(); + } + } + } + + /** + * Toggle the flag based on the threshold limits. If flag is false write to in-memory/local-file. + * + * @param eventCount + * @param watch + * @param byteCount + * @return + */ + private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { + if (eventCount > 0) { + return eventCount < numEvents && watch.getTime(TimeUnit.SECONDS) < duration + && byteCount < byteCapacity.getBytes(); + } else { + return watch.getTime(TimeUnit.SECONDS) < duration && byteCount < byteCapacity.getBytes(); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferType.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferType.java new file mode 100644 index 0000000000..39f0c2f27c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferType.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +/** + * Interface for building buffer types. + */ +public interface BufferType { + + public static final Logger LOG = LoggerFactory.getLogger(BufferType.class); + + /** + * Upload accumulated data to amazon s3 and perform retry in-case any issue occurred, based on + * max_upload_retries configuration. + * + * @param s3SinkConfig + * @param s3Client + * @param requestBody + * @return + * @throws InterruptedException + */ + public default boolean uploadToAmazonS3(S3SinkConfig s3SinkConfig, S3Client s3Client, RequestBody requestBody) + throws InterruptedException { + + final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig); + final String namePattern = ObjectKey.objectFileName(s3SinkConfig); + final String bucketName = s3SinkConfig.getBucketOptions().getBucketName(); + + final String key = (pathPrefix != null && !pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern; + boolean isFileUploadedToS3 = Boolean.FALSE; + int retryCount = s3SinkConfig.getMaxUploadRetries(); + do { + try { + PutObjectRequest request = PutObjectRequest.builder().bucket(bucketName).key(key).build(); + s3Client.putObject(request, requestBody); + isFileUploadedToS3 = Boolean.TRUE; + } catch (AwsServiceException | SdkClientException e) { + LOG.error("Exception occurred while upload file {} to amazon s3 bucket. Retry count : {} exception:", + namePattern, retryCount, e); + --retryCount; + if (retryCount == 0) { + return isFileUploadedToS3; + } + Thread.sleep(5000); + } + } while (!isFileUploadedToS3); + return isFileUploadedToS3; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java new file mode 100644 index 0000000000..9d9134fc2e --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Defines all the buffer types enumerations. + */ +public enum BufferTypeOptions { + + INMEMORY("in_memory", new InMemoryBuffer()), + LOCALFILE("local_file", new LocalFileBuffer()); + + private final String option; + private final BufferType bufferType; + private static final Map OPTIONS_MAP = Arrays.stream(BufferTypeOptions.values()) + .collect(Collectors.toMap(value -> value.option, value -> value)); + + BufferTypeOptions(final String option, final BufferType bufferType) { + this.option = option.toLowerCase(); + this.bufferType = bufferType; + } + + public BufferType getBufferType() { + return bufferType; + } + + @JsonCreator + static BufferTypeOptions fromOptionValue(final String option) { + return OPTIONS_MAP.get(option.toLowerCase()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java new file mode 100644 index 0000000000..23a92dc52c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import java.util.NavigableSet; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * Implements the in memory buffer type. + */ +public class InMemoryBuffer implements BufferType { + + private S3Client s3Client; + private S3SinkConfig s3SinkConfig; + + public InMemoryBuffer() { + } + + /** + * @param s3Client + * @param s3SinkConfig + */ + public InMemoryBuffer(final S3Client s3Client, final S3SinkConfig s3SinkConfig) { + this.s3Client = s3Client; + this.s3SinkConfig = s3SinkConfig; + } + + /** + * @param bufferedEventSet + * @return boolean + * @throws InterruptedException + */ + public boolean inMemoryAccumulate(final NavigableSet bufferedEventSet) throws InterruptedException { + + StringBuilder eventBuilder = new StringBuilder(); + for (String event : bufferedEventSet) { + eventBuilder.append(event); + } + return uploadToAmazonS3(s3SinkConfig, s3Client, RequestBody.fromString(eventBuilder.toString())); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java new file mode 100644 index 0000000000..4a9c0aec13 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.NavigableSet; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * Implements the local file buffer type. + */ +public class LocalFileBuffer implements BufferType { + + private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class); + private S3Client s3Client; + private S3SinkConfig s3SinkConfig; + private File fileAbsolutePath; + + public LocalFileBuffer() { + } + + /** + * @param s3Client + * @param s3SinkConfig + */ + public LocalFileBuffer(final S3Client s3Client, final S3SinkConfig s3SinkConfig) { + this.s3Client = s3Client; + this.s3SinkConfig = s3SinkConfig; + } + + /** + * @param bufferedEventSet + * @return boolean + * @throws InterruptedException + */ + public boolean localFileAccumulate(final NavigableSet bufferedEventSet) throws InterruptedException { + boolean isFileUploadedToS3 = Boolean.FALSE; + String s3ObjectFileName = ObjectKey.objectFileName(s3SinkConfig); + File file = new File(s3ObjectFileName); + try (BufferedWriter eventWriter = new BufferedWriter(new FileWriter(s3ObjectFileName))) { + for (String event : bufferedEventSet) { + eventWriter.write(event); + } + fileAbsolutePath = file.getAbsoluteFile(); + eventWriter.flush(); + isFileUploadedToS3 = uploadToAmazonS3(s3SinkConfig, s3Client, RequestBody.fromFile(fileAbsolutePath)); + } catch (IOException e) { + LOG.error("Events unable to save into : {}", s3SinkConfig.getBufferType(), e); + } finally { + removeTemporaryFile(); + } + return isFileUploadedToS3; + } + + /** + * Remove local file after successfully upload to Amazon s3 bucket. + */ + private void removeTemporaryFile() { + if (fileAbsolutePath != null) { + try { + boolean isLocalFileDeleted = Files.deleteIfExists(Paths.get(fileAbsolutePath.toString())); + if (isLocalFileDeleted) { + LOG.info("Local file deleted successfully {}", fileAbsolutePath); + } else { + LOG.warn("Local file not deleted {}", fileAbsolutePath); + } + } catch (IOException e) { + LOG.error("Local file unable to deleted {}", fileAbsolutePath, e); + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKey.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKey.java new file mode 100644 index 0000000000..f1181ef9a5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKey.java @@ -0,0 +1,61 @@ +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import java.util.regex.Pattern; +import org.opensearch.dataprepper.plugins.s3keyindex.S3ObjectIndex; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Building the path prefix and name pattern. + * + */ +public class ObjectKey { + + public static final Logger LOG = LoggerFactory.getLogger(ObjectKey.class); + private static final String DEFAULT_CODEC_FILE_EXTENSION = "json"; + private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}"; + private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(TIME_PATTERN_REGULAR_EXPRESSION); + + private ObjectKey() { + } + + /** + * Building path inside bucket based on path_prefix. + * + * @param s3SinkConfig + * @return s3ObjectPath + */ + public static String buildingPathPrefix(final S3SinkConfig s3SinkConfig) { + String pathPrefix = s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix(); + StringBuilder s3ObjectPath = new StringBuilder(); + if (pathPrefix != null && !pathPrefix.isEmpty()) { + String[] pathPrefixList = pathPrefix.split("\\/"); + for (int i = 0; i < pathPrefixList.length; i++) { + if (SIMPLE_DURATION_PATTERN.matcher(pathPrefixList[i]).find()) { + s3ObjectPath.append(S3ObjectIndex.getObjectPathPrefix(pathPrefixList[i]) + "/"); + } else { + s3ObjectPath.append(pathPrefixList[i] + "/"); + } + } + } + return s3ObjectPath.toString(); + } + + /** + * Get the object file name with the extension. + * + * @param s3SinkConfig + * @return + */ + public static String objectFileName(S3SinkConfig s3SinkConfig) { + String configNamePattern = s3SinkConfig.getBucketOptions().getObjectKeyOptions().getNamePattern(); + int extensionIndex = configNamePattern.lastIndexOf('.'); + if (extensionIndex > 0) { + return S3ObjectIndex.getObjectNameWithDateTimeId(configNamePattern.substring(0, extensionIndex)) + "." + + configNamePattern.substring(extensionIndex + 1); + } else { + return S3ObjectIndex.getObjectNameWithDateTimeId(configNamePattern) + "." + DEFAULT_CODEC_FILE_EXTENSION; + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java new file mode 100644 index 0000000000..cf7205a01c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import java.io.IOException; +import org.opensearch.dataprepper.model.event.Event; + +/** + * Each implementation of this class should support parsing a specific type or format of data. See + * sub-classes for examples. + */ +public interface Codec { + /** + * @param event + */ + String parse(Event event) throws IOException; +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java new file mode 100644 index 0000000000..8a768de6e6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import java.io.IOException; +import java.util.Objects; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.event.Event; + +/** + * An implementation of {@link Codec} which serializes to JSON. + */ +@DataPrepperPlugin(name = "ndjson", pluginType = Codec.class) +public class JsonCodec implements Codec { + /** + * Generates a serialized json string of the Event + */ + @Override + public String parse(Event event) throws IOException { + Objects.requireNonNull(event); + return event.toJsonString(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..7567f2b4a4 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import java.util.Map; +import java.util.UUID; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +/** + * An implementation class AWS Authentication configuration + */ +public class AwsAuthenticationOptions { + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + /** + * AWS Region configuration + */ + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + /** + * Aws Credentials Provider configuration + */ + public AwsCredentialsProvider authenticateAwsConfiguration() { + + final AwsCredentialsProvider awsCredentialsProvider; + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + try { + Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); + } + + final StsClient stsClient = StsClient.builder().region(getAwsRegion()).build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()).roleArn(awsStsRoleArn); + + if (awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder.overrideConfiguration( + configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()).build(); + + } else { + awsCredentialsProvider = DefaultCredentialsProvider.create(); + } + return awsCredentialsProvider; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptions.java new file mode 100644 index 0000000000..6562e07ad5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptions.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/** + * An implementation class of bucket name and {@link ObjectKeyOptions} configuration Options + */ +public class BucketOptions { + + @JsonProperty("name") + @NotNull + private String bucketName; + + @JsonProperty("object_key") + private ObjectKeyOptions objectKeyOptions; + + /** + * Read s3 bucket name configuration. + */ + public String getBucketName() { + return bucketName; + } + + /** + * S3 {@link ObjectKeyOptions} configuration Options. + */ + public ObjectKeyOptions getObjectKeyOptions() { + if (objectKeyOptions == null) { + objectKeyOptions = new ObjectKeyOptions(); + } + return objectKeyOptions; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptions.java new file mode 100644 index 0000000000..9e47b7400f --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptions.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * An implementation class of path prefix and file pattern configuration Options + */ +public class ObjectKeyOptions { + private static final String DEFAULT_OBJECT_NAME_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}"; + + @JsonProperty("path_prefix") + private String pathPrefix; + + /** + * S3 index path configuration Option + */ + public String getPathPrefix() { + return pathPrefix; + } + + /** + * Read s3 object index file pattern configuration + */ + public String getNamePattern() { + return DEFAULT_OBJECT_NAME_PATTERN; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java new file mode 100644 index 0000000000..980ad087a3 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import java.time.Duration; +import org.hibernate.validator.constraints.time.DurationMax; +import org.hibernate.validator.constraints.time.DurationMin; +import org.opensearch.dataprepper.model.types.ByteCount; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; + +/** + * An implementation class of s3 index configuration Options + */ +public class ThresholdOptions { + + private static final String DEFAULT_BYTE_CAPACITY = "50mb"; + + @JsonProperty("event_count") + @Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000") + @NotNull + private int eventCount; + + @JsonProperty("maximum_size") + private String maximumSize = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("event_collect") + @DurationMin(seconds = 1) + @DurationMax(seconds = 3600) + @NotNull + private Duration eventCollect; + + /** + * Read event collection duration configuration + */ + public Duration getEventCollect() { + return eventCollect; + } + + /** + * Read byte capacity configuration + */ + public ByteCount getMaximumSize() { + return ByteCount.parse(maximumSize); + } + + /** + * Read the event count configuration + */ + public int getEventCount() { + return eventCount; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java new file mode 100644 index 0000000000..317fb9e656 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfigTest.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNull; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; + +class S3SinkConfigTest { + + private static final int MAX_CONNECTION_RETRIES = 5; + private static final int MAX_UPLOAD_RETRIES = 5; + + @Test + void default_buffer_type_option_test() { + assertThat(new S3SinkConfig().getBufferType(), equalTo(BufferTypeOptions.INMEMORY)); + } + + @Test + void default_max_connection_retries_test() throws NoSuchFieldException, IllegalAccessException { + assertThat(new S3SinkConfig().getMaxConnectionRetries(), equalTo(MAX_CONNECTION_RETRIES)); + } + + @Test + void default_max_upload_retries_test() throws NoSuchFieldException, IllegalAccessException { + assertThat(new S3SinkConfig().getMaxUploadRetries(), equalTo(MAX_UPLOAD_RETRIES)); + } + + @Test + void get_bucket_option_test() { + assertThat(new S3SinkConfig().getBucketOptions(), equalTo(null)); + } + + @Test + void get_threshold_option_test() { + assertThat(new S3SinkConfig().getThresholdOptions(), equalTo(null)); + } + + @Test + void get_AWS_Auth_options_in_sinkconfig_exception() { + assertNull(new S3SinkConfig().getAwsAuthenticationOptions()); + } + + @Test + void get_json_codec_test() { + assertNull(new S3SinkConfig().getCodec()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java new file mode 100644 index 0000000000..10cad90267 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +class S3SinkServiceTest { + + private S3SinkConfig s3SinkConfig; + private ThresholdOptions thresholdOptions; + private AwsAuthenticationOptions awsAuthenticationOptions; + private AwsCredentialsProvider awsCredentialsProvider; + private BucketOptions bucketOptions; + private ObjectKeyOptions objectKeyOptions; + + @BeforeEach + void setUp() throws Exception { + + s3SinkConfig = mock(S3SinkConfig.class); + thresholdOptions = mock(ThresholdOptions.class); + bucketOptions = mock(BucketOptions.class); + objectKeyOptions = mock(ObjectKeyOptions.class); + awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + when(s3SinkConfig.getThresholdOptions()).thenReturn(thresholdOptions); + when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(100); + when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("1kb")); + when(s3SinkConfig.getThresholdOptions().getEventCollect()).thenReturn(Duration.ofSeconds(5)); + when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.LOCALFILE); + + when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions); + when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn("dataprepper"); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn("logdata/"); + + when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of("us-east-1")); + when(awsAuthenticationOptions.authenticateAwsConfiguration()).thenReturn(awsCredentialsProvider); + } + + @Test + void test_call_createS3Client() { + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3Client s3Client = s3SinkService.createS3Client(); + assertNotNull(s3Client); + } + + @Test + void test_call_processRecords() { + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + assertNotNull(s3SinkService); + s3SinkService.processRecords(generateRandomStringEventRecord()); + s3SinkService = mock(S3SinkService.class); + Collection> records = generateRandomStringEventRecord(); + s3SinkService.processRecords(records); + verify(s3SinkService, atLeastOnce()).processRecords(records); + } + + @Test + void test_call_bufferAccumulator() { + Collection> records = generateRandomStringEventRecord(); + S3SinkService service = new S3SinkService(s3SinkConfig); + S3SinkWorker worker = mock(S3SinkWorker.class); + service.processRecords(records); + service.accumulateBufferEvents(worker); + verify(worker, only()).bufferAccumulator(any(BlockingQueue.class)); + } + + private Collection> generateRandomStringEventRecord() { + Collection> records = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + records.add(new Record<>(event)); + } + return records; + } + + private BlockingQueue generateEventQueue() { + BlockingQueue eventQueue = new ArrayBlockingQueue<>(100); + for (int i = 0; i < 50; i++) { + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + eventQueue.add(event); + } + return eventQueue; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java new file mode 100644 index 0000000000..03b9bd0806 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java @@ -0,0 +1,165 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.opensearch.dataprepper.plugins.sink.codec.JsonCodec; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; + +class S3SinkTest { + + private S3SinkConfig s3SinkConfig; + private ThresholdOptions thresholdOptions; + private AwsAuthenticationOptions awsAuthenticationOptions; + private AwsCredentialsProvider awsCredentialsProvider; + private ObjectKeyOptions objectKeyOptions; + private JsonCodec codec; + private S3Sink s3Sink; + private PluginSetting pluginSetting; + private PluginFactory pluginFactory; + private PluginModel pluginModel; + + @BeforeEach + void setUp() { + + s3SinkConfig = mock(S3SinkConfig.class); + thresholdOptions = mock(ThresholdOptions.class); + awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + codec = mock(JsonCodec.class); + objectKeyOptions = mock(ObjectKeyOptions.class); + + pluginSetting = mock(PluginSetting.class); + pluginModel = mock(PluginModel.class); + pluginFactory = mock(PluginFactory.class); + + when(s3SinkConfig.getThresholdOptions()).thenReturn(thresholdOptions); + when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(100); + when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("1kb")); + when(s3SinkConfig.getThresholdOptions().getEventCollect()).thenReturn(Duration.ofSeconds(5)); + + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + + when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of("us-east-1")); + when(awsAuthenticationOptions.authenticateAwsConfiguration()).thenReturn(awsCredentialsProvider); + + when(s3SinkConfig.getCodec()).thenReturn(pluginModel); + when(pluginModel.getPluginName()).thenReturn("json"); + when(pluginFactory.loadPlugin(Codec.class, pluginSetting)).thenReturn(codec); + + when(pluginSetting.getName()).thenReturn("s3"); + when(pluginSetting.getPipelineName()).thenReturn("S3-sink-pipeline"); + } + + @Test + void test_s3_sink_plugin_isReady_true() { + s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory); + s3Sink.doInitialize(); + assertTrue(s3Sink.isReady(), "s3 sink is not ready to work"); + } + + @Test + void test_s3_Sink_plugin_isReady_false() { + s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory); + assertFalse(s3Sink.isReady(), "s3 sink is ready to work"); + } + + @Test + void test_doOutput_with_exception() { + s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory); + when(s3SinkConfig.getThresholdOptions()).thenReturn(null); + assertThrows(NullPointerException.class, s3Sink::doInitialize); + } + + @Test + void test_doOutput_with_data_local_file() { + when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.LOCALFILE); + s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory); + s3Sink.doInitialize(); + s3Sink.doOutput(generateRandomStringEventRecord()); + assertNotNull(s3Sink); + } + + @Test + void test_doOutput_with_data_in_memory() { + when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.INMEMORY); + s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory); + s3Sink.doInitialize(); + s3Sink.doOutput(generateRandomStringEventRecord()); + assertNotNull(s3Sink); + } + + @Test + void test_doOutput_noInteractions_with_s3SinkService() { + Collection> records = generateRandomStringEventRecord(); + S3SinkService s3SinkService = mock(S3SinkService.class); + S3SinkWorker s3Worker = mock(S3SinkWorker.class); + s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory); + s3Sink.doInitialize(); + s3Sink.doOutput(records); + verifyNoInteractions(s3SinkService); + verifyNoInteractions(s3Worker); + } + + @Test + void test_doOutput_invocation_order() { + Collection> records = generateRandomStringEventRecord(); + + S3SinkService s3SinkService = mock(S3SinkService.class); + S3SinkWorker worker = mock(S3SinkWorker.class); + + s3SinkService.createS3Client(); + s3SinkService.processRecords(records); + s3SinkService.accumulateBufferEvents(worker); + + InOrder inOrder = inOrder(s3SinkService); + inOrder.verify(s3SinkService).createS3Client(); + inOrder.verify(s3SinkService).processRecords(records); + inOrder.verify(s3SinkService).accumulateBufferEvents(worker); + + verifyNoMoreInteractions(s3SinkService); + } + + private Collection> generateRandomStringEventRecord() { + + Collection> records = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + records.add(new Record<>(event)); + } + return records; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorkerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorkerTest.java new file mode 100644 index 0000000000..e75b0fb2a3 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorkerTest.java @@ -0,0 +1,154 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileBuffer; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.opensearch.dataprepper.plugins.sink.codec.JsonCodec; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +class S3SinkWorkerTest { + + private S3SinkConfig s3SinkConfig; + private ThresholdOptions thresholdOptions; + private AwsAuthenticationOptions awsAuthenticationOptions; + private AwsCredentialsProvider awsCredentialsProvider; + private BucketOptions bucketOptions; + private ObjectKeyOptions objectKeyOptions; + private JsonCodec codec; + private PluginSetting pluginSetting; + private PluginFactory pluginFactory; + private PluginModel pluginModel; + + @BeforeEach + void setUp() throws Exception { + + s3SinkConfig = mock(S3SinkConfig.class); + thresholdOptions = mock(ThresholdOptions.class); + bucketOptions = mock(BucketOptions.class); + objectKeyOptions = mock(ObjectKeyOptions.class); + awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + codec = mock(JsonCodec.class); + + pluginSetting = mock(PluginSetting.class); + pluginModel = mock(PluginModel.class); + pluginFactory = mock(PluginFactory.class); + + when(s3SinkConfig.getThresholdOptions()).thenReturn(thresholdOptions); + when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(100); + when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("1kb")); + when(s3SinkConfig.getThresholdOptions().getEventCollect()).thenReturn(Duration.ofSeconds(5)); + + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + + when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.LOCALFILE); + + when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions); + when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn("dataprepper"); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn("logdata/"); + + when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of("us-east-1")); + when(awsAuthenticationOptions.authenticateAwsConfiguration()).thenReturn(awsCredentialsProvider); + + when(s3SinkConfig.getCodec()).thenReturn(pluginModel); + when(pluginModel.getPluginName()).thenReturn("json"); + when(pluginFactory.loadPlugin(Codec.class, pluginSetting)).thenReturn(codec); + + when(pluginSetting.getName()).thenReturn("s3"); + when(pluginSetting.getPipelineName()).thenReturn("S3-sink-pipeline"); + } + + @Test + void verify_interactions() throws InterruptedException { + BlockingQueue queue = generateEventQueue(); + + S3SinkWorker worker = mock(S3SinkWorker.class); + worker.bufferAccumulator(queue); + + InMemoryBuffer inMemoryBuffer = mock(InMemoryBuffer.class); + verifyNoInteractions(inMemoryBuffer); + + LocalFileBuffer localFileBuffer = mock(LocalFileBuffer.class); + verifyNoInteractions(localFileBuffer); + } + + @Test + void test_cover_localFile_bufferAccumulator() throws IOException { + when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.LOCALFILE); + when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3Client s3Client = s3SinkService.createS3Client(); + S3SinkWorker worker = new S3SinkWorker(s3Client, s3SinkConfig, codec); + assertNotNull(worker); + worker = mock(S3SinkWorker.class); + BlockingQueue queue = generateEventQueue(); + worker.bufferAccumulator(queue); + verify(worker, atLeastOnce()).bufferAccumulator(queue); + } + + @Test + void test_cover_inMemory_bufferAccumulator() throws IOException { + when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.INMEMORY); + when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3Client s3Client = s3SinkService.createS3Client(); + S3SinkWorker worker = new S3SinkWorker(s3Client, s3SinkConfig, codec); + assertNotNull(worker); + worker = mock(S3SinkWorker.class); + BlockingQueue queue = generateEventQueue(); + worker.bufferAccumulator(queue); + verify(worker, atLeastOnce()).bufferAccumulator(queue); + } + + @Test + void test_cover_exception() throws IOException { + when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.LOCALFILE); + when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); + S3SinkWorker worker = new S3SinkWorker(null, s3SinkConfig, codec); + assertNotNull(worker); + assertThrows(Throwable.class, () -> worker.bufferAccumulator(generateEventQueue())); + } + + private BlockingQueue generateEventQueue() { + BlockingQueue eventQueue = new ArrayBlockingQueue<>(100); + for (int i = 0; i < 50; i++) { + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + eventQueue.add(event); + } + return eventQueue; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptionsTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptionsTest.java new file mode 100644 index 0000000000..a17e625104 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptionsTest.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import static org.junit.Assert.assertNotNull; +import org.junit.jupiter.api.Test; + +class BufferTypeOptionsTest { + + @Test + void test_notNull() { + assertNotNull(BufferTypeOptions.LOCALFILE); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeTest.java new file mode 100644 index 0000000000..15defffa3a --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeTest.java @@ -0,0 +1,89 @@ +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.NavigableSet; +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.mockito.Mock; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; + +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +class BufferTypeTest { + + + private S3Client s3Client; + + private S3SinkConfig s3SinkConfig; + @Mock + private BufferType bufferType; + private String codecFileExtension = null; + private static final String DEFAULT_CODEC_FILE_EXTENSION = "json"; + + @BeforeEach + public void setUp() { + + s3Client = S3Client.builder().region(Region.of("us-east-1")).build(); + String bucket = "dataprepper"; + s3SinkConfig = mock(S3SinkConfig.class); + BucketOptions bucketOptions = mock(BucketOptions.class); + ObjectKeyOptions objectKeyOptions = mock(ObjectKeyOptions.class); + PluginModel pluginModel = mock(PluginModel.class); + when(s3SinkConfig.getCodec()).thenReturn(pluginModel); + + codecFileExtension = s3SinkConfig.getCodec().getPluginName(); + if (codecFileExtension == null || codecFileExtension.isEmpty()) { + codecFileExtension = DEFAULT_CODEC_FILE_EXTENSION; + } + when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions); + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn(bucket); + } + + @Test + void process_upload_to_s3_bucket() throws InterruptedException { + BufferType bufferType = spy(BufferType.class); + NavigableSet bufferedEventSet = generateSet(); + StringBuilder eventBuilder = new StringBuilder(); + for (String event : bufferedEventSet) { + eventBuilder.append(event); + } + Boolean uploadSuccess = bufferType.uploadToAmazonS3(s3SinkConfig,s3Client,RequestBody.fromString(eventBuilder.toString())); + assertEquals(Boolean.TRUE,uploadSuccess); + } + + @Test + void start_should_throw_IllegalStateException_when_buffer_is_null() { + BufferType bufferType = spy(BufferType.class); + + assertThrows(NullPointerException.class, () -> bufferType.uploadToAmazonS3(s3SinkConfig,s3Client,null)); + } + + private NavigableSet generateSet() { + DB eventDb = DBMaker.memoryDB().make(); + NavigableSet bufferedEventSet = eventDb.treeSet("set").serializer(Serializer.STRING).createOrOpen(); + for (int i = 0; i < 5; i++) { + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + bufferedEventSet.add(event.toString()); + } + return bufferedEventSet; + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBufferTest.java new file mode 100644 index 0000000000..6c7ad3ade6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBufferTest.java @@ -0,0 +1,156 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import java.util.NavigableSet; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.S3SinkService; +import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +@ExtendWith(MockitoExtension.class) +class InMemoryBufferTest { + private static final String DEFAULT_CODEC_FILE_EXTENSION = "json"; + @Mock + private S3SinkConfig s3SinkConfig; + @Mock + private BucketOptions bucketOptions; + @Mock + private ObjectKeyOptions objectKeyOptions; + @Mock + private PluginModel pluginModel; + + @BeforeEach + void setUp() throws Exception { + } + + @Test + void verify_interactions_putObject() throws InterruptedException { + NavigableSet bufferedEventSet = generateSet(); + S3Client s3Client = mock(S3Client.class); + + when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions); + when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn("dataprepper"); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn("logdata/"); + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + + InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(s3Client, s3SinkConfig); + inMemoryBuffer.inMemoryAccumulate(bufferedEventSet); + verify(s3Client).putObject(any(PutObjectRequest.class), any(RequestBody.class)); + } + + @Test + void verify_interactions_with_upload() throws InterruptedException { + NavigableSet bufferedEventSet = generateSet(); + InMemoryBuffer inMemoryBuffer = mock(InMemoryBuffer.class); + inMemoryBuffer.inMemoryAccumulate(bufferedEventSet); + verify(inMemoryBuffer, never()).uploadToAmazonS3(any(), any(), any()); + } + + @Test + void verify_interactions_s3Client() { + S3SinkService s3SinkService = mock(S3SinkService.class); + s3SinkService.createS3Client(); + verify(s3SinkService).createS3Client(); + verifyNoMoreInteractions(s3SinkService); + } + + @Test + void test_in_order_invocation() throws InterruptedException { + NavigableSet bufferedEventSet = generateSet(); + + InMemoryBuffer inMemoryBuffer = mock(InMemoryBuffer.class); + inMemoryBuffer.inMemoryAccumulate(bufferedEventSet); + inMemoryBuffer.uploadToAmazonS3(any(), any(), any()); + + S3SinkService s3SinkService = mock(S3SinkService.class); + s3SinkService.createS3Client(); + + InOrder inOrder = inOrder(inMemoryBuffer, s3SinkService); + inOrder.verify(inMemoryBuffer).inMemoryAccumulate(bufferedEventSet); + inOrder.verify(inMemoryBuffer).uploadToAmazonS3(any(), any(), any()); + inOrder.verify(s3SinkService).createS3Client(); + + verifyNoMoreInteractions(inMemoryBuffer); + verifyNoMoreInteractions(s3SinkService); + } + + @Test + void test_in_memoryAccumulate_with_s3Upload_success() throws InterruptedException { + + S3SinkService s3SinkService = mock(S3SinkService.class); + S3Client s3Client = mock(S3Client.class); + + when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions); + when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn("dataprepper"); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn("logdata/"); + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + + InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(s3Client, s3SinkConfig); + assertNotNull(inMemoryBuffer); + assertTrue(inMemoryBuffer.inMemoryAccumulate(generateSet())); + verify(s3SinkService, never()).createS3Client(); + } + + @Test + void test_in_memoryAccumulate_with_s3Upload_fail() throws InterruptedException { + InMemoryBuffer inMemoryBuffer = mock(InMemoryBuffer.class); + assertNotNull(inMemoryBuffer); + assertFalse(inMemoryBuffer.inMemoryAccumulate(generateSet())); + } + + @Test + void test_in_memoryAccumulate_cover_exception() throws InterruptedException { + InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(null, s3SinkConfig); + assertNotNull(inMemoryBuffer); + assertThrows(Throwable.class, () -> inMemoryBuffer.inMemoryAccumulate(generateSet())); + } + + @Test + void test_default_constructor_notNull() { + InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(); + assertNotNull(inMemoryBuffer); + } + + private NavigableSet generateSet() { + DB eventDb = DBMaker.memoryDB().make(); + NavigableSet bufferedEventSet = eventDb.treeSet("set").serializer(Serializer.STRING).createOrOpen(); + for (int i = 0; i < 50; i++) { + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + bufferedEventSet.add(event.toString()); + } + return bufferedEventSet; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferTest.java new file mode 100644 index 0000000000..8a6af7dc84 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferTest.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import java.time.Duration; +import java.util.NavigableSet; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.S3SinkService; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +class LocalFileBufferTest { + + private S3SinkConfig s3SinkConfig; + private ThresholdOptions thresholdOptions; + private BucketOptions bucketOptions; + private AwsAuthenticationOptions awsAuthenticationOptions; + private AwsCredentialsProvider awsCredentialsProvider; + private ObjectKeyOptions objectKeyOptions; + + @BeforeEach + void setUp() throws Exception { + + s3SinkConfig = mock(S3SinkConfig.class); + thresholdOptions = mock(ThresholdOptions.class); + bucketOptions = mock(BucketOptions.class); + awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + objectKeyOptions = mock(ObjectKeyOptions.class); + + when(s3SinkConfig.getThresholdOptions()).thenReturn(thresholdOptions); + when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(100); + when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("1kb")); + when(s3SinkConfig.getThresholdOptions().getEventCollect()).thenReturn(Duration.ofSeconds(5)); + + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + + when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of("us-east-1")); + when(awsAuthenticationOptions.authenticateAwsConfiguration()).thenReturn(awsCredentialsProvider); + + when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions); + when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn("dataprepper"); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn("logdata/"); + } + + @Test + void verify_interactions(){ + S3SinkService s3SinkService = mock(S3SinkService.class); + s3SinkService.createS3Client(); + verify(s3SinkService).createS3Client(); + verifyNoMoreInteractions(s3SinkService); + } + + @Test + void test_local_file_accumulate_with_s3Upload_success() throws InterruptedException { + S3SinkService s3SinkService = mock(S3SinkService.class); + S3Client s3Client = mock(S3Client.class); + + LocalFileBuffer localFileBuffer = new LocalFileBuffer(s3Client, s3SinkConfig); + assertNotNull(localFileBuffer); + assertTrue(localFileBuffer.localFileAccumulate(generateSet())); + verify(s3SinkService, never()).createS3Client(); + } + + @Test + void test_local_file_accumulate_with_s3Upload_fail() throws InterruptedException { + LocalFileBuffer localFileBuffer = mock(LocalFileBuffer.class); + assertNotNull(localFileBuffer); + assertFalse(localFileBuffer.localFileAccumulate (generateSet())); + } + + @Test + void test_local_file_accumulate_cover_exception() throws InterruptedException { + LocalFileBuffer localFileBuffer = new LocalFileBuffer(null, s3SinkConfig); + assertNotNull(localFileBuffer); + assertThrows(Throwable.class, () -> localFileBuffer.localFileAccumulate(generateSet())); + } + + @Test + void test_default_constructor_notNull() { + LocalFileBuffer localFileBuffer = new LocalFileBuffer(); + assertNotNull(localFileBuffer); + } + + private NavigableSet generateSet() { + DB eventDb = DBMaker.memoryDB().make(); + NavigableSet bufferedEventSet = eventDb.treeSet("set").serializer(Serializer.STRING).createOrOpen(); + for (int i = 0; i < 50; i++) { + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + bufferedEventSet.add(event.toString()); + } + return bufferedEventSet; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java new file mode 100644 index 0000000000..d0a2996c2d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java @@ -0,0 +1,84 @@ +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.sink.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.codec.JsonCodec; +import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; + +@ExtendWith(MockitoExtension.class) +class ObjectKeyTest { + + @Mock + private ObjectKey objectKey; + @Mock + private S3SinkConfig s3SinkConfig; + @Mock + private PluginModel pluginModel; + @Mock + private PluginSetting pluginSetting; + @Mock + private PluginFactory pluginFactory; + @Mock + private JsonCodec codec; + @Mock + private BucketOptions bucketOptions; + @Mock + private ObjectKeyOptions objectKeyOptions; + + @BeforeEach + void setUp() throws Exception { + when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions); + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions); + } + + @Test + void test_buildingPathPrefix() { + + when(objectKeyOptions.getPathPrefix()).thenReturn("events/%{yyyy}/%{MM}/%{dd}/"); + String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig); + assertNotNull(pathPrefix); + assertThat(pathPrefix, startsWith("events")); + } + + @Test + void test_objectFileName() { + + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); + String objectFileName = ObjectKey.objectFileName(s3SinkConfig); + assertNotNull(objectFileName); + assertThat(objectFileName, startsWith("my-elb")); + } + + @Test + void test_objectFileName_with_fileExtension() { + + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getNamePattern()) + .thenReturn("events-%{yyyy-MM-dd'T'hh-mm-ss}.pdf"); + String objectFileName = ObjectKey.objectFileName(s3SinkConfig); + assertNotNull(objectFileName); + assertTrue(objectFileName.contains(".pdf")); + } + + @Test + void test_objectFileName_default_fileExtension() { + + when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getNamePattern()) + .thenReturn("events-%{yyyy-MM-dd'T'hh-mm-ss}"); + String objectFileName = ObjectKey.objectFileName(s3SinkConfig); + assertNotNull(objectFileName); + assertTrue(objectFileName.contains(".json")); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java new file mode 100644 index 0000000000..41b13595b4 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import com.fasterxml.jackson.databind.ObjectMapper; + +class JsonCodecTest { + + @Test + void parse_with_events_output_stream_json_codec() throws IOException { + + final Map eventData = new HashMap<>(); + String value1 = UUID.randomUUID().toString(); + eventData.put("key1", value1); + String value2 = UUID.randomUUID().toString(); + eventData.put("key2", value2); + final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventType("LOG").build(); + String output = createObjectUnderTest().parse(event); + assertNotNull(output); + + ObjectMapper objectMapper = new ObjectMapper(); + Map deserializedData = objectMapper.readValue(output, Map.class); + assertThat(deserializedData, notNullValue()); + assertThat(deserializedData.get("key1"), notNullValue()); + assertThat(deserializedData.get("key1"), equalTo(value1)); + assertThat(deserializedData.get("key2"), notNullValue()); + assertThat(deserializedData.get("key2"), equalTo(value2)); + } + + private JsonCodec createObjectUnderTest() { + return new JsonCodec(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java new file mode 100644 index 0000000000..f6fa40be48 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptionsTest.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.StsClientBuilder; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +class AwsAuthenticationOptionsTest { + + private AwsAuthenticationOptions awsAuthenticationOptions; + + @BeforeEach + void setUp() { + awsAuthenticationOptions = new AwsAuthenticationOptions(); + } + + @Test + void getAwsRegion_returns_null_when_region_is_null() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", null); + assertThat(awsAuthenticationOptions.getAwsRegion(), nullValue()); + } + private void reflectivelySetField(final AwsAuthenticationOptions awsAuthenticationOptions, final String fieldName, + final Object value) throws NoSuchFieldException, IllegalAccessException { + final Field field = AwsAuthenticationOptions.class.getDeclaredField(fieldName); + try { + field.setAccessible(true); + field.set(awsAuthenticationOptions, value); + } finally { + field.setAccessible(false); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptionsTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptionsTest.java new file mode 100644 index 0000000000..eee4a7370c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/BucketOptionsTest.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import org.junit.jupiter.api.Test; + +class BucketOptionsTest { + + @Test + void get_bucket_name_test() { + assertThat(new BucketOptions().getBucketName(), equalTo(null)); + } + + @Test + void get_object_key_test() { + assertThat("Object key is not an instance of ObjectKeyOptions", + new BucketOptions().getObjectKeyOptions() instanceof ObjectKeyOptions); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptionsTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptionsTest.java new file mode 100644 index 0000000000..a18842747c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectKeyOptionsTest.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import org.junit.jupiter.api.Test; + +class ObjectKeyOptionsTest { + + private static final String DEFAULT_FILE_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}"; + + @Test + void default_file_pattern_test() { + assertThat(new ObjectKeyOptions().getNamePattern(), equalTo(DEFAULT_FILE_PATTERN)); + } + + @Test + void default_path_prefix_test() { + assertThat(new ObjectKeyOptions().getPathPrefix(), equalTo(null)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptionsTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptionsTest.java new file mode 100644 index 0000000000..b246dfefa1 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptionsTest.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; + +class ThresholdOptionsTest { + private static final String DEFAULT_BYTE_CAPACITY = "50mb"; + private static final int DEFAULT_EVENT_COUNT = 0; + + @Test + void default_byte_capacity_test() { + assertThat(new ThresholdOptions().getMaximumSize().getBytes(), + equalTo(ByteCount.parse(DEFAULT_BYTE_CAPACITY).getBytes())); + } + + @Test + void get_event_collection_duration_test() { + assertThat(new ThresholdOptions().getEventCollect(), equalTo(null)); + } + + @Test + void get_event_count_test() { + assertThat(new ThresholdOptions().getEventCount(), equalTo(DEFAULT_EVENT_COUNT)); + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index bcfaeb08a0..295daa55bd 100644 --- a/settings.gradle +++ b/settings.gradle @@ -107,4 +107,5 @@ include 'release:maven' include 'e2e-test:peerforwarder' include 'rss-source' include 'data-prepper-plugins:failures-common' -include 'data-prepper-plugins:newline-codecs' \ No newline at end of file +include 'data-prepper-plugins:newline-codecs' +include 'data-prepper-plugins:s3-sink' \ No newline at end of file