In [None]:
# !pip install pandas
# !pip install numpy
# !pip install matplotlib
# !pip install networkx
# !pip install requests
# !pip install openai
# !pip install nest_asyncio
# !pip install tqdm
# !pip install Jinja2

# LLM으로 알파 생성하기
* LLM이 발달하면서 이제 mini 등 가벼운 모델로도 알파를 꽤나 쉽게 만들 수 있는 환경이 되었습니다. 
* 오늘 돌리면서 한 쿼리를 6번 정도 날렸는데 $0.4 정도가 나왔습니다. 
* 한 번 알파 100개 만드는 시도를 할 때 약 100원 정도 들어간다고 생각하시면 될 것 같습니다.
* 사양이 괜찮으신 분들은 (또는 최신 맥북을 쓰신다면) ollama로 로컬 LLM도 한번 써보세요! 성능이 생각보다 괜찮습니다.

# 데이터필드 정보 저장하기
USA TOP3000 기준 30분-1시간 정도 걸리는 작업입니다.

In [2]:
# 처음에만 진행하시면 됩니다.
import AAF as aaf
import ace_lib as ace

# 세팅 
# Simulation settings passed to aaf.initiate_datafield below.
# "fixed" / "can be varied" carry over the original author's markers.
settings = dict(
    nan_handling="OFF",
    instrumentType="EQUITY",    # fixed
    delay=1,
    universe="TOP3000",
    truncation=0.01,
    unitHandling="VERIFY",      # fixed
    pasteurization="ON",
    region="USA",
    language="FASTEXPR",        # fixed
    decay=0,
    neutralization="MARKET",    # can be varied
    visualization=False,
)

# The recorded output shows start_session prompting for biometric authentication.
s = ace.start_session()
operators = ace.get_operators(s)

# Per the recorded output, datasets that already exist on disk are skipped.
aaf.initiate_datafield(s, settings)


Complete biometrics authentication and press any key to continue: 
https://api.worldquantbrain.com/authentication/persona?inquiry=inq_WHLzFUnnf6B8BLPfvNEoVfaBsNHi


Region: USA | Universe: TOP3000 | Delay: 1
Total datasets available: 206
Whitelisted datasets to process: 10
Skipped (not in whitelist): 196



Processing datasets: 100%|██████████| 10/10 [00:00<00:00, 376.36dataset/s, Skipped (exists): socialmedia8]


Creating total.json...
total.json created successfully.

SUMMARY
Successfully processed: 0
Skipped (already exists): 10
Failed (logged to errors.log): 0






# Expression Sanity checker
데이터필드 정보를 이용해서, LLM이 만든 expression이 실제 브레인에서 돌아갈 수 있는지를 체크합니다.

In [16]:
import llm_functions as llm
from parser import *
import ace_lib as ace
import numpy as np
import pandas as pd
import os 
import json
import asyncio



# The setup cell that defines `settings` is documented as "run once only", so a
# fresh kernel may not have it. Reuse it when present, otherwise fall back to
# the same region/universe that cell declares, and build the directory and the
# file name from the same values so they cannot disagree.
_cfg = settings if "settings" in globals() else {}
_region = _cfg.get("region", "USA")
_universe = _cfg.get("universe", "TOP3000")

# The leading "1" is kept literal, matching the path used by the dataset
# selection cell further down.
datafields = llm.import_json(
    f'datafield/1/{_region}/{_universe}/1_{_region}_{_universe}_total.json'
)
operators = llm.import_json('operators_list.json')

def return_type(node, operators, datafields):
    """Return the type label that a parsed expression node evaluates to.

    Args:
        node: Parsed node exposing ``node_type`` and ``value``.
        operators: Mapping of operator name -> record with an ``'output'`` entry.
        datafields: Mapping of datafield id -> record with a ``'type'`` entry.

    Returns:
        The operator's declared output, ``"NUMBER"`` for numeric literals, the
        datafield's declared type (``"MATRIX"`` for a literal ``nan`` in any
        letter case), or ``"SPECIAL_ARGUMENT"`` for every other node type.

    Raises:
        KeyError: if the operator or datafield name is not in its mapping.
    """
    kind = node.node_type

    if kind == 'number':
        return "NUMBER"

    if kind == 'operator':
        return operators[node.value]['output']

    if kind != 'datafield':
        return "SPECIAL_ARGUMENT"

    # A bare `nan` token is not a real field, so it is never looked up.
    field_name = node.value
    return "MATRIX" if field_name.lower() == 'nan' else datafields[field_name]['type']


