In [None]:
#  This cell loads the batch job from bedrock. It returns the extracted tasks for each
#  job.
from llmbo import StructuredBatchInferer
from extraction import TaskOutput


# boto3 is imported in this cell because it is the first one that uses it;
# without this the cell raises NameError on a fresh kernel (Restart & Run All).
import boto3

# Re-attach to the previously submitted Bedrock batch job instead of
# re-running the extraction.
sbi = StructuredBatchInferer.recover_structured_job(
    job_arn="arn:aws:bedrock:us-east-1:992382722318:model-invocation-job/7whskxnh15zc",
    region="us-east-1",
    output_model=TaskOutput,
    session=boto3.Session(),
)

# Fetch and parse the job output; later cells read the parsed records from
# `sbi.instances` (presumably populated by `load_results` - confirm in llmbo).
sbi.download_results()
sbi.load_results()

# Number of records returned by the batch job.
print(len(sbi.instances))


2025-02-17 16:57:17,248 - llmbo.llmbo.StructuredBatchInferer - INFO - Attempting to Recover BatchInferer from arn:aws:bedrock:us-east-1:992382722318:model-invocation-job/7whskxnh15zc
2025-02-17 16:57:17,275 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-02-17 16:57:31,679 - llmbo.llmbo.StructuredBatchInferer - INFO - Initialized StructuredBatchInferer with TaskOutput schema
2025-02-17 16:57:31,679 - llmbo.llmbo.StructuredBatchInferer - INFO - Intialising BatchInferer
2025-02-17 16:57:31,686 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-02-17 16:57:45,853 - llmbo.llmbo.StructuredBatchInferer - INFO - Role 'BatchInferenceRole' exists.
2025-02-17 16:57:45,872 - llmbo.llmbo.StructuredBatchInferer - INFO - Initialized BatchInferer
2025-02-17 16:57:46,170 - llmbo.llmbo.StructuredBatchInferer - INFO - Job arn:aws:bedrock:us-east-1:992382722318:model-invocation-job/7whskxnh15zc is alr

23233


In [None]:
# Load the jobs dataframe

import pandas as pd

# Read every job listing, then keep an independent copy of the HMRC rows only.
hmrc_jobs = (
    pd.read_parquet("./data/jobs.pq")
    .loc[lambda listings: listings["department"] == "HM Revenue and Customs"]
    .copy()
)

In [None]:
#  convert the instances from the batch processor into a data frame
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# Map vacancy_id -> list of task dicts, skipping empty records and records
# whose structured output is missing.
tasks_dict = {}
for record in sbi.instances:
    if not record:
        continue
    # recordId has the form "vacancy_id=<int>"; take the part after "=".
    record_vacancy_id = int(record["recordId"].split("=")[1])
    parsed_output = record["outputModel"]
    if not parsed_output:
        continue
    tasks_dict[record_vacancy_id] = parsed_output.model_dump()["tasks"]


# Flatten to one row per (vacancy, task), indexed by vacancy_id.
task_rows = []
for record_vacancy_id, vacancy_tasks in tasks_dict.items():
    for task in vacancy_tasks:
        task_rows.append({"vacancy_id": record_vacancy_id, **task})

tasks_df = pd.DataFrame(task_rows).set_index("vacancy_id")


In [None]:
# Attach each LLM-extracted task (and its exposure score) to its job listing.
# A left join keeps listings for which no tasks were extracted.
hmrc_jobs_with_tasks = pd.merge(
    hmrc_jobs,
    tasks_df,
    how="left",
    on="vacancy_id",
)


In [None]:
#  Per-vacancy summary of the task exposure scores, i.e. how exposed the role
#  is to automation.
exposure_by_vacancy = hmrc_jobs_with_tasks.groupby("vacancy_id")["exposure_score"]
hmrc_stats = exposure_by_vacancy.agg(["count", "mean", "median", "std"])


In [None]:
# Job listings joined with their per-vacancy exposure statistics.
hmrc_with_stats = pd.merge(
    hmrc_jobs,
    hmrc_stats,
    on="vacancy_id",
    how="left",
)


