<a href="https://colab.research.google.com/github/RajuKGosala-45/PySpark-Practice-Journey-With-IPL_Data/blob/main/IPL_Match_Dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **PySpark Setup (for Colab)**

In [1]:
# PySpark Setup (for Colab)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["JAVA_HOME"], "bin")



# **Day 34 -Advanced Pyspark Practices Using IPL_Match_Data**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.types import*
from pyspark.sql.window import Window

spark=SparkSession.builder.appName("IPL_Match_Data_Analysis").getOrCreate()
matches_df=spark.read.csv("/content/ipl_matches_data.csv", header=True, inferSchema=True)
players_df=spark.read.csv("/content/players-data-updated.csv", header=True, inferSchema=True)
teams_df=spark.read.csv("/content/teams_data.csv", header=True, inferSchema=True)

# Ipl matches Data Info
print("---Matches_Data Schema---")
matches_df.printSchema()
print("---Matches_Data")
matches_df.show()
print("---Matches_Data Describe")
matches_df.describe().show()

# Players Data Info
print("---Players_Data Schema---")
players_df.printSchema()
print("---Players_Data")
players_df.show()
print("---Players_Data Describe")
players_df.describe().show()

# Teams_Data Info
print("---Teams_Data Schema---")
teams_df.printSchema()
print("---Teams_Data")
teams_df.show()
print("---Teams_Data Describe")
teams_df.describe().show()


