# Excel to MySQL Data Pipeline

This notebook processes the `EXEMPTION MIDA FOR 755 NEW.xlsx` file, translating Malay column headers to English, cleaning data, and importing it into a MySQL database.

In [14]:
import pandas as pd
import mysql
from mysql.connector import Error
import re
import numpy as np
# --- CONFIGURATION ---
EXCEL_PATH = 'EXEMPTION MIDA FOR 755 NEW.xlsx'

# MySQL Configuration
# Please update these credentials as needed
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': '',  # Enter your password here
    'database': 'mida_exemption_db'  # The database to create/use
}

In [15]:
# --- TRANSLATION LOGIC ---

# Mapping of full Malay phrases (normalized) to English column names
HEADER_MAPPING = {
    'TARIKH IMPORT': 'import_date',
    'NO DAFTAR BORANG IKRAR': 'declaration_form_registration_no',
    'BAKI DI BAWA KEHADAPAN': 'balance_brought_forward',
    'KUANTITI KGS': 'quantity_kgs',
    'KUANTITI PCS': 'quantity_pcs',
    'BAKI (KGS)': 'balance_kgs',
    'BAKI (PCS)': 'balance_pcs',
    'T/TANGAN PIK': 'pik_signature',
    'T/TANGAN PNK': 'pnk_signature'
}

def clean_header(h1, h2):
    """
    Combines two header rows and normalizes whitespace.
    """
    combined = f"{str(h1).strip()} {str(h2).strip()}"
    # Collapse multiple spaces into one
    normalized = " ".join(combined.split())
    return normalized

def translate_header(normalized_header):
    """
    Translates the normalized header using the mapping.
    Falls back to a snake_case conversion if not found.
    """
    if normalized_header in HEADER_MAPPING:
        return HEADER_MAPPING[normalized_header]
    
    # Fallback
    print(f"Warning: No direct translation for '{normalized_header}'. Using snake_case.")
    return normalized_header.lower().replace(' ', '_').replace('.', '').replace('/', '_').replace('(', '').replace(')', '')

In [16]:
# --- PROCESS EXCEL FILE ---

print("Loading Excel file...")
excel_file = pd.ExcelFile(EXCEL_PATH)

processed_dfs = {}

for sheet in excel_file.sheet_names:
    try:
        # 1. Read Headers (Rows 13 & 14 are indices 12 & 13)
        # We read enough rows to cover them
        header_df = pd.read_excel(excel_file, sheet_name=sheet, header=None, nrows=14)
        h1_row = header_df.iloc[12]
        h2_row = header_df.iloc[13]
        
        # 2. Construct Translated Column Names
        new_columns = []
        for h1, h2 in zip(h1_row, h2_row):
            norm = clean_header(h1, h2)
            trans = translate_header(norm)
            new_columns.append(trans)
            
        # 3. Read Data (Skip first 14 rows)
        df = pd.read_excel(excel_file, sheet_name=sheet, header=None, skiprows=14)
        
        # Assign columns
        # Ensure column count matches. If mismatch, trim or pad.
        if len(df.columns) == len(new_columns):
            df.columns = new_columns
        else:
            print(f"Warning: Column count mismatch in sheet '{sheet}'. Expected {len(new_columns)}, got {len(df.columns)}.")
            # Simple fix: truncate or take min
            min_len = min(len(df.columns), len(new_columns))
            df = df.iloc[:, :min_len]
            df.columns = new_columns[:min_len]
        
        # 4. Generate Clean Table Name
        # Replace non-alphanumeric with underscore, lower case
        table_name = re.sub(r'[^a-zA-Z0-9]', '_', sheet).lower()
        # Remove duplicate underscores
        table_name = re.sub(r'_+', '_', table_name).strip('_')
        
        processed_dfs[table_name] = df
        
    except Exception as e:
        print(f"Error processing sheet '{sheet}': {e}")

print(f"Successfully processed {len(processed_dfs)} sheets.")

Loading Excel file...
Successfully processed 56 sheets.


In [17]:
# --- INSPECT GENERATED DATAFRAMES ---
# Displaying first few rows and datatypes of the first 3 processed tables

for table_name, df in list(processed_dfs.items())[:3]:
    print(f"\n=== TABLE: {table_name} ===")
    print(df.info())
    print(df.head())


=== TABLE: ball_62 ===
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55 entries, 0 to 54
Data columns (total 7 columns):
 #   Column                            Non-Null Count  Dtype  
---  ------                            --------------  -----  
 0   import_date                       1 non-null      object 
 1   declaration_form_registration_no  1 non-null      object 
 2   balance_brought_forward           1 non-null      float64
 3   quantity_kgs                      1 non-null      float64
 4   balance_kgs                       1 non-null      float64
 5   pik_signature                     1 non-null      object 
 6   pnk_signature                     0 non-null      float64
dtypes: float64(4), object(3)
memory usage: 3.1+ KB
None
  import_date declaration_form_registration_no  balance_brought_forward  \
0  27.01.2025                     B18101053316                 252000.0   
1         NaN                              NaN                      NaN   
2         NaN            

In [18]:
# --- DATABASE IMPORT ---

def get_mysql_type(dtype):
    """Infer MySQL type from Pandas dtype"""
    dtype_str = str(dtype)
    if 'int' in dtype_str:
        return 'INT'
    elif 'float' in dtype_str:
        return 'DOUBLE'
    elif 'datetime' in dtype_str:
        return 'DATETIME'
    else:
        return 'VARCHAR(255)'

try:
    # 1. Connect to MySQL Server
    print("Connecting to MySQL...")
    conn = mysql.connector.connect(
        host=DB_CONFIG['host'],
        user=DB_CONFIG['user'],
        password=DB_CONFIG['password']
    )
    cursor = conn.cursor()
    
    # 2. Create Database
    db_name = DB_CONFIG['database']
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{db_name}`")
    print(f"Database '{db_name}' ready.")
    
    # 3. Switch to Database
    conn.database = db_name
    
    # 4. Create Tables and Insert Data
    for table_name, df in processed_dfs.items():
        print(f"Importing table '{table_name}'...")
        
        # Generate CREATE TABLE SQL
        cols_def = []
        for col in df.columns:
            mysql_type = get_mysql_type(df[col].dtype)
            # Handle reserved words or special chars in columns by quoting
            cols_def.append(f"`{col}` {mysql_type}")
            
        create_sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` ({', '.join(cols_def)})"
        cursor.execute(create_sql)
        
        # Clean Data for Insertion
        # Convert NaN to None (NULL in SQL)
        df_sql = df.where(pd.notnull(df), None)
        
        # Generate INSERT SQL
        placeholders = ', '.join(['%s'] * len(df.columns))
        columns_list = ', '.join([f"`{c}`" for c in df.columns])
        insert_sql = f"INSERT INTO `{table_name}` ({columns_list}) VALUES ({placeholders})"
        
        # Execute Batch Insert
        values = [tuple(x) for x in df_sql.to_numpy()]
        cursor.executemany(insert_sql, values)
        
    conn.commit()
    print("\nSUCCESS: All data imported into database.")

except Error as e:
    print(f"\nMySQL Error: {e}")

finally:
    if 'conn' in locals() and conn.is_connected():
        cursor.close()
        conn.close()
        print("MySQL connection closed.")

Connecting to MySQL...

MySQL Error: 1045 (28000): Access denied for user 'root'@'localhost' (using password: NO)
