<a href="https://colab.research.google.com/github/37stu37/FFE/blob/master/FFE_pure_Dask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%time 
%%capture
!apt update
!apt upgrade
!apt install gdal-bin python-gdal python3-gdal 
# Install rtree - Geopandas requirment
!apt install python3-rtree 
# Install Geopandas
!pip install git+git://github.com/geopandas/geopandas.git
# Install descartes - Geopandas requirment
!pip install descartes 
!pip install memory_profiler

CPU times: user 276 ms, sys: 65.8 ms, total: 341 ms
Wall time: 25.8 s


In [2]:
# Load the Drive helper and mount
from google.colab import drive
%tensorflow_version 2.x
# This will prompt for authorization.
drive.mount('/content/drive')

TensorFlow 2.x selected.
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
%%time
import datetime
import glob
import shutil
from math import sqrt
import os
import matplotlib.pyplot as plt
import bokeh
import numpy as np
import pandas as pd
import geopandas as gpd
from scipy.spatial import distance
from shapely.geometry import box
from shapely.geometry import shape
from shapely.geometry import Point
import networkx as nx
from sys import getsizeof
from numba import jit
import dask.dataframe as dd
import dask.array as da
import dask
from dask.distributed import Client
from dask.diagnostics import ProgressBar
%matplotlib inline
%load_ext memory_profiler

pd.options.mode.chained_assignment = None  # default='warn'

CPU times: user 899 ms, sys: 91.8 ms, total: 990 ms
Wall time: 1.01 s


In [4]:
client = Client(processes=False)
client

Failed to start diagnostics server on port 8787. [Errno 99] Cannot assign requested address
Could not launch service 'bokeh' on port 8787. Got the following message:

[Errno 99] Cannot assign requested address
  self.scheduler.start(scheduler_address)


0,1
Client  Scheduler: inproc://172.28.0.2/5596/1,Cluster  Workers: 1  Cores: 2  Memory: 13.66 GB


Set up the path  to data and output

In [5]:
path = '/content/drive/My Drive/05_Sync/FFE/FireNetwork/00_input'
path_output = '/content/drive/My Drive/05_Sync/FFE/FireNetwork/00_output'

!ls "/content/drive/My Drive/05_Sync/FFE/FireNetwork/00_input"
!ls '/content/drive/My Drive/05_Sync/FFE/FireNetwork/00_output/dask_edge_list'
!ls '/content/drive/My Drive/05_Sync/FFE/FireNetwork/00_output'

buildings_raw.cpg      buildings_raw_pts.mshp  buildings_raw.shp
buildings_raw.dbf      buildings_raw_pts.prj   buildings_raw.shx
buildings_raw.prj      buildings_raw_pts.shp   GD_wind.csv
buildings_raw_pts.cpg  buildings_raw_pts.shx   source_target.csv
buildings_raw_pts.dbf  buildings_raw.qpj
edge_data.parquet
dask_edge_list


In [0]:
def clean_up_file(path, prefix):
    files = glob.glob(os.path.join(path, prefix))
    for file in files:
        print(file)
        os.remove(file)
        # shutil.rmtree(file)

Create the functions to be used by the algorithm

In [0]:
def wind_scenario(file_name):
    # wind scenario conditions
    wind_data = pd.read_csv(os.path.join(path, file_name))
    i = np.random.randint(0, wind_data.shape[0])
    w = wind_data.iloc[i, 2]
    d = wind_data.iloc[i, 1]
    b = wind_data.iloc[i, 3]
    # wind direction
    wind_bearing_max = b + 45
    wind_bearing_min = b - 45
    if b == 360:
        wind_bearing_max = 45
    if b <= 0:  # should not be necessary
        wind_bearing_min = 0
    if b == 999:
        wind_bearing_max = 999
        wind_bearing_min = 0
    
    return wind_bearing_max, wind_bearing_min, d

In [0]:
@dask.delayed
def ignitions(edges, scenario):
    # add random column
    edges['rng'] = np.random.uniform(0, 1, size=len(edges))
    # to dask dataframe
    edges = dd.from_pandas(edges, npartitions=1)
    # filter on random column
    fires = edges[edges.rng < edges.IgnProb_bl]

    fires['step'] = 0
    fires['scenario'] = scenario

    return fires

