Skip to content
Permalink
master
Switch branches/tags
Go to file
 
 
Cannot retrieve contributors at this time
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("unused", "DeprecatedCallableAddReplaceWith", "UNUSED_PARAMETER", "NO_EXPLICIT_RETURN_TYPE_IN_API_MODE")
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.jvm.*
/**
* **GENERAL NOTE**
*
* These deprecations are added to improve user experience when they will start to
* search for their favourite operators and/or patterns that are missing or renamed in Flow.
* Deprecated functions also are moved here when they renamed. The difference is that they have
* a body with their implementation while pure stubs have [noImpl].
*/
internal fun noImpl(): Nothing =
throw UnsupportedOperationException("Not implemented, should not be called")
/**
* `observeOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
* thus use the context of the caller.
*
* For example, the following code:
* ```
* flowable
* .observeOn(Schedulers.io())
* .doOnEach { value -> println("Received $value") }
* .subscribe()
* ```
*
* has the following Flow equivalent:
* ```
* withContext(Dispatchers.IO) {
* flow.collect { value -> println("Received $value") }
* }
*
* ```
* @suppress
*/
@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
public fun <T> Flow<T>.observeOn(context: CoroutineContext): Flow<T> = noImpl()
/**
* `publishOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
* thus use the context of the caller.
*
* For example, the following code:
* ```
* flux
* .publishOn(Schedulers.io())
* .doOnEach { value -> println("Received $value") }
* .subscribe()
* ```
*
* has the following Flow equivalent:
* ```
* withContext(Dispatchers.IO) {
* flow.collect { value -> println("Received $value") }
* }
*
* ```
* @suppress
*/
@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = noImpl()
/**
* `subscribeOn` has no direct match in [Flow] API because [Flow] preserves its context and does not leak it.
*
* For example, the following code:
* ```
* flowable
* .map { value -> println("Doing map in IO"); value }
* .subscribeOn(Schedulers.io())
* .observeOn(Schedulers.computation())
* .doOnEach { value -> println("Processing $value in computation")
* .subscribe()
* ```
* has the following Flow equivalent:
* ```
* withContext(Dispatchers.Default) {
* flow
* .map { value -> println("Doing map in IO"); value }
* .flowOn(Dispatchers.IO) // Works upstream, doesn't change downstream
* .collect { value ->
* println("Processing $value in computation")
* }
* }
* ```
* Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow
* @suppress
*/
@Deprecated(message = "Use 'flowOn' instead", level = DeprecationLevel.ERROR)
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = noImpl()
/**
* Flow analogue of `onErrorXxx` is [catch].
* Use `catch { emitAll(fallback) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
replaceWith = ReplaceWith("catch { emitAll(fallback) }")
)
public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = noImpl()
/**
* Flow analogue of `onErrorXxx` is [catch].
* Use `catch { emitAll(fallback) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
replaceWith = ReplaceWith("catch { emitAll(fallback) }")
)
public fun <T> Flow<T>.onErrorResumeNext(fallback: Flow<T>): Flow<T> = noImpl()
/**
* `subscribe` is Rx-specific API that has no direct match in flows.
* One can use [launchIn] instead, for example the following:
* ```
* flowable
* .observeOn(Schedulers.io())
* .subscribe({ println("Received $it") }, { println("Exception $it happened") }, { println("Flowable is completed successfully") }
* ```
*
* has the following Flow equivalent:
* ```
* flow
* .onEach { value -> println("Received $value") }
* .onCompletion { cause -> if (cause == null) println("Flow is completed successfully") }
* .catch { cause -> println("Exception $cause happened") }
* .flowOn(Dispatchers.IO)
* .launchIn(myScope)
* ```
*
* Note that resulting value of [launchIn] is not used because the provided scope takes care of cancellation.
*
* Or terminal operators like [single] can be used from suspend functions.
* @suppress
*/
@Deprecated(
message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead",
level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.subscribe(): Unit = noImpl()
/**
* Use [launchIn] with [onEach], [onCompletion] and [catch] operators instead.
* @suppress
*/
@Deprecated(
message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead",
level = DeprecationLevel.ERROR
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit): Unit = noImpl()
/**
* Use [launchIn] with [onEach], [onCompletion] and [catch] operators instead.
* @suppress
*/
@Deprecated(
message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead",
level = DeprecationLevel.ERROR
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit, onError: suspend (Throwable) -> Unit): Unit = noImpl()
/**
* Note that this replacement is sequential (`concat`) by default.
* For concurrent flatMap [flatMapMerge] can be used instead.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue is 'flatMapConcat'",
replaceWith = ReplaceWith("flatMapConcat(mapper)")
)
public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = noImpl()
/**
* Flow analogue of `concatMap` is [flatMapConcat].
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'concatMap' is 'flatMapConcat'",
replaceWith = ReplaceWith("flatMapConcat(mapper)")
)
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = noImpl()
/**
* Note that this replacement is sequential (`concat`) by default.
* For concurrent flatMap [flattenMerge] can be used instead.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'merge' is 'flattenConcat'",
replaceWith = ReplaceWith("flattenConcat()")
)
public fun <T> Flow<Flow<T>>.merge(): Flow<T> = noImpl()
/**
* Flow analogue of `flatten` is [flattenConcat].
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'flatten' is 'flattenConcat'",
replaceWith = ReplaceWith("flattenConcat()")
)
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = noImpl()
/**
* Kotlin has a built-in generic mechanism for making chained calls.
* If you wish to write something like
* ```
* myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... }
* ```
* you can replace it with
*
* ```
* myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... }
* ```
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'compose' is 'let'",
replaceWith = ReplaceWith("let(transformer)")
)
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = noImpl()
/**
* Flow analogue of `skip` is [drop].
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'skip' is 'drop'",
replaceWith = ReplaceWith("drop(count)")
)
public fun <T> Flow<T>.skip(count: Int): Flow<T> = noImpl()
/**
* Flow extension to iterate over elements is [collect].
* Foreach wasn't introduced deliberately to avoid confusion.
* Flow is not a collection, iteration over it may be not idempotent
* and can *launch* computations with side-effects.
* This behaviour is not reflected in [forEach] name.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'forEach' is 'collect'",
replaceWith = ReplaceWith("collect(action)")
)
public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = noImpl()
/**
* Flow has less verbose [scan] shortcut.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow has less verbose 'scan' shortcut",
replaceWith = ReplaceWith("scan(initial, operation)")
)
public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> =
noImpl()
/**
* Flow analogue of `onErrorXxx` is [catch].
* Use `catch { emit(fallback) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emit(fallback) }'",
replaceWith = ReplaceWith("catch { emit(fallback) }")
)
// Note: this version without predicate gives better "replaceWith" action
public fun <T> Flow<T>.onErrorReturn(fallback: T): Flow<T> = noImpl()
/**
* Flow analogue of `onErrorXxx` is [catch].
* Use `catch { e -> if (predicate(e)) emit(fallback) else throw e }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { e -> if (predicate(e)) emit(fallback) else throw e }'",
replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emit(fallback) else throw e }")
)
public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: (Throwable) -> Boolean = { true }): Flow<T> =
catch { e ->
// Note: default value is for binary compatibility with preview version, that is why it has body
if (!predicate(e)) throw e
emit(fallback)
}
/**
* Flow analogue of `startWith` is [onStart].
* Use `onStart { emit(value) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emit(value) }'",
replaceWith = ReplaceWith("onStart { emit(value) }")
)
public fun <T> Flow<T>.startWith(value: T): Flow<T> = noImpl()
/**
* Flow analogue of `startWith` is [onStart].
* Use `onStart { emitAll(other) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emitAll(other) }'",
replaceWith = ReplaceWith("onStart { emitAll(other) }")
)
public fun <T> Flow<T>.startWith(other: Flow<T>): Flow<T> = noImpl()
/**
* Flow analogue of `concatWith` is [onCompletion].
* Use `onCompletion { emit(value) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'",
replaceWith = ReplaceWith("onCompletion { emit(value) }")
)
public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
/**
* Flow analogue of `concatWith` is [onCompletion].
* Use `onCompletion { if (it == null) emitAll(other) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { if (it == null) emitAll(other) }'",
replaceWith = ReplaceWith("onCompletion { if (it == null) emitAll(other) }")
)
public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("this.combine(other, transform)")
)
public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
combine(this, other, transform)
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, transform)")
)
public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
transform: suspend (T1, T2, T3) -> R
) = combine(this, other, other2, transform)
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
transform: suspend (T1, T2, T3, T4) -> R
) = combine(this, other, other2, other3, transform)
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
other4: Flow<T5>,
transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combine(this, other, other2, other3, other4, transform)
/**
* Delays the emission of values from this flow for the given [timeMillis].
* Use `onStart { delay(timeMillis) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR, // since 1.3.0, error in 1.5.0
message = "Use 'onStart { delay(timeMillis) }'",
replaceWith = ReplaceWith("onStart { delay(timeMillis) }")
)
public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> = onStart { delay(timeMillis) }
/**
* Delays each element emitted by the given flow for the given [timeMillis].
* Use `onEach { delay(timeMillis) }`.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR, // since 1.3.0, error in 1.5.0
message = "Use 'onEach { delay(timeMillis) }'",
replaceWith = ReplaceWith("onEach { delay(timeMillis) }")
)
public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = onEach { delay(timeMillis) }
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
replaceWith = ReplaceWith("this.flatMapLatest(transform)")
)
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flatMapLatest(transform)
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR, // Warning since 1.3.8, was experimental when deprecated, ERROR since 1.5.0
message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library",
replaceWith = ReplaceWith("runningReduce(operation)")
)
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = runningReduce(operation)
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'publish()' is 'shareIn'. \n" +
"publish().connect() is the default strategy (no extra call is needed), \n" +
"publish().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" +
"publish().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.",
replaceWith = ReplaceWith("this.shareIn(scope, 0)")
)
public fun <T> Flow<T>.publish(): Flow<T> = noImpl()
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'publish(bufferSize)' is 'buffer' followed by 'shareIn'. \n" +
"publish().connect() is the default strategy (no extra call is needed), \n" +
"publish().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" +
"publish().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.",
replaceWith = ReplaceWith("this.buffer(bufferSize).shareIn(scope, 0)")
)
public fun <T> Flow<T>.publish(bufferSize: Int): Flow<T> = noImpl()
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'replay()' is 'shareIn' with unlimited replay. \n" +
"replay().connect() is the default strategy (no extra call is needed), \n" +
"replay().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" +
"replay().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.",
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE)")
)
public fun <T> Flow<T>.replay(): Flow<T> = noImpl()
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'replay(bufferSize)' is 'shareIn' with the specified replay parameter. \n" +
"replay().connect() is the default strategy (no extra call is needed), \n" +
"replay().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" +
"replay().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.",
replaceWith = ReplaceWith("this.shareIn(scope, bufferSize)")
)
public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
/** @suppress */
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
)
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()