In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

In [2]:
data = pd.read_csv('Cumulative_data.csv', low_memory=False)
outstanding_stocks = pd.read_csv('Outstanding_stocks.csv')

In [3]:
stock_list = outstanding_stocks[outstanding_stocks['上市別'] == 'TSE']
stock_list = stock_list.copy()
stock_list['股票代號'] = stock_list['股票代號'].apply(lambda x: str(x))

avaliable_date_set = list(data['Date'].unique())
avaliable_date_set.sort()

In [4]:
# 確保資料最少有60天
calculation_range = avaliable_date_set[60:]

In [5]:
collection = pd.DataFrame()
top_number = 30

for calculation_date in calculation_range:
    end_index = avaliable_date_set.index(calculation_date)
    target_date_set = set(avaliable_date_set[end_index - 59: end_index + 1])
    
    indicator_result = pd.DataFrame()
    # inner for loop
    for i in range(len(stock_list)):
        current_code = stock_list.iloc[i]['股票代號']
        current_company_name = stock_list.iloc[i]['股票名稱']

        current_data = data[data['股票代號'] == current_code]
        current_data = current_data[current_data.Date.isin(target_date_set)]
        current_outstanding_stocks = outstanding_stocks[
            outstanding_stocks['股票代號'] == int(current_code)
        ]['淨流通在外張數'].values[0]

        assert len(current_data) <= 61, '資料重覆請檢查，否則指標將失真！'

        if len(current_data) < 60:
            missing_date = set(target_date_set).difference(set(current_data.Date))
            supplementary_data = pd.DataFrame(
                0, index=range(len(missing_date)), columns=current_data.columns
            )
            supplementary_data['Date'] = missing_date
            current_data = pd.concat([current_data, supplementary_data])

        current_data.fillna(0, inplace=True)
        current_data.sort_values('Date', inplace=True)

        current_data["當日指標餘額"] = (
            current_data['外陸資買賣超股數(不含外資自營商)'] +
            current_data['投信買賣超股數'] -
            current_data['借券張數'] - 
            current_data['融資張數'] + 
            current_data['融券張數']
        )

        for num in [5, 10, 20, 60]:
            column_name = '近{}日指標比例'.format(num)
            current_data[column_name] = list(map(
                lambda x: round(x, 2),
                current_data['當日指標餘額'].rolling(num).sum() / current_outstanding_stocks * 100
            ))

        temp_result = current_data[[
            'Date', '股票代號',
            '外陸資買賣超股數(不含外資自營商)', '投信買賣超股數', '借券張數', '融資張數', '融券張數',
            '當日指標餘額', '近5日指標比例', '近10日指標比例', '近20日指標比例', '近60日指標比例'
        ]]

        temp_result = pd.DataFrame(temp_result.iloc[-1, ]).T
        temp_result['股票代號'] = current_code
        temp_result.insert(2, '股票名稱', current_company_name)
        indicator_result = pd.concat([indicator_result, temp_result])

    indicator_result = indicator_result.rename(columns={
        '外陸資買賣超股數(不含外資自營商)': '外陸資買賣超張數',
        '投信買賣超股數': '投信買賣超張數'
    })
    
    # 建立各排序方式(大致小)前30名名單
    sort_by_five = indicator_result.sort_values("近5日指標比例", ascending=False).iloc[
        :top_number, 
    ]
    sort_by_five.index = range(1, top_number+1)

    sort_by_twenty = indicator_result.sort_values("近20日指標比例", ascending = False).iloc[
        :top_number, 
    ]
    sort_by_twenty.index = range(1, top_number+1)

    sort_by_sixty = indicator_result.sort_values("近60日指標比例", ascending = False).iloc[
        :top_number, 
    ]
    sort_by_sixty.index = range(1, top_number+1)
    
    # 將符合條件者合併起來
    qualified_companies = pd.concat([sort_by_five, sort_by_twenty, sort_by_sixty])
    
    qualified_companies.drop_duplicates(['Date', '股票代號'], inplace=True)

    qualified_companies['Date'] = qualified_companies['Date'].apply(
        lambda x: datetime.strptime(str(x), '%Y%m%d')
    )
    qualified_companies.rename(columns={'Date': '日期'}, inplace=True)
    
    collection = pd.concat([collection, qualified_companies])
    
    print(calculation_date)

In [6]:
collection.to_csv('Exchange_indicator_collection.csv', index=False)