# 分析db数据的所有用户attitude变化

In [1]:
import asyncio
import os
import pandas as pd
from string import Template
import json

from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import OpenAIMessage
import sqlite3

import sys

# Switch the working directory to the project root so that the relative
# paths used in later cells ("data/...", "experiments/...") resolve.
# NOTE(review): this absolute path is machine-specific — adjust it when
# running the notebook on another host.
os.chdir("/home/lsj/Projects/CIMagent")



In [2]:
# Build the model client used by every analysis coroutine in this notebook.
# It targets the VLLM platform at a localhost URL and caps generation at
# 16000 tokens via model_config_dict.
# NOTE(review): presumably a vLLM server serving this model path must already
# be listening on the given port — confirm before running.
vllm_model = ModelFactory.create(
    model_platform=ModelPlatformType.VLLM,
    model_type="/data/model/Qwen3-14B",
    url="http://localhost:21474/v1",
    model_config_dict={"max_tokens": 16000}
)




In [None]:
# Load the topic title and the `user` / `post` tables of one simulation
# snapshot into DataFrames (users_df, posts_df).
topic_index = 3
# Use a context manager so the topics file handle is closed deterministically.
with open("data/CIM_experiments/topics.json", encoding="utf-8") as topics_file:
    topic = json.load(topics_file)[f"topic_{topic_index}"]['title']
db_path = "experiments/20250509_223056/backups/twitter_simulation_24.db"
conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    # Column names are taken from cursor.description of the query just run.
    cursor.execute("SELECT * FROM user")
    users_data = cursor.fetchall()
    users_df = pd.DataFrame(users_data, columns=[i[0] for i in cursor.description])
    print(users_df.columns)
    cursor.execute("SELECT * FROM post")
    posts_data = cursor.fetchall()
    posts_df = pd.DataFrame(posts_data, columns=[i[0] for i in cursor.description])
    print(posts_df.columns)
finally:
    # Release the SQLite connection even if one of the queries fails.
    conn.close()


In [8]:
# Prompt template for classifying a single post's attitude toward a topic.
# Placeholders: $topic and $post_content. The model is asked to reply with
# "Attitude: <positive|negative|neutral>".
# NOTE(review): adjacent string literals are concatenated as-is, so there is
# no space between "...any other information." and "You should answer...".
attitude_analysis_template = Template(
    "You are a social media analysis expert. You are given a post and you need to analyze the attitude of the post based on the topic. "
    "Here is the topic of the post: $topic\n"
    "Here is the information of the post you are interested in:\n"
    "Post Information:\n"
    "Content: $post_content\n"
    "You only answer the attitude of the post, not any other information."
    "You should answer in the following format. The attitude should be a single word (positive, negative, neutral), don't include any other information: \n"
    "Attitude: <post_attitude>"
)

async def attitude_analysis(user_posts):
    """Ask the LLM for the attitude of up to four of one user's posts.

    Relies on the notebook globals ``topic``, ``attitude_analysis_template``
    and ``vllm_model``.

    Args:
        user_posts: DataFrame holding one user's rows of the post table; each
            row must provide a 'content' field.

    Returns:
        A list with one raw model response per analyzed post, in the row
        order of ``user_posts``. Empty when the user has no posts.
    """
    max_posts = 4
    post_attitudes = []
    # Only the first ``max_posts`` rows (in the frame's own row order) are
    # analyzed. Bounding the range by len() replaces the former bare
    # ``except`` that was used to detect running past the last row.
    # NOTE(review): if the most recent posts are what should be scored,
    # confirm the row order of the post table.
    for row_position in range(min(len(user_posts), max_posts)):
        post_info = user_posts.iloc[row_position]
        post_msg = attitude_analysis_template.substitute(topic=topic, post_content=post_info['content'])
        # The model accepts an OpenAI-style message list directly.
        messages = [{"role": "user", "content": post_msg}]
        post_attitude = await vllm_model.arun(messages)
        print(post_attitude)
        post_attitudes.append(post_attitude)
    return post_attitudes

