# **Cấu hình file**

In [2]:
from pathlib import Path
import polars as pl
import duckdb

# ===== CẤU HÌNH =====
DATA_RAW = Path(r"C:\Users\pitou\Desktop\Data Mining\data_raw")
DATA_PARQUET = Path(r"C:\Users\pitou\Desktop\Data Mining\data_parquet")
DATA_CLEAN = Path(r"C:\Users\pitou\Desktop\Data Mining\data_clean")

DATA_PARQUET.mkdir(parents=True, exist_ok=True)
DATA_CLEAN.mkdir(parents=True, exist_ok=True)

FILES = {
    'branches': 'branches.csv',
    'customers': 'customers.csv',
    'orders': 'orders.csv',
    'order_details': 'order_details.csv',
    'categories': 'categories.csv',
}

def p(path: Path) -> str:
    return path.resolve().as_posix()

In [3]:
# ===== UTILITY FUNCTIONS =====
def rename_if_present(df: pl.DataFrame | pl.LazyFrame, mapping: dict) -> pl.LazyFrame:
    schema = df.collect_schema() if isinstance(df, pl.LazyFrame) else df.schema
    safe_map = {old: new for old, new in mapping.items() if old in schema}
    return df.rename(safe_map)

def strip_all_str(df: pl.LazyFrame, cols: list[str]) -> pl.LazyFrame:
    schema = df.collect_schema()
    ex_cols = [c for c in cols if c in schema]
    return df.with_columns([
        pl.col(c).cast(pl.Utf8, strict=False).str.strip_chars().alias(c) 
        for c in ex_cols
    ])

def cast_if_present(df: pl.LazyFrame, col: str, dtype) -> pl.LazyFrame:
    schema = df.collect_schema()
    if col not in schema:
        return df
    return df.with_columns(
        pl.when(pl.col(col).is_not_null())
        .then(pl.col(col).cast(dtype, strict=False))
        .otherwise(pl.lit(None))
        .alias(col)
    )

def write_lazy(df_lazy: pl.LazyFrame, out_path: Path):
    df_lazy.sink_parquet(p(out_path))


# **Chuyển CSV sang Parquet**

In [4]:
# ===== 1. CSV TO PARQUET =====
print("=" * 60)
print("BƯỚC 1: CHUYỂN CSV SANG PARQUET")
print("=" * 60)

con = duckdb.connect()

def csv_to_parquet(csv_path: Path, parquet_path: Path):
    sql = f'''
    COPY (
        SELECT * FROM read_csv_auto('{p(csv_path)}', ALL_VARCHAR=TRUE)
    )
    TO '{p(parquet_path)}' (FORMAT 'parquet');
    '''
    con.execute(sql)

for key, fname in FILES.items():
    csv_fp = DATA_RAW / fname
    if not csv_fp.exists():
        print(f'Missing CSV: {csv_fp}')
        continue
    out_fp = DATA_PARQUET / f'{key}_raw.parquet'
    print(f'→ Converting {csv_fp.name} → {out_fp.name}')
    csv_to_parquet(csv_fp, out_fp)

print('CSV to Parquet done.\n')

BƯỚC 1: CHUYỂN CSV SANG PARQUET
→ Converting branches.csv → branches_raw.parquet
→ Converting customers.csv → customers_raw.parquet
→ Converting orders.csv → orders_raw.parquet


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

→ Converting order_details.csv → order_details_raw.parquet


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

→ Converting categories.csv → categories_raw.parquet
CSV to Parquet done.



# **Làm sạch dữ liệu**

In [6]:
# ===== 2. CLEAN BRANCHES =====
print("=" * 60)
print("BƯỚC 2: LÀM SẠCH BRANCHES")
print("=" * 60)

raw_fp = DATA_PARQUET / 'branches_raw.parquet'
out_fp = DATA_CLEAN / 'branches_clean.parquet'

