# Test environment setup

In [1]:
import org.apache.spark.sql.SparkSession

/** Minimal Spark SQL smoke test: loads `jsonSample.json`, registers it as the temp view
  * `Cars1` and prints every row of `select * from Cars1`.
  */
object sparkSqlDemo {
    val sparkSession = SparkSession.builder()
        .master("local[1]")
        .appName("spark session example")
        .getOrCreate()

    // Explicit `: Unit =` rather than the deprecated procedure syntax `def main(...) {`.
    def main(args: Array[String]): Unit = {
        val input = sparkSession.read.json("jsonSample.json")
        input.createOrReplaceTempView("Cars1")
        val result = sparkSession.sql("select * from Cars1")
        result.show()
    }
}

sparkSqlDemo.main(Array())

defined object sparkSqlDemo


+---+-------+
|age|   name|
+---+-------+
| 30|   John|
| 28|    Tom|
| 35|    Jim|
| 40|  Randy|
| 56|  Bryan|
| 22|Shannon|
| 33|   Rick|
+---+-------+



# lending club

In [1]:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

In [9]:
/** Unified loan record produced by both `readLoanData` (accepted loans) and
  * `readRejectionData` (rejected applications), so the two Datasets can be combined with
  * `unionByName` in `loanInfoAggregator`.
  *
  * Values stay as optional strings as read from the CSV files; `loanInfoAggregator` applies
  * `avg` directly to these string columns, so Spark has to cast them when aggregating.
  */
case class LoanType (
                      loan_amnt: Option[String],
                      term: Option[String],         // null for rejected applications
                      int_rate: Option[String],     // null for rejected applications
                      installment: Option[String],  // null for rejected applications
                      home_ownership: Option[String], // null for rejected applications
                      annual_inc: Option[String],   // null for rejected applications
                      // NOTE(review): readLoanData fills DTI with a computed number
                      // (installment / monthly income) while readRejectionData copies the raw
                      // "Debt-To-Income Ratio" text; units/format may differ — confirm.
                      DTI: Option[String],
                      addr_state: Option[String],
                      emp_length: Option[String],
                      title: Option[String],
                      // 1 when tot_coll_amt != "0" for loans; always 0 for rejections.
                      has_collection: Option[Int]
                    )

defined class LoanType


In [24]:
/** Reads the accepted-loan CSV at `inputPath` (with header), keeps loans that are not
  * "Fully Paid", and projects them onto the [[LoanType]] schema.
  *
  * Derived columns:
  *  - `has_collection`: 1 when `tot_coll_amt` is not "0", otherwise 0 (a null amount also gives 0)
  *  - `DTI`: `installment / (annual_inc / 12)`
  *
  * @param inputPath path of the LoanStats CSV file
  * @param spark     active session
  * @return the filtered loans as a `Dataset[LoanType]`
  */
def readLoanData(inputPath: String, spark: SparkSession): Dataset[LoanType] = {
    import spark.implicits._

    val rawData = spark.read.option("header", "true").csv(inputPath)

    // println, not printf: the message is already formatted, a '%' in the path would break
    // printf, and without a newline consecutive messages ran together.
    println(s"reading data from $inputPath")

    // Only loans that are not fully paid are of interest.
    val notFullyPaidDf = rawData.filter($"loan_status" =!= "Fully Paid")

    val fields = List("loan_amnt", "term", "int_rate", "installment", "home_ownership",
      "annual_inc", "emp_length", "title", "addr_state", "loan_status", "tot_coll_amt").map(col)

    notFullyPaidDf.select(fields: _*)
      .withColumn("has_collection", when($"tot_coll_amt" =!= "0", 1).otherwise(0))
      .withColumn("DTI", $"installment" / ($"annual_inc" / 12))
      // Helper columns only needed to derive the fields above.
      .drop("loan_status", "tot_coll_amt")
      .as[LoanType]
}

readLoanData: (inputPath: String, spark: org.apache.spark.sql.SparkSession)org.apache.spark.sql.Dataset[LoanType]


In [25]:
/** Reads the rejected-application CSV at `inputPath` (with header) and maps it onto the
  * [[LoanType]] schema so it can be unioned with `readLoanData`'s output.
  *
  * Attributes that only accepted loans have are filled with String-typed nulls, and
  * `has_collection` is always 0.
  *
  * @param inputPath path of the RejectStats CSV file
  * @param spark     active session
  * @return the rejections as a `Dataset[LoanType]`
  */
