tfpd_neighbors finds the closest neighbors of each SCAT site so that routing between sites can be done later. First, all candidate neighbors of a site are found using the roads the sites share. From there, the closest neighbor in each compass direction is calculated using the longitude and latitude of each SCAT site.

In [6]:
import os
import pandas as pd
import re
from geopy.distance import geodesic
import math


Find SCAT neighbors based on the shared roads

In [7]:
# Output CSV written by this notebook, and the raw SCATS export it is built from
scats_neighbors_file = 'data/scats_neighbors.csv'
scats_data_file = 'scats-10-2006.csv'

# Road-name aliases: a road whose upper-cased name matches a key is treated as
# the road named by the value, so differently named stretches can be matched.
# NOTE(review): 'VICTORIA_ST' is both a value (for 'BARKERS_RD') and a key;
# a single lookup does not follow that chain - confirm this is intended.
equivalent_roads = {
    'MAROONDAH_HWY': 'WHITEHORSE_RD',
    'BARKERS_RD': 'VICTORIA_ST',
    'HIGH STREET_RD': 'HIGH_ST',
    'VICTORIA_ST': 'DENMARK_ST',
    'TOORAK_RD': 'BURWOOD_HWY',
    'EARL_ST': 'VALERIE_ST',
    'OFFRAMP_EASTERN_FWY': 'EASTERN_FWY',
    'EASTERN_FWY_W_BD_RAMPS': 'EASTERN_FWY',
    'TRAFALGAR_RD': 'STANHOPE_GV',
    'BURWOOD_RD': 'CAMBERWELL_RD',
    'SWAN_ST': 'RIVERSDALE_RD',
}

# SCAT 4266 has no latitude/longitude in the raw data, so supply them by hand
def set_scat_4266_coordinates(grouped):
    """Fill in the coordinates of SCAT 4266 and return the same frame.

    ``grouped`` is modified in place: every row whose 'SCATS Number' equals
    4266 gets fixed 'NB_LATITUDE' / 'NB_LONGITUDE' values. Other rows are
    left untouched.
    """
    is_site_4266 = grouped['SCATS Number'] == 4266
    fixed_coordinates = (
        ('NB_LATITUDE', -37.82529),
        ('NB_LONGITUDE', 145.04387),
    )
    for column, value in fixed_coordinates:
        grouped.loc[is_site_4266, column] = value
    return grouped


# Get road names
def extract_road_names(location, road_aliases=None):
    """Return the distinct road names mentioned in a SCATS location string.

    A location looks like ``'WARRIGAL_RD N of HIGH STREET_RD'``: two road
    names joined by a compass direction and the word "of" or "near".

    Args:
        location: The raw location text. Anything that is not a string
            (for example a NaN from an empty CSV cell) yields ``[]``.
        road_aliases: Optional mapping of upper-cased road name to the name
            it should be replaced with. Defaults to the module-level
            ``equivalent_roads`` table.

    Returns:
        A list of road names without duplicates, in order of first
        appearance, with aliases applied.
    """
    if not isinstance(location, str):
        return []
    if road_aliases is None:
        road_aliases = equivalent_roads

    # Drop stand-alone compass tokens (N, E, SW, ...). '_' counts as a word
    # character, so letters inside names such as 'SWAN_ST' are not affected.
    without_directions = re.sub(r'\b[NESW]+\b', '', location)

    # Split on the connective in any letter case. The group is non-capturing
    # so that the connective itself ("of"/"NEAR") is never returned as a piece
    # and mistaken for a road name.
    pieces = re.split(r'\s(?:of|near)\s', without_directions, flags=re.IGNORECASE)

    roads = []
    for piece in pieces:
        name = piece.strip()
        # Skip blanks, dangling connectives and placeholder text containing '..'
        if not name or '..' in name or name.upper() in ('OF', 'NEAR'):
            continue
        roads.append(road_aliases.get(name.upper(), name))

    # De-duplicate while keeping a stable, reproducible order
    return list(dict.fromkeys(roads))


