In [None]:
import pandas as pd
from typing import List, Dict, Any

def parse_addresses(entities: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Parse NER entities into structured address rows following hierarchy:
    unit_id -> number_filter -> building_name -> street_number -> street_name -> postcode -> city

    Lower hierarchy entities inherit context from higher hierarchy entities.

    Entities are processed in order of their ``start`` offset. Every
    ``street_number`` and every ``unit_id`` produces one output row; the other
    entity types only supply context that those rows inherit. Context is taken
    from what has already been seen and from the *nearest* entity of each type
    that follows, so several addresses in one string keep their own city,
    postcode and street name.

    Args:
        entities: NER entity dicts. Each must provide ``'type'``, ``'text'``
            and ``'start'``; other keys are ignored.

    Returns:
        DataFrame with one row per street number / unit, restricted to the
        address columns that received a value. If no street number or unit is
        present but context entities are, a single row holding that context is
        returned. An empty DataFrame is returned when nothing was found.
    """

    # Define hierarchy levels (lower number = higher in hierarchy, more general)
    hierarchy = {
        'city': 0,
        'postcode': 1,
        'street_name': 2,
        'street_number': 3,
        'building_name': 4,
        'number_filter': 5,
        'unit_id': 6,
        'unit_type': 7  # Special case - travels backwards
    }

    # Sort entities by start position (None is treated like an empty list)
    sorted_entities = sorted(entities or [], key=lambda x: x['start'])

    # Track current context at each hierarchy level
    current_context = {}
    address_rows = []

    def find_backwards_unit_type(unit_entity, all_entities):
        """Find the closest unit_type that starts before the given unit_id"""
        unit_start = unit_entity['start']

        for entity in reversed(all_entities):
            if entity['start'] < unit_start and entity['type'] == 'unit_type':
                return entity
        return None

    def create_row(primary_entity, context):
        """Create a row with primary entity and inherited context"""
        row = {}

        # Add the primary entity
        if primary_entity['type'] == 'unit_id':
            row['unit_id'] = primary_entity['text'].strip()
            # Look backwards for unit_type
            unit_type = find_backwards_unit_type(primary_entity, sorted_entities)
            if unit_type:
                row['unit_type'] = unit_type['text'].strip()
        else:
            row[primary_entity['type']] = primary_entity['text'].strip()

        # Inherit all higher-level context (lower hierarchy numbers)
        primary_level = hierarchy[primary_entity['type']]
        for entity_type, level in hierarchy.items():
            if level < primary_level and entity_type in context:
                row[entity_type] = context[entity_type]['text'].strip()

        return row

    def update_context_from_ahead(start_index):
        """Overlay the nearest upcoming entity of each type on the current context"""
        temp_context = current_context.copy()
        seen_ahead = set()

        for next_entity in sorted_entities[start_index:]:
            next_type = next_entity['type']
            # Only the first (nearest) occurrence of a type may override the
            # context; later ones belong to addresses further along the text.
            if next_type == 'unit_type' or next_type in seen_ahead:
                continue
            seen_ahead.add(next_type)
            temp_context[next_type] = next_entity

        return temp_context

    # Process entities
    i = 0
    while i < len(sorted_entities):
        entity = sorted_entities[i]
        entity_type = entity['type']

        # Skip unit_type (handled separately)
        if entity_type == 'unit_type':
            i += 1
            continue

        if entity_type == 'unit_id':
            # Units inherit context from higher hierarchy levels
            # Get full context including look-ahead
            full_context = update_context_from_ahead(i + 1)

            # Create row with inherited context
            address_rows.append(create_row(entity, full_context))
            i += 1

        elif entity_type == 'street_number':
            # Collect consecutive street numbers
            street_numbers = []
            start_idx = i
            while i < len(sorted_entities) and sorted_entities[i]['type'] == 'street_number':
                street_numbers.append(sorted_entities[i])
                i += 1

            # Update context with look-ahead
            full_context = update_context_from_ahead(start_idx)

            # Process boundary entities that come after street numbers
            j = i
            while j < len(sorted_entities):
                next_entity = sorted_entities[j]
                next_type = next_entity['type']
                if next_type in ('street_name', 'postcode'):
                    # Update context and move main pointer
                    current_context[next_type] = next_entity
                    full_context[next_type] = next_entity
                    i = j + 1
                    break
                if next_type in ('street_number', 'unit_id'):
                    # Another row-producing entity: stop here and leave `i`
                    # untouched so the main loop still turns it into a row
                    # instead of jumping over it.
                    break
                if next_type != 'unit_type':
                    # Update context with other entities
                    current_context[next_type] = next_entity
                    full_context[next_type] = next_entity
                j += 1

            # Update main context with street numbers (they become available for lower hierarchy)
            for street_num in street_numbers:
                current_context['street_number'] = street_num

                # Create row for this street number
                address_rows.append(create_row(street_num, full_context))

        else:
            # Update context for other entity types
            current_context[entity_type] = entity
            i += 1

    # Handle case where we have context but no primary entities
    if not address_rows and current_context:
        row = {}
        for entity_type in ['city', 'postcode', 'street_name', 'building_name', 'number_filter']:
            if entity_type in current_context:
                row[entity_type] = current_context[entity_type]['text'].strip()
        if row:
            address_rows.append(row)

    # Convert to DataFrame
    if not address_rows:
        return pd.DataFrame()

    # Define column order
    column_order = ['unit_type', 'unit_id', 'number_filter', 'building_name',
                    'street_number', 'street_name', 'postcode', 'city']

    df = pd.DataFrame(address_rows)

    # Reorder columns (only include columns that exist)
    existing_columns = [col for col in column_order if col in df.columns]
    return df[existing_columns]

# Demo functions: run parse_addresses on sample entity lists and print the result
def test_simple_address():
    """Parse a single street address and print the resulting frame."""
    field_names = ('type', 'text', 'start', 'end', 'confidence')
    raw_spans = [
        ('street_number', '28', 0, 2, 0.99999917),
        ('street_name', ' Newbridge Close', 2, 18, 0.99999857),
        ('city', ' Manchester', 30, 41, 0.99999166),
        ('postcode', 'M26 2WB', 43, 50, 0.99998176),
    ]
    # Rebuild the entity dicts expected by parse_addresses from the compact tuples.
    entities = [dict(zip(field_names, span)) for span in raw_spans]

    parsed = parse_addresses(entities)
    # Header, frame, then a blank separator line.
    print("Simple Address:", parsed, "", sep="\n")

def test_complex_address():
    """Parse a string holding two groups of street numbers and print the resulting frame."""
    field_names = ('type', 'text', 'start', 'end', 'confidence')
    raw_spans = [
        ('street_number', '1', 0, 1, 0.9999207),
        ('street_number', ' 2', 2, 4, 0.99931705),
        ('street_number', ' 3', 5, 7, 0.99909544),
        ('street_name', ' Heath House Close', 60, 78, 0.9999873),
        ('street_number', ' 106', 91, 95, 0.99926156),
        ('street_number', ' 108', 99, 103, 0.9999858),
        ('street_name', ' Stone Cross Lane North', 103, 126, 0.99997276),
        ('city', ' Warrington', 135, 146, 0.99999464),
    ]
    # Rebuild the entity dicts expected by parse_addresses from the compact tuples.
    entities = [dict(zip(field_names, span)) for span in raw_spans]

    parsed = parse_addresses(entities)
    # Header, frame, then a blank separator line.
    print("Complex Address:", parsed, "", sep="\n")

def test_unit_address():
    """Parse two flats that share one street address and print the resulting frame."""
    field_names = ('type', 'text', 'start', 'end', 'confidence')
    raw_spans = [
        ('unit_type', 'Flat', 0, 4, 0.999),
        ('unit_id', ' 1A', 4, 7, 0.999),
        ('unit_type', ' Flat', 8, 12, 0.999),
        ('unit_id', ' 1B', 12, 15, 0.999),
        ('street_number', ' 25', 16, 19, 0.999),
        ('street_name', ' Oak Street', 19, 30, 0.999),
        ('city', ' London', 31, 38, 0.999),
    ]
    # Rebuild the entity dicts expected by parse_addresses from the compact tuples.
    entities = [dict(zip(field_names, span)) for span in raw_spans]

    parsed = parse_addresses(entities)
    # Header, frame, then a blank separator line.
    print("Unit Address:", parsed, "", sep="\n")

# Run the three demo cases when this code is executed directly. The printed
# output that follows this cell shows the guard is also true when the cell runs.
if __name__ == "__main__":
    test_simple_address()
    test_complex_address()
    test_unit_address()

Simple Address:
  street_number      street_name postcode        city
0            28  Newbridge Close  M26 2WB  Manchester

Complex Address:
  street_number             street_name        city
0             1       Heath House Close  Warrington
1             2       Heath House Close  Warrington
2             3       Heath House Close  Warrington
3           106  Stone Cross Lane North  Warrington
4           108  Stone Cross Lane North  Warrington

Unit Address:
  unit_type unit_id street_number street_name    city
0      Flat      1A            25  Oak Street  London
1      Flat      1B            25  Oak Street  London
2       NaN     NaN            25  Oak Street  London



In [None]:

# Re-run the three demo cases and print their parsed frames.
test_simple_address()
test_complex_address()
test_unit_address()

Simple Address:
  street_number      street_name postcode        city
0            28  Newbridge Close  M26 2WB  Manchester

Complex Address:
  street_number             street_name        city
0             1       Heath House Close  Warrington
1             2       Heath House Close  Warrington
2             3       Heath House Close  Warrington
3           106  Stone Cross Lane North  Warrington
4           108  Stone Cross Lane North  Warrington

Unit Address:
  unit_type unit_id street_number street_name    city
0      Flat      1A            25  Oak Street  London
1      Flat      1B            25  Oak Street  London
2       NaN     NaN            25  Oak Street  London



In [21]:
import pandas as pd
from typing import List, Dict, Any
import multiprocessing as mp
from functools import partial
import numpy as np

# Define the desired column order at module level so it's consistent everywhere
DESIRED_COLUMN_ORDER = [
    'unit_type', 'unit_id', 'number_filter', 'building_name', 
    'street_number', 'street_name', 'postcode', 'city',
    'datapoint_id',  'original_address'
]

def parse_single_address(address_data: Dict[str, Any]) -> pd.DataFrame:
    """
    Parse a single address from the provided data structure.

    Entities are processed in order of their ``start`` offset. Every
    ``street_number`` and every ``unit_id`` produces one output row; other
    entity types only supply context that those rows inherit. Context comes
    from what has already been seen plus the *nearest* following entity of each
    type, so several addresses in one string keep their own city, postcode and
    street name.

    Args:
        address_data: Dictionary containing 'datapoint_id', 'entities' and
            'original_address'. Each entity must provide 'type', 'text' and
            'start'. Any other keys (e.g. 'row_index') are ignored because they
            are not part of DESIRED_COLUMN_ORDER.

    Returns:
        DataFrame with exactly the DESIRED_COLUMN_ORDER columns: one row per
        street number / unit, or a single row (address columns possibly all
        None) when no such entity exists.

    Raises:
        KeyError: if 'entities', 'datapoint_id' or 'original_address' is
            missing from ``address_data``.
    """

    # Define hierarchy levels (lower number = higher in hierarchy, more general)
    hierarchy = {
        'city': 0,
        'postcode': 1,
        'street_name': 2,
        'street_number': 3,
        'building_name': 4,
        'number_filter': 5,
        'unit_id': 6,
        'unit_type': 7  # Special case - travels backwards
    }

    entities = address_data['entities']

    # Handle empty entities
    if not entities:
        # Return empty row with metadata - ensure all columns are present
        empty_row = {col: [None] for col in DESIRED_COLUMN_ORDER}
        empty_row['datapoint_id'] = [address_data['datapoint_id']]
        empty_row['original_address'] = [address_data['original_address']]
        return pd.DataFrame(empty_row)[DESIRED_COLUMN_ORDER]

    # Sort entities by start position
    sorted_entities = sorted(entities, key=lambda x: x['start'])

    # Track current context at each hierarchy level
    current_context = {}
    address_rows = []

    def find_backwards_unit_type(unit_entity, all_entities):
        """Find the closest unit_type that starts before the given unit_id"""
        unit_start = unit_entity['start']

        for entity in reversed(all_entities):
            if entity['start'] < unit_start and entity['type'] == 'unit_type':
                return entity
        return None

    def create_row(primary_entity, context):
        """Create a row with primary entity and inherited context"""
        row = {}

        # Add the primary entity
        if primary_entity['type'] == 'unit_id':
            row['unit_id'] = primary_entity['text'].strip()
            # Look backwards for unit_type
            unit_type = find_backwards_unit_type(primary_entity, sorted_entities)
            if unit_type:
                row['unit_type'] = unit_type['text'].strip()
        else:
            row[primary_entity['type']] = primary_entity['text'].strip()

        # Inherit all higher-level context (lower hierarchy numbers)
        primary_level = hierarchy[primary_entity['type']]
        for entity_type, level in hierarchy.items():
            if level < primary_level and entity_type in context:
                row[entity_type] = context[entity_type]['text'].strip()

        return row

    def update_context_from_ahead(start_index):
        """Overlay the nearest upcoming entity of each type on the current context"""
        temp_context = current_context.copy()
        seen_ahead = set()

        for next_entity in sorted_entities[start_index:]:
            next_type = next_entity['type']
            # Only the first (nearest) occurrence of a type may override the
            # context; later ones belong to addresses further along the text.
            if next_type == 'unit_type' or next_type in seen_ahead:
                continue
            seen_ahead.add(next_type)
            temp_context[next_type] = next_entity

        return temp_context

    # Process entities
    i = 0
    while i < len(sorted_entities):
        entity = sorted_entities[i]
        entity_type = entity['type']

        # Skip unit_type (handled separately)
        if entity_type == 'unit_type':
            i += 1
            continue

        if entity_type == 'unit_id':
            # Units inherit context from higher hierarchy levels
            # Get full context including look-ahead
            full_context = update_context_from_ahead(i + 1)

            # Create row with inherited context
            address_rows.append(create_row(entity, full_context))
            i += 1

        elif entity_type == 'street_number':
            # Collect consecutive street numbers
            street_numbers = []
            start_idx = i
            while i < len(sorted_entities) and sorted_entities[i]['type'] == 'street_number':
                street_numbers.append(sorted_entities[i])
                i += 1

            # Update context with look-ahead
            full_context = update_context_from_ahead(start_idx)

            # Process boundary entities that come after street numbers
            j = i
            while j < len(sorted_entities):
                next_entity = sorted_entities[j]
                next_type = next_entity['type']
                if next_type in ('street_name', 'postcode'):
                    # Update context and move main pointer
                    current_context[next_type] = next_entity
                    full_context[next_type] = next_entity
                    i = j + 1
                    break
                if next_type in ('street_number', 'unit_id'):
                    # Another row-producing entity: stop here and leave `i`
                    # untouched so the main loop still turns it into a row
                    # instead of jumping over it.
                    break
                if next_type != 'unit_type':
                    # Update context with other entities
                    current_context[next_type] = next_entity
                    full_context[next_type] = next_entity
                j += 1

            # Update main context with street numbers (they become available for lower hierarchy)
            for street_num in street_numbers:
                current_context['street_number'] = street_num

                # Create row for this street number
                address_rows.append(create_row(street_num, full_context))

        else:
            # Update context for other entity types
            current_context[entity_type] = entity
            i += 1

    # Handle case where we have context but no primary entities (street_number/unit_id)
    if not address_rows and current_context:
        row = {}
        for entity_type in ['city', 'postcode', 'street_name', 'building_name', 'number_filter']:
            if entity_type in current_context:
                row[entity_type] = current_context[entity_type]['text'].strip()
        if row:
            address_rows.append(row)

    # Convert to DataFrame; fall back to one empty row if no address components were found
    df = pd.DataFrame(address_rows or [{}])

    # Add metadata columns (only those that are part of the output schema)
    df['datapoint_id'] = address_data['datapoint_id']
    df['original_address'] = address_data['original_address']

    # Ensure all columns exist (fill missing with None)
    for col in DESIRED_COLUMN_ORDER:
        if col not in df.columns:
            df[col] = None

    # Return with correct column order
    return df[DESIRED_COLUMN_ORDER]

def process_address_batch(address_data_list: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Process a batch of addresses.

    Each address is parsed with ``parse_single_address``. A failure on one
    address does not abort the batch: it is recorded as a row whose address
    columns are None and whose ``error`` column holds the exception message.

    Args:
        address_data_list: List of address data dictionaries

    Returns:
        Combined DataFrame with all parsed addresses. Columns follow
        DESIRED_COLUMN_ORDER, followed by any extra columns (such as 'error').
    """
    results = []

    for address_data in address_data_list:
        try:
            results.append(parse_single_address(address_data))
        except Exception as e:
            # Deliberate best-effort boundary: keep the batch going and report
            # the failure in-band. Metadata is read with .get() because the
            # failure may itself have been a missing key (or a non-dict record),
            # and the handler must not raise a second time.
            metadata = address_data if isinstance(address_data, dict) else {}
            error_row = {col: [None] for col in DESIRED_COLUMN_ORDER}
            error_row['datapoint_id'] = [metadata.get('datapoint_id')]
            error_row['original_address'] = [metadata.get('original_address')]
            error_row['error'] = [str(e)]
            # Build the frame in one step, with the error column at the end
            results.append(
                pd.DataFrame(error_row, columns=[*DESIRED_COLUMN_ORDER, 'error'])
            )

    if not results:
        return pd.DataFrame(columns=DESIRED_COLUMN_ORDER)

    # Concatenate and ensure column order is preserved
    combined_df = pd.concat(results, ignore_index=True)

    # Ensure the final result has the correct column order
    final_columns = [col for col in DESIRED_COLUMN_ORDER if col in combined_df.columns]

    # Add any extra columns (like 'error') at the end
    final_columns.extend(
        col for col in combined_df.columns if col not in DESIRED_COLUMN_ORDER
    )

    return combined_df[final_columns]

def parse_all_addresses(address_data_list: List[Dict[str, Any]], 
                       use_multiprocessing: bool = True, 
                       n_cores: int = None,
                       chunk_size: int = 1000,
                       show_progress: bool = True) -> pd.DataFrame:
    """
    Parse all addresses with optional multiprocessing.

    The input is processed in the current process when multiprocessing is
    disabled or when there are fewer addresses than ``chunk_size``. Otherwise
    it is split into chunks of ``chunk_size`` addresses that are handed to a
    ``multiprocessing.Pool`` running ``process_address_batch``.

    NOTE(review): the pool pickles ``process_address_batch`` by reference;
    confirm it is importable by worker processes in the environment this runs
    in (e.g. a notebook on a platform using the 'spawn' start method).

    Args:
        address_data_list: List of address data dictionaries
        use_multiprocessing: Whether to use multiple cores
        n_cores: Number of cores to use (None = auto-detect)
        chunk_size: Size of batches for processing
        show_progress: Whether to print progress updates

    Returns:
        Combined DataFrame with all parsed addresses

    Raises:
        ValueError: if the multiprocessing path is taken with ``chunk_size``
            smaller than 1.
    """

    if not address_data_list:
        return pd.DataFrame(columns=DESIRED_COLUMN_ORDER)

    total_addresses = len(address_data_list)

    if show_progress:
        print(f"Processing {total_addresses:,} addresses...")

    if not use_multiprocessing or total_addresses < chunk_size:
        # Single-process path
        if show_progress:
            print("Using single-threaded processing...")
        return process_address_batch(address_data_list)

    # A non-positive chunk size cannot partition the input: zero is an invalid
    # range step and a negative step yields no chunks at all, which would
    # silently return an empty result.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # Multi-process path
    if n_cores is None:
        n_cores = mp.cpu_count()

    if show_progress:
        print(f"Using multiprocessing with {n_cores} cores...")

    # Split into chunks
    chunks = [address_data_list[i:i+chunk_size] 
              for i in range(0, len(address_data_list), chunk_size)]

    if show_progress:
        print(f"Split into {len(chunks)} chunks of ~{chunk_size} addresses each")

    # Process chunks in parallel. Never start more workers than there are
    # chunks; the extra processes would only add start-up cost.
    with mp.Pool(processes=min(n_cores, len(chunks))) as pool:
        if show_progress:
            print("Processing chunks...")

        results = pool.map(process_address_batch, chunks)

    # Combine all results
    if show_progress:
        print("Combining results...")

    # Filter out empty results and concatenate
    non_empty_results = [r for r in results if not r.empty]

    if non_empty_results:
        final_result = pd.concat(non_empty_results, ignore_index=True)

        # Schema columns first, then any extra columns (like 'error') at the end
        final_columns = [col for col in DESIRED_COLUMN_ORDER if col in final_result.columns]
        final_columns.extend(
            col for col in final_result.columns if col not in DESIRED_COLUMN_ORDER
        )

        final_result = final_result[final_columns]
    else:
        final_result = pd.DataFrame(columns=DESIRED_COLUMN_ORDER)

    if show_progress:
        print(f"✅ Completed! Processed {len(final_result):,} address rows from {total_addresses:,} original addresses")

    return final_result

In [22]:
# Three sample records: a named building without a street number, a flat,
# and one street shared by several street numbers.
sample_data = [
    {
        'row_index': 0,
        'datapoint_id': 0,
        'original_address': 'Westleigh Lodge Care Home, Nel Pan Lane, Leigh (WN7 5JT)',
        'entities': [
            {'type': 'building_name', 'text': 'Westleigh Lodge Care Home',
             'start': 0, 'end': 25, 'confidence': 0.99999654},
            {'type': 'street_name', 'text': ' Nel Pan Lane',
             'start': 26, 'end': 39, 'confidence': 0.9999978},
            {'type': 'city', 'text': ' Leigh',
             'start': 40, 'end': 46, 'confidence': 0.99988425},
            {'type': 'postcode', 'text': 'WN7 5JT',
             'start': 48, 'end': 55, 'confidence': 0.9999841},
        ],
    },
    {
        'row_index': 1,
        'datapoint_id': 1,
        'original_address': 'Flat 1, 1a Canal Street, Manchester (M1 3HE)',
        'entities': [
            {'type': 'unit_type', 'text': 'Flat',
             'start': 0, 'end': 4, 'confidence': 0.9999956},
            {'type': 'unit_id', 'text': ' 1',
             'start': 4, 'end': 6, 'confidence': 0.9999999},
            {'type': 'street_name', 'text': ' Canal Street',
             'start': 10, 'end': 23, 'confidence': 0.9999965},
            {'type': 'city', 'text': ' Manchester',
             'start': 24, 'end': 35, 'confidence': 0.9999862},
            {'type': 'postcode', 'text': 'M1 3HE',
             'start': 37, 'end': 43, 'confidence': 0.9999883},
        ],
    },
    {
        'row_index': 47,
        'datapoint_id': 47,
        'original_address': '3a, 5a and 8a Hesketh Drive, Standish, Wigan (WN6 0SF)',
        'entities': [
            {'type': 'street_number', 'text': '3a',
             'start': 0, 'end': 2, 'confidence': 0.99989045},
            {'type': 'street_number', 'text': ' 5a',
             'start': 3, 'end': 6, 'confidence': 0.9999326},
            {'type': 'street_number', 'text': ' 8a',
             'start': 10, 'end': 13, 'confidence': 0.9999838},
            {'type': 'street_name', 'text': ' Hesketh Drive',
             'start': 13, 'end': 27, 'confidence': 0.9999949},
            {'type': 'city', 'text': ' Wigan',
             'start': 38, 'end': 44, 'confidence': 0.99999774},
            {'type': 'postcode', 'text': 'WN6 0SF',
             'start': 46, 'end': 53, 'confidence': 0.9999707},
        ],
    },
]

# Parse the samples in the current process and display the frame as the cell result
print("=== Single-threaded Results ===")
result = parse_all_addresses(sample_data, use_multiprocessing=False)
result
    


=== Single-threaded Results ===
Processing 3 addresses...
Using single-threaded processing...


Unnamed: 0,unit_type,unit_id,number_filter,building_name,street_number,street_name,postcode,city,datapoint_id,original_address
0,,,,Westleigh Lodge Care Home,,Nel Pan Lane,WN7 5JT,Leigh,0,"Westleigh Lodge Care Home, Nel Pan Lane, Leigh..."
1,Flat,1.0,,,,Canal Street,M1 3HE,Manchester,1,"Flat 1, 1a Canal Street, Manchester (M1 3HE)"
2,,,,,3a,Hesketh Drive,WN6 0SF,Wigan,47,"3a, 5a and 8a Hesketh Drive, Standish, Wigan (..."
3,,,,,5a,Hesketh Drive,WN6 0SF,Wigan,47,"3a, 5a and 8a Hesketh Drive, Standish, Wigan (..."
4,,,,,8a,Hesketh Drive,WN6 0SF,Wigan,47,"3a, 5a and 8a Hesketh Drive, Standish, Wigan (..."


In [23]:
# Test the multiprocessing path: chunk_size=1 keeps the 3 sample addresses from
# falling back to single-process handling, so each one becomes its own chunk
# and is parsed in a worker process.
print("=== Multi-threaded Results ===")
result_mt = parse_all_addresses(sample_data, use_multiprocessing=True, chunk_size=1)
result_mt

=== Multi-threaded Results ===
Processing 3 addresses...
Using multiprocessing with 4 cores...
Split into 3 chunks of ~1 addresses each
Processing chunks...
Combining results...
✅ Completed! Processed 5 address rows from 3 original addresses


Unnamed: 0,unit_type,unit_id,number_filter,building_name,street_number,street_name,postcode,city,datapoint_id,original_address
0,,,,Westleigh Lodge Care Home,,Nel Pan Lane,WN7 5JT,Leigh,0,"Westleigh Lodge Care Home, Nel Pan Lane, Leigh..."
1,Flat,1.0,,,,Canal Street,M1 3HE,Manchester,1,"Flat 1, 1a Canal Street, Manchester (M1 3HE)"
2,,,,,3a,Hesketh Drive,WN6 0SF,Wigan,47,"3a, 5a and 8a Hesketh Drive, Standish, Wigan (..."
3,,,,,5a,Hesketh Drive,WN6 0SF,Wigan,47,"3a, 5a and 8a Hesketh Drive, Standish, Wigan (..."
4,,,,,8a,Hesketh Drive,WN6 0SF,Wigan,47,"3a, 5a and 8a Hesketh Drive, Standish, Wigan (..."