def check_input(operator_inputs, children_types, _debug = False):
    """Check that an operator's children match its declared input slots.

    Slots are matched left to right. A slot declared as ``'_SPECIAL_ARGUMENT'``
    is optional: it consumes the next child only when that child is itself a
    special argument. Every other slot requires a child of the same type, except
    that a ``"NUMBER"`` child is accepted where ``"MATRIX"`` is declared.
    Children left over after all slots are processed are not rejected.

    Fixes relative to the previous version:
      * the special-argument child used to be consumed only inside a debug
        print, so the result depended on ``_debug``; it is now always consumed;
      * both ``'_SPECIAL_ARGUMENT'`` and ``'SPECIAL_ARGUMENT'`` (the label
        produced by ``return_type``) are recognised as special children;
      * the caller's lists are no longer mutated.

    Args:
        operator_inputs: Sequence of declared slot type labels.
        children_types: Sequence of the children's type labels, in order.
        _debug: When true, print the matching steps.

    Returns:
        True when every required slot is satisfied, otherwise False.
    """
    special_child_labels = ('_SPECIAL_ARGUMENT', 'SPECIAL_ARGUMENT')

    # Work on copies so the caller's lists survive the call.
    slots = list(operator_inputs)
    children = list(children_types)
    cursor = 0  # index of the next child that has not been consumed yet

    if _debug: print(slots)

    for position, operator_input in enumerate(slots):
        if _debug: print(f"operator input: {operator_input}, remain inputs: {len(slots) - position - 1}" )

        if operator_input == '_SPECIAL_ARGUMENT':
            # Optional slot: consume the child only if it really is a special argument.
            if cursor < len(children) and children[cursor] in special_child_labels:
                if _debug:
                    print(operator_input)
                    print(children[cursor])
                cursor += 1
            continue

        if cursor >= len(children):
            # A required slot has no child left to fill it.
            return False

        children_type = children[cursor]
        cursor += 1

        if operator_input == "MATRIX" and children_type == "NUMBER":
            # A scalar is accepted wherever a matrix is expected.
            continue

        if operator_input != children_type:
            if _debug: print(operator_input ,children_type, operator_input == children_type)
            return False

    return True


def sanity_checker(exp, _debug=False):
    """Statically check whether an expression string is well-typed.

    The expression is parsed with ``tree_node``; every operator node's children
    are checked against the operator's declared inputs via ``check_input``; and
    the root must evaluate to ``"MATRIX"``. Operator and datafield metadata come
    from the module-level ``operators`` and ``datafields`` mappings.

    Fix relative to the previous version: the debug branch called
    ``return_type`` without its ``operators``/``datafields`` arguments, which
    raised and made every expression fail whenever ``_debug`` was true. Each
    operator is now evaluated once and the result reused for the debug print.

    Args:
        exp: Expression text to validate.
        _debug: When true, print the per-operator check result.

    Returns:
        True when all operator inputs match and the root type is ``"MATRIX"``.
        False otherwise, including when any exception is raised (for example an
        unknown operator or datafield); the exception text is printed.
    """
    try:
        exp_tree = tree_node(exp)

        for node in exp_tree.collect_all_nodes():
            if node.node_type != "operator":
                continue

            # NOTE(review): eval() runs text loaded from operators_list.json; this
            # is only safe while that file is trusted. Consider ast.literal_eval
            # if the stored value is always a plain list literal - confirm.
            expected_inputs = eval(operators[node.value]['input'])
            child_types = [return_type(x, operators, datafields) for x in node.children]

            inputs_ok = check_input(expected_inputs, child_types)
            if not inputs_ok:
                return False
            if _debug: print(inputs_ok)

        return return_type(exp_tree, operators, datafields) == "MATRIX"

    except Exception as e:
        print(f"err: {e}")
        return False

## Sanity check 예시

In [22]:
sanity_checker('bucket(rank(star_si_country_rank), range="0,100,10")') # malformed expression; expected result: False

False

