# Data Migration: SQL to postgres

In [44]:
import os
import pandas as pd
import pyodbc
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

## 1. Load credentials

In [45]:
load_dotenv()

True

In [46]:
sql_host = os.getenv("SQL_SERVER_HOST")
sql_db = os.getenv("SQL_SERVER_DB")

In [47]:
pg_host = os.getenv("POSTGRES_HOST") 
pg_port = os.getenv("POSTGRES_PORT")
pg_db = os.getenv("POSTGRES_DB")
pg_user = os.getenv("POSTGRES_USER")
pg_password = os.getenv("POSTGRES_PASSWORD")

 
 

In [48]:
print(f"POSTGRES_HOST: {pg_host}")
print(f"POSTGRES_PORT: {pg_port}")
print(f"POSTGRES_DB: {pg_db}")
print(f"POSTGRES_USER: {pg_user}")
print(f"POSTGRES_PASSWORD: {pg_password}")

POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
POSTGRES_DB: transaction_uat
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres


## 2. Connect to SQL Server


In [49]:
print ("Connecting to SQL Server...")
print (f"   Server: {sql_host}")
print (f"   Database: {sql_db}")

Connecting to SQL Server...
   Server: INTELI5SSD-LAPT\SQLEXPRESS
   Database: TransactionDB_UAT


In [50]:
try:
    sql_conn_string = (
    f"DRIVER={{ODBC Driver 17 for SQL Server}};"
    f"SERVER={sql_host};"
    f"DATABASE={sql_db};"
    f"Trusted_Connection=yes;"
    )

    sql_conn = pyodbc.connect(sql_conn_string)
    sql_cursor = sql_conn.cursor()
    print ("[SUCCESS] -> Connection to SQL Server completed ")
except Exception as e:
    print(f"SQl Server connection failed {e}")

[SUCCESS] -> Connection to SQL Server completed 


## 3. Coonect to postgres

In [51]:
print ("Connecting to Postgres...")
print (f"  Server: {pg_host}")
print (f"  Database: {pg_db}")

Connecting to Postgres...
  Server: localhost
  Database: transaction_uat


In [52]:
try:
    pg_conn = psycopg2.connect(
        host=pg_host,
        port=pg_port,
        database=pg_db,
        user=pg_user,
        password=pg_password
    )

    pg_cursor=pg_conn.cursor()
    pg_cursor.execute("SELECT version();")
    pg_version = pg_cursor.fetchone()[0]

    print ("Connected to postgres")
    print (f"   Version: {pg_version[:50]}...\n")
except psycopg2.OperationalError as e:
    print(f"  Postgres connection failed: {e}")

except Exception as e:
    print (f"unexpected error {e}")
    raise

Connected to postgres
   Version: PostgreSQL 18.1 on x86_64-windows, compiled by msv...



## Define the tables to migrate

In [53]:
tables_to_migrate = ['Categories', 'Suppliers', 'Customers', 'Products']
print(f"  these are the tables: {tables_to_migrate}")

  these are the tables: ['Categories', 'Suppliers', 'Customers', 'Products']


In [54]:
for i, table in enumerate(tables_to_migrate, 1):
    print(f"  {i}.{table}")
Total_tables_to_migrate = len(tables_to_migrate)    
print (f"\n The total number of tables to migrate :{Total_tables_to_migrate}")

  1.Categories
  2.Suppliers
  3.Customers
  4.Products

 The total number of tables to migrate :4


## Run pre-migration checks

In [55]:
print("=" * 50)
print(">>> ROW COUNTS")
print("=" * 50)

>>> ROW COUNTS


In [56]:
customers = "SELECT COUNT(*) AS TotalCustomer FROM Customers;"
sql_cursor.execute(customers)
customers_count = sql_cursor.fetchone()[0]
print(f" Total customers is: {customers_count}")

 Total customers is: 900000


In [57]:
baseline_counts = {}

