Data Migration: SQL Srve to Postgres


In [1]:
#!pip install psycopg2
import os
import pandas as pd
import pyodbc
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

### Load credentials

In [2]:
load_dotenv()

True

In [3]:

sql_host = os.getenv("SQLSERVER_HOST")
sql_db = os.getenv("SQLSERVER_DB")

In [5]:

print(f"SQL SERVER HOST: {sql_host}")
print(f"SQL SERVER DB: {sql_db}")

SQL SERVER HOST: E\SQLEXPRESS01
SQL SERVER DB: TransactionDB_UAT


In [6]:

pg_host = os.getenv("POSTGRES_HOST")
pg_port = os.getenv("POSTGRES_PORT")
pg_db = os.getenv("POSTGRES_DB")
pg_user = os.getenv("POSTGRES_USER")
pg_password = os.getenv("POSTGRES_PASSWORD")

In [7]:

print(f"POSGRES HOST: {pg_host}")
print(f"POSGRES PORT: {pg_port}")
print(f"POSGRES DB: {pg_db}")
print(f"POSGRES USER: {pg_user}")
print(f"POSGRES PASSWORD: {pg_password}")

POSGRES HOST: localhost
POSGRES PORT: 5433
POSGRES DB: Transaction_UAT
POSGRES USER: postgres
POSGRES PASSWORD: 972340


### Connect to SQL Server

In [8]:
print("Connecting to SQL Server...")
print(f"   Server: {sql_host}")
print(f"   Database: {sql_db}")

Connecting to SQL Server...
   Server: E\SQLEXPRESS01
   Database: TransactionDB_UAT


In [9]:
try:
    sql_conn_string = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER={sql_host};"
        f"DATABASE={sql_db};"
        f"Trusted_Connection=yes;"
    )

    sql_conn = pyodbc.connect(sql_conn_string)
    sql_cursor = sql_conn.cursor()
    print("[SUCCESS] -> Conenction to SQL Server now live! ")

except Exception as e:
    print(f"SQL Server connection failed: {e}")
    

[SUCCESS] -> Conenction to SQL Server now live! 


### Connect to Postgres

In [11]:

print("Connecting to PostgreSQL...")
print(f"   Server: {pg_host}")
print(f"   Database: {pg_db}")

Connecting to PostgreSQL...
   Server: localhost
   Database: Transaction_UAT


In [12]:
try:
    pg_conn = psycopg2.connect(
        host=pg_host,
        port=pg_port,
        database=pg_db,
        user=pg_user,
        password=pg_password
    )


    pg_cursor=pg_conn.cursor()
    pg_cursor.execute("SELECT version();")

    pg_version = pg_cursor.fetchone()[0]

    print("Connected to PostgreSQL")
    print(f"   Version: {pg_version[:50]}...\n")


except psycopg2.OperationalError as e:
    print(f" Postgres connection failed: {e}")
    print(""" How to troubleshoot:
            > 1. Check Postgres is running
            > 2. Verify username + password
            > 3. Check database exists
          
          ... 

""")

except Exception as e:
    print(f"Unexpected error: {e}")
    raise 

Connected to PostgreSQL
   Version: PostgreSQL 17.4 on x86_64-windows, compiled by msv...



### Migrating tables


In [None]:
#migatrion order is accroding to foregin key dependencies from the least dependent to most dependent
#Migration order
#Categories (no dependencies )
#Suppliers (no dependencies)
#Customers (no dependencies)
#Products (depends on Categories and Suppliers)

tables_to_migrate = [
    "Categories",
    "Suppliers",
    "Customers",
    "Products"]


In [14]:
print("Table to migrate:")
for i, table in enumerate(tables_to_migrate, 1):
    print(f"   {i}. {table}")

total_no_tbls = len(tables_to_migrate)
print(f"\nTotal no of tables to migrate: {total_no_tbls}")

Table to migrate:
   1. Categories
   2. Suppliers
   3. Customers
   4. Products

Total no of tables to migrate: 4


### Pre migration checks

