### Dependencies

In [19]:
import os
import shutil
import pickle
import numpy as np
import json
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon as MatplotlibPolygon
from shapely import wkt
import shapely
import shapely.geometry as sg
from shapely.geometry import Polygon, LineString, Point, MultiPoint
import networkx as nx
import geopandas as gp
from shapely.ops import nearest_points 
from typing import Dict, List, Tuple, Union
from sklearn import preprocessing

# from Plan import Plan
from RoomColours import roomColours
from MarkovChainClass import *
from read_historic_data import *

### Load SDDS plan data as dictionary

In [20]:
def load_dict_from_file(file_path):
    """ dataset composed in read_plans_csv.ipynb"""
    with open(file_path, 'rb') as file:
        data_dict = pickle.load(file)
    return data_dict

plans = load_dict_from_file('Plans.pickle')
plan_ids = load_dict_from_file('plan_ids.pickle')
apartment_ids = load_dict_from_file('apartment_ids.pickle')

### Perform MC simulation on decomposition graph

In [21]:
""" Monte Carlo Simulation of decomposition graph used to label floor plan pre-evacuation times"""

pre_evac_time_per_start = []
behaviour_class = behaviour_in_fire_markov_chain(10000, node_attributes = optimal_node_attributes)

for i in Start_nodes:
    total_time_i = round(behaviour_class.simulate_markov_chain(i, 10000), 2)
    pre_evac_time_per_start.append(total_time_i)


print(pre_evac_time_per_start)

[426.45, 362.43, 366.39, 145.71]


### Create class to analyse plan geometry and adjacencies in terms of fire safety

