<a href="https://colab.research.google.com/github/AnjaliS2021/WildfireDetection/blob/main/wildfire_detection_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the "License")

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Introduction
The purpose of this Colab is to curate a TensorFlow dataset of satellite imagery for training ML models that detect wildfires.

Steps are:
* The University of Maryland publishes the MODIS “Monthly Fire Location Product” (MCD14ML). Download the monthly files for the years of interest.
* Google Earth Engine hosts satellite imagery. Download the MODIS satellite imagery for the dates and locations of the listed fires.
* Specifically, download the MOD14A1 V6.1 imagery. It provides daily fire mask composites at 1 km resolution, derived from the MODIS 4-micrometer and 11-micrometer radiances.
* Transform the satellite imagery into a dataset of TensorFlow `tf.train.Example` records.
* Save the dataset files to Google Drive.

References:
* https://developers.google.com/earth-engine/datasets/catalog/MODIS_061_MOD14A1#description
* https://developers.google.com/earth-engine/guides/machine-learning
* https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/land-cover-classification/README.ipynb


In [None]:
#@title Installs
!pip install --quiet earthengine-api
!pip install --quiet apache-beam
!pip install --quiet apache-beam[interactive]
!pip install --quiet pysftp

In [None]:
#@title Imports
from datetime import datetime
from datetime import date as datetime_date
from google.api_core import exceptions, retry
from google.colab import auth
from numpy.lib.recfunctions import structured_to_unstructured

import apache_beam as beam
import ee
import google.auth
import gzip
import io
import numpy as np
import os
import pysftp
import requests
import tensorflow as tf
import shutil
import time

In [None]:
#@title Earth Engine Authentication
# Authenticates the Colab user and initializes the Earth Engine client that
# the Satellite / DatasetBuilder cells below use.
# Please fill in google earth engine project id.
project = "ee-wildfire-detection-ml"  # @param {type:"string"}
assert project, "Please provide a Google Cloud project ID"
# Set GOOGLE_CLOUD_PROJECT for google.auth.default().
os.environ["GOOGLE_CLOUD_PROJECT"] = project

# Interactive Colab login; provides the credentials picked up below.
auth.authenticate_user()
# Set the gcloud project for other gcloud commands.
!gcloud config set project {project}

# Application-default credentials from the login above.
credentials, _ = google.auth.default()
ee.Initialize(
    # Clears any quota project attached to the credentials; presumably so
    # quota is attributed via `project` instead — TODO confirm.
    credentials.with_quota_project(None),
    project=project,
    # High-volume endpoint, intended for many concurrent automated requests
    # such as the per-wildfire image downloads below.
    opt_url="https://earthengine-highvolume.googleapis.com",
)

Updated property [core/project].


In [None]:
#@title Google Drive Mount
# Save dataset on persistent storage to avoid deletion after colab restart.
from google.colab import drive
drive.mount('/content/drive')
# Root directory for everything this notebook writes; the Maryland and
# DatasetBuilder cells read this global.
wildfire_drive_path = '/content/drive/MyDrive/wildfire_detection'
os.makedirs(wildfire_drive_path, exist_ok=True)
# Show what earlier runs already saved (used to skip repeated downloads).
print(os.listdir(wildfire_drive_path))

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[]


# Wildfire Identification

