### ETL for pickup

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.classification import DecisionTreeClassificationModel
import logging

postgresql_jdbc_jar = r"C:/Program Files/PostgreSQL/17/postgresql-42.7.4.jar"
spark = SparkSession.builder.appName('ETL')\
                            .config("spark.jars", postgresql_jdbc_jar) \
                            .config("spark.driver.extraClassPath", postgresql_jdbc_jar) \
                            .config("spark.driver.memory", "8g")\
                            .config("spark.executor.memory", "8g")\
                            .config("spark.executor.cores", "4")\
                            .getOrCreate()

### Previous

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, StringIndexer
from pyspark.ml.classification import DecisionTreeClassificationModel, LogisticRegressionModel
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.tuning import TrainValidationSplitModel
import logging
from pyspark.sql.types import NumericType
from pyspark.ml.functions import vector_to_array


# Initialize Spark Session
spark = SparkSession.builder.appName("Pickup_ETL_Pipeline").getOrCreate()

# Setup Logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Pickup_ETL_Pipeline")

# Task 1: Split df_model into Train (80%) and Test (20%)
def split_data(df):
    logger.info("Splitting data into train (80%) and test (20%)...")
    train_data, test_data = df.randomSplit([0.975, 0.0025], seed=42)
    logger.info(f"Train Data Count: {train_data.count()}, Test Data Count: {test_data.count()}")
    return test_data  # Using test data as new pickup data

# Task 2: Preprocessing - Handle Missing Values
def preprocess_data(df):
    logger.info("Preprocessing data: Handling missing values...")

    # Identify Numeric Columns
    numeric_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]

    # Fill missing values only for numeric columns
    for col_name in numeric_cols:
        avg_value = df.selectExpr(f"avg({col_name}) as avg").collect()[0]["avg"]
        if avg_value is not None:  # Ensure avg is not None
            df = df.fillna({col_name: avg_value})

    logger.info("Missing values handled successfully.")
    return df

# Task 3: Feature Engineering - Select & Transform Features
def feature_transformation(df):
    logger.info("Applying feature transformation...")

    # Convert 'is_delayed' into numeric index
    indexer_is_delayed = StringIndexer(inputCol="is_delayed", outputCol="is_delayed_indexed", handleInvalid="keep")
    df = indexer_is_delayed.fit(df).transform(df)

    # Convert 'speed_status' into numeric index
    indexer_speed_status = StringIndexer(inputCol="speed_status", outputCol="speed_status_indexed", handleInvalid="keep")
    df = indexer_speed_status.fit(df).transform(df)

    # Feature Selection (Use indexed version of `is_delayed`)
    feature_columns = [
        "pca1", "pca2", "pca3", "pca4", "pca5", "pca6", "pca7", "pca8", "pca9", 
        "speed_status_indexed", "is_delayed_indexed"
    ]

    # Assemble Features
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    df = assembler.transform(df)

    # Standardization
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaler_model = scaler.fit(df)
    df = scaler_model.transform(df)

    # Apply PCA
    pca = PCA(k=8, inputCol="scaled_features", outputCol="pca_features")
    pca_model = pca.fit(df)
    df = pca_model.transform(df)

    logger.info("Feature transformation completed successfully.")
    return df

# Task 4: Load Regression Model & Predict ETA
model_path_eta = r'C:\Users\Dusty\Downloads\Internship\Last-Mile-Delivery-Delays-and-Route-Optimization\notebooks\regression_model_pickup'

def predict_eta(df, model_path_eta):
    logger.info("Loading regression model for ETA prediction...")
    
    # Load trained Linear Regression model
    regression_model = LinearRegressionModel.load(model_path_eta)

    # Rename `pca_features` to `poly_features` (if applicable)
    if "pca_features" in df.columns:
        df = df.withColumnRenamed("pca_features", "poly_features")

    # Make Predictions
    df = regression_model.transform(df).withColumnRenamed("prediction", "predicted_eta")
    return df

# Task 5: Load Classification Model & Predict Delay
model_path_delay = r'C:\Users\Dusty\Downloads\Internship\Last-Mile-Delivery-Delays-and-Route-Optimization\notebooks\classification_model_pickup'

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, when
from pyspark.ml.functions import vector_to_array
from pyspark.ml.classification import LogisticRegressionModel

