In [196]:
import numpy as np
import xbbg 
from xbbg import blp
import plotly.express as px
import plotly.graph_objects as go
import copy

In [197]:
# Ticker of the instrument analysed in this notebook. It is passed to blp.bdh
# and is also the top-level column key used to select the PX_Last series.
target_Stock = 'SPX Index'

In [198]:
# Download the price history for the target ticker from 1/1/2016 onwards.
# Only the PX_Last column is read by later cells; PR005, RT115 and RR902 are
# fetched but not used in the cells shown here.
# NOTE(review): the meaning of the three mnemonic field codes is not documented
# in this notebook (PR005 mirrors PX_Last in the printed output) - confirm.
stock = blp.bdh(target_Stock, ["PX_Last", "PR005", "RT115", "RR902"], start_date="1/1/2016")
# Plain-text dump of the frame; the first row has NaN prices, which is why the
# next cell filters NaNs out.
print(stock)

           SPX Index                                
             PX_Last    PR005     RT115        RR902
2016-01-01       NaN      NaN       NaN  18609450.37
2016-01-04   2012.66  2012.66    0.0000  18322460.96
2016-01-05   2016.71  2016.71    0.2012  18377102.94
2016-01-06   1990.26  1990.26   -1.0774  18138173.01
2016-01-07   1943.09  1943.09   -3.4193  17712383.01
...              ...      ...       ...          ...
2023-06-20   4388.71  4388.71  150.1147  38267913.20
2023-06-21   4365.69  4365.69  148.8191  38065936.10
2023-06-22   4381.89  4381.89  149.7646  38226010.32
2023-06-23   4348.33  4348.33  147.8537  37932167.42
2023-06-26   4328.82  4328.82  146.7466  37756036.97

[1952 rows x 4 columns]


In [199]:
# Raw last-price column, including rows for which no price was returned (NaN).
px_arr = stock[(target_Stock, 'PX_Last')].to_numpy()

# True where the price is missing.
mask = np.isnan(px_arr)

# Prices and dates must be filtered with the SAME mask: dropping NaN prices
# without dropping their dates leaves date_arr longer than px_arr_ and shifts
# every price onto the wrong date in the charts and pivot detection below.
px_arr_ = px_arr[~mask]
date_arr = stock.index.values[~mask]

# Cheap invariant check so a misalignment fails loudly instead of silently.
assert len(date_arr) == len(px_arr_), "dates and prices are misaligned"

In [200]:
def directional_change(prices: np.array, dates: np.array, sigma:float= 0.03):
    """Extract alternating top/bottom pivots using a relative reversal threshold.

    The scan starts by tracking a running maximum. Once a price falls more
    than ``sigma`` (as a fraction of that maximum) below it, the maximum is
    recorded as a pivot and the scan switches to tracking a running minimum;
    a rise of more than ``sigma`` above the minimum records it and switches
    back.

    Args:
        prices: Sequence of prices, indexable by position.
        dates: Sequence of x-values aligned position-by-position with ``prices``.
        sigma: Reversal threshold as a fraction of the current extreme.

    Returns:
        ``(x2, y2)``: two lists with the pivot dates and pivot prices. The
        first entry is always the first observation and the last entry is the
        extreme that was still being tracked when the data ran out.

    Note:
        Both running extremes are seeded with 0, so the first price only
        registers as a maximum if it is greater than 0.
    """
    pivot_dates = [dates[0]]
    pivot_prices = [prices[0]]

    seeking_top = True
    top_val, top_pos = 0, 0
    bottom_val, bottom_pos = 0, 0

    for pos, price in enumerate(prices):
        if seeking_top:
            if price > top_val:
                # New high: keep extending the current up-leg.
                top_val, top_pos = price, pos
            elif price < top_val - top_val * sigma:
                # Fell far enough below the high: confirm the top.
                pivot_dates.append(dates[top_pos])
                pivot_prices.append(top_val)
                seeking_top = False
                bottom_val, bottom_pos = price, pos
            continue

        if price < bottom_val:
            # New low: keep extending the current down-leg.
            bottom_val, bottom_pos = price, pos
        elif price > bottom_val + bottom_val * sigma:
            # Rose far enough above the low: confirm the bottom.
            pivot_dates.append(dates[bottom_pos])
            pivot_prices.append(bottom_val)
            seeking_top = True
            top_val, top_pos = price, pos

    # Close the line with whichever extreme was still unconfirmed.
    if seeking_top:
        last_pos, last_val = top_pos, top_val
    else:
        last_pos, last_val = bottom_pos, bottom_val
    pivot_dates.append(dates[last_pos])
    pivot_prices.append(last_val)
    return pivot_dates, pivot_prices

