# Packages

In [None]:
# use environment.yml to avoid compatibility issues

In [None]:
import os
import fiona
import networkx as nx
import osmnx as ox
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt

import folium
from folium import GeoJson, LayerControl
from branca.colormap import linear

# Functions

In [None]:
def tag_flooded_roads(edges, nodes, flood_zones, layer):
    output_path = flood_cut_roads[layer]
    graphml_path = flood_safe_roads[layer]

    if os.path.exists(output_path) and layer in fiona.listlayers(output_path):
        print(f"Loading {layer} from {output_path}")
        edges = gpd.read_file(output_path, layer=layer)
    else:
        print(f"Tagging and saving {layer} to {output_path}")

        bounds = edges.total_bounds
        flood_subset = flood_zones.cx[bounds[0]:bounds[2], bounds[1]:bounds[3]]
        flood_geoms = flood_subset.geometry

        edges = edges.copy()
        edges["in_flood_zone"] = edges.geometry.apply(lambda geom: flood_geoms.intersects(geom).any())

        edges.to_file(output_path, layer=layer, driver="GPKG")

        
    # Filter out flooded edges
    if os.path.exists(graphml_path):
        print("Pruned graph already exists")
    else:    
        safe_edges = edges[~edges["in_flood_zone"]].copy()
        
        # Rebuild graph from safe edges
        print("Rebuilding pruned graph...")
        G_safe = ox.graph_from_gdfs(nodes, safe_edges)
        ox.save_graphml(G_safe, filepath=graphml_path)
        print(f"Saved pruned graph to {graphml_path}")

    return edges

In [None]:
def load_or_clip_flood_zone(return_crs, layer, source_path, clip_geom):
    output_path = flood_zones[layer]
    
    # Load layer if it exists
    if os.path.exists(output_path) and layer in fiona.listlayers(output_path):
        print(f"Loading {layer} from {output_path}")
        clipped = gpd.read_file(output_path, layer=layer).to_crs(return_crs)

    else:
        # Clip and save the original
        print(f"Clipping and saving {layer} from {output_path}" )
        flood = gpd.read_file(source_path).to_crs(return_crs)
        clipped = gpd.clip(flood, clip_geom)
        clipped.to_file(output_path, layer=layer, driver="GPKG")

    return clipped

In [None]:
def make_geojson_safe(gdf):
    gdf = gdf.copy()
    dt_cols = gdf.select_dtypes(include=['datetime64[ns]', 'datetime64[ns, UTC]']).columns
    gdf[dt_cols] = gdf[dt_cols].astype(str)
    for col in gdf.columns:
        if col != "geometry" and not pd.api.types.is_scalar(gdf[col].iloc[0]):
            gdf.drop(columns=[col], inplace=True)
    return gdf

In [None]:
def add_flood_zones_layer(gdf, name, color):
    if gdf.crs.to_epsg() != 4326:
        gdf = gdf.to_crs(epsg=4326)

    gdf_serializable = make_geojson_safe(gdf)

    style_function = lambda x: {
        'fillColor': color,
        'color': color,
        'weight': 1,
        'fillOpacity': 0.4
    }

    geojson = folium.GeoJson(
        data=gdf_serializable,
        name=f"Flood {name}",
        style_function=style_function,
        show=False
    )
    geojson.add_to(m)

In [None]:
def add_cut_roads_layer(edges, name, color):
    flooded = edges[edges["in_flood_zone"] == True]
    flooded = flooded.to_crs(epsg=4326)
    flooded = make_geojson_safe(flooded)

    style_function = lambda x: {
        'color': color,
        'weight': 2,
        'opacity': 0.8
    }

    geojson = folium.GeoJson(
        flooded,
        name=f"Flooded Roads {name}",
        style_function=style_function,
        show=False
    )
    geojson.add_to(m)

In [None]:
def parse_depth_range(val):
    if pd.isna(val):
        return None

    val = val.strip()

    if val.startswith('Below'):
        return float(val[5:].strip()) / 2

    if val.startswith('>'):
        return float(val[1:].strip())  # You may want to cap it

    if '-' in val:
        parts = val.split('-')
        try:
            low = float(parts[0].strip())
            high = float(parts[1].strip())
            return (low + high) / 2
        except:
            return None

    try:
        return float(val)
    except:
        return None

