# A Data Preparation Experiment (An Altered Titanic DataSet)
In this experiment we see the importance of the data preparation phase of the CRISP-DM standard.
<div>
<img src="https://upload.wikimedia.org/wikipedia/commons/b/b9/CRISP-DM_Process_Diagram.png" width="500"/>
</div>
Image Source: www.wikimedia.org

In [0]:
from pyspark.sql.functions import rand, randn

# Build a DataFrame with a single integer column ("id") holding 0..8, i.e. 9 rows.
dataframe1 = sqlContext.range(start=0, end=9)
dataframe1.show()

In [0]:
import pandas as pd

# Three columns: the row "id", a uniformly distributed sample and a normally
# distributed sample. Both generators are seeded so the values are reproducible.
uniform_column = rand(seed=110).alias("uniform")
normal_column = randn(seed=220).alias("normal")
dataframe2 = sqlContext.range(0, 9).select("id", uniform_column, normal_column)

# Reminder: a Spark SQL DataFrame can be converted to a pandas DataFrame
# whenever a function from that library is needed.
datapandas = dataframe2.toPandas()
dataframe2.show()

In [0]:
# Summary statistics of the two random columns.
random_columns = ['uniform', 'normal']
dataframe2.describe(*random_columns).show()

In [0]:
from pyspark.sql import SparkSession

# Reuse the running Spark session if there is one, otherwise start it.
session_builder = SparkSession.builder.appName('myproj')
spark = session_builder.getOrCreate()

# Read the altered Titanic CSV: the first line is the header and the column
# types are inferred from the values.
data = spark.read.csv(
    '/FileStore/tables/titanic_alt.csv',
    header=True,
    inferSchema=True,
)
data.printSchema()

## Data Dictionary
- survival:	Survival	(0 = No, 1 = Yes)
- pclass:	Ticket class	(1 = 1st, 2 = 2nd, 3 = 3rd)
- sex:	Sex	
- Age:	Age in years	
- sibsp:	# of siblings / spouses aboard the Titanic	
- parch:	# of parents / children aboard the Titanic	
- ticket:	Ticket number	
- fare:	Passenger fare	
- cabin:	Cabin number	
- city:	original city of passengers (B = Beijing, D = Delhi, C = Cherbourg, Q = Queenstown, S = Southampton, L = London)
- bmi:	bmi of passenger

Also, unknown values are encoded as 888 in the observations.

### The relevant columns (variables) are:
survival, Pclass, sex, Age, sibsp, parch, fare, city

In [0]:
# Print the schema of the loaded data again, next to the data dictionary, for comparison.
data.printSchema()

In [0]:
# Optional exercise: declare the schema explicitly instead of relying on inference.
from pyspark.sql.types import StructField,StringType,IntegerType,DoubleType,StructType

# (column name, Spark type) pairs; presumably listed in the same order as the
# columns of `data` - confirm against the printed schema.
column_types = [
    ('PassengerId', StringType()),
    ('Survived', IntegerType()),
    ('Pclass', StringType()),
    ('Sex', StringType()),
    ('Age', DoubleType()),
    ('SibSp', IntegerType()),
    ('Parch', IntegerType()),
    ('Ticket', StringType()),
    ('Fare', DoubleType()),
    ('Cabin', StringType()),
    ('city', StringType()),
    ('bmi', IntegerType()),
]
# The third StructField argument (nullable=True) allows null cells in the column.
df_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in column_types]
)

# collect() brings every row to the driver; the rows are then turned back into
# a DataFrame that follows df_schema.
collected_rows = data.collect()
data = spark.createDataFrame(collected_rows, schema=df_schema)
data.printSchema()

In [0]:
# Look at how numerical values are rendered inside the string-typed columns
# (this matters for the replacement of the 888 marker in the next cell).
data.show()

In [0]:
# NOTE(review): the wildcard import provides isnull (used below) but also brings
# in names such as round/sum/min/max that shadow the Python built-ins.
from pyspark.sql.functions import *
from pyspark.sql.functions import when, count, col

