# 02 — Cleaning (Curățarea datelor)

**Obiectiv:** standardizez tipuri, curăț valori lipsă/aberante, deduplicate, unific tranzacțiile A+B și le leg cu tabelele de dimensiuni (`products`, `customers`).  
**La final:** salvez `data/processed/sales_clean.parquet` + `sample_sales.csv` (eșantion mic pt. Git).

**Pași:**
1) Conversii numerice (`,`, `.`, sufixe `RON/lei`).
2) Conversii de dată (formate amestecate).
3) Tratare valori lipsă / negative.
4) Deduplicare.
5) Join cu `products` & `customers`.
6) (Opțional) preț promo activ la data tranzacției.
7) Export final + verificări rapide (QA).

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np

INTERIM = Path("data/interim")
PROCESSED = Path("data/processed")
PROCESSED.mkdir(parents=True, exist_ok=True)

products  = pd.read_parquet(INTERIM/"products.parquet")
promos    = pd.read_parquet(INTERIM/"promotions.parquet")
customers = pd.read_parquet(INTERIM/"customers.parquet")
tx_a      = pd.read_parquet(INTERIM/"tx_systemA.parquet")
tx_b      = pd.read_parquet(INTERIM/"tx_systemB.parquet")

len(products), len(promos), len(customers), len(tx_a), len(tx_b)

(60, 30, 253, 900, 700)

**Explicație:** lucrez pe copiile intermediare (`.parquet`) create în pasul 01; sursele brute rămân nemodificate.

In [2]:
import re

def normalize_ws(x):
    """Normalizează spațiile multiple -> unul și taie la capete."""
    if pd.isna(x): return x
    return re.sub(r"\s+", " ", str(x)).strip()

def to_float_mixed(x):
    """
    Convertește string-uri cu ,/. și sufixe ('RON', 'lei') în float.
    Suportă '1.234,56', '123,45', '123.45', '12,34 RON'.
    """
    if pd.isna(x): 
        return np.nan
    s = str(x).strip()
    s = s.replace("RON","").replace("lei","").strip()
    if s.count(",") == 1 and s.count(".") > 0:
        # mii cu punct + zecimale cu virgulă
        s = s.replace(".", "").replace(",", ".")
    elif s.count(",") == 1 and s.count(".") == 0:
        # doar virgulă zecimală
        s = s.replace(",", ".")
    try:
        return float(s)
    except:
        return np.nan


In [3]:
products["product_name"] = products["product_name"].map(normalize_ws)
products["category"]     = products["category"].map(normalize_ws).fillna("NECUNOSCUT")
products["unit"]         = products["unit"].astype(str).str.strip().replace({"piece":"buc"," piece ":"buc"})
products["list_price"]   = products["list_price"].apply(to_float_mixed)

# produs unic după cheie
products = products.drop_duplicates(subset=["product_id"])
products.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 60 entries, 0 to 59
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   product_id    60 non-null     object 
 1   product_name  60 non-null     object 
 2   category      60 non-null     object 
 3   unit          60 non-null     object 
 4   list_price    60 non-null     float64
 5   vat           49 non-null     float64
dtypes: float64(2), object(4)
memory usage: 2.9+ KB


**Explicație:** standardizez textul/diacriticele, unific unitățile și transform `list_price` în `float`. Elimin dublurile pe `product_id`.


In [5]:
promos["promo_price"] = promos["promo_price"].apply(to_float_mixed)
promos["start_date"]  = pd.to_datetime(promos["start_date"], format="%d/%m/%Y", errors="coerce")
promos["end_date"]    = pd.to_datetime(promos["end_date"],   format="%d/%m/%Y", errors="coerce")

