In [1]:
!pip install --quiet geopy folium requests googlemaps

In [2]:
!pip install --quiet pandas numpy

In [3]:
!pip install --quiet googlemaps

In [27]:
import json
import requests
import googlemaps
import re
import time
import pandas as pd
from typing import List, Tuple, Dict, Any

# Geopy is a Python toolbox for geocoding services. 
# Geodesic calculates the distance between two points using their coordinates
from geopy.distance import geodesic
# Nominatim is a geocoder class for converting addresses into coordinates and vice versa
from geopy.geocoders import Nominatim

### Detect Address/Crossroad Using Data from OpenStreetMap

-OSM is a raw, open, editable geographical database. It's built by mappers, GIS professionals, engineers running the OSM servers.

-Bing Maps and Apple Maps rely on OSM for part of their data. 

In [5]:
# Define the function to get address using coordinates
def get_address(lat, lon):
    try:
        geolocator = Nominatim(user_agent="simple_app")
        location = geolocator.reverse(f"{lat}, {lon}")
        return location.address if location else "Address not found"
    except Exception as e:
        return f"Error: {e}"

In [6]:
# Read in coordinates and Convert 
lat = 42.31467523826113
lon = -83.66011171923385

address = get_address(lat, lon)
print(address)

3341, Beaumont Avenue, Dixboro, Superior Charter Township, Washtenaw County, Michigan, 48105, United States


In [7]:
# Define the function to get coordinates using address
def get_coordinates(address):
    try:
        geolocator = Nominatim(user_agent="simple_app")
        location = geolocator.geocode(address)
        if location:
            return location.latitude, location.longitude
        else:
            return None, None
    except Exception as e:
        return None, None

In [8]:
# Read in address and Convert 
address = get_coordinates("426 Thompson St, Ann Arbor, MI 48104")
print(address)

(42.2767469, -83.7437309)


### Detect the closest crossroad (Input: coordinates)

In [19]:
def are_same_road(name1, name2):
    """
    Determine if two road names likely refer to the same physical road
    """
    # Normalize the names - remove common suffixes and convert to lowercase
    def normalize_name(name):
        name = name.lower().strip()
        # Remove common road suffixes
        suffixes = ['street', 'st', 'road', 'rd', 'avenue', 'ave', 'boulevard', 'blvd', 
                   'drive', 'dr', 'lane', 'ln', 'way', 'circle', 'cir', 'court', 'ct',  
                    'highway', 'hwy', 'parkway', 'pkwy', 'freeway', 'fwy', 'turnpike',
                   'plaza', 'plz', 'place', 'pl', 'terrace', 'ter', 'trail', 'trl', 'path', 
                    'walk', 'row', 'square', 'sq', 'crescent', 'cres', 'close', 'grove', 'park',
            'gardens', 'gdns', 'green', 'common', 'cmn', 'mews', 'gate', 'gates']
        for suffix in suffixes:
            # Remove suffix if it's at the end (with optional 's' and word boundary)
            pattern = r'\b' + suffix + r's?\b$'
            name = re.sub(pattern, '', name).strip()
        return name
    
    # Normalize both names
    norm1 = normalize_name(name1)
    norm2 = normalize_name(name2)
    
    # Check if they're the same after normalization
    if norm1 == norm2:
        return True
    
    # Check for common variations (e.g., "east main" vs "e main")
    direction_map = {
        'north': 'n', 'south': 's', 'east': 'e', 'west': 'w',
        'northeast': 'ne', 'northwest': 'nw', 'southeast': 'se', 'southwest': 'sw'
    }
    
    # Replace full direction words with abbreviations
    for full, abbrev in direction_map.items():
        pattern1 = r'\b' + full + r'\b'
        norm1 = re.sub(pattern1, abbrev, norm1)
        norm2 = re.sub(pattern1, abbrev, norm2)
    
    return norm1 == norm2

