In [None]:
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta 
import copy
from tqdm import tqdm
import numpy as np
import itertools 
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
import random

In [None]:
# Analysis window: the six years ending today.
# A fixed start such as datetime(2011, 1, 1) can be substituted for start_date.
end_date = datetime.now().date()
start_date = end_date - pd.DateOffset(years=6)

# Get data from SQL

In [None]:
# SQLAlchemy URL template for a MySQL database accessed through the PyMySQL driver.
con_syting = "mysql+pymysql://{user}:{pw}@{localhost}:{port}/{db}"
# Engine shared by the SQL loader functions in this notebook (they all pass
# `engine70` to pd.read_sql). The "XXXX" values are placeholders.
# NOTE(review): when filling these in, consider reading the credentials from
# environment variables rather than committing them to the notebook.
engine70 = create_engine(con_syting.format(user="XXXX",
                                           pw="XXXX",
                                           localhost="XXXX",
                                           db="XXXX",
                                           port=3306))

In [None]:
def get_Eps_data(start_date = datetime(2011, 1, 31), end_date = datetime.today().date()):
    """Load quarterly statement rows (EPS, revenue, profit items) from SQL.

    Rows come from ``FP.Stock_TMParameter`` joined to ``Stock_TKList`` for
    tickers whose ``MAC_ID`` is 63 or 64, restricted to
    ``start_date <= SheetDate <= end_date`` (both bounds are converted with
    ``str`` and embedded in the query text).

    Returns a frame with columns SheetDate, Ticker, TickerName, Eps,
    QuarterlyRevenue, GrossProfit, OPIncomeLoss and PreTaxIncome, one row per
    (Ticker, SheetDate), sorted by Ticker then SheetDate with a fresh index.
    """
    query = (
        'SELECT st.Ticker, st.TickerName, stm.* FROM Stock_TKList st\n'
        'LEFT JOIN FP.Stock_TMParameter stm ON st.TKL_ID = stm.TKL_ID \n'
        'WHERE st.TKL_ID IN (SELECT TKL_ID FROM Stock_TKList WHERE MAC_ID IN (63, 64))'
        'AND SheetDate >= "{}" AND SheetDate <= "{}"'
    ).format(str(start_date), str(end_date))
    raw = pd.read_sql(query, engine70)

    # Snap every SheetDate to the last day of its own month. Stepping back one
    # day first keeps a date that already is a month end from being rolled
    # forward to the following month end.
    one_day = pd.tseries.offsets.DateOffset(days=1)
    raw['SheetDate'] = raw['SheetDate'] - one_day + pd.tseries.offsets.MonthEnd()

    wanted = ["SheetDate", "Ticker", "TickerName", "Eps", "QuarterlyRevenue",
              "GrossProfit", "OPIncomeLoss", "PreTaxIncome"]
    result = (
        raw.drop_duplicates(subset=['Ticker', 'SheetDate'])[wanted]
           .sort_values(by=["Ticker", "SheetDate"])
           .reset_index(drop=True)
    )

    print("Get EPS data from SQL")
    return result

In [None]:
def get_MonthRevenue_data(start_date = datetime(2011, 1, 31), end_date = datetime.today().date()):
    """Load monthly revenue rows from SQL.

    Rows come from ``FP.Stock_TMRevenue`` joined to ``Stock_TKList`` for
    tickers whose ``MAC_ID`` is 63 or 64, restricted to
    ``start_date <= SheetDate <= end_date`` (both bounds are converted with
    ``str`` and embedded in the query text).

    Returns a frame with columns SheetDate, Ticker, TickerName and
    MonthRevenue, one row per (Ticker, SheetDate), sorted by Ticker then
    SheetDate with a fresh index.
    """
    query = (
        'SELECT st.Ticker, st.TickerName, stm.* FROM Stock_TKList st\n'
        'LEFT JOIN FP.Stock_TMRevenue stm ON st.TKL_ID = stm.TKL_ID \n'
        'WHERE st.TKL_ID IN (SELECT TKL_ID FROM Stock_TKList WHERE MAC_ID IN (63, 64))'
        'AND SheetDate >= "{}" AND SheetDate <= "{}"'
    ).format(str(start_date), str(end_date))
    raw = pd.read_sql(query, engine70)

    # Snap every SheetDate to the last day of its own month. Stepping back one
    # day first keeps a date that already is a month end from being rolled
    # forward to the following month end.
    one_day = pd.tseries.offsets.DateOffset(days=1)
    raw['SheetDate'] = raw['SheetDate'] - one_day + pd.tseries.offsets.MonthEnd()

    result = (
        raw.drop_duplicates(subset=['Ticker', 'SheetDate'])
           [["SheetDate", "Ticker", "TickerName", "MonthRevenue"]]
           .sort_values(by=["Ticker", "SheetDate"])
           .reset_index(drop=True)
    )

    print("Get MonthRevenue data from SQL")
    return result

