In [1]:
import os
import re
import hashlib
import shutil
import logging
from pathlib import Path
from chardet import detect
import pandas as pd
import unicodedata

# ========== CONFIGURATION ==========
# Root folder holding the raw story .txt files; every derived output lives beneath it.
BASE_DIR = Path(r"C:\Users\ayush\OneDrive\Desktop\Stories")
RAW_DIR = BASE_DIR                                # raw inputs are scanned recursively from the root
CLEANED_DIR = BASE_DIR / "cleaned_files"          # one cleaned file per unique input
LOG_DIR = CLEANED_DIR / "cleaning_phase_log"      # run log, duplicate log, CSV report
DUP_LOG = LOG_DIR / "duplicates_log.txt"          # inputs skipped as byte-identical copies
CSV_LOG = LOG_DIR / "cleaning_report.csv"         # per-file status table

# Make sure the output folders exist before anything tries to write into them.
for dir_path in (CLEANED_DIR, LOG_DIR):
    dir_path.mkdir(parents=True, exist_ok=True)

# ========== PRE-COMPILED REGEX PATTERNS ==========
# For normalize_text
# "[Illustration]" or "[Illustration: caption]" (caption may span lines).
ILLUSTRATION_PAT = re.compile(r"\[Illustration(?::[^\]]*)?\]", re.IGNORECASE)
# Page markers: a bracketed "[Page 12]" / "[Pg 12]", or a line consisting only of "Page 12"
# (optionally dash-wrapped). A bare "page 12" inside a sentence is deliberately left alone.
PAGE_PAT = re.compile(
    r"\[\s*(?:Page|Pg)\.?\s*\d+\s*\]"
    r"|^[^\S\n]*-?[^\S\n]*Page[^\S\n]*\d+[^\S\n]*-?[^\S\n]*$",
    re.IGNORECASE | re.MULTILINE,
)
# Gutenberg "*** START OF ... ***" / "*** END OF ... ***" banner lines. Each match is confined
# to ONE line and needs START/END as a whole word: a DOTALL form would run from one "***"
# scene break to the next and delete the whole passage whenever a word such as "started" or
# "friend" occurred in between.
START_MARKER_PAT = re.compile(
    r"^[^\S\n]*\*{3,}[^\n]*?\bSTART\b[^\n]*?\*{3,}[^\S\n]*$", re.IGNORECASE | re.MULTILINE
)
END_MARKER_PAT = re.compile(
    r"^[^\S\n]*\*{3,}[^\n]*?\bEND\b[^\n]*?\*{3,}[^\S\n]*$", re.IGNORECASE | re.MULTILINE
)
PRODUCTION_CREDIT_PAT = re.compile(r"^\s*\*{3,}.*?Produced by.*$", re.IGNORECASE|re.MULTILINE)
END_NOTICE_PAT = re.compile(r"^\s*End of (the )?Project Gutenberg.*$", re.IGNORECASE|re.MULTILINE)
HYPHEN_REJOIN_PAT = re.compile(r"(\S)-\n(\S)")
EXCESS_NEWLINE_PAT = re.compile(r"\n{3,}")
WHITESPACE_PAT = re.compile(r"[ \t]+")

# For extract_metadata (each pattern's group 1 is the field value)
METADATA_PATTERNS = {
    # Gutenberg's "[EBook #1342]" / "E-Book #1342", or an explicit "PG"/"PGID" tag followed by
    # at least three digits (the lookahead keeps that minimum for the PG form).
    "PGID": re.compile(r'(?:\bE-?Book\s*#\s*|\bPG(?:ID)?:?\s*(?=\d{3}))(\d+)', re.IGNORECASE),
    # Header fields must start a line and carry a colon, so prose such as "entitled to ..."
    # or "Authorized by ..." is not mistaken for metadata. The value stays on the same line.
    "Title": re.compile(r'^[^\S\n]*Title:[^\S\n]*(\S[^\n]*)', re.IGNORECASE | re.MULTILINE),
    "Author": re.compile(r'^[^\S\n]*Author:[^\S\n]*(\S[^\n]*)', re.IGNORECASE | re.MULTILINE),
    "ReleaseDate": re.compile(
        r'^[^\S\n]*Release[^\S\n]*Date:[^\S\n]*(\S[^\n]*)', re.IGNORECASE | re.MULTILINE
    ),
    "Language": re.compile(r'^[^\S\n]*Language:[^\S\n]*(\S[^\n]*)', re.IGNORECASE | re.MULTILINE),
}

# For filename sanitization
INVALID_CHARS_PAT = re.compile(r'[\\/*?:"<>|]')

# ========== ROBUST LOGGING SETUP ==========
log_formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
logger = logging.getLogger()  # root logger: the module-level logging.info(...) calls use it
logger.setLevel(logging.INFO)