def find_closest_main_road_and_crossroad(lat, lon, radius=500):
    """Find both the closest main road and closest main road crossroad, excluding residential roads"""
    overpass_url = "https://overpass-api.de/api/interpreter"
    
    # Define main road types (excluding residential and minor roads)
    main_road_types = [
        'motorway', 'trunk', 'primary', 'secondary', 'tertiary',
        'motorway_link', 'trunk_link', 'primary_link', 'secondary_link', 'tertiary_link'
    ]
    
    query = f"""
    [out:json][timeout:25];
    (
      way(around:{radius},{lat},{lon})[highway~"^(motorway|trunk|primary|secondary|tertiary|motorway_link|trunk_link|primary_link|secondary_link|tertiary_link)$"][name];
      way(around:{radius},{lat},{lon})[highway=residential][name];
    );
    (._;>;);
    out body;
    """
    
    try:
        response = requests.get(overpass_url, params={'data': query}, timeout=30)
        if response.status_code != 200:
            return None, f"API error: {response.status_code}"
        
        data = response.json()
        elements = data.get('elements', [])
        
        if not elements:
            return False, "No main roads found nearby"
        
        # Separate nodes and ways, and identify roads with residential segments
        nodes = {}
        roads = []
        residential_road_names = set()
        
        for element in elements:
            if element['type'] == 'node':
                nodes[element['id']] = (element['lat'], element['lon'])
            elif element['type'] == 'way':
                tags = element.get('tags', {})
                highway_type = tags.get('highway', '')
                road_name = tags.get('name', '')
                
                # Track roads that have residential segments
                if highway_type == 'residential' and road_name:
                    residential_road_names.add(road_name)
                
                # Collect main road segments
                if ('name' in tags and highway_type in main_road_types and 'nodes' in element):
                    roads.append({
                        'id': element['id'],
                        'name': road_name,
                        'type': highway_type,
                        'nodes': element['nodes']
                    })
        
        # Filter out any roads that have residential segments
        filtered_roads = [road for road in roads if road['name'] not in residential_road_names]
        
        if not filtered_roads:
            return False, "No purely main roads found nearby (excluding roads with residential segments)"
        
        # Find closest single road node to given lat/lon
        closest_road = None
        min_distance = float('inf')
        closest_coord = None
        
        for road in filtered_roads:
            for node_id in road['nodes']:
                if node_id in nodes:
                    node_coord = nodes[node_id]
                    dist = geodesic((lat, lon), node_coord).meters
                    if dist < min_distance:
                        min_distance = dist
                        closest_road = road
                        closest_coord = node_coord
        
        closest_road_result = {
            'name': closest_road['name'] if closest_road else 'Unknown',
            'type': closest_road['type'] if closest_road else 'unknown',
            'distance': round(min_distance, 1)
        }
        
        # Build a map from node_id to the set of road names that contain that node
        # Only consider filtered main roads for intersections
        node_road_map = {}
        for road in filtered_roads:
            for node_id in road['nodes']:
                node_road_map.setdefault(node_id, set()).add(road['name'])
        
        # Find nodes shared by 2 or more different main roads → main road intersections
        # But exclude cases where road names are variations of the same road
        intersections = []
        for nid, road_names in node_road_map.items():
            if len(road_names) >= 2 and nid in nodes:
                # Filter out roads that are likely the same physical road with different names
                unique_roads = []
                road_names_list = list(road_names)
                
                for i, road_name in enumerate(road_names_list):
                    is_duplicate = False
                    for j, existing_road in enumerate(unique_roads):
                        if are_same_road(road_name, existing_road):
                            is_duplicate = True
                            break
                    
                    if not is_duplicate:
                        unique_roads.append(road_name)
                
                # Only create intersection if we have 2+ truly different roads
                if len(unique_roads) >= 2:
                    node_coord = nodes[nid]
                    distance_to_search = geodesic((lat, lon), node_coord).meters
                    
                    # Collect road info for the unique roads only
                    intersection_roads = []
                    for unique_road_name in unique_roads:
                        # Find a representative road segment for this name
                        for road in filtered_roads:
                            if road['name'] == unique_road_name and nid in road['nodes']:
                                intersection_roads.append({
                                    'name': road['name'],
                                    'type': road['type']
                                })
                                break  # Take first match for each unique road name
                    
                    if len(intersection_roads) >= 2:  # Double-check we have multiple roads
                        intersections.append({
                            'lat': node_coord[0],
                            'lon': node_coord[1],
                            'roads': intersection_roads,
                            'distance': distance_to_search
                        })
        
        # Remove duplicates by road name combinations and find closest intersection
        seen_combinations = set()
        unique_intersections = []
        for intersection in intersections:
            # Create a combination key based on normalized road names
            road_names = [road['name'] for road in intersection['roads']]
            normalized_names = [road_name.lower().strip() for road_name in road_names]
            combo = tuple(sorted(normalized_names))
            if combo not in seen_combinations:
                seen_combinations.add(combo)
                unique_intersections.append(intersection)
        
        closest_crossroad_result = None
        if unique_intersections:
            closest_intersection = min(unique_intersections, key=lambda x: x['distance'])
            closest_crossroad_result = {
                'roads': closest_intersection['roads'],
                'distance': round(closest_intersection['distance'], 1)
            }
        
        return True, {
            'closest_main_road': closest_road_result,
            'closest_main_crossroad': closest_crossroad_result
        }
            
    except Exception as e:
        return None, f"Error: {str(e)}"