In [201]:
def rw_local_max(prices: np.array, index:int, order:int):
    """Tell whether the point ``order`` steps before ``index`` is a local maximum.

    The candidate is ``prices[index - order]``. It qualifies when none of the
    ``order`` values on either side of it is strictly greater.

    Args:
        prices: Sequence of prices, indexable by position.
        index: Position of the most recent observation available.
        order: Number of neighbours inspected on each side of the candidate.

    Returns:
        ``False`` when ``index < 2 * order + 1``; otherwise ``True`` if the
        candidate is not exceeded by any inspected neighbour, else ``False``.
    """
    if index < order * 2 + 1:
        return False

    center = index - order
    candidate = prices[center]
    exceeded = any(
        prices[center + step] > candidate or prices[center - step] > candidate
        for step in range(1, order + 1)
    )
    return not exceeded
        
    
def rw_local_min(prices: np.array, index:int, order:int):
    """Tell whether the point ``order`` steps before ``index`` is a local minimum.

    The candidate is ``prices[index - order]``. It qualifies when none of the
    ``order`` values on either side of it is strictly smaller.

    Args:
        prices: Sequence of prices, indexable by position.
        index: Position of the most recent observation available.
        order: Number of neighbours inspected on each side of the candidate.

    Returns:
        ``False`` when ``index < 2 * order + 1``; otherwise ``True`` if no
        inspected neighbour is below the candidate, else ``False``.
    """
    if index < order * 2 + 1:
        return False

    center = index - order
    candidate = prices[center]
    undercut = any(
        prices[center + step] < candidate or prices[center - step] < candidate
        for step in range(1, order + 1)
    )
    return not undercut
def rolling_window (prices: np.array, dates: np.array, order:int= 3):
    """Collect local extrema found by ``rw_local_max`` / ``rw_local_min``.

    Every position is tested in order; whenever the point ``order`` steps
    behind it is reported as a local maximum or a local minimum, that point is
    added to the output.

    Args:
        prices: Sequence of prices, indexable by position.
        dates: Sequence of x-values aligned position-by-position with ``prices``.
        order: Number of neighbours on each side used by the extremum tests.

    Returns:
        ``(x2, y2)``: lists of pivot dates and pivot prices, beginning with the
        first observation.
    """
    pivot_dates = [dates[0]]
    pivot_prices = [prices[0]]
    for pos in range(len(prices)):
        # Tops and bottoms are stored the same way, so one branch covers both.
        if rw_local_max(prices, pos, order) or rw_local_min(prices, pos, order):
            center = pos - order
            pivot_dates.append(dates[center])
            pivot_prices.append(prices[center])
    return pivot_dates, pivot_prices

In [202]:
# Pivots from the rolling-window detector, using 10 neighbours on each side.
x2, y2 = rolling_window(px_arr_, date_arr, 10)

# Price series plus the line joining the detected pivots.
fig = go.Figure(
    data=[
        go.Scatter(x=date_arr, y=px_arr_, mode='lines', name='Stock price'),
        go.Scatter(x=x2, y=y2, mode='lines', name='Trend Line'),
    ]
)

# Chart title and axis labels.
fig.update_layout(
    title='Stock Price With Trend Line',
    xaxis_title='dates',
    yaxis_title='Price',
)

fig.show()

In [203]:

# Figure 1: the price series on its own, without any pivot line.
fig = go.Figure(
    data=[
        go.Scatter(x=date_arr, y=px_arr_, mode='lines', name='Stock price'),
    ]
)
fig.update_layout(
    title='Stock Price With Trend Line',
    xaxis_title='dates',
    yaxis_title='Price',
)
fig.show()

# Figure 2: directional-change pivots with a 0.05 reversal threshold.
x2, y2 = directional_change(px_arr_, date_arr, 0.05)
fig = go.Figure(
    data=[
        go.Scatter(x=date_arr, y=px_arr_, mode='lines', name='Stock price'),
        go.Scatter(x=x2, y=y2, mode='lines', name='Trend Line'),
    ]
)
fig.update_layout(
    title='Stock Price With Trend Line',
    xaxis_title='Date',
    yaxis_title='Price',
)
fig.show()

