In [2]:
# PySpark Imports
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext

# PySpark ML Imports
from pyspark.ml import Pipeline
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.param import Param, Params
from pyspark.ml.feature import Bucketizer, VectorAssembler, StringIndexer

# Local Imports
from _dataset import Dataset 

# Other Imports
import pandas as pd
import duckdb
import numpy as np
import os
import sys

In [7]:
# Point both the Spark driver (PYSPARK_DRIVER_PYTHON) and the worker
# processes (PYSPARK_PYTHON) at the interpreter running this code, so Spark
# does not fall back to a different Python installation.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

# DuckDB database file, relative to the current working directory.
DATABASE_PATH = "../database/DDBB_duckdb.duckdb"


def fetch_duckdb() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Fetch movie features from the DuckDB database and split them by subset.

    For every movie in the train/test/val subsets the query returns the movie's
    own columns (``num_votes``, ``runtime_min``, ``title_length``, ``label``)
    plus aggregated crew features:
        - ``director_avg_score`` / ``writer_avg_score``: mean, over the movie's
          directors (writers), of the fraction of *train* movies by that person
          whose ``label`` is true. People without train movies, and movies with
          no directors (writers) at all, fall back to a neutral 0.5, so these
          columns never contain NULL/NaN (Spark's VectorAssembler rejects NaN).
        - ``director_count`` / ``writer_count``: number of directors (writers)
          linked to the movie, 0 when there are none.

    Per-person scores are computed from the train subset only, so test and
    validation rows reuse train statistics without seeing their own labels.

    :return: ``(train, test, validation)`` pandas DataFrames. ``subset`` is
        dropped from all three; ``label`` is kept (converted to ``str``) only in
        ``train`` and dropped from ``test`` and ``validation``.
    """
    query = '''
    WITH director_avg_scores AS (
        SELECT 
            d.director_id,
            COALESCE(SUM(CASE WHEN m.label THEN 1 ELSE 0 END) / NULLIF(COUNT(*), 0), 0.5) AS director_avg_score
        FROM 
            directing d
        INNER JOIN 
            movies m ON d.movie_id = m.movie_id
        WHERE 
            m.subset = 'train'
        GROUP BY 
            d.director_id
    ),
    director_scores AS (
        SELECT 
            d.movie_id,
            COUNT(d.director_id) AS director_count,
            AVG(COALESCE(das.director_avg_score, 0.5)) AS director_avg_score
        FROM 
            directing d
        LEFT JOIN 
            director_avg_scores das ON das.director_id = d.director_id
        GROUP BY 
            d.movie_id
    ),
    writer_avg_scores AS (
        SELECT 
            w.writer_id,
            COALESCE(SUM(CASE WHEN m.label THEN 1 ELSE 0 END) / NULLIF(COUNT(*), 0), 0.5) AS writer_avg_score
        FROM 
            writing w
        INNER JOIN 
            movies m ON w.movie_id = m.movie_id
        WHERE 
            m.subset = 'train'
        GROUP BY 
            w.writer_id
    ),
    writer_scores AS (
        SELECT 
            w.movie_id,
            COUNT(w.writer_id) AS writer_count,
            AVG(COALESCE(was.writer_avg_score, 0.5)) AS writer_avg_score
        FROM 
            writing w
        LEFT JOIN 
            writer_avg_scores was ON w.writer_id = was.writer_id
        GROUP BY 
            w.movie_id
    )
    SELECT
        m.subset, 
        m.movie_id,
        m.num_votes,
        m.runtime_min,
        m.title_length,
        -- Movies without directors/writers have no row in the LEFT JOINed
        -- CTEs; default them to the same neutral 0.5 instead of NULL (NaN).
        COALESCE(ds.director_avg_score, 0.5) AS director_avg_score,
        COALESCE(ds.director_count, 0) AS director_count,
        m.label,
        COALESCE(ws.writer_avg_score, 0.5) AS writer_avg_score,
        COALESCE(ws.writer_count, 0) AS writer_count
    FROM 
        movies m
    LEFT JOIN 
        director_scores ds ON m.movie_id = ds.movie_id
    LEFT JOIN 
        writer_scores ws ON m.movie_id = ws.movie_id
    WHERE 
        -- All three subsets are needed: the split into train/test/val is
        -- done below in pandas.
        m.subset IN ('train', 'test', 'val');
    '''

    con = duckdb.connect(database=DATABASE_PATH, read_only=False)
    try:
        df = con.execute(query).fetch_df()
    finally:
        # Release the database file even if the query fails.
        con.close()

    train = df[df['subset'] == 'train'].drop(['subset'], axis=1)
    test = df[df['subset'] == 'test'].drop(['subset', 'label'], axis=1)
    validation = df[df['subset'] == 'val'].drop(['subset', 'label'], axis=1)

    # The Spark StringIndexer in the training pipeline expects a string label.
    train['label'] = train['label'].astype(str)

    return train, test, validation

def generate_pipeline(features: list) -> Pipeline:
    """
    Build the (unfitted) training pipeline.

    Stages:
        - ``VectorAssembler``: packs the numeric columns named in *features*
          into a single ``features`` vector column.
        - ``StringIndexer``: converts the string ``label`` column into a
          numeric ``label-index`` column for the classifier.

    The label indexer uses alphabetical ordering so the label-to-index mapping
    is deterministic. Assuming the labels are the strings "False"/"True", this
    maps "False" -> 0.0 and "True" -> 1.0, so a predicted index can be cast
    straight back to ``bool``. With Spark's default ``frequencyDesc`` ordering
    the mapping would depend on which class is more frequent.

    :param features: Names of the numeric input columns to assemble.
    :return: An unfitted ``Pipeline`` with the two stages above.
    """
    assembler = VectorAssembler(inputCols=features, outputCol="features")
    indexer = StringIndexer(
        inputCol="label",
        outputCol="label-index",
        stringOrderType="alphabetAsc",
    )
    return Pipeline(stages=[assembler, indexer])

def generate_output_pipeline(features: list) -> Pipeline:
    """
    Build the (unfitted) inference-time feature pipeline.

    It has a single ``VectorAssembler`` stage that packs the numeric columns
    named in *features* into a ``features`` vector column. There is no label
    indexing stage, because the frames it is applied to (validation/test) have
    had their ``label`` column dropped.

    :param features: Names of the numeric input columns to assemble.
    :return: An unfitted ``Pipeline`` with one assembler stage.
    """
    stages = [VectorAssembler(inputCols=features, outputCol="features")]
    return Pipeline(stages=stages)

    
def _write_bool_lines(values: pd.Series, path: str) -> None:
    """Write *values* one per line (no header, no index) without a trailing line break."""
    # to_csv() uses os.linesep between rows; stripping the final terminator
    # works whether it is "\n" or "\r\n" (unlike chopping a fixed 2 bytes).
    text = values.to_csv(index=False, header=False)
    with open(path, "w", newline="", encoding="utf-8") as f:
        f.write(text.rstrip("\r\n"))


def create_submission(model, validation, test, features) -> None:
    """
    Predict the validation and test sets and write the submission files.

    Writes ``val_result.csv`` and ``test_result.csv`` to the current working
    directory. Each file holds one boolean prediction per line, in the row order
    of the corresponding input frame, with no header, no index and no line
    break after the last value.

    :param model: Fitted PySpark classification model whose ``featuresCol`` is
        ``"features"``.
    :param validation: Spark DataFrame with the validation rows.
    :param test: Spark DataFrame with the test rows.
    :param features: Names of the numeric columns to assemble into the
        ``features`` vector; should match the columns used for training.
    """
    pipeline_fit = generate_output_pipeline(features).fit(validation)

    outputs = {"val_result.csv": validation, "test_result.csv": test}
    for path, frame in outputs.items():
        prepared = pipeline_fit.transform(frame)
        # collect() returns a plain list of Rows (it has no .tolist), so pull
        # the prediction column through pandas instead.
        predictions = (
            model.transform(prepared).select("prediction").toPandas()["prediction"]
        )
        # NOTE(review): "prediction" is the label *index*, so casting it to bool
        # is only correct if the training label indexer mapped "True" -> 1.0.
        _write_bool_lines(predictions.astype(bool), path)
    
def main() -> None:
    """
    Main PySpark pipeline execution.

    Steps:
        1. Create (or reuse) the Spark context and session.
        2. Load train/test/validation data from DuckDB via ``fetch_duckdb``.
        3. Fit the feature/label pipeline on the train data and train a
           ``DecisionTreeClassifier`` on the result.
        4. Convert validation and test to Spark DataFrames for submission.
    """
    # Initialize PySpark. The conf only sets the application name; the session
    # created afterwards reuses this context.
    SparkContext.getOrCreate(SparkConf().setAppName("binary-ml-classification"))
    spark = SparkSession.builder.getOrCreate()

    # Fetch data and convert the training set to a Spark DataFrame.
    train, test, validation = fetch_duckdb()
    features = [
        "runtime_min",
        "num_votes",
        "director_avg_score",
        "director_count",
        "writer_avg_score",
        "writer_count",
    ]
    df_train = spark.createDataFrame(train)

    # Fit the assembler/indexer pipeline and train the model on its output.
    pipeline_fit = generate_pipeline(features).fit(df_train)
    prepared = pipeline_fit.transform(df_train)
    dt = DecisionTreeClassifier(labelCol="label-index", featuresCol="features")
    dt_model = dt.fit(prepared)

    # Prepare the frames used for submission generation.
    df_validation = spark.createDataFrame(validation)
    df_test = spark.createDataFrame(test)

    # Submission generation is currently disabled. create_submission also
    # requires the feature list used for training.
    # create_submission(dt_model, df_validation, df_test, features)
    

In [8]:
if __name__ == "__main__":
    # Run the pipeline only when executed directly, not when imported.
    main()

Py4JJavaError: An error occurred while calling o403.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 14.0 failed 1 times, most recent failure: Lost task 15.0 in stage 14.0 (TID 101) (host.docker.internal executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$1398/10817141`: (struct<runtime_min_double_VectorAssembler_4a73b4fbba0e:double,num_votes_double_VectorAssembler_4a73b4fbba0e:double,director_avg_score:double,director_count_double_VectorAssembler_4a73b4fbba0e:double,writer_avg_score:double,writer_count_double_VectorAssembler_4a73b4fbba0e:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1226)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2492)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Encountered NaN while assembling a row with handleInvalid = "error". Consider
removing NaNs from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:264)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 28 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2493)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1228)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1221)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:125)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:274)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.$anonfun$train$1(DecisionTreeClassifier.scala:143)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:116)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:48)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$1398/10817141`: (struct<runtime_min_double_VectorAssembler_4a73b4fbba0e:double,num_votes_double_VectorAssembler_4a73b4fbba0e:double,director_avg_score:double,director_count_double_VectorAssembler_4a73b4fbba0e:double,writer_avg_score:double,writer_count_double_VectorAssembler_4a73b4fbba0e:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1226)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2492)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered NaN while assembling a row with handleInvalid = "error". Consider
removing NaNs from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:264)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 28 more