In [None]:
def get_Ratio_data(start_date = datetime(2011, 1, 31), end_date = datetime.today().date()):
    """Load financial-ratio rows (gross margin, ROE) from SQL.

    Rows come from ``FP.Stock_MCParameter`` joined to ``Stock_TKList`` for
    tickers whose ``MAC_ID`` is 63 or 64, restricted to
    ``start_date <= SheetDate <= end_date`` (both bounds are converted with
    ``str`` and embedded in the query text).

    Returns a frame with columns SheetDate, Ticker, GrossMargin and ROE, one
    row per (Ticker, SheetDate), sorted by Ticker then SheetDate with a fresh
    index.
    """
    query = (
        'SELECT st.Ticker, st.TickerName, stm.* FROM Stock_TKList st\n'
        'LEFT JOIN FP.Stock_MCParameter stm ON st.TKL_ID = stm.TKL_ID \n'
        'WHERE st.TKL_ID IN (SELECT TKL_ID FROM Stock_TKList WHERE MAC_ID IN (63, 64))'
        'AND SheetDate >= "{}" AND SheetDate <= "{}"'
    ).format(str(start_date), str(end_date))
    raw = pd.read_sql(query, engine70)

    # Snap every SheetDate to the last day of its own month. Stepping back one
    # day first keeps a date that already is a month end from being rolled
    # forward to the following month end.
    one_day = pd.tseries.offsets.DateOffset(days=1)
    raw['SheetDate'] = raw['SheetDate'] - one_day + pd.tseries.offsets.MonthEnd()

    result = (
        raw.drop_duplicates(subset=['Ticker', 'SheetDate'])
           [["SheetDate", "Ticker", "GrossMargin", "ROE"]]
           .sort_values(by=["Ticker", "SheetDate"])
           .reset_index(drop=True)
    )

    print("Get financial ratio data from SQL")
    return result

In [None]:
def get_PE_data(start_date = datetime(2011, 1, 31), end_date = datetime.today().date()):
    """Load price and P/E rows from SQL.

    Rows come from ``FP.Stock_TDPrice`` joined to ``Stock_TKList`` for tickers
    whose ``MAC_ID`` is 63 or 64, restricted to
    ``start_date <= PriceDate <= end_date`` (both bounds are converted with
    ``str`` and embedded in the query text). Unlike the statement loaders, the
    date column is returned as read, without month-end snapping.

    Returns a frame with columns PriceDate, Ticker, TickerName, Price and PE,
    one row per (Ticker, PriceDate), sorted by Ticker then PriceDate with a
    fresh index.
    """
    query = (
        'SELECT st.Ticker, st.TickerName, stm.* FROM Stock_TKList st\n'
        'LEFT JOIN FP.Stock_TDPrice stm ON st.TKL_ID = stm.TKL_ID \n'
        'WHERE st.TKL_ID IN (SELECT TKL_ID FROM Stock_TKList WHERE MAC_ID IN (63, 64))'
        'AND PriceDate >= "{}" AND PriceDate <= "{}"'
    ).format(str(start_date), str(end_date))
    raw = pd.read_sql(query, engine70)

    result = (
        raw.drop_duplicates(subset=['Ticker', 'PriceDate'])
           [["PriceDate", "Ticker", "TickerName", "Price", "PE"]]
           .sort_values(by=["Ticker", "PriceDate"])
           .reset_index(drop=True)
    )

    print("Get PE data from SQL")
    return result

In [None]:
def get_industry_data(start_date = datetime(2011, 1, 31), end_date = datetime.today().date()):
    """Load the industry classification of every ticker from SQL.

    Joins ``Stock_TKList`` to ``FP.Stock_ITName`` on ``ITN_ID`` for tickers
    whose ``MAC_ID`` is 63 or 64 and returns the query result unchanged
    (``Ticker`` plus every column of ``Stock_ITName``).

    ``start_date`` and ``end_date`` are accepted only so that the signature
    matches the other loaders; the query has no date condition and both
    arguments are ignored.
    """
    sqrl = (
        'SELECT st.Ticker, stm.* FROM Stock_TKList st\n'
        'LEFT JOIN FP.Stock_ITName stm ON st.ITN_ID = stm.ITN_ID \n'
        'WHERE st.TKL_ID IN (SELECT TKL_ID FROM Stock_TKList WHERE MAC_ID IN (63, 64))'
    )
    df = pd.read_sql(sqrl, engine70)

    print("Get industry types")
    return df

# Filter

In [None]:
def eps_group_data(group, ticker = "2330", date_name = "SheetDate"):
    """Return one company's quarterly rows on a gap-free quarter-end timeline.

    Parameters
    ----------
    group : pandas GroupBy
        Statement data grouped by ticker; ``group.get_group(ticker)`` must
        contain a ``"Ticker"`` column and the ``date_name`` column.
    ticker : str
        Key of the company to extract.
    date_name : str
        Name of the date column.

    Returns
    -------
    (comp, first_date, last_date)
        ``comp`` has exactly one row per calendar quarter end between the
        company's earliest and latest date. Quarters that were missing from
        the input are filled from the following quarter (back-fill) and, if
        none follows, from the preceding one (forward-fill). ``first_date`` and
        ``last_date`` are the first and last dates of that timeline.

    Raises
    ------
    IndexError
        If no quarter end lies between the earliest and latest date, the
        timeline is empty and the first/last date lookup fails.
    """
    comp = group.get_group(ticker).copy()
    comp[date_name] = pd.to_datetime(comp[date_name])
    # Sort on the caller-supplied date column (not a hard-coded "SheetDate"),
    # so the function also works when date_name is something else.
    comp = comp.sort_values(by = ["Ticker", date_name]).reset_index(drop = True)

    # Complete quarter-end timeline. The offset object is what the "Q" alias
    # stands for and is accepted by every pandas version.
    quarter_ends = pd.date_range(start = comp[date_name].iloc[0],
                                 end = comp[date_name].iloc[-1],
                                 freq = pd.offsets.QuarterEnd(startingMonth = 12))

    # A right merge keeps every quarter end and leaves NaN where the company
    # has no row for that quarter.
    comp = comp.merge(pd.DataFrame({date_name: quarter_ends}),
                      on = date_name, how = "right")
    comp = comp.bfill().ffill()

    return comp, comp[date_name].iloc[0], comp[date_name].iloc[-1]

