In [None]:
import pandas as pd
import numpy as np
import math
from datetime import datetime
import ast

def calculate_recency_scores(df):
    """
    Return a copy of ``df`` with per-address recency columns added.

    Each row is passed to ``calculate_row_recency_scores`` together with a
    single "now" timestamp taken once, so every row is scored against the
    same reference date. The mapping returned for a row (column name ->
    value) is stored in the matching row of the copy.

    Values are collected by row *position* and each column is assigned once
    at the end. This avoids label-based single-cell writes, which are
    ambiguous when the index contains duplicate labels, and avoids growing
    the frame one cell at a time.

    Args:
        df: Input DataFrame. It is not modified.

    Returns:
        A new DataFrame holding the original columns plus any columns
        produced by the row scorer. Rows that produced no value for a
        column hold NaN there.
    """
    df_copy = df.copy()
    current_date = datetime.now()
    row_count = len(df_copy)

    # column name -> list of per-row values, filled positionally.
    collected = {}
    for position, (_, row) in enumerate(df_copy.iterrows()):
        scores = calculate_row_recency_scores(row, current_date)

        for score_col, score_val in scores.items():
            if score_col not in collected:
                if score_col in df_copy.columns:
                    # Keep existing values for rows that get no new score.
                    collected[score_col] = df_copy[score_col].tolist()
                else:
                    collected[score_col] = [np.nan] * row_count
            collected[score_col][position] = score_val

    # Plain lists are assigned positionally, so no index alignment happens.
    for score_col, values in collected.items():
        df_copy[score_col] = values

    return df_copy

def calculate_row_recency_scores(row, current_date):
    """
    Score every populated address slot of one row.

    Slots ``Address.0`` .. ``Address.9`` are probed; a slot counts as
    populated when its ``completeAddress`` value exists, is not NaN and is
    not blank. For each populated slot the result holds
    ``Address.{i}_recency_score`` (rounded to 2 decimals) and
    ``Address.{i}_address_text`` (the stripped address string).

    Returns an empty dict when the row has no populated address slot.
    """
    slot_limit = 10

    def _is_populated(slot):
        column = f'Address.{slot}.completeAddress'
        if column not in row:
            return False
        value = row[column]
        return pd.notna(value) and str(value).strip() != ''

    populated_slots = [slot for slot in range(slot_limit) if _is_populated(slot)]
    if not populated_slots:
        return {}

    # Number of distinct address types across the whole row (at least 1).
    collected_types = []
    for slot in populated_slots:
        collected_types.extend(get_address_types(row, slot))
    distinct_type_count = len(set(collected_types)) or 1

    result = {}
    for slot in populated_slots:
        raw_score = calculate_single_address_score(
            row, slot, current_date, distinct_type_count
        )
        result[f'Address.{slot}_recency_score'] = round(raw_score, 2)
        result[f'Address.{slot}_address_text'] = str(
            row[f'Address.{slot}.completeAddress']
        ).strip()

    return result