In [None]:
# Histogram (with KDE overlay) of the per-vacancy mean exposure score.
hist_fig, hist_ax = plt.subplots(figsize=(10, 6))
sns.histplot(data=hmrc_stats["mean"], kde=True, ax=hist_ax)
hist_ax.set_title("Distribution of Mean Exposure Scores")
hist_ax.set_xlabel("Mean Exposure Score")
hist_ax.set_ylabel("Count")
plt.show()


In [None]:
# Concentrating on the roles where there is a high degree of automation potential
# (mean exposure score of at least 0.7).
is_high_exposure = hmrc_stats["mean"] >= 0.7
high_automation_exposure = hmrc_jobs.merge(
    hmrc_stats[is_high_exposure], how="right", on="vacancy_id"
)


# Report how many vacancies qualify and how many distinct (case-insensitive)
# titles they cover.
n_high = len(high_automation_exposure)
n_distinct_titles = len(
    high_automation_exposure["vacancy_title"].str.lower().unique()
)
print(f"{n_high} of {len(hmrc_jobs)} are HIGH")
print(f"{n_distinct_titles} vacancy_titles")

In [None]:
# We have a longish list of job titles, use an llm to uncover the groups.
from mirascope.core import Messages, bedrock
import boto3
from pydantic import BaseModel, Field
from typing import List

# System prompt for the taxonomy call: passed to `Messages.System` in
# `map_jobs` below. The text is sent to the model verbatim, so any edit
# (including whitespace) changes the request.
taxonomy_prompt = """
You are an expert in creating groups of similar job titles. You examine the full list of 
job titles. Create a list of groups which cover the full list. 
"""


# Structured response for the taxonomy call (used as `response_model` of
# `map_jobs`): a flat list of category names.
class JobTitleTaxonomy(BaseModel):
    # Required field (`...` means no default); the description is part of the
    # field metadata.
    categories: List[str] = Field(..., description="A list of the job categories")


# Sends the job titles to Claude 3.5 Sonnet on Bedrock (us-east-1) with the
# taxonomy system prompt; `response_model` requests a `JobTitleTaxonomy` back.
@bedrock.call(
    "us.anthropic.claude-3-5-sonnet-20241022-v2:0",
    call_params={"temperature": 0.2, "max_tokens": 8000},
    client=boto3.Session().client("bedrock-runtime", region_name="us-east-1"),
    response_model=JobTitleTaxonomy,
)  # type: ignore
def map_jobs(job_titles: str) -> Messages.Type:
    # `job_titles` is a single string holding the titles to group (the caller
    # joins them with newlines); it is embedded in the user message as-is.
    system_message = Messages.System(taxonomy_prompt)
    user_message = Messages.User(f"""
            Group these job titles:  
            <job_titles>{job_titles}</job_titles>
            """)
    return [system_message, user_message]


# Distinct lower-cased titles of the high-exposure vacancies, one per line.
distinct_titles = high_automation_exposure["vacancy_title"].str.lower().unique()
job_groupings = map_jobs("\n".join(distinct_titles))

# Show the categories proposed by the model.
job_groupings.categories


In [None]:
# Now that the model has created a taxonomy/groupings of the roles. Use those and map
# each of the roles.

from enum import Enum
from llmbo import ModelInput
from datetime import datetime

# # Create the Enum dynamically
# Category = Enum("JobCategories", job_groupings.categories)

# Fixed copy of the category list, used instead of the dynamic
# `job_groupings.categories` version commented out above.
# NOTE(review): the functional Enum API assigns the values 1..N to these
# names. If the schema shown to the model is built from the enum *values*, the
# model sees integers rather than category names - confirm how `Mapper` is
# serialised before relying on the mapping quality.
JOB_CATEGORY_NAMES = [
    "Data & Analytics",
    "Administrative & Support",
    "Finance & Accounting",
    "Project & Program Management",
    "Customer Service",
    "Technical & IT",
    "HR & Recruitment",
    "Compliance & Risk",
    "Management & Leadership",
    "Legal & Policy",
    "Communications & Marketing",
    "Operations",
]

Category = Enum("JobCategories", JOB_CATEGORY_NAMES)