if raw_fp.exists():
    df = pl.scan_parquet(p(raw_fp))
    df = rename_if_present(df, {
        'BRANCH_ID': 'branch_id',
        'REGION': 'region',
        'CITY': 'city',
        'TOWN': 'town',
        'BRANCH_TOWN': 'branch_town',
        'LAT': 'lat',
        'LON': 'lon',
    })

    df = strip_all_str(df, ['branch_id', 'region', 'city', 'town', 'branch_town'])
    df = cast_if_present(df, 'branch_id', pl.Utf8)
    df = cast_if_present(df, 'lat', pl.Float64)
    df = cast_if_present(df, 'lon', pl.Float64)

    schema = df.collect_schema()
    if 'branch_id' in schema:
        df = df.unique(subset=['branch_id'])

    # Scale lat/lon
    if 'lat' in schema and 'lon' in schema:
        df = df.with_columns([
            (pl.col("lat") / 1e8).alias("lat"),
            (pl.col("lon") / 1e8).alias("lon")
        ])

    # Fill null
    df = df.with_columns([
        pl.col("region").fill_null("Unknown").str.strip_chars(),
        pl.col("city").fill_null("Unknown").str.strip_chars(),
        pl.col("town").fill_null("Unknown").str.strip_chars(),
    ])

    # Remove null branch_id
    df = df.filter(pl.col("branch_id").is_not_null())

    write_lazy(df, out_fp)
    cnt = df.select(pl.len()).collect().item()
    print(f'Branches cleaned: {cnt:,} rows\n')


BƯỚC 2: LÀM SẠCH BRANCHES
Branches cleaned: 161 rows



In [5]:
# ===== 3. CLEAN CUSTOMERS =====
print("=" * 60)
print("BƯỚC 3: LÀM SẠCH CUSTOMERS")
print("=" * 60)

raw_fp = DATA_PARQUET / 'customers_raw.parquet'
out_fp = DATA_CLEAN / 'customers_clean.parquet'

if raw_fp.exists():
    df = pl.scan_parquet(p(raw_fp))
    df = rename_if_present(df, {
        'USERID': 'user_id',
        'USERNAME_': 'username',
        'NAMESURNAME': 'name_surname',
        'STATUS_': 'status',
        'USERGENDER': 'gender',
        'USERBIRTHDATE': 'birth_date',
        'REGION': 'region',
        'CITY': 'city',
        'TOWN': 'town',
        'DISTRICT': 'district',
        'ADDRESSTEXT': 'address_text',
    })

    text_cols = ['username', 'name_surname', 'gender', 'region', 'city', 'town', 'district', 'address_text']
    df = strip_all_str(df, text_cols)
    df = cast_if_present(df, 'user_id', pl.Utf8)

    schema = df.collect_schema()
    
    if 'birth_date' in schema:
        df = df.with_columns(
            pl.col('birth_date').str.strptime(pl.Date, format='%Y-%m-%d', strict=False)
        )

    df = df.filter(pl.col("user_id").is_not_null())
    df = df.unique(subset=['user_id'], keep='first')
    
    # Chuẩn hóa giới tính E = Nam, K = Nữ
    if 'gender' in schema:
        df = df.with_columns(
            pl.when(pl.col('gender').str.to_uppercase() == 'E')
              .then(pl.lit('Male'))
              .when(pl.col('gender').str.to_uppercase() == 'K')
              .then(pl.lit('Female'))
              .otherwise(pl.lit('Unknown')) 
              .alias('gender')
        )

    for col in ['region', 'city', 'town', 'district']:
        if col in schema:
            df = df.with_columns(
                pl.col(col).fill_null("Unknown").str.strip_chars()
            )

    write_lazy(df, out_fp)
    cnt = df.select(pl.len()).collect().item()
    print(f'Customers cleaned: {cnt:,} rows\n')

BƯỚC 3: LÀM SẠCH CUSTOMERS
Customers cleaned: 99,998 rows



In [7]:
# ===== 4. CLEAN ORDERS =====
print("=" * 60)
print("BƯỚC 4: LÀM SẠCH ORDERS")
print("=" * 60)

raw_fp = DATA_PARQUET / 'orders_raw.parquet'
out_fp = DATA_CLEAN / 'orders_clean.parquet'

