In [None]:
import boto3
import json
import os
import pandas as pd
import uuid
import sys
import numpy as np
import time
from typing import List, Dict, Any, Union
from tqdm import tqdm

In [None]:
# SageMaker endpoint that serves the E5 embedding model, and the AWS region
# the runtime client is created in.
SAGEMAKER_ENDPOINT_NAME = "e5-embeddings-pooled-2"
AWS_REGION = "us-east-1"

# Read the key from the environment so the secret never has to be pasted into
# (and committed with) the source. Falls back to the previous empty value.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "")
PINECONE_ENVIRONMENT = "us-east-1"  # matched with AWS region
PINECONE_INDEX_NAME = "mirra-filtering"

# Length every stored/query vector is forced to; also used as the dimension
# when the Pinecone index is created.
EMBEDDING_DIMENSION = 1024

In [None]:
class ResilientSageMakerEmbedder:
    """
    A wrapper around a SageMaker embedding endpoint with resilience features.

    Each text is embedded with its own endpoint call, retried with exponential
    backoff, and replaced by a random unit vector if every attempt fails, so
    ``generate_embeddings`` always returns exactly one vector per input.
    """

    # Prefixes E5 models use to tag their inputs. Text that already carries
    # one of them is left alone instead of being re-tagged as a passage.
    _E5_PREFIXES = ("passage:", "query:")

    def __init__(self, endpoint_name, max_text_length=512, region="us-east-1"):
        """
        Initialize the embedder with a SageMaker endpoint.

        Args:
            endpoint_name: The name of the SageMaker endpoint
            max_text_length: Maximum text length (in characters, prefix
                included) to truncate to
            region: AWS region for the endpoint
        """
        self.sagemaker_runtime = boto3.client('sagemaker-runtime', region_name=region)
        self.endpoint_name = endpoint_name
        self.max_text_length = max_text_length
        print(f"Initialized ResilientSageMakerEmbedder for endpoint: {endpoint_name}")

    def _prepare_text(self, text):
        """
        Clean and prepare text for the embedding model.

        Collapses whitespace, adds the ``passage:`` prefix unless the text is
        already tagged ``passage:`` or ``query:``, and truncates the result to
        ``max_text_length`` characters.
        """
        if not isinstance(text, str):
            text = str(text)

        # Remove excessive whitespace
        text = ' '.join(text.split())

        # Add the E5 prefix only when the caller has not supplied one
        if not text.startswith(self._E5_PREFIXES):
            text = f"passage: {text}"

        # Truncate if needed (slicing is a no-op for short text)
        return text[:self.max_text_length]

    @staticmethod
    def _fit_dimension(raw_embedding, dim):
        """
        Reduce an endpoint result to a flat list of exactly ``dim`` floats.

        Nested (e.g. token-level) results are averaged over their leading axes
        until one axis remains; the vector is then truncated or zero-padded to
        ``dim``.

        Raises:
            ValueError: if the result is a scalar rather than a vector.
        """
        emb_array = np.array(raw_embedding)

        # Average across all but the last dimension
        while emb_array.ndim > 1:
            emb_array = np.mean(emb_array, axis=0)

        if emb_array.ndim == 0:
            raise ValueError("Endpoint returned a scalar instead of an embedding vector")

        size = emb_array.shape[0]
        if size > dim:
            emb_array = emb_array[:dim]
        elif size < dim:
            padded = np.zeros(dim)
            padded[:size] = emb_array
            emb_array = padded

        return emb_array.tolist()

    def _embed_single(self, text):
        """Call the endpoint once for one text and return its fitted vector."""
        # Payload with explicit pooling parameters
        payload = {
            "inputs": [self._prepare_text(text)],
            "parameters": {
                "normalize": True,
                "pooling": "mean",
                "return_sentence_embedding": True
            }
        }

        response = self.sagemaker_runtime.invoke_endpoint(
            EndpointName=self.endpoint_name,
            ContentType='application/json',
            Body=json.dumps(payload)
        )
        response_body = json.loads(response['Body'].read().decode('utf-8'))

        return self._fit_dimension(response_body[0], EMBEDDING_DIMENSION)

    def generate_embeddings(self, texts, max_retries=3):
        """
        Generate embeddings using SageMaker E5 endpoint with retries and fallbacks

        Args:
            texts: String or list of texts to embed
            max_retries: Maximum attempts per text; values below 1 are treated
                as 1 so every text is tried at least once

        Returns:
            List of embedding vectors, one per input text and in input order.
            A text whose every attempt failed gets a random unit vector.
        """
        # Ensure texts is a list
        if not isinstance(texts, list):
            texts = [texts]

        # At least one attempt, otherwise the result would be shorter than
        # the input and callers indexing into it would fail.
        attempts = max(1, max_retries)
        last_index = len(texts) - 1
        all_embeddings = []

        # Process one text at a time for maximum resilience
        for i, text in enumerate(texts):
            for retry in range(attempts):
                try:
                    embedding = self._embed_single(text)
                except Exception as e:
                    if retry < attempts - 1:
                        wait_time = (2 ** retry) * 0.5  # Exponential backoff
                        print(f"Retry {retry+1} for text {i+1}: {str(e)}")
                        time.sleep(wait_time)
                    else:
                        print(f"Error generating embedding for text {i+1}: {str(e)}")
                        # Use fallback random vector
                        all_embeddings.append(self._create_random_unit_vector())
                else:
                    all_embeddings.append(embedding)

                    # Small delay to prevent overwhelming the endpoint
                    if i < last_index:
                        time.sleep(0.1)

                    # Success - leave the retry loop
                    break

        return all_embeddings

    def _create_random_unit_vector(self, dim=None):
        """
        Create a random unit vector for fallback.

        Args:
            dim: Vector length; defaults to EMBEDDING_DIMENSION so fallback
                vectors always match the size of real embeddings
        """
        if dim is None:
            dim = EMBEDDING_DIMENSION
        vec = np.random.normal(0, 1, size=dim)
        return (vec / np.linalg.norm(vec)).tolist()

