<a href="https://colab.research.google.com/github/CodeHunterOfficial/ABC_DataMining/blob/main/NM/Creating_a_new_norpus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install python-docx langdetect PyPDF2 bs4 langdetect

Collecting python-docx
  Downloading python_docx-1.1.2-py3-none-any.whl.metadata (2.0 kB)
Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/981.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/981.5 kB[0m [31m6.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m972.8/981.5 kB[0m [31m17.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Collecting bs4
  Downloading bs4-0.0.2-py2.py3-none-any.whl.metadata (411 bytes)
Downloading python_docx-1.1.2-py3-none-any.whl (244 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0

In [3]:
import os
import re
from docx import Document
import logging
from typing import List, Dict, Optional
import nltk  # Для разделения на предложения
from langdetect import detect  # Для определения языка
import json  # Для сохранения в JSON
import xml.etree.ElementTree as ET  # Для сохранения в XML
from PyPDF2 import PdfReader  # Для чтения PDF
from bs4 import BeautifulSoup  # Для парсинга HTML

# Настройка логирования
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


class BookCorpusProcessor:
    def __init__(self, books_folder: str, output_base: str = "tajik_books"):
        """
        Инициализация класса.

        :param books_folder: Путь к папке с книгами.
        :param output_base: Базовое имя выходных файлов (без расширений).
        """
        self.books_folder = books_folder
        self.output_base = output_base
        self.processed_books: List[str] = []

    def clean_text(self, text: str, custom_patterns: Optional[List[str]] = None) -> str:
        """
        Очищает текст от лишних символов и номеров страниц.

        :param text: Исходный текст.
        :param custom_patterns: Список пользовательских регулярных выражений для очистки текста.
        :return: Очищенный текст.
        """
        patterns = [
            r"^\s*\d+\s*$",  # Удалить номера страниц
            r"[^\w\s\.,!?;:()«»“”'\"\\/-]",  # Удалить специальные символы
            r"\s+",  # Заменить множественные пробелы на один пробел
            r"^\s*$",  # Удалить пустые строки
            r"\t+",  # Удалить табуляции
            r"\.{2,}",  # Заменить многоточия на точку
            r"<[^>]*>",  # Удалить HTML-теги
        ]

        if custom_patterns:
            patterns.extend(custom_patterns)

        for pattern in patterns:
            text = re.sub(pattern, " ", text, flags=re.MULTILINE)

        return text.strip()

    def extract_metadata(self, filename: str) -> Dict[str, str]:
        """
        Извлекает метаданные (название и автора) из имени файла.

        :param filename: Имя файла.
        :return: Словарь с метаданными.
        """
        base_name = os.path.splitext(filename)[0]
        parts = base_name.split("_", 1)

        if len(parts) == 2:
            title, author = parts
            return {"title": title.strip(), "author": author.strip()}
        else:
            return {"title": base_name.strip(), "author": "Неизвестный"}

    def process_docx_file(self, file_path: str) -> str:
        """
        Обрабатывает один .docx файл, извлекает текст и метаданные.

        :param file_path: Путь к файлу.
        :return: Обработанный текст с метаданными или пустая строка при ошибке.
        """
        try:
            doc = Document(file_path)
            paragraphs = [paragraph.text for paragraph in doc.paragraphs]
            raw_text = "\n".join(paragraphs)
            cleaned_text = self.clean_text(raw_text)
            metadata = self.extract_metadata(os.path.basename(file_path))
            metadata_str = f"# Название: {metadata['title']}\n# Автор: {metadata['author']}\n# -----\n"
            return metadata_str + cleaned_text + "\n\n"
        except Exception as e:
            logging.error(f"Ошибка при обработке файла {file_path}: {e}")
            return ""

    def process_txt_file(self, file_path: str) -> str:
        """
        Обрабатывает один .txt файл, извлекает текст и метаданные.

        :param file_path: Путь к файлу.
        :return: Обработанный текст с метаданными или пустая строка при ошибке.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                raw_text = file.read()
            cleaned_text = self.clean_text(raw_text)
            metadata = self.extract_metadata(os.path.basename(file_path))
            metadata_str = f"# Название: {metadata['title']}\n# Автор: {metadata['author']}\n# -----\n"
            return metadata_str + cleaned_text + "\n\n"
        except Exception as e:
            logging.error(f"Ошибка при обработке файла {file_path}: {e}")
            return ""

    def process_pdf_file(self, file_path: str) -> str:
        """
        Обрабатывает один .pdf файл, извлекает текст и метаданные.

        :param file_path: Путь к файлу.
        :return: Обработанный текст с метаданными или пустая строка при ошибке.
        """
        try:
            reader = PdfReader(file_path)
            raw_text = "\n".join([page.extract_text() for page in reader.pages])
            cleaned_text = self.clean_text(raw_text)
            metadata = self.extract_metadata(os.path.basename(file_path))
            metadata_str = f"# Название: {metadata['title']}\n# Автор: {metadata['author']}\n# -----\n"
            return metadata_str + cleaned_text + "\n\n"
        except Exception as e:
            logging.error(f"Ошибка при обработке файла {file_path}: {e}")
            return ""

    def process_html_file(self, file_path: str) -> str:
        """
        Обрабатывает один .html файл, извлекает текст и метаданные.

        :param file_path: Путь к файлу.
        :return: Обработанный текст с метаданными или пустая строка при ошибке.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                soup = BeautifulSoup(file.read(), 'html.parser')

            # Удаляем скрипты и стили
            for script_or_style in soup(["script", "style"]):
                script_or_style.decompose()

            raw_text = soup.get_text(separator="\n")
            cleaned_text = self.clean_text(raw_text)
            metadata = self.extract_metadata(os.path.basename(file_path))
            metadata_str = f"# Название: {metadata['title']}\n# Автор: {metadata['author']}\n# -----\n"
            return metadata_str + cleaned_text + "\n\n"
        except Exception as e:
            logging.error(f"Ошибка при обработке файла {file_path}: {e}")
            return ""

    def process_all_books(self):
        """
        Обрабатывает все файлы в указанной папке и сохраняет результат.
        """
        if not os.path.exists(self.books_folder):
            logging.error("Указанный путь не существует.")
            return

        all_books = []
        for filename in os.listdir(self.books_folder):
            file_path = os.path.join(self.books_folder, filename)

            if filename.endswith(".docx"):
                processed_text = self.process_docx_file(file_path)
            elif filename.endswith(".txt"):
                processed_text = self.process_txt_file(file_path)
            elif filename.endswith(".pdf"):
                processed_text = self.process_pdf_file(file_path)
            elif filename.endswith(".html"):
                processed_text = self.process_html_file(file_path)
            else:
                logging.warning(f"Файл {filename} пропущен из-за неподдерживаемого формата.")
                continue

            if processed_text:
                all_books.append(processed_text)
                self.processed_books.append(filename)
                logging.info(f"Обработан файл: {filename}")
            else:
                logging.warning(f"Файл {filename} пропущен из-за ошибки.")

        # Сохранение в TXT
        with open(f"{self.output_base}.txt", "w", encoding="utf-8") as txt_file:
            txt_file.write("\n".join(all_books))

        # Сохранение в JSON
        json_data = []
        for book in all_books:
            title, rest = book.split("\n", 1)
            author, text = rest.split("# -----\n", 1)
            json_data.append({
                "title": title.split(":")[1].strip(),
                "author": author.split(":")[1].strip(),
                "text": text.strip()
            })
        with open(f"{self.output_base}.json", "w", encoding="utf-8") as json_file:
            json.dump(json_data, json_file, ensure_ascii=False, indent=4)

        # Сохранение в XML
        root = ET.Element("books")
        for book in all_books:
            title, rest = book.split("\n", 1)
            author, text = rest.split("# -----\n", 1)
            book_elem = ET.SubElement(root, "book")
            ET.SubElement(book_elem, "title").text = title.split(":")[1].strip()
            ET.SubElement(book_elem, "author").text = author.split(":")[1].strip()
            ET.SubElement(book_elem, "text").text = text.strip()

        tree = ET.ElementTree(root)
        tree.write(f"{self.output_base}.xml", encoding="utf-8", xml_declaration=True)

        logging.info(f"Все книги успешно объединены в {self.output_base}.txt, {self.output_base}.json и {self.output_base}.xml.")
        logging.info(f"Обработано книг: {len(self.processed_books)}.")

    def split_into_sentences(self, text: str) -> str:
        """
        Разделяет текст на предложения.

        :param text: Исходный текст.
        :return: Текст, разделенный на предложения.
        """
        try:
            nltk.download('punkt', quiet=True)
            sentences = nltk.sent_tokenize(text, language="russian")
            return "\n".join(sentences)
        except Exception as e:
            logging.warning(f"Ошибка при разделении текста на предложения: {e}")
            return text

    def detect_language(self, text: str) -> str:
        """
        Определяет язык текста.

        :param text: Исходный текст.
        :return: Язык текста (например, 'ru', 'tg', 'en').
        """
        try:
            return detect(text[:1000])  # Проверяем первые 1000 символов
        except:
            return "unknown"


# Пример использования класса
if __name__ == "__main__":
    # Путь к папке с книгами
    books_folder = input("Введите путь к папке с книгами: ").strip()

    # Создание экземпляра класса
    processor = BookCorpusProcessor(books_folder, output_base="tajik_books")

    # Обработка всех книг
    processor.process_all_books()

Введите путь к папке с книгами: /content/




In [9]:
import os
import logging
import json
from typing import List, Dict
import nltk
from langdetect import detect
from transformers import pipeline
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import xml.etree.ElementTree as ET

# Настройка логирования
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Скачиваем необходимые ресурсы NLTK
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('punkt_tab')

class CleanCorpusProcessor:
    def __init__(self, input_file: str, output_base: str = "clean_corpus"):
        """
        Инициализация класса.
        :param input_file: Путь к файлу (JSON, TXT или XML), созданному BookCorpusProcessor.
        :param output_base: Базовое имя выходных файлов.
        """
        self.input_file = input_file
        self.output_base = output_base
        self.stop_words = set(stopwords.words('russian') + stopwords.words('english'))
        self.spell_correction_model = pipeline("text2text-generation", model="google/flan-t5-large")
        self.lemmatizer = WordNetLemmatizer()
        self.processed_books: List[Dict] = []

    def load_data(self) -> List[Dict]:
        """
        Загружает данные из файла (JSON, TXT или XML).
        """
        if not os.path.exists(self.input_file):
            logging.error(f"Файл {self.input_file} не найден.")
            return []

        _, ext = os.path.splitext(self.input_file)
        if ext == ".json":
            return self.load_from_json()
        elif ext == ".txt":
            return self.load_from_txt()
        elif ext == ".xml":
            return self.load_from_xml()
        else:
            logging.error(f"Неподдерживаемый формат файла: {ext}")
            return []

    def load_from_json(self) -> List[Dict]:
        """
        Загружает данные из JSON-файла.
        """
        with open(self.input_file, "r", encoding="utf-8") as file:
            data = json.load(file)
        return data

    def load_from_txt(self) -> List[Dict]:
        """
        Загружает данные из TXT-файла.
        """
        books = []
        with open(self.input_file, "r", encoding="utf-8") as file:
            content = file.read().strip().split("\n\n")
        for book in content:
            try:
                title, rest = book.split("\n", 1)
                author, text = rest.split("# -----\n", 1)
                books.append({
                    "title": title.split(":")[1].strip(),
                    "author": author.split(":")[1].strip(),
                    "text": text.strip()
                })
            except Exception as e:
                logging.warning(f"Ошибка при чтении книги из TXT: {e}")
        return books

    def load_from_xml(self) -> List[Dict]:
        """
        Загружает данные из XML-файла.
        """
        books = []
        tree = ET.parse(self.input_file)
        root = tree.getroot()
        for book_elem in root.findall("book"):
            title = book_elem.find("title").text.strip() if book_elem.find("title") is not None else "Unknown Title"
            author = book_elem.find("author").text.strip() if book_elem.find("author") is not None else "Unknown Author"
            text = book_elem.find("text").text.strip() if book_elem.find("text") is not None else ""
            books.append({"title": title, "author": author, "text": text})
        return books

    def correct_spelling(self, text: str) -> str:
        """
        Исправляет опечатки в тексте.
        """
        try:
            sentences = nltk.sent_tokenize(text)
            corrected_sentences = [self.spell_correction_model(sentence)[0]['generated_text'] for sentence in sentences]
            return " ".join(corrected_sentences)
        except Exception as e:
            logging.warning(f"Ошибка при исправлении опечаток: {e}")
            return text

    def lemmatize_text(self, text: str) -> str:
        """
        Лемматизирует текст.
        """
        tokens = word_tokenize(text.lower())
        lemmatized_tokens = [self.lemmatizer.lemmatize(token) for token in tokens]
        return " ".join(lemmatized_tokens)

    def remove_stopwords(self, text: str) -> str:
        """
        Удаляет стоп-слова из текста.
        """
        tokens = word_tokenize(text.lower())
        filtered_tokens = [token for token in tokens if token not in self.stop_words]
        return " ".join(filtered_tokens)

    def process_book(self, book: Dict) -> Dict:
        """
        Обрабатывает одну книгу: исправляет опечатки, лемматизирует и удаляет стоп-слова.
        """
        title = book.get("title", "Unknown Title")
        author = book.get("author", "Unknown Author")
        raw_text = book.get("text", "")

        # Исправление опечаток
        corrected_text = self.correct_spelling(raw_text)

        # Лемматизация
        lemmatized_text = self.lemmatize_text(corrected_text)

        # Удаление стоп-слов
        clean_text = self.remove_stopwords(lemmatized_text)

        return {
            "title": title,
            "author": author,
            "language": detect(raw_text[:1000]) if raw_text else "unknown",
            "text": clean_text
        }

    def process_all_books(self):
        """
        Обрабатывает все книги из входного файла и сохраняет результат.
        """
        books_data = self.load_data()
        if not books_data:
            logging.error("Нет данных для обработки.")
            return

        for book in books_data:
            processed_book = self.process_book(book)
            if processed_book:
                self.processed_books.append(processed_book)
                logging.info(f"Обработана книга: {processed_book['title']}")

        # Сохранение в JSON
        with open(f"{self.output_base}.json", "w", encoding="utf-8") as json_file:
            json.dump(self.processed_books, json_file, ensure_ascii=False, indent=4)

        # Сохранение в XML
        root = ET.Element("books")
        for book in self.processed_books:
            book_elem = ET.SubElement(root, "book")
            ET.SubElement(book_elem, "title").text = book["title"]
            ET.SubElement(book_elem, "author").text = book["author"]
            ET.SubElement(book_elem, "language").text = book["language"]
            ET.SubElement(book_elem, "text").text = book["text"]
        tree = ET.ElementTree(root)
        tree.write(f"{self.output_base}.xml", encoding="utf-8", xml_declaration=True)

        logging.info(f"Все книги успешно обработаны и сохранены в {self.output_base}.json и {self.output_base}.xml.")
        logging.info(f"Обработано книг: {len(self.processed_books)}.")

# Пример использования класса
if __name__ == "__main__":
    input_file = input("Введите путь к файлу (JSON, TXT или XML): ").strip()
    processor = CleanCorpusProcessor(input_file, output_base="clean_corpus")
    processor.process_all_books()

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


Введите путь к файлу (JSON, TXT или XML): /content/tajik_books.txt


Device set to use cpu


In [None]:
!pip install pytube tweepy readability-lxml lxml_html_clean

Collecting lxml_html_clean
  Downloading lxml_html_clean-0.4.1-py3-none-any.whl.metadata (2.4 kB)
Downloading lxml_html_clean-0.4.1-py3-none-any.whl (14 kB)
Installing collected packages: lxml_html_clean
Successfully installed lxml_html_clean-0.4.1


In [None]:
import os
import re
import logging
from typing import List, Dict, Optional
import json
import requests
from bs4 import BeautifulSoup, Tag
from readability import Document  # Для извлечения основного текста
from pytube import YouTube  # Для работы с YouTube
import tweepy  # Для работы с Twitter
import xml.etree.ElementTree as ET  # Для создания XML

# Настройка логирования
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

class NewsCorpusProcessor:
    def __init__(self, output_base: str = "news_corpus"):
        """
        Инициализация класса.

        :param output_base: Базовое имя выходных файлов (без расширений).
        """
        self.output_base = output_base
        self.processed_items: List[Dict] = []

    def clean_text(self, text: str, custom_patterns: Optional[List[str]] = None) -> str:
        """
        Очищает текст от HTML-тегов и лишних символов.
        :param text: Исходный текст.
        :param custom_patterns: Список пользовательских регулярных выражений для очистки текста.
        :return: Очищенный текст.
        """
        # Удаление HTML-тегов с помощью BeautifulSoup
        soup = BeautifulSoup(text, 'html.parser')
        plain_text = soup.get_text(separator=" ")

        # Определение базовых шаблонов для очистки текста
        patterns = [
            r"^\s*\d+\s*$",  # Удалить номера страниц
            r"[^\w\s\.,!?;:()«»“”'\"\\/-]",  # Удалить специальные символы
            r"\s+",  # Заменить множественные пробелы на один пробел
            r"^\s*$",  # Удалить пустые строки
            r"\t+",  # Удалить табуляции
            r"\.{2,}",  # Заменить многоточия на точку
        ]
        if custom_patterns:
            patterns.extend(custom_patterns)

        # Применение шаблонов для очистки текста
        for pattern in patterns:
            plain_text = re.sub(pattern, " ", plain_text, flags=re.MULTILINE)

        return plain_text.strip()

    def clean_content(self, data: Dict) -> Dict:
        """
        Очищает содержимое словаря с ключами 'title', 'author' и 'content'.
        :param data: Словарь с данными.
        :return: Очищенный словарь.
        """
        cleaned_data = {
            "title": self.clean_text(data.get("title", "")),
            "author": self.clean_text(data.get("author", "")),
            "content": self.clean_text(data.get("content", ""))
        }
        return cleaned_data

    def extract_web_content(self, url: str) -> Dict:
        """
        Извлекает заголовок, автора и основной текстовый контент из веб-страницы.
        :param url: URL веб-страницы.
        :return: Словарь с заголовком, автором и текстом.
        """
        try:
            response = requests.get(url)
            response.raise_for_status()

            # Парсим HTML с помощью BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')

            # Извлекаем заголовок
            title = soup.title.string.strip() if soup.title else "No Title"

            # Извлекаем автора (если указан)
            author_tag = soup.find("meta", attrs={"name": "author"})
            author = author_tag["content"].strip() if author_tag else "Unknown Author"

            # Используем Document для извлечения основного текста
            doc = Document(response.text)
            raw_text = doc.summary()  # Получаем основной текст статьи

            # Создаем словарь с сырыми данными
            raw_data = {
                "title": title,
                "author": author,
                "content": raw_text
            }

            # Чистим содержимое
            cleaned_data = self.clean_content(raw_data)

            return cleaned_data

        except Exception as e:
            logging.error(f"Ошибка при извлечении контента из {url}: {e}")
            return {"title": "Error", "author": "Unknown", "content": ""}

    def extract_youtube_transcript(self, video_url: str) -> str:
        """
        Извлекает транскрипт видео с YouTube.

        :param video_url: URL видео на YouTube.
        :return: Транскрипт видео.
        """
        try:
            yt = YouTube(video_url)
            transcript = yt.captions.get_by_language_code('ru')  # Получаем русский транскрипт
            if transcript:
                raw_text = transcript.generate_srt_captions()
                cleaned_text = self.clean_text(raw_text, custom_patterns=[r"\d+\n\d+:\d+:\d+,?\d* --> \d+:\d+:\d+,?\d*"])
                return cleaned_text
            else:
                logging.warning(f"Транскрипт для {video_url} не найден.")
                return ""
        except Exception as e:
            logging.error(f"Ошибка при извлечении транскрипта из {video_url}: {e}")
            return ""

    def extract_twitter_posts(self, username: str, count: int = 10) -> List[str]:
        """
        Извлекает последние твиты пользователя.

        :param username: Имя пользователя Twitter.
        :param count: Количество твитов для извлечения.
        :return: Список твитов.
        """
        try:
            client = tweepy.Client(bearer_token="YOUR_TWITTER_BEARER_TOKEN")
            user = client.get_user(username=username)
            tweets = client.get_users_tweets(id=user.data.id, max_results=count, tweet_fields=["text"])
            cleaned_tweets = [self.clean_text(tweet['text']) for tweet in tweets.data]
            return cleaned_tweets
        except Exception as e:
            logging.error(f"Ошибка при извлечении твитов пользователя {username}: {e}")
            return []

    def extract_meta_posts(self, page_id: str, access_token: str, count: int = 10) -> List[Dict]:
        """
        Извлекает последние посты со страницы Facebook или Instagram через Meta Graph API.

        :param page_id: ID страницы Facebook или Instagram.
        :param access_token: Токен доступа Meta.
        :param count: Количество постов для извлечения.
        :return: Список постов.
        """
        try:
            url = f"https://graph.facebook.com/{page_id}/posts?fields=message&limit={count}&access_token={access_token}"
            response = requests.get(url)
            response.raise_for_status()
            data = response.json().get("data", [])
            posts = [{"message": post.get("message", "")} for post in data]
            cleaned_posts = [{"message": self.clean_text(post["message"])} for post in posts]
            return cleaned_posts
        except Exception as e:
            logging.error(f"Ошибка при извлечении постов из Meta для страницы {page_id}: {e}")
            return []

    def extract_vk_posts(self, group_id: str, count: int = 10) -> List[Dict]:
        """
        Извлекает последние посты из группы ВКонтакте через VK API.

        :param group_id: ID группы ВКонтакте.
        :param count: Количество постов для извлечения.
        :return: Список постов.
        """
        try:
            vk_api_url = "https://api.vk.com/method/wall.get"
            params = {
                "owner_id": f"-{group_id}",  # Минус перед ID для групп
                "count": count,
                "access_token": "YOUR_VK_ACCESS_TOKEN",
                "v": "5.131"
            }
            response = requests.get(vk_api_url, params=params)
            response.raise_for_status()
            data = response.json().get("response", {}).get("items", [])
            posts = [{"text": post.get("text", "")} for post in data]
            cleaned_posts = [{"text": self.clean_text(post["text"])} for post in posts]
            return cleaned_posts
        except Exception as e:
            logging.error(f"Ошибка при извлечении постов из ВКонтакте для группы {group_id}: {e}")
            return []

    def process_all_sources(self, sources: List[Dict]):
        """
        Обрабатывает все источники данных (сайты, YouTube, Twitter, Meta, VK).

        :param sources: Список источников данных.
        """
        all_data = []
        for source in sources:
            if source["type"] == "web":
                content = self.extract_web_content(source["url"])
                if content:
                    all_data.append({"source": "web", "url": source["url"], "content": content})
            elif source["type"] == "youtube":
                transcript = self.extract_youtube_transcript(source["url"])
                if transcript:
                    all_data.append({"source": "youtube", "url": source["url"], "content": transcript})
            elif source["type"] == "twitter":
                tweets = self.extract_twitter_posts(source["username"], count=source.get("count", 10))
                if tweets:
                    all_data.extend([{"source": "twitter", "username": source["username"], "content": tweet} for tweet in tweets])
            elif source["type"] == "meta":
                posts = self.extract_meta_posts(
                    page_id=source["page_id"],
                    access_token=source["access_token"],
                    count=source.get("count", 10)
                )
                if posts:
                    all_data.extend([{"source": "meta", "page_id": source["page_id"], "content": post["message"]} for post in posts])
            elif source["type"] == "vk":
                posts = self.extract_vk_posts(group_id=source["group_id"], count=source.get("count", 10))
                if posts:
                    all_data.extend([{"source": "vk", "group_id": source["group_id"], "content": post["text"]} for post in posts])

        # Сохранение в JSON
        self.save_to_json(all_data)

        # Сохранение в TXT
        self.save_to_txt(all_data)

        # Сохранение в XML
        self.save_to_xml(all_data)

        logging.info(f"Все данные успешно сохранены в {self.output_base}.json, {self.output_base}.txt и {self.output_base}.xml.")
        logging.info(f"Обработано источников: {len(all_data)}.")

    def save_to_json(self, data: List[Dict]):
        """
        Сохраняет данные в JSON-файл.

        :param data: Список словарей с данными.
        """
        with open(f"{self.output_base}.json", "w", encoding="utf-8") as json_file:
            json.dump(data, json_file, ensure_ascii=False, indent=4)

    def save_to_txt(self, data: List[Dict]):
        with open(f"{self.output_base}.txt", "w", encoding="utf-8") as txt_file:
            for item in data:
                txt_file.write(f"Title: {item.get('title', 'N/A')}\n")
                txt_file.write(f"Author: {item.get('author', 'N/A')}\n")
                txt_file.write(f"Content: {item.get('content', '')}\n\n")


    def save_to_xml(self, data: List[Dict], default_title="N/A", default_author="N/A", default_content=""):
        # Validate input
        if not isinstance(data, list) or not all(isinstance(item, dict) for item in data):
            raise ValueError("Invalid input: 'data' must be a list of dictionaries.")

        if not hasattr(self, "output_base") or not self.output_base:
            raise ValueError("Output base filename is not set.")

        # Initialize XML structure
        root = ET.Element("news_corpus")
        for item in data:
            entry = ET.SubElement(root, "entry")

            # Handle title, author, and content
            ET.SubElement(entry, "title").text = str(item.get("title", default_title))

            if isinstance(item.get("author"), dict):  # Handle nested author dictionary
                author_elem = ET.SubElement(entry, "author")
                for key, value in item["author"].items():
                    ET.SubElement(author_elem, key).text = str(value)
            else:
                ET.SubElement(entry, "author").text = str(item.get("author", default_author))

            ET.SubElement(entry, "content").text = str(item.get("content", default_content))

            # Dynamically add other fields
            for key, value in item.items():
                if key not in ["title", "author", "content"]:
                    ET.SubElement(entry, key).text = str(value)

        # Save the XML file
        try:
            tree = ET.ElementTree(root)
            tree.write(f"{self.output_base}.xml", encoding="utf-8", xml_declaration=True)
        except Exception as e:
            print(f"Error saving XML: {e}")

# Пример использования класса
if __name__ == "__main__":
    # Создание экземпляра класса
    processor = NewsCorpusProcessor(output_base="social_media_news")

   # Список источников данных
    sources = [
        {"type": "web", "url": "https://en.wikipedia.org/wiki/Web_scraping"}
    ]

    # Обработка всех источников
    processor.process_all_sources(sources)



```
   # Список источников данных
    sources = [
        {"type": "web", "url": "https://example.com/article1"},
        {"type": "youtube", "url": "https://www.youtube.com/watch?v=VIDEO_ID"},
        {"type": "twitter", "username": "twitter_username", "count": 5},
        {"type": "meta", "page_id": "PAGE_ID", "access_token": "META_ACCESS_TOKEN", "count": 5},
        {"type": "vk", "group_id": "GROUP_ID", "count": 5}
    ]

```

