Skip to content

Commit

Permalink
Merge pull request #59 from 422404/feature/new-actorsystem-runtime
Browse files Browse the repository at this point in the history
New actor system runtime + separated the interpreter and the actor system runtime into two modules
  • Loading branch information
422404 committed Apr 8, 2023
2 parents 6fa7228 + 98479ed commit 765150b
Show file tree
Hide file tree
Showing 155 changed files with 2,927 additions and 1,460 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ jobs:
- uses: eskatos/gradle-command-action@v2
with:
arguments: test
- uses: codecov/codecov-action@v1
- uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./parser/build/reports/jacoco/report.xml, ./runtime/build/reports/jacoco/report.xml
files: ./parser/build/reports/jacoco/report.xml,./interpreter/build/reports/jacoco/report.xml,./actorsystem-servicecontract/build/reports/jacoco/report.xml,./actorsystem-impl/build/reports/jacoco/report.xml
name: codecov-actorlang
fail_ci_if_error: true
57 changes: 57 additions & 0 deletions actorsystem-impl/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
plugins {
id 'org.jetbrains.kotlin.jvm'
id 'org.jlleitschuh.gradle.ktlint'
id 'com.adarshr.test-logger'
id 'jacoco'
}

group = 'org.actorlang'
version = '1.1.1-SNAPSHOT'
archivesBaseName = 'org.actorlang.actorsystem-impl'

repositories {
mavenCentral()
}

jacoco {
toolVersion = '0.8.7'
}

jacocoTestReport {
reports {
xml.enabled = true
xml.destination = file("${buildDir}/reports/jacoco/report.xml")
html.enabled = true
html.destination = file("${buildDir}/reports/jacoco/html")
csv.enabled = false
}
}

dependencies {
implementation project(":actorsystem-servicecontract")

// Tests
testImplementation "org.mockito:mockito-core:3.+"
testImplementation 'org.mockito:mockito-inline:3.11.0'
testImplementation "org.mockito.kotlin:mockito-kotlin:3.2.0"
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.6.0'

implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
implementation "io.github.microutils:kotlin-logging-jvm:3.0.4"
runtimeOnly "org.slf4j:slf4j-simple:2.0.3"
}

test {
useJUnitPlatform()
finalizedBy jacocoTestReport
}

compileKotlin {
kotlinOptions.jvmTarget = '1.8'
}

