In [2]:
import json
import re
from typing import Dict, List, Optional, Any
from datetime import datetime
import openai
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, DuplicateKeyError
import logging
from dataclasses import dataclass, asdict
import time
import os
from dotenv import load_dotenv

load_dotenv()


True

In [3]:
# OpenAI API key read from the environment; the .env file was loaded by the
# load_dotenv() call in the imports cell. os.getenv returns None when the
# variable is not set. Note that main() reads the same variable on its own.
API_KEY = os.getenv("OpenAI_API_Key")

In [4]:
# Configure the root logger at INFO level and create this notebook's logger.
# Every class and function below reports progress and errors through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [5]:
@dataclass
class ProcessedJobDescription:
    """Structured form of a single job description.

    Instances are built by JobDescriptionProcessor.extract_structured_data,
    which mixes fields copied from the raw dataset record with fields parsed
    from the LLM's JSON answer, and are persisted via dataclasses.asdict.
    """
    job_id: str  # raw record's 'id', or a generated "JD_<unix time>" fallback
    title: str  # LLM-extracted title, falling back to the raw record's title
    category: str  # copied from the raw record
    company_type: str  # copied from the raw record
    location: str  # copied from the raw record
    employment_type: str  # LLM-extracted
    experience_level: str  # copied from the raw record
    education_requirements: List[str]  # LLM-extracted
    years_of_experience: str  # LLM-extracted
    technical_skills: List[str]  # LLM-extracted
    soft_skills: List[str]  # LLM-extracted
    responsibilities: List[str]  # LLM-extracted
    required_qualifications: List[str]  # LLM-extracted
    preferred_qualifications: List[str]  # LLM-extracted
    benefits: List[str]  # LLM-extracted
    salary_range: str  # LLM-extracted
    job_summary: str  # LLM-extracted
    company_overview: str  # LLM-extracted
    original_description: str  # raw record's 'full_description' text
    processed_at: str  # ISO-8601 timestamp taken when the record was built
    keywords: List[str]  # LLM-extracted search/matching terms
    seniority_level: int  # derived from experience_level: 1=Entry, 2=Mid, 3=Senior, 4=Lead/Principal

In [6]:
# NOTE(review): this cell re-declares ProcessedJobDescription with exactly the
# same fields as the previous cell. Running it simply rebinds the name, so one
# of the two cells can be removed.
@dataclass
class ProcessedJobDescription:
    """Structured form of a single job description.

    Holds metadata copied from the raw dataset record together with the
    fields extracted from the description text by the LLM.
    """
    # Identity and metadata
    job_id: str
    title: str
    category: str
    company_type: str
    location: str
    employment_type: str
    experience_level: str
    # Requirements and skills
    education_requirements: List[str]
    years_of_experience: str
    technical_skills: List[str]
    soft_skills: List[str]
    responsibilities: List[str]
    required_qualifications: List[str]
    preferred_qualifications: List[str]
    # Compensation and descriptive text
    benefits: List[str]
    salary_range: str
    job_summary: str
    company_overview: str
    original_description: str
    # Processing metadata
    processed_at: str  # ISO-8601 timestamp string
    keywords: List[str]
    seniority_level: int  # 1=Entry, 2=Mid, 3=Senior, 4=Lead/Principal

In [8]:
class JobDescriptionProcessor:
    """Holds the OpenAI client and the MongoDB collection for job descriptions.

    Note: a later cell in this notebook declares ``JobDescriptionProcessor``
    again with the same constructor plus the processing methods; once that
    cell runs, its definition replaces this one.
    """

    def __init__(self, openai_api_key: str, mongo_uri: str = "mongodb://localhost:27017/",
                 database_name: str = "recruitment_platform"):
        """
        Initialize the Job Description Processor

        Args:
            openai_api_key (str): OpenAI API key
            mongo_uri (str): MongoDB connection URI
            database_name (str): Database name

        Raises:
            ConnectionFailure: If MongoDB cannot be reached while the indexes
                are being created (logged, then re-raised). Any other
                exception from the setup propagates unchanged.
        """
        self.client = openai.OpenAI(api_key=openai_api_key)

        # MongoDB setup. The client is closed again if anything below fails,
        # because the caller never receives an instance it could close itself.
        mongo_client = None
        connected = False
        try:
            mongo_client = MongoClient(mongo_uri)
            self.mongo_client = mongo_client
            self.db = mongo_client[database_name]
            self.collection = self.db.job_descriptions

            # Create indexes for better query performance; job_id is the
            # upsert key and therefore unique.
            self.collection.create_index("job_id", unique=True)
            for field in ("title", "category", "location",
                          "technical_skills", "keywords", "seniority_level"):
                self.collection.create_index(field)

            connected = True
            logger.info("Connected to MongoDB successfully")

        except ConnectionFailure as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            raise
        finally:
            if not connected and mongo_client is not None:
                mongo_client.close()

