## Import Library

In [1]:
import pandas as pd 
import numpy as np

## Load dataset

In [2]:
data = pd.read_csv("data_sales.csv")
data.head()

Unnamed: 0,Date,Store Number,Store Name,City,Category Name,Vendor Number,Vendor Name,Bottles Sold
0,01/03/2017,5230,POINT LIQUOR & TOBACCO,,CANADIAN WHISKIES,260.0,DIAGEO AMERICAS,3
1,01/03/2017,4482,INDY 66 WEST #929 / INDIANOLA,,AMERICAN VODKAS,434.0,LUXCO INC,6
2,01/03/2017,5359,FAREWAY STORES # 168/ PEOSTA,PEOSTA,AMERICAN VODKAS,434.0,LUXCO INC,12
3,01/03/2017,5242,KUM & GO #502 / IOWA CITY,,AMERICAN VODKAS,260.0,DIAGEO AMERICAS,1
4,01/03/2017,4482,INDY 66 WEST #929 / INDIANOLA,,COCKTAILS/RTD,395.0,PROXIMO,6


## Data Quality Assessment


### About data

In [3]:
data.shape

(17468378, 8)

In [4]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17468378 entries, 0 to 17468377
Data columns (total 8 columns):
 #   Column         Dtype  
---  ------         -----  
 0   Date           object 
 1   Store Number   int64  
 2   Store Name     object 
 3   City           object 
 4   Category Name  object 
 5   Vendor Number  float64
 6   Vendor Name    object 
 7   Bottles Sold   int64  
dtypes: float64(1), int64(2), object(5)
memory usage: 1.0+ GB


In [5]:
data.nunique()

Date             1937
Store Number     2636
Store Name       2786
City              476
Category Name      52
Vendor Number     399
Vendor Name       435
Bottles Sold      688
dtype: int64

### Preprocessing Store Name

In [6]:
def split_store_name(store_name):
    if pd.isnull(store_name):
        return None, None, None

    if '#' in store_name and '/' in store_name:
        try:
            name_part, city_part = store_name.split('#', 1)
            city_code, city_name = city_part.split('/', 1)
            return name_part.strip(), city_code.strip(), city_name.strip()
        except ValueError:
            return store_name.strip(), None, None
    elif '/' in store_name:
        try:
            name_part, city_name = store_name.split('/', 1)
            return name_part.strip(), None, city_name.strip()
        except ValueError:
            return store_name.strip(), None, None
    elif '#' in store_name:
        try:
            name_part, city_code = store_name.split('#', 1)
            return name_part.strip(), city_code.strip(), None
        except ValueError:
            return store_name.strip(), None, None
    else:
        return store_name.strip(), None, None


In [7]:
extracted = data['Store Name'].apply(split_store_name)
extracted_df = pd.DataFrame(extracted.tolist(), columns=["Store Name", "City Code", "City Name"])

data["Store Name"] = extracted_df["Store Name"]
data["City Code"] = extracted_df["City Code"]
data["City Name"] = extracted_df["City Name"]

data

Unnamed: 0,Date,Store Number,Store Name,City,Category Name,Vendor Number,Vendor Name,Bottles Sold,City Code,City Name
0,01/03/2017,5230,POINT LIQUOR & TOBACCO,,CANADIAN WHISKIES,260.0,DIAGEO AMERICAS,3,,
1,01/03/2017,4482,INDY 66 WEST,,AMERICAN VODKAS,434.0,LUXCO INC,6,929,INDIANOLA
2,01/03/2017,5359,FAREWAY STORES,PEOSTA,AMERICAN VODKAS,434.0,LUXCO INC,12,168,PEOSTA
3,01/03/2017,5242,KUM & GO,,AMERICAN VODKAS,260.0,DIAGEO AMERICAS,1,502,IOWA CITY
4,01/03/2017,4482,INDY 66 WEST,,COCKTAILS/RTD,395.0,PROXIMO,6,929,INDIANOLA
...,...,...,...,...,...,...,...,...,...,...
17468373,12/30/2023,4273,SMOKIN' JOE'S,DES MOINES,FLAVORED RUM,370.0,PERNOD RICARD USA,1,4 TOBACCO AND LIQUOR OUTLET,
17468374,12/30/2023,10037,WHISKEY WOLF LIQUOR,GLENWOOD,AMERICAN VODKAS,434.0,LUXCO INC,6,,GLENWOOD
17468375,12/30/2023,2591,HY-VEE WINE AND SPIRITS,ATLANTIC,CANADIAN WHISKIES,259.0,HEAVEN HILL BRANDS,12,,ATLANTIC
17468376,12/30/2023,2606,HY-VEE WINE AND SPIRITS,HUMBOLDT,WHISKEY LIQUEUR,421.0,SAZERAC COMPANY INC,24,,HUMBOLDT


