# SELECT FROM Database

In [None]:
import os

import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extras import Json


def select_from_database(select_sql: str, params=None):
    """Run a query against the ``telegram`` Postgres database and return all rows.

    Connection settings are read from the ``ADL_CEO_POSTGRES_*`` environment
    variables. A new connection is opened for every call and is always closed
    before returning.

    Args:
        select_sql: SQL text to execute.
        params: Optional sequence or mapping of query parameters, passed to
            ``cursor.execute``. This lets values be bound with ``%s``
            placeholders instead of being formatted into ``select_sql``.
            ``None`` (the default) executes ``select_sql`` unchanged.

    Returns:
        list[dict]: One plain dict per result row, keyed by column name.
    """
    pg_connection = {
        'dbname': "telegram",
        'user': os.getenv("ADL_CEO_POSTGRES_USERNAME"),
        'password': os.getenv("ADL_CEO_POSTGRES_PASSWORD"),
        'port': os.getenv("ADL_CEO_POSTGRES_PORT"),
        'host': os.getenv("ADL_CEO_POSTGRES_ENDPOINT")
    }

    conn = psycopg2.connect(**pg_connection)
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as curs:
            curs.execute(select_sql, params)
            query_result = curs.fetchall()
    finally:
        # The cursor's ``with`` only closes the cursor. Without this, every
        # call would leak an open connection on the server.
        conn.close()
    return [dict(row) for row in query_result]


In [None]:
from datetime import datetime, timedelta


# Fetch every active dialog, then keep only those configured to download at
# least one kind of media.
select_sql = """
SELECT *
FROM dialogs
WHERE active=True
"""
dialogs = select_from_database(select_sql)
dialog_list = [
    d
    for d in dialogs
    if any(
        d[flag]
        for flag in ("download_photos", "download_videos", "download_documents", "download_webpage_media")
    )
]

# For each dialog, look for media rows that have no S3 location yet and are
# not flagged missing. Print the newest one of the first dialog that has any,
# then stop.
for dialog in dialog_list:
    dialog_id = dialog["id"]
    # An explicit override date wins. Otherwise look back 30 days from when the
    # dialog was added to the database.
    download_photos_since = dialog["download_photos_override_dt"] or (
        dialog["db_date_added"] - timedelta(days=30)
    )
    # NOTE(review): the WHERE clause filters on messages.* columns, so the
    # LEFT JOIN below effectively behaves as an inner join.
    select_sql = f"""
    SELECT
        messages_media.media_id,
        messages_media.media_type,
        messages_media.mime_type,
        messages_media.missing_media,
        messages.date,
        messages.dialog_id,
        messages.message_id,
        messages.public_url
    FROM
        messages_media
        LEFT JOIN messages ON messages_media.media_id = messages.media_id
    WHERE
        messages.dialog_id = {dialog_id}
        AND messages.date >= '{download_photos_since}'
        AND s3_bucket is NULL
        AND s3_key is NULL
        AND missing_media is False
    ORDER BY messages.date DESC
    LIMIT 1000
    """
    messages_media_for_dialog = select_from_database(select_sql)
    if not messages_media_for_dialog:
        continue
    print(messages_media_for_dialog[0])
    break

#     messages_to_scrape = []
#     if dialog["download_photos"]:
#         if dialog["download_webpage_media"]:
#             ids_missing_photo_media = [i for i in messages_media_for_dialog if i["media_type"] and "photo" in i["media_type"]]
#         else:
#             ids_missing_photo_media = [i for i in messages_media_for_dialog if i["media_type"] == "photo"]
#         print(f"Scraping {len(ids_missing_photo_media)} photos from Telegram")
#         messages_to_scrape.extend(ids_missing_photo_media)
#     if dialog["download_videos"]:
#         if dialog["download_webpage_media"]:
#             ids_missing_video_media = [i for i in messages_media_for_dialog if i["mime_type"] and "video" in i["mime_type"]]
#         else:
#             ids_missing_video_media = [i for i in messages_media_for_dialog if i["media_type"] == "document" and i["mime_type"] and "video" in i["mime_type"]]
#         print(f"Scraping {len(ids_missing_video_media)} videos from Telegram")
#         messages_to_scrape.extend(ids_missing_video_media)

# print(len(messages_to_scrape))

# 'dialogs', 'dialog_updates' and 'dialog_media' tables (formerly 'subscriptions' and 'subscription_updates')

