In [1]:
import os
import pandas as pd
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType
from pyspark.sql.functions import col, when, sum, avg, row_number 
from pyspark.sql.window import Window

In [2]:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "sp24-i535-shahhh-cricanalysis-7b69ab52366d.json"

In [3]:
from google.cloud import storage
storage_client = storage.Client()
bucket_name = "cric_data"

In [4]:
from google.cloud import storage
def list_blobs (bucket_name) :
    storage_client = storage. Client()
    blobs = storage_client. list_blobs (bucket_name)
    for blob in blobs:
        print (blob. name)

In [5]:
list_blobs(bucket_name)

circ_data/data/ball_by_ball.csv
circ_data/data/match.csv
circ_data/data/player.csv
circ_data/data/player_match.csv
circ_data/data/team.csv
circ_data/datapackage.json
circ_data/original/Ball_By_Ball.csv
circ_data/original/Match.csv
circ_data/original/Player.csv
circ_data/original/Player_match.csv
circ_data/original/Team.csv
circ_data/original/workspace


In [6]:
from pyspark.sql import SparkSession 

spark = SparkSession.builder.appName("Cric_Analysis").getOrCreate()

24/04/29 06:33:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
ball_by_ball_df = spark.read.format("csv").option("header","true").load("gs://cric_data/circ_data/data/ball_by_ball.csv")
match_df = spark.read.format("csv").option("header","true").load("gs://cric_data/circ_data/data/match.csv")
player_df = spark.read.format("csv").option("header","true").load("gs://cric_data/circ_data/data/player.csv")
player_match_df = spark.read.format("csv").option("header","true").load("gs://cric_data/circ_data/data/player_match.csv")
team_df = spark.read.format("csv").option("header","true").load("gs://cric_data/circ_data/data/team.csv")

                                                                                

In [8]:
ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True)
])

In [9]:
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])

In [10]:
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

In [11]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

In [12]:
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])

In [13]:
ball_by_ball_df = spark.read.schema(ball_by_ball_schema).format("csv").option("header","true").load("gs://cric_data/circ_data/data/ball_by_ball.csv")
match_df = spark.read.schema(match_schema).format("csv").option("header","true").load("gs://cric_data/circ_data/data/match.csv")
player_df = spark.read.schema(player_schema).format("csv").option("header","true").load("gs://cric_data/circ_data/data/player.csv")
player_match_df = spark.read.schema(player_match_schema).format("csv").option("header","true").load("gs://cric_data/circ_data/data/player_match.csv")
team_df = spark.read.schema(team_schema).format("csv").option("header","true").load("gs://cric_data/circ_data/data/team.csv")

In [14]:
# Filtering to include only valid deliveries (excluding extras like wides and no balls for specific analyses)
ball_by_ball_df = ball_by_ball_df.filter((col("wides") == 0) & (col("noballs")==0))

In [15]:
# Aggregation: Calculating the total and average runs scored in each match and inning
total_and_avg_runs = ball_by_ball_df.groupBy("match_id", "innings_no").agg(
    sum("runs_scored").alias("total_runs"),
    avg("runs_scored").alias("average_runs")
)

In [16]:
# Window Function: Calculating running total of runs in each match for each over
windowSpec = Window.partitionBy("match_id","innings_no").orderBy("over_id")

ball_by_ball_df = ball_by_ball_df.withColumn(
    "running_total_runs",
    sum("runs_scored").over(windowSpec)
)

In [18]:
# Conditional Column: Flag for high impact balls (either a wicket or more than 6 runs including extras)
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when((col("runs_scored") + col("extra_runs") > 6) | (col("bowler_wicket") == True), True).otherwise(False)
)

In [19]:
from pyspark.sql.functions import year, month, dayofmonth, when

# Extracting year, month, and day from the match date for more detailed time-based analysis
match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn("month", month("match_date"))
match_df = match_df.withColumn("day", dayofmonth("match_date"))

# High margin win: categorizing win margins into 'high', 'medium', and 'low'
match_df = match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >= 50) & (col("win_margin") < 100), "Medium")
    .otherwise("Low")
)

match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

match_df.show(2)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+---+-------------------+-----------------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id|year|month|day|win_margin_category|toss_match_winner|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+---+-------------------+-----------------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|2008-04-18|       2008|M Chinnaswamy Sta...| Bangalore|       Indi

