In [50]:
from kafka import KafkaConsumer
import json
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("LoanApplicationPredictor") \
    .getOrCreate()

In [51]:
spark

In [52]:
schema = StructType([
    StructField("LoanID", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Income", IntegerType(), True),
    StructField("LoanAmount", IntegerType(), True),
    StructField("CreditScore", IntegerType(), True),
    StructField("MonthsEmployed", IntegerType(), True),
    StructField("NumCreditLines", IntegerType(), True),
    StructField("InterestRate", DoubleType(), True),
    StructField("LoanTerm", IntegerType(), True),
    StructField("DTIRatio", DoubleType(), True),
    StructField("Education", StringType(), True),
    StructField("EmploymentType", StringType(), True),
    StructField("MaritalStatus", StringType(), True),
    StructField("HasMortgage", StringType(), True),
    StructField("HasDependents", StringType(), True),
    StructField("LoanPurpose", StringType(), True),
    StructField("HasCoSigner", StringType(), True),
    StructField("Default", IntegerType(), True)
])

In [53]:
consumer = KafkaConsumer(
    'loan_applications',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

In [54]:
from pyspark.ml.classification import LogisticRegressionModel
model_path = "/home/drissdo/Desktop/Scalable-Distributed-Systems/ML/model"
lr_model = LogisticRegressionModel.load(model_path)

In [55]:
def preprocess_data(loan_data):
    imputer = Imputer(
    inputCols=["Income", "MonthsEmployed", "NumCreditLines", "InterestRate", "LoanTerm", "DTIRatio"],
    outputCols=["Income_filled", "MonthsEmployed_filled", "NumCreditLines_filled", "InterestRate_filled", "LoanTerm_filled", "DTIRatio_filled"]
)
    loan_data_imputed = imputer.fit(loan_data).transform(loan_data)

    string_indexers = [
        StringIndexer(inputCol="Education", outputCol="Education_index"),
        StringIndexer(inputCol="EmploymentType", outputCol="EmploymentType_index"),
        StringIndexer(inputCol="MaritalStatus", outputCol="MaritalStatus_index"),
        StringIndexer(inputCol="HasMortgage", outputCol="HasMortgage_index"),
        StringIndexer(inputCol="HasDependents", outputCol="HasDependents_index"),
        StringIndexer(inputCol="LoanPurpose", outputCol="LoanPurpose_index"),
        StringIndexer(inputCol="HasCoSigner", outputCol="HasCoSigner_index")
    ]

    pipeline_indexers = Pipeline(stages=string_indexers)
    loan_data_indexed = pipeline_indexers.fit(loan_data_imputed).transform(loan_data_imputed)
    
    one_hot_encoders = [
    OneHotEncoder(inputCol="Education_index", outputCol="Education_vec"),
    OneHotEncoder(inputCol="EmploymentType_index", outputCol="EmploymentType_vec"),
    OneHotEncoder(inputCol="MaritalStatus_index", outputCol="MaritalStatus_vec"),
    OneHotEncoder(inputCol="HasMortgage_index", outputCol="HasMortgage_vec"),
    OneHotEncoder(inputCol="HasDependents_index", outputCol="HasDependents_vec"),
    OneHotEncoder(inputCol="LoanPurpose_index", outputCol="LoanPurpose_vec"),
    OneHotEncoder(inputCol="HasCoSigner_index", outputCol="HasCoSigner_vec")
]

    pipeline_encoders = Pipeline(stages=one_hot_encoders)
    loan_data_encoded = pipeline_encoders.fit(loan_data_indexed).transform(loan_data_indexed)

    # Normalize numerical features
    numerical_cols = [
        "Age", "Income_filled", "LoanAmount", "CreditScore", "MonthsEmployed_filled",
        "NumCreditLines_filled", "InterestRate_filled", "LoanTerm_filled", "DTIRatio_filled"
    ]

    assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
    loan_data_assembled = assembler.transform(loan_data_encoded)

    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaler_model = scaler.fit(loan_data_assembled)
    loan_data_scaled = scaler_model.transform(loan_data_assembled)

    loan_data_scaled = loan_data_scaled.select(["LoanID", "scaled_features"])

    return loan_data_scaled

In [57]:
# Create empty list to store all messages
messages_list = []

# 1. Collect all messages from consumer
try:
    for message in consumer:
        if isinstance(message.value, dict):
            messages_list.append(message.value)
            print(f"Collected message: {message.value['LoanID']}")  # Log collection
            
except Exception as e:
    print(f"Error collecting messages: {e}")

# 2. Process all collected messages if we have any
if messages_list:
    try:
        # Create DataFrame from all collected messages
        df = spark.createDataFrame(messages_list, schema=schema)
        print(f"Total records collected: {df.count()}")
        
        # Process all data at once
        test_data = preprocess_data(df)
        lr_predictions = lr_model.transform(test_data)
        
        # Show predictions
        print("\nPredictions:")
        lr_predictions.select("LoanID", "prediction").show()
        
    except Exception as e:
        print(f"Error processing data: {e}")
else:
    print("No valid messages collected")

# Optional: Commit offsets after processing
consumer.commit()

Collected message: Q47SC11GPK
Collected message: 5PKRDS9T6G
Collected message: DT0Q970YGH
Collected message: A38RC479WR
Collected message: CRDXTPI8KY
Collected message: V5L1T406JF
Collected message: P3EG7LBMIC
Collected message: 0HCI1B2YTB
Collected message: BV6I61ZH8K
Collected message: 8LX0LK8Y7J
Collected message: UMJ25ZD6PX
Collected message: VHVCKK7F25
Collected message: XN2FOKJ26P
Collected message: BB9FDTKPFE
Collected message: 0HOJTGKRZS
Collected message: 6XW7VHV3DZ
Collected message: 3AU7U4IKEO
Collected message: EXENZ2QVJX
Collected message: 49BYIQJMSN
Collected message: A6HFUDU3NH
Collected message: T8SHS85JJW
Collected message: ANV7HYZS6S
Collected message: CH31F65HON
Collected message: L2I6M1JXAP
Collected message: KBZ11MLLJ9
Collected message: H2XT0LWT35
Collected message: IGGM3Z04E7
Collected message: QVA7VQCAEJ
Collected message: 613VQWAH7P
Collected message: 565JDAH6BC
Collected message: ZS4Z66KNPF
Collected message: 43FR8OTJX2
Collected message: 1JLC1M0V6G
Collected 

                                                                                

Total records collected: 100


                                                                                


Predictions:
+----------+----------+
|    LoanID|prediction|
+----------+----------+
|Q47SC11GPK|       0.0|
|5PKRDS9T6G|       0.0|
|DT0Q970YGH|       0.0|
|A38RC479WR|       0.0|
|CRDXTPI8KY|       0.0|
|V5L1T406JF|       0.0|
|P3EG7LBMIC|       0.0|
|0HCI1B2YTB|       0.0|
|BV6I61ZH8K|       0.0|
|8LX0LK8Y7J|       0.0|
|UMJ25ZD6PX|       0.0|
|VHVCKK7F25|       0.0|
|XN2FOKJ26P|       0.0|
|BB9FDTKPFE|       0.0|
|0HOJTGKRZS|       0.0|
|6XW7VHV3DZ|       0.0|
|3AU7U4IKEO|       0.0|
|EXENZ2QVJX|       0.0|
|49BYIQJMSN|       0.0|
|A6HFUDU3NH|       0.0|
+----------+----------+
only showing top 20 rows



24/12/12 10:28:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


AssertionError: Requires group_id

In [None]:
spark.stop()