Skip to content

Commit

Permalink
[HWKMETRICS-168] add method/test for scheduling task and update schema
Browse files Browse the repository at this point in the history
Pretty much all of the code prior to this ticket is commented out because it is
basically being rewritten. Tests will be added back though.
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent 6e55547 commit 3af3148
Show file tree
Hide file tree
Showing 10 changed files with 707 additions and 745 deletions.
5 changes: 2 additions & 3 deletions schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,8 @@ CREATE TABLE ${keyspace}.task_queue (

CREATE TABLE ${keyspace}.leases (
time_slice timestamp,
task_type text,
segment_offset int,
shard int,
owner text,
finished boolean,
PRIMARY KEY (time_slice, task_type, segment_offset)
PRIMARY KEY (time_slice, shard)
);
79 changes: 21 additions & 58 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/impl/Lease.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import java.util.Objects;

import org.joda.time.DateTime;

/**
* Tasks are associated with a lease. Clients must acquire a lease before they can start executing tasks associated
* with it; however, leases are not directly exposed to clients. The actions of acquiring, renewing, and finishing a
Expand All @@ -29,100 +27,65 @@
*/
class Lease {

public static final Lease NOT_ACQUIRED = new Lease(null, null, 0, null, false);

private DateTime timeSlice;
private long timeSlice;

private String taskType;

private int segmentOffset;
private int shard;

private String owner;

private boolean finished;

public Lease(DateTime timeSlice, String taskType, int segmentOffset, String owner, boolean finished) {
public Lease(long timeSlice, int shard) {
this.timeSlice = timeSlice;
this.taskType = taskType;
this.segmentOffset = segmentOffset;
this.shard = shard;
}

public Lease(long timeSlice, int shard, String owner, boolean finished) {
this.timeSlice = timeSlice;
this.shard = shard;
this.owner = owner;
this.finished = finished;
}

/**
* The time slice for which the lease has been allocated. For example, if a client schedules a task to aggregate
* data every hour and if the current time is 13:33, then a lease will be created and persisted for 14:00.
*/
public DateTime getTimeSlice() {
public long getTimeSlice() {
return timeSlice;
}

/**
*
* @return The {@link org.hawkular.metrics.tasks.api.TaskType task type} name
*/
public String getTaskType() {
return taskType;
}

/**
* Specifies how many tasks in terms of segments are associated with this lease. Suppose there are 100 segments and
* use a segment offset size of 10. There will be 10 task segments per lease. When a client acquires this lease,
* all tasks in each of the 10 segments assigned to this lease will be executed.
*/
public int getSegmentOffset() {
return segmentOffset;
public int getShard() {
return shard;
}

/**
* The current lease owner or null if there is no owner.
*/
public String getOwner() {
return owner;
}

public Lease setOwner(String owner) {
this.owner = owner;
return this;
}

/**
* @return True if all tasks associated with the lease are finished, false otherwise.
*/
public boolean isFinished() {
return finished;
}

public Lease setFinished(boolean finished) {
this.finished = finished;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Lease lease = (Lease) o;
return Objects.equals(segmentOffset, lease.segmentOffset) &&
return Objects.equals(timeSlice, lease.timeSlice) &&
Objects.equals(shard, lease.shard) &&
Objects.equals(finished, lease.finished) &&
Objects.equals(timeSlice, lease.timeSlice) &&
Objects.equals(taskType, lease.taskType) &&
Objects.equals(owner, lease.owner);
}

@Override
public int hashCode() {
return Objects.hash(timeSlice, taskType, segmentOffset, owner, finished);
return Objects.hash(timeSlice, shard, owner, finished);
}

@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(this)
.add("timeSlice", timeSlice)
.add("taskType", taskType)
.add("segmentOffset", segmentOffset)
.add("owner", owner)
.add("finished", finished)
.toString();
return "Lease{" +
"timeSlice=" + timeSlice +
", shard=" + shard +
", owner='" + owner + '\'' +
", finished=" + finished +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
*/
package org.hawkular.metrics.tasks.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.datastax.driver.core.ResultSet;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.joda.time.DateTime;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,11 +66,12 @@ void setRenewalRate(int renewalRate) {
}

public Observable<? extends List<Lease>> loadLeases(DateTime timeSlice) {
return session.execute(queries.findLeases.bind(timeSlice.toDate()))
.flatMap(Observable::from)
.map(row -> new Lease(timeSlice, row.getString(0), row.getInt(1), row.getString(2), row.getBool(3)))
.filter(lease -> !lease.isFinished())
.collect(ArrayList::new, ArrayList::add);
// return session.execute(queries.findLeases.bind(timeSlice.toDate()))
// .flatMap(Observable::from)
// .map(row -> new Lease(timeSlice, row.getString(0), row.getInt(1), row.getString(2), row.getBool(3)))
// .filter(lease -> !lease.isFinished())
// .collect(ArrayList::new, ArrayList::add);
return null;
}

public Observable<Lease> findUnfinishedLeases(DateTime timeSlice) {
Expand All @@ -82,75 +80,76 @@ public Observable<Lease> findUnfinishedLeases(DateTime timeSlice) {
// or all leases are marked finished. And to make things more interesting, we do
// not want to poll again until there is at least one worker thread (i.e., one of
// the threads executing tasks) free.
return Observable.create(subscriber ->
loadLeases(timeSlice)
.flatMap(Observable::from)
.filter(lease -> lease.getOwner() == null)
.flatMap(lease ->
acquire(lease).map(acquired ->
acquired ? lease : null))
.subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted)
);
// return Observable.create(subscriber ->
// loadLeases(timeSlice)
// .flatMap(Observable::from)
// .filter(lease -> lease.getOwner() == null)
// .flatMap(lease ->
// acquire(lease).map(acquired ->
// acquired ? lease : null))
// .subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted)
// );
return null;
}


public Observable<Boolean> acquire(Lease lease) {
return session.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
}

public Observable<Boolean> acquire(Lease lease, int ttl) {
return session.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
}

public Observable<Boolean> renew(Lease lease) {
return session.execute(queries.renewLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
lease.getTaskType(), lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
}
// public Observable<Boolean> acquire(Lease lease) {
// return session.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
// lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
// }
//
// public Observable<Boolean> acquire(Lease lease, int ttl) {
// return session.execute(queries.acquireLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
// lease.getTaskType(), lease.getSegmentOffset())).map(ResultSet::wasApplied);
// }
//
// public Observable<Boolean> renew(Lease lease) {
// return session.execute(queries.renewLease.bind(ttl, lease.getOwner(), lease.getTimeSlice().toDate(),
// lease.getTaskType(), lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
// }

/**
* Schedules the lease to be automatically renewed every {@link #DEFAULT_RENEWAL_RATE} seconds in a background
* thread. Renewals will stop once the lease is set to finished. If the lease cannot be renewed, then the lease
* owner, i.e., the calling thread, will be interrupted. It therefore important for lease owners to handle
* InterruptedExceptions appropriately.
*/
public void autoRenew(Lease lease) {
autoRenew(lease, Thread.currentThread());
}

private void autoRenew(Lease lease, Thread leaseOwner) {
renewals.schedule(createRenewRunnable(lease, leaseOwner), renewalRate, TimeUnit.SECONDS);
}

private Runnable createRenewRunnable(Lease lease, Thread leaseOwner) {
return () -> {
if (lease.isFinished()) {
return;
}
renew(lease).subscribe(
renewed -> {
if (renewed) {
autoRenew(lease, leaseOwner);
} else {
logger.info("Failed to renew " + lease + " for " + leaseOwner);
leaseOwner.interrupt();
}
},
t -> {
logger.warn("Failed to renew " + lease + " for " + leaseOwner);
// TODO figure out what to do in this scenario
});
};
}

public Observable<Boolean> finish(Lease lease) {
return session.execute(queries.finishLease.bind(lease.getTimeSlice().toDate(), lease.getTaskType(),
lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
}

public Observable<Void> deleteLeases(DateTime timeSlice) {
return session.execute(queries.deleteLeases.bind(timeSlice.toDate())).flatMap(resultSet -> null);
}
// public void autoRenew(Lease lease) {
// autoRenew(lease, Thread.currentThread());
// }

// private void autoRenew(Lease lease, Thread leaseOwner) {
// renewals.schedule(createRenewRunnable(lease, leaseOwner), renewalRate, TimeUnit.SECONDS);
// }

// private Runnable createRenewRunnable(Lease lease, Thread leaseOwner) {
// return () -> {
// if (lease.isFinished()) {
// return;
// }
// renew(lease).subscribe(
// renewed -> {
// if (renewed) {
// autoRenew(lease, leaseOwner);
// } else {
// logger.info("Failed to renew " + lease + " for " + leaseOwner);
// leaseOwner.interrupt();
// }
// },
// t -> {
// logger.warn("Failed to renew " + lease + " for " + leaseOwner);
// // TODO figure out what to do in this scenario
// });
// };
// }
//
// public Observable<Boolean> finish(Lease lease) {
// return session.execute(queries.finishLease.bind(lease.getTimeSlice().toDate(), lease.getTaskType(),
// lease.getSegmentOffset(), lease.getOwner())).map(ResultSet::wasApplied);
// }
//
// public Observable<Void> deleteLeases(DateTime timeSlice) {
// return session.execute(queries.deleteLeases.bind(timeSlice.toDate())).flatMap(resultSet -> null);
// }

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,32 @@ public class Queries {

public Queries(Session session) {
createLease = session.prepare(
"INSERT INTO leases (time_slice, task_type, segment_offset) VALUES (?, ?, ?)");
"INSERT INTO leases (time_slice, shard) VALUES (?, ?)");

findLeases = session.prepare(
"SELECT task_type, segment_offset, owner, finished FROM leases WHERE time_slice = ?");

acquireLease = session.prepare(
"UPDATE leases " +
"USING TTL ? " +
"SET owner = ? " +
"WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
"IF owner = NULL");

renewLease = session.prepare(
"UPDATE leases " +
"USING TTL ? " +
"SET owner = ? " +
"WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
"IF owner = ?");

finishLease = session.prepare(
"UPDATE leases " +
"SET finished = true " +
"WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
"IF owner = ?");

deleteLeases = session.prepare("DELETE FROM leases WHERE time_slice = ?");
"SELECT shard, owner, finished FROM leases WHERE time_slice = ?");

// acquireLease = session.prepare(
// "UPDATE leases " +
// "USING TTL ? " +
// "SET owner = ? " +
// "WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
// "IF owner = NULL");
//
// renewLease = session.prepare(
// "UPDATE leases " +
// "USING TTL ? " +
// "SET owner = ? " +
// "WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
// "IF owner = ?");
//
// finishLease = session.prepare(
// "UPDATE leases " +
// "SET finished = true " +
// "WHERE time_slice = ? AND task_type = ? AND segment_offset = ? " +
// "IF owner = ?");
//
// deleteLeases = session.prepare("DELETE FROM leases WHERE time_slice = ?");

// createTask = session.prepare(
// "INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window) " +
Expand Down

0 comments on commit 3af3148

Please sign in to comment.