---Matches_Data Schema---
root
 |-- match_id: integer (nullable = true)
 |-- season_id: integer (nullable = true)
 |-- balls_per_over: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- match_date: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- match_number: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- match_type: string (nullable = true)
 |-- format: timestamp (nullable = true)
 |-- overs: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- team_type: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- toss_winner: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- toss_decision: string (nullable = true)
 |-- match_winner: string (nullable = true)
 |-- win_by_runs: string (nullable = true)
 |-- win_by_wickets: string (nullable = true)
 |-- player_of_match: string (nullable = true)
 |-- result: string (nullable = true)
 |-- stage: string (

## 1.Join Datasets in Pyspark

In [None]:
Ipl_data= matches_df.join(teams_df, matches_df["team1"] == teams_df["team_name"], "left")\
                    .join(players_df, matches_df["player_of_match"] == players_df["player_id"], "left")
Ipl_data.show()

+--------+---------+--------------+---------+----------+--------------------+------------+------+----------+-------------------+-----+------+---------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-----------+--------------+---------------+------+-----+-------+--------------------+---------------+--------------------+---------+---------------+--------------+--------------------+--------------------+--------------------+-------------------+--------------------+
|match_id|season_id|balls_per_over|     city|match_date|          event_name|match_number|gender|match_type|             format|overs|season|team_type|               venue|         toss_winner|               team1|               team2|toss_decision|        match_winner|win_by_runs|win_by_wickets|player_of_match|result|stage|team_id|           team_name|team_name_short|           image_url|player_id|    player_name|     bat_style|          bowl_style|        

## 2. Aggregations - Top Winning Teams

In [None]:
team_wins = matches_df.groupBy("match_winner").agg(count("*").alias("total_wins"))\
.orderBy(desc("total_wins"))
team_wins.show(5)

+--------------------+----------+
|        match_winner|total_wins|
+--------------------+----------+
|      Mumbai Indians|       153|
| Chennai Super Kings|       142|
|Kolkata Knight Ri...|       135|
|Royal Challengers...|       133|
| Sunrisers Hyderabad|       125|
+--------------------+----------+
only showing top 5 rows



## 3.Player Analysis – Most Player of the Match Awards

In [None]:
top_players=players_df.join(matches_df, players_df["player_id"] == matches_df["player_of_match"],"inner")\
                      .groupby("player_name").agg(count("*").alias("MOM_Awards"))\
                      .orderBy(desc("MOM_Awards"))
top_players.show(5)

+--------------+----------+
|   player_name|MOM_Awards|
+--------------+----------+
|AB de Villiers|        25|
|      CH Gayle|        22|
|     RG Sharma|        21|
|       V Kohli|        19|
|     DA Warner|        18|
+--------------+----------+
only showing top 5 rows



## 4.Use Window Functions – Top 3 Players by Avg Runs

In [None]:
window_player = Window.orderBy(desc("mom_awards"))
ranked_players = top_players.withColumn("rank", rank().over(window_player))
ranked_players.filter(col("rank") <= 3).show(truncate=False)

+--------------+----------+----+
|player_name   |MOM_Awards|rank|
+--------------+----------+----+
|AB de Villiers|25        |1   |
|CH Gayle      |22        |2   |
|RG Sharma     |21        |3   |
+--------------+----------+----+



## 5.Team Performance Summary

In [None]:
team_performance = matches_df.groupBy("match_winner").agg(
    count("*").alias("Matches_Won"),
    round(avg("win_by_runs"), 2).alias("Avg_Run_Margin"),
    round(avg("win_by_wickets"), 2).alias("Avg_Wicket_Margin")
).orderBy(desc("Matches_Won"))

team_performance.show(10, truncate=False)

+---------------------------+-----------+--------------+-----------------+
|match_winner               |Matches_Won|Avg_Run_Margin|Avg_Wicket_Margin|
+---------------------------+-----------+--------------+-----------------+
|Mumbai Indians             |153        |34.03         |6.24             |
|Chennai Super Kings        |142        |35.61         |5.93             |
|Kolkata Knight Riders      |135        |33.49         |6.24             |
|Royal Challengers Bangalore|133        |33.03         |6.59             |
|Sunrisers Hyderabad        |125        |26.31         |6.67             |
|Punjab Kings               |123        |24.21         |6.17             |
|Delhi Capitals             |121        |25.62         |6.18             |
|Rajasthan Royals           |116        |30.71         |5.87             |
|Gujarat Titans             |37         |36.47         |6.05             |
|Lucknow Super Giants       |30         |22.1          |5.4              |
+------------------------

# **35.Best Practices of Advanced Joins + Real-World Analysis**

### *1.Find the Top 5 Most Dominant Teams (Based on Win %)*

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.appName("IPL_Analytics").getOrCreate()
matches_df=spark.read.csv("/content/ipl_matches_data.csv", header=True, inferSchema=True)
players_df=spark.read.csv("/content/players-data-updated.csv", header=True, inferSchema=True)
teams_df=spark.read.csv("/content/teams_data.csv", header=True, inferSchema=True)

ipl_data =matches_df.join(teams_df, matches_df["team1"]== teams_df["team_name"], "left")\
                    .join(players_df, matches_df["player_of_match"]== players_df["player_id"],"left")

team_stats = (
    ipl_data.groupBy("match_winner").agg(
        count("*").alias("wins")
    )
    .join(
        ipl_data.select("team1").union(ipl_data.select("team2"))
              .groupBy("team1").count()
              .withColumnRenamed("team1", "team")
              .withColumnRenamed("count", "total_matches"),
        col("match_winner") == col("team"),
        "inner"
    )
    .withColumn("win_percentage", round((col("wins") / col("total_matches")) * 100, 2))
    .orderBy(col("win_percentage").desc())
)

team_stats.show(5)

+--------------------+----+--------------------+-------------+--------------+
|        match_winner|wins|                team|total_matches|win_percentage|
+--------------------+----+--------------------+-------------+--------------+
|      Gujarat Titans|  37|      Gujarat Titans|           60|         61.67|
| Chennai Super Kings| 142| Chennai Super Kings|          252|         56.35|
|      Mumbai Indians| 153|      Mumbai Indians|          277|         55.23|
|Lucknow Super Giants|  30|Lucknow Super Giants|           58|         51.72|
|Kolkata Knight Ri...| 135|Kolkata Knight Ri...|          265|         50.94|
+--------------------+----+--------------------+-------------+--------------+
only showing top 5 rows



### 2.Most Toss Winners vs Actual Match Winners(Correlation Check)

In [None]:
toss_vs_match =(
    ipl_data.withColumn("toss_match_same",
                        (col("toss_winner") == col("match_winner"))
                        .cast("int"))
)
toss_vs_match.groupBy().avg("toss_match_same").show()

+--------------------+
|avg(toss_match_same)|
+--------------------+
|  0.5150732127476314|
+--------------------+



### 3.strongest cities - Most matches Hosted

In [None]:
city_df=(
    ipl_data.groupBy("city").count()
    .orderBy(col("count").desc())
)
city_df.show(10)

+----------+-----+
|      city|count|
+----------+-----+
|    Mumbai|  180|
|   Kolkata|  100|
|     Delhi|   97|
|   Chennai|   91|
| Hyderabad|   83|
| Bangalore|   65|
|    Jaipur|   64|
|Chandigarh|   61|
|      Pune|   51|
|      NULL|   51|
+----------+-----+
only showing top 10 rows



### 4.Top Bowlers Based on Player of Match Awards

In [None]:
bowler_awards = (
    ipl_data.filter((col("bowl_style").isNotNull()) & (col("bowl_style") != ""))
    .groupBy("player_full_name")
    .count()
    .orderBy(col("count").desc())
)
bowler_awards.show(10, truncate=False)



+-------------------------------+-----+
|player_full_name               |count|
+-------------------------------+-----+
|Abraham Benjamin de Villiers   |25   |
|Christopher Henry Gayle        |22   |
|Rohit Gurunath Sharma          |21   |
|Virat Kohli                    |19   |
|David Andrew Warner            |18   |
|Mahendra Singh Dhoni           |18   |
|Sunil Philip Narine            |17   |
|Andre Dwayne Russell           |16   |
|Ravindrasinh Anirudhsinh Jadeja|16   |
|Yusuf Khan Pathan              |16   |
+-------------------------------+-----+
only showing top 10 rows



### 5. Compare Home vs Away Team Performance

In [None]:
home_away=(
    ipl_data.groupBy("match_winner", "city")
    .count()
    .orderBy(col("count").desc())
)
home_away.show(truncate=False)

+---------------------------+----------+-----+
|match_winner               |city      |count|
+---------------------------+----------+-----+
|Mumbai Indians             |Mumbai    |68   |
|Kolkata Knight Riders      |Kolkata   |54   |
|Chennai Super Kings        |Chennai   |51   |
|Sunrisers Hyderabad        |Hyderabad |40   |
|Rajasthan Royals           |Jaipur    |38   |
|Delhi Capitals             |Delhi     |38   |
|Punjab Kings               |Chandigarh|31   |
|Royal Challengers Bangalore|Bangalore |28   |
|Chennai Super Kings        |Mumbai    |18   |
|Rajasthan Royals           |Mumbai    |17   |
|Royal Challengers Bangalore|Bengaluru |17   |
|Royal Challengers Bangalore|Mumbai    |13   |
|Gujarat Titans             |Ahmedabad |13   |
|Sunrisers Hyderabad        |Mumbai    |12   |
|Punjab Kings               |Mumbai    |12   |
|Delhi Capitals             |Mumbai    |11   |
|Sunrisers Hyderabad        |Delhi     |11   |
|Mumbai Indians             |Kolkata   |10   |
|Punjab Kings

# **36.Advanced Window Functions (Ranking + Running Totals + Lead/Lag)**

### 1. Rank Teams by Wins (Dense Rank)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark=SparkSession.builder.appName("IPL_Analytics").getOrCreate()
matches_df=spark.read.csv("/content/ipl_matches_data.csv", header=True, inferSchema=True)
players_df=spark.read.csv("/content/players-data-updated.csv", header=True, inferSchema=True)
teams_df=spark.read.csv("/content/teams_data.csv", header=True, inferSchema=True)

ipl_data=matches_df.join(players_df, matches_df["player_of_match"] == players_df["player_id"], "left")\
                   .join(teams_df, matches_df["team1"] == teams_df["team_name"], "left")

win_count=ipl_data.groupBy("match_winner").agg(count("*").alias("wins")
)
WindowSpec=Window.orderBy(win_count.wins.desc())
ranked_teams=win_count.withColumn("rank", dense_rank().over(WindowSpec))
ranked_teams.show(truncate=False)



+---------------------------+----+----+
|match_winner               |wins|rank|
+---------------------------+----+----+
|Mumbai Indians             |153 |1   |
|Chennai Super Kings        |142 |2   |
|Kolkata Knight Riders      |135 |3   |
|Royal Challengers Bangalore|133 |4   |
|Sunrisers Hyderabad        |125 |5   |
|Punjab Kings               |123 |6   |
|Delhi Capitals             |121 |7   |
|Rajasthan Royals           |116 |8   |
|Gujarat Titans             |37  |9   |
|Lucknow Super Giants       |30  |10  |
|Rising Pune Supergiant     |15  |11  |
|Gujarat Lions              |13  |12  |
|Pune Warriors              |12  |13  |
|NULL                       |8   |14  |
|Kochi Tuskers Kerala       |6   |15  |
+---------------------------+----+----+



### 2. Rolling Match Count per Team (Running Total)

In [None]:
window_team=Window.partitionBy("match_winner").orderBy("match_winner")
running_total= (
    ipl_data.withColumn("runnin_win_number",
                        row_number().over(window_team))
)
running_total.select("match_winner","match_date", "runnin_win_number").show(truncate=False)

+-------------------+----------+-----------------+
|match_winner       |match_date|runnin_win_number|
+-------------------+----------+-----------------+
|NULL               |30-04-2019|1                |
|NULL               |03-05-2023|2                |
|NULL               |21-05-2011|3                |
|NULL               |29-04-2015|4                |
|NULL               |17-05-2015|5                |
|NULL               |26-04-2025|6                |
|NULL               |05-05-2025|7                |
|NULL               |17-05-2025|8                |
|Chennai Super Kings|07-04-2018|1                |
|Chennai Super Kings|10-04-2018|2                |
|Chennai Super Kings|20-04-2018|3                |
|Chennai Super Kings|22-04-2018|4                |
|Chennai Super Kings|25-04-2018|5                |
|Chennai Super Kings|30-04-2018|6                |
|Chennai Super Kings|05-05-2018|7                |
|Chennai Super Kings|13-05-2018|8                |
|Chennai Super Kings|20-05-2018

### 3.Lag Analysis: Was Previous Match Won?

In [None]:
win_flag = (
    ipl_data.withColumn("is_win", (ipl_data.match_winner == ipl_data.team1).cast("int"))
          .withColumn("previous_game_win",
                      lag("is_win", 1).over(Window.partitionBy("team1").orderBy("match_date")))
)

win_flag.show(10, truncate=False)

+--------+---------+--------------+------------+----------+---------------------+------------+------+----------+-------------------+-----+------+---------+---------------------------------------------+-------------------+-------------------+---------------------+-------------+-------------------+-----------+--------------+---------------+------+-----+---------+-------------+--------------+----------------------+------------+-------------------------------+------------------------+----------------------------------------------------------------------------------------------------+-------+-------------------+---------------+---------------------------------------------------------------------+------+-----------------+
|match_id|season_id|balls_per_over|city        |match_date|event_name           |match_number|gender|match_type|format             |overs|season|team_type|venue                                        |toss_winner        |team1              |team2                |toss_decisi

### 4. Lead Function: Who Will They Face Next?

In [None]:
next_opponent=(
    ipl_data.withColumn("Next_team",
                        lead("team2", 1).over(Window.partitionBy("team1").orderBy("match_date"))
))
next_opponent.select("team1", "team2", "Next_team").show(truncate=False)

+-------------------+---------------------------+---------------------------+
|team1              |team2                      |Next_team                  |
+-------------------+---------------------------+---------------------------+
|Chennai Super Kings|Sunrisers Hyderabad        |Delhi Capitals             |
|Chennai Super Kings|Delhi Capitals             |Mumbai Indians             |
|Chennai Super Kings|Mumbai Indians             |Sunrisers Hyderabad        |
|Chennai Super Kings|Sunrisers Hyderabad        |Punjab Kings               |
|Chennai Super Kings|Punjab Kings               |Rajasthan Royals           |
|Chennai Super Kings|Rajasthan Royals           |Delhi Capitals             |
|Chennai Super Kings|Delhi Capitals             |Delhi Capitals             |
|Chennai Super Kings|Delhi Capitals             |Punjab Kings               |
|Chennai Super Kings|Punjab Kings               |Kolkata Knight Riders      |
|Chennai Super Kings|Kolkata Knight Riders      |Rajasthan Royal

### 5. Most Consistent Teams (Rolling Win Rate)

In [None]:

win_rate_spec = Window.partitionBy("match_winner").orderBy("match_date").rowsBetween(Window.unboundedPreceding, 0)

rolling_rate = (
    ipl_data.withColumn("win_flag", (col("match_winner").isNotNull()).cast("int"))
          .withColumn("rolling_win_rate",
                      avg("win_flag").over(win_rate_spec))
)

rolling_rate.select("match_winner", "match_date", "rolling_win_rate").show()


+-------------------+----------+----------------+
|       match_winner|match_date|rolling_win_rate|
+-------------------+----------+----------------+
|               NULL|03-05-2023|             0.0|
|               NULL|05-05-2025|             0.0|
|               NULL|17-05-2015|             0.0|
|               NULL|17-05-2025|             0.0|
|               NULL|21-05-2011|             0.0|
|               NULL|26-04-2025|             0.0|
|               NULL|29-04-2015|             0.0|
|               NULL|30-04-2019|             0.0|
|Chennai Super Kings|01-05-2011|             1.0|
|Chennai Super Kings|01-05-2019|             1.0|
|Chennai Super Kings|01-05-2022|             1.0|
|Chennai Super Kings|01-11-2020|             1.0|
|Chennai Super Kings|02-05-2009|             1.0|
|Chennai Super Kings|02-05-2013|             1.0|
|Chennai Super Kings|02-05-2014|             1.0|
|Chennai Super Kings|03-04-2010|             1.0|
|Chennai Super Kings|03-04-2023|             1.0|


# **37.PySpark Joins: Advanced + Real-World Optimizations**

### 1. Broadcast Join (Small lookup table: teams_data)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark=SparkSession.builder.appName("IPL_Analytics").getOrCreate()
matches_df=spark.read.csv("/content/ipl_matches_data.csv", header=True, inferSchema=True)
players_df=spark.read.csv("/content/players-data-updated.csv", header=True, inferSchema=True)
teams_df=spark.read.csv("/content/teams_data.csv", header=True, inferSchema=True)
matches = matches_df.withColumn("team1_k", upper(trim(col("team1")))) \
                 .withColumn("team2_k", upper(trim(col("team2"))))

teams = teams_df.withColumn("team_k", upper(trim(col("team_name"))))

matches_broadcast = matches.join(
    broadcast(teams),
    matches.team1_k == teams.team_k,
    "left"
).withColumnRenamed("team_name_short", "team1_short")

matches_broadcast.show(5)


+--------+---------+--------------+---------+----------+--------------------+------------+------+----------+-------------------+-----+------+---------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-----------+--------------+---------------+------+-----+--------------------+--------------------+-------+--------------------+-----------+--------------------+--------------------+
|match_id|season_id|balls_per_over|     city|match_date|          event_name|match_number|gender|match_type|             format|overs|season|team_type|               venue|         toss_winner|               team1|               team2|toss_decision|        match_winner|win_by_runs|win_by_wickets|player_of_match|result|stage|             team1_k|             team2_k|team_id|           team_name|team1_short|           image_url|              team_k|
+--------+---------+--------------+---------+----------+--------------------+------------+------

### 2.Left Anti Join (Find Missing Players)

In [None]:
players = players_df.withColumn("player_k", upper(trim(col("player_full_name"))))
matches = matches_df.withColumn("pom_k", upper(trim(col("player_of_match"))))

missing_pom = matches.join(
    players,
    matches.pom_k == players.player_k,
    "left_anti"
)

missing_pom.select("match_date", "team1", "team2", "player_of_match").show(10, truncate=False)


+----------+---------------------------+---------------------------+---------------+
|match_date|team1                      |team2                      |player_of_match|
+----------+---------------------------+---------------------------+---------------+
|18-04-2008|Royal Challengers Bangalore|Kolkata Knight Riders      |46             |
|05-04-2017|Sunrisers Hyderabad        |Royal Challengers Bangalore|15             |
|06-04-2017|Rising Pune Supergiant     |Mumbai Indians             |36             |
|07-04-2017|Gujarat Lions              |Kolkata Knight Riders      |57             |
|08-04-2017|Punjab Kings               |Rising Pune Supergiant     |71             |
|08-04-2017|Royal Challengers Bangalore|Delhi Capitals             |4              |
|09-04-2017|Sunrisers Hyderabad        |Gujarat Lions              |22             |
|09-04-2017|Mumbai Indians             |Kolkata Knight Riders      |26             |
|10-04-2017|Punjab Kings               |Royal Challengers Bangalo

### 3.Semi Join (Only players who ever got POM)

In [None]:
players_with_pom = players.join(
    matches,
    players.player_k == matches.pom_k,
    "left_semi"
)

players_with_pom.show(10)


+---------+-----------+---------+----------+---------+----------------+------------+------------+--------+
|player_id|player_name|bat_style|bowl_style|field_pos|player_full_name|player_name2|player_image|player_k|
+---------+-----------+---------+----------+---------+----------------+------------+------------+--------+
+---------+-----------+---------+----------+---------+----------------+------------+------------+--------+



### 4.Multi-Condition Join (team + season)

In [None]:
season_team_join = matches.join(
    teams,
    (upper(trim(matches.team1)) == upper(trim(teams.team_name))),
    "left"
)

season_team_join.show(5)


+--------+---------+--------------+---------+----------+--------------------+------------+------+----------+-------------------+-----+------+---------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-----------+--------------+---------------+------+-----+-----+-------+--------------------+---------------+--------------------+--------------------+
|match_id|season_id|balls_per_over|     city|match_date|          event_name|match_number|gender|match_type|             format|overs|season|team_type|               venue|         toss_winner|               team1|               team2|toss_decision|        match_winner|win_by_runs|win_by_wickets|player_of_match|result|stage|pom_k|team_id|           team_name|team_name_short|           image_url|              team_k|
+--------+---------+--------------+---------+----------+--------------------+------------+------+----------+-------------------+-----+------+---------+---------

### 5.Aggregation after join (Top winning teams)

In [None]:
from pyspark.sql import functions as F

top_teams = matches.groupBy("match_winner") \
                   .agg(F.count("*").alias("wins")) \
                   .orderBy(F.desc("wins"))

top_teams.show(10,truncate=False)


+---------------------------+----+
|match_winner               |wins|
+---------------------------+----+
|Mumbai Indians             |153 |
|Chennai Super Kings        |142 |
|Kolkata Knight Riders      |135 |
|Royal Challengers Bangalore|133 |
|Sunrisers Hyderabad        |125 |
|Punjab Kings               |123 |
|Delhi Capitals             |121 |
|Rajasthan Royals           |116 |
|Gujarat Titans             |37  |
|Lucknow Super Giants       |30  |
+---------------------------+----+
only showing top 10 rows



# **38.Real-Time Analysis Basics in PySpark (Structured Streaming)**

### 1.Create a Streaming DataFrame (Socket Source)

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Day38_Streaming").getOrCreate()

stream_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

stream_df.printSchema()


root
 |-- value: string (nullable = true)



###Real-Time Word Count (classic streaming test)

In [None]:
from pyspark.sql.functions import explode, split

words = stream_df.select(
    explode(split(stream_df.value, " ")).alias("word")
)

word_count = words.groupBy("word").count()


In [None]:
word_count.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start() \
    .awaitTermination()


###Real-Time Aggregation with Sliding Windows

In [None]:
from pyspark.sql.functions import window, current_timestamp

windowed_counts = stream_df \
    .withColumn("timestamp", current_timestamp()) \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(\
        window("timestamp", "30 seconds", "10 seconds")\
    ) \
    .count()

###Reading Real-Time Data from Files (Auto-Refresh Directory)

In [None]:
file_stream = spark.readStream \
    .format("csv") \
    .option("header", True) \
    .schema(schema) \
    .load("/content/input_folder/")


### Writing Streaming Output to Delta / Parquet

In [None]:
query = windowed_counts.writeStream \
    .format("parquet") \
    .option("path", "/content/output/") \
    .option("checkpointLocation", "/content/checkpoints/") \
    .outputMode("append") \
    .start()


# **39.Real-Time Analysis Basics with IPL Dataset (PySpark)**

###Real-Time Ingestion of IPL Ball-by-Ball Records

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark=SparkSession.builder.appName("IPL_Analytics").getOrCreate()
matches_df=spark.read.csv("/content/ipl_matches_data.csv", header=True, inferSchema=True)
players_df=spark.read.csv("/content/players-data-updated.csv", header=True, inferSchema=True)
teams_df=spark.read.csv("/content/teams_data.csv", header=True, inferSchema=True)
ball_by_ball_df=spark.read.csv("/content/ball_by_ball_data.csv", header=True, inferSchema=True)
ipl_data=matches_df.join(players_df, matches_df["player_of_match"] == players_df["player_id"], "left")\
                   .join(teams_df, matches_df["team1"] == teams_df["team_name"], "left")\
                   .join(ball_by_ball_df, matches_df["match_id"]== ball_by_ball_df["match_id"], "left")
stream_df= spark.readStream\
        .schema(ball_by_ball_df.schema)\
        .csv("/content/ball_by_ball_data.csv")
stream_df.printSchema()

root
 |-- season_id: integer (nullable = true)
 |-- match_id: integer (nullable = true)
 |-- batter: string (nullable = true)
 |-- bowler: string (nullable = true)
 |-- non_striker: string (nullable = true)
 |-- team_batting: string (nullable = true)
 |-- team_bowling: string (nullable = true)
 |-- over_number: integer (nullable = true)
 |-- ball_number: integer (nullable = true)
 |-- batter_runs: integer (nullable = true)
 |-- extras: integer (nullable = true)
 |-- total_runs: integer (nullable = true)
 |-- batsman_type: string (nullable = true)
 |-- bowler_type: string (nullable = true)
 |-- player_out: string (nullable = true)
 |-- fielders_involved: string (nullable = true)
 |-- is_wicket: string (nullable = true)
 |-- is_wide_ball: string (nullable = true)
 |-- is_no_ball: boolean (nullable = true)
 |-- is_leg_bye: boolean (nullable = true)
 |-- is_bye: boolean (nullable = true)
 |-- is_penalty: boolean (nullable = true)
 |-- wide_ball_runs: string (nullable = true)
 |-- no_ball_r

### Real-Time Run Rate Monitoring per Over

In [None]:
real_time_df=stream_df.groupBy("match_id","innings","over_number")\
  .agg(sum("total_runs").alias("runs_this_over"))


###Real-Time Wicket Alert

In [None]:
Wicket_alert = stream_df.filter(col("player_out").isNotNull())

### Real-Time Batsman performance Tracker

In [None]:
batsman_Live=stream_df.groupBy("batter") \
      .agg(
          sum("total_runs").alias("runs"),
          count("ball_number").alias("balls")
      )

###Real-Time Match Momentum Score (Custom KPI)

In [None]:
momentum_df = stream_df.groupBy("match_id") \
    .agg(
        sum("total_runs").alias("runs"),
        count("player_out").alias("wkts"),
        max("over_number").alias("overs")
    )

# To see the real-time output, you need to start a streaming query


# **40.Advanced IPL Data Transformations in PySpark**

### 1 - Create a clean player Performance Table

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.appName("IPL_Performance").getOrCreate()
matches_df=spark.read.csv("/content/ipl_matches_data.csv", header=True, inferSchema=True)
players_df=spark.read.csv("/content/players-data-updated.csv", header=True, inferSchema=True)
teams_df=spark.read.csv("/content/teams_data.csv", header=True, inferSchema=True)
ball_by_ball_df=spark.read.csv("/content/ball_by_ball_data.csv", header=True, inferSchema=True)
ipl_data=matches_df.join(players_df, matches_df["player_of_match"] == players_df["player_id"], "left")\
                   .join(teams_df, matches_df["team1"] == teams_df["team_name"], "left")\
                   .join(ball_by_ball_df, matches_df["match_id"]== ball_by_ball_df["match_id"], "left")
player_stats= ball_by_ball_df.groupBy("batter").agg(
    sum("batter_runs").alias("Total_runs"),
    count("ball_number").alias("Faced_balls"),
    round((sum("batter_runs") / count("ball_number"))*100,2).alias("Strike_rate")
)
player_stats.show()

+---------------+----------+-----------+-----------+
|         batter|Total_runs|Faced_balls|Strike_rate|
+---------------+----------+-----------+-----------+
|  Kuldeep Yadav|       201|        240|      83.75|
|     S Anirudha|       136|        121|      112.4|
|     TM Dilshan|      1153|       1047|     110.12|
|     KA Pollard|      3437|       2447|     140.46|
|   M Theekshana|        17|         34|       50.0|
| M Muralitharan|        20|         30|      66.67|
|  LA Carseldine|        81|         71|     114.08|
|    SS Cottrell|         0|          2|        0.0|
|      CA Ingram|       205|        183|     112.02|
| R Sanjay Yadav|         0|          2|        0.0|
|       DR Smith|      2385|       1803|     132.28|
|     A Flintoff|        62|         57|     108.77|
|       M Manhas|       514|        486|     105.76|
|      GR Napier|        15|         16|      93.75|
|       AR Patel|      1916|       1489|     128.68|
|       SA Yadav|      4311|       2961|     1

### 2-Team-wise Boundary (4s and 6s) Analysis

In [None]:
boundaries = ipl_data.groupBy("team_batting").agg(
    sum(when(ipl_data.batter_runs ==4,1).otherwise(0)).alias("Fours"),
    sum(when(ipl_data.batter_runs ==6,1).otherwise(0)).alias("Sixes")
)
boundaries.show(truncate=True)

+--------------------+-----+-----+
|        team_batting|Fours|Sixes|
+--------------------+-----+-----+
| Sunrisers Hyderabad| 3581| 1562|
|Lucknow Super Giants|  795|  484|
| Chennai Super Kings| 3395| 1611|
|      Gujarat Titans|  947|  395|
|                NULL|    0|    0|
|Rising Pune Super...|  368|  157|
|Kochi Tuskers Kerala|  170|   53|
|    Rajasthan Royals| 3302| 1383|
|       Gujarat Lions|  460|  155|
|Royal Challengers...| 3617| 1778|
|Kolkata Knight Ri...| 3634| 1596|
|        Punjab Kings| 3681| 1688|
|       Pune Warriors|  525|  196|
|      Delhi Capitals| 3729| 1461|
|      Mumbai Indians| 3897| 1828|
+--------------------+-----+-----+



### 3-Most Economical Bowlers (Full IPL Dataset)

In [None]:
eco_bowler = ipl_data.groupBy("bowler").agg(
    sum("total_runs").alias("Runs_Given"),
    count("ball_number").alias("Balls_Bowled")
).withColumn(
    "Economy", round(col("Runs_Given") / (col("Balls_Bowled")/6), 2)
)

eco_bowler.orderBy("Economy").show(20)

+---------------+----------+------------+-------+
|         bowler|Runs_Given|Balls_Bowled|Economy|
+---------------+----------+------------+-------+
|           NULL|      NULL|           0|   NULL|
|   AC Gilchrist|         0|           1|    0.0|
|     R Ravindra|         7|          12|    3.5|
|       NB Singh|        18|          25|   4.32|
|    Sachin Baby|         8|          10|    4.8|
|      AM Rahane|         5|           6|    5.0|
|  LA Carseldine|         6|           7|   5.14|
|      SS Mundhe|         6|           7|   5.14|
|    DJ Thornely|        40|          44|   5.45|
|       M Manhas|        42|          42|    6.0|
|      DA Warner|         2|           2|    6.0|
|       MW Short|        25|          25|    6.0|
|  Sohail Tanvir|       275|         265|   6.23|
|     A Chandila|       245|         234|   6.28|
|     FH Edwards|       160|         150|    6.4|
|SMSM Senanayake|       211|         195|   6.49|
|    JW Hastings|        66|          61|   6.49|


### 4.Match Stage Classification (Powerplay / Middle / Death)

In [None]:
from pyspark.sql.functions import when

stage_df = ipl_data.withColumn(
    "Stage",
    when(col("over_number") <= 6, "Powerplay")
    .when(col("over_number") <= 15, "Middle Overs")
    .otherwise("Death Overs")
)

stage_df.groupBy("Stage").agg(sum("total_runs").alias("Runs")).show()

+------------+------+
|       Stage|  Runs|
+------------+------+
| Death Overs| 80588|
|   Powerplay|127556|
|Middle Overs|166017|
+------------+------+



### 5.Create a Player vs Team Performance Matrix

In [None]:
player_team_matrix = ipl_data.groupBy("batter", "team_bowling").agg(
    sum("batter_runs").alias("Runs_Scored")
)

player_team_matrix.show(20)

+----------------+--------------------+-----------+
|          batter|        team_bowling|Runs_Scored|
+----------------+--------------------+-----------+
|       SP Narine|    Rajasthan Royals|        268|
|         M Vohra|        Punjab Kings|         12|
|      D Padikkal|Royal Challengers...|        115|
|    F du Plessis| Chennai Super Kings|        197|
|     Mohsin Khan|        Punjab Kings|         15|
|DPMD Jayawardene|Royal Challengers...|        140|
|    DB Ravi Teja|Kolkata Knight Ri...|         64|
|      SD Chitnis|        Punjab Kings|         15|
|       AA Chavan|      Mumbai Indians|          4|
|     RJ Peterson|       Pune Warriors|         13|
|       CM Gautam| Chennai Super Kings|         20|
|         A Zampa|      Delhi Capitals|          5|
|   Kuldeep Yadav|Royal Challengers...|         39|
|       AT Rayudu|    Rajasthan Royals|        377|
|       DR Shorey|Royal Challengers...|          8|
|         B Kumar| Chennai Super Kings|         39|
|         AD

# **41.Window Functions in PySpark (IPL Dataset)**

### Import libraries and Load data

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark=SparkSession.builder.appName("IPL_Performance").getOrCreate()
matches_df=spark.read.csv("/content/ipl_matches_data.csv", header=True, inferSchema=True)
players_df=spark.read.csv("/content/players-data-updated.csv", header=True, inferSchema=True)
teams_df=spark.read.csv("/content/teams_data.csv", header=True, inferSchema=True)
ball_by_ball_df=spark.read.csv("/content/ball_by_ball_data.csv", header=True, inferSchema=True)
ipl_data=matches_df.join(players_df, matches_df["player_of_match"] == players_df["player_id"], "left")\
                   .join(teams_df, matches_df["team1"] == teams_df["team_name"], "left")\
                   .join(ball_by_ball_df.withColumnRenamed("match_id", "ball_by_ball_match_id"), matches_df["match_id"]== col("ball_by_ball_match_id"), "left")

### 1. Ranking Teams by Wins(Dense_Rank)

In [5]:
Wins = (ipl_data.groupBy("season","match_winner")
                .agg(count("*").alias("Total_wins")))

w = Window.partitionBy("season").orderBy(col("Total_wins").desc())

result = Wins.withColumn("rank", dense_rank().over(w))
result.show()

+------+--------------------+----------+----+
|season|        match_winner|Total_wins|rank|
+------+--------------------+----------+----+
|  2008|    Rajasthan Royals|      3094|   1|
|  2008|        Punjab Kings|      2303|   2|
|  2008| Chennai Super Kings|      2127|   3|
|  2008|      Delhi Capitals|      1641|   4|
|  2008|      Mumbai Indians|      1509|   5|
|  2008|Kolkata Knight Ri...|      1400|   6|
|  2008|Royal Challengers...|       977|   7|
|  2008| Sunrisers Hyderabad|       438|   8|
|  2009|      Delhi Capitals|      2305|   1|
|  2009|Royal Challengers...|      2166|   2|
|  2009| Sunrisers Hyderabad|      2164|   3|
|  2009| Chennai Super Kings|      1869|   4|
|  2009|        Punjab Kings|      1744|   5|
|  2009|    Rajasthan Royals|      1474|   6|
|  2009|      Mumbai Indians|      1205|   7|
|  2009|Kolkata Knight Ri...|       679|   8|
|  2010|      Mumbai Indians|      2704|   1|
|  2010| Chennai Super Kings|      2181|   2|
|  2010| Sunrisers Hyderabad|     

### 2.Player of Match Count Ranking

In [6]:
player_of_Match = (ipl_data.groupBy("player_of_Match")
                            .agg(count("*").alias("Player_of_Match_Count")))

w = Window.orderBy(col("Player_of_Match_Count").desc())

player_of_Match = player_of_Match.withColumn("rank", dense_rank().over(w))
player_of_Match.show()

+---------------+---------------------+----+
|player_of_Match|Player_of_Match_Count|rank|
+---------------+---------------------+----+
|            245|                 6170|   1|
|              1|                 5186|   2|
|             25|                 5134|   3|
|             38|                 4384|   4|
|             12|                 4329|   5|
|            332|                 4302|   6|
|             65|                 4018|   7|
|           1437|                 3905|   8|
|            348|                 3864|   9|
|              5|                 3807|  10|
|             60|                 3735|  11|
|           1409|                 3587|  12|
|             29|                 3468|  13|
|             24|                 3429|  14|
|             47|                 3400|  15|
|             34|                 3168|  16|
|             56|                 3107|  17|
|          11943|                 2905|  18|
|           1580|                 2904|  19|
|         

### 3. Running Total matches Each Season

In [11]:
w = Window.partitionBy("season").orderBy("match_id") \
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)