def predict_delay(df, classification_model_path):
    logger.info("Loading classification model...")
    classification_model = LogisticRegressionModel.load(classification_model_path)

    logger.info("Columns in Test Data Before Prediction: " + str(df.columns))

    # Ensure 'features' column exists (rebuild it if necessary)
    feature_cols = ["speed_status_indexed", "pca1", "pca2", "pca3", "pca4", "pca5", "pca6", "pca7", "pca8", "pca9"]
    if "features" not in df.columns:
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        df = assembler.transform(df)

    logger.info("Applying classification model for delay prediction...")
    df = classification_model.transform(df)

    logger.info("Columns in Test Data After Prediction: " + str(df.columns))

    # Convert SparseVector probability column to Array
    df = df.withColumn("probability_array", vector_to_array(col("probability")))

    # Extract probability of delay
    df = df.withColumn("delay_probability", col("probability_array")[1])

    # Apply threshold for adjusted prediction
    df = df.withColumn(
        "adjusted_prediction",
        when(col("delay_probability") > 0.25, 1.0).otherwise(0.0)
    )

    # Convert numerical prediction to string labels
    df = df.withColumn(
        "is_delayed_string",
        when(col("adjusted_prediction") == 1.0, "Delayed").otherwise("On Time")
    )

    logger.info("Delay prediction completed.")
    return df

# Task 6: Save Predictions
def save_predictions(df, output_path):
    logger.info("Saving predictions to CSV...")

    df.select(
        col("predicted_eta"),
        col("is_delayed_string").alias("is_delayed"),  # Save as string
        col("delay_probability"),
        col("adjusted_prediction")  # Keep numerical version if needed
    ).write.csv(output_path, header=True)

    logger.info("Predictions saved successfully.")

# Task 7: Execute Full Pipeline
if __name__ == "__main__":
    # Model Paths
    regression_model_path = "regression_model_pickup"  # Replace with actual path
    classification_model_path = "classification_model_pickup"  # Replace with actual path
    output_predictions_path = r'C:\Users\Dusty\Downloads\Internship\Last-Mile-Delivery-Delays-and-Route-Optimization\data\ETL_predictions_pickup.csv'


    # Step-by-Step Execution
    test_data = split_data(df_model)  # Split Data
    test_data = preprocess_data(test_data)  # Preprocess Data
    test_data = feature_transformation(test_data)  # Feature Engineering
    test_data = predict_eta(test_data, regression_model_path)  # Predict ETA
    test_data = predict_delay(test_data, classification_model_path)  # Predict Delay
    save_predictions(test_data, output_predictions_path)  # Save Results

    logger.info("Pickup Pipeline Execution Completed Successfully.")

INFO:Pickup_ETL_Pipeline:Splitting data into train (80%) and test (20%)...
INFO:Pickup_ETL_Pipeline:Train Data Count: 799139, Test Data Count: 1999
INFO:Pickup_ETL_Pipeline:Preprocessing data: Handling missing values...
INFO:Pickup_ETL_Pipeline:Missing values handled successfully.
INFO:Pickup_ETL_Pipeline:Applying feature transformation...
INFO:Pickup_ETL_Pipeline:Feature transformation completed successfully.
INFO:Pickup_ETL_Pipeline:Loading regression model for ETA prediction...
INFO:Pickup_ETL_Pipeline:Loading classification model...
INFO:Pickup_ETL_Pipeline:Columns in Test Data Before Prediction: ['pickup_time', 'accept_time', 'is_delayed', 'speed_status', 'pca1', 'pca2', 'pca3', 'pca4', 'pca5', 'pca6', 'pca7', 'pca8', 'pca9', 'pickup_eta_minutes', 'is_delayed_indexed', 'speed_status_indexed', 'features', 'scaled_features', 'poly_features', 'predicted_eta']
INFO:Pickup_ETL_Pipeline:Applying classification model for delay prediction...
INFO:Pickup_ETL_Pipeline:Columns in Test Data A

