# Preprocessing

In [13]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from pyspark.sql import SparkSession

In [14]:
# Obtain the Spark session used by every cell below; getOrCreate() hands back
# an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName('Spaceship')
    .getOrCreate()
)

In [15]:
# Load both CSV files. The first row of each file is used as the header and
# Spark infers the column types from the data.
train_data = spark.read.csv(
    "train.csv",
    header=True,
    inferSchema=True,
)
validation_data = spark.read.csv(
    "test.csv",
    header=True,
    inferSchema=True,
)

# Preview the training rows.
train_data.show()

+-----------+----------+---------+-----+-------------+----+-----+-----------+---------+------------+------+------+------------------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|  Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|   Spa|VRDeck|              Name|Transported|
+-----------+----------+---------+-----+-------------+----+-----+-----------+---------+------------+------+------+------------------+-----------+
|    0001_01|    Europa|    false|B/0/P|  TRAPPIST-1e|39.0|false|        0.0|      0.0|         0.0|   0.0|   0.0|   Maham Ofracculy|      false|
|    0002_01|     Earth|    false|F/0/S|  TRAPPIST-1e|24.0|false|      109.0|      9.0|        25.0| 549.0|  44.0|      Juanna Vines|       true|
|    0003_01|    Europa|    false|A/0/S|  TRAPPIST-1e|58.0| true|       43.0|   3576.0|         0.0|6715.0|  49.0|     Altark Susent|      false|
|    0003_02|    Europa|    false|A/0/S|  TRAPPIST-1e|33.0|false|        0.0|   1283.0|       371.0|3329.0| 193.0|      Sola

