In [1]:
# if you haven't installed requests, get it via 'pip install requests'
import requests
# if you haven't installed pandas, get it via 'pip install pandas'
import pandas as pd

import yfinance as yf

import numpy as np

from datetime import date, datetime, timedelta

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.metrics import mean_squared_error
from sklearn import tree
from math import sqrt

from sklearn.tree import export_text
from sklearn.metrics import r2_score
from sklearn.metrics import f1_score

import panel as pn
pn.extension()

In [2]:
# Message returned whenever the requested ticker cannot be analysed.
_INSUFFICIENT_DATA_MSG = "The stock does not have sufficient historical data for analysis"

# Benchmark whose closing price is carried along in the `sp500` column.
_BENCHMARK_TICKER = "IVV"
# Roughly ten calendar years of history are requested.
_LOOKBACK_DAYS = 3650
# Minimum number of non-null closing prices required (about three trading years).
_MIN_ROWS = 750

_BOLLINGER_WINDOW = 20
_SMA_SHORT_WINDOW = 50
_SMA_LONG_WINDOW = 100
# Half-lives shared by the price EWM crossover and the volatility trend signal.
_EWM_FAST_HALFLIFE = 1
_EWM_SLOW_HALFLIFE = 10

# Share of the (chronologically ordered) dataset used for training.
_TRAIN_FRACTION = 0.80
_FEATURE_COLUMNS = ["bollinger_signal", "MA_50_100_Signal", "ewm_crossover_signal", "vol_trend_signal"]


def _bollinger_frame(prices):
    """Return `prices` (columns `Close`, `sp500`) extended with Bollinger band columns and signal."""
    frame = prices.dropna().copy()

    rolling_close = frame['Close'].rolling(window=_BOLLINGER_WINDOW)
    frame['bollinger_mid_band'] = rolling_close.mean()
    frame['bollinger_std'] = rolling_close.std()

    # Bands sit one rolling standard deviation either side of the mid band.
    frame['bollinger_upper_band'] = frame['bollinger_mid_band'] + (frame['bollinger_std'] * 1)
    frame['bollinger_lower_band'] = frame['bollinger_mid_band'] - (frame['bollinger_std'] * 1)

    # +1 when the close is below the lower band, -1 when above the upper band, else 0.
    frame['bollinger_long'] = np.where(frame['Close'] < frame['bollinger_lower_band'], 1.0, 0.0)
    frame['bollinger_short'] = np.where(frame['Close'] > frame['bollinger_upper_band'], -1.0, 0.0)
    frame['bollinger_signal'] = frame['bollinger_long'] + frame['bollinger_short']

    # Drops the warm-up rows where the rolling window is not yet full.
    return frame.dropna()


def _sma_frame(close):
    """Return the one-column `Close` frame extended with 50/100-day SMAs and their signal."""
    frame = close.copy()
    frame["SMA50"] = frame["Close"].rolling(window=_SMA_SHORT_WINDOW).mean()
    frame["SMA100"] = frame["Close"].rolling(window=_SMA_LONG_WINDOW).mean()
    frame = frame.dropna().copy()

    # Signal is +1 while the short average is below the long average, otherwise -1.
    frame["Signal"] = np.where(frame["SMA50"] < frame["SMA100"], 1.0, -1.0)

    # Non-zero values mark the days on which the signal flips.
    frame["Entry/Exit"] = frame["Signal"].diff()

    # diff() leaves a NaN in the first row, which is removed here.
    return frame.dropna()


def _ewm_frame(close):
    """Return the one-column `Close` frame extended with fast/slow EWM closes and crossover signal."""
    frame = close.copy()
    frame['fast_close'] = frame['Close'].ewm(halflife=_EWM_FAST_HALFLIFE).mean()
    frame['slow_close'] = frame['Close'].ewm(halflife=_EWM_SLOW_HALFLIFE).mean()

    # +1 when the fast average is above the slow one, -1 when below, 0 when equal.
    frame['crossover_long'] = np.where(frame['fast_close'] > frame['slow_close'], 1.0, 0.0)
    frame['crossover_short'] = np.where(frame['fast_close'] < frame['slow_close'], -1.0, 0.0)
    frame['ewm_crossover_signal'] = frame['crossover_long'] + frame['crossover_short']
    return frame.dropna()


def _volatility_frame(prices):
    """Return daily percentage changes of `prices` extended with fast/slow EWM volatility and signal."""
    frame = prices.pct_change().dropna().rename(columns={"Close": "daily_return"})

    frame['fast_vol'] = frame['daily_return'].ewm(halflife=_EWM_FAST_HALFLIFE).std()
    frame['slow_vol'] = frame['daily_return'].ewm(halflife=_EWM_SLOW_HALFLIFE).std()

    # +1 when short-term volatility is below long-term volatility, -1 when above.
    frame['vol_trend_long'] = np.where(frame['fast_vol'] < frame['slow_vol'], 1.0, 0.0)
    frame['vol_trend_short'] = np.where(frame['fast_vol'] > frame['slow_vol'], -1.0, 0.0)
    frame['vol_trend_signal'] = frame['vol_trend_long'] + frame['vol_trend_short']
    return frame.dropna()


