In [None]:
from datetime import datetime, date, timedelta, time
from dotenv import load_dotenv
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots


from quantrion import settings
from quantrion.asset.alpaca import (
    AlpacaUSStock,
    AlpacaUSStockListProvider,
    AlpacaCrypto,
)
from sklearn.linear_model import LinearRegression

# Load variables from a local .env file into the process environment
# (presumably the credentials the quantrion Alpaca classes read — TODO confirm).
load_dotenv()
# Re-import edited modules automatically before each cell executes, so changes
# to library code are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
# Asset used by the live-data cells below; swap the two lines to study a US
# stock instead of the crypto pair.
# asset = AlpacaUSStock("AAPL")
asset = AlpacaCrypto("BTC/USD")

In [None]:
# Look back three days from the asset's current time.
start = asset.dt.now() - timedelta(days=3)
# Bars and indicators are all requested from `start` at a 3-minute frequency.
df = await asset.bars.get(start, freq="3min")
# n=100 is presumably the moving-average window length — TODO confirm against quantrion.
b_sma = await asset.bars.get_sma(start, freq="3min", n=100)
# Bollinger bands with the library's default settings, unpacked as (lower, middle, upper).
lower, sma, upper = await asset.bars.get_bollinger_bands(start, freq="3min")

In [None]:
# Candlestick chart of the fetched bars with the indicator overlays on top.
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(
    go.Candlestick(
        x=df.index, open=df["open"], high=df["high"], low=df["low"], close=df["close"]
    ),
)
# Name every overlay so the legend is readable (same labels plot_candles uses).
for trace_name, series in (
    ("SMA", sma),
    ("Lower Band", lower),
    ("Upper Band", upper),
    ("Big SMA", b_sma),
):
    fig.add_trace(go.Scatter(x=series.index, y=series, name=trace_name))
fig.update_xaxes(rangeslider_visible=True)
# Only exchange-traded stocks have closed periods worth collapsing. Crypto
# trades around the clock, so hiding weekends and nights would hide real bars.
if isinstance(asset, AlpacaUSStock):
    fig.update_xaxes(
        rangebreaks=[
            dict(bounds=["sat", "mon"]),  # hide weekends, eg. hide sat to before mon
            dict(bounds=[16, 9.5], pattern="hour"),  # hide hours outside of 9.30am-4pm
        ],
    )
fig.show()

In [None]:
# How each column is combined when rows are merged into a wider bar.
_bars_resample_funcs = {
    "open": "first",
    "high": "max",
    "low": "min",
    "close": "last",
    "volume": "sum",
    # "price" holds price * volume at resample time (see below), so summing it
    # and dividing by the summed volume yields a volume-weighted average.
    "price": "sum",
    "n_trades": "sum",
}
# Cost term subtracted per trade in process_bars (units not shown here — TODO confirm).
spread = 0.012
# True: keep only weekday bars between 09:30 and 16:00, and let the plot helpers
# collapse the closed periods.
stock = True
tz = "America/New_York"
bars = pd.read_csv("./files/apple_historical.csv")
# Parse "start" as UTC timestamps, index by it, then convert the index to `tz`.
bars["start"] = pd.DatetimeIndex(pd.to_datetime(bars["start"], utc=True))
bars = bars.set_index("start").tz_convert(tz)
if stock:
    # dayofweek: Monday=0 ... Sunday=6, so <= 4 keeps Monday-Friday.
    bars = bars[bars.index.dayofweek <= 4]
    # Both bounds are inclusive: bars stamped exactly 09:30 or 16:00 are kept.
    bars = bars[(bars.index.time <= time(16, 0)) & (bars.index.time >= time(9, 30))]
# Weight price by volume before resampling ...
bars["price"] = bars["price"] * bars["volume"]
# ... merge into 3-minute bars, dropping bins that have any missing column
# (e.g. bins with no source rows, where first/last/max/min are NaN) ...
bars = bars.resample("3min").aggregate(_bars_resample_funcs).dropna()
# ... and divide by the summed volume to get the volume-weighted price.
# A bin whose summed volume is 0 ends up with a NaN price here (0 / 0).
bars["price"] /= bars["volume"]
_bars_fill_values = {
    "volume": 0,
}
# NOTE(review): after the dropna() above there should be no NaN volume left to
# fill, so this looks like a no-op; the final dropna() removes NaN-price rows.
bars = bars.fillna(_bars_fill_values)
bars = bars.dropna()
bars.info()

