In [None]:
!pip install pandas-ta
!pip install qiskit
!pip install yfinance
#!pip install TA-Lib
!pip install qiskit_machine_learning
!pip install sklearn

In [497]:
#@title Imports
import torch
from torch.autograd import Function
from torchvision import datasets, transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torchsummary import summary

import qiskit
from qiskit import transpile, assemble
from qiskit.visualization import *
from qiskit.circuit.random import random_circuit

import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import pandas_ta as ta
import itertools
import numpy as np
from tqdm.auto import tqdm
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import torch
from torch.utils.data.dataset import Subset
from torch import Tensor
import math
from qiskit.circuit import Parameter
from qiskit.compiler import transpile

In [498]:
#@title Data Processing
def is_sqrt(n):
    if n > 0:
        x = 1 << (n.bit_length() + 1 >> 1)
        while True:
            y = (x + n // x) >> 1
            if y >= x:
                return x**2 == n
            x = y
    elif n == 0:
        return 0
    else:
        raise ValueError("square root not defined for negative numbers")

class IndicatorDataset(Dataset):
  def __init__(self, ticker: str, start_date: str, end_date: str,  device, dataset: pd.DataFrame=None, window_size=11) -> None:  # , training_length:int, forecast_window:int
        self.data = dataset
        #self.scaler = StandardScaler()
        self.transform = MinMaxScaler(feature_range=(-1, 1))

        self.device = device
        self.window_size = window_size
        self.ticker = ticker
        self.start_date = start_date
        self.end_date = end_date
        self.data: pd.DataFrame= self._get_data(self.ticker, self.start_date, self.end_date)# if dataset == None else dataset
        self.labels: np.ndarray = self._create_labels()[int(window_size/2+28):-int(window_size/2+28)]
  
  def _get_data(self, ticker: str, start_date:str, end_date:str):
    df = yf.download(ticker, start_date, end_date)
    length = list({"length": i} for i in range(6,20+1))
    indicators = [{"kind": "rsi"}, {"kind" : "willr"}, {"kind": "fwma"}, {"kind": "ema"}, {"kind": "sma"}, {"kind": "hma"}, {"kind": "tema"}, {"kind": "cci"},
                   {"kind": "cmo"}, {"kind": "inertia"}, {"kind": "pgo"}, {"kind": "roc"}, {"kind": "cmf"}, {"kind": "mom"}, {"kind": "psl"}]
    ta_s = [{**indicator,**length[i]} for indicator, i in itertools.product(indicators, range(len(indicators)))]
    window_size=11
    MyStrategy = ta.Strategy(name="15x15", ta=ta_s)
    df.ta.strategy(MyStrategy) 
    return df

  def _create_labels(self, window_size=11): #df, col_name,
    df = self.data
    col_name = "Close"
    row_counter = 0
    total_rows = len(df)
    labels = np.zeros(total_rows)
    labels[:] = np.nan
    print("Calculating labels")
    pbar = tqdm(total=total_rows)

    while row_counter < total_rows:
      if row_counter >= self.window_size - 1:
          window_begin = row_counter - (self.window_size - 1)
          window_end = row_counter
          window_middle = int((window_begin + window_end) / 2)
          min_ = np.inf
          min_index = -1
          max_ = -np.inf
          max_index = -1
          for i in range(window_begin, window_end + 1):
              price = df.iloc[i][col_name]
              if price < min_:
                  min_ = price
                  min_index = i
              if price > max_:
                  max_ = price
                  max_index = i

          if max_index == window_middle:
              labels[window_middle] = 0
          elif min_index == window_middle:
              labels[window_middle] = 1
          else:
              labels[window_middle] = 2

      row_counter = row_counter + 1
      pbar.update(1)
    pbar.close()
    return labels

  def plot_data(self):
    plt.figure(figsize=(10,7))
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.plot(self.data['Close'],lw=1, label='Close Price')
    #plt.plot(data['Close'],lw=1, label='Close Price')
    idx = 6
    for column in self.data:
      if idx%15 == 6:
        plt.plot(self.data[column], lw=1, label=column)
      idx += 1
    plt.legend()
    plt.show()

  def _transform_features(self) -> np.ndarray:
    # Scale features:
    scaler = self.transform
    scaler.fit(self.data.iloc[int(self.window_size/2+9):-int(self.window_size/2), 6:].values)
    scaled_features = scaler.transform(self.data.iloc[int(self.window_size/2+28):-int(self.window_size/2+28), 6:].values)
    return scaled_features

  def __len__(self) -> int:
        return len(self.labels)
        #return self.data.shape[0]
  
  def __getitem__(self, idx: int) -> Tensor:
    squarable_datapoint = self._transform_features()[idx]
    assert is_sqrt(len(squarable_datapoint)) 
    len_square = int(math.sqrt(len(squarable_datapoint)))
    squared_datapoint = squarable_datapoint
    squared_datapoint = torch.from_numpy(squarable_datapoint).reshape(len_square, len_square)
    label_position = int(self.labels[idx])
    label= np.zeros(3)
    label[label_position] = 1
    label = torch.from_numpy(label)
    #label = torch.tensor(self.labels[idx])

    return squared_datapoint.unsqueeze(0).float(), label.float()


In [539]:
#@title Dataloader
batch_size = 1
shuffle = True
drop_last = True
num_workers = 0
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
train_test_ratio = 0.8
# [FULL_SEQUENCE, NUM_SEQUENCES]
# 30PRI 120 AFTER
full_dataset = IndicatorDataset("GOOGL", "2015-01-01", "2022-09-30", device=device)


train_size = int(train_test_ratio * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    full_dataset, [train_size, test_size]
)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=shuffle,
    drop_last=drop_last,
    num_workers=num_workers,
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=shuffle,
    drop_last=drop_last,
    num_workers=num_workers,
)



