In [1873]:
import os
import re
import pandas as pd
from datetime import datetime
from sqlalchemy.sql import text
from sqlalchemy.pool import NullPool
from sqlalchemy import create_engine

In [1874]:
# Directory containing the CSV files
directory = r"C:\Users\anasn\OneDrive\Desktop\My DE job\Case_Study_202309_Data\Case_Study_Data_For_Share\raw"

# Regex pattern to match filenames with the naming convention
pattern = re.compile(r'\d{6}_Orders_\d{4}_\d{2}_\d{2}_\d{2}_\d{2}_\d{2}\.csv')

# List all files in the directory matching the pattern
files = [f for f in os.listdir(directory) if pattern.match(f)]

if not files:
    raise FileNotFoundError("No files matching the pattern found in the specified directory.")

# Extract the timestamp from each filename and store along with the filename
file_timestamps = []
for file in files:
    timestamp_str = re.search(r'_(\d{4}_\d{2}_\d{2}_\d{2}_\d{2}_\d{2})\.csv', file).group(1)
    timestamp = datetime.strptime(timestamp_str, '%Y_%m_%d_%H_%M_%S')
    file_timestamps.append((file, timestamp))

# Sort the files by timestamp and select the latest one
latest_file = max(file_timestamps, key=lambda x: x[1])[0]

# Read the latest file into a pandas DataFrame
latest_file_path = os.path.join(directory, latest_file)
data = pd.read_csv(latest_file_path, sep='|',encoding='windows-1254')

year_id = int(latest_file[:4])
month_id = int(latest_file[4:6])

# Add year_id and month_id to the DataFrame
data['year_id'] = year_id
data['month_id'] = month_id

# Replace spaces with underscores in column names
data.columns = data.columns.str.replace(' ', '_').str.replace('-', '_').str.lower()

# Display the first few rows of the DataFrame
display(data)


Unnamed: 0,row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,country,city,...,product_id,category,sub_category,product_name,sales,quantity,discount,profit,year_id,month_id
0,3779,US-2022-148768,01-12-2022,03-12-2022,Second Class,PN-18775,Parhena Norris,Home Office,United States,Miami,...,TEC-PH-10002564,Technology,Phones,OtterBox Defender Series Case - Samsung Galaxy S4,71.976,3,0.2,8.9970,2022,12
1,881,CA-2022-111689,01-12-2022,03-12-2022,Second Class,HP-14815,Harold Pawlan,Home Office,United States,New York City,...,FUR-CH-10004287,Furniture,Chairs,SAFCO Arco Folding Chair,1242.900,5,0.1,262.3900,2022,12
2,5585,CA-2022-105333,01-12-2022,05-12-2022,Standard Class,VP-21730,Victor Preis,Home Office,United States,New York City,...,OFF-ST-10002182,Office Supplies,Storage,"Iris 3-Drawer Stacking Bin, Black",83.560,4,0.0,1.6712,2022,12
3,5586,CA-2022-105333,01-12-2022,05-12-2022,Standard Class,VP-21730,Victor Preis,Home Office,United States,New York City,...,TEC-PH-10001468,Technology,Phones,Panasonic Business Telephones KX-T7736,546.060,3,0.0,163.8180,2022,12
4,5587,CA-2022-105333,01-12-2022,05-12-2022,Standard Class,VP-21730,Victor Preis,Home Office,United States,New York City,...,OFF-ST-10001809,Office Supplies,Storage,Fellowes Officeware Wire Shelving,269.490,3,0.0,5.3898,2022,12
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
472,5092,CA-2022-156720,31-12-2022,04-01-2023,Standard Class,JM-15580,Jill Matthias,Consumer,United States,Loveland,...,OFF-FA-10003472,Office Supplies,Fasteners,Bagged Rubber Bands,3.024,3,0.2,-0.6048,2022,12
473,909,CA-2022-143259,31-12-2022,04-01-2023,Standard Class,PO-18865,Patrick O'Donnell,Consumer,United States,New York City,...,OFF-BI-10003684,Office Supplies,Binders,Wilson Jones Legal Size Ring Binders,52.776,3,0.2,19.7910,2022,12
474,908,CA-2022-143259,31-12-2022,04-01-2023,Standard Class,PO-18865,Patrick O'Donnell,Consumer,United States,New York City,...,TEC-PH-10004774,Technology,Phones,Gear Head AU3700S Headset,90.930,7,0.0,2.7279,2022,12
475,1297,CA-2022-115427,31-12-2022,04-01-2023,Standard Class,EB-13975,Erica Bern,Corporate,United States,Fairfield,...,OFF-BI-10002103,Office Supplies,Binders,"Cardinal Slant-D Ring Binder, Heavy Gauge Vinyl",13.904,2,0.2,4.5188,2022,12


In [1875]:
def convert_date_format(df, column_name):
    # Define a list of date formats to try
    date_formats = ['%d-%m-%Y', '%d/%m/%Y', '%m/%d/%Y']
    
    def parse_date(date_str):
        for fmt in date_formats:
            try:
                return pd.to_datetime(date_str, format=fmt)
            except ValueError:
                continue
        return pd.NaT  # Return NaT if no formats match
    
    # Apply the parse_date function to the specified column
    df[column_name] = df[column_name].apply(parse_date)
    
    # Format the datetime column to the desired format 'dd-mm-yyyy'
    df[column_name] = df[column_name].dt.strftime('%Y-%m-%d')
    
    return df

