In [0]:
commentary_df = spark.read.csv("dbfs:/FileStore/shared_uploads/girish792004@gmail.com/ipl_commentary_data-1.csv", header=True, inferSchema=True)

In [0]:
print(commentary_df.columns)

['year', 'series_type', 'series_name', 'match_no', 'match_type', 'match_id', 'match_venue', 'match_status', 'match_winning_team', 'match_tie_breaker', 'match_toss', 'umpires', 'match_referee', 'third_umpires', 'match_datetime', 'team1_name', 'team2_name', 'team1_score', 'team1_wickets', 'team2_score', 'team2_wickets', 'team1_captain', 'team1_players', 'team1_bench', 'team1_support_staff', 'team2_captain', 'team2_players', 'team2_bench', 'team2_support_staff', 'ball_no', 'over_no', 'ball_commentary', 'runs', 'bowler', 'batsman']


In [0]:
commentary_df.select("ball_no", "bowler", "batsman", "runs").show(truncate=False)

+-------+--------+----------+----+
|ball_no|bowler  |batsman   |runs|
+-------+--------+----------+----+
|1      |Siraj   |Rohit     |2   |
|2      |Siraj   |Rohit     |0   |
|3      |Siraj   |Rohit     |0   |
|4      |Siraj   |Rohit     |2   |
|5      |Siraj   |Rohit     |0   |
|6      |Siraj   |Rohit     |1   |
|7      |Jamieson|Rohit     |1   |
|8      |Jamieson|Chris Lynn|0   |
|9      |Jamieson|Chris Lynn|0   |
|10     |Jamieson|Chris Lynn|0   |
|11     |Jamieson|Chris Lynn|0   |
|12     |Jamieson|Chris Lynn|0   |
|13     |Siraj   |Rohit     |0   |
|14     |Siraj   |Rohit     |0   |
|15     |Siraj   |Rohit     |0   |
|16     |Siraj   |Rohit     |0   |
|17     |Siraj   |Rohit     |4   |
|18     |Siraj   |Rohit     |2   |
|19     |Chahal  |Chris Lynn|0   |
|20     |Chahal  |Chris Lynn|4   |
+-------+--------+----------+----+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

commentary_df = commentary_df.withColumn("runs", col("runs").cast(IntegerType()))


In [0]:
commentary_df.groupBy("batsman").sum("runs").orderBy("sum(runs)", ascending=False).show()


+----------------+---------+
|         batsman|sum(runs)|
+----------------+---------+
|           Kohli|     5933|
|Suryakumar Yadav|     5073|
|     de Villiers|     4575|
|    Ishan Kishan|     4499|
|      Chris Lynn|     4471|
|           Rahul|     4072|
|         Maxwell|     4070|
|           Rohit|     3993|
|          Dhawan|     3864|
|         Buttler|     3715|
|          Samson|     3586|
|      du Plessis|     3477|
|    Shubman Gill|     3375|
|          Warner|     3310|
|            Pant|     3309|
|   Hardik Pandya|     3253|
|    Shreyas Iyer|     3071|
|     Nitish Rana|     2733|
|         Karthik|     2712|
|         de Kock|     2604|
+----------------+---------+
only showing top 20 rows



In [0]:
import re

def parse_commentary(text):
    try:
        match = re.match(r"(.+?) to (.+?),", text)
        bowler = match.group(1).strip() if match else None
        batsman = match.group(2).strip() if match else None
        
        text = text.lower()
        if "six" in text:
            runs = 6
        elif "four" in text:
            runs = 4
        elif "no run" in text:
            runs = 0
        else:
            run_match = re.search(r"(\d+) run", text)
            runs = int(run_match.group(1)) if run_match else 0

        return (runs, bowler, batsman)
    except:
        return (None, None, None)


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("runs", IntegerType(), True),
    StructField("bowler", StringType(), True),
    StructField("batsman", StringType(), True),
])

parse_udf = udf(parse_commentary, schema)


In [0]:
commentary_df = commentary_df.withColumn("parsed", parse_udf("ball_commentary"))

# Split into separate columns
commentary_df = commentary_df.withColumn("runs", commentary_df["parsed.runs"]) \
                             .withColumn("bowler", commentary_df["parsed.bowler"]) \
                             .withColumn("batsman", commentary_df["parsed.batsman"]) \
                             .drop("parsed")


In [0]:
commentary_df.select("ball_no", "bowler", "batsman", "runs").show(20)


+-------+--------+----------+----+
|ball_no|  bowler|   batsman|runs|
+-------+--------+----------+----+
|      1|   Siraj|     Rohit|   2|
|      2|   Siraj|     Rohit|   0|
|      3|   Siraj|     Rohit|   0|
|      4|   Siraj|     Rohit|   2|
|      5|   Siraj|     Rohit|   0|
|      6|   Siraj|     Rohit|   1|
|      7|Jamieson|     Rohit|   1|
|      8|Jamieson|Chris Lynn|   0|
|      9|Jamieson|Chris Lynn|   0|
|     10|Jamieson|Chris Lynn|   0|
|     11|Jamieson|Chris Lynn|   0|
|     12|Jamieson|Chris Lynn|   0|
|     13|   Siraj|     Rohit|   0|
|     14|   Siraj|     Rohit|   0|
|     15|   Siraj|     Rohit|   0|
|     16|   Siraj|     Rohit|   0|
|     17|   Siraj|     Rohit|   4|
|     18|   Siraj|     Rohit|   2|
|     19|  Chahal|Chris Lynn|   0|
|     20|  Chahal|Chris Lynn|   4|
+-------+--------+----------+----+
only showing top 20 rows