def readRejectionData(inputPath: String, spark: SparkSession): Dataset[LoanType] = {
    import spark.implicits._

    val rawData = spark.read.option("header", "true").csv(inputPath)

    // println, not printf: the message is already formatted and needs a trailing newline.
    println(s"reading data from $inputPath")

    // Rejection-file header -> LoanType field name.
    val renames = List(
      "Amount Requested"     -> "loan_amnt",
      "Loan Title"           -> "title",
      "Debt-To-Income Ratio" -> "DTI",
      "State"                -> "addr_state",
      "Employment Length"    -> "emp_length")

    val selectedDf = rawData.select(renames.map { case (source, _) => col(source) }: _*)
    val renamedDf = renames.foldLeft(selectedDf) { case (df, (source, target)) =>
      df.withColumnRenamed(source, target)
    }

    // lit(null) alone yields a NullType column; cast so these match the String columns that
    // readLoanData produces for the same fields.
    val absentFields = List("term", "int_rate", "installment", "home_ownership", "annual_inc")
    absentFields
      .foldLeft(renamedDf) { (df, name) => df.withColumn(name, lit(null).cast(StringType)) }
      .withColumn("has_collection", lit(0))
      .as[LoanType]
}

readRejectionData: (inputPath: String, spark: org.apache.spark.sql.SparkSession)org.apache.spark.sql.Dataset[LoanType]


In [22]:
/** Combines rejections and loans (matched by column name) and summarises them per
  * (term, home_ownership, addr_state, title, emp_length) group.
  *
  * @return one row per group with averages of amount, rate, income, DTI and installment,
  *         plus the number of records flagged `has_collection`
  */
def loanInfoAggregator(rejectionDs: Dataset[LoanType], loanDs: Dataset[LoanType], spark: SparkSession): DataFrame = {
    import spark.implicits._

    val groupKeys = Seq("term", "home_ownership", "addr_state", "title", "emp_length")

    val combined = rejectionDs.unionByName(loanDs)

    combined
      .groupBy(groupKeys.head, groupKeys.tail: _*)
      .agg(
        avg($"loan_amnt").as("avg_loan_amnt"),
        avg($"int_rate").as("avg_int_rate"),
        avg($"annual_inc").as("avg_annual_inc"),
        avg($"DTI").as("avg_DTI"),
        sum($"has_collection").as("sum_has_collection"),
        avg($"installment").as("avg_installment"))
}

loanInfoAggregator: (rejectionDs: org.apache.spark.sql.Dataset[LoanType], loanDs: org.apache.spark.sql.Dataset[LoanType], spark: org.apache.spark.sql.SparkSession)org.apache.spark.sql.DataFrame


In [26]:
/** Writes `outputDataframe` as JSON under `outputPath`, first collapsed to one partition.
  *
  * NOTE(review): no save mode is set, so the writer's default applies (it refuses to write
  * into an existing `outputPath`).
  */
def writeLoanAggregatedData(outputDataframe: DataFrame, outputPath: String): Unit = {
    val singlePartition = outputDataframe.repartition(1)
    singlePartition.write.json(outputPath)
}

writeLoanAggregatedData: (outputDataframe: org.apache.spark.sql.DataFrame, outputPath: String)Unit


In [31]:
// `master` is a method of SparkSession.Builder, not of the SparkSession companion object,
// so builder() has to come first.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Loan-analyze")
  .getOrCreate()

// Local inputs and output directory for this run.
val loanInputPath = "/Users/jenny/Desktop/a/LoanStats_2019Q1.csv"
val rejectionInputPath = "/Users/jenny/Desktop/a/RejectStats_2019Q1.csv"
val outputPath = "/Users/jenny/Desktop/b"

val loanDs = readLoanData(loanInputPath, spark)

val rejectionDs = readRejectionData(rejectionInputPath, spark)

val aggregatedDf = loanInfoAggregator(rejectionDs, loanDs, spark)

writeLoanAggregatedData(aggregatedDf, outputPath)

reading data from /Users/jenny/Desktop/a/LoanStats_2019Q1.csvreading data from /Users/jenny/Desktop/a/RejectStats_2019Q1.csv

spark = org.apache.spark.sql.SparkSession@b8c2568
loanInputPath = /Users/jenny/Desktop/a/LoanStats_2019Q1.csv
rejectionInputPath = /Users/jenny/Desktop/a/RejectStats_2019Q1.csv
outputPath = /Users/jenny/Desktop/b
loanDs = [loan_amnt: string, term: string ... 9 more fields]
rejectionDs = [loan_amnt: string, title: string ... 9 more fields]
aggregatedDf = [term: string, home_ownership: string ... 9 more fields]


[term: string, home_ownership: string ... 9 more fields]

# city bike

- Unique users:

The date on which each user first used bike share.

Updated daily; the unique-user set keeps growing as new users appear each day.

- Output:

[folder] average usage duration for the day

[1-day-retention folder] user_id