# Re-running this cell in the same kernel would otherwise stack another file/console handler
# pair on the root logger (every message emitted twice, previous log file left open).
# Detach and close whatever is already attached before adding fresh handlers.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# File handler (mode='w': the log is rewritten on every run)
file_handler = logging.FileHandler(
    LOG_DIR / "phase1_cleaning.log",
    mode='w',
    encoding='utf-8'
)
file_handler.setFormatter(log_formatter)
logger.addHandler(file_handler)

# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)

# ========== TRACKING STATS ==========
# Per-run accumulators, updated by the main cleaning loop.
report_data = []   # one dict of stats per scanned file (becomes the CSV report)
seen_hashes = {}   # SHA-256 hex digest -> name of the first file seen with that content
file_counter = processed_files = duplicates_found = error_files = 0

# ========== OPTIMIZED UTILITY FUNCTIONS ==========
def sanitize_filename(name):
    """Return ``name`` made safe to embed in a Windows filename.

    Removes the reserved characters matched by ``INVALID_CHARS_PAT`` (``\\ / * ? : " < > |``)
    and turns ASCII control characters (0x00-0x1F, 0x7F) into spaces. Windows rejects those
    too, and they can survive metadata extraction (e.g. a tab inside a title). The result is
    stripped of surrounding whitespace.
    """
    without_reserved = INVALID_CHARS_PAT.sub("", name)
    # Replace rather than delete, so words separated only by e.g. a tab stay separated.
    without_control = re.sub(r"[\x00-\x1f\x7f]", " ", without_reserved)
    return without_control.strip()

def normalize_text(text):
    """Clean decoded book text into normalized plain text.

    Steps, in order:
      1. Unify line endings to ``\\n``. CRLF or CR-only input would otherwise slip past every
         newline-based pattern below, and when rewritten in text mode on Windows each
         leftover ``\\r`` becomes ``\\r\\r\\n``.
      2. Remove Project Gutenberg artifacts using the module-level patterns.
      3. Map a few special characters to ASCII, then apply NFKC normalization. This happens
         before whitespace handling so that NFKC-produced spaces (e.g. from no-break spaces)
         are collapsed too.
      4. Collapse runs of spaces/tabs, trim spaces around line breaks, and re-join words
         hyphenated across a line break. Then cap blank-line runs at one empty line and
         turn ``--`` into an em dash.

    Args:
        text: decoded file content.

    Returns:
        The normalized text with leading/trailing whitespace stripped.
    """
    # 1. Line endings.
    text = text.replace("\r\n", "\n").replace("\r", "\n")

    # 2. Project Gutenberg artifacts.
    for artifact_pat in (ILLUSTRATION_PAT, PAGE_PAT, START_MARKER_PAT, END_MARKER_PAT,
                         PRODUCTION_CREDIT_PAT, END_NOTICE_PAT):
        text = artifact_pat.sub("", text)

    # 3. Special characters. The explicit map runs before NFKC so "½" becomes "1/2" rather
    #    than NFKC's "1⁄2" (with U+2044 FRACTION SLASH).
    text = text.translate(str.maketrans({
        "æ": "ae",    # U+00e6
        "œ": "oe",    # U+0153
        "ß": "ss",    # U+00df
        "½": "1/2",   # U+00bd
        "“": '"',     # U+201c
        "”": '"',     # U+201d
        "‘": "'",     # U+2018
        "’": "'",     # U+2019
    }))
    text = unicodedata.normalize('NFKC', text)

    # 4. Structural clean-up.
    text = WHITESPACE_PAT.sub(" ", text)          # collapse runs of spaces/tabs
    text = re.sub(r" ?\n ?", "\n", text)          # drop the space hugging each line break
    text = HYPHEN_REJOIN_PAT.sub(r"\1\2", text)   # re-join words split as "-\n"
    text = EXCESS_NEWLINE_PAT.sub("\n\n", text)   # at most one blank line in a row
    text = text.replace("--", "—")                # ASCII double hyphen -> em dash
    return text.strip()

def extract_metadata(text):
    """Pull the header fields defined in ``METADATA_PATTERNS`` out of a book's text.

    Only the first 5,000 characters are searched. Each field is set to the stripped first
    capture group of its pattern. The value "Unknown" is used instead when the pattern does
    not match, the capture is empty, or it is longer than 150 characters.

    Returns:
        dict mapping each key of ``METADATA_PATTERNS`` (same order) to a string.
    """
    head = text[:5000]  # header fields sit near the top; no need to scan the whole book

    def _field_value(pattern):
        found = pattern.search(head)
        value = found.group(1).strip() if found is not None else ""
        return value if 0 < len(value) <= 150 else "Unknown"

    return {field: _field_value(pattern) for field, pattern in METADATA_PATTERNS.items()}

