## Working with Different Types of Data


### Step 1: Initialize PySpark Session


In [68]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("day3").getOrCreate()


### Step 2: Load the Dataset


In [69]:
# Load the Chipotle dataset into a Spark DataFrame
data_path = "../data/titanic.csv"  # Replace with the actual path
titanic_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Chipotle dataset into a Spark DataFrame
data_path = '../data/chipotle.csv' # Replace with the actual path
chipotle_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Chipotle dataset into a Spark DataFrame
data_path = '../data/kalimati_tarkari_dataset.csv' # Replace with the actual path
kalimati_df = spark.read.csv(data_path, header=True, inferSchema=True)


In [70]:
# Display schema of all the dataset
print(titanic_df.printSchema(),chipotle_df.printSchema(),kalimati_df.printSchema())

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- item_name: string (nullable = true)
 |-- choice_description: string (nullable = true)
 |-- item_price: string (nullable = true)

root
 |-- SN: integer (nullable = true)
 |-- Commodity: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Minimum: double (nullable = true)
 |-- Maximum: double (nullable = true)
 |-- Average: double (nullable = true)

None None N

### Converting to Spark Types:

Question: Load the "titanic" dataset and convert the "Fare" column from double to integer.




In [71]:
from pyspark.sql.functions import col

# Convert the "Fare" column from double to integer
titanic_df = titanic_df.withColumn("fare", col("fare").cast("int"))

