Skip to content

Commit

Permalink
new Observable API (instead of the Event one)
Browse files Browse the repository at this point in the history
  • Loading branch information
volgar1x committed Nov 7, 2013
1 parent f43f3fb commit b2c7a7c
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 83 deletions.
68 changes: 0 additions & 68 deletions common/src/org/photon/common/Event.scala

This file was deleted.

67 changes: 67 additions & 0 deletions common/src/org/photon/common/Observable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.photon.common

import scala.collection.mutable
import java.util.concurrent.locks.ReentrantReadWriteLock

object Observable {
type Listener = PartialFunction[Any, Any]
type UnitListener = PartialFunction[Any, Unit]
type Lid = Long

sealed case class UnavailableEventException(tpe: Symbol) extends RuntimeException
}

trait Observable {
import Observable._
import JavaConversion._

private var nextLid = 0L
private val l = new ReentrantReadWriteLock
private val subs = mutable.Map.empty[Symbol, mutable.Map[Lid, Listener]]

def subscribe(tpe: Symbol, fn: Listener): Lid = {
l.write {
val listeners = subs.getOrElse(tpe, throw UnavailableEventException(tpe))
nextLid += 1
listeners(nextLid) = fn
nextLid
}
}

def listen(tpe: Symbol)(fn: Listener): Lid = subscribe(tpe, fn)

def unsubscribe(tpe: Symbol, lid: Lid) {
l.write {
for (listeners <- subs.get(tpe)) {
listeners -= lid
}
}
}

def unsubscribe(tpe: Symbol, fn: Listener) {
l.write {
for (
listeners <- subs.get(tpe);
(lid, _) <- listeners.find { case (_, it) => it == fn }
) listeners -= lid
}
}

def emitted(tpe: Symbol, types: Symbol*) {
l.write {
for (t <- types :+ tpe) {
subs.getOrElseUpdate(t, mutable.Map.empty)
}
}
}

def emit(tpe: Symbol, args: Any = ()) {
l.read {
val listeners = subs.getOrElse(tpe, throw UnavailableEventException(tpe)).values
for (
listener <- listeners
if listener.isDefinedAt(args)
) listener(args)
}
}
}
9 changes: 5 additions & 4 deletions login/src/org/photon/login/HandlerComponentImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.typesafe.scalalogging.slf4j.Logger
import com.twitter.util.{Future, Throw, Return}
import org.slf4j.LoggerFactory
import org.photon.protocol.dofus.DofusProtocol
import org.photon.common.Event
import org.photon.common.Observable

