In [1]:
%load_ext autoreload
%autoreload 2
%reload_ext autoreload

In [2]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from grammar import grammar
from grammatical_evolution import map_genotype_to_fenotype
from crossovers import one_point_crossover
from mutations import random_mutation

### RSI

RSI (Relative Strength Index) – wskaźnik momentum, który mierzy szybkość i zmianę ruchów cenowych.
Zakres wartości: od 0 do 100.

$$
RSI = 100 - \frac{100}{1 + RS}
$$

Gdzie

$$
RS = \frac{\text{Average Gain}}{\text{Average Loss}}
$$

In [3]:
def calculate_rsi(series, period):
    """Compute the Relative Strength Index (RSI) of a price series.

    RSI gauges the strength of an asset, suggesting whether it is overbought
    or oversold. It is defined as ``100 - 100 / (1 + RS)``, where ``RS`` is
    the average gain divided by the average loss over the last ``period``
    price changes (simple rolling means).

    Parameters
    ----------
    series : pandas.Series
        Prices in chronological order.
    period : int
        Number of consecutive price changes averaged in each window.

    Returns
    -------
    pandas.Series
        RSI values aligned with ``series``. The first ``period`` entries are
        NaN, because a full window of ``period`` price changes requires
        ``period + 1`` prices. Any window containing a missing price change
        also yields NaN.
    """
    delta = series.diff()

    # clip() preserves the leading NaN produced by diff(), so the first
    # window is only reported once it holds `period` real price changes
    # instead of silently counting the undefined first change as zero.
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)

    avg_gain = gain.rolling(window=period).mean()
    avg_loss = loss.rolling(window=period).mean()

    # The small epsilon avoids a division by zero when a window has no losses.
    rs = avg_gain / (avg_loss + 1e-10)
    rsi = 100 - (100 / (1 + rs))

    return rsi

In [4]:
def get_data(ticker, start="2020-01-01", end="2025-01-01"):
    """Download price history for ``ticker`` and append technical indicators.

    The frame returned by ``yf.download`` gets three extra columns:
    ``SMA10`` and ``SMA50`` (rolling means of ``Close`` over 10 and 50 rows)
    and ``RSI`` (``calculate_rsi`` with ``period=14``). Rows containing any
    NaN are dropped before returning.

    Parameters
    ----------
    ticker : str
        Symbol passed to ``yf.download``.
    start, end : str
        Date bounds passed to ``yf.download``.

    Returns
    -------
    pandas.DataFrame
        Downloaded columns plus ``SMA10``, ``SMA50`` and ``RSI``.
    """
    frame = yf.download(ticker, start=start, end=end)
    # Keep only the first level of the column labels
    # (presumably the field names of a (field, ticker) MultiIndex -- TODO confirm).
    frame.columns = frame.columns.get_level_values(0)

    close = frame["Close"]
    for window in (10, 50):
        frame[f"SMA{window}"] = close.rolling(window).mean()
    frame["RSI"] = calculate_rsi(close, period=14)

    return frame.dropna()

In [5]:
def get_fast_data(ticker, start="2020-01-01", end="2025-01-01"):
    """Return the same data as ``get_data`` as a dict of NumPy arrays.

    Delegates to ``get_data`` so both loaders always share one indicator
    pipeline, then converts each column to an array for faster operations.

    Parameters
    ----------
    ticker : str
        Symbol forwarded to ``get_data``.
    start, end : str
        Date bounds forwarded to ``get_data``.

    Returns
    -------
    dict
        Maps every column name of the prepared frame to that column's
        ``.values`` array.
    """
    data = get_data(ticker, start=start, end=end)

    # Plain dict of arrays: faster to index than a DataFrame.
    return {col: data[col].values for col in data.columns}

