In [1]:
# ================================================================================
# CRITICAL MEMORY OPTIMIZATION FOR NASDAQ 100+ SCALE COMPUTATION
# ================================================================================

import os
import pandas as pd
import numpy as np
from functools import reduce
from operator import mul
from typing import Iterator, Dict, List, Any, Optional
import gc

# --- Core Memory Optimization Helpers ---

def iter_param_combinations(indicator_name: str, INDICATOR_PARAMS: Dict) -> Iterator[Dict[str, Any]]:
    """Lazily yield every parameter combination for one indicator.

    Args:
        indicator_name: Key to look up in ``INDICATOR_PARAMS``.
        INDICATOR_PARAMS: Mapping of indicator name to a mapping of
            parameter name -> iterable of candidate values.

    Yields:
        One fresh ``{parameter name: value}`` dict per element of the
        Cartesian product of the candidate values, in ``itertools.product``
        order. Nothing is yielded when the indicator is unknown.
    """
    from itertools import product

    if indicator_name in INDICATOR_PARAMS:
        grid = INDICATOR_PARAMS[indicator_name]
        names = list(grid.keys())
        # The product is consumed lazily, so the full grid is never materialised.
        for chosen in product(*grid.values()):
            yield dict(zip(names, chosen))


def count_param_combinations(indicator_name: str, INDICATOR_PARAMS: Dict) -> int:
    """Return the size of one indicator's parameter grid without expanding it.

    Args:
        indicator_name: Key to look up in ``INDICATOR_PARAMS``.
        INDICATOR_PARAMS: Mapping of indicator name to a mapping of
            parameter name -> sized collection of candidate values.

    Returns:
        The product of the candidate-list lengths, ``1`` for an indicator
        with no parameters, and ``0`` when the indicator is unknown.
    """
    if indicator_name in INDICATOR_PARAMS:
        total = 1
        for candidates in INDICATOR_PARAMS[indicator_name].values():
            total = total * len(candidates)
        return total
    return 0


def _ensure_df(results_like):
    """Coerce a list of result records to a DataFrame; pass anything else through.

    Only ``list`` instances are converted. Every other object (including an
    existing DataFrame) is returned unchanged, as the very same object.
    """
    return pd.DataFrame(results_like) if isinstance(results_like, list) else results_like


def calculate_signals_vectorized(ticker_data: pd.DataFrame, signals: pd.Series) -> Dict[str, Any]:
    """Compute performance metrics for a position series derived from ``signals``.

    The position held on each row is the signal from the *previous* row
    (``signals.shift(1)``, missing -> 0), multiplied by the row-over-row
    percentage change of ``ticker_data['Close']``.

    Args:
        ticker_data: Frame with a ``'Close'`` column.
        signals: Signal series aligned with ``ticker_data``; the code counts
            ``1`` as a buy and ``-1`` as a sell.

    Returns:
        Dict with:
          * ``total_return``  - compounded strategy return.
          * ``sharpe``        - mean/std of the strategy returns scaled by
            ``sqrt(252)`` (presumably daily bars - confirm against the data);
            ``0.0`` when the std is not greater than ``1e-8`` (or is NaN).
          * ``max_drawdown``  - most negative peak-to-trough decline of the
            equity curve, measured against a starting capital of 1.0.
          * ``win_rate``      - fraction of valid return rows that are > 0
            (flat rows count as non-wins).
          * ``total_signals`` - number of non-null entries in ``signals``
            (zeros included).
          * ``buy_signals`` / ``sell_signals`` - counts of ``1`` / ``-1``.
        When no valid return row exists, the four float metrics are NaN and
        the three counts are 0.
    """
    daily_return = ticker_data['Close'].pct_change()
    # Lag the signal by one row so a signal only affects the following row's return.
    position = signals.shift(1).fillna(0).astype('float32')
    strategy_return = (position * daily_return).astype('float32')

    valid_returns = strategy_return.dropna()
    if valid_returns.empty:
        return {
            'total_return': np.nan, 'sharpe': np.nan, 'max_drawdown': np.nan,
            'win_rate': np.nan, 'total_signals': 0, 'buy_signals': 0, 'sell_signals': 0
        }

    growth = 1.0 + valid_returns
    total_return = float(growth.prod() - 1.0)
    std = float(valid_returns.std())
    # A NaN std (single observation) fails the comparison and yields 0.0 as well.
    sharpe = float((valid_returns.mean() / std) * np.sqrt(252)) if std > 1e-8 else 0.0

    equity = growth.cumprod()
    # The curve starts from an implicit capital of 1.0. Without the clip, the
    # first observed equity value would become the initial "peak" and a loss
    # on the very first row would not register as a drawdown.
    running_max = equity.cummax().clip(lower=1.0)
    drawdown = (equity - running_max) / running_max
    max_drawdown = float(drawdown.min())

    win_rate = float((valid_returns > 0).mean())

    return {
        'total_return': total_return,
        'sharpe': sharpe,
        'max_drawdown': max_drawdown,
        'win_rate': win_rate,
        'total_signals': int(signals.notna().sum()),
        'buy_signals': int((signals == 1).sum()),
        'sell_signals': int((signals == -1).sum())
    }


