In [None]:
import sys
import json
import re
import pandas as pd
import numpy as np
from tqdm import tqdm
from matrix import *
import os
from openai import OpenAI
import concurrent.futures

In [None]:
class Data_Processor:
    def __init__(self, meta_data_path, review_data_path, test_data_path, api_key, base_url):
        self.meta_data_path = meta_data_path
        self.review_data_path = review_data_path
        self.test_data_path = test_data_path
        self.api_key = api_key
        self.base_url = base_url
        self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)

    def select_MAIN_image(self, images):
        """
        Pick the URL of the representative image of an item.

        Returns the ``hi_res`` value of the first record whose ``variant`` is ``'MAIN'``;
        when no record is tagged ``'MAIN'`` the ``hi_res`` of the first record is used.

        :param images: indexable sequence of image records (mappings with 'variant' and
                       'hi_res' keys); a missing value that stringifies to 'nan' is accepted
        :return: the selected ``hi_res`` value, or None when ``images`` is missing, empty
                 or malformed
        """
        # A missing cell (float NaN) stringifies to 'nan'.
        if str(images) == 'nan':
            return None
        try:
            for image in images:
                if image['variant'] == 'MAIN':
                    return image['hi_res']
            # No MAIN variant: fall back to the first image.
            return images[0]['hi_res']
        except Exception:
            # Malformed or empty image data is treated as "no image". Unlike a bare
            # ``except:``, this lets KeyboardInterrupt / SystemExit propagate.
            return None

    def get_Images_from_all_items(self):
        # 从meta_data中获取所有item的images
        meta_data = pd.read_parquet(self.meta_data_path)
        all_images = []
        for i in range(len(meta_data)):
            item = meta_data.iloc[i]
            all_images.append(self.select_MAIN_image(item['images']))
        return all_images

    def convert_images_to_text(self, system_prompt, user_prompt, image_url, model="qwen-vl-plus-latest"):
        if image_url is None:
            return 'None'
        completion = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "system", "content": system_prompt},
                      {"role": "user", "content": [{"type": "text", "text": user_prompt},
                                                   {"type": "image_url", "image_url": {"url": image_url}}]}
                      ])
        return completion.choices[0].message.content

    def save_all_images_to_text(self, system_prompt, user_prompt, model="qwen-vl-plus-latest", save_path='new_data'):
        all_image_url = self.get_Images_from_all_items()
        all_image_text = []
        for url in all_image_url:
            print(url)
            try:
                answer = self.convert_images_to_text(system_prompt, user_prompt, url, model)
                print(answer)
                all_image_text.append(answer)
            except Exception as e:
                print(e)
                all_image_text.append('None')
        # save to file parquet
        df = pd.DataFrame(all_image_text, columns=['image_text'])
        df.to_parquet(save_path + '/image_text.parquet')

    def check_and_save_images_to_text(self, system_prompt, user_prompt, model="qwen-vl-plus-latest", save_path='new_data'):
        if os.path.exists(save_path + '/image_text.parquet'):
            image_text = pd.read_parquet(save_path + '/image_text.parquet')
            all_image_url = self.get_Images_from_all_items()
            for i in range(len(image_text)):
                if image_text.iloc[i]['image_text'] == 'None' and all_image_url[i] is not None:
                    try:
                        print(all_image_url[i])
                        answer = self.convert_images_to_text(system_prompt, user_prompt, all_image_url[i], model)
                        print(answer)
                        image_text.loc[i, 'image_text'] = answer
                    except Exception as e:
                        print(e)
                        image_text.loc[i, 'image_text'] = 'None'
            # save to file parquet
            image_text.to_parquet(save_path + '/image_text.parquet')
        else:
            self.save_all_images_to_text(system_prompt, user_prompt, model, save_path)

    def delete_None_in_details(self, details):
        """
        Strip None entries from an item's details, in place.

        Top-level keys whose value is None are removed; for values that are plain dicts
        the None-valued keys one level down are removed (the dict itself is kept, even
        when it ends up empty).

        :param details: the details of the item, dict (modified in place)
        :return: the same ``details`` object, without the None entries
        """
        for outer_key, outer_value in list(details.items()):
            if type(outer_value) == dict:
                # Collect first, delete afterwards, so the dict is not resized while iterated.
                empty_inner_keys = [k for k, v in outer_value.items() if v is None]
                for inner_key in empty_inner_keys:
                    del outer_value[inner_key]
            elif outer_value is None:
                del details[outer_key]
        return details

    def generate_item_prompt_efficient(self, item_id, item_df):
        """
        Render the textual description of one item from an already-loaded meta-data frame.

        The first row of ``item_df`` whose ``parent_asin`` equals ``item_id`` supplies the
        title, average rating, rating count, price and store. Categories and details are
        deliberately not part of the prompt.

        :param item_id: value looked up in the ``parent_asin`` column
        :param item_df: item meta-data DataFrame (no file is read here)
        :return: str, the item prompt (five lines, no trailing newline)
        """
        matching_rows = item_df.loc[item_df['parent_asin'] == item_id]
        item = matching_rows.iloc[0]
        prompt_lines = [
            f"Item ID: {item_id}",
            f"Title: {item['title']}",
            f"Average Rating: {item['average_rating']} ({item['rating_number']} ratings)",
            f"Price: {item['price']}",
            f"Store: {item['store']}",
        ]
        return "\n".join(prompt_lines)

    def generate_candidate_data(self, user_id, test_data, num_candidates, len_meta_data=711, if_shuffle=False):
        """
        Build the candidate list of one user: the held-out item plus random negatives.

        ``num_candidates - 1`` distinct item indices are drawn at random such that none of
        them is in the user's history, equals the held-out item, or repeats. The held-out
        item (``parent_asin`` of the user's row) is placed first unless shuffling is requested.

        :param user_id: value matched against the ``user_id`` column of ``test_data``
        :param test_data: DataFrame with ``user_id``, ``history`` and ``parent_asin`` columns
        :param num_candidates: total length of the returned list
        :param len_meta_data: number of items; negatives are drawn from ``[0, len_meta_data - 1)``
        :param if_shuffle: shuffle the final list in place when True
        :return: list of item indices of length ``num_candidates``
        :raises ValueError: when fewer eligible negatives exist than are requested
        """
        # Filter the user's row once instead of once per column.
        user_rows = test_data[test_data['user_id'] == user_id]
        history = (user_rows['history'].values[0]).tolist()
        last_interacted = int(user_rows['parent_asin'].values[0])

        # NOTE(review): numpy's upper bound is exclusive, so index len_meta_data - 1 can
        # never be drawn as a negative - confirm whether that is intended.
        eligible = sum(
            1 for idx in range(max(len_meta_data - 1, 0))
            if idx not in history and idx != last_interacted
        )
        if eligible < num_candidates - 1:
            # Without this check the rejection-sampling loop below would never terminate.
            raise ValueError(
                f"Cannot draw {num_candidates - 1} negative items: only {eligible} eligible items are available"
            )

        candidate_data = []
        for _ in range(num_candidates - 1):
            # Rejection sampling: redraw until the index is new and not excluded.
            index = np.random.randint(0, len_meta_data - 1)
            while index in history or index in candidate_data or index == last_interacted:
                index = np.random.randint(0, len_meta_data - 1)
            candidate_data.append(index)
        # The held-out item goes first.
        candidate_data = [last_interacted] + candidate_data
        if if_shuffle:
            np.random.shuffle(candidate_data)
        return candidate_data

    def save_candidate_to_test_data(self, len_meta_data=711, is_shuffle=False, name='candidate'):
        # 在test_data中加入candidate一列，每个用户的candidate是一个list，包含num_candidates个item的index
        test_data = pd.read_parquet(self.test_data_path)
        candidate_data = []
        for i in range(len(test_data)):
            user_id = test_data.iloc[i]['user_id']
            candidate = self.generate_candidate_data(user_id, test_data, 10, len_meta_data, is_shuffle)
            print('user_id:', user_id)
            print('candidate:', candidate)
            candidate_data.append(candidate)
        test_data[name] = candidate_data
        test_data.to_parquet(self.test_data_path)

    def re_index_test_data(self):
        test_data = pd.read_parquet(self.test_data_path)
        test_data = test_data.reset_index(drop=True)
        test_data.to_parquet(self.test_data_path)

    def re_index_meta_data(self):
        meta_data = pd.read_parquet(self.meta_data_path)
        meta_data = meta_data.reset_index(drop=True)
        meta_data.to_parquet(self.meta_data_path)

    def get_history_prompt_for_one_user(self, user_id, meta_data, review_data, test_data, images_data, max_history=15):
        # 为一个用户生成用户偏好总结
        history = (test_data[test_data['user_id'] == user_id]['history'].values[0]).tolist()
        if len(history) > max_history:
            history = history[-max_history:]
        user_preference_prompt = ''
        for i in range(len(history)):
            item_id = history[i]
            item_index = meta_data[meta_data['parent_asin'] == item_id].index[0]
            item_info = self.generate_item_prompt_efficient(item_id, meta_data)
            photos_text = images_data.iloc[item_index]['image_text']
            user_review = review_data[(review_data['user_id'] == user_id) & (review_data['parent_asin'] == item_id)].iloc[0]
            user_preference_prompt += item_info + '\n' \
                f'Description: {photos_text}\n' \
                f'User Review: {user_review["text"]}\n'\
                f'User Rating: {user_review["rating"]}\n\n'
        return user_preference_prompt

    def summariza_preference_for_one_user(self, user_id, meta_data, review_data, test_data, images_data, system_prompt, model="qwen2.5-7b-instruct-1m"):
        user_preference_prompt = self.get_history_prompt_for_one_user(user_id, meta_data, review_data, test_data, images_data)
        user_prompt = f'Please summarize the user preference with the following information:\n{user_preference_prompt}'
        result = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "system", "content": system_prompt},
                      {"role": "user", "content": user_prompt},
                      ])
        return result.choices[0].message.content

    def summarize_user_preference(self, system_prompt, model="qwen2.5-7b-instruct-1m", save_path='new_data'):
        meta_data = pd.read_parquet(self.meta_data_path)
        review_data = pd.read_parquet(self.review_data_path)
        test_data = pd.read_parquet(self.test_data_path)
        images_data = pd.read_parquet(save_path + '/image_text.parquet')
        # 为每个用户生成用户偏好总结, 并保存到文件, 保存到test_data中 user_preference列
        user_preference = []
        for i in range(len(test_data)):
            user_id = test_data.iloc[i]['user_id']
            print('user_id:', user_id)
            try:
                result = self.summariza_preference_for_one_user(user_id, meta_data, review_data, test_data, images_data, system_prompt, model)
                print(result)
                user_preference.append(result)
            except Exception as e:
                print(e)
                user_preference.append('None')
        test_data['user_preference'] = user_preference
        test_data.to_parquet(self.test_data_path)

    def get_candidate_prompt_for_one_user(self, user_id, meta_data, test_data, images_data, candidate_name = "candidate", reason_data=None):
        # 为一个用户生成候选item总结
        candidate = (test_data[test_data['user_id'] == user_id][candidate_name].values[0]).tolist()
        target = (test_data[test_data['user_id'] == user_id]['parent_asin'].values[0])
        target_score = (test_data[test_data['user_id'] == user_id]['rating'].values[0])
        candidate_prompt = ''
        for i in range(len(candidate)):
            item_id = candidate[i]
            item_index = meta_data[meta_data['parent_asin'] == item_id].index[0]
            item_info = self.generate_item_prompt_efficient(item_id, meta_data)
            photos_text = images_data.iloc[item_index]['image_text']
            candidate_prompt += item_info + '\n' \
                f'Photos Text: {photos_text}\n\n'
        return candidate_prompt, len(candidate), target, target_score

    def get_list_from_result(self, result):
        """
        Extract the ranked item-ID list from a model answer.

        The answer is expected to contain a list such as ``[1, 2, 3, 4, 5]``. The content
        of the first ``[...]`` group is split on commas; when the answer has no brackets
        at all, every run of digits in the answer is used instead. Tokens that are not
        integers (empty tokens from ``[]`` or a trailing comma, words, ...) are skipped
        instead of raising, and surrounding quotes are tolerated.

        :param result: the raw answer text
        :return: list of int (empty when nothing usable is found or ``result`` is not a str)
        """
        if not isinstance(result, str):
            # Consistent with get_list_from_result_cot: a non-text answer yields no ranking.
            return []
        compact = result.replace('\n', '').replace(' ', '')
        bracketed = re.search(r'\[(.*?)\]', compact)
        if bracketed is None:
            # No list syntax: fall back to every number mentioned in the answer.
            tokens = re.findall(r'\d+', compact)
        else:
            tokens = bracketed.group(1).split(',')
        ranking = []
        for token in tokens:
            try:
                ranking.append(int(token.strip('\'"')))
            except ValueError:
                # One malformed token must not abort a whole evaluation run.
                continue
        return ranking

    def get_list_from_result_cot(self, result, min_len=8, max_len=12):
        """
        Extract the final ranked list from a model answer that contains chain-of-thought text.

        Only bracketed lists of at least three comma-separated integers are considered,
        e.g. ``[1,2,3,4,5,6,7,8,9,10]``. The LAST such list in the answer is taken, since
        the final answer normally follows the reasoning. It is accepted only when its
        length lies within ``[min_len, max_len]``.

        Args:
            result: the raw answer string returned by the model
            min_len: smallest accepted list length (default 8, i.e. "about 10" items)
            max_len: largest accepted list length (default 12)

        Returns:
            list of int, or an empty list when ``result`` is not a string, contains no
            matching list, or the last list has an unacceptable length
        """
        if result is None or not isinstance(result, str):
            return []

        # Normalise whitespace and drop markdown emphasis markers.
        cleaned = result.replace('\n', ' ').replace('\t', ' ').replace('*', '')

        # Lists with at least three numbers; the single group captures the list body.
        bodies = re.findall(r'\[(\d+(?:\s*,\s*\d+){2,})\]', cleaned)
        if not bodies:
            return []

        try:
            numbers = [int(num.strip()) for num in bodies[-1].split(',')]
        except ValueError:
            # Only conversion errors are expected here; anything else should surface.
            return []

        if min_len <= len(numbers) <= max_len:
            return numbers
        return []

    def convert_item_id_to_item_score(self, rank_result, target, target_score):
        """
        Turn a ranked ID list into a relevance list for the target item.

        Every position holding ``target`` receives ``target_score``; all others receive 0.

        :param rank_result: list of ranked candidate item IDs
        :param target: the user's target item ID
        :param target_score: the user's rating of the target item
        :return: the relevance list, or the string 'Error' when its sum is 0 (the target
                 is absent, the list is empty, or ``target_score`` itself is 0)
        """
        relevance = []
        for item_id in rank_result:
            relevance.append(target_score if item_id == target else 0)
        return 'Error' if sum(relevance) == 0 else relevance

    def generate_sft_dataset(self, system_prompt, save_path='new_data'):
        # 生成sft_dataset
        meta_data = pd.read_parquet(self.meta_data_path)
        test_data = pd.read_parquet(self.test_data_path)
        images_data = pd.read_parquet(save_path + '/image_text.parquet')
        sft_dataset = []
        for i in range(len(test_data)):
            user_id = test_data.iloc[i]['user_id']
            print(i, ' User', user_id)
            candidate = (test_data.iloc[i]['candidate'].values[0]).tolist()
            target = test_data.iloc[i]['parent_asin'].values[0]
            # 打乱顺序
            np.random.shuffle(candidate)
            candidate_prompt = ''
            for j in range(len(candidate)):  # 修改i为j以避免与外层循环变量冲突
                item_id = candidate[j]
                item = meta_data[meta_data['parent_asin'] == item_id].iloc[0]
                candidate_prompt += f"Item ID: {item_id}\n" \
                    f"Title: {item['title']}\n" \
                    f"Average Rating: {item['average_rating']} ({item['rating_number']} ratings)\n" \
                    f"Price: {item['price']}\n" \
                    f"Store: {item['store']}\n" \
                    f"Description: {images_data.iloc[meta_data[meta_data['parent_asin'] == item_id].index[0]]['image_text']}\n\n"
            user_preference_prompt = test_data[test_data['user_id'] == user_id]['user_preference'].values[0]
            user_prompt = f"Please rank the following {len(candidate)} items based on user's preference and give back your answer with a form of list of these candidate items'ID :\n" \
                f"User Preference: {user_preference_prompt} \n" \
                f"Candidate Items: \n{candidate_prompt}"
            # 把候选item中的target item放在第一位
            candidate.remove(target)
            # 打乱顺序
            np.random.shuffle(candidate)
            candidate = [target] + candidate
            sft = {"instruction": system_prompt,
                   "input": user_prompt,
                   "output": candidate}  # candidate已经被转换为列表
            sft_dataset.append(sft)
        # save to file
        with open(save_path + '/sft_dataset.json', 'w') as f:
            json.dump(sft_dataset, f)


    def get_rank_result_for_one_user(self, user_id, meta_data, test_data, images_data, system_prompt, candidate_name="candidate", model="qwen2.5-7b-instruct-1m"):
        # ------ My baseline ------
        candidate_prompt, candidate_num, target, target_score = self.get_candidate_prompt_for_one_user(user_id, meta_data, test_data, images_data, candidate_name, reason_data=None)
        user_preference_prompt = test_data[test_data['user_id'] == user_id]['user_preference'].values[0]
        user_prompt = f"Please rank the following {candidate_num} items based on user's preference and give back your answer with a form of list of these candidate items'ID :\n \
            User Preference: {user_preference_prompt} \n \
            Candidate Items: \n{candidate_prompt}"
        # --------------------------
        # print(user_prompt)
        result = self.client.chat.completions.create(
            model=model,
            # temperature=0.2,
            messages=[{"role": "system", "content": system_prompt},
                      {"role": "user", "content": user_prompt},
                      ])
        return result.choices[0].message.content, target, target_score

    def get_sft_data_for_all_user(self, meta_data, test_data, images_data, system_prompt, rank_result_data, save_path='new_data'):
        sft_data = []
        for i in range(len(rank_result_data)):
            user_id = rank_result_data.iloc[i]['user_id']
            rank = rank_result_data.iloc[i]['rank_result']
            target = rank_result_data.iloc[i]['target']
            target_score = rank_result_data.iloc[i]['target_score']
            candidate = test_data[test_data['user_id'] == user_id]['candidate'].values[0]
            print('user_id:', user_id)
            candidate_prompt, candidate_num, target, target_score = self.get_candidate_prompt_for_one_user(user_id, meta_data, test_data, images_data, candidate_name="candidate", reason_data=None)
            user_preference_prompt = test_data[test_data['user_id'] == user_id]['user_preference'].values[0]
            user_prompt = f"Please rank the following {candidate_num} items based on user's preference and give back your answer with a form of list of these candidate items'ID :\n \
                User Preference: {user_preference_prompt} \n \
                Candidate Items: \n{candidate_prompt}"
            try:
                rank_list = self.get_list_from_result_cot(rank)
                rank_score = self.convert_item_id_to_item_score(rank_list, target, target_score)
            except Exception as e:
                print(e)
                continue
            if rank_score == 'Error':
                print('Error')
                continue
            if sorted(set(rank_list)) != sorted(set(candidate)):
                print('Mismatch')
                continue
            sft = {"instruction": system_prompt,
                   "input": user_prompt,
                   "output": rank}
            sft_data.append(sft)

        # save to file
        with open(save_path + '/sft_data_with_cot.json', 'w') as f:
            json.dump(sft_data, f)

    def save_rank_results_to_json(self, rank_result_path, system_prompt, save_path='new_data/train', output_file='rank_results_collection.json', candidate_name = "candidate"):
        """
        从rank_result文件中提取信息并保存到结构化JSON文件中

        Args:
            rank_result_path: 排序结果数据文件路径
            system_prompt: 系统提示词
            save_path: 保存路径
            output_file: 输出文件名

        Returns:
            包含所有用户结果的字典
        """
        output_filepath = os.path.join(save_path, output_file)

        # 加载排序结果数据
        rank_result_data = pd.read_parquet(rank_result_path)
        print(f"加载了 {len(rank_result_data)} 条数据来自 {rank_result_path}")

        # 加载其他必要数据
        meta_data = pd.read_parquet(self.meta_data_path)
        test_data = pd.read_parquet(self.test_data_path)
        images_data = pd.read_parquet(save_path + '/image_text.parquet')

        # 检查是否存在现有文件，如果存在则加载
        if os.path.exists(output_filepath):
            with open(output_filepath, 'r', encoding='utf-8') as f:
                try:
                    results_collection = json.load(f)
                    print(f"加载了现有的结果集合，包含 {len(results_collection)} 个用户")
                    # print(results_collection["50"])
                except json.JSONDecodeError:
                    print(f"现有文件格式错误，创建新的结果集合")
                    results_collection = {}
        else:
            results_collection = {}
            print(f"创建新的结果集合")

        # 确保保存路径存在
        os.makedirs(save_path, exist_ok=True)

        for i, row in tqdm(rank_result_data.iterrows(), total=len(rank_result_data), desc="处理用户"):
            user_id = str(row['user_id'])
            if results_collection.get(user_id) is None:
                rank = row['rank_result']
                target = row['target']
                target_score = row['target_score']
                candidate = test_data[test_data['user_id'] == int(user_id)][candidate_name].values[0]
                candidate_prompt, candidate_num, target, target_score = self.get_candidate_prompt_for_one_user(int(user_id), meta_data, test_data, images_data)
                user_preference_prompt = test_data[test_data['user_id'] == int(user_id)]['user_preference'].values[0]
                user_prompt = f"Please rank the following {candidate_num} items based on user's preference and give back your answer with a form of list of these candidate items'ID :\n \
                    User Preference: {user_preference_prompt} \n \
                    Candidate Items: \n{candidate_prompt}"

                try:
                    rank_list = self.get_list_from_result_cot(rank)
                    rank_score = self.convert_item_id_to_item_score(rank_list, target, target_score)
                except Exception as e:
                    print(e)

                if rank_score == 'Error':
                    rank_answer = 'Error'
                elif sorted(rank_list) != sorted(candidate):
                    rank_answer = 'Mismatch'
                else:
                    rank_answer = ndcg_at_k(rank_score, 10)
                results_collection[user_id] = {"prompt": system_prompt + "\n" + user_prompt, "target": str(target), "rank1": {"answer": rank, "score": rank_answer}}

            else:
                rank = row['rank_result']
                target = row['target']
                target_score = row['target_score']
                candidate = test_data[test_data['user_id'] == int(user_id)][candidate_name].values[0]

                try:
                    rank_list = self.get_list_from_result_cot(rank)
                    rank_score = self.convert_item_id_to_item_score(rank_list, target, target_score)
                except Exception as e:
                    print(e)

                if rank_score == 'Error':
                    rank_answer = 'Error'
                elif sorted(set(rank_list)) != sorted(set(candidate)):
                    rank_answer = 'Mismatch'
                else:
                    rank_answer = ndcg_at_k(rank_score, 10)
                # n 是当前的rank数
                n = len(results_collection[user_id])-1
                # print(n)
                results_collection[user_id][f"rank{n}"] = {"answer": rank, "score": rank_answer}

        # 保存到文件
        with open(output_filepath, 'w', encoding='utf-8') as f:
            json.dump(results_collection, f)

        print(f"保存了 {len(results_collection)} 个用户的排序结果到 {output_filepath}")

    def get_dpo_data_from_collection(self, rank_results_collection_path, save_path='new_data/train'):
        """
        从rank_results_collection.json生成DPO训练数据
        将有数字分数的结果作为chosen，没有数字分数的结果作为rejected
        
        Args:
            rank_results_collection_path: 排序结果集合的文件路径
            save_path: DPO数据保存路径
        
        Returns:
            生成的DPO数据列表
        """
        # 确保保存路径存在
        os.makedirs(save_path, exist_ok=True)
        
        # 加载排序结果集合
        try:
            with open(rank_results_collection_path, 'r', encoding='utf-8') as f:
                results_collection = json.load(f)
            print(f"加载了 {len(results_collection)} 个用户的排序结果")
        except Exception as e:
            print(f"加载排序结果集合失败: {e}")
            return []
        
        # 创建DPO数据列表
        dpo_data = []
        skipped_users = 0
        scored_vs_scored = 0
        scored_vs_error = 0
        total_users = len(results_collection)
        
        # 遍历所有用户
        for user_id, user_data in tqdm(results_collection.items(), desc="生成DPO数据"):
            # 获取用户的输入提示
            user_prompt = user_data.get("prompt", "")
            target = user_data.get("target", "")
            
            # 收集所有rank的结果，分为有分数和无分数两组
            ranks_with_score = []    # 有数字分数的结果
            ranks_without_score = [] # 分数为Error或Mismatch的结果
            
            # 遍历所有rank键
            for key, value in user_data.items():
                if key.startswith("rank") and isinstance(value, dict):
                    score = value.get("score")
                    answer = value.get("answer", "")
                    
                    # 检查score是否为数值
                    if isinstance(score, (int, float)) and not isinstance(score, bool):
                        ranks_with_score.append({
                            "key": key,
                            "answer": answer,
                            "score": score
                        })
                    else:
                        # 如果分数不是数值（如'Error'或'Mismatch'）
                        ranks_without_score.append({
                            "key": key,
                            "answer": answer,
                            "score": score
                        })
            
            # 如果没有有分数的结果，则跳过该用户
            if not ranks_with_score:
                skipped_users += 1
                continue
            
            # 按分数对有分数的结果排序（从高到低）
            sorted_ranks = sorted(ranks_with_score, key=lambda x: x["score"], reverse=True)
            
            # 获取得分最高的作为chosen
            chosen_result = sorted_ranks[0]["answer"]
            
            # 生成DPO样本：每个有分数但非最高分的结果作为一个rejected样本
            for i in range(1, len(sorted_ranks)):
                rejected_result = sorted_ranks[i]["answer"]
                
                dpo_item = {
                    "input": user_prompt,
                    "chosen": chosen_result,
                    "rejected": rejected_result,
                    "target": target,
                }
                scored_vs_scored += 1
                dpo_data.append(dpo_item)
            
            # 所有没有数字分数的结果也作为rejected样本
            for item in ranks_without_score:
                rejected_result = item["answer"]
                
                dpo_item = {
                    "input": user_prompt,
                    "chosen": chosen_result,
                    "rejected": rejected_result,
                    "target": target,
                }
                scored_vs_error += 1
                dpo_data.append(dpo_item)
        
        # 保存最终结果
        if dpo_data:
            with open(f"{save_path}/dpo_data_with_cot.json", 'w', encoding='utf-8') as f:
                json.dump(dpo_data, f, ensure_ascii=False)
        
        print(f"DPO数据生成完成，共 {len(dpo_data)} 对样本，跳过了 {skipped_users}/{total_users} 个用户")
        
        # 输出不同类型样本的统计
        print(f"有分数 vs 有分数的样本: {scored_vs_scored} 个")
        print(f"有分数 vs 错误/不匹配的样本: {scored_vs_error} 个")

    def save_rank_result_for_all_users(self, meta_data, test_data, images_data, reason_data, system_prompt, model="qwen2.5-7b-instruct-1m", save_path='new_data', candidate_name="candidate"):
        # 为所有用户生成排序结果
        rank_results = []
        for i in range(len(test_data)):
            user_id = test_data.iloc[i]['user_id']
            print('user_id:', user_id)
            try:
                result, target, target_score = self.get_rank_result_for_one_user(user_id, meta_data, test_data, images_data, reason_data, system_prompt, candidate_name, model)
                print(result)
                rank_results.append([user_id, result, target, target_score])
            except Exception as e:
                print(e)
                rank_results.append('None')
        # 保存到文件
        df = pd.DataFrame(rank_results, columns=['user_id', 'rank_result', 'target', 'target_score'])
        df.to_parquet(save_path + '/rank_result_image.parquet')

    def save_rank_result_for_all_users_efficient(self, meta_data, test_data, images_data, system_prompt, model="qwen2.5-7b-instruct-1m", save_path='new_data', max_workers=10, candidate_name="candidate"):
        """
        使用并发处理为所有用户生成排序结果

        Args:
            meta_data: 物品元数据
            test_data: 测试数据
            images_data: 图像文本数据
            reason_data: 物品选择原因数据
            system_prompt: 系统提示词
            model: 使用的模型，默认为"qwen2.5-7b-instruct-1m"
            save_path: 保存结果的路径，默认为'new_data'
            max_workers: 最大工作线程数，默认为10
        """
        # 创建一个临时函数来处理单个用户
        def process_one_user(idx):
            user_id = test_data.iloc[idx]['user_id']
            try:
                result, target, target_score = self.get_rank_result_for_one_user(user_id, meta_data, test_data, images_data, system_prompt, candidate_name, model)
                # print(f'用户 {user_id} 处理完成')
                return [user_id, result, target, target_score]
            except Exception as e:
                print(f'用户 {user_id} 处理出错: {e}')
                return [user_id, 'None', -1, -1]

        # 使用线程池并发处理
        rank_results = []
        total_users = len(test_data)

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 提交所有任务
            future_to_idx = {executor.submit(process_one_user, idx): idx for idx in range(total_users)}

            # 使用tqdm显示进度条
            for future in tqdm(concurrent.futures.as_completed(future_to_idx), total=total_users, desc="处理用户"):
                result = future.result()
                rank_results.append(result)

        # 保存到文件
        df = pd.DataFrame(rank_results, columns=['user_id', 'rank_result', 'target', 'target_score'])
        df.to_parquet(save_path + '/rank_result.parquet')
        return df

    def calculate_ndcg(self, rank_result_path):
        rank_result_data = pd.read_parquet(rank_result_path)
        ndcg_1 = 0
        ndcg_5 = 0
        ndcg_10 = 0
        for i in range(len(rank_result_data)):
            rank = rank_result_data.iloc[i]['rank_result']
            target = rank_result_data.iloc[i]['target']
            target_score = rank_result_data.iloc[i]['target_score']
            rank = self.get_list_from_result(rank)
            rank_score = self.convert_item_id_to_item_score(rank, target, target_score)
            if rank_score == 'Error':
                print('Error')
                continue
            ndcg_1 += ndcg_at_k(rank_score, 1)
            ndcg_5 += ndcg_at_k(rank_score, 5)
            ndcg_10 += ndcg_at_k(rank_score, 10)
        print('ndcg@1:', ndcg_1/len(rank_result_data))
        print('ndcg@5:', ndcg_5/len(rank_result_data))
        print('ndcg@10:', ndcg_10/len(rank_result_data))

    def convert_img_description_to_label(self, description, system_prompt, model = "qwen-max"):
        # 将图像描述转换为标签
        result = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "system", "content": system_prompt},
                      {"role": "user", "content": description},
                      ])
        return result.choices[0].message.content
    
    def convert_img_description_to_label_for_all(self, img_path, system_prompt, model = "qwen-max", save_path='new_data'):
        # 把原本的图像描述转换为标签，保存到文件中
        img_description = pd.read_parquet(img_path)
        result = []
        for i in tqdm(range(len(img_description)), desc="处理图像描述"):
            description = img_description.iloc[i]['image_text']
            if description == 'None':
                result.append('None')
                continue
            try:
                label = self.convert_img_description_to_label(description, system_prompt, model)
                # print(label)
                result.append(label)
            except Exception as e:
                print(e)
                result.append('None')
        # 保存到文件
        img_description['image_label'] = result
        img_description.to_parquet(save_path + '/image_label.parquet')
        
        
        

