Skip to content

Commit

Permalink
<body>
Browse files Browse the repository at this point in the history
<h3>Description</h3>
Created "s3-sink" plugin. Github issue : <a
href="https://github.com/opensearch-project/data-prepper/issues/1048">#1048</a>

<h3>Added Functionality</h3>

<ul>
<li>Configurations for the bucket name, key path and key pattern.</li>
<li>The key pattern supports timestamps, such as
logs-${YYYY.mm}-${uniqueId}.</li>
<li>Collects events from the buffer and stores them in RAM or a local file
before writing to the S3 bucket once a configured threshold limit is reached.</li>
</ul>

<h3>Check List</h3>
<input type = "checkbox"> New functionality s3-sink plugin<br>
<input type = "checkbox"> New functionality has been documented.<br>
<input type = "checkbox"> New functionality has javadoc added.<br>
<br>

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.<br>
For more information on following Developer Certificate of Origin and
signing off your commits, please check <a
href="https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md">here</a>
</body>
</html>
  • Loading branch information
de20436406 authored and DE20436406 committed Feb 28, 2023
1 parent 637427e commit 8669138
Show file tree
Hide file tree
Showing 25 changed files with 1,944 additions and 43 deletions.
9 changes: 9 additions & 0 deletions data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
A simple sample pipeline is attached at: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml

Functional Requirements
1 Provide a mechanism to receive events from the buffer, then process and write them to S3.
2 Codecs encode the events into the desired format based on the configuration.
3 Flush the encoded events into s3 bucket as objects.
4 Object name based on the key-pattern.
5 Object length depends on the thresholds provided in the configuration.
6 Thresholds include event count, byte capacity, and data collection duration.
67 changes: 67 additions & 0 deletions data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

// Gradle build for the s3-sink Data Prepper plugin.

plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
implementation project(':data-prepper-api')
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:sqs'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
// NOTE(review): 'aws-sdk-java' is the full AWS SDK bundle, and s3/sts/sqs are
// already declared above — confirm the whole bundle is really needed.
implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148'
implementation 'org.mapdb:mapdb:3.0.8'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(':data-prepper-test-common')
}

test {
useJUnitPlatform()
}

// Separate source set for integration tests under src/integrationTest.
sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
// NOTE(review): the 'testRuntime' configuration was removed in Gradle 7; this
// line will fail on modern Gradle — verify against the project's Gradle version.
integrationTestRuntime.extendsFrom testRuntime
}

// Runs tests matching *IT against real AWS resources supplied via -D properties.
task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
// NOTE(review): property keys say 's3source' but this is the s3-sink plugin —
// presumably copy-pasted from the s3-source build; confirm the intended key names.
systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket')
systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region')
systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url')

