## Create other year sample

In [15]:
import pandas as pd
import numpy as np
import re

OCCs_1911 = pd.read_csv(
    "D:\\Postgraduate\\Data Science Project\\Data\\OCCs_1851.csv",
    sep=",",  
    header=0, 
    encoding='latin1',  
    on_bad_lines='skip'  
)

def clean_text(text):
    text = text.lower()
    text = re.sub(r'[^a-z\s]', '', text)
    return text.strip()
OCCs_1911['Occupation_String'] = OCCs_1911['Occupation_String'].astype(str)
OCCs_1911['Occupation_String'] = OCCs_1911['Occupation_String'].apply(clean_text)

OCCs_1911 = OCCs_1911.dropna(subset=['Occupation_String'])

OCCs_1911 = OCCs_1911[OCCs_1911['Occupation_String'].str.strip() != '']

Occ_HISCO = pd.read_excel("D:\\Postgraduate\\Data Science Project\\Data\\OCCODE_HISCO (FINAL)_ZZ.xlsx",header=0)

mapping_df = Occ_HISCO.rename(columns={
    "OCCODE": "Occode",
    "OCCUPATION": "Occode_Desc",
    "HISCO_TEXT": "HISCO_Desc"
})

# only keep necessary columns
mapping_df = mapping_df[["Occode", "HISCO", "Occode_Desc", "HISCO_Desc"]]

# use Occode only to merge
OCCs_1911_text = OCCs_1911.merge(mapping_df, on=["Occode"], how="left")

OCCs_1911_text['Sex'] = OCCs_1911_text['Sex'].replace({
    'F': 'Female',
    'M': 'Male',
    'U': 'Unknown'
})

In [16]:
import pandas as pd
import numpy as np
from collections import Counter
import re

def extract_occupation_sample(df, sample_size=1000, random_state=42):
    """
    Extract a representative sample for occupation string standardization.
    
    Parameters:
    - df: DataFrame with occupation data
    - sample_size: Target sample size
    - random_state: Random seed for reproducibility
    
    Returns:
    - DataFrame with sampled records
    """
    np.random.seed(random_state)
    
    # 1. STRATIFIED SAMPLING BY KEY DIMENSIONS
    stratified_samples = []
    
    # Sample by Sex to ensure gender representation
    sex_groups = df.groupby('Sex')
    sex_proportions = df['Sex'].value_counts(normalize=True)
    
    for sex, proportion in sex_proportions.items():
        sex_sample_size = max(1, int(sample_size * 0.3 * proportion))
        sex_data = sex_groups.get_group(sex)
        if len(sex_data) >= sex_sample_size:
            stratified_samples.append(sex_data.sample(n=sex_sample_size, random_state=random_state))
        else:
            stratified_samples.append(sex_data)
    
    # 2. OCCUPATION STRING COMPLEXITY SAMPLING
    df_copy = df.copy()
    
    # Categorize occupation strings by complexity
    def categorize_occupation_complexity(occupation_str):
        if pd.isna(occupation_str):
            return 'missing'
        
        occupation_str = str(occupation_str).lower().strip()
        
        # Simple single words
        if len(occupation_str.split()) == 1:
            return 'simple'
        
        # Contains numbers or special characters (potentially complex)
        if re.search(r'\d|[^\w\s]', occupation_str):
            return 'complex'
        
        # Multiple words but straightforward
        if len(occupation_str.split()) <= 3:
            return 'moderate'
        
        # Long descriptions
        return 'complex'
    
    df_copy['complexity'] = df_copy['Occupation_String'].apply(categorize_occupation_complexity)
    
    # Sample from each complexity category
    complexity_samples = []
    complexity_proportions = {'simple': 0.4, 'moderate': 0.3, 'complex': 0.2, 'missing': 0.1}
    
    for complexity, target_prop in complexity_proportions.items():
        complexity_data = df_copy[df_copy['complexity'] == complexity]
        if len(complexity_data) > 0:
            complexity_sample_size = max(1, int(sample_size * 0.3 * target_prop))
            if len(complexity_data) >= complexity_sample_size:
                complexity_samples.append(complexity_data.sample(n=complexity_sample_size, random_state=random_state))
            else:
                complexity_samples.append(complexity_data)
    
    # 3. FREQUENCY-BASED SAMPLING
    # Include both common and rare occupation strings
    occupation_counts = df['Occupation_String'].value_counts()
    
    # High frequency occupations (top 10%)
    high_freq_threshold = occupation_counts.quantile(0.9)
    high_freq_occupations = occupation_counts[occupation_counts >= high_freq_threshold].index
    high_freq_data = df[df['Occupation_String'].isin(high_freq_occupations)]
    high_freq_sample = high_freq_data.sample(n=min(len(high_freq_data), int(sample_size * 0.2)), 
                                           random_state=random_state)
    
    # Low frequency occupations (bottom 50%)
    low_freq_threshold = occupation_counts.median()
    low_freq_occupations = occupation_counts[occupation_counts <= low_freq_threshold].index
    low_freq_data = df[df['Occupation_String'].isin(low_freq_occupations)]
    low_freq_sample = low_freq_data.sample(n=min(len(low_freq_data), int(sample_size * 0.2)), 
                                         random_state=random_state)
    
    # 4. GEOGRAPHIC DIVERSITY
    # Sample across different counties if available
    county_samples = []
    if 'County' in df.columns:
        counties = df['County'].unique()
        county_sample_size = max(1, int(sample_size * 0.1 / len(counties)))
        
        for county in counties[:10]:  # Limit to top 10 counties to avoid too much fragmentation
            county_data = df[df['County'] == county]
            if len(county_data) >= county_sample_size:
                county_samples.append(county_data.sample(n=county_sample_size, random_state=random_state))
    
    # 5. SPECIAL CASES SAMPLING
    special_cases = []
    
    # Occupations with specific patterns that might need special handling
    patterns = {
        'family_relations': r'\b(wife|husband|son|daughter|mother|father)\b',
        'apprentices': r'\bapprentice\b',
        'servants': r'\bservant\b',
        'farmers': r'\bfarm\b',
        'makers': r'\bmaker\b',
        'numerical': r'\d+',
        'modifiers': r'\b(former|ex|retired|late)\b'
    }
    
    for pattern_name, pattern in patterns.items():
        pattern_data = df[df['Occupation_String'].str.contains(pattern, case=False, na=False)]
        if len(pattern_data) > 0:
            pattern_sample_size = min(len(pattern_data), max(1, int(sample_size * 0.05)))
            special_cases.append(pattern_data.sample(n=pattern_sample_size, random_state=random_state))
    
    # 6. COMBINE ALL SAMPLES AND DEDUPLICATE
    all_samples = (stratified_samples + complexity_samples + [high_freq_sample, low_freq_sample] + 
                  county_samples + special_cases)
    
    # Combine all samples
    combined_sample = pd.concat(all_samples, ignore_index=True)
    
    # Remove duplicates while preserving diversity
    combined_sample = combined_sample.drop_duplicates(subset=['Year', 'County', 'Sex', 'Occupation_String', 'Occode'])
    
    # If we have more than target, randomly sample down
    if len(combined_sample) > sample_size:
        final_sample = combined_sample.sample(n=sample_size, random_state=random_state)
    else:
        # If we need more, add random samples from remaining data
        remaining_data = df[~df.index.isin(combined_sample.index)]
        additional_needed = sample_size - len(combined_sample)
        
        if len(remaining_data) >= additional_needed:
            additional_sample = remaining_data.sample(n=additional_needed, random_state=random_state)
            final_sample = pd.concat([combined_sample, additional_sample], ignore_index=True)
        else:
            final_sample = combined_sample
    
    return final_sample.reset_index(drop=True)

def analyze_sample_quality(original_df, sample_df):
    """
    Analyze the quality and representativeness of the sample.
    """
    print("=== SAMPLE QUALITY ANALYSIS ===\n")
    
    print(f"Original dataset size: {len(original_df):,}")
    print(f"Sample size: {len(sample_df):,}")
    print(f"Sample rate: {len(sample_df)/len(original_df)*100:.2f}%\n")
    
    # Sex distribution comparison
    print("Sex Distribution:")
    orig_sex = original_df['Sex'].value_counts(normalize=True)
    sample_sex = sample_df['Sex'].value_counts(normalize=True)
    
    for sex in orig_sex.index:
        orig_pct = orig_sex.get(sex, 0) * 100
        sample_pct = sample_sex.get(sex, 0) * 100
        print(f"  {sex}: Original {orig_pct:.1f}% vs Sample {sample_pct:.1f}%")
    
    # Occupation string complexity analysis
    def get_complexity_stats(df):
        stats = {
            'unique_occupations': df['Occupation_String'].nunique(),
            'avg_length': df['Occupation_String'].str.len().mean(),
            'missing_values': df['Occupation_String'].isna().sum(),
            'single_word': df['Occupation_String'].str.split().str.len().eq(1).sum(),
            'multi_word': df['Occupation_String'].str.split().str.len().gt(1).sum()
        }
        return stats
    
    print(f"\nOccupation String Complexity:")
    orig_stats = get_complexity_stats(original_df)
    sample_stats = get_complexity_stats(sample_df)
    
    for metric, orig_val in orig_stats.items():
        sample_val = sample_stats[metric]
        if metric == 'avg_length':
            print(f"  {metric}: Original {orig_val:.1f} vs Sample {sample_val:.1f}")
        else:
            print(f"  {metric}: Original {orig_val:,} vs Sample {sample_val:,}")
    
    # Top occupations coverage
    print(f"\nTop 10 Most Common Occupations Coverage:")
    top_occupations = original_df['Occupation_String'].value_counts().head(10)
    
    for occupation, count in top_occupations.items():
        sample_count = (sample_df['Occupation_String'] == occupation).sum()
        coverage = sample_count / count * 100 if count > 0 else 0
        print(f"  '{occupation}': {sample_count}/{count} ({coverage:.1f}% coverage)")

# Example usage:
# df = pd.read_csv('your_1861_occupation_data.csv')
sample = extract_occupation_sample(OCCs_1911_text, sample_size=1000)
analyze_sample_quality(OCCs_1911_text, sample)
sample.to_csv('occupation_sample_for_standardization_1851.csv', index=False)

  pattern_data = df[df['Occupation_String'].str.contains(pattern, case=False, na=False)]


=== SAMPLE QUALITY ANALYSIS ===

Original dataset size: 1,725,688
Sample size: 1,000
Sample rate: 0.06%

Sex Distribution:
  Male: Original 68.3% vs Sample 67.1%
  Female: Original 29.8% vs Sample 30.9%
  Unknown: Original 1.9% vs Sample 2.0%

Occupation String Complexity:
  unique_occupations: Original 747,171 vs Sample 977
  avg_length: Original 19.7 vs Sample 18.9
  missing_values: Original 0 vs Sample 0
  single_word: Original 127,053 vs Sample 136
  multi_word: Original 1,598,635 vs Sample 864

Top 10 Most Common Occupations Coverage:
  'farmer of  acres': 0/7337 (0.0% coverage)
  'farmer of  acres employing  labourers': 2/6125 (0.0% coverage)
  'farmer  acres': 3/4363 (0.1% coverage)
  'farmer of  acres employing  lab': 2/3483 (0.1% coverage)
  'farmer of  acres employing  labourer': 2/1936 (0.1% coverage)
  'farmer of  acres employing  men': 0/1669 (0.0% coverage)
  'farmer  acres  lab': 0/1452 (0.0% coverage)
  'farmer  acres  labourers': 0/1406 (0.0% coverage)
  'farmer  acres

## Rule-based data pre-cleaning

In [17]:
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
from rapidfuzz import fuzz, process
from tqdm import tqdm
from collections import defaultdict, Counter
import re
import string
from typing import Dict, List, Tuple, Set
import difflib
import os
import gc
import math

# English dictionary validation setup
try:
    import nltk
    from nltk.corpus import wordnet, words
    from nltk.stem import WordNetLemmatizer
    
    # Download required NLTK data
    for resource in ['wordnet', 'words', 'averaged_perceptron_tagger']:
        try:
            nltk.data.find(f'corpora/{resource}' if resource != 'averaged_perceptron_tagger' else f'taggers/{resource}')
        except LookupError:
            nltk.download(resource, quiet=True)
    
    NLTK_AVAILABLE = True
    print("NLTK resources loaded successfully")
except ImportError:
    print("NLTK not available. Using basic dictionary validation.")
    NLTK_AVAILABLE = False

# Basic English words as fallback
BASIC_ENGLISH_WORDS = {
    'worker', 'man', 'woman', 'boy', 'girl', 'master', 'apprentice', 'journeyman',
    'labourer', 'laborer', 'servant', 'farmer', 'baker', 'smith', 'carpenter', 
    'weaver', 'tailor', 'shoemaker', 'blacksmith', 'agricultural', 'domestic',
    'general', 'coal', 'iron', 'cotton', 'wool', 'silk', 'wife', 'widow', 
    'daughter', 'son', 'mother', 'father', 'sister', 'brother', 'retired',
    'unemployed', 'former', 'late', 'head', 'assistant', 'clerk', 'teacher',
    'nurse', 'doctor', 'merchant', 'shopkeeper', 'miner', 'miller', 'butcher',
    'grocer', 'publican', 'innkeeper', 'coachman', 'groom', 'gardener'
}