In [None]:
# Dataset selection. Exactly one meta/review/test triple should be active; the
# commented-out triples below are the alternative datasets (video_data, new_data,
# cd_data) and their train/test splits.
# meta_data_path='video_data/train/meta_data.parquet'
# review_data_path='video_data/train/review_data.parquet'
# test_data_path='video_data/train/test_data.parquet'
# meta_data_path='video_data/test/meta_data.parquet'
# review_data_path='video_data/test/review_data.parquet'
# test_data_path='video_data/test/test_data.parquet'

# meta_data_path='new_data/train/meta_data.parquet'
# review_data_path='new_data/train/review_data.parquet'
# test_data_path='new_data/train/test_data.parquet'
# meta_data_path='new_data/test/meta_data.parquet'
# review_data_path='new_data/test/review_data.parquet'
# test_data_path='new_data/test/test_data.parquet'

# Active selection: the training split of cd_data.
meta_data_path='cd_data/train/meta_data.parquet'
review_data_path='cd_data/train/review_data.parquet'
test_data_path='cd_data/train/test_data.parquet'
# meta_data_path='cd_data/test/meta_data.parquet'
# review_data_path='cd_data/test/review_data.parquet'
# test_data_path='cd_data/test/test_data.parquet'

# Endpoint credentials are left blank here and must be filled in before any method
# that calls the API is used. NOTE(review): prefer loading them from the environment
# rather than typing a real key into the notebook.
api_key = ""
base_url = ""
# Constructing the processor also builds the OpenAI client from api_key / base_url.
data_processor = Data_Processor(meta_data_path, review_data_path, test_data_path, api_key, base_url)



