In [None]:
import pandas as pd 
import numpy as np 

In [None]:
high_corr_dict = {}
threshold = 0.4 # 상관관계 임계값 설정 (원하는 값으로 수정 가능)

for partition_num in range(0,10):
    file_path = f"feature_corr/corr_csv/{partition_num}.csv"
    corr = pd.read_csv(file_path, index_col=0)
    
    # 상관관계가 threshold보다 높은 경우만 처리
    high_corr = corr[abs(corr['correlation']) > threshold]  # correlation 열의 절댓값이 threshold보다 큰 행만 필터링
    
    for pair in (high_corr['var1'] + "-" + high_corr['var2']).values:
        if pair in high_corr_dict.keys():
            high_corr_dict[pair] += 1
        else:
            high_corr_dict[pair] = 1

In [None]:
high_corr_df = pd.DataFrame([high_corr_dict], index=["count"]).T
high_corr_df.sort_values(by="count", ascending=False).head()

In [None]:
# 몇개의 partition에서 threshold를 넘긴 feature 쌍이 존재하는 지 확인
high_corr_df.value_counts()

In [None]:
import networkx as nx
from community.community_louvain import best_partition  # 변경된 import 방식

# 상관관계가 높은 feature pair 데이터를 네트워크 그래프로 변환
def create_feature_clusters(df_high_corr):
    # 네트워크 그래프 생성
    G = nx.Graph()
    
    # DataFrame의 인덱스(feature pair)를 순회하며 엣지 추가
    for pair in df_high_corr.index:
        feat1, feat2 = pair.split('-')
        # 가중치는 해당 pair가 발견된 횟수(count)
        weight = df_high_corr.loc[pair, 'count']
        G.add_edge(feat1, feat2, weight=weight)
    
    # Louvain 방법을 사용하여 커뮤니티(클러스터) 탐지
    clusters = best_partition(G)
    

    
    # 클러스터별 feature 목록 반환
    cluster_groups = {}
    for node, cluster_id in clusters.items():
        if cluster_id not in cluster_groups:
            cluster_groups[cluster_id] = []
        cluster_groups[cluster_id].append(node)
    
    return cluster_groups, G

# 클러스터링 실행
cluster_groups, graph = create_feature_clusters(high_corr_df[high_corr_df['count']==10])

# 클러스터 결과 출력
for cluster_id, features in cluster_groups.items():
    print(f"\nCluster {cluster_id}:")
    print(", ".join(features))

In [None]:
def get_representative_features():
   """
   각 클러스터에서 가장 작은 feature 번호를 대표값으로 반환하고
   제거할 feature 목록을 반환하는 하드코딩된 함수
   """
   # 대표 feature 목록 (각 클러스터에서 가장 작은 번호)
   representative_features = [
       'feature_73',  # Cluster 0 
       'feature_12',  # Cluster 1
       'feature_15',  # Cluster 2
       'feature_05',  # Cluster 3 
       'feature_32',  # Cluster 5
       'feature_42',  # Cluster 6
       'feature_39',  # Cluster 7
       'feature_24',  # Cluster 8
       'feature_50',  # Cluster 4
       'feature_09',  # Cluster 11
       'feature_22',  # Cluster 12
       'responder_4', # Cluster 9
       'feature_48'   # Cluster 10
   ]

   # 제거할 feature 목록
   features_to_remove = [
       # Cluster 0
       'feature_74', 'feature_75', 'feature_76', 'feature_77', 'feature_78',
       
       # Cluster 1
       'feature_13', 'feature_14', 'feature_67', 'feature_68', 'feature_69', 
       'feature_70', 'feature_71', 'feature_72',
       
       # Cluster 2
       'feature_16', 'feature_17',
       
       # Cluster 3
       'feature_06', 'feature_07', 'feature_08', 'feature_18', 'feature_19',
       'feature_37', 'feature_38', 'feature_45', 'feature_46', 'feature_56',
       'feature_57', 'feature_58', 'feature_65', 'feature_66',
       
       # Cluster 5
       'feature_34', 'feature_35', 'feature_61',
       
       # Cluster 6
       'feature_44',
       
       # Cluster 7
       'feature_41',
       
       # Cluster 8
       'feature_25',
       
       # Cluster 4
       'feature_52', 'feature_53', 'feature_55', 'feature_59', 'feature_60',
       
       # Cluster 11
       'feature_11',
       
       # Cluster 12
       'feature_23',
       
       # Cluster 9
       'responder_7',
       
       # Cluster 10
       'feature_49'
   ]
   
   return representative_features, features_to_remove

