<a href="https://colab.research.google.com/github/JKTK25/-JSONL-Cleaner-with-LLM-Enhancement/blob/main/DATA_PDF_PROCESSING_EXTRACTION.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install PyPDF2 tiktoken pytesseract Pillow langdetect pymupdf docx

In [None]:
pip install PyMuPDF langdetect

In [8]:
pip install python-docx

Collecting python-docx
  Downloading python_docx-1.2.0-py3-none-any.whl.metadata (2.0 kB)
Downloading python_docx-1.2.0-py3-none-any.whl (252 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/253.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.0/253.0 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: python-docx
Successfully installed python-docx-1.2.0


# **EXTRACTING DATA FOR MACHINE LEARNING**

# **DATA PDF PROCESSING EXTRACTION**


In [15]:
import json
import re
import hashlib
from pathlib import Path
from typing import List, Optional, Dict, Tuple
import pytesseract
from PIL import Image
import fitz  # PyMuPDF
import langdetect
from langdetect import DetectorFactory
import logging
from docx import Document
import os
import concurrent.futures
from functools import partial

# tqdm is optional: progress bars are only used when it can be imported.
try:
    from tqdm import tqdm
except ImportError:
    USE_TQDM = False
else:
    USE_TQDM = True

# tkinter is optional as well; when it is missing the constructor falls
# back to console prompts for the input/output paths.
try:
    import tkinter as tk
    from tkinter import filedialog
except (ImportError, RuntimeError):
    TKINTER_AVAILABLE = False
else:
    TKINTER_AVAILABLE = True

# Fixed seed so language detection gives consistent results between runs
DetectorFactory.seed = 0

# Log format used by every message emitted from this cell
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)