try:
    for table in tables_to_migrate:
        row_count_query = f"SELECT COUNT(*) FROM {table}"
        sql_cursor.execute(row_count_query)
        count = sql_cursor.fetchone()[0]

        baseline_counts[table] = count
        print(f"{table:30} {count} rows")
    print("-" * 45)    
    total_count = sum(baseline_counts.values())
    print(f"{'Total':28} {total_count:>10,} rows")
except Exception as e:
    print (f"Failed to get baseline counts: {e}")
    raise



Categories                     8 rows
Suppliers                      5000 rows
Customers                      900000 rows
Products                       150000 rows
---------------------------------------------
Total                         1,055,008 rows


In [58]:
data_quality_issues = []
print("\nCHECK 2: NULL CHECKS (CustomerName)")
try:
    customer_query = """SELECT COUNT(*) AS null_count
                        FROM Customers 
                        WHERE CustomerName is NULL"""
    sql_cursor.execute(customer_query)
    null_customer_count= sql_cursor.fetchone()[0]
    if null_customer_count > 0:
        data_quality_issues.append(f" {null_customer_count:,} Customers with null names...")
    #print (f"Total null customer is:{data_quality_issues[0]:,}...")

    print("\nCHECK 3: INVALID EMAIL FORMATS")
    sql_cursor.execute("""SELECT COUNT(*) AS invalid_email_count
                            FROM Customers
                            WHERE Email LIKE '%@invalid'  """)
    invalid_emails = sql_cursor.fetchone()[0]
    if invalid_emails > 0:
        data_quality_issues.append(f" {invalid_emails} invalid email formats...")
        #print (data_quality_issues)

    print("\nCHECK 4: NEGATIVE PRODUCT PRICES")
    sql_cursor.execute("""SELECT COUNT(*) AS negative_price_count
                            FROM Products
                            WHERE UnitPrice < 0; """)
    negative_prices = sql_cursor.fetchone()[0]
    if negative_prices > 0:
        data_quality_issues.append(f" {negative_prices:,} Negative prices...")
        #print (data_quality_issues)

    print("\nCHECK 5: NEGATIVE STOCK QUANTITY")
    sql_cursor.execute("""SELECT COUNT(*) AS negative_stock_count
                            FROM Products
                            WHERE StockQuantity < 0; """)
    negative_stock = sql_cursor.fetchone()[0]
    if negative_stock > 0:
        data_quality_issues.append(f" {negative_stock:,} Negative stocks...")
        #print (data_quality_issues)

    print("\nCHECK 6: IDENTIFYING ORPHAN RECORDS")    
    sql_cursor.execute("""SELECT COUNT(*)
                       FROM Products prod
                       WHERE NOT EXISTS (SELECT 1
                       FROM Suppliers sup
                       WHERE sup.SupplierID=prod.SupplierID) """)
    orpthan_data = sql_cursor.fetchone()[0]
    if orpthan_data > 0:
        data_quality_issues.append(f" {orpthan_data:,} Orphan records...")
        #print (data_quality_issues)

    print("\nCHECK 7: FUTURE DATE CHECK")    
    sql_cursor.execute("""SELECT COUNT(*)AS future_date_count
                       FROM Customers
                       WHERE CreatedDate > GETDATE() """)
    future_dates = sql_cursor.fetchone()[0]
    if future_dates > 0:
        data_quality_issues.append(f" {future_dates:,} future date records...")
        #print (data_quality_issues)    

    if data_quality_issues:
        print ("\nDATA QUALITY ISSUES FOUND:")
        for quality_issues in data_quality_issues:
            print(quality_issues)
    else:
        print("No data quality issues identified")

except Exception (e):
    print(f"[ERROR]====> {e}")
    raise


CHECK 2: NULL CHECKS (CustomerName)

CHECK 3: INVALID EMAIL FORMATS

CHECK 4: NEGATIVE PRODUCT PRICES

CHECK 5: NEGATIVE STOCK QUANTITY

