# Lab2 - Spark Streaming and GraphX

Group 2 Ya Ting Hu & Zhen Tian

## **Task 1**

#### build.sbt
```scala
name := "spark_kafka"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql" % "2.2.1",
  "org.apache.spark" % "spark-streaming_2.11" % "2.2.1",
  "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.2.1",
  ("com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2").exclude("io.netty", "netty-handler"),
  ("com.datastax.cassandra" % "cassandra-driver-core" % "3.0.0").exclude("io.netty", "netty-handler")
)
```

#### KafkaSpark.scala

In this task, we first import the given packages. In `main`, we create a `StreamingContext` manually (with a 1-second batch interval and a checkpoint directory, which `mapWithState` requires) and use `createDirectStream` to connect to Kafka and subscribe to the topic `avg`. Next, we take the message value, split it on the comma into a key and a value, and cast the value to `Double`. We then define the mapping function. To keep both the count and the sum, we modified the `state` type and the return type of the template code: the state now holds `(count, sum)` as `State[(Int, Double)]`, and the function returns `(String, Double)`. Inside the mapping function, we increment the count by 1 and add the new value to the stored sum, so the average is the sum divided by the count. Finally, we update the state with the new count and sum and return the (key, average) pair.
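To make the state update concrete, here is a small Spark-free sketch in plain Scala that replays a few hypothetical `key,value` records through the same `(count, sum)` logic as `mappingFunc`. The object name `RunningAverageSketch`, its helpers and the sample records are illustrative assumptions; the sketch only mimics how `mapWithState` threads per-key state through the records and emits one `(key, average)` pair per input record. It does not use Spark or Kafka.

```scala
// Spark-free simulation of the Task 1 state logic (illustrative only).
object RunningAverageSketch {
  // Per-key state: (count, sum), the same shape as State[(Int, Double)] in Task 1
  type KeyState = (Int, Double)

  // Process records in arrival order, updating the state and emitting
  // (key, running average) for every record, as mapWithState does per input record
  def run(records: Seq[(String, Double)]): (Map[String, KeyState], Seq[(String, Double)]) =
    records.foldLeft((Map.empty[String, KeyState], Vector.empty[(String, Double)])) {
      case ((state, out), (key, value)) =>
        val (count, sum) = state.getOrElse(key, (0, 0.0))
        val newCount = count + 1
        val newSum = sum + value
        (state.updated(key, (newCount, newSum)), out :+ (key -> newSum / newCount))
    }

  // Parse a "key,value" string the same way Task 1 does with split(",")
  def parse(line: String): (String, Double) = {
    val parts = line.split(",")
    (parts(0), parts(1).toDouble)
  }

  def main(args: Array[String]): Unit = {
    val (state, out) = run(Seq("a,2", "b,10", "a,4", "a,9").map(parse))
    out.foreach(println)
    println(state)
  }
}
```

With these sample records, key `a` yields the averages 2.0, 3.0 and 5.0 as its values arrive. Because one pair is emitted per record, the same key can appear several times within one batch, as `j` does in the output below.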

```scala
package sparkstreaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._


object KafkaSpark {
  def main(args: Array[String]) {
    val kafkaParams = Map(
    "metadata.broker.list" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> "kafka-spark-streaming",
    "zookeeper.connection.timeout.ms" -> "1000")

    // make a connection to Kafka and read (key, value) pairs from it
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("readKeyValuePairFromKafka")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
   
    ssc.checkpoint(".")
    
    val topic = Set("avg")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](
      ssc,
      kafkaParams,
      topic
    )

    val kvpair_str = messages.map(_._2).map(_.split(","))
    val kvpair_float = kvpair_str.map(x => (x(0), x(1).toDouble))
    
    // Keeps (count, sum) per key and emits (key, running average) for every record
    def mappingFunc(key: String, value: Option[Double], state: State[(Int, Double)]): (String, Double) = {
      val (count, sum) = state.getOption.getOrElse((0, 0.0))
      val newCount = count + 1
      val newSum   = sum + value.getOrElse(0.0)

      state.update((newCount, newSum))
      (key, newSum / newCount)
    }

    val stateDstream = kvpair_float.mapWithState(StateSpec.function(mappingFunc _))

    
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

#### **Output:**
```bash
-------------------------------------------
Time: 1633819471000 ms
-------------------------------------------
(x,12.578697929658269)
(p,12.71021135727018)
(j,12.463486140724946)
(j,12.46656008526512)
(n,12.668130165289256)
(d,12.524929685502428)
(l,12.581098339719029)
(h,12.483510908168443)
(b,12.468790468790468)
(v,12.48776637726914)
...

21/10/09 18:44:33 INFO JobScheduler: Finished job streaming job 1633819471000 ms.0 from job set of time 1633819471000 ms
21/10/09 18:44:33 INFO JobScheduler: Total delay: 2.862 s for time 1633819471000 ms (execution: 0.712 s)
21/10/09 18:44:33 INFO MapPartitionsRDD: Removing RDD 12 from persistence list
```

## **Task 2**

#### build.sbt

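Compared with Task 1, we added the given `spark-sql-kafka-0-10` package, which provides the Kafka source for Structured Streaming.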

```scala
name := "spark_kafka"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql" % "2.2.1",
  "org.apache.spark" % "spark-streaming_2.11" % "2.2.1",
  "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.1",
  "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.2.1",
  ("com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2").exclude("io.netty", "netty-handler"),
  ("com.datastax.cassandra" % "cassandra-driver-core" % "3.0.0").exclude("io.netty", "netty-handler")
)
```

#### KafkaSpark.scala

First, we create the `SparkSession` and `SparkContext` and load the data from Kafka with the topic `avg` as a streaming DataFrame. Then we cast the message value to a string, split it into two columns, `key` and `val`, and group the rows by `key`. The mapping function receives the key (as defined in `groupByKey`), an iterator over that key's rows in the current batch, and a `GroupState` holding the sum and count (`Double` and `Int` respectively). Inside the mapping function, we read the sum and count from the state and use a `foreach` loop to add each row's `val` to the sum and increment the count. We then update the state with the new sum and count and return the (key, average) pair as before. Finally, we apply the mapping function with `mapGroupsWithState` and write the result stream to the console in `update` output mode.
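The difference from Task 1 is that `mapGroupsWithState` calls the function once per key per micro-batch, passing all of that key's rows in the batch, and only for keys that have data in the batch (timeouts aside). The plain-Scala sketch below mimics that behavior on in-memory batches; `GroupStateSketch`, `processBatch` and the sample data are hypothetical names for illustration, not Spark APIs.

```scala
// Spark-free simulation of the Task 2 per-batch grouping logic (illustrative only).
object GroupStateSketch {
  // Per-key state: (sum, count), same order as GroupState[(Double, Int)] in Task 2
  type KeyState = (Double, Int)

  // One micro-batch: group rows by key, fold all of a group's values into its stored
  // state, and emit exactly one (key, average) for each key present in the batch
  def processBatch(state: Map[String, KeyState],
                   batch: Seq[(String, Double)]): (Map[String, KeyState], Map[String, Double]) =
    batch.groupBy(_._1).foldLeft((state, Map.empty[String, Double])) {
      case ((st, out), (key, rows)) =>
        val (sum0, cnt0) = st.getOrElse(key, (0.0, 0))
        val sum = sum0 + rows.map(_._2).sum
        val cnt = cnt0 + rows.size
        (st.updated(key, (sum, cnt)), out.updated(key, sum / cnt))
    }

  def main(args: Array[String]): Unit = {
    val (s1, out1) = processBatch(Map.empty, Seq("a" -> 2.0, "b" -> 10.0, "a" -> 4.0))
    println(out1) // batch 0
    val (_, out2) = processBatch(s1, Seq("a" -> 9.0))
    println(out2) // batch 1
  }
}
```

In the second batch only `a` is emitted, since `b` received no new rows, while `b`'s state is kept for later batches.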

```scala
package sparkstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{GroupState, Trigger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, split}

object KafkaSpark {
  def main(args: Array[String]) {
    
  
    val spark: SparkSession = SparkSession.builder.appName("avg_spark").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // Streaming DataFrame of Kafka records from the topic "avg"
    val inputTable = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "avg")
      .load()

    import spark.implicits._
    // Split "key,value" into two string columns and group the rows by key
    val temp_table = inputTable.selectExpr("CAST(value AS STRING)").as[String]
      .select(split(col("value"), ",").getItem(0).as("key"),
              split(col("value"), ",").getItem(1).as("val"))
      .groupByKey(x => x.getString(0))

    // State per key is (sum, count); returns (key, running average)
    def mappingFunc(key: String, values: Iterator[Row], state: GroupState[(Double, Int)]): (String, Double) = {
      var (sum, cnt) = state.getOption.getOrElse((0.0, 0))

      values.foreach { value =>
        cnt = cnt + 1
        sum = value.getString(1).toDouble + sum
      }

      state.update((sum, cnt))
      (key, sum / cnt)
    }

    val resultTable = temp_table.mapGroupsWithState(mappingFunc _)

    // mapGroupsWithState only supports the "update" output mode
    resultTable.writeStream.format("console")
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
```

#### **Output:**
```bash
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------------------+
| _1|                _2|
+---+------------------+
|  l|13.102739726027398|
|  x|            12.625|
|  g| 12.16600790513834|
|  m|12.017094017094017|
|  f|12.245689655172415|
|  n| 12.04564315352697|
|  k| 12.28310502283105|
|  v|13.460869565217392|
|  e|12.672413793103448|
|  o| 11.11344537815126|
|  h|12.916666666666666|
|  z|11.841584158415841|
|  p|12.551020408163266|
|  d|13.304147465437788|
|  w|12.222222222222221|
|  y|12.452755905511811|
|  c|12.076923076923077|
|  u|13.352490421455938|
|  i|13.426724137931034|
|  q|12.178423236514522|
+---+------------------+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------------+
| _1|                _2|
+---+------------------+
|  l|12.519252764010675|
|  x|12.454621592812547|
|  g|12.459955970545813|
|  m|12.451525893958076|
|  f| 12.56525436833153|
|  n|12.514680353234207|
|  k|12.522346797726923|
|  v|12.450045899632803|
|  e|12.511940413115259|
|  o|12.471619993977718|
|  h|12.581570537208949|
|  z|12.589227379776647|
|  p|12.551442198501022|
|  d|12.557758031442242|
|  w|12.538041416976409|
|  y|12.448592457697853|
|  c|12.458559315560308|
|  u|12.527782006393668|
|  i| 12.46321752265861|
|  q|12.422279990824986|
+---+------------------+
only showing top 20 rows

-------------------------------------------
Batch: 2
-------------------------------------------
+---+------------------+
| _1|                _2|
+---+------------------+
|  l| 12.50893371757925|
|  x|12.484007543974442|
|  g|12.521029745642052|
|  m| 12.52194066749073|
|  f|12.577554673515449|
|  n|12.537221795855718|
|  k|12.470072936363987|
|  v| 12.46878853267571|
|  e|12.462496133622023|
|  o|12.520618162344121|
|  h|12.580966291271093|
|  z|12.479436685108523|
|  p| 12.53145460810966|
|  d|12.568201897591518|
|  w| 12.56778882938026|
|  y|12.505762971472334|
|  c|12.503457868098753|
|  u|12.540193423597678|
|  i|12.507687012391004|
|  q| 12.40910670837678|
+---+------------------+
only showing top 20 rows

```

## **Task 3**

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// `sc` is the SparkContext already available in the notebook session
val vertices: RDD[(VertexId, (String, Int))] = sc.parallelize(Array(
  (1L, ("Alice", 28)),
  (2L, ("Bob", 27)),
  (3L, ("Charlie", 65)),
  (4L, ("David", 42)),
  (5L, ("Ed", 55)),
  (6L, ("Fran", 50)),
  (7L, ("Alex", 55))))

val edges: RDD[Edge[Int]] = sc.parallelize(Array(
  Edge(2L, 1L, 7),
  Edge(2L, 4L, 2),
  Edge(3L, 6L, 3),
  Edge(3L, 2L, 4),
  Edge(4L, 1L, 1),
  Edge(5L, 2L, 2),
  Edge(5L, 3L, 8),
  Edge(5L, 6L, 3),
  Edge(7L, 5L, 3),
  Edge(7L, 6L, 4)))

val graph_network = Graph(vertices, edges)
```

### **1. Display the names of the users that are at least 30 years old.**

To filter users by age, we keep the vertices whose second attribute (the age) is at least 30; the next step then maps the result to the names.
```scala
graph_network.vertices.filter(_._2._2 >= 30).collect()
```
Output before mapping to names; all of these users are at least 30 years old:
```scala
Array((4,(David,42)), (5,(Ed,55)), (6,(Fran,50)), (3,(Charlie,65)), (7,(Alex,55)))
```

```scala
val age_30_names = graph_network.vertices.filter(_._2._2 >= 30).map(_._2._1).collect()
```

```scala
Array(David, Ed, Fran, Charlie, Alex)
```

### **2. Display who likes who.**

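To answer this question, we map over the graph's triplets. Each triplet contains an edge together with the attributes of its source and destination vertices, so we can print the source user's name, " likes ", and the destination user's name.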
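Conceptually, a triplet is an edge joined with the attributes of its two endpoints. The sketch below reproduces that join in plain Scala on a three-user subset of the graph (Alice, Bob and David). The `Edge` and `Triplet` case classes here are local stand-ins written for illustration, not the GraphX classes, and every edge endpoint is assumed to exist in the vertex map.

```scala
// Plain-Scala illustration of what a triplet is (not the GraphX implementation).
object TripletSketch {
  // Local stand-ins for illustration; GraphX has its own Edge and EdgeTriplet classes
  case class Edge(srcId: Long, dstId: Long, attr: Int)
  case class Triplet(srcAttr: (String, Int), dstAttr: (String, Int), attr: Int)

  // Join each edge with the (name, age) attributes of its source and destination;
  // assumes both endpoints are present in `vertices`
  def triplets(vertices: Map[Long, (String, Int)], edges: Seq[Edge]): Seq[Triplet] =
    edges.map(e => Triplet(vertices(e.srcId), vertices(e.dstId), e.attr))

  def main(args: Array[String]): Unit = {
    val vertices = Map(1L -> ("Alice", 28), 2L -> ("Bob", 27), 4L -> ("David", 42))
    val edges = Seq(Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(4L, 1L, 1))
    triplets(vertices, edges)
      .map(t => t.srcAttr._1 + " likes " + t.dstAttr._1)
      .foreach(println)
  }
}
```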

```scala
graph_network.triplets.map(triplet => triplet.srcAttr._1 + " likes " + triplet.dstAttr._1).collect()
```

```scala
Array(Bob likes Alice, Bob likes David, Charlie likes Bob, Charlie likes Fran, David likes Alice, Ed likes Bob, Ed likes Charlie, Ed likes Fran, Alex likes Ed, Alex likes Fran)
```

### **3. If someone likes someone else more than 5 times then that relationship is getting pretty serious, so now display the lovers.**

To answer this question, we use a similar approach to the previous question: we take the triplets and keep only those whose edge attribute is greater than 5, which gives the lovers.

```scala
graph_network.triplets.filter(_.attr > 5).map(triplet => triplet.srcAttr._1 + " is lover with " + triplet.dstAttr._1).collect()
```

```scala
Array(Bob is lover with Alice, Ed is lover with Charlie)
```

### **4. Print the number of people who like each user (e.g., Alice is liked by 2 people).**

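In this question, we first define a `User` case class that stores the id, name, age, in-degree and out-degree. Then we map the vertices of the original graph to `User` objects with both degrees initialized to 0. Next, we use `outerJoinVertices` twice to fill in the in-degree and the out-degree; users with no incoming or outgoing edges get 0 through `getOrElse(0)`.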
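As a cross-check of what `inDegrees` and `outDegrees` compute, the plain-Scala sketch below counts incoming and outgoing edges directly from the Task 3 edge list (weights omitted) and defaults missing vertices to 0, mirroring `getOrElse(0)` in the `outerJoinVertices` calls. `DegreeSketch` and `degrees` are hypothetical names used only for illustration; the last line of `main` also applies the equality check used in question 5.

```scala
// Plain-Scala degree counting on the Task 3 graph (illustrative, not GraphX).
object DegreeSketch {
  // Edges as (srcId, dstId); the edge weights are not needed for degrees
  val edges: Seq[(Long, Long)] = Seq(2L -> 1L, 2L -> 4L, 3L -> 6L, 3L -> 2L, 4L -> 1L,
                                     5L -> 2L, 5L -> 3L, 5L -> 6L, 7L -> 5L, 7L -> 6L)
  val names: Map[Long, String] = Map(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie",
                                     4L -> "David", 5L -> "Ed", 6L -> "Fran", 7L -> "Alex")

  // In-degree = edges pointing at a vertex; out-degree = edges leaving it.
  // Vertices without such edges are absent from the counts, hence getOrElse(id, 0).
  def degrees(): Map[String, (Int, Int)] = {
    val inDeg  = edges.groupBy(_._2).map { case (id, es) => id -> es.size }
    val outDeg = edges.groupBy(_._1).map { case (id, es) => id -> es.size }
    names.map { case (id, name) => name -> (inDeg.getOrElse(id, 0), outDeg.getOrElse(id, 0)) }
  }

  def main(args: Array[String]): Unit = {
    val d = degrees()
    d.foreach { case (name, (in, _)) => println(s"$name is liked by $in people") }
    println(d.collect { case (name, (in, out)) if in == out => name }) // question 5
  }
}
```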

```scala
case class User(id: Long, name: String, age: Int, indegree: Int, outdegree: Int)

// Start from a graph whose vertices are User objects with both degrees set to 0
val user_graph_map = graph_network.mapVertices { case (id, (name, age)) => User(id, name, age, 0, 0) }

// Fill in the in-degree and the out-degree; vertices missing from a degree RDD get 0
val user_graph = user_graph_map.outerJoinVertices(graph_network.inDegrees) {
  (id, u, indegree_value) => User(id, u.name, u.age, indegree_value.getOrElse(0), u.outdegree)
}.outerJoinVertices(graph_network.outDegrees) {
  (id, u, outdegree_value) => User(id, u.name, u.age, u.indegree, outdegree_value.getOrElse(0))
}
```

We then use a `for` loop over the collected vertices to extract each user's name and in-degree and print them.

```scala
for ((name, indegree) <- user_graph.vertices.map { case (id, user) => (user.name, user.indegree) }.collect) {
  println(name + " is liked by " + indegree + " people")
}
```

```bash
David is liked by 1 people
Alice is liked by 2 people
Ed is liked by 1 people
Fran is liked by 3 people
Bob is liked by 2 people
Charlie is liked by 1 people
Alex is liked by 0 people
```


### **5. Print the names of the users who are liked by the same number of people they like (e.g., Bob and David).**

To answer this question, we reuse `user_graph` from the previous question and print the names of the users whose `indegree` equals their `outdegree`.

```scala
for ((name, indegree, outdegree) <- user_graph.vertices.map { case (id, user) => (user.name, user.indegree, user.outdegree) }.collect) {
  if (indegree == outdegree) {
    println(name)
  }
}
```

```bash
David
Bob
```


### **6. Find the oldest follower of each user (hint: use the aggregateMessages).**

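In this question, we use `aggregateMessages`: for every edge (triplet), the source user sends their (name, age) to the destination user, and the merge function keeps the message with the larger age, so each user ends up with their oldest follower. Users who are not liked by anyone receive no message and are therefore missing from the result.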
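The send/merge pattern of `aggregateMessages` can be mimicked in plain Scala with a map followed by a group-and-reduce, shown below on the same users and edges. `OldestFollowerSketch` and `oldestFollowers` are illustrative names, not GraphX APIs. Note that the merge function `if (a._2 > b._2) a else b` is not commutative when two followers have the same age, so in Spark the winner of a tie may depend on the order in which messages are merged; in this graph no user has two equally old oldest followers, so the result is unaffected.

```scala
// Plain-Scala emulation of the aggregateMessages send/merge steps (illustrative only).
object OldestFollowerSketch {
  val users: Map[Long, (String, Int)] = Map(1L -> ("Alice", 28), 2L -> ("Bob", 27),
    3L -> ("Charlie", 65), 4L -> ("David", 42), 5L -> ("Ed", 55), 6L -> ("Fran", 50), 7L -> ("Alex", 55))
  val edges: Seq[(Long, Long)] = Seq(2L -> 1L, 2L -> 4L, 3L -> 6L, 3L -> 2L, 4L -> 1L,
                                     5L -> 2L, 5L -> 3L, 5L -> 6L, 7L -> 5L, 7L -> 6L)

  // Send: every edge sends the source user's (name, age) to its destination.
  // Merge: messages for the same destination are reduced pairwise, keeping the older one.
  // Vertices that receive no message are absent, as with aggregateMessages.
  def oldestFollowers(): Map[Long, (String, Int)] =
    edges
      .map { case (src, dst) => dst -> users(src) }
      .groupBy(_._1)
      .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce((a, b) => if (a._2 > b._2) a else b) }

  def main(args: Array[String]): Unit = {
    val oldest = oldestFollowers()
    users.toSeq.sortBy(_._1).foreach { case (id, (name, _)) =>
      oldest.get(id) match {
        case Some((follower, _)) => println(s"$follower is the oldest follower of $name")
        case None                => println(s"$name doesn't have follower!")
      }
    }
  }
}
```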

```scala
val follower_rdd: VertexRDD[(String, Int)] = user_graph.aggregateMessages[(String, Int)](
  // send: each follower (source) sends its (name, age) to the user it likes (destination)
  triplet => triplet.sendToDst((triplet.srcAttr.name, triplet.srcAttr.age)),
  // merge: keep the message with the larger age
  (a, b) => if (a._2 > b._2) a else b
)
```

Then we join the users with `follower_rdd` and use a `for` loop to print the result: if a user has no follower (the joined value is `None`), we print that they have no follower; otherwise, we print the name of their oldest follower.

```scala
// leftJoin keeps every user; users without followers get None
val ug_oldest = user_graph.vertices.leftJoin(follower_rdd) {
  case (id, user, oldestFollower) => (user.name, oldestFollower)
}

for ((name, oldestFollower) <- ug_oldest.map(_._2).collect) {
  oldestFollower match {
    case Some((followerName, _)) => println(followerName + " is the oldest follower of " + name)
    case None                    => println(name + " doesn't have follower!")
  }
}
```

```bash
Bob is the oldest follower of David
David is the oldest follower of Alice
Alex is the oldest follower of Ed
Charlie is the oldest follower of Fran
Charlie is the oldest follower of Bob
Ed is the oldest follower of Charlie
Alex doesn't have follower!
```