In [None]:
# packages
from datetime import datetime
import os
import pandas as pd
from pathlib import Path
import concurrent.futures
import json
import warnings
import inflection


def read_and_parse_file(file_path, start_index, end_index, encoding="utf-8"):
    """Read a JavaScript-wrapped JSON feed file and parse its JSON payload.

    The file content is whitespace-stripped, then ``start_index`` leading and
    ``end_index`` trailing characters (the JS wrapper around the JSON) are cut
    off before parsing.

    Args:
        file_path: Path of the ``.js`` file to read.
        start_index: Number of leading characters to drop.
        end_index: Number of trailing characters to drop; ``0`` keeps the tail.
        encoding: Text encoding of the file (JSON text is UTF-8 per RFC 8259).

    Returns:
        The parsed JSON value.

    Raises:
        json.JSONDecodeError: If the trimmed payload is not valid JSON.
    """
    with open(file_path, "r", encoding=encoding) as file:
        content = file.read().strip()
    # Use an explicit stop index: ``content[start:-0]`` would be an empty string.
    stop = max(len(content) - end_index, 0)
    payload = content[start_index:stop]
    return json.loads(payload)


def read_json_files(folder_path, data_type, start_index, end_index):
    """Recursively read and parse every ``*<data_type>.js`` file under a folder.

    Files are parsed in parallel on a thread pool. Results are returned in
    sorted file-path order so repeated runs yield the same ordering; later cells
    assign sequential match IDs based on row order.

    Args:
        folder_path: Root directory searched recursively.
        data_type: File-name suffix (before ``.js``) to match.
        start_index: Leading wrapper characters to trim (see ``read_and_parse_file``).
        end_index: Trailing wrapper characters to trim (see ``read_and_parse_file``).

    Returns:
        List of parsed JSON values, one per matching file; empty if none match.
    """
    file_paths = sorted(Path(folder_path).rglob(f"*{data_type}.js"))

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # ``Executor.map`` yields results in input order, unlike ``as_completed``.
        return list(
            executor.map(
                lambda path: read_and_parse_file(path, start_index, end_index),
                file_paths,
            )
        )


def read_csv_files(folder_path, dtype=None):
    """Recursively read every ``*.csv`` under a folder into a single DataFrame.

    Files are read in parallel but concatenated in sorted file-path order, so
    the resulting row order is reproducible between runs.

    Args:
        folder_path: Root directory searched recursively.
        dtype: Per-column dtype overrides passed to ``pd.read_csv``. Defaults to
            ``{"column20": str, "column21": str}``.

    Returns:
        The concatenated DataFrame with a fresh RangeIndex.

    Raises:
        ValueError: If no CSV file is found under ``folder_path``.
    """
    if dtype is None:
        dtype = {"column20": str, "column21": str}

    file_paths = sorted(Path(folder_path).rglob("*.csv"))
    if not file_paths:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise ValueError(f"No CSV files found under {folder_path!r}")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # ``Executor.map`` preserves input order, unlike ``as_completed``.
        dataframes = list(
            executor.map(lambda path: pd.read_csv(path, dtype=dtype), file_paths)
        )

    return pd.concat(dataframes, ignore_index=True)


def combine_squad_data(json_list):
    """Split each squad feed into two records, one per team.

    Every input object contributes ``{"squad": <squadA>}`` followed by
    ``{"squad": <squadB>}``; a missing squad key becomes an empty list.
    """
    return [
        {"squad": json_obj.get(squad_key, [])}
        for json_obj in json_list
        for squad_key in ("squadA", "squadB")
    ]


def combine_innings_data(innings_1_json_data_list, innings_2_json_data_list):
    """Collect innings payloads: all first innings, then all second innings.

    Each element of the first list must hold an ``"Innings1"`` key and each
    element of the second an ``"Innings2"`` key (``KeyError`` otherwise).
    """
    first_innings = [feed["Innings1"] for feed in innings_1_json_data_list]
    second_innings = [feed["Innings2"] for feed in innings_2_json_data_list]
    return first_innings + second_innings


