-
Notifications
You must be signed in to change notification settings - Fork 214
/
SupervisedStream.java
98 lines (85 loc) · 3.33 KB
/
SupervisedStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.gateway.service.streaming.actors;
import java.util.function.Consumer;
import org.apache.pekko.event.Logging;
import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.KillSwitches;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
/**
* Materialized value of a source queue for supervision of the stream for which the queue is a part.
* Provides a kill switch and a termination future.
*/
public interface SupervisedStream {
/**
* Create a source queue that materializes an additional value for supervision.
*
* @param queueSize size of the source queue.
* @return the source queue.
*/
static Source<SessionedJsonifiable, WithQueue> sourceQueue(final int queueSize) {
return Source.<SessionedJsonifiable>queue(queueSize, OverflowStrategy.backpressure().withLogLevel(Logging.WarningLevel()))
.viaMat(KillSwitches.single(), Keep.both())
.mapMaterializedValue(pair -> {
final SourceQueueWithComplete<SessionedJsonifiable> sourceQueue = pair.first();
final KillSwitch killSwitch = pair.second();
final SupervisedStream supervised =
new DefaultSupervisedStream(killSwitch, sourceQueue.watchCompletion());
return new WithQueue(sourceQueue, supervised);
});
}
/**
* Add a listener for stream termination.
*
* @param errorConsumer called when the stream terminates. The argument is null after a normal termination
* or an error after an abnormal termination.
*/
void whenComplete(Consumer<? super Throwable> errorConsumer);
/**
* Shutdown the supervised stream.
*/
void shutdown();
/**
* Abort the supervised stream with an error.
*
* @param error the error with which to fail the stream.
*/
void abort(Throwable error);
/**
* Materialized value containing a {@code SourceQueue} and a {@code SupervisedSource}.
*/
final class WithQueue {
private final SourceQueueWithComplete<SessionedJsonifiable> sourceQueue;
private final SupervisedStream supervisedStream;
private WithQueue(final SourceQueueWithComplete<SessionedJsonifiable> sourceQueue,
final SupervisedStream supervisedStream) {
this.sourceQueue = sourceQueue;
this.supervisedStream = supervisedStream;
}
/**
* @return the source queue.
*/
public SourceQueueWithComplete<SessionedJsonifiable> getSourceQueue() {
return sourceQueue;
}
/**
* @return the supervised source.
*/
public SupervisedStream getSupervisedStream() {
return supervisedStream;
}
}
}