In [1]:
import time
import os
import pandas as pd
from dotenv import dotenv_values
import concurrent.futures
import dashscope
import pymysql
import json
import re
# from zhipuai import ZhipuAI

# 0. Load data

In [2]:
def load_data_to_df(database, command, columns):
    """Run a SQL query against MySQL and return the result as a DataFrame.

    Args:
        database: Name of the MySQL schema to connect to.
        command: SQL statement to execute (passed to ``cursor.execute`` as-is).
        columns: Column labels for the resulting DataFrame, in the same
            order as the columns selected by ``command``.

    Returns:
        pd.DataFrame holding every row fetched by the query.

    Raises:
        Whatever pymysql raises on connection or query failure; the
        connection is closed before the exception propagates.
    """
    # NOTE(review): credentials are inlined here; consider loading them from
    # the environment or a .env file instead of the notebook source.
    USER = '<USER>'
    PWD = '<PASSWORD>'
    URL = '<URL>'
    conn = pymysql.connect(host=URL, port=3306, user=USER, passwd=PWD, db=database, charset='utf8mb4')

    # Always release the connection, even when the query fails; the original
    # left it open on every call.
    try:
        with conn.cursor() as cursor:
            cursor.execute(command)
            result = cursor.fetchall()
    finally:
        conn.close()

    return pd.DataFrame(result, columns=columns)

In [3]:
%%time

# Load the id and raw text of every row in `tagged_content_by_kwd_dict_3000`;
# `df_xhs` is the input set that the tagging loop below iterates over.
SQL_LOAD = '''
SELECT `id`, `content`
FROM `tagged_content_by_kwd_dict_3000`
'''

df_xhs = load_data_to_df('validation', SQL_LOAD, columns=['id', 'content'])
            
# Row count as a quick sanity check of the load.
len(df_xhs)

CPU times: user 47.7 ms, sys: 783 µs, total: 48.4 ms
Wall time: 72.2 ms


3000

In [5]:
# overlap = set(df_old_samp['id']).intersection(set(df['id']))
# len(overlap)

8

In [6]:
# df_xhs = df[~df['id'].isin(overlap)]
# df_xhs.shape

(2992, 3)

In [7]:
# df_xhs.head(1)

Unnamed: 0,id,content,output
0,e04592c0da33650d9c837464dca6c706|b9393c69e2e6d...,一拖二的周六，一早起床，勿勿忙忙准备好小娃需要看病的资料，再准备我需要摆摊的工具，大娃补课催...,"{'食品类别': '未知', '是否为零售食品': '否', '文本主题': '日常生活挑战..."


# 1. Tag using LLM

In [4]:
# Model name passed to every DashScope generation call.
MODEL = 'qwen1.5-14b-chat'
# Number of worker threads in the tagging thread pool (max concurrent LLM requests).
NUM_PARALLEL = 15

In [5]:
# Prefer the DASHSCOPE_API_KEY environment variable so the secret does not
# live in the notebook; the inline placeholder is kept only as a fallback.
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY", "<API_KEY>")

In [6]:
def get_chat_completion(model, messages, temperature=0.85):
    """Send a chat request to DashScope and return the reply text.

    Args:
        model: DashScope model name.
        messages: List of ``{'role': ..., 'content': ...}`` dicts.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The content of the first choice's message, or the string ``'{}'``
        when the response does not have the expected structure (for
        example an error response without ``output``/``choices``).
    """
    response = dashscope.Generation.call(
        model=model,
        messages=messages,
        result_format='message',
        temperature=temperature
    )
    try:
        return response['output']['choices'][0]['message']['content']
    except Exception:
        # Best-effort fallback: an empty JSON object lets downstream parsing
        # proceed. `Exception` (not a bare except) so that Ctrl-C and
        # SystemExit are not swallowed inside worker threads.
        return '{}'


def _extract_json_object(response):
    """Return the first ``{...}`` span of ``response`` with newlines removed.

    Falls back to the untouched response when it is not a string or holds
    no ``{``. When the object is opened but never closed (truncated reply),
    everything from ``{`` onward is returned so a later repair step can
    still append the missing ``}``.
    """
    if not isinstance(response, str):
        return response

    idx_start = response.find('{')
    if idx_start == -1:
        return response

    # Look for the closing brace only AFTER the opening one; a stray '}'
    # earlier in the text must not produce an empty slice.
    idx_end = response.find('}', idx_start)
    if idx_end == -1:
        return response[idx_start:].replace('\n', '')

    return response[idx_start:(idx_end+1)].replace('\n', '')