# elimin promoții invalide și intervale inverse
promos = promos.dropna(subset=["product_id","promo_price","start_date","end_date"])
promos = promos[promos["end_date"] >= promos["start_date"]]
promos.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 30 entries, 0 to 29
Data columns (total 5 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   product_id     30 non-null     object        
 1   start_date     30 non-null     datetime64[ns]
 2   end_date       30 non-null     datetime64[ns]
 3   promo_price    30 non-null     float64       
 4   discount_type  21 non-null     object        
dtypes: datetime64[ns](2), float64(1), object(2)
memory usage: 1.3+ KB


In [7]:
# === Unesc A + B (schema a fost aliniată în 01) ===
tx = pd.concat([tx_a, tx_b], ignore_index=True, sort=False)

# --- Parsare robustă pentru timestamp din A și B ---
col = "timestamp"
tx[col] = tx[col].astype(str)  # asigur că sunt stringuri

# System A: are oră, format "%Y-%m-%d %H:%M:%S"
mask_a = tx[col].str.contains(":", na=False)

ts = pd.Series(pd.NaT, index=tx.index, dtype="datetime64[ns]")
ts.loc[mask_a] = pd.to_datetime(
    tx.loc[mask_a, col],
    format="%Y-%m-%d %H:%M:%S",
    errors="coerce"
)

# System B: restul, format "%d.%m.%Y"
mask_b = ts.isna()
ts.loc[mask_b] = pd.to_datetime(
    tx.loc[mask_b, col],
    format="%d.%m.%Y",
    errors="coerce"
)

tx["timestamp"] = ts

# === Tipuri corecte pe celelalte coloane ===
tx["unit_price"] = tx["unit_price"].apply(to_float_mixed)
tx["quantity"]   = pd.to_numeric(tx["quantity"], errors="coerce")

# === Reguli de validare ===
tx.loc[tx["quantity"] <= 0, "quantity"] = np.nan   # cantități negative/zero -> invalide
tx = tx.dropna(subset=["customer_id","product_id","quantity","unit_price","timestamp"])

# === Deduplicare (cheie compusă) ===
tx = tx.drop_duplicates(
    subset=["trans_id","customer_id","product_id","timestamp"],
    keep="first"
)

tx.info()

<class 'pandas.core.frame.DataFrame'>
Index: 675 entries, 0 to 899
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   trans_id        675 non-null    object        
 1   timestamp       675 non-null    datetime64[ns]
 2   customer_id     675 non-null    object        
 3   product_id      675 non-null    object        
 4   quantity        675 non-null    float64       
 5   unit_price      675 non-null    float64       
 6   store           675 non-null    object        
 7   payment_method  0 non-null      object        
dtypes: datetime64[ns](1), float64(2), object(5)
memory usage: 47.5+ KB


**Explicație:** standardizez tipurile, invalidez cantitățile ≤ 0, elimin rânduri cu lipsuri critice și deduplicate pe o cheie compusă.


In [8]:
df = (tx
      .merge(products,  on="product_id", how="left", suffixes=("","_prod"))
      .merge(customers, on="customer_id", how="left", suffixes=("","_cust"))
     )

# normalizări ușoare pe clienti
df["customer_name"] = df["customer_name"].map(normalize_ws)
df["segment"] = df["segment"].map(lambda s: normalize_ws(s).upper().replace(" ","") if pd.notna(s) else s)

df.head(3)

Unnamed: 0,trans_id,timestamp,customer_id,product_id,quantity,unit_price,store,payment_method,product_name,category,unit,list_price,vat,customer_name,email,city,segment
0,T313725,2025-07-14 11:28:00,C2127,P1010,2.0,21.22,,,Ceai Eco,Bauturi,buc,21.93,0.09,Andrei Radu,user3327@example.ro,Iasi,B2C
1,T426596,2025-04-23 10:55:00,C2189,P1003,3.0,14.02,Store-01,,Suc Zero,Dulciuri,set,16.36,,Elena Marin,user1087@example.ro,Brasov,RETAIL
2,T797849,2025-08-16 01:08:00,C2057,P1053,3.0,38.66,shop_03,,Apa Premium,-,buc,38.16,0.05,George Radu,user4647@example.ro,Cluj-Napoca,B2C


In [9]:
# grupăm promoțiile pe produs pentru lookup rapid
promos_sorted = promos.sort_values(["product_id","start_date"])

def promo_lookup_row(row):
    p = promos_sorted[promos_sorted["product_id"] == row["product_id"]]
    if p.empty:
        return np.nan
    m = p[(p["start_date"] <= row["timestamp"]) & (p["end_date"] >= row["timestamp"])]
    return m["promo_price"].iloc[0] if len(m) else np.nan

df["promo_price"] = df.apply(promo_lookup_row, axis=1)
df["effective_price"] = np.where(df["promo_price"].notna(), df["promo_price"], df["unit_price"])
df["line_amount"] = df["effective_price"] * df["quantity"]

df[["unit_price","promo_price","effective_price","quantity","line_amount"]].head()


Unnamed: 0,unit_price,promo_price,effective_price,quantity,line_amount
0,21.22,,21.22,2.0,42.44
1,14.02,,14.02,3.0,42.06
2,38.66,,38.66,3.0,115.98
3,12.6,,12.6,1.0,12.6
4,10.49,,10.49,1.0,10.49


**Explicație:** pentru fiecare tranzacție caut promo activ pentru acel `product_id` în intervalul `[start_date, end_date]`.  
Dacă există, folosesc `promo_price` ca `effective_price`; altfel rămâne `unit_price`.


In [10]:
df.to_parquet(PROCESSED/"sales_clean.parquet", index=False)
df.sample(min(100, len(df))).to_csv(PROCESSED/"sample_sales.csv", index=False)
df.shape


(683, 20)

**Explicație:** export setul curățat (Parquet) + un `sample_sales.csv` mic (ideal de urcat în Git).


In [12]:
# Rată lipsă pe câmpuri critice
na_rates = df[["customer_id","product_id","quantity","effective_price","timestamp"]].isna().mean().sort_values(ascending=False)
display(na_rates)

# Top 5 produse după sumă vândută
top_products = df.groupby("product_id")["line_amount"].sum().sort_values(ascending=False).head(5)
display(top_products)

# Vânzări pe magazin
sales_by_store = df.groupby("store")["line_amount"].sum().sort_values(ascending=False)
display(sales_by_store.head(10))


customer_id        0.0
product_id         0.0
quantity           0.0
effective_price    0.0
timestamp          0.0
dtype: float64

product_id
P1026    2611.88
P1013    2304.65
P1018    2214.74
P1031    2128.30
P1020    2086.15
Name: line_amount, dtype: float64

store
Online      18282.88
Store-01    17298.47
Store-02    15903.59
shop_03     13909.14
            12286.76
Name: line_amount, dtype: float64