In [2]:
!pip install geopy


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [None]:
import pandas as pd
from geopy.geocoders import Nominatim
from geopy.distance import geodesic
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
import time
import re
import os # For checking file existence

# --- Configuration ---
# Set defaults if city/state are often missing or known
DEFAULT_CITY = None # Example: "Washington" or None if usually present
DEFAULT_STATE = None # Example: "DC" or None
DEFAULT_COUNTRY = "USA"
DEFAULT_COUNTRY_CODE = "us" # 2-letter ISO code for country_codes

# --- Address Cleaning Function ---
def clean_and_format_address(address_str, city=None, state=None, country=None):
    """
    Cleans the address string and attempts to format it completely.
    Tries to remove leading names/non-address info.
    Removes common suite/unit patterns from the end.
    Appends default city/state/country if provided and seemingly missing.
    """
    if not isinstance(address_str, str) or not address_str.strip():
        return None # Handle empty or non-string data

    cleaned = address_str.strip()

    # --- Attempt to remove leading non-address info ---
    parts = cleaned.split()
    address_start_index = 0
    found_number = False
    for i, part in enumerate(parts):
        if part.isdigit() or (len(part) > 0 and part[0].isdigit()):
             # Basic check: consider this the start if it's a number
             address_start_index = i
             found_number = True
             break

    # If we found a likely street number and it's not the first word, strip leading parts
    if found_number and address_start_index > 0:
        cleaned = " ".join(parts[address_start_index:])
        # print(f"      -> Stripped leading info, starting address at: '{cleaned}'") # Debug
        
    elif not found_number:
        # Handle cases like "PO BOX 123" or intersections if needed later
        if "PO BOX" not in cleaned.upper() and " & " not in cleaned:
            # print(f"      -> Warning: No leading number found in address: {cleaned}") # Debug
            pass

    # --- Remove trailing suite/unit numbers ---
    cleaned = re.sub(r'[ ,]+\b(?:STE|SUITE|APT|UNIT|#)\s*[A-Z0-9-]+$', '', cleaned, flags=re.IGNORECASE).strip()
    cleaned = cleaned.rstrip(', ')

    # --- Add City/State/Country if missing (using defaults) ---
    full_address = cleaned
    has_state_zip_pattern = re.search(r',\s*([A-Z]{2}|\d{5}(-\d{4})?)\s*$', full_address, re.IGNORECASE)

    if city and state and not has_state_zip_pattern:
        # Avoid adding if city/state seems present as whole words
        in_str = False
        temp_lower = full_address.lower()
        if re.search(r'\b' + re.escape(city.lower()) + r'\b', temp_lower): in_str = True
        if not in_str and re.search(r'\b' + re.escape(state.lower()) + r'\b', temp_lower): in_str = True

        if not in_str:
            full_address = f"{cleaned}, {city}, {state}"
            # print(f"      -> Appended default city/state: '{full_address}'") # Debug

    # Append country if specified and not already present
    if country and country.upper() not in full_address.upper() and not re.search(r'\bUSA\b', full_address, re.IGNORECASE):
         if has_state_zip_pattern or (state and state.upper() in full_address.upper()):
              full_address = f"{full_address}, {country}"
              # print(f"      -> Appended default country: '{full_address}'") # Debug

    # Final cleanup
    full_address = re.sub(r', ,', ',', full_address)
    full_address = re.sub(r'\s+', ' ', full_address).strip()

    return full_address if full_address else None

# --- Geocoding Function ---
def geocode_address(geolocator, address_cleaned, attempt=1, max_attempts=3):
    """Geocode a *cleaned* address string with retry logic."""
    if not address_cleaned:
        return (None, None)
    try:
        # We assume 'address_cleaned' is already processed before calling this function
        location = geolocator.geocode(address_cleaned, timeout=10, country_codes=DEFAULT_COUNTRY_CODE)
        if location:
            return (location.latitude, location.longitude)
        else:
            print(f"  -> Geocoder returned None for: {address_cleaned}")
            return (None, None)
    except GeocoderTimedOut:
        if attempt <= max_attempts:
            print(f"Timeout geocoding '{address_cleaned}', retrying ({attempt}/{max_attempts})...")
            time.sleep(2**attempt)
            return geocode_address(geolocator, address_cleaned, attempt + 1) # Pass cleaned address in retry
        else:
            print(f"Failed to geocode '{address_cleaned}' after {max_attempts} attempts (Timeout).")
            return (None, None)
    except GeocoderServiceError as e:
        print(f"Geocoding service error for '{address_cleaned}': {e}")
        return (None, None)
    except Exception as e:
        print(f"An unexpected error occurred geocoding '{address_cleaned}': {e}")
        return (None, None)