In [None]:
# Chronological train/test split at midnight (local time) on 2022-07-01.
# Boolean masks keep the two sets disjoint: label slices (bars[:split_dt] and
# bars[split_dt:]) include both endpoints, so a bar stamped exactly at
# split_dt would otherwise land in the training AND the test set.
split_dt = pd.Timestamp("2022-07-01").tz_localize(tz)
bars_train = bars[bars.index < split_dt]
bars_test = bars[bars.index >= split_dt]


def plot_candles(df):
    """Show a candlestick chart of ``df`` with indicator and position overlays.

    ``df`` must provide the columns "open", "high", "low", "close", "SMA",
    "LB", "UB", "B_SMA", "positions" and "rsi". Price-scaled series are drawn
    on the primary y-axis; positions and the RSI go on the secondary y-axis,
    with the RSI rescaled so that 0 maps to -1 and 100 maps to 1.

    Reads the module-level ``stock`` flag: when true, a range slider is shown
    and weekends / hours outside 9:30am-4pm are collapsed on the x-axis.
    """
    fig = make_subplots(specs=[[{"secondary_y": True}]])
    when = df.index
    candles = go.Candlestick(
        x=when,
        open=df["open"],
        high=df["high"],
        low=df["low"],
        close=df["close"],
    )
    rsi_scaled = 2 * df["rsi"] / 100 - 1
    # (trace, drawn_on_secondary_axis), in drawing order.
    layers = [
        (candles, False),
        (go.Scatter(x=when, y=df["SMA"], name="SMA"), False),
        (go.Scatter(x=when, y=df["LB"], name="Lower Band"), False),
        (go.Scatter(x=when, y=df["UB"], name="Upper Band"), False),
        (go.Scatter(x=when, y=df["positions"], name="Positions"), True),
        (go.Scatter(x=when, y=rsi_scaled, name="RSI"), True),
        (go.Scatter(x=when, y=df["B_SMA"], name="Big SMA"), False),
    ]
    for trace, on_secondary in layers:
        if on_secondary:
            fig.add_trace(trace, secondary_y=True)
        else:
            fig.add_trace(trace)
    if stock:
        closed_periods = [
            {"bounds": ["sat", "mon"]},  # weekends: sat up to (not including) mon
            {"bounds": [16, 9.5], "pattern": "hour"},  # outside 9:30am-4pm
        ]
        fig.update_xaxes(rangeslider_visible=True, rangebreaks=closed_periods)
    # Selects the secondary y-axis; no properties are passed, so this call
    # does not appear to change anything.
    fig.update_yaxes(secondary_y=True)
    fig.show()


def plot_strategy(df):
    """Plot the "strategy" and "benchmark" curves plus the "trades" counter.

    "strategy" and "benchmark" share the primary y-axis; "trades" is drawn on
    the secondary y-axis. Reads the module-level ``stock`` flag: when true, a
    range slider is shown and weekends / hours outside 9:30am-4pm are
    collapsed on the x-axis.
    """
    fig = make_subplots(specs=[[{"secondary_y": True}]])
    # (column, drawn_on_secondary_axis), in drawing order.
    for column, on_secondary in (
        ("strategy", False),
        ("benchmark", False),
        ("trades", True),
    ):
        fig.add_trace(
            go.Scatter(x=df.index, y=df[column], name=column),
            secondary_y=on_secondary,
        )
    if stock:
        closed_periods = [
            {"bounds": ["sat", "mon"]},  # weekends: sat up to (not including) mon
            {"bounds": [16, 9.5], "pattern": "hour"},  # outside 9:30am-4pm
        ]
        fig.update_xaxes(rangeslider_visible=True, rangebreaks=closed_periods)
    fig.show()