In [None]:
def initialize_pinecone():
    """
    Initialize Pinecone and return the index.

    Connects with PINECONE_API_KEY, creates PINECONE_INDEX_NAME as a cosine
    index of EMBEDDING_DIMENSION if it does not exist yet, and returns a
    handle to it.

    Returns:
        The Pinecone index handle, or None if anything failed (the error is
        printed rather than raised).
    """
    try:
        # Initialize Pinecone client
        from pinecone import Pinecone, ServerlessSpec
        pc = Pinecone(api_key=PINECONE_API_KEY)

        # Check if the index exists
        existing_indexes = pc.list_indexes().names()
        print(f"Available Pinecone indexes: {existing_indexes}")

        if PINECONE_INDEX_NAME not in existing_indexes:
            print(f"Creating new index '{PINECONE_INDEX_NAME}'...")

            # The client API used here (Pinecone(...), list_indexes().names())
            # requires a `spec` and no longer accepts the legacy
            # `metadata_config` keyword. NOTE(review): serverless indexes are
            # expected to make all metadata fields filterable, so the old
            # selective field list should not be needed — confirm against the
            # installed client version.
            pc.create_index(
                name=PINECONE_INDEX_NAME,
                dimension=EMBEDDING_DIMENSION,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region=PINECONE_ENVIRONMENT)
            )
            print(f"Index '{PINECONE_INDEX_NAME}' created successfully")

        # Connect to the index
        index = pc.Index(PINECONE_INDEX_NAME)
        print(f"Connected to Pinecone index: {PINECONE_INDEX_NAME}")

        return index

    except Exception as e:
        print(f"Error initializing Pinecone: {str(e)}")
        print("Please check your API key and environment settings.")
        return None

In [None]:
def _iter_job_keys(s3_client, bucket, prefix):
    """Yield every object key under the prefix that ends in .json, across all pages."""
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith(".json"):
                yield key


