In [1]:
import googlemaps
import pandas as pd
import random
import json
from dotenv import load_dotenv
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import DBSCAN
import numpy as np
from scipy.spatial import ConvexHull
from sklearn.metrics import silhouette_score
from scipy.spatial.distance import pdist, squareform
from kneed import KneeLocator
import time
import os
import matplotlib.pyplot as plt

In [2]:
current_file_directory = os.path.dirname(os.path.abspath(__file__))
dotenv_path = os.path.join(current_file_directory, '.env')
# Load environment variables from .env file
load_dotenv(dotenv_path)

# Access the environment variable
my_api_key = os.getenv("GOOGLE_KEY")

gmaps = googlemaps.Client(key=my_api_key)


def load_distance_cache():
    try:
        with open("distance_cache.json", "r") as file:
            loaded_cache = json.load(file)
            # Convert string keys back to tuples if your logic requires tuple keys
            return {tuple(key.split(",")): value for key, value in loaded_cache.items()}
    except (FileNotFoundError, json.JSONDecodeError):
        return {}  # Return an empty dictionary if there is no file or decoding fails


def save_distance_cache(cache):
    with open("distance_cache.json", "w") as file:
        # Convert tuple keys to a string format
        formatted_cache = {",".join(key): value for key, value in cache.items()}
        json.dump(formatted_cache, file, indent=4)


# Make sure to call this once at the start of your program or script
distance_cache = load_distance_cache()

In [None]:
def read_cities_csv_to_dict(csv_file_path):
    """
    Reads a CSV file containing city information and returns a dictionary
    with city names as keys and their coordinates (latitude and longitude) as values.

    Args:
    csv_file_path: The file path to the CSV file containing the city data.

    Returns:
    A dictionary with city names as keys and (latitude, longitude) tuples as values.
    """
    # Load the CSV file into a DataFrame
    df = pd.read_csv(csv_file_path, delimiter=",")

    duplicates = df[df.duplicated(keep=False)]
    if duplicates.empty:
        print("No duplicates found.")
    else:
        print("Duplicates found:", len(duplicates))
        df.drop_duplicates(keep="first", inplace=True)

    # Create a dictionary from the DataFrame
    city_coordinates = {
        row["city"]: (row["lat"], row["lng"])
        for index, row in df.iterrows()
        if row["country"] == "Sweden"
    }
    print(city_coordinates)

    return city_coordinates


csv_file_path = "Sweden_cities.csv"
cities_coordinates = read_cities_csv_to_dict(csv_file_path)

In [None]:
# code inspired from https://medium.com/@tarammullin/dbscan-parameter-estimation-ff8330e3a3bd
# and https://kneed.readthedocs.io/en/stable/parameters.html

def calculate_eps(coordinates, min_samples=2):
    """
    Automatically calculates the eps parameter for DBSCAN based on the nearest neighbors.

    Args:
    coordinates: List of (latitude, longitude) tuples for the cities.
    min_samples: The minimum samples in a neighborhood for a point to be considered as a core point.

    Returns:
    The calculated eps value.
    """
    # Use NearestNeighbors to find the distance to the nearest min_samples points
    nn = NearestNeighbors(n_neighbors=min_samples)
    nn.fit(coordinates)
    distances, indices = nn.kneighbors(coordinates)

    # Take the distance to the farthest of the min_samples points
    distances = np.sort(distances[:, min_samples - 1], axis=0)

    # Find the "knee" in the distances graph which is a good estimate for eps
    knee_locator = KneeLocator(
        range(len(distances)), distances, curve="convex", direction="increasing"
    )
    eps = distances[knee_locator.knee] if knee_locator.knee else np.mean(distances)
    return eps

    # Uncomment and move inside the function to Plot the nearest distances with the knee point
"""     plt.figure(figsize=(10, 6))
    plt.plot(range(len(distances)), distances, marker="o")
    if knee_locator.knee is not None:
        plt.vlines(knee_locator.knee, plt.ylim()[0], plt.ylim()[1], linestyles="dashed")
        plt.scatter(
            knee_locator.knee,
            distances[knee_locator.knee],
            color="red",
            s=100,
            zorder=5,
            label=f"Knee at index {knee_locator.knee}, eps={eps:.3f}",
        )
    plt.title("Elbow Method for Determining Optimal eps")
    plt.xlabel("Points sorted by distance to min_samples-th nearest neighbor")
    plt.ylabel("Distance to min_samples-th nearest neighbor")
    plt.legend()
    plt.grid(True)
    plt.show() """