[*********************100%***********************]  1 of 1 completed
Calculating labels


  0%|          | 0/1950 [00:00<?, ?it/s]

In [500]:
#@title QuantumCircuit 
class QuantumCircuit1:
    """ 
    This class provides a simple interface for interaction 
    with the quantum circuit 
    """
    
    def __init__(self, n_qubits, backend, shots):
        # --- Circuit definition ---
        self._circuit = qiskit.QuantumCircuit(n_qubits)
        
        all_qubits = [i for i in range(n_qubits)]
        self.theta = qiskit.circuit.Parameter('theta')
        
        self._circuit.h(all_qubits)
        self._circuit.barrier()
        self._circuit.ry(self.theta, all_qubits)
        
        self._circuit.measure_all()
        # ---------------------------

        self.backend = backend
        self.shots = shots
    
    def run(self, thetas):
        t_qc = transpile(self._circuit,
                         self.backend)
        qobj = assemble(t_qc,
                        shots=self.shots,
                        parameter_binds = [{self.theta: theta} for theta in thetas])
        job = self.backend.run(qobj)
        result = job.result().get_counts()
        
        counts = np.array(list(result.values()))
        states = np.array(list(result.keys())).astype(float)
        
        # Compute probabilities for each state
        probabilities = counts / self.shots
        # Get state expectation
        expectation = np.sum(states * probabilities)
        
        return np.array([expectation])