In [22]:
class Plan:
    """ Class used to find plan rooms, adjacencies (with / without door) , evacuation times (Tp, Te) and create Graph (G = {N,E}) for predicting risk in a given floor plan"""
    def __init__(
            self, 
            Plan_id: int, 
            Apartment_id: str, 
            plans: dict,
            mrkv_chain_behaviours: behaviour_in_fire_markov_chain,
            room_of_origin_weights: dict
    ):
        
        ## identification of plan
        self.plans = plans
        self.plan_id = Plan_id
        self.apartment_id = Apartment_id

        ## rooms / nodes
        self.rooms = self.__get_rooms()
        self.room_attributes = self.__get_room_attributes()
        self.doors, self.door_widths, self.entrance_door  = self.__get_doors()

        ## walls
        self.walls = self.__get_walls()

        ## distances between rooms and adjacency lists
        self.connections_without_doors = self.__get_room_room_connections()
        self.connections_with_doors, self.door_widths = self.__get_room_door_room_connections()
        self.entrance_connection, self.entrance_door_centre = self.__get_entrance_connection()
        self.edges_with_weights, self.edge_door_widths = self.__get_room_adjacencies()
        self.edge_midpoints = self.__get_edge_midpoints()
        self.edge_attributes = self.__get_edge_attributes()

        ## get centres of nodes
        self.room_centers = self.__get_room_centres()

        ## markov chain fire emergency behaviour
        self.mrk_chain = mrkv_chain_behaviours

        ## room of origin likelihood
        self.room_of_origin_weights = room_of_origin_weights
        
        ## graph
        self.graph = self.__make_graph()

    def show_plan_apartment(self) -> None:
        """ Displays the plan of the one apartment specified in class initiation """

        #    Extract plan ID and apartment ID
        plan_id = self.plan_id
        apartment_id = self.apartment_id

        # Load geometry from plans
        geo = shapely.wkt.loads(self.plans[plan_id][apartment_id]['geometries_dict'])
        
        # Plot the geometry
        plt.figure()
        p = gp.GeoSeries(geo)

        p.plot(color=self.__give_colours())


        
        # Set title
        plt.suptitle('Plan: ' + str(plan_id) + ', ' + 'apartment: ' + apartment_id)
        
        # Show plot
        plt.show()

    def show_plan(self) -> None:
        """ Displays the plan of the whole floor in which the plan is in"""
        
        # Extract plan ID
        plan_id = self.plan_id

        # Initialize lists for geometry and colors
        _geo = []
        colours = []

        # Iterate over apartment IDs for the given plan
        for apartment_id in apartment_ids[plan_id]:
            # Create a Plan object for the current apartment
            plan = Plan(plan_id, apartment_id, plans, self.mrk_chain, self.room_of_origin_weights)
            
            # Append colors from the current apartment to the colors list
            colours += plan.__give_colours()
            
            # Append geometry from the current apartment to the _geo list
            _geo.append(shapely.wkt.loads(self.plans[plan_id][apartment_id]['geometries_dict']))

        # Flatten the list of geometries_dict
        geo = [polygon for row in _geo for polygon in row]

        # Plot the geometries_dict
        p = gp.GeoSeries(geo)
        p.plot(color=colours)
        
        # Set title
        plt.suptitle('Plan: ' + str(plan_id))
        
        # Show plot
        plt.show()

    def show_graph(self) -> None:
        """Displays the graph representation of the floor plan"""

        G = self.graph
        pos = self.room_centers
        edge_labels = {(node1, node2): str(distance) for node1, node2, distance in self.edges_with_weights}

        plt.figure(figsize=(10, 10))  # Initiate graph figure

        # Draw nodes
        node_colors = [G.nodes[node]['colour'] for node in G.nodes()]
        node_sizes = [((G.nodes[node]['room_area'] + 10) * 200) for node in G.nodes()]
        nx.draw(G, 
                pos=pos, 
                with_labels=False,
                node_color=node_colors, 
                node_size=node_sizes, 
                alpha=0.3, 
                linewidths=5
            )

        # Draw edges
        nx.draw_networkx_edges(G, 
                            pos, 
                            width=1, 
                            edge_color='#AAAE7F'
                        )

        # Draw node labels
        node_labels = {
            node: (f"{G.nodes[node]['node_index']}: {G.nodes[node]['room_type']}\n"
                f"exit_dis: {G.nodes[node]['distance_to_exit']}\n"
                f"area: {G.nodes[node]['room_area']}\n"
                f"Tp: {G.nodes[node]['time_pre_evac']}\n"
                f"Te: {G.nodes[node]['time_evac']}\n"
                f"b_c: {round(G.nodes[node]['betweenness_centrality'],2)}\n"
                f"c_c: {round(G.nodes[node]['closeness_centrality'],2)}")
            for node in G.nodes()
        }
        nx.draw_networkx_labels(G, 
                                pos=pos, 
                                labels=node_labels, 
                                font_size=9, 
                                font_weight='normal',
                                horizontalalignment='center'
                            )

        # Draw edge labels
        nx.draw_networkx_edge_labels(G, 
                                    pos, 
                                    edge_labels=edge_labels, 
                                    font_weight='bold', 
                                    alpha=0.35
                                )

        plt.show()  # Show graph

    def __give_colours(self) -> List[str]:
        """ Returns the list of room colours for all rooms in the plan, according to RoomColours.py"""

        # Extract plan ID and apartment ID
        plan_id = self.plan_id
        apartment_id = self.apartment_id

        # Get subtypes for the given plan and apartment
        subtypes = self.plans[plan_id][apartment_id]['sub_type_dict']

        # Map subtypes to colors
        colours = [roomColours[x] for x in subtypes]

        return colours

    def __get_rooms(self) -> List[Polygon]:
        """ Returns all rooms in the plans as List of polygons """

        # Extract plan ID and apartment ID
        plan_id = self.plan_id
        apartment_id = self.apartment_id

        # Fetch geometries_dict and types of apartment components from self.plans dictionary
        component_geometry = self.plans[plan_id][apartment_id]['geometries_dict']
        types = self.plans[plan_id][apartment_id]['types_dict']
        sub_types = self.plans[plan_id][apartment_id]['sub_type_dict']

        # Extract room shapes excluding shafts
        rooms = [component_geometry[i] for i, x in enumerate(types) if types[i] == 'area' and sub_types[i] != 'SHAFT']
        
        # Convert WKT (Well-Known Text) format to Shapely geometries_dict for visualization and analysis
        rooms = [shapely.wkt.loads(room) for room in rooms]
        
        return rooms
    
    def __get_room_attributes(self) -> Dict[int, Dict]:
        """ Make room attributes dictionary => {node_id : {attributes}, node_id: {attributes}} """

        # Extract plan ID and apartment ID
        plan_id = self.plan_id
        apartment_id = self.apartment_id

        # Extract component information from the floor plan
        types = self.plans[plan_id][apartment_id]['types_dict']
        sub_types = self.plans[plan_id][apartment_id]['sub_type_dict']
        area_ids = self.plans[plan_id][apartment_id]['area_ids_dict']
        max_distances_to_entrance = self.plans[plan_id][apartment_id]['distance_to_exit_dict']

        # Extract room types excluding shafts
        room_types = [sub_types[i] for i, x in enumerate(types) if types[i] == 'area' and sub_types[i] != 'SHAFT']
        room_indices = list(range(len(room_types)))

        # Extract room area IDs
        room_area_ids = [area_ids[i] for i, x in enumerate(sub_types) if x != 'SHAFT']

        # Extract room areas
        room_areas = [round(room.area, 2) for room in self.rooms]

        # Initialize room attributes dictionary
        room_attributes = {}

        # Populate room attributes
        for i, idx in enumerate(room_indices):
            room_attributes[idx] = {
                'room_type': room_types[i],
                'room_area_id': room_area_ids[i],
                'distance_to_exit': max_distances_to_entrance[i],
                'room_area': room_areas[i]
            }

        # Add entrance door properties
        entrance_door_area_ids = [area_ids[i] for i, x in enumerate(sub_types) if x == 'ENTRANCE_DOOR']
        entrance_door_idx = room_indices[-1] + 1
        room_attributes[entrance_door_idx] = {
            'room_type': 'ENTRANCE_DOOR',
            'room_area_id': entrance_door_area_ids[0],
            'distance_to_exit': 0,
            'room_area': 0
        }

        return room_attributes

    def __get_room_centres(self) -> Dict[int, Tuple[float, float]]:
        """ Returns a dictionary of key : value as node_id: centroid"""
        
        # Extract rooms and entrance door
        rooms = self.rooms
        entrance_door_centre = self.entrance_door_centre
        
        # Initialize list of room indices
        room_indices = list(range(len(rooms)))

        # Add index for the entrance door
        room_indices.append(len(rooms))

        # Compute centroids of rooms and entrance door
        centroids_pts = [shape.centroid for shape in rooms]
        centroids_pts.append(entrance_door_centre)

        # Create dictionary mapping indices to centroid coordinates
        centroids_coords_dict = {
            i: (round(point.x, 2), round(point.y, 2)) 
            for i, point in zip(room_indices, centroids_pts)
        }
        
        return centroids_coords_dict

    def __get_doors(self) -> Tuple[List[Polygon], Dict, Polygon]:
        """ Returns a list of all doors, including the entrance door, as a list of polygons"""

        # Extract plan ID and apartment ID   
        plan_id = self.plan_id
        apartment_id = self.apartment_id

        # Extract component geometries_dict and subtypes
        components = self.plans[plan_id][apartment_id]['geometries_dict']
        sub_type_dict = self.plans[plan_id][apartment_id]['sub_type_dict']

        # Extract door geometries_dict and entrance door geometry
        doors = [components[i] for i, subtype in enumerate(sub_type_dict) if subtype in ['DOOR', 'ENTRANCE_DOOR']]
        entrance_door = [components[i] for i, x in enumerate(sub_type_dict) if sub_type_dict[i] == 'ENTRANCE_DOOR' ]

        # Convert geometries_dict from WKT to Shapely polygons
        doors = shapely.wkt.loads(doors)
        entrance_door = shapely.wkt.loads(entrance_door)[0]

        def get_door_widths(doors):
            """ Returns a list of door widths for each apartment, same indexing as self.doors"""

            ## initialise door widths list
            door_widths = []


            for door in doors:
                # Get the bounding box
                minx, miny, maxx, maxy = door.bounds

                # Calculate width and length
                width = maxx - minx
                length = maxy - miny

                # Return the smallest of the two
                door_width = max(width, length)
                door_widths.append(door_width)

            ## normalise door widths, so that connections without doors have weight of 1.0.
            def normalize_list(values, min_value=0.75, max_value=0.1):
                min_val = min(values)
                max_val = max(values)

                normalized_values = [0.1 for v in values]

                for i, v in enumerate(values):
                    if (max_val - min_val) == 0:
                        normalized_values[i] = 0.05
                    else:
                        normalized_values[i] = round( ((v - min_val) / (max_val - min_val) * (max_value - min_value) + min_value), 2) 
                        
                return normalized_values
            
            if len(door_widths) == 1:
                width = door_widths[0]
                if width > 0.75:
                    normalised_door_widths = 0.0
                else:
                    normalised_door_widths = 0.75 / width
            normalised_door_widths = normalize_list(door_widths)

            return normalised_door_widths
        
        door_widths = get_door_widths(doors)
        door_widths = {key: value for key, value in zip([i for i in range(len(doors))], door_widths)}

        return doors, door_widths, entrance_door
    
    def __get_walls(self) -> List[Polygon]:
        """ Returns all wall shapes in the plan as a list of polygons"""

        # Extract plan ID and apartment ID
        plan_id = self.plan_id
        apartment_id = self.apartment_id

        # Extract component geometries_dict and subtypes
        components = self.plans[plan_id][apartment_id]['geometries_dict']
        component_subtypes = self.plans[plan_id][apartment_id]['sub_type_dict']

        # Extract wall geometries_dict
        walls = [components[i] for i, subtype in enumerate(component_subtypes) if subtype == 'WALL']

        # Convert geometries_dict from WKT to Shapely polygons
        walls = [shapely.wkt.loads(wall) for wall in walls]

        return walls

    def __get_room_room_connections(self) -> Dict[Tuple[int, int], float]:
        connection_distances = {}
        connections_set = set()  # Adjacency matrix of rooms

        # Iterate over rooms
        for from_index, from_room in enumerate(self.rooms):
            # Check connections to neighboring rooms without doors (open plan)
            for to_index, to_room in enumerate(self.rooms):
                if from_index == to_index:
                    continue

                # Find nearest points between rooms
                roomToRoom_nearestPts = nearest_points(from_room, to_room)
                roomToRoom_distance = round(roomToRoom_nearestPts[0].distance(roomToRoom_nearestPts[1]), 2)

                if roomToRoom_distance < 0.07:
                    # Sort connection, turn to tuple and add to set to prevent duplicate connections
                    connection = tuple(sorted([from_index, to_index]))
                    connections_set.add(connection)

                    # Calculate mean connection distance (edge weight) and add to connection_distances
                    pt_current_centre = from_room.centroid
                    pt_neighbour_centre = to_room.centroid
                    pt_connection = Point((pt_current_centre.x + pt_neighbour_centre.x) / 2,
                                        (pt_neighbour_centre.y + pt_current_centre.y) / 2)
                    line_connection_walk = LineString([pt_current_centre, pt_connection, pt_neighbour_centre])
                    length_connection_walk = line_connection_walk.length
                    connection_distances[connection] = round(length_connection_walk, 2)

        return connection_distances
    
    def __get_room_door_room_connections(self) -> Tuple[Dict, Dict]:
        rooms = self.rooms
        doors = self.doors
        connections_set = set()
        connection_distances = {}
        door_widths = {}

        for door_index, door in enumerate(doors):
            # make a connection (pair of rooms) for every door
            connection = []
            for room_index, room in enumerate(rooms):
                doorToRoom_nearestPts = nearest_points(door, room)
                doorToRoom_distance = round(doorToRoom_nearestPts[0].distance(doorToRoom_nearestPts[1]), 2)
                if doorToRoom_distance == 0.0:
                    connection.append(room_index)
            # turn list into tuple and add to set to prevent adding duplicates
            if len(connection) <= 1:
                continue

            # sort connection, turn to tuple and add to set to prevent duplicate connections
            connection.sort()
            connection = tuple(connection)
            connections_set.add(connection)

            ## add door index to dictionary for adding it as an edge attribute in self.make_graph()
            door_widths[connection] = self.door_widths[door_index]

            # calculate and add mean connection distance (edge weight) to connection distances
            from_room = rooms[connection[0]]
            to_room = rooms[connection[1]]

            pt_current_centre = from_room.centroid
            pt_neighbour_centre = to_room.centroid
            pt_door_centre = door.centroid
            line_connection_walk = LineString([pt_current_centre, pt_door_centre, pt_neighbour_centre])
            length_connection_walk = line_connection_walk.length
            connection_distances[connection] = round(length_connection_walk, 2)

        return connection_distances, door_widths

    def __get_entrance_connection(self) -> Tuple[Dict, Tuple[float, float]]:
        # Extract necessary attributes
        entrance_door = self.entrance_door
        entrance_door_centre = entrance_door.centroid
        rooms = self.rooms
        entrance_door_node_id = len(rooms)
        
        # Initialize variables
        connections_set = set()
        connection_dict = {}

        # Iterate over each room to compute connections with the entrance door
        for i, room in enumerate(rooms):
            # Compute distance between nearest points of the door and the current room
            e_door_nearest_pts = nearest_points(entrance_door, room)
            distance = round(e_door_nearest_pts[0].distance(e_door_nearest_pts[1]), 2)

            # If the distance is 0, the door is adjacent to the room
            if distance == 0.0:
                # Create a connection between the entrance door and the room
                connection = [entrance_door_node_id, i]
                
                # Compute connection weight (distance between the centre of the door and the centre of the room)
                connection_distance = self.room_attributes[i]['distance_to_exit']
                if not 0 <= connection_distance <= 100:
                    connection_distance = round(entrance_door.centroid.distance(room.centroid), 2)
                # connection_distance = round(entrance_door_centre.distance(room.centroid), 2)
                
                # Sort connection and convert to tuple
                connection.sort()
                connection = tuple(connection)
                
                # Add connection to the set to prevent duplicates
                connections_set.add(connection)
                
                # Convert the set to a list and extract the connection tuple
                connections = [tup for tup in connections_set]
                connections = connections[0]
                
                # Add connection and its distance to the connection dictionary
                connection_dict[connections] = connection_distance
                    
        return connection_dict, entrance_door_centre

    def __make_graph(self) -> nx.Graph:
        """ Constructs a nx.Graph component from Nodes, Edges, Node attributes and Edge Attributes"""

        # Initialize an empty graph
        G = nx.Graph()

        # Add edges with weights and edge attributes
        G.add_weighted_edges_from(self.edges_with_weights)
        nx.set_edge_attributes(G, self.edge_attributes, 'attributes')

        # Get shortest paths to exit
        shortest_paths = self.__get_shortest_paths_to_exit(G)

        Tp, Te = self.__get_exit_times()

        # Add nodes and attributes
        for i in G.nodes:
            # Extract room attributes
            room_type = self.room_attributes[i]['room_type']
            distance_to_exit = self.room_attributes[i]['distance_to_exit']
            node_area = self.room_attributes[i]['room_area']
            time_pre_evac = Tp[i]
            time_evac = Te[i]
 
            # Correct distance to exit if invalid
            if not 0 <= distance_to_exit <= 100:
                distance_to_exit = shortest_paths[i]

            if time_evac < 0.01:
                time_evac = round(distance_to_exit / 0.7, 2)

            # Get node color
            node_color = roomColours[room_type]

            # Add node to graph
            G.add_node(
                i, 
                colour=node_color, 
                room_type=room_type, 
                distance_to_exit=distance_to_exit, 
                node_index=i, 
                room_area=node_area,
                time_pre_evac= time_pre_evac,
                time_evac = time_evac
            )
            

        # Add betweenness centrality attribute to nodes
        betweenness_centrality = nx.betweenness_centrality(G)
        nx.set_node_attributes(G, betweenness_centrality, 'betweenness_centrality')

        # Add closeness centrality to nodes
        closeness_centrality = nx.closeness_centrality(G)
        nx.set_node_attributes(G, closeness_centrality, 'closeness_centrality')
        
        
        return G
    
    def __get_room_adjacencies(self) -> List[Tuple[int, int, float]]:

        # Get node indices for rooms and entrance connection
        node_indices = [i for i in range(len(self.rooms))]
        node_indices += self.entrance_connection

        # Retrieve room-door-room adjacencies, room-room adjacencies, and entrance-door adjacency
        room_door_room_adjacencies = self.connections_with_doors
        room_room_adjacencies = self.connections_without_doors
        entrance_door_adjacency = self.entrance_connection

        # Combine all adjacency information into a single dictionary
        edges = {}
        edges.update(room_door_room_adjacencies)
        edges.update(room_room_adjacencies)
        edges.update(entrance_door_adjacency)

        ## create door_width dictionary
        door_widths = {}
        for edge in list(edges.keys()):
            if edge in self.connections_with_doors.keys():
                ## room connections with doors are further weighted by the normalised value of the inverse of the door widths (larger door -> smaller weight, smaller door -> bigger weight)
                door_widths[edge] = self.door_widths[edge]
            else:
                ## room connections without doors are further weighted by 0.0, (preferable connection)
                door_widths[edge] = 0.0

        # Convert dictionary entries into a list of tuples with node indices and edge weights
        edges_and_weights = [(a[0], a[1], v) for a, v in edges.items()]

        return edges_and_weights, door_widths

    def __get_edge_attributes(self) -> Dict:
        edges_with_weights = self.edges_with_weights
        room_types = self.room_attributes
        edge_attributes = {}

        # Iterate over each edge and extract attributes
        for edge in edges_with_weights:
            # Extract room types of nodes connected by the edge
            node1_type = room_types[edge[0]]['room_type']
            node2_type = room_types[edge[1]]['room_type']

            # Store edge attributes with node types
            edge_attributes[(edge[0], edge[1])] = {
                'edge_node_types': (node1_type, node2_type), 
                'edge_opening_weight': self.edge_door_widths[(edge[0], edge[1])]
            }

            ## add door width to edge attribute
    

        return edge_attributes

    def __get_edge_midpoints(self) -> Dict:
        midpoints = {}
        edges = [(edge[0], edge[1]) for edge in self.edges_with_weights] 
        room_centers = self.__get_room_centres()

        for i, edge in enumerate(edges):
            if i < len(room_centers):
                midpoints[edge] = (round((room_centers[edge[0]][0] + room_centers[edge[1]][0]) / 2, 2), round((room_centers[edge[0]][1] + room_centers[edge[1]][1]) / 2, 2))
            else:
                midpoints[edge] = (round((room_centers[edge[0]][0] + self.entrance_door_centre.x) / 2, 2), round((room_centers[edge[0]][1] + self.entrance_door_centre.y) / 2, 2))
        return midpoints
    
    def __get_shortest_paths_to_exit(self, graph: nx.Graph) -> Dict:
            exit_node = graph.number_of_nodes() - 1
            shortest_paths = nx.single_target_shortest_path(graph, exit_node)

            def calculate_total_distance(_graph, route):
                total_distance = 0
                for i in range(len(route) - 1):
                    source = route[i]
                    target = route[i + 1]
                    total_distance += _graph[source][target]['weight']

                return total_distance
            
            for key in list(shortest_paths.keys()):
                distance = calculate_total_distance(graph, shortest_paths[key])
                shortest_paths[key] = round(distance, 2)
            
            return shortest_paths

    def __get_exit_times(self) -> Tuple[Dict, Dict]:
        """ Runs a monte carlo simulation on the markov chain, representing behaviour during fire. Giving predicted pre evacuation time per type of room. This function is stochastic, thus every time it is run it will return a different result"""

        markov_chain = self.mrk_chain

        ## Total Exit Time (Tn) = Pre-Evacuation Time (Tp) + Evacuation Time (Te)

        ## Pre Evacuation Time (Tp) Variables

        ## room types
        room_types = [self.room_attributes[i]['room_type'] for i in list(self.room_attributes.keys())]
        pre_evac_time_per_room = {i: 0.0 for i, room_type in enumerate(room_types)}
        evac_time_per_room = {i: 0.0 for i, room_type in enumerate(room_types)}
        
        ## expected time if sleeping
        pre_evac_time_sleeping = round(markov_chain.simulate_markov_chain(1, End_node),2)

        ## expected time if dressing
        pre_evac_time_dressing = round(markov_chain.simulate_markov_chain(4, End_node),2)

        ## expected time if feel concern
        pre_evac_time_concern = round(markov_chain.simulate_markov_chain(11, End_node), 2)

        ## expected time if rescue attempt
        pre_evac_time_rescue_attempt = round(markov_chain.simulate_markov_chain(14, End_node), 2)

        ## Evacuation Time (Te) Variables
        distance_to_exit = [self.room_attributes[i]['distance_to_exit'] for i in list(self.room_attributes.keys())]

        ## occupant speed (assumed at 0.7 m/s)
        ## t = d / s
        occupant_speed = 0.7 


        ## calculate pre-evacuation time per room, depending on prior task prediction, e.g if in bedroom, more likely to be sleeping / dressing

        for i, room_type in enumerate(room_types):
            ## add evacuation time
            evac_time_per_room[i] = round(distance_to_exit[i] / occupant_speed)
            ## add pre evacuation time
            if room_type in  ['LIVING_DINING', 'LIVING_ROOM']:
                pre_evac_time_per_room[i] = round((pre_evac_time_sleeping + pre_evac_time_concern) / 2, 2)
            elif room_type in ['ROOM', 'BEDROOM']:
                pre_evac_time_per_room[i] = round((pre_evac_time_sleeping + pre_evac_time_dressing) / 2, 2)
            elif room_type == 'KITCHEN':
                pre_evac_time_per_room[i] = pre_evac_time_rescue_attempt
            elif room_type == 'ENTRANCE_DOOR':
                pre_evac_time_per_room[i] = 0
            else:
                pre_evac_time_per_room[i] = round((pre_evac_time_concern + pre_evac_time_rescue_attempt) / 2, 2)

        return pre_evac_time_per_room, evac_time_per_room
    
    def assign_non_normalised_label(self) -> Dict[str, int]:
        """ Assigns raw labels to each floor plan, which are to be normalised once all labels are collected"""

        apartment_id = self.apartment_id
        graph = self.graph
        room_types = nx.get_node_attributes(graph, 'room_type')
        room_area = nx.get_node_attributes(graph, 'room-area')
        kitchen_node = [key for key in list(room_types.keys()) if room_types[key] == 'KITCHEN'][0]
        ## pre evacuation time
        room_Tp = [x for x in nx.get_node_attributes(graph, 'time_pre_evac').values() if x != 0]
        expected_Tp = Expected_preEvac_time
        Tp_diff = [round(x - expected_Tp,2) for x in room_Tp]
        Tp_average_difference = round(sum(Tp_diff) / len(Tp_diff),2)

        ## evacuation time
        room_Te = [x for x in nx.get_node_attributes(graph, 'time_evac').values() if x != 0]
        Te_average = round(sum(room_Te) / len(room_Te),2)
        if Te_average > 20:
            Te_average = 20
    

        ## centrality metrics
        room_between_cent = nx.get_node_attributes(graph, 'betweenness_centrality')
        room_close_cent = nx.get_node_attributes(graph, 'closeness_centrality')
        kitchen_betweenness_centrality = room_between_cent[kitchen_node]
        kitchen_closeness_centrality = room_close_cent[kitchen_node]
        kitchen_centrality = kitchen_closeness_centrality + kitchen_betweenness_centrality
        if kitchen_centrality > 1.25:
            kitchen_centrality = 1.25

        floor_plan_labels = {
            apartment_id: {
                'Tp_average_difference': Tp_average_difference,
                'Te_average': Te_average,
                'kitchen_centrality': kitchen_centrality
            }
        }

        return floor_plan_labels

    def save_training_image(self, width_pixels: float, height_pixels: float, dpi: int, transparent_background: bool, subfolder: str) -> None:
        plan_id = self.plan_id
        apartment_id = self.apartment_id

        os.makedirs(subfolder, exist_ok=True)
        filepath = os.path.join(subfolder, f"{apartment_id}.png")  # Include file extension and possibly more info
        
        try:
            colours = self.__give_colours()
            polygons = shapely.wkt.loads(self.plans[plan_id][apartment_id]['geometries_dict'])
        except Exception as e:
            # print(f"Error loading data: {e}")
            return
        
        fig, ax = plt.subplots(figsize=(width_pixels / dpi, height_pixels / dpi))
        
        try:
            for polygon, colour in zip(polygons, colours):
                x, y = polygon.exterior.xy
                ax.fill(x, y, color=colour)
            
            ax.set_aspect('equal', 'box')
            ax.axis('off')
            plt.savefig(filepath, dpi=dpi, transparent=transparent_background)
            plt.close(fig)  # Close the figure to free up memory
            # print(f"Image saved to {filepath}")
        except Exception as e:
            # print(f"Error saving image: {e}")
            if os.path.exists(filepath):
                os.remove(filepath)  # Remove the file if saving fails to prevent incomplete files

