# Import & Initialization

In [None]:
import os
import time
import re
import pytz
import random
import uuid
import ast
import googlemaps
import gmplot

from datetime import datetime, timedelta
from dotenv import load_dotenv

import pandas as pd
import numpy as np
import osmnx as ox
import matplotlib.pyplot as plt
import plotly.express as px

from io import StringIO

from difflib import SequenceMatcher
from fuzzywuzzy import fuzz

from heapq import heappush, heappop

from sklearn.preprocessing import StandardScaler, OneHotEncoder,MinMaxScaler
from sklearn.cluster import DBSCAN
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

In [None]:
# Pull settings from a local .env file into the process environment, then
# build the Google Maps client from the configured API key.
load_dotenv()
API_KEY = os.environ.get('GOOGLE_MAPS_API_KEY')
gmaps = googlemaps.Client(key=API_KEY)

# Generate Road Names (Riyadh)

## Create Road Names

In [None]:
city_name = 'Riyadh, Saudi Arabia'

# Drivable road network of the city, and its edges as a GeoDataFrame.
G = ox.graph_from_place(city_name, network_type='drive')
edges = ox.graph_to_gdfs(G, nodes=False, edges=True)

# One (road name, "lat,lng" origin, "lat,lng" destination) tuple per edge.
roads_with_coords = []

for _, row in edges.iterrows():
    line_coords = row['geometry'].coords
    first_vertex = line_coords[0]
    last_vertex = line_coords[-1]

    # Each vertex is read as (x, y); the strings are written as "y,x".
    origin = f"{first_vertex[1]},{first_vertex[0]}"
    destination = f"{last_vertex[1]},{last_vertex[0]}"

    # Fall back to a placeholder when the edge carries no name value.
    road_name = "Unnamed road"
    if 'name' in row and row['name'] is not None:
        road_name = row['name']

    roads_with_coords.append((road_name, origin, destination))


In [None]:
# Keep every road name that has no missing value. A name may be a single
# value or a list/array of values; a collection is kept only when none of
# its members is null.
list_graph_roads = []

for entry in roads_with_coords:
    name = entry[0]
    if isinstance(name, (list, np.ndarray)):
        has_missing = pd.isnull(name).any()
    else:
        has_missing = pd.isnull(name)
    if not has_missing:
        list_graph_roads.append(name)

In [None]:
# Only plain-string names are kept (list-valued and missing names are dropped).
road_names = [name for name, _origin, _destination in roads_with_coords if isinstance(name, str)]

# Collapse exact duplicates.
unique_road_names = set(road_names)

## Filter Road Names

In [None]:
def remove_duplicate_roads(road_names, threshold=80):
    """
    Removes duplicate or similar road names using fuzzy matching.

    Names are processed in iteration order and a name is kept only when its
    `fuzz.ratio` score against every previously kept name is at most
    `threshold`. The outcome therefore depends on the order of `road_names`;
    pass an ordered sequence for reproducible results.

    Args:
    - road_names (iterable of str): Road names to process.
    - threshold (int, optional): Similarity score above which two names are
      treated as duplicates. Default is 80.

    Returns:
    - unique_roads (list): Road names with duplicates removed, in the order
      they were first encountered.
    """
    unique_roads = []
    for road in road_names:
        # Keep the road only if it is not too similar to any kept road.
        if all(fuzz.ratio(road, kept) <= threshold for kept in unique_roads):
            unique_roads.append(road)

    return unique_roads

# The greedy de-duplication depends on input order, and a set of strings has
# no stable iteration order between runs; sort first so the cleaned list is
# reproducible.
cleaned_roads = remove_duplicate_roads(sorted(unique_road_names))


