In [1]:

import numpy as np
import pandas as pd
from deap import base, creator, tools
from functools import reduce
import finlab
import random
import os
import pickle
from tqdm import tqdm
from finlab import data
from finlab.backtest import sim
from finlab.dataframe import FinlabDataFrame
from finlab.portfolio import Portfolio

#這裡要替換成自己的FinLab VIP token
token = ''
finlab.login(token)

Running locally, skipping Colab-specific code.
輸入成功!


## 準備因子(Feature)
### dataset:存放你的基因,多個基因會組合成一個染色體,我們稱染色體為基因的載體
- 基因的dataframe的column可能不同,ex:營收類的資料column就是從1101開始;技術面的會從0015開始,會包含etf

In [None]:
# 技術面
close = data.get("etl:adj_close")  # 還原收盤價
close = close.ffill()  # 等同於 method='pad'
close_pct_change = close.pct_change()
漲停 = round(close_pct_change * 100, 2) > 9
成交金額 = data.get("price:成交金額")
成交股數 = data.get("price:成交股數")

ma5 = close.average(5)
ma10 = close.average(10)
ma15 = close.average(15)
ma20 = close.average(20)
ma60 = close.average(60)
月季線差 = ma20 / ma60

ema5 = data.indicator("EMA", adjust_price=True, resample="D", timeperiod=5)
ema10 = data.indicator("EMA", adjust_price=True, resample="D", timeperiod=10)
ema15 = data.indicator("EMA", adjust_price=True, resample="D", timeperiod=15)
ema20 = data.indicator("EMA", adjust_price=True, resample="D", timeperiod=20)
ema30 = data.indicator("EMA", adjust_price=True, resample="D", timeperiod=30)
ema60 = data.indicator("EMA", adjust_price=True, resample="D", timeperiod=60)
ema月季線差 = ema20 / ema60


def wma(price, n):
    return price.ewm(com=n).mean()


def zlma(price, n):
    lag = (n - 1) // 2
    series = 2 * price - price.shift(lag)
    return wma(series, n)


bias_sma_250 = close / close.rolling(250).mean() - 1
bias_zlma_120 = close / close.apply(lambda s: zlma(s, 120)) - 1
bias_wma_60 = close / close.apply(lambda s: wma(s, 60)) - 1
slope1_sma_60 = close.rolling(60).mean().pipe(lambda df: df / df.shift(60) - 1)
kurtosis_250 = close.pct_change().rolling(250).kurt()
kurtosis_120 = close.pct_change().rolling(120).kurt()


# 基本面
市值 = data.get("etl:market_value")
股本 = data.get("financial_statement:股本")
vol_stock = data.get("price:成交股數")
vol = data.get("price:成交金額")
vol_ma20 = vol.average(20)
vol_stock_ma20 = vol_stock.average(20)
rev = data.get("monthly_revenue:當月營收")
rev_year_growth = data.get("monthly_revenue:去年同月增減(%)")
rev_month_growth = data.get("monthly_revenue:上月比較增減(%)")
rev_3ma = rev.average(3)  # 近3月平均營收
rev_4ma = rev.average(5)  # 近5月平均營收
rev_2ma = rev.average(2)
每股盈餘 = data.get("financial_statement:每股盈餘")

df1 = data.get("financial_statement:投資活動之淨現金流入_流出")
df2 = data.get("financial_statement:營業活動之淨現金流入_流出")
自由現金流 = (df1 + df2).rolling(4).mean()
rd_ratio = data.get("fundamental_features:研究發展費用率")
pm_ratio = data.get("fundamental_features:管理費用率")
eq_ratio = data.get("fundamental_features:淨值除資產").deadline()
rd_pm = rd_ratio / pm_ratio
eq_price = eq_ratio / close.reindex(eq_ratio.index, method="ffill")
rebalance = eq_price.index

