In [1]:
import pandas as pd
from pydantic_ai import Agent
import dotenv
from tqdm.auto import tqdm
from whatstk import WhatsAppChat
import uuid


# dotenv.load_dotenv()

# file_path = '../data/WhatsAppChat/_chat.txt'

# wa_chat = WhatsAppChat.from_source(filepath=file_path)

# file_path = '../data/WhatsAppChat/whatsmeow_contacts_202502090741.csv'
# contacts_df = pd.read_csv(file_path)

# def match_and_rename_users(wa_chat: WhatsAppChat, contacts_df: DataFrame) -> WhatsAppChat:
#     dict_of_users = defaultdict(list)

#     contacts_df.fillna("", inplace=True)

#     for index, row in contacts_df.iterrows():
#         phone_number = row['their_jid'].split('@')[0]
#         # Using standard hyphen and handling variable length numbers
#         long_number = f'+{phone_number[0:3]} {phone_number[3:5]}-{phone_number[5:8]}-{phone_number[8:]}'
#         dict_of_users[phone_number].extend([long_number])

#         if row['full_name']:
#             dict_of_users[phone_number].extend([
#                 row['full_name'],
#                 f"~ {row['full_name']}"
#             ])

#         elif row['push_name']:
#             dict_of_users[phone_number].extend([
#                 row['push_name'],
#                 f"~ {row['push_name']}"
#             ])

#     dict_of_users = {k: list(set(v)) for k, v in dict_of_users.items()}

#     swapped_names = wa_chat.rename_users(mapping=dict_of_users)
#     return swapped_names

# # Rename users to phone numbers for later tagging use
# renamed_wa_chat = match_and_rename_users(wa_chat, contacts_df)

# chat_df = renamed_wa_chat.df

# # Filter out uninteresting messages (system notices, omitted media, deletions)
# patterns = '|'.join([
# "This message was deleted",
# "you deleted this message.",
# "you deleted this message as admin",
# "Contact card omitted",
# "GIF omitted",
# "image omitted",
# "video omitted",
#  r'\d{3}[-‐]?\d{3,4}\s+left',  # matches phone number patterns followed by "left"
# "requested to join",
# "Your security code with ",
# 'security code',
# 'pinned a message',
# 'changed their phone number to a new number',
# 'joined using this group\'s invite link',
# 'Waiting for this message'
# ])

# chat_df = chat_df[~chat_df['message'].str.contains(patterns, case=False, na=False, regex=True)]


# chat_df['group'] = "GenAI Israel"
# chat_df['id'] = [f"imported_{uuid.uuid4()}" for _ in range(len(chat_df))]

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import pandas as pd

# CSV file holding the chat messages; later cells read its "group", "date",
# "username" and "message" columns.
chat_df_file_path = "../data/WhatsAppChat/chat_gen_ai_1_2025_02_17.csv"
# Parse the CSV into the DataFrame the rest of the notebook works on.
chat_df = pd.read_csv(filepath_or_buffer=chat_df_file_path)

Group the chat messages into conversations, then extract topics from each conversation.

In [3]:
import asyncio
from models import Message
from typing import Dict
from daily_ingest.daily_ingest import get_conversation_topics, load_topics
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel import select
from config import Settings
from models.group import Group
from voyageai.client_async import AsyncClient


settings = Settings()  # pyright: ignore [reportCallIssue]

engine = create_async_engine(
    settings.db_uri,
    pool_size=20,
    max_overflow=40,
    pool_timeout=30,
    pool_pre_ping=True,
    pool_recycle=600,
    future=True,
)
db_session = AsyncSession(engine)

embedding_client = AsyncClient(
    api_key=settings.voyage_api_key, max_retries=settings.voyage_max_retries
)

group_from_file = (
    await db_session.exec(select(Group).where(Group.group_name == chat_df["group"][1]))
).all()
group_jid = group_from_file[0].group_jid


