In [None]:
import os
import pandas as pd
import json
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers.json import JsonOutputParser
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple
import time
from datetime import datetime

load_dotenv(dotenv_path=".env", override=True)

In [None]:
def create_job_summary_prompt():
    """Build the prompt template used to summarize a job description.

    The template has two placeholders:

    - ``{job_description}``: the raw posting text, supplied at invoke time.
    - ``{format_instructions}``: output-format guidance, expected to be bound
      by the caller via ``prompt.partial(format_instructions=...)``.

    Returns:
        PromptTemplate: Template with ``job_description`` as its only
        declared input variable.
    """

    template = """
    You are an expert HR professional and recruiter. Please analyze the following job description and provide a comprehensive summary in JSON format.

    Job Description:
    {job_description}

    Please extract and organize the information into the following structure:

    **Job Information**
    - Extract job title from the description
    - Determine job level based on responsibilities and requirements (Entry/Junior/Mid-level/Senior/Lead/Executive)
    - Determine work type: "Remote", "Hybrid", or "On-site"

    **Compensation**
    - Extract salary information:
      - Min salary: minimum salary as number without commas
      - Max salary: maximum salary as number without commas
      - If range given (e.g., "$120,000 - $160,000"), extract both values
      - If single value given, use same for min and max
      - If not mentioned, use null for both

    **Overview**
    - Provide a brief 1-3 sentence summary of the position and its primary purpose

    **Responsibilities**
    - List the 3-5 most important responsibilities/duties
    - Focus on core functions, not minor tasks

    **Requirements**
    - Skills: Extract all mentioned technical skills, programming languages, platforms (no commas in values), don't include soft skills
    - Experience: Determine years_min (as a number), and level (entry/junior/mid/senior)
    - Qualifications: List requirements, experience, education

    {format_instructions}
    """

    return PromptTemplate(
        input_variables=["job_description"],
        template=template,
        # Placeholder binding only: it keeps `format_instructions` out of the
        # required inputs until the caller overrides it with `.partial(...)`.
        partial_variables={"format_instructions": "{format_instructions}"}
    )

