In [None]:
import random
from pathlib import Path
import os

In [None]:
# Mount Google Drive at /content/drive; later cells read and write files under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def generate_train_test_data(input_file,out_file, num_samples = 100):
  """Write answer-free prompts for the first `num_samples` lines of a file.

  Each input line is expected to look like ``1+2+3=6``. Everything from the
  first ``=`` onward is dropped and the line is written back as ``1+2+3=``.

  Args:
    input_file: Path of the text file holding one problem per line.
    out_file: Path of the file that is (over)written with the prompts.
    num_samples: Only the first `num_samples` lines of the input are read
      into the output.

  Blank lines (and lines with nothing before the ``=``) are skipped. A line
  that contains no ``=`` still yields a well-formed prompt: its line ending
  is removed before ``=`` is appended.
  """
  with open(input_file, 'r') as f:
    selected = f.readlines()[:num_samples]

  prompts = []
  for raw in selected:
    # Drop the line ending first so a line without '=' cannot carry its
    # newline in front of the appended '='.
    expression = raw.rstrip('\r\n').split('=')[0]
    if not expression.strip():
      continue
    prompts.append(expression + '=\n')

  with open(out_file, 'w') as f:
    f.writelines(prompts)


In [None]:
# Write answer-free prompts for the first 100 training lines (the default num_samples) to a separate file.
# NOTE(review): the input file is the train_path written by the build_train_and_test cell further down — confirm execution order.
generate_train_test_data('/content/drive/MyDrive/addition/addition_5_operands/5_addition_train_10000_2.txt', '/content/drive/MyDrive/addition/addition_5_operands/5_addition_train_test_100_2.txt')

In [None]:

def generate_3_digit_samples(num_samples=5000):
    """
    Draw `num_samples` random 3-digit integers, stratified over three ranges.

    An equal share (num_samples // 3) is drawn uniformly from each of
    100-399, 400-699 and 700-999. Whatever the integer division leaves over
    is topped up with draws from the full 100-999 range, and the combined
    list is shuffled before it is returned.
    """
    strata = (
        (100, 399),  # lower range
        (400, 699),  # middle range
        (700, 999),  # upper range
    )
    per_stratum = num_samples // 3

    # Stratum by stratum, `per_stratum` draws each.
    samples = [
        random.randint(low, high)
        for low, high in strata
        for _ in range(per_stratum)
    ]

    # Top up so the result holds exactly num_samples values.
    shortfall = num_samples - len(samples)
    samples.extend(random.randint(100, 999) for _ in range(shortfall))

    random.shuffle(samples)
    return samples

def generate_3_digit_addition_multiple(
        num_samples: int = 5_000,
        output_file_path: str = '/content/drive/MyDrive/addition/add_examples.txt',
        pad: bool = True,
        reverse: bool = False,
        num_addition: int = 4,
        num_addition_sample: int = 4,
        test_case: bool = False,
    ):
    """
    Build lines like 000+123+543+985=1651 and write them to a generated file.

    Args:
        num_samples: number of lines to generate.
        output_file_path: DIRECTORY that receives the file. The file name is
            derived from the other arguments
            (<train|test>_reverse_..._numOfSamples_<n>.txt) and joined onto
            this path.
        pad: if True, every addend is zero-padded to 3 digits (the sum is
            never padded).
        reverse: if True, the digits of the SUM are written reversed; the
            addends are written unchanged.
        num_addition: how many addends each line contains.
        num_addition_sample: how many of those addends are random 3-digit
            samples; the remaining leading addends are 0.
        test_case: if True, lines stop at '=' (no answer) and the file name
            starts with 'test' instead of 'train'.

    Returns:
        The list of generated lines, each newline-terminated.

    Raises:
        ValueError: if num_addition_sample > num_addition.
    """
    # Validate first so an invalid call does not leave an empty output
    # directory behind.
    if num_addition_sample > num_addition:
        raise ValueError("num_addition_sample must be ≤ num_addition")

    # One truthiness test decides both the file-name prefix and whether the
    # answers are written, so the two can never disagree.
    split = "test" if test_case else "train"
    file_name = f'{split}_reverse_{reverse}_additionOperations_{num_addition}_numOfChars_{num_addition_sample}_numOfSamples_{num_samples}.txt'
    output_file_path = os.path.join(output_file_path, file_name)
    Path(output_file_path).parent.mkdir(parents=True, exist_ok=True)

    # 1.  Collect independent sample streams, one per random addend.
    samples_list = [generate_3_digit_samples(num_samples)
                    for _ in range(num_addition_sample)]

    zeros_to_prepend = num_addition - num_addition_sample
    lines: list[str] = []

    for i in range(num_samples):
        addends_int: list[int] = [0] * zeros_to_prepend \
                               + [stream[i] for stream in samples_list]
        line_sum = sum(addends_int)

        # 2.  Convert to strings (with optional 0-padding).
        if pad:
            addends_str = [f"{a:03}" for a in addends_int]
        else:
            addends_str = [str(a) for a in addends_int]

        prompt = "+".join(addends_str) + "="
        if test_case:
            lines.append(prompt + "\n")
        else:
            answer = str(line_sum)[::-1] if reverse else str(line_sum)
            lines.append(f"{prompt}{answer}\n")

    with open(output_file_path, "w") as f:
        f.writelines(lines)

    return lines

