In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

In [2]:
sc = SparkSession.builder.appName('Example')\
     .config('spark.sql.shuffle.partitions', '50')\
     .config('spark.driver.maxResultSize', '5g')\
     .config('spark.sql.execution.arrow.enabled', 'true')\
     .getOrCreate()

### Load and Preprocess Data

In [3]:
dataframe = sc.read.csv('data/titanic_train.csv', header=True)

In [4]:
dataframe.show(3)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



In [5]:
dataframe.dtypes

[('PassengerId', 'string'),
 ('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [6]:
dataframe.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
dataframe.count()

891

#### Drop Duplicates

In [8]:
dataframe_drp = dataframe.dropDuplicates()
dataframe_drp.count()

891

In [9]:
df = dataframe_drp.drop(*['name', 'Ticket', 'Cabin'])
df.show()

+-----------+--------+------+------+----+-----+-----+--------+--------+
|PassengerId|Survived|Pclass|   Sex| Age|SibSp|Parch|    Fare|Embarked|
+-----------+--------+------+------+----+-----+-----+--------+--------+
|          3|       1|     3|female|  26|    0|    0|   7.925|       S|
|         64|       0|     3|  male|   4|    3|    2|    27.9|       S|
|         77|       0|     3|  male|null|    0|    0|  7.8958|       S|
|        142|       1|     3|female|  22|    0|    0|    7.75|       S|
|        176|       0|     3|  male|  18|    1|    1|  7.8542|       S|
|        191|       1|     2|female|  32|    0|    0|      13|       S|
|        225|       1|     1|  male|  38|    1|    0|      90|       S|
|        239|       0|     2|  male|  19|    0|    0|    10.5|       S|
|        247|       0|     3|female|  25|    0|    0|   7.775|       S|
|        278|       0|     2|  male|null|    0|    0|       0|       S|
|        374|       0|     1|  male|  22|    0|    0|135.6333|  

#### Handle Missing values

In [10]:
def missing_val(df, cols):
    vals = []
    for val in cols:
        c = df.filter(df[val].isNull()).count()
        vals.append({val: c})
    return vals

In [11]:
cols = ['PassengerId', 'Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
missing_val(df,cols)

[{'PassengerId': 0},
 {'Survived': 0},
 {'Pclass': 0},
 {'Sex': 0},
 {'Age': 177},
 {'SibSp': 0},
 {'Parch': 0},
 {'Fare': 0},
 {'Embarked': 2}]

In [12]:
df = df.dropna(how='all', subset=['Embarked'])
df.count()

889

In [13]:
df_new = df.withColumn('Age', df['Age'].cast(IntegerType()))
df_new = df_new.withColumn('PassengerId', df['PassengerId'].cast(IntegerType()))
df_new = df_new.withColumn('Survived', df['Survived'].cast(IntegerType()))
df_new = df_new.withColumn('Fare', df['Fare'].cast(FloatType()))

df_new.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'string'),
 ('Sex', 'string'),
 ('Age', 'int'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Fare', 'float'),
 ('Embarked', 'string')]

In [14]:
col_mean = df_new.agg({'Age': 'mean'}).collect()[0][0]
df_new = df_new.fillna(col_mean, subset=['Age'])

df_new.count()

889

In [15]:
cols = ['PassengerId', 'Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
missing_val(df_new, cols)

[{'PassengerId': 0},
 {'Survived': 0},
 {'Pclass': 0},
 {'Sex': 0},
 {'Age': 0},
 {'SibSp': 0},
 {'Parch': 0},
 {'Fare': 0},
 {'Embarked': 0}]

#### Scale data

In [16]:
"""minm = df_new.agg({'Age': 'min'}).collect()[0][0]
maxm = df_new.agg({'Age': 'max'}).collect()[0][0]
    
df_scaled = df_new.withColumn('scaled_' + 'Age', (df_new['Age'] - minm) / (maxm - minm))
df_scaled.show(5)"""

"minm = df_new.agg({'Age': 'min'}).collect()[0][0]\nmaxm = df_new.agg({'Age': 'max'}).collect()[0][0]\n    \ndf_scaled = df_new.withColumn('scaled_' + 'Age', (df_new['Age'] - minm) / (maxm - minm))\ndf_scaled.show(5)"

In [17]:
def minmaxScale(df, col):
    minm = df.agg({col: 'min'}).collect()[0][0]
    maxm = df.agg({col: 'max'}).collect()[0][0]
    global df_scaled
    df_scaled = df.withColumn('scaled_' + col, (df[col] - minm) / (maxm - minm))
    return df_scaled.show(5)

In [18]:
minmaxScale(df_new, 'Age')

+-----------+--------+------+------+---+-----+-----+------+--------+----------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|scaled_Age|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+
|          3|       1|     3|female| 26|    0|    0| 7.925|       S|     0.325|
|         64|       0|     3|  male|  4|    3|    2|  27.9|       S|      0.05|
|         77|       0|     3|  male| 29|    0|    0|7.8958|       S|    0.3625|
|        142|       1|     3|female| 22|    0|    0|  7.75|       S|     0.275|
|        176|       0|     3|  male| 18|    1|    1|7.8542|       S|     0.225|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+
only showing top 5 rows



In [19]:
minmaxScale(df_scaled, 'Fare')

+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|scaled_Age|         scaled_Fare|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+
|          3|       1|     3|female| 26|    0|    0| 7.925|       S|     0.325|0.015468569476752568|
|         64|       0|     3|  male|  4|    3|    2|  27.9|       S|      0.05|0.054457169982804386|
|         77|       0|     3|  male| 29|    0|    0|7.8958|       S|    0.3625| 0.01541157472472606|
|        142|       1|     3|female| 22|    0|    0|  7.75|       S|     0.275|0.015126991868717687|
|        176|       0|     3|  male| 18|    1|    1|7.8542|       S|     0.225|0.015330376492345367|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+
only showing top 5 rows



#### One Hot encoding

In [20]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [21]:
"""stringIndexer = StringIndexer(inputCol='Survived', outputCol='Survived_Index')

model = stringIndexer.fit(df_scaled)

indexed = model.transform(df_scaled)

encoder = OneHotEncoder(inputCol='Survived_Index', outputCol='Suvrvived_Vec')

ohe = encoder.fit(indexed)
encoded_df = ohe.transform(indexed)

encoded_df.show(3)"""

"stringIndexer = StringIndexer(inputCol='Survived', outputCol='Survived_Index')\n\nmodel = stringIndexer.fit(df_scaled)\n\nindexed = model.transform(df_scaled)\n\nencoder = OneHotEncoder(inputCol='Survived_Index', outputCol='Suvrvived_Vec')\n\nohe = encoder.fit(indexed)\nencoded_df = ohe.transform(indexed)\n\nencoded_df.show(3)"

In [22]:
def oneHot(df, col):
    global encoded_df
    
    stringIndexer = StringIndexer(inputCol=col, outputCol= col + '_Index')

    model = stringIndexer.fit(df)

    indexed = model.transform(df)

    encoder = OneHotEncoder(inputCol=col + '_Index', outputCol=col + '_Vec')

    ohe = encoder.fit(indexed)
    encoded_df = ohe.transform(indexed)

    encoded_df.show(3)
    
    return encoded_df

In [23]:
oneHot(df_scaled, 'Survived')

+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|scaled_Age|         scaled_Fare|Survived_Index| Survived_Vec|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+
|          3|       1|     3|female| 26|    0|    0| 7.925|       S|     0.325|0.015468569476752568|           1.0|    (1,[],[])|
|         64|       0|     3|  male|  4|    3|    2|  27.9|       S|      0.05|0.054457169982804386|           0.0|(1,[0],[1.0])|
|         77|       0|     3|  male| 29|    0|    0|7.8958|       S|    0.3625| 0.01541157472472606|           0.0|(1,[0],[1.0])|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+
only showing top 3 rows



DataFrame[PassengerId: int, Survived: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Survived_Index: double, Survived_Vec: vector]

In [24]:
oneHot(encoded_df, 'Sex')

+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|scaled_Age|         scaled_Fare|Survived_Index| Survived_Vec|Sex_Index|      Sex_Vec|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+
|          3|       1|     3|female| 26|    0|    0| 7.925|       S|     0.325|0.015468569476752568|           1.0|    (1,[],[])|      1.0|    (1,[],[])|
|         64|       0|     3|  male|  4|    3|    2|  27.9|       S|      0.05|0.054457169982804386|           0.0|(1,[0],[1.0])|      0.0|(1,[0],[1.0])|
|         77|       0|     3|  male| 29|    0|    0|7.8958|       S|    0.3625| 0.01541157472472606|           0.0|(1,[0],[1.0])|      0.0|(1,[0],[1.0])|
+-----------+--------+------+------+---+-----+-----+------+--------+--------

DataFrame[PassengerId: int, Survived: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Survived_Index: double, Survived_Vec: vector, Sex_Index: double, Sex_Vec: vector]

In [25]:
oneHot(encoded_df, 'SibSp')

+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+-----------+-------------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|scaled_Age|         scaled_Fare|Survived_Index| Survived_Vec|Sex_Index|      Sex_Vec|SibSp_Index|    SibSp_Vec|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+-----------+-------------+
|          3|       1|     3|female| 26|    0|    0| 7.925|       S|     0.325|0.015468569476752568|           1.0|    (1,[],[])|      1.0|    (1,[],[])|        0.0|(6,[0],[1.0])|
|         64|       0|     3|  male|  4|    3|    2|  27.9|       S|      0.05|0.054457169982804386|           0.0|(1,[0],[1.0])|      0.0|(1,[0],[1.0])|        4.0|(6,[4],[1.0])|
|         77|       0|     3|  male| 29|    0|    0|7.8958|       S|    0.3625| 0.01541157472472606|

DataFrame[PassengerId: int, Survived: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Survived_Index: double, Survived_Vec: vector, Sex_Index: double, Sex_Vec: vector, SibSp_Index: double, SibSp_Vec: vector]

In [26]:
oneHot(encoded_df, 'Parch')

+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+-----------+-------------+-----------+-------------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|scaled_Age|         scaled_Fare|Survived_Index| Survived_Vec|Sex_Index|      Sex_Vec|SibSp_Index|    SibSp_Vec|Parch_Index|    Parch_Vec|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+-----------+-------------+-----------+-------------+
|          3|       1|     3|female| 26|    0|    0| 7.925|       S|     0.325|0.015468569476752568|           1.0|    (1,[],[])|      1.0|    (1,[],[])|        0.0|(6,[0],[1.0])|        0.0|(6,[0],[1.0])|
|         64|       0|     3|  male|  4|    3|    2|  27.9|       S|      0.05|0.054457169982804386|           0.0|(1,[0],[1.0])|      0.0|(1,[0],[1.0])|        4.0|(6,[4],[1.0

DataFrame[PassengerId: int, Survived: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Survived_Index: double, Survived_Vec: vector, Sex_Index: double, Sex_Vec: vector, SibSp_Index: double, SibSp_Vec: vector, Parch_Index: double, Parch_Vec: vector]

In [27]:
oneHot(encoded_df, 'Embarked')

+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+-----------+-------------+-----------+-------------+--------------+-------------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|scaled_Age|         scaled_Fare|Survived_Index| Survived_Vec|Sex_Index|      Sex_Vec|SibSp_Index|    SibSp_Vec|Parch_Index|    Parch_Vec|Embarked_Index| Embarked_Vec|
+-----------+--------+------+------+---+-----+-----+------+--------+----------+--------------------+--------------+-------------+---------+-------------+-----------+-------------+-----------+-------------+--------------+-------------+
|          3|       1|     3|female| 26|    0|    0| 7.925|       S|     0.325|0.015468569476752568|           1.0|    (1,[],[])|      1.0|    (1,[],[])|        0.0|(6,[0],[1.0])|        0.0|(6,[0],[1.0])|           0.0|(2,[0],[1.0])|
|         64|       0|     3|  male|  4|    3|    2|  27.9| 

DataFrame[PassengerId: int, Survived: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Survived_Index: double, Survived_Vec: vector, Sex_Index: double, Sex_Vec: vector, SibSp_Index: double, SibSp_Vec: vector, Parch_Index: double, Parch_Vec: vector, Embarked_Index: double, Embarked_Vec: vector]

In [28]:
df_final = encoded_df[['PassengerId', 'Survived', 'Survived_vec', 'scaled_Age', 'scaled_Fare', 'Embarked_vec', 'Sex_Vec', 'SibSp_Vec', 'Parch_Vec']]
df_final.show()

+-----------+--------+-------------+----------+--------------------+-------------+-------------+-------------+-------------+
|PassengerId|Survived| Survived_vec|scaled_Age|         scaled_Fare| Embarked_vec|      Sex_Vec|    SibSp_Vec|    Parch_Vec|
+-----------+--------+-------------+----------+--------------------+-------------+-------------+-------------+-------------+
|          3|       1|    (1,[],[])|     0.325|0.015468569476752568|(2,[0],[1.0])|    (1,[],[])|(6,[0],[1.0])|(6,[0],[1.0])|
|         64|       0|(1,[0],[1.0])|      0.05|0.054457169982804386|(2,[0],[1.0])|(1,[0],[1.0])|(6,[4],[1.0])|(6,[2],[1.0])|
|         77|       0|(1,[0],[1.0])|    0.3625| 0.01541157472472606|(2,[0],[1.0])|(1,[0],[1.0])|(6,[0],[1.0])|(6,[0],[1.0])|
|        142|       1|    (1,[],[])|     0.275|0.015126991868717687|(2,[0],[1.0])|    (1,[],[])|(6,[0],[1.0])|(6,[0],[1.0])|
|        176|       0|(1,[0],[1.0])|     0.225|0.015330376492345367|(2,[0],[1.0])|(1,[0],[1.0])|(6,[1],[1.0])|(6,[1],[1.0])|


In [29]:
from pyspark.ml.feature import VectorAssembler

features = ['PassengerId',
  'scaled_Age',
  'scaled_Fare',
  'Embarked_vec',
  'Sex_Vec',
  'SibSp_Vec',
  'Parch_Vec']

assmbler = VectorAssembler(inputCols=features, outputCol='features')
df_final = assmbler.transform(df_final)
df_final.show(5)

+-----------+--------+-------------+----------+--------------------+-------------+-------------+-------------+-------------+--------------------+
|PassengerId|Survived| Survived_vec|scaled_Age|         scaled_Fare| Embarked_vec|      Sex_Vec|    SibSp_Vec|    Parch_Vec|            features|
+-----------+--------+-------------+----------+--------------------+-------------+-------------+-------------+-------------+--------------------+
|          3|       1|    (1,[],[])|     0.325|0.015468569476752568|(2,[0],[1.0])|    (1,[],[])|(6,[0],[1.0])|(6,[0],[1.0])|(18,[0,1,2,3,6,12...|
|         64|       0|(1,[0],[1.0])|      0.05|0.054457169982804386|(2,[0],[1.0])|(1,[0],[1.0])|(6,[4],[1.0])|(6,[2],[1.0])|(18,[0,1,2,3,5,10...|
|         77|       0|(1,[0],[1.0])|    0.3625| 0.01541157472472606|(2,[0],[1.0])|(1,[0],[1.0])|(6,[0],[1.0])|(6,[0],[1.0])|(18,[0,1,2,3,5,6,...|
|        142|       1|    (1,[],[])|     0.275|0.015126991868717687|(2,[0],[1.0])|    (1,[],[])|(6,[0],[1.0])|(6,[0],[1.0])|

#### Train Test split

In [30]:
splits = df_final.randomSplit([0.8, 0.2], 1)

In [31]:
train_df = splits[0]
test_df = splits[1]

print(train_df.count())
print(test_df.count())

729
160


### Build ML model

In [32]:
from pyspark.ml.classification import RandomForestClassifier

In [33]:
train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Survived_vec: vector (nullable = true)
 |-- scaled_Age: double (nullable = true)
 |-- scaled_Fare: double (nullable = true)
 |-- Embarked_vec: vector (nullable = true)
 |-- Sex_Vec: vector (nullable = true)
 |-- SibSp_Vec: vector (nullable = true)
 |-- Parch_Vec: vector (nullable = true)
 |-- features: vector (nullable = true)



In [34]:
rf = RandomForestClassifier(featuresCol='features',
                            labelCol='Survived') 

rfModel = rf.fit(train_df)

In [35]:
prediction = rfModel.transform(test_df)

In [36]:
prediction.select('PassengerId', 'Survived').show(10)

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|          5|       0|
|         14|       0|
|         15|       0|
|         16|       1|
|         18|       1|
|         30|       0|
|         33|       1|
|         34|       0|
|         43|       0|
|         45|       1|
+-----------+--------+
only showing top 10 rows



In [37]:
prediction.select('PassengerId', 'Survived', 'prediction').show()

+-----------+--------+----------+
|PassengerId|Survived|prediction|
+-----------+--------+----------+
|          5|       0|       0.0|
|         14|       0|       0.0|
|         15|       0|       1.0|
|         16|       1|       1.0|
|         18|       1|       0.0|
|         30|       0|       0.0|
|         33|       1|       1.0|
|         34|       0|       0.0|
|         43|       0|       0.0|
|         45|       1|       1.0|
|         52|       0|       0.0|
|         55|       0|       0.0|
|         59|       1|       1.0|
|         67|       1|       1.0|
|         76|       0|       0.0|
|         77|       0|       0.0|
|         85|       1|       1.0|
|         86|       1|       1.0|
|         90|       0|       0.0|
|         94|       0|       0.0|
+-----------+--------+----------+
only showing top 20 rows



### Model Evaluation

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [39]:
evaluator = MulticlassClassificationEvaluator(labelCol='Survived',  predictionCol='prediction')

accuracy = evaluator.evaluate(prediction)

print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.8009716599190284
Test Error = 0.1990283400809716


### Save Model

In [52]:
rfModel.save('rfm_titanic_model')

### Load and Predict on new data

In [3]:
from pyspark.ml.classification import RandomForestClassificationModel

model = RandomForestClassificationModel.load('rfm_titanic_model')

In [4]:
new_df = sc.read.csv('titanic_test.csv', header=True)
new_df.show()

+-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0|          330911| 7.8292| null|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|  47|    1|    0|          363272|      7| null|       S|
|        894|     2|Myles, Mr. Thomas...|  male|  62|    0|    0|          240276| 9.6875| null|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|  27|    0|    0|          315154| 8.6625| null|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|  22|    1|    1|         3101298|12.2875| null|       S|
|        897|     3|Svensson, Mr. Joh...|  male|  14|    0|    0|            7538|  9.225| null|       S|
|        898|     3|Connolly, Miss. Kate|femal

In [5]:
new_df = new_df.drop('Name')
new_df = new_df.drop('Ticket')
new_df = new_df.drop('Cabin')
new_df.show(5)

+-----------+------+------+----+-----+-----+-------+--------+
|PassengerId|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+-----------+------+------+----+-----+-----+-------+--------+
|        892|     3|  male|34.5|    0|    0| 7.8292|       Q|
|        893|     3|female|  47|    1|    0|      7|       S|
|        894|     2|  male|  62|    0|    0| 9.6875|       Q|
|        895|     3|  male|  27|    0|    0| 8.6625|       S|
|        896|     3|female|  22|    1|    1|12.2875|       S|
+-----------+------+------+----+-----+-----+-------+--------+
only showing top 5 rows



In [6]:
new_df = new_df.withColumn('Age', new_df['Age'].cast(IntegerType()))
new_df = new_df.withColumn('PassengerId', new_df['PassengerId'].cast(IntegerType()))
new_df = new_df.withColumn('Fare', new_df['Fare'].cast(FloatType()))

new_df.dtypes

[('PassengerId', 'int'),
 ('Pclass', 'string'),
 ('Sex', 'string'),
 ('Age', 'int'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Fare', 'float'),
 ('Embarked', 'string')]

In [7]:
df_2 = model.transform(new_df)

df_2.show(10)

IllegalArgumentException: features does not exist. Available: PassengerId, Pclass, Sex, Age, SibSp, Parch, Fare, Embarked

In [13]:
cols = ['PassengerId', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
missing_val(new_df, cols)

[{'PassengerId': 0},
 {'Pclass': 0},
 {'Sex': 0},
 {'Age': 86},
 {'SibSp': 0},
 {'Parch': 0},
 {'Fare': 1},
 {'Embarked': 0}]

In [14]:
col_mean = new_df.agg({'Age': 'mean'}).collect()[0][0]
new_df = new_df.fillna(col_mean, subset=['Age'])

In [15]:
cols = ['PassengerId', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
missing_val(new_df, cols)

[{'PassengerId': 0},
 {'Pclass': 0},
 {'Sex': 0},
 {'Age': 0},
 {'SibSp': 0},
 {'Parch': 0},
 {'Fare': 1},
 {'Embarked': 0}]

In [16]:
minmaxScale(new_df, 'Age')

+-----------+------+------+---+-----+-----+-------+--------+-------------------+
|PassengerId|Pclass|   Sex|Age|SibSp|Parch|   Fare|Embarked|         scaled_Age|
+-----------+------+------+---+-----+-----+-------+--------+-------------------+
|        892|     3|  male| 34|    0|    0| 7.8292|       Q| 0.4473684210526316|
|        893|     3|female| 47|    1|    0|    7.0|       S|  0.618421052631579|
|        894|     2|  male| 62|    0|    0| 9.6875|       Q| 0.8157894736842105|
|        895|     3|  male| 27|    0|    0| 8.6625|       S|0.35526315789473684|
|        896|     3|female| 22|    1|    1|12.2875|       S| 0.2894736842105263|
+-----------+------+------+---+-----+-----+-------+--------+-------------------+
only showing top 5 rows



In [17]:
minmaxScale(df_scaled, 'Fare')

+-----------+------+------+---+-----+-----+-------+--------+-------------------+--------------------+
|PassengerId|Pclass|   Sex|Age|SibSp|Parch|   Fare|Embarked|         scaled_Age|         scaled_Fare|
+-----------+------+------+---+-----+-----+-------+--------+-------------------+--------------------+
|        892|     3|  male| 34|    0|    0| 7.8292|       Q| 0.4473684210526316|0.015281579558236941|
|        893|     3|female| 47|    1|    0|    7.0|       S|  0.618421052631579|0.013663089429809523|
|        894|     2|  male| 62|    0|    0| 9.6875|       Q| 0.8157894736842105| 0.01890873983589711|
|        895|     3|  male| 27|    0|    0| 8.6625|       S|0.35526315789473684| 0.01690807391396857|
|        896|     3|female| 22|    1|    1|12.2875|       S| 0.2894736842105263| 0.02398360236869136|
+-----------+------+------+---+-----+-----+-------+--------+-------------------+--------------------+
only showing top 5 rows



In [20]:
oneHot(df_scaled, 'Sex')

+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+
|PassengerId|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|        scaled_Age|         scaled_Fare|Sex_Index|      Sex_Vec|
+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+
|        892|     3|  male| 34|    0|    0|7.8292|       Q|0.4473684210526316|0.015281579558236941|      0.0|(1,[0],[1.0])|
|        893|     3|female| 47|    1|    0|   7.0|       S| 0.618421052631579|0.013663089429809523|      1.0|    (1,[],[])|
|        894|     2|  male| 62|    0|    0|9.6875|       Q|0.8157894736842105| 0.01890873983589711|      0.0|(1,[0],[1.0])|
+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+
only showing top 3 rows



DataFrame[PassengerId: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Sex_Index: double, Sex_Vec: vector]

In [21]:
oneHot(encoded_df, 'SibSp')

+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+-----------+-------------+
|PassengerId|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|        scaled_Age|         scaled_Fare|Sex_Index|      Sex_Vec|SibSp_Index|    SibSp_Vec|
+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+-----------+-------------+
|        892|     3|  male| 34|    0|    0|7.8292|       Q|0.4473684210526316|0.015281579558236941|      0.0|(1,[0],[1.0])|        0.0|(6,[0],[1.0])|
|        893|     3|female| 47|    1|    0|   7.0|       S| 0.618421052631579|0.013663089429809523|      1.0|    (1,[],[])|        1.0|(6,[1],[1.0])|
|        894|     2|  male| 62|    0|    0|9.6875|       Q|0.8157894736842105| 0.01890873983589711|      0.0|(1,[0],[1.0])|        0.0|(6,[0],[1.0])|
+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+-

DataFrame[PassengerId: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Sex_Index: double, Sex_Vec: vector, SibSp_Index: double, SibSp_Vec: vector]

In [22]:
oneHot(encoded_df, 'Parch')

+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+-----------+-------------+-----------+-------------+
|PassengerId|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|        scaled_Age|         scaled_Fare|Sex_Index|      Sex_Vec|SibSp_Index|    SibSp_Vec|Parch_Index|    Parch_Vec|
+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+-----------+-------------+-----------+-------------+
|        892|     3|  male| 34|    0|    0|7.8292|       Q|0.4473684210526316|0.015281579558236941|      0.0|(1,[0],[1.0])|        0.0|(6,[0],[1.0])|        0.0|(7,[0],[1.0])|
|        893|     3|female| 47|    1|    0|   7.0|       S| 0.618421052631579|0.013663089429809523|      1.0|    (1,[],[])|        1.0|(6,[1],[1.0])|        0.0|(7,[0],[1.0])|
|        894|     2|  male| 62|    0|    0|9.6875|       Q|0.8157894736842105| 0.01890873983589711|      0.0|(1,[0],[1.0

DataFrame[PassengerId: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Sex_Index: double, Sex_Vec: vector, SibSp_Index: double, SibSp_Vec: vector, Parch_Index: double, Parch_Vec: vector]

In [23]:
oneHot(encoded_df, 'Embarked')

+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+-----------+-------------+-----------+-------------+--------------+-------------+
|PassengerId|Pclass|   Sex|Age|SibSp|Parch|  Fare|Embarked|        scaled_Age|         scaled_Fare|Sex_Index|      Sex_Vec|SibSp_Index|    SibSp_Vec|Parch_Index|    Parch_Vec|Embarked_Index| Embarked_Vec|
+-----------+------+------+---+-----+-----+------+--------+------------------+--------------------+---------+-------------+-----------+-------------+-----------+-------------+--------------+-------------+
|        892|     3|  male| 34|    0|    0|7.8292|       Q|0.4473684210526316|0.015281579558236941|      0.0|(1,[0],[1.0])|        0.0|(6,[0],[1.0])|        0.0|(7,[0],[1.0])|           2.0|    (2,[],[])|
|        893|     3|female| 47|    1|    0|   7.0|       S| 0.618421052631579|0.013663089429809523|      1.0|    (1,[],[])|        1.0|(6,[1],[1.0])|        0.0|(7,[0],[1.0])|     

DataFrame[PassengerId: int, Pclass: string, Sex: string, Age: int, SibSp: string, Parch: string, Fare: float, Embarked: string, scaled_Age: double, scaled_Fare: double, Sex_Index: double, Sex_Vec: vector, SibSp_Index: double, SibSp_Vec: vector, Parch_Index: double, Parch_Vec: vector, Embarked_Index: double, Embarked_Vec: vector]

In [25]:
new_df_final = encoded_df[['PassengerId', 'scaled_Age', 'scaled_Fare', 'Embarked_vec', 'Sex_Vec', 'SibSp_Vec', 'Parch_Vec']]
new_df_final.show()

+-----------+-------------------+--------------------+-------------+-------------+-------------+-------------+
|PassengerId|         scaled_Age|         scaled_Fare| Embarked_vec|      Sex_Vec|    SibSp_Vec|    Parch_Vec|
+-----------+-------------------+--------------------+-------------+-------------+-------------+-------------+
|        892| 0.4473684210526316|0.015281579558236941|    (2,[],[])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|
|        893|  0.618421052631579|0.013663089429809523|(2,[0],[1.0])|    (1,[],[])|(6,[1],[1.0])|(7,[0],[1.0])|
|        894| 0.8157894736842105| 0.01890873983589711|    (2,[],[])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|
|        895|0.35526315789473684| 0.01690807391396857|(2,[0],[1.0])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|
|        896| 0.2894736842105263| 0.02398360236869136|(2,[0],[1.0])|    (1,[],[])|(6,[1],[1.0])|(7,[1],[1.0])|
|        897|0.18421052631578946|0.018006000743149692|(2,[0],[1.0])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|
|

In [28]:
from pyspark.ml.feature import VectorAssembler

features = ['PassengerId',
  'scaled_Age',
  'scaled_Fare',
  'Embarked_vec',
  'Sex_Vec',
  'SibSp_Vec',
  'Parch_Vec']

assmbler = VectorAssembler(inputCols=features, outputCol='features')
new_df_final = assmbler.transform(new_df_final)
new_df_final.show(5)

+-----------+-------------------+--------------------+-------------+-------------+-------------+-------------+--------------------+
|PassengerId|         scaled_Age|         scaled_Fare| Embarked_vec|      Sex_Vec|    SibSp_Vec|    Parch_Vec|            features|
+-----------+-------------------+--------------------+-------------+-------------+-------------+-------------+--------------------+
|        892| 0.4473684210526316|0.015281579558236941|    (2,[],[])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|(19,[0,1,2,5,6,12...|
|        893|  0.618421052631579|0.013663089429809523|(2,[0],[1.0])|    (1,[],[])|(6,[1],[1.0])|(7,[0],[1.0])|(19,[0,1,2,3,7,12...|
|        894| 0.8157894736842105| 0.01890873983589711|    (2,[],[])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|(19,[0,1,2,5,6,12...|
|        895|0.35526315789473684| 0.01690807391396857|(2,[0],[1.0])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|(19,[0,1,2,3,5,6,...|
|        896| 0.2894736842105263| 0.02398360236869136|(2,[0],[1.0])|    (1,[

In [29]:
df_2 = model.transform(new_df_final)

df_2.show(10)

+-----------+-------------------+--------------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|PassengerId|         scaled_Age|         scaled_Fare| Embarked_vec|      Sex_Vec|    SibSp_Vec|    Parch_Vec|            features|       rawPrediction|         probability|prediction|
+-----------+-------------------+--------------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|        892| 0.4473684210526316|0.015281579558236941|    (2,[],[])|(1,[0],[1.0])|(6,[0],[1.0])|(7,[0],[1.0])|(19,[0,1,2,5,6,12...|[17.7215195753915...|[0.88607597876957...|       0.0|
|        893|  0.618421052631579|0.013663089429809523|(2,[0],[1.0])|    (1,[],[])|(6,[1],[1.0])|(7,[0],[1.0])|(19,[0,1,2,3,7,12...|[10.9763975913223...|[0.54881987956611...|       0.0|
|        894| 0.8157894736842105| 0.01890873983589711|    (2,[],[])|(1,[0],

In [30]:
df_predicted = df_2['PassengerId', 'prediction']

df_predicted.show()

+-----------+----------+
|PassengerId|prediction|
+-----------+----------+
|        892|       0.0|
|        893|       0.0|
|        894|       0.0|
|        895|       0.0|
|        896|       1.0|
|        897|       0.0|
|        898|       1.0|
|        899|       0.0|
|        900|       1.0|
|        901|       0.0|
|        902|       0.0|
|        903|       0.0|
|        904|       1.0|
|        905|       0.0|
|        906|       1.0|
|        907|       1.0|
|        908|       0.0|
|        909|       0.0|
|        910|       0.0|
|        911|       1.0|
+-----------+----------+
only showing top 20 rows



In [37]:
df_predicted = df_predicted.withColumn('Survived', df_predicted['prediction'].cast(IntegerType()))

df_predicted.dtypes

[('PassengerId', 'int'), ('prediction', 'int'), ('Survived', 'int')]

In [38]:
df_predicted.show(10)

+-----------+----------+--------+
|PassengerId|prediction|Survived|
+-----------+----------+--------+
|        892|         0|       0|
|        893|         0|       0|
|        894|         0|       0|
|        895|         0|       0|
|        896|         1|       1|
|        897|         0|       0|
|        898|         1|       1|
|        899|         0|       0|
|        900|         1|       1|
|        901|         0|       0|
+-----------+----------+--------+
only showing top 10 rows



In [40]:
df_predicted.select('PassengerId', 'Survived').write.save('submission_pyspark.csv')

AnalysisException: path file:/home/chamath/Documents/PyProjects/Exercises/PySpark exercises/submission_pyspark.csv already exists.

In [40]:
sc.stop()