In [21]:
sanity_checker('subtract(star_si_shortsqueeze_rank, shrt2_t3m_volatility_rank)') # well-formed expression, intended to return True. NOTE(review): the recorded run printed "err: 'star_si_shortsqueeze_rank'" and returned False - presumably that field is missing from the loaded datafields JSON; confirm

err: 'star_si_shortsqueeze_rank'


False

# 탐색 데이터셋 설정
여기서는 데이터셋 카테고리별로 User Count가 가장 높은 순으로 정렬해서 탐색

In [None]:
s = ace.start_session()

region = "USA"
universe = "TOP3000"

datafields = llm.import_json(f'datafield/1/{region}/{universe}/1_{region}_{universe}_total.json')
operators = llm.import_json('operators_list.json')

# Sort once (the earlier version sorted twice with identical keys), highest
# category_id / userCount first, then keep one row per dataset id.
datasets = (
    ace.get_datasets(s, region=region, universe=universe)
    .sort_values(['category_id', 'userCount'], ascending=False)
    .drop_duplicates(['id'])
)

# NOTE(review): userCounts is filled but not read anywhere in the visible cells;
# it is kept in case a later cell uses it.
userCounts = []
DatasetNames = []

categories = datasets.category_id.unique()
for category in categories:
    category_datasets = datasets[datasets.category_id == category]
    userCounts.append(category_datasets.userCount.reset_index(drop=True).map(lambda x: int(x) if x>0 else x))
    DatasetNames.append(category_datasets.id.reset_index(drop=True))

# One column per category; row k holds each category's k-th dataset in the sorted order.
DatasetNamesDF = pd.DataFrame(DatasetNames).T
DatasetNamesDF.columns = categories

# Create the output folder first so to_csv cannot fail on a missing directory.
dataset_csv_dir = f'./datasets/1/{region}/{universe}'
os.makedirs(dataset_csv_dir, exist_ok=True)
DatasetNamesDF.to_csv(f'{dataset_csv_dir}/1_{region}_{universe}_dataset.csv')

# Flatten row by row (rank-major order across categories), skipping NaN padding.
# 'pv1' is excluded with a filter so a listing without it does not raise
# ValueError the way list.remove() did.
datasets_list = [
    name
    for row in DatasetNamesDF.values
    for name in row
    if not pd.isna(name) and name != 'pv1'
]

# Keep only datasets whitelisted for this region.
import AAF
allowed_datasets = AAF.ALLOWED_DATASETS.get(region, [])
datasets_list = [d for d in datasets_list if d in allowed_datasets]

# Full metadata row per dataset id, used later to size the generation runs.
datasets_dict = {row['id']: row.to_dict() for _, row in datasets.iterrows()}

# Show the first five as an example.
datasets_list[:5]

## 시뮬레이션 하기에 앞서...
* 유저카운트와 필드카운트에 따라 몇 번 expression을 생성하고 돌릴지 결정
* llm으로 만드는 알파들을 따로 json으로 저장
* csv 로드하고 저장하는 함수

In [24]:
# 유저카운트와 필드카운트로 몇번 expression을 생성하고 돌릴지 결정 
# 한번에 100개씩 generate하니까 10은 알파를 1000개 만들어서 돌리겠다는 뜻
def datset_to_simnum(dataset):
    """Decide how many generation rounds to run for a dataset.

    The score is ``int(log10(userCount + 1) + fieldCount)``, read from the
    module-level ``datasets_dict``. It is mapped to a round count through fixed
    tiers: <=3 -> 1, <=10 -> 2, <=50 -> 3, <=100 -> 10, otherwise 20.
    Per the notebook's note, each round generates about 100 alphas.

    Args:
        dataset: Dataset id used as the key into ``datasets_dict``.

    Returns:
        The number of generation rounds as an int.
    """
    info = datasets_dict[dataset]
    score = int(np.log10(info['userCount'] + 1) + info['fieldCount'])

    # (inclusive upper bound of the score, rounds to run)
    tiers = ((3, 1), (10, 2), (50, 3), (100, 10))
    for upper_bound, rounds in tiers:
        if score <= upper_bound:
            return rounds
    return 20


