In [1]:
import pymysql
import pandas as pd
from sklearn.impute import SimpleImputer
from pymysql import Error, IntegrityError, MySQLError

In [3]:
df1 = pd.read_csv('store_sales_1.csv')
df2 = pd.read_csv('store_sales_2.csv')
df3 = pd.read_csv('store_sales_3.csv')

In [5]:
#Showing info for each df
for i in df1, df2, df3:
    print(i.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ProductName   100 non-null    object 
 1   Qty           79 non-null     float64
 2   Unit_Price    78 non-null     float64
 3   SaleDate      100 non-null    object 
 4   CurrencyType  78 non-null     object 
 5   CustomerID    90 non-null     object 
 6   StoreID       100 non-null    object 
dtypes: float64(2), object(5)
memory usage: 5.6+ KB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ProductName   100 non-null    object 
 1   Qty           66 non-null     float64
 2   Unit_Price    75 non-null     float64
 3   SaleDate      100 non-null    object 
 4   CurrencyType  84 non-null     object 
 5   CustomerID    88 non-null     object 
 6   S

In [7]:
#Showing if there is nulls in each df
for i in df1, df2, df3:
    print(i.isnull().sum())

ProductName      0
Qty             21
Unit_Price      22
SaleDate         0
CurrencyType    22
CustomerID      10
StoreID          0
dtype: int64
ProductName      0
Qty             34
Unit_Price      25
SaleDate         0
CurrencyType    16
CustomerID      12
StoreID          0
dtype: int64
ProductName      0
Qty             18
Unit_Price      20
SaleDate         0
CurrencyType    16
CustomerID       9
StoreID          0
dtype: int64


In [9]:
#Showing columns for each df
for i in df1, df2, df3:
    print(i.columns)

Index(['ProductName', 'Qty', 'Unit_Price', 'SaleDate', 'CurrencyType',
       'CustomerID', 'StoreID'],
      dtype='object')
Index(['ProductName', 'Qty', 'Unit_Price', 'SaleDate', 'CurrencyType',
       'CustomerID', 'StoreID'],
      dtype='object')
Index(['ProductName', 'Qty', 'Unit_Price', 'SaleDate', 'CurrencyType',
       'CustomerID', 'StoreID'],
      dtype='object')


In [11]:
#Showing describe for each df
for i in df1, df2, df3:
    print(i.describe())

             Qty  Unit_Price
count  79.000000   78.000000
mean    2.164557   19.416667
std     0.807503    7.428847
min     1.000000   10.500000
25%     1.500000   15.750000
50%     2.000000   17.875000
75%     3.000000   30.000000
max     3.000000   30.000000
             Qty  Unit_Price
count  66.000000   75.000000
mean    2.090909   19.096667
std     0.817638    7.282533
min     1.000000   10.500000
25%     1.000000   15.750000
50%     2.000000   15.750000
75%     3.000000   30.000000
max     3.000000   30.000000
             Qty  Unit_Price
count  82.000000   80.000000
mean    2.024390   18.468750
std     0.845841    7.111496
min     1.000000   10.500000
25%     1.000000   10.500000
50%     2.000000   15.750000
75%     3.000000   20.000000
max     3.000000   30.000000


In [13]:
#Functions to perform data transformation
def clean_store_sales_data(df):
    
    #Make a copy to avoid modifying the original
    df_clean = df.copy()
    
    df_clean = handle_missing_values(df_clean)
    
    df_clean = convert_data_types(df_clean)
    
    df_clean['Total_Price'] = df_clean['Qty'] * df_clean['Unit_Price']
    
    df_clean = standardize_currency(df_clean)
    
    df_clean['StoreID'] = df_clean['StoreID'].str.replace('store-A', 'Store_A').str.replace('STORE_A', 'Store_A')
    
    return df_clean

def handle_missing_values(df):
    
    #Fill CustomerID with most frequent
    imputer_customer = SimpleImputer(strategy='most_frequent')
    df['CustomerID'] = imputer_customer.fit_transform(df['CustomerID'].values.reshape(-1, 1)).ravel()
    
    #Fill Qty with mean
    imputer_qty = SimpleImputer(strategy='mean')
    df['Qty'] = imputer_qty.fit_transform(df[['Qty']])
    
    #Fill Unit_Price with mean
    imputer_price = SimpleImputer(strategy='mean')
    df['Unit_Price'] = imputer_price.fit_transform(df[['Unit_Price']])
    
    #Fill CurrencyType with most frequent
    imputer_currency = SimpleImputer(strategy='most_frequent')
    df['CurrencyType'] = imputer_currency.fit_transform(df['CurrencyType'].values.reshape(-1, 1)).ravel()
    
    return df

def convert_data_types(df):
    
    #Convert Qty to int64
    df['Qty'] = df['Qty'].astype('int64')
    #Convert SaleDate to datetime64
    df['SaleDate'] = df['SaleDate'].astype('datetime64[ns]')
    
    return df

def standardize_currency(df):
    
    # Convert USD prices to OMR (1 USD = 0.38 OMR)
    valid_currencies = ['usd', 'USD', 'Usd']
    if df['CurrencyType'].isin(valid_currencies).any():
        df.loc[df['CurrencyType'].isin(valid_currencies), 'Unit_Price'] *= 0.38
    
    #Standardize currency type naming to OMR
    df['CurrencyType'] = df['CurrencyType'].str.replace('usd', 'OMR').str.replace('Usd', 'OMR').str.replace('USD', 'OMR')
    
    return df

# Example usage:
df1_clean = clean_store_sales_data(df1)
df2_clean = clean_store_sales_data(df2)
df3_clean = clean_store_sales_data(df3)

In [15]:
#Stack vertically (add rows) combining all dfs' to one df
combined = pd.concat([df1_clean, df2_clean, df3_clean], axis=0)
combined.info()

<class 'pandas.core.frame.DataFrame'>
Index: 300 entries, 0 to 99
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   ProductName   300 non-null    object        
 1   Qty           300 non-null    int64         
 2   Unit_Price    300 non-null    float64       
 3   SaleDate      300 non-null    datetime64[ns]
 4   CurrencyType  300 non-null    object        
 5   CustomerID    300 non-null    object        
 6   StoreID       300 non-null    object        
 7   Total_Price   300 non-null    float64       
dtypes: datetime64[ns](1), float64(2), int64(1), object(4)
memory usage: 21.1+ KB


In [17]:
#Create product df
product_df = combined[['ProductName']].drop_duplicates().reset_index(drop=True)
product_df['ProductID'] = product_df.index + 1

#Merge ProductID back to the main df
combined = combined.merge(product_df, on='ProductName', how='left')

#Create SaleID (primary key for fact table)
combined['SaleID'] = range(1, len(combined) + 1)

#Create Customer df 
customer_df = combined[['CustomerID']].drop_duplicates().reset_index(drop=True)

#Create Store df
store_df = combined[['StoreID']].drop_duplicates().reset_index(drop=True)

#Create Sale df
sale_df = combined[['SaleID', 'ProductID', 'CustomerID', 'StoreID', 
                    'Qty', 'Unit_Price', 'SaleDate', 'CurrencyType']]

In [19]:
try:
    #Establishing the database connection
    connection = pymysql.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        #Database setup
        cursor.execute("DROP DATABASE IF EXISTS sales10")
        cursor.execute("CREATE DATABASE sales10 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
        cursor.execute("USE sales10")

        #Creating tables
        connection.begin()
        
        #Create product table
        cursor.execute("""
            CREATE TABLE product (
                ProductID INT PRIMARY KEY,
                ProductName VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
            )
        """)
        
        #Create customer table
        cursor.execute("""
            CREATE TABLE customer (
                CustomerID VARCHAR(255) PRIMARY KEY
            ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
        """)
        
        #Create store table
        cursor.execute("""
            CREATE TABLE store (
                StoreID VARCHAR(255) PRIMARY KEY
            ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
        """)
        
        #Create sale table with foreign keys
        cursor.execute("""
            CREATE TABLE sale (
                SaleID INT PRIMARY KEY,
                ProductID INT NOT NULL,
                CustomerID VARCHAR(255) NOT NULL,
                StoreID VARCHAR(255) NOT NULL,
                Qty INT,
                Unit_Price DECIMAL(10,2),
                SaleDate DATETIME,
                CurrencyType VARCHAR(10),
                FOREIGN KEY (ProductID) REFERENCES product(ProductID),
                FOREIGN KEY (CustomerID) REFERENCES customer(CustomerID),
                FOREIGN KEY (StoreID) REFERENCES store(StoreID)
            ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
        """)
        
        connection.commit()
        print("Tables created successfully")

        #Insert products
        print("Inserting products...")
        product_insert = "INSERT INTO product (ProductID, ProductName) VALUES (%s, %s)"
        for _, row in product_df.iterrows():
            try:
                cursor.execute(product_insert, (int(row.ProductID), row.ProductName))
            except IntegrityError:
                connection.rollback()
                print(f"Duplicate product skipped: ID {row.ProductID}, Name {row.ProductName}")
                continue
        
        #Insert customers
        print("Inserting customers...")
        customer_insert = "INSERT INTO customer (CustomerID) VALUES (%s)"
        for _, row in customer_df.iterrows():
            try:
                cursor.execute(customer_insert, (row.CustomerID,))
            except IntegrityError:
                connection.rollback()
                print(f"Duplicate customer skipped: {row.CustomerID}")
                continue
        
        #Insert stores
        print("Inserting stores...")
        store_insert = "INSERT INTO store (StoreID) VALUES (%s)"
        for _, row in store_df.iterrows():
            try:
                cursor.execute(store_insert, (row.StoreID,))
            except IntegrityError:
                connection.rollback()
                print(f"Duplicate store skipped: {row.StoreID}")
                continue
        
        #Insert sales
        print("Inserting sales...")
        sale_insert = """
            INSERT INTO sale (
                SaleID, ProductID, CustomerID, StoreID, 
                Qty, Unit_Price, SaleDate, CurrencyType
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        for _, row in sale_df.iterrows():
            try:
                cursor.execute(sale_insert, (
                    int(row.SaleID),
                    int(row.ProductID),
                    row.CustomerID,
                    row.StoreID,
                    int(row.Qty) if pd.notna(row.Qty) else None,
                    float(row.Unit_Price) if pd.notna(row.Unit_Price) else None,
                    row.SaleDate.strftime('%Y-%m-%d %H:%M:%S') if pd.notnull(row.SaleDate) else None,
                    row.CurrencyType
                ))
            except MySQLError as e:
                connection.rollback()
                print(f"Failed to insert sale {row.SaleID}: {str(e)}")
                continue
        
        connection.commit()
        print("All data inserted successfully!")

except MySQLError as e:
    print(f"MySQL Error occurred: {str(e)}")
    if 'connection' in locals() and connection.open:
        connection.rollback()
except Exception as e:
    print(f"Unexpected error occurred: {str(e)}")
finally:
    if 'connection' in locals() and connection.open:
        connection.close()
        print("Database connection closed")

Tables created successfully
Inserting products...
Inserting customers...
Inserting stores...
Inserting sales...
All data inserted successfully!
Database connection closed
