# Decoding Market Signals: Candlestick patterns

## Load libraries
First, we load the dependencies used throughout the notebook, importing each under its conventional alias for easier handling.

In [None]:
import multiprocessing
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import talib
from scipy.stats import norm
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV
from sklearn.feature_selection import RFECV
from IPython.display import Image

pd.set_option("display.max_columns", None)
%load_ext autoreload
%autoreload 2

# What is a candlestick and what are candlestick patterns?
This section briefly describes candlesticks, a graphical representation of price data, to bring everyone up to speed.

## Anatomy of a candlestick (aka Japanese candlestick)
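
A candlestick condenses one trading period into four prices: open, high, low and close. As a minimal sketch (the `Candle` class, its property names and the prices are illustrative assumptions, not part of this notebook or of TA-Lib), the body and the two shadows follow directly from those four numbers:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candle:
    """One period of OHLC price data (hypothetical helper for illustration)."""

    open: float
    high: float
    low: float
    close: float

    @property
    def is_bullish(self) -> bool:
        # The close is above the open: the price rose over the period.
        return self.close > self.open

    @property
    def body(self) -> float:
        # Absolute distance between open and close.
        return abs(self.close - self.open)

    @property
    def upper_shadow(self) -> float:
        # Distance from the top of the body to the period high.
        return self.high - max(self.open, self.close)

    @property
    def lower_shadow(self) -> float:
        # Distance from the period low to the bottom of the body.
        return min(self.open, self.close) - self.low


# Made-up prices for a single day.
candle = Candle(open=100.0, high=106.0, low=99.0, close=104.0)
print(candle.is_bullish, candle.body, candle.upper_shadow, candle.lower_shadow)
```

Candlestick patterns are rules over the relative sizes of these quantities across one or more consecutive candles.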

In [None]:
# Relative path, following the same layout as the data files under ./../data/
Image(filename="./../figures/cs_anatomy.png")

## Candlestick patterns

In [None]:
# Relative path, following the same layout as the data files under ./../data/
Image(filename="./../figures/cs_prediction.png")

In [None]:
pd.read_csv("./../data/MSFT.csv")

# Loading and pre-processing the data

In [None]:
# what columns are available from the Wharton CRSP data?
pd.read_csv("./../data/MSFT.csv").columns

We can use a technique called `method chaining` to pipe multiple processing steps together in Pandas. This offers two advantages:
 - The chain starts from the raw data as its input. If the data is small enough to be processed in memory, we do not need to store intermediate results on disk, which would obscure the processing steps and be less efficient.
 - Increased code readability and transparency for debugging.

In [None]:
df = (
    pd.read_csv(
        "./../data/MSFT.csv",
        usecols=["date", "OPENPRC", "ASKHI", "BIDLO", "PRC", "VOL"],
    )
    .rename(
        columns={
            "OPENPRC": "open",
            "ASKHI": "high",
            "BIDLO": "low",
            "PRC": "close",
            "VOL": "vol",
        }
    )
    .dropna()
    .reset_index()
    .assign(
        date=lambda df: pd.to_datetime(df["date"], format="%Y-%m-%d"),
        vol=lambda df: df["vol"].astype(int),
        intraday_return=lambda df: (df["close"] - df["open"]) / df["open"],
        sign_intraday_return=lambda df: np.sign(df["intraday_return"]).astype(int),
    )[
        [
            "date",
            "open",
            "high",
            "low",
            "close",
            "vol",
            "intraday_return",
            "sign_intraday_return",
        ]
    ]
)

df["log_return"] = np.log(df["close"] / df["close"].shift(1))
df["next_intraday_return"] = df["intraday_return"].shift(-1)
df["sign_next_day_return"] = df["sign_intraday_return"].shift(-1).astype("Int64")

# df['previous_day_return'] = df['sign_intraday_return'].shift(+1)#.dropna().astype(int)
# df['5_days_mean_return'] = df['log_return'].rolling(window=5, min_periods=1).mean().shift(-5)  # Idea: Compute 5 days average ahead returns one day after the signal. using pd.rolling and pd.shift.
df = df.dropna()
# df['previous_day_return'] = df['previous_day_return'].astype(int)
df.reset_index(drop=True, inplace=True)
df.head()

# What candlestick patterns are available?

In [None]:
candle_names = talib.get_function_groups()["Pattern Recognition"]
candle_names[:5]

In [None]:
list_of_results_df = []
# for ListingId in df['ListingId'].unique():
#     equity_df = df.loc[df['ListingId']==ListingId].reset_index()

for candle in candle_names:
    tds = df.copy()
    tds["cs_pattern"] = getattr(talib, candle)(
        df["open"], df["high"], df["low"], df["close"]
    )
    tds["candle"] = candle
    list_of_results_df.append(tds.set_index(["candle", "date"]))

result_df = pd.concat(list_of_results_df)
result_df.head()

In [None]:
# we can query a CDL pattern from a multi-index data frame like so:
result_df.loc["CDL2CROWS"]

In [None]:
cs_signals_df = result_df[result_df["cs_pattern"] != 0]
cs_signals_df.head()

# James' suggestion regarding returns:
# relative difference between today's return and next day's return
# (next days value - current days value) /  current days value

## Measuring signal performance with a confusion matrix

A confusion matrix is a table that quantifies and summarizes the performance of a classification algorithm. Here, the classifier is the candlestick signal and the outcome is the direction of the next day's return.

The confusion matrix consists of four counts from which the performance metrics of the classifier are derived:

- i) TP (True Positive): the number of days on which the signal fired and the market subsequently moved in the predicted direction, meaning the trade made a profit.

- ii) TN (True Negative): the number of correctly classified days on which the trading algorithm is not supposed to act, because the signal did not fire and the market indeed did not move in the signal's direction.

- iii) FP (False Positive): the number of misclassified days, i.e. we enter a trade but the market in fact moves in the opposite direction to the one our signal predicts. FP is also known as a Type I error.

- iv) FN (False Negative): the number of trading days that were not picked up by the signal although the market did, in fact, move in the direction the signal is meant to indicate. It represents a missed opportunity. FN is also known as a Type II error.

- The performance metrics of an algorithm are accuracy, precision, recall, and F1 score, all of which are calculated from TP, TN, FP, and FN.

- Accuracy is the ratio of correctly classified trading days (TP+TN) to the total number of trading days (TP+TN+FP+FN).

- Precision is the ratio of signal days on which we make a profit (TP) to the total number of days on which the signal fired (TP+FP).

- Recall is the ratio of TP to the total number of trading days on which the market actually moved in our favour (TP+FN). It answers the question: of all the days that moved in the signal's direction, how many did the signal pick up? Recall is also called sensitivity.

- F1 score, also known as the F-measure, is the harmonic mean of precision and recall, $F_1 = 2 \cdot \text{precision} \cdot \text{recall} / (\text{precision} + \text{recall})$, and so balances the two.
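
The four metrics above can be sketched in a few lines. The function name `classification_metrics` and the counts in the example are made up for illustration; the notebook itself only computes precision, because true and false negatives are not meaningful when a pattern rarely fires.

```python
def classification_metrics(tp: int, fp: int, tn: int, fn: int) -> dict:
    """Compute accuracy, precision, recall and F1 from confusion-matrix counts."""
    total = tp + fp + tn + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    # F1 is the harmonic mean of precision and recall.
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Illustrative counts only, not results from the MSFT data.
metrics = classification_metrics(tp=60, fp=40, tn=50, fn=50)
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```

Each ratio falls back to 0.0 when its denominator is zero, which mirrors the `fillna(0)` step used further below.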

For further information on confusion matrices, see:

https://www.kdnuggets.com/2020/01/guide-precision-recall-confusion-matrix.html

https://towardsdatascience.com/performance-metrics-confusion-matrix-precision-recall-and-f1-score-a8fe076a2262

The Wilson score interval for a proportion is given by:

$$
\text{Center} = \frac{\hat{p} + \frac{z^2}{2n}}{1 + \frac{z^2}{n}}
$$

$$
\text{Margin} = \frac{z \sqrt{\frac{\hat{p}(1 - \hat{p})}{n} + \frac{z^2}{4n^2}}}{1 + \frac{z^2}{n}}
$$

$$
\text{CI Lower Bound} = \text{Center} - \text{Margin}
$$

$$
\text{CI Upper Bound} = \text{Center} + \text{Margin}
$$

where:
- $\hat{p}$ is the observed proportion (e.g., precision)
- $z$ is the z-score corresponding to the desired confidence level (1.96 for 95% confidence)
- $n$ is the total number of instances (e.g., TP + FP)

The Wilson score interval is useful when the sample size is small or when the proportion is near 0 or 1. Unlike the normal approximation interval, the Wilson interval does not produce probabilities outside the [0, 1] range and adjusts the observed proportion by a factor related to the confidence level and sample size, providing a more accurate confidence interval for proportions.
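
As a minimal sketch of the formulas above, the interval can be written as a small standalone function. The name `wilson_interval` is an assumption for illustration, and the standard library's `statistics.NormalDist` is used here for the z-score instead of `scipy.stats.norm`, which the notebook uses.

```python
from math import sqrt
from statistics import NormalDist
from typing import Tuple


def wilson_interval(p_hat: float, n: int, confidence: float = 0.95) -> Tuple[float, float]:
    """Return the (lower, upper) Wilson score interval for a proportion."""
    if n <= 0:
        raise ValueError("n must be a positive number of instances")
    # Two-sided z-score, e.g. about 1.96 for 95% confidence.
    z = NormalDist().inv_cdf(1 - (1 - confidence) / 2)
    denominator = 1 + z**2 / n
    center = (p_hat + z**2 / (2 * n)) / denominator
    margin = z * sqrt(p_hat * (1 - p_hat) / n + z**2 / (4 * n**2)) / denominator
    return center - margin, center + margin


# Illustrative input: a precision of 50% observed over 100 signals.
lower, upper = wilson_interval(p_hat=0.5, n=100)
print(f"95% Wilson interval: [{lower:.3f}, {upper:.3f}]")
```

For a pattern that fired only a handful of times, the interval becomes wide, which is why the bar chart further below shows the interval alongside each estimate.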

In [None]:
cs_signals_df = cs_signals_df.copy()


# Calculate TP, FP, TN, FN for each CS pattern
cs_signals_df["TP"] = (
    (
        np.sign(cs_signals_df["cs_pattern"])
        == np.sign(cs_signals_df["sign_next_day_return"])
    )
    & (cs_signals_df["cs_pattern"] != 0)
).astype(int)

cs_signals_df["FP"] = (
    (
        np.sign(cs_signals_df["cs_pattern"])
        != np.sign(cs_signals_df["sign_next_day_return"])
    )
    & (cs_signals_df["cs_pattern"] != 0)
).astype(int)

# Does not really make sense, as most of the time the cs patterns dont appear
# cs_signals_df['TN'] = ((cs_signals_df['cs_pattern'] == 0) & (cs_signals_df['sign_next_day_return'] == 0)).astype(int)
# cs_signals_df['FN'] = ((cs_signals_df['cs_pattern'] == 0) & (cs_signals_df['sign_next_day_return'] != 0)).astype(int)

# Group by the 'candle' level of the index (which represents the candlestick pattern)
# and sum the TP and FP counts for each pattern
performance_metrics = cs_signals_df.groupby("candle").agg(
    {
        "TP": "sum",
        "FP": "sum",
        # 'TN': 'sum',
        # 'FN': 'sum'
    }
)
performance_metrics["total_instances"] = (
    performance_metrics["TP"] + performance_metrics["FP"]
)

# Calculate the performance metrics for each candlestick pattern
performance_metrics["precision"] = performance_metrics["TP"] / (
    performance_metrics["TP"] + performance_metrics["FP"]
)
# performance_metrics['Recall'] = performance_metrics['TP'] / (performance_metrics['TP'] + performance_metrics['FN'])
# performance_metrics['Accuracy'] = (performance_metrics['TP'] + performance_metrics['TN']) / (performance_metrics['TP'] + performance_metrics['FP'] + performance_metrics['TN'] + performance_metrics['FN'])
# performance_metrics['F1 Score'] = 2 * (performance_metrics['Precision'] * performance_metrics['Recall']) / (performance_metrics['Precision'] + performance_metrics['Recall'])

# Replace any NaN values with 0
performance_metrics.fillna(0, inplace=True)


# Calculate the Wilson score interval for each row

z = norm.ppf(0.975)
# performance_metrics['precision_lower'] = 0
# performance_metrics['precision_upper'] = 0
for index, row in performance_metrics.iterrows():
    p_hat = row["precision"]  # The observed proportion (precision)
    n = row["total_instances"]  # The total instances (TP + FP)
    denominator = 1 + z**2 / n
    center = (p_hat + z**2 / (2 * n)) / denominator
    margin = (z * np.sqrt((p_hat * (1 - p_hat) / n) + z**2 / (4 * n**2))) / denominator
    #     performance_metrics.at[index, 'precision_lower'] = center - margin  # somehow this does not work
    #     performance_metrics.at[index, 'precision_upper'] = center + margin
    performance_metrics.at[index, "center"] = center
    performance_metrics.at[index, "margin"] = margin


performance_metrics["ci_upper"] = (
    performance_metrics["center"] + performance_metrics["margin"]
)
performance_metrics["ci_lower"] = (
    performance_metrics["center"] - performance_metrics["margin"]
)
performance_metrics["TP_wilson"] = performance_metrics["center"]


# Sort the performance DataFrame by 'total_instances', then by 'TP', in descending order
performance_metrics = performance_metrics.sort_values(
    by=["total_instances", "TP"], ascending=False
)
performance_metrics

In [None]:
def plot_cs_performance(
    df: pd.DataFrame,
    criterion: str = "total_instances",
    plot_performance: bool = False,
    title_suffix: str = "",
) -> None:
    fig = px.bar(
        df,
        x=df.index,
        y=criterion,
        barmode="group",
    )

    fig.update_layout(
        font={"size": 18},
        legend={
            "yanchor": "top",
            "y": 0.99,
            "xanchor": "right",
            "x": 0.99,
            "font": {"size": 20},
        },
        title=f"{criterion} for {len(df)} CS patterns<br>{title_suffix}",
        xaxis_title="CS Pattern",
        yaxis_title=criterion,
        autosize=False,
        width=800,
        height=800,
    )

    if plot_performance:
        fig.add_hrect(y0=0.0, y1=0.5, line_width=0, fillcolor="red", opacity=0.4)
        fig.add_hrect(y0=0.5, y1=0.55, line_width=0, fillcolor="yellow", opacity=0.4)
        fig.add_hrect(y0=0.55, y1=1.0, line_width=0, fillcolor="green", opacity=0.4)

    fig.show()

    return None

In [None]:
# plot all patterns, ranked by number of instances
plot_cs_performance(
    df=performance_metrics,
    criterion="total_instances",
    title_suffix="across the whole data set.",
)

# plot the patterns, ranked by number of instances, with a precision >50%.
plot_cs_performance(
    df=performance_metrics.query("precision > 0.5").sort_values(
        by="total_instances", ascending=False
    ),
    criterion="total_instances",
    title_suffix="with precision > 50%.",
)

In [None]:
def plot_cs_performance(
    df: pd.DataFrame, criterion: str = "total_instances", plot_performance: bool = False
) -> None:
    fig = go.Figure()

    # Add the main bar chart
    fig.add_trace(
        go.Bar(
            x=df.index,
            y=df[criterion],
            name=criterion,
            error_y={
                "type": "data",
                "symmetric": False,
                "array": df["ci_upper"] - df[criterion],
                "arrayminus": df[criterion] - df["ci_lower"],
            },
        )
    )

    fig.update_layout(
        font={"size": 18},
        legend={
            "yanchor": "top",
            "y": 0.99,
            "xanchor": "right",
            "x": 0.99,
            "font": {"size": 20},
        },
        title=f"{criterion} for {len(df)} CS patterns across the data set",
        xaxis_title="CS Pattern",
        yaxis_title=criterion,
        autosize=False,
        width=2000,
        height=1000,
    )

    # Add performance zones if plot_performance is True
    if plot_performance:
        fig.add_hrect(y0=0.0, y1=0.5, line_width=0, fillcolor="red", opacity=0.2)
        fig.add_hrect(y0=0.5, y1=0.55, line_width=0, fillcolor="yellow", opacity=0.2)
        fig.add_hrect(y0=0.55, y1=1.0, line_width=0, fillcolor="green", opacity=0.2)

    fig.show()

    return None


plot_cs_performance(
    df=performance_metrics, criterion="TP_wilson", plot_performance=True
)

## How does acting on the trading signals perform (idealised situation without transaction costs)?

In [None]:
trading_signal = (
    cs_signals_df[cs_signals_df["cs_pattern"] != 0][["cs_pattern"]]
    .pivot_table(index="date", columns="candle", values="cs_pattern", aggfunc="sum")
    .sum(axis=1)
    .loc[lambda x: x != 0]
)

trading_signal.plot(
    figsize=(8, 8),
    title="Signal strength based on candle sticks\n(signal > 0 => long; signal < 0 => short)",
)
plt.ylabel("candle")
plt.show()

In [None]:
def compute_trading_strategy_performance(df: pd.DataFrame) -> None:
    mean_intraday_return = df["intraday_return"].mean()
    std_intraday_return = df["intraday_return"].std()
    sharpe_ratio = (260 * mean_intraday_return) / (np.sqrt(260) * std_intraday_return)
    # The returns are fractions, so format them as percentages explicitly
    print(f"Annualised strategy return: {260 * mean_intraday_return:.2%}")
    print(
        f"Annualised strategy standard deviation of returns: {np.sqrt(260) * std_intraday_return:.2%}"
    )
    print(f"Sharpe ratio of strategy: {sharpe_ratio:.4}")
    return None

## How does acting on the trading signals compare to a buy-and-hold strategy?

In [None]:
performance_trading_signals = (
    df[
        df["date"].isin([date + pd.DateOffset(days=1) for date in trading_signal.index])
    ][["date", "intraday_return"]]
    .assign(account_curve=lambda x: (1 + x["intraday_return"]).cumprod())
    .assign(cumsumret=lambda x: x["intraday_return"].cumsum())
    .assign(time_between_signals=lambda x: x["date"].diff().dt.days)
)

performance_trading_signals.head()

In [None]:
df_reference_strategy = df[["date", "log_return", "intraday_return"]].copy()
df_reference_strategy["account_curve"] = (
    1 + df_reference_strategy["intraday_return"]
).cumprod()
df_reference_strategy["cumsumret"] = df_reference_strategy["intraday_return"].cumsum()

In [None]:
# Plot the distribution of waiting times between signals, then compare the cumulative returns
performance_trading_signals.hist(column="time_between_signals", bins=20, figsize=(8, 8))
plt.xlabel("Time between signals [days]")
plt.ylabel("Frequency")
plt.title("Histogram of time differences between signals")
plt.show()

fig, ax = plt.subplots(figsize=(8, 8))
ax.plot(
    performance_trading_signals["date"],
    performance_trading_signals["cumsumret"],
    label="cumulative return using candlesticks",
    color="b",
)
ax.plot(
    df_reference_strategy["date"],
    df_reference_strategy["cumsumret"],
    label="cumulative return assuming buy-and-hold",
    color="r",
)
ax.set_xlabel("date")
ax.set_ylabel("cumulative return")
ax.legend(loc="upper left")
ax.set_title("long-short strategy using candlesticks vs. buy-and-hold")
plt.show()

# Compute trading strategy statistics
compute_trading_strategy_performance(df=performance_trading_signals)
print()
compute_trading_strategy_performance(df=df_reference_strategy)

# Does machine learning help to reduce the noise in the signals?

The following code pivots the data frame so that the date becomes the index and the candlestick signals become the columns.

Joining two or more data frames is an important and frequently occurring task in data science, and a topic in its own right. We need to focus on the task at hand here, but you are encouraged to study merges, joins and concatenations of data frames in your own time; their importance can hardly be overstated.

In [None]:
df_signal_and_return = (
    cs_signals_df.reset_index()
    .pivot_table(index="date", columns="candle", values="cs_pattern", aggfunc="first")
    .fillna(0)
    .join(df.set_index("date")[["sign_next_day_return"]], how="left")
)

df_signal_and_return

The next cell shows explicitly that the column named `sign_next_day_return` is a categorical variable: the return is either positive or negative, apart from a few days on which it is zero. We filter out these zero-return days to limit ourselves to a binary classification problem. Technically, logistic regression could accommodate three categories, namely positive, negative and zero returns. However, machine learning is data hungry and the next cell shows that days with zero returns are a clear minority, so we consider this slight simplification empirically justified.

In [None]:
df_signal_and_return["sign_next_day_return"].value_counts(normalize=True)

Let us quickly review logistic regression, which we use to classify the sign of the next day's return from the candlestick signals. Recall that logistic regression maps a linear model to a probability:

$$z=\boldsymbol{w^T}\boldsymbol{x}$$

$$P(y = 1) = \frac{1}{(1+\exp(-z))}$$

For classification purposes, we then typically assign this probability to a discrete class (-1 and +1 in our case) based on a threshold. The default threshold is $P(y = 1) = 0.5$, which corresponds to $z = 0$:

$$y=\left\{
\begin{array}{ll}
    -1, & P(y = 1)\le0.5 \\
    1, & P(y = 1)>0.5 \\
\end{array} 
\right.$$ 
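
The mapping from a linear score to a class label can be sketched in plain Python. The helper names `sigmoid` and `predict_label` and the weights are made up for illustration; scikit-learn's `LogisticRegression` learns the weights (and an intercept, omitted here) from the data. The sketch thresholds the probability at 0.5, which is the same as thresholding the score $z$ at 0.

```python
import math


def sigmoid(z: float) -> float:
    """Map a linear score z = w^T x to the probability P(y = 1)."""
    # Branch on the sign of z so that math.exp never overflows.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    exp_z = math.exp(z)
    return exp_z / (1.0 + exp_z)


def predict_label(weights, features, threshold: float = 0.5) -> int:
    """Return +1 if P(y = 1) exceeds the threshold, otherwise -1."""
    z = sum(w * x for w, x in zip(weights, features))
    return 1 if sigmoid(z) > threshold else -1


# Hypothetical weights for two candlestick features.
weights = [0.8, -0.5]
print(sigmoid(0.0))                        # a score of zero sits on the decision boundary
print(predict_label(weights, [1.0, 0.0]))  # positive score
print(predict_label(weights, [0.0, 1.0]))  # negative score
```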

In [None]:
# LogisticRegression??
# cost function, gradient descent function, Andrew Ng explains how that works
# James sends me some resource:

In [None]:
N_CORES = multiprocessing.cpu_count()
df_filtered = df_signal_and_return[df_signal_and_return["sign_next_day_return"] != 0]
TEST_SIZE = 0.1

RANDOM_STATE = random.randint(0, 1000)
print(RANDOM_STATE)

X = df_filtered.iloc[:, :-1]
y = df_filtered.iloc[:, -1].astype(int)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, shuffle=False
)
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, shuffle=True, random_state=RANDOM_STATE)

# lets quickly confirm whether the length of our train-test split is correct
print(X_train.__len__() / X.__len__())
print(X_test.__len__() / X.__len__())

Note: You could also have sliced the data set into training and test splits yourself, for example like so:

```
split_index = int(0.8 * len(df_filtered))  # 80% for training, 20% for testing
df_train = df_filtered.iloc[:split_index]
df_test = df_filtered.iloc[split_index:]
```

In Python and in positional Pandas indexing with `.iloc`, slicing follows a consistent rule: the start index is included and the stop index is excluded. This is known as a half-open interval: `[start, stop)`. (Label-based slicing with `.loc` is the exception: it includes the stop label.) Because positional slicing is end-exclusive, `df_train` and `df_test` do not overlap, and the split serves essentially the same purpose as `train_test_split` with `shuffle=False`.
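
The half-open rule can be checked on a plain list. The helper `chronological_split` is a hypothetical name used only for this sketch, and its rounding of the test-set size is an assumption that need not match `train_test_split` exactly.

```python
def chronological_split(rows, test_size=0.1):
    """Split an ordered sequence into a leading train part and a trailing test part."""
    n_test = int(test_size * len(rows))
    split_index = len(rows) - n_test
    # rows[:split_index] excludes split_index; rows[split_index:] starts at it.
    return rows[:split_index], rows[split_index:]


rows = list(range(10))  # stand-in for ten chronologically ordered trading days
train, test = chronological_split(rows, test_size=0.2)
print(train)
print(test)
```

Every element lands in exactly one of the two parts, and no future observation leaks into the training part.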

Let us now fit the logistic regressor on the training data:

In [None]:
%%time

model = LogisticRegression(max_iter=5000, tol=0.01, n_jobs=N_CORES)
model.fit(X_train, y_train)

# Prediction on training set
pred_train = model.predict(X_train)
assert len(pred_train) == len(y_train)

# Prediction on test set
pred_test = model.predict(X_test)
assert len(pred_test) == len(y_test)

print(
    "Compute hit-rate of strategy obtained from the logistic regressor on the TEST set"
)
print((pred_test == y_test).sum() / len(y_test), end="\n\n")
print(
    "Compute hit-rate of strategy obtained from the logistic regressor on the TRAINING set"
)
print((pred_train == y_train).sum() / len(y_train), end="\n\n")

In [None]:
# quick remark: we could have achieved the same result more tersely by invoking:
np.mean(pred_test == y_test)  # 0.5154553049289892

Why can we use `np.mean(pred_test == y_test)` to compute the hit-rate efficiently?

This is because of how boolean values are treated in numerical operations. Comparing two arrays element-wise for equality (`pred_test == y_test`) yields an array of boolean values, in which `True` represents a hit (the corresponding elements of `pred_test` and `y_test` are equal) and `False` represents a miss.

In Python, `bool` is a subtype of `int`: `True` is treated as 1 and `False` as 0, and NumPy treats its boolean arrays the same way. The sum of the boolean array is therefore the number of hits, and dividing this sum by the length of the array gives the mean, which is the proportion of hits.

So `np.mean(pred_test == y_test)` is a neat way to calculate the hit rate directly from the arrays without explicitly counting the hits and dividing by the total number of elements.
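
The same mechanism can be seen without NumPy. A minimal sketch with made-up predictions and outcomes (the variable names are illustrative only):

```python
# Made-up predicted and realised return signs for five days.
predicted = [1, -1, 1, 1, -1]
realised = [1, 1, 1, -1, -1]

# Element-wise comparison gives one boolean per day.
hits = [p == r for p, r in zip(predicted, realised)]

# bool is a subclass of int, so True counts as 1 and False as 0 in a sum.
number_of_hits = sum(hits)
hit_rate = number_of_hits / len(hits)

print(hits)
print(number_of_hits, hit_rate)
```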

In [None]:
y_train.value_counts(normalize=True)

In [None]:
y_test.value_counts(normalize=True)

In [None]:
# class weighting makes the learner weight rarely occurring instances more heavily than frequently occurring ones.
# say, we have a segmentation problem in image recognition.

In [None]:
tmp = (
    df[df["date"].isin(y_test.index)][
        ["date", "intraday_return", "sign_next_day_return"]
    ]
    .dropna()
    .reset_index(drop=True)
    .assign(pred=pred_test)  # directly assigning pred without reindexing
    .assign(
        daily_return_strategy=lambda x: np.where(
            np.sign(x["sign_next_day_return"]) == np.sign(x["pred"]),
            abs(x["intraday_return"]),
            -abs(x["intraday_return"]),
        )
    )
    .assign(account_curve=lambda x: x["daily_return_strategy"].cumsum())
    # accumulate the negated daily returns so that this is a curve, like account_curve
    .assign(contrarian_account_curve=lambda x: -1 * x["daily_return_strategy"].cumsum())
)
tmp

In [None]:
plt.figure(figsize=(10, 10))

# Define common bins
min_value = tmp["intraday_return"].min()
max_value = tmp["intraday_return"].max()
bins = np.linspace(min_value, max_value, 40)

# Calculate histogram for 'hits'
hits_data = tmp[tmp["sign_next_day_return"] == tmp["pred"]]["intraday_return"]
counts_hits, _ = np.histogram(hits_data, bins=bins)
percent_hits = counts_hits / counts_hits.sum()

# Calculate histogram for 'misses'
misses_data = tmp[tmp["sign_next_day_return"] != tmp["pred"]]["intraday_return"]
counts_misses, _ = np.histogram(misses_data, bins=bins)
percent_misses = counts_misses / counts_misses.sum()

# Verify that the sum of the percentages is equal to 1
print("Sum of percentages for hits histogram:", np.sum(percent_hits))
print("Sum of percentages for misses histogram:", np.sum(percent_misses))

# Plot the histograms
plt.hist(bins[:-1], bins, weights=percent_hits, alpha=0.5, label="hits")
plt.hist(bins[:-1], bins, weights=percent_misses, alpha=0.5, label="misses")

plt.legend()
plt.title("Intraday return histogram [percentage]")
plt.xlabel("intraday return")
plt.ylabel("percentage")
plt.show()

Note: if you wish to quickly inspect, inline in a Jupyter notebook, the source code of a function you wrote earlier and want to recall, you can invoke either of the following statements:

```
import inspect
print(inspect.getsource(compute_trading_strategy_performance))
```

```
compute_trading_strategy_performance??
```

In [None]:
compute_trading_strategy_performance(df=tmp)

In [None]:
tmp.plot(
    x="date",
    y="account_curve",
    title="Comparing account curve of Machine Learning strategy",
    figsize=(10, 10),
)
plt.show()

In [None]:
fig, ax = plt.subplots(figsize=(8, 8))

# Plotting the first curve
ax.plot(
    performance_trading_signals["date"],
    performance_trading_signals["cumsumret"],
    label="cumulative return using candlesticks",
    color="b",
)

# Plotting the second curve
ax.plot(
    df_reference_strategy["date"],
    df_reference_strategy["cumsumret"],
    label="cumulative return assuming buy-and-hold",
    color="r",
)

# Adding the third curve from 'tmp'
ax.plot(
    tmp["date"],
    tmp["account_curve"],
    label="cumulative return using ML strategy",
    color="g",
)  # Assuming color green

ax.set_xlabel("date")
ax.set_ylabel("cumulative return")
ax.legend(loc="upper left")
ax.set_title("long-short strategy using candlesticks vs. buy-and-hold vs. ML strategy")

plt.show()

In [None]:
# how often [%] do we predict the correct returns using our Machine learning framework?
hits_data.__len__() / (hits_data.__len__() + misses_data.__len__())

In [None]:
### Idea: Get tail ratio in for the trading strategies

In [None]:
(1 + tmp["account_curve"].iloc[-1]) ** (
    1 / 3
) - 1  # annual geometric return during test period

In [None]:
(1 + 0.2073185303721532) ** 3

In [None]:
1 / 400 * 100

In [None]:
# regularizing the regression? (setting weights to zero for those CS signals)
# can we integrate this in Interactive Broker's API?

#
# - github actions to run pre-commit
# https://github.com/kynan/nbstripout/blob/master/.pre-commit-hooks.yaml
# https://github.com/EnergySystemsModellingLab/MUSE_OS/blob/4dd1a0a42889a5551b68a29faf25384c08849a90/.pre-commit-config.yaml#L32



## Recursive feature elimination

`sklearn.feature_selection.RFECV` is a feature selection method from scikit-learn that stands for `Recursive Feature Elimination with Cross-Validation`. It is used to select the most significant features by recursively considering smaller and smaller sets of features.

- Recursive Feature Elimination (RFE): It fits a model and removes the weakest feature(s) (i.e., the feature(s) with the least importance according to the model), then fits a new model on the reduced dataset. This process is repeated until all features have been ranked by their importance.


- Cross-Validation (CV): The RFE procedure is embedded within a cross-validation loop to determine the optimal number of features. For each iteration, a different subset of the data is used for training and validation, and the performance of the model is assessed. This helps to ensure that the selection of features is robust and performs well on unseen data.


`selector.ranking_`: This array shows the ranking of features. A rank of 1 indicates that the feature was selected as important. Ranks above 1 encode the order of elimination in reverse: a rank of 2 means the feature was the last one to be eliminated, a rank of 3 means it was eliminated just before that, and so on, so the feature with the highest rank was eliminated first.

Looking at `selector.ranking_` array, it seems that most features were selected (rank 1). Some features have higher rank values, like 4 and 3 indicating their relative importance was considered lower by the `RFECV` process. These features were removed in the corresponding iterations of the feature elimination process.

The feature with a rank of 4 was removed first, the one with 3 was removed second, and this pattern continues until all features with a rank greater than 1 are removed. The features with a rank of 1 are considered important and are selected by the `RFECV`. These are the features you might consider keeping in your model for the best balance between model complexity and performance as determined by cross-validation.
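
Reading a ranking array can be sketched without fitting anything. The ranking values below are made up for illustration and are not the output of the `RFECV` run in this notebook; the feature names are five TA-Lib pattern functions chosen as examples.

```python
feature_names = ["CDLDOJI", "CDLHAMMER", "CDLENGULFING", "CDLHARAMI", "CDLMARUBOZU"]
ranking = [1, 3, 1, 2, 4]  # hypothetical values in the style of selector.ranking_

# Rank 1 marks the features that were kept.
selected = [name for name, rank in zip(feature_names, ranking) if rank == 1]

# Among the eliminated features, the highest rank was dropped first.
eliminated = sorted(
    ((rank, name) for name, rank in zip(feature_names, ranking) if rank > 1),
    reverse=True,
)
elimination_order = [name for _, name in eliminated]

print("selected:", selected)
print("eliminated, first to last:", elimination_order)
```

The boolean mask returned by `selector.get_support()` corresponds to the `rank == 1` test used here.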

In [None]:
%%time
# takes about 80 seconds to run for one stock, but this depends on the number of CPU cores on your machine.

N_CORES = multiprocessing.cpu_count()
model = LogisticRegressionCV(
    cv=5, multi_class="ovr", max_iter=1000, n_jobs=N_CORES
)  # If we choose multi_class="ovr", then a binary problem is fit for each label.
model.fit(X_train, y_train)

# See whether the L2 / L3 metrics add value to our strategy
selector = RFECV(model, cv=5, n_jobs=N_CORES)
selector.fit(X_train, y_train)

print(selector.ranking_)

selected_features = selector.get_support()

In [None]:
selector.ranking_.__len__()  # this corresponds to the number of candlesticks we investigate

In [None]:
pred = model.predict(X_test)
assert len(pred) == len(y_test)

print("Compute hit-rate of strategy obtained from the logistic regressor")
print((pred == y_test).sum() / len(y_test))
model.score(X_test, y_test)  # gives same result

In [None]:
# Apply this mask to X_train and X_test to keep only important features
X_train_selected = X_train.loc[:, selected_features]
X_test_selected = X_test.loc[:, selected_features]

# Refit the model on the selected features
model_selected = LogisticRegressionCV(
    cv=5, multi_class="ovr", max_iter=1000, n_jobs=N_CORES
)
model_selected.fit(X_train_selected, y_train)

y_pred_selected = model_selected.predict(X_test_selected)
model_selected.score(X_test_selected, y_test)

# Your task: Can you improve the strategy further?

# Literature 

[1] Young Ho Seo. Scientific Guide to Price Action and Pattern Trading: Wisdom of Trend, Cycle, and Fractal Wave.

If you are interested in back-testing more trading strategies, you can look at:

[2] Igor Tulchinsky (Editor). Finding Alphas: A Quantitative Approach to Building Trading Strategies (2019).

[3] Z. Kakushadze and J.A. Serur. 151 Trading Strategies. Cham, Switzerland: Palgrave Macmillan, an imprint of Springer Nature, 1st Edition (2018), XX, 480 pp; ISBN 978-3-030-02791-9, https://papers.ssrn.com/sol3/papers.cfm?abstract_id=3247865

# Notes (delete in final version)

1) Pull-Requests on github (Python package index)

2) Modularize the code using classes/OOP/Design Patterns.

3) Writing Unit-Test

4) CI test suits

5) Storage of data (backend)

In [None]:
# TODO's:

# Add the logistic regression strategy as a third one to compare against the naive candlestick strategy and the buy-and-hold one

# document the notebook in more detail and get it into a nice shape (ongoing process).
# IDEA: what to do in case of imbalanced features? I could use imbalanced-learn (imblearn),
# however, that will alter the data. (Maybe James has a better idea?)

# jupytext to convert jupyter notebooks to text
# figure out about Teams sharing screen though not being a presenter
# TODO:Account curve
# contact CRSP regarding corporate actions meta data, i.e. stock splits and dividends and how to best correct prices.
# benchmark: SP500 for the overall set, but for one stock only, the buy-and-hold strategy is sufficient.

## Optional: Talk about split and dividend adjustment

In [None]:
# does contain stock splits
# df_reference_strategy = df[['date','log_return']].copy()
# df_reference_strategy['account_curve'] = (1 + temp_df2['log_return']).cumprod()
# df_reference_strategy['account_curve'].plot()
# plt.show()

# does contain stock splits
df_reference_strategy["cumsumret"].plot()
plt.show()

In [None]:
df["log_return"].plot()
plt.show()

In [None]:
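# outlier returns should align with stock splits => CRSP data is not split- and dividend-adjusted
condition = (df["log_return"] > 0.2) | (df["log_return"] < -0.2)
# also show the day before and after each outlier; fill_value keeps the shifted masks boolean
df[
    condition
    | condition.shift(1, fill_value=False)
    | condition.shift(-1, fill_value=False)
]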
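
The outliers found above can be removed by back-adjusting the prices before each split. The following is a minimal sketch on synthetic data, not CRSP's own adjustment procedure: the helper `back_adjust_for_splits` and the `split_factors` series are hypothetical names introduced for illustration, and the split factors are assumed to be known from corporate-action metadata rather than inferred from the returns.

```python
import numpy as np
import pandas as pd


def back_adjust_for_splits(close: pd.Series, split_factors: pd.Series) -> pd.Series:
    """Divide every price before a split by the split factor (hypothetical helper).

    split_factors holds the number of new shares per old share on the split
    date (e.g. 2.0 for a 2-for-1 split) and 1.0 on all other days.
    """
    # cumulative factor of all splits that happen strictly after each day
    future_factor = split_factors[::-1].cumprod()[::-1].shift(-1, fill_value=1.0)
    return close / future_factor


# synthetic closing prices with a 2-for-1 split on the fourth day
close = pd.Series([100.0, 102.0, 104.0, 52.0, 53.0])
split_factors = pd.Series([1.0, 1.0, 1.0, 2.0, 1.0])

adjusted = back_adjust_for_splits(close, split_factors)

# the raw series shows a large negative log return on the split day,
# the adjusted series does not
raw_log_return = np.log(close).diff()
adjusted_log_return = np.log(adjusted).diff()
```

Dividend adjustment would need an analogous factor built from the dividend amounts; it is left out of this sketch.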

## Let's verify that the shape of the account curve is the same for percentage changes and for closing prices

In [None]:
(1 + df["close"].pct_change()).cumprod()

In [None]:
# normalise by the first close; .iloc[0] works regardless of the index labels
(df["close"] / df["close"].iloc[0]).plot(figsize=(8, 8))
plt.show()
print((df["close"] / df["close"].iloc[0]).iloc[-1])

In [None]:
df.plot(x="date", y="close", figsize=(8, 8))
plt.title("Account curve of a buy-and-hold position in MSFT")
plt.show()
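
The equivalence checked in this section can also be confirmed numerically. The sketch below uses a small synthetic price series (an assumption made for illustration, not the MSFT data) and compares the account curve built from percentage changes, from log returns, and directly from normalised closing prices.

```python
import numpy as np
import pandas as pd

# synthetic closing prices, used only for illustration
close = pd.Series([100.0, 110.0, 99.0, 108.9])

# account curve from simple (percentage) returns; the first return is NaN
curve_from_pct = (1 + close.pct_change()).cumprod()

# account curve from log returns: sum the log returns, then exponentiate
log_return = np.log(close).diff()
curve_from_log = np.exp(log_return.cumsum())

# account curve straight from prices, normalised to the first close
curve_from_price = close / close.iloc[0]

# all three agree wherever the return is defined (from the second row on)
assert np.allclose(curve_from_pct.iloc[1:], curve_from_price.iloc[1:])
assert np.allclose(curve_from_log.iloc[1:], curve_from_price.iloc[1:])
```

Log returns compound by summation, so `(1 + log_return).cumprod()` is only a first-order approximation of the account curve.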

## How to visually inspect the signals?

In [None]:
def plot_cs_chart(df: pd.DataFrame, ListingId=None) -> None:
    """Show a candlestick chart of df, optionally restricted to one ListingId."""
    if ListingId is None:
        # single instrument: the index holds the dates
        data = df
        x = df.index
    else:
        # several instruments: keep only the rows of the requested listing
        data = df[df["ListingId"] == ListingId]
        x = data["Date"]

    fig = go.Figure(
        data=[
            go.Candlestick(
                x=x,
                open=data["open"],
                high=data["high"],
                low=data["low"],
                close=data["close"],
            )
        ]
    )

    fig.update_layout(
        font={"size": 18},
        legend={
            "yanchor": "top",
            "y": 0.99,
            "xanchor": "right",
            "x": 0.99,
            "font": {"size": 20},
        },
        title="Price evolution displayed as a candlestick chart",
        xaxis_title="time [days]",
        yaxis_title="price",
        autosize=False,
        width=800,
        height=800,
    )

    fig.show()


plot_cs_chart(result_df.loc["CDLDOJI"].reset_index().set_index("date")[:20])

# Appendix: This is a collection of rough work by the author of this notebook. Ignore.

Mike's idea:

The broker is rewarded with a fraction of the overperformance.

For 50 days we trade 1% at a price of 100; then the price crashes to 90 and we trade for 5 days at 90.

Question: Can a trader benefit from knowing that there is a share buy-back? Buying back shares should create surplus demand.

(50 * 100 + 5 * 90) / 55
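
Evaluated, the expression above gives the day-weighted average execution price. The short sketch below assumes that the same quantity is traded on each of the 55 days; the variable names are chosen for illustration only.

```python
# 50 trading days at a price of 100, followed by 5 trading days at 90
days_and_prices = [(50, 100.0), (5, 90.0)]

total_days = sum(days for days, _ in days_and_prices)
average_price = sum(days * price for days, price in days_and_prices) / total_days

print(round(average_price, 2))  # 99.09
```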

```
import numpy as np
import pandas as pd
import talib

def compute_intraday_return(row):
    row['positive_intraday'] = np.where((row['Close|Executed'] - row['Open|Executed']) > 0, True, False)
    return row

cs_performance_dict = dict()

for ListingId in df['ListingId'].unique():
    equity_df = pd.DataFrame(index=df[df['ListingId']==ListingId]['Date'], 
                                data=df[df['ListingId']==ListingId][['Open|Executed', 'High|Executed', 'Low|Executed', 'Close|Executed']].values, 
                                columns = df[df['ListingId']==ListingId][['Open|Executed', 'High|Executed', 'Low|Executed', 'Close|Executed']].columns
                            )
    equity_df = equity_df.apply(compute_intraday_return, axis='columns')
    
    results = []
    cols = []
    for attr in dir(talib):
        if attr[:3]=='CDL':
            res = getattr(talib, attr)(equity_df['Open|Executed'], equity_df['High|Executed'], equity_df['Low|Executed'], equity_df['Close|Executed'])
            results.append(res)
            cols.append(attr)

    patterns = pd.DataFrame(results).T
    patterns.columns = cols
    
    # keep only the days on which at least one pattern fires
    signal_df = patterns[(patterns.select_dtypes(include=['number']) != 0).any(axis=1)]  # alternative: patterns.loc[(patterns.loc[:, patterns.dtypes != object] != 0).any(axis=1)]

    signal_and_return_df = signal_df.sum(axis=1).rename('cumulative_signal').to_frame().merge(equity_df['positive_intraday'].shift(-1), left_index=True, right_index=True)[:-1]  # don't count the last as its return will be NaN
    # .copy() avoids assigning to a view of the filtered frame
    signal_and_return_df = signal_and_return_df.loc[signal_and_return_df['cumulative_signal'] != 0].copy()
    signal_and_return_df['performance'] = np.where((signal_and_return_df['cumulative_signal']>0) & (signal_and_return_df['positive_intraday']==True), True,
                                                   np.where((signal_and_return_df['cumulative_signal']<0) & (signal_and_return_df['positive_intraday']==False), True, False))

    # the rows are already restricted to days where a signal occurs,
    # so the row count is the number of signals
    TP = signal_and_return_df['performance'].sum() / signal_and_return_df.shape[0]  # TODO modify the ratio of TP/FP
    FP = 1 - TP
    
    cs_performance_dict[ListingId] = {'TP_rate':round(TP, 4), 'FP_rate':round(FP, 4)}
```