#### Notebook pull command
```bash
kaggle kernels pull asaniczka/ner-using-gemini -p /media/asaniczka/working/Projects/scrapers/linkedin_job_scraper/kaggle_gemini -m
```

#### Notebook push command
```bash
kaggle kernels push -p /media/asaniczka/working/Projects/scrapers/linkedin_job_scraper/kaggle_gemini
```

## 🌟🌟 **BIG STUFF** 🌟🌟

This notebook will demonstrate how to use Google's GEMINI as a Named Entity Recognition (NER) Tool. 💫✨

The goal is to take a job description like this: 📝

```
Recruiting from Scratch is a premier talent firm that focuses on placing the best product managers, software, and hardware talent at innovative companies. Our team is 100% remote and we work with teams across the United States to help them hire. We work with companies funded by the best … Experience with agile engineering practices such as TDD, Pair Programming, Continuous Integration, automated testing, and deployment Experience with building stream-processing systems, using solutions such as Kafka, Storm or Spark-Streaming Experience with dimensional data modeling and schema design in Data Warehouses Familiar with ETL (managing high-quality reliable ETL pipelines) Be familiar with legal compliance (with data management tools) data classification, and retention Location : This role will be located in office twice a week minimum in Palo Alto, San Francisco or Chicago. Salary Range: $180,000 - 250,000 USD base.
```

and get a list of skills mentioned in the description like this: 🔍

```python
skills = ['Python', 'Snowflake', 'Airflow', 'ETL', 'Kubernetes', 'Docker', 'Helm', 'Spark', 'pySpark', 'SQL', 'Kafka', 'Storm']
```

In [1]:
import logging
import datetime
import os
import time
from typing import Optional, Union
import json
import re
import concurrent.futures
import csv
import subprocess

import pandas as pd
import pytz

import google.generativeai as genai
from kaggle_secrets import UserSecretsClient
# Read the Gemini API key from the notebook's Kaggle secrets and hand it to the
# google.generativeai client, so later `genai` calls are authenticated.
user_secrets = UserSecretsClient()
api_key = user_secrets.get_secret("GEMINI_API_KEY")
genai.configure(api_key=api_key)

In [2]:
# Fetch the Kaggle credentials from the notebook's secrets.
kaggle_apikey = user_secrets.get_secret("kaggle_apikey")
kaggle_username = user_secrets.get_secret("kaggle_username")

# Export them as environment variables for child processes.
# NOTE(review): presumably the `kaggle` CLI run by upload_to_kaggle() reads these — confirm.
os.environ['KAGGLE_USERNAME'] = kaggle_username
os.environ['KAGGLE_KEY'] = kaggle_apikey

In [3]:
# Input CSV of job postings; loaded with pd.read_csv() in executor().
RAW_CSV_PATH = '/kaggle/input/linkedin-data-engineer-job-postings/postings.csv'
# Used as the project folder name and as the log file's base name.
PROJECT_NAME = 'linkedin_job_ner'

In [4]:
# GLOBAL VARS
# These start as None and are filled in at runtime:
#   LOGGER                  -> set by setup_logger()
#   PROJECT_FOLDER, DATA_FOLDER, TEMP_FOLDER, LOG_FOLDER, LOG_FILE_PATH
#                           -> set by setup_basic_file_paths()
#   START_TIME              -> set by executor(), read by calc_elapsed_time()
LOGGER = None
PROJECT_FOLDER = None
DATA_FOLDER = None
TEMP_FOLDER = None
LOG_FOLDER = None
LOG_FILE_PATH = None
START_TIME = None
# Current date in UTC, captured once when this cell runs.
DATE_TODAY = datetime.datetime.now(pytz.utc).date()

## Set up a logger