In [501]:
#@title QCircuit
class QuantumVCircuit:
    def __init__(self, kernel_size, backend, shots, threshold):    
      '''
          the quantum circuit class to execute the convolution operation
          args:
              kernel_size   =   size of (same as classical conv) kernel/ the image dimension patch
              backend       =   the quantum computer hardware to use (only simulator is used here)
              shots         =   how many times to run the circuit to get a probability distribution
              threshold     =   the threshold value (0-255) to assign theta
      '''
      # the number of qubits needed
      self.n_qubits = kernel_size ** 2
      # initiate the circuit
      self._circuit = qiskit.QuantumCircuit(self.n_qubits)
      # parameters
      self._params = [Parameter('θ_{}'.format(i)) for i in range(self.n_qubits)]

      for i in range(self.n_qubits):
          self._circuit.rx(self._params[i], i)
      
      self._circuit.barrier()
      # add unitary random circuit
      self._circuit += random_circuit(self.n_qubits, 2)
      self._circuit.measure_all()
      # initialize
      self.backend   = backend
      self.shots     = shots
      self.threshold = threshold

    def run(self, data):
      # reshape input data-> [1, kernel_size, kernel_size] -> [1, self.n_qubits]
      data = torch.reshape(data, (1, self.n_qubits))
      # encoding data to parameters
      thetas = []
      for dat in data:
          theta = []
          for val in dat:
              if val > self.threshold:
                  theta.append(np.pi)
              else:
                  theta.append(0)
          thetas.append(theta)
      # for binding parameters
      param_dict = dict()
      for theta in thetas:
          for i in range(self.n_qubits):
              param_dict[self._params[i]] = theta[i]
      param_binds = [param_dict]

      # execute random quantum circuit
      result = qiskit.execute(self._circuit, 
                              self.backend, 
                              shots = self.shots, 
                              parameter_binds = param_binds).result().get_counts()
          
      # decoding the result
      counts = 0
      for key, val in result.items():
          cnt = sum([int(char) for char in key])
          counts += cnt * val

      # Compute probabilities for each state
      probabilities = counts / (self.shots * self.n_qubits)
      
      return probabilities

In [None]:
#@title Draw Circuits
simulator = qiskit.Aer.get_backend('aer_simulator')

circuit = QuantumCircuit1(1, simulator, 100)
print('Expected value for rotation pi {}'.format(circuit.run([np.pi])[0]))
circuit._circuit.draw()

#backend = qiskit.providers.aer.QasmSimulator(method = "statevector_gpu")
filter_size = 3
circ = QuanvolutionCircuit(filter_size, simulator, 100, 127)
data = torch.tensor([[0, 200], [100, 255]])

#print(data.size())
#print(circ.run(data))

circ._circuit.draw()


In [516]:
#@title Quantum Dense Layer
class HybridFunction(Function):
    """ Hybrid quantum - classical function definition """
    
    @staticmethod
    def forward(ctx, inputs, quantum_circuit, shift):
        """ Forward pass computation """
        ctx.shift = shift
        ctx.quantum_circuit = quantum_circuit

        expectation_z = []
        for input in inputs:
            expectation_z.append(ctx.quantum_circuit.run(input.tolist()))
        result = torch.tensor(expectation_z).cuda()
        
        ctx.save_for_backward(inputs, result)
        return result
        
    @staticmethod
    def backward(ctx, grad_output):
        """ Backward pass computation """
        input, expectation_z = ctx.saved_tensors
        input_list = np.array(input.tolist())
        
        shift_right = input_list + np.ones(input_list.shape) * ctx.shift
        shift_left = input_list - np.ones(input_list.shape) * ctx.shift
        
        gradients = []
        for i in range(len(input_list)):
            expectation_right = ctx.quantum_circuit.run(shift_right[i])
            expectation_left  = ctx.quantum_circuit.run(shift_left[i])
            
            gradient = torch.tensor([expectation_right]).cuda() - torch.tensor([expectation_left]).cuda()
            gradients.append(gradient)
        
        # gradients = np.array([gradients]).T
        gradients = torch.tensor([gradients]).cuda()
        gradients = torch.transpose(gradients, 0, 1)

        # return torch.tensor([gradients]).float() * grad_output.float(), None, None
        return gradients.float() * grad_output.float(), None, None

class Hybrid(nn.Module):
    """ Hybrid quantum - classical layer definition """
    
    def __init__(self, backend, shots, shift):
        super(Hybrid, self).__init__()
        self.quantum_circuit = QuantumCircuit1(1, backend, shots)
        self.shift = shift
        
    def forward(self, input):
        return HybridFunction.apply(input, self.quantum_circuit, self.shift)

In [504]:
#@title Quantum Convolution Layer