In [6]:
def evaluate_condition(cond, row):
    """Evaluate a (possibly nested) condition against a single row.

    Parameters
    ----------
    cond : dict
        Either a logical node ``{"logic_op": "AND" | "OR", "left": cond,
        "right": cond}`` or a comparison node ``{"left": x, "op": op,
        "right": y}``. A string operand that is a key of ``row`` is replaced
        by ``row[operand]``; every other operand is used as a literal.
    row : mapping
        Anything supporting ``key in row`` and ``row[key]``.

    Returns
    -------
    The result of the comparison or of ``and`` / ``or`` applied to the two
    sub-results. An unrecognised comparison ``op`` yields ``False``.

    Raises
    ------
    ValueError
        If ``logic_op`` is neither ``"AND"`` nor ``"OR"``.
    """
    if "logic_op" in cond:
        logic_op = cond["logic_op"]
        if logic_op not in ("AND", "OR"):
            # Fail loudly instead of falling through to the comparison path,
            # where a logical node would break with an unrelated error.
            raise ValueError(f"Unsupported logic_op: {logic_op!r}")

        # Both sides are always evaluated before they are combined.
        left_res = evaluate_condition(cond["left"], row)
        right_res = evaluate_condition(cond["right"], row)
        if logic_op == "AND":
            return left_res and right_res
        return left_res or right_res

    left = cond["left"]
    right = cond["right"]
    # Only strings can name a column; constants are never looked up.
    if isinstance(left, str) and left in row:
        left = row[left]
    if isinstance(right, str) and right in row:
        right = row[right]

    op = cond["op"]
    if op == ">":
        return left > right
    if op == "<":
        return left < right
    if op == ">=":
        return left >= right
    if op == "<=":
        return left <= right
    if op == "==":
        return left == right
    # Unknown operator: explicit falsy result.
    return False

In [7]:
def vectorized_evaluate_condition(cond, data_dict):
    """Evaluate a (possibly nested) condition on whole columns at once.

    Parameters
    ----------
    cond : dict
        Either a logical node ``{"logic_op": "AND" | "OR", "left": cond,
        "right": cond}`` or a comparison node ``{"left": x, "op": op,
        "right": y}``. A string operand that is a key of ``data_dict`` is
        replaced by the stored column; any other operand is used as a
        literal (e.g. the constant 50).
    data_dict : dict
        Maps column names to arrays, e.g. the output of ``get_fast_data``.

    Returns
    -------
    The element-wise comparison result, or the bitwise ``&`` / ``|`` of the
    two sub-results. An unrecognised comparison ``op`` yields scalar
    ``False``.

    Raises
    ------
    ValueError
        If ``logic_op`` is neither ``"AND"`` nor ``"OR"``.
    """
    # Logical node (AND / OR)
    if "logic_op" in cond:
        logic_op = cond["logic_op"]
        if logic_op not in ("AND", "OR"):
            # Fail loudly instead of falling through to the comparison path,
            # where a logical node has no "op" key.
            raise ValueError(f"Unsupported logic_op: {logic_op!r}")

        left_res = vectorized_evaluate_condition(cond["left"], data_dict)
        right_res = vectorized_evaluate_condition(cond["right"], data_dict)

        if logic_op == "AND":
            return left_res & right_res  # bitwise AND over whole arrays
        return left_res | right_res  # bitwise OR over whole arrays

    # Fetch whole columns at once; non-column operands stay literals.
    left = cond["left"]
    if isinstance(left, str) and left in data_dict:
        left = data_dict[left]

    right = cond["right"]
    if isinstance(right, str) and right in data_dict:
        right = data_dict[right]

    # Comparisons over whole arrays (NumPy broadcasting)
    op = cond["op"]
    if op == ">":
        return left > right
    if op == "<":
        return left < right
    if op == ">=":
        return left >= right
    if op == "<=":
        return left <= right
    if op == "==":
        return left == right
    return False

In [8]:
def evaluate_strategy(strategy, row):
    """Walk a strategy tree for one row and return the action it selects.

    A node whose ``"type"`` is ``"action"`` is a leaf and its ``"value"`` is
    returned. Any other node is a branch: its ``"condition"`` is checked with
    ``evaluate_condition`` and the walk continues into ``"then"`` when the
    condition holds, otherwise into ``"else"``.
    """
    node = strategy
    while node["type"] != "action":
        holds = evaluate_condition(node["condition"], row)
        node = node["then"] if holds else node["else"]
    return node["value"]