# ========== MAIN CLEANING PROCESS ==========
# Folders this notebook writes to under BASE_DIR: the cleaned output (this cell), plus the
# split copies and merged training text produced by later cells. RAW_DIR *is* BASE_DIR, so
# without this exclusion a re-run would scan those outputs as if they were raw stories.
OUTPUT_DIRS = (CLEANED_DIR, BASE_DIR / "split", BASE_DIR / "training_data")


def _is_pipeline_output(path):
    """Return True when ``path`` lies inside one of OUTPUT_DIRS."""
    return any(out_dir in path.parents for out_dir in OUTPUT_DIRS)


def _decode_bytes(binary_data):
    """Decode raw file bytes, returning ``(text, encoding_name)``.

    Strict UTF-8 is tried first ('utf-8-sig' also drops a BOM). When it succeeds the result is
    exact. This avoids a weakness of sampling: chardet only sees the first 50,000 bytes, so a
    file whose non-ASCII characters all come later can be labelled 'ascii', and every one of
    those characters would then be replaced with U+FFFD. Otherwise chardet guesses from that
    sample. If the guess names an encoding Python has no codec for (LookupError), this falls
    back to lossy UTF-8.
    """
    try:
        text = binary_data.decode('utf-8-sig')
    except UnicodeDecodeError:
        pass
    else:
        return text, ('utf-8-sig' if binary_data.startswith(b'\xef\xbb\xbf') else 'utf-8')

    guessed = detect(binary_data[:50000])['encoding'] or 'utf-8'
    try:
        return binary_data.decode(guessed, errors='replace'), guessed
    except LookupError:
        return binary_data.decode('utf-8', errors='replace'), 'utf-8'


def _allocate_output_path(pgid, safe_title, used_names):
    """Return a ``*_clean.txt`` path in CLEANED_DIR not yet handed out during this run.

    ``used_names`` holds the lower-cased names already taken and is updated in place. Names
    are lower-cased because Windows filenames are case-insensitive. On a collision a counter
    goes *before* the ``_clean.txt`` suffix, so every output still matches the
    ``*_clean.txt`` glob used by the split step. Names are checked against this run only,
    not against the disk, so re-running the cell overwrites its own earlier outputs instead
    of adding a numbered copy of every book.
    """
    stem = f"{pgid}_{safe_title}"
    name = f"{stem}_clean.txt"
    counter = 1
    while name.lower() in used_names:
        name = f"{stem}_{counter}_clean.txt"
        counter += 1
    used_names.add(name.lower())
    return CLEANED_DIR / name


logging.info("🚀 Starting Phase 1 Cleaning...")
all_files = [p for p in RAW_DIR.rglob("*.txt") if not _is_pipeline_output(p)]
total_files = len(all_files)
logging.info(f"📂 Total files detected: {total_files}\n")

used_output_names = set()  # lower-cased output filenames already written in this run

with open(DUP_LOG, 'w', encoding='utf-8') as dup_log:
    for file_path in all_files:
        file_counter += 1
        file_stats = {
            "Original Filename": file_path.name,
            "PGID": "N/A",
            "Title": "N/A",
            "Author": "N/A",
            "Encoding": "Unknown",
            "Original Size (KB)": None,  # filled in inside the try below
            "Status": "Pending"
        }

        logging.info(f"[{file_counter}/{total_files}] Processing: {file_path.name}")

        try:
            # ===== SECURITY CHECK =====
            # stat() sits inside the try so a single unreadable or vanished file is reported
            # as an error instead of aborting the whole run.
            file_size = file_path.stat().st_size
            file_stats["Original Size (KB)"] = round(file_size / 1024, 2)
            if file_size > 10 * 1024 * 1024:  # 10MB limit
                raise ValueError("File exceeds size limit (10MB)")

            # ===== SINGLE READ OPERATION =====
            binary_data = file_path.read_bytes()

            # ===== DUPLICATE DETECTION (byte-identical content) =====
            sha256 = hashlib.sha256(binary_data).hexdigest()
            file_stats["SHA256"] = sha256

            if sha256 in seen_hashes:
                dup_log.write(f"REMOVED: {file_path.name} (duplicate of {seen_hashes[sha256]})\n")
                duplicates_found += 1
                file_stats["Status"] = "Duplicate"
                report_data.append(file_stats)
                continue

            seen_hashes[sha256] = file_path.name

            # ===== CONTENT PROCESSING =====
            content, encoding = _decode_bytes(binary_data)
            file_stats["Encoding"] = encoding

            if not content.strip():
                raise ValueError("Empty file after reading")

            # Metadata is read from the raw text, before normalization strips header lines.
            metadata = extract_metadata(content)
            for key in ["PGID", "Title", "Author"]:
                file_stats[key] = metadata.get(key, "Unknown")

            content = normalize_text(content)

            # ===== FINAL STRUCTURE =====
            safe_title = sanitize_filename(metadata['Title'])
            output_file = _allocate_output_path(metadata['PGID'], safe_title, used_output_names)

            # Plain UTF-8 without a BOM: the later cells read these files as 'utf-8', and a BOM
            # would be carried into the merged training text as a stray U+FEFF per document.
            with open(output_file, 'w', encoding='utf-8') as out:
                out.write(
                    f"[DOC_START]\n"
                    f"# PGID: {metadata['PGID']}\n"
                    f"# Title: {metadata['Title']}\n"
                    f"# Author: {metadata['Author']}\n"
                    f"# Release Date: {metadata.get('ReleaseDate', 'Unknown')}\n"
                    f"# Language: {metadata.get('Language', 'Unknown')}\n\n"
                    f"{content}\n"
                    f"[DOC_END]"
                )

            processed_files += 1
            file_stats["Status"] = "Processed"

        except Exception as e:
            error_files += 1
            logging.error(f"❌ Error processing {file_path.name}: {str(e)}")
            file_stats["Status"] = f"Error: {str(e)}"

        report_data.append(file_stats)

