Skip to content

Commit

Permalink
[HWKMETRICS-168] initial commit for new scheduler and task classes
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent 257d051 commit b3e276f
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 0 deletions.
21 changes: 21 additions & 0 deletions schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,27 @@ CREATE TABLE ${keyspace}.retentions_idx (

-- #

-- Task scheduler schema

CREATE TYPE ${keyspace}.trigger_def (
type int,
delay bigint,
interval bigint
);

-- #

CREATE TABLE ${keyspace}.tasks (
id uuid,
shard int,
name text,
params map<text, text>,
trigger frozen <trigger_def>,
PRIMARY KEY (id)
);

-- #

CREATE TABLE ${keyspace}.task_queue (
task_type text,
time_slice timestamp,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.tasks.api;

import java.util.Objects;

/**
* @author jsanda
*/
public class RepeatingTrigger implements Trigger {

private long interval;

public RepeatingTrigger(long interval) {
this.interval = interval;
}

public long getInterval() {
return interval;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RepeatingTrigger that = (RepeatingTrigger) o;
return Objects.equals(interval, that.interval);
}

@Override
public int hashCode() {
return Objects.hash(interval);
}
}
35 changes: 35 additions & 0 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/api/Task2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.tasks.api;

import java.util.Map;
import java.util.UUID;

/**
* @author jsanda
*/
public interface Task2 {

UUID getId();

String getName();

Map<String, String> getParameters();

RepeatingTrigger getTrigger();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.tasks.api;

import java.util.Map;

import rx.Observable;

/**
* @author jsanda
*/
public interface TaskScheduler {

// void start();

Observable<Task2> createTask(String name, Map<String, String> parameters, RepeatingTrigger trigger);

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ public class Queries {

public PreparedStatement createTask;

public PreparedStatement createTask2;

public PreparedStatement createTaskWithFailures;

public PreparedStatement findTasks;

public PreparedStatement findTask;

public PreparedStatement deleteTasks;

public Queries(Session session) {
Expand Down Expand Up @@ -77,6 +81,9 @@ public Queries(Session session) {
"INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)");

createTask2 = session.prepare(
"INSERT INTO tasks (id, shard, name, params, trigger) VALUES (?, ?, ?, ?, ?)");

createTaskWithFailures = session.prepare(
"INSERT INTO task_queue (task_type, tenant_id, time_slice, segment, target, sources, interval, window, " +
"failed_time_slices) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)");
Expand All @@ -86,6 +93,8 @@ public Queries(Session session) {
"FROM task_queue " +
"WHERE task_type = ? AND time_slice = ? AND segment = ?");

findTask = session.prepare("SELECT shard, name, params, trigger FROM tasks WHERE id = ?");

deleteTasks = session.prepare(
"DELETE FROM task_queue WHERE task_type = ? AND time_slice = ? AND segment = ?");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.tasks.impl;

import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import com.google.common.collect.ImmutableMap;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.Task2;

/**
* @author jsanda
*/
public class Task2Impl implements Task2 {

private UUID id;

private int shard;

private String name;

private ImmutableMap<String, String> parameters;

private RepeatingTrigger trigger;

public Task2Impl(UUID id, int shard, String name, Map<String, String> parameters, RepeatingTrigger trigger) {
this.id = id;
this.shard = shard;
this.name = name;
this.parameters = ImmutableMap.copyOf(parameters);
this.trigger = trigger;
}

@Override
public UUID getId() {
return id;
}

public int getShard() {
return shard;
}

@Override
public String getName() {
return name;
}

@Override
public Map<String, String> getParameters() {
return parameters;
}

@Override
public RepeatingTrigger getTrigger() {
return trigger;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Task2Impl task2 = (Task2Impl) o;
return Objects.equals(shard, task2.shard) &&
Objects.equals(id, task2.id) &&
Objects.equals(name, task2.name) &&
Objects.equals(parameters, task2.parameters);
}

@Override
public int hashCode() {
return Objects.hash(id, shard, name, parameters);
}

@Override
public String toString() {
return "Task2Impl{" +
"id=" + id +
", shard=" + shard +
", name='" + name + '\'' +
", parameters=" + parameters +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.tasks.impl;

import java.util.Map;
import java.util.UUID;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.rx.cassandra.driver.RxSession;
import rx.Observable;

/**
* @author jsanda
*/
public class TaskSchedulerImpl implements TaskScheduler {

private int numShards = Integer.parseInt(System.getProperty("hawkular.scheduler.shards", "10"));

private HashFunction hashFunction = Hashing.murmur3_128();

private RxSession session;

private Queries queries;

public TaskSchedulerImpl(RxSession session, Queries queries) {
this.session = session;
this.queries = queries;
}

@Override
public Observable<Task2> createTask(String name, Map<String, String> parameters, RepeatingTrigger trigger) {
UUID id = UUID.randomUUID();
int shard = computeShard(id);
UserType triggerType = getKeyspace().getUserType("trigger_def");
UDTValue triggerUDT = triggerType.newValue();
triggerUDT.setInt("type", 1);
triggerUDT.setLong("interval", trigger.getInterval());

return Observable.create(subscriber ->
session.execute(queries.createTask2.bind(id, shard, name, parameters, triggerUDT)).subscribe(
resultSet -> subscriber.onNext(new Task2Impl(id, shard, name, parameters, trigger)),
t -> subscriber.onError(new RuntimeException("Failed to create task", t)),
subscriber::onCompleted
)
);
}

public Observable<Task2> findTask(UUID id) {
return Observable.create(subscriber ->
session.execute(queries.findTask.bind(id)).flatMap(Observable::from).subscribe(
row -> subscriber.onNext(new Task2Impl(id, row.getInt(0), row.getString(1),
row.getMap(2, String.class, String.class), getTrigger(row))),
t -> subscriber.onError(new RuntimeException("Failed to find task with id " + id, t)),
subscriber::onCompleted
)
);
}

private int computeShard(UUID uuid) {
HashCode hashCode = hashFunction.hashBytes(uuid.toString().getBytes());
return Hashing.consistentHash(hashCode, numShards);
}

private KeyspaceMetadata getKeyspace() {
return session.getCluster().getMetadata().getKeyspace(session.getLoggedKeyspace());
}

private RepeatingTrigger getTrigger(Row row) {
UDTValue value = row.getUDTValue(3);
return new RepeatingTrigger(value.getLong("interval"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static void initSuite() throws Exception {

@BeforeMethod
protected void resetDB() {
session.execute("TRUNCATE tasks");
session.execute("TRUNCATE task_queue");
session.execute("TRUNCATE leases");
}
Expand Down

0 comments on commit b3e276f

Please sign in to comment.