In [0]:
@dask.delayed
def add_cols(fires,wind_bearing_max,wind_bearing_min,wind_distance,scenario,step):
    fires = fires.compute()
    # add wind attributes
    fires['wind_bearing_max'] = wind_bearing_max
    fires['wind_bearing_min'] = wind_bearing_min
    fires['wind_distance'] = wind_distance
    # add step and scenario
    fires['step'] = step
    fires['scenario'] = scenario
    return fires

@dask.delayed
def filter_on_wind_distance(fires):
    new_fires = fires[fires.distance < fires.wind_distance]
    return new_fires

@dask.delayed
def filter_on_wind_direction(fires):
    new_fires = fires[(fires.bearing < fires.wind_bearing_max) & 
                          (fires.bearing < fires.wind_bearing_min)]
    return new_fires

@dask.delayed
def concatenate_step_data(fires, burn):
    data = [burn, fires]
    BURNED = dd.concat(data, axis=0)
    BURNED.to_parquet(os.path.join(path_output, 
                                   'BURNED_scenario_{}_step_{}.parquet'.format(scenario, step)), engine='pyarrow')
    return BURNED

def conditions_spread(fires,burn,wind_bearing_max,wind_bearing_min,wind_distance,
                      scenario, step):
    # add columns to ddf
    fires = add_cols(fires,wind_bearing_max,wind_bearing_min,wind_distance,
                     scenario,step)
    # wind speed -> neighbors selection from wind buffer
    new_fires = filter_on_wind_distance(fires)
    # wind direction
    new_fires = filter_on_wind_direction(new_fires)
    # should not be already burnt
    burn_list = list(set(burn.source.compute()))
    new_fires = new_fires[~new_fires.target.isin(burn_list)]
    # # add step and scenario
    # new_fires = new_fires.compute()
    # print("type new fires : {}".format(type(new_fires)))
    # new_fires['step'] = step
    # new_fires['scenario'] = scenario

    # log burnt assets
    BURNED = concatenate_step_data(fires, burn)

    return new_fires, BURNED

In [0]:
@dask.delayed
def new_fires(fires, edges):
  fires_list = list(set(fires.target.compute().values))
  new_fires = edges[edges.source.isin(fires_list)]
  new_fires = dd.from_pandas(new_fires, npartitions=1)
  return new_fires

In [0]:
# log burned assets
def record_burnt_assets(list_of_burnt_assets, scenario):
  burn_df = pd.concat(list_of_burnt_assets, sort=True)
  # burn_ddf = dd.from_pandas(burn_df, npartitions=4)
  #export to parquet
  burn_df.to_parquet(os.path.join(path_output, 
                                   'output_scenario_{}.parquet'.format(scenario)), engine='pyarrow')
  return

In [0]:
@dask.delayed
def concatenate_outputs(prefix):
  files = glob.glob(os.path.join(path_output, prefix))
  for i, file in enumerate(files):
      df = dd.read_parquet(files)
  return df

In [0]:
def display_network(edge_list_dataframe):
    graph = nx.from_pandas_edgelist(edge_list_dataframe, edge_attr=True)
    options = {'node_color': 'red', 'node_size': 50, 'width': 1, 'alpha': 0.4,
               'with_labels': False, 'font_weight': 'bold'}
    nx.draw_kamada_kawai(graph, **options)
    plt.show()
    return graph

In [15]:
%%time
%memit
@dask.delayed
def load_data(file_name):
    EDGES = pd.read_parquet(os.path.join(path_output, 'dask_edge_list', file_name), engine='pyarrow')
    print("number of edges : {}".format(len(EDGES.index)))
    # G = display_network(EDGES)
    return EDGES

peak memory: 288.66 MiB, increment: 0.00 MiB
CPU times: user 135 ms, sys: 54.4 ms, total: 189 ms
Wall time: 339 ms


