In [None]:
import sqlite3
import pandas as pd
import os
from joblib import Parallel, delayed


In [None]:
##################################
# Python SQLite Conversion Reader
##################################
last_db_path = None
sql_columns = []

class FromSQLite:
    pathDB = os.path.join(os.getcwd(), "basedata", "total_base_data.db")
    
    @staticmethod
    def infer_sql_type(dtype):
        if pd.api.types.is_integer_dtype(dtype):
            return "INTEGER"
        elif pd.api.types.is_float_dtype(dtype):
            return "REAL"
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            return "DATETIME"
        else:
            return "TEXT"  # Default to TEXT for other types

    @staticmethod
    def getData(SQL_columns='*', importer=None, exporter=None, year=None, product_code=None, value=None, quantity=None, table_name="base_data", path=pathDB):
        """
        Fetch data from SQLite with optional filters.

        Args:
            SQL_columns (str): Columns to select. Default is '*'.
            importer (list, optional): List of import countries. Default is None.
            exporter (list, optional): List of export countries. Default is None.
            year (list, optional): List of years. Default is None.
            product_code (list, optional): List of product codes. Default is None.
            value (float, optional): Minimum value. Default is None.
            quantity (float, optional): Minimum quantity. Default is None.
            table_name (str, optional): Name of the table in the database. Default is "base_data".
            path (str, optional): Path to the database file. Default is pathDB.

        Returns:
            pd.DataFrame: Filtered data.
        """
        if not os.path.exists(path):
            print(f"ERROR: Database file '{path}' does not exist.")
            return None

        query = f"SELECT {SQL_columns} FROM {table_name} WHERE 1=1"
        params = []

        if exporter:
            placeholders = ', '.join('?' for _ in exporter)
            query += f" AND export_country IN ({placeholders})"
            params.extend(exporter)

        if importer:
            placeholders = ', '.join('?' for _ in importer)
            query += f" AND import_country IN ({placeholders})"
            params.extend(importer)

        if year:
            placeholders = ', '.join('?' for _ in year)
            query += f" AND year IN ({placeholders})"
            params.extend(year)

        if product_code:
            placeholders = ', '.join('?' for _ in product_code)
            query += f" AND code IN ({placeholders})"
            params.extend(product_code)

        if value:
            placeholders = ', '.join('?' for _ in value)
            query += f" AND value IN ({placeholders})"
            params.append(value)

        if quantity:
            placeholders = ', '.join('?' for _ in quantity)
            query += f" AND quantity IN ({placeholders})"
            params.append(quantity)

        try:
            with sqlite3.connect(path) as conn:
                df = pd.read_sql_query(query, conn, params=params)
        except sqlite3.OperationalError as e:
            print(f"SQL Error: {e}")
            return None

        return df

    def pushData(df, table_name='base_data', db_path=pathDB):
        """
        Push data into SQLite database, adding only new rows and handling missing columns.

        Args:
            df (pd.DataFrame): Data to push into the database.
            table_name (str): Target table name.
            db_path (str): Path to the SQLite database file.
        """
        global last_db_path, sql_columns  # Declare global variables

        db_path = db_path or FromSQLite.pathDB

        # Ensure the directory for the database exists
        if not os.path.exists(os.path.dirname(db_path)):
            os.makedirs(os.path.dirname(db_path))

        try:
            # Connect to the SQLite database
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()

                # Check if the table exists
                cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
                table_exists = cursor.fetchone() is not None

                if not table_exists:
                    # Create the table with the DataFrame's schema
                    print(f"Table '{table_name}' does not exist. Creating it...")
                    df.head(0).to_sql(table_name, conn, if_exists='replace', index=False)

                # Check if the database path is the same as the last call
                if db_path != last_db_path:
                    # Cache the SQL column names for the table
                    cursor.execute(f"PRAGMA table_info({table_name});")
                    sql_columns = [row[1] for row in cursor.fetchall()]  # Get column names
                    last_db_path = db_path

                # Check for missing columns
                df_columns = df.columns.tolist()
                print(df_columns)
                missing_in_sql = set(df_columns) - set(sql_columns)

                # Add missing columns to the table if any
                if missing_in_sql:
                    for column in missing_in_sql:
                        col_type = FromSQLite.infer_sql_type(df[column].dtype)
                        cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column} {col_type};")
                    print(f"Added missing columns: {missing_in_sql}", end="")

                # Push data into the table
                # Use pandas' `to_sql` with `if_exists='append'` to handle insertion
                df.to_sql(table_name, conn, if_exists='append', index=False)

                print(f"Data successfully pushed to table '{table_name}'.", end="")

        except Exception as e:
            print(f"Error pushing data: {e}")

    @staticmethod
    def updateData(table_name, updates, conditions, db_path=None):
        """
        Update data in SQLite database based on conditions.
        """
        db_path = db_path or os.path.join(os.getcwd(), "basedata", "total_base_data.db")

        try:
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()
                update_clause = ', '.join(f"{k} = ?" for k in updates.keys())
                condition_clause = ' AND '.join(f"{k} = ?" for k in conditions.keys())
                query = f"UPDATE {table_name} SET {update_clause} WHERE {condition_clause}"
                params = list(updates.values()) + list(conditions.values())
                cursor.execute(query, params)
                conn.commit()
        except Exception as e:
            print(f"Error updating data: {e}")

In [None]:
def whole_file_processing(file, product_codes, country_codes, test = None):
    file_path = os.path.join(os.getcwd(), "dataset", file)
    df = pd.read_csv(file_path, low_memory=False)

    df["k"] = df["k"].astype(str)  # Convert to string
    product_codes["code"] = product_codes["code"].astype(str) 
    # First merge for column 'i'
    df = pd.merge(
        left=df,
        right=country_codes.rename(columns={"country_code": "country_code_i"}),
        left_on="i",
        right_on="country_code_i",
        how="left"
    ).rename(columns={"country_name": "export_country", "country_iso3": "export_country_iso3"})

    df = pd.merge(
        left=df,
        right=country_codes.rename(columns={"country_code": "country_code_j"}),
        left_on="j",
        right_on="country_code_j",
        how="left"
    ).rename(columns={"country_name": "import_country", "country_iso3": "import_country_iso3", "t": "year", "k":"code", "v":"value", "q":"quantity"}).drop(
        columns=["country_iso2_x", "country_iso2_y", "country_code_i", "country_code_j", "i", "j","export_country_iso3","import_country_iso3"], axis=1
    )

    if df is None:
        print(f"Skipping file: {file}")
        return None
    
    if test:
        return df
    else:
        FromSQLite.pushData(df)
        print(f'{file} succesfully pushed to SQLite', end="")
    
    



In [None]:
#####################
# SQLite Processing
#####################
pathDB = os.path.join(os.getcwd(), "basedata", "total_base_data.db")
if os.path.exists(pathDB):
    print("Loading precomputed file")
    df = FromSQLite.getData()
    
else:
    files_to_process = [
        file for file in os.listdir(os.path.join(os.getcwd(), "dataset"))
        if file.startswith("BACI_") and file.endswith(".csv")
    ]
    print(files_to_process, end="")

    country_codes = pd.read_csv(os.path.join("dataset", "country_codes_V202401b.csv"))
    product_codes = pd.read_csv(os.path.join("dataset", "product_codes_HS22_V202401b.csv"))

    Parallel(n_jobs=1, backend='loky')(
        delayed(lambda file : FromSQLite.pushData(whole_file_processing(file, product_codes, country_codes), table_name="base_data", db_path=pathDB))
        (file) for file in files_to_process
    )