稅後淨利 = data.get("fundamental_features:經常稅後淨利")
權益總計 = data.get("financial_statement:股東權益總額")
股東權益報酬率 = 稅後淨利 / 權益總計

營業利益成長率 = data.get("fundamental_features:營業利益成長率")
當月營收 = data.get("monthly_revenue:當月營收") * 1000
當季營收 = 當月營收.rolling(4).sum()
市值營收比 = 市值 / 當季營收
融資使用率 = data.get("margin_transactions:融資使用率")


# 大盤
收盤指數 = data.get("market_transaction_info:收盤指數")
收盤指數季線 = 收盤指數.average(60)
收盤指數年線 = 收盤指數.average(240)
收盤指數半年線 = 收盤指數.average(120)
收盤指數月線 = 收盤指數.average(20)
收盤指數週季線 = 收盤指數.average(300)
TAIEX_週季線下 = 收盤指數["TAIEX"] < 收盤指數週季線["TAIEX"]

全市場前50趴 = round(1830 * 0.5)
全市場前30趴 = round(1830 * 0.3)
成交量成長 = (成交股數.pct_change() * 100).fill_method = None
# dataset: 放你的基因
dataset = {
    "價格動量5日E均線": ((close - ema5) / ema5).is_largest(300),
    "價格動量10日E均線": ((close - ema10) / ema10) >= 0.05,
    "價格動量15日E均線": ((close - ema15) / ema15) >= 0.05,
    "價格動量20日E均線": ((close - ema20) / ema20) >= 0.05,
    "價格動量30日E均線": ((close - ema30) / ema30) >= 0.05,
    "價格動量60日E均線": ((close - ema60) / ema60) >= 0.05,
    "價格動量20日E均線市場前30%": ((close - ema20) / ema20).is_largest(全市場前30趴),
    "價格動量20日均線市場後30%": ((close - ema20) / ema20).is_smallest(全市場前30趴),
    "月季線ema差": ema月季線差 < 1.2,
    "月營收年增率30": rev_year_growth > 30,
    "月營收年增率40": rev_year_growth > 40,
    "月營收年增率50": rev_year_growth > 50,
    "月營收年增率60": rev_year_growth > 60,
    "月營收月增率100": rev_month_growth > 100,
    "月營收月增前50%": rev_year_growth.is_largest(全市場前50趴),
    "月營收月增前30%": rev_year_growth.is_largest(全市場前30趴),
    "市值小於100億": (市值 < 1e10),
    "股東權益報酬正數": 股東權益報酬率 > 0,
    "股東權益報酬前50%": 股東權益報酬率.is_largest(全市場前50趴),
    "股東權益報酬正數5趴": 股東權益報酬率 > 5,
    "股東權益報酬正數10趴": 股東權益報酬率 > 10,
    "股東權益報酬負數": 股東權益報酬率 < 0,
    "二十日均高於六十日E均": (ema20 >= ema60),
    "二十日均高於六十日E均差20趴": (ema20 >= ema60) & (ema20 * 1.2 <= ema60),
    "二十日均高於六十日E均差": (ema20 / ema60).is_smallest(100),
    "十五日均高於六十日E均": (ema15 >= ema60),
    "十日均高於六十日E均": (ema10 >= ema60),
    "十日均高於三十日E均": (ema10 >= ema30),
    "五日均高於六十日E均": ema5 >= ema60,
    "五日均高於二十日E均": ema5 >= ema20,
    "五日均高於三十日E均": ema5 >= ema30,
    "均線條件_5": (close > close.average(5)),
    "均線條件_10": (close > close.average(10)),
    "均線條件_15": (close > close.average(15)),
    "均線條件_20": (close > close.average(20)),
    "均線條件_60": (close > close.average(60)),
    "均線條件_120": (close > close.average(120)),
    "均線條件_240": (close > close.average(240)),
    "近3月平均營收創12月新高": rev_3ma == rev_3ma.rolling(12, min_periods=6).max(),
    "近3月平均營收創24月新高": rev_3ma == rev_3ma.rolling(24, min_periods=12).max(),
    "近3月平均營收創36月新高": rev_3ma == rev_3ma.rolling(36, min_periods=12).max(),
    "近2月平均營收創12月新高": rev_2ma == rev_2ma.rolling(12, min_periods=6).max(),
    "近2月平均營收創24月新高": rev_2ma == rev_2ma.rolling(24, min_periods=12).max(),
    "近2月平均營收創36月新高": rev_2ma == rev_2ma.rolling(36, min_periods=12).max(),
    "近4月平均營收創12月新高": rev_4ma == rev_4ma.rolling(12, min_periods=6).max(),
    "近4月平均營收創24月新高": rev_4ma == rev_4ma.rolling(24, min_periods=12).max(),
    "近4月平均營收創36月新高": rev_4ma == rev_4ma.rolling(36, min_periods=12).max(),
    "當月營收創12月新高": rev == rev.rolling(12, min_periods=6).max(),
    "當月營收創24月新高": rev == rev.rolling(24, min_periods=12).max(),
    "當月營收創36月新高": rev == rev.rolling(36, min_periods=12).max(),
    "每股盈餘好": 每股盈餘 >= 1,
    "成交金額超過1千萬": 成交金額.average(5) > 10000000,
    "成交量大於1000": round(成交股數 / 1000) >= 1000,
    "連創5日創3日高": ((close / close.rolling(3).max()) == 1).sustain(5),
    "連續5日不創低": ((close / close.rolling(3).min()) > 1).sustain(5),
    "連續5日不創5日低": ((close / close.rolling(5).min()) > 1).sustain(5),
    "連續3日不創低": ((close / close.rolling(3).min()) > 1).sustain(3),
    "連續4日不創低": ((close / close.rolling(3).min()) > 1).sustain(4),
    "研發費用與管理費用比值好": (rd_pm.deadline().rank(axis=1, pct=True) > 0.5),
    "量大於200": 成交股數 > 200000,
    "均量": (成交股數.average(10) > 成交股數.average(60)),
}

