In [23]:
import pandas as pd

import plotly.express as px
import plotly.graph_objects as go

import numpy as np

import plotly.graph_objects as go
import plotly.express as px
import plotly.subplots as ms

In [2]:
# Seed a dedicated generator so the synthetic dataset (and every figure built
# from it) is identical on each Restart & Run All.
SEED = 42
rng = np.random.default_rng(SEED)

# Generate a range of dates (one row per calendar day)
dates = pd.date_range(start='2022-01-01', end='2022-12-10', freq='D')

# Create a dataframe with random values
# Binary: fair coin flip per day (0 or 1).
binary_categorical = rng.integers(0, 2, len(dates))
# NonUniform: four classes drawn with deliberately unequal probabilities.
category_probs = [0.1, 0.2, 0.3, 0.4]
non_uniform_categorical = rng.choice(4, len(dates), p=category_probs)
data = dict()
data['Binary'] = binary_categorical
data['NonUniform'] = non_uniform_categorical
# Value: linear trend whose slope and offset are larger when Binary == 1,
# plus Gaussian noise (std 50) centred on the Binary flag.
data['Value'] = np.arange(len(dates))*(1+binary_categorical/2) + binary_categorical*5 + rng.normal(binary_categorical, 50, len(dates))
df = pd.DataFrame(data, index=dates)

print(df)

            Binary  NonUniform       Value
2022-01-01       0           2    3.066116
2022-01-02       1           3  -14.566676
2022-01-03       0           1   33.678230
2022-01-04       1           0   24.825752
2022-01-05       0           3  -14.701009
...            ...         ...         ...
2022-12-06       1           2  564.834390
2022-12-07       1           2  443.898585
2022-12-08       1           0  550.016215
2022-12-09       1           3  508.511674
2022-12-10       0           1  300.085013

[344 rows x 3 columns]


In [44]:
def get_mean_and_empirical_bounds(
        df_with_time_index: pd.DataFrame,
        resample_freq: str,
        response_col: str,
        condition_on: str = None,
    ):
    """
    Resample `response_col` of `df_with_time_index` at `resample_freq` (see
    https://pandas.pydata.org/docs/user_guide/timeseries.html#resampling) and
    return the per-bin mean and the per-bin count of non-null observations.

    If `condition_on` is given, the frame is first grouped on that column, so
    every returned series carries a two-level index (category, timestamp).

    If the resampling is a downsample (fewer bins than rows in the input), the
    empirical 95% interval of each bin (2.5% and 97.5% quantiles) is returned
    as well. Bins with no data have no quantile; their bounds are filled with
    the next available mean *of the same category*.

    Parameters
    ----------
    df_with_time_index : DataFrame with a datetime-like index.
    resample_freq : pandas offset alias passed to `.resample`.
    response_col : name of the column to aggregate.
    condition_on : optional name of a column to group on before resampling.

    Returns
    -------
    (mean, category_counts, lower_bound, upper_bound)
        `lower_bound` and `upper_bound` are `None` when the resample is not a
        downsample.

    Raises
    ------
    NotImplementedError
        If the resampled index has more than two levels (conditioning on
        several variables at once).
    """
    # No copy is needed: nothing below mutates the input frame.
    source = df_with_time_index.groupby(condition_on) if condition_on else df_with_time_index
    resampler = source[response_col].resample(resample_freq)

    mean = resampler.mean()
    category_counts = resampler.count()

    n_levels = mean.index.nlevels
    if n_levels == 2:
        # Use the number of bins of a single category rather than the length
        # of the stacked (category, timestamp) series.
        resampled_lengths = len(mean.loc[mean.index.levels[0][0]])
    elif n_levels == 1:
        resampled_lengths = len(mean)
    else:
        raise NotImplementedError("Cannot handle conditioning on more than one variable.")

    # Not a downsample: quantiles of (at most) single observations are not
    # meaningful, so no bounds are reported.
    if resampled_lengths >= df_with_time_index[response_col].shape[0]:
        return mean, category_counts, None, None

    if n_levels == 2:
        # Back-fill inside each category only; a plain `mean.bfill()` would
        # run across the category boundary of the stacked series and fill one
        # category's empty trailing bins with the next category's mean.
        fallback = mean.groupby(level=0).bfill()
    else:
        fallback = mean.bfill()

    lower_bound = resampler.quantile(0.025).fillna(fallback)
    upper_bound = resampler.quantile(0.975).fillna(fallback)
    return mean, category_counts, lower_bound, upper_bound

