In [None]:
import re
from collections import defaultdict
from datetime import date, datetime

import numpy as np
import pandas as pd

In [None]:
# Import dataset, shorten the verbose column names and discard exact duplicate rows.
# After the rename:
#   SCATS is the intersection ID
#   Location is [owner road] [direction from intersection] [other road in intersection]
raw_data = (
	pd.read_csv(
		'./scats_data.csv',
		dtype={
			'SCATS Number': int,
			'Location': str,
			'NB_LATITUDE': float,
			'NB_LONGITUDE': float
		}
	)
	.rename(columns={
		'SCATS Number': 'SCATS',
		'NB_LATITUDE': 'Latitude',
		'NB_LONGITUDE': 'Longitude',
	})
	.drop_duplicates()
)

# Fix Auburn N/Burwood intersection missing position
# https://www.openstreetmap.org/way/1092802786#map=19/-37.823687/145.045020
# south: -37.82542, 145.04346
# east: -37.82529, 145.04387
# west: -37.82518, 145.04301
# north: -37.82505, 145.04346 (estimated by Claude)
def fix_burwood_auburn_latitude(_latitude: float):
	'''Return the estimated latitude (-37.82505) for a zero placeholder, otherwise the value unchanged.'''
	# Swap the exact 0 placeholder for the estimate; no arithmetic is done on the
	# float, which sidesteps floating point rounding concerns
	return -37.82505 if _latitude == 0 else _latitude

def fix_burwood_auburn_longitude(_longitude: float):
	'''Return the estimated longitude (145.04346) for a zero placeholder, otherwise the value unchanged.'''
	# Only the exact 0 placeholder is replaced; every other value passes through
	return 145.04346 if _longitude == 0 else _longitude

# Patch the zero placeholder coordinates, one column at a time
for column, fixer in (
	('Latitude', fix_burwood_auburn_latitude),
	('Longitude', fix_burwood_auburn_longitude),
):
	raw_data[column] = raw_data[column].apply(fixer)
raw_data

In [None]:
# Import site reference (the file's own header row is replaced by the names below)
raw_reference = pd.read_csv(
	'./scats_reference.csv',
	names=['SCATS', 'Intersection', 'Site_Type'],
	header=0,
	dtype={
		'SCATS': np.int32,
		'Intersection': str,
		'Site_Type': str
	}
).drop_duplicates()

# Keep only intersection sites (rest are unused); Site_Type is constant
# afterwards, so the column is dropped
is_intersection = raw_reference['Site_Type'] == 'INT'
raw_reference = raw_reference[is_intersection].drop(columns=['Site_Type'])
raw_reference

In [None]:
# Inner join on the site ID, so only SCATS sites present in both tables survive
merged_df = pd.merge(
	left=raw_reference,
	right=raw_data,
	how='inner',
	on='SCATS',
)
merged_df

In [None]:
# Extract location information
# Work on a copy so merged_df itself is not modified by the column changes below
extracted = merged_df.copy()

def process_location(_locations: pd.Series):
	'''Split each location description into its street and its direction.

	Every entry is cut at the first ' of ' (case-insensitive). Within the
	leading part, the final whitespace-separated word is taken as the direction
	and the words before it, re-joined with single spaces, as the street.

	Returns a (streets, directions) tuple of lists, one element per entry and in
	the same order as _locations.
	'''
	def split_owner(_description: str):
		# Only the text in front of ' of ' matters here
		tokens = re.split(' of ', _description, flags=re.IGNORECASE)[0].split()
		return ' '.join(tokens[:-1]), tokens[-1]

	pairs = [split_owner(description) for description in _locations]
	return [street for street, _ in pairs], [heading for _, heading in pairs]

# Parse the Location column and add the results at column positions 3 and 4.
# The lists are handed to insert() as-is so values are assigned by row position
# (the same way Day_of_week is inserted); wrapping them in pd.Series would align
# on index labels instead and yield NaN/misplaced values for any frame whose
# index is not the default 0..n-1.
streets, direction = process_location(extracted['Location'])
extracted.insert(3, 'Street', streets)
extracted.insert(4, 'Direction', direction)

def process_date(_dates: pd.Series):
	'''Convert 'dd/mm/YYYY' date strings into integer days of the week.

	Monday is 0 through to Sunday as 6. Returns a list with one int per entry,
	in the same order as _dates.
	'''
	return [datetime.strptime(text, '%d/%m/%Y').weekday() for text in _dates]

# Day of week (as an int) is the only date-derived value kept
days_of_week = process_date(extracted['Date'])
extracted.insert(loc=8, column='Day_of_week', value=days_of_week)

# Location and Date are no longer needed once their derived columns exist
extracted = extracted.drop(columns=['Location', 'Date'])
extracted

