# Robust ETL Pipeline for Job Data Consolidation

This notebook implements a robust, interactive ETL pipeline. It is designed with defensive programming principles to provide clear feedback and avoid crashing.

## 1. Package Validation

This cell checks if all required packages are installed in your environment. Run this first to ensure the notebook will work correctly.

In [None]:
import sys
import importlib.util
import importlib.metadata
from IPython.display import display, HTML

import re

print("Performing package validation...")

# Third-party packages mapped to the minimum version this notebook expects.
required_packages = {
    'pandas': '1.3.5',
    'numpy': '1.21.5',
    'ipywidgets': '7.6.5'
}

built_in_modules = ['re', 'io', 'functools', 'sys']
all_ok = True


def _release_tuple(version_text):
    """Return the leading dotted-number part of a version string as a tuple of ints.

    Anything after the numeric prefix (for example ``rc1``) is ignored. A
    string with no leading number yields an empty tuple.
    """
    match = re.match(r'\d+(?:\.\d+)*', version_text)
    return tuple(int(part) for part in match.group().split('.')) if match else ()


def _meets_minimum(installed, minimum):
    """Return True if ``installed`` is at least ``minimum`` by numeric release parts.

    An ``installed`` string that cannot be parsed is accepted, because there
    is nothing to compare against.
    """
    have, need = _release_tuple(installed), _release_tuple(minimum)
    if not have:
        return True
    width = max(len(have), len(need))
    # Pad with zeros so that e.g. "2.0" and "2.0.0" compare as equal.
    have += (0,) * (width - len(have))
    need += (0,) * (width - len(need))
    return have >= need


display(HTML("<b>Checking built-in modules:</b>"))
for module in built_in_modules:
    # Actually look the module up instead of assuming it is present.
    if importlib.util.find_spec(module) is not None:
        display(HTML(f"&nbsp;&nbsp;✅ <b>{module}</b>: Available (built-in)"))
    else:
        all_ok = False
        display(HTML(f"&nbsp;&nbsp;❌ <b>{module}</b>: Not available in this Python installation."))

display(HTML("<b>Checking installed packages:</b>"))
for package, min_version in required_packages.items():
    install_hint = f'<code>pip install "{package}>={min_version}"</code>'

    if importlib.util.find_spec(package) is None:
        all_ok = False
        display(HTML(f' &nbsp;&nbsp;❌ <b>{package}</b>: Not found. Please install it by running: {install_hint}'))
        continue

    try:
        version = importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        # The module is importable but has no distribution metadata, so the
        # version is unknown; report that instead of crashing the cell.
        display(HTML(f'&nbsp;&nbsp;⚠️ <b>{package}</b>: Importable, but its installed version could not be determined.'))
        continue

    if _meets_minimum(version, min_version):
        display(HTML(f'&nbsp;&nbsp;✅ <b>{package}</b>: Installed (Version: {version})'))
    else:
        all_ok = False
        display(HTML(f'&nbsp;&nbsp;❌ <b>{package}</b>: Version {version} is older than the required {min_version}. Please upgrade it by running: {install_hint}'))

if all_ok:
    display(HTML('<b style="color:green;">✅ All required packages are installed.</b>'))
else:
    display(HTML('<b style="color:red;">❌ Some required packages are missing or outdated. Please install or upgrade them before proceeding.</b>'))

## 2. Setup Pipeline

This cell initializes the pipeline's status tracking object and helper functions.

In [None]:
from IPython.display import display, HTML
import pandas as pd
import numpy as np
import io
import re
from functools import reduce
import subprocess

# Tracker with one entry per pipeline stage, in execution order. Every stage
# starts as 'pending'; stages that produce output also carry a 'data' slot
# (a fresh list for per-file results, None for single results).
pipeline_status = dict(
    setup=dict(status='pending'),
    upload=dict(status='pending'),
    load=dict(status='pending', data=[]),
    extract=dict(status='pending', data=[]),
    merge=dict(status='pending', data=None),
    completeness=dict(status='pending', data=None),
    export=dict(status='pending'),
)

def print_status(stage, status, message):
    """Record a stage's outcome in ``pipeline_status`` and render it as HTML.

    Args:
        stage: Name of the pipeline stage; used as the key in
            ``pipeline_status`` and shown capitalized in the output.
        status: One of 'success', 'error', 'warning' or 'info'. Any other
            value is stored as given and rendered with the 'info' icon.
        message: Text to store and display. It is inserted into the HTML
            as-is, so it may contain markup and is not escaped.
    """
    icons = {'success': '✅', 'error': '❌', 'warning': '⚠️', 'info': 'ℹ️'}
    # setdefault keeps an unregistered stage name from raising KeyError: the
    # stage simply gets its own tracker entry.
    entry = pipeline_status.setdefault(stage, {})
    entry['status'] = status
    entry['message'] = message
    display(HTML(f"<b>{icons.get(status, 'ℹ️')} {stage.capitalize()}:</b> {message}"))

