#### Import Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import pprint
import requests
import requests.exceptions
import pandas as pd

import pymongo
from sqlalchemy import create_engine

# Part 1: SQL Data

#### Connection Variables

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "AccessSQL"

src_dbname = "bank_db"
dst_dbname = "Bank_DW"

#### Define SQL Functions

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create Switch Connection Text and new Data Warehouse

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fcf779d4880>

#### Extract Source Data from Account, Account Transaction, Product, and Product Type

In [5]:
sql_account = "SELECT * FROM account;"
df_account = get_dataframe(user_id, pwd, host_name, src_dbname, sql_account)
df_account.head(2)

Unnamed: 0,ACCOUNT_ID,AVAIL_BALANCE,CLOSE_DATE,LAST_ACTIVITY_DATE,OPEN_DATE,PENDING_BALANCE,STATUS,CUST_ID,OPEN_BRANCH_ID,OPEN_EMP_ID,PRODUCT_CD
0,1,1057.75,,2005-01-04,2000-01-15,1057.75,ACTIVE,1,2,10,CHK
1,2,500.0,,2004-12-19,2000-01-15,500.0,ACTIVE,1,2,10,SAV


In [6]:
sql_acc_txn = "SELECT * FROM acc_transaction;"
df_acc_txn = get_dataframe(user_id, pwd, host_name, src_dbname, sql_acc_txn)
df_acc_txn.head(2)

Unnamed: 0,TXN_ID,AMOUNT,FUNDS_AVAIL_DATE,TXN_DATE,TXN_TYPE_CD,ACCOUNT_ID,EXECUTION_BRANCH_ID,TELLER_EMP_ID
0,1,100.0,2000-01-15,2000-01-15,CDT,1,,
1,2,100.0,2000-01-15,2000-01-15,CDT,2,,


In [7]:
sql_product = "SELECT * FROM product;"
df_product = get_dataframe(user_id, pwd, host_name, src_dbname, sql_product)
df_product.head(2)

Unnamed: 0,PRODUCT_CD,DATE_OFFERED,DATE_RETIRED,NAME,PRODUCT_TYPE_CD
0,AUT,2000-01-01,,auto loan,LOAN
1,BUS,2000-01-01,,business line of credit,LOAN


In [8]:
sql_prod_type = "SELECT * FROM product_type;"
df_prod_type = get_dataframe(user_id, pwd, host_name, src_dbname, sql_prod_type)
df_prod_type.head(2)

Unnamed: 0,PRODUCT_TYPE_CD,NAME
0,ACCOUNT,Customer Accounts
1,INSURANCE,Insurance Offerings


#### Merge product type into product, then perform necessary transformations

In [9]:
df_product = df_product.merge(df_prod_type, on='PRODUCT_TYPE_CD')
df_product.head(2)

Unnamed: 0,PRODUCT_CD,DATE_OFFERED,DATE_RETIRED,NAME_x,PRODUCT_TYPE_CD,NAME_y
0,AUT,2000-01-01,,auto loan,LOAN,Individual and Business Loans
1,BUS,2000-01-01,,business line of credit,LOAN,Individual and Business Loans


In [10]:
drop_cols = ['DATE_OFFERED', 'DATE_RETIRED', 'PRODUCT_TYPE_CD']
df_product.drop(drop_cols, axis=1, inplace=True)
df_product.rename(columns={"PRODUCT_CD":"PRODUCT_KEY", "NAME_x":"NAME", "NAME_y":"TYPE"}, inplace=True)
df_product.head(2)

Unnamed: 0,PRODUCT_KEY,NAME,TYPE
0,AUT,auto loan,Individual and Business Loans
1,BUS,business line of credit,Individual and Business Loans


#### Merge product into account, then perform necessary transformations on account and account transaction

In [11]:
df_account = df_account.merge(df_product, how='outer', left_on='PRODUCT_CD', right_on='PRODUCT_KEY')
df_account.head(2)

Unnamed: 0,ACCOUNT_ID,AVAIL_BALANCE,CLOSE_DATE,LAST_ACTIVITY_DATE,OPEN_DATE,PENDING_BALANCE,STATUS,CUST_ID,OPEN_BRANCH_ID,OPEN_EMP_ID,PRODUCT_CD,PRODUCT_KEY,NAME,TYPE
0,1.0,1057.75,,2005-01-04,2000-01-15,1057.75,ACTIVE,1.0,2.0,10.0,CHK,CHK,checking account,Customer Accounts
1,4.0,2258.02,,2004-12-27,2001-03-12,2258.02,ACTIVE,2.0,2.0,10.0,CHK,CHK,checking account,Customer Accounts


