Skip to content

Commit

Permalink
[HWKMETRICS-52] initial support for lease management
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 12, 2015
1 parent 2bcb046 commit 9797214
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 0 deletions.
20 changes: 20 additions & 0 deletions core/metrics-core-impl/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,23 @@ CREATE TABLE ${keyspace}.counters (
c_value counter,
PRIMARY KEY ((tenant_id, group), c_name)
);

-- #

CREATE TABLE ${keyspace}.task_queue (
task_type text,
time_slice timestamp,
segment int,
metric text,
task_details text
PRIMARY KEY ((task_type, time_slice, segment), metric)
);

CREATE TABLE ${keyspace}.leases (
time_slice timestamp,
task_type text,
segment_offset int,
owner text,
finished boolean,
PRIMARY KEY (time_slice, task_type, segment_offset)
) WITH default_time_to_live = 180;
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
<modules>
<module>core/metrics-core-api</module>
<module>core/metrics-core-impl</module>
<module>task-queue</module>

This comment has been minimized.

Copy link
@stefannegrea

stefannegrea May 12, 2015

Contributor

Should we move this module under core? Since it is a generic & core concept?

<module>embedded-cassandra/embedded-cassandra-service</module>
<module>embedded-cassandra/embedded-cassandra-ear</module>
<module>api/metrics-api-jaxrs</module>
Expand Down
65 changes: 65 additions & 0 deletions task-queue/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
and other contributors as indicated by the @author tags.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.hawkular.metrics</groupId>
<artifactId>hawkular-metrics-parent</artifactId>
<version>0.3.2-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>hawkular-metrics-task-queue</artifactId>
<name>Hawkular Metrics Task Queue</name>

<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${datastax.driver.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.time.version}</version>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
*
* * Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* * and other contributors as indicated by the @author tags.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.hawkular.metrics.tasks;

import static org.joda.time.DateTime.now;

import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Duration;
import org.joda.time.Hours;
import org.joda.time.Period;

/**
* @author jsanda
*/
class DateTimeService {

/**
* @return A DateTime object rounded down to the start of the current hour. For example, if the current time is
* 17:21:09, then 17:00:00 is returned.
*/
public DateTime currentHour() {
return getTimeSlice(now(), Hours.ONE.toStandardDuration());

This comment has been minimized.

Copy link
@stefannegrea

stefannegrea May 12, 2015

Contributor

We need to fix the timezone to UTC, otherwise we will run into the same issues as RHQ and the aggregation time slices.

Here is the thread (with references) from the old issue:
https://lists.fedorahosted.org/pipermail/rhq-devel/2014-November/003709.html

}

/**
* The 24 hour time slices are fix - 00:00 to 24:00. This method determines the 24 hour time slice based on
* {@link #currentHour()} and returns the start of the time slice.
*
* @return A DateTime object rounded down to the start of the current 24 hour time slice.
*/
public DateTime current24HourTimeSlice() {
return get24HourTimeSlice(currentHour());
}

/**
* This method determines the 24 hour time slice for the specified time and returns the start of that time slice.
*
* @param time The DateTime to be rounded down
* @return A DateTime rounded down to the start of the 24 hour time slice in which the time parameter falls.
* @see #current24HourTimeSlice()
*/
public DateTime get24HourTimeSlice(DateTime time) {
return getTimeSlice(time, Days.ONE.toStandardDuration());
}

public DateTime getTimeSlice(DateTime dt, Duration duration) {
Period p = duration.toPeriod();

if (p.getYears() != 0) {
return dt.yearOfEra().roundFloorCopy().minusYears(dt.getYearOfEra() % p.getYears());
} else if (p.getMonths() != 0) {
return dt.monthOfYear().roundFloorCopy().minusMonths((dt.getMonthOfYear() - 1) % p.getMonths());
} else if (p.getWeeks() != 0) {
return dt.weekOfWeekyear().roundFloorCopy().minusWeeks((dt.getWeekOfWeekyear() - 1) % p.getWeeks());
} else if (p.getDays() != 0) {
return dt.dayOfMonth().roundFloorCopy().minusDays((dt.getDayOfMonth() - 1) % p.getDays());
} else if (p.getHours() != 0) {
return dt.hourOfDay().roundFloorCopy().minusHours(dt.getHourOfDay() % p.getHours());
} else if (p.getMinutes() != 0) {
return dt.minuteOfHour().roundFloorCopy().minusMinutes(dt.getMinuteOfHour() % p.getMinutes());
} else if (p.getSeconds() != 0) {
return dt.secondOfMinute().roundFloorCopy().minusSeconds(dt.getSecondOfMinute() % p.getSeconds());
}
return dt.millisOfSecond().roundCeilingCopy().minusMillis(dt.getMillisOfSecond() % p.getMillis());
}

}
99 changes: 99 additions & 0 deletions task-queue/src/main/java/org/hawkular/metrics/tasks/Lease.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
*
* * Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* * and other contributors as indicated by the @author tags.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.hawkular.metrics.tasks;