CHECK 6: IDENTIFYING ORPHAN RECORDS

CHECK 7: FUTURE DATE CHECK

DATA QUALITY ISSUES FOUND:
 4,514 Customers with null names...
 8844 invalid email formats...
 775 Negative prices...
 1,467 Negative stocks...
 24,700 Orphan records...
 5,184 future date records...


In [59]:
table_schema = {}

try:
    for table in tables_to_migrate:
        schema_query = f"""
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                CHARACTER_MAXIMUM_LENGTH,
                IS_NULLABLE
            FROM
                INFORMATION_SCHEMA.COLUMNS
            WHERE
                table_name = '{table}'
            ORDER BY
                ORDINAL_POSITION
            """
        schema_df = pd.read_sql(schema_query, sql_conn)
        print(schema_df)
        table_schema[table] = schema_df
        #print(schema_df)
        print(f"\n{table}")
except Exception as e:
    pass

  schema_df = pd.read_sql(schema_query, sql_conn)
  schema_df = pd.read_sql(schema_query, sql_conn)
  schema_df = pd.read_sql(schema_query, sql_conn)
  schema_df = pd.read_sql(schema_query, sql_conn)


    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    CategoryID       int                       NaN          NO
1  CategoryName  nvarchar                      50.0         YES
2   Description  nvarchar                      -1.0         YES

Categories
    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    SupplierID       int                       NaN          NO
1  SupplierName  nvarchar                     150.0         YES
2   ContactName  nvarchar                     100.0         YES
3       Country  nvarchar                     100.0         YES
4         Phone  nvarchar                      20.0         YES

Suppliers
    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    CustomerID       int                       NaN          NO
1  CustomerName  nvarchar                     100.0         YES
2         Email  nvarchar                     100.0         YES
3         Phone  nvarchar                      20.0         YES
4       Country  

## 7. Define data type mappings

In [60]:
sqlserver_to_postgres_types_mapping = {
    'int': 'INTEGER',
    'bigint': 'BIGINT',
    'smallint': 'SMALLINT',
    'tinyint': 'SMALLINT',
    'bit': 'BOOLEAN',
    'decimal': 'NUMERIC',
    'numeric': 'NUMERIC',
    'money': 'NUMERIC(19,4)',
    'smallmoney': 'NUMERIC(10,4)',
    'float': 'DOUBLE PRECISION',
    'real': 'REAL',
    'datetime': 'TIMESTAMP',
    'datetime2': 'TIMESTAMP',
    'smalldatetime': 'TIMESTAMP',
    'date': 'DATE',
    'time': 'TIME',
    'char': 'CHAR',
    'varchar': 'VARCHAR',
    'nchar': 'CHAR',
    'nvarchar': 'VARCHAR',
    'text': 'TEXT',
    'ntext': 'TEXT'
}


In [61]:
print("SQL Server to PostgreSQL type mapping ")
for sql_type, pg_type in sqlserver_to_postgres_types_mapping.items():
    print(f"  {sql_type:15} --->      {pg_type}")

SQL Server to PostgreSQL type mapping 
  int             --->      INTEGER
  bigint          --->      BIGINT
  smallint        --->      SMALLINT
  tinyint         --->      SMALLINT
  bit             --->      BOOLEAN
  decimal         --->      NUMERIC
  numeric         --->      NUMERIC
  money           --->      NUMERIC(19,4)
  smallmoney      --->      NUMERIC(10,4)
  float           --->      DOUBLE PRECISION
  real            --->      REAL
  datetime        --->      TIMESTAMP
  datetime2       --->      TIMESTAMP
  smalldatetime   --->      TIMESTAMP
  date            --->      DATE
  time            --->      TIME
  char            --->      CHAR
  varchar         --->      VARCHAR
  nchar           --->      CHAR
  nvarchar        --->      VARCHAR
  text            --->      TEXT
  ntext           --->      TEXT


## 8. Create tables in PostgreSQL

