Skip to content

Commit

Permalink
Created an example app (#212)
Browse files Browse the repository at this point in the history
* work in progress - example app

* scala steward and newlines

* WIP: debugging failed runs

* fixed .sbt settings and updated CI

* added to docs + PR comments

* PR fixes

* fixed misspell

* PR fixes
  • Loading branch information
LukaszKontowski committed Aug 1, 2022
1 parent 3cdb285 commit 87940d7
Show file tree
Hide file tree
Showing 26 changed files with 563 additions and 9 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/standard-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ jobs:
# Curiously, scalafmtCheckAll doesn't cover *.sbt files, hence scalafmtSbtCheck is needed as well
run: sbt scalafmtCheckAll scalafmtSbtCheck "scalafixAll --check"

- name: Check code style - examples/event-migration
run: sbt scalafmtCheckAll scalafmtSbtCheck "scalafixAll --check"
working-directory: examples/event-migration

- name: Check code style - examples/akka-cluster-app
run: sbt scalafmtCheckAll scalafmtSbtCheck "scalafixAll --check"
working-directory: examples/akka-cluster-app

test-212:
runs-on: ubuntu-latest

Expand Down Expand Up @@ -134,10 +142,14 @@ jobs:
with:
java-version: adopt@1.11

- name: Run examples
- name: Test event-migration-examples
run: sbt test
working-directory: examples/event-migration

- name: Compile akka-cluster-app
run: sbt compile
working-directory: examples/akka-cluster-app

publish-maven-artifacts:
needs: [code-style-check, test-212, test-213-1, test-213-2, test-sbt-plugin, run-examples]
if: github.event_name != 'pull_request'
Expand Down
2 changes: 1 addition & 1 deletion .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ updatePullRequests = "always"

commits.message = "Scala-Steward: Update ${artifactName} from ${currentVersion} to ${nextVersion}"

buildRoots = [ ".", "examples/event-migration/" ]
buildRoots = [ ".", "examples/event-migration/", "examples/example-app/" ]
19 changes: 18 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ and using it in another project. Make sure that `mavenLocal` is added to the res
sbt publishM2
```

You can find a simple example application that uses Akka Serialization Helper [here](examples/akka-cluster-app).
This app can be used for basic runtime testing as well. First, go to the app's directory:
```shell
cd examples/akka-cluster-app
```
And then invoke following commands in separate terminal
windows so that three processes run in parallel:
```shell
sbt "runMain org.virtuslab.example.App compute 25251"
sbt "runMain org.virtuslab.example.App compute 25252"
sbt "runMain org.virtuslab.example.App client 0"
```

### Testing

To run unit tests, type
Expand Down Expand Up @@ -63,7 +76,11 @@ Before committing, don't forget to type
```shell
sbt scalafmtAll scalafixAll scalafmtSbt
```
to format the code, .sbt files and check imports. You can use `pre-commit` hook, provided in `./pre-commit`, to do formating and checking automatically.
to format the code, .sbt files and check imports. Run this command in the following directories:
- the base directory ( `.` )
- `examples/akka-cluster-app`
- `examples/event-migration`
You can use `pre-commit` hook, provided in `./pre-commit`, to do formatting and checking automatically.

Additionally, all warnings locally are escalated to errors in CI, so make sure there are none.

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ Test / ashCompilerPluginEnable := false

For full list of sbt keys, check [`org.virtuslab.ash.AkkaSerializationHelperKeys`](https://github.com/VirtusLab/akka-serialization-helper/blob/main/sbt-akka-serialization-helper/src/main/scala/org/virtuslab/ash/AkkaSerializationHelperKeys.scala).

## Example application
Check out simple akka-cluster application which uses Akka Serialization Helper: [ASH code example](examples/akka-cluster-app).

## Step-by-step guide
See [full step-by-step guide](docs/GUIDE.md) on Akka Serialization Helper usage.

Expand Down
2 changes: 1 addition & 1 deletion docs/GUIDE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Step-by-step guide for Akka Serialization Helper usage
This document is a guide on how to use Akka Serialization Helper in your project. If you want to get a more general view of this toolbox, see [README](../README.md).
This document is a guide on how to use Akka Serialization Helper in your project. If you want to get a more general view of this toolbox, see [README](../README.md). Moreover, it could be a good idea to see the [example akka-cluster-app](../examples/akka-cluster-app) first as a code example of basic Akka Serialization Helper usage.

Akka Serialization Helper (ASH) has two major parts:
1. Circe Akka Serializer
Expand Down
13 changes: 13 additions & 0 deletions examples/akka-cluster-app/.scalafix.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
rules = [
OrganizeImports
]

OrganizeImports.expandRelative = true
OrganizeImports.removeUnused = true
OrganizeImports.groupedImports = Explode
OrganizeImports.groups = [
"java.",
"scala.",
"*",
"org.virtuslab.ash"
]
53 changes: 53 additions & 0 deletions examples/akka-cluster-app/.scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
version = 3.5.8

style = defaultWithAlign

runner.dialect = scala213
docstrings.style = Asterisk
indentOperator.preset = spray
maxColumn = 120
rewrite.rules = [RedundantParens, AvoidInfix]
align.tokens = [{code = "=>", owner = "Case"}]
align.openParenDefnSite = false
align.openParenCallSite = false
optIn.breakChainOnFirstMethodDot = false
optIn.configStyleArguments = false
danglingParentheses.defnSite = false
danglingParentheses.callSite = false
rewrite.neverInfix.excludeFilters = [
and
min
max
until
to
by
eq
ne
"should.*"
"contain.*"
"must.*"
in
ignore
be
taggedAs
thrownBy
synchronized
have
when
size
only
noneOf
oneElementOf
noElementsOf
atLeastOneElementOf
atMostOneElementOf
allElementsOf
inOrderElementsOf
theSameElementsAs
message
]
rewriteTokens = {
"⇒": "=>"
"→": "->"
"←": "<-"
}
36 changes: 36 additions & 0 deletions examples/akka-cluster-app/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
import org.virtuslab.ash.AkkaSerializationHelperPlugin

name := "akka-cluster-app"
version := "0.1"
scalaVersion := "2.13.8"

val circeVersion = "0.14.2"
val akkaVersion = "2.6.19"
val logbackVersion = "1.2.11"

lazy val `akka-cluster-app` = project
.in(file("."))
.enablePlugins(AkkaSerializationHelperPlugin)
.settings(multiJvmSettings: _*)
.settings(
libraryDependencies ++= akkaDependencies ++ ashDependencies ++ Seq(logbackDependency, circeDependency),
fork := true, // must be true due to https://discuss.lightbend.com/t/akka-projection-getting-started-guide-example-could-not-run-eventgeneratorapp/9434/2
Global / cancelable := false,
scalacOptions += "-Ywarn-unused")
.configs(MultiJvm)

lazy val akkaDependencies =
Seq("com.typesafe.akka" %% "akka-actor-typed", "com.typesafe.akka" %% "akka-cluster-typed").map(_ % akkaVersion)

lazy val circeDependency = "io.circe" %% "circe-core" % circeVersion

lazy val ashDependencies =
Seq(AkkaSerializationHelperPlugin.annotation, AkkaSerializationHelperPlugin.circeAkkaSerializer)

lazy val logbackDependency = "ch.qos.logback" % "logback-classic" % logbackVersion

ThisBuild / semanticdbEnabled := true
ThisBuild / semanticdbVersion := "4.5.9"
ThisBuild / scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.6.0"
1 change: 1 addition & 0 deletions examples/akka-cluster-app/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.7.1
4 changes: 4 additions & 0 deletions examples/akka-cluster-app/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
addSbtPlugin("org.virtuslab.ash" % "sbt-akka-serialization-helper" % "0.6.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.1")
27 changes: 27 additions & 0 deletions examples/akka-cluster-app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
akka {
actor {
provider = cluster

serializers {
circe-json = "org.virtuslab.example.ExampleSerializer"
}

serialization-bindings {
"org.virtuslab.example.CirceAkkaSerializable" = circe-json
}
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 0
}
}

cluster {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:25251",
"akka://ClusterSystem@127.0.0.1:25252"]

downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
5 changes: 5 additions & 0 deletions examples/akka-cluster-app/src/main/resources/example.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
include "application"

example-service {
workers-per-node = 4
}
20 changes: 20 additions & 0 deletions examples/akka-cluster-app/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- This is a development logging configuration that logs to standard out, for an example of a production
logging config, see the Akka docs: https://doc.akka.io/docs/akka/2.6/typed/logging.html#logback -->
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg%n</pattern>
</encoder>
</appender>

<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>8192</queueSize>
<neverBlock>true</neverBlock>
<appender-ref ref="STDOUT" />
</appender>

<root level="INFO">
<appender-ref ref="ASYNC"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.virtuslab.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import akka.cluster.typed.Cluster
import com.typesafe.config.ConfigFactory

/**
* An example akka cluster application that uses Akka Serialization Helper. In order to run the application locally, run
* the following commands in separate terminal windows (so that 3 separate processes run in parallel):
*
* - sbt "runMain org.virtuslab.example.App compute 25251"
* - sbt "runMain org.virtuslab.example.App compute 25252"
* - sbt "runMain org.virtuslab.example.App client 0"
*
* Note: this example-app's logic is based on akka-sample-custer-scala code from the official Akka repository. If you
* want to check this, see https://github.com/akka/akka-samples/tree/2.6/akka-sample-cluster-scala
*/
object App {

val StatsServiceKey = ServiceKey[StatsService.ProcessText]("StatsService")

private object RootBehavior {
def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
val cluster = Cluster(ctx.system)
if (cluster.selfMember.hasRole("compute")) {
// on every compute node there is one service instance that delegates to N local workers
val numberOfWorkers =
ctx.system.settings.config.getInt("example-service.workers-per-node")
val workers = ctx.spawn(
Routers
.pool(numberOfWorkers)(StatsWorker().narrow[StatsWorker.Process])
// the worker has a per word cache, so send the same word to the same local worker child
.withConsistentHashingRouting(1, _.word),
"WorkerRouter")

val service = ctx.spawn(StatsService(workers), "StatsService")

// published through the receptionist to the other nodes in the cluster
ctx.system.receptionist ! Receptionist.Register(StatsServiceKey, service)
}
if (cluster.selfMember.hasRole("client")) {
val serviceRouter =
ctx.spawn(Routers.group(App.StatsServiceKey), "ServiceRouter")
ctx.spawn(StatsClient(serviceRouter), "Client")
}
Behaviors.empty[Nothing]
}
}

def main(args: Array[String]): Unit = {
require(args.size == 2, "Usage: role port")
startup(args(0), args(1).toInt)
}

private def startup(role: String, port: Int): Unit = {
// Override the configuration of the port when specified as program argument
val config = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
""")
.withFallback(ConfigFactory.load("example"))

ActorSystem[Nothing](RootBehavior(), "ClusterSystem", config)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.virtuslab.example

import org.virtuslab.ash.annotation.SerializabilityTrait

@SerializabilityTrait
trait CirceAkkaSerializable extends Product with Serializable
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.virtuslab.example

import akka.actor.ExtendedActorSystem

import org.virtuslab.ash.annotation.Serializer
import org.virtuslab.ash.circe.CirceAkkaSerializer
import org.virtuslab.ash.circe.Register
import org.virtuslab.ash.circe.Registration

@Serializer(classOf[CirceAkkaSerializable], Register.REGISTRATION_REGEX)
class ExampleSerializer(actorSystem: ExtendedActorSystem)
extends CirceAkkaSerializer[CirceAkkaSerializable](actorSystem) {
override def identifier: Int = 2137

override lazy val codecs: Seq[Registration[_ <: CirceAkkaSerializable]] = Seq(
Register[StatsClient.Event],
Register[StatsService.Command],
Register[StatsService.Response],
Register[StatsAggregator.Event],
Register[StatsWorker.Command],
Register[StatsWorker.Processed])

override lazy val manifestMigrations: Seq[(String, Class[_])] = Nil

override lazy val packagePrefix: String = "org.virtuslab.example"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.virtuslab.example

import scala.concurrent.duration._

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec

object StatsClient {

sealed trait Event extends CirceAkkaSerializable
private case object Tick extends Event
private case class ServiceResponse(result: StatsService.Response) extends Event

implicit lazy val codecEvent: Codec[Event] = deriveCodec

def apply(service: ActorRef[StatsService.ProcessText]): Behavior[Event] =
Behaviors.setup { ctx =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)
val responseAdapter = ctx.messageAdapter(ServiceResponse)

Behaviors.receiveMessage {
case Tick =>
ctx.log.info("Sending process request")
service ! StatsService.ProcessText("this is the text that will be analyzed", responseAdapter)
Behaviors.same
case ServiceResponse(result) =>
ctx.log.info("Service result: {}", result)
Behaviors.same
}
}
}

}
Loading

0 comments on commit 87940d7

Please sign in to comment.