In [1]:
!pip install kafka-python




In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from kafka import KafkaProducer
import json
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, BooleanType
import pyspark.sql.functions as F
import pyspark.sql.types as t
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml import feature as MF
from pyspark.ml.functions import vector_to_array
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import PCA
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, GBTClassifier
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml import Pipeline, PipelineModel

import os

# Kafka connection settings. The defaults are the values this notebook was
# written against; set KAFKA_BROKER / KAFKA_TOPIC in the environment to point
# the notebook at a different broker or topic without editing code.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "kafka:9092")
TOPIC = os.environ.get("KAFKA_TOPIC", "Group3-Loan-Data9")
import time


In [20]:
# One SparkSession for the whole notebook. The spark-sql-kafka connector is
# pulled in via spark.jars.packages so that format("kafka") is available.
# NOTE(review): the coordinate pins Spark 3.5.0 / Scala 2.12 — confirm it
# matches the installed pyspark version.
spark = (
    SparkSession.builder
    .appName("Capstone")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

In [21]:
# Unbounded stream of raw Kafka records from TOPIC. startingOffsets="earliest"
# applies only when the query starts without a checkpoint to resume from.
data = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BROKER,
        "subscribe": TOPIC,
        "startingOffsets": "earliest",
    })
    .load()
)

In [22]:
# Batch-read the reference CSV; in this notebook it is used to derive the schema
# applied to the JSON messages arriving on Kafka.
df = spark.read.options(header=True, inferSchema=True).csv('Loan_Default.csv')

In [23]:
# Schema inferred from the CSV, reused to parse the Kafka JSON payloads.
sc: StructType = df.schema

In [24]:
# The Kafka source exposes the payload as a binary `value` column: decode it to
# text, parse it as JSON with the CSV-derived schema, then flatten the struct
# into top-level columns.
json_df = (
    data
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), sc).alias("data"))
    .select("data.*")
)

In [25]:
# writeStream() later in the notebook requires a streaming DataFrame; fail fast
# if the source was accidentally switched to a batch read.
assert json_df.isStreaming, "json_df should be a streaming DataFrame (spark.readStream)"
print(json_df.isStreaming)

True


In [26]:

# Drop columns that are not used as model inputs.
json_df = json_df.drop('ID', 'year')

# `age` arrives as a range label; split it on "-" into lower/upper bounds.
# NOTE(review): a label without "-" yields a null Age_range_2 (Spark's default,
# non-ANSI behaviour for an out-of-range index), and a non-numeric bound casts to
# null below; such rows are then removed by dropna(). Confirm this matches the
# preprocessing used when the model was trained.
age_parts = F.split(F.col("age"), "-")
processed_df = (
    json_df
    .withColumn("Age_range_1", age_parts[0])
    .withColumn("Age_range_2", age_parts[1])
    .drop("age")
)

processed_df.createOrReplaceTempView("df")

# Columns cast to DOUBLE; every other column is treated as categorical text.
col_to_double = ['loan_amount', 'rate_of_interest', 'Interest_rate_spread', 'Upfront_charges',
                 'term', 'income', 'dtir1', 'Status', 'LTV', 'Credit_Score', 'property_value',
                 'Age_range_1', 'Age_range_2']

# Preserve the stream's column order. A set difference would order these by
# string hash, which varies between kernel restarts (PYTHONHASHSEED).
col_to_string = [c for c in processed_df.columns if c not in col_to_double]

query = ", ".join(f"CAST(`{c}` AS DOUBLE) AS `{c}`" for c in col_to_double)
query1 = ", ".join(f"CAST(`{c}` AS STRING) AS `{c}`" for c in col_to_string)

# Skip an empty projection group so the SELECT stays valid if a group is empty.
select_list = ", ".join(part for part in (query, query1) if part)
df_new = spark.sql(f"SELECT {select_list} FROM df")

# Re-register the view name "df", now pointing at the typed frame.
df_new.createOrReplaceTempView("df")

# Impute missing values with 0 into new `<name>_` columns (appended at the end)
# and drop the originals — presumably the layout used at training time.
df_new = spark.sql("""
    SELECT *, 
           COALESCE(rate_of_interest, 0) AS rate_of_interest_, 
           COALESCE(Interest_rate_spread, 0) AS Interest_rate_spread_, 
           COALESCE(Upfront_charges, 0) AS Upfront_charges_, 
           COALESCE(dtir1, 0) AS dtir1_ 
    FROM df
""")
df_new = df_new.drop("rate_of_interest", "Interest_rate_spread", "Upfront_charges", "dtir1")