In [1885]:
# if year_id = 2020 and month_id = 11:
#     data = data[data['product_id'] != 'FUR-FU-10004017']  # Enable only for 202011

# if year_id = 2021 and month_id = 09:
#     data = data[data['product_id'] != 'FUR-CH-10001146']  # Enable only for 202109

# if year_id = 2022 and month_id = 07::
#     data = data[data['product_id'] != 'FUR-CH-10001146']  # Enable only for 202207

# if year_id = 2022 and month_id = 12:
#     data = data[data['product_id'] != 'FUR-FU-10001473']  # Enable only for 202212

In [1877]:
# Data cleaning and transformation
data['customer_name'] = data['customer_name'].str.strip()
data['product_name'] = data['product_name'].str.strip()

# Omitting rows from the data where quantity is negative. Will include them in the inconsistency report.
data = data[data['quantity'] > -1]

#Postal code consistency formatting
data['postal_code'] = data['postal_code'].astype(str).str.zfill(5)

#Date column generelisation to yyyy-mm-dd
convert_date_format(data, 'order_date')
convert_date_format(data, 'ship_date')

# Select distinct values for dimension tables
df_customers = data[['customer_id', 'customer_name', 'segment','year_id','month_id']].drop_duplicates()
df_products = data[['product_id', 'category', 'sub_category', 'product_name','year_id','month_id']].drop_duplicates()
df_geography = data[['country', 'city', 'state', 'postal_code', 'region','year_id','month_id']].drop_duplicates()

# data.head()

In [1878]:
def insert_data_to_postgres(data, year_id, month_id, tbl_name):
    engine = create_engine("postgresql://postgres:pakistan@localhost/postgres", poolclass=NullPool, isolation_level="AUTOCOMMIT")
    with engine.begin() as conn:
        try:
            # Delete existing data for the given year_id and month_id
            delete_sql = text(f"DELETE FROM {tbl_name} WHERE year_id= :year_id AND month_id= :month_id")
            result = conn.execute(delete_sql, {'year_id' : year_id, 'month_id' : month_id})
            
            # Insert new data
            data.to_sql(tbl_name, engine, if_exists='append', index=False, method='multi')
            conn.close()
        except Exception as e:
            print(f"Error occurred: {str(e)}")

In [1879]:
insert_data_to_postgres(data, year_id, month_id, 'f_orders_retail')

In [1880]:
engine = create_engine("postgresql://postgres:pakistan@localhost/postgres", poolclass=NullPool, isolation_level="AUTOCOMMIT")
temp_table_name = 'temp_dim'

In [1881]:
# Populating dim_products table
df_products.to_sql(temp_table_name, engine, if_exists='replace', index=False)

# Perform the upsert operation for dim_products
upsert_query = f"""
INSERT INTO dim_products (product_id, category, sub_category, product_name, year_id, month_id)
SELECT product_id, category, sub_category, product_name, year_id, month_id FROM {temp_table_name}
ON CONFLICT (product_id)
DO UPDATE SET
    category = EXCLUDED.category,
    sub_category = EXCLUDED.sub_category,
    product_name = EXCLUDED.product_name,
    year_id = EXCLUDED.year_id,
    month_id = EXCLUDED.month_id;
"""

# Execute the upsert query using the text method
with engine.begin() as connection:
    connection.execute(text(upsert_query))

In [1882]:
# Populating dim_customer table
df_customers.to_sql(temp_table_name, engine, if_exists='replace', index=False)

upsert_query = f"""
INSERT INTO dim_customers (customer_id, customer_name, segment, year_id, month_id)
SELECT customer_id, customer_name, segment, year_id, month_id FROM {temp_table_name}
ON CONFLICT (customer_id)
DO UPDATE SET
    customer_name = EXCLUDED.customer_name,
    segment = EXCLUDED.segment,
    year_id = EXCLUDED.year_id,
    month_id = EXCLUDED.month_id;
"""

# Execute the upsert query
with engine.begin() as connection:
    connection.execute(text(upsert_query))

In [1883]:
# Populating dim_geographies table
df_geography.to_sql(temp_table_name, engine, if_exists='replace', index=False)

# Perform the upsert operation for dim_geographies
upsert_query = f"""
INSERT INTO dim_geographies (country, city, state, postal_code, region, year_id, month_id)
SELECT country, city, state, postal_code, region, year_id, month_id FROM {temp_table_name}
ON CONFLICT (postal_code, city)
DO UPDATE SET
    country = EXCLUDED.country,
    state = EXCLUDED.state,
    region = EXCLUDED.region,
    year_id = EXCLUDED.year_id,
    month_id = EXCLUDED.month_id;    
"""

# Execute the upsert query using the text method
with engine.begin() as connection:
    connection.execute(text(upsert_query))

In [1884]:
# Drop the temporary table
drop_query = f"DROP TABLE {temp_table_name};"
with engine.connect() as connection:
    connection.execute(text(drop_query))