trait HandlerComponentImpl extends HandlerComponent {
self: UserAuthenticationComponent with RealmManagerComponent =>
Expand All @@ -20,7 +20,8 @@ trait HandlerComponentImpl extends HandlerComponent {
s ! HelloConnectMessage(s.ticket)

case Disconnect(s) =>
s.subscriptions.unsubscribe(realmManager.updated)
s.realmUpdatedLid foreach (realmManager.unsubscribe('updated, _))
s.realmUpdatedLid = None
Future.Done
}

Expand All @@ -45,7 +46,7 @@ trait HandlerComponentImpl extends HandlerComponent {
case Return(user) =>
s.state = ServerSelectionState
s.userOption = Some(user)
s.subscriptions(realmManager.updated) = realmServerUpdated(s)
s.realmUpdatedLid = Some(realmManager.subscribe('updated, realmServerUpdated(s)))

s.transaction(
SetNicknameMessage(user.nickname),
Expand All @@ -66,7 +67,7 @@ trait HandlerComponentImpl extends HandlerComponent {
}


def realmServerUpdated(s: NetworkSession): Event.UnitListener = {
def realmServerUpdated(s: NetworkSession): Observable.UnitListener = {
case realm: RealmServer =>
s ! ServerListMessage(Seq(realm.infos))
}
Expand Down
4 changes: 2 additions & 2 deletions login/src/org/photon/login/NetworkComponent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import java.nio.charset.Charset
import java.net.SocketAddress
import scala.collection.mutable
import scala.annotation.tailrec
import org.photon.common.EventSubscriptionBag
import org.photon.common.Observable

trait NetworkSession {
import NetworkSession._

var state: State
var realmUpdatedLid: Option[Observable.Lid]
var userOption: Option[User]
def user = userOption.get
def ticket: String
def subscriptions: EventSubscriptionBag

def service: NetworkService
def closeFuture: Future[NetworkSession]
Expand Down
4 changes: 2 additions & 2 deletions login/src/org/photon/login/NetworkComponentImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.twitter.util.Future
import org.apache.mina.core.session.IoSession
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}
import java.net.InetSocketAddress
import org.photon.common.{EventSubscriptionBag, Strings, Async}
import org.photon.common.{Observable, Strings, Async}
import scala.collection.mutable
import org.apache.mina.core.service.IoHandlerAdapter
import org.apache.mina.filter.codec._
Expand Down Expand Up @@ -59,8 +59,8 @@ trait NetworkComponentImpl extends NetworkComponent {
import NetworkSession._

var state: State = VersionCheckState
var realmUpdatedLid: Option[Observable.Lid] = None
var userOption: Option[User] = None
val subscriptions = EventSubscriptionBag.empty

def service = networkService
val closeFuture = session.getCloseFuture.toTw(this)
Expand Down
6 changes: 3 additions & 3 deletions login/src/org/photon/login/RealmManagerComponent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.photon.login

import com.twitter.util.Future
import org.photon.protocol.dofus.login.{PlayersOfServer, Server}
import org.photon.common.Event
import org.photon.common.Observable

trait RealmServer {
def address: String
Expand All @@ -14,8 +14,8 @@ trait RealmServer {
def grantAccess(user: User, ticket: String): Future[Unit]
}

trait RealmManager extends Service {
def updated: Event
trait RealmManager extends Service with Observable {
emitted('updated)

def availableServers: Seq[Server]
def playerList(user: User): Future[Seq[PlayersOfServer]]
Expand Down
9 changes: 5 additions & 4 deletions login/src/org/photon/login/RealmManagerComponentImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.photon.protocol.dofus.login.{PlayersOfServer, Server, ServerState}
import com.typesafe.scalalogging.slf4j.Logging
import scala.collection.mutable
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}
import org.photon.common.{Event, Async}
import org.photon.common.Async
import java.net.InetSocketAddress
import org.apache.mina.core.service.IoHandlerAdapter
import org.apache.mina.core.filterchain.IoFilter.NextFilter
Expand Down Expand Up @@ -37,7 +37,6 @@ trait RealmManagerComponentImpl extends RealmManagerComponent {
acceptor.getFilterChain.addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory))
acceptor.getFilterChain.addLast("logging", new RealmManagerLoggingImpl)

val updated = Event.newEvent
def find(serverId: Int) = servers.get(serverId)
def availableServers = servers.values.toStream filter (_.isAvailable) map (_.infos)
def playerList(user: User) = Future collect (servers.values.toSeq map (_.fetchPlayers(user)))
Expand Down Expand Up @@ -127,12 +126,14 @@ trait RealmManagerComponentImpl extends RealmManagerComponent {
case Message(s, InfosUpdateMessage(infos)) =>
val realm = s.attr[RealmServerImpl].get
realm.infosOption = Some(infos)
realmManager.updated(realm) flatMap { _ => s ! Ack }
realmManager.emit('updated, realm)
s ! Ack

case Message(s, StateUpdateMessage(state)) =>
val realm = s.attr[RealmServerImpl].get
realm.infosOption = Some(realm.infos.copy(state = state))
realmManager.updated(realm) flatMap { _ => s ! Ack }
realmManager.emit('updated, realm)
s ! Ack

case Message(s, PlayerListSuccessMessage(userId, nplayers)) =>
val realm = s.attr[RealmServerImpl].get
Expand Down

0 comments on commit b2c7a7c

Please sign in to comment.