In [0]:
# run main
def main(number_of_scenarios, edges):
  # --- SCENARIOS
  print("number of scenarios : {}".format(number_of_scenarios))
  for scenario in range(number_of_scenarios):
      # list_BURNED = []
      BURNED = pd.DataFrame(columns=['source','target','distance', 'bearing',
                                      'IgnProb_bl', 'scenario', 'step'])
      BURNED = dd.from_pandas(BURNED, npartitions=1) # npartitions=os.cpu_count())
      print("--- SCENARIO : {}".format(scenario))
      print("type EDGES : {}".format(type(EDGES)))
      FIRES = ignitions(EDGES, scenario)
      print("type FIRES : {}".format(type(FIRES)))
      if len(FIRES.index.compute()) == 0:
          print("no fire")
          continue
      wind_bearing_max, wind_bearing_min, wind_distance = wind_scenario('GD_wind.csv') # no filtering, just adding wind info to dataframe
      # --------- STEPS
      for step in range(len(EDGES.index.compute())):
          print("--------- STEP : {}".format(step))
          FIRES, BURNED = conditions_spread(FIRES, BURNED, wind_bearing_max, wind_bearing_min, wind_distance,scenario, step) # filtering
          # list_BURNED.append(BURNED)
          print("type FIRES : {}".format(type(FIRES)))
          if len(FIRES.index.compute()) == 0:
            print("no more fire")
            break
          FIRES = new_fires(FIRES, EDGES)
          print("type FIRES : {}".format(type(FIRES)))

      # record scenario burnt assets
      # record_burnt_assets(list_BURNED, scenario)

  return

In [14]:
clean_up_file(path_output, "output*")
!ls '/content/drive/My Drive/05_Sync/FFE/FireNetwork/00_output'

dask_edge_list


In [29]:
%%time
%memit
# run main
EDGES = load_data('edge_data.parquet')
main(1, EDGES)

peak memory: 978.81 MiB, increment: 0.21 MiB
number of scenarios : 1
--- SCENARIO : 0
type EDGES : <class 'dask.delayed.Delayed'>
type FIRES : <class 'dask.delayed.Delayed'>
number of edges : 3457222
number of edges : 3457222
--------- STEP : 0
type FIRES : <class 'dask.delayed.Delayed'>
number of edges : 3457222
type FIRES : <class 'dask.delayed.Delayed'>
--------- STEP : 1
number of edges : 3457222


Function:  concatenate_step_data
args:      (         source  target  ...  wind_bearing_min  wind_distance
23            1    1266  ...               315             14
1136         27    5558  ...               315             14
2142         52    9317  ...               315             14
2144         52    9319  ...               315             14
9221        213   29032  ...               315             14
...         ...     ...  ...               ...            ...
3430425   74760   73711  ...               315             14
3431495   74787   74103  ...               315             14
3444138   75027   74245  ...               315             14
3446855   75073   75136  ...               315             14
3449207   75115   75130  ...               315             14

[1423 rows x 11 columns], Empty DataFrame
Columns: [source, target, distance, bearing, IgnProb_bl, scenario, step]
Index: [])
kwargs:    {}
Exception: TypeError("'>' not supported between instances of 'int' and

TypeError: ignored

In [0]:
%%time
%memit
df = concatenate_outputs(path_output, "scenario*")

In [0]:
remove parquet_file
clean_up_file(path_output, "output*")
!ls '/content/drive/My Drive/05_Sync/FFE/FireNetwork/00_output'

In [0]:
# %%time
# %memit
# df.to_parquet(os.path.join(path_output,'final.parquet'), engine='pyarrow')

In [0]:
# %%time
# # check convergence
# from itertools import groupby
# number_of_burns_per_scenarios = [len(list(group)) for key, group in groupby(scos)]
# cumulative_number_of_burns_per_scenarios = list(np.cumsum(number_of_burns_per_scenarios))
# scenario_list = list(set(scos))
# scenario_list = [x+1 for x in scenario_list]

# average_burn_per_scenario = [c / s for c,s in zip(cumulative_number_of_burns_per_scenarios, scenario_list)]

In [0]:
# plt.plot(scenario_list, average_burn_per_scenario)