async def analyze_users_attitudes(users_df, posts_df):
    """Run the per-user attitude analysis concurrently for all users.

    Args:
        users_df: DataFrame of users; its 'user_id' column is iterated.
        posts_df: DataFrame of posts; filtered by its 'user_id' column.

    Returns:
        The gathered results, one entry per user in the order of
        ``users_df['user_id']``; each entry is what ``attitude_analysis``
        returned for that user's posts.
    """
    # One coroutine per user, each receiving only that user's posts.
    per_user_jobs = [
        attitude_analysis(posts_df[posts_df['user_id'] == uid])
        for uid in users_df['user_id']
    ]
    return await asyncio.gather(*per_user_jobs)


def calculate_user_attitude_score(attitudes_result, attitudes_score={'positive': 1, 'negative': -1, 'neutral': 0}, discard_rate=0.5):
    """Aggregate one user's per-post attitude responses into a single score.

    Each response is expected to contain ``Attitude: <label>``. Responses are
    folded in order as ``score = (score + attitudes_score[label]) * discard_rate``.
    Responses with no text, no ``Attitude:`` match, or a label that is not one
    of the known attitudes are skipped instead of raising.

    Args:
        attitudes_result: Iterable of model responses; the text is read from
            ``response.choices[0].message.content``.
        attitudes_score: Mapping from attitude label to its numeric value.
            The default dict is only read, never mutated.
        discard_rate: Multiplier applied after every accepted response.

    Returns:
        A tuple ``(user_attitude_score, user_attitudes_count)`` where the
        second item counts accepted responses per label.
    """
    import re

    user_attitude_score = 0
    user_attitudes_count = {'positive': 0, 'negative': 0, 'neutral': 0}

    pattern = re.compile(r'Attitude:\s*(\w+)')

    for one_attitude in attitudes_result:
        content = one_attitude.choices[0].message.content
        # A response may carry no text at all; there is nothing to parse then.
        if not isinstance(content, str):
            continue
        match = pattern.search(content)
        if match is None:
            continue
        attitude = match.group(1).lower()
        # The model is free-form: ignore labels we cannot count or score
        # rather than failing the whole analysis with a KeyError.
        if attitude not in user_attitudes_count or attitude not in attitudes_score:
            continue
        user_attitudes_count[attitude] += 1
        user_attitude_score = (user_attitude_score + attitudes_score[attitude]) * discard_rate

    return user_attitude_score, user_attitudes_count

def get_attitude_label(score):
    """Map a numeric attitude score to its label by sign.

    Args:
        score: Aggregated attitude score.

    Returns:
        str: 'positive' when the score is above zero, 'negative' when it is
        below zero, otherwise 'neutral'.
    """
    return 'positive' if score > 0 else ('negative' if score < 0 else 'neutral')
    

In [None]:
# Run the attitude analysis for every user. This is a top-level `await`,
# so the cell must be executed where one is allowed (e.g. a notebook).
attitudes_results = await analyze_users_attitudes(users_df, posts_df)

In [None]:

# Report each user's aggregated attitude.
# analyze_users_attitudes builds its tasks by iterating users_df['user_id'] and
# asyncio.gather returns results in that same order, so pair each result with
# the real user id instead of a 0-based position counter.
for user_id, attitudes_result in zip(users_df['user_id'], attitudes_results):
    score, counts = calculate_user_attitude_score(attitudes_result)
    attitude = get_attitude_label(score)
    print(f"用户 {user_id} 的态度分数: {score}")
    print(f"用户 {user_id} 的态度是: {attitude}")


# 分析某个experiment下所有db数据中users的态度分布情况

In [29]:
import asyncio
import os
import pandas as pd
from string import Template
import json

from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import OpenAIMessage
import sqlite3

import sys

# Switch the working directory to the project root so that the relative
# paths used in later cells ("data/...", "experiments/...") resolve.
# NOTE(review): this absolute path is machine-specific.
os.chdir("/home/lsj/Projects/CIMagent")

# Model client for the VLLM platform at a localhost URL, with generation
# capped at 16000 tokens via model_config_dict.
vllm_model = ModelFactory.create(
    model_platform=ModelPlatformType.VLLM,
    model_type="/data/model/Qwen3-14B",
    url="http://localhost:21474/v1",
    model_config_dict={"max_tokens": 16000}
)