#Custom forward/backward pass for Pytorch
class QuanvFunction(Function):
    @staticmethod
    def forward(ctx, inputs, in_channels, out_channels, kernel_size, quantum_circuits, shift, verbose=True):
        """
           input  shape : (batch_size, feature_size, length, length)
           otuput shape : (batch_size, feature_size', length, length')
        """
        ctx.in_channels      = in_channels
        ctx.out_channels     = out_channels
        ctx.kernel_size      = kernel_size
        ctx.quantum_circuits = quantum_circuits
        ctx.shift            = shift

        # adjust length 
        _, _, len_x, len_y = inputs.size()
        len_x = len_x - kernel_size + 1
        len_y = len_y - kernel_size + 1
        
        # this calculates the conv results for nxn patches
        features = []
        ## loop over the images
        for input in inputs:
            feature = []
            ## loop over the circuits
            for circuit in quantum_circuits:
                # save the results
                xys = []
                for x in range(len_x):
                    ys = []
                    for y in range(len_y):
                        # get the patches
                        data = input[0, x:x+kernel_size, y:y+kernel_size]
                        # store the results
                        res=circuit.run(data)
                        ys.append(res)
                    xys.append(ys)
                feature.append(xys)
            features.append(feature)
        # construct the tensor
        result = torch.tensor(features)

        ctx.save_for_backward(inputs, result)
        return result
        
    @staticmethod
    def backward(ctx, grad_output): 
        input, expectation_z = ctx.saved_tensors
        input_list = np.array(input.tolist())
        
        shift_right = input_list + np.ones(input_list.shape) * ctx.shift
        shift_left = input_list - np.ones(input_list.shape) * ctx.shift
        
        gradients = []
        for i in range(len(input_list)):
            expectation_right = ctx.quantum_circuit.run(shift_right[i])
            expectation_left  = ctx.quantum_circuit.run(shift_left[i])
            
            gradient = torch.tensor([expectation_right]) - torch.tensor([expectation_left])
            gradients.append(gradient)
        gradients = np.array([gradients]).T
        return torch.tensor([gradients]).float() * grad_output.float(), None, None


#The actual Quantum convolutional layer
class Quanv(nn.Module):
    def __init__(self, 
                 in_channels, 
                 out_channels, 
                 kernel_size, 
                 backend=qiskit.providers.aer.QasmSimulator(method = "statevector_gpu"), 
                 shots=100, 
                 shift=np.pi/2):
        
        super(Quanv, self).__init__()
        
        self.quantum_circuits = [QuantumVCircuit(kernel_size=kernel_size, 
                                          backend=backend, 
                                          shots=shots, 
                                          threshold=0.5) for i in range(out_channels)]
        self.in_channels  = in_channels
        self.out_channels = out_channels
        self.kernel_size  = kernel_size
        self.shift        = shift
        
    def forward(self, inputs):
        return QuanvFunction.apply(inputs, 
                                   self.in_channels, 
                                   self.out_channels, 
                                   self.kernel_size,
                                   self.quantum_circuits, 
                                   self.shift)

In [513]:
#@title Classical CNN

#CNN with Quantum Fully Connected Layer
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.dropout = nn.Dropout2d()
        self.fc1 = nn.Linear(256, 64)
        self.fc2 = nn.Linear(64, 3)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x.float().cuda()

