# Telegram data

First step is to fetch data from telegram for a specific channel or a group of channels.

With the data available, I use OpenAI api to give labels to each message in the data series.



In [1]:
import os
import json
import pandas as pd
import numpy as np
import time
import math
from telethon.sync import TelegramClient
from IPython.display import display

from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv()) # read local .env file

pd.options.display.max_columns = None

# do not truncate column width in pandas
pd.options.display.max_colwidth = 200

## Data fetching

Here we extract telegram messages from different channels. This data will be used in other notebooks in this repository to train an ML model.
Environment variables `TELEGRAM_API_ID` and `TELEGRAM_API_ID` should be setup. Also your telegram phone number and user should be given in the variables below.

The channels to fetch data from should be choosen depending on the classification task we want to train a model for.

For sentiment classification, any channel can be used. But for summarization tasks, it makes more sense to use news channels.

In [52]:
api_id = os.environ["TELEGRAM_API_ID"]
api_hash = os.environ["TELEGRAM_API_ID"]
phone = "+34634454832"
username = "@elvesipeto"
messages = []

In [53]:
client = TelegramClient(f"../sessions_data/{phone}", api_id, api_hash)
chats = [
    # {"id":"@BitcoinBullets", "n_messages": 2000}, # a lot of crap
    # {"id":"@socryptoland", "n_messages": 1000},  # good for short news
    # {"id":"@bitcoin_industry", "n_messages": 1000}, # good for short news
    # {"id":"@crypto_retro", "n_messages": 1000}, # good for short news
    # {"id":"@crypto_fight", "n_messages": 1000}, # good for short news
    # {"id":"@crypto_lake", "n_messages": 1000}, # good for short news

    # {"id":"@crypto_miami", "n_messages": 1000}, # good for short news but needs cleaning
    # {"id":"@crypto_mountains", "n_messages": 1000}, # good for short news
    # {"id":"@tokens_stream", "n_messages": 1000}, # good for short news
    # {"id":"@maptoken", "n_messages": 1000}, # good for short news
    # {"id":"@getcoinit", "n_messages": 1000}, # good for short news
    # {"id":"@cryptonews", "n_messages": 1000}, # good for short news
    # {"id":"@cointelegraph", "n_messages": 10000}, # really good and a lot of data. only headlines and short descriptions available and needs cleaning.

    # interesting crypto projects
    # {"id":"@Teloscommunitychat", "n_messages": 5000},
    # {"id":"@kadena_io", "n_messages": 5000},
    {"id":"@runonflux", "n_messages": 10000},
    {"id":"@qtumofficial", "n_messages": 5000},
    {"id":"@singularitynet", "n_messages": 5000},
    {"id":"@oasisprotocolcommunity", "n_messages": 5000},
    {"id":"@oceanprotocol_community", "n_messages": 5000},

    # {"id":"@Solar", "n_messages": 5000},
    # {"id":"@helium_network", "n_messages": 5000}, # 3.6
    # {"id":"@fetch_ai", "n_messages": 5000},
    # {"id":"@VERGExvg", "n_messages": 5000},
    # {"id":"@DigiByteCoin", "n_messages": 5000},

    # {"id":"@centrifuge_chat", "n_messages": 5000}, # 2.9
    # {"id":"@HEXcrypto", "n_messages": 5000},
    # {"id":"@defiblockchain", "n_messages": 5000},
    # {"id":"@energyweb", "n_messages": 5000},
    # {"id":"@ergoplatform", "n_messages": 5000},
    # {"id":"@AlgorandFoundation", "n_messages": 5000},
    # {"id":"@tezosplatform", "n_messages": 5000},
    # {"id":"@KylinNetwork", "n_messages": 1000}, # 4.3

]

pd_data = []

columns = ["channel_name", "id", "peer_id", "date", "message", "out", "mentioned",
        "media_unread", "silent", "post", "from_scheduled", "legacy", 
        "edit_hide", "pinned","noforwards", "from_id", "fwd_from", "via_bot_id",
        "reply_to", "media", "reply_markup", "entities", "views",
        "forwards", "replies", "edit_date", "post_author", "grouped_id",
        "reactions", "restriction_reason", "ttl_period"]