In [62]:
try:
    for table in tables_to_migrate:
        schema = table_schema[table]
        pg_table = table.lower()
        #print(pg_table)
        pg_cursor.execute(f"DROP TABLE IF EXISTS {pg_table} CASCADE")

        column_definitions = []

        for idx, row in schema.iterrows():
            col_name = row['COLUMN_NAME'].lower()
            sql_type = row['DATA_TYPE']

            base_type = sql_type.lower()
            pg_type = sqlserver_to_postgres_types_mapping.get(base_type, 'TEXT')

            condition_1 = idx == 0
            condition_2 = col_name.endswith('id')
            condition_3 = 'int' in sql_type.lower()

            if condition_1 and condition_2 and condition_3:
                column_definitions.append(f"{col_name} SERIAL PRIMARY KEY")
            else:
                column_definitions.append(f"{col_name} {pg_type}")
        
        column_string = ",\n    ".join(column_definitions)
        create_query = f"""
        CREATE TABLE {pg_table} (
            {column_string}
        )
        """
        pg_cursor.execute(create_query)
        pg_conn.commit()

        print("\n + "  + "=" * 55)
        print ("[SUCCESS]---> All tables created successfully")
        print(column_definitions)

except psycopg2.Error as e:
    print(f"Postgres experience an error while creating a table: {e}")
    pg_conn.rollback()
    raise

except Exception as e:
    print(f"Unexpected issue: {e}")
    raise


[SUCCESS]---> All tables created successfully
['categoryid SERIAL PRIMARY KEY', 'categoryname VARCHAR', 'description VARCHAR']

[SUCCESS]---> All tables created successfully
['supplierid SERIAL PRIMARY KEY', 'suppliername VARCHAR', 'contactname VARCHAR', 'country VARCHAR', 'phone VARCHAR']

[SUCCESS]---> All tables created successfully
['customerid SERIAL PRIMARY KEY', 'customername VARCHAR', 'email VARCHAR', 'phone VARCHAR', 'country VARCHAR', 'createddate TIMESTAMP', 'isactive BOOLEAN']

[SUCCESS]---> All tables created successfully
['productid SERIAL PRIMARY KEY', 'productname VARCHAR', 'categoryid INTEGER', 'supplierid INTEGER', 'unitprice NUMERIC(19,4)', 'stockquantity INTEGER', 'createddate TIMESTAMP']


In [63]:
test_table = 'Customers'
pg_table = test_table.lower()

In [64]:
try:
    print("1. Read from SQL Server...")
    extract_query = f"SELECT * FROM {pg_table}"
    test_df = pd.read_sql(extract_query, sql_conn)
    print(f"Read {len(test_df):,} rows")

    print("2. Transforming data types...")
    if "IsActive" in test_df.columns:
        test_df["IsActive"] = test_df["IsActive"].astype(bool)
        print("[SUCCESS] ---> Converted IsActive: BIT ---> BOOLEAN")

    print("3. Prepare the data for loading...")
    data_tuples = [tuple(row) for row in test_df.to_numpy()]
    columns = [col.lower() for col in test_df.columns]
    columns_string = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))

    insert_query = f"""
        INSERT INTO {pg_table} ({columns_string})
        VALUES %s
    """

    print(f"Prepared {len(data_tuples):,} rows")

    print("4. Insert data into PostgreSQL...")
    execute_values(pg_cursor, insert_query, data_tuples, page_size=1000)
    pg_conn.commit()
    print(f"Loaded {len(data_tuples):,} rows")

    print("5. Verifying...")
    pg_cursor.execute(f"SELECT COUNT(*) FROM {pg_table}")
    pg_count = pg_cursor.fetchone()[0]

    sql_count = baseline_counts[test_table]

    if pg_count == sql_count:
        print(f"[SUCCESS] ---> Verification passed: {pg_count:,} == {sql_count:,}")
    else:
        print(f"[FAILED] ---> Count mismatch: {pg_count:,} != {sql_count:,}")

    print(f"\n{test_table} migration test successfully completed!")

