In [1]:
#import os
import random

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import yfinance as yf
from plotly.subplots import make_subplots
#import chart_studio
#import chart_studio.plotly as py
#from dotenv import load_dotenv

### NOTE
You don't need to run the code just below, which is commented out. Its only purpose was to let me save the interactive plots and share them in the Medium article. It requires a Plotly Chart Studio account and the corresponding API key. I am not removing this code because someone might be interested in this option.

In [2]:
# load environment variables
#load_dotenv()
# set chart studio credentials
#chart_studio.tools.set_credentials_file(username=os.getenv('PLOTLY_CHART_STUDIO_USERNAME'), 
#                                        api_key=os.getenv('PLOTLY_CHART_STUDIO_API_KEY'))

Viewing this notebook on GitHub will NOT show the interactive Plotly charts. I STRONGLY suggest running this code on your local machine.

### DATA RETRIEVAL

In [3]:
# Ticker symbol and first date of the requested price history
ticker = 'AAPL'
start_date = '1995-01-01'

# yfinance handle for the chosen ticker
stock = yf.Ticker(ticker)

# Fetch, in this order: company metadata, price history from start_date, split history
stock_info = stock.info
historical_data = stock.history(start=start_date)
splits = stock.splits

# Everything retrieved for a ticker is stored under that ticker's key
stock_data = {
    ticker: dict(
        info=stock_info,
        historical_data=historical_data,
        splits=splits,
    ),
}

In [4]:
# Display the stock-split history stored for the ticker
stock_data[ticker]['splits']

Date
2000-06-21 00:00:00-04:00    2.0
2005-02-28 00:00:00-05:00    2.0
2014-06-09 00:00:00-04:00    7.0
2020-08-31 00:00:00-04:00    4.0
Name: Stock Splits, dtype: float64

In [5]:
# Working dataset: the price history stored for the ticker.
# Note: this is a reference to the stored frame, not a copy.
df = stock_data[ticker]['historical_data']

In [6]:
# Preview the first 10 rows of the working dataset
df.head(10)

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1995-01-03 00:00:00-05:00,0.290143,0.290143,0.28268,0.286411,103868800,0.0,0.0
1995-01-04 00:00:00-05:00,0.288277,0.295741,0.288277,0.293875,158681600,0.0,0.0
1995-01-05 00:00:00-05:00,0.292941,0.293875,0.28921,0.290143,73640000,0.0,0.0
1995-01-06 00:00:00-05:00,0.310668,0.321863,0.306936,0.313466,1076622400,0.0,0.0
1995-01-09 00:00:00-05:00,0.310668,0.312533,0.306002,0.307519,274086400,0.0,0.0
1995-01-10 00:00:00-05:00,0.307869,0.328393,0.307869,0.326061,614790400,0.0,0.0
1995-01-11 00:00:00-05:00,0.326527,0.358713,0.318597,0.348918,873824000,0.0,0.0
1995-01-12 00:00:00-05:00,0.344253,0.346119,0.333991,0.338656,551779200,0.0,0.0
1995-01-13 00:00:00-05:00,0.344253,0.344253,0.331192,0.334924,351377600,0.0,0.0
1995-01-16 00:00:00-05:00,0.334924,0.337723,0.330259,0.332125,188977600,0.0,0.0


In [7]:
# Summary statistics of every numeric column
df.describe()

Unnamed: 0,Open,High,Low,Close,Volume,Dividends,Stock Splits
count,7433.0,7433.0,7433.0,7433.0,7433.0,7433.0,7433.0
mean,31.354382,31.692921,31.033339,31.376932,386515600.0,0.001109,0.002018
std,51.896628,52.458182,51.383768,51.946017,376948200.0,0.014283,0.099087
min,0.097189,0.10002,0.096245,0.097661,24048300.0,0.0,0.0
25%,0.354487,0.362639,0.347239,0.355241,130102400.0,0.0,0.0
50%,5.759042,5.827584,5.703484,5.753911,272053600.0,0.0,0.0
75%,31.749346,31.870536,31.691082,31.849567,503983200.0,0.0,0.0
max,231.389999,233.080002,229.25,232.979996,7421641000.0,0.25,7.0


### DATA CLEANING

Let's start with a first, rough cleaning of the dataset.

In [8]:
price_cols = ['Open', 'High', 'Low', 'Close']
initial_rows = len(df)