In [20]:
find_closest_main_road_and_crossroad(42.27695287345026, -83.74358634252938)

(True,
 {'closest_main_road': {'name': 'South Division Street',
   'type': 'secondary',
   'distance': 79.6},
  'closest_main_crossroad': {'roads': [{'name': 'East William Street',
     'type': 'tertiary'},
    {'name': 'South Division Street', 'type': 'secondary'}],
   'distance': 105.5}})

###

###

### Batch Processing

In [33]:
def are_same_road(name1, name2):
    """
    Determine if two road names likely refer to the same physical road
    """
    # Normalize the names - remove common suffixes and convert to lowercase
    def normalize_name(name):
        name = name.lower().strip()
        # Remove common road suffixes
        suffixes = ['street', 'st', 'road', 'rd', 'avenue', 'ave', 'boulevard', 'blvd', 
                   'drive', 'dr', 'lane', 'ln', 'way', 'circle', 'cir', 'court', 'ct',  
                    'highway', 'hwy', 'parkway', 'pkwy', 'freeway', 'fwy', 'turnpike',
                   'plaza', 'plz', 'place', 'pl', 'terrace', 'ter', 'trail', 'trl', 'path', 
                    'walk', 'row', 'square', 'sq', 'crescent', 'cres', 'close', 'grove', 'park',
            'gardens', 'gdns', 'green', 'common', 'cmn', 'mews', 'gate', 'gates']
        for suffix in suffixes:
            # Remove suffix if it's at the end (with optional 's' and word boundary)
            pattern = r'\b' + suffix + r's?\b$'
            name = re.sub(pattern, '', name).strip()
        return name
    
    # Normalize both names
    norm1 = normalize_name(name1)
    norm2 = normalize_name(name2)
    
    # Check if they're the same after normalization
    if norm1 == norm2:
        return True
    
    # Check for common variations (e.g., "east main" vs "e main")
    direction_map = {
        'north': 'n', 'south': 's', 'east': 'e', 'west': 'w',
        'northeast': 'ne', 'northwest': 'nw', 'southeast': 'se', 'southwest': 'sw'
    }
    
    # Replace full direction words with abbreviations
    for full, abbrev in direction_map.items():
        pattern1 = r'\b' + full + r'\b'
        norm1 = re.sub(pattern1, abbrev, norm1)
        norm2 = re.sub(pattern1, abbrev, norm2)
    
    return norm1 == norm2

