In [13]:
# This code belongs to Maulana Zulfikar Aziz

import warnings
from contextlib import closing

import pandas as pd
import pyodbc

# Suppress every warning category for the rest of the session.
warnings.filterwarnings("ignore")

# SQL Server instance and ODBC driver shared by both connections.
SERVER = "ZULFIKARAZIZ"
DRIVER = "ODBC Driver 17 for SQL Server"

# The two connection strings differ only in the database they target, so
# both are produced from one template.
_CONN_TEMPLATE = "SERVER={server};DRIVER={driver};DATABASE={database};Trusted_Connection=yes"
source_conn = _CONN_TEMPLATE.format(server=SERVER, driver=DRIVER, database="sample")
dw_conn = _CONN_TEMPLATE.format(server=SERVER, driver=DRIVER, database="DWH")
def extract_data(csv_path="transaction_csv.csv", excel_path="transaction_excel.xlsx"):
    """Extract the raw account, customer, branch and transaction data.

    Accounts, customers, branches and part of the transactions are queried
    from the database addressed by ``source_conn``. Further transactions are
    read from a CSV file and an Excel workbook and appended to the SQL ones.

    Args:
        csv_path: Path of the CSV transaction file.
        excel_path: Path of the Excel transaction workbook.

    Returns:
        Tuple ``(raw_account, raw_customer, raw_branch, raw_transaction)`` of
        DataFrames. The first three carry the aliased column names from the
        queries; the transaction columns are renamed to the same style
        (``TransactionID``, ``AccountID``, ...).
    """
    # Extract from SQL SERVER
    print("Extracting data from various sources ...")
    queries = {
        "DimAccount":"""
        SELECT account_id AS AccountID, customer_id AS CustomerID, account_type AS AccountType, balance AS Balance, 
        date_opened AS DateOpened, status AS Status FROM account
        """,
        "DimCustomer": """
        SELECT cu.customer_id AS CustomerID, cu.customer_name AS CustomerName, cu.address AS Address, ci.city_name AS CityName, 
        s.state_name AS StateName, cu.age AS Age, cu.gender AS Gender, cu.email AS Email
        FROM customer as cu
        LEFT JOIN city AS ci ON cu.city_id=ci.city_id
        LEFT JOIN state AS s ON ci.state_id=s.state_id
        """,
        "DimBranch" : "SELECT branch_id AS BranchID, branch_name AS BranchName, branch_location AS BranchLocation FROM branch",
        "Transaction" : "SELECT transaction_id, account_id, transaction_date, amount, transaction_type, branch_id FROM transaction_db",
    }

    # pyodbc's own context manager only commits on exit and leaves the
    # connection open; closing() is what actually releases it.
    with closing(pyodbc.connect(source_conn)) as conn:
        frames = {name: pd.read_sql(query, conn) for name, query in queries.items()}

    transaction_csv = pd.read_csv(csv_path)
    transaction_excel = pd.read_excel(excel_path)
    raw_transaction = pd.concat(
        [frames["Transaction"], transaction_csv, transaction_excel],
        ignore_index=True,
    )

    # The rename runs on the combined frame, so this assumes the CSV and Excel
    # files use the same snake_case headers as the SQL query -- TODO confirm.
    change_columns = {
        "transaction_id": "TransactionID",
        "account_id": "AccountID",
        "transaction_date": "TransactionDate",
        "amount": "Amount",
        "transaction_type": "TransactionType",
        "branch_id": "BranchID",
    }
    raw_transaction = raw_transaction.rename(columns=change_columns)

    print("Extraction job is finished.")
    return frames["DimAccount"], frames["DimCustomer"], frames["DimBranch"], raw_transaction

