In [1]:
import requests
import time
import json
import re
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter
from urllib.parse import unquote, quote #для извлечения slug'ов из URL
from tqdm import tqdm #Для прогресс-бара


Для разбиения на чанки была использована предложенная бибилиотека: https://python.langchain.com/v0.1/docs/modules/data_connection/document_transformers/ 
Для разделения основного текста с максимальным сохранением смысла из каждого абзаца использовали RecursiveCharacterTextSplitter: https://python.langchain.com/v0.1/docs/modules/data_connection/document_transformers/recursive_text_splitter/


In [19]:

PAGE_LIMIT = 50 # Количество статей для обработки (мы парсили 68000)
OUTPUT_FILE = "starwars_dataset.json"  # Имя выходного файла
BASE_URL = "https://starwars.fandom.com/api.php"
HEADERS = {'User-Agent': 'FandomDataCollector/1.0'}

# Разбиение на чанки
CHUNK_SIZE = 1850 # Размер чанка в символах, в одном английском слове в среднем 5 символов
CHUNK_OVERLAP = 320  # Перекрытие чанков, примерно 10-15 процентов от общего объема чанка
TEXT_SPLITTER = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    separators=["\n\n", "\n", ". ", " ", ""]
)

In [7]:
def normalize_slug(slug):
    #Нормализация названий для использования в качестве ID
    slug = unquote(slug)  # Декодируем URL-спецсимволы
    slug = re.sub(r"#.*$", "", slug)  # Удаляем якоря
    slug = re.sub(r"[^\w\-]", "", slug)  # Удаляем спецсимволы
    return slug.replace(" ", "_").lower()  # Единый формат


In [9]:
def get_all_pages(limit=PAGE_LIMIT):
    #Получение списка страниц с возможностью ограничения количества
    all_pages = []
    apcontinue = None
    progress = tqdm(desc="Сбор списка статей", unit="page")
    
    while True:
        params = {
            "action": "query",
            "format": "json",
            "list": "allpages",
            "apnamespace": 0,
            "aplimit": "max",
            "apfilterredir": "nonredirects"  # Только не-перенаправления
        }
        if apcontinue:
            params["apcontinue"] = apcontinue

        response = requests.get(BASE_URL, headers=HEADERS, params=params)
        data = response.json()
        batch = data['query']['allpages']
        
        # Фильтрация странных статей
        valid_batch = [
            p for p in batch 
            if not p['title'].startswith(('"', "'", ":", "!")) 
            and len(p['title']) > 2
        ]
        
        # Применяем лимит
        if limit and len(all_pages) + len(valid_batch) > limit:
            all_pages.extend(valid_batch[:limit - len(all_pages)])
            progress.update(len(valid_batch[:limit - len(all_pages)]))
            break
        else:
            all_pages.extend(valid_batch)
            progress.update(len(valid_batch))

        if 'continue' in data and (not limit or len(all_pages) < limit):
            apcontinue = data['continue']['apcontinue']
            time.sleep(0.5)
        else:
            break
            
    progress.close()
    return all_pages[:limit] if limit else all_pages


In [11]:
def parse_page(title):
    #Полный парсинг страницы с несколькими уровнями отказоустойчивости
    params = {
        "action": "parse",
        "page": title,
        "format": "json"
    }
    
    try:
        # Запрос к API
        response = requests.get(BASE_URL, headers=HEADERS, params=params, timeout=15)
        
        # Проверка статуса ответа
        if response.status_code != 200:
            print(f"API вернул статус {response.status_code} для '{title}'")
            # Попробуем прямой запрос к HTML-странице (иной способ парсинга)
            return parse_page_direct(title)
        
        data = response.json()
        
        # Извлечение канонического URL
        canonical_url = data.get("parse", {}).get("canonicalurl", "")
        if not canonical_url:
            canonical_url = f"https://starwars.fandom.com/wiki/{quote(title.replace(' ', '_'), safe='')}"
        
        # Попытка получить HTML через API
        html = data.get("parse", {}).get("text", {}).get("*", None)
        
        # Если HTML не получен через API - переходим к прямому парсингу
        if not html:
            print(f" Не удалось получить HTML через API для '{title}'")
            return parse_page_direct(title, canonical_url)
        
        # Первичный парсинг BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")
        content = soup.find("div", class_="mw-parser-output")
        
        # Если основной контент не найден - попробуем альтернативные методы
        if not content:
            print(f" Основной контент не найден для '{title}' через API")
            return parse_page_direct(title, canonical_url)
        
        # Очистка контента с улучшенной обработкой ошибок
        elements_to_remove = []
        for element in content.find_all(["table", "div", "span"]):
            # Улучшенная проверка элементов
            if not element or not hasattr(element, 'get'):
                continue
                
            classes = element.get("class", [])
            if not classes: 
                continue
                
            # Проверяем классы через множества для безопасности
            class_set = set(classes)
            if class_set & {"infobox", "navbox", "metadata", "toc", "portable-infobox"}:
                elements_to_remove.append(element)
        
        # Удаляем после цикла, чтобы не нарушать итерацию
        for element in elements_to_remove:
            element.decompose()
        
        # Извлечение текста и ссылок с резервными методами
        full_text = ""
        link_map = {}
        
        # Сначала попробуем стандартные элементы
        elements = content.find_all(["p", "h2", "h3", "h4", "ul", "ol"])
        
        # Если не найдено - попробуем более агрессивный подход
        if not elements:
            print(f"Стандартные элементы не найдены для '{title}', пробуем все содержимое")
            elements = [content]
        
        for element in elements:
            try:
                elem_text = ""
                # Используем get_text() как резервный метод
                elem_text = element.get_text(separator=" ", strip=True)
                
                # Дополнительно извлекаем ссылки
                for link in element.find_all("a", href=True):
                    href = link.get("href", "")
                    if "/wiki/" in href:
                        slug = href.split("/wiki/")[-1].split("#")[0]
                        normalized_slug = normalize_slug(slug)
                        anchor_text = link.get_text(strip=True)
                        if anchor_text:
                            link_map[anchor_text] = normalized_slug
                
                full_text += elem_text + "\n\n"
            except Exception as e:
                print(f" Ошибка при обработке элемента: {str(e)}")
                continue
        
        # Проверка минимального контента
        if len(full_text.strip()) < 50:
            print(f"Мало контента ({len(full_text)} симв.) для '{title}', пробуем прямой метод")
            return parse_page_direct(title, canonical_url)
        
        return full_text.strip(), link_map, canonical_url
        
    except Exception as e:
        print(f"\n Критическая ошибка при парсинге {title}: {str(e)}")
        # Последняя попытка - прямой парсинг
        return parse_page_direct(title)

