In [37]:
# Importing Necessary Dependencies
import pandas as pd
from datetime import datetime, timedelta

In [38]:
# Reading the dataset into dataframes
zulo_bank = pd.read_csv('dataset\zulo_bank.csv')

In [39]:
# Viewing the first 5 rows
display(zulo_bank.head())

Unnamed: 0,FullName,Email,Phone,TransactionType,Amount,TransactionDate,AccountType,Balance,OpeningDate,LoanAmount,LoanType,StartDate,EndDate,InterestRate
0,Carol Miller,yfisher@example.org,6088279027,withdrawal,102.15,2023-04-26,Savings,5652.16,2019-08-12,,,,,
1,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,32428.9,Mortgage,2021-06-24,2050-01-08 04:59:17.907588,2.12
2,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,31406.77,Personal,2021-02-27,2038-10-12 04:59:17.907821,4.63
3,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,27834.0,Personal,2019-12-05,2037-08-15 04:59:17.909497,2.15
4,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,27873.08,Auto,2022-01-19,2037-06-03 04:59:17.913974,7.03


In [40]:
# Check the data info
zulo_bank.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1554 entries, 0 to 1553
Data columns (total 14 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   FullName         1554 non-null   object 
 1   Email            1554 non-null   object 
 2   Phone            1554 non-null   object 
 3   TransactionType  1554 non-null   object 
 4   Amount           1554 non-null   float64
 5   TransactionDate  1554 non-null   object 
 6   AccountType      1554 non-null   object 
 7   Balance          1554 non-null   float64
 8   OpeningDate      1554 non-null   object 
 9   LoanAmount       1278 non-null   float64
 10  LoanType         1278 non-null   object 
 11  StartDate        1278 non-null   object 
 12  EndDate          1278 non-null   object 
 13  InterestRate     1278 non-null   float64
dtypes: float64(4), object(10)
memory usage: 170.1+ KB


In [41]:
# Fill up missing values with appropriate parameter
zulo_bank.fillna({
    'LoanAmount': 0.0,
    'LoanType': 'Unknown',
    'InterestRate': 0.0
}, inplace=True) 

In [42]:
zulo_bank.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1554 entries, 0 to 1553
Data columns (total 14 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   FullName         1554 non-null   object 
 1   Email            1554 non-null   object 
 2   Phone            1554 non-null   object 
 3   TransactionType  1554 non-null   object 
 4   Amount           1554 non-null   float64
 5   TransactionDate  1554 non-null   object 
 6   AccountType      1554 non-null   object 
 7   Balance          1554 non-null   float64
 8   OpeningDate      1554 non-null   object 
 9   LoanAmount       1554 non-null   float64
 10  LoanType         1554 non-null   object 
 11  StartDate        1278 non-null   object 
 12  EndDate          1278 non-null   object 
 13  InterestRate     1554 non-null   float64
dtypes: float64(4), object(10)
memory usage: 170.1+ KB


In [43]:
# Convert to 1NF
# Splitting the 'FullName' into two parts 'first_name' and 'last_name'
zulo_bank[['first_name', 'last_name']] = zulo_bank['FullName'].str.split(expand=True)

display(zulo_bank.head())

Unnamed: 0,FullName,Email,Phone,TransactionType,Amount,TransactionDate,AccountType,Balance,OpeningDate,LoanAmount,LoanType,StartDate,EndDate,InterestRate,first_name,last_name
0,Carol Miller,yfisher@example.org,6088279027,withdrawal,102.15,2023-04-26,Savings,5652.16,2019-08-12,0.0,Unknown,,,0.0,Carol,Miller
1,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,32428.9,Mortgage,2021-06-24,2050-01-08 04:59:17.907588,2.12,Geoffrey,Banks
2,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,31406.77,Personal,2021-02-27,2038-10-12 04:59:17.907821,4.63,Geoffrey,Banks
3,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,27834.0,Personal,2019-12-05,2037-08-15 04:59:17.909497,2.15,Geoffrey,Banks
4,Geoffrey Banks,gonzalesgeorge@example.net,001-546-857-6518x5359,withdrawal,358.8,2020-06-13,Credit,2881.24,2019-05-06,27873.08,Auto,2022-01-19,2037-06-03 04:59:17.913974,7.03,Geoffrey,Banks


In [44]:
# Convert 1NF to 2NF
# Customer table
customer = zulo_bank[['first_name', 'last_name', 'Email', 'Phone']].copy().drop_duplicates().reset_index(drop=True)
customer['customer_id'] = range(1, len(customer) + 1)
customer = customer[['customer_id', 'first_name', 'last_name', 'Email', 'Phone']]

In [45]:
# account table
account = zulo_bank[['AccountType', 'Balance', 'OpeningDate']].copy().drop_duplicates().reset_index(drop=True)
account['account_id'] = range(1, len(account) + 1)
account = account[['account_id', 'AccountType', 'Balance', 'OpeningDate']]

In [46]:
# transaction table
transaction = zulo_bank[['TransactionType', 'Amount', 'TransactionDate']].copy().drop_duplicates().reset_index(drop=True)
transaction['transaction_id'] = range(1, len(transaction) + 1)
transaction = transaction[['transaction_id', 'TransactionType', 'Amount', 'TransactionDate']]

In [47]:
# loan table
loan = zulo_bank[['LoanAmount', 'LoanType', 'StartDate', 'EndDate', 'InterestRate']].copy().drop_duplicates().reset_index(drop=True)
loan['loan_id'] = range(1, len(loan) + 1)
loan = loan[['loan_id', 'LoanAmount', 'LoanType', 'StartDate', 'EndDate', 'InterestRate']]

In [48]:
# Merge operations to create the zulo_bank
zulo_bank = zulo_bank.merge(customer, on=['first_name', 'last_name', 'Email', 'Phone'], how='left') \
                     .merge(account, on=['AccountType', 'Balance', 'OpeningDate'], how='left') \
                     .merge(transaction, on=['TransactionType', 'Amount', 'TransactionDate'], how='left') \
                     .merge(loan, on=['LoanAmount', 'LoanType', 'StartDate', 'EndDate', 'InterestRate'], how='left') \
                     [['customer_id', 'account_id', 'transaction_id', 'loan_id']]

In [49]:
zulo_bank

Unnamed: 0,customer_id,account_id,transaction_id,loan_id
0,1,1,1,1
1,2,2,2,2
2,2,2,2,3
3,2,2,2,4
4,2,2,2,5
...,...,...,...,...
1549,20,165,997,29
1550,20,165,997,30
1551,25,94,998,1
1552,5,68,999,11


In [50]:
# Convert 2NF to 3NF
# Creating a date dimension table
# Define start and end dates
start_date = datetime(2020, 1, 1)
current_date = datetime(2090, 12, 31)

# Calculate the number of days between start date and current date
num_days = (current_date - start_date).days

# Generate a list of dates from start date to current date
date_list = [start_date + timedelta(days=x) for x in range(num_days + 1)]

# Ensure date_id matches the length of date_list
date = {'date_id': [x for x in range(1, len(date_list) + 1)], 'date': date_list}

# Create DataFrame
date_dim = pd.DataFrame(date)
date_dim['Year'] = date_dim['date'].dt.year
date_dim['Month'] = date_dim['date'].dt.month
date_dim['Day'] = date_dim['date'].dt.day
date_dim['date'] = pd.to_datetime(date_dim['date']).dt.date

In [51]:
# Account table 2NF to 3NF
account['OpeningDate'] = pd.to_datetime(account['OpeningDate']).dt.date
account = account.merge(date_dim, left_on='OpeningDate', right_on='date', how='inner') \
                 .rename(columns={'date_id':'OpeningDate_ID'}) \
                 .reset_index(drop=True) \
                 [['account_id', 'AccountType', 'Balance', 'OpeningDate_ID']]

In [52]:
# Transaction table 2NF to 3NF
transaction['TransactionDate'] = pd.to_datetime(transaction['TransactionDate']).dt.date
transaction = transaction.merge(date_dim, left_on='TransactionDate', right_on='date', how='inner') \
                         .rename(columns={'date_id': 'TransactionDate_ID'}) \
                         .reset_index(drop=True) \
                         [['transaction_id', 'TransactionType', 'Amount', 'TransactionDate_ID']]

In [53]:
# Loan table 2NF to 3NF
loan['StartDate'] = pd.to_datetime(loan['StartDate']).dt.date
loan['EndDate'] = pd.to_datetime(loan['EndDate']).dt.date
loan = loan.merge(date_dim, left_on='StartDate', right_on='date', how='inner') \
            .rename(columns={'date_id': 'StartDate_ID'}) \
            .merge(date_dim, left_on='EndDate', right_on='date', how='inner', suffixes=('', '_end')) \
            .rename(columns={'date_id': 'EndDate_ID'}) \
            [['loan_id', 'LoanAmount', 'LoanType', 'StartDate_ID', 'EndDate_ID', 'InterestRate']]

In [56]:
# Save to our directory
transaction.to_csv(r'dataset\database_model\transactions.csv', index=False)
loan.to_csv(r'dataset\database_model\loans.csv', index=False)
account.to_csv(r'dataset\database_model\accounts.csv', index=False)
customer.to_csv(r'dataset\database_model\customers.csv', index=False)
date_dim.to_csv(r'dataset\database_model\date_dim.csv', index=False)
zulo_bank.to_csv(r'dataset\database_model\fact_table.csv', index=False)

In [14]:
zulo_bank.columns

Index(['FullName', 'Email', 'Phone', 'TransactionType', 'Amount',
       'TransactionDate', 'AccountType', 'Balance', 'OpeningDate',
       'LoanAmount', 'LoanType', 'StartDate', 'EndDate', 'InterestRate',
       'first_name', 'last_name'],
      dtype='object')

### Transaction DWH Schema

In [57]:
transaction_dim = transaction[['transaction_id', 'TransactionType']].copy().drop_duplicates().reset_index(drop=True)
account_dim = account[['account_id', 'AccountType', 'Balance']].copy().drop_duplicates().reset_index(drop=True)

transaction_fact_table = zulo_bank.merge(transaction, on='transaction_id', how='inner') \
                                  .merge(account, on='account_id', how='inner') \
                                  [['transaction_id', 'account_id', 'OpeningDate_ID', 'TransactionDate_ID', 'Amount']]

In [67]:
# Save to memory
transaction_dim.to_csv(r'dataset\transaction_dwh\transaction_dim.csv', index=False)
account_dim.to_csv(r'dataset\transaction_dwh\account_dim.csv', index=False)
transaction_fact_table.to_csv(r'dataset\transaction_dwh\transaction_fact_table.csv', index=False)

### Loan DWH Schema

In [63]:
customer_dim = customer.copy()
loan_dim = loan[['loan_id', 'LoanType']].copy().drop_duplicates().reset_index(drop=True)

loan_fact_table = zulo_bank.merge(customer, on='customer_id', how='inner') \
                           .merge(loan, on='loan_id', how='inner') \
                           [['loan_id', 'customer_id', 'StartDate_ID', 'EndDate_ID', 'LoanAmount', 'InterestRate']]

In [68]:
# Save to memory
loan_dim.to_csv(r'dataset\loan_dwh\loan_dim.csv', index=False)
customer_dim.to_csv(r'dataset\loan_dwh\customer_dim.csv', index=False)
loan_fact_table.to_csv(r'dataset\loan_dwh\loan_fact_table.csv', index=False)

## Loading into the RDBMS using psycopg2

In [69]:
!pip install psycopg2



In [72]:
import psycopg2

In [73]:
# Define database connecion parameters including the database name
db_params = {
    'username': 'postgres',
    'password': 'password',
    'host': 'localhost',
    'port': '5432',
    'database': 'zulo_bank'
}

default_db_url = f"postgresql://{db_params['username']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/postgres"

# Attempt to create a database
try:
    # Open the connection
    conn = psycopg2.connect(default_db_url)
    conn.autocommit = True
    cur = conn.cursor()
    # Check if the database already exists
    cur.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{db_params['database']}'")
    exists = cur.fetchone()
    if not exists:
        # Create the database
        cur.execute(f"CREATE DATABASE {db_params['database']}")
        print(f"Database {db_params['database']} created succesfully")
    else:
        print(f"Database {db_params['database']} already exists")
    # Close the connection
    cur.close()
    conn.close()
except Exception as e:
    print(f"An error occurred: {e}")

Database zulo_bank created succesfully


In [92]:
# Connect to the newly created(or existing) database to  create schema and tables
def get_db_connection():
    connection = psycopg2.connect(
        host='localhost',
        database='zulo_bank',
        user='postgres',
        password='password'
    )
    return connection

conn = get_db_connection()

In [93]:
# Create schema and tables
def create_tables():
    conn = get_db_connection()
    cursor = conn.cursor()
    create_table_query = '''
                            CREATE SCHEMA IF NOT EXISTS zulobankdb;

                            DROP TABLE IF EXISTS zulobankdb.transactions CASCADE;
                            DROP TABLE IF EXISTS zulobankdb.accounts CASCADE;
                            DROP TABLE IF EXISTS zulobankdb.customers CASCADE;
                            DROP TABLE IF EXISTS zulobankdb.date_dim CASCADE;
                            DROP TABLE IF EXISTS zulobankdb.loans CASCADE;
                            DROP TABLE IF EXISTS zulobankdb.zulo_fact_table CASCADE;
                            
                            CREATE TABLE IF NOT EXISTS zulobankdb.date_dim (
                                date_id SERIAL PRIMARY KEY,
                                date VARCHAR(10000),
                                Year INTEGER,
                                Month INTEGER,
                                Day INTEGER
                            );

                            CREATE TABLE IF NOT EXISTS zulobankdb.transactions (
                                transaction_id SERIAL PRIMARY KEY,
                                TransactionType VARCHAR(10000),
                                Amount FLOAT,
                                TransactionDate_ID INTEGER,
                                FOREIGN KEY (TransactionDate_ID) REFERENCES zulobankdb.date_dim(date_id)
                            );

                            CREATE TABLE IF NOT EXISTS zulobankdb.accounts (
                                account_id SERIAL PRIMARY KEY,
                                AccountType VARCHAR(10000),
                                Balance FLOAT,
                                OpeningDate_ID INTEGER,
                                FOREIGN KEY (OpeningDate_ID) REFERENCES zulobankdb.date_dim(date_id)
                            );

                            CREATE TABLE IF NOT EXISTS zulobankdb.loans (
                                loan_id SERIAL PRIMARY KEY,
                                LoanAmount FLOAT,
                                LoanType VARCHAR(10000),
                                StartDate_ID INTEGER,
                                EndDate_ID  INTEGER,
                                InterestRate FLOAT,
                                FOREIGN KEY (StartDate_ID) REFERENCES zulobankdb.date_dim(date_id),
                                FOREIGN KEY (EndDate_ID) REFERENCES zulobankdb.date_dim(date_id)
                            );

                            CREATE TABLE IF NOT EXISTS zulobankdb.customers (
                                customer_id SERIAL PRIMARY KEY,
                                first_name VARCHAR(10000),
                                last_name VARCHAR(10000),
                                Email VARCHAR(10000),
                                Phone VARCHAR(10000)
                            );

                            CREATE TABLE IF NOT EXISTS zulobankdb.zulo_fact_table (
                                customer_id INTEGER,
                                account_id  INTEGER,
                                transaction_id INTEGER,
                                loan_id  INTEGER,
                                FOREIGN KEY (customer_id) REFERENCES zulobankdb.customers(customer_id),
                                FOREIGN KEY (account_id) REFERENCES zulobankdb.accounts(account_id),
                                FOREIGN KEY (transaction_id) REFERENCES zulobankdb.transactions(transaction_id),
                                FOREIGN KEY (loan_id) REFERENCES zulobankdb.loans(loan_id)
                            );'''
    cursor.execute(create_table_query)
    conn.commit()
    cursor.close()
    conn.close()

In [94]:
create_tables()

In [96]:
# Loading the data
# date_dim table
import csv
def load_data_from_csv(csv_path):
    conn =  get_db_connection()
    cursor = conn.cursor()
    with open(csv_path, 'r') as file:
        reader = csv.reader(file)
        next(reader)
        for row in reader:
            cursor.execute(
                '''INSERT INTO zulobankdb.date_dim (date_id, date, Year, Month, Day)
                    VALUES (%s, %s, %s, %s, %s);''',
                    row
                )
    conn.commit()
    cursor.close()
    conn.close()

# provide  the csv path to the file
csv_file_path = r'dataset\database_model\date_dim.csv'

load_data_from_csv(csv_file_path)
print('dim_date data loaded successfully ')

dim_date data loaded successfully 


In [98]:
def load_data_from_csv(csv_path):
    conn =  get_db_connection()
    cursor = conn.cursor()
    with open(csv_path, 'r') as file:
        reader = csv.reader(file)
        next(reader)
        for row in reader:
            cursor.execute(
                '''INSERT INTO zulobankdb.transactions (transaction_id, TransactionType, Amount, TransactionDate_ID)
                    VALUES (%s, %s, %s, %s);''',
                    row
                )
    conn.commit()
    cursor.close()
    conn.close()

# provide  the csv path to the file
csv_file_path = r'dataset\database_model\transactions.csv'

load_data_from_csv(csv_file_path)
print('transactions data loaded successfully ')

transactions data loaded successfully 


In [100]:
def load_data_from_csv(csv_path):
    conn =  get_db_connection()
    cursor = conn.cursor()
    with open(csv_path, 'r') as file:
        reader = csv.reader(file)
        next(reader)
        for row in reader:
            cursor.execute(
                '''INSERT INTO zulobankdb.accounts (account_id, AccountType, Balance, OpeningDate_ID)
                    VALUES (%s, %s, %s, %s);''',
                    row
                )
    conn.commit()
    cursor.close()
    conn.close()

# provide  the csv path to the file
csv_file_path = r'dataset\database_model\accounts.csv'

load_data_from_csv(csv_file_path)
print('accounts data loaded successfully ')

accounts data loaded successfully 


In [102]:
def load_data_from_csv(csv_path):
    conn =  get_db_connection()
    cursor = conn.cursor()
    with open(csv_path, 'r') as file:
        reader = csv.reader(file)
        next(reader)
        for row in reader:
            cursor.execute(
                '''INSERT INTO zulobankdb.loans (loan_id, LoanAmount, LoanType, StartDate_ID, EndDate_ID, InterestRate)
                    VALUES (%s, %s, %s, %s, %s, %s);''',
                    row
                )
    conn.commit()
    cursor.close()
    conn.close()

# provide  the csv path to the file
csv_file_path = r'dataset\database_model\loans.csv'

load_data_from_csv(csv_file_path)
print('loans data loaded successfully ')

loans data loaded successfully 


In [104]:
def load_data_from_csv(csv_path):
    conn =  get_db_connection()
    cursor = conn.cursor()
    with open(csv_path, 'r') as file:
        reader = csv.reader(file)
        next(reader)
        for row in reader:
            cursor.execute(
                '''INSERT INTO zulobankdb.customers (customer_id, first_name, last_name, Email, Phone)
                    VALUES (%s, %s, %s, %s, %s);''',
                    row
                )
    conn.commit()
    cursor.close()
    conn.close()

# provide  the csv path to the file
csv_file_path = r'dataset\database_model\customers.csv'

load_data_from_csv(csv_file_path)
print('customers data loaded successfully ')

customers data loaded successfully 


In [107]:
def load_data_from_csv(csv_path):
    conn =  get_db_connection()
    cursor = conn.cursor()
    with open(csv_path, 'r') as file:
        reader = csv.reader(file)
        next(reader)
        for row in reader:
            try:
                cursor.execute(
                '''INSERT INTO zulobankdb.zulo_fact_table (customer_id, account_id, transaction_id, loan_id)
                    VALUES (%s, %s, %s, %s);''',
                    row
                )
            except psycopg2.IntegrityError: # Catch foregin key violation
                conn.rollback() # Rollback the current transaction so you can continue
            else:
                conn.commit() # Commit if the rows are inserted successfully
    cursor.close()
    conn.close()

# provide  the csv path to the file
csv_file_path = r'dataset\database_model\fact_table.csv'

load_data_from_csv(csv_file_path)
print('zulo_fact_table data loaded successfully ')

zulo_fact_table data loaded successfully 
