# Training a simple CNN model in Tensorflow for Tornado Detection

This notebook steps through how to train a simple CNN model using a subset of TorNet.

This will not produce a model with any skill, but simply provides a working end-to-end example of how to set up a data loader, build, and fit a model


In [1]:
import sys
# Uncomment if tornet isn't installed in your environment or in your path already
#sys.path.append('../')  

import os
import glob
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

from tornet.data.tf.loader import create_tf_dataset 
from tornet.data.constants import ALL_VARIABLES
import tornet.data.preprocess as pp
from tornet.data import preprocess as tfpp
import keras
from tornet.data.constants import CHANNEL_MIN_MAX
import tornet.metrics.keras.metrics as km

In [2]:
def classify_region(row):
    tornado_alley_bounds = {
    'lat_min': 32.0,
    'lat_max': 43.0,
    'lon_min': -102.0,
    'lon_max': -94.0
}

    dixie_alley_bounds = {
        'lat_min': 30.0,
        'lat_max': 38.0,
        'lon_min': -94.0,
        'lon_max': -83.0
    }
    if (
        tornado_alley_bounds['lat_min'] <= row['lat'] <= tornado_alley_bounds['lat_max'] and
        tornado_alley_bounds['lon_min'] <= row['lon'] <= tornado_alley_bounds['lon_max']
    ):
        return 2  # Tornado Alley
    elif (
        dixie_alley_bounds['lat_min'] <= row['lat'] <= dixie_alley_bounds['lat_max'] and
        dixie_alley_bounds['lon_min'] <= row['lon'] <= dixie_alley_bounds['lon_max']
    ):
        return 1  # Dixie Alley
    else:
        return 0  # None

In [3]:
import os
import pandas as pd

def load_dataset(data_root, data_type, years, variables, random_state=1234):
    """
    Load and prepare the dataset from the given catalog and file list.

    Args:
        data_root (str): Path to the root directory containing the catalog and data files.
        data_type (str): Type of data to load (e.g., 'train' or 'test').
        years (list): List of years to filter the catalog.
        variables (list): List of variables for the dataset.
        random_state (int): Seed for shuffling.

    Returns:
        tf.data.Dataset: TensorFlow dataset prepared from the file list.
    """
    catalog_path = os.path.join(data_root, 'catalog.csv')
    if not os.path.exists(catalog_path):
        raise RuntimeError(f'Unable to find catalog.csv at {data_root}')

    catalog = pd.read_csv(catalog_path, parse_dates=['start_time', 'end_time'])
    catalog = catalog[catalog['type'] == data_type]
    catalog = catalog[catalog['category'] != "NUL"]
    catalog = catalog[catalog.start_time.dt.year.isin(years)]
    catalog['region'] = catalog.apply(classify_region, axis=1)
    # catalog = catalog[catalog['region'] == 0]
    catalog = catalog.sample(frac=1, random_state=random_state)

    file_list = [os.path.join(data_root, f) for f in catalog.filename]
    return create_tf_dataset(file_list, variables=variables)

# Location of tornet
data_root = "C:/Users/mjhig/tornet_2013"

# # Get training dataset
years = [2013]
#, 2014, 2015, 2016, 2017, 2018]
ds = load_dataset(data_root, data_type='train', years=years, variables=ALL_VARIABLES)
ds = ds.map(lambda d: pp.add_coordinates(d, include_az=False, backend=tf))
ds = ds.map(pp.remove_time_dim)
ds = ds.map(tfpp.split_x_y)
ds = ds.prefetch(tf.data.AUTOTUNE)
ds = ds.batch(32)
# Get test dataset
ds_test = load_dataset(data_root, data_type='test', years=years, variables=ALL_VARIABLES)
# preprocess
ds_test = ds_test.map(lambda d: pp.add_coordinates(d,include_az=False,backend=tf))
ds_test = ds_test.map(pp.remove_time_dim)
ds_test = ds_test.map(tfpp.split_x_y)
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.batch(32)



