In [None]:
pip install osmnx geopandas requests descartes contextily tqdm

In [None]:
import numpy as np
import pandas as pd 
import geopandas as gpd
import matplotlib.pyplot as plt
import osmnx as ox
import networkx as nx
from descartes import PolygonPatch
from shapely.geometry import Point, Polygon, MultiPolygon, LineString
from tqdm.auto import tqdm
import shapely.speedups
shapely.speedups.enable() 
import warnings
warnings.filterwarnings('ignore')
from matplotlib.patches import PathPatch
from matplotlib.path import Path


In [None]:
PLACE_NAME = 'Florence, Italy'
name = 'Florence'
grid_size = 500

In [None]:
florence = ox.geocode_to_gdf(PLACE_NAME)
print(florence.crs)
# changing CRS for grid division
florence = ox.project_gdf(florence) 
print(florence.crs)

# Plot and store the axis object
ax = florence.plot()

# Customize the font size of the tick labels on the axes
ax.tick_params(axis='both', which='major', labelsize=8)

# Display the plot
plt.show()

# Print the type of geometry for your information
geometry = florence['geometry'].iloc[0]
print(type(geometry))

In [None]:
#save geometry
#florence.to_file(f"D:/charging_stations/task2/map/{florence}.shp")

Division into grid

In [None]:
geometry_cut = ox.utils_geo._quadrat_cut_geometry(geometry, quadrat_width=grid_size) #500 Meter grid
print(type(geometry_cut))
polylist = [poly for poly in geometry_cut.geoms]

In [None]:
import shapely.geometry as sg
polylist = [poly for poly in geometry_cut.geoms]
# plot city
west, south, east, north = florence.unary_union.bounds

# Manual patch creation
fig, ax = plt.subplots(figsize=(40, 40))
for polygon in geometry_cut.geoms:
    if isinstance(polygon, sg.Polygon) and not polygon.is_empty:
        verts = np.array(polygon.exterior.coords)
        codes = np.full(len(verts), Path.LINETO)
        codes[0] = Path.MOVETO
        codes[-1] = Path.CLOSEPOLY
        path = Path(verts, codes)
        patch = PathPatch(path, facecolor='white', edgecolor='black', alpha=0.5)
        ax.add_patch(patch)

ax.set_xlim(west, east)
ax.set_ylim(south, north)
ax.axis('off')
plt.show()

In [None]:
florence_polyframe = gpd.GeoDataFrame(geometry=polylist)
florence_polyframe.crs = florence.crs
print(florence_polyframe.crs)
florence_polyframe.head()

In [None]:
import contextily as ctx
# ctx uses epsg:3857ax = city.plot()
polyframe_3857 = florence_polyframe.to_crs(epsg=3857)
west, south, east, north = polyframe_3857.unary_union.bounds

ax = polyframe_3857.plot(figsize=(40,40), alpha=0.5, edgecolor='k')
ctx.add_basemap(ax, zoom=13)
ax.set_xlim(west, east)
ax.set_ylim(south, north)

## POIs

In [None]:
florence_polyframe_lat_long = ox.project_gdf(florence_polyframe, to_latlong=True) #Changing CRS to (Lat, long)
florence_polyframe_lat_long.head()

# EV Stations

In [None]:
EV_stations = ox.geometries_from_place(
    PLACE_NAME,
    {"amenity": "charging_station"},
)

In [None]:
EV_stations.head()

In [None]:
#plot EV stations geo data
EV_stations.plot(figsize=(10,10))
plt.title("Florence EV stations")

In [None]:
fig, ax = plt.subplots(figsize=(10,10))
ox.project_gdf(florence, to_latlong=True).plot(ax =ax, alpha=0.5, edgecolor='k')
EV_stations.plot(ax=ax, color='green', marker='*', markersize=10)
#add legend
plt.legend(['EV stations'])
plt.title("Florence EV stations")

In [None]:
#intersection of points and polygons
import shapely.speedups
shapely.speedups.enable() 