# llm으로 알파 리스트를 만들면 json도 저장해주기
def get_json_num(dataset):
    """Return the next free batch index for ``dataset`` under ``./gen_json/``.

    Only files named exactly ``{dataset}_{k}.json`` are considered, the same
    rule ``get_next_json_index`` uses. The previous substring test let a
    dataset such as ``risk7`` pick up ``risk70_3.json``, and any stray file
    containing the name crashed the ``int()`` parse.

    Args:
        dataset: Dataset id.

    Returns:
        ``max(k) + 1`` over the matching files, or 0 when there are none or the
        directory does not exist.
    """
    import re  # local import: `re` is only imported in a later cell

    gen_dir = './gen_json/'
    if not os.path.isdir(gen_dir):
        return 0

    pattern = re.compile(rf"^{re.escape(dataset)}_(\d+)\.json$")
    matches = (pattern.match(name) for name in os.listdir(gen_dir))
    indices = [int(m.group(1)) for m in matches if m]

    return max(indices, default=-1) + 1

# csv 로드하고 저장하는 함수
def load_csv(path: str) -> pd.DataFrame:
    """Load a results CSV, tolerating a missing or empty file.

    Args:
        path: Location of the CSV file.

    Returns:
        The file's contents with any leftover ``Unnamed: N`` columns removed
        (these come from files that were once saved with their index), or an
        empty DataFrame when the file does not exist or has no parsable
        content, e.g. a zero-byte file left by an interrupted write.
    """
    if not os.path.exists(path):
        return pd.DataFrame()

    try:
        df = pd.read_csv(path)  # default index_col=None: no column is used as the index
    except pd.errors.EmptyDataError:
        # Treat an existing-but-empty file like a missing one instead of
        # failing every later append.
        return pd.DataFrame()

    return df.loc[:, ~df.columns.str.match(r'^Unnamed: \d+$')]

def save_csv(df: pd.DataFrame, path: str) -> None:
    """Write ``df`` to ``path`` as CSV without the index column.

    The parent directory is created when missing, so the first save into a new
    results folder does not fail.

    Args:
        df: Frame to persist.
        path: Destination file path.
    """
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    df.to_csv(path, index=False)

def append_and_save_csv(path: str, new_rows: pd.DataFrame) -> pd.DataFrame:
    """Append ``new_rows`` to the CSV at ``path`` and write the result back.

    Args:
        path: CSV file to extend; read with ``load_csv``.
        new_rows: Rows to add after the existing content.

    Returns:
        The combined frame that was written, with a fresh 0..n-1 index.
    """
    existing = load_csv(path)
    combined = pd.concat([existing, new_rows], ignore_index=True)  # renumber the index
    save_csv(combined, path)  # written without the index column
    return combined

# 시뮬레이션 돌리기
* 데이터셋에 따라 정해진 횟수에 맞추어 시뮬레이션 돌리기
* 원한다면 datasets_list를 조작하여 원하는 데이터셋에 맞추어 돌릴 수 있음

In [25]:
# ============================================================
# 시뮬레이션 돌리기 (Resume 지원 버전)
# - 이미 생성된 gen_json은 스킵하고 부족한 배치만 생성
# - 중간에 끊겨도 다시 실행하면 이어서 진행
# ============================================================

import os
import json
import re

# ------------------------------------------------------------
# Resume 지원 함수들
# ------------------------------------------------------------

def get_existing_json_count(dataset, gen_dir="./gen_json"):
    """Count the generated batches already stored for ``dataset``.

    A file counts when its name is exactly ``{dataset}_{digits}.json``. When
    ``gen_dir`` does not exist it is created and 0 is returned.

    Args:
        dataset: Dataset id.
        gen_dir: Directory holding the generated JSON batches.

    Returns:
        Number of matching files in ``gen_dir``.
    """
    if os.path.exists(gen_dir):
        name_rule = re.compile(rf"^{re.escape(dataset)}_(\d+)\.json$")
        return sum(1 for entry in os.listdir(gen_dir) if name_rule.match(entry))

    os.makedirs(gen_dir, exist_ok=True)
    return 0


