In [None]:
import enum
import math
import os
import random
from typing import List, Text, Dict
from google.colab import auth
from google.api_core import retry
from IPython.display import Image
from matplotlib import pyplot as plt
from numpy.lib import recfunctions as rfn
import ee
import google
import io
import numpy as np
import requests
import tensorflow as tf
import ee
from datetime import datetime, timedelta

import geemap
random.seed(123)

In [None]:
# Authenticate this runtime with Earth Engine, then initialise the client
# library against the Cloud project named below.
ee.Authenticate()
ee.Initialize(project='wildfire-lab')

In [None]:
import pandas as pd

# Parse the date columns by name. Positional `parse_dates=[1, 2]` counts the
# columns of the raw file (index column included), so together with
# `index_col=0` it targets `fire_id` and `start_date` and leaves `end_date`
# unparsed.
fires = pd.read_csv(
    "request_data.csv",
    index_col=0,
    parse_dates=["start_date", "end_date"],
)

# Example fire used by the exploratory cells: the first row whose identifier
# contains the number 4416704.
is_example_fire = ["4416704" in fire_id for fire_id in fires.fire_id]
fire = fires[is_example_fire].iloc[0]
fire.head()

  fires = pd.read_csv("request_data.csv", index_col=0, parse_dates=[1, 2])


fire_id       2016_4416704_PIONEER
start_date     2016-07-18 19:00:00
end_date       2016-08-07 19:00:00
poo_lat                      43.95
poo_lon                    -115.76
Name: 2444, dtype: object

In [None]:
# The `id` column is the second underscore-separated token of `fire_id`.
fires['id'] = [fire_id.split("_")[1] for fire_id in fires["fire_id"]]

In [None]:
# GRIDMET bands requested for every fire.
band_names = ['pr', 'sph', 'th', 'tmmn', 'tmmx', 'vs', 'erc']
weather = ee.ImageCollection("IDAHO_EPSCOR/GRIDMET").select(band_names)
# Read the names back from the first image so that `band_names` follows the
# order actually present in the collection, in case it differs from the request.
band_names = weather.first().bandNames().getInfo()

In [None]:
# Side length of the square patch buffered around each fire.
# NOTE(review): presumably metres (64 km -> 64 pixels at the 1000 m export
# scale used by the export cells) — confirm.
PATCH_SIZE = 64000
# Number of days of weather kept before the fire start date.
lag = 5
# Number of days of weather kept from the fire start date onwards.
window = 21

# Date window of the example fire selected when the CSV was loaded.
window_start = ee.Date(fire['start_date'].strftime("%Y-%m-%d"))
window_end = window_start.advance(window, "days")

In [None]:
from itertools import product

# `df` is the working alias of the fire table used by the cells below.
df = fires
# Signed, zero-padded day offsets for every day in range(-lag, window).
dates = ["%.2d" % offset for offset in range(-lag, window)]
# For each fire id, the list of "<id>_<day>_<band>" names (days vary slowest).
output_band_names = {
    fire_key: [
        f"{fire_key}_{day}_{band}" for day, band in product(dates, band_names)
    ]
    for fire_key in df['id'].values
}

def get_image(fire, dates, patch_size=PATCH_SIZE):
  """Builds one multi-band Earth Engine image holding a fire's weather series.

  Relies on the notebook-level `weather` collection, `band_names`, `lag` and
  `window`.

  Args:
    fire: Row of the fire table; `start_date`, `poo_lon` and `poo_lat` are read.
    dates: Day labels used to name the output bands. There must be one label
      per image selected by the date filter, otherwise the final `rename`
      receives the wrong number of names.
    patch_size: Diameter of the buffer drawn around the point of origin;
      defaults to `PATCH_SIZE`.

  Returns:
    An `ee.Image` whose bands are named "<day>_<band>", days varying slowest.
  """
  # Date filter: from `lag` days before the start date to `window` days after.
  window_start = ee.Date(fire['start_date'].strftime("%Y-%m-%d"))
  window_end = window_start.advance(window, "days")

  # Space filter: bounding box of a buffer around the point of origin. The
  # radius now follows the `patch_size` argument instead of always using the
  # global PATCH_SIZE.
  center_point = fire[['poo_lon', 'poo_lat']].values
  radius = patch_size // 2
  geometry = ee.Geometry.Point(*center_point).buffer(radius).bounds()

  # Band names, in the order `toBands()` flattens the collection.
  flatten_band_names = [f"{date}_{band}" for date, band in product(dates, band_names)]
  return (weather
    .filterDate(window_start.advance(-lag, "days"), window_end)
    .map(lambda daily_image: daily_image.clipToBoundsAndScale(geometry, 70, 70))
    .toBands()
    .rename(flatten_band_names))

#get_image(df.iloc[0], dates)

## Quick verif
band_names[(lag*#bands)-1] = xxxx_start_date___

In [None]:
import itertools

# One Earth Engine image per fire (not vectorised). A comprehension is used so
# that the loop variable does not overwrite the notebook-level `fire`.
images = [get_image(fire_row, dates) for _, fire_row in df.iterrows()]

projection = ee.Projection('EPSG:4326')
resampling_scale = 1000
# Optional follow-up kept for reference:
#   .reproject(projection.atScale(resampling_scale)).resample('bicubic')
fires_images = ee.Image.cat(images)

# Check that the concatenated image carries exactly the expected band names.
# NOTE(review): `output_band_names` prefixes every name with the fire id while
# `get_image` names bands "<day>_<band>" — confirm which naming is intended
# before relying on this assertion.
final_bands = fires_images.bandNames().getInfo()
concatenated_list = list(itertools.chain.from_iterable(output_band_names.values()))
assert concatenated_list == final_bands

KeyboardInterrupt: 

In [None]:
len(fires)

5783

## Export batch

In [None]:
# Uses the `datetime` class imported in the first cell. Importing the
# `datetime` module here would rebind that name for every later cell.
now = datetime.now()
now.date().isoformat()

'2024-04-22'

In [None]:
bands = {}

In [None]:
bucket = "wildfire-lab"
folder = "batch_export_0422"
row = df.iloc[0]
name = row['fire_id']

In [None]:
import re

# Keep only the characters Earth Engine accepts in a task description (the
# same rule the batch-export loop of this notebook applies).
description = re.sub(r'[^a-zA-Z0-9.,:_;-]', '', name)
file_format = "TFRecord"
image_export_options = {
  'patchDimensions': [64, 64],
  'compressed': True
}
fire_image = get_image(row, dates)
# Remember the band order of this export so the TFRecord can be parsed later.
bands[0] = fire_image.bandNames().getInfo()
task = ee.batch.Export.image.toCloudStorage(
      image=fire_image,
      description=description,
      bucket=bucket,
      scale=1000,
      # Cloud Storage object names always use "/", whatever the local OS.
      fileNamePrefix=f"{folder}/{description}",
      fileFormat=file_format,
      formatOptions=image_export_options)
task.start()

In [None]:
actual_fires = pd.read_csv("actual_fires.csv", index_col=0)

In [None]:
# Collapse the table to the array of distinct fire ids. The guard makes the
# cell safe to re-run once `actual_fires` is already that array.
if isinstance(actual_fires, pd.DataFrame):
  actual_fires = actual_fires.fire_id.unique()

In [None]:
Map = geemap.Map()
fire = fires.iloc[1]
dates = ["%.2d" % i for i in range(-lag, window)]
img = get_image(fire, dates)
# Colour ramp applied to the displayed band, stretched between 250 and 320.
# NOTE(review): the range looks like kelvin — confirm against the band's units.
vis_2mt = {
    'min': 250,
    'max': 320,
    'palette': [
        '000080', '0000d9', '4000ff', '8000ff', '0080ff', '00ffff',
        '00ff80', '80ff00', 'daff00', 'ffff00', 'fff500', 'ffda00',
        'ffb000', 'ffa400', 'ff4f00', 'ff2500', 'ff0a00', 'ff00ff',
    ],
}

# Centre on the fire's point of origin and show the start-day `tmmx` band.
# The layer is named so it can be identified in the layer control.
Map.setCenter(fire['poo_lon'], fire['poo_lat'], 9)
Map.addLayer(img.select('00_tmmx'), vis_2mt, '00_tmmx')

Map

NameError: name 'get_image' is not defined

In [None]:
from tqdm import tqdm
import re

# Only the rows from position 3585 onwards are submitted by this cell.
df = fires.iloc[3500+85:]
bucket = "wildfire-lab"
folder = "batch_export_0422_test_3"
end = 5
# Fire ids skipped because they are absent from `actual_fires`.
missed_fires = []

# Loop-invariant settings, built once instead of on every iteration.
pattern = r'[^a-zA-Z0-9.,:_;-]'  # characters removed from task descriptions
file_format = "TFRecord"
image_export_options = {
  'patchDimensions': [64, 64],
  'compressed': True
}
# Set membership is constant-time; `in` on the id array rescans it per fire.
known_fire_ids = set(actual_fires)

for i, row in tqdm(df.iterrows(), total=df.shape[0]):
  description = row['fire_id']
  if description not in known_fire_ids:
    missed_fires.append(description)
    continue

  cleaned_desc = re.sub(pattern, '', description)
  fire_image = get_image(row, dates)
  task = ee.batch.Export.image.toCloudStorage(
        image=fire_image,
        description=cleaned_desc,
        bucket=bucket,
        scale=1000,
        # Cloud Storage object names always use "/", whatever the local OS.
        fileNamePrefix=f"{folder}/{cleaned_desc}",
        fileFormat=file_format,
        formatOptions=image_export_options)
  task.start()

100%|██████████| 2198/2198 [11:32<00:00,  3.18it/s]


## Tensorflow

In [None]:
fire_nb = 11
# Previous path construction, kept for reference:
#   "gs://" + os.path.join(bucket, folder, description+".tfrecord.gz")
dataset_path = f"gs://{bucket}/{folder}/fire{fire_nb}.tfrecord.gz"
train_dataset = tf.data.TFRecordDataset(dataset_path, compression_type='GZIP')
# NOTE(review): the visible export cell only stores `bands[0]`; `bands[fire_nb]`
# must have been filled elsewhere — confirm.
final_bands = bands[fire_nb]
# Every band is one 64x64 float32 patch.
columns = [tf.io.FixedLenFeature(shape=[64, 64], dtype=tf.float32)]*len(final_bands)
features_dict = {band: spec for band, spec in zip(final_bands, columns)}

In [None]:
import pickle

# Band names with everything up to and including the first "_" removed.
FINAL_BANDS = [name.partition("_")[2] for name in final_bands]
columns = [tf.io.FixedLenFeature(shape=[64, 64], dtype=tf.float32)]*len(FINAL_BANDS)
features = {band: spec for band, spec in zip(FINAL_BANDS, columns)}
# Persist the feature spec next to the notebook.
with open("band_features.pkl", "wb") as handle:
  pickle.dump(features, handle)

In [None]:
def parse_tfrecord(example_proto, features_dict):
  """Decodes one serialized example with the given feature specification.

  Args:
    example_proto: Serialized example, as yielded by a TFRecord dataset.
    features_dict: Mapping from feature name to its parsing spec.

  Returns:
    Whatever `tf.io.parse_single_example` returns for these arguments.
  """
  return tf.io.parse_single_example(example_proto, features_dict)

parsed_dataset = train_dataset.map(
    lambda serialized: parse_tfrecord(serialized, features_dict),
    num_parallel_calls=5)

In [None]:
# Take the first parsed record of the dataset.
example = next(iter(parsed_dataset))
# First "_"-separated token of one band name, used as the key prefix below.
# NOTE(review): `final_bands` is a list of band names, yet it is indexed with
# the fire number `fire_nb`; this only gives the intended prefix if every band
# name shares the same leading token — confirm.
id = final_bands[fire_nb].split('_')[0]

In [None]:
plt.set_cmap("coolwarm")
fig, ax = plt.subplots(len(band_names) + 1, len(dates) + 1, figsize=(26,8))

# Header row: the fire prefix in the corner, then one title per day.
ax[0, 0].set_title(id)
for col, day in enumerate(dates, start=1):
  ax[0, col].set_title(day)

# One row per band: the band name in the first column, then one patch per day.
for row, band in enumerate(band_names, start=1):
  ax[row, 0].set_title(band)
  for col, day in enumerate(dates, start=1):
    ax[row, col].imshow(example[f"{id}_{day}_{band}"].numpy())

# Hide the axis frame of every cell of the grid.
for axis in ax.flat:
  axis.set_axis_off()

## Create dataset

In [None]:
from google.cloud import storage

def get_fire_paths_gc(bucket, folder, client=None):
  """Maps every exported fire to the gs:// path of its TFRecord file.

  Args:
    bucket: Name of the Cloud Storage bucket.
    folder: Folder (object-name prefix) the export was written to.
    client: Optional object exposing `list_blobs(bucket, prefix=...)`. When
      omitted, a new `storage.Client()` is created.

  Returns:
    Dict whose keys are the file stems (without a leading "fire", so
    "fire11.tfrecord.gz" is keyed "11") and whose values are the gs:// paths.
  """
  if client is None:
    client = storage.Client()

  suffix = ".tfrecord.gz"
  # A trailing "/" stops the prefix from also matching sibling folders whose
  # names merely start with `folder`.
  prefix = f"{folder.rstrip('/')}/" if folder else ""

  paths = {}
  for blob in client.list_blobs(bucket, prefix=prefix):
    file_name = blob.name.split('/')[-1]
    # Skip folder placeholders and side files: only compressed TFRecords can
    # be read by the dataset built from these paths.
    if not file_name.endswith(suffix):
      continue
    stem = file_name[:-len(suffix)]
    fire_nb = stem[len("fire"):] if stem.startswith("fire") else stem
    # Use the real object name instead of rebuilding it from the key, so files
    # that do not follow the "fire<N>" naming still get a valid path.
    paths[fire_nb] = f"gs://{bucket}/{blob.name}"
  return paths

tfrecord_paths = get_fire_paths_gc(bucket, folder)

In [None]:
filenames = list(tfrecord_paths.values())
dataset = tf.data.TFRecordDataset(filenames, compression_type='GZIP')

In [None]:
# Band names with everything up to and including the first "_" removed.
FINAL_BANDS = [name.partition("_")[2] for name in final_bands]
columns = [tf.io.FixedLenFeature(shape=[64, 64], dtype=tf.float32)]*len(FINAL_BANDS)
features = {band: spec for band, spec in zip(FINAL_BANDS, columns)}

def parse_tfrecord(example_proto, features_dict):
  """Decodes one serialized example with the given feature specification.

  Args:
    example_proto: Serialized example, as yielded by a TFRecord dataset.
    features_dict: Mapping from feature name to its parsing spec.

  Returns:
    Whatever `tf.io.parse_single_example` returns for these arguments.
  """
  return tf.io.parse_single_example(example_proto, features_dict)

parsed_dataset = dataset.map(
    lambda serialized: parse_tfrecord(serialized, features),
    num_parallel_calls=5)

Tensor("args_0:0", shape=(), dtype=string)


In [None]:
next(iter(parsed_dataset))

InvalidArgumentError: {{function_node __wrapped__IteratorGetNext_output_types_182_device_/job:localhost/replica:0/task:0/device:CPU:0}} Feature: -01_erc (data type: float) is required but could not be found.
	 [[{{node ParseSingleExample/ParseExample/ParseExampleV2}}]] [Op:IteratorGetNext] name: 

## Graveyard

In [None]:
tfrecord_path = [dataset_path]

In [None]:
dataset = tf.data.Dataset.list_files(tfrecord_path)

In [None]:
def parse_tfrecord(example_proto):
  """Parses one serialized example with the notebook-level `features_dict`.

  NOTE(review): this one-argument version rebinds the `parse_tfrecord` name
  that other cells call with two arguments — confirm which one is intended.
  """
  # Debug aid: prints the input tensor the function receives.
  print(example_proto)
  parsed_features = tf.io.parse_single_example(example_proto, features_dict)
  return parsed_features
dataset = tf.data.Dataset.from_tensor_slices(tfrecord_path)

In [None]:
dataset.map(parse_tfrecord);

Tensor("args_0:0", shape=(), dtype=string)


In [None]:
FINAL_BANDS = ["_".join(name.split("_")[1:]) for name in final_bands]
columns = [tf.io.FixedLenFeature(shape=[64, 64], dtype=tf.float32)]*len(FINAL_BANDS)
features = dict(zip(FINAL_BANDS, columns))
def parse_function(filename, compression_type="GZIP"):
  """Reads one TFRecord file and parses each of its records.

  Args:
    filename: Path of the TFRecord file.
    compression_type: Compression of the file. Defaults to "GZIP" because the
      exports of this notebook are written compressed (`.tfrecord.gz`).

  Returns:
    A `tf.data.Dataset` of dicts keyed by `FINAL_BANDS`, each value parsed
    with a (64, 64) float32 spec.
  """
  features = {
      band: tf.io.FixedLenFeature(shape=[64, 64], dtype=tf.float32)
      for band in FINAL_BANDS
  }
  records = tf.data.TFRecordDataset(filename, compression_type=compression_type)
  return records.map(
      lambda serialized: tf.io.parse_single_example(serialized, features))

In [None]:
dataset = tf.data.Dataset.from_tensor_slices(tfrecord_path)
dataset = dataset.map(parse_function)

<_MapDataset element_spec=DatasetSpec({'-01_erc': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-01_pr': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-01_sph': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-01_th': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-01_tmmn': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-01_tmmx': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-01_vs': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-02_erc': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-02_pr': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-02_sph': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-02_th': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-02_tmmn': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-02_tmmx': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-02_vs': TensorSpec(shape=(64, 64), dtype=tf.float32, name=None), '-03_erc': 

In [None]:
import tensorflow as tf

def fire_dataset(filenames):
  """Creates a TensorFlow dataset that represents each record as a 3D tensor.

  The earlier grouping logic could not run: it read a `filename` feature that
  is not in the parsing spec, called `tf.reshape` on a dataset, and hard-coded
  sizes that do not match the 64x64 patches. Each record is now parsed and its
  bands stacked directly.

  Args:
    filenames: A list of file paths to the GZIP-compressed TFRecord files.

  Returns:
    A TensorFlow dataset where each element is a float32 tensor of shape
    (len(FINAL_BANDS), 64, 64), with bands in `FINAL_BANDS` order.
  """
  features = {
      band: tf.io.FixedLenFeature(shape=[64, 64], dtype=tf.float32)
      for band in FINAL_BANDS
  }

  def parse_function(serialized_example):
    """Parses one record and stacks its bands along a new leading axis."""
    parsed = tf.io.parse_single_example(serialized_example, features)
    return tf.stack([parsed[band] for band in FINAL_BANDS], axis=0)

  print(filenames)
  # The exports are written compressed, so the reader must be told to gunzip.
  records = tf.data.TFRecordDataset(filenames, compression_type="GZIP")
  return records.map(parse_function)


dataset = fire_dataset(tfrecord_path)



['gs://wildfire-lab/batch_export_test02_0503/fire11.tfrecord.gz']


NameError: in user code:

    File "<ipython-input-215-f4b8f5ea98b4>", line 23, in parse_function  *
        return dataset.flat_map(_group_by_fire)

    NameError: name '_group_by_fire' is not defined


In [None]:
# Iterate over the dataset and access the 3D tensors for each fire.
for fire in dataset:
  # Process the 3D tensor representing the fire here.
  print(fire.shape)

In [None]:
def _parse_function(serialized_example):
  """Parses a serialized example into an `(images, labels)` pair.

  NOTE(review): the feature names "image" and "label_name" are placeholders;
  the other cells of this notebook parse features named after weather bands —
  confirm before using this on the exported files.
  """
  features = tf.io.parse_single_example(
      serialized_example,
      features={
          "image": tf.io.FixedLenFeature([64, 64], tf.float32),
          "label_name": tf.io.FixedLenFeature([], tf.string),  # Replace with your label feature name
      })
  images = features["image"]
  labels = features["label_name"]

  # Add a leading axis of size 1 to the (64, 64) patch.
  images = tf.expand_dims(images, axis=0)

  # NOTE(review): `images` is a single tensor here, not a list of 182 patches,
  # so this call cannot produce a (182, 64, 64) tensor — confirm the intent.
  images = tf.stack(images, axis=0)

  return images, labels

In [None]:
dataset = dataset.map(_parse_function)

prefetch_buffer_size = 10
dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)

In [None]:
# `values` must be called: passing the bound method itself is what raised the
# "unsupported type builtin_function_or_method" error.
dataset = tf.data.Dataset.list_files(list(tfrecord_paths.values()))

# Read the TFRecord.gzip files (the exports are written GZIP-compressed).
dataset = dataset.flat_map(
    lambda path: tf.data.TFRecordDataset(path, compression_type="GZIP"))

# Parse the serialized examples

ValueError: Attempt to convert a value (<built-in method values of dict object at 0x79ab12bd17c0>) with an unsupported type (<class 'builtin_function_or_method'>) to a Tensor.

In [None]:
# For every parsed example, add up all values of all its features.
# NOTE(review): despite its name, `lengths` holds sums, not lengths.
lengths = []
for ex in iter(parsed_dataset):
  lengths.append(np.array([ex[key] for key in ex.keys()]).sum())
lengths = np.array(lengths)
# Mean of the per-example sums.
lengths.mean()

45993824.0

In [None]:
lengths

array([45993824.], dtype=float32)

In [None]:

# Keep the first example whose values sum to at least 1 as `example`, print
# that sum, and stop iterating.
for ex in iter(parsed_dataset):
  sum_ = np.array([ex[key] for key in ex.keys()]).sum()
  if sum_ >= 1:
    example = ex
    print(sum_)
    break

45987476.0


In [None]:
86972560

86972560

In [None]:
np.array([example[key] for key in example.keys()]).sum()

45987476.0

In [None]:
# Cloud Storage object names always use "/", whatever the local OS.
file_pattern = f"gs://{bucket}/{folder}/batch1*"
tf_dataset = tf.data.Dataset.list_files(file_pattern, shuffle=False)
tf_dataset = tf_dataset.interleave(
    lambda x: tf.data.TFRecordDataset(x, compression_type="GZIP"),
    num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset = tf_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
# `map` needs a function. The previous argument, `parsed_dataset`, is a dataset
# and cannot be applied to records, so parse each record with the notebook's
# `features` spec instead.
tf_dataset = tf_dataset.map(
    lambda serialized: tf.io.parse_single_example(serialized, features),
    num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset = tf_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
next(iter(tf_dataset))

In [None]:
def _get_base_key(key: Text) -> Text:
  """Extracts the base key from the provided key.

  Earth Engine exports TFRecords containing each data variable with its
  corresponding variable name. In the case of time sequences, the name of the
  data variable is of the form 'variable_1', 'variable_2', ..., 'variable_n',
  where 'variable' is the name of the variable, and n the number of elements
  in the time sequence. Extracting the base key ensures that each step of the
  time sequence goes through the same normalization steps.
  The base key obeys the following naming pattern: '([a-zA-Z]+)'
  For instance, for an input key 'variable_1', this function returns 'variable'.
  For an input key 'variable', this function simply returns 'variable'.

  Args:
    key: Input key.

  Returns:
    The corresponding base key.

  Raises:
    ValueError when `key` does not match the expected pattern.
  """
  match = re.match(r'([a-zA-Z]+)', key)
  if match:
    return match.group(1)
  raise ValueError(
      'The provided key does not match the expected pattern: {}'.format(key))