# selected_keys = [
#     '價格動量20日均線', '月營收年增率50', '市值小於100億',
#     '股東權益報酬正數', '二十日均高於六十日均', '近3月平均營收創12月新高'
# ]


# 將所有的資料轉換成boolean，並確保index是datetime，並且轉換成FinlabDataFrame
for key, value in dataset.items():  # key是條件名稱,value是dataframe
    if value.index.dtype == "O":  # 判斷dataframe的index是不是日期格式
        dataset[key] = value.deadline().fillna(False).astype(bool)
    else:
        dataset[key] = value.fillna(False).astype(bool)
    dataset[key] = FinlabDataFrame(dataset[key])

# 把條件數目作為基因長度
individual_size = len(dataset)
# 把dataset的key取出來，之後會用到
keys = list(dataset.keys())

print("資料抓取結束")

## cross_condition解說

### 1.cross_condition(conditions, weight)
```python
# 對選定的基因取交集
def cross_condition(conditions, weight):
    cond_temp = [conditions[i] for i in weight]
    return reduce(lambda i, j: i & j, cond_temp)
```
在提供的程式碼片段中，cross_condition 函數接收兩個參數：conditions 和 weight。

- conditions: 這是一個包含各種交易條件的字典。每個鍵值對代表一個特定的交易條件，例如「價格低動量5日均線」。
- weight: 這是一個索引列表。它指定了要從 conditions 字典中選取哪些條件。weight 的作用是作為一個篩選器，用於從 conditions 字典中選擇特定的交易條件。

