/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2015
*/
package com.ibm.streamsx.topology;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.ibm.streamsx.topology.builder.BInputPort;
import com.ibm.streamsx.topology.builder.BOperatorInvocation;
import com.ibm.streamsx.topology.builder.BOutput;
import com.ibm.streamsx.topology.consistent.ConsistentRegionConfig;
import com.ibm.streamsx.topology.context.Placeable;
import com.ibm.streamsx.topology.function.BiFunction;
import com.ibm.streamsx.topology.function.Consumer;
import com.ibm.streamsx.topology.function.Function;
import com.ibm.streamsx.topology.function.Predicate;
import com.ibm.streamsx.topology.function.Supplier;
import com.ibm.streamsx.topology.function.ToIntFunction;
import com.ibm.streamsx.topology.function.UnaryOperator;
/**
* A {@code TStream} is a declaration of a continuous sequence of tuples. A
* connected topology of streams and functional transformations is built using
* {@link Topology}. <BR>
* Generic methods on this interface provide the ability to
* {@link #filter(Predicate) filter}, {@link #transform(Function)
* transform} or {@link #forEach(Consumer) sink} this declared stream using a
* function. <BR>
* Utility methods in the {@code com.ibm.streamsx.topology.streams} package
* provide specific source streams, or transformations on streams with specific
* types.
* <P>
* {@code TStream} implements {@link Placeable} to allow placement
* directives against the processing that produced this stream.
* For example, calling a {@code Placeable} method on the stream
* returned from {@link #filter(Predicate)} will apply to the
* container that is executing the {@code Predicate} passed into {@code filter()}.
* <BR>
* When multiple streams are produced by a method (e.g. {@link #split(int, ToIntFunction)})
* placement directives are common to all of the produced streams.
* </P>
*
* @param <T>
* Tuple type, any instance of {@code T} at runtime must be
* serializable.
*/
public interface TStream<T> extends TopologyElement, Placeable<TStream<T>> {
/**
* Enumeration for routing tuples to parallel channels.
* @see TStream#parallel(Supplier, Routing)
*/
public enum Routing {
/**
* Tuples will be routed to parallel channels such that an even
* distribution is maintained.
*/
ROUND_ROBIN,
/**
* Tuples will be consistently routed to the same channel based upon
* their key. The key is obtained through:
* <UL>
* <LI>A function called against each tuple when using {@link TStream#parallel(Supplier, Function)}</LI>
* <LI>The {@link com.ibm.streamsx.topology.logic.Logic#identity() identity function} when using {@link TStream#parallel(Supplier, Routing)}</LI>
* </UL>
* The key for a tuple {@code t} is the value returned by {@code keyer.apply(t)}.
* <BR>
* Any two tuples {@code t1} and {@code t2} will appear on
* the same channel if for their keys {@code k1} and {@code k2}
* {@code k1.equals(k2)} is true.
* <BR>
* If {@code k1} and {@code k2} are not equal then there is
* no guarantee about which channels {@code t1} and {@code t2}
* will appear on; they may end up on the same or different channels.
* <BR>
* The assumption is made that
* the key classes correctly implement the contract for {@code equals} and
* {@code hashCode()}.
*/
KEY_PARTITIONED,
/**
* Tuples will be consistently routed to the same channel based upon
* their {@code hashCode()}.
*/
HASH_PARTITIONED,
/**
* Tuples are broadcast to all channels.
* For example with a width of four each tuple on the stream results
* in four tuples, one per channel.
*
* @since 1.9
*/
BROADCAST
};
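/*
* Illustrative sketch, not part of this interface: the KEY_PARTITIONED
* contract only promises that tuples with equal keys land on the same
* channel. One way to satisfy that promise is to derive the channel from
* the key's hashCode(). The class name, the method name and the
* floorMod scheme below are assumptions made for illustration, and
* java.util.function.Function stands in for the streamsx Function type.
*
```java
import java.util.function.Function;

// Hypothetical model of key-based channel selection; not the streamsx implementation.
final class KeyPartitionSketch {

    // Assumed scheme: non-negative remainder of the key's hashCode() over the channel count.
    static <T, K> int channelFor(T tuple, Function<T, K> keyer, int width) {
        K key = keyer.apply(tuple);
        return Math.floorMod(key.hashCode(), width);
    }

    public static void main(String[] args) {
        Function<String, String> firstLetter = s -> s.substring(0, 1);
        // "apple" and "avocado" share the key "a", so they map to the same channel.
        System.out.println(channelFor("apple", firstLetter, 4));
        System.out.println(channelFor("avocado", firstLetter, 4));
    }
}
```
*
* Tuples with unequal keys may still share a channel under this scheme,
* which matches the "no guarantee" wording above.
*/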
/**
* Declare a new stream that filters tuples from this stream. Each tuple
* {@code t} on this stream will appear in the returned stream if
* {@link Predicate#test(Object) filter.test(t)} returns {@code true}. If
* {@code filter.test(t)} returns {@code false} then {@code t} will not
* appear in the returned stream.
* <P>
* Examples of filtering out all empty strings from stream {@code s} of type
* {@code String}
*
* <pre>
* <code>
* // Java 8 - Using lambda expression
* TStream<String> s = ...
* TStream<String> filtered = s.filter(t -> !t.isEmpty());
*
* // Java 7 - Using anonymous class
* TStream<String> s = ...
* TStream<String> filtered = s.filter(new Predicate<String>() {
* @Override
* public boolean test(String t) {
* return !t.isEmpty();
* }} );
* </code>
* </pre>
*
* </P>
*
* @param filter
* Filtering logic to be executed against each tuple.
* @return Filtered stream
* @see #split(int, ToIntFunction)
*/
TStream<T> filter(Predicate<T> filter);
/**
* Distribute a stream's tuples among {@code n} streams
* as specified by a {@code splitter}.
*
* <P>
* For each tuple on the stream, {@code splitter.applyAsInt(tuple)} is called.
* The return value {@code r} determines the destination stream:
* <pre>
* if r < 0 the tuple is discarded
* else it is sent to the stream at position (r % n) in the returned array.
* </pre>
* </P>
*
* <P>
* Each split {@code TStream} is exposed by the API. The user
* has full control over each stream's processing pipeline.
* Each stream's pipeline must be declared explicitly.
* Each stream can have different processing pipelines.
* </P>
* <P>
* An N-way {@code split()} is logically equivalent to a
* collection of N {@code filter()} invocations, each with a
* {@code predicate} to select the tuples for its stream.
* {@code split()} is more efficient. Each tuple is analyzed only once
* by a single {@code splitter} instance to identify the destination stream.
* For example, these are logically equivalent:
* <pre>
* List<TStream<String>> streams = stream.split(2, mySplitter());
*
* TStream<String> stream0 = stream.filter(myPredicate("ch0"));
* TStream<String> stream1 = stream.filter(myPredicate("ch1"));
* </pre>
* </P>
* <P>
* {@link #parallel(Supplier, Routing)} also distributes a stream's
* tuples among N-channels but it presents a different usage model.
* The user specifies a single logical processing pipeline and
* the logical pipeline is transparently replicated for each of the channels.
* The API does not provide access to the individual channels in
* the logical stream.
* {@link #endParallel()} declares the end of the parallel pipeline and combines
* all of the channels into a single resulting stream.
* </P>
* <P>
* Example of splitting a stream of tuples by their severity
* attribute:
* <pre>
* <code>
* class MyType { String severity; ... }
* TStream<MyType> s = ...
* List<TStream<MyType>> splits = s.split(3, new ToIntFunction<MyType>() {
* @Override
* public int applyAsInt(MyType tuple) {
* if(tuple.severity.equals("high"))
* return 0;
* else if(tuple.severity.equals("low"))
* return 1;
* else
* return 2;
* }} );
* splits.get(0). ... // high severity processing pipeline
* splits.get(1). ... // low severity processing pipeline
* splits.get(2). ... // "other" severity processing pipeline
* </code>
* </pre>
* </P>
* @param n the number of output streams
* @param splitter the splitter function
* @return List of {@code n} streams
*
* @throws IllegalArgumentException if {@code n <= 0}
* @see #parallel(Supplier, Routing)
*/
List<TStream<T>> split(int n, ToIntFunction<T> splitter);
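/*
* Illustrative sketch, not part of this interface: the routing rule that
* split() documents (discard when r is negative, otherwise send to
* position r % n) can be modelled on plain lists. SplitRoutingSketch and
* route are hypothetical names, and java.util.function.ToIntFunction
* stands in for the streamsx ToIntFunction type.
*
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical stand-alone model of the split() routing rule; it does not use the streamsx API.
final class SplitRoutingSketch {

    // Returns n lists; a tuple goes to list (r % n), or nowhere when r < 0.
    static <T> List<List<T>> route(List<T> tuples, int n, ToIntFunction<T> splitter) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be greater than zero");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(new ArrayList<>());
        }
        for (T t : tuples) {
            int r = splitter.applyAsInt(t);
            if (r >= 0) {
                out.get(r % n).add(t);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // -1 is discarded; 3 and 4 wrap around to positions 0 and 1.
        System.out.println(route(Arrays.asList(-1, 0, 1, 2, 3, 4), 3, v -> v));
        // prints [[0, 3], [1, 4], [2]]
    }
}
```
*/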
/**
* Declare a new stream that transforms each tuple from this stream into one
* (or zero) tuple of a different type {@code U}. For each tuple {@code t}
* on this stream, the returned stream will contain a tuple that is the
* result of {@code transformer.apply(t)} when the return is not {@code null}.
* If {@code transformer.apply(t)} returns {@code null} then no tuple
* is submitted to the returned stream for {@code t}.
*
* <P>
* Examples of transforming a stream containing numeric values as
* {@code String} objects into a stream of {@code Double} values.
*
* <pre>
* <code>
* // Java 8 - Using lambda expression
* TStream<String> strings = ...
* TStream<Double> doubles = strings.transform(v -> Double.valueOf(v));
*
* // Java 8 - Using method reference
* TStream<String> strings = ...
* TStream<Double> doubles = strings.transform(Double::valueOf);
*
* // Java 7 - Using anonymous class
* TStream<String> strings = ...
* TStream<Double> doubles = strings.transform(new Function<String, Double>() {
* @Override
* public Double apply(String v) {
* return Double.valueOf(v);
* }});
* </code>
* </pre>
*
* </P>
* <P>
* This function is equivalent to {@link #map(Function)}.
* </P>
* @param transformer
* Transformation logic to be executed against each tuple.
* @return Stream that will contain tuples of type {@code U} transformed from this
* stream's tuples.
*/
<U> TStream<U> transform(Function<T, U> transformer);
/**
* Declare a new stream that maps each tuple from this stream into one
* (or zero) tuple of a different type {@code U}. For each tuple {@code t}
* on this stream, the returned stream will contain a tuple that is the
* result of {@code mapper.apply(t)} when the return is not {@code null}.
* If {@code mapper.apply(t)} returns {@code null} then no tuple
* is submitted to the returned stream for {@code t}.
*
* <P>
* Examples of mapping a stream containing numeric values as
* {@code String} objects into a stream of {@code Double} values.
*
* <pre>
* <code>
* // Java 8 - Using lambda expression
* TStream<String> strings = ...
* TStream<Double> doubles = strings.map(v -> Double.valueOf(v));
*
* // Java 8 - Using method reference
* TStream<String> strings = ...
* TStream<Double> doubles = strings.map(Double::valueOf);
*
* // Java 7 - Using anonymous class
* TStream<String> strings = ...
* TStream<Double> doubles = strings.map(new Function<String, Double>() {
* @Override
* public Double apply(String v) {
* return Double.valueOf(v);
* }});
* </code>
* </pre>
*
* </P>
* <P>
* This function is equivalent to {@link #transform(Function)}.
* The typical term in most APIs is {@code map}.
* </P>
* @param mapper
* Mapping logic to be executed against each tuple.
* @return Stream that will contain tuples of type {@code U} mapped from this
* stream's tuples.
*
* @since 1.7
*/
<U> TStream<U> map(Function<T, U> mapper);
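/*
* Illustrative sketch, not part of this interface: map() and transform()
* submit nothing when the function returns null, so one function can
* both convert and filter. The model below works on plain lists;
* MapSketch and parseOrNull are hypothetical names, and
* java.util.function.Function stands in for the streamsx Function type.
*
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical list-based model of the null-dropping rule; not the streamsx implementation.
final class MapSketch {

    // Applies mapper to each tuple and keeps only the non-null results, in order.
    static <T, U> List<U> map(List<T> tuples, Function<T, U> mapper) {
        List<U> out = new ArrayList<>();
        for (T t : tuples) {
            U u = mapper.apply(t);
            if (u != null) {
                out.add(u);
            }
        }
        return out;
    }

    // Hypothetical mapper: returns null for text that is not a number, so that tuple is dropped.
    static Double parseOrNull(String v) {
        try {
            return Double.valueOf(v);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(map(Arrays.asList("1.5", "oops", "2"), MapSketch::parseOrNull));
        // prints [1.5, 2.0]
    }
}
```
*/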
/**
* Declare a new stream that modifies each tuple from this stream into one
* (or zero) tuple of the same type {@code T}. For each tuple {@code t}
* on this stream, the returned stream will contain a tuple that is the
* result of {@code modifier.apply(t)} when the return is not {@code null}.
* The function may return the same reference as its input {@code t} or
* a different object of the same type.
* If {@code modifier.apply(t)} returns {@code null} then no tuple
* is submitted to the returned stream for {@code t}.
*
* <P>
* Example of modifying a stream of {@code String} values by adding the suffix '{@code extra}'.
*
* <pre>
* <code>
* TStream<String> strings = ...
* TStream<String> modifiedStrings = strings.modify(new UnaryOperator<String>() {
* @Override
* public String apply(String tuple) {
* return tuple.concat("extra");
* }});
* </code>
* </pre>
*
* </P>
* <P>
* This method is equivalent to
* {@code transform(Function<T,T> modifier)}.
* </P>
*
* @param modifier
* Modifier logic to be executed against each tuple.
* @return Stream that will contain tuples of type {@code T} modified from this
* stream's tuples.
*/
TStream<T> modify(UnaryOperator<T> modifier);
/**
* Declare a new stream that maps tuples from this stream into one or
* more (or zero) tuples of a different type {@code U}. For each tuple
* {@code t} on this stream, the returned stream will contain all non-null tuples in
* the {@code Iterable<U>} that is the result of {@code mapper.apply(t)}.
* Tuples will be added to the returned stream in the order the iterator
* returns them.
*
* <BR>
* If the return is {@code null} or an empty {@code Iterable} then no tuples are added to
* the returned stream for input tuple {@code t}.
* <P>
* Examples of transforming a stream containing lines of text into a stream
* of words split out from each line. The order of the words in the stream
* will match the order of the words in the lines.
*
* <pre>
* <code>
* // Java 8 - Using lambda expression
* TStream<String> lines = ...
* TStream<String> words = lines.multiTransform(
* line -> Arrays.asList(line.split(" ")));
*
* // Java 7 - Using anonymous class
* TStream<String> lines = ...
* TStream<String> words = lines.multiTransform(new Function<String, Iterable<String>>() {
* @Override
* public Iterable<String> apply(String line) {
* return Arrays.asList(line.split(" "));
* }});
* </code>
* </pre>
*
* </P>
*
* @param mapper
* Mapper logic to be executed against each tuple.
* @return Stream that will contain tuples of type {@code U} mapped from this
* stream's tuples.
*
* @since 1.7
*/
<U> TStream<U> flatMap(Function<T, Iterable<U>> mapper);
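/*
* Illustrative sketch, not part of this interface: flatMap() adds every
* non-null element of the returned Iterable, in iteration order, and
* adds nothing when the function returns null or an empty Iterable.
* The list-based model below shows those three cases; FlatMapSketch and
* words are hypothetical names, and java.util.function.Function stands
* in for the streamsx Function type.
*
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical list-based model of flatMap(); not the streamsx implementation.
final class FlatMapSketch {

    static <T, U> List<U> flatMap(List<T> tuples, Function<T, Iterable<U>> mapper) {
        List<U> out = new ArrayList<>();
        for (T t : tuples) {
            Iterable<U> results = mapper.apply(t);
            if (results == null) {
                continue; // a null return contributes no tuples
            }
            for (U u : results) {
                if (u != null) {
                    out.add(u); // null elements are skipped
                }
            }
        }
        return out;
    }

    // Hypothetical mapper: splits a line into words, returning null for a blank line.
    static Iterable<String> words(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return null;
        }
        return Arrays.asList(trimmed.split("\\s+"));
    }

    public static void main(String[] args) {
        System.out.println(flatMap(Arrays.asList("to be", "", "or not"), FlatMapSketch::words));
        // prints [to, be, or, not]
    }
}
```
*/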
/**
* Declare a new stream that maps tuples from this stream into one or
* more (or zero) tuples of a different type {@code U}.
* <P>
* This function is equivalent to {@link #flatMap(Function)}.
* </P>
* @param transformer Mapper logic to be executed against each tuple.
* @return Stream that will contain tuples of type {@code U} mapped from this
* stream's tuples.
*/
<U> TStream<U> multiTransform(Function<T, Iterable<U>> transformer);
/**
* Sink (terminate) this stream. For each tuple {@code t} on this stream
* {@link Consumer#accept(Object) action.accept(t)} will be called. This is
* typically used to send information to external systems, such as databases
* or dashboards.
* <P>
* Example of terminating a stream of {@code String} tuples by printing them
* to {@code System.out}.
*
* <pre>
* <code>
* TStream<String> values = ...
* values.forEach(new Consumer<String>() {
*
* @Override
* public void accept(String tuple) {
* System.out.println(tuple);
*
* }
* });
* </code>
* </pre>
*
* </P>
*
* @param action
* Action to be executed against each tuple on this stream.
* @return the sink element
*
* @since 1.7
*/
TSink forEach(Consumer<T> action);
/**
* Terminate this stream.
* <P>
* This function is equivalent to {@link #forEach(Consumer)}.
* </P>
* @param sinker Action to be executed against each tuple on this stream.
* @return the sink element
*/
TSink sink(Consumer<T> sinker);
/**
* Create a stream that is a union of this stream and {@code other} stream
* of the same type {@code T}. Any tuple on this stream or {@code other}
* will appear on the returned stream. <BR>
* No ordering of tuples across this stream and {@code other} is defined,
* thus the return stream is unordered.
* <BR>
* If {@code other} is this stream or a keyed version of this stream
* then {@code this} is returned as a stream cannot be unioned with itself.
*
* @param other
* Stream to union with this stream.
* @return Stream that will contain tuples from this stream and
* {@code other}.
*/
TStream<T> union(TStream<T> other);
/**
* Create a stream that is a union of this stream and {@code others} streams
* of the same type {@code T}. Any tuple on this stream or any of
* {@code others} will appear on the returned stream. <BR>
* No ordering of tuples across this stream and {@code others} is defined,
* thus the return stream is unordered. <BR>
* If {@code others} does not contain any streams then {@code this} is returned.
* <BR>
* A stream or a keyed version of a stream cannot be unioned with itself,
* so any stream that is represented multiple times in {@code others}
* or this stream will be reduced to a single copy of itself.
* <BR>
* In the case that no stream is to be unioned with this stream
* then {@code this} is returned (for example, {@code others}
* is empty or only contains the same logical stream as {@code this}).
*
* @param others
* Streams to union with this stream.
* @return Stream containing tuples from this stream and {@code others}.
*/
TStream<T> union(Set<TStream<T>> others);
/**
* Print each tuple on {@code System.out}. For each tuple {@code t} on this
* stream {@code System.out.println(t.toString())} will be called.
*/
TSink print();
/**
* Class of the tuples on this stream, if known.
* Will be the same as {@link #getTupleType()}
* if it is a {@code Class} object.
* @return Class of the tuple on this stream, {@code null}
* if {@link #getTupleType()} is not a {@code Class} object.
*/
Class<T> getTupleClass();
/**
* Type of the tuples on this stream.
* Can be null if no type knowledge can be determined.
*
* @return Type of the tuples on this stream,
* {@code null} if no type knowledge could be determined
*/
Type getTupleType();
/**
* Join this stream with a window of type {@code U}. For each tuple on this
* stream, it is joined with the contents of {@code window}. Each tuple is
* passed into {@code joiner} and the return value is submitted to the
* returned stream. If the call returns {@code null} then no tuple is submitted.
*
* @param window Window to join this stream with.
* @param joiner Join function.
* @return A stream that is the results of joining this stream with
* {@code window}.
*/
<J, U> TStream<J> join(TWindow<U,?> window,
BiFunction<T, List<U>, J> joiner);
/**
* Join this stream with a partitioned window of type {@code U} with key type {@code K}.
* For each tuple on this stream, it is joined with the contents of {@code window}
* for the key {@code keyer.apply(tuple)}. Each tuple is
* passed into {@code joiner} and the return value is submitted to the
* returned stream. If the call returns {@code null} then no tuple is submitted.
*
* @param keyer Key function for this stream to match the window's key.
* @param window Keyed window to join this stream with.
* @param joiner Join function.
* @return A stream that is the results of joining this stream with
* {@code window}.
*/
<J, U, K> TStream<J> join(
Function<T,K> keyer,
TWindow<U,K> window,
BiFunction<T, List<U>, J> joiner);
/**
* Join this stream with the last tuple seen on a stream of type {@code U}
* with partitioning.
* For each tuple on this
* stream, it is joined with the last tuple seen on {@code lastStream}
* with a matching key (of type {@code K}).
* <BR>
* Each tuple {@code t} on this stream will match the last tuple
* {@code u} on {@code lastStream} if
* {@code keyer.apply(t).equals(lastStreamKeyer.apply(u))}
* is true.
* <BR>
* The assumption is made that
* the key classes correctly implement the contract for {@code equals} and
* {@code hashCode()}.
* <P>Each tuple is
* passed into {@code joiner} and the return value is submitted to the
* returned stream. If the call returns {@code null} then no tuple is submitted.
* </P>
* @param keyer Key function for this stream
* @param lastStream Stream to join with.
* @param lastStreamKeyer Key function for {@code lastStream}
* @param joiner Join function.
* @return A stream that is the results of joining this stream with
* {@code lastStream}.
*/
<J,U,K> TStream<J> joinLast(
Function<? super T, ? extends K> keyer,
TStream<U> lastStream,
Function<? super U, ? extends K> lastStreamKeyer,
BiFunction<T, U, J> joiner);
/**
* Join this stream with the last tuple seen on a stream of type {@code U}.
* For each tuple on this
* stream, it is joined with the last tuple seen on {@code lastStream}. Each tuple is
* passed into {@code joiner} and the return value is submitted to the
* returned stream. If the call returns {@code null} then no tuple is submitted.
* <BR>
* This is a simplified version of {@link #join(TWindow, BiFunction)}
* where instead the window contents are passed as a single tuple of type {@code U}
* rather than a list containing one tuple. If no tuple has been seen on {@code lastStream}
* then {@code null} will be passed as the second argument to {@code joiner}.
*
* @param lastStream Stream to join with.
* @param joiner Join function.
* @return A stream that is the results of joining this stream with
* {@code lastStream}.
*/
<J,U> TStream<J> joinLast(
TStream<U> lastStream,
BiFunction<T, U, J> joiner);
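/*
* Illustrative sketch, not part of this interface: the unpartitioned
* joinLast() remembers the most recent tuple from lastStream, passes
* null to the joiner until one has been seen, and submits nothing when
* the joiner returns null. The event-driven model below is hypothetical
* (JoinLastSketch, onLastStream, onThisStream and results are invented
* names), and java.util.function.BiFunction stands in for the streamsx
* BiFunction type.
*
```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical single-threaded model of joinLast(); not the streamsx implementation.
final class JoinLastSketch<T, U, J> {
    private final BiFunction<T, U, J> joiner;
    private final List<J> output = new ArrayList<>();
    private U last; // stays null until lastStream delivers a tuple

    JoinLastSketch(BiFunction<T, U, J> joiner) {
        this.joiner = joiner;
    }

    // A tuple arrived on lastStream: it replaces the remembered tuple.
    void onLastStream(U u) {
        last = u;
    }

    // A tuple arrived on this stream: join it with the remembered tuple.
    void onThisStream(T t) {
        J joined = joiner.apply(t, last);
        if (joined != null) {
            output.add(joined);
        }
    }

    List<J> results() {
        return output;
    }

    public static void main(String[] args) {
        JoinLastSketch<String, Integer, String> join =
                new JoinLastSketch<String, Integer, String>(
                        (t, u) -> u == null ? null : t + "@" + u);
        join.onThisStream("a"); // nothing seen on lastStream yet, joiner returns null
        join.onLastStream(1);
        join.onThisStream("b");
        join.onLastStream(2);
        join.onThisStream("c");
        System.out.println(join.results()); // prints [b@1, c@2]
    }
}
```
*/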
/**
* Declare a {@link TWindow} that continually represents the tuples seen on
* this stream during the last {@code time} period, measured in {@code unit}.
* If no tuples have been seen on the stream during that period
* then the window will be empty.
* <BR>
* The window has a single partition that always contains the
* tuples seen on this stream during the last {@code time} period.
* <BR>
* A key based partitioned window can be created from the returned window
* using {@link TWindow#key(Function)} or {@link TWindow#key()}.
* When the window is partitioned each partition independently maintains the
* tuples seen during the last {@code time} period for each key seen on this stream.
*
* @param time Time size of the window
* @param unit Unit for {@code time}
* @return Window on this stream representing the last {@code time} period.
*/
TWindow<T,Object> last(long time, TimeUnit unit);
/**
* Declare a {@link TWindow} that continually represents the last {@code time} seconds
* of tuples on this stream.
* Same as {@link #last(long,TimeUnit)} except the {@code time} is
* specified with a {@code Supplier<Integer>} such as one created
* by {@link Topology#createSubmissionParameter(String, Class)}.
*
* @param time Time size of the window in seconds
* @return Window on this stream representing the last {@code time} seconds.
*/
TWindow<T,Object> lastSeconds(Supplier<Integer> time);
/**
* Declare a {@link TWindow} that continually represents the last {@code count} tuples
* seen on this stream.
* Same as {@link #last(int)} except the {@code count} is
* specified with a {@code Supplier<Integer>} such as one created
* by {@link Topology#createSubmissionParameter(String, Class)}.
*
* @param count Tuple size of the window
* @return Window on this stream representing the last {@code count} tuples.
*/
TWindow<T,Object> last(Supplier<Integer> count);
/**
* Declare a {@link TWindow} that continually represents the last {@code count} tuples
* seen on this stream. If the stream has not yet seen {@code count}
* tuples then it will contain all of the tuples seen on the stream,
* which will be fewer than {@code count}. If no tuples have been
* seen on the stream then the window will be empty.
* <BR>
* The window has a single partition that always contains the
* last {@code count} tuples seen on this stream.
* <BR>
* A key based partitioned window can be created from the returned window
* using {@link TWindow#key(Function)} or {@link TWindow#key()}.
* When the window is partitioned each partition independently maintains the
* last {@code count} tuples for each key seen on this stream.
*
* @param count Tuple size of the window
* @return Window on this stream representing the last {@code count} tuples.
*/
TWindow<T,Object> last(int count);
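/*
* Illustrative sketch, not part of this interface: a count-based window
* holds every tuple until count have been seen, then evicts the oldest
* tuple for each new arrival. The model below covers a single partition
* only; LastCountWindowSketch, insert and contents are hypothetical
* names chosen for illustration.
*
```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a single-partition "last count tuples" window; not the streamsx implementation.
final class LastCountWindowSketch<T> {
    private final int count;
    private final Deque<T> tuples = new ArrayDeque<>();

    LastCountWindowSketch(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be greater than zero");
        }
        this.count = count;
    }

    // Adds a tuple, evicting the oldest one when the window is already full.
    void insert(T tuple) {
        if (tuples.size() == count) {
            tuples.removeFirst();
        }
        tuples.addLast(tuple);
    }

    // Snapshot of the window contents, oldest tuple first.
    List<T> contents() {
        return new ArrayList<>(tuples);
    }

    public static void main(String[] args) {
        LastCountWindowSketch<Integer> window = new LastCountWindowSketch<>(3);
        window.insert(1);
        window.insert(2);
        System.out.println(window.contents()); // prints [1, 2]
        window.insert(3);
        window.insert(4);
        window.insert(5);
        System.out.println(window.contents()); // prints [3, 4, 5]
    }
}
```
*/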
/**
* Declare a {@link TWindow} that continually represents the last tuple on this stream.
* If no tuples have been seen on the stream then the window will be empty.
* <BR>
* The window has a single partition that always contains the last tuple seen
* on this stream.
* <BR>
* A key based partitioned window can be created from the returned window
* using {@link TWindow#key(Function)} or {@link TWindow#key()}.
* When the window is partitioned each partition independently maintains the
* last tuple for each key seen on this stream.
*
* @return Window on this stream representing the last tuple.
*/
TWindow<T,Object> last();
/**
* Declare a {@link TWindow} on this stream that has the same configuration
* as another window.
* <BR>
* The window has a single partition.
* <BR>
* A key based partitioned window can be created from the returned window
* using {@link TWindow#key(Function)} or {@link TWindow#key()}.
*
* @param configWindow
* Window to copy the configuration from.
* @return Window on this stream with the same configuration as {@code configWindow}.
*/
TWindow<T,Object> window(TWindow<?,?> configWindow);
/**
* Publish tuples from this stream for consumption by other IBM Streams applications.
*
* Applications consume published streams using:
* <UL>
* <LI>
* {@link Topology#subscribe(String, Class)} for Java Streams applications.</LI>
* <LI>
* {@code com.ibm.streamsx.topology.topic::Subscribe} operator for SPL
* Streams applications.</LI>
* <LI>
* {@code com.ibm.streamsx.topology.topic::FilteredSubscribe} operator for SPL
* Streams applications subscribing to a subset of the published tuples.</LI>
* </UL>
* <BR>
* A subscriber matches to a publisher if:
* <UL>
* <LI>
* The topic name is an exact match, and:</LI>
* <LI>
* For JSON streams ({@code TStream<JSONObject>}) the subscription is to
* a JSON stream.
* </LI>
* <LI>
* For Java streams ({@code TStream<T>}) the declared Java type ({@code T}
* ) of the stream is an exact match.</LI>
* <LI>
* For {@link com.ibm.streamsx.topology.spl.SPLStream SPL streams} the {@link com.ibm.streamsx.topology.spl.SPLStream#getSchema() SPL
* schema} is an exact match.</LI>
* </UL>
* <BR>
* This method is identical to {@link #publish(String, boolean) publish(topic, false)}.
* <P>
* A topic name:
* <UL>
* <LI>must not be zero length</LI>
* <LI>must not contain the NUL character ({@code \u0000})</LI>
* <LI>must not contain the wildcard characters number sign ({@code ‘#’ \u0023})
* or plus sign ({@code ‘+’ \u002B})</LI>
* </UL>
* The forward slash ({@code ‘/’ \u002F}) is used to separate each level within a topic
* tree and provide a hierarchical structure to the topic names.
* The use of the topic level separator is significant when either of the
* two wildcard characters is encountered in topic filters specified
* by subscribing applications. Topic level separators can appear anywhere
* in a topic filter or topic name. Adjacent topic level separators indicate
* a zero length topic level.
* </P>
* <p>
* The type of the stream must be known to ensure that
* {@link Topology#subscribe(String, Class) subscribers}
* match on the Java type. Where possible the type of a
* stream is determined automatically, but due to Java type
* erasure this is not always possible. A stream can be
* assigned its type using {@link #asType(Class)}.
* For example, with a stream containing tuples of type
* {@code Location} it can be correctly published as follows:
* <pre>
* <code>
* TStream<Location> locations = ...
* locations.asType(Location.class).publish("location/bus");
* </code>
* </pre>
* </p>
*
* @param topic Topic name to publish tuples to.
*
* @exception IllegalStateException Type of the stream is not known.
*
* @see Topology#subscribe(String, Class)
* @see com.ibm.streamsx.topology.spl.SPLStreams#subscribe(TopologyElement, String, com.ibm.streams.operator.StreamSchema)
*/
void publish(String topic);
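/*
* Illustrative sketch, not part of this interface: the topic name rules
* listed for publish(String) can be checked with a few string
* operations, and splitting on the forward slash shows how adjacent
* separators produce a zero length level. TopicNameSketch,
* isValidTopicName and levels are hypothetical helpers; the library may
* validate topic names differently.
*
```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper modelling the documented topic name rules; not the streamsx implementation.
final class TopicNameSketch {

    // True when the name is non-empty and contains no NUL, '#' or '+' character.
    static boolean isValidTopicName(String topic) {
        return !topic.isEmpty()
                && topic.indexOf('\0') < 0
                && topic.indexOf('#') < 0
                && topic.indexOf('+') < 0;
    }

    // Splits a topic name into its levels, keeping zero length levels.
    static List<String> levels(String topic) {
        return Arrays.asList(topic.split("/", -1));
    }

    public static void main(String[] args) {
        System.out.println(isValidTopicName("location/bus")); // prints true
        System.out.println(isValidTopicName("location/#"));   // prints false
        System.out.println(levels("a//b").size());            // prints 3
    }
}
```
*/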
/**
* Publish tuples from this stream for consumption by other IBM Streams applications.
*
* Differs from {@link #publish(String)} in that it
* supports {@code topic} as a submission time parameter, for example
* using the topic defined by the submission parameter {@code eventTopic}:
*
* <pre>
* <code>
* TStream<String> events = ...
* Supplier<String> topicParam = topology.createSubmissionParameter("eventTopic", String.class);
* events.publish(topicParam);
* </code>
* </pre>
*
* @param topic Topic name to publish tuples to.
*
* @see #publish(String)
*
* @since 1.8
*/
void publish(Supplier<String> topic);
/**
* Publish tuples from this stream for consumption by other IBM Streams applications.
*
* Applications consume published streams using:
* <UL>
* <LI>
* {@link Topology#subscribe(String, Class)} for Java Streams applications.</LI>
* <LI>
* {@code com.ibm.streamsx.topology.topic::Subscribe} operator for SPL
* Streams applications.</LI>
* </UL>
* These tuple types allow publish-subscribe across IBM Streams applications
* implemented in different languages:
* <UL>
* <LI>{@code TStream<JSONObject>} - JSON tuples,
* SPL schema of {@link com.ibm.streamsx.topology.spl.SPLSchemas#JSON Json}.</LI>
* <LI>{@code TStream<String>} - String tuples,
* SPL schema of {@link com.ibm.streamsx.topology.spl.SPLSchemas#STRING String}.</LI>
* <LI>{@code TStream<com.ibm.streams.operator.types.XML>},
* SPL schema of {@link com.ibm.streamsx.topology.spl.SPLSchemas#XML Xml}. </LI>
* <LI>{@code TStream<com.ibm.streams.operator.types.Blob>},
* SPL schema of {@link com.ibm.streamsx.topology.spl.SPLSchemas#BLOB Blob}. </LI>
* </UL>
* <P>
* <BR>
* A subscriber matches a publisher if:
* <UL>
* <LI>
* The topic name is an exact match, and:</LI>
* <LI>
* For JSON streams ({@code TStream<JSONObject>}) the subscription is to
* a JSON stream.
* </LI>
* <LI>
* For Java streams ({@code TStream<T>}) the declared Java type ({@code T}
* ) of the stream is an exact match.</LI>
* <LI>
* For {@link com.ibm.streamsx.topology.spl.SPLStream SPL streams} the {@link com.ibm.streamsx.topology.spl.SPLStream#getSchema() SPL
* schema} is an exact match.</LI>
* </UL>
* </P>
* <P>
* {@code allowFilter} specifies if SPL filters can be pushed from a subscribing
* application to the publishing application. Executing filters on the publishing
* side reduces network communication between the publisher and the subscriber.
* <BR>
* When {@code allowFilter} is {@code false} SPL filters cannot be pushed to
* the publishing application.
* <BR>
* When {@code allowFilter} is {@code true} SPL filters are executed in the
* publishing application.
* <BR>
* Regardless of the setting of {@code allowFilter} an invocation of
* {@link Topology#subscribe(String, Class)} or
* {@code com.ibm.streamsx.topology.topic::Subscribe}
* subscribes to all published tuples.
* <BR>
* {@code allowFilter} can only be set to {@code true} when one of the following holds:
* <UL>
* <LI>This stream is an instance of {@link com.ibm.streamsx.topology.spl.SPLStream}.</LI>
* <LI>This stream is an instance of {@code TStream<String>}.</LI>
* </UL>
* </P>
* <P>
* A topic name:
* <UL>
* <LI>must not be zero length</LI>
* <LI>must not contain the nul character ({@code \u0000})</LI>
* <LI>must not contain the wildcard characters number sign ({@code ‘#’ \u0023})
* or plus sign ({@code ‘+’ \u002B})</LI>
* </UL>
* The forward slash ({@code ‘/’ \u002F}) is used to separate each level within a topic
* tree and provide a hierarchical structure to the topic names.
* The use of the topic level separator is significant when either of the
* two wildcard characters is encountered in topic filters specified
* by subscribing applications. Topic level separators can appear anywhere
* in a topic filter or topic name. Adjacent topic level separators indicate
* a zero length topic level.
* </P>
* <p>
* The type of the stream must be known to ensure that
* {@link Topology#subscribe(String, Class) subscribers}
* match on the Java type. Where possible the type of a
* stream is determined automatically, but due to Java type
* erasure this is not always possible. A stream can be
* assigned its type using {@link #asType(Class)}.
* For example, with a stream containing tuples of type
* {@code Location} it can be correctly published as follows:
* <pre>
* <code>
* TStream<Location> locations = ...
* locations.asType(Location.class).publish("location/bus");
* </code>
* </pre>
* </p>
*
* @param topic Topic name to publish tuples to.
* @param allowFilter Allow SPL filters specified by an SPL application to be executed
* in the publishing application.
*
* @exception IllegalStateException Type of the stream is not known.
*
* @see Topology#subscribe(String, Class)
* @see #asType(Class)
* @see com.ibm.streamsx.topology.spl.SPLStreams#subscribe(TopologyElement, String, com.ibm.streams.operator.StreamSchema)
*/
void publish(String topic, boolean allowFilter);
/**
* Publish tuples from this stream for consumption by other IBM Streams applications.
*
* Differs from {@link #publish(String, boolean)} in that it
* supports {@code topic} as a submission time parameter, for example
* using the topic defined by the submission parameter {@code eventTopic}:
*
* <pre>
* <code>
* TStream<String> events = ...
* Supplier<String> topicParam = topology.createSubmissionParameter("eventTopic", String.class);
* events.publish(topicParam, false);
* </code>
* </pre>
*
* @param topic Topic name to publish tuples to.
* @param allowFilter Allow SPL filters specified by an SPL application to be executed
* in the publishing application.
*
* @see #publish(String, boolean)
*
* @since 1.8
*/
void publish(Supplier<String> topic, boolean allowFilter);
/**
* Parallelizes the stream into a fixed
* number of parallel channels using round-robin distribution.
* <BR>
* Tuples are routed to the parallel channels in a
* {@link Routing#ROUND_ROBIN round-robin fashion}.
* <BR>
* Subsequent transformations on the returned stream will be executed
* in {@code width} channels until {@link #endParallel()} is called or
* the stream terminates.
* <br>
* See {@link #parallel(Supplier, Routing)} for more information.
* @param width
* The degree of parallelism in the parallel region.
* @return A reference to a stream for which subsequent transformations will be
* executed in parallel using {@code width} channels.
*/
TStream<T> parallel(int width);
/**
* Parallelizes the stream into {@code width} parallel channels.
* Same as {@link #parallel(int)} except the {@code width} is
* specified with a {@code Supplier<Integer>} such as one created
* by {@link Topology#createSubmissionParameter(String, Class)}.
*
* @param width
* The degree of parallelism in the parallel region.
* @return A reference to a stream for which subsequent transformations will be
* executed in parallel using {@code width} channels.
*/
TStream<T> parallel(Supplier<Integer> width);
/**
* Parallelizes the stream into {@code width} parallel channels. Tuples are routed
* to the parallel channels based on the {@link Routing} parameter.
* <BR><BR>
* If {@link Routing#ROUND_ROBIN}
* is specified the tuples are routed to parallel channels such that an
* even distribution is maintained.
* <BR>
* If {@link Routing#HASH_PARTITIONED} is specified then the
* {@code hashCode()} of the tuple is used to route the tuple to a corresponding
* channel, so that all tuples with the same hash code are sent to the same channel.
* <BR>
* If {@link Routing#KEY_PARTITIONED} is specified each tuple
* is taken to be its own key and is
* routed so that all tuples with the same key are sent to the same channel.
* This is equivalent to calling {@link #parallel(Supplier, Function)} with
* an identity function.
* <br><br>
* Source operations may be parallelized as well; refer to {@link TStream#setParallel(Supplier)} for more information.
* <br><br>
* Given the following code:
* <pre>
* <code>
* TStream<String> myStream = topology.source(...);
* TStream<String> parallelStart = myStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
* TStream<String> inParallel = parallelStart.map(...);
* TStream<String> joinedParallelStreams = inParallel.endParallel();
* joinedParallelStreams.print();
* </code>
* </pre>
*
* The following graph is created:
* <br>
* <img src="doc-files/Diagram2.jpg" width = 500/>
* <br>
* <br>
* Calling {@code parallel(of(3), TStream.Routing.ROUND_ROBIN)} creates three parallel channels. Each of the three channels contains separate
* instantiations of the operations (in this case, just <b>map</b>) declared in the region. Such stream operations are
* run in parallel as follows:
* <br>
*
* <style>
table, th, td {
border: 1px solid black;
border-collapse: collapse;
}
th, td {
padding: 5px;
}
th {
text-align: left;
}
* </style>
* <table>
* <tr><th>Execution Context</th><th>Parallel Behavior</th></tr>
* <tr><td>Standalone</td><td>Each parallel channel is separately executed by one or more threads.