# Figure 3: same detector with a tighter 0.01 threshold.
x2, y2 = directional_change(px_arr_, date_arr, 0.01)
fig = go.Figure(
    data=[
        go.Scatter(x=date_arr, y=px_arr_, mode='lines', name='Stock price'),
        go.Scatter(x=x2, y=y2, mode='lines', name='Trend Line'),
    ]
)
fig.update_layout(
    title='Stock Price With Trend Line',
    xaxis_title='dates',
    yaxis_title='Price',
)
fig.show()

In [213]:
def distance(x1, y1, x2, y2, xn, yn):
    """Vertical distance from (xn, yn) to the line through (x1, y1) and (x2, y2).

    The line is evaluated at ``xn`` and the absolute difference to ``yn`` is
    returned. ``x1`` and ``x2`` must differ, otherwise the slope divides by zero.
    """
    new_y = (((y2-y1)/(x2-x1))*(xn-x1))+y1
    return abs(new_y - yn)


def loop_to_flat(x2, y2, index, prices, dates, max_gap_days=150):
    """Split long pivot-to-pivot segments at their point of largest deviation.

    For each pair of consecutive pivots whose dates are more than
    ``max_gap_days`` apart, the observation strictly between them that lies
    furthest (vertically, in position/price space) from the straight line
    joining the two pivots is inserted as an extra pivot.

    Args:
        x2: Pivot dates; subtracting two entries must give an object with a
            ``.days`` attribute.
        y2: Pivot prices, aligned with ``x2``.
        index: Positions of the pivots inside ``prices`` / ``dates``.
        prices: Full price series.
        dates: Full date series, aligned with ``prices``.
        max_gap_days: Segments longer than this many days are split.

    Returns:
        ``(new_xs, new_ys, new_index, need_back)``. The first three are the
        pivot lists with the extra points inserted in order. ``need_back`` is
        ``True`` only if at least one point was inserted, i.e. another pass may
        split further.
    """
    new_xs = [x2[0]]
    new_ys = [y2[0]]
    new_index = [index[0]]
    need_back = False

    for i in range(1, len(x2)):
        if (x2[i] - x2[i-1]).days > max_gap_days:
            start, end = index[i-1], index[i]
            best_dev = 0
            best_j = None
            # The start pivot lies on the line (deviation 0), so only interior
            # positions can qualify.
            for j in range(start + 1, end):
                dev = distance(start, prices[start], end, prices[end], j, prices[j])
                if dev > best_dev:
                    best_dev = dev
                    best_j = j
            # Insert only when an interior point actually deviates. Defaulting
            # to position 0 would put the first observation in the middle of
            # the pivot list and keep the caller's refinement loop running.
            if best_j is not None:
                new_xs.append(dates[best_j])
                new_ys.append(prices[best_j])
                new_index.append(best_j)
                need_back = True
        new_xs.append(x2[i])
        new_ys.append(y2[i])
        new_index.append(index[i])

    return new_xs, new_ys, new_index, need_back