# Create the shared-road neighbor file if it doesn't exist yet
if not os.path.exists(scats_neighbors_file):
    print(f"{scats_neighbors_file} not found. Creating the file now.")

    data = pd.read_csv(scats_data_file)

    # Road names for every raw row
    data['Roads'] = data['Location'].apply(extract_road_names)

    # Only the columns needed to describe a site
    data_selected = data[['SCATS Number', 'Roads', 'NB_LATITUDE', 'NB_LONGITUDE']]

    # One row per SCAT: union of its roads (sorted so the output is
    # reproducible) and the mean of its recorded coordinates
    grouped = data_selected.groupby('SCATS Number').agg({
        'Roads': lambda x: sorted({road for sublist in x for road in sublist}),
        'NB_LATITUDE': 'mean',
        'NB_LONGITUDE': 'mean',
    }).reset_index()

    # Set coordinates for SCAT 4266
    grouped = set_scat_4266_coordinates(grouped)

    def _canonical_roads(road_names):
        """Return the set of road names after applying equivalent_roads."""
        return {equivalent_roads.get(road.upper(), road) for road in road_names}

    # Canonical roads of every site, built once so that each neighbor search
    # compares against one entry per site instead of re-scanning the raw rows
    roads_by_scat = {
        scat: _canonical_roads(roads)
        for scat, roads in zip(grouped['SCATS Number'], grouped['Roads'])
    }

    # Find neighbors based on shared roads
    def find_neighbors(scats_num, road_names):
        """Return the other SCATS numbers sharing a road, as 'a, b, c'.

        The numbers are listed in the row order of ``grouped``; an empty
        string is returned when no other site shares a road.
        """
        own_roads = _canonical_roads(road_names)
        return ', '.join(
            str(other)
            for other, other_roads in roads_by_scat.items()
            if other != scats_num and not own_roads.isdisjoint(other_roads)
        )

    grouped['Neighbors'] = [
        find_neighbors(scat, roads)
        for scat, roads in zip(grouped['SCATS Number'], grouped['Roads'])
    ]
    grouped['Road(s)'] = grouped['Roads'].apply(', '.join)

    grouped.rename(columns={
        'SCATS Number': 'SCAT number',
        'NB_LATITUDE': 'NB_Latitude',
        'NB_LONGITUDE': 'NB_Longitude',
    }, inplace=True)

    grouped.drop(columns=['Roads'], inplace=True)

    # Make sure the target directory exists before writing into it
    output_dir = os.path.dirname(scats_neighbors_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    grouped.to_csv(scats_neighbors_file, index=False)
    print(f"{scats_neighbors_file} has been created successfully.")
else:
    print(f"{scats_neighbors_file} already exists.")


data/scats_neighbors.csv not found. Creating the file now.
data/scats_neighbors.csv has been created successfully.


Find the closest neighbor in each direction using the shared roads and the longitude and latitude

In [8]:
# Manual overwrites, keyed by SCAT number and then by direction.
# They correct cases the shared-road search gets wrong: intersections that are
# connected in a way the road names do not reveal, and different roads that
# happen to carry the same name.
# A SCAT number forces that neighbor for the direction; None removes whatever
# neighbor was computed for it.
manual_overwrites = {
    970: {
        'Northwest': None,
    },
    2846: {
        'North': None,
        'Northwest': None,
    },
    3001: {
        'South': 4262,
    },
    2820: {
        'East': 2825,
    },
    # NOTE(review): 3662 used to be listed twice. The earlier entry
    # ({'West': 3001}) was silently replaced by this one because a dict literal
    # keeps only the last value of a repeated key. The effective value is kept
    # here - confirm whether 'West' should be 3001 instead of None.
    3662: {
        'East': 4324,
        'South': None,
        'West': None,
        'Southeast': None,
    },
    4040: {
        'Northwest': 4266,
    },
    4262: {
        'North': 3001,
    },
    4321: {
        'Southeast': None,
        'South': None,
    },
    4324: {
        'West': 3662,
    },
    4335: {
        'Southeast': None,
        'South': None,
    },
}

# Distance (geodesic) and compass bearing from point 1 to point 2
def calculate_distance_and_bearing(lat1, lon1, lat2, lon2):
    """Return ``(distance, bearing)`` from point 1 to point 2.

    Args:
        lat1, lon1: Latitude and longitude of the start point, in degrees.
        lat2, lon2: Latitude and longitude of the end point, in degrees.

    Returns:
        A tuple of the geodesic distance in metres and the initial bearing
        in degrees, normalised to the 0-360 range (0 is north, 90 is east).
    """
    metres = geodesic((lat1, lon1), (lat2, lon2)).meters

    # Work in radians for the trigonometry
    delta_lon = math.radians(lon2 - lon1)
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)

    # East and north components of the direction towards point 2
    east = math.sin(delta_lon) * math.cos(phi2)
    north = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(delta_lon)

    # atan2 yields -180..180; shift it into 0..360
    raw_bearing = math.degrees(math.atan2(east, north))
    return metres, (raw_bearing + 360) % 360