def staging(raw_account, raw_customer, raw_branch, raw_transaction):
    """Reload the staging tables with the freshly extracted frames.

    For each frame the matching staging table (``StgAccount``,
    ``StgCustomer``, ``StgBranch``, ``StgFactTransaction``) is truncated and
    refilled, using the frame's column names as the target column list.
    Each table is committed on its own.

    Args:
        raw_account: Extracted account rows.
        raw_customer: Extracted customer rows.
        raw_branch: Extracted branch rows.
        raw_transaction: Extracted transaction rows.
    """
    print("Staging is in process ...")
    jobs = (
        (raw_account, "StgAccount"),
        (raw_customer, "StgCustomer"),
        (raw_branch, "StgBranch"),
        (raw_transaction, "StgFactTransaction"),
    )
    # closing() releases the connection; pyodbc's own context manager would
    # only commit and leave it open.
    with closing(pyodbc.connect(dw_conn)) as conn:
        cursor = conn.cursor()
        for frame, table in jobs:
            cursor.execute(f"TRUNCATE TABLE {table}")
            # pyodbc rejects an empty parameter sequence in executemany, so an
            # empty extract simply leaves the staging table empty.
            if not frame.empty:
                columns = ",".join(frame.columns.tolist())
                placeholders = ",".join(["?"] * len(frame.columns))
                insert_sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
                # Missing values (NaN/NaT) become None so the driver sends NULL.
                rows = frame.astype(object).where(frame.notna(), None).values.tolist()
                cursor.executemany(insert_sql, rows)
            conn.commit()
    print("Staging process is finished.")
    
def transform_data():
    """Read the staging tables and return cleaned frames ready for loading.

    Cleaning rules:
      * every table: drop rows whose ID repeats, keeping the first occurrence;
      * account ``DateOpened`` and transaction ``TransactionDate``: parse
        mixed date formats (month first) and render as ``YYYY-MM-DD`` strings;
      * customer ``CustomerName``, ``Address``, ``CityName``, ``StateName``
        and ``Gender``: upper-case.

    Returns:
        Tuple ``(clean_account, clean_customer, clean_branch,
        clean_transaction)`` of DataFrames.
    """
    print("Reading data from stage ...")
    # closing() releases the connection as soon as the reads are done;
    # pyodbc's own context manager would leave it open.
    with closing(pyodbc.connect(dw_conn)) as conn:
        raw_account = pd.read_sql(
            "SELECT AccountID, CustomerID, AccountType, Balance, DateOpened, Status FROM StgAccount", conn)
        raw_customer = pd.read_sql(
            "SELECT CustomerID, CustomerName, Address, CityName, StateName, Age, Gender, Email FROM StgCustomer", conn)
        raw_branch = pd.read_sql("SELECT BranchID, BranchName, BranchLocation FROM StgBranch", conn)
        raw_transaction = pd.read_sql(
            "SELECT TransactionID, AccountID, TransactionDate, Amount, TransactionType, BranchID FROM StgFactTransaction",
            conn)

    # Everything below is in-memory work, so no connection is held for it.
    print("Transforming the data...")

    # TRANSFORM : ACCOUNT
    # .copy() makes the de-duplicated frame independent of raw_account, so the
    # column assignment below does not depend on the module-wide warning
    # filter to hide pandas' chained-assignment warning.
    clean_account = raw_account.drop_duplicates(subset=["AccountID"], keep="first").copy()
    clean_account["DateOpened"] = pd.to_datetime(
        clean_account["DateOpened"], dayfirst=False, format="mixed"
    ).dt.strftime("%Y-%m-%d")

    # TRANSFORM : CUSTOMER
    clean_customer = raw_customer.drop_duplicates(subset=["CustomerID"], keep="first").copy()
    cols_upper = ["CustomerName", "Address", "CityName", "StateName", "Gender"]
    clean_customer[cols_upper] = clean_customer[cols_upper].apply(lambda col: col.str.upper())

    # TRANSFORM : BRANCH
    clean_branch = raw_branch.drop_duplicates(subset=["BranchID"], keep="first")

    # TRANSFORM : TRANSACTION
    clean_transaction = raw_transaction.drop_duplicates(subset=["TransactionID"], keep="first").copy()
    clean_transaction["TransactionDate"] = pd.to_datetime(
        clean_transaction["TransactionDate"], dayfirst=False, format="mixed"
    ).dt.strftime("%Y-%m-%d")

    print("Transformation process finished.")
    return clean_account, clean_customer, clean_branch, clean_transaction