In [None]:
#@title Univ of Maryland
class Maryland:
  """Downloads and parses the UMD MODIS Monthly Fire Location Product (MCD14ML).

  Monthly gzipped text files are fetched over SFTP into
  `<wildfire_drive_path>/maryland_files` (only files not already present) and
  parsed into `self.wildfires`: a list of dicts holding the `YYYYMMDD`, `lon`,
  `lat`, `FRP`, `conf` and `type` columns plus `filename` and `start`
  ('YYYY-MM-DD'). At most `max_wildfires` entries are kept (<= 0 keeps all).
  """

  def __init__(self):
    self.max_wildfires = 100
    self.build_paths()
    self.build_directories(cleanup=False)
    self.build_dates()
    self.build_filenames()
    # Checks google drive for existing files, only downloads files that don't
    # exist already.
    self.download_files()
    self.build_wildfires()

  def __str__(self):
    messages = [
      'University of Maryland MODIS Monthly Fire Location Product',
      f'Num dates: {len(self.dates)}',
      f'Num files: {len(self.filenames)}',
      f'Num wildfires: {len(self.wildfires)}',
      f'all files: {self.all_files}',
    ]
    return '\n'.join(messages)

  def build_paths(self):
    """Sets the remote SFTP source directory and the local destination."""
    self.base_directory = wildfire_drive_path
    self.source_directory = 'data/MODIS/C6/MCD14ML'
    self.destination_directory = os.path.join(
        self.base_directory, 'maryland_files')

  def build_directories(self, cleanup=False):
    """Creates the local download directory, optionally wiping it first."""
    if cleanup:
      shutil.rmtree(self.destination_directory, ignore_errors=True)
    os.makedirs(self.destination_directory, exist_ok=True)

  def build_dates(self):
    """Builds `self.dates`: a 'YYYYMM' string for each month of each year."""
    self.dates = []
    years = [2023, ]
    months = range(1, 13)
    for year in years:
      for month in months:
        date = datetime_date(year, month, 1)
        self.dates.append(date.strftime('%Y%m'))
    print(self.dates)

  def build_filenames(self):
    """Builds `self.filenames`: one MCD14ML file name per month in `dates`."""
    self.filenames = [f'MCD14ML.{date}.006.03.txt.gz' for date in self.dates]

  def _local_path(self, filename):
    """Returns the local path of `filename` inside the destination directory."""
    return os.path.join(self.destination_directory, filename)

  def get_pending_download_filenames(self):
    """Returns the entries of `self.filenames` not yet present locally.

    Also stores the current local directory listing in `self.all_files`.
    """
    print(f'Lookup existing files in: {self.destination_directory}')
    existing_files = os.listdir(self.destination_directory)
    print(f'Num existing files: {len(existing_files)}')
    self.all_files = existing_files
    existing = set(existing_files)
    filenames = [name for name in self.filenames if name not in existing]
    print(f'Num files to download: {len(filenames)}')
    return filenames

  def download_files(self):
    """Fetches the still-missing monthly files from the UMD SFTP server.

    Only the files returned by `get_pending_download_filenames` are requested.
    When a transfer fails, the error is printed and any local file it left
    behind is removed. Otherwise a failed month would look "already
    downloaded" on the next run and never be retried.
    """
    # Shared public account published by UMD for its fire products.
    hostname = 'fuoco.geog.umd.edu'
    username = 'fire'
    password = 'burnt'
    filenames = self.get_pending_download_filenames()
    if not filenames:
      return
    cnopts = pysftp.CnOpts()
    # NOTE(review): disables SSH host key verification (MITM risk); consider
    # pinning the server's host key via a known_hosts file instead.
    cnopts.hostkeys = None
    with pysftp.Connection(hostname, username=username,
                           password=password, cnopts=cnopts) as sftp:
      with sftp.cd(self.source_directory):
        for filename in filenames:
          local_path = self._local_path(filename)
          try:
            sftp.get(filename, localpath=local_path)
          except Exception as e:
            print(filename)
            print(e)
            # A failed get can still leave an (empty) local file behind.
            if os.path.exists(local_path):
              os.remove(local_path)
    self.all_files = os.listdir(self.destination_directory)

  def build_wildfires(self):
    """Parses the downloaded files, in month order, into `self.wildfires`.

    Files that are missing locally (e.g. months whose download failed) are
    skipped. Parsing stops once `max_wildfires` entries are collected, because
    any later entries would be truncated anyway.
    """
    self.wildfires = []
    for filename in self.filenames:
      if 0 < self.max_wildfires <= len(self.wildfires):
        break
      if not os.path.exists(self._local_path(filename)):
        print(f'Skipping missing file: {filename}')
        continue
      lines = self.read_destination_file(filename)
      self.wildfires.extend(self.build_wildfires_from_lines(filename, lines))
    if self.max_wildfires > 0:
      self.wildfires = self.wildfires[:self.max_wildfires]
    if self.wildfires:
      print(self.wildfires[0])

  def build_wildfires_from_lines(self, filename, lines):
    """Converts parsed rows (first non-empty row = header) into wildfire dicts.

    Parsing stops at the first row with fewer words than the header.
    """
    wildfires = []
    indexes = None
    header_length = 0
    for line in lines:
      if not line:  # Blank line; nothing to parse.
        continue
      if indexes is None:  # Header line.
        indexes = self.get_indexes(line)
        if not indexes:
          print('error: indexes are empty.')
          break
        header_length = len(line)
        continue
      if len(line) < header_length:
        print(f'error: expected {header_length} entries. invalid line {line}')
        break
      wildfires.append(self.get_wildfire(filename, indexes, line))
    return wildfires

  def get_indexes(self, line):
    """Maps each required column name to its position in the header row.

    Returns None (after printing an error) if any required column is missing.
    """
    names = ['YYYYMMDD', 'lon', 'lat', 'FRP', 'conf', 'type']
    indexes = {word: i for i, word in enumerate(line) if word in names}
    for name in names:
      if indexes.get(name) is None:
        print(f'error: {name} does not exist.')
        return None
    return indexes

  def get_wildfire(self, filename, indexes, line):
    """Builds one wildfire dict from a data row, converting numeric columns."""
    wildfire = {'filename': filename,}
    float_names = ['lon', 'lat', 'FRP', 'conf']
    int_names = ['type']
    for name, i in indexes.items():
      value = line[i]
      if name in float_names:
        value = float(value)
      if name in int_names:
        value = int(value)
      wildfire[name] = value
    start_date = datetime.strptime(wildfire['YYYYMMDD'], '%Y%m%d')
    wildfire['start'] = start_date.strftime('%Y-%m-%d')
    return wildfire

  def read_destination_file(self, filename):
    """Reads a downloaded gzip file as a list of rows of non-empty words.

    Each line is split on spaces and empty words are dropped, so columns
    padded with several spaces are handled.
    """
    lines = []
    with gzip.open(self._local_path(filename), 'rt', encoding='utf-8',
                   errors='strict') as file:
      for line in file:
        lines.append([word for word in line.rstrip().split(' ') if word])
    return lines


