In [None]:
import concurrent.futures, random, math, time, os, json, pickle, datetime, webbrowser, itertools, re
from urllib.parse import urljoin
import requests, pandas as pd
from tqdm import tqdm
# ───────────────────── Basic settings ─────────────────────
# REST endpoints, all derived from BASE_URL.
BASE_URL            = 'https://api.worldquantbrain.com'
LOGIN_ENDPOINT      = f'{BASE_URL}/authentication'
SIMULATION_ENDPOINT = f'{BASE_URL}/simulations'
ALPHA_ENDPOINT      = f'{BASE_URL}/alphas'
# Local files: the pickled session and the credentials JSON both live under BASE_PATH.
# NOTE(review): 'SAVE_PATH' / 'SESSION_PATH' / 'CRED_PATH' look like placeholders
# to be replaced with real paths — confirm before running.
BASE_PATH  = 'SAVE_PATH'
SESSION_PKL = os.path.join(BASE_PATH, 'SESSION_PATH')
CRED_FILE   = os.path.join(BASE_PATH, 'CRED_PATH')
# Module-level HTTP session; may be replaced below by a previously pickled one.
s = requests.Session()
# ───── Load / log in session ─────
def login_and_save():
    """Authenticate the module-level session `s` and pickle it to SESSION_PKL.

    Credentials are read from the JSON file CRED_FILE and installed as
    `s.auth`. If the login response is a 401 whose `WWW-Authenticate`
    header equals "persona", the biometrics URL from the `Location` header
    is opened in a browser, the user is asked to confirm on stdin, and the
    URL is POSTed once more. The session is pickled afterwards regardless
    of the login outcome.
    """
    with open(CRED_FILE, 'r') as cred_fh:
        credentials = json.load(cred_fh)
    s.auth = tuple(credentials)

    resp = s.post(LOGIN_ENDPOINT)
    wants_biometrics = (
        resp.status_code == 401
        and resp.headers.get("WWW-Authenticate") == "persona"
    )
    if wants_biometrics:
        # Location may be relative, so resolve it against the response URL.
        url = urljoin(resp.url, resp.headers["Location"])
        print("Open biometric page:", url)
        webbrowser.open(url)
        input("完成 biometrics 後按 Enter")
        s.post(url)

    with open(SESSION_PKL, 'wb') as pkl_fh:
        pickle.dump(s, pkl_fh)
if not os.path.exists(SESSION_PKL):
    # No cached session yet: authenticate from scratch.
    login_and_save()
else:
    with open(SESSION_PKL, 'rb') as f:
        # NOTE: unpickling can execute arbitrary code; only load a file this
        # script wrote itself.
        s = pickle.load(f)
    session_age = time.time() - os.path.getmtime(SESSION_PKL)
    if session_age > 4*3600:
        # Cached session is older than four hours: log in again and re-save.
        login_and_save()
# The pickle's modification time is used as the moment the session started.
session_start = datetime.datetime.fromtimestamp(os.path.getmtime(SESSION_PKL))
print("Session start:", session_start)
# ───────────────────── tqdm-safe helpers ─────────────────────
def bar_msg(bar: tqdm, txt: str):
    """Show `txt` as the postfix of a single progress bar, without a line break.

    Args:
        bar: progress bar whose postfix is overwritten.
        txt: message to display; replaces any previous postfix.
    """
    bar.set_postfix_str(txt, refresh=True)