def _identify_conversations(df, time_threshold_minutes):
    """
    Identifies separate conversations in WhatsApp chat data based on time gaps between messages.

    Parameters:
    df (pandas.DataFrame): DataFrame containing WhatsApp chat data with 'date' column
    time_threshold_minutes (int): Time gap (in minutes) to consider as a new conversation

    Returns:
    pandas.DataFrame: Original DataFrame with an additional 'conversation_id' column
    """
    # Make sure we're working with a copy
    df = df.copy()

    # Convert date column to datetime if it's not already
    df["date"] = pd.to_datetime(df["date"])

    # Sort by date
    df = df.sort_values("date")

    # Initialize conversation ID
    conversation_id = 0
    conversation_ids = [conversation_id]

    # Iterate through messages (except the first one)
    for i in range(1, len(df)):
        current_time = df.iloc[i]["date"]
        previous_time = df.iloc[i - 1]["date"]

        # Calculate time difference in minutes
        time_diff = (current_time - previous_time).total_seconds() / 60

        # If time difference is greater than threshold, start new conversation
        if time_diff > time_threshold_minutes:
            conversation_id += 1

        conversation_ids.append(conversation_id)

    # Add conversation IDs to DataFrame
    df["conversation_id"] = conversation_ids
    return df


def _create_user_mapping(wa_df: pd.DataFrame) -> Dict[str, str]:
    """
    Creates a mapping of usernames to shortened names (@user_[id]),
    where more frequent speakers get lower IDs.

    Parameters:
    df (pandas.DataFrame): DataFrame containing WhatsApp chat data with 'username' column

    Returns:
    Dict[str, str]: Mapping of original usernames to shortened names
    """
    # Count messages per user and sort by frequency (descending)
    user_counts = wa_df["username"].value_counts()

    # Create mapping with lower IDs for more frequent speakers
    user_mapping = {
        username: f"@user_{i+1}" for i, (username, _) in enumerate(user_counts.items())
    }

    return user_mapping


async def _process_conversation(conv_id: int) -> None:
    """
    Extracts the topics of a single conversation and hands them to ``load_topics``.

    Selects the rows of the module-level ``df_with_conversation_ids`` whose
    'conversation_id' equals ``conv_id``, wraps each row in a ``Message``,
    passes the messages to ``get_conversation_topics`` and forwards the result
    to ``load_topics`` together with the earliest message date.

    A dedicated ``AsyncSession`` is opened per call: this coroutine is run
    concurrently through ``asyncio.gather``, and a single ``AsyncSession`` must
    not be shared between concurrently running tasks.

    Parameters:
    conv_id (int): Value of 'conversation_id' identifying the conversation to process

    Returns:
    None
    """
    conv_data = df_with_conversation_ids[
        df_with_conversation_ids["conversation_id"] == conv_id
    ]
    messages = [
        Message(
            # NOTE(review): the id is built from the timestamp alone, so two
            # messages with the same 'date' get the same id — confirm acceptable.
            message_id=f"na-{row['date']}",
            timestamp=row["date"],
            chat_jid=group_jid,
            text=row["message"],
            sender_jid=row["username"],
            group_jid=group_jid,
        )
        for _, row in conv_data.iterrows()
    ]
    topics = await get_conversation_topics(messages)

    # One session per task; it is closed when the block exits.
    # NOTE(review): no commit is issued here (the previous shared-session code
    # did not commit either) — presumably load_topics commits; confirm.
    async with AsyncSession(engine) as session:
        await load_topics(
            session,
            group_jid,
            embedding_client,
            topics,
            conv_data["date"].min().to_pydatetime(),
        )


async def _bounded_process_conversation(conversation_id: int) -> Dict:
    """
    Runs ``_process_conversation`` while holding a slot of the module-level ``semaphore``.

    Parameters:
    conversation_id (int): Conversation to process; forwarded unchanged

    Returns:
    Whatever ``_process_conversation`` returns for this conversation
    """
    # Wait for a free slot, and always give it back, even if processing fails
    await semaphore.acquire()
    try:
        return await _process_conversation(conversation_id)
    finally:
        semaphore.release()


user_mapping = _create_user_mapping(chat_df)

# Add mapped usernames as a new column
chat_df["mapped_username"] = chat_df["username"].map(user_mapping)


# Identify conversations
df_with_conversation_ids = _identify_conversations(
    chat_df, time_threshold_minutes=60 * 3
)

conversation_ids = df_with_conversation_ids["conversation_id"].unique()

# If you want to limit the number of conversations to process
conversation_ids = conversation_ids[:10]

# Create a semaphore to limit concurrency to max_concurrency tasks
max_concurrency = 3
semaphore = asyncio.Semaphore(max_concurrency)

# Process all conversations concurrently, limited by the semaphore
await asyncio.gather(
    *[
        _bounded_process_conversation(conversation_id)
        for conversation_id in conversation_ids
    ]
)

[None, None, None, None, None, None, None, None, None, None]