# Green Job Detection and Normalization
This notebooks aims to **detect** (know if a job contains green skills or not) and normalize (map the green skills from the *.csv* to the `ESCO` taxonomy).

In [2]:
import pandas as pd
import numpy as np
import faiss
import json 
from openai import OpenAI
from dotenv import load_dotenv
import os
from tqdm import tqdm
import jsonschema
from pydantic import BaseModel
import threading
from threading import Lock

from warning_management import send_warning
from warning_management import send_error

load_dotenv()

True

Load indexes

In [8]:
index_green_skills, index_job_skills, id_to_job, id_to_skill = get_indexes_and_mappings()

a, b = (index_green_skills.ntotal, index_green_skills.d,)
c, d = (index_job_skills.ntotal, index_job_skills.d,)

print(f"Green skills index contains {a} vectors of dimension {b}.")
print(f"Job skills index contains {c} vectors of dimension {d}.")

Green skills index contains 2539 vectors of dimension 3072.
Job skills index contains 70318 vectors of dimension 3072.


In [2]:
df_jobs = pd.read_csv("../data/jan_to_apr_2025_with_languages_cleaned.csv")

## Normalization

## Hyperparameters
- `K_N`: number of nearest neighbors to retrieve from the index.
- `MIN_THRESHOLD`: minimum score threshold to consider an entry relevant.

In [3]:
# Modify this value in case of needing a different threshold for the 
# confidence of the green job detection
MIN_THRESHOLD = 0.1

# Modify this value in case of needing a different number of neighbors
K_N = 5

## Loading the indexes and mappings

In [4]:
# Load indexes
"""
@Returns:
    index_green_skills: FAISS index for green skills embeddings.
    index_job_skills: FAISS index for job skills embeddings.
    id_to_job: Dictionary mapping job IDs to job names.
    id_to_skill: Dictionary mapping skill IDs to skill names.
"""
def get_indexes_and_mappings() -> tuple[faiss.Index, faiss.Index, dict[str, str], dict[str, str]]:
    index_green_skills = faiss.read_index("../data/embeddings/esco_green_skills_text-embedding-3-large.index")
    index_job_skills = faiss.read_index("../data/embeddings/job_skills_embeddings.index")

    # Load mappings
    id_to_job = json.load(open("../data/mapping/id_to_job.json", "r"))
    id_to_skill = json.load(open("../data/mapping/id_to_skill.json", "r"))
    return index_green_skills, index_job_skills, id_to_job, id_to_skill


## Prompts
## Green Skill Classification Prompt
We will use the following prompt to detect and normalize green skills in job descriptions, it receives as arguments: 
- `k_closest`: a list of the `k` closest green skills from the `ESCO` taxonomy.
- `skill`: the skill extracted from the job description, and also the one we want to normalize.
- `job`: the name of the job.
It returns a list of dictionaries representing the messages to be sent to the model and the roles of each message.
* **system**: It is a system message that defines the role of the model.
* **user**: It is the user message that contains the actual prompt with the context and instructions.
* **developer**: It is the developer message that contains the instructions for the model's response format (in this case the pydantic class).

In [5]:
def get_green_skill_classification_prompt(k_closest: list[str], skill: str, job: str) -> list[dict]:
    system_prompt = """
You are an expert in identifying whether a skill can be mapped to a *green skill* in the ESCO taxonomy.
A green skill is defined as the abilities, values and attitudes needed to live in, develop and support
a society which reduces the impact of human activity on the environment.

Focus on environmental or sustainability-related aspects.
Ignore social or economic aspects unless they have a clear environmental connection.
"""
    
    user_prompt = f"""
Determine if the following skill can be semantically matched to one of the provided green skills from ESCO.

Skill to classify: "{skill}"
Job context: "{job}"

Closest ESCO green skills (formatted as 'Main Name': 'Alternative Name'):
"""

    for k in k_closest:
        user_prompt += f"- {k}\n"

    user_prompt += """
Decide whether the skill can be reasonably mapped to one of the ESCO green skills above
based on meaning and context.
"""

    developer_prompt = """
You must respond strictly following the response schema.
If the skill matches one of the ESCO green skills, return its 'Main Name'.
Otherwise, return 'No'.
"""

    return [
        {"role": "system", "content": system_prompt.strip()},
        {"role": "user", "content": user_prompt.strip()},
        {"role": "developer", "content": developer_prompt.strip()}
    ]

# Models for output parsing

## GreenSkillClassification
Model to parse the output of the green skill normalization.

In [6]:
class GreenSkillClassification(BaseModel):
    mapped_skill: str # The main name of the mapped ESCO green skill, or "No" if there is no match.

