# Breakout Strategy

In [172]:
import pandas as pd 
import yfinance as yf
import numpy as np
import sys
import os
# Add the parent directory to the sys.path
sys.path.append(os.path.join(os.path.dirname('Practice'), '..'))

from Practice import project_test2

import cufflinks as cf
cf.go_offline()
cf.set_config_file(offline=False, world_readable=True)

import plotly.graph_objects as go

import plotly.offline as offline_py
offline_py.init_notebook_mode(connected=True)


In [173]:
import yfinance as yf

# tickers = ['AAPL', 'AMZN', 'IBM', 'GOOG', 'MSFT','NVDA']
tickers = ['XOM','CVX','COP','EOG','OXY']
prices = yf.download(tickers, start="2018-01-01", end="2020-01-01")

ticker_sample = tickers[0]


[*********************100%%**********************]  5 of 5 completed


In [174]:
prices['Close'].iplot(xTitle='Dates',yTitle='Stock Price', title='Stock Prices')


DatetimeIndex.format is deprecated and will be removed in a future version. Convert using index.astype(str) or index.map(formatter) instead.



### Compute the Highs and Lows in a Window

We use the price highs and lows as the indicator for the breakout strategy. Implement a function that returns the maximum high price and the minimum low price over a window of days. The variable `lookback_days` holds the number of past days to look at; the window must not include the current day.

In [175]:
def get_high_lows_lookback(high, low, lookback_days):
    """
    Compute the highest high and lowest low over the days before each date.

    Parameters
    ----------
    high : DataFrame
        High price for each ticker and date
    low : DataFrame
        Low price for each ticker and date
    lookback_days : int
        Size of the rolling window, in rows

    Returns
    -------
    lookback_high : DataFrame
        Rolling maximum of the previous `lookback_days` highs
    lookback_low : DataFrame
        Rolling minimum of the previous `lookback_days` lows
    """
    # Push every row one step forward so that the window ending on a given
    # date only contains earlier rows, never the current one.
    prior_high = high.shift(1)
    prior_low = low.shift(1)

    lookback_high = prior_high.rolling(window=lookback_days).max()
    lookback_low = prior_low.rolling(window=lookback_days).min()
    return lookback_high, lookback_low

# choose 50 days to lookback
lookback_days = 50

lookback_high, lookback_low = get_high_lows_lookback(prices.High, prices.Low, lookback_days)


In [176]:
fig = go.Figure()

fig.add_trace(go.Scatter(x=lookback_high.index, y=lookback_high[f'{ticker_sample}'], name='Lookback High'))
fig.add_trace(go.Scatter(x=lookback_low.index, y=lookback_low[f'{ticker_sample}'], name='Lookback Low'))
fig.add_trace(go.Scatter(x=prices.Close.index, y=prices.Close[f'{ticker_sample}'], name='Close'))

fig.update_layout(title=f'{ticker_sample} Lookback High and Low', xaxis_title='Date', yaxis_title='Price')

### Generate Signals

We will generate signals using the lookback highs and lows computed above. 


| Signal | Condition |
|--------|-----------|
| -1     | Lookback Low > Close Price |
| 1  | Lookback High < Close Price |
| 0  | Otherwise |
    
    

In [177]:
def get_long_short(close, lookback_high, lookback_low):
    """
    Turn close prices and lookback extremes into long/short/neutral signals.

    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookback_high : DataFrame
        Lookback high price for each ticker and date
    lookback_low : DataFrame
        Lookback low price for each ticker and date

    Returns
    -------
    long_short : DataFrame
        1 where the close is above the lookback high, -1 where it is below
        the lookback low, 0 everywhere else (including where a comparison
        involves NaN)
    """
    breaks_above = lookback_high < close
    breaks_below = lookback_low > close

    # Start from an all-zero frame shaped like `close`, then overlay the
    # long signals followed by the short signals (shorts applied last).
    neutral = close.copy()
    neutral[:] = 0
    long_short = neutral.mask(breaks_above, 1).mask(breaks_below, -1)

    return long_short.astype(int)

signal = get_long_short(prices.Close, lookback_high, lookback_low)

In [178]:
import plotly.graph_objs as go
from plotly.offline import iplot, init_notebook_mode

# Initialize Plotly for offline mode within Jupyter Notebook
init_notebook_mode(connected=True)