# ───────────────────── API simulate ─────────────────────
def simulate(sess: requests.Session, payload: dict, bar: tqdm,
             max_retries=3, wait_seconds=5):
    """Submit one simulation, wait for it to finish, and fetch the alpha record.

    Phases:
      1. POST `payload` to SIMULATION_ENDPOINT, retrying on HTTP 429.
      2. Poll the progress URL taken from the POST response's `Location`
         header until a response arrives whose `Retry-After` header is
         absent or equal to "0".
      3. Read the `alpha` id from the final progress JSON and GET
         ALPHA_ENDPOINT/<id>, retrying on HTTP 429.

    Args:
        sess: session used for all requests.
        payload: JSON-serialisable simulation request body.
        bar: progress bar that receives short status messages.
        max_retries: extra attempts allowed on HTTP 429 for the POST and
            for the final alpha GET (so up to max_retries + 1 tries each).
        wait_seconds: pause after each HTTP 429 before retrying.

    Returns:
        (True, alpha_json) on success, otherwise (False, {}). Request and
        HTTP failures are reported on `bar` and returned, not raised, so a
        worker thread does not take down the caller.
    """
    for i in range(max_retries+1):
        try:
            r = sess.post(SIMULATION_ENDPOINT, json=payload)
            if   r.status_code == 429:
                bar_msg(bar, f"POST429 wait{wait_seconds}s ({i+1})")
                time.sleep(wait_seconds); continue
            elif r.status_code not in (200, 201):
                bar_msg(bar, f"POSTerr {r.status_code}"); return False, {}
            break
        except Exception as e:
            # Broad on purpose: this is the worker-thread boundary.
            bar_msg(bar, f"POSTexc {e}"); return False, {}
    else:
        # Every attempt was answered with 429.
        bar_msg(bar, "POST give-up"); return False, {}

    # A successful POST without a Location header cannot be polled. Report it
    # instead of letting a KeyError escape from the worker thread.
    prog_url = r.headers.get('Location')
    if not prog_url:
        bar_msg(bar, "POST no Location"); return False, {}

    # NOTE(review): 429 responses while polling are retried without a limit.
    while True:
        try:
            g = sess.get(prog_url)
            if g.status_code == 429:
                bar_msg(bar, "GETprog429"); time.sleep(wait_seconds); continue
            # An absent or "0" Retry-After header ends the polling.
            if g.headers.get('Retry-After', '0') == '0': break
            time.sleep(2.5)
        except Exception as e:
            bar_msg(bar, f"GETprogExc {e}"); return False, {}

    try:
        alpha_id = g.json().get('alpha')
        if not alpha_id:
            # The finished simulation produced no alpha id.
            bar_msg(bar, "no alpha id"); return False, {}
        for i in range(max_retries+1):
            r2 = sess.get(f"{ALPHA_ENDPOINT}/{alpha_id}")
            if r2.status_code == 429:
                bar_msg(bar, f"GETα429 ({i+1})"); time.sleep(wait_seconds); continue
            if r2.status_code != 200:
                bar_msg(bar, f"GETαerr {r2.status_code}"); return False, {}
            break
        else:
            return False, {}
        return True, r2.json()
    except Exception as e:
        bar_msg(bar, f"finalExc {e}"); return False, {}
def is_valid(js):
    """Return True when none of the checks in `js['is']['checks']` is 'FAIL'.

    Args:
        js: decoded alpha JSON; expected to hold a list of dicts with a
            'result' key under js['is']['checks'].

    Returns:
        True if every check's 'result' differs from 'FAIL' (an empty list
        counts as valid); False if a check failed or `js` does not have
        the expected shape.
    """
    try:
        checks = js['is']['checks']
        return all(check['result'] != 'FAIL' for check in checks)
    except (KeyError, TypeError):
        # Malformed or missing data counts as "not valid". Narrower than a
        # bare except so KeyboardInterrupt / SystemExit still propagate.
        return False
def extract_placeholders(template):
    """Return the set of names written as `<name>` inside `template`.

    Each match is the shortest run of non-newline characters between a `<`
    and the next `>`; the angle brackets themselves are not included.
    """
    return {match.group(1) for match in re.finditer(r'<(.*?)>', template)}

