In [12]:
# importing pyspark libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

In [13]:
# Spark session
# getOrCreate() either returns a SparkSession (reusing an existing one if
# present) or raises, so there is no need to check the result for None.
try:
    spark = SparkSession.builder.master("local").appName("PysparkLearning").getOrCreate()
    print("Spark session created successfully")
except Exception as e:
    # Report the cause and stop here: every later cell needs `spark`, so
    # continuing would only surface later as a confusing NameError.
    print("Exception occurred during spark session creation with error : " + str(e))
    raise

Spark session created successfully


In [14]:
# this dataset is downloaded from Kaggle
# NOTE(review): absolute local path; consider making it configurable.
csvFileName = "D:/Pyspark_latest/Testing with multiple file formats/csv/Titanic-Dataset.csv"

# Make sure you pass header=True, otherwise Spark treats the first line as data
# and gives the columns default names (_c0, _c1, ...).
# inferSchema=True makes Spark scan the data to infer column types.
try:
    titanic_data = spark.read.csv(csvFileName, inferSchema=True, header=True, sep=",")
except Exception as e:
    # Show which file failed and why, then stop: every later cell depends on
    # titanic_data, which would otherwise be undefined.
    print("Error in reading the file " + csvFileName + " : " + str(e))
    raise

### Purpose: analyse which segments of passengers had the highest survival rates

#### Knowledge Bytes : 1
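- There is no `shape` attribute on a Spark DataFrame; use `df.count()` for the number of rows and `len(df.columns)` for the number of columns.
- For a pandas-`describe`-style overview (count, mean, stddev, min, percentiles including the median, max), use `df.summary()`.
- `summary` is a transformation: nothing is computed or printed until an action such as `show` or `collect` runs.
- `show` prints the rows and always returns `None`, so it is only useful for inspection/debugging; don't wrap it in `print()` or assign its result.
- `collect` is an action that brings the result to the driver as a Python list of `Row` objects.
- `collect` can exhaust driver memory (and crash the program) if the result is very large; prefer `take(n)` or `limit(n)` when exploring.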

In [15]:
# Basic profile of the dataset. Spark has no `shape`, so the row count comes
# from count() and the column count from len(df.columns).
print("Columns in data set are")
print(titanic_data.columns)  # printing the columns

print("Column Nature (Dtypes and nullable nature)")
# printSchema() prints the schema tree itself and returns None; wrapping it in
# print() only appended a stray "None" line to the output.
titanic_data.printSchema()

print("Size of dataset")
print(titanic_data.count())  # shape function is not present in spark

print("Number of columns " + str(len(titanic_data.columns)))

print("Sample of 10 records (rows)")
titanic_data.show(10)  # show() prints the rows directly and returns None

# some stats around the df
print("Summary of df")
# collect() is an action: it brings the summary rows to the driver as a list of Row objects.
stats = titanic_data.summary().collect()
for row in stats:
    print(row.asDict())

# Other ways of producing the same result
print("Other way 1")
print(titanic_data.summary().toPandas())

# truncate=False tells show() not to cut off long values
print("Other way 2")
titanic_data.summary().show(truncate=False)

Columns in data set are
['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']
Column Nature (Dtypes and nullable nature)
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

None
Size of dataset
891
Number of columns 12
Sample of 10 records (rows)
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------

#### Calculating the percentage of nulls in each column


In [16]:
# method 1 using filter: one filter + count per column

col_list = titanic_data.columns
# The total row count is the same for every column, so compute it once
# instead of re-running a full count job inside the loop.
total_rows = titanic_data.count()
null_percent_dict = {}
for column in col_list:
    null_rows = titanic_data.filter(col(column).isNull()).count()
    # Guard against an empty dataset to avoid ZeroDivisionError.
    null_percent_dict[column] = (null_rows / total_rows) * 100.00 if total_rows else 0.0

null_percent_df = spark.createDataFrame([null_percent_dict])
# show() prints the table itself and returns None, so it is not wrapped in print().
null_percent_df.show()


+------------------+-----------------+-------------------+----+----+-----+-----------+------+---+-----+--------+------+
|               Age|            Cabin|           Embarked|Fare|Name|Parch|PassengerId|Pclass|Sex|SibSp|Survived|Ticket|
+------------------+-----------------+-------------------+----+----+-----+-----------+------+---+-----+--------+------+
|19.865319865319865|77.10437710437711|0.22446689113355783| 0.0| 0.0|  0.0|        0.0|   0.0|0.0|  0.0|     0.0|   0.0|
+------------------+-----------------+-------------------+----+----+-----+-----------+------+---+-----+--------+------+

None


In [17]:
# method 2 using select and when
# when(isNull, ...) yields a non-null value only for null rows, and count()
# counts non-null values, so count(when(...)) is the number of nulls.
# All columns are aggregated in one select, so Spark scans the data once
# rather than launching a separate job per column.
null_counts = titanic_data.select(
    [count(when(col(column).isNull(), column)).alias(column) for column in col_list]
).collect()[0]
for column in col_list:
    print(column)
    null_count = null_counts[column]
    print(null_count)
    

PassengerId
0
Survived
0
Pclass
0
Name
0
Sex
0
Age
177
SibSp
0
Parch
0
Ticket
0
Fare
0
Cabin
687
Embarked
2


#### Knowledge Bytes 2
- To create a new column (here `AgeSegments`), you can use either `withColumn` or `select`.
- `withColumn` adds a new column, or replaces an existing column with the same name; to rename a column, use `withColumnRenamed`.
- With `select`, you must include the existing columns (e.g. `"*"`) alongside the new expression; otherwise the result contains only the new column.

In [26]:
# Method 1: add an AgeSegments column with withColumn.
# Chained when() branches are tested top to bottom and the first match wins,
# so each band only needs its upper bound. A null Age satisfies none of the
# comparisons and falls through to "Unknown".
titanic_data1 = titanic_data.withColumn(
    "AgeSegments",
    when(col("Age") < 18, "0-17")
    .when(col("Age") < 35, "18-34")
    .when(col("Age") < 50, "35-49")
    .when(col("Age") < 60, "50-59")
    .when(col("Age") < 70, "60-69")
    .when(col("Age") >= 70, "70+")
    .otherwise("Unknown"),
)


In [19]:
# method 2: select("*", <expr>) keeps every existing column and appends the
# new one; without "*" the result would contain only AgeSegments_selectMethod.
# The when() chain is evaluated in order (first match wins), so each band
# only needs its upper bound; a null Age falls through to "Unknown".
titanic_data2 = titanic_data.select(
    "*",
    when(col("Age") < 18, "0-17")
    .when(col("Age") < 35, "18-34")
    .when(col("Age") < 50, "35-49")
    .when(col("Age") < 60, "50-59")
    .when(col("Age") < 70, "60-69")
    .when(col("Age") >= 70, "70+")
    .otherwise("Unknown")
    .alias("AgeSegments_selectMethod"),
)

In [23]:
# Print the first 20 rows (Spark's default), truncating long string values.
titanic_data2.show(n=20, truncate=True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|AgeSegments_selectMethod|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|                   18-34|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|                   35-49|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|                   18-34|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|                   35-49|
|          5|       