# Pick out the sample ticker's close prices and its raw (unfiltered) signal
price = prices['Close'][f'{ticker_sample}']  # close-price Series for `ticker_sample`
signal_apl = signal[f'{ticker_sample}']  # long/short/neutral signal Series for `ticker_sample`

# Generate annotations for buy and sell signals: one annotation dict per date
# on which the signal is 1 (long) or -1 (short), placed at that date's close.
# The comprehension variable `price` is local to the comprehension, so the
# outer `price` Series is not overwritten.
buy_annotations = [{'x': index, \
                    'y': price, \
                    'text': 'Long',\
                    'bgcolor':'#9dbdd5',\
                    'ayref': 'y', 'ax': 0, 'ay': 20}\
        for index, price in price[signal_apl == 1].items()]

sell_annotations = [{'x': index, \
                     'y': price, \
                    'text': 'Short',\
                    'bgcolor': '#ff6f69',\
                    'ayref': 'y', 'ax': 0, 'ay': 160}
                    for index, price in price[signal_apl == -1].items()]

# Define the title
title = f'{ticker_sample} Long and Short Signal'

# Create the layout with annotations (longs first, then shorts)
layout = go.Layout(
    title=title,
    annotations=buy_annotations + sell_annotations
)

# Create the stock trace: close price over time for the sample ticker
stock_trace = go.Scatter(
    x=price.index,
    y=price,
    name=f'{ticker_sample}',
    line={'color': '#2D3ECF'}
)

# Create the figure and plot it
fig = go.Figure(data=[stock_trace], layout=layout)
iplot(fig)


We notice multiple long or short signals occurring on consecutive days. We need to filter them out: within a chosen window of days, if the same signal has already fired, change the signal to 0 (the do-nothing signal).

In [179]:
def clear_signals(signals, window_size):
    """
    Thin out a one-directional signal Series to one signal per window.

    A signal is kept only when none of the `window_size` previously kept
    values is non-zero; otherwise it is replaced by 0.

    Parameters
    ----------
    signals : Pandas Series
        The long, short, or do nothing signals (only one direction is
        expected per call)
    window_size : int
        The number of days to have a single signal

    Returns
    -------
    signals : Pandas Series
        int16 Series on the same index with the repeated signals zeroed
    """
    # Seed the history with `window_size` zeros so the first real entries
    # already have a full-length window to look back on.
    history = [0] * window_size

    for offset, current_signal in enumerate(signals):
        # With the padding in place, this slice is exactly the last
        # `window_size` entries written so far.
        trailing_window = history[offset:offset + window_size]
        if sum(trailing_window):
            history.append(False)
        else:
            history.append(current_signal)

    # Drop the padding again before building the result
    kept = np.array(history[window_size:])
    return pd.Series(kept.astype(np.int16), signals.index)


def filter_signals(signal, lookahead_days):
    """
    Remove repeated long and repeated short signals, ticker by ticker.

    Parameters
    ----------
    signal : DataFrame
        The long, short, and do nothing signals for each ticker and date
    lookahead_days : int
        Window size handed to `clear_signals` for every column

    Returns
    -------
    filtered_signal : DataFrame
        The filtered long, short, and do nothing signals for each ticker
        and date, as int64
    """
    # Separate the two directions so each can be thinned independently
    longs_only = signal.where(signal == 1, 0)
    shorts_only = signal.where(signal == -1, 0)

    for directional in (longs_only, shorts_only):
        for ticker in signal.columns:
            directional[ticker] = clear_signals(directional[ticker], lookahead_days)

    # Each cell is non-zero in at most one of the two frames, so adding them
    # recombines the directions.
    recombined = longs_only + shorts_only
    return recombined.astype(np.int64)

signal_5 = filter_signals(signal, 5)
signal_10 = filter_signals(signal, 10)
signal_20 = filter_signals(signal, 20)


In [180]:
# plot the signals

def buy_signal_layout(price, signali):
    """
    Build one 'Long' annotation dict for every date whose signal equals 1.

    Parameters
    ----------
    price : Pandas Series
        Prices, indexed like `signali`
    signali : Pandas Series
        Signal values; entries equal to 1 are annotated

    Returns
    -------
    list of dict
        Annotation dicts positioned at (index label, price)
    """
    long_points = price[signali == 1]

    annotations = []
    for when, level in long_points.items():
        annotations.append({
            'x': when,
            'y': level,
            'text': 'Long',
            'bgcolor': '#9dbdd5',
            'ayref': 'y',
            'ax': 0,
            'ay': 60,
        })
    return annotations

