In [2]:
import sys
import os

# Make the parent directory (the project root) importable, so that the
# `src` package used by the following cells can be resolved.
sys.path.append(os.pardir)

In [3]:
# Enable the IPython autoreload extension (mode 2: reload all modules before executing each cell).
%load_ext autoreload
%autoreload 2

In [4]:
from src.utils.seed import set_random_seed

# Set the random seed for deterministic operations.
# NOTE(review): which generators `set_random_seed` seeds is defined in
# `src.utils.seed` and not visible here. The sentence templates of this
# notebook are sampled with `random.choice`, so confirm that Python's
# `random` module is covered.
SEED = 42
set_random_seed(SEED)

In [5]:
import torch

# Use the GPU when CUDA is available and fall back to the CPU otherwise.
# The same device is used for training and querying the model.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print(f'The selected device is: "{DEVICE}"')

The selected device is: "cuda"


# Loading the Data

In [6]:
import os

# Root folder of the PeMS-Bay data, relative to the notebook location.
BASE_DATA_DIR = os.path.join(os.pardir, 'data', 'pems-bay')

In [7]:
import pickle
# Load the pickled scaler stored next to the processed data.
# NOTE: unpickling can execute arbitrary code; only load trusted files.
scaler_path = os.path.join(BASE_DATA_DIR, 'processed', 'scaler.pkl')
with open(scaler_path, 'rb') as f:
    scaler = pickle.load(f)

In [8]:
from src.spatial_temporal_gnn.model import SpatialTemporalGNN
from src.explanation.navigator.model import Navigator
from src.data.data_extraction import get_adjacency_matrix

# Get the adjacency matrix
adj_matrix_structure = get_adjacency_matrix(
    os.path.join(BASE_DATA_DIR, 'raw', 'adj_mx_pems_bay.pkl'))

# Get the header of the adjacency matrix, the node indices and the
# matrix itself.
header, node_ids_dict, adj_matrix = adj_matrix_structure

# Get the STGNN and load the checkpoints.
# NOTE(review): the positional hyper-parameters (9, 1, 12, 12, ..., 64)
# presumably have to match the ones the checkpoint was trained with;
# confirm against the `SpatialTemporalGNN` signature.
spatial_temporal_gnn = SpatialTemporalGNN(9, 1, 12, 12, adj_matrix, DEVICE, 64)

stgnn_checkpoints_path = os.path.join('..', 'models', 'checkpoints',
                                      'st_gnn_pems_bay.pth')

# Remap the stored tensors onto the selected device: without
# `map_location`, a checkpoint saved from a GPU cannot be loaded when
# `DEVICE` falls back to 'cpu'.
stgnn_checkpoints = torch.load(stgnn_checkpoints_path, map_location=DEVICE)
spatial_temporal_gnn.load_state_dict(stgnn_checkpoints['model_state_dict'])

# Set the STGNN in evaluation mode.
spatial_temporal_gnn.eval();

In [9]:
from src.data.data_extraction import get_locations_dataframe

# Build the dataframe holding the latitude and longitude of each sensor
# from the raw, header-less CSV file.
sensor_locations_path = os.path.join(
    BASE_DATA_DIR, 'raw', 'graph_sensor_locations_pems_bay.csv')
locations_df = get_locations_dataframe(sensor_locations_path, has_header=False)

In [10]:
# Invert `node_ids_dict`, so that a node position (index) can be turned
# back into the corresponding node ID.
node_pos_dict = {
    position: node_id for node_id, position in node_ids_dict.items()}

In [11]:
import os
import numpy as np

explained_dir = os.path.join(BASE_DATA_DIR, 'explained')

# Load the explained data, keeping only the first entry of the last axis.
x_test = np.load(os.path.join(explained_dir, 'x_test.npy'))[..., :1]
y_test = np.load(os.path.join(explained_dir, 'y_test.npy'))[..., :1]

# Load the time information matching the explained data.
x_test_time = np.load(os.path.join(explained_dir, 'x_test_time.npy'))
y_test_time = np.load(os.path.join(explained_dir, 'y_test_time.npy'))

In [12]:
from src.utils.config import MPH_TO_KMH_FACTOR, SEVERE_CONGESTION_THRESHOLD_MPH, CONGESTION_THRESHOLD_MPH

In [13]:
# Scale the data by `MPH_TO_KMH_FACTOR` (mph -> km/h, per the constant's
# name).
# NOTE: this cell rebinds `x_test` and `y_test`, so it is not idempotent:
# running it twice applies the factor twice. Re-run the loading cell
# before executing it again.
x_test = x_test * MPH_TO_KMH_FACTOR
y_test = y_test * MPH_TO_KMH_FACTOR

In [14]:
import pickle

# Load the pickled node information; it is later indexed by node ID to
# obtain a (street, km) pair.
# NOTE: unpickling can execute arbitrary code; only load trusted files.
node_locations_path = os.path.join(
    BASE_DATA_DIR, 'structured', 'node_locations.pkl')
with open(node_locations_path, 'rb') as f:
    node_info = pickle.load(f)

In [15]:
# Use the first explained instance as the running example.
sample_x, sample_y = x_test[0], y_test[0]
sample_x_time, sample_y_time = x_test_time[0], y_test_time[0]

In [16]:
# Read the number of timesteps and nodes from the second and third axes
# of the 4-dimensional target array.
_, n_timesteps, n_nodes, _ = y_test.shape

