In [1]:
import json
import time
from typing import Dict, List
import torch
from PIL import Image
from modelscope import AutoModel, AutoTokenizer, BitsAndBytesConfig
import datetime
import random
import pandas as pd
import numpy as np
import os
from tqdm import tqdm

2024-07-29 19:48:02.890017: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-29 19:48:02.901222: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-29 19:48:02.916335: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-29 19:48:02.920609: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-29 19:48:02.931504: I tensorflow/core/platform/cpu_feature_guar

In [2]:



def read_test_csv(csv_file_path_: str):
    # 读取CSV文件
    return pd.read_csv(csv_file_path_)



In [3]:


from pandas import Series

data_labels = {
    "skirt_length_labels": ["Invisible", "Short Length", "Knee Length", "Midi Length", "Ankle Length", "Floor Length"],
    "coat_length_labels": ["Invisible", "High Waist Length", "Regular Length", "Long Length", "Micro Length",
                           "Knee Length", "Midi Length", "Ankle&Floor Length"],
    "collar_design_labels": ["Invisible", "Shirt Collar", "Peter Pan", "Puritan Collar", "Rib Collar"],
    "lapel_design_labels": ["Invisible", "Notched", "Collarless", "Shawl Collar", "Plus Size Shawl"],
    "neck_design_labels": ["Invisible", "Turtle Neck", "Ruffle Semi-High Collar", "Low Turtle Neck", "Draped Collar"],
    "neckline_design_labels": ["Invisible", "Strapless Neck", "Deep V Neckline", "Straight Neck", "V Neckline",
                               "Square Neckline", "Off Shoulder", "Round Neckline", "Sweat Heart Neck",
                               "One	Shoulder Neckline"],
    "pant_length_labels": ["Invisible", "Short Pant", "Mid Length", "3/4 Length", "Cropped Pant", "Full Length"],
    "sleeve_length_labels": ["Invisible", "Sleeveless", "Cup Sleeves", "Short Sleeves", "Elbow Sleeves", "3/4 Sleeves",
                             "Wrist Length", "Long Sleeves", "Extra Long Sleeves"]
}


# 根据数据 Images/collar_design_labels/ceb3b249ac875ce56558c442501bbd68.jpg,collar_design_labels,nnnny 和 data_labes 中的键值对，将数据标签转换为一个对象

class Attribute:
    def __init__(self, image_path: str, key: str, values: List[str], value: str, may_values: List[str]):
        self.key = key
        self.values = values
        self.value = value
        self.may_values = may_values
        self.image_path = f'{image_path}'
        self.conversation = {}
        self.format_key = self.key.replace("_", " ").replace("labels", "")
        self.query_content = f"""Instruction:: Recognize the {self.format_key} of Cloth Design
                        Input:: You are a Senior Cloth Designer. Now you have a Cloth Picture. Please tell me what the {self.format_key} of this Cloth is after careful identification.
                        Requires::
                        - You should fully understand what the cloth is in the picture.
                        - If you don't know what the picture shows, return "Can't Recognize {self.image_path}."
                        - Think about the answer step by step and you must choose the answer from [{','.join(self.values)}]
                    """
        self.answer = f"""The [Cloth Design]'s [{self.format_key}] is [{self.value}]""" if not self.may_values else f"""The [Cloth Design]'s [{self.format_key}] is [{self.value}] or [::MayBe::{','.join(self.may_values)}]"""

    def __str__(self):
        return json.dumps(self.__dict__, ensure_ascii=False)

    def to_json_conversation(self, ) -> Dict:
        self.conversation = {
            "messages": [
                {
                    "role": "user",
                    "content": self.query_content,
                    "image": self.image_path,
                },
                {
                    "role": "assistant",
                    "content": self.answer
                }
            ]
        }
        return self.conversation

    def to_query_template(self) -> Dict:
        original_example_messages = self.conversation["messages"]
        query = original_example_messages[0]
        query_content = query["content"]
        format_key = self.key.replace("_", " ").replace("labels", "")
        #         new_query = f"""Instruction:: Recognize the {self.format_key} of Cloth Design
        #                         Input:: You are a Senior Cloth Designer. Now you have a Cloth Picture. Please tell me what the {self.format_key} of this Cloth is after careful identification.

        #                         Requires::
        #                         - You should fully understand what the cloth is in the picture, and describe it.
        #                         - If you don't know what the picture shows, return "Can't Recognize {self.image_path}."
        #                         - Think about the answer step by step and your answer must follow these rules:
        #                             Rule-No1. Only choose the answer from [{','.join(self.values)}]

        #                     """
        # query["content"] = new_query
        query["answer"] = self.value
        # print(query)
        return query


