# In [18]:
from sqlalchemy import create_engine, MetaData, Table, inspect, text

class DataExtractor:
    """Copy rows from a source database into a table of a destination database.

    Rows are read from the source in chunks of ``chunk_size`` and inserted
    into a staging table named ``temp_<destination>``; once every chunk has
    been copied, the staging table replaces the destination table.

    Table names are interpolated into SQL text verbatim (no quoting), so they
    must come from trusted code, never from user input.
    """

    def __init__(self, source_url, destination_url, chunk_size=10000):
        """Create the engines used for reading and writing.

        Args:
            source_url: SQLAlchemy URL of the database to read from.
            destination_url: SQLAlchemy URL of the database to write to.
            chunk_size: Maximum number of rows fetched and inserted at once.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        chunk_size = int(chunk_size)
        # A non-positive chunk size would fetch nothing, and transfer_data
        # would then replace the destination with an empty table.
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

        self.source_engine = create_engine(source_url)
        self.destination_engine = create_engine(destination_url)
        self.chunk_size = chunk_size

    @staticmethod
    def _row_to_dict(row):
        """Return *row* as a plain dict, for both old and new SQLAlchemy rows."""
        # Newer Row objects expose their mapping view as ``_mapping``; older
        # row proxies and plain dicts are already mappings themselves.
        return dict(getattr(row, "_mapping", row))

    def _execute_on_destination(self, statement):
        """Run one statement on the destination inside its own transaction."""
        if isinstance(statement, str):
            # NOTE: text() treats ":name" as a bind parameter; a literal
            # colon-name inside the SQL has to be escaped as "\:name".
            statement = text(statement)
        with self.destination_engine.begin() as connection:
            connection.execute(statement)

    def _insert_rows(self, table, rows):
        """Insert a non-empty list of row dicts into an already reflected table."""
        with self.destination_engine.begin() as connection:
            connection.execute(table.insert(), rows)

    def fetch_data_in_chunks(self, table_name, select_query=None):
        """Yield the source rows as lists of at most ``chunk_size`` rows.

        The statement is executed once and read incrementally with
        ``fetchmany`` rather than re-run per page with LIMIT/OFFSET: without
        an ORDER BY the page boundaries of repeated queries are not stable,
        and every OFFSET page makes the database skip the earlier rows again.

        Args:
            table_name: Source table to read in full when no query is given.
            select_query: Optional SELECT statement (string or SQLAlchemy
                construct) to run instead of reading the whole table.

        Yields:
            Non-empty lists of result rows.
        """
        if select_query:
            statement = text(select_query) if isinstance(select_query, str) else select_query
        else:
            reflected = Table(table_name, MetaData(), autoload_with=self.source_engine)
            statement = reflected.select()

        with self.source_engine.connect() as connection:
            # stream_results asks the driver for a server-side cursor where it
            # supports one, so the full result set is not buffered in memory.
            streaming = connection.execution_options(stream_results=True)
            result = streaming.execute(statement)
            try:
                while True:
                    rows = result.fetchmany(self.chunk_size)
                    if not rows:
                        break
                    yield rows
            finally:
                # Also runs when the consumer abandons the generator early.
                result.close()

    def insert_data(self, table_name, data):
        """Insert *data* into the destination table ``table_name``.

        Args:
            table_name: Name of an existing table in the destination database.
            data: Iterable of result rows or dict-like mappings.
        """
        rows = [self._row_to_dict(row) for row in data]
        if not rows:
            # Skip the statement entirely instead of sending an INSERT with
            # no parameter sets.
            return
        table = Table(table_name, MetaData(), autoload_with=self.destination_engine)
        self._insert_rows(table, rows)

    def table_exists(self, table_name):
        """Return True if the destination inspector lists ``table_name``."""
        inspector = inspect(self.destination_engine)
        return table_name in inspector.get_table_names()

    def create_table(self, create_table_query):
        """Execute a CREATE TABLE statement on the destination database."""
        self._execute_on_destination(create_table_query)

    def drop_table(self, table_name):
        """Drop ``table_name`` from the destination database if it exists."""
        if self.table_exists(table_name):
            self._execute_on_destination(f"DROP TABLE {table_name}")

    def rename_table(self, old_name, new_name):
        """Rename a destination table from ``old_name`` to ``new_name``."""
        self._execute_on_destination(f"ALTER TABLE {old_name} RENAME TO {new_name}")

    def transfer_data(self, source_table, destination_table, create_table_query, select_query=None):
        """Rebuild ``destination_table`` from the source data.

        Args:
            source_table: Source table, read in full when ``select_query`` is
                not given.
            destination_table: Table to (re)create in the destination database.
            create_table_query: CREATE TABLE statement that mentions
                ``destination_table``; every occurrence of that name is
                replaced with the staging table name before execution.
            select_query: Optional SELECT statement choosing the rows to copy.

        Raises:
            ValueError: If ``create_table_query`` does not mention
                ``destination_table``.
        """
        if destination_table not in create_table_query:
            # Otherwise the unmodified DDL would run against the destination
            # name and the staging table would never be created.
            raise ValueError(
                f"create_table_query does not reference table {destination_table!r}"
            )

        temp_table = f"temp_{destination_table}"

        # Start from a clean staging table.
        self.drop_table(temp_table)
        self.create_table(create_table_query.replace(destination_table, temp_table))

        # Reflect the staging table once instead of once per chunk.
        staging = Table(temp_table, MetaData(), autoload_with=self.destination_engine)
        for chunk in self.fetch_data_in_chunks(source_table, select_query):
            self._insert_rows(staging, [self._row_to_dict(row) for row in chunk])

        # Swap staging into place in one transaction, so that on databases
        # with transactional DDL a failed rename does not leave the
        # destination table dropped.
        destination_exists = self.table_exists(destination_table)
        with self.destination_engine.begin() as connection:
            if destination_exists:
                connection.execute(text(f"DROP TABLE {destination_table}"))
            connection.execute(
                text(f"ALTER TABLE {temp_table} RENAME TO {destination_table}")
            )


# In [20]:
# Database connection URLs. Source and target currently point at the same
# PostgreSQL database; a MySQL target would instead look like
# "mysql://user:password@localhost:3306/mydatabase".
# NOTE(review): credentials are embedded in these URLs; load them from the
# environment or a secrets store before using real ones.
SOURCE_DB_URL = "postgresql://user:password@localhost:5432/mydatabase"
TARGET_DB_URL = "postgresql://user:password@localhost:5432/mydatabase"

# Candidate chunk sizes for comparing transfer runs.
BENCHMARK_CHUNK_SIZES = [1000, 50_000, 100_000, 500_000]

# For demonstration purposes, only the largest candidate is used.
chunk_sizes = BENCHMARK_CHUNK_SIZES[-1:]

# "CREATE TABLE" statement for the target table. transfer_data swaps the table
# name for a temporary one, so the name here must match the destination
# argument passed below.
create_table_query = '''
CREATE TABLE IF NOT EXISTS target_banking_orders (
    order_id INT NOT NULL,
    account_number TEXT NOT NULL,
    beneficiary_account TEXT,
    amount REAL NOT NULL,
    transaction_date DATE NOT NULL,
    transaction_type TEXT NOT NULL,
    order_status TEXT NOT NULL,
    order_description TEXT,
    currency TEXT NOT NULL,
    branch_code TEXT NOT NULL
)
'''

# The row filter is the same for every run, so it is built once up front.
select_query = "SELECT * FROM banking_orders WHERE transaction_date > '2023-06-01'"

for chunk_size in chunk_sizes:
    # Each run gets its own extractor (and therefore its own pair of engines).
    extractor = DataExtractor(SOURCE_DB_URL, TARGET_DB_URL, chunk_size=chunk_size)
    try:
        # Omit select_query to copy the whole source table instead.
        extractor.transfer_data("banking_orders", "target_banking_orders", create_table_query, select_query)
    finally:
        # Release the pooled connections of this run's engines.
        extractor.source_engine.dispose()
        extractor.destination_engine.dispose()
