# Part 2: Streaming application using Spark Structured Streaming

In [1]:
import os

# Must be in the environment before the first SparkSession launches the JVM:
# it tells PySpark to fetch the Kafka connector packages.
KAFKA_SUBMIT_ARGS = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
os.environ.update(PYSPARK_SUBMIT_ARGS=KAFKA_SUBMIT_ARGS)

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml import PipelineModel
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DoubleType, LongType, TimestampType
from pyspark.ml.linalg import VectorUDT



1. Write code to create a SparkSession with the following requirements: 1) use four cores and a proper application name; 2) use the Melbourne timezone; 3) set a checkpoint location.


In [5]:
# Local session on 4 cores, Melbourne session timezone, and a default checkpoint
# directory for streaming queries that do not set their own.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("assignment2ab")
    .config("spark.sql.session.timeZone", "Australia/Melbourne")
    .config("spark.sql.streaming.checkpointLocation", "checkpoint/ck")
    .getOrCreate()
)

2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (previous_application_static and value_dict) into data frames. (You can reuse your code from 2A.)


In [6]:
# Schema for previous_application_static.csv, reused from assignment 2A.
# (column name, Spark type) in file order; every column is nullable.
_prev_app_columns = [
    ("id_app", IntegerType()),
    ("contract_type", IntegerType()),
    ("amt_annuity", FloatType()),
    ("amt_application", FloatType()),
    ("amt_credit", FloatType()),
    ("amt_down_payment", FloatType()),
    ("amt_goods_price", FloatType()),
    ("hour_appr_process_start", IntegerType()),
    ("rate_down_payment", FloatType()),
    ("rate_interest_primary", FloatType()),
    ("rate_interest_privileged", FloatType()),
    ("name_cash_loan_purpose", StringType()),
    ("name_contract_status", StringType()),
    ("days_decision", IntegerType()),
    ("name_payment_type", StringType()),
    ("code_reject_reason", StringType()),
    ("name_type_suite", StringType()),
    ("name_client_type", StringType()),
    ("name_goods_category", StringType()),
    ("name_portfolio", StringType()),
    ("name_product_type", StringType()),
    ("channel_type", StringType()),
    ("sellerplace_area", IntegerType()),
    ("name_seller_industry", StringType()),
    ("cnt_payment", FloatType()),
    ("name_yield_group", StringType()),
    ("product_combination", StringType()),
    ("days_first_drawing", FloatType()),
    ("days_first_due", FloatType()),
    ("days_last_due_1st_version", FloatType()),
    ("days_last_due", FloatType()),
    ("days_termination", FloatType()),
    ("nflag_insured_on_approval", FloatType()),
    ("id", IntegerType()),
]
prev_data_schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _prev_app_columns]
)

In [7]:
pre_data = spark.read.schema(prev_data_schema).option("header", True).csv("previous_application_static.csv")

In [8]:
value_dic_data = spark.read.options(header=True, inferSchema=True).csv("value_dict.csv")

3. Using the Kafka topic from the producer in Task 1, read the streaming data with Spark Streaming, assuming all data arrives in String format except for the 'ts' column, which you shall receive as an Int type.



In [9]:
topic = 'test'
# Kafka broker address as seen from inside the container.
hostip = "host.docker.internal"

df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": f"{hostip}:9092",
        "subscribe": topic,
    })
    .load()
)

