In [31]:
import pandas as pd
import numpy as np
import seaborn as sns
import ipywidgets as widgets
from IPython.display import display

### Functions

In [32]:
# Function to calculate percentile rank incrementally
def calculate_percent_rank_inc(data, start_date=None, end_date=None):
    """
    Calculate a PercentRank.INC-style rank for every observation of a date-indexed series.

    Each value is ranked against the whole (optionally date-filtered) sample, not only
    against the rows that precede it: ``(rank - 1) / (n - 1)``, where tied values share
    the lowest rank and ``n`` is the number of observations kept.

    Parameters:
    data (pd.Series, pd.DataFrame, dict): Input data. A DataFrame must have exactly one column.
        Expected dictionary format: {date: {nested_keys: value}}; when a date holds several
        nested keys, the last value is the one kept.
    start_date (str): Optional start date (inclusive) for filtering data.
    end_date (str): Optional end date (inclusive) for filtering data. When given and missing
        from the filtered index, the last kept observation is carried forward to that date
        before ranking.

    Returns:
    pd.Series: Series with PercentRank.INC values, indexed by date. Empty input gives an empty
        float Series; a single observation gives NaN because the rank is undefined.

    Raises:
    ValueError: If a DataFrame with more than one column is passed.
    TypeError: If data is not a Series, DataFrame or dictionary.
    """
    if isinstance(data, pd.DataFrame):
        if data.shape[1] != 1:
            raise ValueError("DataFrame input should have exactly one column.")
        series = data.iloc[:, 0]
    elif isinstance(data, dict):
        flat_data = {pd.to_datetime(date): value for date, nested_dict in data.items() for key, value in nested_dict.items()}
        series = pd.Series(flat_data)
    elif isinstance(data, pd.Series):
        series = data
    else:
        raise TypeError("Input data should be a pandas Series, DataFrame, or dictionary.")

    # Work on a private copy: the index conversion and the end-date padding below
    # must never alter the object owned by the caller.
    series = series.copy()
    if not isinstance(series.index, pd.DatetimeIndex):
        series.index = pd.to_datetime(series.index)

    if start_date or end_date:
        start_date = pd.to_datetime(start_date) if start_date else series.index.min()
        end_date = pd.to_datetime(end_date) if end_date else series.index.max()
        # A boolean mask filters by date whether or not the index is sorted.
        in_window = (series.index >= start_date) & (series.index <= end_date)
        series = series[in_window]

    if series.empty:
        return pd.Series([], index=series.index, dtype=float)

    if end_date and pd.to_datetime(end_date) not in series.index:
        series.loc[pd.to_datetime(end_date)] = series.iloc[-1]

    # Vectorised form: with a single observation this is 0 / 0 and yields NaN
    # instead of raising ZeroDivisionError inside a per-element lambda.
    ranks = (series.rank(method='min') - 1) / (len(series) - 1)
    return ranks