In [None]:
def get_road_start_end(gmaps_client, road_name, city='Riyadh'):
    """
    Looks up a road with the geocoder and returns two corner points of its bounding box.

    The query sent to the client is "<road_name>, <city>". The north-east corner
    of the first result's bounds is reported as the start point and the
    south-west corner as the end point.

    Args:
    - gmaps_client: Client object exposing a `geocode(query)` method.
    - road_name (str): The name of the road to search for.
    - city (str, optional): The city appended to the query. Default is 'Riyadh'.

    Returns:
    - tuple: ((lat, lng), (lat, lng)) for the start and end points, or
      (None, None) when there is no result, the first result has no bounds,
      or any exception is raised (the exception is printed, not re-raised).
    """
    try:
        matches = gmaps_client.geocode(f"{road_name}, {city}")
        if not matches:
            return None, None

        geometry = matches[0]['geometry']
        if 'bounds' not in geometry:
            return None, None

        bounds = geometry['bounds']
        north_east = bounds['northeast']
        south_west = bounds['southwest']
        return (north_east['lat'], north_east['lng']), (south_west['lat'], south_west['lng'])
    except Exception as e:
        print(f"Error getting start/end points for {road_name}: {e}")
        return None, None


In [None]:
def get_traffic_data(gmaps_client, road_name, city='Riyadh'):
    """
    Requests a driving route between a road's two end points and reports its traffic figures.

    The end points come from `get_road_start_end`; the route is requested with
    departure time "now" and the "best_guess" traffic model.

    Args:
    - gmaps_client: Client object exposing `geocode` and `directions` methods.
    - road_name (str): The name of the road to search for.
    - city (str, optional): The city where the road is located. Default is 'Riyadh'.

    Returns:
    - tuple: (duration in traffic in minutes, distance in kilometers) for the
      first leg of the first route, or (None, None) when the end points are
      unavailable, no route is returned, or the request/parsing raises.
    """
    start_coords, end_coords = get_road_start_end(gmaps_client, road_name, city)

    if start_coords is None or end_coords is None:
        print(f"Could not retrieve start/end points for {road_name}.")
        return None, None

    start_lat, start_lng = start_coords
    end_lat, end_lng = end_coords
    origin = f"{start_lat},{start_lng}"
    destination = f"{end_lat},{end_lng}"

    try:
        routes = gmaps_client.directions(
            origin,
            destination,
            mode="driving",
            departure_time="now",
            traffic_model="best_guess",
        )
        if not routes:
            return None, None

        first_leg = routes[0]['legs'][0]
        minutes_in_traffic = first_leg['duration_in_traffic']['value'] / 60  # seconds -> minutes
        kilometers = first_leg['distance']['value'] / 1000  # meters -> kilometers
        return minutes_in_traffic, kilometers
    except Exception as e:
        print(f"Error fetching traffic data for {road_name}: {e}")
        return None, None

# Bounded heap holding the 200 roads with the lowest distance/time ratio.
# Ratios are stored negated, so the heap root is always the kept road with
# the highest ratio, i.e. the next one to evict.
top_200_heap = []

for road in cleaned_roads:
    traffic_time, road_distance = get_traffic_data(gmaps, road)

    # Skip roads without usable data and roads of 5 km or less.
    if not (traffic_time and road_distance and road_distance > 5):
        continue

    ratio = road_distance / traffic_time
    candidate = (-ratio, road, road_distance, traffic_time)

    if len(top_200_heap) < 200:
        # Still room: always keep the road.
        heappush(top_200_heap, candidate)
    elif candidate[0] > top_200_heap[0][0]:
        # Heap is full and this road has a lower ratio than the current
        # highest-ratio entry: swap them.
        heappop(top_200_heap)
        heappush(top_200_heap, candidate)


In [None]:
heap_columns = ['Negative Ratio', 'Road Name', 'Distance (km)', 'Time (mins)']
df = pd.DataFrame(top_200_heap, columns=heap_columns)

# Swap the negated heap key for the real ratio (appended as the last column).
df['Ratio (km/min)'] = -df.pop('Negative Ratio')

df.to_csv('filtered_road_names.csv', index=False)