# System prompt for the category-mapping batch job (passed as `system=` of
# every `ModelInput` below).
# NOTE(review): the prompt refers to "categories provided", but neither this
# text nor `prompt()` lists them - presumably they reach the model only via
# the `Mapper` output schema; confirm.
SYSTEM_PROMPT = """
You are an expert in mapping job titles to categories provided
"""


def prompt(job_description):
    """Build the user message asking the model to map one job to a category.

    The value is interpolated between ``<job_title>`` tags. The surrounding
    whitespace (leading newline, four-space indents, trailing indent) is part
    of the returned text.
    """
    message_lines = (
        "",
        "    Map a single job title to the closest category provied.",
        f"    <job_title>{job_description}</job_title>",
        "    ",
    )
    return "\n".join(message_lines)


# Structured output for the mapping job: exactly one `Category` member per
# request. Used as `output_model` of the batch inferer below.
class Mapper(BaseModel):
    # The category chosen for the job; must validate as a `Category` member.
    mapped_category: Category


# Reuse the earlier mapping job when it can be recovered; otherwise submit a
# fresh batch job and wait for it to finish.
#
# Only the recovery call is inside the `try`. Previously the download/load
# steps were wrapped too, so a ValueError raised while post-processing a
# successfully recovered job would fall through to the `except` branch and
# submit a brand-new batch job.
try:
    job_mapper_batcher = StructuredBatchInferer.recover_structured_job(
        job_arn="arn:aws:bedrock:us-east-1:992382722318:model-invocation-job/st34z39z3ckv",
        region="us-east-1",
        output_model=Mapper,
    )
except ValueError:
    # modify this to use a better model.
    job_mapper_batcher = StructuredBatchInferer(
        output_model=Mapper,
        model_name="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
        region="us-east-1",
        # Timestamp suffix so each submission gets a distinct job name.
        job_name=f"hmrc-job-mapper-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
        bucket_name="cddo-af-bedrock-batch-inference-us-east-1",
        role_arn="arn:aws:iam::992382722318:role/BatchInferenceRole",
    )
    # One request per high-exposure vacancy, keyed "vacancy_id=<id>" so the id
    # can be parsed back out of `recordId` when the results are loaded.
    # NOTE(review): the prompt talks about a job *title* but is given
    # `row.job_description` - confirm this is intentional.
    inputs = {
        f"vacancy_id={row.vacancy_id}": ModelInput(
            system=SYSTEM_PROMPT,
            messages=[{"role": "user", "content": prompt(row.job_description)}],
        )
        for row in high_automation_exposure.itertuples()
    }

    job_mapper_batcher.prepare_requests(inputs)
    job_mapper_batcher.push_requests_to_s3()
    job_mapper_batcher.create()
    job_mapper_batcher.poll_progress(60)

# Common to both paths: fetch and parse the finished job's output.
job_mapper_batcher.download_results()
job_mapper_batcher.load_results()


print(job_mapper_batcher.manifest)


In [None]:
# One row per successfully mapped vacancy. Records that are empty or whose
# structured output is missing are skipped (same guards as the task
# extraction cell) instead of raising on `None.mapped_category`; those
# vacancies end up with a missing category after the left join below.
mapped_job_titles = pd.DataFrame(
    [
        {
            "vacancy_id": int(item["recordId"].split("=")[1]),  # Extract ID
            "category": item["outputModel"].mapped_category.name,
        }
        for item in job_mapper_batcher.instances
        if item and item["outputModel"]
    ],
    # Explicit columns keep the frame mergeable even when nothing was mapped.
    columns=["vacancy_id", "category"],
)


# Drop any `category` column left by a previous run of this cell so that
# re-running it does not produce `category_x` / `category_y` columns.
high_automation_exposure = high_automation_exposure.drop(
    columns="category", errors="ignore"
).merge(mapped_job_titles, on="vacancy_id", how="left")


