In [None]:

import pandas as pd
from PIL import Image
import pytesseract
import os
import numpy as np
from PIL import Image, ImageEnhance
import re
import os
import os, json, joblib
from datetime import datetime
# Location of the Tesseract binary on the original Windows development machine.
# Only override pytesseract's setting when that file is really there; on any
# other machine the library's own default command lookup is left in place
# instead of pointing at a path that does not exist.
_TESSERACT_EXE = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
if os.path.isfile(_TESSERACT_EXE):
    pytesseract.pytesseract.tesseract_cmd = _TESSERACT_EXE

# Record the NumPy version in the notebook output for reproducibility.
print(np.__version__)

In [None]:
# Import the project's preprocessing module and run its main() entry point.
# NOTE(review): what main() reads or writes is not visible from this notebook —
# confirm its side effects before relying on a Restart & Run All.
from src.categorizer import preprocessing
preprocessing.main()

In [None]:
def clean_text(raw_text):
    """Normalise raw OCR output into lowercase, punctuation-free text.

    Steps, in order: collapse whitespace, strip page-number artifacts
    ("Page 3", "3 / 10"), repair the OCR digit/letter confusions 0 -> O and
    1 -> l, lowercase, drop punctuation, and collapse whitespace once more.

    Digit repair is applied only to tokens that also contain a letter, so
    genuine numbers such as "0", "100" or "2021" are left intact.

    Args:
        raw_text: Text as returned by the OCR engine (a ``str``).

    Returns:
        The cleaned text as a ``str``; it may be empty.
    """

    def _one_to_l(m):
        # Leave purely numeric tokens (amounts, years, ids) untouched.
        token = m.group(0)
        if any(ch.isalpha() for ch in token):
            return token.replace("1", "l")
        return token

    def _zero_to_o(m):
        # A bare "0" has no letter on either side and is a real number.
        if m.group(1) or m.group(2):
            return m.group(0).replace("0", "O")
        return m.group(0)

    # Collapse whitespace
    text = re.sub(r"\s+", " ", raw_text).strip()

    # Remove common header/footer artifacts.
    # NOTE(review): the "N / M" alternative also matches the start of
    # slash-separated dates and fractions — confirm that is acceptable.
    text = re.sub(r"Page\s*\d+|\d+\s*/\s*\d+", "", text, flags=re.IGNORECASE)

    # Strict zero replacement: a single 0 inside an all-caps word (H0USE -> HOUSE)
    text = re.sub(r"\b([A-Z]*?)0([A-Z]*?)\b", _zero_to_o, text)

    # Looser one replacement in any word containing a letter (Th1s -> Thls)
    text = re.sub(r"\b(\w*?)1(\w*?)\b", _one_to_l, text)

    # Lowercase for consistency
    text = text.lower()

    # Remove punctuation
    text = re.sub(r"[^\w\s]", "", text)

    # The removals above can leave runs of spaces or stray edge whitespace.
    return re.sub(r"\s+", " ", text).strip()


In [None]:
def preprocess_image(img, contrast=2.0, threshold=128):
    """Boost contrast and binarise an image before it is handed to OCR.

    Args:
        img: A PIL image.
        contrast: Factor passed to ``ImageEnhance.Contrast(...).enhance``.
            Defaults to 2.0, the value this notebook has always used.
        threshold: Grey level cut-off; pixels below it are mapped to 0 and
            all others to 255. Defaults to 128.

    Returns:
        A new image in mode "1" (1-bit black and white).
    """
    img_rgb = img.convert("RGB")
    img_contrast = ImageEnhance.Contrast(img_rgb).enhance(contrast)
    # Greyscale first so the threshold is applied to a single channel.
    grey = img_contrast.convert("L")
    return grey.point(lambda level: 0 if level < threshold else 255, "1")

