Skip to content

Commit

Permalink
Signed-off-by: deepaksahu562 deepak.sahu562@gmail.com
Browse files Browse the repository at this point in the history
Commits are signed per the DCO using --signoff

Description

Created "s3-sink" plugin. Github issue :
opensearch-project#1048

Added Functionality

Configurations for the bucket name, key path and key pattern.
The key pattern support timestamps such as logs-${YYYY.mm}-${uniqueId}.
Collection of objects from Buffer and store it in RAM/Local file.

Check List
New functionality s3-sink plugin.
New functionality has been documented.
New functionality has javadoc added.
Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.<br>
For more information on following Developer Certificate of Origin and
signing off your commits, please check
https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md
  • Loading branch information
DE20436406 authored and DE20436406 committed Feb 28, 2023
1 parent 8669138 commit 9b57169
Show file tree
Hide file tree
Showing 11 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml

Functional Requirements
Functional Requirements:
1 Provide a mechanism to received events from buffer then process and write to s3.
2 Codecs encode the events into the desired format based on the configuration.
3 Flush the encoded events into s3 bucket as objects.
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
*/

plugins {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

/**
* Reference to an S3 object key Index patterns.
*
*/

public class S3ObjectIndex {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class S3Sink extends AbstractSink<Record<Event>> {
private static boolean isStopRequested;

private final Codec codec;
private final ObjectMapper objectMapper = new ObjectMapper();

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

/*
/**
An implementation class of s3 sink configuration
*/
public class S3SinkConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.s3.S3Client;

/**
* Create s3 client
*/
public class S3SinkService {

private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void inMemmoryAccumulator() {
"In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}",
byteCount, eventCount, eventCollectionDuration, streamCount);

new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate();
//new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate();
} catch (Exception e) {
LOG.error("Exception while storing recoreds into In-Memory", e);
}
Expand Down Expand Up @@ -147,7 +147,7 @@ public void localFileAccumulator() {
"Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec",
byteCount, eventCount, eventCollectionDuration);

new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate();
//new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate();

} catch (Exception e) {
LOG.error("Exception while storing recoreds into Local-file", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Map;
import java.util.UUID;

/*
/**
An implementation class AWS Authentication configuration
*/
public class AwsAuthenticationOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

/*
/**
An implementation class of Threshold configuration Options
*/
public class ObjectOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

/*
/**
An implementation class of s3 index configuration Options
*/
public class ThresholdOptions {
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
*/

pluginManagement {
Expand Down

0 comments on commit 9b57169

Please sign in to comment.