## Util: Text cleaner

This notebook helps clean text data for further analysis. It splits the text into chunks and uses an LLM to clean each chunk according to the following rules:

1. Remove irrelevant content such as tables of contents, headers, footers, and page numbers;
2. Remove meaningless isolated numbers;
3. Retain only complete Chinese, French or English sentences, each on a new line, ending with a period;
4. Do not insert line breaks in the middle of sentences.

In [None]:
import openai
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

class TextCleaner:
    """Clean raw text with an LLM, one fixed-size character chunk at a time.

    The text is cut into slices of at most ``max_chars_per_request`` characters,
    each slice is sent to a chat-completions endpoint together with the cleaning
    rules, and the cleaned slices are written to a file in their original order.
    """

    def __init__(self,
                 base_url="https://api.siliconflow.cn/v1",
                 model_name="Pro/deepseek-ai/DeepSeek-V3",
                 max_chars_per_request=8000, batch_size=50, max_workers=32):
        """Create the API client and store the chunking / batching settings.

        Args:
            base_url: Base URL passed to ``openai.OpenAI``.
            model_name: Model identifier sent with every request.
            max_chars_per_request: Maximum number of characters per chunk.
            batch_size: Number of cleaned chunks joined per write to the output file.
            max_workers: Number of threads used to clean the chunks of one text.

        The API key is read from the ``DEEPSEEK_API_KEY`` environment variable.
        """
        self.client = openai.OpenAI(
            api_key=os.environ.get("DEEPSEEK_API_KEY"),
            base_url=base_url
        )
        self.model_name = model_name
        self.max_chars_per_request = max_chars_per_request
        self.batch_size = batch_size
        self.max_workers = max_workers

    def _split_text(self, text):
        """Split ``text`` into consecutive slices of at most ``max_chars_per_request`` characters.

        Slices are cut purely by character count, so a slice may start or end in
        the middle of a sentence. An empty string yields an empty list.
        """
        step = self.max_chars_per_request
        return [text[start:start + step] for start in range(0, len(text), step)]

    def _stream_clean_chunk(self, text_chunk):
        """Send one chunk to the model and yield the cleaned text as it streams back.

        Args:
            text_chunk: Raw text of a single chunk.

        Yields:
            Non-empty content fragments of the streamed completion, in order.
        """
        prompt = f"""Please clean the following text according to these rules:
    1. Remove irrelevant content such as tables of contents, headers, footers, and page numbers;
    2. Remove meaningless isolated numbers;
    3. Retain only complete Chinese, French or English sentences, each on a new line, ending with a period;
    4. Do not insert line breaks in the middle of sentences.

    Here is the raw text to be cleaned:
    {text_chunk}
    """

        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "system", "content": prompt}
            ],
            temperature=0.0,
            stream=True
        )

        for chunk in response:
            # A streamed event may carry an empty `choices` list (the OpenAI API
            # documents this for usage-only chunks); skip it instead of raising
            # IndexError and losing the whole chunk.
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if piece:
                yield piece

    def clean_and_save(self, text: str, output_path: str):
        """Clean ``text`` chunk by chunk in a thread pool and write the result to ``output_path``.

        Args:
            text: Raw text to clean.
            output_path: Destination file; missing parent directories are created.

        Raises:
            Exception: Whatever a chunk request raises is re-raised here through
                ``future.result()``; nothing is written in that case.
        """
        text_chunks = self._split_text(text)
        # File name up to its first dot; only used in the progress message.
        file_id = os.path.basename(os.fspath(output_path)).split('.')[0]
        print(f"Ê≠£Âú®Ê∏ÖÊ¥óÊñáÊú¨{file_id}ÔºåÂàÜ‰∏∫ {len(text_chunks)} ÊÆµ...")

        # Create the destination directory before any request is made, so a missing
        # directory cannot discard the cleaned text after all chunks were processed.
        parent_dir = os.path.dirname(os.fspath(output_path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        def clean_chunk(index, chunk_text):
            # Return the index with the text so results can be put back in order.
            return index, ''.join(self._stream_clean_chunk(chunk_text))

        # One slot per chunk; filled in completion order, read back in chunk order.
        cleaned_chunks = [None] * len(text_chunks)

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(clean_chunk, idx, chunk)
                       for idx, chunk in enumerate(text_chunks)]
            for future in tqdm(as_completed(futures), total=len(futures), desc="Ê∏ÖÊ¥óËøõÂ∫¶"):
                idx, cleaned_text = future.result()
                cleaned_chunks[idx] = cleaned_text

        with open(output_path, "w", encoding="utf-8") as f_out:
            # Write `batch_size` chunks per call; a non-positive size degrades to
            # one chunk per write.
            batch = max(1, self.batch_size)
            for start in range(0, len(cleaned_chunks), batch):
                f_out.write(''.join(cleaned_chunks[start:start + batch]))
                f_out.flush()

        print(f"\n‚úÖ Â∑≤‰øùÂ≠òÂà∞: {output_path}")
        
def batch_clean_texts(text_cleaner: TextCleaner, raw_text_list, output_list, max_workers=15):
    """Clean several texts concurrently, one thread-pool task per (text, output path) pair.

    Texts and output paths are paired positionally with ``zip``. An exception
    raised while cleaning one text is printed and does not stop the other tasks.

    Args:
        text_cleaner: Object whose ``clean_and_save(text, output_path)`` does the work.
        raw_text_list: Raw texts to clean.
        output_list: Output file paths, in the same order as ``raw_text_list``.
        max_workers: Number of texts processed at the same time.
    """
    def _clean_one(raw_text, destination):
        # Report the failure and carry on so one bad file cannot abort the batch.
        try:
            text_cleaner.clean_and_save(raw_text, destination)
        except Exception as err:
            print(f"‚ùå Â§ÑÁêÜÂ§±Ë¥•: {destination}ÔºåÈîôËØØÔºö{err}")

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [
            pool.submit(_clean_one, raw_text, destination)
            for raw_text, destination in zip(raw_text_list, output_list)
        ]
        # Drain the completion iterator purely to drive the progress bar.
        progress = tqdm(as_completed(pending), total=len(pending), desc="Processing files")
        for _finished in progress:
            continue

    print("\n‚úÖ ÊâÄÊúâÊñá‰ª∂Â§ÑÁêÜÂÆåÊØïÔºÅ")

In [None]:
# Example usage
import os
import re
# Directory that is scanned for raw ".txt" files to clean.
raw_path = "path/to/your/raw/text/files"  # Replace with your actual path
# Directory where the "cleaned_<name>" files are written; it is also checked for
# existing results so already-cleaned files are skipped.
output_path = "path/to/your/output/files"  # Replace with your actual path

def natural_sort_key(s):
    """Build a sort key that orders embedded numbers numerically ("f2" before "f10").

    The string is split into alternating text and digit runs; digit runs become
    ``int`` and text runs are lower-cased, so the comparison is case-insensitive.

    Args:
        s: The string (here, a file name) to build a key for.

    Returns:
        A list alternating ``str`` and ``int`` items, starting with a ``str``.
    """
    # `isdecimal()` accepts exactly the characters `\d` matches. `isdigit()` also
    # accepts characters such as "²", which `\d` never splits out and `int()`
    # rejects, so a name like "1²3" used to raise ValueError.
    return [
        int(token) if token.isdecimal() else token.lower()
        for token in re.split(r'(\d+)', s)
    ]

# Make sure the destination directory exists before any cleaning request is made,
# so writing the cleaned files cannot fail on a missing directory.
os.makedirs(output_path, exist_ok=True)

# Pick the raw ".txt" files that are not cleaned outputs themselves and that do not
# yet have a "cleaned_<name>" counterpart in the output directory, in natural order.
# Re-running the cell therefore only processes files that are still missing.
raw_filenames = sorted(
    [filename for filename in os.listdir(raw_path)
        if filename.endswith('.txt')
        and not filename.startswith('cleaned_')
        and not os.path.exists(os.path.join(output_path, f"cleaned_{filename}"))],
    key=natural_sort_key
)

# Load every selected file fully into memory, in the same order as `raw_filenames`.
raw_text_list = []
for filename in raw_filenames:
    with open(os.path.join(raw_path, filename), 'r', encoding='utf-8') as f:
        raw_text_list.append(f.read())

# Output paths are positionally aligned with `raw_text_list`.
output_list = [os.path.join(output_path, f"cleaned_{filename}") for filename in raw_filenames]
batch_clean_texts(TextCleaner(), raw_text_list, output_list, max_workers=3)

### Util: Translator

This notebook helps translate text data for further analysis. It uses the standard OpenAI API, called asynchronously, to translate the text chunk by chunk.

In [None]:
import asyncio
import os
from typing import List
from pathlib import Path
import aiofiles
from asyncio import Semaphore
import openai
from tqdm.asyncio import tqdm
from config import BASE_URL, PAYLOAD_MODEL, API_KEY

class AsyncTextTranslator:
    """Translate text files chunk by chunk through a chat-completions API using asyncio.

    Each file is cut into slices of at most ``max_chars_per_request`` characters.
    The slices are translated concurrently (bounded by a semaphore) and written
    to the output file in their original order.
    """

    def __init__(self,
                 target_lang: str,
                 base_url: str = BASE_URL,
                 model_name: str = PAYLOAD_MODEL,
                 max_chars_per_request: int = 2000,
                 batch_size: int = 50,
                 concurrency_limit: int = 100,
                 timeout: int = 360):
        """Create the async API client and store the translation settings.

        Args:
            target_lang: Language named in the system prompt as the translation target.
            base_url: Base URL passed to ``openai.AsyncOpenAI``.
            model_name: Model identifier sent with every request.
            max_chars_per_request: Maximum number of characters per chunk.
            batch_size: Number of translated chunks joined per write to the output file.
            concurrency_limit: Maximum number of requests in flight at once.
            timeout: Seconds allowed for a single chunk request.
        """
        self.client = openai.AsyncOpenAI(
            api_key=API_KEY,
            base_url=base_url
        )
        self.model_name = model_name
        self.target_lang = target_lang
        self.max_chars_per_request = max_chars_per_request
        self.batch_size = batch_size
        self.semaphore = Semaphore(concurrency_limit)
        self.timeout = timeout

    def _split_text(self, text: str) -> List[str]:
        """Split ``text`` into consecutive slices of at most ``max_chars_per_request`` characters."""
        step = self.max_chars_per_request
        return [text[start:start + step] for start in range(0, len(text), step)]

    async def _safe_stream_translate_chunk(self, chunk_text: str) -> str:
        """Translate one chunk, returning a placeholder string instead of raising.

        Returns:
            The translated text, ``"[Translation Timeout]"`` if the request took
            longer than ``self.timeout`` seconds, or ``"[Translation Error]"`` on
            any other exception.
        """
        try:
            # Acquire the concurrency slot *before* starting the timeout clock.
            # Otherwise time spent queued behind other chunks counts against the
            # timeout and chunks can expire before their request is even sent.
            async with self.semaphore:
                return await asyncio.wait_for(
                    self._request_translation(chunk_text), timeout=self.timeout
                )
        except asyncio.TimeoutError:
            print(f"[‚è∞ Timeout] Chunk exceeded {self.timeout}s")
            return "[Translation Timeout]"
        except Exception as e:
            print(f"[‚ùå Error] Chunk failed: {e}")
            return "[Translation Error]"

    async def _stream_translate_chunk(self, text_chunk: str) -> str:
        """Translate one chunk while holding a concurrency slot; exceptions propagate."""
        async with self.semaphore:
            return await self._request_translation(text_chunk)

    async def _request_translation(self, text_chunk: str) -> str:
        """Send one streaming translation request and return the concatenated reply.

        Does not touch the semaphore; callers are responsible for limiting concurrency.
        """
        system_prompt = (
            f"You are a translation engine. "
            f"Translate all input to {self.target_lang}. Keep format, do not add extra content."
        )
        response = await self.client.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": text_chunk}
            ],
            temperature=0.0,
            stream=True
        )
        buffer = []
        async for chunk in response:
            # A streamed event may carry an empty `choices` list (the OpenAI API
            # documents this for usage-only chunks); skip it rather than fail the
            # whole chunk with IndexError.
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if piece:
                buffer.append(piece)
        return ''.join(buffer)

    async def translate_and_save(self, text_path: Path, output_path: Path = None):
        """Translate the file at ``text_path`` and write the result to ``output_path``.

        Args:
            text_path: File to translate; undecodable bytes are replaced on read.
            output_path: Destination file. Defaults to ``text_path`` with its
                suffix replaced by ``.translated.txt``.
        """
        # Accept plain strings as well as Path objects.
        text_path = Path(text_path)
        async with aiofiles.open(text_path, "r", encoding="utf-8", errors="replace") as f_in:
            text = await f_in.read()
        text_chunks = self._split_text(text)
        if output_path is None:
            output_path = text_path.with_suffix('.translated.txt')
            print(f"[üìÅ] ÁøªËØëÊñá‰ª∂: {output_path.stem}")

        coroutines = [self._safe_stream_translate_chunk(chunk) for chunk in text_chunks]
        # The gathered results are written in chunk order below.
        translated_chunks = await tqdm.gather(*coroutines, desc=f"ÁøªËØëËøõÂ∫¶: {text_path.stem}")

        async with aiofiles.open(output_path, "w", encoding="utf-8") as f_out:
            # Write `batch_size` chunks per call; a non-positive size degrades to
            # one chunk per write.
            batch = max(1, self.batch_size)
            for start in range(0, len(translated_chunks), batch):
                await f_out.write(''.join(translated_chunks[start:start + batch]))
                await f_out.flush()

    async def translate_all(self, text_paths: List[Path]):
        """Translate all files concurrently, printing (not raising) per-file exceptions."""
        results = await asyncio.gather(
            *(self.translate_and_save(path) for path in text_paths),
            return_exceptions=True
        )
        for r in results:
            if isinstance(r, Exception):
                print("[‚ùå]", repr(r))