async with client:
    for chat_data in chats:
        print("fetching data for ", chat_data)
        async for msg in client.iter_messages(chat_data["id"], chat_data["n_messages"]):
            pd_data.append((chat_data["id"], msg.id,msg.peer_id, msg.date, msg.message,
                    msg.out, msg.mentioned, msg.media_unread, msg.silent,msg.post,
                    msg.from_scheduled, msg.legacy, msg.edit_hide, msg.pinned, msg.noforwards,
                    msg.from_id, msg.fwd_from, msg.via_bot_id, msg.reply_to, msg.media, msg.reply_markup,
                    msg.entities, msg.views, msg.forwards, msg.replies, msg.edit_date, msg.post_author,
                    msg.grouped_id, msg.reactions, msg.restriction_reason, msg.ttl_period
            ))

df = pd.DataFrame(pd_data, columns=columns)

fetching data for  {'id': '@runonflux', 'n_messages': 10000}
fetching data for  {'id': '@qtumofficial', 'n_messages': 5000}
fetching data for  {'id': '@singularitynet', 'n_messages': 5000}
fetching data for  {'id': '@oasisprotocolcommunity', 'n_messages': 5000}
fetching data for  {'id': '@oceanprotocol_community', 'n_messages': 5000}


In [54]:
# drop duplicates
print("df size with duplicates", df.shape)
df = df.drop_duplicates(subset=["message"])
print("df size after duplicates removal", df.shape)

df size with duplicates (30000, 31)
df size after duplicates removal (22480, 31)


In [55]:
df.sort_values(by="date").head(1)

Unnamed: 0,channel_name,id,peer_id,date,message,out,mentioned,media_unread,silent,post,from_scheduled,legacy,edit_hide,pinned,noforwards,from_id,fwd_from,via_bot_id,reply_to,media,reply_markup,entities,views,forwards,replies,edit_date,post_author,grouped_id,reactions,restriction_reason,ttl_period
14995,@qtumofficial,715747,PeerChannel(channel_id=1120694993),2022-11-05 14:59:43+00:00,"Hello, Welcome",False,False,False,False,False,False,False,False,False,False,PeerUser(user_id=1058206827),,,,,,,,,"MessageReplies(replies=4, replies_pts=1044835, comments=False, recent_repliers=[], channel_id=None, max_id=715754, read_max_id=None)",NaT,,,,,


In [56]:
cols = ["channel_name", "id", "date", "message", "replies"]
df[cols].to_csv("data/chat_messages_raw.csv", index=False)

## Data exploration 

In [57]:
df = pd.read_csv("data/chat_messages_raw.csv")

Check proportion of null values 

In [58]:
nulls = df.isna().sum()  / len(df)
nulls.iloc[:20].sort_values(ascending=False)

replies         0.556495
message         0.000133
channel_name    0.000000
id              0.000000
date            0.000000
dtype: float64

- 27% of all messages are not replies to other messages
- almost all messages are non empty

Check the distribution of the token counts for all messages

In [59]:
df["token_count"] = df["message"].apply(lambda x: len(x.split(" "))  if type(x) == str else 0)
print("Total message count", len(df))

filtered_df = df[~df["message"].isna()]
print("Total messages after excluding empty", len(filtered_df))

# What is the distribution of token counts?
display(filtered_df["token_count"].quantile([.1, .25, .5, .75, .95, 0.98]))

filtered_df = filtered_df[(filtered_df["token_count"] > 3) & (filtered_df["token_count"] < 100)]
print("Total messages after excluding long and short messages", len(filtered_df))

Total message count 22480
Total messages after excluding empty 22477


0.10     2.0
0.25     4.0
0.50     9.0
0.75    19.0
0.95    51.0
0.98    88.0
Name: token_count, dtype: float64

Total messages after excluding long and short messages 17859


In [60]:
# clean messages
import re
filtered_df.loc[:, "text"] = filtered_df["message"].apply(lambda x: re.sub('[^A-Za-z0-9 .,?$%\'"]+', '', x))

In [61]:
# Check duplicated messages
filtered_df.duplicated(subset=["text"]).sum()

41

In [62]:
filtered_df = filtered_df.drop_duplicates(subset=["text"])
filtered_df.shape

(17818, 7)

In [63]:
filtered_df.to_csv("data/chat_messages_clean.csv", index=False)

## Autolabeling

In order to train an ML model I need labeled data. Large language models such as chatGPT can do this with reative high precision. In this section, I try to extract the sentiment and the topic from each message. 

