# Whatstics

### This script summarizes a WhatsApp chat history.

<br>

🚨🚨🚨🚨🚨 CAUTION 🚨🚨🚨🚨🚨

This script uses the OpenAI API and may cost you A LOT OF MONEY. Please be aware of that.

<br>

# 1. First, configure the date limits and the paths

For example:
- START_DATE = "2024-08-23"
- END_DATE = "2024-08-23"
- FILE_PATH = "../history/chat.txt" (the exported chat history to read)
- SUMMARY_PATH = "../summary" (the directory where the summary file is written)

The start and end dates are both inclusive and may be equal (to summarize a single day).

🚨🚨🚨🚨🚨 CAUTION 🚨🚨🚨🚨🚨

The bigger the date range, the higher the cost.

In [None]:
# Date range to summarize, written as YYYY-MM-DD. Both ends are inclusive.
START_DATE = "2024-08-29"
END_DATE = "2024-08-29"

# Exported chat history to parse, and the directory where the summary file is saved.
FILE_PATH = "../history/conversa-ate-29.08.2024.txt"
SUMMARY_PATH = "../summary"

# LLM backend used to generate the summary.
LLM_PROVIDER = "OPENAI" # GOOGLE or OPENAI

In [None]:
from datetime import datetime

# 🚨🚨🚨🚨🚨 Do not change this variable! 🚨🚨🚨🚨🚨
# The chat parser relies on this exact prefix to recognize summaries that were
# already posted in the group.
SUMMARY_TITLE_PREFIX = "### *Resumo do Bate-Papo do WhatsApp* - Graduação em Inteligência Artificial - "

# Re-express the configured ISO dates as dd/mm/yyyy for the summary title.
start_datetime_prompt = datetime.strptime(START_DATE, "%Y-%m-%d").strftime("%d/%m/%Y")
end_datetime_prompt = datetime.strptime(END_DATE, "%Y-%m-%d").strftime("%d/%m/%Y")

print("Date range:", start_datetime_prompt, end_datetime_prompt)

# A single day reads "em <date>"; a real range reads "de <start> a <end>".
date_range_prompt = (
    f"em {start_datetime_prompt}"
    if start_datetime_prompt == end_datetime_prompt
    else f"de {start_datetime_prompt} a {end_datetime_prompt}"
)
SUMMARY_TITLE = f"{SUMMARY_TITLE_PREFIX}*{date_range_prompt}*"

print("Title:")
print(SUMMARY_TITLE)
print("\n")

In [None]:
from dotenv import load_dotenv

# Load environment variables from the .env file.
# NOTE(review): presumably this is where the LLM API keys come from — confirm
# which variables each provider requires.
load_dotenv()


In [None]:
import pandas as pd
import re


def add_mesage_to_df(
    messages, date, time, id, message, type_="message", subtype=None, is_summary=False
):
    """Append one chat record to ``messages`` and return that same list.

    Args:
        messages: List of record dicts; it is mutated in place.
        date: Value stored under ``"Date"``.
        time: Value stored under ``"Time"``.
        id: Value stored under ``"Id"``.
        message: Value stored under ``"Message"``.
        type_: Value stored under ``"Type"``.
        subtype: Value stored under ``"Action_type"``.
        is_summary: Value stored under ``"Is_summary"``; when truthy the
            record is also printed.

    Returns:
        The ``messages`` list that was passed in.
    """
    record = dict(
        Date=date,
        Time=time,
        Id=id,
        Message=message,
        Type=type_,
        Action_type=subtype,
        Is_summary=is_summary,
    )
    if is_summary:
        # Echo records flagged as summaries so they are visible while parsing.
        print(record)
    messages.append(record)
    return messages


