<a href="https://colab.research.google.com/github/CesarSarmiento1111/MetNumUN2024II/blob/main/LabATQ/strategy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import logging

import xarray as xr  # xarray for data manipulation

import qnt.data as qndata     # functions for loading data
import qnt.backtester as qnbt # built-in backtester
import qnt.ta as qnta         # technical analysis library
import qnt.stats as qnstats   # statistical functions

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

np.seterr(divide = "ignore")

from qnt.ta.macd import macd
from qnt.ta.rsi  import rsi
from qnt.ta.stochastic import stochastic_k, stochastic, slow_stochastic

from sklearn import linear_model
from sklearn.metrics import r2_score
from sklearn.metrics import explained_variance_score
from sklearn.metrics import mean_absolute_error

from sklearn.ensemble import HistGradientBoostingClassifier

In [None]:
stock_data = qndata.stocks_load_spx_data(min_date='2005-06-01')

In [None]:
def get_features(data):
    """Builds the features used for learning:
       * a trend indicator;
       * the moving average convergence divergence;
       * a volatility measure;
       * the stochastic oscillator;
       * the relative strength index;
       * the logarithm of the closing price;
       * Trix indicator.
       * Average True Range (ATR);
       * On-Balance Volume (OBV).
       These features can be modified and new ones can be added easily.
    """

    # trend:
    #trend = qnta.roc(qnta.lwma(data.sel(field="close"), 60), 1)

    # Liquidity filter (1.0 for liquid assets, 0.0 for non-liquid assets):
    liq = data.sel(field="is_liquid").ffill("time").bfill("time").fillna(0)

    # moving average convergence divergence (MACD):
    macd = qnta.macd(data.sel(field="close"))
    macd2_line, macd2_signal, macd2_hist = qnta.macd(data, 12, 26, 9)

    # volatility:
    volatility = qnta.tr(data.sel(field="high"), data.sel(field="low"), data.sel(field="close"))
    volatility = volatility / data.sel(field="close")
    volatility = qnta.lwma(volatility, 14)

    # the stochastic oscillator:
    #k, d = qnta.stochastic(data.sel(field="high"), data.sel(field="low"), data.sel(field="close"), 14)

    # the relative strength index:
    rsi = qnta.rsi(data.sel(field="close"))

    # the logarithm of the closing price:
    #price = data.sel(field="close").ffill("time").bfill("time").fillna(0)  # fill NaN
    #price = np.log(price)

    # new feature: Trix (TRIX)
    trix = qnta.trix(data.sel(field="close"), 15)  # Using a period of 15 (can be adjusted)

    # new feature: Average True Range (ATR)
    atr = qnta.atr(data.sel(field="high"), data.sel(field="low"), data.sel(field="close"), 14).expand_dims(field=["atr"])

    obv = qnta.obv(data.sel(field="close"), data.sel(field="vol")).expand_dims(field=["obv"])


    # combine the features:
    result = xr.concat(
        [macd2_signal.sel(field="close"), volatility, rsi, trix, atr, obv],
        pd.Index(
            ["macd", "volatility", "rsi", "trix", "atr", "obv"],
            name="field"
        )
    )

    # Expand liquidity to match dimensions of result
    liq_expanded = liq.expand_dims(field=result.field)  # Match the "field" dimension

    # Apply liquidity filter (assets with liq == 0 will be excluded)
    result = result.where(liq_expanded > 0.5, drop=True)


    return result.transpose("time", "field", "asset")

In [None]:
# displaying the features:
my_features = get_features(stock_data)
display(my_features.sel(field="trix").to_pandas())

In [None]:
"""

def get_target_classes(data):
    """ Target classes for predicting if price goes up, down, or stays the same."""

    price_current = data.sel(field="close")
    price_future = qnta.shift(price_current, -1).fillna(price_current)  # Manejo de NaNs

    class_buy = 1    # prices goes up
    class_hold = 0   # prices stay the same
    class_sell = -1  # prices go down

    epsilon = 0.1  # Ajusta el umbral según la escala de precios

    # Alinear dimensiones antes de clasificar
    price_current, price_future = xr.align(price_current, price_future, join="inner")

    # Clasificación basada en comparación con epsilon
    target_classes = xr.where(
        price_future > price_current + epsilon, class_buy,
        xr.where(
            price_future < price_current - epsilon, class_sell,
            class_hold
        )
    )

    return target_classes


"""

