In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session that the cells below refer to as `spark`.
spark = (
    SparkSession.builder
    .appName("day5")
    .getOrCreate()
)

## Find origin and destination for customers and their total amount

In [None]:
# Load the travel legs; the first line of the file supplies the column names.
# No schema is given or inferred, so every column (price included) is read as a string.
df_travel_csv = spark.read.option("header", True).csv("travel.csv")
df_travel_csv.show()

+-------+---------+---------+-----------+-----+
|cust_id|flight_id|   origin|destination|price|
+-------+---------+---------+-----------+-----+
|      1|       f3|    kochi|  Mangalore| 1800|
|      1|       f1|    delhi|  hyderabad| 2500|
|      2|       f2|  Ayodhya|    chennai| 3000|
|      1|       f2|hyderabad|      kochi| 1700|
|      2|       f1|   Mumbai|    Ayodhya| 4000|
+-------+---------+---------+-----------+-----+



In [None]:
from pyspark.sql.functions import *

In [None]:
# Per customer: starting origin, final destination and total fare.
#
# Legs are ranked by flight_id (assumes flight_id sorts in travel order, as in
# the sample data: f1, f2, f3 -- TODO confirm for real data).
# min_by / max_by return the value attached to the smallest / largest flight_id
# inside each group, so the result is deterministic. An orderBy() placed before
# groupBy().agg(first(...), last(...)) is not: Spark does not guarantee that row
# order survives the shuffle performed by the aggregation.
df_travel_results = (
    df_travel_csv
    .groupBy('cust_id')
    .agg(
        min_by('origin', 'flight_id').alias('origin'),
        max_by('destination', 'flight_id').alias('destination'),
        # price was read as a string; sum() casts it to double implicitly.
        sum('price').alias('total_amount'),
    )
)
df_travel_results.show()

+-------+------+-----------+------------+
|cust_id|orogin|destination|total_amount|
+-------+------+-----------+------------+
|      1| delhi|  Mangalore|      6000.0|
|      2|Mumbai|    chennai|      7000.0|
+-------+------+-----------+------------+



## Convert the given input table to the given output table

I/P:  
```
+------+------+------+  
|Team_1|Team_2|Winner|  
+------+------+------+  
| India|    SL| India|  
|    SL|   Aus|   Aus|  
|    SA|   Eng|   Eng|  
|   Eng|    NZ|    NZ|  
|   Aus| India| India|  
+------+------+------+  
```

O/P:  
```
+---------+--------------+----------+-----------+
|Team_name|matches_played|No_of_wins|No_of_loses|
+---------+--------------+----------+-----------+
|       SL|             2|         0|          2|
|    India|             2|         2|          0|
|      Eng|             2|         1|          1|
|       SA|             1|         0|          1|
|      Aus|             2|         1|          1|
|       NZ|             1|         1|          0|
+---------+--------------+----------+-----------+
```