In [None]:
def get_Strong_filter(start_date, end_date):
    """Screen for "strong growth" stocks.

    Loads the quarterly statements via ``get_Eps_data`` and, per ticker and
    quarter, evaluates these criteria on the gap-filled quarterly series:

    * ``OP_8Q_F``      - OPIncomeLoss / PreTaxIncome lies strictly between
      -0.2 and 0.2 in at least 6 of the latest 8 quarters (the original note
      describes this ratio as non-operating income over pre-tax income).
    * ``EPSG_2y_F``    - EPS of the latest 4 quarters grew more than 10 %
      versus the 4 quarters before.
    * ``EPS_2y_F``     - EPS summed over the latest 8 quarters, divided by 2,
      exceeds 0.5.
    * ``Gross_2y_F``   - summed gross margin (GrossProfit / QuarterlyRevenue)
      of the latest 4 quarters grew more than 3 % versus the 4 before.
    * ``Revenue_2y_F`` - revenue of the latest 4 quarters grew more than 20 %
      versus the 4 before.

    The column names say "2y", but only the most recent year-over-year growth
    is tested; the additional check on the year before is disabled.

    Returns a frame with SheetDate, Ticker, the five flags and ``Filter``,
    containing only the (ticker, quarter) rows where every flag is True and
    every growth rate is a finite number. An empty, column-less frame is
    returned when nothing passes.
    """
    data = get_Eps_data(start_date, end_date)
    data["GrossMargin"] = data["GrossProfit"] / data["QuarterlyRevenue"]
    data["OPIncomeLossRatio"] = data["OPIncomeLoss"] / data["PreTaxIncome"]

    def _yoy_growth(series, digits = None):
        # Growth of the latest-4-quarter sum over the sum of quarters 5-8.
        recent = series.rolling(4).sum()
        previous = series.rolling(8).sum() - series.rolling(4).sum()
        if digits is not None:
            recent, previous = recent.round(digits), previous.round(digits)
        return (recent - previous) / abs(previous)

    flag_cols = ["OP_8Q_F", "EPSG_2y_F", "EPS_2y_F", "Gross_2y_F", "Revenue_2y_F"]
    growth_cols = ["EPSG_1y", "Gross_1y", "Revenue_1y"]

    group1 = data.groupby("Ticker")
    passed = []
    for ticker in tqdm(sorted(set(data["Ticker"]))):
        comp1, _, _ = eps_group_data(group = group1, ticker = ticker,
                                     date_name = "SheetDate")

        op_ok = (comp1["OPIncomeLossRatio"] > -0.20) & (comp1["OPIncomeLossRatio"] < 0.20)
        comp1["OP_8Q_F"] = op_ok.rolling(8).sum() >= 6

        comp1["EPSG_1y"] = _yoy_growth(comp1["Eps"], digits = 2)
        comp1["EPSG_2y_F"] = comp1["EPSG_1y"] > 0.10

        comp1["EPS_2y_F"] = (comp1["Eps"].rolling(8).sum() / 2) > 0.5

        comp1["Gross_1y"] = _yoy_growth(comp1["GrossMargin"])
        comp1["Gross_2y_F"] = comp1["Gross_1y"] > 0.03

        comp1["Revenue_1y"] = _yoy_growth(comp1["QuarterlyRevenue"])
        comp1["Revenue_2y_F"] = comp1["Revenue_1y"] > 0.20

        comp1["Filter"] = comp1[flag_cols].all(axis = 1)

        # A zero denominator yields +/-inf, which would pass a "> threshold"
        # test; turn it into NaN and discard those quarters.
        comp1 = comp1.replace([float("inf"), float("-inf")], float("nan"))
        comp1 = comp1.dropna(subset = growth_cols)

        comp1 = comp1[["SheetDate", "Ticker"] + flag_cols + ["Filter"]]
        comp1 = comp1[comp1["Filter"] == 1]
        if len(comp1) != 0:
            passed.append(comp1)

    print("強勢成長股的篩選器")
    if not passed:
        return pd.DataFrame()
    # Concatenate once instead of growing the result inside the loop.
    return pd.concat(passed, axis = 0).reset_index(drop = True)

