In [21]:
import json

test_path = "data/merged_data/v0/nova_sft_testset.jsonl"

test_data = []
with open(test_path, 'r') as f:
    for line in f:
        if line.strip():
            sample = json.loads(line.strip())
            test_data.append(sample)

print(f"num: {len(test_data)}")
print(f"example: {json.dumps(test_data[0], indent=2, ensure_ascii=False)}")

num: 1000
example: {
  "schemaVersion": "bedrock-conversation-2024",
  "system": [
    {
    }
  ],
  "messages": [
    {
      "role": "user",
      "content": [
        {
          "image": {
            "format": "jpeg",
            "source": {
              "s3Location": {
                "uri": "s3://sagemaker-us-east-1-452145973879/projects/nova-image-tagging/data/shein_data/images/img_2011.jpg",
                "bucketOwner": "452145973879"
              }
            }
          }
        },
        {
          "text": "Please classify the item in this image according to the categories defined in the system."
        }
      ]
    },
    {
      "role": "assistant",
      "content": [
        {
          "text": "{\"result\":\"仿真子弹\"}"
        }
      ]
    }
  ]
}


In [None]:
import boto3

bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")

def invoke_custom_model(deployment_arn, messages, system):
    response = bedrock_client.converse(
                modelId=deployment_arn,
                messages=messages,
                system=system,
                inferenceConfig={"maxTokens": 512, "temperature": 0.0, "topP": 0.9, "stopSequences": ["```"]},
            )
    
    prediction = json.loads(response["output"]["message"]["content"][0]["text"].replace("```", ""))
    
    return prediction

## single test

In [23]:
from copy import deepcopy

deployment_arn = "arn:aws:bedrock:us-east-1:452145973879:custom-model-deployment/u83i7ozdfnqa"

## local image
sample = deepcopy(test_data[0])
system = sample['system']
messages = sample['messages'][:-1]
image_uri = messages[0]['content'][0]['image']['source']['s3Location']["uri"]
local_image_path = '/'.join(image_uri.split('/')[-4:])
image_ext = local_image_path.split(".")[-1]
with open(local_image_path, "rb") as f:
    image = f.read()

messages[0]['content'][0]['image'] = {
                "format": 'jpeg',
                "source": {
                    "bytes": image
            }
}

print(messages)
response = invoke_custom_model(deployment_arn, messages, system)
print(response)

