In [1]:
#pip install mysql-connector-python sqlalchemy pandas
import pandas as pd
import sqlalchemy as sal

# Connection settings for the local MySQL instance used throughout the notebook.
username = 'root'
server_name = 'localhost'
port = '3306'
database_name = 'superstore'

# Build the URL from its parts rather than by string interpolation, so that
# special characters in any component are escaped instead of corrupting the URL.
connection_url = sal.engine.URL.create(
    drivername='mysql+mysqlconnector',
    username=username,
    host=server_name,
    port=int(port),
    database=database_name,
)
engine = sal.create_engine(connection_url)
# One shared connection; the cells below call conn.commit() after each write.
conn = engine.connect()

**ETL:** \
Extract \
Transform (filtering, data cleaning, joining)\
Load

In [2]:
def extract():
    """Read the raw orders and returns files from the working directory.

    Returns:
        tuple: ``(df_orders, df_returns)`` as parsed by ``pd.read_csv``.
    """
    source_files = ('orders.txt', 'returns.txt')
    df_orders, df_returns = (pd.read_csv(path) for path in source_files)
    return df_orders, df_returns
def transform(df_orders,df_returns):
    """Keep only the orders that have a matching return record.

    Args:
        df_orders: Orders frame keyed by ``order_id``.
        df_returns: Returns frame keyed by ``Order Id``.

    Returns:
        DataFrame: Inner join of the two frames; both key columns are kept.
    """
    return df_orders.merge(
        df_returns,
        how='inner',
        left_on='order_id',
        right_on='Order Id',
    )
def load(df):
    """Write ``df`` to the ``orders_final`` table and commit.

    The table is dropped and recreated on every call (``if_exists='replace'``);
    switch to ``'append'`` to keep existing rows instead.
    """
    df.to_sql(name='orders_final', con=conn, if_exists='replace', index=False)
    conn.commit()

In [3]:
# Extract: read orders.txt and returns.txt into two frames.
df_orders,df_returns = extract()
# Transform: inner-join orders to returns on the order id.
df=transform(df_orders,df_returns)
# Load: write the joined frame to the orders_final table (replacing it).
load(df)

In [4]:
# Sanity check: show the concrete class of the open connection object.
print(type(conn))

<class 'sqlalchemy.engine.base.Connection'>


In [5]:
# Record the installed SQLAlchemy version.
print(sal.__version__)

2.0.32


In [6]:
from sqlalchemy import text

# Read the loaded table back from MySQL to confirm the load worked.
orders_final_query = text('select * from orders_final')
df_sql = pd.read_sql_query(sql=orders_final_query, con=conn)
df_sql.head()

Unnamed: 0,order_id,order_date,customer_name,city,category,product_id,sales,profit,Order Id,Return Reason
0,CA-2018-100762,2018-11-24,Nat Gilpin,Jackson,Office Supplies,OFF-AR-10000380,151.92,45.576,CA-2018-100762,Bad Quality
1,CA-2018-100867,2018-10-19,Eugene Hildebrand,Lakewood,Technology,TEC-PH-10004922,321.552,20.097,CA-2018-100867,Bad Quality
2,CA-2018-102652,2018-04-06,Andy Yotov,Los Angeles,Furniture,FUR-FU-10000747,91.96,15.6332,CA-2018-102652,Bad Quality
3,CA-2018-103373,2018-05-18,Bruce Stewart,Cleveland,Technology,TEC-PH-10002885,779.796,-168.9558,CA-2018-103373,Bad Quality
4,CA-2018-103744,2018-02-23,Michael Grace,El Paso,Office Supplies,OFF-BI-10000320,4.428,-6.8634,CA-2018-103744,Bad Quality


**SCD: Slowly Changing Dimensions**\

***SCD1= latest information only***\

***Dimension table***\
Product Dimensions or attributes\
    Product_id, Product_name, Price

Customer Dimensions or attributes\
    Customer_id, Customer_name, Customer_age

***Fact or Transaction table***\
Customer_id, Product_id, Sales


***SCD2= historical data***\
Product Dimensions or attributes\
    Product_key, Product_id, Product_name, Price (historical price), effective_date (date the row was inserted), expire_date (a far-future "high date", e.g. 31-12-9999, while the row is current), active_flag (0/1, marks the latest row)\
Problem: product_id now repeats across the historical rows, so it can no longer serve as the primary key. We therefore add a 'surrogate key', product_key (1, 2, 3, ...), generated by a sequence.

