In [2]:
!pip list | grep qiskit

qiskit                        0.46.1
qiskit-aer                    0.13.3
qiskit-ibm-runtime            0.20.0
qiskit-terra                  0.46.1


In [1]:
import math
import pdb
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import torchquantum as tq
import qiskit_aer


In [2]:
from torchquantum.plugin.qiskit import tq2qiskit

In [3]:
import torchquantum.functional as tqf
import argparse
import tqdm
import time

import torch
import torch.nn.functional as F
#import torchtext.legacy
#from torchtext.legacy import data, datasets, vocab

In [4]:
class MultiHeadAttentionBase(nn.Module):
    def __init__(self,
                 embed_dim: int,
                 num_heads: int,
                 dropout: float = 0.1,
                 mask=None,
                 use_bias=False):
        super(MultiHeadAttentionBase, self).__init__()

        assert embed_dim % num_heads == 0, f"Embedding dimension ({embed_dim}) should be divisible by number of heads ({num_heads})"

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.d_k = embed_dim // num_heads  # projection dimensions
        self.k_linear = None
        self.q_linear = None
        self.v_linear = None
        self.combine_heads = None
        self.dropout = nn.Dropout(dropout)
        self.attn_weights = None
    
    def separate_heads(self, x):
        '''
        split into N heads
        from (batch_size, seq_len, embed_dim)
        to   (batch_size, seq_len, num_heads, embed_dim)
        then transpose (1,2) to (batch_size, num_heads, seq_len, embed_dim)
        to make mat mult straightforward for each head
        '''
        batch_size = x.size(0)
        x = x.view(batch_size, -1, self.num_heads, self.d_k)
        return x.transpose(1, 2)

    def attention(self, query, key, value, mask=None, dropout=None):
        '''
        Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k))V
        '''
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.d_k)
        # see also: https://tensorchiefs.github.io/dlday2018/tutorial/einsum.html
        #scores = torch.einsum('bijh, bkjh -> bikh', query, key) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1)
            scores = scores.masked_fill(mask == 0, -1e9)
        scores = F.softmax(scores, dim=-1)
        if dropout is not None:
            scores = dropout(scores)
        attn = torch.matmul(scores, value)
        return attn, scores
    
    def downstream(self, query, key, value, batch_size, mask=None):
        Q = self.separate_heads(query)
        K = self.separate_heads(key)
        V = self.separate_heads(value)

        x, self.attn_weights = self.attention(Q, K, V, mask, dropout=self.dropout)

        concat = x.transpose(1, 2).contiguous().view(batch_size, -1, self.embed_dim)

        return concat
        # output = self.combine_heads(concat)
        # return output

   # def forward(self, x, mask=None):
    #    raise NotImplementedError("Base class does not execute forward function.")
        
        


In [5]:
class MultiHeadAttentionClassical(MultiHeadAttentionBase):
    
    def __init__(self, embed_dim: int,
                 num_heads: int,
                 dropout=0.1,
                 mask=None,
                 use_bias=False):
        super(MultiHeadAttentionClassical, self).__init__(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout, mask=mask, use_bias=use_bias)

        self.k_linear = nn.Linear(embed_dim, embed_dim, bias=use_bias)
        self.q_linear = nn.Linear(embed_dim, embed_dim, bias=use_bias)
        self.v_linear = nn.Linear(embed_dim, embed_dim, bias=use_bias)
        self.combine_heads = nn.Linear(embed_dim, embed_dim, bias=use_bias)
    
    def forward(self, x, mask=None):
        batch_size, seq_len, embed_dim = x.size()
        assert embed_dim == self.embed_dim, f"Input embedding ({embed_dim}) does not match layer embedding size ({self.embed_dim})"

        K = self.k_linear(x)
        Q = self.q_linear(x)
        V = self.v_linear(x)

        x = self.downstream(Q, K, V, batch_size, mask)
        output = self.combine_heads(x)
        return output

