In [1]:
# Cell 1: Core imports
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import Point, Polygon, MultiPolygon
from shapely.ops import unary_union
import osmnx as ox
import folium
import networkx as nx
from geopy.geocoders import Nominatim  # This was missing!
from typing import List, Dict, Union, Tuple, Optional
import warnings
import scipy
warnings.filterwarnings('ignore')

# Configure OSMnx
ox.settings.use_cache = True
ox.settings.log_console = True
ox.settings.requests_timeout = 300

# Display settings
pd.set_option('display.max_columns', None)


In [10]:
# Complete LocationAnalyzer class with ALL methods included

class LocationAnalyzer:
    """
    A tool for identifying locations that meet multiple spatial and demographic criteria.
    """
    TRAVEL_SPEEDS = {
        'walk': 3.0,    # Typical walking speed
        'bike': 12.0,   # Average cycling speed
        'drive': 30.0   # Average urban driving speed
    }

    # Buffer adjustments to account for non-straight-line travel
    BUFFER_ADJUSTMENTS = {
        'walk': 1.2,    # Walking paths deviate ~20% from straight line
        'bike': 1.3,    # Cycling paths deviate ~30%
        'drive': 1.5    # Roads deviate ~50%
    }

    def __init__(self, center_location: str, max_radius_miles: float = 50):
        """Initialize with a center point and maximum search radius."""
        print(f"Initializing analyzer centered on: {center_location}")
        
        # Geocode using OSMnx
        try:
            location_gdf = ox.geocode_to_gdf(center_location)
            self.center_point = location_gdf.geometry.iloc[0].centroid
            lat = self.center_point.y
            lon = self.center_point.x
        except:
            try:
                lat, lon = ox.geocode(center_location)
                self.center_point = Point(lon, lat)
            except:
                raise ValueError(f"Could not geocode location: {center_location}")
        
        self.center_location = center_location
        self.max_radius_miles = max_radius_miles
        self.crs = 'EPSG:4326'
        
        # Create initial search boundary
        utm_crs = self._estimate_utm_crs(lon, lat)
        center_gdf = gpd.GeoDataFrame([{'geometry': self.center_point}], crs=self.crs)
        center_projected = center_gdf.to_crs(utm_crs)
        
        # Create buffer in meters
        buffer_meters = max_radius_miles * 1609.34
        search_boundary = center_projected.buffer(buffer_meters)
        
        # Convert back to WGS84
        self.search_boundary = search_boundary.to_crs(self.crs).iloc[0]
        self.current_search_area = self.search_boundary
        
        # Storage
        self.criteria_results = []
        self.criteria_descriptions = []
        
        print(f"Search area initialized: {max_radius_miles} mile radius")
        print(f"Center coordinates: {lat:.4f}, {lon:.4f}")

    def _estimate_utm_crs(self, lon: float, lat: float) -> str:
        """Estimate appropriate UTM CRS for given coordinates."""
        utm_zone = int((lon + 180) / 6) + 1
        if lat >= 0:
            return f'EPSG:326{utm_zone:02d}'
        else:
            return f'EPSG:327{utm_zone:02d}'

    def _calculate_area_sq_miles(self, geometry) -> float:
        """Calculate area in square miles."""
        utm_crs = self._estimate_utm_crs(self.center_point.x, self.center_point.y)
        gdf = gpd.GeoDataFrame([{'geometry': geometry}], crs=self.crs)
        gdf_projected = gdf.to_crs(utm_crs)
        area_sq_m = gdf_projected.geometry.area.iloc[0]
        return area_sq_m / 2589988.11

    def add_simple_buffer_criterion(self, 
                                poi_type: Dict = None,
                                specific_location: str = None,
                                max_distance_miles: float = 1.0,
                                criterion_name: str = None) -> gpd.GeoDataFrame:
        """
        Add a simple distance buffer criterion (no travel mode consideration).
        Fast but less accurate - good for initial filtering.
        """
        print(f"\nProcessing {criterion_name} (simple buffer: {max_distance_miles} miles)...")
        
        # Get POIs
        pois = self._get_pois(poi_type, specific_location)
        if pois is None or len(pois) == 0:
            return None
        
        # Create simple buffers
        utm_crs = self._estimate_utm_crs(self.center_point.x, self.center_point.y)
        pois_projected = pois.to_crs(utm_crs)
        buffer_meters = max_distance_miles * 1609.34
        buffers = pois_projected.geometry.buffer(buffer_meters)
        
        # Combine and intersect
        combined_buffer = buffers.unary_union
        result_gdf = gpd.GeoDataFrame([{'geometry': combined_buffer}], crs=utm_crs).to_crs(self.crs)
        result_geometry = result_gdf.geometry.iloc[0].intersection(self.current_search_area)
        
        # Update state
        self.current_search_area = result_geometry
        self.criteria_results.append({
            'name': criterion_name,
            'geometry': result_geometry,
            'description': f"Within {max_distance_miles} miles (straight-line)"
        })
        
        area_sq_miles = self._calculate_area_sq_miles(result_geometry)
        print(f"✓ Applied. New search area: {area_sq_miles:.1f} sq miles")
        
        return gpd.GeoDataFrame([{'geometry': result_geometry}], crs=self.crs)

    def add_travel_time_criterion(self,
                                poi_type: Dict = None,
                                specific_location: str = None,
                                max_time_minutes: int = 10,
                                travel_mode: str = 'walk',
                                criterion_name: str = None,
                                use_network: bool = None) -> gpd.GeoDataFrame:
        """
        Add travel time criterion with proper mode-specific calculations.
        
        Parameters:
        -----------
        poi_type : dict
            OSM tags for POI type
        specific_location : str
            Specific location name/address
        max_time_minutes : int
            Maximum travel time in minutes
        travel_mode : str
            'walk', 'bike', or 'drive'
        use_network : bool
            Force network analysis (True) or simple buffer (False). 
            If None, automatically decided based on area size.
        """
        print(f"\nProcessing {criterion_name} ({travel_mode}: {max_time_minutes} min)...")
        
        # Calculate distance based on travel mode and time
        speed_mph = self.TRAVEL_SPEEDS[travel_mode]
        max_distance_miles = (max_time_minutes / 60) * speed_mph
        
        print(f"  Mode: {travel_mode} ({speed_mph} mph)")
        print(f"  Max distance: {max_distance_miles:.2f} miles in {max_time_minutes} minutes")
        
        # Decide method
        area_sq_miles = self._calculate_area_sq_miles(self.current_search_area)
        
        if use_network is None:
            use_network = area_sq_miles < 50  # Auto-decide based on area
        
        if use_network and area_sq_miles > 100:
            print("  ⚠️  Area too large for network analysis. Using simple method.")
            use_network = False
        
        if use_network:
            return self._network_travel_analysis(
                poi_type, specific_location, max_time_minutes, 
                travel_mode, criterion_name
            )
        else:
            return self._simple_travel_buffer(
                poi_type, specific_location, max_distance_miles,
                travel_mode, criterion_name
            )

    def _simple_travel_buffer(self, poi_type, specific_location, 
                            max_distance_miles, travel_mode, criterion_name):
        """Simple buffer adjusted for travel mode."""
        # Get POIs
        pois = self._get_pois(poi_type, specific_location)
        if pois is None or len(pois) == 0:
            return None
        
        # Adjust distance for realistic travel patterns
        adjustment = self.BUFFER_ADJUSTMENTS[travel_mode]
        adjusted_distance = max_distance_miles / adjustment
        
        print(f"  Using adjusted buffer: {adjusted_distance:.2f} miles (factor: {adjustment})")
        
        # Create buffers
        utm_crs = self._estimate_utm_crs(self.center_point.x, self.center_point.y)
        pois_projected = pois.to_crs(utm_crs)
        buffer_meters = adjusted_distance * 1609.34
        buffers = pois_projected.geometry.buffer(buffer_meters)
        
        # Combine and intersect
        combined_buffer = buffers.unary_union
        result_gdf = gpd.GeoDataFrame([{'geometry': combined_buffer}], crs=utm_crs).to_crs(self.crs)
        result_geometry = result_gdf.geometry.iloc[0].intersection(self.current_search_area)
        
        # Update state
        self.current_search_area = result_geometry
        self.criteria_results.append({
            'name': criterion_name,
            'geometry': result_geometry,
            'description': f"{travel_mode}: {max_distance_miles:.1f} mi / {int(max_distance_miles/self.TRAVEL_SPEEDS[travel_mode]*60)} min"
        })
        
        area_sq_miles = self._calculate_area_sq_miles(result_geometry)
        print(f"✓ Applied. New search area: {area_sq_miles:.1f} sq miles")
        
        return gpd.GeoDataFrame([{'geometry': result_geometry}], crs=self.crs)

    def _network_travel_analysis(self, poi_type, specific_location,
                                max_time_minutes, travel_mode, criterion_name):
        """Network-based travel time analysis."""
        print("  Using network analysis for accurate travel times...")
        
        # Get POIs
        pois = self._get_pois(poi_type, specific_location)
        if pois is None or len(pois) == 0:
            return None
        
        # Limit POIs for efficiency
        if len(pois) > 10 and not specific_location:
            print(f"  Limiting to 10 nearest POIs (from {len(pois)})")
            pois['distance_to_center'] = pois.geometry.distance(self.center_point)
            pois = pois.nsmallest(10, 'distance_to_center')
        
        # Get appropriate network
        print(f"  Downloading {travel_mode} network...")
        try:
            G = ox.graph_from_polygon(
                self.current_search_area,
                network_type=travel_mode if travel_mode != 'bike' else 'all',
                simplify=True
            )
            print(f"  Network: {len(G.nodes)} nodes, {len(G.edges)} edges")
        except Exception as e:
            print(f"  Network download failed: {e}")
            print("  Falling back to simple buffer method")
            max_distance_miles = (max_time_minutes / 60) * self.TRAVEL_SPEEDS[travel_mode]
            return self._simple_travel_buffer(
                poi_type, specific_location, max_distance_miles,
                travel_mode, criterion_name
            )
        
        # Add appropriate speeds
        if travel_mode == 'walk':
            # Walking speeds by path type
            walk_speeds = {
                'footway': 4.5, 'path': 4.5, 'pedestrian': 4.5,
                'residential': 4.0, 'steps': 2.0, 'track': 3.5
            }
            G = ox.add_edge_speeds(G, hwy_speeds=walk_speeds, fallback=4.5)
        elif travel_mode == 'bike':
            # Biking speeds by path type
            bike_speeds = {
                'cycleway': 15, 'residential': 15, 'path': 12,
                'track': 10, 'primary': 20, 'secondary': 18
            }
            G = ox.add_edge_speeds(G, hwy_speeds=bike_speeds, fallback=15)
        else:  # drive
            G = ox.add_edge_speeds(G)  # Use OSM speed limits
        
        G = ox.add_edge_travel_times(G)
        
        # Calculate isochrones
        isochrones = []
        for idx, poi in pois.iterrows():
            try:
                center_node = ox.nearest_nodes(
                    G, poi.geometry.centroid.x, poi.geometry.centroid.y
                )
                
                # Create isochrone
                subgraph = nx.ego_graph(
                    G, center_node,
                    radius=max_time_minutes * 60,  # seconds
                    distance='travel_time'
                )
                
                if len(subgraph.nodes) >= 3:
                    node_points = [Point((data['x'], data['y'])) 
                                for node, data in subgraph.nodes(data=True)]
                    iso_poly = gpd.GeoSeries(node_points).unary_union.convex_hull
                    isochrones.append(iso_poly)
            except:
                continue
        
        if not isochrones:
            print("  No valid isochrones. Falling back to simple method.")
            max_distance_miles = (max_time_minutes / 60) * self.TRAVEL_SPEEDS[travel_mode]
            return self._simple_travel_buffer(
                poi_type, specific_location, max_distance_miles,
                travel_mode, criterion_name
            )
        
        # Combine results
        combined_isochrone = unary_union(isochrones)
        result_geometry = combined_isochrone.intersection(self.current_search_area)
        
        # Update state
        self.current_search_area = result_geometry
        self.criteria_results.append({
            'name': criterion_name,
            'geometry': result_geometry,
            'description': f"{travel_mode}: {max_time_minutes} min (network)"
        })
        
        area_sq_miles = self._calculate_area_sq_miles(result_geometry)
        print(f"✓ Applied. New search area: {area_sq_miles:.1f} sq miles")
        
        return gpd.GeoDataFrame([{'geometry': result_geometry}], crs=self.crs)

    def _get_pois(self, poi_type: Dict = None, 
                specific_location: str = None) -> gpd.GeoDataFrame:
        """Get POIs from either type search or specific location."""
        if specific_location:
            print(f"  Geocoding: {specific_location}")
            try:
                location_gdf = ox.geocode_to_gdf(specific_location)
                location_point = location_gdf.geometry.iloc[0].centroid
            except:
                try:
                    lat, lon = ox.geocode(specific_location)
                    location_point = Point(lon, lat)
                except:
                    print(f"  ✗ Could not geocode: {specific_location}")
                    return None
            
            return gpd.GeoDataFrame(
                [{'name': specific_location, 'geometry': location_point}],
                crs=self.crs
            )
        else:
            try:
                pois = ox.features_from_polygon(
                    self.current_search_area,
                    tags=poi_type
                )
                print(f"  Found {len(pois)} POIs of type {poi_type}")
                return pois
            except Exception as e:
                print(f"  ✗ No POIs found: {e}")
                return None

    def add_socioeconomic_criterion(self,
                                variable: str,
                                operator: str,
                                threshold: float,
                                criterion_name: str) -> gpd.GeoDataFrame:
        """Add socioeconomic criterion (placeholder for census integration)."""
        print(f"\nProcessing {criterion_name} (socioeconomic)...")
        print(f"  Variable: {variable} {operator} {threshold}")
        print("  ⚠️  Census integration not implemented in this version")
        
        # For now, just store the criterion
        self.criteria_descriptions.append(f"{criterion_name}: {variable} {operator} {threshold}")
        
        return self.get_current_result()

    def get_current_result(self) -> gpd.GeoDataFrame:
        """Get the current filtered area."""
        return gpd.GeoDataFrame(
            [{'geometry': self.current_search_area,
            'criteria_count': len(self.criteria_results),
            'area_sq_miles': self._calculate_area_sq_miles(self.current_search_area)}],
            crs=self.crs
        )

    def summary(self):
        """Print analysis summary."""
        print("\n" + "="*50)
        print("LOCATION ANALYSIS SUMMARY")
        print("="*50)
        print(f"Center: {self.center_location}")
        print(f"Initial radius: {self.max_radius_miles} miles")
        
        if self.criteria_results:
            print(f"\nCriteria applied ({len(self.criteria_results)}):")
            for i, result in enumerate(self.criteria_results, 1):
                area = self._calculate_area_sq_miles(result['geometry'])
                print(f"  {i}. {result['name']}: {area:.1f} sq mi - {result['description']}")
        
        final_area = self._calculate_area_sq_miles(self.current_search_area)
        initial_area = self._calculate_area_sq_miles(self.search_boundary)
        reduction = (1 - final_area/initial_area) * 100
        
        print(f"\nResults:")
        print(f"  Initial area: {initial_area:.1f} sq miles")
        print(f"  Final area: {final_area:.1f} sq miles")
        print(f"  Reduction: {reduction:.1f}%")
        print("="*50)

    def visualize_results(self) -> folium.Map:
        """Create interactive map of results."""
        center = [self.center_point.y, self.center_point.x]
        m = folium.Map(location=center, zoom_start=11)
        
        # Add center marker
        folium.Marker(
            center,
            popup=self.center_location,
            icon=folium.Icon(color='red', icon='home')
        ).add_to(m)
        
        # Add initial boundary
        folium.GeoJson(
            gpd.GeoDataFrame([{'geometry': self.search_boundary}], crs=self.crs).to_json(),
            name='Initial Search Area',
            style_function=lambda x: {
                'fillColor': 'lightgray',
                'color': 'gray',
                'weight': 1,
                'fillOpacity': 0.1
            }
        ).add_to(m)
        
        # Add criteria layers
        colors = ['blue', 'green', 'purple', 'orange', 'darkred', 'lightblue']
        for i, result in enumerate(self.criteria_results):
            color = colors[i % len(colors)]
            folium.GeoJson(
                gpd.GeoDataFrame([{'geometry': result['geometry']}], crs=self.crs).to_json(),
                name=f"{result['name']}: {result['description']}",
                style_function=lambda x, c=color: {
                    'fillColor': c,
                    'color': c,
                    'weight': 2,
                    'fillOpacity': 0.15
                }
            ).add_to(m)
        
        # Add final result
        folium.GeoJson(
            self.get_current_result().to_json(),
            name='Final Result Area',
            style_function=lambda x: {
                'fillColor': 'darkgreen',
                'color': 'darkgreen',
                'weight': 3,
                'fillOpacity': 0.25,
                'dashArray': '5, 5'
            }
        ).add_to(m)
        
        folium.LayerControl().add_to(m)
        return m

    def export_results(self, filename_base: str = "location_analysis"):
        """Export results to multiple formats."""
        result = self.get_current_result()
        
        # Add metadata
        result['center'] = self.center_location
        result['criteria'] = ', '.join([c['name'] for c in self.criteria_results])
        result['timestamp'] = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')
        
        # Export
        result.to_file(f"{filename_base}.geojson", driver='GeoJSON')
        result.to_file(f"{filename_base}.gpkg", driver='GPKG')
        
        print(f"\nResults exported:")
        print(f"  - {filename_base}.geojson")
        print(f"  - {filename_base}.gpkg")
        
        return result


