In [1]:
import pymysql
import pandas as pd
from datetime import datetime
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline

In [2]:
#first sales_1
df1 = pd.read_csv('store_sales_1.csv')
df2 = pd.read_csv('store_sales_2.csv')
df3 = pd.read_csv('store_sales_3.csv')

df = pd.concat([df1, df2, df3], ignore_index=True)
df

Unnamed: 0,ProductName,Qty,Unit_Price,SaleDate,CurrencyType,CustomerID,StoreID
0,Smith Paper,3.0,10.50,7/13/2024,OMR,9ca482a2-0356-49c1-b5e3-88ae98d1cc2f,Store_A
1,Johnson Screen,,,2/23/2025,Usd,c0b9df4e-8f03-4bf0-a31b-0a7d7c2a8907,Store_A
2,Roberts Ingredient,3.0,30.00,11/13/2024,USD,97dc18e3-2c12-4e26-9863-32514e82e822,Store_A
3,White Monitor,,10.50,4/16/2025,USD,e4d09733-d496-47b3-a4b5-04de84d8fd06,Store_A
4,Rodriguez Keyboard,2.0,20.00,8/3/2024,usd,435ecb46-4545-4af7-b72c-119f64d193a5,Store_A
...,...,...,...,...,...,...,...
295,Case,1.0,30.00,8/22/2024,Usd,b0552606-1fe5-420f-abf0-a86289c250ba,STORE_A
296,Black,,30.00,12/24/2024,Usd,6da0ca26-e56e-41e1-89b0-004aa6d74b3e,STORE_A
297,On,1.0,20.00,9/6/2024,,045081fa-0df3-4904-820b-067b4176fb85,STORE_A
298,Soon,2.0,,7/4/2024,,a6b5132d-2026-4400-83d9-a64ed546bd51,STORE_A


In [3]:
# Define transformation functions 

def drop_missing_customers(df):
    return df.dropna(subset=['CustomerID']).copy()

def fill_numeric(df):
    df['Qty'] = df['Qty'].fillna(df['Qty'].median())
    df['Unit_Price'] = df['Unit_Price'].fillna(df['Unit_Price'].median())
    return df

def fill_currency(df):
    df['CurrencyType'] = df['CurrencyType'].fillna('OMR')
    return df

def convert_types(df):
    df['Qty'] = df['Qty'].astype(int)
    df['SaleDate'] = pd.to_datetime(df['SaleDate'], errors='coerce', dayfirst=True)
    return df

def calc_total(df):
    df['Total_Price'] = df['Qty'] * df['Unit_Price']
    return df

def convert_currency(df):
    df['CurrencyType'] = df['CurrencyType'].str.upper()
    exchange_rates = {'USD': 0.385, 'EUR': 0.41, 'OMR': 1}
    df['rate_to_OMR'] = df['CurrencyType'].map(exchange_rates).fillna(1)
    df['Unit_Price'] = (df['Unit_Price'] * df['rate_to_OMR']).round(3)
    df['Total_Price'] = (df['Total_Price'] * df['rate_to_OMR']).round(3)
    df = df.drop(columns=['rate_to_OMR'])
    return df

# Build pipeline
pipeline = Pipeline([
    ("drop_customers", FunctionTransformer(drop_missing_customers)),
    ("fill_numeric", FunctionTransformer(fill_numeric)),
    ("fill_currency", FunctionTransformer(fill_currency)),
    ("convert_types", FunctionTransformer(convert_types)),
    ("calc_total", FunctionTransformer(calc_total)),
    ("convert_currency", FunctionTransformer(convert_currency)),
])

# Transform data
df_transformed = pipeline.fit_transform(df)

print("Transformation Complete!")
print(df_transformed.head())

Transformation Complete!
          ProductName  Qty  Unit_Price   SaleDate CurrencyType  \
0         Smith Paper    3      10.500 2024-07-13          OMR   
1      Johnson Screen    2       6.064 2025-02-23          USD   
2  Roberts Ingredient    3      11.550 2024-11-13          USD   
3       White Monitor    2       4.043 2025-04-16          USD   
4  Rodriguez Keyboard    2       7.700 2024-08-03          USD   

                             CustomerID  StoreID  Total_Price  