compileTestKotlin {
kotlinOptions.jvmTarget = '1.8'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.actorlang.actorsystem

import org.actorlang.actorsystem.messaging.LocalActorRef
import java.util.UUID

class ActorFactoryImpl : ActorFactory {
override fun createActor() = Actor(LocalActorRef(UUID.randomUUID()))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package org.actorlang.actorsystem

import org.actorlang.actorsystem.events.ActorThreadEventListener
import org.actorlang.actorsystem.events.ActorThreadExceptionEvent
import org.actorlang.actorsystem.events.AllLocalActorThreadsTimedOutEvent
import org.actorlang.actorsystem.events.EventListener
import org.actorlang.actorsystem.exception.ActorAlreadyRegisteredInActorSystem
import org.actorlang.actorsystem.exception.ActorNotRegisteredInActorSystem
import org.actorlang.actorsystem.exception.NoActorStateException
import org.actorlang.actorsystem.messaging.MailboxFactory
import org.actorlang.actorsystem.messaging.MessagingServer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

class ActorSystemImpl(
override val messagingServer: MessagingServer,
private val actorFactory: ActorFactory,
private val actorThreadFactory: ActorThreadFactory,
private val mailboxFactory: MailboxFactory,
private val eventListener: EventListener
) : ActorSystem, ActorThreadEventListener {

private val stateLock = ReentrantLock()
private var state = ServerLikeState.CREATED
private val aliveActors = ConcurrentLinkedQueue<Actor>()
private val actorStates = ConcurrentHashMap<Actor, ActorState>()
private val runningActorThreadsLock = ReentrantLock()
private val runningActorThreads = mutableMapOf<Actor, ActorThread>()
private val executorService = Executors.newCachedThreadPool()

override val actorAwakener = object : ActorAwakener {
override fun isActorAwake(actor: Actor): Boolean {
if (!isActorAlive(actor)) {
throw ActorNotRegisteredInActorSystem(actor)
}

return runningActorThreadsLock.withLock {
runningActorThreads.containsKey(actor)
}
}

override fun wakeActorUp(actor: Actor) {
if (!isActorAlive(actor)) {
throw ActorNotRegisteredInActorSystem(actor)
}

runningActorThreadsLock.withLock {
// The lock is reentrant, so we do not deadlock when calling isActorAwake()
if (!isActorAwake(actor)) {
val actorThread = actorThreadFactory.createActorThread(
actor,
this@ActorSystemImpl, this@ActorSystemImpl
)
runningActorThreads[actor] = actorThread
executorService.execute(actorThread)
}
}
}
}

override fun registerActor(actor: Actor) {
if (isActorAlive(actor)) {
throw ActorAlreadyRegisteredInActorSystem(actor)
}
aliveActors.add(actor)
}

override fun createActorWithState(actorState: ActorState) = actorFactory.createActor().also {
registerActor(it)
bindActorToNewActorState(it, actorState)
messagingServer.registerMailbox(it.ref, mailboxFactory.createMailbox(this, it))
actorAwakener.wakeActorUp(it)
}

override fun bindActorToNewActorState(actor: Actor, newActorState: ActorState) {
if (!isActorAlive(actor)) {
throw ActorNotRegisteredInActorSystem(actor)
}
actorStates[actor] = newActorState
newActorState.onBind(actor)
}

override fun getActorState(actor: Actor): ActorState {
if (!isActorAlive(actor)) {
throw ActorNotRegisteredInActorSystem(actor)
}
return actorStates[actor] ?: throw NoActorStateException(actor)
}

override fun isActorAlive(actor: Actor) = aliveActors.contains(actor)

override fun killActor(actor: Actor) {
if (!isActorAlive(actor)) {
throw ActorNotRegisteredInActorSystem(actor)
}
aliveActors.remove(actor)
}

override fun onActorThreadExit(actor: Actor) {
runningActorThreadsLock.withLock {
runningActorThreads.remove(actor)

if (runningActorThreads.isEmpty()) {
eventListener.onEvent(AllLocalActorThreadsTimedOutEvent)
}
}
}

override fun onActorThreadException(e: Throwable) {
eventListener.onEvent(ActorThreadExceptionEvent(e))
}

override fun start() {
stateLock.withLock {
if (state == ServerLikeState.CREATED) {
state = ServerLikeState.STARTED
messagingServer.start()
}
}
}

override fun shutdown(timeoutMillis: Long) {
stateLock.withLock {
if (state == ServerLikeState.STARTED) {
state = ServerLikeState.STOPPED
executorService.shutdown()
executorService.awaitTermination(timeoutMillis / 2, TimeUnit.MILLISECONDS)
messagingServer.shutdown(timeoutMillis / 2)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.actorlang.actorsystem

import org.actorlang.actorsystem.events.ActorThreadEventListener

/**
* @param idleTimeout The timeout after which this thread stops waiting for messages (in milliseconds)
*/
class ActorThreadFactoryImpl(
private val idleTimeout: Long
) : ActorThreadFactory {

override fun createActorThread(
actor: Actor,
actorSystem: ActorSystem,
actorThreadEventListener: ActorThreadEventListener
): ActorThread {
return ActorThreadImpl(actor, idleTimeout, actorSystem, actorThreadEventListener)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.actorlang.actorsystem

import org.actorlang.actorsystem.events.ActorThreadEventListener
import org.actorlang.actorsystem.exception.ActorStateChangedException
import org.actorlang.actorsystem.messaging.Message

class ActorThreadImpl(
private val actor: Actor,
private val idleTimeout: Long,
private val actorSystem: ActorSystem,
private val actorThreadEventListener: ActorThreadEventListener
) : ActorThread {

override fun run() {
var loop = true

while (loop) {
val message = actorSystem.messagingServer.tryGetMessage(actor.ref, idleTimeout)

if (message.isPresent) {
val actorState = actorSystem.getActorState(actor)
handleMessage(message.get(), actorState)
}

if (!message.isPresent || !actorSystem.isActorAlive(actor)) {
loop = false
}
}

actorThreadEventListener.onActorThreadExit(actor)
}

private fun handleMessage(message: Message, actorState: ActorState) {
try {
actorState.handleMessage(message)
} catch (e: ActorStateChangedException) {
// We ignore it has it is made to short circuit the call stack
} catch (e: Exception) {
actorThreadEventListener.onActorThreadException(e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.actorlang.actorsystem.messaging

import mu.KotlinLogging
import org.actorlang.actorsystem.ServerLikeState
import org.actorlang.actorsystem.messaging.exception.NoMailboxRegisteredForActorRefException
import org.actorlang.actorsystem.messaging.exception.NoMailboxRegisteredForMessageTargetException
import org.actorlang.actorsystem.messaging.exception.NonLocalActorRefException
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import kotlin.Exception
import kotlin.concurrent.withLock

private val logger = KotlinLogging.logger {}

class LocalMessagingServer : MessagingServer {

private val stateLock = ReentrantLock()
private var state = ServerLikeState.CREATED
private val mailboxes = ConcurrentHashMap<LocalActorRef, Mailbox>()
private val pendingMessages = ArrayDeque<Message>()
private val pendingMessagesLock = ReentrantLock()
private val newMessageCondition = pendingMessagesLock.newCondition()
private var mustStop = false

private val dispatcherThread = Thread {
do {
try {
dispatchNextMessage()
} catch (e: Exception) {
logger.error("Exception thrown in dispatcher thread", e)
}
} while (!mustStop)
}

override fun sendMessage(message: Message) {
if (message.target !is LocalActorRef) {
throw NonLocalActorRefException()
}

pendingMessagesLock.withLock {
// When add the message to the head of the queue
pendingMessages.addFirst(message)
newMessageCondition.signal()
}
}

override fun tryGetMessage(actorRef: LocalActorRef, timeoutMillis: Long): Optional<Message> {
return mailboxes[actorRef]?.tryGetMessage(timeoutMillis)
?: throw NoMailboxRegisteredForActorRefException(actorRef)
}

override fun registerMailbox(actorRef: LocalActorRef, mailbox: Mailbox) {
mailboxes[actorRef] = mailbox
}

override fun unregisterMailbox(actorRef: LocalActorRef) {
mailboxes.remove(actorRef)
}

override fun start() {
stateLock.withLock {
if (state == ServerLikeState.CREATED) {
state = ServerLikeState.STARTED
dispatcherThread.start()
}
}
}

override fun shutdown(timeoutMillis: Long) {
stateLock.withLock {
if (state == ServerLikeState.STARTED) {
state = ServerLikeState.STOPPED
mustStop = true
pendingMessagesLock.withLock {
newMessageCondition.signal()
}
dispatcherThread.join(timeoutMillis)
}
}
}

private fun dispatchNextMessage() {
pendingMessagesLock.withLock {
if (pendingMessages.isEmpty()) {
newMessageCondition.await()
}

if (!mustStop) {
// We remove the message at the tail of the queue
val message = pendingMessages.removeLast()
// We already ensured that it is a local actor ref in sendMessage()
val target = message.target as LocalActorRef

val mailbox = mailboxes[target]
?: throw NoMailboxRegisteredForMessageTargetException(target)

mailbox.postMessage(message)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.actorlang.actorsystem.messaging

import org.actorlang.actorsystem.Actor
import org.actorlang.actorsystem.ActorSystem

class MailboxFactoryImpl : MailboxFactory {
override fun createMailbox(actorSystem: ActorSystem, actor: Actor): Mailbox {
return MailboxImpl(actorSystem, actor)
}
}

0 comments on commit 765150b

Please sign in to comment.