forked from ReactiveX/RxJava
/
Single.java
5437 lines (5246 loc) · 272 KB
/
Single.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.core;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.operators.completable.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableSingleSingle;
import io.reactivex.rxjava3.internal.operators.single.*;
import io.reactivex.rxjava3.internal.util.ErrorMode;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.*;
/**
* The {@code Single} class implements the Reactive Pattern for a single value response.
* <p>
* {@code Single} behaves similarly to {@link Observable} except that it can only emit either a single successful
* value or an error (there is no {@code onComplete} notification as there is for an {@code Observable}).
* <p>
* The {@code Single} class implements the {@link SingleSource} base interface and the default consumer
* type it interacts with is the {@link SingleObserver} via the {@link #subscribe(SingleObserver)} method.
* <p>
* The {@code Single} operates with the following sequential protocol:
* <pre>
* <code>onSubscribe (onSuccess | onError)?</code>
* </pre>
* <p>
* Note that {@code onSuccess} and {@code onError} are mutually exclusive events; unlike {@code Observable},
* {@code onSuccess} is never followed by {@code onError}.
* <p>
* Like {@code Observable}, a running {@code Single} can be stopped through the {@link Disposable} instance
* provided to consumers through {@link SingleObserver#onSubscribe}.
* <p>
* Like an {@code Observable}, a {@code Single} is lazy, can be either "hot" or "cold", synchronous or
* asynchronous. {@code Single} instances returned by the methods of this class are <em>cold</em>
* and there is a standard <em>hot</em> implementation in the form of a subject:
* {@link io.reactivex.rxjava3.subjects.SingleSubject SingleSubject}.
* <p>
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
* <p>
* <img width="640" height="301" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.legend.png" alt="">
* <p>
* See {@link Flowable} or {@code Observable} for the
* implementation of the Reactive Pattern for a stream or vector of values.
* <p>
* For more information see the <a href="http://reactivex.io/documentation/single.html">ReactiveX
* documentation</a>.
* <p>
* Example:
* <pre><code>
* Disposable d = Single.just("Hello World")
*    .delay(10, TimeUnit.SECONDS, Schedulers.io())
*    .subscribeWith(new DisposableSingleObserver&lt;String&gt;() {
*        &#64;Override
*        public void onStart() {
*            System.out.println("Started");
*        }
*
*        &#64;Override
*        public void onSuccess(String value) {
*            System.out.println("Success: " + value);
*        }
*
*        &#64;Override
*        public void onError(Throwable error) {
*            error.printStackTrace();
*        }
*    });
*
* Thread.sleep(5000);
*
* d.dispose();
* </code></pre>
* <p>
* Note that by design, subscriptions via {@link #subscribe(SingleObserver)} can't be disposed
* from the outside (hence the
* {@code void} return of the {@link #subscribe(SingleObserver)} method) and it is the
* responsibility of the implementor of the {@code SingleObserver} to allow this to happen.
* RxJava supports such usage with the standard
* {@link io.reactivex.rxjava3.observers.DisposableSingleObserver DisposableSingleObserver} instance.
* For convenience, the {@link #subscribeWith(SingleObserver)} method is provided as well; it
* accepts a {@code SingleObserver} (or subclass) instance and returns that same instance, so it
* can be used in a fluent manner (such as in the example above).
* @param <T>
* the type of the item emitted by the {@code Single}
* @since 2.0
* @see io.reactivex.rxjava3.observers.DisposableSingleObserver
*/
public abstract class Single<@NonNull T> implements SingleSource<T> {
/**
 * Subscribes to every {@link SingleSource} supplied by an {@link Iterable} and relays the events of
 * whichever source signals first, disposing all the others.
 * <p>
 * <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.amb.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the {@code Iterable} sequence of sources; each source is subscribed to
 *                in the same order as this {@code Iterable} provides it
 * @return the new {@code Single} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 2.0
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> amb(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
    Objects.requireNonNull(sources, "sources is null");
    // The sources are handed over in their Iterable form, so the array argument stays null.
    Single<T> firstToSignal = new SingleAmb<>(null, sources);
    return RxJavaPlugins.onAssembly(firstToSignal);
}
/**
 * Subscribes to every {@link SingleSource} in an array and relays the events of whichever source
 * signals first, disposing all the others.
 * <p>
 * <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.ambArray.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the array of sources; each source is subscribed to
 *                in the same order as it appears in this array
 * @return the new {@code Single} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 2.0
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
@NonNull
public static <T> Single<T> ambArray(@NonNull SingleSource<? extends T>... sources) {
    Objects.requireNonNull(sources, "sources is null");
    switch (sources.length) {
        case 0:
            // No sources at all: the result is an error Single built from the helper's supplier.
            return error(SingleInternalHelper.emptyThrower());
        case 1: {
            // Only one source: wrap it directly instead of building a SingleAmb.
            @SuppressWarnings("unchecked")
            SingleSource<T> onlySource = (SingleSource<T>)sources[0];
            return wrap(onlySource);
        }
        default:
            // The sources are handed over in their array form, so the Iterable argument stays null.
            return RxJavaPlugins.onAssembly(new SingleAmb<>(sources, null));
    }
}
/**
 * Concatenate the single values, in a non-overlapping fashion, of the {@link SingleSource}s provided by
 * an {@link Iterable} sequence.
 * <p>
 * <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.i.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@link Flowable} honors the backpressure of the downstream consumer.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the {@code Iterable} sequence of {@code SingleSource} instances
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 2.0
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> concat(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
    // Validate here so the failure names this method's own parameter, as amb and ambArray do,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromIterable(sources).concatMapSingleDelayError(Functions.identity(), false);
}
/**
 * Concatenates, without overlap, the single values of the {@link SingleSource}s emitted by
 * an {@link ObservableSource} sequence.
 * <p>
 * <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the {@code ObservableSource} of {@code SingleSource} instances
 * @return the new {@link Observable} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 2.0
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concat(@NonNull ObservableSource<? extends SingleSource<? extends T>> sources) {
    Objects.requireNonNull(sources, "sources is null");
    // Fixed configuration of the underlying operator for this overload.
    final ErrorMode errorMode = ErrorMode.IMMEDIATE;
    final int prefetch = 2;
    return RxJavaPlugins.onAssembly(
            new ObservableConcatMapSingle<>(sources, Functions.identity(), errorMode, prefetch));
}
/**
 * Concatenates, without overlap, the single values of the {@link SingleSource}s emitted by
 * a {@link Publisher} sequence.
 * <p>
 * <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.p.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@link Flowable} honors the backpressure of the downstream consumer
 *  and the sources {@code Publisher} is expected to honor it as well.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the {@code Publisher} of {@code SingleSource} instances
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 2.0
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
    // Delegates to the prefetching overload with a prefetch amount of 2.
    final int defaultPrefetch = 2;
    return concat(sources, defaultPrefetch);
}
/**
 * Concatenates, without overlap, the single values of the {@link SingleSource}s emitted by
 * a {@link Publisher} sequence, prefetching the specified number of sources from it.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.pn.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@link Flowable} honors the backpressure of the downstream consumer
 *  and the sources {@code Publisher} is expected to honor it as well.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the {@code Publisher} of {@code SingleSource} instances
 * @param prefetch the number of {@code SingleSource}s to prefetch from the {@code Publisher}
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @throws IllegalArgumentException if {@code prefetch} is non-positive
 * @since 2.0
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources, int prefetch) {
    // Argument checks run in declaration order: the null check first, then the prefetch range.
    Objects.requireNonNull(sources, "sources is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    final ErrorMode errorMode = ErrorMode.IMMEDIATE;
    return RxJavaPlugins.onAssembly(
            new FlowableConcatMapSinglePublisher<>(sources, Functions.identity(), errorMode, prefetch));
}
/**
 * Returns a {@link Flowable} that emits the item of the first {@link SingleSource} followed by the
 * item of the second one.
 * <p>
 * <img width="640" height="366" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common value type
 * @param source1
 *            a {@code SingleSource} to be concatenated
 * @param source2
 *            a {@code SingleSource} to be concatenated
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code source1} or {@code source2} is {@code null}
 * @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concat(
        @NonNull SingleSource<? extends T> source1, @NonNull SingleSource<? extends T> source2
) {
    Objects.requireNonNull(source1, "source1 is null");
    Objects.requireNonNull(source2, "source2 is null");
    // Line the sources up in argument order, then map each one to its own value.
    Flowable<SingleSource<? extends T>> inOrder = Flowable.fromArray(source1, source2);
    return inOrder.concatMapSingleDelayError(Functions.identity(), false);
}
/**
 * Returns a {@link Flowable} that emits the items of three {@link SingleSource}s, one after the
 * other, in argument order.
 * <p>
 * <img width="640" height="366" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.o3.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common value type
 * @param source1
 *            a {@code SingleSource} to be concatenated
 * @param source2
 *            a {@code SingleSource} to be concatenated
 * @param source3
 *            a {@code SingleSource} to be concatenated
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code source1}, {@code source2} or {@code source3} is {@code null}
 * @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concat(
        @NonNull SingleSource<? extends T> source1, @NonNull SingleSource<? extends T> source2,
        @NonNull SingleSource<? extends T> source3
) {
    Objects.requireNonNull(source1, "source1 is null");
    Objects.requireNonNull(source2, "source2 is null");
    Objects.requireNonNull(source3, "source3 is null");
    // Line the sources up in argument order, then map each one to its own value.
    Flowable<SingleSource<? extends T>> inOrder = Flowable.fromArray(source1, source2, source3);
    return inOrder.concatMapSingleDelayError(Functions.identity(), false);
}
/**
 * Returns a {@link Flowable} that emits the items of four {@link SingleSource}s, one after the
 * other, in argument order.
 * <p>
 * <img width="640" height="362" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.o4.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common value type
 * @param source1
 *            a {@code SingleSource} to be concatenated
 * @param source2
 *            a {@code SingleSource} to be concatenated
 * @param source3
 *            a {@code SingleSource} to be concatenated
 * @param source4
 *            a {@code SingleSource} to be concatenated
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code source1}, {@code source2}, {@code source3} or {@code source4} is {@code null}
 * @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concat(
        @NonNull SingleSource<? extends T> source1, @NonNull SingleSource<? extends T> source2,
        @NonNull SingleSource<? extends T> source3, @NonNull SingleSource<? extends T> source4
) {
    Objects.requireNonNull(source1, "source1 is null");
    Objects.requireNonNull(source2, "source2 is null");
    Objects.requireNonNull(source3, "source3 is null");
    Objects.requireNonNull(source4, "source4 is null");
    // Line the sources up in argument order, then map each one to its own value.
    Flowable<SingleSource<? extends T>> inOrder = Flowable.fromArray(source1, source2, source3, source4);
    return inOrder.concatMapSingleDelayError(Functions.identity(), false);
}
/**
 * Concatenate the single values, in a non-overlapping fashion, of the {@link SingleSource}s provided in
 * an array.
 * <p>
 * <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatArray.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@link Flowable} honors the backpressure of the downstream consumer.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the array of {@code SingleSource} instances
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 2.0
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> concatArray(@NonNull SingleSource<? extends T>... sources) {
    // Validate here so the failure names this method's own parameter, as ambArray does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), false);
}
/**
 * Concatenate the single values, in a non-overlapping fashion, of the {@link SingleSource}s provided in
 * an array, using the error-delaying form of the concatenation.
 * <p>
 * <img width="640" height="408" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatArrayDelayError.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@link Flowable} honors the backpressure of the downstream consumer.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources the array of {@code SingleSource} instances
 * @return the new {@code Flowable} instance
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 3.0.0
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> concatArrayDelayError(@NonNull SingleSource<? extends T>... sources) {
    // Validate here so the failure names this method's own parameter, as ambArray does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), true);
}
/**
 * Concatenates a sequence of {@link SingleSource} eagerly into a single stream of values.
 * <p>
 * <img width="640" height="257" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatArrayEager.png" alt="">
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * source {@code SingleSource}s. The operator buffers the value emitted by these {@code SingleSource}s and then drains them
 * in order, each one after the previous one succeeds.
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator honors backpressure from downstream.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
 * @return the new {@link Flowable} instance with the specified concatenation behavior
 * @throws NullPointerException if {@code sources} is {@code null}
 */
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> concatArrayEager(@NonNull SingleSource<? extends T>... sources) {
    // Validate here so the failure names this method's own parameter, as ambArray does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromArray(sources).concatMapEager(SingleInternalHelper.toFlowable());
}
/**
 * Concatenates a sequence of {@link SingleSource} eagerly into a single stream of values, using the
 * error-delaying form of the eager concatenation.
 * <p>
 * <img width="640" height="426" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatArrayEagerDelayError.png" alt="">
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * source {@code SingleSource}s. The operator buffers the value emitted by these {@code SingleSource}s and then drains them
 * in order, each one after the previous one succeeds.
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator honors backpressure from downstream.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
 * @return the new {@link Flowable} instance with the specified concatenation behavior
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 3.0.0
 */
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> concatArrayEagerDelayError(@NonNull SingleSource<? extends T>... sources) {
    // Validate here so the failure names this method's own parameter, as ambArray does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromArray(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true);
}
/**
 * Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values.
 * <p>
 * <img width="640" height="307" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatEager.p.png" alt="">
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * emitted source {@code SingleSource}s as they are observed. The operator buffers the values emitted by these
 * {@code SingleSource}s and then drains them in order, each one after the previous one succeeds.
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>Backpressure is honored towards the downstream and the outer {@code Publisher} is
 *  expected to support backpressure. Violating this assumption, the operator will
 *  signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
 * @return the new {@link Flowable} instance with the specified concatenation behavior
 * @throws NullPointerException if {@code sources} is {@code null}
 */
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
    // Validate here so the failure names this method's own parameter, as amb does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.toFlowable());
}
/**
 * Concatenates the {@link Iterable} sequence of {@link SingleSource}s into a single sequence by subscribing to each {@code SingleSource},
 * one after the other, one at a time and delays any errors until all inner {@code SingleSource}s terminate
 * as a {@link Flowable} sequence.
 * <p>
 * <img width="640" height="451" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatDelayError.i.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator honors backpressure from downstream.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the {@code Iterable} sequence of {@code SingleSource}s
 * @return the new {@code Flowable} with the concatenating behavior
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 3.0.0
 */
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatDelayError(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
    // Validate here so the failure names this method's own parameter, as amb does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromIterable(sources).concatMapSingleDelayError(Functions.identity());
}
/**
 * Concatenates the {@link Publisher} sequence of {@link SingleSource}s into a single sequence by subscribing to each inner {@code SingleSource},
 * one after the other, one at a time and delays any errors until all inner sources and the outer {@code Publisher} terminate
 * as a {@link Flowable} sequence.
 * <p>
 * <img width="640" height="345" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatDelayError.p.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>{@code concatDelayError} fully supports backpressure.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the {@code Publisher} sequence of {@code SingleSource}s
 * @return the new {@code Flowable} with the concatenating behavior
 * @throws NullPointerException if {@code sources} is {@code null}
 * @since 3.0.0
 */
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatDelayError(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
    // Validate here so the failure names this method's own parameter, as amb does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromPublisher(sources).concatMapSingleDelayError(Functions.identity());
}
/**
 * Concatenates the {@link Publisher} sequence of {@link SingleSource}s into a single sequence by subscribing to each inner {@code SingleSource},
 * one after the other, one at a time and delays any errors until all inner sources and the outer {@code Publisher} terminate
 * as a {@link Flowable} sequence.
 * <p>
 * <img width="640" height="299" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatDelayError.pn.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>{@code concatDelayError} fully supports backpressure.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the {@code Publisher} sequence of {@code SingleSource}s
 * @param prefetch The number of upstream items to prefetch so that fresh items are
 *                 ready to be mapped when a previous {@code SingleSource} terminates.
 *                 The operator replenishes after half of the prefetch amount has been consumed
 *                 and turned into {@code SingleSource}s.
 * @return the new {@code Flowable} with the concatenating behavior
 * @throws NullPointerException if {@code sources} is {@code null}
 * @throws IllegalArgumentException if {@code prefetch} is non-positive
 * @since 3.0.0
 */
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatDelayError(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources, int prefetch) {
    // Validate here so the failure names this method's own parameter, as amb does,
    // rather than relying on whatever check the delegate performs. The prefetch
    // argument is passed through unchecked at this level.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromPublisher(sources).concatMapSingleDelayError(Functions.identity(), true, prefetch);
}
/**
 * Concatenates an {@link Iterable} sequence of {@link SingleSource}s eagerly into a single stream of values.
 * <p>
 * <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatEager.i.png" alt="">
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * source {@code SingleSource}s. The operator buffers the values emitted by these {@code SingleSource}s and then drains them
 * in order, each one after the previous one succeeds.
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>Backpressure is honored towards the downstream.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources an {@code Iterable} sequence of {@code SingleSource} that need to be eagerly concatenated
 * @return the new {@link Flowable} instance with the specified concatenation behavior
 * @throws NullPointerException if {@code sources} is {@code null}
 */
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
    // Validate here so the failure names this method's own parameter, as amb does,
    // rather than relying on whatever check the delegate performs.
    Objects.requireNonNull(sources, "sources is null");
    return Flowable.fromIterable(sources).concatMapEager(SingleInternalHelper.toFlowable());
}
/**
 * Provides an API (via a cold {@code Single}) that bridges the reactive world with the callback-style world.
 * <p>
 * <img width="640" height="454" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.create.png" alt="">
 * <p>
 * Example:
 * <pre><code>
 * Single.&lt;Event&gt;create(emitter -&gt; {
 *     Callback listener = new Callback() {
 *         &#64;Override
 *         public void onEvent(Event e) {
 *             emitter.onSuccess(e);
 *         }
 *
 *         &#64;Override
 *         public void onFailure(Exception e) {
 *             emitter.onError(e);
 *         }
 *     };
 *
 *     AutoCloseable c = api.someMethod(listener);
 *
 *     emitter.setCancellable(c::close);
 *
 * });
 * </code></pre>
 * <p>
 * Whenever a {@link SingleObserver} subscribes to the returned {@code Single}, the provided
 * {@link SingleOnSubscribe} callback is invoked with a fresh instance of a {@link SingleEmitter}
 * that will interact only with that specific {@code SingleObserver}. If this {@code SingleObserver}
 * disposes the flow (making {@link SingleEmitter#isDisposed} return {@code true}),
 * other observers subscribed to the same returned {@code Single} are not affected.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param source the emitter that is called when a {@code SingleObserver} subscribes to the returned {@code Single}
 * @return the new {@code Single} instance
 * @throws NullPointerException if {@code source} is {@code null}
 * @see SingleOnSubscribe
 * @see Cancellable
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
/**
 * Invokes a {@link Supplier} for each individual {@link SingleObserver} to obtain the actual
 * {@link SingleSource} that will be subscribed to.
 * <p>
 * <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.defer.png" alt="">
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param supplier the {@code Supplier} that is called for each individual {@code SingleObserver} and
 * returns a {@code SingleSource} instance to subscribe to
 * @throws NullPointerException if {@code supplier} is {@code null}
 * @return the new {@code Single} instance
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> defer(@NonNull Supplier<? extends SingleSource<? extends T>> supplier) {
    Objects.requireNonNull(supplier, "supplier is null");

    Single<T> deferred = new SingleDefer<>(supplier);
    return RxJavaPlugins.onAssembly(deferred);
}
/**
 * Signals the {@link Throwable} returned by the given callback to each individual {@link SingleObserver}.
 * <p>
 * <img width="640" height="283" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.error.c.png" alt="">
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code error} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param supplier the {@link Supplier} that is called for each individual {@code SingleObserver} and
 * returns a {@code Throwable} instance to be emitted.
 * @throws NullPointerException if {@code supplier} is {@code null}
 * @return the new {@code Single} instance
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> error(@NonNull Supplier<? extends Throwable> supplier) {
    Objects.requireNonNull(supplier, "supplier is null");

    Single<T> failing = new SingleError<>(supplier);
    return RxJavaPlugins.onAssembly(failing);
}
/**
 * Returns a {@code Single} that calls a subscriber's {@link SingleObserver#onError onError} method
 * with the given {@link Throwable} when the subscriber subscribes to it.
 * <p>
 * <img width="640" height="283" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.error.png" alt="">
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code error} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param throwable
 *            the particular {@link Throwable} to pass to {@link SingleObserver#onError onError}
 * @param <T>
 *            the type of the item (ostensibly) emitted by the {@code Single}
 * @return the new {@code Single} that invokes the subscriber's {@link SingleObserver#onError onError} method when
 *         the subscriber subscribes to it
 * @throws NullPointerException if {@code throwable} is {@code null}
 * @see <a href="http://reactivex.io/documentation/operators/empty-never-throw.html">ReactiveX operators documentation: Throw</a>
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> error(@NonNull Throwable throwable) {
    // requireNonNull hands back the same reference once it has been validated.
    Throwable checked = Objects.requireNonNull(throwable, "throwable is null");
    // Delegate to the Supplier-based overload with a supplier of this instance.
    return error(Functions.justSupplier(checked));
}
/**
 * Returns a {@code Single} that invokes the given {@link Callable} for each incoming {@link SingleObserver}
 * and emits its value or exception to them.
 * <p>
 * This defers the execution of the passed function until a {@code SingleObserver} subscribes to the
 * {@code Single}, effectively making the function "lazy".
 * The result of the function invocation is emitted by the {@link Single}.
 * <p>
 * <img width="640" height="467" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromCallable.png" alt="">
 * <dl>
 *   <dt><b>Scheduler:</b></dt>
 *   <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
 *   <dt><b>Error handling:</b></dt>
 *   <dd> If the {@code Callable} throws an exception, the respective {@link Throwable} is
 *   delivered to the downstream via {@link SingleObserver#onError(Throwable)},
 *   except when the downstream has disposed this {@code Single} source.
 *   In this latter case, the {@code Throwable} is delivered to the global error handler via
 *   {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
 *   </dd>
 * </dl>
 *
 * @param callable
 *         the function whose execution should be deferred; it is invoked when a {@code SingleObserver}
 *         subscribes to the {@link Single}.
 * @param <T>
 *         the type of the item emitted by the {@code Single}.
 * @return the new {@code Single} whose {@code SingleObserver}s' subscriptions trigger an invocation of the given function.
 * @throws NullPointerException if {@code callable} is {@code null}
 * @see #defer(Supplier)
 * @see #fromSupplier(Supplier)
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> Single<T> fromCallable(@NonNull Callable<? extends T> callable) {
    Objects.requireNonNull(callable, "callable is null");

    Single<T> lazy = new SingleFromCallable<>(callable);
    return RxJavaPlugins.onAssembly(lazy);
}
/**
 * Converts a {@link Future} into a {@code Single} and waits for its outcome in a blocking fashion.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
 * <p>
 * The operator calls {@link Future#get()}, which is a blocking method, on the subscription thread.
 * Applying {@link #subscribeOn(Scheduler)} is recommended to move this blocking wait to a
 * background thread and, if the {@link Scheduler} supports it, to interrupt the wait when the flow
 * is disposed.
 * <p>
 * A non-{@code null} value is then emitted via {@code onSuccess} or any exception is emitted via
 * {@code onError}. If the {@code Future} completes with {@code null}, a {@link NullPointerException}
 * is signaled.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code fromFuture} does not operate by default on a particular {@code Scheduler}.</dd>
 * </dl>
 *
 * @param future
 *            the source {@code Future}
 * @param <T>
 *            the type of object that the {@code Future} returns, and also the type of item to be emitted by
 *            the resulting {@code Single}
 * @return the new {@code Single} that emits the item from the source {@code Future}
 * @throws NullPointerException if {@code future} is {@code null}
 * @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
 * @see #fromFuture(Future, long, TimeUnit)
 * @see #fromCompletionStage(CompletionStage)
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <@NonNull T> Single<T> fromFuture(@NonNull Future<? extends T> future) {
    // Build the Flowable view of the Future first, then convert it to a Single.
    Flowable<T> upstream = Flowable.fromFuture(future);
    return toSingle(upstream);
}
/**
 * Converts a {@link Future} into a {@code Single} and waits for its outcome, or a timeout, in a blocking fashion.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
 * <p>
 * The operator calls {@link Future#get(long, TimeUnit)}, which is a blocking method, on the subscription thread.
 * Applying {@link #subscribeOn(Scheduler)} is recommended to move this blocking wait to a
 * background thread and, if the {@link Scheduler} supports it, to interrupt the wait when the flow
 * is disposed.
 * <p>
 * A non-{@code null} value is then emitted via {@code onSuccess} or any exception is emitted via
 * {@code onError}. If the {@code Future} completes with {@code null}, a {@link NullPointerException}
 * is signaled.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code fromFuture} does not operate by default on a particular {@code Scheduler}.</dd>
 * </dl>
 *
 * @param future
 *            the source {@code Future}
 * @param timeout
 *            the maximum time to wait, passed to {@link Future#get(long, TimeUnit)}
 * @param unit
 *            the {@link TimeUnit} of the {@code timeout} argument
 * @param <T>
 *            the type of object that the {@code Future} returns, and also the type of item to be emitted by
 *            the resulting {@code Single}
 * @return the new {@code Single} that emits the item from the source {@code Future}
 * @throws NullPointerException if {@code future} or {@code unit} is {@code null}
 * @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <@NonNull T> Single<T> fromFuture(@NonNull Future<? extends T> future, long timeout, @NonNull TimeUnit unit) {
    // Build the time-limited Flowable view of the Future first, then convert it to a Single.
    Flowable<T> upstream = Flowable.fromFuture(future, timeout, unit);
    return toSingle(upstream);
}
/**
 * Returns a {@code Single} instance that, when subscribed to, subscribes to the {@link MaybeSource} instance and
 * emits its {@code onSuccess} value as a single item, turns an {@code onComplete} into a
 * {@link NoSuchElementException} error signal or forwards the {@code onError} signal.
 * <p>
 * <img width="640" height="241" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromMaybe.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type of the {@code MaybeSource} element
 * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
 * @return the new {@code Single} instance
 * @throws NullPointerException if {@code maybe} is {@code null}
 * @since 3.0.0
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromMaybe(@NonNull MaybeSource<T> maybe) {
    Objects.requireNonNull(maybe, "maybe is null");

    // No default item is supplied in this overload, hence the null second argument.
    Single<T> converted = new MaybeToSingle<>(maybe, null);
    return RxJavaPlugins.onAssembly(converted);
}
/**
 * Returns a {@code Single} instance that, when subscribed to, subscribes to the {@link MaybeSource} instance and
 * emits its {@code onSuccess} value as a single item, emits the {@code defaultItem} for an {@code onComplete}
 * signal or forwards the {@code onError} signal.
 * <p>
 * <img width="640" height="353" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromMaybe.v.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type of the {@code MaybeSource} element
 * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
 * @param defaultItem the item to signal if the current {@code MaybeSource} is empty
 * @return the new {@code Single} instance
 * @throws NullPointerException if {@code maybe} or {@code defaultItem} is {@code null}
 * @since 3.0.0
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromMaybe(@NonNull MaybeSource<T> maybe, @NonNull T defaultItem) {
    // Validate both arguments up front, in declaration order.
    Objects.requireNonNull(maybe, "maybe is null");
    Objects.requireNonNull(defaultItem, "defaultItem is null");

    Single<T> converted = new MaybeToSingle<>(maybe, defaultItem);
    return RxJavaPlugins.onAssembly(converted);
}
/**
 * Wraps a specific {@link Publisher} into a {@code Single} and signals its single element or error.
 * <p>
 * <img width="640" height="322" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromPublisher.png" alt="">
 * <p>
 * A {@link NoSuchElementException} is signaled if the source {@code Publisher} is empty, and
 * an {@link IndexOutOfBoundsException} is signaled if the source has more than one element.
 * <p>
 * The {@code Publisher} must follow the
 * <a href="https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams">Reactive Streams specification</a>.
 * Violating the specification may result in undefined behavior.
 * <p>
 * If possible, use {@link #create(SingleOnSubscribe)} to create a
 * source-like {@code Single} instead.
 * <p>
 * Note that even though {@code Publisher} appears to be a functional interface, it
 * is not recommended to implement it through a lambda as the specification requires
 * state management that is not achievable with a stateless lambda.
 * <dl>
 * <dt><b>Backpressure:</b></dt>
 * <dd>The {@code publisher} is consumed in an unbounded fashion but will be cancelled
 * if it produced more than one item.</dd>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param publisher the source {@code Publisher} instance, not {@code null}
 * @return the new {@code Single} instance
 * @throws NullPointerException if {@code publisher} is {@code null}
 * @see #create(SingleOnSubscribe)
 */
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromPublisher(@NonNull Publisher<@NonNull ? extends T> publisher) {
    Objects.requireNonNull(publisher, "publisher is null");

    Single<T> wrapped = new SingleFromPublisher<>(publisher);
    return RxJavaPlugins.onAssembly(wrapped);
}
/**
 * Wraps a specific {@link ObservableSource} into a {@code Single} and signals its single element or error.
 * <p>
 * <img width="640" height="343" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromObservable.png" alt="">
 * <p>
 * A {@link NoSuchElementException} is signaled if the {@code ObservableSource} is empty, and
 * an {@link IndexOutOfBoundsException} is signaled if the source has more than one element.
 * <dl>
 *   <dt><b>Scheduler:</b></dt>
 *   <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param observable the source sequence to wrap, not {@code null}
 * @param <T>
 *         the type of the item emitted by the {@code Single}.
 * @return the new {@code Single} instance
 * @throws NullPointerException if {@code observable} is {@code null}
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromObservable(@NonNull ObservableSource<? extends T> observable) {
    Objects.requireNonNull(observable, "observable is null");

    // No default item is supplied here, hence the null second argument.
    Single<T> wrapped = new ObservableSingleSingle<>(observable, null);
    return RxJavaPlugins.onAssembly(wrapped);
}
/**
* Returns a {@code Single} that invokes passed supplier and emits its result
* for each individual {@link SingleObserver} that subscribes.
* <p>
* Allows you to defer execution of passed function until a {@code SingleObserver} subscribes to the {@link Single}.
* It makes passed function "lazy".
* Result of the function invocation will be emitted by the {@link Single}.
* <p>
* <img width="640" height="467" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromSupplier.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>