def load_dimension(clean_account, clean_customer, clean_branch):
    """Insert dimension rows that are not yet present in the warehouse.

    For each dimension the first column of the frame is treated as its ID.
    Rows whose ID already exists in the target table are skipped, so the
    load is insert-only; existing rows are never updated.

    Args:
        clean_account: Cleaned account rows for ``DimAccount``.
        clean_customer: Cleaned customer rows for ``DimCustomer``.
        clean_branch: Cleaned branch rows for ``DimBranch``.
    """
    print("Starting dimension load process ...")
    # Customers go first, then accounts, then branches (original order kept;
    # presumably DimAccount references DimCustomer -- confirm against schema).
    jobs = (
        (clean_customer, "DimCustomer"),
        (clean_account, "DimAccount"),
        (clean_branch, "DimBranch"),
    )
    # closing() releases the connection; pyodbc's own context manager would
    # only commit and leave it open.
    with closing(pyodbc.connect(dw_conn)) as conn:
        cursor = conn.cursor()
        for frame, table in jobs:
            id_col = frame.columns[0]
            existing = pd.read_sql(f"SELECT {id_col} FROM {table}", conn)
            new_rows = frame[~frame[id_col].isin(existing[id_col])]
            if new_rows.empty:
                continue
            columns = ",".join(frame.columns.tolist())
            placeholders = ",".join(["?"] * len(frame.columns))
            insert_sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
            # Missing values (NaN/NaT) become None so the driver sends NULL.
            rows = new_rows.astype(object).where(new_rows.notna(), None).values.tolist()
            cursor.executemany(insert_sql, rows)
            conn.commit()
    print("Dimension load process is finished.")

def load_fact(clean_transaction):
    """Insert transactions that are not yet present in ``FactTransaction``.

    Rows whose ``TransactionID`` already exists in the fact table are
    skipped; existing rows are never updated.

    Args:
        clean_transaction: Cleaned transaction rows. Must contain the columns
            ``TransactionID``, ``AccountID``, ``TransactionDate``, ``Amount``,
            ``TransactionType`` and ``BranchID`` (in any order).
    """
    print("Starting fact load process ...")
    columns = ["TransactionID", "AccountID", "TransactionDate", "Amount", "TransactionType", "BranchID"]
    # closing() releases the connection; pyodbc's own context manager would
    # only commit and leave it open.
    with closing(pyodbc.connect(dw_conn)) as conn:
        cursor = conn.cursor()
        existing = pd.read_sql("SELECT TransactionID FROM FactTransaction", conn)
        is_new = ~clean_transaction["TransactionID"].isin(existing["TransactionID"])
        # Select the columns by name so each value lines up with the INSERT
        # column list regardless of the frame's own column order.
        new_rows = clean_transaction.loc[is_new, columns]
        if not new_rows.empty:
            placeholders = ", ".join(["?"] * len(columns))
            insert_sql = f"INSERT INTO FactTransaction ({', '.join(columns)}) VALUES ({placeholders})"
            # Missing values (NaN/NaT) become None so the driver sends NULL.
            rows = new_rows.astype(object).where(new_rows.notna(), None).values.tolist()
            cursor.executemany(insert_sql, rows)
            conn.commit()
    print("Load fact process is finished.")

def get_daily_transaction(start_date, end_date):
    """Print the result of the ``DailyTransaction`` stored procedure.

    Args:
        start_date: First procedure argument (the prompt in this script asks
            for ``YYYY-MM-DD``).
        end_date: Second procedure argument, same format.

    The result set is printed as a DataFrame, or a notice is printed when the
    procedure returns no rows. Nothing is returned.
    """
    print(f"\n--- Transaction from {start_date} to {end_date} ---")
    print("\n")
    # closing() releases the connection afterwards. The second context item
    # keeps pyodbc's commit-on-success behaviour that the plain
    # `with pyodbc.connect(...)` form provided.
    with closing(pyodbc.connect(dw_conn)) as conn, conn:
        cursor = conn.cursor()
        cursor.execute("{CALL DailyTransaction (?, ?)}", (start_date, end_date))
        rows = cursor.fetchall()
        if rows:
            # cursor.description holds one tuple per result column; item 0 is its name.
            columns = [column[0] for column in cursor.description]
            print(pd.DataFrame.from_records(rows, columns=columns))
        else:
            print("There is no data that can be shown.")

