In [2]:
import pandas as pd
from config import MINIO_ENDPOINT, MINIO_ACCESS, MINIO_SECRET, MINIO_BUCKET


In [3]:
STORAGE_OPTIONS = {
    "key": MINIO_ACCESS,
    "secret": MINIO_SECRET,
    "client_kwargs": {"endpoint_url": MINIO_ENDPOINT},
}

In [4]:
df_orders = pd.read_csv(
    f"s3://{MINIO_BUCKET}/raw/olist_orders_dataset.csv",
    storage_options=STORAGE_OPTIONS
)

In [5]:
df_orders.head()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00


In [6]:
df_orders.dtypes


order_id                         object
customer_id                      object
order_status                     object
order_purchase_timestamp         object
order_approved_at                object
order_delivered_carrier_date     object
order_delivered_customer_date    object
order_estimated_delivery_date    object
dtype: object

In [7]:
df_orders[[
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date"
]].dtypes

order_purchase_timestamp         object
order_approved_at                object
order_delivered_carrier_date     object
order_delivered_customer_date    object
order_estimated_delivery_date    object
dtype: object

In [8]:
df_orders["order_status"].unique()

array(['delivered', 'invoiced', 'shipped', 'processing', 'unavailable',
       'canceled', 'created', 'approved'], dtype=object)

In [9]:
df_orders["order_status"].value_counts()

order_status
delivered      96478
shipped         1107
canceled         625
unavailable      609
invoiced         314
processing       301
created            5
approved           2
Name: count, dtype: int64

In [54]:
def clean_orders():
    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_orders_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

    # 1. ep kieu datetime
    date_cols = [
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    ]
    for col in date_cols:
        df[col] = pd.to_datetime(df[col], errors="coerce", utc=True)

    # 2. them vung (partition) date
    df["order_purchase_date"] = df["order_purchase_timestamp"].dt.date

    # 3. rang buoc order_status
    status_map = {
        "delivered": "delivered",
        "shipped": "shipped",
        "canceled": "canceled",
        "unavailable": "canceled",  
        "invoiced": "approved",   
        "processing": "created",    
        "created": "created",
        "approved": "approved",
    }
    df["order_status"] = df["order_status"].map(status_map).fillna("invalid")

    # status_flag: 1 hợp lệ, 0 invalid
    df["status_flag"] = df["order_status"].apply(lambda x: 0 if x == "invalid" else 1)

    # 4. logic thoi gian
    def check_time(row):
        try:
            if pd.isna(row["order_purchase_timestamp"]):
                return 1
            if pd.notna(row["order_approved_at"]) and row["order_purchase_timestamp"] > row["order_approved_at"]:
                return 1
            if pd.notna(row["order_delivered_carrier_date"]) and row["order_approved_at"] and row["order_approved_at"] > row["order_delivered_carrier_date"]:
                return 1
            if pd.notna(row["order_delivered_customer_date"]) and row["order_delivered_carrier_date"] and row["order_delivered_carrier_date"] > row["order_delivered_customer_date"]:
                return 1
            if pd.notna(row["order_estimated_delivery_date"]) and row["order_delivered_customer_date"] and row["order_delivered_customer_date"] > row["order_estimated_delivery_date"]:
                return 1
            return 0
        except:
            return 1

    df["time_anomaly"] = df.apply(check_time, axis=1)

    #flag đã giao
    df["delivered_flag"] = df["order_delivered_customer_date"].apply(lambda x: 1 if pd.notna(x) else 0)
    
    # 5. dam bao order_id duy nhat
    df = df.sort_values("order_purchase_timestamp").drop_duplicates(subset=["order_id"], keep="last")

    # save parquet → silver
    df.to_parquet(
        f"s3://{MINIO_BUCKET}/silver/orders.parquet",
        index=False,
        storage_options=STORAGE_OPTIONS
    )
    print(f"Orders cleaned: {df.shape[0]} rows → silver/")

    return df.head()


In [55]:
clean_orders()

Orders cleaned: 99441 rows → silver/


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_purchase_date,status_flag,time_anomaly,delivered_flag
4541,2e7a8482f6fb09756ca50c10d7bfc047,08c5351a6aca1c1589a38f244edeee9d,shipped,2016-09-04 21:15:19+00:00,2016-10-07 13:18:03+00:00,2016-10-18 13:14:51+00:00,NaT,2016-10-20 00:00:00+00:00,2016-09-04,1,0,0
4396,e5fa5a7210941f7d56d0208e4e071d35,683c54fc24d40ee9f8a6fc179fd9856c,canceled,2016-09-05 00:15:34+00:00,2016-10-07 13:17:15+00:00,NaT,NaT,2016-10-28 00:00:00+00:00,2016-09-05,1,0,0
10071,809a282bbd5dbcabb6f2f724fca862ec,622e13439d6b5a0b486c435618b2679e,canceled,2016-09-13 15:24:19+00:00,2016-10-07 13:16:46+00:00,NaT,NaT,2016-09-30 00:00:00+00:00,2016-09-13,1,0,0
30710,bfbd0f9bdef84302105ad712db648a6c,86dc2ffce2dfff336de2f386a786e574,delivered,2016-09-15 12:16:38+00:00,2016-09-15 12:16:38+00:00,2016-11-07 17:11:53+00:00,2016-11-09 07:47:38+00:00,2016-10-04 00:00:00+00:00,2016-09-15,1,1,1
83078,71303d7e93b399f5bcd537d124c0bcfa,b106b360fe2ef8849fbbd056f777b4d5,canceled,2016-10-02 22:07:52+00:00,2016-10-06 15:50:56+00:00,NaT,NaT,2016-10-25 00:00:00+00:00,2016-10-02,1,0,0


In [87]:
df_customers = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_customers_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )


In [19]:
df_customers.head()

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
1,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
2,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
3,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP


