Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.List;

/**
* Created by sblackmon on 12/10/13.
* Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
*/
public class ElasticsearchConfigurator {

Expand Down Expand Up @@ -51,9 +51,11 @@ public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config

String index = elasticsearch.getString("index");
String type = elasticsearch.getString("type");
Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;

elasticsearchWriterConfiguration.setIndex(index);
elasticsearchWriterConfiguration.setType(type);
elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);

return elasticsearchWriterConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
public static final String STREAMS_ID = "ElasticsearchPersistWriter";

public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
Expand All @@ -49,8 +54,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private static final long WAITING_DOCS_LIMIT = 10000;
private static final int BYTES_IN_MB = 1024 * 1024;
private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
private static final long DEFAULT_MAX_WAIT = 10000;

private final List<String> affectedIndexes = new ArrayList<String>();
private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private ObjectMapper mapper = new StreamsJacksonMapper();
private ElasticsearchClientManager manager;
Expand All @@ -61,6 +69,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private OutputStreamWriter currentWriter = null;
private int batchSize = 50;
private int totalRecordsWritten = 0;
private long maxMsBeforeFlush;
private boolean veryLargeBulk = false; // by default this setting is set to false

protected Thread task;
Expand All @@ -78,10 +87,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private volatile int batchItemsSent = 0;
private volatile int totalByteCount = 0;
private volatile int byteCount = 0;
private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());

public ElasticsearchPersistWriter() {
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config);
this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
}

public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
Expand Down Expand Up @@ -173,6 +183,7 @@ public void cleanUp() {

try {
flush();
backgroundFlushTask.shutdownNow();
} catch (IOException e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -240,6 +251,7 @@ public void flush() throws IOException {

@Override
public void prepare(Object configurationObject) {
maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs();
mapper = StreamsJacksonMapper.getInstance();
start();
}
Expand All @@ -254,70 +266,82 @@ public DatumStatusCounter getDatumStatusCounter() {
}

public void start() {

backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
LOGGER.debug("Checking to see if data needs to be flushed");
long time = System.currentTimeMillis() - lastWrite.get();
if (time > maxMsBeforeFlush && batchItemsSent > 0) {
LOGGER.debug("Background Flush task determined {} are waiting to be flushed. It has been {} since the last write to ES", batchItemsSent, time);
flushInternal();
}
}
}, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
manager = new ElasticsearchClientManager(config);
client = manager.getClient();

LOGGER.info(client.toString());
}

public void flushInternal() {
synchronized (this) {
// we do not have a working bulk request, we can just exit here.
if (this.bulkRequest == null || batchItemsSent == 0)
return;
lock.writeLock().lock();
// we do not have a working bulk request, we can just exit here.
if (this.bulkRequest == null || batchItemsSent == 0)
return;

// call the flush command.
flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
// call the flush command.
flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);

// null the flush request, this will be created in the 'add' function below
this.bulkRequest = null;
// null the flush request, this will be created in the 'add' function below
this.bulkRequest = null;

// record the proper statistics, and add it to our totals.
this.totalSizeInBytes += this.batchSizeInBytes;
this.totalSent += batchItemsSent;
// record the proper statistics, and add it to our totals.
this.totalSizeInBytes += this.batchSizeInBytes;
this.totalSent += batchItemsSent;

// reset the current batch statistics
this.batchSizeInBytes = 0;
this.batchItemsSent = 0;
// reset the current batch statistics
this.batchSizeInBytes = 0;
this.batchItemsSent = 0;

try {
int count = 0;
if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
/****************************************************************************
* Author:
* Smashew
*
* Date:
* 2013-10-20
*
* Note:
* With the information that we have on hand. We need to develop a heuristic
* that will determine when the cluster is having a problem indexing records
* by telling it to pause and wait for it to catch back up. A
*
* There is an impact to us, the caller, whenever this happens as well. Items
* that are not yet fully indexed by the server sit in a queue, on the client
* that can cause the heap to overflow. This has been seen when re-indexing
* large amounts of data to a small cluster. The "deletes" + "indexes" can
* cause the server to have many 'outstandingItems" in queue. Running this
* software with large amounts of data, on a small cluster, can re-create
* this problem.
*
* DO NOT DELETE THESE LINES
****************************************************************************/

// wait for the flush to catch up. We are going to cap this at
while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
Thread.sleep(10);

if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
}
} catch (Exception e) {
LOGGER.info("We were broken from our loop: {}", e.getMessage());
try {
int count = 0;
if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
/****************************************************************************
* Author:
* Smashew
*
* Date:
* 2013-10-20
*
* Note:
* With the information that we have on hand. We need to develop a heuristic
* that will determine when the cluster is having a problem indexing records
* by telling it to pause and wait for it to catch back up. A
*
* There is an impact to us, the caller, whenever this happens as well. Items
* that are not yet fully indexed by the server sit in a queue, on the client
* that can cause the heap to overflow. This has been seen when re-indexing
* large amounts of data to a small cluster. The "deletes" + "indexes" can
* cause the server to have many 'outstandingItems" in queue. Running this
* software with large amounts of data, on a small cluster, can re-create
* this problem.
*
* DO NOT DELETE THESE LINES
****************************************************************************/

// wait for the flush to catch up. We are going to cap this at
while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
Thread.sleep(10);

if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
}
} catch (Exception e) {
LOGGER.info("We were broken from our loop: {}", e.getMessage());
} finally {
lock.writeLock().unlock();
}

}

