# import stuff

In [67]:
from json import dumps, loads
import logging
import mysql.connector
from mysql.connector import errorcode
import os
import pandas as pd
from pathlib import Path
from typing import List, LiteralString, Sequence, Tuple, Union
import time
import traceback

debug_mode = True
if debug_mode:
    import inspect

regenerate_db = True


# CONNECT TO DATABASE

In [68]:
def _connect(host, user, pwd, db=""):
    if db:
        return mysql.connector.connect(host=host, user=user, password=pwd, database=db) 
    return mysql.connector.connect(host=host, user=user, password=pwd)

# Credentials
host, user, pwd = "localhost", "root", "ROOTroot1"

# Database
db_name = "z_scans"

try:
    # Make connection
    connection = _connect(host, user, pwd)
    print(f"Connection established on '{host}' with '{user}' access.")
    connection.autocommit = True
    cursor = connection.cursor()

    # Attempt database creation
    if regenerate_db:
        cursor.execute(f"DROP DATABASE IF EXISTS `{db_name}`;")  # This is only to start a-fresh.
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{db_name}`;")  # backticks are for safety if db_name becomes a user input in future
    print(f"Database {db_name} created successfully.")

except mysql.connector.Error as err:
    if err.errno == errorcode.CR_UNKNOWN_HOST:
        print(f"Failed to connect to {host}. Unknown host.")
        exit()
    elif err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
        print("Access denied. User name or password is wrong.")
        exit()
    elif err.errno == errorcode.ER_DB_CREATE_EXISTS:
        pass
    else:
        print(err)
        exit()

finally:
    cursor.execute(f"USE `{db_name}`;")
    print(f"Using {db_name} as default database.")

Connection established on 'localhost' with 'root' access.
Database z_scans created successfully.
Using z_scans as default database.


# CREATE TABLES

## CREATE TABLE AllSamples
and make sure the default values are inserted, otherwise exit.

In [69]:
# Create general table containing all samples (this creates history of samples)

def create_allsamples():
    """Creates table AllSamples that contains entries with unique samples. Uniqueness is defined through values given in particular columns (column names are given in parentheses):
        
        * silica entries are defined by assigning silica thickness in [mm] (`silica_thicknessMM`)
        * samples are defined by assigning sample solvent name (`sample_solvent`) and wt. % concentration (`sample_concentration`)
        * solvents are defined by assigning solvent density in [g/mL] (`solvent_density`) and linear refractive index (`solvent_index`)
    """
    
    try:
        create_table_query = f"""CREATE TABLE IF NOT EXISTS AllSamples (
            sample_id INT AUTO_INCREMENT PRIMARY KEY,
            sample_code VARCHAR(255) NOT NULL,
            sample_type ENUM('silica','solvent','sample') DEFAULT NULL,
            sample_solvent VARCHAR(255),
            sample_concentration DECIMAL(5, 3),
            silica_thicknessMM FLOAT,
            solvent_density FLOAT,
            solvent_index FLOAT,
            UNIQUE (sample_code, sample_solvent, sample_concentration), -- this defines samples
            UNIQUE (sample_code, solvent_density, solvent_index), -- this defines solvents
            UNIQUE (sample_code, silica_thicknessMM) -- this defines silicas
            );
        """
        cursor.execute(create_table_query)
        print("Table 'AllSamples' created successfully.")
    
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
            print("Couldn't create table 'AllSamples'.")
            print(err)
        else:
            print(err)
        return None
    
    except Exception as err:
        print(f"Error: {err}")
        return None
    
    else:
        return True
    

def insert_default_silicas_allsamples(thickness: Union[int, float, tuple]):
    try:
        insert_default_silicas_query = """INSERT IGNORE
                INTO AllSamples 
                    (sample_code, silica_thicknessMM)
                VALUES
                    ('silica', %s);
                """
                
        if isinstance(thickness, (int, float)):
            cursor.execute(insert_default_silicas_query, (thickness,))
        elif isinstance(thickness, tuple):
            for t in thickness:
                cursor.execute(insert_default_silicas_query, (t,))
        else:
            raise AssertionError
        
        print("Silica entries inserted successfully.")
    
    except mysql.connector.Error as e:
        print("Couldn't insert silicas into table 'AllSamples'.")
        print(err)
        return None
    
    except AssertionError:
        print("Unexpected argument format. Required int, float, or tuple.")
        return None
    
    except Exception as e:
        print(f"Error: {e}")
        return None
    
    else:
        return True


def insert_default_solvents_allsamples(solvents: dict):
    try:
        if not isinstance(solvents, dict):
            raise TypeError("Dictionary expected in the format {solvent_name: {'density': value, 'index': value}")
        
        for solvent in solvents:
            add_solvent_query = """INSERT IGNORE
                INTO AllSamples (sample_code, solvent_density, solvent_index)
                VALUES (%s, %s, %s);"""
            cursor.execute(add_solvent_query, (solvent, solvents[solvent]['density'], solvents[solvent]['index']))
            
            if debug_mode:
                print(f"{solvent} added successfully.")
            
        print("Solvent entries added successfully.")
    
    except mysql.connector.Error as e:
        print("Couldn't insert solvents into table 'AllSamples'.")
        print(err)
        return None
    
    except Exception as e:
        print(f"Some error occured: {e}")
        return None
    
    else:
        return True


def update_sample_type_allsamples():
    try:
        update_sample_type_query = f"""UPDATE AllSamples
            SET sample_type = CASE
                WHEN silica_thicknessMM IS NOT NULL
                    THEN 'silica'
                WHEN solvent_density IS NOT NULL
                    AND solvent_index IS NOT NULL
                    THEN 'solvent'
                WHEN sample_solvent IS NOT NULL
                    AND sample_concentration IS NOT NULL
                    THEN 'sample'
                ELSE sample_type  -- Keep existing sample_type if no condition matches
            END
            WHERE sample_type IS NULL  -- If not defined yet
            ;
        """
        cursor.execute(update_sample_type_query)
        print("Updated sample_type for all entries that were NULL.")
        
    except mysql.connector.Error as err:
        print("Couldn't update sample_type in 'AllSamples' table.")
        print(err)
        return None
    
    except Exception as e:
        print(f"Some error occured: {e}")
        return None
    
    else:
        return True


solvents = {
            "Butanol": {
                "density": 0.8098,
                "index": 1.3993
            },
            "Chloroform": {
                "density": 1.484,
                "index": 1.4459
            },
            "DCM": {
                "density": 1.3255,
                "index": 1.4244
            },
            "DMF": {
                "density": 0.9445,
                "index": 1.4305
            },
            "DMSO": {
                "density": 1.101,
                "index": 1.4783
            },
            "DMSO (Deuterated)": {
                "density": 1.19,
                "index": 1.476
            },
            "Ethanol": {
                "density": 0.7893,
                "index": 1.3611
            },
            "THF": {
                "density": 0.8833,
                "index": 1.405
            },
            "Toluene": {
                "density": 0.8623,
                "index": 1.4967
            },
            "Water": {
                "density": 1,
                "index": 1.333
            },
            "Water (Deuterated)": {
                "density": 1.1044,
                "index": 1.3283
            }
        }

is_allsamples = create_allsamples()
if is_allsamples:
    is_default_silicas = insert_default_silicas_allsamples(thickness=(3,4))
    if is_default_silicas:
        is_default_solvents = insert_default_solvents_allsamples(solvents)

is_sample_type = update_sample_type_allsamples()

if not all([is_allsamples, is_default_silicas, is_default_solvents, is_sample_type]):
    exit()


Table 'AllSamples' created successfully.
Silica entries inserted successfully.
Butanol added successfully.
Chloroform added successfully.
DCM added successfully.
DMF added successfully.
DMSO added successfully.
DMSO (Deuterated) added successfully.
Ethanol added successfully.
THF added successfully.
Toluene added successfully.
Water added successfully.
Water (Deuterated) added successfully.
Solvent entries added successfully.
Updated sample_type for all entries that were NULL.


## CREATE TABLE AllMeasurements
For holding measurement results.

In [70]:
# create general table with measurements (this creates history of measurements)
try:
    query = """CREATE TABLE IF NOT EXISTS AllMeasurements (
        meas_id INT AUTO_INCREMENT PRIMARY KEY,
        meas_UnixDate BIGINT NOT NULL COMMENT 'Date in seconds from epoch',
        sample_id INT NOT NULL,
        sample_code VARCHAR(255) NOT NULL,
        sample_type ENUM('silica','solvent','sample') NOT NULL,
        meas_result JSON NOT NULL,
        SilicaThicknessMM FLOAT NOT NULL,
        Concentration FLOAT COMMENT 'expressed in wt. %',
        Wavelength FLOAT NOT NULL COMMENT 'Wavelength set by user',
        LaserFrequecy FLOAT NOT NULL COMMENT 'Pulsed laser frequency',
        NumberOfScans INT NOT NULL,
        FOREIGN KEY (sample_id) REFERENCES AllSamples(sample_id)
        );"""
    
    cursor.execute(query)
    print("Table 'AllMeasurements' created successfully.")
except mysql.connector.Error as err:
    raise err
except Exception:
    print("Couldn't create table 'AllMeasurements'.")

Table 'AllMeasurements' created successfully.


## CREATE TABLE AllFittingResults
For holding fitting results.

In [71]:
try:
    query = """CREATE TABLE IF NOT EXISTS AllFittingResults (
        fitting_id INT AUTO_INCREMENT PRIMARY KEY,
        meas_id INT,
        sample_code VARCHAR(256) NOT NULL,
        wavelength FLOAT NOT NULL COMMENT '[nm]',
        Re_gamma FLOAT NOT NULL COMMENT '[10^-36]',
        Re_gamma_err FLOAT NOT NULL COMMENT '[10^-36]',
        Im_gamma FLOAT NOT NULL COMMENT '[10^-36]',
        Im_gamma_err FLOAT NOT NULL COMMENT '[10^-36]',
        sigma2 FLOAT NOT NULL COMMENT '[GM]',
        sigma2_err FLOAT NOT NULL COMMENT '[GM]',
        sigma3 FLOAT NOT NULL COMMENT '[10^-80 cm6s2]',
        sigma3_err FLOAT NOT NULL COMMENT '[10^-80 cm6s2]',
        error_log VARCHAR(256) DEFAULT NULL COMMENT 'Holds error information',
        FOREIGN KEY (meas_id) REFERENCES AllMeasurements(meas_id)
        );"""
                    
    cursor.execute(query)
    print("Table 'AllFittingResults' created successfully.")
except mysql.connector.Error as err:
    raise err
except Exception:
    print("Couldn't create table 'AllFittingResults'.")

Table 'AllFittingResults' created successfully.


# INSERT DATA

## 1) Check if sample is in DB or insert it

In [72]:
# Get sample_id from AllSamples table
def get_sample_id(sample_code, sample_type,
                  sample_solvent=None, sample_concentration=None,
                  solvent_density=None, solvent_index=None,
                  silica_thicknessMM=None):
    
    def get_id(query_core, query_sample_id, query_params):
        query = query_core + query_sample_id
        cursor.execute(query, query_params)
        result = cursor.fetchone()
        if result:
            return result[0]
        else:
            return None

    required_parameters = {'silica': [('silica_thicknessMM',silica_thicknessMM)],
                'solvent': [('solvent_density', solvent_density),('solvent_index', solvent_index)],
                'sample': [('sample_solvent', sample_solvent),('sample_concentration', sample_concentration)]}
    
    if sample_type not in required_parameters:
        print("Error: Invalid sample type provided.")
        return None
    
    query_core = "SELECT sample_id FROM AllSamples WHERE ;"
    query_core  = query_core.rstrip(';')
    
    # make sure that the sample is identified properly, or: 
    # if sample_code exists and its sample_type as provided is defined in the db.
    # E.g.: if the sample_code is 'DCM' (which is a solvent) and should be treated as sample_type='sample'
    # it has to have such entry in the db, so finally there might be one entry for
    # 'DCM' as solvent type and one for 'DCM' as sample type.
    
    query_type = "sample_code = %s and sample_type = %s"
    cursor.execute(query_core+query_type, (sample_code, sample_type))
    results = cursor.fetchmany(size=2)  # no more results are required to prove the following statements
    
    print(f"{results=}")
    if not results:
        print(f"Error: No sample named '{sample_code}' that is of type '{sample_type}' was found in the database.")
        return None
    
    missing = [item[0] for item in required_parameters[sample_type] if item[1] is None]
    if missing:
        if len(results) > 1:  # db doesn't know which entry to return
            print(f"Error: Multiple results for '{sample_code}' were found in AllSamples. Provide:",
                  ", ".join(item[0] for item in required_parameters[sample_type] if item[1] is None))
            return None
        
        else:  # db will return the unique entry
            print("No optional arguments provided. Unique entry found. Returning the id.")
            query_sample_id = "sample_code = %s"
            query_params = (sample_code, )
            return get_id(query_core, query_sample_id, query_params)
    
    # else the database will try to return the specific entry
    conditions = [f"{param[0]} = %s" for param in required_parameters[sample_type] if param[1] is not None]
    query_sample_id = f"sample_code = %s AND " + " AND ".join(conditions)
    query_params = (sample_code,) + tuple(param[1] for param in required_parameters[sample_type] if param[1] is not None)
    
    id = get_id(query_core, query_sample_id, query_params)
    if id is None:
        print("No entry found for at least one parameter:",
              ", ".join([*[f"{param[0]}={param[1]}" for param in required_parameters[sample_type]]]))
    
    return id


# Define writing the sample measured into the AllSamples table prior to saving the measurement
# (get_sample_id requires the sample to exist there)
def write_sample_query():
    query = """
        INSERT INTO AllSamples
        (sample_code, sample_solvent, sample_concentration)
        VALUES
        (%s, %s, %s);
    """
    return query


def write_solvent_query():
    query = """
        INSERT INTO AllSamples
        (sample_code, solvent_density, solvent_index)
        VALUES
        (%s, %s, %s);
    """
    return query


def write_silica_query():
    query = """
        INSERT INTO AllSamples
        (sample_code, silica_thicknessMM)
        VALUES
        (%s, %s);
    """
    return query


def write_sample_to_db(sample_code, sample_type,
                  sample_solvent=None, sample_concentration=None,
                  solvent_density=None, solvent_index=None,
                  silica_thicknessMM=None):
    
    required_parameters = {'silica': [('silica_thicknessMM',silica_thicknessMM)],
                'solvent': [('solvent_density', solvent_density),('solvent_index', solvent_index)],
                'sample': [('sample_solvent', sample_solvent),('sample_concentration', sample_concentration)]}
    
    def check_kwargs(is_found):
        # Check for correctness of kwargs
        missing = []
        if sample_type.lower() in required_parameters:
            for param in required_parameters[sample_type.lower()]:
                if param[1] is None:
                    missing.append(param[0])
        
        if is_found:
            if len(is_found) > 1:
                print('Sample exists. Multiple entries. Be more specific if you want new sample.')
                if missing:
                    print("Provide:", ", ".join(missing))
            else:
                print('One entry found. Be more specific if you want to add new sample.')
                if missing:
                    print("Provide:", ", ".join(missing))
        else:
            print(f'Be more specific to identify the {sample_type}.')
            if missing:
                print("Provide:", ", ".join(missing))
        
        return missing
    
    # Check if the sample exists in the database
    query = """
        SELECT * FROM AllSamples WHERE sample_code = %s;
    """
    cursor.execute(query, (sample_code,))
    is_found = cursor.fetchmany(size=2)
    if debug_mode:
        print(is_found)
    
    # when sample is missing from db
    # check if the parameters are unambiguous
    # and insert the sample (this is the main functionality of the method)
    if not is_found:
        missing = check_kwargs(is_found)  # if some parameters are missing inform the user
        if missing:
            return
        else:
            match sample_type:
                case "sample":
                    query = write_sample_query()
                case "solvent":
                    query = write_solvent_query()
                case "silica":
                    query = write_silica_query()
            
            params = []
            for param in required_parameters[sample_type.lower()]:
                params.append(param[1])
            cursor.execute(query, params)
    
    sample_id = None
    return sample_id

# from values in program provide the required set of parameters
# if sample, provide solvent and concentration
sample_id = write_sample_to_db(
    sample_code='silica',
    sample_type="silica")
print(sample_id)

[(1, 'silica', 'silica', None, None, 3.0, None, None), (2, 'silica', 'silica', None, None, 4.0, None, None)]
None


## 2) Save measurement in db

In [73]:
def write_measurement_to_db(file_or_meas_data, sample_code, sample_type,
                            sample_solvent=None, sample_concentration=None,
                            solvent_density=None, solvent_index=None,
                            silica_thicknessMM=None):
    """Writes to 'dbname'.measurements column meas_result in JSON format."""
    
    sample_id = get_sample_id(sample_code, sample_type, sample_solvent, sample_concentration, solvent_density, solvent_index, silica_thicknessMM)
    
    if not sample_id:
        print("Failed to write measurement to database.")
        return
    
    column_names = ['Step', 'CA Channel [V]','Ref Channel [V]','OA Channel [V]', 'Empty Channel [V]']
    
    if os.path.exists(file_or_meas_data):
        df = pd.read_csv(file_or_meas_data, sep="\s+", names=column_names)
        json_data = df.to_json(orient='values')
        # sample_solvent = from file/database?
        # sample_concentration = from file
        # solvent_density = from database?
        # solvent_index = from database?
        # silica_thicknessMM = from file
        # wavelength = from file
        
    else:
        # if data behaves itself to be formatted in json, do something
        # json_data = file_or_meas_data or something like this
        # wavelength = from interface
        pass
    
    num_scans = 1  # # from program interface/settings or from file
    wavelength = 800  # from program interface/settings or from file
    laser_freq = 1000  # from program interface/settings
    if silica_thicknessMM is None:
        silica_thicknessMM = 4  # from program interface/settings or from file
    
    # print(dumps(loads(json_data),indent=4))
    query  = """INSERT INTO AllMeasurements
            (meas_UnixDate, sample_id, sample_code, sample_type,
            meas_result, SilicaThicknessMM,
            Concentration, Wavelength,
            LaserFrequecy, NumberOfScans)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""
    cursor.execute(query, (time.time(),
                           sample_id, sample_code, sample_type,
                           json_data, silica_thicknessMM,
                           sample_concentration, wavelength,
                           laser_freq, num_scans))

fpath = "C:\\data\\2023_11_30\\2023_11_30__11_05__silica_0-00_475-0.txt"

write_measurement_to_db(fpath, 'sample1', 'sample')

results=[]
Error: No sample named 'sample1' that is of type 'sample' was found in the database.
Failed to write measurement to database.


In [74]:
pd.set_option('display.expand_frame_repr', False)  # Prevent DataFrame from wrapping

# Get data from files
path = "C:\\figures"

def read_files(path):
    for file in os.listdir(path):
        sample_code = Path(file).stem
        df = pd.read_table(os.path.join(path, file), skiprows=[1])
        
        if debug_mode:
            print(sample_code)
            print(df)
            print()
        
        process_file(sample_code, df)

def process_file(sample_code, df):
    for index, row in df.iterrows():
        values = [sample_code,
                row['lambda'],
                row['Re_gamma'],
                row['error(Re_gamma)'],
                row['Im_gamma'],
                row['error(Im_gamma)'],
                row['sigma2'],
                row['error(sigma2)'],
                row['sigma3'],
                row['error(sigma3)'],
                None]
        validate_and_insert_data(values, df.columns, index)

def validate_and_insert_data(values, columns, index):
    # Insert some data
    query = """INSERT INTO AllFittingResults 
        (sample_code, wavelength, Re_gamma, Re_gamma_err, Im_gamma, Im_gamma_err, sigma2, sigma2_err, sigma3, sigma3_err, error_log)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);"""
    
    for i,v in enumerate(values[1:-1]):
        try:
            if not isinstance(v, float):
                if isinstance(v, int):
                    values[i+1] = float(v)
                elif isinstance(v, str):
                    try:
                        values[i+1] = float(v)
                    except ValueError:
                        handle_value_error(values, columns, i, v, index)
                else:
                    handle_value_error(values, columns, i, v, index)
        except TypeError as e:
            print(e)
        
    try:
        cursor.execute(query, values)
    except mysql.connector.Error as err:
        handle_db_error(err)
    except Exception as e:
        logging.error(traceback.format_exc())

def handle_value_error(values, columns, i, v, index):
    values[i + 1] = 0
    values[-1] = f"Float expected in `{columns[i]}`, data row {index + 1}. Found {type(v).__name__}: `{v}`"
    print(f"`{columns[i]}` in row {index} = {v}. Data type is {type(v).__name__}. Float expected. Replacing with 0 (zero).")

def handle_db_error(err):
    if err.errno == errorcode.ER_WARN_DATA_OUT_OF_RANGE:
        print("Database cannot store such big/small numbers.")
    else:
        print(err)
    logging.error(traceback.format_exc())
    
read_files(path)

prism1
    lambda   Re_gamma  error(Re_gamma)  Im_gamma  error(Im_gamma)   sigma2  error(sigma2)    sigma3  error(sigma3)
0    800.0 -3671.8989         105.4344   57.4295           5.0130  15.0707         1.3155   58.5908         5.1143
1    825.0 -2199.1851         179.3592  -42.0847           6.0228 -10.3847         1.4862    0.0000         0.0000
2    850.0  -593.3358         142.2461  186.3883          42.1616  43.3271         9.8007  105.5817        23.8829
3    875.0  -936.1743         117.0287  191.2120          31.1672  41.9448         6.8369    0.0000         0.0000
4    900.0 -2150.5628         133.0316    0.0000          15.3073   0.0000         3.1739    0.0000         7.9383
5    925.0  -275.8844         226.2643  207.5456          76.5626  40.7389        15.0284    0.0000         0.0000
6    950.0  -260.9847         208.8306   62.9183          31.3215  11.7087         5.8287   16.9073         8.4167
7   1000.0  -550.4078          92.4608  158.1124          31.8802  26.554

# DISPLAY TABLE

In [75]:
def display_table(db : LiteralString, table_name : LiteralString, order_by_dir=None, debug_mode=True) -> None:
    """Display nicely formatted table `table_name` that is (un-)ordered according to `order_by_dir`.

    ## Args:
        **table_name** (*str*): Table name existing in current database
        **order_by_dir** (*list*, optional): Accepts list of tuples for multiple sorting. Accepts single 2-tuple as well. Defaults to `None`.

    ## Example usage:
    
        display_table('dummy_table', order_by_dir = 'column1')  # sorts 'column1' ASCENDING by default
        display_table('dummy_table', order_by_dir = ['column1','column4'])  # sorts 'column1' ASCENDING and then 'column4' ASCENDING by default
        display_table('dummy_table', order_by_dir = ('column1','asc'))  # sorts 'column1' ASCENDING
        display_table('dummy_table', order_by_dir = ('column1','DESC'))  # sorts 'column1' DESCENDING
        display_table('dummy_table', order_by_dir = ['column1', ('column4','ASC'))  # sorts 'column1' ASCENDING by default and then 'column4' DESCENDING
    """
    
    def is_valid_order_by_dir(order_by_dir, debug_mode=True) -> bool:
        '''Check if the `order_by_dir` is properly formatted.'''
        if debug_mode:
            fn_name = f"[{inspect.stack()[0][3]}]: "
        else:
            fn_name = ""
        
        if isinstance(order_by_dir, str):  # case 1
            if debug_mode:
                print(f"{fn_name}{order_by_dir} is a string")
            return True  # a single column name is valid

        elif isinstance(order_by_dir, list):  # case 2
            if debug_mode:
                print(f"{fn_name}{order_by_dir} is a list")
            
            for index, item in enumerate(order_by_dir):
                if isinstance(item, str):  # column name
                    if debug_mode:
                        print(f"{fn_name}Item [{index}] is a string")
                    continue
                elif isinstance(item, tuple) and len(item) == 2:  # a column and its direction
                    if debug_mode:
                        print(f"{fn_name}Item [{index}] is a 2-tuple")
                        
                    column, direction = item
                    if not isinstance(column, str) or direction.lower() not in ['asc', 'desc']:
                        if debug_mode:
                            print(f"{fn_name}first or second item is doesn't conform to specifications.")
                        return False
                else:
                    if debug_mode:
                        print(f"{fn_name}Item [{index}] is misformatted.")
                    return False
            return True

        elif isinstance(order_by_dir, tuple):  # case 3
            if len(order_by_dir) == 2:
                if debug_mode:
                    print(f"{fn_name}{order_by_dir} is a 2-tuple")
                column, direction = order_by_dir
                if isinstance(column, str) and direction.lower() in ['asc', 'desc']:
                    return True
            else:
                if debug_mode:
                    print(f"{fn_name}{order_by_dir} is a misformatted tuple")
        return False
    
    def format_params(input_data, whitelist, debug_mode=True) -> LiteralString | str | None:
        """Format `order by` clause parameters according to user provided parameters

        Args:
            input_data (_type_): parameters

        Returns:
            Formatted `order by` clause parameters
        """
        if debug_mode:
            fn_name = f"[{inspect.stack()[0][3]}]: "
        else:
            fn_name = ""
        
        result = []
        
        if isinstance(input_data, list):
            for element in input_data:
                if isinstance(element, str):
                    if element in whitelist:
                        result.append(element)  # Append strings directly
                    else:
                        print(f"{fn_name}'{element}' is not allowed.")
                elif isinstance(element, tuple):
                    if element[0] in whitelist:
                        result.append(' '.join(element))  # Join tuple elements with space
                    else:
                        print(f"{fn_name}'{element[0]}' is not allowed.")
            return ', '.join(result)  # Join everything with comma
        
        elif isinstance(input_data, tuple):
            if input_data[0] in whitelist:
                return ' '.join(input_data)
            else:
                print(f"{fn_name}'{element[0]}' is not allowed.")
                return None
        
        elif isinstance(input_data, str):
            if input_data in whitelist:
                result.append(input_data)  # Append strings directly
            else:
                print(f"{fn_name}'{input_data}' is not allowed.")
        else:
            return None

    if debug_mode:
        fn_name = f"[{inspect.stack()[0][3]}]: "
    else:
        fn_name = ""
    
    # SANITIZE table_name PARAMETER
    query = "SHOW TABLES;"
    cursor.execute(query)
    tables_in_db = [i[0] for i in cursor.fetchall()]
    
    if debug_mode:
        print(f"{fn_name}Tables found in {db} database: {tables_in_db}")
    
    # Allow table names existing in the database
    if table_name.lower() not in tables_in_db:
        print(f"{fn_name}Table doesn't exist: {table_name}")
        return
    
    query_core = f"SELECT * FROM {table_name};"  # already sanitized; semicolon required by SQL syntax highlighter
    query_core = query_core.rstrip(';')  # required for concatenation of following ORDER BY
    
    ## SANITIZE order_by_dir PARAMETER
    sanitize_columns = f"SHOW COLUMNS FROM {table_name};"  # already sanitized
    cursor.execute(sanitize_columns)
    
    # Allow only whitelisted column names in the selected table
    allowed_cols = [i[0] for i in cursor.fetchall()]
    
    if order_by_dir:
        if debug_mode:
            print(f"{fn_name}order_by_dir is not None.")
            print(f"{fn_name}validating order_by_dir.")
        
        if is_valid_order_by_dir(order_by_dir, debug_mode):
            formatted_clause = format_params(order_by_dir, allowed_cols, debug_mode)
            if formatted_clause:
                order_by_clause = f"ORDER BY {formatted_clause}"  # Format ORDER BY clause; already sanitized
                query = f"{query_core} {order_by_clause}"  # Append ORDER BY clause
            else:
                print(f"{fn_name}Returning the as-stored table.")
            if debug_mode:
                print(f"{fn_name}{query=}")
            cursor.execute(query)
        else:
            cursor.execute(query)
    else:
        cursor.execute(query)

    # Fetch all rows from the executed query
    rows = cursor.fetchall()

    # Get column names
    column_names = [i[0] for i in cursor.description]

    query = """SELECT COLUMN_COMMENT
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = %s;
    """
    cursor.execute(query, (table_name, ))

    # Get column comments
    column_comments = [i[0] for i in cursor.fetchall()]

    # Determine the max width for each column for formatting
    max_widths = [max(len(str(item)) for item in column) for column in zip(column_names, column_comments, *rows)]

    # Format and print the column names and comments
    header_format = " | ".join(f"{{:^{width}}}" for width in max_widths)  # :^ is for center-align
    header_format_params = " | ".join(f"{{:>{width}}}" for width in max_widths)  # :> is for right-align

    print()
    print(header_format.format(*column_names))
    print(header_format.format(*column_comments))
    print("-" * (sum(max_widths) + 3 * (len(column_names) - 1)))  # Adjust separator line based on content
    # Iterate over the rows and print each
    for row in rows:
        values = [str(value) if value is not None else "" for value in row]
        print(header_format_params.format(*values))
    
display_table(db_name, "AllFittingResults", order_by_dir=[("sample_code","desc"), ("wavelength","desc")], debug_mode=debug_mode)

[display_table]: Tables found in z_scans database: ['allfittingresults', 'allmeasurements', 'allsamples']
[display_table]: order_by_dir is not None.
[display_table]: validating order_by_dir.
[is_valid_order_by_dir]: [('sample_code', 'desc'), ('wavelength', 'desc')] is a list
[is_valid_order_by_dir]: Item [0] is a 2-tuple
[is_valid_order_by_dir]: Item [1] is a 2-tuple
[display_table]: query='SELECT * FROM AllFittingResults ORDER BY sample_code desc, wavelength desc'

fitting_id | meas_id | sample_code | wavelength | Re_gamma | Re_gamma_err | Im_gamma | Im_gamma_err |  sigma2  | sigma2_err |     sigma3     |   sigma3_err   |        error_log       
           |         |             |    [nm]    | [10^-36] |   [10^-36]   | [10^-36] |   [10^-36]   |   [GM]   |    [GM]    | [10^-80 cm6s2] | [10^-80 cm6s2] | Holds error information
-----------------------------------------------------------------------------------------------------------------------------------------------------------------

# Make some operations on the table

# CLOSE CONNECTION

In [76]:
# Close the cursor and connection
cursor.close()
connection.close()