In [6]:
class QLayer(tq.QuantumModule):
        def __init__(self, n_qbits):
            super().__init__()    
            self.n_wires = n_qbits
            self.encoder = tq.GeneralEncoder(
                    [{'input_idx': [i], 'func': 'rx', 'wires': [i]} for i in range(self.n_wires)])
            self.rx_list = [tq.RX(has_params=True, trainable=True) for _ in range(self.n_wires)]
            self.measure = tq.MeasureAll(tq.PauliZ)
        def forward (self, x, q_device: tq.QuantumDevice):
            #self.q_device = q_device
            self.encoder(q_device, x)
            for k in range(self.n_wires):
                 self.rx_list[k](q_device, wires=k)
            for k in range(self.n_wires):
                if k==self.n_wires-1:
                    tqf.cnot(q_device, wires=[k, 0]) 
                else:
                    tqf.cnot(q_device, wires=[k, k+1])
            q_device = q_device.bfloat16() 
            #print((self.measure(self.q_device))) 
            return(self.measure(q_device))

In [7]:
q_layer = QLayer(4)

In [8]:
class MultiHeadAttentionQuantum(MultiHeadAttentionBase):
    
            
    def __init__(self,
                 embed_dim: int,
                 num_heads: int,
                 dropout=0.1,
                 mask=None,
                 use_bias=False,
                 n_qubits: int = 4,
                 n_qlayers: int = 1,
                 q_device="default.qubit"):
        super(MultiHeadAttentionQuantum, self).__init__(embed_dim, num_heads, dropout=dropout, mask=mask, use_bias=use_bias)
        
        # todo: add intermediate layer to "dress" quantum circuit
        assert n_qubits == embed_dim, "Number of qubits ({n_qubits}) does not match embedding dim ({embed_dim})"
        self.n_qubits = n_qubits
        #self.n_qlayers = n_qlayers
        self.q_layer = QLayer(n_qubits)
        #self.measure = tq.MeasureAll(tq.PauliZ)
        self.q_device = q_device
    def forward(self, x, mask=None):
        batch_size, seq_len, embed_dim = x.size()
        assert embed_dim == self.embed_dim, f"Input embedding ({embed_dim}) does not match layer embedding size ({self.embed_dim})"
        q_dev = tq.QuantumDevice(n_wires=self.n_qubits, device=self.q_device, bsz=x.shape[0])
        K = [self.q_layer(x[:, t, :].clone(),q_dev) for t in range(seq_len)]
        Q = [self.q_layer(x[:, t, :].clone(),q_dev) for t in range(seq_len)]
        V = [self.q_layer(x[:, t, :].clone(),q_dev) for t in range(seq_len)]
        K = torch.Tensor(pad_sequence(K))
        Q = torch.Tensor(pad_sequence(Q))
        V = torch.Tensor(pad_sequence(V))
        x = self.downstream(Q, K, V, batch_size, mask)
        output = [self.q_layer(x[:, t, :],q_dev) for t in range(seq_len)]
        output = torch.Tensor(pad_sequence(output)).clone()
        return output


In [9]:
EMBED_DIM = 8

BATCH_SIZE = 4

In [10]:
classical_module = MultiHeadAttentionClassical(embed_dim=EMBED_DIM, num_heads=4, dropout=0.0)
quantum_module = MultiHeadAttentionQuantum(embed_dim=EMBED_DIM, num_heads=4, dropout=0.0, n_qubits=EMBED_DIM, q_device="cuda")

In [11]:
test_input = torch.tensor(np.random.rand(BATCH_SIZE, 20, EMBED_DIM), dtype=torch.float32)

In [12]:
output = classical_module(test_input)

In [13]:
output.shape

torch.Size([4, 20, 8])

In [14]:
output_q = quantum_module(test_input)

In [15]:
output_q.shape

torch.Size([4, 20, 8])