def convert_labels_to_object(base_img_path, data_labels_, label_list_: Series) -> Attribute:
    # 分割标签字符串

    image_path_ = label_list_[0]
    # 获取属性键和属性值
    attr_key = label_list_[1]
    attr_value_index = label_list_[2]
    # attr_value = 'nmmyn', 取出'y'的下标
    position = attr_value_index.find('y')
    # 查找属性值列表
    values = data_labels_[attr_key]
    # 获取属性值列表中对应下标的值
    indices = [values[index] for index, char in enumerate(attr_value_index) if char == 'm']
    attr_value = values[position] if position < len(values) else "Not Exist"
    # 创建并返回属性对象
    return Attribute(f'{base_img_path}/{image_path_}', attr_key, values, attr_value, indices)


In [4]:



def append_to_jsonl(jsonl_file_path, data):
    # 追加数据到jsonl文件
    with open(jsonl_file_path, 'a') as f:
        f.write(json.dumps(data) + '\n')


def process_and_append(base_img_path, jsonl_file_path, df_, nums_: int) -> List[Attribute]:
    # 检查文件是否存在，以及文件的大小
    start_index = 0
    attrs_ = []
    nums_ = nums_ if nums_ else len(df_)
    if os.path.exists(jsonl_file_path):
        with open(jsonl_file_path, 'r') as f:
            for line in f:
                start_index += 1
    datas_ = df_.sample(n=nums_)
    # 从上次中断的地方开始处理
    for index, row in tqdm(datas_.iterrows(), total=nums_, desc="Processing DataFrame"):
        if index < start_index:
            continue
        attribute_ = convert_labels_to_object(base_img_path, data_labels_=data_labels, label_list_=row)
        append_to_jsonl(jsonl_file_path, attribute_.to_json_conversation())
        attrs_.append(attribute_)
    return attrs_



In [5]:


# 调用模型进行推理
device = "cuda"
MODEL_PATH = "ZhipuAI/glm-4v-9b"

tokenizer = AutoTokenizer.from_pretrained("ZhipuAI/glm-4v-9b", trust_remote_code=True)


# 模型进行推理
def generate_input_(attribute_: Attribute):
    query = attribute_.to_query_template()
    # Function to load and process images
    image_path = query["image"]
    image = Image.open(image_path).convert('RGB')
    print(f"Image=> {image_path} | Query => {query['content']} | Expect Answer => {query['answer']}")
    # Function to prepare inputs for batch processing
    input_ = tokenizer.apply_chat_template([{"role": "user", "image": image, "content": query["content"]}],
                                           add_generation_prompt=True, tokenize=True, return_tensors="pt",
                                           return_dict=True)  # chat mode
    return input_


def prepare_inputs(attributes_: List[Attribute]):
    inputs_list = [generate_input_(attribute_) for attribute_ in attributes_]
    # Combine inputs into a batch
    input_ids_list = [inputs['input_ids'][0] for inputs in inputs_list]
    padded_input_ids = torch.nn.utils.rnn.pad_sequence(input_ids_list, batch_first=True,
                                                       padding_value=tokenizer.pad_token_id)
    attention_mask_ = torch.cat([padded_input_ids], dim=0)
    return {'input_ids': padded_input_ids, 'attention_mask': attention_mask_}


