In [3]:
########## ETL pipeline
# ETL tools
import psycopg2
import pygrametl
from pygrametl.datasources import SQLSource, CSVSource, PandasSource
from pygrametl.tables import Dimension, FactTable, CachedDimension, BulkFactTable

# stats tools
import pandas as pd
import pandas_profiling

# Read the loans CSV, replacing the file's own header row with the column
# names below, then normalise the two key columns used for dimension lookups.
filename = '../../../Applications/Assessment/loans.csv'
names = [
    'End of Period', 'loannumber', 'region', 'countrycode', 'country',
    'Borrower', 'Guarantor Country Code', 'Guarantor', 'Loan Type',
    'Loan Status', 'Interest Rate', 'Currency of Commitment',
    'projectidsrc', 'projectname',
    'orig_principal_amt', 'cancelled_amt', 'undisbursed_amt',
    'disbursed_amt', 'repaid_to_ibrd',
    'Due to IBRD', 'Exchange Adjustment', 'Borrower\'s Obligation',
    'Sold 3rd Party', 'Repaid 3rd Party', 'Due 3rd Party', 'Loans Held',
    'First Repayment Date', 'Last Repayment Date', 'Agreement Signing Date',
    'Board Approval Date', 'Effective Date', 'Closed Date',
    'Last Disbursement Date',
]
data = (
    pd.read_csv(filename, names=names, skiprows=1)  # skiprows=1 drops the original header line
    .assign(
        # project ids are forced to text
        projectidsrc=lambda frame: frame['projectidsrc'].astype(str),
        # country codes are forced to upper-case text
        countrycode=lambda frame: frame['countrycode'].astype(str).str.upper(),
    )
)
# Further conversions that were considered but are not applied:
# data['End of Period'] = pd.to_numeric(data['End of Period'])
# data['Due 3rd Party'] = pd.to_datetime(data['Due 3rd Party'])

In [None]:
##########Explore data --Commented out for brevity
#peek = data.head(20)
#print(peek)
#print(data.shape)
#print(peek.shape)
#types = data.dtypes
#print(types)
#types = data.dtypes
#print(types)
#pd.set_option( 'display.width' , 100)
#pd.set_option( 'display.precision' , 3)
#description = data.describe()
#print(description)
#class_counts = data.groupby( 'projectname' ).size()
#print(class_counts)
#data.isnull().sum()
#print(data.head(3))
#data.columns

In [4]:
##########Stats
# Build a profiling report of the ingested frame (minimal mode) and write it
# out as a standalone HTML file one directory above the notebook.
profile = pandas_profiling.ProfileReport(
    data,
    minimal=True,
)
profile.to_file("../loan_report.html")