def manage_action(messages, current_date, current_time, id, rest_of_line):
    """Classify a system ("action") line and append it to ``messages``.

    The line is matched against a fixed, ordered list of Portuguese marker
    phrases. The text before the first occurrence of the matching marker is
    stored as the record's ``Id``, and the whole line is stored as its
    ``Message`` with ``Type`` set to ``"action"``.

    Args:
        messages: List of record dicts; it is mutated in place.
        current_date: Date string of the line.
        current_time: Time string of the line.
        id: Ignored. It is always replaced by the text extracted from
            ``rest_of_line``; kept only for interface compatibility.
        rest_of_line: Text of the line after the "date time - " header.

    Raises:
        ValueError: If the line contains none of the known markers.
    """
    # Order matters: the first marker found wins, so "foi adicionado(a)" must
    # be tested before the more general "adicionou".
    action_markers = (
        ("entrou usando o link de convite deste grupo", "entry"),
        ("saiu", "exit"),
        ("mudou a descrição do grupo", "description_change"),
        ("fixou uma mensagem", "message_pin"),
        # Split on the same phrase that is tested, so the extracted id stays
        # correct even when the line does not continue with "do grupo".
        ("mudou as configurações", "group_settings_change"),
        ("(arquivo anexado)", "file_attach"),
        ("criou o grupo", "group_create"),
        # Spelled to match the "was_added_by_someone" value used when the
        # history is filtered for new members.
        ("foi adicionado(a)", "was_added_by_someone"),
        ("adicionou", "added_someone"),
    )

    for marker, action_type in action_markers:
        if marker in rest_of_line:
            id = rest_of_line.split(marker)[0].strip()
            break
    else:
        # Raise explicitly: an assert would be stripped under ``python -O``.
        raise ValueError(f"Unrecognized action line: {rest_of_line!r}")

    add_mesage_to_df(
        messages,
        current_date,
        current_time,
        id,
        rest_of_line,
        "action",
        subtype=action_type,
    )


def parse_whatsapp_history(file_path):
    """Parse an exported WhatsApp chat text file into a DataFrame.

    A line starting with ``dd/mm/yyyy hh:mm - `` opens a new entry; any other
    line is treated as a continuation of the current message. Entries that
    contain the invisible U+200E mark are handed to ``manage_action`` as
    system actions. Entries of the form ``sender: text`` are regular messages,
    flagged as summaries when the text starts with ``SUMMARY_TITLE_PREFIX``.

    Args:
        file_path: Path to the UTF-8 encoded chat export.

    Returns:
        A DataFrame with the columns ``Date``, ``Time``, ``Id``, ``Message``,
        ``Type``, ``Action_type`` and ``Is_summary``, plus ``Order`` (1-based
        position in the file) and ``DateTime`` (parsed timestamp). A file
        without any entries yields an empty DataFrame with the same columns.

    Raises:
        Whatever ``manage_action`` raises for a system line it cannot classify.
    """
    record_columns = [
        "Date",
        "Time",
        "Id",
        "Message",
        "Type",
        "Action_type",
        "Is_summary",
    ]
    messages = []

    # Date, time and remainder of a line that opens a new entry.
    date_and_time_pattern = re.compile(r"(\d{2}/\d{2}/\d{4}) (\d{2}:\d{2}) - (.*)")

    # State of the message currently being accumulated.
    current_date = None
    current_time = None
    current_id = None
    current_message = ""
    mark_as_summary = False

    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            match = date_and_time_pattern.match(line)
            if not match:
                # Continuation of a multi-line message. Text that appears
                # before the first header has no date to attach to, so it is
                # skipped instead of crashing the parser.
                if current_date is not None:
                    current_message += "\n" + line.strip()
                continue

            # A new entry starts: flush the message accumulated so far.
            if current_message:
                add_mesage_to_df(
                    messages,
                    current_date,
                    current_time,
                    current_id,
                    current_message,
                    is_summary=mark_as_summary,
                )
            current_message = ""
            current_date, current_time, rest_of_line = match.groups()

            # Lines carrying U+200E (LEFT-TO-RIGHT MARK) are routed to the
            # action handler; the character is written as an escape because it
            # is invisible in most editors.
            if "\u200e" in rest_of_line:
                manage_action(
                    messages, current_date, current_time, current_id, rest_of_line
                )
                continue

            sender, separator, text = rest_of_line.partition(":")
            if separator:
                current_id = sender
                current_message = text.strip()
                mark_as_summary = text.startswith(f" {SUMMARY_TITLE_PREFIX}")

    # Flush the last message, but never emit an empty trailing record.
    if current_message:
        add_mesage_to_df(
            messages,
            current_date,
            current_time,
            current_id,
            current_message,
            "message",
            is_summary=mark_as_summary,
        )

    # Passing the columns explicitly keeps the schema stable for empty input.
    df = pd.DataFrame(messages, columns=record_columns)

    df["Order"] = range(1, len(df) + 1)

    # The export is day-first; without an explicit format pandas may read
    # "05/08/2024" as May 8th or fail on mixed rows.
    df["DateTime"] = pd.to_datetime(
        df["Date"].astype(str) + " " + df["Time"].astype(str),
        format="%d/%m/%Y %H:%M",
    )

    return df

