## Quality Processes for Texts

In [11]:
#!pip install langdetect googletrans deep_translator

In [12]:
# Importing useful dependencies
import re
import unicodedata
import io
import os
import boto3
import chardet
import pandas as pd
import tiktoken
from langdetect import detect
from googletrans import Translator
import asyncio
from deep_translator import GoogleTranslator
# set tokenizer with openAI standard token
from typing import List, Dict
from ftfy import fix_text

In [13]:
# Setup S3 client for MinIO (MinIO implements Amazon S3 API)
s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:9000", # MinIO API endpoint
    aws_access_key_id="minioadmin", # User name
    aws_secret_access_key="minioadmin", # Password
)

In [14]:
# ---- Quality checks ----
def check_text_quality(body: bytes, key: str):
    """Return a simple dict of basic quality stats for one file."""
    if not body:
        return {"key": key, "empty": True}
    
    # detect encoding and decode safely
    guess = chardet.detect(body)
    enc = guess.get("encoding") or "utf-8"
    text = body.decode(enc, errors="replace")

    # check printable ratio (avoid binary garbage)
    printable_ratio = sum(c.isprintable() or c.isspace() for c in text) / max(1, len(text))
    origin_lang = detect(text)
    # --- Paragraph token stats ---
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]

    paragraph_tokens = [len(p) for p in paragraphs] if paragraphs else []
    
    avg_tokens_per_paragraph = (
        sum(paragraph_tokens) / len(paragraph_tokens) if paragraph_tokens else 0
    )
    max_tokens_paragraph = max(paragraph_tokens) if paragraph_tokens else 0
    min_tokens_paragraph = min(paragraph_tokens) if paragraph_tokens else 0
    # basic stats
    return {
        "key": key,
        "size_bytes": len(body),
        "encoding": enc,
        "empty": not bool(text.strip()),
        "too_short": len(text.strip()) < 20,
        "low_printable_ratio": printable_ratio < 0.9,
        "lines": len(text.splitlines()),
        "avg_tokens_per_paragraph": round(avg_tokens_per_paragraph, 2),
        "max_tokens_paragraph": max_tokens_paragraph,
        "min_tokens_paragraph": min_tokens_paragraph,
        "paragraph_token_list": paragraph_tokens,  # you can drop this if you don’t need full list
        "origin_language": origin_lang,
    }

In [15]:
# ---- Run checks on all txt files ----
def extract_datas(bucket,prefix=""):
    results = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            
            key = obj["Key"]

            if obj['Size'] == 0 and key.endswith("/"): # skip the folder itself
                continue
            # Download the text
            resp = s3.get_object(Bucket=bucket, Key=key)
            body = resp["Body"].read()
            stats = check_text_quality(body, key)
            results.append(stats)
    return results

In [16]:
data = extract_datas(bucket = "trusted-zone", prefix = "texts")

In [17]:
# Convert the data into a dataFrame
df_data = pd.DataFrame(data)
df_data

Unnamed: 0,key,size_bytes,encoding,empty,too_short,low_printable_ratio,lines,avg_tokens_per_paragraph,max_tokens_paragraph,min_tokens_paragraph,paragraph_token_list,origin_language
0,texts/text_1760786400687.txt,1043,ascii,False,False,False,1,1043.0,1043,1043,[1043],en
1,texts/text_1760786400752.txt,1373,utf-8,False,False,False,1,1369.0,1369,1369,[1369],en
2,texts/text_1760786400827.txt,734,utf-8,False,False,False,1,729.0,729,729,[729],en
3,texts/text_1760786400902.txt,1072,ascii,False,False,False,1,1072.0,1072,1072,[1072],en
4,texts/text_1760786400988.txt,1260,ascii,False,False,False,1,1260.0,1260,1260,[1260],en
...,...,...,...,...,...,...,...,...,...,...,...,...
984,texts/text_1760786502751.txt,1093,ascii,False,False,False,1,1093.0,1093,1093,[1093],en
985,texts/text_1760786502826.txt,693,ascii,False,False,False,1,693.0,693,693,[693],en
986,texts/text_1760786502909.txt,1046,ascii,False,False,False,1,1046.0,1046,1046,[1046],en
987,texts/text_1760786502982.txt,1279,ascii,False,False,False,1,1279.0,1279,1279,[1279],en


