In [1]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import googlemaps
from tqdm import tqdm
import os
from dotenv import load_dotenv

In [2]:
# --- 1. CONFIGURATION & SETUP ---
load_dotenv()
API_KEY = os.getenv('GOOGLE_API_KEY')
if not API_KEY:
    raise ValueError("API Key not found in .env file.")
gmaps = googlemaps.Client(key=API_KEY)

# Define file paths
input_path = '../data/processed/manhattan_sales_geocoded.parquet'
subway_data_path = '../data/raw/mta_subway_entrances_2024.csv'
output_path = '../data/processed/manhattan_sales_transit.parquet'
checkpoint_path = '../data/processed/transit_checkpoint.parquet'

# Number of nearest candidates to check with the API
K_NEAREST_CANDIDATES = 10

In [3]:
# --- 2. LOAD & PREPARE GEOSPATIAL DATA ---
print("--- Loading and Preparing Geospatial Data ---")
df_props = pd.read_parquet(input_path)
geometry_props = [Point(xy) for xy in zip(df_props['longitude'], df_props['latitude'])]
gdf_props = gpd.GeoDataFrame(df_props, geometry=geometry_props, crs="EPSG:4326")
print(f"Loaded {len(gdf_props)} properties.")

df_subway = pd.read_csv(subway_data_path)
df_subway.columns = df_subway.columns.str.lower().str.replace(' ', '_')
df_subway_manhattan = df_subway[df_subway['borough'] == 'M'].dropna(subset=['entrance_latitude', 'entrance_longitude']).copy()
geometry_subway = [Point(xy) for xy in zip(df_subway_manhattan['entrance_longitude'], df_subway_manhattan['entrance_latitude'])]
gdf_subway = gpd.GeoDataFrame(df_subway_manhattan, geometry=geometry_subway, crs="EPSG:4326")
print(f"Loaded {len(gdf_subway)} Manhattan subway entrances.")

--- Loading and Preparing Geospatial Data ---
Loaded 6505 properties.
Loaded 868 Manhattan subway entrances.


In [4]:
# --- 3. BUILD SPATIAL INDEX ---
print("\nBuilding R-tree spatial index...")
subway_sindex = gdf_subway.sindex
print("Spatial index built successfully.")


Building R-tree spatial index...
Spatial index built successfully.


In [5]:
# --- 4. DEFINE THE CORRECTED, VERSION-COMPATIBLE API FUNCTION ---
def get_nearest_subway_optimized(property_geom, subway_gdf, subway_sindex, verbose=False):
    """
    Finds nearest subway using a version-compatible method. It queries a bounding
    box and then finds the closest candidates within that box.
    """
    if pd.isna(property_geom):
        return None, None
    if verbose: print(f"\nProcessing property at ({property_geom.y:.4f}, {property_geom.x:.4f})")
        
    try:
        # Step 1 (The Fix): Query by a bounding box instead of using num_results.
        buffer_dist = 0.01 # ~1km radius search area
        search_bounds = property_geom.buffer(buffer_dist).bounds
        possible_matches_indices = list(subway_sindex.intersection(search_bounds))
        
        if not possible_matches_indices:
            if verbose: print("  -> No subway candidates found in initial search radius.")
            return None, None

        # Step 2: From the candidates found, find the K nearest by actual distance.
        candidate_subways = subway_gdf.iloc[possible_matches_indices]
        distances = candidate_subways.distance(property_geom)
        closest_k_indices = distances.nsmallest(K_NEAREST_CANDIDATES).index
        final_candidates = subway_gdf.loc[closest_k_indices]
        
        candidate_locations = list(zip(final_candidates['entrance_latitude'], final_candidates['entrance_longitude']))
        if verbose: print(f"  -> Found {len(candidate_locations)} nearby candidates to check with API.")
        
    except Exception as e:
        if verbose: print(f"  -> ERROR finding nearest candidates: {e}")
        return None, None

    # Step 3: Make the API call with the refined candidate list.
    try:
        result = gmaps.distance_matrix(
            origins=[(property_geom.y, property_geom.x)],
            destinations=candidate_locations,
            mode='walking'
        )
        if verbose: print("  -> Successfully received API response.")
        
        elements = result['rows'][0]['elements']
        min_distance = float('inf')
        min_duration = float('inf')
        
        for el in elements:
            if el['status'] == 'OK':
                if el['distance']['value'] < min_distance:
                    min_distance = el['distance']['value']
                    min_duration = el['duration']['value']
        
        if min_distance != float('inf'):
            if verbose: print(f"  -> SUCCESS: Found nearest subway at {min_distance:.0f} meters.")
            return min_distance, min_duration
        else:
            if verbose: print("  -> WARNING: API call successful, but no 'OK' status found in results.")
            return None, None
            
    except Exception as e:
        if verbose: print(f"  -> API ERROR: {e}")
        return None, None

