### Job Extraction Pipeline


In [None]:
import pandas as pd
import json
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Import helper functions
from jobs_helper import scrape_single_config

# Logging: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Input: site configurations read by the loading cell.
SITE_CONFIGS_PATH = './data/site_configs_raw.json'

# Outputs: the collected jobs, written once as CSV and once as JSON.
OUTPUT_JOBS_CSV = './data/avature_jobs.csv'
OUTPUT_JOBS_JSON = './data/avature_jobs.json'

# Size of the thread pool used by the scraping cell.
MAX_WORKERS = 5

# Echo the effective configuration so it is visible in the notebook output.
print(
    "Configuration loaded",
    f"Input: {SITE_CONFIGS_PATH}",
    f"Output: {OUTPUT_JOBS_CSV}",
    sep="\n",
)


In [None]:
# Load site configurations.
# JSON text is UTF-8, so decode it explicitly instead of relying on the
# platform's default encoding.
with open(SITE_CONFIGS_PATH, 'r', encoding='utf-8') as f:
    all_configs = json.load(f)

df_configs = pd.DataFrame(all_configs)

print("="*60)
print("SITE CONFIGURATIONS LOADED")
print("="*60)

# Filter for scrapable endpoints. An empty config list produces a DataFrame
# with no columns at all, so check for 'status' before indexing to avoid a
# KeyError and fall back to "nothing to scrape".
if 'status' in df_configs.columns:
    success_configs = df_configs[df_configs['status'] == 'success'].to_dict('records')
    partial_configs = df_configs[df_configs['status'] == 'partial'].to_dict('records')
else:
    success_configs = []
    partial_configs = []

print("\nEndpoint Status:")
print(f"  SUCCESS (paginated): {len(success_configs)}")
print(f"  PARTIAL (HTML scrape): {len(partial_configs)}")
print(f"  Total to scrape: {len(success_configs) + len(partial_configs)}")

# The samples below are informational only, so missing keys are shown as
# 'N/A' rather than aborting the cell.
print("\nSample SUCCESS endpoints:")
for config in success_configs[:3]:
    print(f"  {config.get('tenant', 'N/A')}: {config.get('endpoint', 'N/A')}")

if partial_configs:
    print("\nSample PARTIAL endpoints:")
    for config in partial_configs[:3]:
        print(f"  {config.get('tenant', 'N/A')}: {config.get('sample_job_ids', 'N/A')}")


In [None]:
print("="*60)
print("STARTING JOB EXTRACTION")
print("="*60)

all_jobs = []
# Tenants whose future raised, or whose result could not be appended.
failed_tenants = []
configs_to_scrape = success_configs + partial_configs
total_sites = len(configs_to_scrape)

print(f"\nProcessing {total_sites} endpoints...")
# Rough figure only: total * 30 / 60, not adjusted for MAX_WORKERS.
print(f"Estimated time: {total_sites * 30 / 60:.1f} minutes\n")

start_time = time.time()

# Submit one scrape per site config and collect results as they finish.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    future_to_config = {
        executor.submit(scrape_single_config, config): config
        for config in configs_to_scrape
    }

    completed = 0
    for future in as_completed(future_to_config):
        config = future_to_config[future]
        try:
            jobs = future.result()
            all_jobs.extend(jobs)
        except Exception as e:
            # Best effort: one failing site must not abort the whole run.
            # Use .get so a config without 'tenant' cannot raise from here.
            tenant = config.get('tenant', 'unknown')
            failed_tenants.append(tenant)
            logger.error(f"Future failed for {tenant}: {e}")

        # Count every finished future, failed or not, so the progress line
        # tracks the real position in the queue and can reach the total.
        completed += 1
        if completed % 10 == 0:
            elapsed = time.time() - start_time
            jobs_so_far = len(all_jobs)
            print(f"\nProgress: {completed}/{total_sites} sites | "
                  f"{jobs_so_far:,} jobs | {elapsed/60:.1f} min elapsed")

elapsed_time = time.time() - start_time

print("\n" + "="*60)
print("SCRAPING COMPLETE")
print("="*60)
print(f"\nTotal time: {elapsed_time/60:.1f} minutes")
print(f"Total jobs collected: {len(all_jobs):,}")
# Report only the sites that actually produced a result; failures are listed
# separately instead of being counted as scraped.
print(f"Sites scraped: {total_sites - len(failed_tenants)}")
if failed_tenants:
    print(f"Sites failed: {len(failed_tenants)}")
if total_sites > 0:
    print(f"Average: {len(all_jobs)/total_sites:.1f} jobs/site")


In [None]:
df_jobs = pd.DataFrame(all_jobs)

print("\n" + "="*60)
print("DEDUPLICATION")
print("="*60)

before_dedup = len(df_jobs)
print(f"\nOriginal jobs collected: {before_dedup:,}")