In [None]:
# Build a 500,000-line training file (4 padded addends per line, sums written reversed) under the Drive data folder.
lines: list[str] = generate_3_digit_addition_multiple(num_samples=500000,output_file_path='/content/drive/MyDrive/addition/data/',pad=True,reverse=True,num_addition=4,num_addition_sample=4,test_case=False)

In [None]:
# Preview only the first few generated lines; echoing the whole list would dump every line into the cell output.
lines[:5]

In [None]:
import random
import itertools
from pathlib import Path

def generate_all_problems(num_operands):
    """
    Enumerate every ordered addition problem over the digits 1-9 that uses
    (num_operands + 1) addends and has a two-digit sum (10..99).

    Returns a list of strings such as "1+9=10" (no trailing newline), in the
    order produced by itertools.product.
    """
    problems = []
    for digits in itertools.product(range(1, 10), repeat=num_operands + 1):
        total = sum(digits)
        if total < 10 or total > 99:
            continue
        left_side = "+".join(str(d) for d in digits)
        problems.append(f"{left_side}={total}")
    return problems

def split_dataset(num_operands, train_frac=0.8, seed=42):
    """
    Generate every valid problem, shuffle it, and write fixed-size splits.

    The shuffled problem list is cut into three consecutive slices: the
    first 2000 rows become train, the next 401 val and the following 400
    test. Any problems beyond that are not written anywhere. If fewer
    problems exist, the later slices are simply shorter (possibly empty).

    Args:
        num_operands (int): Number of operands minus one (so total numbers = num_operands + 1).
        train_frac (float): NOT used — the split sizes are fixed. The
            parameter is kept so existing calls keep working.
        seed (int): Seed for the shuffle, for reproducibility.

    Writes:
        train.txt, test.txt and val.txt in the current directory.
    """
    # Fixed split sizes (rows).
    train_size, val_size, test_size = 2000, 401, 400

    problems = generate_all_problems(num_operands)
    random.Random(seed).shuffle(problems)

    val_start = train_size
    test_start = val_start + val_size
    train = problems[:val_start]
    val = problems[val_start:test_start]
    test = problems[test_start:test_start + test_size]

    # Ensure no overlap between any pair of splits.
    assert set(train).isdisjoint(test), "Train/Test overlap detected!"
    assert set(train).isdisjoint(val), "Train/Val overlap detected!"
    assert set(val).isdisjoint(test), "Val/Test overlap detected!"

    # Write to files
    Path("train.txt").write_text("\n".join(train) + "\n")
    Path("test.txt").write_text("\n".join(test) + "\n")
    Path("val.txt").write_text("\n".join(val) + "\n")

    print(f"Train size: {len(train)} (written to train.txt)")
    print(f"Test size: {len(test)} (written to test.txt)")
    print(f"Val size: {len(val)} (written to val.txt)")