masks_EV = []
for i in tqdm(range(0,florence_polyframe.shape[0])):
    pip_mask = EV_stations.within(florence_polyframe_lat_long.loc[i, 'geometry']) 
    masks_EV.append(pip_mask)

In [None]:
# create a new geodataframe with mask EV stations 
EV_stations_masked = gpd.GeoDataFrame()
for i in tqdm(range(0,florence_polyframe.shape[0])):
    EV_stations_masked = pd.concat([EV_stations_masked, EV_stations[masks_EV[i]][['geometry']]], ignore_index=True)

In [None]:
florence_polyframe_lat_long_new = florence_polyframe_lat_long.copy()
florence_polyframe_lat_long_new['EV_stations_counts'] = 0
florence_polyframe_lat_long_new['EV_stations_geomery'] = 0

In [None]:
for i in tqdm(range(0,florence_polyframe_lat_long_new.shape[0])):
    florence_polyframe_lat_long_new.loc[i, 'EV_stations_counts'] = EV_stations[masks_EV[i]].shape[0]
    florence_polyframe_lat_long_new.loc[i, 'EV_stations_geomery'] = str(EV_stations[masks_EV[i]][['geometry']].values.tolist())

In [None]:
#How many EV stations in each grid
florence_polyframe_lat_long_new['EV_stations_counts'].value_counts()
#Percentage of EV stations in each grid
florence_polyframe_lat_long_new['EV_stations_counts'].value_counts() / florence_polyframe_lat_long_new.shape[0]

In [None]:
florence_polyframe_lat_long_new.head(5)

# Population data visualize and add to each grid

In [None]:
df_pop = pd.read_csv("/Users/claudiacortese/Desktop/ita_general_2020.csv")
gdf = gpd.GeoDataFrame(df_pop, geometry=gpd.points_from_xy(df_pop.longitude, df_pop.latitude))

In [None]:
df_pop.head()

In [None]:
#extract population data for Florence
florence.geometry[0]

In [None]:
# change crs to lat long
florence_lat_long = ox.project_gdf(florence, to_latlong=True)
pip_mask = gdf.within(florence_lat_long.loc[0, 'geometry'])

In [None]:
florence_pop_gdf = gdf.loc[pip_mask]

In [None]:
#plot population data with cmap
florence_pop_gdf.plot(figsize=(10,10))

In [None]:
florence_pop_gdf.to_csv(f"/Users/claudiacortese/Desktop/{name}-population.csv")

In [None]:
# population for each grid
masks_pop = []
for i in tqdm(range(0,florence_polyframe_lat_long.shape[0])):
    pip_mask = florence_pop_gdf.within(florence_polyframe_lat_long.loc[i, 'geometry'])
    masks_pop.append(pip_mask)

In [None]:
pop_grid = []
for i in range(0,florence_polyframe_lat_long.shape[0]):
    pop_grid.append(florence_pop_gdf.loc[masks_pop[i]].ita_general_2020.mean())

In [None]:
pop_grid = np.nan_to_num(pop_grid)

In [None]:
florence_polyframe_lat_long_new['population'] = pop_grid
florence_polyframe_lat_long_new.head()

In [None]:
plt.rcParams.update({'font.size':25})
west, south, east, north = florence_polyframe_lat_long_new.unary_union.bounds

fig, ax = plt.subplots(figsize=(20,20))
florence_polyframe_lat_long_new.plot(ax=ax, column = 'population', legend=False, cmap='magma')
# add EV stations
EV_stations.plot(ax=ax, color='green', markersize=30, marker='*', label='EV stations')
# add legend
plt.legend(['EV stations'])
plt.title(f'{PLACE_NAME} EV stations and population')
cbax = fig.add_axes([0.915, 0.175, 0.02, 0.7])

sm = plt.cm.ScalarMappable(cmap='magma', \
                          norm = plt.Normalize(vmin=min(florence_polyframe_lat_long_new.population), vmax=max(florence_polyframe_lat_long_new.population)))
sm._A = []
# draw colormap into cbax
fig.colorbar(sm, cax=cbax, format="%d")
ax.set_xlim(west, east)
ax.set_ylim(south, north)
# ax.axis('off')30
plt.show()