def stream_results_to_csv(results: List[Dict], filename: str, mode: str = 'a') -> None:
    """Write a batch of result records to ``filename`` as CSV.

    Args:
        results: List of record dicts; an empty list is a no-op.
        filename: Target CSV path.
        mode: File mode forwarded to ``DataFrame.to_csv`` (default append).

    The header row is written unless the call appends to a file that already
    has content. In particular it is written when ``mode`` truncates the file
    (e.g. ``'w'``) and when the existing file is empty; deciding purely on
    file existence would leave such files without a header.
    """
    if not results:
        return
    df_chunk = pd.DataFrame(results)
    appending = 'a' in mode
    has_content = os.path.exists(filename) and os.path.getsize(filename) > 0
    write_header = not (appending and has_content)
    df_chunk.to_csv(filename, mode=mode, header=write_header, index=False)
    # Release the temporary frame right away; batches can be large.
    del df_chunk
    gc.collect()


# Status message emitted once all helper definitions in this cell have run.
print("🚀 MEMORY OPTIMIZATION HELPERS LOADED! Ready for NASDAQ 100+ scale.")


🚀 MEMORY OPTIMIZATION HELPERS LOADED! Ready for NASDAQ 100+ scale.


In [2]:
# ================================================================================
# INDICATOR PARAMS (CLEAN) AND SIGNAL LOGIC
# ================================================================================

# Indicators and parameter space (NO ADX/RSI/ATR/Entropy)
# Search space: indicator name -> {parameter name -> list of candidate values}.
# Every combination of the candidate values is one grid-search trial.
INDICATOR_PARAMS = {
    'KAMA': {
        'er_period': [*range(5, 8)],        # 5..7
        'fast_period': [*range(7, 12)],     # 7..11
        'slow_period': [*range(15, 25)],    # 15..24
    },
    'Supertrend': {
        'period': [*range(7, 16)],          # 7..15
        'multiplier': [1.0, 2.0],
    },
    'MFI': {
        'period': [*range(7, 15)],          # 7..14
        # Single-valued thresholds: they do not enlarge the grid.
        'overbought': [80],
        'oversold': [20],
    },
}

# Signal logic

