# Prerrequisites

Installing Spark and Apache Kafka Library in VM


---



In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip -q install findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:0.8.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# create the session
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .config("spark.ui.port", "4050") \
        .getOrCreate()

spark.version

'3.1.2'

Creating ngrok tunnel to allow Spark UI (Optional)
**Only 20 connections/minute!!!**

In [5]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2021-10-15 14:26:28--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.237.133.81, 18.205.222.128, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.237.133.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-10-15 14:26:29 (44.5 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
https://5c10-35-231-5-119.ngrok.io


# Structured Streaming with Apache Kafka

## Example 1

Reading a Kafka topic in AWS.
Before executing this code, replace `kafka:9092` by the right bootstrap server

In [67]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ec2-3-91-30-167.compute-1.amazonaws.com:9092") \
  .option("subscribe", "twitter") \
  .load()
  
schema = StructType(
    [
        StructField('id', StringType(), True),
        StructField('timestamp_ms', StringType(), True),
        StructField('user', StringType(), True),
        StructField('geo', StringType(), True),
        StructField('play', StringType(), True),
        StructField('text', StringType(), True)
    ]
)
df.printSchema()

dataset = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
    .withColumn("value", from_json("value", schema)) \
    .select(col('key'), col("timestamp"), col('value.*'))

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [68]:
dataset.writeStream \
 .outputMode("append") \
 .format("memory") \
 .option("truncate", "false") \
 .queryName("tweets_topic") \
 .start()

<pyspark.sql.streaming.StreamingQuery at 0x7fa9dea588d0>

In [69]:
spark.sql("select id, text from tweets_topic").show(10, False)

+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|id                 |text                                                                                                                                        |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|1449028372730941444|RT @realLizUSA: The Pima County Fish Tail 

President Trump outperformed Republicans 3% on mail-in ballots

Biden underperformed Democrats… |
|1449028372995063809|@mistymeadow22 @ConservativeMMT @POTUS There are always two sides to a story. I can explain accurately and fairly w… https://t.co/sCb1nvJBL9|
|1449028375151054852|RT @jonathanalter: At least 97 percent of other nations prefer Biden and Blinken to Trump and Pompeo. Russia, Hungary and one or two others…|
|1449028374618378241|R

## Exercise 1

Apply a sliding window each minute, 5 minutes of duration, grouping by id

---



In [16]:
from pyspark.sql.functions import col, window

dataset.groupBy(window(col("timestamp"), "5 minutes", "1 minutes"), col("id")) \
    .count() \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("tweets_windowed") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7fa9ded77390>

In [53]:
spark.sql("select * from tweets_windowed order by count desc").show(20, False)

+------------------------------------------+-------------------+-----+
|window                                    |id                 |count|
+------------------------------------------+-------------------+-----+
|{2021-10-15 14:51:00, 2021-10-15 14:56:00}|1449025109306511366|1    |
|{2021-10-15 14:50:00, 2021-10-15 14:55:00}|1449025363665948696|1    |
|{2021-10-15 14:50:00, 2021-10-15 14:55:00}|1449025405080457217|1    |
|{2021-10-15 14:56:00, 2021-10-15 15:01:00}|1449026373163573251|1    |
|{2021-10-15 14:50:00, 2021-10-15 14:55:00}|1449025459749064737|1    |
|{2021-10-15 14:51:00, 2021-10-15 14:56:00}|1449025255473877029|1    |
|{2021-10-15 14:48:00, 2021-10-15 14:53:00}|1449025234284253212|1    |
|{2021-10-15 14:55:00, 2021-10-15 15:00:00}|1449026317987430401|1    |
|{2021-10-15 14:52:00, 2021-10-15 14:57:00}|1449025873449865224|1    |
|{2021-10-15 14:52:00, 2021-10-15 14:57:00}|1449026348475944994|1    |
|{2021-10-15 14:47:00, 2021-10-15 14:52:00}|1449025175647883289|1    |
|{2021

## Exercise 2

Each minute, get the number of tweets received in last 5 minutes

---



In [None]:
dataset.groupBy(window(col("timestamp"), "5 minutes", "1 minutes")) \
    .count() \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("tweets_windowed_2") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7fad81f6b630>

In [None]:
spark.sql("select * from tweets_windowed_2 order by count desc").show(50, False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2021-02-18 08:22:00, 2021-02-18 08:27:00]|166  |
|[2021-02-18 08:25:00, 2021-02-18 08:30:00]|166  |
|[2021-02-18 08:23:00, 2021-02-18 08:28:00]|166  |
|[2021-02-18 08:24:00, 2021-02-18 08:29:00]|166  |
|[2021-02-18 08:21:00, 2021-02-18 08:26:00]|138  |
|[2021-02-18 08:26:00, 2021-02-18 08:31:00]|28   |
|[2021-02-18 08:22:00, 2021-02-18 08:27:00]|3    |
|[2021-02-18 08:23:00, 2021-02-18 08:28:00]|3    |
|[2021-02-18 08:24:00, 2021-02-18 08:29:00]|3    |
|[2021-02-18 08:25:00, 2021-02-18 08:30:00]|3    |
|[2021-02-18 08:21:00, 2021-02-18 08:26:00]|3    |
+------------------------------------------+-----+



## Exercise 3

Get tweets containing the word `Trump` in 1 minute slots

---



In [77]:
from pyspark.sql.functions import lower

dataset = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
    .withColumn("value", from_json("value", schema)) \
    .select(col('key'), col("timestamp"), col('value.*')) \
    .filter(lower(col('text')).contains("Biden"))

dataset.groupBy(window(col("timestamp"), "1 minutes")) \
    .count() \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("tweets_windowed_biden") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7fa9de9eab90>

In [83]:
spark.sql("select * from tweets_windowed_biden").show(10, False)

+------+-----+
|window|count|
+------+-----+
+------+-----+



In [84]:
for query in spark.streams.active:
  query.stop()