In [None]:
def calculate_perimeter_and_area(points):
    """
    Calculate the perimeter of the convex hull of a set of points.

    Args:
    points: An array of points in the format [(x1, y1), (x2, y2), ...]

    Returns:
    The perimeter of the convex hull of the given points.
    """
    if len(points) < 3:
        # Not enough points to form a convex hull; return 0
        return np.nan, np.nan  # Use NaN to indicate the value is not available

    # Ensure all points do not lie on a single line or are not identical
    if np.std(points[:, 0]) == 0 or np.std(points[:, 1]) == 0:
        return np.nan, np.nan  # Points are collinear or identical in one dimension

    try:
        hull = ConvexHull(points)
        perimeter = hull.area
        area = hull.volume
        return perimeter, area
    except Exception as e:
        print("Failed to compute ConvexHull:", e)
        return np.nan, np.nan

In [None]:
# Inspired by https://scikit-learn.org/stable/auto_examples/cluster/plot_dbscan.html

def dbscan_and_metrics(coordinates):
    """
    Performs DBSCAN clustering on the provided coordinates and calculates various metrics for each cluster.

    This function automatically calculates the 'eps' parameter for DBSCAN using the nearest neighbors approach,
    performs the clustering, and then calculates metrics such as silhouette score, cluster sizes, inter-cluster distances,
    and various geometric properties of the clusters like perimeter and area.

    Args:
        coordinates (dict): A dictionary with city names as keys and (latitude, longitude) tuples as values.

    Returns:
        dict: A dictionary containing:
            - 'cluster_IDs': An array of cluster labels for each point.
            - 'cluster_sizes': A list with the size (number of points) of each cluster.
            - 'n_clusters': The number of clusters found, excluding noise.
            - 'avg_inter_cluster_distance_km': The average distance between clusters in kilometers.
            - 'min_inter_cluster_distance_km': The minimum distance between any two clusters in kilometers.
            - 'max_inter_cluster_distance_km': The maximum distance between any two clusters in kilometers.
            - 'average_silhouette': The average silhouette score across all clusters.
            - 'n_noise_points': The number of points classified as noise.
            - 'std_dev_cluster_sizes': The standard deviation of the sizes of the clusters.
            - 'average_cluster_density': The average density of clusters, defined as size/area.
            - 'average_cluster_perimeter': The average perimeter of the clusters.
            - 'average_cluster_area': The average area of the clusters.
            - 'average_cluster_complexity': An average measure of cluster complexity, defined as perimeter/sqrt(area).
    """
    if not coordinates:
        print("No coordinates provided for clustering.")
        return {}

    # Convert city coordinates to a NumPy array for DBSCAN
    X = np.array(list(coordinates.values()))
    if X.size == 0:
        print("Empty coordinate array.")
        return {}

    min_sample = 4  # based on https://www.theaidream.com/post/dbscan-clustering-algorithm-in-machine-learning 2.dim
    start_time_eps = time.time()
    # Calculate eps automatically
    eps = calculate_eps(X, min_samples=min_sample)
    execution_time_eps = time.time() - start_time_eps

    # Perform DBSCAN clustering
    db = DBSCAN(eps=eps, min_samples=min_sample, metric="euclidean").fit(X)
    labels = db.labels_

    # Number of clusters, excluding noise if present
    n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise_ = list(labels).count(-1)

    # Calculate silhouette score
    if n_clusters_ == 0:
        print("No clusters found.")
        return {
            "DBSCAN_min_sample": min_sample,
            "DBSCAN_eps": eps,
            "cluster_IDs": [],
            "cluster_sizes": [],
            "n_clusters": 0,
            "n_noise_points": n_noise_,
            "avg_inter_cluster_distance_km": np.nan,
            "min_inter_cluster_distance_km": np.nan,
            "max_inter_cluster_distance_km": np.nan,
            "average_silhouette": np.nan,
            "std_dev_cluster_sizes": std_dev_cluster_sizes,
            "avg_cluster_density": np.nan,
            "avg_cluster_perimeter": np.nan,
            "avg_cluster_area": np.nan,
            "avg_cluster_complexity": np.nan,
            "eps_exec_time": execution_time_eps,
        }
    elif n_clusters_ > 1:
        silhouette_avg = silhouette_score(X, labels)
    else:
        silhouette_avg = np.nan  # silhouette score is not meaningful with 1 or 0 clusters

    # Prepare cluster information
    clusters = [X[labels == i] for i in range(n_clusters_)]
    cluster_sizes = [len(cluster) for cluster in clusters]
    std_dev_cluster_sizes = np.std(cluster_sizes) if cluster_sizes else 0

    # Calculate inter-cluster distances
    cluster_centers = [np.mean(cluster, axis=0) for cluster in clusters]
    if len(cluster_centers) > 1:
        inter_cluster_distances = pdist(cluster_centers, 'euclidean') * 111  # Approx. conversion from degrees to km
        avg_inter_cluster_distance = np.mean(inter_cluster_distances)
        min_inter_cluster_distance = np.min(inter_cluster_distances)
        max_inter_cluster_distance = np.max(inter_cluster_distances)
    else:
        avg_inter_cluster_distance = min_inter_cluster_distance = max_inter_cluster_distance = np.nan

    cluster_perimeters, cluster_areas = zip(
        *[calculate_perimeter_and_area(cluster) for cluster in clusters]
    )

    return {
        "DBSCAN_min_sample": min_sample,
        "DBSCAN_eps": eps,
        "cluster_IDs": labels,
        "cluster_sizes": cluster_sizes,
        "n_clusters": n_clusters_,
        "n_noise_points": n_noise_,
        "avg_inter_cluster_distance_km": avg_inter_cluster_distance,
        "min_inter_cluster_distance_km": min_inter_cluster_distance,
        "max_inter_cluster_distance_km": max_inter_cluster_distance,
        "avg_silhouette": silhouette_avg,
        "std_dev_cluster_sizes": std_dev_cluster_sizes,
        "avg_cluster_density": np.mean(
            [
                size / area if area else 0
                for size, area in zip(cluster_sizes, cluster_areas)
            ]
        ),
        "avg_cluster_perimeter": (
            np.mean(cluster_perimeters) if cluster_perimeters else np.nan
        ),
        "avg_cluster_area": np.mean(cluster_areas) if cluster_areas else np.nan,
        "avg_cluster_complexity": np.mean(
            [
                perimeter / np.sqrt(area) if area else 0
                for perimeter, area in zip(cluster_perimeters, cluster_areas)
            ]
        ),
        "eps_exec_time": execution_time_eps,
    }