In [None]:
def reconfigure(_df: pd.DataFrame):
	'''Return a copy of _df indexed by (SCATS, Direction, Day_of_week, ID).

	ID is a running counter starting at 0 that numbers the rows inside each
	(SCATS, Direction, Day_of_week) group in their existing order. _df itself
	is not modified.
	'''
	group_keys = ['SCATS', 'Direction', 'Day_of_week']
	# Sequential position of every row within its group
	position_in_group = _df.groupby(group_keys).cumcount()
	return _df.assign(ID=position_in_group).set_index(group_keys + ['ID'])

# Re-index the extracted table by (SCATS, Direction, Day_of_week, ID) and display it
reconfigured = reconfigure(extracted)
reconfigured

In [None]:
# Save dataframe to csv
# No index argument is passed, so to_csv's default (index=True) writes the index levels as the leading columns
reconfigured.to_csv('./processed.csv')

In [None]:
def create_graph(_df: pd.DataFrame):
	'''Take the information from a dataframe and create a graph from it.

	Reads the 'SCATS', 'Latitude', 'Longitude' and 'Intersection' columns of _df.
	'Intersection' is split on '/' into street names; every pair of distinct
	SCATS numbers that share a street name is connected in both directions with
	cost 1.

	Returns a (locations, edges) tuple:
	- locations maps SCATS number -> (latitude, longitude); when a SCATS number
	  appears on several rows, the last row's coordinates are kept.
	- edges maps SCATS number -> {connected SCATS number: cost}. Nodes without
	  any neighbour have no entry.
	'''

	locations: dict[int, tuple[float, float]] = {}
	# Inner dicts are used as insertion-ordered sets. A SCATS number that occurs
	# on many rows is therefore recorded once per street, which keeps the
	# pairwise loop below proportional to the number of distinct intersections
	# rather than the number of rows.
	street_to_nodes: dict[str, dict[int, None]] = {}

	# Walk the four needed columns in lockstep instead of building a Series per
	# row with iterrows()
	needed_columns = (_df['SCATS'], _df['Latitude'], _df['Longitude'], _df['Intersection'])
	for scats_num, latitude, longitude, loc_desc in zip(*needed_columns):
		# Later rows overwrite earlier ones for the same SCATS number
		locations[scats_num] = (latitude, longitude)

		# Split the description by '/' to get the individual street names,
		# ignoring empty fragments
		for street in loc_desc.split('/'):
			street = street.strip()
			if street:
				street_to_nodes.setdefault(street, {})[scats_num] = None

	edges: dict[int, dict[int, int]] = {}

	# Connect a node to all other nodes with the same street, default cost 1
	for nodes in street_to_nodes.values():
		for node in nodes:
			for connected_node in nodes:
				if connected_node != node:
					edges.setdefault(node, {})[connected_node] = 1

	return locations, edges

# Build the node coordinates and adjacency mapping from the extracted table, then dump both for inspection
locations, edges = create_graph(extracted)
print(locations)
print(edges)

In [None]:
# Print basic graph info
print(f'Number of nodes (intersections): {len(locations)}')
# len(edges) would only count the nodes that have at least one neighbour.
# create_graph stores every connection in both directions, so half of the total
# adjacency entries is the number of street connections.
connection_count = sum(len(others) for others in edges.values()) // 2
print(f'Number of edges (street connections): {connection_count}')

# List all nodes and their attributes
for node, (latitude, longitude) in locations.items():
	print(f'{node:4}: ({latitude:.6f}, {longitude:.6f})')

# List all edges and their cost (each connection appears once per direction)
for node, others in edges.items():
	for other, cost in others.items():
		print(f'{node:4} -- {other:4}: Cost = {cost}')

In [None]:
import search

# Search strategy to run. Kept in a variable so the report below prints the
# method that was actually selected instead of a hard-coded label.
method_name = 'DFS'
method = search.select_method(method_name)

if method is None:
	# Raise instead of calling quit(): quit() ends the interpreter/kernel
	# session and with it all data loaded so far, whereas an exception just
	# stops this cell with the same message.
	raise ValueError("Incorrect method type, valid methods:\nDFS, BFS, GBFS, AS, CUS1, CUS2, IDS, BS")

# Graph built from the adjacency mapping, with node coordinates attached
graph = search.Graph(edges)
graph.locations = locations

# SCATS numbers of the start node and the acceptable goal nodes
origin = 4030
goals = [4051]

problem = search.GraphProblem(origin, goals, graph)

result, count = method(problem, True)

print(f'method={method_name}')
# Output goal node
print('goal=', goals, sep='', end=' | ')

# Output the count returned by the search method
print('number of nodes=', count, sep='')
if result is not None:
	# Output path: list of nodes
	print('path=', result.solution(), sep='')
else:
	print('No path found!')