#### 1. Install Required Packages

In [0]:
%pip install --quiet -r requirements.txt

%restart_python

#### Import All Required Packages

In [0]:
from autogen.agentchat.contrib.multimodal_conversable_agent import MultimodalConversableAgent
# Import necessary libraries for PySpark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql import SparkSession, Row
############
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import datetime
import ast
import time
##################################
# from utility import MultiModel, Processing
from PIL import Image
import pandas as pd
import yaml
import csv
import os
import shutil
import json
import re
import autogen
import base64
import requests



# vision_model = MultiModel(api_key="9a66c4f7a6304b9ab03c4836c29cf79a", temp=0)
# process = Processing()

##### Azure Storage Mount

In [0]:
if not any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_access_key}
    )

## Without Autogen Agent

In [0]:
class MultiModel():
    """
    A class to interact with the GPT-4 Vision model.

    Attributes:
        api_key (str): The API key for authentication.
        temp (float): The temperature parameter for the model. Default is 0.
    """
    def __init__(self,api_key,temp=0):
        """
        The constructor for the MultiModel class.
        """
        self.api_key=api_key
        self.temp=temp
        
    def gpt4v_img(self,prompt,image_path,max_tokens:int=4000):
        """
        Method to generate a response from the GPT-4 Vision model.

        Parameters:
            prompt (str): The prompt for the model.
            image_path (str): The path to the image file.
            max_tokens (int): The maximum number of tokens for the model to generate. Default is 4000.

        Returns:
            dict: A dictionary containing the model's response and inference time.
        
        """
        def encode_image(image_path):
            """
            Function to encode an image file into base64 format.

            Parameters:
                image_path (str): The path to the image file.

            Returns:
                str: The base64 encoded string of the image.
            """
            with open(image_path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')

        # Getting the base64 string
        base64_image = encode_image(image_path)
        headers = {
                "Content-Type": "application/json",
                "api-key": self.api_key,
            }

        payload = {
              "model": "gpt-4-vision-preview",
              "messages": [
                    {"role": "user", "content": [{"type": "text", "text": f"{prompt}"},
                                                 {"type": "image_url","image_url": {"url": f"data:image/jpeg;base64,{base64_image}","detail": "high"}}]
                    }],
              "max_tokens": max_tokens,
            "temperature":self.temp
                }
        
        start=time.time()
        response = requests.post("https://chatgpt4o.openai.azure.com/openai/deployments/gpt4odeployment/chat/completions?api-version=2023-03-15-preview", headers=headers, json=payload)
        end=time.time()
        ## Calculate the inferance time
        latency=end-start
        ## Lets Find only Final model response
        print(response.json())
        result = response.json()['choices'][0]['message']['content']
        token=response.json()["usage"]['total_tokens']
        return {"final_response":result,"inferance_time":latency,"token":token}
    
class Processing():
    def __init__(self):
        pass

    def save_uploadedfile(self,uploadedfile):
        """
        Function to save an uploaded file.

        Parameters:
            uploadedfile: The file uploaded by the user.

        Returns:
            str: The path where the file is saved.
        """

        # Join the directory "images" and the name of the uploaded file to form the path where the file will be saved
        saved_file_path = os.path.join("images", uploadedfile.name)

        # Open the file in write mode
        with open(saved_file_path, "wb") as f:
            # Write the contents of the uploaded file to the new file
            f.write(uploadedfile.getbuffer())
            # Display a success message in the Streamlit app
            st.success("Saved File: {} to tempDir".format(uploadedfile.name))

        # Return the path where the file is saved
        return saved_file_path




#### Pre-defined Tags

In [0]:
tags = """products category:TOPWEAR,BOTTOMWEAR
TOPWEAR--

Products:Shirt,Jumpsuit,Dungarees,Sleepwear,Sweatshirt,Innerwear,Dress,Top,T-shirt,Polo Collar T-shirt,Sweater/Cardigan,Hoodie,Jackets,Raincoat,LongCoat,Blazer,Suits (2-piece or 3-piece Men),Kurta,Kurta Set,Maxi Dress,Showerproof,vest,Sports Bra, Polo shirt.
Color:Pink,Red,Blue,Black,Grey,Navy Blue,Charcoal Grey,White,Green,Olive,Brown,Beige,Khaki,Cream,Maroon,Off White,Grey Melange,Teal,Coffee Brown,Mustard,Purple,Rust,Sea Green,Burgundy,Turquoise Blue,Taupe,Silver,Mauve,Orange,Yellow,Multi,Lavender,Tan,Peach,Magenta,Fluorescent Green,Coral,Copper.
Gender:Men,Women,Girls,Boys.
Pattern:Striped,Checked,Embellished,Ribbed,Colorblocked,Dyed,Printed,Embroidered,Self-Design,Solid,Graphic,Floral,Polka Dots,Camouflage,Animal,Self-Design,Ombre.
Silhouette:A-line,Peplum,Balloon,Fit and Flare,Sheath,Bodycon,Shirt Style,Jumper,Wrap,Kaftan.
Neckline:Turtle/High Neck,Round Neck,Square Neck,Halter Neck,Scoop Neck,V neck Boat Neck,Polo Collar,Open-Collar,Crew Neck,Tie-Up Neck,Boat Neck,Shirt Neck, pointed collar neckline, mandarin collar,notch lapel.
Sleeve Length:Sleeveless,Half Sleeves,Long Sleeves,Three-Quarter Sleeves, short sleeve,cap sleeve.
Sleeve Style:Batwing Sleeves,Bell Sleeves,Flared Sleeves,Balloon Sleeves,Puffed Sleeves,Cold Sleeves,Shoulder Sleeves,Regular Sleeves,Slit Sleeves,Roll Up Sleeves,No Sleeves,Flutter Sleeve.
Fabric:Cotton,Polyester,Leather,Denim,Silk,Wool,Corduroy,Fleece,Schiffli,Terry,Crepe,Net,Georgette.
Brand:Mango,Puma,Adidas,Nike,Calvin Klein,Lacoste,Fred Perry,Brooks Brothers,GAP,Levis,GANT,Superdry,Tommy Hilfiger,H&M,Zara,Louis Phillipe,Polo Raulph Lauren,Guess,Gucci,Prada,Versace,Aeropostale,Abercrombie & Fitch,DKNY,Michael Kors,Coach,Fendi.
Occasion:Casual,Formal,Parties/Evening,Sports,Maternity,Ethnic,Everyday,Work,winters.
Fit Type:Slim Fit,Oversized,Regular,Skinny Fit,Loose Fit.
Top Wear Length:Midi,Maxi,Mini,Above Knee,Cropped,Regular,hip length, waist length.

BOTTOMWEAR--

Products:Trouser,Jeans,Shorts,Trackpants,Joggers,Cargos,Skirts, dress pants,leggings, pants.
Color:Pink,Red,Blue,Black,Grey,Navy Blue,Charcoal Grey,White,Green,Olive,Brown,Beige,Khaki,Cream,Maroon,Off White,Grey Melange,Teal,Coffee Brown,Pink,Mustard,Purple,Rust,Sea Green,Burgundy,Turquoise Blue,Taupe,Silver,Mauve,Orange,Yellow,Multi,Lavender,Tan,Peach,Magenta,Fluorescent Green,Coral,Copper.
Gender:Men,Women,Girls,Boys.
Pattern:Striped,Checked,Embellished,Ribbed,Colorblocked,Dyed,Printed,Embroidered,Self-Design,Solid,Graphic,Floral,Polka Dots,Camouflage,Animal,Self-Design,Ombre.
Bottom Style:Flared,Straight,Loose Fit,A-Line,Peplum,Skinny Fit,Pencil, Slim,wide leg,tapered,wide leg.
Bottom Length:Above Knee,Below Knee,Knee Length,Midi,Mini,Ankle,Maxi,Regular length,full length.
Waist Rise:High-Rise,Low-Rise,Mid-Rise.
Fabric:Cotton,Chambray,Polyester,Leather,Denim,Corduroy,Silk,Wool,Fleece,Velvet.
Brand:Mango,Puma,Adidas,Nike,Calvin Klein,Lacoste,Fred Perry,Brooks Brothers,GAP,Levis,GANT,Superdry,Tommy Hilfiger,H&M,Zara,Louis Phillipe,Polo Raulph Lauren,Guess,Gucci,Prada,Versace,Aeropostale,Abercrombie & Fitch,DKNY,Michael Kors,Coach,Fendi.
Occasion:Casual,Formal,Parties/Evening,Sports,Maternity,Ethnic,Everyday,Work.
Fit Type:Slim Fit,Oversized,Regular,Skinny Fit,Loose Fit.
"""

#### Azure Open Ai model - GPT-4O

In [0]:
vision_model = MultiModel(api_key=dbutils.secrets.get(scope="dbx-us-scope",key="azure-openai-llm-api-key") , temp=0)

#### Env Variables

In [0]:
storage_account_name = "adlsusdldev02"
storage_account_access_key ==dbutils.secrets.get(scope="dbx-us-scope",key="storage-account-access-key") 
container_name = "intellitag"
mount_point = "/mnt/my_intellitag_mount"
database_name = "intellitag_catalog.intellitag_dbx"
table_name = "Result_Logs"
file_metadata="File_Metadata"

model="gpt4odeployment"
api_key= dbutils.secrets.get(scope="dbx-us-scope",key="azure-openai-llm-api-key") 
base_url= dbutils.secrets.get(scope="dbx-us-scope",key="azure-openai-llm-base-url") 
api_type= "azure"
api_version= "2024-02-01"

In [0]:

def list_jpg_files_recursively(directory):
    """
    Recursively lists all .jpg files in a given directory and its subdirectories on DBFS, 
    and returns their paths as a list with paths formatted for local access.

    Args:
        directory (str): The path of the directory to search for .jpg files.

    Returns:
        list: A list of file paths for all .jpg files found, with DBFS paths replaced by local file paths.
    """
    files = []  # Initialize an empty list to store the .jpg file paths
    
    # List all items (files and directories) in the provided directory
    items = dbutils.fs.ls(directory)
    
    for item in items:
        if item.isDir():
            # If the item is a directory, recursively search within it
            files.extend(list_jpg_files_recursively(item.path))
        elif item.path.endswith(".jpg"):
            # If the item is a .jpg file, add its path to the list
            # Convert DBFS path to a local path format (replace "dbfs:/" with "/dbfs/")
            files.append(item.path.replace("dbfs:/", "/dbfs/"))
    
    return files  # Return the list of .jpg file paths

#+"/Intellitag_Uploads" #+ "/Intellitag_Testing"  #"/Origamis Intellitag Testing"

directory_to_search = mount_point+ "/Intellitag_Testing/" 

# Get the list of all .jpg files
jpg_files = list_jpg_files_recursively(directory_to_search)

#### Filter the Data
Find the images which has the ground truth in the files.

In [0]:
import pandas as pd

top_wear_df=pd.read_excel("All-wears-groundtruths-json-top-bottom.xlsx",sheet_name="top-wear-updated").fillna("")
bottom_wear_df=pd.read_excel("All-wears-groundtruths-json-top-bottom.xlsx",sheet_name="bottom-wear-updated").fillna("")

top_wear_product_list=top_wear_df['KEY'].to_list()
bottom_wear_product_list=bottom_wear_df['KEY'].to_list()

product_list=set(top_wear_product_list+bottom_wear_product_list)

final_filter_product_list=[img_path for img_path in product_list if "/dbfs/mnt/my_intellitag_mount/Intellitag_Testing/" + img_path in jpg_files]

#### Without Agent Prompt

In [0]:
# model prompt
# Given is the image of clothing catalog.Your task is to identify items or products of clothing only which is in focus from a given image and based on the identified items or products
metadata_json = {"generated_tags": [
                                {
                                "products category": "string",
                                "Products": "string",
                                "Color": "string",
                                "Gender": "string",
                                "Pattern": "string",
                                "Silhouette": "string",
                                "Neckline": "string",
                                "Sleeve Length": "string",
                                "Sleeve Style": "string",
                                "Fabric": "string",
                                "Brand": "string",
                                "Occasion": "string",
                                "Fit Type": "string",
                                "Top Wear Length": "string",
                                }
                                ]
                }

predictor_system_message = f"""
You will receive an image_path as input.
1. Analyze the provided catalog image and identify all the topwear and/or bottomwear items that are in focus and fully captured within the image.
2. For each identified clothing item, extract the following metadata:
	{tags}
3. Organize the metadata for each identified clothing item in a structured JSON format, with each item represented as a dictionary containing the specified keys.
4. If the color recognition tags provided do not cover all the colors present in the image, feel free to use additional color descriptors as necessary.
5. Ensure that the gender and material composition type are correctly mapped based on the identified products.
6. Carefully match the provided tags with the appropriate metadata entities, and do not include any additional comments in the JSON output.
7. If the image contains clothing items that are not fully captured or are out of focus, do not include them in the output.
8. If the image does not contain any topwear or bottomwear items, or if the provided tags do not match the contents of the image, respond with an empty JSON array.
<thinking>
Review the provided image and tags, and extract the relevant metadata for each identified clothing item according to the instructions above. Organize the metadata in a structured JSON format, ensuring that all required fields are populated accurately.
</thinking>

Answer in the below format and DO NOT explain anything else:

Format:
 {str(metadata_json)}
"""


In [0]:
def json_sorting(predicted_output,groundtruth_output):
    """
    Sorts predicted and ground truth JSON objects into two categories: 'top-wear' and 'bottom-wear'.
    It appends the corresponding JSON objects based on the 'products category' key into a dictionary.

    Args:
        predicted_output (list): A list of dictionaries representing predicted output, where each 
                                 dictionary contains a 'products category' key with values like "TOPWEAR" or "BOTTOMWEAR".
        groundtruth_output (list): A list of dictionaries representing ground truth output, where each 
                                   dictionary contains a 'products category' key with values like "TOPWEAR" or "BOTTOMWEAR".

    Returns:
        dict: A dictionary with two keys: 'top-wear' and 'bottom-wear'. Each key contains a list of dictionaries 
              (combined from predicted and ground truth) sorted into their respective categories.
    """
    # Initialize an empty dictionary to store sorted items into 'top-wear' and 'bottom-wear' categories
    category_dict={'top-wear':[],'bottom-wear':[]}

    # Loop through each dictionary in the predicted output
    for dict1 in predicted_output:
        # Check if the 'products category' is 'TOPWEAR', and append to the 'top-wear' list
        if dict1["products category"]=="TOPWEAR":
            category_dict["top-wear"].append(dict1)
        # Check if the 'products category' is 'BOTTOMWEAR', and append to the 'bottom-wear' list
        elif dict1["products category"]=="BOTTOMWEAR":
            category_dict["bottom-wear"].append(dict1)

    # Loop through each dictionary in the ground truth output
    for dict1 in groundtruth_output:
        # Check if the 'products category' is 'TOPWEAR', and append to the 'top-wear' list
        if dict1["products category"]=="TOPWEAR":
            category_dict["top-wear"].append(dict1)
        # Check if the 'products category' is 'BOTTOMWEAR', and append to the 'bottom-wear' list
        elif dict1["products category"]=="BOTTOMWEAR":
            category_dict["bottom-wear"].append(dict1)
            
    return category_dict


In [0]:
def get_score(category_dict):
    """
    Calculates the accuracy score based on predicted and ground truth values for different categories 
    such as 'top wear' and 'bottom wear'. The function compares the predicted values with the ground truth,
    determines correctness, and computes a score for each category and an overall average score.

    Args:
        category_dict (dict): A dictionary where keys are category types (e.g., 'top wear', 'bottom wear'),
                              and values are tuples containing predicted values and ground truth values.

    Returns:
        dict: A dictionary containing:
              - 'avg_score': The average accuracy score across all categories.
              - 'sum_corrected_values': Total number of incorrect predictions.
              - 'sum_predicted_values': Total number of predicted values considered.
    """
    # Initialize dictionaries to store scores and counts
    score_dict={}
    predicted_values_dict={}
    corrected_values_dict={}

    # Loop through each category type in the category dictionary
    for category_type in category_dict:
        number_of_corrected_values = 0  # To count incorrect predictions
        number_of_predicted_values=0    # To count total predictions

        print("-------------------------------category key-------------------")
        print(category_type) # Print the current category, e.g., 'top wear', 'bottom wear'

        # Retrieve the predicted values and ground truth for the category
        final_predict,groundtruth=category_dict[category_type]
        
        # Loop through each key in the predicted values
        for key in final_predict:
            # Check if the predicted key exists in the ground truth and is not a category label
            if key in groundtruth.keys() and key not in ["products category"]:

                # Treat 'Not Available' in ground truth as equivalent to 'Not Visible'
                if groundtruth[key]=="Not Available":
                    groundtruth[key]="Not Visible"

                # Convert to lowercase and check if predicted and ground truth values match
                if groundtruth[key].lower().replace("-"," ") in  final_predict[key].lower().replace("-"," ") or final_predict[key].lower().replace("-"," ") in groundtruth[key].lower().replace("-"," "):
                    # Step-1 :: Check if ground truth is present and predicted value is missing
                    if groundtruth[key]!="" and final_predict[key]=="":
                        number_of_predicted_values+=1
                        number_of_corrected_values+=1
                        print("============================================")
                        print(f"key :: {key} ::: Ground Truth :: {groundtruth[key]} ::: Predicted Value :: {final_predict[key]} ::: Status :: Incorrect")
                    else:
                        number_of_predicted_values+=1
                        print("============================================")
                        print(f"key :: {key} ::: Ground Truth :: {groundtruth[key]} ::: Predicted Value :: {final_predict[key]} ::: Status :: Correct")
                else:
                    # If prediction is incorrect, increment counts
                    number_of_corrected_values+=1
                    number_of_predicted_values+=1
                    print("=============================================")
                    print(f"key :: {key} ::: Ground Truth :: {groundtruth[key]} ::: Predicted Value :: {final_predict[key]} ::: Status :: Incorrect")
                   
            else:
                pass
                # If predicted keys are not found in the ground truth, skip
                # print("Predicted keys not found in the groundtruth ::",key)

        # print for summary of the current category's results        
        print("-----------------------------------------------------")
        print(f"Type :: {category_type}  :::  number_of_corrected_values :: {number_of_corrected_values} ::: number_of_predicted_values:{number_of_predicted_values}")
        print("-----------------------------------------------------")
        
        # Calculate score for the current category (accuracy)
        score_dict[category_type]=1-(number_of_corrected_values/number_of_predicted_values)
        corrected_values_dict[category_type]=number_of_corrected_values
        predicted_values_dict[category_type]=number_of_predicted_values
    
    # Calculate the average score across all categories
    avg_score=sum(score_dict.values())/2

    # Sum the total corrected and predicted values across all categories
    sum_corrected_values=sum(corrected_values_dict.values())
    sum_predicted_values=sum(predicted_values_dict.values())
    
    # Return a dictionary containing the overall results
    return {"avg_score":avg_score,"sum_corrected_values":sum_corrected_values,"sum_predicted_values":sum_predicted_values}

In [0]:
result_dict={}
result_list=[]

print("Total Number Test Images ::",len(final_filter_product_list))

# Loop through each file path in the list of final filtered products
for file_path in final_filter_product_list:
    #--------------------- Load and display the image----------------------------
    # Construct the full image path by appending the file path to the DBFS mount location
    image_path="/dbfs/mnt/my_intellitag_mount/Intellitag_Testing/" + file_path

    # Find the ground truth for the given image by filtering the top-wear and bottom-wear dataframes
    top_wear_filter_list=top_wear_df.loc[top_wear_df['KEY']==file_path].to_dict(orient='records')
    bottom_wear_filter_list=bottom_wear_df.loc[bottom_wear_df['KEY']==file_path].to_dict(orient='records')
    # Combine the filtered lists from top-wear and bottom-wear to form the ground truth
    groundtruth=top_wear_filter_list+bottom_wear_filter_list
     
    #----------------------- Run the Custom Flow --------------------------------
    # Call the vision model's function to get predictions for the image
    output = vision_model.gpt4v_img(
        predictor_system_message, image_path=image_path)
    
    # converting the string response into a Python dictionary
    predicted_dict = eval(output['final_response'].replace("```json", '').replace('```', ''))

    #----------------------Get final dict --------------------------
    # Store the predicted results in the result_dict with the file_path as the key
    result_dict[file_path]=predicted_dict
   
    # Get the final predicted tags from the result dictionary
    final_predict=result_dict[file_path]["generated_tags"]
    # Sort the predicted and ground truth data into categories using json_sorting function
    category_dict=json_sorting(final_predict,groundtruth)
    # Get the evaluation score by comparing the predicted tags with the ground truth
    evaluation_results=get_score(category_dict)
    # Prepare a dictionary with image path, ground truth, predictions, and evaluation results
    data = {
            'image_path': file_path,
            'groundtruth': groundtruth,
            'final_predictor': predicted_dict,
            'evaluation_score': evaluation_results["avg_score"],
            'sum_corrected_values': evaluation_results["sum_corrected_values"],
            'sum_predicted_values': evaluation_results["sum_predicted_values"]
            }
    # Append the data dictionary to the result_list
    result_list.append(data)
    # Pause execution for 30 seconds to avoid rate limiting
    time.sleep(30)

# data_df = spark.createDataFrame([data], schema=schema)
# data_df.write.format("delta").mode("append").saveAsTable(evaluation_table)
# ##################
    

In [0]:
# Specify the CSV file name
csv_file = "Evaluation_without_agent_final.csv"

# Get the keys from the first dictionary to use as column headers
keys = result_list[0].keys()

# Open a CSV file for writing
with open(csv_file, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=keys)

    # Write the header
    writer.writeheader()

    # Write the data rows
    writer.writerows(result_list)

print(f"Data has been written to {csv_file}.")


## Agentic Evaluation

In [0]:
llm_config_azure = [
    {
        "model": model,
        "api_key": api_key,
        "base_url": base_url,
        "api_type": api_type,
        "api_version": api_version,
        "temperature":0.0
    }
]

llm_config = {"config_list": llm_config_azure}

In [0]:
# User proxy
tip_message = "\n If you do your BEST WORK, I'll tip you $100!"

##### User Proxy Agent

In [0]:
user_proxy = autogen.UserProxyAgent(
    name="Admin",
    system_message="A human admin. Once the task is completed, answer 'TERMINATE-AGENT'",
    human_input_mode="NEVER",
    is_termination_msg=lambda msg: "TERMINATE-AGENT" in msg["content"].lower(),
    code_execution_config=False,
)

#### Predictor Agent

In [0]:
metadata_json = {"generated_tags": [
                                {
                                "products category": "string",
                                "Products": "string",
                                "Color": "string",
                                "Gender": "string",
                                "Pattern": "string",
                                "Silhouette": "string",
                                "Neckline": "string",
                                "Sleeve Length": "string",
                                "Sleeve Style": "string",
                                "Fabric": "string",
                                "Brand": "string",
                                "Occasion": "string",
                                "Fit Type": "string",
                                "Top Wear Length": "string",
                                }
                                ]
                }

predictor_system_message = f"""
You will receive an image_path as input.
1. Analyze the provided catalog image and identify all the topwear and/or bottomwear items that are in focus and fully captured within the image.
2. For each identified clothing item, extract the following metadata:
	{tags}
3. Organize the metadata for each identified clothing item in a structured JSON format, with each item represented as a dictionary containing the specified keys.
4. If the color recognition tags provided do not cover all the colors present in the image, feel free to use additional color descriptors as necessary.
5. Ensure that the gender and material composition type are correctly mapped based on the identified products.
6. Carefully match the provided tags with the appropriate metadata entities, and do not include any additional comments in the JSON output.
7. If the image contains clothing items that are not fully captured or are out of focus, do not include them in the output.
8. If the image does not contain any topwear or bottomwear items, or if the provided tags do not match the contents of the image, respond with an empty JSON array.


<thinking>
Review the provided image and tags, and extract the relevant metadata for each identified clothing item according to the instructions above. Organize the metadata in a structured JSON format, ensuring that all required fields are populated accurately.
</thinking>

Answer in the below format and DO NOT explain anything else:

Format:
 {str(metadata_json)}
"""

#------------------------------------------------------------------------------------------------------------------


predictor = MultimodalConversableAgent(
    name="predictor", system_message=predictor_system_message, llm_config=llm_config
)

##### Evaluator Agent

In [0]:
sample_response_evaluator = {
    "generated_tags_with_critic": [
        [{"tag": "Products", "value": "T-shirt", "score": 1, "critic_message": ""},
        {"tag": "Color", "value": "Blue", "score": 1, "critic_message": ""},
        {"tag": "Pattern","value": "Solid","score": 0,"critic_message": "The pattern is actually striped."}]
    ],
}

evaluator = MultimodalConversableAgent(
    name="evaluator",
    system_message=f"""You are an evaluator agent. Your task is to evaluate the accuracy of the tags generated for a given image.
    You will receive an image and generated tags as inputs.
    Analyze the image thoroughly and for each of the generated tag, give a score of 0 or 1. 0 if incorrect, 1 if it is correct.
    If the score is 0 for a given tag, also give a critic message for that particular tag.
    Finally answer the image and your generated tags with score and message. Follow the example response given below
    
    Refer to the pre-defined tags and values. Your evaluation must be based on this.
    pre-defined tags: {tags}

    If your evaluation score for all the tags are 1 and all tags are correct, then must write 'ALL-GOOD' at last.
    Sample response format: 
    {sample_response_evaluator}""",
    llm_config=llm_config,
)


##### Terminator Agent

In [0]:
terminator_system_message = f"""

You need to answer with 'max-3-tries'. Do NOT add any introductory phrase or do NOT explain anything else.

"""

tip_message = "\nIf you do your BEST WORK, I'll tip you $100!"

terminator = autogen.AssistantAgent(
    name="terminator",
    system_message=terminator_system_message + tip_message,
    human_input_mode="NEVER",
    llm_config=llm_config,
)

In [0]:
def check_name_occurrences(data, name_value, no_of_iters):
    count = sum(1 for entry in data if entry.get("name") == name_value)
    # print('count: ', count)
    return count >= no_of_iters


def state_transition(last_speaker, groupchat):
    messages = groupchat.messages
    # print('messages: ', messages)
    # print('groupchat_messages', messages)
    if last_speaker is user_proxy:
        # init -> retrieve
        return predictor
    if last_speaker is predictor:
        return evaluator

    if last_speaker is evaluator:
        if "all-good" in messages[-1]["content"].lower():
            None
        elif check_name_occurrences(messages, "evaluator", 3):
            return terminator
        else:
            return predictor

groupchat = autogen.GroupChat(
    agents=[user_proxy, predictor, evaluator, terminator],
    messages=[],
    max_round=50,
    speaker_selection_method=state_transition,
)


manager_system_message = """
"""

manager = autogen.GroupChatManager(
    groupchat=groupchat, system_message=manager_system_message, llm_config=llm_config
)

Delete cache folder
---------------------------------------------------------------

In [0]:
def delete_Cach_folder():
      
    # Define the directory name
    dir_name = ".cache"
    
    # # Get the current working directory
    current_dir = os.getcwd()
    
    # # Construct the full path to the directory
    dir_path = os.path.join(current_dir, dir_name)
    
    # # Check if the directory exists
    if os.path.exists(dir_path) and os.path.isdir(dir_path):
        # Remove the directory and its contents
        shutil.rmtree(dir_path)
        print(f"The directory '{dir_name}' has been deleted.")
    else:
        print(f"The directory '{dir_name}' does not exist.")
 

Get final dictionary 
---------------------------------------------------------------

In [0]:
def get_final_dict(result, model_type):
    try:
        predict_dict={}
        for num,pr in enumerate([history for history in result.chat_history if history['name']=="predictor"]):
            # predictor_results=pr['content'][pr['content'].find("["):pr['content'].rfind("]")+1].replace("```json","").replace("```","")
            # predict_dict[num]=ast.literal_eval(predictor_results)
            predict_dict[num]=eval(pr['content'].replace("```json","").replace("```",""))

        return str(predict_dict[max(predict_dict.keys())])
    except Exception as e:
        print(e)
        return None

In [0]:
# final_predict=eval(result_dict["Semi-Formal/Men/img2.jpg"])["generated_tags"]
# final_predict

In [0]:
# from collections import OrderedDict

# def merge_dicts_in_order(dict1, dict2):
#             # Start with dict1 to preserve its order
#             merged_dict = OrderedDict(dict1)
#             for key, value2 in dict2.items():
#                 value1 = dict1.get(key, "")
#                 if key in merged_dict:
#                     # If both values are non-empty and not the same, concatenate them
#                     if value1 and value2 and value1 != value2:
#                         merged_dict[key] = f"{value1}, {value2}"
#                     else:
#                         # Otherwise, keep the non-empty value or an empty string if both are empty
#                         merged_dict[key] = value1 or value2
#                 else:
#                     # Add keys from dict2 that are not in dict1
#                     merged_dict[key] = value2
                    
#             return merged_dict

In [0]:
print(" Total Images ::",len(final_filter_product_list))

In [0]:
result_dict={}
result_list=[]

for file_path in final_filter_product_list:
    print(file_path)
    #--------------------- Load and display the image----------------------------
    image_path="/dbfs/mnt/my_intellitag_mount/Intellitag_Testing/" + file_path

    top_wear_filter_list=top_wear_df.loc[top_wear_df['KEY']==file_path].to_dict(orient='records')
    bottom_wear_filter_list=bottom_wear_df.loc[bottom_wear_df['KEY']==file_path].to_dict(orient='records')

    final_ground_truth_list=top_wear_filter_list+bottom_wear_filter_list
     
    #-----------------------Run the Agent ---------------------------------------
    img_path_format = f"image_path:   <img {image_path}>"
    result = user_proxy.initiate_chat(
        manager, message=img_path_format, summary_method="reflection_with_llm"
    )

    #----------------------Get final dict --------------------------
    final_dict=get_final_dict(result,"gpt-4o")
    result_dict[file_path]=final_dict
    #################
    groundtruth=final_ground_truth_list
    final_predict=eval(result_dict[file_path])["generated_tags"]
    final_predict
    category_dict=json_sorting(final_predict,groundtruth)
    evaluation_results=get_score(category_dict)
    data = {
        'image_path': file_path,
        'groundtruth': final_ground_truth_list,
        'final_predictor': final_dict,
        'evaluation_score': evaluation_results["avg_score"],
        'sum_corrected_values': evaluation_results["sum_corrected_values"],
        'sum_predicted_values': evaluation_results["sum_predicted_values"]
        }
    # return embed_data
    result_list.append(data)
    # delete cache folder
    delete_Cach_folder()

In [0]:
import csv

# Specify the CSV file name
csv_file = "Evaluation_agentic_final.csv"

# Get the keys from the first dictionary to use as column headers
keys = result_list[0].keys()

# Open a CSV file for writing
with open(csv_file, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=keys)

    # Write the header
    writer.writeheader()

    # Write the data rows
    writer.writerows(result_list)

print(f"Data has been written to {csv_file}.")


In [0]:
########################################################################################################################################################