# Record the setup stage as successful; the summary cell reads this status back.
print_status('setup', 'success', 'Pipeline initialized successfully.')

## 3. Upload CSV Files

In [None]:
import sys
import subprocess

def install_package(package):
    """Install ``package`` with pip using the interpreter that runs this code.

    Args:
        package: Requirement string handed to ``pip install``.

    Returns:
        True if pip exited successfully, False if pip reported an error or
        could not be started.
    """
    print(f"Attempting to install {package}...")
    try:
        # stdout stays silenced, but stderr is captured so that a failure can
        # be reported with pip's own explanation.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", package],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"❌ Failed to install {package}. Error: {e}")
        pip_output = (e.stderr or "").strip()
        if pip_output:
            print(pip_output)
        return False
    except OSError as e:
        # The interpreter itself could not be launched.
        print(f"❌ Failed to install {package}. Error: {e}")
        return False

    print(f"✅ {package} installed successfully.")
    return True

# Flags read by the following cells: whether the widget library could be
# imported, and the upload widget itself (None until it is created).
ipywidgets_available = False
uploader = None

try:
    import ipywidgets as widgets
    from IPython.display import display
    ipywidgets_available = True
except ImportError:
    print("⚠️ ipywidgets not found.")
    if install_package("ipywidgets"):
        import importlib
        # The import system caches what it has already seen on disk; refresh
        # it so the package installed a moment ago can be found.
        importlib.invalidate_caches()
        try:
            import ipywidgets as widgets
            from IPython.display import display
            ipywidgets_available = True
        except ImportError as e:
            # Installation succeeded but the import still fails; report it
            # and fall through to the "not available" status below.
            print(f"❌ ipywidgets was installed but could not be imported: {e}")

if 'pipeline_status' not in locals():
    # This branch runs when the Setup cell was skipped, and this cell does not
    # import HTML anywhere else, so import what the message needs here.
    from IPython.display import display, HTML
    display(HTML('<b style=\"color:red;\">❌ Pipeline status not initialized. Please run the Setup cell first.</b>'))
else:
    if ipywidgets_available:
        try:
            uploader = widgets.FileUpload(
                accept='.csv',
                multiple=True,
                description='Upload CSVs',
                button_style='primary'
            )
            display(uploader)
            print_status('upload', 'info', 'Widget ready. Please upload your CSV files.')
        except Exception as e:
            error_message = f"Could not display file upload widget. Error: {e}."
            print_status('upload', 'error', error_message)
    else:
        error_message = "ipywidgets could not be installed or imported. This notebook requires an environment that supports ipywidgets."
        print_status('upload', 'error', error_message)

## 4. Load & Validate Data

In [None]:
# Results of this cell: parsed dataframes, names of the files that loaded,
# and a name -> error-text map for the files that did not.
dfs = []
loaded_files = []
failed_files = {}


def _uploaded_files(upload_value):
    """Return (file_name, file_info) pairs for either FileUpload value layout.

    ipywidgets 7.x exposes ``value`` as a dict keyed by file name, while
    ipywidgets 8.x exposes a sequence of dicts that each carry a 'name' key.
    In both layouts ``file_info['content']`` holds the raw file bytes.
    """
    if isinstance(upload_value, dict):
        return list(upload_value.items())
    return [(file_info['name'], file_info) for file_info in upload_value]


# The setup check comes first because every other branch calls print_status,
# which only exists once the Setup cell has run.
if 'pipeline_status' not in locals():
    from IPython.display import display, HTML
    display(HTML('<b style="color:red;">❌ Pipeline status not initialized. Please run the Setup cell first.</b>'))
elif 'uploader' not in locals() or uploader is None:
    print_status('load', 'error', 'File uploader widget not found. Please run the preceding cells.')
elif not uploader.value:
    print_status('load', 'warning', 'No files uploaded. Please upload at least one CSV file.')
