<a href="https://colab.research.google.com/github/ayoubelhariri/hala/blob/main/CollabWORK_Mapping.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**INTAKE:** IDENTIFY THE FILE TYPE, DOWNLOAD IT IF IT IS A URL, AND CHECK WHETHER IT IS COMPRESSED.

In [None]:
"""
This script defines functions to identify appropriate parsers for files/URLs and to process inputs,
and then checks if the file is compressed and unzips it.
"""
# Added a pass statement to avoid 'incomplete input' SyntaxError
pass

import os
import argparse
import urllib.parse
import requests
import sys # Import sys to access command line arguments
import zipfile # Added for unzip functionality
import tarfile # Added for unzip functionality
import gzip # Added for unzip functionality
import shutil # Added for unzip functionality


def get_file_type(file_path):
    """Return the lower-cased extension of ``file_path``.

    ``.tar.gz`` is reported as a single two-part extension so callers can tell
    a gzip-compressed tarball apart from a plain ``.gz`` file.

    Args:
        file_path (str): Path or file name to inspect.

    Returns:
        str: Extension including the leading dot (``'.csv'``, ``'.tar.gz'``),
        or ``''`` when the name has no extension.
    """
    normalized = file_path.lower()
    if normalized.endswith('.tar.gz'):
        return '.tar.gz'
    return os.path.splitext(normalized)[1]