In [9]:
def backtest(rule, data, initial_cash):
    """Simulate an all-in / all-out strategy and return its profit.

    For every row, ``evaluate_strategy(rule, row)`` picks an action:
    ``"BUY"`` converts all cash to stock at the row's ``Close`` (only when
    no stock is held), ``"SELL"`` converts all stock back to cash (only when
    stock is held), and any other action leaves the position unchanged.

    Parameters
    ----------
    rule : dict
        Strategy tree passed to ``evaluate_strategy``.
    data : pandas.DataFrame
        Rows to trade over, in order. Must contain a ``"Close"`` column;
        an empty frame fails on the final ``data.iloc[-1]`` lookup.
    initial_cash : number
        Starting cash.

    Returns
    -------
    number
        Final cash plus held stock valued at the last ``Close``, minus
        ``initial_cash``.
    """
    cash = initial_cash
    stock = 0
    for i in range(len(data)):
        # Materialise the row once; it serves both the price and the rule.
        row = data.iloc[i]
        price = row["Close"]
        action = evaluate_strategy(rule, row)
        if action == "BUY" and stock == 0:
            stock = cash / price
            cash = 0
        elif action == "SELL" and stock > 0:
            cash = stock * price
            stock = 0

    # Any open position is valued at the last available close.
    total_return = cash + stock * data.iloc[-1]["Close"] - initial_cash
    return total_return

In [10]:
# Hand-written example rule in the nested-dict shape read by
# evaluate_strategy: BUY while SMA10 is above SMA50, otherwise SELL.
test_rule = {
    "type": "if",
    "condition": {"left": "SMA10", "op": ">", "right": "SMA50"},
    "then": {"type": "action", "value": "BUY"},
    "else": {"type": "action", "value": "SELL"},
}

In [11]:
def evolution(population_size, chromosome_size, crossover_probability, mutation_probability, number_of_iterations, number_of_offspring, data, grammar, initial_cash, max_depth, crossover=None, mutation=None):
    """Evolve integer chromosomes whose decoded rules maximise the backtest return.

    Each generation decodes every chromosome with
    ``map_genotype_to_fenotype``, scores it with ``backtest``, picks parents
    by roulette-wheel selection, and builds the next population from
    (optionally) crossed-over and mutated copies of those parents.

    Parameters
    ----------
    population_size : int
        Number of random chromosomes in the initial population.
    chromosome_size : int
        Number of genes per chromosome; genes are integers in ``[0, 256)``.
    crossover_probability : float
        Chance that a parent pair is passed through ``crossover``.
    mutation_probability : float
        Chance that an offspring is passed through ``mutation``.
    number_of_iterations : int
        Number of generations.
    number_of_offspring : int
        Size of every population after the initial one.
    data
        Forwarded unchanged to ``backtest``.
    grammar
        Forwarded unchanged to ``map_genotype_to_fenotype``.
    initial_cash : number
        Forwarded to ``backtest``; also scales the penalty for chromosomes
        that cannot be decoded or scored.
    max_depth
        Forwarded unchanged to ``map_genotype_to_fenotype``.
    crossover : callable, optional
        Takes two parent chromosomes and returns two children.
    mutation : callable, optional
        Takes one chromosome and returns the mutated chromosome.

    Returns
    -------
    tuple
        ``(best_chromosome, best_objective_value, best_rule)`` for the best
        individual seen across all generations.
    """
    # Score given to chromosomes that do not decode or whose backtest fails.
    penalty = -100.0 * initial_cash

    best_objective_value = -np.inf
    best_chromosome = None
    best_rule = None
    current_population = np.random.randint(256, size=(population_size, chromosome_size))

    for _ in range(number_of_iterations):
        # From the second generation on the population has
        # `number_of_offspring` rows, so everything below is sized by the
        # population actually held rather than by `population_size`.
        current_size = len(current_population)

        # evaluating the objective function on the current population
        objective_values = np.zeros(current_size)
        rules = []
        for i, genotype in enumerate(current_population):
            rule = map_genotype_to_fenotype(genotype, grammar, max_depth)
            rules.append(rule)
            if rule is None:
                objective_values[i] = penalty
                continue
            try:
                objective_values[i] = backtest(rule, data, initial_cash)
            except Exception:
                # Best effort: a rule that cannot be scored is penalised, but
                # KeyboardInterrupt / SystemExit still propagate.
                objective_values[i] = penalty

        # update best chromosome
        best_idx = int(np.argmax(objective_values))
        current_best = objective_values[best_idx]
        if current_best > best_objective_value:
            best_chromosome = current_population[best_idx].copy()
            best_objective_value = current_best
            # Reuse the rule that was actually scored instead of decoding again.
            best_rule = rules[best_idx]
            print(f"Znaleziono nowy najlepszy wynik: {current_best}, rule = {best_rule}")

        # selecting the parent indices by the roulette wheel method
        fitness_values = objective_values - objective_values.min()
        total_fitness = fitness_values.sum()
        if total_fitness > 0:
            fitness_values = fitness_values / total_fitness
        else:
            # All individuals scored the same: fall back to uniform selection.
            fitness_values = np.ones(current_size) / current_size
        parent_indices = np.random.choice(current_size, number_of_offspring, True, fitness_values).astype(np.int64)
        parents = current_population[parent_indices]

        # creating offspring
        offspring = np.zeros((number_of_offspring, chromosome_size), dtype=np.int64)
        for i in range(number_of_offspring // 2):
            p1 = parents[2*i].copy()
            p2 = parents[2*i+1].copy()

            if crossover and np.random.random() < crossover_probability:
                offspring[2*i, :], offspring[2*i+1, :] = crossover(p1, p2)
            else:
                offspring[2*i, :], offspring[2*i+1, :] = p1, p2
        if number_of_offspring % 2 == 1:
            # Odd count: the unpaired last parent is copied through unchanged.
            offspring[-1, :] = current_population[parent_indices[-1], :]

        # mutation
        if mutation:
            for i in range(number_of_offspring):
                if np.random.random() < mutation_probability:
                    offspring[i, :] = mutation(offspring[i, :])

        current_population = offspring

    return best_chromosome, best_objective_value, best_rule
                

In [12]:
# Show the production rules of the grammar imported from the `grammar` module.
print(grammar)

{'<start>': [['IF', '<cond>', 'THEN', '<strategy>', 'ELSE', '<strategy>']], '<strategy>': [['IF', '<cond>', 'THEN', '<strategy>', 'ELSE', '<strategy>'], ['<action>']], '<cond>': [['<expr>', '<op>', '<expr>'], ['<cond>', 'AND', '<cond>'], ['<cond>', 'OR', '<cond>']], '<expr>': ['Close', 'SMA10', 'SMA50', 'RSI', '<const>'], '<op>': ['>', '<', '>=', '<=', '=='], '<action>': ['BUY', 'SELL', 'HOLD'], '<const>': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]}