else:
    for file_name, file_info in _uploaded_files(uploader.value):
        try:
            content = file_info['content']
            # BytesIO accepts any bytes-like object, so this works whether the
            # widget hands over bytes or a memoryview.
            df = pd.read_csv(io.BytesIO(content))

            if df.empty:
                raise ValueError("The CSV file is empty or could not be parsed correctly.")

            dfs.append(df)
            loaded_files.append(file_name)
            print(f"✅ Successfully loaded and validated '{file_name}'.")

        except Exception as e:
            # One bad file must not stop the others from loading.
            failed_files[file_name] = str(e)
            print(f"❌ Failed to load '{file_name}'. Error: {e}")

    success_count = len(loaded_files)
    failure_count = len(failed_files)
    total_count = success_count + failure_count

    summary_message = f"Processed {total_count} file(s): {success_count} succeeded, {failure_count} failed."
    # Test for "nothing loaded" first: when every file fails, failure_count is
    # also positive, and checking that first would downgrade the outcome to a
    # warning.
    if success_count == 0:
        print_status('load', 'error', "No data was loaded. Please check your uploaded files.")
    elif failure_count > 0:
        print_status('load', 'warning', summary_message + " See details above.")
    else:
        print_status('load', 'success', summary_message)

    pipeline_status['load']['data'] = dfs

## 5. Extract Job ID

In [None]:
def find_url_column(df):
    """Return the first column whose name suggests it holds URLs, or None.

    A column matches when its name, lower-cased, contains 'url', 'link',
    'href' or 'reference'.
    """
    url_patterns = ('url', 'link', 'href', 'reference')
    for col in df.columns:
        # Column labels are not always strings (e.g. integers), so convert
        # before lower-casing.
        col_lower = str(col).lower()
        if any(pattern in col_lower for pattern in url_patterns):
            return col
    return None


def extract_job_id(df, url_column):
    """Add a numeric 'job_id' column to ``df`` taken from ``url_column``.

    Args:
        df: Dataframe to modify in place.
        url_column: Column to read from, or None when there is none, in which
            case 'job_id' is filled with None.

    Returns:
        A ``(df, count)`` tuple where ``count`` is the number of rows in which
        a run of digits was found.
    """
    if url_column is None:
        df['job_id'] = None
        return df, 0

    # Convert to str first so the .str accessor also works on columns that
    # pandas parsed as numbers; missing values contain no digits and stay NaN.
    # NOTE(review): this takes the FIRST run of digits in each value, which may
    # belong to the host or path rather than the ID - confirm against real URLs.
    raw_ids = df[url_column].astype(str).str.extract(r'(\d+)', expand=False)
    successful_extractions = int(raw_ids.notna().sum())
    df['job_id'] = pd.to_numeric(raw_ids, errors='coerce')
    return df, successful_extractions


# Output of this cell: one processed copy per loaded dataframe.
processed_dfs = []

# The setup check comes first because every other branch calls print_status,
# which only exists once the Setup cell has run.
if 'pipeline_status' not in locals():
    from IPython.display import display, HTML
    display(HTML('<b style="color:red;">❌ Pipeline status not initialized. Please run the Setup cell first.</b>'))
elif 'dfs' not in locals() or not dfs:
    print_status('extract', 'warning', 'No dataframes to process. Please run the data loading cell first.')
else:
    print("Starting Job ID extraction...")
    total_extractions = 0
    total_rows = 0
    frames_without_ids = 0

    for i, df in enumerate(dfs):
        # Work on a copy so the loaded dataframes stay untouched.
        df_copy = df.copy()
        total_rows += len(df_copy)
        try:
            print(f"Processing DataFrame #{i+1}...")
            url_col = find_url_column(df_copy)

            if url_col is None:
                print(f"⚠️ Warning: No URL column found in DataFrame #{i+1}. Cannot extract job IDs.")
            else:
                print(f"Found URL column: '{url_col}' in DataFrame #{i+1}.")

            df_processed, extracted_count = extract_job_id(df_copy, url_col)

            if url_col is not None:
                df_processed = df_processed.drop(columns=[url_col])

            print(f"✅ Finished processing DataFrame #{i+1}. Extracted {extracted_count} job IDs.")

        except Exception as e:
            # Keep the frame in the output with an empty job_id column so the
            # positions in processed_dfs still line up with dfs.
            print(f"❌ Error processing DataFrame #{i+1}: {e}")
            df_copy['job_id'] = None
            df_processed = df_copy
            extracted_count = 0

        processed_dfs.append(df_processed)
        total_extractions += extracted_count
        if extracted_count == 0:
            frames_without_ids += 1

    summary_message = f"Extracted {total_extractions} job IDs from {total_rows} total rows across {len(dfs)} dataframe(s)."
    # processed_dfs always receives one entry per input, so judge the outcome
    # by what was actually extracted rather than by whether the list is empty.
    if total_extractions == 0:
        print_status('extract', 'error', 'Job ID extraction failed for all dataframes.')
    elif frames_without_ids > 0:
        print_status('extract', 'warning', summary_message + f" {frames_without_ids} dataframe(s) yielded no job IDs; see details above.")
    else:
        print_status('extract', 'success', summary_message)

    pipeline_status['extract']['data'] = processed_dfs

