In [1]:
import requests
import pandas as pd
import json
import numpy as np
from datetime import datetime, date
import csv
import time
import ssl
# SECURITY: replaces the standard library's default HTTPS context factory with one that
# performs no certificate verification, for everything in this kernel that relies on that default.
# NOTE(review): presumably a workaround for certificate problems on the exchange sites — confirm
# it is still needed, and prefer scoping it to the affected requests.
ssl._create_default_https_context = ssl._create_unverified_context
# Turns off pandas' chained-assignment (SettingWithCopy) warning globally; the price helpers
# in this notebook assign into column subsets of a DataFrame and would otherwise trigger it.
pd.options.mode.chained_assignment = None  # default='warn'

In [2]:
def get_twse_stock_db_info():
    """Download the TWSE per-stock valuation table.

    Reads the exchange's BWIBBU_ALL open-data CSV (decoded as UTF-8 with BOM)
    and returns it as a DataFrame without further processing.
    """
    return pd.read_csv(
        'http://www.twse.com.tw/exchangeReport/BWIBBU_ALL?response=open_data',
        encoding='utf_8_sig',
    )
    
def get_twse_stock_info(df, stock):
    """Look up one stock's name and valuation ratios in the TWSE table.

    Args:
        df: DataFrame with the columns '股票代號', '股票名稱', '本益比',
            '殖利率(%)' and '股價淨值比'.
        stock: Stock code; converted with ``int()`` before it is matched
            against '股票代號'.

    Returns:
        Tuple ``(name, priceEarningRatio, yieldRatio, priceBookRatio)`` taken
        from the first matching row.

    Raises:
        ValueError: ``stock`` cannot be converted to an integer.
        IndexError: no row of ``df`` matches ``stock``.
    """
    # Select the matching row once instead of re-indexing the filtered frame per field.
    row = df[df["股票代號"] == int(stock)].iloc[0]
    return row['股票名稱'], row['本益比'], row['殖利率(%)'], row['股價淨值比']

def get_otc_stock_db_info():
    """Download the TPEx (OTC) P/E-ratio analysis table.

    Fetches the JSON payload from the TPEx open API and turns its records into
    a DataFrame, one row per record.
    """
    response = requests.get('http://www.tpex.org.tw/openapi/v1/tpex_mainboard_peratio_analysis')
    return pd.DataFrame.from_records(response.json())
    
def get_otc_stock_info(df, stock):
    """Look up one stock's name and valuation ratios in the OTC table.

    Args:
        df: DataFrame with the columns 'SecuritiesCompanyCode', 'CompanyName',
            'PriceEarningRatio', 'YieldRatio' and 'PriceBookRatio'.
        stock: Stock code. Both the column and this value are compared as
            strings, so ``3680`` and ``"3680"`` select the same row.

    Returns:
        Tuple ``(name, priceEarningRatio, yieldRatio, priceBookRatio)`` taken
        from the first matching row, with the values exactly as stored in ``df``.

    Raises:
        IndexError: no row of ``df`` matches ``stock``.
    """
    codes = df['SecuritiesCompanyCode'].astype(str)
    row = df[codes == str(stock)].iloc[0]
    return row['CompanyName'], row['PriceEarningRatio'], row['YieldRatio'], row['PriceBookRatio']

def get_time_input(yy, mm, dd, days = 100):
    """Build the list of calendar months to download, ending at month ``mm`` of ``yy``.

    Args:
        yy: Year of the last day of interest.
        mm: Month (1-12) of the last day of interest.
        dd: Day of the last day of interest; only echoed back in the result.
        days: Requested span; ``days // 20`` months are produced.

    Returns:
        ``(time_list, [yy, mm, dd])`` where ``time_list`` holds ``[year, month]``
        pairs in chronological order, the last one being ``[yy, mm]``.
    """
    month_num = days // 20
    # Number the months on one linear axis so stepping back can cross any number
    # of year boundaries; the previous arithmetic handled only a single rollover.
    last_month_index = yy * 12 + (mm - 1)
    time_list = []
    for months_back in range(month_num - 1, -1, -1):
        year, month_zero_based = divmod(last_month_index - months_back, 12)
        time_list.append([year, month_zero_based + 1])
    return time_list, [yy, mm, dd]