In [14]:
# Run the evolutionary search on AAPL and print
# (best_chromosome, best_objective_value, best_rule).
print(
    evolution(
        population_size=100,
        chromosome_size=100,
        crossover_probability=0.7,
        mutation_probability=0.1,
        number_of_iterations=20,
        number_of_offspring=100,
        data=get_data('AAPL'),
        grammar=grammar,
        initial_cash=1000,
        max_depth=10,
        crossover=one_point_crossover,
        mutation=random_mutation,
    )
)

[*********************100%***********************]  1 of 1 completed


Znaleziono nowy najlepszy wynik: 2846.2793274204114, rule = {'type': 'if', 'condition': {'left': 'RSI', 'op': '<=', 'right': 'SMA10'}, 'then': {'type': 'action', 'value': 'HOLD'}, 'else': {'type': 'action', 'value': 'BUY'}}
Znaleziono nowy najlepszy wynik: 2988.4092339435947, rule = {'type': 'if', 'condition': {'left': {'left': 'SMA10', 'op': '<', 'right': 'SMA10'}, 'logic_op': 'OR', 'right': {'left': 'SMA10', 'op': '>', 'right': 'Close'}}, 'then': {'type': 'action', 'value': 'HOLD'}, 'else': {'type': 'if', 'condition': {'left': {'left': 'Close', 'op': '>', 'right': 'Close'}, 'logic_op': 'AND', 'right': {'left': {'left': 'RSI', 'op': '==', 'right': 'RSI'}, 'logic_op': 'OR', 'right': {'left': 'Close', 'op': '<=', 'right': 'RSI'}}}, 'then': {'type': 'action', 'value': 'BUY'}, 'else': {'type': 'action', 'value': 'BUY'}}}
(array([146, 227,  66, 251,  91, 181, 114,  91, 185,  65, 231, 251, 114,
       106, 183, 175,  20,  85,  38, 111, 253,  34, 148, 177, 180, 223,
        58, 107, 123,  93