# This is a demo for sfc project
Author: Martin Ptacek
Project variant: Predikce řešená dopřednou backprop. NN
Recurrent networks, 2/29. - Tapped delay line forward feed neural net or TDNN

In [2]:
import numpy as np
import pandas as pd

In [3]:
# load dataset
path = 'dataset_slim.csv'
df = pd.read_csv(path)
print(df.head())

              DateTime  Consumption
0  2019-01-01 00:00:00         6352
1  2019-01-01 01:00:00         6116
2  2019-01-01 02:00:00         5873
3  2019-01-01 03:00:00         5682
4  2019-01-01 04:00:00         5557


In [4]:
# Check for missing values in the data
df.isnull().sum()

DateTime       0
Consumption    0
dtype: int64

In [5]:
# Check for anomalies
df.describe()

Unnamed: 0,Consumption
count,46011.0
mean,6587.61644
std,1043.654923
min,3889.0
25%,5773.0
50%,6552.0
75%,7321.0
max,9615.0


In [6]:
# Get numpy array and normalize it
array = df["Consumption"].to_numpy()
array = array[:(len(array) // 24) * 24]


In [7]:
# downsample to day
factor = 24

# Reshape data to ensure it can be averaged in chunks
downsampled_data = np.mean(array.reshape(-1, factor), axis=1)


# Z-score normalization
std = np.std(downsampled_data)
mean = np.mean(downsampled_data)
normalized_array = (downsampled_data - mean) / std

In [None]:
# model my own

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_deriv(x):
    return x * (1 - x)

# Two layer neural network with one hidden layer
class TDNN(object):

    def __init__(self, delay_line_length, hidden_layer_size, learning_rate):
        self.output_layer_size = 1 # predict one next value from n previous values
        self.delay_line_length = delay_line_length
        self.hidden_layer_size = hidden_layer_size
        self.learning_rate = learning_rate
        
        self.initialize_weights()
        self.initialize_biases()
        
    def initialize_weights(self):
        """Perform a simple random initialization of the weights"""
        self.weights_hidden_layer = np.random.rand(self.delay_line_length, self.hidden_layer_size)
        self.weights_output_layer = np.random.rand(self.hidden_layer_size, self.output_layer_size)
    
    def initialize_biases(self):
        self.bias_hidden_layer = np.random.rand(1, self.hidden_layer_size)
        self.bias_output_layer = np.random.rand(1, self.output_layer_size)
        
    def forward(self, input_data):
        assert len(input_data) == self.delay_line_length, "Input data must have the same length as the delay line"
        
        self.hidden_layer_output = sigmoid(np.dot(input_data, self.weights_hidden_layer) + self.bias_hidden_layer)
        return sigmoid(np.dot(self.hidden_layer_output, self.weights_output_layer) + self.bias_output_layer)
    
    def backward(self, input_data, desired_output, actual_output):
        # TODO copyed, rewrite this thing
        # Calculating the error
        output_error = desired_output - actual_output
        output_delta = output_error * sigmoid_deriv(actual_output)

        # Calculating the hidden layer error
        hidden_error = output_delta.dot(self.weights_output_layer.T)
        hidden_delta = hidden_error * sigmoid_deriv(self.hidden_output)
        
        lr = self.learning_rate

        # Update the weights and biases
        self.weights_hidden_layer += self.hidden_output.T.dot(output_delta) * lr
        self.bias_output_layer += np.sum(output_delta, axis=0, keepdims=True) * lr
        self.weights_hidden_layer += input_data.T.dot(hidden_delta) * lr
        self.bias_hidden_layer += np.sum(hidden_delta, axis=0, keepdims=True) * lr
    
    # TODO
    def predict_next_n_values(self, n, input_data):
        pass
    
    # TODO
    def train(self, timeseries_data):
        pass


In [20]:
class TappedDelayLine:
    def __init__(self, delay_length):
        self.delay_length = delay_length
        self.buffer = np.zeros(delay_length)  # Store past inputs
    
    def process(self, x):
        # Shift the values in the buffer
        self.buffer = np.roll(self.buffer, 1)
        self.buffer[0] = x  # Insert the new sample at the front
        return self.buffer  # Return the entire buffer as input

# Define a simple Neural Network class
class TDLNN:
    def __init__(self, delay_length, input_size, output_size):
        # Create a tapped delay line with a specific delay length
        self.tdl = TappedDelayLine(delay_length)
        self.input_size = input_size
        self.output_size = output_size
        
        # Initialize weights and bias
        self.weights = np.random.randn(delay_length * input_size, output_size)
        self.bias = np.zeros((1, output_size))
    
    def forward(self, x):
        # Process the input through the delay line
        delayed_input = self.tdl.process(x)
        
        # Flatten delayed input to feed into the neural network
        delayed_input_flat = delayed_input.flatten()
        
        # Compute the output of the network
        output = np.dot(delayed_input_flat, self.weights) + self.bias
        return output
    
    def backward(self, x, target, learning_rate=0.01):
        # Forward pass to get prediction
        output = self.forward(x)
        
        # Compute the error
        error = output - target
        
        # Backpropagation: Compute gradients for weights and bias
        delayed_input = self.tdl.buffer
        delayed_input_flat = delayed_input.flatten()
        
        # Gradients for weights and bias
        d_weights = np.outer(delayed_input_flat, error)
        d_bias = error
        
        # Update weights and biases
        self.weights -= learning_rate * d_weights
        self.bias -= learning_rate * d_bias
    
    def train(self, X, Y, epochs=1000, learning_rate=0.1):
        for epoch in range(epochs):
            total_loss = 0
            for i in range(len(X)):
                x = X[i]
                y = Y[i]
                
                output = self.forward(x)
                
                # Compute the loss (Mean Squared Error)
                loss = np.mean((output - y) ** 2)
                total_loss += loss  # Accumulate loss

                self.backward(x, y, learning_rate)
                
            if epoch % 100 == 0:
                avg_loss = total_loss / len(X)
                print(f'Epoch {epoch}/{epochs} complete, avg loss: {avg_loss:.6f}')

In [21]:
# Create a simple dataset for training
# For simplicity, let's train the TDLNN to learn a simple time-series function
# (e.g., a delayed signal) and learn to predict future values.
X = np.random.randn(1000)  # 1000 samples of random input
Y = np.roll(X, -1)  # Target is simply a shifted version of the input signal

# Reshape X and Y to match expected input/output sizes
X = X.reshape(-1, 1)
Y = Y.reshape(-1, 1)

# Initialize and train the TDLNN
delay_length = 5  # Using a delay of 5 time steps
input_size = 1  # Each input is a scalar (in this case)
output_size = 1  # Output is a scalar as well
model = TDLNN(delay_length, input_size, output_size)

# Train the model
model.train(X, Y, epochs=1000, learning_rate=0.01)

# Test the trained model
test_input = np.random.randn(1)
predicted_output = model.forward(test_input)
print(f"Test Input: {test_input}")
print(f"Predicted Output: {predicted_output}")


  self.buffer[0] = x  # Insert the new sample at the front


Epoch 0/1000 complete, avg loss: 11.320745
Epoch 100/1000 complete, avg loss: 11.527012
Epoch 200/1000 complete, avg loss: 11.527012
Epoch 300/1000 complete, avg loss: 11.527012
Epoch 400/1000 complete, avg loss: 11.527012


KeyboardInterrupt: 

In [15]:
delay_line_length = 30 * 12 * 3
model = FeedForwardNeuralNetwork(delay_line_length, 300)
model.train(normalized_array, 1000, 0.01)

ValueError: shapes (32,32) and (1,300) not aligned: 32 (dim 1) != 1 (dim 0)