In [None]:
# Prompts for the image-description step; a later cell passes both to
# save_all_images_to_text and check_and_save_images_to_text.
# NOTE(review): "img_2_test" is presumably a typo for "img_2_text" - kept because other cells use this name.
img_2_test_system_prompt = "Now you are a goods decoration expert. You can describe the following goods image in detail but would not spend more than 100 words. Please describe the following goods image."
img_2_test_user_prompt = "Please describe the following goods image in a short but detailed way."

# System prompt passed to summarize_user_preference in a later cell.
sum_user_preference_system_prompt = "Now you are a user preference expert. You can summarize the user preference based on the user's history which includes the information of the items and the reviews the user given to the items. Please summarize the user preference based on the user's history and provide a brief summary of it, for example, you can summarize the user's preference, the user's preference is mainly focused on the price, quality, or other aspects, make it concise and clear which should not exceed 80 words."

# Ranking prompt that asks for the ranked ID list only (no reasoning).
# It is not referenced by the cells visible in this part of the notebook.
rank_candidate_system_prompt = "Now you are a goods ranker. You can rank the following goods based on user's preference and give back your answer with a form of list of these candidate items'ID. Please rank the following goods based on user's preference and give back your answer with a form of list of these candidate items'ID. For example, if candidate item ids are 1, 2, 3, 4, 5, 6, 7, 8, 9 and 10(number is the item's ID), you must rank them in a form of list such as [3, 6, 1, 4, 10, 2, 7, 8, 9, 5](the order is random, which is just an example). Also, make sure the list contains all the candidate item ids and you have not missed or changed any item id. Please make sure the list is correct and complete based on user's preference and don't need to explain the reason for the ranking."
# Chain-of-thought ranking prompt: asks for three labelled steps (step1..step3) with the
# ranked ID list in step3. This is the variant the ranking, SFT and DPO cells below pass on.
# NOTE(review): the evaluation cell parses model output with get_list_from_result_cot, which
# presumably relies on this step layout - confirm before editing the wording.
rank_candidate_system_prompt_with_cot = """
Now you are a goods ranker. You can rank the following goods based on user's preference and give back your answer with a form of list of these candidate items'ID and some reasonable thinking steps.
You have to give back your thinking steps. However, your thinking steps should be refining, short and complete.
You can do it step by step, for example, you can provide the following thinking steps:
step1 : From the candidate items, find out all the item ids which are to be ranked.
step2 : Think about the user's preference, and analyze the items based on it.
step3 : Rank the items based on the user's preference. Give back your answer with the item's ID in a form of list. Remember, all the candidate item ids you find in step1 should be included in the ranked items list, and you have not missed or changed any of them.

Example output:
step1 : all the item ids are [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

step2 :
From the user's preference, we can analyze the items based on the following criteria:
1. **Affordability and Practicality:** Items with lower prices and high ratings are prioritized. Items 2 and 5 are cheap and functional.
2. **Cost-Effectiveness:** Items 1 and 6 offer good value for money.
3. **Eco-Friendly Options:** Item 1 is noted for its eco-friendly features.
4. **Cute Designs:** Item 7 has a cute design.
5. **High Ratings:** Items 10 and 7 have high ratings.
6. **Reliability:** Items 6 and 4 have consistent reviews indicating reliability.
7. **Avoid Unreliable Products:** Items like 8 which had disappointing reviews were ranked lower.
(You can provide more detailed reasons for the ranking based on the information provided in the prompt. The above steps are just an example and may not be applicable to the current prompt.)

step3 : the ranked items are [2, 5, 1, 6, 4, 9, 7, 10, 8, 3]
"""