# --- 1. Preprocessing Addresses ---
def preprocess_addresses(input_path, output_path):
    """Reads addresses, cleans them, geocodes, and saves to output."""
    try:
        # Try reading CSV first, then Excel
        if input_path.lower().endswith('.csv'):
            df = pd.read_csv(input_path)
            df.rename(columns = {'Shipping Address': 'Address'}, inplace=True)
        elif input_path.lower().endswith(('.xlsx', '.xls')):
             df = pd.read_csv(input_path)
        else:
             print(f"Error: Input file must be a CSV or Excel file (.csv, .xlsx, .xls). Path: {input_path}")
             return
    except FileNotFoundError:
        print(f"Error: Input file not found at {input_path}")
        return
    except Exception as e:
        print(f"Error reading input file: {e}")
        return

    # --- IMPORTANT: Define your address column name ---
    address_col = 'Address' # CHANGE THIS if your column name is different
    # Optional: Define city/state/zip columns if they exist
    city_col = None # e.g., 'City' or None
    state_col = None # e.g., 'State' or None
    # zip_col = None # e.g., 'ZipCode' or None
    # ---

    if address_col not in df.columns:
        print(f"Error: Input file must contain an '{address_col}' column.")
        return

    # Use a unique user_agent for Nominatim
    geolocator = Nominatim(user_agent="my_address_locator_app_v1") # CHANGE THIS to something unique

    latitudes = []
    longitudes = []
    processed_addresses = []

    print(f"Starting geocoding for {len(df)} addresses...")
    for index, row in df.iterrows():
        original_address = row[address_col]
        # Get city/state from columns if defined, otherwise use defaults
        current_city = row.get(city_col) if city_col and city_col in df.columns else DEFAULT_CITY
        current_state = row.get(state_col) if state_col and state_col in df.columns else DEFAULT_STATE

        print(f"\nProcessing {index + 1}/{len(df)}: {original_address}")

        # Clean and format the address
        address_to_geocode = clean_and_format_address(
            original_address, current_city, current_state, DEFAULT_COUNTRY
        )
        processed_addresses.append(address_to_geocode)

        if address_to_geocode:
            print(f"  -> Attempting to geocode: {address_to_geocode}")
            lat, lon = geocode_address(geolocator, address_to_geocode)
        else:
            print(f"  -> Skipping due to empty/invalid address after cleaning.")
            lat, lon = None, None

        latitudes.append(lat)
        longitudes.append(lon)

        time.sleep(1.1) # Respect Nominatim rate limit (1 req/sec)

    # Add new columns to the DataFrame
    # Keep original address for display, processed for debugging
    df['Original_Address'] = df[address_col]
    df['Processed_For_Geocoding'] = processed_addresses
    df['Latitude'] = latitudes
    df['Longitude'] = longitudes

    # Save results to CSV (recommended over Excel for simplicity)
    df.to_csv(output_path, index=False)
    print(f"\nGeocoding complete. Results saved to {output_path}")

    failures = df['Latitude'].isna().sum()
    print(f"Total addresses: {len(df)}, Successful: {len(df) - failures}, Failed: {failures}")

# --- 2. Find Closest Addresses ---
def find_closest(user_address_str, geocoded_data_path, num_closest=10):
    """
    Finds the N closest *distinct* addresses from the pre-geocoded file.
    Uniqueness is based on the 'Processed_For_Geocoding' column.
    """
    try:
        df_geocoded = pd.read_csv(geocoded_data_path)
        df_geocoded['Latitude'] = pd.to_numeric(df_geocoded['Latitude'], errors='coerce')
        df_geocoded['Longitude'] = pd.to_numeric(df_geocoded['Longitude'], errors='coerce')
        df_geocoded.dropna(subset=['Latitude', 'Longitude'], inplace=True)

        if df_geocoded.empty:
            print("Error: No valid geocoded addresses found in the data file.")
            return []
    except FileNotFoundError:
        print(f"Error: Geocoded data file not found at {geocoded_data_path}")
        print("Tip: Run the preprocessing step first.")
        return []
    except Exception as e:
        print(f"Error loading or processing geocoded data: {e}")
        return []

    # Geocode the user's input address
    # Ensure you have a unique user_agent string
    geolocator = Nominatim(user_agent="my_address_locator_app_v1_runtime_distinct") 
    print(f"\nGeocoding user address: {user_address_str}")

    # Assuming clean_and_format_address is defined elsewhere
    user_address_cleaned = clean_and_format_address( 
        user_address_str, DEFAULT_CITY, DEFAULT_STATE, DEFAULT_COUNTRY
    )
    if not user_address_cleaned:
         print(f"Could not clean the user address: {user_address_str}")
         return []

    print(f"  -> Cleaned user address: {user_address_cleaned}")
    # Assuming geocode_address is defined elsewhere
    user_lat, user_lon = geocode_address(geolocator, user_address_cleaned) 

    if user_lat is None or user_lon is None:
        print(f"Could not geocode the user address: '{user_address_cleaned}'")
        return []

    user_coords = (user_lat, user_lon)
    print(f"User coordinates: {user_coords}")

    # --- Calculate all distances ---
    all_distances = []
    # Determine which column to use for uniqueness check and display
    # Using 'Processed_For_Geocoding' ensures we treat identical cleaned addresses as duplicates
    # If you want uniqueness based on 'Original_Address', change this.
    unique_check_col = 'Processed_For_Geocoding' 
    
    # Fallback if the preferred column doesn't exist
    if unique_check_col not in df_geocoded.columns:
        if 'Original_Address' in df_geocoded.columns:
            unique_check_col = 'Original_Address'
            print(f"Warning: '{unique_check_col}' column not found. Using 'Original_Address' for uniqueness.")
        else:
             # As a last resort, maybe use the first column if 'Address' isn't there? Unlikely needed.
             print(f"Error: Column '{unique_check_col}' not found for uniqueness check.")
             return []

    for index, row in df_geocoded.iterrows():
        # Skip rows where the address used for uniqueness check is missing/NaN
        if pd.isna(row[unique_check_col]):
            continue
            
        list_coords = (row['Latitude'], row['Longitude'])
        address_for_uniqueness = row[unique_check_col]
        
        # Use Original_Address for display if available, otherwise use the uniqueness check column
        display_addr_col = unique_check_col
        display_addr = row[display_addr_col] if not pd.isna(row[display_addr_col]) else address_for_uniqueness

        try:
            distance_km = geodesic(user_coords, list_coords).km
            # Store distance, the address used for uniqueness check, and the address to display
            all_distances.append((distance_km, address_for_uniqueness, display_addr))
        except ValueError as e:
            print(f"Warn: Could not calculate distance for coords {list_coords} / address '{address_for_uniqueness}': {e}")
            continue

    # --- Sort by distance ---
    all_distances.sort(key=lambda x: x[0])

    # --- Filter for unique addresses ---
    unique_results = []
    seen_addresses = set() # Keep track of addresses we've already added

    for dist, unique_addr, display_addr in all_distances:
        # Convert to string just in case there are non-string types, though unlikely after cleaning
        unique_addr_str = str(unique_addr) 
        if unique_addr_str not in seen_addresses:
            unique_results.append((dist, display_addr)) # Add the display address to results
            seen_addresses.add(unique_addr_str)         # Mark this unique address as seen
            if len(unique_results) >= num_closest:      # Stop if we have enough unique results
                break

    return unique_results