## Add Start & End Points to road names

In [None]:
df = pd.read_csv('filtered_road_names.csv')

# Geocode every road once; each result is expanded into two columns.
endpoints = df['Road Name'].apply(
    lambda name: pd.Series(get_road_start_end(gmaps, name, city='Riyadh'))
)
df[['Start_point','End_point']] = endpoints

# Overwrite the file so later steps can reuse the stored coordinates.
df.to_csv('filtered_road_names.csv', index=False)

# Generate Data

In [None]:
# Map a delay, relative to the normal duration, onto a traffic severity color
def determine_traffic_color(delay, duration):
    """
    Classifies traffic severity from the delay relative to the normal duration.

    Args:
    - delay (float): The delay in minutes.
    - duration (float): The normal duration in minutes.

    Returns:
    - str: 'Blue' when the delay is below 5% of the duration, 'Yellow' below 20%,
      'Orange' below 50%, 'Red' below 100%, otherwise 'Dark Red'.
    """
    # (upper bound as a fraction of the duration, color), checked in ascending order.
    severity_bands = (
        (0.05, 'Blue'),
        (0.20, 'Yellow'),
        (0.50, 'Orange'),
        (1.00, 'Red'),
    )
    for fraction, color in severity_bands:
        if delay < fraction * duration:
            return color
    return 'Dark Red'


# Query the Directions API for one road whose end points are already known
def get_traffic_data_from_dataframe(gmaps_client, start_lat, start_lng, end_lat, end_lng, road_name, city='Riyadh'):
    """
    Requests a driving route between two coordinates and summarises its traffic state.

    Args:
    - gmaps_client: Client object exposing a `directions` method.
    - start_lat, start_lng (float): Starting latitude and longitude.
    - end_lat, end_lng (float): Ending latitude and longitude.
    - road_name (str): The name of the road (copied into the result and used in error messages).
    - city (str, optional): Accepted for signature compatibility; not used by this function.

    Returns:
    - dict: Road name, distance (km), duration in traffic (min), speed (km/h),
      delay (min), traffic condition color, ISO timestamp of the local clock,
      and the start/end points; or None when no route is returned or any
      exception is raised (the exception is printed, not re-raised).
    """
    try:
        routes = gmaps_client.directions(
            f"{start_lat},{start_lng}",
            f"{end_lat},{end_lng}",
            mode="driving",
            departure_time="now",
            traffic_model="best_guess",
        )
        if not routes:
            return None

        leg = routes[0]['legs'][0]

        traffic_seconds = leg['duration_in_traffic']['value']
        duration_in_traffic_min = traffic_seconds / 60  # seconds -> minutes
        distance_km = leg['distance']['value'] / 1000  # meters -> kilometers
        speed_kmh = distance_km / (duration_in_traffic_min / 60)  # km per hour

        usual_seconds = leg['duration']['value']
        delay_min = (traffic_seconds - usual_seconds) / 60  # extra minutes caused by traffic
        traffic_condition = determine_traffic_color(delay_min, usual_seconds / 60)
        timestamp = datetime.now().isoformat()

        return {
            'road_name': road_name,
            'distance_km': distance_km,
            'duration_in_traffic_min': duration_in_traffic_min,
            'speed_kmh': speed_kmh,
            'delay_min': delay_min,
            'traffic_condition': traffic_condition,
            'timestamp': timestamp,
            'Start_point': (start_lat, start_lng),
            'End_point': (end_lat, end_lng),
        }
    except Exception as e:
        print(f"Error fetching traffic data for {road_name}: {e}")
        return None