In [None]:
def get_target_classes(data):
    """Target classes for predicting if price goes up, down, or stays the same."""

    # Calcular la variación porcentual diaria
    price_change_ratio = qnta.change(data.sel(field="close")) / data.sel(field="close")
    future_price_change_ratio = price_change_ratio.shift(time=-1).fillna(0)  # Manejo de NaNs

    # Definir clases
    class_positive = 1  # Price goes up more than move
    class_neutral = 0   # Price does not move more than move
    class_negative = -1 # Price goes down more than move

    # Umbral de movimiento (ajustar según sea necesario)
    move = 0.02  # 1%

    # Clasificar los rendimientos futuros
    target_price = xr.where(
        future_price_change_ratio < -move, class_negative, future_price_change_ratio
    )
    target_price = xr.where(
        future_price_change_ratio > move, class_positive, target_price
    )
    target_price = xr.where(abs(future_price_change_ratio) <= move, class_neutral, target_price)

    return target_price


In [None]:
def get_target_classes(data):
    """ Target classes for predicting if price goes up or down."""

    price_current = data.sel(field="close")
    price_future  = qnta.shift(price_current, -1)

    class_positive = 1 # prices goes up
    class_negative = 0 # price goes down

    target_price_up = xr.where(price_future > price_current, class_positive, class_negative)

    return target_price_up

In [None]:
def get_target_classes(data):
    """Target classes for predicting if price goes down or stays the same/up."""

    price_current = data.sel(field="close")
    price_future  = qnta.shift(price_current, -1)

    class_down  = -1  # price goes down
    class_same  = 0   # price stays the same or goes up

    target_price_up = xr.where(price_future < price_current, class_down, class_same)

    return target_price_up


In [None]:
# displaying the target classes:
my_targetclass = get_target_classes(stock_data)
display(my_targetclass.to_pandas())

In [None]:
def get_model():
    """Constructor para el modelo ML: HistGradientBoostingClassifier."""
    model = HistGradientBoostingClassifier()
    return model

In [None]:
# Create and train the models working on an asset-by-asset basis.

asset_name_all = stock_data.coords["asset"].values

target_assets = set(my_targetclass.coords['asset'].values)
feature_assets = set(my_features.coords['asset'].values)
common_assets = target_assets.intersection(feature_assets)

models = dict()

for asset_name in common_assets:
        target_cur = my_targetclass.sel(asset=asset_name).dropna("time", "any")
        features_cur = my_features.sel(asset=asset_name).dropna("time", "any")

        # align features and targets:
        target_for_learn_df, feature_for_learn_df = xr.align(target_cur, features_cur, join="inner")

        if len(features_cur.time) < 10:
            # not enough points for training
                continue
            # HistGradientBoostingClassifier requires targets as 1D arrays
        target_for_learn = target_for_learn_df.values.ravel()
        features_for_learn = feature_for_learn_df.values

        model = get_model()

        try:
            model.fit(feature_for_learn_df.values, target_for_learn_df)
            models[asset_name] = model

        except:
            logging.exception("model training failed")


In [None]:
# Prediction and generating output weights:
weights = xr.zeros_like(stock_data.sel(field="close"))

for asset_name in asset_name_all:
    if asset_name in models:
        model = models[asset_name]
        features_all = my_features
        features_cur = features_all.sel(asset=asset_name).dropna("time", "any")
        if len(features_cur.time) < 1:
            continue
        try:
            # HistGradientBoostingClassifier outputs probabilities, so we take class probabilities for class 1
            probs = model.predict_proba(features_cur.values)[:, 1]
            weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = probs
        except KeyboardInterrupt as e:
            raise e
        except:
            logging.exception("model prediction failed")

print(weights)

In [None]:
def get_sharpe(stock_data, weights):
    """Calculates the Sharpe ratio"""
    rr = qnstats.calc_relative_return(stock_data, weights)
    sharpe = qnstats.calc_sharpe_ratio_annualized(rr).values[-1]
    return sharpe

sharpe = get_sharpe(stock_data, weights)
sharpe

The sharpe ratio using the method above follows from **forward looking**. Predictions for (let us say) 2017 know about the relation between features and targets in 2020. Let us visualize the results:

In [None]:
import qnt.graph as qngraph

statistics = qnstats.calc_stat(stock_data, weights)

display(statistics.to_pandas().tail())

performance = statistics.to_pandas()["equity"]
qngraph.make_plot_filled(performance.index, performance, name="PnL (Equity)", type="log")

display(statistics[-1:].sel(field = ["sharpe_ratio"]).transpose().to_pandas())

# check for correlations with existing strategies:
qnstats.print_correlation(weights,stock_data)

In [None]:
"""R2 (coefficient of determination) regression score function."""
r2_score(my_targetclass, weights, multioutput="variance_weighted")

In [None]:
"""The explained variance score explains the dispersion of errors of a given dataset"""
explained_variance_score(my_targetclass, weights, multioutput="uniform_average")

In [None]:
"""The explained variance score explains the dispersion of errors of a given dataset"""
mean_absolute_error(my_targetclass, weights)

Let us now use the Quantiacs **backtester** for avoiding **forward looking**.

