Possibility to manage statuses #11

Merged
44 commits merged on Dec 18, 2017

Commits (44)
b892468
Remove default for status timeout
dlanza1 Dec 16, 2017
b4928a7
Issue solved
dlanza1 Dec 16, 2017
3322bd1
Remove from doc and driver
dlanza1 Dec 16, 2017
9ca392b
UpdateStatusFunction
dlanza1 Dec 16, 2017
a632132
update func
dlanza1 Dec 16, 2017
e1407cb
Optional
dlanza1 Dec 16, 2017
043be57
improv
dlanza1 Dec 16, 2017
76f8858
time and new status
dlanza1 Dec 16, 2017
9983618
improv
dlanza1 Dec 16, 2017
8917162
try
dlanza1 Dec 16, 2017
276fe2e
nullpointexcep
dlanza1 Dec 16, 2017
e183c44
improv
dlanza1 Dec 16, 2017
ade85d9
Renaming
dlanza1 Dec 16, 2017
09fea23
Implement remove in status storages
dlanza1 Dec 16, 2017
29da0d7
Driver conf
dlanza1 Dec 16, 2017
4f36971
Catch exceptions an docs
dlanza1 Dec 16, 2017
d7a1c94
improv
dlanza1 Dec 16, 2017
c1e72d2
Implement StatusesKeyReceiver
dlanza1 Dec 16, 2017
4dd41a8
store
dlanza1 Dec 16, 2017
3e4ae33
Last value is not null
dlanza1 Dec 17, 2017
a1a4172
First implementation
dlanza1 Dec 15, 2017
5d980d9
improv
dlanza1 Dec 15, 2017
3c60b3b
corred
dlanza1 Dec 15, 2017
369d1bb
Generalize
dlanza1 Dec 16, 2017
9c452be
OFF logging
dlanza1 Dec 16, 2017
d34cb40
From conf file
dlanza1 Dec 17, 2017
3be409e
solve issue
dlanza1 Dec 17, 2017
cab27c0
travis notif
dlanza1 Dec 17, 2017
4185f53
filtering
dlanza1 Dec 17, 2017
15163ab
close
dlanza1 Dec 17, 2017
8f62df4
JSON print
dlanza1 Dec 17, 2017
beb9059
Filter by ID
dlanza1 Dec 18, 2017
9a7e7ab
filter by FQCN
dlanza1 Dec 18, 2017
e851bb5
http sink
dlanza1 Dec 18, 2017
1333e8a
seria
dlanza1 Dec 18, 2017
c70dc49
print
dlanza1 Dec 18, 2017
128fcf3
running
dlanza1 Dec 18, 2017
ca9ce25
issue removing
dlanza1 Dec 18, 2017
baa85fa
issue removing
dlanza1 Dec 18, 2017
71a527b
removal
dlanza1 Dec 18, 2017
9ba44f3
remove count
dlanza1 Dec 18, 2017
4ad9ea4
issues
dlanza1 Dec 18, 2017
4cdd671
move pair
dlanza1 Dec 18, 2017
3d3256c
q
dlanza1 Dec 18, 2017
7 changes: 6 additions & 1 deletion .travis.yml
@@ -1,4 +1,9 @@
 language: java
 
 jdk:
-  - oraclejdk8
+  - oraclejdk8
+
+notifications:
+  email:
+    on_success: change
+    on_failure: change
8 changes: 8 additions & 0 deletions doc/users-manual/define-metrics.md
@@ -158,6 +158,14 @@ Metrics can be grouped by (e.g. machine) with the "metrics.groupby" parameter in
 Group by can be set to ALL, in which case each metric will be treated independently.
 If group by is configured to ALL (or all attributes the metrics contain are listed), there are no attributes left to differentiate metrics and aggregate them, so aggregation is done over the historical values coming from each metric.
 
+## Status
+
+In order to perform the computation, previous activity needs to be stored. This is stored in a status.
+
+You may want to list the current statuses, or remove them in order to stop the generation of metrics for, for example, a specific host.
+
+For that, please read the [statuses management documentation](statuses-management.md).
+
 ## Examples
 
 Some examples of defined metrics can be:
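The examples themselves are truncated in this view. As an illustrative aside on the "metrics.groupby" parameter mentioned above: it is set per defined metric. A minimal sketch, assuming a `metrics.define.<defined-metric-id>` configuration prefix and metrics that carry a HOSTNAME attribute (both are assumptions, not taken from this diff):

```
# group the defined metric "cpu-usage" per machine (illustrative names)
metrics.define.cpu-usage.metrics.groupby = HOSTNAME

# or treat every incoming metric independently
metrics.define.cpu-usage.metrics.groupby = ALL
```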
11 changes: 10 additions & 1 deletion doc/users-manual/monitor.md
@@ -58,4 +58,13 @@ The value of any tag can be extracted from an attribute of the analyzed metric.
 
 ```
 <tag-key-1> = %<metric-key>
-```
+```
+
+## Status
+
+In order to perform the analysis, previous activity needs to be stored. This is stored in a status.
+
+You may want to list the current statuses in order to understand the analysis results, or remove the status of, for example, a specific host.
+
+For that, please read the [statuses management documentation](statuses-management.md).
+

2 changes: 2 additions & 0 deletions doc/users-manual/running.md
@@ -28,6 +28,8 @@ To run this application you can use the following command:
 
 ```
 $SPARK_HOME/bin/spark-submit \
+--repositories https://repository.cloudera.com/artifactory/cloudera-repos/ \
+--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0,org.reflections:reflections:0.9.9 \
 --class ch.cern.spark.metrics.Driver \
 target/metrics-monitor-VERSION.jar \
 <path_to_conf_file>
70 changes: 70 additions & 0 deletions doc/users-manual/statuses-management.md
@@ -0,0 +1,70 @@
# Statuses management

In order to perform the computation in some of the components, previous activity needs to be stored. This information is stored in an object that we call a "status".

You may want to list the current statuses in order to understand the results, or remove the status of, for example, a specific host.

Statuses are stored in an external system for easy management.

## Storage system

Statuses must be stored externally in a statuses store. The following sections describe the options.

### Single file statuses store

```
spark.cern.streaming.status.storage.type = single-file
spark.cern.streaming.status.storage.path = <path> (default: /tmp/metrics-monitor-statuses/)
```

### Kafka statuses store

The topic should be configured with [log compaction](https://kafka.apache.org/documentation/#compaction).

```
spark.cern.streaming.status.storage.type = kafka
spark.cern.streaming.status.storage.topic = <topic>
spark.cern.streaming.status.storage.timeout = <period like 1s, 1m, 3h> (default: 2s)
spark.cern.streaming.status.storage.serialization = <java or json> (default: json)
```
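As an illustration (not part of this diff), such a compacted topic could be created with the standard Kafka tooling of this era; the topic name, ZooKeeper address, partition count and replication factor below are assumptions:

```
kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --topic metrics-monitor-statuses \
  --partitions 1 \
  --replication-factor 1 \
  --config cleanup.policy=compact
```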

## Removing statuses

The application can be configured to listen to a TCP socket from which JSON documents will be collected.

Each JSON document should represent a status key; the statuses for these keys will be removed.

```
statuses.removal.socket = <host:port>
```
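For illustration only: one plausible way to feed this socket is to export keys with the manager CLI described below (its --save option writes JSON) and pipe the resulting documents to the socket with netcat. The file name is an assumption:

```
# keys-to-remove.json: status keys previously exported with --save
cat keys-to-remove.json | nc <host> <port>
```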

## Statuses management: list and remove

A command line interface is available to manage statuses: list keys, inspect values and remove statuses.

```
$SPARK_HOME/bin/spark-submit \
--master local \
--repositories https://repository.cloudera.com/artifactory/cloudera-repos/ \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0,org.reflections:reflections:0.9.9 \
--class ch.cern.spark.status.storage.manager.StatusesManagerCLI \
target/metrics-monitor-VERSION.jar <options>

usage: spark-statuses-manager
-c,--conf <arg> path to configuration file
-id,--id <arg> filter by status key id
-n,--fqcn <arg> filter by FQCN or alias
-p,--print <arg> print mode: java or json
-s,--save <arg> path to write result as JSON
```

--conf should be the path to the configuration file of the application.

For filtering statuses you can use:
* --fqcn: defined-metric-key, monitor-key or notificator-key
* --id: id of the defined metric or monitor; for notificators: monitor-id:notificator-id

For removing statuses, statuses.removal.socket must be configured in the application, and this command line tool must be run on the host and port that the property points to (see the example below).
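For example, a hypothetical invocation that lists the statuses of one monitor and prints them as JSON (the configuration path and monitor id are made up; all flags are from the usage above):

```
$SPARK_HOME/bin/spark-submit \
  --master local \
  --repositories https://repository.cloudera.com/artifactory/cloudera-repos/ \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0,org.reflections:reflections:0.9.9 \
  --class ch.cern.spark.status.storage.manager.StatusesManagerCLI \
  target/metrics-monitor-VERSION.jar \
  -c conf/config.properties -n monitor-key -id my-monitor -p json
```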



15 changes: 11 additions & 4 deletions doc/users-manual/users-manual.md
@@ -25,14 +25,19 @@ The general structure of the configuration file is shown below.
 checkpoint.dir = <path_to_store_stateful_data> (default: /tmp/)
 spark.batch.time = <period like 1h, 3m or 45s> (default: 1m)
 
-# Data for metrics that are not coming will expire
-data.expiration = <period like 1h, 3m or 45s> (default: 30m)
-
 # Optional
 properties.source.type = <properties_source_type> (default: "file" with path to this configuration file)
 properties.source.expire = <period like 1h, 3m or 45s> (default: 1m)
 properties.source.<other_confs> = <value>
 
+# Optional
+# +info at components that store statuses: defined metrics, monitors and notificators
+statuses.removal.socket = <host:port>
+
+# Default statuses store
+spark.cern.streaming.status.storage.type = single-file
+spark.cern.streaming.status.storage.path = /tmp/metrics-monitor-statuses/
+
 # At least one source is mandatory
 metrics.source.<metric-source-id-1>.type = <metric_source_type>
 metrics.source.<metric-source-id-1>.<other_confs> = <value>
@@ -63,7 +68,7 @@ notifications.sink.<sink-id>.type = <notifications_sink_type>
 notifications.sink.<sink-id>.<other_confs> = <value>
 ```
 
-### Configuration of each component
+### Index
 
 * [Properties source](properties-source.md)
 * [Metrics source](metric-sources.md)
@@ -76,6 +81,8 @@ notifications.sink.<sink-id>.<other_confs> = <value>
 * [Analysis results sink](analysis-results-sink.md)
 * [Notifications sinks](notifications-sink.md)
 
+* [Statuses management](statuses-management.md)
+
 ### Example of a full configuration:
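The full example in the file is truncated in this view. As an illustrative sketch of how the pieces above combine — storing statuses in Kafka and enabling remote removal — with values that are assumptions, not taken from the PR:

```
spark.batch.time = 1m

# statuses kept in a compacted Kafka topic
spark.cern.streaming.status.storage.type = kafka
spark.cern.streaming.status.storage.topic = metrics-monitor-statuses

# allow statuses to be removed remotely
statuses.removal.socket = localhost:9091

# at least one metrics source
metrics.source.main.type = <metric_source_type>
metrics.source.main.<other_confs> = <value>
```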
2 changes: 1 addition & 1 deletion src/main/java/ch/cern/properties/Properties.java
@@ -19,7 +19,7 @@
 import ch.cern.components.Component.Type;
 import ch.cern.components.ComponentManager;
 import ch.cern.properties.source.PropertiesSource;
-import ch.cern.spark.Pair;
+import ch.cern.utils.Pair;
 import ch.cern.utils.TimeUtils;
 import scala.Tuple2;
 
67 changes: 46 additions & 21 deletions src/main/java/ch/cern/spark/PairStream.java
@@ -1,32 +1,33 @@
 package ch.cern.spark;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.Function4;
 import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.State;
 import org.apache.spark.streaming.StateSpec;
-import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 
 import ch.cern.components.Component.Type;
 import ch.cern.components.ComponentManager;
 import ch.cern.properties.ConfigurationException;
 import ch.cern.properties.Properties;
+import ch.cern.spark.status.ActionOrValue;
+import ch.cern.spark.status.ActionOrValue.Action;
 import ch.cern.spark.status.StatusKey;
 import ch.cern.spark.status.StatusStream;
 import ch.cern.spark.status.StatusValue;
+import ch.cern.spark.status.UpdateStatusFunction;
 import ch.cern.spark.status.storage.StatusesStorage;
+import scala.Option;
 import scala.Tuple2;
 
 public class PairStream<K, V> extends Stream<Tuple2<K, V>>{
 
-    public static final String CHECKPPOINT_DURATION_PARAM = "spark.cern.streaming.rdd.checkpoint.timeout";
-    public static final String CHECKPPOINT_DURATION_DEFAULT = java.time.Duration.ofMinutes(30).toString();
+    public static final String STATUSES_EXPIRATION_PERIOD_PARAM = "spark.cern.streaming.status.timeout";
 
     private PairStream(JavaPairDStream<K, V> stream) {
         super(stream.map(tuple -> tuple));
@@ -35,43 +36,67 @@ private PairStream(JavaPairDStream<K, V> stream) {
     public static<K, V> PairStream<K, V> from(JavaPairDStream<K, V> input) {
         return new PairStream<>(input);
     }
 
+    public static <K, V> PairStream<K, V> fromT(JavaDStream<Tuple2<K, V>> input) {
+        return new PairStream<>(input.mapToPair(p -> p));
+    }
+
     public static<K extends StatusKey, V, S extends StatusValue, R> StatusStream<K, V, S, R> mapWithState(
             Class<K> keyClass,
             Class<S> statusClass,
-            PairStream<K, V> input,
-            Function4<Time, K, Optional<V>, State<S>, Optional<R>> updateStatusFunction)
+            PairStream<K, V> valuesStream,
+            UpdateStatusFunction<K, V, S, R> updateStatusFunction,
+            Optional<Stream<K>> removeKeysStream)
             throws ClassNotFoundException, IOException, ConfigurationException {
 
-        JavaSparkContext context = input.getSparkContext();
+        JavaSparkContext context = valuesStream.getSparkContext();
 
-        java.util.Optional<StatusesStorage> storageOpt = getStorage(context);
+        Optional<StatusesStorage> storageOpt = getStorage(context);
         if(!storageOpt.isPresent())
-            throw new ConfigurationException("Storgae need to be configured");
+            throw new ConfigurationException("Storage needs to be configured");
         StatusesStorage storage = storageOpt.get();
 
         JavaRDD<Tuple2<K, S>> initialStates = storage.load(context, keyClass, statusClass);
 
-        StateSpec<K, V, S, R> statusSpec = StateSpec
-                .function(updateStatusFunction)
-                .initialState(initialStates.rdd())
-                .timeout(getDataExpirationPeriod(input.getSparkContext()));
+        StateSpec<K, ActionOrValue<V>, S, R> statusSpec = StateSpec
+                .function(updateStatusFunction)
+                .initialState(initialStates.rdd());
 
+        Option<Duration> timeout = getStatusExpirationPeriod(valuesStream.getSparkContext());
+        if(timeout.isDefined())
+            statusSpec = statusSpec.timeout(timeout.get());
+
-        StatusStream<K, V, S, R> statusStream = StatusStream.from(input.asJavaDStream()
-                .mapToPair(pair -> pair)
-                .mapWithState(statusSpec));
+        PairStream<K, ActionOrValue<V>> actionsAndValues = valuesStream.mapToPair(tuple -> new Tuple2<K, ActionOrValue<V>>(tuple._1, new ActionOrValue<>(tuple._2)));
+
+        if(removeKeysStream.isPresent()) {
+            actionsAndValues = actionsAndValues.union(
+                    removeKeysStream.get().mapToPair(k -> new Tuple2<K, ActionOrValue<V>>(k, new ActionOrValue<>(Action.REMOVE))));
+
+            removeKeysStream.get().foreachRDD(rdd -> storage.remove(rdd));
+        }
+
+        StatusStream<K, V, S, R> statusStream = StatusStream.from(actionsAndValues.asJavaDStream()
+                .mapToPair(pair -> pair)
+                .mapWithState(statusSpec));
 
         statusStream.getStatuses().foreachRDD((rdd, time) -> storage.save(rdd, time));
 
         return statusStream;
     }
 
-    private static Duration getDataExpirationPeriod(JavaSparkContext context) {
+    private PairStream<K, V> union(PairStream<K, V> other) {
+        return fromT(asJavaDStream().union(other.asJavaDStream()));
+    }
+
+    private static Option<Duration> getStatusExpirationPeriod(JavaSparkContext context) {
         SparkConf conf = context.getConf();
 
-        String valueString = conf.get(CHECKPPOINT_DURATION_PARAM, CHECKPPOINT_DURATION_DEFAULT);
+        Option<String> valueString = conf.getOption(STATUSES_EXPIRATION_PERIOD_PARAM);
 
-        return new Duration(java.time.Duration.parse(valueString).toMillis());
+        if(valueString.isDefined())
+            return Option.apply(new Duration(java.time.Duration.parse(valueString.get()).toMillis()));
+        else
+            return Option.empty();
     }
 
     public JavaPairDStream<K, V> asJavaPairDStream() {
15 changes: 10 additions & 5 deletions src/main/java/ch/cern/spark/Stream.java
@@ -2,18 +2,17 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.Function4;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.api.java.function.VoidFunction2;
-import org.apache.spark.streaming.State;
 import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
 
@@ -24,6 +23,7 @@
 import ch.cern.spark.status.StatusKey;
 import ch.cern.spark.status.StatusStream;
 import ch.cern.spark.status.StatusValue;
+import ch.cern.spark.status.UpdateStatusFunction;
 
 public class Stream<V> {
 
Expand All @@ -45,11 +45,12 @@ public<K extends StatusKey, S extends StatusValue, R> StatusStream<K, V, S, R> m
Class<K> keyClass,
Class<S> statusClass,
PairFlatMapFunction<V, K, V> toPairFunction,
Function4<Time, K, Optional<V>, State<S>, Optional<R>> updateStatusFunction) throws ClassNotFoundException, IOException, ConfigurationException {
Optional<Stream<K>> removeKeys,
UpdateStatusFunction<K, V, S, R> updateStatusFunction) throws ClassNotFoundException, IOException, ConfigurationException {

PairStream<K, V> keyValuePairs = toPair(toPairFunction);

return PairStream.mapWithState(keyClass, statusClass, keyValuePairs, updateStatusFunction);
return PairStream.mapWithState(keyClass, statusClass, keyValuePairs, updateStatusFunction, removeKeys);
}

public Stream<V> union(Stream<V> input) {
@@ -126,5 +127,9 @@ public<R> Stream<R> flatMap(FlatMapFunction<V, R> func) {
     public void cache() {
         stream = stream.cache();
     }
+
+    public<K, T> PairStream<K, T> mapToPair(PairFunction<V, K, T> func) {
+        return PairStream.from(stream.mapToPair(func));
+    }
 
 }
2 changes: 1 addition & 1 deletion src/main/java/ch/cern/spark/http/HTTPSink.java
@@ -133,7 +133,7 @@ public void sink(Stream<?> outputStream) {
                     thrownExceptions.add(thrownException);
 
                 if(!thrownExceptions.isEmpty())
-                    throw new IOException("Some batches could not be sent, details in logs. Exceptions: " + thrownExceptions);
+                    LOG.error(new IOException("Some batches could not be sent. Exceptions: " + thrownExceptions));
             });
         });
Expand Down