def sell_signal_layout(price, signali):
    """
    Build one 'Short' annotation dict for every date whose signal equals -1.

    Parameters
    ----------
    price : Pandas Series
        Prices, indexed like `signali`
    signali : Pandas Series
        Signal values; entries equal to -1 are annotated

    Returns
    -------
    list of dict
        Annotation dicts positioned at (index label, price)
    """
    short_points = price[signali == -1]

    annotations = []
    for when, level in short_points.items():
        annotations.append({
            'x': when,
            'y': level,
            'text': 'Short',
            'bgcolor': '#ff6f69',
            'ayref': 'y',
            'ax': 0,
            'ay': 100,
        })
    return annotations


def plot_signal(price, signal, title):
    """
    Plot a price line with 'Long' / 'Short' annotations at the signal dates.

    Parameters
    ----------
    price : Pandas Series
        Prices to draw
    signal : Pandas Series
        Signal values (1 = long, -1 = short) indexed like `price`
    title : str
        Figure title

    Notes
    -----
    The trace is labelled with the module-level `ticker_sample`, not with
    anything derived from `price`.
    """
    annotations = buy_signal_layout(price, signal) + sell_signal_layout(price, signal)
    figure_layout = go.Layout(title=title, annotations=annotations)

    price_line = go.Scatter(
        x=price.index,
        y=price,
        name=f'{ticker_sample}',
        line={'color': '#2D3ECF'},
    )

    offline_py.iplot({'data': [price_line], 'layout': figure_layout})

close = prices.Close
signal_5 = filter_signals(signal, 5)
signal_10 = filter_signals(signal, 10)
signal_20 = filter_signals(signal, 20)

for signal_data, signal_days in [(signal_5, 5), (signal_10, 10), (signal_20, 20)]:
    plot_signal(
        close[ticker_sample],
        signal_data[ticker_sample],
        'Long and Short of {} Stock with {} day signal window'.format(ticker_sample, signal_days))



Now the signals are filtered and non-repeating.

## Lookahead Close Prices

With the trading signals done, we can start evaluating how many days to hold a long or short position. To compute the return of a trade, we need the close price a fixed number of days after the signal.

In [181]:
def get_lookahead_prices(close, lookahead_days):
    """
    Align each date with the close price `lookahead_days` rows later.

    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookahead_days : int
        The number of days to look ahead

    Returns
    -------
    lookahead_prices : DataFrame
        The lookahead prices for each ticker and date; the last
        `lookahead_days` rows have no future value and are NaN
    """
    # A negative shift pulls future rows back onto the current date
    rows_back = -lookahead_days
    return close.shift(rows_back)

lookahead_days = 5
lookahead_prices_5 = get_lookahead_prices(prices.Close, lookahead_days)

lookahead_days = 10
lookahead_prices_10 = get_lookahead_prices(prices.Close, lookahead_days)

lookahead_days = 20
lookahead_prices_20 = get_lookahead_prices(prices.Close, lookahead_days)


In [182]:
def plot_lookahead_prices(prices, lookahead_prices_list, title):
    """
    Plot several lookahead price series together with the actual prices.

    Parameters
    ----------
    prices : Pandas Series
        Actual prices; drawn last and labelled with the module-level
        `ticker_sample`
    lookahead_prices_list : list of (Pandas Series, int)
        Pairs of lookahead price series and the number of lookahead days
        used for the legend label
    title : str
        Figure title
    """
    traces = [
        go.Scatter(
            x=shifted_prices.index,
            y=shifted_prices,
            name='{} Day Lookahead'.format(day_count))
        for shifted_prices, day_count in lookahead_prices_list
    ]
    traces.append(go.Scatter(x=prices.index, y=prices, name=f'{ticker_sample}'))

    offline_py.iplot({'data': traces, 'layout': go.Layout(title=title)})


lookahead_5 = get_lookahead_prices(close, 5)
lookahead_10 = get_lookahead_prices(close, 10)
lookahead_20 = get_lookahead_prices(close, 20)


plot_lookahead_prices(
    close[ticker_sample].iloc[150:250],
    [
        (lookahead_5[ticker_sample].iloc[150:250], 5),
        (lookahead_10[ticker_sample].iloc[150:250], 10),
        (lookahead_20[ticker_sample].iloc[150:250], 20)],
    '5, 10, and 20 day Lookahead Prices for Slice of {} Stock'.format(ticker_sample))