def get_buy_signal(indicator_name: str, data: pd.Series, params: dict) -> pd.Series:
    """Turn an indicator series into a +1 / -1 / 0 signal series.

    Args:
        indicator_name: ``'KAMA'``, ``'Supertrend'`` or ``'MFI'``; any other
            name yields an all-zero series.
        data: The indicator values, indexed like the price data.
        params: Extra inputs, by indicator:
            * KAMA - crossover of a "short" line over a "long" line. The
              short line is ``params['short_kama']`` or, if absent,
              ``params['close']``; the long line is ``params['long_kama']``
              or, if absent, ``data`` itself. With no short line available
              the result is all zeros.
            * Supertrend - ``params['close']`` is compared against ``data``.
            * MFI - ``params['overbought']`` / ``params['oversold']``
              thresholds (defaults 80 / 20).

    Returns:
        Integer series on ``data.index``: ``1`` = buy, ``-1`` = sell, ``0`` = none.
    """
    if indicator_name == 'KAMA':
        # Callers in this notebook pass the price as 'close' and the KAMA line
        # as `data`; honour that instead of silently comparing against NaNs,
        # which produced an all-zero signal for every parameter set.
        short_kama = params.get('short_kama', params.get('close'))
        long_kama = params.get('long_kama', data)
        if short_kama is None or long_kama is None:
            return pd.Series(0, index=data.index)
        # A cross is confirmed only when the previous row was on the other side
        # (or touching); NaN comparisons are False, so warm-up rows stay at 0.
        up = (short_kama > long_kama) & (short_kama.shift(1) <= long_kama.shift(1))
        dn = (short_kama < long_kama) & (short_kama.shift(1) >= long_kama.shift(1))
        s = pd.Series(0, index=data.index)
        s[up] = 1
        s[dn] = -1
        return s

    if indicator_name == 'Supertrend':
        # Explicit float NaN placeholder: every comparison is False -> all zeros.
        close = params.get('close', pd.Series(np.nan, index=data.index))
        return pd.Series(np.where(close > data, 1, np.where(close < data, -1, 0)), index=data.index)

    if indicator_name == 'MFI':
        overbought = params.get('overbought', 80)
        oversold = params.get('oversold', 20)
        return pd.Series(np.where(data < oversold, 1, np.where(data > overbought, -1, 0)), index=data.index)

    return pd.Series(0, index=data.index)

# Report the grid size per indicator so the cost of a full search is visible
# before it is started.
print("✅ Clean indicator params & signal logic loaded.")
for k in INDICATOR_PARAMS:
    print(f"  {k}: {count_param_combinations(k, INDICATOR_PARAMS)} combinations")


✅ Clean indicator params & signal logic loaded.
  KAMA: 150 combinations
  Supertrend: 18 combinations
  MFI: 8 combinations


In [3]:
# ================================================================================
# OPTIMIZED GRID SEARCH FUNCTION
# ================================================================================