In [20]:
df_customers.info()



<class 'pandas.core.frame.DataFrame'>
RangeIndex: 99441 entries, 0 to 99440
Data columns (total 5 columns):
 #   Column                    Non-Null Count  Dtype 
---  ------                    --------------  ----- 
 0   customer_id               99441 non-null  object
 1   customer_unique_id        99441 non-null  object
 2   customer_zip_code_prefix  99441 non-null  int64 
 3   customer_city             99441 non-null  object
 4   customer_state            99441 non-null  object
dtypes: int64(1), object(4)
memory usage: 3.8+ MB


In [24]:
df_customers["customer_unique_id"].duplicated().sum()

np.int64(3345)

In [21]:
print(df_customers["customer_state"].unique())


['SP' 'SC' 'MG' 'PR' 'RJ' 'RS' 'PA' 'GO' 'ES' 'BA' 'MA' 'MS' 'CE' 'DF'
 'RN' 'PE' 'MT' 'AM' 'AP' 'AL' 'RO' 'PB' 'TO' 'PI' 'AC' 'SE' 'RR']


In [56]:
def clean_customers():
    # 1. Load data từ MinIO (raw)
    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_customers_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

    # 2. Dam bao zip code luon la chuoi 5 ky tu 
    df["customer_zip_code_prefix"] = (
        df["customer_zip_code_prefix"]
        .astype(str)
        .str.zfill(5) 
    )

    # 3. Loai bo trung lap 
    df = df.drop_duplicates(subset=["customer_id"], keep="first")

    # 4. Chuan hoa 
    df["customer_city"] = (
        df["customer_city"]
        .str.lower()
        .str.strip()
    )

    # 5. Mapping state code -> state nam
    state_map = {
        "AC": "Acre","AL": "Alagoas","AP": "Amapá","AM": "Amazonas","BA": "Bahia","CE": "Ceará",
        "DF": "Distrito Federal (Brasília)","ES": "Espírito Santo","GO": "Goiás","MA": "Maranhão",
        "MT": "Mato Grosso","MS": "Mato Grosso do Sul","MG": "Minas Gerais","PA": "Pará","PB": "Paraíba",
        "PR": "Paraná","PE": "Pernambuco","PI": "Piauí","RJ": "Rio de Janeiro","RN": "Rio Grande do Norte",
        "RS": "Rio Grande do Sul","RO": "Rondônia","RR": "Roraima","SC": "Santa Catarina",
        "SP": "São Paulo","SE": "Sergipe","TO": "Tocantins"
    }
    df["customer_state_name"] = df["customer_state"].map(state_map)

    # 6. save parquet → silver
    df.to_parquet(f"s3://{MINIO_BUCKET}/silver/customers.parquet",
                    storage_options=STORAGE_OPTIONS,
                    index=False)

    print(f"Customers cleaned: {df.shape[0]} rows → silver/")
    return df.head()


In [57]:
clean_customers()

Customers cleaned: 99441 rows → silver/


Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,customer_state_name
0,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP,São Paulo
1,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP,São Paulo
2,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP,São Paulo
3,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP,São Paulo
4,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP,São Paulo


In [11]:
df_order_items = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_order_items_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

In [27]:
print(df_order_items.info())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 112650 entries, 0 to 112649
Data columns (total 7 columns):
 #   Column               Non-Null Count   Dtype  
---  ------               --------------   -----  
 0   order_id             112650 non-null  object 
 1   order_item_id        112650 non-null  int64  
 2   product_id           112650 non-null  object 
 3   seller_id            112650 non-null  object 
 4   shipping_limit_date  112650 non-null  object 
 5   price                112650 non-null  float64
 6   freight_value        112650 non-null  float64
dtypes: float64(2), int64(1), object(4)
memory usage: 6.0+ MB
None


In [28]:
print(df_order_items.head())

                           order_id  order_item_id  \
0  00010242fe8c5a6d1ba2dd792cb16214              1   
1  00018f77f2f0320c557190d7a144bdd3              1   
2  000229ec398224ef6ca0657da4fc703e              1   
3  00024acbcdf0a6daa1e931b038114c75              1   
4  00042b26cf59d7ce69dfabb4e55b4fd9              1   

                         product_id                         seller_id  \
0  4244733e06e7ecb4970a6e2683c13e61  48436dade18ac8b2bce089ec2a041202   
1  e5f2d52b802189ee658865ca93d83a8f  dd7ddc04e1b6c2c614352b383efe2d36   
2  c777355d18b72b67abbeef9df44fd0fd  5b51032eddd242adc84c38acab88f23d   
3  7634da152a4610f1595efa32f14722fc  9d7a1d34a5052409006425275ba1c2b4   
4  ac6c3623068f30de03045865e4e10089  df560393f3a51e74553ab94004ba5c87   

   shipping_limit_date   price  freight_value  
0  2017-09-19 09:45:35   58.90          13.29  
1  2017-05-03 11:05:13  239.90          19.93  
2  2018-01-18 14:48:30  199.00          17.87  
3  2018-08-15 10:10:18   12.99          12.7

In [29]:
print(df_order_items.isna().sum())

order_id               0
order_item_id          0
product_id             0
seller_id              0
shipping_limit_date    0
price                  0
freight_value          0
dtype: int64


In [30]:
print("Số bản ghi trùng hoàn toàn:", df_order_items.duplicated().sum())

Số bản ghi trùng hoàn toàn: 0


In [31]:
dupes = df_order_items[df_order_items.duplicated(subset=["order_id", "order_item_id"], keep=False)]
print("Số bản ghi bị trùng (order_id + order_item_id):", len(dupes))

Số bản ghi bị trùng (order_id + order_item_id): 0


In [32]:
print("Số bản ghi price <= 0:", (df_order_items["price"] <= 0).sum())
print("Số bản ghi freight_value < 0:", (df_order_items["freight_value"] < 0).sum())

