<a href="https://colab.research.google.com/github/dbrnjd/crypto-trading-dissertation/blob/main/just_ohlcv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
import random
import numpy as np
import pandas as pd
import collections

# --- 1. Project Setup & Configuration (Phase 1) ---
POPULATION_SIZE = 100
NUM_GENERATIONS = 30
MAX_TREE_DEPTH = 5
TOURNAMENT_SIZE = 5
ELITISM_COUNT = 2
MUTATION_RATE = 0.1
CROSSOVER_RATE = 0.8
TRANSACTION_COST = 0.0005  # 0.05% per transaction

# --- Data Configuration ---
# Update this path to where your BTC/USD daily data CSV is stored.
# The CSV must have 'Open', 'High', 'Low', 'Close', and 'Volume' columns.
DATA_FILE_PATH = "/content/drive/MyDrive/crypto_data/btc_daily.csv"
TRAINING_SPLIT = 0.8  # 80% for training, 20% for out-of-sample testing
WINDOW_LENGTH = 250  # 250 trading days for each rolling window

# --- Random Seed for Reproducibility ---
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# --- 2. Genome Representation (Genetic Programming) ---

class Node:
    """Represents a node in the expression tree."""
    def __init__(self, value, children=None):
        self.value = value
        self.children = children if children is not None else []
        self.is_terminal = len(self.children) == 0

    def __repr__(self):
        """String representation for debugging."""
        if self.is_terminal:
            return str(self.value)
        else:
            return f"({self.value} {' '.join(str(c) for c in self.children)})"

    def deep_copy(self):
        """
        Creates a deep copy of the node and its entire subtree.
        This is crucial for preventing infinite recursion during crossover.
        """
        new_children = [child.deep_copy() for child in self.children]
        return Node(self.value, new_children)

def create_random_tree(max_depth, current_depth=0, return_type='arithmetic'):
    """Generates a random expression tree with a specified return type."""
    # Define the grammar of our genetic program
    ARITHMETIC_SET = ['+', '-', '*', '/']
    LOGICAL_SET = ['>', '<']
    BOOLEAN_SET = ['AND', 'OR']
    ALL_FUNCTIONS = ARITHMETIC_SET + LOGICAL_SET + BOOLEAN_SET + ['if']
    TERMINAL_SET_OHLCV = ['Close', 'Open', 'High', 'Low', 'Volume']
    TERMINAL_SET_CONSTANTS = [round(random.uniform(-1, 2), 2) for _ in range(5)]
    LAG_RANGE = range(1, 10)

    # To maintain depth constraint and ensure a mix of functions/terminals
    if current_depth >= max_depth or (random.random() > 0.5 and return_type != 'logical'):
        # Create a terminal node
        term_type = random.choice(['OHLCV', 'Constant'])
        if term_type == 'OHLCV':
            ohlcv_value = random.choice(TERMINAL_SET_OHLCV)
            lag_node = Node(random.choice(LAG_RANGE))
            return Node(ohlcv_value, children=[lag_node])
        else:
            return Node(random.choice(TERMINAL_SET_CONSTANTS))
    else:
        # Create a function node
        if return_type == 'logical':
            # A logical tree must have a logical or boolean operator at the root
            function = random.choice(LOGICAL_SET + BOOLEAN_SET)
        else:
            # An arithmetic tree can have an arithmetic or 'if' operator
            function = random.choice(ARITHMETIC_SET + ['if'])

        children = []
        if function == 'if':
            # The 'if' node has a condition (logical), then-clause (arithmetic), and else-clause (arithmetic)
            children.append(create_random_tree(max_depth, current_depth + 1, return_type='logical'))
            children.append(create_random_tree(max_depth, current_depth + 1))
            children.append(create_random_tree(max_depth, current_depth + 1))
        elif function in LOGICAL_SET:
            # Relational operators have two arithmetic children
            children.append(create_random_tree(max_depth, current_depth + 1))
            children.append(create_random_tree(max_depth, current_depth + 1))
        elif function in BOOLEAN_SET:
            # Boolean operators have two logical children
            children.append(create_random_tree(max_depth, current_depth + 1, return_type='logical'))
            children.append(create_random_tree(max_depth, current_depth + 1, return_type='logical'))
        else:
            # Arithmetic operators have two arithmetic children
            children.append(create_random_tree(max_depth, current_depth + 1))
            children.append(create_random_tree(max_depth, current_depth + 1))

        return Node(function, children)