def convert_json_list_to_df(json_data_list, key_name):
    """Stack ``pd.DataFrame(obj[key_name])`` for every object into one frame.

    The result has a fresh RangeIndex. ``pd.concat`` raises ``ValueError`` for
    an empty input list, and a missing ``key_name`` raises ``KeyError``.
    """
    frames = (pd.DataFrame(json_obj[key_name]) for json_obj in json_data_list)
    return pd.concat(frames, ignore_index=True)

In [None]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

data_path = os.getenv("data_path")
if not data_path:
    # Fail fast: otherwise the paths below silently become "None/Data_Feeds" etc.
    raise RuntimeError(
        "Environment variable 'data_path' is not set; define it in your .env file."
    )

data_feeds_path = f"{data_path}/Data_Feeds"
squad_feeds_path = f"{data_path}/Squad_Feeds"
other_tournament_data_path = f"{data_path}/other-tournament-data"

# file name of collected data (matched as the suffix of "*<name>.js" files)
match_schedule_file_name = "matchSchedule"
match_squad_file_name = "squad"
match_summary_file_name = "matchsummary"
match_inning1_file_name = "Innings1"
match_inning2_file_name = "Innings2"

# key value of relevant data in json objects of collected data
match_result_key = "Result"
match_player_key = "squad"
match_summary_key = "MatchSummary"
match_batting_card_key = "BattingCard"
match_extras_key = "Extras"
match_fall_of_wickets_key = "FallOfWickets"
match_wagon_wheel_key = "WagonWheel"
match_partnership_scores_key = "PartnershipScores"
match_partnership_break_key = "PartnershipBreak"
match_bowling_card_key = "BowlingCard"
match_manhattan_graph_key = "ManhattanGraph"
match_manhattan_wickets_key = "ManhattanWickets"
match_over_history_key = "OverHistory"
match_wagon_wheel_summary_key = "WagonWheelSummary"
match_batting_head_to_head_key = "battingheadtohead"
# Must differ from the batting key; reusing "battingheadtohead" here would make
# the bowling head-to-head frame an exact copy of the batting one.
# NOTE(review): confirm the exact key name used in the innings feed.
match_bowling_head_to_head_key = "bowlingheadtohead"

In [None]:
def load_match_result_data():
    """Load the ``Result`` records of every match-schedule feed into one DataFrame.

    Each schedule file's JSON is wrapped by a 14-character prefix and a
    2-character suffix, which are trimmed before parsing.
    """
    schedule_feeds = read_json_files(
        data_feeds_path,
        match_schedule_file_name,
        start_index=14,
        end_index=2,
    )
    return convert_json_list_to_df(schedule_feeds, key_name=match_result_key)


def load_match_player_data():
    """Load squad feeds and return one DataFrame of squad entries for both teams.

    Each squad file's JSON is wrapped by an 8-character prefix and a
    2-character suffix, which are trimmed before parsing. Each feed is split
    into its ``squadA`` and ``squadB`` parts before stacking.
    """
    squad_feeds = read_json_files(
        squad_feeds_path,
        match_squad_file_name,
        start_index=8,
        end_index=2,
    )
    per_team_squads = combine_squad_data(squad_feeds)
    return convert_json_list_to_df(per_team_squads, key_name=match_player_key)


def load_match_summary_data():
    """Load the ``MatchSummary`` records of every match-summary feed into one DataFrame.

    Each summary file's JSON is wrapped by a 22-character prefix and a
    2-character suffix, which are trimmed before parsing.
    """
    summary_feeds = read_json_files(
        data_feeds_path,
        match_summary_file_name,
        start_index=22,
        end_index=2,
    )
    return convert_json_list_to_df(summary_feeds, key_name=match_summary_key)