HBox(children=(FloatProgress(value=0.0, description='Summarize dataset', max=42.0, style=ProgressStyle(descrip…




HBox(children=(FloatProgress(value=0.0, description='Generate report structure', max=1.0, style=ProgressStyle(…




HBox(children=(FloatProgress(value=0.0, description='Render HTML', max=1.0, style=ProgressStyle(description_wi…




HBox(children=(FloatProgress(value=0.0, description='Export report to file', max=1.0, style=ProgressStyle(desc…




In [5]:
##########Database
# Columns that are neither dimension attributes nor fact measures in the load.
unused_columns = [
    'Borrower', 'Guarantor Country Code', 'Guarantor', 'Loan Type',
    'Loan Status', 'Interest Rate', 'Currency of Commitment', 'Due to IBRD',
    'Exchange Adjustment', 'Borrower\'s Obligation', 'Sold 3rd Party',
    'Repaid 3rd Party', 'Due 3rd Party', 'Loans Held',
    'First Repayment Date', 'Last Repayment Date', 'Agreement Signing Date',
    'Board Approval Date', 'Effective Date', 'Closed Date',
    'Last Disbursement Date',
]
# DataFrame.drop returns a NEW frame and leaves `data` untouched, so the
# result has to be used: previously it was discarded and every column was
# still fed to the source. Passing the trimmed frame straight to PandasSource
# (instead of overwriting `data`) keeps this cell safe to re-run.
dataPS = PandasSource(data.drop(columns=unused_columns))

# datawarehouse connection
# NOTE(review): credentials are hard-coded in the notebook; consider reading
# them from environment variables before sharing or committing this file.
dw_string = "host='localhost' dbname='dw1' user='dw' password='dwhouse'"
dw_pgconn = psycopg2.connect(dw_string)
# The wrapper becomes the connection used by the pygrametl table objects
# created in the cells below.
dw_conn_wrapper = pygrametl.ConnectionWrapper(connection=dw_pgconn)


In [6]:
##########Utility Methods
# bulk insert method
def bulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    """Bulk-load a staged file into a table through the cursor's ``copy_from``.

    Written to be passed as the ``bulkloader`` callback of a pygrametl
    ``BulkFactTable``.

    Args:
        name: target table name.
        attributes: column names, in the order they appear in the file.
        fieldsep: field separator used in the file.
        rowsep: row separator; accepted to satisfy the callback signature but
            not forwarded, because ``copy_from`` takes no such argument.
        nullval: marker that represents NULL in the file.
        filehandle: file-like object holding the rows to load.
    """
    cursor = dw_conn_wrapper.cursor()
    # Pass the caller's NULL marker through. The previous code passed the
    # literal string "nullval", so NULLs in the file were never recognised.
    cursor.copy_from(file=filehandle, table=name, sep=fieldsep,
                     null=str(nullval), columns=attributes)
    
# split the timestamp into its parts
def split_timestamp(row):
    """Add 'year', 'month' and 'day' entries to ``row`` in place.

    The first ten characters of ``row['End of Period']`` are split on '-'
    and the first three pieces are stored, as strings, under 'year',
    'month' and 'day' respectively. Nothing is returned.
    """
    date_pieces = row['End of Period'][:10].split('-')
    # Assign one field at a time, in this order, so a value with fewer than
    # three pieces fails at the same point as a plain sequence of assignments.
    for position, field in enumerate(('year', 'month', 'day')):
        row[field] = date_pieces[position]


In [7]:
########## Dimension and Fact tables
# Star-schema table objects. Every dimension is a CachedDimension created
# with prefill=True; each one names its table, its surrogate key column, its
# attributes, and the attribute(s) used to look a member up.

# Loans, looked up by loan number.
loan_dimension = CachedDimension(
    name='loandim', key='loanid',
    attributes=['loannumber'],
    lookupatts=['loannumber'],
    prefill=True,
)

# End-of-period date, looked up by its day / month / year parts.
eop_dimension = CachedDimension(
    name='eopdim', key='eopid',
    attributes=['day', 'month', 'year'],
    lookupatts=['day', 'month', 'year'],
    prefill=True,
)

# Countries: code and name are stored, lookup is by name only.
country_dimension = CachedDimension(
    name='countrydim', key='countryid',
    attributes=['countrycode', 'country'],
    lookupatts=['country'],
    prefill=True,
)

# Regions, looked up by region name.
region_dimension = CachedDimension(
    name='regiondim', key='regionid',
    attributes=['region'],
    lookupatts=['region'],
    prefill=True,
)

# Projects: source id and name are stored, lookup is by source id only.
project_dimension = CachedDimension(
    name='projectdim', key='projectid',
    attributes=['projectidsrc', 'projectname'],
    lookupatts=['projectidsrc'],
    prefill=True,
)

# Fact table: one key reference per dimension above plus the loan amounts.
# Rows are handed to `bulkloader` in batches of up to `bulksize`.
fact_table = BulkFactTable(
    name='loanfact',
    keyrefs=['loanid', 'eopid', 'countryid', 'regionid', 'projectid'],
    measures=[
        'orig_principal_amt',
        'cancelled_amt',
        'undisbursed_amt',
        'disbursed_amt',
        'repaid_to_ibrd',
    ],
    bulkloader=bulkloader,
    bulksize=1000000,
)


In [8]:
########## Load Data
# Load every source row: resolve (or create) each dimension member, store its
# surrogate key on the row, then queue the row for the fact table.
try:
    for row in dataPS:
        # adds 'year' / 'month' / 'day', which eop_dimension looks up
        split_timestamp(row)
        row['loanid'] = loan_dimension.ensure(row)
        row['eopid'] = eop_dimension.ensure(row)
        # The previous identity namemapping ({'countrycode': 'countrycode'})
        # had no effect and is omitted.
        row['countryid'] = country_dimension.ensure(row)
        row['regionid'] = region_dimension.ensure(row)
        row['projectid'] = project_dimension.ensure(row)
        #insert fact
        fact_table.insert(row)

    dw_conn_wrapper.commit()
except Exception:
    # Do not leave a half-finished transaction open on the server.
    dw_conn_wrapper.rollback()
    raise
finally:
    # Release the connection whether the load succeeded or failed; previously
    # a failing row left it open.
    dw_conn_wrapper.close()