In [1]:
import warnings
import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")

In [3]:

# Lift pandas' display limits so wide frames and long cell values are shown in full.
for _display_option in ("display.max_columns", "display.width", "display.max_colwidth"):
    pd.set_option(_display_option, None)

In [None]:
# Load Data
# Read the four source CSVs in a fixed order and bind each to its own frame.
# low_memory=False turns off pandas' chunked parsing for these files.
df_bowling, df_batting, df_players, df_matches = (
    pd.read_csv(csv_path, low_memory=False)
    for csv_path in (
        "test_Bowling_Card.csv",
        "test_Batting_Card.csv",
        "players_info.csv",
        "test_Matches_Data.csv",
    )
)

In [None]:
# Display Basic Info
# Prints the column names, non-null counts and dtypes of the bowling frame.

df_bowling.info()
# Uncomment one of the lines below to inspect another frame the same way.
# df_batting.info()
# df_players.info()
# df_matches.info()

In [None]:
# Display 10 Random Rows
# A fixed random_state keeps the preview identical across "Restart & Run All";
# min() avoids the ValueError that sample() raises when a frame has fewer than 10 rows.

df_bowling.sample(n=min(10, len(df_bowling)), random_state=42)
# df_batting.sample(n=min(10, len(df_batting)), random_state=42)
# df_players.sample(n=min(10, len(df_players)), random_state=42)
# df_matches.sample(n=min(10, len(df_matches)), random_state=42)

In [None]:
# Check for missing values
# For each source frame, print a heading followed by the per-column null counts.

for _heading, _frame in (
    ("\nMissing Values in Bowling Data:", df_bowling),
    ("\nMissing Values in Batting Data:", df_batting),
    ("\nMissing Values in Players Data:", df_players),
    ("\nMissing Values in Matches Data:", df_matches),
):
    print(_heading)
    print(_frame.isnull().sum())

In [None]:
# Summary Statistics
# For each source frame, print a heading followed by describe() of that frame.

for _heading, _frame in (
    ("\nBowling Data Summary:", df_bowling),
    ("\nBatting Data Summary:", df_batting),
    ("\nPlayers Data Summary:", df_players),
    ("\nMatches Data Summary:", df_matches),
):
    print(_heading)
    print(_frame.describe())

In [None]:
# Checking for duplicate entries
# Print, per source frame, how many rows are exact duplicates of an earlier row.

for _heading, _frame in (
    ("\nDuplicate Rows in Bowling Data:", df_bowling),
    ("Duplicate Rows in Batting Data:", df_batting),
    ("Duplicate Rows in Players Data:", df_players),
    ("Duplicate Rows in Matches Data:", df_matches),
):
    print(_heading, _frame.duplicated().sum())

In [5]:
# Data cleaning

# 1. Handle Bowling Data

# A missing dots / fours / sixes count is replaced by 0
# (assumption carried over from the original comment: the bowler did bowl).
df_bowling.fillna(dict.fromkeys(("dots", "fours", "sixes"), 0), inplace=True)

In [6]:
# 2. Handle Batting Data

# Fill missing values in runs, balls, fours, sixes, strikeRate with 0 if the batter played
df_batting.fillna({"runs": 0, "balls": 0, "fours": 0, "sixes": 0, "strikeRate": 0}, inplace=True)

# Drop the dismissal-detail columns when more than 40% of their values are missing.
# The membership test keeps this cell re-runnable: after a column has been dropped,
# indexing it again would otherwise raise KeyError on the second run.
for _sparse_col in ("fielders", "bowler"):
    if _sparse_col in df_batting.columns and df_batting[_sparse_col].isna().mean() > 0.4:
        df_batting.drop(columns=[_sparse_col], inplace=True)

In [7]:
# 3. Handle Players Data

# Drop the image columns when more than 40% of their values are missing.
# The membership test keeps this cell re-runnable after the columns are gone.
for _sparse_col in ("image_url", "image_metadata"):
    if _sparse_col in df_players.columns and df_players[_sparse_col].isna().mean() > 0.4:
        df_players.drop(columns=[_sparse_col], inplace=True)

# Drop players with a missing DOB or country.
# This has to run BEFORE the "Unknown" fill below: once dob has been filled it is
# never NaN again, so the dob part of this check could not remove any row.
df_players.dropna(subset=["dob", "country_id"], inplace=True)

# Remaining gaps in the descriptive columns become the placeholder "Unknown"
# (dob needs no fill: rows without it were removed above).
df_players.fillna({"dod": "Unknown", "batting_style": "Unknown", "bowling_style": "Unknown"}, inplace=True)

In [8]:
# 4. Handle Matches Data

# Fill missing innings data with "Not Played".
# NOTE: writing a string into these columns means any that were numeric become
# object dtype; convert back with pd.to_numeric(errors="coerce") before doing maths on them.
innings_columns = ["Innings1 Team2 Runs Scored", "Innings1 Team2 Wickets Fell", "Innings1 Team2 Extras Rec", "Innings2 Team1 Runs Scored", "Innings2 Team1 Wickets Fell", "Innings2 Team1 Extras Rec", 
                   "Innings2 Team2 Runs Scored", "Innings2 Team2 Wickets Fell", "Innings2 Team2 Extras Rec"]
for col in innings_columns:
    df_matches[col] = df_matches[col].fillna("Not Played")