# Construction does all the work: SFTP download of missing months, then parsing.
maryland = Maryland()
print(maryland)

['202301', '202302', '202303', '202304', '202305', '202306', '202307', '202308', '202309', '202310', '202311', '202312']
Lookup existing files in: /content/drive/MyDrive/wildfire_detection/maryland_files
Num existing files: 0
Num files to download: 12




MCD14ML.202302.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202303.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202304.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202305.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202306.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202307.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202308.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202309.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202310.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202311.006.03.txt.gz
[Errno 2] No such file
MCD14ML.202312.006.03.txt.gz
[Errno 2] No such file
{'filename': 'MCD14ML.202301.006.03.txt.gz', 'YYYYMMDD': '20230101', 'lat': -13.3373, 'lon': -44.1169, 'FRP': 7.0, 'conf': 54.0, 'type': 0, 'start': '2023-01-01'}
University of Maryland MODIS Monthly Fire Location Product
Num dates: 12
Num files: 12
Num wildfires: 100
all files: ['MCD14ML.202301.006.03.txt.gz', 'MCD14ML.202302.006.03.txt.gz', 'MCD14ML.202303.006.03.txt.gz', 'MCD14ML.202304.006.03.txt.gz', 'MCD14ML.202305.006

In [None]:
#@title 🛰 Earth Engine Satellite
class Satellite:
  """Builds and downloads MODIS MOD14A1 Earth Engine images per wildfire.

  Each wildfire dict taken from the global `maryland` is annotated in place
  with `name`, `date_range` (its start day) and `image`. The image is a
  per-pixel max composite over that day, clipped to a square area around the
  fire.
  """

  # Per-attempt HTTP timeout (seconds) for image downloads, so a hung request
  # fails into the retry loop instead of blocking the pipeline indefinitely.
  _REQUEST_TIMEOUT_SEC = 120

  def __init__(self):
    self.maryland = maryland
    self.name = 'MODIS/061/MOD14A1'
    self.bands = ['MaxFRP', 'FireMask', 'QA']
    # NOTE(review): not read anywhere in the visible notebook.
    self.band_max_values = [6000, 9, 7]
    # FireMask values treated as fire, by detection confidence.
    self.low_confidence_fire_mask = 7
    self.nominal_confidence_fire_mask = 8
    self.high_confidence_fire_mask = 9
    self.scale = 1000  # resolution in meters.
    # NOTE: pixel_size=16, bands=3 means (16, 16, 3) image dimension.
    self.pixel_size = 16
    self.build_wildfires()
    self.build_images()

  def __str__(self):
    messages = [
      f'Satellite: {self.name}',
      f'Bands: {self.bands}',
      f'Wildfires: {len(self.wildfires)}',
    ]
    return '\n'.join(messages)

  def build_wildfires(self):
    """Adds `name` and a one-day `date_range` to each Maryland wildfire dict."""
    self.wildfires = self.maryland.wildfires
    for wildfire in self.wildfires:
      wildfire['name'] = wildfire['filename']
      start_ee_date = ee.Date(wildfire['start'])
      end_ee_date = start_ee_date.advance(1, 'day')
      wildfire['date_range'] = ee.DateRange(start_ee_date, end_ee_date)

  def build_images(self):
    """Attaches an Earth Engine `image` to every wildfire."""
    for wildfire in self.wildfires:
      self.build_image(wildfire)

  def build_image(self, wildfire):
    """Stores in `wildfire['image']` the clipped max composite for its day."""
    area_of_interest = self.get_area_of_interest(wildfire)
    image_collection = (
        ee.ImageCollection(self.name)
        .filterDate(wildfire['date_range'])
        .filterBounds(area_of_interest)
        .select(self.bands)
    )
    image = image_collection.max().clip(area_of_interest)
    wildfire['image'] = image

  def get_area_of_interest(self, wildfire):
    """Returns the bounding rectangle of a circle centered on the fire.

    The circle's diameter is `pixel_size` pixels at `scale` meters per pixel,
    so the rectangle spans roughly pixel_size x pixel_size satellite pixels.
    """
    lonlat = self.get_location(wildfire)
    point = ee.Geometry.Point(lonlat)
    radius_meters = self.scale * self.pixel_size / 2
    circular_area = point.buffer(radius_meters, 1)  # maxError: 1 meter.
    rectangular_area = circular_area.bounds(1)
    return rectangular_area

  def get_location(self, wildfire):
    """Returns the wildfire location as a (lon, lat) tuple."""
    return wildfire['lon'], wildfire['lat']

  def get_band_index(self, name):
    """Returns the index of band `name` in `self.bands`, or None if absent."""
    return self.bands.index(name) if name in self.bands else None

  def download_wildfire_with_retry(self, wildfire, retries=3, delay=2):
    """Downloads the wildfire's image, making up to `retries` attempts.

    Args:
      wildfire: wildfire dict annotated by `build_wildfires`/`build_images`.
      retries: maximum number of attempts.
      delay: seconds to sleep between attempts (never after the last one).

    Returns:
      The unstructured numpy image array, or None if every attempt raised or
      returned no data.
    """
    for attempt in range(1, retries + 1):
      try:
        image_np = self.download_wildfire(wildfire)
      except Exception as e:  # Network / Earth Engine errors are retried.
        print(f'Attempt {attempt}/{retries} failed for '
              f'{self.debug_wildfire(wildfire)}: {e}')
      else:
        if image_np is not None:
          return image_np
      if attempt < retries:
        print(f'Retry {self.debug_wildfire(wildfire)}. sleep {delay} sec.')
        time.sleep(delay)
    return None

  def download_wildfire(self, wildfire):
    """Downloads the wildfire's image and converts it to a plain ndarray.

    Returns None when the download returned no data.
    """
    image = wildfire['image']
    area_of_interest = self.get_area_of_interest(wildfire)
    image_np = self.download_image(image, area_of_interest)
    if image_np is not None:
      # Structured array (one field per band) -> trailing band axis.
      image_np = structured_to_unstructured(image_np)
    return image_np

  def download_image(self, image, area_of_interest):
    """Fetches `image` over `area_of_interest` as an NPY array.

    Raises:
      exceptions.TooManyRequests: on HTTP 429, so the caller's retry loop
        backs off.

    Returns:
      The loaded array on success, None for any other non-OK response.
    """
    # image dimension: (pixel_size, pixel_size, num_bands).
    num_pixels = self.pixel_size
    params = {
      'bands': self.bands,
      "region": area_of_interest,
      "dimensions": [num_pixels, num_pixels],
      "format": "NPY",
    }
    url = image.getDownloadURL(params)

    response = requests.get(url, timeout=self._REQUEST_TIMEOUT_SEC)
    if response.status_code == 429:
      raise exceptions.TooManyRequests(response.text)
    if response.ok:
      # NOTE(review): allow_pickle=True lets np.load unpickle bytes received
      # from the network. Numeric structured NPY payloads presumably load with
      # allow_pickle=False; consider tightening.
      return np.load(io.BytesIO(response.content), allow_pickle=True)
    return None

  def debug_wildfire(self, wildfire):
    """Returns a short dict (lon, lat, start, name) for log messages."""
    short = {}
    for key in ['lon', 'lat', 'start', 'name']:
      short[key] = wildfire.get(key, 'n/a')
    return short


# Builds Earth Engine image definitions for every wildfire; no pixels are
# downloaded here (that happens in the DatasetBuilder pipelines).
satellite = Satellite()
print(satellite)

Satellite: MODIS/061/MOD14A1
Bands: ['MaxFRP', 'FireMask', 'QA']
Wildfires: 100


# 🗄 Dataset Download



In [None]:
#@title 📑 Build Dataset
class DatasetBuilder():
  """Writes train/test TFRecord datasets of MODIS fire imagery to Drive.

  Wildfires from the global `satellite` are split 4:1 into train/test. Each
  split is chunked into groups of 10, and every group is written by its own
  Beam pipeline to `<split>/<group_id>/part*.tfrecord.gz`. A Colab timeout
  therefore only loses the group in progress, and finished groups are skipped
  on later runs.
  """

  # File suffix of the shards written by `download_group_wildfires`.
  _SHARD_SUFFIX = '.tfrecord.gz'

  def __init__(self):
    self.satellite = satellite
    self.build_wildfires()
    # Divide wildfires into small groups to avoid colab timeout during download.
    self.build_group_wildfires()
    self.build_paths()
    self.create_directories(cleanup=False)
    # Groups that already have finished shards on Drive are pruned from the
    # download list.
    self.build_pending_group_downloads()

  def __str__(self):
    messages = [
      f'DatasetBuilder num Wildfires: {len(self.wildfires)}',
      f'Num train wildfires: {len(self.train_wildfires)}',
      f'Num test wildfires: {len(self.test_wildfires)}',
      f'Num group train wildfires: {len(self.group_train_wildfires)}',
      f'Num group test wildfires: {len(self.group_test_wildfires)}',
      f'Train dataset path: {self.train_path}',
      f'Test dataset path: {self.test_path}',
    ]
    return '\n'.join(messages)

  def build_wildfires(self):
    """Takes the satellite's wildfires and splits them into train/test."""
    self.wildfires = self.satellite.wildfires
    self.split_wildfires()

  def split_wildfires(self):
    """Deterministic 4:1 split: every 5th wildfire (index % 5 == 4) is test."""
    self.train_wildfires = []
    self.test_wildfires = []
    train_test_max_mods = 5
    train_max_mods = 4  # total:train:test = 5:4:1.
    for i, wildfire in enumerate(self.wildfires):
      if i % train_test_max_mods < train_max_mods:
        self.train_wildfires.append(wildfire)
      else:
        self.test_wildfires.append(wildfire)
    # The test set should not be empty, add at least one entry.
    # NOTE(review): this duplicates a train example into test (leakage).
    if not self.test_wildfires and self.train_wildfires:
      self.test_wildfires.append(self.train_wildfires[0])

  def build_group_wildfires(self):
    """Chunks the train and test splits into download groups."""
    # small groups so that beam could download files without timeout.
    self.group_train_wildfires = self.group_wildfires(self.train_wildfires)
    self.group_test_wildfires = self.group_wildfires(self.test_wildfires)
    print(f'groups: {self.group_train_wildfires.keys()}')

  def group_wildfires(self, wildfires):
    """Returns {'00000': first 10 wildfires, '00001': next 10, ...}."""
    group_size = 10
    groups = {}
    for i, wildfire in enumerate(wildfires):
      group_dir = f'{i // group_size:05}'
      groups.setdefault(group_dir, []).append(wildfire)
    return groups

  def build_paths(self):
    """Sets the dataset root and its train/test subdirectory paths."""
    self.base_directory = wildfire_drive_path
    self.dataset_directory = 'dataset'
    self.dataset_path = os.path.join(
      self.base_directory, self.dataset_directory)
    self.train_path = os.path.join(self.dataset_path, 'train')
    self.test_path = os.path.join(self.dataset_path, 'test')

  def build_pending_group_downloads(self):
    """Recomputes which train/test groups still need to be downloaded."""
    self.group_train_downloads = self.group_downloads(
        self.train_path, self.group_train_wildfires)
    self.group_test_downloads = self.group_downloads(
        self.test_path, self.group_test_wildfires)

  def group_downloads(self, dataset_path, group_wildfires):
    """Returns the ids in `group_wildfires` that are not yet downloaded.

    A group counts as downloaded only if its directory under `dataset_path`
    holds at least one `*.tfrecord.gz` shard. A directory without one (e.g.
    left behind by an interrupted pipeline) is downloaded again instead of
    being silently skipped.
    """
    print(f'Lookup existing groups in: {dataset_path}')
    existing_groups = {
        name for name in os.listdir(dataset_path)
        if self._has_finished_shard(os.path.join(dataset_path, name))
    }
    print(f'Num existing groups: {len(existing_groups)}')
    downloads = [group_id for group_id in group_wildfires
                 if group_id not in existing_groups]
    print(f'Num groups to download: {len(downloads)}')
    return downloads

  def _has_finished_shard(self, group_path):
    """True if `group_path` is a directory directly containing a shard file."""
    if not os.path.isdir(group_path):
      return False
    return any(name.endswith(self._SHARD_SUFFIX)
               for name in os.listdir(group_path))

  def create_directories(self, cleanup=False):
    """Creates the dataset directories, optionally wiping train/test first."""
    if cleanup:
      shutil.rmtree(self.train_path, ignore_errors=True)
      shutil.rmtree(self.test_path, ignore_errors=True)
    os.makedirs(self.dataset_path, exist_ok=True)
    os.makedirs(self.train_path, exist_ok=True)
    os.makedirs(self.test_path, exist_ok=True)

  def download_wildfires(self):
    """Runs one Beam pipeline per pending train group, then per test group."""
    self.build_pending_group_downloads()
    for group_id in self.group_train_downloads:
      wildfires = self.group_train_wildfires[group_id]
      self.download_group_wildfires(self.train_path, group_id, wildfires)
    for group_id in self.group_test_downloads:
      wildfires = self.group_test_wildfires[group_id]
      self.download_group_wildfires(self.test_path, group_id, wildfires)

  def download_group_wildfires(self, dataset_path, group_id, wildfires):
    """Downloads, serializes and writes one group as gzipped TFRecord shards."""
    download_path = os.path.join(dataset_path, group_id, 'part')
    print(f'Download {download_path}')
    with beam.Pipeline() as pipeline:
      (
        pipeline
        | 'List wildfires' >> beam.Create(wildfires)
        | 'Serialize examples' >> beam.FlatMap(
              self.flatmap_serialized_tf_example)
        | 'Write TFRecords' >> beam.io.WriteToTFRecord(
              download_path, file_name_suffix=self._SHARD_SUFFIX)
      )

  def flatmap_serialized_tf_example(self, wildfire):
    """FlatMap adapter: [example bytes], or [] to drop a failed download."""
    example_bytes = self.get_serialized_tf_example(wildfire)
    if example_bytes is None:
      return []
    return [example_bytes]

  def get_serialized_tf_example(self, wildfire):
    """Returns the serialized tf.train.Example for a wildfire, or None."""
    input_np, label_np = self.get_input_and_label(wildfire)
    if input_np is None:
      return None
    return self.serialize(wildfire, input_np, label_np)

  def get_input_and_label(self, wildfire):
    """Downloads the image and splits it into (input, label); (None, None) on failure."""
    image_np = self.satellite.download_wildfire_with_retry(wildfire)
    if image_np is None:
      return None, None
    # MaxFRP and QA bands are part of the input.
    input_np = self.get_input(image_np)
    # FireMask band is the label.
    label_np = self.get_label(image_np)
    return input_np, label_np

  def get_input(self, image_np):
    """Returns a copy of the image with the FireMask channel zeroed out."""
    input_np = image_np.copy()
    fire_mask_index = self.satellite.get_band_index('FireMask')
    input_np[:, :, fire_mask_index] = 0
    return input_np

  def get_label(self, image_np):
    """Returns the FireMask channel as an int (H, W, 1) array of fire classes."""
    # (16, 16, 3) shape pixel image will return (16, 16, 1) shape label.
    shape = image_np.shape[:-1] + (1,)
    label_np = np.zeros(shape, int)
    fire_mask_index = self.satellite.get_band_index('FireMask')
    label_np[:, :, 0] = image_np[:, :, fire_mask_index]
    return self.clear_non_fire_labels(label_np)

  def clear_non_fire_labels(self, label_np):
    """Keeps values in the fire-mask range [low, high] confidence; zeroes others."""
    min_fire_label = self.satellite.low_confidence_fire_mask
    max_fire_label = self.satellite.high_confidence_fire_mask
    is_fire = (label_np >= min_fire_label) & (label_np <= max_fire_label)
    return np.where(is_fire, label_np, 0)

  def serialize(self, wildfire, input_np, label_np):
    """Packs input/label tensors into a serialized tf.train.Example.

    `wildfire` is currently unused.
    """
    input_bl = tf.train.BytesList(
        value=[tf.io.serialize_tensor(input_np).numpy()])
    label_bl = tf.train.BytesList(
        value=[tf.io.serialize_tensor(label_np).numpy()])
    features = {
      'inputs':  tf.train.Feature(bytes_list=input_bl),
      'labels':  tf.train.Feature(bytes_list=label_bl),
    }
    example = tf.train.Example(features=tf.train.Features(feature=features))
    return example.SerializeToString()


# Only splits, groups and plans the downloads; the Beam pipelines are run by
# `download_wildfires()` in the next cell.
dataset_builder = DatasetBuilder()
print(dataset_builder)

groups: dict_keys(['00000', '00001', '00002', '00003', '00004', '00005', '00006', '00007'])
Lookup existing groups in: /content/drive/MyDrive/wildfire_detection/dataset/train
Num existing groups: 0
Num groups to download: 8
Lookup existing groups in: /content/drive/MyDrive/wildfire_detection/dataset/test
Num existing groups: 0
Num groups to download: 2
DatasetBuilder num Wildfires: 100
Num train wildfires: 80
Num test wildfires: 20
Num group train wildfires: 8
Num group test wildfires: 2
Train dataset path: /content/drive/MyDrive/wildfire_detection/dataset/train
Test dataset path: /content/drive/MyDrive/wildfire_detection/dataset/test


In [None]:
#@title 🗃 Beam up
# Re-checks Drive for finished groups, then writes each pending group's TFRecords.
# Safe to re-run after a Colab timeout: finished groups are skipped.
dataset_builder.download_wildfires()

Lookup existing groups in: /content/drive/MyDrive/wildfire_detection/dataset/train
Num existing groups: 8
Num groups to download: 0
Lookup existing groups in: /content/drive/MyDrive/wildfire_detection/dataset/test
Num existing groups: 2
Num groups to download: 0