def load_job_descriptions(limit=101, bucket="mirra-matcher-325", prefix="data/processed/jobs/"):
    """
    Load job descriptions from S3.

    Args:
        limit: Maximum number of jobs to load (counts successfully loaded
            JSON jobs, not listed keys); None loads everything
        bucket: S3 bucket name
        prefix: S3 prefix for job files

    Returns:
        List of job description dictionaries. Files that cannot be read or
        parsed are skipped with a message; a listing failure is printed and
        whatever was loaded so far is returned.
    """
    jobs = []

    if limit is not None and limit <= 0:
        print(f"Loaded {len(jobs)} job descriptions from S3")
        return jobs

    # Load from S3
    s3_client = boto3.client('s3')
    suffix = ".json"

    try:
        print(f"Loading job descriptions from S3: s3://{bucket}/{prefix}")

        for key in _iter_job_keys(s3_client, bucket, prefix):
            # One unreadable file should not discard the rest of the listing
            try:
                s3_object = s3_client.get_object(Bucket=bucket, Key=key)
                job_data = json.loads(s3_object['Body'].read().decode('utf-8'))
            except Exception as e:
                print(f"Skipping {key}: {str(e)}")
                continue

            if not isinstance(job_data, dict):
                print(f"Skipping {key}: expected a JSON object")
                continue

            # Add job_id if not present (file name without the .json suffix)
            if "job_id" not in job_data:
                job_data["job_id"] = key.split("/")[-1][:-len(suffix)]

            jobs.append(job_data)
            print(f"Loaded job: {job_data.get('job_id')}")

            if limit is not None and len(jobs) >= limit:
                break

        if not jobs:
            print(f"No job files found in S3 bucket {bucket}/{prefix}")
    except Exception as e:
        print(f"Error loading files from S3: {str(e)}")

    print(f"Loaded {len(jobs)} job descriptions from S3")
    return jobs

In [None]:
def _job_first_item(value, default):
    """Return value[0] when value is a non-empty list, otherwise default."""
    if isinstance(value, list) and value:
        return value[0]
    return default


def _job_skill_phrases(section):
    """Flatten section["hard_skills"][*]["skill"] into a list of phrase strings."""
    phrases = []
    if not isinstance(section, dict):
        return phrases

    for item in section.get("hard_skills") or []:
        if not isinstance(item, dict):
            continue
        for group in item.get("skill") or []:
            if isinstance(group, list):
                # A group is a list of tokens forming one phrase
                phrases.append(" ".join(str(token) for token in group))
            else:
                phrases.append(str(group))
    return phrases


def _job_experience_level(section):
    """Map the largest "minyears" among a section's hard skills to a level label."""
    max_years = 0
    hard_skills = section.get("hard_skills") if isinstance(section, dict) else None

    for skill in hard_skills or []:
        if not isinstance(skill, dict):
            continue
        min_years = _job_first_item(skill.get("minyears", [0]), 0)

        # Convert to numeric if needed
        if not isinstance(min_years, (int, float)):
            try:
                min_years = float(min_years)
            except (ValueError, TypeError):
                min_years = 0

        max_years = max(max_years, min_years)

    if max_years >= 7:
        return "Senior"
    if max_years >= 3:
        return "Mid-level"
    return "Entry-level"


