Add Get{OrCreate,OrFail,AndUpdate}Users
fedragon committed Apr 4, 2014
0 parents commit 707141a
Showing 7 changed files with 225 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
target/*
project/project/*
project/target/*
*.sublime-project
97 changes: 97 additions & 0 deletions README.md
@@ -0,0 +1,97 @@
# Spark Jobserver Experiments

Experiments with [Apache Spark](http://spark.apache.org) and [Ooyala's Spark Job Server](https://github.com/ooyala/spark-jobserver).

## Reasons to use Ooyala's Spark Job Server

* Allows you to share Spark Contexts between jobs (!!);
* Provides a RESTful API to manage jobs, contexts and jars.

## Goal

Let's find out the Top 5 Stack Overflow users (by sheer reputation!)

This example contains three implementations of `spark.jobserver.SparkJob`. Each one retrieves the top 5 users from the `users` RDD, and together they demonstrate the three operations provided by `spark.jobserver.NamedRddSupport` (a condensed sketch of these operations follows the list):

* GetOrCreateUsers: tries to get the RDD, **or creates it** if it doesn't exist;
* GetOrFailUsers: tries to get the RDD, **or throws an exception** if it doesn't exist;
* GetAndUpdateUsers: tries to get the RDD **and updates it** to only include users who signed up in the last 100 days, then returns the top 5 of those; throws an exception if the RDD doesn't exist.
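
In code, the three operations look roughly like this (a sketch distilled from the implementations in `UsersSparkJobs.scala`, shown in full further below; it is not a complete job by itself):

    // inside a SparkJob that mixes in NamedRddSupport:

    // get: returns None if no RDD is cached under that name
    val cached: Option[RDD[(Reputation, User)]] = namedRdds.get("users")

    // getOrElseCreate: returns the cached RDD, or builds and caches it
    val users = namedRdds.getOrElseCreate("users", build(sc))

    // update: replaces the cached RDD and returns the updated one
    val newcomers = namedRdds.update("users", users.filter {
      case (_, user) => user.activityDays < 100
    })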

## Prerequisites

### Download Stack Overflow's users file

Download `stackoverflow.com-Users.7z` from the [Stack Exchange Data Dump](https://archive.org/details/stackexchange), uncompress it, and put the resulting `stackoverflow.com-Users` file in `/tmp` (the jobs read it from `/tmp/stackoverflow.com-Users`).

### Clone Ooyala's Spark Job Server

    $ git clone https://github.com/ooyala/spark-jobserver
    $ cd spark-jobserver

#### Publish it to your local repository and run it

    $ sbt publish-local
    $ sbt
    > re-start

This will start a background server process which will run until you close sbt.

### Clone this project and package it

    $ git clone https://github.com/fedragon/spark-jobserver-example
    $ sbt package

## Get your hands dirty with Ooyala's Spark Job Server

### Deploy our jar

    curl --data-binary @target/scala-2.10/spark-jobserver-examples_2.10-1.0.0.jar localhost:8090/jars/sparking

    curl 'localhost:8090/jars'

### Create the context that will be shared

    curl -X POST 'localhost:8090/contexts/users-context'

    curl 'localhost:8090/contexts'
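
If the context was created successfully, the second command should list it. Illustratively, the response is a JSON array along these lines (exact formatting may vary by job server version):

    ["users-context"]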

### Run our jobs

#### 0. How to check job status/response

To check a single job you can use:

    curl 'localhost:8090/jobs/<job-id>'

The actual `job-id` can be found in the response you get when you run the job (see below).

Alternatively, you can list all the jobs using:

    curl 'localhost:8090/jobs'
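
For illustration, the response to the POST that starts a job already contains the `jobId`, roughly in this shape (exact fields vary across job server versions, and the id below is a made-up placeholder):

    {
      "status": "STARTED",
      "result": {
        "jobId": "5453779a-f004-45fc-a11d-000000000000",
        "context": "users-context"
      }
    }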


#### 1. GetOrFailUsers

    curl -X POST 'localhost:8090/jobs?appName=sparking&classPath=sparking.jobserver.GetOrFailUsers&context=users-context'

Check the job status/response as described above: if you are following this README and run this job before any other, it will fail (as intended) because the `users` RDD does not exist yet.

#### 2. GetOrCreateUsers

    curl -X POST 'localhost:8090/jobs?appName=sparking&classPath=sparking.jobserver.GetOrCreateUsers&context=users-context'

Check the job status/response as described above: once the job completes, the response will contain the top 5 users.

**Note:** now that the `users` RDD has been created and cached, you can re-run GetOrFailUsers and see it complete successfully (and fast)!

#### 3. GetAndUpdateUsers

    curl -X POST 'localhost:8090/jobs?appName=sparking&classPath=sparking.jobserver.GetAndUpdateUsers&context=users-context'

Check the job status/response as described above: once the job completes, it will return the top 5 users among those who signed up in the last 100 days.

### Check jobs' completion times

    curl 'localhost:8090/jobs'

You should now see a big difference between the completion time of the first job (the one that actually created the `users` RDD) and the times of the jobs that reused it.
24 changes: 24 additions & 0 deletions build.sbt
@@ -0,0 +1,24 @@
name := "spark-jobserver-examples"

version := "1.0.0"

scalacOptions ++= Seq("-deprecation")

organization := "com.github.fedragon.sparking.jobserver"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "joda-time" % "joda-time" % "2.3",
  "org.joda" % "joda-convert" % "1.2",
  ("org.apache.spark" %% "spark-core" % "0.9.0-incubating").
    exclude("org.mortbay.jetty", "servlet-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-collections", "commons-collections").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("junit", "junit").
    exclude("org.slf4j", "log4j12"),
  "ooyala.cnd" % "job-server" % "0.3.0"
)
21 changes: 21 additions & 0 deletions src/main/scala/sparking/jobserver/User.scala
@@ -0,0 +1,21 @@
package sparking
package jobserver

import org.joda.time.{Days, LocalDate}
import org.joda.time.format.DateTimeFormat

// `activityDays`: days elapsed since the account was created
case class User(displayName: String, reputation: Int, activityDays: Int)

object User {

  // This is never going to be the best regex ever seen, but it's good enough for this example.
  val Regex = """^.*Reputation="([0-9]+)" CreationDate="([0-9]{4}-[0-9]{2}-[0-9]{2})T.*" DisplayName="(.+)" LastAccess.*$""".r
  val DateTimeFormatter = DateTimeFormat.forPattern("YYYY-MM-dd")

  def fromRow(row: String): Option[User] = row match {
    case Regex(reputation, creationDate, displayName) =>
      val activity = Days.daysBetween(LocalDate.parse(creationDate, DateTimeFormatter), LocalDate.now)
      Some(User(displayName, reputation.toInt, activity.getDays))
    case _ => None
  }
}
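
For illustration, here is how `fromRow` behaves on a row shaped like those in the users dump (the sample row below is hypothetical, trimmed to the attributes the regex needs):

    val row = """<row Id="42" Reputation="1234" CreationDate="2014-01-15T10:00:00.000" DisplayName="Jane Doe" LastAccessDate="2014-04-01T08:30:00.000" />"""

    User.fromRow(row)             // Some(User("Jane Doe", 1234, <days since 2014-01-15>))
    User.fromRow("not a valid row")  // None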
20 changes: 20 additions & 0 deletions src/main/scala/sparking/jobserver/UsersRDDBuilder.scala
@@ -0,0 +1,20 @@
package sparking
package jobserver

import org.apache.spark.SparkContext, SparkContext._
import org.apache.spark.rdd.RDD

trait UsersRDDBuilder {

  val inputPath = "/tmp/stackoverflow.com-Users"

  // Parses the users file into (reputation, user) pairs,
  // sorted by reputation in descending order.
  def build(sc: SparkContext): RDD[(Reputation, User)] = {
    sc.textFile(inputPath).
      map(User.fromRow).
      collect {
        case Some(user) => user.reputation -> user
      }.
      sortByKey(ascending = false)
  }
}
52 changes: 52 additions & 0 deletions src/main/scala/sparking/jobserver/UsersSparkJobs.scala
@@ -0,0 +1,52 @@
package sparking
package jobserver

import org.apache.spark.SparkContext, SparkContext._
import org.apache.spark.rdd.RDD

import spark.jobserver._
import com.typesafe.config.Config

trait UsersSparkJob extends SparkJob with UsersRDDBuilder with NamedRddSupport {

  val rddName = "users"

  def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid
}

object GetOrCreateUsers extends UsersSparkJob {

  override def runJob(sc: SparkContext, config: Config) = {
    // Returns the cached RDD if it exists, otherwise builds and caches it
    val users: RDD[(Reputation, User)] = namedRdds.getOrElseCreate(
      rddName,
      build(sc))

    users.take(5)
  }
}

object GetOrFailUsers extends UsersSparkJob {

  override def runJob(sc: SparkContext, config: Config) = {
    // Returns None if no RDD was ever cached under rddName in this context
    val users: Option[RDD[(Reputation, User)]] = namedRdds.get(rddName)

    users.map(_.take(5)).getOrElse(throw new IllegalStateException(s"RDD [$rddName] does not exist!"))
  }
}

object GetAndUpdateUsers extends UsersSparkJob {

  override def runJob(sc: SparkContext, config: Config) = {
    val users: Option[RDD[(Reputation, User)]] = namedRdds.get(rddName)

    users.map { rdd =>
      // Replace the cached RDD with one containing only users
      // who signed up in the last 100 days
      val newcomers = namedRdds.update(
        rddName,
        rdd.filter {
          case (_, user) => user.activityDays < 100
        })

      newcomers.take(5)
    }.getOrElse(throw new IllegalStateException(s"RDD [$rddName] does not exist"))
  }
}
7 changes: 7 additions & 0 deletions src/main/scala/sparking/jobserver/package.scala
@@ -0,0 +1,7 @@
package sparking

package object jobserver {

  type Reputation = Int
}
