# In [4]:
# Imports
import pandas as pd
import datetime as dt
import numpy as np

# In [None]:
class PortfolioDB():
    """Bookkeeping for a strategy's positions, round-trip trades and account history.

    All state lives in three pandas DataFrames created by ``__init__``:
    ``positions_df``, ``trades_df`` and ``account_history_df``. Trades are opened
    with ``open_trades`` and closed with ``close_trades``. Prices are refreshed with
    ``update_prices``, and ``record_account_data`` appends a snapshot to the
    account history.
    """

    # Maintenance margin requirement for each asset (as a fraction of absolute market value)
    maint_margin_frac = 0.5

    # Do not modify these
    cash_security_id = 'cash'     # Security identifier for cash
    # Class-level default kept only for backward compatibility. __init__ gives each
    # instance its own set. A shared class-level set would make a second PortfolioDB
    # in the same session skip every return adjustment the first one already tallied.
    ret_tallied_dates = set()

    # Initialization function (aka constructor) that runs whenever you create a new
    # PortfolioDB() elsewhere, for example: portfolio_db = PortfolioDB()
    def __init__(self):
        # Dates whose returns update_prices has already applied (one set per instance)
        self.ret_tallied_dates = set()

        # Create three main DataFrames: positions_df, trades_df, account_history_df

        #####################################################################
        # positions_df:
        #
        # A snapshot of all positions currently in the portfolio, lumped together by identifier.
        # If you bought 100 shares of ABC on 11/1 and another 150 shares on 12/1, and have made
        # no other trades, positions_df will have a row with security_id = ABC, quantity = 250.
        #
        # Meant to match what you observe on your brokerage account screen
        #
        # In backtests, we keep track of this ourselves
        # In live trading, this will be pulled from the brokerage
        self.positions_df = self.empty_positions_df()
        #####################################################################

        #####################################################################
        # trades_df
        #
        # An archive of all trades ever made as part of this strategy.
        # Each row is a trade, defined as a round-trip transaction for a specific security, wherein you:
        # - Buy/sell to "open" the trade at time t1
        # - Sell/buy to "close" the trade at some time t2 > t1
        #
        # trades_df records the prices, quantities, and timings of trades, but users can add
        # columns for the inputs that led to the trade being opened, details of the transaction, etc.
        #
        # index is an automatically-generated integer identifier
        self.trades_df = self.empty_trades_df()
        #####################################################################

        #####################################################################
        # account_history_df
        #
        # An archive of account-level variables for each time unit (usually day) in the backtest,
        # or recorded as often as you want during live trading
        self.account_history_df = self.empty_account_history_df()
        #####################################################################

    # Creates an empty positions_df with all the right column types
    def empty_positions_df(self):
        return pd.DataFrame({'security_id': pd.Series([], dtype='object'), # security identifier such as ticker, permno, optionid, or whatever uniquely identifies the position
                             'quantity': pd.Series([], dtype='float'), # number of units in portfolio, positive or negative; zero positions are removed except for cash
                             'average_cost': pd.Series([], dtype='float'), # average cost of units in portfolio
                             'current_price': pd.Series([], dtype='float') # current price of security
                            })

    # Creates an empty trades_df with all the right column types
    def empty_trades_df(self):
        return pd.DataFrame({'security_id': pd.Series([], dtype='object'), # ticker, permno, optionid, or whatever uniquely identifies the position
                             'quantity': pd.Series([], dtype='float'), # number of units traded. positive = buy, negative = sell
                             'open_datetime': pd.Series([], dtype='datetime64[ns]'), # datetime on which the trade was opened
                             'open_average_price': pd.Series([], dtype='float'), # average open price of the units in the trade
                             'close_datetime': pd.Series([], dtype='datetime64[ns]'), # datetime on which the trade was closed
                             'close_average_price': pd.Series([], dtype='float') # average close price of the units in the trade
                            })

    # Creates an empty account_history_df with all the right column types
    def empty_account_history_df(self):
        return pd.DataFrame({'datetime': pd.Series([], dtype='datetime64[ns]'), # datetime on which the account data was recorded
                             'nav': pd.Series([], dtype='float'), # account net asset value
                             'cash_position': pd.Series([], dtype='float'), # account cash position
                             'margin_requirement': pd.Series([], dtype='float') # account margin requirement
                            })

    def add_cash(self, cash_to_add):
        """Add ``cash_to_add`` (may be negative) to the cash row of positions_df.

        The cash row is created on first use with quantity 0 and average_cost and
        current_price of 1. The row is matched by exact equality with
        ``cash_security_id``. A substring test would wrongly treat a security such
        as 'cashco' as the cash row.
        """
        is_cash = self.positions_df['security_id'] == self.cash_security_id
        if not is_cash.any():
            cash_row = pd.DataFrame({'security_id': [self.cash_security_id],
                                     'quantity': [0.0],
                                     'average_cost': [1.0],
                                     'current_price': [1.0]})
            self.positions_df = pd.concat([self.positions_df, cash_row], ignore_index=True)
            is_cash = self.positions_df['security_id'] == self.cash_security_id

        self.positions_df.loc[is_cash, 'quantity'] += cash_to_add

    def open_trades(self, open_trades_df):
        """Add new trades to trades_df and update cash and positions_df accordingly.

        ``open_trades_df`` must have columns 'security_id', 'open_datetime',
        'quantity' and 'open_average_price'. Any extra columns are kept as custom
        trade data. Rows sharing security_id and open_datetime are merged into a
        single trade: quantities are summed, and prices are averaged with quantity
        weights. The caller's DataFrame is not modified.

        Example: open a short position of 200 units of security 12345 on 2005-01-23::

            trades = pd.DataFrame({'security_id': [12345], 'open_datetime': [np.datetime64('2005-01-23')],
                                   'quantity': [-200], 'open_average_price': [39.25], 'custom_data': [1.234]})

        Raises:
            ValueError: if a required column is missing.
        """
        validate_df_columns(open_trades_df, ['security_id', 'open_datetime', 'quantity', 'open_average_price'])

        # Work on a copy so the dtype fix below never touches the caller's frame
        open_trades_df = open_trades_df.copy()
        if open_trades_df['security_id'].dtype != object:
            open_trades_df['security_id'] = open_trades_df['security_id'].astype(object)

        # Lump together all trades with the same security_id and open_datetime as one
        open_trades_df = groupby_agg_wavg(df=open_trades_df, group_by=['security_id', 'open_datetime'],
                                          default_method='first',
                                          wavg_columns=['open_average_price'],
                                          wavg_weight='quantity')

        # ignore_index keeps the trades_df index a unique integer identifier
        self.trades_df = pd.concat([self.trades_df, open_trades_df], ignore_index=True)

        # Each trade adds -open_average_price*quantity to the cash balance
        self.add_cash(-(open_trades_df['open_average_price'] * open_trades_df['quantity']).sum())

        # Positions impact: add quantity to the security's position and re-average its cost
        new_positions = open_trades_df.loc[:, ['security_id', 'quantity', 'open_average_price']]
        new_positions = new_positions.rename(columns={'open_average_price': 'average_cost'})
        self.append_positions(new_positions)

        self.remove_zero_positions()

    def close_trades(self, close_trades_df):
        """Close open trades in trades_df and update cash and positions_df accordingly.

        ``close_trades_df`` must have columns 'security_id', 'open_datetime',
        'close_datetime' and 'close_average_price'. security_id and open_datetime
        identify the trade to close. If several rows share that pair, the first
        one is used. Extra columns are stored as custom close data.

        Only trades that are still open are closed. Trades that already have a
        close_datetime are skipped, so closing twice does not double-count cash
        or positions.

        Example: close a trade in security 12345 that was opened on 2005-01-23::

            close_trades_df = pd.DataFrame({'security_id': [12345], 'open_datetime': [np.datetime64('2005-01-23')],
                                            'close_datetime': [np.datetime64('2005-08-09')],
                                            'close_average_price': [41.10], 'close_custom_data': [1.234]})

        Raises:
            ValueError: if a required column is missing.
        """
        key_cols = ['security_id', 'open_datetime']
        validate_df_columns(close_trades_df, key_cols + ['close_datetime', 'close_average_price'])

        # If there are several rows for one security_id/open_datetime pair, go with the first.
        # groupby/astype return new frames, so the caller's DataFrame is not modified.
        close_trades_df = close_trades_df.groupby(key_cols).agg('first').reset_index()
        close_trades_df = close_trades_df.astype({'security_id': 'object', 'open_datetime': 'datetime64[ns]'})

        value_cols = [c for c in close_trades_df.columns if c not in key_cols]
        # Columns already in trades_df come back from the merge with a '_cldf' suffix
        overlapping = [c for c in value_cols if c in self.trades_df.columns]
        new_cols = [c for c in value_cols if c not in overlapping]

        trades = self.trades_df.merge(close_trades_df, on=key_cols, suffixes=('', '_cldf'), how='left')

        # A trade is closed by this call if it was matched AND was still open
        just_closed = trades['close_datetime_cldf'].notnull() & trades['close_datetime'].isna()

        for col in overlapping:
            incoming = trades.loc[just_closed, col + '_cldf']
            if col == 'close_datetime':
                trades.loc[just_closed, col] = pd.to_datetime(incoming)
            elif col == 'close_average_price':
                trades.loc[just_closed, col] = incoming
            else:
                # Other shared columns (e.g. custom close data created by an earlier call)
                # are only filled where the trade has no value yet, so open-side data such
                # as quantity or open_average_price is never overwritten.
                trades.loc[just_closed, col] = trades.loc[just_closed, col].fillna(incoming)

        for col in new_cols:
            # Custom column seen for the first time: keep it only on trades closed now
            trades[col] = trades[col].where(just_closed)

        self.trades_df = trades.drop(columns=[c + '_cldf' for c in overlapping])

        closed = self.trades_df.loc[just_closed]

        # Closing returns close_average_price*quantity to cash
        self.add_cash((closed['close_average_price'] * closed['quantity']).sum())

        # Create an offsetting positions row (negated quantity) for each closed trade
        closing_positions = closed.loc[:, ['security_id', 'quantity', 'open_average_price']]
        closing_positions = closing_positions.rename(columns={'open_average_price': 'average_cost'})
        closing_positions['quantity'] = -closing_positions['quantity']
        self.append_positions(closing_positions)

        self.remove_zero_positions()

    def update_prices(self, price_df):
        """Refresh current prices from ``price_df`` and apply each date's returns once.

        ``price_df`` must have columns 'security_id', 'date', 'prc' and 'ret', and
        every row must share the same date. The caller's DataFrame is not modified.
        current_price becomes 'prc' where it is available. Otherwise the old
        current_price is kept, or average_cost is used if there is none.

        The first time a date is seen, each position's quantity is scaled so that
        old_quantity * lagged_price * (1 + ret) == new_quantity * current_price.
        Presumably this folds dividends/splits captured by 'ret' but not by 'prc'
        into the quantity (TODO confirm against the data source). Positions with no
        lagged price, or with a zero current price, are left unchanged. Open trades
        in trades_df are scaled by the same factor.

        Raises:
            ValueError: if a required column is missing or the dates differ.
        """
        validate_df_columns(price_df, ['security_id', 'date', 'prc', 'ret'])
        validate_all_values_same(price_df['date'])

        # Copy just what we need so the dtype fix never touches the caller's frame
        prices = price_df.loc[:, ['security_id', 'date', 'prc', 'ret']].copy()
        if prices['security_id'].dtype != object:
            prices['security_id'] = prices['security_id'].astype(object)

        positions = self.positions_df.merge(prices.loc[:, ['security_id', 'prc', 'ret']],
                                            on='security_id', how='left')
        # Missing returns count as zero, otherwise those positions would drop out of NAV
        positions['ret'] = positions['ret'].fillna(0)

        lagged_price = positions['current_price'].copy()

        # New price where we have one, otherwise the old current_price, otherwise average_cost
        positions['current_price'] = np.where(positions['prc'].notnull(),
                                              positions['prc'],
                                              positions['current_price'])
        positions['current_price'] = positions['current_price'].fillna(positions['average_cost'])

        # All dates are equal (validated above), so the first one stands for all
        date = prices['date'].iloc[0] if len(prices) > 0 else None

        # Tally returns only once per date
        if date is not None and date not in self.ret_tallied_dates:
            ret_based_price = lagged_price * (positions['ret'] + 1)
            quantity_multiple = ret_based_price / positions['current_price']
            # NaN (no lagged price, e.g. a position opened since the last update) or
            # inf (zero current price) would corrupt the quantity, so leave it as is.
            quantity_multiple = quantity_multiple.replace([np.inf, -np.inf], np.nan).fillna(1.0)
            positions['quantity'] = positions['quantity'] * quantity_multiple

            self._scale_open_trade_quantities(positions['security_id'], quantity_multiple)

            self.ret_tallied_dates.add(date)

        self.positions_df = positions.drop(columns=['prc', 'ret'])

    # Scale quantities of still-open trades by the per-security multiple computed in update_prices
    def _scale_open_trade_quantities(self, security_ids, quantity_multiple):
        """Multiply every still-open trade's quantity by its security's multiple.

        ``security_ids`` and ``quantity_multiple`` are row-aligned Series.
        Securities with no multiple keep their quantity. This happens, for example,
        when a long and a short trade net to a zero position that was removed.
        """
        still_open = self.trades_df['close_datetime'].isna()
        if not still_open.any():
            return

        multiple_by_id = (pd.DataFrame({'security_id': security_ids.to_numpy(),
                                        'multiple': quantity_multiple.to_numpy()})
                          .drop_duplicates('security_id')
                          .set_index('security_id')['multiple'])
        factor = self.trades_df['security_id'].map(multiple_by_id).fillna(1.0)

        # All Series share trades_df's index, so no realignment happens here
        quantity = self.trades_df['quantity']
        self.trades_df['quantity'] = quantity.where(~still_open, quantity * factor)

    def remove_zero_positions(self, min_abs_value=1.0E-5):
        """Remove positions whose absolute market value is not above ``min_abs_value``.

        Rounding errors leave "closed" positions with tiny nonzero quantities. A
        position is kept only if |price * quantity| > min_abs_value. The default is
        1e-5 currency units, i.e. a thousandth of a cent. price is current_price
        when known, otherwise average_cost. The cash row is always kept, even with
        zero quantity.
        """
        prices = np.where(self.positions_df['current_price'].notnull(),
                          self.positions_df['current_price'],
                          self.positions_df['average_cost'])
        big_enough = np.abs(self.positions_df['quantity'] * prices) > min_abs_value
        is_cash = self.positions_df['security_id'] == self.cash_security_id
        self.positions_df = self.positions_df[big_enough | is_cash]

    def record_account_data(self, price_df, datetime):
        """Update prices from ``price_df``, then append a row to account_history_df.

        The row records ``datetime`` together with the current NAV, cash position
        and margin requirement.
        """
        self.update_prices(price_df)
        snapshot = pd.DataFrame({'datetime': [datetime],
                                 'nav': [self.current_nav()],
                                 'cash_position': [self.current_cash()],
                                 'margin_requirement': [self.current_margin()]})
        self.account_history_df = pd.concat([self.account_history_df, snapshot], ignore_index=True)

    # Returns current NAV: sum of current_price*quantity over all rows, cash included
    def current_nav(self):
        return (self.positions_df['current_price'] * self.positions_df['quantity']).sum()

    # Returns current cash position
    def current_cash(self):
        # Sum across everything with cash's security_id, just in case there are multiple rows
        return self.positions_df.loc[self.positions_df['security_id'] == self.cash_security_id, 'quantity'].sum()

    # Returns current maintenance margin requirement for the whole portfolio:
    # maint_margin_frac times the gross absolute market value excluding cash.
    # Cash is removed by subtracting |cash|, which assumes cash's current_price is 1 (as add_cash sets it).
    def current_margin(self):
        gross_value = np.absolute(self.positions_df['current_price'] * self.positions_df['quantity']).sum()
        return self.maint_margin_frac * (gross_value - abs(self.current_cash()))

    def append_positions(self, new_position_df):
        """Merge ``new_position_df`` rows into positions_df, keeping one row per security_id.

        new_position_df may contain several rows for a security_id, and often ones
        already in positions_df. The rows are concatenated and then grouped by
        security_id: quantities are summed, average_cost is averaged with quantity
        weights, and other columns take their last value.
        """
        combined = pd.concat([self.positions_df, new_position_df], ignore_index=True)

        combined = groupby_agg_wavg(df=combined, group_by='security_id', default_method='last',
                                    wavg_columns=['average_cost'], wavg_weight='quantity')

        # Weighted averaging can produce tiny or negative costs; treat those as unknown.
        # (np.nan, not np.NaN: the np.NaN alias was removed in NumPy 2.0)
        combined.loc[combined['average_cost'] < 0.1, 'average_cost'] = np.nan
        self.positions_df = combined
        
        