- 函數的運作方式如下：  
cond_temp = [conditions[i] for i in weight]: 這一行程式碼使用列表推導式，根據 weight 列表中的索引，從 conditions 字典中選取對應的交易條件，並將它們儲存在 cond_temp 列表中。
return reduce(lambda i, j: i & j, cond_temp): 這一行程式碼使用 reduce 函數和一個匿名函數（lambda 函數），將 cond_temp 列表中的所有交易條件進行邏輯 AND (&) 運算。換句話說，它會返回一個新的布林序列，只有當 weight 中指定的所有條件都為真時，結果才為真。
總結來說，weight 在 cross_condition 函數中扮演著關鍵的角色，它決定了哪些交易條件會被納入最終的交易策略中。 這在基因演算法的背景下非常重要，因為 weight 可以被視為個體的基因，通過調整 weight 的值，可以探索不同的交易策略組合。

---
### 2.gene_to_RNA(gene, keys)
```python
# 把基因轉換成條件
def gene_to_RNA(gene, keys):
    return [key for key, g in zip(keys, gene) if g == 1]
```
這個函數的功能是將基因轉換為 RNA (Ribonucleic acid，核糖核酸)。在這個上下文中，基因是一個由 0 和 1 組成的列表，而 RNA 是一個包含交易條件名稱的列表。

- 輸入:  
    - gene: 一個由 0 和 1 組成的列表，代表個體的基因。例如：[0, 1, 1, 0, 0]。  
    - keys: 一個包含所有交易條件名稱的列表。例如：['價格低動量5日均線', '價格低動量10日均線', ...] 。
- 輸出:  
    - 一個包含被選中的交易條件名稱的列表。

- 運作方式:  
zip(keys, gene): 將 keys 列表和 gene 列表中的元素一一配對。  
if g == 1: 檢查基因中的值是否為 1。  
[key for key, g in zip(keys, gene) if g == 1]: 如果基因中的值為 1，則將對應的交易條件名稱添加到結果列表中。  
簡而言之，gene_to_RNA() 函數的功能是根據基因中為 1 的位置，找出對應的交易條件。  
---

### 3.RNA_to_gene(RNA, keys)
```python
# 把條件轉換成基因
def RNA_to_gene(RNA, keys):
    return [1 if key in RNA else 0 for key in keys]
```
這個函數的功能與 gene_to_RNA() 相反，它將 RNA 轉換回基因。

輸入:
RNA: 一個包含交易條件名稱的列表。例如：['價格低動量10日均線', '價格低動量20日均線']。
keys: 一個包含所有交易條件名稱的列表。例如：['價格低動量5日均線', '價格低動量10日均線', ...] 。
輸出:
一個由 0 和 1 組成的列表，代表個體的基因。
運作方式:

if key in RNA: 檢查交易條件名稱是否在 RNA 列表中。
[1 if key in RNA else 0 for key in keys]: 如果交易條件名稱在 RNA 列表中，則在基因中對應的位置設為 1，否則設為 0。
簡而言之，RNA_to_gene() 函數的功能是根據被選中的交易條件，在基因中對應的位置設為 1。



In [None]:
#讀檔用的
def load_data(filename, default_data):
    print(f'讀取 {filename}.pkl...')
    if os.path.exists(filename + '.pkl'):
        with open(filename + '.pkl', 'rb') as f:
            print(f'{filename}.pkl 讀取成功')
            return pickle.load(f)
    print(f'{filename}.pkl 不存在，將使用預設值')
    return default_data
#存檔用的
def save_data(data, filename):
    print(f'保存 {filename}.pkl...')
    with open(filename + '.pkl', 'wb') as f:
        pickle.dump(data, f)
    print(f'{filename}.pkl 保存成功')
# 對選定的基因取交集
def cross_condition(conditions, weight):
    cond_temp = [conditions[i] for i in weight]
    return reduce(lambda i, j: i & j, cond_temp)
# 把基因轉換成條件
def gene_to_RNA(gene, keys):
    return [key for key, g in zip(keys, gene) if g == 1]
# 把條件轉換成基因
def RNA_to_gene(RNA, keys):
    return [1 if key in RNA else 0 for key in keys]

