![](https://raw.githubusercontent.com/DivLoic/mdd-structured-streaming/master/resources/intro.png)

### Spark Structured Streaming

![](https://raw.githubusercontent.com/DivLoic/mdd-structured-streaming/master/resources/unbounded.png)

[Développer une application de Machine Learning en moins de 30 min - Alban Phelip & Mouloud Lounaci](https://youtu.be/iZoVwBDYyMU)
![](https://raw.githubusercontent.com/DivLoic/mdd-structured-streaming/master/resources/twitter.png)

#### Les sources :

![](https://raw.githubusercontent.com/DivLoic/mdd-structured-streaming/master/resources/source.png)

configuration

In [None]:
// Connection and storage settings shared by the rest of the notebook.
// S3_DIR deliberately has no trailing slash: paths below are built as
// s"$S3_DIR/...", so a trailing "/" here would yield "//" in every path.
val BROKER_HOST = "172.16.41.136"
val BROKER_PORT = "9092"
val S3_DIR = "/mnt/moisdeladata"
val TOPIC = "twitter"

In [None]:
// Boxed summary of the configuration values, one println per line.
Seq(
  "***" * 30,
  "*",
  s"* \t BROKER_HOST: $BROKER_HOST",
  s"* \t BROKER_PORT: $BROKER_PORT",
  s"* \t S3_DIR NAME: $S3_DIR",
  s"* \t TOPIC NAME : $TOPIC",
  "*",
  "***" * 30
).foreach(println)

In [None]:
import org.apache.spark.sql.DataFrame

In [None]:
/**
 * step 1: read from the source
 *
 * Builds a streaming DataFrame over the Kafka topic TOPIC on the broker
 * configured above. `subscribe` takes a topic or a comma-separated topic
 * list; regex subscriptions use the separate `subscribePattern` option.
 */

val df: DataFrame = spark.readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> s"$BROKER_HOST:$BROKER_PORT",
    "subscribe" -> TOPIC
  ))
  .load()

In [None]:
// Runtime class of the source frame: a streaming read is still an ordinary DataFrame (see the type of `df`).
df.getClass

In [None]:
// Expected to be true, since `df` was created with spark.readStream.
df.isStreaming

In [None]:
// Kafka source columns, including the `value`, `topic` and `timestamp` used below.
df.printSchema()

#### Les transformations :

In [None]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

In [None]:
// Cheat sheet of the three JSON helpers, separated by blank lines.
Seq(
  "get_json_object(col: Column, path: String): Column -- \"json string\"",
  "",
  "from_json(col: Column, schema: StructType): Column  -- \"struct type\"",
  "",
  "to_json(col: Column): Column  -- \"json string\""
).foreach(println)

kafka-connect-twitter : `{"schema": {}, "payload": {}}`

In [None]:
/**
 * step 2: extract the payload
 *
 * Casts the Kafka `value` column to a string and adds a `payload` column
 * holding the JSON sub-document found at path `$.payload`.
 */

val dfPayload = {
  val rawJson = $"value".cast(StringType)
  df.withColumn("payload", get_json_object(rawJson, "$.payload"))
}

In [None]:
// The source columns plus the new string column `payload`.
dfPayload.printSchema()

```| <topic> | <timestamp> | {"id": 12345, "user": "info", ...} |```

In [None]:
/**
 * step 3: build and apply a schema
 *
 * A small illustrative schema: only `id`, `media` and `text` are declared,
 * and `text` is the one field marked non-nullable.
 */

val ex_schema = new StructType()
  .add("id", StringType)
  .add("media", StringType)
  .add("text", StringType, nullable = false)

In [None]:
// Preview the struct that from_json produces when ex_schema is applied to `payload`.
dfPayload
  .select($"topic", $"timestamp", from_json($"payload", ex_schema).as("tweet"))
  .printSchema()

In [None]:
/**
 * Full tweet schema, field for field the tree expected after this cell.
 * NOTE(review): assumes the payload JSON keys match these names exactly — confirm
 * against the connector's output.
 */
val tweetSchema: StructType = new StructType()
  .add("id", StringType)
  .add("text", StringType, nullable = false)
  .add("created_at", StringType)
  .add("is_retweet", BooleanType)
  .add("media", ArrayType(StringType, containsNull = false))
  .add("user", new StructType()
    .add("id", LongType, nullable = false)
    .add("location", StringType)
    .add("verified", BooleanType)
    .add("screen_name", StringType))
  .add("entities", new StructType()
    .add("hashtags", ArrayType(new StructType().add("text", StringType), containsNull = false)))

// dfStructured was previously referenced here (and in step 4) without ever being
// defined; parse the payload with the full schema to create it.
val dfStructured: DataFrame = dfPayload.select(
  $"topic",
  $"timestamp",
  from_json($"payload", tweetSchema).as("tweet")
)

dfStructured.printSchema()

```
root
 |-- topic: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- tweet: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- text: string (nullable = false)
 |    |-- created_at: string (nullable = true)
 |    |-- is_retweet: boolean (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: string (containsNull = false)
 |    |-- user: struct (nullable = true)
 |    |    |-- id: long (nullable = false)
 |    |    |-- location: string (nullable = true)
 |    |    |-- verified: boolean (nullable = true)
 |    |    |-- screen_name: string (nullable = true)
 |    |-- entities: struct (nullable = true)
 |    |    |-- hashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- text: string (nullable = true)
```

In [None]:
/**
 * step 4: flatten the frame
 *
 * Projects the nested `tweet` struct into one flat column per field and
 * derives `creation_time` by parsing the first 23 characters of
 * `tweet.created_at`.
 */

val dfTweets: DataFrame = {
  // substring is 1-based in Spark SQL (position 0 is silently treated as 1).
  // 23 characters cover "yyyy-MM-ddTHH:mm:ss" plus a 3-digit fraction.
  // NOTE(review): assumes created_at is ISO-8601 with milliseconds — confirm.
  val createdAt = substring($"tweet.created_at", 1, 23)

  dfStructured.select(
    $"topic",
    $"timestamp",
    $"tweet.id".as("tweet_id"),
    $"tweet.text".as("text"),
    $"tweet.is_retweet".as("is_retweet"),
    createdAt.as("created_at"),
    // unix_timestamp yields whole seconds, so the fraction is dropped in creation_time.
    unix_timestamp(createdAt, "yyyy-MM-dd'T'HH:mm:ss.SSS").cast(TimestampType).as("creation_time"),
    $"tweet.user.id".as("user_id"),
    $"tweet.user.verified".as("verified"),
    $"tweet.user.location".as("location"),
    $"tweet.user.screen_name".as("screen"),
    $"tweet.entities.hashtags.text".as("hashtags")
  )
}

In [None]:
// One flat column per extracted tweet field.
dfTweets.printSchema()

```
root
 |-- topic: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- created_at: string (nullable = true)
 |-- creation_time: timestamp (nullable = true)
 |-- user_id: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- screen: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = false)
```

In [None]:
/**
 * step 5: filter, join & display
 */

In [None]:
// Expose the flattened stream to SQL under the name all_tweets.
dfTweets.createOrReplaceTempView(viewName = "all_tweets")

In [None]:
// Keep only tweets carrying at least four hashtags.
// NOTE(review): this re-registers "all_tweets", replacing the unfiltered view
// registered in the previous cell; use a distinct name if both are needed.
dfTweets
  .where(size(col("hashtags")) >= 4)
  .createOrReplaceTempView("all_tweets")

In [None]:
// Static lookup table of watched accounts: (profile_id, name, target).
// The last row is a placeholder to replace with your own account.
// NOTE(review): profile_id is a string while dfTweets.user_id is a long, so the
// join below relies on Spark's implicit cast — "<your-id>" must become digits.
val ref = {
  val rows = Seq(
    ("25073877", "@realDonaldTrump", "target1"),
    ("52544275", "@IvankaTrump", "target2"),
    ("822215679726100480", "@POTUS", "target3"),
    ("22203756", "@mike_pence", "target4"),
    ("<your-id>", "@<your-name>", "target5")
  )
  rows.toDF("profile_id", "name", "target")
}

display(ref)

In [None]:
// Stream-static inner join: keep only tweets whose author appears in `ref`.
dfTweets
  .join(ref, $"user_id" === $"profile_id", "inner")
  .createOrReplaceTempView("known_users")

In [None]:
/**
 * step 6: apply the machine-learning model
 */

In [None]:
// Columns displayed and persisted next to each prediction.
val finalColumns: Seq[String] = Seq("text", "timestamp", "creation_time", "hashtags", "location")
// Tweets fed to the model: those with at most three hashtags.
val dfLive: DataFrame = dfTweets.where(size(col("hashtags")) <= 3)

In [None]:
import org.apache.spark.ml.{Pipeline, PipelineModel}

In [None]:
// Load the fitted pipeline persisted under <S3_DIR>/models/trump.
val model: PipelineModel = PipelineModel.load(s"$S3_DIR/models/trump")

In [None]:
// Run every fitted stage over the live stream; the schema printed next shows
// the added columns (token_raw, ngram, tf, idf, rawPrediction, probability, prediction).
val dfPrediction: DataFrame =
  model.transform(dfLive)

In [None]:
// Input columns followed by the pipeline's output columns.
dfPrediction.printSchema()

```
root
 |-- topic: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- created_at: string (nullable = true)
 |-- creation_time: timestamp (nullable = true)
 |-- user_id: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- screen: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- token_raw: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngram: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- tf: vector (nullable = true)
 |-- idf: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
```

In [None]:
// Show each prediction followed by the columns listed in finalColumns.
display(dfPrediction.select(("prediction" +: finalColumns).map(col): _*))

![](https://raw.githubusercontent.com/DivLoic/mdd-structured-streaming/master/resources/final_result.png)

In [None]:
/**
 * step 7: write to the sink
 */

![](https://raw.githubusercontent.com/DivLoic/mdd-structured-streaming/master/resources/sink.png)

In [None]:
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery}

`%fs ls /mnt/moisdeladata/data/tweets/prediction/`

In [None]:
// Persist each prediction and its display columns as parquet files under
// <S3_DIR>/data/tweets/prediction/table/. The file sink runs in append mode
// (made explicit here), and the checkpoint directory lets the query resume after
// a restart. The StreamingQuery handle is kept so the query can be monitored or
// stopped later, e.g. predictionQuery.stop().
val predictionQuery: StreamingQuery = dfPrediction
  .select("prediction", finalColumns: _*)
  .writeStream
  .format("parquet")
  .outputMode(OutputMode.Append())
  .option("path", s"$S3_DIR/data/tweets/prediction/table/")
  .option("checkpointLocation", s"$S3_DIR/checkpoints/prediction/")
  // One micro-batch every 500 ms, written without postfix syntax (`0.5 seconds`),
  // which needs scala.language.postfixOps to compile cleanly.
  .trigger(ProcessingTime(500.milliseconds))
  .start()

`%fs ls /mnt/moisdeladata/data/tweets/prediction/table/`
```
    - part-00001-16dbff53-e686-4a57-a18a-26ca706034ad.snappy.parquet
    - part-00001-2b8afcb1-37ab-43c2-8026-b5ad03dfe22f.snappy.parquet
    - part-00001-3474a0e2-6e56-4073-9fe6-790c4b5c65f6.snappy.parquet
    - part-00001-686a7735-41a4-43c7-a1af-a86461d2b75f.snappy.parquet
    - ...
```    

In [None]:
// Number of entries (files and directories) the parquet sink has written so far.
// Built from S3_DIR so it always lists the same path the streaming query writes to;
// stripSuffix avoids a double slash whether or not S3_DIR ends with "/".
dbutils.fs.ls(S3_DIR.stripSuffix("/") + "/data/tweets/prediction/table/").size

Sources: 
- *[Apache Spark](http://spark.apache.org) documentation*

In [None]:
// Closing "MERCI" banner. Each line starts with a `|` margin marker that
// stripMargin removes together with the indentation before it; without
// stripMargin the markers and leading spaces were printed verbatim.
println(
    """
    | __  __ _____ ____   ____ ___
    ||  \/  | ____|  _ \ / ___|_ _|
    || |\/| |  _| | |_) | |    | | 
    || |  | | |___|  _ <| |___ | | 
    ||_|  |_|_____|_| \_\\____|___|
    |
    """.stripMargin
)