In [17]:
from src.explanation.clustering.clustering import (
    get_adjacency_distance_matrix)

# Distance matrix derived from the adjacency matrix and the number of
# timesteps; it is later passed to `get_explanation_clusters`.
adj_distance_matrix = get_adjacency_distance_matrix(adj_matrix, n_timesteps)

In [18]:
from src.explanation.clustering.clustering import (
    get_temporal_distance_matrix)

# Temporal distance matrix built from the number of nodes and timesteps;
# it is later passed to `get_explanation_clusters`.
temporal_distance_matrix = get_temporal_distance_matrix(n_nodes, n_timesteps)

In [19]:
# Set the best parameters based on the results of the grid search.

# Passed as `speed_distance_weight` to `get_explanation_clusters`.
SPEED_DISTANCE_WEIGHT = 2
# Passed as `n_clusters` to `get_explanation_clusters`.
N_CLUSTERS = 5

In [20]:
# Interchangeable verbs substituted for the `{prediction}` placeholder.
prediction_verbs = [
    'predicted', 'anticipated', 'forecasted', 'expected',
]

In [21]:
# Templates of the opening sentence of the explanation.
# Supported placeholders:
#   {c}           kind of the predicted cluster
#   {prediction}  prediction verb
#   {w} / {W}     main street sentence (as is / capitalised)
#   {w:}          main street sentence introduced by "on"
#   {d} / {D}     day sentence (as is / capitalised)
#   {t}           time sentence
#   {s}           average speed
# Any other placeholder is never substituted and would leak into the
# generated text. The number of templates is kept unchanged, so that a
# seeded `random.choice` keeps selecting the same positions.
first_paragraph_sentences = [
    'A {c} was {prediction} {w:} {d}, with an average speed of {s} km/h {t}.',
    '{D}, {w} was {prediction} to experience {c}, averaging {s} km/h {t}.',
    '{W} was {prediction} to undergo {c} {d}, with an average speed of {s} km/h {t}.',
    'A {c} was {prediction} to hit {w} {d}, {t}, with an average speed of {s} km/h.',
    'A {c} was {prediction} {w:} {d}, with an average speed of {s} km/h {t}.',
    '{W} was {prediction} to see {c} {d}, with an average speed of {s} km/h {t}.',
    'A {c} was {prediction} {w:} {d}, {t}, with an average speed of {s} km/h.',
    'A {c} was {prediction} to occur {w:} {d}, {t}, with an average speed of {s} km/h.',
    '{D}, a {prediction} {c} affected {w}, maintaining an average speed of {s} km/h {t}.',
    '{W} {prediction} {c} {d}, {t}, with an average speed of {s} km/h.']

# Follow-up sentences listing the additional streets of a cluster.
# `{w}` is glued to the verb on purpose: the caller supplies the leading
# space (or the leading comma of a repetition connector).
extra_involved_street_sentences = [
    'The {c} also affected{w}.',
    'The {c} also impacted{w}.',
    'The {c} also hit{w}.',
    'The {c} also took place {w:}.',
    'The {c} also happened {w:}.',
    'The {c} extended {w:}.',
]

# Closing sentences of the first paragraph; each one ends with the
# article "a" and is completed with the summary of the causes.
first_paragraph_end_sentences = [
    'This was caused by a',
    'This resulted from a',
    'This was induced by a',
    'This happened because of a',
    'This was driven by a',
    'This was a result of a',
    'The motivation was a',
    'This was triggered by a',
    'This occurred because of a',
    'The reason behind it was a',
]

# Openers of the second paragraph.
second_paragraph_connectors = [
    'An initial {c}',
    'A first {c}',
    'Firstly, a {c}',
    'Initially, a {c}',
    'To begin, a {c}',
    'To start, a {c}',
    'To commence, a {c}',
]

# Verbs available for the second paragraph.
second_paragraph_verbs = [
    'occurred', 'happened', 'manifested', 'materialized', 'took place',
]

# Openers of the paragraphs between the second and the last one.
other_paragraphs_connectors = [
    'Following this, {c}',
    'Subsequently, {c}',
    'Next, {c}',
    'Then, {c}',
    'Afterwards, {c}',
    'After this, {c}',
    'After that, {c}',
]

# Openers of the last paragraph.
final_paragraph_connectors = [
    'Finally, {c}',
    'Lastly, {c}',
    'Eventually, {c}',
    'To conclude, {c}',
    'In the end, {c}',
    'Ultimately, {c}',
    'At last, {c}',
]

# Sentence bodies appended to a paragraph opener.
# NOTE: two templates appear twice in this list, which doubles their
# sampling weight.
second_paragraph_sentences = [
    ' {w:} {t} {d}, with an average speed of {s} km/h.',
    ' {w:}, occurring {t} {d}, with an average speed of {s} km/h.',
    ', averaging at a speed of {s} km/h, {w:} {d}, {t}.',
    ', at {s} km/h, {w:} {d}, {t}.',
    ', at {s} km/h, {w:} {t} {d}.',
    ', with an average speed of {s} km/h, {w:} occurring {t} {d}.',
    ' {w:}, with an average speed of {s} km/h, {d}, {t}.',
    ' {w:}, occurring {t} {d}, with an average speed of {s} km/h.',
    ' {w:}, with an average speed of {s} km/h, {d}, {t}.',
    ' {d}, {t} {w:} with an average speed of {s} km/h.',
]