Số bản ghi price <= 0: 0
Số bản ghi freight_value < 0: 0


In [33]:
print("Kiểu dữ liệu cột shipping_limit_date:", df_order_items["shipping_limit_date"].dtype)
print(df_order_items["shipping_limit_date"].head())

Kiểu dữ liệu cột shipping_limit_date: object
0    2017-09-19 09:45:35
1    2017-05-03 11:05:13
2    2018-01-18 14:48:30
3    2018-08-15 10:10:18
4    2017-02-13 13:57:51
Name: shipping_limit_date, dtype: object


In [114]:
def clean_order_items():

    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_order_items_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

    df["shipping_limit_date"] = pd.to_datetime(
        df["shipping_limit_date"], errors="coerce", utc=True
    )

    df = df.drop_duplicates(subset=["order_id", "order_item_id", "product_id", "seller_id"])

    df.to_parquet(f"s3://{MINIO_BUCKET}/silver/order_items.parquet", 
                  storage_options=STORAGE_OPTIONS, index=False)

    print(f"Order_items cleaned: {df.shape[0]} rows → silver/")

    return df.head()


In [115]:
clean_order_items()

Order_items cleaned: 112650 rows → silver/


Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35+00:00,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13+00:00,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30+00:00,199.0,17.87
3,00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15 10:10:18+00:00,12.99,12.79
4,00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13 13:57:51+00:00,199.9,18.14


In [36]:
df_products = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_products_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

In [44]:
print(df_products.shape)
print(df_products.dtypes)



(32951, 10)
product_id                     object
product_category_name          object
product_name_lenght           float64
product_description_lenght    float64
product_photos_qty            float64
product_weight_g              float64
product_length_cm             float64
product_height_cm             float64
product_width_cm              float64
product_volume_cm3            float64
dtype: object


In [45]:
print(df_products.isna().sum())

product_id                      0
product_category_name         610
product_name_lenght           610
product_description_lenght    610
product_photos_qty            610
product_weight_g                2
product_length_cm               2
product_height_cm               2
product_width_cm                2
product_volume_cm3              2
dtype: int64


In [39]:
num_cols = [
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm"
]

print(df_products[num_cols].describe())


       product_name_lenght  product_description_lenght  product_photos_qty  \
count         32341.000000                32341.000000        32341.000000   
mean             48.476949                  771.495285            2.188986   
std              10.245741                  635.115225            1.736766   
min               5.000000                    4.000000            1.000000   
25%              42.000000                  339.000000            1.000000   
50%              51.000000                  595.000000            1.000000   
75%              57.000000                  972.000000            3.000000   
max              76.000000                 3992.000000           20.000000   

       product_weight_g  product_length_cm  product_height_cm  \
count      32949.000000       32949.000000       32949.000000   
mean        2276.472488          30.815078          16.937661   
std         4282.038731          16.914458          13.637554   
min            0.000000           7.0

In [40]:
for col in num_cols:
    print(col, (df_products[col] <= 0).sum())


product_name_lenght 0
product_description_lenght 0
product_photos_qty 0
product_weight_g 4
product_length_cm 0
product_height_cm 0
product_width_cm 0


In [46]:
df_products["product_volume_cm3"] = (
    df_products["product_length_cm"] * df_products["product_height_cm"] * df_products["product_width_cm"]
)

print("Weight < 50g:", (df_products["product_weight_g"] < 50).sum())
print("Volume < 10cm³:", (df_products["product_volume_cm3"] < 10).sum())

Weight < 50g: 10
Volume < 10cm³: 0


In [41]:
print(df_products["product_category_name"].nunique())
print(df_products["product_category_name"].value_counts().head(10))
print("Null:", df_products["product_category_name"].isna().sum())


73
product_category_name
cama_mesa_banho           3029
esporte_lazer             2867
moveis_decoracao          2657
beleza_saude              2444
utilidades_domesticas     2335
automotivo                1900
informatica_acessorios    1639
brinquedos                1411
relogios_presentes        1329
telefonia                 1134
Name: count, dtype: int64
Null: 610


In [42]:
print("Trùng product_id:", df_products["product_id"].duplicated().sum())


Trùng product_id: 0


In [61]:

import numpy as np
def clean_products():

    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_products_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

    df["product_category_name"] = df["product_category_name"].fillna("unknown")


    try:
        trans = pd.read_csv(
            f"s3://{MINIO_BUCKET}/raw/product_category_name_translation.csv",
            storage_options=STORAGE_OPTIONS
        )
        df = df.merge(trans, on="product_category_name", how="left")
        df["product_category_name"] = df["product_category_name_english"].fillna(df["product_category_name"])
        df.drop(columns=["product_category_name_english"], inplace=True)
    except Exception as e:
        print("Không tìm thấy file translation:", e)


    num_cols = [
        "product_name_lenght", "product_description_lenght", "product_photos_qty",
        "product_weight_g", "product_length_cm", "product_height_cm", "product_width_cm"
    ]
    for col in num_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    df.loc[df["product_weight_g"] <= 0, "product_weight_g"] = np.nan
    df["size_anomaly"] = df["product_weight_g"].apply(lambda x: 1 if x is not None and x < 50 else 0)

    df["product_volume_cm3"] = (
        df["product_length_cm"] * df["product_height_cm"] * df["product_width_cm"]
    )

    df = df.drop_duplicates(subset=["product_id"])

    df.to_parquet(
        f"s3://{MINIO_BUCKET}/silver/products.parquet",
        index=False,
        storage_options=STORAGE_OPTIONS
    )
    print(f"Products cleaned: {df.shape[0]} rows → silver/")
    return df.head()


In [62]:

clean_products()

Products cleaned: 32951 rows → silver/