def bollinger_positions(bars: pd.DataFrame, sma_lag):
    """Mean-reversion positions from Bollinger bands on the close.

    The bands are the ``sma_lag``-bar rolling mean of "close" plus/minus two
    rolling standard deviations; they are written to ``bars`` in place as the
    columns "LB" and "UB".

    Signal per bar (later rules win over earlier ones):
      * close above the upper band -> -1 (short)
      * close below the lower band -> +1 (long)
      * close crossed the rolling mean since the previous bar -> 0 (flat)
    Bars without a signal keep the previous one.

    Returns a Series on ``bars.index`` holding the signal delayed by one bar,
    with 0 wherever no signal is available yet.
    """
    price = bars["close"]
    window = price.rolling(sma_lag)
    centre = window.mean()
    half_width = 2 * window.std()
    lower = centre - half_width
    upper = centre + half_width
    bars["LB"] = lower
    bars["UB"] = upper

    offset = price - centre
    # A sign change between consecutive offsets means the close crossed the mean.
    crossed_centre = (offset * offset.shift(1) < 0).to_numpy()
    below_band = (price < lower).to_numpy()
    above_band = (price > upper).to_numpy()
    # np.select takes the first matching condition, so the list is ordered by priority.
    raw = np.select(
        [crossed_centre, below_band, above_band],
        [0.0, 1.0, -1.0],
        default=np.nan,
    )
    signal = pd.Series(raw, index=bars.index)
    return signal.ffill().shift(1).fillna(0)


def momentum_positions(bars: pd.DataFrame, sma_lag, big_sma_lag):
    """Trend-following positions from a fast/slow moving-average pair.

    Computes the ``sma_lag``-bar and ``big_sma_lag``-bar rolling means of
    "close" and writes them to ``bars`` in place as "SMA" and "B_SMA".

    Signal per bar: +1 while the fast average is above the slow one, -1 while
    it is below; bars where they are equal or undefined keep the previous
    signal.

    Returns a Series on ``bars.index`` holding the signal delayed by one bar,
    with 0 wherever no signal is available yet.
    """
    price = bars["close"]
    fast = price.rolling(sma_lag).mean()
    slow = price.rolling(big_sma_lag).mean()
    bars["SMA"] = fast
    bars["B_SMA"] = slow

    gap = fast - slow
    raw = np.select(
        [(gap < 0).to_numpy(), (gap > 0).to_numpy()],
        [-1.0, 1.0],
        default=np.nan,
    )
    signal = pd.Series(raw, index=bars.index)
    return signal.ffill().shift(1).fillna(0)


def rsi_positions(bars: pd.DataFrame, sma_lag):
    """Positions derived from an RSI-style oscillator of log returns.

    Gains and losses of the close-to-close log return are each averaged with a
    ``sma_lag``-bar rolling mean followed by an exponentially weighted mean
    (alpha = 1 / sma_lag). The resulting ``100 - 100 / (1 + gain / loss)`` is
    written to ``bars`` in place as the column "rsi".

    Signal per bar, by RSI value:
      * rsi >= 80 or 20 < rsi < 40   -> -1
      * 60 <= rsi < 80 or rsi <= 20  -> +1
      * 40 <= rsi < 60               ->  0
    Bars with an undefined RSI keep the previous signal.

    Returns a Series on ``bars.index`` holding the signal delayed by one bar,
    with 0 wherever no signal is available yet.
    """
    close = bars["close"]
    log_ret: pd.Series = np.log(close / close.shift(1))

    def smooth(series):
        # Rolling mean first, then exponential smoothing of that mean.
        return series.rolling(sma_lag).mean().ewm(alpha=1 / sma_lag).mean()

    gain = smooth((log_ret > 0) * log_ret)
    loss = -smooth((log_ret < 0) * log_ret)
    rsi = 100 - 100 / (1 + gain / loss)
    bars["rsi"] = rsi

    neutral = ((40 <= rsi) & (rsi < 60)).to_numpy()
    go_long = (((60 <= rsi) & (rsi < 80)) | (rsi <= 20)).to_numpy()
    go_short = ((80 <= rsi) | ((20 < rsi) & (rsi <= 40))).to_numpy()
    # First match wins: an RSI of exactly 40 satisfies both the short and the
    # neutral test and must resolve to neutral.
    raw = np.select([neutral, go_long, go_short], [0.0, 1.0, -1.0], default=np.nan)
    signal = pd.Series(raw, index=bars.index)
    return signal.ffill().shift(1).fillna(0)


