<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" /> 

# Sections
* [Description](#0)
* [1. Streaming Processing](#1) 
  * [1.1 Search for Spark Installation](#1.1)
  * [1.2 Create SparkSession](#1.2)
  * [1.3 Main DataFrame creation](#1.3) 
  * [1.4 Extract the right data out of the CSV line you get from the topic](#1.4)
  * [1.5  Main queries for streaming processing](#1.5)
* [2. Serving](#2)  
  * [2.1 Start the Kafka service and the producer](#2.1)
  * [2.2 Create SparkSession for Streaming processing](#2.2)
  * [2.3 Reading of CSV file](#2.3)
  * [2.4 Reading tweets from the kafka topic](#2.4)
  * [2.5 Creation of a specific DataFrame](#2.5)
  * [2.6 Join, and launch to MariaDB](#2.6)
* [3. Finalize the exercise](#3)
  * [3.1 Stop the Spark Streaming application](#3.1)
  * [3.2 Stop the Kafka producer](#3.2)
  * [3.3 Stop the Kafka service](#3.3)

<a id='0'></a>
## Description
<p>The same notebook handles two streaming jobs: processing (Part 1) and serving (Part 2). Restart the notebook before running each part, then run only the cells of the part you want, following its required steps.</p>
<div>This notebook does the following:</div>
<p><b>PART 1</b></p>
<ul>
    <li>Consume events from a Kafka topic called <em>tweets</em></li>
    <li>Count and show which mentions are trending</li>
    <li>Count hashtags in windows of 30 seconds</li>
    <li>Show the most retweeted accounts</li>
</ul>
<p><b>PART 2</b></p>
<ul>
    <li>Read a CSV file ready to be joined with the streaming data</li>
    <li>Consume events from a Kafka topic called <em>tweets</em></li>
    <li>Create the specific DataFrame that is going to be uploaded to MariaDB</li>
    <li>Define the function that specifies where our table is going to be written</li>
    <li>Launch the streaming query</li>
</ul>

<a id='1'></a>
## 1. Streaming Processing

<p>Before you can run this notebook, be sure that you:</p>
<ul>
    <li><p><b>Start the Kafka service</b> in your course environment:
        <br/><em>\$ sudo service kafka start</em></p></li>  
    <li><p><b>Start the producer</b>:
        <br/><em>\$ python3 tweets_producer.py send -m "This is a message from the Python client" -t tweets</em></p></li>
    <li><p><b>Optionally, check that the tweets are arriving by launching the consumer in another terminal</b>:
        <br/><em>\$ python3 tweets_consumer.py tweets IE 300</em></p></li>
</ul>

<a id='1.1'></a>
### 1.1 Search for Spark Installation 
This step is required just because we are working in the course environment.

In [1]:
import findspark
findspark.init()

In [2]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.2'></a>
### 1.2 Create SparkSession for Streaming processing


By setting this environment variable we can include extra libraries in our Spark cluster.<br/>
We'll take advantage of this step to include the <b>Spark package to connect to Kafka</b>.
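As a minimal sketch of how that environment variable is composed, the snippet below builds the same string from a list of Maven coordinates. The helper name `build_submit_args` is hypothetical (it is not part of PySpark); the assumption is that the variable has to be set before the first SparkSession is created in the kernel.

```python
import os


def build_submit_args(packages, jars=()):
    """Compose a PYSPARK_SUBMIT_ARGS value (hypothetical helper, not a PySpark API)."""
    parts = []
    if packages:
        # Maven coordinates are passed as one comma-separated, quoted list.
        parts.append('--packages "{}"'.format(",".join(packages)))
    if jars:
        # Local jar paths follow the same comma-separated convention.
        parts.append('--jars "{}"'.format(",".join(jars)))
    # The value has to end with the pyspark-shell token.
    parts.append("pyspark-shell")
    return " ".join(parts)


submit_args = build_submit_args(["org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3"])

# Assumption: this runs before any SparkSession exists in the kernel.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
```

Building the string from lists makes it easier to add a second package or a local jar without breaking the quoting.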

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3" pyspark-shell'

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Processing and Serving in Streaming")
    .getOrCreate())

Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-71ea722b-61c5-4c66-bf61-d7d66b741155;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.3 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 323ms :: artifacts dl 6ms
	:: modules in use:

<a id='1.3'></a>
### 1.3 Main DataFrame creation


In [4]:
tweets = spark.readStream\
               .format("kafka") \
               .option("kafka.bootstrap.servers", "localhost:9092") \
               .option("subscribe", "tweets") \
               .option("startingOffsets", "latest") \
               .option("kafka.group.id", "IE") \
               .load()
tweets

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

<a id='1.4'></a>
### 1.4 Extract the right data out of the CSV line you get from the topic
We're going to apply the following logic to the events we get from the topic:
<ul>
    <li>Cast the default data type of the field <em>value</em> (binary) to the String data type.</li>
    <li>Split the line into fields on the <em>|</em> delimiter by using the <em>split</em> function.</li>
    <li>Create most of the columns that are present in the source file.</li>

</ul>
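The three steps above can be sketched in plain Python, without Spark, to make the logic explicit. This is only an illustration: `parse_event` and `FIELD_INDEX` are hypothetical names, the sample record is invented, and only a few of the field positions used in this notebook are included.

```python
# Subset of the field positions used in this notebook (illustrative only).
FIELD_INDEX = {"timestamp_str": 0, "id_str": 1, "text": 2, "lang": 11}


def parse_event(raw_value: bytes) -> dict:
    """Decode one Kafka value and pick named fields out of the '|'-separated line."""
    line = raw_value.decode("utf-8")   # step 1: binary -> string
    fields = line.split("|")           # step 2: split on the delimiter
    # Step 3: one named column per position; missing positions become None here.
    return {
        name: (fields[i] if i < len(fields) else None)
        for name, i in FIELD_INDEX.items()
    }


# Invented sample record with 12 fields.
sample = b"2022-03-04 10:15:00|1499694732007690241|hello world|0|0|0|0|False|False|False|low|en"
event = parse_event(sample)
```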

Check the schema we get now: it looks like any other DataFrame we've seen so far. This is <b>a real unified processing framework</b>.

In [5]:
from pyspark.sql.functions import split, col, window
from pyspark.sql.types import StringType,BooleanType,DateType,TimestampType,IntegerType

In [6]:
tweets = tweets.selectExpr("CAST(value AS STRING)") \
                 .select(split("value", r'\|').alias("fields")) \
                 .withColumn("timestamp_str",col("fields").getItem(0)) \
                 .withColumn("created_at",col("timestamp_str").cast(TimestampType()))\
                 .withColumn("id_str",col("fields").getItem(1)) \
                 .withColumn("text",col("fields").getItem(2)) \
                 .withColumn("quote_count",col("fields").getItem(3)) \
                 .withColumn("reply_count",col("fields").getItem(4)) \
                 .withColumn("retweet_count",col("fields").getItem(5)) \
                 .withColumn("favorite_count",col("fields").getItem(6)) \
                 .withColumn("favorited",col("fields").getItem(7)) \
                 .withColumn("retweeted",col("fields").getItem(8)) \
                 .withColumn("possibly_sensitive",col("fields").getItem(9)) \
                 .withColumn("filter_level",col("fields").getItem(10)) \
                 .withColumn("lang",col("fields").getItem(11)) \
                 .withColumn("hour",col("fields").getItem(12)) \
                 .withColumn("mix",col("fields").getItem(13)) \
                 .withColumn("dt",col("fields").getItem(14)) \
                 .withColumn("day-hour",col("fields").getItem(15)) \
                 .withColumn("user_name",col("fields").getItem(16)) \
                 .withColumn("user_screen_name",col("fields").getItem(17)) \
                 .withColumn("user_location",col("fields").getItem(18)) \
                 .withColumn("user_description",col("fields").getItem(19)) \
                 .withColumn("user_followers_count",col("fields").getItem(20)) \
                 .withColumn("user_friends_count",col("fields").getItem(21)) \
                 .withColumn("user_listed_count",col("fields").getItem(22)) \
                 .withColumn("user_favourites_count",col("fields").getItem(23)) \
                 .withColumn("user_statuses_count",col("fields").getItem(24)) \
                 .withColumn("user_statuses_count",col("user_statuses_count").cast(IntegerType()))\
                 .withColumn("rt_quot_count",col("fields").getItem(25)) \
                 .withColumn("rt_reply_count",col("fields").getItem(26)) \
                 .withColumn("rt_retweet_count",col("fields").getItem(27)) \
                 .withColumn("rt_retweet_count",col("rt_retweet_count").cast(IntegerType()))\
                 .withColumn("rt_favorite_count",col("fields").getItem(28)) \
                 .withColumn("rt_user_id",col("fields").getItem(29)) \
                 .withColumn("rt_user_name",col("fields").getItem(30)) \
                 .withColumn("rt_user_screen_name",col("fields").getItem(31)) \
                 .withColumn("rt_user_location",col("fields").getItem(32)) \
                 .withColumn("rt_user_description",col("fields").getItem(33)) \
                 .withColumn("rt_user_followers_count",col("fields").getItem(34)) \
                 .withColumn("rt_user_friends_count",col("fields").getItem(35)) \
                 .withColumn("rt_user_listed_count",col("fields").getItem(36)) \
                 .withColumn("rt_user_favourites_count",col("fields").getItem(37)) \
                 .withColumn("rt_user_statuses_count",col("fields").getItem(38)) \
                 .withColumn("rt_user_created_at",col("fields").getItem(39)) \
                 .withColumn("user_mentions_screen_name",col("fields").getItem(40)) \
                 .withColumn("hashtags",col("fields").getItem(41)) \
                 .withColumn("media_expanded_url",col("fields").getItem(42)) \
                 .withColumn("urls_expanded_url",col("fields").getItem(43)) \
                 .withColumn("symbols_text",col("fields").getItem(44)) \
                 .drop("fields")

In [7]:
tweets.printSchema()

root
 |-- timestamp_str: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- id_str: string (nullable = true)
 |-- text: string (nullable = true)
 |-- quote_count: string (nullable = true)
 |-- reply_count: string (nullable = true)
 |-- retweet_count: string (nullable = true)
 |-- favorite_count: string (nullable = true)
 |-- favorited: string (nullable = true)
 |-- retweeted: string (nullable = true)
 |-- possibly_sensitive: string (nullable = true)
 |-- filter_level: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- mix: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- day-hour: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_followers_count: string (nullable = true)
 |-- user_friends_count: string (nullable = true)
 |-- user_lis

<a id='1.5'></a>
### 1.5 Main queries for streaming processing

1.5.1: Counting which mentions are trending the most

In [None]:
tweets.where(col("user_mentions_screen_name").isNotNull())\
            .where(col("user_mentions_screen_name")!="[]")\
            .groupBy("user_mentions_screen_name")\
            .count()\
            .orderBy(col("count").desc())\
            .limit(5)\
            .writeStream\
            .format("console")\
            .outputMode("complete")\
            .trigger(processingTime='10 seconds')\
            .start()\
            .awaitTermination()
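With the `complete` output mode, the whole aggregated result is re-emitted on every trigger, so the counts keep accumulating across micro-batches. A plain-Python sketch of that behaviour follows; `process_batch` is a hypothetical name and the mention strings are invented samples.

```python
from collections import Counter

# Running totals that survive from one micro-batch to the next.
running_counts = Counter()


def process_batch(mentions, top_n=5):
    """Fold one micro-batch into the running totals and return the current top N."""
    # Same filters as the query: drop nulls and empty lists.
    running_counts.update(m for m in mentions if m is not None and m != "[]")
    return running_counts.most_common(top_n)


first = process_batch(["['alice']", "[]", "['bob']", "['alice']"])
second = process_batch(["['bob']", "['bob']", None])
```

After the second batch the ranking changes because the totals include the first batch as well.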

1.5.2: Counting hashtags in windows of 30 seconds

In [None]:
tweets.where(col("hashtags").isNotNull())\
                 .where(col("hashtags")!="[]")\
                 .groupBy(window(col("created_at"),"30 seconds"),"hashtags")\
                 .count()\
                 .orderBy(col("count").desc())\
                 .limit(5)\
                 .writeStream\
                 .format("console")\
                 .outputMode("complete")\
                 .start()\
                 .awaitTermination()
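The windowed grouping can be pictured as flooring each event timestamp to the start of its 30-second bucket and counting per (bucket, hashtag) pair. The sketch below does this in plain Python under the assumption that windows are aligned to the Unix epoch; `window_start` is a hypothetical helper and the events are invented.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 30-second tumbling window."""
    elapsed = ts - EPOCH
    # Integer number of whole windows since the epoch, scaled back to a timedelta.
    return EPOCH + (elapsed // WINDOW) * WINDOW


# Invented events: (created_at, hashtags)
events = [
    (datetime(2022, 3, 4, 10, 15, 5), "#a"),
    (datetime(2022, 3, 4, 10, 15, 29), "#a"),
    (datetime(2022, 3, 4, 10, 15, 31), "#a"),
]

counts = Counter((window_start(ts), tag) for ts, tag in events)
```

The first two events fall in the window starting at 10:15:00 and the third in the one starting at 10:15:30, so the same hashtag is counted separately per window.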

1.5.3: Most retweeted accounts

In [None]:
tweets.where(col("rt_user_screen_name").isNotNull())\
            .groupBy("rt_user_screen_name")\
            .count()\
            .orderBy(col("count").desc())\
            .limit(5)\
            .writeStream\
            .format("console")\
            .outputMode("complete")\
            .trigger(processingTime='10 seconds')\
            .start()\
            .awaitTermination()

<a id='2'></a>
## 2. Serving

<a id='2.1'></a>
### 2.1 Start the Kafka service and the producer 
<p>Before you can run this part of the notebook, be sure that you:</p>
<ul>
    <li><p><b>Check that Kafka is running</b> in your course environment:
        <br/><em>\$ sudo service kafka status</em></p></li>
    <li><p><b>If it is not running, start Kafka</b> in your course environment:
        <br/><em>\$ sudo service kafka start</em></p></li>    
    <li><p><b>Start the MariaDB service</b> in your course environment:
        <br/><em>\$ sudo service mariadb start</em></p></li>
    <li><p><b>Create the required table in MariaDB</b>:
        <br/><em>\$ mariadb -u osbdet -p &lt; tweets-db.sql</em></p></li>    
    <li><p><b>Start the producer</b>:
        <br/><em>\$ python3 tweets_producer.py send -m "This is a message from the Python client" -t tweets</em></p></li>
</ul>

<a id='2.2'></a>
### 2.2 Create SparkSession for Streaming processing


By setting this environment variable we can include extra libraries in our Spark cluster.<br/>
We'll take advantage of this step to include the <b>Spark package to connect to Kafka</b> and the <b>MariaDB JDBC driver jar</b>.

In [1]:
import findspark
findspark.init()

In [2]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [3]:
from pyspark.sql.functions import split, col, window
from pyspark.sql.types import StringType,BooleanType,DateType,TimestampType,IntegerType

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3" --jars "/usr/share/java/mariadb-java-client.jar,/opt/hive3/lib/hive-hcatalog-core-3.1.2.jar" pyspark-shell'

In [4]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Processing and Serving in Streaming")
    .config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
    .enableHiveSupport()
    .getOrCreate())

Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3e5bd420-c6e5-47c5-ad9c-3ad9d6345909;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.3 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 311ms :: artifacts dl 7ms
	:: modules in use:

<a id='2.3'></a>
### 2.3 Reading of CSV file

In [5]:
marketing_info = (spark.read.option("inferSchema", "true")
                     .option("header", "true")
                     .csv("/home/osbdet/notebooks/Group_project/Strategies_Activity.csv"))

In [6]:
marketing_info.toPandas()

Unnamed: 0,id_str,Activity,Marketing,followers
0,1499694732007690241,MODERATE-MEDIUM,S6,188
1,1499694731894435843,ADICT,S1,1
2,1499694731617611778,MEDIUM,S8,525
3,1499694731449798658,HIGH,S3,49
4,1499694730808070144,LOW,S10,1280
...,...,...,...,...
127553,1497544615238385664,LOW-LOW,S11,1893
127554,1497544615167098883,MODERATE-MEDIUM,S6,198
127555,1497544615054176258,LOW,S10,1252
127556,1497544614856777733,HIGH,S3,25


<a id='2.4'></a>
### 2.4 Reading tweets from the kafka topic

In [7]:
kafka_input_topic = "tweets"
kafka_group_id = "IE"

tweets_events = (spark.readStream.format("kafka")
                         .option("kafka.bootstrap.servers", "localhost:9092")
                         .option("subscribe", kafka_input_topic)
                         .option("startingOffsets", "latest")
                         .option("kafka.group.id", kafka_group_id)
                         .load() 
                         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

<a id='2.5'></a>
### 2.5 Creation of a specific DataFrame

In [8]:
from pyspark.sql.functions import concat, lit

TweetsEvents = (tweets_events.select(split("value", r'\|').alias("fields")) \
                             .withColumn("timestamp_str",col("fields").getItem(0)) \
                             .withColumn("created_at",col("timestamp_str").cast(TimestampType()))\
                             .withColumn("lang",col("fields").getItem(11)) \
                             .withColumn("id_str",col("fields").getItem(1)) \
                             .withColumn("user_followers_count",col("fields").getItem(20)) \
                             .where(col("user_followers_count")>=200)
                             .select("created_at", "lang", "id_str", "user_followers_count"))

<a id='2.6'></a>
### 2.6 Join, and launch to MariaDB

In [9]:
TweetsMarketingSuggestion= TweetsEvents.join(marketing_info, "id_str")
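Joining on `"id_str"` without a join type is an inner join, so streaming events with no matching row in the CSV are dropped. A plain-Python sketch of that stream-to-static lookup is shown below; `join_batch` is a hypothetical name, the two lookup rows reuse values from the CSV preview above, and the events are invented.

```python
# Static side: CSV rows indexed by the join key (values taken from the preview above).
marketing_by_id = {
    "1499694732007690241": {"Activity": "MODERATE-MEDIUM", "Marketing": "S6"},
    "1499694731449798658": {"Activity": "HIGH", "Marketing": "S3"},
}


def join_batch(batch):
    """Inner-join one micro-batch of events with the static lookup on id_str."""
    joined = []
    for event in batch:
        match = marketing_by_id.get(event["id_str"])
        if match is not None:  # unmatched events are dropped, as in an inner join
            joined.append({**event, **match})
    return joined


# Invented micro-batch: the second event has no counterpart in the lookup.
batch = [
    {"id_str": "1499694732007690241", "lang": "en", "user_followers_count": "525"},
    {"id_str": "0", "lang": "es", "user_followers_count": "300"},
]
joined = join_batch(batch)
```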

We specify where the data is going to be uploaded: in this case, the MariaDB table created earlier with the <em>tweets-db.sql</em> file launched in step 2.1.

In [10]:
def foreach_batch_function(df, epoch_id):
    print ("Batch %d received" % epoch_id)
    
    # databases connection properties
    url = "jdbc:mariadb://localhost:3306/DBS_tweets"
    table = "final_tweets"
    mode = "append"
    props = {"user": "osbdet",
             "password":"osbdet123$", 
             "driver":"org.mariadb.jdbc.Driver"}
    (df.select("created_at", 
              "lang", 
              "id_str", 
              "user_followers_count",
              "Activity",
              "Marketing")
        .write
        .jdbc(url,table,mode,props)
     )
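The per-batch write pattern can be tried without a database server. The sketch below uses an in-memory SQLite database purely as a stand-in for MariaDB; the table layout is an assumption (the contents of `tweets-db.sql` are not shown here), `write_batch` is a hypothetical name, and the rows are invented.

```python
import sqlite3

COLUMNS = ("created_at", "lang", "id_str", "user_followers_count", "Activity", "Marketing")

# Stand-in for MariaDB: an in-memory SQLite table with an assumed layout.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE final_tweets (created_at TEXT, lang TEXT, id_str TEXT, "
    "user_followers_count INTEGER, Activity TEXT, Marketing TEXT)"
)


def write_batch(rows, epoch_id):
    """Append one micro-batch to the table, mirroring mode='append'."""
    print("Batch %d received" % epoch_id)
    conn.executemany(
        "INSERT INTO final_tweets VALUES (?, ?, ?, ?, ?, ?)",
        [tuple(row[c] for c in COLUMNS) for row in rows],
    )
    conn.commit()


# Two invented micro-batches with one row each.
row = {"created_at": "2022-03-04 10:15:00", "lang": "en", "id_str": "1",
       "user_followers_count": 525, "Activity": "MEDIUM", "Marketing": "S8"}
write_batch([row], 0)
write_batch([dict(row, id_str="2")], 1)

total = conn.execute("SELECT COUNT(*) FROM final_tweets").fetchone()[0]
```

Because each batch is appended, rows from earlier batches stay in the table and the total keeps growing.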

We launch the streaming query that writes each micro-batch to MariaDB.

In [11]:
query = (TweetsMarketingSuggestion.writeStream
                               .foreachBatch(foreach_batch_function)
                               .start())

Batch 0 received
Batch 1 received
Batch 2 received
Batch 3 received
Batch 4 received
Batch 5 received
Batch 6 received
Batch 7 received
Batch 8 received
Batch 9 received
Batch 10 received
Batch 11 received
Batch 12 received
Batch 13 received
Batch 14 received
Batch 15 received
Batch 16 received
Batch 17 received
Batch 18 received
Batch 19 received
Batch 20 received
Batch 21 received
Batch 22 received
Batch 23 received
Batch 24 received
Batch 25 received
Batch 26 received
Batch 27 received
Batch 28 received
Batch 29 received
Batch 30 received
Batch 31 received
Batch 32 received
Batch 33 received
Batch 34 received
Batch 35 received
Batch 36 received
Batch 37 received
Batch 38 received
Batch 39 received
Batch 40 received
Batch 41 received
Batch 42 received
Batch 43 received
Batch 44 received
Batch 45 received
Batch 46 received
Batch 47 received
Batch 48 received
Batch 49 received
Batch 50 received
Batch 51 received
Batch 52 received
Batch 53 received
Batch 54 received
Batch 55 received
Ba

Batch 437 received
Batch 438 received
Batch 439 received
Batch 440 received
Batch 441 received
Batch 442 received
Batch 443 received
Batch 444 received
Batch 445 received
Batch 446 received
Batch 447 received
Batch 448 received
Batch 449 received
Batch 450 received
Batch 451 received
Batch 452 received
Batch 453 received
Batch 454 received
Batch 455 received
Batch 456 received
Batch 457 received
Batch 458 received
Batch 459 received
Batch 460 received
Batch 461 received
Batch 462 received
Batch 463 received
Batch 464 received
Batch 465 received
Batch 466 received
Batch 467 received
Batch 468 received
Batch 469 received
Batch 470 received
Batch 471 received
Batch 472 received
Batch 473 received
Batch 474 received
Batch 475 received
Batch 476 received
Batch 477 received
Batch 478 received
Batch 479 received
Batch 480 received
Batch 481 received
Batch 482 received
Batch 483 received
Batch 484 received
Batch 485 received
Batch 486 received
Batch 487 received
Batch 488 received
Batch 489 re