# Import

In [2]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

from itertools import groupby
from operator import itemgetter
import talib
import re
from typing import Optional, Callable, Any, Union
from pathlib import Path
import matplotlib
import numpy as np
import pandas as pd
import ast

import matplotlib.pyplot as plt
import seaborn as sns

sns.set(style="darkgrid")
plt.style.use("dark_background")
plt.rcParams.update({"grid.linewidth": 0.5, "grid.alpha": 0.5})
plt.rc("figure", figsize=(16, 10))
plt.rc("lines", markersize=4)
plt.rcParams["figure.autolayout"] = True
sns.set_context("poster")

import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots
import scipy as sc
import yfinance as yf
from plotly.offline import  init_notebook_mode
pio.templates.default = "plotly_dark"
init_notebook_mode(connected=True)

# Talib

Technical Analysis Library
- https://ta-lib.org/

- Multi-Platform Tools for Market Analysis ...
TA-Lib is widely used by trading software developers requiring to perform technical analysis of financial market data.

- Includes 200 indicators such as ADX, MACD, RSI, Stochastic, Bollinger Bands etc... (more info)
Candlestick pattern recognition
Open-source API for C/C++, Java, Perl, Python and 100% Managed .NET

The Python wrapper for TA-LIB, https://mrjbq7.github.io/ta-lib/doc_index.html, which we will use is based on Cython and Numpy.

# Data

In [3]:
tickers = ['GOOG']
df = yf.download(tickers=tickers)

[*********************100%***********************]  1 of 1 completed


# Plot stock

In [4]:
def plot_stock() -> go.Figure:
    fig = make_subplots(
                            rows=3, cols=1,
                            row_heights=[0.5, 0.25, 0.25],
                            shared_xaxes=True,
                            vertical_spacing=0.04,
                    )  
    

    fig.add_trace(go.Candlestick(x=df.index,
                    open=df['Open'],
                    high=df['High'],
                    low=df['Low'],
                    close=df['Close'],
                    name='Candles'
                    ), 
                    row=1, col=1
    )
        
    fig.add_trace(go.Scatter(
    x=df.index,
    y=df.Close,
    name = 'Close',
    mode="lines",
    marker_color='purple'
    ), row=2, col=1)

    fig.add_trace(go.Bar(
        x=df.index, 
        y=df['Volume'],
        name='Volume',
        marker_color=np.tile(['red', 'green'], df.Volume.shape[0])
    ), row=3, col=1) 
    
    fig.update(
        layout_xaxis1_rangeslider_visible=False,
        layout_xaxis2_rangeslider_visible=False,
        layout_xaxis3_rangeslider_visible=False,
    )

    fig.update_layout(
        height=700, width=1200,
        yaxis1_title='Candles',
        yaxis2_title='Close',
        yaxis3_title='Volume',
    )
    
    return fig
plot_stock()

# Scripting

Helper function that identifies increasing sequences in a list and then nests them

In [5]:
def seq_seeker(l:list) -> list[list[Any]]:
    """Groups any increasing sequence of numbers found from the beginning to the end.

    Args:
        l (list): list input

    Returns:
        list[list[Any]]: list of lists of grouped elements.
    """  
    return [list(map(itemgetter(1), g)) for k, g in groupby(enumerate(l), lambda ix: ix[0] - ix[1])]

seq_seeker([2,1,4,5,10,9,8])

[[2], [1], [4, 5], [10], [9], [8]]

Extract pattern info from ta-lib

In [6]:
pattern_full_name = lambda x: re.findall(r'\n\n    (.*?) \(Pattern Recognition\)\n\n', getattr(talib, x).__doc__)[0].rstrip()     # escape the actual parentheses as \(\)

PATTERNS_INFO: list[tuple[Callable, str, Any]] = [
    (getattr(talib, x), x, pattern_full_name(x)) 
    # # talib.get_function_groups().keys()
    for x in talib.get_function_groups()['Pattern Recognition']
]

PATTERNS_INFO