In [None]:
# API call code inspired from https://medium.com/how-to-use-google-distance-matrix-api-in-python/how-to-use-google-distance-matrix-api-in-python-ef9cd895303c
# and https://www.linkedin.com/pulse/calculating-distances-using-python-google-maps-r%C3%A9gis-nisengwe/


def fetch_distances(selected_cities):
    """
    Fetches the driving distances between each pair of selected cities using Google Maps API.
    Updates and uses a local cache to minimize API calls.
    """
    print("started fetch_distances")
    distances = {}
    cache_updated = False
    for origin in selected_cities:
        distances[origin] = {}
        for destination in selected_cities:
            if origin == destination:
                distances[origin][destination] = 0
            else:
                # Sort and convert to string to use as a JSON-compatible key
                cache_key = tuple(sorted([origin, destination]))
                str_cache_key = ",".join(cache_key)
                if cache_key in distance_cache:
                    distances[origin][destination] = distance_cache[cache_key]
                    print("Found cache")
                else:
                    try:
                        print("started API call")
                        result = gmaps.distance_matrix(
                            origins=[cities_coordinates[origin]],
                            destinations=[cities_coordinates[destination]],
                            mode="driving",
                        )
                        distance = result["rows"][0]["elements"][0]["distance"]["value"]
                        distances[origin][destination] = distance
                        distance_cache[cache_key] = distance  # Store using tuple
                        cache_updated = True
                        print("finished API call")
                    except Exception as e:
                        print(
                            f"Error fetching distance between {origin} and {destination}: {e}"
                        )
                        distances[origin][destination] = float(
                            "inf"
                        )  # Assign a high cost in case of error
    if cache_updated:
        save_distance_cache(distance_cache)
    print("finished fetch_distances")
    return distances

In [None]:
def calculate_cost(solution, distances):
    """
    Calculates the total travel cost of a given solution based on the distances between cities.

    Args:
        solution (list): An ordered list of city names representing the tour.
        distances (dict): A nested dictionary of distances between each pair of cities.

    Returns:
        float: The total cost of the solution in kilometers.
    """
    # Calculate the total distance in meters first
    total_distance_meters = (
        sum(distances[solution[i]][solution[i + 1]] for i in range(len(solution) - 1))
        + distances[solution[-1]][solution[0]]
    )
    # Convert the total distance to kilometers
    total_distance_km = total_distance_meters / 1000.0
    return total_distance_km

In [None]:
# Inspired by https://jupyter.brynmawr.edu/services/public/dblank/jupyter.cs/FLAIRS-2015/TSPv3.ipynb

def nearest_neighbor(cities, distances):
    """
    Finds a tour by repeatedly visiting the nearest unvisited city.

    Args:
        cities (list): A list of city names to visit.
        distances (dict): A nested dictionary of distances between each pair of cities.

    Returns:
        list: An ordered list representing the tour of cities.
    """
    print("started nearest_neighbor")
    start_city = random.choice(cities)
    unvisited = set(cities)
    unvisited.remove(start_city)
    tour = [start_city]
    while unvisited:
        last_city = tour[-1]
        next_city = min(unvisited, key=lambda city: distances[last_city][city])
        unvisited.remove(next_city)
        tour.append(next_city)
    print("finished nearest_neighbor")
    return tour