In [10]:
# Debug sink: print the raw Kafka records to the console every 5 seconds.
query1 = (
    df.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

In [11]:
# Stop the raw-record debug query started in the previous cell.
query1.stop()

In [12]:
# Keep only the Kafka payload, decoded to a string column that is still named "value".
df = df.select(F.col("value").cast("string").alias("value"))


In [13]:
# Debug sink: print the decoded string payloads every 5 seconds.
query2 = (
    df.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

In [14]:
# Stop the decoded-payload debug query.
query2.stop()

In [15]:
# Schema of one Kafka message value: a JSON array of application records (the array is
# exploded later). Per the task, every field is received as a string except 'ts', which
# is received as an int; proper types are applied in a later cast step.
# All fields are declared nullable. from_json parses into a nullable form of the schema
# regardless, and credit_score_1..3 can be null (the notebook imputes them with 0.5), so
# marking those three non-nullable would only be misleading.
_app_string_fields = [
    "id_app", "target", "contract_type", "gender", "own_car", "own_property",
    "num_of_children", "income_total", "amt_credit", "amt_annuity", "amt_goods_price",
    "income_type", "education_type", "family_status", "housing_type",
    "region_population_relative", "days_birth", "days_employed", "own_car_age",
    "flag_mobile", "flag_emp_phone", "flag_work_phone", "flag_cont_mobile",
    "flag_phone", "flag_email", "occupation_type", "cnt_fam_members",
    "weekday_app_process_start", "hour_app_process_start", "organization_type",
    "credit_score_1", "credit_score_2", "credit_score_3", "days_last_phone_change",
    "amt_credit_req_last_hour", "amt_credit_req_last_day", "amt_credit_req_last_week",
    "amt_credit_req_last_month", "amt_credit_req_last_quarter", "amt_credit_req_last_year",
]
app_data_schema = ArrayType(StructType(
    [StructField(field_name, StringType(), True) for field_name in _app_string_fields]
    + [StructField("ts", IntegerType(), True)]
))

In [16]:
# Parse each payload (a JSON array of application records) with app_data_schema.
df = df.select(
    F.from_json(F.col("value").cast("string"), app_data_schema).alias('parsed_value')
)


4. Then, transform the streaming data format into proper types following the metadata file schema, similar to assignment 2A. Perform the following tasks:  
a) For the 'ts' column, convert it to the timestamp format; we will use it as event_time.  
b) If the data is late for more than 1 minute, discard it.




In [17]:
# One output row per record in the JSON array.
df = df.select(F.explode("parsed_value").alias("unnested_value"))


In [18]:
# (field, target type) in output column order: each string field from the exploded
# record is cast to the type given by the metadata schema and keeps its own name.
_stream_casts = [
    ("id_app", IntegerType()),
    ("target", IntegerType()),
    ("contract_type", IntegerType()),
    ("gender", StringType()),
    ("own_car", StringType()),
    ("own_property", StringType()),
    ("num_of_children", IntegerType()),
    ("income_total", FloatType()),
    ("amt_credit", FloatType()),
    ("amt_annuity", FloatType()),
    ("amt_goods_price", FloatType()),
    ("income_type", IntegerType()),
    ("education_type", IntegerType()),
    ("family_status", IntegerType()),
    ("housing_type", IntegerType()),
    ("region_population_relative", FloatType()),
    ("days_birth", IntegerType()),
    ("days_employed", IntegerType()),
    ("own_car_age", FloatType()),
    ("flag_mobile", IntegerType()),
    ("flag_emp_phone", IntegerType()),
    ("flag_work_phone", IntegerType()),
    ("flag_cont_mobile", IntegerType()),
    ("flag_phone", IntegerType()),
    ("flag_email", IntegerType()),
    ("occupation_type", IntegerType()),
    ("cnt_fam_members", FloatType()),
    ("weekday_app_process_start", StringType()),
    ("hour_app_process_start", IntegerType()),
    ("organization_type", IntegerType()),
    ("credit_score_1", FloatType()),
    ("credit_score_2", FloatType()),
    ("credit_score_3", FloatType()),
    ("days_last_phone_change", FloatType()),
    ("amt_credit_req_last_hour", FloatType()),
    ("amt_credit_req_last_day", FloatType()),
    ("amt_credit_req_last_week", FloatType()),
    ("amt_credit_req_last_month", FloatType()),
    ("amt_credit_req_last_quarter", FloatType()),
    ("amt_credit_req_last_year", FloatType()),
]