# SCD1

In [8]:
# Drop any previous copy of the products table so the SCD1 walkthrough starts clean.
# IF EXISTS keeps this cell from raising on a database where the table was never
# created (e.g. the first run of the notebook).
query = sal.text("drop table if exists products")
conn.execute(query)
conn.commit()

In [9]:
# Initial source extract: three products written to the flat file the ETL reads.
products = {
    'product_id': [1, 2, 3],
    'product_name': ['hp laptop', 'iphone 11', 'iphone 12'],
    'price': [50000, 70000, 80000],
}
products_db = pd.DataFrame(data=products)
products_db.to_csv(path_or_buf='products.txt', index=False)

In [10]:
#load:
# Create an empty `products` table to act as the SCD1 target.
# The dtypes are spelled out because an empty frame built from bare lists has
# no values to infer types from, so to_sql would otherwise choose column types
# that need not match the integer ids and prices appended later.
dfp = pd.DataFrame({
    'product_id': pd.Series(dtype='int64'),
    'product_name': pd.Series(dtype='object'),
    'price': pd.Series(dtype='int64'),
})
dfp.to_sql('products', con=conn, index=False, if_exists='replace')
conn.commit()

In [13]:
def extract():
    """Fetch the incoming product file and the current contents of the target table.

    Returns:
        tuple: ``(df_products, df_products_db)`` - the rows read from
        ``products.txt`` and every row currently in the ``products`` table.
    """
    current_rows_query = text('select * from products')
    df_products = pd.read_csv('products.txt')
    df_products_db = pd.read_sql_query(current_rows_query, conn)
    return df_products, df_products_db
def transform(df_products, df_products_db):
    """Split incoming products into rows to insert and rows that already exist.

    Args:
        df_products: Incoming rows; must contain ``product_id``.
        df_products_db: Rows currently in the target table.

    Returns:
        tuple: ``(df_insert_final, df_update)``. ``df_insert_final`` holds the
        new rows under the target table's column names. ``df_update`` holds the
        matched rows of the merge, still carrying the ``_x`` (incoming) and
        ``_y`` (stored) columns.
    """
    # indicator=True records whether each incoming row found a match in the
    # target. Testing product_name_y for NaN instead would misclassify an
    # existing row whose stored name is NULL as new and insert a duplicate id.
    df_merge = pd.merge(df_products, df_products_db, how='left', on='product_id', indicator=True)
    is_new = df_merge['_merge'] == 'left_only'
    df_merge = df_merge.drop(columns='_merge')
    # The first three columns are the incoming id / name / price (left side).
    df_insert_final = df_merge[is_new].iloc[:, 0:3].copy()
    df_insert_final.columns = df_products_db.columns
    df_update = df_merge[~is_new]
    return df_insert_final, df_update
def load(df_insert_final):
    """Append the new product rows to the ``products`` table and commit.

    The table itself is created beforehand with ``if_exists='replace'``; from
    then on rows are only appended.
    """
    df_insert_final.to_sql(name='products', con=conn, if_exists='append', index=False)
    conn.commit()

In [14]:
# First SCD1 pass: read the file and the table, split the rows, append the new ones.
df_products, df_products_db = extract()
df_insert_final,df_update = transform(df_products, df_products_db)
load(df_insert_final)

In [15]:
# Show both outputs of transform(): the rows inserted, then the rows matched for update.
print(df_insert_final)
print('*******************')
print(df_update)

   product_id product_name  price
0           1    hp laptop  50000
1           2    iphone 11  70000
2           3    iphone 12  80000
*******************
Empty DataFrame
Columns: [product_id, product_name_x, price_x, product_name_y, price_y]
Index: []


In [16]:
# Step-by-step replay of the insert branch of transform(), printing every
# intermediate frame. df_products / df_products_db are the frames returned by
# the earlier extract() call, so the table side is still empty here.
df_merge = pd.merge(df_products,df_products_db, how='left',on='product_id')
print(df_merge)
print('*********')
# Rows with no product_name_y found no match in the table -> to be inserted.
df_insert =df_merge[df_merge['product_name_y'].isna()]
print(df_insert)
print('*********')

# Keep only the first three columns (product_id plus the incoming _x values).
df_insert_final= df_insert.iloc[:, 0:3]
print(df_insert_final)
print('*********')

