In [None]:
import streamlit as st
import osmnx as ox
import networkx as nx
import numpy as np
import folium
import os
from folium.features import DivIcon
from streamlit_folium import folium_static
import matplotlib.cm as cm
import matplotlib.colors as colors
from scipy.ndimage import uniform_filter1d
import pandas as pd
import matplotlib.pyplot as plt
from typing import Dict, Tuple, List, Optional, Union, Any
from geopy.distance import geodesic
import heapq
from dataclasses import dataclass

In [None]:
# Configuration constants
MIN_DISTANCE_KM = 5
MAX_DISTANCE_KM = 150
DISTANCE_STEP = 5
SEARCH_RADIUS_BUFFER = 1.3  # Buffer multiplier for search radius
OPENTOPODATA_URL = "https://api.opentopodata.org/v1/eudem25m?locations={locations}"
BATCH_SIZE = 400
API_PAUSE = 0.1  # Pause between API calls in seconds
ROLLING_MEAN_WINDOW = 5  # Window size for smoothing grades


In [None]:
@dataclass
class ClimbSection:
    """Class to represent a climb section with relevant metrics."""
    edges: List[Tuple]  # List of (u, v, k) edge identifiers
    nodes: List  # List of node IDs that make up the path
    length_m: float  # Total length in meters
    elevation_gain_m: float  # Total elevation gain in meters
    avg_grade: float  # Average grade as percentage
    max_grade: float  # Maximum grade as percentage
    start_coords: Tuple[float, float]  # (lat, lon) of start
    end_coords: Tuple[float, float]  # (lat, lon) of end
    geometry: List[Tuple[float, float]]  # List of (lat, lon) coordinates along the path
    
    def get_score(self) -> float:
        """Calculate a score for ranking climbs based on length, grade and elevation gain."""
        # Score climbs by prioritizing longer, steeper sections with significant gain
        return (self.elevation_gain_m * (1 + self.avg_grade / 10))