# Determiners for a cluster kind that has already been mentioned.
another_connectors = [
    'another', 'a new', 'a further', 'an additional', 'an extra',
]

# Connectors for a street mentioned once before.
again_connectors = ['again', 'once more', 'another time']

# Connectors for a street mentioned at least twice before.
yet_again_connectors = ['yet again', 'once again', 'yet another time']

In [22]:
from src.explanation.clustering.clustering import get_explanation_clusters

# Cluster the input of the sample instance using the two distance
# matrices and the parameters selected through the grid search.
clusters = get_explanation_clusters(
    sample_x,
    adj_distance_matrix,
    temporal_distance_matrix,
    speed_distance_weight=SPEED_DISTANCE_WEIGHT,
    n_clusters=N_CLUSTERS)

In [23]:
from typing import Dict, List, Tuple

import numpy as np


def _get_cluster_type(
    values: np.ndarray
    ) -> str:
    """
    Classify a cluster according to its mean speed.

    Parameters
    ----------
    values : ndarray
        The speed values of the cluster. Their mean is compared against
        the mph thresholds multiplied by `MPH_TO_KMH_FACTOR`, so the
        values are presumably expressed in km/h.

    Returns
    -------
    str
        'severe congestion', 'congestion' or 'free flow'.
    """
    mean_speed = values.mean()

    if mean_speed <= SEVERE_CONGESTION_THRESHOLD_MPH * MPH_TO_KMH_FACTOR:
        return 'severe congestion'
    if mean_speed <= CONGESTION_THRESHOLD_MPH * MPH_TO_KMH_FACTOR:
        return 'congestion'
    return 'free flow'

def _get_time(date: np.datetime64) -> Tuple[str, str, str]:
    days = {0: 'Monday', 1: 'Tuesday', 2: 'Wednesday', 3: 'Thursday',
            4: 'Friday', 5: 'Saturday', 6: 'Sunday'}

    Y, M, D, h, m = [date.astype('datetime64[%s]' % kind) for kind in 'YMDhm']

    year = Y.astype(int) + 1970
    month = M.astype(int) % 12 + 1
    day = (D - M).astype(int) + 1
    day_of_week = days[((D - M).astype(int) - 1) % 7]
    hour = (h - D).astype(int)
    minute = (m - h).astype(int)

    return day_of_week, f'{day:02d}/{month:02d}/{year}', f'{hour:02d}:{minute:02d}'

In [30]:
from datetime import datetime, timedelta
import random
from typing import Any, Dict


def _get_cluster_location_info(
    node_info: Dict[str, Tuple[str, int]],
    node_indices: np.ndarray
    ) -> Dict[str, List[int]]:
    """
    Group the nodes of a cluster by street and list the kilometres
    involved on each street.

    Parameters
    ----------
    node_info : { str: (str, int) }
        The dictionary containing for each node id the street and kilometrage.
    node_indices : ndarray
        The indices of the nodes involved in the cluster. Repeated indices
        are considered once.

    Returns
    -------
    { str: list of int }
        For each involved street, its distinct kms (truncated to integers
        by `int`) in ascending order. The streets are ordered from the one
        with the most kms to the one with the fewest.
    """
    kms_by_street = {}
    # Turn each distinct node index into its ID through the module-level
    # `node_pos_dict` and file the node km under the node street.
    for node_index in np.unique(node_indices):
        street, km = node_info[node_pos_dict[node_index]]
        kms_by_street.setdefault(street, []).append(km)

    # Truncate the kms to integers, drop the duplicates and sort them.
    distinct_kms = {
        street: sorted({int(km) for km in kms})
        for street, kms in kms_by_street.items()}

    # Streets with more kms come first; the sort is stable, hence ties
    # keep the order in which the streets were first met.
    ranked_streets = sorted(
        distinct_kms.items(), key=lambda item: len(item[1]), reverse=True)
    return dict(ranked_streets)

def _get_repetition_of_location_information(
    location_information: Dict[str, List[int]],
    previous_location_information: List[Dict[str, List[int]]] = None,
    ) -> Dict[str, int]:
    """
    For each street, count the times it has been involved in the previous
    location information.

    Parameters
    ----------
    location_information : { str: list of int }
        The dictionary containing the location information.
    previous_location_information : list of { str: list of int }, optional
        The list of the previous location information, by default None

    Returns
    -------
    { str: int }
        The dictionary containing the times each street has been involved in
        the previous location information.
    """
    # Set the dictionary containing the times each street has been involved in
    # the previous location information to count 0 for each street.
    equal_times_counts = { k: 0 for k in location_information.keys() }
    
    if previous_location_information is not None:
        # For each street, count the times it has been involved in the previous
        # location information.
        for k in location_information.keys():
            equal_times_count = 0
            for previous_location_info in previous_location_information:
                if k in previous_location_info.keys():
                    equal_times_count += 1
            equal_times_counts[k] = equal_times_count

    return equal_times_counts

