--------------------------------------------------Read a file in spark--------------------------------------------------

In [0]:
flight_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferschema", "false")\
    .option("mode", "PERMISSIVE")\
    .load("/FileStore/tables/flight_csv.csv")

flight_df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

--------------------------------------------------Retrieve name of the month and day of the week in spark--------------------------------------------------

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
employee_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferschema", "false")\
    .option("mode", "PERMISSIVE")\
    .load("/FileStore/tables/employee-1.csv")

employee_df.show()

+---+---------------+---+-----------+---------+
| id|  employee_name|age|joined_date| location|
+---+---------------+---+-----------+---------+
|101|   Aarav Sharma| 28| 2021-06-15|    Delhi|
|102|     Meera Iyer| 32| 2019-09-23|Bangalore|
|103|    Rahul Singh| 26| 2022-01-10|   Mumbai|
|104|   Anjali Verma| 35| 2018-03-05|Hyderabad|
|105|Siddharth Mehta| 29| 2020-11-17|     Pune|
|106|    Neha Kapoor| 31| 2017-12-01|    Delhi|
|107|     Varun Nair| 27| 2023-02-20|  Chennai|
|108|Ishita Malhotra| 30| 2021-04-11|Bangalore|
|109|    Kunal Desai| 34| 2016-08-09|   Mumbai|
|110|    Divya Reddy| 25| 2023-07-01|Hyderabad|
+---+---------------+---+-----------+---------+



In [0]:
employee_df.withColumn('month', month(employee_df['joined_date'])).show()

+---+---------------+---+-----------+---------+-----+
| id|  employee_name|age|joined_date| location|month|
+---+---------------+---+-----------+---------+-----+
|101|   Aarav Sharma| 28| 2021-06-15|    Delhi|    6|
|102|     Meera Iyer| 32| 2019-09-23|Bangalore|    9|
|103|    Rahul Singh| 26| 2022-01-10|   Mumbai|    1|
|104|   Anjali Verma| 35| 2018-03-05|Hyderabad|    3|
|105|Siddharth Mehta| 29| 2020-11-17|     Pune|   11|
|106|    Neha Kapoor| 31| 2017-12-01|    Delhi|   12|
|107|     Varun Nair| 27| 2023-02-20|  Chennai|    2|
|108|Ishita Malhotra| 30| 2021-04-11|Bangalore|    4|
|109|    Kunal Desai| 34| 2016-08-09|   Mumbai|    8|
|110|    Divya Reddy| 25| 2023-07-01|Hyderabad|    7|
+---+---------------+---+-----------+---------+-----+



In [0]:
employee_df.withColumn('month_name', date_format('joined_date', 'MMMM')).show()

+---+---------------+---+-----------+---------+----------+
| id|  employee_name|age|joined_date| location|month_name|
+---+---------------+---+-----------+---------+----------+
|101|   Aarav Sharma| 28| 2021-06-15|    Delhi|      June|
|102|     Meera Iyer| 32| 2019-09-23|Bangalore| September|
|103|    Rahul Singh| 26| 2022-01-10|   Mumbai|   January|
|104|   Anjali Verma| 35| 2018-03-05|Hyderabad|     March|
|105|Siddharth Mehta| 29| 2020-11-17|     Pune|  November|
|106|    Neha Kapoor| 31| 2017-12-01|    Delhi|  December|
|107|     Varun Nair| 27| 2023-02-20|  Chennai|  February|
|108|Ishita Malhotra| 30| 2021-04-11|Bangalore|     April|
|109|    Kunal Desai| 34| 2016-08-09|   Mumbai|    August|
|110|    Divya Reddy| 25| 2023-07-01|Hyderabad|      July|
+---+---------------+---+-----------+---------+----------+



In [0]:
employee_df.withColumn('day_name', date_format('joined_date', 'E')).show()

+---+---------------+---+-----------+---------+--------+
| id|  employee_name|age|joined_date| location|day_name|
+---+---------------+---+-----------+---------+--------+
|101|   Aarav Sharma| 28| 2021-06-15|    Delhi|     Tue|
|102|     Meera Iyer| 32| 2019-09-23|Bangalore|     Mon|
|103|    Rahul Singh| 26| 2022-01-10|   Mumbai|     Mon|
|104|   Anjali Verma| 35| 2018-03-05|Hyderabad|     Mon|
|105|Siddharth Mehta| 29| 2020-11-17|     Pune|     Tue|
|106|    Neha Kapoor| 31| 2017-12-01|    Delhi|     Fri|
|107|     Varun Nair| 27| 2023-02-20|  Chennai|     Mon|
|108|Ishita Malhotra| 30| 2021-04-11|Bangalore|     Sun|
|109|    Kunal Desai| 34| 2016-08-09|   Mumbai|     Tue|
|110|    Divya Reddy| 25| 2023-07-01|Hyderabad|     Sat|
+---+---------------+---+-----------+---------+--------+



--------------------------------------------------Perform explode and sequence operations--------------------------------------------------

In [0]:
restaurant_df = spark.read.format("json")\
    .option("header", "true")\
    .option("inferschema", "false")\
    .option("mode", "PERMISSIVE")\
    .load("/FileStore/tables/resturant_json_data.json")

restaurant_df.show()

+----+-------+--------------------+-------------+-------------+-------------+------+
|code|message|         restaurants|results_found|results_shown|results_start|status|
+----+-------+--------------------+-------------+-------------+-------------+------+
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17066603}, b9...|         6835|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17093124}, b9...|         8680|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17580142}, b9...|          943|           20|            1|  null|
|null|   null|                  []|            0|            0|  

In [0]:
restaurant_df.printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i

In [0]:
restaurant_df.select("*", explode("restaurants").alias("new_restaurants")).drop("restaurants").printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- results_found: long (nullable = true)
 |-- results_shown: long (nullable = true)
 |-- results_start: string (nullable = true)
 |-- status: string (nullable = true)
 |-- new_restaurants: struct (nullable = true)
 |    |-- restaurant: struct (nullable = true)
 |    |    |-- R: struct (nullable = true)
 |    |    |    |-- res_id: long (nullable = true)
 |    |    |-- apikey: string (nullable = true)
 |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |-- cuisines: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- deeplink: string (nullable = true)
 |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- events_url: string (nullable = true)
 |    |    |-- featured_image: string (nullable = true)
 |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |-- has_tab

In [0]:
restaurant_df.select("*", explode("restaurants").alias("new_restaurants")).drop("restaurants")\
    .select("*","new_restaurants.restaurant.R.res_id").show()

