In [1]:
# Imports
import os
import csv
import numpy as np
import speech_recognition as sr 
from scipy.sparse import csr_matrix
from sknetwork.path import shortest_path

from collections import defaultdict
import functools
import itertools

currentPath = os.path.dirname(os.path.abspath(''))

In [2]:
# Reading trips from csv file
# Inspired from https://stackoverflow.com/a/12398967

pathCount = 0
timeTableFileName = os.path.join(currentPath, './data/timetables_edited.csv')

# Create dictionary to associates a station name with an id
trainStationNameToId = defaultdict(functools.partial(next, itertools.count()))

with open(timeTableFileName, newline='', encoding='UTF-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=',')

    # Set of the reading position (ignoring header line)
    csvfile.seek(0)
    next(reader)

    # First reading to get the number different stations and init shape of trips object
    for row in reader:
        idxA = trainStationNameToId[row[1]]
        idxB = trainStationNameToId[row[2]]
    numberOfTrainstations = len(trainStationNameToId)
    trips = np.zeros((numberOfTrainstations, numberOfTrainstations))

    # Reset of the reading position (ignoring header line)
    csvfile.seek(0)
    next(reader)

    # Reading data
    for row in reader:
        pathCount += 1 
        idxA = trainStationNameToId[row[1]]
        idxB = trainStationNameToId[row[2]]

        # If trip already exist/has already be read, display message
        indexTupple = (idxA, idxB) if idxA < idxB else (idxB, idxA)
        if trips[indexTupple] != 0:
            print(f"Trip {row[1]} - {row[2]} with a distance of {row[3]} has already be read with a distance of {trips[indexTupple]}. Ignoring the new one.")
        else:
            trips[indexTupple] = int(row[3])


# Create dictionarry to map an id to its train station name
trainStationIdToName = dict((id, name) for name, id in trainStationNameToId.items())

# Make matrix symetrical as the trips are not directed but can be taken in both directions
# Source from https://stackoverflow.com/a/42209263
i_lower = np.tril_indices(numberOfTrainstations, -1)
trips[i_lower] = trips.T[i_lower]

# Creating Compressed Sparse Row (CSR) matrix to store and work more efficiently with only the trips
tripGraph = csr_matrix(trips)

print(f"Read {tripGraph.getnnz()} go and back trips ({int(tripGraph.getnnz() / 2)} distinc trips) out of {pathCount} rows for {len(trainStationNameToId)} distinct train stations.")

Read 3150 go and back trips (1575 distinc trips) out of 1575 rows for 817 distinct train stations.


In [3]:
class Trip:
    def __init__(self, startStationId, endStationId, path, totalDuration):
        self.startStationId = startStationId
        self.endStationId = endStationId
        self.path = path
        if totalDuration is None:
            self.totalDuration = None
        else:
            self.totalDuration = int(totalDuration)

    def __str__(self):
     return f"Trip from {self.startStationId} to {self.endStationId} for a total duration of {self.totalDuration} by this path: {self.path}"

In [4]:
# Functions used to determine the shortest path between cities 

def getPathBetweenIds(trainStationStartIds: list, trainStationEndIds: list):
    global tripGraph
    paths = []
    # As shortest_path() does not support multiple sources and multiple targets at the same time, we'll iterate through all start points and manually concat the results
    if(len(trainStationStartIds) > 1 and len(trainStationEndIds) > 1):
        for trainStationEndId in trainStationEndIds:
            results = shortest_path(tripGraph, sources=[int(i) for i in trainStationStartIds], targets=[int(trainStationEndId)], method='D')
            for result in results:
                if len(result) >= 2:
                    paths.append(result)
        return paths
    else:
        results = shortest_path(tripGraph, sources=[int(i) for i in trainStationStartIds], targets=[int(i) for i in trainStationEndIds], method='D')
        for result in results:
            if len(result) >= 2:
                paths.append(result)
        return paths

def isCityMatchingKey(city: str, key: str):
    return city.lower() in key.lower()

def getPathBetweenCities(start: str, end: str):
    trainStationStartIds = np.array([])
    trainStationEndIds = np.array([])
    
    # Get all stations that contains the searched name
    for key, value in trainStationNameToId.items():
        if isCityMatchingKey(start, key):
            trainStationStartIds = np.append(trainStationStartIds, value)
        if isCityMatchingKey(end, key):
            trainStationEndIds = np.append(trainStationEndIds, value)

    if len(trainStationStartIds) > 0 and len(trainStationEndIds) > 0:
        return getPathBetweenIds(trainStationStartIds, trainStationEndIds)
    else:
        return np.array([])

def getBestPathForFullTrip(tripCityWaypoints: list):
    global tripGraph
    fullTrip = np.array(np.zeros(len(tripCityWaypoints)-1), dtype=object)
    # Iterate through all sub trips
    for trip in range(len(fullTrip)):
        paths = getPathBetweenCities(tripCityWaypoints[trip], tripCityWaypoints[trip+1])
        minDistance = None
        keptPath = None
        startId = None
        endId = None

        # Iterate though the returned array
        for path in paths:
            distance = 0
            # Nested array => multiple start/end possible
            if isinstance(path, list):
                for i in range(len(path)-1):
                    distance = distance + tripGraph[(path[i], path[i+1])]
                if minDistance is None or distance < minDistance:
                    minDistance = distance
                    keptPath = path
                    startId = path[0]
                    endId = path[len(path)-1]
            
            # Scalar value => only one path possible
            else:
                for i in range(len(paths)-1):
                    distance = distance + tripGraph[(paths[i], paths[i+1])]
                minDistance = distance
                keptPath = paths
                startId = paths[0]
                endId = paths[len(paths)-1]

        fullTrip[trip] = Trip(startId, endId, keptPath, minDistance)
    return fullTrip


# Use following to test pathfinding functions above
def testPathfinding():
    bestTrips = getBestPathForFullTrip(['Orléan', 'Paris', 'Vosges'])
    for i in range(len(bestTrips)):
        print(f"#{i} - {bestTrips[i]}")
        for stationId in bestTrips[i].path:
            print(f"{trainStationIdToName[stationId]}")
testPathfinding()

#0 - Trip from 18 to 12 for a total duration of 75 by this path: [18, 12]
Gare de Orléans
Gare de Paris-Austerlitz
#1 - Trip from 735 to 308 for a total duration of 283 by this path: [735, 249, 670, 305, 300, 308]
Gare de Paris-Est
Gare de Chaumont
Gare de Culmont-Chalindrey
Gare de Vittel
Mirecourt
Gare de Charmes (Vosges)


In [5]:
# Getting user's request
# Inspired from https://www.geeksforgeeks.org/python-convert-speech-to-text-and-text-to-speech/
try: 
    r = sr.Recognizer()
    with sr.Microphone() as source: 
        print("Adjusting to noise level...")
        r.adjust_for_ambient_noise(source, duration=0.2) 
 
        print("Listening...")
        audio = r.listen(source) 
            
        print("Voice recognition...")
        request = r.recognize_google(audio, language="fr-FR") 

        print(f"Parsed: '{request}'") 
            
except sr.RequestError as e: 
    print(f"Exception during request parsing: {e}") 

Adjusting to noise level...
Listening...
Voice recognition...
Parsed: 'j'aimerais aller de Orléans à Paris puis dans les Vosges'


In [6]:
# Extract cities list in the right travel order from the user's request
# TODO 

In [7]:
# Call getBestPathForFullTrip() and display the results
# TODO