def _target_frame(close):
    """Build the target table from a `Close` Series: next-day return direction as 1.0/0.0 and BUY/SELL."""
    frame = close.to_frame()
    frame["daily_returns"] = frame["Close"].pct_change()
    frame = frame.dropna().copy()

    # Each row is labelled with the sign of the FOLLOWING day's return.
    # The final row has no following day (NaN), so np.where labels it 0.0;
    # that row is the forecast row and is excluded from scoring later on.
    frame["shifted_returns"] = frame["daily_returns"].shift(-1)
    frame["decision_number"] = np.where(frame["shifted_returns"] > 0, 1.0, 0.0)
    frame["decision_variable"] = np.where(frame["decision_number"] == 1.0, "BUY", "SELL")
    return frame


def _predict_with_forest(X_train, y_train, X_test, max_depth):
    """Fit a random forest classifier of the given depth on the training set and predict the test set."""
    model = RandomForestClassifier(n_estimators=100, max_depth=max_depth, random_state=0)
    model.fit(X_train, y_train)
    return model.predict(X_test)


def _score(results):
    """Return (RMSE, macro F1) for a results frame, ignoring its final row.

    The final row is the forecast row: its `decision_number` is a placeholder
    because the next day's return is not known yet, so it must not be scored.
    """
    scored = results.iloc[:-1]
    actual = scored["decision_number"].to_list()
    predicted = scored["Predicted Value"].to_list()
    # mean_squared_error returns the MSE by default; one sqrt gives the RMSE.
    rmse = sqrt(mean_squared_error(actual, predicted))
    f1 = f1_score(actual, predicted, average="macro")
    return rmse, f1


def _recommendation(results):
    """Return the markdown heading for the prediction in the last row of `results`."""
    # .iloc is required: the frame is indexed by date, not by position.
    if results["Predicted Value"].iloc[-1] == 1:
        return "## Model Recommendation is BUY"
    return "## Model Recommendation is SELL"


