# In [93]:
import sys
import json
import pyodbc
import pandas as pd
from pandas import DataFrame

class ODBC():
    '''Wrapper around a pyodbc connection with small pandas helpers.

    The configuration is a dict (or a path to a JSON file holding one). It
    is read for the key 'dsn' or, when that is missing/empty, for the keys
    'driver', 'server', 'uid' and 'p' (the password).
    '''

    def __init__(self, config):
        '''Loads the configuration and opens the connection.

        config: dict of settings, or path to a JSON file containing them.
        '''
        self.config = self.rd_config(config)
        self.c = self.conn()

    @staticmethod
    def rd_config(config):
        '''Reads in configuration file. Pattern Reference: Jwmazzi/usgpo

        config: an already-loaded dict (returned unchanged) or a path to a
                JSON file.
        Returns the configuration object. If the input cannot be parsed
        (ValueError, which includes JSON decode errors) a message is printed
        and the process exits with status 1.
        '''
        if isinstance(config, dict):
            return config
        try:
            # Context manager guarantees the file is closed, even when the
            # JSON parser raises.
            with open(config) as cfg_file:
                return json.load(cfg_file)
        except ValueError as val_err:
            print(f'Configuration Input "{config}" is Not Valid: {val_err}')
            sys.exit(1)

    def conn(self):
        '''Establishes ODBC connection via DSN or credentials.

        A non-empty 'dsn' entry takes precedence. Otherwise the connection
        string is assembled from 'driver', 'server', 'uid' and 'p'.
        Returns the pyodbc connection, opened with autocommit enabled.
        '''
        # .get() so that a config without any 'dsn' key falls through to the
        # credential branch instead of raising KeyError.
        dsn = self.config.get('dsn')
        if dsn:
            return pyodbc.connect('DSN=' + dsn, autocommit=True)
        conn_str = 'Driver={};SERVER={};UID={};PWD={}'.format(
            self.config['driver'],
            self.config['server'],
            self.config['uid'],
            self.config['p'],
        )
        return pyodbc.connect(conn_str, autocommit=True)

    def py_sql(self, sql):
        '''Returns dataframe from supplied sql.

        sql: statement to execute; it must produce a result set, because the
             column names are taken from the cursor description.
        '''
        cursor = self.c.execute(sql)
        # First entry of each description item is the column name.
        fields = [column[0] for column in cursor.description]
        # Rows are converted to plain lists before building the dataframe.
        row_lists = [list(row) for row in cursor]
        return DataFrame(row_lists, columns=fields)

    def sql_dtypes(self, df):
        '''Maps common pandas/numpy dtypes to SQL dtypes from supplied dataframe.

        Returns one SQL type name per column, in column order. Raises
        KeyError for a dtype that is not in the mapping below.
        '''
        dtyp = {
            'int64': 'INT',
            'object': 'VARCHAR',
            'float64': 'FLOAT',
            'datetime64[ns]': 'DATE',
            'bool': 'BIT',
        }
        return [dtyp[str(col_dtype)] for col_dtype in df.dtypes]

    def create_db(self, name):
        '''Creates database with supplied name.

        NOTE: name is interpolated directly into the statement (identifiers
        cannot be bound as parameters), so it must come from a trusted source.
        '''
        self.c.execute(f'CREATE DATABASE {name}')
        
class DB(ODBC):
    '''ODBC connection bound to one named database.'''

    def __init__(self, config, db):
        '''Opens the connection via the parent class and stores the database name.'''
        super().__init__(config)
        # The parent initialiser has already set self.c (the open connection).
        self.db = db

    def create_schema(self, name):
        '''Switches to the stored database, then creates the schema `name` in it.'''
        use_stmt = f'USE {self.db}'
        self.c.execute(use_stmt)
        schema_stmt = f'CREATE SCHEMA {name}'
        self.c.execute(schema_stmt)

    def info_schema(self):
        '''Returns the database's INFORMATION_SCHEMA.COLUMNS view as a dataframe.'''
        query = f'SELECT * FROM {self.db}.INFORMATION_SCHEMA.COLUMNS'
        columns_info = self.py_sql(query)
        return columns_info
        
