Permalink
Browse files

xmppbots - work on getting list of services from the service registry

  • Loading branch information...
asimihsan committed May 13, 2012
1 parent 3e22d59 commit 5902374587729983a9ad7d17359db01f234606d7
@@ -2,28 +2,33 @@ import sbt._
import Keys._
object BuildSettings {
- val buildOrganization = "com.ai"
- val buildVersion = "0.1"
- val buildScalaVersion = "2.9.2"
-
- val buildSettings = Defaults.defaultSettings ++ Seq (
- organization := buildOrganization,
- version := buildVersion,
- scalaVersion := buildScalaVersion
- )
+ val buildOrganization = "com.ai"
+ val buildVersion = "0.1"
+ val buildScalaVersion = "2.9.2"
+ val buildJavacOptions = Seq("-Xlint:all", "-deprecation")
+ val buildScalacOptions = Seq("-unchecked", "-deprecation", "-optimise", "-explaintypes")
+
+ val buildSettings = Defaults.defaultSettings ++ Seq (
+ organization := buildOrganization,
+ version := buildVersion,
+ scalaVersion := buildScalaVersion,
+ javacOptions ++= buildJavacOptions,
+ scalacOptions ++= buildScalacOptions
+ )
}
object Resolvers {
- val akkaRepo = "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
+ val akkaRepo = "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
}
object Dependencies {
- val akka_version = "2.0.1"
- val smack = "org.igniterealtime.smack" % "smack" % "3.2.1"
- val smackx = "org.igniterealtime.smack" % "smackx" % "3.2.1"
- val akka_actor = "com.typesafe.akka" % "akka-actor" % akka_version
- val http = "net.databinder" %% "dispatch-http" % "0.8.8"
- val json = "net.liftweb" % "lift-json_2.8.0" % "2.4"
+ val akka_version = "2.0.1"
+ val smack = "org.igniterealtime.smack" % "smack" % "3.2.1"
+ val smackx = "org.igniterealtime.smack" % "smackx" % "3.2.1"
+ val akka_actor = "com.typesafe.akka" % "akka-actor" % akka_version
+ val http = "net.databinder" %% "dispatch-http" % "0.8.8"
+ val json = "net.liftweb" % "lift-json_2.8.0" % "2.4"
+ val scalaj_http = "org.scalaj" %% "scalaj-http" % "0.3.1"
}
object BuildSetup extends Build {
@@ -36,7 +41,8 @@ object BuildSetup extends Build {
smackx,
akka_actor,
http,
- json
+ json,
+ scalaj_http
)
val res = Seq(
@@ -13,8 +13,12 @@ object Main{
//val ls = system.actorOf(Props(new Ls), name = "ls")
//val interpret = system.actorOf(Props(new Interpret), name = "interpret")
+ val serviceRegistryURI = "http://mink.datcon.co.uk:10000"
+ val serviceRegistry = system.actorOf(Props(new ServiceRegistryActor(serviceRegistryURI)),
+ name = "serviceRegistry")
+
implicit val config = new ArgsConfig(args)
- val manager = system.actorOf(Props(new XMPPAgentManager()), name = "manager")
+ val manager = system.actorOf(Props(new XMPPAgentManager(serviceRegistry)), name = "manager")
println("Press any key to stop.")
Thread.sleep(60000)
@@ -8,8 +8,15 @@ import org.jivesoftware.smackx.muc.{DiscussionHistory, MultiUserChat}
import org.jivesoftware.smack.provider.ProviderManager
import org.jivesoftware.smackx.provider.{MUCUserProvider, MUCAdminProvider, MUCOwnerProvider}
import org.jivesoftware.smackx.GroupChatInvitation
+import org.jivesoftware.smackx.muc.{InvitationListener, ParticipantStatusListener, DefaultParticipantStatusListener}
import collection.JavaConversions._
import ai.agent.SmackConversions._
+import akka.util.duration._
+import akka.util.Timeout
+import akka.dispatch.Await
+import akka.dispatch.Future
+import akka.pattern.ask
+import akka.pattern.AskTimeoutException
case class ChatSink(chat: Chat, connection: XMPPConnection) extends Sink {
override def output(ans: Any) = {
@@ -31,8 +38,24 @@ case class XMPPAgentManager(agents: ActorRef*)(implicit config: Config)
with MessageListener
with akka.actor.ActorLogging {
+ override def postStop() {
+ log.debug("postStop entry.")
+ connection.disconnect()
+ }
+
log.debug("starting.")
+ ServiceRegistryActor.ping(
+ context = context,
+ onSuccessCallback = result => {
+ log.debug("Received ServiceRegistryPong. contents: %s".format(result.contents))
+ },
+ onAskTimeoutExceptionCallback = exception => {
+ log.error("serviceRegistry did not respond in time")
+ context.actorFor("/user/manager") ! akka.actor.PoisonPill
+ }
+ )
+
// -------------------------------------------------------------------------
// In order to use the InvitationListener on MultiUserChat we need to
// set up the Providers. Typically this is done by default in a conf
@@ -67,10 +90,11 @@ case class XMPPAgentManager(agents: ActorRef*)(implicit config: Config)
// If someone invies us to a multi-user chat then unconditionally join
// the chat and output a message.
// ------------------------------------------------------------------------
- MultiUserChat.addInvitationListener(connection, (e: InvitationListenerArguments) => {
+ MultiUserChat.addInvitationListener(connection, invitationListener)
+ lazy val invitationListener = (e: InvitationListenerArguments) => {
log.debug("invitationReceived. room: %s, inviter: %s, reason: %s, password: %s, message: %s".format(e.room, e.inviter, e.reason, e.password, e.message));
- val chat = new MultiUserChat(e.conn, e.room)
- val history: DiscussionHistory = new DiscussionHistory()
+ var chat = new MultiUserChat(e.conn, e.room)
+ var history = new DiscussionHistory()
history.setMaxStanzas(0)
val nickname = username.split("@").head
chat.join(nickname, e.password, history, SmackConfiguration.getPacketReplyTimeout())
@@ -82,19 +106,21 @@ case class XMPPAgentManager(agents: ActorRef*)(implicit config: Config)
// this is flaky, so in the background poll the participants of the
// room and leave if the inviter is no longer part of that list.
// --------------------------------------------------------------------
- chat.addParticipantStatusListener((e: ParticipantStatusListenerLeftArguments) => {
+ chat.addParticipantStatusListener(participantStatusListener)
+ lazy val participantStatusListener: ParticipantStatusListener = (e: ParticipantStatusListenerLeftArguments) => {
log.debug("ParticipantStatusListener: participant %s left.".format(e.participant))
if (e.participant == inviterAsParticipant) {
log.debug("User who invited me left, so I'm leaving too.")
chat.leave()
+ chat.removeParticipantStatusListener(participantStatusListener)
+ chat = null
+ history = null
}
- })
+ }
// --------------------------------------------------------------------
-
- })
+ }
// ------------------------------------------------------------------------
-
connection.getChatManager.addChatListener(this)
val roster: Roster = connection.getRoster()
@@ -122,11 +148,6 @@ case class XMPPAgentManager(agents: ActorRef*)(implicit config: Config)
//self !(message.getBody, ChatSink(chat, connection))
}
- //Actor shutdown hook
- override def postStop() {
- connection.disconnect()
- log.debug("Disconnected.")
- }
}
@@ -0,0 +1,115 @@
+package ai.agent
+
+import akka.actor.Actor
+import akka.actor.Props
+import akka.event.Logging
+import akka.dispatch.Future
+import akka.pattern.AskTimeoutException
+import akka.util.duration._
+import akka.util.Duration
+import akka.actor.ActorContext
+import akka.pattern.ask
+import akka.actor.ActorSystem
+import akka.actor.PoisonPill
+
+import scalaj.http.{Http, HttpOptions}
+import net.liftweb.json.JsonParser
+import java.io.InputStreamReader
+
+sealed class XmppBotMessage
+
+case class ServiceRegistryGetHostnames(getListOfServicesURI: String) extends XmppBotMessage
+case class ServiceRegistryGetServicesForHostname(hostname: String) extends XmppBotMessage
+case class ServiceRegistryPing(contents: String) extends XmppBotMessage
+case class ServiceRegistryPong(contents: String) extends XmppBotMessage
+
+case class HTTPRequestGETRequest(URI: String) extends XmppBotMessage
+case class HTTPRequestGETResponse() extends XmppBotMessage
+case class HTTPRequestSuicide() extends XmppBotMessage
+
+object HTTPRequestActor {
+ def getJSONFromURI(URI: String) = {
+ Http(URI).
+ option(HttpOptions.connTimeout(1000)).
+ option(HttpOptions.readTimeout(5000)) { inputStream =>
+ val parsed = JsonParser.parse(new InputStreamReader(inputStream))
+ parsed
+ }
+ }
+}
+
+case class HTTPRequestActor(URI: String)
+ extends akka.actor.Actor
+ with akka.actor.ActorLogging {
+
+ override def preStart = {
+ log.debug("preStart entry. URI: %s".format(URI))
+ ActorSystem("XMPPBot").scheduler.scheduleOnce(10 seconds) {
+ self ! PoisonPill
+ }
+ }
+
+ override def postStop = {
+ log.debug("postStop entry. URI: %s".format(URI))
+ }
+
+ log.debug("entry. URI: %s".format(URI))
+
+ def receive = {
+ case HTTPRequestGETRequest(URI) => {
+ val parsed = HTTPRequestActor.getJSONFromURI(URI)
+ log.debug("parsed: %s".format(parsed))
+ }
+ }
+}
+
+object ServiceRegistryActor {
+ def ping(context: ActorContext,
+ actorPath: String = "/user/serviceRegistry",
+ timeout: Duration = 5 seconds,
+ contents: String = "hello",
+ onSuccessCallback: (ServiceRegistryPong) => Unit,
+ onAskTimeoutExceptionCallback: (AskTimeoutException) => Unit) = {
+
+ val message = ServiceRegistryPing(contents)
+ val future = context.actorFor(actorPath).ask(message)(timeout)
+ future onComplete {
+ case Right(result: ServiceRegistryPong) => onSuccessCallback(result)
+ case Left(exception: AskTimeoutException) => onAskTimeoutExceptionCallback(exception)
+ }
+ future
+ }
+
+ def getListOfServices(listOfServicesURI: String) = {
+
+ }
+}
+
+case class ServiceRegistryActor(serviceRegistryURI: String)
+ extends Actor
+ with akka.actor.ActorLogging {
+
+ val getListOfServicesURI = "%s/list_of_services".format(serviceRegistryURI)
+
+ override def preStart = {
+ log.debug("preStart entry.")
+ self ! ServiceRegistryGetHostnames(getListOfServicesURI)
+ }
+
+ override def postStop = {
+ log.debug("postStop entry.")
+ }
+
+ def receive = {
+ case ServiceRegistryGetHostnames(getListOfServicesURI) => {
+ log.debug("ServiceRegistryGetHostnames received.")
+ val httpRequestActor = context.actorOf(Props(new HTTPRequestActor(getListOfServicesURI)))
+ httpRequestActor ! HTTPRequestGETRequest(getListOfServicesURI)
+ }
+ case ServiceRegistryPing(contents) => {
+ log.debug("ServiceRegistryPing received. Contents: %s".format(contents))
+ sender ! ServiceRegistryPong(contents)
+ }
+ }
+} // class ServiceRegistryActor
+
@@ -2,7 +2,7 @@ package ai.agent
import org.jivesoftware.smack.Connection
import org.jivesoftware.smack.packet.{Packet, Message}
-import org.jivesoftware.smackx.muc.{InvitationListener, DefaultParticipantStatusListener}
+import org.jivesoftware.smackx.muc.{InvitationListener, ParticipantStatusListener, DefaultParticipantStatusListener}
object SmackConversions {

0 comments on commit 5902374

Please sign in to comment.