In [None]:
def get_Long_filter(start_date, end_date):
    """Screen for "long-term growth" stocks.

    Loads the quarterly statements via ``get_Eps_data`` and, per ticker and
    quarter, evaluates these criteria on the gap-filled quarterly series:

    * ``OP_16Q_F``     - OPIncomeLoss / PreTaxIncome lies strictly between
      -0.2 and 0.2 in at least 12 of the latest 16 quarters (the original note
      describes this ratio as non-operating income over pre-tax income).
    * ``EPSG_5y_F``    - each of the four most recent year-over-year EPS
      growth rates (4-quarter sums, rounded to 2 decimals) exceeds 1 %.
    * ``EPS_5y_F``     - EPS summed over the latest 20 quarters, divided by 5,
      exceeds 0.5.
    * ``Revenue_5y_F`` - each of the four most recent year-over-year revenue
      growth rates exceeds 1 %.

    The column names say "5y", but five yearly sums give four growth rates;
    the fifth growth rate (which needs quarters 21-24) is disabled.

    Returns a frame with SheetDate, Ticker, the four flags and ``Filter``,
    containing only the (ticker, quarter) rows where every flag is True and
    every growth rate is a finite number. An empty, column-less frame is
    returned when nothing passes.
    """
    data = get_Eps_data(start_date, end_date)
    data["GrossMargin"] = data["GrossProfit"] / data["QuarterlyRevenue"]
    data["OPIncomeLossRatio"] = data["OPIncomeLoss"] / data["PreTaxIncome"]

    def _yearly_growth(series, n_rates, digits = None):
        # Sums of consecutive 4-quarter blocks, newest first (quarters 1-4,
        # 5-8, ...), then the growth of each block over the next older one.
        sums = []
        for k in range(1, n_rates + 2):
            block = series.rolling(4 * k).sum()
            if k > 1:
                block = block - series.rolling(4 * (k - 1)).sum()
            if digits is not None:
                block = block.round(digits)
            sums.append(block)
        return [(sums[k] - sums[k + 1]) / abs(sums[k + 1]) for k in range(n_rates)]

    eps_growth_cols = ["EPSG_1y", "EPSG_2y", "EPSG_3y", "EPSG_4y"]
    rev_growth_cols = ["Revenue_1y", "Revenue_2y", "Revenue_3y", "Revenue_4y"]
    flag_cols = ["OP_16Q_F", "EPSG_5y_F", "EPS_5y_F", "Revenue_5y_F"]

    group1 = data.groupby("Ticker")
    passed = []
    for ticker in tqdm(sorted(set(data["Ticker"]))):
        comp1, _, _ = eps_group_data(group = group1, ticker = ticker,
                                     date_name = "SheetDate")

        # Kept in a local series so the numeric ratio column is not overwritten.
        op_ok = (comp1["OPIncomeLossRatio"] > -0.20) & (comp1["OPIncomeLossRatio"] < 0.20)
        comp1["OP_16Q_F"] = op_ok.rolling(16).sum() >= 12

        for col, rate in zip(eps_growth_cols, _yearly_growth(comp1["Eps"], 4, digits = 2)):
            comp1[col] = rate
        comp1["EPSG_5y_F"] = comp1[eps_growth_cols].gt(0.01).all(axis = 1)

        comp1["EPS_5y_F"] = (comp1["Eps"].rolling(20).sum() / 5) > 0.5

        for col, rate in zip(rev_growth_cols, _yearly_growth(comp1["QuarterlyRevenue"], 4)):
            comp1[col] = rate
        comp1["Revenue_5y_F"] = comp1[rev_growth_cols].gt(0.01).all(axis = 1)

        comp1["Filter"] = comp1[flag_cols].all(axis = 1)

        # A zero denominator yields +/-inf, which would pass a "> threshold"
        # test; turn it into NaN and discard those quarters.
        comp1 = comp1.replace([float("inf"), float("-inf")], float("nan"))
        comp1 = comp1.dropna(subset = eps_growth_cols + rev_growth_cols)

        comp1 = comp1[["SheetDate", "Ticker"] + flag_cols + ["Filter"]]
        comp1 = comp1[comp1["Filter"] == 1]
        if len(comp1) != 0:
            passed.append(comp1)

    print("長期成長股的篩選器")
    if not passed:
        return pd.DataFrame()
    # Concatenate once instead of growing the result inside the loop.
    return pd.concat(passed, axis = 0).reset_index(drop = True)

# EPS Predict

In [None]:
def MonthRevenue_group_data(group, ticker, date_name, eps_start, eps_end):
    """Return one company's monthly rows on a gap-free month-end timeline.

    Parameters
    ----------
    group : pandas GroupBy
        Monthly revenue data grouped by ticker.
    ticker :
        Key of the company to extract.
    date_name : str
        Name of the date column.
    eps_start, eps_end : timestamp-like
        Bounds of the company's quarterly timeline. The monthly timeline runs
        from ``eps_start`` to three month ends after ``eps_end``.

    Returns
    -------
    DataFrame with one row per month end in that range. A missing month is
    back-filled from the next available month; whatever is still missing
    afterwards (per the original note, months not yet published) is set to 0
    in every column, so that callers can treat 0 as "no figure".
    """
    comp = group.get_group(ticker).copy()
    comp[date_name] = pd.to_datetime(comp[date_name])

    # Complete month-end timeline. The offset object is what the "M" alias
    # stands for and is accepted by every pandas version.
    month_ends = pd.date_range(start = eps_start,
                               end = eps_end + pd.tseries.offsets.MonthEnd(3),
                               freq = pd.offsets.MonthEnd())

    # A right merge keeps every month end and leaves NaN where the company has
    # no row for that month.
    comp = comp.merge(pd.DataFrame({date_name: month_ends}),
                      on = date_name, how = "right")
    comp = comp.bfill().fillna(0)

    return comp