class EnhancedDocumentExtractor:
    def __init__(
        self,
        input_dir: Optional[str] = None,
        output_file: Optional[str] = None,
        max_tokens: Optional[int] = 512,
        min_chunk_size: int = 100,
        overlap: int = 50,
        target_language: Optional[str] = "en",
        min_text_quality: float = 0.7,
        enable_ocr: bool = True,
        num_workers: int = None
    ):
        # Default directories
        default_input_dir = "./documents"
        default_output_file = "./pretraining_data.jsonl"

        # Handle input directory selection
        if input_dir:
            self.input_dir = Path(input_dir)
        else:
            if TKINTER_AVAILABLE and os.environ.get('DISPLAY'):
                try:
                    root = tk.Tk()
                    root.withdraw()
                    self.input_dir = Path(filedialog.askdirectory(title="Select Input Directory") or default_input_dir)
                    root.destroy()
                except Exception:
                    logging.warning("Falling back to console input for directory selection")
                    self.input_dir = self._prompt_directory("input directory", default_input_dir)
            else:
                self.input_dir = self._prompt_directory("input directory", default_input_dir)

        if not self.input_dir.exists():
            raise ValueError(f"Input directory {self.input_dir} does not exist")

        # Handle output file selection
        if output_file:
            self.output_file = Path(output_file)
        else:
            if TKINTER_AVAILABLE and os.environ.get('DISPLAY'):
                try:
                    root = tk.Tk()
                    root.withdraw()
                    selected_file = filedialog.asksaveasfilename(
                        title="Select Output File",
                        defaultextension=".jsonl",
                        filetypes=[("JSON Lines", "*.jsonl"), ("All files", "*.*")]
                    )
                    self.output_file = Path(selected_file or default_output_file)
                    root.destroy()
                except Exception:
                    logging.warning("Falling back to console input for output file selection")
                    self.output_file = self._prompt_directory("output file", default_output_file, is_file=True)
            else:
                self.output_file = self._prompt_directory("output file", default_output_file, is_file=True)

        self.max_tokens = max_tokens
        self.min_chunk_size = min_chunk_size
        self.overlap = overlap
        self.target_language = target_language
        self.min_text_quality = min_text_quality
        self.enable_ocr = enable_ocr
        self.num_workers = num_workers or os.cpu_count() // 2 or 1
        self.seen_hashes = self._load_existing_hashes()
        self._setup_encoder()

    def _setup_encoder(self):
        """Lazy load the tokenizer only when needed"""
        if self.max_tokens:
            import tiktoken
            self.encoder = tiktoken.get_encoding("cl100k_base")
        else:
            self.encoder = None

    def _prompt_directory(self, dir_type: str, default: str, is_file: bool = False) -> Path:
        """Prompt user for directory or file path via console with a default option."""
        prompt = f"Enter {dir_type} (default: {default}): "
        user_input = input(prompt).strip()
        path = Path(user_input or default)

        if is_file:
            # Ensure parent directory exists for output file
            path.parent.mkdir(parents=True, exist_ok=True)
        else:
            # Ensure input directory exists
            path.mkdir(parents=True, exist_ok=True)

        return path

    def _load_existing_hashes(self) -> set:
        """Prevent duplicate processing by loading already written hashes from output file."""
        seen = set()
        if self.output_file.exists():
            logging.info("Resuming from previous output...")
            with open(self.output_file, 'r', encoding='utf-8') as f:
                for line in f:
                    try:
                        record = json.loads(line)
                        text = record.get("text", "")
                        hash_ = self.generate_content_hash(text)
                        seen.add(hash_)
                    except Exception:
                        continue
        return seen

    def calculate_text_quality(self, text: str) -> float:
        """Score ``text`` from three character/word ratios.

        The score is a weighted sum: 40% share of alphabetic characters,
        40% share of purely alphabetic words, and 20% share of characters
        that are not punctuation/symbols. Empty text, or text without any
        word, scores 0.0.
        """
        n_chars = len(text)
        if not n_chars:
            return 0.0

        tokens = re.findall(r'\b\w+\b', text)
        if not tokens:
            return 0.0

        n_alpha_chars = len([ch for ch in text if ch.isalpha()])
        n_alpha_words = len([tok for tok in tokens if tok.isalpha()])
        n_symbols = len(re.findall(r'[^\w\s]', text))

        alpha_ratio = n_alpha_chars / n_chars
        word_ratio = n_alpha_words / len(tokens)
        special_ratio = 1 - (n_symbols / n_chars)

        return (alpha_ratio * 0.4 + word_ratio * 0.4 + special_ratio * 0.2)

    def detect_language(self, text: str) -> Optional[str]:
        """Efficient language detection using sampling"""
        sample_text = text[:2000]  # Use first 2000 characters for detection
        if len(sample_text.strip()) < 10:
            return None
        try:
            return langdetect.detect(sample_text)
        except:
            return None

    def generate_content_hash(self, text: str) -> str:
        """Return an MD5 hex digest of ``text`` used for duplicate detection.

        Whitespace runs are collapsed, the ends trimmed and the text
        lower-cased first, so texts that differ only in spacing or case
        share a digest.
        """
        single_spaced = re.sub(r'\s+', ' ', text)
        canonical = single_spaced.strip().lower()
        digest = hashlib.md5(canonical.encode('utf-8'))
        return digest.hexdigest()

    def is_duplicate(self, text: str) -> bool:
        content_hash = self.generate_content_hash(text)
        return content_hash in self.seen_hashes

    def clean_text(self, text: str) -> str:
        """Optimized text cleaning with compiled regex patterns"""
        if not text:
            return ""

        # Compile regex patterns once
        if not hasattr(self, 'clean_patterns'):
            self.clean_patterns = {
                'whitespace': re.compile(r'\s+'),
                'hyphen_newline': re.compile(r'-\n'),
                'newline': re.compile(r'\n'),
                'control_chars': re.compile(r'[\x00-\x1f\x7f-\x9f]'),
                'form_feed': re.compile(r'\x0c'),
                'header_footer': re.compile(r'^\d+\s+\w+\s+\d+$', flags=re.MULTILINE)
            }

        text = self.clean_patterns['hyphen_newline'].sub('', text)
        text = self.clean_patterns['header_footer'].sub('', text)
        text = self.clean_patterns['control_chars'].sub('', text)
        text = self.clean_patterns['form_feed'].sub('', text)
        text = self.clean_patterns['newline'].sub(' ', text)
        text = self.clean_patterns['whitespace'].sub(' ', text).strip()
        return text

    def extract_pdf_text(self, pdf_path: Path) -> Tuple[str, bool]:
        """Extract text from PDF using PyMuPDF with OCR fallback per page"""
        text = ""
        is_ocr = False

        try:
            doc = fitz.open(pdf_path)
            for page in doc:
                # First try text extraction
                page_text = page.get_text()
                if page_text.strip() and len(page_text) > 50:  # Valid text
                    text += self.clean_text(page_text) + "\n"
                elif self.enable_ocr:
                    # Use OCR only for this page
                    pix = page.get_pixmap()
                    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                    ocr_text = pytesseract.image_to_string(img)
                    cleaned_ocr = self.clean_text(ocr_text)
                    if cleaned_ocr:
                        text += cleaned_ocr + "\n"
                        is_ocr = True
            doc.close()
        except Exception as e:
            logging.warning(f"PDF processing failed for {pdf_path.name}: {e}")
            if self.enable_ocr:
                logging.info(f"Attempting full OCR for {pdf_path.name}")
                try:
                    doc = fitz.open(pdf_path)
                    for page in doc:
                        pix = page.get_pixmap()
                        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                        ocr_text = pytesseract.image_to_string(img)
                        text += self.clean_text(ocr_text) + "\n"
                    is_ocr = True
                except Exception as e2:
                    logging.warning(f"OCR failed for {pdf_path.name}: {e2}")

        return text, is_ocr

    def extract_docx_text(self, docx_path: Path) -> Tuple[str, bool]:
        text = ""
        try:
            doc = Document(docx_path)
            for para in doc.paragraphs:
                if para.text.strip():
                    text += self.clean_text(para.text) + "\n"
        except Exception as e:
            logging.warning(f"Failed to process {docx_path.name}: {e}")
        return text, False

    def extract_text(self, file_path: Path) -> Tuple[str, bool, str]:
        ext = file_path.suffix.lower()
        if ext == '.pdf':
            text, is_ocr = self.extract_pdf_text(file_path)
            return text, is_ocr, 'pdf'
        elif ext in ('.docx', '.doc'):
            text, is_ocr = self.extract_docx_text(file_path)
            return text, is_ocr, 'docx'
        return "", False, 'unknown'

    def chunk_text(self, text: str) -> List[str]:
        """Split ``text`` into chunks of roughly at most ``max_tokens`` tokens.

        Paragraphs (separated by a blank line) are packed greedily into the
        current chunk. A paragraph that alone exceeds ``max_tokens`` is
        split into sentences, which are packed the same way. When a chunk is
        closed, its last paragraph (or its last one to three items, in the
        sentence branch) is carried into the next chunk as overlap. Chunks
        shorter than ``min_chunk_size`` characters are not emitted.

        Returns:
            The list of chunk strings. When ``max_tokens`` is falsy the whole
            text is returned as a single chunk; blank text yields ``[]``.

        NOTE(review): ``self.overlap`` is not read here; the overlap size is
        fixed by the code below.
        NOTE(review): after the overlap is carried over, the new chunk is not
        re-checked against ``max_tokens``, so a chunk can exceed the limit. A
        single sentence longer than ``max_tokens`` is kept whole.
        """
        # No token limit (or nothing to chunk): return the text unsplit
        if not self.max_tokens or not text.strip():
            return [text] if text.strip() else []

        # Split into paragraphs first
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        chunks = []
        current_chunk = []        # items (paragraphs/sentences) of the chunk being built
        current_token_count = 0   # running token total of current_chunk

        for para in paragraphs:
            para_tokens = self.encoder.encode(para) if self.encoder else []
            para_token_count = len(para_tokens)

            # If paragraph is too big, split into sentences
            if para_token_count > self.max_tokens:
                # Split after ., ! or ? followed by whitespace
                sentences = re.split(r'(?<=[.!?])\s+', para)
                for sentence in sentences:
                    if not sentence.strip():
                        continue
                    sent_tokens = self.encoder.encode(sentence) if self.encoder else []
                    sent_token_count = len(sent_tokens)

                    # Add sentence to current chunk if it fits
                    if current_token_count + sent_token_count <= self.max_tokens:
                        current_chunk.append(sentence)
                        current_token_count += sent_token_count
                    else:
                        # Finalize current chunk
                        if current_chunk:
                            chunk_text = ' '.join(current_chunk)
                            # Too-short chunks are dropped, but still seed the overlap below
                            if len(chunk_text) >= self.min_chunk_size:
                                chunks.append(chunk_text)

                            # Start new chunk with overlap
                            overlap_sents = current_chunk[-min(len(current_chunk), 3):]  # Last 1-3 sentences
                            current_chunk = overlap_sents + [sentence]
                            # Recount tokens of the carried-over items plus the new sentence
                            current_token_count = sum(len(self.encoder.encode(s)) for s in current_chunk)
                        else:
                            # Empty chunk and the sentence alone is over the limit: keep it whole
                            current_chunk = [sentence]
                            current_token_count = sent_token_count
            else:
                # Add entire paragraph to current chunk
                if current_token_count + para_token_count <= self.max_tokens:
                    current_chunk.append(para)
                    current_token_count += para_token_count
                else:
                    # Finalize current chunk
                    if current_chunk:
                        chunk_text = '\n\n'.join(current_chunk)
                        if len(chunk_text) >= self.min_chunk_size:
                            chunks.append(chunk_text)

                        # Start new chunk with overlap
                        overlap_paras = [current_chunk[-1]] if current_chunk else []
                        current_chunk = overlap_paras + [para]
                        current_token_count = para_token_count + (len(self.encoder.encode(overlap_paras[0])) if overlap_paras else 0)
                    else:
                        current_chunk = [para]
                        current_token_count = para_token_count

        # Add final chunk (joined with blank lines, whatever kind of items it holds)
        if current_chunk:
            chunk_text = '\n\n'.join(current_chunk)
            if len(chunk_text) >= self.min_chunk_size:
                chunks.append(chunk_text)

        return chunks

    def process_single_file(self, file_path: Path, stats: Dict[str, int]) -> List[Dict]:
        """Process a single file and return chunks"""
        try:
            text, is_ocr, file_type = self.extract_text(file_path)
            if not text.strip():
                return []

            # Update file type stats
            if file_type == 'pdf':
                stats["pdf_files"] += 1
            elif file_type == 'docx':
                stats["docx_files"] += 1

            # Language detection
            lang = None
            if self.target_language:
                lang = self.detect_language(text)
                if lang != self.target_language:
                    stats["language_filtered"] += 1
                    return []

            # Chunk text
            chunks = self.chunk_text(text)
            records = []

            for chunk in chunks:
                # Skip duplicates and low-quality chunks
                if self.is_duplicate(chunk):
                    stats["duplicates_removed"] += 1
                    continue

                quality_score = self.calculate_text_quality(chunk)
                if quality_score < self.min_text_quality:
                    continue

                # Generate record
                records.append({
                    "text": chunk,
                    "metadata": {
                        "source": file_path.name,
                        "file_type": file_type,
                        "language": lang,
                        "quality_score": quality_score,
                        "ocr_used": is_ocr,
                        "chunk_length": len(chunk),
                        "token_count": len(self.encoder.encode(chunk)) if self.encoder else None
                    }
                })

                # Update seen hashes
                content_hash = self.generate_content_hash(chunk)
                self.seen_hashes.add(content_hash)

            if records:
                stats["processed"] += 1
                if is_ocr:
                    stats["ocr_used"] += 1

            return records
        except Exception as e:
            logging.error(f"Error processing {file_path.name}: {e}")
            return []

    def process_documents(self):
        # Collect files
        pdf_files = list(self.input_dir.rglob("*.pdf"))
        docx_files = list(self.input_dir.rglob("*.docx"))
        files = pdf_files + docx_files
        if not files:
            logging.warning(f"No PDF or Word files found in {self.input_dir}")
            return

        stats = {
            "total_files": len(files),
            "processed": 0,
            "language_filtered": 0,
            "duplicates_removed": 0,
            "ocr_used": 0,
            "pdf_files": 0,
            "docx_files": 0
        }

        # Prepare output file
        self.output_file.parent.mkdir(parents=True, exist_ok=True)

        # Parallel processing with thread pool
        with open(self.output_file, 'a', encoding='utf-8') as outfile:
            if self.num_workers > 1:
                with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_workers) as executor:
                    # Process files in parallel
                    future_to_file = {
                        executor.submit(self.process_single_file, file, stats): file
                        for file in files
                    }

                    # Create progress bar
                    iterator = tqdm(
                        concurrent.futures.as_completed(future_to_file),
                        total=len(files),
                        desc="Processing Documents"
                    ) if USE_TQDM else concurrent.futures.as_completed(future_to_file)

                    # Collect results
                    for future in iterator:
                        records = future.result()
                        for record in records:
                            outfile.write(json.dumps(record, ensure_ascii=False) + '\n')
            else:
                # Sequential processing
                iterator = tqdm(files, desc="Processing Documents") if USE_TQDM else files
                for file in iterator:
                    records = self.process_single_file(file, stats)
                    for record in records:
                        outfile.write(json.dumps(record, ensure_ascii=False) + '\n')

        logging.info("\n✅ Processing Complete")
        for key, val in stats.items():
            logging.info(f"- {key.replace('_', ' ').capitalize()}: {val}")
        logging.info(f"\n📝 Output saved to: {self.output_file}")

