Skip to content

Commit

Permalink
Merge pull request #49 from NREL/gb/gauss_pool
Browse files Browse the repository at this point in the history
Gb/gauss pool
  • Loading branch information
grantbuster committed May 8, 2024
2 parents 836060f + 3ab5038 commit 1ce0ef1
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 1 deletion.
4 changes: 4 additions & 0 deletions phygnn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from .base import CustomNetwork, GradientUtils
from .phygnn import PhysicsGuidedNeuralNetwork
from .layers import Layers, HiddenLayers
from .layers.custom_layers import GaussianKernelInit2D
from .utilities import PreProcess, tf_isin, tf_log10
from phygnn.version import __version__
from tensorflow.keras.utils import get_custom_objects

get_custom_objects()['GaussianKernelInit2D'] = GaussianKernelInit2D

__author__ = """Grant Buster"""
__email__ = "grant.buster@nrel.gov"
Expand Down
45 changes: 45 additions & 0 deletions phygnn/layers/custom_layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,51 @@ def call(self, x):
return x * rand_tensor


class GaussianKernelInit2D(tf.keras.initializers.Initializer):
"""Convolutional kernel initializer that creates a symmetric 2D array with
a gaussian distribution. This can be used with Conv2D as a gaussian average
pooling layer if trainable=False
"""

def __init__(self, stdev=1):
"""
Parameters
----------
stdev : float
Standard deviation of the gaussian distribution defining the kernel
values
"""
self.stdev = stdev

def __call__(self, shape, dtype=tf.float32):
"""
Parameters
---------
shape : tuple
Shape of the input tensor, typically (y, x, n_features, n_obs)
dtype : None | tf.DType
Tensorflow datatype e.g., tf.float32
Returns
-------
kernel : tf.Tensor
Kernel tensor of shape (y, x, n_features, n_obs) for use in a
Conv2D layer.
"""

ax = np.linspace(-(shape[0] - 1) / 2., (shape[0] - 1) / 2., shape[0])
kernel = np.exp(-0.5 * np.square(ax) / np.square(self.stdev))
kernel = np.outer(kernel, kernel)
kernel = kernel / np.sum(kernel)

kernel = np.expand_dims(kernel, (2, 3))
kernel = np.repeat(kernel, shape[2], axis=2)
kernel = np.repeat(kernel, shape[3], axis=3)

kernel = tf.convert_to_tensor(kernel, dtype=dtype)
return kernel


class FlattenAxis(tf.keras.layers.Layer):
"""Layer to flatten an axis from a 5D spatiotemporal Tensor into axis-0
observations."""
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ numpy>=1.16
pandas>=0.25
pytest>=5.2
scikit-learn>=1.2
tensorflow
tensorflow>2.4,<2.16
54 changes: 54 additions & 0 deletions tests/test_layers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Test the custom tensorflow utilities
"""
import os
from tempfile import TemporaryDirectory
import numpy as np
import pytest
import tensorflow as tf
Expand All @@ -13,8 +15,10 @@
SpatioTemporalExpansion,
TileLayer,
FunctionalLayer,
GaussianKernelInit2D,
)
from phygnn.layers.handlers import HiddenLayers, Layers
from phygnn import TfModel


@pytest.mark.parametrize(
Expand Down Expand Up @@ -444,3 +448,53 @@ def test_functional_layer():
with pytest.raises(AssertionError) as excinfo:
FunctionalLayer('bad_arg', 0)
assert "must be one of" in str(excinfo.value)


def test_gaussian_kernel():
"""Test the gaussian kernel initializer for gaussian average pooling"""

kernels = []
biases = []
for stdev in [1, 2]:
kinit = GaussianKernelInit2D(stdev=stdev)
layer = tf.keras.layers.Conv2D(filters=16, kernel_size=5, strides=1,
padding='valid',
kernel_initializer=kinit)
_ = layer(np.ones((24, 100, 100, 35)))
kernel = layer.weights[0].numpy()
bias = layer.weights[1].numpy()
kernels.append(kernel)
biases.append(bias)

assert (kernel[:, :, 0, 0] == kernel[:, :, -1, -1]).all()
assert kernel[:, :, 0, 0].sum() == 1
assert (bias == 0).all()
assert kernel[2, 2, 0, 0] == kernel.max()
assert kernel[0, 0, 0, 0] == kernel.min()
assert kernel[-1, -1, 0, 0] == kernel.min()

assert kernels[1].max() < kernels[0].max()
assert kernels[1].min() > kernels[0].min()

layers = [{'class': 'Conv2D', 'filters': 16, 'kernel_size': 3,
'kernel_initializer': GaussianKernelInit2D(),
'trainable': False}]
model1 = TfModel.build(['a'], ['b'], hidden_layers=layers,
input_layer=False, output_layer=False)
x_in = np.random.uniform(0, 1, (10, 12, 12, 1))
out1 = model1.predict(x_in)
kernel1 = model1.layers[0].weights[0][:, :, 0, 0].numpy()

with TemporaryDirectory() as td:
model_path = os.path.join(td, 'test_model')
model1.save_model(model_path)
model2 = TfModel.load(model_path)

kernel2 = model2.layers[0].weights[0][:, :, 0, 0].numpy()
out2 = model2.predict(x_in)
assert np.allclose(kernel1, kernel2)
assert np.allclose(out1, out2)

layer = model2.layers[0]
x_in = np.random.uniform(0, 1, (10, 24, 24, 1))
_ = model2.predict(x_in)

0 comments on commit 1ce0ef1

Please sign in to comment.