In [352]:
import sqlalchemy
from sqlalchemy import create_engine, text
import yaml
import logging
import pandas as pd
import numpy as np

In [392]:
with open("docker-compose.yml", "r") as file:
    yml = yaml.safe_load(file)

class DBCred():
    def __init__(self, service) -> None:
        try:
            db =  yml["services"][service]
            env = db["environment"]

            if service == "pg_db":
                self.db_prefix = "POSTGRES"
                self.eng_driver = "postgresql+psycopg2"
                self.db_name = env[f"{self.db_prefix}_DB"]
                self.user=env[f"{self.db_prefix}_USER"]
                self.passwd=env[f"{self.db_prefix}_PASSWORD"]
                self.port=yml["services"][service]["ports"][0].split(":")[0]
                self.host="localhost"
            
            if service == "mysql_db":
                self.db_prefix = "MYSQL"
                self.eng_driver = "mysql+pymysql"
                self.db_name = env[f"{self.db_prefix}_DATABASE"]
                self.user=env[f"{self.db_prefix}_USER"]
                self.passwd=env[f"{self.db_prefix}_PASSWORD"]
                self.port=yml["services"][service]["ports"][0].split(":")[0]
                self.host="localhost"
                
            self.conn_str = f"{self.eng_driver}://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db_name}"

        except Exception as e:
            logging.error(f"Invalid docker-compose.yml: {e}")

In [393]:
source_db_cred = DBCred("pg_db")
dest_db_cred = DBCred("mysql_db")

In [394]:
# class Database():
#     def __init__(self, db_cred):
#         self.prefix = db_cred.db_prefix
#         self.name = db_cred.db_name
#         self.conn_str = db_cred.conn_str
#         self.conn = self._connect()
#         self.cur = self.conn.cursor()

#     def _connect(self):
#         try:
#             if self.prefix == "POSTGRES":
#                 return psycopg2.connect(self.credentials_dsn)
#             elif self.prefix == "MYSQL":
#                 return pymysql.connect(host='localhost', user='user', password='userpass', port=3306, db=f"{self.name}")
#         except Exception as e:
#             logging.error(f"Error connecting to database: {e}")

#     def get_table_names(self, sql):
#         """
#             Retrieve name of tables from source DB
#             return 'list' : 'table_names'
#         """
#         try:
#             self.cur.execute(sql)
#             results = self.cur.fetchall()
#             table_names = [table_name[0] for table_name in results]
#             logging.info(f"Got {len(table_names)} tables: {table_names} \n")
#             return table_names
#         except Exception as e:
#             logging.error(f"Error retrieving table names, check database status or SQL syntax: \n{e}")

#     def extract_db(self, table_names, user_date):
#         """
#             Extract data from source DB.
#             Write each table into csv files.
#         """
#         try:
#             for table in table_names:
#                 path = f"./data/postgres/{table}/{user_date}/{table}.csv"
#                 self.cur.execute(f"SELECT * FROM {table};")
#                 columns = [col[0] for col in self.cur.description]
#                 result = self.cur.fetchall()
#                 df = pd.DataFrame(columns=columns, data=result)
#                 final_df = df.astype(object).where(pd.notnull(df), 'NULL')
#                 final_df.to_csv(path, index=False, sep=',', encoding='utf-8')
#             logging.info("Tables extracted.\n")
#         except Exception as e:
#             logging.error(f"Error while extracting or saving data from Postgres DB: {e}")


#     def insert_into_db(self, source, table, user_date):
#         """
#             Insert data into destination DB.
#             return 'bool' : success
#         """
#         success = False
#         if source == "postgres":
#             path = f"./data/{source}/{table}/{user_date}/{table}.csv"
#         elif source == "csv":
#             path = f"./data/{source}/{user_date}/{table}.csv"
        
#         try:
#             df = pd.read_csv(path, encoding="utf-8")
#             df.replace(to_replace=np.nan, value='NULL', inplace=True)
                
