<a href="https://colab.research.google.com/github/Abhishek-Shetty-02/DATABRICKS_PYSPARK/blob/main/SCENARIOS_01_TO_05.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("Practice").getOrCreate()

# **SCENARIO 01**

In [2]:
data = [
    ("India", "SL", "India"),
    ("SL", "Aus", "Aus"),
    ("SA", "Eng", "Eng"),
    ("Eng", "NZ", "NZ"),
    ("Aus", "India", "India"),
]

columns = ["Team_1", "Team_2", "Winner"]

df = spark.createDataFrame(data, columns)
df.show()

+------+------+------+
|Team_1|Team_2|Winner|
+------+------+------+
| India|    SL| India|
|    SL|   Aus|   Aus|
|    SA|   Eng|   Eng|
|   Eng|    NZ|    NZ|
|   Aus| India| India|
+------+------+------+



In [3]:
df1= df.select("Team_1")
df2= df.select("Team_2")
union_df = df1.unionAll(df2) \
              .groupBy('Team_1') \
              .agg(count("Team_1").alias("matches_played")) \
              .withColumnRenamed("Team_1", 'team_name')
# union_df.show()

join_df = union_df.join(df, union_df.team_name == df.Winner, 'left') \
                  .drop("Team_1", "Team_2")
# join_df.show()

results_df = join_df.groupBy("team_name", "matches_played") \
                    .agg(count("Winner").alias("no_of_wins"))
# results_df.show()

final_df = results_df.withColumn("matches_played", col("matches_played").cast(IntegerType())) \
                      .withColumn("no_of_wins", col("no_of_wins").cast(IntegerType())) \
                      .withColumn("no_of_losses", col("matches_played") - col("no_of_wins" ))
final_df.show()

+---------+--------------+----------+------------+
|team_name|matches_played|no_of_wins|no_of_losses|
+---------+--------------+----------+------------+
|       SL|             2|         0|           2|
|    India|             2|         2|           0|
|      Eng|             2|         1|           1|
|       SA|             1|         0|           1|
|      Aus|             2|         1|           1|
|       NZ|             1|         1|           0|
+---------+--------------+----------+------------+



# **SCENARIO 02**

In [85]:
data2 = [
    (1, 1, "N"),  # Y
    (2, 1, "Y"),
    (2, 2, "N"),
    (3, 3, "N"),  # Y
    (4, 2, "N"),
    (4, 3, "Y"),
    (4, 4, "N"),
]

columns = ["employee_id", "department_id", "primary_flag"]

df = spark.createDataFrame(data2,columns)

df.show()

+-----------+-------------+------------+
|employee_id|department_id|primary_flag|
+-----------+-------------+------------+
|          1|            1|           N|
|          2|            1|           Y|
|          2|            2|           N|
|          3|            3|           N|
|          4|            2|           N|
|          4|            3|           Y|
|          4|            4|           N|
+-----------+-------------+------------+



In [86]:
# dff = df.filter(col("primary_flag")== "Y")\
#         .select("employee_id","department_id")

# dff.show()



dff = df.orderBy("primary_flag").show()


# window = window.partitionBy("employee_id")

+-----------+-------------+------------+
|employee_id|department_id|primary_flag|
+-----------+-------------+------------+
|          3|            3|           N|
|          1|            1|           N|
|          4|            2|           N|
|          2|            2|           N|
|          4|            4|           N|
|          2|            1|           Y|
|          4|            3|           Y|
+-----------+-------------+------------+



# **SCENARIO 03**

In [44]:
data3 = [
    ("kasireddy naidu",),
    ("konidela ram charan",),
    ("Nandamuri tarak ramarao",),
    ("charan",),
]

columns = ["customer_name"]

df3 = spark.createDataFrame(data3,columns)

df3.show()

+--------------------+
|       customer_name|
+--------------------+
|     kasireddy naidu|
| konidela ram charan|
|Nandamuri tarak r...|
|              charan|
+--------------------+



In [50]:
split_df = df3.withColumn("customer_name",split(col("customer_name")," "))


df_final = split_df.withColumn("First_name",when(size(col("customer_name")) >= 1,initcap(col("customer_name").getItem(0))))\
                   .withColumn("Middle_name",when(size(col("customer_name")) >= 3,initcap(col("customer_name").getItem(1))))\
                   .withColumn("Last_name",when(size(col("customer_name")) >= 2,initcap(concat_ws(" ", slice(col("customer_name"), 2, 10)))))

df_final.show()