In [18]:
def get_text(bucket, key):
    resp = s3.get_object(Bucket=bucket, Key=key)
    body = resp["Body"].read()
    text = body.decode("utf-8")
    return text

In [21]:
_CTRL_RE = re.compile(r"[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]")          # control character
_WS_RE = re.compile(r"[ \t\u00A0\u2000-\u200B\u3000]+")
HIGH_UNICODE_GARBAGE_RE = re.compile(r"[^\x00-\x7F]+")
LANG_MAP = {
    "zh-cn": "zh-CN",
    "zh_cn": "zh-CN",
    "zh-CN": "zh-CN",
    "zh": "zh-CN",
    "en": "en",
    "es": "es",
    "fr": "fr",
    "ja": "ja",
    "ko": "ko"
}

_SENT_SPLIT_RE = re.compile(
    r'(?:\s*\n+\s*)|'                 # paragraph/newline breaks
    r'(?<=[。！？!?])\s+'             # CJK sentence enders + whitespace
    r'|(?<=[\.\?\!])\s+'              # English . ? ! + whitespace
)


try:
    import emoji
    EMOJI_RE = emoji.get_emoji_regexp()
except Exception:
    EMOJI_RE = re.compile(r'[\U0001F300-\U0001FAFF\U00002700-\U000027BF]+')


COMBINING_MARKS_RE = re.compile(r'[\u0300-\u036F]+')     # combining marks

def basic_clean(text: str) -> str:
    text = unicodedata.normalize("NFKC", text)
    text = text.replace("\ufeff", "")
    text = _CTRL_RE.sub("", text)
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = "\n".join(_WS_RE.sub(" ", ln).strip() for ln in text.split("\n"))
    text = re.sub(r"\n{3,}", "\n\n", text) 
    return text

    
def to_english(text: str, lang) -> str:
    try:
        src_lang = LANG_MAP.get(lang.lower(), lang)
        result =  GoogleTranslator(source=src_lang, target='english').translate(text)
        return result
    except Exception as e:
        print(f"⚠️ Translation failed ({e}), keeping original text.")
        return text

def split_into_sentences(text: str) -> List[str]:

    text = re.sub(r'\r\n?', '\n', text).strip()
    parts = []
    for para in filter(None, text.split('\n')):
        para = re.sub(r'\s+', ' ', para).strip()
        if not para:
            continue
        parts.extend([s.strip() for s in re.split(_SENT_SPLIT_RE, para) if s.strip()])
    return parts