df_formatted = df.select(
    *(F.col(f"unnested_value.{field}").cast(target).alias(field) for field, target in _stream_casts),
    # 'ts' arrives as an int; Spark reads an integer cast to timestamp as seconds since
    # the Unix epoch. No watermark is set here: late-data handling relies on withWatermark
    # in the downstream aggregations.
    F.col("unnested_value.ts").cast("timestamp").alias("event_time"),
)

In [19]:
# Debug sink: inspect the typed records every 5 seconds.
query3 = (
    df_formatted.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="5 seconds")
    .start()
)

In [20]:
# Stop the typed-record debug query.
query3.stop()

5. Join the static data frames with the streaming data frame, perform data type/column conversion according to your ML model, and print out the schema. (Again, you can reuse code from 2A.)


In [21]:
# Reused from 2A: amt_credit divided by income_total, rounded to 2 decimal places.
df_formatted = df_formatted.withColumn(
    "loan_to_income_ratio",
    F.round(F.col("amt_credit") / F.col("income_total"), 2),
)

In [24]:
def age_bucket(age):
    """Map an age in whole years to a single-letter age-band code.

    Bands use exclusive upper bounds: <25 -> "Y", <35 -> "E", <45 -> "M",
    <55 -> "L", <65 -> "N", otherwise "R".

    Args:
        age: age in years, or None when the source ``days_birth`` was null
            (for example, an unparseable string in the stream).

    Returns:
        The band letter, or None when ``age`` is None, so the Spark UDF yields
        NULL for that row.
    """
    # A null days_birth reaches the UDF as None. Comparing None < 25 raises TypeError
    # in Python 3, which would fail the task and stop the streaming query.
    if age is None:
        return None
    for upper_bound, code in ((25, "Y"), (35, "E"), (45, "M"), (55, "L"), (65, "N")):
        if age < upper_bound:
            return code
    return "R"

age_bucket_udf = F.udf(age_bucket, StringType())

# Whole years from the absolute days_birth value; the temporary "age" column is
# dropped once it has been bucketed.
df_formatted = (
    df_formatted
    .withColumn("age", F.floor(F.abs(F.col("days_birth")) / 365.25))
    .withColumn("age_bucket", age_bucket_udf(F.col("age")))
    .drop("age")
)



In [26]:
def credit_worthiness(score):
    """Label an averaged credit score as "high", "medium" or "low".

    Args:
        score: a numeric score (in this notebook, the mean of credit_score_1..3).

    Returns:
        "high" when score >= 0.7, "medium" when score >= 0.4, otherwise "low".
        NaN fails both comparisons and is therefore labelled "low".
    """
    for threshold, label in ((0.7, "high"), (0.4, "medium")):
        if score >= threshold:
            return label
    return "low"
    
udf_credit_worthiness = F.udf(credit_worthiness, StringType())

# Missing credit scores are imputed with 0.5 (coalesce only replaces nulls, not NaN).
colum_fill_null = ['credit_score_1', 'credit_score_2', 'credit_score_3']
df_formatted = df_formatted.withColumns(
    {score_col: F.coalesce(F.col(score_col), F.lit(0.5)) for score_col in colum_fill_null}
)

# Label the mean of the three scores.
df_formatted = df_formatted.withColumn(
    "credit_worthiness",
    udf_credit_worthiness(
        (F.col("credit_score_1") + F.col("credit_score_2") + F.col("credit_score_3")) / 3
    ),
)



In [28]:
# Per-applicant history from the static previous-application data:
#   num_of_prev_app     - number of previous applications
#   num_of_approved_app - how many have name_contract_status == "Approved"
#   total_credit        - sum of amt_credit over the approved ones (others contribute 0)
# This frame is the static side of a stream-static join. Its plan is otherwise re-executed
# for every micro-batch; caching avoids re-reading and re-aggregating the CSV each trigger.
df_prev_app = (
    pre_data
    .groupBy("id_app")
    .agg(
        F.count("*").alias("num_of_prev_app"),
        F.sum(F.when(F.col("name_contract_status") == "Approved", 1).otherwise(0)).alias("num_of_approved_app"),
        F.sum(F.when(F.col("name_contract_status") == "Approved", F.col("amt_credit")).otherwise(0)).alias("total_credit"),
    )
    .cache()
)

