# Data Migration: SQL Server to Postgres


In [38]:
import os
import pandas as pd
import pyodbc
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

## 1. Load Credentials

In [39]:
load_dotenv()

True

In [40]:
sql_host = os.getenv("SQL_SERVER_HOST")
sql_db = os.getenv("SQL_SERVER_DATABASE")

In [41]:
print(f"SQL Server Host: {sql_host}")
print(f"SQL Server Database: {sql_db}")

SQL Server Host: DESKTOP-516QCDP\SQLEXPRESS
SQL Server Database: TransactionDB_UAT


In [42]:
pg_host = os.getenv("POSTGRES_HOST")
pg_db = os.getenv("POSTGRES_DATABASE")
pg_user = os.getenv("POSTGRES_USER")
pg_password = os.getenv("POSTGRES_PASSWORD")
pg_port = os.getenv("POSTGRES_PORT")  


In [43]:
print(f"POSTGRES Host: {pg_host}")
print(f"POSTGRES Database: {pg_db}")
print(f"POSTGRES User: {pg_user}")
print(f"POSTGRES Password: {pg_password}")
print(f"POSTGRES Port: {pg_port}")

POSTGRES Host: localhost
POSTGRES Database: Transaction_uat
POSTGRES User: postgres
POSTGRES Password: qwertyuiop
POSTGRES Port: 5432


## 2 Connect to SQL SERVER

In [44]:
print("Connecting to SQL Server...")
print(f"  Server: {sql_host}, Database: {sql_db}")

Connecting to SQL Server...
  Server: DESKTOP-516QCDP\SQLEXPRESS, Database: TransactionDB_UAT


In [45]:
try:
    sql_conn_string = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER=localhost,1433;"
        f"DATABASE={sql_db};"
        "Trusted_Connection=yes;"
        "TrustServerCertificate=yes;"
    )
    sql_conn = pyodbc.connect(sql_conn_string)
    sql_cursor = sql_conn.cursor()
    print("Successfully connected to SQL Server.")

except Exception as e:
    print("Error connecting to SQL Server:")
    print(e)
    print(""" How to Troubleshoot:
        > 1. Ensure that the SQL Server is running and accessible from your network.
        > 2. Check server name in .env file is correct.
        > 3. Check windows authentication permissions are enabled.
          ....
 """)


Successfully connected to SQL Server.


## 3. Connect to PostgreSQL


In [46]:
print("Connecting to Postgres...")
print(f"  Server: {pg_host}, Database: {pg_db}, User: {pg_user}")

Connecting to Postgres...
  Server: localhost, Database: Transaction_uat, User: postgres


In [47]:
try:
    pg_conn = psycopg2.connect(
        host=pg_host,
        port=pg_port,
        dbname=pg_db,
        user=pg_user,
        password=pg_password
    )
    pg_cursor = pg_conn.cursor()
    pg_cursor.execute("SELECT version();")

    pg_version = pg_cursor.fetchone()[0]
    print("Successfully connected to PostgreSQL.")
    print(f"PostgreSQL version: {pg_version[:50]}...\n")

except psycopg2.OperationalError as e:
    print(f"Error connecting to PostgreSQL: {e}")
    print(""" How to Troubleshoot:
        > 1. Ensure that the PostgreSQL server is running.
        > 2. Check connection parameters in the .env file.
        > 3. Ensure the user has access to the database.
        > 4. Ensure PostgreSQL is listening on the specified port.
    """)

except Exception as e:
    print(f"An unexpected error occurred while connecting to PostgreSQL: {e}")
    raise


Successfully connected to PostgreSQL.
PostgreSQL version: PostgreSQL 18.1 on x86_64-windows, compiled by msv...



## 4. Define the tables for Migration

### Migration Order

- Categories (no dependencies)
- Suppliers (no dependencies)
- Customers (no dependencies)
- Products (depends on Categories and Suppliers)

In [48]:
tables_to_migrate = ["Categories", "Suppliers", "Customers", "Products"]
print(tables_to_migrate)

['Categories', 'Suppliers', 'Customers', 'Products']


In [49]:
print("Table to migrate:")
for i, table in enumerate(tables_to_migrate, start=1):
    print(f"  {i}: {table}")

total_no_of_tables = len(tables_to_migrate)
print(f"Total number of tables to migrate: {total_no_of_tables}")

Table to migrate:
  1: Categories
  2: Suppliers
  3: Customers
  4: Products
Total number of tables to migrate: 4


## 5. Run pre-migration Checks

