### Imports

In [1]:
import yfinance as yf
import pandas as pd
import numpy as np

from sklearn.metrics import accuracy_score
from sklearn.preprocessing import MinMaxScaler
import pennylane.numpy as pnp
import pennylane as qml
from pennylane.optimize import AdamOptimizer

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

from pennylane.templates import StronglyEntanglingLayers


### Checking Available Tickers

In [4]:

tickers = [
    "AKBNK.IS", "ARCLK.IS", "DOHOL.IS", "DYHOL.IS", "EREGL.IS", "FINBN.IS", "FORTS.IS",
    "GARAN.IS", "GSDHO.IS", "HURGZ.IS", "ISCTR.IS", "ISGYO.IS", "KCHOL.IS"
]

## -> data range 
start_train = "1998-01-05"
end_test = "2007-08-31"

## downloading
data = yf.download(
    tickers,
    start=start_train,
    end=end_test,
    progress=False
)

# only extracting available data 
available_tickers = []
missing_tickers = []

for ticker in tickers:
    try:
        if not data['Close'][ticker].dropna().empty:
            available_tickers.append(ticker)
        else:
            missing_tickers.append(ticker)
    except KeyError:
        missing_tickers.append(ticker)

available_tickers, missing_tickers


YF.download() has changed argument auto_adjust default to True



4 Failed downloads:
['FINBN.IS', 'DYHOL.IS']: YFPricesMissingError('possibly delisted; no price data found  (1d 1998-01-05 -> 2007-08-31)')
['ISCTR.IS']: YFPricesMissingError('possibly delisted; no price data found  (1d 1998-01-05 -> 2007-08-31) (Yahoo error = "Data doesn\'t exist for startDate = 883951200, endDate = 1188507600")')
['FORTS.IS']: HTTPError('HTTP Error 404: ')


(['AKBNK.IS',
  'ARCLK.IS',
  'DOHOL.IS',
  'EREGL.IS',
  'GARAN.IS',
  'GSDHO.IS',
  'HURGZ.IS',
  'ISGYO.IS',
  'KCHOL.IS'],
 ['DYHOL.IS', 'FINBN.IS', 'FORTS.IS', 'ISCTR.IS'])

### Computing Indicators 

In [5]:
def compute_indicators(df):
    df = df.copy()  # don't wanna mess up original

    # moving averages for trend
    df['MA14'] = df['Close'].rolling(14).mean()
    df['MA37'] = df['Close'].rolling(37).mean()

    # stochastic %K and %D
    low14 = df['Low'].rolling(14).min()
    high14 = df['High'].rolling(14).max()
    df['K14'] = 100 * (df['Close'] - low14) / (high14 - low14)
    df['D3'] = df['K14'].rolling(3).mean()

    # RSI
    delta = df['Close'].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()
    rs = avg_gain / avg_loss
    df['RSI14'] = 100 - (100 / (1 + rs))

    # direction label (up = 1, else 0)
    df['Direction'] = (df['Close'].diff() > 0).astype(int)

    # just drop the NaNs from all the rolling ops
    return df.dropna()

### Hybrid-QNN

Consist of a linear 3-3 layer of pre-processing, a 6 layer QML structure followed by classical MLP. 

In [None]:
def make_hybrid_binary_classifier(feature_dim, n_layers=6, lr=0.2, momentum=0.5):
    """
    Hybrid quantum-classical binary classifier.
    Inputs: feature_dim = number of classical features.
    Uses SGD + MSE, nothing fancy.
    """
    
    dev = qml.device("default.qubit", wires=feature_dim) # we use default.qubit - use lightning.qubit for speedup. 

    #  Defining the quantum circuit QNode - we use torch interface so we can use the torch optimizer
    @qml.qnode(dev, interface="torch")
    def qnode(inputs, weights):
        qml.AngleEmbedding(inputs, wires=range(feature_dim))
        qml.BasicEntanglerLayers(weights, wires=range(feature_dim))
        return [qml.expval(qml.PauliZ(i)) for i in range(feature_dim)]

    # Wraping everything in  Torch layer 
    weight_shapes = {"weights": (n_layers, feature_dim)}
    qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)

    #  We now build the full model using nn.sequential now that we have defined our quantum layer
    # Classical -> Quantum -> Classical -> Sigmoid
    model = nn.Sequential(
        nn.Linear(feature_dim, feature_dim),  # classical prep layer
        qlayer,                               # quantum circuit
        nn.Linear(feature_dim, 1),            # squeeze to 1 output
        nn.Sigmoid()                          # squish to [0,1] for binary
    )

    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum) # to be consistent with the benchmark ANN 

    loss_fn = nn.MSELoss() # again, to be consistent with the benchmark ANN. We did also try BCE but it doesn't really give any appreciable gains. 

    return model, optimizer, loss_fn


