In [None]:
import yfinance as yf
import datetime 
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.pyplot import MultipleLocator 

# --- Download window and tickers -------------------------------------------
today = str(datetime.date.today())
stock_list = ['^GSPC', '^VIX']
start_years = '2002-11-01'

index_ticker, vix_ticker = stock_list
df = yf.download(index_ticker, start=start_years, end=today).drop(columns='Adj Close')
vix = yf.download(vix_ticker, start=start_years, end=today)


def _roll_mean(series, window):
    """Trailing mean over `window` rows; partial windows are allowed (min_periods=1)."""
    return series.rolling(window, min_periods=1).mean()


def _next_day_class(next_return):
    """Map a return to a class: 2 if > 0.0035, 0 if < -0.0025, otherwise 1 (NaN falls to 1)."""
    if next_return > 0.0035:
        return 2
    if next_return < -0.0025:
        return 0
    return 1


# --- Derived columns (created in the same order as before) -----------------
px_close, px_open = df['Close'], df['Open']
px_high, px_low = df['High'], df['Low']
volume = df['Volume']

df['VIX'] = vix['Close']
df['VixMA'] = _roll_mean(df['VIX'], 10)
df['Daily Return'] = px_close.pct_change()
df['std'] = df['Daily Return'].rolling(20, min_periods=1).std()
df['VIXDiffer'] = df['VIX'] - df['VixMA']
# 1 when VIX is above its 10-row mean, else 0 (NaN compares False -> 0).
df['VIXup'] = (df['VIXDiffer'] > 0).astype('int64')
df['Close0'] = (px_close - px_open) / px_open
df['MA10'] = _roll_mean(px_close, 10)
df['MA60'] = _roll_mean(px_close, 60)
df['Vol0'] = (volume - _roll_mean(volume, 10)) / volume
df['MA10_0'] = (px_close - df['MA10']) / df['MA10']
df['MA60_0'] = (px_close - df['MA60']) / df['MA60']
df['Mid'] = (px_close - px_low) / (px_high - px_low)
df['TR'] = (px_high - px_low) / px_close
df['Open0'] = px_open - px_close.shift(1)

# Target: the following row's daily return, bucketed into three classes.
df['diff'] = df['Daily Return'].shift(-1)
df['up'] = df['diff'].apply(_next_day_class)

# Drop the final row (it has no following return) and the first 60 rows.
df = df.iloc[60:-1]

# --- Fed funds rate: level, change, and a 0/1 regime flag -------------------
fed = pd.read_csv('./FEDFUNDS.csv')
fed['DATE'] = pd.to_datetime(fed['DATE'])
fed = fed.set_index('DATE')

# Row-over-row change in the rate; the first row is NaN.
fed['Label1'] = fed['FEDFUNDS'] - fed['FEDFUNDS'].shift()

# Build the list once: calling .tolist() inside the loop rebuilt it on every
# iteration and made the loop quadratic.
deltas = fed['Label1'].tolist()

# Regime flag: flips 1 -> 0 after a drop larger than 0.25 and 0 -> 1 after a
# rise larger than 0.25; smaller moves (and NaN) leave it unchanged.
start = 1 if deltas[0] > 0 else 0
label2 = [start]
for delta in deltas[1:]:
    if start == 1 and delta < -0.25:
        start = 0
    elif start == 0 and delta > 0.25:
        start = 1
    label2.append(start)
fed['fed'] = label2

# Stack price rows and rate rows on one date index, then carry values forward
# so each row has both price and rate columns.
fed1 = pd.concat([df, fed], join='outer').sort_index()
# .ffill() replaces the deprecated fillna(method='ffill').
fed1 = fed1.ffill().dropna()
# When a rate date coincides with a price date, keep the later row.
fed1 = fed1[~fed1.index.duplicated(keep='last')]
# Rows that only repeat an earlier row's forward-filled prices are dropped.
fed1 = fed1.drop_duplicates(subset=['Volume', 'Open', 'High', 'Low'], keep='first')

# --- P/E ratio: merge onto the price + rate frame ---------------------------
pe = pd.read_csv('./PE Ratio (TTM) for the S&P 500 2022-11-09 01_00_21.csv').rename(columns={'Value': 'PE'})
pe['Date'] = pd.to_datetime(pe['Date'])
pe = pe.set_index('Date')

df = pd.merge(fed1, pe, left_index=True, right_index=True, how='outer').sort_index()
# .ffill() replaces the deprecated fillna(method='ffill').
df = df.ffill().dropna()
# Rows that only repeat an earlier row's forward-filled prices are dropped.
df = df.drop_duplicates(subset=['Volume', 'Open', 'High', 'Low'], keep='first')

# 100 / PE (reciprocal of the P/E ratio, scaled by 100) minus the fed funds rate.
df['PE_FFR'] = 100 / df['PE'] - df['FEDFUNDS']

# Class balance of the target: 2 (up), 1 (flat), 0 (down).
n_up = int((df['up'] == 2).sum())
n_flat = int((df['up'] == 1).sum())
n_down = int((df['up'] == 0).sum())
print('漲: ', n_up, '\n平: ', n_flat, '\n跌: ', n_down)

# Feature matrix and single-column target frame used by the model cells.
X = df[['Daily Return', 'TR', 'Mid', 'std', 'Open0', 'Close0', 'Vol0',
        'MA10_0', 'MA60_0', 'PE_FFR', 'fed', 'FEDFUNDS', 'VIXDiffer']]
y = df[['up']]



In [None]:

import sklearn.model_selection
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier

# Chronological split: shuffle=False keeps the first 70% of rows for training
# and the last 30% for testing.
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    X, y, test_size=0.3, shuffle=False)

print('訓練資料: ', len(y_train))
print('測試資料: ', len(y_test))

# Show which part of the closing-price series falls in each split.
ax = plt.gca()
plt.plot(df['Close'].head(len(y_train)), color="red", label='Train')
plt.plot(df['Close'].tail(len(y_test)), color="blue", label='Test')
# Rotation must be numeric: the string '30' is not an accepted value and is
# rejected with ValueError by recent matplotlib releases.
plt.xticks(rotation=30)
ax.xaxis.set_major_locator(MultipleLocator(800))
plt.title("Train & Test Data")
plt.legend()
plt.show()

# NOTE(review): this replaces df's index with 0..n-1, so re-running this cell
# plots the split against row numbers instead of the original index - confirm
# that later cells really need the positional index.
df = df.reset_index(drop=True)

# Three-class gradient-boosted tree classifier.
model = XGBClassifier(max_depth=3, learning_rate=0.1, n_estimators=50,
                      booster='gbtree', objective='multi:softprob')
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)

print("訓練集 Accuracy: %.2f%%" % (model.score(X_train, y_train) * 100.0))
# Reuse the accuracy already computed from y_pred instead of predicting again.
print("測試集 Accuracy: %.2f%%" % (accuracy * 100.0))



In [None]:
from sklearn.metrics import (ConfusionMatrixDisplay, auc, classification_report,
                             confusion_matrix, roc_curve)
from sklearn.preprocessing import label_binarize

# Class ids produced by the labelling step and their display names.
CLASS_IDS = [0, 1, 2]
CLASS_NAMES = ["Down", "Unchanged", "Up"]

# Keep the result under its own name: rebinding `confusion_matrix` would shadow
# the imported function. Passing labels= guarantees a 3x3 matrix that matches
# display_labels even if a class is absent from the test split.
cm = confusion_matrix(y_test, y_pred, labels=CLASS_IDS)
cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=CLASS_IDS)
cm_display.plot()
plt.show()

report = classification_report(y_test, y_pred, labels=CLASS_IDS, target_names=CLASS_NAMES)
print(report)

# Per-class probabilities for the one-vs-rest ROC curves.
preds = model.predict_proba(X_test)

# Binarize into a new variable: overwriting y_test made this cell fail when
# re-run, because the metrics above would then receive an indicator matrix.
y_test_bin = label_binarize(y_test, classes=CLASS_IDS)
fpr, tpr, roc_auc = dict(), dict(), dict()
for i in CLASS_IDS:
    fpr[i], tpr[i], _thresholds = roc_curve(y_test_bin[:, i], preds[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# One figure per class.
for i, class_name in zip(CLASS_IDS, CLASS_NAMES):
    plt.plot(fpr[i], tpr[i], label=f'AUC = {roc_auc[i]:.4f}')
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.title('ROC Curve: ' + class_name)
    plt.legend()
    plt.show()



In [None]:
# --- Back-test the predictions against buy-and-hold -------------------------
# The last len(y_pred) rows of df are paired with the test-set predictions.
df1 = df[-len(y_pred):].reset_index()
df1['check'] = y_pred
# Point change in the close versus the previous row (NaN on the first row).
df1['income'] = df1['Close'] - df1['Close'].shift()

signals = df1['check'].tolist()
incomes = df1['income'].tolist()

# Position state machine: a prediction of 2 enters (or stays in) the position,
# 0 exits (or stays out), 1 keeps the current state. The prediction on row i
# decides whether row i+1's change is added to the model P&L.
# The curves are accumulated as floats in plain lists and assigned once;
# writing floats cell-by-cell into an int column is deprecated in pandas.
buy = False
principal = [0.0]
buy_and_hold = [0.0]
for signal, next_income in zip(signals[:-1], incomes[1:]):
    if signal == 2:
        buy = True
    elif signal == 0:
        buy = False
    principal.append((principal[-1] + next_income) if buy else principal[-1])
    buy_and_hold.append(buy_and_hold[-1] + next_income)
df1['principal'] = principal
df1['buy_and_hold'] = buy_and_hold

# Set the figure size before any figure exists; setting it after plt.gca()
# left the first chart at the previous default size.
plt.rcParams['figure.figsize'] = (10, 6.4)

# Both charts carry the same title: the final cumulative values of each curve.
pnl_title = (f'model P&L: {df1["principal"].iloc[-1]:.2f} \n'
             f' buy and hold: {df1["buy_and_hold"].iloc[-1]:.2f}')


def _plot_pnl(curves, tick_spacing, grid=False):
    """Plot the buy-and-hold and model P&L columns of `curves` on a new chart.

    curves: frame with 'buy_and_hold' and 'principal' columns.
    tick_spacing: spacing of the major x ticks, in index units.
    grid: draw a grid when True.
    """
    ax = plt.gca()
    plt.plot(curves['buy_and_hold'], label='buy_and_hold')
    plt.plot(curves['principal'], label='Model')
    # Numeric rotation: the string '30' is rejected by recent matplotlib.
    plt.xticks(rotation=30)
    ax.xaxis.set_major_locator(MultipleLocator(tick_spacing))
    plt.title(pnl_title)
    plt.legend()
    if grid:
        plt.grid()
    plt.show()


# Whole test period, then the last 250 rows.
_plot_pnl(df1, 200)
_plot_pnl(df1.iloc[-250:], 10, grid=True)