In [None]:
# Index of the entry to use from templates_spaces.json, drawn at random from 0-3.
template_space_chose = random.choice([0, 1, 2, 3])
# Selects how build_payload() produces the expression.
payload_mode = "template_space"  # "template_space" / "fixed_expression"
# ──────── Sampling ranges ────────
expression_queue = []  # shuffled backlog consumed by get_next_expression()
settings_queue = []  # NOTE(review): not read by any reachable code in this section
# Candidate values per simulation setting; the trailing comment on each line
# lists the full option set so entries can be removed and restored easily.
universe_choices = ['TOP3000', 'TOP1000', 'TOP500', 'TOP200', 'TOPSP500'] # 'TOP3000', 'TOP1000', 'TOP500', 'TOP200', 'TOPSP500'
decay_choices = [0, 1, 5, 10, 20, 30, 60] # 0, 1, 5, 10, 20, 30, 60
delay_choices = [1] # 0, 1
truncation_choices = [0.01, 0.02, 0.05, 0.08] # 0.01, 0.02, 0.05, 0.08
neutralization_choices = ['MARKET', 'SECTOR', 'INDUSTRY', 'SUBINDUSTRY'] # 'MARKET', 'SECTOR', 'INDUSTRY', 'SUBINDUSTRY'

# Keys of an individual's settings dict that crossover() and mutate() operate on.
genes = ['decay', 'delay', 'truncation', 'universe', 'neutralization']
# Load the selected template entry. On any failure, print the error and fall
# back to empty values so the remaining definitions in this cell still run.
try:
    with open('templates_spaces.json', 'r', encoding='utf-8') as f:
        template_space_list = json.load(f)

    # The chosen index must address an existing entry of the loaded list.
    if template_space_chose < 0 or template_space_chose >= len(template_space_list):
        raise IndexError(f"template_space_chose = {template_space_chose} 超出範圍")

    current_tpl_item = template_space_list[template_space_chose]

    # Every entry must provide the template text and both substitution spaces.
    required_keys = {"template", "data_space", "shared_space"}
    if not required_keys.issubset(current_tpl_item):
        missing = required_keys - current_tpl_item.keys()
        raise KeyError(f"模板缺少必要欄位：{missing}")

    current_template = current_tpl_item["template"]
    # Map the template text to its (data_space, shared_space) pair.
    template_to_spaces = {
        current_template: (current_tpl_item["data_space"], current_tpl_item["shared_space"]),
    }
    shared_space_global = current_tpl_item["shared_space"]

except Exception as e:
    print(f"[錯誤] 載入 template 失敗：{e}")
    template_to_spaces, shared_space_global, current_template = {}, {}, ""
