In [1]:
# pip install -r requirements.txt

In [2]:
import os
import argparse
import random
import re
import csv

# Project paths, resolved relative to the directory the notebook runs from.
# os.path.join picks the separator of the host OS instead of hard-coding the
# Windows backslash, so the same cell works on Windows, Linux and macOS.
BASE_DIR = os.getcwd()
DATASET_DIR = os.path.join(BASE_DIR, 'dataset')
BASE_DIR, DATASET_DIR

('c:\\Users\\pockg\\OneDrive\\Desktop\\New',
 'c:\\Users\\pockg\\OneDrive\\Desktop\\New\\dataset')

In [3]:
def merge_text_files(folder_path, output_file="merged.txt", encoding="utf-8"):
    """Concatenate every ``.txt`` file directly inside ``folder_path``.

    Files are processed in sorted filename order. Each file's content is
    stripped of leading/trailing whitespace and written to ``output_file``
    followed by a single newline.

    Args:
        folder_path: Directory whose ``.txt`` files are merged (not recursive).
        output_file: Path of the merged file; it is overwritten.
        encoding: Text encoding used for both reading and writing.
    """
    # The output may itself live in folder_path and end in ".txt". It must not
    # be read back into itself, otherwise an extra blank line is appended.
    out_abs = os.path.normcase(os.path.abspath(output_file))

    with open(output_file, "w", encoding=encoding) as outfile:
        for filename in sorted(os.listdir(folder_path)):
            if not filename.lower().endswith(".txt"):
                continue
            file_path = os.path.join(folder_path, filename)
            if os.path.normcase(os.path.abspath(file_path)) == out_abs:
                continue
            with open(file_path, "r", encoding=encoding) as infile:
                content = infile.read().strip()
            outfile.write(content + "\n")
    print(f"[OK] All text files has been combined into '{output_file}'.")

# Merge the Korean ('ko') and English ('en') dataset folders into one text
# file each; a merge is skipped when its output file already exists.
# Paths are built with os.path.join so the cell is not tied to the Windows
# separator, and each merge writes to exactly the path that was checked.
source = os.path.join(DATASET_DIR, 'ko')
src_path = os.path.join(BASE_DIR, 'source.txt')
if not os.path.exists(src_path):
    merge_text_files(source, output_file=src_path)
else:
    print("The source file already exists.")
    print(f'- file path: {src_path}')

target = os.path.join(DATASET_DIR, 'en')
tgt_path = os.path.join(BASE_DIR, 'target.txt')
if not os.path.exists(tgt_path):
    merge_text_files(target, output_file=tgt_path)
else:
    print("The target file already exists.")
    print(f'- file path: {tgt_path}')


[OK] All text files has been combined into 'source.txt'.
[OK] All text files has been combined into 'target.txt'.


In [4]:
# Command-line style options; every option has a default so the cell also
# runs unchanged inside a notebook.
p = argparse.ArgumentParser(description="Build synthetic QE dataset CSV")
p.add_argument("--source", default="source.txt", help="UTF-8 Korean source file")
p.add_argument("--target", default="target.txt", help="UTF-8 English target file")
p.add_argument("--out", default="dataset.csv", help="Output CSV path")
p.add_argument("--seed", type=int, default=42)
p.add_argument("--no-perfect", action="store_true", help="Do NOT include 100-point rows")

# parse_known_args ignores arguments it does not recognise instead of exiting.
args, _ = p.parse_known_args()

# Fail early, naming the paths that are actually missing (they may differ
# from the default file names when --source/--target are given).
missing = [path for path in (args.source, args.target) if not os.path.exists(path)]
if missing:
    raise FileNotFoundError(f"Input file(s) not found: {', '.join(missing)}")

In [5]:
def read_lines(path: str):
    """Read a UTF-8 text file and return its lines as a list.

    Trailing newline / carriage-return characters are removed from each
    line; all other whitespace is kept.
    """
    lines = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            lines.append(raw.rstrip("\n\r"))
    return lines

# Load both sides of the corpus. Line i of the source file is paired with
# line i of the target file, so the two files must have the same length.
srcs, tgts = read_lines(args.source), read_lines(args.target)
if len(tgts) != len(srcs):
    raise ValueError("Both files are not matching each other.")
len(srcs), len(tgts)

(8030, 8030)

In [6]:
# ---------- Text helpers ----------
_WORD_RE = re.compile(r"[^\W\d_]+", re.UNICODE)  # Extract unicode words
_NUM_RE = re.compile(r"\b\d{1,4}\b")
_AMPM_RE = re.compile(r"\b(am|pm|a\.m\.|p\.m\.)\b", re.IGNORECASE)

MONTHS = ["january","february","march","april","may","june",
          "july","august","september","october","november","december"]
DAYS = ["monday","tuesday","wednesday","thursday","friday","saturday","sunday"]