In [23]:


if __name__ == "__main__":
    TICKERS     = available_tickers
    START_TRAIN = "1998-01-05"
    END_TRAIN   = "2005-12-29"
    START_TEST  = "2006-01-06"
    END_TEST    = "2007-08-31"
    FEATURES    = ['RSI14', 'K14', 'D3']

    raw = yf.download(TICKERS, start=START_TRAIN, end=END_TEST, progress=False)
    results = []

    for ticker in TICKERS:
        df = pd.DataFrame({
            'Close':  raw['Close'][ticker],
            'High':   raw['High'][ticker],
            'Low':    raw['Low'][ticker],
            'Volume': raw['Volume'][ticker]
        }).dropna()

        df = compute_indicators(df)  # must add 'Direction' column (0/1)

        # Train/test split
        tr = df.index <= END_TRAIN
        te = df.index >= START_TEST

        X_tr = df.loc[tr, FEATURES].values.astype(np.float32)
        y_tr = df.loc[tr, 'Direction'].values.reshape(-1,1).astype(np.float32)
        X_te = df.loc[te, FEATURES].values.astype(np.float32)
        y_te = df.loc[te, 'Direction'].values.reshape(-1,1).astype(np.float32)

        # Scale
        scaler = MinMaxScaler()
        X_tr = scaler.fit_transform(X_tr).astype(np.float32)
        X_te = scaler.transform(X_te).astype(np.float32)

        # DataLoader
        train_ds = TensorDataset(torch.from_numpy(X_tr), torch.from_numpy(y_tr))
        loader   = DataLoader(train_ds, batch_size=16, shuffle=True)

        # Build model with MSE loss
        feature_dim = X_tr.shape[1]
        model, optimizer, loss_fn = make_hybrid_binary_classifier(
            feature_dim, n_layers=6, lr=0.2, momentum=0.5
        )

        # Train
        epochs = 10000
        for epoch in range(epochs):
            model.train()
            running_loss = 0.0
            for xb, yb in loader:
                optimizer.zero_grad()
                preds = model(xb)
                loss  = loss_fn(preds, yb)   # MSE between pred prob and 0/1 label
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * xb.size(0)
            running_loss /= len(loader.dataset)

        # Test
        model.eval()
        with torch.no_grad():
            probs = model(torch.from_numpy(X_te))
        preds = (probs.numpy() > 0.5).astype(int).ravel()
        acc   = accuracy_score(y_te.ravel(), preds)
        print(f"{ticker:7s} → HybridQNN Acc: {acc:.4f}")
        results.append(acc)

    print(f"\nAverage HybridQNN Accuracy: {np.mean(results):.4f}")


AKBNK.IS → HybridQNN Acc: 0.8208
ARCLK.IS → HybridQNN Acc: 0.7972
DOHOL.IS → HybridQNN Acc: 0.8208
EREGL.IS → HybridQNN Acc: 0.5849
GARAN.IS → HybridQNN Acc: 0.8396
GSDHO.IS → HybridQNN Acc: 0.8137
HURGZ.IS → HybridQNN Acc: 0.5896
ISGYO.IS → HybridQNN Acc: 0.8113
KCHOL.IS → HybridQNN Acc: 0.5849


KeyboardInterrupt: 

# QNN Classifier

This QNN classifer was made in pennylane. We did use a torch node also. We will give the code for that also. 