# Street data

In [None]:
output_dir = "processed_files"
os.makedirs(output_dir, exist_ok=True)

In [None]:
ox.settings.use_cache = True
ox.settings.log_console = True

In [None]:
polygon_path = os.path.join(output_dir, "study_area.geojson")

if os.path.exists(polygon_path):
    print("Loading saved province polygon...")
    study_area = gpd.read_file(polygon_path)
else:
    print("Downloading province polygon...")
    study_area = ox.geocode_to_gdf("Provincia de València, Comunitat Valenciana, España")
    study_area.to_file(polygon_path, driver="GeoJSON")

polygon = study_area.geometry.values[0]

In [None]:
graph_path = os.path.join(output_dir, "study_area_roads.graphml")

if os.path.exists(graph_path):
    print("Loading saved road network graph...")
    G = ox.load_graphml(graph_path)
else:
    print("Downloading road network graph...")
    G = ox.graph_from_polygon(polygon, network_type="drive")
    ox.save_graphml(G, filepath=graph_path)
    
nodes, edges = ox.graph_to_gdfs(G)
del G

In [None]:
rail=True
if rail:
    graph_path_rail = os.path.join(output_dir, "study_area_rail.graphml")
    if os.path.exists(graph_path_rail):
        print("Loading saved rail network graph...")
        G_rail = ox.load_graphml(graph_path_rail)
    else:
        print("Downloading rail network graph...")
        rail_filter = '["railway"~"rail|light_rail|subway|tram"]'
        G_rail = ox.graph_from_polygon(polygon, custom_filter=rail_filter, network_type="all")
        ox.save_graphml(G_rail, filepath=graph_path_rail)

    nodes_rail, edges_rail = ox.graph_to_gdfs(G_rail)
    del G_rail

# Layers

In [None]:
flood_zones = {
    "10 yr": f"{output_dir}/flood_risk_zones.gpkg",
    "100 yr": f"{output_dir}/flood_risk_zones.gpkg",
    "500 yr": f"{output_dir}/flood_risk_zones.gpkg",
    "DANA": f"{output_dir}/DANA_zones.gpkg",
    "DANA depth": f"{output_dir}/DANA_depths.gpkg"
}

flood_cut_roads = {
    "10 yr": f"{output_dir}/flood_risk_cut_roads.gpkg",
    "100 yr": f"{output_dir}/flood_risk_cut_roads.gpkg",
    "500 yr": f"{output_dir}/flood_risk_cut_roads.gpkg",
    "DANA": f"{output_dir}/DANA_cut_roads.gpkg",
}

flood_safe_roads = {
    "10 yr": f"{output_dir}/flood_risk_10_safe_roads.graphml",
    "100 yr": f"{output_dir}/flood_risk__100_safe_roads.graphml",
    "500 yr": f"{output_dir}/flood_risk__500_safe_roads.graphml",
    "DANA": f"{output_dir}/DANA_safe_roads.graphml",
}

# Floodable zones

In [None]:
flood_risk_zones_10  = load_or_clip_flood_zone(edges.crs, "10 yr", "source_files/laminaspb-q10/Q10_2Ciclo_PB_20241121.shp", polygon)
flood_risk_zones_100 = load_or_clip_flood_zone(edges.crs, "100 yr", "source_files/laminaspb-q100/Q100_2Ciclo_PB_20241121_ETRS89.shp", polygon)
flood_risk_zones_500 = load_or_clip_flood_zone(edges.crs, "500 yr", "source_files/laminaspb-q500/Q500_2Ciclo_PB_20241121_ETRS89.shp", polygon)

In [None]:
edges_with_flood_10 = tag_flooded_roads(edges, nodes, flood_risk_zones_10, "10 yr")
edges_with_flood_100 = tag_flooded_roads(edges, nodes, flood_risk_zones_100, "100 yr")
edges_with_flood_500 = tag_flooded_roads(edges, nodes, flood_risk_zones_500, "500 yr")

# Flooded Area

In [None]:
output_dir = "processed_files"
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Load DANA flood zones (update paths with your source shapefiles)
flood_zones_DANA  = load_or_clip_flood_zone(edges.crs, "DANA", "source_files/EMSR773_AOI01_DEL_PRODUCT_v1/EMSR773_AOI01_DEL_PRODUCT_observedEventA_v1.shp", polygon)

