Skip to content

Commit

Permalink
[HWKMETRICS-52] initial support for scheduling and retrieving tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 12, 2015
1 parent 9797214 commit dbb682c
Show file tree
Hide file tree
Showing 14 changed files with 421 additions and 145 deletions.
12 changes: 8 additions & 4 deletions core/metrics-core-impl/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,20 @@ CREATE TABLE ${keyspace}.task_queue (
task_type text,
time_slice timestamp,
segment int,
metric text,
task_details text
PRIMARY KEY ((task_type, time_slice, segment), metric)
target text,
sources set<text>,
interval int,
window int,
PRIMARY KEY ((task_type, time_slice, segment), target)
);

-- #

CREATE TABLE ${keyspace}.leases (
time_slice timestamp,
task_type text,
segment_offset int,
owner text,
finished boolean,
PRIMARY KEY (time_slice, task_type, segment_offset)
) WITH default_time_to_live = 180;
);
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ public class Lease {

private DateTime timeSlice;

private TaskType taskType;
private String taskType;

private int segmentOffset;

private String owner;

private boolean finished;

public Lease(DateTime timeSlice, TaskType taskType, int segmentOffset, String owner, boolean finished) {
public Lease(DateTime timeSlice, String taskType, int segmentOffset, String owner, boolean finished) {
this.timeSlice = timeSlice;
this.taskType = taskType;
this.segmentOffset = segmentOffset;
Expand All @@ -49,7 +49,7 @@ public DateTime getTimeSlice() {
return timeSlice;
}

public TaskType getTaskType() {
public String getTaskType() {
return taskType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,41 +49,40 @@ public LeaseManager(Session session, Queries queries) {
public ListenableFuture<List<Lease>> findUnfinishedLeases(DateTime timeSlice) {
ResultSetFuture future = session.executeAsync(queries.findLeases.bind(timeSlice.toDate()));
return Futures.transform(future, (ResultSet resultSet) -> StreamSupport.stream(resultSet.spliterator(), false)
.map(row->new Lease(timeSlice, new TaskType(0, row.getString(0)), row.getInt(1), row.getString(2),
row.getBool(3)))
.map(row->new Lease(timeSlice, row.getString(0), row.getInt(1), row.getString(2), row.getBool(3)))
.filter(lease -> !lease.isFinished())
.collect(toList()));
}

public ListenableFuture<Boolean> acquire(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(DEFAULT_LEASE_TTL, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset()));
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> acquire(Lease lease, int ttl) {
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset()));
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> renew(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(DEFAULT_LEASE_TTL, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset(),
lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> renew(Lease lease, int ttl) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset(),
lease.getTimeSlice().toDate(), lease.getTaskType(), lease.getSegmentOffset(),
lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> finish(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.finishLease.bind(lease.getTimeSlice().toDate(),
lease.getTaskType().getText(), lease.getSegmentOffset(), lease.getOwner()));
lease.getTaskType(), lease.getSegmentOffset(), lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

Expand Down
13 changes: 13 additions & 0 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class Queries {

public PreparedStatement finishLease;

public PreparedStatement createTask;

public PreparedStatement findTasks;

public Queries(Session session) {
createLease = session.prepare(
"INSERT INTO leases (time_slice, task_type, segment_offset) VALUES (?, ?, ?)");
Expand All @@ -62,6 +66,15 @@ public Queries(Session session) {
"SET finished = true " +
"WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
"IF owner = ?");

createTask = session.prepare(
"INSERT INTO task_queue (task_type, time_slice, segment, target, sources, interval, window) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)");

findTasks = session.prepare(
"SELECT target, sources, interval, window " +
"FROM task_queue " +
"WHERE task_type = ? AND time_slice = ? AND segment = ?");
}

}
29 changes: 12 additions & 17 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,10 @@

import static org.joda.time.DateTime.now;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.joda.time.DateTime;
import org.joda.time.Seconds;

Expand All @@ -37,7 +32,7 @@
*/
public class Scheduler {

private static final Comparator<TaskType> TASK_TYPE_COMPARATOR = Comparator.comparing(TaskType::getPriority);
// private static final Comparator<TaskType> TASK_TYPE_COMPARATOR = Comparator.comparing(TaskType::getPriority);

private int numWorkers;

Expand All @@ -47,23 +42,23 @@ public class Scheduler {

private DateTimeService dateTimeService;

private Set<TaskType> taskTypes;
// private Set<TaskType> taskTypes;

public Scheduler(int numWorkers, Set<TaskType> taskTypes) {
this.numWorkers = numWorkers;
workersPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numWorkers));
executor = Executors.newSingleThreadScheduledExecutor();
dateTimeService = new DateTimeService();
this.taskTypes = taskTypes;
}
// public Scheduler(int numWorkers, Set<TaskType> taskTypes) {
// this.numWorkers = numWorkers;
// workersPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numWorkers));
// executor = Executors.newSingleThreadScheduledExecutor();
// dateTimeService = new DateTimeService();
// this.taskTypes = taskTypes;
// }

public void start() {
executor.scheduleAtFixedRate(() -> {
for (int i = 0; i < numWorkers; ++i) {
DateTime timeSlice = dateTimeService.getTimeSlice(now(), Seconds.ONE.toStandardDuration());
PriorityQueue<TaskType> typesQueue = new PriorityQueue<>(TASK_TYPE_COMPARATOR);
typesQueue.addAll(taskTypes);
workersPool.submit(new Worker(timeSlice, typesQueue));
// PriorityQueue<TaskType> typesQueue = new PriorityQueue<>(TASK_TYPE_COMPARATOR);
// typesQueue.addAll(taskTypes);
// workersPool.submit(new Worker(timeSlice, typesQueue));
}
}, 1, 1, TimeUnit.SECONDS);
}
Expand Down
99 changes: 99 additions & 0 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/Task.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
*
* * Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* * and other contributors as indicated by the @author tags.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.hawkular.metrics.tasks;

import static org.joda.time.Minutes.minutes;

import java.util.Objects;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;

/**
* @author jsanda
*/
public class Task {

private TaskDef taskDef;

private String target;

private Set<String> sources;

private Duration interval;

private Duration window;

public Task() {
}

public Task(TaskDef taskDef, String target, Set<String> sources, int interval, int window) {
this.taskDef = taskDef;
this.target = target;
this.sources = sources;
this.interval = minutes(interval).toStandardDuration();
this.window = minutes(window).toStandardDuration();
}

public Task(TaskDef taskDef, String target, String source, int interval, int window) {
this.taskDef = taskDef;
this.target = target;
this.sources = ImmutableSet.of(source);
this.interval = minutes(interval).toStandardDuration();
this.window = minutes(window).toStandardDuration();
}

public TaskDef getTaskDef() {
return taskDef;
}

public String getTarget() {
return target;
}

public Set<String> getSources() {
return sources;
}

public Duration getInterval() {
return interval;
}

public Duration getWindow() {
return window;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Task task = (Task) o;
return Objects.equals(taskDef, task.taskDef) &&
Objects.equals(target, task.target) &&
Objects.equals(sources, task.sources) &&
Objects.equals(interval, task.interval) &&
Objects.equals(window, task.window);
}

@Override
public int hashCode() {
return Objects.hash(taskDef, target, sources, interval, window);
}
}
86 changes: 86 additions & 0 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/TaskDef.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
*
* * Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* * and other contributors as indicated by the @author tags.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.hawkular.metrics.tasks;

import java.util.Objects;

import com.google.common.base.Supplier;

/**
* @author jsanda
*/
public class TaskDef {

private String name;

private Supplier<Runnable> factory;

private int segments;

private int segmentOffsets;

public String getName() {
return name;
}

public TaskDef setName(String name) {
this.name = name;
return this;
}

public Supplier<Runnable> getFactory() {
return factory;
}

public TaskDef setFactory(Supplier<Runnable> factory) {
this.factory = factory;
return this;
}

public int getSegments() {
return segments;
}

public TaskDef setSegments(int segments) {
this.segments = segments;
return this;
}

public int getSegmentOffsets() {
return segmentOffsets;
}

public TaskDef setSegmentOffsets(int segmentOffsets) {
this.segmentOffsets = segmentOffsets;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TaskDef taskDef = (TaskDef) o;
return Objects.equals(name, taskDef.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}
}

0 comments on commit dbb682c

Please sign in to comment.