# Prompt template for classifying a single post's attitude toward a topic.
# Placeholders: $topic and $post_content. The model is asked to reply with
# "Attitude: <positive|negative|neutral>".
# NOTE(review): adjacent string literals are concatenated as-is, so there is
# no space between "...any other information." and "You should answer...".
attitude_analysis_template = Template(
    "You are a social media analysis expert. You are given a post and you need to analyze the attitude of the post based on the topic. "
    "Here is the topic of the post: $topic\n"
    "Here is the information of the post you are interested in:\n"
    "Post Information:\n"
    "Content: $post_content\n"
    "You only answer the attitude of the post, not any other information."
    "You should answer in the following format. The attitude should be a single word (positive, negative, neutral), don't include any other information: \n"
    "Attitude: <post_attitude>"
)

async def attitude_analysis(user_posts):
    """Ask the LLM for the attitude of up to four of one user's posts.

    Relies on the notebook globals ``topic``, ``attitude_analysis_template``
    and ``vllm_model``.

    Args:
        user_posts: DataFrame holding one user's rows of the post table; each
            row must provide a 'content' field.

    Returns:
        A list with one raw model response per analyzed post, in the row
        order of ``user_posts``. Empty when the user has no posts.
    """
    max_posts = 4
    post_attitudes = []
    # Only the first ``max_posts`` rows (in the frame's own row order) are
    # analyzed. Bounding the range by len() replaces the former bare
    # ``except`` that was used to detect running past the last row.
    # NOTE(review): if the most recent posts are what should be scored,
    # confirm the row order of the post table.
    for row_position in range(min(len(user_posts), max_posts)):
        post_info = user_posts.iloc[row_position]
        post_msg = attitude_analysis_template.substitute(topic=topic, post_content=post_info['content'])
        # The model accepts an OpenAI-style message list directly.
        messages = [{"role": "user", "content": post_msg}]
        post_attitude = await vllm_model.arun(messages)
        print(post_attitude)
        post_attitudes.append(post_attitude)
    return post_attitudes

async def analyze_users_attitudes(users_df, posts_df):
    """Schedule the attitude analysis of every user and await them together.

    Args:
        users_df: DataFrame of users; its 'user_id' column is iterated.
        posts_df: DataFrame of posts; filtered by its 'user_id' column.

    Returns:
        The gathered results, one entry per user in the order of
        ``users_df['user_id']``; each entry is what ``attitude_analysis``
        returned for that user's posts.
    """
    pending = []
    for uid in users_df['user_id']:
        owned_by_user = posts_df['user_id'] == uid
        pending.append(attitude_analysis(posts_df[owned_by_user]))
    gathered = await asyncio.gather(*pending)
    return gathered

def calculate_user_attitude_score(attitudes_result, attitudes_score={'positive': 1, 'negative': -1, 'neutral': 0}, discard_rate=0):
    """Aggregate one user's per-post attitude responses into a single score.

    Each response is expected to contain ``Attitude: <label>``. Responses are
    folded in order as ``score = (score + attitudes_score[label]) * discard_rate``.
    Responses with no text, no ``Attitude:`` match, or a label that is not one
    of the known attitudes are skipped instead of raising.

    NOTE(review): with the default ``discard_rate=0`` the formula above always
    yields a score of 0, so every user is labeled neutral by sign. Confirm
    whether a non-zero default (or a different decay formula) was intended;
    callers can pass ``discard_rate`` explicitly in the meantime.

    Args:
        attitudes_result: Iterable of model responses; the text is read from
            ``response.choices[0].message.content``.
        attitudes_score: Mapping from attitude label to its numeric value.
            The default dict is only read, never mutated.
        discard_rate: Multiplier applied after every accepted response.

    Returns:
        A tuple ``(user_attitude_score, user_attitudes_count)`` where the
        second item counts accepted responses per label.
    """
    import re

    user_attitude_score = 0
    user_attitudes_count = {'positive': 0, 'negative': 0, 'neutral': 0}

    pattern = re.compile(r'Attitude:\s*(\w+)')

    for one_attitude in attitudes_result:
        content = one_attitude.choices[0].message.content
        # A response may carry no text at all; there is nothing to parse then.
        if not isinstance(content, str):
            continue
        match = pattern.search(content)
        if match is None:
            continue
        attitude = match.group(1).lower()
        # The model is free-form: ignore labels we cannot count or score
        # rather than failing the whole analysis with a KeyError.
        if attitude not in user_attitudes_count or attitude not in attitudes_score:
            continue
        user_attitudes_count[attitude] += 1
        user_attitude_score = (user_attitude_score + attitudes_score[attitude]) * discard_rate

    return user_attitude_score, user_attitudes_count

