Skip to content

Commit

Permalink
Added structure of s3-sink plugin(opensearch-project#1048)
Browse files Browse the repository at this point in the history
  • Loading branch information
de20436406 authored and de20436406 committed Jan 25, 2023
1 parent 590ee21 commit e2bbd0a
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
*
*/
package org.opensearch.dataprepper.plugins.sink;

import java.util.Collection;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class)
public class S3Sink implements Sink<Record<Object>> {

private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
private final S3SinkConfig s3SinkConfig;

private final String outputS3Path;
private static final String SAMPLE_S3_PATH = "src/resources/";
public static final String PATH = "path";


@DataPrepperPluginConstructor
public S3Sink(final S3SinkConfig s3SinkConfig, final PluginSetting pluginSetting) {
this.s3SinkConfig = s3SinkConfig;
final String outputS3 = (String) pluginSetting.getAttributeFromSettings(PATH);
outputS3Path = outputS3 == null ? SAMPLE_S3_PATH : outputS3;

}

@Override
public void output(Collection<Record<Object>> records) {

final S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);

}

@Override
public void shutdown() {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.plugins.sink.configuration.SinkAwsAuthenticationOptions;

import com.fasterxml.jackson.annotation.JsonProperty;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

public class S3SinkConfig {

@JsonProperty("sink_aws")
@NotNull
@Valid
private SinkAwsAuthenticationOptions sinkAwsAuthenticationOptions;

public SinkAwsAuthenticationOptions getSinkAwsAuthenticationOptions() {
return sinkAwsAuthenticationOptions;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.plugins.sink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.s3.S3Client;

public class S3SinkService {

private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);

private final S3SinkConfig s3SinkConfig;
private final S3Client s3Client;

S3SinkService(final S3SinkConfig s3SinkConfig){
this.s3SinkConfig = s3SinkConfig;
this.s3Client = createS3Client();
}

S3Client createS3Client() {
LOG.info("Creating S3 client");
return S3Client.builder()
.region(s3SinkConfig.getSinkAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(s3SinkConfig.getSinkAwsAuthenticationOptions().authenticateAwsConfiguration())
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.build())
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.opensearch.dataprepper.plugins.sink.configuration;

import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonProperty;

import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

public class SinkAwsAuthenticationOptions {
@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public AwsCredentialsProvider authenticateAwsConfiguration() {

final AwsCredentialsProvider awsCredentialsProvider;
if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {
try {
Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn");
}

final StsClient stsClient = StsClient.builder()
.region(getAwsRegion())
.build();

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(AssumeRoleRequest.builder()
.roleSessionName("S3-Sink-" + UUID.randomUUID())
.roleArn(awsStsRoleArn)
.build())
.build();

} else {
// use default credential provider
awsCredentialsProvider = DefaultCredentialsProvider.create();
}

return awsCredentialsProvider;
}
}
5 changes: 4 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ pluginManagement {
}
}

rootProject.name = 'opensearch-data-prepper'
//rootProject.name = 'opensearch-data-prepper'
rootProject.name = 'data-prepper'

include 'data-prepper-api'
include 'data-prepper-plugins'
Expand Down Expand Up @@ -58,4 +59,6 @@ include 'release:docker'
include 'release:maven'
include 'e2e-test:peerforwarder'
include 'rss-source'
include 'data-prepper-plugins:s3-sink'
findProject(':data-prepper-plugins:s3-sink')?.name = 's3-sink'

0 comments on commit e2bbd0a

Please sign in to comment.