<a href="https://colab.research.google.com/github/esaare/cocalc-doc/blob/master/Stock_Anaysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Dependencies
from eod import EodHistoricalData
import pandas as pd
import vectorbt as vbt
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
import numpy as np
from deap import base, creator, tools, algorithms
from functools import partial
from scipy.optimize import differential_evolution
from typing import Dict, Any
from enum import Enum
import csv
from itertools import product
import time
from pyswarm import pso
import sys
import os
from contextlib import contextmanager
from skopt import gp_minimize
from skopt.space import Integer
from scipy.optimize import dual_annealing
import matplotlib.pyplot as plt
import plotly.graph_objs as go
import warnings
import itertools
import random
from geneticalgorithm import geneticalgorithm as ga

# Register an "ignore" filter for UserWarning raised from modules matching
# "skopt.optimizer.optimizer". Keep this below the import statements.
warnings.filterwarnings(
    action="ignore",
    category=UserWarning,
    module="skopt.optimizer.optimizer",
)

In [None]:
# @title Input
# Run configuration consumed by the cells below.

# Strategy selection: key looked up in the optimizer mapping of the
# "Run Optimizer" cell.
method = 'GA'

# Data selection: symbol requested from the price API, and the number of
# years of history kept (counted back from today).
ticker = 'crr.au'
years = 10

# Fraction of the retained rows used for training; the rest is test data.
train_ratio = 0.9

In [None]:
# @title Classes
class Config:
    """Central place for credentials used by the notebook."""

    # Environment variable consulted before falling back to the bundled key.
    API_KEY_ENV_VAR = 'EOD_API_KEY'

    def api_key(self):
        """Return the API key handed to the ``EodHistoricalData`` client.

        The key is taken from the ``EOD_API_KEY`` environment variable when
        that variable is set and non-empty. Otherwise the key bundled with
        the notebook is returned, so existing usage keeps working.

        Returns:
            str: The API key.
        """
        # NOTE(review): the fallback key is committed in source; prefer
        # setting EOD_API_KEY and consider rotating the bundled key.
        return os.environ.get(self.API_KEY_ENV_VAR) or '61ab5c110d1837.55723794'


