In [2]:
import os
from urllib.parse import quote

import pandas as pd
import pyodbc
from sqlalchemy import create_engine


# Sanity check: prints True when the PGPASS environment variable is set,
# without revealing its value.
print('PGPASS' in os.environ)

True


In [4]:
# Get credentials from environment variables.
# os.environ[...] raises KeyError if either variable is not set.
pwd = os.environ['PGPASS']
uid = os.environ['PGUID']

# SQL Server connection details
driver = "{ODBC Driver 17 for SQL Server}"
server = "ATHENA"
# Bare database name only: the connection string is assembled with its own
# ';' separators, so a trailing ';' here would produce an empty ';;' attribute.
database = "AdventureWorksDW2022"

In [6]:
# Print the name of every ODBC driver that pyodbc reports, one per line,
# to confirm the driver configured for SQL Server is available.
installed_drivers = pyodbc.drivers()
for driver_name in installed_drivers:
    print(driver_name)

SQL Server
SQL Server Native Client RDA 11.0
ODBC Driver 17 for SQL Server
Microsoft Access Driver (*.mdb, *.accdb)
Microsoft Excel Driver (*.xls, *.xlsx, *.xlsm, *.xlsb)
Microsoft Access Text Driver (*.txt, *.csv)


In [8]:
# ODBC connection string for the source SQL Server: `server` with the
# "\SQLEXPRESS" suffix appended, authenticated with uid/pwd.
mssql_conn_str = ";".join([
    f"DRIVER={driver}",
    f"SERVER={server}\\SQLEXPRESS",
    f"DATABASE={database}",
    f"UID={uid}",
    f"PWD={pwd}",
])

# SQLAlchemy URL for the target PostgreSQL database (same host and credentials,
# port 5432, database "adventureworks").
# The user name and password are percent-encoded so that characters such as
# '@', ':', '/' or '%' cannot be mistaken for URL delimiters; values without
# special characters are left unchanged.
postgres_conn_str = (
    f"postgresql://{quote(uid, safe='')}:{quote(pwd, safe='')}"
    f"@{server}:5432/adventureworks"
)

In [10]:
# Extract data from SQL Server
def extract():
    """Read the source tables from SQL Server into pandas DataFrames.

    Connects with ``mssql_conn_str`` and runs ``select * from <table>`` for
    each table in a fixed list, printing the row count per table.

    Returns
    -------
    dict or None
        Maps each table name (e.g. ``'dbo.DimCustomer'``) to its DataFrame,
        or ``None`` if connecting or any query failed. Errors are printed,
        not raised.
    """
    tables = ['dbo.DimCustomer', 'dbo.DimEmployee', 'dbo.DimProduct', 'dbo.FactInternetSales', 'dbo.FactResellerSales']
    src_conn = None
    try:
        src_conn = pyodbc.connect(mssql_conn_str)
        extracted = {}

        # Table names come from the fixed list above, never from user input,
        # so interpolating them into the query text is safe here.
        for table in tables:
            query = f'select * from {table}'
            df = pd.read_sql(query, src_conn)
            print(f"No. of rows extracted {len(df)} for table: {table}")
            extracted[table] = df

        print("Data extracted from sql server successfully.")
        return extracted

    except Exception as e:
        print(f"Error extracting data: {e}")
        return None

    finally:
        # Always release the connection, including when a query fails midway.
        if src_conn is not None:
            try:
                src_conn.close()
            except Exception as close_err:
                print(f"Error closing source connection: {close_err}")

        
        
# Run the extraction. `dataframes` is a dict keyed by table name, or None if
# extract() printed an error.
dataframes = extract()

  df = pd.read_sql(query, src_conn)


No. of rows extracted 18484 for table: dbo.DimCustomer
No. of rows extracted 296 for table: dbo.DimEmployee
No. of rows extracted 606 for table: dbo.DimProduct
No. of rows extracted 60398 for table: dbo.FactInternetSales
No. of rows extracted 60855 for table: dbo.FactResellerSales
Data extracted from sql server successfully.


