Skip to content


Signed-off-by: deepaksahu562
Browse files Browse the repository at this point in the history

Created "s3-sink" plugin.
Github issue : opensearch-project#1048

Added Functionality

Configurations for the bucket name, key path and key pattern.
The key pattern support timestamps such as logs-${}-${uniqueId}.
Collection of objects from Buffer and store it in RAM/Local file.

Check List
New functionality s3-sink plugin.
New functionality has been documented.
 New functionality has javadoc added.
Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and
signing off your commits, please check
  • Loading branch information
DE20436406 authored and DE20436406 committed Feb 28, 2023
1 parent 768b50f commit 1211958
Show file tree
Hide file tree
Showing 12 changed files with 796 additions and 0 deletions.
9 changes: 9 additions & 0 deletions data-prepper-plugins/s3-sink/
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml

Functional Requirements
1 Provide a mechanism to received events from buffer then process and write to s3.
2 Codecs encode the events into the desired format based on the configuration.
3 Flush the encoded events into s3 bucket as objects.
4 Object name based on the key-pattern.
5 Object length depends on the thresholds provided in the configuration.
6 The Thresholds such as events count, bytes capacity and data collection duration.
67 changes: 67 additions & 0 deletions data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0

plugins {
id 'java'

repositories {

dependencies {
implementation project(':data-prepper-api')
implementation 'io.micrometer:micrometer-core'
implementation ''
implementation ''
implementation ''
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation ''
implementation 'org.mapdb:mapdb:3.0.8'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(':data-prepper-test-common')

test {

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
resources.srcDir file('src/integrationTest/resources')

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs


classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket')
systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region')
systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url')

filter {
includeTestsMatching '*IT'

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0

package org.opensearch.dataprepper.plugins.sink;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

* Reference to an S3 object key Index patterns.

public class S3ObjectIndex {

private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\${";

//For matching a string that begins with a "${" and ends with a "}".
//For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched.
private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}";

//For matching a string enclosed by "%{" and "}".
//For a string like "data-prepper-${yyyy-MM}", "yyyy-MM" is matched.
private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\$\\{(.*?)\\}";

private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID());

S3ObjectIndex() { }

Create Index with date,time with UniqueID prepended.
public static String getIndexAliasWithDate(final String indexAlias) {
DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : "";
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID();

Validate the index with the regular expression pattern. Throws exception if validation fails
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
final Matcher timePatternMatcher = pattern.matcher(indexAlias);
if (timePatternMatcher.find()) {
final String timePattern =;
if (timePatternMatcher.find()) { // check if there is a one more match.
throw new IllegalArgumentException("An index only allows one date-time pattern.");
if(timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)){ //check if it is a nested pattern such as "data-prepper-%{%{yyyy.MM.dd}}"
throw new IllegalArgumentException("An index doesn't allow nested date-time patterns.");
validateTimePatternIsAtTheEnd(indexAlias, timePattern);
return DateTimeFormatter.ofPattern(timePattern);
return null;

Data Prepper only allows time pattern as a suffix.
private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) {
if (!indexAlias.endsWith(timePattern + "}")) {
throw new IllegalArgumentException("Time pattern can only be a suffix of an index.");

* Special characters can cause failures in creating indexes.
* */
private static final Set<Character> INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':');
public static void validateNoSpecialCharsInTimePattern(String timePattern) {
boolean containsInvalidCharacter = timePattern.chars()
.mapToObj(c -> (char) c)
.anyMatch(character -> INVALID_CHARS.contains(character));
if (containsInvalidCharacter) {
throw new IllegalArgumentException("Index time pattern contains one or multiple special characters: " + INVALID_CHARS);

* Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second
private static final Set<Character> UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N');
public static void validateTimePatternGranularity(String timePattern) {
boolean containsUnsupportedTimeSymbol = timePattern.chars()
.mapToObj(c -> (char) c)
.anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character));
if (containsUnsupportedTimeSymbol) {
throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: "

Returns the current UTC Date and Time
public static ZonedDateTime getCurrentUtcTime() {
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
package org.opensearch.dataprepper.plugins.sink;

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
//import org.opensearch.dataprepper.plugins.sink.codec.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

* Implementation class of s3-sink plugin
@DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class)
public class S3Sink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
private static final int EVENT_QUEUE_SIZE = 100000;

private final S3SinkConfig s3SinkConfig;
private volatile boolean initialized;
private static BlockingQueue<Event> eventQueue;
private static boolean isStopRequested;

//private final Codec codec;
private final ObjectMapper objectMapper = new ObjectMapper();

* @param pluginSetting
* @param s3SinkConfig
* @param pluginFactory
public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) {
this.s3SinkConfig = s3SinkConfig;
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
//codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
initialized = Boolean.FALSE;

public boolean isReady() {
return initialized;

public void doInitialize() {
try {
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize S3-Sink.");
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause());

private void doInitializeInternal() {
eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig);
new Thread(worker).start();
initialized = Boolean.TRUE;

public void doOutput(final Collection<Record<Event>> records) {
LOG.debug("Records size : {}", records.size());
if (records.isEmpty()) {

for (final Record<Event> recordData : records) {

Event event = recordData.getData();


public void shutdown() {
isStopRequested = Boolean.TRUE;"s3-sink sutdonwn completed");

public static BlockingQueue<Event> getEventQueue() {
return eventQueue;

public static boolean isStopRequested() {
return isStopRequested;

0 comments on commit 1211958

Please sign in to comment.