Skip to content

Commit

Permalink
Adding the S3 sink plugin. Contributes to opensearch-project#1048
Browse files Browse the repository at this point in the history
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
  • Loading branch information
deepaksahu562 committed Apr 19, 2023
1 parent 232f0de commit be5a4f5
Show file tree
Hide file tree
Showing 34 changed files with 2,464 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.s3keyindex;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Class responsible for creation of s3 key pattern based on date time stamp
*/
public class S3ObjectIndex {

private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\%{";

// For matching a string that begins with a "%{" and ends with a "}".
// For a string like "data-prepper-%{yyyy-MM-dd}", "%{yyyy-MM-dd}" is matched.
private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}";

// For matching a string enclosed by "%{" and "}".
// For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched.
private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}";

private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID());

S3ObjectIndex() {
}

/**
* Create Object Name with date,time and UniqueID prepended.
*/
public static String getObjectNameWithDateTimeId(final String indexAlias) {
DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : "";
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + "-" + getTimeNanos() + "-"
+ UUID.randomUUID();
}

/**
* Create Object path prefix.
*/
public static String getObjectPathPrefix(final String indexAlias) {
DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : "";
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix;
}

/**
* Creates epoch seconds.
*/
public static long getTimeNanos() {
Instant time = Instant.now();
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
return currentTimeNanos;
}

/**
* Validate the index with the regular expression pattern. Throws exception if validation fails
*/
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);
final Matcher timePatternMatcher = pattern.matcher(indexAlias);
if (timePatternMatcher.find()) {
final String timePattern = timePatternMatcher.group(1);
if (timePatternMatcher.find()) { // check if there is a one more match.
throw new IllegalArgumentException("An index only allows one date-time pattern.");
}
if (timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)) { // check if it is a nested pattern such as
// "data-prepper-%{%{yyyy.MM.dd}}"
throw new IllegalArgumentException("An index doesn't allow nested date-time patterns.");
}
validateTimePatternIsAtTheEnd(indexAlias, timePattern);
validateNoSpecialCharsInTimePattern(timePattern);
validateTimePatternGranularity(timePattern);
return DateTimeFormatter.ofPattern(timePattern);
}
return null;
}

/**
* Data Prepper only allows time pattern as a suffix.
*/
private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) {
if (!indexAlias.endsWith(timePattern + "}")) {
throw new IllegalArgumentException("Time pattern can only be a suffix of an index.");
}
}

/**
* Special characters can cause failures in creating indexes.
*/
private static final Set<Character> INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':');

public static void validateNoSpecialCharsInTimePattern(String timePattern) {
boolean containsInvalidCharacter = timePattern.chars().mapToObj(c -> (char) c)
.anyMatch(character -> INVALID_CHARS.contains(character));
if (containsInvalidCharacter) {
throw new IllegalArgumentException(
"Index time pattern contains one or multiple special characters: " + INVALID_CHARS);
}
}

/**
* Validates the time pattern, support creating indexes with time patterns that are too granular
* hour, minute and second
*/
private static final Set<Character> UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N');

public static void validateTimePatternGranularity(String timePattern) {
boolean containsUnsupportedTimeSymbol = timePattern.chars().mapToObj(c -> (char) c)
.anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character));
if (containsUnsupportedTimeSymbol) {
throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: "
+ UNSUPPORTED_TIME_GRANULARITY_CHARS);
}
}

/**
* Returns the current UTC Date and Time
*/
public static ZonedDateTime getCurrentUtcTime() {
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.s3keyindex;

import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import org.junit.jupiter.api.Test;

class S3ObjectIndexTest {

@Test
void testObjectDateTimePatterns_not_equal() throws IllegalArgumentException {

String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
assertFalse(actualIndex.contains(expectedIndex));
}

@Test
void testgetObjectPathPrefix_not_equal() throws IllegalArgumentException {

String expectedIndex = S3ObjectIndex.getObjectPathPrefix("events-%{yyyy}");
String actualIndex = S3ObjectIndex.getObjectPathPrefix("events-%{yyyy}");
assertTrue(actualIndex.contains(expectedIndex));
}

@Test
void testObjectTimePattern_Exceptional_time_TooGranular() throws IllegalArgumentException {
assertThrows(IllegalArgumentException.class, () -> {
S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-AA-dd}");
});
}

@Test
void testObjectTimePatterns_equal() throws IllegalArgumentException {

DateTimeFormatter expectedIndex = S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}");
DateTimeFormatter actualIndex = S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}");
assertEquals(actualIndex.toString(), expectedIndex.toString());
}