In [None]:
def compare_distributions(full_df, list_of_dfs, column, tolerance=1):
    """Check how closely each subset's label mix follows the full dataset.

    For every ``(name, df)`` pair the percentage share of each value in
    ``column`` is compared with the share in ``full_df``; a verdict is printed
    and collected.

    Args:
        full_df: Reference DataFrame.
        list_of_dfs: Iterable of ``(name, DataFrame)`` pairs to check.
        column: Column whose value distribution is compared.
        tolerance: Largest allowed absolute difference, in percentage points.

    Returns:
        dict mapping each name to ``{'ok', 'max_drift', 'bad_labels'}``.
    """
    true_pct = full_df[column].value_counts(normalize=True) * 100
    results = {}

    for name, df in list_of_dfs:
        # Align on the reference labels; a label missing from the subset counts as 0%.
        df_pct = (df[column].value_counts(normalize=True) * 100).reindex(
            true_pct.index, fill_value=0
        )
        diff = (true_pct - df_pct).abs()
        bad = diff[diff > tolerance]
        ok = bool((diff <= tolerance).all())

        if not ok:
            print(f"🚨 Distributions from {name} are more than {tolerance}% "
                  f"different from full dataset {bad.index} with differences {bad.values}")
        else:
            print(f"✅ Distributions are similar full vs {name}")

        results[name] = dict(
            ok=ok,
            max_drift=float(diff.max()),
            bad_labels=bad.index.tolist(),
        )

    return results

In [None]:
from pathlib import Path
# Sanity check: show the working directory and whether the relative image
# folder can be found from it.
img_dir = Path("data/raw/images/Training")
for label, value in (
    ("CWD:", Path.cwd()),
    ("Looking for:", img_dir.resolve()),
    ("Exists?", img_dir.exists()),
):
    print(label, value)

In [None]:
from pathlib import Path
import os, pandas as pd
from PIL import Image
import pytesseract

# A document's class is taken from the first of these keywords found in its file name.
file_types = ['budget', 'email', 'letter', 'invoice']
cols = ['filename', 'type', 'text']

img_dir = Path(r"C:\Users\abajp\PycharmProjects\BofAOCRProject\data\raw\images\Training")
processed_dir = Path(r"C:\Users\abajp\PycharmProjects\BofAOCRProject\data\processed")

# Relative alternative to the absolute paths above:
# img_dir       = Path("data/raw/images/Training")
# processed_dir = Path("data/processed")
processed_dir.mkdir(parents=True, exist_ok=True)
out_path = processed_dir / "output.csv"

# File names already present in the CSV, so a re-run resumes where it stopped
# instead of repeating OCR. Only the filename column is read.
if out_path.exists():
    processed = set(pd.read_csv(out_path, usecols=['filename'])['filename'])
else:
    processed = set()

for name in os.listdir(img_dir):
    path = img_dir / name
    if not path.is_file():
        continue

    match = next((w for w in file_types if w in name.lower()), None)
    # Nothing to do for unlabelled files or files handled on an earlier run.
    if match is None or name in processed:
        continue

    with Image.open(path) as img:
        pre_img = preprocess_image(img)
        text = clean_text(pytesseract.image_to_string(pre_img))

    # Append just this row; the header is written only when the CSV does not exist yet.
    write_header = not out_path.exists()
    new_df = pd.DataFrame([[name, match, text]], columns=cols)
    new_df.to_csv(out_path, mode='a', header=write_header, index=False)
    processed.add(name)

In [None]:
# files = os.listdir("data/raw/images/Training")
# file_types = ['budget', 'email', 'letter', 'invoice']
# cols = ['filename', 'type', 'text']
# from pathlib import Path
# img_dir = Path("data/raw/images/Training")
# processed_dir = Path("data/processed")
# 
# contents = []
# file_dict = {}
# output_file = "output.csv"
# 
# if not os.path.exists(output_file):
#     print("🚨 CSV does not exist, creating blank one...")
#     df = pd.DataFrame(columns=cols)
#     df.to_csv(output_file, index=False)  # Create blank CSV with header
# else:
#     print("✅ CSV exists already")
#     df = pd.read_csv(output_file)
# 
# for name in os.listdir(img_dir):
#     path = img_dir/name
#     match = next((word for word in file_types if word.lower() in name.lower()), None)
#     if match:
#         print(f"{name}:{match}")
#     processed_files = set(df['filename'])
#     if path in processed_files:
#         print(f"{name} already in df")
#         continue
#     file_dict.update({path:match})
#     with Image.open(path) as img:
#         pre_img = preprocess_image(img)
#         text = pytesseract.image_to_string(pre_img)
#         # text = pytesseract.image_to_string(Image.open(f"Training_half/{i}"))
#         text = clean_text(text)   
#         # contents.append(text)
#         new_df = pd.DataFrame([[path, match, text]], columns = cols)
#         df = pd.concat([df, new_df], ignore_index=True)
#         if not os.path.exists("output.csv"):
#             df.to_csv(f"{processed_dir}/output.csv", index=False)  # Write with header
#         else:
#             df.to_csv(f"{processed_dir}/output.csv", mode="a", index=False, header=False)
#         


