## Data Migration: SQL Server to Postgres

In [1]:
import os
import pandas as pd;
import pyodbc
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv;

## 1. Load credentials


In [2]:
load_dotenv()

True

In [3]:
sql_host = os.getenv("SQL_SERVER_HOST")
sql_db   = os.getenv("SQL_SERVER_DB")

In [4]:
print(f"SQL SERVER HOST: {sql_host}")
print(f"SQL SERVER DB: {sql_db}")

SQL SERVER HOST: localhost\SQLEXPRESS
SQL SERVER DB: TransactionDB_UAT


In [5]:
pg_host = os.getenv("POSTGRES_HOST")
pg_port = os.getenv("POSTGRES_PORT")
pg_db = os.getenv("POSTGRES_DB")
pg_user = os.getenv("POSTGRES_USER")
pg_password = os.getenv("POSTGRES_PASSWORD")

In [6]:
print(f"POSTGRES HOST: {pg_host}")
print(f"POSTGRES PORT: {pg_port}")
print(f"POSTGRES DB: {pg_db}")
print(f"POSTGRES USER: {pg_user}")
print(f"POSTGRES PASSWORD: {pg_password}")

POSTGRES HOST: localhost
POSTGRES PORT: 5432
POSTGRES DB: teste
POSTGRES USER: airflow
POSTGRES PASSWORD: airflow


## Connect to SQL Server

In [7]:
print("Connecting to SQL Server")
print(f"  Server: {sql_host}")
print(f"  Database: {sql_db}")

Connecting to SQL Server
  Server: localhost\SQLEXPRESS
  Database: TransactionDB_UAT


In [8]:
try:
    sql_conn_string = (
        f"Driver={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={sql_host};"
        f"DATABASE={sql_db};"
        f"Trusted_connection=yes;"
        f"Encrypt=no;"
    )

    sql_conn = pyodbc.connect(sql_conn_string)
    sql_cursor = sql_conn.cursor()
    print("[SUCCESS] -> Connection to SQL Server now live! ")
except Exception as e:
    print(f"SQL Server connection failed: {e}")
    print(""" How to troubleshoot
          > 1. Check server name is .env file correct
          > 2. Verify SQL Server is running
          > 3. Check Windows Authentication is enabled
          > 4. If certified is the problem, use Encrypt=no or TrustServerCertificate=yes
 """)

[SUCCESS] -> Connection to SQL Server now live! 


## 3. Connect to PostgreSQL

In [9]:
print("Connecting to PostgreSQL...")
print(f"  Server: {pg_host}")
print(f"  Database: {pg_db}")

Connecting to PostgreSQL...
  Server: localhost
  Database: teste


In [10]:
try:
    pg_conn= psycopg2.connect(
        host=pg_host,
        port=pg_port,
        database=pg_db,
        user=pg_user,
        password=pg_password
    )

    pg_cursor = pg_conn.cursor()
    pg_cursor.execute("SELECT version();")

    pg_version = pg_cursor.fetchone()[0]
    print("Connected to PostgreSQL")
    print(f" version: {pg_version[:50]}...\n")
except psycopg2.OperationalError as e:
      print(f"Postgres connection failed: {e}")
      print(""" How to troubleshoot
               > 1. Check Postgres is running
               > 2. Verify username + password
               > 3. Check database exists
            """)

except Exception as e:
     print("Unexpected error: {e}")

Connected to PostgreSQL
 version: PostgreSQL 13.23 (Debian 13.23-1.pgdg13+1) on x86_...



## 4. Define the tables to migrate

### Migration order

- Categories (no dependencies)
- Suppliers (no dependencies)
- Customers (no dependencies)
- Products (dependencies on Categories and suppliers)

In [11]:
tables_to_migrate = ['Categories', 'Suppliers', 'Customers', 'Products']
print(tables_to_migrate)

['Categories', 'Suppliers', 'Customers', 'Products']


In [12]:
print("Table to migrate: ")
for i, table in enumerate(tables_to_migrate,1):
    print(f"  {i}. {table}")

total_no_tbls = len(tables_to_migrate)
print(f"\nTotal  no of tables to migrate: {total_no_tbls}")

Table to migrate: 
  1. Categories
  2. Suppliers
  3. Customers
  4. Products

Total  no of tables to migrate: 4


### 5. Run pre-migration checks


In [13]:
print("=" * 60)
print(">>> ROW COUNTS")
print("=" * 60)

>>> ROW COUNTS


In [14]:
test_query = "SELECT COUNT(*) AS total_rows FROM Products"
sql_cursor.execute(test_query)

