# Waterfall Chart

See:

- [Wikipedia article](https://en.wikipedia.org/wiki/Waterfall_chart)
- [ggplot2 + waterfalls](https://r-charts.com/flow/waterfall-chart/)

In [1]:
from lets_plot import *

In [2]:
# Initialise Lets-Plot for notebook (HTML) output; run once before any plot is displayed.
LetsPlot.setup_html()

In [3]:
# Sample input: one signed change per category, in display order.
data = {
    "x": ["A", "B", "C", "D", "E"],
    "y": [100, 200, -400, 500, -200],
}

In [4]:
# Column names of the computed ("stat") data; aesthetics and tooltips refer to them.
_DY_STAT_NAME = "dy"
_CUMSUM_STAT_NAME = "cumsum"
_INITIAL_STAT_NAME = "initial"
_FLOW_TYPE_STAT_NAME = "Flow type"
# Row titles shown in the default tooltip.
_DY_TOOLTIP_TITLE = "Difference"
_CUMSUM_TOOLTIP_TITLE = "Cumulative sum"
_INITIAL_TOOLTIP_TITLE = "Initial"
# Display name of every flow type, keyed by flow-type id.
_FLOW_TYPE_NAMES = dict(increase="Increase", decrease="Decrease", total="Total")
# Box fill color of every flow type.
_FLOW_TYPE_BOX_COLORS = dict(increase="#4daf4a", decrease="#e41a1c", total="#377eb8")
# Label color of every flow type (used when the label color is mapped to the flow type).
_FLOW_TYPE_TEXT_COLORS = dict(increase="#b2df8a", decrease="#fb9a99", total="#a6cee3")
# Special value of `fill` / label `color` that requests a mapping to the flow type.
_FLOW_TYPE_COLOR_VALUE = "flow_type"

_SIZE_DEF = 0
_WIDTH_DEF = 0.9
_SHOW_LEGEND_DEF = False
_TOOLTIPS_DEF = (
    layer_tooltips()
    .title("^x")
    .line(f"{_INITIAL_TOOLTIP_TITLE}|@{_INITIAL_STAT_NAME}")
    .line(f"{_DY_TOOLTIP_TITLE}|@{_DY_STAT_NAME}")
    .line(f"{_CUMSUM_TOOLTIP_TITLE}|@{_CUMSUM_STAT_NAME}")
    .disable_splitting()
)
_SORTED_VALUE_DEF = False
# Total
_CALC_TOTAL_DEF = True
# Horizontal line (hidden unless the caller supplies properties with blank=False)
_HLINE_PROPS_DEF = {"color": None, "size": None, "linetype": "dashed", "blank": True}
_HLINE_ONTOP_DEF = True
# Connector lines
_CONNECTOR_PROPS_DEF = {"color": None, "size": None, "linetype": None, "blank": False}
# Labels
_LABEL_PROPS_DEF = {
    "color": "white",
    "family": None,
    "face": None,
    "size": None,
    "angle": None,
    "hjust": None,
    "vjust": None,
    "margin": None,
    "blank": False,
}

def _get_flow_type_names(total_title):
    flow_type_names = _FLOW_TYPE_NAMES.copy()
    if total_title is not None:
        flow_type_names['total'] = total_title
    return flow_type_names

def _get_props(props, props_def):
    if props is None:
        return props_def
    else:
        return {**props_def, **{k: v for k, v in props.items() if v is not None}}

def _get_stat_data(data, x, y, sorted_value, calc_total, threshold, max_values, flow_type_names):
    xs, ys = data[x], data[y]
    assert len(xs) == len(set(xs)), "x values shouldn't contains duplicates"
    if sorted_value:
        xs, ys = zip(*sorted(zip(xs, ys), key=lambda p: abs(p[1]), reverse=True))
    xs = [str(v) for v in xs]
    if threshold is not None:
        other_value = sum([v for v in ys if abs(v) < threshold])
        xs, ys = zip(*[p for p in zip(xs, ys) if abs(p[1]) >= threshold])
        if abs(other_value) > 0:
            xs = list(xs) + ["Other"]
            ys = list(ys) + [other_value]
    elif max_values is not None:
        indices = list(zip(*sorted(zip(range(len(xs)), xs, ys), key=lambda p: abs(p[2]), reverse=True)))[0][:max_values]
        other_value = sum([v for i, v in enumerate(ys) if i not in indices], 0)
        xs = [v for i, v in enumerate(xs) if i in indices]
        ys = [v for i, v in enumerate(ys) if i in indices]
        if abs(other_value) > 0:
            xs = xs + ["Other"]
            ys = ys + [other_value]
    cum_sum = 0
    yprev = []
    ynext = []
    ymin = []
    ymax = []
    flow_type = []
    for y_val in ys:
        yprev.append(cum_sum)
        ynext.append(cum_sum + y_val)
        ymin.append(min(cum_sum, ynext[-1]))
        ymax.append(max(cum_sum, ynext[-1]))
        flow_type.append(flow_type_names["increase"] if y_val >= 0 else flow_type_names["decrease"])
        cum_sum = ynext[-1]
    if calc_total:
        xs = list(xs) + [flow_type_names["total"]]
        ys = list(ys) + [cum_sum - ys[0]]
        yprev.append(ys[0])
        ynext.append(cum_sum)
        ymin.append(min(cum_sum, 0))
        ymax.append(max(cum_sum, 0))
        flow_type.append(flow_type_names["total"])
    return {
        'x': xs,
        _INITIAL_STAT_NAME: yprev,
        _CUMSUM_STAT_NAME: ynext,
        _DY_STAT_NAME: ys,
        'ymin': ymin,
        'ymax': ymax,
        _FLOW_TYPE_STAT_NAME: flow_type,
    }

def _get_annotations_data(stat_data, calc_total):
    n = len(stat_data['x'])
    return {**stat_data,
            **{'y': [(stat_data["ymin"][i] + stat_data["ymax"][i]) / 2 \
                     for i in range(n)],
               'label': [(stat_data[_DY_STAT_NAME][i] if i < n - 1 or not calc_total else stat_data[_CUMSUM_STAT_NAME][i]) \
                         for i in range(n)]}}

def _get_intermediate_lines(stat_data):
    from itertools import pairwise
    xs = []
    ys = []
    xends = []
    yends = []
    for i in range(len(stat_data['x']) - 1):
        xs.append(stat_data['x'][i])
        ys.append(stat_data[_CUMSUM_STAT_NAME][i])
    return {
        'x': xs,
        'y': ys,
    }

def _append_hline_layer(p, hline):
    """Add the horizontal zero line to the plot unless it is blank.

    ``hline`` holds the caller's line properties (or ``None``); they are merged
    over ``_HLINE_PROPS_DEF``. When the merged 'blank' property is ``True`` the
    plot is returned unchanged, otherwise a ``geom_hline`` at ``y = 0`` without
    tooltips is appended.
    """
    line_props = _get_props(hline, _HLINE_PROPS_DEF)
    if line_props['blank'] == True:
        return p
    p += geom_hline(
        yintercept=0,
        color=line_props['color'],
        size=line_props['size'],
        linetype=line_props['linetype'],
        tooltips='none',
    )
    return p

def _append_connector_layer(p, connector, stat_data, width):
    """Add the connector segments that join neighbouring bars, unless blank.

    ``connector`` holds the caller's line properties (or ``None``); they are
    merged over ``_CONNECTOR_PROPS_DEF``. Each segment is a horizontal
    ``geom_spoke`` (angle 0) of length ``1 - width`` that is nudged by
    ``width / 2`` from the category position; ``width=None`` falls back to
    ``_WIDTH_DEF``.
    """
    line_props = _get_props(connector, _CONNECTOR_PROPS_DEF)
    if line_props['blank'] == True:
        return p
    actual_width = width if width is not None else _WIDTH_DEF
    p += geom_spoke(
        aes('x', 'y'),
        angle=0,
        radius=1-actual_width,
        data=_get_intermediate_lines(stat_data),
        position=position_nudge(x=.5-(1-actual_width)/2.0),
        linetype=line_props['linetype'],
        color=line_props['color'],
        size=line_props['size'],
        tooltips='none',
    )
    return p

def _append_label_layer(p, label, stat_data, calc_total, label_format, show_legend):
    """Add the text labels of the bars to the plot, unless they are blank.

    ``label`` holds the caller's text properties (or ``None``); they are merged
    over ``_LABEL_PROPS_DEF``. A 'color' equal to ``_FLOW_TYPE_COLOR_VALUE``
    maps the text color to the flow-type column instead of using a constant;
    only in that case can the layer contribute to the legend.
    """
    text_props = _get_props(label, _LABEL_PROPS_DEF)
    if text_props['blank'] == True:
        return p

    text_aes = dict(x='x', y='y', label='label')
    color_is_mapped = text_props['color'] == _FLOW_TYPE_COLOR_VALUE
    if color_is_mapped:
        text_aes['color'] = _FLOW_TYPE_STAT_NAME
        # The mapping replaces the constant color.
        text_props['color'] = None

    p += geom_text(
        aes(**text_aes),
        data=_get_annotations_data(stat_data, calc_total),
        color=text_props['color'],
        label_format=label_format,
        family=text_props['family'],
        fontface=text_props['face'],
        size=text_props['size'],
        angle=text_props['angle'],
        hjust=text_props['hjust'],
        vjust=text_props['vjust'],
        show_legend=(show_legend and color_is_mapped),
    )
    return p

def _append_scales(p, calc_total, flow_type_names):
    """Add the manual fill and color scales of the flow types to the plot.

    The scales are keyed by the display names in ``flow_type_names``; fill
    values come from ``_FLOW_TYPE_BOX_COLORS`` and color values from
    ``_FLOW_TYPE_TEXT_COLORS``. The 'total' entry is left out when
    ``calc_total`` is false.
    """
    flow_keys = [*flow_type_names.keys()]
    if not calc_total:
        flow_keys.remove('total')

    fill_by_name = {}
    for key in flow_keys:
        fill_by_name[flow_type_names[key]] = _FLOW_TYPE_BOX_COLORS[key]
    color_by_name = {}
    for key in flow_keys:
        color_by_name[flow_type_names[key]] = _FLOW_TYPE_TEXT_COLORS[key]

    return p + scale_fill_manual(values=fill_by_name) + scale_color_manual(values=color_by_name)

def waterfall_plot(data, x, y, *,
                   color=None, fill=_FLOW_TYPE_COLOR_VALUE, size=_SIZE_DEF, alpha=None, linetype=None, width=_WIDTH_DEF,
                   show_legend=_SHOW_LEGEND_DEF, tooltips=_TOOLTIPS_DEF,
                   sorted_value=_SORTED_VALUE_DEF, threshold=None, max_values=None,
                   calc_total=_CALC_TOTAL_DEF, total_title=None,
                   hline=None, hline_ontop=_HLINE_ONTOP_DEF,
                   connector=None,
                   label=None, label_format=None):
    """Build a waterfall chart from a column of categories and a column of changes.

    Parameters
    ----------
    data : mapping
        Source data, indexed as ``data[x]`` and ``data[y]``.
    x, y : str
        Keys of the category column and of the signed value column.
    color, fill, size, alpha, linetype, width
        Forwarded to the ``geom_crossbar`` layer that draws the boxes. The
        special ``fill='flow_type'`` (the default) maps the fill to the flow
        type (increase / decrease / total) instead of using a constant.
    show_legend : bool
        Forwarded to the box layer; the label layer shows a legend only when
        its color is mapped to the flow type as well.
    tooltips
        Tooltips of the box layer.
    sorted_value : bool
        Order the bars by absolute value, largest first.
    threshold : number, optional
        Values whose absolute value is below it are merged into an "Other" bar.
    max_values : int, optional
        Number of largest (by absolute value) bars to keep, the rest being
        merged into an "Other" bar. Not used when ``threshold`` is given.
    calc_total : bool
        Append a total bar at the end.
    total_title : str, optional
        Display name of the total bar; the default name is used when ``None``.
    hline : dict, optional
        Properties of the horizontal line at ``y = 0`` (the notebook passes
        ``element_line()``). By default the line is blank.
    hline_ontop : bool
        Draw the horizontal line after (true) or before (false) the boxes.
    connector : dict, optional
        Properties of the connector lines between the bars.
    label : dict, optional
        Properties of the bar labels (the notebook passes ``element_text()``);
        ``color='flow_type'`` maps the label color to the flow type.
    label_format : str, optional
        Forwarded to the ``geom_text`` label layer.

    Returns
    -------
    The plot: ``ggplot()`` with the connector, box, optional horizontal-line
    and label layers plus the manual fill / color scales of the flow types.
    """
    # Flow-type display names, then the computed per-bar data.
    names_by_flow = _get_flow_type_names(total_title)
    stats = _get_stat_data(data, x, y, sorted_value, calc_total, threshold, max_values, names_by_flow)

    # Aesthetics of the boxes; a mapped fill replaces the constant one.
    box_aes = dict(x='x', y=_CUMSUM_STAT_NAME, ymin='ymin', ymax='ymax')
    box_fill = fill
    if fill == _FLOW_TYPE_COLOR_VALUE:
        box_aes['fill'] = _FLOW_TYPE_STAT_NAME
        box_fill = None

    # Layers are added bottom-up, so their order defines what is drawn on top.
    plot = ggplot()
    if not hline_ontop:
        plot = _append_hline_layer(plot, hline)
    plot = _append_connector_layer(plot, connector, stats, width)
    plot += geom_crossbar(
        aes(**box_aes),
        data=stats,
        fatten=0,
        color=color, fill=box_fill, size=size, alpha=alpha, linetype=linetype,
        width=width,
        show_legend=show_legend, tooltips=tooltips,
    )
    if hline_ontop:
        plot = _append_hline_layer(plot, hline)
    plot = _append_label_layer(plot, label, stats, calc_total, label_format, show_legend)
    return _append_scales(plot, calc_total, names_by_flow)

## Default

In [5]:
# All options at their defaults: fill mapped to the flow type, a "Total" bar appended,
# labels shown, legend hidden.
waterfall_plot(data, 'x', 'y')

## Parameters

### Aesthetics

In [6]:
# color: constant color forwarded to the box layer (geom_crossbar)
waterfall_plot(data, 'x', 'y', color="magenta")

In [7]:
# fill: a constant fill is used instead of the default per-flow-type fill mapping
waterfall_plot(data, 'x', 'y', fill="blue")

In [8]:
# size: forwarded to the box layer (geom_crossbar); the default is 0
waterfall_plot(data, 'x', 'y', size=2)

In [9]:
# alpha: forwarded to the box layer (geom_crossbar)
waterfall_plot(data, 'x', 'y', alpha=.5)

In [10]:
# linetype: forwarded to the box layer (geom_crossbar); size is raised from its
# default of 0 here, presumably so that the outline is visible
waterfall_plot(data, 'x', 'y', size=1, linetype='dashed')

In [11]:
# width: box width (default 0.9); the connector segments get the length 1 - width
waterfall_plot(data, 'x', 'y', width=.4)

### Standard parameters

In [12]:
# show_legend: show the flow-type legend (it is off by default).
# With calc_total=False the 'total' entry is left out of the manual scales.
gggrid([
    waterfall_plot(data, 'x', 'y', show_legend=True) + ggtitle("Show legend", "Default calc_total"),
    waterfall_plot(data, 'x', 'y', show_legend=True, calc_total=False) + ggtitle("Show legend", "calc_total=False"),
])

In [13]:
# tooltips: 'none' is passed for the first plot; the second one uses a custom
# layer_tooltips() that refers to the computed columns @dy, @initial and @cumsum.
gggrid([
    waterfall_plot(data, 'x', 'y', tooltips='none'),
    waterfall_plot(data, 'x', 'y', tooltips=layer_tooltips().line("@dy: from @initial to @cumsum").disable_splitting())
])

### Waterfall-specific parameters

In [14]:
# sorted_value: order the bars by absolute value, largest first
waterfall_plot(data, 'x', 'y', sorted_value=True)

In [15]:
# threshold: values with |y| < threshold are summed into a single "Other" bar
# (the bar is added only when that sum is non-zero)
waterfall_plot(data, 'x', 'y', threshold=300)

In [16]:
# max_values: keep the given number of values with the largest |y| and sum the rest
# into an "Other" bar; not used when threshold is specified
waterfall_plot(data, 'x', 'y', max_values=3)

In [17]:
# Use threshold to skip zeros: with threshold=1 the zero-valued 'd' is filtered out,
# and because the filtered values sum to 0 no "Other" bar is added.
data_with_zeros = dict(
    x=['a', 'b', 'c', 'd', 'e'],
    y=[1, -2, 3, 0, 1],
)

gggrid([
    waterfall_plot(data_with_zeros, 'x', 'y'),
    waterfall_plot(data_with_zeros, 'x', 'y', threshold=1),
])

In [18]:
# calc_total: False omits the total bar that is appended by default
waterfall_plot(data, 'x', 'y', calc_total=False)

In [19]:
# total_title: replaces the default "Total" name of the last bar and of its scale entry
waterfall_plot(data, 'x', 'y', total_title="Result", show_legend=True)

### Control additional geometries

In [20]:
# hline: the line at y = 0 is blank by default; passing element_line() overrides
# that default (its linetype stays 'dashed')
waterfall_plot(data, 'x', 'y', hline=element_line())

In [21]:
# hline_ontop: False adds the zero line before the connector and box layers
# instead of after the boxes
waterfall_plot(data, 'x', 'y', hline=element_line(), hline_ontop=False)

In [22]:
# hline color: forwarded to geom_hline()
waterfall_plot(data, 'x', 'y', hline=element_line(color="magenta"))

In [23]:
# hline size: forwarded to geom_hline()
waterfall_plot(data, 'x', 'y', hline=element_line(size=2))

In [24]:
# hline linetype: overrides the default 'dashed'
waterfall_plot(data, 'x', 'y', hline=element_line(linetype='solid'))

In [25]:
# connector: blank=True removes the connector segments between the bars
# (width=.5 widens the gaps in which they would be drawn)
waterfall_plot(data, 'x', 'y', width=.5, connector=element_line(blank=True))

In [26]:
# connector color: forwarded to the connector layer (geom_spoke)
waterfall_plot(data, 'x', 'y', width=.5, connector=element_line(color="magenta"))

In [27]:
# connector size: forwarded to the connector layer (geom_spoke)
waterfall_plot(data, 'x', 'y', width=.5, connector=element_line(size=2))

In [28]:
# connector linetype: forwarded to the connector layer (geom_spoke)
waterfall_plot(data, 'x', 'y', width=.5, connector=element_line(linetype='dotted'))

In [29]:
# label: blank=True removes the text labels from the bars
waterfall_plot(data, 'x', 'y', label=element_text(blank=True))

In [30]:
# label color: a constant color replaces the default "white"
waterfall_plot(data, 'x', 'y', label=element_text(color="yellow"))

In [31]:
# label family: forwarded to geom_text() as `family`
waterfall_plot(data, 'x', 'y', label=element_text(family="Courier"))

In [32]:
# label face: forwarded to geom_text() as `fontface`
waterfall_plot(data, 'x', 'y', label=element_text(face='bold'))

In [33]:
# label size: forwarded to geom_text() as `size`
waterfall_plot(data, 'x', 'y', label=element_text(size=12))

In [34]:
# label angle: forwarded to geom_text() as `angle`
waterfall_plot(data, 'x', 'y', label=element_text(angle=45))

In [35]:
# label hjust/vjust
def get_waterfall_with_justified_labels(hjust, vjust):
    """Return the sample waterfall chart with the given label justification.

    The chart is titled "Justified labels" and its subtitle shows the
    ``hjust`` / ``vjust`` values that were used.
    """
    justified_label = element_text(hjust=hjust, vjust=vjust)
    subtitle = f"hjust={hjust}, vjust={vjust}"
    return waterfall_plot(data, 'x', 'y', label=justified_label) + \
        ggtitle("Justified labels", subtitle)

# 2x2 grid with every combination of hjust and vjust taken from {0, 1}
gggrid([
    get_waterfall_with_justified_labels(0, 0), get_waterfall_with_justified_labels(0, 1),
    get_waterfall_with_justified_labels(1, 0), get_waterfall_with_justified_labels(1, 1),
], ncol=2)

In [36]:
# label_format: forwarded to geom_text() as `label_format`
waterfall_plot(data, 'x', 'y', label_format=".2f")

## Other Customizations

In [37]:
# fill and color: combinations of constant and flow-type-mapped colors for the
# boxes (fill) and for the labels (label color)

gggrid([
    # Defaults: fill mapped to the flow type, white labels.
    waterfall_plot(data, 'x', 'y', show_legend=True) + ggtitle("Default"),
    # 'flow_type' is the default value of fill, so it is only spelled out here.
    waterfall_plot(data, 'x', 'y', show_legend=True, fill='flow_type') + ggtitle("fill='flow_type'"),
    # The label color is mapped to the flow type in addition to the fill.
    waterfall_plot(data, 'x', 'y', show_legend=True, label=element_text(color='flow_type')) + ggtitle("label color='flow_type'"),
    # No fill mapping and no constant fill; only the label color is mapped.
    waterfall_plot(data, 'x', 'y', show_legend=True, fill=None, label=element_text(color='flow_type')) + ggtitle("fill=None and label color='flow_type'"),
    # A fill scale added after the plot supplies custom colors for the default flow-type names.
    waterfall_plot(data, 'x', 'y', show_legend=True, color="#777777", label=element_text(color="#777777")) + \
        scale_fill_manual({"Increase": "white", "Decrease": "black", "Total": "yellow"}) + \
        ggtitle("Custom scale_fill_manual()"),
    # Constant fill, mapped label color with a custom color scale.
    waterfall_plot(data, 'x', 'y', show_legend=True, fill="black", label=element_text(color='flow_type')) + \
        scale_color_manual({"Increase": "green", "Decrease": "red", "Total": "#bbbbbb"}) + \
        ggtitle("Custom scale_color_manual()"),
    # `labels` of the added fill scale renames the flow types in the legend.
    waterfall_plot(data, 'x', 'y', show_legend=True, color="#777777", label=element_text(color="#777777")) + \
        scale_fill_manual({"Increase": "green", "Decrease": "red", "Total": "yellow"}, labels=["Up", "Down", "Result"]) + \
        ggtitle("Custom flow type names"),
], ncol=3)

In [38]:
# flip coordinates: waterfall_plot() returns a regular plot, so coord_flip() can be added to it
waterfall_plot(data, 'x', 'y') + coord_flip()

In [39]:
# custom theme: a theme can be added to the returned plot like any other plot feature
waterfall_plot(data, 'x', 'y') + theme_bw()