import os
import pandas as pd
import tensorflow as tf
from collections import Counter
def load_dataset(data_root, data_type, years, variables, random_state=1234):
    """
    Load and prepare the dataset from the given catalog and file list.

    Args:
        data_root (str): Path to the root directory containing the catalog and data files.
        data_type (str): Type of data to load (e.g., 'train' or 'test').
        years (list): List of years to filter the catalog.
        variables (list): List of variables for the dataset.
        random_state (int): Seed for shuffling.

    Returns:
        tf.data.Dataset: TensorFlow dataset prepared from the file list.
    """
    catalog_path = os.path.join(data_root, 'catalog.csv')
    if not os.path.exists(catalog_path):
        raise RuntimeError(f'Unable to find catalog.csv at {data_root}')

    catalog = pd.read_csv(catalog_path, parse_dates=['start_time', 'end_time'])
    catalog = catalog[catalog['category'] == 'TOR']
    catalog = catalog[catalog['type'] == data_type]
    catalog = catalog[catalog.start_time.dt.year.isin(years)]
    catalog['region'] = catalog.apply(classify_region, axis=1)
    catalog=catalog[catalog.region != 0]
    catalog = catalog.sample(frac=1, random_state=random_state)

    file_list = [os.path.join(data_root, f) for f in catalog.filename]
    return create_tf_dataset(file_list, variables=variables)


def new_x_y(d):
    """
    Splits dict into X,y, where y are tornado labels
    """
    print(d['coordinates'])
    y=classify_region(d)
    return d,y


# Location of tornet
data_root = "C:/Users/mjhig/tornet_2013"

# Define years and variables
years = [2013, 2014, 2015]

# Get training dataset
ds = load_dataset(data_root, data_type='train', years=years, variables=ALL_VARIABLES)
ds = ds.map(lambda d: pp.add_coordinates(d, include_az=False, backend=tf))
ds = ds.map(pp.remove_time_dim)

def classify_region(row):
    tornado_alley_bounds = {
    'lat_min': 32.0,
    'lat_max': 43.0,
    'lon_min': -102.0,
    'lon_max': -94.0
}

    dixie_alley_bounds = {
        'lat_min': 30.0,
        'lat_max': 38.0,
        'lon_min': -94.0,
        'lon_max': -83.0
    }
    if (
        tornado_alley_bounds['lat_min'] <= row['lat'] <= tornado_alley_bounds['lat_max'] and
        tornado_alley_bounds['lon_min'] <= row['lon'] <= tornado_alley_bounds['lon_max']
    ):
        return 2  # Tornado Alley
    elif (
        dixie_alley_bounds['lat_min'] <= row['lat'] <= dixie_alley_bounds['lat_max'] and
        dixie_alley_bounds['lon_min'] <= row['lon'] <= dixie_alley_bounds['lon_max']
    ):
        return 1  # Dixie Alley
    else:
        return 0  # None

def new_x_y(d):
    """
    Splits dict into X,y, where y are tornado labels
    """
    print(d['coordinates'])

ds = ds.map(new_x_y)
# ds = ds.prefetch(tf.data.AUTOTUNE)
# ds = ds.batch(32)



# Get test dataset
ds_test = load_dataset(data_root, data_type='test', years=years, variables=ALL_VARIABLES)
ds_test = ds_test.map(lambda d: pp.add_coordinates(d,include_az=False,backend=tf))
ds_test = ds_test.map(pp.remove_time_dim)
ds_test = ds_test.map(tfpp.split_x_y)
ds_test = filter_positive_tornado_cases(ds_test)  # Filter by positive tornado cases
ds_test = set_target_to_region(ds_test)          # Set the target to 'region'
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.batch(32)

import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras


def create_model(architecture):
    """Create a CNN model with different architectures."""
    inputs = {v: keras.Input(shape=(120, 240, 2), name=v) for v in ALL_VARIABLES}
    norm_layers = []
    for v in ALL_VARIABLES:
        min_max = np.array(CHANNEL_MIN_MAX[v])
        var = ((min_max[1] - min_max[0]) / 2) ** 2
        var = np.array(2 * [var])
        offset = (min_max[0] + min_max[1]) / 2
        offset = np.array(2 * [offset])
        norm_layers.append(
            keras.layers.Normalization(mean=offset, variance=var, name=f'Normalized_{v}')
        )
    x = keras.layers.Concatenate(axis=-1, name='Concatenate1')(
        [l(inputs[v]) for l, v in zip(norm_layers, ALL_VARIABLES)]
    )
    x = keras.layers.Lambda(lambda x: tf.where(tf.math.is_nan(x), -3.0, x), name='ReplaceNan')(x)
    
    if architecture == "baseline":
        x = keras.layers.Conv2D(32, 3, padding='same', activation='relu')(x)
        x = keras.layers.Conv2D(1, 1, padding='same', activation='relu', name='TornadoLikelihood')(x)
    elif architecture == "deeper":
        x = keras.layers.Conv2D(32, 3, padding='same', activation='relu')(x)
        x = keras.layers.Conv2D(64, 3, padding='same', activation='relu')(x)
        x = keras.layers.Conv2D(1, 1, padding='same', activation='relu', name='TornadoLikelihood')(x)
    elif architecture == "residual":
        x = keras.layers.Conv2D(64, 3, padding='same', activation='relu')(x)
        x= keras.layers.Conv2D(32, 3, padding='same', activation='relu')(x)
        x = keras.layers.Conv2D(1, 1, padding='same', activation='relu', name='TornadoLikelihood')(x)
    elif architecture == "wide":
        x = keras.layers.Conv2D(64, 3, padding='same', activation='relu')(x)
        x = keras.layers.Conv2D(64, 3, padding='same', activation='relu')(x)
        x = keras.layers.Conv2D(1, 1, padding='same', activation='relu', name='TornadoLikelihood')(x)
    elif architecture == "dropout":
        x = keras.layers.Conv2D(32, 3, padding='same', activation='relu')(x)
        x = keras.layers.Dropout(0.5)(x)
        x = keras.layers.Conv2D(1, 1, padding='same', activation='relu', name='TornadoLikelihood')(x)
    else:
        raise ValueError("Unknown architecture type")
    
    y = keras.layers.GlobalMaxPool2D(name='GlobalMaxPool')(x)
    return keras.Model(inputs=inputs, outputs=y, name=f'TornadoDetector_{architecture}')

# Train and evaluate different models
architectures = ["baseline", "deeper", "residual", "wide", "dropout"]

model = create_model("baseline")
model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3),loss=keras.losses.BinaryCrossentropy(from_logits=True))
model.fit(ds, epochs=4, steps_per_epoch=10)



In [9]:
for x, y in ds.take(1):
    print({k: v.shape for k, v in x.items()})

{'DBZ': TensorShape([32, 1, 120, 240, 2]), 'VEL': TensorShape([32, 1, 120, 240, 2]), 'KDP': TensorShape([32, 1, 120, 240, 2]), 'RHOHV': TensorShape([32, 1, 120, 240, 2]), 'ZDR': TensorShape([32, 1, 120, 240, 2]), 'WIDTH': TensorShape([32, 1, 120, 240, 2]), 'range_folded_mask': TensorShape([32, 1, 120, 240, 2]), 'label': TensorShape([32, 1]), 'category': TensorShape([32, 1]), 'event_id': TensorShape([32, 1]), 'ef_number': TensorShape([32, 1]), 'az_lower': TensorShape([32, 1]), 'az_upper': TensorShape([32, 1]), 'rng_lower': TensorShape([32, 1]), 'rng_upper': TensorShape([32, 1]), 'time': TensorShape([32, 1]), 'tornado_start_time': TensorShape([32, 1]), 'tornado_end_time': TensorShape([32, 1])}


In [4]:
# Get training dataset
years = [2013, 2014, 2015, 2016, 2017, 2018]
ds = load_dataset(data_root, data_type='train', years=years, variables=ALL_VARIABLES)
ds_test = load_dataset(data_root, data_type='test', years=years, variables=ALL_VARIABLES)

In [8]:
from tensorflow.keras.layers import Conv2D, Dropout, GlobalAveragePooling2D, Dense, BatchNormalization
from tensorflow.keras.layers import Lambda
from tensorflow.keras import Input

def add_derived_features(inputs):
    """Add only ZDR/KDP ratio as a derived feature."""
    derived_inputs = {}
    if "ZDR" in inputs and "KDP" in inputs:
        # Compute ZDR/KDP ratio
        zdr_kdp_ratio = Lambda(
            lambda x: tf.math.divide_no_nan(x[0], x[1] + 1e-6), name="zdr_kdp_ratio"
        )([inputs["ZDR"], inputs["KDP"]])
        derived_inputs["zdr_kdp_ratio"] = zdr_kdp_ratio

    inputs.update(derived_inputs)
    return inputs