Py4JJavaError: An error occurred while calling o5720.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 765.0 failed 1 times, most recent failure: Lost task 0.0 in stage 765.0 (TID 558) (DESKTOP-9JO5RF2 executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/C:/Users/Dusty/Downloads/Internship/Last-Mile-Delivery-Delays-and-Route-Optimization/data/ETL_predictions_pickup.csv.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`ProbabilisticClassificationModel$$Lambda$4078/0x00000007c161d828`: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 11, y.size = 10
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:123)
	at org.apache.spark.ml.classification.LogisticRegressionModel.$anonfun$margin$1(LogisticRegression.scala:1151)
	at org.apache.spark.ml.classification.LogisticRegressionModel.$anonfun$margin$1$adapted(LogisticRegression.scala:1150)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:1241)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:1060)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel.$anonfun$transform$2(ProbabilisticClassifier.scala:121)
	... 25 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:860)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/C:/Users/Dusty/Downloads/Internship/Last-Mile-Delivery-Delays-and-Route-Optimization/data/ETL_predictions_pickup.csv.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`ProbabilisticClassificationModel$$Lambda$4078/0x00000007c161d828`: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 11, y.size = 10
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:123)
	at org.apache.spark.ml.classification.LogisticRegressionModel.$anonfun$margin$1(LogisticRegression.scala:1151)
	at org.apache.spark.ml.classification.LogisticRegressionModel.$anonfun$margin$1$adapted(LogisticRegression.scala:1150)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:1241)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:1060)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel.$anonfun$transform$2(ProbabilisticClassifier.scala:121)
	... 25 more


In [None]:
print("Schema of training data:")
classification_model.summary.predictions.printSchema()

print("Schema of test data:")
df.printSchema()

Schema of training data:


RuntimeError: No training summary available for this LogisticRegressionModel

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, StringIndexer
from pyspark.ml.classification import DecisionTreeClassificationModel, LogisticRegressionModel
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.tuning import TrainValidationSplitModel
import logging
from pyspark.sql.types import NumericType
from pyspark.ml.functions import vector_to_array
import pickle

# Configure Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Initialize Spark Session
spark = SparkSession.builder.appName("ETA_ETL_Pipeline").getOrCreate()

# Task 1: Extract Data from PostgreSQL
url = "jdbc:postgresql://localhost:5432/postgres"
properties = {
    "user": "postgres",
    "password": "root",
    "driver": "org.postgresql.Driver"
}

logger.info("Extracting data from PostgreSQL...")
df_model = spark.read.jdbc(url=url, table="feature_engg_pickup_data", properties=properties)
df_model = df_model.sample(fraction=0.1, seed=42)
logger.info(f"Data Extracted. Total Rows: {df_model.count()}")

# Task 2: Feature Transformation (Encoding, Scaling, PCA)
logger.info("Applying Feature Transformation...")

# Encode 'is_delayed' & 'speed_status'
indexer_speed_status = StringIndexer(inputCol="speed_status", outputCol="speed_status_indexed", handleInvalid="keep")
df_model = indexer_speed_status.fit(df_model).transform(df_model)