def get_address_types(row, address_index):
    """
    Return the address types recorded for ``Address.{address_index}``.

    The ``addressType`` value may be a real list/tuple, the string form of
    a list (e.g. ``"['taxAddress', 'billingAddress']"``), or a single plain
    string. Anything else (missing column, NaN, None, numbers) gives an
    empty list.

    Args:
        row: Mapping-like row (dict or pandas Series) keyed by column name.
        address_index: Integer slot number of the address.

    Returns:
        A list of address types; empty when none can be determined.
    """
    types_field = f'Address.{address_index}.addressType'

    if types_field not in row:
        return []

    types_value = row[types_field]

    # Check containers before any NaN test: pd.isna() on a list returns an
    # array, whose truth value is ambiguous for more than one element.
    if isinstance(types_value, (list, tuple)):
        return list(types_value)

    # NaN, None and other scalars carry no type information.
    if not isinstance(types_value, str):
        return []

    types_value = types_value.strip()
    if not types_value:
        return []

    if types_value.startswith('[') and types_value.endswith(']'):
        try:
            parsed = ast.literal_eval(types_value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Not a valid Python literal (e.g. unquoted names): split the
            # bracket contents on commas and drop quotes/whitespace.
            inner = types_value[1:-1]
            candidates = (part.strip().strip("'\"").strip() for part in inner.split(','))
            return [candidate for candidate in candidates if candidate]
        return list(parsed) if isinstance(parsed, (list, tuple)) else [parsed]

    return [types_value]

def calculate_single_address_score(row, address_index, current_date, unique_types_count):
    """
    Combine the five component scores of one address into a weighted total.

    Weights: temporal 40%, frequency 25%, usage pattern 20%,
    consistency 10%, quality 5%.
    """
    prefix = f'Address.{address_index}'

    # (component score, weight) pairs, evaluated in this order.
    weighted_parts = (
        (calculate_temporal_score(row, prefix, current_date), 0.40),
        (calculate_frequency_score(row, prefix), 0.25),
        (calculate_pattern_score(row, address_index), 0.20),
        (calculate_consistency_score(unique_types_count), 0.10),
        (calculate_quality_score(row, prefix), 0.05),
    )

    # Plain left-to-right accumulation keeps the floating-point result the
    # same as a chained "a*w1 + b*w2 + ..." expression.
    total = 0.0
    for component, weight in weighted_parts:
        total += component * weight

    return total

def calculate_temporal_score(row, prefix, current_date):
    """
    Score how recently an address was last delivered to.

    Reads ``{prefix}.lastDeliveryDate`` and compares it with
    ``current_date`` in whole days:

    * up to 7 days      -> 100 (a delivery date in the future also lands here)
    * 8 to 90 days      -> 50 + 35 * exp(-0.05 * (days - 7))
    * 91 to 365 days    -> linear from 20 down to 5
    * more than 365     -> 5

    Args:
        row: Mapping-like row (dict or pandas Series) keyed by column name.
        prefix: Address prefix such as ``'Address.0'``.
        current_date: Reference "now"; a ``datetime`` or pandas Timestamp,
            naive or timezone-aware.

    Returns:
        A number in [0, 100]. 0 when the date is missing or cannot be
        parsed or compared.
    """
    delivery_date_field = f'{prefix}.lastDeliveryDate'

    if delivery_date_field not in row or pd.isna(row[delivery_date_field]):
        return 0

    try:
        delivery_date = pd.to_datetime(row[delivery_date_field])
        # Wrapping in a Timestamp gives a uniform ``.tz`` attribute; a plain
        # datetime only exposes ``tzinfo``.
        reference_date = pd.Timestamp(current_date)

        # Bring tz-aware values to naive UTC so both sides are comparable.
        # Naive values are used as given.
        if delivery_date.tz is not None:
            delivery_date = delivery_date.tz_convert('UTC').tz_localize(None)
        if reference_date.tz is not None:
            reference_date = reference_date.tz_convert('UTC').tz_localize(None)

        days_diff = (reference_date - delivery_date).days
    except (ValueError, TypeError, OverflowError, AttributeError):
        # Unparseable or incomparable date: treat as "no recency signal".
        return 0

    if days_diff <= 7:
        return 100
    if days_diff <= 90:
        return 50 + 35 * math.exp(-0.05 * (days_diff - 7))
    if days_diff <= 365:
        return max(5, 20 - 15 * ((days_diff - 90) / 275))
    return 5

def calculate_frequency_score(row, prefix):
    """
    Score an address by how many times it has been seen.

    Reads ``{prefix}.timesSeen``. A missing, NaN or non-numeric value is
    treated as 0, and negative counts are clamped to 0 so the score can
    never go below 20.

    Score bands:

    * 0 to 3 sightings   -> 20, 40, 60, 80
    * 4 to 10 sightings  -> 80 + 2.4 per sighting above 3
    * above 10           -> 97 + 0.3 per sighting above 10, capped at 100

    Args:
        row: Mapping-like row (dict or pandas Series) keyed by column name.
        prefix: Address prefix such as ``'Address.0'``.

    Returns:
        A number between 20 and 100.
    """
    times_seen_field = f'{prefix}.timesSeen'

    times_seen = 0
    if times_seen_field in row and not pd.isna(row[times_seen_field]):
        raw_value = row[times_seen_field]
        try:
            times_seen = int(raw_value)
        except (TypeError, ValueError, OverflowError):
            # Strings such as "3.0" are rejected by int() but are valid
            # counts once parsed as a float.
            try:
                times_seen = int(float(raw_value))
            except (TypeError, ValueError, OverflowError):
                times_seen = 0

    # A negative count is invalid data; do not let it produce a negative score.
    times_seen = max(times_seen, 0)

    if times_seen <= 3:
        return 40 + (times_seen - 1) * 20
    if times_seen <= 10:
        return 80 + (times_seen - 3) * 2.4
    return min(100, 97 + (times_seen - 10) * 0.3)

def calculate_pattern_score(row, address_index):
    """
    Score an address by its usage pattern, derived from its address types.

    Each known type maps to a fixed score; unknown types and addresses
    without any type score 60. When several types are present the highest
    score wins.
    """
    fallback_score = 60

    score_by_type = {
        'logisticsAddress': 90,     # active delivery address
        'transportDlAddress': 75,   # government registered, changes occasionally
        'taxAddress': 80,           # changes when the business moves
        'businessAddress': 85,      # business operations
        'temporaryAddress': 95,     # temporary by definition
        'billingAddress': 70,       # changes less frequently
        'permanentAddress': 40,     # permanent by definition
    }

    found_types = get_address_types(row, address_index)
    if not found_types:
        return fallback_score

    return max(score_by_type.get(type_name, fallback_score) for type_name in found_types)

def calculate_consistency_score(unique_types_count):
    """
    Map the number of distinct address types on a row to a score.

    One type scores 85, two 70, three 55; any other count scores 40.
    """
    for type_count, score in ((1, 85), (2, 70), (3, 55)):
        if unique_types_count == type_count:
            return score
    return 40

def calculate_quality_score(row, prefix):
    """
    Translate an address completeness value into a quality score.

    Reads ``{prefix}.completeAddress.addressCompletenessScore``. A missing
    or NaN value scores 40. Otherwise the value is converted to float and
    bucketed: >= 80 -> 100, >= 60 -> 85, >= 40 -> 70, >= 20 -> 55,
    below 20 -> 40.
    """
    lowest_score = 40
    column = f'{prefix}.completeAddress.addressCompletenessScore'

    if column not in row or pd.isna(row[column]):
        return lowest_score

    value = float(row[column])

    # (minimum completeness, score) pairs, checked from the highest band down.
    bands = ((80, 100), (60, 85), (40, 70), (20, 55))
    for minimum, score in bands:
        if value >= minimum:
            return score

    return lowest_score

def analyze_results(df_with_scores):
    """
    Print a summary of the recency scores and return descriptive statistics.

    The report has two parts: overall statistics across every
    ``*_recency_score`` column (count, mean, range, standard deviation and
    a banded distribution), followed by a per-row listing of scores and
    address texts for the first five rows.

    ``name`` and ``email`` columns are optional; placeholders are printed
    when they are absent. Rows are limited by position, so any index type
    works.

    Args:
        df_with_scores: DataFrame holding ``Address.{i}_recency_score`` and
            ``Address.{i}_address_text`` columns.

    Returns:
        ``DataFrame.describe()`` of the score columns, or ``None`` when the
        frame has no score columns.
    """
    score_cols = [col for col in df_with_scores.columns if col.endswith('_recency_score')]

    if not score_cols:
        print("No recency scores found!")
        return

    print("=== RECENCY SCORE ANALYSIS ===\n")

    all_scores = []
    for col in score_cols:
        all_scores.extend(df_with_scores[col].dropna().tolist())

    if all_scores:
        print(f"Total addresses scored: {len(all_scores)}")
        print(f"Average recency score: {np.mean(all_scores):.2f}")
        print(f"Score range: {min(all_scores):.2f} - {max(all_scores):.2f}")
        print(f"Standard deviation: {np.std(all_scores):.2f}\n")

        print("Score Distribution:")
        print(f"Excellent (90-100): {sum(1 for s in all_scores if s >= 90)} addresses")
        print(f"Good (75-89): {sum(1 for s in all_scores if 75 <= s < 90)} addresses")
        print(f"Fair (60-74): {sum(1 for s in all_scores if 60 <= s < 75)} addresses")
        print(f"Poor (40-59): {sum(1 for s in all_scores if 40 <= s < 60)} addresses")
        print(f"Very Poor (0-39): {sum(1 for s in all_scores if s < 40)} addresses")

    print(f"\n=== SAMPLE RESULTS ===")

    display_cols = ['name', 'email']

    # Add score and address pairs in slot order (Address.0 .. Address.9).
    for i in range(10):
        score_col = f'Address.{i}_recency_score'
        addr_col = f'Address.{i}_address_text'
        if score_col in df_with_scores.columns:
            display_cols.extend([score_col, addr_col])

    available_cols = [col for col in display_cols if col in df_with_scores.columns]

    # Show only the first 5 rows in detail, selected by position.
    sample_df = df_with_scores[available_cols].head(5)

    for _, row in sample_df.iterrows():
        # name/email may be missing from the input; fall back to placeholders.
        name = row.get('name', 'Unknown')
        email = row.get('email', 'No email')
        print(f"\n--- {name} ({email}) ---")
        for col in available_cols:
            if col.endswith('_recency_score') and pd.notna(row[col]):
                # 'Address.3_recency_score' -> '3'
                addr_idx = col.split('.')[1].split('_')[0]
                addr_text_col = f'Address.{addr_idx}_address_text'
                addr_text = row.get(addr_text_col, 'N/A')
                print(f"  Score: {row[col]:.2f} | Address: {str(addr_text)[:80]}...")

    return df_with_scores[score_cols].describe()

# Main execution function
def run_recency_analysis(file_path):
    """
    Load a CSV, score every address in it and print the analysis report.

    Returns ``(scored_dataframe, statistics)`` on success. Any exception
    raised along the way is printed as ``Error: ...`` and ``(None, None)``
    is returned instead.
    """
    try:
        print("Loading data...")
        frame = pd.read_csv(file_path)
        row_total, column_total = len(frame), len(frame.columns)
        print(f"Loaded {row_total} rows and {column_total} columns")

        # Columns that hold a complete address for some slot.
        matching = [
            name
            for name in frame.columns
            if 'Address.' in name and 'completeAddress' in name
        ]
        preview = matching[:5]  # only the first five are listed
        print(f"Found {len(matching)} address columns: {preview}...")

        print("\nCalculating recency scores...")
        scored = calculate_recency_scores(frame)

        summary = analyze_results(scored)
    except Exception as exc:
        print(f"Error: {str(exc)}")
        return None, None

    return scored, summary

# Usage example: when this module is executed directly, run the analysis on
# the CSV named below and print the returned statistics.
if __name__ == "__main__":
    # Run the analysis
    file_path = "DCB_AlternateAddress.csv"
    df_result, statistics = run_recency_analysis(file_path)
    
    # run_recency_analysis returns (None, None) when it caught an error.
    if df_result is not None:
        print(f"\n=== DETAILED STATISTICS ===")
        print(statistics)

In [None]:
# Notebook cell: run the analysis, print detailed and summary views, and
# write two CSV files (full results and a reduced "final" table).
file_path = "DCB_AlternateAddress.csv"

df_with_scores, statistics = run_recency_analysis(file_path)

if df_with_scores is not None:

    print("\n=== DETAILED BREAKDOWN FOR FIRST 3 ROWS ===")

    score_cols = [col for col in df_with_scores.columns if col.endswith('_recency_score')]

    for idx in range(min(3, len(df_with_scores))):
        row = df_with_scores.iloc[idx]
        print(f"\nRow {idx + 1} - {row.get('name', 'Unknown')} ({row.get('email', 'No email')}):")

        for score_col in score_cols:
            if pd.notna(row[score_col]):
                # 'Address.3_recency_score' -> '3'
                addr_idx = score_col.split('.')[1].split('_')[0]

                # Get the address text from the column added by the scorer
                addr_text_col = f'Address.{addr_idx}_address_text'
                address_text = row.get(addr_text_col, 'N/A')

                delivery_date = row.get(f'Address.{addr_idx}.lastDeliveryDate', 'No date')
                times_seen = row.get(f'Address.{addr_idx}.timesSeen', 0)
                addr_type = row.get(f'Address.{addr_idx}.addressType', 'Unknown')

                print(f"  {score_col}: {row[score_col]:.2f}")
                print(f"    Address: {str(address_text)[:80]}...")
                print(f"    Last delivery: {delivery_date}")
                print(f"    Times seen: {times_seen}")
                print(f"    Type: {addr_type}")

    print("\n=== CLEAN SUMMARY TABLE ===")
    summary_data = []

    # Number rows by position so the table does not depend on index labels.
    for row_number, (_, row) in enumerate(df_with_scores.iterrows(), start=1):
        base_info = {
            'Row': row_number,
            'Name': row.get('name', 'Unknown'),
            'Email': row.get('email', 'No email')
        }

        has_addresses = False
        for score_col in score_cols:
            if pd.notna(row[score_col]):
                has_addresses = True
                addr_idx = score_col.split('.')[1].split('_')[0]
                addr_text_col = f'Address.{addr_idx}_address_text'

                row_data = base_info.copy()
                row_data.update({
                    'Address_Index': f'Address.{addr_idx}',
                    'Recency_Score': row[score_col],
                    'Address_Text': str(row.get(addr_text_col, 'N/A'))[:60] + '...',
                    'Last_Delivery': row.get(f'Address.{addr_idx}.lastDeliveryDate', 'No date'),
                    'Times_Seen': row.get(f'Address.{addr_idx}.timesSeen', 0)
                })
                summary_data.append(row_data)

        # Rows without any scored address still get one line in the table.
        if not has_addresses:
            summary_data.append(base_info)

    summary_df = pd.DataFrame(summary_data)
    print(summary_df.head(20).to_string(index=False))

    # Save complete results to CSV
    output_file = "DCB_AlternateAddress_with_recency_scores.csv"
    df_with_scores.to_csv(output_file, index=False)
    print(f"\n✅ Complete results saved to: {output_file}")

    # Create and save clean final dataframe with only essential columns.
    # Only columns that actually exist are selected, so a missing 'email'
    # or address-text column does not raise a KeyError.
    essential_cols = [col for col in ['email'] if col in df_with_scores.columns]

    address_cols = [col for col in df_with_scores.columns if col.endswith('_address_text')]

    # Combine in pairs (score, address) for each address index
    for i in range(10):  # Check up to Address.9
        score_col = f'Address.{i}_recency_score'
        addr_col = f'Address.{i}_address_text'
        if score_col in df_with_scores.columns:
            essential_cols.append(score_col)
            if addr_col in df_with_scores.columns:
                essential_cols.append(addr_col)

    # Create clean dataframe
    df_clean = df_with_scores[essential_cols].copy()

    # Save clean version
    clean_output_file = "DCB_Recency_Scores_FINAL.csv"
    df_clean.to_csv(clean_output_file, index=False)
    print(f"✅ Clean final dataframe saved to: {clean_output_file}")

    # Show preview of clean dataframe
    print(f"\n=== CLEAN FINAL DATAFRAME PREVIEW ===")
    print(f"Columns: {list(df_clean.columns)}")
    print(f"Shape: {df_clean.shape}")
    print("\nFirst 5 rows:")
    for row_number, (_, row) in enumerate(df_clean.head().iterrows(), start=1):
        print(f"\nRow {row_number}: {row.get('email', 'No email')}")
        for col in df_clean.columns:
            if col.endswith('_recency_score') and pd.notna(row[col]):
                addr_idx = col.split('.')[1].split('_')[0]
                addr_col = f'Address.{addr_idx}_address_text'
                print(f"  Score: {row[col]:.2f} | Address: {str(row.get(addr_col, 'N/A'))[:50]}...")

    # Show all new columns created
    print(f"\n=== NEW COLUMNS CREATED ===")
    new_cols = [col for col in df_with_scores.columns if '_recency_score' in col or '_address_text' in col]

    for col in sorted(new_cols):
        if col.endswith('_recency_score'):
            non_null_count = df_with_scores[col].count()
            avg_score = df_with_scores[col].mean() if non_null_count > 0 else 0
            print(f"{col}: {non_null_count} addresses, avg score: {avg_score:.2f}")
        elif col.endswith('_address_text'):
            non_null_count = df_with_scores[col].count()
            print(f"{col}: {non_null_count} address texts")

else:
    print("❌ Analysis failed. Please check your CSV file path and format.")