In [12]:
class QNN_StronglyEntanglingClassifier:
    def __init__(self, num_layers, num_qubits, learning_rate=0.01, optimizer="Adam"):
        self.num_layers = num_layers
        self.num_qubits = num_qubits
        self.wires = list(range(num_qubits))  # the total number of wires or qubits 
        
        # param = layers × qubits × 3 because 3 params per qubit in each layer)
        self.num_params = num_layers * num_qubits * 3
        
        # init weights randomly 
        self.weights = 0.01 * pnp.random.randn(num_layers, num_qubits, 3)

        # we use adam optimizer for this. when using torch qnode, we do add SGD for proper comparison. 
        if optimizer == "Adam":
            self.optimizer = AdamOptimizer(learning_rate)
        else:
            raise ValueError(f"not supporting {optimizer} for now")

        self.dev = qml.device("lightning.qubit", wires=self.wires)
        self.qnode = qml.QNode(self._circuit, self.dev, interface="autograd")

        self._X = None
        self._Y = None

    def num_parameters(self):
        return self.num_params

    def encode(self, inputs):
        # classical input → angle for RY gate on each wire
        for i, x in enumerate(inputs):
            angle = (x - 0.5) * pnp.pi  # center around 0 - this is important since we use RY
            qml.RY(angle, wires=self.wires[i])

    def _circuit(self, inputs, weights):
        self.encode(inputs)  # angle embedding
        StronglyEntanglingLayers(weights, wires=self.wires)  # provides more expressibility rather 
                                                             # than making a custom ansatz because pennylane 
                                                             # already adds a lot of circuit depth and complexity
        return qml.expval(qml.PauliZ(self.wires[0]))  # measure Z on first qubit

    def predict(self, inputs, weights=None):
        w = self.weights if weights is None else weights
        out = self.qnode(inputs, w)  
        return (out + 1) / 2  # because expectation is between [-1, 1], we scale to [0, 1] for binary

    def binary_cross_entropy(self, preds, labels):
        # manual BCE, with epsilon for stability
        eps = 1e-10
        preds = pnp.clip(preds, eps, 1 - eps)
        return -pnp.mean(labels * pnp.log(preds) + (1 - labels) * pnp.log(1 - preds))

    def cost(self, weights):
        # forward pass over all inputs
        preds = pnp.stack([self.predict(x, weights) for x in self._X])
        return self.binary_cross_entropy(preds, self._Y)

    def train(self, X, Y, epochs=50):
        self._X = pnp.array(X)
        self._Y = pnp.array(Y)
        w = self.weights

        for e in range(1, epochs + 1):
            w, c = self.optimizer.step_and_cost(self.cost, w)
            grad_norm = pnp.linalg.norm(qml.grad(self.cost)(w))
            print(f"Epoch {e:2d} | BCE Loss: {c:.6f} | ∇ norm: {grad_norm:.2e}")

        self.weights = w  # update internal weights


In [13]:


if __name__ == "__main__":
    TICKERS = available_tickers
    START_TRAIN, END_TRAIN = "1998-01-05", "2005-12-29"
    START_TEST,  END_TEST  = "2006-01-06", "2007-08-31"
    
    FEATURES = ['RSI14','K14','D3']

    raw = yf.download(TICKERS, start=START_TRAIN, end=END_TEST, progress=False)

    results = []
    for ticker in TICKERS:
        df = pd.DataFrame({
            'Close':  raw['Close'][ticker],
            'High':   raw['High'][ticker],
            'Low':    raw['Low'][ticker],
            'Volume': raw['Volume'][ticker]
        }).dropna()
        df = compute_indicators(df)

        tr = df.index <= END_TRAIN
        te = df.index >= START_TEST
        X = df[FEATURES].values
        y = df['Direction'].values

        X_tr, y_tr = X[tr], y[tr]
        X_te, y_te = X[te], y[te]

        scaler = MinMaxScaler()
        X_tr = scaler.fit_transform(X_tr)
        X_te = scaler.transform(X_te)

        qnn = QNN_StronglyEntanglingClassifier(num_layers=3, num_qubits=X_tr.shape[1], learning_rate=0.05)
        qnn.train(X_tr, y_tr, epochs=100)

        probs = [qnn.predict(x) for x in X_te]
        preds = (pnp.array(probs) > 0.5).astype(int).ravel()
        acc   = accuracy_score(y_te, preds)
        print(f"{ticker:7s} → QNN Test accuracy: {acc:.4f}")

        results.append(acc)

    print(f"\nAverage QNN Test Accuracy: {sum(results)/len(results):.4f}")