import java.util.Objects;

import org.joda.time.DateTime;

/**
* @author jsanda
*/
public class Lease {

private DateTime timeSlice;

private TaskType taskType;

private int segmentOffset;

private String owner;

private boolean finished;

public Lease(DateTime timeSlice, TaskType taskType, int segmentOffset, String owner, boolean finished) {
this.timeSlice = timeSlice;
this.taskType = taskType;
this.segmentOffset = segmentOffset;
this.owner = owner;
this.finished = finished;
}

public DateTime getTimeSlice() {
return timeSlice;
}

public TaskType getTaskType() {
return taskType;
}

public int getSegmentOffset() {

This comment has been minimized.

Copy link
@stefannegrea

stefannegrea May 12, 2015

Contributor

Can you please explain the concept of a segment offset? And maybe sprinkle some documentation in this class too?

return segmentOffset;
}

public String getOwner() {
return owner;
}

public void setOwner(String owner) {
this.owner = owner;
}

public boolean isFinished() {
return finished;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Lease lease = (Lease) o;
return Objects.equals(segmentOffset, lease.segmentOffset) &&
Objects.equals(finished, lease.finished) &&
Objects.equals(timeSlice, lease.timeSlice) &&
Objects.equals(taskType, lease.taskType) &&
Objects.equals(owner, lease.owner);
}

@Override
public int hashCode() {
return Objects.hash(timeSlice, taskType, segmentOffset, owner, finished);
}

@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(this)
.add("timeSlice", timeSlice)
.add("taskType", taskType)
.add("segmentOffset", segmentOffset)
.add("owner", owner)
.add("finished", finished)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
*
* * Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* * and other contributors as indicated by the @author tags.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.hawkular.metrics.tasks;

import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.stream.StreamSupport;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.joda.time.DateTime;

/**
* @author jsanda
*/
public class LeaseManager {

public static final int DEFAULT_LEASE_TTL = 180;

private Session session;

private Queries queries;

public LeaseManager(Session session, Queries queries) {
this.session = session;
this.queries = queries;
}

public ListenableFuture<List<Lease>> findUnfinishedLeases(DateTime timeSlice) {
ResultSetFuture future = session.executeAsync(queries.findLeases.bind(timeSlice.toDate()));
return Futures.transform(future, (ResultSet resultSet) -> StreamSupport.stream(resultSet.spliterator(), false)
.map(row->new Lease(timeSlice, new TaskType(0, row.getString(0)), row.getInt(1), row.getString(2),
row.getBool(3)))
.filter(lease -> !lease.isFinished())
.collect(toList()));
}

public ListenableFuture<Boolean> acquire(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(DEFAULT_LEASE_TTL, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> acquire(Lease lease, int ttl) {
ResultSetFuture future = session.executeAsync(queries.acquireLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> renew(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(DEFAULT_LEASE_TTL, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset(),
lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> renew(Lease lease, int ttl) {
ResultSetFuture future = session.executeAsync(queries.renewLease.bind(ttl, lease.getOwner(),
lease.getTimeSlice().toDate(), lease.getTaskType().getText(), lease.getSegmentOffset(),
lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

public ListenableFuture<Boolean> finish(Lease lease) {
ResultSetFuture future = session.executeAsync(queries.finishLease.bind(lease.getTimeSlice().toDate(),
lease.getTaskType().getText(), lease.getSegmentOffset(), lease.getOwner()));
return Futures.transform(future, ResultSet::wasApplied);
}

}

0 comments on commit 9797214

Please sign in to comment.