# In [1]:
import pandas as pd
from pathlib import Path
from sqlalchemy import create_engine
import yaml

# In [2]:
def load_credentials(path="creds.yaml"):
    '''Read the YAML credentials file and return its contents as a dictionary.

    Args:
        path (str | Path): location of the credentials file. Defaults to
            "creds.yaml", resolved against the current working directory.

    Returns:
        dict: the mapping parsed from the file.

    Raises:
        FileNotFoundError: if the file does not exist.
        yaml.YAMLError: if the file is not valid YAML.
        ValueError: if the file parses to something other than a mapping
            (for example, when the file is empty).
    '''
    # Explicit encoding: the platform default is not UTF-8 everywhere.
    with open(path, "r", encoding="utf-8") as file:
        credentials = yaml.safe_load(file)

    # safe_load returns None for an empty file; fail here with a clear message
    # rather than later with an opaque TypeError when a key is looked up.
    if not isinstance(credentials, dict):
        raise ValueError(
            f"Expected a mapping of credentials in {path!r}, "
            f"got {type(credentials).__name__}"
        )
    return credentials

# Load the database credentials once; they are handed to DatabaseConnector further down.
credentials = load_credentials()

# Create a new directory to save results in.
# parents=True / exist_ok=True make this a no-op when the directory already exists,
# so the script can be re-run safely.
query_results_dir = "tables_columns_auto"
Path(query_results_dir).mkdir(parents=True, exist_ok=True)


class DatabaseConnector:
    '''This class is used to connect to the database, extract the names of the public tables and their columns,
    and save the results in csv files.

        Attributes:
            credentials (dict): a dictionary of credentials required to connect to the database.
            output_dir (str | Path | None): directory the csv files are written to. When None, the
                module-level query_results_dir is used.
            tables_df (dataframe): a dataframe that is initially empty, and is replaced by the result of the
                table-name query when get_tables runs.
            tables_list (list): a list that is initially empty, and is replaced by the names of the tables
                in tables_df when get_tables runs.'''
    def __init__(self, credentials, output_dir=None):
        self.credentials = credentials
        self.output_dir = output_dir
        self.tables_df = pd.DataFrame()
        self.tables_list = []

    def _results_dir(self):
        '''Return the directory csv files are written to, creating it if it does not exist yet.'''
        # Resolved on every call so that the module-level default is read at use time,
        # exactly as the methods did before output_dir existed.
        target = Path(query_results_dir if self.output_dir is None else self.output_dir)
        target.mkdir(parents=True, exist_ok=True)
        return target

    def get_tables(self, conn):
        '''This method reads the names of all base tables in the 'public' schema via the given connection.
        The attribute tables_df is updated with the resulting dataframe, which is also saved to
        "tables_list.csv" in the results directory.
        Additionally, the 'table_name' column is converted to a list and stored in the attribute tables_list.

        Args:
            conn: an open database connection accepted by pandas.read_sql.'''
        tables_query = '''
            SELECT
                table_name
            FROM
                INFORMATION_SCHEMA.TABLES
            WHERE
                table_schema = 'public'
            AND
                table_type = 'BASE TABLE'
            ORDER BY
                table_name ASC
            '''

        table_names = pd.read_sql(tables_query, conn)
        self.tables_df = table_names

        # Save DataFrame to CSV file in the results directory
        table_names.to_csv(self._results_dir() / "tables_list.csv", index=False)

        # Create a list of table names based on the column 'table_name'
        self.tables_list = table_names['table_name'].tolist()

    def get_columns(self, tables_list, conn):
        '''This method saves the column names of every table in tables_list to "<table>_columns.csv"
        in the results directory.

        The column names of all tables in the 'public' schema are read with a single, static query and
        then split per table in pandas. No table name is ever interpolated into SQL text, so names
        containing quotes or other special characters are handled correctly.

        Args:
            tables_list: an iterable of table names to write column files for.
            conn: an open database connection accepted by pandas.read_sql.'''
        tables_list = list(tables_list)
        if not tables_list:
            # Nothing to write, so skip the database round trip entirely.
            return

        columns_query = '''
            SELECT
                table_name,
                column_name
            FROM
                INFORMATION_SCHEMA.COLUMNS
            WHERE
                table_schema = 'public'
            ORDER BY
                table_name ASC,
                ordinal_position ASC
            '''

        all_columns = pd.read_sql(columns_query, conn)
        results_dir = self._results_dir()

        for table in tables_list:
            # Keep only the 'column_name' column so each file has the same single-column layout;
            # a table with no matching rows still gets a file containing just the header.
            columns_df = all_columns.loc[all_columns['table_name'] == table, ['column_name']]

            # Save DataFrame to CSV file in the results directory
            columns_df.to_csv(results_dir / f"{table}_columns.csv", index=False)

    def get_data(self):
        '''This method creates an engine to establish a connection to the database.
        The get_tables and get_columns methods then run on that single connection, and the engine
        is disposed of afterwards, whether or not they succeed.

        Raises:
            KeyError: if a required key is missing from the credentials dictionary.'''
        database_type = self.credentials["DATABASE_TYPE"]
        dbapi = self.credentials["DBAPI"]
        host = self.credentials["HOST"]
        user = self.credentials["USER"]
        password = self.credentials["PASSWORD"]
        database = self.credentials["DATABASE"]
        port = self.credentials["PORT"]

        # NOTE(review): USER and PASSWORD are placed into the URL verbatim, so values containing
        # characters such as '@', ':' or '/' must already be URL-encoded in the credentials file.
        engine = create_engine(f"{database_type}+{dbapi}://{user}:{password}@{host}:{port}/{database}")
        try:
            with engine.execution_options(isolation_level='AUTOCOMMIT').connect() as conn:
                self.get_tables(conn)
                self.get_columns(self.tables_list, conn)
        finally:
            # Release the pooled connections held by this short-lived engine.
            engine.dispose()



# Build the connector from the loaded credentials and run the full extraction:
# get_data() connects to the database and writes the table list and the
# per-table column lists to csv files.
connector = DatabaseConnector(credentials)
connector.get_data()