count = sql_cursor.fetchone()[0]
print(f"Results: {count}")

Results: 11


In [15]:
baseline_counts = {}

try:
    for table in tables_to_migrate:
        row_count_query = f"SELECT COUNT(*) AS total_rows FROM {table}"

        # Warning: Do not input SQL queries with f-strings in production (this is just for the tutorial)
        # Example
        ## table = "users; DROP TABLE users; --"
        ## query = f"SELECT COUNT(*) FROM {table}"

        sql_cursor.execute(row_count_query)
        count = sql_cursor.fetchone()[0]

        baseline_counts[table]= count
        print(f"{table:15} {count:>12} rows")

    total_rows = sum(baseline_counts.values())
    print(f"{'-' * 33}")
    print(f"{'TOTAL':15} {total_rows:>12,} rows")
    print("\n Baseline captured! ")
except Exception as e:
    print(f"Failed to get baseline counts: {e}")
    raise

Categories                 7 rows
Suppliers                 10 rows
Customers                  3 rows
Products                  11 rows
---------------------------------
TOTAL                     31 rows

 Baseline captured! 


In [16]:
print("=" * 50)
print(">>> Check 2: Null COUNTS (CustomerName)")
print("=" * 50)



>>> Check 2: Null COUNTS (CustomerName)


In [17]:
quality_issues = []

try:
    print("\nCHECK 2: NULL CHECKS (CustomerName)")
    sql_cursor.execute("""
        SELECT COUNT(*) AS null_count
        FROM Customers
        WHERE CustomerName IS NULL
        """)
    null_names = sql_cursor.fetchone()[0]
    if null_names > 0:
        quality_issues.append(f" > {null_names:,} customers with Null names...")
    print(quality_issues)

    print("\nCHECK 3: Invalid email formats")
    sql_cursor.execute(""" SELECT COUNT(*) AS invalid_email_count FROM Customers WHERE email LIKE '@Invalid' """)
    invalid_emails = sql_cursor.fetchone()[0]
    if invalid_emails > 0:
        quality_issues.append(f" > {invalid_emails:,} emails with invalid email formats...")
    print(quality_issues)

    print("\nCHECK 4: NEGATIVE PRODUCTS PRICES")
    sql_cursor.execute(""" SELECT COUNT(*) As negative_product_prices_count FROM Products WHERE UnitPrice < 0 """)
    negative_price = sql_cursor.fetchone()[0]
    if negative_price > 0:
        quality_issues.append(f" > {negative_price:,} prices contain negative prices...")
    print(quality_issues)

    print("\nCHECK 4: NEGATIVE STOCK QUANTITIES")
    sql_cursor.execute("""
                        SELECT COUNT(*) As negative_stock_quantities_count
                       FROM Products
                       WHERE StockQuantity < 0
                        """)
    negative_stock_quantities = sql_cursor.fetchone()[0]
    if negative_stock_quantities > 0:
        quality_issues.append(f" > {negative_stock_quantities:,} products contain negative values...")
    print(quality_issues)

    print("\nCHECK 6: ORPHANED FOREIGN KEYS")
    sql_cursor.execute("""SELECT COUNT(*) AS orphaned_records FROM Products  prod
                        WHERE NOT EXISTS (SELECT 1 FROM Suppliers sup
                        WHERE sup.SupplierID = prod.SupplierID)
                        """)
    orphaned_fks = sql_cursor.fetchone()[0]
    if orphaned_fks > 0:
        quality_issues.append(f" > {orphaned_fks:,} products with orphaned foreign keys...")
    print(quality_issues)

    if quality_issues:
         for issue in quality_issues:
              print(issue)
    else:
         print("No data quality issues identified")

except Exception as e:
        print(f"Error ==>> Unexpected issue {e}")
        raise


CHECK 2: NULL CHECKS (CustomerName)
[' > 1 customers with Null names...']

CHECK 3: Invalid email formats
[' > 1 customers with Null names...']

CHECK 4: NEGATIVE PRODUCTS PRICES
[' > 1 customers with Null names...']

CHECK 4: NEGATIVE STOCK QUANTITIES
[' > 1 customers with Null names...', ' > 1 products contain negative values...']

CHECK 6: ORPHANED FOREIGN KEYS
[' > 1 customers with Null names...', ' > 1 products contain negative values...', ' > 11 products with orphaned foreign keys...']
 > 1 customers with Null names...
 > 1 products contain negative values...
 > 11 products with orphaned foreign keys...


### 6. Get table schema

In [18]:
print("="*65)
print("ANALYSE TABLE SCHEMA")
print("="*65)