def find_closest_main_road_and_crossroad(lat, lon, radius=500):
    """Find both the closest main road and closest main road crossroad, excluding residential roads"""
    overpass_url = "https://overpass-api.de/api/interpreter"
    
    # Define main road types (excluding residential and minor roads)
    main_road_types = [
        'motorway', 'trunk', 'primary', 'secondary', 'tertiary',
        'motorway_link', 'trunk_link', 'primary_link', 'secondary_link', 'tertiary_link'
    ]
    
    query = f"""
    [out:json][timeout:25];
    (
      way(around:{radius},{lat},{lon})[highway~"^(motorway|trunk|primary|secondary|tertiary|motorway_link|trunk_link|primary_link|secondary_link|tertiary_link)$"][name];
      way(around:{radius},{lat},{lon})[highway=residential][name];
    );
    (._;>;);
    out body;
    """
    
    try:
        response = requests.get(overpass_url, params={'data': query}, timeout=30)
        if response.status_code != 200:
            return None, f"API error: {response.status_code}"
        
        data = response.json()
        elements = data.get('elements', [])
        
        if not elements:
            return False, "No main roads found nearby"
        
        # Separate nodes and ways, and identify roads with residential segments
        nodes = {}
        roads = []
        residential_road_names = set()
        
        for element in elements:
            if element['type'] == 'node':
                nodes[element['id']] = (element['lat'], element['lon'])
            elif element['type'] == 'way':
                tags = element.get('tags', {})
                highway_type = tags.get('highway', '')
                road_name = tags.get('name', '')
                
                # Track roads that have residential segments
                if highway_type == 'residential' and road_name:
                    residential_road_names.add(road_name)
                
                # Collect main road segments
                if ('name' in tags and highway_type in main_road_types and 'nodes' in element):
                    roads.append({
                        'id': element['id'],
                        'name': road_name,
                        'type': highway_type,
                        'nodes': element['nodes']
                    })
        
        # Filter out any roads that have residential segments
        filtered_roads = [road for road in roads if road['name'] not in residential_road_names]
        
        if not filtered_roads:
            return False, "No purely main roads found nearby (excluding roads with residential segments)"
        
        # Find closest single road node to given lat/lon
        closest_road = None
        min_distance = float('inf')
        closest_coord = None
        
        for road in filtered_roads:
            for node_id in road['nodes']:
                if node_id in nodes:
                    node_coord = nodes[node_id]
                    dist = geodesic((lat, lon), node_coord).meters
                    if dist < min_distance:
                        min_distance = dist
                        closest_road = road
                        closest_coord = node_coord
        
        closest_road_result = {
            'name': closest_road['name'] if closest_road else 'Unknown',
            'type': closest_road['type'] if closest_road else 'unknown',
            'distance': round(min_distance, 1)
        }
        
        # Build a map from node_id to the set of road names that contain that node
        # Only consider filtered main roads for intersections
        node_road_map = {}
        for road in filtered_roads:
            for node_id in road['nodes']:
                node_road_map.setdefault(node_id, set()).add(road['name'])
        
        # Find nodes shared by 2 or more different main roads → main road intersections
        # But exclude cases where road names are variations of the same road
        intersections = []
        for nid, road_names in node_road_map.items():
            if len(road_names) >= 2 and nid in nodes:
                # Filter out roads that are likely the same physical road with different names
                unique_roads = []
                road_names_list = list(road_names)
                
                for i, road_name in enumerate(road_names_list):
                    is_duplicate = False
                    for j, existing_road in enumerate(unique_roads):
                        if are_same_road(road_name, existing_road):
                            is_duplicate = True
                            break
                    
                    if not is_duplicate:
                        unique_roads.append(road_name)
                
                # Only create intersection if we have 2+ truly different roads
                if len(unique_roads) >= 2:
                    node_coord = nodes[nid]
                    distance_to_search = geodesic((lat, lon), node_coord).meters
                    
                    # Collect road info for the unique roads only
                    intersection_roads = []
                    for unique_road_name in unique_roads:
                        # Find a representative road segment for this name
                        for road in filtered_roads:
                            if road['name'] == unique_road_name and nid in road['nodes']:
                                intersection_roads.append({
                                    'name': road['name'],
                                    'type': road['type']
                                })
                                break  # Take first match for each unique road name
                    
                    if len(intersection_roads) >= 2:  # Double-check we have multiple roads
                        intersections.append({
                            'lat': node_coord[0],
                            'lon': node_coord[1],
                            'roads': intersection_roads,
                            'distance': distance_to_search
                        })
        
        # Remove duplicates by road name combinations and find closest intersection
        seen_combinations = set()
        unique_intersections = []
        for intersection in intersections:
            # Create a combination key based on normalized road names
            road_names = [road['name'] for road in intersection['roads']]
            normalized_names = [road_name.lower().strip() for road_name in road_names]
            combo = tuple(sorted(normalized_names))
            if combo not in seen_combinations:
                seen_combinations.add(combo)
                unique_intersections.append(intersection)
        
        closest_crossroad_result = None
        if unique_intersections:
            closest_intersection = min(unique_intersections, key=lambda x: x['distance'])
            closest_crossroad_result = {
                'roads': closest_intersection['roads'],
                'distance': round(closest_intersection['distance'], 1)
            }
        
        return True, {
            'closest_main_road': closest_road_result,
            'closest_main_crossroad': closest_crossroad_result
        }
            
    except Exception as e:
        return None, f"Error: {str(e)}"