## 6. Merge Datasets

In [None]:
# Output of this cell: the consolidated dataframe, or None if merging was not
# possible.
merged_df = None

# The setup check comes first because every other branch calls print_status,
# which only exists once the Setup cell has run.
if 'pipeline_status' not in locals():
    from IPython.display import display, HTML
    display(HTML('<b style="color:red;">❌ Pipeline status not initialized. Please run the Setup cell first.</b>'))
elif 'processed_dfs' not in locals() or not processed_dfs:
    print_status('merge', 'warning', 'No dataframes to merge. Please run the extraction cell first.')
else:
    # Only frames that carry at least one usable job_id can take part.
    valid_dfs_to_merge = []
    for i, df in enumerate(processed_dfs):
        if 'job_id' not in df.columns:
            print(f"⚠️ Warning: DataFrame #{i+1} is missing the 'job_id' column and will be skipped.")
            continue
        if df['job_id'].isnull().all():
            print(f"⚠️ Warning: DataFrame #{i+1} has no valid 'job_id' values and will be skipped.")
            continue
        valid_dfs_to_merge.append(df)

    if len(valid_dfs_to_merge) == 0:
        print_status('merge', 'error', 'No valid dataframes available to merge.')
    elif len(valid_dfs_to_merge) == 1:
        merged_df = valid_dfs_to_merge[0]
        print_status('merge', 'success', 'Only one valid dataframe found. No merge needed.')
        display(merged_df.head())
    else:
        try:
            print(f"Merging {len(valid_dfs_to_merge)} dataframes on 'job_id'...")

            # pandas matches rows whose join key is null against each other,
            # which would pair up unrelated rows from different files. Keep
            # only rows that actually have a job_id for the join.
            keyed_frames = []
            rows_without_id = 0
            for frame in valid_dfs_to_merge:
                has_id = frame['job_id'].notna()
                rows_without_id += int((~has_id).sum())
                keyed_frames.append(frame[has_id])
            if rows_without_id > 0:
                print(f"⚠️ Warning: Excluded {rows_without_id} row(s) without a job_id from the merge.")

            merged_df = reduce(lambda left, right: pd.merge(left, right, on='job_id', how='outer'), keyed_frames)

            message = f"Successfully merged {len(valid_dfs_to_merge)} dataframes. Resulting dataset has {merged_df.shape[0]} rows and {merged_df.shape[1]} columns."
            print_status('merge', 'success', message)
            print("Sample of merged data:")
            display(merged_df.head())

        except Exception as e:
            merged_df = None
            error_message = f"An unexpected error occurred during merging: {e}"
            print_status('merge', 'error', error_message)

# Store the result only when the tracker exists; otherwise this line would
# raise right after the "run the Setup cell" message.
if 'pipeline_status' in locals():
    pipeline_status['merge']['data'] = merged_df

## 7. Data Completeness Test

In [None]:
# Output of this cell: per-column missing-value report, or None if no
# analysis ran.
completeness_report = None

# The setup check comes first because the other branches use print_status and
# pd, which only exist once the Setup cell has run.
if 'pipeline_status' not in locals():
    from IPython.display import display, HTML
    display(HTML('<b style="color:red;">❌ Pipeline status not initialized. Please run the Setup cell first.</b>'))
elif 'merged_df' not in locals() or merged_df is None:
    print_status('completeness', 'warning', 'No merged dataframe to analyze. Please run the merge cell first.')
elif not isinstance(merged_df, pd.DataFrame) or merged_df.empty:
    print_status('completeness', 'warning', 'The merged dataframe is empty. Nothing to analyze.')