In [5]:
def setup_logger() -> None:
    """
    Set up the module-wide logger and bind it to the global ``LOGGER``.

    The logger gets two handlers sharing one format:
      * a console handler that emits INFO and above
      * a file handler that emits DEBUG and above to ``LOG_FILE_PATH``

    ``LOG_FILE_PATH`` must already be set (see ``setup_basic_file_paths``).

    Safe to call more than once: handlers from a previous call are removed
    and closed first, so re-running the notebook cell does not duplicate
    every log line.
    """
    # pylint: disable=global-statement
    global LOGGER

    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)  # let the handlers decide what to drop

    # logging.getLogger() returns the same object for the same name, so
    # handlers attached by an earlier call are still present. Drop them,
    # otherwise each message is written once per previous call.
    for old_handler in list(LOGGER.handlers):
        LOGGER.removeHandler(old_handler)
        old_handler.close()

    log_format = logging.Formatter(
        '%(asctime)s :   %(levelname)s   :   %(message)s')

    # init the console logger
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(log_format)
    LOGGER.addHandler(stream_handler)

    # init the file logger; explicit UTF-8 so non-ASCII text in logged
    # messages cannot fail on a platform with a narrower default encoding
    file_handler = logging.FileHandler(LOG_FILE_PATH, encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(log_format)
    LOGGER.addHandler(file_handler)

## Sets up the project folder and creates log, data, and temp folders.

Also creates the path to the log file.


In [6]:
def setup_basic_file_paths() -> None:
    """
    Create the project directory tree and record its paths in the globals.

    Layout, rooted at the current working directory:
        <cwd>/<PROJECT_NAME>/data
        <cwd>/<PROJECT_NAME>/temp
        <cwd>/<PROJECT_NAME>/logs/<PROJECT_NAME>.log   (path only, file not created)
    """
    # pylint: disable=global-statement
    global PROJECT_FOLDER, DATA_FOLDER, TEMP_FOLDER, LOG_FOLDER, LOG_FILE_PATH

    # work out every directory path first
    PROJECT_FOLDER = os.path.join(os.getcwd(), PROJECT_NAME)
    DATA_FOLDER = os.path.join(PROJECT_FOLDER, 'data')
    TEMP_FOLDER = os.path.join(PROJECT_FOLDER, 'temp')
    LOG_FOLDER = os.path.join(PROJECT_FOLDER, 'logs')

    # then create them, parent first; existing directories are left alone
    for folder in (PROJECT_FOLDER, DATA_FOLDER, TEMP_FOLDER, LOG_FOLDER):
        os.makedirs(folder, exist_ok=True)

    # the log file itself is created later by whoever opens it
    LOG_FILE_PATH = os.path.join(LOG_FOLDER, f'{PROJECT_NAME}.log')

## Calculates how long the script has been running


In [7]:
def calc_elapsed_time() -> float:
    """Return the seconds elapsed between ``START_TIME`` and now."""
    return float(time.time() - START_TIME)

## Removes newlines from the given error string.


In [8]:
def format_error(error: Union[str, Exception]) -> str:
    """
    Render an error as a single log-friendly line.

    Args:
        error: The caught exception (or an already-stringified message).

    Returns:
        ``'Error Type: <type>, Error: <message>'`` where ``<type>`` is
        ``str(type(error))`` and ``<message>`` is ``str(error)`` with all
        line breaks removed.
    """

    error_type = str(type(error))
    # Strip carriage returns as well as newlines: a message with CRLF line
    # endings would otherwise leave '\r' in what is meant to be a one-line
    # log entry.
    message = str(error).replace('\r', '').replace('\n', '')

    return f'Error Type: {error_type}, Error: {message}'

## 🌟 Function responsible for NER via Gemini

Here, first I'll setup a conversation type message template where the user instructs GEMINI. 🚀

> 💡
>
> Gemini API has a rolling window ratelimit where you can burst 60 requests in a few seconds, and then you have to sleep until the next quota set is available. ⏰

To achieve this, I'll be using 3 threads to simultaneously process job summaries.

This will: 💡

1. 🚀 Make sure that I'm always hitting the rate limit rather than sitting idle waiting for a Gemini response
2. ⏰ And once the ratelimit is reached, I'll sleep in 10 sec intervals until new quota is available 


In [9]:
def extract_ner(idx: int, total: int, summary: str) -> tuple[int, str] | None:
    """Function responsible for NER via Gemini"""

    LOGGER.info(f'Working on {idx+1}/{total}')
    if len(summary) < 100:
        return None

    model = genai.GenerativeModel(model_name='gemini-pro')

    input_message = f"USER: Imagine you're an NER AI model. \
                    You task is to extract techinical skills, frameworks, languages, softwares, and concepts that are found in the given job posting. \
                    You are allowed to change the names of skills and software to be standard and meaningful.\
                    Make a single list. \
                    The goal is so that the users will get a overview of the skills they need to have. \
                    Do not write sentences, only 1-3 word entities. \
                    Format your response as a list.\n\
                    USER: Here is the posting: ```{summary}```\n\
                    AI: "

    retries = 0
    gemini_message = None
    while retries < 10:
        try:
            gemini_response = model.generate_content(input_message)
            gemini_message = gemini_response.text
            break
        except Exception as error:
            if '429' in str(error):
                LOGGER.warning(f'ratelimit on {idx}. Sleeping 10')
                time.sleep(10)
                retries += 1
                continue
                
            LOGGER.error(f'{format_error(error)} on {idx} via model.generate_content.')
            break    

    if not gemini_message:
        return None
    
    skills = gemini_message.strip().split('\n')
    extracted_skills = []
    for skill in skills:
        if len(skill) > 3:
            skill = skill.replace('-', '').replace(',', '').strip()
            extracted_skills.append(skill)

    extracted_skills_str = ', '.join(extracted_skills)

    return (idx, extracted_skills_str)

## 🎯 Function responsible for handling adding NER to data

It takes in a Pandas DataFrame called `original_df` as input and returns the processed DataFrame `complete_df` as output.

#### Steps:

1. 📝 Create a mini DataFrame called `mini_df` which includes only the columns 'job_link' and 'job_summary'. A column called 'job_skills' is also added and initialized to `None`.
2. 🔮 Use the `concurrent.futures.ThreadPoolExecutor` to process the rows of `mini_df` concurrently with a maximum of 3 worker threads.
3. ⚡ For each row in `mini_df`, submit a job to the thread executor by calling the function `extract_ner` with the parameters `idx`, `len(mini_df)`, and `row['job_summary']`. The returned `future` objects are stored in the `futures` list.
4. 🔄 Iterate through the `futures` list and wait for each job to complete using `concurrent.futures.as_completed`.
5. 📝 Extract the result from each completed job and update the 'job_skills' column in `mini_df` for the corresponding row.
6. ⚙️ Merge the processed data from `mini_df` with the original DataFrame (`original_df`) on the 'job_link' column. The resulting DataFrame is assigned to `complete_df`.
7. 🔁 Return the `complete_df` as the final output.


In [10]:
# Working DataFrame, rebound by handle_adding_job_skills() through `global`,
# so it stays reachable from other notebook cells after that function runs.
mini_df = None

In [11]:
def handle_adding_job_skills(original_df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a ``job_skills`` column to the postings by running NER on every row.

    Each ``job_summary`` is sent to ``extract_ner`` on a pool of 3 worker
    threads. Rows for which extraction returns nothing, or raises, keep
    ``None`` in ``job_skills``.

    Args:
        original_df: Postings with at least ``job_link`` and ``job_summary``
            columns. It is not modified.

    Returns:
        A new DataFrame with the rows of ``original_df`` in the same order, a
        fresh 0..n-1 index, and the ``job_skills`` column.

    Side effects:
        Rebinds the module-level ``mini_df`` to the working frame
        (``job_link`` and ``job_skills`` columns once the function returns).
    """
    # pylint: disable=global-statement
    global mini_df

    # Explicit copy: the working frame is written to below, and must not be a
    # view onto (or trigger chained-assignment warnings about) the input.
    mini_df = original_df.loc[:, ['job_link', 'job_summary']].copy()
    mini_df['job_skills'] = None

    total = len(mini_df)
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as thread_executor:
        futures = [
            thread_executor.submit(extract_ner, idx, total, summary)
            for idx, summary in mini_df['job_summary'].items()
        ]

        # handle the results as they finish, in whatever order that is
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()

                if result:
                    idx, skills = result
                    mini_df.loc[idx, 'job_skills'] = skills

            except Exception as error:  # pylint: disable=broad-except
                # one bad posting must not abort the whole run
                LOGGER.critical(
                    f'Error when handlig a single job: {format_error(error)}')

    mini_df.drop('job_summary', axis=1, inplace=True)

    # Attach the column by position rather than merging on 'job_link'.
    # mini_df has exactly the rows of original_df in the same order, and a
    # merge would multiply rows whenever a job_link occurs more than once.
    complete_df = original_df.reset_index(drop=True)
    complete_df['job_skills'] = mini_df['job_skills'].to_numpy()

    return complete_df

In [12]:
def upload_to_kaggle() -> None:
    """
    Publish the contents of ``DATA_FOLDER`` as a new Kaggle dataset version.

    Writes ``dataset-metadata.json`` into ``DATA_FOLDER``, then runs
    ``kaggle datasets version`` on that folder. The command is attempted up
    to 5 times with a 5 second pause between attempts. Failures are logged,
    never raised.
    """

    metadata = {
        "id": "asaniczka/linkedin-data-engineer-job-postings"
    }

    # save the metadata file of the dataset
    metadata_file_path = os.path.join(DATA_FOLDER, 'dataset-metadata.json')
    with open(metadata_file_path, 'w', encoding='utf-8') as metadata_file:
        json.dump(metadata, metadata_file)

    # Argument list with no shell: the folder path is passed through as-is,
    # so quotes or spaces in it cannot break (or be interpreted by) a shell.
    command = [
        'kaggle', 'datasets', 'version',
        '-p', DATA_FOLDER,
        '-m', 'added NER via gemini',
    ]

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            subprocess.run(command, check=True)
            return
        except (subprocess.CalledProcessError, OSError) as error:
            # CalledProcessError: the CLI exited non-zero.
            # OSError: the CLI could not be started at all.
            LOGGER.error(f"Error on upload_to_kaggle(): {error=}")
            if attempt < max_attempts:
                time.sleep(5)

    # make the final outcome visible instead of returning silently
    LOGGER.critical(
        f"upload_to_kaggle() gave up after {max_attempts} failed attempts")

## 🎯 Main executor for this notebook

💾 This will create a log statement for each processed job posting. Since there are 17K job postings, the output cell will contain 17K rows.

⏳ It might look like crap, but when a kaggle notebook is running, TQDM progress bars don't show up on logs. So this is the simple solution I came up with


In [13]:
def executor() -> None:
    """
    Run the notebook end to end.

    Starts the timer, prepares folders and logging, loads the raw postings,
    adds the skills column, writes the result to ``postings.csv`` in the data
    folder, uploads it to Kaggle and logs the total run time.
    """
    # pylint: disable=global-statement
    global START_TIME

    START_TIME = time.time()
    setup_basic_file_paths()
    setup_logger()

    postings = pd.read_csv(RAW_CSV_PATH)
    # for a quick trial run, slice first, e.g. postings.iloc[:150, :]
    enriched = handle_adding_job_skills(postings)

    # every field is quoted in the output file
    enriched.to_csv(
        os.path.join(DATA_FOLDER, 'postings.csv'),
        index=False,
        quoting=csv.QUOTE_ALL,
    )

    upload_to_kaggle()

    time_to_complete = calc_elapsed_time()
    LOGGER.info(f"Completed script in {time_to_complete/60:.2f} mins")

In [14]:
# Not running since the data is already extracted.
# Set should_run to True to execute the whole pipeline again via executor().
should_run = False

if should_run:
    executor()