In [None]:
def Eps_preprocess(start_date, end_date):
    """Build the per-quarter feature table used by the EPS prediction models.

    Loads quarterly statements (``get_Eps_data``) and monthly revenue
    (``get_MonthRevenue_data``) and, for every ticker present in both, adds:

    * ``EPS_1_4Q`` ... ``EPS_17_20Q`` - EPS summed over consecutive 4-quarter
      blocks, newest first, rounded to 2 decimals.
    * ``EPS_next4Q`` - EPS summed over the following 4 quarters (used later to
      measure prediction error; NaN where the future is not yet known).
    * ``EPSG_5y``    - mean of the four year-over-year growth rates between
      those blocks.
    * ``index``      - position of the quarter's month in the company's
      monthly revenue timeline (helper column, kept in the output).
    * ``REV_G``      - growth of the revenue of the three months after the
      quarter versus the same months one year earlier. Months whose revenue is
      0 (treated as "no figure") are left out of both sums.

    Rows whose ``EPSG_5y`` or ``REV_G`` cannot be computed (NaN, infinite or
    zero prior-year revenue) are dropped. The result is ordered by ticker and
    has a fresh index; an empty, column-less frame is returned when no row
    survives.
    """
    data1 = get_Eps_data(start_date, end_date).drop(columns = ["QuarterlyRevenue"])
    data2 = get_MonthRevenue_data(start_date, end_date)

    group1 = data1.groupby("Ticker")
    group2 = data2.groupby("Ticker")
    # Sorted so that the row order of the result is reproducible between runs.
    tickers = sorted(set(data1["Ticker"]) & set(data2["Ticker"]))

    frames = []
    for ticker in tqdm(tickers):
        comp1, eps_start, eps_end = eps_group_data(group = group1, ticker = ticker,
                                                   date_name = "SheetDate")

        eps = comp1["Eps"]
        comp1["EPS_1_4Q"] = eps.rolling(4).sum().round(2)
        comp1["EPS_5_8Q"] = (eps.rolling(8).sum() - eps.rolling(4).sum()).round(2)
        comp1["EPS_9_12Q"] = (eps.rolling(12).sum() - eps.rolling(8).sum()).round(2)
        comp1["EPS_13_16Q"] = (eps.rolling(16).sum() - eps.rolling(12).sum()).round(2)
        comp1["EPS_17_20Q"] = (eps.rolling(20).sum() - eps.rolling(16).sum()).round(2)
        comp1["EPS_next4Q"] = eps.shift(-4).rolling(4).sum().round(2)

        comp1["EPSG_5y"] = ((comp1["EPS_1_4Q"] - comp1["EPS_5_8Q"]) / abs(comp1["EPS_5_8Q"]) +
                            (comp1["EPS_5_8Q"] - comp1["EPS_9_12Q"]) / abs(comp1["EPS_9_12Q"]) +
                            (comp1["EPS_9_12Q"] - comp1["EPS_13_16Q"]) / abs(comp1["EPS_13_16Q"]) +
                            (comp1["EPS_13_16Q"] - comp1["EPS_17_20Q"]) / abs(comp1["EPS_17_20Q"])) / 4

        # A zero denominator yields +/-inf; treat it like a missing value.
        comp1 = comp1.replace([float("inf"), float("-inf")], float("nan"))
        comp1 = comp1.dropna(subset = ["EPSG_5y"]).reset_index(drop = True)

        comp2 = MonthRevenue_group_data(group = group2, ticker = ticker, date_name = "SheetDate",
                                        eps_start = eps_start, eps_end = eps_end)
        # Expose the row position as an "index" column so that each quarter
        # can look up its neighbouring months by position.
        comp2 = comp2.reset_index()
        comp1 = comp1.merge(comp2[["SheetDate", "index"]], on = ["SheetDate"], how = "left")

        month_rev = comp2["MonthRevenue"].to_numpy()
        rev_growth = []
        for pos in comp1["index"]:
            first = int(pos) + 1  # first month after the quarter end
            revenue, last_revenue = 0, 0
            for step in range(3):
                current = month_rev[first + step]
                if current != 0:  # 0 marks a month without a figure
                    revenue = revenue + current
                    last_revenue = last_revenue + month_rev[first + step - 12]
            if last_revenue != 0:
                rev_growth.append((revenue - last_revenue) / abs(last_revenue))
            else:
                rev_growth.append(float("nan"))

        # Built as a float column from the start: writing floats into an
        # integer column cell by cell is rejected by recent pandas versions.
        comp1["REV_G"] = np.array(rev_growth, dtype = "float64")
        comp1 = comp1.dropna(subset = ["REV_G"])

        if len(comp1) != 0:
            frames.append(comp1)

    print("EPS預測資料前處理")
    if not frames:
        return pd.DataFrame()
    # Concatenate once instead of growing the result inside the loop.
    return pd.concat(frames, axis = 0).reset_index(drop = True)

