# In [13]:
import numpy as np
import pandas as pd
import osmnx as ox
import geopandas as gpd
from shapely.geometry import Point, LineString, box
import networkx as nx
from dataclasses import dataclass
import ast
import rtree
import logging
from typing import List, Dict, Tuple, Optional
from math import pi
import folium
from folium.plugins import MarkerCluster
from folium.features import DivIcon
from branca.colormap import LinearColormap
import os
from tqdm import tqdm

# Module-level tuning constants.
# NOTE(review): none of these names is referenced in the visible part of this
# file; EnhancedViterbiMatcher reads its own `config` dict instead. Confirm
# whether they are still needed before relying on them.
SIGMA_Z = 15.0  # Increased sigma_z for more tolerance in emission
MAX_DISTANCE = 50.0  # Increased max_distance for broader candidate search
TURN_ANGLE_THRESHOLD = pi / 4  # 45 degrees threshold for transition penalty
MIN_TRANSITION_PROB = 1e-5  # Non-zero transition probability for flexibility

class EnhancedViterbiMatcher:
    def __init__(self, graph, edges_gdf, config=None):
        """Set up the matcher from a road graph and its edge table.

        Args:
            graph: Road network graph; stored unchanged on ``self.graph``.
            edges_gdf: Edge table with a ``geometry`` column. A copy is kept
                and re-indexed to ``0..n-1`` so that row labels double as
                integer edge ids.
            config: Optional dict of overrides merged over the defaults
                defined below.

        Raises:
            Exception: anything raised while building the spatial index is
                propagated after being logged.
        """
        # The logger has to exist before any helper runs: the spatial-index
        # builder reports its failures through self.logger, and without it the
        # original error would be masked by an AttributeError.
        self.logger = logging.getLogger(__name__)

        self.graph = graph
        self.edges_gdf = edges_gdf.copy()

        if isinstance(self.edges_gdf.index, pd.MultiIndex):
            self.edges_gdf = self.edges_gdf.reset_index(drop=True)
        self.edges_gdf.index = range(len(self.edges_gdf))

        # Enhanced default configuration
        default_config = {
            'max_candidates': 20,          # Increased from 10
            'max_distance': 100.0,         # Increased from 50.0
            'sigma_z': 50.0,              # Adjusted for better GPS noise handling
            'beta': 2.0,                  # Increased for better transition scoring
            'min_prob_norm': 1e-7,        # Lowered for more flexibility
            'max_speed': 50.0,            # Maximum expected speed (m/s)
            'min_speed': 0.1,             # Minimum expected speed (m/s)
            'angle_tolerance': np.pi/2,    # 90 degrees angle tolerance
            'max_angle_penalty': 0.5,      # Maximum penalty for sharp turns
            'distance_decay': 0.85,        # Distance decay factor
            'sequential_matching': True    # Enable sequential matching for long trajectories
        }

        if config:
            default_config.update(config)
        self.config = default_config

        self._init_spatial_index()
        self.edge_to_nodes = self._build_edge_to_nodes()
        self.node_to_edges = self._build_node_to_edges()
    
    def _init_spatial_index(self):
        """Initialize R-tree spatial index with improved error handling"""
        try:
            self.spatial_index = rtree.index.Index()
            for idx, edge in self.edges_gdf.iterrows():
                if edge.geometry is not None and not edge.geometry.is_empty:
                    self.spatial_index.insert(idx, edge.geometry.bounds)
        except Exception as e:
            self.logger.error(f"Error initializing spatial index: {str(e)}")
            raise

    def _build_edge_to_nodes(self) -> Dict[int, set]:
        """Build mapping from edge IDs to their endpoint nodes with validation"""
        edge_to_nodes = {}
        for idx, edge in self.edges_gdf.iterrows():
            if edge.geometry is not None and not edge.geometry.is_empty:
                coords = list(edge.geometry.coords)
                if len(coords) >= 2:  # Ensure valid linestring
                    edge_to_nodes[idx] = {
                        self._get_node_id(coords[0]),
                        self._get_node_id(coords[-1])
                    }
        return edge_to_nodes

    def _build_node_to_edges(self) -> Dict[tuple, set]:
        """Build mapping from nodes to connected edge IDs"""
        node_to_edges = {}
        for edge_id, nodes in self.edge_to_nodes.items():
            for node in nodes:
                if node not in node_to_edges:
                    node_to_edges[node] = set()
                node_to_edges[node].add(edge_id)
        return node_to_edges

    def _get_node_id(self, coord: tuple) -> tuple:
        """Convert coordinate to node ID with improved precision"""
        return tuple(round(x, 6) for x in coord)

    def _find_candidates(self, point: Point) -> List[dict]:
        """Enhanced candidate finding with adaptive search radius"""
        candidates = []
        initial_distance = 30.0  # Start with a reduced search radius
        max_attempts = 3
        current_distance = initial_distance
        
        for attempt in range(max_attempts):
            bounds = (
                point.x - current_distance,
                point.y - current_distance,
                point.x + current_distance,
                point.y + current_distance
            )
            
            for idx in self.spatial_index.intersection(bounds):
                edge = self.edges_gdf.loc[idx]
                if edge.geometry is not None:
                    dist = point.distance(edge.geometry)
                    if dist <= current_distance:
                        proj_point = edge.geometry.interpolate(
                            edge.geometry.project(point)
                        )
                        candidates.append({
                            'edge_id': idx,
                            'distance': dist,
                            'proj_point': proj_point,
                            'edge': edge
                        })
            
            if candidates:
                break
                
            current_distance *= 1.5  # Increase search radius for next attempt
        
        candidates.sort(key=lambda x: x['distance'])
        return candidates[:self.config['max_candidates']]

    def _calculate_emission_prob(self, point: Point, candidate: dict) -> float:
        """Enhanced emission probability calculation with improved scaling"""
        distance = candidate['distance']
        sigma_z = self.config['sigma_z']
        
        # Distance-based probability with decay
        distance_factor = np.exp(-distance * self.config['distance_decay'])
        
        # Gaussian probability
        gaussian_prob = np.exp(-0.5 * (distance / sigma_z) ** 2)
        
        # Combined probability
        prob = gaussian_prob * distance_factor
        
        return max(prob, self.config['min_prob_norm'])

    def _calculate_transition_prob(self, prev_edge: int, curr_edge: int,
                                 prev_point: Point, curr_point: Point) -> float:
        """Enhanced transition probability with improved angle handling"""
        prev_nodes = self.edge_to_nodes[prev_edge]
        curr_nodes = self.edge_to_nodes[curr_edge]
        
        connected = bool(prev_nodes.intersection(curr_nodes))
        connectivity_score = 1.0 if connected else 0.3
        
        dir1 = np.array(prev_point.coords[-1]) - np.array(prev_point.coords[0])
        dir2 = np.array(curr_point.coords[-1]) - np.array(curr_point.coords[0])
        
        norm1 = np.linalg.norm(dir1)
        norm2 = np.linalg.norm(dir2)
        
        if norm1 == 0 or norm2 == 0:
            angle_score = 1.0
        else:
            cos_angle = np.dot(dir1, dir2) / (norm1 * norm2)
            angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))
            
            angle_score = 1.0 - (angle / self.config['angle_tolerance']) * self.config['max_angle_penalty']
            angle_score = max(angle_score, 1.0 - self.config['max_angle_penalty'])
        
        prob = connectivity_score * angle_score
        return max(prob, self.config['min_prob_norm'])

    def _viterbi_matching(self, points: List[Point], candidates_by_point: List[List[dict]]) -> List[Dict]:
        """Improved Viterbi algorithm with better numerical stability"""
        n_points = len(points)
        states = [{} for _ in range(n_points)]
        
        # Initialize first state
        for candidate in candidates_by_point[0]:
            edge_id = candidate['edge_id']
            log_emission = np.log(self._calculate_emission_prob(points[0], candidate))
            states[0][edge_id] = {
                'log_prob': log_emission,
                'prev': None,
                'emission': log_emission,
                'transition': 0.0
            }
        
        # Forward pass with log probabilities
        for t in range(1, n_points):
            for candidate in candidates_by_point[t]:
                curr_edge = candidate['edge_id']
                log_emission = np.log(self._calculate_emission_prob(points[t], candidate))
                
                best_log_prob = float('-inf')
                best_prev = None
                best_transition = None
                
                for prev_edge, prev_state in states[t-1].items():
                    trans_prob = self._calculate_transition_prob(
                        prev_edge, curr_edge, points[t-1], points[t]
                    )
                    log_transition = np.log(trans_prob)
                    
                    log_prob = prev_state['log_prob'] + log_transition + log_emission
                    
                    if log_prob > best_log_prob:
                        best_log_prob = log_prob
                        best_prev = prev_edge
                        best_transition = log_transition
                
                if best_prev is not None:
                    states[t][curr_edge] = {
                        'log_prob': best_log_prob,
                        'prev': best_prev,
                        'emission': log_emission,
                        'transition': best_transition
                    }
        
        # Convert log probabilities to normalized confidence scores
        if states[-1]:
            log_probs = np.array([state['log_prob'] for state in states[-1].values()])
            max_log_prob = np.max(log_probs)
            normalized_probs = np.exp(log_probs - max_log_prob)
            normalized_probs /= np.sum(normalized_probs)
            
            for edge_id, norm_prob in zip(states[-1].keys(), normalized_probs):
                states[-1][edge_id]['confidence'] = norm_prob
        
        return states

    def _backtrack(self, states: List[Dict]) -> List[int]:
        """Backtrack to find the best path with improved handling of edge cases"""
        if not states or not states[-1]:
            return []
        
        path = []
        current_edge = max(states[-1].items(), key=lambda x: x[1]['log_prob'])[0]
        
        for t in range(len(states) - 1, -1, -1):
            path.append(current_edge)
            if t > 0 and states[t][current_edge]['prev'] is not None:
                current_edge = states[t][current_edge]['prev']
        
        return list(reversed(path))

    def _sequential_matching(self, points: List[Point]) -> Dict:
        """Match long trajectories in sequential segments with overlap"""
        segment_size = 30
        overlap = 10
        all_edges = []
        segment_confidences = []
        
        for i in range(0, len(points), segment_size - overlap):
            segment = points[i:i + segment_size]
            if len(segment) < 2:
                continue
                
            candidates = [self._find_candidates(p) for p in segment]
            if not all(candidates):
                continue
                
            states = self._viterbi_matching(segment, candidates)
            path = self._backtrack(states)
            
            if path:
                if states[-1] and path[-1] in states[-1]:
                    segment_confidences.append(states[-1][path[-1]].get('confidence', 0.0))
                    
                if all_edges and overlap > 0:
                    all_edges = all_edges[:-overlap]
                all_edges.extend(path)
        
        if not all_edges:
            return {'success': False, 'edges': [], 'confidence': 0.0}
        
        overall_confidence = np.mean(segment_confidences) if segment_confidences else 0.0
            
        return {
            'success': True,
            'edges': all_edges,
            'confidence': overall_confidence
        }

    def match_trajectory(self, points: List[Tuple[float, float]]) -> Dict:
        """Match trajectory with improved error handling and validation"""
        try:
            if len(points) < 2:
                return {'success': False, 'edges': [], 'confidence': 0.0}

            point_objects = [Point(p) for p in points]
            
            if self.config['sequential_matching'] and len(points) > 50:
                return self._sequential_matching(point_objects)
            
            candidates_by_point = [self._find_candidates(p) for p in point_objects]
            
            if not all(candidates_by_point):
                return {'success': False, 'edges': [], 'confidence': 0.0}
            
            states = self._viterbi_matching(point_objects, candidates_by_point)
            path = self._backtrack(states)
            
            if not path:
                return {'success': False, 'edges': [], 'confidence': 0.0}
            
            confidence = states[-1][path[-1]].get('confidence', 0.0)
            
            return {
                'success': True,
                'edges': path,
                'confidence': confidence
            }
                
        except Exception as e:
            self.logger.error(f"Error in match_trajectory: {str(e)}")
            return {'success': False, 'edges': [], 'confidence': 0.0}