[3-day-retention folder] user_id

[7-day-retention folder] user_id

In [6]:
import spark.implicits._
import org.apache.spark.sql.functions._

// Load the raw trips and rename columns to the names the cohort jobs expect.
// NOTE(review): bike_number is used as the user id — confirm this is the intended key.
val bike = spark.read.json("/Users/jenny/Downloads/bike-bike-trips.json")
val user = bike
  .withColumnRenamed("bike_number", "user_id")
  .withColumnRenamed("start_date", "start_timestamp")

// Derive a calendar date and write one JSON directory per start_date value.
val userDate = user.withColumn("start_date", to_date($"start_timestamp", "yyyy-MM-dd"))
userDate.write
  .partitionBy("start_date")
  .json("/Users/jenny/Desktop/bike_data")

bike = [bike_number: string, bike_share_for_all_trip: string ... 19 more fields]
user = [user_id: string, bike_share_for_all_trip: string ... 19 more fields]
userDate = [user_id: string, bike_share_for_all_trip: string ... 20 more fields]


[user_id: string, bike_share_for_all_trip: string ... 20 more fields]

In [2]:
%AddDeps org.rogach scallop_2.11 3.4.0 
%AddDeps org.scalatest scalatest_2.11 3.0.1 

Marking org.rogach:scallop_2.11:3.4.0 for download
Obtained 2 files
Marking org.scalatest:scalatest_2.11:3.0.1 for download
Obtained 2 files


In [3]:
// process the command line
import org.rogach.scallop.{ScallopConf, ScallopOption}  //load pack at the same cell
/** Command-line options shared by the bike-share cohort jobs (UserProcess, BikeShareProcess,
  * RetentionProcess).
  *
  * The data-path defaults depend on `--env` (test / stage / prod); an unrecognised env makes
  * those defaults throw when they are evaluated. `--process.date` is the only required option.
  */
class CohortConf(args: Seq[String]) extends ScallopConf(args) with Serializable {

  /** Config file listing the columns each source keeps. */
  val selectColumnsConfigFile: ScallopOption[String] =
    opt[String]("select.columns.config",
      descr = "name of columns that you want to select",
      required = false,
      default = Option("select-columns"))

  val bikeTripKey: ScallopOption[String] =
    opt[String]("bike.trip.key",
      descr = "bike trip path key",
      required = false,
      default = Option("bike-trips"))

  /** Deployment environment; selects the env-specific path defaults below. */
  val env: ScallopOption[String] =
    opt[String]("env",
      descr = "env name that job is running on, test, stage, prod",
      required = false,
      default = Option("stage"))

  val inputBikeSharePath: ScallopOption[String] =
    opt[String]("input.bike.path",
      descr = "input data path for bike share data",
      required = false,
      default = envDefault(
        test  = "/Users/jenny/Desktop/bike_data",
        stage = "/Users/jenny/Desktop/bike_data",
        prod  = "/Users/jenny/Desktop/bike_data"))

  val inputMetaDataPath: ScallopOption[String] =
    opt[String]("input.meta.path",
      descr = "input meta data parent path",
      required = false,
      default = envDefault(
        test  = "/Users/jenny/Desktop/meta",
        stage = "/Users/jenny/Desktop/meta",
        prod  = "/Users/jenny/Desktop/meta"))

  val outputDataPath: ScallopOption[String] =
    opt[String]("output.data.path",
      descr = "output data parent path",
      required = false,
      default = envDefault(
        test  = "/Users/jenny/Desktop/output",
        stage = "/Users/jenny/Desktop/output",
        prod  = "/Users/jenny/Desktop/output"))

  /** Partition column name used in `<parent>/<prefix>=<date>/` paths. */
  val datePrefix: ScallopOption[String] =
    opt[String]("date.prefix",
      descr = "date prefix for path",
      required = false,
      default = Option("start_date"))

  /** Date being processed (yyyy-MM-dd). */
  val processDate: ScallopOption[String] =
    opt[String]("process.date",
      descr = "date to string" +
        ", in YYYY-MM-DD format",
      required = true)

  val uniqueUserPath: ScallopOption[String] =
    opt[String]("unique.user.path",
      descr = "path to save unique user id and start date info",
      required = false,
      default = Option("/Users/jenny/Desktop/unique-user"))

  /** Retention window in days; the readers/writers accept 1, 3 or 7. */
  val dayAgo: ScallopOption[Int] =
    opt[Int]("day.ago",
      descr = "how many day ago you are going to overwrite",
      required = false,
      default = Option(1))

  verify()