#             self.cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
#             self.cursor.execute(f"DELETE FROM {table};")
            
#             for i in range(len(df)):
#                 row_tuple = tuple(df.to_records(index=False))
                
#                 sql = f'''
#                     INSERT INTO {table} VALUES {row_tuple[i]};
#                 '''

#                 self.cursor.execute(sql)
#             self.cursor.execute("SET FOREIGN_KEY_CHECKS=1;")
#             success = True
#             return success
#         except Exception as e:
#             print(f"Error loading data into destination database: {e}")


#     def final_query(self, user_date, sql):
#         """
#             Execute final query on destination DB and write result into 'final_query.csv'
#         """
#         path = f"./data/track/{user_date}/final_query.csv"
#         self.cursor.execute(sql)
#         cols = [col[0] for col in self.cursor.description]
#         results = self.cursor.fetchall()
#         df = pd.DataFrame(results, columns=cols)
#         df.to_csv(path, index=False, sep=',', encoding='utf-8')


#     def close_connection(self):
#         """Close the connection with the database."""
#         self.cur.close()
#         self.conn.close()


In [453]:
class Database():
    def __init__(self, db_cred):
        self.conn_str = db_cred.conn_str
        self.engine = create_engine(self.conn_str)


    def get_table_names(self, sql):
        """
            Retrieve name of tables from source DB
            return 'list' : 'table_names'
        """
        try:
            with self.engine.connect() as connection:
                results = connection.execute(text(sql))

            table_names = [table_name[0] for table_name in results]
            logging.info(f"Got {len(table_names)} tables: {table_names} \n")
            return table_names
        except Exception as e:
            logging.error(f"Error retrieving table names, check database status or SQL syntax: \n{e}")

    def extract_db(self, table_names, user_date):
        """
            Extract data from source DB.
            Write each table into csv files.
        """
        try:
            with self.engine.connect() as connection:
                for table in table_names:
                    path = f"./data/postgres/{table}/{user_date}/{table}.csv"
                    result = connection.execute(text(f"SELECT * FROM {table};"))
                    columns = result._metadata.keys
                    df = pd.DataFrame(columns=columns, data=result)
                    final_df = df.astype(object).where(pd.notnull(df), 'NULL')
                    final_df.to_csv(path, index=False, sep=',', encoding='utf-8')
            logging.info("Tables extracted.\n")
        except Exception as e:
            logging.error(f"Error while extracting or saving data from Postgres DB: {e}")


    def insert_into_db(self, source, table, user_date):
        """
            Insert data into destination DB.
            return 'bool' : success
        """
        success = False
        if source == "postgres":
            path = f"./data/{source}/{table}/{user_date}/{table}.csv"
        elif source == "csv":
            path = f"./data/{source}/{user_date}/{table}.csv"
        
        try:
            df = pd.read_csv(path, encoding="utf-8")
            df.replace(to_replace=np.nan, value='NULL', inplace=True)


            with self.engine.connect() as connection:
                connection.execute(text("SET FOREIGN_KEY_CHECKS=0;"))
                
                # db_conn = self.engine.connect()
                df.to_sql(table, connection, if_exists="replace", index=False)
            #     connection.execute(text(f"DELETE FROM {table};"))
                
            #     for i in range(len(df)):
            #         row_tuple = tuple(df.to_records(index=False))
                    
            #         sql = f"""
            #             INSERT INTO {table} VALUES {row_tuple[i]};
            #         """

            #         connection.execute(text(sql))
                connection.execute(text("SET FOREIGN_KEY_CHECKS=1;"))
                connection.commit()
            success = True
            return success
        except Exception as e:
            print(f"Error loading data into destination database: {e}")


    def final_query(self, user_date, sql):
        """
            Execute final query on destination DB and write result into 'final_query.csv'
        """
        path = f"./data/track/{user_date}/final_query.csv"
        self.cursor.execute(sql)
        cols = [col[0] for col in self.cursor.description]
        results = self.cursor.fetchall()
        df = pd.DataFrame(results, columns=cols)
        df.to_csv(path, index=False, sep=',', encoding='utf-8')






    # def close_connection(self):
    #     """Close the connection with the database."""
    #     self.cur.close()
    #     self.conn.close()