def format_job_for_embedding(job):
    """
    Format a job JSON into a comprehensive text representation for embedding.

    Args:
        job: Job description dictionary. Reads "details", "mandatory",
            "responsibility" and "job_id"; missing or None sections fall back
            to defaults.

    Returns:
        Tuple ``(job_text, metadata)``: the text to embed and a flat dict with
        the keys job_id, job_title, emp_type, exp_level, domain, location,
        salary_range_from, salary_range_to and visa_sponsor.
    """
    details = job.get("details")
    if not isinstance(details, dict):
        details = {}

    job_title = _job_first_item(details.get("job_title", ["Unknown"]), "Unknown")
    company_name = _job_first_item(details.get("company_name", []), "Unknown")
    domain = _job_first_item(details.get("company_industry", []), "Technology")

    # Location: "city, state, country" from the first entry; "Remote" when
    # the entry is missing or all of its fields are blank.
    location = "Remote"
    location_item = _job_first_item(details.get("location", []), None)
    if isinstance(location_item, dict):
        parts = (location_item.get(field, "") for field in ("city", "state", "country"))
        location = ", ".join(str(part) for part in parts if part) or "Remote"

    # Employment type - use tax_terms if employment_type is empty,
    # default to "Full-time"
    employment_data = details.get("employment_type", [])
    if not employment_data and details.get("tax_terms"):
        employment_data = details.get("tax_terms", ["Full-time"])
    employment_type = _job_first_item(employment_data, "Full-time")

    # Experience level (from required years in hard skills)
    mandatory = job.get("mandatory")
    experience_level = _job_experience_level(mandatory)

    # NOTE: salary and visa sponsorship are placeholders. The "wage" and
    # "work_authorization" fields of the job are not parsed, so every job is
    # stored with these defaults.
    salary_from = 0
    salary_to = 0
    visa_sponsorship = "No"

    skills_list = _job_skill_phrases(mandatory)
    skills_text = "Required skills: " + ", ".join(skills_list) if skills_list else ""

    resp_list = _job_skill_phrases(job.get("responsibility"))
    responsibilities_text = "Responsibilities: " + ", ".join(resp_list) if resp_list else ""

    # Combine all attributes into a comprehensive text. The trailing space
    # after "located in ...." is part of the established text format.
    job_text = "\n".join([
        f"Job Title: {job_title}",
        f"Company: {company_name}",
        f"Location: {location}",
        f"Employment Type: {employment_type}",
        f"Experience Level: {experience_level}",
        f"Industry/Domain: {domain}",
        f"Visa Sponsorship: {visa_sponsorship}",
        "",
        skills_text,
        "",
        responsibilities_text,
        "",
        "Job Description Summary:",
        f"This is a {employment_type} position for a {job_title} located in {location}. ",
        f"The role requires {experience_level} experience in the {domain} industry.",
        f"Visa sponsorship is {visa_sponsorship}.",
    ])

    return job_text.strip(), {
        "job_id": job.get("job_id", str(uuid.uuid4())),
        "job_title": job_title,
        "emp_type": employment_type,
        "exp_level": experience_level,
        "domain": domain,
        "location": location,
        "salary_range_from": salary_from,
        "salary_range_to": salary_to,
        "visa_sponsor": visa_sponsorship
    }

In [None]:
def _sanitize_job_metadata(metadata):
    """Return a copy of metadata with salary fields as ints and every other value as str."""
    clean = {}
    for key, value in metadata.items():
        if key in ("salary_range_from", "salary_range_to"):
            clean[key] = int(value) if isinstance(value, (int, float)) else 0
        else:
            clean[key] = value if isinstance(value, str) else str(value)
    return clean


