In [None]:
# Install the notebook's dependencies into the environment of the running kernel.
# %pip (instead of !pip) targets the kernel's interpreter; the extra is quoted so
# that shells which glob on square brackets do not mangle "strands-agents[mistral]".
# NOTE(review): versions are unpinned - pin them once a known-good set is identified.
%pip install -q python-dotenv "strands-agents[mistral]" strands-agents-tools tqdm

In [2]:
# Recursively copy everything under s3://gdsc25test/ into the current working directory.
# NOTE(review): presumably this provides the ./data/jobs and ./data/trainings folders read below - confirm.
!aws s3 cp s3://gdsc25test/ . --recursive --quiet

In [1]:
import json
import os
import boto3
import dotenv
import requests

from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Type, TypeVar, Any
from tqdm import tqdm
from pydantic import BaseModel, Field
from strands.agent import Agent
from strands.models.mistral import MistralModel


# Generic type variables for annotations.
# T is unconstrained (not referenced in the visible part of this notebook);
# M is bound to pydantic BaseModel subclasses and is used by extract_info().
T = TypeVar('T')
M = TypeVar('M', bound=BaseModel)

# Load variables from a local ".env" file into the process environment.
# Presumably this is where MISTRAL_API_KEY and AWS_API_KEY (read via os.environ below) come from.
dotenv.load_dotenv(".env")

True

# Utilities

In [2]:
def get_agent(
    system_prompt: str = "",
    model_id: str = "mistral-large-latest"
) -> Agent:
    """Create a Strands agent backed by a non-streaming Mistral model.

    Args:
        system_prompt: System prompt handed to the agent.
        model_id: Identifier of the Mistral model to use.

    Returns:
        A new ``Agent`` whose callback handler is set to ``None``.

    Raises:
        KeyError: If ``MISTRAL_API_KEY`` is not present in the environment.
    """
    mistral_settings = {
        "api_key": os.environ["MISTRAL_API_KEY"],
        "model_id": model_id,
        "stream": False,
    }
    return Agent(
        model=MistralModel(**mistral_settings),
        system_prompt=system_prompt,
        callback_handler=None,
    )


def load_file_content(path: str | Path) -> str:
    path = Path(path)
    with path.open('r', encoding='utf-8') as file:
        return file.read()


def read_json(path: str | Path):
    path = Path(path)
    with path.open('r', encoding='utf-8') as file:
        data = json.load(file)
    return data


def save_json(path: str | Path, data: Dict | List):
    path = Path(path)
    with path.open('w', encoding='utf-8') as file:
        json.dump(data, file)


def get_job_paths() -> List[Path]:
    """List the Markdown files located directly inside ``./data/jobs``.

    Returns:
        Paths of all entries whose suffix is ``.md``, in directory iteration order.
    """
    return [
        entry
        for entry in Path('./data/jobs').iterdir()
        if entry.suffix == '.md'
    ]


def get_training_paths() -> List[Path]:
    """List the Markdown files located directly inside ``./data/trainings``.

    Returns:
        Paths of all entries whose suffix is ``.md``, in directory iteration order.
    """
    return [
        entry
        for entry in Path('./data/trainings').iterdir()
        if entry.suffix == '.md'
    ]

# Data structures

In [3]:
class JobInfo(BaseModel):
    # Structured facts extracted from one job description.
    # (Kept as a # comment rather than a class docstring: pydantic would copy a
    # docstring into the JSON schema used for structured output.)
    # Disabled field, kept for reference:
    # domain: str = Field(default="", description="Field or industry of the job")
    required_skills: List[str] = Field(
        default_factory=list,
        description="List of required skills for the job.",
    )
    location: str = Field(
        default="",
        description="Job location.",
    )
    years_of_experience_required: str = Field(
        default="",
        description="Years of experience required to get this job.",
    )

    def describe(self) -> str:
        """Render the job as three lines: required skills, location, experience."""
        skills_csv = ', '.join(self.required_skills)
        return (
            f"Required skills: {skills_csv}\n"
            f"Location: {self.location}\n"
            f"Years of experience required: {self.years_of_experience_required}"
        )