In [12]:
drop_cols = ['STATUS', 'PRODUCT_CD', 'PRODUCT_KEY', 'OPEN_BRANCH_ID', 'OPEN_EMP_ID']
df_account.drop(drop_cols, axis=1, inplace=True)
df_account.rename(columns={"NAME":"PROD_NAME", "TYPE":"PROD_TYPE", "ACCOUNT_ID":"ACC_KEY", 'CUST_KEY':'CUST_ID'}, inplace=True)
df_account = df_account.drop([24, 25])
df_account.head(2)

Unnamed: 0,ACC_KEY,AVAIL_BALANCE,CLOSE_DATE,LAST_ACTIVITY_DATE,OPEN_DATE,PENDING_BALANCE,CUST_ID,PROD_NAME,PROD_TYPE
0,1.0,1057.75,,2005-01-04,2000-01-15,1057.75,1.0,checking account,Customer Accounts
1,4.0,2258.02,,2004-12-27,2001-03-12,2258.02,2.0,checking account,Customer Accounts


In [13]:
drop_cols = ['FUNDS_AVAIL_DATE']
df_acc_txn.drop(drop_cols, axis=1, inplace=True)
df_acc_txn.rename(columns={"TXN_ID":"TXN_KEY", "ACCOUNT_ID":"ACC_KEY", "EXECUTION_BRANCH_ID":"BRANCH_KEY", "TELLER_EMP_ID":"EMP_KEY", "TXN_TYPE_CD":"TXN_TYPE"}, inplace=True)
df_acc_txn.head(2)

Unnamed: 0,TXN_KEY,AMOUNT,TXN_DATE,TXN_TYPE,ACC_KEY,BRANCH_KEY,EMP_KEY
0,1,100.0,2000-01-15,CDT,1,,
1,2,100.0,2000-01-15,CDT,2,,


# Part 2: MongoDB Data

#### Update Connection Variables from SQL to MongoDB

In [14]:
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = "root"
pwd = "AccessSQL"

atlas_cluster_name = "cluster0"
atlas_default_dbname = "admin"
atlas_user_name = "adamsnyder"
atlas_password = "AccessMongo"
conn_string_chars = "sorbu3t"

conn_str = {"local" : f"mongodb://{host_name}:{ports['mongo']}/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.{conn_string_chars}.mongodb.net/{atlas_default_dbname}?retryWrites=true&w=majority"
}

#### Define MongoDB Functions

In [15]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Populate MongoDB With Employee and Department data

In [None]:
# **DO NOT RUN THIS CELL AGAIN**

In [26]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"DEPARTMENT" : "bank_department.json", "EMPLOYEE":"bank_employee.json"}

for file in json_files:
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        

#### Extract data into a dataframe

In [16]:
query = {}
collection = "EMPLOYEE"

df_employee = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_employee.head(2)

Unnamed: 0,EMP_ID,END_DATE,FIRST_NAME,LAST_NAME,START_DATE,TITLE,ASSIGNED_BRANCH_ID,DEPT_ID,SUPERIOR_EMP_ID
0,1,,Michael,Smith,2001-06-22,President,1,3,
1,2,,Susan,Barker,2002-09-12,Vice President,1,3,1.0


In [17]:
query = {}
collection = "DEPARTMENT"

df_department = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_department.head(2)

Unnamed: 0,DEPT_ID,NAME
0,1,Operations
1,2,Loans


#### Merge the Department dataframe into Employee, then perform transformations on new Employee dataframe

In [18]:
df_employee = df_employee.merge(df_department, how='outer', left_on='DEPT_ID', right_on="DEPT_ID")
df_employee.head(2)

Unnamed: 0,EMP_ID,END_DATE,FIRST_NAME,LAST_NAME,START_DATE,TITLE,ASSIGNED_BRANCH_ID,DEPT_ID,SUPERIOR_EMP_ID,NAME
0,1.0,,Michael,Smith,2001-06-22,President,1.0,3,,Administration
1,2.0,,Susan,Barker,2002-09-12,Vice President,1.0,3,1.0,Administration


In [19]:
drop_cols = ['DEPT_ID']
df_employee.drop(drop_cols, axis=1, inplace=True)
df_employee.rename(columns={"EMP_ID":"EMP_KEY", "ASSIGNED_BRANCH_ID":"BRANCH_Key", "NAME":"DEPT", "SUPERIOR_EMP_ID":"SUPERIOR_KEY"}, inplace=True)
df_employee = df_employee.drop(18)
df_employee.head(2)

Unnamed: 0,EMP_KEY,END_DATE,FIRST_NAME,LAST_NAME,START_DATE,TITLE,BRANCH_Key,SUPERIOR_KEY,DEPT
0,1.0,,Michael,Smith,2001-06-22,President,1.0,,Administration
1,2.0,,Susan,Barker,2002-09-12,Vice President,1.0,1.0,Administration


# Part 3: Extracting CSV data from API

#### Defining response function with error handling

