In [24]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

In [28]:
# Initialize Spark Session
spark = SparkSession.builder.appName("MLBinaryClassifier").getOrCreate()

# Load all train-X.csv files
train_files = [f"../data/train-{i}.csv" for i in range(1, 9)]
df = spark.read.csv(train_files, header=True, inferSchema=True)

# Load additional data
directing_df = spark.read.json("../data/directing.json")
writing_df = spark.read.json("../data/writing.json")

# Merge additional information if a common key exists
if 'movie' in df.columns:
    df = df.join(directing_df, 'movie', 'left')
    df = df.join(writing_df, 'movie', 'left')

# Handle categorical data
categorical_cols = [col_name for col_name, dtype in df.dtypes if dtype == "string"]
for col_name in categorical_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index").setHandleInvalid("keep")
    df = indexer.fit(df).transform(df)
    df = df.drop(col_name)  # Drop original categorical columns

# Convert label column to numeric
df = df.withColumn("label", col("label").cast("double"))

# Handle missing values
df = df.fillna(0)

# Define features assembler
feature_cols = [col_name for col_name in df.columns if col_name != "label"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# # Standardize numeric features
# scaler = StandardScaler()
# scaler_model = scaler.fit(df)
# df = scaler_model.transform(df)

# # Split data
# train_df, val_df = df.randomSplit([0.8, 0.2], seed=42)

# # Train a model
# rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="label", numTrees=100, seed=42)
# model = rf.fit(train_df)

# # Evaluate the model
# y_pred = model.transform(val_df)
# evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# accuracy = evaluator.evaluate(y_pred)

# print("Model Accuracy:", accuracy)


In [29]:
df

DataFrame[_c0: int, numVotes: double, label: double, tconst_index: double, primaryTitle_index: double, originalTitle_index: double, startYear_index: double, endYear_index: double, runtimeMinutes_index: double, features: vector]

In [19]:
directing_df

Unnamed: 0,movie,director
0,tt0003740,nm0665163
1,tt0008663,nm0803705
2,tt0009369,nm0428059
3,tt0009369,nm0949648
4,tt0010307,nm0304098
...,...,...
11157,tt9850344,nm0284774
11158,tt9850386,nm0550881
11159,tt9900782,nm7992231
11160,tt9904802,nm0052054
