Skip to content

Commit

Permalink
Merge pull request #925 from jsanda/release/0.30.0-hwkmetrics-765
Browse files Browse the repository at this point in the history
[HWKMETRICS-765] remove DeleteExpiredMetrics job and related code (#923)
  • Loading branch information
John Sanda committed Mar 27, 2018
2 parents b125e30 + b0a6f4c commit d328ca0
Show file tree
Hide file tree
Showing 20 changed files with 39 additions and 662 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.INGEST_MAX_RETRIES;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.INGEST_MAX_RETRY_DELAY;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.JMX_REPORTING_ENABLED;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.METRICS_EXPIRATION_DELAY;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.METRICS_EXPIRATION_JOB_ENABLED;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.METRICS_EXPIRATION_JOB_FREQUENCY;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.METRICS_REPORTING_COLLECTION_INTERVAL;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.METRICS_REPORTING_ENABLED;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.METRICS_REPORTING_HOSTNAME;
Expand Down Expand Up @@ -280,21 +277,6 @@ public enum State {
@ConfigurationProperty(METRICS_REPORTING_COLLECTION_INTERVAL)
private String collectionIntervalConfig;

@Inject
@Configurable
@ConfigurationProperty(METRICS_EXPIRATION_DELAY)
private String metricExpirationDelay;

@Inject
@Configurable
@ConfigurationProperty(METRICS_EXPIRATION_JOB_FREQUENCY)
private String metricsExpirationJobFrequency;

@Inject
@Configurable
@ConfigurationProperty(METRICS_EXPIRATION_JOB_ENABLED)
private String metricsExpirationJobEnabled;

@Inject
@ServiceReady
Event<ServiceReadyEvent> metricsServiceReady;
Expand Down Expand Up @@ -650,10 +632,7 @@ private boolean parseBooleanConfig(String value, ConfigurationKey configKey) {
private void initJobsService() {

RxSession rxSession = new RxSessionImpl(session);
jobsService = new JobsServiceImpl(
parseIntConfig(metricExpirationDelay, METRICS_EXPIRATION_DELAY),
parseIntConfig(metricsExpirationJobFrequency, METRICS_EXPIRATION_JOB_FREQUENCY),
parseBooleanConfig(metricsExpirationJobEnabled, METRICS_EXPIRATION_JOB_ENABLED));
jobsService = new JobsServiceImpl();
jobsService.setMetricsService(metricsService);
jobsService.setConfigurationService(configurationService);
jobsService.setSession(rxSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ public enum ConfigurationKey {
METRICS_REPORTING_COLLECTION_INTERVAL("hawkular.metrics.reporting.collection-interval", "300",
"METRICS_REPORTING_COLLECTION_INTERVAL", false),

//Metric expiration job configuration
METRICS_EXPIRATION_DELAY("hawkular.metrics.expiration.delay", "1", "METRICS_EXPIRATION_DELAY", false),
METRICS_EXPIRATION_JOB_FREQUENCY("hawkular.metrics.jobs.expiration.frequency", "7",
"METRICS_EXPIRATION_JOB_FREQUENCY", false),
METRICS_EXPIRATION_JOB_ENABLED("hawkular.metrics.jobs.expiration.enabled", "true",
"METRICS_EXPIRATION_JOB_ENABLED", false),

// Request logging properties
// Useful for debugging
REQUEST_LOGGING_LEVEL("hawkular.metrics.request.logging.level", null, "REQUEST_LOGGING_LEVEL", false),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -24,7 +24,6 @@

import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
Expand All @@ -41,7 +40,6 @@
import rx.Completable;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
* @author Michael Burman
Expand Down Expand Up @@ -124,25 +122,11 @@ public Completable call(JobDetails jobDetails) {
Observable<? extends MetricId<?>> metricIds = metricsService.findAllMetricIdentifiers()
.filter(m -> (m.getType() == GAUGE || m.getType() == COUNTER || m.getType() == AVAILABILITY));

PublishSubject<Metric<?>> subject = PublishSubject.create();
subject.subscribe(metric -> {
try {
this.metricsService.updateMetricExpiration(metric.getMetricId());
} catch (Exception e) {
logger.error("Could not update the metric expiration index for metric " + metric.getId()
+ " of tenant " + metric.getTenantId());
}
});

// Fetch all partition keys and compress the previous timeSlice
// TODO Optimization - new worker per token - use parallelism in Cassandra (with configured parallelism)
return metricsService.compressBlock(metricIds, startOfSlice, endOfSlice, pageSize, subject)
.doOnError(t -> {
subject.onCompleted();
logger.warn("Failed to compress data", t);
})
return metricsService.compressBlock(metricIds, startOfSlice, endOfSlice, pageSize)
.doOnError(t -> logger.warn("Failed to compress data", t))
.doOnCompleted(() -> {
subject.onCompleted();
stopwatch.stop();
logger.info("Finished compressing data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) +
" ms");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,4 @@ public interface JobsService {

Single<? extends JobDetails> submitDeleteTenantJob(String tenantId, String jobName);

Single<? extends JobDetails> submitDeleteExpiredMetricsJob(long expiration, String jobName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,8 @@ public class JobsServiceImpl implements JobsService, JobsServiceImplMBean {

private DeleteTenant deleteTenant;

private DeleteExpiredMetrics deleteExpiredMetrics;
private int metricExpirationJobFrequencyInDays;
private int metricExpirationDelay;
private boolean metricExpirationJobEnabled;

private ConfigurationService configurationService;

public JobsServiceImpl() {
this(1, 7, true);
}

public JobsServiceImpl(int metricExpirationDelay, int metricExpirationJobFrequencyInDays,
boolean metricExpirationJobEnabled) {
this.metricExpirationJobFrequencyInDays = metricExpirationJobFrequencyInDays;
this.metricExpirationDelay = metricExpirationDelay;
this.metricExpirationJobEnabled = metricExpirationJobEnabled;
}

public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
Expand Down Expand Up @@ -109,10 +93,6 @@ public void start() {
TempDataCompressor tempJob = new TempDataCompressor(metricsService, configurationService);
scheduler.register(TempDataCompressor.JOB_NAME, tempJob);

deleteExpiredMetrics = new DeleteExpiredMetrics(metricsService, session, configurationService,
this.metricExpirationDelay);
scheduler.register(DeleteExpiredMetrics.JOB_NAME, deleteExpiredMetrics);

scheduler.start();
}

Expand All @@ -127,13 +107,6 @@ public Single<? extends JobDetails> submitDeleteTenantJob(String tenantId, Strin
new SingleExecutionTrigger.Builder().withDelay(1, TimeUnit.MINUTES).build());
}

@Override
public Single<? extends JobDetails> submitDeleteExpiredMetricsJob(long expiration, String jobName) {
return scheduler.scheduleJob(DeleteExpiredMetrics.JOB_NAME, jobName,
ImmutableMap.of("expirationTimestamp", expiration + ""),
new SingleExecutionTrigger.Builder().withDelay(1, TimeUnit.MINUTES).build());
}

// JMX management
private void submitCompressJob(Map<String, String> parameters) {
String jobName = String.format("%s_single_%s", CompressData.JOB_NAME, parameters.get(CompressData.TARGET_TIME));
Expand All @@ -159,10 +132,4 @@ public void submitCompressJob(long timestamp, String blockSize) {
);
}

@Override
public void submitDeleteExpiredMetrics() {
long time = System.currentTimeMillis();
logger.debugf("Scheduling manual deleteExpiredMetrics job with timestamp->%d", time);
submitDeleteExpiredMetricsJob(time, "delete_expired_" + time);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -38,8 +38,4 @@ public interface JobsServiceImplMBean {
*/
void submitCompressJob(long timestamp, String blockSize);

/**
* Execute the job that deletes expired metrics.
*/
void submitDeleteExpiredMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ Observable<Row> findCompressedData(MetricId<?> id, long startTime, long endTime,
<T> Observable<Row> findTempData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize);

// <T> Observable<Row> findOldData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
// int pageSize);

Observable<Row> findStringData(MetricId<String> id, long startTime, long endTime, int limit, Order order,
int pageSize);

Expand Down Expand Up @@ -131,11 +128,5 @@ <T> Observable<ResultSet> deleteAndInsertCompressedGauge(MetricId<T> id, long ti
CompressedPointContainer cpc,
long sliceStart, long sliceEnd, int ttl);

<T> Observable<ResultSet> updateMetricExpirationIndex(MetricId<T> id, long expirationTime);

<T> Observable<ResultSet> deleteFromMetricExpirationIndex(MetricId<T> id);

<T> Observable<Row> findMetricExpiration(MetricId<T> id);

void shutdown();
}
Loading

0 comments on commit d328ca0

Please sign in to comment.