def process_coordinates_batch(coordinates_list: List[Tuple[float, float]], 
                            radius: int = 500,
                            delay_seconds: float = 0.5,
                            max_retries: int = 3,
                            verbose: bool = True) -> List[Dict[str, Any]]:
    """
    Process a batch of coordinates and return results for each
    
    Parameters:
    - coordinates_list: List of (lat, lon) tuples
    - radius: Search radius in meters
    - delay_seconds: Delay between requests to be nice to the API
    - max_retries: Number of retry attempts for failed requests
    - verbose: Whether to print progress messages
    
    Returns:
    - List of results, one for each coordinate pair
    """
    results = []
    
    if verbose:
        print(f"Processing {len(coordinates_list)} coordinates...")
    
    for i, (lat, lon) in enumerate(coordinates_list):
        if verbose:
            print(f"  {i+1}/{len(coordinates_list)}: ({lat}, {lon})")
        
        # Retry logic for failed requests
        for attempt in range(max_retries):
            try:
                success, result = find_closest_main_road_and_crossroad(lat, lon, radius)
                
                # Create standardized result format
                result_entry = {
                    'input_lat': lat,
                    'input_lon': lon,
                    'success': success,
                    'closest_main_road_name': None,
                    'closest_main_road_type': None,
                    'closest_main_road_distance': None,
                    'closest_crossroad_roads': None,
                    'closest_crossroad_distance': None,
                    'error_message': None
                }
                
                if success:
                    # Extract road info
                    road = result['closest_main_road']
                    result_entry.update({
                        'closest_main_road_name': road['name'],
                        'closest_main_road_type': road['type'],
                        'closest_main_road_distance': road['distance']
                    })
                    
                    # Extract crossroad info
                    crossroad = result['closest_main_crossroad']
                    if crossroad:
                        road_names = [r['name'] for r in crossroad['roads']]
                        result_entry.update({
                            'closest_crossroad_roads': ' & '.join(road_names),
                            'closest_crossroad_distance': crossroad['distance']
                        })
                else:
                    result_entry['error_message'] = result
                
                results.append(result_entry)
                break  # Success, no need to retry
                
            except Exception as e:
                if attempt == max_retries - 1:  # Last attempt
                    result_entry = {
                        'input_lat': lat,
                        'input_lon': lon,
                        'success': False,
                        'closest_main_road_name': None,
                        'closest_main_road_type': None,
                        'closest_main_road_distance': None,
                        'closest_crossroad_roads': None,
                        'closest_crossroad_distance': None,
                        'error_message': f"Exception after {max_retries} attempts: {str(e)}"
                    }
                    results.append(result_entry)
                else:
                    if verbose:
                        print(f"    Attempt {attempt + 1} failed, retrying...")
                    time.sleep(delay_seconds * 2)  # Longer delay on retry
        
        # Delay between requests to be respectful to the API
        if i < len(coordinates_list) - 1:  # Don't delay after the last request
            time.sleep(delay_seconds)
    
    if verbose:
        print("Processing complete!")
    
    return results

