In [89]:
import base64
import os
import time
from collections.abc import Iterable
from enum import Enum, StrEnum
from functools import wraps
from typing import Generic, TypeVar

import dspy
import litellm
import mlflow
import polars as pl
import ujson
from dotenv import load_dotenv
from litellm import RateLimitError
from pydantic import BaseModel, Field
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate.evaluate import Evaluate

# Load environment variables from the local .envrc file.
# NOTE(review): presumably this is where ARK_I18N_API_KEY / EMBED_API_KEY (read via os.environ below) come from — confirm.
load_dotenv("./.envrc")

True

In [55]:
# Base URL that `ark_i18n` hands to dspy.LM as `api_base`.
ARK_I18N_API_BASE = "https://ark-ap-southeast.byteintl.net/api/v3"


def ark_i18n(endpoint: str, **kwargs) -> dspy.LM:
    """Build a `dspy.LM` bound to one Ark model endpoint.

    Args:
        endpoint: Ark endpoint id; used as the model name after the
            ``volcengine/`` provider prefix.
        **kwargs: Extra keyword arguments forwarded to `dspy.LM` (for example
            ``temperature``). ``max_tokens`` defaults to 16384 unless given.

    Returns:
        The configured `dspy.LM`. The API key is read from the
        ``ARK_I18N_API_KEY`` environment variable (``None`` if it is unset).
    """
    # Apply the default through the dict so a caller-supplied max_tokens
    # overrides it instead of colliding with a second `max_tokens=` keyword.
    kwargs.setdefault("max_tokens", 16384)
    return dspy.LM(
        model=f"volcengine/{endpoint}",
        api_base=ARK_I18N_API_BASE,
        api_key=os.environ.get("ARK_I18N_API_KEY"),
        # Unpack the options: `kwargs=kwargs` would pass a single option
        # literally named "kwargs", so e.g. `temperature` never took effect.
        **kwargs,
    )


# LM handles for two Ark endpoints; the second one requests temperature=0.6.
# NOTE(review): the names suggest DeepSeek V3 / R1 deployments — confirm against the endpoint ids.
ds_v3_ark_i18n = ark_i18n("ep-20250305202533-sv6ls")
ds_r1_ark_i18n = ark_i18n("ep-20250306200824-2zd8h", temperature=0.6)

In [56]:
# Read the language-code table from code.json and build the LanguageCode enum
# from it with the functional Enum API.
# NOTE(review): the shape of code.json is not visible here — presumably a
# name -> value mapping; confirm.
with open("./code.json", "r") as f:
    codes = ujson.load(f)

LanguageCode = Enum("LanguageCode", codes)


# Who sent a chat message: the merchant or the platform side.
# NOTE(review): "PlatformReprentative" is a misspelling of
# "PlatformRepresentative"; the name is referenced elsewhere in this file, so
# renaming it has to be done together with every user.
class Role(Enum):
    Merchant = "merchant"
    PlatformReprentative = "platform representative"

    def __str__(self) -> str:
        # Render as the plain value (e.g. "merchant") instead of "Role.Merchant".
        return self.value