class Backtester:
    """Backtest a fast/slow moving-average strategy on one ticker.

    Construction downloads the ticker's end-of-day prices through the
    ``EodHistoricalData`` client, keeps the most recent ``years`` of rows and
    splits them chronologically into ``train_data`` and ``test_data``.

    Attributes set by ``load_data``: ``price``, ``train_data``, ``test_data``,
    ``train_start_date`` and ``train_end_date``.
    Attribute set by ``calculate_signals``: ``processed_data``.
    """

    def __init__(self,
                 ticker,
                 years,
                 train_ratio,
                 init_cash=1000,
                 fees=0.001,
                 fast_window=10,
                 slow_window=50):
        """Store the settings, create the API client and load the prices.

        Args:
            ticker: Symbol requested from the price API.
            years: Years of history to keep, counted back from now.
            train_ratio: Fraction of the kept rows assigned to training.
            init_cash: Starting cash passed to the portfolio simulation.
            fees: Fee rate passed to the portfolio simulation.
            fast_window: Window of the fast moving average.
            slow_window: Window of the slow moving average.

        Raises:
            ValueError: If no usable price rows are available (see
                ``load_data``).
        """
        self.ticker = ticker
        self.years = years
        self.init_cash = init_cash
        self.fees = fees
        self.fast_window = fast_window
        self.slow_window = slow_window
        self.train_ratio = train_ratio
        # Filled in by calculate_signals(); None until a backtest has run.
        self.processed_data = None
        self.client = EodHistoricalData(api_key=Config().api_key())
        self.load_data()

    def load_data(self):
        """Download prices, restrict them to the look-back window and split.

        Raises:
            ValueError: If the API returns no rows, the look-back window
                contains no rows, or the training split would be empty.
        """
        # Look-back window: a year is approximated as 365 days.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=365 * self.years)

        # Get historical prices
        price = pd.DataFrame(self.client.get_prices_eod(self.ticker,
                                                        order='asc'))
        if price.empty or 'date' not in price.columns:
            raise ValueError(
                f"No price data returned for ticker '{self.ticker}'")
        price['date'] = pd.to_datetime(price['date'])
        # Label slicing by date below needs an ascending index; sort
        # explicitly instead of trusting the order of the API response.
        price = price.set_index('date').sort_index(kind='mergesort')

        # Filter data for the specified time range
        self.price = price.loc[start_date:end_date]
        if self.price.empty:
            raise ValueError(
                f"No price data for ticker '{self.ticker}' in the last "
                f"{self.years} year(s)")

        # Chronological split: the first train_ratio share is training data.
        split_index = int(len(self.price) * self.train_ratio)
        self.train_data = self.price.iloc[:split_index]
        self.test_data = self.price.iloc[split_index:]
        if self.train_data.empty:
            raise ValueError(
                f"train_ratio={self.train_ratio} leaves no training rows "
                f"out of {len(self.price)}")

        # Add training start and finishing dates
        self.train_start_date = self.train_data.index[0]
        self.train_end_date = self.train_data.index[-1]

    def calculate_signals(self, data):
        """Compute both moving averages and the entry/exit signals.

        The moving averages are computed on the ``adjusted_close`` column
        with vectorbt's ``MA`` indicator. Entries come from
        ``fast_ma.ma_above(slow_ma)`` and exits from
        ``fast_ma.ma_below(slow_ma)``.

        The input frame is not modified. A copy carrying the extra
        ``Fast_MA`` and ``Slow_MA`` columns is stored in
        ``self.processed_data``.

        Args:
            data: DataFrame with an ``adjusted_close`` column.

        Returns:
            tuple: ``(entries, exits)`` as produced by vectorbt.
        """
        # Copy first: callers pass slices/views of the price frame, and
        # adding columns in place would mutate (or warn about) their data.
        data = data.copy()
        close = data['adjusted_close']

        fast_ma = vbt.MA.run(close, window=self.fast_window)
        slow_ma = vbt.MA.run(close, window=self.slow_window)
        entries = fast_ma.ma_above(slow_ma)
        exits = fast_ma.ma_below(slow_ma)

        # Add moving averages to the data
        data['Fast_MA'] = fast_ma.ma
        data['Slow_MA'] = slow_ma.ma

        # Store the dataframe as an instance variable
        self.processed_data = data

        return entries, exits

    def run_backtest(self, data):
        """Simulate the strategy on ``data`` and return the portfolio.

        Args:
            data: DataFrame with an ``adjusted_close`` column.

        Returns:
            The object returned by ``vbt.Portfolio.from_signals``, built with
            this instance's ``init_cash`` and ``fees`` and ``freq='D'``.
        """
        entries, exits = self.calculate_signals(data)
        portfolio = vbt.Portfolio.from_signals(data['adjusted_close'],
                                               entries,
                                               exits,
                                               init_cash=self.init_cash,
                                               fees=self.fees,
                                               freq='D')
        return portfolio

    def plot_results(self, portfolio):
        """Show the portfolio plot with both moving averages overlaid.

        Args:
            portfolio: Portfolio returned by ``run_backtest``.

        Raises:
            RuntimeError: If no backtest has been run yet, so there are no
                moving averages to draw.
        """
        processed = getattr(self, 'processed_data', None)
        if processed is None:
            raise RuntimeError(
                "run_backtest() must be called before plot_results()")

        fig = portfolio.plot()

        # Overlay the fast and slow moving averages as thin lines.
        for column, label in (('Fast_MA', 'Fast MA'), ('Slow_MA', 'Slow MA')):
            fig.add_trace(
                go.Scatter(x=processed.index,
                           y=processed[column],
                           mode='lines',
                           name=label,
                           line=dict(width=1)))
        fig.update_layout(height=900,
                          width=1600,
                          template="simple_white+gridon")
        fig.update_xaxes(title_text="")

        # Add dashed line at y = 0 for the middle plot (axis "y2"), spanning
        # the full paper width.
        fig.add_shape(type="line",
                      x0=0,
                      y0=0,
                      x1=1,
                      y1=0,
                      line=dict(color="black", width=2, dash="dash"),
                      xref="paper",
                      yref="y2")

        fig.show()

    def print_results(self, portfolio):
        """Print the ticker (bold), total and benchmark return, and Sharpe.

        Args:
            portfolio: Portfolio returned by ``run_backtest``.
        """
        print(f"\033[1m{self.ticker}\033[0m")
        print(f"Total Return: {portfolio.total_return() * 100:.1f}%")
        print(
            f"Total Benchmark Return: {portfolio.total_benchmark_return() * 100:.1f}%"
        )
        print(f"Sharpe Ratio: {portfolio.sharpe_ratio():.2f}")

    def get_trade_data(self, portfolio):
        """Return the ``Size`` and ``Entry Timestamp`` columns of the trades.

        Args:
            portfolio: Portfolio returned by ``run_backtest``.

        Returns:
            DataFrame: Two-column view of ``portfolio.trades.records_readable``.
        """
        trades_df = portfolio.trades.records_readable
        return trades_df[['Size', 'Entry Timestamp']]