+----+-------+-------------+-------------+-------------+------+--------------------+--------+
|code|message|results_found|results_shown|results_start|status|     new_restaurants|  res_id|
+----+-------+-------------+-------------+-------------+------+--------------------+--------+
|null|   null|         6835|           20|            1|  null|{{{17066603}, b90...|17066603|
|null|   null|         6835|           20|            1|  null|{{{17059541}, b90...|17059541|
|null|   null|         6835|           20|            1|  null|{{{17064405}, b90...|17064405|
|null|   null|         6835|           20|            1|  null|{{{17057797}, b90...|17057797|
|null|   null|         6835|           20|            1|  null|{{{17057591}, b90...|17057591|
|null|   null|         6835|           20|            1|  null|{{{17064266}, b90...|17064266|
|null|   null|         6835|           20|            1|  null|{{{17060516}, b90...|17060516|
|null|   null|         6835|           20|            1|  nu

In [0]:
restaurant_df.select("*", explode("restaurants").alias("new_restaurants")).drop("restaurants")\
    .select("*","new_restaurants.restaurant.R.res_id", explode("new_restaurants.restaurant.establishment_types")\
        .alias("new_establishment_type")).printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- results_found: long (nullable = true)
 |-- results_shown: long (nullable = true)
 |-- results_start: string (nullable = true)
 |-- status: string (nullable = true)
 |-- new_restaurants: struct (nullable = true)
 |    |-- restaurant: struct (nullable = true)
 |    |    |-- R: struct (nullable = true)
 |    |    |    |-- res_id: long (nullable = true)
 |    |    |-- apikey: string (nullable = true)
 |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |-- cuisines: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- deeplink: string (nullable = true)
 |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- events_url: string (nullable = true)
 |    |    |-- featured_image: string (nullable = true)
 |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |-- has_tab

In [0]:
restaurant_df.select("*", explode("restaurants").alias("new_restaurants")).drop("restaurants")\
    .select("*","new_restaurants.restaurant.R.res_id", explode("new_restaurants.restaurant.establishment_types")\
        .alias("new_establishment_type"), 'new_restaurants.restaurant.name', 'new_restaurants.restaurant.location'\
            ).drop("new_restaurants").printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- results_found: long (nullable = true)
 |-- results_shown: long (nullable = true)
 |-- results_start: string (nullable = true)
 |-- status: string (nullable = true)
 |-- res_id: long (nullable = true)
 |-- new_establishment_type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- city_id: long (nullable = true)
 |    |-- country_id: long (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- locality: string (nullable = true)
 |    |-- locality_verbose: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- zipcode: string (nullable = true)



In [0]:
restaurant_df.select("*", explode("restaurants").alias("new_restaurants")).drop("restaurants")\
    .select("*","new_restaurants.restaurant.R.res_id", explode_outer("new_restaurants.restaurant.establishment_types")\
        .alias("new_establishment_type"), 'new_restaurants.restaurant.name'\
            ).drop("new_restaurants", "code","message", "results_found", "results_start", "status", "results_shown").show(truncate=False)

+--------+----------------------+------------------------------------+
|res_id  |new_establishment_type|name                                |
+--------+----------------------+------------------------------------+
|17066603|null                  |The Coop                            |
|17059541|null                  |Maggiano's Little Italy             |
|17064405|null                  |Tako Cheena by Pom Pom              |
|17057797|null                  |Bosphorous Turkish Cuisine          |
|17057591|null                  |Bahama Breeze Island Grille         |
|17064266|null                  |Hawkers Asian Street Fare           |
|17060516|null                  |Seasons 52 Fresh Grill              |
|17060320|null                  |Raglan Road Irish Pub and Restaurant|
|17059060|null                  |Hillstone                           |
|17059012|null                  |Hollerbach's Willow Tree Café       |
|17060869|null                  |Texas de Brazil                     |
|17061

In [0]:
restaurant_df.select("*", explode("restaurants").alias("new_restaurants")).drop("restaurants")\
    .select("*","new_restaurants.restaurant.R.res_id", \
                explode_outer("new_restaurants.restaurant.establishment_types").alias("new_establishment_type"),\
                'new_restaurants.restaurant.name', \
                'new_restaurants.restaurant.location',\
                'new_restaurants.restaurant.location.address', \
                'new_restaurants.restaurant.location.city', \
                'new_restaurants.restaurant.location.zipcode',\
                'new_restaurants.restaurant.price_range', \
                'new_restaurants.restaurant.url', \
                'new_restaurants.restaurant.average_cost_for_two',\
                'new_restaurants.restaurant.cuisines', \
                'new_restaurants.restaurant.has_online_delivery', \
                'new_restaurants.restaurant.has_table_booking'
            ).drop("new_restaurants", "code","message", "results_found", "results_start", "status", "results_shown").show(truncate=False)

+--------+----------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------+-------+-------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+---------------------------------+-------------------+-----------------+
|res_id  |new_establishment_type|name                                |location                                                                                                                                                                  |address                                             |city   |zipcode|price_range|url                                                                                                             

In [0]:
df1 = restaurant_df.select("*", explode("restaurants").alias("new_restaurants")).drop("restaurants")

df2 = df1.select("*", 
    explode_outer("new_restaurants.restaurant.establishment_types").alias("new_establishment_type")
)
df3 = df2.withColumn(
    "location",
    struct(
        col("new_restaurants.restaurant.location.address").alias("address"),
        col("new_restaurants.restaurant.location.city").alias("city"),
        col("new_restaurants.restaurant.location.zipcode").alias("zipcode")
    )
)

df4 = df3.select("*",
    explode_outer("new_restaurants.restaurant.offers").alias("offers"),
    'new_restaurants.restaurant.R.res_id',
    'new_restaurants.restaurant.name',
    'new_restaurants.restaurant.price_range',
    'new_restaurants.restaurant.url',
    'new_restaurants.restaurant.average_cost_for_two',
    'new_restaurants.restaurant.cuisines',
    'new_restaurants.restaurant.has_online_delivery',
    'new_restaurants.restaurant.has_table_booking'
).drop("new_restaurants", "code", "message", "results_found", "results_start", "status", "results_shown", "city_id")

df4.show(truncate=False)


+----------------------+----------------------------------------------------------------------+------+--------+------------------------------------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+---------------------------------+-------------------+-----------------+
|new_establishment_type|location                                                              |offers|res_id  |name                                |price_range|url                                                                                                                                                  |average_cost_for_two|cuisines                         |has_online_delivery|has_table_booking|
+----------------------+----------------------------------------------------------------------+------+--------+------------------------------------+-----------+--------------------------------

--------------------------------------------------Apply different conditions on exploded dataframe--------------------------------------------------

In [0]:
df4.printSchema()

root
 |-- new_establishment_type: string (nullable = true)
 |-- location: struct (nullable = false)
 |    |-- address: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- offers: string (nullable = true)
 |-- res_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price_range: long (nullable = true)
 |-- url: string (nullable = true)
 |-- average_cost_for_two: long (nullable = true)
 |-- cuisines: string (nullable = true)
 |-- has_online_delivery: long (nullable = true)
 |-- has_table_booking: long (nullable = true)



In [0]:
df4.select('location').show(truncate=False)

+----------------------------------------------------------------------+
|location                                                              |
+----------------------------------------------------------------------+
|{610 W Morse Boulevard, Winter Park, FL 32789, Orlando, 32789}        |
|{9101 International Drive,Orlando, FL 32819, Orlando, 32819}          |
|{932 North Mills Avenue, Orlando, FL 32803, Orlando, 32803}           |
|{108 S Park Ave, Winter Park, FL 32789, Orlando, 32789}               |
|{8849 International Drive, Orlando, FL 32819, Orlando, 32819}         |
|{1103 N Mills Avenue, Orlando, FL 32803, Orlando, 32803}              |
|{7700 West Sand Lake Road, Orlando, FL 32819, Orlando, 32819}         |
|{1640 E Buena Vista Drive, Lake Buena Vista, FL 32830, Orlando, 32830}|
|{215 South Orlando Avenue, Winter Park, FL 32789, Orlando, 32789}     |
|{205 East 1st Street, Sanford, FL 32771, Orlando, 32771}              |
|{5259 International Drive, Orlando, FL 32819, Orla

In [0]:
df4.filter(df4['location']['city'] == "Orlando").show()


+----------------------+--------------------+------+--------+--------------------+-----------+--------------------+--------------------+--------------------+-------------------+-----------------+
|new_establishment_type|            location|offers|  res_id|                name|price_range|                 url|average_cost_for_two|            cuisines|has_online_delivery|has_table_booking|
+----------------------+--------------------+------+--------+--------------------+-----------+--------------------+--------------------+--------------------+-------------------+-----------------+
|                  null|{610 W Morse Boul...|  null|17066603|            The Coop|          2|https://www.zomat...|                  25|Southern, Cajun, ...|                  0|                0|
|                  null|{9101 Internation...|  null|17059541|Maggiano's Little...|          4|https://www.zomat...|                  50|             Italian|                  0|                0|
|                  n

In [0]:
df4.select("name", "location", "price_range", 'cuisines', 'has_table_booking', 'average_cost_for_two'\
    ).filter(df4['name'] == "Café Tu Tu Tango").show()


+----------------+--------------------+-----------+-------------------+-----------------+--------------------+
|            name|            location|price_range|           cuisines|has_table_booking|average_cost_for_two|
+----------------+--------------------+-----------+-------------------+-----------------+--------------------+
|Café Tu Tu Tango|{8625 Internation...|          3|New American, Tapas|                0|                  40|
+----------------+--------------------+-----------+-------------------+-----------------+--------------------+



--------------------------------------------------Retrieve month and year from date--------------------------------------------------

In [0]:
employee_df.show()

+---+---------------+---+-----------+---------+
| id|  employee_name|age|joined_date| location|
+---+---------------+---+-----------+---------+
|101|   Aarav Sharma| 28| 2021-06-15|    Delhi|
|102|     Meera Iyer| 32| 2019-09-23|Bangalore|
|103|    Rahul Singh| 26| 2022-01-10|   Mumbai|
|104|   Anjali Verma| 35| 2018-03-05|Hyderabad|
|105|Siddharth Mehta| 29| 2020-11-17|     Pune|
|106|    Neha Kapoor| 31| 2017-12-01|    Delhi|
|107|     Varun Nair| 27| 2023-02-20|  Chennai|
|108|Ishita Malhotra| 30| 2021-04-11|Bangalore|
|109|    Kunal Desai| 34| 2016-08-09|   Mumbai|
|110|    Divya Reddy| 25| 2023-07-01|Hyderabad|
+---+---------------+---+-----------+---------+



In [0]:
employee_df.withColumn('year', year(employee_df['joined_date']))\
    .withColumn('month', month(employee_df['joined_date']))\
    .withColumn('date', dayofmonth(employee_df['joined_date'])).show()

+---+---------------+---+-----------+---------+----+-----+----+
| id|  employee_name|age|joined_date| location|year|month|date|
+---+---------------+---+-----------+---------+----+-----+----+
|101|   Aarav Sharma| 28| 2021-06-15|    Delhi|2021|    6|  15|
|102|     Meera Iyer| 32| 2019-09-23|Bangalore|2019|    9|  23|
|103|    Rahul Singh| 26| 2022-01-10|   Mumbai|2022|    1|  10|
|104|   Anjali Verma| 35| 2018-03-05|Hyderabad|2018|    3|   5|
|105|Siddharth Mehta| 29| 2020-11-17|     Pune|2020|   11|  17|
|106|    Neha Kapoor| 31| 2017-12-01|    Delhi|2017|   12|   1|
|107|     Varun Nair| 27| 2023-02-20|  Chennai|2023|    2|  20|
|108|Ishita Malhotra| 30| 2021-04-11|Bangalore|2021|    4|  11|
|109|    Kunal Desai| 34| 2016-08-09|   Mumbai|2016|    8|   9|
|110|    Divya Reddy| 25| 2023-07-01|Hyderabad|2023|    7|   1|
+---+---------------+---+-----------+---------+----+-----+----+



--------------------------------------------------Filter data--------------------------------------------------

In [0]:
employee_df.select("employee_name", "age", "location").filter(employee_df['age'] > 28).show()

+---------------+---+---------+
|  employee_name|age| location|
+---------------+---+---------+
|   Aarav Sharma| 28|    Delhi|
|     Meera Iyer| 32|Bangalore|
|    Rahul Singh| 26|   Mumbai|
|   Anjali Verma| 35|Hyderabad|
|Siddharth Mehta| 29|     Pune|
|    Neha Kapoor| 31|    Delhi|
|     Varun Nair| 27|  Chennai|
|Ishita Malhotra| 30|Bangalore|
|    Kunal Desai| 34|   Mumbai|
+---------------+---+---------+



In [0]:
employee_df.select("employee_name", "age", "location").filter(employee_df['age'] <= 28).show()

+-------------+---+---------+
|employee_name|age| location|
+-------------+---+---------+
| Aarav Sharma| 28|    Delhi|
|  Rahul Singh| 26|   Mumbai|
|   Varun Nair| 27|  Chennai|
|  Divya Reddy| 25|Hyderabad|
+-------------+---+---------+



In [0]:
employee_df.select("employee_name", "age", "location").where(employee_df['location'] == 'Mumbai').show()

+-------------+---+--------+
|employee_name|age|location|
+-------------+---+--------+
|  Rahul Singh| 26|  Mumbai|
|  Kunal Desai| 34|  Mumbai|
+-------------+---+--------+



In [0]:
employee_df.select("employee_name", "age", "location").where(employee_df['location'] != 'Mumbai').show()

+---------------+---+---------+
|  employee_name|age| location|
+---------------+---+---------+
|   Aarav Sharma| 28|    Delhi|
|     Meera Iyer| 32|Bangalore|
|   Anjali Verma| 35|Hyderabad|
|Siddharth Mehta| 29|     Pune|
|    Neha Kapoor| 31|    Delhi|
|     Varun Nair| 27|  Chennai|
|Ishita Malhotra| 30|Bangalore|
|    Divya Reddy| 25|Hyderabad|
+---------------+---+---------+



------------------------Add (n) number of months  and (n) number of days to ficen conditional date-----------------------------------

In [0]:
data = [
    (101, "2025-01-01", 2, 10),
    (102, "2025-02-15", 1, 5),
    (103, "2025-03-20", 0, 0),
    (104, "2025-01-30", -3, -20)
]
schema = ["id", "joined_date", "months_to_add", "days_to_add"]
df = spark.createDataFrame(data=data, schema=schema)

df.withColumn("final_date", date_add(add_months(col("joined_date"), \
    col("months_to_add").cast("int")), \
        col("days_to_add").cast("int"))).show()


+---+-----------+-------------+-----------+----------+
| id|joined_date|months_to_add|days_to_add|final_date|
+---+-----------+-------------+-----------+----------+
|101| 2025-01-01|            2|         10|2025-03-11|
|102| 2025-02-15|            1|          5|2025-03-20|
|103| 2025-03-20|            0|          0|2025-03-20|
|104| 2025-01-30|           -3|        -20|2024-10-10|
+---+-----------+-------------+-----------+----------+



In [0]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- joined_date: string (nullable = true)
 |-- months_to_add: long (nullable = true)
 |-- days_to_add: long (nullable = true)



--------------------------------------------------Access a variable from one notebook to another--------------------------------------------------

In [0]:
employee_df.createOrReplaceTempView("employee_table")

In [0]:
spark.sql("""
        select * from employee_table
        """).show()

+---+---------------+---+-----------+---------+
| id|  employee_name|age|joined_date| location|
+---+---------------+---+-----------+---------+
|101|   Aarav Sharma| 28| 2021-06-15|    Delhi|
|102|     Meera Iyer| 32| 2019-09-23|Bangalore|
|103|    Rahul Singh| 26| 2022-01-10|   Mumbai|
|104|   Anjali Verma| 35| 2018-03-05|Hyderabad|
|105|Siddharth Mehta| 29| 2020-11-17|     Pune|
|106|    Neha Kapoor| 31| 2017-12-01|    Delhi|
|107|     Varun Nair| 27| 2023-02-20|  Chennai|
|108|Ishita Malhotra| 30| 2021-04-11|Bangalore|
|109|    Kunal Desai| 34| 2016-08-09|   Mumbai|
|110|    Divya Reddy| 25| 2023-07-01|Hyderabad|
+---+---------------+---+-----------+---------+



In [0]:
%run /Users/maryvishakha.nadar@neosoftmail.com/access_notebook

Child notebook has run successfully


In [0]:
a = 10
b = 20
sum_result = add(a, b)
print(f"The sum of {a} and {b} is {sum_result}.")

name = "Elon Musk"
greeting_message = greet(name)
print(greeting_message)


The sum of 10 and 20 is 30.
Hello there, Elon Musk!


---------------------------------perform operations using python datetime and timedelta module--------------------------------------

In [0]:
from datetime import datetime, timedelta


In [0]:
stock_df = spark.read.format("csv")\
    .option("header", "true")\
        .option("infrastructure", "false")\
            .load("/FileStore/tables/ETH_1h.csv")

stock_df.show()

+----------------+------+------+------+------+------+-----------+
|            Date|Symbol|  Open|  High|   Low| Close|     Volume|
+----------------+------+------+------+------+------+-----------+
|2020-03-13 08-PM|ETHUSD|129.94|131.82|126.87|128.71| 1940673.93|
|2020-03-13 07-PM|ETHUSD|119.51|132.02| 117.1|129.94| 7579741.09|
|2020-03-13 06-PM|ETHUSD|124.47|124.85| 115.5|119.51| 4898735.81|
|2020-03-13 05-PM|ETHUSD|124.08|127.42|121.63|124.47| 2753450.92|
|2020-03-13 04-PM|ETHUSD|124.85|129.51|120.17|124.08| 4461424.71|
|2020-03-13 03-PM|ETHUSD|128.39| 128.9|116.06|124.85|  7378976.0|
|2020-03-13 02-PM|ETHUSD|134.03| 137.9| 125.5|128.39| 3733916.89|
|2020-03-13 01-PM|ETHUSD|131.35|140.95|128.99|134.03| 9582732.93|
|2020-03-13 12-PM|ETHUSD|128.93| 134.6|126.95|131.35| 3906590.52|
|2020-03-13 11-AM|ETHUSD| 132.6|133.17|126.01|128.93| 3311080.29|
|2020-03-13 10-AM|ETHUSD| 133.8|134.99| 128.9| 132.6| 3483436.48|
|2020-03-13 09-AM|ETHUSD|127.12|136.53|126.62| 133.8| 2767584.44|
|2020-03-1

In [0]:
stock_df.count()

Out[132]: 23674

* **Datetime**

In [0]:
# get todays date and time
print("Todays date and time:",datetime.now())

Todays date and time: 2025-04-30 08:51:27.830849


In [0]:
#get the date and time without timezone
print("Date without timezone:", datetime.today())

Date without timezone: 2025-04-30 08:52:19.185250


In [0]:
# Current Utc datetime
print("Current Utc datetime:", datetime.utcnow())

Current Utc datetime: 2025-04-30 08:53:06.077130


In [0]:
print("COnvert Timestamp to Date time:",datetime.fromtimestamp(1746003340))

COnvert Timestamp to Date time: 2025-04-30 08:55:40


In [0]:
print("COnvert datetime to timestamp:", datetime.timestamp(datetime.now()))

COnvert datetime to timestamp: 1746003410.28577


In [0]:
print("Parse string to datetime:", datetime.strptime("2024-06-15", "%Y-%m-%d"))

Parse string to datetime: 2024-06-15 00:00:00


In [0]:
print("convert datetime to string:", datetime.strftime(datetime.now(), "%Y-%B-%d"))

convert datetime to string: 2025-April-30


In [0]:
print("Get the date only:", datetime.date(datetime.now()))

Get the date only: 2025-04-30


In [0]:
print("Get the time only:",datetime.time(datetime.now()))

Get the time only: 09:05:20.507785


In [0]:
print("COnvert to another timezone", datetime.now().astimezone())

COnvert to another timezone 2025-04-30 09:08:06.753667+00:00


In [0]:
print("Iso calender:", datetime.now().isocalendar())

Iso calender: datetime.IsoCalendarDate(year=2025, week=18, weekday=3)


In [0]:
print("Iso Weekday:", datetime.now().isoweekday())

Iso Weekday: 3


In [0]:
print("Weekday:", datetime.now().weekday())

Weekday: 2


In [0]:
print("string type datetime:", datetime.now().ctime())

string type datetime: Wed Apr 30 09:11:23 2025


* **Timedelta**

In [0]:
# get today's date and time
now = datetime.now()
print("today's date and time:", now)

# Add 10 days
future = now + timedelta(days=10)
print("10 days later:", future)

# Subtract 5 days
past = now - timedelta(days=5)
print("5 days ago:", past)

# Add hour and minutes
updated = now + timedelta(hours=3, minutes=30)
print("After 3.5 hours:", updated)

timedelta = timedelta(hours=3)
print("Total seconds", timedelta.total_seconds())


today's date and time: 2025-04-30 09:16:14.324517
10 days later: 2025-05-10 09:16:14.324517
5 days ago: 2025-04-25 09:16:14.324517
After 3.5 hours: 2025-04-30 12:46:14.324517
Total seconds 10800.0


In [0]:
# Difference between dates

past_date = datetime(2024, 10, 10)
diff = now - past_date
print("Days since 1st october 2025:", diff.days)

Days since 1st october 2025: 202


In [0]:
#Custom date
custom_date = datetime(2025, 5, 15, 14, 30)
print("Custom date:", custom_date)


Custom date: 2025-05-15 14:30:00


In [0]:
# Format date

formatted = now.strftime("%A, %d %B %Y at %I:%M %p")
print("Formatted:", formatted)

Formatted: Wednesday, 30 April 2025 at 09:07 AM


In [0]:
from datetime import time

t = time(14, 30)
print("Timezone Info:", t.tzinfo)

Timezone Info: None


In [0]:
from datetime import time, timezone

t = time(14, 30, tzinfo=timezone.utc)
print("Timezone Info:", t.tzinfo)

Timezone Info: UTC


--------------------------------------------Retrieve Data from api------------------------

In [0]:
pip install requests

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
import requests
import json

In [0]:
response = requests.get(url='https://api.thecatapi.com/v1/images/0XYvRd7oD')
print(response)

<Response [200]>


In [0]:
print(response.json())

{'id': '0XYvRd7oD', 'url': 'https://cdn2.thecatapi.com/images/0XYvRd7oD.jpg', 'breeds': [{'weight': {'imperial': '7  -  10', 'metric': '3 - 5'}, 'id': 'abys', 'name': 'Abyssinian', 'cfa_url': 'http://cfa.org/Breeds/BreedsAB/Abyssinian.aspx', 'vetstreet_url': 'http://www.vetstreet.com/cats/abyssinian', 'vcahospitals_url': 'https://vcahospitals.com/know-your-pet/cat-breeds/abyssinian', 'temperament': 'Active, Energetic, Independent, Intelligent, Gentle', 'origin': 'Egypt', 'country_codes': 'EG', 'country_code': 'EG', 'description': 'The Abyssinian is easy to care for, and a joy to have in your home. They’re affectionate cats and love both people and other animals.', 'life_span': '14 - 15', 'indoor': 0, 'lap': 1, 'alt_names': '', 'adaptability': 5, 'affection_level': 5, 'child_friendly': 3, 'dog_friendly': 4, 'energy_level': 5, 'grooming': 1, 'health_issues': 2, 'intelligence': 5, 'shedding_level': 2, 'social_needs': 5, 'stranger_friendly': 5, 'vocalisation': 1, 'experimental': 0, 'hairle

In [0]:
cat_df = spark.read.format("json")\
    .option("header", "true")\
        .option("infrastructure", "true")\
            .load('/FileStore/tables/cat.json')

cat_df.show()

+--------------------+------+---------+--------------------+-----+
|              breeds|height|       id|                 url|width|
+--------------------+------+---------+--------------------+-----+
|[{5, 5, , http://...|  1445|0XYvRd7oD|https://cdn2.thec...| 1204|
+--------------------+------+---------+--------------------+-----+



In [0]:
cat_df.printSchema()

root
 |-- breeds: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- adaptability: long (nullable = true)
 |    |    |-- affection_level: long (nullable = true)
 |    |    |-- alt_names: string (nullable = true)
 |    |    |-- cfa_url: string (nullable = true)
 |    |    |-- child_friendly: long (nullable = true)
 |    |    |-- country_code: string (nullable = true)
 |    |    |-- country_codes: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- dog_friendly: long (nullable = true)
 |    |    |-- energy_level: long (nullable = true)
 |    |    |-- experimental: long (nullable = true)
 |    |    |-- grooming: long (nullable = true)
 |    |    |-- hairless: long (nullable = true)
 |    |    |-- health_issues: long (nullable = true)
 |    |    |-- hypoallergenic: long (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- indoor: long (nullable = true)
 |    |    |-- intelligence: long (nul

In [0]:
cat_df.select("*", explode("breeds").alias("breed")).drop("breeds").printSchema()

root
 |-- height: long (nullable = true)
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- width: long (nullable = true)
 |-- breed: struct (nullable = true)
 |    |-- adaptability: long (nullable = true)
 |    |-- affection_level: long (nullable = true)
 |    |-- alt_names: string (nullable = true)
 |    |-- cfa_url: string (nullable = true)
 |    |-- child_friendly: long (nullable = true)
 |    |-- country_code: string (nullable = true)
 |    |-- country_codes: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- dog_friendly: long (nullable = true)
 |    |-- energy_level: long (nullable = true)
 |    |-- experimental: long (nullable = true)
 |    |-- grooming: long (nullable = true)
 |    |-- hairless: long (nullable = true)
 |    |-- health_issues: long (nullable = true)
 |    |-- hypoallergenic: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- indoor: long (nullable = true)
 |    |-- intelligence: long (n

In [0]:
cat_df.select("*", explode("breeds").alias("breed"),\
    'breed.description', 'breed.health_issues', 'breed.child_friendly', 'breed.alt_names'\
    ).drop("breeds").show()

+------+---------+--------------------+-----+--------------------+--------------------+-------------+--------------+---------+
|height|       id|                 url|width|               breed|         description|health_issues|child_friendly|alt_names|
+------+---------+--------------------+-----+--------------------+--------------------+-------------+--------------+---------+
|  1445|0XYvRd7oD|https://cdn2.thec...| 1204|{5, 5, , http://c...|The Abyssinian is...|            2|             3|         |
+------+---------+--------------------+-----+--------------------+--------------------+-------------+--------------+---------+



-----------------------------------------------Perform all the join operations---------------------------------------------------

In [0]:
user_df = spark.read.format("csv")\
    .option("header", "true")\
        .option("infrastructure", "true")\
            .load('/FileStore/tables/Users-1.csv')

user_df.show()


+---+----------+------------+--------------------+
| id|first_name|   last_name|               email|
+---+----------+------------+--------------------+
|  1|      Cari|    Lafflina|cari.lafflina@hot...|
|  2|    Kalina|  Plackstone|kalina.plackstone...|
|  3|     Reena|      Worham|reena.worham@cox.net|
|  4|    Rockie|      Gamble|rockie.gamble@gma...|
|  5|    Granny|    Lafflina|granny.lafflina@h...|
|  6|    Jammie|      Pariso|jammie.pariso@aol...|
|  7| Clementia|     Lipyeat|clementia.lipyeat...|
|  8|    Hadria|Birdwhistell|hadria.birdwhiste...|
|  9|    Lovell|       Doyle|lovell.doyle@gmai...|
| 10|     Tripp|     Stockau|tripp.stockau@hot...|
| 11|   Brandtr|    Habeshaw|brandtr.habeshaw@...|
| 12|     Rozet|     Baughen|rozet.baughen@wan...|
| 13|   Eveline|      Yukhin|eveline.yukhin@ao...|
| 14|     Letty|     Laborde|letty.laborde@yah...|
| 15|    Audrie|      Menpes|audrie.menpes@yah...|
| 16|   Lilllie|       Rozet|lilllie.rozet@gma...|
| 17|   Janelle|      Farrin|ja

In [0]:
product_df = spark.read.format("csv")\
    .option("header", "true")\
        .option("infrastructure", "true")\
            .load('/FileStore/tables/Products.csv')

product_df.show()


+---+--------------------+-----+-----------+
| id|                name|price|category_id|
+---+--------------------+-----+-----------+
|  1|      Wireless Mouse|  499|          5|
|  2|Bluetooth Headphones|  188|          8|
|  3|    Smartphone Stand|  498|          6|
|  4|     Fitness Tracker|   54|         16|
|  5|       LED Desk Lamp|  226|          2|
|  6| Mechanical Keyboard| 3599|         18|
|  7|        Portable SSD|  150|         17|
|  8|          Smartwatch| 4599|         11|
|  9|        Coffee Maker|  999|          3|
| 10|            Yoga Mat|  442|         12|
| 11|     Electric Kettle|  236|         20|
| 12|     Gaming Mousepad|  439|         10|
| 13|        Laptop Stand|  420|         14|
| 14|           Air Fryer| 1000|          4|
| 15|    Wireless Charger|  978|         19|
| 16|          Multi-Tool|  475|          7|
| 17|            Backpack|  324|          1|
| 18|     Scented Candles|  536|         13|
| 19|     Pressure Cooker|  589|         15|
| 20|     

In [0]:
categories_df = spark.read.format("csv")\
    .option("header", "true")\
        .option("infrastructure", "true")\
            .load('/FileStore/tables/Categories.csv')

categories_df.show()


+---+-------------------+
| id|               name|
+---+-------------------+
|  1|        Electronics|
|  2|           Clothing|
|  3|          Groceries|
|  4|         Home Goods|
|  5|    Beauty Products|
|  6|               Toys|
|  7|   Sports Equipment|
|  8|              Books|
|  9|            Jewelry|
| 10|       Pet Supplies|
| 11|         Automotive|
| 12|Health and Wellness|
| 13|           Footwear|
| 14|    Office Supplies|
| 15| Garden and Outdoor|
| 16|      Baby Products|
| 17|  Music Instruments|
| 18|       Art Supplies|
| 19| Travel Accessories|
| 20|      Personal Care|
+---+-------------------+



In [0]:
order_df = spark.read.format("csv")\
    .option("header", "true")\
        .option("infrastructure", "true")\
            .load('/FileStore/tables/Orders.csv')

order_df.show()


+-------+---+----------+--------+----------+
|user_id| id|product_id|quantity|order_date|
+-------+---+----------+--------+----------+
|     16| 92|       100|       4|2024-06-21|
|      2| 82|        88|       9|2025-03-12|
|     50| 72|        35|      50|2024-12-30|
|     46| 65|        38|      17|2024-12-28|
|      4| 60|        69|       6|2024-05-19|
|      5| 47|        15|       5|2024-11-10|
|      1| 15|        71|       9|2024-09-05|
|     47| 68|         3|       8|2025-03-24|
|     21| 63|        74|       8|2024-10-19|
|     37| 59|        14|       8|2024-07-25|
|     20| 38|        76|      14|2025-02-20|
|     27| 35|         9|       8|2025-01-16|
|     29| 24|        16|       8|2024-05-11|
|     22| 23|        78|       8|2024-05-07|
|     36| 12|        79|       8|2025-01-08|
|     18| 95|        84|      27|2024-10-02|
|      9| 94|        99|       6|2024-11-08|
|     34| 90|        25|       7|2024-10-09|
|     22| 81|        39|       7|2024-10-25|
|     20| 

* **Inner Join**

In [0]:
# Get all users who have placed at least one order
user_df.join(order_df, on="id", how="inner").show()


+---+----------+---------+--------------------+-------+----------+--------+----------+
| id|first_name|last_name|               email|user_id|product_id|quantity|order_date|
+---+----------+---------+--------------------+-------+----------+--------+----------+
| 47|      Livy|Yakunikov|livy.yakunikov@ya...|      5|        15|       5|2024-11-10|
| 15|    Audrie|   Menpes|audrie.menpes@yah...|      1|        71|       9|2024-09-05|
| 38|    Vonnie|  Pawelke|vonnie.pawelke@gm...|     20|        76|      14|2025-02-20|
| 35|       Kim|   Linney|kim.linney@facebo...|     27|         9|       8|2025-01-16|
| 24|     Trudy| Dodworth|trudy.dodworth@gm...|     29|        16|       8|2024-05-11|
| 23|    Homere|  Calcott|homere.calcott@ho...|     22|        78|       8|2024-05-07|
| 12|     Rozet|  Baughen|rozet.baughen@wan...|     36|        79|       8|2025-01-08|
| 44|  Meredith|   Astill|meredith.astill@w...|     21|        27|       7|2024-05-25|
| 39|     Riane|    Rozet|riane.cossem@oran

In [0]:
# List products that have a valid category assigned
product_df.join(categories_df, on="id", how="inner").show()


+---+--------------------+-----+-----------+-------------------+
| id|                name|price|category_id|               name|
+---+--------------------+-----+-----------+-------------------+
|  1|      Wireless Mouse|  499|          5|        Electronics|
|  2|Bluetooth Headphones|  188|          8|           Clothing|
|  3|    Smartphone Stand|  498|          6|          Groceries|
|  4|     Fitness Tracker|   54|         16|         Home Goods|
|  5|       LED Desk Lamp|  226|          2|    Beauty Products|
|  6| Mechanical Keyboard| 3599|         18|               Toys|
|  7|        Portable SSD|  150|         17|   Sports Equipment|
|  8|          Smartwatch| 4599|         11|              Books|
|  9|        Coffee Maker|  999|          3|            Jewelry|
| 10|            Yoga Mat|  442|         12|       Pet Supplies|
| 11|     Electric Kettle|  236|         20|         Automotive|
| 12|     Gaming Mousepad|  439|         10|Health and Wellness|
| 13|        Laptop Stand

* **Left Join**

In [0]:
# List all users and their orders
user_df.join(order_df, on="id", how="left").show()


+---+----------+------------+--------------------+-------+----------+--------+----------+
| id|first_name|   last_name|               email|user_id|product_id|quantity|order_date|
+---+----------+------------+--------------------+-------+----------+--------+----------+
|  1|      Cari|    Lafflina|cari.lafflina@hot...|     42|        37|       3|2024-06-08|
|  2|    Kalina|  Plackstone|kalina.plackstone...|     49|        68|       4|2025-02-20|
|  3|     Reena|      Worham|reena.worham@cox.net|     13|        54|      12|2024-07-12|
|  4|    Rockie|      Gamble|rockie.gamble@gma...|     43|        43|      34|2024-10-09|
|  5|    Granny|    Lafflina|granny.lafflina@h...|      7|        28|      14|2024-05-17|
|  6|    Jammie|      Pariso|jammie.pariso@aol...|     44|        81|       6|2025-04-06|
|  7| Clementia|     Lipyeat|clementia.lipyeat...|      4|        44|       2|2024-10-15|
|  8|    Hadria|Birdwhistell|hadria.birdwhiste...|     18|        20|       4|2025-02-10|
|  9|    L

In [0]:
# Show all products with category details (even uncategorized ones)

product_df.join(categories_df, on="id", how="left").show()


+---+--------------------+-----+-----------+-------------------+
| id|                name|price|category_id|               name|
+---+--------------------+-----+-----------+-------------------+
|  1|      Wireless Mouse|  499|          5|        Electronics|
|  2|Bluetooth Headphones|  188|          8|           Clothing|
|  3|    Smartphone Stand|  498|          6|          Groceries|
|  4|     Fitness Tracker|   54|         16|         Home Goods|
|  5|       LED Desk Lamp|  226|          2|    Beauty Products|
|  6| Mechanical Keyboard| 3599|         18|               Toys|
|  7|        Portable SSD|  150|         17|   Sports Equipment|
|  8|          Smartwatch| 4599|         11|              Books|
|  9|        Coffee Maker|  999|          3|            Jewelry|
| 10|            Yoga Mat|  442|         12|       Pet Supplies|
| 11|     Electric Kettle|  236|         20|         Automotive|
| 12|     Gaming Mousepad|  439|         10|Health and Wellness|
| 13|        Laptop Stand

* **Right Join**

In [0]:
# Show all categories and the products under them (even if no product)

product_df.join(categories_df, on="id", how="right").show()


+---+--------------------+-----+-----------+-------------------+
| id|                name|price|category_id|               name|
+---+--------------------+-----+-----------+-------------------+
|  1|      Wireless Mouse|  499|          5|        Electronics|
|  2|Bluetooth Headphones|  188|          8|           Clothing|
|  3|    Smartphone Stand|  498|          6|          Groceries|
|  4|     Fitness Tracker|   54|         16|         Home Goods|
|  5|       LED Desk Lamp|  226|          2|    Beauty Products|
|  6| Mechanical Keyboard| 3599|         18|               Toys|
|  7|        Portable SSD|  150|         17|   Sports Equipment|
|  8|          Smartwatch| 4599|         11|              Books|
|  9|        Coffee Maker|  999|          3|            Jewelry|
| 10|            Yoga Mat|  442|         12|       Pet Supplies|
| 11|     Electric Kettle|  236|         20|         Automotive|
| 12|     Gaming Mousepad|  439|         10|Health and Wellness|
| 13|        Laptop Stand

In [0]:
#Get a master list of users and orders, even if user/order is missing

user_df.join(order_df, on="id", how="outer").show()


+---+----------+----------+--------------------+-------+----------+--------+----------+
| id|first_name| last_name|               email|user_id|product_id|quantity|order_date|
+---+----------+----------+--------------------+-------+----------+--------+----------+
|  1|      Cari|  Lafflina|cari.lafflina@hot...|     42|        37|       3|2024-06-08|
| 10|     Tripp|   Stockau|tripp.stockau@hot...|     23|        82|       6|2025-01-31|
|100|      null|      null|                null|     28|        92|       1|2024-04-28|
| 11|   Brandtr|  Habeshaw|brandtr.habeshaw@...|     39|        10|       5|2024-12-10|
| 12|     Rozet|   Baughen|rozet.baughen@wan...|     36|        79|       8|2025-01-08|
| 13|   Eveline|    Yukhin|eveline.yukhin@ao...|     35|        85|       3|2024-07-16|
| 14|     Letty|   Laborde|letty.laborde@yah...|     28|        96|       2|2024-06-25|
| 15|    Audrie|    Menpes|audrie.menpes@yah...|      1|        71|       9|2024-09-05|
| 16|   Lilllie|     Rozet|lilll

* **Left Anti Join**

In [0]:
#Find users who have NEVER placed an order

user_df.join(order_df, on="id", how="left_anti").show()


+---+----------+---------+-----+
| id|first_name|last_name|email|
+---+----------+---------+-----+
+---+----------+---------+-----+



* **Cross Join**

In [0]:
# Generate all possible combinations of categories and products

categories_df.crossJoin(product_df).show()


+---+-------------------+---+--------------+-----+-----------+
| id|               name| id|          name|price|category_id|
+---+-------------------+---+--------------+-----+-----------+
|  1|        Electronics|  1|Wireless Mouse|  499|          5|
|  2|           Clothing|  1|Wireless Mouse|  499|          5|
|  3|          Groceries|  1|Wireless Mouse|  499|          5|
|  4|         Home Goods|  1|Wireless Mouse|  499|          5|
|  5|    Beauty Products|  1|Wireless Mouse|  499|          5|
|  6|               Toys|  1|Wireless Mouse|  499|          5|
|  7|   Sports Equipment|  1|Wireless Mouse|  499|          5|
|  8|              Books|  1|Wireless Mouse|  499|          5|
|  9|            Jewelry|  1|Wireless Mouse|  499|          5|
| 10|       Pet Supplies|  1|Wireless Mouse|  499|          5|
| 11|         Automotive|  1|Wireless Mouse|  499|          5|
| 12|Health and Wellness|  1|Wireless Mouse|  499|          5|
| 13|           Footwear|  1|Wireless Mouse|  499|     

------------------------------------------------------ Retrieve current month name---------------------------------------

In [0]:
from datetime import datetime

current_month_name = datetime.now().strftime("%B")
print("Current month name:", current_month_name)

Current month name: April


------------------------------------------- Use partioning for big files-----------------------------------------------

In [0]:
emp_df = spark.read.format("csv")\
    .option("header", "true")\
        .option("infrastructure", "false")\
            .load("/FileStore/tables/employee_write-1.csv")

emp_df.show()

+---+--------+---+------+-------+------+
| id|    name|age|salary|address|gender|
+---+--------+---+------+-------+------+
|  1|  Manish| 26| 75000|  INDIA|     m|
|  2|  Nikita| 23|100000|    USA|     f|
|  3|  Pritam| 22|150000|  INDIA|     m|
|  4|Prantosh| 17|200000|  JAPAN|     m|
|  5|  Vikash| 31|300000|    USA|     m|
|  6|   Rahul| 55|300000|  INDIA|     m|
|  7|    Raju| 67|540000|    USA|     m|
|  8| Praveen| 28| 70000|  JAPAN|     m|
|  9|     Dev| 32|150000|  JAPAN|     m|
| 10|  Sherin| 16| 25000| RUSSIA|     f|
| 11|    Ragu| 12| 35000|  INDIA|     f|
| 12|   Sweta| 43|200000|  INDIA|     f|
| 13| Raushan| 48|650000|    USA|     m|
| 14|  Mukesh| 36| 95000| RUSSIA|     m|
| 15| Prakash| 52|750000|  INDIA|     m|
+---+--------+---+------+-------+------+



In [0]:
emp_df.write.format("csv")\
    .option("header", "true")\
        .option("mode", "overwrite")\
            .option("path","/FileStore/tables/emp_address_partitions/")\
                .partitionBy("address")\
                .save()


In [0]:
dbutils.fs.ls("/FileStore/tables/emp_address_partitions/")

Out[56]: [FileInfo(path='dbfs:/FileStore/tables/emp_address_partitions/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1746011143000),
 FileInfo(path='dbfs:/FileStore/tables/emp_address_partitions/address=INDIA/', name='address=INDIA/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/emp_address_partitions/address=JAPAN/', name='address=JAPAN/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/emp_address_partitions/address=RUSSIA/', name='address=RUSSIA/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/emp_address_partitions/address=USA/', name='address=USA/', size=0, modificationTime=0)]

In [0]:
marvel_character_df = spark.read.format("csv")\
    .option("header", "true")\
        .option("infrastructure", "false")\
            .option("mode", "PERMISSIVE")\
                .load("/FileStore/tables/marvel_characters_dataset.csv")

marvel_character_df.show()

+---------------+-----------------+--------------------+--------------------+--------+-----------+
|      Character|        Real Name|         Affiliation|              Powers|    Role|Power Level|
+---------------+-----------------+--------------------+--------------------+--------+-----------+
|       iron man|       Tony Stark|            Avengers|Powered Armor, Ge...|    Hero|        Low|
|captain america|     Steve Rogers|            Avengers|Super Soldier, En...|    Hero|        Low|
|           thor|     Thor Odinson|            Avengers|God of Thunder, W...|    Hero|        Low|
|    black widow| Natasha Romanoff|            Avengers|Superhuman streng...|    Hero|        Low|
|           hulk|     Bruce Banner|            Avengers|Superhuman streng...|    Hero|        Low|
|     spider-man|     Peter Parker|            Avengers|Spider sense, Wal...|    Hero|        Low|
|      wolverine|    James Howlett|               X-Men|Regeneration, Ada...|    Hero|        Low|
|       de

In [0]:
marvel_character_df.write.format("csv")\
    .option("header", "true")\
        .option("mode", "overwrite")\
            .option("path","/FileStore/tables/marvel_Affiliation_partition/")\
                .partitionBy("Affiliation")\
                .save()


In [0]:
dbutils.fs.ls("/FileStore/tables/marvel_Affiliation_partition/")

Out[59]: [FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Avengers/', name='Affiliation=Avengers/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Defenders/', name='Affiliation=Defenders/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Fantastic Four/', name='Affiliation=Fantastic Four/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Guardians of the Galaxy/', name='Affiliation=Guardians of the Galaxy/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Hydra/', name='Affiliation=Hydra/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Inhumans/', name='Affiliation=Inhumans/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_A

In [0]:
dbutils.fs.ls("dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Avengers/")

Out[60]: [FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Avengers/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1746012316000),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Avengers/_committed_6933422137141533648', name='_committed_6933422137141533648', size=113, modificationTime=1746012316000),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Avengers/_started_6933422137141533648', name='_started_6933422137141533648', size=0, modificationTime=1746012314000),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_partition/Affiliation=Avengers/part-00000-tid-6933422137141533648-09446d60-b788-41fd-9b47-aa7724c99ee8-136-1.c000.csv', name='part-00000-tid-6933422137141533648-09446d60-b788-41fd-9b47-aa7724c99ee8-136-1.c000.csv', size=1577, modificationTime=1746012314000)]

In [0]:
marvel_character_df.write.format("csv")\
    .option("header", "true")\
        .option("mode", "overwrite")\
            .option("path","/FileStore/tables/marvel_Affiliation_Powers_partition/")\
                .partitionBy("Affiliation", "Powers")\
                .save()


In [0]:
dbutils.fs.ls("/FileStore/tables/marvel_Affiliation_Powers_partition/")

Out[62]: [FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Avengers/', name='Affiliation=Avengers/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Defenders/', name='Affiliation=Defenders/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Fantastic Four/', name='Affiliation=Fantastic Four/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Guardians of the Galaxy/', name='Affiliation=Guardians of the Galaxy/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Hydra/', name='Affiliation=Hydra/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Inhumans/', name='Affiliation=Inhumans/', size=0, modificationTime=0),
 File

In [0]:
dbutils.fs.ls("dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Avengers/")

Out[63]: [FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Avengers/Powers=Aerial combat, Shield mastery/', name='Powers=Aerial combat, Shield mastery/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Avengers/Powers=Chi manipulation, Martial arts expert/', name='Powers=Chi manipulation, Martial arts expert/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Avengers/Powers=Energy manipulation, Expert marksman/', name='Powers=Energy manipulation, Expert marksman/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Avengers/Powers=Flight, Superhuman strength, Cosmic energy/', name='Powers=Flight, Superhuman strength, Cosmic energy/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/marvel_Affiliation_Powers_partition/Affiliation=Aven

--------------------------------------------From a nested json get the keys-------------------------------------------------

In [0]:
df4.select("location.city", 'location.zipcode').show(truncate=False)

+-------+-------+
|city   |zipcode|
+-------+-------+
|Orlando|32789  |
|Orlando|32819  |
|Orlando|32803  |
|Orlando|32789  |
|Orlando|32819  |
|Orlando|32803  |
|Orlando|32819  |
|Orlando|32830  |
|Orlando|32789  |
|Orlando|32771  |
|Orlando|32819  |
|Orlando|32789  |
|Orlando|32830  |
|Orlando|32819  |
|Orlando|32792  |
|Orlando|32801  |
|Orlando|32789  |
|Orlando|32803  |
|Orlando|34786  |
|Orlando|32830  |
+-------+-------+
only showing top 20 rows



In [0]:
df4.select("name","location.city", 'location.zipcode', 'price_range', 'average_cost_for_two', 'cuisines')\
    .withColumn("is_valid", col("name").isNotNull() &
        col("city").isNotNull() &
        (col("city") != "") &
        col("zipcode").isNotNull() &
        (col("price_range") != "") &
        (col('price_range') < 40) &
        (col("cuisines") != "")
    ).show()


+--------------------+-------+-------+-----------+--------------------+--------------------+--------+
|                name|   city|zipcode|price_range|average_cost_for_two|            cuisines|is_valid|
+--------------------+-------+-------+-----------+--------------------+--------------------+--------+
|            The Coop|Orlando|  32789|          2|                  25|Southern, Cajun, ...|    null|
|Maggiano's Little...|Orlando|  32819|          4|                  50|             Italian|    null|
|Tako Cheena by Po...|Orlando|  32803|          1|                  10|Asian, Latin Amer...|    null|
|Bosphorous Turkis...|Orlando|  32789|          3|                  40|Mediterranean, Tu...|    null|
|Bahama Breeze Isl...|Orlando|  32819|          3|                  45|           Caribbean|    null|
|Hawkers Asian Str...|Orlando|  32803|          3|                  35|         Asian, Thai|    null|
|Seasons 52 Fresh ...|Orlando|  32819|          4|                  60|           

-------------------------------------- Use structype and structfield to define schema of a spark dataframe ------------------------------------

In [0]:
employee_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("nominee", StringType(), True),
    StructField("_corrupt_record", StringType(), True),
])

employee_schema_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferschema", "false")\
    .option("mode", "PERMISSIVE")\
    .schema(employee_schema)\
    .load("/FileStore/tables/employee.csv")
    
employee_schema_df.show(truncate=False)

+---+--------+---+------+------------+--------+-------------------------------------------+
|id |name    |age|salary|address     |nominee |_corrupt_record                            |
+---+--------+---+------+------------+--------+-------------------------------------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null                                       |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null                                       |
|3  |Pritam  |22 |150000|Bangalore   |India   |3,Pritam,22,150000,Bangalore,India,nominee3|
|4  |Prantosh|17 |200000|Kolkata     |India   |4,Prantosh,17,200000,Kolkata,India,nominee4|
|5  |Vikash  |31 |300000|null        |nominee5|null                                       |
+---+--------+---+------+------------+--------+-------------------------------------------+



------------------------------------------- Windows function for different cases -----------------------------------------

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *


In [0]:
emp_d = [(1,'manish',50000,'IT','m'),
(2,'vikash',60000,'sales','m'),
(3,'raushan',70000,'marketing','m'),
(4,'mukesh',80000,'IT','m'),
(5,'priti',90000,'sales','f'),
(6,'nikita',45000,'marketing','f'),
(7,'ragini',55000,'marketing','f'),
(8,'rashi',100000,'IT','f'),
(9,'aditya',65000,'IT','m'),
(10,'rahul',50000,'marketing','m'),
(11,'rakhi',50000,'IT','f'),
(12,'akhilesh',90000,'sales','m')]

emp_s = ['id', 'name', 'salary', 'dept', 'gender']

emp_dff = spark.createDataFrame(data=emp_d, schema=emp_s)

emp_dff.show()

+---+--------+------+---------+------+
| id|    name|salary|     dept|gender|
+---+--------+------+---------+------+
|  1|  manish| 50000|       IT|     m|
|  2|  vikash| 60000|    sales|     m|
|  3| raushan| 70000|marketing|     m|
|  4|  mukesh| 80000|       IT|     m|
|  5|   priti| 90000|    sales|     f|
|  6|  nikita| 45000|marketing|     f|
|  7|  ragini| 55000|marketing|     f|
|  8|   rashi|100000|       IT|     f|
|  9|  aditya| 65000|       IT|     m|
| 10|   rahul| 50000|marketing|     m|
| 11|   rakhi| 50000|       IT|     f|
| 12|akhilesh| 90000|    sales|     m|
+---+--------+------+---------+------+



In [0]:
window = Window.partitionBy("dept")
emp_dff = emp_dff.withColumn("total_salary", sum("salary").over(window=window))\
    .withColumn("average_salary", avg("salary").over(window=window))\
        .withColumn("max_salary", max("salary").over(window=window))\
            .withColumn("min_salary", min("salary").over(window=window))

emp_dff.show()

+---+--------+------+---------+------+------------+----------+--------------+----------+----------+
| id|    name|salary|     dept|gender|total_salary|Row_number|average_salary|max_salary|min_salary|
+---+--------+------+---------+------+------------+----------+--------------+----------+----------+
|  1|  manish| 50000|       IT|     m|      345000|         1|       69000.0|    100000|     50000|
| 11|   rakhi| 50000|       IT|     f|      345000|         2|       69000.0|    100000|     50000|
|  9|  aditya| 65000|       IT|     m|      345000|         3|       69000.0|    100000|     50000|
|  4|  mukesh| 80000|       IT|     m|      345000|         4|       69000.0|    100000|     50000|
|  8|   rashi|100000|       IT|     f|      345000|         5|       69000.0|    100000|     50000|
|  6|  nikita| 45000|marketing|     f|      220000|         1|       55000.0|     70000|     45000|
| 10|   rahul| 50000|marketing|     m|      220000|         2|       55000.0|     70000|     45000|


In [0]:
window = Window.partitionBy("dept").orderBy("salary")
emp_dff = emp_dff.withColumn("Row_number", row_number().over(window=window))\
    .withColumn("Rank", rank().over(window=window))\
        .withColumn("Dense_Rank", dense_rank().over(window=window))\
                .withColumn("Percentile_Rank", percent_rank().over(window=window))

emp_dff.show()

+---+--------+------+---------+------+------------+----------+--------------+----------+----------+----+----------+------------------+
| id|    name|salary|     dept|gender|total_salary|Row_number|average_salary|max_salary|min_salary|Rank|Dense_Rank|   Percentile_Rank|
+---+--------+------+---------+------+------------+----------+--------------+----------+----------+----+----------+------------------+
|  1|  manish| 50000|       IT|     m|      345000|         1|       69000.0|    100000|     50000|   1|         1|               0.0|
| 11|   rakhi| 50000|       IT|     f|      345000|         2|       69000.0|    100000|     50000|   1|         1|               0.0|
|  9|  aditya| 65000|       IT|     m|      345000|         3|       69000.0|    100000|     50000|   3|         2|               0.5|
|  4|  mukesh| 80000|       IT|     m|      345000|         4|       69000.0|    100000|     50000|   4|         3|              0.75|
|  8|   rashi|100000|       IT|     f|      345000|    

In [0]:
window = Window.partitionBy("dept").orderBy(desc("salary"))
emp_dff = emp_dff.withColumn("Row_number", row_number().over(window=window))\
    .withColumn("Rank", rank().over(window=window))\
        .withColumn("Dense_Rank", dense_rank().over(window=window))\
                .withColumn("Percentile_Rank", percent_rank().over(window=window))\
                    .filter(emp_dff["Dense_Rank"] <= 2)

emp_dff.show()

+---+--------+------+---------+------+------------+----------+--------------+----------+----------+----+----------+---------------+
| id|    name|salary|     dept|gender|total_salary|Row_number|average_salary|max_salary|min_salary|Rank|Dense_Rank|Percentile_Rank|
+---+--------+------+---------+------+------------+----------+--------------+----------+----------+----+----------+---------------+
|  9|  aditya| 65000|       IT|     m|      345000|         1|       69000.0|    100000|     50000|   1|         1|            0.0|
|  1|  manish| 50000|       IT|     m|      345000|         2|       69000.0|    100000|     50000|   2|         2|            0.5|
| 11|   rakhi| 50000|       IT|     f|      345000|         3|       69000.0|    100000|     50000|   2|         2|            0.5|
| 10|   rahul| 50000|marketing|     m|      220000|         1|       55000.0|     70000|     45000|   1|         1|            0.0|
|  6|  nikita| 45000|marketing|     f|      220000|         2|       55000.0

In [0]:
window = Window.partitionBy("dept").orderBy(desc("salary"))
emp_dff = emp_dff.withColumn("Row_number", row_number().over(window=window))\
    .withColumn("Rank", rank().over(window=window))\
        .withColumn("Dense_Rank", dense_rank().over(window=window))\
                .withColumn("Percentile_Rank", percent_rank().over(window=window))\
                    .filter(emp_dff["Dense_Rank"] <= 2).limit(2)

emp_dff.show()

+---+------+------+----+------+------------+----------+--------------+----------+----------+----+----------+---------------+
| id|  name|salary|dept|gender|total_salary|Row_number|average_salary|max_salary|min_salary|Rank|Dense_Rank|Percentile_Rank|
+---+------+------+----+------+------------+----------+--------------+----------+----------+----+----------+---------------+
|  9|aditya| 65000|  IT|     m|      345000|         1|       69000.0|    100000|     50000|   1|         1|            0.0|
|  1|manish| 50000|  IT|     m|      345000|         2|       69000.0|    100000|     50000|   2|         2|            0.5|
+---+------+------+----+------+------------+----------+--------------+----------+----------+----+----------+---------------+



In [0]:
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]


product_schema = ['product_id', 'product_name', 'sales_date', 'sales']

product_df = spark.createDataFrame(data=product_data, schema=product_schema)

product_df.show()

+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         1|      iphone|01-01-2023|1500000|
|         2|     samsung|01-01-2023|1100000|
|         3|     oneplus|01-01-2023|1100000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [0]:
window = Window.partitionBy("product_id").orderBy(desc("sales_date"))
last_month = product_df.withColumn("lead", lead(col('sales'), 1, "No value").over(window=window))\
    .withColumn("lag", lag(col('sales'), ).over(window=window))

last_month.show()

+----------+------------+----------+-------+-------+-------+
|product_id|product_name|sales_date|  sales|   lead|    lag|
+----------+------------+----------+-------+-------+-------+
|         1|      iphone|01-06-2023|1100000|1200000|   null|
|         1|      iphone|01-05-2023|1200000|1700000|1100000|
|         1|      iphone|01-04-2023|1700000|1600000|1200000|
|         1|      iphone|01-03-2023|1600000|1300000|1700000|
|         1|      iphone|01-02-2023|1300000|1500000|1600000|
|         1|      iphone|01-01-2023|1500000|   null|1300000|
|         2|     samsung|01-06-2023|1100000| 980000|   null|
|         2|     samsung|01-05-2023| 980000|1800000|1100000|
|         2|     samsung|01-04-2023|1800000|1080000| 980000|
|         2|     samsung|01-03-2023|1080000|1120000|1800000|
|         2|     samsung|01-02-2023|1120000|1100000|1080000|
|         2|     samsung|01-01-2023|1100000|   null|1120000|
|         3|     oneplus|01-06-2023|1200000|1175000|   null|
|         3|     oneplus

In [0]:
window = Window.partitionBy("product_id").orderBy("sales_date")
last_month = product_df.withColumn("previous_month_loss", lag(col('sales'), 1, "null").over(window=window))

last_month.show()

+----------+------------+----------+-------+-------------------+
|product_id|product_name|sales_date|  sales|previous_month_loss|
+----------+------------+----------+-------+-------------------+
|         1|      iphone|01-01-2023|1500000|               null|
|         1|      iphone|01-02-2023|1300000|            1500000|
|         1|      iphone|01-03-2023|1600000|            1300000|
|         1|      iphone|01-04-2023|1700000|            1600000|
|         1|      iphone|01-05-2023|1200000|            1700000|
|         1|      iphone|01-06-2023|1100000|            1200000|
|         2|     samsung|01-01-2023|1100000|               null|
|         2|     samsung|01-02-2023|1120000|            1100000|
|         2|     samsung|01-03-2023|1080000|            1120000|
|         2|     samsung|01-04-2023|1800000|            1080000|
|         2|     samsung|01-05-2023| 980000|            1800000|
|         2|     samsung|01-06-2023|1100000|             980000|
|         3|     oneplus|

In [0]:
window = Window.partitionBy("product_id").orderBy("sales_date")
last_month = last_month.withColumn("loss_percentage",  round(((col('sales') - col('previous_month_loss'))/ col('sales'))*100))

last_month.show()

+----------+------------+----------+-------+-------------------+---------------+
|product_id|product_name|sales_date|  sales|previous_month_loss|loss_percentage|
+----------+------------+----------+-------+-------------------+---------------+
|         1|      iphone|01-01-2023|1500000|               null|           null|
|         1|      iphone|01-02-2023|1300000|            1500000|          -15.0|
|         1|      iphone|01-03-2023|1600000|            1300000|           19.0|
|         1|      iphone|01-04-2023|1700000|            1600000|            6.0|
|         1|      iphone|01-05-2023|1200000|            1700000|          -42.0|
|         1|      iphone|01-06-2023|1100000|            1200000|           -9.0|
|         2|     samsung|01-01-2023|1100000|               null|           null|
|         2|     samsung|01-02-2023|1120000|            1100000|            2.0|
|         2|     samsung|01-03-2023|1080000|            1120000|           -4.0|
|         2|     samsung|01-

In [0]:
phone_window = Window.partitionBy("product_name")

monthly_sales_df = product_df.withColumn("Total Sales",sum(col("sales")).over(phone_window))\
 .withColumn("Sales %",round((col("sales")/col("Total Sales"))*100,2))

monthly_sales_df.show()


+----------+------------+----------+-------+-----------+-------+
|product_id|product_name|sales_date|  sales|Total Sales|Sales %|
+----------+------------+----------+-------+-----------+-------+
|         1|      iphone|01-01-2023|1500000|    8400000|  17.86|
|         1|      iphone|01-02-2023|1300000|    8400000|  15.48|
|         1|      iphone|01-03-2023|1600000|    8400000|  19.05|
|         1|      iphone|01-04-2023|1700000|    8400000|  20.24|
|         1|      iphone|01-05-2023|1200000|    8400000|  14.29|
|         1|      iphone|01-06-2023|1100000|    8400000|   13.1|
|         3|     oneplus|01-01-2023|1100000|    6925000|  15.88|
|         3|     oneplus|01-02-2023|1120000|    6925000|  16.17|
|         3|     oneplus|01-03-2023|1160000|    6925000|  16.75|
|         3|     oneplus|01-04-2023|1170000|    6925000|   16.9|
|         3|     oneplus|01-05-2023|1175000|    6925000|  16.97|
|         3|     oneplus|01-06-2023|1200000|    6925000|  17.33|
|         2|     samsung|

In [0]:
window = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
product_df.withColumn("first_sales", first("sales").over(window))\
    .withColumn("last_sales", last("sales").over(window)).show()

+----------+------------+----------+-------+-----------+----------+
|product_id|product_name|sales_date|  sales|first_sales|last_sales|
+----------+------------+----------+-------+-----------+----------+
|         1|      iphone|01-01-2023|1500000|    1500000|   1100000|
|         1|      iphone|01-02-2023|1300000|    1500000|   1100000|
|         1|      iphone|01-03-2023|1600000|    1500000|   1100000|
|         1|      iphone|01-04-2023|1700000|    1500000|   1100000|
|         1|      iphone|01-05-2023|1200000|    1500000|   1100000|
|         1|      iphone|01-06-2023|1100000|    1500000|   1100000|
|         2|     samsung|01-01-2023|1100000|    1100000|   1100000|
|         2|     samsung|01-02-2023|1120000|    1100000|   1100000|
|         2|     samsung|01-03-2023|1080000|    1100000|   1100000|
|         2|     samsung|01-04-2023|1800000|    1100000|   1100000|
|         2|     samsung|01-05-2023| 980000|    1100000|   1100000|
|         2|     samsung|01-06-2023|1100000|    

In [0]:
product_df.withColumn("first_sales", first("sales").over(window))\
    .withColumn("last_sales", last("sales").over(window)).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [product_id#126736L, product_name#126737, sales_date#126738, sales#126739L, first(sales#126739L, false) windowspecdefinition(product_id#126736L, sales_date#126738 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_sales#126950L, last(sales#126739L, false) windowspecdefinition(product_id#126736L, sales_date#126738 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_sales#126957L], [product_id#126736L], [sales_date#126738 ASC NULLS FIRST]
   +- Sort [product_id#126736L ASC NULLS FIRST, sales_date#126738 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(product_id#126736L, 200), ENSURE_REQUIREMENTS, [plan_id=5154]
         +- Scan ExistingRDD[product_id#126736L,product_name#126737,sales_date#126738,sales#126739L]




-------------------------------------------------- Use of group by--------------------------------------------------

In [0]:
product_df.groupBy("product_name").sum("sales").show()

+------------+----------+
|product_name|sum(sales)|
+------------+----------+
|      iphone|   8400000|
|     samsung|   7180000|
|     oneplus|   6925000|
+------------+----------+



In [0]:
product_df.groupBy("product_name").agg(sum("sales").alias("total_sales"), max("sales").alias("highest_sales")).show()

+------------+-----------+-------------+
|product_name|total_sales|highest_sales|
+------------+-----------+-------------+
|      iphone|    8400000|      1700000|
|     samsung|    7180000|      1800000|
|     oneplus|    6925000|      1200000|
+------------+-----------+-------------+



-------------------------------------------------- Using alias, col and more spark functions-------------------------------------------

In [0]:
product_df.select(col("product_id"), col("product_name"), col("sales")).show()

+----------+------------+-------+
|product_id|product_name|  sales|
+----------+------------+-------+
|         1|      iphone|1500000|
|         2|     samsung|1100000|
|         3|     oneplus|1100000|
|         1|      iphone|1300000|
|         2|     samsung|1120000|
|         3|     oneplus|1120000|
|         1|      iphone|1600000|
|         2|     samsung|1080000|
|         3|     oneplus|1160000|
|         1|      iphone|1700000|
|         2|     samsung|1800000|
|         3|     oneplus|1170000|
|         1|      iphone|1200000|
|         2|     samsung| 980000|
|         3|     oneplus|1175000|
|         1|      iphone|1100000|
|         2|     samsung|1100000|
|         3|     oneplus|1200000|
+----------+------------+-------+



In [0]:
product_df.select(col("product_id").alias("Id"), col("product_name").alias("Name of the product"), col("sales").alias("Price")).show()

+---+-------------------+-------+
| Id|Name of the product|  Price|
+---+-------------------+-------+
|  1|             iphone|1500000|
|  2|            samsung|1100000|
|  3|            oneplus|1100000|
|  1|             iphone|1300000|
|  2|            samsung|1120000|
|  3|            oneplus|1120000|
|  1|             iphone|1600000|
|  2|            samsung|1080000|
|  3|            oneplus|1160000|
|  1|             iphone|1700000|
|  2|            samsung|1800000|
|  3|            oneplus|1170000|
|  1|             iphone|1200000|
|  2|            samsung| 980000|
|  3|            oneplus|1175000|
|  1|             iphone|1100000|
|  2|            samsung|1100000|
|  3|            oneplus|1200000|
+---+-------------------+-------+



In [0]:
product_df.select(expr("sales + 5")).show()

+-----------+
|(sales + 5)|
+-----------+
|    1500005|
|    1100005|
|    1100005|
|    1300005|
|    1120005|
|    1120005|
|    1600005|
|    1080005|
|    1160005|
|    1700005|
|    1800005|
|    1170005|
|    1200005|
|     980005|
|    1175005|
|    1100005|
|    1100005|
|    1200005|
+-----------+



In [0]:
product_df.select(expr("product_name as name"),expr("concat(product_id, '  ' ,sales) as Id_Sales")).show()

+-------+----------+
|   name|  Id_Sales|
+-------+----------+
| iphone|1  1500000|
|samsung|2  1100000|
|oneplus|3  1100000|
| iphone|1  1300000|
|samsung|2  1120000|
|oneplus|3  1120000|
| iphone|1  1600000|
|samsung|2  1080000|
|oneplus|3  1160000|
| iphone|1  1700000|
|samsung|2  1800000|
|oneplus|3  1170000|
| iphone|1  1200000|
|samsung| 2  980000|
|oneplus|3  1175000|
| iphone|1  1100000|
|samsung|2  1100000|
|oneplus|3  1200000|
+-------+----------+



------------------------------------------------- Get the percentage difference between 2 years----------------------------------------

In [0]:
data = [
    (2021, "electronics", 85000),
    (2022, "electronics", 100000),
    (2023, "electronics", 120000),
    (2021, "fashion", 45000),
    (2022, "fashion", 60000),
    (2023, "fashion", 75000),
    (2021, "furniture", 30000),
    (2022, "furniture", 35000),
    (2023, "furniture", 40000),
]

schema = ["year", "category", "total_sales"]

stats_df = spark.createDataFrame(data, schema)

stats_df.display()


year,category,total_sales
2021,electronics,85000
2022,electronics,100000
2023,electronics,120000
2021,fashion,45000
2022,fashion,60000
2023,fashion,75000
2021,furniture,30000
2022,furniture,35000
2023,furniture,40000


In [0]:
from pyspark.sql import Window

window = Window.partitionBy("category").orderBy("year")
stat_df = stats_df.withColumn("prev_year_sales", lag("total_sales", 1).over(window))

stat_df.show()


+----+-----------+-----------+---------------+
|year|   category|total_sales|prev_year_sales|
+----+-----------+-----------+---------------+
|2021|electronics|      85000|           null|
|2022|electronics|     100000|          85000|
|2023|electronics|     120000|         100000|
|2021|    fashion|      45000|           null|
|2022|    fashion|      60000|          45000|
|2023|    fashion|      75000|          60000|
|2021|  furniture|      30000|           null|
|2022|  furniture|      35000|          30000|
|2023|  furniture|      40000|          35000|
+----+-----------+-----------+---------------+



In [0]:
sales_per_df = stat_df.withColumn(
    "percentage_diff",
    round(((col("total_sales") - col("prev_year_sales")) / col("prev_year_sales")) * 100, 2)
)

sales_per_df.show()

+----+-----------+-----------+---------------+---------------+
|year|   category|total_sales|prev_year_sales|percentage_diff|
+----+-----------+-----------+---------------+---------------+
|2021|electronics|      85000|           null|           null|
|2022|electronics|     100000|          85000|          17.65|
|2023|electronics|     120000|         100000|           20.0|
|2021|    fashion|      45000|           null|           null|
|2022|    fashion|      60000|          45000|          33.33|
|2023|    fashion|      75000|          60000|           25.0|
|2021|  furniture|      30000|           null|           null|
|2022|  furniture|      35000|          30000|          16.67|
|2023|  furniture|      40000|          35000|          14.29|
+----+-----------+-----------+---------------+---------------+



-------------------------------------------- Use pivot and aggregation----------------------------------------------

In [0]:
stats_df.groupBy("category").pivot("year").agg(sum("total_sales")).show()


+-----------+-----+------+------+
|   category| 2021|  2022|  2023|
+-----------+-----+------+------+
|  furniture|30000| 35000| 40000|
|electronics|85000|100000|120000|
|    fashion|45000| 60000| 75000|
+-----------+-----+------+------+