## Model
We will use the **gpt-4o** model from **OpenAI** for the normalization task.

In [7]:
# Load OpenAI key
OPENAI_KEY = os.getenv("OPENAI_KEY")

# Source: https://platform.openai.com/docs/guides/structured-outputs
# Function to run the model with output parsing
def run_model(prompt: list[str], client: OpenAI) -> str:
    response = client.responses.parse(
        model="gpt-4o",
        input=[
            {"role": msg["role"], "content": msg["content"]} for msg in prompt
        ],
        text_format=GreenSkillClassification,
    )
    return response.output_parsed


In [12]:
# Test run of the model
D, I = index_green_skills.search(np.array([index_job_skills.reconstruct(0)]), K_N)
k_closest = [f"\t{id_to_skill[str(idx)][0]}: {id_to_skill[str(idx)][1]}" for idx in I[0]]
name  = df_jobs[df_jobs["Job_ID"] == id_to_job[str(0)][0]]["Title"].values[0]
skill = id_to_job[str(0)][1]
prompt = get_green_skill_classification_prompt(k_closest, skill, name)
response = run_model(prompt, OpenAI(api_key=OPENAI_KEY))
print("Model response:", response)
print(type(response))
print(response.mapped_skill)

Model response: mapped_skill='No'
<class '__main__.GreenSkillClassification'>
No


## Prompt generator examples
An entry used to generate sample prompts to test the prompt generator function.

In [14]:
with open("../test/example_prompts.txt", "w") as f:
    index_green_skills, index_job_skills, id_to_job, id_to_skill = get_indexes_and_mappings()
    for i in range(0, 100):
        D, I = index_green_skills.search(np.array([index_job_skills.reconstruct(i)]), K_N)
    
        k_closest = [f"\t{id_to_skill[str(idx)][0]}: {id_to_skill[str(idx)][1]}" for idx in I[0]]
        name  = df_jobs[df_jobs["Job_ID"] == id_to_job[str(i)][0]]["Title"].values[0]
        
        skill = id_to_job[str(i)][1]
        prompt = get_green_skill_classification_prompt(k_closest, skill, name)
        f.write(str(prompt) + "\n\n")
    

## Normalization process
The following code cell shows how the normalization process is done for every job skill in the dataset. The steps are:
1. For every i-th job, get the top `K_N` nearest neighbors to the i-th job skill, comparing them with the `ESCO` green skill taxonomy using **cosine similarity** as distance metric.
2. If the **max score** among the `K_N` neighbors is greater or equal than `MIN_THRESHOLD`, then we consider that the job **may** contains green skills, and we proceed to normalize it using the prompt defined before and the language model.
3. We run the prompt, were we could get two types of answers:
   - The skill has been normalized to one of the `k` closest green skills, in that case it returns the **normalized skill**.
   - The skill could not be normalized to any of the `k` closest green skills, in that case it returns **No**.
4. We save the entry in a new dataframe, and also keep a set to avoid duplicates (i.e. the same job skill being normalized to the same green skill more than once, which would lead to counting it multiple times when analyzing the results).