class Message(BaseModel):
    # One chat message: who sent it and its plain-text content.
    # (Documented with comments on purpose: pydantic would pick up a class
    # docstring as the model's schema description.)
    sender: Role = Field(description="Message sender.")
    content: str = Field(description="Message content.")

    def __init__(
        self,
        content: str,
        fromuserid: int | None = None,
        sender: Role | None = None,
    ):
        """Create a message from raw chat-log fields.

        Args:
            content: Raw message body. If it is a JSON object with a string
                ``"text"`` entry, that text becomes the content; anything else
                is kept verbatim.
            fromuserid: Numeric id of the sender, used to infer the role when
                ``sender`` is not given.
            sender: Explicit role; takes precedence over ``fromuserid``.

        Raises:
            ValueError: If neither ``fromuserid`` nor ``sender`` is provided.
        """
        if not fromuserid and not sender:
            raise ValueError("Either fromuserid or sender must be provided.")

        # Ids below this floor are treated as merchants, everything at or
        # above it as platform-side accounts.
        platform_id_floor = 80000000000000
        role = sender or (
            Role.Merchant
            if fromuserid < platform_id_floor
            else Role.PlatformReprentative
        )

        super().__init__(
            sender=role,
            content=self._extract_text(content),
        )

    @staticmethod
    def _extract_text(content: str) -> str:
        """Return the ``"text"`` of a rich JSON payload, else ``content`` itself."""
        try:
            payload = ujson.loads(content)
        except (ValueError, TypeError):
            # Not JSON at all — the common case for plain chat messages.
            return content

        # Valid JSON that is not an object (e.g. a bare number such as a
        # timestamp) or whose "text" is not a string must not replace the
        # content: a non-string value would fail the `content: str` validation.
        if isinstance(payload, dict):
            text = payload.get("text")
            if isinstance(text, str):
                return text
        return content


# A single finding plus the chat messages that back it up.
# The Field descriptions are kept verbatim because they are runtime text.
class KeyFinding(BaseModel):
    # Short statement of the finding.
    finding: str = Field(description="The key finding.")
    # Messages quoted as evidence for the finding.
    message: list[str] = Field(
        description="Corresponding messages that supports the finding."
    )


# DSPy signature: chat history in -> detected source language plus an English
# and a Chinese summary out.
# NOTE(review): the class docstring below is the signature's instruction text,
# i.e. prompt content; "give" looks like a typo for "given" but changing it
# changes the prompt, so it is left as is.
class Summarize(dspy.Signature):
    """
    Summarize the give chat history, figure out the source language and output the summary.
    """

    # Input: the messages of one conversation, oldest first.
    history: list[Message] = dspy.InputField(
        desc="The chat history, sorted by create time order."
    )
    # Output: one member of the LanguageCode enum.
    source_language: LanguageCode = dspy.OutputField(
        desc="The source language of the chat history, in ISO 639-1 format."
    )
    summary_en: str = dspy.OutputField(
        desc="The summary of the chat history, in English."
    )
    summary_zh: str = dspy.OutputField(
        desc="The summary of the chat history, in Chinese."
    )
    # key_findings: list[KeyFinding] = dspy.OutputField(
    #     desc="The key findings of the chat history that supports your summary, at most 5."
    # )


def format_history(histories):
    """Render chat records as a bulleted transcript, one entry per message.

    Args:
        histories: Iterable of mappings, each with a numeric ``fromuserid``
            and a ``content`` entry.

    Returns:
        One ``"- <speaker>: <content>"`` entry per record, joined by newlines;
        an empty string when there are no records.
    """
    # Ids below this floor are labelled as merchants, all others as platform
    # staff. "AM/BPO  " is padded to the width of "Merchant" so the message
    # text lines up.
    platform_id_floor = 80000000000000

    lines = []
    for record in histories:
        if record["fromuserid"] < platform_id_floor:
            speaker = "Merchant"
        else:
            speaker = "AM/BPO  "
        # Built on one line: a replacement field broken across lines inside a
        # single-quoted f-string only parses on Python 3.12+.
        lines.append(f"- {speaker}: {record['content']}")
    return "\n".join(lines)


In [None]:
# How the seller responded to a campaign offer.
class OfferResponse(StrEnum):
    # Seller accepts the offer.
    ACCEPT = "accept"
    # Seller rejects the offer.
    REJECT = "reject"
    # Seller would accept once extra conditions are met.
    CONDITIONAL = "conditional"
    # Seller is unsure or gives no clear answer.
    NO_RESPONSE = "no_response"


