In [1]:
import numpy as np
import pandas as pd
import yaml
import re
import json
import itertools
from dateutil import parser
from tqdm import tqdm
from collections import defaultdict

In [2]:
def load_yaml(path):
    with open(path, 'r') as f:
        data = yaml.safe_load(f) 
    cleaned_data = [{k.lstrip(':'): v for k, v in row.items()} for row in data]
    df = pd.DataFrame(cleaned_data)
    return df


In [3]:
class UnionFind:
    def __init__(self,n):
        self.parent=list(range(n))

    def find(self,x):
        if self.parent[x]!=x:
            self.parent[x]=self.find(self.parent[x])
        return self.parent[x]

    def union(self,x,y):
        root_x=self.find(x)
        root_y=self.find(y)
        if root_x!=root_y:
            self.parent[root_y]=root_x

In [4]:
def normalize_phone(phone):
    if not isinstance(phone, str) or phone.lower() == "nan":
        return ""

    return re.sub(r"\D", "", phone)

In [5]:
def find_unique_users(df):
    n = len(df)
    uf = UnionFind(n)

    # Normalize columns
    df["name"] = df["name"].astype(str).str.strip().str.lower()
    df["address"] = df["address"].astype(str).str.strip().str.lower()
    df["phone"] = df["phone"].apply(normalize_phone)
    # List of 2-field combinations
    field_pairs = [("name", "address"), ("name", "phone"), ("address", "phone")]

    for f1, f2 in field_pairs:
        mapping = defaultdict(list)
        for idx, row in tqdm(df.iterrows(), total=n, desc=f"Building hash for {f1}+{f2}"):
            key = (row[f1], row[f2])
            mapping[key].append(idx)

        # Union all indices sharing the same 2-field key
        for indices in tqdm(mapping.values(), desc=f"Union clusters for {f1}+{f2}"):
            first = indices[0]
            for other in indices[1:]:
                uf.union(first, other)

    df["cluster"] = [uf.find(i) for i in range(n)]
    return df

In [6]:

def price_to_usd(p, euro_to_usd=1.2):
    """
    Convert a single messy price string to float USD.
    Handles:
    - € amounts with ¢ or decimals
    - $ amounts with ¢ or decimals
    - USD with number before or after
    """
    if not isinstance(p, str):
        return None

    p_clean = p.replace(" ", "")  

    # Euro with ¢
    match = re.match(r'€(\d+)¢(\d+)', p_clean)
    if match:
        euros, cents = match.groups()
        return round((int(euros) + int(cents)/100) * euro_to_usd, 2)

    # Euro with decimal
    match = re.match(r'€(\d+\.?\d*)', p_clean)
    if match:
        euros = float(match.group(1))
        return round(euros * euro_to_usd, 2)

    # USD / $ with ¢ 
    match = re.match(r'\$?(\d+)¢(\d+)', p_clean)
    if match:
        dollars, cents = match.groups()
        return round(float(dollars) + int(cents)/100, 2)

    # USD before/after
    match = re.match(r'(?:USD)?(\d+\.?\d*)\$?', p_clean)
    if match:
        return round(float(match.group(1)), 2)

    return None

In [7]:
def clean_datetime(value):
    if not value or not isinstance(value, str):
        return None
    s = value.strip()
    s = s.replace(";", " ")
    s = s.replace(",", " ")
    s = s.replace("  ", " ").strip()

    # Normalize AM/PM variations 
    for variant in ["A.M.", "a.m.", "A.M", "a.m", "AM.", "am.", "Am.", "am"]:
        s = s.replace(variant, "AM")
    for variant in ["P.M.", "p.m.", "P.M", "p.m", "PM.", "pm.", "Pm.", "pm"]:
        s = s.replace(variant, "PM")

    while "  " in s:
        s = s.replace("  ", " ")
    try:
        return parser.parse(s, fuzzy=True)
    except Exception:
        return None

In [8]:
def extract_author_set(authors_str):
    authors = [a.strip().lower() for a in authors_str.split(",") if a.strip()]
    return frozenset(authors)

In [9]:
def extract_ymd(dt):
    if pd.isna(dt):
        return pd.Series([None, None, None])
    return pd.Series([dt.year, dt.month, dt.day])
    
    

In [10]:
def standardize_missing(df, missing_values=None):
    if missing_values is None:
        missing_values = {'', ' ', 'null', 'NULL', 'None', 'none', 'nan'}
    df_clean = df.copy()
    for col in df_clean.columns:
        df_clean[col] = df_clean[col].apply(
            lambda x: np.nan if pd.isna(x) or str(x).strip() in missing_values else x
        )
    return df_clean