###################################################################
# Helper methods, do not modify
###################################################################

# Groups passed df by the group_by columns and aggregates each column using:
#   - specified_methods[column], if specified_methods (a dict of column name -> method) has it,
#   - a quantity-weighted average if the column is in wavg_columns (weights from column wavg_weight),
#   - otherwise default_method.
def groupby_agg_wavg(df, group_by, default_method='sum', specified_methods=None, wavg_columns=None, wavg_weight=''):
    """Group ``df`` by ``group_by`` and aggregate, with optional weighted averages.

    Args:
        df: DataFrame to aggregate. It is not modified.
        group_by: a column name or list of column names to group on.
        default_method: pandas aggregation used for columns without another rule.
        specified_methods: optional dict mapping column name -> aggregation method.
        wavg_columns: optional list of columns aggregated as
            sum(column * weight) / sum(weight).
        wavg_weight: name of the weight column. It is required when wavg_columns is
            non-empty and is summed in the result.

    Returns:
        A new DataFrame with one row per group, the same columns in the same order,
        and the original dtypes. Weighted-average columns are the exception and
        stay float, because a weighted mean of integers is generally not an integer.
        Group-key columns always take the group's key value rather than
        default_method (e.g. 'sum' would otherwise concatenate string keys).
    """
    specified_methods = {} if specified_methods is None else specified_methods
    wavg_columns = [] if wavg_columns is None else list(wavg_columns)

    starting_col_names = df.columns
    starting_col_types = df.dtypes

    df2 = df.copy()  # don't modify the caller's frame

    # Map every column to its aggregation method
    method_dict = dict.fromkeys(df2.columns, default_method)
    method_dict.update(specified_methods)

    # Key columns are constant within a group; 'first' recovers them unchanged
    group_keys = group_by if isinstance(group_by, list) else [group_by]
    for key in group_keys:
        if pd.api.types.is_hashable(key) and key in method_dict:
            method_dict[key] = 'first'

    # Temporary weight*value columns (wtimes_<name>), summed per group
    temp_columns = ['wtimes_' + col for col in wavg_columns]
    if wavg_columns:
        for col, tmp in zip(wavg_columns, temp_columns):
            df2[tmp] = df2[col] * df2[wavg_weight]
            method_dict[tmp] = 'sum'
        method_dict[wavg_weight] = 'sum'

    # Key columns are aggregated above, so the group index can be dropped
    df2 = df2.groupby(group_by).agg(method_dict).reset_index(drop=True)

    # sum(weight*value) / sum(weight) gives the weighted average
    for col, tmp in zip(wavg_columns, temp_columns):
        df2[col] = df2[tmp] / df2[wavg_weight]

    # Back to the original column order and dtypes (weighted averages stay float)
    df2 = df2[starting_col_names]
    cast_back = {col: dtype for col, dtype in starting_col_types.items() if col not in wavg_columns}
    return df2.astype(cast_back)