def directional_change2(prices: np.array, dates: np.array, sigma:float= 0.03):
    """Directional-change pivots, refined by repeatedly splitting long segments.

    Step 1 runs the same scan as ``directional_change``: track a running
    maximum until the price drops more than ``sigma`` (as a fraction of that
    maximum) below it, record it, then track a running minimum until the price
    rises more than ``sigma`` above it, and so on. The position of every pivot
    is recorded as well.

    Step 2 passes the pivots to ``loop_to_flat`` again and again until it
    reports that no further pass is needed.

    Args:
        prices: Sequence of prices, indexable by position.
        dates: Sequence of dates aligned position-by-position with ``prices``.
        sigma: Reversal threshold as a fraction of the current extreme.

    Returns:
        ``(x, y)``: lists with the refined pivot dates and pivot prices.
    """
    nextUp = True
    tmp_max = 0
    tmp_min = 0
    tmp_max_i = 0
    tmp_min_i = 0
    x2 = [dates[0]]
    y2 = [prices[0]]
    index = [0]
    for i in range(len(prices)):
        if nextUp:
            if prices[i] > tmp_max:
                tmp_max = prices[i]
                tmp_max_i = i
            elif prices[i] < tmp_max - tmp_max*sigma:
                # Dropped far enough below the high: confirm the top.
                x2.append(dates[tmp_max_i])
                y2.append(tmp_max)
                index.append(tmp_max_i)
                nextUp = False
                tmp_min = prices[i]
                tmp_min_i = i
        else:
            if prices[i] < tmp_min:
                tmp_min = prices[i]
                tmp_min_i = i
            elif prices[i] > tmp_min + tmp_min*sigma:
                # Rose far enough above the low: confirm the bottom.
                x2.append(dates[tmp_min_i])
                y2.append(tmp_min)
                index.append(tmp_min_i)
                nextUp = True
                tmp_max = prices[i]
                tmp_max_i = i

    # Close the line with whichever extreme was still unconfirmed.
    if nextUp:
        x2.append(dates[tmp_max_i])
        y2.append(tmp_max)
        index.append(tmp_max_i)
    else:
        x2.append(dates[tmp_min_i])
        y2.append(tmp_min)
        index.append(tmp_min_i)

    # loop_to_flat builds fresh lists and never mutates its inputs, so its
    # outputs can be fed straight back in without copying them first.
    return_x, return_y, return_index, return_needback = loop_to_flat(x2, y2, index, prices, dates)
    while return_needback:
        return_x, return_y, return_index, return_needback = loop_to_flat(
            return_x, return_y, return_index, prices, dates
        )
    return return_x, return_y

# Refined pivots (v2) and plain pivots (v1) at the same 0.05 threshold.
x2, y2 = directional_change2(px_arr_, date_arr, 0.05)
x, y = directional_change(px_arr_, date_arr, 0.05)

# Price series with the refined pivot line on top.
fig = go.Figure(
    data=[
        go.Scatter(x=date_arr, y=px_arr_, mode='lines', name='Stock price'),
        go.Scatter(x=x2, y=y2, mode='lines', name='Trend Line'),
    ]
)
# Optional overlay of the unrefined pivots for comparison:
#fig.add_trace(go.Scatter(x=x, y=y, mode='lines', name='Trend Line'))

# Chart title and axis labels.
fig.update_layout(
    title='Stock Price With Trend Line',
    xaxis_title='dates',
    yaxis_title='Price',
)

fig.show()


In [226]:
# Refined pivots at a 0.08 threshold (kept in x2/y2 but not drawn here) and
# plain pivots at a 0.03 threshold.
x2, y2 = directional_change2(px_arr_, date_arr, 0.08)
x1, y1 = directional_change(px_arr_, date_arr, 0.03)

# Price series with the v1 pivot line on top.
fig = go.Figure(
    data=[
        go.Scatter(x=date_arr, y=px_arr_, mode='lines', name='Stock price'),
        go.Scatter(x=x1, y=y1, mode='lines', name='Trend Line v1'),
    ]
)
# Optional overlay of the refined pivots:
#fig.add_trace(go.Scatter(x=x2, y=y2, mode='lines', name='Trend Line v2'))

# Chart title and axis labels.
fig.update_layout(
    title='Stock Price With Trend Line',
    xaxis_title='dates',
    yaxis_title='Price',
)

fig.show()

In [228]:
# Number of pivot points currently held in y2 (whatever cell assigned it last);
# the recorded run printed 43.
print(len(y2))

43


In [227]:
# Absolute relative change between consecutive prices:
# |p[i] - p[i-1]| / p[i-1] for i = 1 .. len(px_arr_) - 1.
new_arr = np.abs(np.diff(px_arr_)) / px_arr_[:-1]

# Each change belongs to the later of its two observations, so the first date
# is dropped. The dates are rebuilt from `stock` and `mask` (the same filter
# that produced px_arr_) so that x and y always have the same length.
return_dates = stock.index.values[~mask][1:]

fig = go.Figure()

# One line trace: the absolute change plotted on the date it was realised.
fig.add_trace(go.Scatter(x=return_dates, y=new_arr, mode='lines', name='Absolute return'))

# Labels describe what is plotted (a return series, not a price).
fig.update_layout(title='Absolute Price Change Between Consecutive Observations',
                  xaxis_title='dates',
                  yaxis_title='Absolute return')

# show the figure
fig.show()