In [1]:
import mariadb
import sys
import pandas as pd
import pyodbc
import numpy as np
from collections import Counter
import re
from datetime import datetime
import time
import logging

In [2]:
# Build the T-SQL statement that describes every column of every user table in the MSSQL database.
def mssql_schema_query():
    """Return the T-SQL text that lists column metadata for all user tables.

    The statement takes no parameters. It only considers tables found in
    ``sys.tables`` with ``is_ms_shipped = 0`` in the ``dbo`` schema.

    Each result row carries, in this order: TABLE_NAME, COLUMN_NAME,
    DATA_TYPE (with ``(length)`` appended for varchar/nvarchar/char/nchar),
    IS_PRIMARY_KEY, IS_FOREIGN_KEY, REFERENCES_TABLE, REFERENCES_COLUMN,
    ON_DELETE, ON_UPDATE, INDEX_TYPE and the helper column ``rn``.

    The joins can yield several rows per column; ``ROW_NUMBER()`` partitioned
    by table and column keeps only the first one (ordered by
    REFERENCES_TABLE). Rows are sorted by table name and then by the column's
    ORDINAL_POSITION.

    Returns:
        str: the complete SQL statement.
    """
    return """
                ;WITH ColumnInfo AS (
                SELECT 
                    c.TABLE_NAME,
                    c.COLUMN_NAME,
                    CASE 
                        WHEN c.DATA_TYPE IN ('varchar', 'nvarchar', 'char', 'nchar')
                            THEN CONCAT(c.DATA_TYPE, '(', c.CHARACTER_MAXIMUM_LENGTH, ')')
                        ELSE c.DATA_TYPE
                    END AS DATA_TYPE,
                    -- 主鍵標記
                    CASE 
                        WHEN pk.COLUMN_NAME IS NOT NULL THEN 'YES'
                        ELSE 'NO'
                    END AS IS_PRIMARY_KEY,
                    -- 外鍵標記（正確地從 referential_constraints 得來）
                    CASE 
                        WHEN fk_col.COLUMN_NAME IS NOT NULL THEN 'YES'
                        ELSE 'NO'
                    END AS IS_FOREIGN_KEY,
                    -- 外鍵關聯資訊
                    ref_tab.name AS REFERENCES_TABLE,
                    ref_col.name AS REFERENCES_COLUMN,
                    fk.delete_referential_action_desc AS ON_DELETE,
                    fk.update_referential_action_desc AS ON_UPDATE,
                    -- 索引型態
                    ISNULL(idx.INDEX_TYPE, 'NONE') AS INDEX_TYPE

                FROM INFORMATION_SCHEMA.COLUMNS c

                -- 主鍵 JOIN
                LEFT JOIN (
                    SELECT kcu.TABLE_NAME, kcu.COLUMN_NAME, tc.CONSTRAINT_NAME
                    FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                    JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                        ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
                    WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
                ) pk ON c.TABLE_NAME = pk.TABLE_NAME AND c.COLUMN_NAME = pk.COLUMN_NAME

                -- 外鍵 JOIN：取得正確的外鍵欄位
                LEFT JOIN (
                    SELECT kcu.TABLE_NAME, kcu.COLUMN_NAME, rc.CONSTRAINT_NAME
                    FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
                    JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                        ON rc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
                ) fk_col ON c.TABLE_NAME = fk_col.TABLE_NAME AND c.COLUMN_NAME = fk_col.COLUMN_NAME

                -- sys.foreign_key_columns → 詳細外鍵行為
                LEFT JOIN sys.foreign_key_columns fkc 
                    ON c.COLUMN_NAME = COL_NAME(fkc.parent_object_id, fkc.parent_column_id)
                    AND OBJECT_NAME(fkc.parent_object_id) = c.TABLE_NAME
                LEFT JOIN sys.foreign_keys fk 
                    ON fk.object_id = fkc.constraint_object_id
                LEFT JOIN sys.tables ref_tab 
                    ON ref_tab.object_id = fkc.referenced_object_id
                LEFT JOIN sys.columns ref_col 
                    ON ref_col.object_id = fkc.referenced_object_id 
                    AND ref_col.column_id = fkc.referenced_column_id

                -- 對應 sys.columns 的 object_id 與 column_id
                LEFT JOIN sys.columns col 
                    ON col.object_id = OBJECT_ID(c.TABLE_NAME) AND col.name = c.COLUMN_NAME

                -- 查詢該欄是否存在於索引中
                LEFT JOIN (
                    SELECT ic.object_id, ic.column_id,
                        MAX(CASE i.type 
                            WHEN 1 THEN 'CLUSTERED' 
                            WHEN 2 THEN 'NONCLUSTERED' 
                        END) AS INDEX_TYPE
                    FROM sys.indexes i
                    JOIN sys.index_columns ic 
                        ON i.object_id = ic.object_id AND i.index_id = ic.index_id
                    WHERE i.is_hypothetical = 0
                    GROUP BY ic.object_id, ic.column_id
                ) idx ON idx.object_id = col.object_id AND idx.column_id = col.column_id

                -- 只查自建表（排除系統表）
                WHERE EXISTS (
                    SELECT 1 
                    FROM sys.tables t
                    WHERE t.name = c.TABLE_NAME 
                    AND t.is_ms_shipped = 0
                    AND SCHEMA_NAME(t.schema_id) = 'dbo'
                )
            )

            -- 去除多筆對應的外鍵（如複合鍵），只保留一筆以避免重複欄位出現
            SELECT *
            FROM (
                SELECT *, ROW_NUMBER() OVER (PARTITION BY TABLE_NAME, COLUMN_NAME ORDER BY REFERENCES_TABLE) AS rn
                FROM ColumnInfo
            ) t
            WHERE rn = 1
            ORDER BY TABLE_NAME, 
                (SELECT ORDINAL_POSITION 
                FROM INFORMATION_SCHEMA.COLUMNS 
                WHERE TABLE_NAME = t.TABLE_NAME AND COLUMN_NAME = t.COLUMN_NAME);

        """