def safe_divide(numerator, denominator):
    """Handles division by zero by returning 1.0 if denominator is 0."""
    return numerator / denominator if denominator != 0 else 1.0

def evaluate_tree(node, ohlcv, t):
    """
    Recursively evaluates the expression tree at a given timestep.
    Includes a failsafe to ensure a float is always returned.
    """
    try:
        if node.is_terminal:
            if isinstance(node.value, (int, float)):
                return float(node.value)
            elif node.value in ['Close', 'Open', 'High', 'Low', 'Volume']:
                lag = node.children[0].value
                if t - lag >= 0:
                    return float(ohlcv[node.value][t - lag])
                else:
                    return 0.0
            # Failsafe for unexpected terminal values
            else:
                return 0.0
        else:
            op = node.value
            children = node.children

            if op == '+':
                return evaluate_tree(children[0], ohlcv, t) + evaluate_tree(children[1], ohlcv, t)
            elif op == '-':
                return evaluate_tree(children[0], ohlcv, t) - evaluate_tree(children[1], ohlcv, t)
            elif op == '*':
                return evaluate_tree(children[0], ohlcv, t) * evaluate_tree(children[1], ohlcv, t)
            elif op == '/':
                return safe_divide(evaluate_tree(children[0], ohlcv, t), evaluate_tree(children[1], ohlcv, t))
            elif op == '>':
                return 1.0 if evaluate_tree(children[0], ohlcv, t) > evaluate_tree(children[1], ohlcv, t) else 0.0
            elif op == '<':
                return 1.0 if evaluate_tree(children[0], ohlcv, t) < evaluate_tree(children[1], ohlcv, t) else 0.0
            elif op == 'AND':
                return 1.0 if evaluate_tree(children[0], ohlcv, t) and evaluate_tree(children[1], ohlcv, t) else 0.0
            elif op == 'OR':
                return 1.0 if evaluate_tree(children[0], ohlcv, t) or evaluate_tree(children[1], ohlcv, t) else 0.0
            elif op == 'if':
                condition = evaluate_tree(children[0], ohlcv, t)
                if condition:
                    return evaluate_tree(children[1], ohlcv, t)
                else:
                    return evaluate_tree(children[2], ohlcv, t)
            else:
                return 0.0
    except (IndexError, TypeError, ValueError):
        # A failsafe to catch any evaluation errors and return a neutral value
        return 0.0

def tree_to_string(node):
    """Converts the tree to a human-readable trading rule string."""
    if node.is_terminal:
        if isinstance(node.value, str) and node.children:
            return f"{node.value}[t-{node.children[0].value}]"
        else:
            return str(node.value)
    else:
        op = node.value
        if op == 'if':
            cond = tree_to_string(node.children[0])
            expr1 = tree_to_string(node.children[1])
            expr2 = tree_to_string(node.children[2])
            return f"if ({cond}) then ({expr1}) else ({expr2})"
        elif op in ['AND', 'OR', '>', '<']:
            left = tree_to_string(node.children[0])
            right = tree_to_string(node.children[1])
            return f"({left} {op} {right})"
        else:
            # Arithmetic operators
            left = tree_to_string(node.children[0])
            right = tree_to_string(node.children[1])
            return f"({left} {op} {right})"

# --- 3. Genetic Operators (Crossover & Mutation) ---

def get_random_node(root):
    """Helper to select a random node from the tree."""
    nodes = []
    def traverse(node):
        nodes.append(node)
        for child in node.children:
            traverse(child)
    traverse(root)
    return random.choice(nodes)

def crossover(parent1, parent2):
    """Performs crossover by swapping sub-trees."""
    # Create a deep copy of the parent to prevent modifying it in place
    child = parent1.deep_copy()
    crossover_point_p2 = get_random_node(parent2)

    # Find a random node in the child to replace
    crossover_point_child = get_random_node(child)

    # Replace the subtree
    crossover_point_child.value = crossover_point_p2.value
    crossover_point_child.children = crossover_point_p2.children[:]

    return child