In [454]:
source_db = Database(source_db_cred)

In [455]:
dest_db = Database(dest_db_cred)

In [456]:
from scripts.constants import sql_PG_TABLE_NAMES_QUERY

In [457]:
src_tables = source_db.get_table_names(sql_PG_TABLE_NAMES_QUERY)

In [458]:
user_date = "2023-01-30"

In [459]:
from scripts.functions import create_db_path, create_csv_path, extract_csv
create_db_path(src_tables, user_date)
create_csv_path(user_date)

Step 1 already executed for this date. Reprocessing it for the selected day (2023-01-30).
./data/postgres/suppliers/2023-01-30 recreated.

Step 1 already executed for this date. Reprocessing it for the selected day (2023-01-30).
./data/postgres/employees/2023-01-30 recreated.

Step 1 already executed for this date. Reprocessing it for the selected day (2023-01-30).
./data/postgres/shippers/2023-01-30 recreated.

Step 1 already executed for this date. Reprocessing it for the selected day (2023-01-30).
./data/postgres/categories/2023-01-30 recreated.

Step 1 already executed for this date. Reprocessing it for the selected day (2023-01-30).
./data/postgres/employee_territories/2023-01-30 recreated.

Step 1 already executed for this date. Reprocessing it for the selected day (2023-01-30).
./data/postgres/region/2023-01-30 recreated.

Step 1 already executed for this date. Reprocessing it for the selected day (2023-01-30).
./data/postgres/customer_demographics/2023-01-30 recreated.

Step 1 

In [460]:
source_db.extract_db(src_tables, user_date)

In [461]:
extract_csv(user_date)

CSV file extracted.



In [462]:
dsn_url_dest = "mysql+pymysql://user:userpass@localhost:3306/mysql_northwind"
dsn_url = "postgresql+psycopg2://northwind_user:thewindisblowing@localhost:5432/northwind"
engine = sqlalchemy.create_engine(dsn_url)
dest_engine = sqlalchemy.create_engine(dsn_url_dest)

In [463]:

for table in src_tables:
    source = "postgres"
    success = dest_db.insert_into_db(source, table, user_date)
    if success:
        print(f"Table {table} loaded into destination database.")


Error loading data into destination database: (pymysql.err.OperationalError) (3780, "Referencing column 'supplier_id' and referenced column 'supplier_id' in foreign key constraint 'fk_products_suppliers' are incompatible.")
[SQL: 
CREATE TABLE suppliers (
	supplier_id BIGINT, 
	company_name TEXT, 
	contact_name TEXT, 
	contact_title TEXT, 
	address TEXT, 
	city TEXT, 
	region TEXT, 
	postal_code TEXT, 
	country TEXT, 
	phone TEXT, 
	fax TEXT, 
	homepage TEXT
)

]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
Error loading data into destination database: (pymysql.err.OperationalError) (3780, "Referencing column 'employee_id' and referenced column 'employee_id' in foreign key constraint 'fk_orders_employees' are incompatible.")
[SQL: 
CREATE TABLE employees (
	employee_id BIGINT, 
	last_name TEXT, 
	first_name TEXT, 
	title TEXT, 
	title_of_courtesy TEXT, 
	birth_date TEXT, 
	hire_date TEXT, 
	address TEXT, 
	city TEXT, 
	region TEXT, 
	postal_code TEXT, 
	country TEXT, 
	

In [407]:
with dest_engine.connect() as connection:
    result = connection.execute(text("SELECT * FROM region;"))
    for row in result:
        print(row)

In [130]:
from scripts.constants import sql_PG_TABLE_NAMES_QUERY

In [137]:
result = source_db.get_table_names(sql_PG_TABLE_NAMES_QUERY)