In [20]:
def get_api_response(url, response_type):
    try:
        response = requests.get(url)
        response.raise_for_status()
    
    except requests.exceptions.HTTPError as errh:
        return "An Http Error occurred: " + repr(errh)
    except requests.exceptions.ConnectionError as errc:
        return "An Error Connecting to the API occurred: " + repr(errc)
    except requests.exceptions.Timeout as errt:
        return "A Timeout Error occurred: " + repr(errt)
    except requests.exceptions.RequestException as err:
        return "An Unknown Error occurred: " + repr(err)

    if response_type == 'json':
        result = json.dumps(response.json(), sort_keys=True, indent=4)
    elif response_type == 'dataframe':
        result = pd.json_normalize(response.json())
    else:
        result = "An unhandled error has occurred!"
        
    return result

#### Defining API variables

In [21]:
url = "https://retoolapi.dev/Y0cknB/data"
# this API was generated using the bank_branch.csv file 
# on the website https://retool.com/utilities/generate-api-from-csv
response_type = ['csv', 'dataframe']

#### Generating dataframe from API

In [22]:
df_branch = get_api_response(url, response_type[1])
print(df_branch.shape)
print(df_branch.columns)

df_branch.head(2)

(5, 7)
Index(['id', 'CITY', 'NAME', 'STATE', 'ADDRESS', 'ZIP_CODE', 'BRANCH_ID'], dtype='object')


Unnamed: 0,id,CITY,NAME,STATE,ADDRESS,ZIP_CODE,BRANCH_ID
0,1,Waltham,Headquarters,MA,3882 Main St.,2451,1
1,2,Woburn,Woburn Branch,MA,422 Maple St.,1801,2


#### Perform transformations on new branch dataframe

In [23]:
df_branch = df_branch[["BRANCH_ID", "NAME", "ADDRESS", "STATE", "ZIP_CODE"]]
df_branch.rename(columns={"BRANCH_ID":"BRANCH_KEY"}, inplace=True)
df_branch = df_branch.drop(4)
df_branch = df_branch.astype({'BRANCH_KEY': 'int32'})
df_branch.head(2)

Unnamed: 0,BRANCH_KEY,NAME,ADDRESS,STATE,ZIP_CODE
0,1,Headquarters,3882 Main St.,MA,2451
1,2,Woburn Branch,422 Maple St.,MA,1801


# Part 4: Set new fact and dimension tables into SQL warehouse

#### Restate MySQL declaration variables

In [24]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "AccessSQL"

src_dbname = "bank_db"
dst_dbname = "Bank_DW"

#### Create and populate fact and dimension tables

In [25]:
db_operation = "insert"

tables = [('fact_transactions', df_acc_txn, 'TXN_KEY'),
          ('dim_account', df_account, 'ACC_KEY'),
          ('dim_employee', df_employee, 'EMP_KEY'),
          ('dim_branch', df_branch, 'BRANCH_KEY')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

# Part 5: Create, populate, and integrate dim date

In [26]:
# I ran my files Project1_Create_DimDate.sql and Project1_Integrate_DimDate.sql
# in MySQL Workbench to create and integrate the date dimension table
# This code can be found in my GitHub

# Part 6: Validate functionality of new data warehouse

#### First test query groups the fact_transaction table and dim_date table. It selects the transaction key and account number from the fact table and the date and day of the week from the dimension table.

In [28]:
query = f"""select ft.TXN_KEY as t_key,
	ft.ACC_KEY as acc_num,
	ft.TXN_DATE as t_date,
	dd.day_name_of_week as week_day
from {dst_dbname}.fact_transactions as ft
right join {dst_dbname}.dim_date as dd
on ft.TXN_DATE_KEY = dd.date_key
order by t_key desc
limit 21;"""

df_test1 = get_dataframe(user_id, pwd, host_name, src_dbname, query)

In [29]:
df_test1.head(5)

Unnamed: 0,t_key,acc_num,t_date,week_day
0,21,28,2003-07-30,Wednesday
1,20,24,2002-09-30,Monday
2,19,23,2004-06-30,Wednesday
3,18,22,2004-10-28,Thursday
4,17,21,2003-07-30,Wednesday


#### Second test query aggregates the accounts by what product the account provides. For each product, the query selects the average balance in all accounts of that product type.

In [30]:
query = f"""select PROD_NAME as product,
	avg(PENDING_BALANCE) as avg_balance
from {dst_dbname}.dim_account
group by product
order by product;"""

df_test2 = get_dataframe(user_id, pwd, host_name, src_dbname, query)

In [31]:
df_test2.head(5)

Unnamed: 0,product,avg_balance
0,business line of credit,4672.775
1,certificate of deposit,4875.0
2,checking account,7366.804
3,money market account,5848.38
4,savings account,463.94