# Display schema
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- fare: integer (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### Working with Booleans:

Question: Load the "titanic" dataset and add a new column "IsAdult" that indicates whether a passenger is an adult (age >= 18) or not.

In [72]:
from pyspark.sql.functions import expr

# Add a new column "IsAdult" that indicates whether a passenger is an adult (age >= 18) or not.
titanic_df.withColumn("IsAdult", expr("Age >= 18")).show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|fare|Cabin|Embarked|IsAdult|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7| null|       S|   true|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|  C85|       C|   true|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7| null|       S|   true|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|  53| C123|       S|   true|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8| null|       S|   true|
|          6|       0|     3|    Moran, 

### Working with Numbers:

Question: Load the "titanic" dataset and calculate the average age of male and female passengers separately.

In [73]:
from pyspark.sql.functions import avg

# Calculate the average age of male and female passengers separately
average_age_by_sex = titanic_df.groupBy("Sex").agg(avg("Age").alias("AverageAge")).show()


+------+------------------+
|   Sex|        AverageAge|
+------+------------------+
|female|27.915708812260537|
|  male| 30.72664459161148|
+------+------------------+



### Working with Strings:

Question: Load the "chipotle" dataset and find the item names containing the word "Chicken."

In [74]:
# Find the item names containing the word "Chicken."
chipotle_df.filter(chipotle_df.item_name.contains("Chicken")).show()

+---+--------+--------+--------------------+--------------------+----------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|
+---+--------+--------+--------------------+--------------------+----------+
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |
|  5|       3|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $10.98 |
| 11|       6|       1|Chicken Crispy Tacos|[Roasted Chili Co...|    $8.75 |
| 12|       6|       1|  Chicken Soft Tacos|[Roasted Chili Co...|    $8.75 |
| 13|       7|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $11.25 |
| 16|       8|       1|     Chicken Burrito|[Tomatillo-Green ...|    $8.49 |
| 17|       9|       1|     Chicken Burrito|[Fresh Tomato Sal...|    $8.49 |
| 19|      10|       1|        Chicken Bowl|[Tomatillo Red Ch...|    $8.75 |
| 23|      12|       1|     Chicken Burrito|[[Tomatillo-Green...|   $10.98 |
| 26|      13|       1|        Chicken Bowl|[Roasted Chili Co...|    $8.49 |

23/09/04 12:56:22 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/user/Documents/Fusemachines/Spark%2029.08.23/data/chipotle.csv


### Regular Expressions:

Question: Load the "chipotle" dataset and find the items with names that start with "Ch" followed by any character.



In [75]:
# Find the items with names that start with "Ch" followed by any character
chipotle_df.filter(col("item_name").rlike(r'^Ch')).show()

+---+--------+--------+--------------------+--------------------+----------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|
+---+--------+--------+--------------------+--------------------+----------+
|  0|       1|       1|Chips and Fresh T...|                null|    $2.39 |
|  3|       1|       1|Chips and Tomatil...|                null|    $2.39 |
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |
|  5|       3|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $10.98 |
| 10|       5|       1| Chips and Guacamole|                null|    $4.45 |
| 11|       6|       1|Chicken Crispy Tacos|[Roasted Chili Co...|    $8.75 |
| 12|       6|       1|  Chicken Soft Tacos|[Roasted Chili Co...|    $8.75 |
| 13|       7|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $11.25 |
| 14|       7|       1| Chips and Guacamole|                null|    $4.45 |
| 15|       8|       1|Chips and Tomatil...|                null|    $2.39 |

23/09/04 12:56:24 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/user/Documents/Fusemachines/Spark%2029.08.23/data/chipotle.csv


### Working with Nulls in Data:

Question: Load the "titanic" dataset and count the number of passengers with missing age information.



In [76]:
from pyspark.sql.functions import col

# Count the number of passengers with missing age information
missing_age_titatnic = titanic_df.filter(col("Age").isNull()
                                         ).count()

# Display the result
print("Number of passengers with missing age:", missing_age_titatnic)

Number of passengers with missing age: 177


### Coalesce
Question: Utilizing the Chipotle dataset, use the coalesce function to combine the "item_name" and "choice_description" columns into a new column named "OrderDetails." Display the first 5 rows of the resulting DataFrame.

In [77]:
from pyspark.sql.functions import coalesce

# Coalesce function to combine the "item_name" and "choice_description" columns into a new column named "OrderDetails."
chipotle_df.select("*", coalesce(col("item_name"), 
                                 col("choice_description")
                                 ).alias("OrderDetails")).show(5)

+---+--------+--------+--------------------+--------------------+----------+--------------------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|        OrderDetails|
+---+--------+--------+--------------------+--------------------+----------+--------------------+
|  0|       1|       1|Chips and Fresh T...|                null|    $2.39 |Chips and Fresh T...|
|  1|       1|       1|                Izze|        [Clementine]|    $3.39 |                Izze|
|  2|       1|       1|    Nantucket Nectar|             [Apple]|    $3.39 |    Nantucket Nectar|
|  3|       1|       1|Chips and Tomatil...|                null|    $2.39 |Chips and Tomatil...|
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |        Chicken Bowl|
+---+--------+--------+--------------------+--------------------+----------+--------------------+
only showing top 5 rows



23/09/04 12:56:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/user/Documents/Fusemachines/Spark%2029.08.23/data/chipotle.csv


### ifnull, nullIf, nvl, and nvl2

Question: Replace the null values in the "Age" column of the Titanic dataset with the average age.

In [78]:
from pyspark.sql.functions import avg

# Calculate the average value of the "Age" column
average_value = titanic_df.select(avg("Age")).collect()[0][0]

# Fill missing values in the "Age" column with the calculated average value
titanic_nnull = titanic_df.fillna({"Age": average_value})

# Dislplay the result
titanic_nnull.show()

+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|              Age|SibSp|Parch|          Ticket|fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|             22.0|    1|    0|       A/5 21171|   7| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|             38.0|    1|    0|        PC 17599|  71|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|             26.0|    0|    0|STON/O2. 3101282|   7| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|             35.0|    1|    0|          113803|  53| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|             35.0|    0|    0|          373450|   8| null|       S|


### drop

Question: Remove the "Cabin" column from the Titanic dataset.


In [79]:
# DROP COLUMN "Cabin"
titanic_df = titanic_df.drop("Cabin")

#Display the result
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|  53|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877|   8|       Q|
|          7|       0|     1|McCarthy, Mr. Tim

### fill

Question: Fill the null values in the "Age" column of the Titanic dataset with a default age of 30.

In [80]:
default = 30

# Fill missing values in the "Age" column with the default value of 30
titanic_df.fillna(default, subset=["Age"])

# Display result
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|  53|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877|   8|       Q|
|          7|       0|     1|McCarthy, Mr. Tim

###  replace

Question: Replace the gender "male" with "M" and "female" with "F" in the "Sex" column of the Titanic dataset.

In [81]:
from pyspark.sql.functions import when, col


#Replace the gender "male" with "M" and "female" with "F" in the "Sex"
titanic_df.withColumn("Sex",\
                       when(col("Sex") == "male", "M")\
                       .otherwise("F")).show()

+-----------+--------+------+--------------------+---+----+-----+-----+----------------+----+--------+
|PassengerId|Survived|Pclass|                Name|Sex| Age|SibSp|Parch|          Ticket|fare|Embarked|
+-----------+--------+------+--------------------+---+----+-----+-----+----------------+----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  M|22.0|    1|    0|       A/5 21171|   7|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|  F|38.0|    1|    0|        PC 17599|  71|       C|
|          3|       1|     3|Heikkinen, Miss. ...|  F|26.0|    0|    0|STON/O2. 3101282|   7|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|  F|35.0|    1|    0|          113803|  53|       S|
|          5|       0|     3|Allen, Mr. Willia...|  M|35.0|    0|    0|          373450|   8|       S|
|          6|       0|     3|    Moran, Mr. James|  M|null|    0|    0|          330877|   8|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|  M|54.0|    0|    0|  

### 6. Working with Complex Types: Structs

Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "PriceRange" that is a struct containing "Minimum" and "Maximum" prices for each commodity.

### Working with Complex Types: Arrays
Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "CommodityList" that is an array of all the commodities.


In [82]:
from pyspark.sql.functions import split

# Split the "Commodity" column by space delimiter " " and create a new column "CommodityList"
k_df = kalimati_df.select("*", split(col("Commodity"), "   ").alias("CommodityList"))

# Display the resultant dataset
k_df.show(truncate=False)

+---+--------------------+----------+----+-------+-------+-------+----------------------+
|SN |Commodity           |Date      |Unit|Minimum|Maximum|Average|CommodityList         |
+---+--------------------+----------+----+-------+-------+-------+----------------------+
|0  |Tomato Big(Nepali)  |2013-06-16|Kg  |35.0   |40.0   |37.5   |[Tomato Big(Nepali)]  |
|1  |Tomato Small(Local) |2013-06-16|Kg  |26.0   |32.0   |29.0   |[Tomato Small(Local)] |
|2  |Potato Red          |2013-06-16|Kg  |20.0   |21.0   |20.5   |[Potato Red]          |
|3  |Potato White        |2013-06-16|Kg  |15.0   |16.0   |15.5   |[Potato White]        |
|4  |Onion Dry (Indian)  |2013-06-16|Kg  |28.0   |30.0   |29.0   |[Onion Dry (Indian)]  |
|5  |Carrot(Local)       |2013-06-16|Kg  |30.0   |35.0   |32.5   |[Carrot(Local)]       |
|6  |Cabbage(Local)      |2013-06-16|Kg  |6.0    |10.0   |8.0    |[Cabbage(Local)]      |
|7  |Cauli Local         |2013-06-16|Kg  |30.0   |35.0   |32.5   |[Cauli Local]         |
|8  |Raddi

### Working with Complex Types: explode

Question: Explode the "CommodityList" array column from the previous step to generate a new row for each commodity in the list.

In [83]:
from pyspark.sql.functions import explode

# Explode the "CommodityList" array column from the previous step to generate a new row for each commodity in the list.
exploded_df = k_df.select("SN", "Date", "Unit", "Minimum", "Maximum", "Average", explode("CommodityList").alias("Commodity"))

# Display the resulting dataset
exploded_df.show(truncate=False)

+---+----------+----+-------+-------+-------+--------------------+
|SN |Date      |Unit|Minimum|Maximum|Average|Commodity           |
+---+----------+----+-------+-------+-------+--------------------+
|0  |2013-06-16|Kg  |35.0   |40.0   |37.5   |Tomato Big(Nepali)  |
|1  |2013-06-16|Kg  |26.0   |32.0   |29.0   |Tomato Small(Local) |
|2  |2013-06-16|Kg  |20.0   |21.0   |20.5   |Potato Red          |
|3  |2013-06-16|Kg  |15.0   |16.0   |15.5   |Potato White        |
|4  |2013-06-16|Kg  |28.0   |30.0   |29.0   |Onion Dry (Indian)  |
|5  |2013-06-16|Kg  |30.0   |35.0   |32.5   |Carrot(Local)       |
|6  |2013-06-16|Kg  |6.0    |10.0   |8.0    |Cabbage(Local)      |
|7  |2013-06-16|Kg  |30.0   |35.0   |32.5   |Cauli Local         |
|8  |2013-06-16|Kg  |35.0   |40.0   |37.5   |Raddish Red         |
|9  |2013-06-16|Kg  |25.0   |30.0   |27.5   |Raddish White(Local)|
|10 |2013-06-16|Kg  |16.0   |18.0   |17.0   |Brinjal Long        |
|11 |2013-06-16|Kg  |20.0   |22.0   |21.0   |Brinjal Round    

### Working with Complex Types: Maps

Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "PriceMap" that is a map with "Commodity" as the key and "Average" price as the value.
Answer:

In [87]:
from pyspark.sql.functions import create_map

# Creating new column "PriceMap" that is a map with "Commodity" as the key and "Average" price as the value
kalimati_tarkari= kalimati_df.select("*", create_map(col("Commodity"), col("Average")).alias("PriceMap"))\

## Display "kalimati_tarkari" DataFrame        
kalimati_tarkari.show(truncate=False)

+---+--------------------+----------+----+-------+-------+-------+------------------------------+
|SN |Commodity           |Date      |Unit|Minimum|Maximum|Average|PriceMap                      |
+---+--------------------+----------+----+-------+-------+-------+------------------------------+
|0  |Tomato Big(Nepali)  |2013-06-16|Kg  |35.0   |40.0   |37.5   |{Tomato Big(Nepali) -> 37.5}  |
|1  |Tomato Small(Local) |2013-06-16|Kg  |26.0   |32.0   |29.0   |{Tomato Small(Local) -> 29.0} |
|2  |Potato Red          |2013-06-16|Kg  |20.0   |21.0   |20.5   |{Potato Red -> 20.5}          |
|3  |Potato White        |2013-06-16|Kg  |15.0   |16.0   |15.5   |{Potato White -> 15.5}        |
|4  |Onion Dry (Indian)  |2013-06-16|Kg  |28.0   |30.0   |29.0   |{Onion Dry (Indian) -> 29.0}  |
|5  |Carrot(Local)       |2013-06-16|Kg  |30.0   |35.0   |32.5   |{Carrot(Local) -> 32.5}       |
|6  |Cabbage(Local)      |2013-06-16|Kg  |6.0    |10.0   |8.0    |{Cabbage(Local) -> 8.0}       |
|7  |Cauli Local    

### Working with JSON

Question: Convert the "kalimati_df" DataFrame to JSON format and write it to a JSON file.

In [85]:
# Convert the PySpark DataFrame "kalimati_df" to a JSON format and collect the results
kalimati_json = kalimati_df.toJSON().collect()

filename = "Kalimati.json"

# Open the file in write mode and write each JSON row to the file
with open(filename, "w") as f:
    for json_row in kalimati_json:
        f.write(json_row + "\n")

# Print a message indicating that the data has been successfully written to the file
print("Data written to", filename)

                                                                                

Data written to Kalimati.json


In [86]:
df = spark.read.json(spark.sparkContext.parallelize(kalimati_json))
df.show()

23/09/04 12:56:30 WARN TaskSetManager: Stage 117 contains a task of very large size (5701 KiB). The maximum recommended task size is 1000 KiB.


+-------+--------------------+----------+-------+-------+---+----+
|Average|           Commodity|      Date|Maximum|Minimum| SN|Unit|
+-------+--------------------+----------+-------+-------+---+----+
|   37.5|  Tomato Big(Nepali)|2013-06-16|   40.0|   35.0|  0|  Kg|
|   29.0| Tomato Small(Local)|2013-06-16|   32.0|   26.0|  1|  Kg|
|   20.5|          Potato Red|2013-06-16|   21.0|   20.0|  2|  Kg|
|   15.5|        Potato White|2013-06-16|   16.0|   15.0|  3|  Kg|
|   29.0|  Onion Dry (Indian)|2013-06-16|   30.0|   28.0|  4|  Kg|
|   32.5|       Carrot(Local)|2013-06-16|   35.0|   30.0|  5|  Kg|
|    8.0|      Cabbage(Local)|2013-06-16|   10.0|    6.0|  6|  Kg|
|   32.5|         Cauli Local|2013-06-16|   35.0|   30.0|  7|  Kg|
|   37.5|         Raddish Red|2013-06-16|   40.0|   35.0|  8|  Kg|
|   27.5|Raddish White(Local)|2013-06-16|   30.0|   25.0|  9|  Kg|
|   17.0|        Brinjal Long|2013-06-16|   18.0|   16.0| 10|  Kg|
|   21.0|       Brinjal Round|2013-06-16|   22.0|   20.0| 11| 

23/09/04 12:56:31 WARN TaskSetManager: Stage 118 contains a task of very large size (5701 KiB). The maximum recommended task size is 1000 KiB.