def process_and_upload_jobs(jobs, pinecone_index, batch_size=1, max_retries=3):
    """
    Process jobs, generate embeddings, and upload to Pinecone

    Args:
        jobs: List of job dictionaries
        pinecone_index: Pinecone index instance
        batch_size: Batch size for processing (keep low for stability);
            values below 1 are treated as 1
        max_retries: Maximum upload attempts per batch; values below 1 are
            treated as 1

    Returns:
        Dictionary with processing statistics ("total_jobs" and
        "vectors_uploaded"), or {"status": "error", "reason": ...} when there
        are no jobs or no index.
    """
    import traceback

    if not jobs:
        print("No jobs provided for processing")
        return {"status": "error", "reason": "no_jobs"}

    if not pinecone_index:
        print("No Pinecone index provided")
        return {"status": "error", "reason": "no_index"}

    # A zero/negative batch size would make range() fail or never advance
    batch_size = max(1, batch_size)
    upload_attempts = max(1, max_retries)

    print(f"Processing {len(jobs)} jobs...")

    # Initialize the embedder with conservative settings
    embedder = ResilientSageMakerEmbedder(
        endpoint_name=SAGEMAKER_ENDPOINT_NAME,
        max_text_length=512,  # Limit text length to prevent memory issues
        region=AWS_REGION
    )

    # Format every job up front; texts and metadata stay index-aligned
    job_texts = []
    job_metadata = []

    for job in jobs:
        try:
            job_text, metadata = format_job_for_embedding(job)
            job_texts.append(job_text)
            job_metadata.append(metadata)
            print(f"Processed job {metadata['job_id']} - {metadata['job_title']}")
        except Exception as e:
            print(f"Error processing job {job.get('job_id', 'unknown')}: {str(e)}")

    vectors_uploaded = 0
    total_vectors = len(job_texts)
    total_batches = (total_vectors - 1) // batch_size + 1

    with tqdm(total=total_vectors, desc="Processing and uploading") as progress_bar:
        for start in range(0, total_vectors, batch_size):
            batch_texts = job_texts[start:start + batch_size]
            batch_metadata = job_metadata[start:start + batch_size]
            batch_number = start // batch_size + 1

            print(f"Processing batch {batch_number}/{total_batches}...")

            try:
                # Process strictly one text at a time; exactly one embedding
                # is appended per text so embeddings stay aligned with
                # batch_metadata.
                batch_embeddings = []
                for offset, text in enumerate(batch_texts):
                    position = start + offset + 1
                    try:
                        print(f"Generating embedding for text {position}/{total_vectors}")
                        embedding = embedder.generate_embeddings([text])[0]
                        print(f"Successfully generated embedding: length={len(embedding)}")
                    except Exception as e:
                        print(f"ERROR generating embedding for text {position}: {str(e)}")
                        # Use fallback vector and continue
                        embedding = embedder._create_random_unit_vector()
                        print("Using fallback vector instead")
                    batch_embeddings.append(embedding)
                    time.sleep(0.5)  # Brief pause between embeddings

                # Create vectors - with explicit length checks. Each embedding
                # is paired with the metadata of the SAME job.
                vectors_to_upload = []
                for j, (metadata, embedding) in enumerate(zip(batch_metadata, batch_embeddings)):
                    if not embedding or len(embedding) != EMBEDDING_DIMENSION:
                        print(f"WARNING: Invalid embedding at index {j}, using fallback")
                        embedding = embedder._create_random_unit_vector()

                    vectors_to_upload.append({
                        "id": f"job_{metadata['job_id']}",
                        "values": embedding,
                        "metadata": _sanitize_job_metadata(metadata)
                    })

                # Print vector counts for clarity
                print(f"Prepared {len(vectors_to_upload)} vectors for upload")

                # Explicit upload with better error logging
                if vectors_to_upload:
                    upload_success = False
                    for retry in range(upload_attempts):
                        try:
                            pinecone_index.upsert(vectors=vectors_to_upload)
                            vectors_uploaded += len(vectors_to_upload)
                            progress_bar.update(len(vectors_to_upload))
                            upload_success = True
                            print(f"Successfully uploaded {len(vectors_to_upload)} vectors")
                            break
                        except Exception as e:
                            print(f"Upload attempt {retry+1} failed: {str(e)}")
                            traceback.print_exc()
                            if retry < upload_attempts - 1:
                                wait_time = (2 ** retry) * 2.0  # Longer wait time
                                print(f"Waiting {wait_time}s before retry...")
                                time.sleep(wait_time)

                    if not upload_success:
                        print("WARNING: Failed to upload batch after all retries")
                else:
                    print("No vectors to upload for this batch")

            except Exception as e:
                print(f"Critical error in batch {batch_number}: {str(e)}")
                traceback.print_exc()

    return {
        "total_jobs": len(jobs),
        "vectors_uploaded": vectors_uploaded
    }

In [None]:
def job_embedding_pipeline(limit=101):
    """
    Run the end-to-end flow: connect to Pinecone, load jobs, embed and upload.

    Args:
        limit: Maximum number of jobs to process

    Returns:
        Dictionary with processing statistics. On an early failure it holds
        only "status" and "reason"; otherwise it reports the job count, the
        number of uploaded vectors, the index total and the elapsed seconds.
    """
    started_at = time.time()

    # Step 1: Pinecone connection
    pinecone_index = initialize_pinecone()
    if not pinecone_index:
        return {"status": "error", "reason": "Failed to initialize Pinecone"}

    # Step 2: job descriptions
    jobs = load_job_descriptions(limit=limit)
    if not jobs:
        return {"status": "error", "reason": "No jobs loaded"}

    # Step 3: embeddings + upload
    upload_summary = process_and_upload_jobs(jobs, pinecone_index, batch_size=1)

    # Step 4: ask the index how many vectors it holds now; if that fails,
    # report the number uploaded in this run instead.
    try:
        index_stats = pinecone_index.describe_index_stats()
        index_total = index_stats.get('total_vector_count', 0)
        print(f"\nFinal index statistics:")
        print(f"  - Total vectors: {index_total}")
    except Exception as e:
        print(f"Error getting index stats: {str(e)}")
        index_total = upload_summary.get("vectors_uploaded", 0)

    elapsed_seconds = time.time() - started_at

    return {
        "status": "success",
        "jobs_processed": len(jobs),
        "vectors_uploaded": upload_summary.get("vectors_uploaded", 0),
        "total_vectors_in_index": index_total,
        "processing_time_seconds": elapsed_seconds
    }