def load_match_innings_data():
    """Load both innings feeds and split them into one DataFrame per section.

    In every frame, rows from first-innings payloads precede rows from
    second-innings payloads. Each innings file's JSON is wrapped by a
    10-character prefix and a 2-character suffix, trimmed before parsing.

    Returns:
        dict mapping a frame name (e.g. ``"match_batting_card"``) to the
        DataFrame built from that section of every innings payload.
    """
    innings_payloads = combine_innings_data(
        read_json_files(data_feeds_path, match_inning1_file_name, 10, 2),
        read_json_files(data_feeds_path, match_inning2_file_name, 10, 2),
    )

    # Output frame name -> key of the section inside each innings payload.
    section_keys = {
        "match_batting_card": match_batting_card_key,
        "match_extras": match_extras_key,
        "match_fall_of_wickets": match_fall_of_wickets_key,
        "match_wagon_wheel": match_wagon_wheel_key,
        "match_partnership_scores": match_partnership_scores_key,
        "match_partnership_break": match_partnership_break_key,
        "match_bowling_card": match_bowling_card_key,
        "match_manhattan_graph": match_manhattan_graph_key,
        "match_manhattan_wickets": match_manhattan_wickets_key,
        "match_over_history": match_over_history_key,
        "match_wagon_wheel_summary": match_wagon_wheel_summary_key,
        "match_batting_head_to_head": match_batting_head_to_head_key,
        "match_bowling_head_to_head": match_bowling_head_to_head_key,
    }

    return {
        frame_name: convert_json_list_to_df(innings_payloads, section_key)
        for frame_name, section_key in section_keys.items()
    }


def load_match_ball_data():
    """Load every CSV under ``other_tournament_data_path`` (presumably ball-level data) as one DataFrame."""
    return read_csv_files(other_tournament_data_path)

In [None]:
# DataFrame.info() writes its report to stdout and returns None, so it is called
# directly rather than wrapped in print() (which would add a stray "None").
match_result_data = load_match_result_data()
print(f"match results: {match_result_data.shape}")
print("Match Results Schema:")
match_result_data.info()

match_summary_data = load_match_summary_data()
print(f"match summaries: {match_summary_data.shape}")
print("Match Summaries Schema:")
match_summary_data.info()

match_player_data = load_match_player_data()
print(f"match players: {match_player_data.shape}")
print("Match Players Schema:")
match_player_data.info()

match_ball_data = load_match_ball_data()
print(f"match balls: {match_ball_data.shape}")
print("Match Balls Schema:")
match_ball_data.info()

innings_dataframes = load_match_innings_data()

for name, dataframe in innings_dataframes.items():
    print(f"\n{name} DataFrame:")
    print("Shape:", dataframe.shape)
    # Print the label first; info() then emits the column report beneath it.
    print("Columns:")
    dataframe.info()

Table-Specific Processing


In [None]:
# Raw match-result feed columns kept for building the matches table (order matters).
MATCHES_RESULT_REQD_COLS = [
    "MatchID", "MatchDate", "MatchTime",
    "FirstBattingTeamID", "MatchDateOrder", "MatchName",
    "FirstBattingTeamName", "SecondBattingTeamID", "SecondBattingTeamName",
    "GroundName", "Comments", "TossTeam", "TossDetails",
    "FirstBattingSummary", "SecondBattingSummary",
]

In [None]:
# utils

import inflection
import pandas as pd


def clean_match_summary_data(match_summary):
    """Normalise competition names in ``match_summary`` in place (returns None).

    "BIG BASH LEAGUE 2018-19" is rewritten to "BBL 2018-19" so that every name
    has the "<COMPETITION> <SEASON>" two-token shape that ``generate_match_df``
    splits on.
    """
    is_long_bbl_name = match_summary["CompetitionName"] == "BIG BASH LEAGUE 2018-19"
    match_summary.loc[is_long_bbl_name, "CompetitionName"] = "BBL 2018-19"


