Skip to content

Commit

Permalink
HWKMETRICS-99: modification request from initial PR.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwringe committed May 22, 2015
1 parent 6763aa6 commit ef8689f
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
*/
package org.hawkular.metrics.api.jaxrs;

import javax.inject.Inject;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

import org.hawkular.metrics.core.api.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,13 +31,8 @@ public class HawkularMetricsRestApp extends Application {

private static final Logger logger = LoggerFactory.getLogger(HawkularMetricsRestApp.class);

@Inject
private MetricsService metricsService;

public HawkularMetricsRestApp() {

logger.info("Hawkular Metrics starting ..");

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_KEYSPACE;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_NODES;

import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import org.hawkular.metrics.api.jaxrs.config.Configurable;
import org.hawkular.metrics.api.jaxrs.config.ConfigurationProperty;
import org.hawkular.metrics.api.jaxrs.util.Eager;
Expand Down Expand Up @@ -62,41 +61,42 @@ public class MetricsServiceProducer {
@ConfigurationProperty(CASSANDRA_KEYSPACE)
private String keyspace;

@Produces
private MetricsService metricsService;

private CassandraSession cassandraSession;

@PostConstruct
void init() {
LOG.info("Initializing metrics service");
getMetricsService();
metricsService = new MetricsServiceCassandra();
Map<String, String> options = new HashMap<>();
options.put("cqlport", cqlPort);
options.put("nodes", nodes);
options.put("keyspace", keyspace);

CassandraSession.Builder cassandraSessionBuilder = new CassandraSession.Builder();
cassandraSessionBuilder.withOptions(options);

cassandraSessionBuilder.withInitializationCallback(new FutureCallback<Session>() {
@Override
public void onSuccess(Session session) {
metricsService.startUp(session);
}

@Override
public void onFailure(Throwable t) {
LOG.error("An error occurred trying to connect to the Cassandra cluster.", t);
metricsService.setState(MetricsService.State.FAILED);
}
});

cassandraSession = cassandraSessionBuilder.build();
}

@Produces
public MetricsService getMetricsService() {
if (metricsService == null) {
metricsService = new MetricsServiceCassandra();
Map<String, String> options = new HashMap<>();
options.put("cqlport", cqlPort);
options.put("nodes", nodes);
options.put("keyspace", keyspace);

CassandraSession.Builder cassandraSessionBuilder = new CassandraSession.Builder();
cassandraSessionBuilder.withOptions(options);

cassandraSessionBuilder.withInitializationCallback(new FutureCallback<Session>() {
@Override
public void onSuccess(Session session) {
metricsService.startUp(session);
}

@Override
public void onFailure(Throwable t) {
throw new RuntimeException("Error trying to get the Cassandra Session", t);
}
});

cassandraSessionBuilder.build();
}

return metricsService;
@PreDestroy
void destroy() {
metricsService.shutdown();
cassandraSession.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,39 @@
* Created by mwringe on 20/05/15.
*/
@Provider
public class CassandraAvailabilityFilter implements ContainerRequestFilter {
public class MetricsServiceStateFilter implements ContainerRequestFilter {

private static final String NO_CASSANDRA_SESSION_MSG = "Service unavailable while initializing.";
private static final String STARTING = "Service unavailable while initializing.";
private static final String FAILED = "Internal server error.";
private static final String STOPPED = "The service is no longer running.";

@Inject
private MetricsService metricsService;


@Override
public void filter(ContainerRequestContext containerRequestContext) throws IOException {
if (!metricsService.isStarted()) {
if (metricsService.getState() == MetricsService.State.STARTING) {
// Fail since the Cassandra cluster is not yet up yet.
Response response = Response.status(Response.Status.SERVICE_UNAVAILABLE)
.type(APPLICATION_JSON_TYPE)
.entity(new ApiError(NO_CASSANDRA_SESSION_MSG))
.entity(new ApiError(STARTING))
.build();
containerRequestContext.abortWith(response);
} else if (metricsService.getState() == MetricsService.State.FAILED) {
// Fail since an error has occured trying to start the Metrics service
Response response = Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.type(APPLICATION_JSON_TYPE)
.entity(new ApiError(FAILED))
.build();
containerRequestContext.abortWith(response);
} else if (metricsService.getState() == MetricsService.State.STOPPED ||
metricsService.getState() == MetricsService.State.STOPPING ) {
Response response = Response.status(Response.Status.SERVICE_UNAVAILABLE)
.type(APPLICATION_JSON_TYPE)
.entity(new ApiError(STOPPED))
.build();
containerRequestContext.abortWith(response);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
*/
public interface MetricsService {

public enum State {
STARTING, STARTED, STOPPING, STOPPED, FAILED
}


// For now we will use a default or fake tenant id until we get APIs in place for
// creating tenants.
String DEFAULT_TENANT_ID = "test";
Expand All @@ -42,7 +47,9 @@ public interface MetricsService {
*/
void startUp(Session session);

boolean isStarted();
State getState();

void setState(State state);

void shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ public Builder withInitializationCallback(FutureCallback<Session> callback) {
return this;
}

public void build() {
public CassandraSession build() {
CassandraSession cassandraSession = new CassandraSession(options, callback);
return cassandraSession;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class MetricsServiceCassandra implements MetricsService {

public static final int DEFAULT_TTL = Duration.standardDays(7).toStandardSeconds().getSeconds();

public volatile State state;

private static class DataRetentionKey {
private final String tenantId;
private final MetricId metricId;
Expand Down Expand Up @@ -140,26 +142,30 @@ public int hashCode() {
private final ListeningExecutorService metricsTasks = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory()));

private boolean started = false;

/**
* Note that while user specifies the durations in hours, we store them in seconds.
*/
private final Map<DataRetentionKey, Integer> dataRetentions = new ConcurrentHashMap<>();

public MetricsServiceCassandra() {
this.state = State.STARTING;
}

@Override
public void startUp(Session s) {
this.dataAccess = new DataAccessImpl(s);
loadDataRetentions();
started = true;
state = State.STARTED;
}

@Override
public State getState() {
return state;
}

@Override
public boolean isStarted() {
return started;
public void setState(State state) {
this.state = state;
}

void loadDataRetentions() {
Expand Down Expand Up @@ -251,6 +257,7 @@ public void onFailure(Throwable t) {

@Override
public void shutdown() {
state = State.STOPPED;
}

/**
Expand Down

0 comments on commit ef8689f

Please sign in to comment.