In [3]:
# Build the MariaDB information_schema query that describes the columns of one table.
def mariaDB_schema_query(table_name, schema_name='test'):
    """Return the SQL text that lists column metadata for a single MariaDB table.

    The result columns mirror the MSSQL schema query: TABLE_NAME, COLUMN_NAME,
    DATA_TYPE (with ``(length)`` appended when CHARACTER_MAXIMUM_LENGTH is not
    NULL), IS_PRIMARY_KEY, IS_FOREIGN_KEY, REFERENCES_TABLE,
    REFERENCES_COLUMN, ON_DELETE, ON_UPDATE and INDEX_TYPE. A unique index is
    reported as 'CLUSTERED', a non-unique one as 'NONCLUSTERED', and a column
    without any index as 'NONE'.

    Args:
        table_name: name of the table to inspect.
        schema_name: database (schema) that owns the table. Defaults to
            ``'test'``, the value that used to be hard-coded in the query.

    Returns:
        str: the complete SQL statement with both names embedded as escaped
        string literals.
    """
    def _sql_literal(value):
        # The names end up inside single-quoted SQL literals, so a quote in a
        # name would terminate the literal early. Backslashes are doubled
        # first (MariaDB treats them as escapes unless NO_BACKSLASH_ESCAPES
        # is set), then single quotes are doubled.
        return str(value).replace('\\', '\\\\').replace("'", "''")

    table_literal = _sql_literal(table_name)
    schema_literal = _sql_literal(schema_name)

    return  f"""SELECT 
    c.TABLE_NAME,
    c.COLUMN_NAME,
    CONCAT(c.DATA_TYPE,
        IFNULL(CONCAT('(', c.CHARACTER_MAXIMUM_LENGTH, ')'), '')
    ) AS DATA_TYPE,

    MAX(CASE 
        WHEN tc.CONSTRAINT_TYPE = 'PRIMARY KEY' THEN 'YES'
        ELSE 'NO'
    END) AS IS_PRIMARY_KEY,

    MAX(CASE 
        WHEN kcu.REFERENCED_TABLE_NAME IS NOT NULL THEN 'YES'
        ELSE 'NO'
    END) AS IS_FOREIGN_KEY,

    MAX(kcu.REFERENCED_TABLE_NAME) AS REFERENCES_TABLE,
    MAX(kcu.REFERENCED_COLUMN_NAME) AS REFERENCES_COLUMN,

    MAX(rc.DELETE_RULE) AS ON_DELETE,
    MAX(rc.UPDATE_RULE) AS ON_UPDATE,

    MAX(
        IF(s.INDEX_NAME IS NOT NULL, 
            IF(s.NON_UNIQUE = 0, 'CLUSTERED', 'NONCLUSTERED'),
            'NONE'
        )
    ) AS INDEX_TYPE

FROM information_schema.COLUMNS c

LEFT JOIN information_schema.KEY_COLUMN_USAGE kcu
    ON c.TABLE_SCHEMA = kcu.TABLE_SCHEMA
    AND c.TABLE_NAME = kcu.TABLE_NAME
    AND c.COLUMN_NAME = kcu.COLUMN_NAME

LEFT JOIN information_schema.TABLE_CONSTRAINTS tc
    ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
    AND tc.TABLE_NAME = kcu.TABLE_NAME
    AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA

LEFT JOIN information_schema.REFERENTIAL_CONSTRAINTS rc
    ON rc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
    AND rc.CONSTRAINT_SCHEMA = kcu.TABLE_SCHEMA

LEFT JOIN information_schema.STATISTICS s
    ON s.TABLE_SCHEMA = c.TABLE_SCHEMA
    AND s.TABLE_NAME = c.TABLE_NAME
    AND s.COLUMN_NAME = c.COLUMN_NAME

WHERE c.TABLE_SCHEMA = '{schema_literal}'
  AND c.TABLE_NAME = '{table_literal}'  -- ✅ 只查這張表

GROUP BY 
    c.TABLE_NAME, c.COLUMN_NAME, c.DATA_TYPE, c.CHARACTER_MAXIMUM_LENGTH

ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION;

"""

