<img src="https://www.th-koeln.de/img/logo.svg" style="float: right;" width="200">
<img src="https://www.femoz.de/assets/templates/femoz/images/logo-head-02.png" style="float: left;" width="200">
<p style="text-align:center;"><br><br>Project: FEMOZ<br>
Author of notebook: Florian Schmitt<br>
Date: 18.05.2022</p>

# Fix potential naming issues in a file and push it to the results db

In [52]:
# Load libraries
import pandas as pd
from sqlalchemy import MetaData, create_engine
import psycopg2
import numpy as np

In [53]:
# connect to database
def get_connection():
    try:
        return psycopg2.connect(
            database ='raw_data_db',
            user = 'fschmitt',
            password = 'jksHI93!)sewOl',
            host = '139.6.160.28',
            port = 5432,
        )
    except:
        return False
  
conn = get_connection()
  
if conn:
    # report status
    print('Connection to the PostgreSQL established successfully.')
else:
    # report status
    print('Connection to the PostgreSQL encountered and error.')

Connection to the PostgreSQL established successfully.


In [54]:
# Get table from raw_data_db
def get_data(table):
    try:
        # open connection
        conn = get_connection()

        # create a cursor using the connection object
        curr = conn.cursor()

        # execute the sql query
        curr.execute('SELECT * FROM ' + table + ';')

        # fetch colukmn headers
        column_names = [i[0] for i in curr.description]

        # fetch all the rows from the cursor
        data = curr.fetchall()

        # compile into dataframe
        df = pd.DataFrame.from_records(data, columns = column_names)

        # close connection
        conn.close()
    
        # report status
        print(f'\nTable "{table}" successfully loaded.')

        # return result
        return df
    except:
        # report status
        print('\nAn error ocurred. Check table name.')


# define table to be loaded
table = input('Please specify which table shall be evaluated for fixing admin names: ')

# execute get function
data = get_data(table)


Table "admin_2_population" successfully loaded.


In [55]:
# print header from table for the user to pick the column containing the admin level names
print('Use this excerpt to find the correct column name.\n')
print(data.head())

Use this excerpt to find the correct column name.

   index  admin_2    status population_1997  population_2007  population_2017
0      0  Ancuabe  District           87243           107238           159340
1      1   Balama  District           98653           124100           175733
2      2   Chiúre  District          185618           217487           299235
3      3      Ibo  District            7061             9344            12205
4      4  Macomia  District           69973            79825           114345


In [56]:
# Fix admin names
def fix_names():
    # specify admin level to be fixed
    global level
    level = input('Specify the admin level to be fixed (1: regions; 2: dsitricts; 3: localities): ')
    
    # specify column containing admin layer names
    global column
    column = input('Specify which column contains the admin layer names: ')
    
    # set data variable to global to save changes
    global data

    # try to convert input into integer and report status
    try:
        # convert input into integer
        level = int(level)
        
        # verify the specified admin level exists
        if level in [1, 2, 3]:
            # store status level verification
            status_level = True
        else:
            # throw exception
            print('\nPlease enter a valid admin layer level (1: regions; 2: dsitricts; 3: localities)')
            
            # store status level verification
            status_level = False
    except:
        # throw exception if input cannot be converted to integer
        print('\nPlease enter a valid admin layer level (1: regions; 2: dsitricts; 3: localities)')
       
    # verify the column input exists in dataframe
    try:
        # call column
        data[column]
        
        # store column level verfication
        status_column = True
    except:
        # throw exception
        print(f'\nPlease enter a valid column name. Your table has the following options:')
        #print the available columns
        for column in data.columns:
            print(column)
            
        # store column level verfication
        status_column = False
        
    # start fixing admin layer names
    if status_column == True & status_level == True:
        try:        
            # load lookup file
            fix = get_data('admin_names_lookup')

            # filter lookup list for specified admin layer
            fix = fix[fix['admin_level'] == level]

            # join with db dataframe
            temp = pd.merge(data, fix, left_on = column, right_on = 'incorrect_name', how = 'left')

            # overwrite column with correct names
            temp[column] = np.where(temp['correct_name'].isnull(), temp[column], temp['correct_name'])

            # drop irrelevant columns
            temp.drop(['admin_level', 'incorrect_name', 'correct_name'], axis = 1, inplace = True)

            # store result
            data = temp

            # report status
            print('\nNames succesfully fixed.')
        except:
            # report status
            print('\nAn error occured.')
    else:
        pass

# execute fix function
fix_names()


Table "admin_names_lookup" successfully loaded.

Names succesfully fixed.


In [57]:
# verify admin names against masterdata and report inconsistencies
def match_names():
    # get master data
    master = get_data("admin_" + str(level))

    # join fixed df with master dataframe
    temp = pd.merge(data, master, left_on = column, right_on = 'admin_' + str(level), how = 'left')

    # filter for columns with failed match
    temp = temp[temp['index'].isnull()]
    print(temp)
    global error_length
    error_length = len(temp)

    if error_length == 0:
        # report status
        print('No unmatched admin names. File can be loaded into results_db.')
    else:
        # report status
        print('\nThe following names could not be matched to the master file. The dataframe has been copied to you clipboard.\n')
        temp.to_clipboard(sep=',', index=False)
        print(temp[column].unique())
        print('\nPlease match those values manually by creating a new record in "sciebo/femoz_iws/data_lake/IWS/FLORIAN/Name Lookup List.csv" and re-run this notebook.')
        
# execute match function
match_names()


Table "admin_2" successfully loaded.
Empty DataFrame
Columns: [index_x, admin_2, status, population_1997, population_2007, population_2017, index_y, index, admin_id_1, admin_1, admin_id_2]
Index: []
No unmatched admin names. File can be loaded into results_db.


In [61]:
# Store data in results db

# specify storage method
method = input('Specify the type of request to be used (create, update, append): ')

# specify database
db_name = input('Specifiy database the table should be loaded to: ')

def push_data(data, method, db_name):
    try:
        # create a conncetion to the PostgreSQL database
        conncection = create_engine('postgresql://fschmitt:jksHI93!)sewOl@139.6.160.28:5432/' + db_name)

        if method == 'create':
            # take the dataframe and write it to the specified table
            data.to_sql(table, conncection)
            print('\nData table created successfully.')
        elif method in ['append', 'update']:
            # Take the dataframe and write it to a table you specify
            # switch off that an index is generated and written to the table
            data.to_sql(table, conncection, index= False, if_exists = method)
            # report status
            print('\nData pushed to DB successfully.')
        else:
            # throw exception
            print('\nMethod does not exist.')
    except:
        # report status
        print(f'\nError while pushing the data to {db_name}')

def store_into_db(data, method, db_name):
    if error_length == 0:
        push_data(data, method, db_name)
    else:
        # ask user wheter data should be pushed regardless of naming issues
        push = input('\nThere are still unmatched names. Please confirm the push request by entering "Push": ')

        # evaluate user input
        if push == 'Push':
            push_data()
        else:
            # report status
            print('\nPush request aborted by user.')
            
store_into_db(data, method, db_name)


Data table created successfully.