def create_winning_team_id_column(match_df):
    """Add a ``winning_team_id`` column inferred from the free-text result comment.

    A team is taken as the winner when its name appears in ``Comments``; the
    first-batting team is checked first. Rows with no matching name, a missing
    (non-string) comment, or a missing/empty team name get ``""``.

    The column is added to ``match_df`` in place and the same frame is returned.

    Args:
        match_df: DataFrame with ``Comments``, ``FirstBattingTeamName``,
            ``FirstBattingTeamID``, ``SecondBattingTeamName`` and
            ``SecondBattingTeamID`` columns.

    Returns:
        ``match_df`` with a new object-dtype ``winning_team_id`` column.
    """
    team_columns = (
        ("FirstBattingTeamName", "FirstBattingTeamID"),
        ("SecondBattingTeamName", "SecondBattingTeamID"),
    )

    def _winner(row):
        comments = row["Comments"]
        # A NaN comment would make the ``in`` test below raise TypeError.
        if not isinstance(comments, str):
            return ""
        for name_column, id_column in team_columns:
            team_name = row[name_column]
            # An empty name is a substring of every comment, so skip it.
            if isinstance(team_name, str) and team_name and team_name in comments:
                return row[id_column]
        return ""

    winners = [_winner(row) for _, row in match_df.iterrows()]
    # Positional assignment (rather than ``.at[label]``) stays correct even when
    # the index has duplicate labels; object dtype keeps ids and "" together.
    match_df["winning_team_id"] = pd.Series(winners, index=match_df.index, dtype=object)
    return match_df


def preapare_data(match_result, match_summary, match_player):
    """Derive result flags and convert all column names to snake_case.

    Works on copies, so the caller's raw frames keep their original column
    names and the calling cell can be re-run (renaming in place made a second
    run fail on lookups such as ``"Comments"``).

    Adds to the result frame:
      * ``winning_team_id`` (via ``create_winning_team_id_column``),
      * ``is_title``: 1 where ``MatchDateOrder == 1``,
      * ``is_playoff``: 1 where ``MatchDateOrder`` is in 1..4.
    NOTE(review): assumes MatchDateOrder 1 is the final and 1-4 are the playoff
    fixtures — confirm against the feed.

    Returns:
        Tuple ``(match_result, match_summary, match_player)`` of new frames with
        snake_case column names.
    """
    match_result = create_winning_team_id_column(match_result.copy())
    date_order = match_result["MatchDateOrder"]
    match_result["is_title"] = (date_order == 1).astype(int)
    match_result["is_playoff"] = date_order.isin([1, 2, 3, 4]).astype(int)

    def _with_snake_case_columns(frame):
        # Copy so renaming never touches the caller's frame.
        renamed = frame.copy()
        renamed.columns = [inflection.underscore(col) for col in frame.columns]
        return renamed

    return (
        _with_snake_case_columns(match_result),
        _with_snake_case_columns(match_summary),
        _with_snake_case_columns(match_player),
    )


# Correctly spelled alias; the misspelled name is kept for existing callers.
prepare_data = preapare_data


def generate_match_df(match_summary, match_result):
    """Join per-match summary and result columns and split the competition name.

    ``competition_name`` values are expected to look like "BBL 2018-19": the
    first space-separated token becomes ``competition_name`` and the first four
    characters of the second token become ``season``.

    Args:
        match_summary: snake_case frame with ``match_id``, ``competition_name``,
            ``first_batting_team_id`` and ``second_batting_team_id``.
        match_result: snake_case frame with ``match_id``, ``is_title``,
            ``is_playoff`` and ``winning_team_id``.

    Returns:
        Inner join on ``match_id`` with an added ``season`` column.
    """
    summary_columns = [
        "match_id",
        "competition_name",
        "first_batting_team_id",
        "second_batting_team_id",
    ]
    result_columns = ["match_id", "is_title", "is_playoff", "winning_team_id"]
    joined = match_summary[summary_columns].merge(
        match_result[result_columns], on="match_id"
    )

    # Derive the season before competition_name is overwritten with its first token.
    name_tokens = joined["competition_name"].map(lambda name: name.split(" "))
    joined["season"] = name_tokens.map(lambda tokens: tokens[1][:4])
    joined["competition_name"] = name_tokens.map(lambda tokens: tokens[0])
    return joined


