Import statements

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import KNeighborsClassifier
import networkx as nx

Predictor File

In [None]:
# Module-level settings read by Predictor.generate_data() when a Predictor is
# built without an explicit data set.
n_pts = 100    # number of synthetic (x, y) samples generated
noise = 0.5    # multiplier applied to np.random.rand() before it is added to the sine term
len_x = 1440   # factor the wrapped phase is multiplied by to obtain x
shift_x = 0    # phase offset added to the angle before wrapping it to [0, 2*pi)
min_y = 5      # offset added to y; with max_y it also sets the y scale (max_y - min_y)
max_y = 10     # see min_y

class Predictor:
    """Cluster (x, y) samples with k-means and estimate y for a query x.

    ``recluster`` runs a hand-written k-means for several values of ``k`` and
    keeps the ``k`` with the best centroid-based silhouette score.
    ``heuristic`` then combines a Gaussian-weighted local average with a
    k-NN cluster lookup to produce an estimate for a given x value.

    Attributes:
        data: ``(n, 2)`` float array of samples (column 0 = x, column 1 = y).
        max_k: exclusive upper bound of the ``k`` values tried (``max_k + 1``
            of the constructor argument, so the argument itself is inclusive).
        k_iter: number of assign/update iterations per k-means run.
        bias: factor by which y is stretched while weighting cluster members
            in the final estimate.
        clusters: ``{index: {'centre': ndarray, 'points': list}}``.
        pred: cluster index of every row of ``data`` (same order).
    """

    def __init__(self, data=None, max_k=15, k_iter=20, bias=2):
        # `data if data else ...` raises ValueError for multi-element arrays,
        # so the presence check has to be an identity test against None.
        if data is None:
            self.data = self.generate_data()
        else:
            self.data = np.asarray(data, dtype=float)
        self.max_k = max_k + 1
        self.k_iter = k_iter
        self.bias = bias

        self.clusters = {}
        self.pred = []

    def generate_data(self):
        """Return a noisy one-period sine curve as an ``(n_pts, 2)`` array.

        Reads the module-level settings ``n_pts``, ``noise``, ``len_x``,
        ``shift_x``, ``min_y`` and ``max_y``.
        """
        # NOTE(review): x is (phase mod 2*pi) * len_x, i.e. it spans
        # [0, 2*pi*len_x) rather than [0, len_x). If x is meant to be minutes
        # from midnight the phase probably needs dividing by 2*pi - confirm.
        X = []
        for i in range(n_pts):
            v = i * 2 * np.pi / 100
            X.append((((v + shift_x) % (2 * np.pi)) * len_x,
                      (np.sin(v) + 1 + np.random.rand() * noise) * (max_y - min_y) + min_y))
        return np.array(X)

    # Clustering functions
    def __distance(self, p1, p2):
        """Return the Euclidean distance between ``p1`` and ``p2``."""
        return np.sqrt(np.sum((p1 - p2) ** 2))

    def __gen_clusters(self, k: int):
        """Reset ``self.clusters`` to ``k`` clusters seeded from evenly spaced rows."""
        self.clusters = {}

        for i in range(k):
            # Copy so a centre never aliases a row of self.data.
            centre = self.data[i * self.data.shape[0] // k].copy()
            self.clusters[i] = {
                'centre': centre,
                'points': [],
            }

    def __nearest(self, x):
        """Return the index of the cluster whose centre is closest to ``x``."""
        dist = [self.__distance(x, self.clusters[j]['centre'])
                for j in range(len(self.clusters))]
        return int(np.argmin(dist))

    def __clear_points(self):
        """Empty the member list of every cluster."""
        for cluster in self.clusters.values():
            cluster['points'] = []

    def __assign_clusters(self):
        """Put every sample into the member list of its nearest cluster."""
        self.__clear_points()
        for x in self.data:
            # Assign once per sample, after ALL centre distances are known.
            self.clusters[self.__nearest(x)]['points'].append(x)

    def __update_clusters(self):
        """Move each populated centre to the mean of its members, then clear members."""
        for cluster in self.clusters.values():
            points = np.array(cluster['points'])
            if points.shape[0] > 0:
                cluster['centre'] = points.mean(axis=0)
            cluster['points'] = []

    def __pred_cluster(self):
        """Assign every sample to its nearest cluster and return the label list."""
        self.__clear_points()
        pred = []
        for x in self.data:
            label = self.__nearest(x)
            self.clusters[label]['points'].append(x)
            pred.append(label)
        return pred

    def __silhouette(self):
        """Return one centroid-based silhouette score per non-empty cluster.

        For each member, ``a`` is the distance to its own centre and ``b`` the
        distance to the nearest other non-empty centre; the point score is
        ``(b - a) / max(a, b)``. A cluster's score is the mean over its
        members. Points score 0 when no other cluster is populated or when
        both distances are 0.
        """
        populated = [j for j in self.clusters if len(self.clusters[j]['points']) > 0]
        scores = []
        for i in populated:
            own_centre = self.clusters[i]['centre']
            rivals = [self.clusters[j]['centre'] for j in populated if j != i]
            points = self.clusters[i]['points']

            agg_scr = 0.0
            for x in points:
                if not rivals:
                    continue
                in_dist = self.__distance(x, own_centre)
                out_dist = min(self.__distance(x, centre) for centre in rivals)
                denom = max(in_dist, out_dist)
                if denom > 0:
                    agg_scr += (out_dist - in_dist) / denom

            scores.append(agg_scr / len(points))
        return scores

    def __fit(self, k: int):
        """Run k-means with ``k`` clusters for ``k_iter`` iterations and label the data."""
        self.__gen_clusters(k)
        for _ in range(self.k_iter):
            self.__assign_clusters()
            self.__update_clusters()
        self.pred = self.__pred_cluster()

    def recluster(self, show: bool=False):
        """Pick the best ``k`` in ``[4, max_k)`` by mean silhouette and fit it.

        Args:
            show: when true, plot the resulting clusters.

        Raises:
            ValueError: if there is no data or no candidate ``k`` to try.
        """
        if self.data.shape[0] == 0:
            raise ValueError("cannot cluster an empty data set")

        best_k = None
        best_score = -np.inf
        for k in range(4, self.max_k):
            self.__fit(k)
            score = np.mean(self.__silhouette())
            # `>=` keeps the larger k on ties, as sorting + pop() did before.
            if best_k is None or score >= best_score:
                best_k, best_score = k, score

        if best_k is None:
            raise ValueError("max_k must be at least 4 to have a candidate k")

        # Refit with the full number of iterations so the stored model is the
        # one that was actually scored.
        self.__fit(best_k)

        if show:
            self.show_clusters()

    def show_clusters(self):
        """Scatter-plot the samples coloured by cluster, with centres as red triangles."""
        plt.scatter(self.data[:, 0], self.data[:, 1], c = self.pred)
        for i in self.clusters:
            centre = self.clusters[i]['centre']
            plt.scatter(centre[0], centre[1], marker = '^', c = 'red')
        plt.xlabel('Minutes from midnight')
        plt.ylabel('Travel time (in mins)')
        plt.show()

    # KNN-means and prediction functions
    def __gaussian_average(self, time: float):
        """Return the Gaussian-weighted mean y of samples with x within 30 of ``time``.

        Returns 0 when no sample falls inside the window.
        """
        prod = 0
        total = 0
        for x in self.data:
            if time - 30 < x[0] < time + 30:
                wei = np.e ** (-0.5 * ((x[0] - time) * 2.5) ** 2)
                prod += wei * x[1]
                total += wei
        return prod / total if total > 0 else prod

    def __knn_m(self, time: float, new_y: float, show: bool):
        """Refine ``new_y`` using the cluster that k-NN assigns ``(time, new_y)`` to.

        The members of the predicted cluster are averaged with weights
        proportional to their distance from the query point, measured with the
        members' y stretched by ``bias``. Neither ``self.data`` nor
        ``self.pred`` is modified. Falls back to ``new_y`` when the weights
        cannot be formed (no members or zero total distance).
        """
        # n // 10 would be 0 for fewer than 10 samples, which sklearn rejects.
        n_neighbors = max(1, self.data.shape[0] // 10)
        knn = KNeighborsClassifier(n_neighbors=n_neighbors)
        knn.fit(self.data, self.pred)
        new_point = np.array([(time, new_y)])
        label = int(knn.predict(new_point)[0])

        # np.array() copies the member rows, so scaling y here cannot leak
        # back into self.data (the member lists hold views of its rows).
        members = np.array(self.clusters[label]['points'], dtype=float)
        final_y = new_y
        if members.size > 0:
            members[:, 1] *= self.bias
            dists = np.sqrt(np.sum((members - new_point) ** 2, axis=1))
            tot_dist = dists.sum()
            if tot_dist > 0:
                final_y = float(np.sum(members[:, 1] * dists) / tot_dist / self.bias)

        if show:
            # Plot-only copies: the query point is appended for display without
            # growing self.pred, which must stay aligned with self.data.
            fin_X = np.concatenate((self.data, new_point))
            labels = list(self.pred) + [label]
            plt.scatter(fin_X[:, 0], fin_X[:, 1], c = labels)
            plt.scatter(time, new_y, c = 'red')
            plt.scatter(time, final_y, marker = '*', s = 250, c = 'white', edgecolors = 'black', linewidths=0.75)
            for i in self.clusters:
                centre = self.clusters[i]['centre']
                plt.scatter(centre[0], centre[1], marker = '^', c = 'red')
            plt.xlabel('Minutes from midnight')
            plt.ylabel('Travel time (in mins)')
            plt.show()

        return final_y

    def heuristic(self, time: float, show: bool=False):
        """Return the estimated y at x = ``time``.

        Clusters the data first if ``recluster`` has not produced labels for
        the current data set yet.
        """
        if len(self.pred) != self.data.shape[0]:
            self.recluster()
        trav_t = self.__gaussian_average(time)
        return self.__knn_m(time, trav_t, show)

A_star File

In [None]:
#import Predictor as pred
# This file is written so that it serves as an API call for performing A* search

# f = g + h #
# Query x value handed to every Predictor.heuristic() call during the search
# (presumably minutes from midnight, going by the plot axis label - confirm).
TIME = 1220

# One independently generated Predictor per (row, column) pair of a 29 x 29 grid.
pred_mat = [[Predictor() for _ in range(29)] for _ in range(29)]

# Cluster every predictor up front, printing a running counter as progress.
count = 0
for count, x in enumerate((cell for row in pred_mat for cell in row), start=1):
    x.recluster()
    print(count)


def a_star(graph, origin, destination, graph_neighbor_coordinates_keys_to_index):
    """Search ``graph`` from ``origin`` to ``destination`` using f = g + h.

    ``g`` is the accumulated edge cost and ``h`` is taken from the
    module-level predictor matrix:
    ``pred_mat[index[node]][index[destination]].heuristic(TIME)``.

    Args:
        graph: ``{node: {neighbor: edge_cost}}`` adjacency mapping.
        origin: start node key.
        destination: goal node key.
        graph_neighbor_coordinates_keys_to_index: maps a node key to its
            row/column in ``pred_mat``.

    Returns:
        The list of node keys from ``origin`` to ``destination``, or the
        string ``'Path NotFoundError\\n'`` when the goal is unreachable.

    Raises:
        KeyError: if a visited node has no entry in the index mapping.
    """
    index_of = graph_neighbor_coordinates_keys_to_index
    goal_index = index_of[destination]
    estimates = {}

    def estimate(node):
        # The estimate depends only on the node and the fixed goal, so it is
        # computed once per node instead of on every relaxation.
        if node not in estimates:
            estimates[node] = pred_mat[index_of[node]][goal_index].heuristic(TIME)
        return estimates[node]

    open_list = [origin]

    g_score = {node: float('inf') for node in graph}
    g_score[origin] = 0
    parent = {origin: None}
    f_score = {node: float('inf') for node in graph}
    f_score[origin] = estimate(origin)

    while open_list:
        current = min(open_list, key=lambda node: f_score[node])
        open_list.remove(current)

        if current == destination:
            # Walk the parent links back to the origin (whose parent is None).
            path = []
            while current is not None:
                path.append(current)
                current = parent[current]
            return path[::-1]   # Returns a list of the names of nodes

        for neighbor, cost in graph[current].items():
            tentative_g = g_score[current] + cost

            if tentative_g < g_score[neighbor]:
                g_score[neighbor] = tentative_g
                # Previously called an undefined bare `heuristic`; use the same
                # predictor lookup as for the origin.
                f_score[neighbor] = tentative_g + estimate(neighbor)
                parent[neighbor] = current

                if neighbor not in open_list:
                    open_list.append(neighbor)

    return 'Path NotFoundError\n'




Graph Maker code

In [None]:
#import a_Star

# Define 29 commonly known locations in Manipal with (x, y) coordinates.
# Insertion order matters: each location's position in this dict becomes its
# integer node index in the graph built below.
manipal_locations = {
    "MIT": (10, 50), "Marena": (15, 70), "End Point": (30, 90), "Tiger Circle": (40, 50),
    "Manipal Lake": (60, 80), "Kamath Circle": (25, 60), "KMC": (50, 50), "SOC": (35, 40),
    "MSAP": (20, 30), "DOC": (45, 20), "Deetee": (55, 30), "Hangyo": (70, 60),
    "Dollops": (50, 65), "China Valley": (65, 50), "TC Canteen": (42, 55), "Health Center": (48, 72),
    "Library": (38, 30), "Manipal Greens": (28, 85), "Valley View": (70, 40),
    "16th Block": (15, 55), "8th Block": (20, 45), "AB5": (30, 35), "NLH": (25, 25),
    "FC2": (40, 20), "Pharmacy Block": (55, 15), "Swimming Pool": (60, 35), 
    "Innovation Center": (75, 25), "Student Plaza": (80, 50), "MIT Cafeteria": (85, 70)
}

# Build an undirected graph whose integer node ids follow the insertion order
# of manipal_locations.
G = nx.Graph()
node_names = list(manipal_locations)
positions = dict(enumerate(manipal_locations.values()))

# Each node carries its display name and its (x, y) position as attributes.
for i, name in enumerate(node_names):
    G.add_node(i, pos=positions[i], name=name)

# Define edges as (location name, location name) pairs. G is an undirected
# nx.Graph, so the FC2 / Pharmacy Block pair listed in both directions yields a
# single edge. "SOC" and "MSAP" appear in no pair and stay isolated nodes.
edges = [
    ("MIT", "16th Block"), ("MIT", "Marena"), ("MIT", "Kamath Circle"), ("16th Block", "8th Block"), 
    ("8th Block", "AB5"), ("AB5", "NLH"), ("NLH", "FC2"), ("FC2", "Pharmacy Block"),
    ("Marena", "Manipal Greens"), ("Manipal Greens", "End Point"), ("Kamath Circle", "Tiger Circle"),
    ("Tiger Circle", "KMC"), ("Tiger Circle", "DOC"), ("KMC", "Health Center"), ("KMC", "Dollops"), 
    ("Dollops", "China Valley"), ("China Valley", "Valley View"), ("Valley View", "Hangyo"), 
    ("Hangyo", "Student Plaza"), ("Student Plaza", "MIT Cafeteria"), ("MIT Cafeteria", "Manipal Lake"), 
    ("Manipal Lake", "End Point"), ("Health Center", "Library"), ("Library", "TC Canteen"), 
    ("TC Canteen", "DOC"), ("DOC", "Deetee"), ("Deetee", "Swimming Pool"), 
    ("Swimming Pool", "Innovation Center"), ("Innovation Center", "Pharmacy Block"), ("Pharmacy Block", "FC2")
]

# Per-location adjacency: name -> [list of neighbor names, list of neighbor
# coordinates]. Only the keys of this dict are used in the code below (to build
# the name -> index mapping). "End Point", "SOC" and "MSAP" are not keys here.
graph_neighbor_coordinates = {'MIT': [['16th Block', 'Marena', 'Kamath Circle'], [(15, 55), (15, 70), (25, 60)]],
                               '16th Block': [['8th Block'], [(20, 45)]], '8th Block': [['AB5'], [(30, 35)]], 
                               'AB5': [['NLH'], [(25, 25)]], 'NLH': [['FC2'], [(40, 20)]], 
                               'FC2': [['Pharmacy Block'], [(55, 15)]], 'Marena': [['Manipal Greens'], [(28, 85)]], 
                               'Manipal Greens': [['End Point'], [(30, 90)]], 'Kamath Circle': [['Tiger Circle'], [(40, 50)]],
                               'Tiger Circle': [['KMC', 'DOC'], [(50, 50), (45, 20)]], 'KMC': [['Health Center', 'Dollops'], [(48, 72), (50, 65)]],
                               'Dollops': [['China Valley'], [(65, 50)]], 'China Valley': [['Valley View'], [(70, 40)]], 
                               'Valley View': [['Hangyo'], [(70, 60)]], 'Hangyo': [['Student Plaza'], [(80, 50)]], 
                               'Student Plaza': [['MIT Cafeteria'], [(85, 70)]], 'MIT Cafeteria': [['Manipal Lake'], [(60, 80)]], 
                               'Manipal Lake': [['End Point'], [(30, 90)]], 'Health Center': [['Library'], [(38, 30)]], 
                               'Library': [['TC Canteen'], [(42, 55)]], 'TC Canteen': [['DOC'], [(45, 20)]], 
                               'DOC': [['Deetee'], [(55, 30)]], 'Deetee': [['Swimming Pool'], [(60, 35)]], 
                               'Swimming Pool': [['Innovation Center'], [(75, 25)]], 'Innovation Center': [['Pharmacy Block'], [(55, 15)]], 
                               'Pharmacy Block': [['FC2'], [(40, 20)]]}

# Map every location name to its row/column in the predictor matrix used by
# a_star. Names that are keys of graph_neighbor_coordinates keep the index
# given by that dict's order.
graph_neighbor_coordinates_keys_to_index = {
    key: i for i, key in enumerate(graph_neighbor_coordinates)
}
# Some locations (e.g. "End Point") never appear as adjacency keys; without an
# index they raise KeyError as soon as A* looks up their heuristic, so append
# every remaining node name after the existing entries.
for name in node_names:
    if name not in graph_neighbor_coordinates_keys_to_index:
        graph_neighbor_coordinates_keys_to_index[name] = len(graph_neighbor_coordinates_keys_to_index)

# Add one weighted edge per name pair; the weight is the straight-line
# distance between the two endpoints, rounded to two decimals.
for loc1, loc2 in edges:
    node1, node2 = node_names.index(loc1), node_names.index(loc2)
    offset = np.array(positions[node1]) - np.array(positions[node2])
    distance = np.linalg.norm(offset)  # Euclidean distance
    G.add_edge(node1, node2, weight=round(distance, 2))

# Placeholder per-node heuristic values (random integers from 1 to 9); only
# used for the optional node labels in the plot.
heuristic_values = {}
for i in G.nodes:
    heuristic_values[i] = np.random.randint(1, 10)

# Show the selectable locations, then read the two endpoints from the user.
print("\nAvailable Locations:\n")
print(", ".join(node_names))
start_name = input("\nEnter start location: ").strip()
end_name = input("Enter end location: ").strip()

# Both names must match a known location exactly; otherwise stop here.
if not (start_name in node_names and end_name in node_names):
    print("Invalid locations! Please check the list and try again.")
    exit()

# Integer node ids of the chosen endpoints.
start, end = (node_names.index(chosen) for chosen in (start_name, end_name))

######################################################################################
# Compute shortest path
#path = nx.shortest_path(G, source=start, target=end, weight='weight')

# Create the name-keyed adjacency dict expected by a_star:
# {location name: {neighbor name: edge weight}}.
graph_dict = {
    node_names[node]: {
        node_names[neighbor]: G[node][neighbor]['weight'] for neighbor in G[node]
    }
    for node in G.nodes
}

# Run custom A* algorithm
path_names = a_star(graph_dict, start_name, end_name, graph_neighbor_coordinates_keys_to_index)

# a_star returns a message string instead of a list when no path exists. This
# must be checked BEFORE converting names to indices: iterating the message
# would look up single characters in node_names and raise ValueError.
if isinstance(path_names, str):
    print(path_names)
    exit()

# Convert name-based path to index-based path for plotting
path = [node_names.index(name) for name in path_names]


########################################################################
#path2 = a_Star.a_star(G, origin=start, destination=end)
#path = [node_names.index(name) for name in path_names]
########################################################################

########################################################################
# Consecutive node pairs along the found route, and the summed edge weight.
path_edges = list(zip(path, path[1:]))
path_cost = round(sum(G[u][v]['weight'] for u, v in path_edges), 2)

# Create figure
fig, ax = plt.subplots(figsize=(10, 8))

# Draw the whole graph with location names as node labels.
name_labels = {i: node_names[i] for i in G.nodes}
nx.draw(G, positions, labels=name_labels, node_color="blue", edge_color="gray",
        with_labels=True, node_size=300, font_size=8)

# Annotate every edge with its weight.
edge_labels = nx.get_edge_attributes(G, 'weight')
nx.draw_networkx_edge_labels(G, positions, edge_labels=edge_labels)

# Name + heuristic labels; built but not drawn at the moment.
heuristic_labels = {i: f"{node_names[i]}\n(h={heuristic_values[i]})" for i in G.nodes}

# Pad the axis limits by 10 units around the outermost node positions.
x_coords, y_coords = zip(*positions.values())
ax.set_xlim(min(x_coords) - 10, max(x_coords) + 10)
ax.set_ylim(min(y_coords) - 10, max(y_coords) + 10)

# Anchor for the cost caption: horizontally centred, 2% above the bottom edge.
x_min, x_max = ax.get_xlim()
y_min, y_max = ax.get_ylim()
bottom_x = (x_min + x_max) / 2
bottom_y = y_min + (y_max - y_min) * 0.02

# Display path cost at the bottom center
cost_text = plt.text(bottom_x, bottom_y, f"Total Path Cost: {path_cost}", fontsize=12, ha='center', color='black')

# Overlay the route in red on top of the base graph.
nx.draw_networkx_edges(G, positions, edgelist=path_edges, edge_color='red', width=2)

# Show the plot
plt.title(f"Shortest Path from {start_name} to {end_name}", fontsize=14)
plt.tight_layout()
plt.show()