class TraditionalOccupationCleaner:
    def __init__(self, excel_file_path: str = None):
        """
        Initialize the traditional occupation cleaner
        Args:
            excel_file_path: Path to Excel file containing dictionaries
        """
        self.embedding_model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
        
        # Initialize English dictionary validation
        self._initialize_english_dictionary_validation()
        
        # NEW: Initialize multi-occupation detection patterns
        self._initialize_multi_occupation_patterns()
        
        # Load dictionaries from Excel file or use empty defaults
        if excel_file_path and os.path.exists(excel_file_path):
            self.abbreviation_dict, self.common_misspellings, self.occupation_categories, self.important_modifiers = self.load_dictionaries_from_excel(excel_file_path)
        else:
            # Empty dictionaries if no file provided
            self.abbreviation_dict = {}
            self.common_misspellings = {}
            self.important_modifiers = {}
            self.occupation_categories = {
                'agricultural': [],
                'domestic_service': [],
                'textile_manufacture': [],
                'clothing_footwear': [],
                'mining_quarrying': [],
                'building_construction': [],
                'metalwork_engineering': [],
                'food_drink': [],
                'transport_communication': [],
                'professional_clerical': [],
                'retail_trade': [],
                'manufacturing_other': [],
                'crafts_trades': [],
                'personal_service': [],
                'public_service': [],
                'general_labour': [],
                'other': []
            }
    
    def _initialize_multi_occupation_patterns(self):
        """
        Initialize patterns for detecting multiple occupations
        """
        # Common separators and connectors in historical records
        self.occupation_separators = [
            r'\s+and\s+',           # " and "
            r'\s*&\s*',             # " & " or "&"
            r'\s*\+\s*',            # " + " or "+"
            r'\s*/\s*',             # " / " or "/"
            r'\s*,\s*and\s+',       # ", and "
            r'\s+also\s+',          # " also "
            r'\s+or\s+',            # " or "
        ]
        
        # Compile regex patterns for efficiency
        self.separator_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in self.occupation_separators]
        
        # Words that should NOT be considered as separate occupations
        self.non_occupation_words = {
            'wife', 'widow', 'daughter', 'son', 'mother', 'father', 'sister', 'brother',
            'unmarried', 'spinster', 'bachelor', 'widower', 'married',
            'unemployed', 'retired', 'former', 'late', 'ex',
            'apprentice', 'master', 'journeyman', 'assistant', 'head',
            'the', 'a', 'an', 'of', 'in', 'at', 'to', 'for', 'with', 'by'
        }
    
    def _detect_multiple_occupations(self, text: str) -> bool:
        """
        Simple detection of multiple occupations - returns True/False only
        """
        if pd.isna(text) or not text.strip():
            return False
        
        normalized_text = self.normalize_text(text)
        
        # Check for obvious multi-occupation patterns
        for pattern in self.separator_patterns:
            if pattern.search(normalized_text):
                # Split by this pattern
                parts = pattern.split(normalized_text)
                # Clean up parts
                occupations = [part.strip() for part in parts if part.strip()]
                
                # Only consider it multiple if get 2+ meaningful parts
                if len(occupations) >= 2:
                    # Check that each part is substantial and looks like an occupation
                    substantial_parts = []
                    for part in occupations:
                        # Skip very short parts, common words, and modifier words
                        if (len(part) > 2 and 
                            part not in self.non_occupation_words and
                            len(part.split()) >= 1):  # At least one word
                            substantial_parts.append(part)
                    
                    # If we have 2+ substantial parts that look like occupations, it's multiple
                    if len(substantial_parts) >= 2:
                        return True
        
        # Special case: Check for comma-separated occupations (more conservative)
        # Only if the parts look like actual occupations and don't contain family/modifier words
        if ',' in normalized_text and 'and' not in normalized_text:
            parts = [part.strip() for part in normalized_text.split(',')]
            if len(parts) >= 2:
                # Check if each part looks like an occupation (not a modifier)
                occupation_like_parts = []
                for part in parts:
                    # Skip if it's likely a modifier (family relation, etc.)
                    if (len(part) > 3 and 
                        not any(modifier in part for modifier in self.non_occupation_words) and
                        not part in self.non_occupation_words):
                        occupation_like_parts.append(part)
                
                if len(occupation_like_parts) >= 2:
                    return True
        
        return False
    
    def _initialize_english_dictionary_validation(self):
        """
        Initialize English dictionary validation tools
        """
        self.lemmatizer = None
        self.english_words_set = set()
        
        if NLTK_AVAILABLE:
            try:
                self.lemmatizer = WordNetLemmatizer()
                english_words = set(words.words())
                self.english_words_set = {word.lower() for word in english_words}
                print(f"Loaded {len(self.english_words_set)} English words from NLTK")
            except Exception as e:
                print(f"Error loading NLTK resources: {e}")
                self.english_words_set = BASIC_ENGLISH_WORDS.copy()
        else:
            self.english_words_set = BASIC_ENGLISH_WORDS.copy()
        
        # Add historical occupation terms
        historical_terms = {
            'labourer', 'laborer', 'ag', 'lab', 'serv', 'dom', 'agric', 'gen',
            'blacksmith', 'whitesmith', 'tinsmith', 'goldsmith', 'silversmith',
            'wheelwright', 'millwright', 'shipwright', 'cordwainer', 'ostler',
            'victualler', 'chandler', 'draper', 'mercer', 'haberdasher',
            'maltster', 'brewer', 'distiller', 'tanner', 'currier', 'fellmonger'
        }
        self.english_words_set.update(historical_terms)

    def _is_valid_english_word(self, word: str) -> bool:
        """
        Check if a single word is a valid English word
        """
        if not word or len(word) < 2:
            return False
        
        word_lower = word.lower()
        
        # 1. Direct dictionary lookup
        if word_lower in self.english_words_set:
            return True
        
        # 2. NLTK wordnet check if available
        if NLTK_AVAILABLE and self.lemmatizer:
            try:
                # Check lemmatized form
                lemmatized = self.lemmatizer.lemmatize(word_lower)
                if lemmatized in self.english_words_set:
                    return True
                
                # Check wordnet synsets
                synsets = wordnet.synsets(word_lower)
                if synsets:
                    return True
            except:
                pass
        
        # 3. Check common occupation suffixes
        occupation_suffixes = ['er', 'or', 'ist', 'ian', 'man', 'woman', 'smith', 'wright', 'maker']
        for suffix in occupation_suffixes:
            if word_lower.endswith(suffix):
                root = word_lower[:-len(suffix)]
                if len(root) >= 3 and root in self.english_words_set:
                    return True
        
        # 4. Check plural forms
        if word_lower.endswith('s') and len(word_lower) > 3:
            singular = word_lower[:-1]
            if singular in self.english_words_set:
                return True
        
        return False

    def _validate_english_words(self, text: str) -> Dict[str, any]:
        """
        Validate English words in text for confidence calculation
        """
        if pd.isna(text) or not text.strip():
            return {
                'valid_words': [],
                'invalid_words': [],
                'validity_ratio': 0.0,
                'contains_invalid': True
            }
        
        normalized = self.normalize_text(text)
        words = normalized.split()
        
        if not words:
            return {
                'valid_words': [],
                'invalid_words': [],
                'validity_ratio': 0.0,
                'contains_invalid': True
            }
        
        valid_words = []
        invalid_words = []
        
        for word in words:
            if self._is_valid_english_word(word):
                valid_words.append(word)
            else:
                invalid_words.append(word)
        
        validity_ratio = len(valid_words) / len(words) if words else 0.0
        contains_invalid = len(invalid_words) > 0
        
        return {
            'valid_words': valid_words,
            'invalid_words': invalid_words,
            'validity_ratio': validity_ratio,
            'contains_invalid': contains_invalid
        }

    def load_dictionaries_from_excel(self, file_path: str) -> Tuple[Dict, Dict, Dict]:
        """
        Load occupation dictionaries from Excel file
        Expected sheets: 'abbreviations', 'misspellings', 'categories'
        """
        try:
            # Read Excel sheets
            abbreviations_df = pd.read_excel(file_path, sheet_name='abbreviations')
            misspellings_df = pd.read_excel(file_path, sheet_name='misspellings')
            categories_df = pd.read_excel(file_path, sheet_name='categories')
            modifiers_df = pd.read_excel(file_path, sheet_name='modifiers')
            # Convert to dictionaries
            abbreviation_dict = dict(zip(abbreviations_df['abbreviation'], abbreviations_df['full_form']))
            misspellings_dict = dict(zip(misspellings_df['misspelling'], misspellings_df['correction']))
            important_modifiers = {}
            all_modifiers_set = set()
            
            if 'modifier_word' in modifiers_df.columns and 'modifier_type' in modifiers_df.columns:
                # modifier_word, modifier_type
                for _, row in modifiers_df.iterrows():
                    word = row['modifier_word']
                    mod_type = row['modifier_type']
                    if mod_type not in important_modifiers:
                        important_modifiers[mod_type] = []
                    important_modifiers[mod_type].append(word)
                    all_modifiers_set.add(word)
            else:

                word_col = None
                type_col = None
                
                for col in modifiers_df.columns:
                    if 'modifier' in col.lower() and 'type' not in col.lower():
                        word_col = col
                    elif 'type' in col.lower():
                        type_col = col
                
                if word_col and type_col:
                    for _, row in modifiers_df.iterrows():
                        word = row[word_col]
                        mod_type = row[type_col]
                        if mod_type not in important_modifiers:
                            important_modifiers[mod_type] = []
                        important_modifiers[mod_type].append(word)
                        all_modifiers_set.add(word)
                else:
                    print("Warning: Could not find modifier columns, using defaults")
                    important_modifiers = self._get_default_modifiers()
                    all_modifiers_set = set()
                    for words in important_modifiers.values():
                        all_modifiers_set.update(words)
            
            self.all_modifiers_set = all_modifiers_set

            # Convert categories
            occupation_categories = {}
            for category in categories_df['category'].unique():
                occupations = categories_df[categories_df['category'] == category]['occupation'].tolist()
                occupation_categories[category] = occupations
            
            print(f"Loaded dictionaries from {file_path}")
            print(f"  - {len(abbreviation_dict)} abbreviations")
            print(f"  - {len(misspellings_dict)} misspellings")
            print(f"  - {len(occupation_categories)} categories")
            
            return abbreviation_dict, misspellings_dict, occupation_categories, important_modifiers
            
        except Exception as e:
            print(f"Error loading Excel file {file_path}: {e}")
            print("Using empty dictionaries")
            
            important_modifiers = self._get_default_modifiers()
            self.all_modifiers_set = set()
            for words in important_modifiers.values():
                self.all_modifiers_set.update(words)
            return {}, {}, {}, important_modifiers
        
    def _get_default_modifiers(self):
        return {
            'family_relation': ['wife', 'widow', 'daughter', 'son', 'mother', 'father', 'sister', 'brother'],
            'marital_status': ['unmarried', 'spinster', 'bachelor', 'widower', 'married'],
            'employment_status': ['unemployed', 'retired', 'former', 'late', 'ex'],
            'skill_level': ['apprentice', 'master', 'journeyman', 'assistant', 'head']
        }
    
    def normalize_text(self, text: str) -> str:
        """
        Normalize text by removing punctuation, extra spaces, and converting to lowercase
        """
        if pd.isna(text):
            return ""
        
        text = str(text).lower().strip()
        text = re.sub(r'[^\w\s\-&]', ' ', text)
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()
        
        return text
    
    def expand_abbreviations(self, text: str) -> str:
        """
        Expand common abbreviations with word boundary detection
        """
        normalized_text = self.normalize_text(text)
        
        if not self.abbreviation_dict:
            return normalized_text
            
        if normalized_text in self.abbreviation_dict:
            return self.abbreviation_dict[normalized_text]
        
        sorted_abbrs = sorted(self.abbreviation_dict.items(), key=lambda x: len(x[0]), reverse=True)
        result_text = normalized_text
        
        for abbr, full_form in sorted_abbrs:
            pattern = r'\b' + re.escape(abbr) + r'\b'
            if re.search(pattern, result_text):
                result_text = re.sub(pattern, full_form, result_text)
        
        return result_text
    
    def correct_spelling(self, text: str) -> str:
        """
        FIXED: Correct common spelling errors with proper word boundary detection
        Now correctly handles "blacsmith wife" → "blacksmith wife"
        """
        if pd.isna(text):
            return ""
        
        normalized_text = self.normalize_text(text)
        
        if not self.common_misspellings:
            return normalized_text
            
        # Step 1: Check for exact match first (highest priority)
        if normalized_text in self.common_misspellings:
            return self.common_misspellings[normalized_text]
        
        result_text = normalized_text
        
        # Step 2: Apply word-level corrections
        # Sort by length (longest first) to prevent partial matches
        sorted_misspellings = sorted(self.common_misspellings.items(), key=lambda x: len(x[0]), reverse=True)
        
        for misspelling, correction in sorted_misspellings:
            # Skip if this is a multi-word entry (handled differently)
            if ' ' in misspelling:
                # For multi-word misspellings, use direct string replacement
                if misspelling in result_text:
                    result_text = result_text.replace(misspelling, correction)
            else:
                # For single-word misspellings, use word boundary matching
                pattern = r'\b' + re.escape(misspelling) + r'\b'
                if re.search(pattern, result_text, re.IGNORECASE):
                    result_text = re.sub(pattern, correction, result_text, flags=re.IGNORECASE)
        
        return result_text
    
    def fuzzy_match_occupations(self, text: str, threshold: int = 80) -> str:
        """
        Use fuzzy matching to find similar standard occupations
        """
        if not self.occupation_categories:
            return text
            
        all_standard_occupations = []
        for category, occupations in self.occupation_categories.items():
            all_standard_occupations.extend(occupations)
        
        if not all_standard_occupations:
            return text
            
        best_match = process.extractOne(
            text, 
            all_standard_occupations, 
            scorer=fuzz.ratio,
            score_cutoff=threshold
        )
        
        if best_match:
            return best_match[0]
        return text
    
    def get_occupation_category(self, occupation: str) -> str:
        """
        Determine the category of an occupation
        """
        if not self.occupation_categories:
            return 'other'
            
        occupation_lower = occupation.lower()
        
        for category, occupations in self.occupation_categories.items():
            for occ in occupations:
                if occ.lower() in occupation_lower or occupation_lower in occ.lower():
                    return category
        
        return 'other'
    
    def calculate_confidence(self, Occuption_String: str, standardized: str, is_abbreviation: bool, is_misspelled: bool) -> float:
        """
        ENHANCED: Multi-factor confidence estimation with English dictionary validation
        """
        if pd.isna(Occuption_String) or pd.isna(standardized):
            return 0.0
            
        original_clean = str(Occuption_String).lower().strip()
        standardized_clean = str(standardized).lower().strip()
        
        # Check if unchanged
        is_unchanged = original_clean == standardized_clean
        
        # English dictionary validation during confidence calculation
        original_validation = self._validate_english_words(original_clean)
        standardized_validation = self._validate_english_words(standardized_clean)
        
        # Calculate base confidence using original logic
        base_confidence = self._calculate_base_confidence_original_logic(
            original_clean, standardized_clean, is_abbreviation, is_misspelled
        )
        
        # Apply dictionary-based confidence adjustments
        final_confidence = self._apply_dictionary_confidence_adjustments(
            base_confidence, original_validation, standardized_validation, is_unchanged
        )
        
        return max(0.05, min(1.0, final_confidence))

    def _calculate_base_confidence_original_logic(self, original_clean: str, standardized_clean: str, 
                                                is_abbreviation: bool, is_misspelled: bool) -> float:
        """
        Calculate base confidence using original logic
        """
        # Perfect match gets high confidence (but not 1.0 anymore)
        if original_clean == standardized_clean:
            return 0.9  # Reduced from 1.0 to allow dictionary adjustment
        
        # Factor 1: Semantic similarity using multiple metrics
        token_similarity = fuzz.token_sort_ratio(original_clean, standardized_clean) / 100.0
        partial_similarity = fuzz.partial_ratio(original_clean, standardized_clean) / 100.0
        ratio_similarity = fuzz.ratio(original_clean, standardized_clean) / 100.0
        
        # Weighted combination of similarity metrics
        semantic_score = (0.4 * token_similarity + 0.3 * partial_similarity + 0.3 * ratio_similarity)
        
        # Factor 2: Length ratio penalty
        len_ratio = min(len(original_clean), len(standardized_clean)) / max(len(original_clean), len(standardized_clean))
        length_penalty = math.exp(1 - 1/len_ratio) if len_ratio > 0 else 0.0
        
        # Factor 3: Transformation type confidence weights
        transformation_confidence = 1.0
        
        if is_abbreviation:
            abbr_confidence = self._assess_abbreviation_quality(original_clean, standardized_clean)
            transformation_confidence *= abbr_confidence
        
        if is_misspelled:
            spell_confidence = self._assess_spelling_correction_quality(original_clean, standardized_clean)
            transformation_confidence *= spell_confidence
        
        # Factor 4: Character overlap coefficient
        char_set_original = set(original_clean.replace(' ', ''))
        char_set_standardized = set(standardized_clean.replace(' ', ''))
        char_overlap = len(char_set_original & char_set_standardized) / len(char_set_original | char_set_standardized) if char_set_original | char_set_standardized else 0
        
        # Factor 5: Word-level semantic preservation
        words_original = set(original_clean.split())
        words_standardized = set(standardized_clean.split())
        word_preservation = len(words_original & words_standardized) / len(words_original) if words_original else 0
        
        # Multi-factor confidence calculation using geometric mean
        factors = [semantic_score, length_penalty, transformation_confidence, char_overlap, word_preservation]
        non_zero_factors = [f for f in factors if f > 0]
        
        if not non_zero_factors:
            return 0.1
        
        confidence = math.pow(math.prod(non_zero_factors), 1.0/len(non_zero_factors))
        return confidence

    def _apply_dictionary_confidence_adjustments(self, base_confidence: float, original_validation: Dict, 
                                               standardized_validation: Dict, is_unchanged: bool) -> float:
        """
        Apply dictionary-based confidence adjustments
        """
        original_validity = original_validation['validity_ratio']
        standardized_validity = standardized_validation['validity_ratio']
        
        # Case 1: Unchanged with invalid words - Major confidence penalty
        if is_unchanged and original_validation['contains_invalid']:
            if original_validity < 0.5:
                return base_confidence * 0.3  # Strong penalty for many invalid words
            elif original_validity < 0.8:
                return base_confidence * 0.6  # Moderate penalty for some invalid words
            else:
                return base_confidence * 0.8  # Light penalty for few invalid words
        
        # Case 2: Unchanged with all valid words - Maintain confidence
        if is_unchanged and not original_validation['contains_invalid']:
            return base_confidence
        
        # Case 3: Modified - Check if modification improved validity
        if not is_unchanged:
            validity_improvement = standardized_validity - original_validity
            
            if validity_improvement > 0.3:
                return base_confidence * 1.1  # Bonus for significant improvement
            elif validity_improvement > 0:
                return base_confidence  # Neutral for moderate improvement
            elif standardized_validity < 0.7:
                return base_confidence * 0.7  # Penalty for poor result after modification
            else:
                return base_confidence * 0.9  # Small penalty for neutral modification
        
        return base_confidence
    
    def _assess_abbreviation_quality(self, original: str, standardized: str) -> float:
        """
        Assess the quality of abbreviation expansion
        High confidence for logical abbreviations, lower for poor matches
        """
        # Check if abbreviation follows common patterns
        original_words = original.split()
        standardized_words = standardized.split()
        
        # Pattern 1: Initials match (e.g., "ag lab" -> "agricultural labourer")
        if len(original_words) <= len(standardized_words):
            initial_match_score = 0.0
            for i, orig_word in enumerate(original_words):
                if i < len(standardized_words):
                    if standardized_words[i].startswith(orig_word[:3]):  # First 3 chars match
                        initial_match_score += 1.0
            initial_match_ratio = initial_match_score / len(original_words) if original_words else 0
            
            # High confidence if most words follow abbreviation pattern
            if initial_match_ratio >= 0.7:
                return 0.95  # Very high confidence for good abbreviations
            elif initial_match_ratio >= 0.5:
                return 0.85
            else:
                return 0.75
        
        # Default confidence for other abbreviation patterns
        return 0.8

    def _assess_spelling_correction_quality(self, original: str, standardized: str) -> float:
        """
        Assess the quality of spelling correction
        Based on edit distance and phonetic similarity
        """
        from rapidfuzz.distance import Levenshtein
        edit_distance = Levenshtein.distance(original, standardized)            
        max_len = max(len(original), len(standardized))
        
        if max_len == 0:
            return 0.5
        
        # Confidence decreases with edit distance
        edit_ratio = 1 - (edit_distance / max_len)
        
        # High confidence for minor spelling errors
        if edit_distance <= 2:
            return 0.9
        elif edit_distance <= 4:
            return 0.8
        else:
            return max(0.6, edit_ratio)
   
    def standardize_occupation(self, occupation_str: str) -> Dict:
        """
        ENHANCED: Standardize occupation string with multi-occupation detection
        """
        if pd.isna(occupation_str):
            return {
                'original': '',
                'standardized': '',
                'category': 'unknown',
                'confidence': 0.0,
                'needs_llm_check': True,
                'confidence_reason': 'empty_input',
                'invalid_words': [],
                'is_multiple_occupation': False  # NEW FIELD: Simple boolean flag
            }
        
        original = str(occupation_str).strip()
        
        # NEW: Check for multiple occupations (simple detection)
        is_multiple_occupation = self._detect_multiple_occupations(original)
        
        # PHASE 1: Preprocessing - Fix modifier variants and multi-word spellings
        # Step 1: Normalize the entire string
        normalized_full = self.normalize_text(original)
        
        # Step 2: First spelling correction - Handle modifier variants (daur→daughter)
        # This step will convert "black smith daur" → "black smith daughter"
        first_correction = self.correct_spelling(normalized_full)
        is_misspelled_phase1 = first_correction != normalized_full
        
        # PHASE 2: Modifier detection and base occupation processing  
        # Step 3: Detect modifiers on the corrected text
        modifiers, base_text = self.detect_modifiers(first_correction)
        
        # Step 4: Normalize base occupation text
        normalized_base = self.normalize_text(base_text)
        
        # Step 5: Expand abbreviations
        expanded = self.expand_abbreviations(normalized_base)
        is_abbreviation = expanded != normalized_base
        
        # Step 6: Second spelling correction - Handle base occupation spelling (black smith→blacksmith)
        corrected_base = self.correct_spelling(expanded)
        is_misspelled_phase2 = corrected_base != expanded
        
        # Combine spelling error detection from both phases
        is_misspelled = is_misspelled_phase1 or is_misspelled_phase2
        
        # Step 7: Fuzzy match to standard occupations
        matched_base = self.fuzzy_match_occupations(corrected_base, threshold=75)
        
        # Step 8: Recombine standardized result
        standardized = self.standardize_with_modifiers(original, modifiers, matched_base)
        
        # Step 9: Determine occupation category
        category = self.get_occupation_category_with_modifiers(standardized, modifiers)
        
        # Step 10: Calculate confidence score (includes dictionary validation)
        confidence = self.calculate_confidence(original, standardized, is_abbreviation, is_misspelled)

        # NEW: Adjust confidence if multiple occupation detected
        if is_multiple_occupation:
            confidence = confidence * 0.9  # Small penalty for multi-occupation complexity

        # Determine if LLM check is needed (considering multiple occupation)
        needs_llm_check, confidence_reason, invalid_words = self._determine_llm_check_need(
            original, standardized, confidence, is_abbreviation, is_misspelled, is_multiple_occupation
        )

        return {
            'original': original,
            'standardized': standardized,  
            'category': category,
            'confidence': confidence,
            'needs_llm_check': needs_llm_check,
            'confidence_reason': confidence_reason,
            'invalid_words': invalid_words,
            'is_multiple_occupation': is_multiple_occupation  # NEW FIELD: Simple boolean flag
        }
    
    def _determine_llm_check_need(self, original: str, standardized: str, confidence: float,
                                is_abbreviation: bool, is_misspelled: bool, is_multiple_occupation: bool = False) -> Tuple[bool, str, List[str]]:
        """
        ENHANCED: Determine if LLM check is needed (now considers multiple occupation)
        """
        original_clean = str(original).lower().strip()
        standardized_clean = str(standardized).lower().strip()
        is_unchanged = original_clean == standardized_clean
        
        # Validate original text for invalid words
        original_validation = self._validate_english_words(original_clean)
        
        # Priority 1: Multiple occupation - EXCLUDE from LLM check
        if is_multiple_occupation:
            return False, 'multiple_occupation_excluded_from_llm_check', original_validation['invalid_words']
        
        # Priority 2: Unchanged with invalid words (highest priority)
        if is_unchanged and original_validation['contains_invalid']:
            if original_validation['validity_ratio'] < 0.5:
                return True, 'unchanged_with_many_invalid_words', original_validation['invalid_words']
            elif original_validation['validity_ratio'] < 0.8:
                return True, 'unchanged_with_some_invalid_words', original_validation['invalid_words']
            else:
                return False, 'unchanged_with_few_invalid_words', original_validation['invalid_words']
        
        # Priority 3: Very low confidence regardless of change
        if confidence < 0.3:
            return True, 'very_low_confidence', original_validation['invalid_words']
        
        # Priority 4: Modified but still has quality issues
        if not is_unchanged:
            standardized_validation = self._validate_english_words(standardized_clean)
            if standardized_validation['validity_ratio'] < 0.7:
                return True, 'modification_poor_result', standardized_validation['invalid_words']
        
        # Priority 5: Low confidence
        if confidence < 0.5:
            return True, 'low_confidence', original_validation['invalid_words']
        
        # No LLM check needed
        return False, 'acceptable_quality', []
    
    def detect_modifiers(self, text: str) -> Tuple[List[str], str]:
        """
        Detect important modifier words in occupation text
        Returns: (list_of_modifiers, base_occupation_text)
        """
        if pd.isna(text):
            return [], ""
        
        normalized_text = self.normalize_text(text)
        words = normalized_text.split()
        
        detected_modifiers = []
        remaining_words = []
        
        modifier_set = getattr(self, 'all_modifiers_set', set())
        if not modifier_set and self.important_modifiers:
            for modifier_list in self.important_modifiers.values():
                if isinstance(modifier_list, list):
                    modifier_set.update(modifier_list)
        
        for word in words:
            if word in modifier_set:
                detected_modifiers.append(word)
            else:
                remaining_words.append(word)
        
        base_text = " ".join(remaining_words).strip()
        return detected_modifiers, base_text

    def standardize_with_modifiers(self, text: str, modifiers: List[str], base_occupation: str) -> str:
        """
        Standardize occupation while preserving important modifiers
        """
        if not base_occupation:
            return text
        
        standardized_base = self.fuzzy_match_occupations(base_occupation, threshold=75)
        
        if not modifiers:
            return standardized_base
        
        family_words = self.important_modifiers.get('family_relation', [])
        family_modifiers = [m for m in modifiers if m in family_words]
        
        if family_modifiers:
            if 'wife' in family_modifiers:
                return f"{standardized_base} wife"
            elif 'widow' in family_modifiers:
                return f"{standardized_base} widow" 
            elif 'daughter' in family_modifiers:
                return f"{standardized_base} daughter"
            elif 'son' in family_modifiers:
                return f"{standardized_base} son"
            else:
                return f"{standardized_base} {family_modifiers[0]}"
        
        skill_words = self.important_modifiers.get('skill_level', [])
        skill_modifiers = [m for m in modifiers if m in skill_words]
        if skill_modifiers:
            return f"{skill_modifiers[0]} {standardized_base}"
        
        employment_words = self.important_modifiers.get('employment_status', [])
        employment_modifiers = [m for m in modifiers if m in employment_words]
        if employment_modifiers:
            return f"{employment_modifiers[0]} {standardized_base}"
        
        if modifiers:
            return f"{' '.join(modifiers)} {standardized_base}".strip()
        
        return standardized_base

    def get_occupation_category_with_modifiers(self, occupation: str, modifiers: List[str]) -> str:
        """
        Determine category considering modifiers
        """
        if not modifiers:
            return self.get_occupation_category(occupation)
        
        # Check for family-related modifiers
        family_words = self.important_modifiers.get('family_relation', [])
        if any(m in family_words for m in modifiers):
            return 'family_dependent'
        
        # Check for unemployment
        employment_words = self.important_modifiers.get('employment_status', [])
        if any(m in employment_words for m in modifiers):
            return 'unemployed_seeking'
        
        # Otherwise use standard category detection
        return self.get_occupation_category(occupation)
    
    def batch_standardize_occupations(self, df: pd.DataFrame, chunk_size: int = 1000) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        ENHANCED: Standardize all occupations with simple multi-occupation flagging
        """
        results = []
        unique_occupations = df["Occupation_String"].unique()
        
        print(f"Standardizing {len(unique_occupations)} unique occupations...")
        print("Multi-occupation detection enabled (simple flagging)")
        
        # Track multi-occupation statistics
        multi_occupation_count = 0
        
        for i in tqdm(range(0, len(unique_occupations), chunk_size), desc="Processing occupation chunks"):
            chunk_occupations = unique_occupations[i:i + chunk_size]
            chunk_results = []
            
            for occupation in chunk_occupations:
                result = self.standardize_occupation(occupation)
                
                # Track multi-occupation statistics
                if result.get('is_multiple_occupation', False):
                    multi_occupation_count += 1
                
                chunk_results.append(result)
            
            results.extend(chunk_results)
            gc.collect()
        
        standardization_df = pd.DataFrame(results)
        
        print("Merging results with original DataFrame by county...")
        df_chunks = []
        
        for county in tqdm(df['County'].unique(), desc="Processing counties"):
            county_df = df[df['County'] == county].copy()
            
            for i in range(0, len(county_df), chunk_size):
                chunk_df = county_df.iloc[i:i + chunk_size].copy()
                chunk_merged = chunk_df.merge(
                    standardization_df, 
                    left_on="Occupation_String", 
                    right_on="original", 
                    how="left"
                )
                df_chunks.append(chunk_merged)
        
        df_standardized = pd.concat(df_chunks, ignore_index=True)
        df_standardized = df_standardized.drop(columns=['original'])

        # Print comprehensive summary statistics
        total_records = len(df_standardized)
        llm_needed = df_standardized['needs_llm_check'].sum()
        llm_percentage = (llm_needed / total_records) * 100 if total_records > 0 else 0
        avg_confidence = df_standardized['confidence'].mean()
        
        # Multi-occupation statistics
        multi_occupation_records = df_standardized['is_multiple_occupation'].sum()
        multi_occupation_percentage = (multi_occupation_records / total_records) * 100 if total_records > 0 else 0
        
        print(f"\n=== ENHANCED PROCESSING SUMMARY ===")
        print(f"Total records processed: {total_records:,}")
        print(f"Records needing LLM check: {llm_needed:,} ({llm_percentage:.1f}%)")
        print(f"Average confidence score: {avg_confidence:.3f}")
        print(f"Multi-occupation records detected: {multi_occupation_records:,} ({multi_occupation_percentage:.1f}%)")
        
        # Show top reasons for LLM check
        if llm_needed > 0:
            print(f"\nTop reasons for LLM check:")
            reason_counts = df_standardized['confidence_reason'].value_counts()
            for reason, count in reason_counts.head(7).items():
                percentage = (count / total_records) * 100
                print(f"  {reason}: {count:,} ({percentage:.1f}%)")

        return df_standardized, standardization_df

def Occupation_cleaning_pipeline(df: pd.DataFrame, 
                                excel_file_path: str = None,
                                min_frequency: int = 2,
                                chunk_size: int = 1000):
    """
    Complete traditional cleaning pipeline with simple multi-occupation detection
    """
    cleaner = TraditionalOccupationCleaner(excel_file_path)
    
    print(f"Processing {len(df)} records in chunks of {chunk_size}...")
    print("Multi-occupation detection: Simple flagging enabled")
    
    # Standardize occupations
    print("Standardizing occupations...")
    df_standardized, standardization_results = cleaner.batch_standardize_occupations(df, chunk_size)

    return df_standardized   

def integrate_traditional_cleaning_with_export(df: pd.DataFrame, 
                                             excel_file_path: str = None,
                                             chunk_size: int = 1000,
                                             export_mapping: bool = True,
                                             output_dir: str = "."):
    """
    ENHANCED: Memory-efficient integration with simple multi-occupation detection and LLM flagging
    """
    n_records = len(df)
    n_unique_occupations = len(df["Occupation_String"].unique())
    
    print(f"Dataset: {n_records} records, {n_unique_occupations} unique occupations")
    print(f"Counties: {len(df['County'].unique())}")

    df_cleaned = Occupation_cleaning_pipeline(
        df, 
        excel_file_path,
        chunk_size=chunk_size
    ) 
    df_result = df_cleaned
    
    main_output = os.path.join(output_dir, "cleaned_occupations_with_categories.csv")
    df_result.to_csv(main_output, index=False)
    print(f"Exported main dataset to {main_output}")

    # Export LLM check subset
    llm_check_subset = df_result[df_result['needs_llm_check'] == True].copy()
    if len(llm_check_subset) > 0:
        # Sort by confidence (lowest first) for prioritized review
        llm_check_subset = llm_check_subset.sort_values('confidence')
        llm_output = os.path.join(output_dir, "llm_check_needed.csv")
        llm_check_subset.to_csv(llm_output, index=False)
        print(f"Exported LLM check subset to {llm_output} ({len(llm_check_subset):,} records)")

    # NEW: Export multi-occupation flagged records
    multi_occupation_subset = df_result[df_result['is_multiple_occupation'] == True].copy()
    if len(multi_occupation_subset) > 0:
        multi_output = os.path.join(output_dir, "multi_occupation_flagged.csv")
        multi_occupation_subset.to_csv(multi_output, index=False)
        print(f"Exported multi-occupation flagged records to {multi_output} ({len(multi_occupation_subset):,} records)")

    return df_result


# Usage example with simple multi-occupation detection
if __name__ == "__main__":
    # Load your data
    df = sample
    
    # Ensure County column exists
    if 'County' not in df.columns:
        print("Warning: 'County' column not found. Adding default county...")
        df['County'] = 'Unknown'

    # Set parameters
    CHUNK_SIZE = 1000
    EXCEL_FILE_PATH = "occupation_dictionaries.xlsx"
    
    print(f"Loaded {len(df)} records from {len(df['County'].unique())} counties.")
    print(f"Processing in chunks of {CHUNK_SIZE} per county...")
    
    # Create output directory
    os.makedirs("./cleaning_results_1851_validation", exist_ok=True)
    
    # Run enhanced cleaning pipeline with simple multi-occupation detection
    df_cleaned = integrate_traditional_cleaning_with_export(
        df, 
        excel_file_path=EXCEL_FILE_PATH,
        chunk_size=CHUNK_SIZE,
        export_mapping=True,
        output_dir="./cleaning_results_1851_validation"
    )
    
    print(f"\nProcessing completed! Cleaned {len(df_cleaned)} records.")

  from .autonotebook import tqdm as notebook_tqdm
  warn(


NLTK resources loaded successfully
Loaded 1000 records from 56 counties.
Processing in chunks of 1000 per county...
Dataset: 1000 records, 977 unique occupations
Counties: 56
Loaded 234377 English words from NLTK
Loaded dictionaries from occupation_dictionaries.xlsx
  - 235 abbreviations
  - 84 misspellings
  - 17 categories
Processing 1000 records in chunks of 1000...
Multi-occupation detection: Simple flagging enabled
Standardizing occupations...
Standardizing 977 unique occupations...
Multi-occupation detection enabled (simple flagging)


Processing occupation chunks: 100%|██████████| 1/1 [00:03<00:00,  3.44s/it]


Merging results with original DataFrame by county...


Processing counties: 100%|██████████| 56/56 [00:00<00:00, 873.70it/s]


=== ENHANCED PROCESSING SUMMARY ===
Total records processed: 1,000
Records needing LLM check: 189 (18.9%)
Average confidence score: 0.768
Multi-occupation records detected: 30 (3.0%)

Top reasons for LLM check:
  acceptable_quality: 771 (77.1%)
  unchanged_with_some_invalid_words: 95 (9.5%)
  unchanged_with_many_invalid_words: 50 (5.0%)
  modification_poor_result: 34 (3.4%)
  multiple_occupation_excluded_from_llm_check: 30 (3.0%)
  unchanged_with_few_invalid_words: 10 (1.0%)
  low_confidence: 10 (1.0%)
Exported main dataset to ./cleaning_results_1851_validation\cleaned_occupations_with_categories.csv
Exported LLM check subset to ./cleaning_results_1851_validation\llm_check_needed.csv (189 records)
Exported multi-occupation flagged records to ./cleaning_results_1851_validation\multi_occupation_flagged.csv (30 records)

Processing completed! Cleaned 1000 records.





## Single Process with LLM:Qwen

In [18]:
import pandas as pd
import numpy as np
import requests
import json
import time
import re
import os
from typing import Dict, List, Optional, Tuple
from tqdm import tqdm
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import string

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class OptimizedSingleProcessor:
    """
    Optimized single record processor - accurate and efficient
    """
    
    def __init__(self, 
                 model_name: str = "llama2:7b",
                 ollama_url: str = "http://localhost:11434",
                 timeout: int = 45,
                 max_retries: int = 2,
                 delay_between_requests: float = 0.1,  # Very fast processing
                 max_workers: int = 6):                # More workers for single requests
        """
        Initialize optimized single processor
        """
        self.model_name = model_name
        self.ollama_url = ollama_url
        self.timeout = timeout
        self.max_retries = max_retries
        self.delay_between_requests = delay_between_requests
        self.max_workers = max_workers
        
        # Thread lock for statistics
        self.lock = threading.Lock()
        
        # Statistics
        self.stats = {
            "total_processed": 0,
            "corrections_made": 0,
            "api_errors": 0,
            "processing_time": 0,
            "average_response_time": 0,
            "unique_records_processed": 0,        # NEW: Track unique records
            "duplicate_records_saved": 0,         # NEW: Track duplicates saved
            "deduplication_ratio": 0.0            # NEW: Track deduplication efficiency
        }
        
        # Test connection
        self._test_connection()
        logger.info(f" Optimized single processor initialized")
        logger.info(f" Max workers: {max_workers}, Delay: {delay_between_requests}s")
    
    def _test_connection(self):
        """Test Ollama connection"""
        try:
            response = requests.get(f"{self.ollama_url}/api/tags", timeout=10)
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [m["name"] for m in models]
                logger.info(f" Ollama connection successful! Available models: {model_names}")
                
                if self.model_name not in model_names:
                    logger.warning(f"  Model '{self.model_name}' not found. Please run: ollama pull {self.model_name}")
            else:
                logger.error(f" Ollama connection failed: HTTP {response.status_code}")
        except Exception as e:
            logger.error(f" Cannot connect to Ollama service: {e}")
    
    def create_optimized_single_prompt(self, original: str, rule_based_result: str, 
                                     confidence: float, county: str = "", sex: str = "", Occode_Desc: str = "") -> str:
        """
        Create optimized prompt for single record processing
        """
        
        # Add context if available
        context = ""
        if county and county != "Unknown":
            context += f" County: {county}."
        if sex and sex != "Unknown":
            context += f" Gender: {sex}."
        if Occode_Desc and Occode_Desc != "Unknown":
            context += f" Description for Original:{Occode_Desc}."
        
        prompt = f"""Fix the spelling and formatting of this British historical occupation. Keep the meaning exactly the same.

