In [2]:
import os
import gzip
import shutil
import requests
import xmltodict

from tqdm import tqdm

from bs4 import BeautifulSoup

# Обработка данных

## Комментарии

In [1]:
import gc
import polars as pl

In [2]:
df = pl.read_parquet("Article 3 Data/posts/gathered.parquet")

In [29]:
df = df.with_columns(pl.when(pl.col('tickers').list.len() > 0).then(pl.col('tickers')),
                     pl.when(pl.col('prices').list.len() > 0).then(pl.col('prices')),
                     pl.when(pl.col('comments').list.len() > 0).then(pl.col('comments')),
                     pl.when(pl.col('reactions').list.len() > 0).then(pl.col('reactions')))

In [30]:
dct = {'января': '01', 'февраля': '02', 'марта': '03', 'апреля': '04', 'мая': '05', 'июня': '06', 'июля': '07', 'августа': '08', 'сентября': '09', 'октября': '10', 'ноября': '11', 'декабря': '12'}

In [31]:
def preprocess_timestamp(df, timestamp_col, drop_nulls=False):
    df = df.with_columns(pl.col(timestamp_col).str.replace_many(list(dct.keys()), list(dct.values())))
    df = df.with_columns(pl.col(timestamp_col).str.replace(' в', ''))
    df = df.with_columns(pl.col(timestamp_col).str.to_datetime('%d %m %Y %H:%M', strict=False))
    if drop_nulls:
        df = df.drop_nulls(timestamp_col)
    df = df.with_columns(pl.col('message').str.replace_many(['\n', '\xa0', '&nbsp;'], [' ', ' ', ' ']))
    return df

df = preprocess_timestamp(df, 'timestamp', drop_nulls=True)

In [32]:
df = df.filter(~((pl.col('message') == 'empty') | (pl.col('message') == 'error'))).unique().sort('timestamp')

In [42]:
df.write_parquet('Article 3 Data/posts/preprocessed.parquet')

#### Обработка комментариев к комментариям

In [4]:
df = pl.read_parquet('Article 3 Data/posts/preprocessed.parquet')

In [28]:
# df.filter(pl.col('n_comments').rank(descending=True)==100)

In [34]:
from tqdm import tqdm

def w_pbar(pbar, func):
    def foo(*args, **kwargs):
        pbar.update(1)
        return func(*args, **kwargs)

    return foo

pbar = tqdm(total=len(df))

  0%|          | 0/56712 [00:00<?, ?it/s]

In [35]:
def expand_comments(comments):
    # Инициализируем список значений None для 5 комментариев
    expanded = [None] * 10  # 5 дат + 5 комментариев
    
    if comments is not None:
        for i, (date, comment) in enumerate(comments):
            expanded[i] = date       # Дата i-го комментария
            expanded[i + 5] = comment # Комментарий i-го комментария
    
    return expanded

# Применяем функцию и создаем новые колонки
df = df.with_columns(
    pl.col("comments").map_elements(
        w_pbar(pbar, lambda x: expand_comments(x)),
        return_dtype=pl.List(pl.Utf8)
    ).alias("expanded_comments")
)

# Разделяем expanded_comments на отдельные колонки
df = df.with_columns(
    *[
        pl.col("expanded_comments").list.get(i).alias(f"{prefix}_{j+1}")
        for i in range(10)
        for j, prefix in [(i, "date") if i < 5 else (i-5, "comment")]
    ]
).drop("expanded_comments")

 68%|██████▊   | 38797/56712 [00:03<00:00, 28705.19it/s]

In [37]:
for i in range(5):
    df = preprocess_timestamp(df, f'date_{i+1}', drop_nulls=False)

In [38]:
df