In [10]:

class JobDescriptionProcessor:
    def __init__(self, openai_api_key: str, mongo_uri: str = "mongodb://localhost:27017/", 
                 database_name: str = "recruitment_platform"):
        """
        Initialize the Job Description Processor
        
        Args:
            openai_api_key (str): OpenAI API key
            mongo_uri (str): MongoDB connection URI
            database_name (str): Database name
        """
        self.client = openai.OpenAI(api_key=openai_api_key)
        
        # MongoDB setup
        try:
            self.mongo_client = MongoClient(mongo_uri)
            self.db = self.mongo_client[database_name]
            self.collection = self.db.job_descriptions
            
            # Create indexes for better query performance
            self.collection.create_index("job_id", unique=True)
            self.collection.create_index("title")
            self.collection.create_index("category")
            self.collection.create_index("location")
            self.collection.create_index("technical_skills")
            self.collection.create_index("keywords")
            self.collection.create_index("seniority_level")
            
            logger.info("Connected to MongoDB successfully")
            
        except ConnectionFailure as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            raise

    def create_extraction_prompt(self, job_description: str) -> str:
        """
        Create a detailed prompt for extracting structured information from job descriptions
        
        Args:
            job_description (str): Raw job description text
            
        Returns:
            str: Formatted extraction prompt
        """
        prompt = f"""
        Analyze the following job description and extract structured information. 
        Return the information in JSON format with the following exact keys:

        Job Description:
        {job_description}

        Extract and return JSON with these keys:
        {{
            "title": "exact job title",
            "job_summary": "brief 2-3 sentence summary of the role",
            "company_overview": "company description if available",
            "responsibilities": ["list", "of", "key", "responsibilities"],
            "education_requirements": ["degree requirements", "certifications"],
            "years_of_experience": "X-Y years or specific requirement",
            "technical_skills": ["specific", "technical", "skills", "tools", "technologies"],
            "soft_skills": ["communication", "leadership", "teamwork", "etc"],
            "required_qualifications": ["must", "have", "qualifications"],
            "preferred_qualifications": ["nice", "to", "have", "qualifications"],
            "benefits": ["list", "of", "benefits", "mentioned"],
            "salary_range": "salary range if mentioned or 'Not specified'",
            "employment_type": "Full-time, Part-time, Contract, etc.",
            "keywords": ["relevant", "keywords", "for", "search", "matching"]
        }}

        Guidelines:
        - Extract only information that is explicitly mentioned
        - For technical_skills, include programming languages, frameworks, tools, databases, etc.
        - For keywords, include important terms that would help in job matching
        - Keep lists concise but comprehensive
        - If information is not available, use empty array [] or "Not specified"
        - Return only valid JSON, no additional text
        """
        return prompt

    def extract_structured_data(self, raw_jd: Dict) -> Optional[ProcessedJobDescription]:
        """
        Extract structured data from a raw job description using LLM
        
        Args:
            raw_jd (Dict): Raw job description data
            
        Returns:
            Optional[ProcessedJobDescription]: Processed job description or None if failed
        """
        try:
            # Create extraction prompt
            full_description = raw_jd.get('full_description', '')
            prompt = self.create_extraction_prompt(full_description)
            
            # Call OpenAI API
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are an expert HR data analyst. Extract structured information from job descriptions and return only valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=1500,
                temperature=0.3  # Lower temperature for more consistent extraction
            )
            
            # Parse the response
            extracted_text = response.choices[0].message.content.strip()
            
            # Clean the response to ensure valid JSON
            extracted_text = self._clean_json_response(extracted_text)
            
            # Parse JSON
            extracted_data = json.loads(extracted_text)
            
            # Map seniority level
            seniority_level = self._map_seniority_level(raw_jd.get('experience_level', ''))
            
            # Create ProcessedJobDescription object
            processed_jd = ProcessedJobDescription(
                job_id=raw_jd.get('id', f"JD_{int(time.time())}"),
                title=extracted_data.get('title', raw_jd.get('title', 'Unknown')),
                category=raw_jd.get('category', 'Unknown'),
                company_type=raw_jd.get('company_type', 'Unknown'),
                location=raw_jd.get('location', 'Unknown'),
                employment_type=extracted_data.get('employment_type', 'Full-time'),
                experience_level=raw_jd.get('experience_level', 'Not specified'),
                education_requirements=extracted_data.get('education_requirements', []),
                years_of_experience=extracted_data.get('years_of_experience', 'Not specified'),
                technical_skills=extracted_data.get('technical_skills', []),
                soft_skills=extracted_data.get('soft_skills', []),
                responsibilities=extracted_data.get('responsibilities', []),
                required_qualifications=extracted_data.get('required_qualifications', []),
                preferred_qualifications=extracted_data.get('preferred_qualifications', []),
                benefits=extracted_data.get('benefits', []),
                salary_range=extracted_data.get('salary_range', 'Not specified'),
                job_summary=extracted_data.get('job_summary', ''),
                company_overview=extracted_data.get('company_overview', ''),
                original_description=full_description,
                processed_at=datetime.now().isoformat(),
                keywords=extracted_data.get('keywords', []),
                seniority_level=seniority_level
            )
            
            return processed_jd
            
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing error for job {raw_jd.get('id', 'unknown')}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error processing job {raw_jd.get('id', 'unknown')}: {e}")
            return None

    def _clean_json_response(self, response_text: str) -> str:
        """
        Clean the LLM response to ensure valid JSON
        
        Args:
            response_text (str): Raw response from LLM
            
        Returns:
            str: Cleaned JSON string
        """
        # Remove any text before the first {
        start_idx = response_text.find('{')
        if start_idx != -1:
            response_text = response_text[start_idx:]
        
        # Remove any text after the last }
        end_idx = response_text.rfind('}')
        if end_idx != -1:
            response_text = response_text[:end_idx + 1]
        
        return response_text

    def _map_seniority_level(self, experience_level: str) -> int:
        """
        Map experience level to numeric seniority level
        
        Args:
            experience_level (str): Experience level string
            
        Returns:
            int: Numeric seniority level
        """
        experience_level = experience_level.lower()
        if 'entry' in experience_level or 'junior' in experience_level:
            return 1
        elif 'mid' in experience_level:
            return 2
        elif 'senior' in experience_level:
            return 3
        elif 'lead' in experience_level or 'principal' in experience_level:
            return 4
        else:
            return 2  # Default to mid-level

    def store_in_mongodb(self, processed_jd: ProcessedJobDescription) -> bool:
        """
        Store processed job description in MongoDB
        
        Args:
            processed_jd (ProcessedJobDescription): Processed job description
            
        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Convert to dictionary
            jd_dict = asdict(processed_jd)
            
            # Insert or update in MongoDB
            result = self.collection.replace_one(
                {"job_id": processed_jd.job_id},
                jd_dict,
                upsert=True
            )
            
            if result.upserted_id or result.modified_count > 0:
                logger.info(f"Successfully stored job: {processed_jd.job_id}")
                return True
            else:
                logger.warning(f"No changes made for job: {processed_jd.job_id}")
                return False
                
        except Exception as e:
            logger.error(f"Error storing job {processed_jd.job_id} in MongoDB: {e}")
            return False

    def process_all_job_descriptions(self, input_file: str = "job_descriptions_dataset.json") -> Dict[str, Any]:
        """
        Process all job descriptions from the input file
        
        Args:
            input_file (str): Path to the job descriptions JSON file
            
        Returns:
            Dict[str, Any]: Processing results summary
        """
        try:
            # Load job descriptions
            with open(input_file, 'r', encoding='utf-8') as f:
                raw_job_descriptions = json.load(f)
            
            logger.info(f"Loaded {len(raw_job_descriptions)} job descriptions")
            
            # Processing counters
            successful_processed = 0
            failed_processed = 0
            successful_stored = 0
            failed_stored = 0
            
            # Process each job description
            for i, raw_jd in enumerate(raw_job_descriptions, 1):
                logger.info(f"Processing job {i}/{len(raw_job_descriptions)}: {raw_jd.get('title', 'Unknown')}")
                
                # Extract structured data
                processed_jd = self.extract_structured_data(raw_jd)
                
                if processed_jd:
                    successful_processed += 1
                    
                    # Store in MongoDB
                    if self.store_in_mongodb(processed_jd):
                        successful_stored += 1
                    else:
                        failed_stored += 1
                else:
                    failed_processed += 1
                    logger.error(f"Failed to process job: {raw_jd.get('id', 'unknown')}")
                
                # Add delay to avoid rate limiting
                time.sleep(1)
            
            # Generate summary
            summary = {
                "total_jobs": len(raw_job_descriptions),
                "successful_processed": successful_processed,
                "failed_processed": failed_processed,
                "successful_stored": successful_stored,
                "failed_stored": failed_stored,
                "processing_date": datetime.now().isoformat(),
                "mongodb_collection": self.collection.name,
                "mongodb_database": self.db.name
            }
            
            logger.info(f"Processing complete! Summary: {summary}")
            return summary
            
        except FileNotFoundError:
            logger.error(f"Input file {input_file} not found")
            raise
        except Exception as e:
            logger.error(f"Error processing job descriptions: {e}")
            raise

    def get_collection_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the stored job descriptions
        
        Returns:
            Dict[str, Any]: Collection statistics
        """
        try:
            total_jobs = self.collection.count_documents({})
            
            # Get category distribution
            category_stats = list(self.collection.aggregate([
                {"$group": {"_id": "$category", "count": {"$sum": 1}}},
                {"$sort": {"count": -1}}
            ]))
            
            # Get location distribution
            location_stats = list(self.collection.aggregate([
                {"$group": {"_id": "$location", "count": {"$sum": 1}}},
                {"$sort": {"count": -1}}
            ]))
            
            # Get seniority level distribution
            seniority_stats = list(self.collection.aggregate([
                {"$group": {"_id": "$seniority_level", "count": {"$sum": 1}}},
                {"$sort": {"_id": 1}}
            ]))
            
            # Get top technical skills
            top_skills = list(self.collection.aggregate([
                {"$unwind": "$technical_skills"},
                {"$group": {"_id": "$technical_skills", "count": {"$sum": 1}}},
                {"$sort": {"count": -1}},
                {"$limit": 20}
            ]))
            
            stats = {
                "total_jobs": total_jobs,
                "category_distribution": {item["_id"]: item["count"] for item in category_stats},
                "location_distribution": {item["_id"]: item["count"] for item in location_stats},
                "seniority_distribution": {item["_id"]: item["count"] for item in seniority_stats},
                "top_technical_skills": {item["_id"]: item["count"] for item in top_skills},
                "last_updated": datetime.now().isoformat()
            }
            
            return stats
            
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            return {}

    def search_jobs(self, query: Dict[str, Any], limit: int = 10) -> List[Dict]:
        """
        Search jobs in the MongoDB collection
        
        Args:
            query (Dict[str, Any]): MongoDB query
            limit (int): Maximum number of results
            
        Returns:
            List[Dict]: Search results
        """
        try:
            results = list(self.collection.find(query, {"_id": 0}).limit(limit))
            return results
        except Exception as e:
            logger.error(f"Error searching jobs: {e}")
            return []

    def close_connection(self):
        """Close MongoDB connection"""
        if hasattr(self, 'mongo_client'):
            self.mongo_client.close()
            logger.info("MongoDB connection closed")


