# Prompt Engineering for Synthetic Data Generation Using Cohere Command R Models

---
## Introduction

In this notebook, we step through how to use the Cohere Command R family of models to create synthetic data from provided schemas. The use case is a health social media app, which has two kinds of records: posts on the platform and user profiles. We start from predefined schemas for both, and the LLM uses them to generate the data. We then step through how to ingest this data into OpenSearch and query the results to confirm that the data resides in the OpenSearch collection.

This solution is intended to help teams and developers quickly build test data for various use cases, whether the target is OpenSearch or another data store such as a relational database.

--- 
## Cohere Command R Models
Two models from the Cohere Command R family are available on Amazon Bedrock:

### 1. Command R+
- **Description:** Command R+ is Cohere's most powerful generative language model optimized for long-context tasks, such as retrieval-augmented generation (RAG) and multi-step tool use.
- **MaxTokens:** 128K
- **Languages:** English, French, Spanish, Italian, German, Portuguese, Japanese, Korean, Arabic, and Chinese
- **Supported Use cases:** Text generation, text summarization, chat, knowledge assistants, Q&A, RAG.

### 2. Command R
- **Description:** Command R is Cohere's generative language model optimized for long-context tasks, such as retrieval-augmented generation (RAG) and tools, and large scale production workloads.
- **MaxTokens:** 128K
- **Languages:** English, French, Spanish, Italian, German, Portuguese, Japanese, Korean, Arabic, and Chinese
- **Supported Use cases:** Text generation, text summarization, chat, knowledge assistants, Q&A, RAG.

In this notebook, we use Command R+ as the default model, with the option to switch to Command R to compare accuracy and performance.

---
## Prerequisites
1. Use one of the following kernels: conda_python3, conda_pytorch_p310, or conda_tensorflow2_p310.
2. Install the required packages.
3. Have access to the Cohere models on Amazon Bedrock and to the Converse API.

---

## Getting Started
### Step 0: Install Dependencies and Import Modules

In [1]:
!pip install -U awscli --force-reinstall --quiet --no-warn-conflicts
!pip install -U boto3==1.34.127 --force-reinstall --quiet --no-warn-conflicts
!pip install -U numpy==1.26.4 --force-reinstall --quiet --no-warn-conflicts
!pip install -U opensearch-py --force-reinstall --quiet --no-warn-conflicts
!pip install -U requests-aws4auth --force-reinstall --quiet --no-warn-conflicts

Note: When installing libraries with pip, you may see errors or warnings during installation. These are generally not critical and can be ignored. After installing the libraries, restart the kernel so that the newly installed versions are loaded and available to your code.


In [27]:
import boto3
from botocore.exceptions import ClientError
import json
import os
from opensearchpy import OpenSearch, RequestsHttpConnection
from opensearchpy.helpers import bulk
import sagemaker
import time
import random
import re
from requests_aws4auth import AWS4Auth

In [3]:
# Setup Bedrock Client
bedrock_rt= boto3.client(
    service_name='bedrock-runtime'
)
session = boto3.session.Session()
region_name = session.region_name
# Get the SageMaker execution role ARN, used later in the OpenSearch data access policy
sagemaker_role_arn = sagemaker.get_execution_role()
sagemaker_role_arn

'arn:aws:iam::709425451936:role/service-role/AmazonSageMaker-ExecutionRole-20240712T100744'

### Step 1: Read in the userprofile and healthpost schemas and corresponding index files

In [4]:
#Read in the predefined userprofile schema
with open('schemas/userprofile_schema.json', 'r') as file:
    userprofile_schema = json.load(file)
#Read in the predefined healthpost schema
with open('schemas/healthpost_schema.json', 'r') as file:
    healthpost_schema= json.load(file)

We also have versions of both schemas formatted as mappings for the OpenSearch indices created later.

In [5]:
with open('schemas/userprofile_schema_index.json', 'r') as file:
    userprofile_schema_index = json.load(file)
with open('schemas/healthpost_schema_index.json', 'r') as file:
    healthpost_schema_index= json.load(file)

We will need all of the above as strings later in the notebook, so let's convert the JSON objects to strings for use in the prompts.

In [6]:
#get the correct formatting to send to system prompt
userprofile_schema_string= json.dumps(userprofile_schema, indent=2)
healthpost_schema_string= json.dumps(healthpost_schema, indent=2)
userprofile_index_string= json.dumps(userprofile_schema_index, indent=2)
healthpost_index_string= json.dumps(healthpost_schema_index, indent=2)
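
The schema files themselves are not shown in this notebook. As a rough illustration only, the sketch below builds a hypothetical, abbreviated health post schema and serializes it the same way as the cell above. The field names are borrowed from the sample query output in Step 5; the types and the dictionary layout are assumptions, not the contents of the real schema file.

```python
import json

# Hypothetical, abbreviated schema: field names mirror the sample output in
# Step 5, but the types and layout are assumptions, not the real schema file.
example_healthpost_schema = {
    "post_id": "string",
    "user_id": "string",
    "title": "string",
    "tags": ["string"],
    "likes_count": "integer",
    "location": {"lat": "float", "lon": "float"},
    "is_verified": "boolean",
}

# indent=2 keeps the nesting readable once the schema is embedded in a prompt.
example_schema_string = json.dumps(example_healthpost_schema, indent=2)
print(example_schema_string)
```

Serializing with an indent is a readability choice; the round trip through `json.loads` returns the same dictionary either way.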

### Step 2:  Synthetic Data Generation Based on Schemas
Now that we have the two schemas for health posts and user profiles, let's generate synthetic data for them using a foundation model. We will use the generate_data() function throughout the notebook as the single point for calling the LLM of choice.

In [7]:
def generate_data(bedrock_rt, model_id, system_prompt, query, inference_config):
    """
    Function to call the Bedrock Converse API
    """
    messages = []
    messages.append({
        "role": "user",
        "content": [{"text": query}]
        }
    )
    response = bedrock_rt.converse(
        modelId=model_id,
        system=system_prompt,
        messages=messages,
        inferenceConfig=inference_config
    )
    output = response['output']['message']['content'][0]['text']
    return output
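
Calls to a hosted model can be throttled, and the notebook imports `ClientError` without using it. The sketch below is one possible way to retry such calls with exponential backoff. `call_with_retry` is a hypothetical helper, not part of boto3, and the set of retryable error codes is an assumption; it reads the error code from `exc.response["Error"]["Code"]`, which is where botocore's `ClientError` exposes it.

```python
import time


def call_with_retry(fn, retryable_codes=("ThrottlingException",),
                    max_attempts=3, base_delay=1.0):
    """Call fn(); retry with exponential backoff on retryable error codes."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # botocore's ClientError carries the service error code in
            # exc.response["Error"]["Code"]; other exceptions yield None here.
            response = getattr(exc, "response", None)
            code = response.get("Error", {}).get("Code") if isinstance(response, dict) else None
            if code not in retryable_codes or attempt == max_attempts:
                raise
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            time.sleep(base_delay * 2 ** (attempt - 1))
```

With this helper, a call could be wrapped as `call_with_retry(lambda: generate_data(bedrock_rt, model_id, system_prompt, query, inference_config))`. Errors with any other code are re-raised immediately.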

In [8]:
#Set up variables to switch the model_id depending on which model you'd like to test. We will set Cohere Command R+ as default model.
model_id_commandR = "cohere.command-r-v1:0"
model_id_commandR_plus= "cohere.command-r-plus-v1:0"
model_id = model_id_commandR_plus
#we keep it at 0 because we don't want that much creativity
inference_config = {"temperature": 0}

We will set up two different messages to pass to the Converse API.
1. For health posts 
2. For user profiles

We use a separate user prompt for each call but keep the same system prompt. This is a common practice: the user prompt is specific to the task at hand, while the system prompt stays the same for both outputs.

In [15]:
message_healthpost= f"""Please generate 5 Health Post entries based on the following JSON schema. Health Post Schema:{healthpost_schema_string}.
        Please provide the generated data in JSON format, do not include any other information in response other than the JSON outputs.
        Put each of these additional dictionaries in separate <json> tags."""

message_userprofile= f"""Please generate 5 user profile entries based on the following JSON schema. User Profile Schema: {userprofile_schema_string}.
        Please provide the generated data in JSON format, do not include any other information in response other than the JSON outputs.
        Put each of these additional dictionaries in separate <json> tags."""

In [16]:
system_prompt = [{"text": """You are an AI assistant tasked with generating synthetic data for a health tech social media platform. You will be provided a JSON schema. Your job is to create realistic, diverse, and consistent sample data entries based on the schema.

Follow these guidelines:
1. Generate data that adheres strictly to the provided schemas.
2. Create diverse and realistic entries, considering various demographics, health conditions, and interests.
3. Ensure consistency between User Profile and Health Post data (e.g., usernames, user IDs).
4. Use realistic values for all fields, including dates, metrics, and engagement statistics.
5. Generate geolocation data for major cities around the world.
6. Create a mix of verified and non-verified users/posts.
7. Vary the sentiment scores and engagement metrics realistically.
8. Include a range of health interests, fitness levels, and medical conditions.
9. Generate realistic content for post titles and content fields.

Remember to maintain data privacy by not using real people's information. All data should be fictional but plausible.

"""
}
]

In [17]:
#Call the generate_data() function and store the results in variables for now. You can loop through several prompts this way while reusing the same system prompt
health_post_data = generate_data(bedrock_rt, model_id, system_prompt, message_healthpost, inference_config)
user_profile_data = generate_data(bedrock_rt, model_id, system_prompt, message_userprofile, inference_config)
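
The model responses are plain strings with each record wrapped in `<json>` tags, while the cells that follow expect parsed records in `health_posts_dict` and `user_profile_dict`. The parsing step is not shown in this notebook, so the sketch below is one possible way to do it; `extract_json_blocks` is a hypothetical helper name, and skipping malformed blocks instead of raising is a design assumption.

```python
import json
import re


def extract_json_blocks(text):
    """Return the parsed contents of every <json>...</json> block in text."""
    records = []
    for block in re.findall(r"<json>(.*?)</json>", text, flags=re.DOTALL):
        try:
            parsed = json.loads(block.strip())
        except json.JSONDecodeError as err:
            # Skip malformed blocks rather than failing the whole batch.
            print(f"Skipping unparseable block: {err}")
            continue
        # A block may hold one object or a list of objects; flatten both cases.
        records.extend(parsed if isinstance(parsed, list) else [parsed])
    return records


sample = '<json>{"post_id": "post_1"}</json>\n<json>{"post_id": "post_2"}</json>'
print(extract_json_blocks(sample))
```

Under these assumptions, the variables used below could be populated with `health_posts_dict = extract_json_blocks(health_post_data)` and `user_profile_dict = extract_json_blocks(user_profile_data)`.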

We will store our synthetic data in the 'data' folder. Let's create the folder and then write the data to individual files for later use.

In [20]:
# exist_ok=True makes this cell safe to rerun
os.makedirs('data', exist_ok=True)

In [21]:
if not os.path.exists('data/healthpost_data.json'):  
    with open('data/healthpost_data.json', 'w', encoding='utf-8') as f:
        json.dump(health_posts_dict, f, ensure_ascii=False, indent=4)
if not os.path.exists('data/userprofile_data.json'):  
    with open('data/userprofile_data.json', 'w', encoding='utf-8') as f:
        json.dump(user_profile_dict, f, ensure_ascii=False, indent=4)

The generated data is now held in the "health_posts_dict" and "user_profile_dict" variables and saved to disk. Let's move on to the next step: creating an OpenSearch collection and indices, then ingesting the data.

### Step 3: Create OpenSearch Collection and Index

In [22]:
#create initial client for opensearch
aoss_client = boto3.client('opensearchserverless')
suffix = random.randrange(200, 900)
identity = boto3.client('sts').get_caller_identity()['Arn']

The code below steps through creating an OpenSearch Serverless collection. It is adapted from: https://github.com/aws-samples/Cohere-on-AWS/blob/main/cohere-cookbooks/Embeddings/Cohere_Embeddings_Search.ipynb

In [23]:
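def create_policies_in_oss(es_name, aoss_client, role_arn):
    """Create the encryption, network, and data access policies for the collection.

    Relies on the notebook-level `suffix` and `identity` variables defined above.
    """
    # Default to None so the return statement still works if a create call fails
    encryption_policy = network_policy = access_policy = None

    encryption_policy_name = f"sample-sp-{suffix}"
    network_policy_name = f"sample-np-{suffix}"
    access_policy_name = f'sample-ap-{suffix}'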

    try:
        encryption_policy = aoss_client.create_security_policy(
            name=encryption_policy_name,
            policy=json.dumps(
                {
                    'Rules': [{'Resource': ['collection/' + es_name],
                               'ResourceType': 'collection'}],
                    'AWSOwnedKey': True
                }),
            type='encryption'
        )
    except Exception as ex:
        print(ex)
    
    try:
        network_policy = aoss_client.create_security_policy(
            name=network_policy_name,
            policy=json.dumps(
                [
                    {'Rules': [{'Resource': ['collection/' + es_name],
                                'ResourceType': 'collection'}],
                     'AllowFromPublic': True}
                ]),
            type='network'
        )
    except Exception as ex:
        print(ex)
    
    try:
        
        access_policy = aoss_client.create_access_policy(
            name=access_policy_name,
            policy=json.dumps(
                [
                    {
                        'Rules': [
                            {
                                'Resource': ['collection/' + es_name],
                                'Permission': [
                                    'aoss:CreateCollectionItems',
                                    'aoss:DeleteCollectionItems',
                                    'aoss:UpdateCollectionItems',
                                    'aoss:DescribeCollectionItems'],
                                'ResourceType': 'collection'
                            },
                            {
                                'Resource': ['index/' + es_name + '/*'],
                                'Permission': [
                                    'aoss:CreateIndex',
                                    'aoss:DeleteIndex',
                                    'aoss:UpdateIndex',
                                    'aoss:DescribeIndex',
                                    'aoss:ReadDocument',
                                    'aoss:WriteDocument'],
                                'ResourceType': 'index'
                            }],
                        'Principal': [identity, role_arn],
                        'Description': 'Easy data policy'}
                ]),
            type='data'
        )
    except Exception as ex:
        print(ex)
        
    return encryption_policy, network_policy, access_policy

**note**: **Only run the next cell once. Running it again raises errors because the policies and collection already exist.**

In [24]:
# Create Collection
es_name = f'es-collection-{suffix}'

encryption_policy, network_policy, access_policy = create_policies_in_oss(es_name=es_name,
                       aoss_client=aoss_client,
                       role_arn=sagemaker_role_arn)
#the type should be SEARCH, can be changed to VECTORSEARCH if we want a vectorDB
collection = aoss_client.create_collection(name=es_name,type='SEARCH')

**reminder**: only run the above cell ONCE

In [25]:
#extract the host from the Collection ID to be used 
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)

hvl2ayqfiqdrqa8w1bxa.us-east-1.aoss.amazonaws.com


The following code builds the OpenSearch client.

In [28]:
service = 'aoss'
credentials= boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region_name, service, session_token=credentials.token)
# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# It can take up to a minute for data access rules to be enforced
time.sleep(60)

**oss_client** is the client we use to talk to the OpenSearch collection. Now, let's define the two indices for our use case from the index schemas we read in earlier in the notebook.

In [29]:
#health post
healthpost_index = "healthpost_index"
healthpost_body = {
   "settings": {
        "index": {
            "number_of_shards": 2,
            "number_of_replicas": 1
        }
    },
   "mappings": healthpost_schema_index['mappings']
}

#user profile
userprofile_index = "userprofile_index"
userprofile_body = {
   "settings": {
        "index": {
            "number_of_shards": 2,
            "number_of_replicas": 1
        }
    },
   "mappings": userprofile_schema_index['mappings']
}

Now we create the indices and check that the responses were received successfully.
**note**: the collection can take anywhere from a few minutes to 10 minutes to create. If you are getting errors, wait a few more minutes or check the status of your OpenSearch collection in the AWS console. The status needs to be "active".
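
Instead of checking the console by hand, the collection status can be polled from code. The sketch below is one way to do that with the `batch_get_collection` call of the `opensearchserverless` client; `wait_for_collection` is a hypothetical helper, and the timeout and polling interval are assumed values.

```python
import time


def wait_for_collection(aoss_client, name, timeout_s=900, poll_s=30):
    """Poll until the named collection is ACTIVE; return its detail record."""
    deadline = time.monotonic() + timeout_s
    while True:
        details = aoss_client.batch_get_collection(names=[name])["collectionDetails"]
        status = details[0]["status"] if details else "NOT_FOUND"
        if status == "ACTIVE":
            return details[0]
        if status == "FAILED":
            raise RuntimeError(f"Collection {name} failed to create")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Collection {name} still {status} after {timeout_s}s")
        # Not ready yet: wait before asking again.
        time.sleep(poll_s)
```

In this notebook it could be called as `wait_for_collection(aoss_client, es_name)` before creating the indices.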

In [30]:
# An "index already exists" exception is expected on reruns and can be ignored.
# Each index is created in its own try block so one failure does not skip the other.
for index_name, index_body in [(healthpost_index, healthpost_body),
                               (userprofile_index, userprofile_body)]:
    try:
        response = oss_client.indices.create(index=index_name, body=index_body)
        print(f"response received for the create index -> {response}")
    except Exception as e:
        print(f"error, exception={e}")

response received for the create index -> {'acknowledged': True, 'shards_acknowledged': True, 'index': 'healthpost_index'}
response received for the create index -> {'acknowledged': True, 'shards_acknowledged': True, 'index': 'userprofile_index'}


### Step 4: Ingest Synthetic Data into OpenSearch

In [31]:
#read in the data created from earlier
with open('data/healthpost_data.json', 'r') as file:
    healthposts_json = json.load(file)
with open('data/userprofile_data.json', 'r') as file:
    userprofile_json = json.load(file)

In [32]:
def ingest_data(posts, user, healthpost_index, userprofile_index, oss_client):
    """Bulk-index health posts and user profiles into their respective indices."""
    # Use the function arguments rather than the notebook-level variables,
    # and build each list separately so unequal lengths do not drop records
    actions = [{"_index": healthpost_index, "_source": post} for post in posts]
    actions += [{"_index": userprofile_index, "_source": profile} for profile in user]
    success, failed = bulk(oss_client, actions)
    print(f"Successfully indexed {success} documents")
    print(f"Failed to index {len(failed)} documents")

In [33]:
ingest_data(posts= healthposts_json, user= userprofile_json, healthpost_index=healthpost_index, userprofile_index=userprofile_index, oss_client=oss_client)

Successfully indexed 10 documents
Failed to index 0 documents


If the output reports that the documents were indexed successfully, we are ready to move on to querying the data.

### Step 5: Query All Records in OpenSearch Collection

Below is an example match_all query, which returns all records in the OpenSearch collection. The data was generated by the LLM; here we are only confirming that it resides in the collection, so that we can start running queries against it.

In [34]:
query_body = {
    'query': {
        'match_all': {}
    }
}

In [35]:
def query_oss(query):
    #run the query against every index in the collection and return the matching hits
    response = oss_client.search(
        index="_all",
        body=query
    )
    return response['hits']['hits']

In [37]:
output = query_oss(query_body)
print(output)

[{'_index': 'healthpost_index', '_id': 'E-9Gn5EBiT38F8xt-2vb', '_score': 1.0, '_source': {'post_id': 'post_5', 'user_id': 'user_345', 'username': 'FitnessGuru', 'post_type': 'video', 'title': 'Full-Body HIIT Workout for Beginners', 'content': 'Check out this 30-minute high-intensity interval training routine that will get your heart pumping and muscles burning!', 'created_at': '2023-05-05T12:00:00Z', 'updated_at': '2023-05-05T12:00:00Z', 'tags': ['hiit', 'workout', 'fitness'], 'category': 'exercise', 'likes_count': 1025, 'comments_count': 125, 'shares_count': 85, 'media_urls': ['https://example.com/hiit_workout.mp4'], 'location': {'lat': -22.9068, 'lon': -43.1729}, 'mentioned_users': [], 'health_metrics': {'steps': 5000, 'calories': 450.0, 'heart_rate': 145, 'blood_pressure': {'systolic': 125, 'diastolic': 80}, 'sleep_duration': 7.2}, 'sentiment_score': 0.72, 'is_verified': True}}, {'_index': 'userprofile_index', '_id': 'DO9Gn5EBiT38F8xt-2vb', '_score': 1.0, '_source': {'user_id': 'use

The output above shows that the query returns the LLM-generated records from the OpenSearch collection.
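
Once the data is confirmed to be in the collection, more selective queries can be run against it. The sketch below builds a `bool` query that matches a category and filters on a minimum like count. `build_filtered_query` is a hypothetical helper; the `category` and `likes_count` field names come from the sample output above, and how they behave depends on the index mappings, which are not shown here.

```python
def build_filtered_query(category, min_likes, size=5):
    """Build a bool query: match on category, filter on a minimum like count."""
    return {
        "size": size,
        "query": {
            "bool": {
                # "must" clauses contribute to the relevance score.
                "must": [{"match": {"category": category}}],
                # "filter" clauses only include or exclude documents.
                "filter": [{"range": {"likes_count": {"gte": min_likes}}}],
            }
        },
    }


filtered_query = build_filtered_query("exercise", min_likes=100)
print(filtered_query)
```

The resulting dictionary can be passed as the `body` of a search, for example `oss_client.search(index=healthpost_index, body=filtered_query)`.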

**note**: if you start receiving authorization errors after a while, scroll up and rerun the cell that builds the OpenSearch client.

---
## Clean Up

After we are done, delete the indices in the collection.

In [None]:
def delete_opensearch_serverless_indices(collection_id, client):
    # Delete every index in the collection using the OpenSearch client
    # (collection_id is not used here; it is kept so the call below is unchanged)
    try:
        client.indices.delete(index='_all')
    except Exception as e:
        print(f"An error occurred: {e}")
        
delete_opensearch_serverless_indices(collection_id, oss_client)
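
Deleting the indices leaves the collection and its three policies in place. The sketch below shows one way they might be removed with the `opensearchserverless` client. `delete_collection_and_policies` is a hypothetical helper, the policy names assume the `sample-sp-`, `sample-np-`, and `sample-ap-` prefixes used in Step 3, and collecting errors instead of stopping at the first one is a design assumption.

```python
def delete_collection_and_policies(aoss_client, collection_id, suffix):
    """Delete the collection, then the policies created for it in Step 3."""
    steps = [
        lambda: aoss_client.delete_collection(id=collection_id),
        lambda: aoss_client.delete_access_policy(name=f"sample-ap-{suffix}", type="data"),
        lambda: aoss_client.delete_security_policy(name=f"sample-np-{suffix}", type="network"),
        lambda: aoss_client.delete_security_policy(name=f"sample-sp-{suffix}", type="encryption"),
    ]
    errors = []
    for step in steps:
        try:
            step()
        except Exception as exc:
            # Keep going so one failure does not leave the other resources behind.
            errors.append(exc)
    return errors
```

It could be called as `delete_collection_and_policies(aoss_client, collection_id, suffix)`; any returned errors indicate resources that may need to be removed manually or on a second run.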

---
## Conclusion

In this notebook, we saw how to create synthetic data from existing schemas with Cohere Command R+ to speed up testing. You can use a similar approach to integrate synthetic data generation into testing for multiple use cases, whether the target is OpenSearch or another data store for your applications.

You can also change the model_id to other Cohere Command models to compare accuracy and performance. As a next step, you can incorporate reranking of the search results from the OpenSearch collection.