In [None]:
# Usage example: parse the configured chat export and display the result.
history = parse_whatsapp_history(FILE_PATH)
history

In [None]:
# Preview the first rows whose Type is not "message".
history[history["Type"] != "message"].head()

In [None]:
# Show up to 112 characters per cell, then display the last rows flagged as summaries.
pd.set_option("display.max_colwidth", 112)
history[history["Is_summary"]].tail()

In [None]:
# Print the column names, dtypes and non-null counts of the parsed history.
history.info()

In [None]:
# Report the earliest and latest timestamps found in the loaded file.
print("Range de data para o arquivo carregado:")
print("Min DateTime:", history["DateTime"].min())
print("Max DateTime:", history["DateTime"].max())

In [None]:
from datetime import datetime, timedelta

print("Adições de hoje")
# END_DATE is inclusive, so the window ends just before midnight of the next day.
start_datetime = datetime.strptime(START_DATE, "%Y-%m-%d")
end_datetime = datetime.strptime(END_DATE, "%Y-%m-%d") + timedelta(days=1)

# Rows recording that someone joined or was added during the window. The upper
# bound is exclusive so that an event at exactly 00:00 of the following day is
# not counted, matching the filter used to build the summary input.
entries = history[
    (history["Action_type"].isin(["was_added_by_someone", "added_someone", "entry"]))
    & (history["DateTime"] >= start_datetime)
    & (history["DateTime"] < end_datetime)
]
entries["Id"].values.tolist()

In [None]:
from datetime import datetime, timedelta

# Half-open window [START_DATE 00:00, END_DATE + 1 day 00:00).
start_datetime = datetime.strptime(START_DATE, "%Y-%m-%d")
end_datetime = datetime.strptime(END_DATE, "%Y-%m-%d") + timedelta(days=1)

print(start_datetime, end_datetime)

# One boolean mask per criterion, combined below.
is_chat_message = history["Type"] == "message"
is_new_member_action = history["Action_type"].isin(
    ["was_added_by_someone", "added_someone", "entry"]
)
is_in_window = (history["DateTime"] >= start_datetime) & (
    history["DateTime"] < end_datetime
)
is_not_summary = ~history["Is_summary"]

# Regular messages and new-member actions inside the window, excluding
# summaries that were already posted.
messages_df = history[
    (is_chat_message | is_new_member_action) & is_in_window & is_not_summary
]
messages_df

In [None]:
# Show the selected rows that are not regular messages.
messages_df[messages_df["Type"] != "message"]

In [None]:
# Report the earliest and latest timestamps among the rows selected for the summary.
print("Range de data para os dados do resumo:")
print("Min DateTime:", messages_df["DateTime"].min())
print("Max DateTime:", messages_df["DateTime"].max())

In [None]:
from pprint import pprint

# Full chat string sent to the LLM.
# The selected message texts are concatenated with a single space between
# them; only the "Message" column is used, so sender and timestamp are not
# part of the string.
full_chat_str = " ".join(messages_df["Message"].values)

# Preview characters 100-399 of the concatenated chat.
pprint(full_chat_str[100:400])

In [None]:

from pprint import pprint

# Instruction (in Portuguese) asking the model to rank the discussed subjects
# by relevance and to return at most six category titles.
RANKING_PROMPT = """
Analise as mensagens do grupo de WhatsApp do curso de Graduação em Inteligência Artificial. 
Classifique os assuntos discutidos com base na relevância para a turma.
Considere a importância das informações compartilhadas, a frequência com que foram discutidas e a quantidade de interações que geraram.
Informações consideradas importantes podem incluir:
    - contatos (LinkedIn, redes sociais, emails e telefones);
    - novos membros
    - recursos educacionais (links, playlists do youtube e outros, livros, cursos etc);
    - indicação de softwares;
    - notas de aula;
    - vagas de emprego;
    - recomendações de empresas ou de serviços específicos;
    - informações acadêmicas (eventos, datas, prazos, trabalhos, atividades);
    - quaisquer outros tópicos que sejam relevantes.
Por favor, retorne apenas uma lista com um título para as categorias mais relevantes (máximo de 6 categorias), sem nenhuma informação adicional.
"""