def get_next_json_index(dataset, gen_dir="./gen_json"):
    """Return the index to use for the next ``{dataset}_{k}.json`` batch file.

    When ``gen_dir`` does not exist it is created and 0 is returned.

    Args:
        dataset: Dataset id.
        gen_dir: Directory holding the generated JSON batches.

    Returns:
        One more than the largest ``k`` among files named exactly
        ``{dataset}_{k}.json``; 0 when no such file exists.
    """
    if not os.path.exists(gen_dir):
        os.makedirs(gen_dir, exist_ok=True)
        return 0

    name_rule = re.compile(rf"^{re.escape(dataset)}_(\d+)\.json$")
    hits = (name_rule.match(entry) for entry in os.listdir(gen_dir))
    used_indices = [int(hit.group(1)) for hit in hits if hit]

    # default=-1 makes an empty directory yield index 0.
    return max(used_indices, default=-1) + 1


def save_json_safe(data, filepath):
    """Write ``data`` as JSON to ``filepath`` without overwriting an existing file.

    The payload is serialised in memory first, so a serialisation error cannot
    leave a truncated file that a later resume would count as a finished batch.
    The file is then opened in exclusive-create mode, which closes the gap
    between "does it exist?" and "open for writing" that the previous
    check-then-open version had.

    Args:
        data: JSON-serialisable object.
        filepath: Destination path.

    Returns:
        True when the file was written, False when it already existed.

    Raises:
        TypeError / ValueError: propagated from ``json.dumps`` when ``data``
            cannot be serialised; no file is created in that case.
    """
    if os.path.exists(filepath):
        print(f"[WARN] File already exists, skipping: {filepath}")
        return False

    payload = json.dumps(data, ensure_ascii=False, indent=2)

    try:
        # 'x' fails if another writer created the file since the check above.
        with open(filepath, 'x', encoding='utf-8') as f:
            f.write(payload)
    except FileExistsError:
        print(f"[WARN] File already exists, skipping: {filepath}")
        return False
    return True


# ------------------------------------------------------------
# 메인 루프 (Resume 지원)
# ------------------------------------------------------------

GEN_DIR = "./gen_json"
RESULTS_DIR = "./results"
os.makedirs(GEN_DIR, exist_ok=True)
# Result CSVs are appended under RESULTS_DIR. Create it up front: a missing
# folder would otherwise make every save fail inside the broad except below,
# after the simulations have already been run.
os.makedirs(RESULTS_DIR, exist_ok=True)

