## Import the necessary modules ##

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *



## creating spark session ## 

In [2]:
spark = SparkSession.builder.appName("PySpark_assign").getOrCreate()

23/05/21 19:37:12 WARN Utils: Your hostname, mhussam resolves to a loopback address: 127.0.1.1; using 192.168.185.62 instead (on interface wlp3s0)
23/05/21 19:37:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/21 19:37:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/21 19:37:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/21 19:37:14 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/05/21 19:37:14 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/05/21 19:37:14 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


## Reading the data from the CSV file into a DataFrame ##

In [3]:
titanic = spark.read.option("header", "true").option("inferSchema", "true").option("header", "false").csv("data/titanic.csv")
## Defining  the column names for header row
columns = ["PassengerId", "Survived", "Pclass", "Name", "Sex", "Age", "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked", "Timestamp"]
#Adding the column names as header row in the DataFrame
titanic = titanic.toDF(*columns)
titanic.show()



+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
|          5|       0|     3|Allen, Mr. Willia...|  mal

## Displaying the Dataframe schema ##

In [4]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



## modified the schema by changing the data type of the "Survived" column to string and converting the binary values into strings for categorization purposes ##

In [5]:
titanic = titanic.withColumn("Survived", when(col("Survived") == 1, "YES").otherwise("NO").cast("string"))
titanic.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|      NO|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|
|          2|     YES|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|     YES|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|     YES|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
|          5|      NO|     3|Allen, Mr. Willia...|  mal

In [6]:
## updated schema 
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: string (nullable = false)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [7]:
titanic.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|      NO|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|
|          2|     YES|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|     YES|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|     YES|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
|          5|      NO|     3|Allen, Mr. Willia...|  male| 35| 

### Calculating the minimum, maximum and mean values of the numerical data ##

In [8]:
numerical_cols = [col_name for col_name, col_type in titanic.dtypes if col_type in ['int', 'bigint', 'float', 'double']]
numerical_df = titanic.select(*numerical_cols)


min_max_mean=numerical_df.describe()
min_max_mean.select(numerical_cols).summary('min','max','mean').show()

23/05/21 19:37:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       PassengerId|            Pclass|               Age|             SibSp|            Parch|              Fare|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|    min|                 1|0.8360712409770491|                 0|                 0|                0|               0.0|
|    max|               891|               891|                80|               891|              891|               891|
|   mean|497.27076840304596|179.62894264325715|167.64315089562422|180.12515025772694|179.6375301872114|297.04536731315113|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+



#### For categorical columns, create and apply UDF that will change the last letter of every word to “1”

In [9]:
cat_columns = ["Sex","Cabin","Embarked","Survived"]

def change_last_letter_after_space(word):
    if word is not None:
        words = word.split()
        for i in range(len(words)):
            words[i] = words[i][:-1] + "1"
        return " ".join(words)
    return word
change_last_letter_udf = udf(change_last_letter_after_space, StringType())
for column in cat_columns:
    titanic = titanic.withColumn(column, change_last_letter_udf(titanic[column]))
titanic.show(10)
    

[Stage 8:>                                                          (0 + 1) / 1]

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|      N1|     3|Braund, Mr. Owen ...|  mal1|  22|    1|    0|       A/5 21171|   7.25| null|       1|2020-01-01 13:45:25|
|          2|     YE1|     1|Cumings, Mrs. Joh...|femal1|  38|    1|    0|        PC 17599|71.2833|  C81|       1|2020-01-01 13:44:48|
|          3|     YE1|     3|Heikkinen, Miss. ...|femal1|  26|    0|    0|STON/O2. 3101282|  7.925| null|       1|2020-01-01 13:38:11|
|          4|     YE1|     1|Futrelle, Mrs. Ja...|femal1|  35|    1|    0|          113803|   53.1| C121|       1|2020-01-01 13:32:00|
|          5|      N1|     3|Allen, Mr. Willia...|  mal

                                                                                

## sorting the df by first column 

In [10]:
sorted_df = titanic.orderBy(col(titanic.columns[0]))

## saving the results to the Parquet file.

In [11]:
sorted_df.write.mode("overwrite").parquet("output/titanic_data.parquet")

                                                                                