In [1]:
import datetime as dt
import yfinance as yf
import pandas as pd
import numpy as np

In [34]:
start = dt.datetime.today() - dt.timedelta(252*10)
end = dt.datetime.today()

In [26]:
def MACD(DF, a, b, c):
    intra_price = DF.copy()
    intra_price["MA_FAST"] = intra_price["Adj Close"].ewm(span = a, min_periods = a).mean()
    intra_price["MA_SLOW"] = intra_price["Adj Close"].ewm(span = b, min_periods = b).mean()
    intra_price["MACD"] = intra_price["MA_FAST"] - intra_price["MA_SLOW"]
    intra_price["SIGNAL"] = intra_price["Adj Close"].ewm(span = c, min_periods = c).mean()
    intra_price.dropna(inplace=True)
    return intra_price

def SMA(DF, day):
    copy = DF.copy()
    sma = copy.rolling(window=day, min_periods=day).mean() 
    return sma

def EMA(DF, day):
    copy = DF.copy()
    sma = copy.ewm(span=day, min_periods=day).mean() 
    return sma

def ATR(DF, n):
    df = DF.copy()
    df["H-L"] = abs(df["High"] - df["Low"]).shift(1)
    df["H-PC"] = abs(df["High"] - df["Adj Close"]).shift(1)
    df["L-PC"] = abs(df["Low"] - df["Adj Close"]).shift(1)
    df["TR"] = df[["H-L", "H-PC", "L-PC"]].max(axis=1, skipna=False)
    df["ATR"] = df["TR"].rolling(n).mean()
    df2 = df.drop(["H-L", "H-PC", "L-PC"], axis=1)
    return df2

def BollBnd(DF, n):
    df = DF.copy()
    df["MA"] = df["Adj Close"].rolling(n).mean()
    df["BB_UP"] = df["MA"] + 2*df["MA"].rolling(n).std()
    df["BB_DOWN"] = df["MA"] - 2*df["MA"].rolling(n).std()
    df["BB_RANGE"] = df["BB_UP"] - df["BB_DOWN"]
    df.dropna(inplace=True)
    return df

def RSI(DF, n):
    df = DF.copy()
    print(len(df))
    df["delta"] = df["Adj Close"] - df["Adj Close"].shift(1)
    df["gain"] = np.where(df["delta"]>=0, df["delta"], 0)
    df["loss"] = np.where(df["delta"]<0, abs(df["delta"]), 0)
    avg_gain = []
    avg_loss = []
    gain = df["gain"].tolist()
    loss = df["loss"].tolist()
    for i in range(len(df)):
        if i < n:
            avg_gain.append(np.NaN)
            avg_loss.append(np.NaN)
        elif i == n:
            avg_gain.append(df["gain"].rolling(n).mean().tolist()[n])
            avg_loss.append(df["loss"].rolling(n).mean().tolist()[n])
        elif i > n:
            avg_gain.append((avg_gain[i - 1]*(n - 1) + gain[i])/n)
            avg_loss.append((avg_loss[i - 1]*(n - 1) + loss[i])/n)
    df["avg_gain"] = np.array(avg_gain)

    df["avg_loss"] = np.array(avg_loss)
    df["RS"] = df["avg_gain"]/df["avg_loss"]
    df["RSI"] = 100 - (100/(1+df["RS"]))
    return df["RSI"]