def _get_target_location_sentence(
    location_information: Dict[str, List[int]],
    previous_location_information: List[Dict[str, List[int]]] = None
    ) -> List[Tuple[str, str]]:
    """
    Describe, for each involved street, the kms touched on it.

    Parameters
    ----------
    location_information : { str: list of int }
        The dictionary containing the location information.
        Keys are the streets and values are the involved kms.
    previous_location_information : list of { str: list of int }, optional
        The list of the previous location information, by default None.
        A street found in it is introduced by a randomly chosen
        repetition connector (e.g. "again").

    Returns
    -------
    list of (str, str)
        One pair per street, in the order of `location_information`.
        The first element is the plain description, the second one is
        the same description introduced by "on".
    """
    repetitions = _get_repetition_of_location_information(
        location_information,
        previous_location_information)

    descriptions = []
    for street, kms in location_information.items():
        times_seen = repetitions[street]
        if times_seen == 0:
            connector = ''
        else:
            # One previous mention -> "again"-like connectors,
            # more than one -> "yet again"-like connectors.
            pool = again_connectors if times_seen == 1 else yet_again_connectors
            connector = f', {random.choice(pool)}, '

        if len(kms) == 1:
            kms_text = f'km {kms[0]}'
        else:
            leading_kms = ', '.join(f'{km}' for km in kms[:-1])
            kms_text = f'kms {leading_kms} and {kms[-1]}'

        plain = f'{connector}{street} at {kms_text}'
        # The connector already starts with ", ", hence no space is put
        # after "on" when a connector is present.
        prefix = 'on ' if connector == '' else 'on'
        descriptions.append((plain, prefix + plain))
    return descriptions

def _link_other_locations_sentences(
    location_sentences: List[str],
    ) -> Tuple[str, str]:
    """
    Link the sentences containing the location information in a single
    sentence.

    Parameters
    ----------
    location_sentences : list of str
        The list of the sentences containing the location information.

    Returns
    -------
    str
        The linked sentence containing the location information.
    str
        The linked sentence containing the location information with the
        adverb "on". 
    """
    if len(location_sentences) == 1:
        return location_sentences[0]
    else:
        formatted_location_sentences = [
            l[0][2:] if l[0][0] == ',' else l[0] for l in location_sentences]
        
        location_sentence = ', '.join(formatted_location_sentences[:-1])
        location_sentence += f' and {formatted_location_sentences[-1]}'

        formatted_location_sentences = [
            l[1][2:] if l[1][0] == ',' else l[1] for l in location_sentences]
        location_sentence_adv = ', '.join(formatted_location_sentences[:-1])
        location_sentence_adv += f' and {formatted_location_sentences[-1]}'

        return location_sentence, location_sentence_adv

def _get_cluster_time_info(
    time_info: np.ndarray,
    time_indices: np.ndarray
    ) -> Dict[str, str]:
    """
    Get the time information of the cluster.

    Parameters
    ----------
    time_info : ndarray
        The array containing the time information. It is indexed by
        timestep and the first entry of the selected row is used.
    time_indices : ndarray
        The timestep indices covered by the cluster.

    Returns
    -------
    { str: str }
        The time information of the cluster:
        * same day, same time: 'on date', 'on day', 'on time';
        * same day, different times: 'on date', 'on day', 'from time',
          'to time';
        * different days: 'from date', 'to date', 'from day', 'to day',
          'from time', 'to time'.
    """
    # The cluster goes from its earliest to its latest timestep.
    first_step, last_step = np.min(time_indices), np.max(time_indices)
    start_datetime, stop_datetime = time_info[first_step][0], time_info[last_step][0]
    start_day, start_date, start_hour = _get_time(start_datetime)
    stop_day, stop_date, stop_hour = _get_time(stop_datetime)

    # Cluster spanning more than one date.
    if start_date != stop_date:
        return {
            'from date': start_date,
            'to date': stop_date,
            'from day': start_day,
            'to day': stop_day,
            'from time': start_hour,
            'to time': stop_hour}

    # Cluster contained in a single date.
    single_day_info = {'on date': start_date, 'on day': start_day}
    if start_hour == stop_hour:
        single_day_info['on time'] = start_hour
    else:
        single_day_info['from time'] = start_hour
        single_day_info['to time'] = stop_hour
    return single_day_info

def _get_time_sentence(
    temporal_information: Dict[str, str]
    ) -> str:
    """
    Get the sentence containing the time information.

    Parameters
    ----------
    temporal_information : { str: str }
        The dictionary containing the time information of the cluster.

    Returns
    -------
    str
        The sentence containing the time information.
    """
    
    if 'from time' in temporal_information:
        from_time = temporal_information['from time']
        to_time = temporal_information['to time']
        return f'from {from_time} to {to_time}'
    else:
        on_time = temporal_information['on time']
        return f'at {on_time}'

def _get_target_day_sentence(
    target_temporal_information: Dict[str, Any],
    ) -> str:
    """
    Get the sentence containing the day information of the cluster.

    Parameters
    ----------
    target_temporal_information : { str: Any }
        The dictionary containing the time information of the cluster.

    Returns
    -------
    str
        The sentence containing the day information of the cluster.
    """
    if 'from day' in target_temporal_information:
        from_day = target_temporal_information['from day']
        to_day = target_temporal_information['to day']
        from_date = target_temporal_information['from date']
        to_date = target_temporal_information['to date']
        day_sentence = f'from {from_day}, {from_date} to {to_day}, {to_date}'
    else:
        on_day = target_temporal_information['on day']
        on_date = target_temporal_information['on date']
        day_sentence = f'on {on_day}, {on_date}'
        
    return day_sentence

