<a href="https://colab.research.google.com/github/Mohneesh09/Trend-Rafa/blob/main/best_trendline_detection_class.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
import matplotlib.pyplot as plt
from urllib.request import urlopen 
import json
import numba
from numba import njit
from scipy.signal import argrelextrema
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.linear_model import RANSACRegressor
from statsmodels.nonparametric.kernel_regression import KernelReg
#Reducing data (pending) Window Approach. 5,3,1 yrs,6 months
#DBSCAN method on y-axis(clustering nearby pivots and rating of clusters).  #pd ranking fxn

In [None]:
#https://medium.com/@chris_42047/3-ways-to-smooth-the-price-waves-with-python-9a4e170aaf15
#Will use Nadaraya–Watson kernel regression for smoothing prices.

#Micro-Strategy

In [60]:
class Run_micro_strategy:
    def __init__(self,symbol,st,en):
        self.st,self.en=st,en
        self.symbol= symbol
        self.smoothed_max_list=[] 
        self.smoothed_min_list=[]
        
        self.df=self.load_data()
        self.x_new_max,self.y_new_max= np.array(np.nan),np.array(np.nan)

        
    def load_data(self):
        ticker=self.symbol   #QQQ
        API_KEY = "717311313634cf72e5c3b021728afe4e"
        url = "https://financialmodelingprep.com/api/v3/historical-price-full/{}?apikey={}".format(ticker, API_KEY)
  
        # store the response of URL
        response = urlopen(url)
  
        # storing the JSON response 
        data_json = json.loads(response.read()) 
        df=pd.DataFrame(data_json['historical'])
        df= df[['date','close','open','high','low']].iloc[self.st:self.en]
        return df
    
    def smoothed_curve(self,df_close,bandwidth):
        kernel_reg= KernelReg([df_close.values],[df_close.index],var_type='c',bw=bandwidth)
        reg_result = kernel_reg.fit([df_close.index])
        smoothed_df_close = pd.Series(data=reg_result[0], index=df_close.index)

        return smoothed_df_close
   
    def find_extrema(self,data):
        #Calculating local min and max
        smoothed_local_max = argrelextrema(data['high'].values, np.greater, order=15)[0]
        smoothed_local_min = argrelextrema(data['low'].values, np.less, order=15)[0]

        smoothed_max_list = []
        smoothed_min_list = []
        max_id = 0
        min_id = 0
        for i in range(data.index.start, data.index.stop+1, 1):
            if max_id < len(smoothed_local_max):
                if i-data.index.start == smoothed_local_max[max_id]:
                    smoothed_max_list.append(data['high'][i])
                    max_id += 1
                else:
                    smoothed_max_list.append(np.nan)
            if min_id < len(smoothed_local_min):
                if i-data.index.start == smoothed_local_min[min_id]:
                    smoothed_min_list.append(data['low'][i])
                    min_id += 1
                else:
                    smoothed_min_list.append(np.nan)

        return smoothed_max_list, smoothed_min_list
    
    def pivot_indexing(self, smoothed_list):
        x = np.arange(1, len(smoothed_list) + 1, 1)
        y = np.array(smoothed_list)
        x_new=np.flip(x[~np.isnan(y)])
        y_new=np.flip(y[~np.isnan(y)])
        return x_new,y_new 

    
    def detect_trend(self,x,y):
        ans=1e3
        for i in range(0,2):
            for j in range(i+2,len(x)):
                LR=LinearRegression()
                LR.fit(x[i:j+1].reshape(-1, 1),y[i:j+1].reshape(-1, 1))
                y_predict= LR.predict(x[i:j+1].reshape(-1, 1))
                temp=(mean_absolute_error(y[i:j+1].reshape(-1, 1),y_predict)/np.mean(y))*100
                print("Fitted Line is {0}".format(temp))
                if(ans>temp or abs(ans-temp)<1):
                    ans=temp
                    print('selected')
                    values_fit= LR.coef_[0][0],LR.intercept_[0]
                    x_best=x[i:j+1]
                    y_best=y[i:j+1]
        return values_fit, x_best, y_best

    def detect_trend_lines(self,x,y):
        ans=1e3
        store=[]
        for i in range(0,2):
            for j in range(i+2,len(x)):
                LR=LinearRegression()
                LR.fit(x[i:j+1].reshape(-1, 1),y[i:j+1].reshape(-1, 1))
                y_predict= LR.predict(x[i:j+1].reshape(-1, 1))
                temp=(mean_absolute_error(y[i:j+1].reshape(-1, 1),y_predict)/np.mean(y))*100
                #print("Fitted Line is {0}".format(temp))
                values_fit= LR.coef_[0][0],LR.intercept_[0]
                x_best=x[i:j+1]
                y_best=y[i:j+1]
                store.append((x_best,y_best,values_fit,temp))
                #store=tuple(store)
                store.sort(key = lambda x: x[3])
                #print(store[0][4])
                #print(store[1][4])
        return store

    def plot_chart(self,x_max,y_max,x_min,y_min,values_max,values_min,st):
        
        smoothed_df_close=self.smoothed_curve(self.df['close'],bandwidth='cv_ls')
        smoothed_df_low=self.smoothed_curve(self.df['low'],bandwidth='cv_ls')
        smoothed_df_high=self.smoothed_curve(self.df['high'],bandwidth='cv_ls')
        
        title = f"Trend Analysis"
        symbol='SPY'
        fig = make_subplots(rows=1, cols=1, subplot_titles=[title])

        fig.add_trace(go.Scatter(x=smoothed_df_close.index-st, y=smoothed_df_close.values, line=dict(color="blue", width=1),name="Smoothed Close"), row=1, col=1)
        fig.add_trace(go.Scatter(x=smoothed_df_low.index-st, y=smoothed_df_low.values, line=dict(color="orange", width=1),name="Smoothed Low"), row=1, col=1)
        fig.add_trace(go.Scatter(x=smoothed_df_high.index-st, y=smoothed_df_high.values, line=dict(color="aquamarine", width=1),name="Smoothed High"), row=1, col=1)
        #Trend Lines
        y_min_extrema= values_min[0]*(smoothed_df_close.index-st) + values_min[1]
        fig.add_trace(go.Scatter(x=smoothed_df_close.index-st, y=y_min_extrema, line=dict(color="red", width=1),name="Min Extrema Trend"), row=1, col=1)

        y_max_extrema = values_max[0]*(smoothed_df_close.index-st) + values_max[1]
        fig.add_trace(go.Scatter(x=smoothed_df_close.index-st, y=y_max_extrema, line=dict(color="green", width=1),name="Max Extrema Trend"), row=1, col=1)

        #x = np.arange(0, len(smoothed_min_list),1)
        fig.add_trace(go.Scatter(x=x_min, y=y_min, marker=dict(color='red', size=7), mode='markers', name='Min extrema'), row=1, col=1)
        fig.add_trace(go.Scatter(x=x_max, y=y_max, marker=dict(color='green', size=7), mode='markers', name='Max extrema'), row=1, col=1)

        fig.update_layout(
              title={'text': f"{symbol} Price", 'x': 0.5},
              autosize=True,
              width=1000, height=700,
              xaxis={"rangeslider": {"visible": False}},
              plot_bgcolor="#EFF5F8")
        fig.update_yaxes(visible=False, secondary_y=True)
        #file_name = f"{i}_trend_analysis.png"
        #fig.write_image(os.path.join('/content/images90test/', file_name), format="png")
        #print('write done')

        fig.show()

    def execute(self):
        #df=self.load_data()
        self.smoothed_max_list, self.smoothed_min_list=self.find_extrema(self.df)
        self.x_new_max,self.y_new_max=self.pivot_indexing(self.smoothed_max_list)
        self.x_new_min,self.y_new_min=self.pivot_indexing(self.smoothed_min_list)
        self.store_max=self.detect_trend_lines(self.x_new_max,self.y_new_max)
        self.store_min=self.detect_trend_lines(self.x_new_min,self.y_new_min)
        #self.values_max,self.x_max,self.y_max=self.detect_trend(self.x_new_max,self.y_new_max)
        #self.values_min,self.x_min,self.y_min=self.detect_trend(self.x_new_min,self.y_new_min)
        print(self.store_max[0][3],self.store_min[0][3])
        print(self.store_max[1][3],self.store_min[1][3])
        print(self.store_max[2][3],self.store_min[2][3])
        print(self.store_max[3][3],self.store_min[3][3])
        self.plot_chart(self.store_max[0][0],self.store_max[0][1],self.store_min[0][0],self.store_min[0][1],self.store_max[0][2],self.store_min[0][2],self.st)
        self.plot_chart(self.store_max[1][0],self.store_max[1][1],self.store_min[0][0],self.store_min[1][1],self.store_max[1][2],self.store_min[1][2],self.st)
        self.plot_chart(self.store_max[2][0],self.store_max[2][1],self.store_min[2][0],self.store_min[2][1],self.store_max[2][2],self.store_min[2][2],self.st)
        self.plot_chart(self.store_max[3][0],self.store_max[3][1],self.store_min[3][0],self.store_min[0][1],self.store_max[3][2],self.store_min[3][2],self.st)
    