### Lookahead Returns

We can get the returns from the lookahead prices.

In [183]:
def get_return_lookahead(close, lookahead_prices):
    """
    Log return from each date's close to its lookahead price.

    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookahead_prices : DataFrame
        The lookahead prices for each ticker and date

    Returns
    -------
    lookahead_returns : DataFrame
        log(lookahead_prices / close) for each ticker and date
    """
    price_ratio = lookahead_prices / close
    return np.log(price_ratio)
    

lookahead_returns_5 = get_return_lookahead(prices.Close, lookahead_prices_5)
lookahead_returns_10 = get_return_lookahead(prices.Close, lookahead_prices_10)
lookahead_returns_20 = get_return_lookahead(prices.Close, lookahead_prices_20)

In [184]:
def plot_lookahead_returns(price,lookahead_returns_list, title):
    """
    Plot several lookahead return series in one figure.

    Parameters
    ----------
    price : Pandas Series
        Accepted for signature compatibility; not used by the plot
    lookahead_returns_list : list of (Pandas Series, int)
        Pairs of lookahead return series and the number of lookahead days
        used for the legend label
    title : str
        Figure title
    """
    traces = [
        go.Scatter(
            x=returns.index,
            y=returns,
            name='{} Day Lookahead'.format(day_count))
        for returns, day_count in lookahead_returns_list
    ]

    offline_py.iplot({'data': traces, 'layout': go.Layout(title=title)})

plot_lookahead_returns(
    close[ticker_sample].iloc[150:250],
    [
        (lookahead_returns_5[ticker_sample].iloc[150:250], 5),
        (lookahead_returns_10[ticker_sample].iloc[150:250], 10),
        (lookahead_returns_20[ticker_sample].iloc[150:250], 20)],
    '5, 10, and 20 day Lookahead Returns for Slice {} Stock'.format(ticker_sample))

### Signal Returns

Now we can get the returns due to the trading signal.

In [185]:
def get_signal_return(signal, lookahead_returns):
    """
    Weight the lookahead returns by the trading signal.

    Parameters
    ----------
    signal : DataFrame
        The long, short, and do nothing signals for each ticker and date
    lookahead_returns : DataFrame
        The lookahead log returns for each ticker and date

    Returns
    -------
    signal_return : DataFrame
        Element-wise product of `signal` and `lookahead_returns`
    """
    return signal.mul(lookahead_returns)


signal_return_5 = get_signal_return(signal_5, lookahead_returns_5)
signal_return_10 = get_signal_return(signal_10, lookahead_returns_10)
signal_return_20 = get_signal_return(signal_20, lookahead_returns_20)

In [193]:
def plot_signal_returns(price,signal_return_list, titles):
    """
    Draw one figure per signal window: price on the left axis, the non-zero
    signal returns on a right-hand axis, and Long/Short annotations.

    Parameters
    ----------
    price : Pandas Series
        Prices to draw; labelled with the module-level `ticker_sample`
    signal_return_list : list of (Pandas Series, Pandas Series, int)
        Triples of signal returns, the matching signal, and the number of
        lookahead days used for the legend label
    titles : list of str
        One figure title per entry of `signal_return_list`
    """
    # A single layout object is shared by all figures; only its title and
    # annotations are replaced on each pass.
    layout = go.Layout(
        yaxis2={
            'title': 'Signal Returns',
            'overlaying': 'y',
            'side': 'right'},yaxis={'title': 'Stock Price'})

    price_trace = go.Scatter(x=price.index, y=price, name=ticker_sample)

    for entry, figure_title in zip(signal_return_list, titles):
        returns, window_signal, day_count = entry
        traded_returns = returns[returns != 0]
        returns_trace = go.Scatter(
            x=traded_returns.index,
            y=traded_returns,
            name='{} Day Lookahead'.format(day_count),
            yaxis='y2')

        long_marks = buy_signal_layout(price, window_signal)
        short_marks = sell_signal_layout(price, window_signal)

        layout['title'] = figure_title
        layout['annotations'] = long_marks + short_marks

        offline_py.iplot({'data': [price_trace, returns_trace], 'layout': layout})



title_string ='Returns with {} day signal window for {} stock'
plot_signal_returns(
    close[ticker_sample],
    [
        (signal_return_5[ticker_sample], signal_5[ticker_sample], 5),
        (signal_return_10[ticker_sample], signal_10[ticker_sample], 10),
        (signal_return_20[ticker_sample], signal_20[ticker_sample], 20)],
    [title_string.format(5, ticker_sample), title_string.format(10, ticker_sample), title_string.format(20, ticker_sample)])