# Example usage for 3 operands (4 numbers).
# NOTE(review): train_frac is passed here, but split_dataset cuts the data at fixed row counts — confirm whether 0.8 is meant to apply.
split_dataset(num_operands=3, train_frac=0.8, seed=42)


Train size: 2000 (written to train.txt)
Test size: 400 (written to test.txt)


In [None]:
def generate_addition_problem(num_operands):
    """Return one random, newline-terminated addition line such as 3+9+4=16.

    num_operands + 1 addends are drawn uniformly from 1-9, and the draw is
    repeated until their sum is a two-digit number (10..99).

    Raises:
        ValueError: if that many 1-9 addends can never sum to 10..99 (fewer
            than 2 or more than 99 addends). Without this check the retry
            loop would never terminate.
    """
    addend_count = num_operands + 1
    if not 2 <= addend_count <= 99:
        raise ValueError(
            f"{addend_count} addends in 1..9 can never sum to a two-digit number"
        )

    while True:
        numbers = [random.randint(1, 9) for _ in range(addend_count)]
        result = sum(numbers)
        if 10 <= result <= 99:  # keep only two-digit results
            problem = "+".join(map(str, numbers))
            return f"{problem}={result}\n"

In [None]:
def generate_data(num_operands, num_samples, output_path):
    """Write `num_samples` random addition lines to `output_path`.

    Parent directories are created if missing and an existing file is
    overwritten. Every line comes from generate_addition_problem(num_operands).
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open('w') as sink:
        sink.writelines(
            generate_addition_problem(num_operands) for _ in range(num_samples)
        )

In [None]:
def reverse_results(input_path, output_path):
    """Copy a problem file, writing each answer with its digits reversed.

    Every non-blank input line must have the form <problem>=<result>; it is
    written as <problem>=<result reversed>. Blank lines are skipped.

    The input is read completely before the output is opened, so
    `input_path` and `output_path` may be the same file.

    Raises:
        ValueError: if a non-blank line does not contain exactly one '='.
    """
    input_path = Path(input_path)
    output_path = Path(output_path)

    # Read first: opening the output in 'w' mode truncates it, which would
    # wipe the input when both paths point at the same file.
    with open(input_path, 'r') as f_in:
        source_lines = f_in.readlines()

    converted = []
    for line_no, line in enumerate(source_lines, start=1):
        stripped = line.strip()
        if not stripped:
            continue
        problem, separator, result = stripped.partition('=')
        if not separator or '=' in result:
            raise ValueError(
                f"{input_path}:{line_no}: expected exactly one '=' in {stripped!r}"
            )
        converted.append(f"{problem}={result[::-1]}\n")

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w') as f_out:
        f_out.writelines(converted)

In [None]:
# Write a copy of /content/val.txt to the Drive folder with the digits of each answer reversed.
reverse_results('/content/val.txt', '/content/drive/MyDrive/addition/addition_4_operands_1_digit/reverse_val.txt')

In [None]:
# Write 200 random problems; num_operands=6 means 6 + 1 = 7 addends per line.
generate_data(6, 200, '/content/drive/MyDrive/addition/addition_7_operands/7_addition_val_200.txt')

In [None]:
# Strip the answers from the first 400 lines of /content/test.txt and save the prompts to Drive.
# NOTE(review): the output is named reverse_test.txt although this call reverses nothing — confirm the name is intended.
generate_train_test_data('/content/test.txt', '/content/drive/MyDrive/addition/addition_4_operands_1_digit/reverse_test.txt', num_samples=400)

In [None]:
from pathlib import Path

def add_padding_at_the_end(input_file, output_file):
    """Copy a text file, ending every line with a '$' marker.

    Each line is stripped of surrounding whitespace and written back as
    <line>$ plus a newline. The input is read completely before the output
    is opened, so both arguments may name the same file. Running the
    function on its own output appends a further '$' to each line.
    """
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)

    with open(input_file, 'r') as source:
        stripped_rows = [raw.strip() for raw in source]

    with open(output_file, 'w') as sink:
        sink.writelines(f"{row}$\n" for row in stripped_rows)

In [None]:
# Append the '$' end marker to every line of reverse_val.txt, overwriting the file in place.
# Re-running this cell adds another '$' to each line.
add_padding_at_the_end('/content/drive/MyDrive/addition/addition_4_operands_1_digit/reverse_val.txt','/content/drive/MyDrive/addition/addition_4_operands_1_digit/reverse_val.txt')

In [None]:
def generate_addition_perturbations_test(pos, num_operands):
    """Build answer-free prompts that differ only in the addend at index `pos`.

    One random list of num_operands + 1 digits (1-9) is drawn. The entry at
    `pos` is then set to 1, 2, ..., 9 in turn, and a prompt of the form
    a+b+c= (newline-terminated) is produced for every variant whose sum is
    a two-digit number.

    Returns:
        The list of prompts; it holds fewer than nine entries when some
        variants sum to less than 10, and may be empty.
    """
    numbers = [random.randint(1, 9) for _ in range(num_operands + 1)]
    prompts = []
    for digit in range(1, 10):
        numbers[pos] = digit
        if not 10 <= sum(numbers) <= 99:
            continue
        prompts.append("+".join(str(n) for n in numbers) + "=\n")
    return prompts

In [None]:
def generate_perturbations_test_data(pos, num_operands, num_samples, output_path):
    """Write perturbation prompts to `output_path` until at least `num_samples` exist.

    Prompts are produced in batches by
    generate_addition_perturbations_test(pos, num_operands). Every batch is
    written in full, so the file can end up with more than `num_samples`
    lines. At least one batch is always generated. Parent directories are
    created if missing and an existing file is overwritten.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open('w') as sink:
        written = 0
        keep_going = True
        while keep_going:
            batch = generate_addition_perturbations_test(pos, num_operands)
            sink.writelines(batch)
            written += len(batch)
            keep_going = written < num_samples

