In [None]:
import nbimporter
import numpy as np
import pandas as pd
import stats_helper as sh
import preprocessing as pp
import helper_methods as hm
from matplotlib import pyplot as plt
from sklearn.metrics import accuracy_score

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn import svm

## Backtesting Portfolio Performance
Reference: QuantStart, [Backtesting a Forecasting Strategy for the S&P500 in Python with pandas](https://www.quantstart.com/articles/Backtesting-a-Forecasting-Strategy-for-the-SP500-in-Python-with-pandas)

In [None]:
class MarketIntradayPortfolio():
    """Intraday long/short portfolio driven by a per-bar direction forecast.

    For every bar the portfolio holds ``shares * signal`` units of the asset
    and is credited with ``position * (CLOSE - OPEN)`` for that bar, i.e. the
    trade is opened at the bar's open and closed at the bar's close.

    Parameters:
    symbol_name - Stock symbol; used as the positions column name and in plot labels.
    bars - DataFrame with 'OPEN' and 'CLOSE' columns.
    signals - DataFrame with a 'signal' column (1 / -1) and, when
        with_benchmark is True, a 'benchmark' column of the same kind.
    with_benchmark - Whether to also track the 'benchmark' signal column.
    initial_capital - The amount in cash at the start of the portfolio.
    shares - Number of shares traded on every bar (default 500)."""

    def __init__(self, symbol_name, bars, signals, with_benchmark = True, initial_capital=100000.0, shares=500):
        self.symbol_name = symbol_name
        self.bars = bars
        self.signals = signals
        self.initial_capital = float(initial_capital)
        self.with_benchmark = with_benchmark
        self.shares = int(shares)
        # Filled in by backtest_portfolio(); None until a backtest has run.
        self.returns = None
        self.positions = self.generate_positions()

    def generate_positions(self):
        """Generate the positions DataFrame (signed share counts), based on
        the signals provided by the 'signals' DataFrame."""
        positions = pd.DataFrame(index=self.signals.index)

        positions[self.symbol_name] = self.shares * self.signals['signal']
        if self.with_benchmark:
            positions['benchmark'] = self.shares * self.signals['benchmark']
        return positions

    def backtest_portfolio(self):
        """Backtest the portfolio and return a DataFrame containing
        the equity curve and the percentage returns.

        Columns: 'price_diff', 'profit', 'total', optionally
        'benchmark_total', and 'returns' (pct_change of 'total').
        The result is also stored on ``self.returns``."""
        portfolio = pd.DataFrame(index=self.positions.index)

        # Column assignment aligns the bar data to the positions index.
        portfolio['price_diff'] = self.bars['CLOSE'] - self.bars['OPEN']
        # Zero the first bar with a single positional assignment on the frame.
        # Chained assignment (frame[col][0:1] = ...) is not guaranteed to write
        # through and is a no-op under pandas Copy-on-Write.
        portfolio.iloc[:1, portfolio.columns.get_loc('price_diff')] = 0.0

        portfolio['profit'] = self.positions[self.symbol_name] * portfolio['price_diff']
        portfolio['total'] = self.initial_capital + portfolio['profit'].cumsum()

        if self.with_benchmark:
            benchmark_profit = self.positions['benchmark'] * portfolio['price_diff']
            portfolio['benchmark_total'] = self.initial_capital + benchmark_profit.cumsum()

        portfolio['returns'] = portfolio['total'].pct_change()

        self.returns = portfolio
        return portfolio

    def plot_performance(self, save_path='Images/AAPL/AAPL_simulate.png'):
        """Plot the close price (top) and the equity curve(s) (bottom) and
        save the figure to ``save_path``.

        Runs the backtest first if it has not been run yet. The default
        ``save_path`` is the location this notebook has always written to."""
        if self.returns is None:
            self.backtest_portfolio()

        fig, ax = plt.subplots(2, sharex=True, figsize=(20, 12))
        ylabel = self.symbol_name + ' Adjusted Close Price in $'

        ax[0].plot(self.bars.index, self.bars['CLOSE'], color='r', lw=3.)
        ax[0].set_xlabel('', fontsize=15)
        ax[0].set_ylabel(ylabel, fontsize=15)
        ax[0].legend(('Close Price ' + self.symbol_name,), loc='upper left', prop={"size":15})
        ax[0].set_title(self.symbol_name + ' Close Price VS Portofolio Performance', fontsize=20, fontweight="bold")
        ax[0].grid(True)

        ax[1].plot(self.returns.index, self.returns['total'], color='b', lw=3.)
        # Only label the benchmark when its line is actually drawn.
        legend_labels = ['Portofolio Performance']
        if self.with_benchmark:
            ax[1].plot(self.returns.index, self.returns['benchmark_total'], color='g', lw=3.)
            legend_labels.append('Random Benchmark')
        ax[1].set_xlabel('Date', fontsize=15)
        ax[1].set_ylabel('Portfolio value in $', fontsize=15)
        ax[1].legend(legend_labels, loc='upper left', prop={"size":18})
        ax[1].grid(True)

        plt.tick_params(axis='both', which='major', labelsize=12)

        fig.savefig(save_path)

In [None]:
def getPredictionFromModel(algorithm, num_features, symbol_name):
    """
    Fit ``algorithm`` on the training split for ``symbol_name`` and return
    its predictions for the test split.

    The split comes from ``hm.prepare_data`` (binary output mode). The
    accuracy on the test split is printed; only the predictions are returned.
    """
    train_x, test_x, train_y, test_y = hm.prepare_data(
        num_features, symbol_name, is_binary_ouput=True
    )

    algorithm.fit(train_x, train_y)
    predicted = algorithm.predict(test_x)

    print('accuracy_score: ', accuracy_score(test_y, predicted))
    return predicted

In [None]:
def get_benchmark_prediction(length):
    """Draw a random +1 / -1 direction signal for each of ``length`` bars.

    Samples are taken from a standard normal distribution and mapped to
    their sign, so both directions are equally likely.

    Returns a float numpy array of shape (length,) containing only 1.0 and -1.0.
    """
    mean, std_dev = 0, 1 # mean and standard deviation
    samples = np.random.normal(mean, std_dev, length)

    # Map to the sign without dividing: abs(x) / x yields NaN (and a
    # RuntimeWarning) if a sample is exactly zero. Zero counts as +1 here.
    return np.where(samples < 0, -1.0, 1.0)

In [None]:
def biased_coin(length, threshold):
    """Return a list of ``length`` random signals, each 1 or -1.

    One uniform sample is drawn per signal with ``np.random.rand``; the
    signal is 1 when the sample is strictly greater than ``threshold``
    and -1 otherwise.
    """
    draws = np.random.rand(length)
    return np.where(draws > threshold, 1, -1).tolist()

In [None]:
def simulate(algorithm, num_features, symbol_name, with_benchmark):
    """Backtest ``algorithm``'s predictions for ``symbol_name`` and plot the result.

    Steps: obtain test-split predictions from the model, build OPEN/CLOSE
    bars from the adjusted close series, keep the test portion of the bars,
    attach the model signal and a random benchmark signal, run
    MarketIntradayPortfolio, print the returns table and plot performance.

    Returns the final value of the portfolio's 'total' equity curve.

    Raises ValueError if the model yields more predictions than there are
    test bars.
    """
    prediction = getPredictionFromModel(algorithm, num_features, symbol_name)

    # Historical prices in reversed row order
    # (presumably the file is stored newest-first; TODO confirm).
    prices = pp.read_file(symbol_name).iloc[::-1]
    # Each bar "opens" at the previous row's adjusted close and "closes" at its own.
    # assign() builds a new frame instead of writing into the reversed slice.
    bars = prices.assign(
        OPEN=prices['ADJ_CLOSE'].shift(1),
        CLOSE=prices['ADJ_CLOSE'],
    )[['OPEN', 'CLOSE']]

    _, bars_test = sh.series_split(bars, train_ratio = 0.75)

    # Feature generation can shorten the prediction series, so only the
    # trailing len(prediction) test bars receive a signal.
    n_extra = len(bars_test) - len(prediction)
    if n_extra < 0:
        raise ValueError(
            'Got %d predictions for only %d test bars' % (len(prediction), len(bars_test))
        )
    signals = pd.DataFrame(index=bars_test.index[n_extra:])
    signals['signal'] = prediction
    signals['benchmark'] = get_benchmark_prediction(len(signals.index))

    # Portfolio evaluation on signals (predicted direction) and bars (actual prices).
    portfolio = MarketIntradayPortfolio(symbol_name, bars_test, signals, with_benchmark)

    # Backtest the portfolio and generate returns on top of that.
    returns = portfolio.backtest_portfolio()
    print(returns)

    portfolio.plot_performance()
    # Positional access: label-based [-1] fails or is deprecated depending on the index type.
    return returns['total'].iloc[-1]

In [None]:
def simulate_benchmark(num_features, symbol_name, iterations):
    """Monte-Carlo benchmark: backtest ``iterations`` random biased-coin strategies.

    The coin's probability of predicting -1 equals the share of -1 labels in
    the training targets. Every iteration draws one random signal per test
    sample and backtests it with MarketIntradayPortfolio (no benchmark column).

    Prints the iteration-averaged returns table, the highest final value,
    the final value of the averaged equity curve and the standard deviation
    of the final values.

    Returns (std of final values, final value of the averaged 'total' curve).

    Raises ValueError if ``iterations`` is smaller than 1 or if there are
    more test samples than test bars.
    """
    if iterations < 1:
        raise ValueError('iterations must be at least 1')

    _, _, Y_train, Y_test = hm.prepare_data(num_features, symbol_name, is_binary_ouput=True)
    prob_down = list(Y_train).count(-1) / len(Y_train)
    n_signals = len(Y_test)

    # The price bars do not depend on the random draw, so load and split
    # them once instead of re-reading the file on every iteration.
    prices = pp.read_file(symbol_name).iloc[::-1]
    bars = prices.assign(
        OPEN=prices['ADJ_CLOSE'].shift(1),
        CLOSE=prices['ADJ_CLOSE'],
    )[['OPEN', 'CLOSE']]
    _, bars_test = sh.series_split(bars, train_ratio = 0.75)

    # Feature generation can shorten the sample count, so only the trailing
    # n_signals test bars receive a signal.
    n_extra = len(bars_test) - n_signals
    if n_extra < 0:
        raise ValueError(
            'Got %d test samples for only %d test bars' % (n_signals, len(bars_test))
        )
    signal_index = bars_test.index[n_extra:]

    benchmark_returns = None
    final_profits = list()

    for _ in range(iterations):
        signals = pd.DataFrame(index=signal_index)
        signals['signal'] = biased_coin(n_signals, prob_down)

        portfolio = MarketIntradayPortfolio(symbol_name, bars_test, signals, with_benchmark=False)
        returns = portfolio.backtest_portfolio()

        # Positional access: label-based [-1] fails or is deprecated depending on the index type.
        final_profits.append(returns['total'].iloc[-1])

        if benchmark_returns is None:
            benchmark_returns = returns.copy()
        else:
            benchmark_returns += returns

    benchmark_returns /= iterations
    # max() over the real values; seeding the maximum with 0 would be wrong
    # if every run ended below zero.
    highest_profit = max(final_profits)
    benchmark_final = benchmark_returns['total'].iloc[-1]

    print(benchmark_returns)
    print('Highest Profit: ', highest_profit)
    print('Total Benchmark Profit:', benchmark_final)
    std_results = np.std(np.array(final_profits))
    print('Standard Devation: ', std_results)
    return std_results, benchmark_final

In [None]:
# Symbol to backtest, and whether simulate() should also track and plot
# the random benchmark curve next to the model's equity curve.
company_symbol = 'AAPL'
with_benchmark = True

# Run the biased-coin benchmark 1000 times. Returns the standard deviation of the
# final portfolio values and the final value of the iteration-averaged equity curve.
# Note: no random seed is set in this cell, so the numbers vary from run to run.
std_results, benchmark_avg = simulate_benchmark(1, company_symbol, iterations = 1000)

# KNN with n_neighbors=13 (noted by the author as the best KNN result for Apple).
# simulate() returns the final value of the model-driven equity curve.
knn = KNeighborsClassifier(n_neighbors=13)
knn_result = simulate(knn, 1, company_symbol, with_benchmark)

# Distance of the KNN final value from the benchmark average,
# expressed in benchmark standard deviations.
num_std = (knn_result - benchmark_avg) / std_results
print('# of Std Dev:', num_std)