# Hand-written expressions used when payload_mode == "fixed_expression".
# get_next_expression() hands them out one at a time in shuffled order, and
# build_payload() strips the surrounding whitespace before sending.
fixed_expressions = [
"""
data1 = ts_backfill(mdl77_put_put_indfcfp, 10);
data2 = ts_backfill(mdl77_garpanalystmodel_qgp_relgrowth, 10);
data1_gp = group_zscore(data1, bucket(rank(log(sharesout)), range="0,1,0.1"));
data2_gp = group_zscore(data2, bucket(rank(log(sharesout)), range="0,1,0.1"));
diff = subtract(data1_gp, data2_gp);
-ts_decay_linear(group_rank(diff, bucket(rank(cap), range="0,1,0.1")), 5)
""",
"""
data1 = ts_backfill(mdl77_2historicalgrowthfactor_y3fcfq4rqsr, 252);
data2 = ts_backfill(mdl77_ohistoricalgrowthfactor_pfcghc, 252);
data1_gp = group_zscore(data1, industry);
data2_gp = group_zscore(data2, industry);
diff = subtract(data1_gp, data2_gp);
-ts_mean(group_neutralize(diff, bucket(rank(log(sharesout)), range="0,1,0.1")), 21)
""",
"""
data1 = ts_backfill(mdl77_2deepvaluefactor_cashsev, 10);
data2 = ts_backfill(mdl77_2liquidityriskfactor_bap20d, 10);
data1_gp = group_zscore(data1, bucket(ts_delta(close, 5), range="0,1,0.1"));
data2_gp = group_zscore(data2, bucket(ts_delta(close, 5), range="0,1,0.1"));
diff = subtract(data1_gp, data2_gp);
-ts_mean(group_neutralize(diff, bucket(ts_delta(close, 5), range="0,1,0.1")), 10)
""",
"""
data1 = ts_backfill(mdl77_2earningsqualityfactor_rau, 10);
data2 = ts_backfill(mdl77_earningsqualityfactor_vniu, 10);
data1_gp = group_rank(data1, bucket(rank(bookvalue_ps/cap*sharesout), range="0,1,0.1"));
data2_gp = group_rank(data2, bucket(rank(bookvalue_ps/cap*sharesout), range="0,1,0.1"));
diff = subtract(data1_gp, data2_gp);
-ts_mean(group_rank(diff, bucket(ts_rank(volume, 120), range="0,1,0.1")), 10)
""",
"""
data1 = ts_backfill(mdl77_2liquidityriskfactor_nlassets, 63);
data2 = ts_backfill(mdl77_fangma_mam4, 63);
data1_gp = group_rank(data1, bucket(rank(cap), range="0,1,0.1"));
data2_gp = group_rank(data2, bucket(rank(cap), range="0,1,0.1"));
diff = divide(data1_gp, data2_gp);
-ts_decay_linear(group_rank(diff, subindustry), 1)
""",
"""
data1 = ts_backfill(mdl77_earningsmomemtummodel_pge_cf, 126);
data2 = ts_backfill(mdl77_2liquidityriskfactor_lfd, 126);
data1_gp = group_zscore(data1, market);
data2_gp = group_zscore(data2, market);
diff = divide(data1_gp, data2_gp);
ts_mean(group_rank(diff, bucket(rank(log(sharesout)), range="0,1,0.1")), 63)
""",
"""
data1 = ts_backfill(mdl77_2growthanalystmodel_qga_opmarginsales, 252);
data2 = ts_backfill(mdl77_putput_indestep, 252);
data1_gp = group_neutralize(data1, bucket(rank(cap), range="0,1,0.1"));
data2_gp = group_neutralize(data2, bucket(rank(cap), range="0,1,0.1"));
diff = subtract(data1_gp, data2_gp);
ts_decay_linear(group_rank(diff, bucket(ts_delta(close, 5), range="0,1,0.1")), 1)
""",
"""
data1 = ts_backfill(mdl77_2put_put_indestep, 126);
data2 = ts_backfill(mdl77_2momemtumanalystmodel_ghcfcfmtt_amq, 126);
data1_gp = group_neutralize(data1, market);
data2_gp = group_neutralize(data2, market);
diff = subtract(data1_gp, data2_gp);
ts_mean(group_rank(diff, bucket(rank(assets), range="0,1,0.1")), 42)
"""
]
# ──────── Basic operations ────────
def create_ind():
    """Build a random individual: one value per gene, drawn from its choice list.

    Values are drawn in the order decay, delay, truncation, universe,
    neutralization, which is also the key order of the returned dict.
    """
    gene_pools = (
        ('decay', decay_choices),
        ('delay', delay_choices),
        ('truncation', truncation_choices),
        ('universe', universe_choices),
        ('neutralization', neutralization_choices),
    )
    return {gene: random.choice(pool) for gene, pool in gene_pools}

def crossover(a, b):
    """Create a child that takes each gene at random from parent `a` or `b`.

    Neither parent is modified; the child has exactly the keys in `genes`.
    """
    child = {}
    for gene in genes:
        child[gene] = random.choice((a[gene], b[gene]))
    return child

def mutate(ind):
    """Re-draw one randomly chosen gene of `ind` in place and return `ind`.

    The gene is picked from `genes`; its new value comes from that gene's
    choice list and may equal the old value. A gene name without a choice
    list leaves `ind` unchanged.
    """
    pools = {
        'decay': decay_choices,
        'delay': delay_choices,
        'truncation': truncation_choices,
        'universe': universe_choices,
        'neutralization': neutralization_choices,
    }
    gene = random.choice(genes)
    if gene in pools:
        ind[gene] = random.choice(pools[gene])
    return ind

def get_next_expression():
    """Return the next fixed expression, reshuffling once all have been served.

    `expression_queue` holds the not-yet-served expressions of the current
    pass. When it is empty it is rebound to a fresh random permutation of
    `fixed_expressions`, so every expression is served once per pass.
    """
    global expression_queue
    if expression_queue:
        return expression_queue.pop()
    # Queue exhausted: start a new pass in a new random order.
    expression_queue = random.sample(fixed_expressions, len(fixed_expressions))
    return expression_queue.pop()