def rolling_analysis(ticker):
    """Build technical-signal features for `ticker`, fit two random forests and return a dashboard.

    Closing prices for roughly the last ten years are downloaded for `ticker`
    and for the benchmark. Four signals (Bollinger band, 50/100-day SMA,
    EWM crossover, volatility trend) are used as features to classify whether
    the next day's return is positive. The first 80% of rows train two random
    forest classifiers (max depth 3 and 4); the remaining rows are the test
    set. The prediction for the last test row is the BUY/SELL recommendation.

    Parameters
    ----------
    ticker : str
        Symbol passed to `yf.Ticker`.

    Returns
    -------
    str or pn.Tabs
        The "insufficient historical data" message when the download has no
        `Close` column or fewer than 750 non-null closing prices; otherwise a
        Panel `Tabs` dashboard with the intermediate tables and model results.
    """
    now = datetime.today()
    start_date = (now - timedelta(days=_LOOKBACK_DAYS)).strftime('%Y-%m-%d')
    end_date = now.strftime('%Y-%m-%d')

    # Closing prices for the requested ticker.
    stock_history = yf.Ticker(ticker).history(start=start_date, end=end_date)
    if "Close" not in stock_history.columns:
        # Nothing usable came back (e.g. an empty frame for an unknown symbol).
        return _INSUFFICIENT_DATA_MSG
    df_close = stock_history[["Close"]].dropna()

    # Require about three years of data before attempting the analysis.
    if len(df_close.index) < _MIN_ROWS:
        return _INSUFFICIENT_DATA_MSG

    # Closing prices for the benchmark, aligned on the dates both series share.
    benchmark_history = yf.Ticker(_BENCHMARK_TICKER).history(start=start_date, end=end_date)
    df_sp500_close = benchmark_history["Close"].to_frame().rename(columns={"Close": "sp500"})
    df_final = pd.concat([df_close, df_sp500_close], axis=1, join="inner")

    # (1) Bollinger band signal. Despite the name, df_returns holds prices, not returns.
    df_returns = _bollinger_frame(df_final)
    df_bollinger = df_returns[["Close", "sp500", "bollinger_signal"]]

    # (2) 50 versus 100 day simple moving average signal.
    df_sma = _sma_frame(df_final[["Close"]])
    df_sma_final = df_sma[["Signal"]].rename(columns={"Signal": "MA_50_100_Signal"})

    # (3) Exponentially weighted moving average crossover signal.
    df_ewm = _ewm_frame(df_final[["Close"]])
    df_ewm_final = df_ewm[["ewm_crossover_signal"]]

    # (4) Volatility trend signal computed on daily returns.
    df_daily_returns = _volatility_frame(df_final)
    df_daily_returns_final = df_daily_returns[["vol_trend_signal"]]

    # Keep only the dates for which every signal is available.
    df_features = pd.concat(
        [df_bollinger, df_sma_final, df_ewm_final, df_daily_returns_final],
        axis=1,
        join="inner",
    )

    # Target variable and final modelling dataset (target first, then features).
    df_ticker_price = _target_frame(df_features["Close"])
    df_dataset = pd.concat(
        [df_ticker_price["decision_number"], df_features.drop(columns=["Close", "sp500"])],
        axis=1,
        join="inner",
    )

    # Chronological split: the oldest rows train, the newest rows test.
    total_data_points = len(df_dataset.index)
    training_number = round(_TRAIN_FRACTION * total_data_points)
    test_number = total_data_points - training_number

    feature_table = df_dataset[_FEATURE_COLUMNS]
    X_train = feature_table.head(training_number)
    X_test = feature_table.tail(test_number)
    y_train = df_dataset["decision_number"].head(training_number)
    y_test = df_dataset["decision_number"].tail(test_number).to_frame()

    # Two random forest classifiers that differ only in tree depth.
    Results = y_test.copy(deep=True)
    Results["Predicted Value"] = _predict_with_forest(X_train, y_train, X_test, max_depth=3)

    Results2 = y_test.copy(deep=True)
    Results2["Predicted Value"] = _predict_with_forest(X_train, y_train, X_test, max_depth=4)

    # Predictive strength of each model.
    rms, model1_f1 = _score(Results)
    rms2, model2_f1 = _score(Results2)

    ######################################################
    # RMSE based selection: model 1 wins ties.
    empty_line = ""
    output1 = f"The first decision tree model has root mean square error value of {rms}"
    output2 = f"The second decision tree model has root mean square error value of {rms2}"

    if rms > rms2:
        output3 = " As Model 2 has lower RMSE value, Model 2 is selected for final prediction"
        output4 = Results2.tail()
    else:
        output3 = " As Model 1 has lower RMSE value OR RMSE same as Model 2, Model 1 is selected for final prediction"
        output4 = Results.tail()

    output5 = "The below table shows the last 5 rows of the test dataset. The predicted value for the final row is the recommendation."
    output6 = _recommendation(output4)

    # F1 based selection: model 2 is shown unless model 1 is strictly better.
    output7 = f"Model 1 has a F1 Score of {model1_f1}"
    output8 = f"Model 2 has a F1 Score of {model2_f1}"

    if model1_f1 > model2_f1:
        output9 = "Model 1 has a higher F1 Score"
        output10 = "Below is test dataset for Model 1. Use last Row for Recommendation 1-BUY, 0-SELL"
        output11 = Results.tail()
    else:
        output9 = "Model 2 has a higher F1 Score"
        output10 = "Below is test dataset for Model 2. Use last Row for Recommendation 1-BUY, 0-SELL"
        output11 = Results2.tail()

    output12 = _recommendation(output11)
    ######################################################

    #######################################################
    # Presentation outputs: one tab per stage of the analysis.
    initial_extract = pn.Column(
        "Below table is initial extract form Yahoo Finance",
        df_final.head(),
    )

    bollinger_tab = pn.Column(
        "Below is table with bollinger signal (based on a 20 day rolling window standard deviation)",
        df_returns.head(),
        width=1400,
    )

    sma_tab = pn.Column(
        "Below is the table with Simple Moving Average analysis 50/100 day",
        df_sma.head(),
        width=500,
    )

    ewm_tab = pn.Column(
        "Below is the table with Exponential weighted Moving Average (short window 1 and long window 10 days)",
        df_ewm.head(),
        width=700,
    )

    vol_trend_tab = pn.Column(
        "Below table is for volatility trend signal (short window 1 and long window 10 days)",
        df_daily_returns.head(),
        width=800,
    )

    target_tab = pn.Column(
        "Below is Table with Target Variable",
        df_ticker_price.head(),
        width=700,
    )

    model_set_tab = pn.Column(
        "Below is the Table with the model dataset",
        df_dataset.head(),
        width=900,
    )

    rmse_column = pn.Column('#RMSE Analysis', output1, output2, output3, output5, output4, output6)
    f1_column = pn.Column('#F1 Score Analysis', output7, output8, output9, output10, output11, output12)
    # The two empty strings act as spacers between the result columns.
    results_tab = pn.Row(rmse_column, empty_line, empty_line, f1_column)

    final_dashboard = pn.Tabs(
        ("(1). Initial Data Extract", initial_extract),
        ("(2). Bollinger Analysis", bollinger_tab),
        ("(3). Simple Moving Average Analysis", sma_tab),
        ("(4). Exponential Weighted Moving Average", ewm_tab),
        ("(5). Volatility Trend Signal", vol_trend_tab),
        ("(6). Target Variable", target_tab),
        ("(7). Model Dataset", model_set_tab),
        ("(8). Results Summary", results_tab),
    )
    #######################################################

    return final_dashboard
    

In [3]:
# Run the full analysis for the "AAPL" ticker. The call is the cell's last
# expression, so whatever rolling_analysis returns (the tabbed dashboard, or
# the insufficient-data message) becomes the cell output.
# Optional: uncomment the next line to stop pandas truncating long tables.
#pd.set_option('display.max_rows', None)
rolling_analysis("AAPL")