def get_attitude_label(score):
    """Translate the sign of an attitude score into a label.

    Args:
        score: Aggregated attitude score.

    Returns:
        str: 'positive' for scores above zero, 'negative' for scores below
        zero, and 'neutral' in every other case.
    """
    label = 'neutral'
    if score > 0:
        label = 'positive'
    elif score < 0:
        label = 'negative'
    return label



In [None]:
# 读取db数据
topic_index = 3
topic = json.load(open("data/CIM_experiments/topics.json"))[f"topic_{topic_index}"]['title']
db_dir = f"experiments/20250509_223056/backups"

attitudes_per_step = []

for db_file in os.listdir(db_dir):
    db_path = os.path.join(db_dir, db_file)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # 分析db数据
    cursor.execute("SELECT * FROM user")
    users_data = cursor.fetchall()
    users_df = pd.DataFrame(users_data, columns=[i[0] for i in cursor.description])
    print(users_df.columns)
    cursor.execute("SELECT * FROM post")
    posts_data = cursor.fetchall()
    posts_df = pd.DataFrame(posts_data, columns=[i[0] for i in cursor.description])
    print(posts_df.columns)
    # 关闭连接
    conn.close()

    # 执行用户态度分析
    attitudes_step_task = analyze_users_attitudes(users_df, posts_df)
    
    # 计算当前step的attitude中三种态度的分布情况
    attitudes_distribution = {}
    # 分析每个用户的态度
    for user_id, attitudes_result in enumerate(attitudes_results):
        score, counts = calculate_user_attitude_score(attitudes_result)
        print(score, counts)
        attitude = get_attitude_label(score)
        attitudes_distribution[attitude] = attitudes_distribution.get(attitude, 0) + 1
    
    attitudes_per_step.append(attitudes_distribution)





In [None]:


# Persist the per-step attitude distribution as JSON.
with open(f"experiments/20250509_223056/attitudes_per_step.json", "w") as f:
    json.dump(attitudes_per_step, f)

# Draw the distribution as one line per attitude.
import matplotlib.pyplot as plt

# One series per attitude; a step that lacks a label contributes 0.
positive_values, neutral_values, negative_values = (
    [step.get(label, 0) for step in attitudes_per_step]
    for label in ('positive', 'neutral', 'negative')
)

plt.figure(figsize=(10, 6))
for series, legend_name in (
    (positive_values, 'Positive'),
    (neutral_values, 'Neutral'),
    (negative_values, 'Negative'),
):
    plt.plot(series, label=legend_name)
plt.xlabel('Step')
plt.ylabel('Number of Users')
plt.title('Attitude Distribution over Steps')
plt.legend()
plt.show()

In [None]:
# Snapshot files ordered by the numeric suffix of their name ("..._<step>.db").
db_file_list = sorted(os.listdir(db_dir), key=lambda x: int(x.split('_')[-1].split('.')[0]))

# Collect, for the first user of one reference snapshot, that user's posts
# as seen in every snapshot.
users_posts_view = []
db_path = "experiments/20250509_223056/backups/twitter_simulation_24.db"
conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM user")
    users_data = cursor.fetchall()
    users_df = pd.DataFrame(users_data, columns=[i[0] for i in cursor.description])
finally:
    # Close this connection before `conn` is rebound in the loop below;
    # otherwise it would stay open with no remaining reference to close it.
    conn.close()
user_id = users_df['user_id'].iloc[0]
print(users_df.columns)

for db_file in db_file_list:
    db_path = os.path.join(db_dir, db_file)
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM post")
        posts_data = cursor.fetchall()
        posts_df = pd.DataFrame(posts_data, columns=[i[0] for i in cursor.description])
    finally:
        # Release the connection even if the query fails.
        conn.close()
    users_posts_view.append(posts_df[posts_df['user_id'] == user_id])

In [24]:
async def attitude_ONE(user_post:str):
    """Ask the LLM for the attitude of a single post and echo the reply.

    Relies on the notebook globals ``topic``, ``attitude_analysis_template``
    and ``vllm_model``.

    Args:
        user_post: Text of the post to classify.

    Returns:
        The raw model response, which is also printed.
    """
    prompt = attitude_analysis_template.substitute(topic=topic, post_content=user_post)
    # The model accepts an OpenAI-style message list directly.
    response = await vllm_model.arun([{"role": "user", "content": prompt}])
    print(response)
    return response

