# Generative Pseudo Labeling with Claude-3.5-Sonnet & Classification of "Work Behaviors and Skills" using AWS Bedrock
* Notebook by Adam Lang
* Date: 2/11/2025

# Overview
* The goal of this notebook is to extract "Work Behaviors" and "Skills" from a dataset and use Claude-3.5-Sonnet on AWS Bedrock to classify them.

# Workflow
* We will implement chain-of-thought prompting with Claude-3.5-Sonnet to extract "behaviors" and "skills" from a pre-processed CSV file. The file includes a column called "english_message", which holds non-English text translated to English.
* I will then use the LLM to perform Generative Pseudo Labeling: it generates a label for each behavior and skill so we can build a pseudo-classification of the data.

## Step 1 - Extract Behaviors and Skills into 2 new columns
* We will implement chain-of-thought prompting and extract specific information from the CSV file.
* This is what I will do:

1. Use chain-of-thought prompting.
2. Extract phrases from a column in a CSV file called `df_triplets_experiment.csv`. There are 3 columns: "Award Type", "english_message", and "Department". The goal is to extract 2 new columns, one for "work behaviors" and one for "skills". Think of this as an entity recognition or extraction problem whose results I also want to classify.
3. After extracting the 2 new columns for "work behaviors" and "skills", go back in Step 2 and create a general classification of the types of "work behaviors" and types of "skills".
4. Finally, label each row in 2 new columns, "behavior_class" and "skill_class", so that I can link these labels to "Award_Type" and "Department" later for data visualization.
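
Once each row carries a `behavior_class`, linking it to a department comes down to counting (department, class) pairs. The following is a minimal, dependency-free sketch of that counting step. The rows and class names are invented for illustration and are not results from the dataset; on the real DataFrame a pandas group-by or cross-tabulation would likely be the more natural tool.

```python
from collections import Counter

# Hypothetical labeled rows; departments and class names are invented for illustration.
rows = [
    {"department": "Support", "behavior_class": "Collaboration"},
    {"department": "Support", "behavior_class": "Collaboration"},
    {"department": "Sales", "behavior_class": "Persistence"},
    {"department": "Support", "behavior_class": "Ownership"},
]

# Count how often each behavior class appears within each department.
pair_counts = Counter((r["department"], r["behavior_class"]) for r in rows)

# Print the counts in a stable, sorted order.
for (department, behavior_class), n in sorted(pair_counts.items()):
    print(f"{department:10s} {behavior_class:15s} {n}")
```

The resulting counts are the kind of table that a heatmap or grouped bar chart can be built from.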


## Import Dependencies

In [1]:
%%capture 
!pip install nest_asyncio

In [20]:
import pandas as pd
import asyncio
import json
import boto3
from tqdm.notebook import tqdm
import nest_asyncio

import time
import random
from botocore.exceptions import ClientError

In [3]:
# Apply nest_asyncio at the beginning of your script
nest_asyncio.apply()

In [4]:
# Set up Bedrock client
bedrock = boto3.client(
    service_name='bedrock-runtime',
    region_name='<your region here>'  # Replace with your region
)

In [None]:
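## init boto session
session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
    print("No AWS credentials found for this session")
else:
    # Show only the last 4 characters so keys do not end up in saved notebook output
    print(f"Access Key: ...{credentials.access_key[-4:]}")
    print(f"Secret Key: {'*' * len(credentials.secret_key)}")
print(f"Region: {session.region_name}")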

In [6]:
import boto3

bedrock = boto3.client(
    service_name='bedrock-runtime',
    region_name='<your region here>',  # Make sure this is the correct region
    aws_access_key_id='YOUR_ACCESS_KEY',  # Optional if using IAM role
    aws_secret_access_key='YOUR_SECRET_KEY'  # Optional if using IAM role
)

In [None]:
import sagemaker

role = sagemaker.get_execution_role()
print(role)

In [None]:
##ensure you're using the updated IAM role

import boto3

sts = boto3.client('sts')
response = sts.get_caller_identity()
print(f"Current role ARN: {response['Arn']}")

### Important Note about IAM Role
* To invoke a Bedrock model, you need to update the IAM role permissions.
* This means going into the role's permissions and updating `bedrock_access` with this JSON block, which allows invoking a Bedrock model.
```
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "bedrock:InvokeModel*",
			"Resource": "*"
		}
	]
}
```



### Important Note about IAM Role for SageMaker Studio access
* You need to update not only Bedrock access but also the `studio-role`.
* Add this statement to the bottom of the IAM role:
```
{
    "Effect": "Allow",
    "Action": [
        "bedrock:InvokeModel",
        "bedrock:ListFoundationModels"
    ],
    "Resource": "*"
}
```

### The code below validates whether the IAM role can invoke Bedrock models

In [None]:
import boto3
import json
from botocore.exceptions import ClientError

# Create a Bedrock client
bedrock = boto3.client('bedrock-runtime', region_name='<your region here>')  # Replace with your region if different

def test_bedrock_access():
    try:
        # Try to invoke the model
        response = bedrock.invoke_model(
            modelId='anthropic.claude-3-5-sonnet-20240620-v1:0',  # Make sure this is the correct model ID
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 10,
                "messages": [{"role": "user", "content": "Hello, can you hear me?"}]
            })
        )
        
        # If we get here, the invocation was successful
        result = json.loads(response['body'].read())
        print("Successfully connected to Bedrock!")
        print("Model response:", result['content'][0]['text'])
        return True
    
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        print(f"Error accessing Bedrock: {error_code} - {error_message}")
        return False


In [None]:
# Run the test
test_bedrock_access()

# Additionally, you can check your current identity
sts = boto3.client('sts')
identity = sts.get_caller_identity()
print("\nCurrent Identity:")
print(f"Account: {identity['Account']}")
print(f"User/Role ARN: {identity['Arn']}")

# Test the IAM role permission for AWS Bedrock API
* This will attempt to make a simple call to the Bedrock API and print the response or any error messages.

In [11]:
import boto3
import json

bedrock = boto3.client('bedrock-runtime')

try:
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-5-sonnet-20240620-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 10,
            "messages": [{"role": "user", "content": "Hello, world!"}]
        })
    )
    print("Successfully connected to Bedrock!")
    print(json.loads(response['body'].read())['content'][0]['text'])
except Exception as e:
    print(f"Error connecting to Bedrock: {str(e)}")

Successfully connected to Bedrock!
Hello! It's nice to meet you. How


### Load Dataset

In [15]:
# Load the DataFrame
df = pd.read_csv('<csv file name>')
df.head()

Unnamed: 0,award_date,award_type,english_message,department
0,2025-01-01,Applaud 2,Kam Wei has been instrumental in helping to cl...,6600 Solution Consultant
1,2025-01-01,Applaud 2,Sid is a rare Player-Coach that inspires his c...,6630 1st Line Mgr - SC
2,2025-01-01,Applaud 2,"Myung Soo rejoined Qlik in August 2024, but qu...",6600 Solution Consultant
3,2025-01-01,Applaud 2,"Congrats, Sean!",6600 Solution Consultant
4,2025-01-01,Applaud 2,Thank you Jason for being persistent and consi...,6120 QC Sales Enterprise


# Class Object to generate pseudo labels for behaviors and skills
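* Claude-3.5-Sonnet on AWS Bedrock is invoked through the Anthropic Messages API: the request body carries an `anthropic_version`, a `max_tokens` limit, and a `messages` list of role/content turns that starts with a "user" turn, plus an optional system prompt. The "Human:"/"Assistant:" prompt format belongs to the older Text Completions API and is not what the code below uses.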
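
As a minimal sketch of the request body that the cells in this notebook pass to `invoke_model`, the helper below builds the JSON payload for a single user turn. The function name `build_messages_body` and its default values are assumptions made for illustration. The field names match the ones used elsewhere in this notebook, and `system` is the optional top-level system prompt field of the Messages API.

```python
import json

def build_messages_body(prompt, max_tokens=500, temperature=0.5, system=None):
    """Build the JSON request body for an Anthropic model on Bedrock (hypothetical helper)."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        # The conversation is a list of role/content turns, starting with the user.
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
    }
    # The system prompt is optional and sits outside the messages list.
    if system is not None:
        body["system"] = system
    return json.dumps(body)

payload = build_messages_body("Hello, world!", max_tokens=10)
print(payload)
```

The returned string can be passed as the `body` argument of `invoke_model`, as in the connection tests above.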

### Code Optimization
* The two primary methods used for efficient processing in this code are:

1. Batch Processing
    * Instead of launching requests for the whole dataset at once, the code groups rows into batches and processes one batch at a time.
    * Each row still gets its own API call; batching caps how many calls are in flight at once, which helps keep the workload under Bedrock's throttling limits on large datasets.

2. Asynchronous Requests
    * The code uses Python's asyncio library to make API calls concurrently. Because boto3 is a synchronous client, each `invoke_model` call runs in a worker thread via `asyncio.to_thread`, and the calls in a batch are awaited together with `asyncio.gather`.
    * This lets the code send multiple API requests without waiting for each one to complete before sending the next, which can dramatically reduce the total time spent waiting on network latency.

3. Additionally, the code includes one other safeguard:
    * Retry with exponential backoff and random jitter when Bedrock returns a `ThrottlingException`.
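
The batching-plus-concurrency pattern can be sketched in isolation as follows. This is an illustrative sketch, not the class used below: `fake_invoke` is a hypothetical stand-in for a blocking API call, and `process_in_batches` is an assumed name. It requires Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time

def fake_invoke(text):
    """Stand-in for a blocking API call such as invoke_model (hypothetical)."""
    time.sleep(0.05)  # simulate network latency
    return text.upper()

async def process_in_batches(texts, batch_size=2):
    """Process texts batch by batch; calls within a batch run concurrently."""
    results = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        # Run each blocking call in a worker thread; gather preserves input order.
        batch_results = await asyncio.gather(
            *(asyncio.to_thread(fake_invoke, t) for t in batch)
        )
        results.extend(batch_results)
    return results

labels = asyncio.run(process_in_batches(["a", "b", "c", "d", "e"]))
print(labels)
```

Inside a notebook an event loop is already running, so there you would either `await process_in_batches(...)` directly or apply `nest_asyncio` first, as this notebook does.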




In [23]:
class SkillBehaviorProcessor:
    def __init__(self, region_name='<your region here>'):
        nest_asyncio.apply()
        self.bedrock = boto3.client('bedrock-runtime', region_name=region_name)
        self.model_id = 'anthropic.claude-3-5-sonnet-20240620-v1:0'

    async def invoke_with_retry(self, body, max_retries=5, initial_delay=1):
        for attempt in range(max_retries):
            try:
                response = await asyncio.to_thread(
                    self.bedrock.invoke_model,
                    modelId=self.model_id,
                    body=body
                )
                return response
            except ClientError as e:
                if e.response['Error']['Code'] == 'ThrottlingException':
                    # Exponential backoff scaled by initial_delay, plus random jitter
                    delay = initial_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Request throttled. Retrying in {delay:.2f} seconds...")
                    await asyncio.sleep(delay)
                else:
                    raise
        raise Exception("Max retries reached")

    async def generate_pseudo_label_async(self, text):
        prompt = f"""Given the following text, identify and extract key phrases related to work behaviors, hard skills, and soft skills demonstrated. Use a step-by-step approach:

Text: "{text}"

Step 1: Identify potential work behaviors, hard skills, and soft skills mentioned in the text.
Step 2: For each identified item, determine if it's a work behavior, hard skill, or soft skill.
Step 3: Formulate concise phrases for each work behavior, hard skill, and soft skill.
Step 4: List the work behaviors, hard skills, and soft skills separately.

Output the results in the following format:
Work Behaviors:
1. [Work Behavior phrase]
2. [Work Behavior phrase]
3. [Work Behavior phrase]

Hard Skills:
1. [Hard Skill phrase]
2. [Hard Skill phrase]
3. [Hard Skill phrase]

Soft Skills:
1. [Soft Skill phrase]
2. [Soft Skill phrase]
3. [Soft Skill phrase]

If fewer than three are evident for any category, list only those that are clearly demonstrated."""

        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 500,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.5,
            "top_p": 1,
        })

        try:
            response = await self.invoke_with_retry(body)
            response_body = json.loads(response['body'].read())
            return response_body['content'][0]['text'].strip()
        except Exception as e:
            print(f"Error in API call: {str(e)}")
            return f"API Error: {str(e)}"

    async def classify_behaviors_and_skills_async(self, behaviors, hard_skills, soft_skills):
        prompt = f"""Given the following work behaviors, hard skills, and soft skills, classify them into general categories:

Work Behaviors: {behaviors}
Hard Skills: {hard_skills}
Soft Skills: {soft_skills}

Provide a general classification for the behaviors, hard skills, and soft skills. Output the results in the following format:
Behavior Class: [General category for behaviors]
Hard Skill Class: [General category for hard skills]
Soft Skill Class: [General category for soft skills]"""

        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 150,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.3,
            "top_p": 1,
        })

        try:
            response = await self.invoke_with_retry(body)
            response_body = json.loads(response['body'].read())
            return response_body['content'][0]['text'].strip()
        except Exception as e:
            print(f"Error in API call: {str(e)}")
            return f"API Error: {str(e)}"

    async def process_extraction_batch_async(self, batch):
        tasks = [self.generate_pseudo_label_async(text) for text in batch['english_message']]
        return await asyncio.gather(*tasks)

    async def process_classification_batch_async(self, batch):
        tasks = [self.classify_behaviors_and_skills_async(row['work_behaviors'], row['hard_skills'], row['soft_skills']) for _, row in batch.iterrows()]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [str(result) if isinstance(result, Exception) else result for result in results]

    async def process_extraction_dataframe_async(self, df, batch_size=10):
        results = []
        total_batches = (len(df) + batch_size - 1) // batch_size
        for i in tqdm(range(0, len(df), batch_size), total=total_batches, desc="Processing extraction batches"):
            batch = df.iloc[i:i+batch_size]
            batch_results = await self.process_extraction_batch_async(batch)
            results.extend(batch_results)
        return results

    async def process_classification_dataframe_async(self, df, chunk_size=100):
        results = []
        total_chunks = (len(df) + chunk_size - 1) // chunk_size
        for i in tqdm(range(0, len(df), chunk_size), total=total_chunks, desc="Processing classification chunks"):
            chunk = df.iloc[i:i+chunk_size]
            batch_results = await self.process_classification_batch_async(chunk)
            results.extend(batch_results)
        return results

    @staticmethod
    def extract_skills_and_behaviors(text):
        work_behaviors, hard_skills, soft_skills = [], [], []
        current_category = None
        for line in text.split('\n'):
            if line.startswith('Work Behaviors:'):
                current_category = 'behaviors'
            elif line.startswith('Hard Skills:'):
                current_category = 'hard_skills'
            elif line.startswith('Soft Skills:'):
                current_category = 'soft_skills'
            elif line.strip().startswith(('1.', '2.', '3.')):
                item = line.split('.', 1)[1].strip()
                if current_category == 'behaviors':
                    work_behaviors.append(item)
                elif current_category == 'hard_skills':
                    hard_skills.append(item)
                elif current_category == 'soft_skills':
                    soft_skills.append(item)
        return ', '.join(work_behaviors), ', '.join(hard_skills), ', '.join(soft_skills)

    @staticmethod
    def extract_classifications(classifications):
        behavior_class = hard_skill_class = soft_skill_class = ''
        for line in classifications.split('\n'):
            if line.startswith('Behavior Class:'):
                behavior_class = line.split(':', 1)[1].strip()
            elif line.startswith('Hard Skill Class:'):
                hard_skill_class = line.split(':', 1)[1].strip()
            elif line.startswith('Soft Skill Class:'):
                soft_skill_class = line.split(':', 1)[1].strip()
        return behavior_class, hard_skill_class, soft_skill_class

    def run_async(self, coro):
        return asyncio.get_event_loop().run_until_complete(coro)

    def process_dataframe(self, df, text_column='english_message', extraction_batch_size=50, classification_chunk_size=200):
        print(f"Processing dataframe with {len(df)} rows")
        
        # Create a copy of the dataframe to avoid modifying the original
        result_df = df.copy()
        
        # Step 1: Extract skills and behaviors
        print("Extracting skills and behaviors...")
        pseudo_labels = self.run_async(self.process_extraction_dataframe_async(result_df[[text_column]], batch_size=extraction_batch_size))
        result_df['pseudo_labels'] = pseudo_labels
        result_df['work_behaviors'], result_df['hard_skills'], result_df['soft_skills'] = zip(*result_df['pseudo_labels'].apply(self.extract_skills_and_behaviors))
        
        # Step 2: Classify behaviors and skills
        print("Classifying behaviors and skills...")
        classifications = self.run_async(self.process_classification_dataframe_async(result_df[['work_behaviors', 'hard_skills', 'soft_skills']], chunk_size=classification_chunk_size))
        result_df['classifications'] = classifications
        result_df['behavior_class'], result_df['hard_skill_class'], result_df['soft_skill_class'] = zip(*result_df['classifications'].apply(self.extract_classifications))
        
        print(f"Processed {len(result_df)} rows")
        print(f"Sample results:\n{result_df[[text_column, 'work_behaviors', 'hard_skills', 'soft_skills', 'behavior_class', 'hard_skill_class', 'soft_skill_class']].head()}")
        
        return result_df
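
To see what the parsing step does with a formatted response, here is a standalone sketch. It is a variant for illustration, not the class method above: `parse_sections` is a hypothetical helper that uses a regular expression to accept any item number, and the sample text is hand-written rather than real model output.

```python
import re

# Map each section header in the response format to an output key.
SECTION_KEYS = {
    "Work Behaviors:": "work_behaviors",
    "Hard Skills:": "hard_skills",
    "Soft Skills:": "soft_skills",
}
# A numbered item looks like "1. some phrase".
ITEM_PATTERN = re.compile(r"^\s*\d+\.\s*(.+)$")

def parse_sections(text):
    """Split a formatted response into lists keyed by category (hypothetical helper)."""
    sections = {key: [] for key in SECTION_KEYS.values()}
    current = None
    for line in text.splitlines():
        stripped = line.strip()
        if stripped in SECTION_KEYS:
            current = SECTION_KEYS[stripped]
            continue
        match = ITEM_PATTERN.match(line)
        # Ignore numbered lines that appear before any section header.
        if match and current is not None:
            sections[current].append(match.group(1).strip())
    return sections

# Hand-written sample in the format requested by the extraction prompt.
sample = """Work Behaviors:
1. Meets deadlines consistently

Hard Skills:
1. Python programming
2. Database management

Soft Skills:
1. Clear communication"""

parsed = parse_sections(sample)
print(parsed)
```

Returning lists rather than comma-joined strings keeps phrases that themselves contain commas intact; joining can be deferred until the values are written to a DataFrame column.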

# Apply Class Behavior and Skills to the Qlik Dataset

In [22]:
## let's try the first 50 rows of the data
df_test = df.iloc[:50]
df_test 

Unnamed: 0,award_date,award_type,english_message,department
0,2025-01-01,Applaud 2,Kam Wei has been instrumental in helping to cl...,6600 Solution Consultant
1,2025-01-01,Applaud 2,Sid is a rare Player-Coach that inspires his c...,6630 1st Line Mgr - SC
2,2025-01-01,Applaud 2,"Myung Soo rejoined Qlik in August 2024, but qu...",6600 Solution Consultant
3,2025-01-01,Applaud 2,"Congrats, Sean!",6600 Solution Consultant
4,2025-01-01,Applaud 2,Thank you Jason for being persistent and consi...,6120 QC Sales Enterprise
5,2025-01-01,Tack,Thank you always for the support on case handl...,2001 Customer Success & Support
6,2025-01-01,Tack,Thank you always for the support on case handl...,2000 OpCo Support
7,2025-01-01,Tack,Thank you always for the support on case handl...,2000 OpCo Support
8,2025-01-01,Tack,Thank you always for the support on case handl...,2000 OpCo Support
9,2025-01-01,Tack,Thank you always for the support on case handl...,2000 OpCo Support


In [None]:
# apply class object 
if __name__ == "__main__":
    import pandas as pd

    # df_test (the first 50 rows of df) was created in the previous cell
    # df = pd.read_csv('<csv file name>')  # Uncomment and use this if you need to reload the data
    
    # Initialize the processor
    processor = SkillBehaviorProcessor(region_name='eu-central-1')

    # Process the DataFrame
    result_df = processor.process_dataframe(
        df_test, 
        text_column='english_message',  # Specify the column containing the text to analyze
        extraction_batch_size=50,
        classification_chunk_size=200
    )

    # Display the results (be cautious with large DataFrames)
    print(result_df.head().to_string())

    # Save the results to a CSV file
    result_df.to_csv('file_name.csv', index=False)

    print("Processing complete. Results saved to 'file_name.csv'")

# Usage Example with dummy data

In [None]:
# Usage example
if __name__ == "__main__":
    # Ensure AWS credentials are set up in your environment variables or AWS config file
    
    # Create a sample DataFrame
    data = {
        'english_message': [
            "John consistently meets deadlines and demonstrates excellent time management skills. He is proficient in Python programming and database management. His ability to communicate complex technical concepts to non-technical stakeholders is outstanding.",
            "Sarah led a cross-functional team to successfully implement a new CRM system. She showcased strong project management skills and adaptability in the face of changing requirements. Her expertise in data analysis and visualization was crucial to the project's success.",
            "Mike is known for his innovative problem-solving approach. He has deep knowledge of machine learning algorithms and has applied them to optimize business processes. His collaborative nature and mentoring skills have made him a valuable team member."
        ]
    }
    df = pd.DataFrame(data)

    # Initialize the processor
    processor = SkillBehaviorProcessor(region_name='<your region here>')  # Replace with your AWS region

    # Process the DataFrame
    result_df = processor.process_dataframe(df, extraction_batch_size=2, classification_chunk_size=2)

    # Display the results
    print(result_df.to_string())

    # Optionally, save the results to a CSV file
    result_df.to_csv('processed_skills_behaviors.csv', index=False)