def parse_page_direct(title, canonical_url=None):
    #Альтернативный метод парсинга через прямой запрос к странице
    try:
        if not canonical_url:
            canonical_url = f"https://starwars.fandom.com/wiki/{quote(title.replace(' ', '_'), safe='')}"
        
        print(f"Пробуем прямой парсинг для '{title}'")
        response = requests.get(canonical_url, headers=HEADERS, timeout=15)
        soup = BeautifulSoup(response.text, "html.parser")
        
        # Основной контент - попробуем разные варианты
        content = soup.find("div", class_="mw-parser-output")
        if not content:
            # Резервный поиск
            content = soup.find("div", {"id": "content"})
        
        if not content:
            print(f"Прямой парсинг не удался для '{title}'")
            return None, None, canonical_url
        
        # Упрощенная очистка
        for element in content.find_all(class_=["portable-infobox", "infobox", "navbox", "toc"]):
            element.decompose()
        
        # Извлечение текста
        full_text = content.get_text(separator="\n", strip=True)
        
        # Извлечение ссылок
        link_map = {}
        for link in content.find_all("a", href=True):
            href = link.get("href", "")
            if "/wiki/" in href:
                slug = href.split("/wiki/")[-1].split("#")[0]
                normalized_slug = normalize_slug(slug)
                anchor_text = link.get_text(strip=True)
                if anchor_text:
                    link_map[anchor_text] = normalized_slug
        
        return full_text.strip(), link_map, canonical_url
        
    except Exception as e:
        print(f"Ошибка при прямом парсинге '{title}': {str(e)}")
        return None, None, canonical_url

In [13]:
def extract_chunk_links(chunk_text, link_map):
    #Извлечение ссылок, релевантных конкретному чанку
    found_links = set()
    for anchor_text, slug in link_map.items():
        # Поиск с учетом границ слов для точного соответствия
        if re.search(rf"\b{re.escape(anchor_text)}\b", chunk_text, re.IGNORECASE):
            found_links.add(slug)
    return list(found_links)

In [15]:
def main():
    # Сбор и обработка данных
    print(f"Конфигурация: Лимит статей = {PAGE_LIMIT or 'Без ограничений'}")
    print(f"             Выходной файл = {OUTPUT_FILE}")
    print(f"             Размер чанка = {CHUNK_SIZE} симв.")
    
    pages = get_all_pages()
    dataset = []
    
    print(f"\nНачата обработка {len(pages)} статей...")
    progress_bar = tqdm(total=len(pages), desc="Обработка статей", unit="статья")
    
    for page in pages:
        title = page['title']
        doc_slug = normalize_slug(title)
        
        try:
            full_text, link_map, canonical_url = parse_page(title)
            if not full_text or not link_map or not canonical_url:
                progress_bar.update(1)
                continue
                
            # Разбиение на чанки
            chunks = TEXT_SPLITTER.split_text(full_text)
            
            # Формирование записей для каждого чанка
            for chunk_id, chunk_text in enumerate(chunks, start=1):
                chunk_links = extract_chunk_links(chunk_text, link_map)
                
                dataset.append({
                    "document_id": doc_slug,  # Одинаковый для всех чанков статьи
                    "chunk_id": chunk_id,
                    "title": title,
                    "chunk_text": chunk_text,
                    "outgoing_links": chunk_links,
                    "source": canonical_url,  # Одинаковый для всех чанков статьи
                    "comment": f"Всего чанков: {len(chunks)}"
                })
                
        except Exception as e:
            print(f"\nКритическая ошибка при обработке {title}: {str(e)}")
        
        progress_bar.update(1)
        time.sleep(0.2)  # Защита от блокировки
    
    progress_bar.close()
    
    # Сохранение в JSON
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(dataset, f, indent=2, ensure_ascii=False)
    
    # Отчет о выполнении
    total_articles = len(set(d['document_id'] for d in dataset))
    print(f"\n Готово! Обработано статей: {total_articles}")
    print(f" Создано чанков: {len(dataset)}")
    print(f" Средняя длина чанка: {sum(len(d['chunk_text']) for d in dataset)//max(1, len(dataset))} символ.")
    print(f" Данные сохранены в: {OUTPUT_FILE}")



In [17]:
main()

Конфигурация: Лимит статей = 50
             Выходной файл = starwars_dataset_checking.json
             Размер чанка = 1850 симв.


Сбор списка статей: 0page [00:00, ?page/s]



Начата обработка 50 статей...


Обработка статей: 100%|██████████| 50/50 [00:37<00:00,  1.33статья/s]


 Готово! Обработано статей: 50
 Создано чанков: 255
 Средняя длина чанка: 1319 символ.
 Данные сохранены в: starwars_dataset_checking.json