filter {
includeTestsMatching '*IT'
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Builds S3 object key names ("indexes") from a user-supplied alias that may
 * embed a single date-time pattern, e.g. {@code logs-${yyyy.MM}}. The pattern
 * is resolved against the current UTC time and a random UUID is appended so
 * generated keys are unique.
 */

public class S3ObjectIndex {

    // Literal marker for the start of a date-time pattern.
    // NOTE: compared with String.contains(), so this must be the literal "${".
    // The previous value "\\${" contained a backslash, which can never occur in
    // the captured group, leaving the nested-pattern check dead code.
    private static final String TIME_PATTERN_STARTING_SYMBOLS = "${";

    //For matching a string that begins with a "${" and ends with a "}".
    //For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched.
    private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}";

    //For extracting the text enclosed by "${" and "}".
    //For a string like "data-prepper-${yyyy-MM}", "yyyy-MM" is captured.
    private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\$\\{(.*?)\\}";

    // Compiled once: Pattern is immutable and thread-safe, and compilation is not free.
    private static final Pattern TIME_PATTERN_EXTRACTOR =
            Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);

    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    S3ObjectIndex() { }

    /**
     * Resolves {@code indexAlias} into a concrete object key: the embedded
     * date-time pattern (if any) is replaced by the current UTC time formatted
     * with that pattern, and a random UUID is appended for uniqueness.
     *
     * @param indexAlias key pattern, optionally ending with one "${...}" section
     * @return resolved key with a UUID suffix
     * @throws IllegalArgumentException if the embedded pattern is invalid
     */
    public static String getIndexAliasWithDate(final String indexAlias) {
        final DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
        final String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : "";
        return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID();
    }

    /**
     * Extracts and validates the date-time pattern embedded in {@code indexAlias}.
     *
     * @param indexAlias key pattern that may contain one "${...}" section
     * @return a formatter for the embedded pattern, or {@code null} if there is none
     * @throws IllegalArgumentException if more than one pattern is present, the
     *         pattern is nested, is not a suffix, contains special characters,
     *         or uses sub-hour pattern letters
     */
    public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
        final Matcher timePatternMatcher = TIME_PATTERN_EXTRACTOR.matcher(indexAlias);
        if (!timePatternMatcher.find()) {
            return null;
        }
        final String timePattern = timePatternMatcher.group(1);
        if (timePatternMatcher.find()) { // a second match means two date-time patterns
            throw new IllegalArgumentException("An index only allows one date-time pattern.");
        }
        // Detect nested patterns such as "data-prepper-${${yyyy.MM.dd}}": the lazy
        // extractor captures "${yyyy.MM.dd" for such input, so a "${" inside the
        // captured group means the pattern was nested.
        if (timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)) {
            throw new IllegalArgumentException("An index doesn't allow nested date-time patterns.");
        }
        validateTimePatternIsAtTheEnd(indexAlias, timePattern);
        validateNoSpecialCharsInTimePattern(timePattern);
        validateTimePatternGranularity(timePattern);
        return DateTimeFormatter.ofPattern(timePattern);
    }

    /*
     * Data Prepper only allows a time pattern as a suffix of the alias.
     */
    private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) {
        if (!indexAlias.endsWith(timePattern + "}")) {
            throw new IllegalArgumentException("Time pattern can only be a suffix of an index.");
        }
    }

    // Characters that can cause failures when creating indexes/object keys.
    private static final Set<Character> INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':');

    /**
     * Rejects time patterns containing characters that can cause failures in
     * creating indexes.
     *
     * @param timePattern the extracted date-time pattern
     * @throws IllegalArgumentException if any invalid character is present
     */
    public static void validateNoSpecialCharsInTimePattern(final String timePattern) {
        final boolean containsInvalidCharacter = timePattern.chars()
                .mapToObj(c -> (char) c)
                .anyMatch(INVALID_CHARS::contains);
        if (containsInvalidCharacter) {
            throw new IllegalArgumentException("Index time pattern contains one or multiple special characters: " + INVALID_CHARS);
        }
    }

    // Pattern letters rejected as too granular: 'A' (milli-of-day), 'n'/'N' (nanos).
    // NOTE(review): minute ('m') and second ('s') letters are NOT rejected even
    // though the error message says "less than one hour" — confirm intended.
    private static final Set<Character> UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N');

    /**
     * Rejects time patterns whose granularity letters are in the unsupported set.
     *
     * @param timePattern the extracted date-time pattern
     * @throws IllegalArgumentException if an unsupported pattern letter is present
     */
    public static void validateTimePatternGranularity(final String timePattern) {
        final boolean containsUnsupportedTimeSymbol = timePattern.chars()
                .mapToObj(c -> (char) c)
                .anyMatch(UNSUPPORTED_TIME_GRANULARITY_CHARS::contains);
        if (containsUnsupportedTimeSymbol) {
            throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: "
                    + UNSUPPORTED_TIME_GRANULARITY_CHARS);
        }
    }

    /**
     * Returns the current date-time in UTC.
     *
     * @return current UTC time as a {@link ZonedDateTime}
     */
    public static ZonedDateTime getCurrentUtcTime() {
        return ZonedDateTime.now(UTC_ZONE_ID);
    }
}
Original file line number Diff line number Diff line change
@@ -1,48 +1,116 @@
/**
 *
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.sink;

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.plugins.sink.codec.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Implementation class of s3-sink plugin
 *
 */
@DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class)
// NOTE(review): the next two lines are diff residue — the old declaration
// ("implements Sink<Record<Object>>") and its replacement both appear here;
// only the AbstractSink<Record<Event>> declaration belongs in the final file.
public class S3Sink implements Sink<Record<Object>> {
public class S3Sink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
// Capacity of the shared in-memory event queue.
private static final int EVENT_QUEUE_SIZE = 100000;

private final S3SinkConfig s3SinkConfig;
private volatile boolean initialized;
// NOTE(review): static mutable state — the queue and stop flag are shared by
// ALL S3Sink instances in the process; confirm a single-sink assumption. Also,
// isStopRequested is a plain (non-volatile) static boolean, so a worker thread
// polling isStopRequested() has no guaranteed visibility of the update.
private static BlockingQueue<Event> eventQueue;
private static boolean isStopRequested;

// NOTE(review): these three fields support a local path read from plugin
// settings, but outputS3Path is never used by the visible methods — possibly stale.
private final String outputS3Path;
private static final String SAMPLE_S3_PATH = "src/resources/";
public static final String PATH = "path";


private final Codec codec;
// NOTE(review): objectMapper is never referenced in the visible code.
private final ObjectMapper objectMapper = new ObjectMapper();

/**
 *
 * @param pluginSetting
 * @param s3SinkConfig
 * @param pluginFactory
 */
@DataPrepperPluginConstructor
// NOTE(review): diff residue — the old constructor signature is immediately
// followed by the new one; only the (PluginSetting, S3SinkConfig, PluginFactory)
// form belongs in the final file.
public S3Sink(final S3SinkConfig s3SinkConfig, final PluginSetting pluginSetting) {
public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) {
super(pluginSetting);
this.s3SinkConfig = s3SinkConfig;
final String outputS3 = (String) pluginSetting.getAttributeFromSettings(PATH);
outputS3Path = outputS3 == null ? SAMPLE_S3_PATH : outputS3;

// Load the codec plugin declared in the sink configuration (encodes events).
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
initialized = Boolean.FALSE;
}

@Override
// NOTE(review): diff residue — output(Collection<Record<Object>>) and the
// S3SinkService line are fragments of the old implementation, interleaved with
// the new isReady() method below.
public void output(Collection<Record<Object>> records) {

final S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
public boolean isReady() {
return initialized;
}

@Override
public void doInitialize() {
try {
doInitializeInternal();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize S3-Sink.");
this.shutdown();
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
// initialized stays false here, so isReady() keeps returning false;
// presumably the framework re-invokes doInitialize() — TODO confirm.
LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause());
}
}

// Creates the shared event queue and starts one background worker thread that
// drains it to S3 via S3SinkService.
private void doInitializeInternal() {
eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec);
// NOTE(review): the Thread reference is discarded — shutdown() cannot join or
// interrupt the worker; consider keeping the reference or using an executor.
new Thread(worker).start();
initialized = Boolean.TRUE;
}

@Override
public void doOutput(final Collection<Record<Event>> records) {
LOG.debug("Records size : {}", records.size());
if (records.isEmpty()) {
return;
}

for (final Record<Event> recordData : records) {

Event event = recordData.getData();
// NOTE(review): BlockingQueue.add throws IllegalStateException when the queue
// is full (capacity 100000); consider put()/offer() for back-pressure.
getEventQueue().add(event);

}
}

@Override
public void shutdown() {
// TODO Auto-generated method stub

super.shutdown();
isStopRequested = Boolean.TRUE;
// NOTE(review): typo in the log message ("sutdonwn" -> "shutdown") — fix in code.
LOG.info("s3-sink sutdonwn completed");
}

public static BlockingQueue<Event> getEventQueue() {
return eventQueue;
}

public static boolean isStopRequested() {
return isStopRequested;
}
}

0 comments on commit 8669138

Please sign in to comment.