In [None]:
# model_persistence_student_performance.ipynb

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import QuantileDiscretizer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import os

# Local Spark session for this notebook ("local[*]" master).
spark = (
    SparkSession.builder
    .appName("ModelPersistenceStudent")
    .master("local[*]")
    .getOrCreate()
)

# UCI Student Performance data: semicolon-separated CSV with a header row.
data_path = "student-mat.csv"  # Update to your CSV location
raw_df = spark.read.csv(data_path, header=True, sep=";")

# No schema or inferSchema is passed to the reader, so the columns used
# below are cast to double explicitly.

# Binary label: 1.0 when the final grade G3 is at least 10, otherwise 0.0.
graded_df = raw_df.withColumn("G3_int", raw_df["G3"].cast("double"))
labelled_df = graded_df.withColumn(
    "label", (graded_df["G3_int"] >= 10).cast("double")
)

# 'age' is the single numeric feature used to demonstrate discretization.
df = labelled_df.withColumn("age_double", labelled_df["age"].cast("double"))

# Training frame: just the feature and the label, with null rows removed.
data = df.select("age_double", "label").na.drop()

# Seed for the cross-validation fold split. Pinning it keeps the fold
# assignment -- and therefore the selected regParam -- the same across
# kernel restarts, instead of relying on the library's default seed.
CV_SEED = 42

# Discretize 'age_double' into 3 quantile buckets; the bucket index is
# written to the 'discretized' column.
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="age_double", outputCol="discretized")

# Wrap the single bucketed column into the vector column the classifier reads.
assembler = VectorAssembler(inputCols=["discretized"], outputCol="features")

# Logistic regression on the assembled features; capped at 5 iterations.
lr = LogisticRegression(maxIter=5, featuresCol="features", labelCol="label")

# Full pipeline: discretize -> assemble -> classify. Because the feature
# stages are part of the pipeline, the fitted model accepts a frame that
# only carries 'age_double'.
pipeline = Pipeline(stages=[discretizer, assembler, lr])

# Candidate regularization strengths tried by the cross-validator.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.1]) \
    .build()

# Binary-classification evaluator on 'label'; no metric is named here, so
# the evaluator's default metric is used.
evaluator = BinaryClassificationEvaluator(labelCol="label")

# 2-fold cross-validation over the grid, with a fixed seed for reproducibility.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=2,
                    seed=CV_SEED)

# Fit on the prepared data; the result holds the best pipeline model.
cv_model = cv.fit(data)

# Needed only for the reload check below.
from pyspark.ml.tuning import CrossValidatorModel

# Directory the fitted cross-validator model is written to (relative to the
# current working directory).
model_path = "student_performance_model"

try:
    # Persist the fitted model, replacing any earlier save at the same path.
    cv_model.write().overwrite().save(model_path)
    print(f"Model saved at: {model_path}")

    # Round-trip check: read the model back and show its UID.
    loaded_model = CrossValidatorModel.load(model_path)
    print(f"Loaded model UID: {loaded_model.uid}")
finally:
    # Always release the Spark session, even if saving or loading failed.
    spark.stop()


In [None]:
# predictive_service.py

import uvicorn
from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import List
from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.feature import QuantileDiscretizer, VectorAssembler

app = FastAPI()

# Single Spark session created at import time and shared by all requests
# (in real deployment, manage lifecycle properly).
spark = (
    SparkSession.builder
    .appName("StudentPerformancePredictiveService")
    .master("local[*]")
    .getOrCreate()
)

# Read the persisted cross-validator model from disk once, at import time.
model_path = "student_performance_model"
model = CrossValidatorModel.load(model_path)

class PredictionRequest(BaseModel):
    """Request body for POST /predict: the ages to score."""
    # Each value becomes one row of the "age_double" column fed to the model.
    ages: List[float]  # list of ages for prediction

class PredictionResponse(BaseModel):
    """Response body of POST /predict: the model's predictions."""
    # Values collected from the model's "prediction" column; presumably one
    # per submitted age, in request order -- TODO confirm ordering guarantee.
    predictions: List[float]

@app.post("/predict", response_model=PredictionResponse)
def predict(data: PredictionRequest):
    """Score a batch of ages with the persisted pipeline model.

    Args:
        data: Request body whose ``ages`` list holds the values to score.

    Returns:
        PredictionResponse whose ``predictions`` are the values of the
        model's "prediction" column. An empty ``ages`` list yields an empty
        ``predictions`` list without starting a Spark job.
    """
    # Spark cannot infer a schema from zero rows, so answer empty requests
    # directly instead of failing inside createDataFrame.
    if not data.ages:
        return PredictionResponse(predictions=[])

    # One-column frame named like the training input. The schema is given
    # explicitly so the column type does not depend on inference.
    df = spark.createDataFrame(
        [(float(age),) for age in data.ages],
        schema="age_double double",
    )

    # bestModel is the fitted pipeline (discretizer + assembler + classifier),
    # so a single transform call runs every stage.
    predictions_df = model.bestModel.transform(df)

    # Pull the prediction column back to the driver as plain Python floats.
    preds = [row["prediction"] for row in predictions_df.select("prediction").collect()]

    return PredictionResponse(predictions=preds)

# Equivalent command line:
#   uvicorn predictive_service:app --reload --host 0.0.0.0 --port 9000
if __name__ == "__main__":
    # The app is passed as an import string rather than as the object itself.
    uvicorn.run(
        "predictive_service:app",
        host="0.0.0.0",
        port=9000,
        reload=True,
    )