def item_process_tag(item, col_name, model_version, prompt):
    """Tag one DataFrame row with the LLM and store the reply in ``row['output']``.

    Args:
        item: ``(index, pd.Series)`` pair as yielded by ``DataFrame.iterrows()``.
        col_name: Name of the column holding the text to tag.
        model_version: Model name forwarded to ``get_chat_completion``.
        prompt: System prompt describing the tagging task.

    Returns:
        The row (pd.Series) with an added ``'output'`` entry containing the
        JSON-looking part of the model reply, or the raw reply when no JSON
        object could be located.
    """
    # item format: (int, pd.Series)
    row = item[1]

    messages = [{'role': 'system', 'content': prompt}
                , {'role': 'user', 'content': f'{row[col_name]}'}]

    response = get_chat_completion(model_version, messages)

    # str.find never raises, so the original try/except fallback could not
    # trigger and malformed replies were silently reduced to ''.
    row['output'] = _extract_json_object(response)

    return row

In [7]:
# System prompt for the text-topic tagging task: lists the allowed tag values
# and asks for a JSON reply with the fields "标签值" and "原文依据", which
# parse_output() reads.
# NOTE(review): the first line of the prompt opens a double quote that is
# never closed — looks unintentional; confirm before changing, since the
# prompt text is sent to the model verbatim.
PROMPT = '''
"你是一位国际食品零售企业的数字营销资深分析师，擅长消费者洞察。
# 牢记你的任务和分析目标：
* 你的任务：根据你的理解，甄别平台帖子内容，对文本主题进行标注

# 取值范围：
* 属性值列表: "美食分享与推荐", "健康与饮食", "健康与养生", "运动健身"
            , "户外活动", "广告与推广", "食物评测", "日常记录", "烹饪与食谱"
            , "生活感悟", "节日庆祝", "体重管理", "家庭日常", "社交互动"
            , "旅行", "怀旧与回忆", "学习与教育", "校园生活"
            , "孕期饮食与健康", "儿童饮食与健康", "其他"

# 输出格式
* 输出JSON格式的回复，包含两个子字段，"标签值", "原文依据"。"标签值"包含一个标签值。"原文依据"包含一个原文关键字词的列表。列表用"["开始，"]"结束。

# 要求
* 从列表中挑选一个最能总结概括文本内容的值，作为"标签值"输出。只能输出列表中的值
* 如果无法确定，回复"其他"
* "标签值"字段只能输出列表中的值
* 只要输出JSON格式的回复，不要输出其他任何多余的内容
'''

In [8]:
def parse_output(row, attrib):
    """Parse the LLM reply stored in ``row['output']`` into tag and keywords.

    The reply is expected to be a JSON object with the fields "标签值"
    (tag value) and "原文依据" (list of supporting keywords). Common
    formatting slips are repaired before parsing: trailing text after the
    last ``}``, a missing ``}``, a trailing comma, "原文依据" given as
    ``无...`` instead of a list, and a keyword list without brackets.

    Args:
        row: Mapping (e.g. pd.Series) with an ``'output'`` entry.
        attrib: Attribute name; used as the label of the tag value and, with
            the prefix ``'关键词-'``, as the label of the keyword list.

    Returns:
        pd.Series with two entries, ``attrib`` and ``'关键词-' + attrib``.
        Both are the string ``"PARSING ERROR"`` when the reply cannot be
        parsed into a JSON object.
    """
    kwd_label = '关键词-' + attrib
    parsing_error = {attrib: "PARSING ERROR", kwd_label: "PARSING ERROR"}

    text = row['output']
    if not isinstance(text, str):
        # e.g. None/NaN when the model returned no content
        return pd.Series(parsing_error)

    # post-process - format
    ## drop anything after the last "}"; append one if the reply has none
    idx_tail = text.rfind('}')
    if idx_tail == -1:
        text = text + '}'
    else:
        text = text[:idx_tail+1]

    ## remove extra comma
    if text[-2:] == ',}':
        text = text[:-2] + '}'

    ## parse; repair the "原文依据" field only when the first attempt fails
    try:
        attr = json.loads(text)
    except ValueError:
        # "原文依据": 无... -> empty list. Stop at "," / "}" / newline so the
        # closing brace and following fields are kept (the previous pattern
        # `[^ ]*` consumed them and made the repaired text unparseable).
        text = re.sub(r'"原文依据": 无[^,}\n]*', '"原文依据": []', text)
        # Wrap a bare keyword sequence in []. The lookahead skips values
        # that already start with "[" (the previous `(?! \[)` had a stray
        # space and double-wrapped existing lists).
        text = re.sub(r'("原文依据": )((?!\[)[^}\n]*)([}\n])', r'\1[\2]\3', text)
        try:
            attr = json.loads(text)
        except ValueError:
            return pd.Series(parsing_error)

    # valid JSON that is not an object (list, number, ...) has no fields to read
    if not isinstance(attr, dict):
        return pd.Series(parsing_error)

    # parse attribute and keywords
    result = {
        attrib: attr.get('标签值', '未知'),
        kwd_label: attr.get('原文依据', []),
    }
    return pd.Series(result)

