#### This is the Colab notebook I used for this project because of local GPU constraints.

In [None]:
!pip install fastapi uvicorn pydantic requests beautifulsoup4 transformers sentence-transformers numpy torch nest_asyncio pyngrok apify

In [None]:
import json
import torch
from transformers import AutoTokenizer, pipeline
import logging
from pydantic import BaseModel
from apify_client import ApifyClient
import nest_asyncio
import uvicorn
from fastapi import FastAPI, HTTPException
from pyngrok import ngrok
from typing import List, Optional
import re



In [None]:
# Read credentials and Apify actor IDs from Colab's `userdata` store so that
# none of them are hard-coded in the notebook.
from google.colab import userdata
NGROK_TOKEN = userdata.get('ngrok')  # auth token passed to pyngrok
APIFY_TOKEN = userdata.get('apify')  # token used to build the ApifyClient
HF_TOKEN = userdata.get("HF_TOKEN")  # Hugging Face Hub access token
# IDs of the Apify actors called by JobScraper, one per job board.
LINKEDIN_ACTOR_ID = userdata.get("linkedin")
GLASSDOOR_ACTOR_ID = userdata.get("glassdoor")
INDEED_ACTOR_ID = userdata.get("indeed")

In [None]:
# Authenticate this session with the Hugging Face Hub.
# NOTE(review): presumably required to download the gated
# google/gemma-2-2b-it weights used by LLMHelper — confirm.
from huggingface_hub import login
login(token=HF_TOKEN)


In [None]:
# Register the ngrok auth token with pyngrok before any tunnel is opened.
ngrok.set_auth_token(NGROK_TOKEN)

In [None]:
# Configure root logging once for the whole notebook: timestamp, level, message.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
)


def get_logger(name):
    """Return the logger registered under *name*.

    The original implementation ignored *name* and always returned the
    logger called ``"app.log"``, so every caller shared one logger no matter
    what it asked for. Records still propagate to the root handler
    configured above, so the emitted output format is unchanged.

    Args:
        name: Logger name, typically the caller's ``__name__``.

    Returns:
        The ``logging.Logger`` instance for *name*.
    """
    return logging.getLogger(name)

In [None]:
# Shared logger used by the classes and functions defined in the cells below.
logger = get_logger(__name__)

In [None]:
class JobSearchRequest(BaseModel):
    # Request body of the POST /search endpoint. All values are free-form
    # text supplied by the client.
    position: str  # job title to search for
    experience: str  # free text; the experience mappers extract the first number from it
    salary: str  # free text; fetch_glassdoor_jobs tries to parse it with float()
    jobNature: str  # e.g. on-site / remote — TODO confirm the accepted values
    location: str
    Country: Optional[str] = ""
    City: Optional[str] = ""
    skills: str
    companyName: Optional[List[str]] = []  # Optional dynamic input
    companyId: Optional[List[str]] = []  # Optional dynamic input
    publishedAt: Optional[str] = ""  # Optional dynamic input


# Response models returned by the /search endpoint.
class JobDetail(BaseModel):
    # One job listing as returned to the client. Built from the dicts
    # produced by JobScraper after LLMRanker.rank_jobs has scored them.
    job_title: str
    company: str
    experience: str
    jobNature: str
    location: str
    salary: str
    apply_link: str
    # Cosine similarity to the search query multiplied by 100, assigned by
    # LLMRanker.rank_jobs; stays 0.0 when a job was never ranked.
    similarity: float = 0.0


class JobSearchResponse(BaseModel):
    # Response body of the POST /search endpoint.
    relevant_jobs: List[JobDetail]  # jobs in the order produced by the ranking step


