In [234]:
import sys

# Make the shared helper package importable. The path is relative to the
# notebook's working directory. The membership check keeps sys.path free of
# duplicate entries when this cell is re-run in the same kernel.
KAGGLE_LIBRARY_PATH = '../../../kaggleLibrary'
if KAGGLE_LIBRARY_PATH not in sys.path:
    sys.path.append(KAGGLE_LIBRARY_PATH)

from judeKaggleLibrary import load_data, create_csv

In [235]:
# Load the train and test splits for the "spaceship-titanic" dataset.
# NOTE(review): load_data lives in judeKaggleLibrary (not shown here); the later
# .show() / .na.drop() calls suggest it returns Spark DataFrames — confirm.
train_df, test_df = load_data("spaceship-titanic")

In [236]:
# Print a preview of the first rows of the test split.
test_df.show()

+-----------+----------+---------+------+-----------+----+-----+-----------+---------+------------+------+------+-----------------+
|PassengerId|HomePlanet|CryoSleep| Cabin|Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|   Spa|VRDeck|             Name|
+-----------+----------+---------+------+-----------+----+-----+-----------+---------+------------+------+------+-----------------+
|    0013_01|     Earth|     true| G/3/S|TRAPPIST-1e|27.0|false|        0.0|      0.0|         0.0|   0.0|   0.0|  Nelly Carsoning|
|    0018_01|     Earth|    false| F/4/S|TRAPPIST-1e|19.0|false|        0.0|      9.0|         0.0|2823.0|   0.0|   Lerome Peckers|
|    0019_01|    Europa|     true| C/0/S|55 Cancri e|31.0|false|        0.0|      0.0|         0.0|   0.0|   0.0|  Sabih Unhearfus|
|    0021_01|    Europa|    false| C/1/S|TRAPPIST-1e|38.0|false|        0.0|   6652.0|         0.0| 181.0| 585.0| Meratz Caltilter|
|    0023_01|     Earth|    false| F/5/S|TRAPPIST-1e|20.0|false|       10.0|

In [237]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct

# Preview the rows, then the per-column summary statistics.
train_df.show()

train_df.describe().show()

# Count unique values for each column.
# All distinct counts are computed in one aggregation (a single Spark job)
# instead of triggering a separate job per column.
distinct_counts = train_df.agg(
    *[countDistinct(column).alias(column) for column in train_df.columns]
).collect()[0]

for column in train_df.columns:
    unique_count = distinct_counts[column]
    print(f"Number of unique values in {column}: {unique_count}")

+-----------+----------+---------+-----+-------------+----+-----+-----------+---------+------------+------+------+------------------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|  Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|   Spa|VRDeck|              Name|Transported|
+-----------+----------+---------+-----+-------------+----+-----+-----------+---------+------------+------+------+------------------+-----------+
|    0001_01|    Europa|    false|B/0/P|  TRAPPIST-1e|39.0|false|        0.0|      0.0|         0.0|   0.0|   0.0|   Maham Ofracculy|      false|
|    0002_01|     Earth|    false|F/0/S|  TRAPPIST-1e|24.0|false|      109.0|      9.0|        25.0| 549.0|  44.0|      Juanna Vines|       true|
|    0003_01|    Europa|    false|A/0/S|  TRAPPIST-1e|58.0| true|       43.0|   3576.0|         0.0|6715.0|  49.0|     Altark Susent|      false|
|    0003_02|    Europa|    false|A/0/S|  TRAPPIST-1e|33.0|false|        0.0|   1283.0|       371.0|3329.0| 193.0|      Sola

Dropping Useless Features

In [238]:
# "Name" is not among the model's feature columns, so remove it from the training data.
train_df = train_df.drop("Name")

Dropping all rows that contain null values, since nulls appear to make up only a small proportion of the data (less than 4%)

In [239]:
# Discard every training row that has a null in any column.
train_df = train_df.dropna()

In [240]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct

# Re-inspect the training data after the column drop and null removal.
train_df.show()

train_df.describe().show()

# Count unique values for each column.
# A single aggregation computes every distinct count in one Spark job,
# rather than running one job per column.
distinct_counts = train_df.agg(
    *[countDistinct(column).alias(column) for column in train_df.columns]
).collect()[0]

for column in train_df.columns:
    unique_count = distinct_counts[column]
    print(f"Number of unique values in {column}: {unique_count}")

+-----------+----------+---------+-----+-------------+----+-----+-----------+---------+------------+------+------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|  Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|   Spa|VRDeck|Transported|
+-----------+----------+---------+-----+-------------+----+-----+-----------+---------+------------+------+------+-----------+
|    0001_01|    Europa|    false|B/0/P|  TRAPPIST-1e|39.0|false|        0.0|      0.0|         0.0|   0.0|   0.0|      false|
|    0002_01|     Earth|    false|F/0/S|  TRAPPIST-1e|24.0|false|      109.0|      9.0|        25.0| 549.0|  44.0|       true|
|    0003_01|    Europa|    false|A/0/S|  TRAPPIST-1e|58.0| true|       43.0|   3576.0|         0.0|6715.0|  49.0|      false|
|    0003_02|    Europa|    false|A/0/S|  TRAPPIST-1e|33.0|false|        0.0|   1283.0|       371.0|3329.0| 193.0|      false|
|    0004_01|     Earth|    false|F/1/S|  TRAPPIST-1e|16.0|false|      303.0|     70.0|       151.0| 565.0|   2

Splitting up Cabin into distinct categories

In [241]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.types import IntegerType
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import split

