In [8]:
import pandas as pd
from sqlalchemy import create_engine, text

In [5]:
def engine_connection(user, password, host, port,db):
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
    connection= engine.connect()
    return engine, connection


In [7]:
try:
     
        engine, connection =engine_connection('postgres', 'postgres','localhost', 5432,'retaildwh')
        print("Connection Done (: ")
except Exception as e:
        print("Got ERROR in Connection for Writing...", e)

Connection Done (: 


In [24]:
try:    
        query = '''
            SELECT * 
            FROM retail_cleaned
        '''
        cleaned_data = pd.read_sql(text(query), con=connection, index_col='Id')
        print("The Cleaned Data Loaded Successfully")
except Exception as e:
        print("Got ERROR in your query or index", e)

The Cleaned Data Loaded Successfully


In [20]:

def DQ_Dimcustomer(cleaned_df, db_conn):
    DimCust = cleaned_df[['customerid', 'country']].copy()
    DimCust['name'] = 'Customer' + DimCust['customerid'].astype(str)
    DimCust= DimCust.reset_index(drop=True)
    DimCust= DimCust.drop_duplicates('customerid').set_index('customerid')
    print(DimCust)
    DimCust.head(n=0).to_sql(name='dq_dimcustomer', con=db_conn, if_exists='replace', index=True, index_label='customerid')
    db_conn.commit()


In [21]:
DQ_Dimcustomer(cleaned_data,connection)

                   country           name
customerid                               
13047       United Kingdom  Customer13047
12583              Belguim  Customer12583
13748       United Kingdom  Customer13748


In [22]:
query="""

    WITH dq_dimcustomer_columns AS (
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = 'dq_dimcustomer'
    ),

    dimcustomer_columns AS (
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = 'dimcustomer'
    )

    SELECT  *
    FROM dq_dimcustomer_columns AS dq_dimcustomer
    INNER JOIN dimcustomer_columns AS dimcustomer
    ON dq_dimcustomer.column_name = dimcustomer.column_name
    AND dq_dimcustomer.data_type = dimcustomer.data_type;

    """
df = pd.read_sql(text(query), connection)

df

Unnamed: 0,column_name,data_type,column_name.1,data_type.1
0,customerid,bigint,customerid,bigint
1,country,text,country,text
2,name,text,name,text


In [23]:
if len(df) == 3:
    print("Data Quality Check: All columns and data types match for the DimCustomer.")
    print(df)
else:
    print("Data Quality Check: Detected data quality issues in the DimCustomer.")


Data Quality Check: All columns and data types match for the DimCustomer.
  column_name data_type column_name data_type
0  customerid    bigint  customerid    bigint
1     country      text     country      text
2        name      text        name      text


In [53]:
df

<bound method DataFrame.count of   column_name data_type column_name data_type
0  CustomerID    bigint  CustomerID    bigint
1     Country      text     Country      text
2        name      text        name      text>

In [43]:
df= pd.read_csv(r'C:\Users\Mohammed\Desktop\retail-DWH\data\online_retail.csv')
df['InvoiceDate']= pd.to_datetime(df['InvoiceDate'])

In [44]:
df['CustomerID'] = df['CustomerID'].astype(int)

IntCastingNaNError: Cannot convert non-finite values (NA or inf) to integer

In [37]:
df['MonthNo']= df['InvoiceDate'].dt.strftime('%m')
df['Day']= df['InvoiceDate'].dt.strftime('%d')


In [38]:
df.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,MonthNo,Day
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom,12,1
1,536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom,12,1
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom,12,1
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom,12,1
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom,12,1


In [39]:
df.dtypes

InvoiceNo              object
StockCode              object
Description            object
Quantity                int64
InvoiceDate    datetime64[ns]
UnitPrice             float64
CustomerID            float64
Country                object
MonthNo                object
Day                    object
dtype: object

In [35]:
df= df.drop_duplicates()

In [17]:
df.shape

(536641, 10)

In [27]:
df[['saleskey']].head()

Unnamed: 0,saleskey
0,0 17850.0\n1 17850.0\n2 ...
1,0 17850.0\n1 17850.0\n2 ...
2,0 17850.0\n1 17850.0\n2 ...
3,0 17850.0\n1 17850.0\n2 ...
4,0 17850.0\n1 17850.0\n2 ...


In [45]:

df['saleskey'] = df['CustomerID'].astype(str) + df['StockCode'] + df['MonthNo'] + df['Day']

df

KeyError: 'MonthNo'

In [35]:
from datetime import date
def DQ_DimProduct(cleanedDF, engine_conn):
    dimpro = cleanedDF[['stockcode', 'description']]\
        .drop_duplicates(subset=['stockcode', 'description'])\
        .reset_index(drop=True)

    # Generate 'productid' values as a range starting from start_productid
    start_productid=1
    dimpro['productid'] = range(start_productid, start_productid + len(dimpro))

    dimpro['name'] = 'Product-' + dimpro['stockcode']
    dimpro['processed_date'] = date.today()
    dimpro = dimpro.reindex(columns=['productid', 'name', 'stockcode', 'description', 'processed_date'])

    # Ensure 'productid' is set as the index
    dimpro.set_index('stockcode', inplace=True)

    # Add the DataFrame to the 'dimproduct' table
    dimpro.head(n=1).to_sql(name='dq_dimproduct', if_exists='replace', con=engine_conn, index=True, index_label='stockcode')
    engine_conn.commit()
    
    try:
        with open(r'C:\Users\Mohammed\Desktop\retail-DWH\ETL-SCD-Queries\dq_dimproduct.sql') as file:
            sql_query = file.read()
        results = engine_conn.execute(text(sql_query)).fetchall()
    except Exception as e:
        print('Got ERROR in Product Dimantion, While executing the SQL Query:',e)
    df= pd.DataFrame(results)
    
    if len(df) == 4:
        print("Data Quality Check: All columns and data types match for the DimProduct.")
        print(df)
    else:
        print("Data Quality Check: Detected data quality issues in the DimProduct.")
        
    engine_conn.execute(text('drop table dq_dimproduct;'))
    engine_conn.commit()
    return dimpro
    
DQ_DimProduct(cleanedDF=cleaned_data, engine_conn=connection)


Data Quality Check: All columns and data types match for the DimProduct.
   column_name data_type  column_name data_type
0    stockcode      text    stockcode      text
1    productid    bigint    productid    bigint
2         name      text         name      text
3  description      text  description      text


In [34]:
def DQ_DimDate(cleanedDF, engine_conn):
    cleanedDF['invoicedate']= pd.to_datetime(cleanedDF['invoicedate'])
    maxdate= cleanedDF['invoicedate'].max()
    mindate= cleanedDF['invoicedate'].min()
    Dimdate = pd.DataFrame({'InvoiceDate': pd.date_range(start=mindate, end=maxdate, freq='D')})
    Dimdate['DateKey'] = Dimdate['InvoiceDate']
    Dimdate['Date'] = Dimdate['InvoiceDate'].dt.strftime('%Y-%m-%d')
    Dimdate['Year']= Dimdate['InvoiceDate'].dt.strftime('%Y')
    Dimdate['MonthNo']= Dimdate['InvoiceDate'].dt.strftime('%m')
    Dimdate['MonthName']= Dimdate['InvoiceDate'].dt.strftime('%B')
    Dimdate['Day']= Dimdate['InvoiceDate'].dt.strftime('%d')
    Dimdate['quarter']= Dimdate['InvoiceDate'].dt.quarter
    Dimdate.columns= Dimdate.columns.str.lower()
    Dimdate= Dimdate.reset_index(drop=True).set_index('datekey')
    Dimdate.head(n=1).to_sql(name= 'dq_dimdate', con= engine_conn, if_exists= 'replace',index=True, index_label='datekey')
    engine_conn.commit()
    
    try:
        with open(r'C:\Users\Mohammed\Desktop\retail-DWH\ETL-SCD-Queries\dq_dimdate.sql') as file:
            sql_query = file.read()
        results = engine_conn.execute(text(sql_query)).fetchall()
    except Exception as e:
        print('Got ERROR in Product Dimantion, While executing the SQL Query:',e)
    df= pd.DataFrame(results)
    
    if len(df) == 8:
        print("Data Quality Check: All columns and data types match for the DimDate.")
        print(df)
    else:
        print("Data Quality Check: Detected data quality issues in the DimDate.")
        
    engine_conn.execute(text('drop table dq_dimdate;'))
    engine_conn.commit()
    
        
    
    
    
DQ_DimDate(cleaned_data, connection)

Data Quality Check: All columns and data types match for the DimDate.
   column_name                    data_type  column_name  \
0      datekey  timestamp without time zone      datekey   
1  invoicedate  timestamp without time zone  invoicedate   
2         date                         text         date   
3         year                         text         year   
4      monthno                         text      monthno   
5    monthname                         text    monthname   
6          day                         text          day   
7      quarter                       bigint      quarter   

                     data_type  
0  timestamp without time zone  
1  timestamp without time zone  
2                         text  
3                         text  
4                         text  
5                         text  
6                         text  
7                       bigint  


In [None]:
def FactTable(cleaned_data, DimProduct, db_conn):
    cleaned_data.columns= cleaned_data.columns.str.lower()
    try:
        fact =cleaned_data.merge(DimProduct, on=['stockcode','description']).copy()
    except Exception as e:
        print("print error in ",e)
    fact['saleskey']= range(0, len(fact),1)
    fact.columns= fact.columns.str.lower()
    fact= fact[['saleskey','invoiceno','invoicedate','customerid', 'productid','unitprice','quantity']]\
        .rename(columns={
            'invoicedate':'datekey',
            'customerid':'customerkey',
            'productid': 'productkey'})\
        .set_index('saleskey')
    fact.head(n=1).to_sql(name='dq_fact_sales', con= db_conn, if_exists='replace', index=True, index_label='saleskey')
    
    try:
        with open(r'C:\Users\Mohammed\Desktop\retail-DWH\ETL-SCD-Queries\fact_sales.sql') as file:
            query= file.read()
        result= engine_conn.execute(text(query))
        factDF= pd.DataFrame(result)
        max_saleskey_df = pd.read_sql(text('select max(saleskey) as max_date from fact_sales'), con= engine_conn)
    except Exception as e:
        print("Got ERROR in Fact Sales table: while Executing the SQL Script")