class SteepClimbFinder:
    """Class to find steep, meaningful climbs within a specified area."""
    
    def __init__(self, 
                 location: str, 
                 radius_km: float, 
                 min_climb_length_m: float = 200, 
                 min_elevation_gain_m: float = 30,
                 min_avg_grade: float = 5.0,
                 max_results: int = 10):
        """
        Initialize the steep climb finder.
        
        Args:
            location: The starting location as a string
            radius_km: Search radius in kilometers
            min_climb_length_m: Minimum length of a climb in meters
            min_elevation_gain_m: Minimum elevation gain threshold in meters
            min_avg_grade: Minimum average grade to consider (percent)
            max_results: Maximum number of results to return
        """
        self.location = location
        self.radius_km = radius_km
        self.min_climb_length_m = min_climb_length_m
        self.min_elevation_gain_m = min_elevation_gain_m
        self.min_avg_grade = min_avg_grade
        self.max_results = max_results
        
        # Initialize with None; will be populated in fetch_network
        self.G = None
        self.center_point = None
        self.climbs = []
    
    def fetch_network(self) -> None:
        """Fetch the street network and add elevation data."""
        try:
            print(f"Fetching network around {self.location} with {self.radius_km}km radius...")
            
            # Get center point coordinates
            self.center_point = ox.geocode(self.location)
            
            # Convert radius to meters and fetch network
            radius_m = self.radius_km * 1000
            
            # For cycling network (prioritizing roads over paths)
            self.G = ox.graph_from_point(
                self.center_point, 
                dist=radius_m, 
                network_type="drive",  # Use "drive" to focus on roads
                simplify=True
            )
            
            print(f"Network fetched with {len(self.G.nodes)} nodes and {len(self.G.edges)} edges")
            
            # Add elevation data
            try:
                # Try local elevation service first if available
                local_elevation_url = "http://localhost:5001/v1/eudem25m?locations={locations}"
                
                print("Adding elevation data...")
                try:
                    original_elevation_url = ox.settings.elevation_url_template
                    ox.settings.elevation_url_template = local_elevation_url
                    
                    self.G = ox.elevation.add_node_elevations_google(
                        self.G, 
                        api_key=None,
                        batch_size=500,
                        pause=0.01
                    )
                    print("Added elevation data from local service")
                    local_success = True
                except Exception as e:
                    print(f"Local elevation service failed: {str(e)}. Trying public API...")
                    local_success = False
                    
                # If local service failed, try the public API
                if not local_success:
                    public_api_url = "https://api.opentopodata.org/v1/eudem25m?locations={locations}"
                    ox.settings.elevation_url_template = public_api_url
                    
                    self.G = ox.elevation.add_node_elevations_google(
                        self.G, 
                        api_key=None,
                        batch_size=100, 
                        pause=1.0
                    )
                    print("Added elevation data from public API")
                
                # Calculate edge grades
                self.G = ox.elevation.add_edge_grades(self.G)
                
                # Verify elevation data quality
                elevations = [data.get('elevation', 0) for _, data in self.G.nodes(data=True)]
                if elevations:
                    min_elev = min(elevations)
                    max_elev = max(elevations)
                    elevation_range = max_elev - min_elev
                    
                    if elevation_range < 10.0:
                        print(f"Very flat terrain detected (elevation range: {elevation_range:.1f}m). " 
                                  f"May be difficult to find steep climbs.")
                    else:
                        print(f"Terrain elevation range: {min_elev:.1f}m to {max_elev:.1f}m " 
                               f"(range: {elevation_range:.1f}m)")
                
                # Restore original settings if modified
                if 'original_elevation_url' in locals():
                    ox.settings.elevation_url_template = original_elevation_url
                    
            except Exception as e:
                print(f"Error adding elevation data: {str(e)}")
                raise
                
        except Exception as e:
            print(f"Error fetching network: {str(e)}")
            import traceback
            print(f"Traceback: {traceback.format_exc()}")
            raise
        return self.G

        
    def find_climbs(self) -> List[ClimbSection]:
        """
        Find steep climbs that meet the criteria.
        
        Returns:
            List of ClimbSection objects representing the identified climbs
        """
        if self.G is None:
            print("Network not fetched. Please fetch network first.")
            return []
        
        print("Analyzing network for steep climbs...")
        
        # 1. Find uphill edges with sufficient grade
        steep_edges = []
        
        # First pass: identify candidate edges
        for u, v, k, data in self.G.edges(keys=True, data=True):
            # Skip edges without grade data
            if 'grade' not in data:
                continue
                
            grade = data['grade'] * 100
            length = data.get('length', 0)
            
            # Collect uphill edges meeting minimum grade
            if grade >= self.min_avg_grade and length > 0:
                # Store as (u, v, k) with metadata
                steep_edges.append((u, v, k, {
                    'grade': grade,
                    'length': length,
                    'elevation_gain': length * grade / 100
                }))
        
        # 2. Find connected components/sequences of steep edges
        # Sort edges by node ID to help identify connected segments
        steep_edges.sort()
        
        # Create a specialized graph of just the steep edges
        steep_graph = nx.DiGraph()
        
        # Add edges to the steep graph
        for u, v, k, data in steep_edges:
            steep_graph.add_edge(u, v, key=k, **data)
            
            # Add node attributes using data from original graph
            steep_graph.nodes[u].update({
                'x': self.G.nodes[u].get('x'),
                'y': self.G.nodes[u].get('y'),
                'elevation': self.G.nodes[u].get('elevation', 0)
            })
            steep_graph.nodes[v].update({
                'x': self.G.nodes[v].get('x'),
                'y': self.G.nodes[v].get('y'),
                'elevation': self.G.nodes[v].get('elevation', 0)
            })
        
        # 3. Identify climb paths using depth-first traversal
        climb_paths = []
        
        # Visit each node as a potential start of a climb
        for start_node in steep_graph.nodes():
            # Skip nodes with no outgoing edges
            if steep_graph.out_degree(start_node) == 0:
                continue
                
            # Perform depth-first traversal to find climb paths
            self._find_climb_paths_from_node(
                steep_graph, 
                start_node, 
                [], 
                0, 
                0, 
                0, 
                set(),
                climb_paths
            )
        
        # 4. Convert paths to ClimbSection objects
        climbs = []
        
        for path, data in climb_paths:
            # Skip if path doesn't meet minimum requirements
            if (data['length'] < self.min_climb_length_m or 
                data['elevation_gain'] < self.min_elevation_gain_m or
                data['avg_grade'] < self.min_avg_grade):
                continue
            
            # Get coordinates for visualization
            geometry = []
            try:
                # Try to get detailed geometry if available
                for i in range(len(path) - 1):
                    u, v = path[i], path[i+1]
                    edge_data = steep_graph[u][v]
                    
                    if 'geometry' in edge_data:
                        # LineString from OSMnx has (lon, lat) ordering
                        geom = edge_data['geometry']
                        # Convert to (lat, lon) for Folium
                        coords = [(lat, lon) for lon, lat in geom.coords]
                        geometry.extend(coords)
                    else:
                        # Fallback to node coordinates
                        geometry.append((steep_graph.nodes[u]['y'], steep_graph.nodes[u]['x']))
                        geometry.append((steep_graph.nodes[v]['y'], steep_graph.nodes[v]['x']))
            except Exception:
                # Fallback: use node coordinates directly
                geometry = [(steep_graph.nodes[n]['y'], steep_graph.nodes[n]['x']) for n in path]
            
            # Remove duplicate consecutive points
            clean_geometry = []
            for point in geometry:
                if not clean_geometry or point != clean_geometry[-1]:
                    clean_geometry.append(point)
            
            # Create start and end coordinates
            if clean_geometry:
                start_coords = clean_geometry[0]
                end_coords = clean_geometry[-1]
            else:
                # Fallback
                start_coords = (steep_graph.nodes[path[0]]['y'], steep_graph.nodes[path[0]]['x'])
                end_coords = (steep_graph.nodes[path[-1]]['y'], steep_graph.nodes[path[-1]]['x'])
            
            # Create edge identification tuples
            edges = []
            for i in range(len(path) - 1):
                u, v = path[i], path[i+1]
                # Get the key (k) for this edge
                for k in steep_graph[u][v]:
                    edges.append((u, v, k))
                    break
            
            # Create ClimbSection object
            climb = ClimbSection(
                edges=edges,
                nodes=path,
                length_m=data['length'],
                elevation_gain_m=data['elevation_gain'],
                avg_grade=data['avg_grade'],
                max_grade=data['max_grade'],
                start_coords=start_coords,
                end_coords=end_coords,
                geometry=clean_geometry
            )
            climbs.append(climb)
        
        # Sort climbs by score (descending)
        climbs.sort(key=lambda x: x.get_score(), reverse=True)
        
        # Limit to max_results
        self.climbs = climbs[:self.max_results]
        
        print(f"Found {len(self.climbs)} significant climbs")
        return self.climbs
    
    def _find_climb_paths_from_node(self, 
                                    graph: nx.DiGraph, 
                                    current: Any, 
                                    path: List, 
                                    length: float, 
                                    gain: float, 
                                    max_grade: float,
                                    visited: set,
                                    results: List) -> None:
        """
        Recursively find climb paths from a starting node using depth-first traversal.
        
        Args:
            graph: NetworkX graph of steep edges
            current: Current node
            path: Current path (list of nodes)
            length: Accumulated path length
            gain: Accumulated elevation gain
            max_grade: Maximum grade encountered
            visited: Set of visited nodes (to prevent cycles)
            results: List to collect results
        """
        # Add current node to path
        path = path + [current]
        visited = visited.copy()
        visited.add(current)
        
        # If path has more than one node, it's a potential climb
        if len(path) > 1:
            # Calculate average grade for the current path
            avg_grade = (gain / length * 100) if length > 0 else 0
            
            # Record as a potential climb if it meets minimum requirements
            if (length >= self.min_climb_length_m and 
                gain >= self.min_elevation_gain_m and 
                avg_grade >= self.min_avg_grade):
                
                # Store as (path, metadata)
                results.append((path.copy(), {
                    'length': length,
                    'elevation_gain': gain,
                    'avg_grade': avg_grade,
                    'max_grade': max_grade
                }))
        
        # Get neighbor nodes (outgoing edges)
        for neighbor in graph.successors(current):
            # Skip if already visited (avoid cycles)
            if neighbor in visited:
                continue
                
            # Get edge data
            edge_data = graph[current][neighbor]
            
            # OSMnx might have multiple edges between same nodes, get first one
            if isinstance(edge_data, dict) and not isinstance(edge_data, nx.classes.coreviews.AtlasView):
                # Single edge
                edge = edge_data
            else:
                # Multiple edges, get the one with highest grade
                edge = max(edge_data.values(), key=lambda x: x.get('grade', 0))
            
            # Get edge attributes
            edge_length = edge.get('length', 0)
            edge_grade = edge.get('grade', 0)
            edge_gain = edge_length * edge_grade / 100
            
            # Update max grade
            new_max_grade = max(max_grade, edge_grade)
            
            # Only continue if this edge is uphill
            if edge_grade >= self.min_avg_grade:
                # Recursively explore from neighbor
                self._find_climb_paths_from_node(
                    graph,
                    neighbor,
                    path,
                    length + edge_length,
                    gain + edge_gain,
                    new_max_grade,
                    visited,
                    results
                )
    
    def visualize_climbs(self) -> folium.Map:
        """
        Create a visualization of the identified climbs.
        
        Returns:
            folium.Map object with visualized climbs
        """
        if not self.climbs:
            print("No climbs to visualize. Run find_climbs() first.")
            return None
        
        # Find center point from original center or first climb
        if self.center_point:
            map_center = (self.center_point[0], self.center_point[1])
        elif self.climbs:
            map_center = self.climbs[0].start_coords
        else:
            # Fallback to a default center from network
            nodes_df = ox.graph_to_gdfs(self.G, edges=False)
            map_center = (nodes_df['y'].mean(), nodes_df['x'].mean())
        
        # Create map
        m = folium.Map(
            location=map_center,
            zoom_start=14,
            tiles="cartodbpositron"
        )
        
        # Create colormap for climbs
        cmap = cm.get_cmap('plasma', len(self.climbs))
        
        # Add circle marker for center point
        folium.CircleMarker(
            location=map_center,
            radius=10,
            color='blue',
            fill=True,
            fill_opacity=0.7,
            tooltip=f"Center: {self.location}"
        ).add_to(m)
        
        # Add circles to show search radius
        folium.Circle(
            location=map_center,
            radius=self.radius_km * 1000,  # Convert to meters
            color='blue',
            weight=2,
            fill=False,
            opacity=0.5,
            tooltip=f"Search radius: {self.radius_km}km"
        ).add_to(m)
        
        # Add each climb with a different color
        for i, climb in enumerate(self.climbs):
            # Generate color for this climb
            color = colors.rgb2hex(cmap(i / len(self.climbs)))
            
            # Add polyline for climb
            folium.PolyLine(
                climb.geometry,
                color=color,
                weight=5,
                opacity=0.8,
                tooltip=f"Climb {i+1}: {climb.length_m:.0f}m, {climb.avg_grade:.1f}%, {climb.elevation_gain_m:.0f}m gain"
            ).add_to(m)
            
            # Add markers for start and end
            folium.Marker(
                climb.start_coords,
                icon=folium.Icon(icon='play', color='green'),
                tooltip=f"Start of climb {i+1}"
            ).add_to(m)
            
            folium.Marker(
                climb.end_coords,
                icon=folium.Icon(icon='flag', color='red'),
                tooltip=f"End of climb {i+1}"
            ).add_to(m)
            
            # Add a label with climb number
            folium.map.Marker(
                climb.start_coords,
                icon=DivIcon(
                    icon_size=(20, 20),
                    icon_anchor=(0, 0),
                    html=f'<div style="font-size: 12pt; color: white; background-color: {color}; border-radius: 50%; width: 25px; height: 25px; text-align: center; line-height: 25px;">{i+1}</div>'
                )
            ).add_to(m)
        
        # Add legend
        legend_html = """
        <div style="position: fixed; bottom: 50px; left: 50px; z-index: 1000; background-color: white; 
                    padding: 10px; border: 2px solid grey; border-radius: 5px">
            <p><b>Climbs by Rank</b></p>
        """
        
        # Add entry for each climb
        for i, climb in enumerate(self.climbs):
            color = colors.rgb2hex(cmap(i / len(self.climbs)))
            legend_html += f"""
            <div style="display: flex; align-items: center; margin-bottom: 5px;">
                <span style="background-color: {color}; width: 20px; height: 20px; display: inline-block; margin-right: 5px;"></span>
                <span>#{i+1}: {climb.length_m:.0f}m @ {climb.avg_grade:.1f}% ({climb.elevation_gain_m:.0f}m gain)</span>
            </div>
            """
            
        legend_html += "</div>"
        m.get_root().html.add_child(folium.Element(legend_html))
        
        return m