### Save images dataset

In [23]:
def make_plan_graphs(plans, plan_ids, apartment_ids):
    labels = {}
    
    for i, plan_id in enumerate(plan_ids):
        for apartment_id in apartment_ids[plan_id]:
            try:
                plan = Plan(plan_id, apartment_id, plans, behaviour_class, room_weights)

                label = plan.assign_non_normalised_label()
                plan.save_training_image(224,224,300,True,'raw_saved_images') ## takes around 12-14 minutes to run for all plans
                labels[apartment_id] = list(label.values())[0]
            except Exception as e:
                continue
    return labels

In [24]:
# plans_dataset = make_plan_graphs(plans, plan_ids,apartment_ids)

# print(plans_dataset)

### Save labels

In [25]:
def normalise_labels(raw_labels):
    tp_avg_diff = []
    te_avg = []
    kitchen_cent = []
    normalised_labels_dict = {}

    for apartment_id in list(raw_labels.keys()):
        tp_avg_diff.append(raw_labels[apartment_id]['Tp_average_difference'])
        te_avg.append(raw_labels[apartment_id]['Te_average'])
        kitchen_cent.append(raw_labels[apartment_id]['kitchen_centrality'])
    def normalize_list(lst):
        min_val = min(lst)
        max_val = max(lst)
        range_val = max_val - min_val
        normalized_lst = [(x - min_val) / range_val for x in lst]
        return normalized_lst
    def z_score_normalization(values):
        """
        Normalize a list of values using Z-score normalization.

        Args:
            values (list): A list of numeric values.

        Returns:
            list: Normalized values using Z-score normalization.
        """
        # Convert the values to a NumPy array for easy mathematical operations
        values_array = np.array(values)
        
        # Calculate the mean and standard deviation of the values
        mean = np.mean(values_array)
        std_dev = np.std(values_array)
        
        # Normalize the values using Z-score normalization
        normalized_values = (values_array - mean) / std_dev
        
        return normalized_values.tolist()
    
    norm_tp_avg_diff = normalize_list(z_score_normalization(tp_avg_diff))
    norm_te_avg = normalize_list(z_score_normalization(te_avg))
    norm_kitchen_cent = normalize_list(z_score_normalization(kitchen_cent))

    def average_lists(list1, list2, list3):
        combined_list = []
        for elem1, elem2, elem3 in zip(list1, list2, list3):
            average = (elem1 + elem2 + elem3) / 3
            combined_list.append(average)
        n_v =  normalize_list(combined_list)
        return n_v
    
    normalised_labels = average_lists(norm_tp_avg_diff, norm_te_avg, norm_kitchen_cent)
    for i, apartment_id in enumerate(list(raw_labels.keys())):
        normalised_labels_dict[apartment_id] = round(normalised_labels[i],1)
    return normalised_labels_dict