In [45]:
# Weekly mean, counts and empirical 95% bounds of `Value` over the whole frame.
mean, counts, lower_bound, upper_bound = get_mean_and_empirical_bounds(
    df_with_time_index=df,
    resample_freq='1W',
    response_col='Value',
)

In [46]:
# Same weekly summary, computed separately for each level of `Binary`.
(
    conditioned_mean,
    conditioned_counts,
    conditioned_lower_bound,
    conditioned_upper_bound,
) = get_mean_and_empirical_bounds(df, '1W', 'Value', condition_on='Binary')

In [47]:
def _hex_to_rgba(color: str, alpha: float) -> str:
    """Convert a `#rgb` / `#rrggbb` colour to an `rgba(r, g, b, alpha)` string; other colour strings are returned unchanged."""
    if not color.startswith("#"):
        return color
    digits = color.lstrip("#")
    if len(digits) == 3:
        # Shorthand hex doubles each digit ("#abc" means "#aabbcc"); repeating
        # the whole string would give the unrelated colour "#abcabc".
        digits = "".join(ch * 2 for ch in digits)
    red, green, blue = (int(digits[i:i + 2], 16) for i in (0, 2, 4))
    return f"rgba({red}, {green}, {blue}, {alpha})"


def _add_bounds_traces(fig, lb, ub, color, legendgroup, row, col, alpha=0.3):
    """Add a zero-width lower-bound line and an upper-bound line filled down to it with a translucent `color`."""
    shared = dict(
        mode='lines',
        line=dict(width=0),
        showlegend=False,
        legendgroup=legendgroup,
        visible="legendonly",
    )
    fig.add_trace(
        go.Scatter(x=lb.index, y=lb, name='Lower Bound', **shared),
        row=row,
        col=col,
    )
    # 'tonexty' fills towards the previously added trace, so the lower bound
    # has to be added immediately before the upper bound.
    fig.add_trace(
        go.Scatter(
            x=ub.index,
            y=ub,
            name='Upper Bound',
            fill='tonexty',
            fillcolor=_hex_to_rgba(color, alpha),
            **shared,
        ),
        row=row,
        col=col,
    )


def plot_mean_and_bounds(mean: pd.Series, fig, lower_bound=None, upper_bound=None, row=1, col=1, legendgroup=None):
    """
    Add the resampled mean (and, when given, its shaded bounds) to `fig`.

    Parameters
    ----------
    mean : series of means indexed by time, or by (category, time) when the
        data was conditioned on a column; one line is drawn per category.
    fig : figure exposing `add_trace(trace, row=..., col=...)`.
    lower_bound, upper_bound : optional series indexed like `mean`; the band
        is drawn only when both are provided.
    row, col : subplot cell that receives the traces.
    legendgroup : optional legend group for the unconditioned case (defaults
        to `mean.name`). Conditioned plots always use one group per category.

    Returns
    -------
    (fig, colors_used) : the same figure and the list of colours used, in the
        order the mean lines were added, so callers can reuse them.

    Notes
    -----
    All traces are added with `visible="legendonly"`, i.e. they stay hidden
    until enabled from the legend.
    """
    palette = px.colors.qualitative.Plotly
    have_bounds = lower_bound is not None and upper_bound is not None
    colors_used = []

    if mean.index.nlevels == 2:
        for position, condition in enumerate(mean.index.levels[0]):
            # Wrap around the palette instead of exhausting an iterator, which
            # raised StopIteration once there were as many categories as colours.
            color = palette[position % len(palette)]
            group = str(condition)
            subset = mean.loc[condition]
            fig.add_trace(
                go.Scatter(
                    x=subset.index,
                    y=subset,
                    mode='lines',
                    name=f"{mean.name} | {condition}",
                    legendgroup=group,
                    # Colour the line itself; `fillcolor` has no effect on an
                    # unfilled line, which left the stroke on the default
                    # colour rather than the one reported in `colors_used`.
                    line=dict(color=color),
                    visible="legendonly"
                ),
                row=row,
                col=col
            )
            colors_used.append(color)
            if have_bounds:
                _add_bounds_traces(
                    fig,
                    lower_bound.loc[condition],
                    upper_bound.loc[condition],
                    color,
                    legendgroup=group,
                    row=row,
                    col=col,
                )
    else:
        color = palette[0]
        group = legendgroup if legendgroup is not None else mean.name
        fig.add_trace(
            go.Scatter(
                x=mean.index,
                y=mean,
                mode='lines',
                name=mean.name,
                legendgroup=group,
                line=dict(color=color),
                visible="legendonly"
            ),
            row=row,
            col=col,
        )
        # Add bounds with a fill between
        if have_bounds:
            _add_bounds_traces(fig, lower_bound, upper_bound, color, legendgroup=group, row=row, col=col)
        colors_used.append(color)

    return fig, colors_used

