Skip to content

Commit

Permalink
[HWKMETRICS-168] adding group_key and exec_order columns to task_queu…
Browse files Browse the repository at this point in the history
…e table

These columns help if/when tasks have interdependencies. All tasks having the
same group key will be stored in the same queue, which means that they will be
associated with the same lease. The exec_order column defines an execution
order for tasks within the same group. Tasks with a lower number are executed
first.

The commit also adds/updates TaskSchedulerTest to use RxJava's TestScheduler.
It took me a good bit of time over the weekend to understand how to set things
up, but it was well worth the effort. Tests will be much more reliable,
consistent, and faster as they use a virtual clock.
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent bdec66d commit 24517e2
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 217 deletions.
9 changes: 6 additions & 3 deletions schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ CREATE TYPE ${keyspace}.trigger_def (

CREATE TABLE ${keyspace}.tasks (
id uuid,
shard int,
group_key text,
name text,
exec_order int,
params map<text, text>,
trigger frozen <trigger_def>,
PRIMARY KEY (id)
Expand All @@ -180,11 +181,13 @@ CREATE TABLE ${keyspace}.tasks (
CREATE TABLE ${keyspace}.task_queue (
time_slice timestamp,
shard int,
task_id uuid,
group_key text,
exec_order int,
task_name text,
task_id uuid,
task_params map<text, text>,
trigger frozen <trigger_def>,
PRIMARY KEY ((time_slice, shard), task_id)
PRIMARY KEY ((time_slice, shard), group_key, exec_order, task_name, task_id)
);

-- #
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.tasks.api;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

/**
* @author jsanda
*/
public class SingleExecutionTrigger implements Trigger {

private long triggerTime;

public static class Builder {

private long delay;

private Long triggerTime;

public Builder withDelay(long delay, TimeUnit timeUnit) {
this.delay = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
return this;
}

public Builder withTriggerTime(long time) {
this.triggerTime = time;
return this;
}

public SingleExecutionTrigger build() {
SingleExecutionTrigger trigger = new SingleExecutionTrigger();
if (triggerTime == null) {
trigger.triggerTime = getExecutionDateTime(System.currentTimeMillis()).getMillis();
}
trigger.triggerTime += delay;
return trigger;
}
}

private SingleExecutionTrigger() {
}

public SingleExecutionTrigger(long triggerTime) {
this.triggerTime = getExecutionDateTime(triggerTime).getMillis();
}

@Override
public long getTriggerTime() {
return triggerTime;
}

@Override
public Trigger nextTrigger() {
return null;
}

private static DateTime getExecutionDateTime(long time) {
DateTime dt = new DateTime(time);
Duration duration = Duration.millis(1000);
Period p = duration.toPeriod();

if (p.getYears() != 0) {
return dt.yearOfEra().roundFloorCopy().minusYears(dt.getYearOfEra() % p.getYears());
} else if (p.getMonths() != 0) {
return dt.monthOfYear().roundFloorCopy().minusMonths((dt.getMonthOfYear() - 1) % p.getMonths());
} else if (p.getWeeks() != 0) {
return dt.weekOfWeekyear().roundFloorCopy().minusWeeks((dt.getWeekOfWeekyear() - 1) % p.getWeeks());
} else if (p.getDays() != 0) {
return dt.dayOfMonth().roundFloorCopy().minusDays((dt.getDayOfMonth() - 1) % p.getDays());
} else if (p.getHours() != 0) {
return dt.hourOfDay().roundFloorCopy().minusHours(dt.getHourOfDay() % p.getHours());
} else if (p.getMinutes() != 0) {
return dt.minuteOfHour().roundFloorCopy().minusMinutes(dt.getMinuteOfHour() % p.getMinutes());
} else if (p.getSeconds() != 0) {
return dt.secondOfMinute().roundFloorCopy().minusSeconds(dt.getSecondOfMinute() % p.getSeconds());
}
return dt.millisOfSecond().roundCeilingCopy().minusMillis(dt.getMillisOfSecond() % p.getMillis());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SingleExecutionTrigger that = (SingleExecutionTrigger) o;
return Objects.equals(triggerTime, that.triggerTime);
}

@Override
public int hashCode() {
return Objects.hash(triggerTime);
}

@Override
public String toString() {
return "SingleExecutionTrigger{" +
"triggerTime=" + triggerTime +
'}';
}
}
40 changes: 40 additions & 0 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/api/Task2.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,52 @@
*/
public interface Task2 {

/**
* This is essentially a primary key that uniquely identifies a task.
*/
UUID getId();

/**
* <p>
* The group key provides a way to logically and physically group tasks. In conjunction with the
* {@link #getOrder() priority} it can be used to control the order of execution of tasks which is desirable
* when there are interdependencies between tasks. For example, suppose we have a task, T1, that aggregates data
* from multiple time series to produce a new time series. Then we have a task, T2, that aggregates the data from
* the time series produced by T2. T2 in effect depends on T1. They should use the same group key.
* </p>
* <p>
* For Hawkular the group key will generally be the tenant ID. It is a good choice because tenants and all of their
* data are isolated from one another.
* </p>
* <p>
* In terms of implementation, all tasks with the same group key will be stored in the same queue shard, which
* means that they will all be stored within the same physical partition. This should be taken into consideration
* when choosing a group key because you do not want to wind up with queue shards/partitions that are excessively
* large.
* </p>
*/
String getGroupKey();

/**
* Identifies the type of task. Multiple tasks can have the same name.
*/
String getName();

/**
* Defines the order of execution of tasks within a group. Lower values are executed first. Note that this
* ordering only applies to tasks within the same group, which is defined by the {@link #getGroupKey() group key}.
*/
int getOrder();

/**
* An optional, arbitrary set of key/value parameters that the task receives upon execution.
*/
Map<String, String> getParameters();

/**
* Defines the execution policy of the task, like when the task is scheduled to execute, how many times it should
* execute, etc.
*/
Trigger getTrigger();

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ public Queries(Session session) {
// "VALUES (?, ?, ?, ?, ?, ?, ?, ?)");

createTask2 = session.prepare(
"INSERT INTO tasks (id, shard, name, params, trigger) VALUES (?, ?, ?, ?, ?)");
"INSERT INTO tasks (id, group_key, exec_order, name, params, trigger) VALUES (?, ?, ?, ?, ?, ?)");

insertIntoQueue = session.prepare(
"INSERT INTO task_queue (time_slice, shard, task_id, task_name, task_params, trigger) " +
"VALUES (?, ?, ?, ?, ?, ?)");
"INSERT INTO task_queue (time_slice, shard, task_id, group_key, exec_order, task_name, task_params, " +
"trigger) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)");

getTasksFromQueue = session.prepare(
"SELECT task_id, task_name, task_params, trigger FROM task_queue WHERE time_slice = ? AND shard = ?");
"SELECT group_key, exec_order, task_id, task_name, task_params, trigger " +
"FROM task_queue " +
"WHERE time_slice = ? AND shard = ?");

// createTaskWithFailures = session.prepare(
// "INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window, " +
Expand All @@ -103,7 +105,7 @@ public Queries(Session session) {
// "FROM task_queue " +
// "WHERE task_type = ? AND time_slice = ? AND segment = ?");

findTask = session.prepare("SELECT shard, name, params, trigger FROM tasks WHERE id = ?");
findTask = session.prepare("SELECT group_key, exec_order, name, params, trigger FROM tasks WHERE id = ?");

deleteTasks = session.prepare("DELETE FROM task_queue WHERE time_slice = ? AND shard = ?");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,21 @@ public class Task2Impl implements Task2 {

private UUID id;

private int shard;
private String groupKey;

private int order;

private String name;

private ImmutableMap<String, String> parameters;

private Trigger trigger;

public Task2Impl(UUID id, int shard, String name, Map<String, String> parameters, Trigger trigger) {
public Task2Impl(UUID id, String groupKey, int order, String name, Map<String, String> parameters,
Trigger trigger) {
this.id = id;
this.shard = shard;
this.groupKey = groupKey;
this.order = order;
this.name = name;
this.parameters = ImmutableMap.copyOf(parameters);
this.trigger = trigger;
Expand All @@ -52,8 +56,14 @@ public UUID getId() {
return id;
}

public int getShard() {
return shard;
@Override
public String getGroupKey() {
return groupKey;
}

@Override
public int getOrder() {
return order;
}

@Override
Expand All @@ -76,23 +86,25 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Task2Impl task2 = (Task2Impl) o;
return Objects.equals(shard, task2.shard) &&
return Objects.equals(order, task2.order) &&
Objects.equals(id, task2.id) &&
Objects.equals(groupKey, task2.groupKey) &&
Objects.equals(name, task2.name) &&
Objects.equals(parameters, task2.parameters) &&
Objects.equals(trigger, task2.trigger);
}

@Override
public int hashCode() {
return Objects.hash(id, shard, name, parameters, trigger);
return Objects.hash(id, groupKey, order, name, parameters, trigger);
}

@Override
public String toString() {
return "Task2Impl{" +
"id=" + id +
", shard=" + shard +
", groupKey='" + groupKey + '\'' +
", order=" + order +
", name='" + name + '\'' +
", parameters=" + parameters +
", trigger=" + trigger +
Expand Down

0 comments on commit 24517e2

Please sign in to comment.