# 888 marks an unknown value: turn it into null in the numeric columns ...
data = data.replace(888, None)
# ... and in the string columns, where the number is rendered as "888.0".
data = data.replace("888.0", None)

# Keep only the relevant variables.
relevant_columns = ['Survived',  'Pclass',  'Sex',  'Age',  'SibSp',  'Parch', 'Fare', 'city', 'bmi']
df = data.select(relevant_columns)

# One-row report with the number of null cells per column. when() receives the
# column name as a literal, so the expression is non-null only for null cells,
# and count() counts exactly those.
null_counters = [count(when(isnull(name), name)).alias(name) for name in df.columns]
na_report = df.select(null_counters)

# Dimensions of the dataframe.
print("Number of Rows: ",df.count() ,"   Number of Columns: ", len(df.columns))

# Missing values should be imputed unless too many cells of a column are empty.
# Dropping rows with na.drop() can bias the data, so it is not recommended,
# except for rows where the dependent variable is missing.
na_report.show()

In [0]:
# Checking emptiness proportion in each column
import pyspark.sql.functions as f

# df.count() launches a Spark job, so run it once instead of once per column.
total_rows = df.count()

# Derive the proportions directly from df so the cell is idempotent: dividing
# na_report in place would shrink the values again on every re-run.
# f.round is referenced through the module alias so the Python built-in round
# can never be picked up by accident.
na_report = df.select([
    f.round(f.count(f.when(f.isnull(c), c)) / total_rows, 4).alias(c)
    for c in df.columns
])
na_report.show()

In [0]:
# The bmi column has too many empty values to be imputed sensibly, so it is dropped.
columns_to_drop = ['bmi']
df = df.drop(*columns_to_drop)
df.show(n=10)

In [0]:
from sklearn.impute import SimpleImputer
import pandas as pd

# For statistical imputation the PySpark DataFrame is converted to pandas so
# that the imputer from scikit-learn can be applied.
# copy(deep=True) creates a new, independent copy of the data; deep=False would
# only create a reference to it. deep=True is the pandas default.
misswork = df.toPandas().copy(deep=True)

# Replace the missing ages with the median age. The column is addressed by its
# name rather than by position, so the step does not depend on column order.
median_imputer = SimpleImputer(strategy='median')
misswork[['Age']] = median_imputer.fit_transform(misswork[['Age']])

In [0]:
# For the categorical variables the mode is used for imputation. The columns
# are addressed by name rather than by position.
categorical_columns = ['Pclass', 'Sex', 'city']

# mode() returns one row per tied most-frequent category; .iloc[0] keeps the
# first row as a Series (column name -> mode) that fillna can apply per column.
column_modes = misswork[categorical_columns].mode().iloc[0]
misswork[categorical_columns] = misswork[categorical_columns].fillna(column_modes)

# Rows without the dependent variable are dropped rather than imputed.
# Alternatively, convert the dataframe to Spark first and use na.drop().
misswork=misswork.dropna(subset=['Survived'])
df=spark.createDataFrame(misswork)

# Verify that no null cells are left.
# NOTE(review): isnull does not match NaN; if pandas NaN values survive the
# conversion in numeric columns they would not be counted here - confirm.
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

In [0]:
# Descriptive statistics of the numerical columns.
# Check whether any of the values look suspicious.
numeric_columns = ['Age', 'SibSp', 'Parch', 'Fare']
df.select(*numeric_columns).describe().show()

# Which issues can you see in the output above?

In [0]:
# Keep only the rows whose age is strictly positive.
has_positive_age = df['Age'] > 0
df = df.filter(has_positive_age)

What do you think about the negative ages, the number of parents/children (parents might have been too busy to survive; this is subjective), and the fares?

In [0]:
# First and third quartile of every numeric column; a relative error of 0
# asks approxQuantile for exact quantiles.
quantiles = {}
for column_name in ["Age", "SibSp","Parch","Fare"]:
    quartile_values = df.approxQuantile(column_name, [0.25, 0.75], 0)
    quantiles[column_name] = dict(zip(["q1", "q3"], quartile_values))
quantiles