@Test
void test_utc_current_time() throws IllegalArgumentException {

ZonedDateTime expectedIndex = S3ObjectIndex.getCurrentUtcTime();
ZonedDateTime actualIndex = S3ObjectIndex.getCurrentUtcTime();

assertEquals(expectedIndex.getDayOfYear(), actualIndex.getDayOfYear());
assertEquals(expectedIndex.getDayOfMonth(), actualIndex.getDayOfMonth());
assertEquals(expectedIndex.getDayOfWeek(), actualIndex.getDayOfWeek());
}

@Test
void testObjectTimePattern_Exceptional_TooGranular() {
assertThrows(IllegalArgumentException.class, () -> {
S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-AA-ddThh:mm}");
});
}

@Test
void testObjectTimePattern_Exceptional_at_theEnd() {
assertThrows(IllegalArgumentException.class, () -> {
S3ObjectIndex.getDatePatternFormatter("events-%{yyy{MM}dd}");
});
}

@Test
void testObject_allows_one_date_time_pattern_Exceptional() {
assertThrows(IllegalArgumentException.class, () -> {
S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}-%{yyyy-MM-dd}");
});
}

@Test
void testObject_nested_pattern_Exceptional() {
assertThrows(IllegalArgumentException.class, () -> {
S3ObjectIndex.getDatePatternFormatter("bucket-name-\\%{\\%{yyyy.MM.dd}}");
});
}

@Test
void testObject_null_time_pattern() throws NullPointerException {
assertNull(S3ObjectIndex.getDatePatternFormatter("bucket-name"));
}

@Test
void testObjectAliasWithDatePrefix_Exceptional_time_TooGranular() throws IllegalArgumentException {
assertThrows(IllegalArgumentException.class, () -> {
S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-AA-dd}");
});
}

@Test
void testObjectAliasWithDatePrefix_equal() throws IllegalArgumentException {

String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
assertNotEquals(actualIndex.toString(), expectedIndex.toString());
}

@Test
void test_default_constructor() {
S3ObjectIndex object = new S3ObjectIndex();
assertNotNull(object);
}
}
65 changes: 65 additions & 0 deletions data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# S3 Sink

This is the Data Prepper S3 sink plugin that sends records to an S3 bucket via S3Client.

The S3 sink plugin supports OpenSearch 2.0.0 and greater.

## Usages

The s3 sink should be configured as part of Data Prepper pipeline yaml file.

## Configuration Options

```
pipeline:
...
sink:
- s3:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
sts_header_overrides:
max_retries: 5
bucket:
name: bucket_name
object_key:
path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/
threshold:
event_count: 2000
maximum_size: 50mb
event_collect: 15s
codec:
ndjson:
```

## Configuration

- `aws_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). Defaults to `none`.

- `aws_sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). Defaults to `none`.

- `aws_sts_header_overrides` (Optional) : An optional map of header overrides to make when assuming the IAM role for the sink plugin. Defaults to `none`.

- `max_retries` (Optional) : An integer value indicates the maximum number of times that single request should be retired in-order to ingest data to amazon s3. Defaults to `5`.

- `bucket` (Required) : Object storage built to store and retrieve any amount of data from anywhere, User must provide bucket name.

- `object_key` (Optional) : It contains `path_prefix` and `file_pattern`. Defaults to s3 object `events-%{yyyy-MM-dd'T'hh-mm-ss}` inside bucket root directory.

- `path_prefix` (Optional) : path_prefix nothing but directory structure inside bucket in-order to store objects. Defaults to `none`.

- `event_count` (Required) : An integer value indicates the maximum number of events required to ingest into s3-bucket as part of threshold.

- `maximum_size` (Optional) : A String representing the count or size of bytes required to ingest into s3-bucket as part of threshold. Defaults to `50mb`.

- `event_collect` (Required) : A String representing how long events should be collected before ingest into s3-bucket as part of threshold. All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").

- `buffer_type` (Optional) : Records stored temporary before flushing into s3 bucket. Possible values are `local_file` and `in_memory`. Defaults to `in_memory`.


## Developer Guide

This plugin is compatible with Java 8. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
66 changes: 66 additions & 0 deletions data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148'
implementation 'org.mapdb:mapdb:3.0.8'
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.7.10'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.7.10'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(':data-prepper-test-common')
}

test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket')
systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region')
systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url')

filter {
includeTestsMatching '*IT'
}
}

0 comments on commit be5a4f5

Please sign in to comment.