# ========== FINALIZATION & REPORTING ==========
logging.info("\n📊 Generating final reports...")
try:
    report_df = pd.DataFrame(report_data)
    column_order = [
        "Original Filename", "Status", "PGID", "Title", "Author",
        "SHA256", "Encoding", "Original Size (KB)"
    ]
    report_df = report_df.reindex(columns=column_order)
    report_df.to_csv(CSV_LOG, index=False, encoding='utf-8-sig')
except Exception as e:
    logging.critical(f"Failed to generate CSV report: {str(e)}")

# Final summary
logging.info("\n✅ Phase 1 Cleaning Complete")
logging.info(f"📂 Total Files Scanned: {total_files}")
logging.info(f"✅ Successfully Processed: {processed_files}")
logging.info(f"🚫 Duplicates Skipped: {duplicates_found}")
logging.info(f"❗ Errors Encountered: {error_files}")
logging.info(f"📄 Detailed report saved to: {CSV_LOG}")
logging.info(f"📓 Duplicates log saved to: {DUP_LOG}")

[2025-07-28 18:41:16,063] INFO: 🚀 Starting Phase 1 Cleaning...
[2025-07-28 18:41:16,162] INFO: 📂 Total files detected: 9530

[2025-07-28 18:41:16,164] INFO: [1/9530] Processing: 01 Harry Potter and the Sorcerers Stone.txt
[2025-07-28 18:41:16,322] INFO: [2/9530] Processing: 02 Harry Potter and the Chamber of Secrets.txt
[2025-07-28 18:41:16,494] INFO: [3/9530] Processing: 03 Harry Potter and the Prisoner of Azkaban.txt
[2025-07-28 18:41:16,688] INFO: [4/9530] Processing: 04 Harry Potter and the Goblet of Fire.txt
[2025-07-28 18:41:17,016] INFO: [5/9530] Processing: 05 Harry Potter and the Order of the Phoenix.txt
[2025-07-28 18:41:17,599] INFO: [6/9530] Processing: 06 Harry Potter and the Half-Blood Prince.txt
[2025-07-28 18:41:18,815] INFO: [7/9530] Processing: 07 Harry Potter and the Deathly Hallows.txt
[2025-07-28 18:41:19,447] INFO: [8/9530] Processing: 1000 Mythological Characters Briefly Described.txt
[2025-07-28 18:41:19,568] INFO: [9/9530] Processing: 10004_the_warriors__by_ann

In [7]:
import os
import logging
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import TruncatedSVD
import re
import shutil
import gc
import time
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor, as_completed

# ====== CONFIG ======
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)  # legacy global seed; the sklearn estimators also get random_state

BASE_DIR = Path(r"C:/Users/ayush/OneDrive/Desktop/Stories")
CLEANED_DIR = BASE_DIR / "cleaned_files"
SPLIT_DIR = BASE_DIR / "split"
TRAIN_DIR = SPLIT_DIR / "train"
VAL_DIR = SPLIT_DIR / "val"
REJECTED_DIR = SPLIT_DIR / "rejected"
LOG_DIR = SPLIT_DIR / "split_logs"