if __name__ == "__main__":
    # No input_dir/output_file is passed, so the constructor asks for both.
    extractor = EnhancedDocumentExtractor(
        # Chunking
        max_tokens=512,
        min_chunk_size=100,
        overlap=50,
        # Filtering
        target_language="en",
        min_text_quality=0.65,
        # Extraction / execution
        enable_ocr=True,
        num_workers=4,
    )
    extractor.process_documents()

Enter input directory (default: ./documents): /content/phyctex
Enter output file (default: ./pretraining_data.jsonl): /content/phyctex/physic2.jsonl


Processing Documents: 100%|██████████| 5/5 [04:29<00:00, 53.88s/it]


# **CLEANING THE EXTRACTED DATA**

# **INSTALLING**

# **TRYING**

In [16]:
import json
import re
import time
import os
import threading
import hashlib
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import spacy
from language_tool_python import LanguageTool

# Per-thread storage: each worker thread keeps its own TextCleaner here
# (assigned in init_worker, read in process_line).
thread_local = threading.local()

class TextCleaner:
    def __init__(self, enable_grammar=False):
        """Initialize with models for Colab"""
        self.enable_grammar = enable_grammar
        self.nlp = None
        self.tool = None

        # Precompile regex patterns for efficiency
        self.patterns = {
            'non_ascii': re.compile(r'[^\x00-\x7F]+'),
            'zero_width': re.compile(r'[\u200b-\u200d\uFEFF]'),
            'control_chars': re.compile(r'[\x00-\x1f\x7f-\x9f]'),
            'quotes': re.compile(r'[‘’]'),
            'double_quotes': re.compile(r'[“”]'),
            'symbols': re.compile(r"[^a-zA-Z0-9\s.,!?;:'\"-]"),
            'whitespace': re.compile(r'\s+'),
            'repeated_words': re.compile(r'\b(\w+)\s+\1\b', re.I),
            'sentence_end': re.compile(r'[.!?]$'),
            'sentence_start': re.compile(r'^\s*[a-z]')
        }

    def _load_models(self):
        """Lazy load models when needed with proper pipeline setup"""
        if self.nlp is None:
            # Load with minimal components and add sentencizer
            self.nlp = spacy.load("en_core_web_sm", disable=["parser", "ner", "lemmatizer", "tagger"])
            if "sentencizer" not in self.nlp.pipe_names:
                self.nlp.add_pipe("sentencizer")

        if self.tool is None and self.enable_grammar:
            self.tool = LanguageTool('en-US', config={'cacheSize': 1000, 'pipelineCaching': True})

    def clean_text(self, text):
        """Optimized text cleaning with precompiled regex"""
        if not text or not isinstance(text, str):
            return ""

        # Apply regex substitutions in sequence
        text = self.patterns['non_ascii'].sub(' ', text)
        text = self.patterns['zero_width'].sub('', text)
        text = self.patterns['control_chars'].sub('', text)
        text = self.patterns['quotes'].sub("'", text)
        text = self.patterns['double_quotes'].sub('"', text)
        text = self.patterns['symbols'].sub(' ', text)
        text = self.patterns['whitespace'].sub(' ', text).strip()

        return text

    def fix_grammar(self, text):
        """Grammar correction with caching and length limits"""
        if not self.enable_grammar or len(text.split()) < 3 or len(text) > 5000:
            return text

        try:
            # Use caching and limit text size for performance
            cache_key = hashlib.md5(text.encode()).hexdigest()
            if not hasattr(self, 'grammar_cache'):
                self.grammar_cache = {}

            if cache_key in self.grammar_cache:
                return self.grammar_cache[cache_key]

            matches = self.tool.check(text)
            if matches:
                corrected = self.tool.correct(text)
                self.grammar_cache[cache_key] = corrected
                return corrected
            return text
        except:
            return text

    def fix_sentences(self, text):
        """Efficient sentence structure improvement with fallback"""
        if not text:
            return ""

        self._load_models()
        if not self.nlp:
            return text

        try:
            # Process in chunks for long texts
            if len(text) > 10000:
                chunks = [text[i:i+10000] for i in range(0, len(text), 10000)]
                return " ".join(self.fix_sentences(chunk) for chunk in chunks)

            doc = self.nlp(text)
            sentences = []

            for sent in doc.sents:
                sent_text = sent.text.strip()
                if not sent_text:
                    continue

                # Efficient sentence formatting
                if not self.patterns['sentence_end'].search(sent_text):
                    sent_text += '.'
                if self.patterns['sentence_start'].search(sent_text):
                    sent_text = sent_text[0].upper() + sent_text[1:]

                sentences.append(sent_text)

            return ' '.join(sentences)
        except Exception as e:
            print(f"Sentence fixing failed: {e}")
            # Fallback to simple sentence splitting
            sentences = []
            for line in re.split(r'(?<=[.!?])\s+', text):
                line = line.strip()
                if line:
                    if line[0].islower():
                        line = line[0].upper() + line[1:]
                    if not line.endswith(('.', '!', '?')):
                        line += '.'
                    sentences.append(line)
            return ' '.join(sentences)

    def check_quality(self, text):
        """Fast quality checks with early termination"""
        if not text.strip():
            return False

        words = text.split()
        word_count = len(words)

        if word_count < 5:
            return False
        if len(set(map(str.lower, words))) < 3:
            return False
        if self.patterns['repeated_words'].search(text):
            return False
        return True

    def process_text(self, text):
        """Optimized text processing pipeline with better error handling"""
        if not text or not isinstance(text, str):
            return None

        cleaned = self.clean_text(text)

        # Early termination for low-quality text
        if not self.check_quality(cleaned):
            return None

        if self.enable_grammar:
            try:
                cleaned = self.fix_grammar(cleaned)
            except Exception as e:
                print(f"Grammar fixing failed: {e}")

        try:
            cleaned = self.fix_sentences(cleaned)
        except Exception as e:
            print(f"Sentence fixing failed: {e}")

        return cleaned if self.check_quality(cleaned) else None