ipl_running = ipl_data.withColumn("match_running_total", count("*").over(w))
ipl_running.select("season", "match_id", "match_running_total").show()

+------+--------+-------------------+
|season|match_id|match_running_total|
+------+--------+-------------------+
|  2008|  335982|                  1|
|  2008|  335982|                  2|
|  2008|  335982|                  3|
|  2008|  335982|                  4|
|  2008|  335982|                  5|
|  2008|  335982|                  6|
|  2008|  335982|                  7|
|  2008|  335982|                  8|
|  2008|  335982|                  9|
|  2008|  335982|                 10|
|  2008|  335982|                 11|
|  2008|  335982|                 12|
|  2008|  335982|                 13|
|  2008|  335982|                 14|
|  2008|  335982|                 15|
|  2008|  335982|                 16|
|  2008|  335982|                 17|
|  2008|  335982|                 18|
|  2008|  335982|                 19|
|  2008|  335982|                 20|
+------+--------+-------------------+
only showing top 20 rows



### 4. Team Winning Percentage

In [14]:
wins = ipl_data.groupBy("season", "match_winner").agg(count("*").alias("win_count"))

w = Window.partitionBy("season")

result = wins.withColumn(
    "season_total_matches",
    sum("win_count").over(w)
).withColumn(
    "win_percentage",
    round(col("win_count") / col("season_total_matches") * 100, 2)
)