In [9]:
def extract_attribute_keyword(df, attrib):
    '''
    Build the tag/keyword table for a single attribute.

    Rows whose tag is 'PARSING ERROR', '未知' or '' are ignored. The keyword
    list of each remaining row is expanded to one row per keyword; rows with
    a missing keyword, or whose tag or keyword is not a string, are dropped,
    and exact duplicates are removed.

    Returns a dictionary {attrib: pd.DataFrame} where the frame has the
    columns ['id', '标签类别' (attribute type), attrib (tag value),
    '关键词-<attrib>' (keyword)].
    '''
    kwd_col = f'关键词-{attrib}'

    # rows with a usable tag value, reduced to the columns we need
    has_tag = ~df[attrib].isin(['PARSING ERROR', '未知', ''])
    pairs = df.loc[has_tag, ['id', attrib, kwd_col]]

    # one row per keyword; empty lists explode to NaN and are removed here
    pairs = pairs.explode(kwd_col)
    pairs = pairs[pairs[kwd_col].notna()]

    # keep only rows where both tag and keyword are plain strings
    # (e.g. neither is still a list)
    tag_is_str = pd.Series([isinstance(v, str) for v in pairs[attrib]], dtype=bool).to_numpy()
    kwd_is_str = pd.Series([isinstance(v, str) for v in pairs[kwd_col]], dtype=bool).to_numpy()
    pairs = pairs[tag_is_str & kwd_is_str].drop_duplicates()

    pairs['标签类别'] = attrib
    return {attrib: pairs[['id', '标签类别', attrib, kwd_col]]}

In [10]:
def write_to_mysql(df, database, table):
    """Insert every row of ``df`` into a MySQL table, one row at a time.

    Each row is inserted and committed individually so that a failing row
    does not prevent the remaining rows from being stored.

    Args:
        df: DataFrame whose column names match the target table's columns.
        database: Name of the MySQL schema to connect to.
        table: Name of the target table.

    Returns:
        List of row tuples (values in ``df.columns`` order) whose insert
        raised an exception. Empty when every row was inserted.
    """
    # NOTE(review): credentials are inlined here; consider loading them from
    # the environment or a .env file instead of the notebook source.
    USER = '<USER>'
    PWD = '<PASSWORD>'
    URL = '<URL>'

    # The statement is the same for every row, so build it once.
    cols = '`'+'`, `'.join(df.columns)+'`'
    placeholders = ', '.join(['%s'] * len(df.columns))
    query = f"INSERT INTO `{table}` ({cols}) VALUES ({placeholders})"

    conn = pymysql.connect(host=URL, port=3306, user=USER, passwd=PWD, db=database, charset='utf8mb4')

    rows_fail = []

    # try/finally + cursor context manager: cursor and connection are
    # released even if something unexpected raises while iterating.
    try:
        with conn.cursor() as cursor:
            for _, row in df.iterrows():
                # Format row data
                data = tuple(row[col] for col in df.columns)

                try:
                    cursor.execute(query, data)
                    conn.commit()
                except Exception:
                    # Best-effort load: remember the row and continue.
                    # `Exception` rather than a bare except so Ctrl-C still
                    # interrupts a long upload.
                    rows_fail.append(data)
    finally:
        conn.close()

    return rows_fail