0  9ca482a2-0356-49c1-b5e3-88ae98d1cc2f  Store_A       31.500  
1  c0b9df4e-8f03-4bf0-a31b-0a7d7c2a8907  Store_A       12.128  
2  97dc18e3-2c12-4e26-9863-32514e82e822  Store_A       34.650  
3  e4d09733-d496-47b3-a4b5-04de84d8fd06  Store_A        8.085  
4  435ecb46-4545-4af7-b72c-119f64d193a5  Store_A       15.400  


  df['SaleDate'] = pd.to_datetime(df['SaleDate'], errors='coerce', dayfirst=True)


In [5]:
df_transformed.isnull().sum()

ProductName     0
Qty             0
Unit_Price      0
SaleDate        0
CurrencyType    0
CustomerID      0
StoreID         0
Total_Price     0
dtype: int64

In [9]:
# 3. LOAD → MySQL (4 tables)

connection = pymysql.connect(
    host="localhost",
    user="root",
    password="Oman99690050#",
    charset="utf8mb4",
    autocommit=True
)
cursor = connection.cursor()

cursor.execute("CREATE DATABASE IF NOT EXISTS store1;")
cursor.execute("USE store1;")


# CREATE TABLES 
cursor.execute("""
CREATE TABLE IF NOT EXISTS Product (
    ProductID INT AUTO_INCREMENT PRIMARY KEY,
    ProductName VARCHAR(255) UNIQUE
)
""")

cursor.execute("""
CREATE TABLE IF NOT EXISTS Customer (
    CustomerID VARCHAR(50) PRIMARY KEY
)
""")

cursor.execute("""
CREATE TABLE IF NOT EXISTS Store (
    StoreID VARCHAR(50) PRIMARY KEY
)
""")

cursor.execute("""
CREATE TABLE IF NOT EXISTS Sales (
    SaleID INT AUTO_INCREMENT PRIMARY KEY,
    SaleDate DATETIME,
    StoreID VARCHAR(50),
    ProductID INT,
    CustomerID VARCHAR(50),
    Qty INT,
    Unit_Price FLOAT,
    CurrencyType VARCHAR(10),
    Total_Price FLOAT,
    FOREIGN KEY (StoreID) REFERENCES Store(StoreID),
    FOREIGN KEY (CustomerID) REFERENCES Customer(CustomerID),
    FOREIGN KEY (ProductID) REFERENCES Product(ProductID)
)
""")

connection.commit()

# ---- INSERT DATA INTO Product, Customer, Store ----
# Products
product_names = df_transformed["ProductName"].unique()
for name in product_names:
    cursor.execute("""
        INSERT IGNORE INTO Product (ProductName)
        VALUES (%s)
    """, (name,))

# Customers
customer_ids = df_transformed["CustomerID"].unique()
for cid in customer_ids:
    cursor.execute("""
        INSERT IGNORE INTO Customer (CustomerID)
        VALUES (%s)
    """, (cid,))

# Stores
store_ids = df_transformed["StoreID"].unique()
for sid in store_ids:
    cursor.execute("""
        INSERT IGNORE INTO Store (StoreID)
        VALUES (%s)
    """, (sid,))

connection.commit()

# ---- INSERT DATA INTO Sales ----
for _, row in df_transformed.iterrows():
    # Get ProductID
    cursor.execute("SELECT ProductID FROM Product WHERE ProductName=%s", (row.ProductName,))
    pid = cursor.fetchone()[0]

cursor.execute("""
    INSERT INTO Sales
    (SaleDate, StoreID, ProductID, CustomerID, Qty, Unit_Price, CurrencyType, Total_Price)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
""", (
    row.SaleDate, row.StoreID, pid, row.CustomerID,
    row.Qty, row.Unit_Price, row.CurrencyType,
    row.Total_Price
))

connection.commit()
connection.close()
print("ok")

ok
