In [13]:
import re
import pandas as pd
import numpy as np
from pathlib import Path
import glob
import datetime
import random
from tqdm.notebook import tqdm
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Import necessary numpy functions
from numpy import log, sqrt, abs as np_abs, negative as np_negative

def parse_log_file(log_file_path):
    with open(log_file_path, 'r') as file:
        content = file.read()
    
    program_blocks = re.findall(r'--- New Best Program.*?Program: (.*?)\nTest Set Metrics:', content, re.DOTALL)
    return program_blocks

def load_stock_data(data_folder, sample_percentage=0.1):
    all_files = glob.glob(str(Path(data_folder) / "*.parquet"))
    sample_size = max(1, int(len(all_files) * sample_percentage))
    sampled_files = random.sample(all_files, sample_size)
    df_list = []
    
    for file in sampled_files:
        df = pd.read_parquet(file)
        if 'Date' not in df.columns and df.index.name == 'Date':
            df = df.reset_index()
        if 'Date' in df.columns:
            df['Date'] = pd.to_datetime(df['Date'])
            df_list.append(df)
    
    if not df_list:
        raise ValueError("No valid data files found.")
    
    combined_df = pd.concat(df_list, axis=0, ignore_index=True)
    
    if 'Date' not in combined_df.columns:
        raise KeyError("None of the dataframes have 'Date' as a column or index.")
    
    return combined_df.set_index('Date')

def create_lagged_features(df, max_lag=7):
    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        for t in range(1, max_lag + 1):
            df[f'{col}{t}'] = df[col].shift(t)
    return df.dropna()

def safe_div(a, b):
    return np.where(b != 0, a / b, 0)

def evaluate_formula(formula, df):
    # Replace function names with numpy equivalents
    formula = formula.replace('add', 'np.add')
    formula = formula.replace('sub', 'np.subtract')
    formula = formula.replace('mul', 'np.multiply')
    formula = formula.replace('div', 'safe_div')
    formula = formula.replace('abs', 'np_abs')
    formula = formula.replace('neg', 'np_negative')
    formula = formula.replace('log', 'np.log')
    formula = formula.replace('sqrt', 'np.sqrt')
    
    # Replace column names
    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        formula = re.sub(rf'\b{col}\b', f"df['{col}']", formula)
        for t in range(1, 8):
            formula = re.sub(rf'\b{col}\(t-{t}\)\b', f"df['{col}{t}']", formula)
            formula = re.sub(rf'\b{col}{t}\b', f"df['{col}{t}']", formula)
    
    try:
        result = eval(formula)
        if isinstance(result, np.ndarray):
            result = pd.Series(result, index=df.index)
        return result
    except Exception as e:
        print(f"Error evaluating formula: {formula}")
        print(f"Error message: {str(e)}")
        return None

def calculate_metrics(y_true, y_pred):
    mask = np.isfinite(y_true) & np.isfinite(y_pred)
    y_true, y_pred = y_true[mask], y_pred[mask]
    
    if len(y_true) == 0:
        return None
    
    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    correlation = np.corrcoef(y_true, y_pred)[0, 1]
    direction_accuracy = np.mean((np.sign(y_true) == np.sign(y_pred)))
    
    return {
        'MSE': mse,
        'MAE': mae,
        'Correlation': correlation,
        'Direction Accuracy': direction_accuracy
    }

def select_extreme_predictions(y_true, y_pred, percentage=10):
    threshold = np.percentile(np.abs(y_pred), 100 - percentage)
    extreme_mask = np.abs(y_pred) >= threshold
    return y_true[extreme_mask], y_pred[extreme_mask]

def main(log_file_path, data_folder, output_file, sample_percentage=0.2):
    formulas = parse_log_file(log_file_path)
    df = load_stock_data(data_folder, sample_percentage)
    df = create_lagged_features(df)
    
    best_formulas = []
    best_metrics = None
    
    for formula in tqdm(formulas, desc="Evaluating formulas", ncols=100, leave=True):
        result = evaluate_formula(formula, df)
        if result is not None:
            y_true, y_pred = select_extreme_predictions(df['Close'].pct_change().shift(-1).dropna(), result.dropna())
            metrics = calculate_metrics(y_true, y_pred)
            if metrics and (best_metrics is None or metrics['Direction Accuracy'] > best_metrics['Direction Accuracy']):
                best_metrics = metrics
                best_formulas = [(formula, metrics)]
            elif metrics and metrics['Direction Accuracy'] == best_metrics['Direction Accuracy']:
                best_formulas.append((formula, metrics))
    
    with open(output_file, 'w') as f:
        for formula, metrics in best_formulas:
            f.write(f"Formula: {formula}\n")
            f.write("Metrics:\n")
            for metric, value in metrics.items():
                f.write(f"  {metric}: {value:.6f}\n")
            f.write("\n")
    
    print(f"Best formulas saved to {output_file}")

if __name__ == "__main__":
    log_file_path = "Results/best_programs.txt"
    data_folder = "Data/PriceData"
    output_file = "selected_best_formulas.txt"
    main(log_file_path, data_folder, output_file)


ImportError: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html