## Libraries

In [203]:
import subprocess
import json
import pandas as pd
import re
import datetime as dt

import clickhouse_connect

import time
from __future__ import annotations
from yandex_cloud_ml_sdk import YCloudML

## TDL export

In [204]:
def run_tdl_export(
    channel="<YOUR TELEGRAM CHANNEL ID>",
    output_path="C:/db/tg.json",
    namespace="quickstart",
):
    """Export a Telegram chat to a JSON file by invoking the ``tdl`` CLI.

    Runs ``tdl chat export`` with ``--all`` and ``--with-content``.

    Args:
        channel: Value passed to ``-c`` (the chat to export).
        output_path: Value passed to ``-o`` (the file the export is written to).
        namespace: Value passed to ``-n``.

    Raises:
        FileNotFoundError: If the ``tdl`` executable cannot be found.
        subprocess.CalledProcessError: If ``tdl`` exits with a non-zero status.
    """
    command = [
        "tdl",
        "chat",
        "export",
        "-n",
        namespace,
        "-c",
        channel,
        "--all",
        "--with-content",
        "-o",
        output_path,
    ]

    # check=True: a failed export must stop the run here. Otherwise the later
    # steps would read whatever file is already at output_path and rewrite the
    # database table from stale data.
    subprocess.run(command, check=True)

In [205]:
# Entry-point guard: run the export only when this code executes as the main
# module, not when it is imported.
if __name__ == "__main__":
    run_tdl_export()

