# Data Migration: SQL Server to Postgres

In [46]:
import os
import pandas as pd 
import pyodbc
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

## 1. Load Credentials

In [47]:
load_dotenv()

True

In [48]:
sql_host = os.getenv("SQL_SERVER_HOST")
sql_db = os.getenv("SQL_SERVER_DB")

In [49]:
print(f"SQL SERVER HOST: {sql_host}")
print(f"SQL SERVER DB: {sql_db}")

SQL SERVER HOST: DESKTOP-UMBEJIB\SQLEXPRESS
SQL SERVER DB: sqlserver-to-postgres


In [50]:
pg_host = os.getenv("POSTGRES_HOST")
pg_port = os.getenv("POSTGRES_PORT")
pg_db = os.getenv("POSTGRES_DB")
pg_user = os.getenv("POSTGRES_USER")
pg_password = os.getenv("POSTGRES_PASSWORD")

In [51]:
print(f"POSTGRES HOST: {pg_host}")
print(f"POSTGRES PORT: {pg_port}")
print(f"POSTGRES DB: {pg_db}")
print(f"POSTGRES user: {pg_user}")
print(f"POSTGRES PASSWORD: {pg_password}")

POSTGRES HOST: localhost
POSTGRES PORT: 5432
POSTGRES DB: bank_uat
POSTGRES user: postgres
POSTGRES PASSWORD: password


## 2. Connect to SQL SERVER

In [52]:
print("Connecting to SQL SERVER....")
print(f"  server:{sql_host}")
print(f"  Database:{sql_db}")

Connecting to SQL SERVER....
  server:DESKTOP-UMBEJIB\SQLEXPRESS
  Database:sqlserver-to-postgres


In [53]:
try:
    sql_conn_string = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER={sql_host};"
        f"DATABASE={sql_db};"
        f"Trusted_Connection=yes;"

    )

    sql_conn = pyodbc.connect(sql_conn_string)
    sql_cursor = sql_conn.cursor()
    print("[SUCCESSFUL] -> Connection to SQL Server now live!")
    
except Exception as e:
    print(f"SQL Server connection failed: {e}")
    print("""How to troubleshoot:
          > 1. Check server name in .env file is correct
          > 2. verify SQL Server is running
          > 3. Check Windows Authentication is enabled
          .....
""")

[SUCCESSFUL] -> Connection to SQL Server now live!


## 3. Connect to PostgresSQL

In [54]:
print("Connecting to PostgresSQL....")
print(f"  server:{pg_host}")
print(f"  Database:{pg_db}")

Connecting to PostgresSQL....
  server:localhost
  Database:bank_uat


In [55]:
try:
    pg_conn = psycopg2.connect(
        host=pg_host,
        port=pg_port,
        database=pg_db,
        user=pg_user,
        password="Ndodzo@08"
    )

    pg_cursor=pg_conn.cursor()
    pg_cursor.execute("SELECT version();")

    pg_version = pg_cursor.fetchone()[0]

    print("Connected to PostgresSQL")
    print(f"  Version: {pg_version[:50]}....\n")

except psycopg2.OperationalError as e:
    print(f"Postgres Connection failed: {e}")
    print(""" How to troubleshoot:
          > 1. Check Postgres is running
          > 2. Verify username + password
          > 3. Check database exists
          
          .....
          
""")
    
except Exception as e:
    print(f"Unexpected error: {e}")
    raise

Connected to PostgresSQL
  Version: PostgreSQL 18.1 on x86_64-windows, compiled by msv....



## 4. Define the tables to migrate

In [56]:
table_to_migrate = "bank"
print(table_to_migrate)

bank


## 5. Run pre-migration checks

In [57]:
print("="*50)
print(">>> Check 1: ROW COUNTS")
print("=" * 50)

>>> Check 1: ROW COUNTS


In [96]:
baseline_count = {}
test_query = "SELECT COUNT(*) AS total_rows FROM bank;"
sql_cursor.execute(test_query)