In [11]:
def pipeline(path,start):
    print(f"start: {start}")
    # load data
    books=load_yaml(path+"books.yaml")
    orders=pd.read_parquet(path+"orders.parquet")
    users=pd.read_csv(path+"users.csv")
    #process orders
    orders['unit_price']=orders['unit_price'].apply(price_to_usd)
    orders['timestamp']=orders['timestamp'].apply(clean_datetime)
    orders['paid_price']=orders['unit_price']*orders['quantity']
    orders[['year', 'month', 'day']] = orders['timestamp'].apply(extract_ymd)
    #process books
    books['author_set']=books['author'].apply(extract_author_set)
    books['author_set_str'] = books['author_set'].apply(lambda s: ",".join(sorted(s)))
    book_sales = orders.groupby('book_id')['quantity'].sum().reset_index().rename(columns={'quantity':'total_sold'})
    books = books.merge(book_sales, left_on='id', right_on='book_id', how='left')
    books['total_sold'] = books['total_sold'].fillna(0)
    #process users
    users=find_unique_users(users)

    books_clean=standardize_missing(books)
    orders_clean=standardize_missing(orders)
    users_clean=standardize_missing(users)
    # save cleaned data for dashboarding
    books_clean.to_csv(path + f"books_clean{start}.csv", index=False)
    orders_clean.to_csv(path + f"orders_clean{start}.csv", index=False)
    users_clean.to_csv(path + f"users_clean{start}.csv", index=False)
    print(f"Completed: {start}")
    
    return books_clean, orders_clean, users_clean
    

In [12]:
books1,orders1,users1=pipeline("DATA1/",1)
books2,orders2,users2=pipeline("DATA2/",2)
books3,orders3,users3=pipeline("DATA3/",3)

start: 1


Building hash for name+address: 100%|███████████████████████████████████████████| 3293/3293 [00:00<00:00, 14400.93it/s]
Union clusters for name+address: 100%|████████████████████████████████████████| 3214/3214 [00:00<00:00, 1463362.25it/s]
Building hash for name+phone: 100%|█████████████████████████████████████████████| 3293/3293 [00:00<00:00, 14317.05it/s]
Union clusters for name+phone: 100%|██████████████████████████████████████████| 3202/3202 [00:00<00:00, 1558565.79it/s]
Building hash for address+phone: 100%|██████████████████████████████████████████| 3293/3293 [00:00<00:00, 15489.81it/s]
Union clusters for address+phone: 100%|███████████████████████████████████████| 3207/3207 [00:00<00:00, 1604955.61it/s]


Completed: 1
start: 2


Building hash for name+address: 100%|███████████████████████████████████████████| 2810/2810 [00:00<00:00, 12917.23it/s]
Union clusters for name+address: 100%|████████████████████████████████████████| 2740/2740 [00:00<00:00, 1370589.50it/s]
Building hash for name+phone: 100%|█████████████████████████████████████████████| 2810/2810 [00:00<00:00, 15070.91it/s]
Union clusters for name+phone: 100%|██████████████████████████████████████████| 2743/2743 [00:00<00:00, 1374549.09it/s]
Building hash for address+phone: 100%|██████████████████████████████████████████| 2810/2810 [00:00<00:00, 14777.27it/s]
Union clusters for address+phone: 100%|███████████████████████████████████████| 2732/2732 [00:00<00:00, 1806817.81it/s]


Completed: 2
start: 3


Building hash for name+address: 100%|███████████████████████████████████████████| 3466/3466 [00:00<00:00, 14945.35it/s]
Union clusters for name+address: 100%|████████████████████████████████████████| 3380/3380 [00:00<00:00, 1126479.74it/s]
Building hash for name+phone: 100%|█████████████████████████████████████████████| 3466/3466 [00:00<00:00, 15212.70it/s]
Union clusters for name+phone: 100%|███████████████████████████████████████████| 3379/3379 [00:00<00:00, 846678.61it/s]
Building hash for address+phone: 100%|██████████████████████████████████████████| 3466/3466 [00:00<00:00, 14681.78it/s]
Union clusters for address+phone: 100%|███████████████████████████████████████| 3373/3373 [00:00<00:00, 1690653.37it/s]


Completed: 3


In [15]:

print(f"Number of unique user clusters in data1: {users1['cluster'].nunique()} out {users1.shape[0]}")
print(f"Number of unique user clusters in data2: {users2['cluster'].nunique()} out {users2.shape[0]}")
print(f"Number of unique user clusters in data3: {users3['cluster'].nunique()} out {users3.shape[0]}")

Number of unique user clusters in data1: 3115 out 3293
Number of unique user clusters in data2: 2663 out 2810
Number of unique user clusters in data3: 3290 out 3466