In [206]:
# Load the exported JSON into `data`. The path points at the same location
# that run_tdl_export() passes to tdl via -o (written there with forward slashes).
with open('C:\\db\\tg.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

## Read current table from ClickHouse

In [207]:
# Open the ClickHouse client used for every query below. All '<YOUR ...>'
# values are placeholders that must be replaced before running.
# NOTE(review): port is shown as a string placeholder — confirm the type of the
# value you substitute matches what get_client expects.
# NOTE(review): the placeholder text says "CERTIFICATE ROOT" while the keyword
# is client_cert_key — confirm which certificate file is actually intended.
client = clickhouse_connect.get_client(
    host='<YOUR HOST>',        
    port='<YOUR PORT>',            
    database='<YOUR DATABASE>',  
    username='<YOUR USERNAME>', 
    password='<YOUR PASSWORD>',  
    client_cert_key='<YOUR CERTIFICATE ROOT>' 
)

In [208]:
# Read every row currently stored in tg_topics into a DataFrame. It is used
# below to detect already-known message ids and to reuse their summaries.
# NOTE(review): this SELECT runs before the CREATE TABLE IF NOT EXISTS statement
# further down — presumably it fails on a first run when tg_topics does not
# exist yet; confirm.
read_query = "SELECT * FROM tg_topics"
current_table = client.query_df(read_query)

In [209]:
# Ids already present in the table, as a set; used below to flag which
# exported messages are new.
existing_ids = set(current_table['id'].tolist())

## Data preparation

In [175]:
# Functions

In [210]:
# Function to convert UNIX timestamp to a formatted string
def convert_unix_to_date(timestamp):
    """Convert a UNIX timestamp to a ``YYYY-MM-DD`` date string in UTC.

    Args:
        timestamp: Seconds since the epoch (int or float).

    Returns:
        The UTC calendar date of *timestamp*, formatted as ``'%Y-%m-%d'``.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. The
    # timezone-aware call below produces the same UTC calendar date.
    moment = dt.datetime.fromtimestamp(timestamp, tz=dt.timezone.utc)
    return moment.strftime('%Y-%m-%d')

In [211]:
# Getting hashtags into list
pattern = re.compile(r'#(\w+)')


def find_hashtags(text):
    """Return the hashtags found in *text* as a list of lower-cased names.

    *text* is converted with ``str()`` before matching, so non-string values
    are accepted. The leading ``#`` is not part of the returned names; order of
    appearance is kept and repeated tags are not removed.
    """
    tag_names = pattern.findall(str(text))
    return list(map(str.lower, tag_names))

In [212]:
# Getting hashtags into string
def find_hashtags_string(text):
    """Return the hashtags found in *text* as one ``', '``-separated string.

    Matching uses the module-level ``pattern`` on ``str(text)``; each tag is
    lower-cased and listed without its leading ``#``. The result is an empty
    string when no hashtag is present.
    """
    lowered_tags = (tag.lower() for tag in pattern.findall(str(text)))
    return ', '.join(lowered_tags)

In [213]:
# Classify some series
_STATS_SERIES = 'Обзорная статистическая серия'

# Lower-cased marker phrase -> series name. Built once at definition time
# instead of on every call; insertion order is the matching priority.
_SERIES_BY_PHRASE = {
    'регрессиада': 'Регрессиада',
    'вы возглавляете hr-аналитику. что теперь?': 'Вы возглавляете HR-аналитику. Что теперь?',
    'анализ hr вакансий': 'Анализ HR вакансий',
    'hr дашборд в excel': 'HR дашборд в Excel',
    'байесовский фреймворк и hr задачи': 'Байесовский фреймворк и HR задачи',
    'нормальное распределение': _STATS_SERIES,
    'логарифмирование и преобразование переменных': _STATS_SERIES,
    'доверительные интервалы': _STATS_SERIES,
    'корреляция. теория': _STATS_SERIES,
    'корреляция. практика': _STATS_SERIES,
    'статистическая значимость и размер эффекта. теория': _STATS_SERIES,
    'статистическая значимость и размер эффекта. практика': _STATS_SERIES,
    'как перестать бояться и полюбить r': 'Как перестать бояться и полюбить R',
}


def text_category(text):
    """Map a post title to the name of the series it belongs to.

    The title is lower-cased and checked for each known marker phrase as a
    substring; the first phrase that matches decides the series.

    Args:
        text: Post title. Non-string values (``None``, NaN) are treated as
            "no series" instead of raising.

    Returns:
        The series name, or ``None`` when no marker phrase is found.
    """
    if not isinstance(text, str):
        return None

    text_lower = text.lower()
    for phrase, category in _SERIES_BY_PHRASE.items():
        if phrase in text_lower:
            return category

    return None

In [214]:
# Function to get summary from YandexGPT
# Module-level SDK client and completion model used by get_summary().
# folder_id and auth are placeholders that must be replaced before running.
sdk = YCloudML(
    folder_id="<YOUR FOLDER ID>", 
    auth="<YOUR TOKEN>"
)
model = sdk.models.completions("yandexgpt")
# The result of configure() is re-bound to `model`, so the temperature=0.5
# configuration is the one used for every later request.
model = model.configure(temperature=0.5)

# Generate summaries for all rows with a delay
def get_summary(text, delay=1):
    """Ask the configured model for a very short summary of *text*.

    Args:
        text: Message text to summarise.
        delay: Seconds to sleep before sending the request (default 1).

    Returns:
        The stripped text of the first alternative the model returns. An empty
        string is returned when *text* is empty or not a string, when the model
        yields no alternatives, or when the request raises.
    """
    # Non-string values (e.g. NaN for a message without text) are truthy, so
    # `not text` alone would let them through to the API. Reject them here.
    if not isinstance(text, str) or not text:
        return ""

    try:
        time.sleep(delay)
        messages = [
            {"role": "system", "text": "Сделай очень краткий пересказ текста. Длина не более 200 знаков."},
            {"role": "user", "text": text},
        ]
        result = model.run(messages)
        first_alternative = next(iter(result), None)
        if first_alternative is not None:
            return first_alternative.text.strip()
    except Exception as e:
        # Deliberately best-effort: one failed request must not abort the
        # whole batch, so report the error and fall back to an empty summary.
        print(f"Error processing text: {e}")
        return ""

    # The model returned no alternatives: still return a string so callers
    # never receive None.
    return ""

    
# Generate summaries only for new rows
def conditional_summary(row, delay=1):
    """Return the summary for one DataFrame row, querying the model only for new rows.

    A row whose ``is_new`` flag is false reuses the ``summary`` stored in
    ``current_table`` under the same ``id``. A new row waits *delay* seconds
    and is then summarised by ``get_summary`` (called with ``delay=0``, so the
    pause happens only once).
    """
    if not row['is_new']:
        same_id = current_table['id'] == row['id']
        stored_summaries = current_table.loc[same_id, 'summary']
        return stored_summaries.values[0]

    time.sleep(delay)
    return get_summary(row['text'], delay=0)

In [215]:
# Write data into pandas dataframe
# Build the DataFrame from the 'messages' list of the exported JSON.
df = pd.json_normalize(data['messages'])
# True for messages whose id is not yet in the ClickHouse table.
df['is_new'] = ~df['id'].isin(existing_ids)

In [187]:
# Add columns
# Replace the UNIX timestamp in 'date' with a 'YYYY-MM-DD' string.
df['date'] = df['date'].apply(convert_unix_to_date)
# Title = first line of the message text.
df['title'] = df['text'].apply(lambda x: str(x).split('\n')[0])
# Link to the message, built from the channel URL and the message id.
df['link'] = 'https://t.me/h0h1_hr_analytics/' + df['id'].astype(str)
# Hashtags are kept twice: as a list and as a comma-separated string.
df['hashtags'] = df['text'].apply(find_hashtags)
df['hashtags_string'] = df['text'].apply(find_hashtags_string)
# Row-wise: stored summary for known messages, model call for new ones.
df['summary'] = df.apply(conditional_summary, axis=1)
# Alternative that re-summarises every row:
# df['summary'] = df['text'].apply(get_summary)
# Same refresh timestamp for every row, taken once from dt.datetime.now().
df['refreshed_date'] =  dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Derived from 'title', so it must come after the title column is built.
df['series'] = df['title'].apply(text_category)

# Remove columns
# 'is_new' was only needed by conditional_summary above.
df = df.drop(columns=['is_new'])

## Writing into the database

In [188]:
# DDL for the target table. IF NOT EXISTS lets this statement be re-run when
# the table is already present. Hashtags are stored both as Array(String)
# (hashtags) and as a plain String (hashtags_string).
create_table_query = """
    CREATE TABLE IF NOT EXISTS tg_topics (
        id Int32,
        type String,
        file String,
        date Date,
        text String,
        title String,
        link String,
        hashtags Array(String),
        hashtags_string String,
        summary String,
        refreshed_date DateTime,
        series String
    ) ENGINE = MergeTree()
    ORDER BY id
"""

client.command(create_table_query)

<clickhouse_connect.driver.summary.QuerySummary at 0x1f3d1494040>

In [189]:
# Empty tg_topics so that the insert further down rewrites it in full from df.
# NOTE(review): if that insert fails, the table is presumably left empty —
# confirm this is acceptable or load into a staging table first.
truncate_query = "TRUNCATE TABLE tg_topics"
client.command(truncate_query)

<clickhouse_connect.driver.summary.QuerySummary at 0x1f3d1495510>

In [190]:
# Serialise the whole DataFrame to CSV text (without the index) for the
# INSERT statement built below.
csv_data = df.to_csv(index=False)

In [191]:
# Inline the CSV text into a single INSERT ... FORMAT CSV statement and send it.
# NOTE(review): to_csv() writes a header row by default while the format here
# is plain CSV — confirm the server handles the header (CSVWithNames may be
# what is intended).
insert_query = f"INSERT INTO tg_topics FORMAT CSV {csv_data}"
client.command(insert_query)

<clickhouse_connect.driver.summary.QuerySummary at 0x1f3cfcbaaa0>