# Opening instruction for a newsletter paragraph. It has no placeholders, so
# it is a plain string rather than an f-string.
NEWSLETTER_PROMPT = """Por favor, forneça um parágrafo para abrir uma newsletter cobrindo os seguintes tópicos:"""

pprint(RANKING_PROMPT)

In [None]:
def print_model(model):
    """Print the configuration of ``model`` as returned by ``model.dict()``.

    Args:
        model: Any object exposing a ``dict()`` method.
    """
    print("Using model: ")
    print(model.dict())
    print('\n\n')

In [None]:
import langchain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_google_genai import ChatGoogleGenerativeAI

# Reject unknown providers up front, then build the matching chat client.
if LLM_PROVIDER not in ("GOOGLE", "OPENAI"):
    raise ValueError("Invalid LLM_PROVIDER value")

if LLM_PROVIDER == "GOOGLE":
    model_name = "gemini-1.5-pro"
    llm = ChatGoogleGenerativeAI(model=model_name, max_tokens=2500)
else:
    model_name = "gpt-4o-mini"
    llm = ChatOpenAI(model=model_name, max_tokens=2500)

print_model(llm)

In [None]:

# Prompt made of the instruction followed by the chat wrapped in a code fence.
# The template used to contain "\{", an invalid escape sequence that Python
# warns about and that left a stray backslash in front of the messages.
ranking_prompt = PromptTemplate(
    template="{instruction}\nAs mensagens: ```{messages}```",
    input_variables=["instruction", "messages"],
)

# StrOutputParser reduces the model reply to its plain text.
ranking_chain = ranking_prompt | llm | StrOutputParser()


langchain.debug = False

# First pass: ask the model for the most relevant topics of the period.
ranking_response = ranking_chain.invoke(
    {
        "instruction": RANKING_PROMPT,
        "messages": full_chat_str,
    }
)

pprint(ranking_response)

In [None]:
# SUMMARY_PROMPT = """
# Você receberá um histórico de conversas do WhatsApp. Para cada um dos seguintes tópicos: "{topics}", analise as mensagens e
# faça um resumo em subtópicos do que discutido. Mostre todos os links que foram compartilhados sobre cada tópico.
# """

# Instruction (in Portuguese) for the second pass: summarize the chat per
# topic and list every shared link. "{topics}" is filled in below with the
# output of the ranking step.
SUMMARY_PROMPT = (
    "Você receberá um histórico de conversas do WhatsApp. Para cada um dos "
    'seguintes tópicos: "{topics}", '
    "analise as mensagens e faça um resumo em subtópicos do que foi discutido. "
    "Para cada subtópico, liste **todos** os links compartilhados explicitamente, sem omitir nenhum. "
    "Se houver links adicionais que não estejam claramente associados a nenhum dos tópicos listados, "
    'crie uma categoria "Outros links" e inclua-os lá. Seja minucioso na listagem de links para garantir '
    "que nenhum seja deixado de fora. "
    "Não incluir título, inicie pelos tópicos. "
    "Cite os contatos de todos os novos membros, sem destacar nenhum deles. "
)

# The template has no "{SUMMARY_TITLE}" placeholder, so str.format silently
# ignores that keyword argument; only "topics" is substituted.
summary_instruction = SUMMARY_PROMPT.format(
    topics=ranking_response, SUMMARY_TITLE=SUMMARY_TITLE
)
pprint(summary_instruction)

In [None]:

# Prompt made of the instruction followed by the chat wrapped in a code fence.
# The template used to contain "\{", an invalid escape sequence that Python
# warns about and that left a stray backslash in front of the messages.
summary_prompt = PromptTemplate(
    template="{instruction}\nAs mensagens: ```{messages}```",
    input_variables=["instruction", "messages"],
)

# Build the chain from the summary prompt defined above (it previously reused
# the ranking prompt by mistake). No StrOutputParser here: the following cells
# read summary_response.usage_metadata and summary_response.content, which
# need the raw message object.
summary_chain = summary_prompt | llm

langchain.debug = False

# Second pass: generate the summary for the topics picked by the ranking step.
summary_response = summary_chain.invoke({
    "instruction": summary_instruction,
    "messages": full_chat_str,
})

pprint(summary_response)

In [None]:
# Display the usage metadata of the summary call; the cost calculation below
# reads its "input_tokens" and "output_tokens" entries.
summary_response.usage_metadata