In [None]:
import json
from datetime import datetime


def get_updates_for_dialog_tables(dialog_data):
    """Split one scraped Telegram dialog record into rows for the dialog tables.

    Args:
        dialog_data (dict): One entry from a subscription file. The keys read
            are ``is_user``, ``is_group``, ``is_channel`` and ``entity``. The
            ``entity`` dict must contain ``id``, ``access_hash``, ``title``,
            the boolean channel flags, ``username``, ``participants_count``
            and ``photo``.

    Returns:
        dict: Always contains ``"dialogs"`` and ``"dialog_updates"`` rows.
        A ``"dialog_media"`` row is added when ``entity["photo"]`` is a dict
        with a ``photo_id``. ``dialog_data`` itself is not modified.
    """
    from datetime import timezone

    entity = dialog_data["entity"]
    # One timestamp for every row built from this record, so the tables agree.
    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    update_dt = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S+0000")

    dialog_tables = {}

    dialog_tables["dialogs"] = {
        "id": entity["id"],
        "access_hash": entity["access_hash"],
        "is_user": dialog_data["is_user"],
        "is_group": dialog_data["is_group"],
        "is_channel": dialog_data["is_channel"],
        "update_dt": update_dt,
    }

    dialog_updates = {"dialog_id": entity["id"]}
    for field in (
        "title", "broadcast", "verified", "megagroup", "restricted", "scam",
        "has_link", "slowmode_enabled", "fake", "gigagroup", "noforwards",
        "username",
    ):
        dialog_updates[field] = entity[field]
    # "restriction_reason" is deliberately not copied. Inserting it failed with
    # "is of type json[] but expression is of type text[]".
    dialog_updates["participants_count"] = entity["participants_count"]
    dialog_updates["update_dt"] = update_dt

    photo = entity["photo"]
    # The photo may be None or a dict without a photo_id (no profile photo).
    if isinstance(photo, dict) and "photo_id" in photo:
        # Copy so the caller's dialog_data is left untouched.
        dialog_media = dict(photo)
        dialog_media["dialog_id"] = entity["id"]
        dialog_tables["dialog_media"] = dialog_media
        dialog_updates["photo_id"] = dialog_media["photo_id"]
    else:
        dialog_updates["photo_id"] = None
    dialog_tables["dialog_updates"] = dialog_updates
    return dialog_tables


# Load a locally saved subscription file and transform its first dialog.
# ``with`` makes sure the file handle is closed once the JSON is parsed.
with open("/Users/aaron/Dev/Repos/ADL/telegram_ingest/telegram-scrape/s3/test/subscription-files/1663677247.json", encoding="utf-8") as f:
    dialog_data_list = json.load(f)

dialog_data = dialog_data_list[0]
updates = get_updates_for_dialog_tables(dialog_data)
updates

# 'messages' and 'messages_media' table updates

In [None]:
import copy
import time
import boto3
import json
from datetime import datetime, timezone
from pprint import pprint
import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extras import Json


bucket_name = "telegram-scrape"
key_name = "messages/staging/"
s3 = boto3.client('s3')

# A single list_objects_v2 call returns at most 1,000 keys, so walk every page.
# A page for a prefix with no objects has no "Contents" entry at all.
paginator = s3.get_paginator("list_objects_v2")
bucket_contents = [
    obj
    for page in paginator.paginate(Bucket=bucket_name, Prefix=key_name)
    for obj in page.get("Contents", [])
]
# Skip the placeholder object whose key is the prefix itself.
bucket_keys = [i for i in bucket_contents if i["Key"] != key_name]
# filter_for = [
#     "messages/staging/1430826036-from-106262-to-125129.json",
#     "messages/staging/1446913475-from-66880-to-71105.json",
#     "messages/staging/1277769680-from-106763-to-128833.json",
# ]
date_filter = datetime(year=2022, month=9, day=25, tzinfo=timezone.utc)
filtered_keys = [i for i in bucket_keys if i["LastModified"] >= date_filter]

print(f"Downloading {len(filtered_keys)} files from S3.")


telegram_messages = []
for file in filtered_keys:
    s3_object = s3.get_object(Bucket=bucket_name, Key=file["Key"])
    download = s3_object["Body"].read()
    json_file = json.loads(download.decode('utf-8'))
    print(f"{file['Key']} contains {len(json_file)} messages.")
    telegram_messages.extend(json_file)