def init_worker(enable_grammar):
    """Create this thread's own TextCleaner and keep it in thread-local storage."""
    cleaner = TextCleaner(enable_grammar)
    thread_local.cleaner = cleaner

def process_line(line):
    """Clean the 'text' field of one JSONL line using this thread's cleaner.

    Returns the parsed record with its 'text' replaced and a 'cleaned_at'
    timestamp added, or None when the line has no usable text, the text is
    rejected by the cleaner, or any error occurs (errors are printed).
    """
    try:
        record = json.loads(line)
        if 'text' not in record or not record['text']:
            return None

        cleaned = thread_local.cleaner.process_text(record['text'])
        if not cleaned:
            return None

        record['text'] = cleaned
        record['cleaned_at'] = time.time()
        return record
    except Exception as e:
        print(f"Error processing line: {e}")
        return None

def clean_jsonl(input_path, output_path, enable_grammar=False, num_workers=4):
    """Clean every record of a JSONL file in parallel and write the survivors.

    The input is read completely and closed before the output is opened, so
    ``output_path`` may be the same file as ``input_path`` (in-place
    cleaning) without losing data. Results are written in input order.

    Args:
        input_path: JSONL file to read.
        output_path: JSONL file to (over)write with the cleaned records.
        enable_grammar: Passed to each worker's cleaner via ``init_worker``.
        num_workers: Number of worker threads.

    Returns:
        ``(processed_count, skipped_count)``.
    """
    # Read once; the line count doubles as the progress-bar total
    with open(input_path, 'r', encoding='utf-8') as infile:
        lines = infile.readlines()

    processed_count = 0
    skipped_count = 0

    with open(output_path, 'w', encoding='utf-8') as outfile:
        with ThreadPoolExecutor(
            max_workers=num_workers,
            initializer=init_worker,
            initargs=(enable_grammar,)
        ) as executor:
            with tqdm(total=len(lines), desc="Processing Records") as pbar:
                # executor.map yields results in the order of ``lines``
                for result in executor.map(process_line, lines):
                    if result:
                        outfile.write(json.dumps(result) + '\n')
                        processed_count += 1
                    else:
                        skipped_count += 1
                    pbar.update(1)

    print(f"\nProcessed: {processed_count} records")
    print(f"Skipped: {skipped_count} records")
    return processed_count, skipped_count

