# Data Reading Section
##### In this section, we read the data we need from the main reference file and convert it to CSV format so that we can use it in the next steps. The code for each step is explained below.

---

# ⚠️ IMPORTANT: READ BEFORE RUNNING

This notebook expects the raw dataset to be available **before execution**.  
If the required ZIP file is not placed in the correct path, the ETL pipeline will fail or generate incomplete/duplicated outputs.

---

## ✅ Required Action

Please make sure the following file exists **before running the notebook**:

..\data\raw\tennis_data.zip


> 📌 The path is already configured inside the notebook's Python extraction script. Do **not** change it unless necessary.

If the ZIP file has a different name, please rename it or update the code accordingly.
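
A small pre-flight check can make this requirement explicit before any processing starts. The sketch below is not part of the original notebook: the helper name `require_raw_zip` is hypothetical, and it assumes the check runs from the project root, where the notebook's `./data/raw/tennis_data.zip` path resolves.

```python
from pathlib import Path
import zipfile


def require_raw_zip(path="./data/raw/tennis_data.zip"):
    """Return the resolved path of the raw archive, or raise a clear error.

    Hypothetical helper for illustration; not part of the notebook.
    """
    zip_path = Path(path)
    if not zip_path.is_file():
        raise FileNotFoundError(f"Raw dataset not found: {zip_path.resolve()}")
    if not zipfile.is_zipfile(zip_path):
        raise ValueError(f"File exists but is not a valid ZIP archive: {zip_path}")
    return zip_path.resolve()
```

Calling `require_raw_zip()` at the top of the notebook would stop the run early with a readable message instead of failing partway through the extraction loop.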

---

# Part 1: Importing the required libraries, defining the paths, and creating the required directories if they do not exist
In this part, we import the libraries we need later and define the main paths (the main ZIP file, the output directory, and the temp directory) relative to the `data` folder of this project. Any directory that does not exist yet is created.

In [None]:
import os, zipfile
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
from io import BytesIO

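print(os.getcwd())
# Move from the notebook folder up to the project root so the relative paths
# below resolve. Note: re-running this cell moves up one more level each time.
os.chdir("..")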

# Define paths
main_zip = "./data/raw/tennis_data.zip"
output_dir = "./data/processed"
temp_dir = "./data/raw/temp"
base_path = "./data/processed"
clean_path = "./data/processed/clean"

os.makedirs(clean_path, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
os.makedirs(temp_dir, exist_ok=True)
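
Because `os.chdir("..")` moves up one level every time the cell runs, re-running it changes where the relative paths point. One alternative, sketched below, is to locate the project root once and build absolute paths from it. This assumes the project root is the nearest ancestor directory that contains a `data` folder; the names `find_project_root` and `build_paths` are hypothetical.

```python
from pathlib import Path


def find_project_root(start=None, marker="data"):
    """Walk upward from `start` until a directory containing `marker` is found."""
    current = Path(start or Path.cwd()).resolve()
    for candidate in [current, *current.parents]:
        if (candidate / marker).is_dir():
            return candidate
    raise FileNotFoundError(f"No '{marker}' directory found at or above {current}")


def build_paths(root):
    """Return the notebook's directory layout as absolute paths under `root`."""
    root = Path(root)
    return {
        "main_zip": root / "data" / "raw" / "tennis_data.zip",
        "output_dir": root / "data" / "processed",
        "temp_dir": root / "data" / "raw" / "temp",
        "clean_path": root / "data" / "processed" / "clean",
    }
```

With this approach the cell gives the same result no matter how many times it runs, since nothing depends on the current working directory changing.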

# Part 2: Creating a CSV table generator function from Parquet files
In this part, we define a function that takes a keyword from the Parquet file names, opens the main ZIP file, goes through the ZIP file for each day, reads the Parquet files that belong to the requested table, and keeps only the requested columns. We can also specify whether the records of the table should be unique per identifier or whether the table may have multiple rows for each identifier. Our unique identifier is `match_id`.

In [None]:
def build_table(table_keyword, needed_cols, output_name, dedup_on="match_id"):
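    """
    Build one CSV table from the Parquet files inside the daily ZIPs.

    table_keyword: substring of the Parquet file names, like 'event_' or 'home_team_'
    needed_cols: list of columns to keep (columns missing from a file are skipped)
    output_name: name of the output CSV file
    dedup_on: column used for deduplication (default is 'match_id');
              if None, only fully identical rows are dropped
    """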
    csv_path = os.path.join(output_dir, output_name)
    if os.path.exists(csv_path):
        os.remove(csv_path)

    all_dfs = []
    row_counter = 0

    with zipfile.ZipFile(main_zip, "r") as main_zip_ref:
        daily_zips = main_zip_ref.namelist()
        print(f"📦 Count of daily zips: {len(daily_zips)}")

        for i, daily_zip_name in enumerate(daily_zips, start=1):
            print(f"🔹 ({i}/{len(daily_zips)}) processing {daily_zip_name} ...")
            main_zip_ref.extract(daily_zip_name, temp_dir)
            daily_zip_path = os.path.join(temp_dir, daily_zip_name)

            with zipfile.ZipFile(daily_zip_path, "r") as daily_zip_ref:
                parquet_files = [f for f in daily_zip_ref.namelist() if f.endswith(".parquet") and table_keyword in f]
                for f in parquet_files:
                    with daily_zip_ref.open(f) as pf:
                        table = pq.read_table(BytesIO(pf.read()))
                        df = table.to_pandas()
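                        # Copy the column subset so adding date_source below does not
                        # trigger pandas' SettingWithCopyWarning
                        df = df[[c for c in needed_cols if c in df.columns]].copy()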
                        df["date_source"] = daily_zip_name.replace(".zip", "")
                        all_dfs.append(df)
                        row_counter += len(df)

            os.remove(daily_zip_path)

    if all_dfs:
        df_all = pd.concat(all_dfs, ignore_index=True)
        print(f"✅ Shape: {df_all.shape}")
        if dedup_on and dedup_on in df_all.columns:
            df_all = df_all.drop_duplicates(subset=dedup_on)
        else:
            df_all = df_all.drop_duplicates()
        print(f"🧹 after cleaning duplicated rows: {df_all.shape}")

        df_all.to_csv(csv_path, index=False)
        print(f"💾 Saved: {csv_path}")
        print(f"📊 Count of all rows: {len(df_all)}")
    else:
        print(f"⚠️ There is no file for {table_keyword}")
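
The function extracts each daily ZIP to the temp directory before opening it. The same nested traversal can also be done in memory, which avoids the temp files. The sketch below shows only that traversal on a tiny synthetic archive. The member names (`20240101.zip`, `event_20240101.parquet`, and so on) are invented for illustration, the payloads are fake bytes rather than real Parquet, and the real archive layout is an assumption.

```python
import zipfile
from io import BytesIO


def iter_table_members(main_zip_file, table_keyword):
    """Yield (daily_zip_name, member_name, raw_bytes) for matching Parquet members.

    `main_zip_file` may be a path or a file-like object. Inner archives are
    opened from memory instead of being extracted to disk.
    """
    with zipfile.ZipFile(main_zip_file, "r") as outer:
        for daily_name in outer.namelist():
            if not daily_name.endswith(".zip"):
                continue  # skip directory entries or stray files
            with zipfile.ZipFile(BytesIO(outer.read(daily_name)), "r") as inner:
                for member in inner.namelist():
                    if member.endswith(".parquet") and table_keyword in member:
                        yield daily_name, member, inner.read(member)


def make_demo_archive():
    """Build a synthetic nested ZIP in memory (fake bytes, not real Parquet)."""
    outer_buf = BytesIO()
    with zipfile.ZipFile(outer_buf, "w") as outer:
        for day in ("20240101", "20240102"):
            inner_buf = BytesIO()
            with zipfile.ZipFile(inner_buf, "w") as inner:
                inner.writestr(f"event_{day}.parquet", b"fake-event")
                inner.writestr(f"time_{day}.parquet", b"fake-time")
            outer.writestr(f"{day}.zip", inner_buf.getvalue())
    outer_buf.seek(0)
    return outer_buf


found = [(d, m) for d, m, _ in iter_table_members(make_demo_archive(), "event_")]
print(found)
```

With real data, each `raw_bytes` value could be wrapped in `BytesIO` and passed to `pq.read_table`, as the function above already does. Note that the keyword test is a substring match, so a keyword such as `home_team_` would also pick up a file named like `home_team_score_...` if the archive contains one.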

# Part 3: Using the function above to create CSVs of the tables and columns required for analysis
In this part, based on our initial analysis of the 17 questions and the data they require, we selected the tables (out of 15 in total) and the columns needed to answer them. Here, we extract them from the original raw ZIP file and convert them to CSV files so that we can use these files later when analyzing and answering the questions.

In [None]:
build_table(
    table_keyword="event_",
    needed_cols=["match_id", "first_to_serve", "winner_code", "default_period_count", "start_datetime", "match_slug"],
    output_name="event.csv",
    dedup_on="match_id"
)

build_table(
    table_keyword="home_team_",
    needed_cols=["match_id", "player_id", "full_name", "gender", "height", "weight", "plays", "current_rank", "country"],
    output_name="home_team.csv",
    dedup_on="match_id"
)

build_table(
    table_keyword="away_team_",
    needed_cols=["match_id", "player_id", "full_name", "gender", "height", "weight", "plays", "current_rank", "country"],
    output_name="away_team.csv",
    dedup_on="match_id"
)

build_table(
    table_keyword="tournament_",
    needed_cols=["match_id", "tournament_id", "tournament_name", "ground_type", "tennis_points", "start_datetime"],
    output_name="tournament.csv",
    dedup_on="match_id"
)

build_table(
    table_keyword="time_",
    needed_cols=["match_id", "period_1", "period_2", "period_3", "period_4", "period_5", "current_period_start_timestamp"],
    output_name="time.csv",
    dedup_on="match_id"
)

build_table(
    table_keyword="statistics_",
    needed_cols=["match_id", "statistic_name", "home_value", "away_value"],
    output_name="statistics.csv",
    dedup_on=None  # No match_id deduplication (statistics has many rows per match); exact duplicate rows are still dropped
)

build_table(
    table_keyword="power_",
    needed_cols=["match_id", "set_num", "game_num", "value", "break_occurred"],
    output_name="power.csv",
    dedup_on=None  # No match_id deduplication (power has many rows per match); exact duplicate rows are still dropped
)

build_table(
    table_keyword="pbp_",
    needed_cols=["match_id", "set_id", "game_id", "point_id", "home_point", "away_point", "home_score"],
    output_name="pbp.csv",
    dedup_on=None  # No match_id deduplication (pbp has many rows per match); exact duplicate rows are still dropped
)
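
The `dedup_on` argument changes what counts as a duplicate. A minimal sketch with invented rows (the values are made up; only the column names come from the calls above) shows the difference between deduplicating on `match_id` and dropping only fully identical rows:

```python
import pandas as pd

# Invented rows: match 1 has two distinct statistics plus one exact repeat.
df = pd.DataFrame(
    {
        "match_id": [1, 1, 1, 2],
        "statistic_name": ["aces", "double_faults", "aces", "aces"],
        "home_value": [5, 2, 5, 7],
    }
)

by_key = df.drop_duplicates(subset="match_id")  # keeps the first row per match_id
exact_only = df.drop_duplicates()               # removes only identical rows

print(len(by_key))      # 2
print(len(exact_only))  # 3
```

This is why the multi-row tables pass `dedup_on=None`: deduplicating on `match_id` would keep a single row per match and discard the rest.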


##  Part 4: Data Cleaning Stage
#### In this section, we clean the CSV files extracted in the previous section.

### **Goal:**  
- Remove duplicate rows  
- Handle missing values (`NaN`)  
- Standardize data types  

The cleaned outputs are stored in `./data/processed/clean` for the next normalization phase.


###  Cleaning: Event Table

In [None]:
df_event = pd.read_csv(os.path.join(base_path, "event.csv"))
df_event.drop_duplicates(inplace=True)

for col in df_event.columns:
    if df_event[col].dtype == 'object':
        df_event[col] = df_event[col].fillna("Unknown")
    else:
        df_event[col] = df_event[col].fillna(0)

if "match_id" in df_event.columns:
    df_event["match_id"] = df_event["match_id"].astype(str)

df_event.to_csv(os.path.join(clean_path, "event_clean.csv"), index=False)
print("✅ event_clean.csv created successfully!")

###  Cleaning: Home Team Table

In [None]:
df_home = pd.read_csv(os.path.join(base_path, "home_team.csv"))
df_home = df_home.drop_duplicates()

string_cols = ["full_name", "gender", "plays", "country"]
numeric_cols = ["height", "weight", "current_rank"]

for col in string_cols:
    if col in df_home.columns:
        df_home[col] = df_home[col].fillna("Unknown")

for col in numeric_cols:
    if col in df_home.columns:
        df_home[col] = df_home[col].fillna(0)

if "match_id" in df_home.columns:
    df_home["match_id"] = df_home["match_id"].astype(str)

df_home.to_csv(os.path.join(clean_path, "home_team_clean.csv"), index=False)
print("✅ home_team_clean.csv created successfully!")

###  Cleaning: Away Team Table

In [None]:
df_away = pd.read_csv(os.path.join(base_path, "away_team.csv"))
df_away = df_away.drop_duplicates()

string_cols = ["full_name", "gender", "plays", "country"]
numeric_cols = ["height", "weight", "current_rank"]

for col in string_cols:
    if col in df_away.columns:
        df_away[col] = df_away[col].fillna("Unknown")

for col in numeric_cols:
    if col in df_away.columns:
        df_away[col] = df_away[col].fillna(0)

if "match_id" in df_away.columns:
    df_away["match_id"] = df_away["match_id"].astype(str)

df_away.to_csv(os.path.join(clean_path, "away_team_clean.csv"), index=False)
print("✅ away_team_clean.csv created successfully!")
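
The home and away cells apply the same steps, so they could share one helper. The sketch below is one possible refactoring, not the notebook's own code: `clean_player_table` is a hypothetical name, and it works on an in-memory frame with invented rows so the example runs without the CSV files.

```python
import pandas as pd

STRING_COLS = ["full_name", "gender", "plays", "country"]
NUMERIC_COLS = ["height", "weight", "current_rank"]


def clean_player_table(df):
    """Apply the cleaning steps used for the home and away team tables."""
    df = df.drop_duplicates().copy()
    for col in STRING_COLS:
        if col in df.columns:
            df[col] = df[col].fillna("Unknown")
    for col in NUMERIC_COLS:
        if col in df.columns:
            df[col] = df[col].fillna(0)
    if "match_id" in df.columns:
        df["match_id"] = df["match_id"].astype(str)
    return df


# Invented sample rows for illustration only.
sample = pd.DataFrame(
    {
        "match_id": [10, 10, 11],
        "full_name": ["Player A", "Player A", None],
        "height": [1.85, 1.85, None],
    }
)
cleaned = clean_player_table(sample)
print(cleaned)
```

The same function could then be applied to both `home_team.csv` and `away_team.csv` after loading them with `pd.read_csv`, which keeps the two tables from drifting apart if the cleaning rules change.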

## Part 5: Normalization Stage
#### Now that we have clean CSVs, in this part we will:

- Convert data types (e.g., timestamps to datetime)  
- Standardize text (e.g., capitalization, spacing)  
- Fill remaining missing values (using mean, median, or mode)  

The normalized final datasets are saved in `./data/processed` as `*_final.csv` files.


###  Normalization: Event Table

In [None]:
base_path = "./data/processed"
# The cleaning stage wrote event_clean.csv to clean_path, so read it from there
input_path = os.path.join(clean_path, "event_clean.csv")
output_path = os.path.join(base_path, "event_final.csv")

df_event = pd.read_csv(input_path)

df_event["match_id"] = df_event["match_id"].astype(int)
df_event["default_period_count"] = df_event["default_period_count"].astype(int)
df_event["date_source"] = df_event["date_source"].astype(int)

if np.issubdtype(df_event["start_datetime"].dtype, np.number):
    df_event["start_datetime"] = pd.to_datetime(df_event["start_datetime"], unit="s", errors="coerce")

df_event["winner_code"] = df_event["winner_code"].fillna(df_event["winner_code"].mode()[0])
df_event["first_to_serve"] = df_event["first_to_serve"].fillna(df_event["first_to_serve"].mode()[0])

df_event.to_csv(output_path, index=False)
print("✅ event_final.csv created successfully!")
print(df_event.info())
print(df_event.isna().sum())
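
The `unit="s"` conversion interprets numeric values as seconds since the Unix epoch. One consequence worth checking: any numeric `start_datetime` that the cleaning stage filled with `0` becomes `1970-01-01`. The sketch below uses invented timestamps and, as an assumption about the data, treats `0` as a placeholder for a missing value before converting.

```python
import pandas as pd

# Invented values: one plausible epoch timestamp and one 0 placeholder.
raw = pd.Series([1706745600, 0], name="start_datetime")

# Direct conversion: the 0 placeholder turns into the epoch start date.
naive = pd.to_datetime(raw, unit="s", errors="coerce")

# Assumption: 0 is a fill value, not a real match start, so mask it first.
masked = pd.to_datetime(raw.where(raw != 0), unit="s", errors="coerce")

print(naive.tolist())
print(masked.isna().tolist())  # [False, True]
```

Keeping the placeholder as `NaT` makes missing start times visible in `isna().sum()` instead of hiding them behind a 1970 date.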

###  Normalization: Home Team Table

In [None]:
# The cleaning stage wrote home_team_clean.csv to clean_path, so read it from there
input_path = os.path.join(clean_path, "home_team_clean.csv")
output_path = os.path.join(base_path, "home_team_final.csv")

df_home = pd.read_csv(input_path)

numeric_cols = ["height", "weight", "current_rank"]
for col in numeric_cols:
    if col in df_home.columns:
        df_home[col] = pd.to_numeric(df_home[col], errors="coerce")

if "gender" in df_home.columns:
    df_home["gender"] = df_home["gender"].astype(str).str.strip().str.title().replace({"Nan":"Unknown"})
if "plays" in df_home.columns:
    df_home["plays"] = df_home["plays"].astype(str).str.strip().str.lower().replace({"nan":"unknown"})
for col in ["full_name", "country"]:
    if col in df_home.columns:
        df_home[col] = df_home[col].astype(str).str.strip()

if "height" in df_home.columns:
    df_home["height"] = df_home["height"].fillna(df_home["height"].mean(skipna=True))
if "weight" in df_home.columns:
    df_home["weight"] = df_home["weight"].fillna(df_home["weight"].mean(skipna=True))
if "current_rank" in df_home.columns:
    df_home["current_rank"] = df_home["current_rank"].fillna(df_home["current_rank"].median(skipna=True))

for col in ["gender", "plays"]:
    if col in df_home.columns:
        mode_val = df_home[col].mode(dropna=True)
        if not mode_val.empty:
            df_home[col] = df_home[col].fillna(mode_val.iloc[0])
        else:
            df_home[col] = df_home[col].fillna("Unknown")

for col in ["player_id", "full_name", "country"]:
    if col in df_home.columns:
        df_home[col] = df_home[col].fillna("Unknown")

if "match_id" in df_home.columns:
    df_home["match_id"] = df_home["match_id"].astype(str)

df_home.to_csv(output_path, index=False)
print("✅ home_team_final.csv created successfully!")
print(df_home.info())
print(df_home.isna().sum())

###  Normalization: Away Team Table

In [None]:
# The cleaning stage wrote away_team_clean.csv to clean_path, so read it from there
input_path = os.path.join(clean_path, "away_team_clean.csv")
output_path = os.path.join(base_path, "away_team_final.csv")

df_away = pd.read_csv(input_path)

numeric_cols = ["height", "weight", "current_rank"]
for col in numeric_cols:
    if col in df_away.columns:
        df_away[col] = pd.to_numeric(df_away[col], errors="coerce")

if "gender" in df_away.columns:
    df_away["gender"] = df_away["gender"].astype(str).str.strip().str.title().replace({"Nan":"Unknown"})
if "plays" in df_away.columns:
    df_away["plays"] = df_away["plays"].astype(str).str.strip().str.lower().replace({"nan":"unknown"})
for col in ["full_name", "country"]:
    if col in df_away.columns:
        df_away[col] = df_away[col].astype(str).str.strip()

if "height" in df_away.columns:
    df_away["height"] = df_away["height"].fillna(df_away["height"].mean(skipna=True))
if "weight" in df_away.columns:
    df_away["weight"] = df_away["weight"].fillna(df_away["weight"].mean(skipna=True))
if "current_rank" in df_away.columns:
    df_away["current_rank"] = df_away["current_rank"].fillna(df_away["current_rank"].median(skipna=True))

for col in ["gender", "plays"]:
    if col in df_away.columns:
        mode_val = df_away[col].mode(dropna=True)
        if not mode_val.empty:
            df_away[col] = df_away[col].fillna(mode_val.iloc[0])
        else:
            df_away[col] = df_away[col].fillna("Unknown")

for col in ["player_id", "full_name", "country"]:
    if col in df_away.columns:
        df_away[col] = df_away[col].fillna("Unknown")

if "match_id" in df_away.columns:
    df_away["match_id"] = df_away["match_id"].astype(str)

df_away.to_csv(output_path, index=False)
print("✅ away_team_final.csv created successfully!")
print(df_away.info())
print(df_away.isna().sum())
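
Note that the cleaning stage already replaced missing `height`, `weight`, and `current_rank` values with `0`, so the mean and median `fillna` calls above find nothing left to fill, and the zeros also pull the mean down. If `0` is never a valid value for these columns (an assumption about the data, not something the source states), one option is to turn the zeros back into `NaN` before imputing. A minimal sketch with invented heights:

```python
import pandas as pd

# Invented heights in metres; 0.0 stands for a value that was missing.
heights = pd.Series([1.75, 0.0, 2.0, 0.0], name="height")

# fillna alone changes nothing, because no NaN is left after cleaning.
unchanged = heights.fillna(heights.mean())

# Assumption: 0 means "missing", so restore NaN and then impute the mean
# of the remaining real values.
restored = heights.mask(heights == 0)
imputed = restored.fillna(restored.mean())

print(unchanged.tolist())  # [1.75, 0.0, 2.0, 0.0]
print(imputed.tolist())    # [1.75, 1.875, 2.0, 1.875]
```

The same pattern would apply to `weight` with the mean and to `current_rank` with the median, matching the imputation choices used in the cells above.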

##  Part 6: Review and Summary
#### In this final part, we review all steps in the data preparation phase:

| Step | Description | Output Folder | Key Action |
|------|-------------|---------------|------------|
| 1 | Extraction from Parquet (raw ZIP) | `./data/processed` | `build_table()` function |
| 2 | Cleaning | `./data/processed/clean` | Remove duplicates, fill NaN with neutral values |
| 3 | Normalization | `./data/processed` | Type casting, imputation with mean, median, or mode |

####  All datasets are now ready for **Phase 2 (Data Integration)**, where we will merge and build the central match table for analysis.