def ADX(DF, n):
    df2 = DF.copy()
    df2["TR"] = ATR(df2, n)["TR"]
    df2["DMplus"] = np.where((df2["High"] - df2["High"].shift(1))>(df2["Low"].shift(1)-df2["Low"]), 
                             df2["High"] - df2["High"].shift(1),
                             0)
    df2["DMplus"] = np.where(df2["DMplus"]<0, 0, df2["DMplus"])
    df2["DMminus"] = np.where((df2["Low"].shift(1)-df2["Low"])>(df2["High"] - df2["High"].shift(1)),
                             df2["Low"].shift(1)-df2["Low"],
                             0)
    df2["DMminus"] = np.where(df2["DMminus"]<0, 0, df2["DMminus"])
    TRn=[]
    DMplusN=[]
    DMminusN=[]
    TR = df2["TR"].tolist()
    DMplus = df2['DMplus'].tolist()
    DMminus = df2['DMminus'].tolist()
    for i in range(len(df2)):
        if i<n:
            TRn.append(np.NaN)
            DMplusN.append(np.NaN)
            DMminusN.append(np.NaN)
        elif i == n:
            TRn.append(df2["TR"].rolling(n).sum().tolist()[n])
            DMplusN.append(df2["DMplus"].rolling(n).sum().tolist()[n])
            DMminusN.append(df2["DMminus"].rolling(n).sum().tolist()[n])
        else:
            TRn.append(TRn[i-1] - (TRn[i-1]/14) + TR[i])
            DMplusN.append(DMplusN[i-1] - (DMplusN[i-1]/14) + DMplus[i])
            DMminusN.append(DMminusN[i-1] - (DMminusN[i-1]/14) + DMminus[i])
            
    df2["TRn"] = np.array(TRn)
    df2["DMplusN"] = np.array(DMplusN)
    df2["DMminusN"] = np.array(DMminusN)
    df2["DIplusN"] = (100*df2["DMplusN"]/df2["TRn"])
    df2["DIminusN"] = (100*df2["DMminusN"]/df2["TRn"])
    df2["DIsum"] =  df2["DIplusN"] + df2["DIminusN"] 
    df2["DIdiff"] =  abs(df2["DIplusN"] - df2["DIminusN"])
    df2["DX"] = 100*(df2["DIdiff"]/df2["DIsum"])
    ADX = []
    DX = df2["DX"].tolist()
    for j in range(len(df2)):
        if j < 2*n-1:
            ADX.append(np.NaN)
        elif j == 2*n-1:
            ADX.append(df2["DX"][j-n+1: j+1].mean())
        elif j > 2*n-1:
            ADX.append(((n - 1)*ADX[j-1] + DX[j])/n)
    df2["ADX"] =np.array(ADX)
    return df2["ADX"]
        

In [35]:
def calTechnicalIndicator(stocks, start, end):
    sticker_map = {}
    for ticker in stocks:
        MSTF_cl = pd.DataFrame()
        MSTF_cl = yf.download(ticker, start, end)
        daily_return = MSTF_cl.pct_change()["Adj Close"]
        MSTF_cl["RETURN"] = daily_return
        MSTF_cl["MACD"] = MACD(MSTF_cl, 12, 26, 9)["SIGNAL"]
        MSTF_cl["SMA"] = SMA(daily_return, 5)
        MSTF_cl["EMA"] = EMA(daily_return, 5)
        MSTF_cl = ATR(MSTF_cl, 20)
        MSTF_cl = BollBnd(MSTF_cl,10)
        MSTF_cl["RSI"] = np.array(RSI(MSTF_cl, 14))
        MSTF_cl["ADX"] = np.array(ADX(MSTF_cl, 14))
        MSTF_cl = MSTF_cl.dropna(axis=0)
        MSTF_cl.to_csv(ticker + ".csv")
        sticker_map[ticker] = MSTF_cl
    return sticker_map

In [37]:
stocks = ["KNDI", "BLNK", "TSLA", "FUV", "XPEV", "LI", "NIO", "GP", "QS", "FSR" ]
# stocks = ["KNDI", "BLNK", "TSLA", "FUV"]  # "FSR","GP", "QS"
# chinese_stocks = ["NIO", "XPEV", "LI"] 
stickers_map = calTechnicalIndicator(stocks, start, end)