  /** Returns the default matching `env()`; any other env name throws. */
  private def envDefault(test: String, stage: String, prod: String): Option[String] =
    env() match {
      case "test"  => Option(test)
      case "stage" => Option(stage)
      case "prod"  => Option(prod)
      case other   =>
        throw new Exception(s"env error, env name can either be test, stage, prod \ncannot be $other")
    }
}

defined class CohortConf


In [6]:
// choose the columns 
import org.apache.spark.sql.DataFrame
import com.typesafe.config.ConfigFactory
import java.io.File

import scala.collection.JavaConversions._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

/** Shared helpers for the bike-share cohort jobs: config-driven column selection,
  * date-partitioned path building and process-date arithmetic.
  */
object Utils extends Logging {

  // Pattern of --process.date and of the strings returned by dayAgoDateString.
  // Joda-Time formatters are immutable and thread-safe, so one instance is shared.
  private val DateFormat: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")

  /** Keeps only the columns listed under `sourceKey` in the select-columns config file.
    *
    * If the list cannot be read, getListFromConf logs an error and returns an empty list, and
    * the result then has no columns.
    */
  def selectColumns(conf: CohortConf, sourceKey: String, inputDf: DataFrame): DataFrame = {
    val fields = getListFromConf(conf.selectColumnsConfigFile(), sourceKey).map(col)
    inputDf.select(fields: _*)
  }

  /** Reads `confKey` as a list of strings from the Typesafe Config file `configFileName`.
    *
    * @return the configured values, or an empty list (after logging an error) when the
    *         file or key cannot be parsed
    */
  def getListFromConf(configFileName: String, confKey: String): List[String] = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._
    try {
      ConfigFactory
        .load(ConfigFactory.parseFile(new File(configFileName)))
        .getStringList(confKey)
        .asScala
        .toList
    } catch {
      case e: Exception =>
        // Pass the exception so the stack trace is logged, not just its message.
        logError(s"*** Error parsing for $confKey as List[String] from $configFileName ***\n${e.getMessage}", e)
        List[String]()
    }
  }

  /** Builds the partition directory `<inputParentPath>/<datePrefix>=<processDate>/`. */
  def pathGenerator(inputParentPath: String, datePrefix: String, processDate: String): String = {
    s"$inputParentPath/$datePrefix=$processDate/"
  }

  /** Returns the date `dayAgo` days before `--process.date`, formatted as yyyy-MM-dd. */
  def dayAgoDateString(conf: CohortConf, dayAgo: Int): String = {
    // A LocalDate has no time zone, so there is no midnight instant that a DST transition
    // could make invalid (DateTime.parse could throw on such dates).
    val processDate = org.joda.time.LocalDate.parse(conf.processDate(), DateFormat)
    DateFormat.print(processDate.minusDays(dayAgo))
  }
}


defined object Utils


In [7]:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DoubleType, StringType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

/** Reads bike-share trip data (for the process date and for the retention "day ago"
  * inputs) and builds the retention input/output paths.
  */
trait BikeShareTripReader extends Logging{

  import org.apache.spark.sql.types.DataType

  /** Reads the trips partition for `--process.date` and keeps the `bike.share.trip` columns.
    *
    * If the partition cannot be read, an empty frame with the expected typed columns is used.
    */
  def readBikeShareTrip(conf: CohortConf, spark: SparkSession): DataFrame = {
    val path = Utils.pathGenerator(conf.inputBikeSharePath(), conf.datePrefix(), conf.processDate())

    logInfo("reading from %s".format(path))

    val bikeShareDf = readJsonOrEmpty(spark, path) {
      emptyTripFrame(spark, Seq[(String, DataType)](
        "user_id"          -> StringType,
        "subscriber_type"  -> StringType,
        "start_station_id" -> StringType,
        "end_station_id"   -> StringType,
        "zip_code"         -> StringType,
        "duration_sec"     -> DoubleType,
        "start_timestamp"  -> StringType))
    }
    Utils.selectColumns(conf, "bike.share.trip", bikeShareDf)
  }

  /** Reads the earlier data located by dayAgoReadDataOutPath, or an empty frame with the
    * aggregated-trip columns if it cannot be read.
    */
  def readDayAgoBikeShareTrip(conf: CohortConf, spark: SparkSession): DataFrame = {
    val path = dayAgoReadDataOutPath(conf)

    logInfo("reading from %s".format(path))

    readJsonOrEmpty(spark, path) {
      emptyTripFrame(spark, Seq[(String, DataType)](
        "user_id"          -> StringType,
        "subscriber_type"  -> StringType,
        "start_station_id" -> StringType,
        "end_station_id"   -> StringType,
        "zip_code"         -> StringType,
        "avg_duration_sec" -> DoubleType))
    }
  }