The backtester performs some transformations: it trains the model on one slice of data (using only data from the past) and predicts the weights for the following slice on a rolling basis:

In [None]:
def train_model(data):
    """Create and train the model working on an asset-by-asset basis."""

    asset_name_all = data.coords["asset"].values
    target_all = get_target_classes(data)
    features_all = get_features(data)

    # Alinear ambos conjuntos de datos para garantizar consistencia
    target_all, features_all = xr.align(target_all, features_all, join="inner")

    models = dict()

    for asset_name in features_all.coords["asset"].values:
        if asset_name not in target_all.coords["asset"].values:
            continue  # Omitir activos que no están presentes en ambos conjuntos
        # Drop missing values:
        target_cur = target_all.sel(asset=asset_name).dropna("time", "any")
        features_cur = features_all.sel(asset=asset_name).dropna("time", "any")

        # Align features and targets:
        target_for_learn_df, feature_for_learn_df = xr.align(target_cur, features_cur, join="inner")

        # Verificar si hay suficientes datos
        if len(features_cur.time) < 10:
            continue

        # Convertir a arrays planos (1D para targets y 2D para features)
        target_for_learn = target_for_learn_df.values.ravel()
        features_for_learn = feature_for_learn_df.values

        model = get_model()

        try:
            model.fit(features_for_learn, target_for_learn)
            models[asset_name] = model

        except Exception:
            logging.exception("model training failed")

    return models


In [None]:
def predict_weights(models, data):
    """The model predicts if the price is going up or down.
       The prediction is performed for several days in order to speed up the evaluation."""

    asset_name_all = data.coords["asset"].values
    weights = xr.zeros_like(data.sel(field="close"))

    for asset_name in asset_name_all:
        if asset_name in models:
            model = models[asset_name]
            features_all = get_features(data)
            features_cur = features_all.sel(asset=asset_name).dropna("time", "any")

            if len(features_cur.time) < 10:
                continue

            try:
                # Obtener probabilidades para la clase 1
                probs = model.predict_proba(features_cur.values)[:, 1]
                weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = probs

            except KeyboardInterrupt as e:
                raise e

            except Exception:
                logging.exception("model prediction failed")

    return weights


In [None]:
# Calculate weights using the backtester:
weights = qnbt.backtest_ml(
    train                         = train_model,
    predict                       = predict_weights,
    train_period                  =  2 *365,  # the data length for training in calendar days
    retrain_interval              = 10 *365,  # how often we have to retrain models (calendar days)
    retrain_interval_after_submit = 1,        # how often retrain models after submission during evaluation (calendar days)
    predict_each_day              = False,    # Is it necessary to call prediction for every day during backtesting?
                                              # Set it to True if you suspect that get_features is looking forward.
    competition_type              = "stocks_s&p500",  # competition type
    lookback_period               = 365,                 # how many calendar days are needed by the predict function to generate the output
    start_date                    = "2005-01-01",        # backtest start date
    analyze                       = True,
    build_plots                   = True  # do you need the chart?
)

The Sharpe ratio is obviously smaller as the training process is not looking forward (as it happens by processing data on a global basis), but performed on a rolling basis.

# May I import libraries?

Yes, please refer to the file **init.ipynb** in your home directory. You can for example use:

! conda install -y scikit-learn

# How to load data?

Daily stock data for the **Q18 Nasdaq-100** contest can be loaded using:
```python
data = qndata.stocks.load_ndx_data(tail = 17*365, dims = ("time", "field", "asset"))
```

Cryptocurrency daily data used for the Q16/Q17 contests can be loaded using:
```python
data = qndata.cryptodaily.load_data(tail = 17*365, dims = ("time", "field", "asset"))
```

Futures data for the Q15 contest can be loaded using:
```python
data= qndata.futures.load_data(tail = 17*365, dims = ("time", "field", "asset"))
```

BTC Futures data for the Q15 contest can be loaded using:
```python
data= qndata.cryptofutures.load_data(tail = 17*365, dims = ("time", "field", "asset"))
```

# How to view a list of all tickers?

```python
data.asset.to_pandas().to_list()
```

# How to see which fields are available?

```python
data.field.to_pandas().to_list()
```

# How to load specific tickers?

```python
data = qndata.stocks.load_ndx_data(tail=17 * 365, assets=["NAS:AAPL", "NAS:AMZN"])
```

# How to select specific tickers after loading all data?

```python
def get_data_filter(data, assets):
    filler= data.sel(asset=assets)
    return filler

get_data_filter(data, ["NAS:AAPL", "NAS:AMZN"])
```

# How to get the prices for the previous day?

```python
qnta.shift(data.sel(field="open"), periods=1)
```

or:

```python
data.sel(field="open").shift(time=1)
```

# How to get the Sharpe ratio?