baseline_count[table_to_migrate] = sql_cursor.fetchone()[0]

print(f"Results: {table_to_migrate} : {baseline_count[table_to_migrate]}")

Results: bank : 1000


In [59]:
print("="*50)
print(">>> Check 2: NULL COUNTS")
print("=" * 50)

quality_issues = []

>>> Check 2: NULL COUNTS


In [62]:
try:
    sql_cursor.execute("""SELECT COUNT(*) AS null_count
                       FROM bank
                       WHERE Age IS NULL""")
    null_names = sql_cursor.fetchone()[0]
    if null_names>0:
        quality_issues.append(f"   - {null_names:,} bank with null details")
        print(quality_issues)
    else:  
        print("No issue detected!")

except Exception as e:
    print(f"An error occurred: {e}")

No issue detected!


## 6. Get table schema

In [64]:
print("="*50)
print(">>> ANALYSE TABLE SCHEMA")
print("=" * 50)

>>> ANALYSE TABLE SCHEMA


In [84]:
table_schema = {}
try:
    schema_query = f"""
        SELECT 
            COLUMN_NAME,
            DATA_TYPE,
            CHARACTER_MAXIMUM_LENGTH,
            IS_NULLABLE
	    FROM 
            INFORMATION_SCHEMA.COLUMNS
        WHERE
            TABLE_NAME = 'bank'
            
"""

    schema_df = pd.read_sql(schema_query, sql_conn)
    print(f"\n--- SCHEMA REPORT FOR TABLE: {table_to_migrate.upper()} ---")
    table_schema[table_to_migrate] = schema_df
    print(f"{table_to_migrate}")

except Exception as e:
    pass

  schema_df = pd.read_sql(schema_query, sql_conn)



--- SCHEMA REPORT FOR TABLE: BANK ---
bank


## 7. Define data type mappings

In [66]:
print("="*50)
print(">>> DATA TYPE MAPPING")
print("=" * 50)

>>> DATA TYPE MAPPING


In [67]:
TYPE_MAPPING = {
    'int': 'INTEGER',
    'bigint': 'BIGINT',
    'smallint': 'SMALLINT',
    'tinyint': 'SMALLINT',
    'bit': "BOOLEAN",
    'decimal': 'NUMERIC',
    'numeric': 'NUMERIC',
    'money': 'NUMERIC(19,4)',
    'smallmoney': 'NUMERIC(10,4)',
    'float': 'DOUBLE PRECISION',
    'real': 'REAL',
    'datetime': "TIMESTAMP",
    'datetime2': "TIMESTAMP",
    'smalldatetime': 'TIMESTAMP',
    'date': 'DATE',
    'time': 'TIME',
    'char': 'CHAR',
    'varchar': 'VARCHAR',
    'nvarchar': 'VARCHAR',
    'text': 'TEXT',
    'ntext': 'TEXT'

}

In [73]:
print("SQL Server to PostgresSQL type mapping")
print()

for sql_type, pg_type in list(TYPE_MAPPING.items()):
    print(f"  {sql_type:13} --->    {pg_type}")

SQL Server to PostgresSQL type mapping

  int           --->    INTEGER
  bigint        --->    BIGINT
  smallint      --->    SMALLINT
  tinyint       --->    SMALLINT
  bit           --->    BOOLEAN
  decimal       --->    NUMERIC
  numeric       --->    NUMERIC
  money         --->    NUMERIC(19,4)
  smallmoney    --->    NUMERIC(10,4)
  float         --->    DOUBLE PRECISION
  real          --->    REAL
  datetime      --->    TIMESTAMP
  datetime2     --->    TIMESTAMP
  smalldatetime --->    TIMESTAMP
  date          --->    DATE
  time          --->    TIME
  char          --->    CHAR
  varchar       --->    VARCHAR
  nvarchar      --->    VARCHAR
  text          --->    TEXT
  ntext         --->    TEXT