def mutate(individual):
    """Mutates a random node in the tree."""
    # Create a deep copy of the individual to prevent modifying it in place
    mutated_individual = individual.deep_copy()
    mutation_point = get_random_node(mutated_individual)

    # Create a new sub-tree to replace the mutation point
    new_sub_tree = create_random_tree(MAX_TREE_DEPTH)

    mutation_point.value = new_sub_tree.value
    mutation_point.children = new_sub_tree.children[:]

    return mutated_individual

# --- 4. Fitness Function & Backtesting ---

def compute_sharpe_ratio(returns):
    """Calculates the annualised Sharpe Ratio."""
    if len(returns) < 2:
        return 0
    daily_returns = np.array(returns)
    if np.std(daily_returns) == 0:
        return 0
    # Annualised return and standard deviation
    annualised_return = np.mean(daily_returns) * np.sqrt(252)
    annualised_std_dev = np.std(daily_returns) * np.sqrt(252)
    return annualised_return / annualised_std_dev

def backtest(strategy, ohlcv):
    """
    Backtests a single strategy on the provided OHLCV data.
    The strategy produces a trading signal (+1, -1, 0)
    which is then converted to a position and returns.
    """
    position = 0  # 1 for long, -1 for short, 0 for cash
    daily_returns = []
    max_lag = 10  # A safe max lag based on current terminal set
    if len(ohlcv['Close']) <= max_lag:
        return []

    for t in range(max_lag, len(ohlcv['Close'])):
        # Calculate daily change for return calculation
        price_change = ohlcv['Close'][t] - ohlcv['Close'][t-1]

        # Calculate returns based on previous day's position
        daily_return = position * price_change / ohlcv['Close'][t-1]

        # Evaluate the strategy to get the new signal
        signal_raw = evaluate_tree(strategy, ohlcv, t)

        # Convert raw signal to a simplified trading signal
        new_position = 0
        if signal_raw > 0:
            new_position = 1
        elif signal_raw < 0:
            new_position = -1

        # Apply transaction cost only if position changes
        if abs(new_position - position) > 0:
            daily_return -= TRANSACTION_COST * abs(new_position - position)

        daily_returns.append(daily_return)
        position = new_position

    return daily_returns

def evaluate_agent(agent, full_ohlcv_data, window_length):
    """Evaluates an agent on multiple rolling windows."""
    all_returns = []
    for i in range(len(full_ohlcv_data) - window_length):
        window_data = full_ohlcv_data.iloc[i:i + window_length]

        # Convert DataFrame window to dict for tree evaluation
        window_ohlcv_dict = {
            'Open': window_data['Open'].tolist(),
            'High': window_data['High'].tolist(),
            'Low': window_data['Low'].tolist(),
            'Close': window_data['Close'].tolist(),
            'Volume': window_data['Volume'].tolist()
        }

        returns = backtest(agent, window_ohlcv_dict)
        all_returns.extend(returns)

    return compute_sharpe_ratio(all_returns)


# --- The Main Evolutionary Loop ---