class SplitFeature(Transformer, HasInputCol, HasOutputCol):
    """Split a delimited string column into three new columns.

    The input column is split on ``delimiter`` and the first three pieces are
    written to ``<outputCol>_Deck``, ``<outputCol>_Num`` and ``<outputCol>_Side``.
    The input column itself is dropped from the result.

    Parameters
    ----------
    inputCol : str, optional
        Name of the string column to split.
    outputCol : str, optional
        Prefix for the three generated columns. When omitted, the default
        provided by ``HasOutputCol`` is left in place.
    delimiter : str, default "/"
        Separator passed to ``pyspark.sql.functions.split``, which interprets
        it as a regular-expression pattern (escape regex metacharacters if a
        literal match is intended).
    """

    def __init__(self, inputCol=None, outputCol=None, delimiter="/"):
        super(SplitFeature, self).__init__()
        # Store only the values the caller actually supplied, and store them
        # as explicitly-set params. Using _setDefault here would register user
        # values as defaults and overwrite the mixin's own default with None.
        supplied = {
            name: value
            for name, value in (("inputCol", inputCol), ("outputCol", outputCol))
            if value is not None
        }
        if supplied:
            self._set(**supplied)
        self.delimiter = delimiter

    def _transform(self, dataset):
        """Return ``dataset`` with the split columns added and the input column removed."""
        source_name = self.getInputCol()
        prefix = self.getOutputCol()
        splits = split(col(source_name), self.delimiter)
        return dataset \
            .withColumn(prefix + '_Deck', splits.getItem(0)) \
            .withColumn(prefix + '_Num', splits.getItem(1)) \
            .withColumn(prefix + '_Side', splits.getItem(2)) \
            .drop(source_name)


In [242]:
# from pyspark.ml import Pipeline

# splitFeature = SplitFeature(inputCol="Cabin", outputCol="Cabin", delimiter="/")

# pipeline = Pipeline(stages=[splitFeature])
# train_df = pipeline.fit(train_df).transform(train_df)
# train_df.show()

Casting Target Variable into Int

In [243]:
from pyspark.sql.functions import col

# Overwrite the target with its integer cast so it can be used as the classifier's label column.
# NOTE(review): assumes "Transported" is a boolean column (true/false -> 1/0) — confirm the loaded schema.
train_df = train_df.withColumn("Transported", train_df["Transported"].cast("integer"))

Creating the model

In [244]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline


# Feature types
categorical_features = ['HomePlanet', 'Destination', 'Cabin']
numerical_features = ['Age', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']
boolean_features = ['CryoSleep', 'VIP']

# Fixed seed so that training is repeatable across kernel restarts.
SEED = 42

# Note: "Cabin" is indexed as a whole string here; the SplitFeature
# transformer is not part of this pipeline.

# Index each categorical column, then one-hot encode the resulting index.
# handleInvalid="keep" routes null / unseen labels to an extra bucket instead
# of raising, so rows that were not cleaned can still be transformed.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in categorical_features
]
encoders = [
    OneHotEncoder(inputCols=[column + "_index"], outputCols=[column + "_vec"])
    for column in categorical_features
]

# Assemble encoded categoricals, numeric and boolean columns into one vector.
# The comprehension variable is deliberately not called `col`, to avoid
# confusion with pyspark.sql.functions.col imported elsewhere in the notebook.
assembler = VectorAssembler(
    inputCols=[column + "_vec" for column in categorical_features] + numerical_features + boolean_features,
    outputCol="features",
    handleInvalid="keep")

# Initialize the random forest classifier with the correct label column
rf = RandomForestClassifier(featuresCol="features", labelCol="Transported", seed=SEED)

# Combine all stages into a pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Fit the pipeline on the training data
model = pipeline.fit(train_df)

Make Predictions

In [245]:
# Test rows are not filtered for nulls here (the na.drop() step is disabled).
# Score the test data with the fitted pipeline and expose the model's
# "prediction" column under the target name "Transported".
predictions = model.transform(test_df).withColumnRenamed("prediction", "Transported")

# Display a sample of the id / predicted-target pairs.
predictions.select("PassengerId", "Transported").show()

+-----------+-----------+
|PassengerId|Transported|
+-----------+-----------+
|    0013_01|        1.0|
|    0018_01|        0.0|
|    0019_01|        1.0|
|    0021_01|        0.0|
|    0023_01|        0.0|
|    0027_01|        0.0|
|    0029_01|        1.0|
|    0032_01|        1.0|
|    0032_02|        1.0|
|    0033_01|        0.0|
|    0037_01|        0.0|
|    0040_01|        0.0|
|    0040_02|        0.0|
|    0042_01|        1.0|
|    0046_01|        0.0|
|    0046_02|        0.0|
|    0046_03|        0.0|
|    0047_01|        1.0|
|    0047_02|        0.0|
|    0047_03|        0.0|
+-----------+-----------+
only showing top 20 rows



Create CSV

In [246]:
from pyspark.sql.functions import when

# "Transported" is treated as a numeric label at this point: a value of 1 maps to
# True, and every other value maps to False.
is_transported = predictions["Transported"] == 1
predictions = predictions.withColumn(
    "Transported",
    when(is_transported, True).otherwise(False),
)

In [247]:
# Hand the id and predicted-target columns to the project helper for export.
# NOTE(review): create_csv is defined in judeKaggleLibrary (not shown here); the
# output location and file format are determined there — confirm.
create_csv("spaceship-titanic", predictions, cols=["PassengerId", "Transported"])