In [14]:
%%writefile 'PortfolioRebalancing.py'
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

class PortfolioRebalancing:
    """Helpers for back-testing a periodically rebalanced stock portfolio."""

    @classmethod
    def get_monthly_return(cls, ohlc: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Build a table of period-over-period returns, one column per ticker.

        Each column is ``pct_change()`` of that ticker's ``'Adj Close'``
        series, so the first row is NaN.  The values are "monthly" only if
        the input rows are monthly bars -- the code itself just compares
        consecutive rows.

        Args:
            ohlc: Mapping of ticker -> price frame with an ``'Adj Close'``
                column.  Frames are combined positionally (their own indexes
                are ignored), so they must all have the same number of rows.

        Returns:
            DataFrame with one column per ticker (in mapping order), indexed
            by the index of the first ticker's frame.  An empty mapping
            yields an empty DataFrame.

        Raises:
            ValueError: If the frames do not all have the same length.
        """
        if not ohlc:
            # Nothing to take an index from; an empty table is the only
            # sensible answer.
            return pd.DataFrame()
        # Build all columns first and construct the frame once; inserting
        # columns one at a time fragments the frame for large universes.
        returns_by_ticker = {
            ticker: frame['Adj Close'].pct_change().values
            for ticker, frame in ohlc.items()
        }
        first_ticker = next(iter(ohlc))
        return pd.DataFrame(returns_by_ticker, index=ohlc[first_ticker].index)

    @classmethod
    def cumulative_portfolio_returns(cls,
                                     df: pd.DataFrame,
                                     no_of_stocks_in_portfolio: int,
                                     no_of_stocks_to_swap: int
                                     ) -> Tuple[np.ndarray, List[List[str]]]:
        """Simulate the rebalancing strategy row by row.

        For every row of ``df`` (one row per period):

        1. If a portfolio is held, record the mean return of its tickers for
           that row, then drop the ``no_of_stocks_to_swap`` worst performers.
        2. Top the portfolio back up to ``no_of_stocks_in_portfolio`` with
           the best performers of that row.

        Top-up picks are taken from *all* columns, so a ticker that is
        already held can be picked again; it then appears more than once in
        the portfolio and is weighted accordingly in the mean.

        Args:
            df: Per-period returns, one column per ticker.
            no_of_stocks_in_portfolio: Target portfolio size.
            no_of_stocks_to_swap: Number of worst holdings removed per period.

        Returns:
            A tuple ``(returns, history)``:

            * ``returns`` -- float array of per-period portfolio returns,
              starting with ``0``.  Despite the method name the values are
              not compounded; the caller is expected to do that.
            * ``history`` -- the portfolio composition after each row, one
              independent list per row.
        """
        portfolio: List[str] = []
        monthly_returns = [0.0]
        portfolio_history: List[List[str]] = []
        for i in range(len(df)):
            if portfolio:
                held = df[portfolio].iloc[i, :]
                monthly_returns.append(held.mean())
                worst = held.sort_values(ascending=True)
                bad_stocks = set(worst.iloc[:no_of_stocks_to_swap].index)
                portfolio = [t for t in portfolio if t not in bad_stocks]
            fill = no_of_stocks_in_portfolio - len(portfolio)
            ranked = df.iloc[i, :].sort_values(ascending=False)
            portfolio.extend(ranked.iloc[:fill].index.tolist())
            # Store a snapshot so later changes to ``portfolio`` can never
            # leak into earlier history entries.
            portfolio_history.append(list(portfolio))
        return np.array(monthly_returns), portfolio_history

Overwriting PortfolioRebalancing.py


In [5]:
%%writefile ResistanceBreakout.py
import numpy as np
import pandas as pd
from typing import Dict, List

class ResistanceBreakout:
    """Back-test of a volume-confirmed breakout strategy with an ATR stop."""

    @classmethod
    def calculate_returns(cls,
                          ohlc_dict: Dict[str, pd.DataFrame]) -> None:
        """Compute per-row strategy returns and store them as column ``'ret'``.

        Every frame in ``ohlc_dict`` is modified in place; nothing is
        returned.  Rows are processed in order by position, so any index type
        (dates, strings, integers) works.

        Rules, evaluated for each row ``i >= 1``:

        * **Flat**: the return is 0.  If ``Volume[i]`` exceeds 1.5x
          ``roll_max_vol[i-1]``, go long when ``High[i] >= roll_max_cp[i]``,
          otherwise go short when ``Low[i] <= roll_min_cp[i]``.
        * **Long**: the stop is ``Close[i-1] - ATR[i-1]``.  If ``Low[i]``
          falls below it, book the move from the previous close to the stop
          and go flat.  Otherwise book close-to-close, and flip to short on a
          volume-confirmed downside breakout.
        * **Short**: mirror image with stop ``Close[i-1] + ATR[i-1]`` and
          returns computed as ``previous / current - 1``.

        Row 0 always gets a return of 0.  Comparisons against NaN (e.g.
        warm-up rows of the rolling columns) are False, so no signal fires
        there.

        Args:
            ohlc_dict: Mapping of ticker -> frame with columns ``'High'``,
                ``'Low'``, ``'Close'``, ``'Volume'``, ``'ATR'``,
                ``'roll_max_cp'``, ``'roll_min_cp'`` and ``'roll_max_vol'``.
                NOTE(review): the ``roll_*`` columns are presumably rolling
                max/min of close price and rolling max of volume computed by
                the caller -- confirm against the code that prepares them.

        Raises:
            KeyError: If a non-empty frame lacks one of the required columns.
        """
        for ohlc in ohlc_dict.values():
            n_rows = len(ohlc)
            if n_rows == 0:
                # No rows to trade on; still provide the column so callers
                # can rely on it being present.
                ohlc['ret'] = np.array([], dtype=float)
                continue

            # Pull columns out once as plain arrays.  Indexing a Series with
            # an integer (``series[i]``) is label-based on newer pandas and
            # fails for date-indexed frames; arrays are always positional
            # and far cheaper inside the loop.
            high = ohlc['High'].to_numpy()
            low = ohlc['Low'].to_numpy()
            close = ohlc['Close'].to_numpy()
            volume = ohlc['Volume'].to_numpy()
            atr = ohlc['ATR'].to_numpy()
            roll_max_cp = ohlc['roll_max_cp'].to_numpy()
            roll_min_cp = ohlc['roll_min_cp'].to_numpy()
            roll_max_vol = ohlc['roll_max_vol'].to_numpy()

            signal = ''  # '' = flat, 'buy' = long, 'sell' = short
            returns: List[float] = [0.0]
            for i in range(1, n_rows):
                prev_close = close[i - 1]
                volume_breakout = volume[i] > 1.5 * roll_max_vol[i - 1]
                downside_breakout = low[i] <= roll_min_cp[i]
                upside_breakout = high[i] >= roll_max_cp[i]

                if signal == '':
                    # No position is held during this bar, so it earns 0
                    # even if an entry is triggered on it.
                    returns.append(0.0)
                    if volume_breakout:
                        if upside_breakout:
                            signal = 'buy'
                        elif downside_breakout:
                            signal = 'sell'
                elif signal == 'buy':
                    exit_price = prev_close - atr[i - 1]
                    if low[i] < exit_price:
                        # Stopped out: assume the fill is at the stop price.
                        signal = ''
                        returns.append((exit_price / prev_close) - 1)
                    else:
                        returns.append((close[i] / prev_close) - 1)
                        if downside_breakout and volume_breakout:
                            signal = 'sell'
                else:
                    exit_price = prev_close + atr[i - 1]
                    if high[i] > exit_price:
                        # Stopped out of the short at the stop price.
                        signal = ''
                        returns.append((prev_close / exit_price) - 1)
                    else:
                        returns.append((prev_close / close[i]) - 1)
                        if upside_breakout and volume_breakout:
                            signal = 'buy'
            ohlc['ret'] = np.array(returns, dtype=float)

Overwriting ResistanceBreakout.py