def run_comprehensive_grid_search_optimized(
    data: pd.DataFrame,
    indicator_params: Dict,
    indicator_class,
    max_combinations_per_indicator: int = 0,
    stream_results_dir: Optional[str] = None,
    summary_interval: int = 100
) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
    """Evaluate every indicator parameter combination for every ticker.

    Args:
        data: Frame with at least ``'Ticker'`` and ``'Close'`` columns.
        indicator_params: Indicator name -> {parameter name -> candidates}.
        indicator_class: Called as ``indicator_class(ticker_frame)``; the
            instance must provide ``kama(er, fast, slow)``,
            ``supertrend(period=..., multiplier=...)`` and ``mfi(period)``.
        max_combinations_per_indicator: Cap on evaluated combinations per
            indicator; ``0`` (or less) means all of them. Values larger than
            the grid are clamped to the grid size.
        stream_results_dir: When set, results are also written in batches to
            ``<dir>/<ticker>_<indicator>.csv``. Files are opened in append
            mode, so rows already present from earlier runs are kept.
        summary_interval: Print progress every this many combinations; values
            <= 0 disable the periodic line (the final line is still printed).

    Returns:
        ``{ticker: {indicator: [result dict, ...]}}`` where each result dict
        holds ``Ticker``, ``Indicator``, ``Parameters`` and the metrics from
        ``calculate_signals_vectorized``. Combinations that raise are logged
        and skipped.
    """
    stream_batch_size = 500  # rows buffered before a write to disk

    def _flush_batch(batch: List[Dict[str, Any]], ticker, indicator_name: str) -> None:
        """Append the buffered rows to the ticker/indicator CSV and empty the buffer."""
        os.makedirs(stream_results_dir, exist_ok=True)
        fname = os.path.join(stream_results_dir, f"{ticker}_{indicator_name}.csv")
        stream_results_to_csv(batch, fname)
        batch.clear()
        gc.collect()

    all_results: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
    tickers = pd.Index(data['Ticker']).unique().tolist()

    print("\n🚀 Starting Optimized Grid Search...")
    print(f"Processing {len(tickers)} tickers with {len(indicator_params)} indicators.")

    for ticker in tickers:
        print(f"\n📊 Processing {ticker}...")
        td = data.loc[data['Ticker'] == ticker].copy()
        td.sort_index(inplace=True)
        ind = indicator_class(td)
        all_results[ticker] = {}

        for indicator_name in indicator_params:
            total_combos = count_param_combinations(indicator_name, indicator_params)
            if total_combos == 0:
                continue
            # Clamp the cap to the real grid size: an oversized cap used to make
            # the "last combination" check unreachable, so the final progress
            # line and the final CSV flush never happened.
            if max_combinations_per_indicator > 0:
                limit = min(max_combinations_per_indicator, total_combos)
            else:
                limit = total_combos
            print(f"    🔁 {indicator_name}: evaluating {limit}/{total_combos} combinations")

            indicator_results: List[Dict[str, Any]] = []
            batch_stream: List[Dict[str, Any]] = []

            for i, params in enumerate(iter_param_combinations(indicator_name, indicator_params), start=1):
                if i > limit:
                    break
                # Guard the modulo so summary_interval=0 does not raise.
                if (summary_interval > 0 and i % summary_interval == 0) or i == limit:
                    print(f"      -> {i}/{limit} combos processed")
                try:
                    if indicator_name == 'KAMA':
                        vals = ind.kama(params['er_period'], params['fast_period'], params['slow_period'])
                        s_params = {**params, 'close': td['Close']}
                    elif indicator_name == 'Supertrend':
                        vals = ind.supertrend(period=params['period'], multiplier=params['multiplier'])
                        s_params = {**params, 'close': td['Close']}
                    elif indicator_name == 'MFI':
                        vals = ind.mfi(params['period'])
                        s_params = params
                    else:
                        continue

                    sig = get_buy_signal(indicator_name, vals, s_params)
                    metrics = calculate_signals_vectorized(td, sig)

                    entry = {'Ticker': ticker, 'Indicator': indicator_name, 'Parameters': params, **metrics}
                    indicator_results.append(entry)
                    batch_stream.append(entry)

                    if stream_results_dir and len(batch_stream) >= stream_batch_size:
                        _flush_batch(batch_stream, ticker, indicator_name)

                except Exception as e:
                    # Best effort: one bad parameter set must not abort the search.
                    print(f"    ❌ {indicator_name} params {params} failed: {e}")
                    continue

            # Flush whatever is left. Doing this after the loop (instead of on
            # the last iteration) keeps the tail batch even when the final
            # combination raised or was skipped.
            if stream_results_dir and batch_stream:
                try:
                    _flush_batch(batch_stream, ticker, indicator_name)
                except Exception as e:
                    print(f"    ❌ {indicator_name} streaming failed: {e}")

            all_results[ticker][indicator_name] = indicator_results

            if indicator_results:
                df_show = pd.DataFrame(indicator_results)
                if 'sharpe' in df_show.columns and not df_show['sharpe'].dropna().empty:
                    bi = df_show['sharpe'].idxmax()
                    br = df_show.loc[bi]
                    print(f"    ✅ Best {indicator_name}: Sharpe={br['sharpe']:.3f}, Return={br['total_return']:.3f}")

        # Drop the per-ticker frame and indicator object before the next ticker.
        del td, ind
        gc.collect()

    print("\n✅ Optimized Grid Search Complete!")
    return all_results


In [4]:
# ================================================================================
# OPTIMIZED ANALYSIS & PLOTTING
# ================================================================================

