In [5]:
# Parses and organizes all the messages in telegram account.

In [1]:
from telethon import TelegramClient
from telethon.tl.functions.messages import GetHistoryRequest
from telethon.tl.types import User, PeerUser
from telethon.errors import FloodWaitError
import asyncio
import time 
import openai
from dotenv import load_dotenv
import json
import os
import sys

In [125]:
dotenv_path = ".env"
load_dotenv(dotenv_path=dotenv_path)

api_id = os.getenv('TELEGRAM_API_ID')
api_hash = os.getenv('TELEGRAM_HASH_ID')
phone_number = os.getenv('PHONE_NUMBER')
my_telegram_id = os.getenv('my_telegram_id')
session_name = "telegram_parser"
client = TelegramClient(session_name, api_id, api_hash)


In [9]:
openai.api_key = os.getenv('OPENAI_API_KEY')

def categorize_message(message):
    prompt = f'Classify the following message as either "context" or "response":\n\n"{message}"\n\nAnswer with one word only.'
    
    response = openai.Completion.create(
        engine="text-davinci-003",  # Choose a suitable engine
        prompt=prompt,
        max_tokens=1,
        n=1,
        stop=None,
        temperature=0.5,
    )
    
    return response.choices[0].text.strip()

In [10]:
def optimize_messages(messages):
      """
      Function which uses a set of tuning algorithms to meet the criteria of optimized data for future models.
      """

      # TODO: Include only messages in ukrainian language


      # TODO: Put todos below in order of priority 
      # For each of the points below, if true: add one, if false: minus one
      # TODO: Add detection system for context and response:

      # TODO: If the message contains question mark in the end of the message, it is a context
      # TODO: The first message of the new day is probably a context.
      # TODO: If there are few messages in a row from user, concatenate them into one message.
      # TODO: If there is a significant time gap (e.g., several hours) between messages, the first message after the gap might be a context.
      # TODO: Look for specific keywords or phrases that typically indicate a context (e.g., "What do you think about...", "Can you explain...", "Why is...").
      # TODO: If a message is a direct reply to a previous context message, it is likely a response.
      # TODO: Short messages that directly follow a context are likely responses.
      # TODO: If the same user repeatedly sends messages ending with question marks or messages at the start of the day, those are likely contexts.
       

In [11]:
async def get_total_messages(session_name, api_id, api_hash, phone_number, only_personal=True):
      """
      Shows a total amount of messages that your account has. 
      """
      total_messages = 0

      async with TelegramClient(session_name, api_id, api_hash) as client:
            client.start(phone_number)
            dialogs = await client.get_dialogs()

            if only_personal:
                  dialogs = [dialog for dialog in dialogs if isinstance(dialog.entity, User)]
                  print(f"Total dialogs: {len(dialogs)}")
            for dialog in dialogs:
                  async for message in client.iter_messages(dialog.entity, limit=None):
                        total_messages += 1
            print(f"Total messages: {total_messages}")
            client.disconnect()
            return total_messages

In [12]:
# Takes some time to run
#%time
#total_messages = await get_total_messages(session_name, api_id, api_hash, phone_number, only_personal=True)

In [114]:
async def extract_message_info(messages):
      extracted_dialog = []
      last_message=None

      for message in messages:
            try: 
                  text = message.message.strip() if message.message else ""
                  sender = message.from_id if message.from_id else (await client.get_entity(message.peer_id)).id
                  sender = sender.user_id if isinstance(sender, PeerUser) else sender # Deletes PeerUser classes and keeps only int id

                  #sent_by_me = my_telegram_id == sender   
                  date = message.date 
            except FloodWaitError as e:
                  print(f"FloodWaitError: sleeping for {e.seconds} seconds.")
                  await asyncio.sleep(e.seconds)
                  continue 
                  
            if text:
                  if last_message and sender == last_message[1]:
                        last_message[0] = " ".join([last_message[0], text])
                  else:
                        if last_message:
                              extracted_dialog.append(last_message)
                        last_message = [text, sender, date]

      if last_message:
            extracted_dialog.append(last_message)
      
      return extracted_dialog

In [115]:
import pandas as pd