In [11]:
# MySQL schema that the result tables are written to.
database = 'validation'
# Attribute being tagged: the Chinese name labels the DataFrame columns,
# the English name is used in MySQL table and column names.
attribute_chn = '文本主题'
attribute_eng = 'text_topic'

In [12]:
# Number of posts tagged and written to MySQL per iteration.
BATCH_SIZE = 100

# Process df_xhs in batches: tag with the LLM, parse the replies, then store
# the tag/keyword pairs and the tagged texts in MySQL.
# NOTE: re-running this cell inserts the same rows again; it is not idempotent.
for i in range(0, len(df_xhs), BATCH_SIZE):
    print("Start index", i)
    df = df_xhs.iloc[i:(i+BATCH_SIZE), :]

    # tag using LLM (rows are processed concurrently; sort_index restores
    # the original row order afterwards)
    t1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PARALLEL) as executor:
        futures = [executor.submit(item_process_tag, row, 'content', MODEL, PROMPT) for row in df.iterrows()]
        result_list = [f.result() for f in concurrent.futures.as_completed(futures)]
    df = pd.DataFrame(result_list).sort_index()
    t2 = time.time()
    print(f"Qwen tagging done, took {t2-t1} seconds")

    # parse LLM output
    ## Attach the parsed columns by row index. Merging on the 'output' text
    ## (as done previously) duplicates rows whenever two posts receive an
    ## identical reply, because the join key is then not unique.
    df_parsed = df[['output']].apply(lambda x: parse_output(x, attribute_chn), axis=1)
    df = df.join(df_parsed)

    ## report rows that could not be parsed
    df_error = df[df[attribute_chn]=='PARSING ERROR']
    num_error = len(df_error)
    print(f"After normal parsing, {num_error} rows have issues")

    # extract keyword and attribute accordingly
    kwd_dicts = extract_attribute_keyword(df, attribute_chn)
    cols_keywords = [c for c in df_parsed.columns if c.startswith('关键词')]
    df_attrib = df.drop(cols_keywords, axis=1)
    df_attrib = df_attrib.fillna('未知')

    # Mapping used when tagging other attributes (kept for reference):
    # ATTRIB_MAP = {'食品类别':'food_category'
    #               , '是否为零售食品':'is_retail_food'
    #               , '文本主题':'text_topic'
    #               , '文本类型':'text_type'
    #               , '主题相关性':'topic_relevance'
    #               , '情感氛围':'emotional_atmosphere'
    #               , '食品用途':'food_usage'
    #               , '地点场合':'where'
    #               , '时间场合':'when'
    #               , '时间':'time'
    #               , '人物类型':'character_type'
    #               , '痛点':'painpoint'
    #               , '宏观场合':'macro_occasion'
    #               , '微观场合':'micro_occasion'}

    # save result
    ## save tag-keyword dictionary
    dst_table = f'tag_kwd_{attribute_eng}_prompt_v2'
    for kwd, df_kwd in kwd_dicts.items():
        df_kwd = df_kwd.rename(columns={'id': 'content_id', '标签类别':'tag_category', f'{kwd}':'tag_value', f'关键词-{kwd}':'keyword'})
        rows_fail = write_to_mysql(df_kwd, database, dst_table)
        if rows_fail:
            # surface failed inserts instead of discarding them silently
            print(f"WARNING: {len(rows_fail)} keyword rows failed to insert into `{dst_table}`")
    print('Tag keyword dictionary uploaded to mysql')

    ## save text with attribute (rows that failed parsing are skipped)
    df_attrib = df_attrib.rename(columns={attribute_chn: attribute_eng})
    df_attrib_valid = df_attrib[df_attrib[attribute_eng]!='PARSING ERROR']
    df_attrib_valid = df_attrib_valid.drop('output', axis=1)

    t1 = time.time()
    rows_fail_attr = write_to_mysql(df_attrib_valid, database, f'tagged_content_prompt_v2_{attribute_eng}_only')
    t2 = time.time()
    print(f'''Tagged texts uploaded to mysql
              , successfully loaded {len(df_attrib_valid) - len(rows_fail_attr)} rows to database "{database}"
              , took {int(t2-t1)} seconds''')

Start index 0
Qwen tagging done, took 27.6506609916687 seconds
After normal parsing, 0 rows have issues
Tag keyword dictionary uploaded to mysql
Tagged texts uploaded to mysql
              , successfully loaded 100 rows to database "validation"
              , took 0 seconds
