
Commit

s3-sink - Code refactor (opensearch-project#1048)
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
deepaksahu562 committed Mar 2, 2023
1 parent cefd018 commit 515b7b6
Showing 23 changed files with 701 additions and 146 deletions.
File: S3Sink.java
@@ -18,12 +18,11 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator;
import org.opensearch.dataprepper.plugins.sink.codec.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Implementation class of s3-sink plugin
*
@@ -33,13 +32,19 @@ public class S3Sink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
private static final int EVENT_QUEUE_SIZE = 100000;

private static final String IN_MEMORY = "in_memory";
private static final String LOCAL_FILE = "local_file";

private final S3SinkConfig s3SinkConfig;
private S3SinkWorker worker;
private SinkAccumulator accumulator;
private final Codec codec;
private volatile boolean initialized;
private static BlockingQueue<Event> eventQueue;
private static boolean isStopRequested;
private Thread workerThread;


private final Codec codec;

/**
*
@@ -78,8 +83,10 @@ public void doInitialize() {
private void doInitializeInternal() {
eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec);
new Thread(worker).start();
worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec);
S3SinkWorkerRunner runner = new S3SinkWorkerRunner();
workerThread = new Thread(runner);
workerThread.start();
initialized = Boolean.TRUE;
}

@@ -91,17 +98,18 @@ public void doOutput(final Collection<Record<Event>> records) {
}

for (final Record<Event> recordData : records) {

Event event = recordData.getData();
getEventQueue().add(event);

}
}

@Override
public void shutdown() {
super.shutdown();
isStopRequested = Boolean.TRUE;
if (workerThread.isAlive()) {
workerThread.stop();
}
LOG.info("s3-sink sutdonwn completed");
}

@@ -112,4 +120,24 @@ public static BlockingQueue<Event> getEventQueue() {
public static boolean isStopRequested() {
return isStopRequested;
}

private class S3SinkWorkerRunner implements Runnable {
@Override
public void run() {
try {
while (!S3Sink.isStopRequested()) {
if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) {
accumulator = worker.inMemmoryAccumulator();
} else {
accumulator = worker.localFileAccumulator();
}
accumulator.doAccumulate();
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Exception in S3Sink : \n Error message {} \n Exception cause {}", e.getMessage(),
e.getCause(), e);
}
}
}
}
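Note on the shutdown path above: isStopRequested is a plain static boolean (not volatile), and Thread.stop() has been deprecated since Java 1.2 because it can kill a thread mid-operation. A minimal sketch of a cooperative, interrupt-based alternative, with all names hypothetical rather than taken from the commit:

// Sketch only: cooperative worker shutdown via interrupt (hypothetical names).
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class CooperativeWorker implements Runnable {

    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    private volatile boolean stopRequested; // volatile: the worker thread must see the write

    @Override
    public void run() {
        try {
            while (!stopRequested) {
                String event = queue.take(); // blocks; an interrupt() unblocks it
                process(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status, then exit
        }
    }

    void requestStop(Thread workerThread) {
        stopRequested = true;
        workerThread.interrupt(); // wakes a thread parked in take()
    }

    private void process(String event) { /* hand off to an accumulator, etc. */ }
}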
File: S3SinkConfig.java
@@ -13,11 +13,16 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

/**
/*
An implementation class of s3 sink configuration
*/
public class S3SinkConfig {

static final String DEFAULT_BUCKET_NAME = "dataprepper";
static final String DEFAULT_PATH_PREFIX = "logdata";

static final String DEFAULT_TEMP_STORAGE = "local_file";

@JsonProperty("aws")
@NotNull
@Valid
@@ -37,15 +42,15 @@ public class S3SinkConfig {

@JsonProperty("temporary_storage")
@NotNull
private String temporaryStorage;
private String temporaryStorage = DEFAULT_TEMP_STORAGE;

@JsonProperty("bucket")
@NotNull
private String bucketName;
private String bucketName = DEFAULT_BUCKET_NAME;

@JsonProperty("key_path_prefix")
@NotNull
private String keyPathPrefix;
private String keyPathPrefix = DEFAULT_PATH_PREFIX;

/*
Aws Authentication configuration Options
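The defaults introduced above (DEFAULT_BUCKET_NAME, DEFAULT_PATH_PREFIX, DEFAULT_TEMP_STORAGE) lean on standard Jackson data binding: a field initializer is kept whenever the corresponding property is absent from the input. A self-contained sketch of that behavior, with class name and values illustrative rather than taken from the commit:

// Sketch only: field initializers act as defaults under Jackson data binding.
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConfigDefaultsDemo {

    static class SinkConfig {
        @JsonProperty("bucket")
        private String bucketName = "dataprepper";      // kept if "bucket" is absent

        @JsonProperty("temporary_storage")
        private String temporaryStorage = "local_file"; // kept if absent
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        SinkConfig cfg = mapper.readValue("{\"bucket\":\"my-bucket\"}", SinkConfig.class);
        System.out.println(cfg.bucketName);        // my-bucket (overridden by input)
        System.out.println(cfg.temporaryStorage);  // local_file (default retained)
    }
}

Note that the @NotNull constraints in the real class are satisfied either way, since the fields are never left null.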
File: S3SinkWorker.java
@@ -4,6 +4,8 @@
*/
package org.opensearch.dataprepper.plugins.sink;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.NavigableSet;
@@ -14,9 +16,10 @@
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryAccumulator;
import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileAccumulator;
import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator;
import org.opensearch.dataprepper.plugins.sink.codec.Codec;
import org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator;
import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -32,60 +35,42 @@
* Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50
* 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50
*/
public class S3SinkWorker implements Runnable {
public class S3SinkWorker {

private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class);
private static final float LOAD_FACTOR = 0.02f;
private static final String IN_MEMORY = "in_memory";
private static final String LOCAL_FILE = "local_file";
private final int numEvents;
private int numStreams;
private final int eventsPerChunk;
private final S3SinkService s3SinkService;
private final S3SinkConfig s3SinkConfig;
private final Codec codec;

private SinkAccumulator accumulator;
private final int numEvents;
private int numStreams;
private final int eventsPerChunk;

/**
*
* @param s3SinkService
* @param s3SinkConfig
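* @param codec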
*/
public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig, Codec codec) {
public S3SinkWorker(final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig, final Codec codec) {
this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount();
this.numStreams = (int) (numEvents * LOAD_FACTOR);
this.eventsPerChunk = numEvents / numStreams;
this.s3SinkService = s3SinkService;
this.s3SinkConfig = s3SinkConfig;
this.codec = codec;
}

@Override
public void run() {
try {
while (!S3Sink.isStopRequested()) {
if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) {
inMemmoryAccumulator();
} else {
localFileAccumulator();
}
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(),
e.getCause(), e);
}
numStreams = (int) (numEvents * LOAD_FACTOR);
eventsPerChunk = numEvents / numStreams;
}

/**
* Accumulates data from the buffer and stores it in memory, returning the accumulator.
*/
public void inMemmoryAccumulator() {
HashSet<Event> inMemoryEventSet = null;
HashMap<Integer, HashSet<Event>> inMemoryEventMap = null;
public SinkAccumulator inMemmoryAccumulator() {
HashSet<String> inMemoryEventSet = null;
HashMap<Integer, HashSet<String>> inMemoryEventMap = null;
int streamCount = 0;
try {
StopWatch watch = new StopWatch();
watch.start();
int streamCount = 0;
int byteCount = 0;
int eventCount = 0;
long eventCollectionDuration = 0;
@@ -96,7 +81,9 @@ public void inMemmoryAccumulator() {
for (int data = 0; data < eventsPerChunk
&& thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) {
Event event = S3Sink.getEventQueue().take();
inMemoryEventSet.add(event);
OutputStream outputStream = new ByteArrayOutputStream();
codec.parse(outputStream, event);
inMemoryEventSet.add(outputStream.toString());
byteCount += event.toJsonString().getBytes().length;
flag = Boolean.TRUE;
eventCollectionDuration = watch.getTime(TimeUnit.SECONDS);
@@ -115,16 +102,18 @@ public void inMemmoryAccumulator() {
"In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}",
byteCount, eventCount, eventCollectionDuration, streamCount);

//new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate();
accumulator = new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig);
} catch (Exception e) {
LOG.error("Exception while storing recoreds into In-Memory", e);
}

return accumulator;
}

/**
* Accumulates data from the buffer and stores it in a local file, returning the accumulator.
*/
public void localFileAccumulator() {
public SinkAccumulator localFileAccumulator() {
DB db = null;
NavigableSet<String> localFileEventSet = null;
int byteCount = 0;
@@ -136,26 +125,23 @@ public void localFileAccumulator() {
db = DBMaker.memoryDB().make();
localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen();
for (int data = 0; thresholdsCheck(data, watch, byteCount); data++) {
String event = S3Sink.getEventQueue().take().toJsonString();
byteCount += event.getBytes().length;
localFileEventSet.add(event);
Event event = S3Sink.getEventQueue().take();
OutputStream outputStream = new ByteArrayOutputStream();
codec.parse(outputStream, event);
byteCount += event.toJsonString().getBytes().length;
localFileEventSet.add(outputStream.toString());
eventCount++;
eventCollectionDuration = watch.getTime(TimeUnit.SECONDS);
}
db.commit();
LOG.info(
"Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec",
byteCount, eventCount, eventCollectionDuration);

//new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate();

accumulator = new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig, db);
} catch (Exception e) {
LOG.error("Exception while storing recoreds into Local-file", e);
} finally {
if (db != null && !db.isClosed()) {
db.close();
}
}
return accumulator;
}

/**
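For reference, the stream-sizing arithmetic from the constructor above, worked through as a standalone sketch; the edge-case note at the end is an observation about the formula, not behavior exercised in the commit:

// Sketch only: the numStreams / eventsPerChunk arithmetic with LOAD_FACTOR = 0.02f.
public class StreamSizingDemo {

    private static final float LOAD_FACTOR = 0.02f;

    public static void main(String[] args) {
        for (int numEvents : new int[] {100, 1000}) {
            int numStreams = (int) (numEvents * LOAD_FACTOR); // 100 -> 2, 1000 -> 20
            int eventsPerChunk = numEvents / numStreams;      // 50 in both cases
            System.out.printf("numEvents=%d -> numStreams=%d, eventsPerChunk=%d%n",
                    numEvents, numStreams, eventsPerChunk);
        }
        // Edge case: any numEvents below 50 truncates numStreams to 0,
        // so the division above would throw ArithmeticException.
    }
}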
File: (name not shown)
@@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.stream;
package org.opensearch.dataprepper.plugins.sink.accumulator;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
File: (name not shown)
@@ -2,17 +2,17 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
package org.opensearch.dataprepper.plugins.sink.accumulator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A ByteArrayOutputStream with some useful additional functionality.
*/
File: (name not shown)
@@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.stream;
package org.opensearch.dataprepper.plugins.sink.accumulator;

import java.util.Iterator;
import java.util.concurrent.Callable;
File: InMemoryAccumulator.java
@@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.stream;
package org.opensearch.dataprepper.plugins.sink.accumulator;

import java.util.HashSet;
import java.util.List;
@@ -11,22 +11,20 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.sink.S3ObjectIndex;
import org.opensearch.dataprepper.plugins.sink.S3SinkConfig;
import org.opensearch.dataprepper.plugins.sink.S3SinkService;
import org.opensearch.dataprepper.plugins.sink.SinkAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.services.s3.S3Client;
/**
* Accumulates data from buffer and store into in memory
* Uploads accumulated data (in-memory) to Amazon S3
*/
public class InMemoryAccumulator implements SinkAccumulator {

private static final Logger LOG = LoggerFactory.getLogger(InMemoryAccumulator.class);
final Map<Integer, HashSet<Event>> inMemoryEventMap;
final Map<Integer, HashSet<String>> inMemoryEventMap;
private final int numStreams;
private final S3SinkService s3SinkService;
private final S3SinkConfig s3SinkConfig;
@@ -41,7 +39,7 @@ public class InMemoryAccumulator implements SinkAccumulator {
* @param s3SinkService
* @param s3SinkConfig
*/
public InMemoryAccumulator(final Map<Integer, HashSet<Event>> inMemoryEventMap, final int numStreams,
public InMemoryAccumulator(final Map<Integer, HashSet<String>> inMemoryEventMap, final int numStreams,
final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig) {
this.inMemoryEventMap = inMemoryEventMap;
this.numStreams = numStreams;
@@ -72,14 +70,14 @@ public void doAccumulate() {
ExecutorService pool = Executors.newFixedThreadPool(numStreams);
for (int streamsInput = 0; streamsInput < numStreams; streamsInput++) {
final int streamIndex = streamsInput;
HashSet<Event> eventSet = inMemoryEventMap.get(streamsInput);
HashSet<String> eventSet = inMemoryEventMap.get(streamsInput);
pool.submit(new Runnable() {
public void run() {
try {
MultiPartOutputStream outputStream = streams.get(streamIndex);
if (eventSet != null) {
for (Event event : eventSet) {
outputStream.write(event.toJsonString().getBytes());
for (String event : eventSet) {
outputStream.write(event.getBytes());
}
}
// The stream must be closed once all the data has been written
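The doAccumulate() fan-out above (a fixed-size pool writing one partition of serialized events per stream) reduces to the following self-contained sketch over plain OutputStreams; all names are illustrative, not from the commit:

// Sketch only: write each partition of events on its own pool thread.
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FanOutDemo {

    public static void writeAll(Map<Integer, Set<String>> partitions,
                                List<OutputStream> streams) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (int i = 0; i < streams.size(); i++) {
            final Set<String> events = partitions.get(i); // null when a partition is empty
            final OutputStream out = streams.get(i);
            pool.submit(() -> {
                try (out) { // close once all data is written (Java 9+ syntax)
                    if (events != null) {
                        for (String event : events) {
                            out.write(event.getBytes());
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace(); // a real implementation would log and surface this
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES); // bound the wait for all writers
    }
}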
(Diff for the remaining changed files was not loaded.)