result.show()

+------+--------------------+---------+--------------------+--------------+
|season|        match_winner|win_count|season_total_matches|win_percentage|
+------+--------------------+---------+--------------------+--------------+
|  2008|Royal Challengers...|      977|               13489|          7.24|
|  2008|    Rajasthan Royals|     3094|               13489|         22.94|
|  2008| Sunrisers Hyderabad|      438|               13489|          3.25|
|  2008| Chennai Super Kings|     2127|               13489|         15.77|
|  2008|      Mumbai Indians|     1509|               13489|         11.19|
|  2008|        Punjab Kings|     2303|               13489|         17.07|
|  2008|      Delhi Capitals|     1641|               13489|         12.17|
|  2008|Kolkata Knight Ri...|     1400|               13489|         10.38|
|  2009|        Punjab Kings|     1744|               13606|         12.82|
|  2009|      Mumbai Indians|     1205|               13606|          8.86|
|  2009|    

### 5.Lead & Lag: Match Result Trend

In [16]:
team_w=Window.partitionBy("match_winner").orderBy("match_date")

trend=(
    ipl_data.withColumn("prev_match", lag("match_winner").over(team_w))
    .withColumn("next_match", lead("match_winner").over(team_w))
)
trend.select("match_winner", "prev_match", "next_match").show()

+------------+----------+----------+
|match_winner|prev_match|next_match|
+------------+----------+----------+
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
|        NULL|      NULL|      NULL|
+------------+----------+----------+
only showing top 20 rows

