In [627]:
import pandas as pd
import os
import numpy as np
from datetime import datetime, timedelta
import datetime as dt
from pylab import mpl, plt
import plotly.graph_objects as go

In [628]:
group = pd.read_csv('group.csv')
stock = pd.read_csv('stock.csv')
df = pd.merge(stock,group,on='gid',how='inner')

In [629]:
group

Unnamed: 0,gid,gcname,gname
0,1,水泥工業類,Cement
1,2,食品工業類,Foods
2,3,塑膠工業類,Plastics
3,4,紡織纖維類,Textiles
4,5,電機機械類,Electric Machinery
5,6,電器電纜類,Electrical and Cable
6,8,玻璃陶瓷類,Glass and Ceramics
7,9,造紙工業類,Paper and Pulp
8,10,鋼鐵工業類,Iron and Steel
9,11,橡膠工業類,Rubber


In [632]:
stock.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 949 entries, 0 to 948
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   gid     949 non-null    int64 
 1   sid     949 non-null    int64 
 2   sname   949 non-null    object
dtypes: int64(2), object(1)
memory usage: 22.4+ KB


In [635]:
949 * 250 * 10 / 10 / 60 / 60

65.90277777777777

In [631]:
df[ df['gid'] ==1][0:20]

Unnamed: 0,gid,sid,sname,gcname,gname
0,1,1101,台泥,水泥工業類,Cement
1,1,1102,亞泥,水泥工業類,Cement
2,1,1103,嘉泥,水泥工業類,Cement
3,1,1104,環泥,水泥工業類,Cement
4,1,1108,幸福,水泥工業類,Cement
5,1,1109,信大,水泥工業類,Cement
6,1,1110,東泥,水泥工業類,Cement


In [642]:
dfs = []
header=1
footer=5
sid = "2330"

start_date = datetime(2015, 1, 1)
end_date = datetime(2025, 7, 1)

current_date = start_date
while current_date <= end_date:
    year=current_date.strftime("%Y")
    month=current_date.strftime("%m")
    filename = f"STOCK_DAY_{sid}_{year}{month}.csv"  # Formats 1 as '01', 2 as '02', ..., 12 as '12'
    folder_path = f'twse/STOCK_DAY/{sid}/{year}'
    file_path = os.path.join(folder_path, filename)
    try:
#        print(file_path)
        df = pd.read_csv(file_path,skiprows=header, skipfooter=footer,engine='python')
        df = df.dropna(axis=1, how='all')
        df['Date'] = pd.to_datetime(df['Date'], format='%Y/%m/%d')
        dfs.append(df)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except EmptyDataError:
        print(f"Empty Data Error: {file_path}")

    if current_date.month == 12:
        current_date = current_date.replace(year=current_date.year + 1, month=1)
    else:
        current_date = current_date.replace(month=current_date.month + 1)

# Optionally, concatenate all DataFrames
df = pd.concat(dfs, ignore_index=True)
df.set_index('Date', inplace=True)

In [643]:
df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 2559 entries, 2015-01-05 to 2025-07-31
Data columns (total 8 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   Trade Volume   2559 non-null   object
 1   Trade Value    2559 non-null   object
 2   Opening Price  2559 non-null   object
 3   Highest Price  2559 non-null   object
 4   Lowest Price   2559 non-null   object
 5   Closing Price  2559 non-null   object
 6   Change         2559 non-null   object
 7   Transaction    2559 non-null   object
dtypes: object(8)
memory usage: 179.9+ KB


In [648]:
df.index.strftime('%Y%m%d').to_series().to_csv('index_date.csv',index=False, header=False)

In [None]:
df.describe()

In [None]:
# Visual
plt.style.use('seaborn-v0_8')
mpl.rcParams['font.family'] = 'serif'

In [None]:
# Trading Strategy : SMA
data = (pd.DataFrame(df['Closing Price']).dropna())
SMA1 = 42
SMA2 = 252

#SMA1 = 60
#SMA2 = 270
symbol = 'Closing Price'
data['SMA1'] = data[symbol].rolling(SMA1).mean()
data['SMA2'] = data[symbol].rolling(SMA2).mean()

data.plot(figsize=(10, 6));

In [None]:
data['Position'] = np.where(data['SMA1'] > data['SMA2'], 1, -1)

In [None]:
ax = data.plot(secondary_y='Position', figsize=(10, 6))
#ax.get_legend().set_bbox_to_anchor((0.25, 0.85));

In [None]:
### Vectorized Backtesting

In [None]:
# Initialize list to store trades
trades = []

dp = data.reset_index()

# Iterate through the DataFrame to detect position changes
for i in range(1, len(data)):
    prev_pos = dp.loc[i-1, 'Position']
    curr_pos = dp.loc[i, 'Position']
    curr_val = dp.loc[i, 'Closing Price']
    
    if prev_pos == -1 and curr_pos == 1:
        trades.append({'date': dp.loc[i,'Date'], 'action': 'buy', 'price': curr_val})
    elif prev_pos == 1 and curr_pos == -1:
        trades.append({'date': dp.loc[i,'Date'], 'action': 'sell', 'price': curr_val})

# Calculate profit/loss
profit = 0
for i in range(0, len(trades)-1, 2):
    if trades[i]['action'] == 'buy' and trades[i+1]['action'] == 'sell':
        profit += trades[i+1]['price'] - trades[i]['price']

# Display trades and profit
trades_df = pd.DataFrame(trades)
print("Trade Actions:")
print(trades_df)
print(f"\nTotal Profit/Loss: {profit}")


In [None]:
data['Returns'] = np.log(data[symbol] / data[symbol].shift(1))  
data['Strategy'] = data['Position'].shift(1) * data['Returns']  
data.dropna(inplace=True)
#np.exp(data[['Returns', 'Strategy']].sum())  
#data[['Returns', 'Strategy']].std() * 252 ** 0.5  
ax = data[['Returns', 'Strategy']].cumsum(
        ).apply(np.exp).plot(figsize=(10, 6))
data['Position'].plot(ax=ax, secondary_y='Position', style='--')
ax.get_legend().set_bbox_to_anchor((0.25, 0.85));

In [None]:
### Optimization

In [None]:
from itertools import product
sma1 = range(20, 61, 4)  
sma2 = range(180, 281, 10)  

In [None]:
results = pd.DataFrame()
symbol = 'Closing Price'
for SMA1, SMA2 in product(sma1, sma2):  
    data = pd.DataFrame(df[symbol])
    data.dropna(inplace=True)
    data['Returns'] = np.log(data[symbol] / data[symbol].shift(1))
    data['SMA1'] = data[symbol].rolling(SMA1).mean()
    data['SMA2'] = data[symbol].rolling(SMA2).mean()
    data.dropna(inplace=True)
    data['Position'] = np.where(data['SMA1'] > data['SMA2'], 1, -1)
    data['Strategy'] = data['Position'].shift(1) * data['Returns']
    data.dropna(inplace=True)
    perf = np.exp(data[['Returns', 'Strategy']].sum())
    results = pd.concat((results, pd.DataFrame(
                {'SMA1': SMA1, 'SMA2': SMA2,
                 'MARKET': perf['Returns'],
                 'STRATEGY': perf['Strategy'],
                 'OUT': perf['Strategy'] - perf['Returns']},
                 index=[0])), ignore_index=True)  

In [None]:
results.sort_values('OUT', ascending=False).head(7)