## Working with Different Types of Data


### Step 1: Initialize PySpark Session


In [92]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, lit , avg, coalesce , struct,array , explode, create_map
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Create a Spark session
spark = SparkSession.builder.appName("day3").getOrCreate()


### Step 2: Load the Dataset


In [67]:
# Load the Chipotle dataset into a Spark DataFrame
data_path = "./titanic.csv"  # Replace with the actual path
titanic = spark.read.csv(data_path, header=True, inferSchema=True)

data_path = "./titanic.csv"  # Replace with the actual path
titanic2 = spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Chipotle dataset into a Spark DataFrame
data_path = './chipotle (1).csv' # Replace with the actual path
chipotle= spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Chipotle dataset into a Spark DataFrame
data_path = './kalimati_tarkari_dataset.csv' # Replace with the actual path
kalimati = spark.read.csv(data_path, header=True, inferSchema=True)


In [42]:
print(titanic.printSchema(),chipotle.printSchema(),kalimati.printSchema())

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- item_name: string (nullable = true)
 |-- choice_description: string (nullable = true)
 |-- item_price: string (nullable = true)

root
 |-- SN: integer (nullable = true)
 |-- Commodity: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Minimum: double (nullable = true)
 |-- Maximum: double (nullable = true)
 |-- Average: double (nullable = true)

None None N

### Converting to Spark Types:

Question: Load the "titanic" dataset and convert the "Fare" column from double to integer.




In [43]:
titanic = titanic.withColumn("Fare", col("Fare").cast("integer"))

titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: integer (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### Working with Booleans:

Question: Load the "titanic" dataset and add a new column "IsAdult" that indicates whether a passenger is an adult (age >= 18) or not.

In [44]:
titanic = titanic.withColumn("IsAdult", 
                             when(col("Age") >= 18, True)
                             .otherwise(False))

titanic.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|IsAdult|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7| null|       S|   true|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|  C85|       C|   true|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7| null|       S|   true|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|  53| C123|       S|   true|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8| null|       S|   true|
|          6|       0|     3|    Moran, 

### Working with Numbers:

Question: Load the "titanic" dataset and calculate the average age of male and female passengers separately.

In [45]:
# grouping the dataset by gender and calculating the average age for each Sex.

average_age = titanic.groupBy("Sex").agg(avg("Age").alias("AvgAge"))

#Showing result
average_age.show()



+------+------------------+
|   Sex|            AvgAge|
+------+------------------+
|female|27.915708812260537|
|  male| 30.72664459161148|
+------+------------------+



### Working with Strings:

Question: Load the "chipotle" dataset and find the item names containing the word "Chicken."

In [46]:
#creating new dataframe by filtering item_name having chicken in it

chicken_df = chipotle.filter(col("item_name").like("%Chicken%"))

#Showing filtered output
chicken_df.show()

+---+--------+--------+--------------------+--------------------+----------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|
+---+--------+--------+--------------------+--------------------+----------+
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |
|  5|       3|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $10.98 |
| 11|       6|       1|Chicken Crispy Tacos|[Roasted Chili Co...|    $8.75 |
| 12|       6|       1|  Chicken Soft Tacos|[Roasted Chili Co...|    $8.75 |
| 13|       7|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $11.25 |
| 16|       8|       1|     Chicken Burrito|[Tomatillo-Green ...|    $8.49 |
| 17|       9|       1|     Chicken Burrito|[Fresh Tomato Sal...|    $8.49 |
| 19|      10|       1|        Chicken Bowl|[Tomatillo Red Ch...|    $8.75 |
| 23|      12|       1|     Chicken Burrito|[[Tomatillo-Green...|   $10.98 |
| 26|      13|       1|        Chicken Bowl|[Roasted Chili Co...|    $8.49 |

23/08/31 15:53:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/rojesh/Documents/spark-training/day3/chipotle%20(1).csv


### Regular Expressions:

Question: Load the "chipotle" dataset and find the items with names that start with "Ch" followed by any character.



In [47]:
#filtering dataframe to include only those with names starting with "Ch"

chip_df = chipotle.filter(col("item_name").startswith("Ch"))

#showing the filtered items
chip_df.show()

+---+--------+--------+--------------------+--------------------+----------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|
+---+--------+--------+--------------------+--------------------+----------+
|  0|       1|       1|Chips and Fresh T...|                null|    $2.39 |
|  3|       1|       1|Chips and Tomatil...|                null|    $2.39 |
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |
|  5|       3|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $10.98 |
| 10|       5|       1| Chips and Guacamole|                null|    $4.45 |
| 11|       6|       1|Chicken Crispy Tacos|[Roasted Chili Co...|    $8.75 |
| 12|       6|       1|  Chicken Soft Tacos|[Roasted Chili Co...|    $8.75 |
| 13|       7|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $11.25 |
| 14|       7|       1| Chips and Guacamole|                null|    $4.45 |
| 15|       8|       1|Chips and Tomatil...|                null|    $2.39 |

23/08/31 15:53:41 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/rojesh/Documents/spark-training/day3/chipotle%20(1).csv


### Working with Nulls in Data:

Question: Load the "titanic" dataset and count the number of passengers with missing age information.



In [48]:
#counting the number of passengers with null in age

missing_age_count = titanic.filter(col("Age").isNull()).count()


#printing the result
print("Number of passengers with missing age :", missing_age_count)


Number of passengers with missing age : 177


### Coalesce
Question: Utilizing the Chipotle dataset, use the coalesce function to combine the "item_name" and "choice_description" columns into a new column named "OrderDetails." Display the first 5 rows of the resulting DataFrame.

In [49]:
#combining "item_name" and "choice_description" columns into "OrderDetails"

chipotle_df = chip_df.withColumn("OrderDetails", coalesce(col("item_name"), col("choice_description")))

#displaying the first 5 rows of the result

chipotle_df.show(5)


+---+--------+--------+--------------------+--------------------+----------+--------------------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|        OrderDetails|
+---+--------+--------+--------------------+--------------------+----------+--------------------+
|  0|       1|       1|Chips and Fresh T...|                null|    $2.39 |Chips and Fresh T...|
|  3|       1|       1|Chips and Tomatil...|                null|    $2.39 |Chips and Tomatil...|
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |        Chicken Bowl|
|  5|       3|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $10.98 |        Chicken Bowl|
| 10|       5|       1| Chips and Guacamole|                null|    $4.45 | Chips and Guacamole|
+---+--------+--------+--------------------+--------------------+----------+--------------------+
only showing top 5 rows



23/08/31 15:53:44 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/rojesh/Documents/spark-training/day3/chipotle%20(1).csv


### ifnull, nullIf, nvl, and nvl2

Question: Replace the null values in the "Age" column of the Titanic dataset with the average age.

In [64]:
#replacing null values in "Age" column with the average_age

titanic_df = titanic.na.fill(average_age, subset=["Age"])


#showing the resulting dataset

titanic_df.show()



+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex|              Age|SibSp|Parch|          Ticket|Fare|Embarked|IsAdult|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|             22.0|    1|    0|       A/5 21171|   7|       S|   true|
|          2|       1|     1|Cumings, Mrs. Joh...|female|             38.0|    1|    0|        PC 17599|  71|       C|   true|
|          3|       1|     3|Heikkinen, Miss. ...|female|             26.0|    0|    0|STON/O2. 3101282|   7|       S|   true|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|             35.0|    1|    0|          113803|  53|       S|   true|
|          5|       0|     3|Allen, Mr. Willia...|  male|             35.0|    0|    0|          373450|   8|  

### drop

Question: Remove the "Cabin" column from the Titanic dataset.


In [66]:
#dropping Cabin column from titanic dataset
titanic = titanic.drop("Cabin")


#showing the resulting dataset

titanic.show()

+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex|              Age|SibSp|Parch|          Ticket|Fare|Embarked|IsAdult|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|             22.0|    1|    0|       A/5 21171|   7|       S|   true|
|          2|       1|     1|Cumings, Mrs. Joh...|female|             38.0|    1|    0|        PC 17599|  71|       C|   true|
|          3|       1|     3|Heikkinen, Miss. ...|female|             26.0|    0|    0|STON/O2. 3101282|   7|       S|   true|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|             35.0|    1|    0|          113803|  53|       S|   true|
|          5|       0|     3|Allen, Mr. Willia...|  male|             35.0|    0|    0|          373450|   8|  

### fill

Question: Fill the null values in the "Age" column of the Titanic dataset with a default age of 30.

In [68]:
##filling null values in the "Age" column with 30

titanic_df = titanic2.fillna(30, subset=["Age"])

#showing the resulting dataset

titanic_df.show()



+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|30.0|    0|    0|      

###  replace

Question: Replace the gender "male" with "M" and "female" with "F" in the "Sex" column of the Titanic dataset.

In [69]:
#replacing gender values in the Sex column

titanic = titanic.withColumn("Sex", when(col("Sex") == "male", "M").otherwise("F"))


#showing the resulting dataset

titanic.show()



+-----------+--------+------+--------------------+---+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+---+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  M|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|  F|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|  F|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|  F|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  M|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  M|null|    0|    0|          330877| 8.4583| null|  

### 6. Working with Complex Types: Structs

Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "PriceRange" that is a struct containing "Minimum" and "Maximum" prices for each commodity.

In [74]:
#creatinh a new column PriceRange with a struct containing maximum and minimum prices


kalimati_df = kalimati.withColumn("PriceRange", struct(col("Minimum"), col("Maximum")))

#showing the resulting dataset

kalimati_df.show()


+---+--------------------+----------+----+-------+-------+-------+------------+
| SN|           Commodity|      Date|Unit|Minimum|Maximum|Average|  PriceRange|
+---+--------------------+----------+----+-------+-------+-------+------------+
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|{35.0, 40.0}|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|{26.0, 32.0}|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|{20.0, 21.0}|
|  3|        Potato White|2013-06-16|  Kg|   15.0|   16.0|   15.5|{15.0, 16.0}|
|  4|  Onion Dry (Indian)|2013-06-16|  Kg|   28.0|   30.0|   29.0|{28.0, 30.0}|
|  5|       Carrot(Local)|2013-06-16|  Kg|   30.0|   35.0|   32.5|{30.0, 35.0}|
|  6|      Cabbage(Local)|2013-06-16|  Kg|    6.0|   10.0|    8.0| {6.0, 10.0}|
|  7|         Cauli Local|2013-06-16|  Kg|   30.0|   35.0|   32.5|{30.0, 35.0}|
|  8|         Raddish Red|2013-06-16|  Kg|   35.0|   40.0|   37.5|{35.0, 40.0}|
|  9|Raddish White(Local)|2013-06-16|  K

### Working with Complex Types: Arrays
Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "CommodityList" that is an array of all the commodities.


In [79]:
#creating a new column commoditylist with an array of all the commodities

kalimati_df = kalimati_df.withColumn("CommodityList", array(col("Commodity")))

#showing the resulting dataset
kalimati_df.show()

+---+--------------------+----------+----+-------+-------+-------+------------+--------------------+
| SN|           Commodity|      Date|Unit|Minimum|Maximum|Average|  PriceRange|       CommodityList|
+---+--------------------+----------+----+-------+-------+-------+------------+--------------------+
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|{35.0, 40.0}|[Tomato Big(Nepali)]|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|{26.0, 32.0}|[Tomato Small(Loc...|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|{20.0, 21.0}|        [Potato Red]|
|  3|        Potato White|2013-06-16|  Kg|   15.0|   16.0|   15.5|{15.0, 16.0}|      [Potato White]|
|  4|  Onion Dry (Indian)|2013-06-16|  Kg|   28.0|   30.0|   29.0|{28.0, 30.0}|[Onion Dry (Indian)]|
|  5|       Carrot(Local)|2013-06-16|  Kg|   30.0|   35.0|   32.5|{30.0, 35.0}|     [Carrot(Local)]|
|  6|      Cabbage(Local)|2013-06-16|  Kg|    6.0|   10.0|    8.0| {6.0, 10.0}|    [Cabbage

### Working with Complex Types: explode

Question: Explode the "CommodityList" array column from the previous step to generate a new row for each commodity in the list.

In [90]:
#exploded the "CommodityList" column to generate a new row for each commodity

exploded_df = kalimati_df.select(col("SN"), col("Date"), col("Unit"), col("Minimum"), col("Maximum"), col("Average"), explode(col("CommodityList")).alias("Commodity"))

# Show the first 20 rows of the exploded DataFrame
exploded_df.show(20)

+---+----------+----+-------+-------+-------+--------------------+
| SN|      Date|Unit|Minimum|Maximum|Average|           Commodity|
+---+----------+----+-------+-------+-------+--------------------+
|  0|2013-06-16|  Kg|   35.0|   40.0|   37.5|  Tomato Big(Nepali)|
|  1|2013-06-16|  Kg|   26.0|   32.0|   29.0| Tomato Small(Local)|
|  2|2013-06-16|  Kg|   20.0|   21.0|   20.5|          Potato Red|
|  3|2013-06-16|  Kg|   15.0|   16.0|   15.5|        Potato White|
|  4|2013-06-16|  Kg|   28.0|   30.0|   29.0|  Onion Dry (Indian)|
|  5|2013-06-16|  Kg|   30.0|   35.0|   32.5|       Carrot(Local)|
|  6|2013-06-16|  Kg|    6.0|   10.0|    8.0|      Cabbage(Local)|
|  7|2013-06-16|  Kg|   30.0|   35.0|   32.5|         Cauli Local|
|  8|2013-06-16|  Kg|   35.0|   40.0|   37.5|         Raddish Red|
|  9|2013-06-16|  Kg|   25.0|   30.0|   27.5|Raddish White(Local)|
| 10|2013-06-16|  Kg|   16.0|   18.0|   17.0|        Brinjal Long|
| 11|2013-06-16|  Kg|   20.0|   22.0|   21.0|       Brinjal Ro

### Working with Complex Types: Maps

Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "PriceMap" that is a map with "Commodity" as the key and "Average" price as the value.
Answer:

In [96]:
kalimati_df2 = kalimati.withColumn("PriceMap", create_map(col("Commodity"), col("Average")))


kalimati_df2.show(truncate=False)


+---+--------------------+----------+----+-------+-------+-------+------------------------------+
|SN |Commodity           |Date      |Unit|Minimum|Maximum|Average|PriceMap                      |
+---+--------------------+----------+----+-------+-------+-------+------------------------------+
|0  |Tomato Big(Nepali)  |2013-06-16|Kg  |35.0   |40.0   |37.5   |{Tomato Big(Nepali) -> 37.5}  |
|1  |Tomato Small(Local) |2013-06-16|Kg  |26.0   |32.0   |29.0   |{Tomato Small(Local) -> 29.0} |
|2  |Potato Red          |2013-06-16|Kg  |20.0   |21.0   |20.5   |{Potato Red -> 20.5}          |
|3  |Potato White        |2013-06-16|Kg  |15.0   |16.0   |15.5   |{Potato White -> 15.5}        |
|4  |Onion Dry (Indian)  |2013-06-16|Kg  |28.0   |30.0   |29.0   |{Onion Dry (Indian) -> 29.0}  |
|5  |Carrot(Local)       |2013-06-16|Kg  |30.0   |35.0   |32.5   |{Carrot(Local) -> 32.5}       |
|6  |Cabbage(Local)      |2013-06-16|Kg  |6.0    |10.0   |8.0    |{Cabbage(Local) -> 8.0}       |
|7  |Cauli Local    

### Working with JSON

Question: Convert the "kalimati_df" DataFrame to JSON format and write it to a JSON file.

In [100]:
# Define the output JSON file path
output_json_path = "result_kalimati2.json"

# Write the DataFrame to JSON format
kalimati_df.write.json(output_json_path)