Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #1 from guardian/sns-endpoint

Added ability to receive events via SNS as well as via 0MQ and upgraded libraries from RCs, as they now have full release versions
  • Loading branch information...
commit 6180265985f7008b937a692f438214706b1c9b48 2 parents a26d2d2 + 0ddd570
@philwills philwills authored
View
72 app/controllers/SNS.scala
@@ -0,0 +1,72 @@
+package controllers
+
+import play.api.mvc.{Action, Controller}
+import play.api.libs.ws.WS
+import net.liftweb.json
+import json.DefaultFormats
+import json.JsonAST.JValue
+import org.joda.time.DateTime
+import lib.{Backend, Event}
+
+
+object SNS extends Controller {
+
+ def receive() = Action { request =>
+ implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
+
+ request.body.asText map { text =>
+ val notification = json.parse(text).extract[SNSNotification]
+ notification.Type match {
+ case "Notification" => {
+ val pageViews = (json.parse(notification.Message)).extract[PageViews]
+ val events = pageViews.views map { view => Event(
+ ip = view.clientIp.getOrElse("-"),
+ dt = view.dt,
+ url = view.url,
+ method = "GET",
+ responseCode = 200,
+ referrer = view.documentReferrer,
+ userAgent = view.userAgent.getOrElse("-"),
+ geo = "-"
+ )}
+ for {
+ e <- events.filterNot(_.isSelfRefresh)
+ actor <- Backend.eventProcessors
+ } actor ! e
+
+ Ok("")
+ }
+ case "SubscriptionConfirmation" => {
+ WS.url("https://sns.eu-west-1.amazonaws.com/").withQueryString(
+ "Action" -> "ConfirmSubscription",
+ "TopicArn" -> notification.TopicArn,
+ "Token" -> notification.Token.get
+ ).get()
+ Ok("")
+ }
+ case _ => BadRequest("Unknown type found")
+ }
+ } getOrElse {
+ BadRequest("Expected some JSON")
+ }
+ }
+
+ case class PageViews(views: List[PageView])
+ case class PageView(
+ v: String,
+ dt: DateTime,
+ url: String,
+ documentReferrer: Option[String],
+
+ browserId: BrowserId,
+ userAgent: Option[String],
+ clientIp: Option[String],
+
+ // if this was a navigation, the following will be filled in
+ previousPage: Option[String],
+ previousPageSelector: Option[String],
+ previousPageElemHash: Option[String])
+ case class BrowserId(id: String)
+
+ case class SNSNotification(Message: String, TopicArn: String, Type: String, Token: Option[String])
+}
View
6 app/lib/Backend.scala
@@ -13,8 +13,8 @@ import ops._
object Backend {
implicit val system = ActorSystem("liveDashboard")
- val listener = system.actorOf(Props[ClickStreamActor], name = "clickStreamListener")
val calculator = system.actorOf(Props[Calculator], name = "calculator")
+ val listener = system.actorOf(Props[ClickStreamActor], name = "clickStreamListener")
val searchTerms = system.actorOf(Props[SearchTermActor], name = "searchTermProcessor")
val latestContent = new LatestContent
@@ -22,7 +22,9 @@ object Backend {
val ukFrontLinkTracker = new LinkTracker("http://www.guardian.co.uk")
val usFrontLinkTracker = new LinkTracker("http://www.guardiannews.com")
- val mqReader = new MqReader(listener :: searchTerms :: Nil)
+ val eventProcessors = listener :: searchTerms :: Nil
+
+ val mqReader = new MqReader(eventProcessors)
def start() {
system.scheduler.schedule(1 minute, 1 minute, listener, ClickStreamActor.TruncateClickStream)
View
2  app/lib/Event.scala
@@ -19,6 +19,8 @@ case class Event(
lazy val asLogString = """%15s [%s] "%s %s" %d "%s" "%s" "%s"""".format(
ip, Event.dateFormat.print(dt), method, url, responseCode, referrer.getOrElse("-"), userAgent, geo
)
+
+ def isSelfRefresh = referrer.exists(_.startsWith("http://www.guardian.co.uk" + path))
}
View
6 app/lib/MqReader.scala
@@ -41,7 +41,7 @@ class MqReader(consumers: List[ActorRef]) {
// only interested in "GET"'s
.filter { _.method == "GET" }
// remove "self refreshes"
- .filterNot { isSelfRefresh }
+ .filterNot { _.isSelfRefresh }
// remove common filter
.filterNot { e =>
e.path.endsWith(".ico") || e.path.endsWith(".xml") || e.path.endsWith(".swf") ||
@@ -58,8 +58,4 @@ class MqReader(consumers: List[ActorRef]) {
logger.info("Stopped!")
}
-
-
- def isSelfRefresh(e: Event) = e.referrer.exists(_.startsWith("http://www.guardian.co.uk" + e.path))
-
}
View
2  conf/routes
@@ -21,6 +21,8 @@ GET /api/counts controllers.Api.counts(callback: Option[Stri
GET /api/search controllers.Api.search(callback: Option[String], since: Long ?= 0)
GET /api/content controllers.Api.content(callback: Option[String], since: Long ?= 0)
+POST /incoming/sns controllers.SNS.receive()
+
# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.at(path="/public", file)
View
7 project/Build.scala
@@ -11,11 +11,12 @@ object ApplicationBuild extends Build {
"org.zeromq" %% "zeromq-scala-binding" % "0.0.3",
"org.scala-tools.time" %% "time" % "0.5",
"com.gu.openplatform" %% "content-api-client" % "1.13",
- "com.typesafe.akka" % "akka-agent" % "2.0-RC1",
+ "com.typesafe.akka" % "akka-agent" % "2.0",
"org.joda" % "joda-convert" % "1.1" % "provided",
"org.jsoup" % "jsoup" % "1.6.1",
- "net.liftweb" %% "lift-json" % "2.4-M4",
- "net.liftweb" %% "lift-json-ext" % "2.4-M4",
+ "net.liftweb" %% "lift-json" % "2.4",
+ "net.liftweb" %% "lift-json-ext" % "2.4",
+ "com.amazonaws" % "aws-java-sdk" % "1.3.4",
"org.specs2" %% "specs2" % "1.6.1" % "test"
)
View
2  project/plugins.sbt
@@ -4,5 +4,5 @@ resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
)
-addSbtPlugin("play" % "sbt-plugin" % "2.0-RC3")
+addSbtPlugin("play" % "sbt-plugin" % "2.0")
Please sign in to comment.
Something went wrong with that request. Please try again.