In [22]:
import yfinance as yf
import pandas as pd
import numpy as np
import random
import heapq
import itertools
from collections import deque

In [23]:
def get_stock_data(ticker, start_date, end_date):
    """
    Downloads historical stock data from Yahoo Finance.
    """
    print(f"Downloading data for {ticker} from {start_date} to {end_date}...")
    data = yf.download(ticker, start=start_date, end=end_date, auto_adjust=True)
    if data.empty:
        raise ValueError(f"No data found for ticker {ticker}. Check the ticker symbol.")
    print("Data download complete.")
    return data

In [None]:
def run_backtest(data, short_window, long_window):
    """
    Simulates a moving average crossover strategy and returns the final profit.
    """
    # Parameter sanity check
    if short_window >= long_window or short_window < 5 or long_window < 10:
        return -float('inf')

    # --- Signal Generation ---
    signals = pd.DataFrame(index=data.index)
    signals['Close'] = data['Close']
    signals['short_mavg'] = data['Close'].rolling(window=short_window, min_periods=1, center=False).mean()
    signals['long_mavg'] = data['Close'].rolling(window=long_window, min_periods=1, center=False).mean()
    
    # Create signal: 1 when short > long, 0 otherwise
    signals['signal'] = 0.0
    signals.loc[signals.index[short_window:], 'signal'] = np.where(
        signals['short_mavg'].iloc[short_window:] > signals['long_mavg'].iloc[short_window:], 1.0, 0.0
    )
    
    # Generate trading orders: 1 for buy, -1 for sell
    signals['positions'] = signals['signal'].diff()

    # --- Portfolio Calculation ---
    initial_capital = 10000.0
    portfolio = pd.DataFrame(index=signals.index).fillna(0.0)
    
    # Create a position column: 10 shares if signal is 1, 0 otherwise
    portfolio['positions'] = signals['signal'] * 10
    
    # Calculate the cash change from trades
    portfolio['cash_diff'] = -signals['positions'] * portfolio['positions'].shift(1) * signals['Close']
    
    # Calculate cumulative cash
    portfolio['cash'] = initial_capital + portfolio['cash_diff'].cumsum()
    
    # Calculate holdings value
    portfolio['holdings'] = portfolio['positions'] * signals['Close']
    
    # Total portfolio value
    portfolio['total'] = portfolio['cash'] + portfolio['holdings']
    
    final_profit = portfolio['total'].iloc[-1] - initial_capital
    
    return final_profit

In [25]:
def get_neighbors(state, step=1):
    """
    Generates neighboring states by slightly changing one parameter at a time.
    """
    neighbors = []
    neighbors.append({'short_window': state['short_window'] + step, 'long_window': state['long_window']})
    neighbors.append({'short_window': state['short_window'] - step, 'long_window': state['long_window']})
    neighbors.append({'short_window': state['short_window'], 'long_window': state['long_window'] + step})
    neighbors.append({'short_window': state['short_window'], 'long_window': state['long_window'] - step})
    return [n for n in neighbors if n['short_window'] > 0 and n['long_window'] > n['short_window']]

In [26]:
def hill_climbing_search(data, max_iterations=100):
    """
    Finds optimal parameters using the Hill Climbing algorithm.
    """
    print("\n--- Starting Hill Climbing Search ---")
    current_state = {'short_window': random.randint(10, 40), 'long_window': random.randint(50, 200)}
    current_score = run_backtest(data, **current_state)
    print(f"Starting at: {current_state} with score: ${current_score:,.2f}")

    for i in range(max_iterations):
        neighbors = get_neighbors(current_state)
        if not neighbors:
            break
        best_neighbor = max(neighbors, key=lambda state: run_backtest(data, **state))
        best_neighbor_score = run_backtest(data, **best_neighbor)
        if best_neighbor_score <= current_score:
            print("Reached a local maximum.")
            break
        current_state = best_neighbor
        current_score = best_neighbor_score
        print(f"Iter {i+1}: Moved to {current_state} with score: ${current_score:,.2f}")

    print(f"--- Hill Climbing Finished ---")
    print(f"Optimal parameters: {current_state}, Profit: ${current_score:,.2f}")
    return current_state, current_score

In [27]:
def local_beam_search(data, beam_width=5, max_iterations=50):
    print("\n--- Starting Local Beam Search ---")
    beam = [{'short_window': random.randint(10, 40), 'long_window': random.randint(50, 200)} for _ in range(beam_width)]
    for i in range(max_iterations):
        all_neighbors = []
        for state in beam:
            all_neighbors.extend(get_neighbors(state))
        unique_neighbors = [dict(t) for t in {tuple(d.items()) for d in all_neighbors}]
        if not unique_neighbors:
            print("No new neighbors to explore.")
            break
        beam = heapq.nlargest(beam_width, unique_neighbors, key=lambda s: run_backtest(data, **s))
        best_in_beam = beam[0]
        best_score = run_backtest(data, **best_in_beam)
        print(f"Iter {i+1}: Best in beam is {best_in_beam} with score: ${best_score:,.2f}")
    final_best_state = beam[0]
    final_best_score = run_backtest(data, **final_best_state)
    print(f"--- Local Beam Search Finished ---")
    print(f"Optimal parameters: {final_best_state}, Profit: ${final_best_score:,.2f}")
    return final_best_state, final_best_score

