Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Updated futures impl for different queues
Browse files Browse the repository at this point in the history
Added onError to catch errors

Added a gauge so we can track index operation in flight

Updated queue scope to only be a name, since we only use them at the system level.
  • Loading branch information
Todd Nine committed Mar 13, 2015
1 parent 361060e commit 2d6ae36
Show file tree
Hide file tree
Showing 22 changed files with 359 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ protected void configure() {
install( new CommonModule());
install(new CollectionModule());
install(new GraphModule());
install( new IndexModule() {
@Override
public void wireBufferQueue() {
bind(BufferQueue.class).to( BufferQueueSQSImpl.class );
}
} );
install( new IndexModule() );
// install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
// install(new QueueModule());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ public void testCleanupOnDelete() throws Exception {
em.delete( thing );
}


//put this into the top of the queue, once it's acked we've been flushed
em.refreshIndex();

// wait for indexes to be cleared for the deleted entities
count = 0;
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,53 @@
*/
package org.apache.usergrid.persistence.core.future;


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;


/**
* Future without the exception nastiness
*/
public class BetterFuture<T> extends FutureTask<T> {
public BetterFuture(Callable<T> callable){
super(callable);
public class BetterFuture<T> extends FutureTask<T> {

private Throwable error;


public BetterFuture( Callable<T> callable ) {
super( callable );
}

public void done(){

public void setError( final Throwable t ) {
this.error = t;
}


public void done() {
run();
}

public T get(){

public T get() {

T returnValue = null;

try {
return super.get();
}catch (Exception e){
throw new RuntimeException(e);
returnValue = super.get();
}
catch ( InterruptedException e ) {
//swallow
}
catch ( ExecutionException e ) {
//swallow
}
}

if ( error != null ) {
throw new RuntimeException( "Error in getting future", error );
}

return returnValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ public interface MetricsFactory {
Counter getCounter(Class<?> klass, String name);

Meter getMeter(Class<?> klass, String name);

/**
* Get a gauge and create it
* @param clazz
* @param name
* @param gauge
* @return
*/
void addGauge( Class<?> clazz, String name, Gauge<?> gauge );
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,27 @@
package org.apache.usergrid.persistence.core.metrics;


import com.codahale.metrics.*;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Singleton class to manage metrics.
Expand All @@ -39,74 +49,101 @@ public class MetricsFactoryImpl implements MetricsFactory {
private MetricRegistry registry;
private GraphiteReporter graphiteReporter;
private JmxReporter jmxReporter;
private ConcurrentHashMap<String,Metric> hashMap;
private static final Logger LOG = LoggerFactory.getLogger(MetricsFactoryImpl.class);
private ConcurrentHashMap<String, Metric> hashMap;
private static final Logger LOG = LoggerFactory.getLogger( MetricsFactoryImpl.class );


@Inject
public MetricsFactoryImpl(MetricsFig metricsFig) {
public MetricsFactoryImpl( MetricsFig metricsFig ) {
registry = new MetricRegistry();
String metricsHost = metricsFig.getHost();
if(!metricsHost.equals("false")) {
Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, 2003));
graphiteReporter = GraphiteReporter.forRegistry(registry)
.prefixedWith("usergrid-metrics")
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.build(graphite);
graphiteReporter.start(30, TimeUnit.SECONDS);
}else {
LOG.warn("MetricsService:Logger not started.");
if ( !metricsHost.equals( "false" ) ) {
Graphite graphite = new Graphite( new InetSocketAddress( metricsHost, 2003 ) );
graphiteReporter = GraphiteReporter.forRegistry( registry ).prefixedWith( "usergrid-metrics" )
.convertRatesTo( TimeUnit.SECONDS )
.convertDurationsTo( TimeUnit.MILLISECONDS ).filter( MetricFilter.ALL )
.build( graphite );
graphiteReporter.start( 30, TimeUnit.SECONDS );
}
else {
LOG.warn( "MetricsService:Logger not started." );
}
hashMap = new ConcurrentHashMap<String, Metric>();

jmxReporter = JmxReporter.forRegistry(registry).build();
jmxReporter = JmxReporter.forRegistry( registry ).build();
jmxReporter.start();
}


@Override
public MetricRegistry getRegistry() {
return registry;
}


@Override
public Timer getTimer(Class<?> klass, String name) {
return getMetric(Timer.class, klass, name);
public Timer getTimer( Class<?> klass, String name ) {
return getMetric( Timer.class, klass, name );
}


@Override
public Histogram getHistogram(Class<?> klass, String name) {
return getMetric(Histogram.class, klass, name);
public Histogram getHistogram( Class<?> klass, String name ) {
return getMetric( Histogram.class, klass, name );
}


@Override
public Counter getCounter(Class<?> klass, String name) {
return getMetric(Counter.class, klass, name);
public Counter getCounter( Class<?> klass, String name ) {
return getMetric( Counter.class, klass, name );
}


@Override
public Meter getMeter(Class<?> klass, String name) {
return getMetric(Meter.class, klass, name);
public Meter getMeter( Class<?> klass, String name ) {
return getMetric( Meter.class, klass, name );
}

private <T> T getMetric(Class<T> metricClass, Class<?> klass, String name) {

@Override
public void addGauge( final Class<?> clazz, final String name, final Gauge<?> gauge ) {

this.getRegistry().register( MetricRegistry.name( clazz, name ), gauge );
}


private <T> T getMetric( Class<T> metricClass, Class<?> klass, String name ) {
String key = metricClass.getName() + klass.getName() + name;
Metric metric = hashMap.get(key);
if (metric == null) {
if (metricClass == Histogram.class) {
metric = this.getRegistry().histogram(MetricRegistry.name(klass, name));
Metric metric = hashMap.get( key );
if ( metric == null ) {
if ( metricClass == Histogram.class ) {
metric = this.getRegistry().histogram( MetricRegistry.name( klass, name ) );
}
if (metricClass == Timer.class) {
metric = this.getRegistry().timer(MetricRegistry.name(klass, name));
if ( metricClass == Timer.class ) {
metric = this.getRegistry().timer( MetricRegistry.name( klass, name ) );
}
if (metricClass == Meter.class) {
metric = this.getRegistry().meter(MetricRegistry.name(klass, name));
if ( metricClass == Meter.class ) {
metric = this.getRegistry().meter( MetricRegistry.name( klass, name ) );
}
if (metricClass == Counter.class) {
metric = this.getRegistry().counter(MetricRegistry.name(klass, name));
if ( metricClass == Counter.class ) {
metric = this.getRegistry().counter( MetricRegistry.name( klass, name ) );
}
hashMap.put(key, metric);


hashMap.put( key, metric );
}
return (T) metric;
return ( T ) metric;
}


/**
*
* @param metricClass
* @param klass
* @param name
* @return
*/
private String getKey( Class<?> metricClass, Class<?> klass, String name ) {
return metricClass.getName() + klass.getName() + name;
}
}
8 changes: 7 additions & 1 deletion stack/corepersistence/queryindex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,13 @@
<classifier>tests</classifier>
</dependency>


<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>queue</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,15 @@
* Classy class class.
*/
public interface IndexBufferConsumer {


/**
* Start the consumer
*/
public void start();

/**
* Stop the consumers
*/
public void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.safehaus.guicyfig.GuicyFig;
import org.safehaus.guicyfig.Key;

import org.apache.usergrid.persistence.index.guice.QueueProvider;


@FigSingleton
public interface IndexFig extends GuicyFig {
Expand Down Expand Up @@ -86,6 +88,11 @@ public interface IndexFig extends GuicyFig {
*/
public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";

/**
* The queue implementation to use. Values come from <class>QueueProvider.Implementations</class>
*/
public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";

public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";

@Default( "127.0.0.1" )
Expand Down Expand Up @@ -190,4 +197,8 @@ public interface IndexFig extends GuicyFig {
@Key( ELASTICSEARCH_WORKER_COUNT )
int getWorkerCount();

@Default( "LOCAL" )
@Key( ELASTICSEARCH_QUEUE_IMPL )
String getQueueImplementation();

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ public int hashCode() {
}

public void done() {
getFuture().done();
//if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
final BetterFuture<IndexOperationMessage> future = getFuture();

if(future != null ){
future.done();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;

import org.apache.usergrid.persistence.index.impl.BufferQueue;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexFactoryImpl;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
Expand All @@ -33,7 +34,7 @@
import org.safehaus.guicyfig.GuicyFigModule;


public abstract class IndexModule extends AbstractModule {
public class IndexModule extends AbstractModule {

@Override
protected void configure() {
Expand All @@ -50,14 +51,10 @@ protected void configure() {
bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();

wireBufferQueue();
}

bind( BufferQueue.class).toProvider( QueueProvider.class );
}

/**
* Write the <class>BufferQueue</class> for this implementation
*/
public abstract void wireBufferQueue();


}
Loading

0 comments on commit 2d6ae36

Please sign in to comment.