# Assignment1_Data Collection and Preprocessing_Yuqi Lai

## 1. Data Collection and Mixing

This section demonstrates the collection of large-scale text data from diverse domains: **FineWeb** (web text), **Wikipedia** (via the WikiText-103 dataset), and **CC News** (news articles).

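The script uses **streaming** to sample each source without downloading it in full. It stops reading a source once that source reaches its equal share of the 1.2 GB target, writes every sample to a single JSONL file, and then converts that file into one mixed dataset.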
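The byte-budget idea can be illustrated in isolation. The following is a minimal sketch using only the standard library; `take_until_bytes`, the in-memory `sources`, and the 200-byte budget are hypothetical stand-ins for the remote streams and the real per-source budget, not part of the pipeline itself.

```python
from typing import Dict, Iterable, Iterator


def take_until_bytes(texts: Iterable[str], budget_bytes: int) -> Iterator[str]:
    """Yield non-empty texts until their total UTF-8 size reaches budget_bytes."""
    used = 0
    for text in texts:
        if not text.strip():
            continue  # skip empty records
        yield text
        used += len(text.encode("utf-8"))
        if used >= budget_bytes:
            break  # stop pulling from the stream once the budget is met


# Hypothetical in-memory "streams" standing in for remote datasets.
sources: Dict[str, Iterable[str]] = {
    "A": iter(["alpha " * 10] * 100),   # 60 bytes per record
    "B": iter(["", "beta " * 4] * 100),  # empty records mixed with 20-byte records
}
budget_per_source = 200  # bytes, illustrative only

mixed = [
    {"text": text, "source": name}
    for name, stream in sources.items()
    for text in take_until_bytes(stream, budget_per_source)
]
print(len(mixed), "records collected")
```

Because the budget is checked after a record is emitted, the last record of each source may push the total slightly past the budget, and the rest of the stream is never read.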

In [3]:
import os
import json
from datasets import load_dataset, load_from_disk

# Target total size: 1.2 GiB, split evenly across the sources below
TARGET_SIZE_BYTES = 1.2 * 1024 * 1024 * 1024

dataset_configs = [
    {"name": "FineWeb", "path": "HuggingFaceFW/fineweb", "split": "train", "subset": "sample-10BT"},
    {"name": "Wikipedia", "path": "wikitext", "split": "train", "subset": "wikitext-103-raw-v1"},
    {"name": "CC_News", "path": "vblagoje/cc_news", "split": "train", "subset": None},
]

output_file = "raw_mixed_data.jsonl"
target_per_dataset = TARGET_SIZE_BYTES / len(dataset_configs)

print(f"Target Total Size: {TARGET_SIZE_BYTES / (1024**3):.2f} GB")

# Statistics dictionary for the report
stats = {}

with open(output_file, "w", encoding="utf-8") as f:
    for config in dataset_configs:
        print(f"Processing {config['name']}...")
        try:
            # Load using streaming mode
            if config['subset']:
                ds = load_dataset(config['path'], config['subset'], split=config['split'], streaming=True)
            else:
                ds = load_dataset(config['path'], split=config['split'], streaming=True)

            current_ds_bytes = 0
            count = 0

            for sample in ds:
                # Add field fallback to avoid empty CC_News entries
                text = (
                    sample.get('text') or
                    sample.get('content') or
                    sample.get('maintext') or
                    sample.get('description') or ""
                )

                # Skip empty lines
                if not text.strip():
                    continue

                record = {"text": text, "source": config['name']}
                f.write(json.dumps(record, ensure_ascii=False) + "\n")

                text_bytes = len(text.encode('utf-8'))
                current_ds_bytes += text_bytes
                count += 1

                if count % 2000 == 0:
                    print(f"Collected {current_ds_bytes / (1024**2):.2f} MB", end='\r')

                # Stop when this dataset reaches its allocated size
                if current_ds_bytes >= target_per_dataset:
                    print(f"\nFinished {config['name']}: {current_ds_bytes / (1024**2):.2f} MB ({count} docs)")
                    break

            # Record data
            stats[config['name']] = {"size_mb": current_ds_bytes / (1024**2), "docs": count}

        except Exception as e:
            print(f"Error reading {config['name']}: {e}")

print("Converting to Dataset format...")
if os.path.exists(output_file):
    raw_dataset = load_dataset("json", data_files=output_file, split="train")
    # cleaning will read directly from this folder
    raw_dataset.save_to_disk("raw_mixed_dataset")
    print("Done. Saved to 'raw_mixed_dataset'.")

    # Print summary stats
    print("\n--- Statistics for Report ---")
    print(f"{'Source':<15} | {'Size (MB)':<10} | {'Documents':<10}")
    print("-" * 40)
    for name, data in stats.items():
        print(f"{name:<15} | {data['size_mb']:<10.2f} | {data['docs']:<10}")
else:
    print("Error: Output file not found.")

Target Total Size: 1.20 GB
Processing FineWeb...



Collected 407.86 MB
Finished FineWeb: 409.60 MB (138582 docs)
Processing Wikipedia...



Collected 408.93 MB
Finished Wikipedia: 409.60 MB (927496 docs)
Processing CC_News...



Collected 408.68 MB
Finished CC_News: 409.60 MB (186227 docs)
Converting to Dataset format...



Done. Saved to 'raw_mixed_dataset'.

--- Statistics for Report ---
Source          | Size (MB)  | Documents 
----------------------------------------
FineWeb         | 409.60     | 138582    
Wikipedia       | 409.60     | 927496    
CC_News         | 409.60     | 186227    
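The figures in the table can be checked with a few lines of arithmetic. This sketch only reuses the numbers reported above; the per-document averages are approximations because the reported sizes are rounded and each source slightly overshoots its budget.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3

# Budget: 1.2 GiB split evenly across three sources.
per_source_bytes = 1.2 * GIB / 3
per_source_mib = per_source_bytes / MIB
print(f"Per-source budget: {per_source_mib:.2f} MiB")

# Document counts as reported in the statistics table.
reported_docs = {"FineWeb": 138582, "Wikipedia": 927496, "CC_News": 186227}

# Approximate average document size per source.
avg_bytes = {name: per_source_bytes / docs for name, docs in reported_docs.items()}
for name, value in avg_bytes.items():
    print(f"{name:<10} ~{value:,.0f} bytes per document")

print("Total documents:", sum(reported_docs.values()))
```

The much smaller average for the Wikipedia source likely reflects that WikiText-103 stores one line (heading or paragraph) per record rather than one article per record.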


## 2. Data Cleaning and Preprocessing

In this section, I implement a preprocessing pipeline to improve data quality for model pre-training. The pipeline includes:
1.  **Normalization**: Converting text to lowercase and removing excessive whitespace and artifacts using regular expressions.
2.  **Filtering**: Removing short documents (fewer than 50 words) that likely contain low-quality content.
3.  **Deduplication**: Removing exact duplicate entries to prevent the model from memorizing repeated data.

In [4]:
import os
import re
import hashlib
from datasets import load_from_disk

# 1. Load Raw Data
if os.path.exists("raw_mixed_dataset"):
    print("Loading raw dataset from disk...")
    raw_dataset = load_from_disk("raw_mixed_dataset")
else:
    raise FileNotFoundError("Please run Data Collection first.")

# Global in-memory set for deduplication; it is only shared correctly when map() runs with num_proc=1
seen_hashes = set()

def clean_and_dedup(example):
    text = example.get('text', "")

    if not text:
        return {"text": "", "is_valid": False}

    try:
        # 1. Remove Markdown Links & Images
        text = re.sub(r'!\[.*?\]\(.*?\)', '', text)        # Remove images
        text = re.sub(r'\[([^\]]+)\]\(.*?\)', r'\1', text) # Keep link text, remove URL

        # 2. Remove HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # 3. Remove URLs (http/https)
        text = re.sub(r'http\S+', '', text)

        # 4. Remove Wiki References
        text = re.sub(r'\[\d+\]', '', text)

        # 5. Remove Code Blocks
        text = re.sub(r'```.*?```', '', text, flags=re.DOTALL)
        text = re.sub(r'`[^`]+`', '', text)

        # 6. Remove Markdown Formatting
        text = re.sub(r'^\s*[-*]\s+', '', text, flags=re.MULTILINE) # List bullets
        text = re.sub(r'(#+\s|\*\*|__)', '', text)                  # Headers/Bold

        # 7. Irrelevant Symbol Stripping
        text = re.sub(r'[^\w\s.,!?;:\'\"-]+', '', text)

        # 8. Normalize Whitespace & Lowercase
        text = re.sub(r'\s+', ' ', text).strip().lower()


        # Filter documents shorter than 50 words
        if len(text.split()) < 50:
            return {"text": text, "is_valid": False}

        # Exact Deduplication (MD5)
        # Strategy: Hash POST-cleaning
        text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()

        if text_hash in seen_hashes:
            return {"text": text, "is_valid": False} # Duplicate

        seen_hashes.add(text_hash)
        return {"text": text, "is_valid": True}

    except Exception:
        # Treat any unexpected cleaning failure as an invalid document
        return {"text": "", "is_valid": False}

print(f"Original Raw Count: {len(raw_dataset)}")
print("Running Enhanced Cleaning Pipeline (Markdown+Symbols+Dedup)...")

processed_dataset = raw_dataset.map(clean_and_dedup, num_proc=1)

# Filter valid rows
cleaned_dataset = processed_dataset.filter(lambda x: x["is_valid"])
cleaned_dataset = cleaned_dataset.remove_columns(["is_valid"])

# Save
cleaned_dataset.save_to_disk("cleaned_dataset")

print(f"Final Cleaned Count: {len(cleaned_dataset)}")
print(f"Removed {len(raw_dataset) - len(cleaned_dataset)} documents.")
print("Saved to 'cleaned_dataset'.")

Loading raw dataset from disk...
Original Raw Count: 1252305
Running Enhanced Cleaning Pipeline (Markdown+Symbols+Dedup)...



Final Cleaned Count: 808966
Removed 443339 documents.
Saved to 'cleaned_dataset'.
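Hashing after cleaning matters because documents that differ only in case or spacing collapse to the same hash. The sketch below shows this on toy data; `normalize` is a simplified stand-in for the full cleaning function, and `dedup` and the sample strings are hypothetical.

```python
import hashlib
import re


def normalize(text: str) -> str:
    """Collapse whitespace and lowercase (simplified stand-in for the cleaning step)."""
    return re.sub(r"\s+", " ", text).strip().lower()


def dedup(texts, key=lambda t: t):
    """Keep the first text seen for each distinct MD5 digest of key(text)."""
    seen = set()
    kept = []
    for text in texts:
        digest = hashlib.md5(key(text).encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            kept.append(text)
    return kept


docs = ["Hello   World", "hello world", "HELLO WORLD ", "Another document"]

raw_unique = dedup(docs)                   # hashes the raw strings
clean_unique = dedup(docs, key=normalize)  # hashes the normalized strings

print(len(raw_unique), "unique when hashing raw text")
print(len(clean_unique), "unique when hashing cleaned text")
```

A `set` shared this way lives in a single process, which is consistent with running the cleaning step with `num_proc=1`; with several worker processes each worker would hold its own copy and cross-worker duplicates would be missed.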


## 3. Tokenization

In this section, I train a custom tokenizer through the **Hugging Face `AutoTokenizer`** interface, then tokenize the cleaned dataset and pack it into fixed-length blocks.

**Tokenizer Configuration:**
* **Base Model**: GPT-2 (Byte-Level BPE)
* **Vocabulary Size**: 32,000
* **Max Sequence Length**: 1024 tokens
* **Special Tokens**: Inherited from GPT-2; the padding token is set to the EOS token.

In [5]:
import os
from transformers import AutoTokenizer
from datasets import load_from_disk
from itertools import chain

if os.path.exists("cleaned_dataset"):
    print("Loading cleaned dataset from disk...")
    dataset = load_from_disk("cleaned_dataset")
else:
    raise FileNotFoundError("'cleaned_dataset' not found!")

print(f"Loaded {len(dataset)} documents for tokenization.")

# Train a custom tokenizer on the cleaned corpus

def batch_iterator(batch_size=1000):
    for i in range(0, len(dataset), batch_size):
        yield dataset[i : i + batch_size]["text"]

print("\nTraining custom tokenizer (Vocab Size: 32,000)...")
# Using GPT-2 base tokenizer (Byte-Level BPE)
base_tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer = base_tokenizer.train_new_from_iterator(batch_iterator(), vocab_size=32000)

# GPT-2 defines no padding token; reuse the EOS token so batches can be padded in a PyTorch DataLoader later
tokenizer.pad_token = tokenizer.eos_token

tokenizer.save_pretrained("./my_custom_tokenizer")
print("Tokenizer trained and saved to './my_custom_tokenizer'.")

block_size = 1024

def group_texts(examples):
    # a. Tokenize all texts in the batch
    tokenized_inputs = tokenizer(examples["text"], truncation=False)

    # b. Concatenate all tokens
    concatenated = {k: list(chain(*tokenized_inputs[k])) for k in tokenized_inputs.keys()}
    total_length = len(concatenated['input_ids'])

    # c. Drop the remainder that does not fill a whole block
    #    (a batch shorter than one block is kept as a single short chunk)
    if total_length >= block_size:
        total_length = (total_length // block_size) * block_size

    # d. Split the concatenated chain into chunks
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated.items()
    }

    # e. Add labels
    result["labels"] = result["input_ids"].copy()
    return result

print("Running Tokenization and Chunking (Block Size: 1024)...")

lm_datasets = dataset.map(
    group_texts,
    batched=True,
    num_proc=4,
    remove_columns=dataset.column_names, # Remove raw 'text' and 'source' columns, keep only token fields
    desc="Tokenizing"
)

print(f"\nStep 3 Complete!")
print(f"Total Training Blocks: {len(lm_datasets)}")

Loading cleaned dataset from disk...
Loaded 808966 documents for tokenization.

Training custom tokenizer (Vocab Size: 32,000)...



Tokenizer trained and saved to './my_custom_tokenizer'.
Running Tokenization and Chunking (Block Size: 1024)...



Token indices sequence length is longer than the specified maximum sequence length for this model (1026 > 1024). Running this sequence through the model will result in indexing errors
Token indices sequence length is longer than the specified maximum sequence length for this model (1079 > 1024). Running this sequence through the model will result in indexing errors
Token indices sequence length is longer than the specified maximum sequence length for this model (1076 > 1024). Running this sequence through the model will result in indexing errors



Step 3 Complete!
Total Training Blocks: 241416
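The concatenate-and-chunk step can be sketched on toy data. `chunk_tokens` is a hypothetical helper that always drops the trailing remainder, which is a slight simplification of the cell above; the token-count arithmetic at the end only reuses the reported block and document counts.

```python
from itertools import chain


def chunk_tokens(token_lists, block_size):
    """Concatenate token lists and split them into full blocks, dropping the remainder."""
    flat = list(chain.from_iterable(token_lists))
    usable = (len(flat) // block_size) * block_size
    return [flat[i : i + block_size] for i in range(0, usable, block_size)]


# Three toy "documents" with 10 tokens in total.
docs = [[1, 2, 3], [4, 5], [6, 7, 8, 9, 10]]
blocks = chunk_tokens(docs, block_size=4)
print(blocks)

# Tokens contained in the reported training blocks.
tokens_in_blocks = 241416 * 1024
# Lower bound on average tokens per cleaned document (dropped remainders are not counted).
avg_tokens_per_doc = tokens_in_blocks / 808966
print(f"{tokens_in_blocks:,} tokens, ~{avg_tokens_per_doc:.1f} tokens per document")
```

Because the documents are concatenated before splitting, a single block can span a document boundary, and the last few tokens of each batch are discarded.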


## 4. Custom Data Loader
I implemented a custom data pipeline in PyTorch to feed the tokenized blocks to the model efficiently.

**Loader Configuration:**
* **Framework:** PyTorch (`torch.utils.data`).
* **Class Architecture:** `IterableDataset` (Selected for streaming capabilities).
* **Memory Strategy:** Lazy loading (Generator-based) to prevent memory bottlenecks.
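* **Batching & Shuffling:** Batching is handled by `DataLoader`; shuffling happens inside the dataset through the Hugging Face `shuffle` method (a full shuffle for a map-style dataset, buffered shuffling for a streaming one).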
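The idea behind buffered shuffling can be sketched with the standard library alone. This is an illustration of the general technique under stated assumptions, not the Hugging Face implementation; `buffered_shuffle` and its parameters are hypothetical names.

```python
import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def buffered_shuffle(items: Iterable[T], buffer_size: int, seed: int = 42) -> Iterator[T]:
    """Approximately shuffle a stream while holding at most buffer_size items in memory."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # fill the buffer first
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]        # emit a random buffered item...
        buffer[idx] = item       # ...and replace it with the incoming one
    rng.shuffle(buffer)          # flush what is left in random order
    yield from buffer


shuffled = list(buffered_shuffle(range(20), buffer_size=5))
print(shuffled)
```

Every item is emitted exactly once, but an item can only move a limited distance forward in the stream, so a small buffer gives a weaker shuffle than a full permutation. That trade-off is why a full shuffle is preferable when the dataset is already on disk and supports random access.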

In [9]:
import torch
from torch.utils.data import IterableDataset, DataLoader
import traceback

class GPTStreamingDataset(IterableDataset):

    def __init__(self, hf_dataset, shuffle=False):
        self.hf_dataset = hf_dataset
        self.shuffle = shuffle

    def __iter__(self):
        if self.shuffle:
            try:
                # Case 1: Map-style dataset (loaded from disk)
                shuffled_ds = self.hf_dataset.shuffle(seed=42)
            except TypeError:
                # Case 2: Iterable/Streaming dataset
                shuffled_ds = self.hf_dataset.shuffle(seed=42, buffer_size=10000)

            iterator = iter(shuffled_ds)
        else:
            iterator = iter(self.hf_dataset)

        for example in iterator:
            # Convert raw lists to PyTorch LongTensors
            yield {
                "input_ids": torch.tensor(example["input_ids"], dtype=torch.long),
                "labels": torch.tensor(example["labels"], dtype=torch.long),
                "attention_mask": torch.tensor(example["attention_mask"], dtype=torch.long)
            }

    def __len__(self):
        return len(self.hf_dataset)

print("\nInitializing Data Loader...")

# Instantiate Dataset and DataLoader
train_dataset = GPTStreamingDataset(lm_datasets, shuffle=True)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=4,
    pin_memory=True
)

print("Custom Data Loader is ready.")

print("\n--- Extracting Samples for Submission ---")
sample_batches = []

try:
    # Iterate through the loader to fetch the first 5 batches
    for i, batch in enumerate(train_dataloader):
        if i >= 5: break

        sample_batches.append(batch)
        # Print shape to verify batch structure
        print(f"   Captured batch {i+1}: Input Shape {batch['input_ids'].shape}")

    # Save the samples to a .pt file
    if len(sample_batches) > 0:
        torch.save(sample_batches, "sample_dataset.pt")
        print("\nSUCCESS: 'sample_dataset.pt' saved successfully.")
    else:
        print("ERROR: No batches were captured. The dataset might be empty.")

except Exception as e:
    print(f"\nCRITICAL ERROR: {e}")
    traceback.print_exc()


Initializing Data Loader...
Custom Data Loader is ready.

--- Extracting Samples for Submission ---
   Captured batch 1: Input Shape torch.Size([4, 1024])
   Captured batch 2: Input Shape torch.Size([4, 1024])
   Captured batch 3: Input Shape torch.Size([4, 1024])
   Captured batch 4: Input Shape torch.Size([4, 1024])
   Captured batch 5: Input Shape torch.Size([4, 1024])

SUCCESS: 'sample_dataset.pt' saved successfully.
