From b63a834a4136674bfb56882b8900ed4928468bc4 Mon Sep 17 00:00:00 2001 From: Edi Weissmann Date: Fri, 15 Jun 2012 22:26:19 +0200 Subject: [PATCH 1/3] gitignore IDEA files and convert to sbt 0.11.3 --- .gitignore | 3 +++ project/build.properties | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 8acee61..d4f68f4 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ dist .settings .target .cache +*.iml +*.ipr +*.iws diff --git a/project/build.properties b/project/build.properties index f4ff7a5..d428711 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.11.2 +sbt.version=0.11.3 From 4e8b234899f1157d8b3142aba428a0ff7ba2ef07 Mon Sep 17 00:00:00 2001 From: Edi Weissmann Date: Sat, 16 Jun 2012 01:04:04 +0200 Subject: [PATCH 2/3] challenge #2: topics and subcribers via websockets --- .../TopicSubscriberWebSockets.scala | 109 ++++++++++++++++++ app/views/websockets/index.scala.html | 2 +- conf/routes | 1 + 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 app/controllers/TopicSubscriberWebSockets.scala diff --git a/app/controllers/TopicSubscriberWebSockets.scala b/app/controllers/TopicSubscriberWebSockets.scala new file mode 100644 index 0000000..51457eb --- /dev/null +++ b/app/controllers/TopicSubscriberWebSockets.scala @@ -0,0 +1,109 @@ +package controllers + +import play.api.mvc.{WebSocket, Controller} +import play.api.libs.iteratee.{Iteratee, Enumerator} +import akka.actor.{Props, ActorSystem, Actor} +import scala.collection.mutable +import play.api.libs.concurrent.Promise +import akka.util.duration._ +import java.util +import management.ManagementFactory + +/** + * + * Challenge #2: Clients subscribe to different topics (weather news and system load info) through websockets and receive updates every X seconds + * + * @author: Edi Weissmann + */ +object TopicSubscriberWebSockets extends Controller { + + /** + * Actor messages + */ + case class SubscribeTopic(topic:Topic, client:util.UUID) + case class UnsubscribeTopic(topic:Topic, client:util.UUID) + + val subscribers = mutable.Map[util.UUID, mutable.Set[Topic]]() + + /** + * Actor that manages clients and their subscriptions + */ + class TopicActor extends Actor { + def receive = { + case SubscribeTopic(topic, client) => { + subscribers.getOrElseUpdate(client, mutable.Set[Topic]()).add(topic) + } + case UnsubscribeTopic(topic, client) => { + subscribers.getOrElseUpdate(client, mutable.Set[Topic]()).remove(topic) + } + } + } + + /** + * Topics - System load and Weather + */ + trait Topic + object SystemLoad extends Topic { + val statsSystem = ManagementFactory.getOperatingSystemMXBean + override def toString = "System load [%s]".format(statsSystem.getSystemLoadAverage) + } + object WeatherForecast extends Topic { + override def toString = "Weather [sunny all day]" + } + object Topic { + def apply(name:String):Option[Topic] = name match { + case "weather" => Some(WeatherForecast) + case "system" => Some(SystemLoad) + case _ => None + } + } + + /** + * Akka actor bootstrap + */ + val system = ActorSystem("CpuAndOrSystemLoad") + val topicActor = system.actorOf(Props[TopicActor], name = "topicActor") + + + def topicSubscribers() = WebSocket.using[String] { request => + + val client = util.UUID.randomUUID() + + /** + * Prints topic updates to subscribed clients + */ + val topicOut = Enumerator.fromCallback { () => + Promise.timeout({ + Some(subscribers.getOrElse(client, Set()).mkString(" ")) + }, 3 seconds) + } + + /** + * Prints usage options + */ + val usageOut = Enumerator("Commands are: 'subscribe' and 'unsubscribe'.\nTopics are: 'weather' and 'system'\nType your command: subscribe weather") + + /** + * Processes incoming commands + */ + val in = Iteratee.foreach[String] { msg => { + val Command = """(\w+) (\w+)""".r + msg match { + case Command(action, topicName) => + Topic(topicName) match { + case Some(topic) => + action match { + case "subscribe" => topicActor ! SubscribeTopic(topic, client) + case "unsubscribe" => topicActor ! UnsubscribeTopic(topic, client) + case _ => // unknown action + } + case _ => // unknown topic + } + case _ => // unknown, noop + } + }} + + + (in, topicOut >- usageOut) + } +} diff --git a/app/views/websockets/index.scala.html b/app/views/websockets/index.scala.html index 42ae763..411f637 100644 --- a/app/views/websockets/index.scala.html +++ b/app/views/websockets/index.scala.html @@ -45,7 +45,7 @@