# 
# 設定提早換股 , 將每個 rev_date 向前移動3個交易日，用於提早賣出
def get_previous_trade_days(target_dates, trade_days, n=5):
    # 找到 target_dates 在 trade_days 中的索引
    indices = trade_days.searchsorted(target_dates) # 找到目標日期在交易日中的位置
    previous_days = [trade_days[max(i - n, 0)] for i in indices] # 向前取n天
    return pd.DatetimeIndex(previous_days)


print("func定義完成")

- creator.create("FitnessMax", base.Fitness, weights=(1.0,)):  
    - 這一行使用 DEAP 函式庫的 creator 模組來創建一個名為 FitnessMax 的新類別。  
    這個類別繼承自 base.Fitness，用於定義適應度 (Fitness) 的類型。  
    **weights=(1.0,) 表示這是一個最大化問題，適應度的權重為正數。**

- creator.create("Individual", list, fitness=creator.FitnessMax):
    - 這一行使用 creator 模組創建另一個名為 Individual 的新類別。  
    這個類別繼承自 list，表示個體的基因將以列表的形式儲存。  
    fitness=creator.FitnessMax 將之前創建的 FitnessMax 類別指派給 Individual 類別，表示每個個體都有一個 FitnessMax 類型的適應度屬性。

- toolbox = base.Toolbox(): 
    - 這一行創建了一個 base.Toolbox 物件，並將其指派給變數 toolbox。  
    Toolbox 可以被視為一個工具箱，用於註冊和管理基因演算法中使用的各種函數，例如 **個體生成、適應度評估、交配和突變等操作**。

- toolbox.register("attr_bool", random.randint, 0, 1):
    - 這一行在 toolbox 中註冊了一個名為 "attr_bool" 的新函數。  
    這個函數使用 Python 的 random.randint 函數來生成 0 或 1 的隨機整數。這個函數將被用來隨機產生個體的基因。

- toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=individual_size):  
    - 這一行在 toolbox 中註冊了一個名為 "individual" 的新函數。  
    這個函數使用 tools.initRepeat 函數來重複調用之前註冊的 "attr_bool" 函數 individual_size 次，  
    以創建一個 creator.Individual 物件（即一個個體）。  
    individual_size 變數應該在程式碼的其他地方定義，它決定了個體的基因長度。

- toolbox.register("population", tools.initRepeat, list, toolbox.individual): 
    - 這一行在 toolbox 中註冊了一個名為 "population" 的新函數。  
    這個函數使用 tools.initRepeat 函數來重複調用之前註冊的 "individual" 函數，以創建一個包含多個個體的列表，即一個族群。

| **總結來說，這段程式碼使用 DEAP 函式庫設定了基因演算法的基本組件，包括適應度類型、個體結構和用於創建個體及族群的工具。**

In [None]:
# 設定演算法要求最大值
creator.create("FitnessMax", base.Fitness, weights=(1.0,))

# 設定個體的基因格式為List，並且包含FitnessMax的屬性
creator.create("Individual", list, fitness=creator.FitnessMax)
# 設定toolbox，工具箱用於註冊和管理演算法中使用的各種函數和操作
toolbox = base.Toolbox()
#註冊一個名為"attr_bool"的屬性生成器。使用Python的random.randint函數，生成0或1的隨機整數。
toolbox.register("attr_bool", random.randint, 0, 1)
#註冊一個用於創建個體的函數。使用tools.initRepeat函數重複調用attr_bool函數來創建一個個體。
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=individual_size)
#註冊一個用於創建整個族群的函數。使用tools.initRepeat函數重複調用toolbox.individual函數來創建一個族群。
toolbox.register("population", tools.initRepeat, list, toolbox.individual)