class JobDescriptionSummarizer:
    """Summarize job descriptions into structured JSON with an OpenAI chat model.

    Offers single-description summarization, concurrent batch processing, and
    an end-to-end helper that reads a parquet file and writes the successful
    summaries to a JSON file. Every public method reports failures through a
    ``{"success": False, "error": ...}`` dict instead of raising.
    """

    def __init__(self, model_name="gpt-4o-mini", temperature=0.3):
        """
        Initialize the job description summarizer

        Args:
            model_name (str): OpenAI model to use
            temperature (float): Temperature for text generation (0.0-2.0)
                                0.0 = deterministic, focused responses
                                1.0 = balanced creativity and consistency
                                2.0 = highly creative, unpredictable responses
        """
        self.llm = ChatOpenAI(
            model=model_name,
            temperature=temperature
        )

        # Expected JSON structure; each value describes what belongs in the field.
        self.json_schema = {
            "job_id": "Job ID to be added later",
            "job_title": "Extracted job title",
            "job_level": "Entry/Junior/Mid-level/Senior/Lead/Executive",
            "workplace_type": "Remote/Hybrid/On-site",
            "compensation": {
                "salary": {
                    "min": "Minimum salary as number or null",
                    "max": "Maximum salary as number or null"
                }
            },
            "overview": "Brief 1-3 sentence summary",
            "responsibilities": ["List of 3-5 key responsibilities"],
            "requirements": {
                "skills": ["Technical skills array"],
                "experience": {
                    "years_min": "Minimum years as number",
                    "level": "entry/junior/mid/senior"
                },
                "qualifications": ["Essential qualifications array"]
            }
        }

        self.parser = JsonOutputParser(pydantic_object=None)
        self.prompt = create_job_summary_prompt()

        # The parser is built without a pydantic model, so it has no schema of
        # its own to describe (presumably it only emits a generic "return JSON"
        # hint -- confirm against the LangChain docs). The expected structure is
        # therefore spelled out explicitly. `job_id` is left out because
        # process_single_job assigns it after the model responds.
        model_schema = {key: value for key, value in self.json_schema.items() if key != "job_id"}
        format_instructions = (
            f"{self.parser.get_format_instructions()}\n"
            "Respond with a single JSON object that follows exactly this structure "
            "(each value below describes what to put in that field):\n"
            f"{json.dumps(model_schema, indent=2)}"
        )

        # Create the chain with format instructions
        self.chain = self.prompt.partial(format_instructions=format_instructions) | self.llm | self.parser

    def summarize(self, job_description: str) -> dict:
        """
        Summarize a job description

        Args:
            job_description (str): The full job description text

        Returns:
            dict: ``{"success": True, "summary": <parsed output>, "raw_output": str}``
            on success, or ``{"success": False, "error": str, "summary": None,
            "raw_output": None}`` when the chain raises.
        """
        try:
            result = self.chain.invoke({"job_description": job_description})
        except Exception as e:
            # Deliberate best-effort boundary: one bad job must not abort a batch.
            return {
                "success": False,
                "error": str(e),
                "summary": None,
                "raw_output": None
            }

        return {
            "success": True,
            "summary": result,
            "raw_output": str(result)
        }

    def process_single_job(self, job_data: Tuple[str, str]) -> dict:
        """
        Process a single job description

        Args:
            job_data: Tuple of (job_id, job_description)

        Returns:
            dict: Processing result carrying ``job_id``. On success the summary
            dict also has ``job_id`` set. Missing/blank descriptions and empty
            or non-object model outputs are reported as failures.
        """
        job_id, job_description = job_data

        try:
            # Treat NaN/None, blank text and a literal "nan" (any case, with or
            # without surrounding whitespace) as a missing description.
            text = str(job_description).strip()
            if pd.isna(job_description) or text == "" or text.lower() == "nan":
                return {
                    "success": False,
                    "error": "Job description is empty or missing",
                    "summary": None,
                    "job_id": job_id
                }

            summary_result = self.summarize(str(job_description))

            if summary_result["success"]:
                summary = summary_result["summary"]
                if not isinstance(summary, dict) or not summary:
                    # Report an explicit reason instead of a "success" that the
                    # caller would later have to count as an unexplained failure.
                    return {
                        "success": False,
                        "error": "Model returned an empty or non-object JSON summary",
                        "summary": None,
                        "raw_output": summary_result.get("raw_output"),
                        "job_id": job_id
                    }
                summary["job_id"] = job_id

            summary_result["job_id"] = job_id

            return summary_result

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "summary": None,
                "job_id": job_id
            }

    def process_batch_concurrent(self, batch_data: List[Tuple[str, str]], max_workers: int = 5) -> List[dict]:
        """
        Process a batch of jobs concurrently

        Progress lines are printed as jobs finish, but the returned list is in
        the same order as ``batch_data`` so the output is deterministic.

        Args:
            batch_data: List of tuples (job_id, job_description)
            max_workers: Maximum number of concurrent workers

        Returns:
            List of processing results, one per input job, in input order
        """
        jobs = list(batch_data)
        results = [None] * len(jobs)

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Remember each job's input position alongside its ID.
            future_to_job = {
                executor.submit(self.process_single_job, job_data): (position, job_data[0])
                for position, job_data in enumerate(jobs)
            }

            for future in as_completed(future_to_job):
                position, job_id = future_to_job[future]
                try:
                    result = future.result()
                except Exception as e:
                    print(f"❌ Exception processing job ID: {job_id} - {str(e)}")
                    result = {
                        "success": False,
                        "error": str(e),
                        "summary": None,
                        "job_id": job_id
                    }
                else:
                    if result["success"]:
                        print(f"✅ Successfully processed job ID: {job_id}")
                    else:
                        print(f"❌ Failed to process job ID: {job_id} - {result.get('error', 'Unknown error')}")

                # Exactly one entry per job, stored at its input position.
                results[position] = result

        return results

    def summarize_all_from_parquet(self, parquet_file_path: str,
                                  output_file: str = "jobs_summaries.json",
                                  batch_size: int = 50,
                                  max_workers: int = 5,
                                  id_column: str = "id",
                                  jd_column: str = "JD",
                                  limit: int = None) -> dict:
        """
        Process all job descriptions from a parquet file in batches with concurrent processing

        Args:
            parquet_file_path: Path to the parquet file
            output_file: Output JSON file path
            batch_size: Number of jobs to process in each batch (must be >= 1)
            max_workers: Maximum number of concurrent workers per batch (must be >= 1)
            id_column: Name of the ID column
            jd_column: Name of the job description column
            limit: Optional limit on total number of jobs to process;
                ``None`` means no limit, ``0`` processes nothing

        Returns:
            dict: Summary of processing results. On success it holds counts,
            the output path, elapsed time and up to 10 failed jobs; on failure
            it holds ``{"success": False, "error": str}``.
        """
        start_time = time.time()

        try:
            # Reject settings that would otherwise fail obscurely or silently
            # process nothing.
            if batch_size < 1:
                return {
                    "success": False,
                    "error": f"batch_size must be a positive integer, got {batch_size}"
                }
            if max_workers < 1:
                return {
                    "success": False,
                    "error": f"max_workers must be a positive integer, got {max_workers}"
                }
            if limit is not None and limit < 0:
                return {
                    "success": False,
                    "error": f"limit must be None or a non-negative integer, got {limit}"
                }

            print(f"📖 Reading parquet file: {parquet_file_path}")
            try:
                df = pd.read_parquet(parquet_file_path)
            except FileNotFoundError:
                # Scoped to the read so a bad output path is not misreported.
                return {
                    "success": False,
                    "error": f"Parquet file not found: {parquet_file_path}"
                }

            # Validate columns
            if id_column not in df.columns or jd_column not in df.columns:
                return {
                    "success": False,
                    "error": f"Required columns not found. Available: {list(df.columns)}"
                }

            # `is not None` so that limit=0 is honoured rather than ignored.
            if limit is not None:
                df = df.head(limit)

            total_jobs = len(df)
            print(f"📊 Total jobs to process: {total_jobs}")
            print(f"📦 Batch size: {batch_size}")
            print(f"👷 Max workers per batch: {max_workers}")

            # Read the two columns directly so each keeps its own dtype.
            job_data = [(str(job_id), description)
                        for job_id, description in zip(df[id_column], df[jd_column])]

            all_summaries = []
            successful_summaries = []
            failed_jobs = []

            for batch_start in range(0, total_jobs, batch_size):
                batch_end = min(batch_start + batch_size, total_jobs)
                batch = job_data[batch_start:batch_end]

                print(f"\n🔄 Processing batch {batch_start//batch_size + 1} " +
                      f"(jobs {batch_start + 1} to {batch_end} of {total_jobs})")

                for result in self.process_batch_concurrent(batch, max_workers):
                    all_summaries.append(result)
                    if result["success"] and result["summary"]:
                        successful_summaries.append(result["summary"])
                    else:
                        failed_jobs.append({
                            "job_id": result["job_id"],
                            "error": result.get("error", "Unknown error")
                        })

                # Progress update
                print(f"📈 Progress: {len(all_summaries)}/{total_jobs} jobs processed")

            # Save successful summaries to JSON file
            try:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(successful_summaries, f, indent=2, ensure_ascii=False)
            except (OSError, TypeError, ValueError) as e:
                return {
                    "success": False,
                    "error": f"Could not write output file {output_file}: {e}"
                }

            elapsed_time = time.time() - start_time

            # Summary statistics
            summary_stats = {
                "success": True,
                "total_jobs": total_jobs,
                "successful": len(successful_summaries),
                "failed": len(failed_jobs),
                "output_file": output_file,
                "elapsed_time": f"{elapsed_time:.2f} seconds",
                "failed_jobs": failed_jobs[:10]  # Show first 10 failures
            }

            print("\n" + "=" * 60)
            print("📊 PROCESSING COMPLETE")
            print("=" * 60)
            print(f"✅ Successful: {summary_stats['successful']}")
            print(f"❌ Failed: {summary_stats['failed']}")
            print(f"💾 Output saved to: {output_file}")
            print(f"⏱️  Total time: {summary_stats['elapsed_time']}")

            if failed_jobs:
                print("\n⚠️  First few failed jobs:")
                for job in failed_jobs[:5]:
                    print(f"   - Job ID {job['job_id']}: {job['error']}")

            return summary_stats

        except Exception as e:
            # Top-level boundary: callers expect an error dict, never an exception.
            return {
                "success": False,
                "error": f"Error processing parquet file: {str(e)}"
            }