In [None]:
# Example demonstrating progressive filtering with all travel modes

# Step 1: Initialize with a specific location
analyzer = LocationAnalyzer(
    center_location="Downtown Durham, NC",
    max_radius_miles=25  # Start with 25 mile radius
)

# Step 2: Apply broad filters first (fast operations)
# Must be within 3 miles of any grocery store
analyzer.add_simple_buffer_criterion(
    poi_type={'shop': 'supermarket'},
    max_distance_miles=3,
    criterion_name='grocery_stores_nearby'
)

# Step 3: Add walking distance criterion
# Within 15 minute walk of a park
analyzer.add_travel_time_criterion(
    poi_type={'leisure': 'park'},
    max_time_minutes=15,
    travel_mode='walk',
    criterion_name='parks_walkable',
    use_network=False  # Use simple method for speed
)

# Step 4: Add biking distance criterion  
# Within 10 minute bike ride of a coffee shop
analyzer.add_travel_time_criterion(
    poi_type={'amenity': 'cafe'},
    max_time_minutes=10,
    travel_mode='bike',
    criterion_name='coffee_shops_bikeable',
    use_network=False
)

# Step 5: Add specific location with driving
# Within 20 minute drive of Duke University Hospital
analyzer.add_travel_time_criterion(
    specific_location="Duke University Hospital, Durham, NC",
    max_time_minutes=20,
    travel_mode='drive',
    criterion_name='duke_hospital_accessible',
    use_network=False  # Still using simple method
)

