In [9]:
import os
import sys
sys.path.append(os.path.join(os.getcwd(),"..",".."))
import config, utils
from pyspark.sql import SparkSession

spark = utils.create_spark_session("merge_deliveries",{
    'spark.executor.memory': '6g',
    'spark.executor.cores': '8',
})

matches = spark.read.csv(os.path.join(config.PROCESSED_DATA_DIR, 'matches.csv'), inferSchema=True, header=True)
deliveries = spark.read.csv(os.path.join(config.PROCESSED_DATA_DIR, 'deliveries.csv'), inferSchema=True, header=True)

[[34m2024-11-16T15:36:37.790+0530[0m] {[34mutils.py:[0m12} INFO[0m - Creating Spark session.[0m
[[34m2024-11-16T15:36:37.809+0530[0m] {[34mutils.py:[0m30} INFO[0m - Spark session created successfully.[0m


                                                                                

In [10]:
matches = matches.drop('date','city','toss_winner','toss_decision')
matches.show(5)

+--------+-----------+-----------+------+-------+-----------+
|match_id|      team1|      team2|gender| season|     winner|
+--------+-----------+-----------+------+-------+-----------+
| 1321267|    Croatia|     Greece|  male|   2022|    Croatia|
| 1299583|Philippines|    Germany|  male|2021/22|    Germany|
| 1192876|  Sri Lanka|New Zealand|  male|   2019|New Zealand|
| 1456442|New Zealand|  Sri Lanka|  male|2024/25|New Zealand|
| 1179610|    Namibia|   Botswana|female|   2019|    Namibia|
+--------+-----------+-----------+------+-------+-----------+
only showing top 5 rows



In [11]:
deliveries = deliveries.drop('season','start_date','venue','striker','non_striker','bowler')
deliveries.show(5)

+--------+-------+----+------------+------------+------------+------+-----+-------+----+-------+-------+-----------+----------------+-----------------+----------------------+
|match_id|innings|ball|batting_team|bowling_team|runs_off_bat|extras|wides|noballs|byes|legbyes|penalty|wicket_type|player_dismissed|other_wicket_type|other_player_dismissed|
+--------+-------+----+------------+------------+------------+------+-----+-------+----+-------+-------+-----------+----------------+-----------------+----------------------+
| 1306389|      1| 0.1|     Bahrain|Saudi Arabia|           0|     0|    0|      0|   0|      0|      0|          0|               0|                0|                     0|
| 1306389|      1| 0.2|     Bahrain|Saudi Arabia|           0|     1|    1|      0|   0|      0|      0|          0|               0|                0|                     0|
| 1306389|      1| 0.3|     Bahrain|Saudi Arabia|           0|     1|    1|      0|   0|      0|      0|          0|         

In [12]:
from pyspark.sql import Window
from pyspark.sql.functions import coalesce, col, lit, sum as F_sum

# Calculate "runs" as the row-wise sum of specified columns
deliveries = deliveries.withColumn(
    "runs",
    coalesce(col("runs_off_bat"), lit(0)) +
    coalesce(col("extras"), lit(0)) +
    coalesce(col("wides"), lit(0)) +
    coalesce(col("noballs"), lit(0)) +
    coalesce(col("byes"), lit(0)) +
    coalesce(col("legbyes"), lit(0)) +
    coalesce(col("penalty"), lit(0))
)

# Drop the original columns that were summed
deliveries = deliveries.drop("runs_off_bat", "extras", "wides", "noballs", "byes", "legbyes", "penalty")

# Calculate "wickets" as the row-wise sum of dismissals, handling null values
deliveries = deliveries.withColumn(
    "wickets",
    (coalesce(col("player_dismissed").cast("int"), lit(0)) +
     coalesce(col("other_player_dismissed").cast("int"), lit(0)))
)

# Drop columns related to wicket types and dismissed players that are no longer needed
deliveries = deliveries.drop("wicket_type", "player_dismissed", "other_wicket_type", "other_player_dismissed")
deliveries.show(5)

+--------+-------+----+------------+------------+----+-------+
|match_id|innings|ball|batting_team|bowling_team|runs|wickets|
+--------+-------+----+------------+------------+----+-------+
| 1306389|      1| 0.1|     Bahrain|Saudi Arabia|   0|      0|
| 1306389|      1| 0.2|     Bahrain|Saudi Arabia|   2|      0|
| 1306389|      1| 0.3|     Bahrain|Saudi Arabia|   2|      0|
| 1306389|      1| 0.4|     Bahrain|Saudi Arabia|   4|      0|
| 1306389|      1| 0.5|     Bahrain|Saudi Arabia|   0|      0|
+--------+-------+----+------------+------------+----+-------+
only showing top 5 rows



In [13]:
#Define the window specifications for cumulative sums partitioned by "match_id" and "innings"
window_spec = Window.partitionBy("match_id", "innings").orderBy("ball")

# Calculate cumulative sum for "runs" as "curr_score"
deliveries = deliveries.withColumn(
    "curr_score",
    F_sum("runs").over(window_spec)
)

# Calculate cumulative sum for "wickets" as "curr_wickets"
deliveries = deliveries.withColumn(
    "curr_wickets",
    F_sum("wickets").over(window_spec)
)

# Calculate the required columns for the final DataFrame


# Display the resulting DataFrame
deliveries.show(250)

+--------+-------+----+------------+------------+----+-------+----------+------------+
|match_id|innings|ball|batting_team|bowling_team|runs|wickets|curr_score|curr_wickets|
+--------+-------+----+------------+------------+----+-------+----------+------------+
|  211048|      2| 0.1| New Zealand|   Australia|   0|      0|         0|           0|
|  211048|      2| 0.2| New Zealand|   Australia|   0|      0|         0|           0|
|  211048|      2| 0.3| New Zealand|   Australia|   1|      0|         1|           0|
|  211048|      2| 0.4| New Zealand|   Australia|   0|      0|         1|           0|
|  211048|      2| 0.5| New Zealand|   Australia|   1|      0|         2|           0|
|  211048|      2| 0.6| New Zealand|   Australia|   0|      0|         2|           0|
|  211048|      2| 1.1| New Zealand|   Australia|   4|      0|         6|           0|
|  211048|      2| 1.2| New Zealand|   Australia|   1|      0|         7|           0|
|  211048|      2| 1.3| New Zealand|   Aust

                                                                                

In [14]:
data=deliveries.join(matches,on='match_id').drop('season','venue','gender')
data.sort('match_id').show(10)



+--------+-------+----+------------+------------+----+-------+----------+------------+-------+---------+-------+
|match_id|innings|ball|batting_team|bowling_team|runs|wickets|curr_score|curr_wickets|  team1|    team2| winner|
+--------+-------+----+------------+------------+----+-------+----------+------------+-------+---------+-------+
|  211028|      1| 1.3|     England|   Australia|   2|      0|         7|           0|England|Australia|England|
|  211028|      1| 0.6|     England|   Australia|   2|      0|         3|           0|England|Australia|England|
|  211028|      1| 1.2|     England|   Australia|   0|      0|         5|           0|England|Australia|England|
|  211028|      1| 0.3|     England|   Australia|   0|      0|         1|           0|England|Australia|England|
|  211028|      1| 0.5|     England|   Australia|   0|      0|         1|           0|England|Australia|England|
|  211028|      1| 0.7|     England|   Australia|   2|      0|         5|           0|England|Au

                                                                                

In [15]:
from pyspark.sql import functions as F

# Create data1 by adding the 'flip' column as a literal 0
data1 = data.withColumn("flip", F.lit(0))

# Swap the columns team1 and team2 for data2
data2 = data.withColumnRenamed("team1", "team_temp") \
            .withColumnRenamed("team2", "team1") \
            .withColumnRenamed("team_temp", "team2")

# Select necessary columns for data2
data2 = data2.select('match_id', 'innings', 'ball', 'runs', 'wickets', 'batting_team', 'bowling_team', 
                     'curr_score', 'curr_wickets', 'team1', 'team2', 'winner')

# Add the 'flip' column as a literal 1 to data2
data2 = data2.withColumn("flip", F.lit(1))


# Concatenate data1 and data2, sort by match_id, and add 'won' column based on team1 winning
data_combined = data1.unionByName(data2).sort('match_id') \
    .withColumn("won", F.when(F.col('winner') == F.col('team1'), 1).otherwise(0))

# Select the final columns
data = data_combined.select('match_id', 'flip', 'innings', 'ball', 'runs', 'wickets','curr_score', 'curr_wickets', 'won')

data = data.sort('match_id','flip', 'innings', 'ball')

# Show the resulting DataFrame
data.show(250)



+--------+----+-------+----+----+-------+----------+------------+---+
|match_id|flip|innings|ball|runs|wickets|curr_score|curr_wickets|won|
+--------+----+-------+----+----+-------+----------+------------+---+
|  211028|   0|      1| 0.1|   0|      0|         0|           0|  1|
|  211028|   0|      1| 0.2|   1|      0|         1|           0|  1|
|  211028|   0|      1| 0.3|   0|      0|         1|           0|  1|
|  211028|   0|      1| 0.4|   0|      0|         1|           0|  1|
|  211028|   0|      1| 0.5|   0|      0|         1|           0|  1|
|  211028|   0|      1| 0.6|   2|      0|         3|           0|  1|
|  211028|   0|      1| 0.7|   2|      0|         5|           0|  1|
|  211028|   0|      1| 1.1|   0|      0|         5|           0|  1|
|  211028|   0|      1| 1.2|   0|      0|         5|           0|  1|
|  211028|   0|      1| 1.3|   2|      0|         7|           0|  1|
|  211028|   0|      1| 1.4|   0|      0|         7|           0|  1|
|  211028|   0|     

                                                                                

In [28]:
# x=data
data = x

In [29]:
window_spec = Window.partitionBy("match_id","flip").orderBy("flip", "innings", "ball")
window_spec_ffill = Window.partitionBy("match_id").orderBy("flip", "innings", "ball").rowsBetween(Window.unboundedPreceding, 0)

# Calculate the max of "curr_score" in 1st innings as "target" otherwise forward fill
data = data.withColumn(
    "target",
    F.when(
        (F.col("innings") == 1) & (F.col("curr_score") == F.max("curr_score").over(window_spec)),
        F.col("curr_score")
    ).otherwise(F.lit(None))
)
data = data.withColumn("overs", col("ball").cast("int"))
data = data.withColumn("run_rate",
                       F.when(F.col("overs")!=0,
                       col("curr_score")/col("overs")
                       ).otherwise(0).cast("float"))

# Forward fill the "target" column
data = data.withColumn("target", F.last("target", ignorenulls=True).over(window_spec_ffill))
data = data.withColumn("target", F.when(col("innings") == 1, 0).otherwise(col("target"))).orderBy(col("match_id"),col("flip"),col("innings"), col("ball"))

data.show(450)



+--------+----+-------+----+----+-------+----------+------------+---+------+-----+---------+
|match_id|flip|innings|ball|runs|wickets|curr_score|curr_wickets|won|target|overs| run_rate|
+--------+----+-------+----+----+-------+----------+------------+---+------+-----+---------+
|  211028|   0|      1| 0.1|   0|      0|         0|           0|  1|     0|    0|      0.0|
|  211028|   0|      1| 0.2|   1|      0|         1|           0|  1|     0|    0|      0.0|
|  211028|   0|      1| 0.3|   0|      0|         1|           0|  1|     0|    0|      0.0|
|  211028|   0|      1| 0.4|   0|      0|         1|           0|  1|     0|    0|      0.0|
|  211028|   0|      1| 0.5|   0|      0|         1|           0|  1|     0|    0|      0.0|
|  211028|   0|      1| 0.6|   2|      0|         3|           0|  1|     0|    0|      0.0|
|  211028|   0|      1| 0.7|   2|      0|         5|           0|  1|     0|    0|      0.0|
|  211028|   0|      1| 1.1|   0|      0|         5|           0|  1| 

                                                                                

In [30]:
utils.spark_save_data(data,config.FILTERED_DATA_DIR, "ball_by_ball_flip.csv")
spark.stop()

24/11/16 16:06:40 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.245.142: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/11/16 16:06:40 WARN TaskSetManager: Lost task 0.0 in stage 253.0 (TID 882) (192.168.245.142 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/11/16 16:06:40 WARN TaskSetManager: Lost task 2.0 in stage 253.0 (TID 884) (192.168.245.142 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
24/11/16 16:06:40 WARN TaskSetManager: Lost task 5.0 in stage 253.0 (TID 887) (192.168.245.142 executor 1): ExecutorLostFailure (ex

[[34m2024-11-16T16:06:54.277+0530[0m] {[34mjava_gateway.py:[0m1066} [31mERROR[0m - [31mKeyboardInterrupt while sending command.[0m
Traceback (most recent call last):
  File "/home/ravikumar/miniconda3/envs/t20i/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ravikumar/miniconda3/envs/t20i/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ravikumar/miniconda3/envs/t20i/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
[[34m2024-11-16T16:06:54.290+0530[0m] {[34mclientserver.py:[0m543} INFO[0m - Closing down clientserver connection[0m


KeyboardInterrupt: 