In [2]:
import os
import time
import urllib
import urllib.parse

import pandas as pd
from sqlalchemy import create_engine, text

# Define database connection parameters.
# The server name and credentials can be overridden through environment
# variables; the fallbacks reproduce the values previously hard-coded here, so
# behaviour is unchanged when the variables are not set.
# NOTE(review): the fallback credentials are still stored in plain text -- set
# the environment variables and drop the defaults before sharing this file.
_SQL_SERVER = os.environ.get("ADW_SQL_SERVER", "DESKTOP-TS0EFTA")
_SQL_UID = os.environ.get("ADW_SQL_UID", "stone123")
_SQL_PWD = os.environ.get("ADW_SQL_PWD", "obembe225")
_ODBC_URL = "mssql+pyodbc:///?odbc_connect=%s"


def _odbc_params(driver, database):
    """Return the URL-quoted ODBC connection string for ``database``.

    Args:
        driver: ODBC driver name, without the surrounding braces.
        database: Name of the database to connect to.

    Returns:
        The connection string passed through ``urllib.parse.quote_plus`` so it
        can be embedded in the ``odbc_connect`` query parameter.
    """
    return urllib.parse.quote_plus(
        f"DRIVER={{{driver}}};SERVER={_SQL_SERVER};DATABASE={database};"
        f"UID={_SQL_UID};PWD={_SQL_PWD}"
    )


params_source = _odbc_params("ODBC Driver 17 for SQL Server", "AdventureWorksDW2019")
engine_source = create_engine(_ODBC_URL % params_source)

# NOTE(review): the destination uses the "SQL Server" ODBC driver while the
# source uses "ODBC Driver 17 for SQL Server" -- confirm this is intentional.
params_destination = _odbc_params("SQL Server", "AdventureWorksDW2019_1")
engine_destination = create_engine(_ODBC_URL % params_destination)

# Tables to copy from the source database to the destination database,
# processed in the order listed.
table_names = [
    'AdventureWorksDWBuildVersion',
    'DatabaseLog',
    'DimAccount',
    'DimCurrency',
    'DimCustomer',
    'DimDate',
    'DimDepartmentGroup',
    'DimEmployee',
    'DimGeography',
    'DimOrganization',
    'DimProduct',
    'DimProductCategory',
    'DimProductSubcategory',
    'DimPromotion',
    'DimReseller',
    'DimSalesReason',
    'DimSalesTerritory',
    'DimScenario',
    'FactAdditionalInternationalProductDescription',
    'FactCallCenter',
    'FactCurrencyRate',
    'FactFinance',
    'FactInternetSales',
    'FactInternetSalesReason',
    'FactProductInventory',
    'FactResellerSales',
    'FactSalesQuota',
    'FactSurveyResponse',
    'NewFactCurrencyRate',
    'ProspectiveBuyer',
]

# Define database connection functions
def get_connection(engine):
    """Open a connection on ``engine`` and return it.

    Delegates to ``engine.connect()``; the caller is responsible for closing
    the returned connection.
    """
    connection = engine.connect()
    return connection

def close_connection(connection) -> None:
    """Close ``connection`` by calling its ``close()`` method."""
    connection.close()

# Define data ingestion function
def _quote_identifier(name):
    """Return ``name`` wrapped in T-SQL square brackets, doubling any ``]``."""
    return "[" + name.replace("]", "]]") + "]"


def _commit_if_supported(conn):
    """Commit ``conn`` when it exposes ``commit()``; otherwise do nothing.

    Connections without ``commit()`` are assumed to run in the legacy
    autocommit mode, where no explicit commit is needed.
    """
    commit = getattr(conn, "commit", None)
    if callable(commit):
        commit()


def ingest_data(source_conn, dest_conn, table_name, chunksize=10000):
    """Replace the rows of ``table_name`` in the destination with the source rows.

    The destination table is emptied with ``DELETE`` and then refilled from
    ``SELECT *`` on the source, one chunk at a time. When the destination table
    has an identity column, ``IDENTITY_INSERT`` is switched on for the duration
    of the copy so the source key values are inserted as-is.

    Args:
        source_conn: Open connection to the source database.
        dest_conn: Open connection to the destination database. Every statement
            runs on this one connection because ``IDENTITY_INSERT`` is a
            per-session setting.
        table_name: Bare (not schema-qualified) name of the table to copy; it
            must already exist in both databases.
        chunksize: Number of rows read and written per batch.

    Raises:
        Whatever the database driver or pandas raises; ``IDENTITY_INSERT`` is
        switched back off before the exception propagates.
    """
    print(f"Ingesting data from {table_name} into the target database...")

    quoted_table = _quote_identifier(table_name)

    # Check if the destination table has an identity column
    identity_count = dest_conn.execute(
        text(
            "SELECT COUNT(*) FROM sys.identity_columns "
            "WHERE object_id = OBJECT_ID(:table_name)"
        ),
        {"table_name": quoted_table},
    ).scalar()
    has_identity = bool(identity_count)

    # If the destination table has an identity column, turn on IDENTITY_INSERT
    if has_identity:
        dest_conn.execute(text(f"SET IDENTITY_INSERT {quoted_table} ON"))

    try:
        # Delete data from the destination table
        dest_conn.execute(text(f"DELETE FROM {quoted_table}"))
        # Make the delete durable and leave no open transaction, so pandas can
        # manage its own transaction for each chunk it writes.
        _commit_if_supported(dest_conn)

        # Ingest data from source to destination table
        sql = f"SELECT * FROM {table_name}"
        for chunk in pd.read_sql(sql, source_conn, chunksize=chunksize):
            chunk.to_sql(table_name, con=dest_conn, if_exists='append', index=False)
            print(f"Ingested {len(chunk)} rows into {table_name}.")
            time.sleep(0.1)
    finally:
        # Always turn IDENTITY_INSERT back off: leaving it on after a failure
        # would block enabling it for the next table on this connection.
        if has_identity:
            dest_conn.execute(text(f"SET IDENTITY_INSERT {quoted_table} OFF"))
            _commit_if_supported(dest_conn)

    print(f"Ingestion complete for {table_name}.")

# Open database connections; the try/finally blocks guarantee both are closed
# even when ingesting one of the tables raises.
source_conn = get_connection(engine_source)
try:
    dest_conn = get_connection(engine_destination)
    try:
        # Ingest data for all tables
        for table_name in table_names:
            ingest_data(source_conn, dest_conn, table_name)
    finally:
        close_connection(dest_conn)
finally:
    close_connection(source_conn)


Ingesting data from AdventureWorksDWBuildVersion into the target database...
Ingested 1 rows into AdventureWorksDWBuildVersion.
Ingestion complete for AdventureWorksDWBuildVersion.
Ingesting data from DatabaseLog into the target database...
Ingested 96 rows into DatabaseLog.
Ingestion complete for DatabaseLog.
Ingesting data from DimAccount into the target database...
Ingested 99 rows into DimAccount.
Ingestion complete for DimAccount.
Ingesting data from DimCurrency into the target database...
Ingested 105 rows into DimCurrency.
Ingestion complete for DimCurrency.
Ingesting data from DimCustomer into the target database...
Ingested 10000 rows into DimCustomer.
Ingested 8484 rows into DimCustomer.
Ingestion complete for DimCustomer.
Ingesting data from DimDate into the target database...
Ingested 3652 rows into DimDate.
Ingestion complete for DimDate.
Ingesting data from DimDepartmentGroup into the target database...
Ingested 7 rows into DimDepartmentGroup.
Ingestion complete for DimDe