In [None]:
import pandas as pd
import httpx
import asyncio
import time
from pathlib import Path
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from tqdm.notebook import tqdm

import pyarrow as pa
import pyarrow.parquet as pq

In [None]:
# Settings
# Directory that receives one `<arxiv_id>.html` file per downloaded article; created on first run.
DATA_PATH = Path('../data/raw/html')
DATA_PATH.mkdir(parents=True, exist_ok=True)
# CSV with the article metadata; the notebook reads its `arxiv_id`, `title` and `html_url` columns.
METADATA_PATH = '../data/metadata/arxiv_NLP_2025_metadata.csv'

In [None]:
# Load the article metadata.
# arXiv identifiers look like decimal numbers (e.g. 2501.10000). Without an explicit dtype
# pandas may parse them as floats, which drops trailing zeros and makes them impossible to
# match against the string stems of the files on disk, so they are forced to strings here.
df = pd.read_csv(
    METADATA_PATH,
    usecols=['arxiv_id', 'title', 'html_url'],
    dtype={'arxiv_id': str},
)
print(f'Данные загружены. Всего {df.shape[0]} статей.')

# Unique identifiers present in the metadata.
ids = set(df['arxiv_id'].to_list())

# Identifiers that already have an HTML file in the download directory.
downloaded_ids = {f.stem for f in DATA_PATH.glob('*.html')}

# Rows whose article has not been downloaded yet.
ids_to_download = df[~df['arxiv_id'].isin(downloaded_ids)]
print(f'Скачанных статей: {len(downloaded_ids)}. Осталось статей: {ids_to_download.shape[0]}')

In [None]:
# Upper bound on the number of downloads that may run at the same time.
SEMAPHORE = asyncio.Semaphore(10)

async def download_article(client, row, pbar, out_dir=None):
    """Download one article's HTML page and store it as ``<arxiv_id>.html``.

    Args:
        client: object with an awaitable ``get(url)`` whose result exposes
            ``status_code`` and ``text`` (an ``httpx.AsyncClient`` in this notebook).
        row: record exposing ``arxiv_id`` and ``html_url`` attributes.
        pbar: progress bar; ``update(1)`` is called exactly once per article,
            whether the download succeeded or not.
        out_dir: directory to write into; defaults to the notebook-level ``DATA_PATH``.

    Returns:
        True if the page was saved, False otherwise. Failures are printed together
        with the article id and are never raised to the caller.
    """
    async with SEMAPHORE:
        try:
            paper_id = row.arxiv_id
            url = row.html_url

            # A missing URL comes out of pandas as NaN (a float), not as a string.
            if not isinstance(url, str) or not url:
                print(f'{paper_id}: пропущено, нет html_url ({url!r})')
                return False

            response = await client.get(url)
            if response.status_code != 200:
                # Report the status instead of dropping the article silently.
                print(f'{paper_id}: HTTP {response.status_code} ({url})')
                return False

            target_dir = DATA_PATH if out_dir is None else Path(out_dir)
            filename = target_dir / f'{paper_id}.html'
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(response.text)
            return True

        except Exception as e:
            # repr() keeps the exception type visible even when its message is empty.
            print(f'{getattr(row, "arxiv_id", "?")}: ошибка загрузки: {e!r}')
            return False
        finally:
            pbar.update(1)

In [None]:
async def main():
    limits = httpx.Limits(max_keepalive_connections=10, max_connections=20)
    
    async with httpx.AsyncClient(limits=limits, timeout=20.0, follow_redirects=True) as client:
        tasks = []
        
        pbar = tqdm(total=len(ids_to_download))
        
        for row in ids_to_download.itertuples():
            task = download_article(client, row, pbar)
            tasks.append(task)
        
        await asyncio.gather(*tasks)
        pbar.close()

await main()

# Re-scan the download directory after the run.
downloaded_ids = {f.stem for f in DATA_PATH.glob('*.html')}

# File stems are always strings, so compare the metadata identifiers as strings too.
ids_to_download = df[~df['arxiv_id'].astype(str).isin(downloaded_ids)]

# Count only files that belong to the metadata: stray files in the directory must not
# inflate the "downloaded" figure or push the percentage below zero.
known_ids = {str(article_id) for article_id in ids}
len_all = len(known_ids)
len_dwd = len(known_ids & downloaded_ids)
# Guard against an empty metadata table (division by zero).
not_dwd_pct = ((len_all - len_dwd) / len_all) * 100 if len_all else 0.0
print(f'Скачано {len_dwd} из {len_all} статей. Процент нескачанных статей: {not_dwd_pct:.2f}%')

In [None]:
# Size limit in bytes (20 KiB): downloaded files below it are deleted.
# NOTE(review): presumably such small files are stubs or error pages — confirm.
SIZE_THRESHOLD = 20 * 1024

for file in DATA_PATH.glob('*.html'):
    # Anything that is not a regular file is left alone.
    if not file.is_file():
        continue

    # Read the size before deleting so it can still be reported afterwards.
    file_size = file.stat().st_size
    if file_size >= SIZE_THRESHOLD:
        continue

    try:
        file.unlink()
        print(f"Удален: {file.name} ({file_size} bytes)")
    except Exception as e:
        # A failed deletion is reported and the loop moves on to the next file.
        print(f"Ошибка при удалении {file.name}: {e}")

# Собираем Parquet для хранения в S3

In [1]:
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from tqdm.notebook import tqdm
import math

In [2]:
# Input directory with the downloaded HTML files and output directory for the Parquet shards.
SOURCE_DIR = Path('../data/raw/html').resolve()
DEST_DIR = Path("../data/raw/parquet").resolve()
DEST_DIR.mkdir(parents=True, exist_ok=True)
# Maximum number of documents stored in one shard.
SHARD_SIZE = 1000

# glob() does not promise any particular order, so the list is sorted to make
# the assignment of documents to shards reproducible between runs.
html_files = sorted(SOURCE_DIR.glob("*.html"))
# Number of shards needed; the last one may hold fewer than SHARD_SIZE documents.
total_shards = math.ceil(len(html_files) / SHARD_SIZE)

In [3]:
def get_batches(files, size):
    """Yield consecutive slices of ``files``, each holding at most ``size`` items.

    ``files`` must support ``len()`` and slicing; the final slice may be shorter.
    """
    offsets = range(0, len(files), size)
    yield from (files[start:start + size] for start in offsets)

In [4]:
# Pack the HTML files into numbered Parquet shards, one shard per batch.
packing_progress = tqdm(get_batches(html_files, SHARD_SIZE), desc="Packing", total=total_shards)

for shard_id, batch in enumerate(packing_progress):
    # Column-oriented payload: document id, page contents, and the path it was read from.
    data = {"doc_id": [], "html": [], "source_path": []}
    for html_file in batch:
        data["doc_id"].append(html_file.stem)
        # Bytes that are not valid UTF-8 are replaced rather than raising.
        data["html"].append(html_file.read_text(encoding="utf-8", errors="replace"))
        data["source_path"].append(str(html_file))

    table = pa.Table.from_pydict(data)
    shard_file = DEST_DIR / f"shard_{shard_id:04d}.parquet"
    pq.write_table(table, shard_file, compression="zstd")

Packing:   0%|          | 0/15 [00:00<?, ?it/s]