#### Read data from source to DataFrame in local Spark setup and display DataFrame schema.

In [117]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,StringType,FloatType,TimestampType
from pyspark.sql.functions import col,sum,count,mean,col,lit

In [118]:
spark = SparkSession.builder.appName("PySpark Assignment").getOrCreate()

titanic = spark.read.option("header", "true").option("inferSchema", "true").option("header", "false").csv("workspace/data/titanic.csv")
titanic.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: timestamp (nullable = true)



In [119]:

columns = ["PassengerId","Survived","Pclass","Name","Sex","Age","SibSp","Parch","Ticket","Fare","Cabin","Embarked","Timestamp"]
titanic = titanic.toDF(*columns)

# Option 2: Use withColumnRenamed() method to rename existing columns
for i, col_name in enumerate(titanic.columns):
    titanic = titanic.withColumnRenamed(col_name, columns[i])
titanic.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
|          5|       0|     3|Allen, Mr. Willia...|  mal

In [120]:
column_types = titanic.dtypes
column_types

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'int'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Timestamp', 'timestamp')]

In [121]:
numerical_columns = [column[0] for column in column_types if column[1] in ['int', 'float']]
summary_df = titanic.describe(numerical_columns)
summary_df.select(numerical_columns).summary("max","min","mean").show()


+-------+------------------+------------------+------------------+------------------+------------------+-----------------+
|summary|       PassengerId|          Survived|            Pclass|               Age|             SibSp|            Parch|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+
|    max|               891|               891|               891|                80|               891|              891|
|    min|                 1|                 0|0.8360712409770491|                 0|                 0|                0|
|   mean|497.27076840304596|178.57408616762064|179.62894264325715|167.64315089562422|180.12515025772694|179.6375301872114|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+



#### For numerical columns, calculate minimum, maximum and average values.

In [123]:
str_columns = [column[0] for column in column_types if column[1] in ['string']]
str_columns

['Name', 'Sex', 'Ticket', 'Cabin', 'Embarked']

#### For categorical columns, create and apply UDF that will change the last letter of every word to “1”.

In [129]:
# from pyspark.sql.functions import udf

# def change_last_letter(word):
#     if word is not None and len(word) > 1:
#         return word[:-1] + "1"
#     return word

# change_last_letter_udf = udf(lambda word: change_last_letter(word), StringType())

# # Apply the UDF to each categorical column
# for column in str_columns:
#     titanic = titanic.withColumn(column, change_last_letter_udf(titanic[column]))

# # Show the modified DataFrame
# titanic.show()



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_312/3049705286.py", line 6, in change_last_letter
TypeError: object of type 'NoneType' has no len()


#### Sort DataFrame by the first column and save the results to the Parquet file.