# Drop every row in which at least one OHLC value is missing
df = df.dropna(subset=price_cols, how='any')

# Prices must be strictly positive (zero and negative values are rejected; valid for stocks)
positive_prices = (df[price_cols] > 0).all(axis=1)

# High must be the top of the candle and Low the bottom
consistent_ohlc = (
    (df['High'] >= df['Low'])
    & (df['High'] >= df['Open'])
    & (df['High'] >= df['Close'])
    & (df['Low'] <= df['Open'])
    & (df['Low'] <= df['Close'])
)

df = df[positive_prices & consistent_ohlc]

# Report how many rows were discarded, if any
final_rows = len(df)
if initial_rows - final_rows:
    print(f'{initial_rows - final_rows} rows were removed from {ticker} dataset because of missing or incorrect data')

4 rows were removed from AAPL dataset because of missing or incorrect data


Let's round the values because we don't like too many decimals

In [9]:
def _round_price(price):
    """Round a price with a precision that depends on its magnitude.

    price > 10 -> 2 decimals; 1 < price <= 10 -> 3 decimals; otherwise 4 decimals.
    """
    if price > 10:
        return round(price, 2)
    if price > 1:
        return round(price, 3)
    return round(price, 4)


columns_to_round = ['Open', 'High', 'Low', 'Close']
for column in columns_to_round:
    # Element-wise rounding of every price in the column
    df[column] = df[column].apply(_round_price)

Let's check for anomalies within the dataset: occurrences which may be correct but should be investigated, especially if they are recurrent in the dataset.

In [10]:
ohlc = df[['Open', 'High', 'Low', 'Close']]

# Sessions in which Open, High, Low and Close all coincide
flat_candle = ohlc.eq(ohlc['Open'], axis=0).all(axis=1)
same_price_dates = df.index[flat_candle].tolist()

# Sessions with fewer than 1000 units traded
low_volume_dates = df.index[df['Volume'] < 1000].tolist()

# Per-session average of the four prices
avgPrice = ohlc.mean(axis=1)

# Percentage distance between the lowest and the highest average price of the whole period
max_avg_price = avgPrice.max()
min_avg_price = avgPrice.min()
price_span = max_avg_price - min_avg_price
pct_excursion = price_span / min_avg_price * 100

if len(same_price_dates) > 0:
    print(f"Ticker: {ticker} has the same OHLC prices on {len(same_price_dates)} dates")
    print(same_price_dates)

if len(low_volume_dates) > 0:
    print(f"Ticker: {ticker} has low volume on {len(low_volume_dates)} dates")
    print(low_volume_dates)

# A stock that barely moved over the whole period is reported as well
if pct_excursion < 75:
    print(f"Ticker: {ticker} has a price excursion of less than 75% in the whole period: {pct_excursion:.2f}%")

In [11]:
# Relative thresholds above which a price move is reported as an anomaly
threshold1 = 0.35   # open vs previous close, and close vs open
threshold2 = 0.5    # high-low excursion relative to the low

# Close of the previous session aligned on the current row (NaN on the first row,
# so the first row can never trigger the open-vs-previous-close check)
shifted_close = df['Close'].shift(1)

# Vectorized checks: one boolean mask per anomaly type instead of a row-by-row Python loop.
# The two intraday checks need no previous close, so the first row is checked as well.
gap_mask = ((df['Open'] - shifted_close).abs() / shifted_close) > threshold1
range_mask = ((df['High'] - df['Low']) / df['Low']) > threshold2
intraday_mask = ((df['Close'] - df['Open']).abs() / df['Open']) > threshold1

# Each anomaly is stored as (date, first price, second price), in chronological order
anomalies = {
    'Open-pClose Anomalies': list(zip(df.index[gap_mask], df['Open'][gap_mask], shifted_close[gap_mask])),
    'High-Low Anomalies': list(zip(df.index[range_mask], df['High'][range_mask], df['Low'][range_mask])),
    'Close-Open Anomalies': list(zip(df.index[intraday_mask], df['Close'][intraday_mask], df['Open'][intraday_mask])),
}

# Print a summary only when at least one anomaly was found
num_anomalies = sum(len(found) for found in anomalies.values())
if num_anomalies:
    print(f"Ticker: {ticker} has {num_anomalies} anomalies:")
    for key, value in anomalies.items():
        if value:
            print(f"  {key}: {len(value)}")
            print(value)