print(f"Found {len(filtered_keys)} file with a total of {len(telegram_messages)} messages.")


def _message_dialog_id(message):
    """Return a raw message's dialog id, falling back to the legacy key.

    The column was renamed from ``subscription_id`` to ``dialog_id``; older
    staging files may still only carry the previous name.
    """
    if "dialog_id" in message:
        return message["dialog_id"]
    return message["subscription_id"]


def _tag_media(media, media_type, message):
    """Annotate a raw media dict in place with its type and owning message; return it.

    Newer messages get ``dialog_id`` on the media dict. Older files, which
    only have ``subscription_id``, get ``subscription_id`` instead.
    """
    media["media_type"] = media_type
    media["message_id"] = message["id"]
    if "dialog_id" in message:
        media["dialog_id"] = message["dialog_id"]
    else:
        media["subscription_id"] = message["subscription_id"]
    return media


def get_updates_for_message_tables(telegram_messages) -> tuple:
    """Transform raw Telegram messages into rows for the 'messages' and
    'messages_media' tables in the telegram database.

    The input is the list of message dicts loaded from the S3 files in the
    telegram-scrape/messages/staging directory. It is split into two lists:
        1. messages_table: one row dict per message for the 'messages' table.
           Every row has the same keys in the same order.
        2. media_attributes: the raw media dicts (message photo/document and
           webpage photo/document) destined for the 'messages_media' table.
           They still need to be standardized before insertion.

    The media dicts are annotated in place (``media_type``, ``message_id``,
    and ``dialog_id`` or ``subscription_id``), so the input is modified.

    Returns:
        tuple[list[dict], list[dict]]: ``(messages_table, media_attributes)``.
    """
    fwd_from_columns = (
        "fwd_from_channel_id", "fwd_from_user_id", "fwd_from_date",
        "fwd_from_from_name", "fwd_from_message_id", "fwd_from_post_author",
    )
    webpage_columns = (
        "webpage_url", "webpage_type", "webpage_site_name", "webpage_title",
        "webpage_description", "author", "webpage_photo_media_id",
        "webpage_document_media_id",
    )

    messages_table = []  # Updates for 'messages' table
    media_attributes = []  # Media dicts to be standardized before inserting
    for message in telegram_messages:
        dialog_id = _message_dialog_id(message)
        message_data = {}
        # NOTE(review): plain string concatenation, e.g. (12, 34) and (1, 234)
        # both give "1234". Kept as-is because existing primary keys use it.
        message_data["id"] = str(dialog_id) + str(message["id"])
        message_data["dialog_id"] = dialog_id
        message_data["message_id"] = message["id"]
        message_data["date"] = message["date"]
        message_data["from_id"] = message["from_id"]
        message_data["message"] = message["message"]
        message_data["pinned"] = message["pinned"]
        message_data["post_author"] = message["post_author"]
        message_data["private_url"] = message["private_url"]
        message_data["public_url"] = message["public_url"]
        message_data["sender_username"] = message["sender"]

        reply_to = message["reply_to"] or {}
        message_data["reply_to_msg_id"] = reply_to.get("reply_to_msg_id")
        message_data["reply_to_peer_id"] = reply_to.get("reply_to_peer_id")

        fwd_from = message["fwd_from"]
        if fwd_from:
            fwd_from_id = fwd_from["from_id"] or {}
            message_data["fwd_from_channel_id"] = fwd_from_id.get("channel_id")
            message_data["fwd_from_user_id"] = fwd_from_id.get("user_id")
            message_data["fwd_from_date"] = fwd_from["date"]
            message_data["fwd_from_from_name"] = fwd_from["from_name"]
            message_data["fwd_from_message_id"] = fwd_from["channel_post"]
            message_data["fwd_from_post_author"] = fwd_from["post_author"]
        else:
            message_data.update(dict.fromkeys(fwd_from_columns, None))

        # Add webpage fields to the message row and collect any webpage media
        # for the 'messages_media' table.
        media = message["media"]
        webpage_data = media.get("webpage") if media else None
        if webpage_data and "url" in webpage_data:
            message_data["webpage_url"] = webpage_data["url"]
            message_data["webpage_type"] = webpage_data["type"]
            message_data["webpage_site_name"] = webpage_data["site_name"]
            message_data["webpage_title"] = webpage_data["title"]
            message_data["webpage_description"] = webpage_data["description"]
            message_data["author"] = webpage_data["author"]

            webpage_photo = webpage_data.get("photo")
            if webpage_photo and "id" in webpage_photo:
                message_data["webpage_photo_media_id"] = webpage_photo["id"]
                media_attributes.append(_tag_media(webpage_photo, "webpage_photo", message))
            else:
                message_data["webpage_photo_media_id"] = None

            webpage_document = webpage_data.get("document")
            if webpage_document and "id" in webpage_document:
                message_data["webpage_document_media_id"] = webpage_document["id"]
                media_attributes.append(_tag_media(webpage_document, "webpage_document", message))
            else:
                message_data["webpage_document_media_id"] = None
        else:
            message_data.update(dict.fromkeys(webpage_columns, None))

        # Test the value, not just the key. Some media (e.g. invoices) carry
        # "photo": None, which previously crashed on ["photo"]["id"].
        if media and media.get("photo"):
            message_data["media_id"] = media["photo"]["id"]
            media_attributes.append(_tag_media(media["photo"], "photo", message))
        elif media and media.get("document"):
            message_data["media_id"] = media["document"]["id"]
            media_attributes.append(_tag_media(media["document"], "document", message))
        else:
            message_data["media_id"] = None
        messages_table.append(message_data)

    return messages_table, media_attributes