In [None]:


# Steep Climb Finder Tab
st.markdown(
    """
    ## Steep Climb Finder
    Discover significant uphill segments within a specified radius. Perfect for finding the steepest, 
    most challenging climbs near you for training.
    """
)


# Location input
climb_location = 'Quai Gustave Ador 23, 1207 Genève'

# Search radius
radius_km = 8
min_climb_length_m = 200
min_elevation_gain_m = 10
min_avg_grade = 2
max_results = 5

In [None]:
climb_finder = SteepClimbFinder(
            location=climb_location,
            radius_km=radius_km,
            min_climb_length_m=min_climb_length_m,
            min_elevation_gain_m=min_elevation_gain_m,
            min_avg_grade=min_avg_grade,
            max_results=max_results
        )
        

In [None]:
G = climb_finder.fetch_network()

In [None]:
climbs = climb_finder.find_climbs()

In [None]:
climbs

In [None]:
grades = pd.Series([d["grade_abs"] for _, _, d in ox.convert.to_undirected(G).edges(data=True)])
grades = grades.replace([np.inf, -np.inf], np.nan).dropna()

In [None]:
grades

In [None]:
avg_grade = np.mean(grades) * 100

In [None]:
avg_grade

In [None]:
ec = ox.plot.get_edge_colors_by_attr(G, "grade_abs", cmap="plasma", num_bins=12, equal_size=True)
fig, ax = ox.plot.plot_graph(G, edge_color=ec, edge_linewidth=1, node_size=0, bgcolor="k")