if raw_fp.exists():
    df = pl.scan_parquet(p(raw_fp))
    df = rename_if_present(df, {
        'ORDERID': 'order_id',
        'BRANCH_ID': 'branch_id',
        'DATE_': 'order_date',
        'USERID': 'user_id',
        'NAMESURNAME': 'name_surname',
        'TOTALBASKET': 'total_basket',
    })

    df = strip_all_str(df, ['branch_id', 'name_surname'])
    df = cast_if_present(df, 'order_id', pl.Utf8)
    df = cast_if_present(df, 'user_id', pl.Utf8)
    df = cast_if_present(df, 'branch_id', pl.Utf8)

    schema = df.collect_schema()
    if 'order_date' in schema:
        df = df.with_columns(
            pl.col('order_date').str.strptime(pl.Datetime, format='%Y-%m-%d %H:%M:%S', strict=False)
        )

    if 'total_basket' in schema:
        df = df.with_columns(
            pl.col('total_basket')
            .cast(pl.Utf8, strict=False)
            .str.replace_all(",", ".")
            .cast(pl.Float64, strict=False)
        )

    if 'order_id' in schema:
        df = df.unique(subset=['order_id'])

    # Filter invalid orders
    df = df.filter(
        pl.col("user_id").is_not_null() & 
        pl.col("branch_id").is_not_null()
    )

    # Chấp nhận total_basket = 0 (có thể là đơn trả hàng/hoàn tiền)
    if "total_basket" in schema:
        df = df.filter(
            pl.col("total_basket").is_not_null() & 
            (pl.col("total_basket") >= 0)  # Thay đổi từ > 0 thành >= 0
        )

    # Add date features
    if "order_date" in schema:
        df = df.with_columns([
            pl.col("order_date").dt.year().alias("order_year"),
            pl.col("order_date").dt.month().alias("order_month"),
            pl.col("order_date").dt.weekday().alias("order_weekday")
        ])

    write_lazy(df, out_fp)
    cnt = df.select(pl.len()).collect().item()
    print(f'Orders cleaned: {cnt:,} rows\n')


BƯỚC 4: LÀM SẠCH ORDERS
Orders cleaned: 10,235,193 rows



In [8]:
# ===== 5. CLEAN ORDER_DETAILS =====
print("=" * 60)
print("BƯỚC 5: LÀM SẠCH ORDER_DETAILS")
print("=" * 60)

raw_fp = DATA_PARQUET / 'order_details_raw.parquet'
out_fp = DATA_CLEAN / 'order_details_clean.parquet'

if raw_fp.exists():
    df = pl.scan_parquet(str(raw_fp))

    df = rename_if_present(df, {
        'ORDERID': 'order_id',
        'ORDERDETAILID': 'order_detail_id',
        'AMOUNT': 'amount',
        'UNITPRICE': 'unit_price',
        'TOTALPRICE': 'total_price',
        'ITEMID': 'item_id',
        'ITEMCODE': 'item_code',
    })

    # Ép kiểu dữ liệu
    for col in ["order_id", "order_detail_id", "item_id", "item_code"]:
        df = cast_if_present(df, col, pl.Utf8)
    if 'amount' in df.collect_schema().keys():
        df = df.with_columns(pl.col('amount').cast(pl.Int64, strict=False))
    for col in ['unit_price', 'total_price']:
        if col in df.collect_schema().keys():
            df = df.with_columns(
                pl.col(col).cast(pl.Utf8, strict=False)
                .str.replace_all(",", ".").cast(pl.Float64, strict=False)
            )

    # Chỉ giữ lại các dòng có ID, số lượng và giá > 0 (hoặc >= 0 cho unit_price)
    df = df.filter(
        pl.col('order_id').is_not_null() &
        pl.col('item_id').is_not_null() &
        pl.col('amount').is_not_null() & (pl.col('amount') > 0) &
        pl.col('unit_price').is_not_null() & (pl.col('unit_price') >= 0) &
        pl.col('total_price').is_not_null() & (pl.col('total_price') >= 0)
    )
    
    # total_price là giá trị khách phải trả (sau khi đã áp dụng giảm giá và chiết khấu) và tính toán mức chiết khấu
    if {'amount', 'unit_price', 'total_price'}.issubset(df.collect_schema().keys()):
        df = df.with_columns(
            (pl.col('amount') * pl.col('unit_price') - pl.col('total_price')).alias('discount_amount')
        )

    write_lazy(df, out_fp)
    cnt = df.select(pl.len()).collect().item()
    print(f'Order_details cleaned: {cnt:,} rows\n')