In [None]:
class LLMHelper:
    """Normalise a job search request into keywords with a local Gemma 2 model."""

    def __init__(self):
        """Load the ``google/gemma-2-2b-it`` text-generation pipeline and tokenizer.

        The pipeline runs on CUDA when ``torch.cuda.is_available()`` and on
        CPU otherwise.
        """
        logger.info("Loading Gemma 2 model for text generation...")
        print("Loading Gemma 2 model for text generation...")
        model_id = "google/gemma-2-2b-it"
        self.pipe = pipeline(
            "text-generation",
            model=model_id,
            model_kwargs={"torch_dtype": torch.bfloat16},
            device="cuda" if torch.cuda.is_available() else "cpu",
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        logger.info("Gemma 2 model loaded successfully.")
        print("Gemma 2 model loaded successfully.")

    def format_search_keywords(self, search_request: JobSearchRequest):
        """Return the search keywords for *search_request* as a dict.

        The model is asked for a JSON object with the keys ``position``,
        ``experience``, ``location``, ``country``, ``city`` and ``skills``.
        Its non-empty values are layered on top of the values taken directly
        from the request, so fields the model is not asked for (``salary``,
        ``jobNature``, ``companyName``, ``companyId``) are never lost. If
        generation or parsing fails, the request values alone are returned.

        Args:
            search_request: The incoming search request.

        Returns:
            A dict of keywords. It always contains ``position``,
            ``experience``, ``location``, ``skills``, ``salary`` and
            ``jobNature``; optional keys appear only when a non-empty value
            is available.
        """
        print(search_request)

        # Baseline copied verbatim from the request. It is both the fallback
        # and the source of fields the model is not asked to produce.
        request_keywords = {
            "position": search_request.position,
            "experience": search_request.experience,
            "location": search_request.location,
            "skills": search_request.skills,
            "salary": search_request.salary,
            "jobNature": search_request.jobNature,
        }
        # Optional request fields are forwarded only when set, so consumers
        # that rely on ``dict.get(key, default)`` still see their defaults.
        optional_fields = {
            "country": search_request.Country,
            "city": search_request.City,
            "companyName": search_request.companyName,
            "companyId": search_request.companyId,
        }
        request_keywords.update(
            {key: value for key, value in optional_fields.items() if value}
        )

        prompt = f"""
Extract and format the relevant keywords from the following job search request. Keep location but also add Country and City.
Return a JSON object with keys: "position", "experience", "location", "country", "city" and "skills".

Input:
Position: {search_request.position}
Experience: {search_request.experience}
Salary: {search_request.salary}
Job Nature: {search_request.jobNature}
Location: {search_request.location}
Skills: {search_request.skills}

NOTE: Only Provide JSON OUTPUT
"""
        logger.info("Generating formatted keywords using Gemma 2...")
        print("Generating formatted keywords using Gemma 2...")
        try:
            outputs = self.pipe(prompt, max_new_tokens=100)
            # The pipeline echoes the prompt; keep only the continuation.
            gen_text = outputs[0]["generated_text"][len(prompt) :]
            logger.info(f"Raw Gemma 2 output: {gen_text}")
            print(f"Raw Gemma 2 output: {gen_text}")
            # Try to extract the JSON part from the generated text
            start = gen_text.find("{")
            end = gen_text.rfind("}") + 1
            if start == -1 or end <= start:
                raise ValueError("no JSON object found in model output")
            parsed = json.loads(gen_text[start:end])
            if not isinstance(parsed, dict):
                raise ValueError("model output is not a JSON object")
            # Null or empty model values must not overwrite usable request values.
            llm_keywords = {
                key: value
                for key, value in parsed.items()
                if value is not None and value != ""
            }
            keywords = {**request_keywords, **llm_keywords}
            logger.info(f"Formatted Keywords: {keywords}")
            print(f"Formatted Keywords: {keywords}")
        except Exception as e:
            # Deliberate best-effort: any generation or parsing failure falls
            # back to the raw request values instead of failing the search.
            logger.warning(
                f"Failed to parse JSON response from Gemma 2, using fallback values: {e}"
            )
            print(
                f"Failed to parse JSON response from Gemma 2, using fallback values: {e}"
            )
            keywords = request_keywords
        return keywords


In [None]:
from sentence_transformers import SentenceTransformer, util
import torch

class LLMRanker:
    """Rank job dicts against the search keywords using sentence embeddings."""

    def __init__(self):
        """Load the ``all-MiniLM-L6-v2`` SentenceTransformer model."""
        logger.info("Loading embedding model for ranking...")
        # Load a SentenceTransformer model for generating embeddings.
        self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
        logger.info("Embedding model loaded successfully.")

    def rank_jobs(self, jobs, search_keywords):
        """Score each job against the query and return the jobs best-first.

        Args:
            jobs: List of job dicts with the keys ``job_title``, ``company``,
                ``location``, ``experience``, ``jobNature`` and ``salary``.
                Each dict is mutated in place: a ``similarity`` key is added.
            search_keywords: Dict of keywords; ``position``, ``skills``,
                ``experience`` and ``location`` are used to build the query.

        Returns:
            A new list holding the same job dicts sorted by ``similarity``
            in descending order. An empty *jobs* list returns ``[]``.
        """
        # Nothing to embed or compare; skip the model entirely.
        if not jobs:
            return []

        # Build the query string from the search keywords.
        query_text = f"{search_keywords.get('position', '')} {search_keywords.get('skills', '')} {search_keywords.get('experience', '')} {search_keywords.get('location', '')}"
        logger.info(f"Ranking jobs using query: {query_text}")
        print(f"Ranking jobs using query: {query_text}")

        # Generate the embedding for the query.
        query_embedding = self.embedding_model.encode(query_text, convert_to_tensor=True)

        # One short textual description per job, in the same order as `jobs`.
        job_texts = [
            (
                f"{job['job_title']} at {job['company']} in {job['location']}. "
                f"Experience required: {job['experience']}, Job Nature: {job['jobNature']}, Salary: {job['salary']}."
            )
            for job in jobs
        ]

        # Generate embeddings for all job texts.
        job_embeddings = self.embedding_model.encode(job_texts, convert_to_tensor=True)

        # Compute cosine similarities between the query and each job.
        cosine_scores = util.cos_sim(query_embedding, job_embeddings)[0]

        # Store cosine similarity * 100 on each job. Cosine similarity lies in
        # [-1, 1], so the score is usually positive but is not clamped to 0..100.
        for job, score in zip(jobs, cosine_scores.tolist()):
            job["similarity"] = float(score * 100)

        # Sort jobs by similarity in descending order.
        ranked_jobs = sorted(jobs, key=lambda x: x["similarity"], reverse=True)
        logger.info("Job ranking via embeddings completed.")
        print("Job ranking via embeddings completed.")
        return ranked_jobs


In [None]:
# Module-level Apify client shared by all JobScraper fetch methods.
client = ApifyClient(APIFY_TOKEN)
logger.info("Apify Client initialized.")
print("Apify Client initialized.")

In [None]:
def map_experience_level(exp_str):
    """Map a free-form experience value to a level string from "1" to "5".

    The first number found in *exp_str* is truncated to an integer and
    clamped to the range 1..5. Non-string input (for example a number or
    ``None`` coming from parsed JSON) is converted with ``str()`` first
    instead of raising ``TypeError``.

    Args:
        exp_str: Experience description such as ``"3 years"``, or a number.

    Returns:
        One of ``"1"`` .. ``"5"``; ``"1"`` when no number is found.
    """
    match = re.search(r"\d+(?:\.\d+)?", str(exp_str))
    if match is None:
        return "1"  # Default if no number is found
    years = float(match.group())
    # Clamp before truncating so absurdly large values cannot overflow int().
    return str(int(min(max(years, 1.0), 5.0)))

def map_experience_level_indeed(exp_str: str) -> str:
    """Map a free-form experience value to an Indeed experience level.

    The first number found in *exp_str* decides the level:

    - up to and including 2 years: ``"entryLevel"``
    - greater than 2 and up to 6 years: ``"midLevel"``
    - greater than 6 years: ``"seniorLevel"``

    Non-string input (for example a number or ``None`` coming from parsed
    JSON) is converted with ``str()`` first instead of raising ``TypeError``.

    Args:
        exp_str: Experience description such as ``"3 years"``, or a number.

    Returns:
        The level name; ``"entryLevel"`` when no number is found.
    """
    match = re.search(r"\d+(?:\.\d+)?", str(exp_str))
    if match is None:
        return "entryLevel"  # Default if no number is found
    years = float(match.group())
    if years <= 2:
        return "entryLevel"
    if years <= 6:
        return "midLevel"
    return "seniorLevel"

In [None]:
class JobScraper:
    """Fetch job listings from LinkedIn, Indeed and Glassdoor via Apify actors.

    Every fetch method returns a list of plain dicts with the keys
    ``job_title``, ``company``, ``experience``, ``jobNature``, ``location``,
    ``salary`` and ``apply_link``. All values are normalised to strings so
    that a null or non-string field in an actor's output cannot break the
    string-typed ``JobDetail`` model later on.
    """

    def __init__(self, search_criteria: dict):
        """Store the keyword dict that drives every actor input."""
        self.search_criteria = search_criteria

    @staticmethod
    def _text(value, default):
        """Return *value* as a string, or *default* when it is None or empty."""
        if value is None:
            return default
        if isinstance(value, (str, list, tuple, dict)) and not value:
            return default
        return str(value)

    @staticmethod
    def _nested(item, key, subkey):
        """Return ``item[key][subkey]``, or None when either level is missing or not a dict."""
        inner = item.get(key)
        if isinstance(inner, dict):
            return inner.get(subkey)
        return None

    @staticmethod
    def _iter_actor_items(actor_id, run_input):
        """Run the Apify actor and return an iterator over its default dataset."""
        run = client.actor(actor_id).call(run_input=run_input)
        return client.dataset(run["defaultDatasetId"]).iterate_items()

    def fetch_linkedin_jobs(self):
        """Fetch LinkedIn jobs through the LinkedIn Apify actor.

        Experience and job nature are copied from the search criteria, not
        from the scraped items.

        Returns:
            A list of job dicts.

        Raises:
            HTTPException: With status 500 when the actor run or the dataset
                read fails. Unlike the Indeed and Glassdoor fetchers, this
                one does not fall back to an empty list.
        """
        criteria = self.search_criteria
        print(criteria)
        # The criteria may hold a number or null here, so coerce to text
        # before extracting the first number.
        experience_level = map_experience_level(
            str(criteria.get("experience") or "1")
        )
        logger.info("Extracted experience level: %s", experience_level)
        print("Extracted experience level:", experience_level)

        # Build the dynamic input using values from the search request.
        run_input = {
            "title": criteria.get("position", ""),
            "location": criteria.get("location", ""),
            "companyName": criteria.get("companyName") or [],
            "companyId": criteria.get("companyId") or [],
            "jobNature": criteria.get("jobNature", ""),
            "experienceLevel": experience_level,
            "rows": 10,
            "proxy": {
                "useApifyProxy": True,
                "apifyProxyGroups": ["RESIDENTIAL"],
            },
        }
        logger.info(
            "Fetching LinkedIn jobs using Apify Actor with input: %s", run_input
        )
        print("Fetching LinkedIn jobs using Apify Actor with input:", run_input)

        experience = self._text(criteria.get("experience"), "Not Provided")
        job_nature = self._text(criteria.get("jobNature"), "Not Provided")
        default_location = self._text(criteria.get("location"), "")
        try:
            jobs = [
                {
                    "job_title": self._text(item.get("title"), "Unknown Title"),
                    "company": self._text(
                        item.get("companyName"), "Unknown Company"
                    ),
                    "experience": experience,
                    "jobNature": job_nature,
                    "location": self._text(item.get("location"), default_location),
                    "salary": self._text(item.get("salary"), "Not Provided"),
                    "apply_link": self._text(item.get("jobUrl"), "No Link"),
                }
                for item in self._iter_actor_items(LINKEDIN_ACTOR_ID, run_input)
            ]
            logger.info("Fetched %d jobs from LinkedIn.", len(jobs))
            print("Fetched %d jobs from LinkedIn." % len(jobs))
            return jobs
        except Exception as e:
            logger.error("Error fetching LinkedIn jobs: %s", e)
            print("Error fetching LinkedIn jobs:", e)
            raise HTTPException(
                status_code=500, detail="Failed to fetch LinkedIn jobs."
            ) from e

    def fetch_indeed_jobs(self):
        """Fetch Indeed jobs through the Indeed Apify actor.

        Returns:
            A list of job dicts, or ``[]`` when the actor run or the dataset
            read fails (the error is logged, not raised).
        """
        criteria = self.search_criteria
        experience_level = map_experience_level_indeed(
            str(criteria.get("experience") or "1")
        )

        # Build the dynamic run input for Indeed.
        run_input = {
            "job": [criteria.get("position", "")],
            # A null or empty country falls back to the default rather than
            # crashing on `.lower()`.
            "country": str(criteria.get("country") or "united states").lower(),
            "experienceLevel": experience_level,
            # NOTE(review): the job nature is sent as the actor's "sortType";
            # confirm this matches the actor's input schema.
            "sortType": criteria.get("jobNature", None),
            "city": criteria.get("city") or "",
            "numberOfResults": 5,
            "proxy": {"useApifyProxy": True},
        }
        logger.info("Fetching Indeed jobs using Apify Actor with input: %s", run_input)
        print("Fetching Indeed jobs using Apify Actor with input:", run_input)

        default_location = self._text(criteria.get("location"), "")
        try:
            jobs = [
                {
                    "job_title": self._text(item.get("title"), "Unknown Title"),
                    "company": self._text(
                        item.get("companyName"), "Unknown Company"
                    ),
                    "experience": self._text(
                        item.get("experience"), "Not Provided"
                    ),
                    "jobNature": self._text(item.get("jobNature"), "Not Provided"),
                    "location": self._text(item.get("location"), default_location),
                    "salary": self._text(item.get("salary"), "Not Provided"),
                    "apply_link": self._text(
                        self._nested(item, "jobMetadata", "href"), "No Link"
                    ),
                }
                for item in self._iter_actor_items(INDEED_ACTOR_ID, run_input)
            ]
            logger.info("Fetched %d jobs from Indeed.", len(jobs))
            print("Fetched %d jobs from Indeed." % len(jobs))
            return jobs
        except Exception as e:
            # Best-effort source: a failure here must not fail the whole search.
            logger.error("Error fetching Indeed jobs: %s", e)
            print("Error fetching Indeed jobs:", e)
            return []

    def _glassdoor_job(self, item):
        """Convert one Glassdoor dataset item into the common job dict."""
        criteria = self.search_criteria

        job_types = item.get("job_job_types")
        if isinstance(job_types, (list, tuple)):
            job_nature = ", ".join(map(str, job_types)) or "Not Provided"
        else:
            job_nature = self._text(job_types, "Not Provided")

        salary_info = item.get("job_salary")
        if isinstance(salary_info, dict) and salary_info:
            salary = (
                f"{salary_info.get('currency_symbol', '$')}"
                f"{salary_info.get('min', 'Not Provided')}"
            )
        else:
            salary = self._text(salary_info, "Not Provided")

        return {
            "job_title": self._text(item.get("job_title"), "Unknown Title"),
            "company": self._text(item.get("company_name"), "Unknown Company"),
            "experience": self._text(criteria.get("experience"), "Not Provided"),
            "jobNature": job_nature,
            # NOTE(review): the "unknown" sub-key is kept from the original
            # code; confirm it is the field the actor actually emits.
            "location": self._text(
                self._nested(item, "job_location", "unknown"),
                self._text(criteria.get("location"), ""),
            ),
            "salary": salary,
            "apply_link": self._text(item.get("job_apply_url"), "No Link"),
        }

    def fetch_glassdoor_jobs(self):
        """Fetch Glassdoor jobs through the Glassdoor Apify actor.

        Returns:
            A list of job dicts, or ``[]`` when the actor run or the dataset
            read fails (the error is logged, not raised).
        """
        criteria = self.search_criteria
        try:
            salary_value = float(criteria.get("salary", 0))
        except (TypeError, ValueError):
            # Free-text salaries such as "100k USD" cannot be parsed.
            salary_value = None

        run_input = {
            "keyword": f"{criteria.get('position', '')} {criteria.get('skills', '')}".strip(),
            "maxItems": 5,
            "location": criteria.get("location", ""),
            "includeNoSalaryJob": True,
            # NOTE(review): min and max are set to the same value (0.0 when no
            # salary is supplied); confirm this is the intended actor filter.
            "minSalary": salary_value,
            "maxSalary": salary_value,
            "fromAge": "30",
            "proxy": {"useApifyProxy": True},
        }
        logger.info(
            "Fetching Glassdoor jobs using Apify Actor with input: %s", run_input
        )
        print("Fetching Glassdoor jobs using Apify Actor with input:", run_input)
        try:
            jobs = [
                self._glassdoor_job(item)
                for item in self._iter_actor_items(GLASSDOOR_ACTOR_ID, run_input)
            ]
            logger.info("Fetched %d jobs from Glassdoor.", len(jobs))
            print("Fetched %d jobs from Glassdoor." % len(jobs))
            return jobs
        except Exception as e:
            # Best-effort source: a failure here must not fail the whole search.
            logger.error("Error fetching Glassdoor jobs: %s", e)
            print("Error fetching Glassdoor jobs:", e)
            return []


In [None]:
app = FastAPI(title="Job Finder API")

# Loading Gemma 2 and the embedding model is slow and memory-hungry, so each
# is created once on first use and reused by every later request.
_model_cache = {}


def _get_llm_helper():
    """Return the shared LLMHelper, loading the model on first use."""
    if "llm_helper" not in _model_cache:
        _model_cache["llm_helper"] = LLMHelper()
    return _model_cache["llm_helper"]


def _get_ranker():
    """Return the shared LLMRanker, loading the model on first use."""
    if "ranker" not in _model_cache:
        _model_cache["ranker"] = LLMRanker()
    return _model_cache["ranker"]


@app.post("/search", response_model=JobSearchResponse)
async def search_jobs(search_request: JobSearchRequest):
    """Search LinkedIn, Indeed and Glassdoor and return the best-ranked jobs.

    Steps: format the request into keywords with the local LLM, fetch
    listings from the three job boards, rank them by embedding similarity,
    and return at most the ten highest-scoring jobs.

    Args:
        search_request: The search parameters sent by the client.

    Returns:
        A dict with a ``relevant_jobs`` list (empty when nothing was found).

    Raises:
        HTTPException: Status 500. HTTP errors raised by the scraper are
            propagated unchanged; any other failure is wrapped with the
            error message as detail.
    """
    try:
        logger.info("Received a job search request.")
        print("Received a job search request.")

        # Step 1: Format search keywords using the local LLM
        llm_helper = _get_llm_helper()
        formatted_keywords = llm_helper.format_search_keywords(search_request)

        logger.info(f"Formatted Keywords: {formatted_keywords}")
        print(f"Formatted Keywords: {formatted_keywords}")

        # Step 2: Fetch job listings using Apify API via the JobScraper
        scraper = JobScraper(formatted_keywords)

        indeed_jobs = scraper.fetch_indeed_jobs()
        linkedin_jobs = scraper.fetch_linkedin_jobs()
        glassdoor_jobs = scraper.fetch_glassdoor_jobs()

        # Combine the listings from all three sources before ranking.
        all_jobs = linkedin_jobs + indeed_jobs + glassdoor_jobs
        logger.info(f"Total jobs fetched: {len(all_jobs)}")
        print(f"Total jobs fetched: {len(all_jobs)}")

        if not all_jobs:
            logger.warning("No job listings found.")
            print("No job listings found.")
            return {"relevant_jobs": []}

        # Step 3: Rank jobs based on relevance
        ranker = _get_ranker()
        ranked_jobs = ranker.rank_jobs(all_jobs, formatted_keywords)
        if ranked_jobs:
            logger.info(f"Ranking complete. Top job title: {ranked_jobs[0]['job_title']}")
            print(f"Ranking complete. Top job title: {ranked_jobs[0]['job_title']}")
        else:
            logger.info("No relevant jobs found after ranking.")
            print("No relevant jobs found after ranking.")

        # Step 4: Return the ten highest-ranked jobs (fewer if fewer were found).
        top_jobs = ranked_jobs[:10]
        job_details = [JobDetail(**job) for job in top_jobs]

        logger.info(f"Returning {len(job_details)} best job(s).")
        print(f"Returning {len(job_details)} best job(s).")
        return {"relevant_jobs": job_details}

    except HTTPException:
        # Already a well-formed HTTP error (e.g. from the LinkedIn fetcher);
        # re-wrapping it would only mangle its detail message.
        raise
    except Exception as e:
        logger.error(f"Error occurred: {str(e)}", exc_info=True)
        print(f"Error occurred: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e

# ----------------------------
# Main: Run FastAPI with Ngrok in Colab
# ----------------------------
if __name__ == "__main__":
    # Allow nested event loops (required in Colab)
    nest_asyncio.apply()

    # Set the port number for uvicorn
    port = 8000

    # Open an ngrok tunnel to the specified port
    public_url = ngrok.connect(port).public_url
    logger.info(f"Ngrok tunnel available at: {public_url}")
    print(f" * ngrok tunnel available at: {public_url}")

    try:
        # Run the FastAPI app with uvicorn; this blocks until the server stops.
        uvicorn.run(app, host="0.0.0.0", port=port)
    finally:
        # Close the tunnel so re-running the cell does not accumulate open
        # tunnels against the ngrok account.
        ngrok.disconnect(public_url)