In [61]:
if __name__ == '__main__':
    symbol = 'SPY'
    #df = 
    Run_micro=Run_micro_strategy(symbol,658,1258)
    #cProfile.run('Run_micro.execute()')    #Check Fxn execution time
    Run_micro.execute()

0.0006461834080106268 0.12035977109444625
1.2672271955312686 1.4473985878351308
1.2839254830701345 1.4566386812406966
1.3900000297883957 2.8450816938156582


#Rolling function Code awaiting.

#**RANSAC** **testing**

In [None]:
#combined = np.vstack((x_new_min, y_new_min)).T

In [None]:
class Run_Ransac_strategy:
    def __init__(self,symbol,st,en):
        self.st,self.en=st,en
        self.symbol= symbol
        self.smoothed_max_list=[] 
        self.smoothed_min_list=[]
        
        self.df=self.load_data()
        self.x_new_max,self.y_new_max= np.array(np.nan),np.array(np.nan)
        self.x_new_min,self.y_new_min= np.array(np.nan),np.array(np.nan)

        
    def load_data(self):
        ticker=self.symbol   #QQQ
        API_KEY = "717311313634cf72e5c3b021728afe4e"
        url = "https://financialmodelingprep.com/api/v3/historical-price-full/{}?apikey={}".format(ticker, API_KEY)
  
        # store the response of URL
        response = urlopen(url)
  
        # storing the JSON response 
        data_json = json.loads(response.read()) 
        df=pd.DataFrame(data_json['historical'])
        df= df[['date','close','open','high','low']].iloc[self.st:self.en]
        return df
    
    def smoothed_curve(self,df_close,bandwidth):
        kernel_reg= KernelReg([df_close.values],[df_close.index],var_type='c',bw=bandwidth)
        reg_result = kernel_reg.fit([df_close.index])
        smoothed_df_close = pd.Series(data=reg_result[0], index=df_close.index)

        return smoothed_df_close
   
    def find_extrema(self,data):
        #Calculating local min and max
        smoothed_local_max = argrelextrema(data['high'].values, np.greater, order=15)[0]
        smoothed_local_min = argrelextrema(data['low'].values, np.less, order=15)[0]

        smoothed_max_list = []
        smoothed_min_list = []
        max_id = 0
        min_id = 0
        for i in range(data.index.start, data.index.stop+1, 1):
            if max_id < len(smoothed_local_max):
                if i-data.index.start == smoothed_local_max[max_id]:
                    smoothed_max_list.append(data['high'][i])
                    max_id += 1
                else:
                    smoothed_max_list.append(np.nan)
            if min_id < len(smoothed_local_min):
                if i-data.index.start == smoothed_local_min[min_id]:
                    smoothed_min_list.append(data['low'][i])
                    min_id += 1
                else:
                    smoothed_min_list.append(np.nan)

        return smoothed_max_list, smoothed_min_list
    
    def pivot_indexing(self, smoothed_list):
        x = np.arange(1, len(smoothed_list) + 1, 1)
        y = np.array(smoothed_list)
        x_new=np.flip(x[~np.isnan(y)])
        y_new=np.flip(y[~np.isnan(y)])
        return x_new,y_new 

    
    def recent_inc_max(self,X,y):
      #print(X)
        if(X in self.x_new_max[:2] and len(X)>2):
          #print('Yes')
           return True
        else:
          #print('No')
           return False

    def recent_inc_min(self,X,y):
         #print(X)
        if(X in self.x_new_min[:2] and len(X)>2):
            #print('Yes')
            return True
        else:
            #print('No')
            return False
        
    def RANSAC(self,x,y,recent_inc,sample_no):
        ransac= RANSACRegressor(min_samples =sample_no,random_state=42,residual_threshold=1,is_data_valid=recent_inc)
        ransac.fit(x.reshape(-1,1),y.reshape(-1,1))
        #print(ransac.score(x[ransac.inlier_mask_].reshape(-1,1), y[ransac.inlier_mask_].reshape(-1,1)))
        m, c= ransac.estimator_.coef_,ransac.estimator_.intercept_
        #print(ransac.n_skips_invalid_data_)
        return m,c,ransac.inlier_mask_

    def macro_strategy(self):
        df1=self.df
        st=self.st
        RANSAC=self.RANSAC
        
        self.smoothed_df_close=self.smoothed_curve(df1['close'],bandwidth='cv_ls')
        self.smoothed_df_low=self.smoothed_curve(df1['low'],bandwidth='cv_ls')
        self.smoothed_df_high=self.smoothed_curve(df1['high'],bandwidth='cv_ls')
        self.smoothed_max_list, self.smoothed_min_list=self.find_extrema(df1)
        self.x_new_max, self.y_new_max=self.pivot_indexing(self.smoothed_max_list)
        self.x_new_min,self.y_new_min=self.pivot_indexing(self.smoothed_min_list)

        m_min, c_min,ransac_min_inlier= RANSAC(self.x_new_min.reshape(-1,1),self.y_new_min.reshape(-1,1),self.recent_inc_min,3)

        m_max, c_max,ransac_max_inlier= RANSAC(self.x_new_max.reshape(-1,1),self.y_new_max.reshape(-1,1),self.recent_inc_max,3)

        m_min1, c_min1,ransac_min_inlier1= RANSAC(self.x_new_min.reshape(-1,1),self.y_new_min.reshape(-1,1),self.recent_inc_min,4)

        m_max1, c_max1,ransac_max_inlier1= RANSAC(self.x_new_max.reshape(-1,1),self.y_new_max.reshape(-1,1),self.recent_inc_max,4)

        m_min2, c_min2,ransac_min_inlier2= RANSAC(self.x_new_min.reshape(-1,1),self.y_new_min.reshape(-1,1),self.recent_inc_min,5)

        m_max2, c_max2,ransac_max_inlier2= RANSAC(self.x_new_max.reshape(-1,1),self.y_new_max.reshape(-1,1),self.recent_inc_max,5)

        m_min3, c_min3,ransac_min_inlier3= RANSAC(self.x_new_min.reshape(-1,1),self.y_new_min.reshape(-1,1),None,None)

        m_max3, c_max3,ransac_max_inlier3= RANSAC(self.x_new_max.reshape(-1,1),self.y_new_max.reshape(-1,1),None,None)

        self.plot_strategy(st,self.x_new_max,self.y_new_max,self.x_new_min,self.y_new_min,m_max,c_max,m_min,c_min,ransac_max_inlier,ransac_min_inlier)
        self.plot_strategy(st,self.x_new_max,self.y_new_max,self.x_new_min,self.y_new_min,m_max1,c_max1,m_min1,c_min1,ransac_max_inlier1,ransac_min_inlier1)
        self.plot_strategy(st,self.x_new_max,self.y_new_max,self.x_new_min,self.y_new_min,m_max2,c_max2,m_min2,c_min2,ransac_max_inlier2,ransac_min_inlier2)
        self.plot_strategy(st,self.x_new_max,self.y_new_max,self.x_new_min,self.y_new_min,m_max3,c_max3,m_min3,c_min3,ransac_max_inlier3,ransac_min_inlier3)

    def plot_strategy(self,st,x_max,y_max,x_min,y_min,m_max,c_max,m_min,c_min,ransac_max_inlier,ransac_min_inlier):
        title = f"Trend Analysis"
        symbol='AAPL'
        fig = make_subplots(rows=1, cols=1, subplot_titles=[title])

        fig.add_trace(go.Scatter(x=self.smoothed_df_close.index-st, y=self.smoothed_df_close.values, line=dict(color="blue", width=1),name="Smoothed Close"), row=1, col=1)
        fig.add_trace(go.Scatter(x=self.smoothed_df_low.index-st, y=self.smoothed_df_low.values, line=dict(color="orange", width=1),name="Smoothed Low"), row=1, col=1)
        fig.add_trace(go.Scatter(x=self.smoothed_df_high.index-st, y=self.smoothed_df_high.values, line=dict(color="aquamarine", width=1),name="Smoothed High"), row=1, col=1)
        #Trend Lines
        y_min_extrema= (m_min[0][0])*(self.smoothed_df_close.index-st) + c_min[0]
        fig.add_trace(go.Scatter(x=self.smoothed_df_close.index-st, y=y_min_extrema, line=dict(color="red", width=1),name="Min Extrema Trend"), row=1, col=1)

        y_max_extrema = (m_max[0][0])*(self.smoothed_df_close.index-st) + c_max[0]
        fig.add_trace(go.Scatter(x=self.smoothed_df_close.index-st, y=y_max_extrema, line=dict(color="green", width=1),name="Max Extrema Trend"), row=1, col=1)

        #x = np.arange(0, len(smoothed_min_list),1)
        fig.add_trace(go.Scatter(x=x_min[ransac_min_inlier], y=y_min[ransac_min_inlier], marker=dict(color='red', size=7), mode='markers', name='Min extrema'), row=1, col=1)
        fig.add_trace(go.Scatter(x=x_max[ransac_max_inlier], y=y_max[ransac_max_inlier], marker=dict(color='green', size=7), mode='markers', name='Max extrema'), row=1, col=1)
        #fig.add_trace(go.Scatter(x=x_new_min[~ransac_min.inlier_mask_], y=y_new_min[~ransac_min.inlier_mask_], marker=dict(color='blue', size=7), mode='markers', name='Max extrema'), row=1, col=1)

        fig.update_layout(
              title={'text': f"{symbol} Price", 'x': 0.5},
              autosize=True,
              width=1000, height=600,
              xaxis={"rangeslider": {"visible": False}},
              plot_bgcolor="#EFF5F8")
        fig.update_yaxes(visible=False, secondary_y=True)

        fig.show()


In [None]:
if __name__ == '__main__':
    symbol = 'AAPL'
    #df = 
    Run_macro=Run_Ransac_strategy(symbol,658,1258)
    #cProfile.run('Run_micro.execute()')    #Check Fxn execution time
    Run_macro.macro_strategy()