class BaseOptimizer:
    """Shared state and objective function for the window optimizers.

    Each instance owns a ``Backtester`` and exposes ``evaluate``, a
    minimisation objective over ``(fast_window, slow_window)``.
    """

    # Objective value reported for unusable candidates. ``evaluate`` is a
    # minimisation objective, so the penalty must be large and POSITIVE.
    PENALTY = 1e100

    def __init__(self, ticker, years, train_ratio):
        """Create the backtester and keep its train/test frames.

        Args:
            ticker: Symbol forwarded to ``Backtester``.
            years: Years of history forwarded to ``Backtester``.
            train_ratio: Training share forwarded to ``Backtester``.
        """
        self.ticker = ticker
        self.years = years
        self.train_ratio = train_ratio
        self.backtester = Backtester(ticker, years, train_ratio)
        # Wrap the splits in fresh DataFrame objects.
        self.train_data = pd.DataFrame(self.backtester.train_data)
        self.test_data = pd.DataFrame(self.backtester.test_data)
        # Number of evaluate() calls so far (shown in the progress line).
        self.counter = 0

    def evaluate(self, individual):
        """Return the negated training Sharpe ratio for a candidate.

        Args:
            individual: Two-element sequence ``(fast_window, slow_window)``.
                Each element is converted with ``int()``.

        Returns:
            float: ``-sharpe`` for a usable backtest (lower is better).
            ``PENALTY`` (a large positive number) is returned when the
            candidate cannot be converted to two integers or when the Sharpe
            ratio is NaN, infinite, or larger than 1e100 in magnitude.
        """
        self.counter += 1
        try:
            fast_window, slow_window = map(int, individual)
        except (ValueError, OverflowError):
            # Bad candidate: for a minimiser the worst score is a large
            # positive one (a negative value would look like the optimum).
            return self.PENALTY

        # Single-line progress indicator, overwritten on each call.
        sys.stdout.write(
            f"\rEvaluation: {self.counter}  Fast window: {fast_window}  Slow window: {slow_window}  "
        )
        sys.stdout.flush()

        self.backtester.fast_window = fast_window
        self.backtester.slow_window = slow_window

        portfolio = self.backtester.run_backtest(self.train_data)
        sharpe = portfolio.sharpe_ratio()

        # NaN, infinite and absurdly large ratios are degenerate results;
        # penalise them so no optimizer settles on them.
        if not np.isfinite(sharpe) or np.abs(sharpe) > 1e100:
            return self.PENALTY

        return -sharpe  # Negate for minimization