# Rename the _x columns back to the table's column names.
df_insert_final.columns=df_products_db.columns
print(df_insert_final)

   product_id product_name_x  price_x product_name_y price_y
0           1      hp laptop    50000            NaN     NaN
1           2      iphone 11    70000            NaN     NaN
2           3      iphone 12    80000            NaN     NaN
*********
   product_id product_name_x  price_x product_name_y price_y
0           1      hp laptop    50000            NaN     NaN
1           2      iphone 11    70000            NaN     NaN
2           3      iphone 12    80000            NaN     NaN
*********
   product_id product_name_x  price_x
0           1      hp laptop    50000
1           2      iphone 11    70000
2           3      iphone 12    80000
*********
   product_id product_name  price
0           1    hp laptop  50000
1           2    iphone 11  70000
2           3    iphone 12  80000


In [17]:
# Second source extract: product 1 (hp laptop) arrives with a new price and
# product 4 is new.
products = {
    'product_id': [1, 4],
    'product_name': ['hp laptop', 'iphone 13'],
    'price': [55000, 75000],
}
products_db = pd.DataFrame(data=products)
products_db.to_csv(path_or_buf='products.txt', index=False)

In [18]:
def extract():
    """Fetch the incoming product file and the current contents of the target table.

    Returns:
        tuple: ``(df_products, df_products_db)`` - the rows read from
        ``products.txt`` and every row currently in the ``products`` table.
    """
    current_rows_query = text('select * from products')
    df_products = pd.read_csv('products.txt')
    df_products_db = pd.read_sql_query(current_rows_query, conn)
    return df_products, df_products_db
def transform(df_products, df_products_db):
    """Split incoming products into rows to insert and rows to update (SCD1).

    Args:
        df_products: Incoming rows; must contain ``product_id``.
        df_products_db: Rows currently in the target table.

    Returns:
        tuple: ``(df_insert_final, df_update_final)``. Both frames carry the
        incoming values under the target table's column names.
    """
    # indicator=True records whether each incoming row found a match in the
    # target. Testing product_name_y for NaN instead would misclassify an
    # existing row whose stored name is NULL as new and insert a duplicate id.
    df_merge = pd.merge(df_products, df_products_db, how='left', on='product_id', indicator=True)
    is_new = df_merge['_merge'] == 'left_only'
    # The first three columns are the incoming id / name / price (left side).
    incoming = df_merge.iloc[:, 0:3]
    df_insert_final = incoming[is_new].copy()
    df_insert_final.columns = df_products_db.columns
    df_update_final = incoming[~is_new].copy()
    df_update_final.columns = df_products_db.columns
    return df_insert_final, df_update_final
def load_staging(df_update_final):
    """Write the rows to be updated into the ``products_stg`` table and commit.

    The staging table is replaced on every call, so it only ever holds the
    current batch.
    """
    df_update_final.to_sql(name='products_stg', con=conn, if_exists='replace', index=False)
    conn.commit()
def updates():
    """Overwrite ``products`` rows with the matching ``products_stg`` rows (SCD1).

    Rows are matched on ``product_id``. Both ``product_name`` and ``price`` are
    overwritten, so a renamed product is not left with its old name.
    """
    query = sal.text(
        'UPDATE products JOIN products_stg ON products.product_id = products_stg.product_id '
        'SET products.product_name = products_stg.product_name, products.price = products_stg.price'
    )
    conn.execute(query)
    conn.commit()
def inserts(df_insert_final):
    """Append the brand-new product rows to the ``products`` table and commit.

    The table itself is created beforehand with ``if_exists='replace'``; from
    then on rows are only appended.
    """
    df_insert_final.to_sql(name='products', con=conn, if_exists='append', index=False)
    conn.commit()

In [19]:
# Second SCD1 pass. Order matters: stage the changed rows, apply them to
# `products` with the UPDATE ... JOIN, then append the rows that are new.
df_products, df_products_db = extract()
df_insert_final,df_update_final= transform(df_products, df_products_db)
load_staging(df_update_final)
updates()
inserts(df_insert_final)

In [20]:
# Display the rows that the latest transform() call classified as updates.
df_update_final

Unnamed: 0,product_id,product_name,price
0,1,hp laptop,55000