In [None]:
# Debug output: show the list of per-step attitude distributions.
print(attitudes_per_step)

# 分析某个experiment下所有db数据中users的stance分布情况

In [None]:
from tqdm import tqdm
import asyncio
import os
import pandas as pd
from string import Template
import json

from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import OpenAIMessage
import sqlite3

import sys

# Switch the working directory to the project root so that the relative
# paths used below ("data/...", "experiments/...") resolve.
# NOTE(review): this absolute path is machine-specific.
os.chdir("/home/lsj/Projects/CIMagent")

# Prompt template for classifying a single post's stance toward a topic.
# Placeholders: $topic and $post_content. The model is asked to reply with
# "Stance: <pro|con|neutral>".
# NOTE(review): the opening sentence still says "analyze the attitude" while
# the answer format asks for a stance; replies in this "Stance:" format will
# not match a parser that searches for "Attitude:". Also, adjacent literals
# are concatenated with no space between "...information." and "You should".
attitude_analysis_template = Template(
    "You are a social media analysis expert. You are given a post and you need to analyze the attitude of the post based on the topic. "
    "Here is the topic of the post: $topic\n"
    "Here is the information of the post you are interested in:\n"
    "Post Information:\n"
    "Content: $post_content\n"
    "You only answer the stance of the post, not any other information."
    "You should answer in the following format. The stance should be a single word (pro, con, neutral), don't include any other information: \n"
    "Stance: <post_stance>"
)



# Model client for the VLLM platform at a localhost URL, with generation
# capped at 16000 tokens via model_config_dict.
vllm_model = ModelFactory.create(
    model_platform=ModelPlatformType.VLLM,
    model_type="/data/model/Qwen3-14B",
    url="http://localhost:21474/v1",
    model_config_dict={"max_tokens": 16000}
)



async def attitude_ONE(user_post:str):
    """Ask the LLM to classify a single post using the current template.

    Relies on the notebook globals ``topic``, ``attitude_analysis_template``
    and ``vllm_model``.

    Args:
        user_post: Text of the post to classify.

    Returns:
        The raw model response.
    """
    prompt = attitude_analysis_template.substitute(topic=topic, post_content=user_post)
    # The model accepts an OpenAI-style message list directly.
    return await vllm_model.arun([{"role": "user", "content": prompt}])

topic_index = 3
topic = json.load(open("data/CIM_experiments/topics.json"))[f"topic_{topic_index}"]['title']

db_dir = f"experiments/20250509_223056/backups"
db_file_list = sorted(os.listdir(db_dir), key=lambda x: int(x.split('_')[-1].split('.')[0]))
db_file_start = db_file_list[0]
conn = sqlite3.connect(os.path.join(db_dir, db_file_start))
cursor = conn.cursor()
cursor.execute("SELECT * FROM user")
users_data = cursor.fetchall()
users_df = pd.DataFrame(users_data, columns=[i[0] for i in cursor.description])
users_nums = len(users_df)
users_attitude_list = []  # users_nums*len(db_file_list) asyncio.tasks

for db_file in db_file_list:
    db_path = os.path.join(db_dir, db_file)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM post")
    posts_data = cursor.fetchall()
    posts_df = pd.DataFrame(posts_data, columns=[i[0] for i in cursor.description])
    for user_id in users_df['user_id']:
        user_posts = posts_df[posts_df['user_id'] == user_id]
        if not user_posts.empty:
            last_post_content = user_posts.iloc[-1]['content']
            users_attitude_list.append(attitude_ONE(last_post_content))
    conn.close()



# 使用tqdm包装tasks列表来显示进度条
users_attitude_list_result = await asyncio.gather(*users_attitude_list)
users_attitude_per_step = []
for step_index in range(len(db_file_list)):
    users_attitude_per_step.append(users_attitude_list_result[step_index*users_nums:(step_index+1)*users_nums])
print(users_attitude_per_step)



In [None]:
# Debug output. NOTE(review): users_attitude_list holds the coroutine objects
# returned by attitude_ONE(...), not the model responses; the awaited
# responses live in users_attitude_list_result — confirm which was intended.
print(users_attitude_list)