# Function to calculate percentile rank surface
def calculate_percentile_rank_surface(nested_dict, udl, date, moneyness_levels, start_date=None, end_date=None):
    """
    Calculate the percentile rank surface from a nested dictionary.

    For every (maturity, moneyness) cell the history of values is collected across all
    dates of ``nested_dict``, optionally restricted to [start_date, end_date], and the
    value observed on ``date`` is ranked as ``(rank - 1) / (n - 1)`` with ties sharing
    the lowest rank.

    Parameters:
    nested_dict (dict): Nested dictionary containing the data, read as
        nested_dict[date_string][udl]['IV'][maturity][moneyness].
    udl (str): Underlying identifier.
    date (str): Target date for which the percentile rank is calculated.
    moneyness_levels (list): List of moneyness levels to include in the surface.
    start_date (str): Optional start date (inclusive) for filtering data.
    end_date (str): Optional end date (inclusive) for filtering data.

    Returns:
    pd.DataFrame: DataFrame representing the percentile rank surface (maturities as rows,
        moneyness levels as columns). A cell is NaN when it has no history, when the target
        date falls outside the filtered history, or when the history holds a single
        observation. If an unexpected error occurs it is printed and an empty DataFrame
        is returned.
    """
    try:
        date_str = pd.to_datetime(date).strftime('%Y-%m-%d')
        target = pd.to_datetime(date_str)

        # Resolve the bounds once into dedicated locals. Reassigning start_date/end_date
        # inside the loop would freeze a missing bound to the first cell's history and
        # silently reuse it for every other cell.
        lower = pd.to_datetime(start_date) if start_date else None
        upper = pd.to_datetime(end_date) if end_date else None

        percentile_rank_surface = pd.DataFrame(index=moneyness_levels, columns=list(nested_dict[date_str][udl]['IV'].keys()))

        for matu in percentile_rank_surface.columns:
            for mon in moneyness_levels:
                try:
                    values = {
                        pd.to_datetime(past_date): nested_dict[past_date][udl]['IV'][matu][mon]
                        for past_date in nested_dict
                        if udl in nested_dict[past_date] and 'IV' in nested_dict[past_date][udl]
                        and matu in nested_dict[past_date][udl]['IV'] and mon in nested_dict[past_date][udl]['IV'][matu]
                    }

                    rank_for_date = np.nan
                    if values:
                        series = pd.Series(values)
                        # Boolean masks filter by date even when the dictionary is not date-ordered.
                        if lower is not None:
                            series = series[series.index >= lower]
                        if upper is not None:
                            series = series[series.index <= upper]

                        # Vectorised: a one-observation history gives 0 / 0 = NaN rather than
                        # a ZeroDivisionError that would discard the whole surface.
                        percentile_series = (series.rank(method='min') - 1) / (len(series) - 1)
                        rank_for_date = percentile_series.get(target, np.nan)
                    percentile_rank_surface.at[mon, matu] = rank_for_date
                except KeyError:
                    percentile_rank_surface.at[mon, matu] = np.nan

        return percentile_rank_surface.T
    except Exception as e:
        print(f"An error occurred in calculate_percentile_rank_surface: {e}")
        return pd.DataFrame()

# Function to calculate z-score surface
def calculate_zscore_surface(nested_dict, udl, date, moneyness_levels, start_date=None, end_date=None):
    """
    Calculate the z-score surface from a nested dictionary.

    For every (maturity, moneyness) cell the value observed on ``date`` is standardised
    against the mean and sample standard deviation of that cell's history. When
    start_date and/or end_date are given, the mean and standard deviation are computed
    only over observations inside [start_date, end_date]; the target value itself is
    always read at ``date``, whether or not it lies inside that window.

    Parameters:
    nested_dict (dict): Nested dictionary containing the data, read as
        nested_dict[date_string][udl]['IV'][maturity][moneyness].
    udl (str): Underlying identifier.
    date (str): Target date for which the z-score is calculated.
    moneyness_levels (list): List of moneyness levels to include in the surface.
    start_date (str): Optional start date (inclusive) of the reference window.
    end_date (str): Optional end date (inclusive) of the reference window.

    Returns:
    pd.DataFrame: DataFrame representing the z-score surface (maturities as rows, moneyness
        levels as columns). A cell is NaN when it has no value on the target date or when
        the standard deviation of the window is not strictly positive. If an unexpected
        error occurs it is printed and an empty DataFrame is returned.
    """
    try:
        date_str = pd.to_datetime(date).strftime('%Y-%m-%d')
        target = pd.to_datetime(date_str)

        # The window bounds were previously accepted but never applied.
        lower = pd.to_datetime(start_date) if start_date else None
        upper = pd.to_datetime(end_date) if end_date else None

        zscore_surface = pd.DataFrame(index=moneyness_levels, columns=list(nested_dict[date_str][udl]['IV'].keys()))

        for matu in zscore_surface.columns:
            for mon in moneyness_levels:
                try:
                    values = {
                        pd.to_datetime(past_date): nested_dict[past_date][udl]['IV'][matu][mon]
                        for past_date in nested_dict
                        if udl in nested_dict[past_date] and 'IV' in nested_dict[past_date][udl]
                        and matu in nested_dict[past_date][udl]['IV'] and mon in nested_dict[past_date][udl]['IV'][matu]
                    }

                    zscore = np.nan
                    if values:
                        series = pd.Series(values)
                        # A missing target date raises KeyError, handled below as NaN.
                        target_value = series.loc[target]

                        window = series
                        if lower is not None:
                            window = window[window.index >= lower]
                        if upper is not None:
                            window = window[window.index <= upper]

                        mean = window.mean()
                        std = window.std()
                        # std is NaN for fewer than two observations, so the comparison is False.
                        zscore = (target_value - mean) / std if std > 0 else np.nan
                    zscore_surface.at[mon, matu] = zscore
                except KeyError:
                    zscore_surface.at[mon, matu] = np.nan

        return zscore_surface.T
    except Exception as e:
        print(f"An error occurred in calculate_zscore_surface: {e}")
        return pd.DataFrame()