for d in [TRAIN_DIR, VAL_DIR, REJECTED_DIR, LOG_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# force=True (Python 3.8+): basicConfig silently does nothing when the root logger already
# has handlers, which is the case once the Phase 1 cell has run in this kernel. Without it
# nothing from this pipeline would reach split_pipeline.log.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    handlers=[
        logging.FileHandler(LOG_DIR / "split_pipeline.log", mode='w', encoding='utf-8'),
        logging.StreamHandler()
    ],
    force=True,
)
logger = logging.getLogger()

# ====== CONSTANTS & PATTERNS ======
MIN_TOKENS = 5  # documents with fewer tokens than this are rejected
# Body of a cleaned document: everything between the [DOC_START] and [DOC_END] tags.
DOC_PATTERN = re.compile(r'\[DOC_START\](.*?)\[DOC_END\]', re.DOTALL)
# Word tokens made of at least two word characters.
TOKEN_PATTERN = re.compile(r'\b\w{2,}\b')

# ====== UTILS ======
def copy_task(src, dest):
    """Copy ``src`` to ``dest`` and report the outcome instead of raising.

    Returns:
        ``(True, src.name, None)`` on success, or ``(False, src.name, message)`` where
        ``message`` is the text of the exception raised by the copy.
    """
    succeeded, problem = True, None
    try:
        shutil.copy(src, dest)
    except Exception as exc:
        succeeded, problem = False, str(exc)
    return succeeded, src.name, problem

def parallel_copy(file_paths, target_dir, max_workers=8):
    """Copy ``file_paths`` into ``target_dir`` concurrently, keeping each file's name.

    A file that fails to copy is logged. Unless ``target_dir`` already is REJECTED_DIR, a
    best-effort copy of that same source file is then placed in REJECTED_DIR for inspection.
    A failure of that fallback is logged as a warning, never raised.

    Args:
        file_paths: iterable of ``Path`` objects to copy.
        target_dir: destination directory (expected to exist).
        max_workers: size of the copy thread pool.

    Returns:
        Number of files copied successfully into ``target_dir``.
    """
    file_paths = list(file_paths)
    logger.info(f"🛠️ Copying {len(file_paths)} files to {target_dir.name}")
    start_time = time.time()
    success_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Keep each future's real source path, so the fallback copies that file rather than
        # guessing its location as CLEANED_DIR / name.
        future_to_src = {
            executor.submit(copy_task, src, target_dir / src.name): src
            for src in file_paths
        }
        for future in as_completed(future_to_src):
            src = future_to_src[future]
            try:
                success, filename, error = future.result()
            except Exception as e:
                logger.exception(f"🔥 Unhandled future exception: {str(e)}")
                continue

            if success:
                success_count += 1
                continue

            logger.error(f"❌ Failed: {filename} - {error}")
            if target_dir == REJECTED_DIR:
                continue  # the failed copy already targeted the reject folder; no fallback left
            try:
                shutil.copy(src, REJECTED_DIR / filename)
            except Exception as fallback_error:
                logger.warning(f"⚠️ Could not copy {filename} to {REJECTED_DIR.name}: {fallback_error}")

    elapsed = time.time() - start_time
    logger.info(f"📆 Copied {success_count}/{len(file_paths)} files in {elapsed:.2f}s")
    return success_count

