From 9b57169f8b4df6b6b3765d73fa881d77eed9c4d7 Mon Sep 17 00:00:00 2001 From: DE20436406 Date: Tue, 28 Feb 2023 19:08:01 +0530 Subject: [PATCH] =?UTF-8?q?Signed-off-by:=C2=A0deepaksahu562=20deepak.sahu?= =?UTF-8?q?562@gmail.com=20Commits=20are=20signed=20per=20the=20DCO=20usin?= =?UTF-8?q?g=20--signoff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Description Created "s3-sink" plugin. Github issue : https://github.com/opensearch-project/data-prepper/issues/1048 Added Functionality Configurations for the bucket name, key path and key pattern. The key pattern support timestamps such as logs-${YYYY.mm}-${uniqueId}. Collection of objects from Buffer and store it in RAM/Local file. Check List New functionality s3-sink plugin. New functionality has been documented. New functionality has javadoc added. Commits are signed with a real name per the DCO By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md --- data-prepper-plugins/s3-sink/README.md | 2 +- data-prepper-plugins/s3-sink/build.gradle | 1 + .../opensearch/dataprepper/plugins/sink/S3ObjectIndex.java | 1 + .../java/org/opensearch/dataprepper/plugins/sink/S3Sink.java | 1 - .../org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java | 2 +- .../opensearch/dataprepper/plugins/sink/S3SinkService.java | 4 +++- .../org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java | 4 ++-- .../plugins/sink/configuration/AwsAuthenticationOptions.java | 2 +- .../dataprepper/plugins/sink/configuration/ObjectOptions.java | 2 +- .../plugins/sink/configuration/ThresholdOptions.java | 2 +- settings.gradle | 1 + 11 files changed, 13 insertions(+), 9 deletions(-) diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md index 1fbdefe20a..2b69f4a58c 100644 --- a/data-prepper-plugins/s3-sink/README.md +++ b/data-prepper-plugins/s3-sink/README.md @@ -1,6 +1,6 @@ Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml -Functional Requirements +Functional Requirements: 1 Provide a mechanism to received events from buffer then process and write to s3. 2 Codecs encode the events into the desired format based on the configuration. 3 Flush the encoded events into s3 bucket as objects. diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle index acfa7bfff6..9e0dcf9401 100644 --- a/data-prepper-plugins/s3-sink/build.gradle +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -1,6 +1,7 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * */ plugins { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java index 31a173a621..940d356d3b 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java @@ -17,6 +17,7 @@ /** * Reference to an S3 object key Index patterns. + * */ public class S3ObjectIndex { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 70ea4a4dd7..b2cd509a5f 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -40,7 +40,6 @@ public class S3Sink extends AbstractSink> { private static boolean isStopRequested; private final Codec codec; - private final ObjectMapper objectMapper = new ObjectMapper(); /** * diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java index 718ce504b2..280c03b358 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -13,7 +13,7 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; -/* +/** An implementation class of s3 sink configuration */ public class S3SinkConfig { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 711a03d621..d897e2be46 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -10,7 +10,9 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.services.s3.S3Client; - +/** + * Create s3 client + */ public class S3SinkService { private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java index e1597aa631..41b1b22901 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -115,7 +115,7 @@ public void inMemmoryAccumulator() { "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", byteCount, eventCount, eventCollectionDuration, streamCount); - new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); + //new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); } catch (Exception e) { LOG.error("Exception while storing recoreds into In-Memory", e); } @@ -147,7 +147,7 @@ public void localFileAccumulator() { "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", byteCount, eventCount, eventCollectionDuration); - new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); + //new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); } catch (Exception e) { LOG.error("Exception while storing recoreds into Local-file", e); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java index 73d83d668d..d42f8502f0 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -18,7 +18,7 @@ import java.util.Map; import java.util.UUID; -/* +/** An implementation class AWS Authentication configuration */ public class AwsAuthenticationOptions { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java index 8501f70daa..540c066929 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -8,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; -/* +/** An implementation class of Threshold configuration Options */ public class ObjectOptions { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java index b50d1219f5..513a47aab1 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -8,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; -/* +/** An implementation class of s3 index configuration Options */ public class ThresholdOptions { diff --git a/settings.gradle b/settings.gradle index a3c749d922..63a8c7d27f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,6 +1,7 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * */ pluginManagement {