In [None]:
# Match results: one row per game, laid out as (Team_1, Team_2, Winner).
columns = ["Team_1", "Team_2", "Winner"]
data = [
    ("India", "SL", "India"),
    ("SL", "Aus", "Aus"),
    ("SA", "Eng", "Eng"),
    ("Eng", "NZ", "NZ"),
    ("Aus", "India", "India"),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

+------+------+------+
|Team_1|Team_2|Winner|
+------+------+------+
| India|    SL| India|
|    SL|   Aus|   Aus|
|    SA|   Eng|   Eng|
|   Eng|    NZ|    NZ|
|   Aus| India| India|
+------+------+------+



In [None]:
# Note: DataFrame.union() behaves like SQL UNION ALL -- it keeps duplicates and
# matches columns by position. Chain .distinct() when unique rows are required.
# Duplicates are wanted here: each appearance of a team is one match played.

df_col1 = df.select('Team_1')
df_col2 = df.select('Team_2')

# matches_played: stack both team columns and count the rows per team.
# The stacked column keeps the name of the first frame's column (Team_1).
union_teams_df = (
    df_col1.union(df_col2)
    .groupBy('Team_1')
    .agg(count('Team_1').alias('matches_played'))
    .withColumnRenamed('Team_1', 'Team_name')
)
union_teams_df.show()

# No_of_wins: left-join every team to the games it won, then count the matches.
# count('Winner') skips NULLs, so teams without a win get 0.
join_df = (
    union_teams_df
    .join(df, union_teams_df['Team_name'] == df['Winner'], 'left')
    .drop('Team_1', 'Team_2')
    .groupBy('Team_name', 'matches_played')
    .agg(count('Winner').alias('No_of_wins'))
)

# No_of_loses: whatever was played but not won.
final_result_df = join_df.withColumn(
    'No_of_loses',
    col('matches_played') - col('No_of_wins'),
)
final_result_df.show()

+---------+--------------+
|Team_name|matches_played|
+---------+--------------+
|       SL|             2|
|    India|             2|
|      Eng|             2|
|       SA|             1|
|      Aus|             2|
|       NZ|             1|
+---------+--------------+

+---------+--------------+------+
|Team_name|matches_played|Winner|
+---------+--------------+------+
|       SL|             2|  NULL|
|    India|             2| India|
|    India|             2| India|
|      Eng|             2|   Eng|
|       SA|             1|  NULL|
|      Aus|             2|   Aus|
|       NZ|             1|    NZ|
+---------+--------------+------+

+---------+--------------+----------+
|Team_name|matches_played|No_of_wins|
+---------+--------------+----------+
|       SL|             2|         0|
|    India|             2|         2|
|      Eng|             2|         1|
|       SA|             1|         0|
|      Aus|             2|         1|
|       NZ|             1|         1|
+---------+--------------+----------+

## Write a PySpark program to select each employee's primary department.

### If primary_flag = 'Y' exists, pick that, otherwise pick any/default department.

I/P:

```
employee_id | department_id | primary_flag
-----------------------------------------
1           | 1             | N
2           | 1             | Y
2           | 3             | N
3           | 3             | N
4           | 2             | Y
4           | 3             | N
```

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
# Explicit schema: two integer ids plus the Y/N primary flag, all nullable.
schema = (
    StructType()
    .add("employee_id", IntegerType(), nullable=True)
    .add("department_id", IntegerType(), nullable=True)
    .add("primary_flag", StringType(), nullable=True)
)

# Rows are (employee_id, department_id, primary_flag).
data = [
    (1, 1, "N"),
    (2, 1, "Y"),
    (2, 3, "N"),
    (3, 3, "N"),
    (4, 2, "N"),
    (4, 3, "N"),
    (4, 4, "Y"),
]

df_emp = spark.createDataFrame(data, schema=schema)
df_emp.show()

+-----------+-------------+------------+
|employee_id|department_id|primary_flag|
+-----------+-------------+------------+
|          1|            1|           N|
|          2|            1|           Y|
|          2|            3|           N|
|          3|            3|           N|
|          4|            2|           N|
|          4|            3|           N|
|          4|            4|           Y|
+-----------+-------------+------------+



In [None]:
from pyspark.sql.window import Window

In [None]:

# Rank each employee's departments so that the primary one comes first:
#   1. rows whose flag is exactly 'Y' sort ahead of all others (explicit test,
#      rather than depending on the string order 'Y' > 'N');
#   2. the lowest department_id breaks ties, so employees with no 'Y' row (or
#      several) always get the same department instead of an arbitrary one.
W = Window.partitionBy('employee_id').orderBy(
    (col('primary_flag') == 'Y').desc(),
    col('department_id').asc(),
)

# Keep only the top-ranked department per employee.
df_emp_result = (
    df_emp
    .withColumn('rn', row_number().over(W))
    .filter(col('rn') == 1)
    .select("employee_id", "department_id")
)
df_emp_result.show()

+-----------+-------------+
|employee_id|department_id|
+-----------+-------------+
|          1|            1|
|          2|            1|
|          3|            3|
|          4|            4|
+-----------+-------------+



## Assignment 2

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# getOrCreate() hands back the session that is already running, if there is one.
spark = (
    SparkSession.builder
    .appName("Assignment2")
    .getOrCreate()
)

# Single nullable string column holding the full customer name.
schema = StructType().add("customer_name", StringType(), nullable=True)

# Names with two, three and one word(s) to cover each parsing branch.
data = [
    ("kasireddy naidu",),
    ("konidela ram charan",),
    ("Nandamuri tarak ramarao",),
    ("charan",),
]

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)