Unnamed: 0,product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,size_anomaly,product_volume_cm3
0,1e9e8ef04dbcff4541ed26657ea517e5,perfumery,40.0,287.0,1.0,225.0,16.0,10.0,14.0,0,2240.0
1,3aa071139cb16b67ca9e5dea641aaa2f,art,44.0,276.0,1.0,1000.0,30.0,18.0,20.0,0,10800.0
2,96bd76ec8810374ed1b65e291975717f,sports_leisure,46.0,250.0,1.0,154.0,18.0,9.0,15.0,0,2430.0
3,cef67bcfe19066a932b7673e239eb23d,baby,27.0,261.0,1.0,371.0,26.0,4.0,26.0,0,2704.0
4,9dc1a7de274444849c219cff195d0b71,housewares,37.0,402.0,4.0,625.0,20.0,17.0,13.0,0,4420.0


In [90]:
df_sellers = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_sellers_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

In [59]:
print(df_sellers.shape)
print(df_sellers.dtypes)
print(df_sellers.isna().sum())


(3095, 4)
seller_id                 object
seller_zip_code_prefix     int64
seller_city               object
seller_state              object
dtype: object
seller_id                 0
seller_zip_code_prefix    0
seller_city               0
seller_state              0
dtype: int64


In [60]:
print("Seller_id duplicated:", df_sellers["seller_id"].duplicated().sum())


Seller_id duplicated: 0


In [61]:
print(df_sellers["seller_state"].value_counts())
print((df_sellers["seller_state"].str.len() != 2).sum())


seller_state
SP    1849
PR     349
MG     244
SC     190
RJ     171
RS     129
GO      40
DF      30
ES      23
BA      19
CE      13
PE       9
PB       6
MS       5
RN       5
MT       4
RO       2
SE       2
AC       1
PI       1
MA       1
AM       1
PA       1
Name: count, dtype: int64
0


In [62]:
print(df_sellers["seller_zip_code_prefix"].describe())
print((df_sellers["seller_zip_code_prefix"] <= 0).sum())


count     3095.000000
mean     32291.059451
std      32713.453830
min       1001.000000
25%       7093.500000
50%      14940.000000
75%      64552.500000
max      99730.000000
Name: seller_zip_code_prefix, dtype: float64
0


In [63]:
print(df_sellers["seller_city"].isna().sum())
print(df_sellers["seller_city"].nunique())
print(df_sellers["seller_city"].value_counts().head(10))


0
611
seller_city
sao paulo         694
curitiba          127
rio de janeiro     96
belo horizonte     68
ribeirao preto     52
guarulhos          50
ibitinga           49
santo andre        45
campinas           41
maringa            40
Name: count, dtype: int64


In [98]:
def clean_sellers():
    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_sellers_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

    df = df.drop_duplicates(subset=["seller_id"])


    df["seller_city"] = df["seller_city"].str.strip().str.title()

    df["seller_state"] = df["seller_state"].str.upper()
    df.loc[df["seller_state"].str.len() != 2, "seller_state"] = "Unknown"

    df["seller_zip_code_prefix"] = (df["seller_zip_code_prefix"].astype(str).str.zfill(5))

    df.to_parquet(
        f"s3://{MINIO_BUCKET}/silver/sellers.parquet",
        index=False,
        storage_options=STORAGE_OPTIONS
    )

    print("sellers → silver (cleaned)")

    return df.head()


In [99]:
clean_sellers()

sellers → silver (cleaned)


Unnamed: 0,seller_id,seller_zip_code_prefix,seller_city,seller_state
0,3442f8959a84dea7ee197c632cb2df15,13023,Campinas,SP
1,d1b65fc7debc3361ea86b5f14c68d2e2,13844,Mogi Guacu,SP
2,ce3ad9de960102d0677a81f5d0bb7b2d,20031,Rio De Janeiro,RJ
3,c0f3eea2e14555b6faeea3dd58c1b1c3,4195,Sao Paulo,SP
4,51a04a8a6bdcb23deccc82b0b80742cf,12914,Braganca Paulista,SP


In [4]:
df_payments = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_order_payments_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )


In [5]:
print(df_payments.shape)
print(df_payments.isna().sum())
print(df_payments.duplicated().sum())


(103886, 5)
order_id                0
payment_sequential      0
payment_type            0
payment_installments    0
payment_value           0
dtype: int64
0


In [6]:
print(df_payments["payment_type"].value_counts())


payment_type
credit_card    76795
boleto         19784
voucher         5775
debit_card      1529
not_defined        3
Name: count, dtype: int64


In [7]:
print((df_payments["payment_installments"] <= 0).sum())


2


In [8]:
print((df_payments["payment_value"] <= 0).sum())


9


In [65]:
import numpy as np
def clean_payments():

    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_order_payments_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )


    df = df.drop_duplicates()

    df["payment_type"] = df["payment_type"].fillna("unknown")
    df["payment_type"] = df["payment_type"].replace("not_defined", "unknown")


    df.loc[df["payment_installments"] <= 0, "payment_installments"] = 1


    df["payment_value"] = pd.to_numeric(df["payment_value"], errors="coerce")
    df.loc[df["payment_value"] <= 0, "payment_value"] = np.nan


    df.to_parquet(
        f"s3://{MINIO_BUCKET}/silver/payments.parquet",
        index=False,
        storage_options=STORAGE_OPTIONS
    )

    print("payments → silver (cleaned)")
    return df.head()


In [66]:
clean_payments()

payments → silver (cleaned)


Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
1,a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
2,25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
3,ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
4,42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45


In [9]:
pay_sum_by_order = df_payments.groupby("order_id")["payment_value"].sum().reset_index()
pay_sum_by_order.rename(columns={"payment_value": "pay_sum"}, inplace=True)


