In [1]:
import gdown
import zipfile
import os
import dask.dataframe as dd

import cudf.pandas
cudf.pandas.install()

import pandas as pd
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from language_tool_python import LanguageTool
import threading
import queue
import subprocess

In [1]:
# Параметры
ZIP_FILE_URL = 'https://drive.google.com/uc?id=15Hm6qXnbKzG63yJEjxgfVQ4ojwy1QgNt'
ZIP_FILE_PATH = 'wbdirs.zip'
EXTRACT_FOLDER_PATH = 'wbdirs/'
OUTPUT_CSV_FILE = 'corrected_wildberries_reviews.csv'
NUM_TOOLS = 6  # Количество экземпляров LanguageTool
MAX_CHECK_THREADS = 12  # Количество потоков для каждого экземпляра LanguageTool

# Загрузка zip-файла
if not os.path.exists(ZIP_FILE_PATH):
    gdown.download(ZIP_FILE_URL, ZIP_FILE_PATH, quiet=False)

# Распаковка zip-файла
with zipfile.ZipFile(ZIP_FILE_PATH, 'r') as zip_ref:
    zip_ref.extractall(EXTRACT_FOLDER_PATH)

def install_spacy_model_if_not_exists(model_name):
    try:
        result = subprocess.run(
            ['python', '-m', 'spacy', 'info'],
            capture_output=True, text=True, check=True
        )
        if model_name not in result.stdout:
            subprocess.run(['python', '-m', 'spacy', 'download', model_name], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Ошибка при выполнении команды: {e}")

install_spacy_model_if_not_exists('ru_core_news_lg')

def setup_languagetool():
    process = subprocess.Popen(
        ['find', '/', '-name', 'languagetool-server.jar'],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True
    )
    output, _ = process.communicate()
    if not os.path.isdir(output):
        os.system('apt update && apt install -y default-jre wget unzip')
        os.system('wget https://languagetool.org/download/LanguageTool-stable.zip && unzip -o LanguageTool-stable.zip')

temp = LanguageTool('ru-RU', config={'maxSpellingSuggestions': 1})
temp.close()

def create_tool(tools_queue, pbar):
    tool = LanguageTool('ru-RU', config={'maxSpellingSuggestions': 1, 'maxCheckThreads': MAX_CHECK_THREADS})
    tools_queue.put(tool)
    pbar.update(1)

def correct_text(text, tool, pbar):
    if not isinstance(text, str):
        pbar.update(1)
        return text
    try:
        corrected_text = tool.correct(text)
        pbar.update(1)
        return corrected_text if corrected_text != text else text
    except Exception as e:
        print(f"Ошибка исправления текста: {e}")
        pbar.update(1)
        return text

def process_chunk(chunk, tools, chunk_idx, num_tools, results_queue, pbar):
    corrected_chunk = chunk.copy()
    tool = tools[chunk_idx % num_tools]
    corrected_chunk['corrected_text'] = chunk['review_full_text'].apply(lambda x: correct_text(x, tool, pbar))
    results_queue.put(corrected_chunk)

def process_data(ddf, num_tools, output_file):
    tools_queue = queue.Queue()
    threads = []
    with tqdm(total=num_tools, desc="Создание экземпляров LanguageTool") as pbar:
        for _ in range(num_tools):
            thread = threading.Thread(target=create_tool, args=(tools_queue, pbar))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

    tools = [tools_queue.get() for _ in range(num_tools)]

    num_rows = len(ddf)
    chunk_size = num_rows // num_tools

    results_queue = queue.Queue()
    futures = []
    with ThreadPoolExecutor(max_workers=num_tools) as executor:
        with tqdm(total=num_rows, desc="Обработка текста") as pbar:
            for i in range(num_tools):
                chunk = ddf.partitions[i].compute()
                futures.append(executor.submit(process_chunk, chunk, tools, i, num_tools, results_queue, pbar))

            for future in as_completed(futures):
                pass

    # Объединение результатов
    corrected_df = pd.concat([results_queue.get() for _ in range(num_tools)], ignore_index=True)

    # Преобразование обратно в cuDF DataFrame перед сохранением
    corrected_df = cudf.DataFrame.from_pandas(corrected_df)

    corrected_df.to_csv(output_file, index=False)
    print(f"Обработка завершена для {num_rows} строк и {num_tools} экземпляров LanguageTool.")
    print(f"Результаты сохранены в {output_file}")

# Чтение всех CSV файлов из распакованных папок и объединение их в один Dask DataFrame
ddf = dd.read_csv(os.path.join(EXTRACT_FOLDER_PATH, '**', '*.csv'), assume_missing=True)

# Запуск обработки для всех строк в файле
process_data(ddf, num_tools=NUM_TOOLS, output_file=OUTPUT_CSV_FILE)

# Проверка результатов
t_df = cudf.read_csv(OUTPUT_CSV_FILE)
print(t_df[(t_df["review_full_text"] != t_df["corrected_text"])])


Downloading...
From (original): https://drive.google.com/uc?id=15Hm6qXnbKzG63yJEjxgfVQ4ojwy1QgNt
From (redirected): https://drive.google.com/uc?id=15Hm6qXnbKzG63yJEjxgfVQ4ojwy1QgNt&confirm=t&uuid=6cf20f71-58cb-4008-883f-1aff1c84e4ea
To: /workspace/wbdirs.zip
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 161M/161M [00:03<00:00, 51.8MB/s]
Downloading LanguageTool 6.4: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 246M/246M [00:08<00:00, 28.5MB/s]
Unzipping /tmp/tmpm02m3vol.zip to /root/.cache/language_tool_python.
Downloaded https://www.languagetool.org/download/LanguageTool-6.4.zip to /root/.cache/language_tool_python.
Создание экземпляров LanguageTool: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████

Ошибка исправления текста: Command '['/usr/bin/java', '-version']' returned non-zero exit status 143.
Ошибка исправления текста: 
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attribute 'terminate'
Ошибка исправления текста: 'NoneType' object has no attr