In [1]:
import import_ipynb
from lib import *

class Ewo:
    """Long-only trading strategy built around an EWO (fast SMA - slow SMA) signal.

    The methods are meant to be called in order: ``data_process`` ->
    ``backtest`` -> ``visualize``.  ``backtest`` and ``visualize`` read
    attributes (``use_filter4``, ``var1`` .. ``var3``) that are only set by
    ``data_process``.
    """

    def __init__(self, target_market_ticker, start_date, end_date):
        """Store the request and download the price history.

        Args:
            target_market_ticker: Ticker symbol handed to ``pdr.get_data_yahoo``.
            start_date: First date of the requested history.
            end_date: Last date of the requested history.
        """
        self.target_market_ticker = target_market_ticker
        self.start_date = start_date
        self.end_date = end_date
        self.market = pdr.get_data_yahoo(target_market_ticker, start_date, end_date)

    def data_process(self, fast_sma, slow_sma, longer_sma, n_day_EWO_slope, n_day_closing_price, use_filter4):
        """Compute indicators, entry filters and exit flags for the price history.

        Args:
            fast_sma: Window (in rows) of the fast moving average of 'Close'.
            slow_sma: Window of the slow moving average of 'Close'.
            longer_sma: Window of the longer-term moving average used as trend filter.
            n_day_EWO_slope: Window over which ``lobf`` is applied to the EWO values.
            n_day_closing_price: Window over which ``lobf`` is applied to 'Close'.
            use_filter4: When truthy, add the 'Filter4' column and accept it as an
                alternative entry condition.

        Returns:
            A tuple ``(market, market_buy_init)``.  ``market`` is indexed by 'Day'
            (1-based row number of the downloaded history) with the rows that
            contain NaN removed; ``market_buy_init`` holds the rows of ``market``
            that satisfy the entry conditions.
        """
        # Column names double as plot labels in visualize().
        self.var1 = f'fast_sma: {fast_sma}'
        self.var2 = f'slow_sma: {slow_sma}'
        self.var3 = f'longer_sma: {longer_sma}'
        self.use_filter4 = use_filter4

        market = self.market.copy()
        market['Day'] = np.arange(1, len(market) + 1)
        market[self.var1] = market['Close'].rolling(fast_sma).mean()    # fast SMA
        market[self.var2] = market['Close'].rolling(slow_sma).mean()    # slow SMA
        market[self.var3] = market['Close'].rolling(longer_sma).mean()  # longer-term SMA
        market['EWO'] = market[self.var1] - market[self.var2]           # EWO
        market = market[['Day', 'Open', 'Close', self.var1, self.var2, self.var3, 'EWO']]
        # Move the original index into a column and index the rows by 'Day'.
        market = market.reset_index()
        market = market.set_index('Day')

        # Filter1: EWO is above 1% of the fast SMA (1 = yes, 0 = no).
        market['Filter1'] = np.where(market['EWO'] > market[self.var1]/100, 1, 0)

        # Filter2: lobf() over the last n_day_EWO_slope EWO values is positive
        # (1 = yes, 0 = no).
        market['Filter2'] = np.where(market['EWO'].rolling(n_day_EWO_slope).apply(lobf) > 0, 1, 0)

        # Filter3: the close is above the longer-term SMA (1 = yes, 0 = no).
        market['Filter3'] = np.where(market['Close'] > market[self.var3], 1, 0)

        if use_filter4:
            # Filter4: EWO is below -10% of the fast SMA (1 = yes, 0 = no).
            market['Filter4'] = np.where(market['EWO'] < market[self.var1]*(-10/100), 1, 0)

        market.dropna(inplace=True)

        # Exit flags are computed after the NaN rows were dropped:
        # Exit1 = lobf() over the recent EWO values is negative,
        # Exit2 = lobf() over the recent closes is negative (-1 = yes, 0 = no).
        market['Exit1'] = np.where(market['EWO'].rolling(n_day_EWO_slope).apply(lobf) < 0, -1, 0)
        market['Exit2'] = np.where(market['Close'].rolling(n_day_closing_price).apply(lobf) < 0, -1, 0)

        # Initial buy signals: Filter1..3 all satisfied, or (optionally) Filter4.
        # '&' binds tighter than '|', so the parentheses only make that explicit.
        entry = (market['Filter1'] == 1) & (market['Filter2'] == 1) & (market['Filter3'] == 1)
        if use_filter4:
            entry = entry | (market['Filter4'] == 1)
        market_buy_init = market.loc[entry]

        return market, market_buy_init

    def backtest(self, budget, market, market_buy_init):
        """Simulate the strategy and report the executed buy and sell days.

        Positions are opened at the 'Open' of the day after a signal and closed
        at the 'Open' of the day after both exit flags are set (or on the last
        day of ``market``).  The last row of ``market_buy_init`` is never used as
        an entry signal.  Prints the final budget and the number of trades.

        Args:
            budget: Starting cash.
            market: Frame returned by ``data_process`` (indexed by 'Day').
            market_buy_init: Initial buy signals returned by ``data_process``.

        Returns:
            A tuple ``(market_buy, market_sell)`` with the rows of ``market`` on
            which a position was opened and closed, respectively.
        """
        buy_list = []   # days on which a position was opened
        sell_list = []  # days on which a position was closed
        sold_day = 0
        bought_amount = 0
        trade_num = 0
        use_filter4 = self.use_filter4
        # 1 while a trade that was opened by a Filter4 signal is still running.
        cond4 = 0

        # Hoisted: both values are loop invariants.
        signal_days = list(market_buy_init.index)
        # The guard keeps an empty market usable; last_day is then never read.
        last_day = market.index[-1] if len(market.index) else None

        # Every signal except the last one is a candidate entry.
        for signal_day in signal_days[:-1]:
            next_day = signal_day + 1

            # Skip signals that fall inside the previous trade ...
            if sold_day > next_day:
                continue
            # ... and further Filter4 signals while a Filter4 trade is running.
            if use_filter4 and market_buy_init.loc[signal_day, 'Filter4'] == 1 and cond4 == 1:
                continue

            if bought_amount == 0:  # enter a trade only when there is no open position
                # Buy at the next morning's open price, as many units as the budget allows.
                bought_day = next_day
                buy_list.append(bought_day)
                bought_price = market.loc[bought_day, 'Open']
                bought_amount = budget // bought_price
                budget -= bought_amount * bought_price

                if use_filter4 and market.loc[bought_day - 1, 'Filter4'] == 1:
                    # A Filter4 trade is not scanned for an exit yet; the scan
                    # starts at the next signal that is not a Filter4 signal.
                    cond4 = 1
                    continue

            if use_filter4 and market_buy_init.loc[signal_day, 'Filter4'] == 0:
                bought_day = next_day

            # From the entry day on, sell once both exit flags are set or when
            # the second-to-last day is reached.
            for day in range(bought_day, last_day):
                exit_signal = (market.loc[day, 'Exit1'] == -1) and (market.loc[day, 'Exit2'] == -1)
                if exit_signal or (day == last_day - 1):
                    # Sell at the next morning's open price.
                    sold_day = day + 1
                    sold_price = market.loc[sold_day, 'Open']
                    sell_list.append(sold_day)
                    budget += bought_amount * sold_price
                    bought_amount = 0
                    trade_num += 1
                    cond4 = 0
                    break

        if bought_amount != 0:  # a position is still open: close it on the last day
            sold_day = last_day
            sold_price = market.loc[sold_day, 'Open']
            sell_list.append(sold_day)
            budget += bought_amount * sold_price
            bought_amount = 0
            trade_num += 1
            cond4 = 0

        print("The final budget of the trading strategy is", round(budget), "USD")
        print("Total number of trades is", trade_num)

        market_buy = market.loc[buy_list].copy()    # all executed buys
        market_sell = market.loc[sell_list].copy()  # all executed sells

        return market_buy, market_sell

    def visualize(self, market, market_buy, market_sell):
        """Plot price, moving averages, EWO and the executed buy/sell markers.

        Args:
            market: Frame returned by ``data_process``; must contain a 'Date' column.
            market_buy: Buy rows returned by ``backtest``.
            market_sell: Sell rows returned by ``backtest``.
        """
        plt.rcParams['figure.figsize'] = 12, 6

        # The SMA periods are the text after ': ' in the column names.
        slow_period = self.var2.partition(': ')[2]
        longer_period = self.var3.partition(': ')[2]

        plt.subplot(211)  # upper plot: price and moving averages
        plt.grid(True, alpha = .3)
        plt.plot(market['Date'], market['Close'], label = self.target_market_ticker)
        plt.plot(market['Date'], market[self.var1], label = self.var1)
        if slow_period != longer_period:
            plt.plot(market['Date'], market[self.var2], label = self.var2)
            plt.plot(market['Date'], market[self.var3], label = self.var3)
        else:
            # Both averages coincide, so draw a single line with a combined label.
            plt.plot(market['Date'], market[self.var2], label = f'slow_sma&longer_sma: {slow_period}')

        plt.plot(market_buy['Date'], market_buy['Close'], '^',
                 color = 'brown', label='Buy', markersize = 9)
        plt.plot(market_sell['Date'], market_sell['Open'], '.',
                 color = 'k', label='Sell', markersize = 9)
        plt.legend(loc='upper left')

        plt.subplot(212)  # lower plot: EWO
        plt.grid(True, alpha = .3)
        plt.plot(market['Date'], market['EWO'], color='blue', label = 'EWO')
        plt.plot(market_buy['Date'], market_buy['EWO'], '^',
                 color = 'brown', label='Buy', markersize = 9)
        plt.plot(market_sell['Date'], market_sell['EWO'], '.',
                 color = 'k', label='Sell', markersize = 9)
        plt.axhline(0, color='lightgray', linestyle='--', linewidth=2)  # zero line
        plt.legend(loc='lower left')
        plt.show()