# NOTE(review): the [1:] slice skips the first entry of datasets_list - confirm
# that this is intentional.
for dataset in datasets_list[1:]:

    # Target number of generation rounds for this dataset.
    target_simnum = datset_to_simnum(dataset)

    # Rounds already stored on disk.
    existing_count = get_existing_json_count(dataset, GEN_DIR)

    # Rounds still to generate.
    remaining = target_simnum - existing_count

    # File index to use for the next stored round.
    start_index = get_next_json_index(dataset, GEN_DIR)

    print(f"\n{'='*60}")
    print(f"[DATASET] {dataset}")
    print(f"  existing={existing_count}  target={target_simnum}  remaining={remaining}  start_index={start_index}")
    print(f"{'='*60}")

    # Nothing left to do for this dataset.
    if remaining <= 0:
        print(f"[SKIP] dataset={dataset} already complete ({existing_count}/{target_simnum})")
        continue

    dataset_category = datasets_dict[dataset]['category_id']
    batch_name = f"{region}_{universe}_{dataset_category}"
    results_path = f"{RESULTS_DIR}/{batch_name}.csv"

    # Generate only the missing rounds.
    for i in range(remaining):

        current_index = start_index + i
        save_path = f"{GEN_DIR}/{dataset}_{current_index}.json"

        # Re-check just before generating, in case the file appeared meanwhile.
        if os.path.exists(save_path):
            print(f"[SKIP] {save_path} already exists, skipping this batch")
            continue

        print(f"\n--- [{dataset}] Batch {i+1}/{remaining} (index={current_index}) ---")
        print("--- Generating alphas with LLM ---")

        try:
            # Ask the LLM for a batch of alpha ideas.
            response = llm.generate_expressions_from_dataset(
                s, region, universe, dataset,
                model='gpt-5-mini-2025-08-07',
                datafields_num_cap=500
            )
        except Exception as e:
            print(f"[ERROR] LLM generation failed: {e}")
            continue

        print("--- Received response ---")

        try:
            json_response = llm.cut_first_to_last_brace(response)
            alphas = json.loads(json_response)
            # The code below calls alphas.get(...). Reject any other JSON shape
            # here so it is neither stored as a finished batch nor allowed to
            # abort the whole run with an uncaught AttributeError.
            if not isinstance(alphas, dict):
                raise ValueError(f"expected a JSON object, got {type(alphas).__name__}")
            print(f"--- Alpha generated successfully ---")
        except Exception as e:
            print(f"[ERROR] JSON parsing failed: {e}")
            continue

        # Persist the raw batch; an existing file is never overwritten.
        if not save_json_safe(alphas, save_path):
            print(f"[WARN] Could not save {save_path}, skipping simulation")
            continue

        print(f"[SAVED] {save_path}")

        # ------------------------------------------------------------
        # Run the simulations
        # ------------------------------------------------------------

        # Keep only well-formed entries whose expression passes the sanity check.
        alpha_expressions = [
            x for x in alphas.get("results", [])
            if isinstance(x, dict) and sanity_checker(x.get('implementation', ''), _debug=False)
        ]

        if len(alpha_expressions) == 0:
            print(f"[WARN] No valid alphas after sanity check")
            continue

        alpha_list = [
            ace.generate_alpha(
                regular=x['implementation'],
                region=region,
                universe=universe,
                neutralization="INDUSTRY"
            )
            for x in alpha_expressions
        ]

        len_alpha_list = len(alpha_list)
        print(f"[INFO] eligible_alpha_count: {len_alpha_list}")

        tags_list = [
            ['llm_test', f"confidence_{x.get('confidence_level', 0)}"]
            for x in alpha_expressions
        ]
        descs_list = [x.get('description', '') for x in alpha_expressions]

        # Simulate in chunks of 8.
        for rep in range(0, len_alpha_list, 8):

            # Start a new session when the current one is close to timing out.
            if ace.check_session_timeout(s) < 500:
                print("[INFO] Session timeout approaching, refreshing...")
                s = ace.start_session()

            batch_end = min(rep + 8, len_alpha_list)

            try:
                print(f"--- Simulating alpha batch {rep} to {batch_end} ---")

                sim_results = list(
                    ace.multi_simulate_alphas_map(
                        s,
                        alpha_list[rep:batch_end],
                        tags_list[rep:batch_end],
                        descs_list[rep:batch_end],
                        batch_end - rep
                    )
                )

                # Only results that carry an 'id' are kept.
                df = pd.DataFrame([
                    ace.flatten_dict(x)
                    for x in sim_results
                    if 'id' in x.keys()
                ])

                if len(df) > 0:
                    append_and_save_csv(results_path, df)
                    print(f"[SAVED] {len(df)} results to {results_path}")

            except Exception as e:
                print(f"[ERROR] Simulation failed: {e}")
                continue

print("\n" + "="*60)
print("[DONE] All datasets processed")
print("="*60)


[DATASET] shortinterest43
  existing=3  target=3  remaining=0  start_index=3
[SKIP] dataset=shortinterest43 already complete (3/3)

[DATASET] sentiment1
  existing=3  target=3  remaining=0  start_index=3
[SKIP] dataset=sentiment1 already complete (3/3)

[DATASET] risk70
  existing=17  target=20  remaining=3  start_index=17

--- [risk70] Batch 1/3 (index=17) ---
--- Generating alphas with LLM ---
[ERROR] LLM generation failed: The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable

--- [risk70] Batch 2/3 (index=18) ---
--- Generating alphas with LLM ---


KeyboardInterrupt: 

# LLM 함수 해설

In [None]:
import random
# def generate_expressions_from_dataset(s, alpha_region, alpha_universe, dataset_id, model = 'gpt-5-mini-2025-08-07', datafields_num_cap = 500, alpha_num = 100 ):

# Inputs, mirroring the parameters of generate_expressions_from_dataset
alpha_region = "USA"
alpha_universe = "TOP3000"
dataset_id = "other463"
datafields_num_cap = 500
alpha_num = 100

# Fetch the dataset's datafields and drop GROUP-type fields.
data_fields = ace.get_datafields(s, region=alpha_region, universe=alpha_universe, dataset_id=dataset_id, data_type='ALL')
data_fields = data_fields[data_fields['type'] != "GROUP"]

# One dict per datafield row.
data_fields_list = data_fields.to_dict('records')