def generate_data_from_dataframe(csv_file, gmaps_client):
    """
    Collects a traffic snapshot for every road listed in a CSV file and saves it as a new CSV file.

    The input file must contain the columns 'Road Name', 'Start_point' and
    'End_point', where the two point columns hold "(lat, lng)" tuple literals.
    Rows with a missing start or end point (an empty cell) are skipped rather
    than aborting the run.

    Args:
    - csv_file (str): The path to the CSV file containing road names and coordinates.
    - gmaps_client: The Google Maps client to make API requests.

    Returns:
    - None: When at least one road produced data, the snapshot is written to
      'traffic_data_<timestamp>_<uuid>.csv' in the working directory;
      otherwise nothing is written and a message is printed.
    """
    df = pd.read_csv(csv_file)

    # Empty cells are read back as NaN, which ast.literal_eval cannot parse.
    missing_points = df[['Start_point', 'End_point']].isna().any(axis=1)
    if missing_points.any():
        print(f"Skipping {int(missing_points.sum())} road(s) without start/end coordinates.")
        df = df[~missing_points]

    # literal_eval only accepts Python literals, so the stored tuples are parsed safely.
    start_points = df['Start_point'].apply(ast.literal_eval)
    end_points = df['End_point'].apply(ast.literal_eval)

    traffic_data_list = []
    for road_name, (start_lat, start_lng), (end_lat, end_lng) in zip(df['Road Name'], start_points, end_points):
        # Fetch traffic data for each road; failed lookups return None and are dropped.
        traffic_data = get_traffic_data_from_dataframe(gmaps_client, start_lat, start_lng, end_lat, end_lng, road_name)
        if traffic_data:
            traffic_data_list.append(traffic_data)

    if not traffic_data_list:
        print("No traffic data retrieved; nothing was saved.")
        return

    traffic_df = pd.DataFrame(traffic_data_list)

    # Timestamp plus UUID keeps snapshot files from overwriting each other.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    unique_filename = f"traffic_data_{timestamp}_{uuid.uuid4()}.csv"

    traffic_df.to_csv(unique_filename, index=False)

    print(f"Traffic data saved as '{unique_filename}'.")

# Take one traffic snapshot for the roads stored in the filtered CSV.
generate_data_from_dataframe(csv_file='filtered_road_names.csv', gmaps_client=gmaps)


# Select Important Roads

In [None]:
directory = '/content/DS'

# Every CSV file in the directory is read and stacked into one frame.
files = os.listdir(directory)
csv_files = list(filter(lambda file_name: file_name.endswith('.csv'), files))

dfs = [pd.read_csv(os.path.join(directory, csv_file)) for csv_file in csv_files]

df = pd.concat(dfs, ignore_index=True)

In [None]:
# The traffic collector in this notebook writes the endpoint columns as
# 'Start_point'/'End_point', while the analysis below uses the lower-case
# names. Accept either spelling (or a mix across files) and settle on the
# lower-case one.
for legacy_name, current_name in (('Start_point', 'start_point'), ('End_point', 'end_point')):
    if legacy_name not in df.columns:
        continue
    if current_name in df.columns:
        df[current_name] = df[current_name].fillna(df[legacy_name])
        df = df.drop(columns=legacy_name)
    else:
        df = df.rename(columns={legacy_name: current_name})

# Parse once and shift by three hours.
# NOTE(review): presumably converts UTC recordings to Riyadh local time - confirm
# the clock of the machine that collected the data.
df['timestamp'] = pd.to_datetime(df['timestamp']) + timedelta(hours=3)

# Keep routes between 10 km and 40 km (inclusive); copy so the column
# assignments below act on an independent frame rather than a slice.
df = df[df['distance_km'].between(10, 40)].copy()

df['hour'] = df['timestamp'].dt.hour
df['day'] = df['timestamp'].dt.day
df['month'] = df['timestamp'].dt.month

df['congestion_ratio'] = df['delay_min'] / df['distance_km']  # delay minutes per km
df['distance_ratio'] = df['duration_in_traffic_min'] / df['distance_km']  # travel minutes per km

# NOTE(review): this removes the 22nd of every month present in the data,
# not just a single date - confirm that is intended.
df = df[df['day'] != 22] # National Day

