In [1]:
import pandas as pd
from sqlalchemy import create_engine
import os
import re
import logging
from dotenv import load_dotenv

In [2]:
# Load environment variables from a .env file (if using)
load_dotenv()

False

In [3]:
# Configure logging
logging.basicConfig(
    filename='import_log.log',
    level=logging.INFO,
    format='%(asctime)s:%(levelname)s:%(message)s'
)

In [6]:
# Database connection string from environment variable
DATABASE_URI = os.getenv('DATABASE_URI', 'postgresql://postgres:postgres@localhost:5432/LMIA_db')

In [7]:
if not DATABASE_URI:
    logging.error('DATABASE_URI not found. Please set it in the environment variables.')
    raise ValueError('DATABASE_URI not found. Please set it in the environment variables.')

In [8]:
# Create SQLAlchemy engine
engine = create_engine(DATABASE_URI)

In [9]:
# Path to the raw_data folder
RAW_DATA_PATH = os.getenv('RAW_DATA_PATH', 'E:\DE_project_LMIA vs jobs\raw_data')

In [10]:
# Regular expression pattern to extract table number
FILE_PATTERN = r'Table(\d+)_.*\.(csv|xlsx)$'

In [11]:
def get_table_name(filename):
    """
    Extracts the table number from the filename and returns the corresponding PostgreSQL table name.
    """
    match = re.match(FILE_PATTERN, filename, re.IGNORECASE)
    if match:
        table_number = match.group(1)
        # Define table mappings based on table number
        table_mapping = {
            '02': 'tfw_lmia_csd_province',
            '03': 'tfw_lmia_economic_region',
            '04': 'tfw_lmia_noc_skilllevel_province',
            '05': 'tfw_lmia_noc_skilltype_province',
            '06': 'tfw_lmia_naics_province',
            '07': 'tfw_lmia_agriculture_province',
            '08': 'tfw_lmia_caregivers_noc_province',
            '10': 'tfw_top_countries_residency_province_2020_2023',
            '12': 'tfw_positions_requested_province_2016_2023',
            '13': 'tfw_positive_lmias_by_stream_province_2016_2023',
            '14': 'positive_lmias_by_stream_province_2016_2023',
            '16': 'positive_lmias_by_noc_skilllevel_province_2016_2023',
            '17': 'positive_lmias_by_noc_skilltype_province_2016_2023',
            '18': 'positive_lmias_by_naics_province_2016_2023',
            '19': 'unique_employers_positive_lmias_by_stream_province_2016_2023',
            '20': 'unique_employers_positive_lmias_by_economicregion_province_2016_2023',
            '21': 'unique_employers_positive_lmias_by_naics_province_2016_2023',
            '22': 'tfw_positive_lmias_by_economicregion_stream_province_2016_2023',
            '23': 'tfw_positive_lmias_by_economicregion_noc_skilllevel_province_2016_2023',
            '24': 'tfw_positive_lmias_by_economicregion_noc_skilltype_province_2016_2023'
            # Add more mappings as necessary
        }
        return table_mapping.get(table_number, None)
    else:
        return None

In [12]:
def import_file_to_postgres(file_path, table_name, file_extension):
    """
    Imports a CSV or Excel file into the specified PostgreSQL table with error handling.
    """
    try:
        # Read the file based on its extension
        if file_extension.lower() == 'csv':
            df = pd.read_csv(file_path, encoding='utf-8', error_bad_lines=False)
        elif file_extension.lower() in ['xls', 'xlsx']:
            df = pd.read_excel(file_path, engine='openpyxl')
        else:
            logging.warning(f'Unsupported file format for file: {file_path}')
            print(f'Unsupported file format for file: {file_path}')
            return
        
        # Optional: Data Cleaning Steps
        # Example: Convert column names to lowercase and replace spaces with underscores
        df.columns = [col.strip().lower().replace(' ', '_') for col in df.columns]
        
        # Handle missing values if necessary
        # Example: Fill NaNs with zeros for numerical columns
        # df.fillna(0, inplace=True)
        
        # Import into PostgreSQL
        df.to_sql(table_name, engine, if_exists='append', index=False)
        logging.info(f'Successfully imported {os.path.basename(file_path)} into {table_name}.')
        print(f'Successfully imported {os.path.basename(file_path)} into {table_name}.')
    except Exception as e:
        logging.error(f'Error importing {os.path.basename(file_path)} into {table_name}: {e}')
        print(f'Error importing {os.path.basename(file_path)} into {table_name}: {e}')

In [13]:
def main():
    # Check if raw data path exists
    if not os.path.exists(RAW_DATA_PATH):
        logging.error(f'Raw data path does not exist: {RAW_DATA_PATH}')
        raise FileNotFoundError(f'Raw data path does not exist: {RAW_DATA_PATH}')
    
    # Iterate through all files in the raw_data directory
    for filename in os.listdir(RAW_DATA_PATH):
        if filename.endswith('.csv') or filename.endswith('.xlsx'):
            table_name = get_table_name(filename)
            if table_name:
                file_path = os.path.join(RAW_DATA_PATH, filename)
                file_extension = filename.split('.')[-1]
                import_file_to_postgres(file_path, table_name, file_extension)
            else:
                logging.warning(f'No table mapping found for file: {filename}')
                print(f'No table mapping found for file: {filename}')
        else:
            logging.info(f'Skipping unsupported file format: {filename}')
            print(f'Skipping unsupported file format: {filename}')

if __name__ == "__main__":
    main()

FileNotFoundError: Raw data path does not exist: E:\DE_project_LMIA vs jobsaw_data