# Placeholders for matches without a winner / player of the match / recorded umpires.
df_matches.fillna({"Match Winner": "No Result", "MOM Player": "None", "Umpire 1": "Unknown", "Umpire 2": "Unknown"}, inplace=True)

# Drop referee if more than 40% of the values are missing. The membership test keeps
# the cell re-runnable: without it a second run raises KeyError once the column is gone.
if "Match Referee" in df_matches.columns and df_matches["Match Referee"].isna().mean() > 0.4:
    df_matches.drop(columns=["Match Referee"], inplace=True)

In [None]:
# Bowling Stats Forecasting

In [None]:
# Aggregating Player Bowling Stats

# Keep only the columns needed to build one row per bowler per match.
agg_df = df_bowling[['Match ID', 'bowler id', 'team', 'opposition', 'innings', 'wickets', 'overs', 'balls', 'economy', 'conceded']]

# Define aggregation rules
# NOTE(review): if 'overs' is stored in cricket notation (e.g. 3.4 = 3 overs 4 balls),
# a plain sum is not a valid over count; 'balls' is the reliable total - confirm.
agg_rules = {
    'wickets': 'sum',      # Total wickets taken
    'overs': 'sum',        # Total overs bowled
    'balls': 'sum',        # Total balls delivered
    'economy': 'mean',     # Average economy rate (unweighted mean across innings)
    'conceded': 'sum'      # Total runs conceded
}

# Group by 'Match ID' and 'bowler id', aggregate numeric stats, and keep the first
# occurrence of the categorical columns. The result already has exactly one row per
# (Match ID, bowler id), so no de-duplication step is needed afterwards.
filtered_df_bowling = (
    agg_df.groupby(['Match ID', 'bowler id'])
    .agg({**agg_rules, 'team': 'first', 'opposition': 'first'})
    .reset_index()
)

# Attach bowler names. Only the key and the name are pulled from df_players, so this
# cell does not depend on which other player columns exist (the earlier cleaning cell
# may or may not have dropped some of them).
filtered_df_bowling = filtered_df_bowling.merge(
    df_players[["player_id", "player_name"]],
    left_on="bowler id",
    right_on="player_id",
    how="left",
)

# Keep the reporting columns in the desired order (this also discards the join keys).
filtered_df_bowling = filtered_df_bowling[['Match ID', 'player_name', 'team', 'opposition', 'wickets', 'overs', 'balls', 'economy', 'conceded']]

# Bowling DataFrame
filtered_df_bowling.sort_values(by=['Match ID', 'player_name'], ascending=[False, True]).head(10)

In [10]:
# Matches DataFrame

# .copy() gives an independent frame, so the date conversion below neither writes
# through to df_matches nor triggers SettingWithCopyWarning.
filtered_df_matches = df_matches[['Match ID', 'Match Start Date', 'Team1 Name', 'Team2 Name', 'Match Venue (Stadium)', 'Match Winner']].copy()

# Convert 'Match Start Date' to datetime for sorting.
# Plain column assignment replaces the column, so its dtype really becomes datetime64.
# A `.loc[:, col] = ...` write sets values in place and, on pandas >= 2.0, can leave
# the column as object dtype holding Timestamp objects.
filtered_df_matches["Match Start Date"] = pd.to_datetime(filtered_df_matches["Match Start Date"])

# Sort matches by date
filtered_df_matches = filtered_df_matches.sort_values(by="Match Start Date").reset_index(drop=True)

In [None]:
# Merge data from matches dataframe into bowling dataframe

# Bring in each match's start date and venue, then order rows per player chronologically.
filtered_df_bowling = (
    filtered_df_bowling
    .merge(
        filtered_df_matches[["Match ID", "Match Start Date", "Match Venue (Stadium)"]],
        on="Match ID",
        how="left",
    )
    .sort_values(["player_name", "Match Start Date"])
)

# Calculate additional bowling metrics.
# For wicketless rows the ratio is undefined (division by zero), so the raw
# numerator (runs conceded / balls bowled) is used in its place.
filtered_df_bowling["bowling_average"] = (
    (filtered_df_bowling["conceded"] / filtered_df_bowling["wickets"])
    .round(2)
    .mask(filtered_df_bowling["wickets"] == 0, filtered_df_bowling["conceded"].round(2))
)

filtered_df_bowling["strike_rate"] = (
    (filtered_df_bowling["balls"] / filtered_df_bowling["wickets"])
    .round(2)
    .mask(filtered_df_bowling["wickets"] == 0, filtered_df_bowling["balls"].round(2))
)

# Display updated dataset
filtered_df_bowling.head()

In [None]:
# Set the bowler name
bowler_name = "Pat Cummins"

# Filter data for the selected player.
# .copy() makes bowler_df an independent frame, so later edits to it (such as
# setting the date index) never act on a view of filtered_df_bowling.
bowler_df = filtered_df_bowling[filtered_df_bowling["player_name"] == bowler_name].copy()

# Fail here with a clear message instead of much later inside plotting/model code.
if bowler_df.empty:
    raise ValueError(f"No bowling records found for player {bowler_name!r}")

bowler_df.head()

In [None]:
# Visualization

import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go

metrics = ["wickets", "economy", "bowling_average", "strike_rate"]
titles = ["Wickets Taken", "Economy Rate", "Bowling Average", "Strike Rate"]
opposition_colors = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728"]  # Blue, Orange, Green, Red
venue_colors = ["#9467bd", "#8c564b", "#e377c2", "#7f7f7f"]  # Purple, Brown, Pink, Grey