+-----------------------+
|customer_name          |
+-----------------------+
|kasireddy naidu        |
|konidela ram charan    |
|Nandamuri tarak ramarao|
|charan                 |
+-----------------------+



In [None]:
from pyspark.sql.functions import split, size, col, when, trim, regexp_replace

# Normalise whitespace before tokenising: collapse every run of whitespace to a
# single space and strip the ends. Splitting the raw value on " " would produce
# empty tokens for doubled, leading or trailing spaces and shift the name parts.
normalized_name = trim(regexp_replace(col("customer_name"), r"\s+", " "))
df_split = df.withColumn("name_arr", split(normalized_name, " "))

name_parts = col("name_arr")
part_count = size(name_parts)

df_final = df_split.select(
    col("customer_name"),

    # First token; when() without otherwise() yields NULL if the guard fails.
    when(part_count >= 1, name_parts[0]).alias("first_name"),

    # A middle name exists only for names of three or more words; the second
    # token is used. Shorter names get NULL.
    when(part_count >= 3, name_parts[1]).alias("middle_name"),

    # Last token; a single-word name reuses its only token as the last name.
    when(part_count >= 2, name_parts[part_count - 1])
        .otherwise(name_parts[0]).alias("last_name")
)

df_final.show(truncate=False)

+-----------------------+----------+-----------+---------+
|customer_name          |first_name|middle_name|last_name|
+-----------------------+----------+-----------+---------+
|kasireddy naidu        |kasireddy |NULL       |naidu    |
|konidela ram charan    |konidela  |ram        |charan   |
|Nandamuri tarak ramarao|Nandamuri |tarak      |ramarao  |
|charan                 |charan    |NULL       |charan   |
+-----------------------+----------+-----------+---------+



## Assignment 3

In [None]:
# Bind the session to `spark`, the name used by createDataFrame below, so the
# cell does not depend on a session variable left over from an earlier cell.
spark = SparkSession.builder.getOrCreate()

# Transactions as (t_id, user_id, amount, t_date).
data = [
    (1, 101, 500.0, "2024-01-01"),
    (2, 101, 600.0, "2024-01-01"),
    (3, 101, 200.0, "2024-01-02"),
    (4, 101, 300.0, "2024-01-03"),
    (5, 102, 400.0, "2024-01-05"),
    (6, 103, 600.0, "2024-01-06"),
    (7, 101, 200.0, "2024-01-07")
]

df = spark.createDataFrame(data, ["t_id", "user_id", "amount", "t_date"])

# Total spend of users with at least 3 orders.
# Order count and spend are computed in the same aggregation, so no separate
# "eligible users" frame or join back to the transactions is needed.
result = (
    df.groupBy("user_id")
      .agg(
          count("*").alias("order_count"),
          sum("amount").alias("total_spend")
      )
      .filter(col("order_count") >= 3)
      .select("user_id", "total_spend")
)

result.show()

+-------+-----------+
|user_id|total_spend|
+-------+-----------+
|    101|     1800.0|
+-------+-----------+



## Assignment 4

In [None]:
# One single-column row per team.
data = [(team_name,) for team_name in ("RCB", "CSK", "MI", "PBKS")]

df = spark.createDataFrame(data, ["team"])
df.show()

# Build every fixture exactly once: pair the team list with itself and keep only
# the pairs where the left name sorts strictly before the right one. That drops
# self-pairings (X vs X) and mirrored duplicates (Y vs X when X vs Y is kept).
df_matches = (
    df.alias("a")
      .crossJoin(df.alias("b"))
      .where(col("b.team") > col("a.team"))
      .select(
          col("a.team").alias("team1"),
          col("b.team").alias("team2"),
      )
)

df_matches.show()

+----+
|team|
+----+
| RCB|
| CSK|
|  MI|
|PBKS|
+----+

+-----+-----+
|team1|team2|
+-----+-----+
|  CSK|  RCB|
|  CSK|   MI|
|  CSK| PBKS|
|   MI|  RCB|
| PBKS|  RCB|
|   MI| PBKS|
+-----+-----+