In [3]:
def string_with_comma_to_int(x):
    """Parse an integer written with comma separators, e.g. ``"1,234"`` -> ``1234``."""
    without_commas = "".join(x.split(","))
    return int(without_commas)

def string_with_comma_to_float(x):
    """Parse a float written with comma separators; return 0 when it cannot be parsed.

    Args:
        x: Text such as ``"1,234.5"``.

    Returns:
        The parsed float, or ``0`` when ``x`` has no usable ``replace`` method
        or its text is not a number (e.g. ``"--"``).
    """
    try:
        return float(x.replace(",", ""))
    except (AttributeError, TypeError, ValueError):
        # Only conversion failures fall back to 0; KeyboardInterrupt and
        # SystemExit are no longer swallowed as they were by a bare except.
        return 0
    
def string_to_float(x):
    """Convert ``x`` to float; return 0 when the conversion fails.

    Args:
        x: Any value accepted by ``float()``.

    Returns:
        ``float(x)``, or ``0`` when ``float()`` rejects the value.
    """
    try:
        return float(x)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures fall back to 0; KeyboardInterrupt and
        # SystemExit are no longer swallowed as they were by a bare except.
        return 0
    
def vol_for_twse(x):
    """Convert a traded-share count into lots of 1000 shares, rounded to an integer.

    Args:
        x: Share count, either as text with comma separators (``"1,234,567"``)
            or as a number. Numeric input used to be reported as 0 because it
            has no ``replace`` method.

    Returns:
        ``round(shares / 1000)``, or ``0`` when ``x`` cannot be interpreted as
        a finite number.
    """
    try:
        if isinstance(x, str):
            x = x.replace(",", "")
        return round(float(x) / 1000)
    except (TypeError, ValueError, OverflowError):
        # float() rejects non-numeric values; round() raises ValueError for NaN
        # and OverflowError for infinity.
        return 0

def moving_average(x, w):
    """Simple moving average of ``x`` over a window of ``w`` samples.

    Args:
        x: One-dimensional sequence of numbers.
        w: Window length.

    Returns:
        Array of ``len(x) - w + 1`` window means. When the window is longer
        than the data the result is an empty array, so indexing it fails
        instead of yielding a meaningless value.
    """
    if w > len(x):
        # np.convolve swaps its operands when the kernel is the longer one, so
        # "valid" mode would still return values that are not window means.
        return np.empty(0)
    return np.convolve(x, np.ones(w), "valid") / w

def get_stock_volumn_price(yy, mm, dd, stock_tag, last_date):
    """Download one month of daily volume and closing price for a TWSE stock.

    Args:
        yy: Year used in the query date.
        mm: Month used in the query date; zero-padded to two digits.
        dd: Unused; the query date always uses day "01".
        stock_tag: Stock code placed in the ``stockNo`` query parameter.
        last_date: Unused; kept so the signature matches the OTC variant.

    Returns:
        DataFrame with the columns '日期', '成交張數' (traded shares / 1000) and
        '收盤價' (closing price as float), or ``None`` when the CSV cannot be
        downloaded or parsed.

    Raises:
        KeyError: the downloaded CSV lacks one of the expected columns.
    """
    date_tag = str(yy) + str(mm).zfill(2) + "01"
    url = 'http://www.twse.com.tw/exchangeReport/STOCK_DAY?response=open_data&date=%s&stockNo=%s'%(date_tag, stock_tag)
    try:
        df = pd.read_csv(url, encoding='utf_8_sig')
    except Exception:
        # Download and parse failures mean "no data for this month"; unlike a
        # bare except this lets KeyboardInterrupt stop a long scan.
        return None
    # Full column list of the source:
    # ["日期","成交股數","成交金額","開盤價","最高價","最低價","收盤價","漲跌價差","成交筆數"]
    # Build a new frame rather than assigning into a slice of df.
    return pd.DataFrame({
        '日期': df['日期'],
        '成交張數': df['成交股數'].apply(vol_for_twse),  # lots = shares / 1000
        '收盤價': df['收盤價'].apply(string_to_float),
    })