In [4]:
# Read the definition of every user table from MSSQL.
def _group_columns_by_table(rows):
    """Group schema rows into ``{table_name: [column_info, ...]}``.

    The first field of each row is the table name; fields 1..9 are kept as the
    column description. Rows of one table do not have to be adjacent, and an
    empty input yields an empty dict.
    """
    grouped = {}
    for row in rows:
        grouped.setdefault(row[0], []).append(row[1:10])
    return grouped


def get_mssql_table_details(connection_string=None):
    """Query MSSQL for all user-table definitions and print them.

    Args:
        connection_string: ODBC connection string passed to
            ``pyodbc.connect``. When omitted, the ``MSSQL_CONNECTION_STRING``
            environment variable is used, and only if that is unset does the
            function fall back to the connection string that was originally
            hard-coded here.

    Returns:
        dict: table name -> list of 9-field column descriptions (COLUMN_NAME,
        DATA_TYPE, IS_PRIMARY_KEY, IS_FOREIGN_KEY, REFERENCES_TABLE,
        REFERENCES_COLUMN, ON_DELETE, ON_UPDATE, INDEX_TYPE). Errors are
        printed rather than raised, so the dict is empty when the connection
        or the query fails.
    """
    import os  # local import so this cell does not depend on the imports cell being edited

    if connection_string is None:
        # FIXME(security): the fallback keeps the legacy embedded credentials
        # so existing runs keep working; set MSSQL_CONNECTION_STRING and
        # remove the literal.
        connection_string = os.environ.get(
            'MSSQL_CONNECTION_STRING',
            'DRIVER={ODBC Driver 17 for SQL Server};'
            'SERVER=LAPTOP-J1OAU7VN;'
            'DATABASE=master;'
            'UID=sa;'
            'PWD=6124Nok45'
        )

    # Both names are bound before the try block so the finally clause and the
    # return statement are safe even when connecting fails.
    schema_info = {}
    mssql_conn = None
    try:
        # Connect to MSSQL
        mssql_conn = pyodbc.connect(connection_string)
        mssql_cursor = mssql_conn.cursor()
        print("✅ 成功連線 MSSQL")

        # Fetch the description of every user-defined table
        mssql_cursor.execute(mssql_schema_query())
        columns = mssql_cursor.fetchall()

        # Collect all table definitions in schema_info
        schema_info = _group_columns_by_table(columns)

        # Print every table together with its column descriptions
        print("使用者自定義資料表結構如下：")
        for table, table_columns in schema_info.items():
            print(f'\n{table}')
            for col in table_columns:
                print(f'   ├─ {col}')
    except Exception as e:
        print("發生錯誤!", e)
    finally:
        # Only report the connection as closed when one was actually opened
        if mssql_conn is not None:
            mssql_conn.close()
            print("\n連線已關閉")
    return schema_info