AUX = ["am","is","are","was","were","be","been","being",
       "do","does","did","have","has","had",
       "can","could","will","would","shall","should","may","might","must"]

STOP_LITE = {"a","an","the","to","of","in","on","for","and","or","but","so",
             "as","at","by","with","from","than","then","there","here"}

In [7]:
def _preserve_case(src_token: str, repl: str) -> str:
    if src_token.isupper():
        return repl.upper()
    if src_token[0].isupper():
        return repl.capitalize()
    return repl

def replace_one_of_set(text: str, vocab: list[str]) -> str:
    """Swap one vocabulary word found in ``text`` for a different one.

    A word from ``vocab`` that occurs in the lower-cased text (as a whole
    word) is chosen at random, and its first occurrence is replaced by
    another randomly chosen entry of ``vocab``, re-cased like the original.
    The text is returned unchanged when no vocabulary word occurs or when
    there is no alternative entry to swap in.
    """
    haystack = text.lower()
    present = []
    for word in vocab:
        if re.search(rf"\b{re.escape(word)}\b", haystack):
            present.append(word)
    if not present:
        return text

    chosen = random.choice(present)
    alternatives = [word for word in vocab if word != chosen]
    if not alternatives:
        return text
    substitute = random.choice(alternatives)

    # Only the first whole-word hit is replaced, matched case-insensitively.
    return re.sub(
        rf"\b{re.escape(chosen)}\b",
        lambda match: _preserve_case(match.group(0), substitute),
        text,
        count=1,
        flags=re.IGNORECASE,
    )

def mutate_numbers(text: str) -> str:
    """Shift the first one or two standalone 1-4 digit numbers by one.

    Values 0, 1 and 2 are always incremented; larger values are decremented
    or incremented with equal probability. How many numbers are touched
    (one or two) is chosen at random.
    """
    def _nudge(match):
        value = int(match.group(0))
        if value > 2 and random.random() < 0.5:
            return str(max(0, value - 1))
        return str(value + 1)

    how_many = random.randint(1, 2)
    return _NUM_RE.sub(_nudge, text, count=how_many)

def flip_ampm(text: str) -> str:
    """Swap the first AM/PM marker matched by ``_AMPM_RE`` for its opposite.

    Dotted markers stay dotted ("a.m." <-> "p.m.") and the casing of the
    original marker is carried over to the replacement. Text without a
    marker is returned unchanged.
    """
    def _flip(m):
        v = m.group(0)
        low = v.lower()
        # Decide by the leading letter. A substring test for "am" misses the
        # dotted form "a.m.", which would then be "flipped" to itself.
        alt = "pm" if low.startswith("a") else "am"
        if "." in low:
            alt = alt[0] + ".m."
        return _preserve_case(v, alt)
    return _AMPM_RE.sub(_flip, text, count=1)

def insert_or_remove_not(text: str) -> str:
    """Flip polarity by removing an existing "not" or inserting a new one.

    When the text contains "not", it is removed with probability 0.5 (and
    doubled spaces are collapsed). Otherwise " not" is inserted right after
    the first entry of ``AUX`` (in list order) that occurs in the text.
    The text is returned unchanged when no auxiliary is found.
    """
    has_not = re.search(r"\bnot\b", text, flags=re.IGNORECASE) is not None
    if has_not and random.random() < 0.5:
        without_not = re.sub(r"\bnot\b", "", text, count=1, flags=re.IGNORECASE)
        return without_not.replace("  ", " ")

    # Insertion: AUX list order decides which auxiliary wins, not text order.
    for aux in AUX:
        hit = re.search(rf"\b{aux}\b", text, flags=re.IGNORECASE)
        if hit is None:
            continue
        cut = hit.end()
        return f"{text[:cut]} not{text[cut:]}"
    return text

def drop_content_words(text: str, drop_ratio=(0.15, 0.35)) -> str:
    """Randomly delete some longer, non-stop-word tokens from ``text``.

    A drop probability is drawn uniformly from ``drop_ratio``. Each
    whitespace-separated token whose letters-only core is longer than three
    characters and not in ``STOP_LITE`` is removed with that probability.
    Remaining tokens are re-joined with single spaces. If nothing would be
    left, the original text is returned.
    """
    words = text.split()
    if not words:
        return text
    threshold = random.uniform(*drop_ratio)

    def _droppable(token):
        # Strip punctuation, then require a purely alphabetic core.
        bare = re.sub(r"[^\w]", "", token, flags=re.UNICODE)
        if not bare or _WORD_RE.fullmatch(bare) is None:
            return False
        return len(bare) > 3 and bare.lower() not in STOP_LITE

    survivors = [
        token for token in words
        if not (_droppable(token) and random.random() < threshold)
    ]
    result = " ".join(survivors)
    if result.strip():
        return result
    return text  # Everything was dropped: fall back to the original.

