Data Migration process using python scripting 

The Migration process is between SSMS[source] to MySQL[Target]. The Tables will be migrated from SSMS to MySQL using pyhton script and most of the data present present in the tables need to be converted based on the datatype and make it into convivence phase.

In [None]:
##Packages 

import pandas as pd
import sqlalchemy
import chardet                       ## Conversion of any characterization within the code
from sqlalchemy import create_engine
from sqlalchemy.sql import quoted_name
import binascii
import os
import time


The credentials block for the connectivity of the source and target Database's 

In [None]:
##connectivity

ssms_connection_params = {
    'drivername': "Driver_name",
    'username': "User_name",
    'password': "Password",
    'host': "DB_host",
    'port': "4204",
    'database': "DB",
    'query': {'driver': 'ODBC Driver 17 for SQL Server'},
}
mysql_connection_params = {
    'drivername': 'Driver_Name',
    'username': 'Username',
    'password': 'Password',
    'host': 'DB_Host',
    'port': '3307',
    'database': 'DB',  
}


In [None]:
#Bridge_connections

ssms_engine = create_engine(f"{ssms_connection_params['drivername']}://"
                            f"{ssms_connection_params['username']}:{ssms_connection_params['password']}@"
                            f"{ssms_connection_params['host']}:{ssms_connection_params['port']}/"
                            f"{ssms_connection_params['database']}?driver={ssms_connection_params['query']['driver']}")

