Skip to content

Commit

Permalink
[HWKMETRICS-185] Update the JAX-RS 1.1 handlers to match the latest r…
Browse files Browse the repository at this point in the history
…ebase changes.
  • Loading branch information
Stefan Negrea committed Aug 13, 2015
1 parent 4cdd997 commit 96ca30c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,52 @@
*/
package org.hawkular.metrics.api.jaxrs;

import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_CQL_PORT;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_KEYSPACE;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_NODES;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_RESETDB;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_USESSL;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.TASK_SCHEDULER_TIME_UNITS;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.WAIT_FOR_SERVICE;

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import org.hawkular.metrics.api.jaxrs.config.Configurable;
import org.hawkular.metrics.api.jaxrs.config.ConfigurationProperty;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.GenerateRate;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;
import org.hawkular.metrics.core.impl.TaskTypes;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskServiceBuilder;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.metrics.tasks.impl.Lease;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;

import rx.Observable;

/**
* Bean created on startup to manage the lifecycle of the {@link MetricsService} instance shared in application scope.
*
Expand All @@ -78,7 +81,7 @@ public enum State {

private MetricsServiceImpl metricsService;

private TaskService taskService;
private TaskScheduler taskScheduler;

private final ScheduledExecutorService lifecycleExecutor;

Expand Down Expand Up @@ -112,6 +115,11 @@ public enum State {
@ConfigurationProperty(TASK_SCHEDULER_TIME_UNITS)
private String timeUnits;

@Inject
@Configurable
@ConfigurationProperty(CASSANDRA_USESSL)
private String cassandraUseSSL;

private volatile State state;
private int connectionAttempts;
private Session session;
Expand Down Expand Up @@ -166,7 +174,8 @@ private void startMetricsService() {
session = createSession();
} catch (Exception t) {
Throwable rootCause = Throwables.getRootCause(t);
LOG.warn("Could not connect to Cassandra cluster - assuming its not up yet", rootCause);
LOG.warn("Could not connect to Cassandra cluster - assuming its not up yet: ",
rootCause.getLocalizedMessage());
// cycle between original and more wait time - avoid waiting huge amounts of time
long delay = 1L + ((connectionAttempts - 1L) % 4L);
LOG.warn("[{}] Retrying connecting to Cassandra cluster in [{}]s...", connectionAttempts, delay);
Expand All @@ -182,11 +191,29 @@ private void startMetricsService() {
// will change at some point though because the task scheduling service will
// probably move to the hawkular-commons repo.
initSchema();
initTaskService();

taskScheduler = new TaskScheduler() {
@Override
public Observable<Lease> start() {
LOG.warn("Task scheduling is not yet supported");
return Observable.empty();
}

@Override
public Observable<Task2> scheduleTask(String name, String groupKey, int executionOrder,
Map<String, String> parameters, Trigger trigger) {
LOG.warn("Task scheduling is not yet supported");
return Observable.empty();
}

@Override
public void shutdown() {

}
};

metricsService = new MetricsServiceImpl();
metricsService.setTaskService(taskService);
taskService.subscribe(TaskTypes.COMPUTE_RATE, new GenerateRate(metricsService));
metricsService.setTaskScheduler(taskScheduler);

// TODO Set up a managed metric registry
// We want a managed registry that can be shared by the JAX-RS endpoint and the core. Then we can expose
Expand Down Expand Up @@ -222,6 +249,10 @@ private Session createSession() {
clusterBuilder.withPort(port);
Arrays.stream(nodes.split(",")).forEach(clusterBuilder::addContactPoint);

if (Boolean.parseBoolean(cassandraUseSSL)) {
clusterBuilder.withSSL();
}

Cluster cluster = null;
Session createdSession = null;
try {
Expand All @@ -244,23 +275,6 @@ private void initSchema() {
session.execute("USE " + keyspace);
}

private void initTaskService() {
LOG.info("Initializing {}", TaskService.class.getSimpleName());
taskService = new TaskServiceBuilder()
.withSession(session)
.withTimeUnit(getTimeUnit())
.withTaskTypes(singletonList(TaskTypes.COMPUTE_RATE))
.build();
taskService.start();
}

private TimeUnit getTimeUnit() {
if ("seconds".equals(timeUnits)) {
return SECONDS;
}
return MINUTES;
}

/**
* @return a {@link MetricsService} instance to share in application scope
*/
Expand All @@ -284,7 +298,7 @@ void destroy() {
private void stopMetricsService() {
state = State.STOPPING;
metricsService.shutdown();
taskService.shutdown();
taskScheduler.shutdown();
if (session != null) {
try {
session.close();
Expand All @@ -294,4 +308,4 @@ private void stopMetricsService() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum ConfigurationKey {
CASSANDRA_CQL_PORT("hawkular-metrics.cassandra-cql-port", "9042", "CASSANDRA_CQL_PORT", false),
CASSANDRA_KEYSPACE("cassandra.keyspace", "hawkular_metrics", null, false),
CASSANDRA_RESETDB("cassandra.resetdb", null, null, true),
CASSANDRA_USESSL("hawkular-metrics.cassandra-use-ssl", "false", "CASSANDRA_USESSL", false),
WAIT_FOR_SERVICE("hawkular.metrics.waitForService", null, null, true),
TASK_SCHEDULER_TIME_UNITS("hawkular.scheduler.time-units", "minutes", "SCHEDULER_TIME_UNITS", false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ public Response findMetrics(
required = false,
allowableValues = "[gauge, availability, counter]")
@QueryParam("type") MetricType metricType,
@ApiParam(value = "List of tags", required = false) @QueryParam("tags") Tags tags) {
@ApiParam(value = "List of tags filters", required = false) @QueryParam("tags") Tags tags) {

if (metricType != null && !MetricType.userTypes().contains(metricType)) {
return badRequest(new ApiError("Incorrect type param"));
}

Observable<Metric> metricObservable = (tags == null) ? metricsService.findMetrics(tenantId, metricType)
: metricsService.findMetricsWithTags(tenantId, tags.getTags(), metricType);
: metricsService.findMetricsWithFilters(tenantId, tags.getTags(), metricType);

try {
return metricObservable
Expand Down

0 comments on commit 96ca30c

Please sign in to comment.