async def parse_data(threshold: int =50, 
                     message_limit=None,
                      dialogs_limit: int = 100,
                      verbose=1,
                      top_chats_first: bool = False):
    """
    Parses all the messages in the profile.
    
    Args:
        threshold: int
            The minimum amount of messages in a dialog to be processed.
        message_limit: int
            The maximum amount of messages to be processed in a dialog.
        dialogs_limit: int
            The maximum amount of dialogs to be processed.
        verbose: int
            The amount of output to be printed.
        top_chats_first: bool
            Whether to process chats with most messages first.

    Returns:
        pd.DataFrame
            The parsed data.
    """
    async with client:
        # TODO: Implement checkpoint system
            # Don't forget about assigning filtered_dialogs
            # You can implement in this structure: filtered_dialogs, itered_dialogs.


        dialogs = await client.get_dialogs()
        dialogs = [dialog for dialog in dialogs if isinstance(dialog.entity, User)]
        dialogs = [dialog for dialog in dialogs if not dialog.entity.bot]
        dialogs = dialogs[:dialogs_limit]
        if verbose: 
            total = 0 
            print(f"Total dialogs: {len(dialogs)}")
        filtered_dialogs = pd.DataFrame(columns=["Message", "Sender","Date"])
        for dialog in dialogs[:dialogs_limit]:
            start_time = time.time() if verbose else None
            messages_info = []
            async for message in client.iter_messages(dialog.entity, limit=message_limit, wait_time=10):
                messages_info.append(message)

            total_messages = len(messages_info)
            if total_messages > threshold:
                extracted_dialog = await extract_message_info(messages_info)
                filtered_dialogs = pd.concat([filtered_dialogs, pd.DataFrame(extracted_dialog, columns=["Message", "Sender", "Date"])])
                if verbose: 
                    total += 1
                    run_time = time.time() - start_time
                    print(f"Dialogs processed: {total}, left: {len(dialogs) - total}. Run time: {run_time:.2f} seconds") 
        return filtered_dialogs

## **If you have >10k messages, it will take a long time to run. Hope you are patient.**

In [132]:
%time
async def main():
    if os.path.exists(f"parsers\{session_name}.session-journal"):
        print(f"Session {session_name} exists. Please delete it and restart the script. Or change the session name in the script.")
        sys.exit()
    else:
        await client.start(phone_number)
        print(f"Connecting with {client.session}")
        data = await parse_data(message_limit=None, dialogs_limit=None, verbose=1)
        data = pd.DataFrame(data, columns=["Message", "Sender", "Date"])
        data["Sent_by_me"] = int(my_telegram_id) == data["Sender"]
        return data
        client.disconnect()
        print("DONE")

data = await main()
#client.disconnect

CPU times: user 1 µs, sys: 1e+03 ns, total: 2 µs
Wall time: 5.25 µs
Connecting with <telethon.sessions.sqlite.SQLiteSession object at 0x11056a580>
Total dialogs: 161


  filtered_dialogs = pd.concat([filtered_dialogs, pd.DataFrame(extracted_dialog, columns=["Message", "Sender", "Date"])])


Dialogs processed: 1, left: 160. Run time: 4.98 seconds
Dialogs processed: 2, left: 159. Run time: 17.58 seconds


Server closed the connection: [Errno 54] Connection reset by peer


FloodWaitError: sleeping for 114 seconds.
FloodWaitError: sleeping for 114 seconds.
Dialogs processed: 3, left: 158. Run time: 1344.39 seconds
FloodWaitError: sleeping for 117 seconds.
FloodWaitError: sleeping for 116 seconds.
FloodWaitError: sleeping for 111 seconds.
FloodWaitError: sleeping for 108 seconds.
FloodWaitError: sleeping for 113 seconds.
FloodWaitError: sleeping for 105 seconds.
FloodWaitError: sleeping for 112 seconds.
FloodWaitError: sleeping for 110 seconds.
Dialogs processed: 4, left: 157. Run time: 2750.08 seconds
FloodWaitError: sleeping for 99 seconds.
Dialogs processed: 5, left: 156. Run time: 274.20 seconds
FloodWaitError: sleeping for 98 seconds.
Dialogs processed: 6, left: 155. Run time: 217.82 seconds
FloodWaitError: sleeping for 62 seconds.
FloodWaitError: sleeping for 113 seconds.
Dialogs processed: 7, left: 154. Run time: 746.40 seconds
Dialogs processed: 8, left: 153. Run time: 12.89 seconds
FloodWaitError: sleeping for 81 seconds.
Dialogs processed: 9, lef

  filtered_dialogs = pd.concat([filtered_dialogs, pd.DataFrame(extracted_dialog, columns=["Message", "Sender", "Date"])])


Dialogs processed: 35, left: 126. Run time: 11.89 seconds
Dialogs processed: 36, left: 125. Run time: 7.65 seconds
Dialogs processed: 37, left: 124. Run time: 6.64 seconds


In [133]:
data = data.to_csv(r'full_telegram_data.csv', index=False)

In [137]:
# Are there NAN values?
data.isna().sum()

Message       0
Sender        0
Date          0
Sent_by_me    0
dtype: int64