In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.





In [143]:
import pandas as pd
from pyspark.rdd import RDD
from pyspark.sql import  SparkSession,DataFrame
from pyspark.sql.types import StructType,StructField, StringType,IntegerType
from pyspark.sql.functions import col,split,regexp_replace,when,length


In [144]:
spark = SparkSession.builder.appName("Gaurav").getOrCreate()

In [145]:
rdd1=spark.read.csv('titanic.csv',inferSchema=True,header=True)

In [146]:
rdd1.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

# Print Schema


In [147]:
rdd1.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [148]:
rdd1.count() #No. of rows

891

In [149]:
len(rdd1.columns) #No. of columns

12

# Create a new dataframe containing only personnel info of Passenger(PassengerId, Name, Age, Sex)

In [150]:
rdd2=rdd1.select('PassengerId','Name','Age','Sex')

In [151]:
rdd2.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Sex: string (nullable = true)



In [152]:
rdd2.show()

+-----------+--------------------+----+------+
|PassengerId|                Name| Age|   Sex|
+-----------+--------------------+----+------+
|          1|Braund, Mr. Owen ...|22.0|  male|
|          2|Cumings, Mrs. Joh...|38.0|female|
|          3|Heikkinen, Miss. ...|26.0|female|
|          4|Futrelle, Mrs. Ja...|35.0|female|
|          5|Allen, Mr. Willia...|35.0|  male|
|          6|    Moran, Mr. James|null|  male|
|          7|McCarthy, Mr. Tim...|54.0|  male|
|          8|Palsson, Master. ...| 2.0|  male|
|          9|Johnson, Mrs. Osc...|27.0|female|
|         10|Nasser, Mrs. Nich...|14.0|female|
|         11|Sandstrom, Miss. ...| 4.0|female|
|         12|Bonnell, Miss. El...|58.0|female|
|         13|Saundercock, Mr. ...|20.0|  male|
|         14|Andersson, Mr. An...|39.0|  male|
|         15|Vestrom, Miss. Hu...|14.0|female|
|         16|Hewlett, Mrs. (Ma...|55.0|female|
|         17|Rice, Master. Eugene| 2.0|  male|
|         18|Williams, Mr. Cha...|null|  male|
|         19|

# Drop_Duplicate

In [153]:
print("Count "+str(rdd1.count()))
rdd3=rdd1.dropDuplicates()
rdd3.show()

Count 891
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|   Fare|  Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-------+--------+
|        612|       0|     3|Jardin, Mr. Jose ...|  male|null|    0|    0|SOTON/O.Q. 3101305|   7.05|   null|       S|
|        666|       0|     2|  Hickman, Mr. Lewis|  male|32.0|    2|    0|      S.O.C. 14879|   73.5|   null|       S|
|        689|       0|     3|Fischer, Mr. Eber...|  male|18.0|    0|    0|            350036| 7.7958|   null|       S|
|        846|       0|     3| Abbing, Mr. Anthony|  male|42.0|    0|    0|         C.A. 5547|   7.55|   null|       S|
|         59|       1|     2|West, Miss. Const...|female| 5.0|    1|    2|        C.A. 34651|  27.75|   null|       S|
|         99|       1|     2|Doling, M

# dropping all records that have atleast one null/Nan values from the original dataset

In [154]:
rdd3.na.drop().show(20)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|        499|       0|     1|Allison, Mrs. Hud...|female|25.0|    1|    2|  113781|  151.55|    C22 C26|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1| PP 9549|    16.7|         G6|       S|
|        701|       1|     1|Astor, Mrs. John ...|female|18.0|    1|    0|PC 17757| 227.525|    C62 C64|       C|
|         55|       0|     1|Ostby, Mr. Engelh...|  male|65.0|    0|    1|  113509| 61.9792|        B30|       C|
|        332|       0|     1| Partner, Mr. Austen|  male|45.5|    0|    0|  113043|    28.5|       C124|       S|
|        646|       1|     1|Harper, Mr. Henry...|  male|48.0|    1|    0|PC 17572| 76.7

# dropping columns Ticket and Cabin from the original dataset

In [155]:
rdd4=rdd3.drop("Ticket","Cabin")

In [156]:
rdd4.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)



# Sort the original dataframe on the basis of Fare in descending order and store it into a new dataframe

In [157]:
rdd5=rdd4.sort(rdd4.Fare.desc())
rdd5.show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|    Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+
|        738|       1|     1|Lesurer, Mr. Gust...|  male|35.0|    0|    0|512.3292|       C|
|        259|       1|     1|    Ward, Miss. Anna|female|35.0|    0|    0|512.3292|       C|
|        680|       1|     1|Cardeza, Mr. Thom...|  male|36.0|    0|    1|512.3292|       C|
|        342|       1|     1|Fortune, Miss. Al...|female|24.0|    3|    2|   263.0|       S|
|         28|       0|     1|Fortune, Mr. Char...|  male|19.0|    3|    2|   263.0|       S|
|        439|       0|     1|   Fortune, Mr. Mark|  male|64.0|    1|    4|   263.0|       S|
|         89|       1|     1|Fortune, Miss. Ma...|female|23.0|    3|    2|   263.0|       S|
|        312|       1|     1|Ryerson, Miss. Em...|female|18.0|    2|  

# contains only the passengers that survived