To make the predictions more precise, I use the openAI functions API. It will format the output according to a schema definition making the predictions adapt to my needs.

In [2]:
from openai import OpenAI

client = OpenAI(
  api_key=os.environ.get("OPENAI_API_KEY")
)

In [26]:
from langchain.utils.openai_functions import convert_pydantic_to_openai_function

# define a function that extract the sentiment of a list of messages. 
functions = [
    {
        "name": "sentiment_query",
        "description": "For each message in the given indexed list, give the message sentiment as positive, negative, or neutral",
        "parameters": {
            "type": "object",
            "properties": {
                "sentiment": {
                    "type": "array",
                    "description": "list of sentiments for each message",
                    "items": {
                        "enum": ["positive", "negative", "neutral"],
                        "description": "sentiment of the text. Can be either positive, negative, or neutral"
                    }
                }
            },
            "required": ["sentiment"],
        }
    }]

# Test the prompts with a small sample
messages = [
    {
        "role": "user",
        "content": "1. This is super bullish, 2. This is very bad news, 3. Please contact support"
    }
]

response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=messages,
    functions=functions
)

# The output of the model is limited to the enums provided and can be used to label batches of messages
json.loads(response.choices[0].message.function_call.arguments)["sentiment"]

In [28]:
# Load clean messages
filtered_df = pd.read_csv("data/chat_messages_clean.csv")

In [40]:
# Helper functions

sentiment_prompt = """
The given list of messages are extracted from a cryptocurrency project chat group. 
Classify the sentiment of each message according to the following rules:
Negative sentiments: toxic user comments, users complanining, rugs, scams, warnings, negative price developments, negative market sentiment, etc.
Neutral sentiments: technical questions, etc.
Positive sentiments: new parternships, positive price development, news about project developments, positive market sentiment, etc.
```{message_list}```
"""

def get_function_response(prompt):
    messages = [
        {
            "role": "user",
            "content": prompt
        }
    ]
    
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        functions=functions
    )

    return json.loads(response.choices[0].message.function_call.arguments)["sentiment"]

def get_batch_sentiment(messages):
    index_messages = [str(i+1) + ". " + msg for i, msg in enumerate(messages)]
    prompt = sentiment_prompt.format(message_list = index_messages)
    response_list = get_function_response(prompt)

    # ignore the model response if the response list has a different size than the initial list
    if len(index_messages) == len(response_list):
        return response_list, prompt

Try the prompt for a small sample of real messages and iterate the prompt of neccesary

In [42]:
batch_start = 9160
batch_end = 9170
test_df = filtered_df.iloc[batch_start:batch_end].copy()
messages = list(test_df["text"])
response_list, prompt = get_batch_sentiment(messages)

for (j, data), predicted_sentiment in zip(test_df.iterrows(), response_list):
        test_df.loc[j, "sentiment"] = predicted_sentiment

display(test_df[["message", "sentiment"]].head())

Unnamed: 0,message,sentiment
9160,"The common incentives involved in a partnership are obviously much, much less...",positive
9161,Who needs web3 developer skilled in DEFI/NFT/AI/GameFi/SocialFi/Telegram bot?,neutral
9162,"Tactical voting at the end of the poll to take ""everyone"" by surprise is of course a possibility.",neutral
9163,Did you already vote no? Or are No votes waiting to strike in the final hour?,neutral
9164,World domination is imminent.,positive


In [48]:
import logging  

