In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
spark = SparkSession.builder.appName("pivot example").getOrCreate()
print(spark)

25/03/27 11:36:53 WARN Utils: Your hostname, TTNPL-8203 resolves to a loopback address: 127.0.1.1; using 10.1.243.232 instead (on interface wlp0s20f3)
25/03/27 11:36:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/27 11:36:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/27 11:36:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


<pyspark.sql.session.SparkSession object at 0x7ca08b5007f0>


# **Improved Solution for Customer Balance Calculation**

## **Problem Statement**
We have two datasets:
1. **Transactions**: customer_id, transaction_type (credit/debit), transaction_amount
2. **Current Amounts**: customer_id, current_amount

**Goal**: Calculate each customer's final balance after applying all transactions.

### **Input Tables**

#### **Transactions Table**
+-----------+-----------------+-------------------+
|customer_id|transaction_type |transaction_amount |
+-----------+-----------------+-------------------+
|1          |credit           |30                 |
|1          |debit            |90                 |
|2          |credit           |50                 |
|2          |debit            |90                 |
|3          |debit            |57                 |
+-----------+-----------------+-------------------+

#### **Current Amounts Table**
+-----------+--------------+
|customer_id|current_amount|
+-----------+--------------+
|1          |1000          |
|2          |2000          |
|3          |3000          |
|4          |4000          |
+-----------+--------------+

### **Expected Output**
+-----------+--------------+
|customer_id|final_balance |
+-----------+--------------+
|1          |940           |
|2          |1960          |
|3          |2943          |
|4          |4000          |
+-----------+--------------+
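
The expected output above can be checked by hand before any Spark code runs. The following is a minimal plain-Python sketch, not the notebook's Spark solution: it signs each transaction (debits negative, credits positive), sums per customer, and adds the result to the current amount. The function name `final_balances` is made up for illustration; the rows are copied from the example tables.

```python
# Example rows copied from the Transactions and Current Amounts tables above
transactions = [
    (1, "credit", 30),
    (1, "debit", 90),
    (2, "credit", 50),
    (2, "debit", 90),
    (3, "debit", 57),
]
current_amounts = {1: 1000, 2: 2000, 3: 3000, 4: 4000}


def final_balances(current, txns):
    """Return {customer_id: balance} after applying signed transactions."""
    net = {}
    for customer_id, txn_type, amount in txns:
        # Debits reduce the balance, credits increase it
        signed = -amount if txn_type == "debit" else amount
        net[customer_id] = net.get(customer_id, 0) + signed
    # Customers with no transactions (customer 4 here) get a net change of 0
    return {cid: amt + net.get(cid, 0) for cid, amt in current.items()}


balances = final_balances(current_amounts, transactions)
print(balances)
```

Note the default of 0 for customers without transactions: without it, customer 4 would have no balance at all, which is the same pitfall a null produces after a join in Spark.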


In [6]:
df1 = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/customers_amt.csv",header = True,inferSchema = True)
df2 = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/transactions.csv",header = True,inferSchema = True)

In [7]:
df1.show()
df1.printSchema()

+-----------+--------------+
|customer_id|current_amount|
+-----------+--------------+
|          1|          1000|
|          2|          1500|
|          3|           800|
|          4|          2000|
|          5|          1300|
+-----------+--------------+

root
 |-- customer_id: integer (nullable = true)
 |-- current_amount: integer (nullable = true)



In [9]:
df2.show()
df2.printSchema()

+-----------+----------------+------------------+
|customer_id|transaction_type|transaction_amount|
+-----------+----------------+------------------+
|          1|          credit|               500|
|          1|           debit|               200|
|          2|          credit|              1000|
|          2|           debit|               300|
|          3|          credit|               700|
|          3|           debit|               400|
|          4|          credit|              1200|
|          4|           debit|               500|
|          5|          credit|               900|
|          5|           debit|               300|
+-----------+----------------+------------------+

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)



In [10]:
df3 = df2.withColumn("new_tr_amt",when(col("transaction_type") == "debit",col("transaction_amount")*-1).otherwise(col("transaction_amount")))

In [15]:
df4 = df3.groupBy("customer_id").agg(
    sum("new_tr_amt").alias("new_tr_amt")
)

In [19]:
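# Left join keeps every customer; coalesce treats customers without transactions as a net change of 0
df1.join(df4,"customer_id","left").withColumn("Final_Balance",col("current_amount")+coalesce(col("new_tr_amt"),lit(0))).select("customer_id","Final_Balance").show()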

+-----------+-------------+
|customer_id|Final_Balance|
+-----------+-------------+
|          1|         1300|
|          2|         2200|
|          3|         1100|
|          4|         2700|
|          5|         1900|
+-----------+-------------+



# **Improved Solution for Quarterly Sales Percentage Difference Calculation**

## **Problem Statement**

We have a sales dataset with the following structure:

### **Sales Data Table**
+----------+------+
|   date   |sales |
+----------+------+
|2020-01-15| 1000|
|2020-02-20| 1500|
|2020-03-10| 2000|  # Q1
|2020-04-05| 1200|
|2020-05-12| 1800|
|2020-06-08| 2200|  # Q2
|2021-01-10| 1100|
|2021-02-15| 1600|
|2021-03-20| 2100|  # Q1
|2021-04-25| 1300|
|2021-05-30| 1900|
|2021-06-05| 2300|  # Q2
+----------+------+

**Goal**: Calculate the percentage difference between total sales in Q1 and Q2 for each year.

### **Expected Output**
+----+-----------+-----------+------------------+
|year|q1_sales   |q2_sales   |pct_difference    |
+----+-----------+-----------+------------------+
|2020|4500       |5200       |15.56%            |
|2021|4800       |5500       |14.58%            |
+----+-----------+-----------+------------------+

In [20]:
df = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/question2.csv",header =True,inferSchema=True)

In [21]:
df.printSchema()

root
 |-- date: date (nullable = true)
 |-- sales: integer (nullable = true)



In [22]:
df.show()

+----------+-----+
|      date|sales|
+----------+-----+
|2020-01-15| 1000|
|2020-02-20| 1500|
|2020-03-10| 2000|
|2020-04-05| 1200|
|2020-05-12| 1800|
|2020-06-08| 2200|
|2021-01-10| 1100|
|2021-02-15| 1600|
|2021-03-20| 2100|
|2021-04-25| 1300|
|2021-05-30| 1900|
|2021-06-05| 2300|
+----------+-----+



In [38]:
df1 = df.withColumn("quarter",quarter(col("date"))).withColumn("year",year("date"))

In [60]:
df2 =df1.groupBy("quarter","year").agg(
    sum("sales").alias("sales")
)

In [61]:
df2.show()

+-------+----+-----+
|quarter|year|sales|
+-------+----+-----+
|      1|2021| 4800|
|      2|2021| 5500|
|      1|2020| 4500|
|      2|2020| 5200|
+-------+----+-----+



In [62]:
df2.groupBy("year").pivot("quarter",[1,2]).agg(first(col("sales"))
                                              ).withColumnRenamed("1","q1").withColumnRenamed("2","q2").show()

+----+----+----+
|year|  q1|  q2|
+----+----+----+
|2020|4500|5200|
|2021|4800|5500|
+----+----+----+
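
The pivoted table holds `q1` and `q2` but not the `pct_difference` column from the expected output. The sketch below shows the remaining arithmetic in plain Python. The formula `(q2 - q1) / q1 * 100` is an assumption inferred from the expected values (15.56% and 14.58%); the source does not state it explicitly. The helper name `pct_difference` is illustrative.

```python
# Quarterly totals copied from the pivoted output above: {year: {quarter: sales}}
quarterly_sales = {
    2020: {1: 4500, 2: 5200},
    2021: {1: 4800, 2: 5500},
}


def pct_difference(q1, q2):
    """Percentage change from Q1 to Q2, relative to Q1, rounded to 2 decimals."""
    return round((q2 - q1) / q1 * 100, 2)


report = {
    year: pct_difference(sales[1], sales[2])
    for year, sales in quarterly_sales.items()
}

for year, pct in report.items():
    print(f"{year}: {pct}%")
```

In PySpark the same expression could presumably be attached to the pivoted DataFrame with `withColumn` and `round`, using the `q1` and `q2` columns.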



In [55]:
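# Combined Q1 + Q2 sales per year. show() returns None, so keep the DataFrame and display it separately.
df3 = df1.filter(col("quarter").isin([1,2])).groupBy("year").agg(
    sum("sales").alias("sales")
)
df3.show()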

+----+-----+
|year|sales|
+----+-----+
|2020| 9700|
|2021|10300|
+----+-----+



## Problem Statement
You are given a dataset containing customer journey details with multiple records for each customer. Each record represents a travel leg with a start location and an end location. Your task is to determine the overall start location and end location for each customer’s complete journey.
For each customer:
1. **Start Location**: the location from which their journey began.
2. **End Location**: the final destination of their journey.



#### Example Input Table (customer_journey)


| customer_id | start_location | end_location |
|------------|---------------|-------------|
| C1         | London        | Paris       |
| C1         | Paris        | Berlin      |
| C1         | Berlin       | New Delhi   |
| C2         | Mumbai       | Pune        |
| C2         | Pune         | Hyderabad   |
| C3         | Kochi        | Lucknow     |
| C3         | Lucknow      | Agra        |

### Customer Journey Summary

| customer_id | start_location | end_location |
|------------|---------------|-------------|
| C1         | London        | New Delhi   |
| C2         | Mumbai       | Hyderabad   |
| C3         | Kochi        | Agra        |
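
One way to reason about this problem: the overall start is the only start location that never appears as an end location, and the overall end is the only end location that never appears as a start location. The plain-Python sketch below illustrates that idea with set differences. It assumes each customer's legs form a single non-circular chain; `journey_endpoints` is a hypothetical helper name.

```python
# Legs copied from the example input table above
legs = [
    ("C1", "London", "Paris"),
    ("C1", "Paris", "Berlin"),
    ("C1", "Berlin", "New Delhi"),
    ("C2", "Mumbai", "Pune"),
    ("C2", "Pune", "Hyderabad"),
    ("C3", "Kochi", "Lucknow"),
    ("C3", "Lucknow", "Agra"),
]


def journey_endpoints(rows):
    """Return {customer_id: (start, end)} for simple, non-circular journeys."""
    starts, ends = {}, {}
    for customer_id, start, end in rows:
        starts.setdefault(customer_id, set()).add(start)
        ends.setdefault(customer_id, set()).add(end)

    result = {}
    for customer_id in starts:
        # A place departed from but never arrived at is the journey's origin
        origin = starts[customer_id] - ends[customer_id]
        # A place arrived at but never departed from is the final destination
        final = ends[customer_id] - starts[customer_id]
        # None signals that no unique endpoint exists (e.g. a round trip)
        result[customer_id] = (next(iter(origin), None), next(iter(final), None))
    return result


print(journey_endpoints(legs))
```

Because it relies on set membership rather than row order, this approach does not depend on the legs arriving in travel order.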


In [63]:
df = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/travel.csv",header = True,inferSchema =True )

In [64]:
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- start_location: string (nullable = true)
 |-- end_location: string (nullable = true)



In [65]:
df.show()

+-----------+--------------+------------+
|customer_id|start_location|end_location|
+-----------+--------------+------------+
|         C1|        London|       Paris|
|         C1|         Paris|      Berlin|
|         C1|        Berlin|   New Delhi|
|         C2|        Mumbai|        Pune|
|         C2|          Pune|   Hyderabad|
|         C3|         Kochi|     Lucknow|
|         C3|       Lucknow|        Agra|
+-----------+--------------+------------+



In [73]:
df1 = df.groupBy("customer_id").agg(
    collect_list("start_location").alias("start_loc"),
    collect_list("end_location").alias("end_loc")
)

In [77]:
df1.show()

+-----------+--------------------+--------------------+
|customer_id|           start_loc|             end_loc|
+-----------+--------------------+--------------------+
|         C3|    [Kochi, Lucknow]|     [Lucknow, Agra]|
|         C1|[London, Paris, B...|[Paris, Berlin, N...|
|         C2|      [Mumbai, Pune]|   [Pune, Hyderabad]|
+-----------+--------------------+--------------------+



                                                                                

In [105]:
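@udf(returnType=ArrayType(StringType()))
def destination_router(arr1, arr2):
    # arr1: all start locations, arr2: all end locations for one customer
    if not arr1 or not arr2:  # Handle empty inputs
        return []
    # Default to None so a circular journey does not raise UnboundLocalError
    source = destination = None
    for val in arr1:
        if val not in arr2:  # Departed from but never arrived at: journey start
            source = val
    for val in arr2:
        if val not in arr1:  # Arrived at but never departed from: journey end
            destination = val
    return [source, destination]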

In [106]:
print(destination_router(lit([1,2,3,4]),lit([2,3,4,5])))

Column<'destination_router(array(1, 2, 3, 4), array(2, 3, 4, 5))'>


In [107]:
df2 = df1.withColumn("source-destination",destination_router(col("start_loc"),col("end_loc")))

In [108]:
df3 = df2.withColumn("source",col("source-destination").getItem(0)).withColumn("destination",col("source-destination").getItem(1))

In [109]:
df3.show()

+-----------+--------------------+--------------------+-------------------+------+-----------+
|customer_id|           start_loc|             end_loc| source-destination|source|destination|
+-----------+--------------------+--------------------+-------------------+------+-----------+
|         C3|    [Kochi, Lucknow]|     [Lucknow, Agra]|      [Kochi, Agra]| Kochi|       Agra|
|         C1|[London, Paris, B...|[Paris, Berlin, N...|[London, New Delhi]|London|  New Delhi|
|         C2|      [Mumbai, Pune]|   [Pune, Hyderabad]|[Mumbai, Hyderabad]|Mumbai|  Hyderabad|
+-----------+--------------------+--------------------+-------------------+------+-----------+



In [110]:
df4 = df3.drop("start_loc","end_loc","source-destination")

In [111]:
df4.show()

+-----------+------+-----------+
|customer_id|source|destination|
+-----------+------+-----------+
|         C3| Kochi|       Agra|
|         C1|London|  New Delhi|
|         C2|Mumbai|  Hyderabad|
+-----------+------+-----------+



#### Problem Statement
Given an employee dataset with two columns (name and age), extract the third quarter of the dataset: divide the rows into four equal parts and select only the third part (25% of the rows).

Input Table: Employee Data



+---------+-----+
|  Name   | Age |
+---------+-----+
| Alice   |  25 |
| Bob     |  30 |
| Charlie |  35 |
| David   |  40 |
| Eve     |  45 |
| Frank   |  50 |
| Grace   |  55 |
| Helen   |  60 |
+---------+-----+


In [113]:
df = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/age-name.csv",header = True,inferSchema = True)
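
The notebook stops after loading the file, so here is a minimal plain-Python sketch of the selection itself. It assumes the rows are taken in file order and that quarter boundaries fall at `2n/4` and `3n/4` using integer division; when the row count is not divisible by four, a windowed `ntile(4)` in Spark would split the remainder differently. The name `third_quarter` is illustrative.

```python
# Rows copied from the Employee Data table above
employees = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
    ("David", 40),
    ("Eve", 45),
    ("Frank", 50),
    ("Grace", 55),
    ("Helen", 60),
]


def third_quarter(rows):
    """Return the third of four consecutive, equally sized slices of rows."""
    n = len(rows)
    start = (2 * n) // 4  # first index of the third quarter
    end = (3 * n) // 4    # one past the last index of the third quarter
    return rows[start:end]


selected = third_quarter(employees)
print(selected)
```

With eight rows each quarter holds two employees, so the third quarter is rows five and six.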

## Movie-Watch-List

In [130]:
cus_df = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/movie-watch-list/customer.csv",header = True,inferSchema = True)

In [131]:
mov_df = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/movie-watch-list/movie.csv",header = True,inferSchema = True)

In [132]:
movieWatchList = spark.read.csv("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/movie-watch-list/movieWatchList.csv",header = True,inferSchema = True)

In [133]:
cus_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- subscription: string (nullable = true)



In [134]:
mov_df.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- director: string (nullable = true)
 |-- actor: string (nullable = true)
 |-- description: string (nullable = true)



In [135]:
movieWatchList.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- cus_id: integer (nullable = true)
 |-- date: date (nullable = true)



In [136]:
watchList = movieWatchList.join(cus_df.select("customer_id","name"),cus_df["customer_id"]==movieWatchList["cus_id"],"inner").select("movie_id","cus_id","name")

In [137]:
watchList.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- cus_id: integer (nullable = true)
 |-- name: string (nullable = true)



In [139]:
watchList2 = watchList.groupBy("movie_id").agg(
    collect_list("cus_id").alias("customer_id_list"),
    collect_list("name").alias("customer_name_list")
)

In [143]:
watchList2.join(mov_df,mov_df.movie_id == watchList2.movie_id,"left").select(mov_df["movie_id"],"director","actor","description","customer_name_list").show()

+--------+-----------------+-----------------+--------------------+------------------+
|movie_id|         director|            actor|         description|customer_name_list|
+--------+-----------------+-----------------+--------------------+------------------+
|     101|Christopher Nolan|Leonardo DiCaprio|A mind-bending th...|  [Alice, Charlie]|
|     103|Quentin Tarantino|Samuel L. Jackson|A stylish crime m...|         [Charlie]|
|     102| Steven Spielberg|        Tom Hanks|A gripping war dr...| [Bob, David, Eve]|
|     105|    James Cameron|     Kate Winslet|A romantic drama ...|             [Eve]|
|     104|  Martin Scorsese|   Robert De Niro|A classic gangste...|           [David]|
+--------+-----------------+-----------------+--------------------+------------------+



In [144]:
final_watchList = watchList2.join(mov_df,mov_df.movie_id == watchList2.movie_id,"left").select(mov_df["movie_id"],"director","actor","description","customer_name_list")

In [146]:
final_watchList_with_no = final_watchList.withColumn("no_of_watcher",size("customer_name_list"))

In [148]:
final_watchList_with_no.show()

+--------+-----------------+-----------------+--------------------+------------------+-------------+
|movie_id|         director|            actor|         description|customer_name_list|no_of_watcher|
+--------+-----------------+-----------------+--------------------+------------------+-------------+
|     101|Christopher Nolan|Leonardo DiCaprio|A mind-bending th...|  [Alice, Charlie]|            2|
|     103|Quentin Tarantino|Samuel L. Jackson|A stylish crime m...|         [Charlie]|            1|
|     102| Steven Spielberg|        Tom Hanks|A gripping war dr...| [Bob, David, Eve]|            3|
|     105|    James Cameron|     Kate Winslet|A romantic drama ...|             [Eve]|            1|
|     104|  Martin Scorsese|   Robert De Niro|A classic gangste...|           [David]|            1|
+--------+-----------------+-----------------+--------------------+------------------+-------------+
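
The `collect_list` plus `size` steps amount to grouping watcher names by movie and counting each group. The plain-Python sketch below mirrors that logic. The `(movie_id, name)` pairs are reconstructed from the output shown above, not read from the underlying CSV files, so their order is an assumption.

```python
from collections import defaultdict

# (movie_id, customer name) pairs reconstructed from the displayed output
watch_rows = [
    (101, "Alice"),
    (101, "Charlie"),
    (102, "Bob"),
    (102, "David"),
    (102, "Eve"),
    (103, "Charlie"),
    (104, "David"),
    (105, "Eve"),
]

# Group names per movie, analogous to groupBy("movie_id") with collect_list("name")
watchers = defaultdict(list)
for movie_id, name in watch_rows:
    watchers[movie_id].append(name)

# Count each group, analogous to size("customer_name_list")
watcher_counts = {movie_id: len(names) for movie_id, names in watchers.items()}
print(watcher_counts)
```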



## Data Segregation

In [149]:
df = spark.read.text("file:///home/hdoop/notebooks/data/spark_practice/examples_27Mar2025/data-seggregation-folder/RawData.txt")

In [150]:
df.show()

+--------------------+
|               value|
+--------------------+
|                    |
|customer_id|name|...|
|1|Alice|active|10...|
|2|Bob|inactive|10...|
|3|Charlie||103|3|...|
|4|David|active|10...|
|5|Eve|inactive|10...|
|6||active|106|6|2...|
|7|Grace|inactive|...|
+--------------------+



In [153]:
df1 = df.withColumn("value",split("value",'[|]'))

In [155]:
df1.withColumn("no_of_columns",size("value")).show()

+--------------------+-------------+
|               value|no_of_columns|
+--------------------+-------------+
|                  []|            1|
|[customer_id, nam...|            9|
|[1, Alice, active...|            9|
|[2, Bob, inactive...|            9|
|[3, Charlie, , 10...|            9|
|[4, David, active...|            9|
|[5, Eve, inactive...|            9|
|[6, , active, 106...|            9|
|[7, Grace, inacti...|            9|
+--------------------+-------------+



In [165]:
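# Note: this filter does not drop the blank first line. split() turns it into a
# one-element array (size 1), not null, so the row still appears in the next output.
df1 = df1.filter(df1["value"].isNotNull())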
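
A blank line split on `|` produces a one-element list rather than a null, so a null check leaves it in place. The plain-Python sketch below shows an alternative: keep only rows with the expected number of fields. The constant `EXPECTED_FIELDS` and the function `parse_lines` are hypothetical names; the sample lines are the blank line, the header, and the first two records as displayed in this notebook.

```python
EXPECTED_FIELDS = 9  # column count of the header row shown in the notebook

raw_lines = [
    "",
    "customer_id|name|subscription|movie_id|cus_id|date|director|actor|description",
    "1|Alice|active|101|1|2024-03-01|Christopher Nolan|Leonardo DiCaprio|"
    "A mind-bending thriller about dreams within dreams.",
    "2|Bob|inactive|102|2|2024-03-05|Steven Spielberg|Tom Hanks|"
    "A gripping war drama set during World War II.",
]


def parse_lines(lines):
    """Split each line on '|' and keep only rows with the expected field count."""
    rows = [line.split("|") for line in lines]
    return [fields for fields in rows if len(fields) == EXPECTED_FIELDS]


parsed = parse_lines(raw_lines)
print(len(parsed))
```

In PySpark a comparable filter could presumably be written as `df1.filter(size("value") == 9)`, reusing the `size` function already applied in the previous cell.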

In [166]:
df1.show(truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                            |
+---------------------------------------------------------------------------------------------------------------------------------+
|[]                                                                                                                               |
|[customer_id, name, subscription, movie_id, cus_id, date, director, actor, description]                                          |
|[1, Alice, active, 101, 1, 2024-03-01, Christopher Nolan, Leonardo DiCaprio, A mind-bending thriller about dreams within dreams.]|
|[2, Bob, inactive, 102, 2, 2024-03-05, Steven Spielberg, Tom Hanks, A gripping war drama set during World War II.]               |
|[3, Charlie, , 103, 3, 2024-03-10, Quentin Tarantino, Samuel L. Jackson, A 