In [None]:
# Image-to-text step: both calls receive the same prompts, vision model and output folder.
# NOTE(review): check_and_save_images_to_text presumably re-checks/fills in what the
# first pass produced - confirm against its definition.
image_prompts = (img_2_test_system_prompt, img_2_test_user_prompt)
image_text_options = {"model": "qwen-vl-plus-latest", "save_path": 'cd_data/train'}
data_processor.save_all_images_to_text(*image_prompts, **image_text_options)
data_processor.check_and_save_images_to_text(*image_prompts, **image_text_options)

In [None]:
# User-preference step: run summarize_user_preference with the preference prompt,
# using the training-split folder as save_path.
data_processor.summarize_user_preference(
    sum_user_preference_system_prompt,
    save_path='cd_data/train',
)

In [None]:
# Ranking step: rank the candidates with the chain-of-thought prompt.
# The image descriptions are read from, and the results saved to, 'cd_data/train':
# that is the folder the image-to-text cell writes to and the folder the evaluation
# and SFT cells read 'image_text.parquet' / 'rank_result.parquet' from. The previous
# 'cd_data' paths did not match any other cell of this notebook.
# NOTE(review): confirm against save_rank_result_for_all_users_efficient that the
# output file is '<save_path>/rank_result.parquet'.
data_processor.save_rank_result_for_all_users_efficient(
    meta_data=pd.read_parquet(meta_data_path),
    test_data=pd.read_parquet(test_data_path),
    images_data=pd.read_parquet('cd_data/train/image_text.parquet'),
    system_prompt=rank_candidate_system_prompt_with_cot,
    model="qwen2.5-7b-instruct",
    save_path='cd_data/train',
    max_workers=3,
)