In [15]:
print("=" * 50)
print(">>> Check 1: ROW COUNTS")
print("=" * 50)

>>> Check 1: ROW COUNTS


In [None]:
baseline_counts = {}
try:
    for table in tables_to_migrate:
        quoted_table = f"[{table}]"
        row_count_query = f"SELECT COUNT(*) as total_rows FROM {quoted_table}" 
        sql_cursor.execute(row_count_query)
        count = sql_cursor.fetchone()[0]

        baseline_counts[table] = count
        print(f"{table:15} {count:>12} rows")

    total_rows = sum(baseline_counts.values())
    print(f"{'-' * 30}")
    print(f"{'TOTAL':15} {total_rows:>12,} rows ")
    print("\n Baseline captured! ")

except Exception as e:
    print(f"Failed to get baseline counts: {e}")
    raise

# We will use this baseline to validate our migration results at the end of the process

Categories                 8 rows
Suppliers               5000 rows
Customers             900000 rows
Products              150000 rows
------------------------------
TOTAL              1,055,008 rows 

 Baseline captured! 


In [None]:
# Next, let's do some basic data quality checks to identify any potential issues before we migrate (we will migrate as-is regardless of the results but at least we will be aware of any potential issues upfront)

quality_issues = []

try:
    print("\nCHECK 2: NULL CHECKS (CustomerName)")
    sql_cursor.execute("""SELECT COUNT(*) AS null_count 
                          FROM Customers 
                          WHERE CustomerName IS NULL""")
    null_names = sql_cursor.fetchone()[0]
    if null_names > 0:
        quality_issues.append(f"    > {null_names:,} customers with NULL names...")
    # print(quality_issues)

    
    print("\nCHECK 3: INVALID EMAIL FORMATS CHECK")
    sql_cursor.execute("""SELECT COUNT(*) AS invalid_email_count 
                            FROM Customers 
                            WHERE Email LIKE '%@invalid'  """)
    invalid_emails = sql_cursor.fetchone()[0]
    if invalid_emails > 0:
        quality_issues.append(f"    > {invalid_emails:,} emails with invalid email formats...")
    # print(quality_issues)
    
    print("\nCHECK 4: NEGATIVE PRODUCT PRICES CHECK")
    sql_cursor.execute(""" SELECT COUNT(*) AS negative_product_prices_count 
                            FROM Products 
                            WHERE UnitPrice < 0
                       """)
    negative_price = sql_cursor.fetchone()[0]
    if negative_price > 0:
        quality_issues.append(f"    > {negative_price:,} prices contain negative prices...")
    # print(quality_issues)


    print("\nCHECK 5: NEGATIVE STOCK QUANTITIES CHECK")
    sql_cursor.execute(""" SELECT COUNT(*) AS negative_stock_quanities_count 
                            FROM Products 
                            WHERE StockQuantity < 0
                       """)
    negative_stock_quantities = sql_cursor.fetchone()[0]
    if negative_stock_quantities > 0:
        quality_issues.append(f"    > {negative_stock_quantities:,} products with negative stock...")
    # print(quality_issues)

    print("\nCHECK 6: ORPHANED FOREIGN KEYS CHECK")
    sql_cursor.execute(""" SELECT COUNT(*) AS orphaned_records 
                        FROM Products prod
                        WHERE NOT EXISTS (SELECT 1
                                            FROM Suppliers sup
                                            WHERE sup.SupplierID=prod.SupplierID)
""")
    orphaned_fks = sql_cursor.fetchone()[0]
    if orphaned_fks > 0:
        quality_issues.append(f"    > {orphaned_fks:,} products with orphaned foreign keys...")
    # print(quality_issues) 


    print("\nCHECK 7: FUTURES DATES CHECK")
    sql_cursor.execute(""" SELECT COUNT(*) as future_dates_count 
                            FROM Customers
                            WHERE CreatedDate > GETDATE()
""")
    future_dates = sql_cursor.fetchone()[0]
    if future_dates > 0:
        quality_issues.append(f"    > {future_dates:,} customers with future creations data later than current date...")
    # print(quality_issues)

    if quality_issues:
        print("\nData quality issues found (will migrate as-is)")
        for issue in quality_issues:
            print(issue)
    else:
        print("No data qualituy issues identified!")