In [None]:
def search_jobs(query_text, pinecone_index, embedder=None, top_k=10, filters=None):
    """
    Search for jobs using semantic embedding similarity

    Defined ahead of the script entry point below, which calls it.

    Args:
        query_text: Query text to search for
        pinecone_index: Pinecone index to search
        embedder: Embedding generator (optional); a ResilientSageMakerEmbedder
            for the configured endpoint is created when omitted
        top_k: Number of results to return
        filters: Dictionary of metadata filters. None and empty
            strings/containers are ignored; numeric salary_range_from /
            salary_range_to values become $gte / $lte range filters.

    Returns:
        List of job matches with scores (dicts with job_id, job_title,
        location, emp_type, exp_level, domain, visa_sponsor and
        similarity_score). Returns an empty list if the query fails.
    """
    if not embedder:
        embedder = ResilientSageMakerEmbedder(
            endpoint_name=SAGEMAKER_ENDPOINT_NAME,
            region=AWS_REGION
        )

    # Generate embedding for query
    query_embedding = embedder.generate_embeddings([query_text])[0]

    # Prepare filters if any. Only genuinely empty values are skipped, so a
    # legitimate falsy value such as 0 still produces a filter.
    filter_dict = {}
    for key, value in (filters or {}).items():
        if value is None:
            continue
        if isinstance(value, (str, list, tuple, dict, set)) and not value:
            continue

        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if key == "salary_range_from" and is_number:
            filter_dict[key] = {"$gte": value}
        elif key == "salary_range_to" and is_number:
            filter_dict[key] = {"$lte": value}
        else:
            filter_dict[key] = value

    print(f"Searching with filters: {filter_dict}")

    # Perform search
    try:
        search_results = pinecone_index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
            filter=filter_dict if filter_dict else None
        )

        # The response may be an object exposing .matches or a plain mapping;
        # prefer the attribute and only fall back to .get() when it is absent.
        matches = getattr(search_results, 'matches', None)
        if matches is None and hasattr(search_results, 'get'):
            matches = search_results.get('matches', [])

        results = []
        for match in matches or []:
            # Handle different response formats
            if hasattr(match, 'metadata'):
                metadata = match.metadata
                score = match.score
            else:
                metadata = match.get('metadata', {})
                score = match.get('score', 0)

            # A match stored without metadata still yields a row of defaults
            metadata = metadata or {}

            results.append({
                'job_id': metadata.get('job_id', 'unknown'),
                'job_title': metadata.get('job_title', 'Unknown'),
                'location': metadata.get('location', 'Unknown'),
                'emp_type': metadata.get('emp_type', 'Unknown'),
                'exp_level': metadata.get('exp_level', 'Unknown'),
                'domain': metadata.get('domain', 'Unknown'),
                'visa_sponsor': metadata.get('visa_sponsor', 'No'),
                'similarity_score': score
            })

        return results

    except Exception as e:
        print(f"Error searching jobs: {str(e)}")
        import traceback
        traceback.print_exc()
        return []


if __name__ == "__main__":
    # Run the pipeline
    result = job_embedding_pipeline(limit=101)
    print(f"\nPipeline results: {result}")

    # Example search
    pinecone_index = initialize_pinecone()
    if pinecone_index:
        embedder = ResilientSageMakerEmbedder(
            endpoint_name=SAGEMAKER_ENDPOINT_NAME,
            region=AWS_REGION
        )

        # Search for data science jobs that sponsor visas
        search_results = search_jobs(
            query_text="Data Science",
            pinecone_index=pinecone_index,
            embedder=embedder,
            filters={"visa_sponsor": "Yes"}
        )

        print("\nSearch results:")
        # Separate loop name so the pipeline `result` above is not overwritten
        for rank, match in enumerate(search_results, start=1):
            print(f"{rank}. {match['job_title']} ({match['location']}) - Score: {match['similarity_score']:.4f}")