Original: "{original}"
Current: "{rule_based_result}"
Confidence: {confidence:.2f}{context}

Rules:
- ONLY fix spelling mistakes and formatting
- Keep family terms: wife, widow, daughter, son
- Keep core occupation unchanged (blacksmith stays blacksmith)
- If Original is good, keep it
- Do NOT change occupation meaning

Examples:
- "black smith" → "blacksmith" (fix spacing)
- "ag lab" → "agricultural labourer" (standard abbreviation)
- "black smith wife" → "blacksmith wife" (fix spacing, keep family term)

Output only the corrected occupation:"""
        
        return prompt
    
    def call_ollama_optimized(self, prompt: str) -> str:
        """
        Optimized Ollama API call for single records
        """
        for attempt in range(self.max_retries):
            try:
                response = requests.post(
                    f"{self.ollama_url}/api/generate",
                    json={
                        "model": self.model_name,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "temperature": 0.0,         # Zero randomness for consistency
                            "num_predict": 50,          # Short responses for single occupations
                            "top_k": 1,                 # Most deterministic
                            "top_p": 0.1,               # Very focused
                            "repeat_penalty": 1.0,     # No penalty needed
                            "stop": ["Original:", "Current:", "Rules:", "Examples:", "Output"]
                        }
                    },
                    timeout=self.timeout
                )
                response.raise_for_status()
                
                result = response.json()
                return result.get("response", "").strip()
                
            except requests.exceptions.Timeout:
                logger.warning(f"API timeout (attempt {attempt + 1}/{self.max_retries})")
                if attempt < self.max_retries - 1:
                    time.sleep(1)  # Short wait before retry
                else:
                    return ""
                    
            except Exception as e:
                logger.error(f"API request failed (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(0.5)
                else:
                    return ""
        
        return ""
    
    def parse_single_response_clean(self, response_text: str, original_result: str) -> str:
        """
        Clean parser for single record responses
        """
        if not response_text:
            return original_result
        
        # Clean the response
        cleaned = response_text.strip()
        
        # Remove common prefixes that LLM might add
        prefixes = [
            "Output:", "Result:", "Corrected:", "Answer:", "The corrected occupation is:",
            "Corrected occupation:", "Fixed:", "Final result:"
        ]
        
        for prefix in prefixes:
            if cleaned.lower().startswith(prefix.lower()):
                cleaned = cleaned[len(prefix):].strip()
                break
        
        # Remove quotes and punctuation
        cleaned = cleaned.strip('"\'.,!?')
        
        # Remove any trailing explanation
        # Split on common explanation starters
        explanation_starters = [" because", " since", " as", " (", " -", "\n"]
        for starter in explanation_starters:
            if starter in cleaned:
                cleaned = cleaned.split(starter)[0].strip()
        
        # Validate result
        if not cleaned or len(cleaned) > 100:
            return original_result
        
        # Check for error responses
        error_words = ['sorry', 'cannot', 'unclear', 'error', 'invalid', 'unsure', 'difficult']
        if any(word in cleaned.lower() for word in error_words):
            return original_result
        
        # Additional validation - check if result is reasonable
        if self._validate_single_result(cleaned, original_result):
            return cleaned
        else:
            return original_result
    
    def _validate_single_result(self, llm_result: str, original_result: str) -> bool:
        """
        Validate that LLM result is reasonable for single record
        """
        llm_lower = llm_result.lower()
        orig_lower = original_result.lower()
        
        # If same as original, it's valid
        if llm_lower == orig_lower:
            return True
        
        # Check length - shouldn't be too different
        if abs(len(llm_result) - len(original_result)) > 20:
            return False
        
        # Should contain similar key words
        llm_words = set(llm_lower.split())
        orig_words = set(orig_lower.split())
        
        # At least 50% word overlap for short occupations
        if len(orig_words) <= 3:
            overlap = len(llm_words & orig_words) / len(orig_words | llm_words)
            if overlap < 0.3:
                return False
        
        return True
    
    def process_single_record_optimized(self, record: Dict) -> Dict:
        """
        Process single record with optimization
        """
        start_time = time.time()
        
        try:
            # Create prompt
            prompt = self.create_optimized_single_prompt(
                record['original'],
                record['rule_based_result'],
                record['confidence'],
                record.get('county', ''),
                record.get('sex', ''),
                record.get('Occode_Desc', '')
            )
            
            # Call LLM
            llm_response = self.call_ollama_optimized(prompt)
            
            # Parse result
            final_result = self.parse_single_response_clean(llm_response, record['rule_based_result'])
            
            # Determine if correction was made
            is_corrected = final_result != record['rule_based_result']
            
            processing_time = time.time() - start_time
            
            # Update statistics
            with self.lock:
                self.stats["total_processed"] += 1
                if is_corrected:
                    self.stats["corrections_made"] += 1
                self.stats["processing_time"] += processing_time
                
                # Update average response time
                self.stats["average_response_time"] = self.stats["processing_time"] / self.stats["total_processed"]
            
            return {
                'Occupation_String': record['original'],
                'rule_based_result': record['rule_based_result'],
                'rule_based_confidence': record['confidence'],
                'county': record.get('county', 'Unknown'),
                'sex': record.get('sex', 'Unknown'),
                'Occode_Desc': record.get('Occode_Desc', 'Unknown'),
                'llm_corrected': final_result,
                'is_corrected': is_corrected,
                'final_result': final_result,
                'processing_time': processing_time,
                'llm_response_raw': llm_response[:100] if llm_response else ''
            }
            
        except Exception as e:
            logger.error(f"Error processing record: {e}")
            
            with self.lock:
                self.stats["api_errors"] += 1
            
            return {
                'Occupation_String': record['original'],
                'rule_based_result': record['rule_based_result'],
                'rule_based_confidence': record['confidence'],
                'county': record.get('county', 'Unknown'),
                'sex': record.get('sex', 'Unknown'),
                'Occode_Desc': record.get('Occode_Desc', 'Unknown'),
                'llm_corrected': record['rule_based_result'],
                'is_corrected': False,
                'final_result': record['rule_based_result'],
                'processing_time': 0,
                'llm_response_raw': f'Error: {str(e)}'
            }
    
    def save_checkpoint_single(self, df: pd.DataFrame, processed_count: int, checkpoint_file: str):
        """Save checkpoint for single processing"""
        checkpoint_data = {
            "processed_count": processed_count,
            "stats": self.stats,
            "timestamp": time.time(),
            "processing_mode": "single_record"
        }
        
        # Save progress info
        with open(checkpoint_file + '.json', 'w') as f:
            json.dump(checkpoint_data, f, indent=2)
        
        # Save current data
        df.to_csv(checkpoint_file + '.csv', index=False)
        
        logger.info(f" Checkpoint saved: {processed_count:,} records processed")
    
    def load_checkpoint_single(self, checkpoint_file: str) -> Tuple[Optional[pd.DataFrame], int]:
        """Load checkpoint for single processing"""
        json_file = checkpoint_file + '.json'
        csv_file = checkpoint_file + '.csv'
        
        if os.path.exists(json_file) and os.path.exists(csv_file):
            # Load progress info
            with open(json_file, 'r') as f:
                checkpoint_data = json.load(f)
            
            # Load data
            df = pd.read_csv(csv_file)
            
            self.stats = checkpoint_data.get("stats", self.stats)
            processed_count = checkpoint_data.get("processed_count", 0)
            
            logger.info(f"🔄 Resumed from checkpoint: {processed_count:,} records processed")
            return df, processed_count
        
        return None, 0
    
    def create_unique_records_map(self, df: pd.DataFrame, confidence_threshold: float) -> Tuple[pd.DataFrame, Dict[str, str]]:
        """
        NEW METHOD: Create unique records for processing and mapping back to original
        """
        # Filter records that need processing
        mask = df['confidence'] < confidence_threshold
        to_process_df = df[mask].copy()
        
        if len(to_process_df) == 0:
            logger.info(" No records need processing")
            return pd.DataFrame(), {}
        
        logger.info(f" Records below confidence threshold: {len(to_process_df):,}")
        
        # Group by Occupation_String to find duplicates
        occupation_groups = to_process_df.groupby('Occupation_String').agg({
            'standardized': 'first',  # Take first standardized result
            'confidence': 'first',    # Take first confidence score
            'County': 'first',        # Take first county (for context)
            'Sex': lambda x: x.mode().iloc[0] if not x.mode().empty else x.iloc[0],  # Take most common sex
            'Occode_Desc': 'first'    # Take first description
        }).reset_index()
        
        # Count duplicates
        occupation_counts = to_process_df['Occupation_String'].value_counts()
        
        # Create mapping from unique occupation to final result
        unique_to_final_map = {}
        
        # Add duplicate count to stats
        total_duplicates = len(to_process_df) - len(occupation_groups)
        self.stats["duplicate_records_saved"] = total_duplicates
        self.stats["unique_records_processed"] = len(occupation_groups)
        self.stats["deduplication_ratio"] = total_duplicates / len(to_process_df) if len(to_process_df) > 0 else 0
        
        logger.info(f" Unique occupation strings to process: {len(occupation_groups):,}")
        logger.info(f" Duplicate records saved from processing: {total_duplicates:,}")
        logger.info(f" Deduplication efficiency: {self.stats['deduplication_ratio']:.1%}")
        
        # Show top duplicates
        top_duplicates = occupation_counts.head(5)
        if len(top_duplicates) > 0:
            logger.info(" Most frequent occupation strings:")
            for occupation, count in top_duplicates.items():
                if count > 1:
                    logger.info(f"  '{occupation}': {count:,} occurrences")
        
        return occupation_groups, unique_to_final_map
    
    def apply_unique_results_to_original(self, df: pd.DataFrame, unique_results: pd.DataFrame, 
                                       confidence_threshold: float) -> pd.DataFrame:
        """
        NEW METHOD: Apply results from unique processing back to original dataframe
        """
        logger.info(" Mapping unique results back to original dataset...")
        
        # Create mapping from occupation string to final result
        occupation_to_result = dict(zip(unique_results['Occupation_String'], unique_results['final_result']))
        
        # Add final_result column if not exists
        if 'final_result' not in df.columns:
            df['final_result'] = df['standardized'].copy()
        
        # Apply results to all matching occupation strings
        mask = df['confidence'] < confidence_threshold
        updated_count = 0
        
        for idx, row in df[mask].iterrows():
            occupation = row['Occupation_String']
            if occupation in occupation_to_result:
                df.loc[idx, 'final_result'] = occupation_to_result[occupation]
                updated_count += 1
        
        logger.info(f" Applied results to {updated_count:,} records")
        
        return df
    
    def process_large_dataset_single(self, 
                                   df: pd.DataFrame,
                                   confidence_threshold: float = 0.8,
                                   chunk_size: int = 20000,        # Large chunks for memory efficiency
                                   save_interval: int = 1000,      # Save every 1000 records
                                   checkpoint_file: str = "ollama_single_checkpoint",
                                   resume_from_checkpoint: bool = True,
                                   use_threading: bool = True,
                                   use_unique_processing: bool = True) -> pd.DataFrame:  # NEW PARAMETER
        """
        ENHANCED: Process large dataset with optimized single record processing and unique record deduplication
        """
        
        logger.info(" Starting optimized single record processing with unique deduplication")
        logger.info(f" Total records: {len(df):,}")
        logger.info(f" Threading: {'Enabled' if use_threading else 'Disabled'} ({self.max_workers} workers)")
        logger.info(f" Unique processing: {'Enabled' if use_unique_processing else 'Disabled'}")
        
        # NEW: Create unique records if enabled
        if use_unique_processing:
            unique_df, unique_map = self.create_unique_records_map(df, confidence_threshold)
            
            if len(unique_df) == 0:
                logger.info(" All records have been processed or no records need processing")
                return df
            
            # Process unique records instead of full dataset
            processing_df = unique_df.copy()
        else:
            # Original logic: process full dataset
            processing_df = df.copy()
        
        # Try to resume from checkpoint
        processed_count = 0
        if resume_from_checkpoint:
            loaded_df, processed_count = self.load_checkpoint_single(checkpoint_file)
            if loaded_df is not None:
                processing_df = loaded_df
                logger.info(f" Successfully resumed from checkpoint")
        
        # Add final_result column if not exists
        if 'final_result' not in processing_df.columns:
            processing_df['final_result'] = processing_df['standardized'].copy()
        
        # Filter records that need processing (for unique processing, all records need processing)
        if use_unique_processing:
            to_process_indices = processing_df.index.tolist()
        else:
            mask = processing_df['confidence'] < confidence_threshold
            to_process_indices = processing_df[mask].index.tolist()
        
        # Skip already processed records
        if processed_count > 0:
            to_process_indices = [idx for idx in to_process_indices if idx >= processed_count]
        
        logger.info(f" Records to process: {len(to_process_indices):,}")
        logger.info(f" Already processed: {processed_count:,}")
        
        if len(to_process_indices) == 0:
            logger.info(" All records have been processed")
            if use_unique_processing:
                # Map results back to original dataset
                return self.apply_unique_results_to_original(df, processing_df, confidence_threshold)
            else:
                return processing_df
        
        # Estimate processing time
        if self.stats["total_processed"] > 0:
            avg_time = self.stats["average_response_time"]
            estimated_time = avg_time * len(to_process_indices)
            logger.info(f"  Estimated remaining time: {estimated_time/3600:.1f} hours")
        
        # Prepare records for processing
        records_to_process = []
        for idx in to_process_indices:
            row = processing_df.loc[idx]
            record = {
                'index': idx,
                'original': row['Occupation_String'],
                'rule_based_result': row['standardized'],
                'confidence': row['confidence'],
                'county': row.get('County', 'Unknown'),
                'sex': row.get('Sex', 'Unknown'),
                'Occode_Desc': row.get('Occode_Desc', 'Unknown')
            }
            records_to_process.append(record)
        
        # Process records
        try:
            if use_threading and self.max_workers > 1:
                self._process_records_threaded(processing_df, records_to_process, processed_count, save_interval, checkpoint_file)
            else:
                self._process_records_sequential(processing_df, records_to_process, processed_count, save_interval, checkpoint_file)
        
        except KeyboardInterrupt:
            logger.info(" Processing interrupted, saving current progress...")
            self.save_checkpoint_single(processing_df, processed_count, checkpoint_file)
            raise
        
        # Final save
        self.save_checkpoint_single(processing_df, len(to_process_indices), checkpoint_file)
        
        # NEW: If using unique processing, map results back to original dataset
        if use_unique_processing:
            final_df = self.apply_unique_results_to_original(df, processing_df, confidence_threshold)
            logger.info(" Optimized unique record processing completed!")
        else:
            final_df = processing_df
            logger.info(" Optimized single record processing completed!")
        
        self.print_final_statistics()
        
        return final_df
    
    def _process_records_sequential(self, df: pd.DataFrame, records: List[Dict],
                                  processed_count: int, save_interval: int, checkpoint_file: str):
        """Process records sequentially"""
        current_count = processed_count
        
        for record in tqdm(records, desc="Processing records"):
            enhanced_record = self.process_single_record_optimized(record)
            
            # Update DataFrame
            idx = record['index']
            df.loc[idx, 'final_result'] = enhanced_record['final_result']
            
            current_count += 1
            
            # Save checkpoint periodically
            if current_count % save_interval == 0:
                self.save_checkpoint_single(df, current_count, checkpoint_file)
            
            # Delay between requests
            if self.delay_between_requests > 0:
                time.sleep(self.delay_between_requests)
    
    def _process_records_threaded(self, df: pd.DataFrame, records: List[Dict],
                                processed_count: int, save_interval: int, checkpoint_file: str):
        """Process records with threading"""
        current_count = processed_count
        processed_indices = []
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all records
            future_to_record = {
                executor.submit(self.process_single_record_optimized, record): record
                for record in records
            }
            
            # Process completed results
            for future in tqdm(as_completed(future_to_record), total=len(records), desc="Processing records"):
                record = future_to_record[future]
                
                try:
                    enhanced_record = future.result()
                    
                    # Update DataFrame
                    idx = record['index']
                    df.loc[idx, 'final_result'] = enhanced_record['final_result']
                    
                    processed_indices.append(idx)
                    current_count += 1
                    
                    # Save checkpoint periodically
                    if current_count % save_interval == 0:
                        self.save_checkpoint_single(df, current_count, checkpoint_file)
                
                except Exception as e:
                    logger.error(f"Record processing failed: {e}")
                
                # Small delay for rate limiting
                if self.delay_between_requests > 0:
                    time.sleep(self.delay_between_requests)
    
    def print_final_statistics(self):
        """Print comprehensive final statistics"""
        logger.info("\n" + "="*60)
        logger.info(" OPTIMIZED SINGLE PROCESSING STATISTICS")
        logger.info("="*60)
        logger.info(f"Total records processed: {self.stats['total_processed']:,}")
        logger.info(f"Total corrections made: {self.stats['corrections_made']:,}")
        logger.info(f"API errors: {self.stats['api_errors']:,}")
        
        # NEW: Unique processing statistics
        if self.stats['unique_records_processed'] > 0:
            logger.info(f"Unique records processed: {self.stats['unique_records_processed']:,}")
            logger.info(f"Duplicate records saved: {self.stats['duplicate_records_saved']:,}")
            logger.info(f"Deduplication efficiency: {self.stats['deduplication_ratio']:.1%}")
        
        if self.stats['total_processed'] > 0:
            success_rate = (self.stats['total_processed'] - self.stats['api_errors']) / self.stats['total_processed'] * 100
            correction_rate = self.stats['corrections_made'] / self.stats['total_processed'] * 100
            avg_speed = self.stats['total_processed'] / self.stats['processing_time'] if self.stats['processing_time'] > 0 else 0
            
            logger.info(f"Success rate: {success_rate:.1f}%")
            logger.info(f"Correction rate: {correction_rate:.1f}%")
            logger.info(f"Average processing speed: {avg_speed:.1f} records/second")
            logger.info(f"Average response time: {self.stats['average_response_time']:.2f} seconds")
            logger.info(f"Total processing time: {self.stats['processing_time']/3600:.2f} hours")
        
        logger.info("="*60)

# Enhanced convenience function with unique processing
def process_occupation_data_single_optimized(df_cleaned: pd.DataFrame,
                                           model_name: str = "Qwen2.5:7b-instruct",
                                           confidence_threshold: float = 0.8,
                                           chunk_size: int = 20000,
                                           save_interval: int = 1000,
                                           delay: float = 0.1,
                                           output_file: str = "df_cleaned_single_optimized.csv",
                                           resume_from_checkpoint: bool = True,
                                           use_threading: bool = True,
                                           max_workers: int = 6,
                                           use_unique_processing: bool = True) -> pd.DataFrame:  # NEW PARAMETER
    """
    ENHANCED: Optimized single record processing function with unique processing
    
    Args:
        df_cleaned: Cleaned DataFrame
        model_name: Ollama model name
        confidence_threshold: Confidence threshold for processing
        chunk_size: Records per processing chunk
        save_interval: Checkpoint save interval
        delay: Delay between requests (very small for single processing)
        output_file: Output file name
        resume_from_checkpoint: Whether to resume from checkpoint
        use_threading: Whether to use multi-threading
        max_workers: Number of concurrent workers
        use_unique_processing: NEW - Whether to deduplicate by Occupation_String before processing
    """
    
    logger.info(" Starting optimized single record Ollama processing with unique deduplication")
    logger.info(f" Input data: {len(df_cleaned):,} records")
    logger.info(f" Single record mode with {max_workers} workers")
    logger.info(f" Threading: {'Enabled' if use_threading else 'Disabled'}")
    logger.info(f" Unique processing: {'Enabled' if use_unique_processing else 'Disabled'}")
    
    # Check required columns
    required_columns = ['Occupation_String', 'standardized', 'confidence']
    missing_columns = [col for col in required_columns if col not in df_cleaned.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")
    
    # Check auxiliary columns
    auxiliary_columns = []
    for col in ['County', 'Sex', 'Occode_Desc']:
        if col in df_cleaned.columns:
            auxiliary_columns.append(col)
    
    logger.info(f" Available auxiliary columns: {auxiliary_columns}")
    
    # NEW: Show deduplication potential analysis
    if use_unique_processing:
        mask = df_cleaned['confidence'] < confidence_threshold
        to_process_df = df_cleaned[mask]
        
        if len(to_process_df) > 0:
            unique_occupations = to_process_df['Occupation_String'].nunique()
            total_to_process = len(to_process_df)
            potential_savings = total_to_process - unique_occupations
            savings_percentage = (potential_savings / total_to_process) * 100 if total_to_process > 0 else 0
            
            logger.info(f"🔍 Deduplication analysis:")
            logger.info(f"  Total records to process: {total_to_process:,}")
            logger.info(f"  Unique occupation strings: {unique_occupations:,}")
            logger.info(f"  Potential processing savings: {potential_savings:,} ({savings_percentage:.1f}%)")
            
    
    # Create optimized single processor
    processor = OptimizedSingleProcessor(
        model_name=model_name,
        delay_between_requests=delay,
        max_workers=max_workers
    )
    
    # Process data with single record optimization and unique processing
    df_result = processor.process_large_dataset_single(
        df_cleaned,
        confidence_threshold=confidence_threshold,
        chunk_size=chunk_size,
        save_interval=save_interval,
        resume_from_checkpoint=resume_from_checkpoint,
        use_threading=use_threading,
        use_unique_processing=use_unique_processing  # NEW PARAMETER
    )
    
    # final_result to lowercase and remove all puncuation
    df_result['final_result'] = df_result['final_result'].str.lower().str.replace(f"[{string.punctuation}]", "", regex=True)

    # Save final results
    df_result.to_csv(output_file, index=False)
    logger.info(f" Final results saved to: {output_file}")
    
    # Show correction examples
    corrections_mask = df_result['final_result'] != df_result['standardized']
    corrections = df_result[corrections_mask]
    
    if len(corrections) > 0:
        logger.info(f"\n Correction examples (total: {len(corrections):,}):")
        for _, row in corrections.head(5).iterrows():
            logger.info(f"  '{row['Occupation_String']}' → '{row['final_result']}'")
            logger.info(f"    (Original: {row['standardized']})")
    
    # NEW: Show unique processing efficiency
    if use_unique_processing and processor.stats['unique_records_processed'] > 0:
        total_that_would_process = len(df_cleaned[df_cleaned['confidence'] < confidence_threshold])
        actual_processed = processor.stats['unique_records_processed']
        saved_processing = total_that_would_process - actual_processed
        
        logger.info(f"\n Unique Processing Efficiency:")
        logger.info(f"  Would have processed: {total_that_would_process:,} records")
        logger.info(f"  Actually processed: {actual_processed:,} unique records")
        logger.info(f"  Processing saved: {saved_processing:,} records ({(saved_processing/total_that_would_process)*100:.1f}%)")
    
    return df_result

# Usage example
if __name__ == "__main__":
    
    df_llm_check = pd.read_csv("D:\\Postgraduate\\Data Science Project\\Code\\cleaning_results_1851_validation\\llm_check_needed.csv",header=0) 
    print(" Dataset overview:")
    print(f"Total records: {len(df_llm_check):,}")
    print(f"Low confidence records: {len(df_llm_check[df_llm_check['confidence'] < 0.8]):,}")
    print()
    
    try:
        # Enhanced single record processing with unique deduplication
        df_final = process_occupation_data_single_optimized(
            df_llm_check,
            model_name="Qwen2.5:7b-instruct",
            confidence_threshold=0.8,
            chunk_size=20000,                # Large chunks for memory efficiency
            save_interval=1000,              # Save every 1000 records
            delay=0.1,                       # Fast single processing
            output_file=".\\cleaning_results_1851_validation\\df_llm_check_single_1911.csv",
            resume_from_checkpoint=True,     # Support resumption
            use_threading=True,              # Enable multi-threading
            max_workers=6,                   # 6 concurrent workers for single requests
            use_unique_processing=True       # NEW: Enable unique processing
        )        

        # Show performance statistics
        total_corrections = len(df_final[df_final['final_result'] != df_final['standardized']])
        print(f" Total corrections made: {total_corrections:,}")
        
    except Exception as e:
        print(f" Processing failed: {e}")
        print("\n Troubleshooting:")
        print("1. Ensure Ollama is running: ollama serve")
        print("2. Ensure model is downloaded: ollama pull Qwen2.5:7b-instruct")
        print("3. Check system resources")

2025-08-20 19:57:55,081 - INFO -  Starting optimized single record Ollama processing with unique deduplication
2025-08-20 19:57:55,083 - INFO -  Input data: 189 records
2025-08-20 19:57:55,083 - INFO -  Single record mode with 6 workers
2025-08-20 19:57:55,083 - INFO -  Threading: Enabled
2025-08-20 19:57:55,084 - INFO -  Unique processing: Enabled
2025-08-20 19:57:55,084 - INFO -  Available auxiliary columns: ['County', 'Sex', 'Occode_Desc']
2025-08-20 19:57:55,085 - INFO - 🔍 Deduplication analysis:
2025-08-20 19:57:55,086 - INFO -   Total records to process: 189
2025-08-20 19:57:55,086 - INFO -   Unique occupation strings: 187
2025-08-20 19:57:55,087 - INFO -   Potential processing savings: 2 (1.1%)


 Dataset overview:
Total records: 189
Low confidence records: 189



2025-08-20 19:57:59,191 - ERROR -  Cannot connect to Ollama service: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/tags (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x0000024A8D095350>: Failed to establish a new connection: [WinError 10061] 由于目标计算机积极拒绝，无法连接。'))
2025-08-20 19:57:59,192 - INFO -  Optimized single processor initialized
2025-08-20 19:57:59,192 - INFO -  Max workers: 6, Delay: 0.1s
2025-08-20 19:57:59,193 - INFO -  Starting optimized single record processing with unique deduplication
2025-08-20 19:57:59,193 - INFO -  Total records: 189
2025-08-20 19:57:59,194 - INFO -  Threading: Enabled (6 workers)
2025-08-20 19:57:59,194 - INFO -  Unique processing: Enabled
2025-08-20 19:57:59,196 - INFO -  Records below confidence threshold: 189
2025-08-20 19:57:59,234 - INFO -  Unique occupation strings to process: 187
2025-08-20 19:57:59,234 - INFO -  Duplicate records saved from processing: 2
2025-08-20 19:57:59,23

 Total corrections made: 90


In [19]:
import pandas as pd
df_llm_final = pd.read_csv(".\\cleaning_results_1851_validation\\df_llm_check_single_1911.csv",header=0)
df_rule_based = pd.read_csv(".\\cleaning_results_1851_validation\\cleaned_occupations_with_categories.csv", header = 0)
# merge df_llm_final to df_rule_based，by County, Sex, Occupation_String, Occode, HISCO_x, HISCO_y
df_merged = pd.merge(
    df_rule_based,
    df_llm_final[['County', 'Sex', 'Occupation_String', 'Occode', 'HISCO_x', 'HISCO_y', 'final_result', 'standardized']],
    on=['County', 'Sex', 'Occupation_String', 'Occode', 'HISCO_x', 'HISCO_y'],
    how='left',
    suffixes=('', '_llm')
)

# if final_result is NaN，then use standardized to filled
df_merged['final_result'] = df_merged['final_result'].fillna(df_merged['standardized'])
df_merged.to_csv(".\\cleaning_results_1851_validation\\cleaned_occupation_with_llm.csv",index=False)

## Aggregate Occupation

In [20]:
import os
def aggregate_similar_occupations(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate occupations with same County, Sex, Occode, and standardized values
    
    Args:
        df: DataFrame with final_result occupations
    
    Returns:
        DataFrame with aggregated records
    """
    
    # Check required columns
    required_cols = ['County', 'Sex', 'Occode', 'final_result', 'Count']
    missing_cols = [col for col in required_cols if col not in df.columns]
    
    if missing_cols:
        print(f"Warning: Missing required columns for aggregation: {missing_cols}")
        return df
    
    print(f"Before aggregation: {len(df):,} records")
    
    # Define grouping columns
    group_columns = ['County', 'Sex', 'Occode', 'final_result']
    
    # Check for records that can be aggregated
    pre_agg_check = df.groupby(group_columns).size()
    aggregatable_groups = (pre_agg_check > 1).sum()
    total_aggregatable_records = pre_agg_check[pre_agg_check > 1].sum()
    
    print(f"Found {aggregatable_groups:,} groups with {total_aggregatable_records:,} records that can be aggregated")
    
    # Prepare aggregation dictionary
    agg_dict = {'Count': 'sum'}  # Sum the counts
    
    # For other columns, take the first value
    other_columns = [col for col in df.columns if col not in group_columns + ['Count']]
    for col in other_columns:
        agg_dict[col] = 'first'
    
    # Perform aggregation
    try:
        df_aggregated = df.groupby(group_columns, as_index=False).agg(agg_dict)
        
        # Sort by Count descending to put most frequent occupations first
        df_aggregated = df_aggregated.sort_values(['County', 'Count'], ascending=[True, False])
        
        # Reset index
        df_aggregated = df_aggregated.reset_index(drop=True)
        
        # Calculate aggregation statistics
        original_count = len(df)
        aggregated_count = len(df_aggregated)
        reduction_rate = (original_count - aggregated_count) / original_count
        
        print(f"After aggregation: {aggregated_count:,} records")
        print(f"Reduction: {original_count - aggregated_count:,} records ({reduction_rate:.1%})")

        
        return df_aggregated
        
    except Exception as e:
        print(f"Error during aggregation: {e}")
        return df