def print_job_summary(summary_result: dict):
    """Print one summarization result to stdout.

    A failed result produces a single error line. A successful result
    produces a header, the summary as indented JSON (with ``job_id`` as the
    leading key), and a closing rule.

    Args:
        summary_result: Result dict with ``success`` and, depending on the
            outcome, ``error`` or ``summary``; ``job_id`` is optional and
            shown as ``Unknown`` when absent.
    """
    succeeded = summary_result["success"]
    job_label = summary_result.get('job_id', 'Unknown')

    if not succeeded:
        print(f"❌ Error for Job ID {job_label}: {summary_result['error']}")
        return

    rule = "=" * 60
    print(f"📋 JOB DESCRIPTION SUMMARY - ID: {job_label}")
    print(rule)

    payload = summary_result["summary"]

    print("📄 JSON OUTPUT:")
    print("-" * 30)

    # Seed with job_id so it leads the printed JSON; keys from the summary
    # itself are merged on top.
    merged = {"job_id": job_label}
    merged.update(payload)

    rendered = json.dumps(merged, indent=2, ensure_ascii=False)
    print(rendered)

    print("\n" + rule)

In [None]:
if __name__ == "__main__":
    # Build the summarizer before printing anything, so setup errors surface first.
    summarizer = JobDescriptionSummarizer()

    print(
        "=" * 60,
        "JOB DESCRIPTION SUMMARIZER - CONCURRENT BATCH PROCESSING",
        "=" * 60,
        sep="\n",
    )

    # Run settings: edit these to point at your data and tune throughput.
    parquet_file_path = "Data_Engineer-20250727.parquet"  # input parquet file
    output_file = "jobs_summaries.json"                   # where summaries are written
    batch_size = 50                                       # jobs handled per batch
    max_workers = 5                                       # concurrent workers within a batch

    # Summarize every job in the parquet file.
    result = summarizer.summarize_all_from_parquet(
        parquet_file_path=parquet_file_path,
        output_file=output_file,
        batch_size=batch_size,
        max_workers=max_workers,
        id_column="job_posting_id",  # rename if your ID column differs
        jd_column="job_summary",     # rename if your description column differs
        limit=None,                  # set a number to process only a subset (handy for testing)
    )

    # Report the overall outcome.
    if not result["success"]:
        print(f"\n❌ Error: {result['error']}")
    else:
        print(f"\n✅ Successfully processed {result['successful']} job descriptions!")
        print(f"📄 Results saved to: {result['output_file']}")

    # Closing usage hints, printed regardless of the outcome.
    print(
        "\n" + "=" * 60,
        "💡 USAGE TIPS:",
        "=" * 60,
        "1. Update 'parquet_file_path' with your actual parquet file path",
        "2. Adjust 'batch_size' to control how many jobs are processed together",
        "3. Adjust 'max_workers' to control concurrent processing (be mindful of API rate limits)",
        "4. Set 'limit' parameter to test with a smaller subset first",
        "5. Update column names if they're different from defaults",
        "=" * 60,
        sep="\n",
    )