In [12]:
# Data Transform
def _drop_and_dedupe(frame, drop_steps=()):
    """Return a de-duplicated copy of ``frame`` with columns removed.

    Parameters
    ----------
    frame : pandas.DataFrame
        Source frame; it is never modified, so this cell can be re-run safely.
    drop_steps : sequence of (list of str | slice)
        Applied in order. A list drops columns by name; a slice drops columns
        by position, evaluated against the columns that remain after the
        previous steps.

    Returns
    -------
    pandas.DataFrame
        New frame with the columns dropped and duplicate rows removed.
    """
    result = frame
    for step in drop_steps:
        columns = result.columns[step] if isinstance(step, slice) else step
        result = result.drop(columns=columns)
    return result.drop_duplicates()


# table 1: drop the named columns first, then positions 11-20 of what remains
df1 = _drop_and_dedupe(
    dataframes['dbo.DimCustomer'],
    [
        ['NameStyle', 'MaritalStatus', 'Suffix', 'CommuteDistance'],
        slice(11, 21),
    ],
)

# table 2: drop positions 20-30, then positions 15-18 of what remains,
# then the named columns
df2 = _drop_and_dedupe(
    dataframes['dbo.DimEmployee'],
    [
        slice(20, 31),
        slice(15, 19),
        ['ParentEmployeeNationalIDAlternateKey', 'NameStyle'],
    ],
)

# table 3: drop positions 15-35, then positions 9-12 of what remains,
# then the named columns
df3 = _drop_and_dedupe(
    dataframes['dbo.DimProduct'],
    [
        slice(15, 36),
        slice(9, 13),
        ['WeightUnitMeasureCode', 'SizeUnitMeasureCode', 'SpanishProductName', 'FrenchProductName'],
    ],
)

# table 4: no columns dropped, duplicate rows removed
df4 = _drop_and_dedupe(dataframes['dbo.FactInternetSales'])

# table 5: no columns dropped, duplicate rows removed
df5 = _drop_and_dedupe(dataframes['dbo.FactResellerSales'])

# Transformed frames, in the same order as the source table list
tf = [df1, df2, df3, df4, df5]

In [14]:
# Key each transformed frame by its source table name; `tf` holds the frames
# in the same order as this list.
tables = [
    'dbo.DimCustomer',
    'dbo.DimEmployee',
    'dbo.DimProduct',
    'dbo.FactInternetSales',
    'dbo.FactResellerSales',
]
transformed_data = dict(zip(tables, tf))


In [20]:

# Load Data into PostgreSQL
def load_data_to_postgres(dataframes):
    """Write each DataFrame to PostgreSQL as a ``stg_<table name>`` table.

    Parameters
    ----------
    dataframes : dict
        Maps a table name to the DataFrame to load. Each frame is written to
        the table ``f"stg_{table_name}"``, replacing it if it already exists.

    Returns
    -------
    None
        Progress and errors are printed; exceptions are not raised.
    """
    postgres_engine = None
    try:
        rows_imported = 0
        postgres_engine = create_engine(postgres_conn_str)
        for table_name, df in dataframes.items():
            df.to_sql(f"stg_{table_name}", postgres_engine, if_exists='replace', index=False)
            rows_imported += len(df)
            # Running total across all tables loaded so far, not a per-table count.
            print(f"No. of rows imported{rows_imported}")
        print("Data loaded into PostgreSQL successfully.")
    except Exception as e:
        print(f"Error loading data: {e}")
    finally:
        # Release the engine's pooled connections whether or not the load succeeded.
        if postgres_engine is not None:
            postgres_engine.dispose()

# Load the transformed tables (the table-name -> DataFrame dict built above)
# into PostgreSQL.
load_data_to_postgres(transformed_data)

No. of rows imported18484
No. of rows imported18780
No. of rows imported19386
No. of rows imported79784
No. of rows imported140639
Data loaded into PostgreSQL successfully.
