# Coil Normalization and Neural Coil Layer Usage

In [1]:
from coilspy.normalization import CoilNormalizer, segment_time_series
from coilspy.neuralcoil import NeuralCoilLayer

import pandas as pd
import numpy as np

First we can load in some sample data (source: https://www.kaggle.com/datasets/mnassrib/jena-climate)

In [2]:
# Load in csv
df = pd.read_csv(r"../data/jena_climate_2009_2016.csv",
                parse_dates=['Date Time'],
                index_col=['Date Time'])
df.index = pd.to_datetime(df.index, format='%d.%m.%Y %H:%M:%S')

# Save data frame
df_orig = df.copy()
# For these tests we will just use a small slice of the dataset
df = df.iloc[:1000,:]

Then we can create a coil normalizer and create a new normalized dataframe. 

In [3]:
# Instantiate CoilNormalizer
coilnormer = CoilNormalizer()

coilnormed_df = coilnormer.normalize(df)

All the rows of this dataframe should sum to 1. 

In [4]:
coilnormed_df.sum(axis = 1)

Date Time
2009-01-01 00:20:00    1.0
2009-01-01 00:30:00    1.0
2009-01-01 00:40:00    1.0
2009-01-01 00:50:00    1.0
2009-01-01 01:00:00    1.0
                      ... 
2009-01-07 22:00:00    1.0
2009-01-07 22:10:00    1.0
2009-01-07 22:20:00    1.0
2009-01-07 22:30:00    1.0
2009-01-07 22:40:00    1.0
Length: 999, dtype: float64

Of course, we can also reverse normalization to get the original timeseries, provided an initial value. 

In [5]:
# We should be able to take any slice of the coilnormed timeseries and reproduce
start_index = 200
end_index = 300
df_orig_slice = df.iloc[start_index:end_index,:]
coilnormed_df_slice = coilnormed_df.iloc[(start_index):(end_index-1),:]
initial_value_slice = df.iloc[start_index,:]

denormed_slice = coilnormer.denormalize(coilnormed_df_slice,initial_value_slice)

Lets do a quick plot to make sure this looks good.

In [6]:
import plotly.graph_objects as go

# Sample data creation
df1 = denormed_slice
df2 = df_orig_slice

# Plotting
fig = go.Figure()

# Add traces for the first dataframe
for column in df1.columns:
    fig.add_trace(go.Scatter(x=df2.index, y=df1[column], mode='lines', name=f'Denormed: {column}'))

# Add traces for the second dataframe
for column in df2.columns:
    fig.add_trace(go.Scatter(x=df2.index, y=df2[column], mode='lines', name=f'Orig: {column}'))

# Update layout
fig.update_layout(title='Interactive Denormalization Check',
                  xaxis_title='Date',
                  yaxis_title='Value',
                  legend_title='Legend',
                  hovermode='x unified')

# Show the plot
fig.show()


## Neural Coil Layer Testing

We want our layer to predict the next value in a sequence, so let's split our coilnormed dataframe into lookbacks, with the objective being shifted by one step. 

In [7]:
# Neural Coil Testing
import torch

# Generate and segment the time series
series = coilnormed_df.values
length = 36

series_x = series[:-1,]
series_y = series[1:,]

segments_x = segment_time_series(series_x, length)
segments_y = segment_time_series(series_y, length)

# Convert to tensors
segments_tensor_x = torch.tensor(segments_x, dtype=torch.float)
segments_tensor_y = torch.tensor(segments_y, dtype=torch.float)

# Prepare inputs and targets
X = segments_tensor_x.to("cuda")
# Shift segments to the right by one timestep to create the targets
Y =  segments_tensor_y.to("cuda")

# Get number of features and batch size
n_features = X.shape[2]
batch_size = X.shape[0]

print("Available Batches: ", batch_size)

Available Batches:  27


Instead of a single layer, we can also stack coil layers to make a block:

In [8]:
from torch import nn, optim

In [9]:
class CoilBlock(nn.Module):
    def __init__(self, n_features, n_batch, device):
        super(CoilBlock, self).__init__()

        self.coil1 = NeuralCoilLayer(n_features = n_features, n_batch = n_batch, device= device)
        self.coil2 = NeuralCoilLayer(n_features = n_features, n_batch = n_batch, device= device)
        self.coil3 = NeuralCoilLayer(n_features = n_features, n_batch = n_batch, device= device)
    
    def forward(self, x):
        x, transition_tensor = self.coil1(x)
        x, transition_tensor = self.coil2(x)
        #x, transition_tensor = self.coil3(x)
        return x, transition_tensor

However, if we use a block we can't use the 'step_coil' feature that we can use for a single coil layer. 

Let's train a single coil layer on this data. 

In [10]:

# Model

# Single Coil
model = NeuralCoilLayer(
    n_features = n_features,
    n_batch = X.shape[0],
    device="cuda"
).to("cuda")

# Coil Block
# model = CoilBlock(n_features=n_features, n_batch= X.shape[0], device="cuda").to("cuda")

# Loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training loop
epochs = 3000
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    # Forward pass
    outputs, transition_tensor = model(X)
    loss = criterion(outputs, Y)

    # Backward and optimize
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item()}')