In [None]:
# Compute summary statistics for each job group
def _pooled_category_stats(group: pd.DataFrame) -> pd.Series:
    """Combine per-vacancy statistics into one row for a job category.

    Args:
        group: Rows of one category with per-vacancy ``count``, ``mean`` and
            ``std`` columns.

    Returns:
        Series with the total task ``count``, the count-weighted ``mean`` and
        the pooled standard deviation ``std``::

            sqrt(sum((n_i - 1) * s_i**2 + n_i * (m_i - M)**2) / (N - 1))
    """
    counts = group["count"]
    means = group["mean"]
    total_count = counts.sum()
    weighted_mean = np.average(means, weights=counts)  # Weighted mean

    # A vacancy with a single task has an undefined (NaN) sample std, but its
    # within-vacancy term (n - 1) * s**2 is exactly 0. Replace the std by 0 in
    # that case: otherwise the NaN propagates into the row's total, the
    # NaN-skipping sum drops the row, and the vacancy's between-vacancy term
    # is silently lost as well.
    vacancy_std = group["std"].where(counts > 1, 0.0)
    within = (counts - 1) * vacancy_std**2
    between = counts * (means - weighted_mean) ** 2

    # Pooled standard deviation formula
    pooled_std = np.sqrt(np.sum(within + between) / (total_count - 1))

    return pd.Series(
        {"count": total_count, "mean": weighted_mean, "std": pooled_std}
    )


summary = (
    high_automation_exposure.groupby("category")
    .apply(_pooled_category_stats, include_groups=False)
    .reset_index()
)

summary

In [None]:
import plotly.express as px
import pandas as pd

# Scatter plot with one point per job category: mean against standard
# deviation, taken from the `summary` frame.
# Optional encodings left disabled: text="category", size="count"
category_hover = dict.fromkeys(["category", "count", "mean", "std"], True)
category_axis_labels = {"mean": "Mean", "std": "Standard Deviation"}

fig = px.scatter(
    summary,
    x="mean",
    y="std",
    hover_data=category_hover,  # Display extra details
    title="Mean vs Standard Deviation by Job Category",
    labels=category_axis_labels,
)

# Semi-transparent markers with a thin black outline; no legend.
fig.update_traces(
    marker={"opacity": 0.7, "line": {"width": 1, "color": "black"}}
)
fig.update_layout(
    xaxis={"title": "Mean"},
    yaxis={"title": "Standard Deviation"},
    showlegend=False,
)

# Show the figure
fig.show()

In [None]:
import plotly.express as px
import pandas as pd

# Scatter plot with one point per high-exposure vacancy (mean against
# standard deviation), coloured by its mapped job category.
# Optional encodings left disabled: text="category", size="count"
vacancy_hover = dict.fromkeys(["category", "count", "mean", "std"], True)
vacancy_axis_labels = {"mean": "Mean", "std": "Standard Deviation"}

fig = px.scatter(
    high_automation_exposure,
    x="mean",
    y="std",
    color="category",
    hover_data=vacancy_hover,  # Display extra details
    title="Mean vs Standard Deviation by Job Category",
    labels=vacancy_axis_labels,
)

# Semi-transparent markers with a thin black outline; legend hidden (the
# category is still available on hover).
fig.update_traces(
    marker={"opacity": 0.7, "line": {"width": 1, "color": "black"}}
)
fig.update_layout(
    xaxis={"title": "Mean"},
    yaxis={"title": "Standard Deviation"},
    showlegend=False,
)

# Show the figure
fig.show()

In [None]:

import plotly.express as px
import pandas as pd

# Scatter plot of every HMRC vacancy: mean exposure score against its
# standard deviation, coloured by whether the vacancy meets the
# high-exposure threshold (mean >= 0.7).
fig = px.scatter(
    hmrc_with_stats,
    x="mean",
    y="std",
    color=hmrc_with_stats["mean"] >= 0.7,
    # text="category",
    # size="count",
    hover_data={
        "vacancy_id": True,
        "vacancy_title": True,
        "count": True,
        "mean": True,
        "std": True,
    },  # Display extra details
    # This plot is per vacancy, not per job category; the previous title was
    # copied from the category plots and mislabelled the figure.
    title="Mean vs Standard Deviation by Vacancy",
    labels={"mean": "Mean", "std": "Standard Deviation"},
)

# Improve layout
fig.update_traces(marker=dict(opacity=0.7, line=dict(width=1, color="black")))
fig.update_layout(
    xaxis=dict(title="Mean"), yaxis=dict(title="Standard Deviation"), showlegend=False
)

# Show the figure
fig.show()