In [50]:
print("=" * 50)
print(">>> CHECK 1: ROW COUNT CHECKS <<<")
print("=" * 50)

>>> CHECK 1: ROW COUNT CHECKS <<<


In [51]:
baseline_row_counts = {}

try:
    for table in tables_to_migrate:

        row_count_query = f"""
        SELECT SUM(row_count)
        FROM sys.dm_db_partition_stats
        WHERE object_id = OBJECT_ID('{table}')
          AND index_id IN (0,1)
        """
        sql_cursor.execute(row_count_query)
        count = sql_cursor.fetchone()[0] or 0

        baseline_row_counts[table] = count
        print(f"{table:15} : {count: >12} rows")

    total_rows = sum(baseline_row_counts.values())
    print("=" * 50)
    print(f"{'Total Rows to Migrate':<25} : {total_rows:>15,} rows")
    print("\n Baseline Captured! ")

except Exception as e:
    print(f"Baseline capture failed:{e}")
    raise


Categories      :            8 rows
Suppliers       :         5000 rows
Customers       :       900000 rows
Products        :       150000 rows
Total Rows to Migrate     :       1,055,008 rows

 Baseline Captured! 


In [52]:
sql_cursor.execute("""
    SELECT
        SCHEMA_NAME(schema_id) + '.' + name AS object_name,
        QUOTENAME(SCHEMA_NAME(schema_id)) + '.' + QUOTENAME(name) AS quoted_name
    FROM sys.tables
    WHERE is_ms_shipped = 0
""")

table_map = {
    row[0]: row[1]   # unquoted_name -> quoted_name
    for row in sql_cursor.fetchall()
}

tables_to_migrate = list(table_map.keys())




In [53]:
if not tables_to_migrate:
    raise RuntimeError("No user tables found in database.")

In [54]:
baseline_row_counts = {}

try:
    for table in tables_to_migrate:
        quoted_table = table_map[table]

        row_count_query = f"""
        SELECT SUM(row_count)
        FROM sys.dm_db_partition_stats
        WHERE object_id = OBJECT_ID('{table}')
          AND index_id IN (0,1)
        """

        sql_cursor.execute(row_count_query)
        count = sql_cursor.fetchone()[0] or 0

        baseline_row_counts[table] = count
        print(f"{quoted_table:<35} : {count:>15,} rows")

    total_rows = sum(baseline_row_counts.values())

    print("=" * 60)
    print(f"{'Total Rows to Migrate':<30} : {total_rows:>15,} rows")
    print("\nBaseline Captured Successfully")

except Exception as e:
    print(f"Baseline capture failed: {e}")
    raise


[dbo].[Categories]                  :               8 rows
[dbo].[Suppliers]                   :           5,000 rows
[dbo].[Customers]                   :         900,000 rows
[dbo].[Products]                    :         150,000 rows
Total Rows to Migrate          :       1,055,008 rows

Baseline Captured Successfully


In [55]:
print("=" * 50)
print(">>> CHECK 2: NULL COUNTS (CustomerName) <<<")
print("=" * 50)

>>> CHECK 2: NULL COUNTS (CustomerName) <<<


In [61]:
quality_issues = []

try:
    print("\nCheck 2: NULL counts in 'CustomerName' column of 'Customers' table")
    sql_cursor.execute("""SELECT COUNT(*) AS null_count
                          FROM Customers
                          WHERE CustomerName IS NULL""")
    null_names = sql_cursor.fetchone()[0]
    if null_names > 0:
        quality_issues.append(f"    -{null_names:,} Customers with NULL names")
    print(quality_issues)


    print("\nCHECK 3: Invalid email formats")
    sql_cursor.execute("""SELECT COUNT(*) AS invalid_email_count
                          FROM Customers
                          WHERE Email LIKE '%@invalid'""")
    invalid_emails = sql_cursor.fetchone()[0]
    if invalid_emails > 0:
        quality_issues.append(f"    -{invalid_emails:,} Customers with invalid email formats")

except Exception as e:
    pass


Check 2: NULL counts in 'CustomerName' column of 'Customers' table
['    -4,514 Customers with NULL names']

CHECK 3: Invalid email formats


In [58]:
try:
    sql_cursor.execute("""
        IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = 'IX_Customers_CustomerName')
        CREATE INDEX IX_Customers_CustomerName ON Customers(CustomerName)
    """)
    sql_cursor.execute("""
        IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = 'IX_Customers_Email')
        CREATE INDEX IX_Customers_Email ON Customers(Email)
    """)
    sql_cursor.commit()
except Exception as e:
    print(f"Index creation: {e}")