# Land Use

In [None]:
landuse = ox.geometries_from_place(PLACE_NAME, tags={'landuse': True})

# Define the categories
categories = {
    'residential': ['residential'],
    'commercial': ['commercial', 'retail', 'office', 'industrial'],
    'retail': ['retail'],
    'industrial': ['industrial']
}

# Create a new column to categorize the landuse types
landuse['category'] = 'Other'

# Iterate through the categories and assign the corresponding landuse types
for category, tags in categories.items():
    landuse.loc[landuse['landuse'].isin(tags), 'category'] = category

In [None]:
landuse.to_csv(f"/Users/claudiacortese/Desktop/{name}-landuse.csv")

In [None]:
#plot node in landuse
fig, ax = plt.subplots(figsize=(20,20))
landuse.plot(ax=ax, column = 'category', legend=True, cmap='magma')
plt.title(f'{PLACE_NAME} landuse')
plt.show()

In [None]:
#create a copy of landuse drop everything except Category and geometry
landuse_cat = landuse[['category', 'geometry']].copy()

In [None]:
import shapely.speedups
shapely.speedups.enable()

In [None]:
masks_land_use = []
for i in tqdm(range(0,florence_polyframe_lat_long.shape[0])):
    pip_mask = landuse_cat.within(florence_polyframe_lat_long.loc[i, 'geometry']) 
    masks_land_use.append(pip_mask)
    assert pip_mask.shape[0] == landuse_cat.shape[0]

In [None]:
land_labels = []
for i in range(0,florence_polyframe_lat_long.shape[0]):
    if landuse_cat[masks_land_use[i]].category.value_counts().isnull().all() == True:
        land_labels.append('Other')
    else:
        land_labels.append(landuse_cat[masks_land_use[i]].category.value_counts().index[0])

In [None]:
florence_polyframe_lat_long_new['landuse'] = land_labels

In [None]:
florence_polyframe_lat_long_new.landuse.value_counts()

In [None]:
plt.rcParams.update({'font.size':25})
west, south, east, north = florence_polyframe_lat_long_new.unary_union.bounds

fig, ax = plt.subplots(figsize=(20,20))
florence_polyframe_lat_long_new.plot(ax=ax, column = 'landuse', legend=False, cmap='magma')
# add EV stations
EV_stations.plot(ax=ax, color='green', markersize=30, marker='*', label='EV stations')
ax.legend()
# add legend
plt.title(f'{PLACE_NAME} and Landuse')
ax.set_xlim(west, east)
ax.set_ylim(south, north)
# ax.axis('off')30
plt.show()

In [None]:
florence_polyframe_lat_long_new.head()

## Road Connectivity

In [None]:
G = ox.graph_from_place(PLACE_NAME, network_type='drive')
ox.plot_graph(G, node_size= 0)

In [None]:
area,edges = ox.graph_to_gdfs(G)

In [None]:
#calculate number of edges and nodes for each grid
nodes = []
edges = []
density = []
for i in tqdm(range(0,florence_polyframe_lat_long.shape[0])):
    try: 
        poly = florence_polyframe_lat_long.loc[i, 'geometry']
        G = ox.graph_from_polygon(poly, network_type='drive')
        nodes.append(len(list(G.nodes())))
        edges.append(len(list(G.edges())))
        density.append(nx.density(G))
    except:
        nodes.append(0)
        edges.append(0)
        density.append(0)

In [None]:
#add nodes, edges and density to the dataframe
florence_polyframe_lat_long_new['nodes'] = nodes
florence_polyframe_lat_long_new['edges'] = edges
florence_polyframe_lat_long_new['density'] = density

In [None]:
# relation between EVCSs and nodes
plt.rcParams.update({'font.size':25})
west, south, east, north = florence_polyframe_lat_long_new.unary_union.bounds

#nodes
fig, ax = plt.subplots(figsize=(20,20))
florence_polyframe_lat_long_new.plot(ax=ax, column = 'nodes', legend=False, cmap='magma')