In [506]:
#@title CNN with Quantum Dense Layer
#CNN with Quantum Fully Connected Layer
class NetQF(nn.Module):
    def __init__(self):
        super(NetQF, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.dropout = nn.Dropout2d()
        self.fc1 = nn.Linear(256, 64)
        self.fc2 = nn.Linear(64, 3)
        self.hybrid = [Hybrid(qiskit.Aer.get_backend('qasm_simulator'), 100, np.pi / 2) for i in range(3)]

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = torch.chunk(x, 3, dim=1)
        x = tuple([hy(x_) for hy, x_ in zip(self.hybrid, x)])
        return torch.cat(x, -1)

In [None]:
#@title CNN with Quantum C

class NetQC(nn.Module):
    def __init__(self):
        super(NetQC, self).__init__()
        self.quanv = Quanv(in_channels=1, out_channels=32, kernel_size=3) 
        self.conv = nn.Conv2d(32, 64, kernel_size=3)
        self.dropout = nn.Dropout2d()
        self.fc1 = nn.Linear(256, 64)
        self.fc2 = nn.Linear(64, 3)

    def forward(self, x):
        x = F.relu(self.quanv(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x)

In [517]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = NetQF().to(device)
summary(model, (1, 15, 15))


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 13, 13]             320
            Conv2d-2             [-1, 64, 4, 4]          18,496
         Dropout2d-3             [-1, 64, 2, 2]               0
            Linear-4                   [-1, 64]          16,448
            Linear-5                    [-1, 3]             195
Total params: 35,459
Trainable params: 35,459
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.05
Params size (MB): 0.14
Estimated Total Size (MB): 0.19
----------------------------------------------------------------


In [None]:
#@title Training

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_func = nn.CrossEntropyLoss()

epochs = 50
loss_list = []
#BUY => 1, SELL => 0, HOLD => 2
model.train()
for epoch in range(epochs):
    total_loss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        optimizer.zero_grad()

        data = data.to(device)
        target = target.to(device)
      
        output = model(data)
        loss = loss_func(output, target)
        loss.backward()
        optimizer.step()
        
        total_loss.append(loss.item())
    loss_list.append(sum(total_loss)/len(total_loss))
    print('Training [{:.0f}%]\tLoss: {:.4f}'.format(
        100. * (epoch + 1) / epochs, loss_list[-1]))
    
path_to_save_model = "/content/"
experiment = 1
torch.save(model.state_dict(),f"{path_to_save_model}best_train_experiment{experiment}.pth")

In [None]:
#@title Plot Training
plt.plot(loss_list)
plt.title('Hybrid NN Training Convergence')
plt.xlabel('Training Iterations')
plt.ylabel('Neg Log Likelihood Loss')


In [None]:
#@title Evaluation
best_model_path = "/content/best_train_experiment1.pth"
model = Net().to(device)
model.load_state_dict(torch.load(best_model_path))
model.eval()
with torch.no_grad():
    
    correct = 0
    for batch_idx, (data, target) in enumerate(test_dataloader):
        data = data.float().cuda()
        target = target.float().cuda()

        output = model(data).cuda()
        pred = output.argmax(dim=1, keepdim=True)
        #print(pred)
 
        #rint(target.argmax(dim=1, keepdim=True))
        correct += pred.eq(target.argmax(dim=1, keepdim=True)).sum().item()
        loss = loss_func(output, target)
        total_loss.append(loss.item())
        
    print('Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'.format(
        sum(total_loss) / len(total_loss),
        correct / len(test_dataloader) * 100 / batch_size)
        )

In [556]:
from sklearn.metrics import confusion_matrix, roc_auc_score, cohen_kappa_score
import seaborn as sns

best_model_path = "/content/best_train_experiment1.pth"
model = torch.load(best_model_path)
model.eval()

predictions = []
true = []
for square, label in test_dataloader:
  predictions.append(model(square.float().to(device)).argmax(dim=1, keepdim=True).item())
  true.append(label.argmax(dim=1, keepdim=True).item())

confusion_matrix(true, predictions)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [576]:
from sklearn.metrics import confusion_matrix, roc_auc_score, cohen_kappa_score
import sklearn

f1_weighted = sklearn.metrics.f1_score(true, predictions, labels=None, 
         average='weighted', sample_weight=None)
print("F1 score (weighted)", f1_weighted)
print("F1 score (micro)", sklearn.metrics.f1_score(true, predictions, labels=None, 
         average='micro', sample_weight=None)) 
print("cohen's Kappa", sklearn.metrics.cohen_kappa_score(true, predictions))

conf_mat = sklearn.metrics.confusion_matrix(true, predictions)
print(conf_mat)


recall = []
for i, row in enumerate(conf_mat):
    recall.append(np.round(row[i]/np.sum(row), 2))
    print("Recall of class {} = {}".format(i, recall[i]))
print("Recall avg", sum(recall)/len(recall))



F1 score (weighted) 0.8144929172075174
F1 score (micro) 0.870026525198939
cohen's Kappa -0.006867607783288765
[[  0   0  31]
 [  0   0  16]
 [  0   2 328]]
Recall of class 0 = 0.0
Recall of class 1 = 0.0
Recall of class 2 = 0.99
Recall avg 0.33
