In [1]:
# @title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<a href="https://colab.research.google.com/github/lmoroney/dlaicourse/blob/master/TensorFlow%20In%20Practice/Course%204%20-%20S%2BP/S%2BP%20Week%201%20-%20Lesson%203%20-%20Notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Best-effort Colab setup cell: run the %tensorflow_version line magic with
# the argument "2.x" and carry on regardless of whether it succeeds.
try:
    # %tensorflow_version only exists in Colab.
    %tensorflow_version 2.x
except Exception:
    # Deliberately ignore any exception raised by the magic so the rest of
    # the notebook still runs.
    pass

In [3]:
import tensorflow as tf

# Show which TensorFlow version this kernel imported.
print(tf.__version__)

2.9.0


The next code block will set up the time series with seasonality, trend and a bit of noise. 

In [4]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
from pathlib import Path
from datetime import datetime
import io

In [5]:
def rm_tree(pth):
    """Recursively delete the directory ``pth`` and everything inside it.

    Symbolic links found inside the tree are removed themselves and never
    followed, so a link pointing at a directory outside the tree cannot cause
    that directory's contents to be deleted, and dangling links are cleaned up
    instead of aborting the walk.

    Args:
        pth: Directory to remove, given as a ``str`` or ``pathlib.Path``.

    Raises:
        OSError: Propagated from ``iterdir``/``unlink``/``rmdir``, for example
            when ``pth`` does not exist or is not a directory.
    """
    pth = Path(pth)
    for child in pth.iterdir():
        # is_dir() follows symlinks, so rule links out explicitly before
        # recursing; anything that is not a real directory is unlinked.
        if child.is_dir() and not child.is_symlink():
            rm_tree(child)
        else:
            child.unlink()
    pth.rmdir()


# Start every run from an empty "logs" directory in the current working
# directory so it only holds the summaries written by this execution.
logdir = Path.cwd() / "logs"
if logdir.exists():
    # Clear out prior logging data.
    rm_tree(str(logdir.resolve()))
logdir.mkdir(parents=True, exist_ok=True)

# Image summaries go to a timestamped directory under logs/plots. No separate
# clean-up of "plots" is needed: logdir was emptied just above, so that
# directory cannot exist at this point.
plotdir = logdir / "plots" / datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(str(plotdir.resolve()))


def plot_to_image(figure):
    """Converts the matplotlib plot specified by 'figure' to a PNG image and
    returns it. The supplied figure is closed and inaccessible after this call.

    Args:
        figure: The matplotlib figure to render.

    Returns:
        The output of ``tf.image.decode_png(..., channels=4)`` with a leading
        batch dimension added by ``tf.expand_dims(image, 0)``.
    """
    # Save the plot to a PNG in memory. Go through the figure object itself:
    # plt.savefig writes whichever figure is "current", which would silently
    # ignore the argument if another figure had been activated in between.
    buf = io.BytesIO()
    figure.savefig(buf, format="png")
    # Closing the figure prevents it from being displayed directly inside
    # the notebook.
    plt.close(figure)
    # Convert PNG buffer to TF image. getvalue() returns the whole buffer
    # regardless of the stream position, so no rewind is required.
    image = tf.image.decode_png(buf.getvalue(), channels=4)
    # Add the batch dimension
    return tf.expand_dims(image, 0)


def plot_series(time, series, format="-", start=0, end=None, series_name=""):
    """Plot ``series`` against ``time`` and write the figure as an image summary.

    Args:
        time: Sequence of x-axis values.
        series: Sequence of y-axis values; sliced with the same bounds as
            ``time``.
        format: Matplotlib format string used for the line.
        start: Index of the first point to draw.
        end: Index one past the last point to draw (``None`` means "to the
            end").
        series_name: Name passed to ``tf.summary.image`` for this plot.
    """
    window = slice(start, end)
    figure, axes = plt.subplots(figsize=(10, 10))
    axes.plot(time[window], series[window], format)
    axes.set_xlabel("Time")
    axes.set_ylabel("Value")
    axes.grid(True)
    with file_writer.as_default():
        tf.summary.image(series_name, plot_to_image(figure), step=0)


def trend(time, slope=0):
    """Return the linear trend ``slope * time``.

    Args:
        time: Scalar or array of time steps.
        slope: Multiplier applied to every time step (default 0, i.e. no
            trend).
    """
    linear_component = slope * time
    return linear_component


def seasonal_pattern(season_time):
    """Just an arbitrary pattern, you can change it if you wish.

    Values of ``season_time`` below 0.4 map to ``cos(2 * pi * season_time)``;
    all other values map to ``1 / exp(3 * season_time)``.
    """
    cosine_branch = np.cos(season_time * 2 * np.pi)
    decay_branch = 1 / np.exp(3 * season_time)
    return np.where(season_time < 0.4, cosine_branch, decay_branch)