mysql_engine = create_engine(f"{mysql_connection_params['drivername']}://"
                             f"{mysql_connection_params['username']}:{mysql_connection_params['password']}@"
                             f"{mysql_connection_params['host']}:{mysql_connection_params['port']}/"
                             f"{mysql_connection_params['database']})

In [None]:

def migrate_table(ssms_engine, mysql_engine, table_name):
    start_time = time.time()  # Start time for migration
    try:
        query = f"SELECT * FROM [{table_name}];"
        ssms_data = pd.read_sql_query(query, ssms_engine)

        # Your data transformations here
        
        ssms_data['timestamp'] = ssms_data['timestamp'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
        
        target_table_name = f"{table_name}_AE_LIVE"
        ssms_data.to_sql(target_table_name, mysql_engine, index=False, if_exists='append')
        
        end_time = time.time()  # End time for migration
        time_log = end_time - start_time  # Calculate time taken for migration
        
        migration_status = "Success"
        log_data = {'Database': ssms_connection_params['database'],
                    'Schema': "dbo",  # Fill this if you have schema information
                    'Table': table_name,
                    'Migration': migration_status,
                    'Duration': time_log,
                    'Table Count': len(ssms_data)}  # Store table count in log data
        success_log = pd.DataFrame([log_data])
        
        success_csv_creation = success_log.to_csv("successful_migrations.csv", mode='a', index=False, header=not os.path.exists("successful_migrations.csv"))
        
        if success_csv_creation is None:
            print(f"Successful migration log for table '{table_name}' created successfully.")
        else:
            print(f"Error creating successful migration log for table '{table_name}'.")

        print(f"Table '{table_name}' converted & Migrated --- Successfully.")
        return 1  # Return 1 for successful migration
    except Exception as e:
        migration_status = "Failed"
        end_time = time.time()  # End time for migration even if failed
        time_log = end_time - start_time  # Calculate time taken for migration
        
        log_data = {'Database': ssms_connection_params['database'],
                    'Schema': "dbo",  # Fill this if you have schema information
                    'Table': table_name,
                    'Migration': migration_status,
                    'Duration': time_log,
                    'Table Count': ""}  # Leave table count blank for failed migration
        failure_log = pd.DataFrame([log_data])
        
        failure_csv_creation = failure_log.to_csv("failed_migrations.csv", mode='a', index=False, header=not os.path.exists("failed_migrations.csv"))
        
        if failure_csv_creation is None:
            print(f"Failed migration log for table '{table_name}' created successfully.")
        else:
            print(f"Error creating failed migration log for table '{table_name}': {e}")
        
        return 0  # Return 0 for failed migration

In [None]:
csv_file_path = "path of the CSV file"            
if os.path.exists(csv_file_path):
    table_names_to_transfer = pd.read_csv(csv_file_path)['table_name'].tolist()               ## The table_name column should be specified 
                                                                                              ## in the CSV file.
    
    success_count = 0
    fail_count = 0
                                                                                                             
    for table_name in table_names_to_transfer:
        result = migrate_table(ssms_engine, mysql_engine, table_name)
        if result == 1:
            success_count += 1
        else:
            fail_count += 1
    
    # Count the number of tables in successful and failed migrations CSV files
    if os.path.exists("successful_migrations.csv"):
        success_table_count = len(pd.read_csv("successful_migrations.csv"))
    else:
        success_table_count = 0
    
    if os.path.exists("failed_migrations.csv"):
        fail_table_count = len(pd.read_csv("failed_migrations.csv"))
    else:
        fail_table_count = 0
    
    print(f"Successful migrations: {success_count}, Total tables in success CSV: {success_table_count}")
    print(f"Failed migrations: {fail_count}, Total tables in fail CSV: {fail_table_count}")
    
else:
    print("CSV file not found.")

ssms_engine.dispose()
mysql_engine.dispose()


# Selection codes with major changes 

In [None]:


# # def migrate_table(ssms_engine, mysql_engine, table_name):
# #     try:
# #         query = f"SELECT * FROM [{table_name}];"
# #         ssms_data = pd.read_sql_query(query, ssms_engine)
    
# #         # Convert varbinary data to hexadecimal and then to bytes
# #         ssms_data['timestamp'] = ssms_data['timestamp'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
# #         ssms_data['Resource'] = ssms_data['Resource'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
# #         ssms_data['Metadata'] = ssms_data['Metadata'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
# #         ssms_data['BLOB Reference'] = ssms_data['BLOB Reference'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
# #         ssms_data['Signature'] = ssms_data['Signature'].apply(lambda x: binascii.hexlify(x).decode('utf-8') if x is not None else 'NULL')
# #         ssms_data['column name'] = ssms_data['column name'].apply(lambda x: binascii.hexlify(x).decode('utf-8')[:255] if x is not None else 'NULL')
# #         ssms_data['Descritption'] = ssms_data['Description'].apply(lambda x: binascii.hexlify(unidecode.unidecode(x).encode()))
        
# #         # Replace space at the end of 'Technician Counter Input Date ' with an underscore
# #         ssms_data = ssms_data.rename(columns={'Server Instance ID ': 'Server_Instance_ID_'})
        
# #         # Modify the target table name
# #         target_table_name = f"{table_name}_AE_LIVE"
        
# #         # Use target_table_name instead of table_name when writing to MySQL
# #         ssms_data.to_sql(target_table_name, mysql_engine, index=False, if_exists='replace')
        
# #         print(f"Table '{table_name}' converted & Migrated  --- Sucessfully.")
# #     except Exception as e:
# #         print(f"Error transferring table '{table_name}': {e}")
        
# def migrate_table(ssms_engine, mysql_engine, table_name):
#     try:
#         query = f"SELECT * FROM [{table_name}];"
#         ssms_data = pd.read_sql_query(query, ssms_engine)
        
#         ssms_data['timestamp'] = ssms_data['timestamp'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
      
       
#      # Check if the 'BLOB Reference' column exists in the dataframe
        
#         if 'Name' in ssms_data.columns:
#             ssms_data['Name'] = ssms_data['Name'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
#         if 'Description' in ssms_data.columns:
#             ssms_data['Description'] = ssms_data['Description'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
#         if 'Resources' in ssms_data.columns:
#             ssms_data['Resources'] = ssms_data['Resources'].apply(lambda x: binascii.hexlify(unidecode.unidecode(x).encode()))
#         if 'BLOB Reference' in ssms_data.columns:
#             ssms_data['BLOB Reference'] = ssms_data['BLOB Reference'].apply(lambda x: binascii.hexlify(x).decode('utf-8')(unidecode.unidecode(x).encode())[:255] if x is not None else 'NULL')
         
#         if 'User Code' in ssms_data.columns:
#             ssms_data['User Code'] = ssms_data['User Code'].apply(lambda x: binascii.hexlify(x).decode('utf-8')[:255] if x is not None else 'NULL')
#         if 'User AL Code' in ssms_data.columns:
#             ssms_data['User AL Code'] = ssms_data['User AL Code'].apply(lambda x: binascii.hexlify(x).decode('utf-8')[:255] if x is not None else 'NULL')
       
#         if 'Metadata' in ssms_data.columns:
#             ssms_data['Metadata'] = ssms_data['Metadata'].apply(lambda x: binascii.hexlify(x).decode('utf-8') if x is not None else 'NULL')
            
#         # Check if the 'BLOB Reference' column exists in the dataframe
#         if 'modifiedtables' in ssms_data.columns:
#             ssms_data['modifiedtables'] = ssms_data['modifiedtables'].apply(lambda x: binascii.hexlify(x).decode('utf-8'))
        
#         # Replace space at the end of 'Server Instance ID ' with an underscore
#         ssms_data = ssms_data.rename(columns={'Server Instance ID ': 'Server_Instance_ID_'})
        
#         # Modify the target table name
#         target_table_name = f"{table_name}_AE_LIVE"
        
#         # Use target_table_name instead of table_name when writing to MySQL
#         ssms_data.to_sql(target_table_name, mysql_engine, index=False, if_exists='replace')
        
#         print(f"Table '{table_name}' converted & Migrated  --- Successfully.")
#     except Exception as e:
#         print(f"Error transferring table '{table_name}': {e}")


# # Read table names from CSV file
# csv_file_path = "C:\\Users\\HXK0531A\\Desktop\\test_tables.csv"  # Specify the path to your CSV file
# if os.path.exists(csv_file_path):
#     table_names_to_transfer = pd.read_csv(csv_file_path)['table_name'].tolist()
#     for table_name in table_names_to_transfer:
#         migrate_table(ssms_engine, mysql_engine, table_name)
# else:
#     print("CSV file not found.")

# ssms_engine.dispose()                                                                                                 
# mysql_engine.dispose()