In [None]:
# Load the OCR output and keep a snapshot of every row that has a missing value.
df = pd.read_csv(r'C:\Users\abajp\PycharmProjects\BofAOCRProject\data\processed\output.csv')
has_null = df.isna().any(axis=1)
nulls = df[has_null]
nulls.to_csv("null_rows_snapshot.csv", index=False)

# Keep only rows that have both a label and non-blank text.
df = (
    df.dropna(subset=["text", "type"])
      .loc[lambda frame: frame["text"].str.strip() != ""]
)

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 10% of the rows as the test set, stratified on document type.
train_val, test = train_test_split(
    df,
    test_size=0.1,
    stratify=df['type'],
    random_state=42,
)
# Then take 9% of the remainder (about 8% of all rows) as the validation set.
train, val = train_test_split(
    train_val,
    test_size=0.09,
    stratify=train_val['type'],
    random_state=42,
)

In [None]:
# Percentage share of each document type in the train+val pool and in the test
# split, followed by the per-type difference between the two.
train_val_pct = train_val.value_counts('type').div(len(train_val)).mul(100)
test_val_pct = test.value_counts('type').div(len(test)).mul(100)
diff = train_val_pct.sub(test_val_pct)
diff

In [None]:
# Compare the label mix of every split with the full dataset, using the
# function's default tolerance.
results = compare_distributions(
    df,
    [('train', train), ('val', val), ('test', test)],
    column='type',
)
results

In [None]:
# Persist the three splits as CSV files under the versioned dataset folder.
# The folder is created first because to_csv does not create missing parent
# directories and would otherwise fail on a fresh checkout.
datasets_dir = "../data/processed/datasets/v1"
os.makedirs(datasets_dir, exist_ok=True)

for split_name, split_df in (("train", train), ("validation", val), ("test", test)):
    split_df.to_csv(f"{datasets_dir}/{split_name}.csv", index=False)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
# TF-IDF features over 1- and 2-grams. The vocabulary is fitted on the training
# split only; the validation text is transformed with that fitted vocabulary.
vectorizer = TfidfVectorizer(
    ngram_range=(1, 2),
    min_df=2,
    max_features=50000,
    sublinear_tf=True,
)
X_train = vectorizer.fit_transform(train['text'].astype(str))
X_va = vectorizer.transform(val['text'].astype(str))

In [None]:
# Integer code assigned to each document type.
codes = {
    'letter': 0,
    'budget': 1,
    'invoice': 2,
    'email': 3,
}

# Encode the string labels of the training and validation splits.
y_train = train['type'].map(codes)
y_va = val['type'].map(codes)

In [None]:
# train logreg model and show performance of model
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix

# Fit the baseline classifier on the TF-IDF training features.
# NOTE(review): recent scikit-learn releases may warn about the liblinear
# solver on multiclass targets — confirm against the installed version.
clf = LogisticRegression(solver='liblinear', max_iter=2000, random_state=42)
clf.fit(X_train, y_train)

preds = clf.predict(X_va)

# Pass the label codes together with their names so each report row is tied to
# the right class. This no longer depends on the dict's insertion order, and
# the report still works if a class happens to be absent from the validation
# split.
print(classification_report(
    y_va,
    preds,
    labels=list(codes.values()),
    target_names=list(codes.keys()),
))