def analyze_grid_search_results_optimized(all_results: Dict[str, Dict[str, List[Dict[str, Any]]]]) -> Dict[str, Any]:
    """Summarise grid-search output by best Sharpe ratio.

    Args:
        all_results: ``{ticker: {indicator: results}}`` where ``results`` is
            a list of result dicts or a DataFrame with a ``'sharpe'`` column.

    Returns:
        Dict with three entries:
          * ``best_per_ticker``: ``{ticker: {'indicator', 'sharpe', 'params'}}``
            for the single highest-Sharpe row of each ticker. A ticker with
            no usable Sharpe value gets ``indicator=None``, ``params=None``
            and ``sharpe=NaN``.
          * ``best_indicators_overall``: ``{indicator: mean Sharpe}`` over all
            tickers and parameter sets.
          * ``summary_stats``: ticker count, indicator count and the overall
            mean Sharpe.
    """
    analysis = {'best_per_ticker': {}, 'best_indicators_overall': {}, 'summary_stats': {}}
    perf: Dict[str, List[float]] = {}
    all_sharpes: List[float] = []

    for ticker, tr in all_results.items():
        # -inf is only the starting point of the search, never a reported value.
        best_sh, best_ind, best_params = -np.inf, None, None
        for ind_name, res_list in tr.items():
            df = _ensure_df(res_list)
            if df.empty or 'sharpe' not in df.columns:
                continue
            # Non-numeric entries are coerced to NaN and dropped.
            s = pd.to_numeric(df['sharpe'], errors='coerce').dropna()
            if s.empty:
                continue
            imax = s.idxmax()
            shv = float(s.loc[imax])
            if shv > best_sh:
                best_sh = shv
                best_ind = ind_name
                best_params = df.loc[imax, 'Parameters']
            perf.setdefault(ind_name, []).extend(s.tolist())
            all_sharpes.extend(s.tolist())
        analysis['best_per_ticker'][ticker] = {
            'indicator': best_ind,
            # Report NaN rather than the -inf sentinel when nothing was found,
            # matching how missing metrics are represented elsewhere.
            'sharpe': best_sh if best_ind is not None else np.nan,
            'params': best_params,
        }

    analysis['best_indicators_overall'] = {
        k: (float(np.nanmean(v)) if v else np.nan) for k, v in perf.items()
    }
    analysis['summary_stats'] = {
        'num_tickers': len(all_results),
        'num_indicators': len(perf),
        'avg_sharpe_overall': float(np.nanmean(all_sharpes)) if all_sharpes else np.nan,
    }
    return analysis


def create_results_summary_table_optimized(all_results: Dict[str, Dict[str, List[Dict[str, Any]]]], output_file: Optional[str] = None) -> pd.DataFrame:
    """Flatten nested grid-search results into one table, optionally saved as CSV.

    Args:
        all_results: ``{ticker: {indicator: results}}`` where ``results`` is
            a list of result dicts or a DataFrame.
        output_file: When given, the table is written there (without the
            index); missing parent directories are created.

    Returns:
        One row per result with the columns ``Ticker``, ``Indicator``,
        ``Parameters``, ``Sharpe_Ratio``, ``Total_Return``, ``Max_Drawdown``,
        ``Win_Rate``, ``Total_Signals``, ``Buy_Signals``, ``Sell_Signals``.
        ``Ticker`` and ``Indicator`` come from the dictionary keys; fields
        absent from a result are ``None``.
    """
    # Output column -> field read from each result row (insertion order = column order).
    field_for_column = {
        'Parameters': 'Parameters',
        'Sharpe_Ratio': 'sharpe',
        'Total_Return': 'total_return',
        'Max_Drawdown': 'max_drawdown',
        'Win_Rate': 'win_rate',
        'Total_Signals': 'total_signals',
        'Buy_Signals': 'buy_signals',
        'Sell_Signals': 'sell_signals',
    }

    records: List[Dict[str, Any]] = []
    for ticker, per_indicator in all_results.items():
        for ind_name, raw_results in per_indicator.items():
            frame = _ensure_df(raw_results)
            if frame.empty:
                continue
            for _, result in frame.iterrows():
                record: Dict[str, Any] = {'Ticker': ticker, 'Indicator': ind_name}
                for column, field in field_for_column.items():
                    record[column] = result.get(field)
                records.append(record)

    summary = pd.DataFrame(records)
    if output_file:
        # A bare file name has no directory part; fall back to the current directory.
        target_dir = os.path.dirname(output_file) or '.'
        os.makedirs(target_dir, exist_ok=True)
        summary.to_csv(output_file, index=False)
    return summary