# Left join keeps streaming applicants that have no previous-application history.
df_joined = df_formatted.join(df_prev_app, on="id_app", how="left")
# Replace null value with 0, because we still need to do the prediction if a new customer applies for a loan.
need_fill_null = ["num_of_prev_app", "num_of_approved_app", "total_credit"]
df_joined = df_joined.withColumns(
    {history_col: F.coalesce(F.col(history_col), F.lit(0)) for history_col in need_fill_null}
).withColumn(
    "total_credit_to_income_ratio", F.col("total_credit") / F.col("income_total")
)


In [31]:
# (category, value-as-string) -> key lookup from value_dict, broadcast to the executors
# so a UDF can translate coded column values. Later rows win on duplicate pairs.
_value_lookup = {}
for dict_row in value_dic_data.collect():
    _value_lookup[(dict_row['category'], str(dict_row['value']))] = dict_row['key']
value_to_key_map = spark.sparkContext.broadcast(_value_lookup)

def replace_value_with_key(category, column_value):
    """Translate a coded column value into its value_dict key.

    The lookup key is (category, str(column_value)). When that pair is not in the
    broadcast table, ``column_value`` is returned unchanged.
    """
    lookup = value_to_key_map.value
    lookup_key = (category, str(column_value))
    return lookup[lookup_key] if lookup_key in lookup else column_value

udf_replace_value_with_key = F.udf(replace_value_with_key, StringType())
columns_to_replace = ["education_type", "occupation_type", "income_type", "family_status"]

# Each column's own name is passed as the value_dict category.
df_joined = df_joined.withColumns({
    coded_col: udf_replace_value_with_key(F.lit(coded_col), F.col(coded_col))
    for coded_col in columns_to_replace
})

In [32]:
# Columns the model pipeline consumes, plus id_app and event_time for downstream use.
features = [
    'id_app', 'days_birth', 'loan_to_income_ratio', 'num_of_prev_app',
    'num_of_approved_app', 'total_credit', 'total_credit_to_income_ratio',
    'gender', 'own_car', 'own_property', 'income_type', 'education_type',
    'family_status', 'occupation_type', 'age_bucket', 'credit_worthiness',
    'event_time',
]
df_joined = df_joined.select(*features)

In [33]:
# Show the schema of the joined frame that is fed to the model.
df_joined.printSchema()

root
 |-- id_app: integer (nullable = true)
 |-- days_birth: integer (nullable = true)
 |-- loan_to_income_ratio: double (nullable = true)
 |-- num_of_prev_app: long (nullable = false)
 |-- num_of_approved_app: long (nullable = false)
 |-- total_credit: double (nullable = false)
 |-- total_credit_to_income_ratio: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- own_car: string (nullable = true)
 |-- own_property: string (nullable = true)
 |-- income_type: string (nullable = true)
 |-- education_type: string (nullable = true)
 |-- family_status: string (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- age_bucket: string (nullable = true)
 |-- credit_worthiness: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



6. Load your ML model, and use the model and Spark to perform the following:  
    a) Every 10 seconds, print the total number of applications and number of potential default applications (prediction = 1) in the last 1 minute.  
    b) Every 15 seconds, show the total requested credit (sum of credit where default=0) in the last 15 seconds.  
    c) Every 1 minute, show the top 10 largest loan applications with the potential of default.  

In [36]:
# 6a
# Load the saved PipelineModel used to score the joined stream.
model = PipelineModel.load("best_classification_model")