class ParticleSwarmOptimizer(BaseOptimizer):
    """Search the moving-average windows with pyswarm's ``pso``."""

    def __init__(self, ticker, years, train_ratio, n_particles, n_iter, c1, c2,
                 w, mf):
        """Store the swarm settings.

        Args:
            ticker, years, train_ratio: Forwarded to ``BaseOptimizer``.
            n_particles: Passed to ``pso`` as ``swarmsize``.
            n_iter: Passed to ``pso`` as ``maxiter``.
            c1: Passed to ``pso`` as ``phip``.
            c2: Passed to ``pso`` as ``phig``.
            w: Passed to ``pso`` as ``omega``.
            mf: Passed to ``pso`` as ``minfunc``.
        """
        super().__init__(ticker, years, train_ratio)
        self.n_particles, self.n_iter = n_particles, n_iter
        self.c1, self.c2 = c1, c2
        self.w = w
        self.mf = mf

    def optimize(self):
        """Run the swarm and return the best position found by ``pso``."""
        with run_timer("Particle Swarm Optimization (PSO)"):
            # (fast_window, slow_window) search box.
            lower_bounds = (5, 10)
            upper_bounds = (100, 200)

            swarm_options = {
                'swarmsize': self.n_particles,
                'maxiter': self.n_iter,
                'phip': self.c1,
                'phig': self.c2,
                'omega': self.w,
                'minfunc': self.mf,
            }
            # pso returns (best position, best objective value); only the
            # position is needed.
            best_position, _best_value = pso(self.evaluate, lower_bounds,
                                             upper_bounds, **swarm_options)

        return best_position


class MovingAverageOptimizerDE(BaseOptimizer):
    """Search the moving-average windows with SciPy differential evolution."""

    def __init__(self, ticker, years, train_ratio, strategy):
        """Store the DE strategy name.

        Args:
            ticker, years, train_ratio: Forwarded to ``BaseOptimizer``.
            strategy: Passed to ``differential_evolution`` as ``strategy``.
        """
        super().__init__(ticker, years, train_ratio)
        self.strategy = strategy

    def optimize(self):
        """Minimise ``evaluate`` and return the solution vector ``x``."""
        with run_timer("Differential Evolution (DE) Optimization"):
            # One (low, high) pair per dimension: fast window, slow window.
            fast_bounds = (5, 100)
            slow_bounds = (10, 200)

            outcome = differential_evolution(self.evaluate,
                                             [fast_bounds, slow_bounds],
                                             strategy=self.strategy)

        return outcome.x


class BayesianOptimizer(BaseOptimizer):
    """Search the moving-average windows with scikit-optimize ``gp_minimize``."""

    def __init__(self, ticker, years, train_ratio, n_calls, random_state):
        """Store the optimizer settings.

        Args:
            ticker, years, train_ratio: Forwarded to ``BaseOptimizer``.
            n_calls: Passed to ``gp_minimize`` as ``n_calls``.
            random_state: Passed to ``gp_minimize`` as ``random_state``.
        """
        super().__init__(ticker, years, train_ratio)
        self.random_state = random_state
        self.n_calls = n_calls

    def optimize(self):
        """Minimise ``evaluate`` and return the best point ``x``."""
        with run_timer("Bayesian Optimization"):
            # Integer dimensions, in the order evaluate() unpacks them.
            fast_dim = Integer(5, 100, name='fast_window')
            slow_dim = Integer(10, 200, name='slow_window')

            outcome = gp_minimize(self.evaluate, [fast_dim, slow_dim],
                                  n_calls=self.n_calls,
                                  random_state=self.random_state)

        return outcome.x