# Performance against Opponents: one bar chart per metric, laid out on a 2x2 grid.
fig_opposition = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=titles,
    vertical_spacing=0.15,
    horizontal_spacing=0.1,
)

for i, (metric, panel_title, bar_color) in enumerate(zip(metrics, titles, opposition_colors)):
    # divmod maps position 0..3 onto (row, col) of the 2x2 grid (0-based here).
    grid_row, grid_col = divmod(i, 2)

    # Wickets are totalled per opponent and shown largest first; every other
    # metric is averaged per opponent and shown smallest first.
    per_opponent = bowler_df.groupby("opposition", as_index=False)[metric]
    if metric == "wickets":
        data = per_opponent.sum().sort_values(by=metric, ascending=False)
    else:
        data = per_opponent.mean().sort_values(by=metric)

    fig_opposition.add_trace(
        go.Bar(
            x=data["opposition"],
            y=data[metric],
            marker=dict(color=bar_color),
            name=panel_title,
        ),
        row=grid_row + 1,
        col=grid_col + 1,
    )

fig_opposition.update_layout(
    title_text=f"{bowler_name} - Performance Against Opponents",
    height=900,
    width=1100,
    showlegend=False,
)
fig_opposition.show()

# Performance at Venues: total wickets and mean of every other metric per stadium.
venue_data = bowler_df.groupby("Match Venue (Stadium)", as_index=False)[metrics].agg(
    lambda values: values.sum() if "wickets" in values.name else values.mean()
)

for i, metric in enumerate(metrics):
    # For wickets the colour follows the value itself; for the other metrics the
    # colour value is inverted (max - value) and the colourbar labels are reversed
    # to match, so the labels still show the real metric values.
    uses_raw_values = metric == "wickets"

    sorted_data = venue_data.sort_values(by=metric, ascending=uses_raw_values)

    if uses_raw_values:
        color_data = sorted_data[metric]
    else:
        color_data = sorted_data[metric].max() - sorted_data[metric]

    fig_venue = px.bar(
        sorted_data,
        x=metric,
        y="Match Venue (Stadium)",
        color=color_data,
        orientation="h",
        title=f"{bowler_name} - {titles[i]} at All Venues",
        labels={metric: titles[i], "Match Venue (Stadium)": "Venue"},
        height=1200,
        color_continuous_scale=px.colors.sequential.Plasma,
    )

    # Five evenly spaced labels taken from the real metric range.
    min_val = sorted_data[metric].min()
    max_val = sorted_data[metric].max()
    tick_vals = np.linspace(min_val, max_val, 5)
    tick_text = [str(round(val, 2)) for val in tick_vals]
    if not uses_raw_values:
        # Inverted colour axis: the lowest colour value corresponds to the highest metric value.
        tick_text.reverse()

    color_low, color_high = color_data.min(), color_data.max()
    fig_venue.update_layout(
        coloraxis=dict(
            cmin=color_low,
            cmax=color_high,
            reversescale=False,
            colorbar=dict(
                title=titles[i],
                tickmode="array",
                tickvals=np.linspace(color_low, color_high, 5),
                ticktext=tick_text,
                len=1.0,
                yanchor="middle",
                y=0.5,
                ticklabelposition="outside",
            ),
        )
    )

    fig_venue.show()

In [109]:
# Set the date as index.
# Re-binding the name (instead of inplace=True) never mutates a filtered view of
# filtered_df_bowling, and the column check makes the cell safe to run repeatedly:
# on a second run the column is already the index and set_index would raise KeyError.
if "Match Start Date" in bowler_df.columns:
    bowler_df = bowler_df.set_index("Match Start Date")

time_series_bowler_data = bowler_df[['wickets', 'economy', 'bowling_average', 'strike_rate']]

# Split data: the first 80% of rows (in their existing order) train, the rest test.
# .iloc makes the positional intent explicit regardless of the index type.
train_size = int(len(time_series_bowler_data) * 0.8)
train_data = time_series_bowler_data.iloc[:train_size]
test_data = time_series_bowler_data.iloc[train_size:]

In [123]:
# Perform the Augmented Dickey-Fuller (ADF) test to check for stationarity

from statsmodels.tsa.stattools import adfuller

def adf_test(series, metric_name):
    """Run the Augmented Dickey-Fuller test on ``series`` and print the verdict.

    Parameters
    ----------
    series : array-like
        Observations to test; missing values are dropped first.
    metric_name : str
        Label used in the printed messages.

    Returns
    -------
    tuple
        ``(p_value, is_stationary)``. ``is_stationary`` is true when the
        p-value is at most 0.05. When the test cannot be run (empty or
        constant series, or adfuller rejects the input) the function prints
        why and returns ``(nan, False)`` instead of raising, so a loop over
        several metrics keeps going.
    """
    series_clean = pd.Series(series).dropna()

    # A series with no variation (or no data at all) has nothing to test.
    if series_clean.nunique() <= 1:
        print(f"Skipping ADF test for {metric_name}: series is empty or constant.")
        return float("nan"), False

    try:
        result = adfuller(series_clean)
    except ValueError as exc:
        # e.g. too few observations for the regression adfuller sets up
        print(f"Skipping ADF test for {metric_name}: {exc}")
        return float("nan"), False

    print(f'ADF Statistic for {metric_name}: {result[0]:.4f}')
    print(f'p-value: {result[1]:.4f}')
    
    is_stationary = result[1] <= 0.05
    if is_stationary:
        print(f"{metric_name} is likely stationary (p-value <= 0.05).")
    else:
        print(f"{metric_name} is likely non-stationary (p-value > 0.05). Consider differencing.")
    
    return result[1], is_stationary