In [37]:
# Score the joined stream; the output includes the `prediction` column selected next.
# (The misspelled name `preditions` is kept because later cells reference it.)
preditions = model.transform(df_joined)

In [38]:
# Keep the model inputs plus the prediction and event time; drop intermediate model columns.
features = [
    'id_app', 'days_birth', 'loan_to_income_ratio',
    'num_of_prev_app', 'num_of_approved_app',
    'total_credit', 'total_credit_to_income_ratio',
    'gender', 'own_car', 'own_property', 'income_type',
    'education_type', 'family_status', 'occupation_type',
    'age_bucket', 'credit_worthiness', "prediction", 'event_time',
]
preditions = preditions.select(*features)

In [39]:
# 6a: total applications and predicted defaults (prediction == 1) per window.
# - A 1-minute window sliding every 10 seconds gives, at each 10-second trigger, the counts
#   for "the last 1 minute". A tumbling 60 s window would only report fixed minute buckets.
# - The watermark is 1 minute so that records more than 1 minute late are discarded
#   (task 4b). It was previously 2 minutes.
applicantCounts = (
    preditions
    .withWatermark("event_time", "1 minute")
    .groupBy(F.window(F.col("event_time"), "1 minute", "10 seconds").alias("window"))
    .agg(
        F.count("*").alias("total_num_applications"),
        F.sum(F.when(F.col("prediction") == 1, 1).otherwise(0)).alias("num_potential_default_applications"),
    )
)


In [40]:
# Sanity check: the 6a aggregate is still a streaming DataFrame.
print(applicantCounts.isStreaming)

True


In [41]:
# 6a output: print updated window counts to the console every 10 seconds.
query10 = (
    applicantCounts.writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .trigger(processingTime="10 seconds")
    .start()
)

In [42]:
# Stop the 6a console query.
query10.stop()

----


In [43]:
# 6b
# Sum of credit for applications predicted as non-default (prediction == 0) per 15-second
# tumbling window. The watermark is 1 minute so that records more than 1 minute late
# are discarded (task 4b). It was previously 2 minutes.
# NOTE(review): `total_credit` is the sum of *previously approved* credit aggregated from
# previous_application_static, not the amount requested by this application
# (`amt_credit`). `amt_credit` is dropped by the feature selections before scoring, so it
# is not available here. Confirm which column "requested credit" should use.
windowedCreditSum = (
    preditions
    .withWatermark("event_time", "1 minute")
    .groupBy(F.window(F.col("event_time"), "15 seconds").alias("window"))
    .agg(
        F.sum(F.when(F.col("prediction") == 0, F.col("total_credit")).otherwise(0))
        .alias("total_requested_credit")
    )
    .select("window", "total_requested_credit")
)


# 6b output: print the 15-second credit sums every 15 seconds.
query11 = (
    windowedCreditSum.writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .trigger(processingTime="15 seconds")
    .start()
)


In [44]:
# Stop the 6b console query.
query11.stop()

In [45]:
# Sanity check: the 6b aggregate is still a streaming DataFrame.
print(windowedCreditSum.isStreaming)

True


----

In [48]:
# 6c
def process_batch(df, epoch_id):
    """foreachBatch handler: show the 10 predicted-default rows with the largest total_credit.

    Args:
        df: the micro-batch DataFrame supplied by Structured Streaming.
        epoch_id: the micro-batch id (not used).

    NOTE(review): rows are ranked by `total_credit`, which is the previously approved
    credit total from the static data, not this application's `amt_credit`. Confirm that
    this is the intended meaning of "largest loan applications".
    """
    predicted_defaults = df.filter("prediction = 1")
    top_ten = predicted_defaults.orderBy(desc("total_credit")).limit(10)
    top_ten.select("event_time", "id_app", "total_credit", "prediction").show()