# In [14]:
def get_edge_identifier(edge_data) -> str:
    """Return an identifier string for an edge record.

    Sources are tried in order: ``OSMID``, ``osmid``, ``name`` (prefixed with
    ``road_``), and finally the edge geometry's first and last coordinates.
    A key whose value is ``None`` or NaN counts as absent, so a record with a
    blank name gets a geometry-based id instead of the shared ``road_nan``.

    Args:
        edge_data: Mapping-like record (e.g. a table row) supporting ``in``
            and ``[]``; it needs a ``geometry`` attribute only when none of
            the keys above carries a value.
    """
    def has_value(key):
        if key not in edge_data:
            return False
        value = edge_data[key]
        # NaN is the only float that is not equal to itself.
        return not (value is None or (isinstance(value, float) and value != value))

    for key in ('OSMID', 'osmid'):  # Try the lowercase version as well
        if has_value(key):
            return str(edge_data[key])

    if has_value('name'):
        return f"road_{edge_data['name']}"

    # Create a unique identifier from the edge geometry
    coords = list(edge_data.geometry.coords)
    start = coords[0]
    end = coords[-1]
    return f"edge_{start[0]:.4f}_{start[1]:.4f}_{end[0]:.4f}_{end[1]:.4f}"

class RouteAnalyzer:
    """Analyze mapped routes for frequently traversed and slow segments"""
    def __init__(self, matcher, matched_results: List[Dict], output_dir: str = 'map_matching_results'):
        self.matcher = matcher
        self.matched_results = matched_results
        self.output_dir = output_dir
        self.analysis_dir = os.path.join(output_dir, 'route_analysis')
        os.makedirs(self.analysis_dir, exist_ok=True)
        
        # Convert edges to WGS84 for visualization
        self.edges_wgs84 = matcher.edges_gdf.to_crs('EPSG:4326')
        
        # Initialize segment statistics
        self.segment_stats = self._initialize_segment_stats()
    
    
    
    def _initialize_segment_stats(self) -> Dict:
        """Aggregate per-edge traversal statistics over all matched trajectories.

        A matched path may repeat the same edge id for consecutive fixes.
        Each such run of identical ids is treated as ONE traversal of that
        edge, so an edge is not counted once per fix. When the path has
        exactly one edge per fix, a run is timed with its own fixes plus the
        first fix after it; otherwise the fixes cannot be attributed to edges
        and the whole trajectory's timestamps are used.

        Returns:
            Dict keyed by edge id. Every entry has ``traverse_count``,
            ``OSMID``, ``length``, ``speeds``, ``times`` and
            ``distance_traversed``; entries with at least one valid
            measurement also get ``avg_speed``, ``speed_std``, ``avg_time``
            and ``congestion_index`` (speed std divided by mean speed).
        """
        stats = {}

        for result in self.matched_results:
            match = result['match_result']
            if not match['success']:
                continue

            coords = result['original_coords']
            edges = match['edges']
            timestamps = result['timestamps']

            if len(coords) < 2 or not edges or timestamps is None:
                continue

            n_fixes = len(coords)
            # True only when edges[i] is the edge matched to fix i.
            aligned = len(edges) == n_fixes == len(timestamps)

            run_start = 0
            while run_start < len(edges):
                edge_id = edges[run_start]
                run_end = run_start + 1
                while run_end < len(edges) and edges[run_end] == edge_id:
                    run_end += 1

                if edge_id not in stats:
                    edge_data = self.matcher.edges_gdf.loc[edge_id]
                    stats[edge_id] = {
                        'traverse_count': 0,
                        'OSMID': get_edge_identifier(edge_data),
                        'length': 0,
                        'speeds': [],
                        'times': [],
                        'distance_traversed': 0
                    }
                entry = stats[edge_id]

                edge_geom = self.matcher.edges_gdf.loc[edge_id].geometry
                entry['length'] = edge_geom.length
                entry['distance_traversed'] += edge_geom.length

                if aligned:
                    # Include the first fix after the run so a run always
                    # spans at least one sampling interval; for a one-fix run
                    # at the very end, borrow the preceding fix instead.
                    hi = min(run_end + 1, n_fixes)
                    lo = run_start if hi - run_start >= 2 else max(0, hi - 2)
                    run_coords, run_timestamps = coords[lo:hi], timestamps[lo:hi]
                else:
                    run_coords, run_timestamps = coords, timestamps

                speed, elapsed = self._analyze_trajectory_segment(
                    edge_id,
                    edge_geom,
                    run_coords,
                    run_timestamps
                )

                # Only add valid speed/time measurements
                if speed > 0 and elapsed > 0:
                    entry['traverse_count'] += 1
                    entry['speeds'].append(speed)
                    entry['times'].append(elapsed)

                run_start = run_end

        # Calculate aggregate statistics with validation
        for edge_stats in stats.values():
            if edge_stats['traverse_count'] > 0 and edge_stats['speeds']:
                valid_speeds = [s for s in edge_stats['speeds'] if s > 0]
                if valid_speeds:
                    edge_stats['avg_speed'] = np.mean(valid_speeds)
                    edge_stats['speed_std'] = np.std(valid_speeds) if len(valid_speeds) > 1 else 0
                    edge_stats['avg_time'] = np.mean([t for t in edge_stats['times'] if t > 0])
                    edge_stats['congestion_index'] = (edge_stats['speed_std'] / edge_stats['avg_speed']
                                                if edge_stats['avg_speed'] > 0 else 0)

        return stats
    
  
    
    
    
    
    def _analyze_trajectory_segment(self, edge_id: int, edge_geom, coords: List[tuple], 
                            timestamps: List[int]) -> tuple:
        """Analyze trajectory segment with speed in m/s"""
        edge_length = edge_geom.length
        if len(coords) < 2 or len(timestamps) < 2:
            return 0, 0

        # Calculate time difference in seconds
        time_diffs = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps) - 1)]
        total_time = sum(time_diffs)

        # Validate time and length
        if total_time <= 0 or edge_length <= 0:
            return 0, 0
        
        # Calculate speed (m/s)
        speed = edge_length / total_time
        
        total_time = edge_length / speed
        
        # # Validate speed is realistic
        # min_speed_ms = 0.3  # ≈1 km/h in m/s
        # max_speed_ms = 33.3  # ≈120 km/h in m/s
        
        # if speed < min_speed_ms:
        #     speed = min_speed_ms
        #     total_time = edge_length / min_speed_ms
        # elif speed > max_speed_ms:
        #     speed = max_speed_ms
        #     total_time = edge_length / max_speed_ms
        
        return speed, total_time
    
    
    
    def get_most_traversed_segments(self, n: int = 10) -> List[Dict]:
        """Return the n most frequently traversed road segments"""
        segments = []
        for edge_id, stats in self.segment_stats.items():
            if stats['traverse_count'] > 0:
                segments.append({
                    'edge_id': edge_id,
                    'OSMID': stats['OSMID'],
                    'count': stats['traverse_count'],
                    'geometry': self.edges_wgs84.loc[edge_id].geometry,
                    'avg_speed': stats.get('avg_speed', 0),
                    'avg_time': stats.get('avg_time', 0),
                    'length': stats['length'],
                    'speed_std': stats.get('speed_std', 0),
                    'congestion_index': stats.get('congestion_index', 0),
                    'distance_traversed': stats['distance_traversed']
                })
        
        segments.sort(key=lambda x: x['count'], reverse=True)
        return segments[:n]
    
    def get_slowest_segments(self, n: int = 10) -> List[Dict]:
        """Return the n slowest road segments with speed in m/s"""
        segments = []
        min_length = 50  # Only consider segments longer than 50m
        min_speed = 0.3  # Minimum realistic speed in m/s (≈1 km/h)
        max_speed = 33.3  # Maximum realistic speed in m/s (≈120 km/h)
        
        for edge_id, stats in self.segment_stats.items():
            if (stats['traverse_count'] > 0 and 
                stats.get('avg_time', 0) > 0 and 
                stats['length'] >= min_length):
                
                # Calculate speed in m/s
                speed_ms = stats['length'] / stats['avg_time']
                
                # Validate speed is realistic
                if speed_ms >= min_speed and speed_ms <= max_speed:
                    segments.append({
                        'edge_id': edge_id,
                        'OSMID': stats['OSMID'],
                        'avg_time': stats['avg_time'],
                        'count': stats['traverse_count'],
                        'geometry': self.edges_wgs84.loc[edge_id].geometry,
                        'length': stats['length'],
                        'speed_ms': speed_ms,
                        'time_per_100m': (stats['avg_time'] / stats['length']) * 100
                    })
        
        # Sort by speed (lower speed = slower segment)
        segments.sort(key=lambda x: x['speed_ms'])
        return segments[:n]
    
    
    
    
    
    def _get_metric_display(self, segment: Dict, metric_name: str) -> str:
        """Format metric display for legend with m/s speed and time metrics"""
        if 'traverse' in metric_name.lower():
            return f"Count: {segment['count']}"
        else:
            # Calculate time metrics
            time_minutes = segment['avg_time'] / 60
            time_per_100m = (segment['avg_time'] / segment['length']) * 100
            
            return (f"Speed: {segment['speed_ms']:.1f} m/s\n"
                    f"Length: {segment['length']:.0f}m\n"
                    f"Time: {time_minutes:.1f} min\n"
                    f"Time/100m: {time_per_100m:.1f} sec")
    
    
    

    # NOTE(review): this definition is shadowed. The class defines
    # `_create_popup_text` a second time further down, and the later binding
    # is the one callers get. This version also reads the keys
    # 'num_traversals', 'avg_speed_kmh' and 'total_time', which none of the
    # segment dicts built in this class provide. Candidate for removal.
    def _create_popup_text(self, segment: Dict, rank: int) -> str:
        """Create detailed popup text for segment (superseded by the later definition)."""
        return f"""
        <div style='font-family: Arial; font-size: 12px;'>
            <strong>Rank: {rank}</strong><br>
            OSMID: {segment['OSMID']}<br>
            Edge ID: {segment['edge_id']}<br>
            Traverse count: {segment['num_traversals']}<br>
            Average Speed: {segment['avg_speed_kmh']:.1f} km/h<br>
            Time per 100m: {segment['time_per_100m']:.1f} seconds<br>
            Length: {segment['length']:.0f} m<br>
            Total traversal time: {segment['total_time']:.0f} seconds
        </div>
        """

    # NOTE(review): this definition is shadowed. The class defines
    # `_create_title_html` a second time further down, and the later binding
    # is the one callers get. Candidate for removal.
    def _create_title_html(self, title: str, num_segments: int, metric_name: str) -> str:
        """Create enhanced HTML for map title (superseded by the later definition)."""
        return f"""
        <div style="
            position: fixed;
            top: 20px;
            left: 60px;
            width: 320px;
            z-index: 1000;
            padding: 15px;
            background-color: white;
            border-radius: 6px;
            box-shadow: 0 2px 5px rgba(0,0,0,0.2);
        ">
            <h3 style="margin: 0; color: #2c3e50;">{title}</h3>
            <p style="margin: 8px 0 0 0; font-size: 13px; color: #666;">
                Showing top {num_segments} segments ranked by {metric_name.lower()}
            </p>
        </div>
        """




    def visualize_segments_enhanced(self, segments: List[Dict], title: str, filename: str,
                          metric_name: str, color_scheme: List[str]):
        """Render the given segments on an interactive map and save it as HTML.

        The map shows the whole edge network as a faint background, the given
        segments coloured by their metric, a per-segment legend, a colour
        scale and a title box.

        Args:
            segments: Segment dicts as produced by ``get_most_traversed_segments``
                or ``get_slowest_segments``.
            title: Heading shown in the title box.
            filename: File name of the HTML map, created inside ``self.analysis_dir``.
            metric_name: Selects the colouring metric: if it contains
                "average travel time" (case-insensitive) segments are coloured
                by ``time_per_100m``, otherwise by ``count``.
            color_scheme: Colours handed to the linear colour map.

        Returns:
            Path of the saved HTML file, or ``None`` when ``segments`` is empty
            (a warning is logged in that case).
        """
        if not segments:
            logging.warning(f"No segments to visualize for {title}")
            return
        
        # Create base map
        # NOTE: the map centre is fixed; it is not derived from the data.
        center_lat, center_lon = 41.1579, -8.6291  # Porto coordinates
        m = folium.Map(
            location=[center_lat, center_lon],
            zoom_start=13,
            tiles='cartodbpositron'
        )
        
        # Add background network: every edge is drawn as its own faint polyline.
        # Coordinates are swapped because geometries store (x, y) while the
        # polyline is given (y, x) pairs.
        for _, edge in self.edges_wgs84.iterrows():
            if edge.geometry is not None:
                coords = [(y, x) for x, y in edge.geometry.coords]
                folium.PolyLine(
                    coords,
                    weight=1,
                    color='lightgray',
                    opacity=0.1
                ).add_to(m)
        
        # NOTE(review): always true here, since empty input returned above.
        if segments:
            # Calculate proper metric values for colormap with rounding
            if 'average travel time' in metric_name.lower():
                # Round time_per_100m values to 1 decimal place
                metric_values = [round(seg['time_per_100m'], 1) for seg in segments]
                caption = 'Time per 100m (seconds)'
                
                # Update vmin and vmax with rounded values
                vmin = min(metric_values)
                vmax = max(metric_values)
                
                # Create colormap with rounded range
                colormap = LinearColormap(
                    colors=color_scheme,
                    vmin=vmin,
                    vmax=vmax,
                    caption=f"{vmin:.1f} - {vmax:.1f} {caption}"
                )
            else:
                # For traverse count, keep as integers
                metric_values = [seg['count'] for seg in segments]
                caption = 'Traverse Count'
                colormap = LinearColormap(
                    colors=color_scheme,
                    vmin=min(metric_values),
                    vmax=max(metric_values),
                    caption=caption
                )
            
            # Create legend: a fixed-position HTML box, extended with one
            # entry per segment inside the loop below.
            legend_html = """
            <div style="
                position: fixed;
                bottom: 50px;
                right: 50px;
                width: 250px;
                z-index: 1000;
                background-color: white;
                padding: 10px;
                border-radius: 5px;
                box-shadow: 0 0 5px rgba(0,0,0,0.2);
                font-size: 12px;
                max-height: 300px;
                overflow-y: auto;
            ">
                <h4 style="margin: 0 0 10px 0;">Road Segments</h4>
            """
            
            for rank, segment in enumerate(segments, 1):
                # Round the value used for coloring (same rounding as for the colormap range)
                value = round(segment['time_per_100m'], 1) if 'average travel time' in metric_name.lower() else segment['count']
                color = colormap(value)
                
                legend_html += f"""
                <div style="margin-bottom: 8px;">
                    <span style="
                        display: inline-block;
                        width: 12px;
                        height: 12px;
                        background-color: {color};
                        margin-right: 5px;
                        border: 1px solid #666;
                    "></span>
                    <strong>#{rank}</strong> OSMID: {segment['OSMID']}
                    <br>
                    <span style="margin-left: 17px; white-space: pre-line;">
                        {self._get_metric_display(segment, metric_name)}
                    </span>
                </div>
                """
                
                # Add segment to map
                coords = [(y, x) for x, y in segment['geometry'].coords]
                folium.PolyLine(
                    coords,
                    weight=6,
                    color=colormap(value),
                    opacity=0.9,
                    popup=self._create_popup_text(segment, rank)
                ).add_to(m)
                
                # Add glowing effect: a wider, more transparent line over the same coordinates
                folium.PolyLine(
                    coords,
                    weight=10,
                    color=colormap(value),
                    opacity=0.3
                ).add_to(m)
                
                # Add label
                if len(coords) > 1:
                    # NOTE(review): mid_point is only referenced by the
                    # commented-out marker below; the live marker is placed
                    # at the geometry's last coordinate.
                    mid_point = coords[len(coords)//2]
                    # NOTE(review): the html DivIcon is added to a Marker that
                    # was itself created with a second, html-less DivIcon.
                    # Presumably the icon added last is the one rendered;
                    # confirm against folium before simplifying.
                    folium.DivIcon(
                        html=f"""
                        <div style="
                            background-color: rgba(255, 255, 255, 0);
                            border: 2px solid {color};
                            border-radius: 4px;
                            padding: 3px 6px;
                            font-size: 8px;
                            font-weight: bold;
                            white-space: nowrap;
                            box-shadow: 0 0 4px rgba(0,0,0,0.2);
                        ">
                            #{rank} - {segment['length']:.0f}m
                        </div>
                        """
                     ).add_to(folium.Marker(
                         [segment['geometry'].coords[-1][1], segment['geometry'].coords[-1][0]],  # Use last coordinate for bottom placement
                         icon=DivIcon(
                             icon_size=(60, 36),
                             icon_anchor=(30, 0)  # Anchor the icon at the bottom center
                             )
                         ).add_to(m)) #.add_to(folium.Marker(
                    #     mid_point,
                    #     icon=DivIcon(
                    #         icon_size=(60, 36),
                    #         icon_anchor=(30, 18)
                    #     )
                    # ).add_to(m))
            
            legend_html += "</div>"
            m.get_root().html.add_child(folium.Element(legend_html))
            
            # Add colormap with proper caption
            colormap.add_to(m)
        
        # Add title
        title_html = self._create_title_html(title, len(segments), metric_name)
        m.get_root().html.add_child(folium.Element(title_html))
        
        # Save map
        output_path = os.path.join(self.analysis_dir, filename)
        m.save(output_path)
        return output_path

    


    def _create_popup_text(self, segment: Dict, rank: int) -> str:
        """Create enhanced popup text for segment"""
        # Calculate actual speed
        speed = segment['length'] / segment['avg_time'] if segment['avg_time'] > 0 else 0
        speed_kmh = speed * 3.6
        
        return f"""
        <div style='font-family: Arial; font-size: 12px; min-width: 200px;'>
            <div style='background-color: #f8f9fa; padding: 8px; border-radius: 4px; margin-bottom: 8px;'>
                <strong style='font-size: 14px;'>Rank #{rank}</strong><br>
                <strong>OSMID:</strong> {segment['OSMID']}
            </div>
            <strong>Statistics:</strong><br>
            • Traverse count: {segment.get('count', 'N/A')}<br>
            • Average Speed: {speed_kmh:.1f} km/h<br>
            • Length: {segment['length']:.0f} m<br>
            • Time: {segment['avg_time']:.1f} s<br>
            • Time per 100m: {(segment['avg_time'] / segment['length'] * 100):.1f} s
        </div>
        """

   

    def _create_title_html(self, title: str, num_segments: int, metric_name: str) -> str:
        """Create HTML for map title"""
        return f"""
        <div style="
            position: fixed;
            top: 10px;
            left: 50px;
            width: 300px;
            z-index: 1000;
            padding: 10px;
            background-color: white;
            border-radius: 5px;
            box-shadow: 0 0 5px rgba(0,0,0,0.2);
        ">
            <h4 style="margin: 0;">{title}</h4>
            <p style="margin: 5px 0 0 0; font-size: 12px;">
                Top {num_segments} segments ranked by {metric_name}
            </p>
        </div>
        """

    def analyze_and_visualize(self):
        """Perform complete route analysis"""
        try:
            most_traversed = self.get_most_traversed_segments()
            slowest_segments = self.get_slowest_segments()
            
            if not most_traversed and not slowest_segments:
                logging.warning("No valid segments found for analysis")
                return None
            
            # Generate visualizations
            results = {
                'most_traversed': {
                    'segments': most_traversed,
                    'map_path': self.visualize_segments_enhanced(
                        most_traversed,
                        "Most Frequently Traversed Road Segments",
                        "most_traversed_segments.html",
                        "traverse frequency",
                        ['#fff7ec', '#fee8c8', '#fdd49e', '#fdbb84', '#fc8d59', '#ef6548', '#d7301f', '#990000']
                    )
                },
                'slowest_segments': {
                    'segments': slowest_segments,
                    'map_path': self.visualize_segments_enhanced(
                        slowest_segments,
                        "Slowest Road Segments",
                        "slowest_segments.html",
                        "average travel time",
                        ['#f7fcfd', '#e0ecf4', '#bfd3e6', '#9ebcda', '#8c96c6', '#8c6bb1', '#88419d', '#6e016b']
                    )
                }
            }
            
            return results
            
        except Exception as e:
            logging.error(f"Error in analyze_and_visualize: {str(e)}")
            return None

def process_trajectory_with_time(row, sampling_interval=15):
    """Parse one trajectory record and assign a timestamp to every point.

    Args:
        row: Mapping-like record (e.g. a pandas Series or a dict) with a
            'POLYLINE' entry containing the string form of a Python list of
            coordinates, and a 'TIMESTAMP' entry accepted by ``int()``.
        sampling_interval: Step added to the start timestamp for each
            successive point, in the same unit as 'TIMESTAMP' (presumably
            seconds -- confirm against the dataset). Defaults to 15, the
            value that used to be hard-coded.

    Returns:
        A dict with 'coords' (the parsed list), 'timestamps' (one value per
        coordinate) and 'start_timestamp', or ``None`` when the polyline has
        fewer than two points or the record cannot be parsed (a warning is
        logged in the latter case).
    """
    try:
        coords = ast.literal_eval(row['POLYLINE'])
        # A trajectory needs at least two points to describe any movement.
        if not coords or len(coords) < 2:
            return None

        start_timestamp = int(row['TIMESTAMP'])
        # Points carry no individual times; space them evenly from the start.
        timestamps = [
            start_timestamp + i * sampling_interval
            for i in range(len(coords))
        ]

        return {
            'coords': coords,
            'timestamps': timestamps,
            'start_timestamp': start_timestamp
        }
    except Exception as e:
        # Best effort: a malformed row is reported and skipped by the caller.
        logging.warning(f"Error processing trajectory: {str(e)}")
        return None


In [15]:


def run_map_matching_pipeline(
    input_file: str,
    place: str,
    nrows: Optional[int] = None,
    utm_crs: str = 'EPSG:32629',
    output_dir: str = 'map_matching_results',
) -> Optional[Dict]:
    """
    Run the complete map matching pipeline with the given parameters.

    The drivable road network for ``place`` is downloaded, its edges are
    projected to ``utm_crs``, each trajectory row of ``input_file`` is
    projected from EPSG:4326 to the same CRS and matched, and the successful
    matches are passed to ``RouteAnalyzer`` for analysis and visualization.

    Args:
        input_file: Path of the CSV file read with ``pandas.read_csv``; rows
            are parsed by ``process_trajectory_with_time``.
        place: Place name handed to ``ox.graph_from_place``.
        nrows: Maximum number of CSV rows to read; ``None`` reads all rows.
        utm_crs: Projected CRS used for both edges and trajectory points.
            The default is UTM zone 29N, which suits Porto; pass a CRS
            appropriate for ``place`` when matching elsewhere.
        output_dir: Directory name passed to ``RouteAnalyzer`` and echoed in
            the result.

    Returns:
        A dict with 'matched_results', 'analysis_results' (whatever
        ``RouteAnalyzer.analyze_and_visualize`` returned) and 'output_dir',
        or ``None`` when no trajectory matched or any step raised (the error
        is logged with its traceback).
    """
    # Bound outside the try so the except handler can always use it.
    logger = logging.getLogger(__name__)

    try:
        # Set up logging
        logging.basicConfig(level=logging.INFO)

        # Load road network
        logger.info(f"Loading road network for {place}...")
        G = ox.graph_from_place(place, network_type='drive')
        _, edges = ox.graph_to_gdfs(G)

        # Add edge IDs if they don't exist
        if 'OSMID' not in edges.columns and 'osmid' not in edges.columns:
            logger.info("Creating unique identifiers for edges...")
            edges['OSMID'] = edges.apply(get_edge_identifier, axis=1)

        # Project edges to the metric CRS used for matching
        edges = edges.to_crs(utm_crs)

        # Load trajectory data
        logger.info(f"Loading trajectory data from {input_file}...")
        df = pd.read_csv(input_file, nrows=nrows)

        # Initialize matcher with default configuration
        matcher = EnhancedViterbiMatcher(G, edges)

        # Process trajectories
        matched_results = []
        for _, row in tqdm(df.iterrows(), total=len(df)):
            trajectory_data = process_trajectory_with_time(row)
            if not trajectory_data:
                # Unparseable or too-short trajectory: nothing to match.
                continue

            # Input coordinates are treated as (x, y) pairs in EPSG:4326 and
            # projected into the same CRS as the edges.
            point_gdf = gpd.GeoDataFrame(
                geometry=[Point(x, y) for x, y in trajectory_data['coords']],
                crs='EPSG:4326'
            ).to_crs(utm_crs)

            utm_coords = [(p.x, p.y) for p in point_gdf.geometry]
            result = matcher.match_trajectory(utm_coords)

            if result['success']:
                matched_results.append({
                    'match_result': result,
                    'original_coords': trajectory_data['coords'],
                    'timestamps': trajectory_data['timestamps'],
                    'start_timestamp': trajectory_data['start_timestamp']
                })

        if not matched_results:
            logger.warning("No trajectories were successfully matched")
            return None

        # Perform analysis
        route_analyzer = RouteAnalyzer(matcher, matched_results, output_dir)
        analysis_results = route_analyzer.analyze_and_visualize()

        return {
            'matched_results': matched_results,
            'analysis_results': analysis_results,
            'output_dir': output_dir
        }

    except Exception as e:
        # Top-level boundary: report the failure (with traceback) and signal
        # it to the caller through the None return value.
        logger.exception(f"Error in map matching pipeline: {str(e)}")
        return None

if __name__ == "__main__":
    # Example usage: match the first 1500 trajectories of the training CSV
    # against the Porto road network and report where the output went.
    try:
        example_args = {
            'input_file': 'kraggle_data/train/train.csv',
            'place': 'Porto, Portugal',
            'nrows': 1500,
        }
        results = run_map_matching_pipeline(**example_args)

        # The pipeline returns None on failure, so a falsy result means no
        # analysis was produced.
        if not results:
            print("Pipeline execution failed")
        else:
            print(f"Successfully processed trajectories and generated analysis in: {results['output_dir']}")
    except Exception as e:
        print(f"Error running pipeline: {str(e)}")

INFO:__main__:Loading road network for Porto, Portugal...
INFO:__main__:Loading trajectory data from kraggle_data/train/train.csv...
100%|██████████| 1500/1500 [00:37<00:00, 39.64it/s]


Successfully processed trajectories and generated analysis in: map_matching_results