Start index 100
Qwen tagging done, took 29.380916595458984 seconds
After normal parsing, 0 rows have issues
Tag keyword dictionary uploaded to mysql
Tagged texts uploaded to mysql
              , successfully loaded 100 rows to database "validation"
              , took 0 seconds
Start index 200
Qwen tagging done, took 27.326350212097168 seconds
After normal parsing, 0 rows have issues
Tag keyword dictionary uploaded to mysql
Tagged texts uploaded to mysql
              , successfully loaded 100 rows to database "validation"
              , took 0 seconds
Start index 300
Qwen tagging done, took 27.557279109954834 seconds
After normal parsing, 0 rows have issues
Tag keyword dictionary uploaded to mysql
Tagged texts

In [13]:
# Load the validation workbook; below, only its `id` column is used as the
# reference list of ids that the stored tags are joined onto.
df = pd.read_excel("Validation Dataset.xlsx")
len(df)

3000

In [14]:
df.head(1)

Unnamed: 0,id,content,output
0,e04592c0da33650d9c837464dca6c706|b9393c69e2e6d...,一拖二的周六，一早起床，勿勿忙忙准备好小娃需要看病的资料，再准备我需要摆摊的工具，大娃补课催...,"{'食品类别': '未知', '是否为零售食品': '否', '文本主题': '日常生活挑战..."


In [15]:
%%time

# Read back the tagged texts written by the tagging loop
# (table `tagged_content_prompt_v2_text_topic_only`).
# Note: this rebinds SQL_LOAD, which earlier held the input query.
SQL_LOAD = '''
SELECT `id`, content, text_topic
FROM `tagged_content_prompt_v2_text_topic_only`
'''

df_result = load_data_to_df('validation', SQL_LOAD, columns=['id', 'content', 'text_topic'])
            
# Row count of the stored results, for comparison with the input size.
len(df_result)

CPU times: user 40.3 ms, sys: 4.08 ms, total: 44.4 ms
Wall time: 74.4 ms


2999

In [16]:
# Left-join the stored tags onto the full id list so that ids without a
# stored result stay in the table (with missing content/text_topic).
# NOTE(review): assumes `id` is unique in df_result; duplicate ids there
# would multiply rows — confirm.
df = df[['id']].merge(df_result, on='id', how='left')
len(df)

3000

In [17]:
df.head()

Unnamed: 0,id,content,text_topic
0,e04592c0da33650d9c837464dca6c706|b9393c69e2e6d...,一拖二的周六，一早起床，勿勿忙忙准备好小娃需要看病的资料，再准备我需要摆摊的工具，大娃补课催...,家庭日常
1,18336dc645f26e3c3944e8c47c1c9831|d035d1c7e67d6...,津铺子的这个0卡果冻回购很多次了，一共四种口味：白桃、青提、荔枝和芒果。四个口味都还不错，口...,健康与饮食
2,b9240d65115b682962187bac31a5a14d|e64571aeb3fbf...,话梅柠檬茶！建议女孩子都把奶茶换成它！ 全网安利的咸柠茶！我能喝一整个夏天！真的好喝到原地转...,健康与饮食
3,a0e6ebe573ea14303727123a5abbef3e|de754b5d37397...,随手买的香辣鱼尾也太好吃了吧！鱼肉很有嚼劲，麻麻辣辣的也太过瘾了，平时追剧或者嘴馋了来一包，...,美食分享与推荐
4,d3fc3728a8c3e13ae5988aa15e4bb0fd|320b8f65d7782...,#踏春赏花最美的景点推荐 #感受大自然的气息和美景 #旅行推荐官 #抖音带你去赏花 #春暖花...,旅行


In [18]:
# Export the merged id/content/text_topic table; the DataFrame index is not written.
df.to_csv("0510_validation_3000.csv", index=False)

In [21]:
# Share of each text_topic value among the tagged rows (top 20, most frequent first).
df['text_topic'].value_counts(normalize=True).reset_index().head(20)

Unnamed: 0,text_topic,proportion
0,美食分享与推荐,0.304435
1,生活感悟,0.063021
2,健康与饮食,0.06002
3,社交互动,0.044015
4,体重管理,0.037346
5,广告与推广,0.022007
6,家庭日常,0.01934
7,健康与养生,0.01934
8,节日庆祝,0.018673
9,零食推荐,0.016672