In [5]:
# Result layout: table name -> list of (COLUMN_NAME, DATA_TYPE, IS_PRIMARY_KEY, IS_FOREIGN_KEY, REFERENCES_TABLE, REFERENCES_COLUMN, ON_DELETE, ON_UPDATE, INDEX_TYPE)
mssql_schema = get_mssql_table_details()
# At this point all the information needed to create the tables has been retrieved

✅ 成功連線 MSSQL
使用者自定義資料表結構如下：

Appeal
   ├─ ('caseId', 'int', 'YES', 'YES', 'Penalty_Info', 'caseId', 'NO_ACTION', 'CASCADE', 'CLUSTERED')
   ├─ ('document_no', 'nvarchar(100)', 'YES', 'YES', 'Penalty_Info', 'document_no', 'NO_ACTION', 'CASCADE', 'CLUSTERED')
   ├─ ('fac_uniformno', 'nvarchar(100)', 'YES', 'YES', 'Factory', 'fac_uniformno', 'NO_ACTION', 'CASCADE', 'CLUSTERED')
   ├─ ('fac_name', 'nvarchar(100)', 'YES', 'YES', 'Factory', 'fac_name', 'NO_ACTION', 'CASCADE', 'CLUSTERED')
   ├─ ('appeal_or_rescind', 'nvarchar(-1)', 'NO', 'NO', None, None, None, None, 'NONE')
   ├─ ('ispetition', 'int', 'NO', 'NO', None, None, None, None, 'NONE')
   ├─ ('lawsuit_date_1', 'date', 'NO', 'NO', None, None, None, None, 'NONE')
   ├─ ('petition_agency', 'nvarchar(100)', 'NO', 'NO', None, None, None, None, 'NONE')
   ├─ ('petition_results', 'nvarchar(-1)', 'NO', 'NO', None, None, None, None, 'NONE')

Factory
   ├─ ('fac_uniformno', 'nvarchar(100)', 'YES', 'NO', None, None, None, None, 'CLUSTERED')
 

In [6]:
def setup_logger():
    """Configure root logging to write to ``schema_transfer_log.txt``.

    Records of level INFO and above are written as
    ``<time> - <level> - <message>``; the file is opened as UTF-8 so that
    Chinese messages are not garbled.
    """
    log_options = {
        'filename': 'schema_transfer_log.txt',   # file the records are written to
        'level': logging.INFO,                   # minimum level that is recorded
        'format': '%(asctime)s - %(levelname)s - %(message)s',
        'encoding': 'utf-8',                     # keeps non-ASCII text readable
    }
    logging.basicConfig(**log_options)

In [7]:
# Verify a table that was created in MariaDB against its MSSQL definition.
def _fk_rule_to_maria(rule):
    """Spell an MSSQL referential action the MariaDB way (``NO_ACTION`` -> ``NO ACTION``)."""
    return rule.replace('_', ' ')


def _type_length(type_text):
    """Return the text between the first '(' and the following ')', or None without parentheses."""
    _, opened, rest = type_text.partition('(')
    if not opened:
        return None
    return rest.partition(')')[0]


def _column_types_match(mssql_type, maria_type):
    """Tell whether a MariaDB column type is the expected counterpart of an MSSQL type."""
    if 'nvarchar' not in mssql_type:
        return mssql_type == maria_type
    if mssql_type == 'nvarchar(-1)':
        # nvarchar(MAX) is reported as length -1 and is created as LONGTEXT
        return maria_type == 'longtext(4294967295)'
    if 'varchar' not in maria_type:
        return False
    return _type_length(mssql_type) == _type_length(maria_type)


def _names_differ(mssql_name, maria_name):
    """Compare two optional identifiers case-insensitively; None only equals None."""
    if mssql_name is None or maria_name is None:
        return mssql_name is not maria_name
    return mssql_name.lower() != maria_name.lower()


def _rules_differ(mssql_rule, maria_rule):
    """Compare two optional referential actions after normalizing the MSSQL spelling."""
    if mssql_rule is None or maria_rule is None:
        return mssql_rule is not maria_rule
    return _fk_rule_to_maria(mssql_rule) != maria_rule