In [24]:
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
# Never echo the raw URI: it embeds the database username and password, and
# cell output is saved with the notebook. Mask the userinfo part (everything
# between "://" and the last "@" before the first "/") and show the rest.
print(re.sub(r"(?<=://)[^/]*(?=@)", "***:***", MONGO_URI) if MONGO_URI else "MONGO_URI is not set")

mongodb+srv://***:***@cluster0.rosecne.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0


In [21]:
# Show only whether the URI is configured; displaying the value itself would
# write the embedded credentials into the notebook's saved output.
MONGO_URI is not None

In [25]:
from pymongo import MongoClient
MONGO_URI = os.getenv("MONGO_URI") 

# Connectivity check: ping the server with a 5 s server-selection timeout.
# The probe client is closed afterwards so it does not keep connections and
# monitor threads alive for the rest of the kernel session.
client = None
try:
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")
    print("MongoDB Atlas connection successful")
except Exception as e:
    print(f"Connection failed: {e}")
finally:
    if client is not None:
        client.close()

MongoDB Atlas connection successful


In [26]:
def main():
    """
    Main function to run the job description processor

    Reads the OpenAI key and MongoDB URI from the environment, processes
    every record in the input file, writes ``processing_summary.json`` and
    ``collection_stats.json``, and prints a short report. The MongoDB
    connection is closed whether or not the run succeeds.

    Raises:
        Exception: Any error from the run is logged and re-raised.
    """
    # Configuration (credentials come from the environment / .env file)
    openai_api_key = os.getenv("OpenAI_API_Key")
    mongo_uri = os.getenv("MONGO_URI")
    database_name = "recruitment_platform"
    input_file = "job_descriptions_dataset.json"
    rule = "=" * 60

    processor = None
    try:
        # Initialize processor
        processor = JobDescriptionProcessor(
            openai_api_key=openai_api_key,
            mongo_uri=mongo_uri,
            database_name=database_name
        )

        # Process all job descriptions
        print("Starting job description processing...")
        summary = processor.process_all_job_descriptions(input_file)

        # Save processing summary
        with open("processing_summary.json", 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        # Get and save collection statistics. get_collection_stats returns {}
        # when its queries fail, so every lookup below uses a fallback.
        stats = processor.get_collection_stats()
        if not stats:
            logger.warning("Collection statistics are unavailable")
        with open("collection_stats.json", 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2, ensure_ascii=False)

        # Print summary
        print("\n" + rule)
        print("PROCESSING SUMMARY")
        print(rule)
        print(f"Total Jobs: {summary['total_jobs']}")
        print(f"Successfully Processed: {summary['successful_processed']}")
        print(f"Successfully Stored: {summary['successful_stored']}")
        print(f"Failed Processing: {summary['failed_processed']}")
        print(f"Failed Storage: {summary['failed_stored']}")

        print("\n" + rule)
        print("COLLECTION STATISTICS")
        print(rule)
        print(f"Total Jobs in Database: {stats.get('total_jobs', 'unavailable')}")
        print("\nTop Categories:")
        for category, count in list(stats.get('category_distribution', {}).items())[:5]:
            print(f"  {category}: {count}")

        print("\nTop Technical Skills:")
        for skill, count in list(stats.get('top_technical_skills', {}).items())[:10]:
            print(f"  {skill}: {count}")

        print("\nFiles generated:")
        print("  - processing_summary.json")
        print("  - collection_stats.json")

    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise
    finally:
        # Close connection even when the run failed part-way through
        if processor is not None:
            processor.close_connection()

if __name__ == "__main__":
    main()

INFO:__main__:Connected to MongoDB successfully
INFO:__main__:Loaded 200 job descriptions
INFO:__main__:Processing job 1/200: DevOps Engineer


Starting job description processing...


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Successfully stored job: JD_8986_1753437955
INFO:__main__:Processing job 2/200: Backend Developer
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Successfully stored job: JD_7363_1753437962
INFO:__main__:Processing job 3/200: AI/ML Consultant
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Successfully stored job: JD_2589_1753437969
INFO:__main__:Processing job 4/200: Agile Coach
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Successfully stored job: JD_1178_1753437976
INFO:__main__:Processing job 5/200: QA Manager
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Successfully stored job: JD_4336_1753437984
INFO:__main__:Processing job 6/200: Computer Vision E


PROCESSING SUMMARY
Total Jobs: 200
Successfully Processed: 200
Successfully Stored: 200
Failed Processing: 0
Failed Storage: 0

COLLECTION STATISTICS
Total Jobs in Database: 200

Top Categories:
  Project Management: 41
  Quality Assurance: 33
  Machine Learning: 32
  DevOps: 29
  Software Engineering: 27

Top Technical Skills:
  Python: 85
  SQL: 33
  Java: 29
  TensorFlow: 28
  R: 24
  Selenium: 23
  Tableau: 21
  AWS: 20
  Kubernetes: 18
  Docker: 18

Files generated:
  - processing_summary.json
  - collection_stats.json


INFO:__main__:MongoDB connection closed