In [28]:
def a_star_search(data, start_state, max_iterations=500):
    """
    Finds optimal parameters using A* search.
    """
    print("\n--- Starting A* Search ---")
    tie_breaker = itertools.count()
    frontier = []
    g_score = 0
    h_score = -run_backtest(data, **start_state)
    f_score = g_score + h_score
    heapq.heappush(frontier, (f_score, next(tie_breaker), g_score, start_state))
    visited = {tuple(start_state.items())}
    best_state_so_far = start_state
    best_score_so_far = -h_score
    print(f"Starting at: {start_state} with score: ${best_score_so_far:,.2f}")

    for _ in range(max_iterations):
        if not frontier:
            print("Frontier is empty.")
            break
        _, _, current_g, current_state = heapq.heappop(frontier)
        for neighbor in get_neighbors(current_state):
            neighbor_tuple = tuple(neighbor.items())
            if neighbor_tuple in visited:
                continue
            visited.add(neighbor_tuple)
            neighbor_g = current_g + 1
            neighbor_h = -run_backtest(data, **neighbor)
            neighbor_f = neighbor_g + neighbor_h
            heapq.heappush(frontier, (neighbor_f, next(tie_breaker), neighbor_g, neighbor))
            if -neighbor_h > best_score_so_far:
                best_score_so_far = -neighbor_h
                best_state_so_far = neighbor
                print(f"New best found: {best_state_so_far} with score: ${best_score_so_far:,.2f}")

    print(f"--- A* Search Finished ---")
    print(f"Optimal parameters: {best_state_so_far}, Profit: ${best_score_so_far:,.2f}")
    return best_state_so_far, best_score_so_far

In [29]:
if __name__ == "__main__":
    TICKER = 'AAPL'
    START_DATE = '2020-01-01'
    END_DATE = '2023-12-31'
    
    try:
        stock_data = get_stock_data(TICKER, START_DATE, END_DATE)
        hc_params, hc_profit = hill_climbing_search(stock_data)
        beam_params, beam_profit = local_beam_search(stock_data, beam_width=5)
        astar_start_state = {'short_window': 20, 'long_window': 50}
        astar_params, astar_profit = a_star_search(stock_data, start_state=astar_start_state)
        
        print("\n\n" + "="*50)
        print("          FINAL RESULTS SUMMARY")
        print("="*50)
        print(f"Stock Ticker: {TICKER}")
        print(f"Date Range: {START_DATE} to {END_DATE}")
        print("-"*50)
        print(f"Hill Climbing Result:")
        print(f"  Parameters: {hc_params}")
        print(f"  Profit: ${hc_profit:,.2f}")
        print("-"*50)
        print(f"Local Beam Search (width=5) Result:")
        print(f"  Parameters: {beam_params}")
        print(f"  Profit: ${beam_profit:,.2f}")
        print("-"*50)
        print(f"A* Search Result:")
        print(f"  Parameters: {astar_params}")
        print(f"  Profit: ${astar_profit:,.2f}")
        print("="*50)
        
    except Exception as e:
        print(f"\nAn error occurred: {e}")

[*********************100%***********************]  1 of 1 completed

Downloading data for AAPL from 2020-01-01 to 2023-12-31...
Data download complete.

--- Starting Hill Climbing Search ---
Starting at: {'short_window': 33, 'long_window': 164} with score: $7,075.31
Iter 1: Moved to {'short_window': 32, 'long_window': 164} with score: $7,153.37
Iter 2: Moved to {'short_window': 31, 'long_window': 164} with score: $8,365.55
Reached a local maximum.
--- Hill Climbing Finished ---
Optimal parameters: {'short_window': 31, 'long_window': 164}, Profit: $8,365.55

--- Starting Local Beam Search ---





Iter 1: Best in beam is {'short_window': 12, 'long_window': 105} with score: $13,131.24
Iter 2: Best in beam is {'short_window': 12, 'long_window': 104} with score: $13,131.24
Iter 3: Best in beam is {'short_window': 11, 'long_window': 106} with score: $13,131.24
Iter 4: Best in beam is {'short_window': 10, 'long_window': 106} with score: $14,564.04
Iter 5: Best in beam is {'short_window': 12, 'long_window': 105} with score: $13,131.24
Iter 6: Best in beam is {'short_window': 10, 'long_window': 108} with score: $14,813.08
Iter 7: Best in beam is {'short_window': 12, 'long_window': 105} with score: $13,131.24
Iter 8: Best in beam is {'short_window': 10, 'long_window': 106} with score: $14,564.04
Iter 9: Best in beam is {'short_window': 5, 'long_window': 106} with score: $14,648.89
Iter 10: Best in beam is {'short_window': 5, 'long_window': 107} with score: $15,869.88
Iter 11: Best in beam is {'short_window': 5, 'long_window': 108} with score: $14,740.65
Iter 12: Best in beam is {'short_