def seasonality(time, period, amplitude=1, phase=0):
    """Repeats the same pattern at each period.

    Args:
        time: Time steps to evaluate.
        period: Length of one season, in the same units as ``time``.
        amplitude: Factor applied to the pattern values.
        phase: Offset added to ``time`` before it is wrapped into a season.
    """
    shifted_time = time + phase
    # Position inside the current season, expressed as a fraction of it.
    season_time = (shifted_time % period) / period
    pattern = seasonal_pattern(season_time)
    return amplitude * pattern


def noise(time, noise_level=1, seed=None):
    """Return standard-normal noise, one sample per element of ``time``.

    Args:
        time: Sequence whose length sets the number of samples.
        noise_level: Factor the samples are multiplied by.
        seed: Seed for the ``np.random.RandomState`` generator; a fixed seed
            gives a repeatable sequence.
    """
    generator = np.random.RandomState(seed)
    samples = generator.standard_normal(size=len(time))
    return samples * noise_level

In [6]:
# Parameters of the synthetic series: four years of daily steps plus one.
time = np.arange(4 * 365 + 1, dtype="float32")
baseline = 10
amplitude = 40
slope = 0.05
noise_level = 5

# Create the series
series = (
    baseline + trend(time, slope) + seasonality(time, period=365, amplitude=amplitude)
)
# Update with noise. The in-place add keeps the existing array and its dtype.
series += noise(time, noise_level, seed=42)

plot_series(time, series, series_name="series_with_noise_trend")

Now that we have the time series, let's split it so we can start forecasting.

In [7]:
split_time = 1000

# Everything before split_time is the training period; the rest is validation.
time_train, time_valid = time[:split_time], time[split_time:]
x_train, x_valid = series[:split_time], series[split_time:]

plot_series(time_train, x_train, series_name="series_with_noise_trend_with_split")

plot_series(time_valid, x_valid, series_name="series_with_validation")

# Naive Forecast

In [8]:
# Forecast every validation step with the value observed one step earlier:
# take the series from the last training step up to, but excluding, its
# final element.
last_index = len(series) - 1
naive_forecast = series[split_time - 1 : last_index]

In [9]:
# Line format string passed as the third argument to plt.plot in the cells
# below.
# NOTE(review): this name shadows the built-in format() for the rest of the
# notebook; it is kept because later cells read it.
format = "-"

In [10]:
start, end = 0, None
window = slice(start, end)

# Overlay the naive forecast on the actual validation values.
figure, ax = plt.subplots(figsize=(10, 6))
ax.plot(time_valid[window], x_valid[window], format)
ax.plot(time_valid[window], naive_forecast[window], format)
ax.set_xlabel("Time")
ax.set_ylabel("Value")
ax.grid(True)
with file_writer.as_default():
    tf.summary.image("naive_forecast", plot_to_image(figure), step=0)

Let's zoom in on the start of the validation period:

In [11]:
figure, ax = plt.subplots(figsize=(10, 6))

# Actual values: indices 0..149 of the validation period.
start, end = 0, 150
ax.plot(time_valid[start:end], x_valid[start:end], format)

# Forecast: indices 1..150 of the validation period.
start, end = 1, 151
ax.plot(time_valid[start:end], naive_forecast[start:end], format)

ax.set_xlabel("Time")
ax.set_ylabel("Value")
ax.grid(True)
with file_writer.as_default():
    tf.summary.image("naive_forecast_zoomed_in", plot_to_image(figure), step=0)

You can see that the naive forecast lags 1 step behind the time series.

Now let's compute the mean squared error and the mean absolute error between the forecasts and the actual values in the validation period:

In [12]:
# Score the naive forecast on the validation period: MSE first, then MAE.
for metric in (keras.metrics.mean_squared_error, keras.metrics.mean_absolute_error):
    print(metric(x_valid, naive_forecast).numpy())

61.827534
5.9379086


That's our baseline. Now let's try a moving average:

In [13]:
def moving_average_forecast(series, window_size):
    """Forecasts the mean of the last few values.
    If window_size=1, then this is equivalent to naive forecast.

    Args:
        series: Sequence of values (anything ``np.asarray`` accepts);
            expected to be one-dimensional.
        window_size: Number of consecutive values averaged for each
            forecast. Must be at least 1.

    Returns:
        Array holding ``max(len(series) - window_size, 0)`` means. Element
        ``i`` is the mean of ``series[i : i + window_size]``, i.e. the
        forecast for position ``i + window_size``.

    Raises:
        ValueError: If ``window_size`` is smaller than 1. Such a window would
            otherwise average empty slices and silently return NaNs.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")
    values = np.asarray(series)
    # The loop variable is deliberately not called `time`, so it cannot be
    # confused with the notebook-level `time` array.
    forecast = [
        values[first : first + window_size].mean()
        for first in range(len(values) - window_size)
    ]
    return np.array(forecast)

In [14]:
# 30-step moving average over the full series, trimmed so that its first
# element is the forecast for index split_time.
ma_window = 30
moving_avg = moving_average_forecast(series, ma_window)[split_time - ma_window :]

start, end = 0, None
window = slice(start, end)

figure, ax = plt.subplots(figsize=(10, 6))
ax.plot(time_valid[window], x_valid[window], format)
ax.plot(time_valid[window], moving_avg[window], format)
ax.set_xlabel("Time")
ax.set_ylabel("Value")
ax.grid(True)
with file_writer.as_default():
    tf.summary.image("naive_forecast_moving_averages", plot_to_image(figure), step=0)

In [15]:
# Score the moving-average forecast on the validation period: MSE, then MAE.
for metric in (keras.metrics.mean_squared_error, keras.metrics.mean_absolute_error):
    print(metric(x_valid, moving_avg).numpy())

106.674576
7.142419


That's worse than the naive forecast! The moving average does not anticipate trend or seasonality, so let's try to remove them by using differencing. Since the seasonality period is 365 days, we will subtract the value at time *t* – 365 from the value at time *t*.

In [23]:
# Difference every value against the one a full season (365 steps) earlier.
season_lag = 365
diff_time = time[season_lag:]
diff_series = series[season_lag:] - series[:-season_lag]

plot_series(diff_time, diff_series, series_name="seasonality_removed")

1096 1096 1096 1461


Great, the trend and seasonality seem to be gone, so now we can use the moving average:

In [17]:
# diff_series starts at time index 365, so positions in it are shifted by 365
# relative to the original series.
diff_window = 50
valid_offset = split_time - 365
diff_moving_avg = moving_average_forecast(diff_series, diff_window)[
    valid_offset - diff_window :
]
split_diff = diff_series[valid_offset:]

start, end = 0, None
window = slice(start, end)

figure, ax = plt.subplots(figsize=(10, 6))
ax.plot(time_valid[window], split_diff[window], format)
ax.plot(time_valid[window], diff_moving_avg[window], format)
ax.set_xlabel("Time")
ax.set_ylabel("Value")
ax.grid(True)
with file_writer.as_default():
    tf.summary.image("moving_avg_no_trend", plot_to_image(figure), step=0)

Now let's bring back the trend and seasonality by adding the past values from t – 365:

In [18]:
# Add back the values from one season (365 steps) before each validation step.
past_values = series[split_time - 365 : -365]
diff_moving_avg_plus_past = past_values + diff_moving_avg

start, end = 0, None
window = slice(start, end)

figure, ax = plt.subplots(figsize=(10, 6))
ax.plot(time_valid[window], x_valid[window], format)
ax.plot(time_valid[window], diff_moving_avg_plus_past[window], format)
ax.set_xlabel("Time")
ax.set_ylabel("Value")
ax.grid(True)
with file_writer.as_default():
    tf.summary.image("moving_avg_with_trend", plot_to_image(figure), step=0)

In [19]:
# Score the differenced moving-average forecast: MSE first, then MAE.
past_mse = keras.metrics.mean_squared_error(x_valid, diff_moving_avg_plus_past)
past_mae = keras.metrics.mean_absolute_error(x_valid, diff_moving_avg_plus_past)
print(past_mse.numpy())
print(past_mae.numpy())

52.97366
5.8393106


Better than the naive forecast, good. However, the forecasts look a bit too random because we're just adding past values, which were noisy. Let's use a moving average on past values to remove some of the noise:

In [20]:
# Smooth the values from a season ago with a 10-step moving average before
# adding them back to the differenced forecast.
smooth_past = moving_average_forecast(series[split_time - 370 : -360], 10)
diff_moving_avg_plus_smooth_past = smooth_past + diff_moving_avg

start, end = 0, None
window = slice(start, end)

figure, ax = plt.subplots(figsize=(10, 6))
ax.plot(time_valid[window], x_valid[window], format)
ax.plot(time_valid[window], diff_moving_avg_plus_smooth_past[window], format)
ax.set_xlabel("Time")
ax.set_ylabel("Value")
ax.grid(True)
with file_writer.as_default():
    tf.summary.image("moving_avg_with_smoothened_past", plot_to_image(figure), step=0)

In [21]:
# Score the smoothed seasonal forecast: MSE first, then MAE.
smooth_mse = keras.metrics.mean_squared_error(x_valid, diff_moving_avg_plus_smooth_past)
smooth_mae = keras.metrics.mean_absolute_error(x_valid, diff_moving_avg_plus_smooth_past)
print(smooth_mse.numpy())
print(smooth_mae.numpy())

33.452267
4.569442
