Skip to content

Commit

Permalink
Merge pull request #83 from WaveBeans/stream-concatenation
Browse files Browse the repository at this point in the history
#82 Concatenation operation
  • Loading branch information
asubb committed Aug 13, 2020
2 parents 78d25c6 + a7b1673 commit 41b592a
Show file tree
Hide file tree
Showing 17 changed files with 358 additions and 58 deletions.
1 change: 1 addition & 0 deletions .release/concatenation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* [ [#82](https://github.com/WaveBeans/wavebeans/issues/82) ] [Concatenation](/docs/user/api/operations/concatenation-operation.md) operation
8 changes: 5 additions & 3 deletions docs/user/api/inputs/finite-converters.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ Finite converters

All WaveBeans streams supposed to be infinite during processing, however certain inputs like [wav-files](wav-file.md) are finite by their nature and that conflict needs to be resolved. For that particular purpose there is an abstraction that allows you to convert such finite stream into infinite one based on defined strategy. For example, you may replace all samples with zeros which are out of range of the source stream -- basically replace with silence, so it won't affect any other stream while you're mixing the up together for instance.

It is required for some of the cases and implemented only for type `Sample` currently. Also, only one converter is supported out of the box which fills with zeros everything what is out of the range of the source stream -- `ZeroFilling`. To convert finite stream to infinite one, you can call method `sampleStream()` on the FiniteSampleStream instance:
The conversion is recommended but not required, as many operations are designed to match the lengths of the finite streams and assume Zero samples automatically or ask to provide one.

At the moment, only one converter is supported out of the box which fills with provided value everything what is out of the range of the source stream, it is called `AfterFilling`. To convert finite stream to infinite one, you can call method `stream(converter)` on the `FiniteStream<T>` instance providing the converted. For example to convert `FiniteStream<Sample>` to infinite one `BeanStream<Sample>` with filling of zero samples at the end:

```kotlin
someFiniteStream.sampleStream(ZeroFilling())
someFiniteStream.stream(AfterFilling(ZeroSample))
```

Also some of the API way call this method implicitly, for example, you can read wav-file and convert it to a infinite stream at once:

```kotlin
wave("file:///path/to/file.wav", ZeroFilling())
wave("file:///path/to/file.wav", AfterFilling(ZeroSample))
```

*Note: That functionality will be reworked in the near future to provide more transparent and flexible API.*
4 changes: 3 additions & 1 deletion docs/user/api/inputs/list-as-input.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ List as Input

Whenever you want to generate the input based on some predefined value such as list of values, you may that approach and convert any `List<T>` into a stream with corresponding type T. The main difference between List as Input and [input as function](function-as-input.md) is that all values are available at the time the stream starts, but not being evaluated.

You can convert any `List<T>` to `BeanStream<T>` by simply calling `input()` function of the list:
You can convert any `List<T>` to `FiniteStream<T>` by simply calling `input()` function of the list:

```kotlin
listOf(1, 2, 3).input()
Expand All @@ -16,4 +16,6 @@ listOf(1, 2, 3).input()
(0..44100).map { sin(44100.0 / it) }.input()
```

The important thing that the stream created is finite. If you want to convert it into infinite one, consider using [converter](finite-converters.md).

It's important to remember, that list is being evaluated before the stream itself and in case of distributed executions the result should be propagated across all executors, so being lean about using such inputs is essential.
50 changes: 50 additions & 0 deletions docs/user/api/operations/concatenation-operation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
Concatenation operation
=====

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
<!-- END doctoc generated TOC please keep comment here to allow auto update -->

Whenever you need to concatenate stream one after another you may use this operator. The leading stream though should be finite and be the type of `FiniteStream<T>`, another stream operand should be finite or infinite but the same type. Depending on the second argument type, the resulting stream will be either finite of type `FiniteStream<T>`, or infinite of type `BeanStream<T>`.

This is what is happening schematically, the stream are being read one after another:

```text
[1, 2, 3, 4] .. [10, 12, 13] -> [1, 2, 3, 4, 10, 12, 13]
```

The inline operator `..` concatenate two streams, you may use them one-by-one as many as you like to concatenate more streams altogether:

```
// finite streams
val a = listOf(1, 2).input()
val b = listOf(3, 4, 5).input()
val c = listOf(10, 20, 30).input()
// infinite stream
val d = listOf(100, 200).input().stream(AfterFilling(0))
(a..b).asSequence(44100.0f).toList()
// results in stream [1, 2, 3, 4, 5]
(b..c).asSequence(44100.0f).toList()
// results in stream [3, 4, 5, 10, 20, 30]
(a..b..c).asSequence(44100.0f).toList()
// results in stream [1, 2, 3, 4, 5, 10, 20, 30]
(a..b..c..d).asSequence(44100.0f).take(15).toList()
// results in stream [1, 2, 3, 4, 5, 10, 20, 30, 100, 200, 0, 0, 0, 0, 0]
(d..c).asSequence(44100.0f).take(15).toList()
// won't compile as the first operand `d` is infinite stream
```

If you want to do a finite stream from the infinite one, you may use [trim operation](trim-operation.md):

```kotlin
val sine1 = 440.sine().trim(3)
val sine2 = listOf(0.1, 0.2, 0.3).map { sampleOf(it) }.input()

(sine1..sine2).asSequence(1000.0f).toList()
// results in stream [1.0, -0.9297764858882515, 0.7289686274214119, 0.1, 0.2, 0.3]
```
2 changes: 2 additions & 0 deletions exe/src/main/kotlin/io/wavebeans/execution/PodRegistry.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ object PodRegistry {
private val splittingPodRegistry = mutableMapOf<KType, KFunction<Pod>>()

init {
registerPodProxy(typeOf<FiniteStream<*>>(), AnyFiniteStreamPodProxy::class.constructors.first())
registerPodProxy(typeOf<BeanStream<*>>(), AnyStreamPodProxy::class.constructors.first())

registerMergingPodProxy(typeOf<BeanStream<*>>(), AnyStreamMergingPodProxy::class.constructors.first())

registerPod(typeOf<FiniteStream<*>>(), AnyFiniteStreamingPod::class.constructors.single { it.parameters.size == 2 })
registerPod(typeOf<BeanStream<*>>(), AnyStreamingPod::class.constructors.single { it.parameters.size == 2 })
registerPod(typeOf<StreamOutput<*>>(), AnyStreamOutputPod::class.constructors.first())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Facilitator(
private val startupClasses = startUpClasses()
private val jobStates = ConcurrentHashMap<JobKey, JobState>()
private var startedFrom: List<StackTraceElement>? = null
private lateinit var communicator: io.grpc.Server
private var communicator: io.grpc.Server? = null

fun start(): Facilitator {
if (startedFrom != null)
Expand All @@ -88,7 +88,7 @@ class Facilitator(
.addService(FacilitatorGrpcService.instance(this))
.build()
.start()
log.info { "Communicator on port $communicatorPort started." }
log.info { "Communicator on port $it started." }
}

return this
Expand Down Expand Up @@ -215,10 +215,12 @@ class Facilitator(
}

override fun close() {
communicatorPort?.let {
if (!communicator.shutdown().awaitTermination(onServerShutdownTimeoutMillis, TimeUnit.MILLISECONDS)) {
communicator.shutdownNow()
}
if (
communicator
?.shutdown()
?.awaitTermination(onServerShutdownTimeoutMillis, TimeUnit.MILLISECONDS) == false
) {
communicator?.shutdownNow()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package io.wavebeans.execution.pod

import io.wavebeans.lib.BeanStream
import io.wavebeans.lib.stream.FiniteStream

class AnyStreamingPod(
bean: BeanStream<Any>,
podKey: PodKey
) : StreamingPod<Any, BeanStream<Any>>(
bean = bean,
podKey = podKey
)
class AnyFiniteStreamingPod(
bean: FiniteStream<Any>,
podKey: PodKey
) : StreamingPod<Any, FiniteStream<Any>>(
bean = bean,
podKey = podKey
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.wavebeans.execution.podproxy

import io.wavebeans.execution.medium.value
import io.wavebeans.execution.pod.PodKey
import io.wavebeans.lib.stream.FiniteStream
import java.util.concurrent.TimeUnit

class AnyFiniteStreamPodProxy(
podKey: PodKey,
forPartition: Int
) : FiniteStream<Any>, StreamingPodProxy(
pointedTo = podKey,
forPartition = forPartition
) {
override fun length(timeUnit: TimeUnit): Long {
val bush = podDiscovery.bushFor(pointedTo)
val caller = bushCallerRepository.create(bush, pointedTo)
return caller.call("length?timeUnit=${timeUnit}")
.get(5000, TimeUnit.MILLISECONDS)
.value()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,4 +381,49 @@ object OverseerIntegrationSpec : Spek({
it("should have the same output as local") { assertThat(fileContent).isEqualTo(fileContentLocal) }

}

describe("Concatenation") {
describe("finite..finite") {
val file = File.createTempFile("test", ".csv").also { it.deleteOnExit() }

val stream1 = seqStream().trim(1000)
val stream2 = seqStream().trim(1000)
val concatenation = (stream1..stream2).toCsv("file://${file.absolutePath}")

runInParallel(listOf(concatenation), partitions = 2)

val fileContent = file.readLines()

it("should have non-empty output") { assertThat(fileContent).size().isGreaterThan(1) }

runLocally(listOf(concatenation))

val fileContentLocal = file.readLines()
log.info {
it("should have the same output as local") { assertThat(fileContent).isEqualTo(fileContentLocal) }
}
}
describe("finite..infinite") {
val file = File.createTempFile("test", ".csv").also { it.deleteOnExit() }

val stream1 = seqStream().trim(1000)
val stream2 = seqStream()
val concatenation = (stream1..stream2)
.trim(5000)
.toCsv("file://${file.absolutePath}")

runInParallel(listOf(concatenation), partitions = 2)

val fileContent = file.readLines()

it("should have non-empty output") { assertThat(fileContent).size().isGreaterThan(1) }

runLocally(listOf(concatenation))

val fileContentLocal = file.readLines()
log.info {
it("should have the same output as local") { assertThat(fileContent).isEqualTo(fileContentLocal) }
}
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object RemoteTimeseriesTableDriverSpec : Spek({

val facilitator by memoized(SCOPE) {
Facilitator(
communicatorPort = 5000,
communicatorPort = 50001,
threadsNumber = 1,
onServerShutdownTimeoutMillis = 100
)
Expand All @@ -44,7 +44,7 @@ object RemoteTimeseriesTableDriverSpec : Spek({
}


val remoteTableDriver by memoized(SCOPE) { RemoteTimeseriesTableDriver<Sample>(tableName, "127.0.0.1:5000", Sample::class) }
val remoteTableDriver by memoized(SCOPE) { RemoteTimeseriesTableDriver<Sample>(tableName, "127.0.0.1:50001", Sample::class) }

it("should not return sample rate if not initialized") {
assertThat(catch { remoteTableDriver.sampleRate })
Expand Down
8 changes: 6 additions & 2 deletions lib/src/main/kotlin/io/wavebeans/lib/io/ListAsInput.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import io.wavebeans.lib.BeanParams
import io.wavebeans.lib.BeanStream
import io.wavebeans.lib.SourceBean
import io.wavebeans.lib.WaveBeansClassLoader
import io.wavebeans.lib.stream.FiniteStream
import kotlinx.serialization.*
import kotlinx.serialization.builtins.ListSerializer
import kotlinx.serialization.builtins.serializer
import java.util.concurrent.TimeUnit
import kotlin.reflect.jvm.jvmName

fun <T : Any> List<T>.input(): BeanStream<T> {
fun <T : Any> List<T>.input(): FiniteStream<T> {
require(this.isNotEmpty()) { "Input list should not be empty" }
return ListAsInput(ListAsInputParams(this))
}
Expand Down Expand Up @@ -72,8 +74,10 @@ object ListAsInputParamsSerializer : KSerializer<ListAsInputParams> {

class ListAsInput<T : Any>(
override val parameters: ListAsInputParams
) : BeanStream<T>, SourceBean<T> {
) : FiniteStream<T>, SourceBean<T> {

@Suppress("UNCHECKED_CAST")
override fun asSequence(sampleRate: Float): Sequence<T> = parameters.list.asSequence().map { it as T }

override fun length(timeUnit: TimeUnit): Long = 0
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.wavebeans.lib.stream

import io.wavebeans.lib.*
import kotlinx.serialization.Serializable

class AfterFilling<T : Any>(
private val zeroFiller: T
) : FiniteToStream<T> {
override fun convert(finiteStream: FiniteStream<T>): BeanStream<T> {
return AfterFillingFiniteStream(finiteStream, AfterFillingFiniteStreamParams(zeroFiller))
}
}

@Serializable
data class AfterFillingFiniteStreamParams<T>(
val zeroFiller: T
) : BeanParams()

private class AfterFillingFiniteStream<T : Any>(
val finiteStream: FiniteStream<T>,
val params: AfterFillingFiniteStreamParams<T>
) : BeanStream<T>, SingleBean<T> {

override val parameters: BeanParams = params

override val input: Bean<T> = finiteStream

override fun asSequence(sampleRate: Float): Sequence<T> {
return object : Iterator<T> {

val iterator = finiteStream
.asSequence(sampleRate)
.iterator()

override fun hasNext(): Boolean = true

override fun next(): T {
return if (iterator.hasNext()) { // there is something left to read
iterator.next()
} else {
params.zeroFiller
}
}
}.asSequence()
}
}
42 changes: 42 additions & 0 deletions lib/src/main/kotlin/io/wavebeans/lib/stream/ConcatenatedStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.wavebeans.lib.stream

import io.wavebeans.lib.*
import java.util.concurrent.TimeUnit

operator fun <T : Any> FiniteStream<T>.rangeTo(finiteStream: FiniteStream<T>): FiniteStream<T> =
ConcatenatedFiniteStream(this, finiteStream)

operator fun <T : Any> FiniteStream<T>.rangeTo(stream: BeanStream<T>): BeanStream<T> =
ConcatenatedStream(this, stream)

class ConcatenatedFiniteStream<T : Any>(
private val stream1: FiniteStream<T>,
private val stream2: FiniteStream<T>,
override val parameters: NoParams = NoParams()
) : FiniteStream<T>, MultiBean<T>, SinglePartitionBean {

override val inputs: List<Bean<T>>
get() = listOf(stream1, stream2)

override fun asSequence(sampleRate: Float): Sequence<T> {
return stream1.asSequence(sampleRate) + stream2.asSequence(sampleRate)
}

override fun length(timeUnit: TimeUnit): Long {
return stream1.length(timeUnit) + stream2.length(timeUnit)
}
}

class ConcatenatedStream<T : Any>(
private val stream1: FiniteStream<T>,
private val stream2: BeanStream<T>,
override val parameters: NoParams = NoParams()
) : BeanStream<T>, MultiBean<T>, SinglePartitionBean {

override val inputs: List<Bean<T>>
get() = listOf(stream1, stream2)

override fun asSequence(sampleRate: Float): Sequence<T> {
return stream1.asSequence(sampleRate) + stream2.asSequence(sampleRate)
}
}
Loading

0 comments on commit 41b592a

Please sign in to comment.