def create_model_improved(architecture):
    """Create a model using six base radar images and ZDR/KDP ratio."""
    inputs = {
        "DBZ": Input(shape=(120, 240, 2), name="DBZ"),
        "VEL": Input(shape=(120, 240, 2), name="VEL"),
        "KDP": Input(shape=(120, 240, 2), name="KDP"),
        "RHOHV": Input(shape=(120, 240, 2), name="RHOHV"),
        "ZDR": Input(shape=(120, 240, 2), name="ZDR"),
        "WIDTH": Input(shape=(120, 240, 2), name="WIDTH"),
    }


    # Normalize inputs
    norm_layers = []
    for v in inputs:
        min_max = np.array(CHANNEL_MIN_MAX.get(v, [0, 1]))
        var = ((min_max[1] - min_max[0]) / 2) ** 2
        var = np.array(2 * [var])
        offset = (min_max[0] + min_max[1]) / 2
        offset = np.array(2 * [offset])
        norm_layers.append(
            keras.layers.Normalization(mean=offset, variance=var, name=f'Normalized_{v}')
        )

    # Concatenate normalized inputs
    x = keras.layers.Concatenate(axis=-1, name='Concatenate1')(
        [l(inputs[v]) for l, v in zip(norm_layers, inputs.keys())]
    )
    x = keras.layers.Lambda(lambda x: tf.where(tf.math.is_nan(x), -3.0, x), name='ReplaceNan')(x)

    # Architecture-specific logic
    if architecture == "improved":
        x = Conv2D(32, 3, padding='same', activation='relu', kernel_regularizer='l2')(x)
        x = BatchNormalization()(x)
        x = Dropout(0.3)(x)
        x = Conv2D(64, 3, padding='same', activation='relu', kernel_regularizer='l2')(x)
        x = GlobalAveragePooling2D()(x)
        x = Dense(1, activation='sigmoid', name='TornadoLikelihood')(x)
    else:
        raise ValueError("Unknown architecture type")

    return keras.Model(inputs=inputs, outputs=x, name=f'TornadoDetector_{architecture}')

def preprocess_with_derived_features(inputs, labels):
    """Add ZDR/KDP ratio during preprocessing."""
    feature_inputs = {
        "DBZ": inputs["DBZ"],
        "VEL": inputs["VEL"],
        "KDP": inputs["KDP"],
        "RHOHV": inputs["RHOHV"],
        "ZDR": inputs["ZDR"],
        "WIDTH": inputs["WIDTH"],
    }

    # Add ZDR/KDP ratio with the correct tensor name
    if "ZDR" in inputs and "KDP" in inputs:
        zdr_kdp_ratio = tf.math.divide_no_nan(inputs["ZDR"], inputs["KDP"] + 1e-6)
        feature_inputs["zdr_kdp_ratio"] = tf.identity(zdr_kdp_ratio, name="zdr_kdp_ratio")  # Explicitly name the tensor

    return feature_inputs, labels


# Define the batch size
BATCH_SIZE = 32
data_root = "C:/Users/mjhig/tornet_2013"

# Preprocess datasets
try:
    ds = (
        ds
        .map(tfpp.split_x_y)
        .prefetch(tf.data.AUTOTUNE)
        .batch(BATCH_SIZE)
    )
except:
    print("Dataset preprocessing skipped.")


from tensorflow.keras.metrics import AUC

# Create the model
model = create_model_improved("improved")

# Compile the model
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=.01),
    loss=keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=[AUC(name="AUC")]
)

# Train the model
model.fit(ds,epochs=3, callbacks=[
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2),
    ]
)
history_df = pd.DataFrame(history.history)
history_df.plot()
# # Evaluate the model
# results = model.evaluate(ds_test)
# print(f"Test results: {results}")



Dataset preprocessing skipped.
Epoch 1/3


ValueError: Input 0 of layer "TornadoDetector_improved" is incompatible with the layer: expected shape=(None, 120, 240, 2), found shape=(None, 1, 120, 240, 2)

In [None]:
for batch in ds.take(1):
    inputs, labels = batch
    print("Input keys:", inputs.keys())

Input keys: dict_keys(['DBZ_gradients', 'DBZ', 'VEL_gradients', 'VEL', 'KDP_gradients', 'KDP', 'RHOHV_gradients', 'RHOHV', 'ZDR_gradients', 'ZDR', 'WIDTH_gradients', 'WIDTH', 'zdr_kdp_ratio'])