def generate_teams_df(match_df, match_player):
    """Summarise each team's participation per competition.

    One row is produced per (team, competition) pair found in ``match_df``:
      * ``seasons_played``: unique seasons, in order of first appearance,
      * ``titles``: title matches (``is_title``) the team won,
      * ``playoffs``: playoff matches (``is_playoff == 1``) the team played.

    Args:
        match_df: frame shaped like ``generate_match_df`` output.
        match_player: unused; kept for interface compatibility.

    Returns:
        DataFrame with columns team_id, competition_name, seasons_played,
        titles, playoffs (no columns at all when ``match_df`` is empty).
    """
    first_ids = match_df["first_batting_team_id"]
    second_ids = match_df["second_batting_team_id"]
    rows = []

    for team in set(first_ids) | set(second_ids):
        team_games = match_df[(first_ids == team) | (second_ids == team)]
        for competition in team_games["competition_name"].unique().tolist():
            games = team_games[team_games["competition_name"] == competition]
            won_game = games["winning_team_id"] == team
            rows.append(
                {
                    "team_id": team,
                    "competition_name": competition,
                    "seasons_played": games["season"].unique().tolist(),
                    "titles": len(games[games["is_title"] & won_game]),
                    "playoffs": len(games[games["is_playoff"] == 1]),
                }
            )

    return pd.DataFrame(rows)

In [None]:
# Keep only the raw match-result columns listed in MATCHES_RESULT_REQD_COLS.
matches_df_1 = match_result_data.loc[:, MATCHES_RESULT_REQD_COLS]

In [None]:
# Work on copies so this cell is idempotent: the helpers below may modify their
# arguments in place (clean_match_summary_data rewrites CompetitionName), which
# would break a re-run against the raw frames.
match_summary_clean = match_summary_data.copy()
clean_match_summary_data(match_summary_clean)
match_result, match_summary, match_player = preapare_data(
    match_result_data.copy(), match_summary_clean, match_player_data.copy()
)
matches_df_2 = generate_match_df(match_summary, match_result)

In [None]:
# snake_case the raw column names (e.g. "MatchID" -> "match_id") to match matches_df_2.
matches_df_1.columns = matches_df_1.columns.map(inflection.underscore)

In [None]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file, then read the Delta Lake / Azure settings.
load_dotenv()

(
    azure_delta_lake_path,
    local_delta_lake_path,
    azure_storage_account_name,
    azure_storage_account_key,
) = (
    os.getenv(setting_name)
    for setting_name in (
        "azure_delta_lake_path",
        "local_delta_lake_path",
        "azure_storage_account_name",
        "azure_storage_account_key",
    )
)

In [None]:
import duckdb
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import pickle

# Azure storage credentials for accessing the Azure-hosted Delta Lake.
# NOTE(review): not referenced by query_existing_data/update_existing_data,
# which use local_delta_lake_path.
options = dict(
    AZURE_STORAGE_ACCOUNT_NAME=azure_storage_account_name,
    AZURE_STORAGE_ACCOUNT_KEY=azure_storage_account_key,
)


def query_existing_data(table_name, sql_query, view_name="duckdb_table"):
    """Run a SQL query against a Delta table stored under ``local_delta_lake_path``.

    The Delta table is exposed to DuckDB as a view called ``view_name``, so
    ``sql_query`` should read ``FROM <view_name>``. The default matches the
    ``DB_NAME`` used by this notebook's SQL constants.

    Args:
        table_name: Sub-directory of ``local_delta_lake_path`` holding the table.
        sql_query: Query to execute.
        view_name: Name under which the table is registered for the query.

    Returns:
        The query result as a pandas DataFrame.
    """
    table_path = f"{local_delta_lake_path}/{table_name}"
    dataset = DeltaTable(table_path).to_pyarrow_dataset()

    # Register the dataset explicitly instead of relying on DuckDB's implicit
    # replacement scan of a local variable whose name happens to match the SQL.
    connection = duckdb.connect()
    try:
        connection.register(view_name, dataset)
        return connection.execute(sql_query).df()
    finally:
        connection.close()


def update_existing_data(serialized_df, table_name, mode="error"):
    """Write a DataFrame to the Delta table ``<local_delta_lake_path>/<table_name>``.

    Args:
        serialized_df: Either a pickled DataFrame (``bytes`` from
            ``pickle.dumps``) or a DataFrame passed directly.
        table_name: Sub-directory of ``local_delta_lake_path`` for the table.
        mode: Forwarded to ``write_deltalake``. The default ``"error"`` matches
            the library default (fail if the table exists); use ``"append"`` or
            ``"overwrite"`` to update an existing table.
    """
    if isinstance(serialized_df, (bytes, bytearray)):
        # SECURITY: pickle.loads can execute arbitrary code. Only pass bytes
        # produced by this notebook, never data from an external source.
        df = pickle.loads(serialized_df)
    else:
        df = serialized_df
    table_path = f"{local_delta_lake_path}/{table_name}"
    write_deltalake(table_path, df, mode=mode)

