In [6]:
import pandas as pd

In [7]:
df = pd.read_parquet('../data/datalake/daily/2025/07/2025-07-30-data.parquet')

In [8]:
df.head()

Unnamed: 0,Fecha,Estado,Ciudad,Tipo,Canal,Precio
0,2025-07-30,Aguascalientes,Aguascalientes,Pasteurizada,Tiendas,27.4
1,2025-07-30,Baja California,Mexicali,Pasteurizada,Tiendas,29.3
2,2025-07-30,Baja California,Tijuana,Pasteurizada,Tiendas,
3,2025-07-30,Baja California Sur,La Paz,Pasteurizada,Tiendas,26.0
4,2025-07-30,Campeche,Campeche,Pasteurizada,Tiendas,


Test locally

In [11]:
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd

def prepare_full_dataset(
    reference_date: datetime = None,
    lookback_days: int = 548,
    source_root: str = "../data/datalake/daily",
    output_path: str = "../data/processed/full_dataset.parquet"
) -> pd.DataFrame:
    if reference_date is None:
        reference_date = datetime.today()

    start_date = reference_date - timedelta(days=lookback_days)
    source_root = Path(source_root)
    all_files = []

    # Traverse dates and collect file paths
    for single_date in (start_date + timedelta(n) for n in range(lookback_days + 1)):
        file_path = source_root / f"{single_date.year:04d}" / f"{single_date.month:02d}" / f"{single_date.strftime('%Y-%m-%d')}-data.parquet"
        if file_path.exists():
            all_files.append(file_path)

    if not all_files:
        raise FileNotFoundError("No data files found for the specified lookback window.")

    # Load and concatenate
    df_list = [pd.read_parquet(fp) for fp in all_files]
    df = pd.concat(df_list, ignore_index=True)

    # Clean and normalize
    df = df.dropna(subset=["Fecha", "Precio"])
    df["Fecha"] = pd.to_datetime(df["Fecha"])

    # Sort and group by categorical columns
    group_cols = ["Estado", "Ciudad", "Tipo", "Canal"]
    df = df.sort_values(group_cols + ["Fecha"])

    # Add date-based features
    df["mes"] = df["Fecha"].dt.month
    df["dia"] = df["Fecha"].dt.day
    df["dia_semana"] = df["Fecha"].dt.day_name()

    # Lag and rolling mean
    df["Precio_lag1"] = (
        df.groupby(group_cols)["Precio"]
        .shift(1)
    )

    df["Precio_mean7"] = (
        df.groupby(group_cols)["Precio"]
        .rolling(window=7, min_periods=1)
        .mean()
        .reset_index(level=group_cols, drop=True)
    )

    # Save
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)

    print(f"✅ Processed dataset with {len(df)} rows saved to: {output_path}")
    return df


In [12]:
prepare_full_dataset()

✅ Processed dataset with 41954 rows saved to: ../data/processed/full_dataset.parquet


Unnamed: 0,Fecha,Estado,Ciudad,Tipo,Canal,Precio,mes,dia,dia_semana,Precio_lag1,Precio_mean7
54,2024-01-31,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,1,31,Wednesday,,23.6200
270,2024-02-02,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,2,2,Friday,23.62,23.6200
486,2024-02-07,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,2,7,Wednesday,23.62,23.6200
702,2024-02-09,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,24.15,2,9,Friday,23.62,23.7525
918,2024-02-12,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,24.15,2,12,Monday,24.15,23.8320
...,...,...,...,...,...,...,...,...,...,...,...
47033,2025-07-21,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,21,Monday,33.50,33.5000
47249,2025-07-23,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,23,Wednesday,33.50,33.5000
47465,2025-07-25,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,25,Friday,33.50,33.5000
47681,2025-07-28,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,28,Monday,33.50,33.5000


Test getting the files from s3

In [13]:
from datetime import datetime, timedelta
import pandas as pd
from pathlib import Path

def prepare_full_dataset_s3(
    reference_date: datetime = None,
    lookback_days: int = 548,
    s3_root: str = "s3://mlops-milk-datalake/daily",
    output_path: str = "../data/processed/full_dataset.parquet"
) -> pd.DataFrame:
    if reference_date is None:
        reference_date = datetime.today()

    start_date = reference_date - timedelta(days=lookback_days)
    all_files = []

    # Traverse S3 paths for the lookback period
    for single_date in (start_date + timedelta(n) for n in range(lookback_days + 1)):
        s3_path = f"{s3_root}/{single_date.year:04d}/{single_date.month:02d}/{single_date.strftime('%Y-%m-%d')}-data.parquet"
        try:
            # Try loading metadata only to validate existence
            pd.read_parquet(s3_path, engine="pyarrow", storage_options={"anon": False}, columns=["Precio"])
            all_files.append(s3_path)
        except Exception:
            continue  # skip if file does not exist

    if not all_files:
        raise FileNotFoundError("No valid S3 Parquet files found in the given window.")

    # Load and concatenate
    df_list = [
        pd.read_parquet(fp, engine="pyarrow", storage_options={"anon": False})
        for fp in all_files
    ]
    df = pd.concat(df_list, ignore_index=True)

    # Clean and normalize
    df = df.dropna(subset=["Fecha", "Precio"])
    df["Fecha"] = pd.to_datetime(df["Fecha"])

    # Grouping by categorical columns
    group_cols = ["Estado", "Ciudad", "Tipo", "Canal"]
    df = df.sort_values(group_cols + ["Fecha"])

    # Add features
    df["mes"] = df["Fecha"].dt.month
    df["dia"] = df["Fecha"].dt.day
    df["dia_semana"] = df["Fecha"].dt.day_name()

    df["Precio_lag1"] = df.groupby(group_cols)["Precio"].shift(1)

    df["Precio_mean7"] = (
        df.groupby(group_cols)["Precio"]
        .rolling(window=7, min_periods=1)
        .mean()
        .reset_index(level=group_cols, drop=True)
    )

    # Save locally
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)

    print(f"✅ Loaded {len(df)} rows from S3 and saved to: {output_path}")
    return df