In [None]:
# Evaluate
import tornet.metrics.keras.metrics as km
metrics = [keras.metrics.AUC(from_logits=True,name='AUC'),
           km.BinaryAccuracy(from_logits=True,name='BinaryAccuracy'), 
           ]
model.compile(loss=keras.losses.BinaryCrossentropy(from_logits=True),metrics=metrics)

# steps=10 for demo purposes
model.evaluate(ds_test,steps=10)

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

def make_gradcam_heatmap_with_input_gradients(model_inputs, model, last_conv_layer_name, pred_index=None):
    grad_model = tf.keras.models.Model(
        [model.inputs],
        [model.get_layer(last_conv_layer_name).output, model.output]
    )

    with tf.GradientTape(persistent=True) as tape:
        tape.watch(model_inputs)  # Watch the inputs to compute gradients
        conv_outputs, predictions = grad_model(model_inputs)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]

    # Gradients with respect to the inputs
    input_grads = tape.gradient(class_channel, model_inputs)

    # Gradients with respect to the last conv layer
    conv_grads = tape.gradient(class_channel, conv_outputs)
    pooled_grads = tf.reduce_mean(conv_grads, axis=(0, 1, 2))  # Average gradients over width and height
    #pooled_grads=conv_grads
    heatmap = tf.reduce_sum(tf.multiply(pooled_grads, conv_outputs), axis=-1)  # Sum across channels

    heatmap = np.maximum(heatmap, 0)  # ReLU to remove negative values
    if tf.reduce_max(heatmap) > 0:
        heatmap /= tf.reduce_max(heatmap)  # Normalize heatmap to [0, 1]

    return np.array(heatmap), input_grads




# Unpack the dataset
for sample in ds_test.take(1):  # Take a single batch from the dataset
    inputs, _ = sample  # Assuming the dataset is structured as (features, labels)
    break

# Prepare the list of inputs for the model
model_inputs = [
    inputs['DBZ'], 
    inputs['VEL'], 
    inputs['KDP'], 
    inputs['RHOHV'], 
    inputs['ZDR'], 
    inputs['WIDTH']
]

# Generate heatmap and input gradients
heatmap, input_grads = make_gradcam_heatmap_with_input_gradients(model_inputs, model, 'TornadoLikelihood')

# Calculate the importance of each input
input_importance = [tf.reduce_mean(tf.abs(grad)).numpy() for grad in input_grads]

# Map input names to their importance
input_names = ['DBZ', 'VEL', 'KDP', 'RHOHV', 'ZDR', 'WIDTH']
input_importance_dict = dict(zip(input_names, input_importance))

# Print or plot the importance of each input
print("Input importance:", input_importance_dict)


In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

def make_gradcam_heatmap_with_input_gradients(model_inputs, model, last_conv_layer_name, pred_index=None):
    grad_model = tf.keras.models.Model(
        [model.inputs],
        [model.get_layer(last_conv_layer_name).output, model.output]
    )

    with tf.GradientTape(persistent=True) as tape:
        tape.watch(model_inputs)  # Watch the inputs to compute gradients
        conv_outputs, predictions = grad_model(model_inputs)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]

    # Gradients with respect to the inputs
    input_grads = tape.gradient(class_channel, model_inputs)

    # Gradients with respect to the last conv layer
    conv_grads = tape.gradient(class_channel, conv_outputs)
    #pooled_grads = tf.reduce_mean(conv_grads, axis=(0, 1, 2))  # Average gradients over width and height
    pooled_grads=conv_grads
    heatmap = tf.reduce_sum(tf.multiply(pooled_grads, conv_outputs), axis=-1)  # Sum across channels

    heatmap = np.maximum(heatmap, 0)  # ReLU to remove negative values
    if tf.reduce_max(heatmap) > 0:
        heatmap /= tf.reduce_max(heatmap)  # Normalize heatmap to [0, 1]

    return np.array(heatmap), input_grads

# Unpack the dataset
for sample in ds_test.take(1):  # Take a single batch from the dataset
    inputs, y = sample  # Assuming the dataset is structured as (features, labels)
    break

# Prepare the list of inputs for the model
model_inputs = [
    inputs['DBZ'], 
    inputs['VEL'], 
    inputs['KDP'], 
    inputs['RHOHV'], 
    inputs['ZDR'], 
    inputs['WIDTH']
]