In [None]:
# Write at least 100 prompts that vary the addend at index 4 (the last of 5 addends) while the others stay fixed per batch.
generate_perturbations_test_data(4, 4, 100, '/content/drive/MyDrive/addition/addition_5_operands/pos_4_perturbations_test.txt')

In [None]:
def generate_test_data(input_file,out_file, num_samples = 100):
  """Strip the answer from every line of `input_file` and write the prompts.

  For each line, the text before the first '=' is kept and '=' plus a
  newline is appended. `out_file` is overwritten.

  Note: `num_samples` is accepted but not used; all lines are processed.
  """
  with open(input_file, 'r') as source:
    prompts = [row.split('=')[0] + '=\n' for row in source.readlines()]
  with open(out_file, 'w+') as sink:
    sink.writelines(prompts)

In [None]:
def generate_addition_rotated_test(num_operands):
    """Return nine copies of one random answer-free prompt, or an empty list.

    A single list of num_operands + 1 digits (1-9) is drawn. If its sum is
    a two-digit number, the prompt a+b+c= (newline-terminated) is returned
    nine times; otherwise the result is empty.

    NOTE(review): despite the name, the operands are never rotated — every
    returned line is identical. Confirm the intended behaviour.
    """
    numbers = [random.randint(1, 9) for _ in range(num_operands + 1)]
    if not 10 <= sum(numbers) <= 99:
        return []
    prompt = "+".join(str(n) for n in numbers) + "=\n"
    return [prompt for _ in range(1, 10)]

In [None]:
import random
import itertools
def select_digits_with_two_digit_sum(num_operands):
    """Keep sampling num_operands + 1 digits (1-9) until their sum is in [10, 99].

    Returns:
        The list of sampled digits.

    Raises:
        ValueError: if that many digits can never reach a two-digit sum
            (fewer than 2 or more than 99 digits). Without this check the
            sampling loop would never terminate.
    """
    digit_count = num_operands + 1
    if not 2 <= digit_count <= 99:
        raise ValueError(
            f"{digit_count} digits in 1..9 can never sum to a two-digit number"
        )

    while True:
        digits = [random.randint(1, 9) for _ in range(digit_count)]
        if 10 <= sum(digits) <= 99:
            return digits