# Function to create volatility surface
def create_vol_surface(nested_dict, date, udl, moneyness_levels):
    """
    Create a volatility surface from nested dictionary data.

    Parameters:
    nested_dict (dict): Nested dictionary containing the data, read as
        nested_dict[date_string][udl]['IV'][maturity][moneyness].
    date (str): Target date for which the volatility surface is generated. A string is used
        as the lookup key unchanged; any other object is formatted with strftime('%Y-%m-%d').
    udl (str): Underlying identifier.
    moneyness_levels (list): List of moneyness levels to include in the surface.

    Returns:
    pd.DataFrame: DataFrame representing the volatility surface: one row per maturity (index
        named 'Maturity'), one column per moneyness level (labels converted to strings),
        values rounded to 2 decimals and NaN where a level is missing.

    Raises:
    ValueError: If the date is not a key of nested_dict.
    KeyError: If the underlying or its 'IV' entry is missing for that date.
    """
    date_str = date if isinstance(date, str) else date.strftime('%Y-%m-%d')
    if date_str not in nested_dict:
        raise ValueError(f"Date {date_str} not found in data.")

    vol_surface = pd.DataFrame(index=moneyness_levels)
    for matu, moneyness_data in nested_dict[date_str][udl]['IV'].items():
        vol_surface[matu] = [moneyness_data.get(moneyness, np.nan) for moneyness in moneyness_levels]
    vol_surface = vol_surface.T

    def _round_cell(value):
        # Same rounding rule as before; missing quotes stay NaN.
        return round(value, 2) if pd.notna(value) else np.nan

    # DataFrame.applymap is deprecated since pandas 2.1; mapping each column with
    # Series.map applies the same element-wise function on every pandas version.
    vol_surface = vol_surface.apply(lambda column: column.map(_round_cell))
    vol_surface.columns = [f'{mon}' for mon in vol_surface.columns]
    vol_surface.index.name = 'Maturity'
    return vol_surface

# Function to ensure numerical format
def ensure_numerical(df):
    """
    Convert every column of a DataFrame to a numeric dtype.

    Parameters:
    df (pd.DataFrame): Input DataFrame.

    Returns:
    pd.DataFrame: New DataFrame in which each column went through pd.to_numeric;
        entries that cannot be parsed as numbers become NaN.
    """
    def _coerce_column(column):
        # errors='coerce' turns unparseable entries into NaN instead of raising.
        return pd.to_numeric(column, errors='coerce')

    return df.apply(_coerce_column)