In [8]:
data.shape

(17468378, 10)

### Check data

In [9]:
data.nunique()

Date             1937
Store Number     2636
Store Name       1296
City              476
Category Name      52
Vendor Number     399
Vendor Name       435
Bottles Sold      688
City Code        1051
City Name         525
dtype: int64

In [10]:
vendor_check = data.groupby('Vendor Name')['Vendor Number'].nunique().reset_index(name='num_vendor_number')
vendor_check = vendor_check[vendor_check['num_vendor_number'] > 1]
vendor_check

Unnamed: 0,Vendor Name,num_vendor_number
219,LEVECKE CORPORATION,2
325,ROYAL WINE CORPORATION,2


In [11]:
city_check = data.groupby('City')['City Code'].nunique().reset_index(name='num_city_code')
city_check = city_check[city_check['num_city_code'] > 1]
city_check

Unnamed: 0,City,num_city_code
0,ACKLEY,2
1,ADAIR,2
2,ADEL,4
5,AKRON,2
7,ALBIA,2
...,...,...
455,WEST BRANCH,2
457,WEST DES MOINES,34
466,WILLIAMSBURG,4
468,WINDSOR HEIGHTS,3


In [12]:
city_store_check = data.groupby('Store Number')['City'].nunique().reset_index(name='num_city')
city_store_check = city_store_check[city_store_check['num_city'] > 1]
city_store_check

Unnamed: 0,Store Number,num_city
48,2543,2
95,2604,2
234,3456,2
317,3742,2
346,3822,2
833,4668,2
1343,5300,2
1657,5619,2
1857,5821,2
2379,9911,2


- Vendor: Lỗi một vendor_number có nhiều hơn 1 vendor_name => drop vendor number 
- City: Lỗi trong quá trình extract => drop column City Name và City Code
- Store: Lỗi ở store_number => drop Store Number

### Drop data

In [None]:
def drop(data):
    data = data.drop(columns= {'Vendor Number','City Name','City Code','Store Number'})
    return data 


In [14]:
data = drop(data)
data.head()

Unnamed: 0,Date,Store Name,Category Name,Vendor Name,Bottles Sold,City Name
0,01/03/2017,POINT LIQUOR & TOBACCO,CANADIAN WHISKIES,DIAGEO AMERICAS,3,
1,01/03/2017,INDY 66 WEST,AMERICAN VODKAS,LUXCO INC,6,INDIANOLA
2,01/03/2017,FAREWAY STORES,AMERICAN VODKAS,LUXCO INC,12,PEOSTA
3,01/03/2017,KUM & GO,AMERICAN VODKAS,DIAGEO AMERICAS,1,IOWA CITY
4,01/03/2017,INDY 66 WEST,COCKTAILS/RTD,PROXIMO,6,INDIANOLA


In [15]:
data.shape

(17468378, 6)

### Normalize name

In [None]:
def normalize_name(data):
    data = data.rename(columns={
        "Date": "date",
        "Store Name": "store_name",
        "City":"city_name",
        "Category Name": "category_name",
        "Vendor Name": "vendor_name",
        "Bottles Sold": "bottles_sold"
    })
    return data 

In [17]:
data = normalize_name(data)
data.head()

Unnamed: 0,date,store_name,category_name,vendor_name,bottles_sold,city_name
0,01/03/2017,POINT LIQUOR & TOBACCO,CANADIAN WHISKIES,DIAGEO AMERICAS,3,
1,01/03/2017,INDY 66 WEST,AMERICAN VODKAS,LUXCO INC,6,INDIANOLA
2,01/03/2017,FAREWAY STORES,AMERICAN VODKAS,LUXCO INC,12,PEOSTA
3,01/03/2017,KUM & GO,AMERICAN VODKAS,DIAGEO AMERICAS,1,IOWA CITY
4,01/03/2017,INDY 66 WEST,COCKTAILS/RTD,PROXIMO,6,INDIANOLA


### Convert data types

