-
Notifications
You must be signed in to change notification settings - Fork 214
/
ResumeSourceBuilder.java
170 lines (154 loc) · 5.3 KB
/
ResumeSourceBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pekko.controlflow;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
/**
* Builder of the resume source.
*
* @since 1.1.0
*/
public final class ResumeSourceBuilder<S, E> {
Duration minBackoff = Duration.ofSeconds(1L);
Duration maxBackoff = Duration.ofMinutes(2L);
int maxRestarts = -1;
Duration recovery = Duration.ofMinutes(5L);
S initialSeed;
Function<S, Source<E, ?>> resume;
int lookBehind = 1;
Function<List<E>, S> nextSeed;
Function<Throwable, Optional<Throwable>> mapError = error -> Optional.empty();
ResumeSourceBuilder() {}
/**
* Create a source that resumes based on a seed computed from the final N elements that passed through the stream.
* Resumption is delayed by exponential backoff with a fixed recovery interval.
* <ul>
* <li>Emits when: stream created by {@code resume} emits.</li>
* <li>Completes when: a stream created by {@code resume} completes.</li>
* <li>Cancels when: downstream cancels.</li>
* <li>Fails when: more than {@code maxRestarts} streams created by {@code resume} failed with a recoverable
* error, or any stream created by {@code resume} failed with an unrecoverable error. Whether an exception
* is recoverable is determined by {@code mapError}.</li>
* </ul>
*
* @return the resume-source.
*/
public Source<E, NotUsed> build() {
checkNotNull(minBackoff, "minBackoff");
checkNotNull(maxBackoff, "maxBackoff");
checkNotNull(recovery, "recovery");
checkNotNull(initialSeed, "initialSeed");
checkNotNull(resume, "resume");
checkNotNull(nextSeed, "nextSeed");
checkNotNull(mapError, "mapError");
return ResumeSource.build(this);
}
/**
* Set the minimum backoff after a recoverable error.
*
* @param minBackoff the minimum backoff.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> minBackoff(final Duration minBackoff) {
this.minBackoff = minBackoff;
return this;
}
/**
* Set the maximum backoff after a recoverable error.
*
* @param maxBackoff the maximum backoff.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> maxBackoff(final Duration maxBackoff) {
this.maxBackoff = maxBackoff;
return this;
}
/**
* Set the maximum number of restarts before failing the stream. 0 means no recovery and a negative number
* means unlimited recovery.
*
* @param maxRestarts the number of restarts to attempt.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> maxRestarts(final int maxRestarts) {
this.maxRestarts = maxRestarts;
return this;
}
/**
* Set the recovery period. If so much time passes without error then the backoff is reset.
*
* @param recovery the recovery period.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> recovery(final Duration recovery) {
this.recovery = recovery;
return this;
}
/**
* Set the initial seed to start the stream.
*
* @param initialSeed the seed.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> initialSeed(final S initialSeed) {
this.initialSeed = initialSeed;
return this;
}
/**
* Create or resume a stream from a seed.
*
* @param resume the resumption function.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> resume(final Function<S, Source<E, ?>> resume) {
this.resume = resume;
return this;
}
/**
* Set how many stream elements to keep for resumption.
*
* @param lookBehind the number of elements to keep.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> lookBehind(final int lookBehind) {
this.lookBehind = lookBehind;
return this;
}
/**
* Compute the next seed from the last N elements to pass through the stream.
*
* @param nextSeed the function to compute the next seed.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> nextSeed(final Function<List<E>, S> nextSeed) {
this.nextSeed = nextSeed;
return this;
}
/**
* Set a function that maps recoverable errors to the empty optional and non-recoverable errors to
* errors the source should fail with.
* If not set, all errors are considered recoverable.
*
* @param mapError the function.
* @return this builder.
*/
public ResumeSourceBuilder<S, E> mapError(final Function<Throwable, Optional<Throwable>> mapError) {
this.mapError = mapError;
return this;
}
}