- +

diff --git a/conf/routes b/conf/routes index 72deb64..5a4a586 100644 --- a/conf/routes +++ b/conf/routes @@ -18,6 +18,7 @@ GET /websockets/logging controllers.WebSockets.logging() GET /websockets/echo controllers.WebSockets.echo() GET /websockets/counter controllers.WebSockets.counter() GET /websockets/echo-and-counter controllers.WebSockets.echoAndCounter() +GET /websockets/topic-subscribers controllers.TopicSubscriberWebSockets.topicSubscribers GET /twitter/stream controllers.Twitter.stream(keywords) From f4201019c8dbe7ca575b79c4098f57fec00df7d4 Mon Sep 17 00:00:00 2001 From: Edi Weissmann Date: Sat, 16 Jun 2012 17:59:34 +0200 Subject: [PATCH 3/3] Simplified, no actors needed --- .../TopicSubscriberWebSockets.scala | 40 ++++--------------- 1 file changed, 7 insertions(+), 33 deletions(-) diff --git a/app/controllers/TopicSubscriberWebSockets.scala b/app/controllers/TopicSubscriberWebSockets.scala index 51457eb..4245956 100644 --- a/app/controllers/TopicSubscriberWebSockets.scala +++ b/app/controllers/TopicSubscriberWebSockets.scala @@ -17,28 +17,6 @@ import management.ManagementFactory */ object TopicSubscriberWebSockets extends Controller { - /** - * Actor messages - */ - case class SubscribeTopic(topic:Topic, client:util.UUID) - case class UnsubscribeTopic(topic:Topic, client:util.UUID) - - val subscribers = mutable.Map[util.UUID, mutable.Set[Topic]]() - - /** - * Actor that manages clients and their subscriptions - */ - class TopicActor extends Actor { - def receive = { - case SubscribeTopic(topic, client) => { - subscribers.getOrElseUpdate(client, mutable.Set[Topic]()).add(topic) - } - case UnsubscribeTopic(topic, client) => { - subscribers.getOrElseUpdate(client, mutable.Set[Topic]()).remove(topic) - } - } - } - /** * Topics - System load and Weather */ @@ -58,23 +36,19 @@ object TopicSubscriberWebSockets extends Controller { } } - /** - * Akka actor bootstrap - */ - val system = ActorSystem("CpuAndOrSystemLoad") - val topicActor = system.actorOf(Props[TopicActor], name = "topicActor") - - def topicSubscribers() = WebSocket.using[String] { request => - val client = util.UUID.randomUUID() + /** + * Topics to which current client is subscribed to + */ + val topics = mutable.Set[Topic]() /** * Prints topic updates to subscribed clients */ val topicOut = Enumerator.fromCallback { () => Promise.timeout({ - Some(subscribers.getOrElse(client, Set()).mkString(" ")) + Some(topics.mkString(" ")) }, 3 seconds) } @@ -93,8 +67,8 @@ object TopicSubscriberWebSockets extends Controller { Topic(topicName) match { case Some(topic) => action match { - case "subscribe" => topicActor ! SubscribeTopic(topic, client) - case "unsubscribe" => topicActor ! UnsubscribeTopic(topic, client) + case "subscribe" => topics.add(topic) + case "unsubscribe" => topics.remove(topic) case _ => // unknown action } case _ => // unknown topic