In [1]:
%cd /code/

import ast
import asyncio
import json
import os
import re
import sys
import time
from pathlib import Path

import pandas as pd
from tqdm.auto import tqdm
from tqdm.asyncio import tqdm_asyncio
import openai  # or your openai-like library that supports async

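# `openai.AsyncClient` provides an awaitable `client.chat.completions.create(...)`, which is used below.
# If your OpenAI-compatible library only offers a blocking client, wrap the call with
# `asyncio.to_thread(...)` so it runs in a worker thread instead of blocking the event loop.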

async def chat_async(
    client: openai.AsyncClient,
    system_prompt: str,
    user_prompt: str,
    model: str = 'default',
    max_retries: 'int | None' = None,
    retry_delay: float = 5.0,
):
    """Send one system + user chat turn and return the model's reply text.

    Sampling is deterministic (``temperature=0``) and capped at 1024 new tokens.

    Args:
        client: An ``openai.AsyncClient`` (OpenAI API or an OpenAI-compatible server).
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.
        model: Model name passed through to the server.
        max_retries: How many times to retry after an ``openai.OpenAIError``.
            ``None`` (the default) retries forever, as before; once the budget
            is exhausted the last error is re-raised.
        retry_delay: Seconds to wait between attempts (default 5).

    Returns:
        The message content if non-empty; otherwise the message's ``refusal``
        text when the SDK provides one; otherwise ``''``. Also ``''`` when the
        server returns no choices at all.

    Raises:
        openai.OpenAIError: only when ``max_retries`` is set and exhausted.
    """
    messages = [
        {'role': 'system', 'content': system_prompt},
        {'role': 'user',   'content': user_prompt},
    ]

    # Retry loop for transient API/server errors (connection drops, rate limits, ...).
    failures = 0
    while True:
        try:
            completion = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0,
                max_tokens=1024,
            )
            break
        except openai.OpenAIError as e:
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            print(e)
            print(f'Retrying in {retry_delay:g} seconds...')
            await asyncio.sleep(retry_delay)

    if not completion.choices:
        return ''
    message = completion.choices[0].message
    if message.content:
        return message.content
    # Newer SDK versions report a refusal explanation here when content is empty.
    refusal = getattr(message, 'refusal', None)
    return refusal if refusal else ''


async def process_single_row(
    row,
    p_vlm_outdir,
    p_llm_outdir,
    system_prompt,
    llm_model,
    client,
    semaphore=asyncio.Semaphore(4),
    debug=False,
):
    """Score every VLM caption of one video with the LLM and save the result.

    Reads ``<p_vlm_outdir>/<row['label']>/<row['video']>`` (suffix replaced by
    ``.json``). Each entry of its ``'response_records'`` list has its
    ``'response'`` caption sent to the LLM. Two keys are then added to the entry:

    * ``'score_raw'``: the raw LLM reply;
    * ``'score'``: element ``[0]`` of the reply parsed as a Python literal
      (e.g. ``"[0.3]"`` -> ``0.3``), or ``None`` if parsing fails.

    The augmented record is written to
    ``<p_llm_outdir>/<record['label']>/<record['video']>.json``. Rows whose
    input JSON is missing, or whose output JSON already exists, are skipped.
    This lets an interrupted run be resumed.

    Args:
        row: Mapping-like row with ``'label'`` and ``'video'`` keys.
        p_vlm_outdir: ``Path`` of the directory holding the raw VLM caption JSONs.
        p_llm_outdir: ``Path`` of the directory where scored JSONs are written.
        system_prompt: System prompt sent with every caption.
        llm_model: Model name. Names containing ``'DeepSeek-R1'`` have everything
            up to the last ``</think>`` stripped from the reply before parsing.
        client: Async OpenAI-compatible client, forwarded to ``chat_async``.
        semaphore: Bounds concurrent LLM requests. NOTE: the default is one
            semaphore created when this function is defined and shared by every
            call that omits the argument. Pass your own to control concurrency.
        debug: If true, print each scored record via ``tqdm.write``.
    """
    p_json = (p_vlm_outdir / row['label'] / row['video']).with_suffix('.json')
    if not p_json.exists():
        print(f"Skipping generating llm captions of {p_json} (does not exist).")
        return

    with p_json.open() as f:
        video_record = json.load(f)

    p_json_new = (p_llm_outdir / video_record['label'] / video_record['video']).with_suffix('.json')
    if p_json_new.exists():
        print(f"Skipping {p_json_new}")
        return
    p_json_new.parent.mkdir(exist_ok=True, parents=True)
    print(f'\nProcessing {p_json}\n\t-> {p_json_new}', flush=True)

    for response_record in video_record['response_records']:
        user_prompt = f"Scene Description: {response_record['response']}"
        async with semaphore:
            llm_response = await chat_async(
                client,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                model=llm_model,
            )
        response_record['score_raw'] = llm_response

        # DeepSeek-R1 emits its reasoning before '</think>'; keep only the final answer.
        if 'DeepSeek-R1' in llm_model:
            llm_response = re.sub(r'^(?s:.)*</think>', '', llm_response).strip()

        # The reply is untrusted model output: only ever parse it as a literal, never eval() it.
        try:
            score = ast.literal_eval(llm_response.strip())[0]
        except Exception as e:
            print(e, file=sys.stderr)
            print(response_record, llm_response, file=sys.stderr)
            score = None
        response_record['score'] = score

        if debug:
            tqdm.write(json.dumps(response_record, indent=2))

    # Write to a temporary file and then rename it over the target. An interrupted
    # run therefore never leaves a truncated output that the exists() check above
    # would skip on the next run.
    p_tmp = p_json_new.with_name(p_json_new.name + '.tmp')
    with p_tmp.open('w') as f:
        json.dump(video_record, f, indent=2)
    os.replace(p_tmp, p_json_new)


def _build_system_prompt(language: str) -> str:
    """Return the scoring system prompt: English for ``'en'``, Korean for any other value."""
    if language == 'en':
        context_prompt = (
            "The following is a scene description. If you were a law enforcement agency, "
            "how would you rate this scene on a scale from 0 to 1, with 0 representing a "
            "standard scene and 1 denoting a scene with suspicious activities?"
        )
        format_prompt = (
            "Please provide the response in the form of a Python list and respond with only "
            "one number in the provided list below [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, "
            "0.9, 1.0] without any textual explanation. It should begin with '[' and end with  ']'."
        )
    else:
        # Korean translation of the English prompt above.
        context_prompt = (
            "다음은 비디오에 대한 설명이야. 만약 너가 경찰이라면 이 장면의 이상한 정도를 "
            "0부터 1까지의 점수 중에 어떻게 평가할 거야? 0은 일상적인 장면이고 1은 의심스러운 "
            "활동이 있는 장면이야."
        )
        format_prompt = (
            "아래에 제공된 목록에서 하나의 숫자로만 응답해줘. [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, "
            "0.7, 0.8, 0.9, 1.0] 이 숫자는 어떤 설명도 없이 제공되어야 해. "
            "'['로 시작해서 ']'로 끝나야 해."
        )
    return f'{context_prompt} {format_prompt}'


async def generate_llm_async(
    host: str = 'localhost',
    port: int = 30002,
    rank: int = 0,
    world_size: int = 1,
    vlm_model: str = 'lmms-lab/llava-onevision-qwen2-7b-ov',
    llm_model: str = 'meta-llama/Llama-3.2-3B-Instruct',
    prompt_vlm: str = "Describe the video in a few sentences.",
    prompt_llm_system_language: str = "en",
    duration_sec=1,
    whole_context_as_prompt: bool = False,
    debug: bool = False,
    max_concurrency: int = 6,
):
    """Score the VLM captions of the UCF-Crime test videos with an LLM.

    Steps:
      1. Read ``data/annotations/Temporal_Anomaly_Annotation_for_Testing_Videos.txt``
         (whitespace-separated columns ``video label s1 e1 s2 e2``).
      2. Keep rows ``rank::world_size``.
      3. Run :func:`process_single_row` on every kept row concurrently.

    Captions are read from
    ``output/ucf-crime-captions/prompt=<prompt_vlm, spaces -> '_'>_duration_<duration_sec>s/raw``.
    Scored JSONs go to the sibling directory
    ``<llm_model, '/' -> '-'>/<prompt_llm_system_language>``.

    Args:
        host, port: Address of the local OpenAI-compatible server. Used unless
            ``llm_model`` is in ``OPENAI_MODELS`` (then the OpenAI API is used
            with ``$OPENAI_API_KEY``).
        rank, world_size: Shard selection for multi-process runs.
        vlm_model: Currently unused here; kept for interface compatibility.
        llm_model: Model that rates each caption.
        prompt_vlm: VLM prompt; only used to locate the caption directory.
        prompt_llm_system_language: ``'en'`` selects the English system prompt;
            any other value selects the Korean one.
        duration_sec: Clip duration; only used to locate the caption directory.
        whole_context_as_prompt: Currently unused here; kept for interface compatibility.
        debug: Forwarded to :func:`process_single_row`.
        max_concurrency: Maximum LLM requests in flight at once, shared by all
            rows. Defaults to 6, the previously hard-coded value.
    """
    # Model names served by the OpenAI API; any other name goes to the local server.
    OPENAI_MODELS = [
        'gpt-4o', 'chatgpt-4o-latest', 'gpt-4o-mini',
        'o1', 'o1-mini', 'o3-mini', 'gpt-4-turbo',
    ]

    p_annroot = Path('./data/annotations')
    p_ann_test = p_annroot / 'Temporal_Anomaly_Annotation_for_Testing_Videos.txt'
    p_vlm_outdir = Path('output/ucf-crime-captions') / f"prompt={prompt_vlm.replace(' ', '_')}_duration_{duration_sec}s/raw"
    df_ann_test = pd.read_csv(
        p_ann_test, sep=r'\s+', header=None, names=['video', 'label', 's1', 'e1', 's2', 'e2']
    )
    p_llm_outdir = p_vlm_outdir.parent / llm_model.replace('/', '-') / prompt_llm_system_language
    p_llm_outdir.mkdir(exist_ok=True, parents=True)

    # Each process handles every world_size-th video, starting at its rank.
    df_ann_test = df_ann_test.iloc[rank::world_size]

    if llm_model not in OPENAI_MODELS:
        server_address = f'http://{host}:{port}'
        print(f'Using local LLM server at {server_address}...')
        client = openai.AsyncClient(api_key='EMPTY', base_url=f'{server_address}/v1')
    else:
        print('Using OpenAI API', flush=True)
        client = openai.AsyncClient(api_key=os.environ.get('OPENAI_API_KEY'))

    system_prompt = _build_system_prompt(prompt_llm_system_language)

    try:
        # One semaphore shared by all rows caps the number of in-flight LLM requests.
        semaphore = asyncio.Semaphore(max_concurrency)
        tasks = [
            process_single_row(
                row=row,
                p_vlm_outdir=p_vlm_outdir,
                p_llm_outdir=p_llm_outdir,
                system_prompt=system_prompt,
                llm_model=llm_model,
                client=client,
                semaphore=semaphore,
                debug=debug,
            )
            for _, row in df_ann_test.iterrows()
        ]
        # Runs all rows concurrently on the event loop, with a tqdm progress bar.
        await tqdm_asyncio.gather(*tasks)
    finally:
        # Release the client's HTTP connection pool.
        await client.close()


def main(**overrides):
    """Run :func:`generate_llm_async` to completion from a plain (non-notebook) script.

    The defaults are ``host='llm_server'``, ``port=30002``, ``rank=0`` and
    ``world_size=1``. Any keyword accepted by ``generate_llm_async`` may be
    passed through ``overrides``, e.g.
    ``main(llm_model='meta-llama/Llama-3.1-8B-Instruct')``.
    Calling ``main()`` with no arguments behaves exactly as before.

    ``asyncio.run`` cannot be called while an event loop is already running.
    Inside Jupyter, ``await generate_llm_async(...)`` directly instead.
    """
    kwargs = dict(host='llm_server', port=30002, rank=0, world_size=1)
    kwargs.update(overrides)
    asyncio.run(generate_llm_async(**kwargs))

await generate_llm_async(
    host='llm_server',
    port=30002,
    rank=0,
    world_size=1,
    llm_model='meta-llama/Llama-3.1-8B-Instruct',
    # etc. - pass other arguments as needed
)


/code
Using local LLM server at http://llm_server:30002...


  0%|          | 0/290 [00:00<?, ?it/s]


Processing output/ucf-crime-captions/prompt=Describe_the_video_in_a_few_sentences._duration_1s/raw/Explosion/Explosion011_x264.json
	-> output/ucf-crime-captions/prompt=Describe_the_video_in_a_few_sentences._duration_1s/meta-llama-Llama-3.1-8B-Instruct/en/Explosion/Explosion011_x264.json

Processing output/ucf-crime-captions/prompt=Describe_the_video_in_a_few_sentences._duration_1s/raw/Normal/Normal_Videos_634_x264.json
	-> output/ucf-crime-captions/prompt=Describe_the_video_in_a_few_sentences._duration_1s/meta-llama-Llama-3.1-8B-Instruct/en/Normal/Normal_Videos_634_x264.json

Processing output/ucf-crime-captions/prompt=Describe_the_video_in_a_few_sentences._duration_1s/raw/Normal/Normal_Videos_929_x264.json
	-> output/ucf-crime-captions/prompt=Describe_the_video_in_a_few_sentences._duration_1s/meta-llama-Llama-3.1-8B-Instruct/en/Normal/Normal_Videos_929_x264.json

Processing output/ucf-crime-captions/prompt=Describe_the_video_in_a_few_sentences._duration_1s/raw/Shoplifting/Shopliftin

  0%|          | 1/290 [00:00<01:46,  2.72it/s]

100%|██████████| 290/290 [20:55<00:00,  4.33s/it] 