### Test for significance

We will try to analyse how significant the returns are compared to the random returns. 

Histogram of the signal returns

In [187]:
from plotly.subplots import make_subplots

def plot_signal_histograms(signal_list, title, subplot_titles):
    """
    Show one histogram of non-zero signal returns per input frame.

    Parameters
    ----------
    signal_list : list of DataFrame
        Signal returns; each frame is stacked into one long Series
    title : str
        Figure title
    subplot_titles : sequence of str
        One title per subplot, in the order of `signal_list`
    """
    flattened = [frame.stack() for frame in signal_list]

    fig = make_subplots(rows=1, cols=len(flattened), subplot_titles=subplot_titles, print_grid=False)
    fig['layout'].update(title=title, showlegend=False)

    # Subplot columns are 1-based
    for column, values in enumerate(flattened, start=1):
        traded = values[values != 0].dropna()
        fig.append_trace(go.Histogram(x=traded), 1, column)

    offline_py.iplot(fig)


plot_signal_histograms(
    [signal_return_5, signal_return_10, signal_return_20],
    'Signals for {} Stock'.format(ticker_sample),
    ['5 Day Signal', '10 Day Signal', '20 Day Signal'])



The returns are skewed towards the right side. Let's overlay a normal distribution to see the skew.

In [188]:
from plotly.subplots import make_subplots

def plot_signal_histograms(signal_list, title, subplot_titles, seed=0):
    """
    Show histograms of non-zero signal returns, each overlaid with a
    histogram of normally distributed random draws for comparison.

    This redefines the earlier `plot_signal_histograms` in the notebook and
    adds the normal-distribution overlay.

    Parameters
    ----------
    signal_list : list of DataFrame
        Signal returns; each frame is stacked into one long Series
    title : str
        Figure title
    subplot_titles : sequence of str
        One title per subplot, in the order of `signal_list`
    seed : int or None, optional
        Seed for the random generator that produces the normal samples.
        The fixed default makes the figure identical on every run; pass
        None to draw a fresh sample each time.
    """
    # A local generator keeps the overlay reproducible without touching the
    # global NumPy random state.
    rng = np.random.default_rng(seed)

    stacked_list = [signal.stack() for signal in signal_list]

    fig = make_subplots(rows=1, cols=len(stacked_list), subplot_titles=subplot_titles, print_grid=False)
    fig['layout'].update(title=title, showlegend=True)

    for series_i, signal_series in enumerate(stacked_list):
        filtered_series = signal_series[signal_series != 0].dropna()
        fig.append_trace(go.Histogram(x=filtered_series), 1, series_i + 1)

        # Reference histogram: as many normal draws as there are returns,
        # using the mean and standard deviation of those returns.
        normal_sample = rng.normal(
            np.mean(filtered_series),
            np.std(filtered_series),
            len(filtered_series))
        fig.append_trace(go.Histogram(x=normal_sample, name='Normal'), 1, series_i + 1)

    offline_py.iplot(fig)


plot_signal_histograms(
    [signal_return_5, signal_return_10, signal_return_20],
    'Signals for {} Stock'.format(ticker_sample),
    ['5 Day Returns', '10 Day Returns', '20 Day Returns'])



We see that there are outliers in the histogram that make the distribution skewed. We have to analyse which stocks are causing these outlier returns in our strategy.

### Kolmogorov-Smirnov Test

We'll use the Kolmogorov-Smirnov Test, or KS-Test. This test will be applied to each ticker's signal returns where a long or short signal exists.

In [189]:
# Filter out returns that don't have a long or short signal.
long_short_signal_returns_5 = signal_return_5[signal_5 != 0].stack()
long_short_signal_returns_10 = signal_return_10[signal_10 != 0].stack()
long_short_signal_returns_20 = signal_return_20[signal_20 != 0].stack()

# Getting the signal returns and tickers
long_short_signal_returns_5 = long_short_signal_returns_5.reset_index().iloc[:, [1,2]]

long_short_signal_returns_5.columns = ['ticker', 'signal_return']