# 함수 사용 예시
rep_features, remove_features = get_representative_features()

print("대표 Features:")
print(sorted(rep_features))
print("\n제거할 Features:")
print(sorted(remove_features))
print(f"\n대표 Features 개수: {len(rep_features)}")
print(f"제거할 Features 개수: {len(remove_features)}")

In [None]:
df = pd.read_parquet('jane-street-real-time-market-data-forecasting/train.parquet/partition_id=6/part-0.parquet')

In [None]:
result_df = df.drop(remove_features, axis=1) # 대표 feature에 해당하지 않고 클러스터에 속한 feature drop

In [None]:
feature_col = result_df.columns[result_df.columns.str.contains("feature") | result_df.columns.str.contains("id")] # 데이터 프레임에서 feature와 symbol, time, date 필터링
responder_col = result_df.columns[result_df.columns.str.contains("responder")] # target column 필터링
rolling_feature_col = [col for col in feature_col if col not in (['date_id', 'time_id', 'symbol_id']+responder_col.to_list())] # moving average를 구할 대표 feature만 필터링

In [None]:
print("# of feature_col:", feature_col.shape[0])
print("# of rolling_feature_col:", len(rolling_feature_col))

--------------

아래함수는 Moving Average를 생성하는 함수입니다. 

In [None]:
def create_rolling_feature(data, rolling_window_size:list, rolling_feature_col:list, symbols) -> pd.DataFrame:    
    """
    data를 인풋으로 받아서 각 symbol 별로 rolling_window_size에 해당하는 window size를 갖는 Moving Average를 계산하는 함수입니다. 
    
    data: 데이터가 담긴 DataFrame
    rolling_window_size: Moving Average를 계산할 window size  
    rolling_faeture_col: Moving Average를 계산할 Feautre (그냥 Data에 넘기는 방법으로 코드를 이쁘게 작성할 수 있을 듯 합니다)
    symbols: 데이터에 존재하는 symbol (이부분도 그냥 data에 접근해서 얻을 수 있는 부분입니다 | 다만, 메모리 때문에 이렇게 구현했습니다.)
    """
    # 결과를 저장할 빈 리스트
    result_chunks = []
    
    # 심볼별로 순차 처리
    for symbol in symbols:
        print(f"Complete Symbol {symbol}")
        # 1. 필요한 컬럼만 선택하여 메모리 사용 최소화
        symbol_data = data.loc[data['symbol_id'] == symbol,].copy().ffill().bfill()
        
        # 2. 각 window size별로 처리
        for window in rolling_window_size:
            # 새로운 컬럼명
            new_cols = [f'{col}_rolling_{window}' for col in rolling_feature_col]
            
            # 3. 한 번에 하나의 feature만 처리
            for feat, new_col in zip(rolling_feature_col, new_cols):
                symbol_data[new_col] = (symbol_data[feat]
                                      .rolling(window=window)
                                      .mean()
                                      .ffill()
                                      .bfill())
        
        # 4. 처리된 청크 저장
        result_chunks.append(symbol_data)
        
        # 5. 메모리에서 불필요한 데이터 명시적 제거
        del symbol_data
    
    # 6. 청크 병합 및 반환
    return pd.concat(result_chunks, axis=0, ignore_index=True)

In [None]:
def create_rolling_feature_slice_concat_with_save(data, rolling_window_size, rolling_feature_col, symbols_num=5, save_path='temp_results'):
    """
    바로 위에 있는 함수랑 기능은 똑같은데 메모리 때문에 symbol을 5개씩 처리하는 함수입니다. 
    추가적으로 이함수는 save까지 진행합니다. 
    없애도 될 것 같아요. 
    """
    
    import os
    import gc
    import warnings
    warnings.filterwarnings('ignore')
    
    # 임시 저장 디렉토리 생성
    os.makedirs(save_path, exist_ok=True)
    
    symbol_iter = data['symbol_id'].unique().shape[0] // symbols_num
    
    for i in range(symbol_iter):
        print(f"\nProcessing iteration {i+1}/{symbol_iter}")
        
        symbols = data.symbol_id.unique()[i*symbols_num:(i+1)*symbols_num]
        
        result = create_rolling_feature(
            data=data,
            rolling_window_size=rolling_window_size,
            rolling_feature_col=rolling_feature_col,
            symbols=symbols
        )
        
        # 중간 결과 저장
        result.to_parquet(f'{save_path}/chunk_{i}.parquet')
        del result
        gc.collect()
    
    # 마지막 남은 심볼들 처리
    if data.symbol_id.unique().shape[0] % symbols_num != 0:
        remaining_symbols = data.symbol_id.unique()[symbol_iter*symbols_num:]
        result = create_rolling_feature(
            data=data,
            rolling_window_size=rolling_window_size,
            rolling_feature_col=rolling_feature_col,
            symbols=remaining_symbols
        )
        result.to_parquet(f'{save_path}/chunk_final.parquet')
    

    
    return 