[*********************100%***********************]  1 of 1 completed
1711
[*********************100%***********************]  1 of 1 completed
1711
[*********************100%***********************]  1 of 1 completed
1711
[*********************100%***********************]  1 of 1 completed
868
[*********************100%***********************]  1 of 1 completed
130
[*********************100%***********************]  1 of 1 completed
150
[*********************100%***********************]  1 of 1 completed
623
[*********************100%***********************]  1 of 1 completed
129
[*********************100%***********************]  1 of 1 completed
138
[*********************100%***********************]  1 of 1 completed
609


In [38]:
import seaborn as sns
import numpy as np # linear algebra
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn import utils
from sklearn import metrics
from sklearn.metrics import precision_score

In [39]:
X_train = np.zeros(shape=(0,12))
X_test = np.zeros(shape=(0,12))
Y_train = []
Y_test = []

for ticker in stocks:
    df = stickers_map[ticker]
    y_t = np.array(df['RETURN'])
    y_t = np.where(y_t < 0, 0, 1)

    X_t = df.copy()
    X_t = X_t.drop(["Open", "High", "Low", "Adj Close", "Volume", "RETURN" ],axis=1)
    X_t = np.array(X_t)
    
    lab_enc = preprocessing.LabelEncoder()
    y_t = lab_enc.fit_transform(y_t)

    print("shape of Y :"+ str(y_t.shape))
    print("shape of X :"+ str(X_t.shape))

    from sklearn.preprocessing import MinMaxScaler
    scaler = MinMaxScaler()
    X_t = scaler.fit_transform(X_t)
    n = len(y_t)
    split = (int)(n/5)

    X_train = np.concatenate((X_train, X_t[: -split]), axis=0)
    X_test = np.concatenate((X_test , X_t[-split :]), axis=0)
    Y_train = np.concatenate((Y_train, y_t[: -split]), axis=0) 
    Y_test = np.concatenate((Y_test , y_t[-split :]), axis =0)

print("shape of X_train :"+ str(X_train.shape))
print("shape of X_test :"+ str(X_test.shape))

shape of Y :(1684,)
shape of X :(1684, 12)
shape of Y :(1684,)
shape of X :(1684, 12)
shape of Y :(1684,)
shape of X :(1684, 12)
shape of Y :(841,)
shape of X :(841, 12)
shape of Y :(103,)
shape of X :(103, 12)
shape of Y :(123,)
shape of X :(123, 12)
shape of Y :(596,)
shape of X :(596, 12)
shape of Y :(102,)
shape of X :(102, 12)
shape of Y :(111,)
shape of X :(111, 12)
shape of Y :(582,)
shape of X :(582, 12)
shape of X_train :(6013, 12)
shape of X_test :(1497, 12)


In [40]:
TP =0
FP =0
TN =0
FN =0
for i in range(len(predict)):
    if Y_predict[i] == 1 and Y_test[i] == 1:
        TP+=1
    elif Y_predict[i] == 1 and Y_test[i] == 0:
        FP+=1
    elif Y_predict[i] == 0 and Y_test[i] == 0:
        TN+=1
    elif Y_predict[i] == 0 and Y_test[i] == 1:
        FP+=1

NameError: name 'predict' is not defined

In [41]:
for this_C in [1,3,5,10,40,60,80,100]:

    clf = SVC(kernel='linear',C=this_C).fit(X_train,Y_train)
    scoretrain = clf.score(X_train,Y_train)
    scoretest  = clf.score(X_test,Y_test)
    Y_predict = clf.predict(X_test)
    precisiontest = precision_score(Y_predict, Y_test)
    fpr, tpr, thresholds = metrics.roc_curve(Y_test, Y_predict)
    roc = metrics.auc(fpr, tpr)
    print("Linear SVM value of C:{}, training score :{:2f} , Test score: {:2f}, ".format(this_C,scoretrain,scoretest))
    print("Test precision: {:2f}, Test roc: {:2f}\n".format(precisiontest, roc))