def _get_formatted_day_sentence(
    input_temporal_information: Dict[str, str],
    target_date_dt: datetime,
    is_target_more_days: bool,
    ) -> str:
    """
    Get the sentence containing the day information of the cluster
    formatted in a way that is more readable.

    Parameters
    ----------
    input_temporal_information : { str: str }
        The dictionary containing the time information of the cluster.
    target_date_dt : datetime
        The datetime of the date of the target cluster.
    is_target_more_days : bool
        Whether the target cluster spans in more days.

    Returns
    -------
    str
        The sentence containing the day information of the cluster
        formatted in a way that is more readable.
    """
    target_adjective = 'first' if is_target_more_days else 'same' 
    
    # Case where the input temporal information spawns in more days.
    if 'from day' in input_temporal_information:
        input_from_date = input_temporal_information['from date']
        input_from_date_dt = datetime.strptime(input_from_date, '%d/%m/%Y')

        input_to_date = input_temporal_information['to date']
        input_to_date_dt = datetime.strptime(input_to_date, '%d/%m/%Y')

        input_from_day = input_temporal_information['from day']
        input_to_day = input_temporal_information['to day']
        
        # If input end date is the same as the target date.
        if input_to_date_dt == target_date_dt:
            # If input start date is the day before the target date.
            if input_from_date_dt == target_date_dt - timedelta(days=1):
                return f'from the previous to the {target_adjective} day'
            else:
                return f'from {input_from_day}, {input_from_date} to the {target_adjective} day'
        # If input end date is different from the target date.
        else:
            return f'from {input_from_day}, {input_from_date} to {input_to_day}, {input_to_date}'
    # Case where the input temporal information spawns in one day.
    else:
        input_on_date = input_temporal_information['on date']
        input_on_date_dt = datetime.strptime(input_on_date, '%d/%m/%Y')

        input_on_day = input_temporal_information['on day']
        # If input date is the same as the target date.
        if input_on_date_dt == target_date_dt:
            if is_target_more_days:
                return f'on the {target_adjective} day'
            else:
                return ''
        # If input date is the day before the target date.
        elif input_on_date_dt == target_date_dt - timedelta(days=1):
            return f'on the previous day'
        # If input date is different from the target date.
        else:
            return f'on {input_on_day}, {input_on_date}'

def _get_input_day_sentence(
    target_temporal_information: Dict[str, Any],
    input_temporal_information: Dict[str, Any],
    ) -> str:
    """
    Express the day (or days) of an input cluster relative to the target
    cluster.

    Parameters
    ----------
    target_temporal_information : { str: Any }
        The dictionary containing the time information of the target cluster.
    input_temporal_information :  { str: Any }
        The dictionary containing the time information of the input cluster.

    Returns
    -------
    str
        The day sentence of the input cluster, as built by
        `_get_formatted_day_sentence`.
    """
    # The reference date is the first date of a target spanning more
    # days, or the only date of a single-day target.
    target_spans_more_days = 'from day' in target_temporal_information
    date_key = 'from date' if target_spans_more_days else 'on date'
    reference_date_dt = datetime.strptime(
        target_temporal_information[date_key], '%d/%m/%Y')

    return _get_formatted_day_sentence(
        input_temporal_information,
        reference_date_dt,
        is_target_more_days=target_spans_more_days)

def _replace_template_placeholder(
    sentence: str,
    placeholder: str,
    replacement: str,
    ) -> str:
    """
    Replace the `placeholder` in the `sentence` with the `replacement`.

    Parameters
    ----------
    sentence : str
        The sentence containing the placeholder.
    placeholder : str
        The placeholder to replace.
    replacement : str
        The replacement of the placeholder.

    Returns
    -------
    str
        The sentence with the placeholder replaced.
    """
    if placeholder == '{d}' and replacement == '':
        sentence = sentence.replace('{d}, ', '')
        sentence = sentence.replace(' {d}.', '.')

    # Substitute `placeholder` in the sentence with the `replacement`.
    sentence = sentence.replace(placeholder.upper(), replacement.capitalize())
    sentence = sentence.replace(placeholder, replacement)

    return sentence

def _fill_first_paragraph_template(
    predicted_cluster_kind: str,
    time_sentence: str,
    day_sentence: str,
    average_speed: float,
    street_sentences: List[str],
    ) -> str:
    """
    Build the first paragraph from a randomly chosen template.

    Parameters
    ----------
    predicted_cluster_kind : str
        The type of the predicted cluster.
    time_sentence : str
        The sentence containing the time information.
    day_sentence : str
        The sentence containing the day information.
    average_speed : float
        The average speed of the target nodes in the cluster; it is
        written with two decimals.
    street_sentences : list
        The location sentences of the involved streets. Each item is
        indexed as a (plain sentence, sentence introduced by "on") pair.
        The first item fills the template, the remaining ones are listed
        in a follow-up sentence.

    Returns
    -------
    str
        The filled template of the first paragraph.
    """
    paragraph = random.choice(first_paragraph_sentences)
    # Choose the prediction verb.
    paragraph = paragraph.replace('{prediction}', random.choice(prediction_verbs))

    # Fill, in order: the cluster kind, the main street (plain and
    # introduced by "on"), the day and the time.
    main_street = street_sentences[0]
    for placeholder, value in (
            ('{c}', predicted_cluster_kind),
            ('{w}', main_street[0]),
            ('{w:}', main_street[1]),
            ('{d}', day_sentence),
            ('{t}', time_sentence)):
        paragraph = _replace_template_placeholder(paragraph, placeholder, value)
    # Fill the average speed.
    paragraph = paragraph.replace('{s}', f'{average_speed:.2f}')

    if len(street_sentences) <= 1:
        return paragraph

    # List the remaining streets in a follow-up sentence.
    follow_up = random.choice(extra_involved_street_sentences)
    linked_streets = _link_other_locations_sentences(street_sentences[1:])
    plain_streets = linked_streets[0]
    # The `{w}` templates have no space before the placeholder: add it,
    # unless the text starts with the comma of a repetition connector.
    if not plain_streets.startswith(','):
        plain_streets = ' ' + plain_streets
    for placeholder, value in (
            ('{w}', plain_streets),
            ('{w:}', linked_streets[1]),
            ('{c}', predicted_cluster_kind)):
        follow_up = _replace_template_placeholder(follow_up, placeholder, value)
    return paragraph + ' ' + follow_up

