In [2]:
import mesa

import numpy as np
from PIL import Image
import rasterio
import math
import tqdm

In [3]:
# Read the tif file with DEM data
dem_file = "data/UK_DEM.tif"
dem_image = Image.open(dem_file)

# store the dem data into array
dem_array = np.array(dem_image)

In [4]:
# Obtain geographic resolution information
with rasterio.open(dem_file) as src:
    x_resolution = src.transform[0]
    y_resolution = src.transform[4]
    # The average of the x and y directions is taken as the geographic resolution of the pixel
    pixel_resolution = (abs(x_resolution) + abs(y_resolution)) / 2.0
    print("Each pixel of the DEM equals [" + str(pixel_resolution) + "m] in the real world")

Each pixel of the DEM equals [10.0m] in the real world


In [5]:
# Variable definition
T = 25    # Average temperature
h = 0.6   # Relative humidity
dt = 0.001    # Time cost in each step

# Variable will change when model runs
s = pixel_resolution    # Length of the patch side in real world
h_i = 0   # Elevation of patch i
h_i1 = 0  # Elevation of patch i+1

In [6]:
# Constant definition
a = 0.03
c = 0.05
D = -0.3

In [7]:
# Definition of spread rate
R_0 = a*T + c*h - D   # Flat, windless, homogeneous burning speed

# Constant definition 
K_s = 1  # Combustibles coefficient, which indicates the effect of combustibles on the progress of combustion.
K_w = 1  # The wind direction coefficient, which indicates the effect of windiness and wind direction on the progress of combustion. In this paper the effect of wind is not considered.



K_phi = 1 # Slope factor, which indicates the effect of elevation difference on the burning progress.
K_phi_4 = math.exp(3.533 * pow( (abs(h_i-h_i1) / s ), 1.2))                      # Slope factor, calculation formula in the four-neighborhood domain
K_phi_8 = math.exp(3.533 * pow( (abs(h_i-h_i1) / (math.sqrt(2)*s)), 1.2))        # Slope factor, calculation formula in the eight-neighborhood diagonal

# Definition of integrated propagation speed
R = R_0 * K_s * K_w * K_phi
R_4 = R_0 * K_s * K_w * K_phi_4   # spread in the four-neighborhood domain
R_8 = R_0 * K_s * K_w * K_phi_8   # spread in the eight-neighborhood diagonal


In [8]:
class PatchAgent(mesa.Agent):
    def __init__(self, pos, unique_id, model):
        super().__init__(unique_id, model)
        self.pos = pos
        self.grid = model
        self.elevation = None
        self.state = 0

    def step(self):
        if self.check_affected():
            print("I'm affacted at " + str(self.pos))
            try:
                self.update_state()
                print("My new state is " + str(self.state))
            except:
                return
        return
    
    # Definition of an agent's behavior
    # Check if it has been affected by its neighbour 
    def check_affected(self):
        flag = False
        for each in self.grid.get_neighbors(self.pos, moore=True):
            if each.state>0:
                flag = True
        return flag
    
    # Update the state value if affected
    def update_state(self):
        if self.check_affected() and not(self.check_boundary()):
            self.state = self.state + self.get_R_summary(4) + self.get_R_summary(8)  
            # print the log
            print("Update at " + str(self.pos))
        return
    
    # Check if it is boundary
    def check_boundary(self):
        flag = False
        if self.pos[0] in (0, 1885, 1934) or self.pos[1] in (0, 1885, 1934):
            flag = True
        
        return flag
    
    # get the single R, self as point1
    def get_R(self, x2, y2, mode = 4):
        elevation_1 = self.elevation
        if mode == 4:
            elevation_2 = self.grid[x2][y2].elevation
            R_4_value = R_0 * K_s * K_w * math.exp(3.533 * pow( (abs(elevation_1-elevation_2) / s ), 1.2))
            return R_4_value
        elif mode == 8:
            elevation_2 = self.grid[x2][y2].elevation
            R_8_value = R_0 * K_s * K_w * math.exp(3.533 * pow( (abs(elevation_1-elevation_2) / (math.sqrt(2)*s)), 1.2))
            return R_8_value
        else:
            print("get_R: Worng mode! please check the input")


    # get the result of sum R, self as point1
    def get_R_summary(self, mode = 4):
        x = self.pos[0]
        y = self.pos[1]

        if mode == 4:
            R_4_summary_value = (self.get_R(x+1, y, 4) + \
                                 self.get_R(x-1, y, 4) + \
                                 self.get_R(x, y+1, 4) + \
                                 self.get_R(x, y-1, 4))*dt/s
            return R_4_summary_value
        elif mode == 8:
            R_8_summary_value = (math.pow(self.get_R(x+1, y+1, 8), 2) + \
                                 math.pow(self.get_R(x+1, y-1, 8), 2) + \
                                 math.pow(self.get_R(x-1, y+1, 8), 2) + \
                                 math.pow(self.get_R(x-1, y-1, 8), 2)) * dt \
                                 / (2*math.pow(s,2))
            return R_8_summary_value
        else:
            print("get_R_summary: Worng mode! please check the input")


In [9]:
class WorldModel(mesa.space.SingleGrid):
    def __init__(self, dem_arry):
        self.schedule = mesa.time.SimultaneousActivation(self)
        self.height = dem_arry.shape[0]
        self.width = dem_arry.shape[1]
        super().__init__(self.width, self.height, False)

        self.ini_id = 0

        for x in range(self.width):
            for y in range(self.height):
                elevation = dem_arry[y ,x]
                patchAgent = PatchAgent(pos= (x,y),unique_id= self.ini_id, model= self)
                patchAgent.elevation = elevation
                self.place_agent(patchAgent, (x,y))
                self.schedule.add(patchAgent)
                self.ini_id = self.ini_id + 1
    # each step what will happen?
    def step(self):
        self.schedule.step()


    def get_agent_counts(self):
        print(self.schedule.get_agent_count())
        # 3649410

    # set a fire at XY
    def set_fire(self, x, y):
        fire_patch = self[x][y]
        fire_patch.state = 1
        print("Well done, you set up a fire at " + str(fire_patch.pos))



In [10]:
# 建立测试模型， 花费时间16s
test = WorldModel(dem_array)

In [12]:
test.set_fire(1000,1000)

Well done, you set up a fire at (1000, 1000)


In [13]:
for i in range(2):
    print("Step: " + str(i))
    test.step()

Step: 0


In [None]:
# test.step()