# Map a bearing onto one of eight compass sectors (N, NE, E, SE, S, SW, W, NW)
def get_direction(bearing):
    """Return the compass-sector name for a bearing given in degrees.

    Each sector spans 45 degrees with an inclusive lower and an exclusive
    upper bound. Any bearing that falls into none of the listed sectors
    (below 22.5, or 337.5 and above) is reported as 'North'.
    """
    sectors = (
        (22.5, 67.5, 'Northeast'),
        (67.5, 112.5, 'East'),
        (112.5, 157.5, 'Southeast'),
        (157.5, 202.5, 'South'),
        (202.5, 247.5, 'Southwest'),
        (247.5, 292.5, 'West'),
        (292.5, 337.5, 'Northwest'),
    )
    for lower, upper, label in sectors:
        if lower <= bearing < upper:
            return label
    return 'North'

# Pick, for every compass direction, the nearest of a SCAT's shared-road neighbors
def find_closest_neighbors(scats_num, lat, lon, neighbors):
    """Return the closest neighbor in each of the eight directions.

    Args:
        scats_num: SCAT number of the site being processed (not used in the
            calculation itself).
        lat, lon: Coordinates of that site.
        neighbors: Comma-separated SCAT numbers, e.g. ``'970, 2000'``. Any
            non-string value is treated as "no neighbors".

    Returns:
        A dict keyed by direction name whose value is the SCAT number of the
        nearest neighbor in that direction, or None when there is none.

    Neighbor coordinates are looked up in the module-level ``data`` frame.
    """
    directions = (
        'North', 'Northeast', 'East', 'Southeast',
        'South', 'Southwest', 'West', 'Northwest',
    )
    nearest = dict.fromkeys(directions)
    shortest = dict.fromkeys(directions, float('inf'))

    if isinstance(neighbors, str):
        candidate_ids = [int(token.strip()) for token in neighbors.split(',')]
    else:
        candidate_ids = []
    candidates = data[data['SCAT number'].isin(candidate_ids)]

    for _, candidate in candidates.iterrows():
        metres, bearing = calculate_distance_and_bearing(
            lat, lon, candidate['NB_Latitude'], candidate['NB_Longitude']
        )
        sector = get_direction(bearing)

        # Strictly closer only: on a tie the earlier row keeps the slot
        if metres < shortest[sector]:
            shortest[sector] = metres
            nearest[sector] = candidate['SCAT number']

    return nearest

# Load the existing scats_neighbors.csv file and add the nearest neighbor per direction
if os.path.exists(scats_neighbors_file):
    data = pd.read_csv(scats_neighbors_file)

    if 'Neighbors' not in data.columns:
        # This step removes the 'Neighbors' column and overwrites the file, so a
        # file without that column has already been processed. Stop here rather
        # than failing with a KeyError when the notebook is run again.
        print(f"{scats_neighbors_file} already contains the closest neighbors. Nothing to do.")
    else:
        # Process each SCAT to find its closest neighbor in each direction
        for idx, row in data.iterrows():
            scats_num = row['SCAT number']
            lat = row['NB_Latitude']
            lon = row['NB_Longitude']

            # Use precomputed neighbors
            neighbors = row['Neighbors']

            closest_neighbors = find_closest_neighbors(scats_num, lat, lon, neighbors)

            # Manual overwrites take precedence over the computed neighbors
            for direction, overwrite_neighbor in manual_overwrites.get(scats_num, {}).items():
                closest_neighbors[direction] = overwrite_neighbor

            # Save closest neighbor in all directions (N, NE, E, SE, S, SW, W, NW)
            for direction, neighbor in closest_neighbors.items():
                data.at[idx, f'{direction} Neighbor'] = neighbor

        # The intermediate columns are no longer needed in the final file
        data.drop(columns=['Neighbors', 'Road(s)'], inplace=True)

        # Save the updated DataFrame back to the same file
        data.to_csv(scats_neighbors_file, index=False)
        print(f"Updated neighbors saved to {scats_neighbors_file} with closest neighbors added.")
else:
    print(f"{scats_neighbors_file} does not exist. Please create the file in the previous section.")


Updated neighbors saved to data/scats_neighbors.csv with closest neighbors added.