def _fill_placeholders(expr, space):
    """Replace each key of `space` in `expr` with one randomly chosen value."""
    for key, values in space.items():
        # One value is drawn per key and used for every occurrence of it.
        # str.replace substitutes literally; re.sub would interpret
        # backslashes and group references in the chosen value.
        expr = expr.replace(key, str(random.choice(values)))
    return expr


def build_payload(individual=None):
    """Build the JSON body for one simulation request.

    Args:
        individual: optional dict with the keys 'decay', 'delay',
            'truncation', 'universe' and 'neutralization'. When omitted
            (or empty), each of these settings is drawn at random from its
            module-level choice list.

    Returns:
        dict with 'type' ('REGULAR'), 'settings' and 'regular' (the
        expression text, stripped of surrounding whitespace).

    The expression depends on the module-level `payload_mode`:
      * "template_space": `current_template` is filled from its data space
        and then its shared space; keys are matched as literal text
        (presumably of the form "<name>" — confirm against the JSON file).
      * "fixed_expression": the next entry from get_next_expression() is
        filled from `shared_space_global`.

    Raises:
        KeyError: in "template_space" mode when no spaces are registered
            for `current_template` (e.g. the template file failed to load).
        ValueError: when `<...>` placeholders remain after substitution in
            "template_space" mode, or when `payload_mode` is unknown.
    """
    if individual:
        decay = individual['decay']
        delay = individual['delay']
        truncation = individual['truncation']
        universe = individual['universe']
        neutralization = individual['neutralization']
    else:
        decay = random.choice(decay_choices)
        delay = random.choice(delay_choices)
        truncation = random.choice(truncation_choices)
        universe = random.choice(universe_choices)
        neutralization = random.choice(neutralization_choices)

    settings = {
        'instrumentType': 'EQUITY',
        'region': 'USA',
        'universe': universe,
        'language': 'FASTEXPR',
        'decay': decay,
        'delay': delay,
        'truncation': truncation,
        'neutralization': neutralization,
        'pasteurization': 'ON',
        'testPeriod': 'P0Y0M',
        'unitHandling': 'VERIFY',
        'nanHandling': 'OFF',
        'visualization': False
    }

    if payload_mode == "template_space":
        try:
            data_space, shared_space = template_to_spaces[current_template]
        except KeyError:
            # Same exception type as the bare lookup, with a usable message.
            raise KeyError(
                f"no template space loaded for template {current_template!r}"
            ) from None

        # Data fields first, then shared parameters, as in the original order.
        expr_template = _fill_placeholders(current_template, data_space)
        expr_template = _fill_placeholders(expr_template, shared_space)

        # Anything still shaped like <...> was not covered by either space.
        missing_keys = re.findall(r"<[^>]+>", expr_template)
        if missing_keys:
            raise ValueError(f"模板中以下變數未被替換：{missing_keys}")

        expr = expr_template.strip()

    elif payload_mode == "fixed_expression":
        expr = _fill_placeholders(get_next_expression().strip(), shared_space_global)
    else:
        raise ValueError(f"Unknown payload_mode: {payload_mode}")

    return {
        'type': 'REGULAR',
        'settings': settings,
        'regular': expr
    }

# Smoke test: build one payload with random settings and show what would be sent.
test_payload = build_payload()

print("=== 測試生成的 Expression ===", test_payload['regular'], sep="\n")
print("\n=== 對應的設定 ===", test_payload['settings'], sep="\n")

In [None]:
# ───────────────────── GA main loop ─────────────────────
POP    = 100   # current population size; adapted after every generation
MAX_W  = 3     # worker threads, and the cap on simulations in flight
GENS   = 10    # maximum number of generations
MINPOP = 50    # lower bound for POP
MAXPOP = 500   # upper bound for POP
INJECT = 0.4   # probability that a new population slot gets a fresh random individual

# Stop once this many qualifying alphas have been collected in total.
target_success_count = 50

