In [None]:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.Pipeline

// Load the pre-trained sentiment classifier from HDFS.
// Only the logistic-regression stage is loaded here; the "features" column it
// consumes is built separately for every micro-batch inside the foreachRDD block.
val modelUri = "hdfs:///user/client/tweets_model"
val model = LogisticRegressionModel.load(modelUri)

// Streaming source: read the "tweets" topic from Kafka as a direct stream,
// processed in 2-second micro-batches.
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
val topic = "tweets"
val brokers = "kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1025"
val props = Map[String, Object](
  // Connection and consumer-group identity
  "bootstrap.servers" -> brokers,
  "group.id" -> "consumer1",
  // Keys and values are both deserialized as plain strings
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  // Offset handling: auto-commit is disabled, reset policy is "earliest"
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> java.lang.Boolean.FALSE,
  // SASL/GSSAPI authentication over SSL
  "security.protocol" -> "SASL_SSL",
  "sasl.mechanism" -> "GSSAPI",
  "sasl.kerberos.service.name" -> "kafka",
  "ssl.truststore.location" -> "/mnt/mesos/sandbox/trust-ca.jks",
  // NOTE(review): truststore password is kept inline in the notebook — confirm this is acceptable
  "ssl.truststore.password" -> "changeit"
)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array(topic), props)
)
// Keep only the payload of each Kafka record; keys and metadata are dropped.
val lines = stream.map(record => record.value)

// Helpers used to (lightly) clean the text of a tweet before feature extraction.

// Matches @mentions and http(s) URLs.
val combinedPattern = """@[A-Za-z0-9_]+|https?://[^ ]+""".r
// Matches "www" links.
// NOTE(review): the '.' after "www" is unescaped and therefore matches any character
// (e.g. in "awww yes" the span "www yes" is removed). Left unchanged because the
// preprocessing used when the model was trained is not visible here — confirm before
// tightening it to """www\.[^ ]+""".
val wwwPattern = """www.[^ ]+""".r
// Contracted negations and their expanded forms (all lower case).
val negationsDict: Map[String, String] = Map(("isn't","is not"),("aren't","are not"),("wasn't","was not"),("weren't","were not"),("haven't","have not"),("hasn't","has not"),("hadn't","had not"),("won't","will not"),("wouldn't","would not"),("don't","do not"),("doesn't","does not"),("didn't","did not"),("can't","can not"),("couldn't","could not"),("shouldn't","should not"),("mightn't","might not"),("mustn't","must not"))
// Compiled once instead of on every call to cleanTweets.
val nonLetterPattern = """[^a-zA-Z]""".r
val multiSpacePattern = """ +""".r

/** Normalizes the raw text of a tweet.
  *
  * Steps, in order: remove @mentions and http(s)/www links, lower-case the text,
  * expand the contracted negations listed in `negationsDict`, replace every
  * non-ASCII-letter character with a space, and collapse runs of spaces into one.
  * Leading/trailing spaces are not trimmed.
  *
  * @param s raw tweet text; may be null (e.g. a JSON record without a "text" field)
  * @return the cleaned text, or an empty string when `s` is null
  */
def cleanTweets(s: String): String =
  // Null guard: when used as a Spark UDF, missing values arrive as null.
  Option(s).fold("") { raw =>
    val withoutMentionsAndUrls = combinedPattern.replaceAllIn(raw, "")
    val withoutLinks = wwwPattern.replaceAllIn(withoutMentionsAndUrls, "")
    // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
    val lowered = withoutLinks.toLowerCase(java.util.Locale.ROOT)
    // Literal (non-regex) replacement; the entries never overlap, so order is irrelevant.
    val negationsExpanded = negationsDict.foldLeft(lowered) {
      case (text, (contraction, expansion)) => text.replace(contraction, expansion)
    }
    val lettersOnly = nonLetterPattern.replaceAllIn(negationsExpanded, " ")
    multiSpacePattern.replaceAllIn(lettersOnly, " ")
  }

// For every micro-batch: parse the JSON tweets, clean their text, build TF-IDF
// features and print the sentiment predicted by the pre-trained model.
lines.foreachRDD { rdd: RDD[String] =>
  // Parse JSON (the schema is inferred from the records of this batch)
  val rawDf = spark.read.json(rdd)
  // An empty batch, or one whose records carry no "text" field, yields no "text"
  // column; skip it instead of failing on the column lookup. This also replaces
  // the full count() that was only used as an emptiness check.
  if (rawDf.columns.contains("text")) {
    // Clean the tweets. Rows without text are dropped first so the UDF never sees null.
    // Columns are aliased explicitly rather than relying on the auto-generated
    // "UDF(text)" name: "tweet" is the original text, "text" the cleaned one.
    val cleanTweetsUdf = udf(cleanTweets _)
    val cleanDf = rawDf
      .where(rawDf("text").isNotNull)
      .select(rawDf("text").as("tweet"), cleanTweetsUdf(rawDf("text")).as("text"))
    cleanDf.createOrReplaceTempView("clean")
    // Only keep the tweets that have more than 50 characters after cleanup
    val finalDf = spark.sql("SELECT * FROM clean WHERE LENGTH(text) > 50")
    // IDF cannot be fitted when no tweet survives the length filter, so skip such batches.
    if (finalDf.head(1).nonEmpty) {
      // Prepare the data for the model
      // NOTE(review): the IDF weights are re-estimated from each micro-batch rather
      // than reused from training, so the features may not match what the model was
      // fitted on (with minDocFreq = 5, terms present in fewer than 5 tweets of the
      // batch get zero weight). Fixing this needs the fitted feature pipeline from
      // training, which is not available in this file.
      val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("tf").setNumFeatures(65536)
      val idf = new IDF().setMinDocFreq(5).setInputCol("tf").setOutputCol("features")
      val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
      val pipelineFit = pipeline.fit(finalDf)
      val preparedDf = pipelineFit.transform(finalDf)
      // Predict sentiment using the model
      val predictionsDf = model.transform(preparedDf)
      // Display the predictions (up to 50 rows, without truncating the tweet text)
      predictionsDf.select("tweet", "prediction").show(50, false)
    }
  }
}

// Start the streaming computation, then block this cell until the streaming
// context is stopped or fails.
ssc.start()
ssc.awaitTermination()