In [1]:
import asyncio
import os
import warnings

import nest_asyncio
import openai
import pandas as pd

from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

# Patch asyncio so coroutines can be awaited from inside the notebook's
# already-running event loop.
nest_asyncio.apply()

# Read the key from the environment rather than hardcoding a secret in the
# notebook. When the variable is unset this is None, and the OpenAI client
# then reports the missing credential itself.
api_key = os.environ.get("OPENAI_API_KEY")

In [2]:
def split_data(
    df: pd.DataFrame,
    label_col: str = 'label',
    test_size: float = 0.3,
    random_state: int = None
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split ``df`` into train/test frames so that train contains every label.

    A stratified split on ``label_col`` is attempted first. If scikit-learn
    rejects it with ``ValueError``, a warning is emitted and a plain random
    split is used instead. In both cases, any label that ended up only in the
    test frame has exactly one of its rows moved into the train frame.

    Args:
        df: Frame to split; must contain ``label_col``.
        label_col: Name of the column holding the class labels.
        test_size: Passed through to ``train_test_split``.
        random_state: Seed passed to ``train_test_split`` and to the sampling
            of rows moved from test to train. ``None`` means unseeded.

    Returns:
        ``(train_df, test_df)``.

    Raises:
        ValueError: If the fallback random split is rejected as well.
    """

    def ensure_all_labels(original_df, train_df, test_df, label_col):
        """Move one test row into train for each label that train lacks."""
        missing_labels = set(original_df[label_col]) - set(train_df[label_col])

        for label in missing_labels:
            candidates = test_df[test_df[label_col] == label]
            if candidates.empty:
                # Labels that never compare equal to themselves (e.g. NaN)
                # cannot be located, so there is nothing to move.
                continue

            missing_sample = candidates.sample(n=1, random_state=random_state)
            # Remove only the moved row from test, not every row of the label.
            # NOTE: dropping by index label assumes the index is unique.
            test_df = test_df.drop(index=missing_sample.index)
            train_df = pd.concat([train_df, missing_sample])
        return train_df, test_df

    try:
        train_df, test_df = train_test_split(
            df,
            test_size=test_size,
            stratify=df[label_col],
            random_state=random_state
        )
    except ValueError as e:
        warnings.warn(
            f"{e} --- Falling back to random split. Consider reviewing extremely small classes."
        )

        # Fallback to random split if stratified split fails
        train_df, test_df = train_test_split(
            df,
            test_size=test_size,
            random_state=random_state
        )

    return ensure_all_labels(df, train_df, test_df, label_col)

In [3]:
class BatchIterator:
    """Single-pass iterator over aligned ``(prompts, uids)`` batches.

    Each step yields a tuple of two slices of at most ``batch_size`` items,
    taken from the same positions of ``prompt_list`` and ``uid_list``. The
    iterator keeps its position, so it can be consumed only once.

    Raises:
        ValueError: If ``batch_size`` is not positive, or if the two
            sequences differ in length.
    """

    def __init__(self, prompt_list, uid_list, batch_size):
        if batch_size < 1:
            # A non-positive step never advances the cursor and would make
            # iteration loop forever.
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
        if len(prompt_list) != len(uid_list):
            # Unequal lengths would pair prompts with the wrong ids.
            raise ValueError(
                f"prompt_list and uid_list must have the same length "
                f"({len(prompt_list)} != {len(uid_list)})"
            )

        self.prompt_list = prompt_list
        self.uid_list = uid_list
        self.batch_size = batch_size
        self.current_index = 0  # start position of the next batch

    def __iter__(self):
        return self

    def __next__(self):
        if self.current_index >= len(self.prompt_list):
            raise StopIteration

        start_index = self.current_index
        end_index = start_index + self.batch_size
        self.current_index = end_index
        # Slicing clamps at the end, so the final batch may be shorter.
        return (
            self.prompt_list[start_index:end_index],
            self.uid_list[start_index:end_index]
        )

def construct_classification_prompt(text: str) -> str:
    """Build the classification prompt for a single message.

    The prompt lists the six categories as ``"<index> - <name>"``, gives one
    example message per category, embeds ``text`` in double quotes, and asks
    for the category number only.

    Args:
        text: The message to classify.

    Returns:
        The full prompt string.
    """
    class_names = (
        "Pump announcement",
        "Countdown message",
        "Coin release announcement",
        "Pump result",
        "Delay or cancellation announcement",
        "Other/Garbage Class",
    )

    # Number each category by its position, e.g. "0 - Pump announcement".
    numbered = []
    for idx, name in enumerate(class_names):
        numbered.append(f"{idx} - {name}")
    legend = ", ".join(numbered)

    prompt = f"""
    Classify the following message around pump and dump schemes in cryptocurrencies into one of these categories: {legend}.
    
    Here follow some examples:
    0: 💎 NEW KUCOIN_CEX PUMP ANNOUNCEMENT 💎 ⏳ Date : Sunday , 11th June ⏰ Time : 4 pm GMT 🏰 Exchange : Kucoin_CEX 🐳 + 33 Whale Channels 🎯 Target : 500 % PROFIT ! ! !
    1: 5 minutes left until the signal on Mexc_CEX be ready . The next message will be the coin name to buy !
    2: BQT, MMDA / USDT, COIN IS : $ STRIKE
    3: 🔷 ️Pump Result 🔥 ✅ + 567 % Pump on XT_CEX 😊 ⚡ ️ 6x Profits in Minutes 🤑 100 $ Turned into more than 600 $ 💋 🔴 Coin Name : Mile 💯 🔸 ️Start : 0.02 🔸 ️Pump at : 0.19 🔥 💫 ❤ ️ Biggest Pump 😎 🤩 We are the King of Market 💋 ❤ 
    4: Todays Pump needs to be postponed . Unexplained volume was added to our pick . Therefore to protect everyone we will reschedule
    5: Buy Avax Entry : 40.154 - 40.023 SL : 38.798
    
    Message: "{text}"
    
    Respond with the category number (from 0 to 5) only.
    """
    return prompt

async def dispatch_openai_requests(
    messages_list: list[list[dict]],
    model: str,
    temperature: float,
    max_tokens: int,
    top_p: float
) -> list:
    """Send one chat-completion request per conversation, concurrently.

    Args:
        messages_list: One chat ``messages`` list per request.
        model: Model name forwarded to the API.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Completion token limit forwarded to the API.
        top_p: Nucleus-sampling parameter forwarded to the API.

    Returns:
        The raw completion objects, in the same order as ``messages_list``.

    Raises:
        Any exception raised by one of the requests; ``asyncio.gather``
        propagates the first one.
    """
    if not messages_list:
        # Nothing to send; skip creating a client altogether.
        return []

    # Use the client as an async context manager so it is closed once the
    # batch finishes, instead of leaving a new client open per call.
    async with openai.AsyncOpenAI(api_key=api_key) as client:
        pending_requests = [
            client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=top_p,
            )
            for messages in messages_list
        ]
        return await asyncio.gather(*pending_requests)

async def call_api_async(
    messages_list: list[list[dict]],
    model: str,
    temperature: float,
    max_tokens: int,
    top_p: float
) -> list[str]:
    """Dispatch a batch of chat requests and return each reply's text.

    Prints the batch size before the call and the number of replies after it.

    Args:
        messages_list: One chat ``messages`` list per request.
        model: Model name forwarded to ``dispatch_openai_requests``.
        temperature: Sampling temperature forwarded unchanged.
        max_tokens: Completion token limit forwarded unchanged.
        top_p: Nucleus-sampling parameter forwarded unchanged.

    Returns:
        The ``message.content`` of the first choice of every response, in
        request order.
    """
    print(f"Calling APIs, {len(messages_list)} in total, temp={temperature}.")

    request_options = {
        "messages_list": messages_list,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "top_p": top_p,
    }
    completions = await dispatch_openai_requests(**request_options)

    # Keep only the text of the first choice of each completion.
    contents = []
    for completion in completions:
        contents.append(completion.choices[0].message.content)

    print(f"API returns {len(contents)} in total.")
    return contents


async def main():
    """Classify the held-out messages with GPT-4o and save the predictions.

    Steps:
      1. Load the training CSV and hold out 20% of it (fixed seed).
      2. Build one prompt per held-out message and send them in batches of
         50, retrying a batch with exponential backoff on rate-limit errors.
      3. Write the held-out rows plus a ``prediction`` column to CSV.

    A batch that still fails after all retries contributes ``None``
    placeholders, so every prediction stays on the row it belongs to.
    """
    input_path = r'data/internal/training_data/training_data_no_duplicates_per_channel.csv'
    # Same location the evaluation cell reads its predictions from.
    output_path = "data/internal/few_shot_prediction/gpt4o_zero_shot_predictions_with_examples.csv"

    df = pd.read_csv(input_path)

    _, test_df = train_test_split(df, test_size=0.2, random_state=42)

    # Build a new frame instead of assigning into the split result, which
    # avoids pandas' SettingWithCopyWarning.
    test_df = test_df.assign(message=test_df['message'].astype(str)).reset_index(drop=True)
    messages = test_df['message'].to_list()

    prompt_list = [[{'role': 'user', 'content': construct_classification_prompt(text)}] for text in messages]
    batch_iterator = BatchIterator(prompt_list, messages, batch_size=50)

    all_raw_responses = []
    batch_delay = 5        # pause after each successful batch, in seconds
    retry_limit = 20       # attempts per batch
    delay_multiplier = 2   # backoff growth factor
    max_delay = 300        # cap, so the backoff cannot grow to hours or days

    for messages_batch, _ in batch_iterator:
        delay = batch_delay
        for _attempt in range(retry_limit):
            try:
                return_msg = await call_api_async(
                    messages_batch,
                    model='gpt-4o',
                    temperature=0,
                    max_tokens=1,
                    top_p=1.0
                )
            except openai.RateLimitError:
                print(f"Rate limit hit. Retrying after {delay} seconds...")
                await asyncio.sleep(delay)
                delay = min(delay * delay_multiplier, max_delay)  # Exponential backoff for retries
            else:
                # A reply's content may be None; treat that as an empty answer.
                all_raw_responses.extend((msg or "").strip() for msg in return_msg)
                await asyncio.sleep(batch_delay)
                break
        else:
            # Every attempt was rate-limited. Pad with placeholders so later
            # predictions are not shifted onto the wrong rows.
            print(f"Giving up on a batch of {len(messages_batch)} after {retry_limit} attempts.")
            all_raw_responses.extend([None] * len(messages_batch))

    # Saving to csv file
    labels = pd.DataFrame(all_raw_responses, columns=['prediction'])
    predicted = pd.concat([test_df, labels], axis=1)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    predicted.to_csv(output_path, index=False)
    
# Run the whole pipeline. Top-level `await` is an IPython/Jupyter feature;
# a plain Python script would need `asyncio.run(main())` instead.
await main()

Calling APIs, 50 in total, temp=0.
API returns 50 in total.
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 5 seconds...
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 10 seconds...
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 20 seconds...
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 40 seconds...
Calling APIs, 50 in total, temp=0.
API returns 50 in total.
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 5 seconds...
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 10 seconds...
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 20 seconds...
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 40 seconds...
Calling APIs, 50 in total, temp=0.
API returns 50 in total.
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 5 seconds...
Calling APIs, 50 in total, temp=0.
Rate limit hit. Retrying after 10 seconds...
Calling APIs, 50 in tot

In [5]:
# Load the saved predictions and score them against the gold labels.
test_df = pd.read_csv(r"data/internal/few_shot_prediction/gpt4o_zero_shot_predictions_with_examples.csv")
print('\nClassification Report:')
categories = ['Pump Announcement', 'Countdown', 'Coin Release', 'Pump Result', 'Delay/Cancellation', 'Other/Garbage']
# Pass the label ids explicitly so each name in `categories` stays attached to
# its class id even when a class is absent from both the labels and the
# predictions.
print(classification_report(
    test_df['label'].astype(int),
    test_df['prediction'].astype(int),
    labels=list(range(len(categories))),
    target_names=categories,
))


Classification Report:
                    precision    recall  f1-score   support

 Pump Announcement       0.70      0.98      0.82       169
         Countdown       0.95      0.86      0.90       345
      Coin Release       0.96      0.86      0.91       151
       Pump Result       0.70      0.95      0.80       131
Delay/Cancellation       0.79      0.96      0.87        82
     Other/Garbage       0.92      0.77      0.84       541

          accuracy                           0.86      1419
         macro avg       0.84      0.90      0.86      1419
      weighted avg       0.88      0.86      0.86      1419