# Step 3: Add walking distance criterion
# Within 15 minute walk of a park
analyzer.add_travel_time_criterion(
    poi_type={'leisure': 'park'},
    max_time_minutes=15,
    travel_mode='walk',
    criterion_name='parks_walkable',
    use_network=True  # Use simple method for speed
)

# Step 4: Add biking distance criterion  
# Within 10 minute bike ride of a coffee shop
analyzer.add_travel_time_criterion(
    poi_type={'amenity': 'cafe'},
    max_time_minutes=10,
    travel_mode='bike',
    criterion_name='coffee_shops_bikeable',
    use_network=True
)

# Step 5: Add specific location with driving
# Within 20 minute drive of Duke University Hospital
analyzer.add_travel_time_criterion(
    specific_location="Duke University Hospital, Durham, NC",
    max_time_minutes=20,
    travel_mode='drive',
    criterion_name='duke_hospital_accessible',
    use_network=True  # Still using simple method
)

# Check progress
analyzer.summary()

# Step 6: Now area should be small enough for network analysis
# Within 5 minute walk of public transit (using accurate network analysis)
analyzer.add_travel_time_criterion(
    poi_type={'highway': 'bus_stop'},
    max_time_minutes=5,
    travel_mode='walk',
    criterion_name='transit_5min_walk',
    use_network=True  # Force network analysis for accuracy
)

