In [29]:
import urllib.parse
import gradio as gr  # gradio to build the web app
import requests      # to fetch web pages
from bs4 import BeautifulSoup  # to parse html
import pandas as pd  # to handle data tables
from datetime import datetime  # to add dates
import time          # for adding delays in requests

# flair tools for AI skill detection
from flair.models import SequenceTagger
from flair.data import Sentence

import threading      # for smooth background tasks
import os             # to save files
import random         # for random delays

# retry tools to handle web errors
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


**LOADING MODAL**

In [30]:
flair_model = SequenceTagger.load("kaliani/flair-ner-skill")

2025-11-09 13:05:09,255 SequenceTagger predicts: Dictionary with 7 tags: O, S-SKILL, B-SKILL, E-SKILL, I-SKILL, <START>, <STOP>


In [31]:
# Map experience levels to LinkedIn URL codes
experience_level_mapping = {
    "Internship": "f_E=1",
    "Entry level": "f_E=2",
    "Associate": "f_E=3",
    "Mid-Senior level": "f_E=4"
}

# Map work type to LinkedIn URL codes
work_type_mapping = {
    "On-site": "f_WT=1",
    "Hybrid":  "f_WT=3",   # ← WAS 2
    "Remote":  "f_WT=2"    # ← WAS 3
}

# Map time filters to LinkedIn URL codes
time_filter_mapping = {
    "Past 24 hours": "f_TPR=r86400",
    "Past week": "f_TPR=r604800",
    "Past month": "f_TPR=r2592000"
}


In [32]:
# Define a function to find skills in a text description
def get_skills(text):
    # Turn the input text into Flair's Sentence object
    sentence = Sentence(text)

    # Use the Flair NER model to detect skills in the sentence
    flair_model.predict(sentence)

    # Extract the detected skills from the sentence
    skills = [entity.text for entity in sentence.get_spans("ner")]

    return skills

# Example usage:
description = "I have experience in Python, TensorFlow, and data analysis."
found_skills = get_skills(description)
print(found_skills)

['Python', 'data analysis', '.']


In [33]:
class ScraperManager:
    # Setup when class starts
    def __init__(self):
        # Flag to stop scraping
        self.stop_event = threading.Event()

        # Empty table for job data
        self.current_df = pd.DataFrame()

        # Lock to avoid data mix-ups
        self.lock = threading.Lock()

    # Reset for new scrape
    def reset(self):
        # Clear stop flag
        self.stop_event.clear()

        # Clear job table
        self.current_df = pd.DataFrame()

    # Add one job to table
    def add_job(self, job_data):
        # Lock to keep data safe
        with self.lock:
            # Make job into tiny table
            new_df = pd.DataFrame([job_data])

            # Add to main table
            self.current_df = pd.concat([self.current_df, new_df], ignore_index=True)

# -------------------------------
# Create manager instance
# -------------------------------
scraper_manager = ScraperManager()


In [34]:
#define function to save jobs
def save_csv(df, filename="jobs"):
    try:
        #make folder for files
        os.makedirs("saved_jobs", exist_ok=True)

        #set default name with time stamp
        if not filename:
            filename = f"jobs_{int(time.time())}"

        #build file path
        full_path = f"saved_jobs/{filename}.csv"

        #save table to csv
        df.to_csv(full_path, index=False)

        #Confirm save worked
        return f"Saved to {full_path}"

    except Exception as e:
        #show error if save fails
        return f"Save error: {str(e)}"


**Proecss JOB FUNCTION**

In [35]:
def process_job(job, work_type, exp_level, position):
    try:
        # find job title
        title_element = job.find('h3', class_='base-search-card__title')
        # find company name
        company_element = job.find('a', class_='hidden-nested-link')
        # find location
        loc_element = job.find('span', class_='job-search-card__location')
        # find job link
        link_element = job.find('a', class_='base-card__full-link')

        # check all data exists
        if not all([title_element, company_element, loc_element, link_element]):
            return None

        # clean title text
        title = title_element.text.strip()
        # clean company text
        company = company_element.text.strip()
        # clean location text
        loc = loc_element.text.strip()
        # clean link (remove extra bits)
        link = link_element['href'].split('?')[0]

        # setup web session with retries
        session = requests.Session()
        retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        session.mount('https://', HTTPAdapter(max_retries=retries))

        # make request to job link
        desc = "Description not available "
        # empty skills list
        skills = []

        try:
            time.sleep(random.uniform(2,5))
            response = session.get(
                link,
                headers={
                    'User-Agent': random.choice([
                        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)',
                        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko)',
                        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96'
                    ]),
                    "Accept-Language": "en-us,en;q=0.9"
                },
                timeout=10
            )

            # parse job page
            job_soup = BeautifulSoup(response.text, 'html.parser')

            # list of places to find description
            description_selectors = [
                'div.description__text',
                'div.show-more-less-html__markup',
                'div.core-section-container__content',
                'section.core-section-container'
            ]

            # try each description spot
            for selector in description_selectors:
                desc_element = job_soup.select_one(selector)
                if desc_element:
                    # clean description text
                    desc = desc_element.get_text('\n').strip()
                    # find skills with AI
                    skills = get_skills(desc)
                    break

        except Exception as e:
            print(f"Error processing {link}: {str(e)}")
            # Still return what we have

        # return job details
        return {
            "Position": position,
            "Date": datetime.now().strftime('%Y-%m-%d'),
            "Work type": work_type,
            "Level": exp_level,
            "Title": title,
            "Company": company,
            "Location": loc,
            "Link": f"[{link}]({link})",
            "Description": desc,
            "Skills": ", ".join(skills[:5]) if skills else "No skills detected"
        }

    except Exception as e:
        # log error if job fails
        print(f"Error packing job card: {str(e)}")
        return None

