# SQL Package

Provides simple functionality to interact with a PostgreSQL server using Python classes.

**Overview of functionality:**
* Database(self, user, password, host, dbname, port)
    * properties
        * user
        * password
        * host
        * dbname
        * port
    * methods
        * create(name) x
        * connect()
        * drop(name)
* Table(self, dbname, table, schema)
    * accepts db properties
    * properties
        * connect() --> inherited
        * fetch_data(sql, con, parse_dates)
        * get_names()
        * format_names(char_dict)
        * update_names(names_dict)
        * add_columns(columns_list, type=None)
        * compare_column_order(dataframe)
        * match_columns(dataframe)
        * save_csv(data, local_path, match_column_order=True)
        * update_values(local_path, container_path)
        * update_types(types_dict)
        * close()

## Setup

In [101]:
import os
import sys
from pathlib import Path
#sys.path[0] = str(Path(__file__).resolve().parents[2]) # Set path for custom modules

# Set path for modules
sys.path[0] = '../'

from dotenv import load_dotenv, find_dotenv
import numpy as np
import pandas as pd

# SQL libraries
import psycopg2

# Set notebook display options
pd.set_option('display.max_rows', 2000)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

In [102]:
#sys.modules

In [325]:
class Database():
    
    # if modulename not in sys.modules: print...
    load_dotenv(find_dotenv());
    
    def __init__(self, user=None, password=None,
                 dbname=None, host=None, port=None):
        
        # Loaded from .env if not explicit
        self.user = user if user is not None else os.getenv("POSTGRES_USER")
        self.password = password if password is not None else os.getenv("POSTGRES_PASSWORD")
        self.dbname = dbname if dbname is not None else os.getenv("POSTGRES_DB")
        self.host = host if host is not None else os.getenv("DB_HOST")
        self.port = port if port is not None else os.getenv("DB_PORT")
        
        
        # Root directory
        self._root_dir = os.path.dirname(os.getcwd())
        #sys.path[0] = str(Path(__file__).resolve().parents[2])
        
    def _connect(self):

        """
        Connects to PostgreSQL database using psycopg2 driver. Same
        arguments as psycopg2.connect().

        Params
        --------
        dbname
        user
        password
        host
        port
        connect_timeout
        """

        try:
            con = psycopg2.connect(dbname=self.dbname,
                                   user=self.user,
                                   password=self.password,
                                    host=self.host, 
                                    port=self.port,
                                  connect_timeout=3)            

        except Exception as e:
            print('Error:\n', e)
            return None


        return con        

In [323]:
db = Database()

Connected as user "postgres" to database "permits" on http://localhost:5432.