def _fill_other_paragraph(
    input_cluster_kind: str,
    formatted_cluster_type: str,
    connector: str,
    time_sentence: str,
    day_sentence: str,
    average_speed: float,
    street_sentences: List[str],
    ) -> str:
    """
    Build one of the paragraphs following the first one, from the given
    connector and a randomly chosen sentence template.

    Parameters
    ----------
    input_cluster_kind : str
        The type of the input cluster; used in the follow-up sentence
        listing the additional streets.
    formatted_cluster_type : str
        The type of the input cluster formatted; used in the main
        sentence.
    connector : str
        The connector of the paragraph, placed before the template.
    time_sentence : str
        The sentence containing the time information.
    day_sentence : str
        The sentence containing the day information.
    average_speed : float
        The average speed of the nodes in the cluster; it is written
        with two decimals.
    street_sentences : list
        The location sentences of the involved streets. Each item is
        indexed as a (plain sentence, sentence introduced by "on") pair.
        The first item fills the template, the remaining ones are listed
        in a follow-up sentence.

    Returns
    -------
    str
        The filled template of the other paragraph.
    """
    paragraph = connector + random.choice(second_paragraph_sentences)
    # Fill the cluster type.
    paragraph = _replace_template_placeholder(paragraph, '{c}', formatted_cluster_type)
    # Choose the prediction verb; the verb is drawn even when the
    # template has no `{prediction}` placeholder.
    paragraph = paragraph.replace('{prediction}', random.choice(prediction_verbs))

    # Fill, in order: the main street (plain and introduced by "on"),
    # the day and the time.
    main_street = street_sentences[0]
    for placeholder, value in (
            ('{w}', main_street[0]),
            ('{w:}', main_street[1]),
            ('{d}', day_sentence),
            ('{t}', time_sentence)):
        paragraph = _replace_template_placeholder(paragraph, placeholder, value)
    # Fill the average speed.
    paragraph = paragraph.replace('{s}', f'{average_speed:.2f}')

    if len(street_sentences) <= 1:
        return paragraph

    # List the remaining streets in a follow-up sentence.
    follow_up = random.choice(extra_involved_street_sentences)
    linked_streets = _link_other_locations_sentences(street_sentences[1:])
    plain_streets = linked_streets[0]
    # The `{w}` templates have no space before the placeholder: add it,
    # unless the text starts with the comma of a repetition connector.
    if not plain_streets.startswith(','):
        plain_streets = ' ' + plain_streets
    for placeholder, value in (
            ('{w}', plain_streets),
            ('{w:}', linked_streets[1]),
            ('{c}', input_cluster_kind)):
        follow_up = _replace_template_placeholder(follow_up, placeholder, value)
    return paragraph + ' ' + follow_up

def _get_cluster_time_span(
    temporal_information: Dict[str, str],
    ) -> Tuple[datetime, datetime]:
    """
    Get the time span of the cluster.

    Parameters
    ----------
    temporal_information : { str: str}
        The dictionary containing the time information of the cluster.

    Returns
    -------
    datetime
        The time datetime of the beginning of the cluster.
    datetime
        The time datetime of the end of the cluster.
    """
    if 'from time' in temporal_information:
        from_time = temporal_information['from time']
        from_time_datetime = datetime.strptime(from_time, '%H:%M')
        to_time = temporal_information['to time']
        to_time_datetime = datetime.strptime(to_time, '%H:%M')
        return (from_time_datetime.time(), to_time_datetime.time())
    else:
        on_time = temporal_information['on time']
        time_datetime = datetime.strptime(on_time, '%H:%M')
        return (time_datetime.time(), time_datetime.time())

def _get_cluster_day_span(
    temporal_information: Dict[str, str],
    ) -> Tuple[datetime, datetime]:
    """
    Get the full datetime span of the cluster.

    Parameters
    ----------
    temporal_information : { str: str }
        The time information of the cluster, with dates formatted as
        'DD/MM/YYYY'.

    Returns
    -------
    datetime
        The datetime at which the cluster begins.
    datetime
        The datetime at which the cluster ends.
    """
    # A cluster spanning more days has distinct begin and end dates,
    # otherwise both ends fall on the same date.
    if 'from day' in temporal_information:
        begin_key, end_key = 'from date', 'to date'
    else:
        begin_key = end_key = 'on date'

    begin_date = datetime.strptime(temporal_information[begin_key], '%d/%m/%Y')
    end_date = datetime.strptime(temporal_information[end_key], '%d/%m/%Y')

    # Attach the time of day to the two dates.
    begin_time, end_time = _get_cluster_time_span(temporal_information)
    return (datetime.combine(begin_date, begin_time),
            datetime.combine(end_date, end_time))

