In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install folium
!pip install osmnx
!pip install census
!pip install geopandas



In [3]:
import pandas as pd
import numpy as np
import requests
import json
import folium
from folium import plugins
from sklearn.cluster import DBSCAN, KMeans
from sklearn.preprocessing import StandardScaler
import osmnx as ox
import geopandas as gpd
from shapely.geometry import Point, Polygon
import time
from datetime import datetime, timedelta
import census
import warnings
from google.colab import output
import os
warnings.filterwarnings('ignore')

In [4]:
class ComprehensiveEVAnalysis:
    def __init__(self, city="San Francisco", county="San Francisco County"):
        self.city = city
        self.county = county
        self.state = "California"
        self.state_fips = '06'
        # API Keys
        self.ocm_api_key = "a76b4e24-c8d3-443e-917a-d1f4e0446eac"
        self.census_api_key = "e3bef567fd302740ce2d9bea2c37dd2a8a99fb4d"
        self.tomtom_api_key = "5rjFZwy1OmyRAGhcFDvvL8Y5BXjS63qb"

        self.data_folder = "/content/drive/MyDrive/ev_charging_analysis/"
        if not os.path.exists(self.data_folder):
            os.makedirs(self.data_folder)
            print(f"Created data directory at {self.data_folder}")

    def get_charging_stations(self):
        """Collect charging station data from OpenChargeMap API"""
        csv_path = os.path.join(self.data_folder, f"charging_stations_{self.city.lower().replace(' ', '_')}.csv")

        # Check for valid cached data
        if os.path.exists(csv_path) and os.path.getsize(csv_path) > 0:
            try:
                df = pd.read_csv(csv_path)
                if len(df) > 0:
                    print(f"Loading {len(df)} stations from cache")
                    self.charging_stations = df
                    return True
            except Exception as e:
                print(f"Cache error: {str(e)}")

        print("Fetching fresh data from OpenChargeMap...")

        # OpenChargeMap API endpoint
        url = "https://api.openchargemap.io/v3/poi"
        headers = {
            'X-API-Key': self.ocm_api_key
        }
        params = {
            'countrycode': 'US',
            'maxresults': 1000,
            'latitude': 37.7749,  # SF coordinates
            'longitude': -122.4194,
            'distance': 20,  # 20-mile radius
            'distanceunit': 'miles',
            'compact': True,
            'verbose': False
        }

        try:
            response = requests.get(url, headers=headers, params=params)
            if response.status_code == 200:
                stations = []
                data = response.json()

                for station in data:
                    try:
                        station_data = {
                            'latitude': station['AddressInfo']['Latitude'],
                            'longitude': station['AddressInfo']['Longitude'],
                            'station_name': station['AddressInfo']['Title'],
                            'street_address': station['AddressInfo'].get('AddressLine1', ''),
                            'city': station['AddressInfo'].get('Town', ''),
                            'state': station['AddressInfo'].get('StateOrProvince', ''),
                            'zip': station['AddressInfo'].get('Postcode', ''),
                            'num_points': len(station.get('Connections', [])),
                            'operator': station.get('OperatorInfo', {}).get('Title', 'Unknown'),
                            'usage_cost': station.get('UsageCost', 'Not specified')
                        }
                        stations.append(station_data)
                    except KeyError as e:
                        print(f"Skipping station due to missing data: {e}")

                self.charging_stations = pd.DataFrame(stations)
                self.charging_stations.to_csv(csv_path, index=False)
                print(f"Saved {len(stations)} charging stations")
                return True
            else:
                print(f"API Error: {response.status_code}")
                return False
        except Exception as e:
            print(f"Error fetching charging stations: {str(e)}")
            return False

    def get_census_data(self):
        """Collect and validate demographic data from Census API"""
        csv_path = os.path.join(self.data_folder, f"census_{self.city.lower().replace(' ', '_')}.csv")

        try:
            c = census.Census(self.census_api_key)

            variables = {
                'B01003_001E': 'total_population',
                'B19013_001E': 'median_household_income',
                'B08014_002E': 'vehicles_available',
                'B08301_001E': 'total_commuters',
            }

            data = c.acs5.state_county_tract(
                fields=list(variables.keys()),
                state_fips=self.state_fips,  # California
                county_fips='075',  # San Francisco
                tract='*',
                year=2022
            )

            df = pd.DataFrame(data)
            df = df.rename(columns=variables)

            # Clean and validate income data
            df['median_household_income'] = pd.to_numeric(df['median_household_income'], errors='coerce')
            df = df[df['median_household_income'] > 0]

            self.census_data = df
            df.to_csv(csv_path, index=False)
            print(f"Saved census data for {len(df)} tracts")
            return True

        except Exception as e:
            print(f"Census API error: {str(e)}")
            return False

    def get_traffic_data(self):
        """Collect traffic data from TomTom API"""
        csv_path = os.path.join(self.data_folder, f"traffic_{self.city.lower().replace(' ', '_')}.csv")

        try:
            # Get city boundary
            gdf = ox.geocode_to_gdf(f"{self.city}, {self.state}")
            bounds = gdf.total_bounds

            # Create grid of points to sample traffic
            lat_points = np.linspace(bounds[1], bounds[3], 10)
            lon_points = np.linspace(bounds[0], bounds[2], 10)

            traffic_data = []
            base_url = "https://api.tomtom.com/traffic/services/4/flowSegmentData/relative0/10/json"

            for lat in lat_points:
                for lon in lon_points:
                    params = {
                        'key': self.tomtom_api_key,
                        'point': f"{lat},{lon}"
                    }

                    response = requests.get(base_url, params=params)
                    if response.status_code == 200:
                        data = response.json()
                        if 'flowSegmentData' in data:
                            segment = data['flowSegmentData']
                            traffic_data.append({
                                'latitude': lat,
                                'longitude': lon,
                                'current_speed': segment.get('currentSpeed', 0),
                                'free_flow_speed': segment.get('freeFlowSpeed', 0),
                                'confidence': segment.get('confidence', 0),
                                'congestion_level': segment.get('currentTravelTime', 0) /
                                                  segment.get('freeFlowTravelTime', 1) * 100
                            })

                    time.sleep(0.5)  # Rate limiting

            self.traffic_data = pd.DataFrame(traffic_data)
            self.traffic_data.to_csv(csv_path, index=False)
            print(f"Saved traffic data for {len(traffic_data)} locations")
            return True

        except Exception as e:
            print(f"Traffic API error: {str(e)}")
            return False

    def get_osm_data(self):
      """Collect relevant OpenStreetMap data for EV charging station placement analysis"""
      csv_path = os.path.join(self.data_folder, f"osm_{self.city.lower().replace(' ', '_')}.csv")

      try:
          # Get city boundary
          print("Fetching city boundary...")
          city_gdf = ox.geocode_to_gdf(f"{self.city}, {self.state}")
          boundary = city_gdf.iloc[0].geometry

          # Download street network
          print("Downloading street network...")
          G = ox.graph_from_polygon(boundary, network_type='drive')
          nodes, edges = ox.graph_to_gdfs(G)

          # Download relevant POIs
          print("Downloading points of interest...")
          tags = {
              'amenity': ['parking', 'restaurant', 'shopping_mall', 'fuel'],
              'building': ['commercial', 'retail', 'office'],
              'shop': ['supermarket', 'mall'],
              'leisure': ['fitness_centre', 'sports_centre'],
              'highway': ['motorway', 'trunk', 'primary', 'secondary']
          }

          pois_data = []
          for key, values in tags.items():
              for value in values:
                  print(f"Fetching {value} locations...")
                  try:
                      # Using features_from_polygon instead of geometries_from_polygon
                      pois = ox.features_from_polygon(boundary, tags={key: value})
                      if not pois.empty:
                          pois['category'] = value
                          pois['type'] = key
                          # Extract relevant columns
                          keep_cols = ['category', 'type', 'name', 'geometry']
                          existing_cols = [col for col in keep_cols if col in pois.columns]
                          pois = pois[existing_cols]
                          pois_data.append(pois)
                  except Exception as e:
                      print(f"Warning: Could not fetch {value} locations: {str(e)}")
                      continue

          if pois_data:
              all_pois = pd.concat(pois_data, ignore_index=True)

              # Convert to more manageable format
              poi_records = []
              for idx, row in all_pois.iterrows():
                  try:
                      if isinstance(row.geometry, (Point, Polygon)):
                          # Get centroid if it's a polygon
                          point = row.geometry.centroid if isinstance(row.geometry, Polygon) else row.geometry
                          record = {
                              'poi_id': idx,
                              'category': row.get('category', 'unknown'),
                              'type': row.get('type', 'unknown'),
                              'name': row.get('name', 'unnamed'),
                              'latitude': point.y,
                              'longitude': point.x
                          }
                          poi_records.append(record)
                  except Exception as e:
                      print(f"Warning: Error processing POI {idx}: {str(e)}")
                      continue

              # Create DataFrame and save
              osm_df = pd.DataFrame(poi_records)
              osm_df.to_csv(csv_path, index=False)
              print(f"Saved {len(osm_df)} POIs to {csv_path}")

              # Store in class instance
              self.osm_data = osm_df

              return True

      except Exception as e:
          print(f"Error collecting OSM data: {str(e)}")
          return False

    def process_osm_features(self):
      """Process OSM data to extract relevant features for ML model"""
      if not hasattr(self, 'osm_data'):
          print("OSM data not available. Please run get_osm_data first.")
          return None

      try:
          features = []
          if hasattr(self, 'network_data'):
              G = self.network_data['graph']
          else:
              print("Warning: Network data not available. Some features will be missing.")
              G = None

          # Create grid of potential charging station locations
          bounds = self.osm_data[['latitude', 'longitude']].agg(['min', 'max'])
          lat_grid = np.linspace(bounds.latitude['min'], bounds.latitude['max'], 50)  # Reduced from 100 for performance
          lon_grid = np.linspace(bounds.longitude['min'], bounds.longitude['max'], 50)

          total_points = len(lat_grid) * len(lon_grid)
          processed_points = 0

          for lat in lat_grid:
              for lon in lon_grid:
                  # Calculate features for this location
                  point = Point(lon, lat)

                  # Count nearby POIs by category
                  nearby_pois = {}
                  for category in self.osm_data['category'].unique():
                      category_pois = self.osm_data[self.osm_data['category'] == category]
                      points = [Point(row['longitude'], row['latitude']) for _, row in category_pois.iterrows()]
                      nearby = sum(1 for p in points if point.distance(p) <= 0.001)  # Roughly 100m
                      nearby_pois[f'nearby_{category}'] = nearby

                  feature_dict = {
                      'latitude': lat,
                      'longitude': lon,
                      **nearby_pois
                  }

                  # Add network features if available
                  if G is not None:
                      try:
                          nearest_node = ox.nearest_nodes(G, lon, lat)
                          node_centrality = self.network_data.get('node_centrality', {}).get(nearest_node, 0)
                          feature_dict['node_centrality'] = node_centrality
                      except Exception as e:
                          print(f"Warning: Could not calculate network features for point ({lat}, {lon}): {str(e)}")
                          feature_dict['node_centrality'] = 0

                  features.append(feature_dict)

                  # Update progress
                  processed_points += 1
                  if processed_points % 100 == 0:
                      print(f"Processed {processed_points}/{total_points} points...")

          features_df = pd.DataFrame(features)
          features_path = os.path.join(self.data_folder, f"osm_features_{self.city.lower().replace(' ', '_')}.csv")
          features_df.to_csv(features_path, index=False)
          print(f"Saved {len(features_df)} location features to {features_path}")

          self.osm_features = features_df
          return features_df

      except Exception as e:
          print(f"Error processing OSM features: {str(e)}")
          return None

    def visualize_data(self):
        """Create comprehensive visualization of all data"""
        if not hasattr(self, 'charging_stations') or len(self.charging_stations) == 0:
            print("No charging station data available")
            return None

        try:
            # Create base map
            center_lat = self.charging_stations['latitude'].mean()
            center_lon = self.charging_stations['longitude'].mean()
            m = folium.Map(location=[center_lat, center_lon], zoom_start=13)

            # Add charging stations
            station_group = folium.FeatureGroup(name='Charging Stations')
            for _, station in self.charging_stations.iterrows():
                popup_content = f"""
                <b>{station['station_name']}</b><br>
                Address: {station['street_address']}<br>
                City: {station['city']}<br>
                Operator: {station['operator']}<br>
                Cost: {station['usage_cost']}<br>
                Connection Points: {station['num_points']}
                """

                folium.CircleMarker(
                    location=[station['latitude'], station['longitude']],
                    radius=8,
                    color='blue',
                    fill=True,
                    popup=folium.Popup(popup_content, max_width=300)
                ).add_to(station_group)
            station_group.add_to(m)

            # Add traffic data if available
            if hasattr(self, 'traffic_data') and len(self.traffic_data) > 0:
                traffic_group = folium.FeatureGroup(name='Traffic Conditions')
                for _, point in self.traffic_data.iterrows():
                    color = 'green' if point['congestion_level'] < 50 else \
                           'yellow' if point['congestion_level'] < 75 else 'red'

                    folium.CircleMarker(
                        location=[point['latitude'], point['longitude']],
                        radius=5,
                        color=color,
                        fill=True,
                        popup=f"Congestion: {point['congestion_level']:.1f}%"
                    ).add_to(traffic_group)
                traffic_group.add_to(m)

            # Add layer control
            folium.LayerControl().add_to(m)

            # Save and display map
            map_path = os.path.join(self.data_folder, f"ev_analysis_map_{self.city.lower().replace(' ', '_')}.html")
            m.save(map_path)
            print(f"Saved interactive map to {map_path}")

            return m

        except Exception as e:
            print(f"Visualization error: {str(e)}")
            return None