In [None]:
edges_with_flood_DANA = tag_flooded_roads(edges, nodes, flood_zones_DANA, "DANA")

In [None]:
layer="DANA depth"
output_path = flood_zones[layer]

if os.path.exists(output_path) and layer in fiona.listlayers(output_path):
    print(f"Loading {layer} from {output_path}")
    DANA_flood_depth=gpd.read_file(output_path, layer=layer)
else:
    print(f"Saving {layer} to {output_path}")
    DANA_flood_depth = gpd.read_file("source_files/EMSR773_AOI01_DEL_PRODUCT_v1/EMSR773_AOI01_DEL_PRODUCT_floodDepthA_v1.shp")
    DANA_flood_depth["depth_val"] = DANA_flood_depth["value"].apply(parse_depth_range)
    DANA_flood_depth.to_file(output_path, layer=layer, driver="GPKG")
    print(f"Saved processed {layer} in {output_path}")

# Interactive Visualization

In [None]:
# Set initial position
projected = study_area.to_crs(epsg=25830)
centroid_projected = projected.geometry.centroid.iloc[0]
centroid_latlon = gpd.GeoSeries([centroid_projected], crs=25830).to_crs(epsg=4326).geometry.iloc[0]
map_center = [centroid_latlon.y, centroid_latlon.x]

# Create map and define layer colors
m = folium.Map(location=map_center, zoom_start=10, tiles="CartoDB positron", max_bounds=True)

# Calculate bounds of the study area in WGS84
bounds_wgs84 = study_area.to_crs(epsg=4326).total_bounds
# Format as [[southwest], [northeast]]
map_bounds = [[bounds_wgs84[1], bounds_wgs84[0]], [bounds_wgs84[3], bounds_wgs84[2]]]

# Create map centered initially (still needed for initialization)
m = folium.Map(location=map_center, zoom_start=10, tiles="CartoDB positron")

# After adding all your layers, zoom to bounds
m.fit_bounds(map_bounds)


In [None]:
flood_colors = {
    "10 yr": "#56B4E9",   # Sky Blue
    "100 yr": "#009E73",  # Bluish Green
    "500 yr": "#E69F00",  # Orange
    "DANA": "#CC79A7"  # Reddish Purple
}

# Add flood zones
add_flood_zones_layer(flood_risk_zones_10, "10 yr", flood_colors["10 yr"])
add_flood_zones_layer(flood_risk_zones_100, "100 yr", flood_colors["100 yr"])
add_flood_zones_layer(flood_risk_zones_500, "500 yr", flood_colors["500 yr"])
add_flood_zones_layer(flood_zones_DANA, "DANA", flood_colors["DANA"])
    
# Add flooded roads (optional)
add_cut_roads_layer(edges_with_flood_10, "10 yr", flood_colors["10 yr"])
add_cut_roads_layer(edges_with_flood_100, "100 yr", flood_colors["100 yr"])
add_cut_roads_layer(edges_with_flood_500, "500 yr", flood_colors["500 yr"])
add_cut_roads_layer(edges_with_flood_DANA, "DANA", flood_colors["DANA"])
    


In [None]:
depth_map = True
if depth_map:
    min_depth = DANA_flood_depth["depth_val"].min()
    max_depth = DANA_flood_depth["depth_val"].max()
    depth_colormap = linear.YlGnBu_09.scale(min_depth, max_depth)
    depth_colormap.caption = 'Flood Depth (m)'

    folium.GeoJson(
        DANA_flood_depth,
        name="DANA flood depth",
        style_function=lambda feature: {
            'fillColor': depth_colormap(feature['properties']['depth_val']),
            'color': 'black',
            'weight': 0.5,
            'fillOpacity': 0.7
        },
        tooltip=folium.GeoJsonTooltip(fields=["depth_val"], aliases=["Depth (m):"])
    ).add_to(m)

    depth_colormap.add_to(m)

In [None]:
# Add toggle control
folium.LayerControl(collapsed=False).add_to(m)

# Save to file
m.save("processed_files/flood_map_valencia.html")

# Old Code