importing Jupyter notebook from lib.ipynb


In [2]:
class Ewo_loops1(Ewo):
    """Parameter sweep over fast SMA, EWO-slope window and slow SMA.

    ``backtest`` returns plain numbers instead of frames so that the caller can
    feed them to ``collect`` for every tested combination; ``data_process2``
    then turns the collected results into frames for the 3D plots.
    """

    def __init__(self, target_market_ticker, start_date, end_date):
        """Download the price history and create the empty result containers.

        Args:
            target_market_ticker: Ticker symbol handed to ``pdr.get_data_yahoo``.
            start_date: First date of the requested history.
            end_date: Last date of the requested history.
        """
        super().__init__(target_market_ticker, start_date, end_date)
        # Indexed as [fast_sma - 2][n_day_EWO_slope - 3]; each innermost list
        # grows by one entry per collect() call.
        self.profit_list = [[[], [], []] for _ in range(9)]      # final budgets
        self.signal_num_list = [[[], [], []] for _ in range(9)]  # numbers of buy signals
        self.current = 0  # number of combinations collected so far

    def backtest(self, budget, market, market_buy_init):
        """Simulate the strategy for one parameter combination.

        Same trading rules as ``Ewo.backtest``: buy at the 'Open' of the day
        after a signal, sell at the 'Open' of the day after both exit flags are
        set (or on the last day).  The last row of ``market_buy_init`` is never
        used as an entry signal.  Nothing is printed.

        Args:
            budget: Starting cash.
            market: Frame returned by ``data_process`` (indexed by 'Day').
            market_buy_init: Initial buy signals returned by ``data_process``.

        Returns:
            A tuple ``(final_budget, n_buys)``: the rounded final budget and the
            number of executed buys.
        """
        buy_list = []   # days on which a position was opened
        sell_list = []  # days on which a position was closed
        sold_day = 0
        bought_amount = 0
        trade_num = 0
        use_filter4 = self.use_filter4
        # 1 while a trade that was opened by a Filter4 signal is still running.
        cond4 = 0

        # Hoisted: both values are loop invariants.
        signal_days = list(market_buy_init.index)
        # The guard keeps an empty market usable; last_day is then never read.
        last_day = market.index[-1] if len(market.index) else None

        # Every signal except the last one is a candidate entry.
        for signal_day in signal_days[:-1]:
            next_day = signal_day + 1

            # Skip signals that fall inside the previous trade ...
            if sold_day > next_day:
                continue
            # ... and further Filter4 signals while a Filter4 trade is running.
            if use_filter4 and market_buy_init.loc[signal_day, 'Filter4'] == 1 and cond4 == 1:
                continue

            if bought_amount == 0:  # enter a trade only when there is no open position
                # Buy at the next morning's open price, as many units as the budget allows.
                bought_day = next_day
                buy_list.append(bought_day)
                bought_price = market.loc[bought_day, 'Open']
                bought_amount = budget // bought_price
                budget -= bought_amount * bought_price

                if use_filter4 and market.loc[bought_day - 1, 'Filter4'] == 1:
                    # A Filter4 trade is not scanned for an exit yet; the scan
                    # starts at the next signal that is not a Filter4 signal.
                    cond4 = 1
                    continue

            if use_filter4 and market_buy_init.loc[signal_day, 'Filter4'] == 0:
                bought_day = next_day

            # From the entry day on, sell once both exit flags are set or when
            # the second-to-last day is reached.
            for day in range(bought_day, last_day):
                exit_signal = (market.loc[day, 'Exit1'] == -1) and (market.loc[day, 'Exit2'] == -1)
                if exit_signal or (day == last_day - 1):
                    # Sell at the next morning's open price.
                    sold_day = day + 1
                    sold_price = market.loc[sold_day, 'Open']
                    sell_list.append(sold_day)
                    budget += bought_amount * sold_price
                    bought_amount = 0
                    trade_num += 1
                    cond4 = 0
                    break

        if bought_amount != 0:  # a position is still open: close it on the last day
            sold_day = last_day
            sold_price = market.loc[sold_day, 'Open']
            sell_list.append(sold_day)
            budget += bought_amount * sold_price
            bought_amount = 0
            trade_num += 1
            cond4 = 0

        return round(budget), len(buy_list)

    def collect(self, budget, signal_num, fast_sma, n_day_EWO_slope):
        """Record the result of one backtest and report progress every 1000 calls.

        Args:
            budget: Final budget returned by ``backtest``.
            signal_num: Number of buys returned by ``backtest``.
            fast_sma: Fast SMA window of the tested combination (2 maps to slot 0).
            n_day_EWO_slope: EWO-slope window of the combination (3 maps to slot 0).
        """
        self.profit_list[fast_sma-2][n_day_EWO_slope-3].append(budget)
        self.signal_num_list[fast_sma-2][n_day_EWO_slope-3].append(signal_num)
        self.current += 1
        if self.current % 1000 == 0:
            print(self.current, "combinations are tested\n")

    def data_process2(self):
        """Summarise the collected results and convert them into frames.

        Tested ranges - fast SMA: 2~10 days, EWO-slope window: 3~5 days,
        slow SMA: 20~50 days.  Sets ``self.max_profit`` and ``self.max_trade``
        and prints them together with the parameters of the best budget
        (printed as ``None`` when no collected budget is above 0).

        Returns:
            A tuple ``(profit, trade_num)`` of frames with the columns
            'N' (fast SMA), 'L' (EWO-slope window), 'M' (slow SMA) and 'Value'.

        Raises:
            IndexError: If fewer than 9 x 3 x 31 results were collected.
        """
        self.max_profit = 0
        self.max_trade = 0
        # Defaults avoid an UnboundLocalError when no budget exceeds 0.
        SMA_fast = n_day_EWO_slope = SMA_slow = None
        for i, per_slope in enumerate(self.profit_list):
            for j, per_slow in enumerate(per_slope):
                for k, final_budget in enumerate(per_slow):
                    if final_budget > self.max_profit:
                        self.max_profit = final_budget
                        SMA_fast = i + 2
                        n_day_EWO_slope = j + 3
                        SMA_slow = k + 20
                    if self.signal_num_list[i][j][k] > self.max_trade:
                        self.max_trade = self.signal_num_list[i][j][k]

        print(f"""
        Maximum final budget and trading number among all combinations are {self.max_profit} and {self.max_trade},
        respectively, with SMA_fast = {SMA_fast}, n_day_EWO_slope = {n_day_EWO_slope}, and SMA_slow = {SMA_slow}
        for the maximum final budget
        """)

        profit = self._results_to_frame(self.profit_list)
        trade_num = self._results_to_frame(self.signal_num_list)

        return profit, trade_num

    @staticmethod
    def _results_to_frame(results):
        """Flatten a 9 x 3 x 31 nested result list into an 'N', 'L', 'M', 'Value' frame."""
        # Built in one go: DataFrame.append was removed in pandas 2.0 and
        # copied the whole frame on every call.
        rows = [
            {'N': i + 2, 'L': j + 3, 'M': k + 20, 'Value': results[i][j][k]}
            for i in range(9)
            for j in range(3)
            for k in range(31)
        ]
        return pd.DataFrame(rows, columns=['N', 'L', 'M', 'Value'])

    def _plot_3d_heatmap(self, frame, cmap, colorbar_label, title):
        """Draw the 'Value' column of ``frame`` as a coloured 3D scatter over N, L and M."""
        fig = plt.figure(figsize=(16, 10))
        ax = fig.add_subplot(111, projection = '3d')

        colmap = cm.ScalarMappable(cmap=cmap)
        colmap.set_array(frame['Value'])

        ax.scatter(frame['N'], frame['L'], frame['M'],
                   marker = 's', s = 170, c = frame['Value'], cmap = cmap)
        # The mappable is not attached to any axes, so name the axes explicitly.
        plt.colorbar(colmap, ax = ax, label = colorbar_label)
        ax.set_title(title, fontsize = 15)
        ax.set_xlabel('SMA_fast: 2~10 days')
        ax.set_ylabel('n-day EWO_slope: 3~5 days')
        ax.set_zlabel('SMA_slow: 20~50 days')
        ax.view_init(30,-30)
        plt.show()

    def visualize_final_cash(self, profit):
        """Show the final cash of every combination as a 3D heatmap.

        Args:
            profit: First frame returned by ``data_process2``.
        """
        self._plot_3d_heatmap(profit, 'Blues', "Final Cash", "EWO Revision 1: Final Cash")

    def visualize_trade_num(self, trade_num):
        """Show the number of trades of every combination as a 3D heatmap.

        Args:
            trade_num: Second frame returned by ``data_process2``.
        """
        self._plot_3d_heatmap(trade_num, 'Purples', "# of Trades", "EWO Revision 1: Number of Trades")

