In [None]:
%config IPCompleter.greedy=True

import os
import os.path
import sys

import pandas as pd
import psycopg2
import yaml

# Locations used by the rest of the notebook.
# home_dir is the directory the notebook is executed from; credential_dir is
# resolved relative to it at the time a file is opened.
credential_dir = '../../config'
home_dir = os.getcwd()

def create_pgconn(credentials_yaml):
    """Open a PostgreSQL connection described by a YAML credentials file.

    The file must provide the keys ``DB_name``, ``user``, ``host``,
    ``password`` and ``role``. After connecting, the client encoding is set
    to latin_1 and the session role is switched to ``role``.

    Args:
        credentials_yaml: Path to the YAML credentials file.

    Returns:
        The open psycopg2 connection.

    Raises:
        Whatever exception is raised while reading the credential keys or
        connecting; it is re-raised after printing a short message.
    """
    with open(credentials_yaml) as f:
        # safe_load only builds plain Python objects; yaml.load without an
        # explicit Loader can construct arbitrary objects and is rejected by
        # PyYAML >= 6.
        configs = yaml.safe_load(f)
    try:
        # Keyword arguments avoid hand-quoting a DSN string, which breaks as
        # soon as a value (typically the password) contains a quote or
        # backslash.
        conn = psycopg2.connect(
            dbname=configs['DB_name'],
            user=configs['user'],
            host=configs['host'],
            password=configs['password'])
    except Exception:
        print("Error connecting to db.")
        raise
    conn.set_client_encoding('latin_1')
    # The cursor is only needed for the SET ROLE statement; close it afterwards.
    with conn.cursor() as cur:
        # The role is an identifier and cannot be bound as a query parameter;
        # it comes from the local credentials file.
        cur.execute("SET ROLE " + configs['role'])
    return conn

In [None]:
# Setting up a connection to the server.
# `conn` is the notebook-wide connection that the later cells reuse.

credentials_yaml = os.path.join(credential_dir, 'db_creds.yml')  # an example file is in the hitchhikers repo
conn = create_pgconn(credentials_yaml)

def sql(query, conn=conn, params=None):
    """Run ``query`` and return the result as a pandas DataFrame.

    Args:
        query: SQL text to execute.
        conn: Connection to run the query on. Defaults to the notebook-wide
            connection as it exists when this cell is executed (the default
            is bound at definition time).
        params: Optional values to bind to placeholders in ``query``;
            forwarded unchanged to ``pandas.read_sql`` so callers do not have
            to format values into the SQL string themselves.

    Returns:
        A DataFrame with the query result.
    """
    return pd.read_sql(query, conn, params=params)

In [None]:
# To rename columns, first list the distinct original column names recorded
# for the "8" tables in raw.column_mapping.
cur = conn.cursor()
cur.execute("""select distinct(orginal_col_name) from raw.column_mapping where table_name ~ 'raw."8.*';""")
for row in cur.fetchall():
    # [1:-1] drops the first and last character of the stored name
    # (presumably the surrounding double quotes -- verify against the table).
    print(row[0][1:-1])

In [None]:
# Using the manually created 8_column_mapping.csv, build:
#   col_names: original column name -> new column name
#   col_types: new column name -> SQL type (only where a type is given)
#   union:     set of all new column names
# Each line is expected to be "old_name,new_name,type"; rows with an empty
# new_name are ignored.
with open('../../../../garfield/8_column_mapping.csv', 'r') as fil:
    col_map = fil.read()
col_names = dict()
col_types = dict()
union = set()
for pair in col_map.split('\n'):
    # Pad to at least three fields so a blank line (e.g. the one produced by a
    # trailing newline) or a row without a type column is skipped or handled
    # instead of raising IndexError.
    split_pair = pair.split(',') + ['', '']
    old_name, new_name, col_type = split_pair[0], split_pair[1], split_pair[2]
    if new_name == '':
        continue
    col_names[old_name] = new_name
    if col_type != '':
        col_types[new_name] = col_type
    union.add(new_name)

In [None]:
# Collect the distinct table names recorded in raw.column_mapping for the "8"
# files; later cells iterate over this list to rename columns and join.
cur = conn.cursor()
cur.execute("""select distinct(table_name) from raw.column_mapping where table_name ~ 'raw."8.*';""")
table_list = []
for row in cur.fetchall():
    table_list.append(row[0])

In [None]:
# Remove leftover per-file preprocessing tables before rebuilding.
# table_list holds the raw source tables, and the insert step further down
# selects from exactly those tables, so dropping `table` itself would destroy
# the input and make that step fail on a fresh run. Drop the preproc-schema
# counterpart instead.
# NOTE(review): assumes table names start with the `raw` schema prefix, as the
# table_name filter used to build table_list suggests -- confirm.
cur = conn.cursor()
for table in table_list:
    stale_table = table.replace('raw', 'preproc', 1)
    cur.execute("""drop table if exists %s""" %stale_table)
conn.commit()

# Preprocessing

## Renaming the columns of each file

In [None]:
# (Re)create the joined preprocessing table with one varchar column per new
# column name in `union`.
cur = conn.cursor()
cmnd = ' varchar, '.join(union) + ' varchar'  # column definitions: "<name> varchar, <name> varchar, ..."
for statement in (
        """drop table if exists preproc."8_joined";""",
        """create table if not exists preproc."8_joined" (%s);""" %cmnd):
    cur.execute(statement)
conn.commit()

In [None]:
# Iterates over all tables in raw (with columns a0, a1, a2...), inserting into a joined preprocessing table (with columns nie, dpto_code_ce, year...)
cur = conn.cursor()
for table in table_list:
    print(table)
    # The table name is a *value* in this query, so bind it as a parameter
    # rather than formatting it between quotes by hand.
    cur.execute(
        """select mapped_col_name, orginal_col_name from raw.column_mapping where table_name=%s order by mapped_col_name;""",
        (table,))
    raw_cols = []      # column names as they exist in the raw table
    renamed_cols = []  # matching column names in preproc."8_joined"
    for mapped_col, original_col in cur.fetchall():
        # Original names are stored with double quotes; the CSV-derived
        # col_names dictionary is keyed without them.
        new_name = col_names.get(original_col.replace('"', ''))
        if new_name:
            raw_cols.append(mapped_col)
            renamed_cols.append(new_name)
    if renamed_cols:
        # Identifiers (table and column names) cannot be bound as parameters,
        # so they are formatted into the statement; they originate from
        # raw.column_mapping and the mapping CSV.
        cur.execute("""insert into preproc."8_joined" (%s) select %s from %s;""" %(','.join(renamed_cols), ','.join(raw_cols), table))
conn.commit()

In [None]:
# Cast every column of the joined table to its target type, turning empty
# strings into NULL on the way.
cur = conn.cursor()
for col in union:
    print(col)
    # Columns without an entry in col_types stay varchar; they still go
    # through the same statement so their empty strings become NULL.
    target_type = col_types.get(col, 'varchar')
    cur.execute("""ALTER TABLE preproc."8_joined" ALTER COLUMN %s TYPE %s using NULLIF(%s, '')::%s;""" %(col, target_type, col, target_type))
conn.commit()