def write_perms_from_random_multiset(num_operands, output_path):
    """
    1. Pick a random list of `num_operands + 1` digits (1–9) whose sum has two digits.
    2. Generate all *unique* orderings of that list.
    3. Write each as "a+b+c+...=" to `output_path`, one per line.

    Parent directories are created if missing; an existing file is overwritten.
    The lines are written in the iteration order of the set of orderings.
    """
    chosen = select_digits_with_two_digit_sum(num_operands)
    orderings = set(itertools.permutations(chosen))

    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open('w') as sink:
        sink.writelines(
            "+".join(str(d) for d in ordering) + "=\n" for ordering in orderings
        )

In [None]:
# Write every distinct ordering of one random 5-digit list (num_operands=4 → 5 digits) as answer-free prompts.
write_perms_from_random_multiset(4, '/content/drive/MyDrive/addition/addition_5_operands/permutations.txt')

In [None]:
from pathlib import Path
import random

# ------------------------------------------------------------
# helpers
# ------------------------------------------------------------
def _format(nums):
    """Return 'a+b+c=SUM' string (ordered, permutation-sensitive)."""
    return f"{'+'.join(map(str, nums))}={sum(nums)}"

def _sample_valid_tuple(n_operands_plus_one):
    """Draw (n_operands+1) numbers in [1,9] whose sum is two-digit."""
    while True:
        nums = [random.randint(1, 9) for _ in range(n_operands_plus_one)]
        if 10 <= sum(nums) <= 99:
            return nums

# ------------------------------------------------------------
# main API
# ------------------------------------------------------------
def build_train_and_test(
    num_operands: int,
    n_train: int,
    train_path: str,
    test_path: str,
    n_test: int = 100,
    seed: int | None = 42,
):
    """
    Train/test generator where *ordering matters* (3+4 ≠ 4+3).
    - num_operands  : number of '+' signs (i.e. operands  = num_operands+1)
    - n_train       : rows in train split
    - n_test        : rows in test split (default 100)
    """
    if seed is not None:
        random.seed(seed)

    train_seen = set()   # exact tuples already in train
    test_seen  = set()   # exact tuples already in test

    # --- build training set --------------------------------------------------
    Path(train_path).parent.mkdir(parents=True, exist_ok=True)
    with Path(train_path).open("w") as f_train:
        while len(train_seen) < n_train:
            nums = _sample_valid_tuple(num_operands + 1)
            key  = tuple(nums)           # ORDER-SENSITIVE key
            if key in train_seen:
                continue
            train_seen.add(key)
            f_train.write(_format(nums) + "\n")

    # --- build test set (no overlap with train) ------------------------------
    Path(test_path).parent.mkdir(parents=True, exist_ok=True)
    with Path(test_path).open("w") as f_test:
        while len(test_seen) < n_test:
            nums = _sample_valid_tuple(num_operands + 1)
            key  = tuple(nums)
            if key in train_seen or key in test_seen:
                continue
            test_seen.add(key)
            f_test.write(_format(nums) + "\n")

In [None]:
build_train_and_test(
        num_operands=4,         # → 5 addends per line
        n_train=10000,           # 10 000-row training file
        train_path="/content/drive/MyDrive/addition/addition_5_operands/5_addition_train_10000_2.txt",
        test_path="/content/drive/MyDrive/addition/addition_5_operands/5_addition_test_100_2.txt",  # 100 brand-new ordered problems
        n_test=100,
    )


In [None]:
import random
import numpy as np
from pathlib import Path
def random_addition_quad():
    """Return (nums, total): four random 3-digit ints and their sum.

    Quadruples are redrawn until the sum is at least 1000, i.e. has four
    digits.
    """
    nums = [random.randint(100, 999) for _ in range(4)]
    total = sum(nums)
    while total < 1000:
        nums = [random.randint(100, 999) for _ in range(4)]
        total = sum(nums)
    return nums, total

def line_from(nums, total):
    """Format one dataset line: 350+540+750+100=1740 followed by a newline."""
    addends = "+".join(str(n) for n in nums)
    return f"{addends}={total}\n"