  /** Input path for `--day.ago`, dated `--day.ago` days before the process date: the daily
    * output for 1, the 1-day output for 3, the 3-day output for 7. Other values throw.
    */
  def dayAgoReadDataOutPath(conf: CohortConf): String = {
    val dateString = Utils.dayAgoDateString(conf, conf.dayAgo())

    val path : String = conf.dayAgo() match {
      case 1 => Utils.pathGenerator(conf.outputDataPath(), conf.datePrefix(), dateString)
      case 3 => Utils.pathGenerator(conf.outputDataPath()+"/1", conf.datePrefix(), dateString)
      case 7 => Utils.pathGenerator(conf.outputDataPath()+"/3", conf.datePrefix(), dateString)
      case _ => throw new Exception("input date is invalid")
    }
    path
  }

  /** Output path `<output>/<dayAgo>/...` for `--day.ago` (1, 3 or 7), dated `--day.ago`
    * days before the process date. Other values throw.
    */
  def dayAgoWriteDataOutPath(conf: CohortConf): String = {
    val dateString = Utils.dayAgoDateString(conf, conf.dayAgo())

    val path : String = conf.dayAgo() match {
      case 1 => Utils.pathGenerator(conf.outputDataPath()+"/1", conf.datePrefix(), dateString)
      case 3 => Utils.pathGenerator(conf.outputDataPath()+"/3", conf.datePrefix(), dateString)
      case 7 => Utils.pathGenerator(conf.outputDataPath()+"/7", conf.datePrefix(), dateString)
      case _ => throw new Exception("input date is invalid")
    }
    path
  }

  /** Reads JSON at `path`; if the read throws, logs a warning and returns `fallback`. */
  private def readJsonOrEmpty(spark: SparkSession, path: String)(fallback: => DataFrame): DataFrame =
    try spark.read.json(path)
    catch {
      case e: Exception =>
        logWarning(s"could not read $path, falling back to an empty DataFrame", e)
        fallback
    }

  /** An empty DataFrame whose columns are null literals cast to the given types (a bare
    * lit(null) would produce NullType columns).
    */
  private def emptyTripFrame(spark: SparkSession, columns: Seq[(String, DataType)]): DataFrame =
    columns.foldLeft(spark.emptyDataFrame) { case (df, (name, dataType)) =>
      df.withColumn(name, lit(null).cast(dataType))
    }
}

defined trait BikeShareTripReader


In [8]:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Reads bike-station metadata. */
trait BikeStationInfoReader extends Logging {

  /** Loads `<input.meta.path>/bike-station-info` and keeps the `bike.station.info` columns. */
  def readBikeStation(conf: CohortConf, spark: SparkSession): DataFrame = {
    val stationPath = "%s/bike-station-info".format(conf.inputMetaDataPath())
    logInfo("reading from %s".format(stationPath))
    Utils.selectColumns(conf, "bike.station.info", spark.read.json(stationPath))
  }
}


defined trait BikeStationInfoReader


In [24]:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Reads the cumulative unique-user list written by UserProcess. */
trait UserReader extends Logging{

  /** Reads the unique-user partition for `date` under `--unique.user.path`.
    *
    * If it cannot be read (for example, there is no list yet), a warning is logged and an
    * empty frame with String columns `user_id` and `first_timestamp` is returned.
    */
  def readUserInfo(conf: CohortConf, spark: SparkSession, date: String): DataFrame = {
    val inputPath = Utils.pathGenerator(conf.uniqueUserPath(), conf.datePrefix(), date)

    logInfo("reading from %s".format(inputPath))

    try {
      spark.read.json(inputPath)
    } catch {
      case e: Exception =>
        logWarning(s"could not read unique users from $inputPath, using an empty list", e)
        // Typed nulls: a bare lit(null) would give NullType columns.
        spark.emptyDataFrame
          .withColumn("user_id", lit(null).cast(StringType))
          .withColumn("first_timestamp", lit(null).cast(StringType))
    }
  }

}


defined trait UserReader


In [42]:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/** Maintains the cumulative unique-user list: each user's earliest trip timestamp, written
  * to the `--process.date` partition under `--unique.user.path`.
  */
object UserProcess extends Logging with UserReader with BikeShareTripReader {

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("Unique-users")
    .getOrCreate()

  /** Entry point: parses the options and updates the unique-user list for `--process.date`. */
  def main(args: Array[String]): Unit = {
    val conf = new CohortConf(args)

    val inputPath = Utils.pathGenerator(conf.inputBikeSharePath(), conf.datePrefix(), conf.processDate())

    val outputUniqueUser = Utils.pathGenerator(conf.uniqueUserPath(), conf.datePrefix(), conf.processDate())

    uniqueUser(outputUniqueUser, inputPath, conf)
  }

