This script creates the schema and tables, then uploads the staged data to the database.

In [160]:
import pandas as pd
import numpy as np
from configparser import ConfigParser
import psycopg2
from psycopg2.extensions import register_adapter, AsIs

from constants import *

In [161]:
def db_config(filename='database.ini', section='postgresql'):
    '''
        Load one section of the database config file as a dict.

        filename: path of the INI file to parse.
        section:  name of the section to return (defaults to postgresql).

        Returns a dict mapping each option name in the section to its
        string value. Raises Exception when the section is not present.
    '''
    config = ConfigParser()
    config.read(filename)

    # bail out early when the requested section is absent
    if not config.has_section(section):
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))

    # every (option, value) pair of the section becomes one dict entry
    return {option: value for option, value in config.items(section)}

def db_connect():
    '''
        Connect to the PostgreSQL database server.

        Reads the connection parameters with db_config(), opens a psycopg2
        connection with them and creates a cursor on that connection.

        Returns:
            (connection, cursor) tuple.

        Raises:
            Exception: any error raised while reading the config, connecting
                or creating the cursor. The error is printed and re-raised
                so the caller never receives None in place of the tuple.
    '''
    db_conn = None
    try:
        # read connection parameters
        db_params = db_config()

        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        db_conn = psycopg2.connect(**db_params)

        # create a cursor
        db_cur = db_conn.cursor()

        # return database instances
        return db_conn, db_cur
    except Exception as error:
        # psycopg2.DatabaseError is itself an Exception subclass, so one
        # clause covers both config and database failures
        print(error)
        # do not leak a connection that opened before the failure
        if db_conn is not None:
            db_conn.close()
        raise


In [162]:
# try to establish the connection
# (binds the notebook-level db_conn / db_cur that the following cells use)
db_conn, db_cur = db_connect()
# verify the connection by asking the server for its version string
db_cur.execute('SELECT version()')
# fetchone() gives back the single result row as a tuple
db_version = db_cur.fetchone()
print(db_version)

Connecting to the PostgreSQL database...
('PostgreSQL 11.11 on amd64-portbld-freebsd11.4, compiled by FreeBSD clang version 10.0.0 (git@github.com:llvm/llvm-project.git llvmorg-10.0.0-0-gd32170dbd5b), 64-bit',)


In [163]:
# create the tables by executing init.sql
# read the script inside a context manager so the file handle is closed
# immediately instead of being left to the garbage collector
with open("init.sql", "r") as init_sql_file:
    init_sql = init_sql_file.read()
db_cur.execute(init_sql)
db_conn.commit()

In [164]:
def insert_table_from_csv(file_path, table_name, cursor, conn):
    '''
        Insert every row of a CSV file into a database table.

        The CSV is read with pandas, the "Country Code" and "Year" columns
        are dropped when present, and the remaining rows are sent in a
        single multi-row INSERT that is then committed.

        file_path:  path of the CSV file to load.
        table_name: table to insert into; it is concatenated into the SQL
                    text, so it must come from trusted code, not user input.
        cursor:     database cursor used to execute the INSERT.
        conn:       connection the cursor belongs to (commit / rollback).

        Missing values (NaN) are sent as None so they are stored as NULL.
        A CSV without data rows is a no-op. If the INSERT or the commit
        fails, the transaction is rolled back and the error is re-raised.
    '''
    frame = pd.read_csv(file_path)
    # these two columns are never uploaded; ignore them quietly when absent
    frame = frame.drop(columns=["Country Code", "Year"], errors="ignore")
    print(len(frame.columns))

    # Cast to object first so each column keeps its own Python type (ints
    # are not upcast to float) and swap missing values for None, which the
    # driver sends as NULL whatever the dtype mix of the frame is.
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in frame.astype(object).itertuples(index=False, name=None)
    ]
    if not rows:
        # nothing to insert; an empty VALUES list would be invalid SQL
        return
    print(rows[0])

    # one %s placeholder per row; each row tuple is adapted as "(v1, v2, ...)"
    template = ','.join(['%s'] * len(rows))
    query = 'INSERT INTO ' + table_name + ' VALUES {}'.format(template)
    try:
        cursor.execute(query, rows)
        conn.commit()
    except Exception:
        # leave the connection usable instead of stuck in a failed transaction
        conn.rollback()
        raise

In [165]:
# teach psycopg2 how to render numpy scalars in SQL parameters
def nan_to_null(f, _NULL=psycopg2.extensions.AsIs('NULL'), _Float=psycopg2.extensions.Float):
    '''
        Adapter for np.float64 values.

        Returns the pre-built AsIs('NULL') object when f is NaN, otherwise
        wraps f in psycopg2's Float adapter. Both are bound as default
        arguments, so they are evaluated once, when the function is defined.
    '''
    return _NULL if np.isnan(f) else _Float(f)

# np.int64 is passed through unchanged; np.float64 goes through nan_to_null
register_adapter(np.int64, AsIs)
register_adapter(np.float64, nan_to_null)

# upload the dimension tables
# point the session at the target schema first; SCHEMA_NAME comes from the
# star import of constants
db_cur.execute(f"SET search_path = {SCHEMA_NAME};")
# NOTE(review): every upload call below is commented out, so running this cell
# as-is only sets the search path. Presumably they were disabled after a
# one-off load to avoid inserting the same rows twice; confirm before
# re-enabling.
# insert_table_from_csv(COUNTRY_DIMENSION_CSV, COUNTRY_DIMENSION, db_cur, db_conn)
# insert_table_from_csv(YEAR_DIMENSION_CSV, YEAR_DIMENSION, db_cur, db_conn)
# insert_table_from_csv(EDUCATION_DIMENSION_CSV, EDUCATION_DIMENSION, db_cur, db_conn)
# insert_table_from_csv(EVENT_DIMENSION_CSV, EVENT_DIMENSION, db_cur, db_conn)
# insert_table_from_csv(HEALTH_DIMENSION_CSV, HEALTH_DIMENSION, db_cur, db_conn)
# insert_table_from_csv(LIFE_QUALITY_DIMENSION_CSV, LIFE_QUALITY_DIMENSION, db_cur, db_conn)

10
(0, 'Parliamentary', True, 3.0, 'Left', 'LPC', 151.0, False, False, 308.0)


In [166]:
# release the database resources: the cursor first, then the connection
for _db_handle in (db_cur, db_conn):
    # skip a handle that was never created
    if _db_handle is not None:
        _db_handle.close()