In [5]:
# Initialize analysis
city = "San Francisco"
analysis = ComprehensiveEVAnalysis(city)

# Collect data
print("1. Collecting charging station data...")
analysis.get_charging_stations()

print("\n2. Collecting census data...")
analysis.get_census_data()

print("\n3. Collecting traffic data...")
analysis.get_traffic_data()

print("\n4. Collecting OpenStreetMap data...")
analysis.get_osm_data()
print("\n5. Processing OSM features...")
analysis.process_osm_features()

1. Collecting charging station data...
Fetching fresh data from OpenChargeMap...
Saved 1000 charging stations

2. Collecting census data...
Saved census data for 235 tracts

3. Collecting traffic data...
Saved traffic data for 31 locations

4. Collecting OpenStreetMap data...
Fetching city boundary...
Downloading street network...
Downloading points of interest...
Fetching parking locations...
Fetching restaurant locations...
Fetching shopping_mall locations...
Fetching fuel locations...
Fetching commercial locations...
Fetching retail locations...
Fetching office locations...
Fetching supermarket locations...
Fetching mall locations...
Fetching fitness_centre locations...
Fetching sports_centre locations...
Fetching motorway locations...
Fetching trunk locations...
Fetching primary locations...
Fetching secondary locations...
Saved 4237 POIs to /content/drive/MyDrive/ev_charging_analysis/osm_san_francisco.csv