def process_bars(bars: pd.DataFrame, sma_lag, big_sma_lag):
    """Combine the three indicator signals by majority vote and score the result.

    Works on a copy of ``bars``; the input frame is left untouched. The copy
    receives the indicator columns written by ``bollinger_positions``,
    ``momentum_positions`` and ``rsi_positions``, plus:

      * "positions":    -1 / 0 / +1 whenever more than half of the indicators
                        agree; otherwise the previous position is kept
                        (0 before the first majority).
      * "trades":       running sum of absolute position changes (a direct
                        flip between long and short adds 2).
      * "price_change": close-to-close log return, 0 on the first bar.
      * "profit":       price_change * positions.
      * "strategy":     exp(cumulative profit) minus trades * spread / close.
      * "benchmark":    exp(cumulative price_change) minus spread / close.

    ``spread`` is read from module scope.
    """
    bars = bars.copy()
    votes = pd.concat(
        [
            bollinger_positions(bars, sma_lag),
            momentum_positions(bars, sma_lag, big_sma_lag),
            rsi_positions(bars, sma_lag),
        ],
        axis=1,
    )
    # Strict majority, so at most one side can reach it on any given bar.
    majority = votes.shape[1] // 2 + 1
    for side in (-1, 1, 0):
        agreeing = (votes == side).sum(axis=1)
        bars.loc[agreeing >= majority, "positions"] = side
    bars["positions"] = bars["positions"].ffill().fillna(0)

    held = bars["positions"]
    close = bars["close"]
    bars["trades"] = held.diff().abs().fillna(0).cumsum()
    bars["price_change"] = np.log(close / close.shift(1)).fillna(0)
    bars["profit"] = bars["price_change"] * held
    trading_cost = bars["trades"] * spread / close
    bars["strategy"] = np.exp(bars["profit"].cumsum()) - trading_cost
    bars["benchmark"] = np.exp(bars["price_change"].cumsum()) - spread / close
    return bars

In [None]:
import optuna


def objective(trial):
    """Optuna objective: relative change of the strategy curve on ``bars_train``.

    Samples ``sma_lag`` in [10, 50] and ``big_sma_lag`` in [sma_lag, 500], runs
    ``process_bars`` on the training bars and returns
    ``(last - first) / first`` of the resulting "strategy" column.
    """
    sma_lag = trial.suggest_int("sma_lag", 10, 50)
    # The lower bound depends on the sampled sma_lag, so the slow average is
    # never shorter than the fast one (the two may still coincide).
    big_sma_lag = trial.suggest_int("big_sma_lag", sma_lag, 500)
    df = process_bars(bars_train, sma_lag, big_sma_lag)
    equity = df["strategy"]
    # Use .iloc for positional access: the Series is datetime-indexed, where
    # plain equity[-1] / equity[0] relies on the deprecated fallback that
    # treats integer keys as positions.
    first, last = equity.iloc[0], equity.iloc[-1]
    return (last - first) / first


# Search for the lag pair that maximises the objective on the training bars.
# No sampler seed is passed, so repeated runs may settle on different parameters.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=200)

In [None]:
# Five trials with the highest objective value.
study.trials_dataframe().sort_values("value", ascending=False).head(5)

In [None]:
# Window shown on the candlestick chart.
# Note: `start` and `df` reuse the names of the live-data cells near the top
# of the notebook and overwrite them.
start = pd.Timestamp("2022-08-01T15:25:00").tz_localize(tz)
end = pd.Timestamp("2022-08-04T16:00:00").tz_localize(tz)
# Hand-picked lags (sma_lag=20, big_sma_lag=46) evaluated on the held-out bars.
df = process_bars(bars_test, 20, 46)
# Alternative windows / parameter sets kept for quick switching:
# start = pd.Timestamp("2022-06-16T00:00:00").tz_localize(tz)
# end = pd.Timestamp("2022-06-17T00:00:00").tz_localize(tz)
# df = process_bars(bars_train, 38, 85)
# df = process_bars(bars_test, 36, 298)
# Candles only for the window; the strategy curve for the whole test period.
plot_candles(df[start:end])
plot_strategy(df)


# NOTE(review): `plo` is not defined anywhere in the visible notebook — looks
# like a leftover call to an earlier helper.
# plo(bars_test, 403, "low", "high", True)

In [None]:
# Processed rows inside the inspected window, skipping its first 190 bars.
df[start:end].iloc[190:]

In [None]:
# Same slice taken from the unprocessed bars, for comparison with the processed rows.
bars[start:end].iloc[190:]