In [48]:
def plot_value_and_counts(df, response_col, condition_on=None, freq='1W', save=False):
    """
    Plot the resampled mean of `response_col` (with empirical bounds when
    available) above a bar chart of the per-bin observation counts.

    Parameters
    ----------
    df : DataFrame with a datetime-like index.
    response_col : column to summarise.
    condition_on : optional column to split on; one line and one set of bars
        is drawn per category, sharing a colour and a legend group.
    freq : resampling frequency forwarded to `get_mean_and_empirical_bounds`.
    save : `False` shows the figure. `True` writes it to an HTML file named
        after the plotted columns in the working directory. Any other truthy
        value is used as the output path.

    Returns
    -------
    None. The figure is either shown or written to disk.
    """
    mean, counts, lower_bound, upper_bound = get_mean_and_empirical_bounds(df, freq, response_col, condition_on)
    fig = ms.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1)
    fig, colors = plot_mean_and_bounds(mean, fig, lower_bound, upper_bound, row=1, col=1)
    if not condition_on:
        fig.add_trace(
            go.Bar(
                x=counts.index,
                y=counts,
                name='Counts',
                marker=dict(color=colors[0]),
                # Same group as the mean line (which uses the series name), so
                # the legend toggles both; a hard-coded 'Value' only worked
                # for a column with exactly that name.
                legendgroup=mean.name,
                visible="legendonly"
            ),
            row=2,
            col=1
        )
    else:
        # `colors` is ordered like the category level, so index i matches.
        for i, condition in enumerate(counts.index.levels[0]):
            per_category = counts.loc[condition]
            fig.add_trace(
                go.Bar(
                    x=per_category.index,
                    y=per_category,
                    name=f'Counts | {condition}',
                    legendgroup=str(condition),
                    marker=dict(color=colors[i]),
                    visible="legendonly"
                ),
                row=2,
                col=1
            )

    # Avoid rendering "<col> | None" when there is no conditioning column.
    title = f"{response_col} | {condition_on}" if condition_on else f"{response_col}"
    fig.update_layout(title=title)

    if not save:
        fig.show()
        return

    # `write_html` requires a destination; calling it without one raised
    # TypeError, so `save=True` never worked.
    if isinstance(save, bool):
        stem = f"{response_col}_by_{condition_on}" if condition_on else f"{response_col}"
        destination = f"{stem}.html"
    else:
        destination = save
    fig.write_html(destination)

In [49]:
# Weekly mean and counts of `Value` for the whole dataset.
plot_value_and_counts(df, response_col='Value')

In [50]:
# Same figure, split by the levels of `Binary`.
plot_value_and_counts(df, response_col='Value', condition_on='Binary')