In [39]:
import os

import findspark

# Locate the Spark installation: honour SPARK_HOME when it is set so the notebook
# runs on other machines, and fall back to the path it was originally written against.
SPARK_HOME = os.environ.get('SPARK_HOME', '/usr/local/spark/')
findspark.init(SPARK_HOME)

In [40]:
import findspark
from pyspark.sql import SparkSession

In [92]:
def initialize_Spark(master="local[*]", app_name="Titanic ETL Job"):
    """Create (or reuse, via ``getOrCreate``) the SparkSession for this notebook.

    Parameters
    ----------
    master : str, optional
        Spark master URL; defaults to ``"local[*]"`` as originally hard-coded.
    app_name : str, optional
        Application name; defaults to ``"Titanic ETL Job"``.

    Returns
    -------
    SparkSession
        The session returned by ``SparkSession.builder...getOrCreate()``.
    """
    spark = (
        SparkSession.builder
        .master(master)
        .appName(app_name)
        .getOrCreate()
    )

    return spark

In [93]:
from pyspark.sql.types import *
import pyspark.sql.functions as f

In [94]:
def loadData(spark, path='titanic.csv'):
    """Read a CSV file into a Spark DataFrame.

    Parameters
    ----------
    spark : SparkSession
        Active session whose ``read.csv`` reader is used.
    path : str, optional
        Location of the CSV file; defaults to ``'titanic.csv'`` (relative to the
        working directory), matching the original hard-coded path.

    Returns
    -------
    pyspark.sql.DataFrame
        The CSV contents, with the first row used as column names
        (``header=True``) and column types inferred (``inferSchema=True``).
    """
    df = spark.read.csv(path, header=True, inferSchema=True)

    print("Data loaded into PySpark", "\n")

    return df

In [95]:
# Start (or attach to, via getOrCreate) the local Spark session used by the rest of the notebook.
spark = initialize_Spark()

In [96]:
# Read the Titanic CSV into a Spark DataFrame.
data = loadData(spark=spark)

Data loaded into PySpark 



In [97]:
# (row count, column count) of the raw DataFrame.
data_shape = (data.count(), len(data.columns))
print(*data_shape)

891 12


In [98]:
# Show the column names and the types Spark inferred from the CSV (inferSchema=True in loadData).
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [99]:
# Peek at the first two rows.
data.show(n=2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
only showing top 2 rows



In [100]:
# impute missing values

In [101]:
from pyspark.sql.functions import isnan, when, count, col

In [102]:
# Count NaN entries per column (in Spark, NaN is distinct from null).
nan_count_exprs = []
for column_name in data.columns:
    nan_count_exprs.append(count(when(isnan(column_name), column_name)).alias(column_name))
data.select(nan_count_exprs).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|    0|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [103]:
# Count nulls per column: when() yields null for non-null rows, and count() skips nulls.
null_count_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in data.columns]
data.select(null_count_exprs).toPandas().T

Unnamed: 0,0
PassengerId,0
Survived,0
Pclass,0
Name,0
Sex,0
Age,177
SibSp,0
Parch,0
Ticket,0
Fare,0


In [104]:
# Mean of the non-null ages; used below to fill the missing Age values.
mean_age = data.agg(f.mean('age')).first()[0]

In [105]:
# Display the mean age that the next cell uses to fill missing Age values.
mean_age

29.69911764705882

In [106]:
# Impute missing Age values with the mean age computed above.
# Target the column by its schema name 'Age': the earlier withColumn('age', ...)
# only worked through Spark's case-insensitive column resolution and renamed
# the column to lowercase 'age' as a side effect.
# fillna also replaces NaN in numeric columns; the NaN check above found none in Age.
if mean_age is not None:  # None only if every Age was null -- nothing to impute with
    data = data.fillna({'Age': mean_age})

In [107]:
# Re-check nulls per column after the Age imputation.
null_counts_pdf = data.select(
    [count(when(col(field).isNull(), field)).alias(field) for field in data.columns]
).toPandas()
null_counts_pdf.transpose()

Unnamed: 0,0
PassengerId,0
Survived,0
Pclass,0
Name,0
Sex,0
age,0
SibSp,0
Parch,0
Ticket,0
Fare,0


In [109]:
# Frequency of each embarkation port (including null), least common first.
embarked_counts = data.groupBy('Embarked').count()
embarked_counts.orderBy('count').show()

+--------+-----+
|Embarked|count|
+--------+-----+
|    null|    2|
|       Q|   77|
|       C|  168|
|       S|  644|
+--------+-----+



In [110]:
# Impute missing Embarked values with the most frequent port (the mode),
# computed from the data instead of hard-coding 'S'. Ties are broken by port
# code so the choice is deterministic.
embarked_mode_row = (
    data.where(f.col('Embarked').isNotNull())
    .groupBy('Embarked')
    .count()
    .orderBy(f.desc('count'), f.asc('Embarked'))
    .first()
)
# first() returns None when every Embarked value is null; then there is no mode to use.
if embarked_mode_row is not None:
    data = data.fillna({'Embarked': embarked_mode_row['Embarked']})

In [111]:
# Re-check nulls per column after the Embarked imputation.
per_column_nulls = [
    count(when(col(column).isNull(), column)).alias(column)
    for column in data.columns
]
data.select(per_column_nulls).toPandas().T

Unnamed: 0,0
PassengerId,0
Survived,0
Pclass,0
Name,0
Sex,0
age,0
SibSp,0
Parch,0
Ticket,0
Fare,0


In [112]:
# Drop the Cabin column

In [113]:
# Remove the Cabin column from the working DataFrame.
columns_to_drop = ['Cabin']
data = data.drop(*columns_to_drop)