Epoch  1 | BCE Cost: 1.716128 | ‖grad‖: 1.875e+00
Epoch  2 | BCE Cost: 1.520598 | ‖grad‖: 1.612e+00
Epoch  3 | BCE Cost: 1.338164 | ‖grad‖: 1.345e+00
Epoch  4 | BCE Cost: 1.183580 | ‖grad‖: 1.113e+00
Epoch  5 | BCE Cost: 1.048444 | ‖grad‖: 8.971e-01
Epoch  6 | BCE Cost: 0.937682 | ‖grad‖: 7.017e-01
Epoch  7 | BCE Cost: 0.850962 | ‖grad‖: 5.213e-01
Epoch  8 | BCE Cost: 0.784265 | ‖grad‖: 3.549e-01
Epoch  9 | BCE Cost: 0.737001 | ‖grad‖: 2.068e-01
Epoch 10 | BCE Cost: 0.707949 | ‖grad‖: 8.280e-02
Epoch 11 | BCE Cost: 0.693996 | ‖grad‖: 5.435e-02
Epoch 12 | BCE Cost: 0.691396 | ‖grad‖: 1.308e-01
Epoch 13 | BCE Cost: 0.695925 | ‖grad‖: 1.906e-01
Epoch 14 | BCE Cost: 0.701710 | ‖grad‖: 2.191e-01
Epoch 15 | BCE Cost: 0.700757 | ‖grad‖: 1.971e-01
Epoch 16 | BCE Cost: 0.689592 | ‖grad‖: 1.039e-01
Epoch 17 | BCE Cost: 0.675134 | ‖grad‖: 8.270e-02
Epoch 18 | BCE Cost: 0.670891 | ‖grad‖: 2.090e-01
Epoch 19 | BCE Cost: 0.678215 | ‖grad‖: 2.352e-01
Epoch 20 | BCE Cost: 0.679296 | ‖grad‖: 1.863e-01


### Torch QNN with no classical preprocessing 

As stated before, we create a qnode using torch interface. Just train it using the above train pipeline. The results remain almost the same.  

In [None]:
def make_pure_qnn_classifier(feature_dim, n_layers=6, lr=0.2, momentum=0.5):
    dev = qml.device("lightning.qubit", wires=feature_dim)

    @qml.qnode(dev, interface="torch")
    def qnode(inputs, weights):
        qml.AngleEmbedding(inputs, wires=range(feature_dim))
        qml.BasicEntanglerLayers(weights, wires=range(feature_dim))
        return qml.expval(qml.PauliZ(0))  # raw ∈ [-1,+1]

    weight_shapes = {"weights": (n_layers, feature_dim)}
    qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)

    # Model is just the QLayer (output ∈[-1,1])
    model     = qlayer
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    loss_fn   = nn.MSELoss()  # between raw output and {-1,+1} labels

    return model, optimizer, loss_fn

### Torch QNN with classical preprocessing

This gives better result that just torch QNN - however, hybridQNN performs the best. 

In [None]:
def make_pure_qnn_classifier(feature_dim, n_layers=6, lr=0.2, momentum=0.5):
    dev = qml.device("lightning.qubit", wires=feature_dim)

    # QNode now returns one PauliZ per wire
    @qml.qnode(dev, interface="torch")
    def qnode(inputs, weights):
        qml.AngleEmbedding(inputs, wires=range(feature_dim))
        qml.StronglyEntanglingLayers(weights, wires=range(feature_dim))
        return [qml.expval(qml.PauliZ(i)) for i in range(feature_dim)]

    weight_shapes = {"weights": (n_layers, feature_dim, 3)}
    qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)

    class MultiQubitMSEClassifier(nn.Module):
        def __init__(self):
            super().__init__()
            self.qlayer = qlayer # collapse feature_dim quantum features → 1 output
            self.head = nn.Linear(feature_dim, 1)

        def forward(self, x):
            # x: [batch, feature_dim]
            exp_vals = self.qlayer(x)            # → [batch, feature_dim]
            out      = self.head(exp_vals).squeeze(-1)  
            return torch.sigmoid(out)               # → [batch]

    model    = MultiQubitMSEClassifier()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    loss_fn   = nn.MSELoss()                  # expects targets in [0, +1]

    return model, optimizer, loss_fn