def valid_sqlschema(mssql_schema,mariaDB_schema,table_name):
    """Compare one table's MSSQL and MariaDB definitions and log every difference.

    Columns are compared pairwise by position. For each pair the name, type
    and length, primary-key flag, foreign-key flag, referenced table,
    referenced column, ON DELETE and ON UPDATE rules are checked; the index
    type (ninth field) is not compared. Every difference is reported through
    ``logging.error``. All checks run for every column, including
    ``nvarchar`` columns, and a differing column count is reported instead of
    raising ``IndexError``.

    Args:
        mssql_schema: dict mapping table name -> list of 9-field column tuples
            read from MSSQL.
        mariaDB_schema: dict of the same layout read from MariaDB.
        table_name: key of the table to compare; must exist in both dicts.

    Returns:
        None.

    Raises:
        KeyError: if ``table_name`` is missing from either schema dict.
    """
    mssql_table = mssql_schema[table_name]
    maria_table = mariaDB_schema[table_name]

    if len(mssql_table) != len(maria_table):
        logging.error(f"❌ 轉移 {table_name} 錯誤，欄位數量不一致 => mssql : {len(mssql_table)} maria : {len(maria_table)}")

    # zip stops at the shorter list, so only columns present on both sides are compared
    for mssql_col, maria_col in zip(mssql_table, maria_table):
        col_name1, col_type1, is_pk1, is_fk1, ref_table1, ref_col1, on_del1, on_upd1, _idx1 = mssql_col
        col_name2, col_type2, is_pk2, is_fk2, ref_table2, ref_col2, on_del2, on_upd2, _idx2 = maria_col

        if col_name1.lower() != col_name2.lower():
            logging.error(f"❌ 轉移 {table_name} 錯誤，欄位名稱不一致 => mssql : {col_name1} maria : {col_name2}")

        if not _column_types_match(col_type1, col_type2):
            logging.error(f"❌ 轉移 {table_name} 錯誤，欄位型別及長度不一致 => mssql : {col_name1} {col_type1} maria : {col_name2} {col_type2}")

        if is_pk1 != is_pk2:
            logging.error(f"❌ 轉移 {table_name} 錯誤，主鍵設定不一致 => mssql : {col_name1} {is_pk1} maria : {col_name2} {is_pk2}")
        if is_fk1 != is_fk2:
            logging.error(f"❌ 轉移 {table_name} 錯誤，外鍵設定不一致 => mssql : {col_name1} {is_fk1} maria : {col_name2} {is_fk2}")

        if _names_differ(ref_table1, ref_table2):
            logging.error(f"❌ 轉移 {table_name} 錯誤，參考資料表不一致 => mssql : {col_name1} {ref_table1} maria : {col_name2} {ref_table2}")

        if _names_differ(ref_col1, ref_col2):
            logging.error(f"❌ 轉移 {table_name} 錯誤，參考欄位不一致 => mssql : {col_name1} {ref_col1} maria : {col_name2} {ref_col2}")

        if _rules_differ(on_del1, on_del2):
            logging.error(f"❌ 轉移 {table_name} 錯誤，ON DELETE 設定不一致 => mssql : {col_name1} {on_del1} maria : {col_name2} {on_del2}")

        if _rules_differ(on_upd1, on_upd2):
            logging.error(f"❌ 轉移 {table_name} 錯誤，ON UPDATE 設定不一致 => mssql : {col_name1} {on_upd1} maria : {col_name2} {on_upd2}")

    return

In [8]:
# Transfer the table definitions read from MSSQL to MariaDB.
# Every table in mssql_schema is recreated with CREATE TABLE; foreign keys are added afterwards.
def _maria_column_type(col_type, is_key):
    """Translate an MSSQL column type into the type used for the MariaDB column.

    Only ``nvarchar`` types are rewritten: ``nvarchar(-1)`` (how the schema
    query reports nvarchar(MAX)) becomes LONGTEXT, key columns longer than 255
    are capped at ``varchar(255)``, and everything else becomes ``varchar`` of
    the same length.
    """
    if 'nvarchar' not in col_type:
        return col_type
    length = int(col_type.split('(')[1].split(')')[0])
    if length == -1:
        return 'LONGTEXT'
    if is_key and length > 255:
        # A foreign key is capped as well because the primary key it refers
        # to is capped by the same rule.
        return 'varchar(255)'
    return col_type.replace('nvarchar', 'varchar')