for this_C in [1,3,5,10,40,60,80,100]:
    clf = SVC(kernel='poly',C=this_C).fit(X_train,Y_train)
    scoretrain = clf.score(X_train,Y_train)
    scoretest  = clf.score(X_test,Y_test)
    Y_predict = clf.predict(X_test)
    precision = precision_score(Y_predict, Y_test)
    fpr, tpr, thresholds = metrics.roc_curve(Y_test, Y_predict)
    roc = metrics.auc(fpr, tpr)  
    print("Poly SVM value of C:{}, training score :{:2f} , Test score: {:2f}, ".format(this_C,scoretrain,scoretest))
    print("Test precision: {:2f}, Test roc: {:2f}\n".format(precisiontest, roc))

for this_C in [1,3,5,10,40,60,80,100]:
    clf = SVC(kernel='rbf',C=this_C).fit(X_train,Y_train)
    scoretrain = clf.score(X_train,Y_train)
    scoretest  = clf.score(X_test,Y_test)
    Y_predict = clf.predict(X_test)
    precision = precision_score(Y_predict, Y_test)
    fpr, tpr, thresholds = metrics.roc_curve(Y_test, Y_predict)
    roc = metrics.auc(fpr, tpr)
    print("RBF SVM value of C:{}, training score :{:2f} , Test score: {:2f}, ".format(this_C,scoretrain,scoretest))
    print("Test precision: {:2f}, Test roc: {:2f}\n".format(precisiontest, roc))

Linear SVM value of C:1, training score :0.651256 , Test score: 0.647963, 
Test precision: 0.760695, Test roc: 0.648038

Linear SVM value of C:3, training score :0.652919 , Test score: 0.639947, 
Test precision: 0.743316, Test roc: 0.640016

Linear SVM value of C:5, training score :0.655247 , Test score: 0.637275, 
Test precision: 0.744652, Test roc: 0.637346

Linear SVM value of C:10, training score :0.654914 , Test score: 0.637943, 
Test precision: 0.752674, Test roc: 0.638019

Linear SVM value of C:40, training score :0.657243 , Test score: 0.643955, 
Test precision: 0.750000, Test roc: 0.644025

Linear SVM value of C:60, training score :0.656744 , Test score: 0.643955, 
Test precision: 0.751337, Test roc: 0.644026

Linear SVM value of C:80, training score :0.656078 , Test score: 0.642619, 
Test precision: 0.750000, Test roc: 0.642690

Linear SVM value of C:100, training score :0.657243 , Test score: 0.643287, 
Test precision: 0.751337, Test roc: 0.643359

Poly SVM value of C:1, tra

In [None]:
# parameters = [
#     {'C': [1,3,5,10,40,60,80,100], 
#      'gamma': np.logspace(-9, 3, 13)}]

# rbf_clf = GridSearchCV(SVC(kernel='rbf'), 
#                    parameters, 
#                    cv = StratifiedKFold(n_splits = 10, shuffle = True, random_state = 2021))

# rbf_clf.fit(X_train, y_train)

In [None]:
# print('best parameters: ', rbf_clf.best_params_)
# y_train_pred = rbf_clf.predict(X_train)
# y_test_pred = rbf_clf.predict(X_test)
# accuracy_train = accuracy_score(y_train, y_train_pred)
# accuracy_test = accuracy_score(y_test, y_test_pred)
# precisiontest = precision_score(y_test, y_test_pred)
# fpr, tpr, thresholds = metrics.roc_curve(y_test, y_test_pred)
# roc = metrics.auc(fpr, tpr)
# print("RBF SVM value of C:{}, gamma:{}, training accuracy :{:2f} , Test accuracy: {:2f}".format(rbf_clf.best_params_["C"],rbf_clf.best_params_["gamma"],accuracy_train,accuracy_test))
# print("Test precision: {:2f}, Test roc: {:2f}\n".format(precisiontest, roc))