# Generate heatmap and input gradients
heatmap, input_grads = make_gradcam_heatmap_with_input_gradients(model_inputs, model, 'TornadoLikelihood')

# Calculate the importance of each input
input_importance = [tf.reduce_mean(tf.abs(grad)).numpy() for grad in input_grads]

# Map input names to their importance
input_names = ['DBZ', 'VEL', 'KDP', 'RHOHV', 'ZDR', 'WIDTH']
radar_cmap_mapping = {
    "DBZ": "viridis",  # Reflectivity
    "VEL": "coolwarm",  # Velocity
    "KDP": "Greens",    # Specific Differential Phase
    "RHOHV": "hot",     # Correlation Coefficient
    "ZDR": "coolwarm",  # Differential Reflectivity
    "WIDTH": "inferno"  # Spectrum Width
}
input_importance_dict = dict(zip(input_names, input_importance))

print("Input importance:", input_importance_dict)

# Analyze and plot each input slice
for i, grad in enumerate(input_grads):
    input_name = input_names[i]

    # Compute mean absolute gradient for each spatial location
    grad_abs_mean = tf.reduce_mean(tf.abs(grad), axis=-1).numpy()  # Shape: (batch_size, height, width)

    # Select the most important regions for the first sample in the batch
    important_regions = grad_abs_mean[0]  # First sample in batch
    input_data = model_inputs[i][7]
    print(y)
    input_data=input_data[..., 0] 
    print(f"Most important regions for {input_name}:")
    print("Shape of importance map:", important_regions.shape)
    print(input_data.shape)
    # Visualize the input data and importance map side by side
    fig, axes = plt.subplots(1, 2, figsize=(12, 6), edgecolor='k')
    fig.suptitle(f"Input and Importance Map for {input_name}")

    # Plot the original input
    input_name = input_names[i]
    axes[0].imshow(input_data,cmap=radar_cmap_mapping[input_name])
    axes[0].set_title(f"Original {input_name}")
    axes[0].axis('off')

    # Plot the importance map
    im = axes[1].imshow(important_regions, cmap="hot")
    axes[1].set_title(f"Importance Map for {input_name}")
    axes[1].axis('off')

    # Add colorbar to the importance map
    fig.colorbar(im, ax=axes[1], label="Importance")
    plt.show()


In [None]:
from tornet.display.display import plot_radar
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

def make_gradcam_heatmap_with_input_gradients(model_inputs, model, last_conv_layer_name, pred_index=None):
    grad_model = tf.keras.models.Model(
        [model.inputs],
        [model.get_layer(last_conv_layer_name).output, model.output]
    )

    with tf.GradientTape(persistent=True) as tape:
        tape.watch(model_inputs)  # Watch the inputs to compute gradients
        conv_outputs, predictions = grad_model(model_inputs)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]

    # Gradients with respect to the inputs
    input_grads = tape.gradient(class_channel, model_inputs)

    # Gradients with respect to the last conv layer
    conv_grads = tape.gradient(class_channel, conv_outputs)
    pooled_grads = tf.reduce_mean(conv_grads, axis=(0, 1, 2))  # Average gradients over width and height
    heatmap = tf.reduce_sum(tf.multiply(pooled_grads, conv_outputs), axis=-1)  # Sum across channels

    heatmap = np.maximum(heatmap, 0)  # ReLU to remove negative values
    if tf.reduce_max(heatmap) > 0:
        heatmap /= tf.reduce_max(heatmap)  # Normalize heatmap to [0, 1]

    return np.array(heatmap), input_grads




# Unpack the dataset
for sample in ds_test.take(1):  # Take a single batch from the dataset
    inputs, _ = sample  # Assuming the dataset is structured as (features, labels)
    break

# Prepare the list of inputs for the model
model_inputs = [
    inputs['DBZ'], 
    inputs['VEL'], 
    inputs['KDP'], 
    inputs['RHOHV'], 
    inputs['ZDR'], 
    inputs['WIDTH']
]

# Generate heatmap and input gradients
heatmap, input_grads = make_gradcam_heatmap_with_input_gradients(model_inputs, model, 'TornadoLikelihood')

# Calculate the importance of each input
input_importance = [tf.reduce_mean(tf.abs(grad)).numpy() for grad in input_grads]