BƯỚC 5: LÀM SẠCH ORDER_DETAILS
Order_details cleaned: 51,185,032 rows



In [9]:
# ===== 6. CLEAN CATEGORIES =====
print("=" * 60)
print("BƯỚC 6: LÀM SẠCH CATEGORIES")
print("=" * 60)

raw_fp = DATA_PARQUET / 'categories_raw.parquet'
out_fp = DATA_CLEAN / 'categories_clean.parquet'

if raw_fp.exists():
    df = pl.scan_parquet(p(raw_fp))
    df = rename_if_present(df, {
        'ITEMID': 'item_id',
        'CATEGORY1': 'category1',
        'CATEGORY1_ID': 'category1_id',
        'CATEGORY2': 'category2',
        'CATEGORY2_ID': 'category2_id',
        'CATEGORY3': 'category3',
        'CATEGORY3_ID': 'category3_id',
        'CATEGORY4': 'category4',
        'CATEGORY4_ID': 'category4_id',
        'BRAND': 'brand',
        'ITEMCODE': 'item_code',
        'ITEMNAME': 'item_name',
    })

    text_cols = [
        'category1', 'category2', 'category3', 'category4',
        'brand', 'item_name'
    ]
    df = strip_all_str(df, text_cols)

    id_cols = ['item_id', 'item_code', 'category1_id', 'category2_id', 'category3_id', 'category4_id']
    for c in id_cols:
        df = cast_if_present(df, c, pl.Utf8)

    schema = df.collect_schema()
    if 'item_id' in schema:
        df = df.unique(subset=['item_id'])

    # Filter null item_id
    df = df.filter(pl.col("item_id").is_not_null())

    # Chuẩn hóa null values
    for c in ["category1", "category2", "category3", "category4", "brand", "item_name"]:
        if c in schema:
            df = df.with_columns(pl.col(c).fill_null("Unknown").str.strip_chars())

    write_lazy(df, out_fp)
    cnt = df.select(pl.len()).collect().item()
    print(f'Categories cleaned: {cnt:,} rows\n')

BƯỚC 6: LÀM SẠCH CATEGORIES
Categories cleaned: 27,000 rows



# **Kiểm tra nhanh**

In [10]:
DATA_CLEAN = Path(r"C:\Users\pitou\Desktop\Data Mining\data_clean")
od = (DATA_CLEAN / "order_details_clean.parquet").as_posix()
cat = (DATA_CLEAN / "categories_clean.parquet").as_posix()

con = duckdb.connect()

# Tỷ lệ dòng có unit_price = 0
res1 = con.execute(f"""
SELECT 
  COUNT(*) AS n_all,
  SUM(CASE WHEN unit_price = 0 THEN 1 ELSE 0 END) AS n_zero,
  100.0 * SUM(CASE WHEN unit_price = 0 THEN 1 ELSE 0 END) / COUNT(*) AS pct_zero
FROM parquet_scan('{od}');
""").fetchdf()
print(res1)

# unit_price = 0 nhưng total_price > 0
res2 = con.execute(f"""
SELECT 
  COUNT(*) AS n_sus,
  SUM(total_price) AS sum_total_price_sus
FROM parquet_scan('{od}')
WHERE unit_price = 0 AND total_price > 0;
""").fetchdf()
print(res2)

# unit_price = 0 và total_price = 0 (amount > 0)
res3 = con.execute(f"""
SELECT 
  COUNT(*) AS n_free,
  SUM(amount) AS qty_free
FROM parquet_scan('{od}')
WHERE unit_price = 0 AND total_price = 0 AND amount > 0;
""").fetchdf()
print(res3)

# Top mặt hàng có unit_price = 0
res4 = con.execute(f"""
SELECT d.item_id, c.item_name, c.brand, COUNT(*) AS cnt
FROM parquet_scan('{od}') d
LEFT JOIN parquet_scan('{cat}') c ON d.item_id = c.item_id
WHERE d.unit_price = 0
GROUP BY 1,2,3
ORDER BY cnt DESC
LIMIT 20;
""").fetchdf()
print(res4)

con.close()