In [3]:
class Ewo_loops2(Ewo_loops1):
    """Parameter sweep over the longer-term SMA and the closing-price slope window.

    Results are stored per longer-term SMA (50~130) and visualised as 2D heatmaps.
    """

    def __init__(self, target_market_ticker, start_date, end_date):
        """Download the price history and create the empty result containers.

        Args:
            target_market_ticker: Ticker symbol handed to ``pdr.get_data_yahoo``.
            start_date: First date of the requested history.
            end_date: Last date of the requested history.
        """
        super().__init__(target_market_ticker, start_date, end_date)
        # Replace the parent's 3-level containers: here the outer index is
        # longer_sma - 50 and each inner list grows by one entry per collect() call.
        self.profit_list = [[] for _ in range(81)]      # final budgets
        self.signal_num_list = [[] for _ in range(81)]  # numbers of buy signals
        self.current = 0  # number of combinations collected so far

    def collect(self, budget, signal_num, longer_sma):
        """Record the result of one backtest and report progress every 1000 calls.

        Note that this override takes different parameters than
        ``Ewo_loops1.collect``.

        Args:
            budget: Final budget returned by ``backtest``.
            signal_num: Number of buys returned by ``backtest``.
            longer_sma: Longer-term SMA window of the combination (50 maps to slot 0).
        """
        self.profit_list[longer_sma-50].append(budget)
        self.signal_num_list[longer_sma-50].append(signal_num)
        self.current += 1
        if self.current % 1000 == 0:
            print(self.current, "combinations are tested\n")

    def data_process2(self):
        """Summarise the collected results and pivot them into 2D grids.

        Tested ranges - longer-term SMA: 50~130 days, closing-price slope
        window: 50~130 days.  Sets ``self.max_profit`` and ``self.max_trade``
        and prints them together with the parameters of the best budget
        (printed as ``None`` when no collected budget is above 0).

        Returns:
            A tuple ``(profit, trade_num)`` of frames whose index is 'M'
            (longer-term SMA) and whose columns are 'N' (slope window).

        Raises:
            IndexError: If fewer than 81 x 81 results were collected.
        """
        self.max_profit = 0
        self.max_trade = 0
        # Defaults avoid an UnboundLocalError when no budget exceeds 0.
        SMA_longer_term = n_day_closing_price = None
        for i, per_window in enumerate(self.profit_list):
            for j, final_budget in enumerate(per_window):
                if final_budget > self.max_profit:
                    self.max_profit = final_budget
                    SMA_longer_term = i + 50
                    n_day_closing_price = j + 50
                if self.signal_num_list[i][j] > self.max_trade:
                    self.max_trade = self.signal_num_list[i][j]

        print(f"""
        Maximum final budget and trading number among all combinations are {self.max_profit} and {self.max_trade},
        respectively, with SMA_longer_term = {SMA_longer_term} and n_day_closing_price = {n_day_closing_price}
        for the maximum final budget
        """)

        profit = self._results_to_grid(self.profit_list)
        trade_num = self._results_to_grid(self.signal_num_list)

        return profit, trade_num

    @staticmethod
    def _results_to_grid(results):
        """Turn an 81 x 81 nested result list into an 'M' x 'N' grid of values."""
        # Built in one go: DataFrame.append was removed in pandas 2.0 and
        # copied the whole frame on every call.
        rows = [
            {'M': i + 50, 'N': j + 50, 'Value': results[i][j]}
            for i in range(81)
            for j in range(81)
        ]
        long_frame = pd.DataFrame(rows, columns=['M', 'N', 'Value'])
        # Keyword arguments: pandas 2.0 no longer accepts them positionally.
        return long_frame.pivot(index='M', columns='N', values='Value')

    def visualize_final_cash(self, profit):
        """Show and save the final cash of every combination as a 2D heatmap.

        Args:
            profit: First frame returned by ``data_process2``.
        """
        plt.figure(figsize=(8, 6))
        sns.heatmap(profit, cmap='Blues', cbar_kws={'label': 'Final Cash'})
        plt.title('EWO Revision 2: Final Cash', fontsize = 15)
        plt.xlabel('n-day Closing Price_slope: 50~130 days')
        plt.ylabel('SMA_longer-term: 50~130 days')
        # NOTE: the file name contains ':', which Windows file systems reject.
        plt.savefig('EWO Revision 2: Final Cash.png', dpi=300)
        plt.show()

    def visualize_trade_num(self, trade_num):
        """Show and save the number of trades of every combination as a 2D heatmap.

        Args:
            trade_num: Second frame returned by ``data_process2``.
        """
        plt.figure(figsize=(8, 6))
        sns.heatmap(trade_num, cmap='Purples', cbar_kws={'label': '# of Trades'})
        plt.title('EWO Revision 2: Number of Trades', fontsize = 15)
        plt.xlabel('n-day Closing Price_slope: 50~130 days')
        plt.ylabel('SMA_longer-term: 50~130 days')
        # NOTE: the file name contains ':', which Windows file systems reject.
        plt.savefig('EWO Revision 2: Number of Trades.png', dpi=300)
        plt.show()