In [1]:
import pandas as pd
import requests
import csv
import folium
from folium.plugins import MarkerCluster
import matplotlib.pyplot as plt
import seaborn as sns

# Set boundaries for latitude and longitude based on your driving data
BOUNDARY_MIN_LAT = 39.96  # Adjust as per your driving_data.csv
BOUNDARY_MAX_LAT = 40.06  # Adjust as per your driving_data.csv
BOUNDARY_MIN_LON = -83.05  # Adjust as per your driving_data.csv
BOUNDARY_MAX_LON = -82.85  # Adjust as per your driving_data.csv

# Overpass API endpoint
OVERPASS_URL = "http://overpass-api.de/api/interpreter"

# Overpass query to fetch sensitive locations including schools, hospitals, libraries, etc.
overpass_query = f"""
[out:json];
(
  node["amenity"="school"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="hospital"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="college"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="university"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="nursing_home"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="library"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="police"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="fire_station"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
  node["amenity"="pharmacy"]({BOUNDARY_MIN_LAT},{BOUNDARY_MIN_LON},{BOUNDARY_MAX_LAT},{BOUNDARY_MAX_LON});
);
out center;
"""

def fetch_sensitive_locations():
    """Fetch sensitive locations from the Overpass API."""
    try:
        response = requests.post(OVERPASS_URL, data={'data': overpass_query})
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from Overpass API: {e}")
        return None

def parse_locations(osm_data):
    """Parse the Overpass API JSON data and extract relevant location information."""
    locations = []
    for element in osm_data['elements']:
        lat = element['lat']
        lon = element['lon']
        amenity = element['tags'].get('amenity', 'Unknown')
        name = element['tags'].get('name', 'Unnamed')
        locations.append({
            'Latitude': lat,
            'Longitude': lon,
            'Amenity': amenity,
            'Name': name
        })
    return locations

def save_to_csv(locations, output_csv):
    """Save the parsed location data to a CSV file."""
    with open(output_csv, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=['Latitude', 'Longitude', 'Amenity', 'Name'])
        writer.writeheader()
        for location in locations:
            writer.writerow(location)
    print(f"Sensitive locations saved to {output_csv}")

def plot_sensitive_areas_on_map(locations, output_html):
    """Plot sensitive areas on an interactive map using Folium."""
    # Center the map at the first sensitive location
    if not locations:
        print("No locations found to plot.")
        return
    
    first_location = locations[0]
    folium_map = folium.Map(location=[first_location['Latitude'], first_location['Longitude']], zoom_start=12)
    
    # Use MarkerCluster to group markers
    marker_cluster = MarkerCluster().add_to(folium_map)
    
    for location in locations:
        popup_text = f"{location['Amenity'].title()}: {location['Name']}"
        folium.Marker(
            location=[location['Latitude'], location['Longitude']],
            popup=popup_text,
            icon=folium.Icon(color='blue', icon='info-sign')
        ).add_to(marker_cluster)
    
    # Save map to HTML file
    folium_map.save(output_html)
    print(f"Map saved as {output_html}")

def plot_histogram(data, column, title, xlabel, ylabel):
    """Plot histogram of a specific column from the data."""
    plt.figure(figsize=(10, 6))
    sns.histplot(data[column], bins=30, kde=True, color='blue')
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.show()

def process_driving_data(input_csv, output_csv, output_map_html):
    """Main function to process driving data and fetch sensitive locations."""
    # Fetch sensitive locations from OSM
    print("Fetching sensitive locations from Overpass API...")
    osm_data = fetch_sensitive_locations()

    if osm_data:
        print("Parsing location data...")
        locations = parse_locations(osm_data)
        
        # Save sensitive locations to CSV
        print(f"Saving sensitive locations to {output_csv}...")
        save_to_csv(locations, output_csv)

        # Plot sensitive areas on a map with popups
        print("Plotting sensitive areas on a map...")
        plot_sensitive_areas_on_map(locations, output_map_html)

        # Load driving data and plot histograms for speed and acceleration
        print("Loading driving data...")
        df = pd.read_csv(input_csv)

        # Plot histograms for speed and acceleration
        print("Plotting histograms for Speed and Acceleration...")
        plot_histogram(df, 'Speed(km/h)', 'Distribution of Speed (km/h)', 'Speed (km/h)', 'Frequency')
        plot_histogram(df, 'Acceleration(m/s^2)', 'Distribution of Acceleration (m/s²)', 'Acceleration (m/s²)', 'Frequency')

        print("Process completed successfully.")
    else:
        print("Failed to retrieve sensitive locations.")

if __name__ == "__main__":
    input_csv = '../data/driving_data.csv'  # Path to your input driving data CSV file
    output_csv = '../data/sensitive_location.csv'  # Output CSV for sensitive areas
    output_map_html = 'sensitive_areas_map.html'  # Output HTML for map visualization

    # Process driving data and detect sensitive areas
    process_driving_data(input_csv, output_csv, output_map_html)


Fetching sensitive locations from Overpass API...


In [1]:
import os
import pandas as pd
import numpy as np
from math import radians, sin, cos, sqrt, atan2
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns

# Ensure the 'data' directory exists
output_directory = 'data'
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

# Load the driving data
df = pd.read_csv('../data/driving_data.csv')

# Ensure that Time_Step, Latitude, and Longitude are numeric
df['Time_Step'] = pd.to_numeric(df['Time_Step'], errors='coerce')
df['Latitude'] = pd.to_numeric(df['Latitude'], errors='coerce')
df['Longitude'] = pd.to_numeric(df['Longitude'], errors='coerce')

# Drop rows with missing values in important columns
df = df.dropna(subset=['Time_Step', 'Latitude', 'Longitude'])