## 8. Create tables in PostgreSQL

In [74]:
print("="*50)
print(">>> CREATE TABLES IN POSTGRES")
print("=" * 50)

>>> CREATE TABLES IN POSTGRES


In [91]:
try:

    pg_table = table_to_migrate.lower()

    pg_cursor.execute(f"DROP TABLE IF EXISTS {pg_table} CASCADE")

    column_definitions = []
    last_idx = len(schema_df) - 1

    for idx, row in schema_df.iterrows():
        col_name = row['COLUMN_NAME'].lower()
        sql_type = row['DATA_TYPE']

        base_type = sql_type.lower()
        pg_type = TYPE_MAPPING.get(base_type, 'TEXT')

        condition_1 = (idx == last_idx)                   # Must be first column in the table
        condition_2 = col_name.endswith('id')   # Must end with ID
        condition_3 = 'int' in sql_type.lower() # Must be INT data type

        if condition_1 and condition_2 and condition_3:
            column_definitions.append(f"{col_name} SERIAL PRIMARY KEY")
        else:
            column_definitions.append(f"{col_name} {pg_type}")

    # BUILD the final SQL string

    column_string = ",\n          ".join(column_definitions)
    create_query = f"""
    CREATE TABLE {pg_table} (
            {column_string}
    
    )
    """

    # EXECUTE AND COMMIT

    pg_cursor.execute(create_query)
    pg_conn.commit()

    print("\n + " + "#" * 55)
    print("[SUCCESS] ----> ALL tables created successfully!")

# Catches SQL specific problems
except psycopg2.Error as e:
    print(f"Postgres experienced an error while creating a table: {e}")
    pg_conn.rollback()

    raise

except Exception as e:
    print(f"Unexpected issue: {e}")


 + #######################################################
[SUCCESS] ----> ALL tables created successfully!


## 9. TEST MIGRATION

In [92]:
print("="*50)
print(">>> TESTING MIGRATION")
print("=" * 50)

>>> TESTING MIGRATION


In [93]:
test_table = 'bank'
pg_table = test_table.lower()

In [99]:
try:
    print("1. Read from SQL Server....")
    extract_query = f"SELECT * FROM {test_table}"
    test_df = pd.read_sql(extract_query, sql_conn)

    print(f"    Read {len(test_df)} rows")

    print("3. Prepare the date for loading")
    data_tuples = [tuple(row) for row in test_df.to_numpy()]

    columns = [col.lower() for col in test_df.columns]

    columns_string = ', '.join(columns)

    placeholders = ', '.join(['%s'] * len(columns))


    insert_query = f"""
        INSERT INTO {pg_table} ({columns_string})
        VALUES %s


"""

    print(f"     Prepared {len(data_tuples):,} rows")

    print("3. Insert data into PostgreSQL....")
    execute_values(pg_cursor, insert_query, data_tuples, page_size= 1000)
    pg_conn.commit()

    print(f"Loaded {len(data_tuples):,} rows")


    print("4. Verifying....")
    pg_cursor.execute(f"SELECT COUNT(*) AS total_rows FROM {pg_table}")
    pg_count = pg_cursor.fetchone()[0]

    sql_count = baseline_count[test_table]

    if pg_count == sql_count:
        print(f"[SUCCESS] ---> verification passed: {pg_count:,} == {sql_count:,}")
    else:
        print(f"[FAILED] ---> Count mismatch: {pg_count:,} != {sql_count}")

    print(f"\n {test_table} migration test succefully completed")

except Exception as e:
    pg_conn.rollback()
    raise

1. Read from SQL Server....


  test_df = pd.read_sql(extract_query, sql_conn)


    Read 1000 rows
3. Prepare the date for loading
     Prepared 1,000 rows
3. Insert data into PostgreSQL....
Loaded 1,000 rows
4. Verifying....
[SUCCESS] ---> verification passed: 1,000 == 1,000

 bank migration test succefully completed