In [6]:
# --- 5. IMPLEMENT CHECKPOINTING ---
try:
    # Ensure checkpoint file exists before trying to delete
    if os.path.exists(checkpoint_path):
        print("A checkpoint file from a previous failed run was found. Forcing a fresh start.")
        os.remove(checkpoint_path)
    df_checkpoint = pd.read_parquet(checkpoint_path) # This will now fail, triggering the 'except' block
except FileNotFoundError:
    print("\nNo transit checkpoint file found or was just deleted. Starting from scratch.")
    df_to_process = gdf_props.copy()
    results_list = []

A checkpoint file from a previous failed run was found. Forcing a fresh start.

No transit checkpoint file found or was just deleted. Starting from scratch.


In [7]:
# --- 6. EXECUTE THE OPTIMIZED & ROBUST WORKFLOW ---
if not df_to_process.empty:
    print(f"\n--- Starting OPTIMIZED transit analysis for {len(df_to_process)} properties. ---")
    
    for index, row in tqdm(df_to_process.iterrows(), total=len(df_to_process)):
        is_verbose = (len(results_list) < 5) 
        
        dist, time = get_nearest_subway_optimized(row['geometry'], gdf_subway, subway_sindex, verbose=is_verbose)
        results_list.append({'index': index, 'subway_distance_meters': dist, 'subway_walk_time_seconds': time})
        
        # Save progress every 100 rows
        if len(results_list) % 100 == 0:
            temp_df = pd.DataFrame(results_list).set_index('index')
            temp_df.to_parquet(checkpoint_path)


--- Starting OPTIMIZED transit analysis for 6505 properties. ---


  0%|                                                                                                        | 0/6505 [00:00<?, ?it/s]


Processing property at (40.7229, -73.9774)
  -> Found 10 nearby candidates to check with API.



  distances = candidate_subways.distance(property_geom)
  0%|                                                                                                | 3/6505 [00:00<13:35,  7.97it/s]

  -> Successfully received API response.
  -> SUCCESS: Found nearest subway at 1202 meters.

Processing property at (40.7237, -73.9774)
  -> Found 10 nearby candidates to check with API.
  -> Successfully received API response.
  -> SUCCESS: Found nearest subway at 1106 meters.

Processing property at (40.7470, -74.0041)
  -> Found 10 nearby candidates to check with API.
  -> Successfully received API response.
  -> SUCCESS: Found nearest subway at 561 meters.

Processing property at (40.7395, -74.0003)
  -> Found 10 nearby candidates to check with API.


  0%|                                                                                                | 5/6505 [00:00<15:26,  7.02it/s]

  -> Successfully received API response.
  -> SUCCESS: Found nearest subway at 160 meters.

Processing property at (40.7421, -74.0000)
  -> Found 10 nearby candidates to check with API.
  -> Successfully received API response.
  -> SUCCESS: Found nearest subway at 198 meters.


100%|█████████████████████████████████████████████████████████████████████████████████████████████| 6505/6505 [11:20<00:00,  9.56it/s]


In [8]:
# --- 7. FINALIZE, JOIN, AND SAVE ---
print("\nProcessing complete. Finalizing dataset...")

results_df = pd.DataFrame(results_list).set_index('index')
df_final = gdf_props.join(results_df)
df_final = pd.DataFrame(df_final.drop(columns='geometry'))

print(f"\nSaving final transit-enriched dataset with {len(df_final)} rows to {output_path}")
df_final.to_parquet(output_path)
print("File saved successfully!")

# Optional: Clean up the checkpoint file after a successful run
if os.path.exists(checkpoint_path):
    os.remove(checkpoint_path)

# --- 8. FINAL INSPECTION ---
print("\n--- Final DataFrame Head with New Transit Features ---")
display(df_final[['address', 'latitude', 'longitude', 'subway_distance_meters', 'subway_walk_time_seconds']].head())
print("\nNull count for new columns:")
print(df_final[['subway_distance_meters', 'subway_walk_time_seconds']].isnull().sum())


Processing complete. Finalizing dataset...

Saving final transit-enriched dataset with 6505 rows to ../data/processed/manhattan_sales_transit.parquet
File saved successfully!

--- Final DataFrame Head with New Transit Features ---


Unnamed: 0,address,latitude,longitude,subway_distance_meters,subway_walk_time_seconds
7664,743 EAST 6TH STREET,40.722937,-73.977391,1202,999
7667,263 EAST 7TH STREET,40.723652,-73.977354,1106,922
7801,483 WEST 22ND STREET,40.746989,-74.004069,561,478
7802,218 WEST 15TH STREET,40.739534,-74.000271,160,130
7803,253 WEST 18TH STREET,40.74214,-74.000014,198,163



Null count for new columns:
subway_distance_meters      0
subway_walk_time_seconds    0
dtype: int64