[{'role': 'user', 'content': [{'image': {'format': 'jpeg', 'source': {'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xfe\x00\x1fCompressed by jpeg-recompress\xff\xe2\x01\xd8ICC_PROFILE\x00\x01\x01\x00\x00\x01\xc8\x00\x00\x00\x00\x040\x00\x00mntrRGB XYZ \x07\xe0\x00\x01\x00\x01\x00\x00\x00\x00\x00\x00acsp\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\xf6\xd6\x00\x01\x00\x00\x00\x00\xd3-\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\tdesc\x00\x00\x00\xf0\x00\x00\x00$rXYZ\x00\x00\x01\x14\x00\x00\x00\x14gXYZ\x00\x00\x01(\x00\x00\x00\x14bXYZ\x00\x00\x01<\x00\x00\x00\x14wtpt\x00\x00\x01P\x00\x00\x00\x14rTRC\x00\x00\x01d\x00\x00\x00(gTRC\x00\x00\x01d\x00\x00\x00(bTRC\x00\x00\x01d\x00\x00\x00(cprt\x00\x00\x01\x8c\x00\x00\x00<mluc

response: {'ResponseMetadata': {'RequestId': '1609cb32-8ab0-48dd-95d4-95414380e4f5', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 21 Aug 2025 07:42:51 GMT', 'content-type': 'application/json', 'content-length': '215', 'connection': 'keep-alive', 'x-amzn-requestid': '1609cb32-8ab0-48dd-95d4-95414380e4f5'}, 'RetryAttempts': 0}, 'output': {'message': {'role': 'assistant', 'content': [{'text': '{"result":"仿真子弹"}'}]}}, 'stopReason': 'end_turn', 'usage': {'inputTokens': 4976, 'outputTokens': 12, 'totalTokens': 4988}, 'metrics': {'latencyMs': 7400}}
{'result': '仿真子弹'}


In [24]:
# from copy import deepcopy

# ## s3 image
# sample = deepcopy(test_data[0])
# system = sample['system']
# messages = sample['messages'][:-1]

# # image_uri = messages[0]['content'][0]['image']['source']['s3Location']["uri"]
# # messages[0]['content'][0]['image']['source']['s3Location']["uri"] = image_uri.replace('us-east-1', 'us-west-2')
# print(messages)

# response = invoke_custom_model(deployment_arn, messages, system)

# print(response)

## full test

In [35]:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
from threading import Lock
import pandas as pd

class RateLimiter:
    def __init__(self, max_calls_per_second=2):
        self.max_calls_per_second = max_calls_per_second
        self.min_interval = 1.0 / max_calls_per_second
        self.last_called = 0
        self.lock = Lock()

    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_called
            left_to_wait = self.min_interval - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            self.last_called = time.time()

def process_sample_with_rate_limit(args):
    i, sample, deployment_arn, n_shein, rate_limiter = args
    
    try:
        # Rate limiting
        rate_limiter.wait()
        
        system = sample['system']
        messages = deepcopy(sample['messages'][:-1])
        image_uri = messages[0]['content'][0]['image']['source']['s3Location']["uri"]

        local_image_path = '/'.join(image_uri.split('/')[-4:])
        
        with open(local_image_path, "rb") as f:
            image = f.read()

        messages[0]['content'][0]['image'] = {
            "format": 'jpeg',
            "source": {"bytes": image}
        }

        tgt_res = json.loads(sample['messages'][-1]['content'][0]['text'])['result']
        
        # Invoke model with retry logic
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = invoke_custom_model(deployment_arn, messages, system)
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                time.sleep(2 ** attempt)  # Exponential backoff

        result = {
            "image_uri": image_uri,
            "tag_gt": tgt_res,
            "inference_result": response['result'],
            "sample_index": i
        }

        if i < n_shein:
            result['source'] = 'shein'
        else:
            result['source'] = 'product10k'

        return result
        
    except Exception as e:
        print(f"Error processing sample {i}: {e}")
        return {
            "image_uri": image_uri if 'image_uri' in locals() else "unknown",
            "error": str(e),
            "sample_index": i
        }

# Main execution with rate limiting
results = []
failed_results = []
n_shein = 434
max_workers = 10  # Conservative for API rate limits
rate_limiter = RateLimiter(max_calls_per_second=5)  # Adjust based on your limits

# Prepare arguments
sample_args = [(i, sample, deployment_arn, n_shein, rate_limiter) 
               for i, sample in enumerate(test_data)]  # Adjust slice as needed

print(f"Processing {len(sample_args)} samples with {max_workers} workers...")

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Submit all tasks
    future_to_index = {
        executor.submit(process_sample_with_rate_limit, args): args[0] 
        for args in sample_args
    }
    
    # Collect results with progress bar
    for future in tqdm(as_completed(future_to_index), 
                       total=len(future_to_index), 
                       desc="Processing samples"):
        
        sample_index = future_to_index[future]
        result = future.result()
        
        if result and 'error' not in result:
            results.append(result)
        else:
            failed_results.append(result)
            print(f"Failed sample {sample_index}: {result.get('error', 'Unknown error')}")

# Sort results by original order
results.sort(key=lambda x: x['sample_index'])
failed_results.sort(key=lambda x: x['sample_index'])

print(f"Successfully processed: {len(results)} samples")
print(f"Failed: {len(failed_results)} samples")

df_res = pd.DataFrame(results)
# # Optional: Save results
# import json
# with open('outputs/inference_results_v1.json', 'w') as f:
#     json.dump(results, f, indent=2)


Processing 1000 samples with 10 workers...


Processing samples:   0%|          | 0/1000 [00:00<?, ?it/s]

Processing samples: 100%|██████████| 1000/1000 [11:12<00:00,  1.49it/s]

Successfully processed: 1000 samples
Failed: 0 samples





In [36]:
save_path = 'outputs/inference_results_v0.csv'
df_res.to_csv(save_path)
df_res.head()

Unnamed: 0,image_uri,tag_gt,inference_result,sample_index,source
0,s3://sagemaker-us-east-1-452145973879/projects...,仿真子弹,伪装刀具,0,shein
1,s3://sagemaker-us-east-1-452145973879/projects...,仿真子弹,仿真子弹,1,shein
2,s3://sagemaker-us-east-1-452145973879/projects...,仿真子弹,伪装刀具,2,shein
3,s3://sagemaker-us-east-1-452145973879/projects...,仿真子弹,仿真刀具,3,shein
4,s3://sagemaker-us-east-1-452145973879/projects...,仿真子弹,仿真子弹,4,shein


In [None]:
import pandas as pd
import sys
from collections import defaultdict

def calculate_pr(df, test_name):
    # Get ground truth and predictions using column names
    # You can modify these column names to match your Excel file
    ground_truth_col = 'tag_gt'  # Change this to your actual column name
    predictions_col = 'inference_result'    # Change this to your actual column name

    try:
        ground_truth = df[ground_truth_col].astype(str)
        predictions = df[predictions_col].astype(str)
    except KeyError as e:
        print(f"Error: Column {e} not found in Excel file.")
        print(f"Available columns: {list(df.columns)}")
        sys.exit(1)

    ground_truth_set = set(ground_truth)

    # Calculate metrics for each label
    label_stats = defaultdict(lambda: {'tp': 0, 'fp': 0, 'fn': 0})

    for gt, pred in zip(ground_truth, predictions):
        pred_list = [p.strip() for p in pred.split(',') if p.strip()]
        
        # True positive: ground truth appears in predictions
        if gt in pred_list:
            label_stats[gt]['tp'] += 1
        else:
            label_stats[gt]['fn'] += 1
        
        # False positives: predicted labels that don't match ground truth
        for p in pred_list:
            if p != gt and p in ground_truth_set:
                label_stats[p]['fp'] += 1

    # Calculate and save results
    results = []
    for label in sorted(label_stats.keys()):
        stats = label_stats[label]
        
        precision = stats['tp'] / (stats['tp'] + stats['fp']) if (stats['tp'] + stats['fp']) > 0 else 0
        recall = stats['tp'] / (stats['tp'] + stats['fn']) if (stats['tp'] + stats['fn']) > 0 else 0
        
        results.append({
            'Label': label,
            'Precision': precision,
            'Recall': recall,
            'TP': stats['tp'],
            'FP': stats['fp'],
            'FN': stats['fn']
        })

    # Calculate average of Precision and Recall from all labels
    avg_precision = sum(result['Precision'] for result in results) / len(results) if results else 0
    avg_recall = sum(result['Recall'] for result in results) / len(results) if results else 0
    total_tp = sum(result['TP'] for result in results)
    total_fp = sum(result['FP'] for result in results)
    total_fn = sum(result['FN'] for result in results)

    # Add average row
    results.append({
        'Label': 'Macro',
        'Precision': avg_precision,
        'Recall': avg_recall,
        'TP': total_tp,
        'FP': total_fp,
        'FN': total_fn
    })

    results.append({
        'Label': 'Micro',
        'Precision': total_tp/(total_tp+total_fp),
        'Recall': total_tp/(total_tp+total_fn),
        'TP': total_tp,
        'FP': total_fp,
        'FN': total_fn
    })

    # Save to CSV
    results_df = pd.DataFrame(results)

    metric_file= save_path.replace('.csv', f'_{test_name}_metric.csv')

    results_df.to_csv(metric_file, index=False)

    return results_df

In [52]:
test_name = "shein"
df = df_res[df_res['source']=='shein']
results_df_shein = calculate_pr(df, test_name)
results_df_shein.tail()

Unnamed: 0,Label,Precision,Recall,TP,FP,FN
33,警用喷雾,0.923077,0.923077,12,1,1
34,重燃蜡烛,1.0,0.444444,4,0,5
35,镁粉,0.0,0.0,0,1,2
36,Macro,0.729694,0.549601,236,96,198
37,Micro,0.710843,0.543779,236,96,198


In [53]:
test_name = "full"
df = df_res
results_df = calculate_pr(df, test_name)
results_df.tail()

Unnamed: 0,Label,Precision,Recall,TP,FP,FN
233,饰品、首饰、手链,0.0,0.0,0,0,1
234,香水、香氛产品,0.666667,0.5,2,1,2
235,香薰用品,1.0,0.5,1,0,1
236,Macro,0.322425,0.246542,397,234,603
237,Micro,0.62916,0.397,397,234,603


## Test Original Nova

In [49]:
# import boto3
# import json
# import base64
# # Create a Bedrock Runtime client
# client = boto3.client("bedrock-runtime", 
#                       region_name="us-east-1", 
#                      )
# PRO_MODEL_ID = "us.amazon.nova-pro-v1:0"
# LITE_MODEL_ID = "us.amazon.nova-lite-v1:0"
# MICRO_MODEL_ID = "us.amazon.nova-micro-v1:0"
# PREMIER_MODEL_ID = "us.amazon.nova-premier-v1:0"

# messages = [{'role': 'user', 'content': [{'image': {'format': 'jpeg', 'source': {'s3Location': {'uri': 's3://sagemaker-us-east-1-452145973879/projects/nova-image-tagging/data/shein_data/images/img_2011.jpg', 'bucketOwner': '452145973879'}}}}, {'text': 'Please classify the item in this image according to the categories defined in the system.'}]}]

# image_uri = messages[0]['content'][0]['image']['source']['s3Location']["uri"]
# local_image_path = '/'.join(image_uri.split('/')[-4:])
# image_ext = local_image_path.split(".")[-1]
# with open(local_image_path, "rb") as f:
#     image = f.read()

# messages[0]['content'][0]['image'] = {
#                 "format": 'jpeg',
#                 "source": {
#                     "bytes": image
#             }
# }

# # messages = [
# #     {
# #         "role": "user",
# #         "content": [
# #             {
# #                 "image": {
# #                     "format": "png",
# #                     "source": {
# #                         "s3Location": {
# #                             #Replace the s3 bucket URI 
# #                             "uri": "s3://demo-bucket/cat.png"
# #                             "bucketOwner" : "123456789012"
# #                         }
# #                     },
# #                 }
# #             },
# #             {"text": "Describe the following image"},
# #         ],
# #     }
# # ]

# inf_params = {"maxTokens": 512, "topP": 0.9, "temperature": 0}
# model_response = client.converse(
#     modelId=LITE_MODEL_ID, messages=messages, system=system, inferenceConfig=inf_params
# )
# print("\n[Full Response]")
# print(json.dumps(model_response, indent=2, ensure_ascii=False))
# print("\n[Response Content Text]")
# print(model_response["output"]["message"]["content"][0]["text"])