# Step 7: Add another specific location with network analysis
# Within 10 minute drive of a specific school
analyzer.add_travel_time_criterion(
    specific_location="Durham School of the Arts",
    max_time_minutes=15,
    travel_mode='walk',
    criterion_name='school_10min_drive',
    use_network=True
)

# Final summary
analyzer.summary()

# Visualize results
map_viz = analyzer.visualize_results()
map_viz.save("durham_location_analysis.html")

# Export results
analyzer.export_results("durham_suitable_locations")

# Additional analysis - show what a 10 minute walk really covers

analyzer.visualize_results()

Initializing analyzer centered on: Downtown Durham, NC
Search area initialized: 25 mile radius
Center coordinates: 35.9963, -78.9031

Processing grocery_stores_nearby (simple buffer: 3 miles)...
  Found 237 POIs of type {'shop': 'supermarket'}
✓ Applied. New search area: 1044.0 sq miles

Processing parks_walkable (walk: 15 min)...
  Mode: walk (3.0 mph)
  Max distance: 0.75 miles in 15 minutes
  Found 763 POIs of type {'leisure': 'park'}
  Using adjusted buffer: 0.62 miles (factor: 1.2)
✓ Applied. New search area: 472.7 sq miles

Processing coffee_shops_bikeable (bike: 10 min)...
  Mode: bike (12.0 mph)
  Max distance: 2.00 miles in 10 minutes
  Found 167 POIs of type {'amenity': 'cafe'}
  Using adjusted buffer: 1.54 miles (factor: 1.3)
✓ Applied. New search area: 276.5 sq miles