# Function to style DataFrame
def style_df(df, caption):
    """
    Build a captioned, colour-graded Styler for a surface DataFrame.

    Parameters:
    df (pd.DataFrame): Input DataFrame; it is coerced to numeric values first.
    caption (str): Caption for the styled DataFrame.

    Returns:
    pd.io.formats.style.Styler: Styled DataFrame. When every value is NaN a message is
        printed and only the caption and table styles are applied; otherwise NaN values
        are shown as 0.00 on a green gradient with two-decimal formatting.
    """
    numeric_df = ensure_numerical(df)
    green_cmap = sns.light_palette("green", as_cmap=True)

    # Shared cell/header layout applied in both branches below.
    table_styles = [
        {'selector': 'th', 'props': [('min-width', '90px'), ('max-width', '90px'), ('text-align', 'center')]},
        {'selector': 'td', 'props': [('text-align', 'center')]}
    ]

    nothing_to_show = numeric_df.isnull().all().all()
    if nothing_to_show:
        print("The DataFrame contains only NaN values.")
        styler = numeric_df.style.set_caption(caption)
    else:
        # NaN is replaced by 0 for styling purposes, so the gradient and the
        # number format are applied to every cell.
        zero_filled = numeric_df.fillna(0)
        styler = zero_filled.style.background_gradient(cmap=green_cmap)
        styler = styler.format("{:.2f}")
        styler = styler.set_caption(caption)

    return styler.set_table_styles(table_styles)

# Function to plot surface
def plot_surface(nested_dict, udl, date, surface_type, moneyness_levels, start_date=None, end_date=None):
    """
    Compute the requested surface and display it as a styled table.

    Parameters:
    nested_dict (dict): Nested dictionary containing the data.
    udl (str): Underlying identifier.
    date: Target date of the surface (anything pd.Timestamp accepts).
    surface_type (str): 'Level', 'Percentile' or 'Z-score'.
    moneyness_levels (list): List of moneyness levels to include in the surface.
    start_date: Start of the history window; required for 'Percentile' and 'Z-score'.
    end_date: End of the history window; required for 'Percentile' and 'Z-score'.

    Returns:
    None. The table is rendered with display(); invalid selections and errors are
    reported with print() instead of being raised.
    """
    try:
        if date is None:
            # A cleared date picker delivers None; report it instead of failing later.
            print("Please select a date.")
            return
        date = pd.Timestamp(date)

        # Surfaces computed over a [start_date, end_date] history window share one code path.
        windowed_surfaces = {
            'Percentile': calculate_percentile_rank_surface,
            'Z-score': calculate_zscore_surface,
        }

        if surface_type == 'Level':
            vol_surface = create_vol_surface(nested_dict, date, udl, moneyness_levels)
            # style_df coerces to numeric itself, so no separate conversion is needed here.
            display(style_df(vol_surface, "Volatility Surface"))
        elif surface_type in windowed_surfaces:
            if not (start_date and end_date):
                print(f"Please select start and end dates for {surface_type} surface.")
                return
            start_date = pd.Timestamp(start_date)
            end_date = pd.Timestamp(end_date)
            if start_date > end_date:
                print("Start date cannot be after end date.")
                return
            compute_surface = windowed_surfaces[surface_type]
            surface = compute_surface(nested_dict, udl, date, moneyness_levels, start_date, end_date)
            title = f"{surface_type} Surface ({udl}) From: {start_date.strftime('%Y-%m-%d')} to: {end_date.strftime('%Y-%m-%d')}"
            display(style_df(surface, title))
        else:
            print("Invalid surface type selected.")
    except KeyError as e:
        print(f"KeyError: {e} - Ensure the selected date range is within the data's date range.")
    except Exception as e:
        print(f"An error occurred: {e}")

def toggle_date_widgets(surface_type):
    """
    Show the date pickers that match the selected surface type.

    'Percentile' and 'Z-score' show the start/end pickers and hide the single date
    picker; any other surface type does the opposite.
    """
    uses_date_range = surface_type in ('Percentile', 'Z-score')
    range_display = 'block' if uses_date_range else 'none'
    single_display = 'none' if uses_date_range else 'block'

    start_date_widget.layout.display = range_display
    end_date_widget.layout.display = range_display
    date_widget.layout.display = single_display

