<div style="font-family: system-ui, -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; border: 1px solid #ddd; border-radius: 10px; padding: 16px 20px; margin-bottom: 16px; background: #fafafa;">
  <h1 style="margin-top: 0; margin-bottom: 8px; font-size: 24px;">HAVI – 01_data_ingestion</h1>
  <p style="margin: 0 0 12px 0; font-size: 14px;">
    Goal: load all HAVI files (.xlsx), unify their structure, and save a common weekly table <code>master_raw</code>.
  </p>
  <table style="border-collapse: collapse; font-size: 13px;">
    <thead>
      <tr>
        <th style="border: 1px solid #ccc; padding: 4px 8px;">Country</th>
        <th style="border: 1px solid #ccc; padding: 4px 8px;">Granularity</th>
        <th style="border: 1px solid #ccc; padding: 4px 8px;">Years</th>
        <th style="border: 1px solid #ccc; padding: 4px 8px;">Notes</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">Spain, Poland, Portugal, Romania, Sweden</td>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">weekly</td>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">2019–2025</td>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">sheet <code>ShortBaseByYear_Week</code> or <code>ShortBaseByYear_Month_Week</code></td>
      </tr>
      <tr>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">Germany</td>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">daily, aggregated to weeks</td>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">2022–2025</td>
        <td style="border: 1px solid #ccc; padding: 4px 8px;">sheet <code>Export</code>, column <code>Delivered Cases</code></td>
      </tr>
    </tbody>
  </table>
</div>

# HAVI – 01_data_ingestion

## 1. Notebook goal

This notebook:
1. Loads all HAVI files (`.xlsx`) from the working directory.
2. Unifies their structure into a common weekly format.
3. Builds the `master_raw` table (grain: `country, dc_id, sku, week_start`), ready for the next stages:
   - preprocessing / feature engineering,
   - rolling-origin validation,
   - per-series modeling (`country-dc-sku`).

## 2. Data scope (sources)

- **Spain, Poland, Portugal, Romania, Sweden**  
  Weekly data, sheet `ShortBaseByYear_Week` or `ShortBaseByYear_Month_Week`.  
  Time key: `BLD Year`, `BLD Calendar Week Number`.

- **Germany**  
  Daily data, sheet `Export`, volume in the `Delivered Cases` column.  
  Aggregated to ISO weeks.


In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import re

pd.set_option("display.max_rows", 30)
pd.set_option("display.max_columns", None)

BASE_DIR = Path(".")
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

QC_DIR = DATA_DIR / "qc_reports"
QC_DIR.mkdir(exist_ok=True)

excel_files = sorted(BASE_DIR.glob("*.xlsx"))
print(f"Found .xlsx files: {len(excel_files)}")
excel_files[:5]


Found .xlsx files: 20


[WindowsPath('ES Beer 30.xlsx'),
 WindowsPath('ES Gazpacho.xlsx'),
 WindowsPath('ES Ice Cream Cone.xlsx'),
 WindowsPath('Germany 19-003-003.xlsx'),
 WindowsPath('Germany 4-807-019.xlsx')]

## 3. Helper functions

This section defines:
- a filter for temporary files (e.g. `~$...xlsx`),
- country detection based on the file name,
- safe type conversion,
- conversion of (ISO year, ISO week) to the Monday of that week (`week_start`).

`week_start` is the key column throughout the pipeline because it simplifies:
- joining data,
- building features,
- validation and plotting.


In [19]:
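# Filename fragment -> country. The aliases at the end ("Polska", "ES ", "PT ")
# cover the prefixes that actually appear in the file list above; full country
# names are listed first so they take precedence.
COUNTRY_FROM_FILENAME = {
    "Poland": "Poland",
    "Spain": "Spain",
    "Portugal": "Portugal",
    "Romania": "Romania",
    "Sweden": "Sweden",
    "Germany": "Germany",
    "Polska": "Poland",
    "ES ": "Spain",
    "PT ": "Portugal",
}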

def is_valid_xlsx(path: Path) -> bool:
    name = path.name
    if name.startswith("~$"):
        return False
    if name.startswith(".~"):
        return False
    if name.lower().endswith(".tmp"):
        return False
    return True

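def guess_country_from_filename(filename: str) -> str:
    """Fallback country lookup, used only when a sheet has no country column."""
    # A substring match also covers the prefix case, so one test is enough.
    for k, v in COUNTRY_FROM_FILENAME.items():
        if k in filename:
            return v
    return "UNKNOWN"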

def to_int_safe(s: pd.Series) -> pd.Series:
    return pd.to_numeric(s, errors="coerce").astype("Int64")

def iso_week_start(year: pd.Series, week: pd.Series) -> pd.Series:
    """
    Convert (ISO year, ISO week) to the Monday of that week.
    Values that cannot be parsed become NaT.
    """
    y = year.astype(str)
    w = week.astype(str).str.zfill(2)
    return pd.to_datetime(y + "-W" + w + "-1", format="%G-W%V-%u", errors="coerce")
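
The ISO-week conversion has one edge case worth seeing in isolation: week 1 of an ISO year can start in the previous calendar year. The sketch below uses only the standard library (Python 3.8+); `iso_monday` is a hypothetical helper name, not part of the notebook.

```python
from datetime import date


def iso_monday(iso_year: int, iso_week: int) -> date:
    """Return the Monday of the given ISO week (hypothetical helper)."""
    return date.fromisocalendar(iso_year, iso_week, 1)


# ISO week 1 of 2019 starts on the last day of December 2018.
print(iso_monday(2019, 1))   # 2018-12-31
print(iso_monday(2021, 1))   # 2021-01-04
```

This is why a `week_start` in late December can legitimately belong to `year` = the following year.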


## 4. Weekly countries

Covers: **Poland, Spain, Portugal, Romania, Sweden**.

Rules:
- pick the weekly sheet `ShortBaseByYear_Week` or `ShortBaseByYear_Month_Week`,
- map column names to the common schema,
- detect the volume column (`Sales Quantity Billed*`) and map it to `demand_raw`,
- fill in any missing `country/dc_id/sku/product_name`,
- compute `week_start`.


In [20]:
def load_weekly_shortbase(path: Path) -> pd.DataFrame:
    xls = pd.ExcelFile(path)

    candidate_sheets = [
        "ShortBaseByYear_Week",
        "ShortBaseByYear_Month_Week",
        "ShortBaseByYear_Month",  
    ]
    sheet = next((s for s in candidate_sheets if s in xls.sheet_names), None)
    if sheet is None:
        raise ValueError(f"{path.name}: no ShortBase sheet found (week/month-week)")

    df = pd.read_excel(xls, sheet_name=sheet)

    col_map = {
        "DIST Country": "country",
        "PDC Number": "dc_id",
        "DC Number": "dc_id",
        "MAT WRIN0": "sku",
        "MAT Description": "product_name",
        "BLD Year": "year",
        "BLD Calendar Week Number": "week",
    }
    df = df.rename(columns={k: v for k, v in col_map.items() if k in df.columns})

    demand_candidates = [
        "Sales Quantity Billed MD",
        "Sales Quantity Billed",
        "Sales Quantity Billed MD ",
        "Sales Quantity Billed Doc Curr",
    ]
    demand_col = next((c for c in demand_candidates if c in df.columns), None)
    if demand_col is None:
        raise ValueError(f"{path.name}: no volume column found (Sales Quantity Billed*)")

    df = df.rename(columns={demand_col: "demand_raw"})

    if "country" not in df.columns:
        df["country"] = guess_country_from_filename(path.name)
    if "dc_id" not in df.columns:
        df["dc_id"] = "ALL"
    if "sku" not in df.columns:
        m = re.search(r"\d{2,6}-\d{3}-\d{3}", path.name)
        df["sku"] = m.group(0) if m else path.stem
    if "product_name" not in df.columns:
        df["product_name"] = path.stem

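    # Fail early with a readable message if either time-key column is missing
    # (previously a missing year column surfaced as a bare KeyError).
    missing_time_cols = [c for c in ("year", "week") if c not in df.columns]
    if missing_time_cols:
        raise ValueError(
            f"{path.name}: missing time column(s) {missing_time_cols} "
            "(expected BLD Year / BLD Calendar Week Number)."
        )

    df["year"] = to_int_safe(df["year"])
    df["week"] = to_int_safe(df["week"])
    df["demand_raw"] = pd.to_numeric(df["demand_raw"], errors="coerce")
    df["week_start"] = iso_week_start(df["year"], df["week"])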

    df["source_file"] = path.name
    df["source_sheet"] = sheet

    return df[
        [
            "country", "dc_id", "sku", "product_name",
            "year", "week", "week_start",
            "demand_raw",
            "source_file", "source_sheet",
        ]
    ]
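
Both the sheet lookup and the volume-column lookup in the loader follow a "first candidate that is present" pattern. A minimal sketch of that pattern is shown here, with one assumption added on top: names are compared after stripping outer whitespace, so a header such as `"Sales Quantity Billed MD "` would not need its own entry in the candidate list. `first_present` is a hypothetical helper, not a function from the notebook.

```python
from typing import Iterable, Optional


def first_present(candidates: Iterable[str], available: Iterable[str]) -> Optional[str]:
    """Return the name from `available` that matches the first candidate.

    Matching ignores leading/trailing whitespace; the returned value is the
    original (unstripped) name so it can be passed straight to `rename`.
    """
    normalized = {name.strip(): name for name in available}
    for cand in candidates:
        key = cand.strip()
        if key in normalized:
            return normalized[key]
    return None


columns = ["BLD Year", "BLD Calendar Week Number", "Sales Quantity Billed MD "]
print(first_present(["Sales Quantity Billed MD", "Sales Quantity Billed"], columns))
```

Candidate order still encodes priority: the first entry that exists wins, exactly as with `next(...)` in the loader.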


## 5. Germany

The German files contain:
- an `Export` sheet,
- daily data, with the date in the `DLVD dd-mm-yyyy` column,
- volume in the `Delivered Cases` column.

Steps:
1. Parse the date.
2. Compute the ISO year and week.
3. Aggregate daily volumes to weeks.
4. Keep `product_name = MAT Description`.


In [21]:
def load_germany_export_daily(path: Path) -> pd.DataFrame:
    df = pd.read_excel(path, sheet_name="Export")

    df["DLVD dd-mm-yyyy"] = pd.to_datetime(
        df["DLVD dd-mm-yyyy"], dayfirst=True, errors="coerce"
    )
    df = df.dropna(subset=["DLVD dd-mm-yyyy"])

    iso = df["DLVD dd-mm-yyyy"].dt.isocalendar()
    df["year"] = iso.year.astype(int)
    df["week"] = iso.week.astype(int)
    df["week_start"] = iso_week_start(df["year"].astype("Int64"), df["week"].astype("Int64"))

    df_weekly = (
        df.groupby(["DC Number", "MAT WRIN0", "MAT Description", "year", "week", "week_start"])["Delivered Cases"]
        .sum()
        .reset_index()
    )

    df_weekly = df_weekly.rename(
        columns={
            "DC Number": "dc_id",
            "MAT WRIN0": "sku",
            "MAT Description": "product_name",
            "Delivered Cases": "demand_raw",
        }
    )

    df_weekly["country"] = "Germany"
    df_weekly["source_file"] = path.name
    df_weekly["source_sheet"] = "Export"

    return df_weekly[
        [
            "country", "dc_id", "sku", "product_name",
            "year", "week", "week_start",
            "demand_raw",
            "source_file", "source_sheet",
        ]
    ]
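
To make the daily-to-weekly step concrete, here is a small sketch on made-up data (the dates and case counts are illustrative, not taken from the HAVI files). It shows that days at the start of January can fall into the last ISO week of the previous year, which would be consistent with a Germany `week_start` of 2021-12-27 appearing for files that nominally start in 2022.

```python
import pandas as pd

# Made-up daily deliveries around a year boundary (illustrative values only).
daily = pd.DataFrame({
    "date": pd.to_datetime(["2021-12-31", "2022-01-02", "2022-01-03", "2022-01-05"]),
    "cases": [10, 5, 7, 3],
})

iso = daily["date"].dt.isocalendar()
daily["iso_year"] = iso["year"].astype(int)
daily["iso_week"] = iso["week"].astype(int)

# Sum daily volumes within each (ISO year, ISO week).
weekly = daily.groupby(["iso_year", "iso_week"], as_index=False)["cases"].sum()
print(weekly)
```

Sunday 2022-01-02 is grouped with Friday 2021-12-31 under ISO week 52 of 2021, while 2022-01-03 opens ISO week 1 of 2022.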


## 6. Loading all files

- filter out temporary files,
- pick a loader for each file:
  - `Germany` → `load_germany_export_daily`,
  - everything else → `load_weekly_shortbase`,
- collect errors in the `errors` table, so that no file is dropped silently.


In [22]:
excel_files = [p for p in excel_files if is_valid_xlsx(p)]
print(f".xlsx files left after filtering: {len(excel_files)}")
excel_files[:5]


.xlsx files left after filtering: 20


[WindowsPath('ES Beer 30.xlsx'),
 WindowsPath('ES Gazpacho.xlsx'),
 WindowsPath('ES Ice Cream Cone.xlsx'),
 WindowsPath('Germany 19-003-003.xlsx'),
 WindowsPath('Germany 4-807-019.xlsx')]

In [23]:
weekly_dfs = []
de_dfs = []
errors = []

for p in excel_files:
    try:
        if "Germany" in p.name:
            de_dfs.append(load_germany_export_daily(p))
        else:
            weekly_dfs.append(load_weekly_shortbase(p))
        print(f"OK: {p.name}")
    except Exception as e:
        errors.append((p.name, str(e)))
        print(f"ERROR: {p.name} -> {e}")

print("\nSummary:")
print(f"- ShortBase (weekly): {len(weekly_dfs)}")
print(f"- Germany Export: {len(de_dfs)}")
print(f"- Errors: {len(errors)}")

if errors:
    display(pd.DataFrame(errors, columns=["file", "error"]))


OK: ES Beer 30.xlsx
OK: ES Gazpacho.xlsx
OK: ES Ice Cream Cone.xlsx


  warn("Workbook contains no default style, apply openpyxl's default")


OK: Germany 19-003-003.xlsx


  warn("Workbook contains no default style, apply openpyxl's default")


OK: Germany 4-807-019.xlsx
OK: Polska 02589-489-000 TLUSZCZ.xlsx
OK: Polska 05243-022-000 KAWA.xlsx
OK: Polska 16333-000-000 Herbata Mietowa.xlsx
OK: Polska 62170-027-000 Chipsy PAPRYKA.xlsx
OK: PT Hot Chocolate.xlsx
OK: PT Sundae Mix.xlsx
OK: Romania 12-432-000 Sundae.xlsx
OK: Romania 5243-115-000 Coffee.xlsx
OK: Romania 76518-000-000 Tea.xlsx
OK: Romania 77-010-000 Placinta visne.xlsx
OK: Romania 7808-016-000 Cinnamon.xlsx
OK: Sweden 11-294-000 Shake.xlsx
OK: Sweden 295-629-000 Clam.xlsx
OK: Sweden 397-117-000 Cup Carrier.xlsx
OK: Sweden 75-072-000 Milk.xlsx

Summary:
- ShortBase (weekly): 18
- Germany Export: 2
- Errors: 0


In [24]:
df_all = pd.concat(weekly_dfs + de_dfs, ignore_index=True)
print("Combined dataset df_all:", df_all.shape)
display(df_all.head(10))


Combined dataset df_all: (18972, 10)


Unnamed: 0,country,dc_id,sku,product_name,year,week,week_start,demand_raw,source_file,source_sheet
0,Spain,100,04592-030-000,CERVEZA 30L,2019,1,2018-12-31,833.0,ES Beer 30.xlsx,ShortBaseByYear_Week
1,Spain,100,04592-030-000,CERVEZA 30L,2020,1,2019-12-30,349.0,ES Beer 30.xlsx,ShortBaseByYear_Week
2,Spain,100,04592-030-000,CERVEZA 30L,2021,1,2021-01-04,325.0,ES Beer 30.xlsx,ShortBaseByYear_Week
3,Spain,100,04592-030-000,CERVEZA 30L,2022,1,2022-01-03,552.0,ES Beer 30.xlsx,ShortBaseByYear_Week
4,Spain,100,04592-030-000,CERVEZA 30L,2023,1,2023-01-02,696.0,ES Beer 30.xlsx,ShortBaseByYear_Week
5,Spain,100,04592-030-000,CERVEZA 30L,2024,1,2024-01-01,747.0,ES Beer 30.xlsx,ShortBaseByYear_Week
6,Spain,100,04592-030-000,CERVEZA 30L,2025,1,2024-12-30,319.0,ES Beer 30.xlsx,ShortBaseByYear_Week
7,Spain,100,04592-030-000,CERVEZA 30L,2019,2,2019-01-07,689.0,ES Beer 30.xlsx,ShortBaseByYear_Week
8,Spain,100,04592-030-000,CERVEZA 30L,2020,2,2020-01-06,697.0,ES Beer 30.xlsx,ShortBaseByYear_Week
9,Spain,100,04592-030-000,CERVEZA 30L,2021,2,2021-01-11,250.0,ES Beer 30.xlsx,ShortBaseByYear_Week


In [25]:
df_all["product_name"] = (
    df_all["product_name"]
    .astype("string")
    .str.strip()
    .replace({"": pd.NA, "nan": pd.NA})
)

df_all[["sku", "product_name", "country"]].drop_duplicates().head(22)

Unnamed: 0,sku,product_name,country
0,04592-030-000,CERVEZA 30L,Spain
2110,00119-066-000,*GAZPACHO,Spain
2271,00119-066-001,GAZPACHO (17783-000),Spain
4188,00023-189-000,CONO,Spain
6262,02589-489-000,TLUSZCZ,Poland
7665,05243-022-000,KAWA,Poland
8967,16333-000-000,Herbata Mietowa,Poland
9762,62170-027-000,Chipsy PAPRYKA,Poland
10304,00041-097-000,CHOC QT 05 MC,Portugal
10995,00012-438-000,*LEITE SUNDAE,Portugal


## 7. Standardizing types and formats

- strip whitespace from identifiers,
- enforce numeric types for `year`, `week`, `demand_raw`.


In [26]:
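# Use the nullable "string" dtype: astype(str) can turn missing values into
# literal text such as "nan" or "<NA>", which would hide them from the
# missing-value check in the next step and undo the product_name cleanup above.
df_all["country"] = df_all["country"].astype("string").str.strip()
df_all["dc_id"] = df_all["dc_id"].astype("string").str.strip()
df_all["sku"] = df_all["sku"].astype("string").str.strip()
df_all["product_name"] = df_all["product_name"].astype("string").str.strip()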

df_all["demand_raw"] = pd.to_numeric(df_all["demand_raw"], errors="coerce")
df_all["year"] = pd.to_numeric(df_all["year"], errors="coerce").astype("Int64")
df_all["week"] = pd.to_numeric(df_all["week"], errors="coerce").astype("Int64")

df_all["week_start"] = pd.to_datetime(df_all["week_start"], errors="coerce")


### 1. Missing values in key columns + report

If any key column has missing values, the affected rows are written to a report in `data/qc_reports/`.


In [27]:
key_cols = ["country", "dc_id", "sku", "year", "week", "week_start", "demand_raw"]

na_counts = df_all[key_cols].isna().sum().sort_values(ascending=False)
print("Missing values in key columns:")
display(na_counts)

na_rows = df_all[df_all[key_cols].isna().any(axis=1)].copy()
if len(na_rows) > 0:
    na_path = QC_DIR / "missing_key_fields.csv"
    na_rows.to_csv(na_path, index=False)
    print(f"Saved missing-values report: {na_path}")
    display(na_rows.head(20))


Missing values in key columns:


country       0
dc_id         0
sku           0
year          0
week          0
week_start    0
demand_raw    0
dtype: int64

### 2. Invalid week numbers

Any record with a week number outside 1..53 (or missing) is treated as a data error and written to a report.


In [28]:
invalid_week = df_all[(df_all["week"].isna()) | (df_all["week"] < 1) | (df_all["week"] > 53)].copy()
print(f"Records with an invalid week (outside 1..53 or NA): {len(invalid_week)}")

if len(invalid_week) > 0:
    inv_path = QC_DIR / "invalid_week.csv"
    invalid_week.to_csv(inv_path, index=False)
    print(f"Saved invalid-week report: {inv_path}")
    display(invalid_week.head(20))


Records with an invalid week (outside 1..53 or NA): 0
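
The 1..53 range check is necessary but not sufficient, because week 53 exists only in some ISO years. A stricter per-year check could look like the sketch below. It uses only the standard library; `iso_weeks_in_year` and `is_valid_iso_week` are hypothetical helper names, not part of the notebook.

```python
from datetime import date


def iso_weeks_in_year(iso_year: int) -> int:
    """Number of ISO weeks (52 or 53) in the given ISO year."""
    # December 28 always falls in the last ISO week of its year.
    return date(iso_year, 12, 28).isocalendar()[1]


def is_valid_iso_week(iso_year: int, iso_week: int) -> bool:
    """True if the week number exists in that ISO year."""
    return 1 <= iso_week <= iso_weeks_in_year(iso_year)


print(iso_weeks_in_year(2020))      # 53
print(is_valid_iso_week(2021, 53))  # False
```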


### 3. Date range and counts

Summary table per country: first and last week, number of records, number of distinct weeks, number of SKUs, and number of DCs.


In [29]:
date_summary_country = (
    df_all.groupby("country")
    .agg(
        n_records=("demand_raw", "size"),
        n_unique_weeks=("week_start", "nunique"),
        min_week=("week_start", "min"),
        max_week=("week_start", "max"),
        n_skus=("sku", "nunique"),
        n_dcs=("dc_id", "nunique"),
    )
    .reset_index()
    .sort_values(["n_records"], ascending=False)
)

print("Date range and counts per country:")
display(date_summary_country)

date_summary_path = QC_DIR / "date_summary_country.csv"
date_summary_country.to_csv(date_summary_path, index=False)
print(f"Saved: {date_summary_path}")


Date range and counts per country:


Unnamed: 0,country,n_records,n_unique_weeks,min_week,max_week,n_skus,n_dcs
4,Spain,6262,352,2018-12-31,2025-11-03,4,7
1,Poland,4042,357,2018-12-31,2025-11-03,4,4
5,Sweden,3206,356,2018-12-31,2025-10-27,4,4
3,Romania,2212,356,2018-12-31,2025-10-27,5,3
0,Germany,1820,202,2021-12-27,2025-11-03,2,8
2,Portugal,1430,358,2018-12-31,2025-11-03,3,4


Saved: data\qc_reports\date_summary_country.csv


### 4. Negative values

Business requirement: negative values are replaced with zeros.
Before replacing them, we report each one: country / DC / SKU / week / value.


In [30]:
neg_mask = df_all["demand_raw"] < 0
negatives = df_all.loc[
    neg_mask,
    ["country", "dc_id", "sku", "product_name", "year", "week", "week_start", "demand_raw", "source_file", "source_sheet"],
].copy()

print(f"Negative demand_raw values found: {len(negatives)}")
if len(negatives) > 0:
    neg_path = QC_DIR / "negatives_report.csv"
    negatives.sort_values(["country", "sku", "dc_id", "week_start"]).to_csv(neg_path, index=False)
    print(f"Saved negative-values report: {neg_path}")
    display(negatives.head(30))

    df_all.loc[neg_mask, "demand_raw"] = 0.0
    print("Negative values were replaced with 0.")
else:
    print("No negative values.")


Negative demand_raw values found: 22
Saved negative-values report: data\qc_reports\negatives_report.csv


Unnamed: 0,country,dc_id,sku,product_name,year,week,week_start,demand_raw,source_file,source_sheet
77,Spain,100,04592-030-000,CERVEZA 30L,2020,12,2020-03-16,-32.0,ES Beer 30.xlsx,ShortBaseByYear_Week
90,Spain,100,04592-030-000,CERVEZA 30L,2020,14,2020-03-30,-2.0,ES Beer 30.xlsx,ShortBaseByYear_Week
431,Spain,200,04592-030-000,CERVEZA 30L,2020,12,2020-03-16,-6.0,ES Beer 30.xlsx,ShortBaseByYear_Week
1133,Spain,400,04592-030-000,CERVEZA 30L,2020,12,2020-03-16,-4.0,ES Beer 30.xlsx,ShortBaseByYear_Week
2154,Spain,100,00119-066-000,*GAZPACHO,2020,12,2020-03-16,-5.0,ES Gazpacho.xlsx,ShortBaseByYear_Week
2173,Spain,100,00119-066-000,*GAZPACHO,2022,18,2022-05-02,-4.0,ES Gazpacho.xlsx,ShortBaseByYear_Week
2502,Spain,200,00119-066-000,*GAZPACHO,2020,12,2020-03-16,-4.0,ES Gazpacho.xlsx,ShortBaseByYear_Week
2564,Spain,200,00119-066-000,*GAZPACHO,2022,35,2022-08-29,-0.001,ES Gazpacho.xlsx,ShortBaseByYear_Week
3908,Spain,700,00119-066-000,*GAZPACHO,2022,20,2022-05-16,-1.0,ES Gazpacho.xlsx,ShortBaseByYear_Week
4265,Spain,100,00023-189-000,CONO,2020,12,2020-03-16,-1.0,ES Ice Cream Cone.xlsx,ShortBaseByYear_Week


Negative values were replaced with 0.
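
The "report first, then replace" rule can be illustrated on a few made-up numbers. This sketch uses `Series.clip(lower=0)`, which for non-missing values gives the same result as the masked assignment in the cell above; the sample values are illustrative only.

```python
import pandas as pd

# Illustrative demand values, including a tiny negative like the -0.001 in the report.
demand = pd.Series([833.0, -32.0, -0.001, 0.0])

# Step 1: count (and, in the notebook, export) the negatives before touching them.
n_negative = int((demand < 0).sum())

# Step 2: floor everything at zero; non-negative values are left unchanged.
cleaned = demand.clip(lower=0)

print(n_negative)
print(cleaned.tolist())
```

Keeping the count and the export ahead of the replacement preserves an audit trail of what was changed.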


## 8. Duplicates: identical vs conflicting

Time-series key:
`country, dc_id, sku, year, week`

- identical duplicates (same values) are removed,
- conflicting duplicates (different values under the same key) are reported for audit.

Conflicts are not summed automatically during ingestion. The conflicting rows stay in `df_all`, so the key is not guaranteed to be unique until the audit resolves them.


In [31]:
dedup_key = ["country", "dc_id", "sku", "year", "week"]
compare_cols = ["country", "dc_id", "sku", "product_name", "year", "week", "week_start", "demand_raw"]

dups = df_all[df_all.duplicated(subset=dedup_key, keep=False)].copy()
print(f"Duplicates on the time key: {len(dups)} records")

if len(dups) > 0:
    identical_dups = dups[dups.duplicated(subset=compare_cols, keep=False)].copy()
    conflict_dups = dups.drop(index=identical_dups.index).copy()

    print(f"Identical duplicates: {len(identical_dups)}")
    print(f"Conflicting duplicates: {len(conflict_dups)}")

    if len(conflict_dups) > 0:
        conflict_path = QC_DIR / "dup_conflicts.csv"
        conflict_dups.sort_values(dedup_key).to_csv(conflict_path, index=False)
        print(f"Saved conflict report: {conflict_path}")
        display(conflict_dups.sort_values(dedup_key).head(30))

    before = len(df_all)
    df_all = df_all.drop_duplicates(subset=compare_cols, keep="first").copy()
    print(f"Removed identical duplicates: {before - len(df_all)}")
else:
    print("No duplicates on the time key.")


Duplicates on the time key: 42 records
Identical duplicates: 10
Conflicting duplicates: 32


Saved conflict report: data\qc_reports\dup_conflicts.csv


Unnamed: 0,country,dc_id,sku,product_name,year,week,week_start,demand_raw,source_file,source_sheet
13677,Romania,100,07808-016-000,Cinnamon,2019,31,2019-07-29,2.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13679,Romania,100,07808-016-000,Cinnamon,2019,31,2019-07-29,1.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13719,Romania,100,07808-016-000,Cinnamon,2020,49,2020-11-30,4.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13721,Romania,100,07808-016-000,Cinnamon,2020,49,2020-11-30,2.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13847,Romania,200,07808-016-000,Cinnamon,2022,35,2022-08-29,2.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13851,Romania,200,07808-016-000,Cinnamon,2022,35,2022-08-29,1.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13878,Romania,200,07808-016-000,Cinnamon,2022,44,2022-10-31,5.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13880,Romania,200,07808-016-000,Cinnamon,2022,44,2022-10-31,2.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13755,Romania,200,07808-016-000,Cinnamon,2023,9,2023-02-27,1.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week
13758,Romania,200,07808-016-000,Cinnamon,2023,9,2023-02-27,3.0,Romania 7808-016-000 Cinnamon.xlsx,ShortBaseByYear_Month_Week


Removed identical duplicates: 5
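
The identical-versus-conflicting distinction is easy to check on a toy table. The sketch below takes a slightly different route from the cell above: instead of comparing full rows, it counts distinct demand values per key. The column names and values are made up for illustration.

```python
import pandas as pd

toy = pd.DataFrame({
    "sku":    ["A", "A", "B", "B", "C"],
    "week":   [1, 1, 1, 1, 1],
    "demand": [5.0, 5.0, 2.0, 3.0, 9.0],
})
key = ["sku", "week"]

# Rows whose key occurs more than once.
is_dup = toy.duplicated(subset=key, keep=False)
# Number of distinct demand values per key, broadcast back to each row.
n_values = toy.groupby(key)["demand"].transform("nunique")

toy["status"] = "unique"
toy.loc[is_dup & (n_values == 1), "status"] = "identical"
toy.loc[is_dup & (n_values > 1), "status"] = "conflict"
print(toy)
```

Key `A` repeats the same value and is safe to collapse to one row; key `B` carries two different values and needs a human decision.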


## 9. Short series after aggregation

For Germany, some series contain only a handful of weeks (e.g. 4 to 9).
This is not an ingestion error, but it must be visible in the quality report,
because such series are usually filtered out at the modeling stage anyway.


In [32]:
de_short_series = (
    df_all[df_all["country"] == "Germany"]
    .groupby(["dc_id", "sku", "product_name"])
    .agg(
        n_weeks=("week_start", "nunique"),
        min_date=("week_start", "min"),
        max_date=("week_start", "max"),
        mean_demand=("demand_raw", "mean"),
    )
    .reset_index()
    .sort_values("n_weeks")
)

de_short_path = QC_DIR / "germany_series_summary.csv"
de_short_series.to_csv(de_short_path, index=False)
print(f"Saved: {de_short_path}")
display(de_short_series.head(30))


Saved: data\qc_reports\germany_series_summary.csv


Unnamed: 0,dc_id,sku,product_name,n_weeks,min_date,max_date,mean_demand
0,100,00004-807-019,Pommes Frites I,4,2023-05-22,2025-06-09,1.0
2,200,00004-807-019,Pommes Frites I,5,2024-09-02,2024-09-30,8268.0
4,300,00004-807-019,Pommes Frites I,7,2023-05-01,2025-10-27,1.142857
6,400,00004-807-019,Pommes Frites I,9,2022-01-31,2025-09-29,1.0
10,700,00004-807-019,Pommes Frites I,188,2022-01-03,2025-11-03,6685.617021
8,500,00019-003-003,Coca Cola 20 L BIB,200,2022-01-03,2025-11-03,662.16
11,700,00019-003-003,Coca Cola 20 L BIB,200,2022-01-03,2025-11-03,791.795
5,300,00019-003-003,Coca Cola 20 L BIB,201,2021-12-27,2025-11-03,1520.910448
12,900,00019-003-003,Coca Cola 20 L BIB,201,2022-01-03,2025-11-03,842.502488
7,400,00019-003-003,Coca Cola 20 L BIB,201,2021-12-27,2025-11-03,986.810945


## 10. Coverage

Coverage tells us how many distinct weeks each series (`country, dc_id, sku`) has.
For modeling we often require at least 52 weeks (one year of data).

- `ok_for_model=True` → the series has >= 52 weeks.
- `n_series_ok` → how many series in the country meet the condition.

This is not required for ingestion, but it is very useful as a quality gate.


In [33]:
MIN_WEEKS_FOR_MODEL = 52
series_key = ["country", "dc_id", "sku"]

coverage = (
    df_all
    .groupby(series_key)
    .agg(
        n_weeks=("week_start", "nunique"),
        min_date=("week_start", "min"),
        max_date=("week_start", "max"),
        mean_demand=("demand_raw", "mean"),
        median_demand=("demand_raw", "median"),
    )
    .reset_index()
)

coverage["ok_for_model"] = coverage["n_weeks"] >= MIN_WEEKS_FOR_MODEL

print("Coverage (sample):")
display(coverage.sort_values(["country", "n_weeks"], ascending=[True, False]).head(30))

cov_path = QC_DIR / "coverage_series.csv"
coverage.to_csv(cov_path, index=False)
print(f"Saved: {cov_path}")

country_cov = (
    coverage.groupby("country")
    .agg(
        n_series=("sku", "count"),
        n_series_ok=("ok_for_model", "sum"),
        avg_weeks=("n_weeks", "mean"),
        min_weeks=("n_weeks", "min"),
        max_weeks=("n_weeks", "max"),
    )
    .reset_index()
    .sort_values("n_series_ok", ascending=False)
)

print("Coverage per country:")
display(country_cov)

country_cov_path = QC_DIR / "coverage_country.csv"
country_cov.to_csv(country_cov_path, index=False)
print(f"Saved: {country_cov_path}")


Coverage (sample):


Unnamed: 0,country,dc_id,sku,n_weeks,min_date,max_date,mean_demand,median_demand,ok_for_model
1,Germany,100,00019-003-003,202,2021-12-27,2025-11-03,1730.980198,1744.0,True
3,Germany,200,00019-003-003,201,2022-01-03,2025-11-03,1300.253731,1302.0,True
5,Germany,300,00019-003-003,201,2021-12-27,2025-11-03,1520.910448,1521.0,True
7,Germany,400,00019-003-003,201,2021-12-27,2025-11-03,986.810945,979.0,True
9,Germany,600,00019-003-003,201,2022-01-03,2025-11-03,1229.497512,1226.0,True
12,Germany,900,00019-003-003,201,2022-01-03,2025-11-03,842.502488,841.0,True
8,Germany,500,00019-003-003,200,2022-01-03,2025-11-03,662.16,655.0,True
11,Germany,700,00019-003-003,200,2022-01-03,2025-11-03,791.795,796.5,True
10,Germany,700,00004-807-019,188,2022-01-03,2025-11-03,6685.617021,6749.5,True
6,Germany,400,00004-807-019,9,2022-01-31,2025-09-29,1.0,1.0,False


Saved: data\qc_reports\coverage_series.csv
Coverage per country:


Unnamed: 0,country,n_series,n_series_ok,avg_weeks,min_weeks,max_weeks
4,Spain,27,24,231.37037,1,352
1,Poland,15,15,269.2,180,357
3,Romania,12,11,182.25,37,356
5,Sweden,12,10,266.5,37,356
0,Germany,13,9,140.0,4,202
2,Portugal,9,6,158.666667,1,346


Saved: data\qc_reports\coverage_country.csv


## 11. Saving the files

We save:
- `data/master_raw.parquet`: the main dataset after ingestion and QC corrections (negatives set to 0, identical duplicates removed),
- `data/master_raw.csv`: an auxiliary copy,
- QC reports in `data/qc_reports/`.


In [34]:
df_all = df_all.sort_values(["country", "sku", "dc_id", "week_start"]).reset_index(drop=True)

parquet_path = DATA_DIR / "master_raw.parquet"
csv_path = DATA_DIR / "master_raw.csv"

df_all.to_parquet(parquet_path, index=False)
df_all.to_csv(csv_path, index=False)

print("Saved:")
print(parquet_path)
print(csv_path)
print(f"QC reports: {QC_DIR}")


Saved:
data\master_raw.parquet
data\master_raw.csv
QC reports: data\qc_reports


## 12. Notebook summary

Done:
- loaded all `.xlsx` files,
- standardized them to a weekly format (including DE: daily → weekly),
- ran quality checks and produced reports:
  - missing values in key fields,
  - invalid week numbers,
  - date range and counts per country,
  - negative values (report + replacement with 0),
  - duplicates (identical removed, conflicting reported),
  - series coverage (quality gate).