author,id,timestamp,message,tickers,prices,n_comments,comments,reactions,n_reactions,date_1,date_2,date_3,date_4,date_5,comment_1,comment_2,comment_3,comment_4,comment_5
str,str,datetime[μs],str,list[str],list[str],i64,list[list[str]],list[str],i64,datetime[μs],datetime[μs],datetime[μs],datetime[μs],datetime[μs],str,str,str,str,str
"""T-Journal""","""5908908c-0c37-3026-96f2-e926a6…",2018-02-13 11:48:00,"""Годовой отчет Nvidia: рост выр…",,,1,"[[""10 августа 2023 в 13:44"", ""10""]]",,,2023-08-10 13:44:00,,,,,"""10""",,,,
"""vc.ru""","""db161da4-646c-3be8-a0a7-509483…",2018-06-26 17:18:00,"""Калифорнийский стартап Ice Ene…","[""LNT"", ""CNP""]","[""42,28$+45,74%"", ""27,33$+39,96%""]",,,,,,,,,,,,,,
"""Interfax""","""5db638fc-3ac7-33bb-8431-6c7f15…",2018-06-28 08:08:00,"""Apple и Samsung урегулировали …","[""AAPL""]","[""46,38$+354,53%""]",,,,,,,,,,,,,,
"""vc.ru""","""a71d23b9-5c90-384e-9762-5509ac…",2018-06-30 08:39:00,"""Мессенджер WhatsApp представил…",,,,,,,,,,,,,,,,
"""vc.ru""","""5bb400ee-2e25-3443-9683-a3c0e7…",2018-07-26 11:36:00,"""Редакция российского Forbes об…",,,,,,,,,,,,,,,,
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""Dmitry1261988""","""ebeae89c-855a-48a7-bac4-aedf8d…",2025-04-13 14:45:00,"""$FLOT Ну что где ваши анализы …","[""FLOT""]","[""85,4₽+6,9%""]",5,"[[""13 апреля 2025 в 15:12"", ""Что случилось 😳, где торги, или это стакан на меня смотрит""], [""13 апреля 2025 в 15:13"", ""@lexx400 хотят всех на куканить""], … [""13 апреля 2025 в 15:19"", ""@Dmitry1261988 ну опять 25, переговоры и ещё одни переговоры. По черноморской сделки может уже решат. Буду ждать. Спасибо. Не является иир""]]",,,2025-04-13 15:12:00,2025-04-13 15:13:00,2025-04-13 15:14:00,2025-04-13 15:17:00,2025-04-13 15:19:00,"""Что случилось 😳, где торги, ил…","""@lexx400 хотят всех на куканит…","""Ну с ВТБ это уже терпимо, а по…","""@lexx400 только если из наших …","""@Dmitry1261988 ну опять 25, пе…"
"""Torreodor8888""","""cc461b86-4663-473c-9b6e-10181c…",2025-04-13 15:00:00,"""$GAZP как думаете, планку раск…","[""GAZP""]","[""135,95₽+13,95%""]",5,"[[""13 апреля 2025 в 15:04"", ""Можно неспеша шорт набирать пока ракетчикам заявки на покупку исполняют""], [""13 апреля 2025 в 15:05"", ""@Adition риск большой""], … [""13 апреля 2025 в 15:09"", ""@PlankTowa шо даже тут на бирже ?🤭🙄""]]","[""083b50db-7d1b-43a0-aae3-88a73e8c3c39?size=small"", ""083b50db-7d1b-43a0-aae3-88a73e8c3c39?size=small"", ""083b50db-7d1b-43a0-aae3-88a73e8c3c39?size=small""]",1,2025-04-13 15:04:00,2025-04-13 15:05:00,2025-04-13 15:05:00,2025-04-13 15:07:00,2025-04-13 15:09:00,"""Можно неспеша шорт набирать по…","""@Adition риск большой""","""@Torreodor8888 он всегда и вез…","""@Torreodor8888 я думаю кто сей…","""@PlankTowa шо даже тут на бир…"
"""Analis_Rosta""","""11e8e6a2-ae51-4f84-9d49-c6c084…",2025-04-13 15:20:00,"""📊 Анализ графика акции ММК $MA…","[""MAGN""]","[""33,5₽+3,63%""]",1,"[[""14 апреля 2025 в 8:08"", ""Поддержка оказалась неуверенной, не расстраивайтесь, вы типичный аналитик""]]","[""083b50db-7d1b-43a0-aae3-88a73e8c3c39?size=small"", ""c53ba585-196d-48eb-a463-8d7fd3d6d141?size=small""]",3,2025-04-14 08:08:00,,,,,"""Поддержка оказалась неуверенно…",,,,
"""andreses85""","""ab2e616b-1535-4e9d-bf5c-a826cb…",2025-04-13 16:05:00,"""На побережье Анапы найдены нов…",,,20,"[[""13 апреля 2025 в 16:08"", ""Это будет всегда... Пока есть порт...""], [""13 апреля 2025 в 16:08"", ""И до аварии это было... И будет...""], … [""13 апреля 2025 в 16:13"", ""Добро пожаловать на отдых!""]]","[""083b50db-7d1b-43a0-aae3-88a73e8c3c39?size=small"", ""083b50db-7d1b-43a0-aae3-88a73e8c3c39?size=small"", … ""083b50db-7d1b-43a0-aae3-88a73e8c3c39?size=small""]",10,2025-04-13 16:08:00,2025-04-13 16:08:00,2025-04-13 16:09:00,2025-04-13 16:09:00,2025-04-13 16:13:00,"""Это будет всегда... Пока есть …","""И до аварии это было... И буде…","""В космосе сколько...""","""На помойках сколько мусора...""","""Добро пожаловать на отдых!"""


In [None]:
# df.write_parquet('Article 3 Data/posts/preprocessed.parquet')

## Профили

In [5]:
df = pl.read_parquet('Article 3 Data/Profiles/gathered.parquet')

In [175]:
df = df.rename({'column_0': 'staus', 'column_1': 'size', 'column_2': 'n_subscribtions',
                 'column_3': 'n_subscribers', 'column_4': 'portfolio', 'column_5': 'trades', 'column_6': 'yields'})

In [176]:
df = df.with_columns(pl.when(pl.col('portfolio') != [[]]).then(pl.col('portfolio')),
                     pl.when(pl.col('trades').list.len() > 0).then(pl.col('trades'))
                     )

In [177]:
# Убираем людей, у которых нет счета
df = df.filter(~pl.col('yields').str.contains('No acc') | pl.col('yields').is_null())

In [178]:
# Убираем аккаунты компаний
df = df.filter(~pl.col('yields').str.contains('Not invest') | pl.col('yields').is_null())

In [179]:
# Убираем скрытые профили
df = df.filter(~pl.col('n_subscribtions').is_null() & ~pl.col('n_subscribers').is_null())

In [180]:
# Убираем забагованные профили с активами до 1 рубля
df = df.filter(~pl.col('size').str.contains('до1₽'))

In [4]:
df = pl.read_parquet('Article 3 Data/Profiles/preprocessed.parquet')

In [197]:
df = df.filter(~pl.col('yields').str.contains('bugged'))

In [201]:
company_profiles = df.filter(pl.col('portfolio').is_null() & pl.col('trades').is_null() & pl.col('yields').is_null())['name'].to_list()

In [203]:
df.write_parquet('Article 3 Data/Profiles/preprocessed.parquet')

#### Преобразование доходностей к удобному виду

In [34]:
df = pl.read_parquet('Article 3 Data/Profiles/preprocessed.parquet')

In [35]:
df = df.with_columns(
    type = pl.col('portfolio').list.get(0),
    name = pl.col('portfolio').list.get(1),
    sector = pl.col('portfolio').list.get(2),
    currency = pl.col('portfolio').list.get(3),
).drop('portfolio')

In [36]:
def parse_data(row):
    try:
        data = eval(row)
        flat = {}
        for year, months in data.items():
            for month_dict in months:
                for month, value in month_dict.items():
                    col_name = f"{year}_{month}"
                    flat[col_name] = value
        return flat
    except:
        return {}
    
df = df.with_columns(
    pl.col('yields').map_elements(parse_data, return_dtype=pl.Struct)
).unnest("yields")

In [37]:
df.write_parquet('Article 3 Data/Profiles/preprocessed.parquet')

## Сбор всех комментов в один файл

In [1]:
import os
import gzip
import polars as pl
from tqdm import tqdm

In [2]:
def prep_df(df: pl.DataFrame):
    return df.with_columns(pl.col('tickers').cast(pl.List(str)), pl.col('prices').cast(pl.List(str)), pl.col('comments').cast(pl.List(pl.List(str))), pl.col('reactions').cast(pl.List(str)))

In [3]:
files = ['28apr', '28apr_error'] #os.listdir('Article 3 Data/posts')
for folder in [f for f in files if '.parquet' not in f]:
    print(folder)
    dfs = [prep_df(pl.read_parquet(f'Article 3 Data/posts/{folder}/{file}')) for file in tqdm(os.listdir(f'Article 3 Data/posts/{folder}'))]
    pl.concat(dfs, how='diagonal_relaxed').write_parquet(f'Article 3 Data/posts/{folder}.parquet')  

28apr


100%|██████████| 63903/63903 [07:24<00:00, 143.84it/s]


28apr_error


100%|██████████| 53/53 [00:00<00:00, 137.53it/s]


In [4]:
concat_files = [el + '.parquet' for el in files] # os.listdir("Article 3 Data/posts")
df = pl.concat([pl.read_parquet(f"Article 3 Data/posts/{el}") for el in concat_files if 'parquet' in el])
df.write_parquet('Article 3 Data/posts/gathered_28apr.parquet')

In [None]:
# df.write_parquet('Article 3 Data/posts/gathered.parquet')

### Работа с профилями

In [None]:
with gzip.open('all_profiles.gz', 'rb') as f:
    all_profiles = f.readlines()
    all_profiles = [p.decode().replace('\n', '') for p in all_profiles]

In [173]:
root = 'bugged_raw'
error_files = []
def error_handle(file):
    try:
        if file == 'muhametislamov.parquet':
            return pl.DataFrame([[None], [None]], strict=False)
            
        return pl.read_parquet(f'Article 3 Data/Profiles/{root}/{file}')
    except Exception:
        error_files.append(file)
        return pl.DataFrame([[None], [None]], strict=False)

# def prep_df(df: pl.DataFrame):
#     if df.dtypes[0] == pl.List(pl.List(pl.Null)):
#         df = df.with_columns(column_0 = [['']])
#     if df['column_0'].dtype == pl.List(pl.String):
#         df = df.with_columns(column_0 = [['Company']])
#     if df.dtypes[1] == pl.Null:
#         df = df.with_columns(column_1 = [''])
#     return df

dfs = [error_handle(file) for file in tqdm(os.listdir(f'Article 3 Data/Profiles/{root}'))]
df = pl.concat(dfs, how="diagonal_relaxed")
df = df.with_columns(pl.Series('name', [el.replace('.parquet', '') for el in os.listdir(f'Article 3 Data/Profiles/{root}')]))
# df.write_parquet('Article 3 Data/Profiles/gathered.parquet')

100%|██████████| 2360/2360 [00:01<00:00, 2256.20it/s]


In [191]:
drop_names = [el.replace('.parquet', '') for el in os.listdir(f'Article 3 Data/Profiles/error_small')]

## Получаем все ссылки на посты

In [1]:
import os
import gzip
import shutil
import requests
import xmltodict
import time

from tqdm import tqdm

In [2]:
root = 'all xmls/xmls_9may/'
# Преобразуем файл со всеми ссылками на сайтмапы
def prep_xml(gz_file_root: str):
    
    with gzip.open(gz_file_root, 'rb') as f_in:
        f_in.readline()
        with open(gz_file_root.replace('.gz', ''), 'wb') as f_out:
            f_out.write(b'<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n')
            shutil.copyfileobj(f_in, f_out)
    os.remove(gz_file_root)

In [3]:
prep_xml(f'{root}pulse.xml.gz')

In [4]:
# Вытаскиваем все ссылки на сайтмапы
def extract_xmlgz_links(pulse_xml_root: str):
    with open(pulse_xml_root, 'r')as file:
        data = xmltodict.parse(file.read())
    if 'sitemap' in pulse_xml_root:
        return [el['loc'] for el in data['urlset']['url']]
    return [el['loc'] for el in data['urlset']['sitemap']]

In [5]:
gz_links = extract_xmlgz_links(f'{root}pulse.xml')

In [6]:
# Загружаем все сайтмапы
def load_gz(link: str, save_root: str=root):
    name = link.split('/')[-1]
    file = requests.get(link, stream=True).content
    with open(save_root + name, 'wb') as f:
        f.write(file)

In [7]:
for link in tqdm(gz_links):
    load_gz(link)
    time.sleep(1)

100%|██████████| 115/115 [02:15<00:00,  1.18s/it]


In [8]:
gz_files = [f for f in os.listdir(root) if '.gz' in f]

In [9]:
# Обрабатываем полученные сайтмапы
for file in tqdm(gz_files):
    prep_xml(f'{root}{file}')

100%|██████████| 115/115 [00:04<00:00, 27.03it/s]


In [10]:
xml_files = [f for f in os.listdir(root) if '.xml' in f and 'sitemap' in f]

In [11]:
# Получаем ссылки для одного файла
def get_all_links(root):
    with gzip.open(root, 'r') as f:
        list_of_links = f.readlines()
    return [el.decode().replace('\n', '') for el in list_of_links]

# Достаем все ссылки, которые уже распарсили
def get_all_parsed_links(main_root):
    list_of_links = set()
    for root in os.listdir(main_root):
        with gzip.open(f'{main_root}/{root}', 'r') as f:
            list_of_links = list_of_links.union(set(f.readlines()))
    return [el.decode().replace('\n', '') for el in list_of_links]

In [12]:
parsed_links = get_all_parsed_links('post_links')
len(parsed_links)

5939598

In [43]:
with gzip.open('post_links/bad_links.gz', 'rb') as f:
    bad_links = f.readlines()
    bad_links = [p.decode().replace('\n', '') for p in bad_links]

with gzip.open('post_links/all_gathered_links_2may.gz', 'rb') as f:
    prev_links = f.readlines()
    prev_links = [p.decode().replace('\n', '') for p in prev_links]

In [45]:
new_links =  (set(parsed_links) - set(prev_links)) - set(bad_links)

In [47]:
new_links

{'РћС€РёР±РєР° РїСЂРё СЃРѕР·РґР°РЅРёРё С‚Р°Р±Р»РёС†С‹'}

In [None]:
with gzip.open('post_links/_links.gz', 'w') as f:
    for link in new_links:
        f.write(f'{link}\n'.encode())

## Обработка лог файлов

In [19]:
import gzip

In [1]:
with open('loggs/parsing_error.log') as file:
    loggs = file.readlines()

In [25]:
def get_error_links(loggs):
    return [el.split(' - ')[2] for el in loggs if 'ERROR' in el]

In [26]:
error_list_of_links = get_error_links(loggs)

In [20]:
with gzip.open('18mar_error_links.gz', 'w') as f:
    for link in error_list_of_links:
        f.write(f'{link}\n'.encode())

In [17]:
with gzip.open('all_links_new_error_error.gz', 'r') as f:
    error_list_of_links = f.readlines()

In [18]:
error_list_of_links

[b'https://www.tbank.ru/invest/social/profile/Maksim_averin/10557633-1c11-4ee9-afa1-7e4203782b15/\n',
 b'https://www.tbank.ru/invest/social/profile/Xobot_Slona/fc7d72e1-c1db-4b72-a6ef-cfc5b9eef885/\n',
 b'https://www.tbank.ru/invest/social/profile/mixa33e/42c4ad16-ed6a-4e4f-be25-78a8c60a40c8/\n',
 b'https://www.tbank.ru/invest/social/profile/pipes/ab70edcb-dcc9-42dc-ab93-4ce63228d587/\n']

## Получаем все профили

In [3]:
files = [f for f in os.listdir() if '.gz' in f]

In [23]:
all_profiles = []
for f in tqdm(files):
    with gzip.open(f, 'rb') as f:
        lst = f.readlines()
        lst = [l.split('/')[6] for l in list_of_links]
        all_profiles.extend(lst)

all_profiles = list(set(all_profiles))

100%|██████████| 5/5 [00:11<00:00,  2.34s/it]


In [25]:
len(all_profiles)

360918

In [29]:
with gzip.open('all_profiles.gz', 'w') as f:
    for link in all_profiles:
        f.write(f'{link}\n'.encode())

In [32]:
with gzip.open('all_profiles.gz', 'rb') as f:
    all_profiles = f.readlines()
    all_profiles = [p.decode().replace('\n', '') for p in all_profiles]

# Обработка логов для выделения комментариев закрытых профилей

In [15]:
with open('loggs/parsing_error.log', 'r') as f:
    loggs = f.readlines()

with open('loggs/parsing_error_error.log', 'r') as f:
    loggs1 = f.readlines()

In [16]:
loggs = set([log for log in loggs if 'NoneType' in log] + [log for log in loggs1 if 'NoneType' in log])

In [18]:
deleted_posts = set([log.split(' - ')[2] for log in loggs])

In [19]:
closed_links = set([f'https://www.tbank.ru/invest/social/profile/{a}/{i}/' for a, i in zip(df1['author'], df1['id'])])

In [21]:
bad_links = deleted_posts.union(closed_links)

In [25]:
import gzip
with gzip.open('bad_links.gz', 'w') as f:
    for link in bad_links:
        f.write(f'{link}\n'.encode())