In [12]:
item_sum_by_order = df_order_items.groupby("order_id")[["price", "freight_value"]].sum().reset_index()
item_sum_by_order["item_sum"] = item_sum_by_order["price"] + item_sum_by_order["freight_value"]


In [14]:
df_check = pd.merge(item_sum_by_order, pay_sum_by_order, on="order_id", how="inner")
df_check["diff"] = df_check["item_sum"] - df_check["pay_sum"]



In [15]:

print("Tổng số đơn hàng:", len(df_check))
print("Đơn hàng khớp:", (df_check["diff"] == 0).sum())
print("Đơn hàng lệch:", (df_check["diff"] != 0).sum())

print("\nPhân phối diff:")
print(df_check["diff"].describe())


small_diff = (df_check["diff"].abs() <= 1).sum()
print("\nSố đơn lệch nhỏ (<= 1):", small_diff)


print("\nTop 10 đơn hàng lệch lớn nhất:")
print(df_check.loc[df_check["diff"].abs().nlargest(10).index, ["order_id", "item_sum", "pay_sum", "diff"]])


Tổng số đơn hàng: 98665
Đơn hàng khớp: 79051
Đơn hàng lệch: 19614

Phân phối diff:
count    98665.000000
mean        -0.029092
std          1.129221
min       -182.810000
25%          0.000000
50%          0.000000
75%          0.000000
max         51.620000
Name: diff, dtype: float64

Số đơn lệch nhỏ (<= 1): 98416

Top 10 đơn hàng lệch lớn nhất:
                               order_id  item_sum  pay_sum    diff
79537  ce6d150fb29ada17d2082f4847107665   1403.66  1586.47 -182.81
42515  6e5fe7366a2e1bfbf3257dba0af1267f    287.91   406.92 -119.01
43434  70b742795bc441e94a44a084b6d9ce7a    466.93   578.82 -111.89
58752  996c7e73600ad3723e8627ab7bef81e4    587.90   664.43  -76.53
43437  70b7e94ea46d3e8b5bc12a50186edaf0    213.15   274.84  -61.69
72496  bc2c82b0ef78d2252b6176d1972db7c9    242.01   303.02  -61.01
67466  af9ffff2ce6b3defd34fd4c78857a379    413.17   466.97  -53.80
14628  262118ce178bb3e4590a3adcf6d62e6b    177.74   126.12   51.62
73899  bfdb5bbb06458d600a33d61f5f287472    348.9

In [67]:
def validate_payments_vs_items(df_items, df_payments, threshold=10):


    item_sum_by_order = (
        df_items.groupby("order_id")[["price", "freight_value"]]
        .sum()
        .reset_index()
    )
    item_sum_by_order["item_sum"] = item_sum_by_order["price"] + item_sum_by_order["freight_value"]


    pay_sum_by_order = (
        df_payments.groupby("order_id")["payment_value"]
        .sum()
        .reset_index()
        .rename(columns={"payment_value": "pay_sum"})
    )


    df_check = pd.merge(item_sum_by_order, pay_sum_by_order, on="order_id", how="inner")


    df_check["payment_item_diff"] = df_check["item_sum"] - df_check["pay_sum"]


    df_check["payment_item_anomaly"] = (df_check["payment_item_diff"].abs() > threshold).astype(int)


    df_check.to_parquet(
        f"s3://{MINIO_BUCKET}/silver/payments_check.parquet",
        index=False,
        storage_options=STORAGE_OPTIONS
    )

    print("Đã tạo bảng đối chiếu payments_check")
    return df_check.head()


In [68]:
validate_payments_vs_items(df_order_items,df_payments)

Đã tạo bảng đối chiếu payments_check


Unnamed: 0,order_id,price,freight_value,item_sum,pay_sum,payment_item_diff,payment_item_anomaly
0,00010242fe8c5a6d1ba2dd792cb16214,58.9,13.29,72.19,72.19,0.0,0
1,00018f77f2f0320c557190d7a144bdd3,239.9,19.93,259.83,259.83,0.0,0
2,000229ec398224ef6ca0657da4fc703e,199.0,17.87,216.87,216.87,0.0,0
3,00024acbcdf0a6daa1e931b038114c75,12.99,12.79,25.78,25.78,0.0,0
4,00042b26cf59d7ce69dfabb4e55b4fd9,199.9,18.14,218.04,218.04,2.842171e-14,0


In [69]:
df_reviews = pd.read_csv(
    f"s3://{MINIO_BUCKET}/raw/olist_order_reviews_dataset.csv",
    storage_options=STORAGE_OPTIONS
)

In [70]:
print("Kích thước:", df_reviews.shape)
print("\nKiểu dữ liệu:")
print(df_reviews.dtypes)


Kích thước: (99224, 7)

Kiểu dữ liệu:
review_id                  object
order_id                   object
review_score                int64
review_comment_title       object
review_comment_message     object
review_creation_date       object
review_answer_timestamp    object
dtype: object


In [71]:
print("\nSố lượng null theo cột:")
print(df_reviews.isna().sum())



Số lượng null theo cột:
review_id                      0
order_id                       0
review_score                   0
review_comment_title       87656
review_comment_message     58247
review_creation_date           0
review_answer_timestamp        0
dtype: int64


In [26]:
print("\nSố lượng review_id trùng lặp:", df_reviews["review_id"].duplicated().sum())



Số lượng review_id trùng lặp: 814


In [27]:
print("\nPhân phối review_score:")
print(df_reviews["review_score"].value_counts(dropna=False).sort_index())



Phân phối review_score:
review_score
1    11424
2     3151
3     8179
4    19142
5    57328
Name: count, dtype: int64