In [20]:
from pyspark.sql.functions import lower, regexp_replace

# Normalize and clean player names
player_df = player_df.withColumn("player_name", lower(regexp_replace("player_name", "[^a-zA-Z0-9 ]", "")))

# Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
player_df = player_df.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

# Categorizing players based on batting hand
player_df = player_df.withColumn(
    "batting_style",
    when(col("batting_hand").contains("left"), "Left-Handed").otherwise("Right-Handed")
)

# Show the modified player DataFrame
player_df.show(2)

+---------+---------+-----------+----------+--------------+----------------+------------+-------------+
|player_sk|player_id|player_name|       dob|  batting_hand|   bowling_skill|country_name|batting_style|
+---------+---------+-----------+----------+--------------+----------------+------------+-------------+
|        0|        1| sc ganguly|1972-07-08| Left-hand bat|Right-arm medium|       India| Right-Handed|
|        1|        2|bb mccullum|1981-09-27|Right-hand bat|Right-arm medium| New Zealand| Right-Handed|
+---------+---------+-----------+----------+--------------+----------------+------------+-------------+
only showing top 2 rows



In [21]:
from pyspark.sql.functions import col, when, current_date, expr

# Add a 'veteran_status' column based on player age
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Veteran").otherwise("Non-Veteran")
)

# Dynamic column to calculate years since debut
player_match_df = player_match_df.withColumn(
    "years_since_debut",
    (year(current_date()) - col("season_year"))
)

# Show the enriched DataFrame
player_match_df.show()

+---------------+---------------+--------+---------+---------------+----------+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+--------------+-----------------+
|player_match_sk|playermatch_key|match_id|player_id|    player_name|       dob|  batting_hand|       bowling_skill|country_name|role_desc|         player_team|        opposit_team|season_year|is_manofthematch|age_as_on_match|isplayers_team_won|batting_status|bowling_status|player_captain|opposit_captain|player_keeper|opposit_keeper|veteran_status|years_since_debut|
+---------------+---------------+--------+---------+---------------+----------+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+-----

In [66]:
# Manually collect data from the Spark DataFrame
data = ball_by_ball_df.rdd.map(lambda x: x.asDict()).collect()

                                                                                

In [67]:
pandas_df = pd.DataFrame(data)

In [83]:
pandas_df = pandas_df.drop(['team_batting', 'team_bowling', 'matchdatesk'], axis=1)

In [87]:
pandas_df.to_csv('ball_by_ball_df.csv', index=False)

In [91]:
match_data = match_df.rdd.map(lambda x: x.asDict()).collect()
pandas_df_match_data = pd.DataFrame(match_data)
pandas_df_match_data.to_csv('match_df.csv', index=False)

In [93]:
player_data = player_df.rdd.map(lambda x: x.asDict()).collect()
pandas_df_player_data = pd.DataFrame(player_data)
pandas_df_player_data.to_csv('player_df.csv', index=False)

In [95]:
player_match_data = player_match_df.rdd.map(lambda x: x.asDict()).collect()
pandas_df_player_match_data = pd.DataFrame(player_match_data)
pandas_df_player_match_data.to_csv('player_match_df.csv', index=False)

                                                                                

In [97]:
team_data = team_df.rdd.map(lambda x: x.asDict()).collect()
pandas_df_team_data = pd.DataFrame(team_data)
pandas_df_team_data.to_csv('team_df.csv', index=False)

In [70]:
from google.cloud import storage
storage_client = storage.Client()
bucket2_name = "cric_data_processed"

In [72]:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket2_name)

In [88]:
blob = bucket.blob("ball_by_ball_df.csv")
blob.upload_from_filename("ball_by_ball_df.csv")

In [92]:
blob = bucket.blob("match_df.csv")
blob.upload_from_filename("match_df.csv")

In [94]:
blob = bucket.blob("player_df.csv")
blob.upload_from_filename("player_df.csv")

In [96]:
blob = bucket.blob("player_match_df.csv")
blob.upload_from_filename("player_match_df.csv")

In [98]:
blob = bucket.blob("team_df.csv")
blob.upload_from_filename("team_df.csv")