In [None]:
def eps_filter(start_date, end_date, datachoose):
    """Select the EPS samples to model, optionally restricted by a screener.

    Parameters
    ----------
    start_date, end_date :
        Date window passed on to ``Eps_preprocess`` and the screeners.
    datachoose : int
        0 - all stocks; 1 - only (ticker, quarter) rows accepted by
        ``get_Strong_filter``; 2 - only rows accepted by ``get_Long_filter``.
        Any other value behaves like 0.

    Returns
    -------
    The preprocessed feature table without the GrossProfit, OPIncomeLoss and
    PreTaxIncome columns, inner-joined on (SheetDate, Ticker) with the chosen
    screener. If the screener accepts nothing, or there are no samples to
    begin with, an empty frame is returned instead of raising.
    """
    data = Eps_preprocess(start_date, end_date)
    # errors="ignore": Eps_preprocess returns a column-less frame when no
    # sample survives, in which case there is nothing to drop.
    data = data.drop(columns = ["GrossProfit", "OPIncomeLoss", "PreTaxIncome"],
                     errors = "ignore")

    screen = None
    if datachoose == 1:
        screen = get_Strong_filter(start_date, end_date)
    elif datachoose == 2:
        screen = get_Long_filter(start_date, end_date)

    if screen is not None:
        if screen.empty or data.empty:
            # The screeners return a column-less frame when nothing passes;
            # selecting the join keys from it would raise KeyError.
            data = data.iloc[0:0]
        else:
            data = data.merge(screen[["SheetDate", "Ticker"]],
                              on = ["SheetDate", "Ticker"], how = "inner")

    print("透過篩選器篩出要用的EPS樣本")
    return data

In [None]:
def rev_1(EPS4, EPSG, REV_G):
    """EPS prediction model 1 (per the original note: growth rate above 70 %).

    Scales the trailing four-quarter EPS ``EPS4`` by a weighted average of the
    EPS growth factor ``1 + EPSG`` (weight 2) and the revenue growth factor
    ``1 + REV_G`` (weight 1).
    """
    eps_part = (1 + EPSG) * 2
    revenue_part = 1 + REV_G
    return (EPS4 * (eps_part + revenue_part)) / 3

def rev_0(EPS4, EPSG, REV_G, w3):
    """EPS prediction model 2 (per the original note: growth rate not above 70 %).

    Scales the trailing four-quarter EPS ``EPS4`` by a weighted average of the
    EPS growth factor ``1 + EPSG`` (weight ``2 + w3``) and the revenue growth
    factor ``1 + REV_G`` (weight 1). With ``w3 = 0`` the weights are 2 and 1.
    """
    eps_weight = 2 + w3
    eps_part = (1 + EPSG) * eps_weight
    revenue_part = 1 + REV_G
    return (EPS4 * (eps_part + revenue_part)) / (3 + w3)

In [None]:
def eps_predict(w1 = 0.3, w2 = 0.7, w3 = 1, datachoose = 0):
    """Predict next-year EPS for every selected sample.

    Uses the notebook-level ``start_date`` / ``end_date`` window.

    Parameters
    ----------
    w1 : float
        Upper cap applied to the EPS growth rate ``EPSG_5y``.
    w2 : float
        Revenue-growth threshold: samples with ``REV_G >= w2`` are predicted
        with ``rev_1``, all others with ``rev_0``.
    w3 : number
        Extra EPS-growth weight passed to ``rev_0``.
    datachoose : int
        0 - all stocks, 1 - strong growth stocks, 2 - long-term growth stocks
        (see ``eps_filter``).

    Returns
    -------
    The sample table from ``eps_filter`` with ``EPSG_5y`` capped at ``w1`` and
    a new ``EPS_pred`` column.
    """
    data = eps_filter(start_date, end_date, datachoose)

    # Cap the EPS growth rate at w1 (values below the cap and NaN are kept).
    data["EPSG_5y"] = data["EPSG_5y"].clip(upper = w1)

    # Column-wise evaluation of both models; np.where then picks one per row.
    # This does not rely on the frame having a 0..n-1 index.
    high_revenue_growth = data["REV_G"] >= w2
    data["EPS_pred"] = np.where(
        high_revenue_growth,
        rev_1(EPS4 = data["EPS_1_4Q"], EPSG = data["EPSG_5y"], REV_G = data["REV_G"]),
        rev_0(EPS4 = data["EPS_1_4Q"], EPSG = data["EPSG_5y"], REV_G = data["REV_G"], w3 = w3),
    )

    print("EPS預測")
    return data

