In [2]:
#!/usr/bin/env python3
"""
O*NET Military Crosswalk Processing Pipeline

This script replicates the n8n workflow for processing O*NET Military Crosswalk data:
1. Fetches raw crosswalk data from GitHub
2. Parses the CSV format
3. Filters for Air Force and Space Force specialty codes
4. Extracts keywords and creates searchable text
5. Saves the processed data to a local file

Author: Kyle Hall
Date: March 2025
"""

import requests
import csv
import json
import datetime
import re
import os
from typing import List, Dict, Any
import logging
from io import StringIO

# Configure logging
# This runs at import time. Records at INFO level and above are sent to two
# handlers: a log file (relative path, so it is created in the current working
# directory) and a StreamHandler with its default stream.
logging.basicConfig(
    level=logging.INFO,
    # Layout of each record: timestamp - level name - message
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("onet_crosswalk_processor.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function defined in this script.
logger = logging.getLogger(__name__)

def fetch_crosswalk_data(url: str, timeout: float = 30.0) -> str:
    """
    Fetch O*NET Military Crosswalk data from GitHub or other URL

    Args:
        url: URL to the Crosswalk CSV data
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely on a stalled connection.

    Returns:
        CSV data as string (decoded by requests from the response body)

    Raises:
        requests.exceptions.HTTPError: If the server answers with a 4xx/5xx status
        requests.exceptions.RequestException: On connection errors or timeout
    """
    logger.info(f"Fetching O*NET Crosswalk data from {url}")
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    # Report the raw payload size; len(response.text) would count decoded
    # characters, not bytes.
    logger.info(f"Retrieved crosswalk data, size: {len(response.content)} bytes")
    return response.text

def _build_crosswalk_entry(item: Dict[str, str], is_space_force: bool = False) -> Dict[str, Any]:
    """Convert one parsed crosswalk row into the output record shape."""

    def numbered(template: str) -> List[str]:
        # Columns come in numbered groups 1-4 (e.g. ONET1..ONET4); blank or
        # missing values are dropped.
        return [
            item[template.format(i)]
            for i in range(1, 5)
            if item.get(template.format(i))
        ]

    mpc = item.get('MPC')
    if mpc == 'E':
        category = 'Enlisted'
    elif mpc == 'O':
        category = 'Officer'
    else:
        # Any other (or missing) MPC value is labelled 'Warrant'.
        category = 'Warrant'

    entry: Dict[str, Any] = {
        'afscCode': item.get('MOC', ''),
        'afscTitle': item.get('MOC_TITLE', ''),
        'serviceType': item.get('SVC', ''),
        'category': category,
    }
    # The flag is inserted here (not at the end) so the key order of the
    # serialized records stays the same as before.
    if is_space_force:
        entry['isSpaceForce'] = True
    entry.update({
        'onetCodes': numbered('ONET{}'),
        'onetTitles': numbered('ONET{}_TITLE'),
        'socCodes': numbered('SOC{}'),
        'socTitles': numbered('SOC{}_TITLE'),
        'statusDate': item.get('SDATE', ''),
        'endDate': item.get('EDATE', ''),
    })
    return entry


def parse_and_filter_csv(csv_data: str) -> Dict[str, Any]:
    """
    Parse CSV data and filter for Air Force and Space Force entries

    Only rows whose STATUS column equals 'A' are kept. Rows with SVC in
    F/X/Y/Z are treated as Air Force and rows with SVC in H/L/O/U as Space
    Force; the latter additionally get 'isSpaceForce': True.
    NOTE(review): the SVC-code-to-service mapping is taken as-is from the
    existing pipeline -- confirm it against the crosswalk's data dictionary.

    Args:
        csv_data: Raw CSV data as string. The first row is the header. A
            leading UTF-8 byte-order mark is ignored. Empty input yields an
            empty result instead of raising.

    Returns:
        Dictionary with:
            'processedCrosswalk': transformed entries, Air Force first,
                then Space Force
            'metadata': total / per-service entry counts and the processing
                timestamp (local time, ISO format)
    """
    logger.info("Parsing and filtering crosswalk data")

    # A BOM left in the text would become part of the first header name
    # (e.g. '\ufeffSVC') and make every lookup of that column fail silently.
    csv_reader = csv.reader(StringIO(csv_data.lstrip('\ufeff')))
    headers = [h.strip() for h in next(csv_reader, [])]

    # zip() stops at the shorter sequence, so extra cells beyond the header
    # width are ignored and short rows simply lack the trailing columns.
    parsed_data = [
        {header: value.strip() for header, value in zip(headers, row)}
        for row in csv_reader
        if row
    ]

    logger.info(f"Parsed {len(parsed_data)} entries from CSV")

    # Count service types (a plain dict keeps the logged representation stable)
    service_counts = {}
    for item in parsed_data:
        svc = item.get('SVC', 'unknown')
        service_counts[svc] = service_counts.get(svc, 0) + 1

    logger.info(f"Service type counts: {service_counts}")

    air_force_codes = {'F', 'X', 'Y', 'Z'}
    space_force_codes = {'H', 'L', 'O', 'U'}

    active_rows = [item for item in parsed_data if item.get('STATUS') == 'A']
    air_force_data = [item for item in active_rows if item.get('SVC') in air_force_codes]
    space_force_data = [item for item in active_rows if item.get('SVC') in space_force_codes]

    logger.info(f"Found {len(air_force_data)} Air Force and {len(space_force_data)} Space Force entries")

    # Air Force entries first, then Space Force entries
    transformed_data = [_build_crosswalk_entry(item) for item in air_force_data]
    transformed_data.extend(
        _build_crosswalk_entry(item, is_space_force=True) for item in space_force_data
    )

    return {
        'processedCrosswalk': transformed_data,
        'metadata': {
            'totalEntries': len(parsed_data),
            'airForceEntries': len(air_force_data),
            'spaceForceEntries': len(space_force_data),
            'processingDate': datetime.datetime.now().isoformat()
        }
    }

def extract_keywords(data: Dict[str, Any], max_keywords: int = 30) -> Dict[str, Any]:
    """
    Extract keywords and create search text for each AFSC

    For every entry, the AFSC title, O*NET titles and SOC titles are
    lower-cased and split on whitespace. Punctuation at the start or end of a
    token is removed so that "mechanics," and "mechanics" count as the same
    word. Tokens of two characters or fewer and common stop words are
    dropped. The remaining words are ranked by frequency; ties keep the order
    in which the words first appeared.

    Args:
        data: Parsed and filtered crosswalk data; must contain the keys
            'processedCrosswalk' (list of entries) and 'metadata' (dict)
        max_keywords: Maximum number of keywords kept per entry

    Returns:
        Enhanced data with keywords and search text. Each entry is a shallow
        copy of the input entry with 'keywords' and 'searchText' added;
        metadata gains 'keywordsExtracted': True. The input is not modified.
    """
    logger.info("Extracting keywords from AFSC entries")

    # Built once instead of once per entry; a frozenset gives O(1) lookups.
    stop_words = frozenset([
        'and', 'or', 'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'with', 'by', 'of'
    ])
    # Matches punctuation only at the edges of a token, so interior hyphens
    # and apostrophes (e.g. "first-line") are preserved.
    edge_punctuation = re.compile(r'^[\W_]+|[\W_]+$')

    processed_entries = []
    for afsc in data['processedCrosswalk']:
        # Create searchable text: all titles joined, lower-cased, otherwise raw
        search_text = ' '.join([
            afsc.get('afscTitle', ''),
            *afsc.get('onetTitles', []),
            *afsc.get('socTitles', [])
        ]).lower()

        # Count word frequencies (dict insertion order = first appearance)
        word_freq = {}
        for token in search_text.split():
            word = edge_punctuation.sub('', token)
            if len(word) > 2 and word not in stop_words:
                word_freq[word] = word_freq.get(word, 0) + 1

        # sorted() is stable, so equally frequent words stay in first-seen order
        ranked = sorted(word_freq.items(), key=lambda pair: pair[1], reverse=True)
        keywords = [word for word, _ in ranked[:max_keywords]]

        processed_entry = afsc.copy()
        processed_entry['keywords'] = keywords
        processed_entry['searchText'] = search_text
        processed_entries.append(processed_entry)

    logger.info(f"Keyword extraction complete. Processed {len(processed_entries)} entries.")

    return {
        'processedCrosswalk': processed_entries,
        'metadata': {
            **data['metadata'],
            'keywordsExtracted': True
        }
    }

def save_to_file(processed_data: Dict[str, Any], output_dir: str = '.', file_path: str = None) -> str:
    """
    Save processed data to a JSON file

    Args:
        processed_data: The processed data to save (must be JSON-serializable)
        output_dir: Directory for the auto-named file; created if missing
        file_path: Optional specific file path. When given it is used as-is
            and its parent directory is created if missing. When omitted, a
            timestamped file name inside output_dir is generated.

    Returns:
        Path to the saved file
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    if file_path is None:
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        file_path = os.path.join(output_dir, f"processed-afsc-crosswalk-{timestamp}.json")
    else:
        # An explicit path may point outside output_dir; make sure its own
        # directory exists so open() does not fail with FileNotFoundError.
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(processed_data, f, indent=2)

    logger.info(f"Data saved to {file_path}")
    return file_path

def main(
    url: str = "https://raw.githubusercontent.com/Kyleinexile/IS-Repo/refs/heads/main/ONET%20Crosswalk.csv",
    output_dir: str = "processed_data",
) -> str:
    """
    Main entry point for the script

    Runs the full pipeline: fetch the crosswalk CSV, parse and filter it,
    extract keywords, and write the result as JSON.

    Args:
        url: URL to O*NET Crosswalk data
        output_dir: Directory that receives the timestamped JSON output

    Returns:
        Path to the saved JSON file

    Raises:
        Exception: Any error from a pipeline step is logged and re-raised
    """
    try:
        # Fetch data
        csv_data = fetch_crosswalk_data(url)

        # Parse and filter data
        parsed_data = parse_and_filter_csv(csv_data)

        # Extract keywords
        enhanced_data = extract_keywords(parsed_data)

        # Save to file
        saved_path = save_to_file(enhanced_data, output_dir)

        logger.info("Workflow completed successfully")
        return saved_path
    except Exception as e:
        # logger.exception records the traceback in the log file as well;
        # the error is still re-raised so the caller sees the failure.
        logger.exception(f"Error in workflow: {str(e)}")
        raise

if __name__ == "__main__":
    main()

2025-03-27 20:21:09,398 - INFO - Fetching O*NET Crosswalk data from https://raw.githubusercontent.com/Kyleinexile/IS-Repo/refs/heads/main/ONET%20Crosswalk.csv
2025-03-27 20:21:09,539 - INFO - Retrieved crosswalk data, size: 8736712 bytes
2025-03-27 20:21:09,543 - INFO - Parsing and filtering crosswalk data
2025-03-27 20:21:09,680 - INFO - Parsed 40077 entries from CSV
2025-03-27 20:21:09,685 - INFO - Service type counts: {'V': 4419, 'N': 7302, 'J': 809, 'C': 320, 'G': 1150, 'F': 9624, 'H': 4517, 'Y': 3418, 'U': 2377, 'A': 2086, 'X': 70, 'M': 2245, 'O': 20, 'S': 562, 'P': 208, 'D': 467, 'Q': 54, 'Z': 40, 'K': 343, 'L': 46}
2025-03-27 20:21:09,693 - INFO - Found 7271 Air Force and 6408 Space Force entries
2025-03-27 20:21:09,758 - INFO - Extracting keywords from AFSC entries
2025-03-27 20:21:09,827 - INFO - Keyword extraction complete. Processed 13679 entries.
2025-03-27 20:21:10,020 - INFO - Data saved to processed_data\processed-afsc-crosswalk-2025-03-27-20-21-09.json
2025-03-27 20:21: