In [3]:
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
import pandas as pd
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.types import *

spark = SparkSession.builder.appName("TitanicClassifierAPI").getOrCreate()

schema = StructType([
    StructField("PassengerId", IntegerType(), True),
    StructField("Pclass", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", FloatType(), True),
    StructField("SibSp", IntegerType(), True),
    StructField("Parch", IntegerType(), True),
    StructField("Ticket", StringType(), True),
    StructField("Fare", FloatType(), True),
    StructField("Cabin", StringType(), True),
    StructField("Embarked", StringType(), True),
])

# -------------------------------
# Load model at startup
# -------------------------------
MODEL_PATH = "deployment/model/sparkml"  # wherever you saved it
model = PipelineModel.load(MODEL_PATH)
# Define FastAPI app

# -------------------------------
app = FastAPI(title="TitanicClassifier Inference API")

# -------------------------------
# Define input schema
# -------------------------------
# Example: replace feature1, feature2 with your actual features
class PredictionRequest(BaseModel):
    PassengerId: Optional[int] = None
    Pclass: int
    Name: Optional[str] = None
    Sex: str
    Age: Optional[float] = None
    SibSp: Optional[int] = None
    Parch: int
    Ticket: Optional[str] = None
    Fare: Optional[float] = None
    Cabin: Optional[str] = None
    Embarked: Optional[str] = None

# -------------------------------
# /predict endpoint
# -------------------------------
@app.post("/predict")
def predict(request: PredictionRequest):

    input_data = [request.dict()]
    input_df = spark.createDataFrame(input_data, schema=schema)
    preds = model.transform(input_df)
    print(preds)
    result_df = preds.select("prediction", "probability").toPandas()
    # Return predictions as JSON
    return {"predictions": result_df.to_dict('records')}


ERROR StatusLogger DOM element is - not a <log4j:configuration> element.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/work/spark-3.1.3/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/work/spark-3.1.3/jars/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/work/spark-3.1.3/jars/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
                                                                                

In [4]:
print(model.stages)

[PipelineModel_3cc8edbcf60a, RandomForestClassificationModel: uid=RandomForestClassifier_f0ca91842f89, numTrees=20, numClasses=2, numFeatures=6]


In [1]:
import requests
import pandas as pd
from training.spark_session import spark_session_creator
url = "http://localhost:8000/predict"

test_df=pd.read_csv(r"/root/AILabProject/data/test.csv")
test_df = test_df.where(pd.notnull(test_df), None)

predictions = []

for _, row in test_df.iterrows():
    payload = row.to_dict()
    response = requests.post(url, json=payload)
    predictions.append(response.json()["predictions"][0])

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [2]:
import requests
import pandas as pd

url = "http://localhost:8000/predict"

try:
    test_df = pd.read_csv(r"/root/AILabProject/data/test.csv")
    test_df = test_df.where(pd.notnull(test_df), None)

    # --- DEBUGGING: Test with just the first row ---
    print("--- Testing with the first row of data ---")
    
    first_row = test_df.iloc[0]
    payload = first_row.to_dict()

    print("\n[1] SENDING THIS PAYLOAD:")
    print(payload)

    response = requests.post(url, json=payload)

    print("\n[2] RECEIVED THIS STATUS CODE:")
    print(response.status_code)

    print("\n[3] RECEIVED THIS RESPONSE TEXT:")
    print(response.text)

    # Try to parse JSON only if the request was successful (status code 200)
    if response.status_code == 200:
        print("\n[4] SUCCESSFULLY PARSED PREDICTION:")
        prediction = response.json()["predictions"][0]
        print(prediction)
    else:
        print("\n[4] FAILED: The status code was not 200. Check the response text above for error details.")

except requests.exceptions.ConnectionError as e:
    print(f"\nCONNECTION FAILED: Is your FastAPI server running at {url}?")
except Exception as e:
    print(f"\nAN UNEXPECTED ERROR OCCURRED: {e}")

--- Testing with the first row of data ---

[1] SENDING THIS PAYLOAD:
{'PassengerId': 892, 'Pclass': 3, 'Name': 'Kelly, Mr. James', 'Sex': 'male', 'Age': 34.5, 'SibSp': 0, 'Parch': 0, 'Ticket': '330911', 'Fare': 7.8292, 'Cabin': None, 'Embarked': 'Q'}

[2] RECEIVED THIS STATUS CODE:
500

[3] RECEIVED THIS RESPONSE TEXT:
Internal Server Error

[4] FAILED: The status code was not 200. Check the response text above for error details.