# Hand each 1-minute micro-batch to process_batch.
# NOTE(review): withWatermark has no effect on this query because it has no stateful
# operator, so late rows are not discarded here.
query12 = (
    preditions
    .withWatermark("event_time", "1 minutes")
    .writeStream
    .foreachBatch(process_batch)
    .trigger(processingTime='1 minute')
    .start()
)

+----------+------+------------+----------+
|event_time|id_app|total_credit|prediction|
+----------+------+------------+----------+
+----------+------+------------+----------+

+-------------------+------+---------------+----------+
|         event_time|id_app|   total_credit|prediction|
+-------------------+------+---------------+----------+
|2024-02-09 12:15:47|393110|      5730048.0|       1.0|
|2024-02-09 12:15:47|393120|      3370752.0|       1.0|
|2024-02-09 12:15:42|393097|      3269781.0|       1.0|
|2024-02-09 12:15:42|393101|      2466747.0|       1.0|
|2024-02-09 12:15:42|393091|2019128.8984375|       1.0|
|2024-02-09 12:15:57|393145|      1242945.0|       1.0|
|2024-02-09 12:15:52|393131|      1040868.0|       1.0|
|2024-02-09 12:15:47|393112|       711886.5|       1.0|
|2024-02-09 12:15:37|393080|       547893.0|       1.0|
|2024-02-09 12:15:57|393140|       158625.0|       1.0|
+-------------------+------+---------------+----------+

+-------------------+------+----------

In [74]:
# Stop the 6c top-10 query.
query12.stop()

7. Write a Parquet file to store the following data:  
    a) Persist the raw data: Persist your prediction results along with application data and event_time in Parquet format; after that, read the Parquet file and show the first 10 records.  
    b) Persist the 15-second requested credit (6b) in another parquet file.


In [51]:
# 7 a
# Persist the raw data: every scored row (features, prediction, event_time) is appended as Parquet.
persist_7a = (
    preditions.writeStream
    .format("parquet")
    .outputMode("append")
    .options(path="parquet/7a", checkpointLocation="checkpoint/persist_7a")
    .start()
)

In [52]:
# Schema of the 7a Parquet output, used to stream it back in. Only the columns listed
# in _non_nullable_7a are declared non-nullable.
_non_nullable_7a = {"num_of_prev_app", "num_of_approved_app", "total_credit", "prediction"}
_columns_7a = [
    ("id_app", IntegerType()),
    ("days_birth", IntegerType()),
    ("loan_to_income_ratio", DoubleType()),
    ("num_of_prev_app", LongType()),
    ("num_of_approved_app", LongType()),
    ("total_credit", DoubleType()),
    ("total_credit_to_income_ratio", DoubleType()),
    ("gender", StringType()),
    ("own_car", StringType()),
    ("own_property", StringType()),
    ("income_type", StringType()),
    ("education_type", StringType()),
    ("family_status", StringType()),
    ("occupation_type", StringType()),
    ("age_bucket", StringType()),
    ("credit_worthiness", StringType()),
    ("prediction", DoubleType()),
    ("event_time", TimestampType()),
]
schema_7a = StructType([
    StructField(col_name, col_type, col_name not in _non_nullable_7a)
    for col_name, col_type in _columns_7a
])

In [53]:
# Batch (non-streaming) read of the 7a output so the first 10 records can be shown.
predic_df = spark.read.format("parquet").load("parquet/7a")

In [54]:
predic_df.show(n=10, vertical=True)

-RECORD 0--------------------------------------------
 id_app                       | 393890               
 days_birth                   | -13464               
 loan_to_income_ratio         | 2.77                 
 num_of_prev_app              | 2                    
 num_of_approved_app          | 1                    
 total_credit                 | 406786.5             
 total_credit_to_income_ratio | 1.80794              
 gender                       | F                    
 own_car                      | N                    
 own_property                 | N                    
 income_type                  | Working              
 education_type               | Secondary / secon... 
 family_status                | Single / not married 
 occupation_type              | Managers             
 age_bucket                   | M                    
 credit_worthiness            | medium               
 prediction                   | 0.0                  
 event_time                 

