Skip to content

Commit

Permalink
[Remote table driver implementation and leveraging it in HTTP service]
Browse files Browse the repository at this point in the history
[#59][switched internal communication to gRPC]
  • Loading branch information
asubb committed Jun 4, 2020
1 parent e80e78c commit 12dcc33
Show file tree
Hide file tree
Showing 73 changed files with 2,433 additions and 1,265 deletions.
18 changes: 1 addition & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,7 @@ Among everything else, you can find artifact of Cli tool under `cli/build/distri

### Using IDE for development

Intellij IDEA is recommended way to develop the framework, however you may find any other IDE like Eclipse with Kotlin plugin work smoothly with no issues.

**Intellij IDEA project set up**

1. Open `build.gradle` as a project inside IDE
2. Choose to use gradle wrapper.
3. Wait for IDE to fetch the project and index everything... and you're pretty much done.

**Running tests in Intellij IDEA**

Project uses [Spek 2](https://www.spekframework.org/) testing framework. You need to install [appropriate Spek plugin](https://plugins.jetbrains.com/plugin/10915-spek-framework/) first. However, at the time of writing you weren't been able to run all tests within one Run configuration, so any module needed to be configured separately via JUnit runner:

* Create JUnit runner, name it, let's say `exe tests`, `lib tests`, `cli tests`
* Test Kind: `All in package`
* Package: `io.wavebeans`
* User classpath of module choose one of: `wavebeans.exe.test`, `wavebeans.lib.test`, or `wavebeans.cli.test`
* Everything else may remain with default values.
Follow [this instructions](/docs/dev/setting-up-environment.md).

## Contribution

Expand Down
2 changes: 1 addition & 1 deletion cli/src/main/kotlin/io/wavebeans/cli/WaveBeansCli.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class WaveBeansCli(
val m = Option("m", "run-mode", true, "Running script in distributed mode, specify exact overseer. Default is: ${RunMode.LOCAL.id}. Supported: ${RunMode.values().joinToString(", ") { it.id }}. ")
val p = Option("p", "partitions", true, "Number of partitions to use in Distributed mode.")
val t = Option("t", "threads", true, "Number of threads to use in Distributed mode.")
val l = Option("l", "facilitators", true, "Comma-separated list of facilitator locations, i.e. http://10.0.0.1:40000,http://10.0.0.2:40000")
val l = Option("l", "facilitators", true, "Comma-separated list of facilitator locations, i.e. 10.0.0.1:40000,10.0.0.2:40000")
val s = Option("s", "sample-rate", true, "Sample rate in Hz to use for outputs. By default, it's 44100.")
val v = Option("v", "version", false, "Prints version of the tool.")
val debug = Option(null, "debug", false, "DEBUG level of logging in to file under `logs` directory. By default it is INFO")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class DistributedScriptEvaluator(
overseer = DistributedOverseer(
outputs,
facilitatorLocations,
emptyList(),
partitions,
additionalClasses = additionalClasses ?: emptyMap(),
ignoreLocations = listOf(
Expand Down
7 changes: 2 additions & 5 deletions cli/src/test/kotlin/io/wavebeans/cli/WaveBeansCliSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,8 @@ object WaveBeansCliSpec : Spek({
val portRange = 40000..40001
val gardeners = portRange.map {
Facilitator(
advertisingHostAddress = "127.0.0.1",
listeningPortRange = it..it,
startingUpAttemptsCount = 1,
communicatorPort = it,
threadsNumber = 2,
onServerShutdownGracePeriodMillis = 100,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
)
Expand Down Expand Up @@ -136,7 +133,7 @@ object WaveBeansCliSpec : Spek({
"--time",
"--run-mode", "distributed",
"--partitions", "2",
"--facilitators", portRange.map { "http://127.0.0.1:$it" }.joinToString(",")
"--facilitators", portRange.map { "127.0.0.1:$it" }.joinToString(",")
)),
printer = PrintWriter(out)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ object ScriptRunnerSpec : Spek({
val portRange = 40000..40001
val gardeners = portRange.map {
Facilitator(
advertisingHostAddress = "127.0.0.1",
listeningPortRange = it..it,
startingUpAttemptsCount = 1,
communicatorPort = it,
threadsNumber = 2,
onServerShutdownGracePeriodMillis = 100,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
)
Expand All @@ -32,7 +29,7 @@ object ScriptRunnerSpec : Spek({
}

arrayOf(
Pair(RunMode.DISTRIBUTED, mapOf("partitions" to 2, "facilitatorLocations" to portRange.map { "http://127.0.0.1:$it" })),
Pair(RunMode.DISTRIBUTED, mapOf("partitions" to 2, "facilitatorLocations" to portRange.map { "127.0.0.1:$it" })),
Pair(RunMode.MULTI_THREADED, mapOf<String, Any>("partitions" to 2, "threads" to 2)),
Pair(RunMode.LOCAL, emptyMap<String, Any>())
).forEach { (runMode, runOptions) ->
Expand Down
62 changes: 62 additions & 0 deletions docs/dev/setting-up-environment.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
Settting up environment
=======

The project is developed using Intelliji Idea. Community Edition should work fine. All further steps assume you have it already installed.

Preliminary steps
----

* Make sure you have Java 8 installed (the JDK version is as an example, Oracle JDK will do either):

```bash
> java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)
```

* Clone repository

```bash
git clone git@github.com:WaveBeans/wavebeans.git
```

* Make your first build and wait test to pass. No special requirements here.

```bash
./gradlew build
```

Install plugins
----

A few extra plugins you need to make sure you have installed.

* To run tests using IDE: [Spek Framework plugin](https://plugins.jetbrains.com/plugin/10915-spek-framework).
* For protobuf support: [Protobuf support plugin](https://plugins.jetbrains.com/plugin/8277-protobuf-support)

Import project
----

To import the project just open via `File > Open` in IDE the `build.gradle.kts` file in the root directory of the project, select `Open as project`. Select to use gradle wrapper.

Setting up build configurations
----

A few basic configurations are handy to create. Especially for running tests. As by the time of writing this there was no way found to run all tests at once, as well as using Spek plugin. So you need to cre JUnit runner and repeat steps for all projects (`lib`, `exe`, `cli`, `http`).

* Create configuration, then select `JUnit`.
* Name configuration, for example `LIB tests`.
* Select `Test kind: All in package`.
* Select `Search for tests: In single module`.
* Select `Use classpath or module: io.wavebeans.lib.test` (or corresponding to the project).
* Add additional VM option `-DSPEK_TIMEOUT=0` as some tests are taking more than default 10 seconds timeout and failing weirdly.

While working on Protobuf stuff
----

IDE doesn't currently build automatically proto-files, but all source folders are configured correctly. Every time you change them (or cleaned the project) regenerate them by running gradle command:

```bash
./gradlew clean generateProto
```
2 changes: 1 addition & 1 deletion docs/user/cli/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ For `multi-threaded` mode you need to pass additional parameters:

For `distributed` mode, firstly, you need to make sure facilitators are started and available. Follow [execution docs](../exe/readme.md#facilitators) for more details. The following parameters are required to start processing:
* the same as for multi-threaded, how many partitioned your processing topology will be tried to split up to: `-p` or `--partitions`.
* the list of facilitators: `-l` or `--facilitators`. The comma-separated Facilitator endpoints to run on, i.e. `http://10.0.0.1:4000,http://10.0.0.2:4000`. The execution will be spread over all facilitators automatically.
* the list of facilitators: `-l` or `--facilitators`. The comma-separated Facilitator endpoints to run on, i.e. `10.0.0.1:4000,10.0.0.2:4000`. The execution will be spread over all facilitators automatically.