# 用個體基因計算適應度
def objective(gene,keys):

    # 基因跟基因組成RNA(單一策略相組成一個主策略)
    RNA = gene_to_RNA(gene,keys)
    position_temp = cross_condition(dataset.copy(), RNA)

    # 月營收截止日換股
    position_temp = position_temp.reindex(rev.index_str_to_date().index, method='ffill')

    
    # ========== 設定提早換股 ========
    # 獲取提前的賣出日期
    rev_date = rev.index_str_to_date().index 
    close_date = close.index_str_to_date().index
    aligned_close_date = get_previous_trade_days(rev_date, close_date, n=3)
    aligned_close_date = aligned_close_date.drop_duplicates()

    # 同樣使用hold_until設定提早賣出
    # 買入信號不變
    buy_signals = position_temp
    # 賣出信號提前3天
    sell_signals = position_temp.reindex(aligned_close_date, method="ffill")
    # 使用hold_until設定買入持有到賣出
    position_temp = buy_signals.hold_until(sell_signals)

    # ========== 設定提早換股 End ========

    # 檢查每年平均交易次數
    每年平均交易次數 = (abs(position_temp.diff())>0).sum().sum()/(float((position_temp.index.max() - position_temp.index.min()).days//365))
    if 每年平均交易次數 < 50:
        print(f"交易次數過少，每年平均交易次數: {每年平均交易次數}")
        return (0,)
    
    # 最多不超過33%
    report = sim(position_temp.loc['2022-01-01':'2023-01-10'], upload=False, position_limit=1/5, fee_ratio=1.425/1000/3,  trade_at_price='open')
    rs = report.get_stats()
    met =  report.get_metrics()
    年化報酬率 = round(rs['cagr'] * 100, 3)
    最大回落 = round(rs['max_drawdown'] * 100, 3)
    日索提諾 = round(rs['daily_sortino'], 3)
    胃納量 = round(met['liquidity']['capacity']/10000,3)
    胃納量大於百萬 = 胃納量 > 100
    # 求好績效fitness
    fitness = 年化報酬率/abs(最大回落) * 日索提諾 * 胃納量大於百萬 #胃納大於百萬true/false (1,0)
    # fitness = 年化報酬率* 胃納量大於百萬 #胃納大於百萬true/false (1,0)

    print(f'{RNA}')
    print(f'年化報酬率: {年化報酬率}%, 最大回落: {最大回落}%, 日索提諾: {日索提諾}, 適應度: {fitness}, 胃納量: {胃納量}萬')
    return (fitness,)

# 菁英選擇,預設每個族群選兩個最優因子
def elitist_selection(population, k, elite_size=2):
    elites = tools.selBest(population, elite_size)
    rest = tools.selTournament(population, k - elite_size, tournsize=3)
    return elites + rest



# 註冊一個用於評估整個族群的函數。使用objective函數來評估個體。
toolbox.register("evaluate", objective)
# 註冊一個用於交叉的函數。使用tools.cxUniform函數來實現均勻交叉。
toolbox.register("mate", tools.cxUniform)
# 註冊一個用於突變的函數。使用tools.mutFlipBit函數來實現基因突變。
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
#註冊一個用於選擇的函數。
# toolbox.register("select", tools.selTournament, tournsize=5) #使用tools.selTournament函數來實現錦標賽選擇。
toolbox.register("select", elitist_selection, elite_size=5) # 使用Elite 選擇


#評估整個全體的function
def evaluate(offspring, pop, evaled_ind, evaled_ind_empty, keys):
    dict_count = 0
    empty_count = 0
    for idx,ind in enumerate(offspring):
        #先將基因轉換成條件
        RNA = str(gene_to_RNA(ind,keys))
        #如果該個體的適應度還沒被計算過，則計算適應度
        if not ind.fitness.valid:
            #如果該個體的適應度已經被計算過，則直接取用
            if RNA in evaled_ind:
                ind.fitness.values = (evaled_ind[RNA],)
                offspring[idx] = ind
                dict_count += 1
                print(f'使用字典中的適應度:{evaled_ind[RNA]}')
            #如果該個體的適應度為0，則直接取用
            elif RNA in evaled_ind_empty:
                ind.fitness.values = (0,)
                offspring[idx] = ind
                empty_count += 1
                print('個體適應度為0')
            #否則計算適應度
            else:
                fit = objective(ind, keys)
                ind.fitness.values = fit
                if fit[0] > 0:
                    evaled_ind[RNA] = fit[0]
                else:
                    evaled_ind_empty.append(RNA)
    print('使用字典中的適應度數量:{}'.format(dict_count))
    print('個體適應度為0數量:{}'.format(empty_count))

    return tools.selBest(pop + offspring, len(pop)), evaled_ind, evaled_ind_empty

# 根據經驗，全亂數的個體不容易找到好的解，所以先創建有一定機率為1的個體
def create_population(n):
    print('重新創建pop...')
    population = []
    for _ in range(n):
        individual = np.zeros(individual_size, dtype=int)
        # 隨機選擇5~10個基因為1
        buy_count = np.random.randint(5, 11)
        buy_indices = random.sample(range(len(dataset)), buy_count)
        for idx in buy_indices:
            individual[idx] = 1
        individual = creator.Individual(individual.tolist())
        population.append(individual)
    return population

print("基因演算法定義完成")

## pop跟gen的數量問題？
### pop * gen = 你探索過的解的數量
- 小pop,高gen  
    - 優點:快速收斂
    - 缺點:容易進入局部最佳解
- 高pop,低gen
    - 優點:探索範圍廣,局部最佳解的品質有比較高的機率更好
    - 缺點:收斂很慢,要跑比較久才有結果

**推薦使用高pop低gen**
- population size通常是妳條件數量的2~3倍, ex: dataset有40個條件,pop就是80~120  


In [None]:
#設定族群大小,通常用基因數量的2~3倍
pop_size = len(dataset) * 3
#打算演化多少代
gen_limit = 15



load_pre_evaled = False
if load_pre_evaled:
    #讀取之前的存檔，沒有就創建新的族群
    pop = load_data('pop', create_population(pop_size))
    #讀取存檔
    best_fitness = load_data('best_fitness', [])
    evaled_ind = load_data('evaled_ind', {})
    evaled_ind_empty = load_data('evaled_ind_empty', [])

else: 
    pop = create_population(pop_size)
    best_fitness =[]
    evaled_ind={}
    evaled_ind_empty=[]
# try:
# #減少網路依賴，避免長久運行跳掉。
#     data.use_local_data_only = True
#     df = data.get('etl:adj_high')
# except Exception:
# #本地端沒有資料則撈雲端的
#     data.use_local_data_only = False
#     df = data.get('etl:adj_high')
data.use_local_data_only = True
df = data.get('etl:adj_high')
print("df:",df)
try:
    gen = 0
    with tqdm(desc="Generations", unit="gen") as pbar:
        #判斷甚麼時候停止
        while gen<gen_limit:
            offspring = toolbox.select(pop, len(pop))
            offspring = list(map(toolbox.clone, offspring))
            #把族群按單雙分開，然後彼此有70%的機會交配
            for child1, child2 in zip(offspring[::2], offspring[1::2]):
                if random.random() < 0.7:
                    toolbox.mate(child1, child2, 0.5)
                    del child1.fitness.values
                    del child2.fitness.values
            #所有的後代有30%的機會進行突變
            for mutant in offspring:
                if random.random() < 0.3:
                    toolbox.mutate(mutant)
                    del mutant.fitness.values
            #把整個後代群送進evaluate去評分
            pop, evaled_ind, evaled_ind_empty = evaluate(offspring, pop, evaled_ind, evaled_ind_empty, keys)

            #看看第一名考幾分
            max_fit = max([ind.fitness.values[0] for ind in pop])
            best_fitness.append(max_fit)

            save_data(best_fitness, 'best_fitness')
            save_data(pop, 'pop')
            save_data(evaled_ind, 'evaled_ind')
            save_data(evaled_ind_empty, 'evaled_ind_empty')

            # 更新 tqdm 的描述
            tqdm.write(f'Generation {gen} completed, Max Fitness: {max_fit:.4f}')

            # 更新進度條
            pbar.update(1)
            gen += 1
#萬一出問題了，趕快存檔。比較好debug。
except :
    save_data(pop, 'pop')
    save_data(evaled_ind, 'evaled_ind')
    save_data(evaled_ind_empty, 'evaled_ind_empty')
    save_data(best_fitness, 'best_fitness')
    raise

# 找到適應度最高的個體
best_individual = max(pop, key=lambda ind: ind.fitness.values[0])
# 印出最佳個體的基因與適應度
print("最佳個體的基因:", best_individual)
print("最佳個體的適應度:", best_individual.fitness.values[0])
# 對應基因與 keys，僅保留基因為 1 的 keys
selected_keys = [key for gene, key in zip(best_individual, keys) if gene == 1]


In [None]:
# =========== 加入hold_until ==========
# 抓上面適應度最好的基因放到selected_keys
position_temp = reduce(lambda x, y: x & y, (dataset[key] for key in selected_keys))

# 月營收截止日換股
position_temp = position_temp.reindex(rev.index_str_to_date().index, method='ffill')



def getMaxFitness(position, limit):

    conditions = {
        "成值前十": 成交金額[position].is_largest(limit),
        "成值後十": 成交金額[position].is_smallest(limit),
        '市值前十': 市值[position].is_largest(limit),
        '市值後十': 市值[position].is_smallest(limit),
        '股本前十': 股本[position].is_largest(limit),
        '股本後十': 股本[position].is_smallest(limit),
        '成值二十日平均前十': vol_ma20[position].is_largest(limit),
        '成值二十日平均後十': vol_ma20[position].is_smallest(limit),
        '營收yoy成長前十': rev_year_growth[position].is_largest(limit),
        '營收yoy成長後十': rev_year_growth[position].is_smallest(limit),
        '二十日均距離六十日小': 月季線差[position].is_smallest(limit),
        '二十日均距離六十日大': 月季線差[position].is_largest(limit),
    }
    results = []
    for key, value in conditions.items():
        report = sim(value, upload=False, position_limit=1/5, fee_ratio=1.425/1000/3, trade_at_price='open')
        rs = report.get_stats()
        met = report.get_metrics()
        年化報酬率 = round(rs['cagr'] * 100, 3)
        最大回落 = round(rs['max_drawdown'] * 100, 3)
        日索提諾 = round(rs['daily_sortino'], 3)
        胃納量 = round(met['liquidity']['capacity']/10000, 3)
        fitness = 年化報酬率/abs(最大回落) * 日索提諾
        print(f"{key} - fitness: {fitness} , 胃納量: {胃納量} 萬")
        # 將條件、適應度和結果存入 results
        results.append((key, value, fitness))

    # 找到 fitness 最大的條件
    best_condition = max(results, key=lambda x: x[2])  # x[2] 是 fitness
    print(f"最佳條件: {best_condition[0]}，Fitness: {best_condition[2]}")
    # 返回最大 fitness 的 value
    return best_condition[1]


# ==== 定義出場時間
rev_date = rev.index_str_to_date().index 
close_date = close.index_str_to_date().index
aligned_close_date = get_previous_trade_days(rev_date, close_date, n=3)
aligned_close_date = aligned_close_date.drop_duplicates()

# ===== strategy1 =========
buy_signals = position_temp
# 賣出信號提前3天
sell_signals = position_temp.reindex(aligned_close_date, method="ffill")
# 使用hold_until設定買入持有到賣出
position_temp = buy_signals.hold_until(sell_signals)
best_pos = getMaxFitness(position_temp, 10)


report = sim(best_pos,upload=False, position_limit=1/5, fee_ratio=1.425/1000/3, trade_at_price='open', name="小傑")

report.display()
report.display_mae_mfe_analysis()

# ======end strategy1 =========
