# In [4]:
'''Data pipeline that pulls a table from the in-house database and writes it to a JSON file.'''
import os
from dotenv import load_dotenv # type: ignore
import psycopg2 as psql # type: ignore
import warnings
import pandas as pd # type: ignore
import json

class EnvSecrets:
    '''Database connection settings read from environment variables.

    ``load_dotenv()`` is called first so that entries from a local ``.env``
    file (if present) are visible to ``os.getenv``. Any variable that is not
    set ends up as None on the instance.
    '''

    def __init__(self) -> None:
        '''Fetch all sensitive data and load into variables for later use.

        Raises:
            ValueError: if DATABASE_PORT is set but is not an integer.
        '''
        load_dotenv()
        self.db_name = os.getenv('DATABASE_NAME')
        self.db_host = os.getenv('DATABASE_HOST')
        port = os.getenv('DATABASE_PORT')
        # int(None) raised an opaque TypeError when DATABASE_PORT was missing;
        # None instead lets the database driver use its default port.
        self.db_port = int(port) if port else None
        self.sql_user = os.getenv('SQL_USERNAME')
        self.sql_pass = os.getenv('SQL_PASSWORD')
        self.schema = os.getenv('SQL_SCHEMA')

class DatabaseConnection:
    '''A psycopg2 connection plus one cursor, opened from EnvSecrets settings.

    A driver error while connecting does not propagate. Instead ``valid`` is
    set to False and the error is kept in ``error``, so the caller decides
    how to react.
    '''

    def __init__(self, secret:EnvSecrets) -> None:
        '''Open database connection.

        Args:
            secret: settings providing db_name, db_host, db_port, sql_user
                and sql_pass.
        '''
        # Always defined, so close() and callers can inspect them even when
        # connecting fails.
        self.connection = None
        self.cursor = None
        self.error = None  # psycopg2 error raised while connecting, if any
        try:
            self.connection = psql.connect(
                database = secret.db_name
                , host = secret.db_host
                , port = secret.db_port
                , user = secret.sql_user
                , password = secret.sql_pass
            )
            self.cursor = self.connection.cursor()
            self.valid = True
        except psql.Error as err:
            # Possible expansion: send an email notifying about the database
            # failure or invalid credentials.
            self.error = err
            self.valid = False
            # If connect() succeeded but cursor() failed, don't leak the connection.
            self.close()

    def close(self) -> None:
        '''Close database connection; does nothing if it was never opened.'''
        if self.connection is None:
            return
        try:
            self.connection.close()
        except psql.Error:
            # Best-effort cleanup: a failure while closing should not hide the
            # outcome of the work done with the connection.
            pass

class DataPipeline:
    '''Extract one table into a DataFrame, make it JSON-friendly, and save it.

    Intended call order: ``extract`` -> ``transform`` -> ``load``.
    '''

    def __init__(self, secret: 'EnvSecrets') -> None:
        '''Store the settings object; no data is loaded yet.

        Args:
            secret: settings object; only its ``schema`` attribute is read here.
        '''
        self.new_data = pd.DataFrame()
        self.secret: 'EnvSecrets' = secret
        # Set by transform(): one {column: value} dict per row.
        self.dict_format: list[dict] | None = None

    def extract(self, database: 'DatabaseConnection', tablename: str) -> None:
        '''Read every row of ``<schema>.<tablename>`` into ``self.new_data``.

        Args:
            database: connection wrapper whose ``cursor`` runs the query.
            tablename: table name inside ``self.secret.schema``.
        '''
        # NOTE(review): schema and table name are interpolated unquoted into
        # the SQL text. This is only safe for trusted identifiers; consider
        # psycopg2.sql.Identifier if they can ever come from user input.
        database.cursor.execute(f'SELECT * FROM {self.secret.schema}.{tablename}')
        rows = database.cursor.fetchall()
        cols = [desc[0] for desc in database.cursor.description]
        self.new_data = pd.DataFrame(rows, columns=cols)

    @staticmethod
    def _is_temporal(column: pd.Series) -> bool:
        '''Return True if *column* holds date, datetime, time or interval values.

        The ``json`` module cannot serialize these types, so transform()
        converts such columns to strings.
        '''
        import datetime

        if (pd.api.types.is_datetime64_any_dtype(column)
                or pd.api.types.is_timedelta64_dtype(column)):
            return True
        if not pd.api.types.is_object_dtype(column):
            return False
        # datetime.datetime is a subclass of datetime.date, so it is covered too.
        temporal_types = (datetime.date, datetime.time, datetime.timedelta)
        return any(isinstance(value, temporal_types) for value in column)

    def transform(self) -> None:
        '''Make ``self.new_data`` JSON-serializable and store it as records.

        Columns holding dates, datetimes, times or intervals are converted to
        their ``str`` form. Missing values in those columns become None (JSON
        ``null``) rather than the strings 'None' or 'NaT'. The rows are then
        stored in ``self.dict_format`` as a list of ``{column: value}`` dicts.
        '''
        # Columns are chosen by content, not by name. The old substring test
        # on 'date' also matched names such as 'update_count' or
        # 'candidate_id'. It also skipped date columns whose names lack
        # 'date', which json.dump then rejects.
        for col in self.new_data.columns:
            column = self.new_data[col]
            if self._is_temporal(column):
                as_text = column.astype(str).astype(object)
                self.new_data[col] = as_text.where(column.notna(), None)
        self.dict_format = self.new_data.to_dict(orient='records')

    def load(self, file_path: str) -> None:
        '''Write the records produced by transform() to *file_path* as JSON.

        The file is overwritten, written as UTF-8 with non-ASCII characters kept
        as-is and indented by 4 spaces. If transform() has not run yet,
        ``dict_format`` is still None and the file will contain ``null``.
        '''
        with open(file_path, 'w', encoding='utf-8') as json_file:
            json.dump(self.dict_format, json_file, ensure_ascii=False, indent=4)


def main() -> None:
    '''Extract the configured table, convert it to JSON-ready records and save it.

    The output file is written only when the database connection and the
    extraction both succeed. A failed run reports the problem and leaves any
    previously written JSON file untouched.
    '''
    # NOTE(review): nothing in this script calls pd.read_sql, so this filter
    # looks like a leftover; kept so behaviour is unchanged.
    warnings.filterwarnings('ignore', message='.*pandas only supports SQLAlchemy connectable.*')
    json_filepath = '../data/processed.json'
    tablename = 'hackathon_july24_test'
    sensitive_data = EnvSecrets()
    df_database = DatabaseConnection(sensitive_data)
    if not df_database.valid:
        # Previously this fell through to extract(), failed, and then wrote [].
        print('Could not connect to the database; nothing was extracted.')
        return
    database_seeding = DataPipeline(sensitive_data)
    try:
        database_seeding.extract(df_database, tablename)
    except Exception as e:
        # Stop here: transform()/load() on the empty DataFrame would overwrite
        # the output file with an empty list.
        print(e)
        return
    finally:
        df_database.close()
    database_seeding.transform()
    database_seeding.load(json_filepath)

if __name__ == '__main__':
    # Run the pipeline only when executed as a script, not when imported.
    main()