class TrainingInfo(BaseModel):
    # Structured facts extracted from one training description: the single skill
    # it teaches and the level reached.
    # (Kept as a # comment rather than a class docstring: pydantic would copy a
    # docstring into the JSON schema used for structured output.)
    # NOTE(review): default_factory=tuple produces an empty tuple, which does not
    # satisfy Tuple[str, str]; describe() therefore guards against it.
    skill_acquired_and_level: Tuple[str, str] = Field(
        default_factory=tuple,
        description="A pair of skill name and level"
    )

    def describe(self) -> str:
        """Render the training as a one-line ``Acquired skills: ...`` summary.

        Returns:
            ``"Acquired skills: <skill>: level <level>"``, or
            ``"Acquired skills: unknown"`` when the skill/level pair is missing
            (e.g. on a default-constructed instance) instead of raising IndexError.
        """
        pair = self.skill_acquired_and_level
        if len(pair) < 2:
            return "Acquired skills: unknown"
        skill = f'{pair[0]}: level {pair[1]}'
        return f"Acquired skills: {skill}"


class PersonaInfo(BaseModel):
    # Structured profile of a persona, extracted from a conversation transcript.
    # (Kept as a # comment rather than a class docstring: pydantic would copy a
    # docstring into the JSON schema used for structured output.)
    name: str = Field(
        default="",
        description="Persona's name",
    )
    skills: List[Tuple[str, str]] = Field(
        default_factory=list,
        description="List of pairs representing skills and its level.",
    )
    location: str = Field(
        default="unknown",
        description="Current location",
    )
    age: str = Field(
        default="unknown",
        description="Age of the persona",
    )
    years_of_experience: str = Field(
        default="unknown",
        description="Years of experience in a field.",
    )

    def describe(self) -> str:
        """Render the persona as five lines: name, skills, location, age, experience."""
        skill_parts = []
        for skill_name, skill_level in self.skills:
            skill_parts.append(f'{skill_name}: {skill_level}')
        skills = ', '.join(skill_parts)
        summary = (
            f"Name: {self.name}\n"
            f"Skills: {skills}\n"
            f"Location: {self.location}\n"
            f"Age: {self.age}\n"
            f"Years of experience: {self.years_of_experience}"
        )
        return summary


class BoolOut(BaseModel):
    # Structured-output wrapper around a single yes/no answer.
    # Not referenced elsewhere in the visible part of this notebook.
    result: bool = Field(default=False, description="whether the condition is fulfilled or not")


class IDList(BaseModel):
    # Structured-output wrapper around a list of string IDs; used by the
    # find_*_matches_for_persona helpers to receive job / training IDs.
    # The default is an empty list (the previous default, False, was not a list).
    values: List[str] = Field(default_factory=list, description="a list of string IDs")

# Accessing agents

In [4]:
def send_message_to_chat(
    message: str,
    persona_id: str,
    conversation_id: Optional[str] = None,
    timeout: float = 60.0,
) -> Optional[Tuple[str, str]]:
    """
    Send a single message to the chat endpoint and return the response.

    Args:
        message: Message to send
        persona_id: ID of the persona
        conversation_id: ID of an ongoing conversation, or None to start a new one
        timeout: Seconds to wait for the endpoint before giving up

    Returns:
        A ``(response_text, conversation_id)`` tuple, or None when the request
        fails (network error, timeout, or a non-200 status code).

    Raises:
        KeyError: If ``AWS_API_KEY`` is not present in the environment.
    """
    url = 'https://5xxe59fsr7.execute-api.eu-central-1.amazonaws.com/main/chat'
    headers = {
        "x-api-key": os.environ["AWS_API_KEY"]
    }
    payload = {
        "message": message,
        "persona_id": persona_id,
        "conversation_id": conversation_id,
        "team_id": "WitekTeam",
    }

    try:
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        response = requests.post(
            url=url,
            json=payload,
            headers=headers,
            timeout=timeout
        )
    except requests.RequestException as exc:
        # Report transport failures the same way as HTTP errors below.
        print(f"Err: {exc}, {persona_id}")
        return None

    if response.status_code != 200:
        print(f"Err: {response}, {persona_id}, {response.text}")
        return None

    response_data = response.json()
    return response_data['response'], response_data['conversation_id']