In [413]:
class Table(Database):
    def __init__(self, user=None, password=None, dbname=None, host=None, port=None, table=None):
        super().__init__(user, password, dbname, host, port)
        
        self.table = table
        
        # Loaded from .env if not explicit
        self.user = user if user is not None else os.getenv("POSTGRES_USER")
        self.password = password if password is not None else os.getenv("POSTGRES_PASSWORD")
        self.dbname = dbname if dbname is not None else os.getenv("POSTGRES_DB")
        self.host = host if host is not None else os.getenv("DB_HOST")
        self.port = port if port is not None else os.getenv("DB_PORT")
    
    # Connect to database
    def _connect(self):
        return super(Table, self)._connect()
    
    # Fetch data from sql query
    def fetch_data(self, sql, coerce_float=False, parse_dates=None):
        
        con = self._connect()
        
        # Fetch fresh data
        data = pd.read_sql_query(sql=sql, con=con, coerce_float=coerce_float, parse_dates=parse_dates)

        # Replace None with np.nan
        data.fillna(np.nan, inplace=True)
        
        # Close db connection
        con.close()

        return data
    
    # Get names of column
    def get_names(self):
        
        con = self._connect()
        
        # Specific query to retrieve table names
        sql = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = N'{}'".format(self.table)
        etl = pd.read_sql_query(sql, con)
        columns = etl['column_name']
        
        con.close()
    
        return columns

    # Standardize column names using dictionary of character replacements
    def reformat_names(self, replace_map):
        
        series = self.get_names()
        
        def replace_chars(text):
            for oldchar, newchar in replace_map.items():
                text = text.replace(oldchar, newchar).lower()
            return text
        
        return series.apply(replace_chars)
    
    # Update column names in db table
    def update_names(self, replace_map):
        
        old_columns = self.get_names()
        
        new_columns = self.reformat_names(replace_map)
        
        con = self._connect()
    
        sql = 'ALTER TABLE {} '.format(self.table) + 'RENAME "{old_name}" to {new_name};'

        sql_query = []

        for idx, name in old_columns.iteritems():
            sql_query.append(sql.format(old_name=name, new_name=new_columns[idx]))

        sql_query = '\n'.join(sql_query)

        try:
            cur = con.cursor()
            print("Reading...")
            print('Executing update query on table "{}"...'.format(self.table))
            cur.execute(sql_query)
            con.commit()
            cur.close()
            print("Table is updated.")
        except Exception as e:
            con.rollback()
            print('Error:\n', e)
        finally:
            if con is not None:
                con.close()
                
    # Add new columns to database
    def add_columns(self, data):
        
        con = self._connect()

        # Get names of current columns in PostgreSQL table
        current_names = self.get_names()

        # Get names of updated table not in current table
        updated_names = data.columns.tolist()
        new_names = list(set(updated_names) - set(current_names))

        # Check names list is not empty
        if not new_names:
            print("Table is already up to date.")
            return

        # Format strings for query
        alter_table_sql = "ALTER TABLE {db_table}\n"
        add_column_sql = "\tADD COLUMN {column} TEXT,\n"

        # Create a list and append ADD column statements
        sql_query = [alter_table_sql.format(db_table=self.table)]
        for name in new_names:
            sql_query.append(add_column_sql.format(column=name))

        # Join into one string
        sql_query = ''.join(sql_query)[:-2] + ";"

        if run:
            ### ADD TRY/EXCEPT TO RUN QUERY AGAINST DB
            try:
                print("Connecting...")
                cur = con.cursor()
                print("Executing query...")
                cur.execute(sql_query)
                print("Committing changes...")
                con.commit()
                cur.close()
                print("Database updated successfully:\nAdd columns {}".format(', '.join(new_names)))
            except Exception as e:
                conn.rollback()
                print('Error:\n', e)
            finally:
                if con is not None:
                    con.close()

    # Compare order of columns in dataframe against order of columns in database                
    def compare_column_order(self, data, match_db_order=False):
        
        con = self._connect()

        db_columns = self.get_names().tolist()
        data_columns = data.columns.tolist()

        if match_db_order:
            columns_reordered = self.get_names().tolist()
            data = data[columns_reordered]
            return data

        if set(db_columns) == set(data_columns):
            str1 = 'Dataframe columns match table "{}" '.format(self.table)
            if db_columns == data_columns:
                print(str1 + 'and are in the same order.'.format(self.table))
                if not match_db_order:
                    return True
                else:
                    return data
            else:
                print('but are not in the same order.'.format(self.table))
                return False

        elif len(set(db_columns)) > len(set(data_columns)):
            print('Current dataframe has less columns than table "{}":\n'.format(self.table), 
                                            list(set(db_columns) - set(data_columns)))
            return False
        else:
            print('Current dataframe has more columns than table "{}":\n'.format(self.table), 
                                            list(set(data_columns) - set(db_columns)))
            return False

In [416]:
permits = Table(table="permits_raw")
data = permits.fetch_data(sql="SELECT * FROM permits_raw LIMIT 10;")

In [408]:
permits.get_names()[:3]

0      assessor_book
1      assessor_page
2    assessor_parcel
Name: column_name, dtype: object

In [409]:
# Map of character replacements
replace_map = {' ': '_', '-': '_', '#': 'No', '/': '_', 
               '.': '', '(': '', ')': '', "'": ''}

permits.reformat_names(replace_map)[:3]

0      assessor_book
1      assessor_page
2    assessor_parcel
Name: column_name, dtype: object

In [422]:
permits.update_names(replace_map)

Reading...
Executing update query on table "permits_raw"...
Error:
 column "assessor_book" of relation "permits_raw" already exists



In [423]:
permits.add_columns(data)

Table is already up to date.


In [424]:
permits.compare_column_order(data)

Dataframe columns match table "permits_raw" and are in the same order.


True