# Keep only the attributes the prompt needs, to reduce token usage.
keys = ['id','description','subcategory','coverage','userCount','alphaCount']
data_fields_inputs = [{k:d[k] for k in keys if k in d} for d in data_fields_list]


# Cap the number of datafields sent to the model. The sample is unseeded, so it
# differs between runs.
if len(data_fields_inputs) > datafields_num_cap:
    data_fields_inputs = random.sample(data_fields_inputs, datafields_num_cap)


# Operators are prepared the same way. The frame is bound to its own name so
# this walkthrough does not overwrite the global `operators` mapping that
# sanity_checker reads.
operators_frame = ace.get_operators(s)
operators_list = operators_frame[operators_frame['scope']=='REGULAR'].to_dict('records')

# `rank` is excluded because the LLM overuses it.
operator_exclude = ['rank']
operators_list = [x for x in operators_list if x['name'] not in operator_exclude]


answer_form = '''{
    "results":[
        {
            "idea":...,
            "description":...,
            "implementation":...,
            "confidence_level":...
        }
    ]
}'''

# Rule 2 names "confidence_level" as the fourth part (it previously said
# "target"), matching rule 6 and the answer form above.
prompt = f"""
<MISSION>
Based on the OPERATORS and DATA below,
1. SUGGEST {alpha_num} SEPARATED and DIVERSIFIED Alpha ideas which can create excess return in the market
2. Divide your idea into 4 parts, "idea", "description", "implementation", and "confidence_level".
3. "idea" must contain the core idea of the alpha.
4. "description" must include how you considered SUGGESTIONS and KEEP_IN_MINDs, and how much you set the confidence of this alpha. Consider SUGGESTIONS and write the reason you applied or didn't apply each of the SUGGESTIONS.
5. "implementation" must contain implementation of the variation, including specific template (in OPERATOR(MATRIX) form) or exact name of datafield.
6. "confidence_level" should be numerical value set in description, in 0-1 scale. Bigger value means higher confidence of this alpha idea.
Very new user can understand the idea and how to implement this idea.
</MISSION>

<SUGGESTIONS>
SUGGESTION1: Actively use MULTIPLE datafields in DATA. Your main mission is finding great combinations of datafields, among infinite combiations of datafields.
SUGGESTION2: Identify VECTOR type datafiled and wrap it with vec_avg() or vec_sum() operator.
SUGGESTION3: If the datafield's coverage is lower than 0.6, try using ts_backfill() to preprocess the data.
SUGGESTION4: datafields' userCount and alphaCount are the count of users and alphas submitted. Try to use high user and alphaCount so that you can catch the signal easily.
</SUGGESTIONS>

<KEEP_IN_MIND>
KEEP_IN_MIND1: Final implementation MUST NOT be too long.
KEEP_IN_MIND2: Final implementation MUST NOT contain over 7 operators and over 2 datafields.
KEEP_IN_MIND3: You CANNOT use datafiled outside of the datafield lists
KEEP_IN_MIND4: You CANNOT use type=GROUP field by itself. You need to use it as “group” parameter in Group operator.
</KEEP_IN_MIND>

<OPERATORS>
You can use those operators: {operators_list}
</OPERATORS>

<DATA>
And data: {data_fields_inputs}
</DATA>

<ANSWER_FORMAT>
You must answer in this form 
{answer_form}
</ANSWER_FORMAT>
""".strip()


print(prompt)

<MISSION>
Based on the OPERATORS and DATA below,
1. SUGGEST 100 SEPARATED and DIVERSIFIED Alpha ideas which can create excess return in the market
2. Divide your idea into 4 parts, "idea", "description", "implementation", and "target".
3. "idea" must contain the core idea of the alpha.
4. "description" must include how you considered SUGGESTIONS and KEEP_IN_MINDs, and how much you set the confidence of this alpha. Consider SUGGESTIONS and write the reason you applied or didn't apply each of the SUGGESTIONS.
5. "implementation" must contain implementation of the variation, including specific template (in OPERATOR(MATRIX) form) or exact name of datafield.
6. "confidence_level" should be numerical value set in description, in 0-1 scale. Bigger value means higher confidence of this alpha idea.
Very new user can understand the idea and how to implement this idea.
</MISSION>

<SUGGESTIONS>
SUGGESTION1: Actively use MULTIPLE datafields in DATA. Your main mission is finding great combinations 