# Remaining nulls are dropped; the VectorAssembler used downstream keeps its
# default handleInvalid="error" and would reject them.
df_new = df_new.dropna()

# Encode the categorical columns with the previously fitted StringIndexerModel,
# then drop the raw string columns.
indexer_model = MF.StringIndexerModel.load("indexer_model")
df_indexed = indexer_model.transform(df_new).drop(*col_to_string)

In [27]:
# Pack every column except the label into a single `features` vector.
# NOTE(review): feature order follows df_indexed's column order and must match
# the order used when lr_model_best was trained.
assembler_df = (
    MF.VectorAssembler(outputCol="features")
    .setInputCols(list(filter(lambda name: name != "Status", df_indexed.columns)))
    .transform(df_indexed)
)

In [28]:
# Model input frame: the feature vector plus `Status` exposed as `label`.
# (Rebinds `data`, which earlier held the raw Kafka stream.)
data = assembler_df.select("features", F.col("Status").alias("label"))

In [29]:
# df_indexed.show()

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import PCA
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, GBTClassifier


In [31]:
from pyspark.ml.classification import LogisticRegressionModel

# Load the persisted logistic-regression model and score the streaming features.
model = LogisticRegressionModel.read().load("lr_model_best")
predictions = model.transform(data)

In [32]:
# query = predictions.writeStream \
#     .outputMode("append") \
#     .format("csv") \
#     .option("path", "predictions_output/").option("checkpointLocation", "checkpoint/").trigger(processingTime="10 seconds").start()



In [33]:
# Only the predicted class is persisted to SQLite.
pred = predictions.select(predictions["prediction"])

In [34]:
# pred.show(truncate = False)

In [35]:
import sqlite3

def write_to_sqlite(batch_df, batch_id, db_path='pred.db'):
    """Append one micro-batch of predictions to the SQLite table ``predictions``.

    Intended as a Structured Streaming ``foreachBatch`` callback: Spark calls it
    with the micro-batch DataFrame and its batch id. The table is created on
    first use, even when the batch is empty.

    Args:
        batch_df: micro-batch DataFrame; only its ``prediction`` column is read
            (via ``toPandas()``).
        batch_id: micro-batch id supplied by Spark; currently unused.
        db_path: SQLite database file to write to (default ``'pred.db'``).
    """
    # NOTE(review): foreachBatch is at-least-once by default, so a replayed batch
    # is inserted again; recording batch_id would allow de-duplication.

    # Materialize the batch before opening the database so the connection is
    # held only for the write itself.
    values = batch_df.toPandas()['prediction'].tolist()
    rows = [(value,) for value in values]

    conn = sqlite3.connect(db_path)
    try:
        # `with conn` commits on success and rolls back on error, but it does not
        # close the connection — the finally clause does that.
        with conn:
            conn.execute('''CREATE TABLE IF NOT EXISTS predictions (
                      prediction INT)''')
            conn.executemany('''INSERT INTO predictions (prediction) VALUES (?)''', rows)
    finally:
        conn.close()


In [36]:

# Stream predictions into SQLite, one foreachBatch call per micro-batch.
query_sqlite = (
    pred.writeStream
    .outputMode("append")
    .foreachBatch(write_to_sqlite)
    # A persistent checkpoint records which Kafka offsets have already been
    # written. Without it Spark uses a temporary checkpoint, so each restart
    # starts again from startingOffsets="earliest" and appends the same
    # predictions to pred.db a second time.
    .option("checkpointLocation", "checkpoint_sqlite/")
    .start()
)

In [37]:
# The streaming query runs asynchronously. Wait until everything currently
# available on the topic has been written, so the table exists and is complete
# before reading it. This also re-raises the query's error if it has failed.
query_sqlite.processAllAvailable()

conn = sqlite3.connect('pred.db')
try:
    df_sql = pd.read_sql("SELECT * FROM predictions", conn)
finally:
    conn.close()
print(df_sql)

     prediction
0             1
1             0
2             0
3             0
4             0
..          ...
127           0
128           0
129           0
130           0
131           0

[132 rows x 1 columns]
