In [None]:
from zpmeta.funcs.func import Func 
from pandas import DataFrame
import pandas as pd
from dateutil.relativedelta import relativedelta
import os

In [ ]:

class Cumulate_g_Returns(Func):
    """Function class for calculating the cumulative return from daily returns.
    """
    @classmethod
    def _std_params(cls, name=None):
        params = dict(freq='M', extrapolate=True)
        return params
    
    @classmethod
    def _execute(cls, operand: DataFrame=None, params: dict = None) -> object:
        if params['extrapolate']:
            df = DataFrame(0, index=[operand.index[-1] + relativedelta(years=1)], columns=operand.columns)
            freq_returns = operand.combine_first(df)
        cumulative_returns = (1.0 + freq_returns).cumprod(skipna=True).fillna(method='pad').asfreq(params['freq'], method='pad')
        if params['extrapolate']:
            next_period_date = cumulative_returns.index[cumulative_returns.index>=operand.index[-1]][0] # immediate next period date
            cumulative_returns = cumulative_returns.truncate(after=next_period_date) # truncate after next period date
        
        prev_returns = cumulative_returns.shift(1).fillna(1.0)
        result = (cumulative_returns/prev_returns).dropna(how='all') - 1.0
        
        return result

In [ ]:
class Drawdowns_g_R(Func):
    """Calculate rolling drawdowns given returns.
    """
    @classmethod
    def _std_params(cls, name=None):
        params = dict(freq='M', type=['flat'])
        return params

    @classmethod
    def _execute(cls, operand=None, params: dict = None) -> DataFrame:
        freq_returns = Cumulate_g_Returns({'freq': params['freq'], 'extrapolate': True})(operand)
        
        freq_index = (1.0 + freq_returns).cumprod()
        expanding_max_index = freq_index.expanding().max()
        drawdown = -1.0 * ((expanding_max_index - freq_index) / expanding_max_index).applymap(lambda x: x if x > 0 else 0)
        
        # results = DataFrame()
        # for type in params['types']:
        #     if type == 'flat':
        #         dd = drawdown
        #     if type == 'avg':
        #         dd = drawdown.expanding().mean()
        #     elif type == 'max':
        #         dd = drawdown.expanding().max()
        #     
        #     # if the columns of dd are MultiIndex, add the 'type' as the second level of the MultiIndex
        #     if isinstance(dd.columns, pd.MultiIndex):
        #         dd.columns = dd.columns.set_levels([type], level=1)
        #     else:
        #         dd.columns = [type]
        #     
        #     
        #     # join dd to the results with 'type' in a MultiIndex column with name as the value in 'type'
        #     results = results.join(dd, how='outer', rsuffix='_' + type)
             
        return drawdown
    
class AverageDrawdowns_g_R(Func):
    """Calculate the average drawdown given returns.
    """
    @classmethod
    def _std_params(cls, name=None):
        params = dict(freq='M')
        return params

    @classmethod
    def _execute(cls, operand=None, params: dict = None) -> DataFrame:
        drawdown = Drawdowns_g_R(params)(operand)
        
        # calculate rolling average of the drawdown
        avg_drawdown = drawdown.expanding().mean()
        
        return avg_drawdown
    
class MaxDrawdowns_g_R(Func):
    """Calculate the maximum drawdown given returns.
    """
    @classmethod
    def _std_params(cls, name=None):
        params = dict(freq='M')
        return params

    @classmethod
    def _execute(cls, operand=None, params: dict = None) -> DataFrame:
        drawdown = Drawdowns_g_R(params)(operand)
        
        # calculate rolling average of the drawdown
        max_drawdown = drawdown.expanding().max()
        
        return max_drawdown
    
class LongestDrawdownPeriods_g_R(Func):
    """Calculate the longest N drawdown periods given returns.
    """
    @classmethod
    def _std_params(cls, name=None):
        params = dict(freq='M', num_periods=5)
        return params
    
    @classmethod
    def _execute(cls, operand=None, params: dict = None) -> DataFrame:
        drawdowns = Drawdowns_g_R(params)(operand)

        def find_longest_drawdown_periods_for_column(drawdown_series, num_periods):
            in_drawdown = False
            drawdown_periods = []
        
            for i in range(len(drawdown_series)):
                if drawdown_series.iloc[i] < 0:
                    if not in_drawdown:
                        in_drawdown = True
                        start_date = drawdown_series.index[i]
                        max_drawdown = drawdown_series.iloc[i]
                        max_drawdown_date = drawdown_series.index[i]
                    else:
                        if drawdown_series.iloc[i] < max_drawdown:
                            max_drawdown = drawdown_series.iloc[i]
                            max_drawdown_date = drawdown_series.index[i]
                else:
                    if in_drawdown:
                        end_date = drawdown_series.index[i - 1]
                        period_length = end_date - start_date
                        drawdown_periods.append((start_date, end_date, period_length, max_drawdown, max_drawdown_date))
                        in_drawdown = False
        
            if in_drawdown:
                end_date = drawdown_series.index[-1]
                period_length = end_date - start_date
                drawdown_periods.append((start_date, end_date, period_length, max_drawdown, max_drawdown_date))
        
            drawdown_df = pd.DataFrame(drawdown_periods, columns=['start_date', 'end_date', 'length', 'max_drawdown', 'max_drawdown_date'])
            drawdown_df = drawdown_df.sort_values(by='length', ascending=False)
            drawdown_df = drawdown_df.head(num_periods)
            drawdown_df.index = range(1, len(drawdown_df) + 1)
        
            return drawdown_df
        
        all_results = []
    
        for column in drawdowns.columns:
            result_df = find_longest_drawdown_periods_for_column(drawdowns[column], params['num_periods'])
            result_df.columns = pd.MultiIndex.from_product([[column], result_df.columns])
            all_results.append(result_df)
    
        combined_results = pd.concat(all_results, axis=1)

        return combined_results


In [ ]:
worksp = r'C:\Users\raman\OneDrive\MSx\CMC\Convex'

# set the working directory to the location of the data files
os.chdir(worksp)

# read returns from the 'Daily' sheet of GETT_Portfolio_Returns_CSV.xlsx workbook
returns = pd.read_excel(os.path.join(worksp, 'GETT_Portfolio_Returns_Non.xlsx'), sheet_name='Daily', index_col=0, parse_dates=True)

# do some detailed EDA on the returns
returns.describe()
returns.info()

# returns = pd.read_csv(os.path.join(worksp, 'GETT_Portfolio_Returns_CSV.csv'), index_col=0, parse_dates=True)