# add EV stations
EV_stations.plot(ax=ax, color='green', markersize=10, marker='*', label='EV stations')

ax.legend()
plt.title(f'{PLACE_NAME} and Nodes(Road-connectivity)')
ax.set_xlim(west, east)
ax.set_ylim(south, north)
plt.show()

In [None]:
plt.rcParams.update({'font.size':25})
west, south, east, north = florence_polyframe_lat_long_new.unary_union.bounds

#edges
fig, ax = plt.subplots(figsize=(20,20))
florence_polyframe_lat_long_new.plot(ax=ax, column = 'edges', legend=False, cmap='magma')

# add EV stations
EV_stations.plot(ax=ax, color='green', markersize=10, marker='*', label='EV stations')

ax.legend()
plt.title(f'{PLACE_NAME} and edges(Road-connectivity)')
ax.set_xlim(west, east)
ax.set_ylim(south, north)
plt.show()

In [None]:
plt.rcParams.update({'font.size':25})
west, south, east, north = florence_polyframe_lat_long_new.unary_union.bounds

#density
fig, ax = plt.subplots(figsize=(20,20))
florence_polyframe_lat_long_new.plot(ax=ax, column = 'density', legend=False, cmap='magma')

# add EV stations
EV_stations.plot(ax=ax, color='green', markersize=10, marker='*', label='EV stations')

ax.legend()
plt.title(f'{PLACE_NAME} and Road Density(Road-connectivity))')
ax.set_xlim(west, east)
ax.set_ylim(south, north)
plt.show()

In [None]:
florence_polyframe_lat_long_new.head()

# POIs

In [None]:
import osmnx as ox
import pandas as pd
from tqdm import tqdm

def count_features_in_grids(place_name, features, city_polyframe, dataframe_name):
    for feature_key, feature_list in features.items():
        for feature in feature_list:
            try:
                # Fetch geometries from OpenStreetMap
                geometries = ox.geometries_from_place(
                    place_name,
                    {feature_key: feature}
                )
                # Export to CSV if geometries are found
                if not geometries.empty:
                    geometries.to_csv(f"/Users/claudiacortese/Desktop/Florence-{feature}.csv")

                    # Create masks for each grid
                    masks = []
                    for i in tqdm(range(0, city_polyframe.shape[0]), desc=f"Processing masks for {feature}"):
                        pip_mask = geometries.within(city_polyframe.loc[i, 'geometry'])
                        masks.append(pip_mask)

                    # Count features in each grid
                    feature_counts = []
                    for i in tqdm(range(0, city_polyframe.shape[0]), desc=f"Counting {feature}"):
                        feature_counts.append(geometries[masks[i]].shape[0])

                    # Add counts to the dataframe
                    dataframe_name[f'{feature}_count'] = feature_counts
                else:
                    print(f"No data found for {feature_key}='{feature}' in {place_name}.")
            except Exception as e:
                print(f"Failed to process {feature_key}='{feature}' in {place_name}: {str(e)}")

In [None]:
features = {
    'amenity': ['school', 'university', 'restaurant', 'place_of_worship', 
                'community_centre', 'townhall', 'parking', 'library'],
    'leisure': ['park', 'cinema'],
    'building': ['commercial', 'government', 'civic', 'retail']
}

city_polyframe = florence_polyframe_lat_long_new

count_features_in_grids(PLACE_NAME, features, city_polyframe, florence_polyframe_lat_long_new)

In [None]:
florence_polyframe_lat_long_new.head().T

## Traffic

In [None]:
# Fetch road network data
florence_graph = ox.graph_from_place(PLACE_NAME, network_type='drive')

# Visualize the road network
ox.plot_graph(florence_graph)

In [None]:
# Convert the graph to edge GeoDataFrame
edges = ox.graph_to_gdfs(florence_graph, nodes=False, edges=True)

In [None]:
selected_columns = edges[['osmid', 'oneway', 'lanes', 'highway', 'maxspeed', 
                          'geometry']]