In [21]:
# NOTE(review): the second SCD1 run (In [19]) does not assign df_merge or
# df_update: transform() keeps df_merge local and now returns df_update_final.
# Both names therefore still hold the values from In [16] / In [14], so the
# first and last frames printed here are stale. Only df_insert_final reflects
# the latest run.
print(df_merge)
print('*******************')
print(df_insert_final)
print('*******************')
print(df_update)

   product_id product_name_x  price_x product_name_y price_y
0           1      hp laptop    50000            NaN     NaN
1           2      iphone 11    70000            NaN     NaN
2           3      iphone 12    80000            NaN     NaN
*******************
   product_id product_name  price
1           4    iphone 13  75000
*******************
Empty DataFrame
Columns: [product_id, product_name_x, price_x, product_name_y, price_y]
Index: []


In [22]:
# Third source extract: product 1 (hp laptop) changes price again and two new
# products (ids 6 and 5) are added.
products = {
    'product_id': [1, 6, 5],
    'product_name': ['hp laptop', 'Nokia 1340', 'Mac Book1'],
    'price': [30000, 25000, 90000],
}
products_db = pd.DataFrame(data=products)
products_db.to_csv(path_or_buf='products.txt', index=False)

In [23]:
def extract():
    """Fetch the incoming product file and the current contents of the target table.

    Returns:
        tuple: ``(df_products, df_products_db)`` - the rows read from
        ``products.txt`` and every row currently in the ``products`` table.
    """
    current_rows_query = text('select * from products')
    df_products = pd.read_csv('products.txt')
    df_products_db = pd.read_sql_query(current_rows_query, conn)
    return df_products, df_products_db
def transform(df_products, df_products_db):
    """Split incoming products into rows to insert and rows to update (SCD1).

    Args:
        df_products: Incoming rows; must contain ``product_id``.
        df_products_db: Rows currently in the target table.

    Returns:
        tuple: ``(df_insert_final, df_update_final)``. Both frames carry the
        incoming values under the target table's column names.
    """
    # indicator=True records whether each incoming row found a match in the
    # target. Testing product_name_y for NaN instead would misclassify an
    # existing row whose stored name is NULL as new and insert a duplicate id.
    df_merge = pd.merge(df_products, df_products_db, how='left', on='product_id', indicator=True)
    is_new = df_merge['_merge'] == 'left_only'
    # The first three columns are the incoming id / name / price (left side).
    incoming = df_merge.iloc[:, 0:3]
    df_insert_final = incoming[is_new].copy()
    df_insert_final.columns = df_products_db.columns
    df_update_final = incoming[~is_new].copy()
    df_update_final.columns = df_products_db.columns
    return df_insert_final, df_update_final
def load_staging(df_update_final):
    """Write the rows to be updated into the ``products_stg`` table and commit.

    The staging table is replaced on every call, so it only ever holds the
    current batch.
    """
    df_update_final.to_sql(name='products_stg', con=conn, if_exists='replace', index=False)
    conn.commit()
def updates():
    """Overwrite name and price in ``products`` with the staged values (SCD1).

    Rows are matched to ``products_stg`` on ``product_id``; the change is
    committed before returning.
    """
    statement = (
        'UPDATE products JOIN products_stg ON products.product_id = products_stg.product_id '
        'SET products.product_name=products_stg.product_name, products.price = products_stg.price'
    )
    conn.execute(sal.text(statement))
    conn.commit()
def inserts(df_insert_final):
    """Append the brand-new product rows to the ``products`` table and commit.

    The table itself is created beforehand with ``if_exists='replace'``; from
    then on rows are only appended.
    """
    df_insert_final.to_sql(name='products', con=conn, if_exists='append', index=False)
    conn.commit()

In [24]:
# Third SCD1 pass. Order matters: stage the changed rows, apply them to
# `products` with the UPDATE ... JOIN, then append the rows that are new.
df_products, df_products_db = extract()
df_insert_final,df_update_final= transform(df_products, df_products_db)
load_staging(df_update_final)
updates()
inserts(df_insert_final)

In [25]:
from sqlalchemy import text

# Read the dimension table back to check the result of the SCD1 passes.
products_query = text('select * from products')
df_sql = pd.read_sql_query(sql=products_query, con=conn)
df_sql.head(10)

Unnamed: 0,product_id,product_name,price
0,1,hp laptop,30000
1,2,iphone 11,70000
2,3,iphone 12,80000
3,4,iphone 13,75000
4,6,Nokia 1340,25000
5,5,Mac Book1,90000