  /** Merges the previous day's unique-user list with the process date's riders, keeping each
    * user's earliest timestamp, and overwrites `uniqueUsersPath` with the result.
    *
    * @param uniqueUsersPath    output directory for the merged list
    * @param inputBikeSharePath NOTE(review): not read here; trips are located by
    *                           readBikeShareTrip from `conf`
    * @param conf               parsed job options
    */
  def uniqueUser(uniqueUsersPath: String, inputBikeSharePath: String, conf: CohortConf): Unit = {

    val inputUniqueUsersDf = readUserInfo(conf, spark, Utils.dayAgoDateString(conf, 1))
    val inputBikeShareDf = readBikeShareTrip(conf, spark)

    // The process date's riders, with the trip start time as a candidate first use.
    val users = Utils.selectColumns(conf, "bike.unique.user", inputBikeShareDf)
      .withColumn("first_timestamp", col("start_timestamp"))
      .drop(col("start_timestamp"))

    // groupBy("user_id") already yields exactly one row per user, so no distinct() is needed.
    val uniqueUserDf = inputUniqueUsersDf.unionByName(users)
      .groupBy("user_id")
      .agg(min("first_timestamp").as("first_timestamp"))

    uniqueUserDf.coalesce(1).write.mode(SaveMode.Overwrite).json(uniqueUsersPath)
  }

}
UserProcess.main(Array("--process.date","2014-01-01","--select.columns.config","/Users/jenny/Desktop/conf/select-columns.conf"))

defined object UserProcess


In [43]:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/** Daily aggregation job: mean trip duration per (user, subscriber type, stations, zip). */
object BikeShareProcess extends Logging with BikeShareTripReader {

  /** Dimension columns the trips are grouped by (RetentionProcess reuses them). */
  val fields = List("user_id", "subscriber_type", "start_station_id", "end_station_id", "zip_code")

  /** Name of the aggregated mean-duration column. */
  val avgDurationSec = "avg_duration_sec"

  /** Entry point: parses the options and writes the aggregate for `--process.date`. */
  def main(args: Array[String]): Unit = {
    val conf = new CohortConf(args)
    val session = SparkSession.builder().master("local[*]").appName("Bike-share").getOrCreate()
    bikeShareAgg(session, conf, Utils.pathGenerator(conf.outputDataPath(), conf.datePrefix(), conf.processDate()))
  }

  /** Averages `duration_sec` per `fields` group and overwrites `outputPath` with a
    * single-partition JSON result.
    */
  def bikeShareAgg(spark: SparkSession, conf: CohortConf, outputPath: String): Unit = {
    val groupingColumns = fields.map(col)
    readBikeShareTrip(conf, spark)
      .groupBy(groupingColumns: _*)
      .agg(avg(col("duration_sec")).as(avgDurationSec))
      .coalesce(1)
      .write
      .mode(SaveMode.Overwrite)
      .json(outputPath)
  }

}
BikeShareProcess.main(Array("--process.date","2014-01-01","--select.columns.config","/Users/jenny/Desktop/conf/select-columns.conf"))

defined object BikeShareProcess


In [39]:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/** Retention job: marks which riders from `--day.ago` days before the process date came
  * back exactly `--day.ago` days after their first ride.
  */
object RetentionProcess extends Logging with UserReader with BikeShareTripReader {

  /** Entry point: parses the options, enables cross joins and runs retentionPrep. */
  def main(args: Array[String]): Unit = {
    val conf                  = new CohortConf(args)
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Bike-share")
      .getOrCreate()
    // NOTE(review): the reason for enabling cross joins is not visible here; confirm before removing.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    retentionPrep(spark, conf)
  }