In [None]:
selected_columns.head()

In [None]:
# Drop rows where any cell in that row is NA
selected_columns = selected_columns.dropna()

In [None]:
# Convert road network to a projected CRS
road_network = selected_columns.to_crs(epsg=32633)

# Also ensure your polygons are in the same CRS
florence_polyframe_lat_long_new = florence_polyframe_lat_long_new.to_crs(epsg=32633)

In [None]:
# Calculate centroids safely
road_centers = road_network.centroid

In [None]:
def check_one_way(polygon, roads):
    # Recalculate centroids here if necessary
    road_centers = roads.centroid  # Ensure 'roads' is already in a projected CRS
    within_mask = road_centers.within(polygon)
    one_way_roads = roads[within_mask & roads['oneway']]
    return 'Yes' if not one_way_roads.empty else 'No'

# Apply the function
florence_polyframe_lat_long_new['oneway_exists'] = florence_polyframe_lat_long_new['geometry'].apply(check_one_way, roads=road_network)

In [None]:
florence_polyframe_lat_long_new.head().T

In [None]:
def check_highways(polygon, roads):
    # Calculate centroids for the roads and check if they fall within the given polygon
    road_centers = roads.centroid
    within_mask = road_centers.within(polygon)
    highways_within = roads.loc[within_mask, 'highway']

    # Flatten the list of highways if they are stored in lists within cells
    flattened_highways = []
    for item in highways_within.dropna():
        if isinstance(item, list):
            flattened_highways.extend(item)  # Extend the flat list with elements of the sublist
        else:
            flattened_highways.append(item)  # Append the item directly if it's not a list

    # Create a set of unique highway types, removing duplicates and handling unhashable types
    unique_highways = set(flattened_highways)
    return unique_highways if unique_highways else 'None'

In [None]:
florence_polyframe_lat_long_new['highway_types'] = florence_polyframe_lat_long_new['geometry'].apply(check_highways, roads=road_network)

In [None]:
florence_polyframe_lat_long_new.head().T

In [None]:
def aggregate_lanes(polygon, roads):
    road_centers = roads.centroid
    within_mask = road_centers.within(polygon)
    lanes_within = roads.loc[within_mask, 'lanes'].dropna()
    # Convert data to numeric, because 'lanes' might be stored as strings
    lanes_within = pd.to_numeric(lanes_within, errors='coerce')
    return lanes_within.mean()  # You could also consider sum, max, or other aggregations

In [None]:
florence_polyframe_lat_long_new['average_lanes'] = florence_polyframe_lat_long_new['geometry'].apply(aggregate_lanes, roads=road_network)

In [None]:
def aggregate_maxspeed(polygon, roads):
    road_centers = roads.centroid
    within_mask = road_centers.within(polygon)
    maxspeed_within = roads.loc[within_mask, 'maxspeed'].dropna()
    
    # Process each maxspeed entry, whether list or string, to get numeric values
    def process_maxspeed_entry(entry):
        if isinstance(entry, list):
            # Assuming the list has a string with the first element as the speed
            return pd.to_numeric(entry[0].split(' ')[0], errors='coerce')
        elif isinstance(entry, str):
            return pd.to_numeric(entry.split(' ')[0], errors='coerce')
        else:
            return pd.NA  # Pandas' NA for missing type-aware handling

    # Apply processing function to each item in the series
    maxspeed_numeric = maxspeed_within.apply(process_maxspeed_entry)
    return maxspeed_numeric.mean()  # Calculate mean of the cleaned, numeric maxspeed values

In [None]:
florence_polyframe_lat_long_new['average_maxspeed'] = florence_polyframe_lat_long_new['geometry'].apply(aggregate_maxspeed, roads=road_network)

In [None]:
florence_polyframe_lat_long_new['city'] = 'Florence'

In [None]:
florence_polyframe_lat_long_new.head().T

In [None]:
# Save the updated DataFrame to a CSV file
florence_polyframe_lat_long_new.to_csv(r"D:\BIG_DATA\charging_stations\task_2\florence.csv", index=False)