ANALYSE TABLE SCHEMA


In [19]:
table_shema = {}

try:
    for table in tables_to_migrate:
        schema_query = f"""SELECT
                        COLUMN_NAME,
                        DATA_TYPE,
                        CHARACTER_MAXIMUM_LENGTH,
                        IS_NULLABLE
                        FROM INFORMATION_SCHEMA.COLUMNS
                        WHERE TABLE_NAME = ?
                        ORDER BY ORDINAL_POSITION
                        """
        schema_df = pd.read_sql(schema_query,sql_conn,params=(table,))
        #schema_df = pd.read_sql(schema_query,sql_conn)
        table_shema[table] = schema_df
        print(f"="*65)
        print(f"{table:<12}")
        print(f"\n{schema_df}")
except Exception as e:
    pass

Categories  

    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    CategoryID       int                       NaN          NO
1  CategoryName   varchar                     100.0          NO
2   Description   varchar                     255.0          NO
Suppliers   

    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    SupplierID       int                       NaN          NO
1  SupplierName   varchar                     200.0          NO
2   ContactName   varchar                     150.0         YES
3       Country   varchar                     100.0         YES
4         Phone   varchar                      50.0         YES
Customers   

    COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH IS_NULLABLE
0    CustomerID       int                       NaN          NO
1  CustomerName   varchar                     150.0         YES
2         Email   varchar                     150.0          NO
3         Phone   varchar                      50.0         YE

  schema_df = pd.read_sql(schema_query,sql_conn,params=(table,))
  schema_df = pd.read_sql(schema_query,sql_conn,params=(table,))
  schema_df = pd.read_sql(schema_query,sql_conn,params=(table,))
  schema_df = pd.read_sql(schema_query,sql_conn,params=(table,))


### 7. Define data type mapping

In [20]:
type_mapping = {
'int': 'INTEGER',
'bigint': 'BIGINT',
'smallint': 'SMALLINT',
'tinyint': 'SMALLINT',
'bit': 'BOOLEAN',
'decimal': 'NUMERIC',
'numeric': 'NUMERIC',
'money': 'NUMERIC(19,4)',
'smallmoney': 'NUMERIC(10,4)',
'float': 'DOUBLE PRECISION',
'real': 'REAL',
'datetime': 'TIMESTAMP',
'datetime2': 'TIMESTAMP',
'smalldatetime': 'TIMESTAMP',
'date': 'DATE',
'time': 'TIME',
'char': 'CHAR',
'varchar': 'VARCHAR',
'nchar': 'CHAR',
'nvarchar': 'VARCHAR',
'text': 'TEXT',
'ntext': 'TEXT'

}

In [21]:
print("SQL to PostgreSQL type mapping")
print()

for sql_type,pg_type in list(type_mapping.items()):
    print(f"  {sql_type:15} --->    {pg_type}")

SQL to PostgreSQL type mapping

  int             --->    INTEGER
  bigint          --->    BIGINT
  smallint        --->    SMALLINT
  tinyint         --->    SMALLINT
  bit             --->    BOOLEAN
  decimal         --->    NUMERIC
  numeric         --->    NUMERIC
  money           --->    NUMERIC(19,4)
  smallmoney      --->    NUMERIC(10,4)
  float           --->    DOUBLE PRECISION
  real            --->    REAL
  datetime        --->    TIMESTAMP
  datetime2       --->    TIMESTAMP
  smalldatetime   --->    TIMESTAMP
  date            --->    DATE
  time            --->    TIME
  char            --->    CHAR
  varchar         --->    VARCHAR
  nchar           --->    CHAR
  nvarchar        --->    VARCHAR
  text            --->    TEXT
  ntext           --->    TEXT


In [22]:
print("="*65)
print("CREATE TABLES IN POSTGRES")
print("="*65)

CREATE TABLES IN POSTGRES


In [23]:
try:
    for table in tables_to_migrate:
        schema = table_shema[table]
        pg_table = table.lower()

        pg_cursor.execute(f"DROP TABLE IF EXISTS {pg_table} CASCADE")
        column_definitions = []

        for idx,row in schema.iterrows():
            col_name = row['COLUMN_NAME'].lower()
            sql_type = row['DATA_TYPE']

            base_type = sql_type.lower()
            pg_type = type_mapping.get(base_type, 'TEXT')

            if idx == 0 and col_name.endswith('id')  and 'int' in sql_type.lower():
                column_definitions.append(f"{col_name} SERIAL PRIMARY KEY")
            else:
                column_definitions.append(f"{col_name} {pg_type}")

        column_string = ",\n        ".join(column_definitions)
        create_query = f"""
                        CREATE TABLE {pg_table} ({column_string})
                        """
        print(column_string)
        pg_cursor.execute(create_query)
        pg_conn.commit()
    print("\n + " + "="*55)
    print("[SUCCESS] ---> All tables created successfully!")