In [0]:
# Add the outlier fences to every column entry: values further than 1.5 times
# the interquartile range below q1 or above q3 fall outside the bounds.
for column_stats in quantiles.values():
    iqr = column_stats['q3'] - column_stats['q1']
    column_stats.update({
        'lower_bound': column_stats['q1'] - (iqr * 1.5),
        'upper_bound': column_stats['q3'] + (iqr * 1.5),
    })
print(quantiles)

In [0]:
import pyspark.sql.functions as f

# One "<column>_out" flag per numeric column: 0 when the value lies inside the
# bounds computed above, 1 otherwise.
outlier_flags = []
for c in ["Age", "SibSp","Parch","Fare"]:
    bounds = quantiles[c]
    inside_bounds = f.col(c).between(bounds['lower_bound'], bounds['upper_bound'])
    outlier_flags.append(f.when(inside_bounds, 0).otherwise(1).alias(c+"_out"))

# Keep every original column and append the flags.
df_clean = df.select("*", *outlier_flags)
df_clean.show(10)

In [0]:
from pyspark.sql.functions import col

# Number of numeric columns in which the row was flagged as an outlier.
flag_total = col("Age_out") + col("SibSp_out") + col("Parch_out") + col("Fare_out")
df_clean = df_clean.withColumn("outliers", flag_total)
df_clean.show()


In [0]:
# Dropping outliers: keep only the rows that were not flagged in any column,
# then go back to the original set of variables (the helper flags are removed).
df_clean = df_clean.filter((df_clean.outliers == 0) )
df_clean=df_clean.select(["Survived","Pclass", "Sex", "Age","SibSp","Parch", "Fare","city"])

# Describe the outlier-free frame so the effect of the filter is visible.
df_clean.select('Age','SibSp','Parch','Fare').describe().show()

In [0]:
import numpy as np

# Share of the rows that disappeared when the outliers were dropped.
rows_before = df.count()
rows_after = df_clean.count()
lost_share = (rows_before - rows_after) / rows_before
print("proportion of the lost Rows: ",np.round(lost_share,4))

We lost a large proportion of the rows; you might want to go back and change some steps!

In [0]:
# Expose the cleaned frame to SQL under the name "dataclean".
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and can be re-run safely because it replaces an existing view.
df_clean.createOrReplaceTempView("dataclean")
display(sqlContext.sql("select * from dataclean"))

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,city
0.0,3.0,male,22.0,1,0,7.25,S
1.0,3.0,female,26.0,0,0,7.925,S
1.0,1.0,female,35.0,1,0,53.1,S
0.0,3.0,male,35.0,0,0,8.05,S
0.0,3.0,male,28.0,0,0,8.4583,Q
0.0,1.0,male,54.0,0,0,51.8625,S
1.0,2.0,female,14.0,1,0,30.0708,S
0.0,3.0,male,20.0,0,0,8.05,S
0.0,3.0,female,14.0,0,0,7.8542,S
1.0,3.0,male,28.0,0,0,13.0,S


In [0]:
# Distribution of cities in the database.
# NOTE(review): the query returns every row; the per-city aggregation is
# presumably configured in the chart options of display() - confirm.
dataclean_rows = sqlContext.sql("select * from dataclean")
display(dataclean_rows)

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,city
0.0,3.0,male,22.0,1,0,7.25,S
1.0,3.0,female,26.0,0,0,7.925,S
1.0,1.0,female,35.0,1,0,53.1,S
0.0,3.0,male,35.0,0,0,8.05,S
0.0,3.0,male,28.0,0,0,8.4583,Q
0.0,1.0,male,54.0,0,0,51.8625,S
1.0,2.0,female,14.0,1,0,30.0708,S
0.0,3.0,male,20.0,0,0,8.05,S
0.0,3.0,female,14.0,0,0,7.8542,S
1.0,3.0,male,28.0,0,0,13.0,S