except Exception as e:
    print(f"[ERROR] ===> Unexpected issue: {e}")
    raise


CHECK 2: NULL CHECKS (CustomerName)

CHECK 3: INVALID EMAIL FORMATS CHECK

CHECK 4: NEGATIVE PRODUCT PRICES CHECK

CHECK 5: NEGATIVE STOCK QUANTITIES CHECK

CHECK 6: ORPHANED FOREIGN KEYS CHECK

CHECK 7: FUTURES DATES CHECK

Data quality issues found (will migrate as-is)
    > 4,514 customers with NULL names...
    > 8,844 emails with invalid email formats...
    > 775 prices contain negative prices...
    > 1,467 products with negative stock...
    > 24,700 products with orphaned foreign keys...
    > 9,076 customers with future creations data later than current date...


### Extraction


In [19]:
## Get the table schemas to help with the migration process (e.g. to identify data types, nullable columns, etc...)

print("=" * 65)
print("ANALYZE TABLE SCHEMA")
print("=" * 65)

ANALYZE TABLE SCHEMA


In [20]:
table_schema = {}


try:
    for table in tables_to_migrate:
        schema_query = f"""
            SELECT 
                COLUMN_NAME, 
                DATA_TYPE, 
                CHARACTER_MAXIMUM_LENGTH,
                IS_NULLABLE
            FROM 
                INFORMATION_SCHEMA.COLUMNS
            WHERE 
                table_name = '{table}'
            ORDER BY 
                ORDINAL_POSITION

"""
        schema_df = pd.read_sql(schema_query, sql_conn)
        print(f"\n{table}")
        print("-" * 10)
        print(schema_df)
        table_schema[table] = schema_df
        print("\n\n\n")



except Exception as e:
    pass

  schema_df = pd.read_sql(schema_query, sql_conn)



Categories
----------
    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    CategoryID       int                       NaN          NO
1  CategoryName  nvarchar                      50.0         YES
2   Description  nvarchar                      -1.0         YES






  schema_df = pd.read_sql(schema_query, sql_conn)



Suppliers
----------
    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    SupplierID       int                       NaN          NO
1  SupplierName  nvarchar                     150.0         YES
2   ContactName  nvarchar                     100.0         YES
3       Country  nvarchar                     100.0         YES
4         Phone  nvarchar                      20.0         YES






  schema_df = pd.read_sql(schema_query, sql_conn)



Customers
----------
    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    CustomerID       int                       NaN          NO
1  CustomerName  nvarchar                     100.0         YES
2         Email  nvarchar                     100.0         YES
3         Phone  nvarchar                      20.0         YES
4       Country  nvarchar                     100.0         YES
5   CreatedDate  datetime                       NaN         YES
6      IsActive       bit                       NaN         YES






  schema_df = pd.read_sql(schema_query, sql_conn)



Products
----------
     COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0      ProductID       int                       NaN          NO
1    ProductName  nvarchar                     200.0         YES
2     CategoryID       int                       NaN         YES
3     SupplierID       int                       NaN         YES
4      UnitPrice     money                       NaN         YES
5  StockQuantity       int                       NaN         YES
6    CreatedDate  datetime                       NaN         YES






###  Transform

In [21]:
print("=" * 65)
print("DATA TYPE MAPPING")
print("=" * 65)

DATA TYPE MAPPING


In [25]:
# Define data type mappings from SQL Server to Postgres