In [5]:
# --- Configuration: File Paths ---
INPUT_FILE = 'CAFB_Shopping_Partners_Data.csv'  # CHANGE THIS to your input Excel or CSV file name
GECODED_FILE = 'geocoded_addresses.csv' # Output file name

preprocess_addresses(INPUT_FILE, GECODED_FILE)

Starting geocoding for 924 addresses...

Processing 1/924: gloria ward-ravenell 2263 mount view place, se washington DC 20020
  -> Attempting to geocode: 2263 mount view place, se washington DC 20020


  -> Geocoder returned None for: 2263 mount view place, se washington DC 20020

Processing 2/924: Attn: St. Martin's Social Service 1908 North Capitol Street NW Washington DC 20002
  -> Attempting to geocode: 1908 North Capitol Street NW Washington DC 20002

Processing 3/924: Attn: Bethesda Cares 5033 Wilson Lane Bethesda MD 20814
  -> Attempting to geocode: 5033 Wilson Lane Bethesda MD 20814

Processing 4/924: Attn: Bethesda Cares 5033 Wilson Lane Bethesda MD 20814
  -> Attempting to geocode: 5033 Wilson Lane Bethesda MD 20814

Processing 5/924: Attn: Bethesda Cares 5033 Wilson Lane Bethesda MD 20814
  -> Attempting to geocode: 5033 Wilson Lane Bethesda MD 20814

Processing 6/924: Attn: Bethesda Cares 5033 Wilson Lane Bethesda MD 20814
  -> Attempting to geocode: 5033 Wilson Lane Bethesda MD 20814

Processing 7/924: Attn: Bethesda Cares 5033 Wilson Lane Bethesda MD 20814
  -> Attempting to geocode: 5033 Wilson Lane Bethesda MD 20814

Processing 8/924: Attn: Brookland Senior Day Care C

In [None]:
# --- Step 2: Get User Input and Find Closest ---
if os.path.exists(GECODED_FILE): # Only proceed if preprocessing was successful or file exists
    user_input = input("\nEnter an address to find nearby locations: ")
    if user_input:
        closest_addresses = find_closest(user_input, GECODED_FILE, num_closest=10)

        if closest_addresses:
            print("\nTop 10 Closest Addresses:")
            for i, (dist, addr) in enumerate(closest_addresses):
                print(f"{i+1}. {addr} ({dist:.2f} km away)")
        else:
            print("Could not find closest addresses.")
    else:
        print("No address entered.")
else:
    print(f"\nCannot proceed: Geocoded file '{GECODED_FILE}' not found.")
    print("Please ensure the preprocessing step ran successfully and created the file.")


Geocoding user address: 5033 Wilson Lane Bethesda MD 20814
  -> Cleaned user address: 5033 Wilson Lane Bethesda MD 20814
User coordinates: (38.9896048, -77.1229785)

Top 10 Closest Addresses:
1. 5033 Wilson Lane Bethesda MD 20814 (0.00 km away)
2. 1908 North Capitol Street NW Washington DC 20002 (12.80 km away)
