-
Notifications
You must be signed in to change notification settings - Fork 0
/
CorrelatedIterable.java
71 lines (59 loc) · 2.43 KB
/
CorrelatedIterable.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.teketik.cip;
import java.util.Iterator;
/**
* {@link Iterable} that allows sequential iteration of multiple sorted {@link Iterator}s using a common {@link CorrelationKey}.<br>
* @see CorrelatedIterables for convenient wrappers.
* @param <K> the type of the key
*/
public class CorrelatedIterable<K extends Comparable<K>> implements Iterable<CorrelatedPayload<K>> {
private final IteratorDefinition<K, ?>[] iterators;
@SafeVarargs
public CorrelatedIterable(IteratorDefinition<K, ?>... iterators) {
this.iterators = iterators;
}
@Override
public Iterator<CorrelatedPayload<K>> iterator() {
return new CorrelatedIterator();
}
private class CorrelatedIterator implements Iterator<CorrelatedPayload<K>> {
@Override
public boolean hasNext() {
for (IteratorDefinition<K, ?> iteratorDefinition : iterators) {
if (iteratorDefinition.iterator.hasNext()) {
return true;
}
}
return false;
}
@Override
public CorrelatedPayload<K> next() {
final K nextProcessKey = determineNextProcessKey();
return buildPayload(nextProcessKey);
}
private K determineNextProcessKey() {
K lowestNextKey = null;
for (IteratorDefinition<K, ?> iteratorDefinition : iterators) {
if (iteratorDefinition.iterator.hasNext()) {
final K nextItemKey = iteratorDefinition.iterator.peekNext().getKey();
if (lowestNextKey == null || nextItemKey.compareTo(lowestNextKey) < 0) {
lowestNextKey = nextItemKey;
}
}
}
return lowestNextKey;
}
private CorrelatedPayload<K> buildPayload(final K nextProcessKey) {
final CorrelatedPayload<K> correlatedPayload = new CorrelatedPayload<>(nextProcessKey);
for (IteratorDefinition<K, ?> iteratorDefinition : iterators) {
while (
iteratorDefinition.iterator.hasNext()
&&
iteratorDefinition.iterator.peekNext().getKey().equals(nextProcessKey)
) {
correlatedPayload.add(iteratorDefinition.type, iteratorDefinition.iterator.next().getValue());
}
}
return correlatedPayload;
}
}
}