diff --git a/.release/concatenation.md b/.release/concatenation.md new file mode 100644 index 00000000..4ec3a26b --- /dev/null +++ b/.release/concatenation.md @@ -0,0 +1 @@ +* [ [#82](https://github.com/WaveBeans/wavebeans/issues/82) ] [Concatenation](/docs/user/api/operations/concatenation-operation.md) operation \ No newline at end of file diff --git a/docs/user/api/inputs/finite-converters.md b/docs/user/api/inputs/finite-converters.md index b97fcc12..06c29214 100644 --- a/docs/user/api/inputs/finite-converters.md +++ b/docs/user/api/inputs/finite-converters.md @@ -7,16 +7,18 @@ Finite converters All WaveBeans streams supposed to be infinite during processing, however certain inputs like [wav-files](wav-file.md) are finite by their nature and that conflict needs to be resolved. For that particular purpose there is an abstraction that allows you to convert such finite stream into infinite one based on defined strategy. For example, you may replace all samples with zeros which are out of range of the source stream -- basically replace with silence, so it won't affect any other stream while you're mixing the up together for instance. -It is required for some of the cases and implemented only for type `Sample` currently. Also, only one converter is supported out of the box which fills with zeros everything what is out of the range of the source stream -- `ZeroFilling`. To convert finite stream to infinite one, you can call method `sampleStream()` on the FiniteSampleStream instance: +The conversion is recommended but not required, as many operations are designed to match the lengths of the finite streams and assume Zero samples automatically or ask to provide one. + +At the moment, only one converter is supported out of the box which fills with provided value everything what is out of the range of the source stream, it is called `AfterFilling`. To convert finite stream to infinite one, you can call method `stream(converter)` on the `FiniteStream` instance providing the converted. For example to convert `FiniteStream` to infinite one `BeanStream` with filling of zero samples at the end: ```kotlin -someFiniteStream.sampleStream(ZeroFilling()) +someFiniteStream.stream(AfterFilling(ZeroSample)) ``` Also some of the API way call this method implicitly, for example, you can read wav-file and convert it to a infinite stream at once: ```kotlin -wave("file:///path/to/file.wav", ZeroFilling()) +wave("file:///path/to/file.wav", AfterFilling(ZeroSample)) ``` *Note: That functionality will be reworked in the near future to provide more transparent and flexible API.* \ No newline at end of file diff --git a/docs/user/api/inputs/list-as-input.md b/docs/user/api/inputs/list-as-input.md index bc22599a..a1318f4e 100644 --- a/docs/user/api/inputs/list-as-input.md +++ b/docs/user/api/inputs/list-as-input.md @@ -6,7 +6,7 @@ List as Input Whenever you want to generate the input based on some predefined value such as list of values, you may that approach and convert any `List` into a stream with corresponding type T. The main difference between List as Input and [input as function](function-as-input.md) is that all values are available at the time the stream starts, but not being evaluated. -You can convert any `List` to `BeanStream` by simply calling `input()` function of the list: +You can convert any `List` to `FiniteStream` by simply calling `input()` function of the list: ```kotlin listOf(1, 2, 3).input() @@ -16,4 +16,6 @@ listOf(1, 2, 3).input() (0..44100).map { sin(44100.0 / it) }.input() ``` +The important thing that the stream created is finite. If you want to convert it into infinite one, consider using [converter](finite-converters.md). + It's important to remember, that list is being evaluated before the stream itself and in case of distributed executions the result should be propagated across all executors, so being lean about using such inputs is essential. \ No newline at end of file diff --git a/docs/user/api/operations/concatenation-operation.md b/docs/user/api/operations/concatenation-operation.md new file mode 100644 index 00000000..b5534554 --- /dev/null +++ b/docs/user/api/operations/concatenation-operation.md @@ -0,0 +1,50 @@ +Concatenation operation +===== + + + + + +Whenever you need to concatenate stream one after another you may use this operator. The leading stream though should be finite and be the type of `FiniteStream`, another stream operand should be finite or infinite but the same type. Depending on the second argument type, the resulting stream will be either finite of type `FiniteStream`, or infinite of type `BeanStream`. + +This is what is happening schematically, the stream are being read one after another: + +```text +[1, 2, 3, 4] .. [10, 12, 13] -> [1, 2, 3, 4, 10, 12, 13] +``` + +The inline operator `..` concatenate two streams, you may use them one-by-one as many as you like to concatenate more streams altogether: + +``` +// finite streams +val a = listOf(1, 2).input() +val b = listOf(3, 4, 5).input() +val c = listOf(10, 20, 30).input() +// infinite stream +val d = listOf(100, 200).input().stream(AfterFilling(0)) + +(a..b).asSequence(44100.0f).toList() +// results in stream [1, 2, 3, 4, 5] + +(b..c).asSequence(44100.0f).toList() +// results in stream [3, 4, 5, 10, 20, 30] + +(a..b..c).asSequence(44100.0f).toList() +// results in stream [1, 2, 3, 4, 5, 10, 20, 30] + +(a..b..c..d).asSequence(44100.0f).take(15).toList() +// results in stream [1, 2, 3, 4, 5, 10, 20, 30, 100, 200, 0, 0, 0, 0, 0] + +(d..c).asSequence(44100.0f).take(15).toList() +// won't compile as the first operand `d` is infinite stream +``` + +If you want to do a finite stream from the infinite one, you may use [trim operation](trim-operation.md): + +```kotlin +val sine1 = 440.sine().trim(3) +val sine2 = listOf(0.1, 0.2, 0.3).map { sampleOf(it) }.input() + +(sine1..sine2).asSequence(1000.0f).toList() +// results in stream [1.0, -0.9297764858882515, 0.7289686274214119, 0.1, 0.2, 0.3] +``` diff --git a/exe/src/main/kotlin/io/wavebeans/execution/PodRegistry.kt b/exe/src/main/kotlin/io/wavebeans/execution/PodRegistry.kt index aceb3125..df21b713 100644 --- a/exe/src/main/kotlin/io/wavebeans/execution/PodRegistry.kt +++ b/exe/src/main/kotlin/io/wavebeans/execution/PodRegistry.kt @@ -21,10 +21,12 @@ object PodRegistry { private val splittingPodRegistry = mutableMapOf>() init { + registerPodProxy(typeOf>(), AnyFiniteStreamPodProxy::class.constructors.first()) registerPodProxy(typeOf>(), AnyStreamPodProxy::class.constructors.first()) registerMergingPodProxy(typeOf>(), AnyStreamMergingPodProxy::class.constructors.first()) + registerPod(typeOf>(), AnyFiniteStreamingPod::class.constructors.single { it.parameters.size == 2 }) registerPod(typeOf>(), AnyStreamingPod::class.constructors.single { it.parameters.size == 2 }) registerPod(typeOf>(), AnyStreamOutputPod::class.constructors.first()) diff --git a/exe/src/main/kotlin/io/wavebeans/execution/distributed/Facilitator.kt b/exe/src/main/kotlin/io/wavebeans/execution/distributed/Facilitator.kt index f750a8a3..daa1c6b9 100644 --- a/exe/src/main/kotlin/io/wavebeans/execution/distributed/Facilitator.kt +++ b/exe/src/main/kotlin/io/wavebeans/execution/distributed/Facilitator.kt @@ -70,7 +70,7 @@ class Facilitator( private val startupClasses = startUpClasses() private val jobStates = ConcurrentHashMap() private var startedFrom: List? = null - private lateinit var communicator: io.grpc.Server + private var communicator: io.grpc.Server? = null fun start(): Facilitator { if (startedFrom != null) @@ -88,7 +88,7 @@ class Facilitator( .addService(FacilitatorGrpcService.instance(this)) .build() .start() - log.info { "Communicator on port $communicatorPort started." } + log.info { "Communicator on port $it started." } } return this @@ -215,10 +215,12 @@ class Facilitator( } override fun close() { - communicatorPort?.let { - if (!communicator.shutdown().awaitTermination(onServerShutdownTimeoutMillis, TimeUnit.MILLISECONDS)) { - communicator.shutdownNow() - } + if ( + communicator + ?.shutdown() + ?.awaitTermination(onServerShutdownTimeoutMillis, TimeUnit.MILLISECONDS) == false + ) { + communicator?.shutdownNow() } } diff --git a/exe/src/main/kotlin/io/wavebeans/execution/pod/AnyStreamingPod.kt b/exe/src/main/kotlin/io/wavebeans/execution/pod/AnyStreamingPod.kt index 42f012a8..fcbc70ea 100644 --- a/exe/src/main/kotlin/io/wavebeans/execution/pod/AnyStreamingPod.kt +++ b/exe/src/main/kotlin/io/wavebeans/execution/pod/AnyStreamingPod.kt @@ -1,6 +1,7 @@ package io.wavebeans.execution.pod import io.wavebeans.lib.BeanStream +import io.wavebeans.lib.stream.FiniteStream class AnyStreamingPod( bean: BeanStream, @@ -8,4 +9,11 @@ class AnyStreamingPod( ) : StreamingPod>( bean = bean, podKey = podKey +) +class AnyFiniteStreamingPod( + bean: FiniteStream, + podKey: PodKey +) : StreamingPod>( + bean = bean, + podKey = podKey ) \ No newline at end of file diff --git a/exe/src/main/kotlin/io/wavebeans/execution/podproxy/AnyFiniteStreamPodProxy.kt b/exe/src/main/kotlin/io/wavebeans/execution/podproxy/AnyFiniteStreamPodProxy.kt new file mode 100644 index 00000000..e1d3487d --- /dev/null +++ b/exe/src/main/kotlin/io/wavebeans/execution/podproxy/AnyFiniteStreamPodProxy.kt @@ -0,0 +1,22 @@ +package io.wavebeans.execution.podproxy + +import io.wavebeans.execution.medium.value +import io.wavebeans.execution.pod.PodKey +import io.wavebeans.lib.stream.FiniteStream +import java.util.concurrent.TimeUnit + +class AnyFiniteStreamPodProxy( + podKey: PodKey, + forPartition: Int +) : FiniteStream, StreamingPodProxy( + pointedTo = podKey, + forPartition = forPartition +) { + override fun length(timeUnit: TimeUnit): Long { + val bush = podDiscovery.bushFor(pointedTo) + val caller = bushCallerRepository.create(bush, pointedTo) + return caller.call("length?timeUnit=${timeUnit}") + .get(5000, TimeUnit.MILLISECONDS) + .value() + } +} \ No newline at end of file diff --git a/exe/src/test/kotlin/io/wavebeans/execution/OverseerIntegrationSpec.kt b/exe/src/test/kotlin/io/wavebeans/execution/OverseerIntegrationSpec.kt index e6cdee47..dd7400e6 100644 --- a/exe/src/test/kotlin/io/wavebeans/execution/OverseerIntegrationSpec.kt +++ b/exe/src/test/kotlin/io/wavebeans/execution/OverseerIntegrationSpec.kt @@ -381,4 +381,49 @@ object OverseerIntegrationSpec : Spek({ it("should have the same output as local") { assertThat(fileContent).isEqualTo(fileContentLocal) } } + + describe("Concatenation") { + describe("finite..finite") { + val file = File.createTempFile("test", ".csv").also { it.deleteOnExit() } + + val stream1 = seqStream().trim(1000) + val stream2 = seqStream().trim(1000) + val concatenation = (stream1..stream2).toCsv("file://${file.absolutePath}") + + runInParallel(listOf(concatenation), partitions = 2) + + val fileContent = file.readLines() + + it("should have non-empty output") { assertThat(fileContent).size().isGreaterThan(1) } + + runLocally(listOf(concatenation)) + + val fileContentLocal = file.readLines() + log.info { + it("should have the same output as local") { assertThat(fileContent).isEqualTo(fileContentLocal) } + } + } + describe("finite..infinite") { + val file = File.createTempFile("test", ".csv").also { it.deleteOnExit() } + + val stream1 = seqStream().trim(1000) + val stream2 = seqStream() + val concatenation = (stream1..stream2) + .trim(5000) + .toCsv("file://${file.absolutePath}") + + runInParallel(listOf(concatenation), partitions = 2) + + val fileContent = file.readLines() + + it("should have non-empty output") { assertThat(fileContent).size().isGreaterThan(1) } + + runLocally(listOf(concatenation)) + + val fileContentLocal = file.readLines() + log.info { + it("should have the same output as local") { assertThat(fileContent).isEqualTo(fileContentLocal) } + } + } + } }) diff --git a/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt b/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt index 9c847487..e4d1717e 100644 --- a/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt +++ b/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt @@ -28,7 +28,7 @@ object RemoteTimeseriesTableDriverSpec : Spek({ val facilitator by memoized(SCOPE) { Facilitator( - communicatorPort = 5000, + communicatorPort = 50001, threadsNumber = 1, onServerShutdownTimeoutMillis = 100 ) @@ -44,7 +44,7 @@ object RemoteTimeseriesTableDriverSpec : Spek({ } - val remoteTableDriver by memoized(SCOPE) { RemoteTimeseriesTableDriver(tableName, "127.0.0.1:5000", Sample::class) } + val remoteTableDriver by memoized(SCOPE) { RemoteTimeseriesTableDriver(tableName, "127.0.0.1:50001", Sample::class) } it("should not return sample rate if not initialized") { assertThat(catch { remoteTableDriver.sampleRate }) diff --git a/lib/src/main/kotlin/io/wavebeans/lib/io/ListAsInput.kt b/lib/src/main/kotlin/io/wavebeans/lib/io/ListAsInput.kt index a9eaf7bc..71f45fbe 100644 --- a/lib/src/main/kotlin/io/wavebeans/lib/io/ListAsInput.kt +++ b/lib/src/main/kotlin/io/wavebeans/lib/io/ListAsInput.kt @@ -4,12 +4,14 @@ import io.wavebeans.lib.BeanParams import io.wavebeans.lib.BeanStream import io.wavebeans.lib.SourceBean import io.wavebeans.lib.WaveBeansClassLoader +import io.wavebeans.lib.stream.FiniteStream import kotlinx.serialization.* import kotlinx.serialization.builtins.ListSerializer import kotlinx.serialization.builtins.serializer +import java.util.concurrent.TimeUnit import kotlin.reflect.jvm.jvmName -fun List.input(): BeanStream { +fun List.input(): FiniteStream { require(this.isNotEmpty()) { "Input list should not be empty" } return ListAsInput(ListAsInputParams(this)) } @@ -72,8 +74,10 @@ object ListAsInputParamsSerializer : KSerializer { class ListAsInput( override val parameters: ListAsInputParams -) : BeanStream, SourceBean { +) : FiniteStream, SourceBean { @Suppress("UNCHECKED_CAST") override fun asSequence(sampleRate: Float): Sequence = parameters.list.asSequence().map { it as T } + + override fun length(timeUnit: TimeUnit): Long = 0 } \ No newline at end of file diff --git a/lib/src/main/kotlin/io/wavebeans/lib/stream/AfterFillingFiniteStream.kt b/lib/src/main/kotlin/io/wavebeans/lib/stream/AfterFillingFiniteStream.kt new file mode 100644 index 00000000..3ebe85f7 --- /dev/null +++ b/lib/src/main/kotlin/io/wavebeans/lib/stream/AfterFillingFiniteStream.kt @@ -0,0 +1,46 @@ +package io.wavebeans.lib.stream + +import io.wavebeans.lib.* +import kotlinx.serialization.Serializable + +class AfterFilling( + private val zeroFiller: T +) : FiniteToStream { + override fun convert(finiteStream: FiniteStream): BeanStream { + return AfterFillingFiniteStream(finiteStream, AfterFillingFiniteStreamParams(zeroFiller)) + } +} + +@Serializable +data class AfterFillingFiniteStreamParams( + val zeroFiller: T +) : BeanParams() + +private class AfterFillingFiniteStream( + val finiteStream: FiniteStream, + val params: AfterFillingFiniteStreamParams +) : BeanStream, SingleBean { + + override val parameters: BeanParams = params + + override val input: Bean = finiteStream + + override fun asSequence(sampleRate: Float): Sequence { + return object : Iterator { + + val iterator = finiteStream + .asSequence(sampleRate) + .iterator() + + override fun hasNext(): Boolean = true + + override fun next(): T { + return if (iterator.hasNext()) { // there is something left to read + iterator.next() + } else { + params.zeroFiller + } + } + }.asSequence() + } +} \ No newline at end of file diff --git a/lib/src/main/kotlin/io/wavebeans/lib/stream/ConcatenatedStream.kt b/lib/src/main/kotlin/io/wavebeans/lib/stream/ConcatenatedStream.kt new file mode 100644 index 00000000..3283c830 --- /dev/null +++ b/lib/src/main/kotlin/io/wavebeans/lib/stream/ConcatenatedStream.kt @@ -0,0 +1,42 @@ +package io.wavebeans.lib.stream + +import io.wavebeans.lib.* +import java.util.concurrent.TimeUnit + +operator fun FiniteStream.rangeTo(finiteStream: FiniteStream): FiniteStream = + ConcatenatedFiniteStream(this, finiteStream) + +operator fun FiniteStream.rangeTo(stream: BeanStream): BeanStream = + ConcatenatedStream(this, stream) + +class ConcatenatedFiniteStream( + private val stream1: FiniteStream, + private val stream2: FiniteStream, + override val parameters: NoParams = NoParams() +) : FiniteStream, MultiBean, SinglePartitionBean { + + override val inputs: List> + get() = listOf(stream1, stream2) + + override fun asSequence(sampleRate: Float): Sequence { + return stream1.asSequence(sampleRate) + stream2.asSequence(sampleRate) + } + + override fun length(timeUnit: TimeUnit): Long { + return stream1.length(timeUnit) + stream2.length(timeUnit) + } +} + +class ConcatenatedStream( + private val stream1: FiniteStream, + private val stream2: BeanStream, + override val parameters: NoParams = NoParams() +) : BeanStream, MultiBean, SinglePartitionBean { + + override val inputs: List> + get() = listOf(stream1, stream2) + + override fun asSequence(sampleRate: Float): Sequence { + return stream1.asSequence(sampleRate) + stream2.asSequence(sampleRate) + } +} \ No newline at end of file diff --git a/lib/src/main/kotlin/io/wavebeans/lib/stream/ZeroFillingFiniteSampleStream.kt b/lib/src/main/kotlin/io/wavebeans/lib/stream/ZeroFillingFiniteSampleStream.kt deleted file mode 100644 index 3a8b6141..00000000 --- a/lib/src/main/kotlin/io/wavebeans/lib/stream/ZeroFillingFiniteSampleStream.kt +++ /dev/null @@ -1,38 +0,0 @@ -package io.wavebeans.lib.stream - -import io.wavebeans.lib.* - -class ZeroFilling : FiniteToStream { - override fun convert(finiteStream: FiniteStream): BeanStream { - return ZeroFillingFiniteSampleStream(finiteStream) - } -} - -private class ZeroFillingFiniteSampleStream( - val finiteStream: FiniteStream, - val params: NoParams = NoParams() -) : BeanStream, SingleBean { - - override val parameters: BeanParams = params - - override val input: Bean = finiteStream - - override fun asSequence(sampleRate: Float): Sequence { - return object : Iterator { - - val iterator = finiteStream - .asSequence(sampleRate) - .iterator() - - override fun hasNext(): Boolean = true - - override fun next(): Sample { - return if (iterator.hasNext()) { // there is something left to read - iterator.next() - } else { - ZeroSample - } - } - }.asSequence() - } -} \ No newline at end of file diff --git a/lib/src/test/kotlin/io/wavebeans/lib/TestUtils.kt b/lib/src/test/kotlin/io/wavebeans/lib/TestUtils.kt index ba709869..0782e99d 100644 --- a/lib/src/test/kotlin/io/wavebeans/lib/TestUtils.kt +++ b/lib/src/test/kotlin/io/wavebeans/lib/TestUtils.kt @@ -17,7 +17,7 @@ import io.wavebeans.lib.math.ComplexNumber import io.wavebeans.lib.math.minus import io.wavebeans.lib.math.plus import io.wavebeans.lib.stream.FiniteInputStream -import io.wavebeans.lib.stream.ZeroFilling +import io.wavebeans.lib.stream.AfterFilling import io.wavebeans.lib.stream.stream import io.wavebeans.lib.stream.window.Window import org.spekframework.spek2.dsl.Skip @@ -79,7 +79,7 @@ fun Iterable.stream(sampleRate: Float, bitDepth: BitDepth = BitDepth.BIT_8) } }.flatten().toList().toByteArray().asInput(sampleRate, bitDepth), NoParams() - ).stream(ZeroFilling()) + ).stream(AfterFilling(ZeroSample)) } fun ByteArray.asInput(sampleRate: Float, bitDepth: BitDepth = BitDepth.BIT_8): FiniteInput = diff --git a/lib/src/test/kotlin/io/wavebeans/lib/stream/ConcatenatedStreamSpec.kt b/lib/src/test/kotlin/io/wavebeans/lib/stream/ConcatenatedStreamSpec.kt new file mode 100644 index 00000000..2f9f823a --- /dev/null +++ b/lib/src/test/kotlin/io/wavebeans/lib/stream/ConcatenatedStreamSpec.kt @@ -0,0 +1,112 @@ +package io.wavebeans.lib.stream + +import assertk.Assert +import assertk.all +import assertk.assertThat +import assertk.assertions.isCloseTo +import assertk.assertions.isEqualTo +import assertk.assertions.isInstanceOf +import assertk.assertions.prop +import io.wavebeans.lib.BeanStream +import io.wavebeans.lib.Sample +import io.wavebeans.lib.eachIndexed +import io.wavebeans.lib.seqStream +import org.spekframework.spek2.Spek +import org.spekframework.spek2.style.specification.describe +import java.util.concurrent.TimeUnit + +object ConcatenatedStreamSpec : Spek({ + + describe("Concatenate finite streams") { + + it("should concatenate two non empty streams") { + val s1 = seqStream().trim(100) + val s2 = seqStream().trim(100) + val r = s1..s2 + assertThat(r).all { + isInstanceOf(FiniteStream::class) + lengthInMs().isEqualTo(200L) + sequence(1000.0f).eachIndexed(200) { sample, idx -> + val sampleValue = (idx % 100) * 1e-10 + sample.isCloseTo(sampleValue, 1e-20) + } + } + } + it("should concatenate when first stream is empty") { + val s1 = seqStream().trim(0) + val s2 = seqStream().trim(100) + val r = s1..s2 + assertThat(r).all { + isInstanceOf(FiniteStream::class) + lengthInMs().isEqualTo(100L) + sequence(1000.0f).eachIndexed(100) { sample, idx -> + val sampleValue = (idx % 100) * 1e-10 + sample.isCloseTo(sampleValue, 1e-20) + } + } + } + it("should concatenate when second stream is empty") { + val s1 = seqStream().trim(100) + val s2 = seqStream().trim(0) + val r = s1..s2 + assertThat(r).all { + isInstanceOf(FiniteStream::class) + lengthInMs().isEqualTo(100L) + sequence(1000.0f).eachIndexed(100) { sample, idx -> + val sampleValue = (idx % 100) * 1e-10 + sample.isCloseTo(sampleValue, 1e-20) + } + } + } + it("should concatenate five non empty streams") { + val s1 = seqStream().trim(100) + val s2 = seqStream().trim(100) + val s3 = seqStream().trim(100) + val s4 = seqStream().trim(100) + val s5 = seqStream().trim(100) + val r = s1..s2..s3..s4..s5 + assertThat(r).all { + isInstanceOf(FiniteStream::class) + lengthInMs().isEqualTo(500L) + sequence(1000.0f).eachIndexed(500) { sample, idx -> + val sampleValue = (idx % 100) * 1e-10 + sample.isCloseTo(sampleValue, 1e-20) + } + } + } + } + + describe("Concatenate finite and infinite streams") { + + it("should concatenate two non empty streams") { + val s1 = seqStream().trim(100) + val s2 = seqStream() + val r = s1..s2 + assertThat(r).all { + isInstanceOf(BeanStream::class) + sequence(1000.0f, take = 200).eachIndexed(200) { sample, idx -> + val sampleValue = (idx % 100) * 1e-10 + sample.isCloseTo(sampleValue, 1e-20) + } + } + } + it("should concatenate empty finite stream with non-empty infinite one") { + val s1 = seqStream().trim(0) + val s2 = seqStream() + val r = s1..s2 + assertThat(r).all { + isInstanceOf(BeanStream::class) + sequence(1000.0f, take = 200).eachIndexed(200) { sample, idx -> + val sampleValue = (idx % 200) * 1e-10 + sample.isCloseTo(sampleValue, 1e-20) + } + } + } + } +}) + +private fun Assert>.lengthInMs() = + prop("length") { it.length(TimeUnit.MILLISECONDS) } + +private fun Assert>.sequence(sampleRate: Float, take: Int = Int.MAX_VALUE) = + prop("asSequence($sampleRate)") { it.asSequence(sampleRate).take(take).toList() } \ No newline at end of file diff --git a/lib/src/test/kotlin/io/wavebeans/lib/stream/ZeroFillingFiniteSampleStreamSpec.kt b/lib/src/test/kotlin/io/wavebeans/lib/stream/ZeroFillingFiniteSampleStreamSpec.kt index 22338590..a44f61d6 100644 --- a/lib/src/test/kotlin/io/wavebeans/lib/stream/ZeroFillingFiniteSampleStreamSpec.kt +++ b/lib/src/test/kotlin/io/wavebeans/lib/stream/ZeroFillingFiniteSampleStreamSpec.kt @@ -27,7 +27,7 @@ object ZeroFillingFiniteSampleStreamSpec : Spek({ describe("Finite stream having 10 elements") { val seq = 10.repeat { it } - val zeroFilling = stream(seq).stream(ZeroFilling()) + val zeroFilling = stream(seq).stream(AfterFilling(ZeroSample)) it("should return first 10 elements") { assertThat( @@ -50,7 +50,7 @@ object ZeroFillingFiniteSampleStreamSpec : Spek({ describe("Finite stream having more elements than default sample array size") { val elCount = (512 * 3.14).toInt() val seq = elCount.repeat { it } - val zeroFilling = stream(seq).stream(ZeroFilling()) + val zeroFilling = stream(seq).stream(AfterFilling(ZeroSample)) it("should return first $elCount elements") { assertThat( @@ -72,7 +72,7 @@ object ZeroFillingFiniteSampleStreamSpec : Spek({ describe("Finite stream having 10 elements taking first half") { val seq = 10.repeat { it } - val zeroFilling = stream(seq).stream(ZeroFilling()).rangeProjection(0, 500) + val zeroFilling = stream(seq).stream(AfterFilling(ZeroSample)).rangeProjection(0, 500) it("should return first 5 elements") { assertThat( @@ -95,7 +95,7 @@ object ZeroFillingFiniteSampleStreamSpec : Spek({ describe("Finite stream having 10 elements taking second half") { val seq = 10.repeat { it } - val zeroFilling = stream(seq).stream(ZeroFilling()).rangeProjection(500, 1000) + val zeroFilling = stream(seq).stream(AfterFilling(ZeroSample)).rangeProjection(500, 1000) it("should return second 5 elements") { assertThat(