In [7]:
import json
import osmnx as ox
import networkx as nx
import folium
from geopy.distance import geodesic
from folium.plugins import MarkerCluster
from scipy.spatial import cKDTree
import numpy as np

In [10]:

class AStarRoutePlanner:
    def __init__(self, json_paths: dict, buffer_radius_m: int = 150):
        """
        Initialize with disruption sources
        Args:
            json_paths (dict): dict with keys 'tweets', 'delivery', 'traffic' and file paths as values
            buffer_radius_m (int): penalty radius in meters
        """
        self.buffer_radius = buffer_radius_m
        self.json_paths = json_paths
        self.obstacles = []  # All obstacle coordinates
        self.categorized_obstacles = {
            "tweets": [],
            "delivery": [],
            "traffic": []
        }
        self._load_obstacles()

    def _load_obstacles(self):
        """Load obstacles from categorized JSON files"""
        for category, path in self.json_paths.items():
            with open(path, 'r') as f:
                data = json.load(f)
                for item in data:
                    lat = item.get("latitude")
                    lon = item.get("longitude")
                    severity = item.get("severity", "Unknown")
                    if lat and lon:
                        self.categorized_obstacles[category].append((lat, lon, severity))
                        self.obstacles.append((lat, lon))

    def _add_weights_with_penalties(self, G):
        """Apply penalty to edges near obstacles using KDTree"""
        obstacle_coords = np.radians(self.obstacles)
        tree = cKDTree(obstacle_coords)

        for u, v, data in G.edges(data=True):
            point_u = (G.nodes[u]['y'], G.nodes[u]['x'])
            point_v = (G.nodes[v]['y'], G.nodes[v]['x'])
            midpoint = ((point_u[0] + point_v[0]) / 2, (point_u[1] + point_v[1]) / 2)
            midpoint_rad = np.radians(midpoint)

            idxs = tree.query_ball_point(midpoint_rad, r=self.buffer_radius / 6371000.0)
            weight = data.get('length', 1)
            if idxs:
                weight *= 10  # Heavy penalty
            data['weight'] = weight

    def _add_custom_markers(self, fmap, category, icon_color, icon_name):
        """Add categorized obstacle markers to the map"""
        group = folium.FeatureGroup(name=f"{category.capitalize()} Alerts")
        for lat, lon, severity in self.categorized_obstacles[category]:
            folium.Marker(
                location=(lat, lon),
                icon=folium.Icon(color=icon_color, icon=icon_name, prefix='fa'),
                popup=f"{category.capitalize()} Alert<br>Severity: {severity}"
            ).add_to(group)
        group.add_to(fmap)

    def _add_legend(self, fmap):
        """Add legend to map"""
        legend_html = """
        <div style="position: fixed; bottom: 30px; left: 30px; width: 220px; height: 120px;
                     border:2px solid grey; z-index:9999; font-size:14px; background-color:white; padding: 10px;">
            <b>🗺️ Legend</b><br>
            <i class="fa fa-comment" style="color:red"></i> Disruption Tweets<br>
            <i class="fa fa-truck" style="color:orange"></i> Delivery Anomalies<br>
            <i class="fa fa-car" style="color:blue"></i> Traffic Slowdowns<br>
        </div>
        """
        fmap.get_root().html.add_child(folium.Element(legend_html))

    def find_optimal_route(self, start: tuple, end: tuple, map_output: str = "a_star_route_map.html"):
        """
        Find optimal route using A* and generate annotated map
        Args:
            start: (lat, lon) tuple of warehouse
            end: (lat, lon) tuple of delivery point
            map_output: Output HTML path
        Returns:
            List of route coordinates
        """
        # Load graph from OSM
        G = ox.graph_from_point(start, dist=10000, network_type='drive')

        # Get closest graph nodes
        orig_node = ox.distance.nearest_nodes(G, X=start[1], Y=start[0])
        dest_node = ox.distance.nearest_nodes(G, X=end[1], Y=end[0])

        # Apply penalties
        self._add_weights_with_penalties(G)

        try:
            route = nx.astar_path(G, orig_node, dest_node, weight='weight')
        except nx.NetworkXNoPath:
            print("❌ No path found!")
            return []

        route_coords = [(G.nodes[n]['y'], G.nodes[n]['x']) for n in route]

        # Map setup
        m = folium.Map(location=start, zoom_start=13)
        folium.Marker(location=start, icon=folium.Icon(color='blue'), tooltip="Warehouse").add_to(m)
        folium.Marker(location=end, icon=folium.Icon(color='red'), tooltip="Delivery").add_to(m)
        folium.PolyLine(route_coords, color='green', weight=5, tooltip="Optimal Path").add_to(m)

        # Add categorized markers
        self._add_custom_markers(m, "tweets", "red", "comment")
        self._add_custom_markers(m, "delivery", "orange", "truck")
        self._add_custom_markers(m, "traffic", "blue", "car")

        # Add legend and controls
        self._add_legend(m)
        folium.LayerControl().add_to(m)

        m.save(map_output)
        print(f"✅ Map saved to {map_output}")
        return route_coords