def find_similar_parameter_combinations_optimized(results_like, target_params: Dict[str, Any], max_distance: float = 5.0) -> pd.DataFrame:
    """Return the result rows whose parameters lie close to ``target_params``.

    Distance is the Euclidean distance over the parameter names shared by a
    row's ``'Parameters'`` dict and ``target_params``, using only values that
    can be converted to ``float``.

    Args:
        results_like: List of result dicts or a DataFrame with a
            ``'Parameters'`` column holding dicts.
        target_params: Reference parameter set.
        max_distance: Inclusive distance threshold.

    Returns:
        The matching rows plus a ``'Parameter_Distance'`` column, sorted by
        ascending distance. An empty DataFrame is returned when the input is
        empty or lacks ``'Parameters'``; when nothing matches, the empty
        frame still carries the input columns and ``'Parameter_Distance'``.
    """
    df = _ensure_df(results_like)
    if df.empty or 'Parameters' not in df.columns:
        return pd.DataFrame()

    def _dist(p1, p2) -> float:
        # Cells that are not dicts (e.g. NaN, or a string read back from CSV)
        # cannot be compared; treat them as infinitely far away.
        if not isinstance(p1, dict) or not isinstance(p2, dict):
            return float('inf')
        if not p1 or not p2:
            return float('inf')
        common = set(p1) & set(p2)
        if not common:
            return float('inf')
        d = 0.0
        compared = 0
        for k in common:
            try:
                d += (float(p1[k]) - float(p2[k])) ** 2
            except (TypeError, ValueError):
                # Non-numeric parameter: it contributes no distance.
                continue
            compared += 1
        # With no numeric dimension in common, a distance of 0 would falsely
        # report a perfect match.
        if compared == 0:
            return float('inf')
        return d ** 0.5

    sims: List[Dict[str, Any]] = []
    for _, row in df.iterrows():
        dist = _dist(row['Parameters'], target_params)
        if dist <= max_distance:
            r = row.to_dict()
            r['Parameter_Distance'] = dist
            sims.append(r)

    if not sims:
        # Sorting a column-less frame by 'Parameter_Distance' raises KeyError.
        return pd.DataFrame(columns=[*df.columns, 'Parameter_Distance'])
    return pd.DataFrame(sims).sort_values('Parameter_Distance')


def plot_indicator_performance_comparison_optimized(all_results: Dict[str, Dict[str, List[Dict[str, Any]]]], metric: str = 'sharpe') -> pd.DataFrame:
    """Build a long-format table of one metric, ready to be plotted.

    No figure is drawn here; only the data is prepared.

    Args:
        all_results: ``{ticker: {indicator: results}}`` where ``results`` is
            a list of result dicts or a DataFrame.
        metric: Name of the result column to extract.

    Returns:
        DataFrame with the columns ``Ticker``, ``Indicator`` and ``metric``,
        one row per numeric, non-missing metric value. Empty when no result
        set contains the metric.
    """
    records: List[Dict[str, Any]] = []
    for ticker, per_indicator in all_results.items():
        for ind_name, raw_results in per_indicator.items():
            frame = _ensure_df(raw_results)
            if not frame.empty and metric in frame.columns:
                # Values that cannot be parsed as numbers become NaN and are dropped.
                numeric = pd.to_numeric(frame[metric], errors='coerce').dropna()
                records.extend(
                    {'Ticker': ticker, 'Indicator': ind_name, metric: value}
                    for value in numeric.tolist()
                )
    return pd.DataFrame(records)