def chunk_text_with_token_budget(
    text: str,
    tokenizer,
    max_tokens: int = 512,
    stride: int = 64,
    reserve_special: int = 2,  # reserve tokens for [CLS]/[SEP] or similar
) -> List[Dict]:
    
    # The actual available token budget after reserving special tokens
    budget = max_tokens - reserve_special
    assert budget > 0, "max_tokens is too small; cannot reserve special tokens"
    
    # Split text into sentences
    sents = split_into_sentences(text)

    chunks: List[Dict] = []
    curr_sents: List[str] = []
    curr_tokens = 0

    # Helper: count token length for a given text (excluding special tokens)
    def token_len(txt: str) -> int:
        return len(tokenizer.encode(txt))

    # Precompute token lengths of all sentences for efficiency
    sent_lens = [token_len(s) for s in sents]

    i = 0
    while i < len(sents):
        s = sents[i]
        s_len = sent_lens[i]

        # Case A: single sentence exceeds budget → split within sentence (sliding window)
        if s_len > budget:
            # If current chunk buffer is not empty, finalize it first
            if curr_sents:
                chunk_text = " ".join(curr_sents).strip()
                chunks.append({"text": chunk_text, "n_tokens": token_len(chunk_text), "input_ids": None})
                curr_sents, curr_tokens = [], 0

            # Encode the long sentence into token IDs
            input_ids = tokenizer.encode(s)
            start = 0
            while start < len(input_ids):
                # Take a slice of up to `budget` tokens
                end = min(start + budget, len(input_ids))
                piece_ids = input_ids[start:end]
                # Decode back to text for saving
                piece_text = tokenizer.decode(piece_ids).strip()
                chunks.append({"text": piece_text, "n_tokens": len(piece_ids), "input_ids": None})

                # If reached the end, stop
                if end == len(input_ids):
                    break
                # Move window forward with overlap (`stride`)
                start = max(end - stride, start + 1)
            i += 1
            continue

        # Case B: greedy packing — keep adding sentences to current chunk
        if curr_tokens + s_len <= budget:
            curr_sents.append(s)
            curr_tokens += s_len
            i += 1
        else:
            # Finalize the current chunk when adding the next sentence would exceed the budget
            chunk_text = " ".join(curr_sents).strip()
            chunks.append({"text": chunk_text, "n_tokens": token_len(chunk_text), "input_ids": None})
            curr_sents, curr_tokens = [], 0

    # Finalize any remaining sentences
    if curr_sents:
        chunk_text = " ".join(curr_sents).strip()
        chunks.append({"text": chunk_text, "n_tokens": token_len(chunk_text), "input_ids": None})

    return chunks
        
def clean_text(df,max_tokens = 512):
    bucket="trusted-zone"
    tokenizer = tiktoken.get_encoding("cl100k_base")
    for row in df.itertuples(index = False):

        
        #check text need remove or clean
        if row.empty or row.too_short or row.low_printable_ratio:
            client.delete_object(Bucket=bucket, Key=row.key)
            continue
        #get text
        text = get_text("trusted-zone",row.key)        
        #basic clean and get text
        text = basic_clean(text)

        #if language of text isn't english tranlate it.
        if(row.origin_language != "en"):
            text = to_english(text, row.origin_language)

        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        for i in range (0, len(row.paragraph_token_list)):

            if (row.paragraph_token_list[i] > max_tokens):
                chunks = chunk_text_with_token_budget(paragraphs[i], tokenizer, max_tokens)
                full_text = "\n\n".join(chunk["text"].strip() for chunk in chunks if chunk["text"].strip())
                paragraphs[i]=full_text

        text = "\n\n".join(paragraphs)
        fixed = fix_text(text)

        fixed = unicodedata.normalize("NFKC", fixed)
        fixed = COMBINING_MARKS_RE.sub("", fixed)
        fixed = _CTRL_RE.sub(" ", fixed)

        fixed = EMOJI_RE.sub(" ", fixed)
        fixed = HIGH_UNICODE_GARBAGE_RE.sub(" ", fixed)
        s3.put_object(
            Bucket=bucket,
            Key=row.key,  # Make sure the file key (path) is correct
            Body=fixed.encode('utf-8'),
            ContentType="text/plain"
        )

        
        
            
clean_text(df_data)

⚠️ Translation failed (Let me first state that the combat in this game has a certain degree of difficulty and is not suitable for friends who like to play idle games. Since there are not many testers, I am mainly testing myself. Even if I play it many times, there will inevitably be omissions and unreasonable things. If there are bugs or you think the difficulty is unreasonable in some places, I hope you can give positive feedback, leave a message in the comment area or join the group @! Thank you for playing! The game has a combat mechanism that is different from other turn-based games. Each character has unique talent points that can produce qualitative changes. You can train the character in the direction you want. But note that your talents need to be matched with the skills you mainly use to achieve maximum effect. Points to note in this game: 1. Many skills require TP points to release. We can improve the efficiency of TP acquisition by increasing attack speed or TP recovery. 2. 


KeyboardInterrupt

