# Spark Structured Streaming Application

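This notebook scores a stream of records in real time. It does three things:

- reads JSON messages from the Kafka topic `to_predict`;
- applies a pre-trained Spark ML pipeline (logistic regression) to them;
- writes each record's predicted probability to the console, to the Kafka topic `predictions`, and to the PostgreSQL table `predictions`.

Prerequisites:

- a Kafka cluster exposed through the minikube service `kafka-cluster-kafka-external-bootstrap` (namespace `demo`);
- a reachable Spark master;
- the packaged model archive `spark-logistic-regression-model.zip`;
- Docker, for the PostgreSQL container.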



In [None]:
# Install pre-requisites
!pip install ipython-sql psycopg2-binary pyspark==3.2.1

In [None]:
# Load the ipython-sql extension (installed above); it provides the %%sql cell
# magic used by the PostgreSQL cells further down.
%load_ext sql

In [None]:
# All imports for the notebook, stdlib first, then third-party.
# NOTE(review): several names (e.g. pickle, pandas_udf, PandasUDFType, lit,
# unix_timestamp, current_date) are not used in the cells shown; consider
# removing them once confirmed unused.
import json
import os
import pickle
import time

import pandas as pd
from pyspark import SparkConf, SparkContext, __version__
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    PandasUDFType,
    col,
    current_date,
    current_timestamp,
    from_json,
    lit,
    pandas_udf,
    regexp_replace,
    split,
    struct,
    to_json,
    unix_timestamp,
)
from pyspark.sql.types import (
    DecimalType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

print("PySpark", __version__)

In [None]:
# Resolve the external Kafka bootstrap address exposed through minikube.
# `minikube service --url` prints one URL per line (e.g. "http://<ip>:<port>").
# Kafka's bootstrap.servers wants comma-separated host:port pairs, so each line
# is stripped of whitespace/newline and of any "scheme://" prefix.
import subprocess

_minikube_urls = subprocess.check_output(
    ["minikube", "service", "kafka-cluster-kafka-external-bootstrap", "--url", "-n", "demo"],
    text=True,
)  # raises CalledProcessError instead of silently yielding "" like os.popen

KAFKA_HOST = ",".join(
    line.strip().split("://", 1)[-1]
    for line in _minikube_urls.splitlines()
    if line.strip()
)
if not KAFKA_HOST:
    raise RuntimeError(
        "minikube returned no URL for service kafka-cluster-kafka-external-bootstrap"
    )

print("Apache Kafka broker running on:", KAFKA_HOST)

In [None]:
# Connector packages must be on the classpath before the JVM starts, i.e. before
# the first SparkSession is created in this kernel. The Kafka connector version
# follows the installed PySpark so driver and connector stay in sync.
SPARK_PACKAGES = [
    f"org.apache.spark:spark-streaming-kafka-0-10_2.12:{__version__}",
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{__version__}",
    "org.postgresql:postgresql:42.1.1",
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(SPARK_PACKAGES)} pyspark-shell"

# Environment-overridable settings; defaults are the values this notebook was
# developed with.
APP_NAME = os.getenv("APP_NAME", "Spark Structured Streaming Application")
MASTER = os.getenv("MASTER", "spark://carloshkayser:7077")  # e.g. "local[*]" to run locally
# The archive's file name must stay "spark-logistic-regression-model.zip":
# the model-loading cells look it up by that name via SparkFiles.
MODEL_ARCHIVE = os.getenv(
    "MODEL_ARCHIVE",
    "/home/kayser/git/data-pipelines-with-apache-kafka/spark-ml-training/model/spark-logistic-regression-model.zip",
)

spark = (
    SparkSession.builder.appName(APP_NAME)
    .master(MASTER)
    # Ship the trained pipeline with the application.
    .config("spark.archives", MODEL_ARCHIVE)
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

spark


In [None]:
# from pyspark.sql.streaming import StreamingQueryListener


# class MyListener(StreamingQueryListener):
#     def onQueryStarted(self, event):
#         """
#         Called when a query is started.

#         Parameters
#         ----------
#         event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
#             The properties are available as the same as Scala API.

#         Notes
#         -----
#         This is called synchronously with
#         meth:`pyspark.sql.streaming.DataStreamWriter.start`,
#         that is, ``onQueryStart`` will be called on all listeners before
#         ``DataStreamWriter.start()`` returns the corresponding
#         :class:`pyspark.sql.streaming.StreamingQuery`.
#         Do not block in this method as it will block your query.
#         """
#         print("Query started:" + event.id)

#     def onQueryProgress(self, event):
#         """
#         Called when there is some status update (ingestion rate updated, etc.)

#         Parameters
#         ----------
#         event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
#             The properties are available as the same as Scala API.

#         Notes
#         -----
#         This method is asynchronous. The status in
#         :class:`pyspark.sql.streaming.StreamingQuery` will always be
#         latest no matter when this method is called. Therefore, the status
#         of :class:`pyspark.sql.streaming.StreamingQuery`.
#         may be changed before/when you process the event.
#         For example, you may find :class:`StreamingQuery`
#         is terminated when you are processing `QueryProgressEvent`.
#         """

#         print("Query made progress")
      
#         # getting current batch information in json format
#         jsonData = event.progress.json

#         print(jsonData)
        
#         # # creating dataset based on json for easier parsing and transforming
#         # DF = spark.read.json(spark.createDataset(jsonData))
        
#         # # selecting relevant columns from dataset
#         # DF_flat = DF.select(
#         #     "id",
#         #     "runId",
#         #     "name",
#         #     "timestamp",
#         #     "batchId",
#         #     "numInputRows",
#         #     "inputRowsPerSecond",
#         #     "processedRowsPerSecond",
#         #     "durationMs.latestOffset",
#         #     "durationMs.triggerExecution",
#         #     "sink.description",
#         #     "sink.numOutputRows"
#         # )
        
#         # # transform dataset back to json
#         # DF_flat_json = DF_flat.toJSON
        
#         # # get json as string
#         # DF_flat_json_string = DF_flat_json.select("value").collect().map(_.getString(0)).mkString(" ")      
        
#         # # build and send http post request to power bi push dataset (created upfront)
#         # val url = "PasteEndpointFromPowerBI"      
#         # val client = HttpClientBuilder.create().build()
#         # val post:HttpPost = new HttpPost(url)
#         # post.addHeader("Content-Type", "application/json")
#         # val post_body = new StringEntity("["+DF_flat_json_string+"]")
#         # post.setEntity(post_body)
#         # val response:CloseableHttpResponse = client.execute(post)

#     def onQueryTerminated(self, event):
#         """
#         Called when a query is stopped, with or without error.

#         Parameters
#         ----------
#         event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
#             The properties are available as the same as Scala API.
#         """
#         print("Query terminated:" + event.id)


# my_listener = MyListener()

# spark.streams.addListener(my_listener)
# # spark.streams.removeListener(my_listener)


In [None]:
# Streaming source: subscribe to the `to_predict` topic. When there is no
# checkpoint to resume from, reading starts at the latest offsets.
df_raw = (
    spark.readStream
    .format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": KAFKA_HOST,
            "subscribe": "to_predict",
            "startingOffsets": "latest",
        }
    )
    .load()
)


In [None]:
# Inspect the Kafka source columns; the message payload is in `value`.
df_raw.printSchema()


In [None]:
# Shape of one message on the `to_predict` topic, for example:
# {
#   "id": 1.0079274744188029e+19,
#   "hour": 14103100,
#   "C1": 1005,
#   "banner_pos": 0,
#   "site_id": "85f751fd",
#   "site_domain": "c4e18dd6",
#   "site_category": "50e219e0",
#   "app_id": "febd1138",
#   "app_domain": "82e27996",
#   "app_category": "0f2161f8",
#   "device_id": "a99f214a",
#   "device_ip": "b72692c8",
#   "device_model": "99e427c9",
#   "device_type": 1,
#   "device_conn_type": 0,
#   "C14": 21611,
#   "C15": 320,
#   "C16": 50,
#   "C17": 2480,
#   "C18": 3,
#   "C19": 299,
#   "C20": 100111,
#   "C21": 61
# }
#
# `id` arrives as a float in scientific notation and is held as DECIMAL(38, 0);
# every field is nullable. Field order below matches the message layout.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in [
            ("id", DecimalType(38, 0)),
            ("hour", IntegerType()),
            ("C1", IntegerType()),
            ("banner_pos", IntegerType()),
            ("site_id", StringType()),
            ("site_domain", StringType()),
            ("site_category", StringType()),
            ("app_id", StringType()),
            ("app_domain", StringType()),
            ("app_category", StringType()),
            ("device_id", StringType()),
            ("device_ip", StringType()),
            ("device_model", StringType()),
            ("device_type", IntegerType()),
            ("device_conn_type", IntegerType()),
            ("C14", IntegerType()),
            ("C15", IntegerType()),
            ("C16", IntegerType()),
            ("C17", IntegerType()),
            ("C18", IntegerType()),
            ("C19", IntegerType()),
            ("C20", IntegerType()),
            ("C21", IntegerType()),
        ]
    ]
)


In [None]:
# Decode the Kafka payload to text, parse it with `schema`, and flatten the
# parsed struct into one column per field.
df = df_raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

df.printSchema()


In [None]:
from pyspark import SparkFiles

# Local path of the model archive shipped via `spark.archives` when the session
# was created; the archive is looked up by its file name.
# NOTE(review): this is a driver-side path; confirm it is also readable where
# Spark reads the model files when running against a remote Spark master.
pipeline_model_path = SparkFiles.get("spark-logistic-regression-model.zip")
pipeline_model_path

In [None]:
from pyspark.ml import PipelineModel

# The saved pipeline sits in a sub-directory of the unpacked archive; load it
# and score the parsed stream (the transform is lazy until a sink starts).
pipelineModel = PipelineModel.read().load(
    f"{pipeline_model_path}/spark-logistic-regression-model"
)
results = pipelineModel.transform(df)

results.printSchema()


In [None]:
from pyspark.ml.functions import vector_to_array

# Reduce the model output to what the sinks need: the record id, the
# probability at index 1 of the model's `probability` vector (presumably the
# positive class — confirm against the training labels), and the processing
# time. vector_to_array reads the vector directly, instead of casting it to a
# string and regex-splitting it, which also breaks on sparse vectors.
# NOTE: this rebinds `results`; re-run the model cell before re-running this one.
results = results.select(
    "id",
    vector_to_array("probability")[1].cast(DoubleType()).alias("probability"),
    current_timestamp().alias("processed_at"),
)

# Kafka sink: one JSON document per row in the `value` column.
results_kafka = results.select(
    to_json(struct("id", "probability", "processed_at")).alias("value")
)

# JDBC sink: plain columns matching the PostgreSQL `predictions` table.
results_postgres = results.select("id", "probability", "processed_at")

In [None]:
# Expect a single JSON string column `value`, as required by the Kafka sink.
results_kafka.printSchema()

In [None]:
# Expect the columns id, probability and processed_at for the JDBC sink.
results_postgres.printSchema()


### Logging the data stream in the console

In [None]:
# Print each micro-batch of predictions with the console sink, then give the
# query about 10 seconds to run before moving on.
# NOTE(review): the console sink writes to the driver's stdout, which may be the
# terminal running Jupyter rather than this notebook's output area.
query = (
    results.select("id", "probability", "processed_at")
    .writeStream.format("console")
    .start()
)

time.sleep(10)



In [None]:
# Stop the console query before starting the Kafka sink.
query.stop()

### Inserting data stream transformation results into another Apache Kafka topic

In [None]:
# Publish the JSON predictions to the `predictions` topic.
# Each streaming query gets its own checkpoint directory: the PostgreSQL sink
# previously reused the same "checkpointLocation" directory, and queries that
# share a checkpoint read each other's offsets and commit logs.
query = (
    results_kafka.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_HOST)
    .option("topic", "predictions")
    .option(
        "checkpointLocation",
        os.path.join(os.getenv("CHECKPOINT_DIR", "checkpoints"), "predictions-kafka"),
    )
    .start()
)

In [None]:
# Deliberate halt so that "Run All" does not continue into the cells below,
# which are meant to be run interactively (console consumer, monitoring loops).
# An explicit raise replaces the bare name `stop`, which only halted by
# accident (NameError) and would silently stop halting if `stop` were defined.
raise RuntimeError("Stopped on purpose: run the following cells manually.")

In [None]:
!kafka-console-consumer --bootstrap-server $(minikube service kafka-cluster-kafka-external-bootstrap --url -n demo) --topic predictions

In [None]:
# Debug aid: list the attributes of the StreamingQuery handle returned by start().
dir(query)

In [None]:
# Current status snapshot of the running query.
query.status

In [None]:
# Container type of the retained progress reports.
type(query.recentProgress)

In [None]:
# Number of progress reports currently retained by the query.
len(query.recentProgress)

In [24]:
# Most recent progress report; the cells below read its batchId,
# inputRowsPerSecond and processedRowsPerSecond entries.
# NOTE(review): presumably None until the first micro-batch completes.
query.lastProgress

In [None]:
# Collect one progress report per micro-batch, keyed by batchId, while the
# query is running. Interrupt the kernel to stop early: `statistics` keeps what
# was gathered so far. The previous version busy-looped forever without
# sleeping and crashed when lastProgress was still None.
POLL_INTERVAL_SECONDS = 1

statistics = {}

while query.isActive:
    # Read every retained report rather than only the latest, so batches that
    # complete between two polls are not missed. The first report seen for a
    # batchId is kept.
    for progress in query.recentProgress:
        statistics.setdefault(progress["batchId"], progress)
    time.sleep(POLL_INTERVAL_SECONDS)

In [None]:
# Persist the collected per-batch progress reports as JSON (json turns the
# integer batchId keys into strings). The file is opened in 'w' mode, so change
# the name per experiment to avoid overwriting earlier results.
with open('statistics-40replicas-0second.json', 'w') as stats_file:
    stats_file.write(json.dumps(statistics))


In [None]:
# Print the input vs. processing rate of the latest micro-batch every
# REPORT_INTERVAL_SECONDS, up to N_REPORTS times. Stops early once the query is
# no longer active, and tolerates lastProgress being None before the first
# batch completes (this previously raised TypeError).
N_REPORTS = 100
REPORT_INTERVAL_SECONDS = 5

for _ in range(N_REPORTS):
    if not query.isActive:
        print("Query is no longer active; stopping the report.")
        break

    progress = query.lastProgress
    if progress is None:
        print("No progress reported yet.\n")
    else:
        print("inputRowsPerSecond: ", progress["inputRowsPerSecond"])
        print("processedRowsPerSecond:", progress["processedRowsPerSecond"])
        print("\n")

    time.sleep(REPORT_INTERVAL_SECONDS)

In [None]:
# Stop the Kafka sink query before starting the PostgreSQL sink.
query.stop()

### Inserting data stream into PostgreSQL database

In [None]:
# Create a PostgreSQL database with Docker
!docker run -d -e POSTGRES_PASSWORD=postgres -p 5432:5432 --name postgres postgres:11.7-alpine

In [None]:
# Show the PostgreSQL container logs, e.g. to check the server is up before
# creating the table.
!docker logs postgres

In [None]:
%%sql postgresql://postgres:postgres@localhost:5432/postgres
-- Sink table for the streaming predictions; the columns mirror the Spark
-- output (id, probability, processed_at). IF NOT EXISTS keeps the cell re-runnable.
CREATE TABLE IF NOT EXISTS predictions (
    id DECIMAL(38, 0),
    probability DOUBLE PRECISION,
    processed_at TIMESTAMP
);

In [None]:
# JDBC settings for the PostgreSQL container; environment-overridable so the
# password need not live in the notebook (defaults match the container above).
# NOTE(review): "localhost" is resolved where the write runs; with a remote
# Spark master that is the executor host, not this machine — confirm.
POSTGRES_URL = os.getenv("POSTGRES_URL", "jdbc:postgresql://localhost:5432/postgres")
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres")


def foreach_batch_function(df, epoch_id):
    """Append one streaming micro-batch to the PostgreSQL `predictions` table.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        The micro-batch (columns id, probability, processed_at).
    epoch_id : int
        Micro-batch id passed by Spark. It is not used for de-duplication, so
        delivery is at-least-once: a retried batch may be appended twice.
    """
    (
        df.write.format("jdbc")
        .option("url", POSTGRES_URL)
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", "predictions")
        .option("user", POSTGRES_USER)
        .option("password", POSTGRES_PASSWORD)
        .mode("append")
        .save()
    )


# This query has its own checkpoint directory; it previously shared
# "checkpointLocation" with the Kafka sink query, mixing their progress.
query = (
    results_postgres.writeStream.foreachBatch(foreach_batch_function)
    .option(
        "checkpointLocation",
        os.path.join(os.getenv("CHECKPOINT_DIR", "checkpoints"), "predictions-postgres"),
    )
    .start()
)


In [None]:
%%sql postgresql://postgres:postgres@localhost:5432/postgres
-- Number of predictions written to the table so far.
SELECT count(*) FROM predictions;


In [None]:
%%sql postgresql://postgres:postgres@localhost:5432/postgres
-- The 10 most recently processed predictions.
SELECT *
FROM predictions
ORDER BY processed_at DESC
LIMIT 10;


In [None]:
# Stop every streaming query still running in this session, not only the one
# currently bound to `query` (e.g. if an earlier stop cell was skipped).
for active_query in spark.streams.active:
    active_query.stop()