In [158]:
rdd6=rdd5.where("survived>0")
rdd6.show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|    Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+
|        738|       1|     1|Lesurer, Mr. Gust...|  male|35.0|    0|    0|512.3292|       C|
|        259|       1|     1|    Ward, Miss. Anna|female|35.0|    0|    0|512.3292|       C|
|        680|       1|     1|Cardeza, Mr. Thom...|  male|36.0|    0|    1|512.3292|       C|
|         89|       1|     1|Fortune, Miss. Ma...|female|23.0|    3|    2|   263.0|       S|
|        342|       1|     1|Fortune, Miss. Al...|female|24.0|    3|    2|   263.0|       S|
|        743|       1|     1|"Ryerson, Miss. S...|female|21.0|    2|    2| 262.375|       C|
|        312|       1|     1|Ryerson, Miss. Em...|female|18.0|    2|    2| 262.375|       C|
|        300|       1|     1|Baxter, Mrs. Jame...|female|50.0|    0|  

# Create a new dataframe(df_survived) which follow all the following condition at the same time from the original dataset
1. All passengers should be male
2. The age of each passenger should be greater than 40 but less than 50
3. Passenger should not belong to 3rd PClass( Pclass not equal to 3)

In [159]:
rdd7=rdd1.filter((rdd1.Sex=="male") & (rdd1.Age>40) & (rdd1.Age<50) & (rdd1.Pclass!=3))
#rdd1.filter(col("Sex") == "male") & (col("Age")>40) & (col("Age")<50) & (col("Pclass")!=3).show()

In [160]:
rdd7.show()

+-----------+--------+------+--------------------+----+----+-----+-----+------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|      Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+------------+-------+-----+--------+
|         36|       0|     1|Holverson, Mr. Al...|male|42.0|    1|    0|      113789|   52.0| null|       S|
|         63|       0|     1|Harris, Mr. Henry...|male|45.0|    1|    0|       36973| 83.475|  C83|       S|
|         93|       0|     1|Chaffee, Mr. Herb...|male|46.0|    1|    0| W.E.P. 5734| 61.175|  E31|       S|
|        111|       0|     1|Porter, Mr. Walte...|male|47.0|    0|    0|      110465|   52.0| C110|       S|
|        150|       0|     2|Byles, Rev. Thoma...|male|42.0|    0|    0|      244310|   13.0| null|       S|
|        188|       1|     1|"Romaine, Mr. Cha...|male|45.0|    0|    0|      111428|  26.55| null|       S|
|        218|      

# Perform the following preprocess steps on the above df_survived dataframe
1. Add 2 new columns first_name and second_name by splitting the Name column on comma(,)
2. Remove the prefixes from the second name( ex Mr. , Dr. etc)
3. Trim/Strip all the white spaces in above 2 columns( no space before and after the string in each cell)
Your output should look like the below value in last 2 columns



In [161]:
rdd8=rdd7.withColumn("first_Name", split(col("Name"), ",").getItem(0)).withColumn("Last_Name", split(col("Name"), ",").getItem(1))



In [162]:
rdd9=rdd8.withColumn('Last_Name',regexp_replace('Last_Name','Mr. |Dr. ',''))
rdd9.show()

+-----------+--------+------+--------------------+----+----+-----+-----+------------+-------+-----+--------+----------+--------------------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|      Ticket|   Fare|Cabin|Embarked|first_Name|           Last_Name|
+-----------+--------+------+--------------------+----+----+-----+-----+------------+-------+-----+--------+----------+--------------------+
|         36|       0|     1|Holverson, Mr. Al...|male|42.0|    1|    0|      113789|   52.0| null|       S| Holverson|     Alexander Oskar|
|         63|       0|     1|Harris, Mr. Henry...|male|45.0|    1|    0|       36973| 83.475|  C83|       S|    Harris|     Henry Birkhardt|
|         93|       0|     1|Chaffee, Mr. Herb...|male|46.0|    1|    0| W.E.P. 5734| 61.175|  E31|       S|   Chaffee|      Herbert Fuller|
|        111|       0|     1|Porter, Mr. Walte...|male|47.0|    0|    0|      110465|   52.0| C110|       S|    Porter|  Walter Chamberlain|
|        150|

# Perform the following additional preprocess steps on the above df_survived dataframe
1. Add a new column economy_group which contain values "Rich" or "Not Rich" depend on whether the Pclass is 1 or not respectively
2. Add a new column age_group which contain values "old" if age is more than 50, "child" if age is less than 15 otherwise "young"
3. Add a new column name_length which contain the string length for Name column

In [164]:
rdd10=rdd9.withColumn("Economy_Group",when(col("Pclass")==1,"Rich").otherwise("Not Rich"))
rdd10=rdd10.withColumn("Age_Group",when(col("Age")>50,"old"). when(col("Age")<15,"child").otherwise("young"))
rdd10=rdd10.withColumn("Name_Length",length("first_Name"))
rdd10.show()

+-----------+--------+------+--------------------+----+----+-----+-----+------------+-------+-----+--------+----------+--------------------+-------------+---------+-----------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|      Ticket|   Fare|Cabin|Embarked|first_Name|           Last_Name|Economy_Group|Age_Group|Name_Length|
+-----------+--------+------+--------------------+----+----+-----+-----+------------+-------+-----+--------+----------+--------------------+-------------+---------+-----------+
|         36|       0|     1|Holverson, Mr. Al...|male|42.0|    1|    0|      113789|   52.0| null|       S| Holverson|     Alexander Oskar|         Rich|    young|          9|
|         63|       0|     1|Harris, Mr. Henry...|male|45.0|    1|    0|       36973| 83.475|  C83|       S|    Harris|     Henry Birkhardt|         Rich|    young|          6|
|         93|       0|     1|Chaffee, Mr. Herb...|male|46.0|    1|    0| W.E.P. 5734| 61.175|  E31|       S|   Chaf