<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" /> 

<img style="float:left;" 
     src="https://secemu.org/wp-content/uploads/2016/02/Twitter-Banner-1024x385.png" />   

# Sections
* [Description](#0)
* [1. Setup](#1) 
  * [1.1 Start the Kafka service and the producer](#1.1)
  * [1.2 Search for Spark Installation](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Use case](#2) 
  * [2.1 Main DataFrame creation](#2.1)
  * [2.2 Map the sequence of bytes to a proper JSON document and display contents](#2.2)
* [3. Finalize the exercise](#3)
  * [3.1 Stop the Spark Streaming application](#3.1)
  * [3.2 Stop the Kafka producer](#3.2)
  * [3.3 Stop the Kafka service](#3.3)

<a id='0'></a>
## Description
<p>
<div>This notebook will help you address the following:</div>
<ul>    
    <li>Consume events from a Kafka topic called <em>tweets</em></li>
    <li>Translate tweets ingested in raw format (sequence of bytes) into the proper JSON format</li>
    <li>Manipulate the tweets: use your imagination here to implement a use case you like</li>
    <li>Display the results of the manipulation on the console, then continue the journey of the real-time solution by yourself (e.g. publish the results to Kafka, MariaDB, MongoDB, ...)</li>
</ul>    
</p>

<a id='1'></a>
## 1. Setup

<a id='1.1'></a>
### 1.1 Start the Kafka service and the producer 
<p>Before you can run this notebook, be sure that you log into the course environment and:</p>
<ul>
    <li><p><b>Start the Kafka service</b>:
        <br/><em>\$ sudo service kafka start</em></p></li>
    <li><b>Install the required libraries</b> listed in the <b>requirements.txt</b> file:
        <br/><em>\$ pip install -r requirements.txt</em></li>
    <li>Add your <b>API key</b>, <b>API secret</b>, <b>access token</b> and <b>access secret</b> to the <em>credentials.ini</em> file</li>
    <li><p><b>Start the producer</b> connecting to Twitter and filtering tweets by keywords or hashtags:
        <br/><em>\$ python3 twitter_producer.py credentials.ini "btc,#eth,ada" -b localhost:9092 -t tweets</em></p></li>
</ul>

<a id='1.2'></a>
### 1.2 Search for Spark Installation 
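This step is only required because we are working in the course environment: <em>findspark</em> locates the Spark installation and makes the <em>pyspark</em> module importable.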

In [1]:
import findspark
findspark.init()

<a id='1.3'></a>
### 1.3 Create SparkSession

In addition to creating the SparkSession, we're going to set an environment variable (<em>PYSPARK_SUBMIT_ARGS</em>) to include extra libraries in our "cluster".<br/>
In this case we're including the Spark-Kafka integration package (<em>spark-sql-kafka-0-10</em>), as our job will connect to Kafka.

In [2]:
import os
# The package version must match the Spark (3.0.3) and Scala (2.12) versions, and
# this variable must be set before the SparkSession (and its JVM) is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3" pyspark-shell'

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Twitter").getOrCreate()

Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9bd983f5-50c7-4449-bc22-c8eeb164314b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.3 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 2318ms :: artifacts dl 69ms
	:: modules in us
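The connector coordinate has three parts: the group id, the artifact id with a Scala version suffix (<em>_2.12</em>), and a version that should match your Spark release (3.0.3 here). The following is a minimal sketch of a helper that assembles the variable from those two versions. The name <em>kafka_submit_args</em> is hypothetical; in a live environment you could pass <em>pyspark.__version__</em> instead of a hard-coded string.

```python
import os


def kafka_submit_args(spark_version: str, scala_version: str) -> str:
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the Kafka connector.

    Hypothetical helper: the artifact id embeds the Scala version and the
    artifact version is expected to equal the Spark version.
    """
    package = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
    # PySpark expects the argument list to end with "pyspark-shell".
    return f'--packages "{package}" pyspark-shell'


# Only effective if set before the SparkSession (and its JVM) is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args("3.0.3", "2.12")
```

Keeping the versions as parameters makes a mismatch between the connector and the installed Spark easier to spot when the environment is upgraded.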

<a id='2'></a>
## 2. Use Case

<a id='2.1'></a>
### 2.1 Main DataFrame creation
<p>Have a look at the schema you get by default when you create a DataFrame on top of a Kafka topic.<br/> The <b><em>value</em></b> field is the one containing the data from the <em>ingestion layer</em>, the Twitter producer in our case.</p>
<p>Bear in mind that, as <em>we're simplifying things</em>, we're not relying on schemas and <b>we're sending sequences of bytes to Kafka topics</b>.</p>
<p>Later in the notebook, we're going to convert those bytes into a proper JSON document representing each tweet as it was received. Also note that with <em>startingOffsets</em> set to <em>latest</em>, the query only reads events published after it starts.</p>

In [3]:
rawTweetsDF = spark.readStream \
                   .format("kafka") \
                   .option("kafka.bootstrap.servers", "localhost:9092") \
                   .option("subscribe", "tweets") \
                   .option("startingOffsets", "latest") \
                   .load()
rawTweetsDF

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
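To make this schema concrete, here is a plain-Python mirror of one row of <em>rawTweetsDF</em>. The <em>KafkaRecord</em> class and the sample values are hypothetical; the sketch assumes the producer writes each tweet as UTF-8 encoded JSON, which is what the cast-and-parse logic in the next section relies on.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class KafkaRecord:
    """Hypothetical plain-Python mirror of the Kafka source columns."""
    key: Optional[bytes]   # binary, may be null
    value: bytes           # binary payload written by the producer
    topic: str
    partition: int
    offset: int
    timestamp: datetime    # Kafka record timestamp, not the tweet's created_at
    timestampType: int     # 0 = CreateTime, 1 = LogAppendTime in Kafka


# Hypothetical record; assumes the producer sends UTF-8 encoded JSON.
record = KafkaRecord(
    key=None,
    value=json.dumps({"id": 1, "text": "hello #btc"}).encode("utf-8"),
    topic="tweets",
    partition=0,
    offset=42,
    timestamp=datetime(2022, 2, 26, 13, 58, 0),
    timestampType=0,
)

# Rough equivalent of CAST(value AS STRING): read the bytes as UTF-8 text.
as_string = record.value.decode("utf-8")
```

Note that the Kafka <em>timestamp</em> column is already typed as a timestamp, while the tweet's own creation time lives inside the JSON payload.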

<a id='2.2'></a>
### 2.2 Map the sequence of bytes to a proper JSON document and display contents
We're going to apply the following logic to the events we get from the topic:
<ol>
    <li>Define the schema that matches the JSON documents carried in the raw sequence of bytes we get from the topic.</li>
    <li>Cast the default data type of the field <em>value</em> (binary) to the String data type.</li>
    <li>Convert the String into a proper JSON document by using the <em>from_json</em> function.</li>
    <li>Flatten the JSON document and select the event time, user name, text and id.</li>
    <li>Display the results in the console.</li>
</ol>

Notice the schema we get now: it looks like any other DataFrame we've seen so far. Batch and streaming data share the same DataFrame API, which is what makes Spark <b>a truly unified processing framework</b>.
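Before the Spark cells, a plain-Python sketch of steps 2 to 4 applied to a single Kafka value may help. <em>flatten_tweet</em> is a hypothetical helper, not part of PySpark; for malformed input it returns all-null columns, roughly what Spark's default PERMISSIVE mode produces for the selected fields.

```python
import json

# The four columns selected in step 4.
FIELDS = ("event_time", "screen_name", "text", "id")


def flatten_tweet(raw: bytes) -> dict:
    """Plain-Python analogue of CAST(value AS STRING) + from_json + select."""
    try:
        data = json.loads(raw.decode("utf-8"))  # steps 2 and 3
    except (UnicodeDecodeError, json.JSONDecodeError):
        data = None
    if not isinstance(data, dict):
        # Malformed input: every selected column ends up null (None).
        return dict.fromkeys(FIELDS)
    user = data.get("user") or {}
    # Step 4: pick nested fields; absent keys map to None, like null columns.
    return {
        "event_time": data.get("created_at"),
        "screen_name": user.get("screen_name"),
        "text": data.get("text"),
        "id": data.get("id"),
    }
```

Spark applies the same logic to every record of every micro-batch, with the schema telling <em>from_json</em> which fields and types to extract.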

In [4]:
# 1. Schema of a tweet coming from Twitter.

tweet_schema="""
created_at string,
id bigint,
id_str string,
text string,
source string,
truncated boolean,
in_reply_to_status_id bigint,
in_reply_to_status_id_str string,
in_reply_to_user_id bigint,
in_reply_to_user_id_str string,
in_reply_to_screen_name string,
`user` struct<
            id:bigint,
            id_str:string,
            name:string,
            screen_name:string,
            location:string,
            url:string,
            description:string,
            protected:boolean,
            verified:boolean,
            followers_count:bigint,
            friends_count:bigint,
            listed_count:bigint,
            favourites_count:bigint,
            statuses_count:bigint,
            created_at:string,
            profile_banner_url:string,
            profile_image_url_https:string,
            default_profile:boolean,
            default_profile_image:boolean,
            withheld_in_countries: array<string>,
            withheld_scope:string,
            geo_enabled:boolean
            >,
coordinates struct <
            coordinates:array<float>,
            type:string
            >,
place struct<
            country:string,
            country_code:string,
            full_name:string,
            place_type:string,
            url:string
            >,
quoted_status_id bigint,
quoted_status_id_str string,
is_quote_status boolean,
quote_count bigint,
reply_count bigint,
retweet_count bigint,
favorite_count bigint,
entities struct<
            user_mentions:array<struct<screen_name:string>>,
            hashtags:array<struct<text:string>>, 
            media:array<struct<expanded_url:string>>, 
            urls:array<struct<expanded_url:string>>, 
            symbols:array<struct<text:string>>
            >,
favorited boolean,
retweeted boolean,
possibly_sensitive boolean,
filter_level string,
lang string
"""

In [5]:
from pyspark.sql.functions import from_json, col

# 2. Cast the default data type of the field value (binary) to the String data type.
# 3. Convert the String into a proper JSON document by using the from_json function.
# 4. Flatten the JSON document and select event time, user name, text and the id.

tweetsDF = rawTweetsDF.selectExpr("CAST(value AS STRING)") \
                      .select(from_json(col("value"), tweet_schema).alias("data")) \
                      .select(col("data.created_at").alias("event_time"), 
                              col("data.user.screen_name"),
                              col("data.text"),
                              col("data.id"))
tweetsDF

DataFrame[event_time: string, screen_name: string, text: string, id: bigint]
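Note that <em>event_time</em> is still a plain string. Event-time operations such as <em>window()</em> or <em>withWatermark()</em> need a timestamp column, so a use case built on time would first have to convert it (in Spark, with <em>to_timestamp</em> and a pattern matching Twitter's format; Spark's pattern letters differ from Python's and parsing day names may be restricted in Spark 3, so check the datetime pattern docs for your version). Alternatively, the Kafka <em>timestamp</em> column could be kept in the select. The sketch below shows the parsing in plain Python, assuming the Twitter API v1.1 <em>created_at</em> layout; the sample value and <em>parse_created_at</em> are hypothetical.

```python
from datetime import datetime, timezone

# Twitter API v1.1 created_at layout, e.g. "Sat Feb 26 13:58:12 +0000 2022".
# %a and %b expect English day/month names (C or English locale).
TWITTER_TIME_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def parse_created_at(created_at: str) -> datetime:
    """Parse created_at into a timezone-aware datetime normalized to UTC."""
    return datetime.strptime(created_at, TWITTER_TIME_FORMAT).astimezone(timezone.utc)
```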

In [7]:
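# 5. Display the results in the console.

query = tweetsDF.writeStream \
                .format("console") \
                .outputMode("append") \
                .start()

# Block this cell until the query stops. Interrupting the cell only stops the
# wait: the query itself keeps running in the background until it is stopped.
query.awaitTermination()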


-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+---------------+----------------------+-------------------+
|          event_time|    screen_name|                  text|                 id|
+--------------------+---------------+----------------------+-------------------+
|Sat Feb 26 13:58:...|       Anjey561|  #CMN #cmnnews #cm...|1497571887576236039|
|Sat Feb 26 13:58:...|     donqsj3001|  RT @RealPabloHema...|1497571887999864835|
|Sat Feb 26 13:58:...|          x_pca|RT @dkcrypto1: 今ま...|1497571889648402434|
|Sat Feb 26 13:58:...|   ArabeskToken|  RT @CRYPTOKOLIK14...|1497571889639829508|
|Sat Feb 26 13:58:...|CryptoL24237730|  I am very thankfu...|1497571889425772547|
|Sat Feb 26 13:58:...|     SUPER_SREK|  RT @IlseCrypto: I...|1497571889857720323|
|Sat Feb 26 13:58:...|      topkripto|  RT @happycoinnews...|1497571890352640000|
|Sat Feb 26 13:58:...|ramazan34678293|  @ethmaster089 @84...|1497571890428272641|
+--------------------+---------------+----------------------+-------------------+

+--------------------+--------------+--------------------+-------------------+
|          event_time|   screen_name|                text|                 id|
+--------------------+--------------+--------------------+-------------------+
|Sat Feb 26 13:59:...|NANDISHPATEL11| $BTC pos vibes :/ ️|1497571903539916803|
|Sat Feb 26 13:59:...|      Rezak611|RT @genelpatron01...|1497571903736791042|
|Sat Feb 26 13:59:...|     mimi_nyce|We have the oppor...|1497571904714125314|
|Sat Feb 26 13:59:...|  Bullish_Army|#BTC is going to ...|1497571904412012548|
+--------------------+--------------+--------------------+-------------------+

+--------------------+------------+--------------------+-------------------+
|          event_time| screen_name|                text|                 id|
+--------------------+------------+--------------------+-------------------+
|Sat Feb 26 13:59:...|   vandarkk7|Check out my NFT ...|1497571910103846913|
|Sat Feb 26 13:59:...|CryptoCapo_7|RT @CryptoCapo_: ...|1497571912666554373|
+--------------------+------------+--------------------+-------------------+

+--------------------+------------+--------------------+-------------------+
|          event_time| screen_name|                text|                 id|
+--------------------+------------+--------------------+-------------------+
|Sat Feb 26 13:59:...|  lele_fefef|OPEN FOR BIDS #ra...|1497571932430114823|
|Sat Feb 26 13:59:...|eisenhiem143|Market is looking...|1497571934221275137|
|Sat Feb 26 13:59:...|  selvi_asih|RT @IlseCrypto: I...|1497571934825234434|
+--------------------+------------+--------------------+-------------------+

+--------------------+---------------+--------------------+-------------------+
|          event_time|    screen_name|                text|                 id|
+--------------------+---------------+--------------------+-------------------+
|Sat Feb 26 13:59:...|       donzain6|@metashibaglobal ...|1497571955788103680|
|Sat Feb 26 13:59:...|  Money40750905|RT @MNFT0: 800€ P...|1497571956257918976|
|Sat Feb 26 13:59:...|TeknikA27204282|"Bilgi için: bors...|1497571956857704450|
+--------------------+---------------+--------------------+-------------------+

+--------------------+-----------+--------------------+-------------------+
|          event_time|screen_name|                text|                 id|
+--------------------+-----------+--------------------+-------------------+
|Sat Feb 26 13:59:...|monokumaman|RT @papayapaya_FX...|1497571974733697034|
+--------------------+-----------+--------------------+-------------------+

+--------------------+-----------+--------------------+-------------------+
|          event_time|screen_name|                text|                 id|
+--------------------+-----------+--------------------+-------------------+
|Sat Feb 26 13:59:...|MotinMaliha|#Orijin #Polygon ...|1497571988889485317|
|Sat Feb 26 13:59:...|MrEmreTrade|Piyasayı yapanlar...|1497571989501853704|
+--------------------+-----------+--------------------+-------------------+



<a id='3'></a>
## 3. Finalize the exercise

It's good practice to shut everything down properly, so that no process is left running and no data is left in an inconsistent state.

<a id='3.1'></a>
### 3.1 Stop the Spark Streaming application
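To stop the Spark Streaming application, go to **Kernel -> Shutdown**, that's it! Interrupting the cell alone is not enough: the streaming query keeps running in the background and printing batches.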

<a id='3.2'></a>
### 3.2 Stop the Kafka producer
Go to the terminal where you started the producer and **press Ctrl + C**, it's that easy!

<a id='3.3'></a>
### 3.3 Stop the Kafka service
<p>Go to a terminal window and type the following command:</p>
<em>$ sudo service kafka stop</em>