In [35]:
# Normalised_Labels = normalise_labels(plans_dataset)
# print(Normalised_Labels)
# Plotting the histogram
# plt.hist(Normalised_Labels, bins=10, color='blue', edgecolor='black')

# # Adding labels and title
# plt.xlabel('Value')
# plt.ylabel('Frequency')
# plt.title('Histogram of List of Numbers')

# # Displaying the plot
# plt.show()

# with open('training_plan_labels.json', 'w') as json_file:
#     json.dump(Normalised_Labels, json_file)

### Reorder images to train and test datasets


In [None]:
## load dataset labels from json to python dictionary

with open('training_plan_labels.json', 'r') as file:
    labels = json.load(file)

In [None]:
unique_labels = (set(labels.values()))
print(unique_labels)

{0.4, 0.5, 0.6, 0.2, 0.3, 0.7, 0.1, 0.0, 0.8, 0.9, 1.0}


In [None]:
label_to_foldername_mapping = {
    0.0: 'score0',
    0.1: 'score1',
    0.2: 'score2',
    0.3: 'score3',
    0.4: 'score4',
    0.5: 'score5',
    0.6: 'score6',
    0.7: 'score7',
    0.8: 'score8',
    0.9: 'score9',
    1.0: 'score10'
}

In [None]:

def split_images_dataset(source_folder = 'raw_saved_images', train_folder = 'dataset_images/train', test_folder = 'dataset_images/test', training_size_percentage = 0.7) -> None:

    """ Moves images from raw folder to split into training and testing sub sets and also assign labels from dataset_labels.json"""

    # List all images
    images = os.listdir(source_folder)
    random.shuffle(images)

    # Calculate the split
    split_index = int(training_size_percentage * len(images))

    # Split into train and test sets
    train_images = images[:split_index]
    test_images = images[split_index:]

    # Move images to train folder
    for img in train_images:
        src_path = os.path.join(source_folder, img)
        dest_path = os.path.join(train_folder, img)
        shutil.move(src_path, dest_path, copy_function = shutil.copytree)
    # Move images to test folder
    for img in test_images:
        src_path = os.path.join(source_folder, img)
        dest_path = os.path.join(test_folder, img)
        shutil.move(src_path, dest_path)

    print("Dataset split completed.")

# split_images_dataset()

Dataset split completed.


In [32]:
def order_images_dataset_to_classes(train_folder = 'dataset_images/train', test_folder = 'dataset_images/test', class_mapping = label_to_foldername_mapping):

    # create class folders if they dont exist
    for key, value in class_mapping.items():
        class_folder = value
        train_destination_folder = train_folder +  '/' + class_folder
        test_destination_folder = test_folder + '/' + class_folder
        print(train_destination_folder)
        if not os.path.exists(train_destination_folder):
            os.makedirs(train_destination_folder)
        if not os.path.exists(test_destination_folder):
            os.makedirs(test_destination_folder)

    def move_to_class_folders(source_folder, class_mapping):

        images = os.listdir(source_folder)
        for img in images:
            src_path = os.path.join(source_folder, img)
            labels_key = os.path.splitext(os.path.basename(src_path))[0]
            label = labels[labels_key]
            class_folder = class_mapping[label]

            dest_path = os.path.join(source_folder, class_folder, img)
            print(dest_path)
            shutil.move(src_path, dest_path)

    # move_to_class_folders(train_folder, class_mapping)
    # move_to_class_folders(test_folder, class_mapping)

# order_images_dataset_to_classes()

NameError: name 'label_to_foldername_mapping' is not defined