In [28]:
print("\nKiểm tra giá trị ngày tháng không hợp lệ:")
df_reviews["review_creation_date"] = pd.to_datetime(df_reviews["review_creation_date"], errors="coerce")
df_reviews["review_answer_timestamp"] = pd.to_datetime(df_reviews["review_answer_timestamp"], errors="coerce")
print("Số bản ghi ngày tạo null:", df_reviews["review_creation_date"].isna().sum())
print("Số bản ghi ngày phản hồi null:", df_reviews["review_answer_timestamp"].isna().sum())
print("Số bản ghi có creation_date > answer_timestamp:", (df_reviews["review_creation_date"] > df_reviews["review_answer_timestamp"]).sum())



Kiểm tra giá trị ngày tháng không hợp lệ:
Số bản ghi ngày tạo null: 0
Số bản ghi ngày phản hồi null: 0
Số bản ghi có creation_date > answer_timestamp: 0


In [29]:
multi_review = df_reviews.groupby("order_id")["review_id"].count()
print("\nSố đơn hàng có nhiều review:", (multi_review > 1).sum())
print("Ví dụ order có nhiều review:")
print(multi_review[multi_review > 1].head(10))


Số đơn hàng có nhiều review: 547
Ví dụ order có nhiều review:
order_id
0035246a40f520710769010f752e7507    2
013056cfe49763c6f66bda03396c5ee3    2
0176a6846bcb3b0d3aa3116a9a768597    2
02355020fd0a40a0d56df9f6ff060413    2
029863af4b968de1e5d6a82782e662f5    2
02e0b68852217f5715fb9cc885829454    2
02e723e8edb4a123d414f56cc9c4665e    2
03515a836bb855b03f7df9dee520a8fc    2
03c939fd7fd3b38f8485a0f95798f1f6    3
03eba6d9fef8f5b3e811d4b5a7cca9cd    2
Name: review_id, dtype: int64


In [30]:
def clean_reviews():

    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_order_reviews_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )

    df["review_creation_date"] = pd.to_datetime(df["review_creation_date"], errors="coerce", utc=True)
    df["review_answer_timestamp"] = pd.to_datetime(df["review_answer_timestamp"], errors="coerce", utc=True)

    df = df.sort_values("review_answer_timestamp")
    df = df.drop_duplicates(subset=["review_id"], keep="last")

    df = df.drop_duplicates(subset=["order_id"], keep="last")


    for col in ["review_comment_title", "review_comment_message"]:
        df[col] = df[col].astype(str).str.strip().replace("nan", np.nan)

    df.to_parquet(
        f"s3://{MINIO_BUCKET}/silver/reviews.parquet",
        index=False,
        storage_options=STORAGE_OPTIONS
    )

    print("reviews → silver (cleaned)")
    return df.head()

In [31]:
clean_reviews()

reviews → silver (cleaned)


Unnamed: 0,review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
37547,6916ca4502d6d3bfd39818759d55d536,bfbd0f9bdef84302105ad712db648a6c,1,,nao recebi o produto e nem resposta da empresa,2016-10-06 00:00:00+00:00,2016-10-07 18:32:28+00:00
5503,49f695dffa457eaba90d388a5c37e942,e5215415bb6f76fe3b7cb68103a0d1c0,1,,"PRODUTO NÃO CHEGOU,E JÁ PASSOU O PRAZO DE ENTREGA",2016-10-09 00:00:00+00:00,2016-10-11 14:31:29+00:00
60439,743d98b1a4782f0646898fc915ef002a,e2144124f98f3bf46939bc5183104041,4,,,2016-10-15 00:00:00+00:00,2016-10-16 03:20:17+00:00
28075,53752edb26544dd41c1209f582c9c589,b8b9d7046c083150cb5360b83a8ebb51,5,,O pedido foi entregue antes do prazo pr0metido,2016-10-16 01:00:00+00:00,2016-10-16 15:45:11+00:00
41042,b2d5d8db2a841d27a72e4c06c6212368,9aa3197e4887919fde0307fc23601d7a,4,,Só chegou uma parte do pedido ate agora..,2016-10-15 00:00:00+00:00,2016-10-17 21:02:49+00:00


In [32]:
df_geo = pd.read_csv(
    f"s3://{MINIO_BUCKET}/raw/olist_geolocation_dataset.csv",
    storage_options=STORAGE_OPTIONS
)

In [33]:
print("Kích thước:", df_geo.shape)
print("\nKiểu dữ liệu:")
print(df_geo.dtypes)

# 2. Null values
print("\nSố lượng null theo cột:")
print(df_geo.isna().sum())

Kích thước: (1000163, 5)

Kiểu dữ liệu:
geolocation_zip_code_prefix      int64
geolocation_lat                float64
geolocation_lng                float64
geolocation_city                object
geolocation_state               object
dtype: object

Số lượng null theo cột:
geolocation_zip_code_prefix    0
geolocation_lat                0
geolocation_lng                0
geolocation_city               0
geolocation_state              0
dtype: int64


In [34]:
print("\nSố bản ghi trùng lặp hoàn toàn:", df_geo.duplicated().sum())



Số bản ghi trùng lặp hoàn toàn: 261831


In [35]:
dup_zip = df_geo.groupby("geolocation_zip_code_prefix").size().reset_index(name="count")
print("\nSố zip_code_prefix xuất hiện nhiều hơn 1 lần:", (dup_zip["count"] > 1).sum())
print("Ví dụ zip_code_prefix trùng:")
print(dup_zip[dup_zip["count"] > 1].head(10))



Số zip_code_prefix xuất hiện nhiều hơn 1 lần: 17972
Ví dụ zip_code_prefix trùng:
   geolocation_zip_code_prefix  count
0                         1001     26
1                         1002     13
2                         1003     17
3                         1004     22
4                         1005     25
5                         1006      9
6                         1007     26
7                         1008     16
8                         1009     41
9                         1010     18