class SimmulatedAnnealingOptimizer(BaseOptimizer):
    """Search the moving-average windows with SciPy ``dual_annealing``."""

    def __init__(self, ticker, years, train_ratio, maxiter, initial_temp,
                 seed):
        """Store the annealing settings.

        Args:
            ticker, years, train_ratio: Forwarded to ``BaseOptimizer``.
            maxiter: Passed to ``dual_annealing`` as ``maxiter``.
            initial_temp: Passed to ``dual_annealing`` as ``initial_temp``.
            seed: Passed to ``dual_annealing`` as ``seed``.
        """
        super().__init__(ticker, years, train_ratio)
        self.maxiter = maxiter
        self.initial_temp = initial_temp
        self.seed = seed

    def optimize(self):
        """Minimise ``evaluate_sa`` and return the solution vector.

        Returns:
            The ``x`` attribute of the ``dual_annealing`` result, or ``None``
            (after printing a message) if it contains NaN.
        """
        with run_timer("Simulated Annealing (SA)"):
            bounds = [(5, 100),
                      (10, 200)]  # Bounds for fast_window and slow_window

            # Perform Simulated Annealing
            result = dual_annealing(self.evaluate_sa,
                                    bounds,
                                    maxiter=self.maxiter,
                                    initial_temp=self.initial_temp,
                                    seed=self.seed)

            # Check if the result is valid
            if np.isnan(result.x).any():
                print("Optimization failed to find a valid solution.")
                return None

        return result.x

    def evaluate_sa(self, params):
        """Objective handed to ``dual_annealing`` (lower is better).

        ``evaluate`` already returns the negated Sharpe ratio, i.e. a value
        to minimise, so it is passed through unchanged. Negating it again
        would make the annealer look for the worst Sharpe ratio.

        Args:
            params: Candidate ``(fast_window, slow_window)``.

        Returns:
            float: The ``evaluate`` score, or ``np.inf`` when that score is
            NaN, infinite, or has magnitude >= 1e100 (the magnitude
            ``evaluate`` uses to flag degenerate backtests).
        """
        score = self.evaluate(params)
        if np.isfinite(score) and abs(score) < 1e100:
            return score
        return np.inf


class GridOptimizer(BaseOptimizer):
    """Exhaustively evaluate every ``fast < slow`` window pair in a grid."""

    def __init__(self, ticker, years, train_ratio, fast_range, slow_range):
        """Build the inclusive window ranges.

        Args:
            ticker, years, train_ratio: Forwarded to ``BaseOptimizer``.
            fast_range: ``(low, high)`` for the fast window, both inclusive.
            slow_range: ``(low, high)`` for the slow window, both inclusive.
        """
        super().__init__(ticker, years, train_ratio)
        # range() excludes its stop value, hence the + 1.
        self.fast_range = range(fast_range[0], fast_range[1] + 1)
        self.slow_range = range(slow_range[0], slow_range[1] + 1)

    def optimize(self):
        """Return the window pair with the best (lowest) ``evaluate`` score.

        ``evaluate`` returns the negated Sharpe ratio, so the pair with the
        smallest score has the highest Sharpe ratio.

        Returns:
            tuple | None: ``(fast_window, slow_window)``, or ``None`` if no
            pair produced a usable score.
        """
        with run_timer("Grid Optimization"):
            best_score = float('inf')
            best_params = None  # Stays None if nothing usable is found

            for fast_window, slow_window in product(self.fast_range,
                                                    self.slow_range):
                if fast_window >= slow_window:
                    continue  # the fast window must be the shorter one

                score = self.evaluate((fast_window, slow_window))
                # Skip NaN/inf and the 1e100-magnitude values evaluate()
                # uses to flag degenerate backtests.
                if not np.isfinite(score) or abs(score) >= 1e100:
                    continue

                if score < best_score:
                    best_score = score
                    best_params = (fast_window, slow_window)

        return best_params


class GeneticAlgorithimOptimizer(BaseOptimizer):
    """Search the moving-average windows with the ``geneticalgorithm`` package."""

    def __init__(self, ticker, years, train_ratio, generations,
                 population_size):
        """Store the GA settings.

        Args:
            ticker, years, train_ratio: Forwarded to ``BaseOptimizer``.
            generations: Used as ``max_num_iteration``.
            population_size: Used as ``population_size``.
        """
        super().__init__(ticker, years, train_ratio)
        self.population_size = population_size
        self.generations = generations

    def optimize(self):
        """Run the GA on ``evaluate`` and return ``model.best_variable``."""
        with run_timer("Genetic Algorithm Optimization"):
            # Row 0: fast_window [low, high]; row 1: slow_window [low, high].
            window_bounds = np.array([[5, 100], [10, 200]])

            settings = dict(
                max_num_iteration=self.generations,
                population_size=self.population_size,
                elit_ratio=0.01,
                parents_portion=0.3,
                mutation_probability=0.1,
                crossover_probability=0.5,
                selection_type=1,
                crossover_type='uniform',
                mutation_type='uniform',
                max_iteration_without_improv=100,
            )

            solver = ga(function=self.evaluate,
                        dimension=2,
                        variable_type='int',
                        variable_boundaries=window_bounds,
                        algorithm_parameters=settings)
            solver.run()

            return solver.best_variable