In [None]:
# Run ADF test on all columns in training data
# Each column is tested on its own; adf_test prints the statistic and verdict.
for column, column_values in train_data.items():
    print(f"\nTesting stationarity for '{column}'...")
    p_value, is_stationary = adf_test(column_values, metric_name=column)

In [80]:
# Plot the ACF and PACF to identify autocorrelation and partial autocorrelation patterns

import matplotlib.pyplot as plt
import statsmodels.api as sm

def plot_acf_pacf(train_data, lags=20):
    """Show side-by-side ACF and PACF plots for every column of ``train_data``.

    Parameters
    ----------
    train_data : pandas.DataFrame
        One series per column; missing values are dropped per column.
    lags : int, default 20
        Upper bound on the number of lags. The value actually used is capped at
        ``len(series) // 2 - 1`` (but never below 1) so short series still plot.
    """
    for column in train_data.columns:
        series = train_data[column].dropna()
        # Cap the lag count for short series, never going below 1.
        lag_cap = max(1, (len(series) // 2) - 1)
        allowed_lags = min(lags, lag_cap)

        fig, (acf_axis, pacf_axis) = plt.subplots(1, 2, figsize=(12, 5))

        # Left panel: autocorrelation
        sm.graphics.tsa.plot_acf(series, lags=allowed_lags, ax=acf_axis)
        acf_axis.set_title(f'ACF of {column} (lags={allowed_lags})')

        # Right panel: partial autocorrelation
        sm.graphics.tsa.plot_pacf(series, lags=allowed_lags, ax=pacf_axis)
        pacf_axis.set_title(f'PACF of {column} (lags={allowed_lags})')

        fig.tight_layout()
        plt.show()

In [None]:
# Plot ACF and PACF for all columns in training data
# (train_data is the bowling training split created in the split cell above).
plot_acf_pacf(train_data)

In [None]:
# Fit manually specified ARIMA models and compare with naive forecasts

from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Manually specified ARIMA (p, d, q) values for each metric
manual_orders = {
    'wickets': (1, 0, 2),
    'economy': (1, 0, 2),
    'bowling_average': (1, 0, 2),
    'strike_rate': (1, 0, 0),
}

# Result containers, each keyed by metric name
arima_models, forecasts, naive_forecasts, metrics = {}, {}, {}, {}

# Number of steps to forecast
n_test = len(test_data)

for column in time_series_bowler_data.columns:
    print(f"\nProcessing {column} with ARIMA order {manual_orders[column]}...")

    train_series = train_data[column]
    actual = test_data[column]

    # Fit ARIMA with the manually specified order (missing training values dropped)
    fitted_model = ARIMA(train_series.dropna(), order=manual_orders[column]).fit()
    arima_models[column] = fitted_model

    # ARIMA forecast over the test period
    forecast = fitted_model.forecast(steps=n_test)
    forecasts[column] = forecast

    # Naive baseline: the last training value repeated for every test step
    naive_forecast = np.full(n_test, train_series.iloc[-1])
    naive_forecasts[column] = naive_forecast

    # Error metrics for ARIMA (stored) and for the naive baseline (printed only)
    mse = mean_squared_error(actual, forecast)
    mae = mean_absolute_error(actual, forecast)
    metrics[column] = {'MSE': mse, 'MAE': mae}
    naive_mse = mean_squared_error(actual, naive_forecast)
    naive_mae = mean_absolute_error(actual, naive_forecast)

    for report_line in (
        f"Error Metrics for {column} (ARIMA):",
        f"MSE: {mse:.4f}",
        f"MAE: {mae:.4f}",
        f"Error Metrics for {column} (Naive):",
        f"MSE: {naive_mse:.4f}",
        f"MAE: {naive_mae:.4f}",
    ):
        print(report_line)

    # Training history, actual test values and both forecasts on one axis
    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(train_series, label='Training Data', color='gray')
    ax.plot(actual, label='Actual Test Data', color='blue')
    ax.plot(test_data.index, forecast, label='ARIMA Predicted', color='orange')
    ax.plot(test_data.index, naive_forecast, label='Naive Predicted', color='green', linestyle='--')
    ax.set_title(f'{column}: Actual vs ARIMA vs Naive Predicted')
    ax.set_xlabel('Year')
    ax.set_ylabel(column)
    ax.legend()
    plt.show()

In [None]:
# Time Series Forecasting with ARIMA, SARIMA, and Naive Baselines

from statsmodels.tsa.statespace.sarimax import SARIMAX
from pmdarima import auto_arima
from itertools import product

class TimeSeriesForecaster:
    """Fit ARIMA, SARIMA and a naive baseline per column and keep the best one.

    For every column of ``train_data`` the class fits a grid-searched ARIMA
    model and an auto_arima-selected SARIMA model, builds a naive
    "repeat the last value" baseline, scores all three on ``test_data`` with
    MSE/MAE and records the candidate with the lowest MSE.

    The class relies on ``ARIMA``, ``SARIMAX``, ``auto_arima``,
    ``mean_squared_error``, ``mean_absolute_error``, ``product``, ``np``,
    ``pd`` and ``plt`` being available in the enclosing namespace.
    """

    def __init__(self, train_data, test_data, seasonal_period=12):
        """Store the split and set up empty result containers.

        Parameters
        ----------
        train_data, test_data : pandas.DataFrame
            Same columns; one series per column.
        seasonal_period : int, default 12
            Passed to auto_arima as ``m`` for the SARIMA search.
        """
        self.train_data = train_data
        self.test_data = test_data
        self.seasonal_period = seasonal_period

        # Per-column results, filled by fit_models()
        self.arima_models = {}
        self.sarima_models = {}
        self.best_models = {}
        self.forecasts = {}
        self.naive_forecasts = {}
        self.metrics = {}

        # Grid search parameters
        self.p_values = range(0, 7)
        self.d_values = [0]  # For stationary data
        self.q_values = range(0, 7)

        self.n_test = len(self.test_data)

    @staticmethod
    def _pick_best(mse_by_model):
        """Return the key with the lowest MSE; ties go to the earliest key.

        Because ties resolve to the first entry, callers list candidates from
        simplest to most complex so that equally good models favour the
        simpler one.
        """
        return min(mse_by_model, key=mse_by_model.get)

    def grid_search_arima(self, train_data, test_data):
        """Try every (p, d, q) in the configured grid and return the best order.

        Each order is fitted on ``train_data`` and ranked by the MSE of its
        forecast against ``test_data``. Orders that fail to fit are reported
        and skipped. Returns ``None`` when no order could be fitted.

        NOTE(review): ranking on the test series means the test error reported
        later is optimistic; a separate validation slice would avoid that.
        """
        best_score, best_order = float("inf"), None
        for p, d, q in product(self.p_values, self.d_values, self.q_values):
            try:
                model = ARIMA(train_data, order=(p, d, q))
                fitted_model = model.fit()
                forecast = fitted_model.forecast(steps=len(test_data))
                mse = mean_squared_error(test_data, forecast)
                if mse < best_score:
                    best_score, best_order = mse, (p, d, q)
                print(f"ARIMA{p,d,q} MSE: {mse:.4f}")
            except Exception as e:
                # Best-effort search: a failing order must not abort the whole grid.
                print(f"ARIMA{p,d,q} failed with error: {e}")
                continue
        if best_order is None:
            print("Warning: No valid ARIMA model found with grid search. Falling back to auto_arima.")
        return best_order

    def fit_arima(self, train_data, test_data):
        """Fit the best ARIMA model for one series.

        Uses grid_search_arima(); when that finds nothing, falls back to the
        order chosen by auto_arima.

        Returns
        -------
        tuple
            ``(fitted_model, forecast_over_test_period, order)``.
        """
        best_arima_order = self.grid_search_arima(train_data, test_data)
        if best_arima_order is None:
            print("Using auto_arima as fallback for ARIMA...")
            auto_arima_model = auto_arima(
                train_data, start_p=0, start_q=0, max_p=6, max_q=6, d=0,
                seasonal=False, trace=True, error_action='ignore',
                suppress_warnings=True, stepwise=True
            )
            best_arima_order = auto_arima_model.order
            print(f"Fallback ARIMA order from auto_arima: {best_arima_order}")

        arima_model = ARIMA(train_data, order=best_arima_order)
        fitted_arima = arima_model.fit()
        arima_forecast = fitted_arima.forecast(steps=len(test_data))
        return fitted_arima, arima_forecast, best_arima_order

    def fit_sarima(self, train_data, test_data):
        """Fit a SARIMA model whose orders are chosen by auto_arima.

        Returns
        -------
        tuple
            ``(fitted_model, forecast_over_test_period, order, seasonal_order)``.
        """
        auto_sarima = auto_arima(
            train_data, start_p=0, start_q=0, max_p=6, max_q=6, d=0,
            seasonal=True, m=self.seasonal_period, start_P=0, start_Q=0,
            max_P=2, max_Q=2, D=0, trace=True, error_action='ignore',
            suppress_warnings=True, stepwise=True
        )

        best_sarima_order = auto_sarima.order
        best_sarima_seasonal_order = auto_sarima.seasonal_order
        sarima_model = SARIMAX(
            train_data, order=best_sarima_order,
            seasonal_order=best_sarima_seasonal_order
        )
        fitted_sarima = sarima_model.fit(disp=False)
        sarima_forecast = fitted_sarima.forecast(steps=len(test_data))
        return fitted_sarima, sarima_forecast, best_sarima_order, best_sarima_seasonal_order

    def fit_models(self):
        """Fit all candidates for every column and record the best per column.

        Fills ``arima_models``, ``sarima_models``, ``naive_forecasts``,
        ``best_models``, ``metrics`` and ``forecasts``.
        """
        for column in self.train_data.columns:
            print(f"\nProcessing {column}...")

            train_series = self.train_data[column].dropna()
            actual = self.test_data[column]

            # Fit ARIMA
            print("Fitting ARIMA...")
            fitted_arima, arima_forecast, best_arima_order = self.fit_arima(train_series, actual)
            self.arima_models[column] = fitted_arima
            print(f"Best ARIMA order for {column}: {best_arima_order}")

            # Fit SARIMA
            print("Fitting SARIMA...")
            fitted_sarima, sarima_forecast, best_sarima_order, best_sarima_seasonal_order = self.fit_sarima(
                train_series, actual
            )
            self.sarima_models[column] = fitted_sarima
            print(f"Best SARIMA order for {column}: {best_sarima_order}, Seasonal: {best_sarima_seasonal_order}")

            # Naive forecast: repeat the last non-missing training value
            naive_forecast = np.full(self.n_test, train_series.iloc[-1])
            self.naive_forecasts[column] = naive_forecast

            # Candidates listed simplest first so ties favour the simpler model.
            candidates = {
                'Naive': (None, naive_forecast),
                'ARIMA': (fitted_arima, arima_forecast),
                'SARIMA': (fitted_sarima, sarima_forecast),
            }
            scores = {
                name: {
                    'MSE': mean_squared_error(actual, candidate_forecast),
                    'MAE': mean_absolute_error(actual, candidate_forecast),
                }
                for name, (_, candidate_forecast) in candidates.items()
            }

            # Best model selection. A plain minimum is used because pairwise strict
            # "<" checks select nobody when ARIMA and SARIMA tie (common when the
            # seasonal search finds no seasonal terms), which used to hand the win
            # to Naive even when Naive was worse than both.
            best_name = self._pick_best({name: score['MSE'] for name, score in scores.items()})
            best_fitted, best_forecast = candidates[best_name]
            best_mse = scores[best_name]['MSE']
            best_mae = scores[best_name]['MAE']

            self.best_models[column] = {'model': best_name, 'fitted_model': best_fitted, 'forecast': best_forecast}
            self.metrics[column] = {'MSE': best_mse, 'MAE': best_mae}
            self.forecasts[column] = best_forecast
            print(f"Selected {best_name} for {column} with MSE: {best_mse:.4f}, MAE: {best_mae:.4f}")

            # Metrics summary
            for name in ('ARIMA', 'SARIMA', 'Naive'):
                model_mse = scores[name]['MSE']
                model_mae = scores[name]['MAE']
                print(f"Error Metrics for {column} ({name}): MSE: {model_mse:.4f}, MAE: {model_mae:.4f}")

    def visualize(self):
        """Plot training data, test data, best forecast and naive forecast per column.

        Requires fit_models() to have been called first.
        """
        for column in self.train_data.columns:
            plt.figure(figsize=(10, 4))
            plt.plot(self.train_data[column], label='Training Data', color='gray')
            plt.plot(self.test_data[column], label='Actual Test Data', color='blue')
            plt.plot(self.test_data.index, self.forecasts[column],
                     label=f'{self.best_models[column]["model"]} Predicted', color='orange')
            plt.plot(self.test_data.index, self.naive_forecasts[column],
                     label='Naive Predicted', color='green', linestyle='--')
            plt.title(f'{column}: Actual vs {self.best_models[column]["model"]} vs Naive Predicted')
            plt.xlabel('Year')
            plt.ylabel(column)
            plt.legend()
            plt.show()

            print(f"Last training value for {column}: {self.train_data[column].iloc[-1]}, "
                  f"Mean test value: {self.test_data[column].mean()}, "
                  f"Std test value: {self.test_data[column].std()}")

    def forecast_future(self, steps=5):
        """Forecast ``steps`` periods beyond the end of the test data.

        The fitted models end at the last training observation, so asking them
        for ``steps`` values would only re-predict the start of the test
        period. Instead the forecast is extended by ``n_test + steps`` and the
        first ``n_test`` values (the test window) are discarded. The naive
        forecast repeats the last non-missing observed value across train and
        test data.

        Requires fit_models() to have been called first.

        Returns
        -------
        tuple of dict
            ``(future_forecasts, future_naive_forecasts)``, keyed by column.
        """
        future_forecasts = {}
        future_naive_forecasts = {}

        for column in self.train_data.columns:
            best = self.best_models[column]

            # Last value actually observed, looking at the test data after the training data.
            observed = pd.concat([self.train_data[column], self.test_data[column]]).dropna()
            last_value = observed.iloc[-1]
            future_naive = np.full(steps, last_value)
            future_naive_forecasts[column] = future_naive

            if best['model'] in ['ARIMA', 'SARIMA']:
                full_path = best['fitted_model'].forecast(steps=self.n_test + steps)
                # Skip the part that overlaps the test window; keep the true future.
                if hasattr(full_path, "iloc"):
                    future_forecast = full_path.iloc[self.n_test:]
                else:
                    future_forecast = np.asarray(full_path)[self.n_test:]
            else:
                future_forecast = future_naive.copy()
            future_forecasts[column] = future_forecast

            print(f"\nFuture Forecast for {column} (next {steps} steps):")
            print(f"{best['model']} Forecast:")
            print(future_forecast)
            print("Naive Forecast:")
            print(future_naive)

        return future_forecasts, future_naive_forecasts

In [None]:
# Time Series Forecasting
# Runs the full pipeline on the bowling split: fit all candidate models, plot the
# best one per metric, then print forecasts for the next 5 steps.

# In a notebook kernel __name__ is "__main__", so this guard always passes here.
if __name__ == "__main__":
    # For bowling data (train_data / test_data still hold the bowling split at this point)
    bowling_forecaster = TimeSeriesForecaster(train_data, test_data)
    bowling_forecaster.fit_models()
    bowling_forecaster.visualize()
    bowling_future, bowling_naive_future = bowling_forecaster.forecast_future(steps=5)

In [None]:
# Batting Stats Forecasting

In [None]:
# Aggregating Player Batting Stats

# Batting Average (per innings): runs divided by the isOut flag; where isOut is 0
# the division is undefined, so the plain run count is used instead.
df_batting["batting_average"] = (df_batting["runs"] / df_batting["isOut"]).round(2)
df_batting.loc[df_batting["isOut"] == 0, "batting_average"] = df_batting["runs"].round(2)

agg_df_batting = df_batting[['Match ID', 'batsman', 'team', 'innings', 'runs', 'balls', 'fours', 'sixes', 'batting_average', 'strikeRate']]

agg_rules_batting = {
    'runs': 'sum',              # Total runs scored
    'balls': 'sum',             # Total balls faced
    'fours': 'sum',             # Total fours hit
    'sixes': 'sum',             # Total sixes hit
    'batting_average': 'mean',  # Average of batting average
    'strikeRate': 'mean'        # Average strike rate
}

# Group by 'Match ID' and 'batsman' (the batter's player id), aggregate numeric stats,
# and keep the first team value. The result has exactly one row per (Match ID, batsman),
# so no de-duplication step is needed afterwards.
filtered_df_batting = (
    agg_df_batting.groupby(['Match ID', 'batsman'])
    .agg({**agg_rules_batting, 'team': 'first'})
    .reset_index()
)

# Attach batter names. Only the key and the name are pulled from df_players, so this
# cell does not depend on which other player columns exist.
filtered_df_batting = filtered_df_batting.merge(
    df_players[["player_id", "player_name"]],
    left_on="batsman",
    right_on="player_id",
    how="left",
)

# Attach both team names plus the match date and venue in a single lookup.
filtered_df_batting = filtered_df_batting.merge(
    filtered_df_matches[["Match ID", "Team1 Name", "Team2 Name", "Match Start Date", "Match Venue (Stadium)"]],
    on="Match ID",
    how="left",
)

# Determine Opponent Team: the side that is not the batter's own team.
# Vectorised with np.where instead of a row-by-row apply (same result, works on
# empty frames, and avoids a Python-level loop over every row).
filtered_df_batting["opposition"] = np.where(
    filtered_df_batting["team"] == filtered_df_batting["Team1 Name"],
    filtered_df_batting["Team2 Name"],
    filtered_df_batting["Team1 Name"],
)

# Keep the reporting columns in the desired order (this also discards join keys and team-name helpers).
filtered_df_batting = filtered_df_batting[['Match ID', 'player_name', 'team', 'opposition', 'runs', 'balls', 'fours', 'sixes', 'batting_average', 'strikeRate', 'Match Start Date', 'Match Venue (Stadium)']]

filtered_df_batting = filtered_df_batting.sort_values(["player_name", "Match Start Date"])

# Batting DataFrame
filtered_df_batting.head()

In [None]:
# Set the player name
batter_name = "Travis Head"

# Filter data for the selected player.
# .copy() makes batter_df an independent frame, so later edits to it (such as
# setting the date index) never act on a view of filtered_df_batting.
batter_df = filtered_df_batting[filtered_df_batting["player_name"] == batter_name].copy()

# Fail here with a clear message instead of much later inside plotting/model code.
if batter_df.empty:
    raise ValueError(f"No batting records found for player {batter_name!r}")

batter_df.head()

In [None]:
# Visualization

# Define metrics and titles for batting performance
metrics = ["runs", "batting_average", "strikeRate"]
titles = ["Runs Scored", "Batting Average", "Strike Rate"]
opposition_colors = ["#1f77b4", "#ff7f0e", "#2ca02c"]  # Blue, Orange, Green

# Performance against Opponents: one bar chart per metric in a single row of three.
fig_opposition = make_subplots(
    rows=1,
    cols=3,
    subplot_titles=titles,
    horizontal_spacing=0.1,
)

for i, (metric, panel_title, bar_color) in enumerate(zip(metrics, titles, opposition_colors)):
    # Runs are totalled per opponent, the other metrics averaged; all shown largest first.
    per_opponent = batter_df.groupby("opposition", as_index=False)[metric]
    data = per_opponent.sum() if metric == "runs" else per_opponent.mean()
    data = data.sort_values(by=metric, ascending=False)

    fig_opposition.add_trace(
        go.Bar(
            x=data["opposition"],
            y=data[metric],
            marker=dict(color=bar_color),
            name=panel_title,
        ),
        row=1,
        col=i + 1,
    )

# Update layout
fig_opposition.update_layout(
    title_text=f"{batter_name} - Performance Against Opponents",
    height=700,
    width=1750,
    showlegend=False,
)

# Display the figure
fig_opposition.show()

# Performance by Match Venue: total runs and mean of the other metrics per stadium.
venue_data = batter_df.groupby("Match Venue (Stadium)", as_index=False)[metrics].agg(
    lambda values: values.sum() if "runs" in values.name else values.mean()
)

for i, metric in enumerate(metrics):
    # Largest value first; the reversed y-axis below puts that row at the top.
    sorted_data = venue_data.sort_values(by=metric, ascending=False)

    # Bars are coloured by the metric value itself.
    color_data = sorted_data[metric]

    fig_venue = px.bar(
        sorted_data,
        x=metric,
        y="Match Venue (Stadium)",
        color=color_data,
        orientation="h",
        title=f"{batter_name} - {titles[i]} by Match Venue",
        labels={metric: titles[i], "Match Venue (Stadium)": "Venue"},
        height=1200,
        width=1750,
        color_continuous_scale=px.colors.sequential.Plasma,
    )

    # Five evenly spaced colourbar ticks spanning the metric's range.
    color_low, color_high = color_data.min(), color_data.max()
    tick_vals = np.linspace(color_low, color_high, 5)
    tick_text = [str(round(val, 2)) for val in tick_vals]

    colorbar_settings = dict(
        title=titles[i],
        tickmode="array",
        tickvals=tick_vals,
        ticktext=tick_text,
        len=1.0,
        yanchor="middle",
        y=0.5,
        ticklabelposition="outside",
    )
    fig_venue.update_layout(
        coloraxis=dict(
            cmin=color_low,
            cmax=color_high,
            reversescale=False,  # Keep color scale aligned with values
            colorbar=colorbar_settings,
        ),
        yaxis=dict(autorange="reversed"),  # Highest values at top
        showlegend=False,
    )

    fig_venue.show()

In [None]:

# Set the date as index.
# Re-binding the name (instead of inplace=True) never mutates a filtered view of
# filtered_df_batting, and the column check makes the cell safe to run repeatedly:
# on a second run the column is already the index and set_index would raise KeyError.
if "Match Start Date" in batter_df.columns:
    batter_df = batter_df.set_index("Match Start Date")
# Left disabled as in the original; enable if the index does not come out as datetime:
# batter_df.index = pd.to_datetime(batter_df.index)

time_series_batter_data = batter_df[['runs', 'batting_average', 'strikeRate']]

# Split data: the first 80% of rows (in their existing order) train, the rest test.
# .iloc makes the positional intent explicit regardless of the index type.
train_size = int(len(time_series_batter_data) * 0.8)
train_data = time_series_batter_data.iloc[:train_size]
test_data = time_series_batter_data.iloc[train_size:]

In [None]:
# Run ADF test on all columns in training data
# Each column is tested on its own; adf_test prints the statistic and verdict.
for column, column_values in train_data.items():
    print(f"\nTesting stationarity for '{column}'...")
    p_value, is_stationary = adf_test(column_values, metric_name=column)

In [None]:
# Plot ACF and PACF for all columns in training data
# (train_data is the batting training split created in the split cell above).
plot_acf_pacf(train_data)

In [None]:
# Fit manually specified ARIMA models and compare with naive forecasts

# Manually specified ARIMA (p, d, q) values for each batting metric
manual_orders = {
    'runs': (2, 0, 3),
    'batting_average': (2, 0, 3),
    'strikeRate': (2, 0, 4),
}

# Result containers, each keyed by metric name
arima_models, forecasts, naive_forecasts, metrics = {}, {}, {}, {}

# Number of steps to forecast
n_test = len(test_data)

for column in time_series_batter_data.columns:
    print(f"\nProcessing {column} with ARIMA order {manual_orders[column]}...")

    train_series = train_data[column]
    actual = test_data[column]

    # Fit ARIMA with the manually specified order (missing training values dropped)
    fitted_model = ARIMA(train_series.dropna(), order=manual_orders[column]).fit()
    arima_models[column] = fitted_model

    # ARIMA forecast over the test period
    forecast = fitted_model.forecast(steps=n_test)
    forecasts[column] = forecast

    # Naive baseline: the last training value repeated for every test step
    naive_forecast = np.full(n_test, train_series.iloc[-1])
    naive_forecasts[column] = naive_forecast

    # Error metrics for ARIMA (stored) and for the naive baseline (printed only)
    mse = mean_squared_error(actual, forecast)
    mae = mean_absolute_error(actual, forecast)
    metrics[column] = {'MSE': mse, 'MAE': mae}
    naive_mse = mean_squared_error(actual, naive_forecast)
    naive_mae = mean_absolute_error(actual, naive_forecast)

    for report_line in (
        f"Error Metrics for {column} (ARIMA):",
        f"MSE: {mse:.4f}",
        f"MAE: {mae:.4f}",
        f"Error Metrics for {column} (Naive):",
        f"MSE: {naive_mse:.4f}",
        f"MAE: {naive_mae:.4f}",
    ):
        print(report_line)

    # Training history, actual test values and both forecasts on one axis
    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(train_series, label='Training Data', color='gray')
    ax.plot(actual, label='Actual Test Data', color='blue')
    ax.plot(test_data.index, forecast, label='ARIMA Predicted', color='orange')
    ax.plot(test_data.index, naive_forecast, label='Naive Predicted', color='green', linestyle='--')
    ax.set_title(f'{column}: Actual vs ARIMA vs Naive Predicted')
    ax.set_xlabel('Year')
    ax.set_ylabel(column)
    ax.legend()
    plt.show()

In [None]:
# Time Series Forecasting
# Runs the full pipeline on the batting split: fit all candidate models, plot the
# best one per metric, then print forecasts for the next 5 steps.

# In a notebook kernel __name__ is "__main__", so this guard always passes here.
if __name__ == "__main__":
    # For batting data (train_data / test_data were re-bound to the batting split above)
    batting_forecaster = TimeSeriesForecaster(train_data, test_data)
    batting_forecaster.fit_models()
    batting_forecaster.visualize()
    batting_future, batting_naive_future = batting_forecaster.forecast_future(steps=5)