In [None]:
import pandas as pd
import regex as re
from pathlib import Path
from Levenshtein import distance

# Paths are resolved relative to the directory the notebook is run from.
current_dir = Path.cwd()
parsed_dataframe_directory = current_dir.parent.joinpath('dataframes', 'parsed_dataframes')
file_location = parsed_dataframe_directory.joinpath('all_entries.csv')  # combined output from llm_parser

In [None]:
# quick sanity check: entry counts per year
# Only files named exactly entries_<4 digits>.csv are counted; hidden files,
# names starting with 'all', and sub-directories are skipped silently, and
# anything else is reported and skipped.
# Uses parsed_dataframe_directory as defined in the configuration cell.
file_name_pattern = re.compile(r"(entries_[0-9]{4})\.csv")  # built once, not per file

for item in sorted(parsed_dataframe_directory.iterdir()):

    if item.name.startswith('.') or item.name.startswith('all') or item.is_dir():
        continue

    # Match the whole name so that e.g. 'entries_1999_backup.csv' is not
    # silently counted under 'entries_1999'.
    file_name_match = file_name_pattern.fullmatch(item.name)
    if not file_name_match:
        print(f"⚠️ Skipping file with unexpected name format: {item.name}")
        continue

    # Read the file that was actually listed rather than a path rebuilt from
    # the match, and keep the frame in its own name so the main `df` is never
    # overwritten when this cell is re-run.
    year_entries = pd.read_csv(item)
    print(f"{file_name_match.group(1)}: {len(year_entries)}")

In [None]:
def entry_clean(entry):
    """Rebuild one comparison string from the parsed fields of an entry.

    The fields are read from `entry` by key, in a fixed order, and joined
    with single spaces. A field whose value is missing (as judged by
    pandas) contributes a one-space placeholder instead of its text, so
    gaps between present fields may hold several consecutive spaces.
    Leading and trailing whitespace is stripped from the result.
    """
    ordered_fields = ('opening_bits', 'author(s)', 'title', 'format',
                      'little_bits', 'publisher', 'date')
    pieces = []
    for name in ordered_fields:
        value = entry[name]
        pieces.append(' ' if pd.isna(value) else str(value))
    return ' '.join(pieces).strip()

def strip_string_levenshtein(entry):
    """Reduce a string to lowercase word characters for Levenshtein comparison.

    Every character matched by the regex class ``\\W`` (whitespace,
    punctuation, symbols) is removed and the remainder is lowercased.
    Underscores count as word characters and are therefore kept.

    Any non-string input (None, NaN, numbers, ...) yields an empty string.
    """
    # A non-str value can never be cleaned; this also covers NaN/None, so no
    # separate missing-value check is needed.
    if not isinstance(entry, str):
        return ''
    # Raw string: "\W" in a plain literal is an invalid escape sequence and
    # triggers a SyntaxWarning on recent Python versions.
    return re.sub(r"\W", "", entry).lower()

def strip_string(entry):
    """Return `entry` as space-separated word tokens for jaccard comparison.

    Each non-word character (regex ``\\W``) is turned into a space, then
    runs of whitespace are collapsed and the ends trimmed. Letter case is
    left as-is.
    """
    spaced = re.sub(r"\W", " ", entry)
    tokens = spaced.split()
    return " ".join(tokens)

def jaccard_similarity(str1, str2):
    """Jaccard similarity of the whitespace-separated token sets of two strings.

    Returns |A ∩ B| / |A ∪ B|, or 0 when both strings have no tokens.
    """
    tokens_a, tokens_b = set(str1.split()), set(str2.split())
    combined = tokens_a | tokens_b
    if not combined:
        # Nothing to compare on either side.
        return 0
    return len(tokens_a & tokens_b) / len(combined)

def compute_row_scores(row):
    """Score how faithfully a row's parsed fields reproduce its original entry.

    The parsed fields are stitched back into one string and compared with
    ``row['original_entry']`` (converted with ``str``):

    * levenshtein: edit distance between both texts after stripping
      non-word characters and lowercasing.
    * jaccard: token-set similarity of the lowercased, punctuation-free
      texts. It is only computed when the edit distance exceeds 1;
      otherwise it is set to 1.0.
    * flagged_for_correction: True when the edit distance is at least 1 and
      jaccard is at most 0.99. Because a distance of exactly 1 always gets
      jaccard 1.0, only rows with a distance above 1 can be flagged here.
    * diff: the token-level diff of the raw original and stitched texts.

    Returns a pandas Series with those four labels.
    """
    stitched_text = entry_clean(row)
    original_text = str(row['original_entry'])

    # Character-level comparison on the aggressively normalised texts.
    lev_score = distance(
        strip_string_levenshtein(original_text),
        strip_string_levenshtein(stitched_text),
    )

    # Token-level comparison, skipped for (near-)identical texts.
    if lev_score > 1:
        jaccard_score = jaccard_similarity(
            strip_string(original_text).lower(),
            strip_string(stitched_text).lower(),
        )
    else:
        jaccard_score = 1.0

    flagged = lev_score >= 1 and jaccard_score <= 0.99

    return pd.Series({
        'levenshtein': lev_score,
        'jaccard': jaccard_score,
        'flagged_for_correction': flagged,
        'diff': token_diff(original_text, stitched_text),
    })