def _first_attribute_value(attributes, key):
    """Return ``attr[key]`` from the first attribute dict containing ``key``, else None."""
    for attribute in attributes:
        if key in attribute:
            return attribute[key]
    return None


def standardize_media_attributes(media_attributes) -> list:
    """Flatten raw media dicts into uniform rows for the 'messages_media' table.

    Duration, width, height and file name are taken from the first entry of
    the media's ``attributes`` list that has the corresponding key (``duration``,
    ``w``, ``h``, ``file_name``). They are None when there is no such entry or
    no ``attributes`` list. Every row has the same keys in the same order.

    Args:
        media_attributes: Media dicts that already carry ``media_type``,
            ``message_id`` and either ``dialog_id`` or the legacy
            ``subscription_id``.

    Returns:
        list[dict]: One row per input media dict.

    Raises:
        KeyError: If a media dict has neither ``dialog_id`` nor ``subscription_id``.
    """
    media_table = []
    for media in media_attributes:
        attributes = media.get("attributes") or []
        m = {
            "media_duration": _first_attribute_value(attributes, "duration"),
            "video_width": _first_attribute_value(attributes, "w"),
            "video_height": _first_attribute_value(attributes, "h"),
            "media_filename": _first_attribute_value(attributes, "file_name"),
            "media_id": media.get("id"),
            "access_hash": media.get("access_hash"),
            "file_reference": media.get("file_reference"),
            "date": media.get("date"),
            "dc_id": media.get("dc_id"),
            "has_stickers": media.get("has_stickers"),
            "mime_type": media.get("mime_type"),
            "size": media.get("size"),
            "media_type": media.get("media_type"),
            "message_id": media.get("message_id"),
        }
        if "dialog_id" in media:
            dialog_id = media["dialog_id"]
        else:
            # Older files were tagged with the pre-rename column name.
            dialog_id = media["subscription_id"]
        m["dialog_id"] = dialog_id
        m["dialog_message_id"] = str(dialog_id) + str(media.get("message_id"))
        media_table.append(m)
    return media_table


def insert_into_database_table(table_name, insert_list, pk_col="id", connection_params=None):
    """Bulk-insert rows into ``table_name``, skipping rows whose ``pk_col`` already exists.

    Args:
        table_name: Target table. It is interpolated directly into the SQL, so
            it must be a trusted identifier, never user input.
        insert_list: List of row dicts. The column list comes from the first
            row's keys, and every row is read in that same column order.
        pk_col: Column used in the ``ON CONFLICT (...) DO NOTHING`` clause.
        connection_params: Keyword arguments for ``psycopg2.connect``. Defaults
            to the module-level ``pg_connection`` dict, looked up at call time.

    Returns:
        None. An empty ``insert_list`` does nothing.

    Raises:
        KeyError: If a row is missing one of the first row's columns.
    """
    if not insert_list:
        return None

    columns = list(insert_list[0].keys())
    col_names = ", ".join(columns)
    # Read values by column name so rows built with a different key order
    # still line up with the column list.
    insert_values = [tuple(row[col] for col in columns) for row in insert_list]

    if connection_params is None:
        connection_params = pg_connection
    sql = f"INSERT INTO {table_name} ({col_names}) VALUES %s ON CONFLICT ({pk_col}) DO NOTHING"

    conn = psycopg2.connect(**connection_params)
    try:
        # ``with conn`` commits on success and rolls back on error, but it does
        # not close the connection; hence the explicit close below.
        with conn:
            with conn.cursor() as curs:
                psycopg2.extras.execute_values(curs, sql, insert_values, page_size=1000)
    finally:
        conn.close()