+--------------------+----------+-----------+-------------+
|       customer_name|First_name|Middle_name|    Last_name|
+--------------------+----------+-----------+-------------+
|  [kasireddy, naidu]| Kasireddy|       NULL|        Naidu|
|[konidela, ram, c...|  Konidela|        Ram|   Ram Charan|
|[Nandamuri, tarak...| Nandamuri|      Tarak|Tarak Ramarao|
|            [charan]|    Charan|       NULL|         NULL|
+--------------------+----------+-----------+-------------+



# **BONUS**

In [82]:
data4 = [
    ("kasireddy naidu",),
    ("konidela ram charan",),
    ("nandamuri tarak ramarao",),
    ("charan",),
    ("megastar chiranjeevi konidela",),
    ("allu arjun stylish star",),
    ("pawan kalyan power star",),
    ("jr ntr young tiger",),
    ("mahesh babu superstar",),
    ("prabhas rebel pan india star",),
]

columns4 = ["customer_name"]

df4 = spark.createDataFrame(data4,columns4)

df4.show(truncate=False)

+-----------------------------+
|customer_name                |
+-----------------------------+
|kasireddy naidu              |
|konidela ram charan          |
|nandamuri tarak ramarao      |
|charan                       |
|megastar chiranjeevi konidela|
|allu arjun stylish star      |
|pawan kalyan power star      |
|jr ntr young tiger           |
|mahesh babu superstar        |
|prabhas rebel pan india star |
+-----------------------------+



In [83]:
from posix import truncate
s_df = df4.withColumn("customer_name",split(col("customer_name")," "))


df_fin = s_df.withColumn("First_name",when(size(col("customer_name")) >= 1,initcap(col("customer_name").getItem(0))))\
                   .withColumn("Middle_name",when(size(col("customer_name")) >= 3,initcap(col("customer_name").getItem(1))))\
                   .withColumn("Last_name",when(size(col("customer_name")) >=3,initcap(concat_ws(" ", slice(col("customer_name"), 3, 10)))))

df_fin.show(truncate=False)

+----------------------------------+----------+-----------+--------------+
|customer_name                     |First_name|Middle_name|Last_name     |
+----------------------------------+----------+-----------+--------------+
|[kasireddy, naidu]                |Kasireddy |NULL       |NULL          |
|[konidela, ram, charan]           |Konidela  |Ram        |Charan        |
|[nandamuri, tarak, ramarao]       |Nandamuri |Tarak      |Ramarao       |
|[charan]                          |Charan    |NULL       |NULL          |
|[megastar, chiranjeevi, konidela] |Megastar  |Chiranjeevi|Konidela      |
|[allu, arjun, stylish, star]      |Allu      |Arjun      |Stylish Star  |
|[pawan, kalyan, power, star]      |Pawan     |Kalyan     |Power Star    |
|[jr, ntr, young, tiger]           |Jr        |Ntr        |Young Tiger   |
|[mahesh, babu, superstar]         |Mahesh    |Babu       |Superstar     |
|[prabhas, rebel, pan, india, star]|Prabhas   |Rebel      |Pan India Star|
+------------------------

# **SCENARIO 04**

In [61]:
data5 = [
    (1, 101, 500.0, "2024-01-01"),
    (1, 101, 600.0, "2024-01-01"),
    (2, 102, 200.0, "2024-01-02"),
    (3, 101, 300.0, "2024-01-03"),
    (4, 103, 100.0, "2024-01-04"),
    (5, 102, 400.0, "2024-01-05"),
    (6, 103, 600.0, "2024-01-06"),
    (7, 101, 200.0, "2024-01-07"),
]

columns5 = ["t_id", "user_id", "amount", "t_date"]

df5 = spark.createDataFrame(data5,columns5)

df5.show()

+----+-------+------+----------+
|t_id|user_id|amount|    t_date|
+----+-------+------+----------+
|   1|    101| 500.0|2024-01-01|
|   1|    101| 600.0|2024-01-01|
|   2|    102| 200.0|2024-01-02|
|   3|    101| 300.0|2024-01-03|
|   4|    103| 100.0|2024-01-04|
|   5|    102| 400.0|2024-01-05|
|   6|    103| 600.0|2024-01-06|
|   7|    101| 200.0|2024-01-07|
+----+-------+------+----------+



In [67]:
agg_df = df5.filter(col("user_id")==101).groupBy("user_id").agg(sum("amount").alias("total_spent"))

agg_df.show()

+-------+-----------+
|user_id|total_spent|
+-------+-----------+
|    101|     1600.0|
+-------+-----------+



# **SCENARIO 05**

In [77]:
data6 = [
    ("RCB",),
    ("CSK",),
    ("MI",),
    ("PBKS",),
]

columns6 = ["team"]

df = spark.createDataFrame(data6,columns6)

df.show()

+----+
|team|
+----+
| RCB|
| CSK|
|  MI|
|PBKS|
+----+



In [80]:
# two teams: two tables

df1 = df.alias("df1")
df2 = df.alias("df2")

matchs = df1.join(df2, col("df1.team") < col("df2.team")) \
              .select(col("df1.team").alias("team1"), col("df2.team").alias("team2"))

matchs.show()

+-----+-----+
|team1|team2|
+-----+-----+
|  CSK|  RCB|
|  CSK|   MI|
|  CSK| PBKS|
|   MI|  RCB|
| PBKS|  RCB|
|   MI| PBKS|
+-----+-----+