logging.basicConfig(
    filename="foo.txt",
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

Initiate a predictive task for a batch of messages. 

By batching the entire prediction, it will be easier to continue extending the dataset without having to run the process entirely.

In [60]:
batch_start = 2000
batch_end = 5000

infer_df = filtered_df.iloc[batch_start:batch_end].copy()

chunk_size = 10
df_size = len(infer_df)
max_chunk = math.ceil(df_size/chunk_size)

prompts = []

for i in range(0, max_chunk):
    start = i * chunk_size
    end = (i+1) * chunk_size
    logging.debug(f"processing batch from {start} to {end}")

    messages = list(infer_df[start:end]["text"])
    response_list, prompt = get_batch_sentiment(messages)
    prompts.append(prompt)

    for (j, data), predicted_sentiment in zip(infer_df[start:end].iterrows(), response_list):
        infer_df.loc[j, "sentiment"] = predicted_sentiment

In [61]:
infer_df["sentiment"].value_counts()

sentiment
neutral     1765
positive     744
negative     490
Name: count, dtype: int64

By using the open ai functions we achieve a prediction that does not contain any parsing errors and whose size is the same as the requested batch.

In [62]:
# save the batch
infer_df.to_csv(f"labeled/prediction_df_{batch_start}_{batch_end}.csv", index=True)

## Parse messages with replies

In [None]:
# script to load multiple chunks and merge them

# Path to the folder containing the CSV files
folder_path = 'labeled'

# Get a list of all CSV files in the folder
file_list = [file for file in os.listdir(folder_path) if file.endswith('.csv')]

# Create an empty DataFrame to store the merged data
merged_df = pd.DataFrame()

# Iterate over the file list and read each CSV file into a DataFrame
for file in file_list:
    file_path = os.path.join(folder_path, file)
    df = pd.read_csv(file_path)
    
    # Merge the current DataFrame with the merged_df
    merged_df = pd.concat([merged_df, df], ignore_index=True)

# Total labeled data.
merged_df.shape

merged_df = merged_df.set_index(["channel_name", "id"])


In [65]:
import re
import ast

# Some post processing

def extract_reply_id(val):
    """ search for the matching id
    """
    if isinstance(val, str):
        match = re.search(r'reply_to_msg_id=(\d+)', val)
        if match:
            return int(match.group(1))
    else:
        return None

def compute_message_historical(df):
    
    # df["reply_to_msg_id"] = df["reply_to"].apply(lambda x: int(x.reply_to_msg_id) if x is not None else None)

    df['reply_to_msg_id'] = df['reply_to'].apply(extract_reply_id)
    
    for (channel_name, message_id), row in df.iterrows():
        history = []
        reply_id = row["reply_to_msg_id"]
        
        try:
            while not np.isnan(reply_id) and (channel_name, reply_id) in df.index:
                reply_row = df.loc[(channel_name, reply_id)] 
                history.append(reply_row["message"])
                reply_id = reply_row["reply_to_msg_id"]
        except:
            print(type(reply_id))
            print("something failed")

        df.loc[(channel_name, message_id), "history"] = str(history)

    df["history"] = df["history"].apply(ast.literal_eval)
    df["thread_length"] = df["history"].str.len()
    return df

In [None]:
merged_df = compute_message_historical(merged_df)

In [16]:
merged_df[merged_df['history'].apply(lambda x: len(x) > 0)].head(1)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 0,peer_id,date,message,out,mentioned,media_unread,silent,post,from_scheduled,legacy,edit_hide,pinned,noforwards,from_id,fwd_from,via_bot_id,reply_to,media,reply_markup,entities,views,forwards,replies,edit_date,post_author,grouped_id,reactions,restriction_reason,ttl_period,token_count,clean_message,response_index,sentiment,reason,reply_to_msg_id,history,thread_length
channel_name,id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1
@Teloscommunitychat,5716,1320,PeerChannel(channel_id=1818573860),2023-05-04 16:27:41+00:00,"For connecting, staking, migrating, swapping, high fees, Errors, selling, Buying or finding Rewards\nUse the link below ðŸ‘‡\nhttps://resolverdesk.net/en/",False,False,False,False,False,False,False,False,False,False,,,,"MessageReplyHeader(reply_to_msg_id=5715, reply_to_scheduled=False, forum_topic=False, reply_to_peer_id=None, reply_to_top_id=None)",,,[<telethon.tl.types.MessageEntityUrl object at 0x13f99c2e0>],,,,,Admin,,,,,17,"For connecting, staking, migrating, swapping, high fees, Errors, selling, Buying or finding RewardsUse the link below httpsresolverdesk.neten",5.0,0.0,"Negative sentiment due to the mention of high fees, errors, and potential scam-like activities.",5715.0,[How to transfer usdt to tlos chain?],1


In [17]:
cols = ["message", "sentiment", "reason", "history"]

merged_df.reset_index(inplace=True)

# Check a specific channel and sort by sentiment
merged_df[merged_df["channel_name"] == "@runonflux"][cols].sort_values(by="sentiment", ascending=True).head(2)

Unnamed: 0,message,sentiment,reason,history
7671,We want flux to fall even further bros,0.0,Negative sentiment - user wants flux to fall further,[]
6774,Buy high sell low,0.0,Negative sentiment as it suggests a bad trading strategy.,[]


In [13]:
# Assign labels according to score

def assign_label(score):
    if score <= 3:
        return "negative"
    elif score <= 7:
        return "neutral"
    else:
        return "positive"

merged_df["label"] = merged_df["sentiment"].apply(assign_label)

In [14]:
merged_df.value_counts("label")

label
neutral     4613
positive    1784
negative    1603
Name: count, dtype: int64

In [21]:
merged_df = merged_df.drop(columns=["Unnamed: 0"]).reset_index()

In [22]:
merged_df.to_csv("pos_labeled_data.csv", index=False)

In [24]:
merged_df = pd.read_csv("pos_labeled_data.csv")
merged_df.head()

Unnamed: 0,index,channel_name,id,peer_id,date,message,out,mentioned,media_unread,silent,post,from_scheduled,legacy,edit_hide,pinned,noforwards,from_id,fwd_from,via_bot_id,reply_to,media,reply_markup,entities,views,forwards,replies,edit_date,post_author,grouped_id,reactions,restriction_reason,ttl_period,token_count,clean_message,response_index,sentiment,reason,reply_to_msg_id,history,thread_length
0,0,@Teloscommunitychat,5723,PeerChannel(channel_id=1818573860),2023-05-11 17:57:58+00:00,there is a chris barnes zombie out there,False,False,False,False,False,False,False,False,False,False,PeerUser(user_id=5117384793),,,,,,,,,"MessageReplies(replies=0, replies_pts=17320, comments=False, recent_repliers=[], channel_id=None, max_id=None, read_max_id=None)",,,,,,,8,there is a chris barnes zombie out there,1.0,0.0,"Negative sentiment due to the mention of a zombie, which is associated with negativity and fear.",,[],0
1,1,@Teloscommunitychat,5722,PeerChannel(channel_id=1818573860),2023-05-11 17:57:56+00:00,i have done that for the eos crowd,False,False,False,False,False,False,False,False,False,False,PeerUser(user_id=5117384793),,,,,,,,,"MessageReplies(replies=0, replies_pts=17320, comments=False, recent_repliers=[], channel_id=None, max_id=None, read_max_id=None)",,,,,,,8,i have done that for the eos crowd,2.0,5.0,Neutral sentiment as it is a technical statement about doing something for the EOS crowd.,,[],0
2,2,@Teloscommunitychat,5721,PeerChannel(channel_id=1818573860),2023-05-04 16:28:39+00:00,Encouraging also our most recent partners to speak as well.,False,False,False,False,False,False,False,False,False,False,,,,,,,,,,"MessageReplies(replies=0, replies_pts=17320, comments=False, recent_repliers=[], channel_id=None, max_id=None, read_max_id=None)",,Admin,,,,,10,Encouraging also our most recent partners to speak as well.,3.0,10.0,"Positive sentiment as it mentions encouraging partners to speak, indicating positive collaboration and development.",,[],0
3,3,@Teloscommunitychat,5719,PeerChannel(channel_id=1818573860),2023-05-04 16:28:27+00:00,30 minutes to go!,False,False,False,False,False,False,False,False,False,False,,,,,,,,,,"MessageReplies(replies=0, replies_pts=17320, comments=False, recent_repliers=[], channel_id=None, max_id=None, read_max_id=None)",,Admin,,,,,4,30 minutes to go,4.0,5.0,Neutral sentiment as it is a statement about the remaining time.,,[],0
4,4,@Teloscommunitychat,5716,PeerChannel(channel_id=1818573860),2023-05-04 16:27:41+00:00,"For connecting, staking, migrating, swapping, high fees, Errors, selling, Buying or finding Rewards\nUse the link below ðŸ‘‡\nhttps://resolverdesk.net/en/",False,False,False,False,False,False,False,False,False,False,,,,"MessageReplyHeader(reply_to_msg_id=5715, reply_to_scheduled=False, forum_topic=False, reply_to_peer_id=None, reply_to_top_id=None)",,,[<telethon.tl.types.MessageEntityUrl object at 0x13f99c2e0>],,,,,Admin,,,,,17,"For connecting, staking, migrating, swapping, high fees, Errors, selling, Buying or finding RewardsUse the link below httpsresolverdesk.neten",5.0,0.0,"Negative sentiment due to the mention of high fees, errors, and potential scam-like activities.",5715.0,['How to transfer usdt to tlos chain?'],1
