In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%%writefile '/content/drive/MyDrive/Colab Notebooks/Enhancing Urban Seismic Resilience: Integrating AI and Monte Carlo Methods for Real-Time Hazard Prediction/modules/hydraulic_simulator.py'

import numpy as np
import pandas as pd
import wntr
from scipy.stats import lognorm
import warnings
import logging

# Suppress warnings
warnings.filterwarnings('ignore')
logger = logging.getLogger('wntr.sim.hydraulics')
logger.setLevel(logging.ERROR)

class HydraulicSimulator:
    def __init__(self, epanet_file_path, scenarios_df):
        """
        Initializes the HydraulicSimulator with the water network model and earthquake scenario data.

        Parameters:
        -----------
        epanet_file_path : str
            File path to the EPANET input file.
        scenarios_df : pandas.DataFrame
            A DataFrame containing Peak Ground Acceleration (PGA) and Peak Ground Velocity (PGV) scenario data.
        """
        self.epanet_file_path = epanet_file_path
        self.scenarios_df = scenarios_df

        self.tanks_damage_state_params = {
            'Minor': (0.18, 0.5), 'Moderate': (0.55, 0.5),
            'Extensive': (1.15, 0.6), 'Complete': (1.50, 0.6)
        }
        self.pump_damage_state_params = {
            'Minor': (0.15, 0.75), 'Moderate': (0.36, 0.65),
            'Extensive': (0.77, 0.65), 'Complete': (1.5, 0.8)
        }
        self.tank_operational_levels = {
            'None': 1.0, 'Minor': 0.9, 'Moderate': 0.8,
            'Extensive': 0.6, 'Complete': 0.0
        }
        self.pump_operational_levels = {
            'None': 1.0, 'Minor': 0.9, 'Moderate': 0.8,
            'Extensive': 0.6, 'Complete': 0.0
        }

    def run_hydraulic_simulation(self, duration, leak_start_time, leak_end_time, minimum_pressure, required_pressure):
        """
        Runs hydraulic simulations for each scenario, updates the water network model accordingly,
        and computes the serviceability of each component based on the simulation results.

        Parameters:
        -----------
        duration : int
            Duration of simulation in hours.
        leak_start_time : int
            The simulation time (in hours) when leaks start.
        leak_end_time : int
            The simulation time (in hours) when leaks stop.
        minimum_pressure : float
            Minimum pressure required in the network.
        required_pressure : float
            Required pressure in the network.

        Returns:
        --------
        dict
            A dictionary containing cumulative serviceability of each component across all scenarios.
        """
        serviceability_dict = {}
        for scenario_index, scenario_group in self.scenarios_df.groupby('scenario_index'):

            wn = wntr.network.WaterNetworkModel(self.epanet_file_path)
            wn.options.hydraulic.demand_model = 'PDD'
            wn.options.time.duration = duration
            wn.options.hydraulic.minimum_pressure = minimum_pressure
            wn.options.hydraulic.required_pressure = required_pressure

            original_tank_levels = {tank_name: wn.get_node(tank_name).init_level for tank_name in wn.tank_name_list}

            operational_levels = self.determine_operational_levels(wn, scenario_group)
            pipelines_orifice_areas = operational_levels.values()

            Apply operational levels for tanks and pumps
            for tank_id, level in tanks_operational_levels.items():
                tank = wn.get_node(tank_id)
                # original_level = tank.init_level
                tank.init_level *= level

            # Apply orifice areas for pipelines
            for pipe_id, orifice_area in pipelines_orifice_areas.items():
                pipe = wn.get_link(pipe_id)
                wn = wntr.morph.split_pipe(wn, str(pipe), f"{pipe}_A", f"Leak_{pipe}")
                leak_node = wn.get_node(f"Leak_{pipe}")
                leak_node.add_leak(wn, area=orifice_area, start_time=leak_start_time, end_time=leak_end_time)

            # Run a hydraulic simulation
            sim = wntr.sim.WNTRSimulator(wn)
            results = sim.run_sim()

            # Calculate serviceability
            serviceability = {}
            for node_name in wn.node_name_list:
                if node_name in wn.tank_name_list:
                    # For tanks, serviceability is the ratio of new tank level to original tank level
                    new_tanks_level = results.node['head'].loc[:, node_name].mean() - wn.get_node(node_name).elevation
                    # serviceability[node_name] = new_tanks_level / original_tank_levels[node_name]
                    serviceability[node_name] = tanks_operational_levels[node_name]
                if node_name in wn.junction_name_list:
                    # For junctions, serviceability is the ratio of satisfied demand to required demand
                    satisfied_demand = results.node['demand'].loc[:, node_name].mean()
                    required_demand = wntr.metrics.expected_demand(wn)[node_name].mean()
                    if required_demand == 0:
                        # Skip the node if the required demand is zero
                        continue
                    serviceability[node_name] = satisfied_demand / required_demand

            for pump_name in wn.pump_name_list:
                # For pumps, serviceability is based on their operational level
                serviceability[pump_name] = pumps_operational_levels[pump_name]

            # Accumulate serviceability from this scenario
            for node, value in serviceability.items():
                if node not in serviceability_dict:
                    serviceability_dict[node] = []
                serviceability_dict[node].append(value)

        wide_serviceability_df = pd.DataFrame.from_dict(serviceability_dict).transpose().reset_index().rename(columns={'index': 'site_id'})
        serviceability_df = wide_serviceability_df.melt(id_vars=["site_id"], var_name="scenario_index", value_name="serviceability").astype({'scenario_index': int})

        return serviceability_df

    def determine_operational_levels(self, wn, scenario_group):
        """
        Determine the operational levels for tanks, pumps, and pipelines for each scenario based on damage states.

        Parameters:
        -----------
        wn : wntr.network.WaterNetworkModel
            Water network model object.
        scenario_group : pandas.DataFrame
            A subset of the full scenario DataFrame filtered for a specific scenario index.

        Returns:
        --------
        dict
            A dictionary with operational levels for tanks, pumps, and orifice areas for pipelines.
        """

        operational_levels = {}

        pipelines_orifice_areas = self.apply_damage_states(wn, scenario_group)
        operational_levels = {
            'tanks_operational_levels': tanks_operational_levels,
            'pumps_operational_levels': pumps_operational_levels,
            'pipelines_orifice_areas': pipelines_orifice_areas
        }

        return operational_levels

    def apply_damage_states(self, wn, scenario_group):
        """
        Applies damage states to tanks, pumps, and pipelines based on the intensity measures from the given scenarios.

        Parameters:
        -----------
        wn : wntr.network.WaterNetworkModel
            Water network model object.
        scenario_group : pandas.DataFrame
            A subset of the full scenario DataFrame filtered for a specific scenario index.

        Returns:
        --------
        tuple
            A tuple containing dictionaries for operational levels of tanks, pumps, and pipelines.
        """

        tanks_operational_levels = {}
        pumps_operational_levels = {}
        pipelines_orifice_areas = {}
        for index, row in scenario_group.iterrows():
            if row['site_id'] in wn.tank_name_list:
                damage_state = self.determine_damage_state(row['PGA'], self.tanks_damage_state_params)
                operational_level = self.tank_operational_levels.get(damage_state, 1.0)
                tanks_operational_levels[row['site_id']] = operational_level

            elif row['site_id'] in wn.pump_name_list:
                damage_state = self.determine_damage_state(row['PGA'], self.pump_damage_state_params)
                operational_level = self.pump_operational_levels.get(damage_state, 1.0)
                pumps_operational_levels[row['site_id']] = operational_level

            # Calculate orifice area for pipelines
            if row['site_id'] in wn.pipe_name_list:
                pipe_name = row['site_id']
                repair_rate = 0.000241 * row['PGV'] / 100
                link_length = wn.get_link(pipe_name).length * 0.3048  # Convert to meters
                num_damages = np.random.poisson(repair_rate * link_length)
                leaks, breaks = int(0.85 * num_damages), int(0.15 * num_damages)
                diameter = wn.get_link(pipe_name).diameter
                area = np.pi * (diameter / 2) ** 2
                orifice_area = min((0.03 * leaks + 0.2 * breaks) * area, area)
                pf = 1 - np.exp(-repair_rate * link_length)
                # Check if the pipeline fails
                if np.random.uniform() < pf:
                    pipelines_orifice_areas[row['site_id']] = orifice_area

        return pipelines_orifice_areas

    def determine_damage_state(self, intensity_measure, damage_state_params):
        """
        Determines the damage state for a component based on the given intensity measure and damage state parameters.

        Parameters:
        -----------
        intensity_measure : float
            The intensity measure (PGA or PGV) for the scenario.
        damage_state_params : dict
            A dictionary containing damage state parameters including medians and betas for the log-normal distribution.

        Returns:
        --------
        str
            The determined damage state (e.g., 'Minor', 'Moderate', 'Extensive', 'Complete', 'None').
        """
        probabilities = {}
        previous_probability = 0
        for state, params in damage_state_params.items():
            median, beta = params
            exceedance_probability = 1 - lognorm.cdf(intensity_measure, s=beta, scale=median)
            probabilities[state] = exceedance_probability - previous_probability
            previous_probability = exceedance_probability
        random_number = np.random.uniform(0, 1)
        cumulative_probability = 0
        for state, probability in probabilities.items():
            cumulative_probability += probability
            if random_number <= cumulative_probability:
                return state
        return 'None'


Overwriting /content/drive/MyDrive/Colab Notebooks/Enhancing Urban Seismic Resilience: Integrating AI and Monte Carlo Methods for Real-Time Hazard Prediction/modules/hydraulic_simulator.py