class NelderMeadOptimizer(BaseOptimizer):
    """Search the moving-average windows with SciPy's Nelder-Mead method."""

    def __init__(self,
                 ticker,
                 years,
                 train_ratio,
                 max_iter=1000,
                 initial_simplex=None):
        """
        Initializes the Nelder-Mead optimizer class with stock data and optimizer parameters.

        Args:
            ticker (str): The stock ticker.
            years (int): Number of years of historical data to use.
            train_ratio (float): Ratio of data to be used for training.
            max_iter (int): Maximum number of iterations for the optimizer.
            initial_simplex (list of tuples): Initial simplex points. Each
                tuple corresponds to (fast_window, slow_window). ``None`` or
                an empty sequence selects the built-in three-point simplex.
        """
        super().__init__(ticker, years, train_ratio)
        self.max_iter = max_iter
        # Explicit None/empty test: plain truthiness raises for NumPy arrays.
        if initial_simplex is None or len(initial_simplex) == 0:
            initial_simplex = [(10, 30), (20, 60), (15, 45)]
        self.initial_simplex = initial_simplex

    def optimize(self):
        """
        Execute the optimization process using the Nelder-Mead algorithm.

        Returns:
            numpy.ndarray | None: The best parameters discovered
            (fast_window, slow_window), or ``None`` (after printing the
            solver message) when SciPy reports the run as unsuccessful.
        """
        from scipy.optimize import minimize

        def objective(x):
            """Score a point; lower is better."""
            # Ensure the windows are integers and comply with fast < slow.
            fast, slow = int(x[0]), int(x[1])
            if fast >= slow:
                return float('inf')  # Constraint handling
            # evaluate() already returns the negated Sharpe ratio, i.e. the
            # quantity to minimise; negating it again would make the solver
            # look for the worst Sharpe ratio.
            score = self.evaluate((fast, slow))
            # evaluate() flags degenerate backtests with NaN/inf or values of
            # magnitude 1e100; treat all of them as infeasible.
            if not np.isfinite(score) or abs(score) >= 1e100:
                return float('inf')
            return score

        simplex = np.asarray(self.initial_simplex, dtype=float)

        # Run the Nelder-Mead algorithm
        result = minimize(
            objective,
            x0=simplex.mean(axis=0),  # Mean of the initial simplex
            method='Nelder-Mead',
            options={
                'maxiter': self.max_iter,
                'initial_simplex': simplex
            })

        if not result.success:
            print("Optimization did not converge: ", result.message)
            return None

        # Return the best parameters found
        return result.x


class LogOptimizationResults:
    """Append optimisation results to a CSV file with a fixed header."""

    def __init__(self,
                 file_path="/home/user/Stocks/datasets/sma_optimisation.csv"):
        """Remember the target file and make sure it exists with a header.

        Args:
            file_path: Path of the CSV file to append to.
        """
        self.file_path = file_path
        # Column order of the CSV file.
        self.fields = [
            'ticker', 'date', 'years', 'train_ratio', 'method', 'fast_window',
            'slow_window', 'total_return', 'sharpe_ratio'
        ]
        self._create_file_if_not_exists()

    def _create_file_if_not_exists(self):
        """Create the file (and missing parent directories) with a header.

        A file that already has content is left untouched. A missing or empty
        file gets the header row.
        """
        parent_dir = os.path.dirname(self.file_path)
        if parent_dir:
            # Without this, open() fails when the directory does not exist.
            os.makedirs(parent_dir, exist_ok=True)

        if os.path.exists(self.file_path) and os.path.getsize(
                self.file_path) > 0:
            return

        with open(self.file_path, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=self.fields)
            writer.writeheader()

    def add_result(self, ticker, date, years, train_ratio, method, fast_window,
                   slow_window, total_return, sharpe_ratio):
        """Append one result row.

        The arguments are written to the columns of the same name.
        """
        row = {
            'ticker': ticker,
            'date': date,
            'years': years,
            'train_ratio': train_ratio,
            'method': method,
            'fast_window': fast_window,
            'slow_window': slow_window,
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio
        }
        with open(self.file_path, 'a', newline='') as file:
            csv.DictWriter(file, fieldnames=self.fields).writerow(row)


