In [7]:
import torch
import torch_geometric as pyg
import networkx as nx
import pandas as pd
import numpy as np
from torch_geometric.utils import to_networkx
from sklearn.cluster import DBSCAN
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
import scipy.stats

def load_and_analyze_graph(path):
    """
    Load and perform comprehensive analysis on traffic collision graph
    """
    # Load the PyTorch geometric graph
    graph = torch.load(path)
    
    # Convert to NetworkX for certain analyses
    G = to_networkx(graph, to_undirected=True)
    
    results = {}
    
    # 1. Network Topology Analysis
    results['network_metrics'] = analyze_network_topology(G)
    
    # 2. Environmental Analysis
    results['environmental'] = analyze_environmental_factors(graph)
    
    # 3. Road Analysis
    results['road_analysis'] = analyze_road_characteristics(graph)
    
    # 4. Spatial Analysis
    results['spatial'] = analyze_spatial_patterns(graph)
    
    return results

def analyze_network_topology(G):
    """
    Analyze network topology metrics
    """
    metrics = {
        'betweenness_centrality': nx.betweenness_centrality(G),
        'degree_centrality': nx.degree_centrality(G),
        'clustering_coefficient': nx.average_clustering(G),
        'average_shortest_path': nx.average_shortest_path_length(G),
        'high_risk_nodes': identify_high_risk_nodes(G)
    }
    return metrics

def analyze_environmental_factors(graph):
    """
    Analyze weather-related patterns
    """
    weather_features = ['tavg', 'tmax', 'tmin', 'prcp', 'wspd', 'pres']
    
    correlations = {}
    for feature in weather_features:
        if hasattr(graph, feature):
            feature_data = getattr(graph, feature)
            correlations[feature] = analyze_weather_correlation(feature_data, graph.collision_count)
    
    return correlations

def analyze_road_characteristics(graph):
    """
    Analyze road-specific patterns
    """
    road_stats = {
        'one_way_analysis': analyze_one_way_roads(graph),
        'road_type_analysis': analyze_road_types(graph),
        'traffic_volume_impact': analyze_traffic_volume(graph)
    }
    return road_stats

def analyze_spatial_patterns(graph):
    """
    Perform spatial analysis
    """
    # Extract coordinates
    coords = np.column_stack((graph.lat, graph.long))
    
    # Perform DBSCAN clustering
    clustering = DBSCAN(eps=0.1, min_samples=5).fit(coords)
    
    spatial_analysis = {
        'hotspots': identify_hotspots(coords, clustering.labels_),
        'spatial_autocorrelation': calculate_spatial_autocorrelation(coords, graph.collision_count)
    }
    return spatial_analysis

def identify_hotspots(coords, cluster_labels):
    """
    Identify collision hotspots based on DBSCAN clustering
    """
    unique_clusters = np.unique(cluster_labels)
    hotspots = []
    
    for cluster in unique_clusters:
        if cluster != -1:  # -1 represents noise in DBSCAN
            cluster_points = coords[cluster_labels == cluster]
            center = np.mean(cluster_points, axis=0)
            density = len(cluster_points)
            hotspots.append({
                'center': center,
                'density': density,
                'points': cluster_points
            })
    
    return sorted(hotspots, key=lambda x: x['density'], reverse=True)

def calculate_spatial_autocorrelation(coords, collision_counts):
    """
    Calculate Moran's I spatial autocorrelation
    """
    # Calculate distance matrix
    dist_matrix = cdist(coords, coords)
    
    # Convert to weights matrix (inverse distance)
    weights = 1 / (dist_matrix + np.eye(len(coords)))  # Add eye to avoid division by zero
    weights[weights == np.inf] = 0
    
    # Normalize weights
    weights = weights / weights.sum(axis=1)[:, np.newaxis]
    
    # Calculate Moran's I
    return moran(collision_counts, weights)[0]

def analyze_road_types(graph):
    """
    Analyze collision patterns by road type
    """
    road_types = graph.road_type.unique()
    analysis = {}
    
    for road_type in road_types:
        mask = graph.road_type == road_type
        analysis[road_type] = {
            'collision_count': graph.collision_count[mask].sum(),
            'collision_rate': graph.collision_count[mask].mean(),
            'avg_traffic': graph.aadt[mask].mean() if hasattr(graph, 'aadt') else None
        }
    
    return analysis