In [None]:
DB_NAME = "duckdb_table"

# SQL used to read the existing players / teams / venues Delta tables.
# Each query reads FROM DB_NAME, which must match the name under which
# query_existing_data makes the Delta table visible to DuckDB.

GET_EXISTING_PLAYERS_DETAILS_SQL = (
    "Select player_name, src_player_id, player_id \n"
    f"from {DB_NAME};"
)

GET_EXISTNG_TEAMS_DETAILS_SQL = (
    "Select team_id, src_team_id, team_name, competition_name, seasons_played, titles, playoffs, \n"
    f"team_short_name from {DB_NAME};"
)
# Correctly spelled alias; the misspelled name is kept for existing callers.
GET_EXISTING_TEAMS_DETAILS_SQL = GET_EXISTNG_TEAMS_DETAILS_SQL

GET_EXISTING_VENUE_DETAILS_SQL = (
    f"Select stadium_name, src_venue_id, venue_id from {DB_NAME};"
)
)

In [None]:
# Snapshot the existing teams, players and venues tables from the local Delta Lake.
teams_df, players_df, venues_df = (
    query_existing_data(table_name, sql)
    for table_name, sql in (
        ("teams", GET_EXISTNG_TEAMS_DETAILS_SQL),
        ("players", GET_EXISTING_PLAYERS_DETAILS_SQL),
        ("venues", GET_EXISTING_VENUE_DETAILS_SQL),
    )
)

In [None]:
# Join the summary-derived frame (left) with the raw result columns (right);
# overlapping team-id columns receive pandas' default "_x"/"_y" suffixes.
# Keep one row per match_id.
matches_df = pd.merge(matches_df_2, matches_df_1, on="match_id").drop_duplicates(
    subset=["match_id"]
)

In [None]:
# info() prints its own report and returns None; wrapping it in print() adds "None".
matches_df.info()

In [None]:
import datetime

# Stamp every row with the local, timezone-naive wall-clock time of this load.
load_timestamp = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
matches_df["load_timestamp"] = load_timestamp

In [None]:
# Map merged column names to the matches-table schema. The "_x" team-id columns
# are the ones that came from matches_df_2 (the left side of the merge).
matches_df = matches_df.rename(
    columns=dict(
        match_id="src_match_id",
        first_batting_team_id_x="team1",
        second_batting_team_id_x="team2",
        winning_team_id="winning_team",
    )
)

In [None]:
# Surrogate match IDs continue on from the number of matches already loaded.
# NOTE(review): the existing count is hard-coded; it presumably should be read
# from the existing "matches" Delta table, otherwise IDs can collide on later loads.
existing_matches_count = 50
current_session_matches_count = matches_df.shape[0]

# Consecutive IDs: existing_matches_count, existing_matches_count + 1, ...
match_ids = range(
    existing_matches_count,
    existing_matches_count + current_session_matches_count,
)
matches_df["match_id"] = match_ids

In [None]:
# Index by the new surrogate key (rebinding rather than mutating with inplace=True).
matches_df = matches_df.set_index("match_id")

In [None]:
# info() prints its own report and returns None; wrapping it in print() adds "None".
matches_df.info()

In [None]:
def _parse_batting_summary(summary):
    """Split a batting summary string into ``(runs, wickets, overs)``.

    For a summary shaped like ``"180/5 (20.0 Ov)"`` (presumably the feed's
    format; verify against the data) this returns ``(180, 5, "20.0")``: runs
    and wickets as ints, overs as the text of the second space-separated token
    with "(" removed. Empty or missing (non-string, e.g. NaN) summaries yield
    ``(0, 0, 0)`` instead of raising.
    """
    if not isinstance(summary, str) or summary == "":
        return 0, 0, 0
    slash_parts = summary.split("/")
    runs = int(slash_parts[0])
    wickets = int(slash_parts[1].split(" ")[0])
    overs = summary.split(" ")[1].strip().replace("(", "")
    return runs, wickets, overs