# Connection settings read by insert_into_database_table, taken from the environment.
pg_connection = {
    'dbname': "telegram",
    'user': os.getenv("ADL_CEO_POSTGRES_USERNAME"),
    'password': os.getenv("ADL_CEO_POSTGRES_PASSWORD"),
    'port': os.getenv("ADL_CEO_POSTGRES_PORT"),
    'host': os.getenv("ADL_CEO_POSTGRES_ENDPOINT")
}

# Convert the downloaded messages into rows for 'messages' and 'messages_media'.
messages_table, media_attributes = get_updates_for_message_tables(telegram_messages)
media_table = standardize_media_attributes(media_attributes)

print(f"Inserting {len(messages_table)} into messages_table")
insert_into_database_table("messages", messages_table)

# Media rows are de-duplicated on media_id rather than the default "id".
print(f"Inserting {len(media_table)} into media_table")
pk_col = "media_id"
insert_into_database_table("messages_media", media_table, pk_col=pk_col)



In [None]:
import boto3
import time
import json
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extras import Json
from tqdm import tqdm



def extract_data_from_s3(bucket_name, key_name):
    """Return every object key under the prefix ``key_name`` in ``bucket_name``.

    Uses boto3's ``list_objects_v2`` paginator, so all pages of a long listing
    are followed. The prefix's own placeholder object (a key exactly equal to
    ``key_name``) is excluded. An empty prefix yields an empty list instead of
    raising KeyError.

    Returns:
        list[str]: Object keys in the order S3 returns them.
    """
    s3 = boto3.client('s3')
    paginator = s3.get_paginator("list_objects_v2")
    return [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket_name, Prefix=key_name)
        # A page with no matching objects has no "Contents" key at all.
        for obj in page.get("Contents", [])
        if obj["Key"] != key_name
    ]


def transform_data(extracted_data, bucket_name="telegram-scrape"):
    """Build ``messages_media`` update records from S3 media object keys.

    The media id is taken from the text after a key's last ``-``, with any
    file extension removed. For example, ``"media/abc-123.jpg"`` and
    ``"media/abc-123.mp4"`` both give 123.

    Args:
        extracted_data: Iterable of S3 object keys.
        bucket_name: Bucket the keys live in, stored as ``s3_bucket``.

    Returns:
        list[dict]: One record per key with ``media_id`` (int), ``s3_bucket``,
        ``s3_key`` and ``media_id_conflict`` set to the string "NULL".

    Raises:
        ValueError: If the id segment of a key is not an integer.
    """
    transformed_data = []
    for key in extracted_data:
        # Drop any extension (".jpg", ".mp4", ...), not only ".jpg".
        media_id_text = key.split("-")[-1].split(".", 1)[0]
        transformed_data.append(
            {
                "media_id": int(media_id_text),
                "s3_bucket": bucket_name,
                "s3_key": key,
                "media_id_conflict": "NULL",
            }
        )
    return transformed_data


# bucket_name = "telegram-scrape"
# key_name = "media/"
# extracted_data  = extract_data_from_s3(bucket_name, key_name)
# transformed_data = transform_data(extracted_data)
# transformed_data[0]
# NOTE: the lines above are commented out, so ``transformed_data`` must already
# exist in the kernel from an earlier run.
pg_connection = {
    'dbname': "telegram",
    'user': os.getenv("ADL_CEO_POSTGRES_USERNAME"),
    'password': os.getenv("ADL_CEO_POSTGRES_PASSWORD"),
    'port': os.getenv("ADL_CEO_POSTGRES_PORT"),
    'host': os.getenv("ADL_CEO_POSTGRES_ENDPOINT")
}

