Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
changing buffer impl to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Mar 6, 2015
1 parent 615a5af commit 5ad1a8c
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 18 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import rx.Observable; import rx.Observable;
import rx.Subscriber; import rx.Subscriber;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** /**
* Classy class class. * Classy class class.
*/ */
public interface IndexBufferProducer extends Observable.OnSubscribe<IndexOperationMessage> { public interface IndexBufferProducer {

@Override
void call(Subscriber<? super IndexOperationMessage> subscriber);


BetterFuture put(IndexOperationMessage message); BetterFuture put(IndexOperationMessage message);

BlockingQueue<IndexOperationMessage> getSource();
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface IndexFig extends GuicyFig {


public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size"; public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";


public static final String INDEX_QUEUE_SIZE = "elasticsearch.queue_size";

public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout"; public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";


public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size"; public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
Expand Down Expand Up @@ -127,7 +129,7 @@ public interface IndexFig extends GuicyFig {
*/ */
@Default("250") @Default("250")
@Key( INDEX_BUFFER_TIMEOUT ) @Key( INDEX_BUFFER_TIMEOUT )
int getIndexBufferTimeout(); long getIndexBufferTimeout();


/** /**
* size of the buffer to build up before you send results * size of the buffer to build up before you send results
Expand All @@ -137,6 +139,14 @@ public interface IndexFig extends GuicyFig {
@Key( INDEX_BUFFER_SIZE ) @Key( INDEX_BUFFER_SIZE )
int getIndexBufferSize(); int getIndexBufferSize();


/**
* size of the buffer to build up before you send results
* @return
*/
@Default("1000")
@Key( INDEX_QUEUE_SIZE )
int getIndexQueueSize();

/** /**
* Request batch size for ES * Request batch size for ES
* @return * @return
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import rx.Observable; import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1; import rx.functions.Action1;
import rx.functions.Func1; import rx.functions.Func1;
import rx.schedulers.Schedulers; import rx.schedulers.Schedulers;


import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.*;


/** /**
* Consumer for IndexOperationMessages * Consumer for IndexOperationMessages
Expand All @@ -63,6 +65,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Timer flushTimer; private final Timer flushTimer;
private final Counter indexSizeCounter; private final Counter indexSizeCounter;
private final Meter flushMeter; private final Meter flushMeter;
private final Timer produceTimer;


@Inject @Inject
public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){ public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){
Expand All @@ -72,14 +75,48 @@ public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProduce
this.config = config; this.config = config;
this.failureMonitor = new FailureMonitorImpl(config,provider); this.failureMonitor = new FailureMonitorImpl(config,provider);
this.client = provider.getClient(); this.client = provider.getClient();
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();


//batch up sets of some size and send them in batch //batch up sets of some size and send them in batch
this.consumer = Observable.create(producer) this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
@Override
public void call(final Subscriber<? super IndexOperationMessage> subscriber) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
do {
try {
Timer.Context timer = produceTimer.time();
IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
if(polled!=null) {
drainList.add(polled);
producerQueue.drainTo(drainList, config.getIndexBufferSize());
System.out.println("Consumer Thread" + Thread.currentThread().getName());
for(IndexOperationMessage drained : drainList){
subscriber.onNext(drained);
}
drainList.clear();
}
timer.stop();

} catch (InterruptedException ie) {
log.error("failed to dequeue", ie);
}
} while (true);
}
});
thread.setName("EsEntityIndex_Consumer");
thread.start();
}
})
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize()) .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
.doOnNext(new Action1<List<IndexOperationMessage>>() { .doOnNext(new Action1<List<IndexOperationMessage>>() {
@Override @Override
public void call(List<IndexOperationMessage> containerList) { public void call(List<IndexOperationMessage> containerList) {
System.out.println("Buffered Consumer Thread" + Thread.currentThread().getName());
if (containerList.size() > 0) { if (containerList.size() > 0) {
flushMeter.mark(containerList.size()); flushMeter.mark(containerList.size());
Timer.Context time = flushTimer.time(); Timer.Context time = flushTimer.time();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,38 +20,48 @@
package org.apache.usergrid.persistence.index.impl; package org.apache.usergrid.persistence.index.impl;


import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBufferProducer; import org.apache.usergrid.persistence.index.IndexBufferProducer;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage; import org.apache.usergrid.persistence.index.IndexOperationMessage;
import rx.Subscriber; import rx.Subscriber;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** /**
* Producer for index operation messages * Producer for index operation messages
*/ */
@Singleton @Singleton
public class EsIndexBufferProducerImpl implements IndexBufferProducer { public class EsIndexBufferProducerImpl implements IndexBufferProducer {


private final Counter indexSizeCounter; private final Counter indexSizeCounter;
private Subscriber<? super IndexOperationMessage> subscriber; private final ArrayBlockingQueue<IndexOperationMessage> messages;
private final Timer timer;


@Inject @Inject
public EsIndexBufferProducerImpl(MetricsFactory metricsFactory){ public EsIndexBufferProducerImpl(MetricsFactory metricsFactory,IndexFig fig){
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class,"index.buffer.size"); this.messages = new ArrayBlockingQueue<>(fig.getIndexQueueSize()*5);

this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
} this.timer = metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
@Override
public void call(Subscriber<? super IndexOperationMessage> subscriber) {
this.subscriber = subscriber;
} }


public BetterFuture put(IndexOperationMessage message){ public BetterFuture put(IndexOperationMessage message){
Preconditions.checkNotNull(message,"Message cannot be null"); Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getOperations().size()); indexSizeCounter.inc(message.getOperations().size());
subscriber.onNext(message); Timer.Context time = timer.time();
messages.offer(message);
time.stop();
return message.getFuture(); return message.getFuture();
} }

@Override
public BlockingQueue<IndexOperationMessage> getSource() {
return messages;
}
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testIndexThreads() throws IOException {


long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
final int threads = 20; final int threads = 20;
final int size = 20; final int size = 30;
final EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
final IndexScope indexScope = new IndexScopeImpl(appId, "things"); final IndexScope indexScope = new IndexScopeImpl(appId, "things");
final String entityType = "thing"; final String entityType = "thing";
Expand Down

0 comments on commit 5ad1a8c

Please sign in to comment.