df_aggregated = aggregate_similar_occupations(df_merged)

agg_output = os.path.join("./cleaning_results_1851_validation", "aggregated_occupations.csv")
df_aggregated.to_csv(agg_output, index=False)

Before aggregation: 1,000 records
Found 4 groups with 8 records that can be aggregated
After aggregation: 996 records
Reduction: 4 records (0.4%)


## Multiple Occupation

In [21]:
import pandas as pd
import numpy as np
import re
from typing import List, Dict, Tuple, Optional

class MultiOccupationProcessor:
    
    def __init__(self):
        # connection patterns
        self.connection_patterns = [
            r'\s+and\s+',           # and
            r'\s+&\s+',             # &
            r'\s+or\s+',            # or
            r'\s*,\s*',             # comma
            r'\s*/\s*',             # slash
        ]
        
        # non_occupation_terms
        self.non_occupation_terms = {
            'pauper', 'unemployed', 'invalid', 'disabled', 'retired', 
            'deceased', 'widow', 'widower', 'single', 'married',
            'at home', 'home', 'at school', 'school'
        }
        
        # modifier terms
        self.modifier_terms = {
            'former', 'ex', 'retired', 'late', 'widow of', 'wife of',
            'son of', 'daughter of', 'employing', 'employed by'
        }
    
    def split_occupation_string(self, occupation_string: str) -> List[str]:
        """Split occupation string into single occupations"""
        if pd.isna(occupation_string):
            return []
        
        text = str(occupation_string).lower().strip()
        
        # split by connection patterns
        parts = [text]
        for pattern in self.connection_patterns:
            new_parts = []
            for part in parts:
                new_parts.extend(re.split(pattern, part))
            parts = new_parts
        
        # clean parts
        cleaned_parts = []
        for part in parts:
            part = part.strip()
            if part and part not in self.non_occupation_terms:
                # remove modifier terms
                for modifier in self.modifier_terms:
                    part = re.sub(rf'\b{re.escape(modifier)}\b', '', part)
                
                part = re.sub(r'\s+', ' ', part).strip()
                
                if part and len(part) > 2:  
                    cleaned_parts.append(part)
        
        return cleaned_parts
    
    def create_single_occupation_dataset(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create single occupation dataset for OccCANINE prediction"""
        print("\n CREATING SINGLE OCCUPATION DATASET")
        print("=" * 50)
        
        single_occupation_records = []
        
        for idx, row in df.iterrows():
            occupation_parts = self.split_occupation_string(row['Occupation_String'])
            
            for i, part in enumerate(occupation_parts):
                single_record = {
                    'original_index': idx,
                    'original_occupation_string': row['Occupation_String'],
                    'split_index': i,
                    'single_occupation': part,
                    'County': row.get('County', ''),
                    'Sex': row.get('Sex', ''),
                    'Year': row.get('Year', ''),
                    'Count': row.get('Count', 1),
                    'original_hisco': row.get('HISCO_y', ''),
                    'category': row.get('category', ''),
                    'confidence': row.get('confidence', 0)
                }
                single_occupation_records.append(single_record)
        
        single_df = pd.DataFrame(single_occupation_records)
        
        print(f" Single occupation dataset created:")
        print(f"   Original multi-occupation records: {len(df):,}")
        print(f"   Total single occupations extracted: {len(single_df):,}")
        print(f"   Average occupations per record: {len(single_df)/len(df):.1f}")
        
        # Show sample splitting results
        print(f"\n🔍 Sample splitting results:")
        sample_original = df.head(3)
        for i, (_, row) in enumerate(sample_original.iterrows(), 1):
            original_idx = row.name
            splits = single_df[single_df['original_index'] == original_idx]['single_occupation'].tolist()
            print(f"   {i}. '{row['Occupation_String']}'")
            print(f"      → {splits}")
        
        return single_df
    
    def prepare_for_occanine(self, single_df: pd.DataFrame, 
                           enhanced_input: bool = True) -> pd.DataFrame:
        """Prepare input data for OccCANINE prediction"""
        print("\n PREPARING FOR OCCANINE PREDICTION")
        print("=" * 50)
        
        def create_input_string(row):
            """Create OccCANINE input string"""
            if enhanced_input:
                # Enhanced format: Sex + Occupation + in + County
                sex = str(row['Sex']).title() if pd.notna(row['Sex']) else ""
                occupation = str(row['single_occupation']).lower()
                county = str(row['County']).title() if pd.notna(row['County']) else ""
                
                parts = []
                if sex:
                    parts.append(sex)
                if occupation:
                    parts.append(occupation)
                
                base_string = " ".join(parts)
                
                if county:
                    return f"{base_string} in {county}"
                else:
                    return base_string
            else:
                # Basic format: occupation only
                return str(row['single_occupation']).lower()
        
        # Create input strings
        single_df['occanine_input'] = single_df.apply(create_input_string, axis=1)
        
        # Create CSV file for OccCANINE
        occanine_input = pd.DataFrame({
            'record_id': range(len(single_df)),
            'original_index': single_df['original_index'],
            'split_index': single_df['split_index'],
            'input_text': single_df['occanine_input'],
            'original_occupation': single_df['single_occupation'],
            'county': single_df['County'],
            'sex': single_df['Sex']
        })
        
        print(f" OccCANINE input prepared:")
        print(f"   Records for prediction: {len(occanine_input):,}")
        print(f"   Enhanced input format: {enhanced_input}")
        
        # Show sample inputs
        print(f"\n Sample inputs for OccCANINE:")
        for i, (_, row) in enumerate(occanine_input.head(3).iterrows(), 1):
            print(f"   {i}. {row['input_text']}")
        
        return occanine_input

    def merge_predictions_back(self, original_df: pd.DataFrame, 
                             single_predictions: pd.DataFrame) -> pd.DataFrame:
        """Merge single occupation prediction results back to original data"""
        print("\n MERGING PREDICTIONS BACK TO ORIGINAL DATA")
        print("=" * 50)
        
        # Add new columns to original data
        result_df = original_df.copy()
        result_df['all_predicted_hisco'] = None
        result_df['prediction_count'] = 0
        
        # Group prediction results by original index
        prediction_groups = single_predictions.groupby('original_index')
        
        processed_count = 0
        
        for original_idx, group in prediction_groups:
            if original_idx not in result_df.index:
                continue
            
            # Filter valid predictions (only need predicted_hisco not null)
            valid_predictions = group[
                group['predicted_hisco'].notna()
            ].copy()
            
            if len(valid_predictions) == 0:
                result_df.loc[original_idx, 'all_predicted_hisco'] = 'no_valid_predictions'
                result_df.loc[original_idx, 'prediction_count'] = 0
                continue
            
            # Collect all prediction results
            all_hisco_codes = []
            all_predictions_detail = []
            
            for _, pred_row in valid_predictions.iterrows():
                hisco_code = pred_row['predicted_hisco']
                all_hisco_codes.append(str(hisco_code))
                
                # Save detailed information (including original occupation name if available)
                detail = {
                    'hisco': hisco_code,
                    'split_index': pred_row.get('split_index', ''),
                    'occupation': pred_row.get('original_occupation', '')
                }
                all_predictions_detail.append(detail)
            
            # Concatenate all HISCO codes into string (semicolon separated)
            result_df.loc[original_idx, 'all_predicted_hisco'] = '; '.join(all_hisco_codes)
            result_df.loc[original_idx, 'prediction_count'] = len(all_hisco_codes)
            result_df.loc[original_idx, 'prediction_details'] = str(all_predictions_detail)
            
            processed_count += 1
        
        print(f" Merging results:")
        print(f"   Original multi-occupation records: {len(prediction_groups)}")
        print(f"   Successfully processed: {processed_count}")
        
        # Statistics of merging results
        successful_merges = result_df['all_predicted_hisco'].notna().sum()
        records_with_predictions = (result_df['all_predicted_hisco'] != 'no_valid_predictions').sum()
        print(f"   Records with merged predictions: {records_with_predictions}")
        
        # Statistics of prediction count distribution
        prediction_counts = result_df['prediction_count'].value_counts().sort_index()
        print(f"\n Prediction count distribution:")
        for count, freq in prediction_counts.head(10).items():
            print(f"   {count} predictions: {freq} records")
        
        if processed_count > 0:
            print(f"\n Sample merged results:")
            sample_merged = result_df[
                (result_df['all_predicted_hisco'].notna()) & 
                (result_df['all_predicted_hisco'] != 'no_valid_predictions')
            ].head(3)
            
            for i, (_, row) in enumerate(sample_merged.iterrows(), 1):
                print(f"   {i}. '{row['Occupation_String']}'")
                print(f"      → All HISCO codes: {row['all_predicted_hisco']}")
                print(f"      → Total predictions: {row['prediction_count']}")
        
        return result_df
    
def process_multi_occupations_complete_workflow(df: pd.DataFrame,
                                              output_dir: str = "./cleaning_results_1861",
                                              enhanced_input: bool = True,
                                              ) -> Tuple[pd.DataFrame, pd.DataFrame, Dict]:
    """
    Complete multi-occupation processing workflow
    
    Args:
        df: Original data DataFrame
        output_dir: Output directory
        enhanced_input: Whether to use enhanced input format
    
    Returns:
        tuple: (merged data, single occupation prediction data, evaluation results)
    """
    
    import os
    os.makedirs(output_dir, exist_ok=True)
    
    print(" COMPLETE MULTI-OCCUPATION PROCESSING WORKFLOW")
    print("="*60)
    
    # Initialize processor
    processor = MultiOccupationProcessor()
    
    # Create single occupation dataset
    single_occupation_df = processor.create_single_occupation_dataset(df)
    
    # Step 3: Prepare OccCANINE input
    occanine_input_df = processor.prepare_for_occanine(single_occupation_df, enhanced_input)
    
    # Save OccCANINE input file
    input_file_path = os.path.join(output_dir, "multi2single_occupations_for_occanine.csv")
    occanine_input_df.to_csv(input_file_path, index=False)
    
    return occanine_input_df, {}

def merge_occanine_predictions(original_df: pd.DataFrame,
                             prediction_file: str,
                             ) -> Tuple[pd.DataFrame, Dict]:
    """
    Merge OccCANINE prediction results to original data
    
    Args:
        original_df: Original data (multi-occupation identified)
        prediction_file: OccCANINE prediction result file path
    
    Returns:
        tuple: (merged data, evaluation results)
    """
    print(" MERGING OCCANINE PREDICTIONS")
    print("=" * 40)
    
    # Load prediction results
    try:
        predictions_df = pd.read_csv(prediction_file)
        print(f" Loaded predictions from: {prediction_file}")
        print(f"   Prediction records: {len(predictions_df):,}")
    except Exception as e:
        print(f" Error loading predictions: {e}")
        return original_df, {}
    
    # Initialize processor
    processor = MultiOccupationProcessor()
    
    # Merge prediction results
    merged_df = processor.merge_predictions_back(original_df, predictions_df)
    
    return merged_df


if __name__ == "__main__":

    df = pd.read_csv("D:\\Postgraduate\\Data Science Project\\Code\\cleaning_results_1851_validation\\multi_occupation_flagged.csv",header=0)

    single_occs, _ = process_multi_occupations_complete_workflow(df)
    
    single_occs_array = single_occs['input_text']
    from histocc import OccCANINE

    model = OccCANINE()
    model.verbose = True
    x = model.predict(
        single_occs_array,
        get_dict = True,
        lang = "en"
    )
    
    single_occs['predicted_hisco'] = x

    single_occs.to_csv(".\\cleaning_results_1851_validation\\multi_OccCANINE_predictions.csv",index=False)

    merged_df = merge_occanine_predictions(original_df=df,
                                                       prediction_file='./cleaning_results_1851_validation/multi_OccCANINE_predictions.csv')
    merged_df.to_csv("D:\\Postgraduate\\Data Science Project\\Code\\cleaning_results_1851_validation\\OccCANINE_result_multiple.csv",index=False)

 COMPLETE MULTI-OCCUPATION PROCESSING WORKFLOW

 CREATING SINGLE OCCUPATION DATASET
 Single occupation dataset created:
   Original multi-occupation records: 30
   Total single occupations extracted: 60
   Average occupations per record: 2.0

🔍 Sample splitting results:
   1. 'brace bit and gunblet mak'
      → ['brace bit', 'gunblet mak']
   2. 'teacher classical mathematical and commercial'
      → ['teacher classical mathematical', 'commercial']
   3. 'groser and weaver wife'
      → ['groser', 'weaver wife']

 PREPARING FOR OCCANINE PREDICTION
 OccCANINE input prepared:
   Records for prediction: 60
   Enhanced input format: True

 Sample inputs for OccCANINE:
   1. Male brace bit in Yorkshire West Riding
   2. Male gunblet mak in Yorkshire West Riding
   3. Male teacher classical mathematical in Yorkshire West Riding
Processed batch 1 out of 1 batchesProduced HISCO codes for 60 observations in 0 hours, 0 minutes and 0.875 seconds.
Estimated hours saved compared to human labeller (

## Create InputText with context

In [22]:
import pandas as pd
import numpy as np
import os

def create_enhanced_input_strings(df: pd.DataFrame) -> pd.Series:
    """
    Create enhanced input strings in format: "Sex final_result in County"
    
    Args:
        df: DataFrame with columns: Sex, final_result, Occode_Desc, County
    
    Returns:
        pd.Series: Enhanced input strings
    """
    
    print(f" CREATING ENHANCED INPUT STRINGS")
    print("=" * 50)
    print("Format: 'Sex final_result in County'")
    
    def format_enhanced_string(row):
        """
        Create enhanced string: "Sex final_result in County"
        Example: "Male agricultural labourer (AGRICULTURAL LABOURER, FARM SERVANT) in ANGLESEY"
        """
        try:
            # Get components
            sex = str(row['Sex']).title() if pd.notna(row['Sex']) else ""
            occupation = str(row['final_result']).lower() if pd.notna(row['final_result']) else ""
            county = str(row['County']).title() if pd.notna(row['County']) else ""

            
            # Build the enhanced string
            result_parts = []
            
            # Add sex
            if sex:
                result_parts.append(sex)
            
            result_parts.append(f"{occupation}")

            # Join parts so far
            base_string = " ".join(result_parts)
            
            # Add location with "in"
            if county:
                if base_string:
                    return f"{base_string} in {county}"
                else:
                    return f"in {county}"
            else:
                return base_string
                
        except Exception as e:
            print(f"Error formatting row: {e}")
            # Fallback to original occupation
            return str(row.get('final_result', ''))
    
    # Apply formatting to all rows
    enhanced_strings = df.apply(format_enhanced_string, axis=1)
    
    # Print statistics
    print(f"\n Enhancement Statistics:")
    print(f"   Total strings created: {len(enhanced_strings):,}")
    print(f"   Average length: {enhanced_strings.str.len().mean():.1f} characters")
    print(f"   Max length: {enhanced_strings.str.len().max()} characters")
    print(f"   Min length: {enhanced_strings.str.len().min()} characters")
    
    # Count how many strings have each component
    has_sex = enhanced_strings.str.contains(r'^(Male|Female)', case=False, na=False).sum()
    has_description = enhanced_strings.str.contains(r'\([^)]+\)', na=False).sum()
    has_location = enhanced_strings.str.contains(r' in \w+', case=False, na=False).sum()
    
    print(f"\n Component Coverage:")
    print(f"   With gender: {has_sex:,} ({has_sex/len(enhanced_strings)*100:.1f}%)")
    print(f"   With description: {has_description:,} ({has_description/len(enhanced_strings)*100:.1f}%)")
    print(f"   With location: {has_location:,} ({has_location/len(enhanced_strings)*100:.1f}%)")
    
    # Show sample results
    print(f"\n Sample Enhanced Strings:")
    for i in range(min(5, len(enhanced_strings))):
        original = df.iloc[i]['final_result'] if 'final_result' in df.columns else 'N/A'
        enhanced = enhanced_strings.iloc[i]
        print(f"   {i+1}. Original: {original}")
        print(f"      Enhanced: {enhanced}")
        print()
    
    return enhanced_strings

def save_enhanced_inputs_for_occanine(df: pd.DataFrame, 
                                    enhanced_strings: pd.Series,
                                    output_dir: str = "./cleaning_results_1861") -> str:
    """
    Save enhanced input strings for OccCANINE testing
    
    Args:
        df: Original DataFrame
        enhanced_strings: Enhanced input strings
        output_dir: Output directory
    
    Returns:
        str: Path to saved file
    """
    
    os.makedirs(output_dir, exist_ok=True)
    
    print(f"\n SAVING ENHANCED INPUTS FOR OCCANINE")
    print("=" * 50)
    
    # Create output DataFrame for OccCANINE
    occanine_input = pd.DataFrame({
        'record_id': range(len(df)),
        'original_hisco': df.get('HISCO_x', ''),
        'enhanced_input': enhanced_strings,
        'original_occupation': df.get('final_result', ''),
        'sex': df.get('Sex', ''),
        'county': df.get('County', ''),
        'count': df.get('Count', 1)
    })
    
    # Save enhanced input file
    enhanced_file_path = os.path.join(output_dir, "enhanced_input_for_occanine.csv")
    occanine_input.to_csv(enhanced_file_path, index=False)
    
    print(f" Enhanced inputs saved: {enhanced_file_path}")
    print(f"    Records: {len(occanine_input):,}")
    print(f"    Ready for OccCANINE processing")
    
    # Also save just the enhanced strings array (like your original final_result_array)
    enhanced_array_path = os.path.join(output_dir, "enhanced_input_array.npy")
    np.save(enhanced_array_path, enhanced_strings.to_numpy())
    
    print(f" Enhanced array saved: {enhanced_array_path}")
    print(f"    Array shape: {enhanced_strings.to_numpy().shape}")
    print(f"    Load with: np.load('{enhanced_array_path}')")
    
    return enhanced_file_path

def analyze_enhancement_impact(df: pd.DataFrame, enhanced_strings: pd.Series):
    """
    Analyze the impact of the enhancement on input strings
    """
    
    print(f"\n ENHANCEMENT IMPACT ANALYSIS")
    print("=" * 40)
    
    original_strings = df['final_result'].astype(str)
    
    # Length comparison
    orig_avg_length = original_strings.str.len().mean()
    enhanced_avg_length = enhanced_strings.str.len().mean()
    length_increase = enhanced_avg_length - orig_avg_length
    
    print(f"   Length Analysis:")
    print(f"   Original average length: {orig_avg_length:.1f} characters")
    print(f"   Enhanced average length: {enhanced_avg_length:.1f} characters")
    print(f"   Average increase: {length_increase:.1f} characters ({length_increase/orig_avg_length*100:.1f}%)")
    
    # Information density analysis
    print(f"\n Information Density:")
    
    # Count unique information elements
    unique_occupations = df['final_result'].nunique()
    unique_sexes = df['Sex'].nunique() if 'Sex' in df.columns else 0
    unique_counties = df['County'].nunique() if 'County' in df.columns else 0
    unique_descriptions = df['Occode_Desc'].nunique() if 'Occode_Desc' in df.columns else 0
    
    print(f"   Unique occupations: {unique_occupations:,}")
    print(f"   Unique genders: {unique_sexes}")
    print(f"   Unique counties: {unique_counties}")
    print(f"   Unique descriptions: {unique_descriptions:,}")
    
    # Context richness score
    context_elements_per_record = 1  # Always have occupation
    if 'Sex' in df.columns:
        context_elements_per_record += (df['Sex'].notna()).mean()
    if 'County' in df.columns:
        context_elements_per_record += (df['County'].notna()).mean()
    if 'Occode_Desc' in df.columns:
        context_elements_per_record += (df['Occode_Desc'].notna()).mean()
    
    print(f"   Average context elements per record: {context_elements_per_record:.2f}")
    
    # Estimate improvement potential
    print(f"\n Expected HISCO Prediction Improvement:")
    
    if context_elements_per_record >= 3.5:
        expected_improvement = "20-30%"
        confidence = "High"
    elif context_elements_per_record >= 3.0:
        expected_improvement = "15-25%"
        confidence = "High"
    elif context_elements_per_record >= 2.5:
        expected_improvement = "10-18%"
        confidence = "Medium-High"
    elif context_elements_per_record >= 2.0:
        expected_improvement = "5-12%"
        confidence = "Medium"
    else:
        expected_improvement = "2-8%"
        confidence = "Low-Medium"
    
    print(f"   Expected accuracy improvement: {expected_improvement}")
    print(f"   Confidence level: {confidence}")
    
    # Show most informative examples
    print(f"\n Most Information-Rich Examples:")
    # Find records with all components
    complete_records = df[
        df['Sex'].notna() & 
        df['County'].notna() & 
        df['Occode_Desc'].notna() &
        df['final_result'].notna()
    ]
    
    if len(complete_records) > 0:
        for i in range(min(3, len(complete_records))):
            idx = complete_records.index[i]
            print(f"   {i+1}. {enhanced_strings.iloc[idx]}")
    
    return {
        'original_avg_length': orig_avg_length,
        'enhanced_avg_length': enhanced_avg_length,
        'length_increase_pct': length_increase/orig_avg_length*100,
        'context_elements': context_elements_per_record,
        'expected_improvement': expected_improvement,
        'confidence': confidence
    }

def enhance_occupation_inputs(df_predict: pd.DataFrame, 
                            output_dir: str = "./cleaning_results_1881") -> pd.Series:
    """
    Main function to create enhanced input strings for OccCANINE
    Format: "Sex final_result (Occode_Desc) in County"
    
    Args:
        df_predict: DataFrame with your aggregated occupation data
        output_dir: Directory to save output files
    
    Returns:
        pd.Series: Enhanced input strings (equivalent to your final_result_array)
    """
    
    print(" ENHANCING OCCUPATION INPUTS FOR OCCANINE")
    print("="*60)
    
    # Validate required columns
    required_cols = ['final_result']
    missing_cols = [col for col in required_cols if col not in df_predict.columns]
    
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")
    
    # Check optional columns
    optional_cols = ['Sex', 'County', 'HISCO_x']
    available_cols = [col for col in optional_cols if col in df_predict.columns]
    
    print(f" Required columns found: {required_cols}")
    print(f" Available optional columns: {available_cols}")
    
    if not available_cols:
        print("  Warning: No optional columns found. Enhancement will be limited.")
    
    # Create enhanced input strings
    enhanced_strings = create_enhanced_input_strings(df_predict)
    
    # Analyze enhancement impact
    impact_analysis = analyze_enhancement_impact(df_predict, enhanced_strings)
    
    # Save for OccCANINE
    output_file = save_enhanced_inputs_for_occanine(df_predict, enhanced_strings, output_dir)
    
    print(f"\n ENHANCEMENT COMPLETE!")
    print(f" Files saved in: {output_dir}")
    print(f" Ready for OccCANINE testing")
    print(f" Expected improvement: {impact_analysis['expected_improvement']}")
    
    # Return the enhanced strings array (like your original final_result_array)
    return enhanced_strings

if __name__ == "__main__":

    df_predict = pd.read_csv(".\\cleaning_results_1851_validation\\aggregated_occupations.csv", header=0)
    enhanced_input_array = enhance_occupation_inputs(df_predict, output_dir = "./cleaning_results_1851_validation")

 ENHANCING OCCUPATION INPUTS FOR OCCANINE
 Required columns found: ['final_result']
 Available optional columns: ['Sex', 'County', 'HISCO_x']
 CREATING ENHANCED INPUT STRINGS
Format: 'Sex final_result in County'

 Enhancement Statistics:
   Total strings created: 996
   Average length: 39.5 characters
   Max length: 100 characters
   Min length: 18 characters

 Component Coverage:
   With gender: 976 (98.0%)
   With description: 0 (0.0%)
   With location: 996 (100.0%)

 Sample Enhanced Strings:
   1. Original: dressmaker
      Enhanced: Male dressmaker in Anglesey

   2. Original: retired draper
      Enhanced: Male retired draper in Anglesey

   3. Original: clogger barber
      Enhanced: Male clogger barber in Anglesey

   4. Original: charwoman
      Enhanced: Female charwoman in Bedfordshire

   5. Original: journeyman shoe maker
      Enhanced: Male journeyman shoe maker in Bedfordshire


 ENHANCEMENT IMPACT ANALYSIS
   Length Analysis:
   Original average length: 18.8 characters


  has_sex = enhanced_strings.str.contains(r'^(Male|Female)', case=False, na=False).sum()


## OccCANINE predict HISCO

In [2]:
from histocc import OccCANINE
import numpy as np
import pandas as pd
model = OccCANINE()
model_new = OccCANINE("Finetuned/finetune_model",hf = False)
enhanced_input_array = np.load("D:\\Postgraduate\\Data Science Project\\Code\\cleaning_results_1851_validation\\enhanced_input_array.npy",allow_pickle=True)

df_predict = pd.read_csv(".\\cleaning_results_1851_validation\\aggregated_occupations.csv", header=0)


raw_input_array = df_predict["Occupation_String"]
standardized_input_array = df_predict["final_result"]


# no finetune model
raw_input = model.predict(
    raw_input_array,
    get_dict = True,
    lang = "en"
)

standardized_input = model.predict(
    standardized_input_array,
    get_dict = True,
    lang = "en"
)

context_input = model.predict(
    enhanced_input_array,
    get_dict = True,
    lang = "en"
)

# Finetuned model
model_new.verbose = True
# context input
context_input_finetune = model_new.predict(
    enhanced_input_array,
    get_dict = True,
    lang = "en"
)

df_predict["OccCANINE_raw_input"] = raw_input
df_predict["OccCANINE_standardized_input"] = standardized_input
df_predict["OccCANINE_context_input"] = context_input
df_predict["OccCANINE_finetuned_context_input"] = context_input_finetune
import numpy as np

finetune_series = pd.Series(context_input_finetune)
context_series = pd.Series(context_input)
raw_series = pd.Series(raw_input)

# find []
def is_invalid(val):
    if val is None:
        return True
    if isinstance(val, float) and np.isnan(val):
        return True
    if isinstance(val, str) and (val.strip() == "" or val.strip() == "[]"):
        return True
    if isinstance(val, (list, np.ndarray)) and len(val) == 0:
        return True
    return False

# use context_input fill in finetuned
final_result = finetune_series.copy()
mask_invalid = finetune_series.apply(is_invalid)
final_result[mask_invalid] = context_series[mask_invalid]

# rest part use raw_input to fill nan
mask_still_invalid = final_result.apply(is_invalid)
final_result[mask_still_invalid] = raw_series[mask_still_invalid]

df_predict["OccCANINE_final"] = final_result

df_predict.to_csv(".\\cleaning_results_1851_validation\\OccCANINE_finetune_result.csv",index=False)

merged_df = pd.read_csv("D:\\Postgraduate\\Data Science Project\\Code\\cleaning_results_1851_validation\\OccCANINE_result_multiple.csv",header=0)
multi_df = merged_df[['Occupation_String', 'all_predicted_hisco', 'prediction_count', 'prediction_details']]
all_result_merge = pd.merge(df_predict,multi_df,how='left',on='Occupation_String')
all_result_merge.to_csv(".\\cleaning_results_1851_validation\\OccCANINE_result_merged.csv",index=False)

  from .autonotebook import tqdm as notebook_tqdm
  warn(


Processed batch 4 out of 4 batchesProduced HISCO codes for 996 observations in 0 hours, 0 minutes and 1.331 seconds.
Estimated hours saved compared to human labeller (assuming 10 seconds per label):
 ---> 2 hours, 45 minutes and 59 seconds


In [5]:
import pandas as pd

df = pd.read_csv(".\\cleaning_results_1911\\OccCANINE_result_merged.csv", header=0,encoding="latin1")
df_val = df.sample(n=100, random_state=42)

df_val.to_csv(".\\1911_validation_sample_auto.csv")