def analyze_traffic_volume(graph):
    """
    Analyze relationship between traffic volume and collisions
    """
    if not hasattr(graph, 'aadt'):
        return None
    
    # Calculate correlation between AADT and collisions
    correlation = np.corrcoef(graph.aadt, graph.collision_count)[0,1]
    
    # Bin traffic volumes and calculate average collision rates
    bins = np.percentile(graph.aadt, np.linspace(0, 100, 11))
    traffic_bins = np.digitize(graph.aadt, bins)
    
    avg_collisions = [graph.collision_count[traffic_bins == i].mean() 
                     for i in range(1, len(bins))]
    
    return {
        'correlation': correlation,
        'binned_analysis': {
            'bins': bins,
            'avg_collisions': avg_collisions
        }
    }

def calculate_collision_rate(graph, one_way):
    """
    Calculate collision rate for specific road type
    """
    mask = graph.one_way == one_way
    total_collisions = graph.collision_count[mask].sum()
    total_roads = mask.sum()
    
    return {
        'total_collisions': total_collisions,
        'total_roads': total_roads,
        'collision_rate': total_collisions / total_roads if total_roads > 0 else 0
    }

def plot_weather_correlations(environmental_results, ax):
    """
    Plot weather correlations with collision rates
    """
    features = list(environmental_results.keys())
    correlations = list(environmental_results.values())
    
    sns.barplot(x=features, y=correlations, ax=ax)
    ax.set_title('Weather Correlations with Collision Rates')
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45)
    ax.set_ylabel('Correlation Coefficient')

def plot_road_type_analysis(road_analysis, ax):
    """
    Plot collision rates by road type
    """
    road_types = list(road_analysis['road_type_analysis'].keys())
    collision_rates = [data['collision_rate'] 
                      for data in road_analysis['road_type_analysis'].values()]
    
    sns.barplot(x=road_types, y=collision_rates, ax=ax)
    ax.set_title('Collision Rates by Road Type')
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45)
    ax.set_ylabel('Collision Rate')

def plot_spatial_hotspots(spatial_results, lat, long, ax):
    """
    Plot spatial hotspots on a map
    """
    # Create scatter plot of all points
    ax.scatter(long, lat, alpha=0.1, c='gray', s=1)
    
    # Plot hotspots
    for hotspot in spatial_results['hotspots']:
        ax.scatter(hotspot['center'][1], hotspot['center'][0], 
                  c='red', s=100, alpha=0.6)
        
    ax.set_title('Collision Hotspots')
    ax.set_xlabel('Longitude')
    ax.set_ylabel('Latitude')

def visualize_results(results, graph):
    """
    Create visualizations for analysis results
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 15))
    
    # 1. Network topology heatmap
    sns.heatmap(pd.DataFrame(results['network_metrics']).corr(), 
                ax=axes[0,0], cmap='coolwarm')
    axes[0,0].set_title('Network Metrics Correlation')
    
    # 2. Weather correlation plot
    plot_weather_correlations(results['environmental'], ax=axes[0,1])
    
    # 3. Road type analysis
    plot_road_type_analysis(results['road_analysis'], ax=axes[1,0])
    
    # 4. Spatial hotspot map
    plot_spatial_hotspots(results['spatial'], graph.lat, graph.long, ax=axes[1,1])
    
    plt.tight_layout()
    return fig

In [2]:
import torch
import torch_geometric as pyg
import networkx as nx
import pandas as pd
import numpy as np
from torch_geometric.utils import to_networkx
from sklearn.cluster import DBSCAN
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
import scipy.stats

In [3]:
"""
Load and perform comprehensive analysis on traffic collision graph
"""
# Load the PyTorch geometric graph
graph = torch.load('/Users/beck/Documents/GitHub/ML4RoadSafety/ml_for_road_safety/data/CA/adj_matrix.pt')

# Convert to NetworkX for certain analyses
G = to_networkx(graph, to_undirected=True)

  graph = torch.load('/Users/beck/Documents/GitHub/ML4RoadSafety/ml_for_road_safety/data/CA/adj_matrix.pt')


FileNotFoundError: [Errno 2] No such file or directory: '/Users/beck/Documents/GitHub/ML4RoadSafety/ml_for_road_safety/data/CA/adj_matrix.pt'