# Values are bound as parameters rather than formatted into the SQL, so S3 keys
# containing quotes cannot break (or inject into) the statement.
update_sql = """
UPDATE messages_media
SET s3_bucket=%s, s3_key=%s, media_id_conflict=%s
WHERE media_id=%s
"""

conn = psycopg2.connect(**pg_connection)
try:
    # ``with conn`` commits on success and rolls back on error; it does not close.
    with conn:
        with conn.cursor() as curs:
            # Only the first record is processed ([0:1]); widen the slice to run all.
            for record in tqdm(transformed_data[0:1]):
                # transform_data encodes SQL NULL as the string "NULL"; bind it as None.
                conflict = None if record["media_id_conflict"] == "NULL" else record["media_id_conflict"]
                curs.execute(
                    update_sql,
                    (record["s3_bucket"], record["s3_key"], conflict, record["media_id"]),
                )
finally:
    conn.close()

print("OK")

# Read JSON File

In [9]:
import json
  
# Load one scraped message file; json.load returns a list of message dicts here.
with open("/Users/aaron/Dev/Repos/ADL/telegram_ingest/telegram-scrape/messages/1414255764-from-2-to-22239-(20787).json", encoding="utf-8") as f:
    data = json.load(f)

# Find the first message whose media has a "photo" entry without a readable id
# (e.g. a value of None) and print it. ``i`` stays at that index for the next cell.
for i, message in enumerate(data):
    media = message["media"]
    if media and "photo" in media:
        try:
            media_id = media["photo"]["id"]
        except (TypeError, KeyError):
            print(message)
            break
            


{'out': False, 'mentioned': False, 'media_unread': False, 'silent': False, 'post': True, 'from_scheduled': False, 'legacy': False, 'edit_hide': False, 'id': 16478, 'from_id': None, 'peer_id': {'channel_id': 1414255764}, 'fwd_from': None, 'via_bot_id': None, 'reply_to': None, 'date': '2022-03-03 21:00:19+0000', 'message': '', 'media': {'title': 'Donation', 'description': 'for @MurderTheMedia', 'currency': 'EUR', 'total_amount': 100, 'start_param': '', 'shipping_address_requested': False, 'test': False, 'photo': None, 'receipt_msg_id': None}, 'reply_markup': {'rows': [{'buttons': [{'text': 'Donate'}]}]}, 'entities': None, 'views': 1837, 'forwards': 4, 'replies': None, 'edit_date': None, 'pinned': False, 'noforwards': False, 'post_author': None, 'grouped_id': None, 'reactions': None, 'restriction_reason': None, 'ttl_period': None, 'action': None, '_client': {'session': {}, 'api_id': 4420582, 'api_hash': '<redacted>'}, '_text': None, '_file': None, '_reply_message': N

In [12]:
from pprint import pprint

pprint(data[i])

{'_action_entities': None,
 '_broadcast': True,
 '_buttons': None,
 '_buttons_count': None,
 '_buttons_flat': None,
 '_chat': {'access_hash': 7716041229790944475,
           'admin_rights': None,
           'banned_rights': None,
           'broadcast': True,
           'call_active': True,
           'call_not_empty': True,
           'creator': False,
           'date': '2021-05-10 21:54:03+0000',
           'default_banned_rights': None,
           'fake': False,
           'gigagroup': False,
           'has_geo': False,
           'has_link': True,
           'id': 1414255764,
           'join_request': False,
           'join_to_send': False,
           'left': False,
           'megagroup': False,
           'min': False,
           'noforwards': False,
           'participants_count': None,
           'photo': {'dc_id': 1,
                     'has_video': True,
                     'photo_id': 5105101375200668995,
                     'stripped_thumb': [1,
                    

# Read JSON Lines File

In [None]:
import json

# Read a JSON Lines archive: one JSON document per line.
data = []
with open('/Users/aaron/Dev/Repos/ADL/telegram_ingest/telegram-scrape/messages/archive.jsonl', encoding="utf-8") as f:
    for line in f:
        # Skip blank lines instead of failing inside json.loads.
        if line.strip():
            data.append(json.loads(line))

In [None]:
import pandas as pd


# Tabulate the JSON Lines records and save them as an Excel workbook next to the archive.
archive_xlsx_path = "/Users/aaron/Dev/Repos/ADL/telegram_ingest/telegram-scrape/messages/archive.xlsx"
df = pd.DataFrame(data)
df.to_excel(archive_xlsx_path)