In [0]:
# Distribution of cities in the database for each category of survival.
# NOTE(review): the query returns every row; the grouping is presumably
# configured in the chart options of display() - confirm.
dataclean_rows = sqlContext.sql("select * from dataclean")
display(dataclean_rows)

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,city
0.0,3.0,male,22.0,1,0,7.25,S
1.0,3.0,female,26.0,0,0,7.925,S
1.0,1.0,female,35.0,1,0,53.1,S
0.0,3.0,male,35.0,0,0,8.05,S
0.0,3.0,male,28.0,0,0,8.4583,Q
0.0,1.0,male,54.0,0,0,51.8625,S
1.0,2.0,female,14.0,1,0,30.0708,S
0.0,3.0,male,20.0,0,0,8.05,S
0.0,3.0,female,14.0,0,0,7.8542,S
1.0,3.0,male,28.0,0,0,13.0,S


In [0]:
# Show the cleaned data; the chart for this cell is presumably configured in
# the display() widget - confirm.
dataclean_rows = sqlContext.sql("select * from dataclean")
display(dataclean_rows)

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,city
0.0,3.0,male,22.0,1,0,7.25,S
1.0,3.0,female,26.0,0,0,7.925,S
1.0,1.0,female,35.0,1,0,53.1,S
0.0,3.0,male,35.0,0,0,8.05,S
0.0,3.0,male,28.0,0,0,8.4583,Q
0.0,1.0,male,54.0,0,0,51.8625,S
1.0,2.0,female,14.0,1,0,30.0708,S
0.0,3.0,male,20.0,0,0,8.05,S
0.0,3.0,female,14.0,0,0,7.8542,S
1.0,3.0,male,28.0,0,0,13.0,S


# Look at the previous 3 blocks: what issues might we face in the project? What are your solutions?

In [0]:
# Show the cleaned data; the chart for this cell is presumably configured in
# the display() widget - confirm.
dataclean_rows = sqlContext.sql("select * from dataclean")
display(dataclean_rows)

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,city
0.0,3.0,male,22.0,1,0,7.25,S
1.0,3.0,female,26.0,0,0,7.925,S
1.0,1.0,female,35.0,1,0,53.1,S
0.0,3.0,male,35.0,0,0,8.05,S
0.0,3.0,male,28.0,0,0,8.4583,Q
0.0,1.0,male,54.0,0,0,51.8625,S
1.0,2.0,female,14.0,1,0,30.0708,S
0.0,3.0,male,20.0,0,0,8.05,S
0.0,3.0,female,14.0,0,0,7.8542,S
1.0,3.0,male,28.0,0,0,13.0,S


# Look at the previous block: what issues might we face in the project? What is your solution?

In [0]:
# Show the cleaned data; the chart for this cell is presumably configured in
# the display() widget - confirm.
dataclean_rows = sqlContext.sql("select * from dataclean")
display(dataclean_rows)

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,city
0.0,3.0,male,22.0,1,0,7.25,S
1.0,3.0,female,26.0,0,0,7.925,S
1.0,1.0,female,35.0,1,0,53.1,S
0.0,3.0,male,35.0,0,0,8.05,S
0.0,3.0,male,28.0,0,0,8.4583,Q
0.0,1.0,male,54.0,0,0,51.8625,S
1.0,2.0,female,14.0,1,0,30.0708,S
0.0,3.0,male,20.0,0,0,8.05,S
0.0,3.0,female,14.0,0,0,7.8542,S
1.0,3.0,male,28.0,0,0,13.0,S


# Look at the previous block: what issues might we face in the project? What is your solution?

In [0]:
# Show the cleaned data; the chart for this cell is presumably configured in
# the display() widget - confirm.
dataclean_rows = sqlContext.sql("select * from dataclean")
display(dataclean_rows)

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,city
0.0,3.0,male,22.0,1,0,7.25,S
1.0,3.0,female,26.0,0,0,7.925,S
1.0,1.0,female,35.0,1,0,53.1,S
0.0,3.0,male,35.0,0,0,8.05,S
0.0,3.0,male,28.0,0,0,8.4583,Q
0.0,1.0,male,54.0,0,0,51.8625,S
1.0,2.0,female,14.0,1,0,30.0708,S
0.0,3.0,male,20.0,0,0,8.05,S
0.0,3.0,female,14.0,0,0,7.8542,S
1.0,3.0,male,28.0,0,0,13.0,S
