In [None]:
import os
import glob
import pandas as pd
import numpy as np
import mysql.connector
import requests
from mysql.connector.connection import MySQLConnection
from typing import Dict,List,Tuple,Any,Optional
import shutil
import py7zr  # For handling .7z files
import zipfile
from tqdm import tqdm # progress bar for download
import math


# Connection Variables
# Credentials may be supplied through the CONTOSO_DB_USER / CONTOSO_DB_PASSWORD
# environment variables so real secrets need not be stored in the notebook;
# the literals below are only fallbacks.
USER = os.environ.get('CONTOSO_DB_USER', 'Sudo')
PASSWORD = os.environ.get('CONTOSO_DB_PASSWORD', 'password')
DATABASE = 'sys'  # Do not change! Schema that ships with MySQL; used only as the initial connection target.


# Server Connection Configuration
CONN_CONFIG: Dict[str, str] = {
    "host": "localhost",
    "user": USER,
    "password": PASSWORD,
    "database": DATABASE
}

CONTOSO_DOWNLOAD_LINK = r"https://github.com/sql-bi/Contoso-Data-Generator-V2-Data/releases/download/ready-to-use-data/csv-10m.7z"
CONTOSO_FILENAME = "csv-10m.7z"  # DO NOT CHANGE THIS VALUE! Must match the file name in CONTOSO_DOWNLOAD_LINK.

DBSTART = 'Contoso'  # Name of the database to create and where all other tables will be created
# ~/Downloads/Contoso/ with a trailing separator. Built with os.path.join so the
# path is valid on every OS (hard-coded backslashes only work on Windows).
EXTRACT_DIR = os.path.join(os.path.expanduser('~'), 'Downloads', DBSTART, '')


