In [2]:
!pip install docx2txt


Collecting docx2txt
  Downloading docx2txt-0.9-py3-none-any.whl.metadata (529 bytes)
Downloading docx2txt-0.9-py3-none-any.whl (4.0 kB)
Installing collected packages: docx2txt
Successfully installed docx2txt-0.9


In [3]:
import os
from pathlib import Path
import fitz  # PyMuPDF
import docx2txt

# Step 1: Define the root folder containing all product folders
root_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files")
output_folder = root_folder / "processed"
individual_docs_folder = output_folder / "individual_docs"
individual_docs_folder.mkdir(parents=True, exist_ok=True)

# Step 2: Collect all PDF and DOCX files recursively
pdf_files = list(root_folder.rglob("*.pdf"))
docx_files = list(root_folder.rglob("*.docx"))
all_files = pdf_files + docx_files

# Step 3: Define functions to extract text
def extract_text_from_pdf(file_path):
    try:
        doc = fitz.open(file_path)
        text = "\n".join([page.get_text() for page in doc])
        doc.close()
        return text
    except Exception as e:
        print(f"❌ Error reading PDF {file_path}: {e}")
        return ""

def extract_text_from_docx(file_path):
    try:
        return docx2txt.process(file_path)
    except Exception as e:
        print(f"❌ Error reading DOCX {file_path}: {e}")
        return ""

# Step 4: Extract and save individual documents + build final corpus
merged_corpus = []
for i, file in enumerate(all_files):
    if file.suffix.lower() == ".pdf":
        text = extract_text_from_pdf(file)
    elif file.suffix.lower() == ".docx":
        text = extract_text_from_docx(file)
    else:
        continue

    if text.strip():
        # Save to individual .txt file
        clean_name = f"{i:04d}__{file.stem}".replace(" ", "_").replace("/", "_")
        individual_file_path = individual_docs_folder / f"{clean_name}.txt"
        with open(individual_file_path, "w", encoding="utf-8") as f:
            f.write(text.strip())

        # Append to merged corpus
        merged_corpus.append(text.strip())

# Step 5: Save merged corpus
merged_path = output_folder / "abb_corpus.txt"
with open(merged_path, "w", encoding="utf-8") as f:
    for doc in merged_corpus:
        f.write(doc + "\n\n")

print(f"✅ Preprocessing complete.")
print(f"📄 {len(merged_corpus)} documents processed.")
print(f"📁 Individual files saved in: {individual_docs_folder}")
print(f"📘 Merged corpus saved at: {merged_path}")


✅ Preprocessing complete.
📄 601 documents processed.
📁 Individual files saved in: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files\processed\individual_docs
📘 Merged corpus saved at: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files\processed\abb_corpus.txt


In [1]:
import os
from pathlib import Path
import fitz  # PyMuPDF
import docx2txt
import re

# Set paths
root_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files")
output_folder = root_folder / "processed_removedunwanted"
individual_docs_folder = output_folder / "individual_docs"
individual_docs_folder.mkdir(parents=True, exist_ok=True)

# Step 1: Collect all PDF and DOCX files recursively
pdf_files = list(root_folder.rglob("*.pdf"))
docx_files = list(root_folder.rglob("*.docx"))
all_files = pdf_files + docx_files

# Step 2: Define regex patterns for cleaning
noise_patterns = [
    r"^Contents\b.*",  # Table of contents header
    r"^Page\s+\d+",
    r"^ABB\s*$",
    r"^Wireless Controller.*",
    r"^Product version.*",
    r"^Issued.*",
    r"^Revision.*",
    r"^Figure\s+\d+\..*",
    r"^GUID-[\w\-]+",
    r"^© Copyright.*",
    r"^All rights reserved.*",
    r"^Trademarks.*",
    r"^Disclaimer.*",
    r"^\s*-+\s*$",
    r"^\s*$"
]
noise_regex = re.compile("|".join(noise_patterns), re.IGNORECASE)

# Step 3: Define text extractors
def extract_text_from_pdf(file_path):
    try:
        doc = fitz.open(file_path)
        text = "\n".join([page.get_text() for page in doc])
        doc.close()
        return text
    except Exception as e:
        print(f"❌ Error reading PDF {file_path}: {e}")
        return ""

def extract_text_from_docx(file_path):
    try:
        return docx2txt.process(file_path)
    except Exception as e:
        print(f"❌ Error reading DOCX {file_path}: {e}")
        return ""

def clean_text(text):
    cleaned_lines = []
    for line in text.splitlines():
        if not noise_regex.match(line.strip()):
            cleaned_lines.append(line.strip())
    return "\n".join(cleaned_lines).strip()

# Step 4: Process files, save clean individual and merged corpus
merged_corpus = []

for i, file in enumerate(all_files):
    if file.suffix.lower() == ".pdf":
        raw_text = extract_text_from_pdf(file)
    elif file.suffix.lower() == ".docx":
        raw_text = extract_text_from_docx(file)
    else:
        continue

    cleaned = clean_text(raw_text)
    if cleaned:
        # Save individual cleaned file
        name = f"{i:04d}__{file.stem}".replace(" ", "_").replace("/", "_")
        file_path = individual_docs_folder / f"{name}.txt"
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(cleaned)
        merged_corpus.append(cleaned)

# Step 5: Save final merged corpus
merged_path = output_folder / "abb_corpus.txt"
with open(merged_path, "w", encoding="utf-8") as f:
    for doc in merged_corpus:
        f.write(doc + "\n\n")

print(f"✅ Preprocessing complete.")
print(f"📄 {len(merged_corpus)} cleaned documents saved to: {individual_docs_folder}")
print(f"📘 Merged corpus saved at: {merged_path}")


✅ Preprocessing complete.
📄 601 cleaned documents saved to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files\processed_removedunwanted\individual_docs
📘 Merged corpus saved at: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files\processed_removedunwanted\abb_corpus.txt


In [None]:
import os
from pathlib import Path
import fitz  # PyMuPDF
import docx2txt
import re

# Set paths
root_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files")
root2_folder=Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data")
output_folder = root2_folder / "processed_removedunwanted_images"
individual_docs_folder = output_folder / "individual_docs"
images_folder = output_folder / "extracted_images"
individual_docs_folder.mkdir(parents=True, exist_ok=True)
images_folder.mkdir(parents=True, exist_ok=True)

# Collect all PDF and DOCX files
pdf_files = list(root_folder.rglob("*.pdf"))
docx_files = list(root_folder.rglob("*.docx"))
all_files = pdf_files + docx_files

# Regex patterns for cleaning unwanted text
noise_regex = re.compile("|".join(noise_patterns), re.IGNORECASE)

# Text extraction and cleaning
def extract_text_from_pdf(file_path):
    try:
        doc = fitz.open(file_path)
        text = "\n".join([page.get_text() for page in doc])
        doc.close()
        return text
    except Exception as e:
        print(f"❌ Error reading PDF {file_path}: {e}")
        return ""

def extract_text_from_docx(file_path):
    try:
        return docx2txt.process(file_path)
    except Exception as e:
        print(f"❌ Error reading DOCX {file_path}: {e}")
        return ""

def clean_text(text):
    cleaned_lines = [line.strip() for line in text.splitlines() if not noise_regex.match(line.strip())]
    return "\n".join(cleaned_lines).strip()