Ticker: AAPL has 1 anomalies:
  Open-pClose Anomalies: 1
[(Timestamp('2000-09-29 00:00:00-0400', tz='America/New_York'), 0.4256, 0.8077)]


If you find anomalies, it is best to investigate their causes. If the dataset has too many recurring anomalies, or if the causes you find may be a source of biased behavior in the stock prices, then the best option may simply be to choose a different stock!

### CANDLESTICK PLOT

NB: Plotly is used throughout this project for its interactivity

In [12]:
# Candlestick chart of the whole cleaned price history
fig = go.Figure(
    data=[
        go.Candlestick(
            x=df.index,
            open=df["Open"],
            high=df["High"],
            low=df["Low"],
            close=df["Close"],
        )
    ]
)

# Take the first year from the data itself so the title stays correct
# when the ticker or the start date is changed
first_year = df.index.min().year

# Add title, axis labels, remove rangeslider
fig.update_layout(
    title=dict(
        text=f'{ticker} Price from {first_year} onwards',
        font=dict(size=18, color='red'),
        x=0.5,
    ),
    yaxis_title="Price (log)",
    xaxis_title="Date",
    xaxis_rangeslider_visible=False,
    autosize=True,
    hovermode='x unified',
)

fig.update_xaxes(
    showline=True,
    linewidth=2,
    linecolor='black',
    mirror=True,
    tickangle=-45,
)

fig.update_yaxes(
    type="log",            # logarithmic scale on the y axis
    showgrid=True,
    gridcolor='blue',
    griddash="dash",
    showline=True,
    linewidth=2,
    linecolor='black',
    mirror=True,
)

fig.show()

In [13]:
#py.plot(fig, filename="AAPL_Prices", auto_open = False)

### CANDLESTICK PATTERN

For simplicity, I will use a classical pattern called Bullish Engulfing. You are free to add any filters you want, or to design and write your own pattern.

In [14]:
# Simple and logarithmic daily returns of the closing price
df['Returns'] = df['Close'].pct_change()
df['Log_Returns'] = np.log(1 + df['Returns'])

# ATR - Average True Range (used later as a filter)
period = 14     # standard ATR period

# True Range (TR): the largest of high-low, |high - previous close| and |low - previous close|.
# Computed column-wise; on the first row the previous close is NaN and is ignored by max(),
# so TR falls back to high-low there.
tr_prev_close = df['Close'].shift(1)
df['TR'] = pd.concat(
    [
        df['High'] - df['Low'],
        (df['High'] - tr_prev_close).abs(),
        (df['Low'] - tr_prev_close).abs(),
    ],
    axis=1,
).max(axis=1)

# Seed the ATR with the rolling mean of TR, then apply the recursive smoothing
# ATR(i) = (ATR(i-1) * (period - 1) + TR(i)) / period
# on plain NumPy arrays instead of per-element DataFrame reads and writes.
# NOTE(review): the recursion starts at position period+1, so positions period-1 and
# period both keep the plain rolling mean - confirm this seed index is intended.
tr_values = df['TR'].to_numpy()
atr_values = df['TR'].rolling(window=period).mean().to_numpy(copy=True)
for i in range(period + 1, len(atr_values)):
    atr_values[i] = (atr_values[i - 1] * (period - 1) + tr_values[i]) / period
df['ATR'] = atr_values

In [15]:
# Preview the last 10 rows, including the new Returns, Log_Returns, TR and ATR columns
df.tail(10)

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits,Returns,Log_Returns,TR,ATR
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2024-06-28 00:00:00-04:00,215.77,216.07,210.3,210.62,82542700,0.0,0.0,-0.016254,-0.016388,5.77,4.824019
2024-07-01 00:00:00-04:00,212.09,217.51,211.92,216.75,60402900,0.0,0.0,0.029105,0.028689,6.89,4.971589
2024-07-02 00:00:00-04:00,216.15,220.38,215.1,220.27,58046200,0.0,0.0,0.01624,0.016109,5.28,4.993618
2024-07-03 00:00:00-04:00,220.0,221.55,219.03,221.55,37369800,0.0,0.0,0.005811,0.005794,2.52,4.816931
2024-07-05 00:00:00-04:00,221.65,226.45,221.65,226.34,60412400,0.0,0.0,0.02162,0.02139,4.9,4.822865
2024-07-08 00:00:00-04:00,227.09,227.85,223.25,227.82,59085900,0.0,0.0,0.006539,0.006518,4.6,4.806946
2024-07-09 00:00:00-04:00,227.93,229.4,226.37,228.68,48076100,0.0,0.0,0.003775,0.003768,3.03,4.680021
2024-07-10 00:00:00-04:00,229.3,233.08,229.25,232.98,62627700,0.0,0.0,0.018804,0.018629,4.4,4.66002
2024-07-11 00:00:00-04:00,231.39,232.39,225.77,227.57,64710600,0.0,0.0,-0.023221,-0.023495,7.21,4.842161
2024-07-12 00:00:00-04:00,228.92,232.64,228.68,230.54,53008200,0.0,0.0,0.013051,0.012966,5.07,4.858435