In [36]:
print("\nSố city duy nhất trước chuẩn hóa:", df_geo["geolocation_city"].nunique())
print("Ví dụ city:", df_geo["geolocation_city"].sample(10).tolist())
print("\nSố state duy nhất:", df_geo["geolocation_state"].nunique())
print("Danh sách state:", df_geo["geolocation_state"].unique())


Số city duy nhất trước chuẩn hóa: 8011
Ví dụ city: ['sao paulo', 'rio de janeiro', 'franca', 'brasilia', 'tatui', 'joao pinheiro', 'caraguatatuba', 'embu das artes', 'sao paulo', 'jacui']

Số state duy nhất: 27
Danh sách state: ['SP' 'RN' 'AC' 'RJ' 'ES' 'MG' 'BA' 'SE' 'PE' 'AL' 'PB' 'CE' 'PI' 'MA'
 'PA' 'AP' 'AM' 'RR' 'DF' 'GO' 'RO' 'TO' 'MT' 'MS' 'RS' 'PR' 'SC']


In [96]:

def clean_geolocation():

    df = pd.read_csv(
        f"s3://{MINIO_BUCKET}/raw/olist_geolocation_dataset.csv",
        storage_options=STORAGE_OPTIONS
    )


    before = df.shape[0]
    df = df.drop_duplicates()
    after = df.shape[0]
    print(f"Đã loại bỏ {before - after} bản ghi trùng lặp hoàn toàn")


    df["geolocation_city"] = (
        df["geolocation_city"]
        .str.lower()
        .str.strip()
    )
    df["geolocation_zip_code_prefix"] = (df["geolocation_zip_code_prefix"].astype(str).str.zfill(5))
    
    df.to_parquet(
        f"s3://{MINIO_BUCKET}/silver/geolocation.parquet",
        index=False,
        storage_options=STORAGE_OPTIONS
    )

    print("geolocation → silver (cleaned)")
    return df.head()

In [97]:
clean_geolocation()

Đã loại bỏ 261831 bản ghi trùng lặp hoàn toàn
geolocation → silver (cleaned)


Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
0,1037,-23.545621,-46.639292,sao paulo,SP
1,1046,-23.546081,-46.64482,sao paulo,SP
2,1046,-23.546129,-46.642951,sao paulo,SP
3,1041,-23.544392,-46.639499,sao paulo,SP
4,1035,-23.541578,-46.641607,sao paulo,SP


In [72]:
import json 
def normalize_sample(val):
    if isinstance(val, pd.Series) or isinstance(val, np.ndarray):
        val = val.tolist()
    if val is None:
        val = []
    if not isinstance(val, str):
        val = json.dumps(val, ensure_ascii=False)
    return val


In [116]:
def load_silver_tables():
    silver_paths = {
        "orders": "orders.parquet",
        "customers": "customers.parquet",
        "order_items": "order_items.parquet",
        "products": "products.parquet",
        "sellers": "sellers.parquet",
        "payments": "payments.parquet",
        "reviews": "reviews.parquet",
        "geolocation": "geolocation.parquet",
    }

    tables = {}
    for name, file in silver_paths.items():
        tables[name] = pd.read_parquet(
            f"s3://{MINIO_BUCKET}/silver/{file}",
            storage_options=STORAGE_OPTIONS
        )
    return tables


In [127]:
from datetime import datetime
def validate_data(tables: dict):
    dq_checks = []

    orders = tables["orders"]
    customers = tables["customers"]
    order_items = tables["order_items"]
    products = tables["products"]
    sellers = tables["sellers"]
    payments = tables["payments"]
    reviews = tables["reviews"]
    geolocation = tables["geolocation"]

    invalid_orders = orders[~orders["customer_id"].isin(customers["customer_id"])]
    dq_checks.append({
        "check_name": "orders_customer_fk",
        "table_name": "orders",
        "status": "FAILED" if not invalid_orders.empty else "PASSED",
        "num_violations": len(invalid_orders),
        "sample_records": invalid_orders["order_id"].head(5).tolist()
    })

    invalid_items_order = order_items[~order_items["order_id"].isin(orders["order_id"])]
    dq_checks.append({
        "check_name": "order_items_order_fk",
        "table_name": "order_items",
        "status": "FAILED" if not invalid_items_order.empty else "PASSED",
        "num_violations": len(invalid_items_order),
        "sample_records": invalid_items_order["order_id"].head(5).tolist()
    })

    invalid_items_seller = order_items[~order_items["seller_id"].isin(sellers["seller_id"])]
    dq_checks.append({
        "check_name": "order_items_seller_fk",
        "table_name": "order_items",
        "status": "FAILED" if not invalid_items_seller.empty else "PASSED",
        "num_violations": len(invalid_items_seller),
        "sample_records": invalid_items_seller["seller_id"].head(5).tolist()
    })

    invalid_items_product = order_items[~order_items["product_id"].isin(products["product_id"])]
    dq_checks.append({
        "check_name": "order_items_product_fk",
        "table_name": "order_items",
        "status": "FAILED" if not invalid_items_product.empty else "PASSED",
        "num_violations": len(invalid_items_product),
        "sample_records": invalid_items_product["product_id"].head(5).tolist()
    })

    invalid_payments = payments[~payments["order_id"].isin(orders["order_id"])]
    dq_checks.append({
        "check_name": "payments_order_fk",
        "table_name": "order_payments",
        "status": "FAILED" if not invalid_payments.empty else "PASSED",
        "num_violations": len(invalid_payments),
        "sample_records": invalid_payments["order_id"].head(5).tolist()
    })

    invalid_reviews = reviews[~reviews["order_id"].isin(orders["order_id"])]
    dq_checks.append({
        "check_name": "reviews_order_fk",
        "table_name": "order_reviews",
        "status": "FAILED" if not invalid_reviews.empty else "PASSED",
        "num_violations": len(invalid_reviews),
        "sample_records": invalid_reviews["order_id"].head(5).tolist()
    })

    # Tập zip từ geolocation
    geo_zips = set(geolocation["geolocation_zip_code_prefix"].unique())

    # Customers zip coverage
    customers["zip_flag"] = customers["customer_zip_code_prefix"].isin(geo_zips).astype(int)
    invalid_customers_zip = customers[customers["zip_flag"] == 0]

    dq_checks.append({
        "check_name": "customers_zip_coverage",
        "table_name": "customers",
        "status": "FAILED" if len(invalid_customers_zip) > 0 else "PASSED",
        "num_violations": len(invalid_customers_zip),
        "coverage_rate": round(customers["zip_flag"].mean() * 100, 2),  # %
        "sample_records": invalid_customers_zip["customer_id"].head(5).tolist()
    })

    # Sellers zip coverage
    sellers["zip_flag"] = sellers["seller_zip_code_prefix"].isin(geo_zips).astype(int)
    invalid_sellers_zip = sellers[sellers["zip_flag"] == 0]

    dq_checks.append({
        "check_name": "sellers_zip_coverage",
        "table_name": "sellers",
        "status": "FAILED" if len(invalid_sellers_zip) > 0 else "PASSED",
        "num_violations": len(invalid_sellers_zip),
        "coverage_rate": round(sellers["zip_flag"].mean() * 100, 2),
        "sample_records": invalid_sellers_zip["seller_id"].head(5).tolist()
    })


    # Consistency: đối chiếu tiền
    items_sum = order_items.groupby("order_id")[["price","freight_value"]].sum().sum(axis=1)
    payments_sum = payments.groupby("order_id")["payment_value"].sum()
    amount_check = pd.concat([items_sum, payments_sum], axis=1).fillna(0)
    amount_check.columns = ["sum_items", "sum_payments"]
    amount_check["diff"] = (amount_check["sum_items"] - amount_check["sum_payments"]).abs()
    mismatch_orders = amount_check[amount_check["diff"] > 1e-2]
    dq_checks.append({
        "check_name": "amount_consistency",
        "table_name": "orders",
        "status": "FAILED" if not mismatch_orders.empty else "PASSED",
        "num_violations": len(mismatch_orders),
        "sample_records": mismatch_orders.index[:5].tolist()
    })

    for rec in dq_checks:
        rec["sample_records"] = normalize_sample(rec.get("sample_records"))

    dq_report = pd.DataFrame(dq_checks)
    return dq_report