# Derive score / wickets / overs columns for both innings from their summaries.
for team, summary_column in (
    ("team1", "first_batting_summary"),
    ("team2", "second_batting_summary"),
):
    parsed = [_parse_batting_summary(s) for s in matches_df[summary_column]]
    matches_df[f"{team}_score"] = [runs for runs, _, _ in parsed]
    matches_df[f"{team}_wickets"] = [wickets for _, wickets, _ in parsed]
    matches_df[f"{team}_overs"] = [overs for _, _, overs in parsed]

In [None]:
# Final column set (and order) of the matches table.
REQ_COLS = [
    # identity and competition context
    "src_match_id", "match_name", "competition_name", "is_playoff", "is_title",
    # when the match was played / loaded
    "match_date", "match_time", "season", "load_timestamp",
    # participants and outcome
    "team1", "team2", "winning_team",
    # innings summaries
    "team1_score", "team1_wickets", "team1_overs",
    "team2_score", "team2_wickets", "team2_overs",
]

In [42]:
# Keep only the matches-table columns, in REQ_COLS order.
matches_df = matches_df.loc[:, REQ_COLS]
# Inspect the final schema with matches_df.info() if needed.

In [43]:
import duckdb
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import pickle

import os
from dotenv import load_dotenv

# Populate os.environ from .env first so this cell also works on a fresh kernel.
load_dotenv()
local_delta_lake_path = os.getenv("local_delta_lake_path")


def update_existing_data(serialized_df, table_name, mode="error"):
    """Write a DataFrame to the Delta table ``<local_delta_lake_path>/<table_name>``.

    Args:
        serialized_df: Either a pickled DataFrame (``bytes`` from
            ``pickle.dumps``) or a DataFrame passed directly.
        table_name: Sub-directory of ``local_delta_lake_path`` for the table.
        mode: Forwarded to ``write_deltalake``. The default ``"error"`` matches
            the library default (fail if the table exists); use ``"append"`` or
            ``"overwrite"`` to update an existing table.
    """
    if isinstance(serialized_df, (bytes, bytearray)):
        # SECURITY: pickle.loads can execute arbitrary code. Only pass bytes
        # produced by this notebook, never data from an external source.
        df = pickle.loads(serialized_df)
    else:
        df = serialized_df
    table_path = f"{local_delta_lake_path}/{table_name}"
    write_deltalake(table_path, df, mode=mode)

In [44]:
# Pickle the frame: update_existing_data unpickles its first argument.
serialized_df = pickle.dumps(matches_df, pickle.DEFAULT_PROTOCOL)
# Writing to the "matches" Delta table is currently commented out:
# update_existing_data(serialized_df, "matches")
print(str(matches_df))

              src_match_id                                     match_name  \
match_id                                                                    
50        adf273dac3a2458b             SYDNEYSIXERSVSPERTHSCORCHERS221218   
51        5b117f6c333b481b         MELBOURNESTARSVSADELAIDESTRIKERS230119   
52        25ccc0c7b5e94abc      SYDNEYTHUNDERVSMELBOURNERENEGADESVS220119   
53        b5f70734046f40e6              SYDNEYTHUNDERVSSYDNEYSIXERS020219   
54        32987387a554475f           SYDNEYSIXERSVSHOBARTHURRICANES230119   
55        a4341072bed64fb5      MELBOURNERENEGADESVSSYDNEYSIXERSSF2150219   
56        295adc31d09c479c       HOBARTHURRICANESVSADELAIDESTRIKERS310119   
57        70c8b8878d4f4671           HOBARTHURRICANESVSBRISBANEHEAT290119   
58        0558b2b4d5904895           ADELAIDESTRIKERSVSBRISBANEHEAT030219   
59        0230a0365b784993            SYDNEYTHUNDERVSMELBOURNESTARS211218   
60        84a3e6bd92eb4a11           SYDNEYSIXERSVSADELAIDESTRIKERS290119   