We will use the pattern known as Bullish Engulfing

In [16]:
# Candle geometry: size of the body and of the full high-low extent
body = (df['Close'] - df['Open']).abs()
full_range = (df['High'] - df['Low']).abs()
# True when the candle closes above its open (bullish), False otherwise
direction = df['Close'].gt(df['Open'])

# Values of the previous candle, aligned on the current row
prev_open, prev_close = df['Open'].shift(1), df['Close'].shift(1)
prev_body = body.shift(1)
prev_direction = direction.shift(1)

# Individual conditions of the Bullish Engulfing pattern
current_is_bullish = direction
# Note: "not bullish" also includes a previous candle with close == open;
# the first row (no previous candle) compares as False
previous_not_bullish = prev_direction.eq(False)
closes_above_prev_open = df['Close'].gt(prev_open)
opens_below_prev_close = df['Open'].lt(prev_close)

bullish_engulfing = (
    current_is_bullish
    & previous_not_bullish
    & closes_above_prev_open
    & opens_below_prev_close
)

# Additional filters: the body must be more than twice the previous body and
# the candle's range must be at least one ATR (add more filters here if needed)
body_filter = body.gt(2 * prev_body)
range_filter = full_range.ge(df['ATR'])
engulfing_mask = bullish_engulfing & body_filter & range_filter

print(len(engulfing_mask))
print(f'The occurrences of Bullish Engulfing pattern in the dataset are {engulfing_mask.sum()}')

7429
The occurrences of Bullish Engulfing pattern in the dataset are 106


In [17]:
# Display the boolean mask: True on the rows where the filtered pattern was detected
engulfing_mask

Date
1995-01-03 00:00:00-05:00    False
1995-01-04 00:00:00-05:00    False
1995-01-05 00:00:00-05:00    False
1995-01-06 00:00:00-05:00    False
1995-01-09 00:00:00-05:00    False
                             ...  
2024-07-08 00:00:00-04:00    False
2024-07-09 00:00:00-04:00    False
2024-07-10 00:00:00-04:00    False
2024-07-11 00:00:00-04:00    False
2024-07-12 00:00:00-04:00    False
Length: 7429, dtype: bool

Now that we have our patterns, let's draw and check them both in the time series and visually in a zoomed area of the chart

In [18]:
pattern_name = 'Bullish Engulfing'

# Dates on which the pattern was detected
pattern_dates = df.index[engulfing_mask]

# Base chart: the closing price drawn as a line
fig = go.Figure()
close_line = go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price')
fig.add_trace(close_line)

# Styling shared by every marker and every date label
marker_style = dict(color="red", width=0.5, dash="dash")
label_font = dict(size=7, color='red')
label_shift = pd.Timedelta(hours=48)    # the label is placed slightly to the left of its line

for date in pattern_dates:
    # Full-height vertical line; y is in paper coordinates, i.e. normalized in [0, 1]
    fig.add_shape(
        type="line",
        x0=date, x1=date,
        y0=0, y1=1,
        yref="paper",
        line=marker_style,
    )

    # Rotated label showing only the date (no time) next to the line
    fig.add_annotation(
        x=date - label_shift,
        y=0.05,
        yref="paper",
        text=date.strftime('%Y-%m-%d'),
        showarrow=False,
        textangle=-90,
        font=label_font,
    )

fig.update_layout(
    title=dict(
        text=f'{ticker} Close Price with {pattern_name} occurrences',
        font=dict(size=18, color='red'),
        x=0.5,
    ),
    xaxis_title='Date',
    yaxis_title='Close Price (log)',
    hovermode='x',
    autosize=True,
)