def process_file(file_path):
    """Read one cleaned file and tokenize the body between its [DOC_START]/[DOC_END] tags.

    Bytes that are not valid UTF-8 are dropped (``errors='ignore'``). Always returns a
    3-tuple:
      * ``(space-joined tokens, file_path, token count)`` for a usable document;
      * ``(None, file_path, 'missing_DOC_TAG')`` when no tagged body is found;
      * ``(None, file_path, 'low_token_count')`` when it has fewer than MIN_TOKENS tokens;
      * ``(None, file_path, error message)`` when anything raises.
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
            body = DOC_PATTERN.search(handle.read())
        if body is None:
            return None, file_path, 'missing_DOC_TAG'
        words = TOKEN_PATTERN.findall(body.group(1))
        token_total = len(words)
        if token_total < MIN_TOKENS:
            return None, file_path, 'low_token_count'
        return ' '.join(words), file_path, token_total
    except Exception as exc:
        return None, file_path, str(exc)

# ====== MAIN ======
def _clear_txt_files(directory):
    """Delete ``*.txt`` files left in ``directory`` by an earlier run; return how many."""
    removed = 0
    for stale_file in directory.glob("*.txt"):
        stale_file.unlink()
        removed += 1
    if removed:
        logger.info(f"🧹 Removed {removed} stale files from {directory.name}")
    return removed


def _scan_documents(files):
    """Run ``process_file`` over ``files`` in parallel and collect the results.

    Returns ``(records, rejected_paths, counts)``:
      * ``records``: ``(path, text, token_count)`` tuples sorted by path, with documents whose
        token text is identical to an earlier record dropped (counted as duplicates);
      * ``rejected_paths``: paths of unusable files, sorted;
      * ``counts``: integers under 'low_tokens', 'missing_tags', 'errors', 'duplicates'.

    Sorting matters. ``as_completed`` yields in completion order, which changes from run to
    run, and the row order feeds SVD, MiniBatchKMeans and the seeded per-cluster shuffle.
    Without sorting, the split would differ between runs even with RANDOM_SEED fixed.
    """
    valid = []
    rejected = []
    counts = {'low_tokens': 0, 'missing_tags': 0, 'errors': 0, 'duplicates': 0}
    worker_count = (os.cpu_count() or 1) * 2  # os.cpu_count() may return None

    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for i, future in enumerate(as_completed(futures)):
            try:
                text, file_path, result = future.result()
            except Exception as e:
                logger.error(f"🔥 Crash in file: {futures[future].name} | Exception: {str(e)}")
                rejected.append(futures[future])
                counts['errors'] += 1
                continue

            if text is None:
                rejected.append(file_path)
                if result == 'low_token_count':
                    counts['low_tokens'] += 1
                elif result == 'missing_DOC_TAG':
                    counts['missing_tags'] += 1
                else:
                    counts['errors'] += 1
                    logger.debug(f"🚨 Error in {file_path.name}: {result}")
            else:
                valid.append((file_path, text, result))

            if (i + 1) % 1000 == 0:
                logger.info(f"⏱️ Processed {i+1}/{len(files)} files")

    valid.sort(key=lambda record: record[0])
    rejected.sort()

    # Identical token text means the same document under another name (e.g. a
    # collision-suffixed copy from a re-run of the cleaner). Keep only the first by path,
    # so one document cannot land in both train and val.
    unique_records = []
    seen_texts = set()
    for record in valid:
        if record[1] in seen_texts:
            counts['duplicates'] += 1
            continue
        seen_texts.add(record[1])
        unique_records.append(record)
    return unique_records, rejected, counts


def _stratified_split(paths, labels, n_clusters, train_frac=0.9):
    """Split ``paths`` into train/val separately within each cluster.

    Each cluster's rows are shuffled with RANDOM_SEED and the first ``int(len * train_frac)``
    go to train, the rest to val. Clusters with at most one document go entirely to train.

    Returns:
        ``(train_paths, val_paths)`` lists.
    """
    df = pd.DataFrame({'path': paths, 'cluster': labels})
    train_paths, val_paths = [], []
    for cluster_id in range(n_clusters):
        cluster_df = df[df['cluster'] == cluster_id]
        if len(cluster_df) <= 1:
            train_paths.extend(cluster_df['path'].tolist())
            continue
        cluster_df = cluster_df.sample(frac=1, random_state=RANDOM_SEED)
        split_idx = int(len(cluster_df) * train_frac)
        train_paths.extend(cluster_df.iloc[:split_idx]['path'].tolist())
        val_paths.extend(cluster_df.iloc[split_idx:]['path'].tolist())
    return train_paths, val_paths


def main():
    """Reject unusable cleaned files, cluster the rest and write a per-cluster 90/10 train/val split.

    Output folders (TRAIN_DIR, VAL_DIR, REJECTED_DIR) are emptied of ``*.txt`` files before
    they are refilled, so they reflect this run only.
    """
    global_start = time.time()
    logger.info("🚀 Starting hyper-optimized document processing pipeline")

    logger.info("🔍 Scanning and preprocessing files...")
    start_scan = time.time()
    # "*_clean*.txt" also catches collision-suffixed outputs such as "X_clean_1.txt" (a plain
    # "*_clean.txt" would silently skip them). Exact duplicates are dropped in _scan_documents.
    files = sorted(CLEANED_DIR.glob("*_clean*.txt"))
    logger.info(f"📂 Found {len(files)} files")

    records, rejected_paths, counts = _scan_documents(files)

    _clear_txt_files(REJECTED_DIR)
    if rejected_paths:
        parallel_copy(rejected_paths, REJECTED_DIR)

    scan_time = time.time() - start_scan
    logger.info(
        f"✅ Preprocessing complete: Valid={len(records)} | Rejected={len(rejected_paths)} "
        f"(low tokens={counts['low_tokens']}, missing tags={counts['missing_tags']}, "
        f"other errors={counts['errors']}) | Duplicates skipped={counts['duplicates']} "
        f"in {scan_time:.2f}s"
    )

    if not records:
        logger.warning("🛑 No valid documents found. Exiting early.")
        return

    paths = [path for path, _, _ in records]
    texts = [text for _, text, _ in records]
    del records
    gc.collect()
    n_docs = len(paths)

    if n_docs < 2:
        # SVD and KMeans need at least two samples; a lone document simply goes to train.
        logger.info("🧩 Fewer than two documents: skipping vectorization and clustering")
        del texts
        labels, n_clusters = np.zeros(n_docs, dtype=int), 1
    else:
        logger.info("⚡ Vectorizing text data...")
        start_vec = time.time()
        # NOTE(review): norm=None keeps raw n-gram counts, so document length may dominate the
        # leading SVD components; norm='l2' could give more topic-like clusters. Confirm intent.
        vectorizer = HashingVectorizer(
            n_features=2**16,
            ngram_range=(1, 2),
            alternate_sign=False,
            norm=None,
            dtype=np.float32
        )
        X = vectorizer.transform(texts)
        del texts
        gc.collect()

        logger.info("🌀 Performing dimensionality reduction...")
        # ARPACK requires n_components < min(n_samples, n_features); cap it for small corpora.
        svd = TruncatedSVD(n_components=min(48, n_docs - 1), algorithm='arpack', random_state=RANDOM_SEED)
        X_reduced = svd.fit_transform(X)
        logger.info(f"⭐ Explained variance: {svd.explained_variance_ratio_.sum():.4f}")
        del X
        gc.collect()

        vec_time = time.time() - start_vec
        logger.info(f"📊 Vectorized {X_reduced.shape[0]} docs in {vec_time:.2f}s")

        logger.info("🧩 Clustering documents...")
        start_cluster = time.time()
        # Roughly one cluster per 100 docs within [8, 50], but never more clusters than docs.
        n_clusters = min(50, max(8, n_docs // 100), n_docs)
        kmeans = MiniBatchKMeans(
            n_clusters=n_clusters,
            batch_size=4096,
            compute_labels=True,
            random_state=RANDOM_SEED,
            n_init='auto'
        )
        labels = kmeans.fit_predict(X_reduced)
        del X_reduced
        gc.collect()

        cluster_time = time.time() - start_cluster
        logger.info(f"🏷️ Clustered {n_docs} docs in {cluster_time:.2f}s")

    logger.info("✂️ Performing stratified split...")
    train_paths, val_paths = _stratified_split(paths, labels, n_clusters)
    logger.info(f"✅ Split complete: Train={len(train_paths)} | Val={len(val_paths)}")
    del labels
    gc.collect()

    # Start from empty folders: files from an earlier run with a different split would
    # otherwise leave some documents in both train and val.
    _clear_txt_files(TRAIN_DIR)
    _clear_txt_files(VAL_DIR)
    parallel_copy(train_paths, TRAIN_DIR)
    parallel_copy(val_paths, VAL_DIR)

    total_time = time.time() - global_start
    logger.info(f"🎉 Total processing time: {total_time:.2f}s")
    logger.info(f"📊 Final counts: Train={len(train_paths)}, Val={len(val_paths)}, Rejected={len(rejected_paths)}")

if __name__ == "__main__":
    main()

[2025-07-29 00:11:41,998] INFO: 🚀 Starting hyper-optimized document processing pipeline
[2025-07-29 00:11:41,999] INFO: 🔍 Scanning and preprocessing files...
[2025-07-29 00:11:42,058] INFO: 📂 Found 8644 files
[2025-07-29 00:13:55,990] INFO: ⏱️ Processed 1000/8644 files
[2025-07-29 00:13:56,271] INFO: ⏱️ Processed 2000/8644 files
[2025-07-29 00:13:57,154] INFO: ⏱️ Processed 3000/8644 files
[2025-07-29 00:13:57,664] INFO: ⏱️ Processed 4000/8644 files
[2025-07-29 00:14:06,719] INFO: ⏱️ Processed 5000/8644 files
[2025-07-29 00:14:35,619] INFO: ⏱️ Processed 6000/8644 files
[2025-07-29 00:15:05,576] INFO: ⏱️ Processed 7000/8644 files
[2025-07-29 00:15:33,047] INFO: ⏱️ Processed 8000/8644 files
[2025-07-29 00:15:49,004] INFO: ✅ Preprocessing complete: Valid=8644 | Rejected=0 (low tokens=0, missing tags=0, other errors=0) in 247.00s
[2025-07-29 00:15:49,005] INFO: ⚡ Vectorizing text data...
[2025-07-29 00:27:02,950] INFO: 🌀 Performing dimensionality reduction...
[2025-07-29 00:27:55,058] INFO:

In [12]:
import os
import logging
from pathlib import Path
from datetime import datetime

# ======================= CONFIG (Must match prior script) =======================
RANDOM_SEED = 42  # mirrors the split script; the merge itself does not use it
BASE_DIR = Path(r"C:/Users/ayush/OneDrive/Desktop/Stories")
SPLIT_DIR = BASE_DIR / "split"
TRAIN_DIR = SPLIT_DIR / "train"
VAL_DIR = SPLIT_DIR / "val"
LOG_DIR = SPLIT_DIR / "split_logs"
MERGE_OUTPUT_DIR = BASE_DIR / "training_data" / "train_bulk_txt"
MERGED_TRAIN_FILE = MERGE_OUTPUT_DIR / "merged_train.txt"
MERGED_VAL_FILE = MERGE_OUTPUT_DIR / "merged_val.txt"

MERGE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# The merge log goes into the split step's log folder; create it in case this cell runs
# without the split cell having run first (FileHandler would otherwise fail).
LOG_DIR.mkdir(parents=True, exist_ok=True)

# ======================= LOGGING =======================
# force=True (Python 3.8+): earlier cells already configured the root logger in this kernel,
# and basicConfig would otherwise be a silent no-op (merge_pipeline.log never written).
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    handlers=[
        logging.FileHandler(LOG_DIR / "merge_pipeline.log", mode='w', encoding='utf-8'),
        logging.StreamHandler()
    ],
    force=True,
)
logger = logging.getLogger("MERGE")

# ======================= MERGE FUNCTION =======================
def merge_texts(source_dir: Path, output_path: Path, label: str):
    """Concatenate every ``*.txt`` file in ``source_dir`` (sorted by path) into ``output_path``.

    Each file's stripped content is written followed by a single ``\\n``. Empty files and
    files that cannot be read are logged and skipped. Write errors propagate, so a failing
    output file is not mistaken for a bad input.

    Inputs are read as 'utf-8-sig', so a leading byte-order mark (e.g. in files written with
    the 'utf-8-sig' codec) is dropped rather than copied into the merged text as U+FEFF.
    ``str.strip()`` would not remove it. The output always uses ``\\n`` line endings,
    independent of the OS.

    Args:
        source_dir: folder holding the per-document ``.txt`` files.
        output_path: merged file to (over)write.
        label: split name, used only in log messages.
    """
    logger.info(f"📦 Merging all .txt files in {source_dir} for [{label.upper()}]...")
    files = sorted(source_dir.glob("*.txt"))
    total_size = 0
    merged_count = 0

    with open(output_path, 'w', encoding='utf-8', newline='\n') as outfile:
        for file in files:
            try:
                with open(file, 'r', encoding='utf-8-sig') as f:
                    content = f.read().strip()
            except Exception as e:
                logger.error(f"❌ Failed to read {file.name}: {e}")
                continue

            if content:
                outfile.write(content + '\n')
                merged_count += 1
                total_size += len(content.encode('utf-8'))
            else:
                logger.warning(f"⚠️ Empty file skipped: {file.name}")

    logger.info(f"✅ Merged {merged_count} files → {output_path.name} ({total_size / 1024:.2f} KB)")

# ======================= EXECUTION =======================
def main():
    """Merge the train and val split folders into their bulk text files.

    A missing split folder is logged as an error and skipped. The total wall-clock time is
    logged at the end.
    """
    logger.info("🚀 Starting merge script (post split pipeline)")
    began_at = datetime.now()

    merge_jobs = (
        ("Train", TRAIN_DIR, MERGED_TRAIN_FILE, "train"),
        ("Val", VAL_DIR, MERGED_VAL_FILE, "val"),
    )
    for display_name, split_dir, merged_file, label in merge_jobs:
        if split_dir.exists():
            merge_texts(split_dir, merged_file, label)
        else:
            logger.error(f"❌ {display_name} directory not found: {split_dir}")

    elapsed = (datetime.now() - began_at).total_seconds()
    logger.info(f"🏁 Merging complete in {elapsed:.2f} seconds")

if __name__ == '__main__':
    main()

[2025-07-29 09:00:39,828] INFO: 🚀 Starting merge script (post split pipeline)
[2025-07-29 09:00:39,830] INFO: 📦 Merging all .txt files in C:\Users\ayush\OneDrive\Desktop\Stories\split\train for [TRAIN]...
[2025-07-29 09:03:11,819] INFO: ✅ Merged 7762 files → merged_train.txt (2954239.10 KB)
[2025-07-29 09:03:11,825] INFO: 📦 Merging all .txt files in C:\Users\ayush\OneDrive\Desktop\Stories\split\val for [VAL]...
[2025-07-29 09:03:28,787] INFO: ✅ Merged 882 files → merged_val.txt (339923.38 KB)
[2025-07-29 09:03:28,787] INFO: 🏁 Merging complete in 168.96 seconds


In [13]:
# NOTE(review): leftover scratch cell; it evaluates a constant and uses nothing from the
# pipeline above. Safe to delete before sharing the notebook.
2+2

4