# Using **Quantum** Implementation for Recurrent Neural Network For Predicting the Evolution of A Damped Harmonic Oscillator

In [1]:
import os
import helpers
import numpy as np
from IPython import display
from base64 import b64encode

## Get Data

In [2]:
training, testing = helpers.get_dataset('dataset')
training.head

<bound method NDFrame.head of            time  displacement      velocity  acceleration
0      0.000000  1.000000e+00  0.000000e+00 -1.000000e+00
1      1.507538  1.067009e-01 -9.265856e-01 -1.404238e-02
2      3.015075 -8.471749e-01 -1.118788e-01  8.583629e-01
3      4.522613 -1.940599e-01  7.834100e-01  1.157189e-01
4      6.030151  7.051903e-01  1.908139e-01 -7.242717e-01
..          ...           ...           ...           ...
195  293.969850 -6.679955e-08  4.094007e-07  2.585948e-08
196  295.477400  3.721939e-07  6.650827e-08 -3.788447e-07
197  296.984920  1.005445e-07 -3.442146e-07 -6.612302e-08
198  298.492460 -3.088807e-07 -9.734748e-08  3.186155e-07
199  300.000000 -1.226690e-07  2.854318e-07  9.412586e-08

[200 rows x 4 columns]>

## Visualize Data

In [3]:
anim_vid = 'oscillator_vid.mp4'

if not os.path.exists(anim_vid):
    helpers.create_animation(training, anim_vid)

In [4]:
mp4 = open(anim_vid,'rb').read()
video_url = f"data:video/mp4;base64,{b64encode(mp4).decode()}"
display.HTML(
    """
      <video width=700 controls>
        <source src="%s" type="video/mp4">
      </video>
    """
% video_url)

## Feature Extraction

In [5]:
import params

In [6]:
# df_length = len(training)

# boundary = int(df_length * 0.8)
non_features = ['time', 'velocity', 'acceleration']
train_df = training.drop(non_features, axis = 1)[:80]
val_df = training.drop(non_features, axis = 1)[80:100]
test_df = testing.drop(non_features, axis=1)[:50]

In [7]:
train_ds = helpers.time_window_batch(train_df,params.windon_length,  1)
val_ds = helpers.time_window_batch(val_df,params.windon_length, 1) # validation dataset
test_ds = helpers.time_window_batch(test_df,params.windon_length, 1)

## Training

In [None]:
from model import QuantumModel
import torch.optim as optim
import torch.nn as nn
import torch

from tqdm import tqdm
import matplotlib.pyplot as plt
from torchmetrics import MeanAbsoluteError, MeanSquaredError

In [10]:
model = QuantumModel(params.n_qubits, params.input_size, params.num_hiddens, params.batch_size, params.target_size)
loss_function = nn.MSELoss()
optimizer = optim.RMSprop(model.parameters(), lr=0.01, momentum=0.2)

model.qrnn.classical_layer_in.requires_grad_(False)
model.qrnn.classical_layer_out.requires_grad_(False)

  self._weights.data = torch.tensor(initial_weights, dtype=torch.float)


Linear(in_features=3, out_features=4, bias=False)

In [11]:
history = params.history

for epoch in range(params.n_epochs):
    train_losses = []
    preds = []
    targets = []
    model.train()
    
    for i,X in enumerate(tqdm(train_ds)):
        if i == len(train_ds) - 1:
            break
        
        # Step 1. Remember that Pytorch accumulates gradients.
        # We need to clear them out before each instance
        model.zero_grad()
        
        # Step 3. Run our forward pass.
        X_in=torch.Tensor(X[0]).reshape((params.windon_length, params.batch_size, params.input_size))
        out_scores = model(X_in)[-1]

        # Step 4. Compute the loss, gradients, and update the parameters by
        label = torch.Tensor(np.array([X[1]])).reshape((params.batch_size,1))
        loss = loss_function(out_scores, label)
        loss.backward()
        optimizer.step()
        train_losses.append(float(loss))

        preds.append(torch.Tensor(out_scores.detach()))
        targets.append(label)
        
    valid_losses = []
    model.eval()

    for i,X in enumerate(tqdm(val_ds)):
        if i ==len(val_ds) - 1:
            break
        X_in=torch.Tensor(X[0]).reshape((params.windon_length, params.batch_size, params.input_size))
        try:
            out_scores = model(X_in)[-1]
        except Exception as e:
            print("There was a mistake in the qiskit code: ",e) 
            continue     
        label= torch.Tensor(np.array([X[1]])).reshape((params.batch_size,1))
        loss = loss_function(out_scores,label)
        valid_losses.append(float(loss))

    avg_loss_train = np.mean(train_losses)
    avg_loss_valid = np.mean(valid_losses)
    history['train_loss'].append(avg_loss_train)
    history['valid_loss'].append(avg_loss_valid)
    preds = torch.cat(preds)
    targets = torch.cat(targets)
    mean_abs_error = MeanAbsoluteError()
    mean_sqrt_error = MeanSquaredError()
    mse=mean_sqrt_error(preds, targets)
    history['mse'].append(mse)
    print(f"Epoch {epoch + 1} / {params.n_epochs}: Loss = {avg_loss_train:.3f} Valid Loss: {avg_loss_valid:.3f} MSE = {mse:.4f}")

 99%|█████████▊| 74/75 [27:13<00:22, 22.07s/it]
 93%|█████████▎| 14/15 [00:10<00:00,  1.38it/s]


Epoch 1 / 10: Loss = 0.025 Valid Loss: 0.000 MSE = 0.0249


 45%|████▌     | 34/75 [12:26<15:06, 22.11s/it]

In [None]:
predictions=np.array([])
test_labels=np.array([])
for i,y in enumerate(test_ds):
    test_tensor=torch.Tensor(y[0]).reshape((params.windon_length, params.batch_size, params.input_size))
    try:
        pred=model(test_tensor)[-1]
        predictions=np.append(predictions,pred.detach().numpy())
        test_labels= np.append(test_labels,torch.Tensor([y[1]]).reshape((params.batch_size,1)))
    except: 
        pred=1
        predictions=np.append(predictions,pred)
        test_labels= np.append(test_labels,torch.Tensor([y[1]]).reshape((params.batch_size,1)))
t_predictions=torch.from_numpy(predictions)
t_test_labels=torch.from_numpy(test_labels)


mean_sqrt_error = MeanSquaredError()
mse=mean_sqrt_error(t_predictions, t_test_labels)
print("MSE Test", mse)

In [None]:
plt.plot(np.arange(len(test_df[params.windon_length:])),test_df[params.windon_length:], color='blue', label='Test data')
plt.plot(np.arange(len(predictions)),predictions, color='red', label='Predicted data',linestyle='dashed')