def get_conversation(persona_id: str, max_turns: int = 5, verbose: bool = True) -> List[str]:
    """
    Perform a conversation with a single persona for a maximum number of turns.

    The assistant opens with a fixed greeting; every turn sends the current
    assistant message to the chat endpoint and records the persona's reply.
    A follow-up question is generated only when another turn remains, so no
    LLM call is spent on a message that would never be sent.

    Args:
        persona_id: ID of the persona
        max_turns: Maximum number of conversation turns (default: 5)
        verbose: When True, print every persona response as it arrives

    Returns:
        List of conversation messages, each prefixed with "Assistant: " or "User: "
    """
    system_prompt = """
    **Speak only in english.**
    **Do not give info from the internet.** 
    Continue to ask questions about this person - do not provide the jobs, trainings or anything yet.
    You are a helpful and empathetic assistant. Your goal is to engage in a natural conversation with a persona to gather the following information:
    - Their name
    - Their skills and **level of this skill**
    - Their current location
    - Their age
    - Their preferences
    - Years of experience
    
    Remember to always gather all of those information!
    Ask open-ended questions to encourage detailed responses. Be polite, patient, and adapt your questions based on their answers.
    If the persona is unsure or vague, gently probe for more details. Do not ask all questions at once; let the conversation flow naturally.
    **Do not comment on whatever the response is. Just ask questions to retrieve the information.**
    **Focus only on asking question about persona. Do not provide any additional info from the internet.**
    """
    conversation = []
    conversation_agent = get_agent(system_prompt)
    conversation_id = None

    # greeting: seeded into the agent's history so it knows what it already asked
    agent_message = "Hello! I'm here to help you find the best job or training opportunities. Can you tell me your name?"
    conversation_agent.messages = [{
        "role": "assistant",
        "content": [{
            "text": agent_message
        }]
    }]
    conversation.append(f"Assistant: {agent_message}")

    for turn in range(max_turns):
        resp = send_message_to_chat(
            agent_message,
            persona_id,
            conversation_id
        )
        if resp is None:
            print(f"User: {persona_id} did not respond")
            break
        user_response, conversation_id = resp
        if verbose:
            print(f"Response: {user_response}\n\n")
        conversation.append(f"User: {user_response}")
        if turn == max_turns - 1:
            # Last turn: a further question would never be sent, so skip the LLM call.
            break
        agent_message = str(conversation_agent(user_response))
        conversation.append(f"Assistant: {agent_message}")
    return conversation

