In [0]:
!pip install pyspark

In [0]:
import pyspark
pyspark.__version__

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Titanic Data') \
    .getOrCreate()

In [0]:
spark

In [0]:
df = (spark.read.format("csv").option("header", "true").load("/FileStore/tables/train__2_.csv")) ##Just read the spark dataframes

In [0]:
df.show(5)

In [0]:
df.toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
...,...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27,0,0,211536,13,,S
887,888,1,1,"Graham, Miss. Margaret Edith",female,19,0,0,112053,30,B42,S
888,889,0,3,"""Johnston, Miss. Catherine Helen """"Carrie""""""",female,,1,2,W./C. 6607,23.45,,S
889,890,1,1,"Behr, Mr. Karl Howell",male,26,0,0,111369,30,C148,C


In [0]:
df.head(10)

In [0]:
df.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,99.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,9.0,8.0,6.0,WE/P 5735,93.5,T,S


In [0]:
df.dtypes

In [0]:
df.count()

In [0]:
df.columns

In [0]:
from pyspark.sql.functions import col

dataset = df.select(col('Survived').cast('float'),
                         col('Pclass').cast('float'),
                         col('Sex'),
                         col('Age').cast('float'),
                         col('Fare').cast('float'),
                         col('Embarked')
                        )
dataset.show()

In [0]:
from pyspark.sql.functions import isnull, when, count, col  ##This will get you all the null values present in the columns
dataset.select([count(when(isnull(c), c)).alias(c) for c in dataset.columns]).show()

In [0]:
dataset = dataset.replace('?', None)\
        .dropna(how='any')  ##Using this we are eliminating the null values present in the columns

In [0]:
##Now, the Spark ML library only works with numeric data. But we still want to use the Sex and the Embarked column. For that, we will need to encode them. To do it let’s use something called the StringIndexer:

##So for encoding the values we are now using thed string indexer

## It is similar to labelEncoder or One hot in Sklearn

from pyspark.ml.feature import StringIndexer


dataset = StringIndexer(
    inputCol='Sex', 
    outputCol='Gender', 
    handleInvalid='keep').fit(dataset).transform(dataset)

dataset = StringIndexer(
    inputCol='Embarked', 
    outputCol='Boarded', 
    handleInvalid='keep').fit(dataset).transform(dataset)

dataset.show()

In [0]:
dataset.dtypes

In [0]:
## Now we droping some less imp columns

dataset = dataset.drop('Sex')
dataset = dataset.drop('Embarked')
dataset.show()

In [0]:
##Now we need to smash all the features in to one vector and then send it to ml model

required_features = ['Pclass',
                    'Age',
                    'Fare',
                    'Gender',
                    'Boarded'
                   ]

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=required_features, outputCol='features')

transformed_data = assembler.transform(dataset)

In [0]:
transformed_data.show()

In [0]:
##We will now use Random forest 

(training_data, test_data) = transformed_data.randomSplit([0.8,0.2]) ##Spliting the data

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol='Survived', 
                            featuresCol='features',
                            maxDepth=5)

In [0]:
model = rf.fit(training_data)

In [0]:
predictions = model.transform(test_data)

In [0]:
# Evaluate the columns
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='Survived', 
    predictionCol='prediction', 
    metricName='accuracy')

In [0]:
accuracy = evaluator.evaluate(predictions)
print('Testing Accuracy = ', accuracy) ##Testing accuracy 