5. Processing OSM features...
Processed 100/2500 points...
Processed 200/25

Unnamed: 0,latitude,longitude,nearby_parking,nearby_restaurant,nearby_fuel,nearby_commercial,nearby_retail,nearby_office,nearby_supermarket,nearby_mall,nearby_fitness_centre,nearby_sports_centre
0,37.707918,-122.514021,0,0,0,0,0,0,0,0,0,0
1,37.707918,-122.510887,0,0,0,0,0,0,0,0,0,0
2,37.707918,-122.507754,0,0,0,0,0,0,0,0,0,0
3,37.707918,-122.504620,0,0,0,0,0,0,0,0,0,0
4,37.707918,-122.501487,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...
2495,37.829249,-122.373009,0,0,0,0,0,0,0,0,0,0
2496,37.829249,-122.369875,0,0,0,0,0,0,0,0,0,0
2497,37.829249,-122.366742,0,0,0,0,0,0,0,0,0,0
2498,37.829249,-122.363608,0,0,0,0,0,0,0,0,0,0


In [6]:
# Create visualization
print("\n6. Creating visualization...")
map_viz = analysis.visualize_data()
if map_viz:
    display(map_viz)


6. Creating visualization...
Saved interactive map to /content/drive/MyDrive/ev_charging_analysis/ev_analysis_map_san_francisco.html


In [7]:
# Print summary statistics
print("\nSummary Statistics:")
if hasattr(analysis, 'charging_stations'):
    print(f"Total charging stations: {len(analysis.charging_stations)}")
if hasattr(analysis, 'census_data'):
    print(f"Average household income: ${analysis.census_data['median_household_income'].mean():,.2f}")
    print(f"Total population: {analysis.census_data['total_population'].sum():,.0f}")
if hasattr(analysis, 'traffic_data'):
    print(f"Average congestion level: {analysis.traffic_data['congestion_level'].mean():.1f}%")


Summary Statistics:
Total charging stations: 1000
Average household income: $141,218.76
Total population: 844,064
Average congestion level: 115.2%