# Convert Speed from km/h to m/s
df['Speed(m/s)'] = df['Speed(km/h)'] * 0.27778

# Haversine formula to calculate distance between two GPS points
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Radius of Earth in kilometers
    d_lat = radians(lat2 - lat1)
    d_lon = radians(lon2 - lon1)
    lat1 = radians(lat1)
    lat2 = radians(lat2)

    a = sin(d_lat / 2)**2 + cos(lat1) * cos(lat2) * sin(d_lon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return R * c * 1000  # Distance in meters

# Calculate differences for time, latitude, and longitude
df['Time_Diff'] = df['Time_Step'].diff().fillna(0)

# Shift latitude and longitude for distance calculation
df['Lat_Shifted'] = df['Latitude'].shift(1)
df['Lon_Shifted'] = df['Longitude'].shift(1)

# Calculate distance between consecutive points
df['Distance(m)'] = df.apply(lambda row: haversine(row['Lat_Shifted'], row['Lon_Shifted'], row['Latitude'], row['Longitude']), axis=1)

# Filter out extreme values for acceleration (beyond ±10 m/s²) and other columns using z-score
z_scores = np.abs(stats.zscore(df[['Speed(m/s)', 'Acceleration(m/s^2)', 'Jerk(m/s^3)', 'Braking_Intensity']].fillna(0)))
df = df[(z_scores < 3).all(axis=1)]  # Keep data within 3 standard deviations

# Recalculate acceleration after outlier removal
df['Acceleration(m/s^2)'] = df['Speed(m/s)'].diff() / df['Time_Diff'].replace(0, np.nan).fillna(1)

# Calculate jerk (m/s³)
df['Jerk(m/s^3)'] = df['Acceleration(m/s^2)'].diff() / df['Time_Diff'].replace(0, np.nan).fillna(1)

# Calculate braking intensity (absolute value of negative acceleration)
df['Braking_Intensity'] = df['Acceleration(m/s^2)'].apply(lambda x: abs(x) if x < 0 else 0)

# Load sensitive locations (school, hospital, etc.)
sensitive_locations = pd.read_csv('../data/sensitive_location.csv')

# Function to calculate SASV (Sensitive Area Speed Violation)
def haversine_vectorized(lat1, lon1, lat2_series, lon2_series):
    R = 6371  # Earth's radius in kilometers
    d_lat = np.radians(lat2_series - lat1)
    d_lon = np.radians(lon2_series - lon1)
    lat1 = np.radians(lat1)
    lat2_series = np.radians(lat2_series)
    a = np.sin(d_lat / 2)**2 + np.cos(lat1) * np.cos(lat2_series) * np.sin(d_lon / 2)**2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    return R * c * 1000  # Distance in meters

def calculate_sasv(lat, lon, speed, sensitive_locations):
    sensitive_distances = haversine_vectorized(lat, lon, sensitive_locations['Latitude'], sensitive_locations['Longitude'])
    if np.any(sensitive_distances < 300):  # Within 300 meters of sensitive areas
        if speed > 8.33:  # Speed > 30 km/h in sensitive area
            return 1
    return 0

# Apply SASV calculation
df['SASV'] = df.apply(lambda row: calculate_sasv(row['Latitude'], row['Longitude'], row['Speed(m/s)'], sensitive_locations), axis=1)

# Calculate rule violation score for exceeding general speed limit
def calculate_speed_violation(row):
    speed_limit = 13.89  # ~50 km/h general speed limit
    if row['Speed(m/s)'] > speed_limit:
        return 1  # Speed violation
    return 0

df['Speed_Violation'] = df.apply(calculate_speed_violation, axis=1)

# ---------- Driving Score Calculation ---------- #
df['Driving_Score'] = 100

# Normalize the key features using MinMaxScaler
scaler = MinMaxScaler()
df[['Speed(m/s)', 'Acceleration(m/s^2)', 'Jerk(m/s^3)', 'Braking_Intensity']] = scaler.fit_transform(
    df[['Speed(m/s)', 'Acceleration(m/s^2)', 'Jerk(m/s^3)', 'Braking_Intensity']])

# Apply penalties based on normalized features
df['Driving_Score'] -= df['Speed(m/s)'] * 25  # Speed penalty
df['Driving_Score'] -= df['Acceleration(m/s^2)'] * 15  # Acceleration penalty
df['Driving_Score'] -= df['Jerk(m/s^3)'] * 5  # Jerk penalty
df['Driving_Score'] -= df['Braking_Intensity'] * 5  # Braking intensity penalty

# Penalty for violations
df['Driving_Score'] -= df['SASV'] * 15  # Penalty for violating sensitive areas
df['Driving_Score'] -= df['Speed_Violation'] * 15  # Penalty for general speed violation

# Ensure the score is within the range [0, 100]
df['Driving_Score'] = df['Driving_Score'].clip(upper=100, lower=0)

# Driving Category based on score
def categorize_driving_score(score):
    if score > 80:
        return 'Safe'
    elif score > 60:
        return 'Moderate'
    else:
        return 'Risky'

df['Driving_Category'] = df['Driving_Score'].apply(categorize_driving_score)

# Save the processed data
processed_columns = ['TripId', 'Time_Step', 'Latitude', 'Longitude', 'Speed(m/s)', 'Acceleration(m/s^2)', 
                     'Jerk(m/s^3)', 'Braking_Intensity', 'SASV', 'Speed_Violation', 'Driving_Score', 'Driving_Category']
processed_data = df[processed_columns]
processed_data.to_csv('../data/processed_data.csv', index=False)

print("Processed data saved to '../data/processed_data.csv'.")


KeyError: "['Jerk(m/s^3)', 'Braking_Intensity'] not in index"