  /** Tags the day-ago data with `user_age_days = --day.ago` for riders who rode on the
    * process date exactly that many days after their first use, then saves via
    * retentionAndSave. `--day.ago` values other than 1, 3 or 7 throw.
    */
  def retentionPrep(spark: SparkSession, conf: CohortConf): Unit = {
    val bikeShareDf = readBikeShareTrip(conf, spark)
    bikeShareDf.printSchema()
    val userDf = readUserInfo(conf, spark, conf.processDate())
    userDf.printSchema()
    val dayAgoBikeShareDf = readDayAgoBikeShareTrip(conf, spark)
    dayAgoBikeShareDf.printSchema()

    // Attach each rider's first-use timestamp. Drop the user-side id so that trips with no
    // match in the unique-user list keep their own user_id instead of a null.
    val joinedBikeSharedDf = bikeShareDf.join(userDf,
      bikeShareDf.col("user_id") === userDf.col("user_id"), "left")
      .drop(userDf.col("user_id"))

    // Days between the rider's first use and this trip.
    val bikeUserAgeDays = joinedBikeSharedDf
      .withColumn("user_age_days",
        datediff(to_date(col("start_timestamp")), to_date(col("first_timestamp"))))

    val dayAgo = conf.dayAgo()
    // Only 1-, 3- and 7-day retention windows are supported.
    if (!Set(1, 3, 7).contains(dayAgo)) throw new Exception("input date is invalid")

    val bikeFilteredAgoDf = bikeUserAgeDays
      .filter(col("user_age_days") === dayAgo)
      .select("user_id", "user_age_days")
      .distinct()

    val aggPrepDf = dayAgoBikeShareDf
      .join(bikeFilteredAgoDf, dayAgoBikeShareDf.col("user_id") === bikeFilteredAgoDf.col("user_id"), "left")
      .drop(bikeFilteredAgoDf.col("user_id"))

    // head(1) stops at the first row instead of counting the whole join result.
    if (!aggPrepDf.columns.contains("user_age_days") || aggPrepDf.head(1).isEmpty) {
      logInfo("didn't find anyone fit into %s day ago".format(dayAgo))
      retentionAndSave(aggPrepDf.withColumn("user_age_days", lit(0)), conf)
    } else {
      retentionAndSave(aggPrepDf, conf)
    }
  }

  /** Groups by the aggregate's dimensions plus `user_age_days`, derives 0/1 flags
    * `retention_1`, `retention_3` and `retention_7`, and overwrites the output path given by
    * dayAgoWriteDataOutPath.
    */
  def retentionAndSave(df: DataFrame, conf: CohortConf): Unit = {
    val groupbyFields = BikeShareProcess.fields :+ BikeShareProcess.avgDurationSec :+ "user_age_days"

    val bikeUserAggDf = df.groupBy(groupbyFields.map(col):_*)
      .agg(max(when(df.col("user_age_days") === 1, 1).otherwise(0)).alias("retention_1"),
        max(when(df.col("user_age_days") === 3, 1).otherwise(0)).alias("retention_3"),
        max(when(df.col("user_age_days") === 7, 1).otherwise(0)).alias("retention_7"))

    val outputPath = dayAgoWriteDataOutPath(conf)

    bikeUserAggDf.coalesce(1).write.mode(SaveMode.Overwrite).json(outputPath)
  }
}
RetentionProcess.main(Array("--process.date","2014-01-01","--select.columns.config","/Users/jenny/Desktop/conf/select-columns.conf"))

defined object RetentionProcess


# twitter streaming

In [1]:
%AddDeps org.twitter4j twitter4j-core 4.0.4
%AddDeps org.apache.bahir spark-streaming-twitter_2.11 2.3.3

Marking org.twitter4j:twitter4j-core:4.0.4 for download
Obtained 2 files
Marking org.apache.bahir:spark-streaming-twitter_2.11:2.3.3 for download
Obtained 2 files


Magic AddDeps failed to execute with error: 
requirement failed: Can only call getServletHandlers on a running MetricsSystem

In [2]:
import twitter4j.Status
import scala.io.{AnsiColor, Source}

/** Text helpers for the tweet sentiment stream: tokenising, filtering, scoring against word
  * lists, and console formatting of (tweet, score) pairs.
  */
object Utils {

  // Some type aliases to give a little bit of context
  type Tweet = Status
  type TweetText = String
  type Sentence = Seq[String]

  private def format(n: Int): String = f"$n%2d"

  private def wrapScore(s: String): String = s"[ $s ] "

  // Positive scores in green, negative in red, zero uncoloured.
  private def makeReadable(n: Int): String =
    if (n > 0)      s"${AnsiColor.GREEN + format(n) + AnsiColor.RESET}"
    else if (n < 0) s"${AnsiColor.RED   + format(n) + AnsiColor.RESET}"
    else            s"${format(n)}"

  // First line of the text, cut to 80 characters; "..." is always appended.
  private def makeReadable(s: String): String =
    s.takeWhile(_ != '\n').take(80) + "..."

  /** Formats a (tweet text, score) pair as `[ score ] text...` for console output. */
  def makeReadable(sn: (String, Int)): String =
    sn match {
      case (tweetText, score) => s"${wrapScore(makeReadable(score))}${makeReadable(tweetText)}"
    }

  /** Loads a word list: the set of lines of the file at `resourcePath`. */
  def load(resourcePath: String): Set[String] = {
    val source = Source.fromFile(resourcePath)
    // Close the file handle even if reading fails; toSet is strict, so every line has been
    // consumed before close() runs.
    try source.getLines().toSet
    finally source.close()
  }

  /** Splits on single spaces; runs of spaces yield empty tokens, which keepActualWords drops. */
  def wordsOf(tweet: TweetText): Sentence =
    tweet.split(" ")