----

In [55]:
# 7b
# Persist the 15-second requested-credit aggregate (6b). Append mode emits each window
# once the watermark has passed it.
persist_7b = (
    windowedCreditSum.writeStream
    .format("parquet")
    .outputMode("append")
    .options(path="parquet/7b", checkpointLocation="checkpoint/persist_7b")
    .start()
)


8. Read the two parquet files from 7a and 7b as a data stream, and send the records to two topics with appropriate names.  
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet files. The parquet files serve as an intermediate storage in this use case.)

## 8a

In [56]:
# Also persist the 6a aggregates, since part 3 consumes them directly.
parquet_6a = (
    applicantCounts.writeStream
    .format("parquet")
    .outputMode("append")
    .options(path="parquet/6a", checkpointLocation="checkpoint/parquet_6a")
    .start()
)

In [57]:
# Schema of the 6a Parquet output: the window bounds plus the two counts.
_window_6a = StructType([
    StructField("start", TimestampType(), True),
    StructField("end", TimestampType(), True),
])
schema_6a = StructType([
    StructField("window", _window_6a, False),
    StructField("total_num_applications", LongType(), False),
    StructField("num_potential_default_applications", LongType(), True),
])

In [58]:
# Stream the 6a Parquet output back in; streaming file sources need an explicit schema.
df_6a = spark.readStream.schema(schema_6a).format("parquet").load("parquet/6a")

In [61]:
hostip = "host.docker.internal"
# Publish each persisted 6a row to Kafka as one JSON object in the `value` column.
query6a_to_kafka = (
    df_6a
    .select(F.to_json(F.struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{hostip}:9092")
    .option("topic", "topic_8a1")
    .option("checkpointLocation", "checkpoint/topic_8a1")
    # The interval previously had a stray leading space (" 5 seconds"); use the same
    # literal as the other triggers instead of relying on the parser to trim it.
    .trigger(processingTime="5 seconds")
    .start()
)

In [62]:
# Also stream the 7a prediction files back in (streaming file sources need an explicit schema).
df_8a = spark.readStream.schema(schema_7a).parquet("parquet/7a")

In [63]:
hostip = "host.docker.internal"
# Forward every 7a prediction row to Kafka as a JSON string in the `value` column.
query8a_to_kafka = (
    df_8a
    .select(F.to_json(F.struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": f"{hostip}:9092",
        "topic": "topic_8a2",
        "checkpointLocation": "checkpoint/topic_8a2",
    })
    .start()
)

## 8b

In [64]:
# Schema of the 7b Parquet output: the 15-second window plus its credit sum.
_window_7b = StructType([
    StructField("start", TimestampType(), True),
    StructField("end", TimestampType(), True),
])
schema_7b = StructType([
    StructField("window", _window_7b, False),
    StructField("total_requested_credit", DoubleType(), True),
])

# Stream the 7b Parquet output back in with its explicit schema.
df_8b = spark.readStream.schema(schema_7b).parquet("parquet/7b")

In [67]:
hostip = "host.docker.internal"
# Forward each 15-second credit window to Kafka as a JSON string in the `value` column.
query_8b_to_kafka = (
    df_8b
    .select(F.to_json(F.struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": f"{hostip}:9092",
        "topic": "topic_8b",
        "checkpointLocation": "checkpoint/topic_8b",
    })
    .start()
)


-----


This part is only for testing whether records are sent to the Kafka server successfully.

In [71]:
topic = 'topic_8a1'
# Same broker as the producers above.
hostip = "host.docker.internal"

# Read back the 8a topic to verify that records arrived.
df_test = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": f"{hostip}:9092",
        "subscribe": topic,
    })
    .load()
)

In [72]:
# Print whatever arrives on the test topic every 5 seconds.
query1 = (
    df_test.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

In [73]:
# Stop the Kafka verification query.
query1.stop()