rolling_windows = [2, 10, 30, 50, 200, 500, 1000]
final_df = create_rolling_feature_slice_concat_with_save(
    data=result_df.reset_index(),
    rolling_window_size=rolling_windows,
    rolling_feature_col=rolling_feature_col,
    symbols_num=5
)

------------

아래 함수들은 제가 메모리에 한번에 올리지 못해서 사용한 함수입니다. <br>
쪼개져서 저장된 데이터를 합치는 역할을 합니다.

In [None]:
def concat_results_optimized(file_path, save_name='final_result.parquet'):
    """
    Polars를 사용해 parquet 파일들을 메모리 효율적으로 합치고 정렬하는 함수
    
    Parameters:
    -----------
    file_path : str
        parquet 파일들이 저장된 경로
    save_name : str
        저장할 파일명
        
    Returns:
    --------
    pd.DataFrame: 정렬된 최종 데이터프레임
    """
    import os
    import polars as pl
    import gc
    
    print("Starting concatenation...")
    
    # 해당 경로의 모든 parquet 파일 찾기
    all_files = [os.path.join(file_path, f) for f in os.listdir(file_path) 
                if f.endswith('.parquet')]
    print(f"Found {len(all_files)} parquet files")
    
    # LazyFrame으로 모든 파일 읽기
    lfs = [pl.scan_parquet(file) for file in all_files]
    
    print("\nConcatenating files...")
    # LazyFrame 상태에서 합치기
    final_lazy = pl.concat(lfs)
    
    # 정렬 및 계산 실행
    print("Sorting and computing...")
    final_df = (final_lazy
                .sort(by='index')  # 인덱스로 정렬
                .collect(streaming=True)  # 스트리밍 모드로 계산
    )
    
    return final_df

final_df = concat_results_optimized(
    file_path='temp_results',
    save_name='final_rolling_features.parquet'
)

In [None]:
import polars as pl
import os

def save_chunks_to_directory(df: pl.DataFrame, output_dir: str, chunk_size: int = 100_000, prefix: str = "chunk_"):
    """
    DataFrame의 각 청크를 별도의 parquet 파일로 저장
    
    Args:
        df: 저장할 Polars DataFrame
        output_dir: 저장할 디렉토리 경로
        chunk_size: 각 청크의 크기
        prefix: 파일명 접두사
    """
    # 디렉토리가 없으면 생성
    os.makedirs(output_dir, exist_ok=True)
    
    total_rows = df.height
    chunks_count = (total_rows + chunk_size - 1) // chunk_size  # 올림 나눗셈
    
    # 각 청크를 개별 파일로 저장
    for i in range(0, total_rows, chunk_size):
        chunk = df.slice(i, min(i + chunk_size, total_rows))
        chunk_number = i // chunk_size
        file_name = f"{prefix}{chunk_number:04d}.parquet"  # 예: chunk_0000.parquet
        file_path = os.path.join(output_dir, file_name)
        
        chunk.write_parquet(
            file_path,
            compression="snappy",
            statistics=False,
            use_pyarrow=True
        )
        
        # 진행상황 출력 (선택사항)
        print(f"저장 완료: {file_name} ({chunk_number + 1}/{chunks_count})")

# 사용 예시
# final_df를 chunks 디렉토리에 저장
save_chunks_to_directory(
    df=final_df, 
    output_dir="chunks", 
    chunk_size=1_000_000,
    prefix="final_rolling_"
)

# 나중에 모든 청크를 다시 읽을 때 사용할 수 있는 함수
def read_all_chunks(directory: str) -> pl.DataFrame:
    """
    디렉토리의 모든 parquet 파일을 읽어서 하나의 DataFrame으로 결합
    """
    # parquet 파일 목록 가져오기
    parquet_files = sorted([
        os.path.join(directory, f) 
        for f in os.listdir(directory) 
        if f.endswith('.parquet')
    ])
    
    # 모든 파일 읽어서 결합
    return pl.concat([
        pl.read_parquet(f) 
        for f in parquet_files
    ])