Skip to content

Commit

Permalink
[#62] [ HTTP API improvements. Audio and Table service no longer requ…
Browse files Browse the repository at this point in the history
…ire some parameters (sampleRate, sourceType) which can be inferred from the table itself. ]
  • Loading branch information
asubb committed Jun 6, 2020
1 parent 1759fde commit aa0af48
Show file tree
Hide file tree
Showing 17 changed files with 132 additions and 91 deletions.
1 change: 1 addition & 0 deletions .release/fix-audio-stream-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* [ [#62](https://github.com/WaveBeans/wavebeans/issues/62) ] HTTP API improvements. Audio and Table service no longer require some parameters (sampleRate, sourceType) which can be inferred from the table itself.
7 changes: 1 addition & 6 deletions docs/user/http/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,9 @@ Following methods available, mostly this methods are exposed from [table query f
* Getting last Interval: `/table/{tableName}/last?interval={interval}[&sampleRate={sampleRate}]`:
* `tableName` -- the name of the table to query, if the table can't be found the API call with return `404 Not Found` HTTP code.
* `interval` -- the interval you're requesting, the type of [TimeMeasure](#time-measure). If malformed you'll see `400 Bad Request` HTTP code.
* `sampleRate` -- is optional parameter which defines which sample rate to use to form the stream. By default 44100. In this case only affects the offset time values.
* Getting specific time range: `/table/{tableName}/timeRange?from={from}&to={to}[&sampleRate={sampleRate}]`:
* `tableName` -- the name of the table to query, if the table can't be found the API call with return `404 Not Found` HTTP code.
* `from`, `to` -- the from and to values of the interval you're requesting, the type of [TimeMeasure](#time-measure). If malformed you'll see `400 Bad Request` HTTP code.
* `sampleRate` -- is optional parameter which defines which sample rate to use to form the stream. By default 44100. In this case only affects the offset time values.

Both endpoints return stream as new-line separated JSON objects like this:

Expand Down Expand Up @@ -222,11 +220,8 @@ Additional useful parameters:SampleCountMeasurementSpec
* `bitDepth` -- either 8, 16, 24, 32 or 64. The number oif bits per sample to stream. FYI, wav-format support up to 32 bits per sample. By default it is 16 bit.
* `limit` -- interval to limit by, follow [Time Measure](#time-measure) rules. By default, it is unlimited as not specified.

Current limitations and considerations:
* You need to specify `sampleRate` explicitly if it is different from 44100Hz. It should be the same the data is being stored into the table, otherwise it won't be converted automatically and you'll get data reproduced on higher or lower rate as format needs to define that value.
* The table type doesn't contain currently the type of data it is keeping, so you need to define it explicitly. Parameter `sourceType` -- you can stream from `sample` or `sampleArray` table, by default it is `sample`.
Current considerations:
* The wav format requires to specify the length in the header. To make streaming possible the length value is populated with `Int.MAX_VALUE`. Depending on the number of bits and channels it may last for a few days nonstop. Then most players just stop playing sound, however the actual data is being transferred normally.
* Most of these limitations can be addressed in next releases.

## Distributed mode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,13 @@ class DistributedOverseer(
val bushEndpoints = plantBushes(jobKey, sampleRate)
bushEndpoints.forEach { FacilitatorCheckJob(it.location).start() }
registerBushEndpoint(bushEndpoints)
registerTables()
registerTables(sampleRate)
startJob(jobKey)

return locationFutures.values.toList()
}

private fun registerTables() {
private fun registerTables(sampleRate: Float) {
val facilitatorToTableNames = distribution.entries.map {
val tableNames = it.value.asSequence()
.map { podRef -> podRef.internalBeans }
Expand All @@ -156,7 +156,7 @@ class DistributedOverseer(
facilitatorToTableNames
.flatMap { it.second.map { v -> it.first to v } }
.forEach { (facilitatorLocation, tableName) ->
client.registerTable(tableName, facilitatorLocation)
client.registerTable(tableName, facilitatorLocation, sampleRate)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@ class RemoteTimeseriesTableDriver<T : Any>(
override val tableType: KClass<T>
) : TimeseriesTableDriver<T> {

override val sampleRate: Float
get() = sampleRateValue[0]
.let { if (it < 0) throw IllegalStateException("Sample rate value is not initialized yet") else it }

lateinit var client: TableApiClient

override fun init() {
private val sampleRateValue: FloatArray = FloatArray(1) { Float.NEGATIVE_INFINITY }

override fun init(sampleRate: Float) {
sampleRateValue[0] = sampleRate
client = TableApiClient(tableName, facilitatorLocation)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@ object RemoteTimeseriesTableDriverSpec : Spek({

val remoteTableDriver by memoized(SCOPE) { RemoteTimeseriesTableDriver<Sample>(tableName, "127.0.0.1:5000", Sample::class) }

it("should not return sample rate if not initialized") {
assertThat(catch { remoteTableDriver.sampleRate })
.isNotNull()
.isInstanceOf(IllegalStateException::class)
}

it("should init") {
assertThat(catch { remoteTableDriver.init() }).isNull()
remoteTableDriver.init(12345.0f)
assertThat(remoteTableDriver.sampleRate).isEqualTo(12345.0f)
}
it("should return null for first marker") {
whenever(tableDriver.firstMarker()).thenReturn(null)
Expand All @@ -67,12 +74,12 @@ object RemoteTimeseriesTableDriverSpec : Spek({
assertThat(remoteTableDriver.lastMarker()).isNotNull().isEqualTo(100.s)
}
it("should put sample value") {
tableDriver.init() // need to initialize memoized value
tableDriver.init(12345.0f) // need to initialize memoized value
remoteTableDriver.put(1.s, sampleOf(1.0))
verify(tableDriver, times(1)).put(eq(1.s), eq(sampleOf(1.0)))
}
it("should reset") {
tableDriver.init() // need to initialize memoized value
tableDriver.init(12345.0f) // need to initialize memoized value
remoteTableDriver.reset()
verify(tableDriver, times(1)).reset()
}
Expand Down
36 changes: 15 additions & 21 deletions http/src/main/kotlin/io/wavebeans/http/AudioService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ import io.wavebeans.lib.table.TimeseriesTableDriver
import mu.KotlinLogging
import java.io.BufferedOutputStream
import java.io.InputStream
import java.io.Serializable
import java.util.*
import java.util.concurrent.LinkedTransferQueue
import java.util.concurrent.TimeUnit
import kotlin.reflect.KClass

enum class AudioStreamOutputFormat(val id: String, val contentType: ContentType) {
WAV("wav", ContentType("audio", "wav"))
Expand All @@ -39,25 +37,21 @@ fun Application.audioService(tableRegistry: TableRegistry) {
get("/audio/{tableName}/stream/{format}") {
val tableName = call.parameters.required("tableName") { it }
val format = call.parameters.required("format") { AudioStreamOutputFormat.byId(it) }
val sampleRate = call.request.queryParameters.optional("sampleRate") { it.toFloat() } ?: 44100.0f
val bitDepth = call.request.queryParameters.optional("bitDepth") {
BitDepth.safelyOf(it.toInt()) ?: throw BadRequestException("Bit depth $it is not recognized")
} ?: BitDepth.BIT_16
val clazz = when (val sourceType = call.request.queryParameters.optional("sourceType") { it }) {
null -> Sample::class
"sample" -> Sample::class
"sampleArray" -> SampleArray::class
else -> throw BadRequestException("$sourceType is not supported")
}
val limit = call.request.queryParameters.optional("limit") {
TimeMeasure.parseOrNull(it) ?: throw BadRequestException("Limit $it can't be parsed")
}
val offset = call.request.queryParameters.optional("offset") {
TimeMeasure.parseOrNull(it) ?: throw BadRequestException("Offset $it can't be parsed")
}

if (!audioService.tableRegistry.exists(tableName)) throw NotFoundException("$tableName is not found")

call.respondOutputStream(format.contentType) {
BufferedOutputStream(this).use { buffer ->
audioService.stream(format, tableName, sampleRate, bitDepth, clazz, limit).use {
audioService.stream<Any>(format, tableName, bitDepth, limit, offset).use {
while (true) {
val b = it.read()
if (b < 0) break;
Expand All @@ -78,23 +72,21 @@ class AudioService(internal val tableRegistry: TableRegistry) {
fun <T : Any> stream(
format: AudioStreamOutputFormat,
tableName: String,
sampleRate: Float,
bitDepth: BitDepth,
elementClazz: KClass<T>,
limit: TimeMeasure?
limit: TimeMeasure?,
offset: TimeMeasure?
): InputStream {
val table = tableRegistry.byName<T>(tableName)
return when (format) {
AudioStreamOutputFormat.WAV -> streamAsWav(table, sampleRate, bitDepth, elementClazz, limit)
AudioStreamOutputFormat.WAV -> streamAsWav(table, bitDepth, limit, offset)
}
}

private fun <T : Any> streamAsWav(
table: TimeseriesTableDriver<T>,
sampleRate: Float,
bitDepth: BitDepth,
elementClazz: KClass<T>,
limit: TimeMeasure?
limit: TimeMeasure?,
offset: TimeMeasure?
): InputStream {

val nextBytes: Queue<Byte> = LinkedTransferQueue<Byte>()
Expand All @@ -104,26 +96,28 @@ class AudioService(internal val tableRegistry: TableRegistry) {
}
}

val sampleRate = table.sampleRate
val tableType = table.tableType
val writer: Writer =
@Suppress("UNCHECKED_CAST")
when (elementClazz) {
when (tableType) {
Sample::class -> WavWriter(
(table as TimeseriesTableDriver<Sample>).stream(0.s)
(table as TimeseriesTableDriver<Sample>).stream(offset ?: 0.s)
.let { if (limit != null) it.trim(limit.asNanoseconds(), TimeUnit.NANOSECONDS) else it },
bitDepth,
sampleRate,
1,
writerDelegate
)
SampleArray::class -> WavWriterFromSampleArray(
(table as TimeseriesTableDriver<SampleArray>).stream(0.s)
(table as TimeseriesTableDriver<SampleArray>).stream(offset ?: 0.s)
.let { if (limit != null) it.trim(limit.asNanoseconds(), TimeUnit.NANOSECONDS) else it },
bitDepth,
sampleRate,
1,
writerDelegate
)
else -> throw UnsupportedOperationException("$elementClazz is not supported for audio streaming")
else -> throw UnsupportedOperationException("Table type $tableType is not supported for audio streaming")
}

val header = WavHeader(bitDepth, sampleRate, 1, Int.MAX_VALUE).header()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ class HttpCommunicatorService(
HttpCommunicatorGrpcService(HttpCommunicatorService(tableRegistry))
}

fun registerTable(tableName: String, facilitatorLocation: String) {
fun registerTable(tableName: String, facilitatorLocation: String, sampleRate: Float) {
log.info { "Registering remote table `$tableName` pointed to Facilitator on $facilitatorLocation" }
val tableDriver = RemoteTimeseriesTableDriver<Any>(tableName, facilitatorLocation, Any::class)
tableDriver.init()
tableDriver.init(sampleRate)
tableRegistry.register(tableName, tableDriver)
}

Expand All @@ -34,7 +34,7 @@ class HttpCommunicatorGrpcService(val service: HttpCommunicatorService) : HttpCo

override fun registerTable(request: RegisterTableRequest, responseObserver: StreamObserver<RegisterTableResponse>) {
responseObserver.single("registerTable", request) {
service.registerTable(request.tableName, request.facilitatorLocation)
service.registerTable(request.tableName, request.facilitatorLocation, request.sampleRate)
RegisterTableResponse.newBuilder().build()
}
}
Expand Down
22 changes: 11 additions & 11 deletions http/src/main/kotlin/io/wavebeans/http/TableService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ fun Application.tableService(tableRegistry: TableRegistry) {
get("/table/{tableName}/last") {
val tableName: String = call.parameters.required("tableName") { it }
val interval: TimeMeasure = call.request.queryParameters.required("interval") { TimeMeasure.parseOrNull(it) }
val sampleRate: Float? = call.request.queryParameters.optional("sampleRate") { it.toFloatOrNull() }

if (tableService.exists(tableName)) {
val stream = tableService.last(tableName, interval, sampleRate ?: 44100.0f)
val stream = tableService.last(tableName, interval)
call.respondOutputStream {
streamOutput(stream)
}
Expand All @@ -53,10 +52,9 @@ fun Application.tableService(tableRegistry: TableRegistry) {
val tableName: String = call.parameters.required("tableName") { it }
val from: TimeMeasure = call.request.queryParameters.required("from") { TimeMeasure.parseOrNull(it) }
val to: TimeMeasure = call.request.queryParameters.required("to") { TimeMeasure.parseOrNull(it) }
val sampleRate: Float? = call.request.queryParameters.optional("sampleRate") { it.toFloatOrNull() }

if (tableService.exists(tableName)) {
val stream = tableService.timeRange(tableName, from, to, sampleRate ?: 44100.0f)
val stream = tableService.timeRange(tableName, from, to)
call.respondOutputStream {
streamOutput(stream)
}
Expand Down Expand Up @@ -87,23 +85,25 @@ class TableService(private val tableRegistry: TableRegistry) {

fun exists(tableName: String): Boolean = tableRegistry.exists(tableName)

fun last(tableName: String, interval: TimeMeasure, sampleRate: Float): InputStream =
fun last(tableName: String, interval: TimeMeasure): InputStream =
if (tableRegistry.exists(tableName)) {
val table = tableRegistry.byName<Any>(tableName)
JsonBeanStreamReader(
stream = tableRegistry.byName<Any>(tableName).last(interval),
sampleRate = sampleRate,
offset = tableRegistry.byName<Any>(tableName).lastMarker() ?: 0.s
stream = table.last(interval),
sampleRate = table.sampleRate,
offset = table.lastMarker() ?: 0.s
)
} else {
ByteArrayInputStream(ByteArray(0))
}


fun timeRange(tableName: String, from: TimeMeasure, to: TimeMeasure, sampleRate: Float): InputStream =
fun timeRange(tableName: String, from: TimeMeasure, to: TimeMeasure): InputStream =
if (tableRegistry.exists(tableName)) {
val table = tableRegistry.byName<Any>(tableName)
JsonBeanStreamReader(
stream = tableRegistry.byName<Any>(tableName).timeRange(from, to),
sampleRate = sampleRate,
stream = table.timeRange(from, to),
sampleRate = table.sampleRate,
offset = from
)
} else {
Expand Down
28 changes: 18 additions & 10 deletions http/src/test/kotlin/io/wavebeans/http/AudioServiceSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ object AudioServiceSpec : Spek({
describe("Streaming WAV") {
describe("Sample table") {
val tableRegistry by memoized(TEST) { mock<TableRegistry>() }
val tableDriver by memoized(TEST) { mock<TimeseriesTableDriver<Sample>>() }
val tableDriver by memoized(TEST) {
val driver = mock<TimeseriesTableDriver<Sample>>()
whenever(driver.tableType).thenReturn(Sample::class)
driver
}
val service by memoized(TEST) {
whenever(tableRegistry.exists(eq("table"))).thenReturn(true)
whenever(tableRegistry.byName<Sample>("table")).thenReturn(tableDriver)
Expand All @@ -51,7 +55,11 @@ object AudioServiceSpec : Spek({

describe("SampleArray table") {
val tableRegistry by memoized(TEST) { mock<TableRegistry>() }
val tableDriver by memoized(TEST) { mock<TimeseriesTableDriver<SampleArray>>() }
val tableDriver by memoized(TEST) {
val driver = mock<TimeseriesTableDriver<SampleArray>>()
whenever(driver.tableType).thenReturn(SampleArray::class)
driver
}
val service by memoized(TEST) {
whenever(tableRegistry.exists(eq("table"))).thenReturn(true)
whenever(tableRegistry.byName<SampleArray>("table")).thenReturn(tableDriver)
Expand Down Expand Up @@ -86,8 +94,8 @@ private fun input16Bit() = input { (i, _) -> sampleOf((i and 0xFFFF).toShort())

private fun input8Bit() = input { (i, _) -> sampleOf((i and 0xFF).toByte()) }

private inline fun <reified T : Any> assert8BitWavOutput(service: AudioService) {
assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_8, T::class, null)).all {
private fun <T : Any> assert8BitWavOutput(service: AudioService) {
assertThat(service.stream<T>(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_8, null, 0.s)).all {
take(44).all {
isNotEmpty() // header is there
range(0, 4).isEqualTo("RIFF".toByteArray())
Expand All @@ -101,8 +109,8 @@ private inline fun <reified T : Any> assert8BitWavOutput(service: AudioService)
}
}

private inline fun <reified T : Any> assert24BitWavOutput(service: AudioService) {
assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_24, T::class, null)).all {
private fun <T : Any> assert24BitWavOutput(service: AudioService) {
assertThat(service.stream<T>(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_24, null, 0.s)).all {
take(44).all {
isNotEmpty() // header is there
range(0, 4).isEqualTo("RIFF".toByteArray())
Expand All @@ -113,8 +121,8 @@ private inline fun <reified T : Any> assert24BitWavOutput(service: AudioService)
}
}

private inline fun <reified T : Any> assert32BitWavOutput(service: AudioService) {
assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_32, T::class, null)).all {
private fun <T : Any> assert32BitWavOutput(service: AudioService) {
assertThat(service.stream<T>(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_32, null, 0.s)).all {
take(44).all {
isNotEmpty() // header is there
range(0, 4).isEqualTo("RIFF".toByteArray())
Expand All @@ -125,8 +133,8 @@ private inline fun <reified T : Any> assert32BitWavOutput(service: AudioService)
}
}

private inline fun <reified T : Any> assert16BitWavOutput(service: AudioService) {
assertThat(service.stream(AudioStreamOutputFormat.WAV, "table", 44100.0f, BitDepth.BIT_16, T::class, null)).all {
private fun <T : Any> assert16BitWavOutput(service: AudioService) {
assertThat(service.stream<T>(AudioStreamOutputFormat.WAV, "table", BitDepth.BIT_16, null, 0.s)).all {
take(44).all {
isNotEmpty() // header is there
range(0, 4).isEqualTo("RIFF".toByteArray())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object HttpCommunicatorSpec : Spek({

it("should register remote table driver") {
val tableName = "myTable"
service.registerTable(tableName, "127.0.0.1:4000")
service.registerTable(tableName, "127.0.0.1:4000", 44100.0f)
assertThat(tableRegistry).all {
prop("exists($tableName)") { it.exists(tableName) }.isTrue()
prop("byName<Any>($tableName)") { it.byName<Any>(tableName) }
Expand Down
Loading

0 comments on commit aa0af48

Please sign in to comment.