# Structured result for one campaign mentioned in a chat.
# The Field descriptions are kept verbatim because they are runtime text.
class CampaignInfo(BaseModel):
    # Campaign name exactly as written in the chat.
    name: str = Field(description="Campaign name, in full ORIGINAL text")
    # True when the platform representative (PR) promoted this campaign to the seller.
    is_pitched: bool = Field(
        description="Whether the PR is pitched(or promoted) the merchant to enroll the campaign."
    )
    # The seller's response to the offer, as an OfferResponse member.
    offer_response: OfferResponse = Field(
        description="If the offer is accepted by the seller."
    )
    # Reason for a rejection, or the conditions attached to an acceptance.
    reason: str = Field(
        description="The seller's reason about any of 1) why rejects the offer 2) the seller will accept the offer after any conditions is met."
    )
    # Supporting seller messages; min_length=1 requires at least one.
    original_messages: list[str] = Field(
        description="Seller's original messages about the specific campaign, at least one.",
        min_length=1,
    )


# DSPy signature for campaign recognition: chat history plus retrieved campaign
# documents in -> campaign-related flag, per-campaign details and bilingual
# summaries out.
# NOTE(review): `__doc__` below is the instruction prompt (runtime text), so it
# is left untouched. It is an f-string without placeholders; a literal "{" or
# "}" added to the prompt later would need escaping or dropping the `f`.
class RecognizerV2(dspy.Signature):
    __doc__ = f"""
    [新增强制约束]
    - 一个对话可能提到多个campaign，你必须返回所有campaign.
    XBP,BXP是不同的campaign，注意区分！
    你需要知道VXP也是一个campaign，和XBP是不同的campaign.


    You're a quality control specialist in e-commerce, especially focusing on quality check of chat histories between Platform Representive(PR in short) and seller.
    You will be given the full chat history in one thread. Based on the given history, your task is to:
    - Recognize if the the chat history is about campaign enrollment, including ask for campaign details, invitation about join a campaign and so on.
    - Recognize *all* campaigns mentioned, if any.
    - For each campaign, find all related infomation. The instructions follows later.
    - Summarize the chat in both Chinese and English.

    To properly recognize the campaign info, you should follow the general SOP:
    - Is the chat is discussing campaign enrollment related topics?
      - If no, or campaign related by not mentioning any enrollment related topics, then stop.
      - If yes, then you should find *EVERY* campaign, with:
        - Is PR pitched the seller to enroll the campaign?
          - If no, then record the result and go to the next campaign.
          - If yes, then looking for seller's response:
            - If the seller accepts the offer, then record the result as `accept`.
            - If the seller rejects the offer, then record the result as `reject` and also summarize their reason in one clear sentence.
            - If the seller gives conditional acceptance, then record the result as `conditional` and also summarize the extra conditions in a bullet-in list where each item is a clear sentence.
            - If the seller not sure whether to join or just don't give clear answer, then record the result as `no_response`.

    You can assume the seller don't enroll any of those campaigns, but if you find any campaign the seller clearly and exactly says they has enrolled,
    also record the campaign with "accept" as their offer response.
    """

    # Input: the messages of one conversation, oldest first.
    history: list[Message] = dspy.InputField(
        desc="The chat history, sorted by create time order."
    )
    # Input: campaign documents found by the retriever for this conversation.
    retrieved_information:str=dspy.InputField(
        desc="Campaign information retrieved from the database"
    )
    # Output: whether the conversation is about campaign enrollment at all.
    is_campaign_related: bool = dspy.OutputField(
        desc="Whether the chat is related to campaign enrollment."
    )
    # Output: one CampaignInfo per campaign mentioned.
    campaigns: list[CampaignInfo] = dspy.OutputField(
        desc="The mentioned campaigns, if any."
    )
    summary_zh: str = dspy.OutputField(desc="The summary of the chat in Chinese")
    summary_en: str = dspy.OutputField(desc="The summary of the chat in English")

In [58]:
# Load the raw message export and add a `dialogid` column parsed out of the
# JSON stored in `extmap` (its "dialogId" entry, converted to int).
# The per-row Python lambda raises if a row has no "dialogId" key.
df = pl.read_csv("202501.csv").with_columns(
    dialogid=pl.col("extmap").map_elements(
        lambda x: int(ujson.loads(x)["dialogId"]), return_dtype=int
    )
)