def get_otc_stock_volumn_price(yy, mm, dd, stock_tag, last_date):
    """Download one month of daily volume and closing price for an OTC (TPEx) stock.

    Args:
        yy: Gregorian year; 1911 is subtracted before it is put in the query.
        mm: Month placed in the query date as given.
        dd: Day placed in the query date as given.
        stock_tag: Stock code placed in the ``stkno`` query parameter.
        last_date: Unused; kept so the signature matches the TWSE variant.

    Returns:
        DataFrame with the columns '日期', '成交張數' and '收盤價' (both numeric
        columns parsed to float), or ``None`` when the request fails or the
        response does not have the expected shape. This mirrors
        ``get_stock_volumn_price``, which also reports failure as ``None``.
    """
    roc_year = yy - 1911
    url = 'http://www.tpex.org.tw/web/stock/aftertrading/daily_trading_info/st43_result.php?d=%s/%s/%s&stkno=%s'%(roc_year, mm, dd, stock_tag)
    columns = ['日期', '成交張數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '成交筆數']
    try:
        json_data = requests.get(url).json()
        df = pd.DataFrame(json_data['aaData'], columns=columns)
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Network error, non-JSON body, missing 'aaData', or rows that do not
        # fit the expected columns: treat the month as unavailable.
        return None
    # Build a new frame rather than assigning into a slice of df.
    return pd.DataFrame({
        '日期': df['日期'],
        '成交張數': df['成交張數'].apply(string_with_comma_to_float),
        '收盤價': df['收盤價'].apply(string_with_comma_to_float),
    })

In [4]:
def _roc_date_to_tuple(raw):
    """Turn a 'yyy/mm/dd' date string into a (year + 1911, month, day) tuple, or None if it cannot be parsed."""
    try:
        yy, mm, dd = raw.replace('*', "").split('/')[:3]
        return int(yy) + 1911, int(mm), int(dd)
    except (AttributeError, ValueError):
        return None


def get_time_duration_stock_info(time_data, stock_tag, min_volumn = 150, ma_num = 5, isOtc = False):
    """Collect several months of daily volume/price data for one stock.

    Args:
        time_data: ``[year, month, day]`` of the last day to include.
        stock_tag: Stock code passed to the monthly download helpers.
        min_volumn: Unused by the current implementation.
        ma_num: Unused by the current implementation.
        isOtc: Use the OTC download helper when true, the TWSE one otherwise.

    Returns:
        ``(vol_data, pri_data, df_all)``: the volume column and the price
        column as arrays, plus the combined DataFrame they were taken from.
        ``(None, None, None)`` when no rows are available. The empty case used
        to return only two items, which broke callers unpacking three.
    """
    time_list, last_date = get_time_input(time_data[0], time_data[1], time_data[2])
    fetch_month = get_otc_stock_volumn_price if isOtc else get_stock_volumn_price

    # Gather the monthly frames first and concatenate once.
    monthly_frames = []
    for year, month in time_list:
        df = fetch_month(year, month, "01", stock_tag, last_date)
        # Pause after every request, including failed ones, before the next download.
        time.sleep(0.15)
        if df is not None:
            monthly_frames.append(df)
    if not monthly_frames:
        return None, None, None
    df_all = pd.concat(monthly_frames, axis=0, ignore_index=True)

    # Drop rows dated after the requested last day. Dates are compared as whole
    # (year, month, day) tuples; rows whose date cannot be parsed are kept.
    cutoff = tuple(last_date)
    keep = []
    for raw_date in df_all["日期"]:
        parsed = _roc_date_to_tuple(raw_date)
        keep.append(parsed is None or parsed <= cutoff)
    df_all = df_all.loc[np.array(keep, dtype=bool)].reset_index(drop=True)

    if len(df_all) == 0:
        return None, None, None

    df_np = df_all.to_numpy().copy()
    vol_data = df_np[:, 1]
    pri_data = df_np[:, 2]
    return vol_data, pri_data, df_all

def calculate_target(vol_data, pri_data):
    """Screen for a recent volume surge with the price at or above its moving averages.

    Args:
        vol_data: Daily volumes, oldest first.
        pri_data: Daily closing prices, oldest first.

    Returns:
        Truthy when all of the following hold, falsy otherwise:
        - volume: a blend of median, mean and latest value over the last 7
          samples is more than 4.3 times the median/mean blend of samples
          ``[-28:-7]``, and is itself above 800;
        - position: the latest price is at least 0.95 / 0.98 / 1.00 / 1.00
          times the 5 / 10 / 20 / 60-sample moving average;
        - spacing: 5MA/10MA lies in (0.94, 1.08] and 10MA/20MA in (0.95, 1.07].
        ``False`` is also returned when fewer than 60 samples are available,
        when the reference window averages to zero, or when the calculation
        raises.
    """
    # The 60-sample moving average is only meaningful with at least 60 samples.
    data_min_num = 60
    if len(vol_data) < data_min_num or len(pri_data) < data_min_num:
        return False
    aa = -7   # start of the "recent" window
    bb = -28  # start of the reference window, which ends where the recent one begins
    if vol_data[bb:aa].mean() == 0:
        return False
    if pri_data[bb:aa].mean() == 0:
        return False
    try:
        vol_med_factor = 0.5  # weight of the median against the mean
        vol_cur_factor = 0.5  # weight of the latest sample against the window blend
        recent_vol = vol_data[aa:]
        reference_vol = vol_data[bb:aa]
        vol_short = ((1 - vol_cur_factor) * (vol_med_factor * np.median(recent_vol) + (1 - vol_med_factor) * np.mean(recent_vol)) + vol_cur_factor * vol_data[-1])
        vol_long = (vol_med_factor * np.median(reference_vol) + (1 - vol_med_factor) * np.mean(reference_vol))
        vol_valid = vol_short / vol_long > 4.3
        vol_valid = vol_valid and (vol_short > 800)

        pri_5ma = moving_average(pri_data, 5)
        pri_10ma = moving_average(pri_data, 10)
        pri_20ma = moving_average(pri_data, 20)
        pri_60ma = moving_average(pri_data, 60)
        ma_pos_valid = (pri_data[-1] / pri_5ma[-1] >= 0.95) and (pri_data[-1] / pri_10ma[-1] >= 0.98) and (pri_data[-1] / pri_20ma[-1] >= 1.00) and (pri_data[-1] / pri_60ma[-1] >= 1.00)
        ma_close_valid = (pri_5ma[-1]) / (pri_10ma[-1]) <= 1.08 and (pri_5ma[-1]) / (pri_10ma[-1]) > 0.94
        ma_close_valid = ma_close_valid and (pri_10ma[-1]) / (pri_20ma[-1]) <= 1.07 and (pri_10ma[-1]) / (pri_20ma[-1]) > 0.95
    except Exception:
        # A failed calculation means "not a candidate"; unlike a bare except
        # this still lets KeyboardInterrupt stop a long scan.
        return False
    return vol_valid and ma_pos_valid and ma_close_valid


def calculate_target_trend(vol_data, pri_data):
    """Screen for a price consolidating near its short moving averages in an up-trend.

    Args:
        vol_data: Daily volumes, oldest first.
        pri_data: Daily closing prices, oldest first.

    Returns:
        Truthy when the volume, position and trend checks in the body all
        pass, falsy otherwise. ``False`` is also returned when fewer than 70
        samples are available, when the reference window ``[-28:-7]`` averages
        to zero, or when the calculation raises.
    """
    data_min_num = 70
    if len(vol_data) < data_min_num or len(pri_data) < data_min_num:
        return False
    aa = -7
    bb = -28
    if vol_data[bb:aa].mean() == 0:
        return False
    if pri_data[bb:aa].mean() == 0:
        return False

    pos_valid = True
    vol_valid = True
    trend_valid = True

    try:
        # Peak volume over samples [-7:-1]; the latest sample is not included.
        # NOTE(review): confirm that leaving out the latest sample is intended.
        vol_valid = vol_valid and np.max(vol_data[aa:-1]) > 600

        pri_cur = pri_data[-1]
        pri_5ma = moving_average(pri_data, 5)
        pri_10ma = moving_average(pri_data, 10)
        pri_20ma = moving_average(pri_data, 20)
        pri_60ma = moving_average(pri_data, 60)

        # Position: the latest price and the averages must sit in narrow bands
        # relative to each other.
        pos_valid = pos_valid and 0.97 <= pri_cur / pri_5ma[-1] <= 1.04
        pos_valid = pos_valid and 0.97 <= pri_cur / pri_10ma[-1] <= 1.04
        pos_valid = pos_valid and 0.975 <= pri_5ma[-1] / pri_10ma[-1] <= 1.04
        pos_valid = pos_valid and 0.98 <= pri_10ma[-1] / pri_20ma[-1] <= 1.06
        pos_valid = pos_valid and 1 <= pri_cur / pri_20ma[-1] <= 1.07
        pos_valid = pos_valid and pri_10ma[-1] / pri_60ma[-1] >= 1.04

        # Trend: the 20MA must not have fallen more than 1.5% against its value
        # 7 samples earlier, and the 60MA must be at least 3.5% above its value
        # 9 samples earlier.
        trend_valid = trend_valid and pri_20ma[-1] / pri_20ma[-8] >= 0.985
        trend_valid = trend_valid and pri_60ma[-1] / pri_60ma[-10] >= 1.035
    except Exception:
        # A failed calculation means "not a candidate"; unlike a bare except
        # this still lets KeyboardInterrupt stop a long scan.
        return False
    return vol_valid and pos_valid and trend_valid

In [5]:
def analyze_stock(time_data, isOtc):
    """Run the trend screen over every stock listed in the market's CSV file.

    Args:
        time_data: ``[year, month, day]`` of the last day to include.
        isOtc: Screen the OTC market (codes from ``.\\db\\otc.csv``) when true,
            the TWSE market (codes from ``.\\db\\twse.csv``) otherwise.

    Returns:
        List of ``[stock, name, priceEarningRatio, yieldRatio, priceBookRatio,
        latest_price, market_flag]`` rows, one per stock that passes
        ``calculate_target_trend``; ``market_flag`` is 1 for OTC and 0 for TWSE.
        Each row is also printed as soon as it is found. A passing stock with
        no usable row in the valuation table is reported with "N/A"
        placeholders instead of aborting the scan.
    """
    if isOtc:
        df = get_otc_stock_db_info()
        data_path = r".\db\otc.csv"
        lookup_info = get_otc_stock_info
        market_flag = 1
    else:
        df = get_twse_stock_db_info()
        data_path = r".\db\twse.csv"
        lookup_info = get_twse_stock_info
        market_flag = 0

    # The first column of each row is the stock code; blank lines are skipped.
    with open(data_path, newline='', encoding='utf_8_sig') as csvfile:
        stock_list = [line[0] for line in csv.reader(csvfile) if line]

    target_list = []
    for stock in stock_list:
        result = get_time_duration_stock_info(time_data, stock, min_volumn=150, ma_num=1, isOtc=isOtc)
        # Index instead of unpacking so this works whether the helper returns
        # two or three items.
        vol_data, pri_data = result[0], result[1]
        if vol_data is None or pri_data is None:
            continue
        if not calculate_target_trend(vol_data, pri_data):
            continue
        try:
            name, priceEarningRatio, yieldRatio, priceBookRatio = lookup_info(df, stock)
        except (IndexError, ValueError):
            # No matching row, or a code the lookup cannot convert.
            name = priceEarningRatio = yieldRatio = priceBookRatio = "N/A"
        target_list.append([stock, name, priceEarningRatio, yieldRatio, priceBookRatio, pri_data[-1], market_flag])
        print(stock, name, priceEarningRatio, yieldRatio, priceBookRatio, pri_data[-1], market_flag)
    return target_list

In [6]:
# Last day to include in the screen, as [year, month, day]; rows dated after it are dropped.
time_data = [2024, 4, 15]

In [7]:
# Screen the TWSE-listed stocks (isOtc=False); matches are printed as they are found.
# Each stock triggers one download per month with a short pause in between, so this cell is slow.
target_list = analyze_stock(time_data, isOtc = False)

1319 東陽 25.0 3.14 2.96 127.5 0
1535 中宇 17.13 4.28 2.61 77.1 0
1615 大山 18.82 4.14 3.35 60.4 0
1773 勝一 29.87 1.7 5.41 188.5 0
2330 台積電 24.93 1.61 6.04 806.0 0
2404 漢唐 16.76 5.05 6.43 416.0 0
2467 志聖 39.72 2.32 6.26 129.5 0
2535 達欣工 12.56 5.23 1.65 57.4 0
2539 櫻花建 33.27 2.7 5.39 74.2 0
2850 新產 9.76 5.54 1.62 90.0 0
3356 奇偶 17.31 5.23 2.46 55.4 0
3413 京鼎 14.93 3.93 2.56 305.0 0
3563 牧德 54.64 1.75 4.57 400.5 0
3622 洋華 17.6 3.03 1.52 66.0 0
3708 上緯投控 10.05 3.88 1.71 129.0 0
4915 致伸 15.88 4.69 2.32 85.3 0
5234 達興材料 30.49 2.64 5.1 155.5 0
5519 隆大 10.75 5.55 1.61 39.65 0
6176 瑞儀 15.55 5.67 2.39 176.5 0
6189 豐藝 22.71 3.86 3.57 90.6 0
6414 樺漢 19.59 3.4 2.02 335.5 0
6442 光聖 53.95 1.54 4.41 136.5 0
6605 帝寶 15.36 2.96 2.14 219.5 0
6806 森崴能源 50.34 1.01 2.87 148.0 0
6869 雲豹能源-創 16.06 4.36 4.17 140.0 0
9911 櫻花 17.92 4.42 3.21 87.8 0


In [8]:
# Screen the OTC-listed stocks (isOtc=True); matches are printed as they are found.
# Each stock triggers one download per month with a short pause in between, so this cell is slow.
target_list_otc = analyze_stock(time_data, isOtc = True)

1785 光洋科 26.64 3.08 2.25 48.75 1
3207 耀勝 78.29 0.66 6.00 137.0 1
3232 昱捷 130.00 1.54 2.17 32.5 1
3289 宜特 25.15 2.88 2.93 129.5 1
3663 鑫科 88.61 1.25 2.94 47.85 1
3680 家登 45.85 1.60 5.45 469.5 1
5383 金利 N/A 0.00 6.23 39.8 1
5432 新門 119.15 0.60 9.00 168.0 1
5493 三聯 13.62 3.49 2.24 88.8 1
6021 美好證 52.43 3.27 0.99 18.35 1
6208 日揚 20.70 0.00 2.53 62.5 1
6219 富旺 18.96 0.00 3.06 38.3 1
6274 台燿 59.67 2.20 4.26 182.0 1
8027 鈦昇 301.88 0.52 4.01 96.6 1
8064 東捷 30.31 2.41 1.62 29.1 1
8091 翔名 22.66 4.43 2.10 135.5 1
8936 國統 28.58 1.82 3.60 82.3 1


In [9]:
# Append the screen results for time_data to a per-day CSV file.
# NOTE(review): the output directory is a hard-coded absolute Windows path; consider
# moving it to a configurable constant.
result_path = r"D:\Stock\daily_result\%s%s%s_trend.csv"%(time_data[0], str(time_data[1]).zfill(2), str(time_data[2]).zfill(2))
# The rows contain Chinese company names, so the encoding is stated explicitly (the same
# codec this notebook uses when reading its CSV files) instead of relying on the locale default.
# Mode 'a' is kept: re-running the cell appends another date/header/result section.
with open(result_path, 'a', newline='', encoding='utf_8_sig') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(time_data)
    writer.writerow(['ID', 'Name', 'PE', 'Yield', 'PB', 'Price', 'isOTC'])
    writer.writerows(target_list)
    writer.writerows(target_list_otc)

In [18]:
# target_list = []
# time_data = [2024, 3, 8]

# vol_data, pri_data, df = get_time_duration_stock_info(time_data, 3402, min_volumn=150, ma_num=1, isOtc=True)
# vol_data, pri_data
# # valid = calculate_target(vol_data, pri_data)
# valid = calculate_target_trend(vol_data, pri_data)