  def toLowercase(sentence: Sentence): Sentence =
    sentence.map(_.toLowerCase)

  /** Keeps tokens made only of the letters a-z (so punctuation, digits and "" are dropped). */
  def keepActualWords(sentence: Sentence): Sentence =
    sentence.filter(_.matches("[a-z]+"))

  /** Lower-cases the tokens and then keeps the purely alphabetic ones. */
  def extractWords(sentence: Sentence): Sentence =
    keepActualWords(toLowercase(sentence))

  def keepMeaningfulWords(sentence: Sentence, uselessWords: Set[String]): Sentence =
    sentence.filterNot(word => uselessWords.contains(word))

  /** Sum of per-word scores: +1 per positive word, -1 per negative word. */
  def computeScore(words: Sentence, positiveWords: Set[String], negativeWords: Set[String]): Int =
    words.map(word => computeWordScore(word, positiveWords, negativeWords)).sum

  /** +1 if positive, otherwise -1 if negative, otherwise 0 (positive wins if in both lists). */
  def computeWordScore(word: String, positiveWords: Set[String], negativeWords: Set[String]): Int =
    if (positiveWords.contains(word)) 1
    else if (negativeWords.contains(word)) -1
    else 0

}

defined object Utils


In [3]:
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import twitter4j.Status

/** Streams English tweets, scores each one against positive/negative word lists, and prints
  * and saves the non-neutral (tweet, score) pairs every 10 seconds.
  *
  * Uses an explicit `main` instead of `extends App`: Spark documents that `scala.App`
  * subclasses may not work correctly, because fields initialised in the App body can be
  * uninitialised when closures that reference them run on executors.
  */
object TwitterSentimentScore {

  // The stream-processing helpers (wordsOf, computeScore, makeReadable, ...) come from Utils.
  import Utils._

  def main(args: Array[String]): Unit = {
    // new SparkConf() already picks up spark.* settings (e.g. a master set by spark-submit).
    // An explicit `spark.master` environment variable still wins; otherwise local[*] is used
    // only when no master has been configured at all, instead of overriding it.
    val baseConfiguration = new SparkConf().setAppName("spark-twitter-stream-example")
    val sparkConfiguration = sys.env.get("spark.master") match {
      case Some(master) => baseConfiguration.setMaster(master)
      case None         => baseConfiguration.setIfMissing("spark.master", "local[*]")
    }

    val sparkContext = new SparkContext(sparkConfiguration)

    // Wrap the context in a streaming one with a 10-second batch interval.
    val streamingContext = new StreamingContext(sparkContext, Seconds(10))

    // Tweets from Twitter, English only.
    val tweets: DStream[Status] =
      TwitterUtils.createStream(streamingContext, None).filter(_.getLang == "en")

    // The word lists are broadcast so every executor gets a local read-only copy. They are
    // locals of main, so closures capture the Broadcast handles rather than this object.
    val uselessWords = sparkContext.broadcast(load("/Users/jenny/Desktop/stream/stop-words.dat"))
    val positiveWords = sparkContext.broadcast(load("/Users/jenny/Desktop/stream/pos-words.dat"))
    val negativeWords = sparkContext.broadcast(load("/Users/jenny/Desktop/stream/neg-words.dat"))

    // Carry the original text along with its tokens so it can be printed at the end.
    val textAndSentences: DStream[(TweetText, Sentence)] =
      tweets.
        map(_.getText).
        map(tweetText => (tweetText, wordsOf(tweetText)))

    // Keep lower-cased alphabetic words that are not stop words; drop tweets with none left.
    val textAndMeaningfulSentences: DStream[(TweetText, Sentence)] =
      textAndSentences.
        mapValues(toLowercase).
        mapValues(keepActualWords).
        mapValues(words => keepMeaningfulWords(words, uselessWords.value)).
        filter { case (_, sentence) => sentence.length > 0 }

    // Score each tweet and keep only the non-neutral ones.
    val textAndNonNeutralScore: DStream[(TweetText, Int)] =
      textAndMeaningfulSentences.
        mapValues(sentence => computeScore(sentence, positiveWords.value, negativeWords.value)).
        filter { case (_, score) => score != 0 }

    textAndNonNeutralScore.map(makeReadable).print()
    // NOTE(review): saveAsTextFiles writes each element's string form, so despite the "json"
    // suffix these files do not contain JSON.
    textAndNonNeutralScore.saveAsTextFiles("tweets", "json")

    streamingContext.start()

    // Block until the stream is stopped (runs indefinitely otherwise).
    streamingContext.awaitTermination()
  }

}

defined object TwitterSentimentScore