def token_diff(a, b):
    """Compare the token sets of two strings.

    Both inputs are normalised with `strip_string` (case is preserved) and
    split on whitespace. Returns a dict with:

    * 'shared': sorted tokens present in both,
    * 'only_in_a': sorted tokens found only in `a`,
    * 'only_in_b': sorted tokens found only in `b`,
    * 'jaccard': shared / all tokens, or 1.0 when neither side has tokens.
    """
    tokens_a = set(strip_string(a).split())
    tokens_b = set(strip_string(b).split())

    common = tokens_a & tokens_b
    everything = tokens_a | tokens_b

    return {
        'shared': sorted(common),
        'only_in_a': sorted(tokens_a - tokens_b),
        'only_in_b': sorted(tokens_b - tokens_a),
        'jaccard': len(common) / len(everything) if everything else 1.0,
    }

def print_entry(entry):
    """Print a scored entry for manual inspection.

    `entry` must support key lookup for 'original_entry', the seven parsed
    fields, 'levenshtein', 'jaccard', and 'diff' (itself a mapping with
    'shared', 'only_in_a' and 'only_in_b'). Output order: original text,
    a separator, the parsed fields, both scores, the token lists, and a
    closing rule followed by a blank line.
    """
    report_lines = (
        f"{entry['original_entry']}",
        "- - - - -",
        f"opening_bits: {entry['opening_bits']}",
        f"author(s): {entry['author(s)']}",
        f"title: {entry['title']}",
        f"format: {entry['format']}",
        f"little_bits: {entry['little_bits']}",
        f"publisher: {entry['publisher']}",
        f"date: {entry['date']}",
        f"levenshtein: {entry['levenshtein']}",
        f"jaccard: {entry['jaccard']}",
        # Note: the line labelled "diff" lists the tokens both sides share.
        f"diff: {entry['diff']['shared']}",
        f"only in original entry: {entry['diff']['only_in_a']}",
        f"only in parsed entry: {entry['diff']['only_in_b']}",
        "–––––––\n",
    )
    for line in report_lines:
        print(line)

In [None]:
# Load the combined entries CSV from file_location into the working frame.
# Later cells add score and flag columns to this same frame.
df = pd.read_csv(file_location)

In [None]:
# score every row: levenshtein, jaccard, flagged, token diff
# The scorer's outputs are matched to these column names by position, so its
# 'flagged_for_correction' value is stored in the 'flagged' column.
score_columns = ['levenshtein', 'jaccard', 'flagged', 'diff']
row_scores = df.apply(compute_row_scores, axis=1)
df[score_columns] = row_scores

In [None]:
# also flag entries missing both author+title, or missing publisher
no_author_or_title = df[['author(s)', 'title']].isna().all(axis=1)
no_publisher = df['publisher'].isna()

# Either condition alone is enough to send the row to manual review.
df.loc[no_author_or_title | no_publisher, 'flagged'] = True

In [None]:
# summary stats
# Collected first, then printed in one go; a blank line separates the
# flag totals from the per-metric counts.
summary_lines = [
    f"Total entries: {len(df)}",
    f"Flagged: {df['flagged'].sum()} ({df['flagged'].mean()*100:.1f}%)",
    f"\nLevenshtein > 0: {(df['levenshtein'] > 0).sum()}",
    f"Jaccard < 1.0: {(df['jaccard'] < 1.0).sum()}",
    f"Jaccard < 0.9: {(df['jaccard'] < 0.9).sum()}",
]
print("\n".join(summary_lines))

In [None]:
# inspect low-accuracy entries (sorted worst first)
# Ascending jaccard puts the least similar rows at the top.
flagged_mask = df['flagged'].eq(True)
low_accuracy = df.loc[flagged_mask].sort_values('jaccard')
print(f"{len(low_accuracy)} flagged entries\n")

# Only the first 20 rows are printed to keep the output readable.
for _, entry in low_accuracy.iloc[:20].iterrows():
    print_entry(entry)

In [None]:
# export flagged entries for manual review
# Written to the current working directory, row index included.
export_message = f"Exported {len(low_accuracy)} flagged entries to flagged_entries.csv"
low_accuracy.to_csv('flagged_entries.csv', index=True)
print(export_message)

In [None]:
# keep only the core columns, drop score columns
# new_column_order = ['original_entry', 'opening_bits', 'author(s)', 'title', 'format', 'little_bits', 'publisher', 'date', 'catalogue_year', 'page_num', 'doc_page_num']
# df = df.reindex(columns=new_column_order)

In [None]:
# save final mega list
# df.to_csv('parsed_entries_mega_list.csv', index=True)