# số lượng dòng có unit_price = 0 là 115638 - có thể là sản phẩm khuyến mãi hoặc nhập liệu sai
# chiếm tỷ lệ ~0.22% trong toàn bộ dữ liệu (rất thấp nhưng không phải không đáng kể - tuy nhiên có thể drop nếu không ảnh hướng nhiều)

# có 5346 trường hợp unit_price = 0 nhưng khách vẫn bị tính tiền vào sản phẩm đó (có thể là lỗi nhập liệu unit_price hoặc total_price hoặc một dạng khuyến mãi phức tạp)
# khách đã chi 1.3M đơn vị tiền cho sản phẩm ở res2

# hàng tặng kèm thật sự (unit_price và total_price đều bằng 0) có 110292 dòng
# tổng số lượng sản phẩm free là 496407 sản phẩm

      n_all    n_zero  pct_zero
0  51185032  115638.0  0.225922
   n_sus  sum_total_price_sus
0   5346           1304838.63
   n_free  qty_free
0  110292  496407.0
   item_id                             item_name      brand   cnt
0     4506               GOLBASI SALATALIK YERLI  SALATALIK  2000
1     9921           KAF.DOGADAN YESIL CAY BALLI    DOGADAN  1997
2     3235                         PALET 110*130       KARO  1962
3     8801         KAF.DOGADAN YESIL CAY GINGOLU    DOGADAN  1960
4    12647                 GOLBASI BIBER KIRMIZI      BIBER  1955
5    13606             GEZER MERDANE TERLIK 9312      GEZER  1953
6      544                   BAKLIYAT KOLI BANDI       SARF  1952
7    11677  KAF.NESCAFE 3 IN 1 18 GR BOL KREMALI    NESCAFE  1949
8     1025             GOLBASI BIBER KAPYA YESIL      BIBER  1947
9      382                         CIMENTO DOKME       KARO  1938
10    1319                KAF.DOGADAN FORM LIMON    DOGADAN  1935
11   11584         BAHARAT KARTON KUTU BUYUK

# **Merge file đích**

Đây là file đích chỉ dùng để phân tích và khai phá dữ liệu

Không tiền xử lý trên file này nữa, hoặc có (nếu dữ liệu vẫn chưa thật sự được sạch)

In [11]:
DATA_CLEAN = Path(r"C:\Users\pitou\Desktop\Data Mining\data_clean")
final_fp = DATA_CLEAN / "final_data.parquet"

con = duckdb.connect()

print("Bắt đầu tạo file final_data.parquet...")

con.execute(f"""
COPY (
    SELECT
        -- Thông tin đơn hàng từ order_details và orders
        od.order_id,
        o.order_date,
        o.order_year,
        o.order_month,
        o.order_weekday,
        o.total_basket,
        
        -- Thông tin chi nhánh từ branches
        o.branch_id,
        b.region AS branch_region,
        b.city AS branch_city,
        b.town AS branch_town,
        
        -- Thông tin sản phẩm từ categories và order_details
        od.item_id,
        c.item_name,
        c.category1,
        c.category2,
        c.category3,
        c.category4,
        c.brand,
        od.amount,
        od.unit_price,
        od.total_price,
        od.discount_amount, 

        -- Thông tin khách hàng 
        cust.user_id
        
    FROM parquet_scan('{(DATA_CLEAN / "order_details_clean.parquet").as_posix()}') AS od
    
    LEFT JOIN parquet_scan('{(DATA_CLEAN / "orders_clean.parquet").as_posix()}') AS o
        ON od.order_id = o.order_id
        
    LEFT JOIN parquet_scan('{(DATA_CLEAN / "branches_clean.parquet").as_posix()}') AS b
        ON o.branch_id = b.branch_id
        
    LEFT JOIN parquet_scan('{(DATA_CLEAN / "categories_clean.parquet").as_posix()}') AS c
        ON od.item_id = c.item_id

    -- Thêm JOIN với bảng customers
    LEFT JOIN parquet_scan('{(DATA_CLEAN / "customers_clean.parquet").as_posix()}') AS cust
        ON o.user_id = cust.user_id

) TO '{final_fp.as_posix()}' (FORMAT 'parquet');
""")

con.close()
print(f"Đã tạo file {final_fp.name}")

Bắt đầu tạo file final_data.parquet...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Đã tạo file final_data.parquet
