In [385]:
import psycopg2
import pandas.io.sql as sqlio
import pandas as pd
from io import StringIO
import numpy as np
import pyodbc
import uuid
import chardet
import struct

In [386]:
conn = psycopg2.connect(
    host='host-name',
    database='postgres',
    user='postgres',
    password='my-password',
)

conn_mssql = pyodbc.connect('DSN=MSSQLServerDatabase;UID=admin;PWD=my-password')

mssql_db_name = 'master'

pg_db_name = 'master_dbo'


def handle_datetimeoffset(dto_value):
    # ref: https://github.com/mkleehammer/pyodbc/issues/134#issuecomment-281739794
    tup = struct.unpack("<6hI2h", dto_value)  # e.g., (2017, 3, 16, 10, 35, 18, 0, -6, 0)
    tweaked = [tup[i] // 100 if i == 6 else tup[i] for i in range(len(tup))]
    return "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}.{:07d} {:+03d}:{:02d}".format(*tweaked)
conn_mssql.add_output_converter(-155, handle_datetimeoffset)

In [387]:
def get_mssql_column_types(par_table_name, par_conn_mssql):
    global mssql_db_name
    cursor = conn_mssql.cursor()
    cursor.execute(f"USE {mssql_db_name}")
    cursor.close()
    sql = '''
        SELECT COLUMN_NAME, DATA_TYPE
        FROM INFORMATION_SCHEMA.COLUMNS
        where TABLE_SCHEMA = 'dbo'
        and TABLE_CATALOG = \'''' + mssql_db_name + '''\'
        and TABLE_NAME = \'''' + par_table_name + '''\'
        order by ORDINAL_POSITION 
        '''
    data = sqlio.read_sql_query(sql, par_conn_mssql)
    return dict(zip(data.COLUMN_NAME.str.lower(), data.DATA_TYPE))

def get_pg_column_types(par_table_name, par_conn):
    global pg_db_name
    sql = '''
        SELECT column_name, data_type
        FROM information_schema.columns
        where table_catalog = 'postgres'
        and table_schema = \'''' + pg_db_name + '''\'
        and table_name = \'''' + par_table_name.lower() + '''\'
        order by ordinal_position 
        '''
    data = sqlio.read_sql_query(sql, par_conn)
    return dict(zip(data.column_name.str.lower(), data.data_type))

def print_table_column_types(par_table_name, par_conn, par_conn_mssql):
    global mssql_db_name, pg_db_name
    cursor = conn_mssql.cursor()
    cursor.execute(f"USE {mssql_db_name}")
    cursor.close()
    sql = '''
        SELECT COLUMN_NAME, IS_NULLABLE , DATA_TYPE , CHARACTER_MAXIMUM_LENGTH , COLUMN_DEFAULT
        FROM INFORMATION_SCHEMA.COLUMNS
        where TABLE_SCHEMA = 'dbo'
        and TABLE_CATALOG = \'''' + mssql_db_name + '''\'
        and TABLE_NAME = \'''' + par_table_name + '''\'
        order by ORDINAL_POSITION 
        '''
    data = sqlio.read_sql_query(sql, par_conn_mssql)
    print('SQL SERVER Table info: \n', data, '\n')
    sql = '''
        select column_name, is_nullable, data_type, character_maximum_length , column_default
        from information_schema.columns
        where table_catalog = 'postgres'
        and table_schema = \'''' + pg_db_name + '''\'
        and table_name = \'''' + par_table_name.lower() + '''\'
        order by ordinal_position
        '''
    data = sqlio.read_sql_query(sql, par_conn)
    print('PostgreSQL Table info: \n', data, '\n')
    
def change_bit_type_column(par_table_name, par_conn, par_conn_mssql):
    global mssql_db_name, pg_db_name
    cursor = conn_mssql.cursor()
    cursor.execute(f"USE {mssql_db_name}")
    cursor.close()
    sql = '''
        SELECT COLUMN_NAME, IS_NULLABLE , DATA_TYPE , CHARACTER_MAXIMUM_LENGTH , COLUMN_DEFAULT
        FROM INFORMATION_SCHEMA.COLUMNS
        where TABLE_SCHEMA = 'dbo'
        and TABLE_CATALOG = \'''' + mssql_db_name + '''\'
        and TABLE_NAME = \'''' + par_table_name + '''\'
        order by ORDINAL_POSITION 
        '''
    data = sqlio.read_sql_query(sql, par_conn_mssql)
    for i,row in data.iterrows():
        if row['DATA_TYPE'] == 'bit':            
            col_name = row['COLUMN_NAME'].lower()
            print(f"Trying to change table '{par_table_name}' column '{col_name}' type to 'bool'")
            par_table_name.lower()            
            sql_drop_default = 'select 1'
            sql_set_default = 'select 1'
            if row['COLUMN_DEFAULT'] is not None: 
                sql_drop_default = f"ALTER TABLE {pg_db_name}.{par_table_name.lower()} ALTER COLUMN {col_name} DROP DEFAULT;"
                default_value = row['COLUMN_DEFAULT'].replace('(','').replace(')','')
                if default_value == '0':
                    default_value = 'False'
                else:
                    default_value = 'True'
                sql_set_default = f"ALTER TABLE {pg_db_name}.{par_table_name.lower()} ALTER COLUMN {col_name} SET DEFAULT {default_value};"
            # Helper function from https://pretius.com/postgresql-stop-worrying-about-table-and-view-dependencies/
            # https://gist.github.com/briandignan/03ef42e78434658cf27f052e2f0798e8
            sql_pre = f"select deps_save_and_drop_dependencies('{pg_db_name}', '{par_table_name.lower()}');"
            sql = f"ALTER TABLE {pg_db_name}.{par_table_name.lower()} ALTER COLUMN {col_name} TYPE bool USING cast ({col_name} as integer)=1;"
            sql_fix_numeric_in_view = '''
                with dsd_new as (
                SELECT deps_id, REPLACE(deps_ddl_to_run, '= (0)::numeric', 'is false' ) as deps_ddl_to_run_new
                FROM public.deps_saved_ddl
                WHERE deps_ddl_to_run like '%(0)::numeric%'
                )
                update deps_saved_ddl as dsd_old
                set deps_ddl_to_run = dsd_new.deps_ddl_to_run_new
                from dsd_new
                where dsd_old.deps_id = dsd_new.deps_id
            '''
            sql_post = f"select deps_restore_dependencies('{pg_db_name}', '{par_table_name.lower()}');"
            cursor = par_conn.cursor()
            try:      
                cursor.execute(sql_pre)
                cursor.execute(sql_drop_default)
                cursor.execute(sql)
                cursor.execute(sql_set_default)
                cursor.execute(sql_fix_numeric_in_view)
                cursor.execute(sql_post)
                conn.commit()
                print(f"Changed table '{par_table_name}' column '{col_name}' type to 'bool'")
                cursor.close()
            except (Exception, psycopg2.DatabaseError) as error:
                print("Error: %s" % error)
                conn.rollback()
                cursor.close()

In [388]:
cursor = conn_mssql.cursor()
cursor.execute(f"USE {mssql_db_name}")
cursor.close()
conn_mssql.commit()
sql = '''
SELECT TABLE_NAME, TABLE_CATALOG
FROM INFORMATION_SCHEMA.TABLES
where TABLE_SCHEMA = 'dbo'
and TABLE_CATALOG = \'''' + mssql_db_name + '''\'
and TABLE_TYPE = 'BASE TABLE'
order by 1
'''
table_data = sqlio.read_sql_query(sql, conn_mssql)

In [None]:
global_count = {
    'work':0,
    'did not work':0,
    'empty':0,
    'does not exist in pg':0,
    'different columns':0,
}

for i,table in table_data.iterrows():
    # if i < 334:
    #     continue
    
    table_name = table['TABLE_NAME']
    print(i, table_name)
    
    # Fetch data from MSSQL
    sql = '''
    select top 10 * 
    from ''' + mssql_db_name + '''.dbo.[''' + table_name + ''']
    '''
    data_mssql = sqlio.read_sql_query(sql, conn_mssql)
    
    if len(data_mssql) == 0:
        global_count['empty'] += 1
        continue
    
    # Copy data to PostgreSQL
    pg_table_name = table_name.lower()
    buffer = StringIO()
    data_buff = data_mssql.copy()
    data_buff.columns = data_buff.columns.str.lower()
    # If the sable has a Generated column, here delete the conlumn from the DataFrame
    if 'company_id' in data_buff.columns and table_name != 'WidgetSection_Pre_UpMigration':
        del data_buff['company_id']
    if 'diagram_id' in data_buff.columns:
        del data_buff['diagram_id']
    if table_name in ['WorkItemName', 'LookupField', 'Country']:
        del data_buff['sortorder']
    
    mssql_col_type = get_mssql_column_types(table_name, conn_mssql)
    pg_col_type = get_pg_column_types(table_name, conn)
    if len(pg_col_type) != len(mssql_col_type):
        print('Error: different columns between table')
        global_count['different columns'] += 1
        continue
    
    has_byte_column = False
    for col in data_buff:
        # If there is bytes type column, convert to quoted bytes
        if type(data_buff[col][0]) == bytes:
            has_byte_column = True
            # data_buff[col] = data_buff[col].apply(lambda x: psycopg2.Binary(x).getquoted())
        # Replace \r\n to \n
        if type(data_buff[col][0]) == str:            
            data_buff[col] = data_buff[col].replace(to_replace="\n", value="\\n", regex=True)
            data_buff[col] = data_buff[col].replace(to_replace="\r", value="\\r", regex=True)
        if mssql_col_type[col] == 'int' or mssql_col_type[col] == 'bigint':
            data_buff[col] = data_buff[col].apply(lambda x: None if pd.isnull(x) else str(int(x)))
        if mssql_col_type[col] == 'datetime':
            data_buff[col] = data_buff[col].astype(object).where(data_buff[col].notnull(), None)
            
            
    #data_buff = data_buff.fillna(value=np.nan)
    
    # Saves the data for debug purpose
    #data_buff.to_csv('mssql.csv', index = False)
    
    # Check which CSV separator to use
    contains_tab = np.column_stack([data_buff[col].astype(str).str.contains(r"\t", na=False) for col in data_buff]).any()
    csv_separator = '\t'
    if contains_tab:
        csv_separator = ','
            
    data_buff.to_csv(buffer, index = False, header=False, sep=csv_separator, line_terminator = '\r')
    buffer.seek(0)
    cursor = conn.cursor()
    table_exist_in_pg = False
    try:
        cursor.execute(f'TRUNCATE {pg_db_name}.' + pg_table_name)
        conn.commit()
        table_exist_in_pg = True
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        global_count['does not exist in pg'] += 1
    if table_exist_in_pg:
        try:
            if False:#(not has_byte_column):
                cursor.copy_from(buffer, f"{pg_db_name}." + pg_table_name, sep=csv_separator, null='NULL', columns=data_buff.columns)
                conn.commit()
            else:
                col_list = data_buff.rename(columns = {'order':'\"Order\"'}).columns
                for i,row in data_buff.iterrows():
                    sql_insert = f"insert into {pg_db_name}." + pg_table_name + "(" \
                        + ', '.join(col_list.to_list()) + ") VALUES (" \
                        + ', '.join(np.repeat('%s', len(data_buff.columns))) + ");"                    
                    cursor.execute(sql_insert, tuple(row))
                    conn.commit()
            global_count['work'] += 1
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()
            if str(error).__contains__('invalid input syntax for type numeric') or \
                str(error).__contains__('is of type numeric but expression is of type boolean'):
                change_bit_type_column(table_name, conn, conn_mssql)
            #print_table_column_types(table_name, conn, conn_mssql)
            print(sql_insert, tuple(row))
            global_count['did not work'] += 1
            break
        
        # Compare data between MSSQL and PostgreSQL
        sql_fetch = '''
        select * 
        from ''' + pg_db_name + '''.''' + pg_table_name + '''
        limit 10
        '''
        data2 = sqlio.read_sql_query(sql_fetch, conn)
        data_buff = data_mssql.copy()
        data_buff.columns = data_buff.columns.str.lower()
        is_table_equal = True
        column_equallity_report = ''
        # If the sable has a Generated column, here delete the conlumn from the DataFrame
        if table_name in ['WorkItemName', 'LookupField', 'Country']:
            del data_buff['sortorder']
            del data2['sortorder']
        if table_name in ['sysdiagrams']:
            del data_buff['diagram_id']
            del data2['diagram_id']
        for column in data_buff.columns:
            if(type(data_buff[column][0]) == uuid.UUID):
                is_equal = data2[column].equals(data_buff[column].astype(str))
            elif(type(data_buff[column][0]) == bytes):
                is_equal = data2[column].apply(lambda x: bytes(x)).equals(data_buff[column])
            elif column != 'order' and type(data2[column][0]) == pd._libs.tslibs.timestamps.Timestamp and type(data_buff[column][0]) == str:
                is_equal = data2[column].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))\
                    .equals(data_buff[column].apply(lambda x: None if pd.isnull(x) else x.split('.')[0]))
            elif mssql_col_type[column] == 'uniqueidentifier':
                is_equal = data2[column].equals(data_buff[column].str.lower())
            else:
                if column != 'order':
                    is_equal = data2[column].equals(data_buff[column])
                else:
                    is_equal = data2['Order'].equals(data_buff[column])
            if not is_equal:
                is_equal = (data_buff[column].astype(str) == data2[column].astype(str)).all()
            column_equallity_report += 'Column: ' + column + ' ' +  str(type(data_buff[column][0])) + ' ' + str(is_equal) + "\n"
            if not is_equal:
                is_table_equal = False
        if not is_table_equal:
            print(i, table_name)
            print(column_equallity_report)
            #print_table_column_types(table_name, conn, conn_mssql)
            break

    # if table_name == 'AccessTicket':
    #     break

    # if i == 10:
    #     break
    
global_count

In [394]:
import plotly.express as px

df = pd.DataFrame(list(global_count.items()),columns = ['Type','Count'])


fig = px.bar(df, x="Type", y="Count",
             barmode='group',
             height=400,
             text=df["Count"],
             )

fig.update_layout(
    title="Summary of MSSQL Table's data migrated to PostgreSQL",
    xaxis_title="Result type",
    yaxis_title="Number of Tables",
    )
    
fig.show()

## Debug if data is the same on MSSQL and PostgreSQL

In [None]:
sql = '''
select top 10 * 
from [dbo].[''' + table_name + ''']
'''
print(table_name)
data_mssql = sqlio.read_sql_query(sql, conn_mssql)
data_mssql

In [None]:
sql = '''
select * 
FROM ''' + pg_db_name + '''.''' + table_name + '''
limit 10
'''
data2 = sqlio.read_sql_query(sql, conn)
data2