congestion_info = df[['road_name', 'distance_km', 'delay_min', 'congestion_ratio','distance_ratio', 'hour','day','month', 'traffic_condition', 'timestamp', 'start_point', 'end_point']]

In [None]:
# Average the two ratios per (road_name, start_point, end_point) group
endpoint_keys = ['start_point', 'end_point']

aggregation_result_3 = (
    df.groupby(['road_name'] + endpoint_keys)
    .agg(
        congestion_ratio=('congestion_ratio', 'mean'),
        distance_ratio=('distance_ratio', 'mean'),
    )
    .reset_index()
)


def _most_frequent(values):
    # First entry of the mode, i.e. the most frequent value in the group.
    return values.mode()[0]


# Most frequent traffic condition per (start_point, end_point) pair
traffic_condition_mode = df.groupby(endpoint_keys)['traffic_condition'].agg(_most_frequent).reset_index()

aggregation_result_3 = aggregation_result_3.merge(traffic_condition_mode, on=endpoint_keys)

# Second name for the same DataFrame object (not a copy).
ag_df_3 = aggregation_result_3


In [None]:
features = aggregation_result_3[['congestion_ratio', 'distance_ratio']]

# Score candidate k values on MinMax-scaled features, i.e. in the same feature
# space the final KMeans model is fitted on; scoring the raw features could
# favour a different k.
features_scaled = MinMaxScaler().fit_transform(features)

silhouette_scores = []
# silhouette_score needs between 2 and n_samples - 1 clusters, so cap the
# scan for small data sets (the usual range is 2..10).
K = range(2, min(10, len(features) - 1) + 1)

for k in K:
    kmeans = KMeans(n_clusters=k, random_state=42)
    labels = kmeans.fit_predict(features_scaled)
    score = silhouette_score(features_scaled, labels)
    silhouette_scores.append(score)

plt.figure(figsize=(8, 5))
plt.plot(K, silhouette_scores, 'bx-')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Silhouette Score')
plt.title('Silhouette Method for Optimal k')
plt.show()


In [None]:
features = ag_df_3.loc[:, ['congestion_ratio', 'distance_ratio']]

# Rescale both ratios to [0, 1] before clustering.
scaler = MinMaxScaler().fit(features)
features_scaled = scaler.transform(features)

# Two clusters, as selected from the silhouette plot.
kmeans = KMeans(n_clusters=2, random_state=42).fit(features_scaled)
ag_df_3['cluster'] = kmeans.labels_

ag_df_3[['road_name', 'start_point', 'end_point', 'cluster']]

In [None]:
# Plotly Express treats a numeric color column as continuous and then ignores
# color_discrete_sequence. Plot a copy whose cluster labels are strings so each
# cluster gets its own discrete color and a legend entry.
plot_df = ag_df_3.assign(cluster=ag_df_3['cluster'].astype(str))

fig = px.scatter(
    plot_df,
    x='distance_ratio',
    y='congestion_ratio',
    color='cluster',
    # Fix the label order so the color assignment does not depend on row order.
    category_orders={'cluster': sorted(plot_df['cluster'].unique())},
    hover_data=['road_name', 'start_point', 'end_point'],
    title='Clusters Based on Congestion Ratio and Distance Ratio',
    color_discrete_sequence=['blue', 'red'],
)

fig.update_layout(
    xaxis_title='Distance Ratio',
    yaxis_title='Congestion Ratio',
    width=600,
    height=400,
    legend_title_text='Cluster',
)

fig.show()

In [None]:
# Roads assigned to cluster 0
in_cluster_0 = ag_df_3['cluster'].eq(0)
cluster_data_3_0 = ag_df_3.loc[in_cluster_0]
display(cluster_data_3_0)

In [None]:
# Roads assigned to cluster 1
in_cluster_1 = ag_df_3['cluster'].eq(1)
cluster_data_3_1 = ag_df_3.loc[in_cluster_1]
display(cluster_data_3_1)