In [None]:
def build_fast(
    n_train=800_000, n_test=200_000,
    bucket=100, train_path="train.txt", test_path="test.txt",
    seed=42, batch_size=200_000
):
    """Rejection-sample 4-addend, 3-digit addition lines, balanced over sum buckets.

    Sums from 1000 up to 3996 (= 4 * 999) are divided into buckets of width
    `bucket`; the last bucket is narrower when `bucket` does not divide the
    range. `n_train` and `n_test` are spread as evenly as possible over the
    buckets. Random quadruples are drawn in batches of `batch_size`; each
    one goes to train while its bucket's train quota is open, else to test
    while that quota is open, else it is dropped.

    Args:
        n_train, n_test: number of lines for each split.
        bucket: width of a sum bucket.
        train_path, test_path: output files (overwritten).
        seed: seed for numpy's default_rng.
        batch_size: quadruples drawn per sampling round.

    Notes:
        * No de-duplication is done; a line can occur more than once and in
          both files.
        * Sampling continues until every quota is full, so buckets whose
          sums are rarely drawn determine the running time.
    """
    max_total = 4 * 999  # largest possible sum of four 3-digit addends

    rng = np.random.default_rng(seed)
    # Left edges every `bucket` starting at 1000, closed by an explicit upper
    # bound just past the largest possible sum. Deriving the last edge from
    # the step alone left the top sums outside every bucket (IndexError on the
    # quota arrays) whenever `bucket` did not divide the range, and produced
    # zero buckets for very large `bucket` values.
    edges     = np.append(np.arange(1000, max_total + 1, bucket), max_total + 1)
    n_buckets = len(edges) - 1

    quota_train = np.full(n_buckets, n_train // n_buckets, dtype=int)
    quota_test  = np.full(n_buckets, n_test  // n_buckets, dtype=int)
    # Hand the remainder out one row at a time to the lowest buckets.
    quota_train[: n_train % n_buckets] += 1
    quota_test [: n_test  % n_buckets] += 1

    train_lines, test_lines = [], []

    while quota_train.sum() or quota_test.sum():
        nums  = rng.integers(100, 1000, size=(batch_size, 4), dtype=np.int32)
        sums  = nums.sum(axis=1)
        keep  = sums >= 1000               # only four-digit totals
        nums, sums = nums[keep], sums[keep]

        idxs = np.digitize(sums, edges) - 1           # bucket indices

        for i in range(len(nums)):
            b = idxs[i]
            line = f"{nums[i,0]}+{nums[i,1]}+{nums[i,2]}+{nums[i,3]}={sums[i]}\n"
            if quota_train[b]:
                train_lines.append(line)
                quota_train[b] -= 1
            elif quota_test[b]:
                test_lines.append(line)
                quota_test[b] -= 1
            # else: bucket full → ignore

    Path(train_path).write_text("".join(train_lines))
    Path(test_path ).write_text("".join(test_lines))

    print(f"✅  Wrote {len(train_lines):,} → {train_path}")
    print(f"✅  Wrote {len(test_lines):,} → {test_path}")

In [None]:
import numpy as np
from pathlib import Path
# Range of totals and bucket width used by build_dataset below.
MIN_TOTAL = 1000             # lower bound of the first bucket
MAX_TOTAL = 999 * 4          # 3996
BUCKET_SIZE = 100            # 1000-1099, 1100-1199, …

# ────────────────────────────────────────────────────────────────
def _sample_bucket(rng: np.random.Generator, lo: int, hi: int, n: int) -> list[str]:
    """Vectorised sampling of *n* rows whose total ∈ [lo, hi]."""
    rows = []
    # oversample a bit so we rarely loop
    need = n
    while need:
        k = need * 2                       # try twice the remaining quota
        adds = rng.integers(100, 1000, size=(k, 3))
        targets = rng.integers(lo, hi + 1, size=k)
        d = targets - adds.sum(1)
        ok = (d >= 100) & (d <= 999)
        adds_ok = adds[ok][:need]
        d_ok = d[ok][:need]
        tgt_ok = targets[ok][:need]

        rows.extend(
            f"{a}+{b}+{c}+{d}={t}\n"
            for (a, b, c), d, t in zip(adds_ok, d_ok, tgt_ok, strict=False)
        )
        need = n - len(rows)
    return rows

# ────────────────────────────────────────────────────────────────
def build_dataset(n_samples: int, out_path: Path, seed: int = 42,
                  bucket: int = BUCKET_SIZE) -> None:
    """Write `n_samples` addition lines to `out_path`, balanced over total buckets.

    Buckets start at MIN_TOTAL and advance in steps of `bucket`; the upper
    bound of each is capped at MAX_TOTAL. `n_samples` is divided evenly
    across the buckets, with the remainder going one line each to the first
    buckets. Lines come from _sample_bucket and are written bucket by
    bucket in ascending order; this function neither shuffles nor
    de-duplicates them. `out_path` must be a Path and is overwritten.
    """
    rng = np.random.default_rng(seed)

    lower_bounds = list(range(MIN_TOTAL, MAX_TOTAL + 1, bucket))
    base_quota, leftover = divmod(n_samples, len(lower_bounds))

    with out_path.open("w") as sink:
        for index, low in enumerate(lower_bounds):
            high = min(low + bucket - 1, MAX_TOTAL)
            wanted = base_quota + 1 if index < leftover else base_quota
            sink.writelines(_sample_bucket(rng, low, high, wanted))

In [None]:
# Write a 10,000-line, bucket-balanced file to val.txt (seed 42, bucket width 100).
# NOTE(review): split_dataset also writes val.txt in the working directory; this call overwrites it — confirm that is intended.
build_dataset(10_000,  Path("val.txt"), 42,  100)

In [None]:
# Build test.txt: 10,000 sampled lines, none of which occurs in train.txt and none repeated.
# NOTE(review): train.txt is read from the working directory, so its content depends on which
# cell wrote it last (split_dataset and build_fast both default to that name) — confirm it is
# the intended training set before running this cell.

# 1) load the train set (lines keep their trailing newline so they compare equal to sampled rows)
train_seen = set(
    Path("train.txt")
    .read_text()
    .splitlines(keepends=True)
)

# 2) make an RNG for test (seed differs from the 42 used for val.txt)
rng = np.random.default_rng(43)

# 3) sample until you have 10 000 *new* rows
rows = []
# The whole range of totals is sampled as one bucket, so totals are not balanced per 100-wide bucket here.
lo, hi = 1000, 999 * 4
while len(rows) < 10_000:
    # _sample_bucket signature: (rng, lo, hi, n)
    batch = _sample_bucket(rng, lo, hi, 500)
    for r in batch:
        if r not in train_seen:
            rows.append(r)
            train_seen.add(r)          # also avoid dupes within test
        if len(rows) >= 10_000:
            break

# 4) write out (overwrites any existing test.txt in the working directory)
with open("test.txt", "w") as f:
    f.writelines(rows)

In [None]:
def shuffle_file(input_path: str, output_path: str, seed: int = 42) -> None:
    """
    Reads all lines from input_path, shuffles them (reproducibly by seed),
    and writes the result to output_path.

    The input is read completely before the output is written, so both
    paths may point at the same file. Parent directories of output_path are
    created if missing. If the last input line has no trailing newline, one
    is added so that it stays a separate line after shuffling.
    """
    input_path = Path(input_path)
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # 1) Read all lines, keeping their line endings
    lines = input_path.read_text().splitlines(keepends=True)

    # 2) Terminate an unterminated final line; otherwise it would be glued
    #    onto whichever line ends up after it.
    if lines and not lines[-1].endswith(("\n", "\r")):
        lines[-1] += "\n"

    # 3) Shuffle in-place with a private, seeded generator
    random.Random(seed).shuffle(lines)

    # 4) Write shuffled lines
    output_path.write_text("".join(lines))

In [None]:
# Shuffle 10000_test.txt in place (input and output are the same file; default seed 42).
shuffle_file('/content/drive/MyDrive/addition/addition_4_operands_3_digit/test/10000_test.txt', '/content/drive/MyDrive/addition/addition_4_operands_3_digit/test/10000_test.txt')

In [None]:
# Copy the working-directory val.txt into the Drive folder of the 4-operand, 3-digit dataset.
!cp /content/val.txt /content/drive/MyDrive/addition/addition_4_operands_3_digit/val.txt