In [None]:
# Evaluation step: score the saved ranking results with NDCG@1/5/10.
# The sums are divided by the total number of rows, so rows that could not be
# scored count as 0; valid_rate reports the share of rows that were scored.
rank_result_data = pd.read_parquet('cd_data/train/rank_result.parquet')

ndcg_1 = 0
ndcg_5 = 0
ndcg_10 = 0

# Number of rows that could not be scored, for any reason.
error_num = 0
for i in range(len(rank_result_data)):
    row = rank_result_data.iloc[i]
    try:
        rank_list = data_processor.get_list_from_result_cot(row['rank_result'])
        rank_score = data_processor.convert_item_id_to_item_score(rank_list, row['target'], row['target_score'])
    except Exception as e:
        # A row whose output cannot be parsed or scored is invalid as well; it
        # must be counted, otherwise valid_rate is overstated.
        print(e)
        error_num += 1
        continue
    # Skip rows flagged as 'Error' by the scorer or with fewer than 9 ranked ids.
    if rank_score == 'Error' or len(rank_list) < 9:
        error_num += 1
        continue
    ndcg_1 += ndcg_at_k(rank_score, 1)
    ndcg_5 += ndcg_at_k(rank_score, 5)
    ndcg_10 += ndcg_at_k(rank_score, 10)