def infer_result_eval_(result_, attribute_: Attribute, ):
    """
    评估模型的输出结果
    """
    return {
        "correct": attribute_.value in result_,
        "result": result_,
        "image_path": attribute_.image_path
    }


def model_run(attributes_, ):
    # input_lists = prepare_inputs(attributes_)
    # inputs = {k: v.to(device) for k, v in input_lists.items()}
    model = AutoModel.from_pretrained(
        MODEL_PATH,
        trust_remote_code=True,
        quantization_config=BitsAndBytesConfig(load_in_4bit=True),
        torch_dtype=torch.bfloat16,
        low_cpu_mem_usage=True
    ).eval()
    gen_kwargs = {"max_length": 2500, "do_sample": True, "top_k": 1}
    eval_results_ = []
    with torch.no_grad():
        for attribute_ in attributes_:
            start_time = time.time()
            input_ = generate_input_(attribute_)
            input_ = input_.to(device)
            outputs = model.generate(**input_, **gen_kwargs)
            outputs = outputs[:, input_['input_ids'].shape[1]:]
            result = tokenizer.decode(outputs[0])
            print(result)
            end_time = time.time()
            print(f"推理时间: {end_time - start_time:.4f} 秒")
            eval_obj = infer_result_eval_(result, attribute_)
            eval_results_.append(eval_obj)
            # 将 result_obj 保存到 jsonl 文件中
    return eval_results_



Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [10]:
# APP Block 代码启动模块
def jsonl_file_creator(out_put_file, csv_file_path: str, image_folder_path: str,sample_num:int):
    """
    csv_file_path: csv 文件, 用于构建对话数据
    image_folder_path: 图片文件路径
    """
    df = read_test_csv(csv_file_path)
    #  从 df 数据(pandas)中的 dataFrame 中提取数据，并转换为 Attribute 对象列表
    attributes = process_and_append(image_folder_path, out_put_file, df, sample_num)
    # queries = [q.to_query_template() for q in attributes]
    # print(queries)
    # 运行模型进行评估
    eval_objs = []
    # model_run(attributes)
    # 将评估结果, 放到 result 结果中.
    if eval_objs:
        with open(f'result-{version_}.jsonl', 'w', encoding='utf-8') as file:
            file.write(json.dumps({
                "type": "META-INFO",
                "version": version_,
                "eval_objs": attributes[0].to_query_template(),
                "totals": len(eval_objs),
                "correct": len([eval_obj for eval_obj in eval_objs if eval_obj["correct"]]),
            }, ensure_ascii=False))
            for obj in eval_objs:
                json_line = json.dumps(obj, ensure_ascii=False) + '\n'
                file.write(json_line)

In [11]:
# 生成训练数据

# 获取当前的日期和时间
now = datetime.datetime.now()
# 将日期和时间以及随机数字组合成 version_
version_ = f"{now.strftime('%Y%m%d%H%M')}"



jsonl_file_creator(f"train_{version_}.jsonl", 'r1_train/Tests/label.csv', '/mnt/workspace/data-clear/train_images', 1000)
# 生成测试数据
jsonl_file_creator(f"dev_{version_}.jsonl", 'r1_a/Tests/label.csv', '/mnt/workspace/data-clear/val_images', 1000)

  image_path_ = label_list_[0]
  attr_key = label_list_[1]
  attr_value_index = label_list_[2]
Processing DataFrame: 100%|██████████| 1000/1000 [00:00<00:00, 8025.13it/s]
  image_path_ = label_list_[0]
  attr_key = label_list_[1]
  attr_value_index = label_list_[2]
Processing DataFrame: 100%|██████████| 1000/1000 [00:00<00:00, 8386.33it/s]
