[HWKMETRICS-780] fix temp tables not getting dropped (#928)
* [HWKMETRICS-763] fix bug in scheduler error handling

I found the bug today; it is in SchedulerImpl.java. At some point a while back
I added retry support for when a job fails. You provide a RetryPolicy that
determines if and when a job should be retried. This was primarily intended for
non-repeating, single-execution jobs. For repeating jobs, e.g., the compression
job, the job is supposed to be executed according to its trigger. Suppose the
compression job is scheduled to run at 15:00 and repeats every two hours. If
the 15:00 execution fails, the scheduler is supposed to retry the job (for all
intents and purposes) immediately with the same trigger time of 15:00. When the
job completes normally, its trigger gets updated in the database to the next
execution time of 17:00. If it is already later than 17:00, then the scheduler
will again execute the job right away.

There was a bug in the error handling such that the job was not getting
retried for the 15:00 execution; instead, the trigger was advanced to 17:00.
The temp table being compressed gets dropped at the end of the compression
job, so unless the failure occurred while dropping the table, we end up with
orphaned temp tables any time the job fails.

This commit does away with the RetryPolicy since it is not used. This
means that when a job fails, regardless of whether or not it is
repeating, it will be retried with the same trigger until it completes
normally.
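
A minimal, self-contained sketch of the fixed semantics (hypothetical names; the
real implementation in SchedulerImpl.java is reactive rather than a blocking loop):
the trigger only advances after a successful execution, so a failed run is retried
with its original trigger time.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the fixed retry semantics, not the actual scheduler code.
class SameTriggerRetry {

    // Runs a job for a fixed trigger time and only advances the trigger once the
    // job completes normally, so a failed 15:00 run is retried at 15:00 instead
    // of silently skipping ahead to 17:00.
    static long executeAndAdvance(long triggerTime, long intervalMillis, Callable<Void> job) {
        while (true) {
            try {
                job.call();
                break;               // success: fall through and advance the trigger
            } catch (Exception e) {
                System.err.println("Execution for trigger " + triggerTime + " failed, retrying: " + e);
            }
        }
        long nextTrigger = triggerTime + intervalMillis;
        // If nextTrigger is already in the past, the caller is expected to execute
        // the job again right away, just as the scheduler does.
        return nextTrigger;
    }
}
```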

* [HWKMETRICS-763] add "local" job to check for expired temp tables

Due to the bug in SchedulerImpl.java that was fixed in my previous
commit, there are OpenShift clusters with hundreds of expired temp
tables. They are expired in the sense that they are older than the data
retention and therefore do not contain any live data.

This commit adds a "local" job that checks for and drops expired tables.
The job is local in the sense that it runs inside the hawkular-metrics
server itself; it is not run via the job scheduler.
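
Roughly, the wiring looks like the following hypothetical sketch; it mirrors what
MetricsServiceLifecycle does in initTempTablesCleaner() and stopServices() in the
diff below (the wrapper class and method names here are made up).

```java
import org.hawkular.metrics.core.service.DataAccessImpl;
import org.hawkular.metrics.core.service.TempTablesCleaner;
import org.hawkular.rx.cassandra.driver.RxSession;

// Hypothetical wrapper; the real server does the equivalent during startup and shutdown.
class TempTablesCleanerLifecycle {

    private TempTablesCleaner cleaner;

    void start(RxSession session, DataAccessImpl dataAccess, String keyspace, int ttlDays) {
        cleaner = new TempTablesCleaner(session, dataAccess, keyspace, ttlDays);
        cleaner.run();       // kicks off a daily check on an RxJava I/O scheduler
    }

    void stop() {
        cleaner.shutdown();  // flips the flag that makes the interval Observable complete
    }
}
```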

* [HWKMETRICS-763] use Observable.empty to get rid of filter call
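
The idea, as a small hypothetical sketch (the execute() method is a stand-in for
the real session call): resuming with Observable.empty() on error means nothing is
emitted for the failed table, so no sentinel value and no downstream filter() call
are needed.

```java
import rx.Observable;

// Hypothetical illustration of the pattern used in TempTablesCleaner.dropTable().
class EmptyInsteadOfFilter {

    static Observable<String> dropTable(String table) {
        return execute("DROP TABLE IF EXISTS " + table)
                .map(resultSet -> table)
                // On error, emit nothing for this table rather than a sentinel
                // value that a later filter() would have to strip out.
                .onErrorResumeNext(t -> Observable.empty());
    }

    // Always fails here, just to keep the sketch self-contained and runnable.
    static Observable<Object> execute(String cql) {
        return Observable.error(new RuntimeException("simulated failure: " + cql));
    }
}
```
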
John Sanda authored and jsanda committed Apr 20, 2018
1 parent 087de7e commit 4689551
Showing 8 changed files with 197 additions and 176 deletions.
@@ -80,7 +80,6 @@
import javax.net.ssl.SSLContext;

import org.hawkular.metrics.api.jaxrs.config.Configurable;
import org.hawkular.metrics.api.jaxrs.config.ConfigurationKey;
import org.hawkular.metrics.api.jaxrs.config.ConfigurationProperty;
import org.hawkular.metrics.api.jaxrs.dropwizard.RESTMetrics;
import org.hawkular.metrics.api.jaxrs.log.RestLogger;
@@ -102,6 +101,7 @@
import org.hawkular.metrics.core.service.DataAccessImpl;
import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.core.service.MetricsServiceImpl;
import org.hawkular.metrics.core.service.TempTablesCleaner;
import org.hawkular.metrics.core.util.GCGraceSecondsManager;
import org.hawkular.metrics.model.CassandraStatus;
import org.hawkular.metrics.scheduler.api.Scheduler;
@@ -314,6 +314,7 @@ public enum State {
private ConfigurationService configurationService;
private DataAccess dataAcces;
private GCGraceSecondsManager gcGraceSecondsManager;
private TempTablesCleaner tempTablesCleaner;

MetricsServiceLifecycle() {
ThreadFactory threadFactory = r -> {
@@ -450,6 +451,7 @@ private void startMetricsService() {
}

initGCGraceSecondsManager();
initTempTablesCleaner();

if (Boolean.parseBoolean(jmxReportingEnabled)) {
HawkularObjectNameFactory JMXObjNameFactory = new HawkularObjectNameFactory(metricRegistry);
@@ -671,6 +673,12 @@ private void initGCGraceSecondsManager() {
gcGraceSecondsManager.maybeUpdateGCGraceSeconds();
}

private void initTempTablesCleaner() {
tempTablesCleaner = new TempTablesCleaner(new RxSessionImpl(session), (DataAccessImpl) dataAcces, keyspace,
Integer.parseInt(defaultTTL));
tempTablesCleaner.run();
}

private int getDefaultTTL() {
try {
return Integer.parseInt(defaultTTL);
@@ -680,22 +688,6 @@ private int getDefaultTTL() {
}
}

private int parseIntConfig(String value, ConfigurationKey configKey) {
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return Integer.parseInt(configKey.defaultValue());
}
}

private boolean parseBooleanConfig(String value, ConfigurationKey configKey) {
try {
return Boolean.parseBoolean(value);
} catch (NumberFormatException e) {
return Boolean.parseBoolean(configKey.defaultValue());
}
}

private void initJobsService() {

RxSession rxSession = new RxSessionImpl(session);
@@ -823,6 +815,8 @@ private void stopServices() {
try {
// The order here is important. We need to shutdown jobsService first so that any running jobs can finish
// gracefully.
tempTablesCleaner.shutdown();

if (jobsService != null) {
jobsService.shutdown();
}
@@ -29,20 +29,17 @@
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
import org.hawkular.metrics.scheduler.api.RetryPolicy;
import org.hawkular.metrics.scheduler.api.Scheduler;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.sysconfig.Configuration;
import org.hawkular.metrics.sysconfig.ConfigurationService;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import org.joda.time.Minutes;

import com.google.common.collect.ImmutableMap;

import rx.Completable;
import rx.Single;
import rx.functions.Func2;

/**
* @author jsanda
@@ -93,15 +90,7 @@ public List<JobDetails> start() {

deleteTenant = new DeleteTenant(session, metricsService);

// Use a simple retry policy to make sure tenant deletion does complete in the event of failure. For now
// we simply retry after 5 minutes. We can implement a more sophisticated strategy later on if need be.
Func2<JobDetails, Throwable, RetryPolicy> deleteTenantRetryPolicy = (details, throwable) ->
() -> {
logger.warn("Execution of " + details + " failed", throwable);
logger.info(details + " will be retried in 5 minutes");
return Minutes.minutes(5).toStandardDuration().getMillis();
};
scheduler.register(DeleteTenant.JOB_NAME, deleteTenant, deleteTenantRetryPolicy);
scheduler.register(DeleteTenant.JOB_NAME, deleteTenant);

TempTableCreator tempCreator = new TempTableCreator(metricsService, configurationService);
scheduler.register(TempTableCreator.JOB_NAME, tempCreator);
@@ -0,0 +1,114 @@
/*
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.service;

import java.util.concurrent.TimeUnit;

import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import org.joda.time.Days;

import com.datastax.driver.core.PreparedStatement;

import rx.Observable;
import rx.schedulers.Schedulers;

/**
* @author jsanda
*/
public class TempTablesCleaner {

private static Logger logger = Logger.getLogger(TempTablesCleaner.class);

private RxSession session;

private PreparedStatement findTables;

private long ttl;

private DataAccessImpl dataAccess;

private volatile boolean finished;

private static final String DROP_TABLE_CQL = "DROP TABLE IF EXISTS %s";

public TempTablesCleaner(RxSession session, DataAccessImpl dataAccess, String keyspace, int ttl) {
this.session = session;
this.dataAccess = dataAccess;
this.ttl = Days.days(ttl).toStandardDuration().getMillis();

findTables = session.getSession().prepare(
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '" + keyspace + "'");
}

public void run() {
logger.info("Checking for expired temp tables");
Observable.interval(1, TimeUnit.DAYS, Schedulers.io())
.takeUntil(i -> finished)
.flatMap(i -> session.execute(findTables.bind()))
.compose(applyRetryPolicy())
.flatMap(Observable::from)
.filter(row -> row.getString(0).startsWith(DataAccessImpl.TEMP_TABLE_NAME_PROTOTYPE))
.map(row -> row.getString(0))
.filter(this::isTableExpired)
.flatMap(this::dropTable)
.subscribe(
table -> logger.infof("Dropped table %s", table),
t -> logger.warn("Cleaning temp tables failed", t),
() -> logger.infof("Finished cleaning expired temp tables")
);

}

public void shutdown() {
finished = true;
}

private <T> Observable.Transformer<T, T> applyRetryPolicy() {
return tObservable -> tObservable
.retryWhen(observable -> {
Integer maxRetries = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-retries", 10);
Integer maxDelay = Integer.getInteger("hawkular.metrics.temp-table-cleaner.max-delay", 300);
Observable<Integer> range = Observable.range(1, maxRetries);
Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> {
int delay = Math.min((int) Math.pow(2, i), maxDelay);
logger.debugf(t, "The findTables query failed. Attempting retry # %d seconds", delay);
return Observable.timer(delay, TimeUnit.SECONDS).onBackpressureDrop();
});

return Observable.merge(zipWith);
});
}

private boolean isTableExpired(String table) {
Long timestamp = dataAccess.tableToMapKey(table);
return timestamp < (System.currentTimeMillis() - ttl);
}

private Observable<String> dropTable(String table) {
return session.execute(String.format(DROP_TABLE_CQL, table))
.map(resultSet -> table)
.onErrorResumeNext(t -> {
// If there is an error, we do not retry because it is possible that the table has already been
// dropped. We will instead wait until findTables runs again and retry dropping the table then
// if dropping it did indeed fail for some reason.
logger.infof(t, "Failed to drop %s", table);
return Observable.empty();
});
}

}

This file was deleted.

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,7 +21,6 @@
import rx.Completable;
import rx.Single;
import rx.functions.Func1;
import rx.functions.Func2;

/**
* @author jsanda
@@ -60,17 +59,6 @@ public interface Scheduler {
*/
void register(String jobType, Func1<JobDetails, Completable> jobProducer);

/**
* Registers two functions. The first produces a job of the specified type. The second function returns a retry
* policy that is used with non-repeating jobs when they fail.
*
* @param jobType
* @param jobProducer
* @param retryFunction
*/
void register(String jobType, Func1<JobDetails, Completable> jobProducer,
Func2<JobDetails, Throwable, RetryPolicy> retryFunction);

/**
* Start executing jobs.
*/