In [None]:
def calculate_cost(input_tokens: int, output_tokens: int, llm_model: str):
    """Estimate the dollar cost of one LLM call, print it and return it.

    Args:
        input_tokens: Number of prompt tokens consumed.
        output_tokens: Number of completion tokens generated.
        llm_model: Key into the price table ("gpt-4o-mini" or "gemini-1.5-pro").

    Returns:
        The total cost in dollars.

    Raises:
        KeyError: If ``llm_model`` is not in the price table.
    """
    # (input, output) price in dollars per one million tokens.
    # NOTE(review): the previous comment described these as prices for prompts
    # above 128k tokens — confirm the tier and the per-model rates against the
    # providers' current price lists.
    prices_per_million = {
        "gpt-4o-mini": (0.15, 0.6),
        "gemini-1.5-pro": (0.075, 0.3),
    }

    input_rate, output_rate = prices_per_million[llm_model]
    input_part = input_rate * input_tokens / 1e6
    output_part = output_rate * output_tokens / 1e6
    total_cost = input_part + output_part
    print("Custo total em dólares:", total_cost)
    return total_cost

# Cost of the summary call only: the ranking chain ends in StrOutputParser, so
# its token usage is not available here and is not included.
dollars = calculate_cost(
    summary_response.usage_metadata["input_tokens"],
    summary_response.usage_metadata["output_tokens"],
    model_name,
)

In [None]:
# Final text: title, credit line, generating model and the LLM summary, each
# separated by a blank line.
summary_sections = [
    SUMMARY_TITLE,
    "Gerado pelo [**Whatstics**](**https://github.com/brunoconterato/Whatstics**). Contribua!",
    f"Modelo gerador: {model_name}",
    summary_response.content,
]
summary_to_save = "\n\n".join(summary_sections)
pprint(summary_to_save)

In [None]:
# Copy the final summary text to the system clipboard.
import pyperclip
pyperclip.copy(summary_to_save)

In [None]:
import os
from datetime import datetime

# Dates as dd-mm-yyyy for use in the output file name.
start_range = datetime.strptime(START_DATE, "%Y-%m-%d").strftime("%d-%m-%Y")
end_range = datetime.strptime(END_DATE, "%Y-%m-%d").strftime("%d-%m-%Y")

if start_range == end_range:
    date_range_filename = f"summary_whatsapp_on_{start_range}"
else:
    date_range_filename = f"summary_whatsapp_from_{start_range}_to_{end_range}"

llm_filename = f"_{LLM_PROVIDER}".lower()
extension = ".txt"

filepath = f"{SUMMARY_PATH}/{date_range_filename}{llm_filename}{extension}"

# Create the output directory on first use instead of failing in open().
os.makedirs(SUMMARY_PATH, exist_ok=True)

# Write as UTF-8 explicitly: the summary contains accented characters, and the
# platform default encoding may not be able to represent them.
with open(filepath, "w", encoding="utf-8") as file:
    file.write(summary_to_save)
    print(f"Saved summary to: {filepath}")

In [None]:
# 🚨🚨🚨🚨🚨 Chain multipla! 🚨🚨🚨🚨🚨
# 🚀🚀🚀🚀🚀 Apenas aprendizado! O script vai rodar separado mesmo 🚀🚀🚀🚀🚀 

# from operator import itemgetter

# from langchain_core.output_parsers import StrOutputParser
# from langchain_openai import ChatOpenAI

# ranking_prompt = PromptTemplate(
#     template="{instruction}\nAs mensagens: ```\{messages}```",
#     input_variables=["instruction", "messages"],
# )
# summary_prompt = PromptTemplate(
#     template=SUMMARY_PROMPT,
#     input_variables=["topics", "SUMMARY_TITLE"]
# )

# model = ChatOpenAI()

# ranking_chain = ranking_prompt | model | StrOutputParser()

# summary_chain = (
#     # A variável "topics" é exatamente a saída do ranking_chain
#     {"topics": ranking_chain, "SUMMARY_TITLE": itemgetter("SUMMARY_TITLE")}
#     # Entradas do summary_prompt: input_variables=["topics", "SUMMARY_TITLE"]
#     | summary_prompt
#     | model
#     | StrOutputParser()
# )
# langchain.debug = False

# summary_chain.invoke(
#     {
#         "instruction": RANKING_PROMPT,
#         "messages": full_chat_str,
#         "SUMMARY_TITLE": SUMMARY_TITLE,
#     }
# )