def unzip_file_if_needed(file_path, extract_to_dir='.'):
    """
    Check whether a file is a compressed archive and extract it if so.

    The archive is unpacked into ``<extract_to_dir>/<name>``, where ``<name>``
    is the file name without its archive extension. For example, ``data.tar.gz``
    goes to ``<extract_to_dir>/data``, and ``notes.txt.gz`` is decompressed to
    ``<extract_to_dir>/notes.txt/notes.txt``.

    Args:
        file_path (str): The path to the file to check and potentially extract.
        extract_to_dir (str): The directory in which the extraction folder is
                              created. Defaults to the current directory.

    Returns:
        str | None: The extraction folder path on success. ``None`` when the
        file does not exist, is not a recognized archive, or extraction fails
        (a message is printed in each case).
    """
    if not os.path.exists(file_path):
        print(f"Error: File '{file_path}' not found.")
        return None

    file_type = get_file_type(file_path)
    archive_types = ('.zip', '.tar', '.gz', '.tar.gz', '.tgz')

    if file_type not in archive_types:
        print(f"'{os.path.basename(file_path)}' is not a recognized compressed file. No action taken.")
        return None  # Not an archive

    file_name = os.path.basename(file_path)
    print(f"Archive detected: '{file_name}'. Extracting...")

    # Strip only the trailing archive extension. get_file_type matched it
    # case-insensitively, so its length is exact. Chained str.replace() calls
    # would also remove matches in the middle of the name and miss upper-case
    # suffixes. The fallback covers a file literally named ".tar.gz".
    extraction_folder_name = file_name[:-len(file_type)] or file_name + '_extracted'
    extraction_path = os.path.join(extract_to_dir, extraction_folder_name)

    try:
        # Inside the try: fails if a regular file already occupies that name.
        os.makedirs(extraction_path, exist_ok=True)
        if file_type == '.zip':
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(extraction_path)
        elif file_type in ('.tar', '.tar.gz', '.tgz'):
            with tarfile.open(file_path, 'r:*') as tar_ref:
                # Archives may come from arbitrary URLs. The 'data' filter
                # rejects absolute paths, '..' components and links that
                # escape the target directory. It is available on
                # Python 3.12+ and recent 3.8-3.11 patch releases.
                if hasattr(tarfile, 'data_filter'):
                    tar_ref.extractall(path=extraction_path, filter='data')
                else:
                    tar_ref.extractall(path=extraction_path)
        elif file_type == '.gz':
            # A bare .gz holds a single compressed file; drop the '.gz' suffix.
            output_filename = os.path.join(extraction_path, os.path.splitext(file_name)[0])
            with gzip.open(file_path, 'rb') as f_in, open(output_filename, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
    except (zipfile.BadZipFile, tarfile.TarError, OSError, EOFError) as e:
        # TarError covers ReadError and the extraction-filter errors.
        # EOFError signals a truncated gzip stream.
        print(f"Error during extraction of '{file_name}': {e}")
        return None

    print(f"Successfully extracted to '{extraction_path}'")
    return extraction_path


def get_parser_for_file(file_path):
    """
    Identify the appropriate Python parser/library for a file path or URL path.

    Args:
        file_path (str): The full path to the file or the path component of a URL.

    Returns:
        tuple: ``(parser, extension)``.
            ``parser`` is a human-readable description of the recommended
            library, or ``"Unsupported file type"``.
            ``extension`` is the lower-cased extension as reported by
            ``get_file_type`` (``.tar.gz`` is kept as one unit).
            Returns ``(None, None)`` if ``file_path`` is not a usable string.
    """
    try:
        # Use the shared helper so '.tar.gz' is recognized as a tarball. A plain
        # os.path.splitext() reports it as '.gz', so the '.tar.gz' entry below
        # could never match.
        file_extension = get_file_type(file_path)
    except (TypeError, AttributeError):
        return None, None

    # A dictionary mapping common file extensions to their recommended Python parsers/libraries.
    parser_mapping = {
        # Data Formats
        '.csv': 'csv library (built-in)',
        '.json': 'json library (built-in)',
        '.xml': 'xml.etree.ElementTree (built-in) or lxml library',
        '.yaml': 'PyYAML library',
        '.yml': 'PyYAML library',
        '.ini': 'configparser library (built-in)',
        '.conf': 'configparser library (built-in)',
        '.xls': 'xlrd library or pandas library',
        '.xlsx': 'openpyxl library or pandas library',

        # Document Formats
        '.txt': 'Standard file I/O (open() function)',
        '.md': 'markdown library or mistune library',
        '.pdf': 'PyPDF2 library or pdfplumber library',
        '.docx': 'python-docx library',
        '.doc': 'python-docx library (may have limitations with older .doc formats)',
        '.rtf': 'striprtf library',

        # Image Formats (for metadata or processing)
        '.jpg': 'Pillow (PIL) library',
        '.jpeg': 'Pillow (PIL) library',
        '.png': 'Pillow (PIL) library',
        '.gif': 'Pillow (PIL) library',
        '.bmp': 'Pillow (PIL) library',
        '.tiff': 'Pillow (PIL) library',

        # Compressed Files
        '.zip': 'zipfile library (built-in)',
        '.tar': 'tarfile library (built-in)',
        '.gz': 'gzip library (built-in)',
        '.tar.gz': 'tarfile library (built-in)',
        '.tgz': 'tarfile library (built-in)',
    }

    # Look up the extension in the mapping.
    parser = parser_mapping.get(file_extension, "Unsupported file type")
    return parser, file_extension

def process_input(input_path, timeout=30):
    """
    Process a local file path or an http(s) URL, then extract it if it is an archive.

    For URLs, the file is downloaded into the current working directory. It is
    saved under the URL-decoded last path segment, or ``downloaded_file`` if
    the URL has no file name.

    Args:
        input_path (str): Local file path or ``http://`` / ``https://`` URL.
        timeout (float): Seconds ``requests`` waits for the server to respond
            while downloading. Defaults to 30.

    Returns:
        str | None: The extraction folder returned by ``unzip_file_if_needed``
        when the input was an archive that was extracted successfully.
        Otherwise ``None``.
    """
    parsed_url = urllib.parse.urlparse(input_path)

    # Check if the input is a URL (scheme comparison is case-insensitive)
    if parsed_url.scheme.lower() in ('http', 'https'):
        print(f"URL detected: {input_path}")
        # Decode percent-escapes (e.g. %20) before taking the basename. This
        # keeps the saved name readable, and an encoded '/' (%2F) cannot add
        # directory components.
        filename = os.path.basename(urllib.parse.unquote(parsed_url.path))
        if filename in ('', '.', '..'):
            # e.g. https://host/ or https://host/dir/ - no usable file name.
            filename = 'downloaded_file'
        local_path = filename  # saved in the current working directory

        parser, _ = get_parser_for_file(filename)
        print(f"-> File '{filename}' needs parser: {parser}")

        # Attempt to download the file
        print(f"-> Attempting to download '{filename}'...")
        try:
            # The timeout prevents hanging forever on an unresponsive server.
            # The context manager releases the connection even if the
            # streamed body is only partially read.
            with requests.get(input_path, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                with open(local_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
        except (requests.exceptions.RequestException, OSError) as e:
            print(f"-> Error: Could not download the file. {e}")
            return None  # Exit if download fails
        print(f"-> Successfully downloaded and saved as '{filename}' in the current directory.")

    else:
        # Treat as a local file path
        print(f"Local file path detected: {input_path}")
        filename = os.path.basename(input_path)
        local_path = input_path

        parser, _ = get_parser_for_file(input_path)
        print(f"-> File '{filename}' needs parser: {parser}")

        if not os.path.exists(local_path):
            print(f"-> Error: File '{input_path}' not found.")
            return None

    # --- UNZIP step: extract into the current directory if it is an archive ---
    print("\n--- Checking for compression ---")
    return unzip_file_if_needed(local_path, extract_to_dir='.')

**CONDITIONAL:** UNZIP THE FILE IF IT IS A COMPRESSED ARCHIVE.

In [None]:
import os
import zipfile
import tarfile
import gzip
import shutil

# To make this script runnable, we'll include the function from INTAKE.py.
# In a real project, you would use: from INTAKE import get_parser_for_file
def get_parser_for_file(file_path):
    """
    Name the standard-library module that can open a compressed file.

    NOTE(review): this redefines get_parser_for_file from the INTAKE cell with
    a different return type (a single str instead of a tuple). Running this
    cell after INTAKE shadows that version in the notebook namespace.

    Args:
        file_path (str): Path or file name to classify. Matching is
            case-insensitive, and ``.tar.gz`` is treated as one extension.

    Returns:
        str: A description such as ``'zipfile library (built-in)'``, or
        ``"Not a standard compressed file."`` for any other extension, or
        ``"Invalid file path provided."`` when ``file_path`` is not a string.
    """
    archive_parsers = {
        '.zip': 'zipfile library (built-in)',
        '.tar': 'tarfile library (built-in)',
        '.gz': 'gzip library (built-in)',
        '.tar.gz': 'tarfile library (built-in)',
        '.tgz': 'tarfile library (built-in)',
    }
    try:
        lowered = file_path.lower()
        suffix = '.tar.gz' if lowered.endswith('.tar.gz') else os.path.splitext(lowered)[1]
    except (TypeError, AttributeError):
        return "Invalid file path provided."
    return archive_parsers.get(suffix, "Not a standard compressed file.")

def unzip_file_if_needed(file_path, extract_to_dir='.'):
    """
    Check whether a file is a compressed archive and extract it if so.

    The format is chosen from ``get_parser_for_file``'s description string.
    Zip and tar archives are unpacked into ``<extract_to_dir>/<name>``, where
    ``<name>`` is the file name without its archive extension. A plain ``.gz``
    file is decompressed directly into ``extract_to_dir``.

    Args:
        file_path (str): The path to the file to check and potentially extract.
        extract_to_dir (str): The directory where the contents should be extracted.
                              Defaults to the current directory.

    Returns:
        str | None: The folder (zip/tar) or file (gz) that was written.
        ``None`` when the file is missing, not an archive, or extraction fails.
    """
    if not os.path.exists(file_path):
        print(f"Error: File '{file_path}' not found.")
        return None

    parser_info = get_parser_for_file(file_path)
    file_name = os.path.basename(file_path)

    # Remove only the trailing archive extension, case-insensitively.
    # Chained str.replace() calls also removed matches mid-name, missed
    # upper-case suffixes, and never stripped a plain '.tar'.
    # '.tar.gz' is checked before '.gz'.
    extraction_folder_name = file_name
    lowered_name = file_name.lower()
    for suffix in ('.tar.gz', '.tgz', '.zip', '.tar', '.gz'):
        if lowered_name.endswith(suffix):
            extraction_folder_name = file_name[:-len(suffix)] or file_name + '_extracted'
            break
    extraction_folder = os.path.join(extract_to_dir, extraction_folder_name)

    try:
        if 'zipfile' in parser_info:
            print(f"'{file_name}' is a zip file. Extracting...")
            os.makedirs(extraction_folder, exist_ok=True)
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(extraction_folder)
            print(f"Successfully extracted to '{extraction_folder}'")
            return extraction_folder

        if 'tarfile' in parser_info:
            print(f"'{file_name}' is a tar archive. Extracting...")
            os.makedirs(extraction_folder, exist_ok=True)
            # 'r:*' automatically handles compression like gzip
            with tarfile.open(file_path, 'r:*') as tar_ref:
                # The 'data' filter blocks members that would escape the target
                # directory (absolute paths, '..', unsafe links). It is
                # available on Python 3.12+ and recent 3.8-3.11 patch releases.
                if hasattr(tarfile, 'data_filter'):
                    tar_ref.extractall(path=extraction_folder, filter='data')
                else:
                    tar_ref.extractall(path=extraction_folder)
            print(f"Successfully extracted to '{extraction_folder}'")
            return extraction_folder

        if 'gzip' in parser_info:
            print(f"'{file_name}' is a gzip file. Decompressing...")
            # Gzip typically compresses a single file, so we extract it directly
            output_filename = os.path.join(extract_to_dir, os.path.splitext(file_name)[0])
            os.makedirs(extract_to_dir, exist_ok=True)
            with gzip.open(file_path, 'rb') as f_in, open(output_filename, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            print(f"Successfully decompressed to '{output_filename}'")
            return output_filename

        print(f"'{file_name}' is not a recognized compressed file. No action taken.")
        return None

    except (zipfile.BadZipFile, tarfile.TarError, OSError, EOFError) as e:
        # TarError covers ReadError and extraction-filter rejections.
        # EOFError signals a truncated gzip stream.
        print(f"An error occurred during extraction of '{file_name}': {e}")
        return None


# --- Main execution block to demonstrate the function ---
if __name__ == "__main__":
    print("--- Setting up test files ---")

    # Create a dummy directory for our test files
    test_dir = "test_archives"
    if not os.path.exists(test_dir):
        os.makedirs(test_dir)

    # Create a dummy file to be compressed
    dummy_file_path = os.path.join(test_dir, "sample_text.txt")
    with open(dummy_file_path, "w") as f:
        f.write("This is a test file for the unzipping script.\n")
        f.write("It will be placed inside various archives.\n")

    # 1. Create a dummy ZIP file
    zip_path = os.path.join(test_dir, "my_archive.zip")
    with zipfile.ZipFile(zip_path, 'w') as zf:
        zf.write(dummy_file_path, os.path.basename(dummy_file_path))
    print(f"Created dummy zip: {zip_path}")

    # 2. Create a dummy TAR.GZ file
    tar_path = os.path.join(test_dir, "my_archive.tar.gz")
    with tarfile.open(tar_path, "w:gz") as tar:
        tar.add(dummy_file_path, arcname=os.path.basename(dummy_file_path))
    print(f"Created dummy tar.gz: {tar_path}")

    # 3. Create a dummy GZ file
    gz_path = os.path.join(test_dir, "another_file.txt.gz")
    with open(dummy_file_path, 'rb') as f_in:
        with gzip.open(gz_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    print(f"Created dummy gz: {gz_path}")

    # 4. Create a dummy TGZ file
    tgz_path = os.path.join(test_dir, "another_archive.tgz")
    with tarfile.open(tgz_path, "w:gz") as tar:
        tar.add(dummy_file_path, arcname=os.path.basename(dummy_file_path))
    print(f"Created dummy tgz: {tgz_path}")

    # A non-compressed file for testing
    non_zip_path = dummy_file_path

    print("\n--- Running UNZIP process ---")

    # List of files to process
    files_to_process = [zip_path, tar_path, gz_path, tgz_path, non_zip_path]

    for f in files_to_process:
        unzip_file_if_needed(f, "extracted_files")
        print("-" * 20)

    # To see the results, the cleanup is commented out.
    # You can manually delete the 'test_archives' and 'extracted_files' directories.
    # shutil.rmtree(test_dir)
    # shutil.rmtree("extracted_files")
    # print("\n--- Cleaned up test files and directories ---")



--- Setting up test files ---
Created dummy zip: test_archives/my_archive.zip
Created dummy tar.gz: test_archives/my_archive.tar.gz
Created dummy gz: test_archives/another_file.txt.gz
Created dummy tgz: test_archives/another_archive.tgz

--- Running UNZIP process ---
'my_archive.zip' is a zip file. Extracting...
Successfully extracted to 'extracted_files/my_archive'
--------------------
'my_archive.tar.gz' is a tar archive. Extracting...
Successfully extracted to 'extracted_files/my_archive'
--------------------
'another_file.txt.gz' is a gzip file. Decompressing...
Successfully decompressed to 'extracted_files/another_file.txt'
--------------------
'another_archive.tgz' is a tar archive. Extracting...
Successfully extracted to 'extracted_files/another_archive'
--------------------
'sample_text.txt' is not a recognized compressed file. No action taken.
--------------------


**PARSE:** CONVERT THE FILE CONTENT TO JSON.

In [None]:
import os
import json
import csv
import configparser
import xml.etree.ElementTree as ET
import zipfile
import tarfile
import gzip
import shutil

# --- Functions from previous scripts (INTAKE.py and UNZIP.py) ---
# In a real project, these would be imported from their respective files.

def get_file_type(file_path):
    """Return ``file_path``'s extension in lower case, keeping ``.tar.gz`` whole.

    Args:
        file_path (str): Path or bare file name.

    Returns:
        str: e.g. ``'.json'`` or ``'.tar.gz'``; ``''`` if there is no extension.
    """
    tarball_suffix = '.tar.gz'
    lowered = file_path.lower()
    return tarball_suffix if lowered.endswith(tarball_suffix) else os.path.splitext(lowered)[1]

def unzip_file_if_needed(file_path, extract_to_dir):
    """
    Extract ``file_path`` into a sub-folder of ``extract_to_dir`` if it is an archive.

    Args:
        file_path (str): File to inspect. Archives are recognized by extension
            (.zip, .tar, .gz, .tar.gz, .tgz; case-insensitive, via get_file_type).
        extract_to_dir (str): Parent directory for the extraction folder. The
            folder is named after the archive without its archive extension.

    Returns:
        str | None: The extraction folder on success. ``None`` silently for a
        non-archive, or ``None`` after printing an error if extraction fails.
    """
    file_type = get_file_type(file_path)
    archive_types = ('.zip', '.tar', '.gz', '.tar.gz', '.tgz')

    if file_type not in archive_types:
        return None  # Not an archive

    file_name = os.path.basename(file_path)
    print(f"Archive detected: '{file_name}'. Extracting...")

    # Strip only the trailing archive extension. get_file_type matched it
    # case-insensitively, so its length is exact. Chained str.replace() calls
    # would also cut matches from the middle of the name.
    extraction_folder_name = file_name[:-len(file_type)] or file_name + '_extracted'
    extraction_path = os.path.join(extract_to_dir, extraction_folder_name)

    try:
        # Inside the try: fails if a regular file already occupies that name.
        os.makedirs(extraction_path, exist_ok=True)
        if file_type == '.zip':
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(extraction_path)
        elif file_type in ('.tar', '.tar.gz', '.tgz'):
            with tarfile.open(file_path, 'r:*') as tar_ref:
                # The 'data' filter blocks members that would escape the target
                # directory. It is available on Python 3.12+ and recent
                # 3.8-3.11 patch releases.
                if hasattr(tarfile, 'data_filter'):
                    tar_ref.extractall(path=extraction_path, filter='data')
                else:
                    tar_ref.extractall(path=extraction_path)
        elif file_type == '.gz':
            # A bare .gz holds a single compressed file; drop the '.gz' suffix.
            output_filename = os.path.join(extraction_path, os.path.splitext(file_name)[0])
            with gzip.open(file_path, 'rb') as f_in, open(output_filename, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
    except (zipfile.BadZipFile, tarfile.TarError, OSError, EOFError) as e:
        print(f"Error during extraction of '{file_name}': {e}")
        return None

    print(f"Successfully extracted to '{extraction_path}'")
    return extraction_path

# --- Core Parsing Logic ---

def parse_file_to_json(file_path):
    """
    Parse a single CSV, INI, XML or JSON file into JSON-serializable Python data.

    Args:
        file_path (str): Path of the file to parse. The format is chosen from
            its extension via get_file_type.

    Returns:
        - ``.csv``: list of dicts, one per row, keyed by the header row.
        - ``.ini``: dict ``{section: {option: value}}``; values are strings and
          include any ``[DEFAULT]`` options.
        - ``.xml``: ``{root_tag: value}``. An element with children becomes a
          dict (repeated tags become lists); a leaf becomes its text.
          Attributes are ignored.
        - ``.json``: whatever the document contains.
        ``None`` for unsupported extensions, or when reading or parsing fails
        (an error message is printed).
    """
    file_type = get_file_type(file_path)
    file_name = os.path.basename(file_path)

    print(f"Parsing '{file_name}'...")

    try:
        if file_type == '.csv':
            # The csv module requires newline='' so quoted fields containing
            # line breaks are read correctly. 'utf-8-sig' drops a leading BOM
            # (common in Excel exports) that would otherwise prefix the first
            # header key.
            with open(file_path, mode='r', encoding='utf-8-sig', newline='') as csv_file:
                return list(csv.DictReader(csv_file))

        if file_type == '.ini':
            config = configparser.ConfigParser()
            # read_file() fails loudly on a missing file, whereas read() would
            # silently yield an empty config. It also uses an explicit encoding
            # instead of the locale default.
            with open(file_path, 'r', encoding='utf-8-sig') as ini_file:
                config.read_file(ini_file)
            return {section: dict(config.items(section)) for section in config.sections()}

        if file_type == '.xml':
            # NOTE(review): xml.etree is not hardened against maliciously
            # constructed XML. Consider defusedxml if these files can come
            # from untrusted sources.
            def element_value(element):
                # Leaf elements become their text; elements with children become dicts.
                if len(element) == 0:
                    return element.text
                node = {}
                for child in element:
                    value = element_value(child)
                    if child.tag not in node:
                        node[child.tag] = value
                    elif isinstance(node[child.tag], list):
                        node[child.tag].append(value)
                    else:
                        # Second occurrence of a tag: promote to a list.
                        node[child.tag] = [node[child.tag], value]
                return node

            root = ET.parse(file_path).getroot()
            # Treat the root like any other element, so a childless root keeps
            # its text instead of becoming an empty dict.
            return {root.tag: element_value(root)}

        if file_type == '.json':
            # 'utf-8-sig' tolerates a BOM, which json.load would otherwise reject.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                return json.load(f)

        print(f"Unsupported file type for parsing: {file_type}")
        return None

    except Exception as e:
        # Deliberate best-effort boundary: one bad file must not abort the
        # whole batch, so report and return None.
        print(f"Could not parse '{file_name}'. Error: {e}")
        return None

# --- Main execution block ---

if __name__ == "__main__":
    # 1. Setup a test environment
    base_dir = "parsing_test"
    extracted_dir = os.path.join(base_dir, "extracted")
    if os.path.exists(base_dir):
        shutil.rmtree(base_dir)
    os.makedirs(extracted_dir)

    # 2. Create sample data files
    # CSV
    csv_path = os.path.join(base_dir, "users.csv")
    with open(csv_path, "w", newline='') as f:
        f.write("id,name,email\n")
        f.write("1,Alice,alice@example.com\n")
        f.write("2,Bob,bob@example.com\n")

    # INI
    ini_path = os.path.join(base_dir, "settings.ini")
    with open(ini_path, "w") as f:
        f.write("[database]\n")
        f.write("host = localhost\n")
        f.write("port = 5432\n")
        f.write("[user]\n")
        f.write("theme = dark\n")

    # XML
    xml_path = os.path.join(base_dir, "data.xml")
    with open(xml_path, "w") as f:
        f.write("<data><item><id>1</id><name>Laptop</name></item><item><id>2</id><name>Mouse</name></item></data>")

    # Create a zip file containing another data file
    json_for_zip_path = os.path.join(base_dir, "product.json")
    with open(json_for_zip_path, 'w') as f:
        json.dump({"product_id": "xyz-123", "stock": 42}, f)

    zip_path = os.path.join(base_dir, "archive.zip")
    with zipfile.ZipFile(zip_path, 'w') as zf:
        zf.write(json_for_zip_path, os.path.basename(json_for_zip_path))

    # 3. Process the files
    files_to_process = [csv_path, ini_path, xml_path, zip_path]
    all_parsed_data = {}

    for path in files_to_process:
        # Check if it's an archive and extract it
        extraction_path = unzip_file_if_needed(path, extracted_dir)

        if extraction_path:
            # If it was an archive, parse the files inside it
            for root, _, files in os.walk(extraction_path):
                for name in files:
                    file_to_parse = os.path.join(root, name)
                    parsed_data = parse_file_to_json(file_to_parse)
                    if parsed_data:
                        all_parsed_data[name] = parsed_data
        else:
            # Otherwise, parse the file directly
            parsed_data = parse_file_to_json(path)
            if parsed_data:
                all_parsed_data[os.path.basename(path)] = parsed_data

    # 4. Print the final combined JSON output
    print("\n\n--- COMBINED PARSED DATA (JSON) ---")
    # Use indent for pretty-printing
    final_json = json.dumps(all_parsed_data, indent=4)
    print(final_json)

    # 5. Save the final JSON to a file
    output_json_path = os.path.join(base_dir, "output.json")
    with open(output_json_path, 'w') as f:
        f.write(final_json)
    print(f"\n--- Saved combined JSON to '{output_json_path}' ---")

Parsing 'users.csv'...
Parsing 'settings.ini'...
Parsing 'data.xml'...
Archive detected: 'archive.zip'. Extracting...
Successfully extracted to 'parsing_test/extracted/archive'
Parsing 'product.json'...


--- COMBINED PARSED DATA (JSON) ---
{
    "users.csv": [
        {
            "id": "1",
            "name": "Alice",
            "email": "alice@example.com"
        },
        {
            "id": "2",
            "name": "Bob",
            "email": "bob@example.com"
        }
    ],
    "settings.ini": {
        "database": {
            "host": "localhost",
            "port": "5432"
        },
        "user": {
            "theme": "dark"
        }
    },
    "data.xml": {
        "data": {
            "item": [
                {
                    "id": "1",
                    "name": "Laptop"
                },
                {
                    "id": "2",
                    "name": "Mouse"
                }
            ]
        }
    },
    "product.json": {
        "pr

**SCHEMA CHECK:** MAP FEED FIELDS TO THE TARGET SCHEMA AND VALIDATE THE RESULT.

In [None]:
import json
from datetime import datetime

# 1. DEFINE YOUR INTERNAL, CANONICAL SCHEMA
# This is the target schema that all data must conform to after transformation.
# Each field's rule dict may contain:
#   "type"           - a Python type, a tuple of types, or the string "datetime"
#                      (check_schema then expects an ISO-8601 string).
#   "required"       - the key must be present in the job dict.
#   "nullable"       - None is accepted as the value.
#   "allowed_values" - the value must be one of the listed options.
# check_schema additionally requires feed_id when job_source is "JOB_FEED",
# and requires it to be null when job_source is "COMPANY_WEBSITE".
TARGET_SCHEMA = {
    # --- Identity and provenance ---
    "external_job_id": {"type": str, "required": True},
    "job_source": {"type": str, "required": True, "allowed_values": ["COMPANY_WEBSITE", "JOB_FEED"]},
    "feed_id": {"type": int, "required": False, "nullable": True},
    # --- Timestamps (ISO-8601 strings) ---
    "created_at": {"type": "datetime", "required": True},
    "updated_at": {"type": "datetime", "required": True},
    "posted_at": {"type": "datetime", "required": True},
    "expires_at": {"type": "datetime", "required": False, "nullable": True},
    # --- Posting content ---
    "status": {"type": str, "required": True},
    "company_name": {"type": str, "required": True},
    "title": {"type": str, "required": True},
    "description": {"type": str, "required": True},
    "application_url": {"type": str, "required": False, "nullable": True},
    "employment_type": {"type": str, "required": False, "nullable": True},
    # --- Location flags and details ---
    "is_remote": {"type": bool, "required": True},
    "is_multi_location": {"type": bool, "required": True},
    "is_international": {"type": bool, "required": True},
    "locations": {"type": list, "required": False, "nullable": True},
    # --- Compensation ---
    "salary_min": {"type": (int, float), "required": False, "nullable": True},
    "salary_max": {"type": (int, float), "required": False, "nullable": True},
    "salary_period": {"type": str, "required": False, "nullable": True},
    "currency": {"type": str, "required": False, "nullable": True},
}

# 2. DEFINE THE EXPANDED, COMPREHENSIVE MAPPING FROM VARIOUS FEED SCHEMAS
# Maps a huge variety of possible feed field names (key) to your internal field name (value).
# Keys are field names as they may appear in a raw feed record; values are the
# TARGET_SCHEMA field they populate. transform_job_data looks raw keys up here
# and passes through unmapped keys that already equal a TARGET_SCHEMA field.
# NOTE(review): many aliases share one target (e.g. 'city', 'state' and
# 'country' all map to 'locations'). If a record contains several of them,
# transform_job_data keeps only the value of the last one it iterates over.
# NOTE(review): 'locations' is typed as list in TARGET_SCHEMA, so scalar
# values from keys like 'city', 'state', 'country' or 'city_state' will fail
# check_schema's type check unless they are wrapped in a list first.
# NOTE(review): very generic aliases ('type', 'date', 'url', 'link') may match
# unrelated fields in some feeds; confirm per feed.
FEED_SCHEMA_MAPPING = {
    # --- Company Name Mappings ---
    'company': 'company_name',
    'company_name': 'company_name',
    'companyName': 'company_name',
    'hiring_organization': 'company_name',
    'hiringOrganization': 'company_name',
    'employer': 'company_name',

    # --- Description Mappings ---
    'body': 'description',
    'description': 'description',
    'jobDescription': 'description',
    'job_description': 'description',
    'full_description': 'description',
    'details': 'description',
    'job_details': 'description',

    # --- Posting Date Mappings ---
    'date': 'posted_at',
    'posted_at': 'posted_at',
    'datePosted': 'posted_at',
    'date_posted': 'posted_at',
    'publication_date': 'posted_at',
    'post_date': 'posted_at',

    # --- Application URL Mappings ---
    'url': 'application_url',
    'job_url': 'application_url',
    'applyLink': 'application_url',
    'application_link': 'application_url',
    'apply_url': 'application_url',
    'link': 'application_url',

    # --- Job Title Mappings ---
    'title': 'title',
    'jobTitle': 'title',
    'job_title': 'title',
    'position_title': 'title',
    'position': 'title',
    'role': 'title',

    # --- Location Mappings ---
    'location': 'locations',
    'jobLocations': 'locations',
    'job_location': 'locations',
    'address': 'locations',
    'work_location': 'locations',
    'city_state': 'locations',
    'city': 'locations',
    'state': 'locations',
    'country': 'locations',

    # --- Employment Type Mappings ---
    'job-type': 'employment_type',
    'job_type': 'employment_type',
    'jobType': 'employment_type',
    'type': 'employment_type',
    'position_type': 'employment_type',
    'contract_type': 'employment_type',
    'employmentType': 'employment_type',

    # --- External ID Mappings ---
    'referencenumber': 'external_job_id',
    'ref_id': 'external_job_id',
    'jobID': 'external_job_id',
    'job_id': 'external_job_id',
    'reference_id': 'external_job_id',
    'requisition_id': 'external_job_id',
    'job_reference': 'external_job_id',

    # --- Remote Flag Mapping ---
    'remote': 'is_remote',
    'is_remote': 'is_remote',
    'isRemote': 'is_remote',

    # --- Salary Mappings ---
    'salary_min': 'salary_min',
    'min_salary': 'salary_min',
    'minimum_salary': 'salary_min',
    'salary_from': 'salary_min',
    'salary_max': 'salary_max',
    'max_salary': 'salary_max',
    'maximum_salary': 'salary_max',
    'salary_to': 'salary_max',
    'salary_period': 'salary_period',
    'salary_frequency': 'salary_period',
    'pay_period': 'salary_period',
    'currency': 'currency',
    'salary_currency': 'currency',
}

def transform_job_data(raw_data, mapping, schema=None):
    """
    Rename raw feed fields to internal schema names and drop everything else.

    Args:
        raw_data (dict): One raw job record as read from a feed.
        mapping (dict): Raw field name -> internal field name
            (e.g. FEED_SCHEMA_MAPPING). Raw keys not in the mapping are kept
            under their own name if that name is already a schema field.
        schema (dict | None): Schema whose keys define which internal fields
            are kept. Defaults to TARGET_SCHEMA.

    Returns:
        dict: Internal field name -> raw value. Values are not converted or
        validated. When several raw keys map to the same internal field, the
        one that appears last in ``raw_data`` wins.
    """
    if schema is None:
        schema = TARGET_SCHEMA

    transformed_data = {}
    for raw_key, raw_value in raw_data.items():
        target_key = mapping.get(raw_key, raw_key)
        # Only include fields that are part of the target schema
        if target_key in schema:
            transformed_data[target_key] = raw_value

    return transformed_data

def validate_datetime_string(dt_string):
    """
    Return True if ``dt_string`` is a string that ``datetime.fromisoformat`` accepts.

    ``Z`` is rewritten to ``+00:00`` first, because ``fromisoformat`` only
    understands the ``Z`` UTC designator natively from Python 3.11.

    Args:
        dt_string: Value to check. Non-string values (including None) are
            reported as invalid rather than raising.

    Returns:
        bool: True for a parseable ISO-8601 date/datetime string, else False.
    """
    # Non-str input previously raised AttributeError from .replace(), which
    # the except clause did not catch.
    if not isinstance(dt_string, str):
        return False
    try:
        datetime.fromisoformat(dt_string.replace('Z', '+00:00'))
    except ValueError:
        return False
    return True

def check_schema(job_data, schema=None):
    """
    Validate a transformed job data dictionary against a schema.

    Args:
        job_data (dict): Field name -> value, as produced by transform_job_data
            plus any metadata added afterwards.
        schema (dict | None): Field name -> rules dict (keys ``type``,
            ``required``, ``nullable``, ``allowed_values``). Defaults to TARGET_SCHEMA.

    Returns:
        tuple[bool, list[str]]: ``(is_valid, errors)``. If any required field
        is missing, only those errors are returned. Otherwise all null, type,
        allowed-value and job_source/feed_id consistency errors are collected.
        Fields not in the schema are ignored.
    """
    if schema is None:
        schema = TARGET_SCHEMA

    errors = [
        f"Missing required field: '{field}'"
        for field, rules in schema.items()
        if rules.get("required") and field not in job_data
    ]
    if errors:
        return False, errors

    for field, value in job_data.items():
        rules = schema.get(field)
        if rules is None:
            continue
        expected_type = rules["type"]
        if value is None:
            if not rules.get("nullable"):
                errors.append(f"Field '{field}' cannot be null.")
            continue
        if expected_type == "datetime":
            if not isinstance(value, str) or not validate_datetime_string(value):
                errors.append(f"Field '{field}' is not a valid ISO datetime string. Got: {value}")
            continue
        # bool is a subclass of int, so isinstance(True, int) is True. Reject
        # booleans unless bool itself is an accepted type (e.g. feed_id=True).
        accepted = expected_type if isinstance(expected_type, tuple) else (expected_type,)
        if not isinstance(value, accepted) or (isinstance(value, bool) and bool not in accepted):
            errors.append(f"Field '{field}' has incorrect type. Expected {expected_type}, got {type(value)}.")
        if "allowed_values" in rules and value not in rules["allowed_values"]:
            errors.append(f"Field '{field}' has value '{value}', but only {rules['allowed_values']} are allowed.")

    # Cross-field rules: feed_id must be present exactly for feed-sourced jobs.
    if job_data.get("job_source") == "JOB_FEED" and job_data.get("feed_id") is None:
        errors.append("Conditional error: 'feed_id' is required when 'job_source' is 'JOB_FEED'.")
    if job_data.get("job_source") == "COMPANY_WEBSITE" and job_data.get("feed_id") is not None:
        errors.append("Conditional error: 'feed_id' must be null when 'job_source' is 'COMPANY_WEBSITE'.")

    return not errors, errors


# --- Main execution block to demonstrate the full process ---
if __name__ == "__main__":
    # Example of a raw job data object from a feed using varied field names
    raw_job_from_feed = {
        "requisition_id": "feed-xyz-123", # Mapped to external_job_id
        "position_title": "Lead DevOps Engineer", # Mapped to title
        "job_details": "Manage our cloud infrastructure on AWS.", # Mapped to description
        "hiring_organization": "CloudScale Inc.", # Mapped to company_name
        "publication_date": "2023-10-28T12:00:00Z", # Mapped to posted_at
        "apply_url": "https://cloudscale.jobs/apply/xyz-123", # Mapped to application_url
        "employmentType": "FULL_TIME", # Mapped to employment_type
        "isRemote": True, # Mapped to is_remote
        "address": [{"city": "Remote"}], # Mapped to 'locations'
        "feed_source_name": "Generic Job Feed", # This extra field will be ignored
        "cpc": "0.50" # This extra field will also be ignored
    }

    print("--- 1. RAW JOB DATA FROM FEED ---")
    print(json.dumps(raw_job_from_feed, indent=2))

    # 1. Transform the raw data using the mapping
    transformed_job = transform_job_data(raw_job_from_feed, FEED_SCHEMA_MAPPING)

    print("\n--- 2. TRANSFORMED JOB DATA (ready for validation) ---")
    print(json.dumps(transformed_job, indent=2))

    # 2. Add required fields that are not in the feed (e.g., metadata)
    # This data would come from the context of the running job.
    transformed_job['job_source'] = 'JOB_FEED'
    transformed_job['feed_id'] = 101
    transformed_job['status'] = 'ACTIVE'
    transformed_job['created_at'] = datetime.utcnow().isoformat() + 'Z'
    transformed_job['updated_at'] = datetime.utcnow().isoformat() + 'Z'
    # These boolean fields might need to be inferred or set to defaults
    transformed_job['is_multi_location'] = False
    transformed_job['is_international'] = False

    print("\n--- 3. FINAL JOB OBJECT (after adding metadata) ---")
    print(json.dumps(transformed_job, indent=2))

    # 3. Validate the final, transformed data object
    is_valid, validation_errors = check_schema(transformed_job)

    print("\n--- 4. VALIDATION RESULT ---")
    if is_valid:
        print("Result: VALID")
        print("This job object is clean, validated, and ready to be processed further.")
    else:
        print(f"Result: INVALID. Errors:\n{json.dumps(validation_errors, indent=2)}")



--- 1. RAW JOB DATA FROM FEED ---
{
  "requisition_id": "feed-xyz-123",
  "position_title": "Lead DevOps Engineer",
  "job_details": "Manage our cloud infrastructure on AWS.",
  "hiring_organization": "CloudScale Inc.",
  "publication_date": "2023-10-28T12:00:00Z",
  "apply_url": "https://cloudscale.jobs/apply/xyz-123",
  "employmentType": "FULL_TIME",
  "isRemote": true,
  "address": [
    {
      "city": "Remote"
    }
  ],
  "feed_source_name": "Generic Job Feed",
  "cpc": "0.50"
}

--- 2. TRANSFORMED JOB DATA (ready for validation) ---
{
  "external_job_id": "feed-xyz-123",
  "title": "Lead DevOps Engineer",
  "description": "Manage our cloud infrastructure on AWS.",
  "company_name": "CloudScale Inc.",
  "posted_at": "2023-10-28T12:00:00Z",
  "application_url": "https://cloudscale.jobs/apply/xyz-123",
  "employment_type": "FULL_TIME",
  "is_remote": true,
  "locations": [
    {
      "city": "Remote"
    }
  ]
}

--- 3. FINAL JOB OBJECT (after adding metadata) ---
{
  "external_jo

**EXTRACTION:** LOAD THE VALIDATED JOB INTO SUPABASE.

In [None]:
import json
from datetime import datetime
import os

# Assume you have supabase-py installed: pip install supabase
# from supabase import create_client, Client

# --- Mock Supabase Client for Demonstration ---
# In a real application, you would initialize a real Supabase client like this:
# SUPABASE_URL = os.environ.get("SUPABASE_URL")
# SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
# supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

class MockSupabaseClient:
    """Logging-only stand-in for a Supabase client.

    `table()` hands back the client itself so calls chain as
    `client.table(name).insert(row)`. `insert()` echoes the row as indented
    JSON and returns an object whose `.data` is a non-empty list, which
    callers treat as a successful insert.
    """

    def table(self, table_name):
        print(f"[Supabase] Accessing table: '{table_name}'")
        return self  # chaining: table(...) -> insert(...)

    def insert(self, data):
        print("[Supabase] Inserting data...")
        rendered = json.dumps(data, indent=2)  # exact payload a real insert would send
        print(rendered)

        class MockResponse:
            data = [{"status": "inserted"}]

        success = MockResponse()
        return success

# Initialize the mock client
# Module-level client that extract_and_load_job() writes through. To target a
# real project, replace it with a supabase-py client (see the commented-out
# create_client() lines above).
supabase = MockSupabaseClient()

# --- Schema Definition and Validation (from SCHEMA_CHECK.py) ---

# Per-column validation rules read by check_schema():
#   "type"           - a type (or tuple of types) checked with isinstance(), or
#                      the string "datetime", meaning an ISO 8601 string.
#   "required"       - the key must be present in the job dict.
#   "nullable"       - None is accepted as the value.
#   "allowed_values" - the value must be one of the listed options.
# extract_and_load_job() also iterates these keys (in this order) to choose
# which fields are sent to the database; any other keys are dropped.
TARGET_SCHEMA = {
    "external_job_id": {"type": str, "required": True},
    "job_source": {"type": str, "required": True, "allowed_values": ["COMPANY_WEBSITE", "JOB_FEED"]},
    # Optional here, but check_schema() requires it when job_source is "JOB_FEED"
    # and forbids it when job_source is "COMPANY_WEBSITE".
    "feed_id": {"type": int, "required": False, "nullable": True},
    "created_at": {"type": "datetime", "required": True},
    "updated_at": {"type": "datetime", "required": True},
    "posted_at": {"type": "datetime", "required": True},
    "expires_at": {"type": "datetime", "required": False, "nullable": True},
    "status": {"type": str, "required": True},
    "company_name": {"type": str, "required": True},
    "title": {"type": str, "required": True},
    "description": {"type": str, "required": True},
    "application_url": {"type": str, "required": False, "nullable": True},
    "employment_type": {"type": str, "required": False, "nullable": True},
    "is_remote": {"type": bool, "required": True},
    "is_multi_location": {"type": bool, "required": True},
    "is_international": {"type": bool, "required": True},
    "locations": {"type": list, "required": False, "nullable": True},
    "salary_min": {"type": (int, float), "required": False, "nullable": True},
    "salary_max": {"type": (int, float), "required": False, "nullable": True},
    "salary_period": {"type": str, "required": False, "nullable": True},
    "currency": {"type": str, "required": False, "nullable": True},
}

def validate_datetime_string(dt_string):
    """Return True if *dt_string* is a string in ISO 8601 format.

    A "Z" UTC designator is accepted by rewriting it to "+00:00", because
    datetime.fromisoformat() only understands "Z" natively on Python 3.11+.
    Non-string inputs (None, numbers, datetime objects, ...) return False
    instead of raising.
    """
    if not isinstance(dt_string, str):
        # str.replace() would raise AttributeError here, which the original
        # (ValueError, TypeError) handler did not catch.
        return False
    try:
        datetime.fromisoformat(dt_string.replace('Z', '+00:00'))
    except ValueError:
        return False
    return True

def check_schema(job_data):
    """Validate a job data dictionary against TARGET_SCHEMA.

    Validation runs in two passes:
      1. Every field marked "required" must be present. If any are missing,
         only those errors are returned and no further checks run.
      2. Each present field listed in TARGET_SCHEMA is checked for
         nullability, ISO 8601 format ("datetime" fields), Python type, and
         allowed values. Keys outside the schema are not validated.
    Finally, the job_source/feed_id pairing rule is enforced.

    Args:
        job_data (dict): The job record to validate.

    Returns:
        tuple[bool, list[str]]: (is_valid, list of error messages).
    """
    errors = [
        f"Missing required field: '{field}'"
        for field, rules in TARGET_SCHEMA.items()
        if rules.get("required") and field not in job_data
    ]
    if errors:
        return False, errors

    for field, value in job_data.items():
        rules = TARGET_SCHEMA.get(field)
        if rules is None:
            continue  # keys outside the schema are ignored here
        expected_type = rules["type"]

        if value is None:
            if not rules.get("nullable"):
                errors.append(f"Field '{field}' cannot be null.")
            continue

        if expected_type == "datetime":
            if not isinstance(value, str) or not validate_datetime_string(value):
                errors.append(f"Field '{field}' is not a valid ISO datetime string. Got: {value}")
            continue

        # bool is a subclass of int, so isinstance(True, int) is True. Without
        # this check, feed_id=True or salary_min=False would pass as numbers.
        allowed_types = expected_type if isinstance(expected_type, tuple) else (expected_type,)
        bool_not_allowed = isinstance(value, bool) and bool not in allowed_types
        if bool_not_allowed or not isinstance(value, expected_type):
            errors.append(f"Field '{field}' has incorrect type. Expected {expected_type}, got {type(value)}.")
        if "allowed_values" in rules and value not in rules["allowed_values"]:
            errors.append(f"Field '{field}' has value '{value}', but only {rules['allowed_values']} are allowed.")

    if job_data.get("job_source") == "JOB_FEED" and job_data.get("feed_id") is None:
        errors.append("Conditional error: 'feed_id' is required when 'job_source' is 'JOB_FEED'.")
    if job_data.get("job_source") == "COMPANY_WEBSITE" and job_data.get("feed_id") is not None:
        errors.append("Conditional error: 'feed_id' must be null when 'job_source' is 'COMPANY_WEBSITE'.")

    return not errors, errors

# --- Data Extraction and Insertion Logic ---

def extract_and_load_job(job_data, table_name="open_jobs"):
    """
    Validate a job, keep only the schema fields, and insert it into a Supabase table.

    Args:
        job_data (dict): The raw job data from a feed or other source.
        table_name (str): The name of the Supabase table to insert into.

    Returns:
        bool: True if the insert returned data. False if validation failed,
        the insert returned no data, or the client raised an exception.
    """
    print(f"--- Processing job with external_id: {job_data.get('external_job_id', 'N/A')} ---")

    # 1. Validate the data against the schema
    is_valid, errors = check_schema(job_data)
    if not is_valid:
        print(f"Validation FAILED. Errors: {errors}")
        print("Job will not be loaded.\n")
        return False

    print("Validation PASSED.")

    # 2. Keep only the fields defined in the schema (feed-specific extras are dropped)
    extracted_data = {key: job_data[key] for key in TARGET_SCHEMA if key in job_data}

    print("Extracted data matching schema:")
    print(json.dumps(extracted_data, indent=2))

    # 3. Load the clean data into the Supabase table
    try:
        query = supabase.table(table_name).insert(extracted_data)
        # supabase-py query builders only send the request when .execute() is
        # called. The mock client returns its response object directly, so
        # both shapes are supported.
        execute = getattr(query, "execute", None)
        response = execute() if callable(execute) else query
        if response.data:
            print(f"Successfully loaded job into '{table_name}'.\n")
            return True
        print("Failed to load job into Supabase.\n")
        return False
    except Exception as e:  # broad on purpose: report any client/network error and move on
        print(f"An error occurred while loading data to Supabase: {e}\n")
        return False


# --- Main execution block to demonstrate the function ---
if __name__ == "__main__":
    # A well-formed feed job. The last two keys are not part of TARGET_SCHEMA,
    # so extraction drops them before the insert.
    valid_job_from_feed = {
        "external_job_id": "gh_1a2b3c4d",
        "job_source": "JOB_FEED",
        "feed_id": 3,
        "created_at": "2023-10-27T10:00:00Z",
        "updated_at": "2023-10-27T11:30:00Z",
        "posted_at": "2023-10-26T09:00:00Z",
        "expires_at": "2023-11-25T23:59:59Z",
        "status": "ACTIVE",
        "company_name": "Innovatech Solutions Inc.",
        "title": "Senior Software Engineer (Backend), Platform Team",
        "description": "<div><strong>About us:</strong>...</div>",
        "application_url": "https://jobs.innovatech.com/apply/1a2b3c4d",
        "employment_type": "FULL_TIME",
        "is_remote": False,
        "is_multi_location": True,
        "is_international": False,
        "locations": [{"location_id": 101, "city": "San Francisco", "state": "CA", "country": "USA"}],
        "salary_min": 150000,
        "salary_max": 180000,
        "salary_period": "YEARLY",
        "currency": "USD",
        "feed_specific_field": "value123",
        "source_priority": "High",
    }

    # job_source is JOB_FEED but feed_id is absent, so check_schema() rejects it.
    invalid_job = {
        "external_job_id": "gh_5e6f7g8h",
        "job_source": "JOB_FEED",
        "created_at": "2023-10-27T10:00:00Z",
        "updated_at": "2023-10-27T11:30:00Z",
        "posted_at": "2023-10-26T09:00:00Z",
        "status": "ACTIVE",
        "company_name": "Data Corp",
        "title": "Data Analyst",
        "description": "<p>Looking for a data analyst.</p>",
        "is_remote": True,
        "is_multi_location": False,
        "is_international": False,
        "locations": [],
    }

    # Push both samples through validate -> extract -> load, valid one first.
    for sample_job in (valid_job_from_feed, invalid_job):
        extract_and_load_job(sample_job)


--- Processing job with external_id: gh_1a2b3c4d ---
Validation PASSED.
Extracted data matching schema:
{
  "external_job_id": "gh_1a2b3c4d",
  "job_source": "JOB_FEED",
  "feed_id": 3,
  "created_at": "2023-10-27T10:00:00Z",
  "updated_at": "2023-10-27T11:30:00Z",
  "posted_at": "2023-10-26T09:00:00Z",
  "expires_at": "2023-11-25T23:59:59Z",
  "status": "ACTIVE",
  "company_name": "Innovatech Solutions Inc.",
  "title": "Senior Software Engineer (Backend), Platform Team",
  "description": "<div><strong>About us:</strong>...</div>",
  "application_url": "https://jobs.innovatech.com/apply/1a2b3c4d",
  "employment_type": "FULL_TIME",
  "is_remote": false,
  "is_multi_location": true,
  "is_international": false,
  "locations": [
    {
      "location_id": 101,
      "city": "San Francisco",
      "state": "CA",
      "country": "USA"
    }
  ],
  "salary_min": 150000,
  "salary_max": 180000,
  "salary_period": "YEARLY",
  "currency": "USD"
}
[Supabase] Accessing table: 'open_jobs'
[Supab

**BASE64:** GENERATE A BASE64-ENCODED HASH FOR EACH JOB.

In [None]:
import json
import base64
import hashlib

def get_canonical_job_hash(job_data):
    """
    Return a stable Base64 SHA-256 fingerprint of a job's core content.

    Only company_name, title, description and employment_type contribute.
    Postings that differ only in external_job_id, locations, is_remote,
    application_url or any other field therefore hash identically and are
    treated as duplicates. Missing core fields count as None.

    Args:
        job_data (dict): The job data dictionary.

    Returns:
        str: Base64 text of the 32-byte SHA-256 digest.
    """
    core_fields = ('company_name', 'title', 'description', 'employment_type')
    core = dict(zip(core_fields, map(job_data.get, core_fields)))

    # Sorted keys and compact separators give one canonical serialization, so
    # equal content always yields the same bytes regardless of insertion order.
    canonical_bytes = json.dumps(core, sort_keys=True, separators=(',', ':')).encode('utf-8')
    digest = hashlib.sha256(canonical_bytes).digest()
    return base64.b64encode(digest).decode('utf-8')

# --- Main execution block to demonstrate the function ---
if __name__ == "__main__":
    # Reference posting. Its id, locations and URL are outside the hashed fields.
    job_a = {
        "external_job_id": "gh_1a2b3c4d",
        "company_name": "Innovatech Solutions Inc.",
        "title": "Senior Software Engineer (Backend)",
        "description": "Join our platform team to build scalable microservices.",
        "employment_type": "FULL_TIME",
        "locations": [{"city": "San Francisco", "state": "CA"}],
        "application_url": "https://example.com/apply?source=1",
    }

    # Same core content with a different id, locations and URL: hash must match job_a.
    job_b_same_core_content = {
        "external_job_id": "indeed_xyz987",
        "company_name": "Innovatech Solutions Inc.",
        "title": "Senior Software Engineer (Backend)",
        "description": "Join our platform team to build scalable microservices.",
        "employment_type": "FULL_TIME",
        "locations": [{"city": "New York", "state": "NY"}, {"city": "Remote"}],
        "application_url": "https://example.com/apply?source=2",
    }

    # A genuinely different posting: hash must differ.
    job_c_different_job = {
        "external_job_id": "gh_5e6f7g8h",
        "company_name": "Data Corp",
        "title": "Data Analyst",
        "description": "<p>Looking for a data analyst.</p>",
        "employment_type": "FULL_TIME",
        "locations": [],
    }

    hash_a, hash_b, hash_c = (
        get_canonical_job_hash(sample)
        for sample in (job_a, job_b_same_core_content, job_c_different_job)
    )
    for label, digest in zip("ABC", (hash_a, hash_b, hash_c)):
        print(f"Job {label} Hash: {digest}")

    print("\n--- Verification ---")
    print(f"Hash A and B are identical (as expected): {hash_a == hash_b}")
    print(f"Hash A and C are different (as expected): {hash_a != hash_c}")


Job A Hash: Wc8f1Ukh658r79462TQtqWhRD/C281rAjO8S8+QdT08=
Job B Hash: Wc8f1Ukh658r79462TQtqWhRD/C281rAjO8S8+QdT08=
Job C Hash: 30rEvGn5LdLnRNxy9Y46acaBNzphcdDOpaNex05q5kM=

--- Verification ---
Hash A and B are identical (as expected): True
Hash A and C are different (as expected): True


In [None]:
# This cell runs the script defined in the previous cell (cell_id: n7mqLs7a82Zz)
# and passes a sample URL as an argument.
%run /content/drive/MyDrive/Colab Notebooks/INTAKE.py https://vendors.pandologic.com/CollabWORK_A/CollabWORK_A2.xml

Exception: File `'/content/drive/MyDrive/Colab.py'` not found.

**CHECK:** IS JOB CLOSED?

In [None]:
import json
import base64
import hashlib

# --- Hashing function updated with learnings from BASE64_JOB.py ---
def get_canonical_job_hash(job_data):
    """
    Hash a job's location-independent content into a Base64 SHA-256 string.

    Only company_name, title, description and employment_type are used
    (missing ones become None). Postings re-listed with new ids or locations
    therefore map to the same value. Keep this in sync with the other copies
    of this function: stored job_hash values are compared against it.
    """
    fields = sorted(('company_name', 'title', 'description', 'employment_type'))
    core = {field: job_data.get(field) for field in fields}
    text = json.dumps(core, separators=(',', ':'), sort_keys=True)
    return base64.b64encode(hashlib.sha256(text.encode('utf-8')).digest()).decode('utf-8')

# --- Mock Supabase Client for Demonstration ---
class MockSupabaseClient:
    """In-memory stand-in for the Supabase query builder used by check_and_close_jobs().

    Rows are held in the list passed to the constructor and mutated in place.
    Supported call chains:
        client.table(name).select("col_a,col_b")                -> response with .data
        client.table(name).update(values).in_(col, vals).execute()
    """

    def __init__(self, initial_data):
        self._data = initial_data
        # Values recorded by update() and applied by the next in_() call.
        self._pending_update = None

    def table(self, table_name):
        print(f"[Supabase] Accessing table: '{table_name}'")
        return self

    def select(self, columns, filter=None):  # `filter` is unused; kept for call compatibility
        print(f"[Supabase] Selecting '{columns}' from active jobs...")
        # In a real app, this would be: .select(columns).eq("status", "ACTIVE")
        # Strip whitespace so "id, job_hash" selects "job_hash", not " job_hash".
        wanted = [col.strip() for col in columns.split(',') if col.strip()]
        active_jobs = [item for item in self._data if item.get('status') == 'ACTIVE']

        class MockResponse:
            def __init__(self, data):
                self.data = data

        return MockResponse([{col: item.get(col) for col in wanted} for item in active_jobs])

    def update(self, new_values):
        print(f"[Supabase] Preparing to update records with: {new_values}")
        # Remember the values so in_() applies what was actually requested.
        self._pending_update = dict(new_values)
        return self

    def in_(self, column, values):
        # Materialize once: `values` may be any iterable, including a generator.
        value_list = list(values)
        print(f"[Supabase] Filtering where '{column}' is in {value_list}")
        # Without a preceding update(), fall back to the previous behavior of closing rows.
        new_values = self._pending_update if self._pending_update is not None else {'status': 'CLOSED'}
        updated_count = 0
        for item in self._data:
            if item.get(column) in value_list:
                item.update(new_values)
                updated_count += 1
        self._pending_update = None
        print(f"[Supabase] Mock updated {updated_count} records.")
        return self

    def execute(self):
        print("[Supabase] Execute called. Mock transaction complete.")

        class MockResponse:
            data = [{"status": "updated"}]

        return MockResponse()

# --- Main Logic ---
def check_and_close_jobs(new_feed_jobs, db_client):
    """
    Mark CLOSED every ACTIVE database job whose hash is absent from the new feed.

    Jobs are compared by get_canonical_job_hash(), so a posting re-listed with
    a different id or location still counts as present.

    Args:
        new_feed_jobs (list): A list of job dictionaries from the new feed.
        db_client: A Supabase client or a compatible mock.

    Returns:
        set: The job hashes that were marked CLOSED (empty if none).
    """
    print("--- Starting Job Closure Check ---")

    # 1. Hash every job in the new feed.
    hashes_in_new_feed = {get_canonical_job_hash(job) for job in new_feed_jobs}
    print(f"Found {len(hashes_in_new_feed)} unique job hashes in the new feed.")

    # 2. Fetch the hashes of ACTIVE jobs in 'open_jobs'.
    query = db_client.table("open_jobs").select("id,job_hash")
    # A real supabase-py builder needs an explicit status filter and .execute().
    # The mock filters to ACTIVE itself and returns its response directly.
    if hasattr(query, "eq"):
        query = query.eq("status", "ACTIVE")
    execute = getattr(query, "execute", None)
    response = execute() if callable(execute) else query

    db_hash_set = {job['job_hash'] for job in response.data}
    print(f"Found {len(db_hash_set)} active job hashes in the database.")

    # 3. Hashes in the DB but missing from the feed are no longer advertised.
    hashes_to_close = db_hash_set - hashes_in_new_feed
    if not hashes_to_close:
        print("No jobs to close. All active jobs in DB are present in the new feed.")
        return set()

    print(f"Found {len(hashes_to_close)} jobs to mark as CLOSED.")
    print("Hashes to close:", hashes_to_close)

    # 4. Close them in a single update.
    db_client.table("open_jobs").update({"status": "CLOSED"}).in_("job_hash", list(hashes_to_close)).execute()

    print("--- Job Closure Check Complete ---")
    return hashes_to_close


if __name__ == "__main__":
    # Core content shared by the two engineer postings. Only id and location differ.
    engineer_core = {
        "company_name": "Innovatech", "title": "Software Engineer",
        "description": "Build cool stuff.", "employment_type": "FULL_TIME",
    }
    job_engineer_sf = {**engineer_core, "external_job_id": "111", "locations": [{"city": "San Francisco"}]}
    job_engineer_ny = {**engineer_core, "external_job_id": "222", "locations": [{"city": "New York"}]}

    # Absent from the new feed below, so it should end up CLOSED.
    job_analyst = {
        "company_name": "Data Corp", "title": "Data Analyst",
        "description": "Analyze data.", "employment_type": "FULL_TIME",
        "external_job_id": "333",
    }

    hash_engineer, hash_analyst = (get_canonical_job_hash(j) for j in (job_engineer_sf, job_analyst))
    print(f"Engineer Hash: {hash_engineer}")
    print(f"Analyst Hash:  {hash_analyst}")
    print("-" * 20)

    # Current 'open_jobs' rows: both postings active, ids 1 and 2.
    mock_db_data = [
        {"id": row_id, "job_hash": stored_hash, "status": "ACTIVE"}
        for row_id, stored_hash in enumerate((hash_engineer, hash_analyst), start=1)
    ]

    # New feed: only the engineer job (re-listed in New York); the analyst job vanished.
    new_feed = [job_engineer_ny]
    mock_supabase_client = MockSupabaseClient(mock_db_data)
    check_and_close_jobs(new_feed, mock_supabase_client)



**CHECK:** IS JOB A DUPLICATE?

In [None]:
import json
import base64
import hashlib

# --- Hashing function updated with new learnings ---
def get_canonical_job_hash(job_data):
    """
    Fingerprint a job by its core content as Base64-encoded SHA-256.

    The fingerprint covers company_name, title, description and
    employment_type (None when absent). Ids and locations are ignored, so
    cross-feed re-postings collide on purpose. It must stay identical to the
    other copies in this notebook because stored job_hash values come from it.
    """
    wanted = ['company_name', 'title', 'description', 'employment_type']
    snapshot = {}
    snapshot.update((key, job_data.get(key)) for key in wanted)
    serialized = json.dumps(snapshot, sort_keys=True, separators=(',', ':'))
    digest = hashlib.sha256(bytes(serialized, 'utf-8')).digest()
    return str(base64.b64encode(digest), 'utf-8')

# --- Mock Supabase Client for Demonstration ---
class MockSupabaseClient:
    """In-memory stand-in for the 'open_jobs' table used by process_and_insert_jobs().

    Keeps the stored rows plus a set of their `job_hash` values for O(1)
    duplicate checks.
    """

    def __init__(self, initial_data):
        self._data = initial_data
        # Rows without a job_hash cannot take part in duplicate checks; skip
        # them instead of raising KeyError.
        self._hashes = {item['job_hash'] for item in initial_data if item.get('job_hash') is not None}

    def table(self, table_name):
        print(f"[Supabase] Accessing table: '{table_name}'")
        return self

    def insert(self, data):
        print("[Supabase] Inserting data...")
        print(json.dumps(data, indent=2))
        # Store a copy so later changes to the caller's dict don't rewrite "the DB".
        self._data.append(dict(data))
        if data.get('job_hash') is not None:
            self._hashes.add(data['job_hash'])

        class MockResponse:
            data = [{"status": "inserted"}]

        return MockResponse()

    def get_existing_hashes(self):
        print("[Supabase] Fetching all existing job hashes...")
        # Return a snapshot: callers add to the result, which must not
        # silently change the mock's own record of stored hashes.
        return set(self._hashes)

# --- Main Logic ---
def process_and_insert_jobs(new_jobs, db_client):
    """
    Insert only the jobs whose canonical hash is not already known.

    Duplicates are detected both against the database and within `new_jobs`
    itself (the first occurrence wins).

    Args:
        new_jobs (list): A list of new job dictionaries to process.
        db_client: A Supabase client or compatible mock exposing
            get_existing_hashes() and table(...).insert(...).

    Returns:
        int: Number of jobs inserted.
    """
    print("--- Starting Duplicate Job Check and Insertion ---")

    # 1. Fetch all existing hashes once, into a private set. The client may
    # return its own internal set (the mock does) or a non-set iterable, and
    # this function adds to the set below.
    existing_hashes = set(db_client.get_existing_hashes())
    print(f"Found {len(existing_hashes)} existing job hashes in the database.")

    # 2. Hash each incoming job and insert it only if the hash is new.
    new_jobs_inserted = 0
    for job in new_jobs:
        job_hash = get_canonical_job_hash(job)
        print(f"\nProcessing job '{job.get('title')}' | Hash: {job_hash[:10]}...")

        if job_hash in existing_hashes:
            print("Result: DUPLICATE. Job already exists in the database. Skipping.")
            continue

        print("Result: UNIQUE. Inserting new job.")
        job_to_insert = {**job, 'job_hash': job_hash}

        query = db_client.table("open_jobs").insert(job_to_insert)
        # supabase-py builders only send the insert on .execute(); the mock
        # performs it immediately and has no execute().
        execute = getattr(query, "execute", None)
        if callable(execute):
            execute()

        # Remember the hash so a repeat later in this same feed is skipped.
        existing_hashes.add(job_hash)
        new_jobs_inserted += 1

    print(f"\n--- Process Complete. Inserted {new_jobs_inserted} new jobs. ---")
    return new_jobs_inserted

if __name__ == "__main__":
    # Core content shared by the stored engineer posting and its re-posting.
    engineer_core = {
        "company_name": "Innovatech", "title": "Software Engineer",
        "description": "Build cool stuff.", "employment_type": "FULL_TIME",
    }
    # Already stored in the database.
    existing_engineer_job = {**engineer_core, "external_job_id": "111", "locations": [{"city": "San Francisco"}]}
    # Same core content from another feed (new id/location), so it must be skipped.
    duplicate_engineer_job_from_feed = {**engineer_core, "external_job_id": "abc-987", "locations": [{"city": "New York"}]}
    # Genuinely new, so it must be inserted.
    new_analyst_job = {
        "company_name": "Data Corp", "title": "Data Analyst",
        "description": "Analyze data.", "employment_type": "FULL_TIME",
        "external_job_id": "333",
    }

    # Seed the mock table with the existing posting and its hash.
    existing_hash = get_canonical_job_hash(existing_engineer_job)
    mock_db_data = [dict(id=1, job_hash=existing_hash, status="ACTIVE", **existing_engineer_job)]
    mock_supabase_client = MockSupabaseClient(mock_db_data)

    new_feed = [duplicate_engineer_job_from_feed, new_analyst_job]
    process_and_insert_jobs(new_feed, mock_supabase_client)


**AI ENRICHMENT:** ENHANCE JOB DATA SCHEMA WITH AI_ FIELDS

In [None]:
import json
import time
import asyncio

# --- Mock Gemini API Client ---
# This mock simulates the behavior of calling the Gemini API.
# In a real application, you would replace this with actual calls
# using a library like `google-generativeai`.

class MockGeminiClient:
    """Offline stand-in for a Gemini model's `generate_content_async`.

    The canned reply is chosen by matching substrings in the prompt:
      * "sector, industry_group, and industry" -> industry classification JSON
      * "ai_title, ai_description"             -> AI attribute JSON
      * anything else                          -> "{}"
    Each reply is an object exposing only a `.text` attribute.
    """

    def __init__(self, latency=1.0):
        # Simulated network delay in seconds. The default matches the
        # previous hard-coded 1 s; pass 0 for instant tests or demos.
        self.latency = latency

    async def generate_content_async(self, prompt):
        print("\n--- Calling Mock Gemini API ---")
        print(f"Prompt: {prompt[:100]}...")
        # Simulate network latency
        await asyncio.sleep(self.latency)

        # Simulate response for industry classification
        if "sector, industry_group, and industry" in prompt:
            mock_response_text = json.dumps({
                "sector": "Technology",
                "industry_group": "Software & IT Services",
                "industry": "Software",
                "industry_id": 501
            })
            print("Response Type: Industry Classification")
            return type('obj', (object,), {'text': mock_response_text})

        # Simulate response for AI attribute generation
        if "ai_title, ai_description" in prompt:
            mock_response_text = json.dumps({
                "ai_title": "Senior Backend Software Engineer",
                "ai_description": "Join a dynamic platform team to design, develop, and maintain scalable microservices using Go and AWS. Focus on building robust backend systems and improving CI/CD pipelines.",
                "ai_job_tasks": [
                    "Design and develop backend microservices in Go.",
                    "Maintain and improve CI/CD pipelines on AWS.",
                    "Collaborate with cross-functional teams.",
                    "Mentor junior engineers and conduct code reviews."
                ],
                "ai_search_terms": ["golang", "go", "aws", "kubernetes", "docker", "backend engineer", "microservices"],
                "ai_top_tags": ["Go", "AWS", "Backend"],
                "ai_job_function_id": 105,
                "ai_skills": ["Go (Golang)", "Amazon Web Services (AWS)", "Docker", "Kubernetes", "Microservices Architecture", "CI/CD", "Problem Solving"],
                "ai_confidence_score": 0.95
            })
            print("Response Type: AI Attribute Generation")
            return type('obj', (object,), {'text': mock_response_text})

        # Unrecognized prompt: empty JSON object.
        return type('obj', (object,), {'text': "{}"})


# --- Gemini Service Class ---

class GeminiService:
    """
    Enriches job dicts in place using a Gemini-style client.

    Industry classification is batched (one prompt for all queued jobs) and
    cached by (title, description). The ai_* attributes are generated with one
    call per job. The client must provide an async
    `generate_content_async(prompt)` whose result has a `.text` attribute
    containing JSON.
    """

    def __init__(self, gemini_client):
        """
        Initializes the GeminiService with a Gemini API client.

        Args:
            gemini_client: An instance of a Gemini API client (e.g., MockGeminiClient or google.generativeai.GenerativeModel).
        """
        self._client = gemini_client
        self._industry_cache = {}  # (title, description) -> classification dict
        self._industry_batch = []  # jobs queued for process_industry_batch()
        # NOTE(review): not read anywhere in this class yet; callers decide
        # when to call process_industry_batch().
        self._batch_limit = 5

    def add_job_for_industry_classification(self, job):
        """
        Queue a job for batched industry classification, or enrich it from the cache.

        On a cache hit, the cached classification fields are merged into `job`
        immediately and the classification dict is returned. Otherwise the job
        is queued and None is returned.
        """
        job_key = (job.get('title'), job.get('description'))
        if job_key in self._industry_cache:
            print(f"Industry cache hit for job title: {job.get('title')}")
            cached = self._industry_cache[job_key]
            # A cached job is never queued, so nothing else would add the
            # industry fields to it. Apply them here.
            job.update(cached)
            return cached

        self._industry_batch.append(job)
        print(f"Added job '{job.get('title')}' to industry batch. Current batch size: {len(self._industry_batch)}")
        return None

    async def process_industry_batch(self):
        """Classify all queued jobs with a single prompt, enrich them in place, and cache the results."""
        if not self._industry_batch:
            print("Industry batch is empty. Nothing to process.")
            return

        print(f"\n--- Processing an industry batch of {len(self._industry_batch)} jobs ---")

        # Create a single prompt for the entire batch
        prompt_lines = ["Analyze the following job listings (title and description) and return a JSON object for each with its sector, industry_group, and industry id.", "---"]
        for i, job in enumerate(self._industry_batch):
            prompt_lines.append(f"Job {i+1}:")
            prompt_lines.append(f"Title: {job.get('title')}")
            prompt_lines.append(f"Description: {job.get('description')}")
            prompt_lines.append("---")

        prompt = "\n".join(prompt_lines)

        # A real response would contain one classification per job. Here, the
        # single mock classification is applied to every batch item.
        # If the call or JSON parsing raises, the batch is left intact.
        response = await self._client.generate_content_async(prompt)
        classification = json.loads(response.text)

        for job in self._industry_batch:
            # Compute the cache key before update(): a classification carrying a
            # 'title' or 'description' key would otherwise change the key.
            job_key = (job.get('title'), job.get('description'))
            job.update(classification)
            self._industry_cache[job_key] = classification
            print(f"Enriched job '{job.get('title')}' with industry info.")

        # Clear the batch after processing
        self._industry_batch.clear()
        print("Industry batch processing complete.")

    async def generate_ai_attributes(self, job_data):
        """
        Generates AI-powered attributes for a single job posting.

        Args:
            job_data (dict): The job dictionary, ideally already enriched with industry info.

        Returns:
            dict: The same job dictionary, updated in place with the fields
            parsed from the model's JSON reply.
        """
        print(f"\n--- Generating AI attributes for job: {job_data.get('title')} ---")

        # Create a detailed prompt for Gemini
        prompt = f"""
        Based on the following job data, generate a structured JSON object containing these fields:
        ai_title, ai_description, ai_job_tasks, ai_search_terms, ai_top_tags, ai_job_function_id, ai_skills, ai_confidence_score.
        The confidence score should be a float between 0.0 and 1.0.

        Job Data:
        - Title: {job_data.get('title')}
        - Company: {job_data.get('company_name')}
        - Description: {job_data.get('description')}
        - Industry: {job_data.get('industry', 'N/A')}

        Generate the response in a clean JSON format.
        """

        response = await self._client.generate_content_async(prompt)
        ai_data = json.loads(response.text)  # raises json.JSONDecodeError on a non-JSON reply

        job_data.update(ai_data)
        print("Successfully enriched job with AI attributes.")
        return job_data


# --- Main Execution Block ---

async def main():
    """Demo: batch-classify industries for two jobs, then add AI attributes to each."""
    new_jobs = [
        {
            "external_job_id": "gh_1a2b3c4d",
            "company_name": "Innovatech Solutions Inc.",
            "title": "Senior Software Engineer (Backend)",
            "description": "Join our platform team to build scalable microservices using Go and AWS."
        },
        {  # a second job so the industry batch holds more than one entry
            "external_job_id": "ln_5f6g7h8i",
            "company_name": "HealthData Corp",
            "title": "Clinical Data Analyst",
            "description": "Analyze clinical trial data using Python and SQL."
        }
    ]

    gemini_service = GeminiService(MockGeminiClient())

    # 1. Queue every job, then classify them all with one batched call.
    for job in new_jobs:
        gemini_service.add_job_for_industry_classification(job)
    await gemini_service.process_industry_batch()

    # 2. One AI-attribute call per job, awaited sequentially in list order.
    enriched_jobs = [await gemini_service.generate_ai_attributes(job) for job in new_jobs]

    # 3. Show the final result.
    print("\n\n--- FINAL ENRICHED JOB DATA ---")
    print(json.dumps(enriched_jobs, indent=2))


if __name__ == "__main__":
    # asyncio.run() refuses to start while an event loop is already running,
    # which is the case inside Jupyter/Colab cells ("asyncio.run() cannot be
    # called from a running event loop"). In that case, schedule main() on the
    # existing loop; from a plain `python script.py` run, use asyncio.run().
    # (In a notebook you can also simply write `await main()` in a cell.)
    try:
        _running_loop = asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(main())
    else:
        # Keep a reference so the task is not garbage-collected early. Its
        # output may appear after the cell itself has finished executing.
        _main_task = _running_loop.create_task(main())

**AI CONFIDENCE ASSESSMENT:** MEASURE THE SCORE

In [None]:
import json

# In a real application, you would import these functions from their respective files.
# from AUTO_APPROVE import sync_to_xano
# from MANUAL_REVIEW import send_for_manual_review

# For demonstration, we'll include mock versions of the functions here.
def sync_to_xano(job_data):
    """Stand-in for the real Xano sync that only reports what it would do.

    Args:
        job_data (dict): Enriched job; its ``ai_title`` value (possibly None)
            is echoed in the message.
    """
    # A real implementation would call the Xano API here instead.
    messages = (
        f"-> Action: Syncing job '{job_data.get('ai_title')}' to Xano database.",
        "-> Status: AUTO-APPROVED",
    )
    for message in messages:
        print(message)

def send_for_manual_review(job_data):
    """Stand-in for the real review hand-off that only reports what it would do.

    Args:
        job_data (dict): Enriched job; its ``ai_title`` value (possibly None)
            is echoed in the message.
    """
    title = job_data.get('ai_title')
    # A real implementation might persist the job or notify a reviewer.
    print("\n".join([
        f"-> Action: Sending job '{title}' for manual review.",
        "-> Status: PENDING MANUAL REVIEW",
    ]))


def check_confidence_and_route(job, confidence_threshold=0.86):
    """
    Route an enriched job to auto-approval or manual review by AI confidence.

    Jobs whose ``ai_confidence_score`` is at or above ``confidence_threshold``
    are passed to ``sync_to_xano``. Everything else goes to
    ``send_for_manual_review``: a lower score, a missing score, or a value
    that cannot be read as a number.

    Args:
        job (dict): The enriched job data dictionary.
        confidence_threshold (float): Minimum score (inclusive) for auto-approval.
    """
    job_title = job.get('ai_title', job.get('title', 'N/A'))
    raw_score = job.get('ai_confidence_score')

    print(f"--- Processing Job: '{job_title}' ---")

    if raw_score is None:
        print("Confidence score not found. Defaulting to manual review.")
        send_for_manual_review(job)
        return

    # The score may originate from parsed model JSON and arrive as a string
    # (e.g. "0.9") or junk; comparing that against a float raises TypeError.
    try:
        confidence_score = float(raw_score)
    except (TypeError, ValueError):
        print(f"Confidence score {raw_score!r} is not numeric. Defaulting to manual review.")
        send_for_manual_review(job)
        return

    print(f"Confidence Score: {raw_score} | Threshold: {confidence_threshold}")

    # NaN compares False here, so it also falls through to manual review.
    if confidence_score >= confidence_threshold:
        print("Confidence is above or equal to threshold.")
        sync_to_xano(job)
    else:
        print("Confidence is below threshold.")
        send_for_manual_review(job)

# --- Main execution block to demonstrate the routing logic ---
if __name__ == "__main__":
    # Demo inputs, one per routing outcome under the default 0.86 threshold.
    # Score 0.95 is at/above the threshold -> auto-approval path.
    high_confidence_job = {
        "ai_title": "Senior Backend Software Engineer",
        "ai_description": "Join a dynamic platform team...",
        "ai_skills": ["Go (Golang)", "AWS", "Docker"],
        "ai_confidence_score": 0.95,
    }

    # Score 0.78 is below the threshold -> manual review path.
    low_confidence_job = {
        "ai_title": "Product Marketing Associate (Entry Level)",
        "ai_description": "Seeking a creative individual for marketing tasks.",
        "ai_skills": ["Marketing", "Social Media"],
        "ai_confidence_score": 0.78,
    }

    # No ai_confidence_score key at all -> manual review by default.
    no_score_job = {
        "ai_title": "IT Helpdesk Technician",
        "ai_description": "Provide technical support to internal employees.",
        "ai_skills": ["Active Directory", "Troubleshooting"],
    }

    divider = "\n" + "=" * 40 + "\n"
    print("--- Starting AI Confidence Check Workflow ---\n")
    for index, sample_job in enumerate((high_confidence_job, low_confidence_job, no_score_job)):
        if index > 0:
            print(divider)
        check_confidence_and_route(sample_job)
    print("\n--- Workflow Complete ---")



**AUTO-APPROVAL:** IF AT OR ABOVE THE AI CONFIDENCE THRESHOLD

In [None]:
import json
import os

# --- Mock Xano Client for Demonstration ---
# A production client would issue real HTTP calls (e.g. with 'requests')
# against your Xano instance; this one only prints.
class MockXanoClient:
    """Offline stand-in for a Xano API client.

    Nothing is sent over the network: ``post`` echoes the payload to stdout
    and always returns the same success response.
    """

    def __init__(self, api_url, api_key):
        self.api_url, self.api_key = api_url, api_key
        print(f"[Xano Client] Initialized for endpoint: {self.api_url}")

    def post(self, endpoint, data):
        """Pretend to POST ``data`` to ``endpoint`` and return a canned success dict."""
        for line in (f"[Xano Client] POST to '{endpoint}'", "[Xano Client] Data Payload:"):
            print(line)
        print(json.dumps(data, indent=2))
        # Fixed response: always "success" with a constant record id
        return {"status": "success", "record_id": 12345}

# Connection settings come from the XANO_API_URL / XANO_API_KEY environment
# variables when set (and non-empty); otherwise these demo placeholders are used.
XANO_API_URL = os.environ.get("XANO_API_URL") or "https://xano.example.com/api:your_instance"
XANO_API_KEY = os.environ.get("XANO_API_KEY") or "YOUR_XANO_API_KEY"

# Module-level client instance used by sync_to_xano for its POST call
xano_client = MockXanoClient(api_url=XANO_API_URL, api_key=XANO_API_KEY)

def sync_to_xano(job_data):
    """
    Upload an approved job to the Xano jobs table via ``xano_client``.

    The ``ai_confidence_score`` field is left out of the uploaded payload.
    The caller's dict is not modified.

    Args:
        job_data (dict): The job data to be uploaded.

    Returns:
        bool: True if the client reported ``status == "success"``; False on
        any other response or if the call raised (the error is printed).
    """
    print(f"--- Auto-Approving and Syncing Job: '{job_data.get('ai_title')}' ---")

    # Build a filtered copy instead of deleting from job_data, so the caller
    # keeps its score (for auditing or a retry) even if the sync fails.
    payload = {key: value for key, value in job_data.items() if key != 'ai_confidence_score'}

    # The endpoint for your jobs table in Xano
    jobs_endpoint = "/open_jobs"
    try:
        response = xano_client.post(jobs_endpoint, data=payload)
        succeeded = response.get("status") == "success"
    except Exception as e:  # boundary: report and carry on rather than crash the pipeline
        print(f"An unexpected error occurred during Xano sync: {e}")
        return False

    if succeeded:
        print(f"Successfully synced job to Xano. New Record ID: {response.get('record_id')}")
        return True

    print("Error: Failed to sync job to Xano.")
    return False

# --- Main execution block to demonstrate standalone usage ---
if __name__ == "__main__":
    # Sample job that already cleared the confidence check; sync_to_xano
    # drops the score field before uploading.
    approved_job = dict(
        ai_title="Senior Backend Software Engineer",
        ai_description="Join a dynamic platform team...",
        ai_skills=["Go (Golang)", "AWS", "Docker"],
        ai_confidence_score=0.95,
    )
    sync_to_xano(approved_job)


**MANUAL REVIEW:** IF BELOW THE AI CONFIDENCE THRESHOLD

In [None]:
import json
import os
from datetime import datetime

# Define the file where jobs pending review will be stored
MANUAL_REVIEW_QUEUE_FILE = "manual_review_queue.json"

def load_review_queue():
    """
    Load the pending-review queue from ``MANUAL_REVIEW_QUEUE_FILE``.

    Returns:
        list: The stored queue. Returns an empty list when the file is missing
        or empty. Also returns an empty list when the file does not contain a
        JSON list; in that case the file is first renamed to
        ``<file>.corrupt``, so the next save cannot silently overwrite the
        previously queued jobs.
    """
    try:
        with open(MANUAL_REVIEW_QUEUE_FILE, 'rb') as f:
            raw = f.read()
    except FileNotFoundError:
        return []

    if not raw.strip():
        return []

    # json.loads accepts bytes; ValueError covers both JSONDecodeError and
    # UnicodeDecodeError for undecodable content.
    try:
        queue = json.loads(raw)
    except ValueError:
        queue = None

    if isinstance(queue, list):
        return queue

    # Returning [] means the caller's next save replaces this file, so keep
    # the unusable contents aside instead of losing them.
    backup_path = MANUAL_REVIEW_QUEUE_FILE + ".corrupt"
    os.replace(MANUAL_REVIEW_QUEUE_FILE, backup_path)
    print(f"Warning: '{MANUAL_REVIEW_QUEUE_FILE}' did not contain a valid queue; moved it to '{backup_path}'.")
    return []

def save_review_queue(queue_data):
    """
    Persist ``queue_data`` to ``MANUAL_REVIEW_QUEUE_FILE`` as indented JSON.

    The JSON is written to a sibling ``.tmp`` file, which then replaces the
    queue file. If serialization or writing raises, the previous queue file
    is left untouched instead of truncated, and the exception propagates.
    """
    tmp_path = MANUAL_REVIEW_QUEUE_FILE + ".tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(queue_data, f, indent=2)
        os.replace(tmp_path, MANUAL_REVIEW_QUEUE_FILE)
    finally:
        # Only still present if something failed before the replace.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

def send_for_manual_review(job_data):
    """
    Append a job that missed the confidence threshold to the manual review queue.

    The job is wrapped with review metadata: status ``"pending"`` and an
    ISO-8601 UTC timestamp ending in ``Z``. The queue is re-read with
    ``load_review_queue`` and written back with ``save_review_queue``.

    Args:
        job_data (dict): The job data to be reviewed.
    """
    # Local import: this cell only imports the `datetime` class itself.
    from datetime import timezone

    # Fall back to the raw 'title' when the AI-generated title is missing.
    title = job_data.get('ai_title', job_data.get('title', 'N/A'))
    print(f"--- Sending Job for Manual Review: '{title}' ---")

    # Load the current queue
    review_queue = load_review_queue()

    # datetime.utcnow() is deprecated (Python 3.12) and returns a naive value.
    # Use an aware UTC time, but keep the same "...Z" string format as before.
    queued_at = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    review_item = {
        "review_status": "pending",
        "added_to_queue_at": queued_at,
        "job_data": job_data
    }

    review_queue.append(review_item)
    save_review_queue(review_queue)

    print(f"Job successfully added to '{MANUAL_REVIEW_QUEUE_FILE}'.")
    print(f"Total jobs in queue: {len(review_queue)}")

# --- Main execution block to demonstrate standalone usage ---
if __name__ == "__main__":
    # Sample job whose 0.78 score fell short of the confidence check.
    job_to_review = dict(
        ai_title="Product Marketing Associate (Entry Level)",
        ai_description="Seeking a creative individual for marketing tasks.",
        ai_skills=["Marketing", "Social Media"],
        ai_confidence_score=0.78,
    )

    send_for_manual_review(job_to_review)

    # Every run appends one more entry to 'manual_review_queue.json' in the
    # current working directory; open that file to inspect the queue.