def get_balance_per_customer(name):
    """Print the result of the ``BalancePerCustomer`` stored procedure.

    Args:
        name: Customer name passed as the procedure's single argument. How
            the name is matched is decided by the stored procedure.

    The result set is printed as a DataFrame, or a notice is printed when the
    procedure returns no rows. Nothing is returned.
    """
    print(f"\n--- Balance Information for Customer Named {name} ---")
    print("\n")
    # closing() releases the connection afterwards. The second context item
    # keeps pyodbc's commit-on-success behaviour that the plain
    # `with pyodbc.connect(...)` form provided.
    with closing(pyodbc.connect(dw_conn)) as conn, conn:
        cursor = conn.cursor()
        # A real one-element tuple: `(name)` is just the string itself.
        params = (name,)
        cursor.execute("{CALL BalancePerCustomer (?)}", params)
        rows = cursor.fetchall()
        if rows:
            # cursor.description holds one tuple per result column; item 0 is its name.
            columns = [column[0] for column in cursor.description]
            print(pd.DataFrame.from_records(rows, columns=columns))
        else:
            print("There is no data that can be shown.")
            
# Script entry point: run the ETL pipeline once, then prompt for the two reports.
if __name__=="__main__" :
    try:
        print("Starting ETL Job...")
        # Pipeline order: extract -> stage -> transform -> load dimensions -> load fact.
        raw_account, raw_customer, raw_branch, raw_transaction = extract_data()
        staging(raw_account, raw_customer, raw_branch, raw_transaction)
        # clean_factsales is the cleaned transaction frame (fourth value returned by transform_data).
        clean_account, clean_customer, clean_branch, clean_factsales = transform_data()
        load_dimension(clean_account, clean_customer, clean_branch)
        load_fact(clean_factsales)
        print("ETL job is finished")
        print("")
        # Report 1: the typed dates are passed to the procedure as entered (no validation here).
        print("--- Menu Daily Transaction ---")
        startdate = input("Insert start date (format : YYYY-MM-DD) : ")
        enddate = input("Insert end date (format : YYYY-MM-DD) : ")
        get_daily_transaction(startdate, enddate)
        print("")
        # Report 2: balances for the customer name typed by the user.
        print("--- Menu Balance Per Customer ---")
        name = input("Insert customer name : ")
        get_balance_per_customer(name)
    except Exception as e :
        # Any failure in the pipeline or the reports ends up here and is shown
        # as a one-line message; the traceback is not printed and nothing is re-raised.
        print(f"Error : {e}")

Starting ETL Job...
Extracting data from various sources ...
Extraction job is finished.
Staging is in process ...
Staging process is finished.
Reading data from stage ...
Transforming the data...
Transformation process finished.
Starting dimension load process ...
Dimension load process is finished.
Starting fact load process ...
Load fact process is finished.
ETL job is finished

--- Menu Daily Transaction ---


Insert start date (format : YYYY-MM-DD) :  2024-01-17
Insert end date (format : YYYY-MM-DD) :  2024-01-20



--- Transaction from 2024-01-17 to 2024-01-20 ---


  TransactionDate  TotalTransaction  TotalAmount
0      2024-01-17                 2      1100000
1      2024-01-18                 4     11250000
2      2024-01-19                 3      5400000
3      2024-01-20                 4      4000000

--- Menu Balance Per Customer ---


Insert customer name :  Shelly



--- Balance Information for Customer Named Shelly ---


    CustomerName AccountType   Balance  CurrentBalance
0  SHELLY JUWITA    checking  25000000        14000000
1  SHELLY JUWITA      saving   1500000         1600000