# Same black mirrored frame on both axes; the y axis is logarithmic
axis_frame = dict(linewidth=1, linecolor='black', mirror=True)
fig.update_xaxes(tickangle=-45, **axis_frame)
fig.update_yaxes(type='log', **axis_frame)

fig.show()

In [19]:
#py.plot(fig, filename="AAPL_Bullish_Engulfing", auto_open = False)

In [21]:
# Dates on which the pattern completes (the engulfing candle)
pattern_dates = df.index[engulfing_mask]

# Randomly pick the occurrences to display, in chronological order.
# The sample size is capped by the number of detected patterns, so a dataset with fewer
# than `max_subplots` occurrences no longer makes random.sample raise ValueError.
# NOTE: the sample is not seeded, so every run shows a different selection.
max_subplots = 12
n_plots = min(max_subplots, len(pattern_dates))
section_dates = sorted(random.sample(list(pattern_dates), k=n_plots))

# Grid layout: at most 3 columns, at least a 1x1 grid even when nothing was detected
n_cols = max(1, min(3, n_plots))
n_rows = max(1, (n_plots + n_cols - 1) // n_cols)

# Define the number of candles to show in each subplot
max_candles = 20
back_candles = 5        # Number of candles shown before the engulfing candle
num_candles = 2         # Number of candles in the engulfing pattern

# Create subplot figure, one titled subplot per selected occurrence
subplot_titles = [date.strftime('%B %d, %Y') for date in section_dates]
fig = make_subplots(rows=n_rows, cols=n_cols,
                    vertical_spacing=0.1, horizontal_spacing=0.05,
                    subplot_titles=subplot_titles
                    )

for i, date in enumerate(section_dates):
    row = i // n_cols + 1
    col = i % n_cols + 1

    # Positional index of the engulfing candle in the full dataset
    idx = df.index.get_loc(date)

    # Window of candles shown in the subplot
    start_idx = max(0, idx - back_candles)
    end_idx = min(len(df), start_idx + max_candles)
    subset = df.iloc[start_idx:end_idx]

    # Position of the engulfing candle inside the window. It equals back_candles only when
    # the window was not clamped at the start of the dataset, so it is computed explicitly.
    pattern_pos = idx - start_idx
    first_pos = max(0, pattern_pos - num_candles + 1)
    pattern_candles = subset.iloc[first_pos:pattern_pos + 1]

    # create candlestick trace
    candlestick = go.Candlestick(
        x=subset.index,
        open=subset['Open'],
        high=subset['High'],
        low=subset['Low'],
        close=subset['Close'],
        showlegend=False,
        name="",          # this is to avoid any trace number in the legend
    )

    fig.add_trace(candlestick, row=row, col=col)

    # Rectangle around the pattern candles, widened by half a day on each side
    x_left = pattern_candles.index[0] - pd.Timedelta(hours=12)
    x_right = pattern_candles.index[-1] + pd.Timedelta(hours=12)

    rect_low = pattern_candles['Low'].min() * 0.998      # Extend 0.2% below
    rect_high = pattern_candles['High'].max() * 1.002    # Extend 0.2% above

    # create the rectangle trace
    rect = go.Scatter(
        x=[x_left, x_left, x_right, x_right, x_left],
        y=[rect_low, rect_high, rect_high, rect_low, rect_low],
        mode='lines',
        line=dict(color='red', width=1),
        fill='none',
        showlegend=False,
        hoverinfo='skip'    # No hover info for the rectangle
    )

    fig.add_trace(rect, row=row, col=col)

    # Update axes in each subplot; only the first column carries a y-axis title
    fig.update_xaxes(title_text=None, showgrid=True,
                    row=row, col=col, rangeslider_visible=False)
    fig.update_yaxes(title_text='Prices' if col == 1 else None,
                    showgrid=True, row=row, col=col)

# Customize the layout of titles of the subplots
for annotation in fig['layout']['annotations']:
    annotation['font'] = dict(size=16)

fig.update_layout(
    height=300*n_rows,  # Adjust height based on number of rows
    width=1100,         # Fixed width
    showlegend=False,
    hovermode='x unified',
    title=dict(text=f'Examples of {pattern_name} detected in {ticker} stock',
               font=dict(size=18, color='red'),
               x=0.5)
    )

fig.show()

In [21]:
#py.plot(fig, filename="AAPL_Bullish_Engulfing_Examples", auto_open = False)