In [14]:
result_df = prepare_full_dataset_s3()

✅ Loaded 41954 rows from S3 and saved to: ../data/processed/full_dataset.parquet


the previous one worked, but took about 8 minutes, because it was trying to get a file for each day and loading each day df

In [15]:
from datetime import datetime, timedelta
import pandas as pd
from pathlib import Path
import fsspec

def prepare_full_dataset_s3_optimized(
    reference_date: datetime = None,
    lookback_days: int = 548,
    s3_root: str = "s3://mlops-milk-datalake/daily",
    output_path: str = "../data/processed/full_dataset.parquet"
) -> pd.DataFrame:
    if reference_date is None:
        reference_date = datetime.today()

    start_date = reference_date - timedelta(days=lookback_days)
    fs = fsspec.filesystem("s3")

    # 1. List all existing Parquet files under daily/
    all_paths = fs.glob(f"{s3_root}/**/*-data.parquet")

    # 2. Filter files by extracting the date from the filename
    valid_files = []
    for path in all_paths:
        filename = path.split("/")[-1]
        try:
            file_date = datetime.strptime(filename.split("-data.parquet")[0], "%Y-%m-%d")
            if start_date <= file_date <= reference_date:
                valid_files.append(f"s3://{path}")  # Reattach s3:// prefix
        except:
            continue

    if not valid_files:
        raise FileNotFoundError("No valid files found within date window.")

    # 3. Load files
    df_list = [pd.read_parquet(fp, filesystem=fs) for fp in sorted(valid_files)]
    df = pd.concat(df_list, ignore_index=True)

    # 4. Clean and process
    df = df.dropna(subset=["Fecha", "Precio"])
    df["Fecha"] = pd.to_datetime(df["Fecha"])

    group_cols = ["Estado", "Ciudad", "Tipo", "Canal"]
    df = df.sort_values(group_cols + ["Fecha"])
    df["mes"] = df["Fecha"].dt.month
    df["dia"] = df["Fecha"].dt.day
    df["dia_semana"] = df["Fecha"].dt.day_name()
    df["Precio_lag1"] = df.groupby(group_cols)["Precio"].shift(1)
    df["Precio_mean7"] = (
        df.groupby(group_cols)["Precio"]
        .rolling(window=7, min_periods=1)
        .mean()
        .reset_index(level=group_cols, drop=True)
    )

    # 5. Save locally
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)

    print(f"✅ Loaded {len(df)} rows from {len(valid_files)} S3 files.")
    return df


In [16]:
prepare_full_dataset_s3_optimized()

✅ Loaded 41954 rows from 222 S3 files.


Unnamed: 0,Fecha,Estado,Ciudad,Tipo,Canal,Precio,mes,dia,dia_semana,Precio_lag1,Precio_mean7
54,2024-01-31,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,1,31,Wednesday,,23.6200
270,2024-02-02,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,2,2,Friday,23.62,23.6200
486,2024-02-07,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,2,7,Wednesday,23.62,23.6200
702,2024-02-09,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,24.15,2,9,Friday,23.62,23.7525
918,2024-02-12,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,24.15,2,12,Monday,24.15,23.8320
...,...,...,...,...,...,...,...,...,...,...,...
47033,2025-07-21,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,21,Monday,33.50,33.5000
47249,2025-07-23,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,23,Wednesday,33.50,33.5000
47465,2025-07-25,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,25,Friday,33.50,33.5000
47681,2025-07-28,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,28,Monday,33.50,33.5000


worked in less than a minute

In [1]:
import pandas as pd

In [2]:
full_df = pd.read_parquet('../data/processed/full_dataset.parquet')

In [3]:
full_df

Unnamed: 0,Fecha,Estado,Ciudad,Tipo,Canal,Precio,mes,dia,dia_semana,Precio_lag1,Precio_mean7
0,2024-01-29,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,1,29,Monday,,23.620
1,2024-01-31,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,1,31,Wednesday,23.62,23.620
2,2024-02-02,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,2,2,Friday,23.62,23.620
3,2024-02-07,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,23.62,2,7,Wednesday,23.62,23.620
4,2024-02-09,Aguascalientes,Aguascalientes,Pasteurizada,Autoservicios,24.15,2,9,Friday,23.62,23.726
...,...,...,...,...,...,...,...,...,...,...,...
42138,2025-07-21,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,21,Monday,33.50,33.500
42139,2025-07-23,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,23,Wednesday,33.50,33.500
42140,2025-07-25,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,25,Friday,33.50,33.500
42141,2025-07-28,Zacatecas,Zacatecas,Ultrapasteurizada,Tiendas,33.50,7,28,Monday,33.50,33.500
