Skip to content

Commit

Permalink
[HWKMETRICS-168] big refactoring to get concurrency right.
Browse files Browse the repository at this point in the history
The scheduler deals with 4 different thread pools - one for emitting ticks, one
for processing leases, one for executing tasks, and the C* driver's I/O
threads. Making sure things execute on the right thread pool turned out to be
a challenge when chaning various Rx operators together. It is not obvious like
when you explicitly submit some task to a thread pool.
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent 3af3148 commit e39e337
Show file tree
Hide file tree
Showing 13 changed files with 749 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<antlr.version>4.5</antlr.version>
<rxjava.version>1.0.10</rxjava.version>
<rxjava.version>1.0.12</rxjava.version>
<dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
<rxjava-math.version>1.0.0</rxjava-math.version>

Expand Down
2 changes: 2 additions & 0 deletions schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ CREATE TABLE ${keyspace}.task_queue (
time_slice timestamp,
shard int,
task_id uuid,
task_name text,
task_params map<text, text>,
trigger frozen <trigger_def>,
PRIMARY KEY ((time_slice, shard), task_id)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public long getTriggerTime() {
return triggerTime;
}

@Override
public Trigger nextTrigger() {
return new RepeatingTrigger(interval, delay, triggerTime + interval);
}

private DateTime getExecutionDateTime() {
DateTime dt = DateTime.now();
Duration duration = Duration.millis(interval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

import java.util.Map;

import org.hawkular.metrics.tasks.impl.Lease;
import rx.Observable;

/**
* @author jsanda
*/
public interface TaskScheduler {

// void start();
Observable<Lease> start();

Observable<Task2> createTask(String name, Map<String, String> parameters, Trigger trigger);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public interface Trigger {

long getTriggerTime();

Trigger nextTrigger();

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @author jsanda
*/
class Lease {
public class Lease {

private long timeSlice;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class Queries {

public PreparedStatement insertIntoQueue;

public PreparedStatement getTasksFromQueue;

public PreparedStatement createTaskWithFailures;

public PreparedStatement findTasks;
Expand All @@ -57,12 +59,12 @@ public Queries(Session session) {
findLeases = session.prepare(
"SELECT shard, owner, finished FROM leases WHERE time_slice = ?");

// acquireLease = session.prepare(
// "UPDATE leases " +
// "USING TTL ? " +
// "SET owner = ? " +
// "WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
// "IF owner = NULL");
acquireLease = session.prepare(
"UPDATE leases " +
"USING TTL ? " +
"SET owner = ? " +
"WHERE time_slice = ? AND shard = ? " +
"IF owner = NULL");
//
// renewLease = session.prepare(
// "UPDATE leases " +
Expand All @@ -71,13 +73,12 @@ public Queries(Session session) {
// "WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
// "IF owner = ?");
//
// finishLease = session.prepare(
// "UPDATE leases " +
// "SET finished = true " +
// "WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
// "IF owner = ?");
finishLease = session.prepare(
"UPDATE leases " +
"SET finished = true " +
"WHERE time_slice = ? AND shard = ?");
//
// deleteLeases = session.prepare("DELETE FROM leases WHERE time_slice = ?");
deleteLeases = session.prepare("DELETE FROM leases WHERE time_slice = ?");

// createTask = session.prepare(
// "INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window) " +
Expand All @@ -86,7 +87,12 @@ public Queries(Session session) {
createTask2 = session.prepare(
"INSERT INTO tasks (id, shard, name, params, trigger) VALUES (?, ?, ?, ?, ?)");

insertIntoQueue = session.prepare("INSERT INTO task_queue (time_slice, shard, task_id) VALUES (?, ?, ?)");
insertIntoQueue = session.prepare(
"INSERT INTO task_queue (time_slice, shard, task_id, task_name, task_params, trigger) " +
"VALUES (?, ?, ?, ?, ?, ?)");

getTasksFromQueue = session.prepare(
"SELECT task_id, task_name, task_params, trigger FROM task_queue WHERE time_slice = ? AND shard = ?");

// createTaskWithFailures = session.prepare(
// "INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window, " +
Expand All @@ -99,8 +105,7 @@ public Queries(Session session) {

findTask = session.prepare("SELECT shard, name, params, trigger FROM tasks WHERE id = ?");

// deleteTasks = session.prepare(
// "DELETE FROM task_queue WHERE task_type = ? AND time_slice = ? AND segment = ?");
deleteTasks = session.prepare("DELETE FROM task_queue WHERE time_slice = ? AND shard = ?");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ public boolean equals(Object o) {
return Objects.equals(shard, task2.shard) &&
Objects.equals(id, task2.id) &&
Objects.equals(name, task2.name) &&
Objects.equals(parameters, task2.parameters);
Objects.equals(parameters, task2.parameters) &&
Objects.equals(trigger, task2.trigger);
}

@Override
public int hashCode() {
return Objects.hash(id, shard, name, parameters);
return Objects.hash(id, shard, name, parameters, trigger);
}

@Override
Expand All @@ -94,6 +95,7 @@ public String toString() {
", shard=" + shard +
", name='" + name + '\'' +
", parameters=" + parameters +
", trigger=" + trigger +
'}';
}
}

0 comments on commit e39e337

Please sign in to comment.