df.head()

businesslineid,content,createdtime,extmap,fromtype,fromuserid,fromusername,pt_month,servermessageid,touserid,dialogid
i64,str,i64,str,i64,i64,str,i64,i64,i64,i64
8000696,"""sippp kakk, btw kakk kmrn aku …",1736302749000,"""{""s:oth_new_encrypt"":""true"",""s…",1,80006966902868,"""Muthia Octavia""",202501,7457363354340952080,20782698,31509425
8000696,"""Halo! Pesan anda akan di respo…",1736249611000,"""{""callBackRequired"":""false"",""d…",1,80006966902868,"""Muthia Octavia""",202501,7457362621407266817,20732936,31434071
8000696,"""1736302588733""",1736302588765,"""{""dialogScene"":""1"",""conversati…",4,80006966902868,"""Link Customer Service Represen…",202501,7457362628655024129,20782698,31509425
8000696,"""Halo! Pesan anda akan di respo…",1736255609000,"""{""businessLineId"":""8000696"",""c…",1,80006966902868,"""Muthia Octavia""",202501,7457362629670062096,20782698,31509425
8000696,"""Kak muthia Aku minta rekomenda…",1736255605000,"""{""externalMessageType"":""text"",…",0,20782698,"""Debylianti""",202501,7457362628655040513,80006966902868,31509425


In [59]:
# Lazy per-dialog aggregation: pack (content, fromuserid) into one struct per
# message, sort by creation time, then group by dialog, collecting the structs
# into a list and keeping the first createdtime of each group.
# NOTE(review): this relies on group_by keeping the sorted row order inside
# each group — confirm this also holds for the streaming engine used below.
agg_df = (
    df.lazy()
    .with_columns(content_with_orientation=pl.struct(["content", "fromuserid"]))
    .sort("createdtime")
    .group_by("dialogid")
    .agg(
        pl.col("content_with_orientation"),
        pl.first("createdtime"),
    )
)

# Materialize once and persist the result. `agg_df` itself stays a lazy query,
# so every later `.collect()` on it re-runs the pipeline.
agg_df.collect(new_streaming=True).write_parquet("processed_dialogs.parquet")

In [61]:
import os
import dspy
import numpy as np
from openai import OpenAI

# Initialize the Volcano Engine client through the OpenAI SDK; the API key is
# read from the EMBED_API_KEY environment variable.
client = OpenAI(
    api_key=os.environ.get("EMBED_API_KEY"),
    base_url="https://ark-cn-beijing.bytedance.net/api/v3",
)


# Define the callable embedding function.
def openai_embedding_callable(texts):
    """Embed `texts` through the module-level embeddings `client`.

    Args:
        texts: Passed unchanged as the ``input`` of the embeddings request.

    Returns:
        A float32 NumPy array built from the ``embedding`` of every item in
        the response, in response order.

    Raises:
        Exception: Any error raised by the request or the conversion is
            printed and then re-raised unchanged.
    """
    try:
        response = client.embeddings.create(
            input=texts,
            model="ep-20250417161947-pmq4l",
            encoding_format="float",
        )
        # Collect one vector per response item.
        vectors = []
        for item in response.data:
            vectors.append(item.embedding)
        # Stack the vectors into a single array.
        matrix = np.array(vectors, dtype=np.float32)
    except Exception as err:
        # Surface the failure in the notebook output, then propagate it.
        print(f"请求出错: {err}")
        raise
    return matrix


In [None]:
import json
import random
import string
import csv

def generate_random_text(max_characters=1000):
    """Generate a random string of letters, digits, punctuation and spaces.

    Args:
        max_characters: Upper bound on the length of the text; must be >= 0.

    Returns:
        A random string whose length is drawn uniformly from
        ``[min(10, max_characters), max_characters]``.

    Raises:
        ValueError: If ``max_characters`` is negative.
    """
    if max_characters < 0:
        raise ValueError("max_characters must be non-negative")

    # The usual minimum is 10 characters, but it is clamped to the limit:
    # randint(10, n) raises for n < 10 because the range is empty.
    min_characters = min(10, max_characters)
    text_length = random.randint(min_characters, max_characters)

    # Pool of candidate characters.
    all_characters = string.ascii_letters + string.digits + string.punctuation + " "
    return "".join(random.choice(all_characters) for _ in range(text_length))

def generate_random_corpus(num_docs=1000, max_characters=1000):
    """Build a corpus of random documents.

    Args:
        num_docs: Number of documents to generate.
        max_characters: Maximum number of characters per document, passed on
            to `generate_random_text`.

    Returns:
        A list of ``{"id": "doc_<n>", "text": <random text>}`` dicts, with
        ``n`` running from 0 to ``num_docs - 1``.
    """
    return [
        {"id": f"doc_{index}", "text": generate_random_text(max_characters)}
        for index in range(num_docs)
    ]

def load_csv_corpus(file_path):
    """Load the campaign knowledge base from a CSV file.

    Args:
        file_path: Path to a CSV file whose header row contains at least the
            columns ``campaign_ids``, ``title`` and ``abstract``.

    Returns:
        A list with one dict per data row holding exactly the keys
        ``campaign_ids``, ``title`` and ``abstract``; other columns are
        dropped.

    Raises:
        KeyError: If one of the three required columns is missing.
    """
    wanted_columns = ("campaign_ids", "title", "abstract")
    # newline="" leaves newline handling to the csv module, so line breaks
    # inside quoted fields are read back unchanged. "utf-8-sig" reads plain
    # UTF-8 as before but also strips a leading BOM, which would otherwise
    # become part of the first header name.
    with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
        return [
            {column: row[column] for column in wanted_columns}
            for row in csv.DictReader(f)
        ]

# Load the campaign CSV file.
corpus = load_csv_corpus("campaign_JanFeb.csv")


    

In [170]:

# Create the DSPy embedder and the embedding-based retriever over the corpus.
# NOTE(review): `max_characters` is defined but never used in this cell — the
# documents are not actually truncated before being embedded.
max_characters = 600  # for truncating >99th percentile of documents
topk_docs_to_retrieve = 5  # number of documents to retrieve per search query
# Every non-string document (the CSV row dicts) is replaced by its str() form;
# this rebinds `corpus` from a list of dicts to a list of strings.
corpus = [str(doc) if not isinstance(doc, str) else doc for doc in corpus]
embedder = dspy.Embedder(openai_embedding_callable)
search = dspy.retrievers.Embeddings(embedder=embedder, corpus=corpus, k=topk_docs_to_retrieve)

In [None]:
import dspy
from dspy import Module, ChainOfThought, Retrieve

class RAG(Module):
    """Retrieval-augmented campaign recognizer.

    Retrieves campaign documents related to what the merchant wrote and feeds
    them, together with the full chat history, to a chain-of-thought
    `RecognizerV2` predictor.
    """

    def __init__(self, num_passages=3):
        """Set up the predictor.

        Args:
            num_passages: Accepted for interface compatibility but currently
                unused; how many passages are retrieved is decided by the
                module-level `search` retriever.
        """
        super().__init__()
        self.generate_answer = dspy.ChainOfThought(RecognizerV2)

    def forward(self, history):
        """Recognize the campaigns discussed in one conversation.

        Args:
            history: Iterable of `Message` objects, oldest first.

        Returns:
            A `dspy.Prediction` with ``context`` (the retrieved passages, an
            empty list when the merchant wrote nothing) and ``answer`` (the
            predicted campaigns).
        """
        # One merchant message per line. Joining with "" glued the last word
        # of one message to the first word of the next, corrupting the query.
        merchant_information = "\n".join(
            msg.content for msg in history if msg.sender == Role.Merchant
        )

        # With no merchant text there is nothing meaningful to embed, so the
        # retrieval call is skipped instead of sending an empty query.
        if merchant_information.strip():
            context = search(merchant_information).passages
        else:
            context = []

        prediction = self.generate_answer(history=history, retrieved_information=context)
        return dspy.Prediction(context=context, answer=prediction.campaigns)
        



In [180]:
# Build the Message list for one sample dialog (id 31905857): run the lazy
# aggregation for that dialog, take the single cell holding its list of
# {content, fromuserid} structs, and construct a Message from each of them.
history = [
    Message(**item)
    for item in agg_df.filter(pl.col("dialogid") == 31905857)
    .select("content_with_orientation")
    .collect(new_streaming=True)[0, 0]
]
# NOTE(review): this prints raw chat content into the notebook output —
# consider clearing the output before sharing.
print(history)

[Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='hi kakk Ardi, besok akan dtg Summit kan yaa? plannya jam brp kakk dtgnyaa'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='1736826548258'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='aku mau ngobrol ttg Ramadan juga kakk kalo emg bisa hehe'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='oh iya, tktnya kak Ardi blm save nomor aku yg dari kantor, ini nomor aku Muthia AM Lowyid Shop | Tokopedia ya kakk'), Message(sender=<Role.Merchant: 'merchant'>, content='hi ka'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='halo kak Ardi apa kabar kakk'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='kakk Ardi, udh dpt QR codenya?'), Message(sender=<Role.Merchant: 'merchant'>, content='belum ka'), Message(sender=<Role.PlatformReprentative: 'platform r

In [181]:

# Run the RAG module on the sample dialog, using ds_r1_ark_i18n as its LM.
rag = RAG()
rag.set_lm(ds_r1_ark_i18n)
rag(history)


history: [Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='hi kakk Ardi, besok akan dtg Summit kan yaa? plannya jam brp kakk dtgnyaa'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='1736826548258'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='aku mau ngobrol ttg Ramadan juga kakk kalo emg bisa hehe'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='oh iya, tktnya kak Ardi blm save nomor aku yg dari kantor, ini nomor aku Muthia AM Lowyid Shop | Tokopedia ya kakk'), Message(sender=<Role.Merchant: 'merchant'>, content='hi ka'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='halo kak Ardi apa kabar kakk'), Message(sender=<Role.PlatformReprentative: 'platform representative'>, content='kakk Ardi, udh dpt QR codenya?'), Message(sender=<Role.Merchant: 'merchant'>, content='belum ka'), Message(sender=<Role.PlatformReprentative: 'p

Prediction(
    context=["{'campaign_ids': '[7457018658356889361]', 'title': 'ID 2025 Tokopedia Promo Special January WIB-Guncang 2.2', 'abstract': 'ID 2025 Shop | Tokopedia Promo Special January WIB-Guncang 2.2'}", "{'campaign_ids': '[7457094002836817681]', 'title': 'ID 2025 Tokopedia Promo Spesial Ramadhan ', 'abstract': 'ID 2025 Tokopedia Promo Spesial Ramadhan'}", "{'campaign_ids': '[7460401858106001153]', 'title': 'ID 2025 Shop | Tokopedia Weekly Sale, Jan WIB, Imlek Sale & 2.2 [Paakage Sellers]', 'abstract': 'ID 2025 Shop | Tokopedia Weekly Sale, Jan WIB, Imlek Sale & 2.2 [Paakage Sellers]'}", "{'campaign_ids': '[7456708228971235088]', 'title': 'ID 2025 Shop|Tokopedia January WIB&Imlek Sale&D2', 'abstract': 'ID 2025 Shop|Tokopedia January WIB&Hoki Sale&D2'}", "{'campaign_ids': '[7460867191760504592]', 'title': 'ID 2025 Shop | Tokopedia Ramadan Ekstra Seru', 'abstract': 'ID 2025 Shop | Tokopedia Ramadan Ekstra Seru'}"],
    answer=[CampaignInfo(name='Shop | Tokopedia Ramadan Ekstr