public void add(String indexName, String type, String json) {
Expand Down Expand Up @@ -353,32 +377,35 @@ public void add(String indexName, String type, String id, String json) {

public void add(UpdateRequest updateRequest) {
Preconditions.checkNotNull(updateRequest);
synchronized (this) {
checkAndCreateBulkRequest();
checkIndexImplications(updateRequest.index());
bulkRequest.add(updateRequest);
try {
Optional<Integer> size = Objects.firstNonNull(
Optional.fromNullable(updateRequest.doc().source().length()),
Optional.fromNullable(updateRequest.script().length()));
trackItemAndBytesWritten(size.get().longValue());
} catch (NullPointerException x) {
trackItemAndBytesWritten(1000);
}
lock.writeLock().lock();
checkAndCreateBulkRequest();
checkIndexImplications(updateRequest.index());
bulkRequest.add(updateRequest);
try {
Optional<Integer> size = Objects.firstNonNull(
Optional.fromNullable(updateRequest.doc().source().length()),
Optional.fromNullable(updateRequest.script().length()));
trackItemAndBytesWritten(size.get().longValue());
} catch (NullPointerException x) {
trackItemAndBytesWritten(1000);
} finally {
lock.writeLock().unlock();
}
}

public void add(IndexRequest indexRequest) {
synchronized (this) {
checkAndCreateBulkRequest();
checkIndexImplications(indexRequest.index());
bulkRequest.add(indexRequest);
try {
trackItemAndBytesWritten(indexRequest.source().length());
} catch (NullPointerException x) {
LOGGER.warn("NPE adding/sizing indexrequest");
}
lock.writeLock().lock();
checkAndCreateBulkRequest();
checkIndexImplications(indexRequest.index());
bulkRequest.add(indexRequest);
try {
trackItemAndBytesWritten(indexRequest.source().length());
} catch (NullPointerException x) {
LOGGER.warn("NPE adding/sizing indexrequest");
} finally {
lock.writeLock().unlock();
}

}

public void createIndexIfMissing(String indexName) {
Expand Down Expand Up @@ -446,10 +473,20 @@ private Set<String> checkIds(Set<String> input, String index, String type) {
return toReturn;
}

/**
* This method is to ONLY be called by flushInternal otherwise the counts will be off.
* @param bulkRequest
* @param thisSent
* @param thisSizeInBytes
*/
private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
final Object messenger = new Object();
LOGGER.debug("Attempting to write {} items to ES", thisSent);
bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
lastWrite.set(System.currentTimeMillis());

if (bulkItemResponses.hasFailures())
LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());

Expand Down Expand Up @@ -484,8 +521,6 @@ public void onFailure(Throwable e) {
e.printStackTrace();
}
});

this.notify();
}

private void trackItemAndBytesWritten(long sizeInBytes) {
Expand All @@ -499,9 +534,12 @@ private void trackItemAndBytesWritten(long sizeInBytes) {

private void checkAndCreateBulkRequest() {
// Synchronize to ensure that we don't lose any records
synchronized (this) {
lock.writeLock().lock();
try {
if (bulkRequest == null)
bulkRequest = this.manager.getClient().prepareBulk();
} finally {
lock.writeLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"type": {
"type": "string",
"description": "Type to write as"
}
},
"MaxTimeBetweenFlushMs": {
"type": "String",
"format": "utc-millisec"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,36 @@ protected void sleep() {
}

protected QueryResult executeAPIRequest() {
BeatApi.BeatResponse response = this.client.createRequestBuilder()
.setHeartBeatId(heartbeatId)
.setOffset(0)
.setReturnSetSize(maxApiBatch).execute();

LOGGER.debug("Received {} results from API query", response.getCount());
BeatApi.BeatResponse response = null;
try {
response = this.client.createRequestBuilder()
.setHeartBeatId(heartbeatId)
.setOffset(0)
.setReturnSetSize(maxApiBatch).execute();

LOGGER.debug("Received {} results from API query", response.getCount());
} catch (Exception e) {
LOGGER.warn("Error querying Sysomos API", e);
}

String currentId = null;
boolean matched = false;
for(BeatApi.BeatResponse.Beat beat : response.getBeat()) {
String docId = beat.getDocid();
//We get documents in descending time order. This will set the id to the latest document
if(currentId == null) {
currentId = docId;
}
//We only want to process documents that we know we have not seen before
if(lastID != null && lastID.equals(docId)) {
matched = true;
break;
if(response != null) {
for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
String docId = beat.getDocid();
//We get documents in descending time order. This will set the id to the latest document
if (currentId == null) {
currentId = docId;
}
//We only want to process documents that we know we have not seen before
if (lastID != null && lastID.equals(docId)) {
matched = true;
break;
}
StreamsDatum item = new StreamsDatum(beat, docId);
item.getMetadata().put("heartbeat", this.heartbeatId);
this.provider.enqueueItem(item);
}
StreamsDatum item = new StreamsDatum(beat, docId);
item.getMetadata().put("heartbeat", this.heartbeatId);
this.provider.enqueueItem(item);
}
return new QueryResult(matched, currentId);
}
Expand Down
Loading