# Validates that the passed DataFrame has every column listed in required_columns.
# Raises a ValueError naming the first missing column.
def validate_df_columns(df, required_columns):
    """Raise ValueError for the first name in ``required_columns`` absent from ``df``."""
    available = df.columns
    absent = [name for name in required_columns if name not in available]
    if absent:
        raise ValueError('DataFrame missing required column named \'' + absent[0] + '\'' )

# Validates all the values in the passed series are the same.
# If not, it raises a ValueError.
def validate_all_values_same(series):
    """Raise ValueError unless every value in the pandas Series ``series`` is identical.

    An empty Series passes trivially. Missing values (NaN/NaT) count as equal to
    each other, so an all-missing Series passes, while a mix of missing and
    present values raises.

    Raises:
        ValueError: if the Series holds more than one distinct value.
    """
    # nunique(dropna=False) counts NaN as one value. The old element-wise ==
    # check failed on an empty Series (IndexError) and never matched NaN.
    if series.nunique(dropna=False) > 1:
        raise ValueError('Series has multiple unique values but is required to have only one')
        
# Validates that the passed series contains no missing (NA) entries.
# Raises a ValueError if any entry is missing.
def validate_no_missing_values(series):
    """Raise ValueError if ``series`` has at least one NA entry."""
    if not series.notna().all():
        raise ValueError('Series has missing value but is required to have none')