In [128]:

tables = load_silver_tables()

In [129]:
dq_report = validate_data(tables)



In [130]:
dq_report

Unnamed: 0,check_name,table_name,status,num_violations,sample_records,coverage_rate
0,orders_customer_fk,orders,PASSED,0,[],
1,order_items_order_fk,order_items,PASSED,0,[],
2,order_items_seller_fk,order_items,PASSED,0,[],
3,order_items_product_fk,order_items,PASSED,0,[],
4,payments_order_fk,order_payments,PASSED,0,[],
5,reviews_order_fk,order_reviews,PASSED,0,[],
6,customers_zip_coverage,customers,FAILED,278,"[""ecb1725b26e8b8c458181455dfa434ea"", ""bcf86029...",99.72
7,sellers_zip_coverage,sellers,FAILED,7,"[""5962468f885ea01a1b6a97a218797b0a"", ""2aafae69...",99.77
8,amount_consistency,orders,FAILED,1154,"[""00789ce015e7e5791c7914f32bb4fad4"", ""01672623...",


In [100]:
df_customers["customer_zip_code_prefix"] = (
    df_customers["customer_zip_code_prefix"].astype(str).str.zfill(5)
)
df_sellers["seller_zip_code_prefix"] = (
    df_sellers["seller_zip_code_prefix"].astype(str).str.zfill(5)
)
df_geo["geolocation_zip_code_prefix"] = (
    df_geo["geolocation_zip_code_prefix"].astype(str).str.zfill(5)
)


In [101]:
# Lấy unique zip
cust_zips = df_customers["customer_zip_code_prefix"].unique()
geo_zips = df_geo["geolocation_zip_code_prefix"].unique()

# Kiểm tra giao nhau
intersection = set(cust_zips).intersection(set(geo_zips))
print("Số lượng zip code match:", len(intersection))

# Nếu có match, in thử vài zip
print("Ví dụ zip code match:", list(intersection)[:10])


Số lượng zip code match: 14837
Ví dụ zip code match: ['91030', '01327', '40252', '78290', '18145', '88490', '74270', '86975', '72306', '05346']


In [94]:
print("Customers:", df_customers["customer_zip_code_prefix"].dtype)
print("Sellers:", df_sellers["seller_zip_code_prefix"].dtype)
print("Geolocation:", df_geo["geolocation_zip_code_prefix"].dtype)


Customers: object
Sellers: object
Geolocation: object


In [6]:
raw_orders = pd.read_csv(
    f"s3://{MINIO_BUCKET}/raw/olist_orders_dataset.csv",
    storage_options=STORAGE_OPTIONS
)
print(raw_orders.groupby("customer_id")["order_id"].nunique().sort_values(ascending=False).head(10))


customer_id
00012a2ce6f8dcda20d059ce98491703    1
000161a058600d5901f007fab4c27140    1
0001fd6190edaaf884bcaf3d49edf079    1
0002414f95344307404f0ace7a26f1d5    1
000379cdec625522490c315e70c7a9fb    1
0004164d20a9e969af783496f3408652    1
000419c5494106c306a97b5635748086    1
00046a560d407e99b969756e0b10f282    1
00050bf6e01e69d5c0fd612f1bcfb69c    1
000598caf2ef4117407665ac33275130    1
Name: order_id, dtype: int64


In [7]:
print(raw_orders.shape)
print(raw_orders["customer_id"].nunique())


(99441, 8)
99441