In [None]:
def eps_predict_add(w1 = 0.3, w2 = 0.7, w3 = 1, datachoose = 0):
    """Predict EPS and add the bonus-adjusted prediction ``EPS_pred_add``.

    Uses the notebook-level ``start_date`` / ``end_date`` window. Parameters
    are the same as for ``eps_predict``.

    Bonus rules, evaluated per ticker on a gap-filled quarterly series of the
    ratios from ``get_Ratio_data``:

    * gross margin of the quarter above its 12-quarter mean -> +5 %
    * ROE of the quarter above its 12-quarter mean          -> +2 %

    Quarters without a full 12-quarter history, and tickers without ratio
    data, get no bonus.

    Returns
    -------
    The table from ``eps_predict`` with columns ``ROE12`` and ``Gross12``
    (bonus fractions, 0 when not earned) and
    ``EPS_pred_add = EPS_pred * (1 + Gross12 + ROE12)``. Missing values in
    other columns (for example ``EPS_next4Q`` of the latest quarters) are left
    as they are.
    """
    data1 = eps_predict(w1, w2, w3, datachoose)
    data2 = get_Ratio_data(start_date, end_date)

    ratio_cols = ["GrossMargin", "ROE"]
    bonus_cols = ["ROE12", "Gross12"]

    # Complete (ticker, quarter end) grid so that every ticker has a gap-free
    # quarterly series before the rolling mean is taken.
    quarter_ends = pd.date_range(start = str(start_date), end = str(end_date),
                                 freq = pd.offsets.QuarterEnd(startingMonth = 12))
    tickers = sorted(set(data1["Ticker"]) & set(data2["Ticker"]))

    if tickers and len(quarter_ends) != 0:
        grid = pd.DataFrame(list(itertools.product(tickers, quarter_ends)),
                            columns = ["Ticker", "SheetDate"])
        data2 = grid.merge(data2, on = ["Ticker", "SheetDate"], how = "left")

        # Fill gaps within each ticker: next known value first, then previous.
        data2[ratio_cols] = data2.groupby("Ticker")[ratio_cols].transform(
            lambda s: s.bfill().ffill())

        # transform() returns values on data2's own index, so each mean stays
        # attached to the row (ticker and quarter) it was computed for.
        trailing_mean = data2.groupby("Ticker")[ratio_cols].transform(
            lambda s: s.rolling(12).mean())
        data2["Gross12"] = trailing_mean["GrossMargin"]
        data2["ROE12"] = trailing_mean["ROE"]

        # Drops the first 11 quarters of every ticker (no 12-quarter mean yet)
        # and any row that still lacks a ratio.
        data2 = data2.dropna().copy()
        data2["Gross12"] = (data2["GrossMargin"] > data2["Gross12"]).astype(float) * 0.05
        data2["ROE12"] = (data2["ROE"] > data2["ROE12"]).astype(float) * 0.02

        data1 = data1.merge(data2[["SheetDate", "Ticker"] + bonus_cols],
                            on = ["SheetDate", "Ticker"], how = "left")
    else:
        for col in bonus_cols:
            data1[col] = 0.0

    # Only the bonus columns default to 0; other NaNs keep their meaning.
    data1[bonus_cols] = data1[bonus_cols].fillna(0)

    data1["EPS_pred_add"] = data1["EPS_pred"] * (1 + data1["Gross12"] + data1["ROE12"])

    print("EPS預測之加分條件")
    return data1

In [None]:
def evaluate_erro(w1 = 0.3, w2 = 0.7, w3 = 1, datachoose = 0):
    """Score the plain and the bonus-adjusted EPS predictions.

    Parameters are forwarded to ``eps_predict_add``
    (datachoose: 0 - all stocks, 1 - strong growth, 2 - long-term growth).

    Adds two relative-error columns, ``error`` for ``EPS_pred`` and
    ``error_add`` for ``EPS_pred_add``, both measured against ``EPS_next4Q``.
    Rows whose error is undefined (a zero actual value gives inf or NaN) are
    removed before the aggregate errors are computed.

    Returns
    -------
    (data, mae, mse, mae_add, mse_add) - the scored rows, then mean absolute
    and mean squared error of the plain and of the adjusted prediction.
    """
    scored = eps_predict_add(w1 = w1, w2 = w2, w3 = w3, datachoose = datachoose)

    actual = scored["EPS_next4Q"]
    scored["error"] = abs((scored["EPS_pred"] - actual) / abs(actual))
    scored["error_add"] = abs((scored["EPS_pred_add"] - actual) / abs(actual))

    # Division by a zero actual value yields inf; treat it as missing.
    scored = scored.replace([float("inf"), float("-inf")], float("nan"))
    scored = scored.dropna(subset = ["error", "error_add"]).reset_index(drop = True)

    truth = scored["EPS_next4Q"]
    mae = mean_absolute_error(truth, scored["EPS_pred"])
    mse = mean_squared_error(truth, scored["EPS_pred"])
    mae_add = mean_absolute_error(truth, scored["EPS_pred_add"])
    mse_add = mean_squared_error(truth, scored["EPS_pred_add"])

    print("估算誤差")
    return scored, mae, mse, mae_add, mse_add

In [None]:
# Base sample for the weight search.
# datachoose: 0 => all stocks, 1 => strong growth stocks, 2 => long-term growth stocks
data_ = eps_filter(start_date, end_date, 0)

# Remove rows whose actual next-4-quarter EPS is 0 (the relative error would
# divide by zero), then rows with any missing value (per the original note,
# the last four quarters, or five between report publications, have no future
# value yet), and renumber the rows.
has_nonzero_target = data_["EPS_next4Q"] != 0
data_ = data_.loc[has_nonzero_target].dropna().reset_index(drop = True)

In [None]:
# Weight optimisation: random search over the model weights.
#
# Each of the 10000 samples draws
#   w1          - cap on the blended EPS growth rate,
#   w2          - revenue-growth threshold that selects rev_1 over rev_0,
#   w3          - extra EPS-growth weight used by rev_0,
#   wg1..wg4    - blend weights of the four yearly EPS growth rates (sum to 1),
# predicts EPS for every row of `data_` and records the error quartiles, MAE
# and MSE. The results are written to error_wg.csv.

# Fixed seed so that a re-run reproduces the same weight samples.
WEIGHT_SEARCH_SEED = 42
random.seed(WEIGHT_SEARCH_SEED)

# The yearly growth rates do not depend on the sampled weights: compute once.
growth_1y = (data_["EPS_1_4Q"] - data_["EPS_5_8Q"]) / abs(data_["EPS_5_8Q"])
growth_2y = (data_["EPS_5_8Q"] - data_["EPS_9_12Q"]) / abs(data_["EPS_9_12Q"])
growth_3y = (data_["EPS_9_12Q"] - data_["EPS_13_16Q"]) / abs(data_["EPS_13_16Q"])
growth_4y = (data_["EPS_13_16Q"] - data_["EPS_17_20Q"]) / abs(data_["EPS_17_20Q"])

