Skip to content

Commit

Permalink
[HWKMETRICS-130] store tenant id with tasks in task queue
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Jun 19, 2015
1 parent d29aa5d commit 89bf6b3
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public Observable<Void> createMetric(Metric<?> metric) {

if (metric.getType() == COUNTER) {
TaskType generateRatesType = taskTypes.get(0);
Task task = generateRatesType.createTask(metric.getId().getName() + "$rate",
Task task = generateRatesType.createTask(metric.getTenantId(), metric.getId().getName() + "$rate",
metric.getId().getName());
taskService.scheduleTask(now(), task);
}
Expand Down Expand Up @@ -816,15 +816,15 @@ private Consumer<Task> generateRate() {
return task -> {
logger.info("Generating rate for {}", task);
// TODO Store tenant id with task
String tenantId = "rate-tenant";
MetricId id = new MetricId(task.getSources().iterator().next());
long end = task.getTimeSlice().getMillis();
long start = task.getTimeSlice().minusSeconds(task.getWindow()).getMillis();
logger.debug("start = {}, end = {}", start, end);
findCounterData(tenantId, id, start, end)
findCounterData(task.getTenantId(), id, start, end)
.take(1)
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(tenantId, COUNTER_RATE, id, singletonList(new DataPoint<>(start, rate))))
.map(rate -> new Metric<>(task.getTenantId(), COUNTER_RATE, id,
singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> addGaugeData(Observable.just(metric)))
.subscribe(
aVoid -> {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public TaskType getTaskType() {
return task.getTaskType();
}

@Override
public String getTenantId() {
return task.getTenantId();
}

@Override
public String getTarget() {
return task.getTarget();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void shutdown() {

@Test
public void generateRates() throws Exception {
String tenantId = "rate-tenant";
String tenantId = "generate-rates-test";
MetricId id = new MetricId("c1");
DateTime start = dateTimeService.getTimeSlice(now(), standardSeconds(5)).plusSeconds(5);
DateTime end = start.plusSeconds(30);
Expand Down
1 change: 1 addition & 0 deletions schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ CREATE TABLE ${keyspace}.task_queue (
task_type text,
time_slice timestamp,
segment int,
tenant_id text,
target text,
sources set<text>,
interval int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface Task {

TaskType getTaskType();

String getTenantId();

/**
* This is a key or identifier of the time series that is associated with the task. Consider aggregating metrics or
* events as an example. Let's say there is a task for computing a 5 minute rollup from raw
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public TaskType setWindow(int window) {
*
* @return A {@link Task}
*/
public Task createTask(String target, String source) {
return createTask(target, source, interval, window);
public Task createTask(String tenantId, String target, String source) {
return createTask(tenantId, target, source, interval, window);
}

/**
Expand All @@ -149,8 +149,8 @@ public Task createTask(String target, String source) {
* @param window Specifies the amount of data to include, expressed as a duration
* @return A {@link Task}
*/
public Task createTask(String target, String source, int interval, int window) {
return new TaskImpl(this, null, target, source, interval, window);
public Task createTask(String tenantId, String target, String source, int interval, int window) {
return new TaskImpl(this, tenantId, null, target, source, interval, window);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ public Queries(Session session) {
deleteLeases = session.prepare("DELETE FROM leases WHERE time_slice = ?");

createTask = session.prepare(
"INSERT INTO task_queue (task_type, time_slice, segment, target, sources, interval, window) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)");
"INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)");

createTaskWithFailures = session.prepare(
"INSERT INTO task_queue (task_type, time_slice, segment, target, sources, interval, window, " +
"failed_time_slices) VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
"INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window, " +
"failed_time_slices) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)");

findTasks = session.prepare(
"SELECT target, sources, interval, window, failed_time_slices " +
"SELECT tenant_id, target, sources, interval, window, failed_time_slices " +
"FROM task_queue " +
"WHERE task_type = ? AND time_slice = ? AND segment = ?");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class TaskContainer implements Iterable<Task> {

private TaskType taskType;

private String tenantId;

private String target;

private Set<String> sources;
Expand All @@ -56,6 +58,7 @@ class TaskContainer implements Iterable<Task> {
public static TaskContainer copyWithoutFailures(TaskContainer container) {
TaskContainer newContainer = new TaskContainer();
newContainer.taskType = container.taskType;
newContainer.tenantId = container.tenantId;
newContainer.timeSlice = container.timeSlice;
newContainer.target = container.target;
newContainer.sources = container.sources;
Expand All @@ -69,15 +72,14 @@ public static TaskContainer copyWithoutFailures(TaskContainer container) {
private TaskContainer() {
}

public TaskContainer(TaskType taskType, DateTime timeSlice, int segment, String target, Set<String> sources, int
interval, int window, Set<DateTime> failedTimeSlices) {
public TaskContainer(TaskType taskType, String tenantId, DateTime timeSlice, int segment, String target,
Set<String> sources, int interval, int window, Set<DateTime> failedTimeSlices) {
this.taskType = taskType;
this.tenantId = tenantId;
this.timeSlice = timeSlice;
this.segment = segment;
this.target = target;
this.sources = sources;
// this.interval = Duration.standardMinutes(interval);
// this.window = Duration.standardMinutes(window);
this.interval = interval;
this.window = window;
this.failedTimeSlices.addAll(failedTimeSlices);
Expand All @@ -87,6 +89,10 @@ public TaskType getTaskType() {
return taskType;
}

public String getTenantId() {
return tenantId;
}

public String getTarget() {
return target;
}
Expand Down Expand Up @@ -133,7 +139,7 @@ public boolean hasNext() {
@Override
public Task next() {
timeSlice = timeSliceIterator.next();
return new TaskImpl(taskType, timeSlice, target, sources, interval, window);
return new TaskImpl(taskType, tenantId, timeSlice, target, sources, interval, window);
}
};
}
Expand All @@ -142,7 +148,7 @@ public Task next() {
public void forEach(Consumer<? super Task> action) {
getAllTimeSlices().forEach(time -> {
timeSlice = time;
action.accept(new TaskImpl(taskType, time, target, sources, interval, window));
action.accept(new TaskImpl(taskType, tenantId, time, target, sources, interval, window));
});
}

Expand All @@ -165,6 +171,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
TaskContainer task = (TaskContainer) o;
return Objects.equals(taskType, task.taskType) &&
Objects.equals(tenantId, task.tenantId) &&
Objects.equals(target, task.target) &&
Objects.equals(sources, task.sources) &&
Objects.equals(interval, task.interval) &&
Expand All @@ -175,18 +182,16 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(taskType, target, sources, interval, window, failedTimeSlices,
timeSlice);
return Objects.hash(taskType, tenantId, target, sources, interval, window, failedTimeSlices, timeSlice);
}

@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(TaskContainer.class)
.add("taskDef", taskType.getName())
.add("tenantId", tenantId)
.add("target", target)
.add("sources", sources)
// .add("interval", interval.toStandardMinutes())
// .add("window", window.toStandardMinutes())
.add("interval", interval)
.add("window", window)
.add("failedTimeSlices", failedTimeSlices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class TaskImpl implements Task {

private TaskType taskType;

private String tenantId;

private String target;

private Set<String> sources;
Expand All @@ -41,18 +43,21 @@ public class TaskImpl implements Task {

private DateTime timeSlice;

public TaskImpl(TaskType taskType, DateTime timeSlice, String target, String source, int interval, int window) {
public TaskImpl(TaskType taskType, String tenantId, DateTime timeSlice, String target, String source, int interval,
int window) {
this.taskType = taskType;
this.tenantId = tenantId;
this.timeSlice = timeSlice;
this.target = target;
this.sources = ImmutableSet.of(source);
this.interval = interval;
this.window = window;
}

public TaskImpl(TaskType taskType, DateTime timeSlice, String target, Set<String> sources, int interval,
int window) {
public TaskImpl(TaskType taskType, String tenantId, DateTime timeSlice, String target, Set<String> sources,
int interval, int window) {
this.taskType = taskType;
this.tenantId = tenantId;
this.timeSlice = timeSlice;
this.target = target;
this.sources = sources;
Expand All @@ -65,6 +70,11 @@ public TaskType getTaskType() {
return taskType;
}

@Override
public String getTenantId() {
return tenantId;
}

@Override
public String getTarget() {
return target;
Expand Down Expand Up @@ -96,6 +106,7 @@ public boolean equals(Object o) {
if (!(o instanceof Task)) return false;
Task that = (Task) o;
return Objects.equals(taskType, that.getTaskType()) &&
Objects.equals(tenantId, that.getTenantId()) &&
Objects.equals(target, that.getTarget()) &&
Objects.equals(sources, that.getSources()) &&
Objects.equals(interval, that.getInterval()) &&
Expand All @@ -105,13 +116,14 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(taskType, target, sources, interval, window, timeSlice);
return Objects.hash(taskType, tenantId, target, sources, interval, window, timeSlice);
}

@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(TaskImpl.class)
.add("taskType", taskType.getName())
.add("tenantId", tenantId)
.add("timeSlice", timeSlice)
.add("target", target)
.add("sources", sources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ public Observable<TaskContainer> findTasks(String type, DateTime timeSlice, int
TaskType taskType = findTaskType(type);
return rxSession.execute(queries.findTasks.bind(type, timeSlice.toDate(), segment))
.flatMap(Observable::from)
.map(row -> new TaskContainer(taskType, timeSlice, segment, row.getString(0),
row.getSet(1, String.class), row.getInt(2), row.getInt(3), row.getSet(4, Date.class).stream()
.map(row -> new TaskContainer(taskType, row.getString(0), timeSlice, segment, row.getString(1),
row.getSet(2, String.class), row.getInt(3), row.getInt(4), row.getSet(5, Date.class).stream()
.map(DateTime::new).collect(toSet())));
}

Expand All @@ -203,8 +203,8 @@ public Observable<Task> scheduleTask(DateTime time, Task task) {
DateTime currentTimeSlice = dateTimeService.getTimeSlice(time, getDuration(task.getInterval()));
DateTime timeSlice = currentTimeSlice.plus(getDuration(task.getInterval()));

return scheduleTaskAt(timeSlice, task).map(scheduledTime -> new TaskImpl(task.getTaskType(), scheduledTime,
task.getTarget(), task.getSources(), task.getInterval(), task.getWindow()));
return scheduleTaskAt(timeSlice, task).map(scheduledTime -> new TaskImpl(task.getTaskType(), task.getTenantId(),
scheduledTime, task.getTarget(), task.getSources(), task.getInterval(), task.getWindow()));
}

private Observable<TaskContainer> rescheduleTask(TaskContainer taskContainer) {
Expand All @@ -216,13 +216,13 @@ private Observable<TaskContainer> rescheduleTask(TaskContainer taskContainer) {
Observable<ResultSet> queueObservable;

if (taskContainer.getFailedTimeSlices().isEmpty()) {
queueObservable = rxSession.execute(queries.createTask.bind(taskType.getName(), nextTimeSlice.toDate(),
segment, taskContainer.getTarget(), taskContainer.getSources(), taskContainer.getInterval(),
taskContainer.getWindow()));
queueObservable = rxSession.execute(queries.createTask.bind(taskType.getName(), taskContainer.getTenantId(),
nextTimeSlice.toDate(), segment, taskContainer.getTarget(), taskContainer.getSources(),
taskContainer.getInterval(), taskContainer.getWindow()));
} else {
queueObservable = rxSession.execute(queries.createTaskWithFailures.bind(taskType.getName(),
nextTimeSlice.toDate(), segment, taskContainer.getTarget(), taskContainer.getSources(),
taskContainer.getInterval(), taskContainer.getWindow(),
taskContainer.getTenantId(), nextTimeSlice.toDate(), segment, taskContainer.getTarget(),
taskContainer.getSources(), taskContainer.getInterval(), taskContainer.getWindow(),
toDates(taskContainer.getFailedTimeSlices())));
}
Observable<ResultSet> leaseObservable = rxSession.execute(queries.createLease.bind(nextTimeSlice.toDate(),
Expand Down Expand Up @@ -250,7 +250,7 @@ private Observable<DateTime> scheduleTaskAt(DateTime time, Task task) {
int segmentOffset = (segment / segmentsPerOffset) * segmentsPerOffset;

Observable<ResultSet> queueObservable = rxSession.execute(queries.createTask.bind(taskType.getName(),
time.toDate(), segment, task.getTarget(), task.getSources(), (int) task.getInterval(),
task.getTenantId(), time.toDate(), segment, task.getTarget(), task.getSources(), task.getInterval(),
task.getWindow()));
Observable<ResultSet> leaseObservable = rxSession.execute(queries.createLease.bind(time.toDate(),
taskType.getName(), segmentOffset));
Expand Down

0 comments on commit 89bf6b3

Please sign in to comment.