```python
import qnt.stats as qnstats

def get_sharpe(market_data, weights):
    rr = qnstats.calc_relative_return(market_data, weights)
    sharpe = qnstats.calc_sharpe_ratio_annualized(rr).values[-1]
    return sharpe

sharpe = get_sharpe(data, weights) # weights.sel(time=slice("2006-01-01",None))
```

# How do I get a list of the top 3 assets ranked by Sharpe ratio?

```python
import qnt.stats as qnstats

data = qndata.stocks.load_ndx_data(tail = 17*365, dims = ("time", "field", "asset"))

def get_best_instruments(data, weights, top_size):
    # compute statistics:
    stats_per_asset = qnstats.calc_stat(data, weights, per_asset=True)
    # calculate ranks of assets by "sharpe_ratio":
    ranks = (-stats_per_asset.sel(field="sharpe_ratio")).rank("asset")
    # select top assets by rank "top_period" days ago:
    top_period = 1
    rank = ranks.isel(time=-top_period)
    top = rank.where(rank <= top_size).dropna("asset").asset

    # select top stats:
    top_stats = stats_per_asset.sel(asset=top.values)

    # print results:
    print("SR tail of the top assets:")
    display(top_stats.sel(field="sharpe_ratio").to_pandas().tail())

    print("avg SR = ", top_stats[-top_period:].sel(field="sharpe_ratio").mean("asset")[-1].item())
    display(top_stats)
    return top_stats.coords["asset"].values

get_best_instruments(data, weights, 3)
```

# How can I check the results for only the top 3 assets ranked by Sharpe ratio?

Select the top assets and then load their data:

```python
best_assets= get_best_instruments(data, weights, 3)

data= qndata.stocks.load_ndx_data(tail = 17*365, assets=best_assets)
```

# How can prices be processed?

Simply import standard libraries, for example **numpy**:

```python
import numpy as np

high= np.log(data.sel(field="high"))
```

# How can you reduce slippage impace when trading?

Just apply some technique to reduce turnover:

```python
def get_lower_slippage(weights, rolling_time=6):
    return weights.rolling({"time": rolling_time}).max()

improved_weights = get_lower_slippage(weights, rolling_time=6)
```

# How to use technical analysis indicators?

For available indicators see the source code of the library: /qnt/ta

## ATR

```python
def get_atr(data, days=14):
    high = data.sel(field="high") * 1.0
    low  = data.sel(field="low") * 1.0
    close= data.sel(field="close") * 1.0

    return qnta.atr(high, low, close, days)

atr= get_atr(data, days=14)
```

## EMA

```python
prices= data.sel(field="high")
prices_ema= qnta.ema(prices, 15)
```

## TRIX

```python
prices= data.sel(field="high")
prices_trix= qnta.trix(prices, 15)
```

## ADL and EMA

```python
adl= qnta.ad_line(data.sel(field="close")) * 1.0
adl_ema= qnta.ema(adl, 18)
```

# How can you check the quality of your strategy?

```python
import qnt.output as qnout
qnout.check(weights, data, "stocks_nasdaq100")
```

or

```python
stat= qnstats.calc_stat(data, weights)
display(stat.to_pandas().tail())
```

or

```python
import qnt.graph   as qngraph
statistics= qnstats.calc_stat(data, weights)
display(statistics.to_pandas().tail())

performance= statistics.to_pandas()["equity"]
qngraph.make_plot_filled(performance.index, performance, name="PnL (Equity)", type="log")

display(statistics[-1:].sel(field = ["sharpe_ratio"]).transpose().to_pandas())
qnstats.print_correlation(weights, data)

```

# An example using pandas

One can work with pandas DataFrames at intermediate steps and at the end convert them to xarray data structures:

```python
def get_price_pct_change(prices):
    prices_pandas = prices.to_pandas()
    assets = data.coords["asset"].values
    for asset in assets:
        prices_pandas[asset] = prices_pandas[asset].pct_change()
    return prices_pandas

prices = data.sel(field="close") * 1.0
prices_pct_change = get_price_pct_change(prices).unstack().to_xarray()
```

# How to submit a strategy to the competition?

Check that weights are fine:

```python
import qnt.output as qnout
qnout.check(weights, data, "stocks_nasdaq100")
```

If everything is ok, write the weights to file:

```python
qnout.write(weights)
```

In your **personal account**:

* **choose** a strategy;
* click on the **Submit** button;
* select the type of competition.

At the beginning you will find the strategy under the **Checking** area:

* **Sent strategies** > **Checking**.

If technical checks are successful, the strategy will go under the **Candidates** area:

* **Sent strategies** > **Candidates**.

Otherwise it will be **Filtered**:

* **Sent strategies** > **Filtered**

and you should inspect error and warning messages.

Note that a strategy under the **Candidates** area should have a Sharpe ratio larger than 1 for being eligible for a prize. Please check warning messages in your **Candidates** area!