[(<function talib._ta_lib.CDL2CROWS>, 'CDL2CROWS', 'Two Crows'),
 (<function talib._ta_lib.CDL3BLACKCROWS>,
  'CDL3BLACKCROWS',
  'Three Black Crows'),
 (<function talib._ta_lib.CDL3INSIDE>, 'CDL3INSIDE', 'Three Inside Up/Down'),
 (<function talib._ta_lib.CDL3LINESTRIKE>,
  'CDL3LINESTRIKE',
  'Three-Line Strike'),
 (<function talib._ta_lib.CDL3OUTSIDE>,
  'CDL3OUTSIDE',
  'Three Outside Up/Down'),
 (<function talib._ta_lib.CDL3STARSINSOUTH>,
  'CDL3STARSINSOUTH',
  'Three Stars In The South'),
 (<function talib._ta_lib.CDL3WHITESOLDIERS>,
  'CDL3WHITESOLDIERS',
  'Three Advancing White Soldiers'),
 (<function talib._ta_lib.CDLABANDONEDBABY>,
  'CDLABANDONEDBABY',
  'Abandoned Baby'),
 (<function talib._ta_lib.CDLADVANCEBLOCK>,
  'CDLADVANCEBLOCK',
  'Advance Block'),
 (<function talib._ta_lib.CDLBELTHOLD>, 'CDLBELTHOLD', 'Belt-hold'),
 (<function talib._ta_lib.CDLBREAKAWAY>, 'CDLBREAKAWAY', 'Breakaway'),
 (<function talib._ta_lib.CDLCLOSINGMARUBOZU>,
  'CDLCLOSINGMARUBOZU',
  'Closing

Define a dataframe with the naming conventions of the talib patterns

In [7]:
PATTERNS_INFO_df = pd.DataFrame([x[1::] for x in PATTERNS_INFO], columns=['Pattern Func', 'Pattern Name'])
PATTERNS_INFO_df.index += 1
PATTERNS_INFO_df

Unnamed: 0,Pattern Func,Pattern Name
1,CDL2CROWS,Two Crows
2,CDL3BLACKCROWS,Three Black Crows
3,CDL3INSIDE,Three Inside Up/Down
4,CDL3LINESTRIKE,Three-Line Strike
5,CDL3OUTSIDE,Three Outside Up/Down
...,...,...
57,CDLTHRUSTING,Thrusting Pattern
58,CDLTRISTAR,Tristar Pattern
59,CDLUNIQUE3RIVER,Unique 3 River
60,CDLUPSIDEGAP2CROWS,Upside Gap Two Crows


We define the function patterns_signal in order to isolate all pattern dfs that have at least one Bullish(green) and one Bearish(red) pattern.

In [8]:
def patterns_signal(df=df, patterns_list:list[tuple[Callable, str, Any]]=PATTERNS_INFO) -> list[pd.DataFrame]:
    """_summary_

    Args:
        df (_type_, optional): Stock price df. Defaults to df.
        patterns_list (list[tuple[Callable, str, Any]], optional): A list of tuples(func object, func name, pattern name). Defaults to PATTERNS_INFO.

    Returns:
        list[pd.DataFrame]: Dataframes with valid pattern signals.
    """
    # Calculate patterns on df
    patterns_calc: list[pd.DataFrame] = []
    for pfunc, pname, _ in patterns_list:
        elem: pd.DataFrame = pfunc(df.Open, df.High, df.Low, df.Close).to_frame().rename({0: pname}, axis=1)
        # Filter out patterns without Bullish or Bearish signals
        masken: pd.DataFrame = elem != (0)
        elem: pd.DataFrame = elem.loc[masken.any(axis=1), masken.any()]
        if elem.shape[0]>0:
            elem = elem/np.abs(elem)
            factor:pd.Series = df.Close.loc[elem.index]
            elem = elem.mul(factor, axis=0)
            # add plotting/profit properties for later
            elem['Close'] = elem.iloc[:,0]
            elem['marker_symbol'] = elem.iloc[:,0].agg(lambda x: 'triangle-up' if x>0 else 'triangle-down')
            elem['marker_color'] = elem.iloc[:,0].agg(lambda x: 'lightgreen' if x>0 else 'darkred')
            elem['marker_line_color'] = elem.iloc[:,0].agg(lambda x: 'green' if x>0 else 'red')
            elem.iloc[:,0] = np.abs(elem.iloc[:,0])
            patterns_calc += [elem]
    return patterns_calc 

signals = patterns_signal()
signals[1]

Unnamed: 0_level_0,CDL3BLACKCROWS,Close,marker_symbol,marker_color,marker_line_color
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-03-15,59.223,-59.223,triangle-down,darkred,red
2022-11-02,87.07,-87.07,triangle-down,darkred,red


We then define the plot_patterns function in order to plot the pattern signal dataframes of patterns_signal() onto the close price.

In [9]:
def plot_patterns(df_patterns:list[pd.DataFrame], df=df) -> go.Figure:
    """Plots the signals of patterns onto the close price of the stock.

    Args:
        df_patterns (list[pd.DataFrame]): Dataframes containing valid patterns signals.
        df (_type_, optional): Stock dataframe. Defaults to df.

    Returns:
        go.Figure: Figure object.
    """    
    fig: go.Figure = go.Figure()
    
    fig.add_trace(go.Scatter(
        x=df.index,
        y=df.Close,
        name = 'Close',
        mode="lines",
    ))
    
    for elem in  df_patterns:
        fig.add_trace(go.Scatter(
            x=elem.index,
            y=elem.iloc[:,0],
            mode="markers",
            text=["Here"],
            textposition="bottom center",
            marker_symbol=elem['marker_symbol'],
            marker_color=elem['marker_color'],
            marker_line_color=elem["marker_line_color"],
            marker_size=10,
            marker_line_width=1,
            name=elem.columns[0]
        ))

    return fig

plot_patterns(df_patterns=signals, df=df)

# Defining Strategy and Profits

- The way to extract profits using patterns is not so straightfoward.
- We need to devise a way of consistently being able to define and extract profits.
- To that end we need to create buy and sell chains as:
  1. Counting every Bullish signal as purchasing 1 full stock.
  2. Counting every Bearish signal as selling all (Bullish) current stocks.
  3. Between every batch of Bullish signals there can only be one Bear signal.
- To that end we define the function patterns_profit, which will:
  2. Allow only consecutive Bullish signals.
     1. On consecutive Bearish patterns only the first one is kept.
  3. Every Bearish pattern is multiplied by the number of previous Bullish patterns.
  4. To extract profit we then sum up all the signals and divide the result by the initial Bullish signal price.

In [10]:
def pattern_strat(df:pd.DataFrame) -> dict[str, list[float]]:
    """Calculates the total change from a pattern dataframe.

    Args:
        df (pd.DataFrame): Contains all the pattern info as generated by patterns_signal function.

    Returns:
        dict[str, float]: {"name of the pattern": change}
    """    
    df: pd.DataFrame = df.reset_index()
    df.index = df.index + 1
    # find all negative price indices for the pattern's Close price
    neg_index: pd.Index = df[df.Close<0].index
    # find all positive price indices for the pattern's Close price
    pos_index: pd.Index = df[df.Close>0].index
    # group the increasing sequences of positive indices
    pos_index_grouped: list[list[Any]] = seq_seeker(pos_index)
    # transform the groupings into multipliers according the number of elements
    index_mult: list[int] = [len(x) for x in pos_index_grouped]
    # Check if we have as much multipliers as negative indices
    if len(index_mult) == len(neg_index):
        for idx, mult in zip(neg_index, index_mult):
            # multiply all negative Closing prices with their multipliers
            df.loc[[idx], 'Close'] = df.loc[[idx], 'Close']*mult
    # finally calculate change        
    change:float = -(df.Close.sum()/np.abs(df.Close[1]))
    return {df.columns[1]:[change]}
pattern_strat(df=signals[1])

{'CDL3BLACKCROWS': [2.4702058376469265]}

In [11]:
def patterns_profit(patterns_signal_list:list[pd.DataFrame]=patterns_signal()) -> dict[pd.DataFrame, list[pd.DataFrame]]:
    """_summary_

    Args:
        patterns_signal_list (list[pd.DataFrame], optional): Contains all the pattern info as generated by patterns_signal function. Defaults to patterns_signal().

    Returns:
        dict[pd.DataFrame, list[pd.DataFrame]]: {'change': total_change, 'df': res_dfs}. total_change is a dataframe containing the pattern name and its change. res_dfs is the pattern_signal_list after conforming to the strategy.
    """    
    total_change = {}
    res_dfs = []
    
    for elem in patterns_signal_list:
        # find consecutive bearish rows
        no_consecut_negativ: pd.Series[bool]=((elem.Close.shift(1)<0) & (elem.Close<0)) 
        # remove every second consecutive bearish row
        # so that we have complete buy and sell signals in the end
        elem: pd.DataFrame = elem[~no_consecut_negativ]

        if elem.shape[0] > 2 and np.any(elem.Close > 0) and np.any(elem.Close < 0):
            start_index: pd.Timestamp = elem[elem.Close > 0].index[0]
            end_index: pd.Timestamp = elem[elem.Close < 0].index[-1]
            if end_index > start_index:  # type: ignore
                elem = elem[start_index:end_index]
                total_change |= pattern_strat(elem)
                res_dfs += [elem]
                
    total_change: pd.DataFrame = pd.DataFrame().from_dict(total_change).T
    total_change = total_change.sort_values(by=0, ascending=False)
    total_change = total_change.reset_index()
    total_change.index = total_change.index +1
    total_change = total_change.rename({0:'Profit', 'index':'Pattern'}, axis=1)
    return {'change': total_change, 'df': res_dfs}

total_change = patterns_profit(patterns_signal_list=signals)

In [12]:
total_change['change']

Unnamed: 0,Pattern,Profit
1,CDLHIKKAKE,45.364658
2,CDLSHORTLINE,38.210665
3,CDLXSIDEGAP3METHODS,33.465675
4,CDLDOJISTAR,31.264481
5,CDLSEPARATINGLINES,30.664321
6,CDLTASUKIGAP,27.789113
7,CDLCLOSINGMARUBOZU,26.805802
8,CDLLONGLINE,26.525567
9,CDLMARUBOZU,21.658769
10,CDLENGULFING,13.893378


In [13]:
total_change['df'][1]

Unnamed: 0_level_0,CDL3OUTSIDE,Close,marker_symbol,marker_color,marker_line_color
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2004-09-13,2.677464,2.677464,triangle-up,lightgreen,green
2004-10-13,3.509346,3.509346,triangle-up,lightgreen,green
2004-11-10,4.180829,-4.180829,triangle-down,darkred,red
2005-04-05,4.696646,4.696646,triangle-up,lightgreen,green
2005-06-17,6.981332,6.981332,triangle-up,lightgreen,green
...,...,...,...,...,...
2022-02-25,134.519501,134.519501,triangle-up,lightgreen,green
2022-03-04,132.121994,-132.121994,triangle-down,darkred,red
2022-03-29,143.250000,143.250000,triangle-up,lightgreen,green
2022-08-03,118.779999,118.779999,triangle-up,lightgreen,green


In [14]:
plot_patterns(df=df, df_patterns=total_change['df'])

In [16]:
total_change

{'change':                 Pattern     Profit
 1            CDLHIKKAKE  45.364658
 2          CDLSHORTLINE  38.210665
 3   CDLXSIDEGAP3METHODS  33.465675
 4           CDLDOJISTAR  31.264481
 5    CDLSEPARATINGLINES  30.664321
 6          CDLTASUKIGAP  27.789113
 7    CDLCLOSINGMARUBOZU  26.805802
 8           CDLLONGLINE  26.525567
 9           CDLMARUBOZU  21.658769
 10         CDLENGULFING  13.893378
 11          CDLBELTHOLD  13.885515
 12       CDLHARAMICROSS   7.060856
 13  CDLGAPSIDESIDEWHITE   4.316843
 14           CDLTRISTAR   3.727045
 15     CDLCOUNTERATTACK   3.603862
 16           CDL3INSIDE   2.136626
 17          CDLHIGHWAVE  -1.879013
 18       CDLSPINNINGTOP  -2.355928
 19            CDLHARAMI -10.834073
 20          CDL3OUTSIDE -33.038474,
 'df': [            CDL3INSIDE       Close  marker_symbol marker_color  \
  Date                                                             
  2004-10-28    4.814454    4.814454    triangle-up   lightgreen   
  2005-05-16    5.75468

In [17]:
def pattern_summary(total_change=patterns_profit(patterns_signal()), patterns_info=PATTERNS_INFO) -> str:
    """_summary_

    Args:
        total_change (_type_, optional): _description_. Defaults to total_change.
        patterns_info (_type_, optional): _description_. Defaults to PATTERNS_INFO.

    Returns:
        str: _description_
    """
    change:pd.DataFrame = total_change['change']
    df_list:list[pd.DataFrame] = total_change['df']
    
    max_profit = round(change[change.Profit > 0].Profit.iloc[1], 2) if change[change.Profit > 0].Profit.shape[0] > 0 else None
    max_profit_pattern = change[change.Profit > 0].Pattern.iloc[1] 
    max_profit_pattern_name = PATTERNS_INFO_df[PATTERNS_INFO_df['Pattern Func'] == max_profit_pattern]['Pattern Name'].values[0]
    max_profit_str = f'\nThe most profitable pattern was {max_profit_pattern}({max_profit_pattern_name}) with {max_profit} increase.' if max_profit else ''
    
    least_profit = round(change[change.Profit > 0].Profit.iloc[-1], 2) if change[change.Profit > 0].Profit.shape[0] > 0 else None
    least_profit_pattern = change[change.Profit > 0].Pattern.iloc[-1]
    least_profit_pattern_name = PATTERNS_INFO_df[PATTERNS_INFO_df['Pattern Func'] == least_profit_pattern]['Pattern Name'].values[0]
    least_proft_str = f'\nThe least profitable pattern was {least_profit_pattern}({least_profit_pattern_name}) with {least_profit} increase.' if least_profit else ''
    
    max_loss = round(change[change.Profit < 0].Profit.iloc[-1], 2) if change[change.Profit < 0].Profit.shape[0] >0 else None
    max_loss_pattern = change[change.Profit < 0].Pattern.iloc[-1] 
    max_loss_pattern_name = PATTERNS_INFO_df[PATTERNS_INFO_df['Pattern Func'] == max_loss_pattern]['Pattern Name'].values[0]
    max_loss_str = f'\nThe max loss pattern was {max_loss_pattern}({max_loss_pattern_name}) with {max_loss} decrease.' if max_loss else ''
        
    least_loss = round(change[change.Profit < 0].Profit.iloc[1], 2) if change[change.Profit < 0].Profit.shape[0] >0 else None
    least_loss_pattern = change[change.Profit < 0].Pattern.iloc[1]
    least_loss_pattern_name = PATTERNS_INFO_df[PATTERNS_INFO_df['Pattern Func'] == least_loss_pattern]['Pattern Name'].values[0]
    least_loss_str = f'\nThe least loss pattern was {least_loss_pattern}({least_loss_pattern_name}) with {least_loss} decrease.' if least_loss else ''    
    
    return f'''Out of the {len(patterns_info)} total patterns only {len(df_list)} comprised complete buy-sell chains.
From these {len(change[change.Profit>0])}/{len(change.Profit)} were profitable. {max_profit_str} {least_proft_str} {max_loss_str} {least_loss_str} '''
print(pattern_summary())

Out of the 61 total patterns only 20 comprised complete buy-sell chains.
From these 16/20 were profitable. 
The most profitable pattern was CDLSHORTLINE(Short Line Candle) with 38.21 increase. 
The least profitable pattern was CDL3INSIDE(Three Inside Up/Down) with 2.14 increase. 
The max loss pattern was CDL3OUTSIDE(Three Outside Up/Down) with -33.04 decrease. 
The least loss pattern was CDLSPINNINGTOP(Spinning Top) with -2.36 decrease. 


# Putting everything together

## 1. Plot stock and indicators

In [149]:
plot_stock()

## 2. Plot all valid patterns

In [150]:
plot_patterns(df=df, df_patterns=patterns_signal())

Note the legend icons are irrelevant. They are just automatically picked from the first row of each pattern.

## 3. Plot strategy-valid patterns

Plot all valid buy-chain patterns compatible with our strategy

In [153]:
plot_patterns(df=df, df_patterns=total_change['df'])

In [18]:
print(pattern_summary())

Out of the 61 total patterns only 20 comprised complete buy-sell chains.
From these 16/20 were profitable. 
The most profitable pattern was CDLSHORTLINE(Short Line Candle) with 38.21 increase. 
The least profitable pattern was CDL3INSIDE(Three Inside Up/Down) with 2.14 increase. 
The max loss pattern was CDL3OUTSIDE(Three Outside Up/Down) with -33.04 decrease. 
The least loss pattern was CDLSPINNINGTOP(Spinning Top) with -2.36 decrease. 