**SCRAPE JOB FUNCTION******

In [36]:
# ===== SCRAPE JOBS (FIXED URL) =====
def scrape_jobs(location, position, work_types, exp_levels, time_filter):
    session = requests.Session()
    retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
    session.mount('https://', HTTPAdapter(max_retries=retries))

    for work_type in work_types:
        for exp_level in exp_levels:
            if scraper_manager.stop_event.is_set():
                return

            try:
                # FIX 4: Proper URL encoding + safe params
                pos_enc = urllib.parse.quote(position)
                loc_enc = urllib.parse.quote(location)
                base_url = f"https://www.linkedin.com/jobs/search/?keywords={pos_enc}&location={loc_enc}"
                base_url += f"&{work_type_mapping[work_type]}" if work_type in work_type_mapping else ""
                base_url += f"&{experience_level_mapping[exp_level]}" if exp_level in experience_level_mapping else ""
                base_url += f"&{time_filter_mapping[time_filter]}" if time_filter in time_filter_mapping else ""
                base_url += "&sortBy=DD"

                # Get total
                try:
                    resp = session.get(base_url, timeout=15)
                    soup = BeautifulSoup(resp.text, 'html.parser')
                    total_el = soup.find('span', class_='results-context-header__job-count')
                    total_jobs = int(total_el.text.replace(',', '')) if total_el else 25
                except:
                    total_jobs = 25
                total_jobs = min(total_jobs, 50)

                for start in range(0, total_jobs, 25):
                    if scraper_manager.stop_event.is_set():
                        return
                    time.sleep(random.uniform(2, 4))
                    url = f"{base_url}&start={start}"
                    try:
                        resp = session.get(url, timeout=10)
                        soup = BeautifulSoup(resp.text, 'html.parser')
                        cards = soup.find_all('div', class_='base-card')
                        random.shuffle(cards)
                        for card in cards:
                            if scraper_manager.stop_event.is_set():
                                return
                            data = process_job(card, work_type, exp_level, position)
                            if data:
                                scraper_manager.add_job(data)
                                # FIX 5: Yield to update GUI
                                yield
                    except Exception as e:
                        print(f"Page failed: {e}")
            except Exception as e:
                print(f"Combo failed: {e}")

In [37]:
def run_scraper(cities, states, positions_input, work_types, exp_levels, time_filter):
    # Clean inputs
    locations = [f"{c.strip()}, {s.strip()}"
                 for c, s in zip(cities.split(','), states.split(','))
                 if c.strip() and s.strip()]
    positions = [p.strip() for p in positions_input.split(',') if p.strip()]

    # Reset
    scraper_manager.reset()

    # Worker thread
    def worker():
        for loc in locations:
            for pos in positions:
                if scraper_manager.stop_event.is_set():
                    return
                # Consume generator
                for _ in scrape_jobs(loc, pos, work_types, exp_levels, time_filter):
                    pass

    thread = threading.Thread(target=worker)
    thread.start()

    # LIVE UPDATE LOOP
    while thread.is_alive():
        time.sleep(0.5)
        with scraper_manager.lock:
            yield "scraping in progress ...", scraper_manager.current_df

    # FINAL
    status = "scraping completed" if not scraper_manager.stop_event.is_set() else "scraping stopped"
    yield status, scraper_manager.current_df

In [38]:
with gr.Blocks() as app:
    # Add title
    gr.Markdown("""
    <div style='text-align:center; color:#f67d3c; font-size:2em; font-weight:bold; margin:20px 0; padding:10px'>
        AI-Powered LinkedIn Scraper
    </div>
    """)

    with gr.Row():
        with gr.Column():
            # Input for cities
            cities = gr.Textbox(label="Cities (comma-separated)")

            # Input for states
            states = gr.Textbox(label="States / Countries (comma-separated)")

            # Input for positions
            positions = gr.Textbox(label="Positions (comma-separated)")

            # checkbox for work types
            work_types = gr.CheckboxGroup(
                list(work_type_mapping.keys()),
                label="Work Types"
            )

            # checkbox for experience levels
            exp_levels = gr.CheckboxGroup(
                list(experience_level_mapping.keys()),
                label="Experience Levels"
            )

            # drop down for time filter
            time_filter = gr.Dropdown(list(time_filter_mapping.keys()), label="Time Filter")

            # Buttons for start/stop
            with gr.Row():
                start_btn = gr.Button("Start Scraping", variant="primary")
                stop_btn = gr.Button("Stop Scraping", variant="secondary")

            # show status
            status = gr.Textbox(label="Status")

    # show job table results
    results = gr.DataFrame(
        headers=["Position","Date","Work Type","Level","Title","Company","Location","Skills","Link"],
        datatype=["str","str","str","str","str","str","str","str","str"],
        interactive=False
    )

    # save section
    with gr.Row():
        # Input for file name
        filename = gr.Textbox(label="Filename (optional)", placeholder="my_jobs")
        # save button
        save_btn = gr.Button("Save to CSV", variant="secondary")
        # show save status
        save_status = gr.Textbox(label="Save status")

    # === BUTTON CONNECTIONS (ALL AT SAME LEVEL) ===
    start_btn.click(
        run_scraper,
        inputs=[cities, states, positions, work_types, exp_levels, time_filter],
        outputs=[status, results]
    )
    stop_btn.click(
        lambda: scraper_manager.stop_event.set(),
        outputs=None
    )
    save_btn.click(
        save_csv,
        inputs=[results, filename],
        outputs=save_status
    )

# === LAUNCH APP (OUTSIDE BLOCK) ===
if __name__ == "__main__":
    app.launch()

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://1a796589a177109a83.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