In [12]:
planner = AStarRoutePlanner(
    json_paths={
        "tweets": r"C:\Users\Sheraz\Documents\pythontest\TrackSmart\models_data\twitter_alerts_output.json",
        "delivery": r"C:\Users\Sheraz\Documents\pythontest\TrackSmart\models_data\anomaly_alerts.json",
        "traffic": r"C:\Users\Sheraz\Documents\pythontest\TrackSmart\models_data\traffic_alerts.json"
    },
    buffer_radius_m=150
)

start = (12.9103, 80.2270)     # Example: Velachery
end = (13.0601, 80.2492)       # Example: T.Nagar

planner.find_optimal_route(start, end, map_output="route_with_all_alerts.html")


✅ Map saved to route_with_all_alerts.html


[(12.909688, 80.2285884),
 (12.9127609, 80.2290272),
 (12.9134799, 80.2291245),
 (12.9138126, 80.2291752),
 (12.9147133, 80.229305),
 (12.9157556, 80.2294553),
 (12.9159993, 80.2294905),
 (12.9162392, 80.2295251),
 (12.9165173, 80.2295652),
 (12.9167759, 80.2295973),
 (12.9179747, 80.2297475),
 (12.91838, 80.2297947),
 (12.919109, 80.2298796),
 (12.9231719, 80.230525),
 (12.9233106, 80.2305447),
 (12.9245173, 80.2307024),
 (12.924605, 80.2306224),
 (12.9253328, 80.2307057),
 (12.9277559, 80.2310059),
 (12.9290333, 80.2311869),
 (12.93185, 80.2320186),
 (12.9324746, 80.2321928),
 (12.9327421, 80.2322797),
 (12.9332367, 80.2324759),
 (12.9344002, 80.2330003),
 (12.9347624, 80.2331597),
 (12.9367876, 80.2341317),
 (12.9398701, 80.235602),
 (12.9401463, 80.235742),
 (12.940604, 80.235882),
 (12.9415166, 80.2361308),
 (12.9418912, 80.2362242),
 (12.9427203, 80.2364962),
 (12.9429076, 80.2364621),
 (12.9437004, 80.2365476),
 (12.9438509, 80.2364922),
 (12.9443344, 80.2363646),
 (12.9445386, 

In [1]:
###############################

In [2]:
import json
import folium
import networkx as nx
import numpy as np
import osmnx as ox
from scipy.spatial import cKDTree
from folium.plugins import MarkerCluster

In [18]:
import json
import osmnx as ox
import networkx as nx
import folium
from geopy.distance import geodesic
from folium.plugins import MarkerCluster
from scipy.spatial import cKDTree
import numpy as np


class AStarRoutePlanner:
    def __init__(self, json_paths: dict, buffer_radius_m: int = 150):
        """
        Initialize with disruption sources
        Args:
            json_paths (dict): Paths to JSON files for different alert types
            buffer_radius_m (int): Buffer radius around alerts to penalize roads
        """
        self.obstacles = []
        self.buffer_radius = buffer_radius_m
        self.json_paths = json_paths
        self._load_obstacles()

    def _load_obstacles(self):
        """Load all obstacles from JSON files"""
        for path in self.json_paths.values():
            with open(path, 'r') as f:
                data = json.load(f)
                for item in data:
                    lat = item.get("latitude")
                    lon = item.get("longitude")
                    if lat and lon:
                        self.obstacles.append((lat, lon))


    def _add_weights_with_penalties(self, G):
        """Optimized: Add edge weights + penalty if near obstacles using KDTree"""
        obstacle_coords = np.radians(self.obstacles)  # Convert to radians
        tree = cKDTree(obstacle_coords)

        def haversine(lat1, lon1, lat2, lon2):
            R = 6371000  # Earth radius in meters
            dlat = lat2 - lat1
            dlon = lon2 - lon1
            a = np.sin(dlat/2)**2 + np.cos(lat1)*np.cos(lat2)*np.sin(dlon/2)**2
            return 2 * R * np.arcsin(np.sqrt(a))

        for u, v, data in G.edges(data=True):
            point_u = (G.nodes[u]['y'], G.nodes[u]['x'])
            point_v = (G.nodes[v]['y'], G.nodes[v]['x'])
            midpoint = ((point_u[0] + point_v[0]) / 2, (point_u[1] + point_v[1]) / 2)
            midpoint_rad = np.radians(midpoint)

            # Query KDTree for neighbors within radius
            idxs = tree.query_ball_point(midpoint_rad, r=self.buffer_radius / 6371000.0)  # radius in radians

            weight = data.get('length', 1)
            if idxs:
                weight *= 10  # penalty

            data['weight'] = weight


    def find_optimal_route(self, start: tuple, end: tuple, map_output: str = "a_star_route_map.html"):
        """
        Find the optimal route using A* algorithm and save a map
        Args:
            start (tuple): (lat, lon) of warehouse
            end (tuple): (lat, lon) of delivery location
            map_output (str): Output HTML file
        Returns:
            list of (lat, lon) tuples representing path
        """
        # Load street network (10km buffer)
        G = ox.graph_from_point(start, dist=10000, network_type='drive')

        # Get nearest nodes
        orig_node = ox.distance.nearest_nodes(G, X=start[1], Y=start[0])
        dest_node = ox.distance.nearest_nodes(G, X=end[1], Y=end[0])

        # Apply penalties
        self._add_weights_with_penalties(G)

        # Find path with A*
        try:
            route = nx.astar_path(G, orig_node, dest_node, weight='weight')
        except nx.NetworkXNoPath:
            print("❌ No path found avoiding obstacles!")
            return []

        # Get coordinates
        route_coords = [(G.nodes[n]['y'], G.nodes[n]['x']) for n in route]

        # Create map
        m = folium.Map(location=start, zoom_start=13)
        folium.Marker(location=start, icon=folium.Icon(color='blue'), tooltip="Start").add_to(m)
        folium.Marker(location=end, icon=folium.Icon(color='red'), tooltip="Destination").add_to(m)

        # Plot obstacles
        marker_cluster = MarkerCluster().add_to(m)
        for lat, lon in self.obstacles:
            folium.CircleMarker(location=(lat, lon), radius=5, color='orange', fill=True, fill_opacity=0.6).add_to(marker_cluster)

        # Draw route
        folium.PolyLine(route_coords, color='green', weight=5, tooltip="Optimal Path").add_to(m)

        m.save(map_output)
        print(f"✅ Route map saved as {map_output}")
        return route_coords


In [20]:
if __name__ == "__main__":
    # Paths to alert JSONs
    json_sources = {
        "tweets": r"C:\Users\Sheraz\Documents\pythontest\TrackSmart\models_data\twitter_alerts_output.json",
        "anomalies": r"C:\Users\Sheraz\Documents\pythontest\TrackSmart\models_data\anomaly_alerts.json",
        "traffic": r"C:\Users\Sheraz\Documents\pythontest\TrackSmart\models_data\traffic_alerts.json"
    }

    planner = AStarRoutePlanner(json_paths=json_sources)

    warehouse = (12.9816, 80.2209)
    delivery = (13.0601, 80.2492)

    route = planner.find_optimal_route(warehouse, delivery)


✅ Route map saved as a_star_route_map.html