def _get_repetition_of_cluster_type_information(
    cluster_type_information: str,
    previous_cluster_type_information: List[str],
    ) -> int:
    """
    Get the number of times the cluster type information has been repeated
    in the previous clusters.

    Parameters
    ----------
    cluster_type_information : str
        The cluster type information.
    previous_cluster_type_information : list of str
        The list of the previous cluster type information.

    Returns
    -------
    int
        The number of times the cluster type information has been repeated
        in the previous clusters.
    """
    equal_clusters_count = 0
    for c in previous_cluster_type_information:
        if cluster_type_information == c:
            equal_clusters_count += 1
    return equal_clusters_count

def _get_first_paragraph_plus_end_sentence(
    first_paragraph: str,
    cluster_types: List[str]
    ) -> str:
    """
    Get the first paragraph end sentence.

    Parameters
    ----------
    first_paragraph : str
        The first paragraph.
    cluster_types : list of str
        The list of the cluster types present in the explanation.

    Returns
    -------
    str
        The first paragraph with the added end sentence.
    """
    first_paragraph_end_sentence = random.choice(first_paragraph_end_sentences)
    first_paragraph += ' ' + first_paragraph_end_sentence
    
    n_congestions = 0.
    n_free_flows = 0.
    
    endings = {
        (0, 1): ' free flow.',
        (1, 0): ' congestion.',
        (1, 1): ' congestion and a free flow.',
        (0, len(cluster_types)): ' series of free flows.',
        (len(cluster_types), 0): ' series of congestions.',
        (1, len(cluster_types) - 1): ' series of free flows and a congestion.',
        (len(cluster_types) - 1, 1): ' series of congestions and a free flow.',
    }
    
    for cluster_type in cluster_types:
        if cluster_type in['congestion', 'severe congestion']:
            n_congestions += 1
        elif cluster_type == 'free flow':
            n_free_flows += 1
    if (n_congestions, n_free_flows) in endings.keys():
        return first_paragraph + endings[(n_congestions, n_free_flows)]
    else:
        return first_paragraph + ' series of congestions and free flows.'