In [33]:
# Widget definitions
# NOTE: `udl_list` is assigned in a later cell (the example-data cell). On a fresh
# kernel that cell has to be run before this one, otherwise the Dropdown below
# raises NameError.
udl_widget = widgets.Dropdown(options=udl_list, value='JP_NKY', description='UDL:', disabled=False)

# Single target date, used by the 'Level' surface.
date_widget = widgets.DatePicker(description='Date', value=pd.Timestamp('2024-05-28'), disabled=False)

# History window, used by the 'Percentile' and 'Z-score' surfaces.
start_date_widget = widgets.DatePicker(description='Start Date', value=pd.Timestamp('2024-01-01'), disabled=False)
end_date_widget = widgets.DatePicker(description='End Date', value=pd.Timestamp('2024-05-27'), disabled=False)

surface_type_widget = widgets.Dropdown(options=['Level', 'Percentile', 'Z-score'], value='Level', description='Type:', disabled=False)

def toggle_date_widgets(surface_type):
    """
    Show the start/end pickers for 'Percentile' and 'Z-score', the single date picker otherwise.

    NOTE: this re-declares the function of the same name from the functions cell,
    with the same logic.
    """
    if surface_type in ['Percentile', 'Z-score']:
        range_mode, single_mode = 'block', 'none'
    else:
        range_mode, single_mode = 'none', 'block'

    start_date_widget.layout.display = range_mode
    end_date_widget.layout.display = range_mode
    date_widget.layout.display = single_mode

# Re-evaluate which date pickers are visible whenever the surface type changes.
surface_type_widget.observe(lambda event: toggle_date_widgets(event['new']), names='value')

# Create interactive output
# NOTE: `nested_dict` and `moneyness_levels` are assigned in a later cell (the
# example-data cell); that cell has to be run first on a fresh kernel.
output = widgets.interactive_output(
    plot_surface,
    dict(
        nested_dict=widgets.fixed(nested_dict),
        udl=udl_widget,
        date=date_widget,
        surface_type=surface_type_widget,
        moneyness_levels=widgets.fixed(moneyness_levels),
        start_date=start_date_widget,
        end_date=end_date_widget,
    ),
)

# Layout for widgets: selectors on the left, date pickers on the right, output underneath.
left_box = widgets.VBox(
    [udl_widget, surface_type_widget],
    layout=widgets.Layout(margin='10px'),
)
right_box = widgets.VBox(
    [start_date_widget, end_date_widget, date_widget],
    layout=widgets.Layout(margin='10px'),
)
top_box = widgets.HBox(
    [left_box, right_box],
    layout=widgets.Layout(justify_content='space-between', align_items='center', margin='10px'),
)
main_box = widgets.VBox([top_box, output])

# Set initial visibility
toggle_date_widgets(surface_type_widget.value)

In [34]:
# Example data, shaped as {date: {underlying: {'IV': {maturity: {moneyness: value}}}}}.
# In this sample both moneyness levels (80 and 90) carry the same value on each date.
nested_dict = {
    day: {'JP_NKY': {'IV': {1: {80: level, 90: level}}}}
    for day, level in [
        ('2024-05-20', 0),
        ('2024-05-21', 0),
        ('2024-05-22', -1),
        ('2024-05-23', 0),
        ('2024-05-24', -10),
        ('2024-05-25', 20),
        ('2024-05-26', 1),
        ('2024-05-27', 2),
        ('2024-05-28', 3),
    ]
}

# Underlyings offered in the UDL dropdown
udl_list = [
    'JP_NKY', 'DE_DAX', 'GB_FTSE100', 'CH_SMI', 'IT_FTMIB', 'ES_IBEX',
    'US_SPX', 'EU_STOXX50E', 'EU_SX7E', 'EU_SX7P', 'EU_SXDP', 'US_KO',
    'US_MCD', 'US_KOMO', 'EU_SXPP', 'EU_SOXP', 'HK_HSI',
]