@contextmanager
def run_timer(message=method):
    """A context manager that records and prints the duration of a code block.

    Prints a "commenced" line on entry and a "completed" line with the
    elapsed minutes and seconds on exit, including when the block raises.
    Timestamps are formatted from ``time.gmtime`` so they match the "GMT"
    label in the output.

    Args:
        message: Label printed in both lines. The default is the value the
            module-level ``method`` variable had when this function was
            defined.
    """
    # The monotonic clock measures the duration, so wall-clock adjustments
    # during the run cannot distort it.
    started = time.monotonic()
    formatted_start_time = time.strftime('%H:%M:%S', time.gmtime())
    print(f'{message} commenced: {formatted_start_time} GMT')
    try:
        yield
    finally:
        elapsed_time = time.monotonic() - started
        formatted_end_time = time.strftime('%H:%M:%S', time.gmtime())
        minutes, seconds = divmod(int(elapsed_time), 60)
        print(
            f"\n{message} completed: {formatted_end_time} GMT \nDuration: {minutes} minutes {seconds} seconds"
        )

In [None]:
# @title Run Optimizer
# Factories keyed by method name. Every optimizer builds its own Backtester,
# which downloads the price history on construction, so only the selected
# optimizer is instantiated.
optimizer_factories = {
    'POS':
    lambda: ParticleSwarmOptimizer(
        ticker,
        years,
        train_ratio,
        n_particles=1000,
        n_iter=500,
        c1=0.5,
        c2=0.3,
        w=0.9,
        # mf is forwarded to pyswarm as `minfunc`, the minimum change of the
        # swarm's best objective before the search stops. The objective is a
        # Sharpe ratio, so the previous value of 500 ended the search at the
        # first improvement; use pyswarm's documented default instead.
        mf=1e-8),
    'DE':
    lambda: MovingAverageOptimizerDE(
        ticker, years, train_ratio, strategy='rand2bin'),
    'BAYES':
    lambda: BayesianOptimizer(
        ticker, years, train_ratio, n_calls=100, random_state=42),
    'SA':
    lambda: SimmulatedAnnealingOptimizer(
        ticker, years, train_ratio, maxiter=1000, initial_temp=5230, seed=42),
    'GRID':
    lambda: GridOptimizer(ticker,
                          years,
                          train_ratio,
                          fast_range=(5, 100),
                          slow_range=(10, 200)),
    'GA':
    lambda: GeneticAlgorithimOptimizer(
        ticker, years, train_ratio, generations=100, population_size=500),
}
# 'POS' is kept for existing notebooks; 'PSO' selects the same optimizer.
optimizer_factories['PSO'] = optimizer_factories['POS']

# Select optimizer based on method, failing early on an unknown name.
if method not in optimizer_factories:
    raise ValueError(f"Unknown method '{method}'; choose one of "
                     f"{sorted(optimizer_factories)}")
optimizer = optimizer_factories[method]()

# Run the optimization
best_params = optimizer.optimize()
if best_params is None:
    # Some optimizers return None when they find no usable solution.
    raise RuntimeError(f"{method} optimization did not return parameters")
best_params = [int(value) for value in best_params]

# Print the best parameters
print(
    f"Best parameters: fast_window: {best_params[0]}, slow_window: {best_params[1]}"
)

In [None]:
# @title Output
## Print & Plot ##

# Configure the backtester with the optimised windows
# (index 0 = fast window, index 1 = slow window).
optimizer.backtester.fast_window, optimizer.backtester.slow_window = (
    best_params[0], best_params[1])

# Out-of-sample run: backtest on the held-out test data.
portfolio = optimizer.backtester.run_backtest(optimizer.test_data)

# Summary figures followed by the windows that produced them.
optimizer.backtester.print_results(portfolio)
print(f"fast_window: {best_params[0]}, slow_window: {best_params[1]}")

# Interactive chart of the backtest.
optimizer.backtester.plot_results(portfolio)