Weights = []
Error_Q1, Error_Q2, Error_Q3 = [], [], []
MAE, MSE = [], []
# Running minima, so the progress line does not rescan the full history.
best_q1 = best_q2 = best_q3 = best_mae = best_mse = float("inf")
for i in range(10000):
    data = data_.copy()

    w1 = random.uniform(0.01, 1)
    w2 = random.uniform(0.01, 1)
    w3 = random.randint(2,8)

    # NOTE(review): when wg1 (or wg1 + wg2) is close to 1 the upper bound of
    # the next draw falls below 0.01, so wg3 / wg4 can come out negative.
    wg1 = random.uniform(0.01, 1)
    wg2 = random.uniform(0.01, 1 - wg1)
    wg3 = random.uniform(0.01, 1 - wg1 - wg2)
    wg4 = 1 - wg1 - wg2 - wg3

    # Blended EPS growth rate, capped at w1.
    data["EPSG_5y"] = (growth_1y * wg1 + growth_2y * wg2 +
                       growth_3y * wg3 + growth_4y * wg4).clip(upper = w1)

    # Rows with revenue growth of at least w2 use rev_1, the rest rev_0.
    data["EPS_pred"] = np.where(
        data["REV_G"] >= w2,
        rev_1(EPS4 = data["EPS_1_4Q"], EPSG = data["EPSG_5y"], REV_G = data["REV_G"]),
        rev_0(EPS4 = data["EPS_1_4Q"], EPSG = data["EPSG_5y"], REV_G = data["REV_G"], w3 = w3),
    )

    data["error"] = abs( (data["EPS_pred"] - data["EPS_next4Q"]) / abs(data["EPS_next4Q"]) )

    # A zero denominator yields inf; any missing value aborts the search.
    data = data.replace([float("inf"), float("-inf")], float("nan"))
    if data.isnull().sum().sum() != 0:
        print("has missing value")
        break

    data = data.sort_values(by = ["error"])
    error_Q1, error_Q2, error_Q3 = np.quantile(data["error"], [0.25, 0.5, 0.75])

    mae = mean_absolute_error(data["EPS_next4Q"], data["EPS_pred"])
    mse = mean_squared_error(data["EPS_next4Q"], data["EPS_pred"])

    weights = [round(w1, 5), round(w2, 5), round(w3, 5),
               round(wg1, 5), round(wg2, 5), round(wg3, 5), round(wg4, 5)]
    Weights.append(weights)
    Error_Q1.append(error_Q1)
    Error_Q2.append(error_Q2)
    Error_Q3.append(error_Q3)
    MAE.append(mae)
    MSE.append(mse)

    best_q1, best_q2, best_q3 = min(best_q1, error_Q1), min(best_q2, error_Q2), min(best_q3, error_Q3)
    best_mae, best_mse = min(best_mae, mae), min(best_mse, mse)
    print(i, weights)
    print(" ", "Error_Q1:", round(best_q1, 4), "Error_Q2:", round(best_q2, 4),
          "Error_Q3: ", round(best_q3, 4), "MAE: ", round(best_mae, 4),
          "MSE: ", round(best_mse, 4), "\n")

# One row per sampled weight set.
data1 = pd.DataFrame({
    "Weights": Weights,
    "Error_Q1": Error_Q1,
    "Error_Q2": Error_Q2,
    "Error_Q3": Error_Q3,
    "MAE": MAE,
    "MSE": MSE,
})
data1.to_csv("error_wg.csv", index = False)

In [None]:
# Best (lowest) value of every error metric over all sampled weight sets.
# The metrics live in the results table `data1` built by the weight search;
# `data` at this point is the working frame of the last sample and has no
# such columns.
for metric in ["MAE", "MSE", "Error_Q1", "Error_Q2", "Error_Q3"]:
    print(metric + ":", round(data1[metric].min(), 4))

# 收斂圖

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
%matplotlib qt

data = pd.read_csv("error_wg.csv")

sample = len(data)
plt.figure(figsize=(15, 4))
plt.subplot(1,2,1)
plt.title("MAE", fontsize=20)
plt.xlabel("sapmle")
plt.ylabel("MAE")
plt.yscale('log')
plt.plot(range(0, sample), data["MAE"].cummin())

plt.subplot(1,2,2)
plt.title("MSE", fontsize=20)
plt.xlabel("sapmle")
plt.ylabel("MSE")
plt.yscale('log')
plt.plot(range(0, sample), data["MSE"].cummin())
plt.show()

plt.figure(figsize=(30, 6))
plt.subplot(1,3,1)
plt.title("Error_Q1", fontsize=30)
plt.xlabel("sapmle", fontsize=20)
plt.ylabel("Error_Q1", fontsize=20)
plt.yscale('log')
plt.plot(range(0, sample), data["Error_Q1"].cummin())

plt.subplot(1,3,2)
plt.title("Error_Q2", fontsize=30)
plt.xlabel("sapmle", fontsize=20)
plt.ylabel("Error_Q2", fontsize=20)
plt.yscale('log')
plt.plot(range(0, sample), data["Error_Q2"].cummin())

plt.subplot(1,3,3)
plt.title("Error_Q3", fontsize=30)
plt.xlabel("sapmle", fontsize=20)
plt.ylabel("Error_Q3", fontsize=20)
plt.yscale('log')
plt.plot(range(0, sample), data["Error_Q3"].cummin())
plt.show()