Epoch [100/3000], Loss: 3.377693792572245e-05
Epoch [200/3000], Loss: 3.7850804801564664e-05
Epoch [300/3000], Loss: 2.5508190446998924e-05
Epoch [400/3000], Loss: 2.346028122701682e-05
Epoch [500/3000], Loss: 2.2439000531448983e-05
Epoch [600/3000], Loss: 2.0133060388616286e-05
Epoch [700/3000], Loss: 1.765019987942651e-05
Epoch [800/3000], Loss: 1.5537556464551017e-05
Epoch [900/3000], Loss: 1.3185126590542495e-05
Epoch [1000/3000], Loss: 1.1514682228153106e-05
Epoch [1100/3000], Loss: 1.0430933798488695e-05
Epoch [1200/3000], Loss: 9.840651728154626e-06
Epoch [1300/3000], Loss: 8.356259968422819e-06
Epoch [1400/3000], Loss: 7.708109478699043e-06
Epoch [1500/3000], Loss: 7.361285042861709e-06
Epoch [1600/3000], Loss: 6.680213573417859e-06
Epoch [1700/3000], Loss: 6.39645350020146e-06
Epoch [1800/3000], Loss: 6.377490535669494e-06
Epoch [1900/3000], Loss: 5.964533102087444e-06
Epoch [2000/3000], Loss: 5.473972123581916e-06
Epoch [2100/3000], Loss: 5.504981345438864e-06
Epoch [2200/300

Let's do some basic comparisons between our modelled and observed. 

In [11]:
import plotly.graph_objects as go

def plot_model_output_vs_target(model_outputs, targets, batch_index=0, feature_index=0):
    # Extract the specified feature for the given batch from both the model outputs and targets
    model_output_series = model_outputs[batch_index, :, feature_index].detach().numpy()
    target_series = targets[batch_index, :, feature_index].numpy()
    
    # Create a range for the x-axis (timesteps)
    timesteps = list(range(model_output_series.shape[0]))
    
    # Create traces
    model_trace = go.Scatter(x=timesteps, y=model_output_series, mode='lines', name='Model Output')
    target_trace = go.Scatter(x=timesteps, y=target_series, mode='lines', name='Target')
    
    # Create the figure and add traces
    fig = go.Figure()
    fig.add_trace(model_trace)
    fig.add_trace(target_trace)
    
    # Add title and labels
    fig.update_layout(title=f'Model Output vs Target for Feature {feature_index}, Batch {batch_index}',
                      xaxis_title='Timestep',
                      yaxis_title='Value')
    
    # Show the figure
    fig.show()

# Assuming `y` and `Y` are your model outputs and targets, respectively
# Adjust batch_index and feature_index as needed
plot_model_output_vs_target(outputs.to("cpu"), Y.to("cpu"), batch_index=1, feature_index=0)

Because coils are self-perpetuating dynamic systems, we can spin them up and let them run by using the 'step_coil' method. This gives a good look into just how crazy dynamics produces by coils on their own can be. 

In [12]:
states = []
# Select the batch we want to make predictions for
batch = 10


# Grab starting state tensor
state_tensor = X[:,0,:]

# How many steps do we want to run the coil overall?
max_steps = 100

batch_size = X.shape[0]
transition_tensor = torch.softmax(torch.zeros(batch_size, n_features, n_features), dim = 1).to("cuda")
for step_state in range(max_steps):
    state_tensor, transition_tensor = model.step_coil(state_tensor, transition_tensor)
    states.append(state_tensor[batch,:])

# Move state dynamics to CPU
data = [row.to('cpu').detach().numpy() for row in states]
# Transpose the data to get 5 traces
traces = list(zip(*data))


# Create the figure and add traces
fig = go.Figure()

# Plotting
for i, trace in enumerate(traces):
    model_trace = go.Scatter(y=trace, mode='lines', name=f'State {i}')
    fig.add_trace(model_trace)

# Add title and labels
fig.update_layout(title=f'Self-Perpetuating Coil Dynamics',
                    xaxis_title='Timestep',
                    yaxis_title='Value')

# Show the figure
fig.show()

## Coil Predictions vs Observed

In [52]:
states = []
# Select the batch we want to make predictions for
batch = 10

# After what step do we want the coil to start making its own predictions?
prediction_step = 24

# What features do we want to plot?
feature_sel = [0,17]

# Grab starting state tensor
state_tensor = X[:,0,:]

# How many steps do we want to run the coil overall?
max_steps = 70

states.append(state_tensor[batch,feature_sel])
batch_size = X.shape[0]
transition_tensor = torch.softmax(torch.zeros(batch_size, n_features, n_features), dim = 1).to("cuda")
for step_state in range(1,max_steps):
    state_tensor, transition_tensor = model.step_coil(state_tensor, transition_tensor)
    if step_state <= prediction_step:
        state_tensor = X[:,step_state,:]
    states.append(state_tensor[batch,feature_sel])
    
# Move state dynamics to CPU
data = [row.to('cpu').detach().numpy() for row in states]
traces = list(zip(*data))

# Create the figure and add traces
fig = go.Figure()

# Plotting
for i, trace in enumerate(traces):
    model_trace = go.Scatter(y=trace, mode='lines', name=f'Modelled Feature {feature_sel[i]}')
    fig.add_trace(model_trace)
    
    
# Get observed data to CPU and plot
data = X[batch,:,feature_sel].to('cpu')
# Transpose the data to get 5 traces
traces = list(zip(*data))

# Plotting
for i, trace in enumerate(traces):
    obs_trace = go.Scatter(y=trace, mode='lines', name=f'Observed Feature {feature_sel[i]}')
    fig.add_trace(obs_trace)
    
# Add Line highlighting when the coil becomes responsible for its own predictions
fig.add_vline(x=prediction_step, line_width=3, line_dash="dash", line_color="grey", annotation_text='Point of Prediction')    

# Add title and labels
fig.update_layout(title=f'Coil Training Modelled vs Observed Comparison',
                    xaxis_title='Timestep',
                    yaxis_title='Value')

# Show the figure
fig.show()

## Coil Memory Demonstration

In [53]:
import matplotlib.pyplot as plt

states = []
# Select the batch we want to make predictions for
batch = 10

# After what step do we want the coil to start making its own predictions?
prediction_step = 24

# What features do we want to plot?
feature_sel = [0,17]

# Grab starting state tensor
state_tensor = X[:,0,:]

# How many steps do we want to run the coil overall?
max_steps = 70

# Which timestamp do we want to force modify?
step_modify = 12

states.append(state_tensor[batch,feature_sel])
batch_size = X.shape[0]
transition_tensor = torch.softmax(torch.zeros(batch_size, n_features, n_features), dim = 1).to("cuda")
for step_state in range(1,max_steps):
    state_tensor, transition_tensor = model.step_coil(state_tensor, transition_tensor)
    if step_state <= prediction_step:
        state_tensor = X[:,step_state,:]
    states.append(state_tensor[batch,feature_sel])
    

# Get observed data to CPU and plot
data = [row.to('cpu').detach().numpy() for row in states]
# Transpose the data to get 5 traces
traces = list(zip(*data))

# Create the figure and add traces
fig = go.Figure()

# Plotting
for i, trace in enumerate(traces):
    model_trace = go.Scatter(y=trace, mode='lines', name=f'Modelled Feature {feature_sel[i]}')
    fig.add_trace(model_trace)
    
# Try with changing one X in memory
states = []
state_tensor = X[:,0,:]
states.append(state_tensor[batch,feature_sel])
batch_size = X.shape[0]
transition_tensor = torch.softmax(torch.zeros(batch_size, n_features, n_features), dim = 1).to("cuda")
for step_state in range(1,max_steps):
    state_tensor, transition_tensor = model.step_coil(state_tensor, transition_tensor)
    if step_state <= prediction_step:
        state_tensor = X[:,step_state,:]
        
    # change a single state in memory:
    if step_state == step_modify:
        state_tensor = X[:,step_state,:] * 1.1
    states.append(state_tensor[batch,feature_sel])


# Get observed data to CPU and plot
data = [row.to('cpu').detach().numpy() for row in states]
# Transpose the data to get 5 traces
traces = list(zip(*data))

# Plotting
for i, trace in enumerate(traces):
    obs_trace = go.Scatter(y=trace, mode='lines', name=f'Altered Feature {feature_sel[i]}')
    fig.add_trace(obs_trace)
    
# Add Line highlighting when the coil becomes responsible for its own predictions
fig.add_vline(x=prediction_step, line_width=3, line_dash="dash", line_color="grey", annotation_text='Point of Prediction')    

# Add title and labels
fig.update_layout(title=f'Coil Memory Demonstration',
                    xaxis_title='Timestep',
                    yaxis_title='Value')


fig.add_vrect(x0=step_modify-1, x1=step_modify+1, 
              annotation_text="altered time", annotation_position="top left",
              fillcolor="grey", opacity=0.25, line_width=0)

# Show the figure
fig.show()

# Coil Prediction Testing

In [54]:
# Take original timeseries and bring in data we haven't seen before
df_test = df_orig.iloc[:5000,:]

# Take our previously fit coil normalizer and apply it to the new data
coilnormed_df_test = coilnormer.normalize(df_test, fit_change=False)

# Generate and segment the time series
series = coilnormed_df_test.values
length = 36

series_x = series[:-1,]
segments_x = segment_time_series(series_x, length)

# Convert to tensors
segments_tensor_x = torch.tensor(segments_x, dtype=torch.float)

# Prepare inputs and targets
X = segments_tensor_x.to("cuda")

# Get number of features and batch size
n_features = X.shape[2]
batch_size = X.shape[0]

print("Available Batches: ", batch_size)

Available Batches:  138


In [55]:
states = []
# Select the batch we want to make predictions for
batch = 75

# After what step do we want the coil to start making its own predictions?
prediction_step = 24

# What features do we want to plot?
feature_sel = [0,17]

# Grab starting state tensor
state_tensor = X[:,0,:]

# How many steps do we want to run the coil overall?
max_steps = 45

states.append(state_tensor[batch,feature_sel])
batch_size = X.shape[0]
transition_tensor = torch.softmax(torch.zeros(batch_size, n_features, n_features), dim = 1).to("cuda")
for step_state in range(1,max_steps):
    state_tensor, transition_tensor = model.step_coil(state_tensor, transition_tensor)
    if step_state <= prediction_step:
        state_tensor = X[:,step_state,:]
    states.append(state_tensor[batch,feature_sel])
    print(sum(state_tensor[batch,:]))
    
# Move state dynamics to CPU
data = [row.to('cpu').detach().numpy() for row in states]
traces = list(zip(*data))

# Create the figure and add traces
fig = go.Figure()

# Plotting
for i, trace in enumerate(traces):
    model_trace = go.Scatter(y=trace, mode='lines', name=f'Modelled Feature {feature_sel[i]}')
    fig.add_trace(model_trace)
    
    
# Get observed data to CPU and plot
data = X[batch,:,feature_sel].to('cpu')
# Transpose the data to get 5 traces
traces = list(zip(*data))

# Plotting
for i, trace in enumerate(traces):
    obs_trace = go.Scatter(y=trace, mode='lines', name=f'Observed Feature {feature_sel[i]}')
    fig.add_trace(obs_trace)
    
# Add Line highlighting when the coil becomes responsible for its own predictions
fig.add_vline(x=prediction_step, line_width=3, line_dash="dash", line_color="grey", annotation_text='Point of Prediction')    

# Add title and labels
fig.update_layout(title=f'Coil Testing Modelled vs Observed Comparison',
                    xaxis_title='Timestep',
                    yaxis_title='Value')

# Show the figure
fig.show()

tensor(1., device='cuda:0')
tensor(1., device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1., device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1., device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1., device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1., device='cuda:0')
tensor(1.0000, device='cuda:0')
tensor(1., device='cuda:0', grad_fn=<AddBackward0>)
tensor(1., device='cuda:0', grad_fn=<AddBackward0>)
tensor(1.0000, device='cuda:0', grad_fn=<AddBackward0>)
tensor(1.0000, device='cuda:0', grad_fn=<AddBackward0>)
tensor(1.0000, device='cuda:0', grad_fn=

# Denormalization Checks

In [111]:
# We should be able to take any slice of the coilnormed timeseries and reproduce
start_index = 4000
length = 36
end_index = start_index + length
df_test_slice = df_test.iloc[start_index:end_index,:]
coilnormed_df_test_slice = coilnormer.normalize(df_test_slice, fit_change=False)
initial_value_slice = df_test.iloc[start_index,:]

# Generate and segment the time series
series = coilnormed_df_test_slice.values

series_x = series[:-1,]

# Convert to tensors
segments_tensor_x = torch.tensor(series_x, dtype=torch.float).unsqueeze(0)

# Prepare inputs and targets
X = segments_tensor_x.to("cuda")


states = []
# Select the batch we want to make predictions for
batch = 0

# After what step do we want the coil to start making its own predictions?
prediction_step = 24

# Grab starting state tensor
state_tensor = X[:,0,:]

# How many steps do we want to run the coil overall?
max_steps = length - 1

states.append(state_tensor[batch,:])
batch_size = X.shape[0]
transition_tensor = torch.softmax(torch.zeros(batch_size, n_features, n_features), dim = 1).to("cuda")
for step_state in range(1,max_steps):
    state_tensor, transition_tensor = model.step_coil(state_tensor, transition_tensor)
    if step_state <= prediction_step:
        state_tensor = X[:,step_state,:]
    states.append(state_tensor[batch,:])
    
# Move state dynamics to CPU
data = [row.to('cpu').detach().numpy() for row in states]

for index, dat_in in enumerate(data):
    coilnormed_df_test_slice.iloc[index,:] = dat_in
    
denormed_slice = coilnormer.denormalize(coilnormed_df_test_slice,initial_value_slice)


# Sample data creation
df1 = denormed_slice
df2 = df_test_slice

# Plotting
fig = go.Figure()

# Add traces for the first dataframe
for column in df1.columns:
    fig.add_trace(go.Scatter(x=df2.index, y=df1[column], mode='lines', name=f'Denormed: {column}'))

# Add traces for the second dataframe
for column in df2.columns:
    fig.add_trace(go.Scatter(x=df2.index, y=df2[column], mode='lines', name=f'Orig: {column}'))

# Update layout
fig.update_layout(title='Model Denormalized Testing',
                  xaxis_title='Date',
                  yaxis_title='Value',
                  legend_title='Legend',
                  hovermode='x unified')

# Show the plot
fig.show()