except Exception as e:
    pg_conn.rollback()
    raise


1. Read from SQL Server...


  test_df = pd.read_sql(extract_query, sql_conn)


Read 900,000 rows
2. Transforming data types...
[SUCCESS] ---> Converted IsActive: BIT ---> BOOLEAN
3. Prepare the data for loading...
Prepared 900,000 rows
4. Insert data into PostgreSQL...
Loaded 900,000 rows
5. Verifying...
[SUCCESS] ---> Verification passed: 900,000 == 900,000

Customers migration test successfully completed!


## Table Migration

In [65]:
# Exclude Customers table if already migrated
remaining_tables = []

for t in tables_to_migrate:
    if t != "Customers":
        remaining_tables.append(t)

for table in remaining_tables:
    pg_table = table.lower()

    print(f"\nMigrating {table} → {pg_table}...")

    try:
        # 1. Read from SQL Server
        print("1. Reading from SQL Server...")
        extract_query = f"SELECT * FROM {table}"
        sql_df = pd.read_sql(extract_query, sql_conn)
        print(f"   Read {len(sql_df):,} rows\n")

        # 2. Prepare data
        print("2. Preparing data...")
        data_tuples = [tuple(row) for row in sql_df.to_numpy()]

        columns = [col.lower() for col in sql_df.columns]
        columns_string = ", ".join(columns)

        #placeholders = ", ".join(["%s"] * len(columns))

        insert_query = f"""
            INSERT INTO {pg_table} ({columns_string})
            VALUES %s
        """

        print(f"   Prepared {len(data_tuples):,} rows\n")

        # 3. Bulk insert into PostgreSQL
        print("3. Processing bulk load...")
        execute_values(
            pg_cursor,
            insert_query,
            data_tuples,
            page_size=1000
        )
        pg_conn.commit()

        print(f"[SUCCESS] → Loaded {len(data_tuples):,} rows\n")

        # 4. Verification
        print("4. Verifying row counts...")
        pg_cursor.execute(f"SELECT COUNT(*) FROM {pg_table}")
        pg_count = pg_cursor.fetchone()[0]

        sql_count = baseline_counts.get(table)

        if pg_count == sql_count:
            print(f"[SUCCESS] → Verification passed: {pg_count:,} = {sql_count:,}")
        else:
            print(f"[FAILED] → Count mismatch: {pg_count:,} ≠ {sql_count:,}")

        print(f"\n{table} migration successfully completed.")

    except Exception as e:
        pg_conn.rollback()
        print(f"[ERROR] → Failed migrating {table}: {e}")



Migrating Categories → categories...
1. Reading from SQL Server...
   Read 8 rows

2. Preparing data...
   Prepared 8 rows

3. Processing bulk load...
[SUCCESS] → Loaded 8 rows

4. Verifying row counts...
[SUCCESS] → Verification passed: 8 = 8

Categories migration successfully completed.

Migrating Suppliers → suppliers...
1. Reading from SQL Server...
   Read 5,000 rows

2. Preparing data...
   Prepared 5,000 rows

3. Processing bulk load...
[SUCCESS] → Loaded 5,000 rows

4. Verifying row counts...


  sql_df = pd.read_sql(extract_query, sql_conn)
  sql_df = pd.read_sql(extract_query, sql_conn)
  sql_df = pd.read_sql(extract_query, sql_conn)


[SUCCESS] → Verification passed: 5,000 = 5,000

Suppliers migration successfully completed.

Migrating Products → products...
1. Reading from SQL Server...
   Read 150,000 rows

2. Preparing data...
   Prepared 150,000 rows

3. Processing bulk load...
[SUCCESS] → Loaded 150,000 rows

4. Verifying row counts...
[SUCCESS] → Verification passed: 150,000 = 150,000

Products migration successfully completed.