def shuffle_chunks(text: str) -> str:
    """Scramble the order of a sentence.

    The text is split at commas, semicolons and the conjunctions " and " /
    " but ". Each chunk keeps its trailing delimiter, and the chunks are
    shuffled and joined with spaces. When the text contains none of these
    delimiters, the individual words are shuffled instead.
    """
    pieces = re.split(r"(,|;| and | but )", text)
    if len(pieces) < 3:
        words = text.split()
        random.shuffle(words)
        return " ".join(words)

    # Glue each delimiter onto the text that precedes it.
    delimiters = (",", ";", " and ", " but ")
    segments = []
    buffer = []
    for piece in pieces:
        buffer.append(piece)
        if piece in delimiters:
            segments.append("".join(buffer).strip())
            buffer = []
    tail = "".join(buffer).strip()
    if tail:
        segments.append(tail)

    random.shuffle(segments)
    return " ".join(segments)

def degrade_mid(text: str) -> str:
    """Produce a partially damaged copy of ``text`` (used for score-50 rows).

    Three to five of the rule-based damage operations are picked at random
    and applied in sequence. If the result is still identical to the input
    (ignoring outer whitespace), one more, stronger round of word dropping
    is attempted. Whitespace runs in the result are collapsed to one space.
    """
    damage_ops = [
        lambda s: replace_one_of_set(s, MONTHS),
        lambda s: replace_one_of_set(s, DAYS),
        mutate_numbers,
        flip_ampm,
        insert_or_remove_not,
        drop_content_words,
        shuffle_chunks,
    ]
    random.shuffle(damage_ops)
    how_many = random.randint(3, 5)

    damaged = text
    for op in damage_ops[:how_many]:
        damaged = op(damaged)

    # None of the chosen operations applied: force a heavier word drop.
    unchanged = damaged.strip() == text.strip()
    if unchanged:
        damaged = drop_content_words(damaged, drop_ratio=(0.30, 0.45))

    collapsed = re.sub(r"\s+", " ", damaged)
    return collapsed.strip()


In [8]:
# ---------- Dataset building ----------
def make_dataset(sources, targets,
                 include_perfect = True,
                 seed = 42):
    """Build synthetic quality-estimation rows from aligned sentence pairs.

    For every index ``i`` up to the shorter of the two inputs, rows of the
    form ``[source, target, score]`` are produced:

    * score 100: the aligned target (only when ``include_perfect`` is true),
    * score 50: the aligned target after ``degrade_mid`` damage,
    * score 0: the target of a different line.

    Args:
        sources: Source sentences.
        targets: Target sentences; ``targets[i]`` pairs with ``sources[i]``.
        include_perfect: Whether to emit the score-100 rows.
        seed: Seed passed to ``random.seed`` (seeds the global generator).

    Returns:
        A list of ``[source, target, score]`` lists. The score-0 row of a
        line is omitted when no target with different text exists (for
        example with a single pair), so that a correct translation is never
        labelled 0.
    """
    random.seed(seed)
    n = min(len(sources), len(targets))
    rows = []

    # Index permutation used to pick a mismatched target (score zero).
    perm = list(range(n))
    random.shuffle(perm)
    # Remove fixed points by swapping with the next index (wrapping around).
    for i in range(n):
        if perm[i] == i:
            j = (i + 1) % n
            perm[i], perm[j] = perm[j], perm[i]

    for i in range(n):
        src = sources[i].strip()
        good = targets[i].strip()

        if include_perfect:
            rows.append([src, good, 100])

        # Score 50: partial damage to meaning.
        mid = degrade_mid(good)
        if len(mid.split()) <= max(2, len(good.split()) // 4):
            # The result got too short: try the damage once more.
            mid = degrade_mid(good)
        rows.append([src, mid, 50])

        # Score 0: another line's sentence. A different index is not enough:
        # duplicate targets (or n == 1) could still yield the correct
        # translation, so probe forward from perm[i] for different text.
        wrong = None
        for step in range(n):
            candidate = targets[(perm[i] + step) % n].strip()
            if candidate != good:
                wrong = candidate
                break
        if wrong is not None:
            rows.append([src, wrong, 0])

    return rows

# Build all rows from the loaded corpus; score-100 rows are included unless
# --no-perfect was given.
rows = make_dataset(srcs, tgts, include_perfect=not args.no_perfect, seed=args.seed)

In [9]:
def write_csv(rows, out_path: str):
    """Write ``rows`` to ``out_path`` as a UTF-8 CSV file.

    The file starts with the header ``SOURCE,TARGET,SCORE`` followed by one
    record per row. An existing file is overwritten.
    """
    header = ["SOURCE", "TARGET", "SCORE"]
    # newline="" lets the csv module control line endings itself.
    with open(out_path, "w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(rows)

# Save the generated rows to the CSV path given by --out.
write_csv(rows, args.out)