# Status message emitted once the analysis helpers in this cell are defined.
print("✅ Optimized analysis & plotting loaded.")


✅ Optimized analysis & plotting loaded.


In [5]:
# ================================================================================
# CHUNKED DATA PROCESSING FOR INDICATORS
# ================================================================================

def load_data_and_apply_indicators_chunked(
    raw_data: pd.DataFrame,
    tickers: List[str],
    apply_indicators_single_ticker_func: callable,
    chunk_size: int = 10
) -> pd.DataFrame:
    """Apply a per-ticker function to ``raw_data`` in groups of tickers.

    Args:
        raw_data: Frame with a ``'Ticker'`` column.
        tickers: Tickers to process, in the desired output order.
        apply_indicators_single_ticker_func: Called with a copy of one
            ticker's rows; its return value is concatenated into the result.
        chunk_size: Number of tickers handled per group.

    Returns:
        The concatenation of all per-ticker outputs with their original index
        labels kept, or an empty DataFrame when nothing was produced. Tickers
        without rows are skipped; tickers whose function call raises are
        reported on stdout and skipped.
    """
    finished_chunks = []
    print(f"\n📦 Chunked processing for {len(tickers)} tickers...")
    for i in range(0, len(tickers), chunk_size):
        part = tickers[i:i + chunk_size]
        print(f"  -> {i+1}-{i+len(part)} of {len(tickers)}")
        # Only the rows of the current group are held while it is processed.
        chunk_rows = raw_data[raw_data['Ticker'].isin(part)].copy()
        per_ticker = []
        for t in part:
            single = chunk_rows[chunk_rows['Ticker'] == t].copy()
            if not single.empty:
                try:
                    per_ticker.append(apply_indicators_single_ticker_func(single))
                except Exception as e:
                    print(f"    ❌ {t}: {e}")
        if per_ticker:
            finished_chunks.append(pd.concat(per_ticker, ignore_index=False))
        # Free the group's intermediates before moving on to the next group.
        del chunk_rows, per_ticker
        gc.collect()
    if not finished_chunks:
        print("⚠️ No data processed.")
        return pd.DataFrame()
    print("✅ Chunked processing complete.")
    return pd.concat(finished_chunks, ignore_index=False)


In [6]:
# ================================================================================
# USAGE EXAMPLE (SKELETON)
# ================================================================================

# Example:
# 1) Prepare data (ensure it contains 'Ticker' and 'Close' columns and is indexed by date)
# data = your_loaded_dataframe
# tickers = sorted(data['Ticker'].unique().tolist())

# 2) Optionally apply indicators in a memory-friendly way if you have a per-ticker function
# def apply_indicators_single_ticker(df_single_ticker: pd.DataFrame) -> pd.DataFrame:
#     # Example: add your technical indicators here for a single ticker
#     # Return the modified DataFrame
#     return df_single_ticker
# data_with_indicators = load_data_and_apply_indicators_chunked(data, tickers, apply_indicators_single_ticker, chunk_size=20)

# 3) Run optimized grid search
# from your_indicators_module import Indicator
# all_results = run_comprehensive_grid_search_optimized(
#     data=data,
#     indicator_params=INDICATOR_PARAMS,
#     indicator_class=Indicator,  # class that computes KAMA, Supertrend, MFI series
#     max_combinations_per_indicator=0,  # 0 means all combinations
#     stream_results_dir='grid_search_streamed_results',
#     summary_interval=100,
# )

# 4) Analyze and save results
# analysis = analyze_grid_search_results_optimized(all_results)
# summary_df = create_results_summary_table_optimized(all_results, output_file='strategy_results/performance_summary_optimized.csv')
# plot_df = plot_indicator_performance_comparison_optimized(all_results, metric='sharpe')

# Final status message; the usage steps above are comments and are not executed.
print("✅ TAopt_optimized.ipynb ready. Fill in your data and run the example above.")


✅ TAopt_optimized.ipynb ready. Fill in your data and run the example above.