# maybe parallel_conversations()?
def get_conversations(persona_ids: List[str], max_workers: int = 10, verbose: bool = False, max_turns: int = 5) -> Dict[str, List[str]]:
    """Run ``get_conversation`` for several personas on a thread pool.

    Args:
        persona_ids: IDs of the personas to talk to.
        max_workers: Size of the thread pool.
        verbose: Forwarded to ``get_conversation``.
        max_turns: Forwarded to ``get_conversation``.

    Returns:
        Mapping from persona ID to its conversation. Personas whose
        conversation raised an exception are reported via ``print`` and omitted.
    """
    collected: Dict[str, List[str]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit everything up front, then collect results in submission order.
        pending = [
            (pid, pool.submit(get_conversation, pid, verbose=verbose, max_turns=max_turns))
            for pid in persona_ids
        ]
        for pid, task in pending:
            try:
                collected[pid] = task.result()
            except Exception as exc:
                print(f"Error occurred for {pid}: {exc}")
    return collected



# Extracting information

In [5]:
def extract_info(model: Type[M], text: str) -> M:
    """Extract a structured ``model`` instance from free text.

    Args:
        model: Pydantic model class describing the desired output.
        text: Text passed to the agent as the prompt.

    Returns:
        The result of the agent's ``structured_output`` call for ``model``.
    """
    # A fresh agent with default settings is created for every extraction.
    return get_agent().structured_output(output_model=model, prompt=text)


def extract_info_from_conversation(conversation: List[str]) -> PersonaInfo:
    """Extract a ``PersonaInfo`` from a conversation transcript.

    Args:
        conversation: Transcript lines; they are joined with newlines before extraction.

    Returns:
        The persona information extracted from the transcript.
    """
    return extract_info(PersonaInfo, '\n'.join(conversation))


def extract_info_from_job_path(path: str | Path) -> JobInfo:
    """Read a job description file and extract a ``JobInfo`` from its text.

    Args:
        path: Location of the job description file.

    Returns:
        The job information extracted from the file content.
    """
    return extract_info(JobInfo, load_file_content(path))


def extract_info_from_training_path(path: str | Path) -> TrainingInfo:
    """Read a training description file and extract a ``TrainingInfo`` from its text.

    Args:
        path: Location of the training description file.

    Returns:
        The training information extracted from the file content.
    """
    return extract_info(TrainingInfo, load_file_content(path))


def extract_info_to_json(
    model: Type[M],
    description_paths: List[str | Path],
    save_path: str | Path,
    cache_period: int = 20,
    max_retries: int = 3
):
    """Extract structured info from description files into a resumable JSON cache.

    The cache at ``save_path`` maps each file stem to the JSON dump of the
    extracted model. Files whose stem is already cached are skipped, so the
    function can be re-run after an interruption.

    Args:
        model: Pydantic model class to extract for every description.
        description_paths: Description files to process.
        save_path: JSON cache file; created with an empty mapping if missing.
        cache_period: Write the cache to disk after every ``cache_period`` newly
            extracted entries (checkpointing is disabled for values <= 0).
        max_retries: Number of extraction attempts per file; only ``ValueError``
            triggers a retry.

    Returns:
        None. The cache file is always written once more at the end.
    """
    save_path = Path(save_path)

    if not save_path.exists():
        # save_json creates the file, so no separate touch() is needed.
        save_json(save_path, {})

    extracted_data = read_json(save_path)
    description_paths = [Path(path) for path in description_paths]
    print(f'Total descriptions for {model.__name__}: {len(description_paths)}')
    print(f'Extracted infos: {len(extracted_data)}')

    counter = 0
    for path in description_paths:
        id_ = path.stem
        if id_ in extracted_data:
            continue

        text = load_file_content(path)
        info = None
        err = None
        for _ in range(max_retries):
            try:
                info = extract_info(model, text)
                break
            except ValueError as e:
                err = e

        if info is None:
            # All attempts failed: report and skip, rather than storing the
            # previous file's result (or hitting an unbound name) under this id.
            print(f'Error for id: {id_}', err)
            continue

        extracted_data[id_] = info.model_dump_json()
        counter += 1
        # Checkpoint only when a new entry pushed the counter onto a multiple of the period.
        if cache_period > 0 and counter % cache_period == 0:
            save_json(save_path, extracted_data)
            print(len(extracted_data))
    save_json(save_path, extracted_data)


def extract_jobs_info_to_json(
    save_path: str | Path,
    cache_period: int = 20,
    max_retries: int = 3
):
    """Extract ``JobInfo`` for every job description file into a JSON cache.

    Args:
        save_path: JSON cache file to create or update.
        cache_period: Forwarded to ``extract_info_to_json``.
        max_retries: Forwarded to ``extract_info_to_json``.
    """
    extract_info_to_json(
        JobInfo,
        get_job_paths(),
        save_path=save_path,
        cache_period=cache_period,
        max_retries=max_retries,
    )


def extract_trainings_info_to_json(
    save_path: str | Path,
    cache_period: int = 20,
    max_retries: int = 3
):
    """Extract ``TrainingInfo`` for every training description file into a JSON cache.

    Args:
        save_path: JSON cache file to create or update.
        cache_period: Forwarded to ``extract_info_to_json``.
        max_retries: Forwarded to ``extract_info_to_json``.
    """
    extract_info_to_json(
        TrainingInfo,
        get_training_paths(),
        save_path=save_path,
        cache_period=cache_period,
        max_retries=max_retries,
    )


# Matching personas to jobs and trainings

In [18]:
def find_job_matches_for_persona(
    persona_info: PersonaInfo,
    jobs_data: Dict[str, JobInfo],
) -> List[str]:
    """Ask the LLM which jobs match a persona and return the valid job IDs.

    The description of every job is placed in the system prompt and the
    persona's description is sent as the user prompt.

    Args:
        persona_info: Persona to find jobs for.
        jobs_data: Mapping from job ID to its extracted information.

    Returns:
        Job IDs proposed by the model that are keys of ``jobs_data``, in the
        order returned, without duplicates. Empty when ``jobs_data`` is empty.
    """
    if not jobs_data:
        # Nothing can match an empty catalogue; skip the LLM call.
        return []

    jobs_text = "\n".join([
        f'{job_id}: {job_info.describe()}'
        for job_id, job_info in jobs_data.items()
    ])
    system_prompt = f"""
    You have a list of all available jobs. Given a candidate info provide
    a list od 3 to 5 job IDs that would match that candidate:
    {jobs_text}
    """
    agent = get_agent(system_prompt=system_prompt)
    res = agent.structured_output(IDList, persona_info.describe())

    # The model's answer is free-form: drop IDs that are not in the catalogue
    # and repeated IDs so only usable references reach the results.
    matches: List[str] = []
    for candidate in res.values:
        candidate_id = candidate.strip()
        if candidate_id in jobs_data and candidate_id not in matches:
            matches.append(candidate_id)
    return matches


def find_training_matches_for_persona(
    persona_info: PersonaInfo,
    trainings_data: Dict[str, TrainingInfo]
) -> List[str]:
    """Ask the LLM which trainings match a persona and return the valid training IDs.

    The description of every training is placed in the system prompt and the
    persona's description is sent as the user prompt.

    Args:
        persona_info: Persona to find trainings for.
        trainings_data: Mapping from training ID to its extracted information.

    Returns:
        Training IDs proposed by the model that are keys of ``trainings_data``,
        in the order returned, without duplicates. Empty when
        ``trainings_data`` is empty.
    """
    if not trainings_data:
        # Nothing can match an empty catalogue; skip the LLM call.
        return []

    trainings_text = "\n".join([
        f'{training_id}: {training_info.describe()}'
        for training_id, training_info in trainings_data.items()
    ])
    system_prompt = f"""
    You have a list of all available trainings. Given a candidate info provide
    a list od 3 to 5 training IDs that would match that candidate:
    {trainings_text}
    """
    agent = get_agent(system_prompt=system_prompt)
    res = agent.structured_output(IDList, persona_info.describe())

    # The model's answer is free-form: drop IDs that are not in the catalogue
    # and repeated IDs so only usable references reach the results.
    matches: List[str] = []
    for candidate in res.values:
        candidate_id = candidate.strip()
        if candidate_id in trainings_data and candidate_id not in matches:
            matches.append(candidate_id)
    return matches

# Extracting job infos

In [7]:
# Fill (or resume) the on-disk cache of extracted job infos, then load it back
# and turn each cached JSON string into a JobInfo instance.
jobs_save_path = './extracted_jobs_info.json'
extract_jobs_info_to_json(jobs_save_path)
jobs_info = {
    cached_id: JobInfo.model_validate_json(cached_json)
    for cached_id, cached_json in read_json(jobs_save_path).items()
}

Total descriptions for JobInfo: 200
Extracted infos: 200


# Extracting trainings info

In [8]:
# Fill (or resume) the on-disk cache of extracted training infos, then load it
# back and turn each cached JSON string into a TrainingInfo instance.
trainings_save_path = './extracted_trainings_info.json'
extract_trainings_info_to_json(trainings_save_path)
trainings_info = {
    cached_id: TrainingInfo.model_validate_json(cached_json)
    for cached_id, cached_json in read_json(trainings_save_path).items()
}

Total descriptions for TrainingInfo: 497
Extracted infos: 497


# Test matching algorithm

In [19]:
# Hand-built persona used as a smoke test for the two matching helpers.
persona = PersonaInfo(
    name='Pedro Araújo',
    age='16',
    location='Brasília',
    years_of_experience='1',
    skills=[
        ('Food Safety', 'Intermediate'),
        ('Food Sustainability', 'Intermediate'),
    ],
)
print(persona.describe())

# Each helper sends the whole job / training catalogue to the LLM.
persona_jobs = find_job_matches_for_persona(persona, jobs_info)
persona_trainings = find_training_matches_for_persona(persona, trainings_info)
print(f"Jobs: {persona_jobs}")
print(f"Trainings: {persona_trainings}")

Name: Pedro Araújo
Skills: Food Safety: Intermediate, Food Sustainability: Intermediate
Location: Brasília
Age: 16
Years of experience: 1
Jobs: values=['job_foo_003', 'job_foo_006', 'job_foo_009', 'job_foo_001']
Trainings: values=['tr_foo_food_safety_standards_haccp_gmp__02', 'tr_foo_waste_reduction_02', 'tr_foo_quality_inspection_02', 'tr_foo_hygiene_protocols_02', 'tr_foo_food_safety_standards_haccp_gmp__03']


# Collecting conversations

In [14]:
# Talk to every persona, extract a PersonaInfo from the transcript and cache the
# result on disk so an interrupted run can be resumed without repeating work.
persona_ids = [f'persona_{i:03}' for i in range(1, 101)]
cache_period = 4

personas_save_path = Path('./extracted_personas_info.json')
if not personas_save_path.exists():
    # save_json creates the file, so no separate touch() is needed.
    save_json(personas_save_path, {})

persona_infos = read_json(personas_save_path)
# Total = personas we want; collected = personas already present in the cache.
print(f'Total conversations for personas: {len(persona_ids)}')
print(f'Collected conversations: {len(persona_infos)}')

counter = 0
for persona_id in persona_ids:
    if persona_id in persona_infos:
        continue
    conversation = get_conversation(persona_id, max_turns=2, verbose=False)
    persona_info = extract_info_from_conversation(conversation)
    persona_infos[persona_id] = persona_info.model_dump_json()
    counter += 1
    # Checkpoint only when a new persona pushed the counter onto a multiple of the period.
    if counter % cache_period == 0:
        save_json(personas_save_path, persona_infos)
        print(len(persona_infos))
save_json(personas_save_path, persona_infos)

# From here on persona_infos maps persona ID -> PersonaInfo (not JSON strings).
persona_infos = {
    persona_id: PersonaInfo.model_validate_json(persona_info)
    for persona_id, persona_info in persona_infos.items()
}


Total conversations for personas: 100
Collected conversations: 100


# Generating the final results

In [19]:
# Build one result record per persona from its job and training matches.
# NOTE(review): a persona with job matches but no training matches ends up in
# the 'awareness' branch - confirm this is the intended classification.
results = []
for persona_id, persona_info in tqdm(persona_infos.items()):
    jobs = find_job_matches_for_persona(persona_info, jobs_info)
    trainings = find_training_matches_for_persona(persona_info, trainings_info)
    if jobs and trainings:
        # Every matched job carries the same list of suggested trainings.
        data = {
            'persona_id': persona_id,
            'gold_type': 'jobs+trainings',
            'jobs': [
                {'job_id': matched_job, 'suggested_trainings': trainings}
                for matched_job in jobs
            ],
        }
    elif trainings:
        data = {
            'persona_id': persona_id,
            'gold_type': 'trainings_only',
            'trainings': trainings,
        }
    else:
        data = {
            'persona_id': persona_id,
            'gold_type': 'awareness',
            'gold_items': '',
        }
    results.append(data)

100%|██████████| 100/100 [05:36<00:00,  3.36s/it]


In [20]:
# Peek at the first record to sanity-check the structure of the results.
results[0]

{'persona_id': 'persona_001',
 'gold_type': 'jobs+trainings',
 'jobs': [{'job_id': 'job_foo_003',
   'suggested_trainings': ['tr_foo_food_safety_standards_haccp_gmp__01',
    'tr_foo_food_safety_standards_haccp_gmp__02',
    'tr_foo_hygiene_protocols_01',
    'tr_foo_hygiene_protocols_02',
    'tr_foo_hygiene_protocols_03']},
  {'job_id': 'job_foo_004',
   'suggested_trainings': ['tr_foo_food_safety_standards_haccp_gmp__01',
    'tr_foo_food_safety_standards_haccp_gmp__02',
    'tr_foo_hygiene_protocols_01',
    'tr_foo_hygiene_protocols_02',
    'tr_foo_hygiene_protocols_03']},
  {'job_id': 'job_foo_005',
   'suggested_trainings': ['tr_foo_food_safety_standards_haccp_gmp__01',
    'tr_foo_food_safety_standards_haccp_gmp__02',
    'tr_foo_hygiene_protocols_01',
    'tr_foo_hygiene_protocols_02',
    'tr_foo_hygiene_protocols_03']},
  {'job_id': 'job_foo_008',
   'suggested_trainings': ['tr_foo_food_safety_standards_haccp_gmp__01',
    'tr_foo_food_safety_standards_haccp_gmp__02',
    '

In [21]:
# Write the results as JSON Lines: one JSON object per line, UTF-8 encoded.
results_path = Path('./results.jsonl')
with results_path.open('w', encoding='utf-8') as file:
    for res in results:
        print(json.dumps(res), file=file)