# Select Features
feature_columns = ["pca_1", "pca_2", "pca_3", "pca_4", "pca_5", "pca_6", "pca_7", "pca_8", "pca_9", "speed_status_indexed"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_model = assembler.transform(df_model)

# Standardization
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(df_model)
df_model = scaler_model.transform(df_model)

# Apply PCA
pca = PCA(k=9, inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(df_model)
df_model = pca_model.transform(df_model)

logger.info("Feature Transformation Completed")

# Task 3: Load Pickle Model
logger.info("Loading ETA Prediction Model...")
with open("eta_model_pickup.pkl", "rb") as f:
    eta_model = pickle.load(f)

if eta_model:
    logger.info("ETA Model Loaded Successfully")
else:
    raise Exception("Failed to load ETA model")

# Task 4: Convert Spark DataFrame to NumPy Array for Prediction
logger.info("Converting DataFrame to NumPy Array for Prediction...")
feature_data = np.array(df_model.select("pca_features").rdd.map(lambda row: row[0]).collect())
logger.info(f"Feature Data Shape: {feature_data.shape}")

# Task 5: Make Predictions
logger.info("Making Predictions on Data...")
predictions = eta_model.predict(feature_data)

# Convert Predictions Back to Spark DataFrame
predictions_df = spark.createDataFrame([(float(pred),) for pred in predictions], ["predicted_eta"])

# Assign Unique IDs to Join Back with Original Data
df_model = df_model.withColumn("id", monotonically_increasing_id())
predictions_df = predictions_df.withColumn("id", monotonically_increasing_id())

# Join Predictions with Original Data
df_final = df_model.join(predictions_df, on="id").drop("id")

logger.info("ETA Predictions Completed")

# Task 6: Save Predictions
output_path = "ETA_predictions_pickup.csv"
df_final.select(col("predicted_eta")).write.csv(output_path, header=True)
logger.info(f"Predictions saved to {output_path}")

# Pipeline Completed
logger.info("ETA Prediction Pipeline Completed Successfully")

2025-03-16 14:27:19,327 - INFO - Extracting data from PostgreSQL...
2025-03-16 14:27:19,933 - INFO - Data Extracted. Total Rows: 401124
2025-03-16 14:27:19,933 - INFO - Applying Feature Transformation...


IllegalArgumentException: pca_1 does not exist. Available: order_id, pickup_time, accept_time, hour_of_day, day_of_week, month, pickup_eta_minutes, pickup_time_delay, pickup_distance_km, cluster, avg_pickup_time_minutes, pickup_order_count, city, city_order_count, is_delayed, speed_kmh, speed_status, speed_status_indexed

#### Create Pickup Prediction API (FastAPI)

In [None]:
from fastapi import FastAPI
from pydantic import BaseModel
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Initialize FastAPI App
app = FastAPI()

# Start Spark Session
spark = SparkSession.builder.appName("FastAPI_Pickup").getOrCreate()

# Load Trained Pickup Models
regression_model_pickup = LinearRegressionModel.load("regression_model_pickup")
classification_model_pickup = DecisionTreeClassificationModel.load("classification_model_pickup")

# Define Feature Columns for Pickup Model
pickup_features = ["speed_kmh", "pickup_distance_km", "pickup_eta_minutes", "avg_pickup_time_minutes"]

# Define Request Schema
class PickupRequest(BaseModel):
    speed_kmh: float
    pickup_distance_km: float
    pickup_eta_minutes: float
    avg_pickup_time_minutes: float

# Predict Pickup ETA & Delay
@app.post("/predict/pickup")
def predict_pickup(data: PickupRequest):
    input_data = [[data.speed_kmh, data.pickup_distance_km, data.pickup_eta_minutes, data.avg_pickup_time_minutes]]
    df = spark.createDataFrame(input_data, pickup_features)
    
    # Feature Engineering
    assembler = VectorAssembler(inputCols=pickup_features, outputCol="features")
    df = assembler.transform(df)
    
    # Predict ETA
    eta_prediction = regression_model_pickup.transform(df).select("prediction").collect()[0][0]
    
    # Predict Delay (Binary Classification)
    delay_prediction = classification_model_pickup.transform(df).select("prediction").collect()[0][0]
    
    return {
        "predicted_pickup_eta": round(eta_prediction, 2),
        "predicted_pickup_delay": int(delay_prediction)  # 0 = No Delay, 1 = Delayed
    }

# Root Endpoint
@app.get("/")
def home():
    return {"message": "Pickup Prediction API is running!"}

### Latest

In [7]:
import pickle
import logging
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import numpy as np

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# PostgreSQL Connection Details
DB_URL = "postgresql://postgres:root@localhost:5432/postgres"
TABLE_NAME = "pickup_data"  # Table with raw new data
PREDICTIONS_TABLE = "predictions_results"  # Table to store predictions

# Load Raw Data from PostgreSQL
logger.info("Extracting data from PostgreSQL...")
engine = create_engine(DB_URL)
df = pd.read_sql(f"SELECT * FROM {TABLE_NAME}", engine)
df= df.sample(frac=0.01, random_state=42)
# print(f"Total number of rows: {len(df)}")
logger.info("Data extraction completed.")

# Feature Engineering
logger.info("Performing feature engineering...")

# Convert datetime columns
df["pickup_time"] = pd.to_datetime(df["pickup_time"])
df["accept_time"] = pd.to_datetime(df["accept_time"])

# Calculate new features
df["pickup_eta_minutes"] = (df["pickup_time"] - df["accept_time"]).dt.total_seconds() / 60
# df["avg_pickup_time_minutes"] = df["pickup_eta_minutes"].mean()
df["pickup_time_delay"] = (df["pickup_time"] - df["time_window_start"]).dt.total_seconds() / 60
# Define `is_delayed` based on a threshold (needed for delay prediction model)
# time_threshold = 360  # Set the threshold in minutes
df["is_delayed"] = df["pickup_time_delay"].apply(lambda x: "Delayed" if x > 0 else "On Time")

# Drop unnecessary columns for model input
df_model = df.select_dtypes(include=[np.number])

# Apply Scaling
logger.info("Applying feature scaling...")
scaler = StandardScaler()
scaled_features = scaler.fit_transform(df_model)
logger.info("Feature scaling completed.")

# Apply PCA with k=9
logger.info("Applying PCA with k=9...")
pca = PCA(n_components=10)
pca_features = pca.fit_transform(scaled_features)
pca_columns = [f"pca{i+1}" for i in range(10)]
df_pca = pd.DataFrame(pca_features, columns=pca_columns)
logger.info("PCA transformation completed.")

# Load ETA Prediction Model
logger.info("Loading ETA prediction model...")
with open("eta_model_pickup.pkl", "rb") as f:
    eta_model = pickle.load(f)

# Predict ETA
df_pca["predicted_eta"] = eta_model.predict(df_pca)
logger.info("ETA predictions completed.")

# Load Delay Classification Model
logger.info("Loading Delay classification model...")
with open("delay_model_pickup.pkl", "rb") as f:
    delay_model = pickle.load(f)

# Predict Delay
df_pca["predicted_delay"] = delay_model.predict(df_pca)
logger.info("Delay classification completed.")

# Save Predictions to PostgreSQL
logger.info("Saving predictions to PostgreSQL...")
df_pca["order_id"] = df["order_id"]  # Add back order_id for reference
df_pca[["order_id", "predicted_eta", "predicted_delay"]].to_sql(PREDICTIONS_TABLE, engine, if_exists="replace", index=False)
logger.info("Predictions saved successfully.")

INFO:__main__:Extracting data from PostgreSQL...
INFO:__main__:Data extraction completed.
INFO:__main__:Performing feature engineering...
INFO:__main__:Applying feature scaling...
INFO:__main__:Feature scaling completed.
INFO:__main__:Applying PCA with k=9...
INFO:__main__:PCA transformation completed.
INFO:__main__:Loading ETA prediction model...
INFO:__main__:ETA predictions completed.
INFO:__main__:Loading Delay classification model...
INFO:__main__:Delay classification completed.
INFO:__main__:Saving predictions to PostgreSQL...
INFO:__main__:Predictions saved successfully.


### Flask API

In [7]:
from flask import Flask, request, jsonify
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
import pandas as pd
from xgboost import XGBRegressor
import joblib

# Load Pretrained Models
delay_model = "eta_model_pickup.pkl"
eta_model = "delay_model_pickup.pkl"
# Initialize Flask App
app = Flask(__name__)

# Feature columns (adjust according to your pipeline)
feature_cols = ["speed_status_indexed", "pca1", "pca2", "pca3", "pca4", "pca5", "pca6", "pca7", "pca8", "pca9"]

@app.route("/predict", methods=["POST"])
def predict():
    try:
        # Get JSON input
        input_data = request.get_json()

        # Convert to Pandas DataFrame
        df = pd.DataFrame([input_data])

        # Convert to Spark DataFrame
        spark_df = spark.createDataFrame(df)

        # Assemble Features
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        spark_df = assembler.transform(spark_df)

        # Make Predictions
        delay_pred = delay_model.transform(spark_df).select("prediction").collect()[0][0]
        eta_pred = eta_model.transform(spark_df).select("prediction").collect()[0][0]

        # Return Predictions
        return jsonify({"delay_prediction": int(delay_pred), "eta_prediction": round(eta_pred, 2)})

    except Exception as e:
        return jsonify({"error": str(e)})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.1.233:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug: * Restarting with stat


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [11]:
import joblib

# Load and check models
try:
    delay_model = joblib.load("eta_model_pickup.pkl")
    eta_model = joblib.load("delay_model_pickup.pkl")

    print(f"Delay Model Type: {type(delay_model)}")
    print(f"ETA Model Type: {type(eta_model)}")
    
except Exception as e:
    print(f"Error loading models: {e}")

Delay Model Type: <class 'xgboost.sklearn.XGBRegressor'>
ETA Model Type: <class 'sklearn.linear_model._logistic.LogisticRegression'>