In [18]:
def convert_dtypes(data):
    #date
    if 'date' in data.columns:
        data['date'] = pd.to_datetime(data['date'], errors='coerce')

    # numeric
    int_cols = ['bottles_sold']
    for col in int_cols:
        if col in data.columns:
            data[col] = pd.to_numeric(data[col], errors='coerce').astype('Int64')

    # category
    str_cols = ['store_name', 'city_name', 'category_name', 'vendor_name']
    for col in str_cols:
        if col in data.columns:
            data[col] = data[col].astype(str)

    return data

In [19]:
data = convert_dtypes(data)
data.dtypes

date             datetime64[ns]
store_name               object
category_name            object
vendor_name              object
bottles_sold              Int64
city_name                object
dtype: object

### Normalize tables

In [20]:
def normalize_tables(data):
    # store_dim
    df_store = data[['store_name', 'city_name']].drop_duplicates().reset_index(drop=True)
    df_store['store_id'] = df_store.index + 1

    # category_dim
    df_category = data[['category_name']].dropna().drop_duplicates().reset_index(drop=True)
    df_category['category_id'] = df_category.index + 1

    # vendor_dim
    df_vendor = data[['vendor_name','city_name']].dropna().drop_duplicates().reset_index(drop=True)
    df_vendor['vendor_id'] = df_vendor.index + 1

     # sales
    df_sales = data[['date', 'store_name', 'city_name', 'category_name', 'vendor_name', 'bottles_sold']]
    df_sales = df_sales.merge(df_store, on=['store_name', 'city_name'], how='left')
    df_sales = df_sales.merge(df_category, on='category_name', how='left')
    df_sales = df_sales.merge(df_vendor, on=['vendor_name', 'city_name'], how='left', suffixes=('', '_vendor'))
    df_sales['sale_id'] = df_sales.index + 1
    df_sales = df_sales[['sale_id', 'date', 'store_id', 'category_id', 'vendor_id', 'bottles_sold']]

    return df_store, df_category, df_vendor, df_sales


In [21]:
df_store, df_category, df_vendor, df_sales = normalize_tables(data)

In [22]:
print(df_store.head())
print(df_category.head())
print(df_vendor.head())
print(df_sales.head())

               store_name  city_name  store_id
0  POINT LIQUOR & TOBACCO       None         1
1            INDY 66 WEST  INDIANOLA         2
2          FAREWAY STORES     PEOSTA         3
3                KUM & GO  IOWA CITY         4
4           SMOKIN' JOE'S       None         5
             category_name  category_id
0        CANADIAN WHISKIES            1
1          AMERICAN VODKAS            2
2            COCKTAILS/RTD            3
3  IMPORTED FLAVORED VODKA            4
4          IMPORTED VODKAS            5
       vendor_name  city_name  vendor_id
0  DIAGEO AMERICAS       None          1
1        LUXCO INC  INDIANOLA          2
2        LUXCO INC     PEOSTA          3
3  DIAGEO AMERICAS  IOWA CITY          4
4          PROXIMO  INDIANOLA          5
   sale_id       date  store_id  category_id  vendor_id  bottles_sold
0        1 2017-01-03         1            1          1             3
1        2 2017-01-03         2            2          2             6
2        3 2017-01-03 

## Load Database

In [23]:
from sqlalchemy import create_engine
from sqlalchemy import text 

### Connect database

In [24]:
def connect_postgres(user, password, host, port, dbname):
    engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}")
    return engine

In [25]:
engine = connect_postgres(
        user='postgres',
        password='phung',
        host='localhost',
        port=5432,
        dbname='DWH'
    )

### Create scheme

In [26]:
def create_schema_if_not_exists(engine, schema_name):
    with engine.begin() as conn:
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
        print(f"Schema '{schema_name}' created or already exists.")

In [27]:
create_schema_if_not_exists(engine, 'sales_data')

Schema 'sales_data' created or already exists.


### Write data to database

In [28]:
def write_to_db(df, table_name, engine, schema, if_exists='replace', chunksize=10000):
    df.to_sql(table_name, engine, schema=schema, if_exists=if_exists, index=False, chunksize=chunksize)
    print(f"Load '{table_name}' Done.")


In [29]:
write_to_db(df_store, "store_dim", engine,'sales_data')
write_to_db(df_category, "category_dim", engine,'sales_data')
write_to_db(df_vendor, "vendor_dim", engine,'sales_data')
write_to_db(df_sales, "sales", engine,'sales_data')

Load 'store_dim' Done.
Load 'category_dim' Done.
Load 'vendor_dim' Done.
Load 'sales' Done.