def print_batch_results(results: List[Dict[str, Any]], format_style: str = 'table'):
    """
    Print batch results in a clean, readable format
    
    Parameters:
    - results: List of result dictionaries from process_coordinates_batch
    - format_style: 'table', 'summary', or 'detailed'
    """
    if format_style == 'table':
        print("\n" + "="*80)
        print("BATCH PROCESSING RESULTS")
        print("="*80)
        print(f"{'#':<3} {'Coordinates':<20} {'Closest Road':<25} {'Intersection':<30}")
        print("-"*80)
        
        for i, result in enumerate(results, 1):
            if result['success']:
                coords = f"({result['input_lat']}, {result['input_lon']})"
                road = f"{result['closest_main_road_name']} ({result['closest_main_road_distance']}m)"
                intersection = result['closest_crossroad_roads'] or "None found"
                if result['closest_crossroad_distance']:
                    intersection += f" ({result['closest_crossroad_distance']}m)"
                
                print(f"{i:<3} {coords:<20} {road:<25} {intersection:<30}")
            else:
                coords = f"({result['input_lat']}, {result['input_lon']})"
                error = result['error_message'][:50] + "..." if len(str(result['error_message'])) > 50 else result['error_message']
                print(f"{i:<3} {coords:<20} {'ERROR':<25} {error:<30}")
        
        print("="*80)
    
    elif format_style == 'summary':
        successful = sum(1 for r in results if r['success'])
        failed = len(results) - successful
        
        print(f"\n=== BATCH SUMMARY ===")
        print(f"Total processed: {len(results)}")
        print(f"Successful: {successful}")
        print(f"Failed: {failed}")
        
        if successful > 0:
            print(f"\nSuccessful results:")
            for i, result in enumerate(results, 1):
                if result['success']:
                    road = result['closest_main_road_name']
                    distance = result['closest_main_road_distance']
                    intersection = result['closest_crossroad_roads'] or "No intersection"
                    print(f"  {i}. {road} ({distance}m) | {intersection}")
    
    elif format_style == 'detailed':
        print(f"\n=== DETAILED RESULTS ===")
        for i, result in enumerate(results, 1):
            print(f"\nLocation {i}: ({result['input_lat']}, {result['input_lon']})")
            if result['success']:
                print(f"  ✓ Closest road: {result['closest_main_road_name']} ({result['closest_main_road_type']}) - {result['closest_main_road_distance']}m")
                if result['closest_crossroad_roads']:
                    print(f"  ✓ Intersection: {result['closest_crossroad_roads']} - {result['closest_crossroad_distance']}m")
                else:
                    print(f"  ✗ No intersections found")
            else:
                print(f"  ✗ Error: {result['error_message']}")