class Schema(DB):
    '''A schema inside a database; adds table creation to DB.'''

    def __init__(self, config, db, schema):
        '''Opens the connection via the parent class and stores the schema name.'''
        super().__init__(config, db)
        self.schema = schema

    def create_table(self, tab, df, pk = False):
        '''Creates table in database based on supplied table name and dataframe. Option to add primary key.

        tab: name of the table to create inside <db>.<schema>.
        df:  dataframe whose column names and dtypes define the table columns
             (types come from sql_dtypes; VARCHAR columns get length 255).
        pk:  when truthy, the FIRST column is declared PRIMARY KEY. Marking
             every column, as was done previously, yields a statement with
             several PRIMARY KEY constraints, which is not valid for tables
             with more than one column.
        '''
        col_defs = []
        for pos, (col, sql_type) in enumerate(zip(df.columns, self.sql_dtypes(df))):
            col_def = f'[{str(col)}] {sql_type}'
            if sql_type == 'VARCHAR':
                col_def += ' (255)'
            # Only one PRIMARY KEY constraint is allowed per table.
            if pk and pos == 0:
                col_def += ' PRIMARY KEY'
            col_defs.append(col_def)
        target = '.'.join([self.db, self.schema, tab])
        # join() puts the separators between definitions, so there is no
        # trailing comma to strip afterwards.
        col_sql = ', '.join(col_defs)
        self.c.execute(f'CREATE TABLE {target} ({col_sql})')

class Table(Schema):
    '''A single table inside <db>.<schema>; supports row inserts from a dataframe.'''

    def __init__(self, config, db, schema, table):
        '''Opens the connection via the parent classes and stores the table name.'''
        super().__init__(config, db, schema)
        self.table = table
        # Working copy of the dataframe last handed to ins_new_vals.
        self.df = None

    def adj_bool(self):
        '''Amends True/False python boolean to SQL 1/0 convention.

        Operates on self.df: every bool column is replaced by an int64
        column of 1/0.
        '''
        for col in self.df.columns:
            if self.df[col].dtype == 'bool':
                # Replace the column rather than writing through
                # .loc[:, col], which tries to keep the bool dtype. The
                # explicit 'int64' keeps the result inside the sql_dtypes
                # mapping on every platform. Works for empty frames too.
                self.df[col] = self.df[col].astype('int64')

    def dt_to_date(self):
        '''Amends potential date time values to string date values.

        Operates on self.df: every column that sql_dtypes reports as 'DATE'
        is replaced by its 'YYYY-MM-DD' string representation.
        '''
        for col, sql_type in zip(self.df.columns, self.sql_dtypes(self.df)):
            if sql_type == 'DATE':
                # Column replacement (not .loc[:, col] = ...): an in-place
                # write of date strings into a datetime column can be parsed
                # straight back to datetimes, undoing the conversion.
                self.df[col] = self.df[col].dt.strftime('%Y-%m-%d')

    def ins_new_vals(self, df, to_dt = True):
        '''Inserts every row of the dataframe into the table.

        df:    dataframe whose column names match the target table columns.
        to_dt: when True, datetime columns are sent as 'YYYY-MM-DD' strings.

        Rows are inserted one statement at a time using '?' parameters. The
        caller's dataframe is left untouched; the adjusted copy is kept in
        self.df.
        '''
        # Copy so the bool/date adjustments below do not leak into the
        # caller's dataframe.
        self.df = df.copy()
        df_cols = [f'[{column}]' for column in self.df.columns]
        cols = ', '.join(df_cols)
        target = '.'.join([self.db, self.schema, self.table])
        sql = f'INSERT INTO {target} ({cols}) VALUES'

        # Adjusting boolean values to 1 or 0 if needed.
        self.adj_bool()

        # Changing datetime stamps to string dates.
        if to_dt:
            self.dt_to_date()

        # Column types are fixed from here on, so they are resolved once
        # instead of once per cell.
        dtyp_func = {'INT': int, 'FLOAT': float}
        casts = [dtyp_func.get(typ) for typ in self.sql_dtypes(self.df)]

        # Approach enables passing of non-string values via parameterization.
        placeholders = ', '.join('?' for _ in casts)
        insert = f'{sql} ({placeholders})'

        # itertuples reads each value from its own column, so an int column
        # is not upcast to float just because the row also holds floats
        # (which is what row-wise .iloc[r, :] does).
        for row in self.df.itertuples(index=False, name=None):
            # Changing dtypes to dtypes understood by SQL.
            vals_tup = tuple(
                val if cast is None else cast(val)
                for val, cast in zip(row, casts)
            )
            self.c.execute(insert, vals_tup)