total_rows = len(rank_result_data)
if total_rows == 0:
    # Avoid a ZeroDivisionError on an empty result file.
    print('rank_result_data is empty, nothing to evaluate')
else:
    print('ndcg@1:', ndcg_1/total_rows)
    print('ndcg@5:', ndcg_5/total_rows)
    print('ndcg@10:', ndcg_10/total_rows)
    print('valid_rate:', (1 - error_num/total_rows))

In [None]:
# Candidate step: call save_candidate_to_test_data with shuffling enabled, under the name 'candidate'.
# NOTE(review): 711 is presumably the number of items in the metadata table - confirm
# and consider deriving it from the data instead of hard-coding it.
data_processor.save_candidate_to_test_data(
    len_meta_data=711,
    is_shuffle=True,
    name='candidate',
)

In [None]:
# SFT step: call get_sft_data_for_all_user with the training tables, the image
# descriptions, the saved ranking results and the chain-of-thought prompt;
# save_path names a JSON file.
data_processor.get_sft_data_for_all_user(
    meta_data=pd.read_parquet(meta_data_path),
    test_data=pd.read_parquet(test_data_path),
    images_data=pd.read_parquet('cd_data/train/image_text.parquet'),
    system_prompt=rank_candidate_system_prompt_with_cot,
    rank_result_data=pd.read_parquet('cd_data/train/rank_result.parquet'),
    save_path='cd_data/train/sft_data_with_cot.json',
)

In [None]:
# DPO step: pass two ranking runs for "candidate1" to save_rank_results_to_json with the
# same output file, then call get_dpo_data_from_collection on that file.
# NOTE(review): both runs target the same output_file, so save_rank_results_to_json
# presumably appends to an existing collection - confirm.
for rank_result_path in ('cd_data/train/rank_result1_1.parquet',
                         'cd_data/train/rank_result1_2.parquet'):
    data_processor.save_rank_results_to_json(
        rank_result_path=rank_result_path,
        system_prompt=rank_candidate_system_prompt_with_cot,
        save_path='cd_data/train',
        output_file='rank_results_collection_c1.json',
        candidate_name="candidate1",
    )
data_processor.get_dpo_data_from_collection(
    rank_results_collection_path='cd_data/train/rank_results_collection_c1.json',
    save_path='cd_data/train',
)