## Importing Libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import (max, min, avg, round, col, when, udf)
from pyspark.sql.types import (IntegerType, StringType, FloatType, StructType, TimestampType)

### Create Spark Session

In [3]:
spark = (
    SparkSession.builder.master("local[1]")
    .appName("PySpark Assignment")
    .getOrCreate()
)

In [4]:
# Schema Structure
structure = (
    StructType()
    .add("P_Id", IntegerType())
    .add("Survived", StringType())
    .add("P_class", StringType())
    .add("Name", StringType())
    .add("Gender", StringType())
    .add("Age", IntegerType())
    .add("Sib_Spouse", IntegerType())
    .add("Par_child", IntegerType())
    .add("Ticket", StringType())
    .add("Fare", FloatType())
    .add("Cabin", StringType())
    .add("Embarked", StringType())
    .add("Timestamp", TimestampType())
)

In [5]:
df = spark.read.csv("workspace/data/titanic.csv", schema=structure)
df.show(2)

+----+--------+-------+--------------------+------+---+----------+---------+---------+-------+-----+--------+-------------------+
|P_Id|Survived|P_class|                Name|Gender|Age|Sib_Spouse|Par_child|   Ticket|   Fare|Cabin|Embarked|          Timestamp|
+----+--------+-------+--------------------+------+---+----------+---------+---------+-------+-----+--------+-------------------+
|   1|       0|      3|Braund, Mr. Owen ...|  male| 22|         1|        0|A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|
|   2|       1|      1|Cumings, Mrs. Joh...|female| 38|         1|        0| PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
+----+--------+-------+--------------------+------+---+----------+---------+---------+-------+-----+--------+-------------------+
only showing top 2 rows



In [6]:
df = df.withColumn("Survived", when(col("Survived") == 1, "Yes").otherwise("No"))\
       .withColumn("P_class", when(col("P_class") == 1, "1st").when(col("P_class") == 2, '2nd').otherwise('3rd'))\
       .withColumn("Embarked", when(col("Embarked") == 'S', 'Southampton').when(col("Embarked") == 'C', 'Cherbourg').otherwise('Queenstown'))

In [7]:
df.show(5)

+----+--------+-------+--------------------+------+---+----------+---------+----------------+-------+-----+-----------+-------------------+
|P_Id|Survived|P_class|                Name|Gender|Age|Sib_Spouse|Par_child|          Ticket|   Fare|Cabin|   Embarked|          Timestamp|
+----+--------+-------+--------------------+------+---+----------+---------+----------------+-------+-----+-----------+-------------------+
|   1|      No|    3rd|Braund, Mr. Owen ...|  male| 22|         1|        0|       A/5 21171|   7.25| null|Southampton|2020-01-01 13:45:25|
|   2|     Yes|    1st|Cumings, Mrs. Joh...|female| 38|         1|        0|        PC 17599|71.2833|  C85|  Cherbourg|2020-01-01 13:44:48|
|   3|     Yes|    3rd|Heikkinen, Miss. ...|female| 26|         0|        0|STON/O2. 3101282|  7.925| null|Southampton|2020-01-01 13:38:11|
|   4|     Yes|    1st|Futrelle, Mrs. Ja...|female| 35|         1|        0|          113803|   53.1| C123|Southampton|2020-01-01 13:32:00|
|   5|      No|    3

In [8]:
df.printSchema()

root
 |-- P_Id: integer (nullable = true)
 |-- Survived: string (nullable = false)
 |-- P_class: string (nullable = false)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Sib_Spouse: integer (nullable = true)
 |-- Par_child: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Timestamp: timestamp (nullable = true)



## For numerical columns, calculate minimum, maximum and average values.

In [9]:
numerical_columns = [column[0] for column in df.dtypes if column[1] in ['int','float']]

numerical_df = df.select(*numerical_columns)
summary_df = numerical_df.select(numerical_columns[1:]).summary('min','max','mean')

In [10]:
summary_df.show()

+-------+------------------+------------------+-------------------+-----------------+
|summary|               Age|        Sib_Spouse|          Par_child|             Fare|
+-------+------------------+------------------+-------------------+-----------------+
|    min|                 0|                 0|                  0|              0.0|
|    max|                80|                 8|                  6|         512.3292|
|   mean|29.679271708683473|0.5230078563411896|0.38159371492704824|32.20420804114722|
+-------+------------------+------------------+-------------------+-----------------+



## For categorical columns, create and apply UDF that will change the last letter of every word to “1”.

In [11]:
def replace_last_letter(x):
    if x is None:
        return '1'
    new_word = x[:-1] + '1'
    return new_word

udf_fun = udf(replace_last_letter, StringType())

new_df = df.select('Survived', 'P_class', 'Gender', 'Embarked')\
           .withColumn('Survived', udf_fun('Survived'))\
           .withColumn('P_class', udf_fun('P_class'))\
           .withColumn('Gender', udf_fun('Gender'))\
           .withColumn('Embarked', udf_fun('Embarked'))

In [12]:
new_df.show(10)

+--------+-------+------+-----------+
|Survived|P_class|Gender|   Embarked|
+--------+-------+------+-----------+
|      N1|    3r1|  mal1|Southampto1|
|     Ye1|    1s1|femal1|  Cherbour1|
|     Ye1|    3r1|femal1|Southampto1|
|     Ye1|    1s1|femal1|Southampto1|
|      N1|    3r1|  mal1|Southampto1|
|      N1|    3r1|  mal1| Queenstow1|
|      N1|    1s1|  mal1|Southampto1|
|      N1|    3r1|  mal1|Southampto1|
|     Ye1|    3r1|femal1|Southampto1|
|     Ye1|    2n1|femal1|  Cherbour1|
+--------+-------+------+-----------+
only showing top 10 rows



## Sort DataFrame by the first column and save the results to the Parquet file.

In [13]:
sorted_df = new_df.orderBy(new_df.columns[0])
sorted_df.show(10)

+--------+-------+------+-----------+
|Survived|P_class|Gender|   Embarked|
+--------+-------+------+-----------+
|      N1|    3r1|  mal1|Southampto1|
|      N1|    1s1|  mal1|  Cherbour1|
|      N1|    3r1|  mal1|Southampto1|
|      N1|    3r1|  mal1| Queenstow1|
|      N1|    3r1|femal1|Southampto1|
|      N1|    3r1|  mal1|Southampto1|
|      N1|    2n1|  mal1|Southampto1|
|      N1|    1s1|  mal1|Southampto1|
|      N1|    3r1|femal1|Southampto1|
|      N1|    3r1|  mal1|Southampto1|
+--------+-------+------+-----------+
only showing top 10 rows



In [20]:
# saved in parquet file
sorted_df.write.parquet("./results")