In [None]:

import os
import logging
from glob import glob
import pandas as pd

# Configure the root logger so that INFO-level (and more severe) messages
# emitted through the module-level logging.* calls are shown.
logging.basicConfig(level=logging.INFO)


def load_csv_in_chunks(file_path: str, chunk_size):
    """
    Lazily read a comma-separated file and yield it piece by piece.

    Args:
        file_path (str): Path of the CSV file to read.
        chunk_size (int | None): Maximum number of rows per yielded
            DataFrame. When None, the whole file is read and yielded as a
            single DataFrame.

    Yields:
        pd.DataFrame: Consecutive row blocks of the file, in file order.
    """
    logging.info("Reading file in chunks: %s", file_path)

    if chunk_size is None:
        # Without a chunksize read_csv returns a DataFrame; iterating over
        # that would yield column labels rather than data.
        yield pd.read_csv(file_path, sep=",")
        return

    # The context manager closes the underlying file handle even when the
    # consumer stops iterating before the file is exhausted.
    with pd.read_csv(file_path, sep=",", chunksize=chunk_size) as reader:
        for chunk in reader:
            yield chunk


def load_all_months(data_folder: str, chunck_size):
    """
    Stream every CSV file of a folder as a sequence of DataFrame chunks.

    Files are processed in sorted path order and each one is read through
    load_csv_in_chunks.

    Args:
        data_folder (str): Folder that is searched (non-recursively) for
            files matching "*.csv".
        chunck_size (int): Number of rows per chunk, forwarded unchanged to
            load_csv_in_chunks.

    Yields:
        pd.DataFrame: The chunks of each file, file after file.
    """
    csv_paths = sorted(glob(os.path.join(data_folder, "*.csv")))

    if not csv_paths:
        logging.warning("No .csv files found in %s", data_folder)
        return

    for csv_path in csv_paths:
        # glob() already returns paths that include data_folder; joining the
        # folder a second time would produce "<folder>/<folder>/<file>" for
        # relative folders.
        logging.info("Starting file: %s", csv_path)
        yield from load_csv_in_chunks(csv_path, chunck_size)


def save_chunk_to_parquet(df, chunk_index, output_folder):
    """
    Write one cleaned chunk to "<output_folder>/cleaned_chunk_<index>.parquet".

    The destination folder is created when it does not exist yet, and the
    DataFrame index is not written to the file.

    Args:
        df (pd.DataFrame): Cleaned chunk to persist.
        chunk_index (int): Sequence number of the chunk, used in the file name.
        output_folder (str): Folder that receives the Parquet file.

    Returns:
        str: Location of the file that was written.
    """
    os.makedirs(output_folder, exist_ok=True)

    file_name = f"cleaned_chunk_{chunk_index}.parquet"
    destination = os.path.join(output_folder, file_name)
    df.to_parquet(destination, index=False)

    logging.info("Saved cleaned_chunk_%s to %s", chunk_index, destination)
    return destination


def load_all_parquet(parquet_folder: str) -> pd.DataFrame:
    """
    Read every "*.parquet" file of a folder into a single DataFrame.

    Files are read in sorted path order and stacked with a fresh index.

    Args:
        parquet_folder (str): Folder searched (non-recursively) for
            .parquet files.

    Returns:
        pd.DataFrame: All rows of all files; an empty DataFrame when the
        folder contains no .parquet file.
    """
    pattern = os.path.join(parquet_folder, "*.parquet")
    paths = sorted(glob(pattern))

    # Guard clause: nothing to load.
    if not paths:
        logging.warning("No .parquet files found in %s", parquet_folder)
        return pd.DataFrame()

    logging.info("Found %d .parquet files in %s",
                 len(paths), parquet_folder)

    def _read_logged(path):
        # Announce each file right before it is read.
        logging.info("Loading: %s", path)
        return pd.read_parquet(path)

    combined = pd.concat([_read_logged(p) for p in paths], ignore_index=True)
    logging.info("All files loaded. Final shape: %s", combined.shape)

    return combined

In [None]:
# Configure root logging at INFO level. basicConfig() does nothing when the
# root logger already has handlers, so repeating the call is harmless.
logging.basicConfig(level=logging.INFO)


def clean_chunk(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean one chunk of raw event data and return it as a new DataFrame.

    The input frame is left untouched. Rows are removed when
      * event_time cannot be parsed as a timestamp,
      * event_type is not one of view / cart / purchase / remove_from_cart,
      * user_id is missing,
      * the event is a purchase whose price is missing or not numeric.

    On the remaining rows
      * event_time is converted to timezone-aware UTC timestamps,
      * category_code is split at its first "." into main_category and
        sub_category (a missing code yields "unknown" for both; a code
        without a sub-part is used for both),
      * missing brand / user_session values become "unknown" and both
        columns are cast to str,
      * missing or non-numeric prices become 0.0 and the column is float,
      * category_id, category_code and product_id are dropped.

    Args:
        df (pd.DataFrame): Raw chunk containing the columns event_time,
            event_type, category_code, user_id, brand, price, user_session,
            category_id and product_id.

    Returns:
        pd.DataFrame: Cleaned chunk with a fresh 0..n-1 index.

    Raises:
        KeyError: If one of the columns listed above is missing.
    """
    valid_events = ("view", "cart", "purchase", "remove_from_cart")

    # Parse into standalone Series instead of writing back into `df`, so the
    # caller's frame is never modified.
    event_time = pd.to_datetime(df["event_time"], utc=True, errors="coerce")
    price = pd.to_numeric(df["price"], errors="coerce")

    # Every filter is row-wise, so they can be combined into a single mask.
    unpriced_purchase = (df["event_type"] == "purchase") & price.isna()
    keep = (
        event_time.notna()
        & df["event_type"].isin(valid_events)
        & df["user_id"].notna()
        & ~unpriced_purchase
    )

    # Explicit copy: the column assignments below must target an independent
    # frame, not a slice of the input (avoids SettingWithCopyWarning).
    cleaned = df.loc[keep].copy()

    cleaned["event_time"] = event_time[keep]

    # category_code -> main_category / sub_category
    code = cleaned["category_code"].fillna("unknown").astype(str)
    parts = code.str.split(".", n=1)
    tail = parts.str[1]
    has_tail = tail.notna() & (tail != "")
    cleaned["main_category"] = parts.str[0]
    # Codes without anything after the first dot (including "unknown") fall
    # back to the full code.
    cleaned["sub_category"] = tail.where(has_tail, code)

    cleaned["brand"] = cleaned["brand"].fillna("unknown").astype(str)
    cleaned["price"] = price[keep].fillna(0).astype(float)
    cleaned["user_session"] = (
        cleaned["user_session"].fillna("unknown").astype(str))

    cleaned = cleaned.drop(
        columns=["category_id", "category_code", "product_id"])

    return cleaned.reset_index(drop=True)