In [None]:
def task(left : int, right : int, partition_id: int, global_set: set[str], save_every: int = 50):
    # df_new_dataset = pd.DataFrame(columns=["job_id", "job_skill", "skill_id" ,"esco_skill_name", "alternative_name", "prompt"])
    FILE_NAME = f"../data/green_skill_classification/green_skills_with_GPT-5_part_{partition_id}.csv"
    CLIENT = OpenAI(api_key = OPENAI_KEY)

    df_new_dataset = []

    index_green_skills, index_job_skills, id_to_job, id_to_skill = get_indexes_and_mappings()

    ctr = 0  
    for i in range(left, right): 
        try:
            D, I = index_green_skills.search(np.array([index_job_skills.reconstruct(i)]), K_N)
        except Exception as e:
            send_error(f"Searching embeddings at index {i}: {e}")
            continue

        try:
            if max(D[0]) < MIN_THRESHOLD:
                df_new_dataset.append({
                    "job_id": id_to_job[str(i)][0],
                    "job_skill": id_to_job[str(i)][1],
                    "skill_id": "No",
                    "esco_skill_name": "No",
                    "alternative_name": "No",
                    "prompt": "No"})
                continue
        except Exception as e:
            send_error(f"Threshold check or concatenation failed at index {i}: {e}")
            continue

        flag = False
        try:
            for score, idx in zip(D[0], I[0]):
                try:
                    name  = df_jobs[df_jobs["Job_ID"] == id_to_job[str(i)][0]]["Title"].values[0]
                    prompt = get_green_skill_classification_prompt([f"{id_to_skill[str(idx)][0]}: {id_to_skill[str(idx)][1]}" for idx in I[0]], id_to_job[str(i)][1], name)
                    response = run_model(prompt, CLIENT)

                    response.mapped_skill.replace("//", "").strip()
                except Exception as e:
                    df_new_dataset.append({
                        "job_id": id_to_job[str(i)][0],
                        "job_skill": id_to_job[str(i)][1],
                        "skill_id": "No",
                        "esco_skill_name": "No",
                        "alternative_name": "No",
                        "prompt": "No"})
                    continue

                try:
                    if response.mapped_skill != "No" and (id_to_job[str(i)][0], id_to_skill[str(idx)][0]) not in global_set:
                        new_prompt = str(prompt) + " | " + response.mapped_skill
                        new_prompt = new_prompt.replace("\n", " ").replace("\t", " ").strip()

                        df_new_dataset.append({
                            "job_id": id_to_job[str(i)][0],
                            "job_skill": id_to_job[str(i)][1],
                            "skill_id": list(id_to_skill.keys())[idx],
                            "esco_skill_name": id_to_skill[str(idx)][0],
                            "alternative_name": id_to_skill[str(idx)][1],
                            "prompt": new_prompt})
                        with lock:
                            global_set.add((id_to_job[str(i)][0], id_to_skill[str(idx)][0]))
                        flag = True
                        break
                except Exception as e:
                    send_error(f"Adding green skill at job index {i}, skill index {idx}: {e}")
                    continue
        except Exception as e:
            send_error(f"Iterating through scores at index {i}: {e}")
            continue

        try:
            if not flag and (id_to_job[str(i)][1], "No") not in global_set:
                df_new_dataset.append({
                    "job_id": id_to_job[str(i)][0],
                    "job_skill": id_to_job[str(i)][1],
                    "skill_id": "No",
                    "esco_skill_name": "No",
                    "alternative_name": "No",
                    "prompt": "No"})
                with lock:
                    global_set.add((id_to_job[str(i)][0], "No"))
        except Exception as e:

            df_new_dataset.append({
                    "job_id": id_to_job[str(i)][0],
                    "job_skill": id_to_job[str(i)][1],
                    "skill_id": "No",
                    "esco_skill_name": "No",
                    "alternative_name": "No",
                    "prompt": "No"})
            continue
        ctr += 1
        if ctr % save_every == 0:
            pd.DataFrame(df_new_dataset).to_csv(FILE_NAME, index=False)
        with lock:
            pbar.update(1)
    pd.DataFrame(df_new_dataset).to_csv(FILE_NAME, index=False)

In [None]:
# Global parameters for partitioning
def generate_limits(total_entries: int, number_of_partitions: int):
    entries_per_partition = total_entries // number_of_partitions
    partitions = [(i * entries_per_partition, (i + 1) * entries_per_partition) for i in range(number_of_partitions)]
    partitions[-1] = (partitions[-1][0], total_entries)
    return partitions

global_set = set()
NUMBER_OF_PARTITIONS = 10

_, job_skills, _, _ = get_indexes_and_mappings()

TOTAL_ENTRIES = job_skills.ntotal
partition_limits = generate_limits(TOTAL_ENTRIES, NUMBER_OF_PARTITIONS)

pbar = tqdm(total=job_skills.ntotal, desc="Overall Progress", position=0)
lock = Lock()



Overall Progress:   0%|          | 0/70318 [00:00<?, ?it/s]

In [10]:
print(partition_limits)

[(0, 7031), (7031, 14062), (14062, 21093), (21093, 28124), (28124, 35155), (35155, 42186), (42186, 49217), (49217, 56248), (56248, 63279), (63279, 70318)]


In [11]:
threads = []

for partition_id, (left, right) in enumerate(partition_limits):
    thread = threading.Thread(target=task, args=(left, right, partition_id, global_set))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()


Overall Progress: 100%|█████████▉| 70245/70318 [11:58:56<04:06,  3.38s/it]  

In [5]:
dataframe = pd.DataFrame(columns=["job_id", "job_skill", "skill_id" ,"esco_skill_name", "alternative_name", "prompt"])
FOLDER = "../data/green_skill_classification/"

folders = []

for file in os.listdir(FOLDER):
    if file.endswith(".csv"):
        folders.append(os.path.join(FOLDER, file))

folders.sort()
for file in folders:
    df_part = pd.read_csv(file)
    dataframe = pd.concat([dataframe, df_part], ignore_index=True)

In [6]:
dataframe.to_csv("../data/green_skill_classification/green_skills_with_GPT-5_full_dataset.csv", index=False)

In [130]:
df_new_dataset.to_csv("../data/green_skills_with_GPT-5.csv", index=False)