def batch_to_dataframe(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Convert batch results to a pandas DataFrame for easy analysis
    """
    return pd.DataFrame(results)

def save_batch_results(results: List[Dict[str, Any]], filename: str):
    """
    Save batch results to CSV file
    """
    df = batch_to_dataframe(results)
    df.to_csv(filename, index=False)
    print(f"Results saved to {filename}")

In [35]:
# Load coordinates 
coordinates = [
    (42.27668533420888, -83.74360334545510),  # ISR
    (42.31468039565751, -83.66006151263954),  # Sunghee's
    (42.24625341661863, -83.74240414026053),  # Yao's
]

results = process_coordinates_batch(coordinates, radius=500, delay_seconds=1.0)
print_batch_results(results, format_style='table')
save_batch_results(results, 'road_analysis_results.csv')

Processing 3 coordinates...
  1/3: (42.27668533420888, -83.7436033454551)
  2/3: (42.31468039565751, -83.66006151263954)
  3/3: (42.24625341661863, -83.74240414026053)
Processing complete!

BATCH PROCESSING RESULTS
#   Coordinates          Closest Road              Intersection                  
--------------------------------------------------------------------------------
1   (42.27668533420888, -83.7436033454551) South Division Street (69.1m) East William Street & South Division Street (132.5m)
2   (42.31468039565751, -83.66006151263954) North Dixboro Road (108.2m) Plymouth Road & North Dixboro Road (324.8m)
3   (42.24625341661863, -83.74240414026053) South State Street (255.0m) South State Street & East Eisenhower Parkway (358.5m)
Results saved to road_analysis_results.csv


###

###

### Detect the closest crossroad (Input: address)

In [25]:
def find_closest_main_road_and_crossroad(address, radius=500):
    """
    Find closest main roads and crossroads by address
    
    Parameters:
    - address: String address (e.g., "123 Main St, Ann Arbor, MI")
    - radius: Search radius in meters around the address
    
    Returns:
    - (success, result) tuple
    """
    try:
        # Step 1: Convert address to coordinates
        geolocator = Nominatim(user_agent="main_roads_finder", timeout=10)
        location = geolocator.geocode(address)
        
        if not location:
            return False, f"Address '{address}' not found"
        
        lat, lon = location.latitude, location.longitude
        
        # Step 2: Find main roads (rest of your existing logic)
        overpass_url = "https://overpass-api.de/api/interpreter"
        
        main_road_types = [
            'motorway', 'trunk', 'primary', 'secondary', 'tertiary',
            'motorway_link', 'trunk_link', 'primary_link', 'secondary_link', 'tertiary_link'
        ]
        
        query = f"""
        [out:json][timeout:25];
        (
          way(around:{radius},{lat},{lon})[highway~"^(motorway|trunk|primary|secondary|tertiary|motorway_link|trunk_link|primary_link|secondary_link|tertiary_link)$"][name];
          way(around:{radius},{lat},{lon})[highway=residential][name];
        );
        (._;>;);
        out body;
        """
        
        response = requests.get(overpass_url, params={'data': query}, timeout=30)
        if response.status_code != 200:
            return None, f"API error: {response.status_code}"
        
        data = response.json()
        elements = data.get('elements', [])
        
        if not elements:
            return False, "No main roads found nearby"
        
        # Process data (same as your existing function)
        nodes = {}
        roads = []
        residential_road_names = set()
        
        for element in elements:
            if element['type'] == 'node':
                nodes[element['id']] = (element['lat'], element['lon'])
            elif element['type'] == 'way':
                tags = element.get('tags', {})
                highway_type = tags.get('highway', '')
                road_name = tags.get('name', '')
                
                if highway_type == 'residential' and road_name:
                    residential_road_names.add(road_name)
                
                if ('name' in tags and highway_type in main_road_types and 'nodes' in element):
                    roads.append({
                        'id': element['id'],
                        'name': road_name,
                        'type': highway_type,
                        'nodes': element['nodes']
                    })
        
        filtered_roads = [road for road in roads if road['name'] not in residential_road_names]
        
        if not filtered_roads:
            return False, "No purely main roads found nearby"
        
        # Find closest road
        closest_road = None
        min_distance = float('inf')
        
        for road in filtered_roads:
            for node_id in road['nodes']:
                if node_id in nodes:
                    node_coord = nodes[node_id]
                    dist = geodesic((lat, lon), node_coord).meters
                    if dist < min_distance:
                        min_distance = dist
                        closest_road = road
        
        closest_road_result = {
            'name': closest_road['name'] if closest_road else 'Unknown',
            'type': closest_road['type'] if closest_road else 'unknown',
            'distance': round(min_distance, 1)
        }
        
        # Find intersections with duplicate removal
        node_road_map = {}
        for road in filtered_roads:
            for node_id in road['nodes']:
                node_road_map.setdefault(node_id, set()).add(road['name'])
        
        intersections = []
        for nid, road_names in node_road_map.items():
            if len(road_names) >= 2 and nid in nodes:
                node_coord = nodes[nid]
                distance_to_search = geodesic((lat, lon), node_coord).meters
                
                # Collect unique roads at this intersection
                road_dict = {}  # Use dict to avoid duplicates
                for road in filtered_roads:
                    if nid in road['nodes'] and road['name'] in road_names:
                        # Keep only one entry per road name
                        if road['name'] not in road_dict:
                            road_dict[road['name']] = {
                                'name': road['name'],
                                'type': road['type']
                            }
                
                intersection_roads = list(road_dict.values())
                
                if len(intersection_roads) >= 2:
                    intersections.append({
                        'roads': intersection_roads,
                        'distance': distance_to_search
                    })
        
        closest_crossroad_result = None
        if intersections:
            closest_intersection = min(intersections, key=lambda x: x['distance'])
            closest_crossroad_result = {
                'roads': closest_intersection['roads'],
                'distance': round(closest_intersection['distance'], 1)
            }
        
        return True, {
            'closest_main_road': closest_road_result,
            'closest_main_crossroad': closest_crossroad_result
        }
            
    except Exception as e:
        return None, f"Error: {str(e)}"

In [26]:
find_closest_main_road_and_crossroad("426 Thompson St, Ann Arbor, MI 48104")

(True,
 {'closest_main_road': {'name': 'South Division Street',
   'type': 'secondary',
   'distance': 67.0},
  'closest_main_crossroad': {'roads': [{'name': 'South Division Street',
     'type': 'secondary'},
    {'name': 'East William Street', 'type': 'tertiary'}],
   'distance': 122.7}})