population = [create_ind() for _ in range(POP)]
results    = []  # tuples of (name, individual, sharpe, fitness, turnover)
for gen in range(1, GENS+1):
    # Stop 5 minutes before the session reaches its 4-hour age limit.
    # NOTE(review): the message says "save and exit" but no save step is visible here.
    if datetime.datetime.now() > session_start + datetime.timedelta(hours=4) - datetime.timedelta(minutes=5):
        print("Session 將過期，存檔退出"); break
    # Build every payload up front; each entry pairs an individual with its request body.
    batch = [(ind, build_payload(ind)) for ind in population]
    succ_this_gen, sent_idx = 0, 0
    futures = {}  # future -> individual it was submitted for
    with tqdm(total=len(batch), desc=f"Gen{gen}") as bar, \
         concurrent.futures.ThreadPoolExecutor(MAX_W) as exe:
        # Prime the pool with up to MAX_W simulations.
        # NOTE(review): bar.update() runs on submission, so the bar counts submitted
        # simulations, not finished ones.
        while sent_idx < len(batch) and len(futures)<MAX_W:
            ind,pay = batch[sent_idx]; sent_idx+=1
            futures[exe.submit(simulate,s,pay,bar)] = ind; bar.update()
        # Each time one simulation finishes, record it and submit the next one.
        while futures:
            done = next(concurrent.futures.as_completed(futures))
            ind  = futures.pop(done)
            # NOTE(review): result() re-raises anything simulate() raised in the worker.
            ok, js = done.result()
            if ok and is_valid(js):
                sh, ft, tv = js['is']['sharpe'], js['is']['fitness'], js['is'].get('turnover',0)
                # Keep only alphas meeting the sharpe / fitness / turnover thresholds.
                if sh>=1.25 and ft>=1.0 and 0.01<=tv<=0.7:
                    name = f"Alpha_{len(results)+1:04d}"
                    bar_msg(bar, f"{name} S={sh:.2f} F={ft:.2f} T={tv:.3f}")
                    # NOTE(review): only the settings individual is stored, not the
                    # expression that was simulated.
                    results.append((name, ind, sh, ft, tv)); succ_this_gen += 1
            if sent_idx < len(batch):
                ind2,pay2 = batch[sent_idx]; sent_idx+=1
                futures[exe.submit(simulate,s,pay2,bar)] = ind2; bar.update()
    # Share of this generation's population that qualified.
    success_rate = succ_this_gen/POP
    if len(results) >= target_success_count:
        print(f"已達成目標成功組合數 {target_success_count} 組，存檔退出"); break
    # Adapt the operators: a low success rate lowers the crossover probability
    # and raises the mutation probability.
    cross = 0.9 if success_rate>=.2 else .5 if success_rate<=.05 else .7
    mut   = .8  if success_rate<=.05 else .5 if success_rate<=.15 else .3 if success_rate<=.3 else .1
    # Adapt the population size: grow when few succeed, shrink when many do,
    # always within [MINPOP, MAXPOP].
    if success_rate<=.05: POP=min(POP*2, MAXPOP)
    elif success_rate<=.15:POP=min(int(POP*1.5), MAXPOP)
    elif success_rate>=.3: POP=max(int(POP*.7), MINPOP)
    # Elites: individuals behind the 10 best results so far, ranked by fitness.
    top = [r[1] for r in sorted(results,key=lambda x:x[3],reverse=True)[:10]]
    # With no successes yet, restart from a fully random population.
    if not top: population=[create_ind() for _ in range(POP)]
    else:
        # Carry the elites over, then fill the remaining slots.
        population = top[:]
        while len(population)<POP:
            # With probability INJECT, add a brand-new random individual.
            if random.random()<INJECT: population.append(create_ind()); continue
            # Otherwise breed from two random elites (probability `cross`) or draw a
            # random individual, then mutate it with probability `mut`.
            child = crossover(random.choice(top), random.choice(top)) if random.random()<cross else create_ind()
            if random.random()<mut: child = mutate(child)
            population.append(child)
# Final report of every qualifying alpha ("=== Qualifying alphas ===").
print("\n=== 符合條件 Alpha ===")
for n,_,sh,ft,tv in results:
    print(f"{n}  Sharpe={sh:.2f}  Fitness={ft:.2f}  Turn={tv:.3f}")