def get_verbal_explanation(
    x: np.ndarray,
    x_times: np.ndarray,
    x_clusters: np.ndarray,
    y: np.ndarray,
    y_times: np.ndarray
    ) -> str:
    """
    Get the verbal explanation of the prediction.

    The text is made of a first paragraph describing the target event
    (built from the non-null entries of `y`) followed by one paragraph
    per input cluster found in `x_clusters`, ordered by the value
    returned by `_get_cluster_day_span` for each cluster.

    Parameters
    ----------
    x : ndarray
        The input data.
    x_times : ndarray
        The input data times.
    x_clusters : ndarray
        The input data clusters. It is used as a mask over `x`; entries
        equal to -1 are not treated as a cluster (presumably they mark
        values that belong to no cluster - confirm against the
        clustering step).
    y : ndarray
        The target data. Only its strictly positive entries are used as
        target values.
    y_times : ndarray
        The target data times.

    Returns
    -------
    str
        The verbal explanation of the prediction, with paragraphs
        separated by a blank line.

    Notes
    -----
    The function reads the module-level `node_info` and the
    module-level lists of connectors and verbs, and it draws from the
    global `random` generator, so the wording depends on the state of
    that generator.

    The first index array of `np.nonzero` / `np.where` is passed along
    with the times and the second one along with `node_info`; this
    assumes the arrays are laid out with time steps on the first axis
    and nodes on the second one - TODO confirm.
    """

    # Get the values of the selected target nodes.
    target_node_values = y[y > 0]

    # Get the type of the target nodes cluster (eg.: congestion, free flow).
    target_cluster_type = _get_cluster_type(target_node_values)

    # Get the indices of the non-null values of the target nodes.
    y_indices = np.nonzero(y)

    # Translate the temporal information.
    target_temporal_information = _get_cluster_time_info(y_times, y_indices[0])
    day_sentence = _get_target_day_sentence(target_temporal_information)
    time_sentence = _get_time_sentence(target_temporal_information)

    # Translate the location information.
    target_street_information = _get_cluster_location_info(node_info, y_indices[1])
    street_sentence = _get_target_location_sentence(target_street_information)

    # Get the average speed of the target nodes.
    target_average_speed = target_node_values.mean()

    # Get the first paragraph.
    first_paragraph = _fill_first_paragraph_template(
        target_cluster_type,
        time_sentence,
        day_sentence,
        target_average_speed,
        street_sentence)

    # Set the list of the other paragraphs.
    other_paragraphs = []

    # Get the input clusters IDs, skipping the -1 label.
    input_clusters_ids = [c for c in np.unique(x_clusters) if c != -1]

    # List of (cluster id, cluster information dictionary) pairs.
    input_clusters_with_information = []

    for c in input_clusters_ids:
        # Get the values of the nodes of the cluster.
        input_node_values = x[x_clusters == c]

        # Get the type of the cluster.
        input_cluster_type = _get_cluster_type(input_node_values)

        # Get the indices of the clusters in the input data.
        x_indices = np.where(x_clusters == c)

        # Get the temporal information.
        input_temporal_information = _get_cluster_time_info(x_times, x_indices[0])

        # Get the location information.
        input_location_information = _get_cluster_location_info(node_info, x_indices[1])

        # Get the average speed of the nodes of the input cluster.
        input_average_speed = input_node_values.mean()

        # Set the input information in a dictionary.
        input_information = {
            'type': input_cluster_type,
            'temporal': input_temporal_information,
            'location': input_location_information,
            'average_speed': input_average_speed
        }

        # Add the cluster information to the list.
        input_clusters_with_information.append((c, input_information))

    # Close the first paragraph with a summary of the cluster types found.
    first_paragraph = _get_first_paragraph_plus_end_sentence(
        first_paragraph,
        [inf['type'] for _, inf in input_clusters_with_information])

    # Sort the clusters by the time they occur.
    # Note: inside the lambda, `x` is the (id, information) pair and shadows
    # the function parameter `x` only within the lambda itself.
    input_clusters_with_information = sorted(
        input_clusters_with_information,
        key=lambda x: _get_cluster_day_span(x[1]['temporal']))

    for i, (_, info) in enumerate(input_clusters_with_information):
        # Get the type of the input cluster.
        input_cluster_type = info['type']
        # Count the earlier clusters (in time order) having the same type.
        same_cluster_type_count = _get_repetition_of_cluster_type_information(
            input_cluster_type,
            [inf['type'] for _, inf in input_clusters_with_information[:i]])
        # The first cluster gets no prefix here; later ones get "a", an
        # "another"-like connector, or "yet" plus such a connector depending
        # on how many times the same type has already appeared.
        formatted_cluster_type = f'contributing {info["type"]}'
        if i > 0 and same_cluster_type_count == 0:
            formatted_cluster_type = f'a {formatted_cluster_type}'
        elif i > 0 and same_cluster_type_count == 1:
            formatted_cluster_type = f'{random.choice(another_connectors)} {formatted_cluster_type}'
        elif i > 0 and same_cluster_type_count > 1:
            formatted_cluster_type = f'yet {random.choice(another_connectors)} {formatted_cluster_type}'

        # Pick the opening of the paragraph according to its position. With
        # a single cluster the first branch applies, so the final
        # connectors are used only when there are at least two clusters.
        if i == 0:
            paragraph_connector = f'{random.choice(second_paragraph_connectors)} {random.choice(second_paragraph_verbs)}'
        elif i == len(input_clusters_ids) - 1:
            paragraph_connector = f'{random.choice(final_paragraph_connectors)} {random.choice(second_paragraph_verbs)}'
        else:
            paragraph_connector = f'{random.choice(other_paragraphs_connectors)} {random.choice(second_paragraph_verbs)}'


        # Translate the temporal information.
        day_sentence = _get_input_day_sentence(
            target_temporal_information,
            info['temporal'])
        time_sentence = _get_time_sentence(info['temporal'])

        # Translate the location information.
        # NOTE(review): the helper named for the *target* location is reused
        # here with an extra argument holding the locations of the earlier
        # clusters - confirm its signature accepts it.
        previous_location_information = [
            inf['location'] for _, inf in input_clusters_with_information[:i]]
        input_location_sentence = _get_target_location_sentence(
            info['location'],
            previous_location_information
            )

        # Get the average speed of the nodes of the input cluster.
        input_average_speed = info['average_speed']

        # Get the other paragraph.
        other_paragraph = _fill_other_paragraph(
            input_cluster_type,
            formatted_cluster_type,
            paragraph_connector,
            time_sentence,
            day_sentence,
            input_average_speed,
            input_location_sentence)

        other_paragraphs.append(other_paragraph)

    # Get the explanation: paragraphs are separated by a blank line.
    explanation = first_paragraph + '\n\n' + '\n\n'.join(other_paragraphs)
    return explanation

In [29]:
# Print the explanation (rather than displaying its repr) so that the
# blank lines between the paragraphs are rendered.
print(get_verbal_explanation(
    sample_x, sample_x_time, clusters, sample_y, sample_y_time))

A severe congestion was predicted to occur on Glendale Freeway at km 10 on Wednesday, 04/06/2012, from 07:45 to 08:40, with an average speed of 37.00 km/h. The motivation was a series of congestions and free flows.

An initial contributing free flow took place, at 107.32 km/h, on Glendale Freeway at kms 8 and 10 from 06:45 to 07:15.

Subsequently, an extra contributing free flow occurred, at 101.33 km/h, on, again, Glendale Freeway at kms 7, 8, 10 and 11 from 06:45 to 07:25. The free flow also affected Golden State Freeway at kms 4 and 7 and Ventura Freeway at km 5.

After that, a contributing congestion happened on, yet again, Glendale Freeway at kms 8, 9 and 10, with an average speed of 75.95 km/h, from 06:45 to 07:40.

After this, yet an additional contributing free flow manifested, with an average speed of 101.91 km/h, on, yet again, Glendale Freeway at kms 10 and 11 occurring from 06:45 to 07:40. The free flow extended on, once more, Golden State Freeway at km 7.

Eventually, a co