# Image extractor from PDFs
def extract_images_from_pdf(pdf_path, base_name):
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            for img_index, img in enumerate(doc.get_page_images(page_num)):
                xref = img[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                image_ext = base_image["ext"]
                image_path = images_folder / f"{base_name}_p{page_num+1}_img{img_index+1}.{image_ext}"
                with open(image_path, "wb") as f:
                    f.write(image_bytes)
        doc.close()
    except Exception as e:
        print(f"⚠️ Error extracting image from {pdf_path}: {e}")

# Process all documents
merged_corpus = []

for i, file in enumerate(all_files):
    name = f"{i:04d}__{file.stem}".replace(" ", "_").replace("/", "_")

    if file.suffix.lower() == ".pdf":
        raw_text = extract_text_from_pdf(file)
        extract_images_from_pdf(file, name)
    elif file.suffix.lower() == ".docx":
        raw_text = extract_text_from_docx(file)
    else:
        continue

    cleaned = clean_text(raw_text)
    if cleaned:
        txt_path = individual_docs_folder / f"{name}.txt"
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(cleaned)
        merged_corpus.append(cleaned)

# Save merged corpus
merged_path = output_folder / "abb_corpus.txt"
with open(merged_path, "w", encoding="utf-8") as f:
    for doc in merged_corpus:
        f.write(doc + "\n\n")

# Summary
print(f"✅ Preprocessing complete.")
print(f"📄 {len(merged_corpus)} cleaned documents saved to: {individual_docs_folder}")
print(f"🖼️ Extracted images saved to: {images_folder}")
print(f"📘 Merged corpus saved at: {merged_path}")


✅ Preprocessing complete.
📄 601 cleaned documents saved to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_removedunwanted_images\individual_docs
🖼️ Extracted images saved to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_removedunwanted_images\extracted_images
📘 Merged corpus saved at: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_removedunwanted_images\abb_corpus.txt


In [10]:
import os
from pathlib import Path
import fitz  # PyMuPDF
import docx2txt
import re

# Set paths
root_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files")
root2_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data")
output_folder = root2_folder / "processed_filtered"
individual_docs_folder = output_folder / "individual_docs"
images_folder = output_folder / "extracted_images"
individual_docs_folder.mkdir(parents=True, exist_ok=True)
images_folder.mkdir(parents=True, exist_ok=True)

# Noise patterns
noise_patterns = [
    r"^Contents\b.*", r"^Page\s+\d+", r"^ABB\s*$",
    r"^Product version.*", r"^Issued.*", r"^Revision.*",
    r"^Figure\s+\d+\..*", r"^GUID-[\w\-]+",
    r"^© Copyright.*", r"^All rights reserved.*",
    r"^Trademarks.*", r"^Disclaimer.*",
    r"^\s*-+\s*$", r"^\s*$"
]
noise_regex = re.compile("|".join(noise_patterns), re.IGNORECASE)

def is_near_image(block_rect, image_rects, margin=30):
    """Check if block overlaps or is very close to any image"""
    for img in image_rects:
        if (block_rect.intersects(img) or
            abs(block_rect.y0 - img.y1) < margin or
            abs(block_rect.y1 - img.y0) < margin):
            return True
    return False

def extract_clean_text_from_pdf(file_path):
    """Extract text from PDF excluding regions near images"""
    text_blocks = []
    try:
        doc = fitz.open(file_path)
        for page in doc:
            image_rects = [fitz.Rect(img[1:5]) for img in page.get_images(full=True)]
            for block in page.get_text("blocks"):
                rect = fitz.Rect(block[:4])
                text = block[4].strip()
                if not is_near_image(rect, image_rects) and text and not noise_regex.match(text):
                    text_blocks.append(text)
        doc.close()
        return "\n".join(text_blocks)
    except Exception as e:
        print(f"❌ Error processing {file_path}: {e}")
        return ""

def extract_text_from_docx(file_path):
    try:
        return docx2txt.process(file_path)
    except Exception as e:
        print(f"❌ Error reading DOCX {file_path}: {e}")
        return ""

def extract_images_from_pdf(pdf_path, base_name):
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            for img_index, img in enumerate(doc.get_page_images(page_num)):
                xref = img[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                image_ext = base_image["ext"]
                image_path = images_folder / f"{base_name}_p{page_num+1}_img{img_index+1}.{image_ext}"
                with open(image_path, "wb") as f:
                    f.write(image_bytes)
        doc.close()
    except Exception as e:
        print(f"⚠️ Error extracting image from {pdf_path}: {e}")

# Process files
pdf_files = list(root_folder.rglob("*.pdf"))
docx_files = list(root_folder.rglob("*.docx"))
all_files = pdf_files + docx_files
merged_corpus = []

for i, file in enumerate(all_files):
    name = f"{i:04d}__{file.stem}".replace(" ", "_").replace("/", "_")

    if file.suffix.lower() == ".pdf":
        raw_text = extract_clean_text_from_pdf(file)
        extract_images_from_pdf(file, name)
    elif file.suffix.lower() == ".docx":
        raw_text = extract_text_from_docx(file)
    else:
        continue

    if raw_text.strip():
        txt_path = individual_docs_folder / f"{name}.txt"
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(raw_text.strip())
        merged_corpus.append(raw_text.strip())

# Save final merged corpus
merged_path = output_folder / "abb_corpus.txt"
with open(merged_path, "w", encoding="utf-8") as f:
    for doc in merged_corpus:
        f.write(doc + "\n\n")

print(f"✅ Done. Cleaned text saved to {individual_docs_folder}")
print(f"🖼️ Images saved to {images_folder}")
print(f"📘 Merged corpus: {merged_path}")


✅ Done. Cleaned text saved to C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_filtered\individual_docs
🖼️ Images saved to C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_filtered\extracted_images
📘 Merged corpus: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_filtered\abb_corpus.txt


In [None]:
import os
from pathlib import Path
import fitz  # PyMuPDF
import docx2txt
import re

# Set paths
root_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files")
root2_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data")
output_folder = root2_folder / "processed_removed2words_new"
individual_docs_folder = output_folder / "individual_docs"
images_folder = output_folder / "extracted_images"
individual_docs_folder.mkdir(parents=True, exist_ok=True)
images_folder.mkdir(parents=True, exist_ok=True)

# Noise patterns
noise_patterns = [
    r"^Contents\\b.*", r"^Page\\s+\\d+", r"^ABB\\s*$",
    r"^Product version.*", r"^Issued.*", r"^Revision.*",
    r"^Figure\\s+\\d+\\..*", r"^GUID-[\\w\\-]+",
    r"^\\u00a9 Copyright.*", r"^All rights reserved.*",
    r"^Trademarks.*", r"^Disclaimer.*",
    r"^\s*-+\s*$", r"^\s*$"
]
noise_regex = re.compile("|".join(noise_patterns), re.IGNORECASE)

def is_near_image(block_rect, image_rects, margin=30):
    for img in image_rects:
        if (block_rect.intersects(img) or
            abs(block_rect.y0 - img.y1) < margin or
            abs(block_rect.y1 - img.y0) < margin):
            return True
    return False

def is_meaningful(text, min_word_count=3):
    return len([w for w in text.split() if w.isalpha()]) >= min_word_count

def extract_clean_text_from_pdf(file_path):
    """Extract text from PDF excluding regions near images and low-quality lines."""
    text_blocks = []
    try:
        doc = fitz.open(file_path)
        for page in doc:
            image_rects = [fitz.Rect(img[1:5]) for img in page.get_images(full=True)]
            for block in page.get_text("blocks"):
                rect = fitz.Rect(block[:4])
                text = block[4].strip()
                
                # Skip image-area or empty block
                if is_near_image(rect, image_rects) or not text:
                    continue
                
                # Apply strict quality filters to each line in the block
                for line in text.splitlines():
                    line = line.strip()
                    if len(line) < 5:
                        continue  # Skip very short lines
                    words = line.split()
                    word_count = len(words)
                    if word_count < 4:
                        continue  # Skip short lines (likely table or figure fragments)
                    alpha_chars = sum(c.isalpha() for c in line)
                    total_chars = len(line)
                    if total_chars == 0 or (alpha_chars / total_chars) < 0.5:
                        continue  # Skip lines with mostly symbols/numbers
                    if not noise_regex.match(line):  # Skip known boilerplate patterns
                        text_blocks.append(line)
        doc.close()
        return "\n".join(text_blocks)
    except Exception as e:
        print(f"❌ Error processing {file_path}: {e}")
        return ""


def extract_text_from_docx(file_path):
    try:
        return docx2txt.process(file_path)
    except Exception as e:
        print(f"❌ Error reading DOCX {file_path}: {e}")
        return ""

def extract_images_from_pdf(pdf_path, base_name):
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            for img_index, img in enumerate(doc.get_page_images(page_num)):
                xref = img[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                image_ext = base_image["ext"]
                image_path = images_folder / f"{base_name}_p{page_num+1}_img{img_index+1}.{image_ext}"
                with open(image_path, "wb") as f:
                    f.write(image_bytes)
        doc.close()
    except Exception as e:
        print(f"⚠️ Error extracting image from {pdf_path}: {e}")

# Process files
pdf_files = list(root_folder.rglob("*.pdf"))
docx_files = list(root_folder.rglob("*.docx"))
all_files = pdf_files + docx_files
merged_corpus = []

for i, file in enumerate(all_files):
    name = f"{i:04d}__{file.stem}".replace(" ", "_").replace("/", "_")

    if file.suffix.lower() == ".pdf":
        raw_text = extract_clean_text_from_pdf(file)
        extract_images_from_pdf(file, name)
    elif file.suffix.lower() == ".docx":
        raw_text = extract_text_from_docx(file)
    else:
        continue

    if raw_text.strip():
        txt_path = individual_docs_folder / f"{name}.txt"
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(raw_text.strip())
        merged_corpus.append(raw_text.strip())

# Save final merged corpus
merged_path = output_folder / "abb_corpus.txt"
with open(merged_path, "w", encoding="utf-8") as f:
    for doc in merged_corpus:
        f.write(doc + "\n\n")

print(f"✅ Done. Cleaned text saved to {individual_docs_folder}")
print(f"🖼️ Images saved to {images_folder}")
print(f"📘 Merged corpus: {merged_path}")


✅ Done. Cleaned text saved to C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_removed2words_new\individual_docs
🖼️ Images saved to C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_removed2words_new\extracted_images
📘 Merged corpus: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_removed2words_new\abb_corpus.txt


In [None]:
import os
from pathlib import Path
import fitz  # PyMuPDF
import docx2txt
import re

# Paths
root_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files")
root2_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data")
output_folder = root2_folder / "processed_final"
individual_docs_folder = output_folder / "individual_docs"
images_folder = output_folder / "extracted_images"
individual_docs_folder.mkdir(parents=True, exist_ok=True)
images_folder.mkdir(parents=True, exist_ok=True)

# Noise patterns
noise_patterns = [
    r"^Contents\b.*", r"^Page\s+\d+", r"^ABB\s*$",
    r"^Product version.*", r"^Issued.*", r"^Revision.*",
    r"^Figure\s+\d+\..*", r"^GUID-[\w\-]+", r"^© Copyright.*",
    r"^All rights reserved.*", r"^Trademarks.*", r"^Disclaimer.*",
    r"^http[s]?://.*", r"^\s*-+\s*$", r"^\s*$"
]
noise_regex = re.compile("|".join(noise_patterns), re.IGNORECASE)

# Utility functions
def is_near_image(block_rect, image_rects, margin=30):
    for img in image_rects:
        if (block_rect.intersects(img) or
            abs(block_rect.y0 - img.y1) < margin or
            abs(block_rect.y1 - img.y0) < margin):
            return True
    return False

def is_meaningful(line):
    words = line.split()
    if len(words) < 4:
        return False
    alpha_ratio = sum(c.isalpha() for c in line) / len(line)
    return alpha_ratio > 0.5

# Main PDF text extractor
def extract_clean_text_from_pdf(file_path):
    text_blocks = []
    try:
        doc = fitz.open(file_path)
        first_heading = None
        for page_num, page in enumerate(doc):
            if page_num == 0:
                # Capture the title on the first page
                blocks = page.get_text("dict")["blocks"]
                titles = [
                    block["lines"][0]["spans"][0]["text"].strip()
                    for block in blocks
                    if block.get("lines") and block["lines"][0]["spans"][0]["size"] > 15  # Likely headings
                ]
                if titles:
                    first_heading = " ".join(titles).strip()
            
            image_rects = [fitz.Rect(img[1:5]) for img in page.get_images(full=True)]
            for block in page.get_text("blocks"):
                rect = fitz.Rect(block[:4])
                text = block[4].strip()
                if is_near_image(rect, image_rects) or not text:
                    continue
                for line in text.splitlines():
                    line = line.strip()
                    if noise_regex.match(line):
                        continue
                    if is_meaningful(line):
                        text_blocks.append(line)
        doc.close()
        if first_heading:
            return f"{first_heading}\n\n" + "\n".join(text_blocks)
        return "\n".join(text_blocks)
    except Exception as e:
        print(f"❌ Error processing {file_path}: {e}")
        return ""

# DOCX extractor
def extract_text_from_docx(file_path):
    try:
        return docx2txt.process(file_path)
    except Exception as e:
        print(f"❌ Error reading DOCX {file_path}: {e}")
        return ""

# Image extractor
def extract_images_from_pdf(pdf_path, base_name):
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            for img_index, img in enumerate(doc.get_page_images(page_num)):
                xref = img[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                image_ext = base_image["ext"]
                image_path = images_folder / f"{base_name}_p{page_num+1}_img{img_index+1}.{image_ext}"
                with open(image_path, "wb") as f:
                    f.write(image_bytes)
        doc.close()
    except Exception as e:
        print(f"⚠️ Error extracting image from {pdf_path}: {e}")

# Process all files
all_files = list(root_folder.rglob("*.pdf")) + list(root_folder.rglob("*.docx"))
merged_corpus = []

for i, file in enumerate(all_files):
    name = f"{i:04d}__{file.stem}".replace(" ", "_").replace("/", "_")
    
    if file.suffix.lower() == ".pdf":
        raw_text = extract_clean_text_from_pdf(file)
        extract_images_from_pdf(file, name)
    elif file.suffix.lower() == ".docx":
        raw_text = extract_text_from_docx(file)
    else:
        continue

    if raw_text.strip():
        txt_path = individual_docs_folder / f"{name}.txt"
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(raw_text.strip())
        merged_corpus.append(raw_text.strip())

# Save final merged corpus
merged_path = output_folder / "abb_corpus.txt"
with open(merged_path, "w", encoding="utf-8") as f:
    for doc in merged_corpus:
        f.write(doc + "\n\n")

print(f"✅ Final preprocessing complete.")
print(f"📄 Individual text files saved to: {individual_docs_folder}")
print(f"🖼️ Images extracted to: {images_folder}")
print(f"📘 Merged corpus written to: {merged_path}")


✅ Final preprocessing complete.
📄 Individual text files saved to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final\individual_docs
🖼️ Images extracted to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final\extracted_images
📘 Merged corpus written to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final\abb_corpus.txt


: 

In [2]:
!pip install pdfplumber

Collecting pdfplumber
  Downloading pdfplumber-0.11.6-py3-none-any.whl.metadata (42 kB)
Collecting pdfminer.six==20250327 (from pdfplumber)
  Downloading pdfminer_six-20250327-py3-none-any.whl.metadata (4.1 kB)
Collecting pypdfium2>=4.18.0 (from pdfplumber)
  Downloading pypdfium2-4.30.1-py3-none-win_amd64.whl.metadata (48 kB)
Downloading pdfplumber-0.11.6-py3-none-any.whl (60 kB)
Downloading pdfminer_six-20250327-py3-none-any.whl (5.6 MB)
   ---------------------------------------- 0.0/5.6 MB ? eta -:--:--
   --- ------------------------------------ 0.5/5.6 MB 2.4 MB/s eta 0:00:03
   ------------- -------------------------- 1.8/5.6 MB 4.8 MB/s eta 0:00:01
   -------------------------- ------------- 3.7/5.6 MB 6.4 MB/s eta 0:00:01
   ------------------------------------- -- 5.2/5.6 MB 6.6 MB/s eta 0:00:01
   ---------------------------------------- 5.6/5.6 MB 6.5 MB/s eta 0:00:00
Downloading pypdfium2-4.30.1-py3-none-win_amd64.whl (3.0 MB)
   ---------------------------------------- 0.

In [8]:
import os
from pathlib import Path
import fitz  # PyMuPDF
import docx2txt
import re
import csv
from io import StringIO
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Check PyMuPDF version
if not hasattr(fitz.Page, "find_tables"):
    logger.error("PyMuPDF version does not support table extraction. Please upgrade to version 1.23.0 or higher.")
    raise ImportError("PyMuPDF version too old. Run 'pip install --upgrade pymupdf'.")

# Paths
root_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\abb_products_pdf_files")
root2_folder = Path(r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data")
output_folder = root2_folder / "processed_final_grok"
individual_docs_folder = output_folder / "individual_docs"
images_folder = output_folder / "extracted_images"
individual_docs_folder.mkdir(parents=True, exist_ok=True)
images_folder.mkdir(parents=True, exist_ok=True)

# Noise patterns
noise_patterns = [
    r"^Contents\b.*", r"^Page\s+\d+", r"^ABB\s*$",
    r"^Product version.*", r"^Issued.*", r"^Revision.*",
    r"^Figure\s+\d+\..*", r"^GUID-[\w\-]+", r"^© Copyright.*",
    r"^All rights reserved.*", r"^Trademarks.*", r"^Disclaimer.*",
    r"^http[s]?://.*", r"^\s*-+\s*$", r"^\s*$"
]
noise_regex = re.compile("|".join(noise_patterns), re.IGNORECASE)

# Utility functions
def is_near_image(block_rect, image_rects, margin=30):
    for img in image_rects:
        if (block_rect.intersects(img) or
            abs(block_rect.y0 - img.y1) < margin or
            abs(block_rect.y1 - img.y0) < margin):
            return True
    return False

def is_meaningful(line):
    words = line.split()
    if len(words) < 4:
        return False
    alpha_ratio = sum(c.isalpha() for c in line) / len(line)
    return alpha_ratio > 0.5

def extract_text_from_table_region(page, table_bbox):
    """Extract text from table region using text blocks method as fallback."""
    try:
        # Get text blocks within the table bounding box
        table_rect = fitz.Rect(table_bbox)
        blocks = page.get_text("dict", clip=table_rect)
        
        # Extract text from blocks and organize by position
        text_items = []
        for block in blocks.get("blocks", []):
            if "lines" in block:
                for line in block["lines"]:
                    for span in line["spans"]:
                        text = span["text"].strip()
                        if text:
                            bbox = span["bbox"]
                            text_items.append({
                                "text": text,
                                "x0": bbox[0],
                                "y0": bbox[1],
                                "x1": bbox[2],
                                "y1": bbox[3]
                            })
        
        # Sort by vertical position first, then horizontal
        text_items.sort(key=lambda x: (x["y0"], x["x0"]))
        
        # Group items into rows based on vertical position
        rows = []
        current_row = []
        current_y = None
        tolerance = 5  # pixels tolerance for same row
        
        for item in text_items:
            if current_y is None or abs(item["y0"] - current_y) <= tolerance:
                current_row.append(item)
                current_y = item["y0"] if current_y is None else current_y
            else:
                if current_row:
                    # Sort current row by horizontal position
                    current_row.sort(key=lambda x: x["x0"])
                    rows.append([item["text"] for item in current_row])
                current_row = [item]
                current_y = item["y0"]
        
        # Add the last row
        if current_row:
            current_row.sort(key=lambda x: x["x0"])
            rows.append([item["text"] for item in current_row])
        
        return rows
    except Exception as e:
        logger.warning(f"Error extracting text from table region: {e}")
        return []

def format_table_to_text(page, table):
    """Convert a table to a CSV-like text format with proper text extraction."""
    try:
        if not table:
            logger.warning("Empty table encountered.")
            return ""
        
        output = StringIO()
        writer = csv.writer(output, lineterminator='\n')
        
        # First, try to extract using PyMuPDF's table extraction
        try:
            table_data = table.extract()
            if table_data and len(table_data) > 0:
                # Check if we got actual text content
                sample_cell = str(table_data[0][0]) if table_data[0] else ""
                if not (sample_cell.startswith("(") and "," in sample_cell and sample_cell.endswith(")")):
                    # We got actual text, not coordinates
                    for row in table_data:
                        cleaned_row = []
                        for cell in row:
                            cell_text = str(cell).strip() if cell is not None else ""
                            cleaned_row.append(cell_text)
                        if any(cleaned_row):  # Only add non-empty rows
                            writer.writerow(cleaned_row)
                    
                    table_text = output.getvalue()
                    output.close()
                    return table_text if table_text.strip() else ""
        except Exception as e:
            logger.warning(f"PyMuPDF table extraction failed: {e}")
        
        # Fallback: Extract text from table region using coordinate-based method
        logger.info("Using fallback text extraction for table")
        table_bbox = table.bbox
        rows = extract_text_from_table_region(page, table_bbox)
        
        if rows:
            for row in rows:
                if any(cell.strip() for cell in row):  # Only add rows with content
                    writer.writerow(row)
            
            table_text = output.getvalue()
            output.close()
            return table_text if table_text.strip() else ""
        
        output.close()
        return ""
        
    except Exception as e:
        logger.error(f"Error formatting table: {e}")
        return ""

def extract_tables_with_textboxes(page):
    """Alternative table extraction using text positioning analysis."""
    try:
        # Get all text with detailed positioning
        text_dict = page.get_text("dict")
        
        # Collect all text elements with their positions
        text_elements = []
        for block in text_dict.get("blocks", []):
            if "lines" in block:
                for line in block["lines"]:
                    for span in line["spans"]:
                        text = span["text"].strip()
                        if text and len(text) > 1:  # Filter out single characters
                            bbox = span["bbox"]
                            text_elements.append({
                                "text": text,
                                "x0": bbox[0],
                                "y0": bbox[1],
                                "x1": bbox[2],
                                "y1": bbox[3],
                                "size": span["size"]
                            })
        
        # Group elements that might form tables (similar y-coordinates, aligned x-coordinates)
        potential_tables = []
        processed_elements = set()
        
        for i, elem in enumerate(text_elements):
            if i in processed_elements:
                continue
                
            # Look for elements at similar y-level (potential row)
            row_elements = [elem]
            processed_elements.add(i)
            
            for j, other_elem in enumerate(text_elements[i+1:], i+1):
                if j in processed_elements:
                    continue
                    
                # Check if elements are on the same row (similar y-coordinates)
                if abs(elem["y0"] - other_elem["y0"]) <= 3:
                    row_elements.append(other_elem)
                    processed_elements.add(j)
            
            # If we found multiple elements in a row, it might be a table row
            if len(row_elements) >= 2:
                row_elements.sort(key=lambda x: x["x0"])  # Sort by x-coordinate
                potential_tables.append([elem["text"] for elem in row_elements])
        
        return potential_tables
        
    except Exception as e:
        logger.warning(f"Alternative table extraction failed: {e}")
        return []

# Main PDF text and table extractor
def extract_clean_text_from_pdf(file_path):
    text_blocks = []
    try:
        doc = fitz.open(file_path)
        first_heading = None
        
        for page_num, page in enumerate(doc):
            # Capture the title on the first page
            if page_num == 0:
                blocks = page.get_text("dict")["blocks"]
                titles = [
                    block["lines"][0]["spans"][0]["text"].strip()
                    for block in blocks
                    if block.get("lines") and block["lines"][0]["spans"] and block["lines"][0]["spans"][0]["size"] > 15
                ]
                if titles:
                    first_heading = " ".join(titles).strip()
            
            # Extract images to avoid text near them
            image_rects = [fitz.Rect(img[1:5]) for img in page.get_images(full=True)]
            
            # Extract tables using improved method
            tables_found = False
            try:
                tables = page.find_tables()
                for table in tables:
                    table_text = format_table_to_text(page, table)
                    if table_text.strip():
                        text_blocks.append("=== TABLE START ===")
                        text_blocks.append(table_text.strip())
                        text_blocks.append("=== TABLE END ===")
                        tables_found = True
                        logger.info(f"Successfully extracted table from page {page_num + 1}")
            except Exception as e:
                logger.warning(f"Primary table extraction failed for page {page_num + 1}: {e}")
            
            # If no tables found with primary method, try alternative approach
            if not tables_found:
                try:
                    alt_tables = extract_tables_with_textboxes(page)
                    if alt_tables and len(alt_tables) >= 2:  # At least 2 rows to consider it a table
                        text_blocks.append("=== TABLE START ===")
                        output = StringIO()
                        writer = csv.writer(output, lineterminator='\n')
                        for row in alt_tables:
                            writer.writerow(row)
                        table_text = output.getvalue()
                        output.close()
                        text_blocks.append(table_text.strip())
                        text_blocks.append("=== TABLE END ===")
                        logger.info(f"Alternative table extraction found {len(alt_tables)} rows on page {page_num + 1}")
                except Exception as e:
                    logger.warning(f"Alternative table extraction failed for page {page_num + 1}: {e}")
            
            # Extract non-tabular text
            for block in page.get_text("blocks"):
                rect = fitz.Rect(block[:4])
                text = block[4].strip()
                if is_near_image(rect, image_rects) or not text:
                    continue
                for line in text.splitlines():
                    line = line.strip()
                    if noise_regex.match(line):
                        continue
                    if is_meaningful(line):
                        text_blocks.append(line)
        
        doc.close()
        if first_heading:
            return f"{first_heading}\n\n" + "\n".join(text_blocks)
        return "\n".join(text_blocks)
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return ""

# DOCX extractor
def extract_text_from_docx(file_path):
    try:
        return docx2txt.process(file_path)
    except Exception as e:
        logger.error(f"Error reading DOCX {file_path}: {e}")
        return ""

# Image extractor
def extract_images_from_pdf(pdf_path, base_name):
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            for img_index, img in enumerate(doc.get_page_images(page_num)):
                xref = img[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                image_ext = base_image["ext"]
                image_path = images_folder / f"{base_name}_p{page_num+1}_img{img_index+1}.{image_ext}"
                with open(image_path, "wb") as f:
                    f.write(image_bytes)
        doc.close()
    except Exception as e:
        logger.error(f"Error extracting image from {pdf_path}: {e}")

# Process all files
all_files = list(root_folder.rglob("*.pdf")) + list(root_folder.rglob("*.docx"))
merged_corpus = []

logger.info(f"Found {len(all_files)} files to process")

for i, file in enumerate(all_files):
    name = f"{i:04d}__{file.stem}".replace(" ", "_").replace("/", "_")
    logger.info(f"Processing file {i+1}/{len(all_files)}: {file.name}")
    
    try:
        if file.suffix.lower() == ".pdf":
            raw_text = extract_clean_text_from_pdf(file)
            extract_images_from_pdf(file, name)
        elif file.suffix.lower() == ".docx":
            raw_text = extract_text_from_docx(file)
        else:
            logger.warning(f"Skipping unsupported file type: {file}")
            continue

        if raw_text.strip():
            txt_path = individual_docs_folder / f"{name}.txt"
            with open(txt_path, "w", encoding="utf-8") as f:
                f.write(raw_text.strip())
            merged_corpus.append(raw_text.strip())
            logger.info(f"✅ Successfully processed: {file.name}")
        else:
            logger.warning(f"No meaningful content extracted from {file}")
    except Exception as e:
        logger.error(f"Failed to process {file}: {e}")
        continue

# Save final merged corpus
merged_path = output_folder / "abb_corpus.txt"
try:
    with open(merged_path, "w", encoding="utf-8") as f:
        for doc in merged_corpus:
            f.write(doc + "\n\n" + "="*80 + "\n\n")
    logger.info(f"✅ Merged corpus written to: {merged_path}")
except Exception as e:
    logger.error(f"Error writing merged corpus to {merged_path}: {e}")

logger.info(f"✅ Final preprocessing complete!")
logger.info(f"📄 Individual text files saved to: {individual_docs_folder}")
logger.info(f"🖼️ Images extracted to: {images_folder}")
logger.info(f"📘 Merged corpus written to: {merged_path}")
logger.info(f"📊 Total files processed: {len(merged_corpus)}/{len(all_files)}")

2025-06-10 12:33:54,587 - INFO - Found 608 files to process
2025-06-10 12:33:54,588 - INFO - Processing file 1/608: Wireless Controller ARC600, Product Guide.pdf
2025-06-10 12:33:54,849 - INFO - Successfully extracted table from page 2
2025-06-10 12:33:55,082 - INFO - Successfully extracted table from page 3
2025-06-10 12:33:55,656 - INFO - Successfully extracted table from page 4
2025-06-10 12:33:55,785 - INFO - Successfully extracted table from page 5
2025-06-10 12:33:56,500 - INFO - Successfully extracted table from page 6
2025-06-10 12:33:56,612 - INFO - Successfully extracted table from page 7
2025-06-10 12:33:56,686 - INFO - Successfully extracted table from page 8
2025-06-10 12:33:56,688 - INFO - Successfully extracted table from page 8
2025-06-10 12:33:56,971 - INFO - Successfully extracted table from page 9
2025-06-10 12:33:56,977 - INFO - Successfully extracted table from page 9
2025-06-10 12:33:57,098 - INFO - Successfully extracted table from page 10
2025-06-10 12:33:57,281

In [9]:
import os
import json
import re
from pathlib import Path
from typing import List, Dict, Tuple
import logging
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns

# Tokenizer libraries
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers, processors
from tokenizers.models import BPE, WordPiece
from tokenizers.trainers import BpeTrainer, WordPieceTrainer
from tokenizers.pre_tokenizers import Whitespace, ByteLevel
from tokenizers.processors import TemplateProcessing
from tokenizers.decoders import ByteLevel as ByteLevelDecoder, WordPiece as WordPieceDecoder



2025-06-10 13:39:54,969 - INFO - Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2025-06-10 13:39:54,970 - INFO - NumExpr defaulting to 8 threads.


In [None]:
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

class ABBTokenizerTrainer:
    """Custom tokenizer trainer for ABB technical documentation."""
    
    def __init__(self, corpus_path: str, output_dir: str):
        self.corpus_path = Path(corpus_path)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        # Tokenizer configurations
        self.vocab_sizes = [8000, 16000, 32000]
        self.special_tokens = ["<pad>", "<unk>", "<bos>", "<eos>", "<mask>", "=== TABLE START ===", "=== TABLE END ==="]
        
        # Statistics storage
        self.training_stats = {}
        
    def preprocess_corpus(self) -> Tuple[str, Dict]:
        """Clean and preprocess the ABB corpus for tokenizer training."""
        logger.info("Preprocessing ABB corpus...")
        
        if not self.corpus_path.exists():
            raise FileNotFoundError(f"Corpus file not found: {self.corpus_path}")
        
        with open(self.corpus_path, 'r', encoding='utf-8') as f:
            raw_text = f.read()
        
        # Clean the text
        cleaned_text = self._clean_text(raw_text)
        
        # Generate statistics
        stats = self._generate_corpus_stats(cleaned_text)
        
        # Save cleaned corpus
        cleaned_path = self.output_dir / "cleaned_corpus.txt"
        with open(cleaned_path, 'w', encoding='utf-8') as f:
            f.write(cleaned_text)
        
        logger.info(f"Cleaned corpus saved to: {cleaned_path}")
        return str(cleaned_path), stats
    
    def _clean_text(self, text: str) -> str:
        """Clean and normalize text for tokenizer training."""
        # Remove table markers
        text = re.sub(r'=== TABLE START ===.*?=== TABLE END ===', '', text, flags=re.DOTALL)
        text = re.sub(r'=== Table ===.*?=== End Table ===', '', text, flags=re.DOTALL)
        
        # Clean up document separators
        text = re.sub(r'={50,}', '\n', text)
        
        # Normalize whitespace
        text = re.sub(r'\n\s*\n', '\n\n', text)  # Multiple newlines to double newline
        text = re.sub(r'[ \t]+', ' ', text)  # Multiple spaces/tabs to single space
        
        # Preserve ABB product codes and technical terms
        # Keep alphanumeric codes like "ARC600", "IEC-104", etc.
        
        # Remove excessive punctuation
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[-]{3,}', '---', text)
        
        # Clean up common formatting artifacts
        text = re.sub(r'\s+([.,;:!?])', r'\1', text)  # Remove space before punctuation
        
        return text.strip()
    
    def _generate_corpus_stats(self, text: str) -> Dict:
        """Generate statistics about the corpus."""
        words = text.split()
        chars = list(text)
        
        stats = {
            'total_characters': len(text),
            'total_words': len(words),
            'unique_words': len(set(words)),
            'unique_chars': len(set(chars)),
            'avg_word_length': sum(len(word) for word in words) / len(words) if words else 0,
            'vocabulary_size': len(set(word.lower() for word in words)),
        }
        
        # Find ABB-specific terms
        abb_terms = []
        patterns = [
            r'\bABB\b',
            r'\b[A-Z]{2,}\d+\b',  # Product codes like ARC600
            r'\b[A-Z]+-\d+\b',    # Standards like IEC-104
            r'\b\d+[A-Z]+\b',     # Model numbers
        ]
        
        for pattern in patterns:
            matches = re.findall(pattern, text)
            abb_terms.extend(matches)
        
        stats['abb_terms_count'] = len(abb_terms)
        stats['unique_abb_terms'] = len(set(abb_terms))
        
        return stats
    
    def train_bpe_tokenizer(self, corpus_path: str, vocab_size: int) -> Tokenizer:
        """Train a BPE tokenizer on the ABB corpus."""
        logger.info(f"Training BPE tokenizer with vocab size {vocab_size}...")
        
        # Initialize BPE tokenizer
        tokenizer = Tokenizer(BPE(unk_token="<unk>"))
        
        # Set pre-tokenizer
        tokenizer.pre_tokenizer = pre_tokenizers.Sequence([
            Whitespace(),
            ByteLevel(add_prefix_space=False)
        ])
        
        # Set decoder
        tokenizer.decoder = ByteLevelDecoder()
        
        # Configure trainer
        trainer = BpeTrainer(
            vocab_size=vocab_size,
            min_frequency=2,
            special_tokens=self.special_tokens,
            show_progress=True,
            initial_alphabet=ByteLevel.alphabet()
        )
        
        # Train the tokenizer
        tokenizer.train([corpus_path], trainer)
        
        # Add post-processor for special tokens
        tokenizer.post_processor = TemplateProcessing(
            single="<bos> $A <eos>",
            special_tokens=[("<bos>", 2), ("<eos>", 3)]
        )
        
        return tokenizer
    
    def train_wordpiece_tokenizer(self, corpus_path: str, vocab_size: int) -> Tokenizer:
        """Train a WordPiece tokenizer on the ABB corpus."""
        logger.info(f"Training WordPiece tokenizer with vocab size {vocab_size}...")
        
        # Initialize WordPiece tokenizer
        tokenizer = Tokenizer(WordPiece(unk_token="<unk>"))
        
        # Set pre-tokenizer
        tokenizer.pre_tokenizer = Whitespace()
        
        # Set decoder
        tokenizer.decoder = WordPieceDecoder()
        
        # Configure trainer
        trainer = WordPieceTrainer(
            vocab_size=vocab_size,
            min_frequency=2,
            special_tokens=self.special_tokens,
            show_progress=True
        )
        
        # Train the tokenizer
        tokenizer.train([corpus_path], trainer)
        
        # Add post-processor for special tokens
        tokenizer.post_processor = TemplateProcessing(
            single="<bos> $A <eos>",
            special_tokens=[("<bos>", 2), ("<eos>", 3)]
        )
        
        return tokenizer
    
    def evaluate_tokenizer(self, tokenizer: Tokenizer, test_text: str, name: str) -> Dict:
        """Evaluate tokenizer performance on test text."""
        logger.info(f"Evaluating {name} tokenizer...")
        
        # Encode test text
        encoding = tokenizer.encode(test_text)
        tokens = encoding.tokens
        
        # Calculate metrics
        metrics = {
            'name': name,
            'num_tokens': len(tokens),
            'num_characters': len(test_text),
            'compression_ratio': len(test_text) / len(tokens) if tokens else 0,
            'vocab_size': tokenizer.get_vocab_size(),
            'unk_count': sum(1 for token in tokens if token == '<unk>'),
            'unk_ratio': sum(1 for token in tokens if token == '<unk>') / len(tokens) if tokens else 0
        }
        
        # Sample tokenization examples
        sample_texts = [
            "ABB Wireless Controller ARC600 provides remote monitoring capabilities.",
            "The IEC-104 protocol ensures reliable communication in distribution networks.",
            "System Average Interruption Duration Index (SAIDI) measurements show improvement.",
            "Configure the switching devices using I/O expansion modules."
        ]
        
        examples = []
        for text in sample_texts:
            encoding = tokenizer.encode(text)
            examples.append({
                'text': text,
                'tokens': encoding.tokens,
                'token_count': len(encoding.tokens)
            })
        
        metrics['examples'] = examples
        return metrics
    
    def save_tokenizer(self, tokenizer: Tokenizer, name: str, vocab_size: int):
        """Save tokenizer to files."""
        tokenizer_dir = self.output_dir / f"{name}_tokenizer_{vocab_size}"
        tokenizer_dir.mkdir(exist_ok=True)
        
        # Save tokenizer
        tokenizer.save(str(tokenizer_dir / "tokenizer.json"))
        
        # Save vocabulary
        vocab = tokenizer.get_vocab()
        with open(tokenizer_dir / "vocab.json", 'w', encoding='utf-8') as f:
            json.dump(vocab, f, indent=2, ensure_ascii=False)
        
        # Save vocab as text file
        with open(tokenizer_dir / "vocab.txt", 'w', encoding='utf-8') as f:
            for token, idx in sorted(vocab.items(), key=lambda x: x[1]):
                f.write(f"{token}\n")
        
        logger.info(f"Tokenizer saved to: {tokenizer_dir}")
        return tokenizer_dir
    
    def compare_tokenizers(self, metrics_list: List[Dict]):
        """Compare different tokenizers and generate report."""
        logger.info("Generating tokenizer comparison report...")
        
        # Create comparison DataFrame-like structure
        comparison_data = []
        for metrics in metrics_list:
            comparison_data.append({
                'Name': metrics['name'],
                'Vocabulary Size': metrics['vocab_size'],
                'Compression Ratio': f"{metrics['compression_ratio']:.2f}",
                'Unknown Token %': f"{metrics['unk_ratio']*100:.2f}%",
                'Tokens Generated': metrics['num_tokens']
            })
        
        # Save comparison report
        report_path = self.output_dir / "tokenizer_comparison.json"
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump({
                'comparison_summary': comparison_data,
                'detailed_metrics': metrics_list
            }, f, indent=2, ensure_ascii=False)
        
        # Generate text report
        text_report_path = self.output_dir / "tokenizer_comparison.txt"
        with open(text_report_path, 'w', encoding='utf-8') as f:
            f.write("ABB Tokenizer Comparison Report\n")
            f.write("=" * 50 + "\n\n")
            
            for data in comparison_data:
                f.write(f"Tokenizer: {data['Name']}\n")
                f.write(f"  Vocabulary Size: {data['Vocabulary Size']}\n")
                f.write(f"  Compression Ratio: {data['Compression Ratio']} chars/token\n")
                f.write(f"  Unknown Token Rate: {data['Unknown Token %']}\n")
                f.write(f"  Tokens Generated: {data['Tokens Generated']}\n\n")
            
            # Add recommendations
            f.write("Recommendations:\n")
            f.write("-" * 20 + "\n")
            
            best_compression = max(metrics_list, key=lambda x: x['compression_ratio'])
            best_coverage = min(metrics_list, key=lambda x: x['unk_ratio'])
            
            f.write(f"Best Compression: {best_compression['name']} ({best_compression['compression_ratio']:.2f} chars/token)\n")
            f.write(f"Best Coverage: {best_coverage['name']} ({best_coverage['unk_ratio']*100:.2f}% unknown tokens)\n")
        
        logger.info(f"Comparison reports saved to: {report_path} and {text_report_path}")
    
    def train_all_tokenizers(self):
        """Train all tokenizer variants and compare them."""
        logger.info("Starting comprehensive tokenizer training...")
        
        # Preprocess corpus
        cleaned_corpus_path, corpus_stats = self.preprocess_corpus()
        
        # Save corpus statistics
        stats_path = self.output_dir / "corpus_statistics.json"
        with open(stats_path, 'w', encoding='utf-8') as f:
            json.dump(corpus_stats, f, indent=2)
        
        logger.info(f"Corpus statistics: {corpus_stats}")
        
        # Load test text for evaluation
        with open(cleaned_corpus_path, 'r', encoding='utf-8') as f:
            full_text = f.read()
        
        # Use first 10% of text for evaluation
        test_text = full_text[:len(full_text)//10]
        
        all_metrics = []
        
        # Train tokenizers for different vocab sizes
        for vocab_size in self.vocab_sizes:
            # Train BPE tokenizer
            bpe_tokenizer = self.train_bpe_tokenizer(cleaned_corpus_path, vocab_size)
            bpe_metrics = self.evaluate_tokenizer(bpe_tokenizer, test_text, f"BPE_{vocab_size}")
            all_metrics.append(bpe_metrics)
            self.save_tokenizer(bpe_tokenizer, "BPE", vocab_size)
            
            # Train WordPiece tokenizer
            wp_tokenizer = self.train_wordpiece_tokenizer(cleaned_corpus_path, vocab_size)
            wp_metrics = self.evaluate_tokenizer(wp_tokenizer, test_text, f"WordPiece_{vocab_size}")
            all_metrics.append(wp_metrics)
            self.save_tokenizer(wp_tokenizer, "WordPiece", vocab_size)
        
        # Generate comparison report
        self.compare_tokenizers(all_metrics)
        
        logger.info("✅ Tokenizer training complete!")
        logger.info(f"📁 All outputs saved to: {self.output_dir}")
        
        return all_metrics



In [11]:
def main():
    """Main function to run tokenizer training."""
    
    # Configuration
    corpus_path = r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_grok\abb_corpus.txt"
    output_dir = r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_grok\tokenizers"
    
    # Initialize trainer
    trainer = ABBTokenizerTrainer(corpus_path, output_dir)
    
    try:
        # Train all tokenizers
        metrics = trainer.train_all_tokenizers()
        
        # Print summary
        print("\n" + "="*60)
        print("TOKENIZER TRAINING SUMMARY")
        print("="*60)
        
        for metric in metrics:
            print(f"\n{metric['name']}:")
            print(f"  Vocabulary Size: {metric['vocab_size']:,}")
            print(f"  Compression Ratio: {metric['compression_ratio']:.2f} chars/token")
            print(f"  Unknown Token Rate: {metric['unk_ratio']*100:.2f}%")
            
            # Show first example
            if metric['examples']:
                example = metric['examples'][0]
                print(f"  Example: '{example['text']}'")
                print(f"  Tokens: {example['tokens'][:5]}..." if len(example['tokens']) > 5 else f"  Tokens: {example['tokens']}")
        
        print(f"\n📁 All tokenizer files saved to: {output_dir}")
        print("✅ Ready for Step 3: SLM Architecture Building!")
        
    except Exception as e:
        logger.error(f"Error during tokenizer training: {e}")
        raise

if __name__ == "__main__":
    main()

2025-06-10 13:40:01,053 - INFO - Starting comprehensive tokenizer training...
2025-06-10 13:40:01,055 - INFO - Preprocessing ABB corpus...
2025-06-10 13:40:07,280 - INFO - Cleaned corpus saved to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_grok\tokenizers\cleaned_corpus.txt
2025-06-10 13:40:07,299 - INFO - Corpus statistics: {'total_characters': 19322004, 'total_words': 3157414, 'unique_words': 56324, 'unique_chars': 258, 'avg_word_length': 5.113947046538718, 'vocabulary_size': 49980, 'abb_terms_count': 37190, 'unique_abb_terms': 1559}
2025-06-10 13:40:07,410 - INFO - Training BPE tokenizer with vocab size 8000...
2025-06-10 13:40:10,936 - INFO - Evaluating BPE_8000 tokenizer...
2025-06-10 13:40:12,431 - INFO - Tokenizer saved to: C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_grok\tokenizers\BPE_tokenizer_8000
2025-06-10 13:40:12,433 - INFO - Training WordPiece tokenizer with vocab size 8000...
2025-06-10 13


TOKENIZER TRAINING SUMMARY

BPE_8000:
  Vocabulary Size: 8,000
  Compression Ratio: 4.77 chars/token
  Unknown Token Rate: 0.00%
  Example: 'ABB Wireless Controller ARC600 provides remote monitoring capabilities.'
  Tokens: ['<bos>', 'ABB', 'Wireless', 'Control', 'ler']...

WordPiece_8000:
  Vocabulary Size: 8,000
  Compression Ratio: 4.72 chars/token
  Unknown Token Rate: 0.00%
  Example: 'ABB Wireless Controller ARC600 provides remote monitoring capabilities.'
  Tokens: ['<bos>', 'ABB', 'Wireless', 'Control', '##ler']...

BPE_16000:
  Vocabulary Size: 16,000
  Compression Ratio: 5.00 chars/token
  Unknown Token Rate: 0.00%
  Example: 'ABB Wireless Controller ARC600 provides remote monitoring capabilities.'
  Tokens: ['<bos>', 'ABB', 'Wireless', 'Controller', 'ARC']...

WordPiece_16000:
  Vocabulary Size: 16,000
  Compression Ratio: 5.03 chars/token
  Unknown Token Rate: 0.00%
  Example: 'ABB Wireless Controller ARC600 provides remote monitoring capabilities.'
  Tokens: ['<bos>', 'AB

In [2]:
from tokenizers import Tokenizer

tokenizer_path = r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_updated\tokenizers\BPE_tokenizer_32000\tokenizer.json"
tokenizer = Tokenizer.from_file(tokenizer_path)

# Test encoding
ids = tokenizer.encode("Wireless Controller ARC600 supports IEC-104 protocol.").ids
print(ids)


[2, 6700, 8902, 3172, 705, 1905, 578, 19, 2979, 810, 20, 3]


In [4]:
from tokenizers import Tokenizer

# Load your tokenizer
tokenizer_path = r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_updated\tokenizers\WordPiece_tokenizer_32000\tokenizer.json"
tokenizer = Tokenizer.from_file(tokenizer_path)

# Sample text
text = "Wireless Controller ARC600 supports IEC-104 protocol succeeding."

# Encode and get both tokens and ids
encoding = tokenizer.encode(text)

tokens = encoding.tokens
token_ids = encoding.ids

# Print side-by-side
print("🧾 Tokenized Output:")
for token, id_ in zip(tokens, token_ids):
    print(f"{token:20} --> {id_}")


🧾 Tokenized Output:
<bos>                --> 2
Wireless             --> 7607
Controller           --> 10178
ARC600               --> 9841
supports             --> 2174
IEC                  --> 767
-                    --> 41
104                  --> 3378
protocol             --> 1010
succeeding           --> 27857
.                    --> 42
<eos>                --> 3


In [5]:
from tokenizers import Tokenizer

# Load your tokenizer
tokenizer_path = r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_updated\tokenizers\WordPiece_tokenizer_32000\tokenizer.json"
tokenizer = Tokenizer.from_file(tokenizer_path)

# Sample text
text = "Wireless Controller ARC600 supports IEC-104 protocol succeeding."

# Encode and get both tokens and ids
encoding = tokenizer.encode(text)

tokens = encoding.tokens
token_ids = encoding.ids

# Print side-by-side
print("🧾 Tokenized Output:")
for token, id_ in zip(tokens, token_ids):
    print(f"{token:20} --> {id_}")


🧾 Tokenized Output:
<bos>                --> 2
Wireless             --> 7607
Controller           --> 10178
ARC600               --> 9841
supports             --> 2174
IEC                  --> 767
-                    --> 41
104                  --> 3378
protocol             --> 1010
succeeding           --> 27857
.                    --> 42
<eos>                --> 3


In [6]:
from tokenizers import Tokenizer

# Load your tokenizer
tokenizer_path = r"C:\Users\INKARED5\OneDrive - ABB\Karan_ABB_Internship\Projects\Data\processed_final_updated\tokenizers_best\bpe_tokenizer_32000.json"
tokenizer = Tokenizer.from_file(tokenizer_path)

# Sample text
text = "Wireless Controller ARC600 supports IEC-104 protocol succeeding."

# Encode and get both tokens and ids
encoding = tokenizer.encode(text)

tokens = encoding.tokens
token_ids = encoding.ids

# Print side-by-side
print("🧾 Tokenized Output:")
for token, id_ in zip(tokens, token_ids):
    print(f"{token:20} --> {id_}")


🧾 Tokenized Output:
<bos>                --> 0
ĠWireless            --> 11578
ĠController          --> 14153
Ġ                    --> 4360
ARC600               --> 162
Ġsupports            --> 6895
Ġ                    --> 4360
IEC-104              --> 1779
Ġprotocol            --> 5192
Ġsucceed             --> 15482
ing                  --> 4414
.                    --> 4209
<eos>                --> 1