else:
    try:
        print("Running data completeness analysis...")

        # Treat empty and whitespace-only strings as missing. replace()
        # returns a new frame, so merged_df itself is left unchanged.
        df_to_check = merged_df.replace(r'^\s*$', np.nan, regex=True)

        missing_values = df_to_check.isnull().sum()
        total_rows = len(df_to_check)
        # total_rows is never zero here: empty frames were rejected above.
        completeness_percentage = ((total_rows - missing_values) / total_rows) * 100

        completeness_report = pd.DataFrame({
            'Missing Values': missing_values,
            'Completeness (%)': completeness_percentage
        })

        # Least complete columns first, so problems appear at the top.
        completeness_report.sort_values(by='Completeness (%)', ascending=True, inplace=True)

        print("Completeness Report (Problematic columns first):")
        display(completeness_report)

        avg_completeness = completeness_report['Completeness (%)'].mean()
        min_completeness = completeness_report['Completeness (%)'].min()
        print(f"\nSummary Statistics:\n- Average Column Completeness: {avg_completeness:.2f}%\n- Minimum Column Completeness: {min_completeness:.2f}%")

        # The Markdown rendering is optional output; a failure here is
        # reported but does not fail the stage.
        try:
            print("\n--- Markdown Summary ---")
            print(completeness_report.to_markdown())
        except Exception as md_e:
            print(f"⚠️ Could not generate Markdown report: {md_e}")

        print_status('completeness', 'success', f"Analysis complete. Average column completeness is {avg_completeness:.2f}%.")

    except Exception as e:
        error_message = f"An unexpected error occurred during completeness analysis: {e}"
        print_status('completeness', 'error', error_message)

# Store the result only when the tracker exists; otherwise this line would
# raise right after the "run the Setup cell" message.
if 'pipeline_status' in locals():
    pipeline_status['completeness']['data'] = completeness_report

## 8. Export Final Dataset

In [None]:
# The setup check comes first because the other branches use print_status and
# pd, which only exist once the Setup cell has run.
if 'pipeline_status' not in locals():
    from IPython.display import display, HTML
    display(HTML('<b style="color:red;">❌ Pipeline status not initialized. Please run the Setup cell first.</b>'))
elif 'merged_df' not in locals() or not isinstance(merged_df, pd.DataFrame) or merged_df.empty:
    print_status('export', 'warning', 'No valid data to export. Skipping export.')
else:
    try:
        # Written to the notebook's current working directory, without the
        # dataframe index.
        export_filename = 'consolidated_jobs.csv'
        merged_df.to_csv(export_filename, index=False)
        print_status('export', 'success', f'Successfully exported the consolidated dataset to <code>{export_filename}</code>.')
    except Exception as e:
        error_message = f"An unexpected error occurred during file export: {e}"
        print_status('export', 'error', error_message)

## 9. Pipeline Summary

In [None]:
display(HTML("<h2>Pipeline Execution Summary</h2>"))

if 'pipeline_status' in locals():
    # Lookup tables used for every stage; built once, ahead of the loop.
    icons = {'success': '✅', 'error': '❌', 'warning': '⚠️', 'info': 'ℹ️', 'pending': '⚪'}
    troubleshooting_tips = {
        'upload': 'If this failed, your browser environment might not support ipywidgets, or the installation failed.',
        'load': 'If this failed, check that you uploaded valid, non-empty CSV files.',
        'extract': 'If this failed, ensure your files contain a column with \"url\" or \"link\" in the name, and that they contain numeric job IDs.',
        'merge': 'If this failed, it might be because no dataframes had a valid \"job_id\" column to join on.',
        'completeness': 'A failure here is likely due to an issue with the merged dataframe.',
        'export': 'A failure here could be due to file system permission issues.'
    }

    # One line per stage; stages that ended in error or warning get a tip.
    for stage, info in pipeline_status.items():
        status = info.get('status', 'pending')
        message = info.get('message', 'No message recorded.')
        icon = icons.get(status, 'ℹ️')

        tip_html = ""
        if status in ('error', 'warning'):
            tip = troubleshooting_tips.get(stage, 'Check previous cells for errors.')
            tip_html = f"<br>&nbsp;&nbsp;<i><b>Tip:</b> {tip}</i>"

        status_html = f"<b>{icon} {stage.capitalize()}:</b> [{status.upper()}] {message}" + tip_html
        display(HTML(status_html))

    # Report which intermediate results exist, to help with debugging.
    display(HTML("<h3>Variable Availability for Debugging</h3>"))
    variables_to_check = ['uploader', 'dfs', 'processed_dfs', 'merged_df', 'completeness_report']
    for var in variables_to_check:
        value = locals().get(var)
        if value is None:
            # Covers both "never defined" and "defined as None".
            display(HTML(f"❌ <code>{var}</code> is not available or is None."))
        elif isinstance(value, (list, pd.DataFrame)) and len(value) == 0:
            display(HTML(f"⚠️ <code>{var}</code> is available but it is empty."))
        else:
            display(HTML(f"✅ <code>{var}</code> is available."))
else:
    display(HTML('<b style="color:red;">❌ Pipeline status not initialized. Please run the Setup cell first.</b>'))