except psycopg2.Error as e:
    print(f"Postgres experienced an error while creating a table: {e}")
    pg_conn.rollback()
    raise

except Exception as e:
    print(f"Unexpected issue: {e}")

categoryid SERIAL PRIMARY KEY,
        categoryname VARCHAR,
        description VARCHAR
supplierid SERIAL PRIMARY KEY,
        suppliername VARCHAR,
        contactname VARCHAR,
        country VARCHAR,
        phone VARCHAR
customerid SERIAL PRIMARY KEY,
        customername VARCHAR,
        email VARCHAR,
        phone VARCHAR,
        country VARCHAR,
        createddate TIMESTAMP,
        isactive BOOLEAN
productid SERIAL PRIMARY KEY,
        productname VARCHAR,
        categoryid INTEGER,
        supplierid INTEGER,
        unitprice NUMERIC,
        stockquantity INTEGER,
        createddate TIMESTAMP

[SUCCESS] ---> All tables created successfully!


# 9. Test Migration with one table

In [24]:
print("="*65)
print("TESTING MIGRATION (SINGLE TABLE)")
print("="*65)

TESTING MIGRATION (SINGLE TABLE)


In [25]:
test_table = 'Customers'
pg_table = test_table.lower()

In [26]:
try:
    print("1. Read from SQL Server...")
    extract_query = f"SELECT * FROM {pg_table}"
    test_df = pd.read_sql(extract_query,sql_conn)

    print("2.  Transforming data types...")

    if 'IsActive' in test_df.columns:
        test_df['IsActive'] = test_df['IsActive'].astype('bool')
        print("[SUCCESS] --->> Converted IsActive: BIT ---> BOOLEAN")

        print("3. Prepare the data for loading")
        data_tuples =[tuple(row) for row in test_df.to_numpy()]
        columns = [col.lower() for col in test_df.columns]

        column_string = ', '.join(columns)
        placeholders = ', '.join(['%'] * len(columns))

        insert_query = f"""
                        INSERT INTO {pg_table} ({column_string})
                        VALUES %s
                        """
        print(f"  Prepared {len(data_tuples):,} rows")
        print("4. Insert data into PostgresSQL...")
        execute_values(pg_cursor,insert_query,data_tuples,page_size=1000)
        pg_conn.commit()

        print(f"Loaded {len(data_tuples):,} rows")

        print("5.  Verifying...")
        pg_cursor.execute(f"SELECT COUNT(*) AS total_rows FROM {pg_table}")
        pg_count =pg_cursor.fetchone()[0]

        sql_count = baseline_counts[test_table]

        if pg_count == sql_count:
            print(f"[SUCCESS] --> Verification passed: {pg_count:,} == {sql_count:,}")
        else:
            print(f"[FAILED] --> Count mismatch: {pg_count:,} != {sql_count:,}")

        print(f"\n {test_table} migration test successfully completed!")



except Exception as e:
    pg_conn.rollback()
    raise


1. Read from SQL Server...
2.  Transforming data types...
[SUCCESS] --->> Converted IsActive: BIT ---> BOOLEAN
3. Prepare the data for loading
  Prepared 3 rows
4. Insert data into PostgresSQL...
Loaded 3 rows
5.  Verifying...
[SUCCESS] --> Verification passed: 3 == 3

 Customers migration test successfully completed!


  test_df = pd.read_sql(extract_query,sql_conn)


# 10. Migrate remaining tables

In [27]:
print("="* 65)
print("MIGRATE REMAINING TABLES")
print("="* 65)

MIGRATE REMAINING TABLES


In [None]:
remaining_tables = [t for t in tables_to_migrate if t != 'Customers']

for table in remaining_tables:
    pg_table = table.lower()

    print(f"Migrating {table} --> {pg_table}...")

    try:
        print("1. Reading from SQL Server...")
        extract_query = f"SLEECT * FROM {table}"
        sql_df = pd.read_sql(extract_query,sql_conn)
        print(f"  Read {len(sql_df):,} rows \n\n")

        print("2.  Preparing data...")
        data_tuples = [tuple(row) for row in sql_df.to_numpy()]
        columns = [col.lower() for col in sql_df.columns]
        columns_string = ', '.join(columns)

    except Exception as e:
        pass

: 