In [71]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import isnan, when, count, col, isnull
import pyspark.sql.functions as F
import numpy as np
from pyspark.sql.types import DoubleType

In [2]:
# Get (or reuse) the notebook's Spark session, then read the Titanic training
# CSV with its first row as the header and with column types inferred.
spark = (
    SparkSession.builder
    .appName('ML-Titanic')
    .getOrCreate()
)
df = spark.read.csv(
    'E:/Data/titanic/train.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [3]:
# Preview the first five rows of the raw data.
df.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [4]:
# Class balance of the target: number of rows per Survived value.
(
    df.groupBy("Survived")
    .count()
    .show()
)

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [5]:
# Names of the columns whose Spark dtype is int or double.
numeric_features = [name for name, dtype in df.dtypes if dtype in ('int', 'double')]

# describe() statistics for those columns, shown as a pandas frame.
# The summary DataFrame is built once and reused for both rows and header.
numeric_summary = df.select(numeric_features).describe()
pd.DataFrame(numeric_summary.collect(), columns=numeric_summary.columns)

Unnamed: 0,summary,PassengerId,Survived,Pclass,Age,SibSp,Parch,Fare
0,count,891.0,891.0,891.0,714.0,891.0,891.0,891.0
1,mean,446.0,0.3838383838383838,2.308641975308642,29.69911764705882,0.5230078563411896,0.3815937149270482,32.2042079685746
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,14.526497332334037,1.1027434322934315,0.8060572211299488,49.69342859718089
3,min,1.0,0.0,1.0,0.42,0.0,0.0,0.0
4,max,891.0,1.0,3.0,80.0,8.0,6.0,512.3292


In [6]:
# Missing values per column. `when` yields a non-null marker (the column name
# as a literal) only for null cells, and `count` skips nulls, so each aggregate
# equals the number of null entries in that column.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



# Visualization can be done with seaborn or matplotlib. Remember, though, that the purpose of using PySpark is not to visualize but to parallelize the computation.

In [7]:
# Distinct values present in the Pclass column.
(
    df.select("Pclass")
    .distinct()
    .show()
)

+------+
|Pclass|
+------+
|     1|
|     3|
|     2|
+------+



In [8]:
# Distinct values present in the SibSp column.
(
    df.select("SibSp")
    .distinct()
    .show()
)

+-----+
|SibSp|
+-----+
|    1|
|    3|
|    5|
|    4|
|    8|
|    2|
|    0|
+-----+



In [9]:
# Distinct values present in the Parch column.
(
    df.select("Parch")
    .distinct()
    .show()
)

+-----+
|Parch|
+-----+
|    1|
|    6|
|    3|
|    5|
|    4|
|    2|
|    0|
+-----+



In [10]:
# List the column names of the raw DataFrame, in schema order.
df.schema.names

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [11]:
# Keep the target (Survived) and the columns used from here on; PassengerId,
# Name, Ticket and Cabin are not carried into df_clean.
df_clean = df.select(
    'Survived', 'Pclass', 'Sex', 'Age',
    'SibSp', 'Parch', 'Fare', 'Embarked',
)
df_clean.show(n=5)

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 5 rows



In [12]:
# Empirical age distribution of survivors with a known Age: one row per
# distinct Age, with its row count and its share of the group.
data_1 = df.filter((F.col("Survived") == 1) & F.col("Age").isNotNull())
age_1 = data_1.groupBy("Age").count()

# Per-age counts pulled to the driver; their sum is the group size used as the
# denominator of the proportion column.
array1 = [int(n) for (n,) in age_1.select("count").collect()]
age_1 = age_1.withColumn("proportion", F.col("count") / sum(array1))

In [13]:
# Empirical age distribution of non-survivors with a known Age: one row per
# distinct Age, with its row count and its share of the group.
data_0 = df.filter((F.col("Survived") == 0) & F.col("Age").isNotNull())
age_0 = data_0.groupBy("Age").count()

# Per-age counts pulled to the driver; their sum is the group size used as the
# denominator of the proportion column.
array0 = [int(n) for (n,) in age_0.select("count").collect()]
age_0 = age_0.withColumn("proportion", F.col("count") / sum(array0))

In [14]:
# DataFrame.show() prints the table itself and returns None, so wrapping it in
# print() only adds a stray "None" line after each table.
age_1.show(5)
age_0.show(5)

+----+-----+--------------------+
| Age|count|          proportion|
+----+-----+--------------------+
| 8.0|    2|0.006896551724137931|
| 7.0|    1|0.003448275862068...|
|49.0|    4|0.013793103448275862|
|29.0|    8|0.027586206896551724|
|47.0|    1|0.003448275862068...|
+----+-----+--------------------+
only showing top 5 rows

None
+----+-----+--------------------+
| Age|count|          proportion|
+----+-----+--------------------+
| 8.0|    2|0.004716981132075...|
|70.0|    2|0.004716981132075...|
| 7.0|    2|0.004716981132075...|
|20.5|    1|0.002358490566037...|
|49.0|    2|0.004716981132075...|
+----+-----+--------------------+
only showing top 5 rows

None


In [68]:
# Bring each group's ages and sampling weights to the driver with ONE collect()
# per group. Spark does not promise a stable row order for an unsorted
# DataFrame across separate actions, so collecting "Age" and "proportion"
# independently could pair an age with another age's weight, while
# np.random.choice(a, p=...) requires p[i] to be the weight of a[i].
age_weight_rows0 = age_0.select("Age", "proportion").collect()
age_weight_rows1 = age_1.select("Age", "proportion").collect()

# Distinct ages per group (0 = did not survive, 1 = survived) ...
array0 = [row[0] for row in age_weight_rows0]
array1 = [row[0] for row in age_weight_rows1]
# ... and the matching proportions, in the same row order as the ages above.
w_array0 = [row[1] for row in age_weight_rows0]
w_array1 = [row[1] for row in age_weight_rows1]

In [69]:
# How many rows in each outcome group have no Age recorded.
size1 = df_clean.filter((df_clean["Survived"] == 1) & df_clean["Age"].isNull()).count()
size0 = df_clean.filter((df_clean["Survived"] == 0) & df_clean["Age"].isNull()).count()

# Draw that many ages per group, with replacement, weighted by the group's
# observed age proportions. Survivors are drawn first, then non-survivors.
input1 = np.random.choice(a=array1, size=size1, replace=True, p=w_array1)
input0 = np.random.choice(a=array0, size=size0, replace=True, p=w_array0)

In [70]:
# Display the ages drawn for the survivors group.
input1

array([35.  , 32.  , 24.  , 33.  , 26.  , 27.  , 63.  , 29.  , 36.  ,
       32.  , 22.  , 32.  , 24.  , 29.  , 13.  , 48.  , 16.  , 28.  ,
       18.  , 42.  , 56.  , 18.  , 62.  , 49.  ,  0.42,  3.  ,  3.  ,
       29.  , 25.  , 16.  , 27.  , 62.  , 42.  ,  2.  ,  5.  , 36.  ,
       13.  , 26.  , 30.  , 35.  , 35.  , 34.  , 32.  , 18.  , 15.  ,
        4.  , 48.  , 26.  , 22.  , 29.  , 50.  , 28.  ])

In [98]:
# Impute missing Age for survivors by sampling from the survivors' empirical
# age distribution (values in array1, weights in w_array1).
#
# The draw has to happen per row. Calling np.random.choice(...) directly inside
# when(...) evaluates it once on the driver and bakes that single number into
# the query as a literal, so every missing age would receive the same value.
# Wrapping the draw in a UDF moves it to row-evaluation time, and
# asNondeterministic() tells Spark the result must not be treated as a
# repeatable value.
#
# NOTE: the draw is unseeded and is re-evaluated whenever df_clean_input is
# recomputed, so separate actions can see different imputed ages; persist or
# write the result out if stable values are needed downstream.
sample_survivor_age = F.udf(
    lambda: float(np.random.choice(array1, p=w_array1)),
    DoubleType(),
).asNondeterministic()

# Only rows with Survived == 1 and a null Age are replaced; every other row,
# including Survived == 0 rows with a null Age, keeps its original Age.
df_clean_input = df_clean.withColumn(
    'Age',
    when((df_clean.Survived == 1) & (df_clean.Age.isNull()), sample_survivor_age())
    .otherwise(df_clean['Age']),
)

In [100]:
# Survivors in the raw data whose Age is missing.
df.filter((df_clean["Survived"] == 1) & df_clean["Age"].isNull())

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|         18|       1|     2|Williams, Mr. Cha...|  male|null|    0|    0|  244373|    13.0| null|       S|
|         20|       1|     3|Masselmani, Mrs. ...|female|null|    0|    0|    2649|   7.225| null|       C|
|         29|       1|     3|"O'Dwyer, Miss. E...|female|null|    0|    0|  330959|  7.8792| null|       Q|
|         32|       1|     1|Spencer, Mrs. Wil...|female|null|    1|    0|PC 17569|146.5208|  B78|       C|
|         33|       1|     3|Glynn, Miss. Mary...|female|null|    0|    0|  335677|    7.75| null|       Q|
|         37|       1|     3|    Mamee, Mr. Hanna|  male|null|    0|    0|    2677|  7.2292| null|       C|
|         48|       1|     3

In [103]:
# Inspect the first 30 rows after the Age imputation step.
df_clean_input.show(n=30)

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|  male|null|    0|    0| 8.4583|       Q|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       

In [104]:
# One weighted draw from the survivors' age values; no seed is set, so each
# execution may return a different age.
np.random.choice(a=array1, size=1, replace=True, p=w_array1)[0]

39.0

In [105]:
# Repeat the weighted draw from the survivors' age values; no seed is set, so
# the result can differ from one execution to the next.
np.random.choice(a=array1, size=1, replace=True, p=w_array1)[0]

5.0