def _drop_repeats(frame, keys):
    """Drop later rows that repeat the same ``keys``; keep rows missing a key.

    pandas treats missing values as equal to each other when detecting
    duplicates, so a plain ``drop_duplicates`` would collapse every row that
    lacks one of the keys into a single row. Rows with a missing key are
    therefore never considered repeats here.
    """
    has_all_keys = frame[keys].notna().all(axis=1)
    is_repeat = frame.duplicated(subset=keys, keep='first')
    return frame[~(is_repeat & has_all_keys)]


if df_jobs.empty:
    # Nothing was collected: the frame has no columns to deduplicate on.
    print("No jobs collected; skipping deduplication")
    df_jobs_final = df_jobs
else:
    # Step 1: Remove same-tenant duplicates
    df_jobs_step1 = _drop_repeats(df_jobs, ['tenant', 'job_id'])
    removed_step1 = before_dedup - len(df_jobs_step1)
    print(f"Step 1 - Same-tenant duplicates: {removed_step1:,} removed")

    # Step 2: Remove cross-tenant duplicates
    # NOTE(review): this assumes job_id values are unique across tenants; if
    # tenants number their jobs independently, distinct jobs are dropped
    # here - confirm against the scraper's job_id scheme.
    df_jobs_step2 = _drop_repeats(df_jobs_step1, ['job_id'])
    removed_step2 = len(df_jobs_step1) - len(df_jobs_step2)
    print(f"Step 2 - Cross-tenant duplicates: {removed_step2:,} removed")

    # Step 3: Remove URL duplicates
    df_jobs_final = _drop_repeats(df_jobs_step2, ['job_url'])
    removed_step3 = len(df_jobs_step2) - len(df_jobs_final)
    print(f"Step 3 - URL duplicates: {removed_step3:,} removed")

removed_total = before_dedup - len(df_jobs_final)
# Guard the percentage so an empty run reports 0.0% instead of dividing by zero.
removed_pct = removed_total / before_dedup * 100 if before_dedup else 0.0

print("\nFinal result:")
print(f"  Before: {before_dedup:,}")
print(f"  After: {len(df_jobs_final):,}")
print(f"  Removed: {removed_total:,} ({removed_pct:.1f}%)")

df_jobs = df_jobs_final

# Save files
df_jobs.to_csv(OUTPUT_JOBS_CSV, index=False)
print(f"\nSaved CSV: {OUTPUT_JOBS_CSV}")

# json.dump would write missing values as the bare token NaN, which is not
# valid JSON. Convert them to None first so they are emitted as null.
json_records = df_jobs.astype(object).where(df_jobs.notna(), None).to_dict('records')
with open(OUTPUT_JOBS_JSON, 'w', encoding='utf-8') as f:
    json.dump(json_records, f, indent=2)
print(f"Saved JSON: {OUTPUT_JOBS_JSON}")


In [None]:
print("\n" + "="*60)
print("JOB EXTRACTION STATISTICS")
print("="*60)

total_jobs = len(df_jobs)

if total_jobs == 0:
    # An empty frame has no columns to index and would make every
    # percentage below a division by zero.
    print("\nNo jobs collected; nothing to summarize")
else:
    # Count the non-null values once and reuse them for count and percentage.
    with_description = int(df_jobs['job_description'].notna().sum())
    with_location = int(df_jobs['location'].notna().sum())

    print("\nOverall:")
    print(f"  Total jobs: {total_jobs:,}")
    print(f"  Unique tenants: {df_jobs['tenant'].nunique()}")
    print(f"  Jobs with descriptions: {with_description:,} ({with_description/total_jobs*100:.1f}%)")
    print(f"  Jobs with locations: {with_location:,} ({with_location/total_jobs*100:.1f}%)")

    print("\nTop 10 Tenants by Job Count:")
    top_tenants = df_jobs['tenant'].value_counts().head(10)
    for tenant, count in top_tenants.items():
        print(f"  {tenant}: {count:,} jobs")

    print("\nSample jobs:")
    print(df_jobs[['tenant', 'job_title', 'location']].head(10).to_string(index=False))


In [None]:
print("\n" + "="*60)
print("DATA QUALITY REPORT")
print("="*60)

if len(df_jobs) == 0:
    # An empty frame has no columns to inspect and no meaningful percentages.
    print("\nNo jobs collected; nothing to check")
else:
    # Check completeness: share of rows with a non-null value per field.
    print("\nField Completeness:")
    for col in ['job_title', 'job_url', 'job_id', 'location', 'job_description']:
        completeness = df_jobs[col].notna().mean() * 100
        print(f"  {col}: {completeness:.1f}%")

    # Check for issues: fields whose absence is reported as a problem,
    # paired with the label used in the message.
    issues = []
    for col, label in [('job_title', 'titles'), ('job_url', 'URLs')]:
        missing = int(df_jobs[col].isna().sum())
        if missing > 0:
            issues.append(f"{missing} jobs missing {label}")

    if issues:
        print("\nIssues Found:")
        for issue in issues:
            print(f"  {issue}")
    else:
        print("\nNo critical issues found")