# Map input names to their importance
input_names = ['DBZ', 'VEL', 'KDP', 'RHOHV', 'ZDR', 'WIDTH']
input_importance_dict = dict(zip(input_names, input_importance))

# Print or plot the importance of each input
print("Input importance:", input_importance_dict)


# Analyze each input slice
for i, grad in enumerate(input_grads):
    input_name = input_names[i]

    # Compute mean absolute gradient for each spatial location
    grad_abs_mean = tf.reduce_mean(tf.abs(grad), axis=-1).numpy()  # Shape: (batch_size, height, width)

    # Select the most important regions for the first sample in the batch
    important_regions = grad_abs_mean[0]  # First sample in batch

    print(f"Most important regions for {input_name}:")
    print("Shape of importance map:", important_regions.shape)

    # Visualize the importance map
    fig = plt.figure(figsize=(12,6),edgecolor='k')
    plt.title(f"Importance Map for {input_name}")
    #plot_radar(data=important_regions,fig=fig,channels=ALL_VARIABLES)
    plt.imshow(important_regions, cmap="hot")
    plt.colorbar(label="Importance")
    plt.show()


In [None]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import mutual_info_classif
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

# Function to create derived features from radar variables
def create_derived_features(inputs):
    derived_features = []
    for var_name, data in inputs.items():
        # Skip non-radar variables
        if var_name not in ["DBZ", "VEL", "KDP", "RHOHV", "ZDR", "WIDTH"]:
            continue
        
        # Global statistics
        global_mean = np.mean(data)
        global_std = np.std(data)
        global_max = np.max(data)
        global_min = np.min(data)
        
        # Spatial gradients
        dx = np.mean(np.gradient(data, axis=1))  # Vertical gradient
        dy = np.mean(np.gradient(data, axis=2))  # Horizontal gradient
        
        # Add derived features
        derived_features.extend([global_mean, global_std, global_max, global_min, dx, dy])
        
        # Custom meteorological feature: Example ZDR/KDP ratio
        if var_name == "ZDR" and "KDP" in inputs:
            zdr_kdp_ratio = np.mean(inputs["ZDR"]) / (np.mean(inputs["KDP"]) + 1e-6)
            derived_features.append(zdr_kdp_ratio)
    
    return np.array(derived_features)

# Generate features and labels from the dataset
def generate_features_and_labels(dataset):
    features = []
    labels = []
    for batch in dataset:
        batch_inputs, batch_labels = batch
        for i in range(batch_inputs["DBZ"].shape[0]):  # Iterate over each sample in the batch
            sample_inputs = {k: v[i].numpy() for k, v in batch_inputs.items()}
            sample_label = batch_labels[i].numpy()
            
            # Create derived features
            derived_features = create_derived_features(sample_inputs)
            features.append(derived_features)
            labels.append(sample_label)
    
    return np.array(features), np.array(labels)

# Generate derived features and labels
features, labels = generate_features_and_labels(ds_test)

# Normalize features for further analysis
scaler = MinMaxScaler()
scaled_features = scaler.fit_transform(features)

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(scaled_features, labels, test_size=0.2, random_state=42)

# Train a Random Forest model to calculate feature importance
rf_model = RandomForestClassifier(random_state=42)
rf_model.fit(X_train, y_train)

# Calculate feature importance
feature_importance = rf_model.feature_importances_

# # Calculate mutual information
# mutual_info = mutual_info_classif(X_train, y_train, random_state=42)

# # Create a DataFrame to rank features
# num_features = features.shape[1]
# feature_names = [f"Feature_{i}" for i in range(num_features)]
# importance_df = pd.DataFrame({
#     "Feature": feature_names,
#     "Importance": feature_importance,
#     "Mutual Information": mutual_info
# })

# # Sort features by importance
# importance_df = importance_df.sort_values(by="Importance", ascending=False)

# # Display the feature ranking
# import ace_tools as tools; tools.display_dataframe_to_user(name="Feature Importance and Mutual Information", dataframe=importance_df)

# # Plot feature importance
# import matplotlib.pyplot as plt

# plt.figure(figsize=(12, 6))
# plt.barh(importance_df["Feature"], importance_df["Importance"])
# plt.gca().invert_yaxis()
# plt.title("Feature Importance")
# plt.xlabel("Importance")
# plt.show()
