In [5]:
!pip install polars pydantic pydantic-ai-slim[openai] tqdm
!wget "https://minio.misile.xyz/noa/datasets/comments.avro.zst"
!zstd -d --rm comments.avro.zst

--2025-06-18 06:46:19--  https://minio.misile.xyz/noa/datasets/comments.avro.zst
Resolving minio.misile.xyz (minio.misile.xyz)... 211.219.106.17
Connecting to minio.misile.xyz (minio.misile.xyz)|211.219.106.17|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15762868 (15M) [application/octet-stream]
Saving to: ‘comments.avro.zst.1’


2025-06-18 06:46:26 (2.70 MB/s) - ‘comments.avro.zst.1’ saved [15762868/15762868]

comments.avro.zst   : 54432695 bytes                                           


In [6]:
from pathlib import Path
from json import dumps
from types import CoroutineType
import asyncio
from itertools import islice

from pydantic_ai import Agent
from polars import read_avro, col, DataFrame, concat
from tqdm import tqdm

from utils import Data, read_cached_avro, ProcessedData

# Full comment dataset, read from the decompressed Avro file.
comments = read_avro("comments.avro")
# Presumably the results saved by an earlier run (helper lives in `utils`, not
# visible here). main() loads the same file into its own local `df`, so this
# module-level copy is not the one main() updates.
df = read_cached_avro("processed.avro")

# System prompt handed to the agent, read from the file named "prompt".
prompt = Path("prompt").read_text()
# Single-pass iterator over the rows of `comments` as dicts (named=True);
# main() drains it in batches, so it is exhausted after one full run.
comments_iter = comments.iter_rows(named=True)

def append(df: DataFrame, data: ProcessedData) -> DataFrame:
  """Return a new frame made of `df` followed by the frame built from `data`.

  `data` is dumped to a dict with `model_dump()`, wrapped in its own
  DataFrame and stacked vertically beneath `df`; the result is rechunked.
  The input frame is not modified.
  """
  addition = DataFrame(data.model_dump())
  frames = [df, addition]
  return concat(frames, how="vertical", rechunk=True)

def get_batch_size() -> int:
  """Return the number of comments handled per batch (currently fixed at 2)."""
  batch_size = 2
  return batch_size

async def process_comment(agent: Agent, data: Data, comments_df: DataFrame) -> ProcessedData | None:
  """Ask the agent to classify one comment and wrap its verdict.

  Args:
    agent: Agent whose `run` coroutine receives the rendered prompt.
    data: The comment to classify.
    comments_df: Frame searched by `comment_id` to find the parent comment
      when `data.parent_id` is set.

  Returns:
    A `ProcessedData` built from every field of `data` plus `is_bot_comment`
    (True when the agent answers "A", False for "B"), or None when the
    answer is anything else.
  """
  # These fields are left out of the JSON that is sent to the model.
  hidden_fields = {"comment_id", "parent_id", "author_image_url", "video_id"}

  parent_string = ""
  if parent_id := data.parent_id:
    # The parent may be absent from the dataset. Indexing [0] on an empty
    # result raised IndexError, which asyncio.gather in process_batch then
    # propagated and aborted the run; treat it as "no parent" instead.
    parent_rows = comments_df.filter(col('comment_id') == parent_id).to_dicts()
    if parent_rows:
      parent = Data.model_validate(parent_rows[0])
      parent_string = dumps(parent.model_dump(exclude=hidden_fields), ensure_ascii=False)
  current_string = dumps(data.model_dump(exclude=hidden_fields), ensure_ascii=False)
  response = await agent.run(f"""first profile image is the current comment, second (if exist) is the parent comment.
  current comment: {current_string}
  parent comment: {parent_string}""")

  # Model answers often carry a trailing newline or padding; strip it so an
  # otherwise valid "A"/"B" is not discarded.
  verdict = response.output.strip()
  if verdict in ("A", "B"):
    return ProcessedData(
      is_bot_comment=verdict == "A",
      **data.model_dump() # pyright: ignore[reportAny]
    )
  return None

async def process_batch(batch: list[dict[str, str]], agent: Agent, comments_df: DataFrame) -> list[ProcessedData]:
  """Classify every row of `batch` and return the results that produced a verdict.

  Each row is validated into `Data` and passed to `process_comment`; the
  resulting coroutines are awaited together through `asyncio.gather`, and
  `None` outcomes are dropped from the returned list.
  """
  pending: list[CoroutineType[None, None, ProcessedData | None]] = [
    process_comment(agent, Data.model_validate(row), comments_df)
    for row in batch
  ]
  outcomes = await asyncio.gather(*pending)
  return [outcome for outcome in outcomes if outcome is not None]

async def main():
  agent = Agent(
    'ollama:phi4-reasoning:plus',
    system_prompt=prompt
  )

  batch_size = get_batch_size()
  df = read_cached_avro("processed.avro")

  with tqdm(total=len(comments)) as pbar:
    while True:
      batch = list(islice(comments_iter, batch_size))
      if not batch:
        break

      processed_batch = await process_batch(batch, agent, comments)
      for processed in processed_batch:
        df = append(df, processed)

      df.write_avro("processed.avro")
      _ = pbar.update(len(batch))

  await main()


ModuleNotFoundError: No module named 'utils'