**More information about execution**

Expand Down
4 changes: 2 additions & 2 deletions docs/user/exe/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ To start the overseer you need to instantiate the Distributed Overseer in your a
val overseer = DistributedOverseer(
outputs,
listOf(
"http://10.0.0.1:4000",
"http://10.0.0.2:4000"
"10.0.0.1:4000",
"10.0.0.2:4000"
),
10
)
Expand Down
1 change: 1 addition & 0 deletions exe/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
val ktorVersion: String by System.getProperties()

implementation(project(":lib"))
implementation(project(":proto"))

val kotlinxSerializationRuntimeVersion: String by System.getProperties()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class MultiThreadedOverseer(
override fun eval(sampleRate: Float): List<Future<ExecutionResult>> {
ExecutionConfig.threadsLimitForJvm = threadsCount
ExecutionConfig.initForMultiThreadedProcessing()
log.info { "Deploying topology: ${TopologySerializer.serialize(topology, TopologySerializer.jsonPretty)}" }
log.info { "Deploying topology: ${TopologySerializer.serialize(topology, jsonPretty())}" }
val pods = PodBuilder(topology).build()
log.info { "Pods: $pods" }
controllers += Gardener()
Expand Down
2 changes: 1 addition & 1 deletion exe/src/main/kotlin/io/wavebeans/execution/PodRef.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ data class PodRef(
*/
val podProxies: List<PodProxyRef>,
/**
* If more that one then assuming that the pod should split data among several partitions. If null then not applicable
* If more than one then assuming that the pod should split data among several partitions. If null then not applicable
*/
val splitToPartitions: Int? = null
) {
Expand Down
48 changes: 48 additions & 0 deletions exe/src/main/kotlin/io/wavebeans/execution/SerializationUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.wavebeans.execution

import io.wavebeans.lib.BeanParams
import io.wavebeans.lib.NoParams
import io.wavebeans.lib.io.*
import io.wavebeans.lib.stream.*
import io.wavebeans.lib.stream.fft.FftStreamParams
import io.wavebeans.lib.stream.window.WindowStreamParams
import io.wavebeans.lib.stream.window.WindowStreamParamsSerializer
import io.wavebeans.lib.table.*
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonConfiguration
import kotlinx.serialization.modules.EmptyModule
import kotlinx.serialization.modules.SerialModule
import kotlinx.serialization.modules.SerializersModuleBuilder

fun jsonCompact(paramsModule: SerialModule? = null) = Json(context = paramsModule ?: EmptyModule)
fun jsonPretty(paramsModule: SerialModule? = null) = Json(context = paramsModule
?: EmptyModule, configuration = JsonConfiguration.Stable.copy(prettyPrint = true))

fun SerializersModuleBuilder.tableQuery() {
polymorphic(TableQuery::class) {
TimeRangeTableQuery::class with TimeRangeTableQuery.serializer()
LastIntervalTableQuery::class with LastIntervalTableQuery.serializer()
ContinuousReadTableQuery::class with ContinuousReadTableQuery.serializer()
}
}

fun SerializersModuleBuilder.beanParams() {
polymorphic(BeanParams::class) {
ChangeAmplitudeSampleStreamParams::class with ChangeAmplitudeSampleStreamParams.serializer()
SineGeneratedInputParams::class with SineGeneratedInputParams.serializer()
NoParams::class with NoParams.serializer()
TrimmedFiniteSampleStreamParams::class with TrimmedFiniteSampleStreamParams.serializer()
CsvStreamOutputParams::class with CsvWindowStreamOutputParamsSerializer
BeanGroupParams::class with BeanGroupParams.serializer()
CsvFftStreamOutputParams::class with CsvFftStreamOutputParams.serializer()
FftStreamParams::class with FftStreamParams.serializer()
WindowStreamParams::class with WindowStreamParamsSerializer
ProjectionBeanStreamParams::class with ProjectionBeanStreamParams.serializer()
MapStreamParams::class with MapStreamParamsSerializer
InputParams::class with InputParamsSerializer
FunctionMergedStreamParams::class with FunctionMergedStreamParamsSerializer
ListAsInputParams::class with ListAsInputParamsSerializer
TableOutputParams::class with TableOutputParamsSerializer
TableDriverStreamParams::class with TableDriverStreamParams.serializer()
}
}
18 changes: 18 additions & 0 deletions exe/src/main/kotlin/io/wavebeans/execution/TableQuerySerializer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.wavebeans.execution

import io.wavebeans.lib.table.TableQuery
import kotlinx.serialization.PolymorphicSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.modules.SerializersModule

object TableQuerySerializer {
val paramsModule = SerializersModule {
tableQuery()
}

private val json = jsonCompact(paramsModule)

fun deserialize(query: String): TableQuery = json.parse(PolymorphicSerializer(TableQuery::class), query)

fun serialize(query: TableQuery, json: Json = this.json): String = json.stringify(PolymorphicSerializer(TableQuery::class), query)
}
42 changes: 5 additions & 37 deletions exe/src/main/kotlin/io/wavebeans/execution/TopologySerializer.kt
Original file line number Diff line number Diff line change
@@ -1,50 +1,18 @@
package io.wavebeans.execution

import io.wavebeans.execution.distributed.AnySerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonConfiguration
import kotlinx.serialization.modules.SerializersModule
import io.wavebeans.lib.BeanParams
import io.wavebeans.lib.NoParams
import io.wavebeans.lib.io.*
import io.wavebeans.lib.stream.*
import io.wavebeans.lib.stream.fft.FftStreamParams
import io.wavebeans.lib.stream.window.WindowStreamParams
import io.wavebeans.lib.stream.window.WindowStreamParamsSerializer
import io.wavebeans.lib.table.*

object TopologySerializer {

val paramsModule = SerializersModule {
polymorphic(BeanParams::class) {
ChangeAmplitudeSampleStreamParams::class with ChangeAmplitudeSampleStreamParams.serializer()
SineGeneratedInputParams::class with SineGeneratedInputParams.serializer()
NoParams::class with NoParams.serializer()
TrimmedFiniteSampleStreamParams::class with TrimmedFiniteSampleStreamParams.serializer()
CsvStreamOutputParams::class with CsvWindowStreamOutputParamsSerializer
BeanGroupParams::class with BeanGroupParams.serializer()
CsvFftStreamOutputParams::class with CsvFftStreamOutputParams.serializer()
FftStreamParams::class with FftStreamParams.serializer()
WindowStreamParams::class with WindowStreamParamsSerializer
ProjectionBeanStreamParams::class with ProjectionBeanStreamParams.serializer()
MapStreamParams::class with MapStreamParamsSerializer
InputParams::class with InputParamsSerializer
FunctionMergedStreamParams::class with FunctionMergedStreamParamsSerializer
ListAsInputParams::class with ListAsInputParamsSerializer
TableOutputParams::class with TableOutputParamsSerializer
TableDriverStreamParams::class with TableDriverStreamParams.serializer()
}
polymorphic(TableQuery::class) {
TimeRangeTableQuery::class with TimeRangeTableQuery.serializer()
LastIntervalTableQuery::class with LastIntervalTableQuery.serializer()
}
beanParams()
tableQuery()
}

val jsonCompact = Json(context = paramsModule)
private val json = jsonCompact(paramsModule)

val jsonPretty = Json(context = paramsModule, configuration = JsonConfiguration.Stable.copy(prettyPrint = true))
fun deserialize(topology: String): Topology = json.parse(Topology.serializer(), topology)

fun deserialize(topology: String): Topology = jsonCompact.parse(Topology.serializer(), topology)

fun serialize(topology: Topology, json: Json = jsonCompact): String = json.stringify(Topology.serializer(), topology)
fun serialize(topology: Topology, json: Json = this.json): String = json.stringify(Topology.serializer(), topology)
}
Loading

0 comments on commit 12dcc33

Please sign in to comment.