type_mapping = {
    'int': 'INTEGER',
    'bigint': 'BIGINT',
    'smallint': 'SMALLINT',
    'tinyint': 'SMALLINT',
    'bit': 'BOOLEAN',
    'decimal': 'NUMERIC',
    'numeric': 'NUMERIC',
    'money': 'NUMERIC(19,4)',
    'smallmoney': 'NUMERIC(10,4)',
    'float': 'DOUBLE PRECISION',
    'real': 'REAL',
    'datetime': 'TIMESTAMP',
    'datetime2': 'TIMESTAMP',
    'smalldatetime': 'TIMESTAMP',
    'date': 'DATE',
    'time': 'TIME',
    'char': 'CHAR',
    'varchar': 'VARCHAR',
    'nchar': 'CHAR',
    'nvarchar': 'VARCHAR',
    'text': 'TEXT',
    'ntext': 'TEXT'
}


In [None]:
print("SQL Server to PostgreSQL type mapping ")
print()

for sql_type, pg_type in list(type_mapping.items()):
    print(f"    {sql_type:13} --->      {pg_type}")

SQL Server to PostgreSQL type mapping 

    int               --->      INTEGER
    bigint            --->      BIGINT
    smallint          --->      SMALLINT
    tinyint           --->      SMALLINT
    bit               --->      BOOLEAN
    decimal           --->      NUMERIC
    numeric           --->      NUMERIC
    money             --->      NUMERIC(19,4)
    smallmoney        --->      NUMERIC(10,4)
    float             --->      DOUBLE PRECISION
    real              --->      REAL
    datetime          --->      TIMESTAMP
    datetime2         --->      TIMESTAMP
    smalldatetime     --->      TIMESTAMP
    date              --->      DATE
    time              --->      TIME
    char              --->      CHAR
    varchar           --->      VARCHAR
    nchar             --->      CHAR
    nvarchar          --->      VARCHAR
    text              --->      TEXT
    ntext             --->      TEXT


### Create table schemas in Postgres based on the SQL Server schemas

In [27]:
## I will create the tables with the same names and structures as in SQL Server to make it easier for validation later on - I can always refactor the Postgres schemas later if needed but for now I'll keep it simple and consistent with the source

try:
    for table in tables_to_migrate:

        schema = table_schema[table]

        pg_table = table.lower()

        pg_cursor.execute(f"DROP TABLE IF EXISTS {pg_table} CASCADE")

        column_definitions = []

        for idx, row in schema.iterrows():
            col_name = row['COLUMN_NAME'].lower()
            sql_type = row['DATA_TYPE']

            base_type = sql_type.lower()
            pg_type = type_mapping.get(base_type, 'TEXT')      

            condition_1 = idx == 0                      # Must be first column in the table
            condition_2 = col_name.endswith('id')       # Must end with ID
            condition_3 = 'int' in sql_type.lower()     # Must be INT data type

            if condition_1 and condition_2 and condition_3:
                column_definitions.append(f"{col_name} SERIAL PRIMARY KEY")
            else:
                column_definitions.append(f"{col_name} {pg_type}")

    
        column_string = ",\n        ".join(column_definitions)
        create_query =  f""" 
        CREATE TABLE {pg_table} (
            {column_string}
        )
        """

        pg_cursor.execute(create_query)
        pg_conn.commit()
    
    print("\n + " + "=" * 55)
    print("[SUCCESS] ---> All tables created successfully!")


except psycopg2.Error as e:
    print(f"Postgres experienced an error while creating a table: {e}")
    pg_conn.rollback()
    raise 


except Exception as e:
    print(f"Unexpected issue: {e}")





[SUCCESS] ---> All tables created successfully!


### Load 

In [30]:
#Test migration with one table to validate the process before doing a full migration of all tables

test_table = 'Customers'
pg_table = test_table.lower()