Processing duke_hospital_accessible (drive: 20 min)...
  Mode: drive (30.0 mph)
  Max distance: 10.00 miles in 20 minutes
  Geocoding: Duke University Hospital, Durham, NC
  Using adjusted buffer: 6.67 miles (

In [19]:
import streamlit as st
import streamlit_folium as st_folium

st.title("Location Analyzer")
location = st.text_input("Choose a city or location", "Durham, NC")
radius = st.slider("Search radius (miles)", 1, 25, 10)

if st.button("Analyze"):
    #analyzer = LocationAnalyzer(location, radius)
    # ... add criteria
    m = analyzer.visualize_results()
    st_folium.folium_static(m)


2025-07-01 17:27:25.973 
  command:

    streamlit run C:\Users\arikt\AppData\Roaming\Python\Python313\site-packages\ipykernel_launcher.py [ARGUMENTS]
2025-07-01 17:27:25.977 Session state does not function when running a script without `streamlit run`


In [20]:

import sys
from pathlib import Path

if __name__ == "__main__":
    # Remove the CWD from sys.path while we load stuff.
    # This is added back by InteractiveShellApp.init_path()
    if sys.path[0] == "" or Path(sys.path[0]) == Path.cwd():
        del sys.path[0]

    from ipykernel import kernelapp as app

    app.launch_new_instance()

AssertionError: init_sockets cannot be called twice!

In [18]:
pip install streamlit_folium

Collecting streamlit_folium
  Downloading streamlit_folium-0.25.0-py3-none-any.whl.metadata (621 bytes)
Downloading streamlit_folium-0.25.0-py3-none-any.whl (328 kB)
Installing collected packages: streamlit_folium
Successfully installed streamlit_folium-0.25.0
Note: you may need to restart the kernel to use updated packages.


In [7]:
# Cell 3: Example with progressive filtering
# Initialize with a specific address or general location
analyzer = LocationAnalyzer(
    center_location="Chapel Hill, NC",
    max_radius_miles=5  # Start with 20 mile radius
)

# Step 1: Apply simple filters first (fast)
# Within 2 miles of a grocery store
analyzer.add_simple_distance_criterion(
    poi_type={'shop': 'supermarket'},
    max_distance_miles=2,
    criterion_name='grocery_nearby'
)

# Step 2: Within 1 mile of a park
analyzer.add_simple_distance_criterion(
    poi_type={'leisure': 'park'},
    max_distance_miles=1,
    criterion_name='park_nearby'
)

# Step 3: Within 3 miles of a school
analyzer.add_simple_distance_criterion(
    poi_type={'amenity': 'school'},
    max_distance_miles=3,
    criterion_name='school_nearby'
)

analyzer.add_specific_location_criterion(
    location_name="Ephesus Park, Chapel Hill, NC",
    max_distance_miles=2,
    criterion_name='ephesus_park'
)

# Check current area before expensive operations
analyzer.summary()

# Step 4: Now apply network analysis on the reduced area
# Only if area is small enough
current_result = analyzer.get_current_result()
area_sq_miles = analyzer._calculate_area_sq_miles(current_result.geometry.iloc[0])

if area_sq_miles < 50:  # Only do network analysis if area < 50 sq miles
    analyzer.add_network_distance_criterion(
       specific_location="Ephesus Park, Chapel Hill, NC",
        max_minutes=5,
        criterion_name='ephesus_5min_walk',
        network_type='bike'
    )
else:
    print("\nArea still too large for network analysis. Add more simple filters first.")

if area_sq_miles < 50:  # Only do network analysis if area < 50 sq miles
    analyzer.add_network_distance_criterion(
        poi_type={'amenity': 'cafe'},
        max_minutes=5,
        criterion_name='cafe_5min_drive',
        network_type='bike'
    )
else:
    print("\nArea still too large for network analysis. Add more simple filters first.")

# Visualize results
analyzer.visualize_progressive_results()


Initializing analyzer centered on: Chapel Hill, NC
Search area initialized: 5 mile radius
Center coordinates: 35.9271, -79.0392


AttributeError: 'LocationAnalyzer' object has no attribute 'add_simple_distance_criterion'

In [None]:
# Cell 2: Complete LocationAnalyzer class with all methods properly inside
import networkx as nx

class LocationAnalyzer:
    """
    A tool for identifying locations that meet multiple spatial and demographic criteria.
    """
    
    def __init__(self, study_area: Union[str, gpd.GeoDataFrame], 
                 crs: str = 'EPSG:4326'):
        """
        Initialize the analyzer with a study area.
        """
        self.crs = crs
        
        # Set up study area
        if isinstance(study_area, str):
            self.study_area_name = study_area
            self.study_area = ox.geocode_to_gdf(study_area)
            self.study_area = self.study_area.to_crs(crs)
        else:
            self.study_area = study_area.to_crs(crs)
            self.study_area_name = "Custom Area"
            
        # Get the bounding box
        self.bbox = self.study_area.total_bounds
        
        # Initialize results storage
        self.criteria_polygons = {}
        self.criteria_descriptions = {}
        
    def add_driving_distance_criterion(self, 
                                     poi_type: Union[str, Dict], 
                                     max_minutes: int,
                                     criterion_name: str,
                                     specific_location: str = None) -> gpd.GeoDataFrame:
        """
        Add a driving distance criterion.
        """
        print(f"Processing driving criterion: {criterion_name}")
        
        # Get POIs
        if specific_location:
            location = ox.geocode_to_gdf(specific_location)
            pois = gpd.GeoDataFrame([{'name': specific_location, 
                                     'geometry': location.geometry.iloc[0]}])
        else:
            try:
                pois = ox.features_from_place(
                    self.study_area_name,
                    tags=poi_type
                )
            except:
                pois = ox.features_from_polygon(
                    self.study_area.unary_union,
                    tags=poi_type
                )
                
        if len(pois) == 0:
            print(f"No POIs found for {criterion_name}")
            return None
            
        # Get street network
        try:
            G = ox.graph_from_place(
                self.study_area_name,
                network_type='drive'
            )
        except:
            G = ox.graph_from_polygon(
                self.study_area.unary_union,
                network_type='drive'
            )
        
        # Calculate isochrones for each POI
        isochrones = []
        for idx, poi in pois.iterrows():
            try:
                center_node = ox.nearest_nodes(
                    G, 
                    poi.geometry.centroid.x, 
                    poi.geometry.centroid.y
                )
                
                subgraph = nx.ego_graph(
                    G, center_node, 
                    radius=max_minutes*60,
                    distance='travel_time'
                )
                
                node_points = [Point((data['x'], data['y'])) 
                              for node, data in subgraph.nodes(data=True)]
                
                if len(node_points) >= 3:
                    iso_poly = gpd.GeoSeries(node_points).unary_union.convex_hull
                    isochrones.append(iso_poly)
                    
            except Exception as e:
                print(f"Error processing POI {idx}: {e}")
                continue
                
        # Combine all isochrones
        if isochrones:
            combined_polygon = unary_union(isochrones)
            result_gdf = gpd.GeoDataFrame(
                [{'geometry': combined_polygon, 'criterion': criterion_name}],
                crs=self.crs
            )
            
            self.criteria_polygons[criterion_name] = result_gdf
            self.criteria_descriptions[criterion_name] = f"Within {max_minutes} min drive"
            
            return result_gdf
        
        return None
    
    def add_walking_distance_criterion(self, 
                                     poi_type: Union[str, Dict], 
                                     max_distance: float,
                                     distance_unit: str = 'km',
                                     criterion_name: str = None,
                                     specific_location: str = None) -> gpd.GeoDataFrame:
        """
        Add a walking/biking distance criterion using simple buffers.
        """
        print(f"Processing walking/biking criterion: {criterion_name}")
        
        # Convert distance to meters
        if distance_unit == 'miles':
            max_distance_m = max_distance * 1609.34
        else:  # km
            max_distance_m = max_distance * 1000
            
        # Get POIs
        if specific_location:
            location = ox.geocode_to_gdf(specific_location)
            pois = gpd.GeoDataFrame([{'name': specific_location, 
                                     'geometry': location.geometry.iloc[0]}])
        else:
            try:
                pois = ox.features_from_place(
                    self.study_area_name,
                    tags=poi_type
                )
            except:
                pois = ox.features_from_polygon(
                    self.study_area.unary_union,
                    tags=poi_type
                )
            
        if len(pois) == 0:
            print(f"No POIs found for {criterion_name}")
            return None
            
        # Project to local CRS for accurate buffering
        local_crs = pois.estimate_utm_crs()
        pois_projected = pois.to_crs(local_crs)
        
        # Create buffers
        buffers = pois_projected.geometry.buffer(max_distance_m)
        
        # Union all buffers and convert back
        combined_buffer = buffers.unary_union
        result_gdf = gpd.GeoDataFrame(
            [{'geometry': combined_buffer, 'criterion': criterion_name}],
            crs=local_crs
        ).to_crs(self.crs)
        
        # Store results
        self.criteria_polygons[criterion_name] = result_gdf
        self.criteria_descriptions[criterion_name] = f"Within {max_distance} {distance_unit} walk/bike"
        
        return result_gdf
    
    def add_socioeconomic_criterion(self, 
                                  variable: str,
                                  operator: str,
                                  threshold: float,
                                  criterion_name: str,
                                  year: int = 2021) -> gpd.GeoDataFrame:
        """
        Add a socioeconomic criterion based on census data.
        """
        print(f"Processing socioeconomic criterion: {criterion_name}")
        
        try:
            from census import Census
            from us import states
            
            # Initialize Census API
            c = Census("YOUR_CENSUS_API_KEY")
            
            # Determine state from study area centroid
            centroid = self.study_area.geometry.centroid.iloc[0]
            
            # This is a simplified example
            state_fips = '37'  # North Carolina
            
            # Get census tract data
            census_data = c.acs5.state_county_tract(
                fields=('NAME', variable),
                state_fips=state_fips,
                county_fips='*',
                tract='*',
                year=year
            )
            
            # Convert to DataFrame
            df = pd.DataFrame(census_data)
            df[variable] = pd.to_numeric(df[variable], errors='coerce')
            
            # Apply threshold
            operators = {
                '>': df[variable] > threshold,
                '<': df[variable] < threshold,
                '>=': df[variable] >= threshold,
                '<=': df[variable] <= threshold,
                '==': df[variable] == threshold
            }
            
            filtered_df = df[operators[operator]]
            
            print(f"Found {len(filtered_df)} census tracts meeting criteria")
            
            # Store description
            self.criteria_descriptions[criterion_name] = f"{variable} {operator} {threshold}"
            
            return None  # Would return actual tract geometries
            
        except ImportError:
            print("Census package not installed. Install with: pip install census us")
            return None
    
    def combine_criteria(self, 
                        criteria_names: List[str] = None,
                        operation: str = 'intersection') -> gpd.GeoDataFrame:
        """
        Combine multiple criteria into a single result.
        """
        if criteria_names is None:
            criteria_names = list(self.criteria_polygons.keys())
            
        polygons = [self.criteria_polygons[name].geometry.iloc[0] 
                    for name in criteria_names 
                    if name in self.criteria_polygons]
        
        if not polygons:
            print("No valid criteria to combine")
            return None
            
        if operation == 'intersection':
            result_geometry = polygons[0]
            for poly in polygons[1:]:
                result_geometry = result_geometry.intersection(poly)
        else:  # union
            result_geometry = unary_union(polygons)
            
        # Clip to study area
        study_area_union = self.study_area.geometry.unary_union
        result_geometry = result_geometry.intersection(study_area_union)
        
        result_gdf = gpd.GeoDataFrame(
            [{'geometry': result_geometry, 
              'criteria': ', '.join(criteria_names),
              'operation': operation}],
            crs=self.crs
        )
        
        return result_gdf
    
    def visualize_results(self, 
                         show_criteria: List[str] = None,
                         show_combined: bool = True) -> folium.Map:
        """
        Create an interactive map of the results.
        """
        # Create base map
        center = [self.study_area.geometry.centroid.y.iloc[0],
                  self.study_area.geometry.centroid.x.iloc[0]]
        m = folium.Map(location=center, zoom_start=12)
        
        # Add study area
        folium.GeoJson(
            self.study_area.to_json(),
            name='Study Area',
            style_function=lambda x: {'fillColor': 'lightblue', 
                                     'color': 'blue',
                                     'weight': 2,
                                     'fillOpacity': 0.1}
        ).add_to(m)
        
        # Add individual criteria
        if show_criteria:
            colors = ['red', 'green', 'purple', 'orange', 'darkred']
            for i, (name, gdf) in enumerate(self.criteria_polygons.items()):
                if show_criteria is None or name in show_criteria:
                    folium.GeoJson(
                        gdf.to_json(),
                        name=f'{name}: {self.criteria_descriptions.get(name, "")}',
                        style_function=lambda x, c=colors[i % len(colors)]: {
                            'fillColor': c,
                            'color': c,
                            'weight': 2,
                            'fillOpacity': 0.2
                        }
                    ).add_to(m)
        
        # Add layer control
        folium.LayerControl().add_to(m)
        
        return m


In [None]:
    def visualize_results(self, 
                         show_criteria: List[str] = None,
                         show_combined: bool = True) -> folium.Map:
        """
        Create an interactive map of the results.
        """
        # Create base map
        center = [self.study_area.geometry.centroid.y.iloc[0],
                  self.study_area.geometry.centroid.x.iloc[0]]
        m = folium.Map(location=center, zoom_start=12)
        
        # Add study area
        folium.GeoJson(
            self.study_area.to_json(),
            name='Study Area',
            style_function=lambda x: {'fillColor': 'lightblue', 
                                     'color': 'blue',
                                     'weight': 2,
                                     'fillOpacity': 0.1}
        ).add_to(m)
        
        # Add individual criteria
        if show_criteria:
            colors = ['red', 'green', 'purple', 'orange', 'darkred']
            for i, (name, gdf) in enumerate(self.criteria_polygons.items()):
                if show_criteria is None or name in show_criteria:
                    folium.GeoJson(
                        gdf.to_json(),
                        name=f'{name}: {self.criteria_descriptions.get(name, "")}',
                        style_function=lambda x, c=colors[i % len(colors)]: {
                            'fillColor': c,
                            'color': c,
                            'weight': 2,
                            'fillOpacity': 0.2
                        }
                    ).add_to(m)
        
        # Add layer control
        folium.LayerControl().add_to(m)
        
        return m


In [None]:
# Cell 7: Example usage
# Initialize analyzer for Chapel Hill, NC
analyzer = LocationAnalyzer("Chapel Hill, NC")

# Add criteria
# 1. Within 10 minutes drive of a grocery store
analyzer.add_driving_distance_criterion(
    poi_type={'shop': 'supermarket'},
    max_minutes=10,
    criterion_name='grocery_10min_drive'
)

# 2. Within 1 mile walk of a park
analyzer.add_walking_distance_criterion(
    poi_type={'leisure': 'park'},
    max_distance=1,
    distance_unit='miles',
    criterion_name='park_1mi_walk'
)

# 3. Within 15 minutes drive of specific school
analyzer.add_driving_distance_criterion(
    poi_type=None,
    max_minutes=15,
    criterion_name='school_15min_drive',
    specific_location='Phillips Middle School, Chapel Hill, NC'
)

# Combine criteria (intersection = all must be met)
result = analyzer.combine_criteria(operation='intersection')

# Visualize
map_viz = analyzer.visualize_results()
map_viz


In [9]:
# Cell 3: Main LocationAnalyzer class
class LocationAnalyzer:
    """
    A tool for identifying locations that meet multiple spatial and demographic criteria.
    """
    
    def __init__(self, study_area: Union[str, gpd.GeoDataFrame], 
                 crs: str = 'EPSG:4326'):
        """
        Initialize the analyzer with a study area.
        
        Parameters:
        -----------
        study_area : str or GeoDataFrame
            Either a place name string (e.g., "Chapel Hill, NC") or a GeoDataFrame
        crs : str
            Coordinate Reference System (default: WGS84)
        """
        self.crs = crs
        
        # Set up study area
        if isinstance(study_area, str):
            self.study_area_name = study_area
            self.study_area = ox.geocode_to_gdf(study_area)
            self.study_area = self.study_area.to_crs(crs)
        else:
            self.study_area = study_area.to_crs(crs)
            self.study_area_name = "Custom Area"
            
        # Get the bounding box
        self.bbox = self.study_area.total_bounds
        
        # Initialize results storage
        self.criteria_polygons = {}
        self.criteria_descriptions = {}
        
    def add_driving_distance_criterion(self, 
                                     poi_type: Union[str, Dict], 
                                     max_minutes: int,
                                     criterion_name: str,
                                     specific_location: str = None) -> gpd.GeoDataFrame:
        """
        Add a driving distance criterion.
        
        Parameters:
        -----------
        poi_type : str or dict
            OSM tags for POI type (e.g., {'amenity': 'school'}) or None for specific location
        max_minutes : int
            Maximum driving time in minutes
        criterion_name : str
            Name for this criterion
        specific_location : str, optional
            Address or name of specific location (overrides poi_type)
        """
        print(f"Processing driving criterion: {criterion_name}")
        
        # Get POIs
        if specific_location:
            # Geocode specific location
            location = ox.geocode_to_gdf(specific_location)
            pois = gpd.GeoDataFrame([{'name': specific_location, 
                                     'geometry': location.geometry.iloc[0]}])
        else:
            # Get POIs by type - FIXED: use correct bbox format
            pois = ox.features_from_bbox(
                bbox=(self.bbox[3], self.bbox[1], self.bbox[2], self.bbox[0]),  # (north, south, east, west)
                tags=poi_type
            )
            
        if len(pois) == 0:
            print(f"No POIs found for {criterion_name}")
            return None
            
        # Get street network - FIXED: use correct bbox format
        G = ox.graph_from_bbox(
            bbox=(self.bbox[3], self.bbox[1], self.bbox[2], self.bbox[0]),  # (north, south, east, west)
            network_type='drive'
        )
        
        # Calculate isochrones for each POI
        isochrones = []
        for idx, poi in pois.iterrows():
            try:
                # Get nearest node
                center_node = ox.nearest_nodes(
                    G, 
                    poi.geometry.centroid.x, 
                    poi.geometry.centroid.y
                )
                
                # Calculate isochrone
                subgraph = nx.ego_graph(
                    G, center_node, 
                    radius=max_minutes*60,  # Convert to seconds
                    distance='travel_time'
                )
                
                node_points = [Point((data['x'], data['y'])) 
                              for node, data in subgraph.nodes(data=True)]
                
                if len(node_points) >= 3:
                    iso_poly = gpd.GeoSeries(node_points).unary_union.convex_hull
                    isochrones.append(iso_poly)
                    
            except Exception as e:
                print(f"Error processing POI {idx}: {e}")
                continue
                
        # Combine all isochrones
        if isochrones:
            combined_polygon = unary_union(isochrones)
            result_gdf = gpd.GeoDataFrame(
                [{'geometry': combined_polygon, 'criterion': criterion_name}],
                crs=self.crs
            )
            
            # Store results
            self.criteria_polygons[criterion_name] = result_gdf
            self.criteria_descriptions[criterion_name] = f"Within {max_minutes} min drive"
            
            return result_gdf
        
        return None

In [3]:
# Cell 4: Add walking/biking distance methods
def add_walking_distance_criterion(self, 
                                 poi_type: Union[str, Dict], 
                                 max_distance: float,
                                 distance_unit: str = 'km',
                                 criterion_name: str = None,
                                 specific_location: str = None) -> gpd.GeoDataFrame:
    """
    Add a walking/biking distance criterion using simple buffers.
    
    Parameters:
    -----------
    poi_type : str or dict
        OSM tags for POI type
    max_distance : float
        Maximum distance
    distance_unit : str
        'km' or 'miles'
    criterion_name : str
        Name for this criterion
    specific_location : str, optional
        Specific location name/address
    """
    print(f"Processing walking/biking criterion: {criterion_name}")
    
    # Convert distance to meters
    if distance_unit == 'miles':
        max_distance_m = max_distance * 1609.34
    else:  # km
        max_distance_m = max_distance * 1000
        
    # Get POIs
    if specific_location:
        location = ox.geocode_to_gdf(specific_location)
        pois = gpd.GeoDataFrame([{'name': specific_location, 
                                 'geometry': location.geometry.iloc[0]}])
    else:
        pois = ox.features_from_bbox(
            north=self.bbox[3], south=self.bbox[1],
            east=self.bbox[2], west=self.bbox[0],
            tags=poi_type
        )
        
    if len(pois) == 0:
        print(f"No POIs found for {criterion_name}")
        return None
        
    # Project to local CRS for accurate buffering
    local_crs = f'EPSG:{32600 + int((self.bbox[0] + 180) / 6) + 1}'
    pois_projected = pois.to_crs(local_crs)
    
    # Create buffers
    buffers = pois_projected.geometry.buffer(max_distance_m)
    
    # Union all buffers and convert back
    combined_buffer = buffers.unary_union
    result_gdf = gpd.GeoDataFrame(
        [{'geometry': combined_buffer, 'criterion': criterion_name}],
        crs=local_crs
    ).to_crs(self.crs)
    
    # Store results
    self.criteria_polygons[criterion_name] = result_gdf
    self.criteria_descriptions[criterion_name] = f"Within {max_distance} {distance_unit} walk/bike"
    
    return result_gdf

# Add method to class
LocationAnalyzer.add_walking_distance_criterion = add_walking_distance_criterion


In [4]:
# Cell 5: Add census/demographic methods
def add_socioeconomic_criterion(self, 
                              variable: str,
                              operator: str,
                              threshold: float,
                              criterion_name: str,
                              year: int = 2021) -> gpd.GeoDataFrame:
    """
    Add a socioeconomic criterion based on census data.
    
    Parameters:
    -----------
    variable : str
        Census variable code (e.g., 'B19013_001E' for median household income)
    operator : str
        Comparison operator: '>', '<', '>=', '<=', '=='
    threshold : float
        Threshold value
    criterion_name : str
        Name for this criterion
    year : int
        Census year (default: 2021)
    """
    print(f"Processing socioeconomic criterion: {criterion_name}")
    
    try:
        from census import Census
        from us import states
        
        # Initialize Census API (you'll need to get your own key)
        # Get one at: https://api.census.gov/data/key_signup.html
        c = Census("YOUR_CENSUS_API_KEY")
        
        # Determine state from study area centroid
        centroid = self.study_area.geometry.centroid.iloc[0]
        
        # This is a simplified example - you'd need to implement state detection
        state_fips = '37'  # North Carolina example
        
        # Get census tract data
        census_data = c.acs5.state_county_tract(
            fields=('NAME', variable),
            state_fips=state_fips,
            county_fips='*',
            tract='*',
            year=year
        )
        
        # Convert to DataFrame
        df = pd.DataFrame(census_data)
        df[variable] = pd.to_numeric(df[variable], errors='coerce')
        
        # Apply threshold
        operators = {
            '>': df[variable] > threshold,
            '<': df[variable] < threshold,
            '>=': df[variable] >= threshold,
            '<=': df[variable] <= threshold,
            '==': df[variable] == threshold
        }
        
        filtered_df = df[operators[operator]]
        
        # Get tract geometries (simplified - you'd need tract shapefiles)
        # For now, return a placeholder
        print(f"Found {len(filtered_df)} census tracts meeting criteria")
        
        # Store description
        self.criteria_descriptions[criterion_name] = f"{variable} {operator} {threshold}"
        
        return None  # Would return actual tract geometries
        
    except ImportError:
        print("Census package not installed. Install with: pip install census us")
        return None

# Add method to class
LocationAnalyzer.add_socioeconomic_criterion = add_socioeconomic_criterion


In [5]:
# Cell 6: Methods for combining criteria
def combine_criteria(self, 
                    criteria_names: List[str] = None,
                    operation: str = 'intersection') -> gpd.GeoDataFrame:
    """
    Combine multiple criteria into a single result.
    
    Parameters:
    -----------
    criteria_names : list of str
        Names of criteria to combine (None = use all)
    operation : str
        'intersection' (AND) or 'union' (OR)
    """
    if criteria_names is None:
        criteria_names = list(self.criteria_polygons.keys())
        
    polygons = [self.criteria_polygons[name].geometry.iloc[0] 
                for name in criteria_names 
                if name in self.criteria_polygons]
    
    if not polygons:
        print("No valid criteria to combine")
        return None
        
    if operation == 'intersection':
        result_geometry = polygons[0]
        for poly in polygons[1:]:
            result_geometry = result_geometry.intersection(poly)
    else:  # union
        result_geometry = unary_union(polygons)
        
    # Clip to study area
    study_area_union = self.study_area.geometry.unary_union
    result_geometry = result_geometry.intersection(study_area_union)
    
    result_gdf = gpd.GeoDataFrame(
        [{'geometry': result_geometry, 
          'criteria': ', '.join(criteria_names),
          'operation': operation}],
        crs=self.crs
    )
    
    return result_gdf

def visualize_results(self, 
                     show_criteria: List[str] = None,
                     show_combined: bool = True) -> folium.Map:
    """
    Create an interactive map of the results.
    """
    # Create base map
    center = [self.study_area.geometry.centroid.y.iloc[0],
              self.study_area.geometry.centroid.x.iloc[0]]
    m = folium.Map(location=center, zoom_start=12)
    
    # Add study area
    folium.GeoJson(
        self.study_area.to_json(),
        name='Study Area',
        style_function=lambda x: {'fillColor': 'lightblue', 
                                 'color': 'blue',
                                 'weight': 2,
                                 'fillOpacity': 0.1}
    ).add_to(m)
    
    # Add individual criteria
    if show_criteria:
        colors = ['red', 'green', 'purple', 'orange', 'darkred']
        for i, (name, gdf) in enumerate(self.criteria_polygons.items()):
            if show_criteria is None or name in show_criteria:
                folium.GeoJson(
                    gdf.to_json(),
                    name=f'{name}: {self.criteria_descriptions.get(name, "")}',
                    style_function=lambda x, c=colors[i % len(colors)]: {
                        'fillColor': c,
                        'color': c,
                        'weight': 2,
                        'fillOpacity': 0.2
                    }
                ).add_to(m)
    
    # Add layer control
    folium.LayerControl().add_to(m)
    
    return m

# Add methods to class
LocationAnalyzer.combine_criteria = combine_criteria
LocationAnalyzer.visualize_results = visualize_results


In [11]:
# Cell 7: Example usage
# Initialize analyzer for Chapel Hill, NC
analyzer = LocationAnalyzer("Chapel Hill, NC")

# Add criteria
# 1. Within 10 minutes drive of a grocery store
analyzer.add_driving_distance_criterion(
    poi_type={'shop': 'supermarket'},
    max_minutes=10,
    criterion_name='grocery_10min_drive'
)

# 2. Within 1 mile walk of a park
analyzer.add_walking_distance_criterion(
    poi_type={'leisure': 'park'},
    max_distance=1,
    distance_unit='miles',
    criterion_name='park_1mi_walk'
)

# 3. Within 15 minutes drive of specific school
analyzer.add_driving_distance_criterion(
    poi_type=None,
    max_minutes=15,
    criterion_name='school_15min_drive',
    specific_location='Phillips Middle School, Chapel Hill, NC'
)

# Combine criteria (intersection = all must be met)
result = analyzer.combine_criteria(operation='intersection')

# Visualize
map_viz = analyzer.visualize_results()
map_viz


Processing driving criterion: grocery_10min_drive


KeyboardInterrupt: 

In [6]:
analyzer = LocationAnalyzer("Chapel Hill, NC")
analyzer

<__main__.LocationAnalyzer at 0x1dd1186b380>

In [10]:
analyzer.add_driving_distance_criterion(
    poi_type={'shop': 'supermarket'},
    max_minutes=10,
    criterion_name='grocery_10min_drive'
)

Processing driving criterion: grocery_10min_drive


TypeError: features_from_bbox() got an unexpected keyword argument 'north'