Skip to content

Commit

Permalink
Addressed review comments on PR opensearch-project#2324, Git-issue op…
Browse files Browse the repository at this point in the history
…ensearch-project#1048

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
  • Loading branch information
deepaksahu562 committed Mar 3, 2023
1 parent 938be79 commit 2c6847e
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

/**
* Reference to an S3 object key Index patterns.
*
*/

public class S3ObjectIndex {
Expand All @@ -36,17 +35,17 @@ public class S3ObjectIndex {

S3ObjectIndex() { }

/*
Create Index with date,time with UniqueID prepended.
/**
* Create Index with date,time with UniqueID prepended.
*/
public static String getIndexAliasWithDate(final String indexAlias) {
DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : "";
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID();
}

/*
Validate the index with the regular expression pattern. Throws exception if validation fails
/**
* Validate the index with the regular expression pattern. Throws exception if validation fails
*/
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);
Expand All @@ -67,18 +66,18 @@ public static DateTimeFormatter getDatePatternFormatter(final String indexAlias)
return null;
}

/*
Data Prepper only allows time pattern as a suffix.
*/
/**
* Data Prepper only allows time pattern as a suffix.
*/
private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) {
if (!indexAlias.endsWith(timePattern + "}")) {
throw new IllegalArgumentException("Time pattern can only be a suffix of an index.");
}
}

/*
/**
* Special characters can cause failures in creating indexes.
* */
*/
private static final Set<Character> INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':');
public static void validateNoSpecialCharsInTimePattern(String timePattern) {
boolean containsInvalidCharacter = timePattern.chars()
Expand All @@ -89,7 +88,7 @@ public static void validateNoSpecialCharsInTimePattern(String timePattern) {
}
}

/*
/**
* Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second
*/
private static final Set<Character> UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N');
Expand All @@ -103,8 +102,8 @@ public static void validateTimePatternGranularity(String timePattern) {
}
}

/*
Returns the current UTC Date and Time
/**
* Returns the current UTC Date and Time
*/
public static ZonedDateTime getCurrentUtcTime() {
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,57 @@

/**
* Implementation class of s3-sink plugin
*
*/
@DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class)
public class S3Sink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
private static final int EVENT_QUEUE_SIZE = 100000;
private static final String IN_MEMORY = "in_memory";
private static final String LOCAL_FILE = "local_file";
private final S3SinkConfig s3SinkConfig;
private S3SinkWorker worker;
private SinkAccumulator accumulator;
private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
private static final int EVENT_QUEUE_SIZE = 100000;
private static final String IN_MEMORY = "in_memory";
private static final String LOCAL_FILE = "local_file";

private final S3SinkConfig s3SinkConfig;
private S3SinkWorker worker;
private SinkAccumulator accumulator;
private final Codec codec;
private final String storageType;
private volatile boolean initialized;
private static BlockingQueue<Event> eventQueue;
private static boolean isStopRequested;
private static volatile boolean isStopRequested;
private Thread workerThread;



/**
*
* @param pluginSetting
* @param s3SinkConfig
* @param pluginFactory
*/
@DataPrepperPluginConstructor
public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) {
public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) {
super(pluginSetting);
this.s3SinkConfig = s3SinkConfig;
storageType = s3SinkConfig.getTemporaryStorage();
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
initialized = Boolean.FALSE;
}

@Override
public boolean isReady() {
public final boolean isReady() {
return initialized;
}

@Override
public void doInitialize() {
public final void doInitialize() {
try {
doInitializeInternal();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize S3-Sink.");
LOG.error("Failed to initialize S3-Sink.", e);
this.shutdown();
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause());
LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} ", e.getMessage(), e);
}
}

Expand All @@ -91,20 +90,20 @@ private void doInitializeInternal() {
}

@Override
public void doOutput(final Collection<Record<Event>> records) {
public final void doOutput(final Collection<Record<Event>> records) {
LOG.debug("Records size : {}", records.size());
if (records.isEmpty()) {
return;
}

for (final Record<Event> recordData : records) {
Event event = recordData.getData();
getEventQueue().add(event);
}
}

@Override
public void shutdown() {
public final void shutdown() {
super.shutdown();
isStopRequested = Boolean.TRUE;
if (workerThread.isAlive()) {
Expand All @@ -120,23 +119,24 @@ public static BlockingQueue<Event> getEventQueue() {
public static boolean isStopRequested() {
return isStopRequested;
}


/**
* This {@link S3SinkWorkerRunner} keep listing event from {@link doOutput}
*/
private class S3SinkWorkerRunner implements Runnable {
@Override
public void run() {
try {
while (!S3Sink.isStopRequested()) {
if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) {
if (storageType.equalsIgnoreCase(IN_MEMORY)) {
accumulator = worker.inMemmoryAccumulator();
} else {
accumulator = worker.localFileAccumulator();
}
accumulator.doAccumulate();
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Exception in S3Sink : \n Error message {} \n Exception cause {}", e.getMessage(),
e.getCause(), e);
LOG.error("Exception while runing S3SinkWorkerRunner : ", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

/*
An implementation class of s3 sink configuration
/**
* An implementation class of s3 sink configuration
*/
public class S3SinkConfig {

static final String DEFAULT_BUCKET_NAME = "dataprepper";
static final String DEFAULT_PATH_PREFIX = "logdata";

static final String DEFAULT_TEMP_STORAGE = "local_file";
static final String DEFAULT_TEMP_STORAGE = "local_file";

@JsonProperty("aws")
@NotNull
Expand All @@ -46,54 +43,54 @@ public class S3SinkConfig {

@JsonProperty("bucket")
@NotNull
private String bucketName = DEFAULT_BUCKET_NAME;
private String bucketName;

@JsonProperty("key_path_prefix")
@NotNull
private String keyPathPrefix = DEFAULT_PATH_PREFIX;
private String keyPathPrefix;

/*
Aws Authentication configuration Options
/**
* Aws Authentication configuration Options
*/
public AwsAuthenticationOptions getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
}

/*
Threshold configuration Options
/**
* Threshold configuration Options
*/
public ThresholdOptions getThresholdOptions() {
return thresholdOptions;
}

/*
s3 index configuration Options
/**
* s3 index configuration Options
*/
public ObjectOptions getObjectOptions() {
return objectOptions;
}

/*
sink codec configuration Options
/**
* sink codec configuration Options
*/
public PluginModel getCodec() { return codec; }

/*
s3 index path configuration Option
/**
* s3 index path configuration Option
*/
public String getKeyPathPrefix() {
return keyPathPrefix;
}

/*
s3 bucket name configuration Option
/**
* s3 bucket name configuration Option
*/
public String getBucketName() {
return bucketName;
}

/*
Temporary storage location configuration Options
/**
* Temporary storage location configuration Options
*/
public String getTemporaryStorage() {
return temporaryStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public class S3SinkService {
private final S3SinkConfig s3SinkConfig;
private final S3Client s3Client;

/**
* @param s3SinkConfig
*/
S3SinkService(final S3SinkConfig s3SinkConfig){
this.s3SinkConfig = s3SinkConfig;
this.s3Client = createS3Client();
Expand All @@ -37,6 +40,9 @@ S3Client createS3Client() {
.build();
}

/**
* @return s3Client object
*/
public S3Client getS3Client() {
return s3Client;
}
Expand Down

0 comments on commit 2c6847e

Please sign in to comment.