Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created an example app #212

Merged
merged 11 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/standard-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,14 @@ jobs:
with:
java-version: adopt@1.11

- name: Run examples
- name: Run event-migration-examples
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
run: sbt test
working-directory: examples/event-migration

- name: Compile example-app
run: sbt compile
working-directory: examples/example-app

publish-maven-artifacts:
needs: [code-style-check, test-212, test-213-1, test-213-2, test-sbt-plugin, run-examples]
if: github.event_name != 'pull_request'
Expand Down
2 changes: 1 addition & 1 deletion .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ updatePullRequests = "always"

commits.message = "Scala-Steward: Update ${artifactName} from ${currentVersion} to ${nextVersion}"

buildRoots = [ ".", "examples/event-migration/" ]
buildRoots = [ ".", "examples/event-migration/", "examples/example-app/" ]
15 changes: 14 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ and using it in another project. Make sure that `mavenLocal` is added to the res
sbt publishM2
```

You can find a simple example application that uses Akka Serialization Helper [here](examples/example-app).
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
This app can be used for basic runtime testing as well. Just invoke following commands in separate terminal
windows so that three processes run in parallel:
```shell
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
sbt "runMain org.virtuslab.example.App compute 25251"
sbt "runMain org.virtuslab.example.App compute 25252"
sbt "runMain org.virtuslab.example.App client 0"
```

### Testing

To run unit tests, type
Expand Down Expand Up @@ -63,7 +72,11 @@ Before committing, don't forget to type
```shell
sbt scalafmtAll scalafixAll scalafmtSbt
```
to format the code, .sbt files and check imports. You can use `pre-commit` hook, provided in `./pre-commit`, to do formating and checking automatically.
to format the code, .sbt files and check imports. Run this command in following directories:
- the base directory ( `.` )
- `examples/event-migration`
- `examples/example-app`
You can use `pre-commit` hook, provided in `./pre-commit`, to do formating and checking automatically.
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved

Additionally, all warnings locally are escalated to errors in CI, so make sure there are none.

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ Test / ashCompilerPluginEnable := false

For full list of sbt keys, check [`org.virtuslab.ash.AkkaSerializationHelperKeys`](https://github.com/VirtusLab/akka-serialization-helper/blob/main/sbt-akka-serialization-helper/src/main/scala/org/virtuslab/ash/AkkaSerializationHelperKeys.scala).

## Example application
Check out simple akka-cluster application which uses Akka Serialization Helper: [ASH code example](examples/example-app).

## Step-by-step guide
See [full step-by-step guide](docs/GUIDE.md) on Akka Serialization Helper usage.

Expand Down
2 changes: 1 addition & 1 deletion docs/GUIDE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Step-by-step guide for Akka Serialization Helper usage
This document is a guide on how to use Akka Serialization Helper in your project. If you want to get a more general view of this toolbox, see [README](../README.md).
This document is a guide on how to use Akka Serialization Helper in your project. If you want to get a more general view of this toolbox, see [README](../README.md). Moreover, it could be a good idea to see the [example app](../examples/example-app) first as a code example of basic Akka Serialization Helper usage.

Akka Serialization Helper (ASH) has two major parts:
1. Circe Akka Serializer
Expand Down
35 changes: 35 additions & 0 deletions examples/example-app/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
import org.virtuslab.ash.AkkaSerializationHelperPlugin

name := "example-app"
version := "0.1"
scalaVersion := "2.13.6"

val circeVersion = "0.14.2"
val akkaVersion = "2.6.19"
val logbackVersion = "1.2.11"

lazy val `example-app` = project
.in (file("."))
.enablePlugins(AkkaSerializationHelperPlugin)
.settings(multiJvmSettings: _*)
.settings(
libraryDependencies ++= akkaDependencies ++ ashDependencies ++ Seq(logbackDependency, circeDependency),
fork := true,
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
Global / cancelable := false
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
)
.configs(MultiJvm)

lazy val akkaDependencies = Seq(
"com.typesafe.akka" %% "akka-actor-typed",
"com.typesafe.akka" %% "akka-cluster-typed").map(_ % akkaVersion)

lazy val circeDependency = "io.circe" %% "circe-core" % circeVersion

lazy val ashDependencies = Seq(
AkkaSerializationHelperPlugin.annotation,
AkkaSerializationHelperPlugin.circeAkkaSerializer
)

lazy val logbackDependency = "ch.qos.logback" % "logback-classic" % logbackVersion
1 change: 1 addition & 0 deletions examples/example-app/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.7.1
2 changes: 2 additions & 0 deletions examples/example-app/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
addSbtPlugin("org.virtuslab.ash" % "sbt-akka-serialization-helper" % "0.6.1")
27 changes: 27 additions & 0 deletions examples/example-app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
akka {
actor {
provider = cluster

serializers {
circe-json = "org.virtuslab.example.ExampleSerializer"
}

serialization-bindings {
"org.virtuslab.example.CirceAkkaSerializable" = circe-json
}
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 0
}
}

cluster {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:25251",
"akka://ClusterSystem@127.0.0.1:25252"]

downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
5 changes: 5 additions & 0 deletions examples/example-app/src/main/resources/example.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
include "application"

example-service {
workers-per-node = 4
}
20 changes: 20 additions & 0 deletions examples/example-app/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- This is a development logging configuration that logs to standard out, for an example of a production
logging config, see the Akka docs: https://doc.akka.io/docs/akka/2.6/typed/logging.html#logback -->
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg%n</pattern>
</encoder>
</appender>

<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>8192</queueSize>
<neverBlock>true</neverBlock>
<appender-ref ref="STDOUT" />
</appender>

<root level="INFO">
<appender-ref ref="ASYNC"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.virtuslab.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import akka.cluster.typed.Cluster
import com.typesafe.config.ConfigFactory

/**
* An example akka cluster application that uses Akka Serialization Helper. In order to run the application locally,
* run following commands in separate terminal windows (so that 3 separate processes run in parallel):
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
*
* sbt "runMain org.virtuslab.example.App compute 25251"
* sbt "runMain org.virtuslab.example.App compute 25252"
* sbt "runMain org.virtuslab.example.App client 0"
*
* Note: this example-app's logic is based on akka-sample-custer-scala code from the official Akka repository.
* If you want to check this, see https://github.com/akka/akka-samples/tree/2.6/akka-sample-cluster-scala
*
*/
object App {

val StatsServiceKey = ServiceKey[StatsService.ProcessText]("StatsService")

private object RootBehavior {
def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
val cluster = Cluster(ctx.system)
if (cluster.selfMember.hasRole("compute")) {
// on every compute node there is one service instance that delegates to N local workers
val numberOfWorkers =
ctx.system.settings.config.getInt("example-service.workers-per-node")
val workers = ctx
.spawn(
Routers
.pool(numberOfWorkers)(StatsWorker().narrow[StatsWorker.Process])
// the worker has a per word cache, so send the same word to the same local worker child
.withConsistentHashingRouting(1, _.word),
"WorkerRouter"
)

val service = ctx.spawn(StatsService(workers), "StatsService")

// published through the receptionist to the other nodes in the cluster
ctx.system.receptionist ! Receptionist
.Register(StatsServiceKey, service)
}
if (cluster.selfMember.hasRole(("client"))) {
val serviceRouter =
ctx.spawn(Routers.group(App.StatsServiceKey), "ServiceRouter")
ctx.spawn(StatsClient(serviceRouter), "Client")
}
Behaviors.empty[Nothing]
}
}

def main(args: Array[String]): Unit = {
require(args.size == 2, "Usage: role port")
startup(args(0), args(1).toInt)
}

private def startup(role: String, port: Int): Unit = {
// Override the configuration of the port when specified as program argument
val config = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
""")
.withFallback(ConfigFactory.load("example"))

ActorSystem[Nothing](RootBehavior(), "ClusterSystem", config)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.virtuslab.example

import org.virtuslab.ash.annotation.SerializabilityTrait

@SerializabilityTrait
trait CirceAkkaSerializable extends Product with Serializable
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.virtuslab.example

import org.virtuslab.ash.annotation.Serializer
import org.virtuslab.ash.circe.{CirceAkkaSerializer, Register, Registration}
import akka.actor.ExtendedActorSystem

@Serializer(classOf[CirceAkkaSerializable], Register.REGISTRATION_REGEX)
class ExampleSerializer(actorSystem: ExtendedActorSystem) extends CirceAkkaSerializer[CirceAkkaSerializable](actorSystem) {
override def identifier: Int = 2137
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved

override lazy val codecs: Seq[Registration[_ <: CirceAkkaSerializable]] = Seq(
Register[StatsClient.Event],
Register[StatsService.Command],
Register[StatsService.Response],
Register[StatsAggregator.Event],
Register[StatsWorker.Command],
Register[StatsWorker.Processed]
)

override lazy val manifestMigrations: Seq[(String, Class[_])] = Nil

override lazy val packagePrefix: String = "org.virtuslab.example"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.virtuslab.example

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec

import scala.concurrent.duration._

object StatsClient {

sealed trait Event extends CirceAkkaSerializable // extends is our code
private case object Tick extends Event
private case class ServiceResponse(result: StatsService.Response) extends Event

implicit lazy val codecEvent: Codec[Event] = deriveCodec // our code

def apply(service: ActorRef[StatsService.ProcessText]): Behavior[Event] =
Behaviors.setup { ctx =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)
val responseAdapter = ctx.messageAdapter(ServiceResponse)

Behaviors.receiveMessage {
case Tick =>
ctx.log.info("Sending process request")
service ! StatsService.ProcessText("this is the text that will be analyzed", responseAdapter)
Behaviors.same
case ServiceResponse(result) =>
ctx.log.info("Service result: {}", result)
Behaviors.same
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.virtuslab.example

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec

import scala.concurrent.duration._

import org.virtuslab.ash.circe.AkkaCodecs

//#service
LukaszKontowski marked this conversation as resolved.
Show resolved Hide resolved
object StatsService {

sealed trait Command extends CirceAkkaSerializable // extends is our code
final case class ProcessText(text: String, replyTo: ActorRef[Response]) extends Command {
require(text.nonEmpty)
}
case object Stop extends Command

sealed trait Response extends CirceAkkaSerializable // extends is our code
final case class JobResult(meanWordLength: Double) extends Response
final case class JobFailed(reason: String) extends Response

implicit lazy val codecResponse: Codec[Response] = deriveCodec // our code
implicit lazy val codecActorRefResponse: Codec[ActorRef[Response]] = new AkkaCodecs{}.actorRefCodec // our code
implicit lazy val codecCommand: Codec[Command] = deriveCodec // our code

def apply(workers: ActorRef[StatsWorker.Process]): Behavior[Command] =
Behaviors.setup { ctx =>
// if all workers would crash/stop we want to stop as well
ctx.watch(workers)

Behaviors.receiveMessage {
case ProcessText(text, replyTo) =>
ctx.log.info("Delegating request")
val words = text.split(' ').toIndexedSeq
// create per request actor that collects replies from workers
ctx.spawnAnonymous(StatsAggregator(words, workers, replyTo))
Behaviors.same
case Stop =>
Behaviors.stopped
}
}
}

object StatsAggregator {

sealed trait Event extends CirceAkkaSerializable // extends is our code
private case object Timeout extends Event
private case class CalculationComplete(length: Int) extends Event

implicit lazy val codecEvent: Codec[Event] = deriveCodec // our code

def apply(words: Seq[String], workers: ActorRef[StatsWorker.Process], replyTo: ActorRef[StatsService.Response]): Behavior[Event] =
Behaviors.setup { ctx =>
ctx.setReceiveTimeout(3.seconds, Timeout)
val responseAdapter = ctx.messageAdapter[StatsWorker.Processed](processed =>
CalculationComplete(processed.length)
)

words.foreach { word =>
workers ! StatsWorker.Process(word, responseAdapter)
}
waiting(replyTo, words.size, Nil)
}

private def waiting(replyTo: ActorRef[StatsService.Response], expectedResponses: Int, results: List[Int]): Behavior[Event] =
Behaviors.receiveMessage {
case CalculationComplete(length) =>
val newResults = results :+ length
if (newResults.size == expectedResponses) {
val meanWordLength = newResults.sum.toDouble / newResults.size
replyTo ! StatsService.JobResult(meanWordLength)
Behaviors.stopped
} else {
waiting(replyTo, expectedResponses, newResults)
}
case Timeout =>
replyTo ! StatsService.JobFailed("Service unavailable, try again later")
Behaviors.stopped
}

}
//#service
Loading