moneyness_levels = [80, 90, 100, 110, 120]

# Sample arguments for calling the surface functions directly
start_date, end_date = "2024-05-21", "2024-05-27"
udl = 'JP_NKY'
date = "2024-05-28"

In [35]:
# Render the assembled UI: the control row (top_box) with the interactive output below it.
display(main_box)

VBox(children=(HBox(children=(VBox(children=(Dropdown(description='UDL:', options=('JP_NKY', 'DE_DAX', 'GB_FTS…

#### Data

In [36]:
# Example data, shaped as {date: {underlying: {'IV': {maturity: {moneyness: value}}}}}.
# Each row below is (date, value at moneyness 80, value at moneyness 90).
# NOTE: this rebinds `nested_dict` to a new dict; an interactive output created earlier
# presumably still holds the previous dict - re-run the widget cell to pick up this data.
nested_dict = {
    day: {'JP_NKY': {'IV': {1: {80: level_80, 90: level_90}}}}
    for day, level_80, level_90 in [
        ('2024-05-20', 0, 1),
        ('2024-05-21', 0, 1),
        ('2024-05-22', -1, 0),
        ('2024-05-23', 0, 1),
        ('2024-05-24', -10, 0),
        ('2024-05-25', 20, 2),
        ('2024-05-26', 1, 2),
        ('2024-05-27', 2, 3),
        ('2024-05-28', 3, 4),
    ]
}

# Underlyings offered in the UDL dropdown
udl_list = [
    'JP_NKY', 'DE_DAX', 'GB_FTSE100', 'CH_SMI', 'IT_FTMIB', 'ES_IBEX',
    'US_SPX', 'EU_STOXX50E', 'EU_SX7E', 'EU_SX7P', 'EU_SXDP', 'US_KO',
    'US_MCD', 'US_KOMO', 'EU_SXPP', 'EU_SOXP', 'HK_HSI',
]

moneyness_levels = [80, 90, 100, 110, 120]

# Sample arguments for calling the surface functions directly
start_date, end_date = "2024-05-21", "2024-05-27"
udl = 'JP_NKY'
date = "2024-05-28"

### Sanity check

In [37]:
import pandas as pd

def calculate_percent_rank_inc(series):
    """
    Calculate an expanding PercentRank.INC: each row is ranked against the rows before it.

    For row i > 0 the result is the share of the previous i observations that are strictly
    smaller than the current value. The first row has no history and is given rank 1.

    NOTE: this definition re-binds the name `calculate_percent_rank_inc`, replacing the
    (data, start_date, end_date) version from the functions cell once this cell has run.

    Parameters:
    series (pd.Series): Series of observations.

    Returns:
    pd.Series: Series with PercentRank.INC values, on the index of the input.
    """
    def _rank_at(position):
        # The first value has the highest rank by convention.
        if position == 0:
            return 1
        current = series.iloc[position]
        history = series.iloc[:position]
        # Strictly-smaller count over the number of previous observations.
        return (history < current).sum() / len(history)

    percent_ranks = [_rank_at(position) for position in range(len(series))]
    return pd.Series(percent_ranks, index=series.index)

# Example usage: nine daily observations starting 2024-05-20
dates = pd.date_range(start="2024-05-20", periods=9)
values = [0, 0, -1, 0, -10, 20, 1, 2, 3]
df = pd.DataFrame({'Value': values}, index=dates)

# Add the expanding percent rank next to the raw values
df = df.assign(**{'PercentRank.INC': calculate_percent_rank_inc(df['Value'])})
df


Unnamed: 0,Value,PercentRank.INC
2024-05-20,0,1.0
2024-05-21,0,0.0
2024-05-22,-1,0.0
2024-05-23,0,0.333333
2024-05-24,-10,0.0
2024-05-25,20,1.0
2024-05-26,1,0.833333
2024-05-27,2,0.857143
2024-05-28,3,0.875