long_short_signal_returns_10 = long_short_signal_returns_10.reset_index().iloc[:, [1,2]]
long_short_signal_returns_10.columns = ['ticker', 'signal_return']
long_short_signal_returns_20 = long_short_signal_returns_20.reset_index().iloc[:, [1,2]]
long_short_signal_returns_20.columns = ['ticker', 'signal_return']

# View some of the data
long_short_signal_returns_10

Unnamed: 0,ticker,signal_return
0,XOM,-0.01846791
1,COP,0.04654185
2,OXY,0.01791880
3,CVX,0.01339716
4,EOG,-0.00830676
...,...,...
67,EOG,-0.09905575
68,OXY,0.02894301
69,COP,0.02784809
70,OXY,-0.06153788


In [190]:
from scipy.stats import kstest

# function for kstest on signal returns
def calculate_kstest(long_short_signal_returns):
    """
    Run a Kolmogorov-Smirnov test for every ticker's signal returns against
    a normal distribution fitted to the returns of all tickers together.

    Parameters
    ----------
    long_short_signal_returns : DataFrame
        The signal returns which have a signal.
        This DataFrame contains two columns, "ticker" and "signal_return"

    Returns
    -------
    ks_values : Pandas Series
        KS statistic for all the tickers (float64, indexed by ticker in
        order of first appearance)
    p_values : Pandas Series
        P value for all the tickers (float64, same index as `ks_values`)
    """
    tickers = long_short_signal_returns['ticker'].unique()

    # Give the result Series an explicit float dtype: without it, a Series
    # built from an index alone is object-typed on current pandas.
    ks_values = pd.Series(np.nan, index=tickers, dtype=float)
    p_values = pd.Series(np.nan, index=tickers, dtype=float)

    # Reference distribution: normal with the pooled mean and (sample)
    # standard deviation of all signal returns.
    all_returns = long_short_signal_returns['signal_return']
    normal_dist_args = (all_returns.mean(), all_returns.std())

    # for each ticker
    for name, group in long_short_signal_returns.groupby('ticker'):
        ks_value, p_value = kstest(group['signal_return'], 'norm', args=normal_dist_args)
        ks_values.loc[name] = ks_value
        p_values.loc[name] = p_value

    return ks_values, p_values

# get the ks values and p values for each ticker in different time frame
ks_values_5, p_values_5 = calculate_kstest(long_short_signal_returns_5)
ks_values_10, p_values_10 = calculate_kstest(long_short_signal_returns_10)
ks_values_20, p_values_20 = calculate_kstest(long_short_signal_returns_20)


In [191]:
# function for getting outliers
def find_outliers(ks_values, p_values, ks_threshold=0.5, pvalue_threshold=0.05):
    """
    Pick the tickers whose KS statistic is high and whose p-value is low.

    Parameters
    ----------
    ks_values : Pandas Series
        KS statistic for all the tickers
    p_values : Pandas Series
        P value for all the tickers
    ks_threshold : float
        A ticker qualifies only if its KS statistic is strictly above this
    pvalue_threshold : float
        A ticker qualifies only if its p-value is strictly below this

    Returns
    -------
    outliers : set of str
        Symbols that satisfy both conditions
    """
    high_ks = ks_values.index[ks_values > ks_threshold]
    low_p = set(p_values.index[p_values < pvalue_threshold])

    return {ticker for ticker in high_ks if ticker in low_p}


# Override find_outliers' default KS threshold (0.5); the p-value threshold
# is left at its default of 0.05.
ks_threshold = 0.16
outliers_5 = find_outliers(ks_values_5, p_values_5, ks_threshold)
outliers_10 = find_outliers(ks_values_10, p_values_10, ks_threshold)
outliers_20 = find_outliers(ks_values_20, p_values_20, ks_threshold)

# Union over all windows: a ticker is treated as an outlier if it is
# flagged for any of the 5-, 10-, or 20-day windows.
outlier_tickers = outliers_5.union(outliers_10).union(outliers_20)
print('{} Outliers Found:\n{}'.format(len(outlier_tickers), ', '.join(list(outlier_tickers))))


1 Outliers Found:
CVX


In [192]:
good_tickers = list(set(close.columns) - outlier_tickers)

plot_signal_histograms(
    [signal_return_5[good_tickers], signal_return_10[good_tickers], signal_return_20[good_tickers]],
    'Signal Return Without Outliers',
    ('5 Days', '10 Days', '20 Days'))

We found that removing the outlier from the data has made the results better and the returns look close to normal. Thus, our analysis of Breakout Strategy is complete.