# Telegram Channel Scraper 

This **Notebook-as-Tool** allows you to:

1.   retrieve messages from Telegram channels, 
2.   gather data about contributors (message authors).

**How to Use:**
1. To run or adapt this Colab notebook, create a copy in your Google Drive: **File → Save a copy in Drive**. It will be stored in a folder named ```Colab Notebooks```. Open this copy with Google Colab and run the cells in order by clicking the **Play** button or pressing **Shift+Enter**.
2. Using the Telegram API requires authentication. To get credentials, sign up for Telegram (this step might require a smartphone) and create a so-called application via this link: https://my.telegram.org/auth. (Telegram developer documentation on obtaining API credentials: https://core.telegram.org/api/obtaining_api_id.)
3. Since the API ID, API hash and phone number are sensitive data, they are stored in a config file on Google Drive:
  - Download the Telegram config template (http://tiny.cc/telegram-config-template).
  - Add your information using your preferred code editor, such as [SublimeText](https://www.sublimetext.com), [Atom](https://atom.io/) or [Brackets](http://brackets.io/).
  - Upload the file to your Google Drive. The notebook assumes that the file is named ```telegram_config.ini``` and placed in a folder named ```Colab_Data/Configs``` (the path used by the setup code).
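
The setup code below reads the section `Telegram` with the keys `api_id`, `api_hash` and `phone` from the config file. The following is a minimal sketch of a file with that layout and of how it can be parsed. The values are placeholders, the helper `load_telegram_config` is hypothetical, and the downloadable template may contain additional comments or fields.

```python
import configparser
import os
import tempfile

# Placeholder values only; replace them with your own credentials.
SAMPLE_CONFIG = """[Telegram]
api_id = 1234567
api_hash = 0123456789abcdef0123456789abcdef
phone = +491234567890
"""

def load_telegram_config(path):
    """Hypothetical helper: read api_id, api_hash and phone from an INI file."""
    config = configparser.ConfigParser()
    # ConfigParser.read() returns the list of files it could actually parse.
    if not config.read(path):
        raise FileNotFoundError("Config file not found: {}".format(path))
    section = config["Telegram"]
    return int(section["api_id"]), section["api_hash"], section["phone"]

# Write the sample to a temporary folder and parse it again.
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "telegram_config.ini")
    with open(sample_path, "w", encoding="utf-8") as handle:
        handle.write(SAMPLE_CONFIG)
    api_id, api_hash, phone = load_telegram_config(sample_path)

print(api_id, phone)
```

Checking the return value of `read()` makes a missing or misplaced file visible right away, instead of surfacing later as a missing `Telegram` section.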

**Important notes:**
- Code is hidden behind Colab forms. To view and edit the code, **double-click** a cell or select **View → Show/hide code**.
- Data will be stored in Google Drive in the folder ```Colab_Data```. Access to your Drive is authorized when you run the setup code cells. This access is temporary and only the current notebook is connected to your Drive. The connection is revoked when the notebook runtime is terminated or when you select **Runtime → Factory reset runtime**.
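
Each run of the tool cell creates its own subfolder below `Colab_Data/Data/Telegram`, named after the start time, the queried channels and the number of items. The sketch below mirrors the naming expression used in the tool cell. The helper name `build_outpath` and the channel names are made up for illustration.

```python
import os

def build_outpath(query, items, timestamp):
    """Hypothetical helper mirroring the folder naming of the tool cell."""
    # Commas and spaces in the channel list are turned into underscores.
    channels = query.replace(',', '_').replace(' ', '_').replace('__', '_')
    return "{}_{}_{}_items_each".format(int(timestamp), channels, items)

# Same base path as defined in the setup code.
data_path = os.path.join("gdrive", "MyDrive", "Colab_Data", "Data", "Telegram")
folder = build_outpath("channel_a, channel_b", 10, 1700000000)
print(os.path.join(data_path, folder))
```

In the notebook itself the timestamp comes from `time.time()`, so repeated runs for the same channels end up in separate folders.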

**Credits:** This notebook was written by Marcus Burkhardt. The code is based on and extends the 4cat Telegram data source. The original code can be found here: https://github.com/digitalmethodsinitiative/4cat/blob/master/datasources/telegram/search_telegram.py. It uses the Telethon library (https://pypi.org/project/Telethon/) for interacting with the Telegram API.

In [None]:
#@title Setup 1: Mount Google Drive for Loading and Storing Data
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
#@title Setup 2: Install and Load Required Libraries and Run Setup Procedures

try:
  from telethon.sync import TelegramClient
  from telethon.tl.functions.users import GetFullUserRequest
  from telethon.tl.types import User, Message, PeerChannel, PeerChat, PeerUser
except ImportError:
  # Telethon is not installed yet: install it, then retry the imports
  !pip install telethon
  from telethon.sync import TelegramClient
  from telethon.tl.functions.users import GetFullUserRequest
  from telethon.tl.types import User, Message, PeerChannel, PeerChat, PeerUser

import os
import re
import json
import time
import asyncio
import traceback
import configparser 
import pandas as pd
from tqdm.notebook import tqdm
from pathlib import Path

print('Successfully installed and loaded libraries')

# custom_await workaround
import nest_asyncio
nest_asyncio.apply()
custom_await = lambda x: asyncio.get_event_loop().run_until_complete(x)

# Defining path variable for config path
config_path = os.path.join("gdrive", "MyDrive", "Colab_Data", "Configs")
if not os.path.isdir(config_path):
  os.makedirs(config_path)

# Defining path variable for sessions path
sessions_path = os.path.join(config_path, 'Sessions')
if not os.path.isdir(sessions_path):
  os.makedirs(sessions_path)

# Defining path variable for data path
data_path = os.path.join("gdrive", "MyDrive", "Colab_Data", "Data", "Telegram")
if not os.path.isdir(data_path):
  os.makedirs(data_path)

try: 
  # Reading config and setting configuration values
  config = configparser.ConfigParser()
  config.read(os.path.join(config_path, "telegram_config.ini"))

  api_id = int(config['Telegram']['api_id'])
  api_hash = str(config['Telegram']['api_hash'])
  phone = config['Telegram']['phone']

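  # Check if API credentials were successfully parsed
  if api_id and api_hash and phone:
    print('Successfully parsed config data.')

except Exception as e:
  # Report the underlying problem, e.g. a missing section/key or a non-numeric api_id
  print('Error reading or parsing the config: {!r}'.format(e))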


In [None]:
#@title Setup 3: Definition of Core and Support Functions Used by the Tool(s)
def convert_to_int(value, default=0):
    try:
        return int(value)
    except (ValueError, TypeError):
        return default

def init_client():
  """
  First initialization of Telegram client
  :return bool: True if initialization successful.
  """
  try:
    client = custom_await(get_client())
    custom_await(client.disconnect())
    return True
  except Exception:
    return False

def process(parameters):
  """
  Process request to Telegram
  :param dict parameters:  Query parameters.
  :return tuple: Tuple with Dataframe of messages, List of raw messages, Dataframe of userdata.
  """
  client = custom_await(get_client())

  # retrieve n (parameters["items"]) messages for quer[y|ies] (parameters["query"]) 

  queries = [query.strip() for query in parameters.get("query", "").split(",")]
  max_items = convert_to_int(parameters.get("items", 10), 10)

  # userinfo needs some work before it can be retrieved, something with async method calls
  userinfo = parameters.get("scrape-userinfo", False)
  
  try:
      if userinfo:
        posts, raw, users = custom_await(gather_posts(client, queries, max_items, userinfo))
        custom_await(client.disconnect())
        return posts, raw, users
      else:
        posts, raw = custom_await(gather_posts(client, queries, max_items, userinfo))
        custom_await(client.disconnect())
        return posts, raw
  except Exception as e:
      # Show the actual error instead of silently discarding it
      print('Error while gathering posts: {!r}'.format(e))
      custom_await(client.disconnect())


async def get_client(client=None, sessions_path=sessions_path):
    """
    Create or start Telegram Client
    :param TelegramClient client:  Telegram Client
    :param str sessions_path:  Path where client session will be stored
    :return TelegramClient: Created or started Telegram Client.
    """
    eventloop = asyncio.new_event_loop()
    session_path = os.path.join(sessions_path, phone.strip('+'))
    try:
        client = TelegramClient(str(session_path), api_id, api_hash, loop=eventloop)
        #custom_await client.start(phone)
        custom_await(client.start(phone))
        return client
    except Exception:
        print("Error connecting to the Telegram API with provided credentials.")
        if client and hasattr(client, "disconnect"):
            #custom_await client.disconnect()
            custom_await(client.disconnect())
        return None

async def gather_entity(client, entity):
  """
  Gather data about entities like users or channels.
  :param TelegramClient client:  Telegram Client
  :param PeerObject entity:  PeerChannel or PeerUser
  :return dict: Dictionary with user data.
  """
  if entity:
    try:
      #data = custom_await client.get_entity(entity)
      data = custom_await(client.get_entity(entity))
      return data.to_dict()
    except Exception:
      print("Error for entity: {}".format(entity))
      return None
  else:
    return None

async def gather_posts(client, queries, max_items, get_full_userinfo):
    """
    Gather messages for each entity for which messages are requested
    :param TelegramClient client:  Telegram Client
    :param list queries:  List of entities to query (as string)
    :param int max_items:  Messages to scrape per entity
    :param bool get_full_userinfo:  Whether to scrape detailed user information
    rather than just the ID
    :return tuple:  Tuple with Dataframe of messages, List of raw messages, Dataframe of userdata.
    """
    
    assert (max_items > 0), "max_items needs to be larger than 0."

    raw_posts = []
    posts = []

    # Retrieve messages for entities (e.g. channels)
    for query in queries:
      print("Querying {} messages for entity '{}'".format(max_items, query))
      i = 0
      query_posts = []
      raw_query_posts = []
      try:      
        async for message in client.iter_messages(entity=query):
          raw_query_posts.append(message)        
          parsed_message = import_message(client, message, query)
          query_posts.append(parsed_message)
          i += 1
          if i >= max_items:
            break
      except ValueError as e:
        print("Could not scrape entity '{}'".format(query))
      print("Retrieved {} posts for entity '{}'".format(len(query_posts), query))
      
      posts += list(reversed(query_posts))
      raw_posts += list(reversed(raw_query_posts))
    # Retrieve user data
    if get_full_userinfo:
      userdata = []
      ids = []
      for item in tqdm(raw_posts, desc="Retrieving user data"):
        if hasattr(item, 'to_id') and item.to_id:
          if hasattr(item.to_id, 'channel_id') and item.to_id.channel_id and item.to_id.channel_id not in ids:
            ids.append(item.to_id.channel_id)
            userdata.append(custom_await(gather_entity(client, item.to_id)))
          if hasattr(item.to_id, 'group_id') and item.to_id.group_id and item.to_id.group_id not in ids:
            #untested
            print('Found group_id in an untested context. Check!')
            ids.append(item.to_id.group_id)
            userdata.append(custom_await(gather_entity(client, item.to_id)))
        if hasattr(item, 'fwd_from') and item.fwd_from:
          if hasattr(item.fwd_from, 'from_id'):
            if hasattr(item.fwd_from.from_id, 'channel_id') and item.fwd_from.from_id.channel_id:
              if item.fwd_from.from_id.channel_id not in ids:
                ids.append(item.fwd_from.from_id.channel_id)
                userdata.append(custom_await(gather_entity(client, item.fwd_from.from_id)))
            if hasattr(item.fwd_from.from_id, 'user_id') and item.fwd_from.from_id.user_id:
              if item.fwd_from.from_id.user_id not in ids:
                ids.append(item.fwd_from.from_id.user_id)
                userdata.append(custom_await(gather_entity(client, item.fwd_from.from_id)))
          elif hasattr(item.fwd_from, 'from_name') and item.fwd_from.from_name:
            if item.fwd_from.from_name not in ids:
              ids.append(item.fwd_from.from_name)
              userdata.append(custom_await(gather_entity(client, item.fwd_from)))
          else:
              print('Other type of fwd_from not yet handled.')
              print(item.fwd_from)
        if hasattr(item, 'from_id') and item.from_id:
          if hasattr(item.from_id, 'user_id') and item.from_id.user_id:
            if item.from_id.user_id not in ids:
              ids.append(item.from_id.user_id)
              userdata.append(custom_await(gather_entity(client, item.from_id)))
          else:
            print('Item has a from_id without a user_id:')
            print(item.from_id)
      
      
      posts = pd.json_normalize(posts)
      raw_posts = [post.to_dict() for post in raw_posts]
      userdata = [user for user in userdata if user]
      userdata = pd.json_normalize(userdata)

      usermap = {}
      for user in userdata.iterrows():
          usermap[user[1]['id']] = user[1]['username'] 
      posts['author_forwarded_from_resolved'] = posts['author_forwarded_from'].apply(lambda x: usermap.get(x, x))
      return posts, raw_posts, userdata
    else:
      posts = pd.json_normalize(posts)
      raw_posts = [post.to_dict() for post in raw_posts]
      return posts, raw_posts

def import_message(client, message, entity):
    """
    Convert Message object to 4CAT-ready data object
    :param TelegramClient client:  Telethon TelegramClient instance
    :param Message message:  Message to parse
    :param str entity:  Entity (e.g. channel) the message was retrieved from
    :return dict:  4CAT-compatible item object
    """
    thread = message.to_id

    # determine thread ID (= entity ID)
    if isinstance(thread, PeerChannel):
        thread_id = thread.channel_id
    elif isinstance(thread, PeerChat):
        thread_id = thread.chat_id
    elif isinstance(thread, PeerUser):
        thread_id = thread.user_id
    else:
        thread_id = 0
    
    # determine username
    # API responses only include the user *ID*, not the username, and to
    # complicate things further not everyone is a user and not everyone
    # has a username. If no username is available, try the first and
    # last name someone has supplied
    
    
    if message.from_id:
        user_id = message.from_id
        username = None
        user_is_bot = None
    elif message.post_author:
        user_id = message.post_author
        username = message.post_author
        user_is_bot = False
    else:
        user_id = "stream"
        username = None
        user_is_bot = False

    # determine media type
    # these store some extra information of the attachment in
    # attachment_data. Since the final result will be serialised as a csv
    # file, we can only store text content. As such some media data is
    # serialised as JSON.
    attachment_type = get_media_type(message.media)
    if attachment_type == "contact":
        attachment = message.contact
        attachment_data = json.dumps({property: getattr(attachment, property) for property in 
                                      ("phone_number", "first_name", "last_name", "vcard", "user_id")})

    elif attachment_type == "document":
        # videos, etc
        # This could add a separate routine for videos to make them a
        # separate type, which could then be scraped later, etc
        attachment_type = message.media.document.mime_type.split("/")[0]
        attachment_data = ""

    elif attachment_type == "game":
        # there is far more data in the API response for games but this
        # seems like a reasonable number of items to include
        attachment = message.game
        # Telethon objects expose their fields as attributes, so use getattr
        attachment_data = json.dumps(
            {property: getattr(attachment, property) for property in ("id", "short_name", "title", "description")})

    elif attachment_type in ("geo", "geo_live"):
        # untested whether geo_live is significantly different from geo
        attachment_data = "%s %s" % (message.geo.lat, message.geo.long)

    elif attachment_type == "invoice":
        # unclear when and where this would be used
        attachment = message.invoice
        # Telethon objects expose their fields as attributes, so use getattr
        attachment_data = json.dumps(
            {property: getattr(attachment, property) for property in ("title", "description", "currency", "total_amount")})

    elif attachment_type == "photo":
        # we don't actually store any metadata about the photo, since very
        # little of the metadata attached is of interest. Instead, the
        # actual photos may be downloaded via a processor that is run on the
        # search results
        attachment_data = ""

    elif attachment_type == "poll":
        # unfortunately poll results are only available when someone has
        # actually voted on the poll - that will usually not be the case,
        # so we store -1 as the vote count
        attachment = message.poll
        options = {option.option: option.text for option in attachment.poll.answers}
        attachment_data = json.dumps({
            "question": attachment.poll.question,
            "voters": attachment.results.total_voters,
            "answers": [{
                "answer": options[answer.option],
                "votes": answer.voters
            } for answer in attachment.results.results] if attachment.results.results else [{
                "answer": options[option],
                "votes": -1
            } for option in options]
        })

    elif attachment_type == "venue":
        # weird
        attachment = message.venue
        # Merge the geo coordinates with the remaining venue attributes
        attachment_data = json.dumps({
            "geo": "%s %s" % (attachment.geo.lat, attachment.geo.long),
            **{property: getattr(attachment, property) for property in ("title", "address", "provider", "venue_id", "venue_type")}})

    elif attachment_type == "url":
        # easy!
        if hasattr(message.web_preview, "url"):
            attachment_data = message.web_preview.url
        else:
            attachment_data = ""

    else:
        attachment_data = ""

    # was the message forwarded from somewhere and if so when?
    forwarded = None
    forwarded_timestamp = None
    if message.fwd_from:
        forwarded_timestamp = message.fwd_from.date.timestamp()
        if hasattr(message.fwd_from, 'from_id') and message.fwd_from.from_id:
            if  hasattr(message.fwd_from.from_id, 'channel_id'):
                forwarded = message.fwd_from.from_id.channel_id
            elif hasattr(message.fwd_from.from_id, 'user_id'):
                forwarded = message.fwd_from.from_id.user_id
            else:
                forwarded = message.fwd_from.from_id
                print(forwarded)
        elif hasattr(message.fwd_from, 'channel_id') and message.fwd_from.channel_id:
            forwarded = message.fwd_from.channel_id
        elif hasattr(message.fwd_from, 'post_author') and message.fwd_from.post_author:
            forwarded = message.fwd_from.post_author
        elif hasattr(message.fwd_from, 'from_name') and message.fwd_from.from_name:
            forwarded = message.fwd_from.from_name
        else:
            from_id = "stream"
            
    msg = {
        "id": message.id,
        "thread_id": thread_id,
        "search_entity": entity,
        "author": user_id,
        "author_name": username,
        "author_is_bot": user_is_bot,
        "author_forwarded_from": forwarded,
        "subject": "",
        "body": message.message,
        "reply_to": message.reply_to_msg_id,
        "views": message.views,
        "timestamp": int(message.date.timestamp()),
        "timestamp_edited": int(message.edit_date.timestamp()) if message.edit_date else None,
        "timestamp_forwarded_from": forwarded_timestamp,
        "grouped_id": message.grouped_id,
        "attachment_type": attachment_type,
        "attachment_data": attachment_data
    }

    return msg

def get_media_type(media):
    """
    Get media type for a Telegram attachment
    :param media:  Media object
    :return str:  Textual identifier of the media type
    """
    try:
        return {
            "NoneType": "",
            "MessageMediaContact": "contact",
            "MessageMediaDocument": "document",
            "MessageMediaEmpty": "",
            "MessageMediaGame": "game",
            "MessageMediaGeo": "geo",
            "MessageMediaGeoLive": "geo_live",
            "MessageMediaInvoice": "invoice",
            "MessageMediaPhoto": "photo",
            "MessageMediaPoll": "poll",
            "MessageMediaUnsupported": "unsupported",
            "MessageMediaVenue": "venue",
            "MessageMediaWebPage": "url"
        }[type(media).__name__]
    except KeyError:
        return ""

In [None]:
#@title Setup 4: Initialize Telegram Client
init_client()

In [None]:
#@title Tool: Retrieve Messages From Telegram Channels and Get User Data (comma separate multiple channels)
Channels = "" #@param {type:"string"}
NumItems = 10 #@param {type:"integer"}
GetUserData = True #@param {type:"boolean"}

parameters = {
    "query": Channels,
    "items": NumItems,
    "scrape-userinfo": GetUserData
}

results = process(parameters)
outpath = str(time.time()).split('.')[0]+'_'+ parameters['query'].replace(',', '_').replace(' ', '_').replace('__', '_') +'_' + str(parameters['items']) + '_items_each'

# Create outpath
if not os.path.isdir(os.path.join(data_path, outpath)):
  os.makedirs(os.path.join(data_path, outpath))

# Save messages.csv
results[0].to_csv(os.path.join(data_path, outpath, "messages.csv"), sep='\t', index=None)

# Save raw_messages.csv
pd.json_normalize(results[1]).to_csv(os.path.join(data_path, outpath, "raw_messages.csv"), sep='\t', index=None)

# Save users.csv
if parameters['scrape-userinfo']:
  results[2].to_csv(os.path.join(data_path, outpath, "users.csv"), sep='\t', index=None)

print()



print('Stored results in Google Drive path: {}'.format('/'.join(os.path.join(data_path, outpath).split('/')[2:])))
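
The tool cell writes `messages.csv`, `raw_messages.csv` and `users.csv` as tab-separated files, despite the `.csv` extension. The following minimal sketch shows how such a file can be read again for analysis. It uses only the standard library and made-up sample rows; `read_messages` is a hypothetical helper. In the notebook, `pd.read_csv(path, sep='\t')` would load the same file into a DataFrame.

```python
import csv
import os
import tempfile
from collections import Counter

def read_messages(path):
    """Hypothetical helper: read a tab-separated messages file into a list of dicts."""
    with open(path, newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle, delimiter="\t"))

# Made-up sample rows using a few of the columns written by the tool cell.
sample_rows = [
    {"id": "1", "search_entity": "example_channel", "body": "hello", "attachment_type": ""},
    {"id": "2", "search_entity": "example_channel", "body": "", "attachment_type": "photo"},
    {"id": "3", "search_entity": "example_channel", "body": "look", "attachment_type": "photo"},
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "messages.csv")
    # Write a small tab-separated sample file, then read it back.
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(sample_rows[0]), delimiter="\t")
        writer.writeheader()
        writer.writerows(sample_rows)
    messages = read_messages(path)

# Count messages per attachment type; an empty value means no attachment.
attachment_counts = Counter(row["attachment_type"] or "none" for row in messages)
print(attachment_counts)
```

Passing the tab delimiter explicitly matters here: with the default comma delimiter, each row would be read as a single column.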