def _index_column_spec(col_name, maria_type):
    """Return the column entry for CREATE INDEX, with a 100-character prefix for long text."""
    if maria_type == 'LONGTEXT':
        # TEXT columns cannot be indexed without a prefix length
        return f'{col_name}(100)'
    if 'varchar' in maria_type and int(maria_type.split('(')[1].split(')')[0]) > 255:
        return f'{col_name}(100)'
    return col_name


def _maria_fk_action(action):
    """Spell an MSSQL referential action the MariaDB way; a missing action means NO ACTION."""
    return (action or 'NO_ACTION').replace('_', ' ')


def _create_table_statements(table, columns):
    """Build ``(create_table_sql, create_index_sql)`` for one table.

    ``create_index_sql`` is an empty string when no column is marked
    NONCLUSTERED. The PRIMARY KEY clause is omitted when the table has no
    primary-key column.
    """
    column_defs = []
    pk_cols = []      # primary-key columns
    index_cols = []   # columns covered by a nonclustered index
    for col_name, col_type, is_pk, is_fk, _ref_table, _ref_col, _on_del, _on_upd, idx in columns:
        maria_type = _maria_column_type(col_type, is_pk == 'YES' or is_fk == 'YES')
        column_defs.append(f'{col_name} {maria_type}')
        if is_pk == 'YES':
            pk_cols.append(col_name)
        if idx == 'NONCLUSTERED':
            index_cols.append(_index_column_spec(col_name, maria_type))

    if pk_cols:
        column_defs.append(f'PRIMARY KEY({",".join(pk_cols)})')

    create_sql = f'CREATE TABLE {table} (\n    ' + ',\n    '.join(column_defs) + '\n);'
    index_sql = ''
    if index_cols:
        index_sql = f'CREATE INDEX {table}_index ON {table} ({",".join(index_cols)});'
    return create_sql, index_sql


def _foreign_key_statement(table, columns):
    """Build the ALTER TABLE statement that adds the table's foreign keys ('' if it has none).

    All foreign-key columns that point at the same referenced table are
    emitted as one (possibly composite) constraint, using the ON DELETE /
    ON UPDATE actions of the first such column. The schema rows carry no
    constraint name, so separate constraints to the same table cannot be
    told apart.
    """
    groups = {}   # referenced table -> columns, referenced columns and actions
    for col_name, _col_type, _is_pk, is_fk, ref_table, ref_col, on_del, on_upd, _idx in columns:
        if is_fk != 'YES':
            continue
        if ref_table is None:
            logging.warning(f"⚠️ 資料表 {table} 的欄位 {col_name} 標記為外鍵但查無參考資料表，略過此外鍵")
            continue
        group = groups.get(ref_table)
        if group is None:
            group = groups[ref_table] = {
                'cols': [],
                'ref_cols': [],
                'on_del': _maria_fk_action(on_del),
                'on_upd': _maria_fk_action(on_upd),
            }
        group['cols'].append(col_name)
        # Use the referenced column reported by MSSQL; fall back to the
        # local name only when it is missing.
        group['ref_cols'].append(ref_col if ref_col is not None else col_name)

    if not groups:
        return ''
    clauses = [
        f'\nADD FOREIGN KEY({",".join(group["cols"])}) REFERENCES {ref}({",".join(group["ref_cols"])})'
        f' ON DELETE {group["on_del"]} ON UPDATE {group["on_upd"]}'
        for ref, group in groups.items()
    ]
    return f'ALTER TABLE {table}' + ','.join(clauses) + ';'