In [None]:
steep_edges = []

for u, v, k, data in G.edges(keys=True, data=True):
    # Skip edges without grade data
    if 'grade' not in data:
        print('No grade in data')
        continue
        
    grade = data['grade'] *100
    length = data.get('length', 0)
    # Collect uphill edges meeting minimum grade
    if grade >= min_avg_grade and length > 0:
        # Store as (u, v, k) with metadata
        steep_edges.append((u, v, k, {
            'grade': grade,
            'length': length,
            'elevation_gain': length * grade / 100
        }))

In [None]:
steep_edges

In [None]:
min_avg_grade

In [None]:
climbs = climb_finder.find_climbs()

In [None]:

# Button to find climbs
if st.sidebar.button("Find Steep Climbs", type="primary", key="find_climbs_button"):
    if not climb_location:
        print("Please enter a valid location.")
        return
        
    try:
        # Initialize steep climb finder
        climb_finder = SteepClimbFinder(
            location=climb_location,
            radius_km=radius_km,
            min_climb_length_m=min_climb_length_m,
            min_elevation_gain_m=min_elevation_gain_m,
            min_avg_grade=min_avg_grade,
            max_results=max_results
        )
        
        # Fetch network and add elevation data
        with st.spinner("Fetching network and elevation data..."):
            climb_finder.fetch_network()
        
        # Find steep climbs
        with st.spinner("Analyzing network for steep climbs..."):
            climbs = climb_finder.find_climbs()
        
        if not climbs:
            print("No significant climbs found that match your criteria. Try adjusting parameters or choosing a different location.")
            return
            
        # Create a variable to track selected climb for map centering
        if 'selected_climb_index' not in st.session_state:
            st.session_state.selected_climb_index = 0
        
        # Display visualization controls above the columns
        climb_sort = st.radio(
            "Sort climbs by:",
            options=["Score (default)", "Length", "Grade", "Elevation Gain"],
            horizontal=True,
            key="climb_sort"
        )
        
        # Sort climbs based on selection
        if climb_sort == "Length":
            climbs.sort(key=lambda x: x.length_m, reverse=True)
        elif climb_sort == "Grade":
            climbs.sort(key=lambda x: x.avg_grade, reverse=True)
        elif climb_sort == "Elevation Gain":
            climbs.sort(key=lambda x: x.elevation_gain_m, reverse=True)
        # "Score" is default, already sorted
        
        # Export options
        export_format = st.selectbox(
            "Export climbs as:",
            options=["None", "GPX", "CSV", "GeoJSON"],
            index=0,
            key="export_format"
        )
        
        if export_format != "None":
            export_data = prepare_export_data(climbs, export_format)
            st.download_button(
                f"Download {export_format} file",
                export_data,
                file_name=f"steep_climbs_{climb_location.replace(' ', '_')}.{export_format.lower()}",
                mime=get_mime_type(export_format)
            )
        
        # Display climb statistics
        with climb_col1:
            st.subheader("Climb Statistics")
            
            # Summary table of all climbs
            climb_data = []
            for i, climb in enumerate(climbs):
                climb_data.append({
                    "Rank": i+1,
                    "Length (m)": f"{climb.length_m:.0f}",
                    "Gain (m)": f"{climb.elevation_gain_m:.0f}",
                    "Avg Grade (%)": f"{climb.avg_grade:.1f}",
                    "Max Grade (%)": f"{climb.max_grade:.1f}"
                })
            
            st.dataframe(
                climb_data, 
                use_container_width=True,
                hide_index=True
            )
            
            st.markdown("---")
            st.markdown("### Detailed Climb Information")
            
            # Create an expandable section for each climb
            for i, climb in enumerate(climbs):
                with st.expander(f"Climb #{i+1}: {climb.length_m:.0f}m @ {climb.avg_grade:.1f}%"):
                    st.metric("Distance", f"{climb.length_m:.0f} m")
                    st.metric("Elevation Gain", f"{climb.elevation_gain_m:.0f} m")
                    st.metric("Average Grade", f"{climb.avg_grade:.1f}%")
                    st.metric("Maximum Grade", f"{climb.max_grade:.1f}%")
                    
                    # Calculate difficulty score based on length, grade, and elevation gain
                    difficulty_score = climb.get_score()
                    scaled_score = min(5, max(1, int(difficulty_score / 200)))
                    fire_emoji = "🔥" * scaled_score
                    st.markdown(f"**Difficulty:** {fire_emoji}")
                    
                    # Add button to center map on this climb
                    if st.button(f"Center map on this climb", key=f"center_climb_{i}"):
                        st.session_state.selected_climb_index = i
    except Exception as e:
        # Handle the exception
        print(f"An error occurred: {str(e)}")