In [1]:
import sqlite3
import mysql.connector
import pandas as pd
from datetime import datetime
import os
import warnings

HOST = os.getenv("HOST")
PASSWORD = os.getenv("PASSWORD")
USER = os.getenv("USER")
DATABASE = "sqlite_employees"

# suppress the connection warning regarding swithing to sqlalchemy
warnings.filterwarnings('ignore')



## OPTION 1
#### Directly import the function from corresponding notebook(s) with `%run` magic command.

In [2]:
%run 01_get_sqlite_table_metadata.ipynb import get_sqlite_schema
get_sqlite_schema('employees')

[(0, 'id', 'INTEGER', 0, None, 1),
 (1, 'name', 'TEXT', 1, None, 0),
 (2, 'age', 'INTEGER', 0, None, 0),
 (3, 'department', 'TEXT', 0, None, 0),
 (4, 'position', 'TEXT', 0, None, 0),
 (5, 'salary', 'REAL', 0, None, 0)]

In [3]:
%run 02_get_mysql_da''ta_type.ipynb import get_mysql_data_type
get_mysql_data_type('integer')

'INT'

## (BETTER) OPTION 2
#### Create a `utils.py` module and import the functions directly fom that module. 

In [4]:
from mysql_utils import get_mysql_data_type, get_sqlite_schema

In [5]:
def create_mysql_table(table_name: str, schema: list, host: str, user: str, password: str, database: str) -> None:
    """
    Create a MySQL table based on the schema information obtained from a SQLite table

    Args:
        table_name: The name of the table to create in MySQL
        schema: The schema information obtained from the SQLite table
        host: The MySQL server host to connect to
        user: The MySQL server user to authenticate as
        password: The password for the MySQL server user
        database: The name of the MySQL database to use
    """
    with mysql.connector.connect(host=host, user=user, password=password, database=database) as conn:
        cursor = conn.cursor()

        # Drop the table if it exists
        cursor.execute(f'DROP TABLE IF EXISTS {table_name}')

        # Create the table
        create_table_query = f'CREATE TABLE {table_name} ('
        for i, (col_id, col_name, col_type, _, _, is_pk) in enumerate(schema):
            if i > 0:
                create_table_query += ', '
            create_table_query += f'{col_name} {get_mysql_data_type(col_type)}'
            if is_pk:
                create_table_query += ' PRIMARY KEY AUTO_INCREMENT'
        create_table_query += ');'
        cursor.execute(create_table_query)

        # Add a timestamp column to the table
        alter_query = f"""ALTER TABLE {table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"""
        cursor.execute(alter_query)
        
        # Print the column information as confirmation
        describe_query = f"""SHOW COLUMNS FROM {table_name}"""
        cursor.execute(describe_query)
        rows = cursor.fetchall()
        print(('FIELD', 'TYPE', 'NULL', 'KEY', 'DEFAULT', 'EXTRA')) # show columns command metadata
        for row in rows:
            print(row)
            # print(tuple(x.decode() if isinstance(x, bytes) else x for x in row))

        # Commit the changes
        conn.commit()

In [6]:
create_mysql_table(table_name='employees', 
                   schema=get_sqlite_schema('employees'),
                   host=HOST,
                   user=USER,
                   password=PASSWORD,
                   database=DATABASE)

('FIELD', 'TYPE', 'NULL', 'KEY', 'DEFAULT', 'EXTRA')
('id', b'int', 'NO', 'PRI', None, 'auto_increment')
('name', b'varchar(255)', 'YES', '', None, '')
('age', b'int', 'YES', '', None, '')
('department', b'varchar(255)', 'YES', '', None, '')
('position', b'varchar(255)', 'YES', '', None, '')
('salary', b'float', 'YES', '', None, '')
('created_at', b'timestamp', 'YES', '', b'CURRENT_TIMESTAMP', 'DEFAULT_GENERATED')


The `b` prefix before the string in the output above represents a `bytes` object. In Python, a `bytes` object is a squence of bytes(values from 0 to 255) that represetn a sequence of characters.

As of Python 3.x, the `decode()` method is not available on `str` objects because they are already Unicode objects. Nevertheles, when we retrieve data from a database using Python, it is often returned as bytes object - due to the internal workings of the `mysql library`. This is because the data may contain non-ASCII characters or special characters that need to be encoded in a specific format.

To convert the `bytes` object to a `str` object, we need to use the `decode()` method. 

## Updated Code

In [7]:
def create_mysql_table(table_name: str, schema: list, host: str, user: str, password: str, database: str) -> None:
    """
    Create a MySQL table based on the schema information obtained from a SQLite table

    Args:
        table_name: The name of the table to create in MySQL
        schema: The schema information obtained from the SQLite table
        host: The MySQL server host to connect to
        user: The MySQL server user to authenticate as
        password: The password for the MySQL server user
        database: The name of the MySQL database to use
    """
    with mysql.connector.connect(host=host, user=user, password=password, database=database) as conn:
        cursor = conn.cursor()

        # Drop the table if it exists
        cursor.execute(f'DROP TABLE IF EXISTS {table_name}')

        # Create the table
        create_table_query = f'CREATE TABLE {table_name} ('
        for i, (col_id, col_name, col_type, _, _, is_pk) in enumerate(schema):
            if i > 0:
                create_table_query += ', '
            create_table_query += f'{col_name} {get_mysql_data_type(col_type)}'
            if is_pk:
                create_table_query += ' PRIMARY KEY AUTO_INCREMENT'
        create_table_query += ');'
        cursor.execute(create_table_query)

        # Add a timestamp column to the table
        alter_query = f"""ALTER TABLE {table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"""
        cursor.execute(alter_query)
        
        # Print the column information as confirmation
        describe_query = f"""SHOW COLUMNS FROM {table_name}"""
        cursor.execute(describe_query)
        rows = cursor.fetchall()
        print(('FIELD', 'TYPE', 'NULL', 'KEY', 'DEFAULT', 'EXTRA')) # show columns command metadata
        for row in rows:
            # print(row)
            print(tuple(x.decode() if isinstance(x, bytes) else x for x in row))

        # Commit the changes
        conn.commit()

In [8]:
create_mysql_table(table_name='employees', 
                   schema=get_sqlite_schema('employees'),
                   host=HOST,
                   user=USER,
                   password=PASSWORD,
                   database=DATABASE)

('FIELD', 'TYPE', 'NULL', 'KEY', 'DEFAULT', 'EXTRA')
('id', 'int', 'NO', 'PRI', None, 'auto_increment')
('name', 'varchar(255)', 'YES', '', None, '')
('age', 'int', 'YES', '', None, '')
('department', 'varchar(255)', 'YES', '', None, '')
('position', 'varchar(255)', 'YES', '', None, '')
('salary', 'float', 'YES', '', None, '')
('created_at', 'timestamp', 'YES', '', 'CURRENT_TIMESTAMP', 'DEFAULT_GENERATED')
