From aa0af48729366cc14900b9879dcedfe6bfed4b85 Mon Sep 17 00:00:00 2001 From: asubb Date: Fri, 5 Jun 2020 22:47:42 -0400 Subject: [PATCH] [#62] [ HTTP API improvements. Audio and Table service no longer require some parameters (sampleRate, sourceType) which can be inferred from the table itself. ] --- .release/fix-audio-stream-api.md | 1 + docs/user/http/readme.md | 7 +-- .../distributed/DistributedOverseer.kt | 6 +- .../RemoteTimeseriesTableDriver.kt | 9 ++- .../RemoteTimeseriesTableDriverSpec.kt | 13 +++- .../kotlin/io/wavebeans/http/AudioService.kt | 36 +++++------ .../wavebeans/http/HttpCommunicatorService.kt | 6 +- .../kotlin/io/wavebeans/http/TableService.kt | 22 +++---- .../io/wavebeans/http/AudioServiceSpec.kt | 28 ++++++--- .../io/wavebeans/http/HttpCommunicatorSpec.kt | 2 +- .../io/wavebeans/http/HttpServiceSpec.kt | 62 ++++++++++++------- .../io/wavebeans/http/TableServiceSpec.kt | 10 +-- .../table/InMemoryTimeseriesTableDriver.kt | 8 ++- .../io/wavebeans/lib/table/TableOutput.kt | 2 +- .../lib/table/TimeseriesTableDriver.kt | 7 ++- .../communicator/HttpCommunicatorClient.kt | 3 +- proto/src/main/proto/httpCommunicator.proto | 1 + 17 files changed, 132 insertions(+), 91 deletions(-) create mode 100644 .release/fix-audio-stream-api.md diff --git a/.release/fix-audio-stream-api.md b/.release/fix-audio-stream-api.md new file mode 100644 index 00000000..e569c6e4 --- /dev/null +++ b/.release/fix-audio-stream-api.md @@ -0,0 +1 @@ +* [ [#62](https://github.com/WaveBeans/wavebeans/issues/62) ] HTTP API improvements. Audio and Table service no longer require some parameters (sampleRate, sourceType) which can be inferred from the table itself. \ No newline at end of file diff --git a/docs/user/http/readme.md b/docs/user/http/readme.md index 40fc2771..28df2cf1 100644 --- a/docs/user/http/readme.md +++ b/docs/user/http/readme.md @@ -67,11 +67,9 @@ Following methods available, mostly this methods are exposed from [table query f * Getting last Interval: `/table/{tableName}/last?interval={interval}[&sampleRate={sampleRate}]`: * `tableName` -- the name of the table to query, if the table can't be found the API call with return `404 Not Found` HTTP code. * `interval` -- the interval you're requesting, the type of [TimeMeasure](#time-measure). If malformed you'll see `400 Bad Request` HTTP code. - * `sampleRate` -- is optional parameter which defines which sample rate to use to form the stream. By default 44100. In this case only affects the offset time values. * Getting specific time range: `/table/{tableName}/timeRange?from={from}&to={to}[&sampleRate={sampleRate}]`: * `tableName` -- the name of the table to query, if the table can't be found the API call with return `404 Not Found` HTTP code. * `from`, `to` -- the from and to values of the interval you're requesting, the type of [TimeMeasure](#time-measure). If malformed you'll see `400 Bad Request` HTTP code. - * `sampleRate` -- is optional parameter which defines which sample rate to use to form the stream. By default 44100. In this case only affects the offset time values. Both endpoints return stream as new-line separated JSON objects like this: @@ -222,11 +220,8 @@ Additional useful parameters:SampleCountMeasurementSpec * `bitDepth` -- either 8, 16, 24, 32 or 64. The number oif bits per sample to stream. FYI, wav-format support up to 32 bits per sample. By default it is 16 bit. * `limit` -- interval to limit by, follow [Time Measure](#time-measure) rules. By default, it is unlimited as not specified. -Current limitations and considerations: -* You need to specify `sampleRate` explicitly if it is different from 44100Hz. It should be the same the data is being stored into the table, otherwise it won't be converted automatically and you'll get data reproduced on higher or lower rate as format needs to define that value. -* The table type doesn't contain currently the type of data it is keeping, so you need to define it explicitly. Parameter `sourceType` -- you can stream from `sample` or `sampleArray` table, by default it is `sample`. +Current considerations: * The wav format requires to specify the length in the header. To make streaming possible the length value is populated with `Int.MAX_VALUE`. Depending on the number of bits and channels it may last for a few days nonstop. Then most players just stop playing sound, however the actual data is being transferred normally. -* Most of these limitations can be addressed in next releases. ## Distributed mode diff --git a/exe/src/main/kotlin/io/wavebeans/execution/distributed/DistributedOverseer.kt b/exe/src/main/kotlin/io/wavebeans/execution/distributed/DistributedOverseer.kt index 14f47a01..674220ee 100644 --- a/exe/src/main/kotlin/io/wavebeans/execution/distributed/DistributedOverseer.kt +++ b/exe/src/main/kotlin/io/wavebeans/execution/distributed/DistributedOverseer.kt @@ -135,13 +135,13 @@ class DistributedOverseer( val bushEndpoints = plantBushes(jobKey, sampleRate) bushEndpoints.forEach { FacilitatorCheckJob(it.location).start() } registerBushEndpoint(bushEndpoints) - registerTables() + registerTables(sampleRate) startJob(jobKey) return locationFutures.values.toList() } - private fun registerTables() { + private fun registerTables(sampleRate: Float) { val facilitatorToTableNames = distribution.entries.map { val tableNames = it.value.asSequence() .map { podRef -> podRef.internalBeans } @@ -156,7 +156,7 @@ class DistributedOverseer( facilitatorToTableNames .flatMap { it.second.map { v -> it.first to v } } .forEach { (facilitatorLocation, tableName) -> - client.registerTable(tableName, facilitatorLocation) + client.registerTable(tableName, facilitatorLocation, sampleRate) } } } diff --git a/exe/src/main/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriver.kt b/exe/src/main/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriver.kt index bc2c1698..966dd890 100644 --- a/exe/src/main/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriver.kt +++ b/exe/src/main/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriver.kt @@ -18,9 +18,16 @@ class RemoteTimeseriesTableDriver( override val tableType: KClass ) : TimeseriesTableDriver { + override val sampleRate: Float + get() = sampleRateValue[0] + .let { if (it < 0) throw IllegalStateException("Sample rate value is not initialized yet") else it } + lateinit var client: TableApiClient - override fun init() { + private val sampleRateValue: FloatArray = FloatArray(1) { Float.NEGATIVE_INFINITY } + + override fun init(sampleRate: Float) { + sampleRateValue[0] = sampleRate client = TableApiClient(tableName, facilitatorLocation) } diff --git a/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt b/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt index c4332e2f..bfbb5a60 100644 --- a/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt +++ b/exe/src/test/kotlin/io/wavebeans/execution/distributed/RemoteTimeseriesTableDriverSpec.kt @@ -47,8 +47,15 @@ object RemoteTimeseriesTableDriverSpec : Spek({ val remoteTableDriver by memoized(SCOPE) { RemoteTimeseriesTableDriver(tableName, "127.0.0.1:5000", Sample::class) } + it("should not return sample rate if not initialized") { + assertThat(catch { remoteTableDriver.sampleRate }) + .isNotNull() + .isInstanceOf(IllegalStateException::class) + } + it("should init") { - assertThat(catch { remoteTableDriver.init() }).isNull() + remoteTableDriver.init(12345.0f) + assertThat(remoteTableDriver.sampleRate).isEqualTo(12345.0f) } it("should return null for first marker") { whenever(tableDriver.firstMarker()).thenReturn(null) @@ -67,12 +74,12 @@ object RemoteTimeseriesTableDriverSpec : Spek({ assertThat(remoteTableDriver.lastMarker()).isNotNull().isEqualTo(100.s) } it("should put sample value") { - tableDriver.init() // need to initialize memoized value + tableDriver.init(12345.0f) // need to initialize memoized value remoteTableDriver.put(1.s, sampleOf(1.0)) verify(tableDriver, times(1)).put(eq(1.s), eq(sampleOf(1.0))) } it("should reset") { - tableDriver.init() // need to initialize memoized value + tableDriver.init(12345.0f) // need to initialize memoized value remoteTableDriver.reset() verify(tableDriver, times(1)).reset() } diff --git a/http/src/main/kotlin/io/wavebeans/http/AudioService.kt b/http/src/main/kotlin/io/wavebeans/http/AudioService.kt index 571a76a0..d4e780cb 100644 --- a/http/src/main/kotlin/io/wavebeans/http/AudioService.kt +++ b/http/src/main/kotlin/io/wavebeans/http/AudioService.kt @@ -16,11 +16,9 @@ import io.wavebeans.lib.table.TimeseriesTableDriver import mu.KotlinLogging import java.io.BufferedOutputStream import java.io.InputStream -import java.io.Serializable import java.util.* import java.util.concurrent.LinkedTransferQueue import java.util.concurrent.TimeUnit -import kotlin.reflect.KClass enum class AudioStreamOutputFormat(val id: String, val contentType: ContentType) { WAV("wav", ContentType("audio", "wav")) @@ -39,25 +37,21 @@ fun Application.audioService(tableRegistry: TableRegistry) { get("/audio/{tableName}/stream/{format}") { val tableName = call.parameters.required("tableName") { it } val format = call.parameters.required("format") { AudioStreamOutputFormat.byId(it) } - val sampleRate = call.request.queryParameters.optional("sampleRate") { it.toFloat() } ?: 44100.0f val bitDepth = call.request.queryParameters.optional("bitDepth") { BitDepth.safelyOf(it.toInt()) ?: throw BadRequestException("Bit depth $it is not recognized") } ?: BitDepth.BIT_16 - val clazz = when (val sourceType = call.request.queryParameters.optional("sourceType") { it }) { - null -> Sample::class - "sample" -> Sample::class - "sampleArray" -> SampleArray::class - else -> throw BadRequestException("$sourceType is not supported") - } val limit = call.request.queryParameters.optional("limit") { TimeMeasure.parseOrNull(it) ?: throw BadRequestException("Limit $it can't be parsed") } + val offset = call.request.queryParameters.optional("offset") { + TimeMeasure.parseOrNull(it) ?: throw BadRequestException("Offset $it can't be parsed") + } if (!audioService.tableRegistry.exists(tableName)) throw NotFoundException("$tableName is not found") call.respondOutputStream(format.contentType) { BufferedOutputStream(this).use { buffer -> - audioService.stream(format, tableName, sampleRate, bitDepth, clazz, limit).use { + audioService.stream(format, tableName, bitDepth, limit, offset).use { while (true) { val b = it.read() if (b < 0) break; @@ -78,23 +72,21 @@ class AudioService(internal val tableRegistry: TableRegistry) { fun stream( format: AudioStreamOutputFormat, tableName: String, - sampleRate: Float, bitDepth: BitDepth, - elementClazz: KClass, - limit: TimeMeasure? + limit: TimeMeasure?, + offset: TimeMeasure? ): InputStream { val table = tableRegistry.byName(tableName) return when (format) { - AudioStreamOutputFormat.WAV -> streamAsWav(table, sampleRate, bitDepth, elementClazz, limit) + AudioStreamOutputFormat.WAV -> streamAsWav(table, bitDepth, limit, offset) } } private fun streamAsWav( table: TimeseriesTableDriver, - sampleRate: Float, bitDepth: BitDepth, - elementClazz: KClass, - limit: TimeMeasure? + limit: TimeMeasure?, + offset: TimeMeasure? ): InputStream { val nextBytes: Queue = LinkedTransferQueue() @@ -104,11 +96,13 @@ class AudioService(internal val tableRegistry: TableRegistry) { } } + val sampleRate = table.sampleRate + val tableType = table.tableType val writer: Writer = @Suppress("UNCHECKED_CAST") - when (elementClazz) { + when (tableType) { Sample::class -> WavWriter( - (table as TimeseriesTableDriver).stream(0.s) + (table as TimeseriesTableDriver).stream(offset ?: 0.s) .let { if (limit != null) it.trim(limit.asNanoseconds(), TimeUnit.NANOSECONDS) else it }, bitDepth, sampleRate, @@ -116,14 +110,14 @@ class AudioService(internal val tableRegistry: TableRegistry) { writerDelegate ) SampleArray::class -> WavWriterFromSampleArray( - (table as TimeseriesTableDriver).stream(0.s) + (table as TimeseriesTableDriver).stream(offset ?: 0.s) .let { if (limit != null) it.trim(limit.asNanoseconds(), TimeUnit.NANOSECONDS) else it }, bitDepth, sampleRate, 1, writerDelegate ) - else -> throw UnsupportedOperationException("$elementClazz is not supported for audio streaming") + else -> throw UnsupportedOperationException("Table type $tableType is not supported for audio streaming") } val header = WavHeader(bitDepth, sampleRate, 1, Int.MAX_VALUE).header() diff --git a/http/src/main/kotlin/io/wavebeans/http/HttpCommunicatorService.kt b/http/src/main/kotlin/io/wavebeans/http/HttpCommunicatorService.kt index 153c2843..c62ee75e 100644 --- a/http/src/main/kotlin/io/wavebeans/http/HttpCommunicatorService.kt +++ b/http/src/main/kotlin/io/wavebeans/http/HttpCommunicatorService.kt @@ -16,10 +16,10 @@ class HttpCommunicatorService( HttpCommunicatorGrpcService(HttpCommunicatorService(tableRegistry)) } - fun registerTable(tableName: String, facilitatorLocation: String) { + fun registerTable(tableName: String, facilitatorLocation: String, sampleRate: Float) { log.info { "Registering remote table `$tableName` pointed to Facilitator on $facilitatorLocation" } val tableDriver = RemoteTimeseriesTableDriver(tableName, facilitatorLocation, Any::class) - tableDriver.init() + tableDriver.init(sampleRate) tableRegistry.register(tableName, tableDriver) } @@ -34,7 +34,7 @@ class HttpCommunicatorGrpcService(val service: HttpCommunicatorService) : HttpCo override fun registerTable(request: RegisterTableRequest, responseObserver: StreamObserver) { responseObserver.single("registerTable", request) { - service.registerTable(request.tableName, request.facilitatorLocation) + service.registerTable(request.tableName, request.facilitatorLocation, request.sampleRate) RegisterTableResponse.newBuilder().build() } } diff --git a/http/src/main/kotlin/io/wavebeans/http/TableService.kt b/http/src/main/kotlin/io/wavebeans/http/TableService.kt index c94cd243..c2dfa41b 100644 --- a/http/src/main/kotlin/io/wavebeans/http/TableService.kt +++ b/http/src/main/kotlin/io/wavebeans/http/TableService.kt @@ -38,10 +38,9 @@ fun Application.tableService(tableRegistry: TableRegistry) { get("/table/{tableName}/last") { val tableName: String = call.parameters.required("tableName") { it } val interval: TimeMeasure = call.request.queryParameters.required("interval") { TimeMeasure.parseOrNull(it) } - val sampleRate: Float? = call.request.queryParameters.optional("sampleRate") { it.toFloatOrNull() } if (tableService.exists(tableName)) { - val stream = tableService.last(tableName, interval, sampleRate ?: 44100.0f) + val stream = tableService.last(tableName, interval) call.respondOutputStream { streamOutput(stream) } @@ -53,10 +52,9 @@ fun Application.tableService(tableRegistry: TableRegistry) { val tableName: String = call.parameters.required("tableName") { it } val from: TimeMeasure = call.request.queryParameters.required("from") { TimeMeasure.parseOrNull(it) } val to: TimeMeasure = call.request.queryParameters.required("to") { TimeMeasure.parseOrNull(it) } - val sampleRate: Float? = call.request.queryParameters.optional("sampleRate") { it.toFloatOrNull() } if (tableService.exists(tableName)) { - val stream = tableService.timeRange(tableName, from, to, sampleRate ?: 44100.0f) + val stream = tableService.timeRange(tableName, from, to) call.respondOutputStream { streamOutput(stream) } @@ -87,23 +85,25 @@ class TableService(private val tableRegistry: TableRegistry) { fun exists(tableName: String): Boolean = tableRegistry.exists(tableName) - fun last(tableName: String, interval: TimeMeasure, sampleRate: Float): InputStream = + fun last(tableName: String, interval: TimeMeasure): InputStream = if (tableRegistry.exists(tableName)) { + val table = tableRegistry.byName(tableName) JsonBeanStreamReader( - stream = tableRegistry.byName(tableName).last(interval), - sampleRate = sampleRate, - offset = tableRegistry.byName(tableName).lastMarker() ?: 0.s + stream = table.last(interval), + sampleRate = table.sampleRate, + offset = table.lastMarker() ?: 0.s ) } else { ByteArrayInputStream(ByteArray(0)) } - fun timeRange(tableName: String, from: TimeMeasure, to: TimeMeasure, sampleRate: Float): InputStream = + fun timeRange(tableName: String, from: TimeMeasure, to: TimeMeasure): InputStream = if (tableRegistry.exists(tableName)) { + val table = tableRegistry.byName(tableName) JsonBeanStreamReader( - stream = tableRegistry.byName(tableName).timeRange(from, to), - sampleRate = sampleRate, + stream = table.timeRange(from, to), + sampleRate = table.sampleRate, offset = from ) } else { diff --git a/http/src/test/kotlin/io/wavebeans/http/AudioServiceSpec.kt b/http/src/test/kotlin/io/wavebeans/http/AudioServiceSpec.kt index 883ea901..0f5e2c74 100644 --- a/http/src/test/kotlin/io/wavebeans/http/AudioServiceSpec.kt +++ b/http/src/test/kotlin/io/wavebeans/http/AudioServiceSpec.kt @@ -24,7 +24,11 @@ object AudioServiceSpec : Spek({ describe("Streaming WAV") { describe("Sample table") { val tableRegistry by memoized(TEST) { mock() } - val tableDriver by memoized(TEST) { mock>() } + val tableDriver by memoized(TEST) { + val driver = mock>() + whenever(driver.tableType).thenReturn(Sample::class) + driver + } val service by memoized(TEST) { whenever(tableRegistry.exists(eq("table"))).thenReturn(true) whenever(tableRegistry.byName("table")).thenReturn(tableDriver) @@ -51,7 +55,11 @@ object AudioServiceSpec : Spek({ describe("SampleArray table") { val tableRegistry by memoized(TEST) { mock() } - val tableDriver by memoized(TEST) { mock>() } + val tableDriver by memoized(TEST) { + val driver = mock>() + whenever(driver.tableType).thenReturn(SampleArray::class) + driver + } val service by memoized(TEST) { whenever(tableRegistry.exists(eq("table"))).thenReturn(true) whenever(tableRegistry.byName("table")).thenReturn(tableDriver) @@ -86,8 +94,8 @@ private fun input16Bit() = input { (i, _) -> sampleOf((i and 0xFFFF).toShort()) private fun input8Bit() = input { (i, _) -> sampleOf((i and 0xFF).toByte()) } -private inline fun assert8BitWavOutput(service: AudioService) { - assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_8, T::class, null)).all { +private fun assert8BitWavOutput(service: AudioService) { + assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_8, null, 0.s)).all { take(44).all { isNotEmpty() // header is there range(0, 4).isEqualTo("RIFF".toByteArray()) @@ -101,8 +109,8 @@ private inline fun assert8BitWavOutput(service: AudioService) } } -private inline fun assert24BitWavOutput(service: AudioService) { - assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_24, T::class, null)).all { +private fun assert24BitWavOutput(service: AudioService) { + assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_24, null, 0.s)).all { take(44).all { isNotEmpty() // header is there range(0, 4).isEqualTo("RIFF".toByteArray()) @@ -113,8 +121,8 @@ private inline fun assert24BitWavOutput(service: AudioService) } } -private inline fun assert32BitWavOutput(service: AudioService) { - assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_32, T::class, null)).all { +private fun assert32BitWavOutput(service: AudioService) { + assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_32, null, 0.s)).all { take(44).all { isNotEmpty() // header is there range(0, 4).isEqualTo("RIFF".toByteArray()) @@ -125,8 +133,8 @@ private inline fun assert32BitWavOutput(service: AudioService) } } -private inline fun assert16BitWavOutput(service: AudioService) { - assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_16, T::class, null)).all { +private fun assert16BitWavOutput(service: AudioService) { + assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_16, null, 0.s)).all { take(44).all { isNotEmpty() // header is there range(0, 4).isEqualTo("RIFF".toByteArray()) diff --git a/http/src/test/kotlin/io/wavebeans/http/HttpCommunicatorSpec.kt b/http/src/test/kotlin/io/wavebeans/http/HttpCommunicatorSpec.kt index 54057669..83ac8062 100644 --- a/http/src/test/kotlin/io/wavebeans/http/HttpCommunicatorSpec.kt +++ b/http/src/test/kotlin/io/wavebeans/http/HttpCommunicatorSpec.kt @@ -17,7 +17,7 @@ object HttpCommunicatorSpec : Spek({ it("should register remote table driver") { val tableName = "myTable" - service.registerTable(tableName, "127.0.0.1:4000") + service.registerTable(tableName, "127.0.0.1:4000", 44100.0f) assertThat(tableRegistry).all { prop("exists($tableName)") { it.exists(tableName) }.isTrue() prop("byName($tableName)") { it.byName(tableName) } diff --git a/http/src/test/kotlin/io/wavebeans/http/HttpServiceSpec.kt b/http/src/test/kotlin/io/wavebeans/http/HttpServiceSpec.kt index 7fc25582..d0939e8a 100644 --- a/http/src/test/kotlin/io/wavebeans/http/HttpServiceSpec.kt +++ b/http/src/test/kotlin/io/wavebeans/http/HttpServiceSpec.kt @@ -26,6 +26,7 @@ import io.wavebeans.lib.table.toTable import kotlinx.serialization.* import kotlinx.serialization.builtins.serializer import org.spekframework.spek2.Spek +import org.spekframework.spek2.lifecycle.CachingMode.* import org.spekframework.spek2.style.specification.describe import java.util.concurrent.TimeUnit @@ -45,11 +46,13 @@ object HttpServiceSpec : Spek({ val elementRegex = elementRegex("-?\\d+\\.\\d+([eE]?-\\d+)?") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } afterGroup { overseer.close() } it("should return last 100ms") { - handleRequest(Get, "/table/table1/last?interval=100ms&sampleRate=44100.0").apply { + handleRequest(Get, "/table/table1/last?interval=100ms").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.OK) assertThat(response.content).isNotNull().all { isNotEmpty() @@ -95,7 +98,10 @@ object HttpServiceSpec : Spek({ val elementRegex = elementRegex("\\{\"v\":-?\\d+}") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } + afterGroup { overseer.close() } it("should return last 100ms") { @@ -119,6 +125,7 @@ object HttpServiceSpec : Spek({ } } + describe("External serializer") { data class B(val v: String) @@ -145,7 +152,9 @@ object HttpServiceSpec : Spek({ val elementRegex = elementRegex("\\{\"v\":\"-?[0-9a-fA-F]+\"}") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } afterGroup { overseer.close() } it("should return last 100ms") { @@ -174,7 +183,9 @@ object HttpServiceSpec : Spek({ "\\}") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } afterGroup { overseer.close() } it("should return last 100ms") { @@ -197,7 +208,9 @@ object HttpServiceSpec : Spek({ "\\}") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } afterGroup { overseer.close() } it("should return last 100ms") { @@ -215,7 +228,9 @@ object HttpServiceSpec : Spek({ val elementRegex = elementRegex("\\[[-eE\\d\\.\\,]+\\]") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } afterGroup { overseer.close() } it("should return last 100ms") { @@ -233,7 +248,9 @@ object HttpServiceSpec : Spek({ val elementRegex = elementRegex("\\[[-\\d\\,]+\\]") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } afterGroup { overseer.close() } it("should return last 100ms") { @@ -261,7 +278,9 @@ object HttpServiceSpec : Spek({ "\\}") val overseer = SingleThreadedOverseer(listOf(o)) - overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + beforeGroup { + overseer.eval(44100.0f).all { it.get(10000, TimeUnit.MILLISECONDS).finished } + } afterGroup { overseer.close() } it("should return last 100ms") { @@ -284,11 +303,11 @@ object HttpServiceSpec : Spek({ engine.application.audioService(TableRegistry.default) describe("Streaming Wav") { - val o = 440.sine().toSampleTable("mySampleTable", 10.s) - val o2 = 440.sine().toSampleTable("mySampleArrayTable", 10.s, 441) + val o = 440.sine().trim(2000).toSampleTable("mySampleTable", 1.s) + val o2 = 440.sine().trim(2000).toSampleTable("mySampleArrayTable", 1.s, 441) val overseer = MultiThreadedOverseer(listOf(o, o2), 2, 1) - beforeGroup { Thread { overseer.eval(44100.0f).all { it.get().finished } }.start() } + beforeGroup { overseer.eval(44100.0f).all { it.get().finished } } afterGroup { overseer.close() } @@ -298,7 +317,7 @@ object HttpServiceSpec : Spek({ } } it("should return stream for existing table in 8 bit format") { - engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&bitDepth=8").apply { + engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&offset=1s&bitDepth=8").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.OK) assertThat(response.headers) .prop("Content-Type") { it["Content-Type"] } @@ -312,7 +331,7 @@ object HttpServiceSpec : Spek({ } } it("should return stream for existing table in 16 bit format") { - engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&bitDepth=16").apply { + engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&offset=1s&bitDepth=16").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.OK) assertThat(response.headers) .prop("Content-Type") { it["Content-Type"] } @@ -326,7 +345,7 @@ object HttpServiceSpec : Spek({ } } it("should return stream for existing table in 24 bit format") { - engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&bitDepth=24").apply { + engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&offset=1s&bitDepth=24").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.OK) assertThat(response.headers) .prop("Content-Type") { it["Content-Type"] } @@ -340,7 +359,7 @@ object HttpServiceSpec : Spek({ } } it("should return stream for existing table in 32 bit format") { - engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&bitDepth=32").apply { + engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&offset=1s&bitDepth=32").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.OK) assertThat(response.headers) .prop("Content-Type") { it["Content-Type"] } @@ -354,12 +373,12 @@ object HttpServiceSpec : Spek({ } } it("should return 400 for unknown bit depth") { - engine.handleRequest(Get, "/audio/nonExistingTable/stream/wav?limit=100ms&bitDepth=42").apply { + engine.handleRequest(Get, "/audio/nonExistingTable/stream/wav?limit=100ms&offset=1s&bitDepth=42").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.BadRequest) } } it("should return stream for existing sample array table") { - engine.handleRequest(Get, "/audio/mySampleArrayTable/stream/wav?limit=100ms&sourceType=sampleArray").apply { + engine.handleRequest(Get, "/audio/mySampleArrayTable/stream/wav?limit=100ms&offset=1s").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.OK) assertThat(response.headers) .prop("Content-Type") { it["Content-Type"] } @@ -372,13 +391,8 @@ object HttpServiceSpec : Spek({ } } } - it("should return 400 for unknown source type") { - engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=100ms&sourceType=unknown").apply { - assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.BadRequest) - } - } it("should return stream limited with 200ms") { - engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=200ms").apply { + engine.handleRequest(Get, "/audio/mySampleTable/stream/wav?limit=200ms&offset=1s").apply { assertThat(response.status()).isNotNull().isEqualTo(HttpStatusCode.OK) assertThat(response.headers) .prop("Content-Type") { it["Content-Type"] } diff --git a/http/src/test/kotlin/io/wavebeans/http/TableServiceSpec.kt b/http/src/test/kotlin/io/wavebeans/http/TableServiceSpec.kt index 047c83ff..0bffac86 100644 --- a/http/src/test/kotlin/io/wavebeans/http/TableServiceSpec.kt +++ b/http/src/test/kotlin/io/wavebeans/http/TableServiceSpec.kt @@ -37,16 +37,17 @@ object TableServiceSpec : Spek({ val tableDriver = mock>() whenever(tableRegistry.exists(eq("table"))).thenReturn(true) whenever(tableRegistry.byName("table")).thenReturn(tableDriver) + whenever(tableDriver.sampleRate).thenReturn(100.0f) whenever(tableDriver.last(100.ms)).thenReturn(input { (i, sampleRate) -> if (i < sampleRate * 0.1) i.toInt() else null }) val service = TableService(tableRegistry) it("should return stored values if table exists") { - assertThat(service.last("table", 100.ms, 100.0f).bufferedReader().use { it.readLines() }) + assertThat(service.last("table", 100.ms).bufferedReader().use { it.readLines() }) .isEqualTo((0..9).map { "{\"offset\":${it * 1_000_000_000L / 100L},\"value\":$it}" }) } it("should not return any values if table doesn't exist") { - assertThat(service.last("non-existing-table", 100.ms, 100.0f).bufferedReader().use { it.readLines() }) + assertThat(service.last("non-existing-table", 100.ms).bufferedReader().use { it.readLines() }) .isEmpty() } } @@ -56,16 +57,17 @@ object TableServiceSpec : Spek({ val tableDriver = mock>() whenever(tableRegistry.exists(eq("table"))).thenReturn(true) whenever(tableRegistry.byName("table")).thenReturn(tableDriver) + whenever(tableDriver.sampleRate).thenReturn(100.0f) whenever(tableDriver.timeRange(0.ms, 100.ms)).thenReturn(input { (i, sampleRate) -> if (i < sampleRate * 0.1) i.toInt() else null }) val service = TableService(tableRegistry) it("should return stored values if table exists") { - assertThat(service.timeRange("table", 0.ms, 100.ms, 100.0f).bufferedReader().use { it.readLines() }) + assertThat(service.timeRange("table", 0.ms, 100.ms).bufferedReader().use { it.readLines() }) .isEqualTo((0..9).map { "{\"offset\":${it * 1_000_000_000L / 100L},\"value\":$it}" }) } it("should not return any values if table doesn't exist") { - assertThat(service.timeRange("non-existing-table", 0.ms, 100.ms, 100.0f).bufferedReader().use { it.readLines() }) + assertThat(service.timeRange("non-existing-table", 0.ms, 100.ms).bufferedReader().use { it.readLines() }) .isEmpty() } } diff --git a/lib/src/main/kotlin/io/wavebeans/lib/table/InMemoryTimeseriesTableDriver.kt b/lib/src/main/kotlin/io/wavebeans/lib/table/InMemoryTimeseriesTableDriver.kt index 512fe1ef..1c6406cc 100644 --- a/lib/src/main/kotlin/io/wavebeans/lib/table/InMemoryTimeseriesTableDriver.kt +++ b/lib/src/main/kotlin/io/wavebeans/lib/table/InMemoryTimeseriesTableDriver.kt @@ -4,6 +4,7 @@ import io.wavebeans.lib.TimeMeasure import io.wavebeans.lib.s import mu.KotlinLogging import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicReference import kotlin.reflect.KClass internal data class Item(val timeMarker: TimeMeasure, val value: T) @@ -42,12 +43,17 @@ class InMemoryTimeseriesTableDriver( } } + override val sampleRate: Float + get() = sampleRateValue[0] + .let { if (it < 0) throw IllegalStateException("Sample rate value is not initialized yet") else it } internal val table = ConcurrentLinkedDeque>() private var cleanUpTask: ScheduledFuture<*>? = null + private val sampleRateValue: FloatArray = FloatArray(1) { Float.MIN_VALUE } - override fun init() { + override fun init(sampleRate: Float) { + sampleRateValue[0] = sampleRate log.debug { "[$this] Initializing driver" } if (cleanUpTask == null) { log.debug { "[$this] Setting cleanup task" } diff --git a/lib/src/main/kotlin/io/wavebeans/lib/table/TableOutput.kt b/lib/src/main/kotlin/io/wavebeans/lib/table/TableOutput.kt index 5427f11f..0fcc3549 100644 --- a/lib/src/main/kotlin/io/wavebeans/lib/table/TableOutput.kt +++ b/lib/src/main/kotlin/io/wavebeans/lib/table/TableOutput.kt @@ -138,7 +138,7 @@ class TableOutput( } override fun writer(sampleRate: Float): Writer { - tableDriver.init() + tableDriver.init(sampleRate) val iterator = input.asSequence(sampleRate).iterator() var index = tableDriver.lastMarker()?.let { timeToSampleIndexCeil(it, sampleRate) } ?: 0L diff --git a/lib/src/main/kotlin/io/wavebeans/lib/table/TimeseriesTableDriver.kt b/lib/src/main/kotlin/io/wavebeans/lib/table/TimeseriesTableDriver.kt index 47170a3d..bc3a2aac 100644 --- a/lib/src/main/kotlin/io/wavebeans/lib/table/TimeseriesTableDriver.kt +++ b/lib/src/main/kotlin/io/wavebeans/lib/table/TimeseriesTableDriver.kt @@ -15,6 +15,11 @@ interface TimeseriesTableDriver : Closeable { */ val tableName: String + /** + * Gets the sample rate. Provided when the table is initialized with [init] call by executor. + */ + val sampleRate: Float + /** * The type of elements the table keeps */ @@ -23,7 +28,7 @@ interface TimeseriesTableDriver : Closeable { /** * Initializes the driver. */ - fun init() + fun init(sampleRate: Float) /** * Resets the driver state. diff --git a/proto/src/main/kotlin/io/wavebeans/communicator/HttpCommunicatorClient.kt b/proto/src/main/kotlin/io/wavebeans/communicator/HttpCommunicatorClient.kt index b506c9dd..f45a9813 100644 --- a/proto/src/main/kotlin/io/wavebeans/communicator/HttpCommunicatorClient.kt +++ b/proto/src/main/kotlin/io/wavebeans/communicator/HttpCommunicatorClient.kt @@ -14,9 +14,10 @@ class HttpCommunicatorClient( client = HttpCommunicatorGrpc.newFutureStub(channel) } - fun registerTable(tableName: String, facilitatorLocation: String, timeoutMs: Long = 5000) { + fun registerTable(tableName: String, facilitatorLocation: String, sampleRate: Float, timeoutMs: Long = 5000) { client.registerTable( RegisterTableRequest.newBuilder() + .setSampleRate(sampleRate) .setTableName(tableName) .setFacilitatorLocation(facilitatorLocation) .build() diff --git a/proto/src/main/proto/httpCommunicator.proto b/proto/src/main/proto/httpCommunicator.proto index 9c9fa1fd..d9af5742 100644 --- a/proto/src/main/proto/httpCommunicator.proto +++ b/proto/src/main/proto/httpCommunicator.proto @@ -12,6 +12,7 @@ service HttpCommunicator { message RegisterTableRequest { string tableName = 1; string facilitatorLocation = 2; + float sampleRate = 3; } message RegisterTableResponse {}