def transfer_schema_toMaria(mssql_schema, connection_params=None):
    """Recreate every table of ``mssql_schema`` in MariaDB and verify the result.

    The work happens in two passes: first every table (and its optional
    index) is created, then foreign keys are added table by table. After each
    table's foreign keys are in place its definition is read back from
    MariaDB and compared with the MSSQL definition via ``valid_sqlschema``.
    Every generated statement is printed before it is executed. Errors are
    printed and logged, not raised.

    Args:
        mssql_schema: dict mapping table name -> list of 9-field column tuples
            (COLUMN_NAME, DATA_TYPE, IS_PRIMARY_KEY, IS_FOREIGN_KEY,
            REFERENCES_TABLE, REFERENCES_COLUMN, ON_DELETE, ON_UPDATE,
            INDEX_TYPE).
        connection_params: optional dict of keyword arguments for
            ``mariadb.connect``. When omitted, host/port/user/password are
            read from the MARIADB_HOST, MARIADB_PORT, MARIADB_USER and
            MARIADB_PASSWORD environment variables, falling back to the
            values that were originally hard-coded here.

    Returns:
        None.
    """
    import os  # local import so this cell does not depend on the imports cell being edited

    setup_logger()

    if connection_params is None:
        # FIXME(security): the fallbacks keep the legacy embedded credentials
        # so existing runs keep working; set the environment variables and
        # remove the literals.
        connection_params = {
            'host': os.environ.get('MARIADB_HOST', 'localhost'),
            'port': int(os.environ.get('MARIADB_PORT', '3306')),
            'user': os.environ.get('MARIADB_USER', 'root'),
            'password': os.environ.get('MARIADB_PASSWORD', '6124Nok45'),
            # The verification query only inspects the 'test' schema, so the
            # database is not configurable through the environment.
            'database': 'test',
        }

    maria_conn = None   # bound before the try block so the finally clause is always safe
    try:
        # Connect to MariaDB
        maria_conn = mariadb.connect(**connection_params)
        maria_cursor = maria_conn.cursor()
        print("✅ 成功連線 MariaDB")

        # Pass 1: create every table (foreign keys not yet set) and its index
        for table, table_columns in mssql_schema.items():
            create_sql, index_sql = _create_table_statements(table, table_columns)
            print(create_sql)
            maria_cursor.execute(create_sql)

            if index_sql:
                print(index_sql)
                maria_cursor.execute(index_sql)

        # MariaDB definitions read back after creation, used for verification
        maria_schema = {}
        # Pass 2: all tables exist now, so foreign keys can be added and each table verified
        for table, table_columns in mssql_schema.items():
            fk_sql = _foreign_key_statement(table, table_columns)
            if fk_sql:
                print(fk_sql)
                maria_cursor.execute(fk_sql)
            logging.info(f"✅ 建立資料表 {table} 完畢")

            # The table is complete; read its column definitions back from MariaDB
            maria_cursor.execute(mariaDB_schema_query(table))
            maria_schema[table] = [row[1:10] for row in maria_cursor.fetchall()]

            # Compare the MariaDB definition with the MSSQL definition
            valid_sqlschema(mssql_schema,maria_schema,table)

    except Exception as e:
        print("發生錯誤!", e)
        logging.exception("❌ 轉移資料表綱要時發生錯誤")
    finally:
        # Only report the connection as closed when one was actually opened
        if maria_conn is not None:
            maria_conn.close()
            print("\n連線已關閉")

In [9]:
# Recreate the tables described by mssql_schema in MariaDB; each generated statement is printed below.
transfer_schema_toMaria(mssql_schema)

✅ 成功連線 MariaDB
CREATE TABLE Appeal (
    caseId int,
    document_no varchar(100),
    fac_uniformno varchar(100),
    fac_name varchar(100),
    appeal_or_rescind LONGTEXT,
    ispetition int,
    lawsuit_date_1 date,
    petition_agency varchar(100),
    petition_results LONGTEXT,
    PRIMARY KEY(caseId,document_no,fac_uniformno,fac_name)
);
CREATE TABLE Factory (
    fac_uniformno varchar(100),
    fac_name varchar(100),
    fac_area_code varchar(100),
    fac_address LONGTEXT,
    ems_no varchar(100),
    PRIMARY KEY(fac_uniformno,fac_name)
);
CREATE TABLE Inspection (
    caseId int,
    document_no varchar(100),
    inspection_datetime_s datetime,
    inspection_condition LONGTEXT,
    PRIMARY KEY(caseId,document_no)
);
CREATE TABLE Labor_Law (
    transgress_law varchar(255),
    law_content varchar(1000),
    PRIMARY KEY(transgress_law)
);
CREATE TABLE Labor_Transgress (
    pId int,
    agency varchar(1000),
    announce_date date,
    penalty_date date,
    document_no varcha