# For Google Colab file handling
from google.colab import files

def main():
    """Interactive Colab workflow: get an input file, clean it, offer the download.

    Uses 'input.jsonl' when it exists, otherwise asks for an upload. The
    cleaned records are written to 'cleaned_output.jsonl'. When nothing
    survives cleaning, the first five input records are printed to help
    diagnose why.
    """
    print("🧼 Optimized Text Cleaner for Colab")

    # Configuration
    ENABLE_GRAMMAR = False  # Disabled by default due to performance impact
    NUM_WORKERS = 4          # Number of cleaning threads

    # File handling
    input_file = 'input.jsonl'
    if not os.path.exists(input_file):
        print("Please upload your input file:")
        uploaded = files.upload()
        if uploaded:
            # Use the first uploaded file
            input_file = next(iter(uploaded.keys()))
        else:
            print("No file uploaded!")
            return

    output_file = 'cleaned_output.jsonl'

    print("\n⚡ Processing your data...")
    start = time.time()

    # Process the file
    processed_count, skipped_count = clean_jsonl(
        input_file,
        output_file,
        enable_grammar=ENABLE_GRAMMAR,
        num_workers=NUM_WORKERS
    )

    duration = time.time() - start
    rate = processed_count / duration if duration > 0 else 0

    print(f"\n✅ Cleaning complete in {duration:.2f} seconds")
    print(f"Processing rate: {rate:.1f} records/sec")

    # Verify output
    output_size = os.path.getsize(output_file)
    if output_size == 0:
        print("\n⚠️ Warning: Output file is empty!")
        print("Possible reasons:")
        print("- All records were filtered out by quality checks")
        print("- Input file format is invalid")
        print("- No 'text' field found in input records")
        print("- Text cleaning removed all content")

        # Debug: Show first 5 records
        print("\nDebugging first 5 records:")
        with open(input_file, 'r', encoding='utf-8') as f:
            for i, line in enumerate(f):
                if i >= 5:
                    break
                try:
                    record = json.loads(line)
                    text = record.get('text', '')
                    print(f"Record {i+1}:")
                    print(f"  Original length: {len(text)}")
                    print(f"  First 100 chars: {text[:100]}")
                except Exception:
                    # Unparseable or unexpectedly shaped record; Exception
                    # (not a bare except) keeps Ctrl-C working here.
                    print(f"Record {i+1}: Invalid JSON")
    else:
        print(f"Output file size: {output_size/1024:.1f} KB")
        print("Download your cleaned file:")
        files.download(output_file)

# Run the interactive cleaning workflow when this cell/script is executed directly.
if __name__ == "__main__":
    main()

🧼 Optimized Text Cleaner for Colab
Please upload your input file:


Saving physic2(1).jsonl to physic2(1).jsonl

⚡ Processing your data...


Processing Records: 100%|██████████| 530/530 [00:11<00:00, 45.24it/s]


Processed: 434 records
Skipped: 96 records

✅ Cleaning complete in 11.74 seconds
Processing rate: 37.0 records/sec
Output file size: 952.2 KB
Download your cleaned file:





<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
!pip install python-docx language_tool_python

In [None]:
!pip install language_tool_python spacy
!python -m spacy download en_core_web_sm

In [None]:
!pip install language_tool_python spacy
!python -m spacy download en_core_web_sm

In [None]:
!pip install pytesseract