In [31]:
try:
    print("1. Read from SQL Server... ")
    extract_query = f"SELECT * FROM {pg_table}"
    test_df = pd.read_sql(extract_query, sql_conn)

    print(f"        Read {len(test_df)} rows")


    print("2. Transforming data types...")

    if 'IsActive' in test_df.columns:
        test_df['IsActive'] = test_df['IsActive'].astype('bool')
        print("[SUCCESS] ---> Converted IsActive: BIT ---> BOOLEAN")


    print("3. Prepare the data for loading")
    data_tuples = [tuple(row) for row in test_df.to_numpy()]

    columns = [col.lower() for col in test_df.columns]

    columns_string = ', '.join(columns)

    placeholders = ', '.join(['%s'] * len(columns))

    insert_query = f"""
        INSERT INTO {pg_table} ({columns_string})
        VALUES %s
    """

    print(f"        Prepared {len(data_tuples):,} rows")


    print("4. Insert data into PostgreSQL...")
    execute_values(pg_cursor, insert_query, data_tuples, page_size=1000)
    pg_conn.commit()

    print(f"Loaded {len(data_tuples):,} rows")


    print("5. Verifying...")
    pg_cursor.execute(f"SELECT COUNT(*) AS total_rows FROM {pg_table}")
    pg_count = pg_cursor.fetchone()[0]

    sql_count = baseline_counts[test_table]

    if pg_count == sql_count:
        print(f"[SUCCESS] --> Verification passed: {pg_count:,} == {sql_count:,} ")
    else:
        print(f"[FAILED] --> Count mismatch: {pg_count:,} != {sql_count:,}")

    
    print(f"\n {test_table} migration test successfully completed!")



except Exception as e:
    pg_conn.rollback()
    raise

1. Read from SQL Server... 


  test_df = pd.read_sql(extract_query, sql_conn)


        Read 900000 rows
2. Transforming data types...
[SUCCESS] ---> Converted IsActive: BIT ---> BOOLEAN
3. Prepare the data for loading
        Prepared 900,000 rows
4. Insert data into PostgreSQL...
Loaded 900,000 rows
5. Verifying...
[SUCCESS] --> Verification passed: 900,000 == 900,000 

 Customers migration test successfully completed!


In [39]:
#migating the rest of the tables will follow the same process as above but with additional handling for foreign key relationships and dependencies

remaining_tables = [t for t in tables_to_migrate if t != 'Customers']

for table in remaining_tables:
    pg_table = table.lower()

    print(f"Migrating {table} --> {pg_table}...")

    try:
        print("1. Reading from SQL Server...")
        extract_query = f"SELECT * FROM {table}"
        sql_df = pd.read_sql(extract_query, sql_conn)
        print(f"     Read {len(sql_df):,} rows\n\n")

        print("2. Preparing data...")
        data_tuples = [tuple(row) for row in sql_df.to_numpy()]
        columns = [col.lower() for col in sql_df.columns]
        columns_string = ', '.join(columns)
        insert_query = f"""
                INSERT INTO {pg_table} ({columns_string})
                VALUES %s
"""
        print(f"        Prepared {len(data_tuples):,} rows\n\n")


        print("3. Processing bulk load...")
        pg_cursor.execute("CREATE SCHEMA IF NOT EXISTS uat;")
        pg_cursor.execute(f"TRUNCATE TABLE uat.{pg_table} RESTART IDENTITY CASCADE;")
        pg_conn.commit()
        execute_values(pg_cursor, insert_query, data_tuples, page_size=1000)
        pg_conn.commit()
        print(f"[SUCCESS] --> Loaded {len(data_tuples):,} rows\n")
       

       


        print("5. Verifying...")
        pg_cursor.execute(f"SELECT COUNT(*) AS total_rows FROM {pg_table}")
        pg_count = pg_cursor.fetchone()[0]

        sql_count = baseline_counts[table]

        if pg_count == sql_count:
            print(f"[SUCCESS] --> Verification passed: {pg_count:,} == {sql_count:,} ")
        else:
            print(f"[FAILED] --> Count mismatch: {pg_count:,} != {sql_count:,}")

        
        print(f"\n {table} migration successfully completed!")
        






    except Exception as e:
        print(f"Failed to migrate '{table}: {e}' ")
        pg_conn.rollback()
        raise

Migrating Categories --> categories...
1. Reading from SQL Server...
     Read 8 rows


2. Preparing data...
        Prepared 8 rows


3. Processing bulk load...
Failed to migrate 'Categories: relation "uat.categories" does not exist
' 


  sql_df = pd.read_sql(extract_query, sql_conn)


UndefinedTable: relation "uat.categories" does not exist