In [16]:
# Replacement values for missing entries, grouped by the kind of column.
fillna_values = {
    # Boolean flags: a missing flag is filled with False.
    **dict.fromkeys(['CryoSleep', 'VIP'], False),
    # Numeric columns (age and the spending amounts): filled with 0.
    **dict.fromkeys(
        ['Age', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck'],
        0,
    ),
    # Text columns. The Cabin placeholder keeps three '/'-separated parts so
    # the later split on '/' still yields a deck, a number and a side.
    'Cabin': 'Unknown/0/Unknown',
    'HomePlanet': 'Unknown',
    'Destination': 'Unknown',
    'Name': '',
}

# Apply the same replacements to both frames, then preview the training rows.
train_data = train_data.fillna(value=fillna_values)
validation_data = validation_data.fillna(value=fillna_values)
train_data.show()

+-----------+----------+---------+-----------------+-------------+----+-----+-----------+---------+------------+------+------+------------------+-----------+
|PassengerId|HomePlanet|CryoSleep|            Cabin|  Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|   Spa|VRDeck|              Name|Transported|
+-----------+----------+---------+-----------------+-------------+----+-----+-----------+---------+------------+------+------+------------------+-----------+
|    0001_01|    Europa|    false|            B/0/P|  TRAPPIST-1e|39.0|false|        0.0|      0.0|         0.0|   0.0|   0.0|   Maham Ofracculy|      false|
|    0002_01|     Earth|    false|            F/0/S|  TRAPPIST-1e|24.0|false|      109.0|      9.0|        25.0| 549.0|  44.0|      Juanna Vines|       true|
|    0003_01|    Europa|    false|            A/0/S|  TRAPPIST-1e|58.0| true|       43.0|   3576.0|         0.0|6715.0|  49.0|     Altark Susent|      false|
|    0003_02|    Europa|    false|            A/0/S|

In [17]:
# Print the column names of the training frame.
print(train_data.columns)

['PassengerId', 'HomePlanet', 'CryoSleep', 'Cabin', 'Destination', 'Age', 'VIP', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck', 'Name', 'Transported']


In [18]:
# Row counts: training frame on the first line, validation frame on the second.
print(train_data.count(), validation_data.count(), sep="\n")

8693
4277


### Create deck, num, side columns

In [19]:
from pyspark.sql.functions import split


def _with_cabin_parts(frame):
    """Return `frame` with 'Deck', 'Num' and 'Side' columns added.

    The three columns are items 0, 1 and 2 of 'Cabin' split on '/'.
    """
    cabin_parts = split(frame['Cabin'], '/')
    for position, part_name in enumerate(('Deck', 'Num', 'Side')):
        frame = frame.withColumn(part_name, cabin_parts.getItem(position))
    return frame


# Same derivation for both frames.
train_data = _with_cabin_parts(train_data)
validation_data = _with_cabin_parts(validation_data)

### Create passenger group column

In [20]:
# 'PassengerId' is split on '_': item 0 becomes 'GroupId' and item 1 becomes
# 'GroupNum'. Both frames get the same two columns.
train_data, validation_data = [
    frame
    .withColumn('GroupId', split(frame['PassengerId'], '_').getItem(0))
    .withColumn('GroupNum', split(frame['PassengerId'], '_').getItem(1))
    for frame in (train_data, validation_data)
]

### Create first and second name columns

In [21]:
def _with_name_parts(frame):
    """Return `frame` with 'FirstName' and 'SecondName' columns added.

    'Name' is split on a single space; item 0 is the first name and item 1
    the second name.
    """
    name_tokens = split(frame['Name'], ' ')
    # NOTE(review): a name without a space (including the '' placeholder that
    # replaces missing names) has no item 1, so 'SecondName' is presumably
    # null for those rows -- confirm before relying on it downstream.
    first_name = name_tokens.getItem(0)
    second_name = name_tokens.getItem(1)
    return (
        frame
        .withColumn('FirstName', first_name)
        .withColumn('SecondName', second_name)
    )


train_data = _with_name_parts(train_data)
validation_data = _with_name_parts(validation_data)

### Convert the categorical columns

In [22]:
from pyspark.ml.feature import StringIndexer

# Each indexer is fitted on the training data ONLY and the fitted model is then
# applied to both frames. Fitting a second time on the validation data would
# build a different label -> index mapping, so the same deck or surname
# would be encoded as different numbers in the two frames.
#
# handleInvalid="keep" gives null values and labels that were not present when
# the indexer was fitted one extra index (the number of fitted labels) instead
# of raising when the frame is evaluated. This matters here because
# 'SecondName' can be null and because the validation frame may contain
# surnames that never occur in the training frame.

# Indexing the 'Deck' column
indexer_deck = StringIndexer(
    inputCol="Deck",
    outputCol="DeckIndex",
    handleInvalid="keep",
)
deck_index_model = indexer_deck.fit(train_data)
train_data = deck_index_model.transform(train_data)
validation_data = deck_index_model.transform(validation_data)

# Indexing the 'SecondName' column
indexer_second_name = StringIndexer(
    inputCol="SecondName",
    outputCol="SecondNameIndex",
    handleInvalid="keep",
)
second_name_index_model = indexer_second_name.fit(train_data)
train_data = second_name_index_model.transform(train_data)
validation_data = second_name_index_model.transform(validation_data)

### Cast the boolean and numeric string columns to integers

In [23]:
from pyspark.sql.functions import col


def _cast_to_int(frame, column_names):
    """Return `frame` with each column in `column_names` cast to "int", in order."""
    for column_name in column_names:
        frame = frame.withColumn(column_name, col(column_name).cast("int"))
    return frame


# 'Transported' is cast for the training frame only; the validation frame is
# not given that cast here.
train_data = _cast_to_int(
    train_data,
    ["CryoSleep", "Num", "GroupId", "GroupNum", "VIP", "Transported"],
)
validation_data = _cast_to_int(
    validation_data,
    ["CryoSleep", "Num", "GroupId", "GroupNum", "VIP"],
)

### Apply one-hot encoding

In [24]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline


def _indexer_and_encoder(column_name):
    """Build the (StringIndexer, OneHotEncoder) pair for one categorical column.

    The indexer writes '<column_name>Index'; the encoder reads that column and
    writes '<column_name>Vec'.
    """
    index_column = column_name + "Index"
    vector_column = column_name + "Vec"
    return (
        StringIndexer(inputCol=column_name, outputCol=index_column),
        OneHotEncoder(inputCol=index_column, outputCol=vector_column),
    )


# One indexer/encoder pair per categorical column.
indexer_home, encoder_home = _indexer_and_encoder("HomePlanet")
indexer_dest, encoder_dest = _indexer_and_encoder("Destination")
indexer_side, encoder_side = _indexer_and_encoder("Side")

# Chain the stages so that each encoder runs right after its indexer.
pipeline = Pipeline(stages=[
    indexer_home, encoder_home,
    indexer_dest, encoder_dest,
    indexer_side, encoder_side,
])

# Fit on the training frame and add the index / vector columns to it.
# Only the training frame is transformed in this cell.
model = pipeline.fit(train_data)
train_data = model.transform(train_data)

In [25]:
# Display the training frame's column names, including the derived columns.
train_data.columns

['PassengerId',
 'HomePlanet',
 'CryoSleep',
 'Cabin',
 'Destination',
 'Age',
 'VIP',
 'RoomService',
 'FoodCourt',
 'ShoppingMall',
 'Spa',
 'VRDeck',
 'Name',
 'Transported',
 'Deck',
 'Num',
 'Side',
 'GroupId',
 'GroupNum',
 'FirstName',
 'SecondName',
 'DeckIndex',
 'SecondNameIndex',
 'HomePlanetIndex',
 'HomePlanetVec',
 'DestinationIndex',
 'DestinationVec',
 'SideIndex',
 'SideVec']

## Creating MLP Model

In [26]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import VectorAssembler

# Columns that make up the model input. VectorAssembler only accepts numeric,
# boolean and vector columns, so the raw string column 'Side' is NOT listed;
# its one-hot encoding 'SideVec' carries that information instead.
featureColumnNames = [
    'CryoSleep',
    'Age',
    'VIP',
    'RoomService',
    'FoodCourt',
    'ShoppingMall',
    'Spa',
    'VRDeck',
    'Num',
    'GroupId',
    'GroupNum',
    'DeckIndex',
    'SecondNameIndex',
    'HomePlanetVec',
    'DestinationVec',
    'SideVec',
]

# Combine feature columns into a single vector column
assembler = VectorAssembler(
    inputCols=featureColumnNames,
    outputCol="features"
)

assembled_train_data = assembler.transform(train_data)

# The classifier below reads its target from a column named 'label', which the
# frame does not have. Expose the integer 'Transported' target under that name.
assembled_train_data = assembled_train_data.withColumn(
    "label", assembled_train_data["Transported"]
)

# Seeded 80/20 split so that re-running the notebook yields the same partitions.
(train, test) = assembled_train_data.randomSplit([0.8, 0.2], seed=1234)

# The first layer must be exactly as wide as the assembled feature vector.
# The width depends on how many categories the one-hot encoders produced, so it
# is read from the data rather than hard-coded.
num_features = len(assembled_train_data.select("features").first()["features"])

# Two hidden layers (5 and 4 units) and 2 output units, one per class of the
# binary target.
layers = [num_features, 5, 4, 2]

# Create the trainer and set its parameters
mlp = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol='label',
    maxIter=100,
    layers=layers,
    blockSize=128,
    seed=1234
)