def download_file(url: str, filename: str, chunk_size: int = 1024 * 1024,
                  timeout: float = 60.0) -> None:
    """
    Download a file into the user's Downloads folder, showing a console progress bar.

    The response body is streamed into ``<filename>.part`` and renamed to
    ``filename`` only after the transfer completes. An interrupted or failed
    download therefore never leaves a truncated file under the final name.

    :param url: The URL of the file to download.
    :param filename: The name of the file to save (inside ~/Downloads).
    :param chunk_size: Number of bytes read per iteration (default 1 MiB).
    :param timeout: Seconds ``requests`` waits to connect and between received
        data; without it a stalled server would block forever.
    :return: None
    :raises requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    user_downloads_dir = os.path.join(os.path.expanduser("~"), "Downloads")
    os.makedirs(user_downloads_dir, exist_ok=True)
    file_path = os.path.join(user_downloads_dir, filename)
    partial_path = file_path + ".part"

    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Raise an error for bad responses (4xx or 5xx)

            # content-length may be absent; None makes tqdm show an open-ended counter.
            total_size = int(response.headers.get('content-length', 0)) or None

            with open(partial_path, 'wb') as f, \
                    tqdm(total=total_size, unit='B', unit_scale=True, desc="Downloading") as progress_bar:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # skip empty chunks
                        f.write(chunk)
                        progress_bar.update(len(chunk))

        # Publish the file under its final name only once it is complete.
        os.replace(partial_path, file_path)
    except BaseException:
        # Also runs on Ctrl-C: drop the incomplete download, then propagate.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise

    print(f"File downloaded to: {file_path}")


def extract_archive(filename: str) -> Optional[str]:
    """
    Extract a .7z archive from the user's Downloads folder into EXTRACT_DIR.

    Entries are extracted one at a time so a progress bar can show each file.
    A failing entry is reported and skipped (best effort); failures are
    summarised at the end.

    :param filename: The name of the .7z archive file in ~/Downloads.
    :return: EXTRACT_DIR when the archive could be opened and processed, or
        None if the file is missing, not a 7z archive, or cannot be read.
    """
    user_downloads_dir = os.path.join(os.path.expanduser("~"), "Downloads")
    file_path = os.path.join(user_downloads_dir, filename)

    # Check if the file exists in Downloads folder
    if not os.path.exists(file_path):
        print(f"Error: File '{filename}' not found in {user_downloads_dir}.")
        return None

    if not filename.endswith(".7z"):
        print(f"Error: '{filename}' is not a supported archive format (7z).")
        return None

    failed: List[str] = []
    try:
        # Ensure the extraction directory exists
        os.makedirs(EXTRACT_DIR, exist_ok=True)

        with py7zr.SevenZipFile(file_path, mode='r', blocksize=1024 * 1024) as archive:
            member_names = archive.getnames()

            with tqdm(total=len(member_names), unit="Files Extracted", desc="Extracting") as files_progress_bar:
                for member in member_names:
                    files_progress_bar.set_description(f"Extracting {member}...")
                    files_progress_bar.refresh()
                    try:
                        # targets are archive member names, not filesystem paths.
                        archive.extract(path=EXTRACT_DIR, targets=[member], recursive=False)
                    except Exception as file_error:
                        print(f"Error extracting file '{member}': {file_error}")
                        failed.append(member)
                    finally:
                        # py7zr needs reset() before extract() can be called again,
                        # also after a failed attempt.
                        # NOTE(review): this makes each call re-read the archive;
                        # extractall() would be a single pass if per-file
                        # progress is not needed.
                        archive.reset()
                    files_progress_bar.update(1)

    except py7zr.Bad7zFile:
        print(f"Error: '{filename}' is not a valid 7z file.")
        return None

    except PermissionError:
        print(f"Error: Permission denied while accessing '{filename}' or writing to '{EXTRACT_DIR}'.")
        return None

    except FileNotFoundError:
        print(f"Error: File '{filename}' not found during extraction.")
        return None

    except Exception as e:
        print(f"Error extracting 7z file '{filename}': {e}")
        return None

    if failed:
        print(f"Warning: {len(failed)} of {len(member_names)} entries could not be extracted: {failed}")
    print(f"Extracted '{filename}' to '{EXTRACT_DIR}'.")
    return EXTRACT_DIR


def create_database(db_name: Optional[str] = None) -> None:
    """
    Create a MySQL database (schema) if it does not already exist.

    Connects with CONN_CONFIG, i.e. to the `sys` schema, because the target
    database may not exist yet. MySQL errors are printed, not raised.

    :param db_name: Name of the database to create; defaults to DBSTART.
    :return: None
    """
    if db_name is None:
        db_name = DBSTART
    # Embedded backticks must be doubled to keep the identifier quoted.
    quoted_name = db_name.replace("`", "``")

    conn = None
    try:
        conn = mysql.connector.connect(**CONN_CONFIG)
        cursor = conn.cursor()
        try:
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{quoted_name}`;")
        finally:
            cursor.close()
        print(f"Database `{db_name}` created or already exists.")

    except mysql.connector.Error as err:
        print(f"Error: {err}")

    finally:
        # Close the connection even when CREATE DATABASE failed.
        if conn is not None:
            conn.close()


def infer_mysql_dtype(series: pd.Series) -> str:
    """
    Infer a MySQL column type from the values of a column read from CSV.

    Null values are ignored. The rules are applied in this order:

    * every value is a boolean                     -> ``BIT``
    * every value is an integer (booleans allowed) -> the smallest signed
      ``TINYINT``/``SMALLINT``/``MEDIUMINT``/``INT`` whose range covers
      ``[min, max]``, else ``BIGINT``
    * every value is numeric                       -> ``DECIMAL(18,6)``
    * every value is a date/datetime string        -> ``DATETIME`` if at least
      one value has a time of day, otherwise ``DATE``
    * at least one other string                    -> ``VARCHAR(255)``,
      ``TEXT`` or ``LONGTEXT`` depending on the longest string
    * anything else, or an all-null column         -> ``VARCHAR(255)``

    :param series: Column values, e.g. a (sampled) chunk of ``pd.read_csv`` output.
    :return: A MySQL column type usable in ``CREATE TABLE``.
    """
    datetime_formats = ("%Y-%m-%d %H:%M:%S", "%d-%m-%Y %H:%M:%S", "%m/%d/%Y %H:%M:%S")
    date_formats = ("%Y-%m-%d", "%d-%m-%Y", "%m/%d/%Y", "%d %b %Y", "%d %B %Y")
    # NOTE(review): the loaders insert the raw strings, and MySQL is expected to
    # accept only year-first literals for DATE/DATETIME; values in the other
    # formats likely need converting before insert — confirm.

    non_null_values = series.dropna()
    # default if column is empty
    if non_null_values.empty:
        return "VARCHAR(255)"
    total = len(non_null_values)

    def classify_string(value: str) -> str:
        """Return 'datetime', 'date' or 'str' for a single string value."""
        for fmt in datetime_formats:
            try:
                pd.to_datetime(value, format=fmt)
                return "datetime"
            except ValueError:
                continue
        for fmt in date_formats:
            try:
                pd.to_datetime(value, format=fmt)
                return "date"
            except ValueError:
                continue
        return "str"

    counts = {"bool": 0, "int": 0, "float": 0, "datetime": 0, "date": 0, "str": 0}
    max_str_len = 0

    for val in non_null_values:
        # bool is tested first because bool is a subclass of int.
        if isinstance(val, (bool, np.bool_)):
            counts["bool"] += 1
        elif isinstance(val, (int, np.integer)):
            counts["int"] += 1
        elif isinstance(val, (float, np.floating)):
            counts["float"] += 1
        elif isinstance(val, str):
            max_str_len = max(max_str_len, len(val))
            counts[classify_string(val)] += 1

    if counts["bool"] == total:
        return "BIT"

    if counts["bool"] + counts["int"] == total:
        min_val = non_null_values.min()
        max_val = non_null_values.max()
        # Signed ranges: an unqualified TINYINT only holds -128..127.
        signed_int_types = (
            ("TINYINT", -128, 127),
            ("SMALLINT", -32768, 32767),
            ("MEDIUMINT", -8388608, 8388607),
            ("INT", -2147483648, 2147483647),
        )
        for mysql_type, low, high in signed_int_types:
            if low <= min_val and max_val <= high:
                return mysql_type
        return "BIGINT"

    if counts["bool"] + counts["int"] + counts["float"] == total:
        # NOTE(review): DECIMAL(18,6) keeps only 12 integer digits.
        return "DECIMAL(18,6)"

    # Only a column made up entirely of date-like strings gets a temporal type;
    # one date among free text must not turn the column into DATE.
    if counts["datetime"] + counts["date"] == total:
        return "DATETIME" if counts["datetime"] else "DATE"

    if counts["str"] > 0:
        # Lengths are in characters; MySQL's TEXT limit is in bytes.
        if max_str_len <= 255:
            return "VARCHAR(255)"
        if max_str_len <= 65535:
            return "TEXT"
        return "LONGTEXT"

    return "VARCHAR(255)"


def count_rows_in_csv(filepath: str) -> int:
    """
    Count the data rows of a CSV file (physical lines minus the header line).

    Physical lines are counted, not CSV records, so quoted fields that contain
    line breaks inflate the result. Undecodable bytes are replaced instead of
    raising, because only the line structure matters here. This also avoids
    failing on UTF-8 files under a non-UTF-8 default encoding.

    :param filepath: Path to the CSV file.
    :return: Number of data rows; 0 for an empty, header-only or missing file.
    """
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as file:
            line_count = sum(1 for _ in file)
    except FileNotFoundError:
        print(f"Error: File '{filepath}' not found.")
        return 0
    # An empty file has no header to subtract; never return a negative count.
    return max(line_count - 1, 0)
 

def compute_scale(x: int, max_x: int = 10000000, min_scale: float = 0.001, max_scale: float = 0.15) -> float:
    """
    Map a row count onto a sampling fraction on a log10 scale.

    ``x = 0`` yields ``max_scale`` and ``x = max_x`` yields ``min_scale``. In
    between, the fraction falls off linearly in ``log10(x + 1)``, so small files
    are sampled much more densely than large ones.

    :param x: Row count, must satisfy ``0 <= x <= max_x``.
    :param max_x: Row count that maps to ``min_scale``; must be positive.
    :param min_scale: Fraction returned for ``x == max_x``.
    :param max_scale: Fraction returned for ``x == 0``.
    :return: The sampling fraction rounded to 6 decimal places.
    :raises ValueError: If ``max_x`` is not positive or ``x`` is out of range.
    """
    if max_x <= 0:
        raise ValueError("max_x must be positive")
    if not 0 <= x <= max_x:
        raise ValueError(f"Input must be between 0 and {max_x:,}")

    # Normalize using log scale to emphasize early values
    log_x = math.log10(x + 1)  # +1 avoids log10(0)
    log_max = math.log10(max_x + 1)
    scale = max_scale - (log_x / log_max) * (max_scale - min_scale)

    return round(scale, 6)


def _quote_ident(name) -> str:
    """Return *name* as a backtick-quoted MySQL identifier (embedded backticks doubled)."""
    return "`" + str(name).replace("`", "``") + "`"


def _infer_csv_schema(filepath: str, chunk_size: int = 1000) -> List[Tuple[str, str]]:
    """
    Infer a MySQL type for every column of a CSV file from a random row sample.

    The file is read once, in chunks of ``chunk_size`` rows. From each chunk a
    fraction of rows given by ``compute_scale`` (at least one row) is sampled.
    Every column of the sample is classified with ``infer_mysql_dtype``, and
    the most frequent type per column wins.

    NOTE(review): a majority vote can pick a type narrower than some rows need
    (e.g. TINYINT while a few values exceed 127); those rows then fail to insert.

    :return: ``[(column_name, mysql_type), ...]`` in header order.
    """
    from collections import Counter

    filename = os.path.basename(filepath)
    columns = pd.read_csv(filepath, nrows=0).columns.tolist()

    total_row_count = count_rows_in_csv(filepath)
    # compute_scale rejects counts above its 10,000,000 cap, so clamp to the cap.
    scaling_factor = compute_scale(x=min(total_row_count, 10_000_000))
    print(f"Scaling factor for {filename}: {scaling_factor}...")
    print(f"Total rows in {filepath}: {total_row_count}...")
    print(f"Inferring data types for {len(columns)} columns in {filename}...")

    votes = {col: Counter() for col in columns}
    with pd.read_csv(filepath, chunksize=chunk_size) as reader:
        for chunk in reader:
            # At least one row per chunk, so short trailing chunks still vote.
            sample_size = min(len(chunk), max(1, round(len(chunk) * scaling_factor)))
            sample = chunk.sample(n=sample_size)
            for col in columns:
                values = sample[col].dropna()
                # An all-null sample says nothing about the type; skip it.
                if not values.empty:
                    votes[col][infer_mysql_dtype(values)] += 1

    schema = []
    for col in columns:
        dtype = votes[col].most_common(1)[0][0] if votes[col] else "VARCHAR(255)"
        print(f"Data type inferred for '{col}': {dtype} \n \n ")
        schema.append((col, dtype))
    return schema


def _table_exists(cursor, schema: str, table_name: str) -> bool:
    """Return True if ``schema.table_name`` is listed as a base table in information_schema."""
    cursor.execute(
        "SELECT EXISTS(SELECT 1 FROM information_schema.tables "
        "WHERE table_type = 'BASE TABLE' AND table_schema = %s AND table_name = %s)",
        (schema, table_name),
    )
    # fetchall() consumes the whole result so the cursor can be reused.
    rows = cursor.fetchall()
    return bool(rows and rows[0][0])


def _load_csv_into_table(conn, cursor, filepath: str, table_name: str, chunk_size: int = 1000) -> None:
    """
    Append every row of a CSV file to ``DBSTART.table_name``, committing once per chunk.

    Missing values are sent as SQL NULL. On a MySQL error the current chunk is
    rolled back and the error re-raised; earlier chunks stay committed.
    """
    total_rows = count_rows_in_csv(filepath)
    total_chunks = math.ceil(total_rows / chunk_size)  # Number of chunks
    print(f"Total rows in {filepath}: {total_rows}...")
    print("Chunking....")
    print(f"Total chunks: {total_chunks}...")
    print(f"Begin loading data into table `{DBSTART}.{table_name}`...")

    target = f"{_quote_ident(DBSTART)}.{_quote_ident(table_name)}"
    with pd.read_csv(filepath, chunksize=chunk_size) as reader, \
            tqdm(total=total_chunks, unit="chunks", desc=f"Loading {table_name}") as progress_bar:
        for chunk in reader:
            column_sql = ", ".join(_quote_ident(col) for col in chunk.columns)
            placeholders = ", ".join(["%s"] * len(chunk.columns))
            insert_stmt = f"INSERT INTO {target} ({column_sql}) VALUES ({placeholders})"
            # Object dtype + None: NaN is sent as NULL instead of a float, and
            # numpy scalars become native Python values the connector can bind.
            clean = chunk.astype(object).where(chunk.notna(), None)
            records = list(clean.itertuples(index=False, name=None))
            try:
                cursor.executemany(insert_stmt, records)
                conn.commit()
            except mysql.connector.Error:
                conn.rollback()
                raise
            progress_bar.update(1)


def create_tables_from_csv(targetdirectory: str = EXTRACT_DIR) -> None:
    """
    Create one MySQL table per CSV file in a directory, then load the data.

    For every ``*.csv`` file, a table named after the file (without extension)
    is created in the ``DBSTART`` schema, with column types inferred by
    sampling the file. Afterwards each file is bulk-inserted in chunks; a
    failed load is reported and the next file is processed. Re-running
    appends the rows again, because tables are created with ``IF NOT EXISTS``
    and never truncated.

    :param targetdirectory: Directory containing the CSV files.
    :return: None
    """
    csv_files = sorted(glob.glob(os.path.join(targetdirectory, "*.csv")))
    file_list = [(os.path.basename(file), file) for file in csv_files]
    chunk_size = 1000  # Number of rows per chunk (used for inference and loading)
    print(f"Found {len(file_list)} CSV files in {targetdirectory}.")

    conn = mysql.connector.connect(**CONN_CONFIG)
    cursor = conn.cursor()
    try:
        #######################
        # Table Creation Loop #
        #######################
        for filename, filepath in file_list:
            print(f"Begin table creation for file: {filename}...")
            table_name = os.path.splitext(filename)[0]  # Remove .csv extension

            schema = _infer_csv_schema(filepath, chunk_size)
            columns_sql = ", ".join(f"{_quote_ident(col)} {dtype}" for col, dtype in schema)
            create_table_sql = (
                f"CREATE TABLE IF NOT EXISTS "
                f"{_quote_ident(DBSTART)}.{_quote_ident(table_name)} ({columns_sql});"
            )
            print(f"Executing {create_table_sql}...")
            cursor.execute(create_table_sql)

            # Confirm the table now exists in the target schema.
            if _table_exists(cursor, DBSTART, table_name):
                print(f"Table `{table_name}` created successfully. \n \n ")
            else:
                print(f"Table creation unsuccessful: `{DBSTART}`.`{table_name}` not found after CREATE TABLE.")

        #####################
        # Table Insert Loop #
        #####################
        for filename, filepath in file_list:
            table_name = os.path.splitext(filename)[0]
            print(f"Processing file: {filename}...")
            try:
                _load_csv_into_table(conn, cursor, filepath, table_name, chunk_size)
            except mysql.connector.Error as err:
                print(f"Error inserting chunk into table `{table_name}`: {err}")
            except Exception as err:
                print(f"Error loading data into table `{table_name}`: {err}")
    finally:
        # Close DB connection even if inference or table creation raised.
        cursor.close()
        conn.close()


def create_table(databse_name: str, table_name: str, columns: Dict[str, str]) -> None:
    """
    Create ``databse_name.table_name`` with the given columns if it does not exist.

    Errors are printed, not raised.

    :param databse_name: Schema (database) in which to create the table. The
        parameter keeps this spelling so existing keyword callers still work.
    :param table_name: The name of the table to create.
    :param columns: A dictionary where keys are column names and values are MySQL data types.
    :return: None
    """
    def quote(name) -> str:
        # Backtick-quote an identifier, doubling embedded backticks.
        return "`" + str(name).replace("`", "``") + "`"

    conn = None
    try:
        conn = mysql.connector.connect(**CONN_CONFIG)
        cursor = conn.cursor()
        try:
            columns_sql = ", ".join(f"{quote(col)} {dtype}" for col, dtype in columns.items())
            # Schema and table are quoted separately. Quoting "db.table" as one
            # identifier would create a table literally named "db.table" in the
            # connection's default database.
            create_table_sql = (
                f"CREATE TABLE IF NOT EXISTS {quote(databse_name)}.{quote(table_name)} ({columns_sql});"
            )
            cursor.execute(create_table_sql)

            # Confirm the table was created in the SQL server
            cursor.execute(
                "SELECT EXISTS(SELECT 1 FROM information_schema.tables "
                "WHERE table_type = 'BASE TABLE' AND table_schema = %s AND table_name = %s)",
                (databse_name, table_name),
            )
            rows = cursor.fetchall()
            if rows and rows[0][0]:
                print(f"Table `{table_name}` created successfully.")
            else:
                print(f"Table not created successfully, Error: `{databse_name}`.`{table_name}` not found after CREATE TABLE.")
        finally:
            cursor.close()

    except Exception as err:
        print(f"Error: {err}")

    finally:
        # Close DB connection even when a statement failed.
        if conn is not None:
            conn.close()


    
# def load_to_table(table_name : str) -> None:
        
#     try:
#         #load the CSV to dataframe and index in blocks of 100 rows
#         data_df = pd.read_csv(filepath, chunksize=1000)
#         # find the number of chunks
#         num_chunks = sum(1 for _ in data_df)
#         counter = 0
        
#         with tqdm(total=num_chunks, unit="chunks", desc=f"Loading {table_name}") as progress_bar:

#             for chunk in data_df:
#                 try:
#                     chunk.to_sql(f"{DBSTART}.{table_name}", conn, if_exists='append', index=False)
#                     print(f"Chunk of data loaded into table `{table_name}` successfully.")
#                     counter += 1
                
#                 except Exception as err:
#                     print(f"Error loading chunk # {counter} into table `{table_name}`: {err}")
#                     break
                    
        
#             # Load data into the table
#             df.to_sql(table_name, conn, if_exists='append', index=False)
#             print(f"Data loaded into table `{table_name}` successfully.")
        
    
#     except Exception as err:
#         print(f"Error loading data into table `{table_name}`: {err}")

# Pipeline, in execution order. The first three steps have already been run
# and verified, so they stay disabled; only the table build/load runs here.
# download_file(CONTOSO_DOWNLOAD_LINK, CONTOSO_FILENAME)  # step 1: fetch archive
# extract_archive(CONTOSO_FILENAME)                       # step 2: unpack CSVs
# create_database(DBSTART)                                # step 3: create schema
create_tables_from_csv(targetdirectory=EXTRACT_DIR)       # step 4: create + load tables

Found 8 CSV files in C:\Users\Abunch\Downloads\Contoso\.
Begin table creation for file: currencyexchange.csv...
Scaling factor for currencyexchange.csv: 0.001...
Total rows in C:\Users\Abunch\Downloads\Contoso\currencyexchange.csv: 10000000...
Inferring data types for column: Date in currencyexchange.csv...
Data type inffered for 'Date': DATE 
 
 
Inferring data types for column: FromCurrency in currencyexchange.csv...
Data type inffered for 'FromCurrency': VARCHAR(255) 
 
 
Inferring data types for column: ToCurrency in currencyexchange.csv...
Data type inffered for 'ToCurrency': VARCHAR(255) 
 
 
Inferring data types for column: Exchange in currencyexchange.csv...
Data type inffered for 'Exchange': DECIMAL(18,6) 
 
 
Executing  CREATE TABLE IF NOT EXISTS Contoso.currencyexchange (Date DATE, FromCurrency VARCHAR(255), ToCurrency VARCHAR(255), Exchange DECIMAL(18,6));...
Begin table creation for file: customer.csv...
Scaling factor for customer.csv: 0.001...
Total rows in C:\Users\Abun

ValueError: Input must be between 0 and 10,000,000

In [None]:
import math

# One-off reload of a single CSV (date.csv) into its existing table in the DBSTART schema.
csv_files = [os.path.join(EXTRACT_DIR, "date.csv")]
file_list = [(os.path.basename(file), file) for file in csv_files]
chunk_size = 100  # Number of rows per chunk


def _quote_identifier(name) -> str:
    """Backtick-quote a MySQL identifier, doubling any embedded backticks."""
    return "`" + str(name).replace("`", "``") + "`"


conn = mysql.connector.connect(**CONN_CONFIG)
cursor = conn.cursor()
try:
    # Table Insert Loop
    for filename, filepath in file_list:
        table_name = os.path.splitext(filename)[0]

        print(f"Processing file: {filename}...")

        try:
            # Undecodable bytes are replaced: only line breaks matter for counting.
            with open(filepath, 'r', encoding='utf-8', errors='replace') as file:
                total_rows = max(sum(1 for _ in file) - 1, 0)  # Subtract 1 for the header row
        except FileNotFoundError:
            print(f"Error: File '{filepath}' not found.")
            continue

        print(f"Total rows in {filepath}: {total_rows}...")
        print("Chunking....")

        total_chunks = math.ceil(total_rows / chunk_size)  # Number of chunks

        print(f"Total chunks: {total_chunks}...")

        target = f"{_quote_identifier(DBSTART)}.{_quote_identifier(table_name)}"
        try:
            print(f"Begin loading data into table `{DBSTART}.{table_name}`...")
            # load the CSV to dataframe and load in chunks
            with pd.read_csv(filepath, chunksize=chunk_size) as data, \
                    tqdm(total=total_chunks, unit="chunks", desc=f"Loading {table_name}") as progress_bar:
                try:
                    for chunk in data:
                        column_sql = ", ".join(_quote_identifier(col) for col in chunk.columns)
                        placeholders = ",".join(["%s"] * len(chunk.columns))
                        insert_stmt = f"INSERT INTO {target} ({column_sql}) VALUES ({placeholders})"
                        # Object dtype + None: NaN is sent as NULL instead of a float.
                        clean = chunk.astype(object).where(chunk.notna(), None)
                        records = list(clean.itertuples(index=False, name=None))
                        cursor.executemany(insert_stmt, records)
                        conn.commit()
                        progress_bar.update(1)

                except mysql.connector.Error as err:
                    conn.rollback()  # discard the partially inserted chunk
                    print(f"Error inserting chunk into table `{table_name}`: {err}")
                    break

                except Exception as err:
                    print(f"Error loading chunk : {err}")
                    break

        except Exception as err:
            print(f"Error loading data into table `{table_name}`: {err}")
finally:
    cursor.close()
    conn.close()

Processing file: date.csv...
Total rows in C:\\Users\\Abunch\\Downloads\\Contoso\\date.csv: 3653...
Chunking....
Total chunks: 37...
Begin loading data into table `Contoso.date`...


Loading date: 100%|██████████| 37/37 [00:00<00:00, 66.92chunks/s]


100