def main():
    """Main function to run the genetic algorithm."""

    # Load and split data
    try:
        df = pd.read_csv(DATA_FILE_PATH, parse_dates=['Date']).sort_values('Date')
        df.set_index('Date', inplace=True)
        # Rename the 'Volume BTC' column to 'Volume' to be used in the algorithm
        if 'Volume BTC' in df.columns:
            df.rename(columns={'Volume BTC': 'Volume'}, inplace=True)
        # Ensure the 'Volume' column exists for the algorithm
        if 'Volume' not in df.columns:
            print("Error: The CSV file does not contain a 'Volume' or 'Volume BTC' column.")
            return
    except FileNotFoundError:
        print(f"Error: The file {DATA_FILE_PATH} was not found. Please update the path.")
        return

    # Split into training and out-of-sample sets
    train_size = int(len(df) * TRAINING_SPLIT)
    train_data = df.iloc[:train_size]
    test_data = df.iloc[train_size:]

    print(f"Training data size: {len(train_data)} days")
    print(f"Out-of-sample data size: {len(test_data)} days")

    # Initialize population with random trees
    population = [create_random_tree(MAX_TREE_DEPTH) for _ in range(POPULATION_SIZE)]

    for generation in range(NUM_GENERATIONS):
        print(f"--- Generation {generation+1}/{NUM_GENERATIONS} ---")

        # Evaluate fitness for each individual on rolling windows
        fitness_scores = [evaluate_agent(ind, train_data, WINDOW_LENGTH) for ind in population]

        # Find the best individual and its score
        best_individual_index = np.argmax(fitness_scores)
        best_individual = population[best_individual_index]
        best_score = fitness_scores[best_individual_index]

        avg_score = np.mean(fitness_scores)

        print(f"Average Fitness: {avg_score:.4f}")
        print(f"Best Fitness: {best_score:.4f}")

        # Selection (Tournament Selection with Elitism)
        new_population = []

        # Elitism: carry over the top performers
        elite_indices = np.argsort(fitness_scores)[-ELITISM_COUNT:]
        for idx in elite_indices:
            new_population.append(population[idx])

        # Fill the rest of the population
        while len(new_population) < POPULATION_SIZE:
            tournament_candidates = random.sample(list(zip(population, fitness_scores)), TOURNAMENT_SIZE)
            winner = max(tournament_candidates, key=lambda x: x[1])[0]
            new_population.append(winner)

        # Crossover
        offspring_population = []
        for i in range(0, POPULATION_SIZE, 2):
            p1 = new_population[i]
            p2 = new_population[i+1] if i + 1 < POPULATION_SIZE else None

            if p2 is not None and random.random() < CROSSOVER_RATE:
                child1 = crossover(p1, p2)
                child2 = crossover(p2, p1)
                offspring_population.extend([child1, child2])
            else:
                offspring_population.append(p1)
                if p2 is not None:
                    offspring_population.append(p2)

        # Mutation
        for i in range(len(offspring_population)):
            if random.random() < MUTATION_RATE:
                offspring_population[i] = mutate(offspring_population[i])

        population = offspring_population

    # --- Final Outputs ---

    final_fitness = [evaluate_agent(ind, train_data, WINDOW_LENGTH) for ind in population]
    final_best_index = np.argmax(final_fitness)
    final_best_strategy = population[final_best_index]
    final_best_rule_string = tree_to_string(final_best_strategy)

    # In-sample (IS) performance on the full training set
    is_returns = backtest(final_best_strategy, {col: train_data[col].tolist() for col in train_data.columns})
    is_sharpe = compute_sharpe_ratio(is_returns)

    # Out-of-sample (OOS) performance on the held-out test set
    oos_returns = backtest(final_best_strategy, {col: test_data[col].tolist() for col in test_data.columns})
    oos_sharpe = compute_sharpe_ratio(oos_returns)

    print("\n" + "="*50)
    print("           Final Experiment Results            ")
    print("="*50)
    print(f"In-Sample Sharpe Ratio: {is_sharpe:.4f}")
    print(f"Out-of-sample Sharpe Ratio: {oos_sharpe:.4f}")
    print("\n--- Final Best Strategy Rule ---")
    print(final_best_rule_string)
    print("="*50)

if __name__ == "__main__":
    main()


Training data size: 3127 days
Out-of-sample data size: 782 days
--- Generation 1/30 ---
Average Fitness: 0.0030
Best Fitness: 0.0597
--- Generation 2/30 ---
Average Fitness: 0.0400
Best Fitness: 0.0597
--- Generation 3/30 ---
Average Fitness: 0.0448
Best Fitness: 0.0606
--- Generation 4/30 ---
Average Fitness: 0.0394
Best Fitness: 0.0606
--- Generation 5/30 ---
Average Fitness: 0.0465
Best Fitness: 0.0601
--- Generation 6/30 ---
Average Fitness: 0.0430
Best Fitness: 0.0601
--- Generation 7/30 ---
Average Fitness: 0.0346
Best Fitness: 0.0601
--- Generation 8/30 ---
Average Fitness: 0.0310
Best Fitness: 0.0601
--- Generation 9/30 ---
Average Fitness: 0.0340
Best Fitness: 0.0601
--- Generation 10/30 ---
Average Fitness: 0.0418
Best Fitness: 0.0601
--- Generation 11/30 ---
Average Fitness: 0.0364
Best Fitness: 0.0601
--- Generation 12/30 ---
Average Fitness: 0.0322
Best Fitness: 0.0602
--- Generation 13/30 ---
Average Fitness: 0.0352
Best Fitness: 0.0602
--- Generation 14/30 ---
Average Fi