In [103]:
import pandas as pd
import json
import typing as tp

from plotly import graph_objects as go

# pip install Pillow
from PIL.ImageColor import getcolor

In [119]:
def get_colors(fp, line_start, a_node, a_link, link_color_shift, color_step, cut):
    '''
    Build paired node / link colour lists from a palette text file.

    The file is read line by line; every non-empty line is taken as one hex
    colour code. A leading "#" is optional. Blank or whitespace-only lines
    are skipped, so they neither occupy a palette index nor reach `getcolor`.

    Parameters
    ----------
    fp : path of the palette file.
    line_start : index of the first palette entry to use.
    a_node : alpha value written into every node colour string.
    a_link : alpha value written into every link colour string.
    link_color_shift : offset, relative to the node colour index, of the
        palette entry used for the matching link colour.
    color_step : stride between consecutive node colour indexes.
    cut : number of trailing palette entries excluded from the node indexes.

    Returns
    -------
    (colors_node, colors_link) : two lists of 'rgba(r,g,b, a)' strings of
        equal length; entry k of both lists comes from the same iteration.
    '''
    colors = []
    with open(fp, mode='r') as file:
        for line in file:
            hex_code = line.strip().lstrip('#')
            if hex_code:
                colors.append('#' + hex_code)

    rgba_template = 'rgba({},{},{}, {})'
    colors_node = []
    colors_link = []
    for i in range(line_start, len(colors) - cut, color_step):
        # Resolve both colours before appending either one, so that a
        # skipped index can never leave the two lists out of step.
        try:
            link_r, link_g, link_b = getcolor(colors[i + link_color_shift], "RGB")
            node_r, node_g, node_b = getcolor(colors[i], "RGB")
        except IndexError:
            # The shifted link index ran past the palette: skip this pair.
            continue
        colors_link.append(rgba_template.format(link_r, link_g, link_b, a_link))
        colors_node.append(rgba_template.format(node_r, node_g, node_b, a_node))

    return colors_node, colors_link

In [105]:
def prepare_data(raw_data, cond_data):
    '''
    Convert a raw event log into per-object node-to-node transitions.

    raw_data = json[
        {
            log_datetime: datetime,
            object_id: str | int,
            event_id: str | int,
            event_value: int
        }, ...
    ]

    cond_data = json[
        {
            event_id: str | int,
            event_value_condition: str | int,
            node_name: str
        }, ...
    ]

    output = json[
        {
            object_id: str | int,
            step: int,
            source_node_name: str,
            target_node_name: str
        }, ...
    ]

    event_value_condition is either a bare integer (meaning "equal to") or
    a comparison sign followed by an integer: "=", ">", "<", ">=", "<=".
    The space between sign and number is optional ("> 3" and ">3" are the
    same condition). The first entry of cond_data whose event_id and
    condition both match a row gives that row its node name; rows with no
    match get one of the two "undefined due to ..." names.

    Consecutive rows of the same object that resolve to the same node name
    are collapsed into the first one before transitions are built.

    Returns a JSON string (records orientation). An empty raw_data yields
    the string '[]'.
    '''
    # Two-character signs first so that ">=" is not read as ">" + "=...".
    comparison_symbols = ('>=', '<=', '>', '<', '=')

    def _parse_condition(condition):
        # Split one event_value_condition into (comparison sign, integer).
        if isinstance(condition, str):
            parts = condition.split()
            if len(parts) == 2:
                # "<sign> <int>"; an unrecognised sign is passed through
                # and simply never matches a row.
                try:
                    return parts[0], int(parts[1])
                except ValueError:
                    pass
            compact = condition.strip()
            for symbol in comparison_symbols:
                if compact.startswith(symbol):
                    try:
                        return symbol, int(compact[len(symbol):])
                    except ValueError:
                        break
        # Bare value (int, or a string holding only a number): equality.
        return '=', int(condition)

    def _condition_holds(comparison, actual, expected):
        # Evaluate `actual <comparison> expected`; unknown signs are False.
        if comparison == '=':
            return actual == expected
        if comparison == '>':
            return actual > expected
        if comparison == '<':
            return actual < expected
        if comparison == '>=':
            return actual >= expected
        if comparison == '<=':
            return actual <= expected
        return False

    def _node_name_definition(event_id, event_value):
        # Name the node for one event using the first matching condition.
        event_id_absence = True
        for cond in cond_data:
            if event_id == cond['event_id']:
                event_id_absence = False
                comparison, value = _parse_condition(cond['event_value_condition'])
                if _condition_holds(comparison, event_value, value):
                    return cond['node_name']
        if event_id_absence:
            return 'undefined due to event_id absence'
        return 'undefined due to event_condition_value absence'

    def _drop_duplicates(raw_data_df):
        # Drop a row when the previous row (in object/time order) has the
        # same object_id and the same node_name.
        ordered = raw_data_df\
            .sort_values(by=['object_id', 'log_datetime', 'node_name'])\
            .reset_index(drop=True)
        object_ids = ordered['object_id'].tolist()
        node_names = ordered['node_name'].tolist()
        keep = [
            i == 0
            or not (object_ids[i] == object_ids[i - 1]
                    and node_names[i] == node_names[i - 1])
            for i in range(len(ordered))
        ]
        return ordered.loc[keep]

    def _source_target_definition(raw_data_df):
        # Pair each event with the next event of the same object.
        prepared_data = raw_data_df\
            .sort_values(by=['object_id', 'log_datetime', 'node_name'])\
            .reset_index(drop=True)
        prepared_data['step'] = prepared_data.groupby(['object_id']).cumcount() + 1
        prepared_data['source_node_name'] = prepared_data['node_name']
        prepared_data['target_node_name'] = prepared_data\
            .groupby(['object_id'])['source_node_name'].shift(-1)
        # The last event of every object has no successor: not a transition.
        has_target = prepared_data['target_node_name'].notna()
        return prepared_data.loc[
            has_target, ['object_id', 'step', 'source_node_name', 'target_node_name']
        ]

    raw_data_df = pd.json_normalize(raw_data)
    if raw_data_df.empty:
        # Nothing to classify or sort; the helpers need the named columns.
        return '[]'

    # Prefer the documented column names; fall back to the third and fourth
    # columns so input that relied on position alone keeps working.
    columns = list(raw_data_df.columns)
    event_id_col = 'event_id' if 'event_id' in columns else columns[2]
    event_value_col = 'event_value' if 'event_value' in columns else columns[3]

    raw_data_df['node_name'] = [
        _node_name_definition(event_id, event_value)
        for event_id, event_value in zip(raw_data_df[event_id_col],
                                         raw_data_df[event_value_col])
    ]
    raw_data_df = _drop_duplicates(raw_data_df)
    prepared_data = _source_target_definition(raw_data_df)

    return prepared_data.to_json(indent=4, orient='records', force_ascii=False)
    

In [106]:
def construct_sankey_data(prepared_data, 
                          node_color,
                          node_pad,
                          node_thickness,
                          node_line_color,
                          node_line_width,
                          link_color, 
                          title, 
                          title_width, 
                          title_height, 
                          title_font_size,
                          orientation,
                          valueformat,
                          valuesuffix
                          ):
    '''
    Build the node / link / layout description of a Sankey diagram.

    prepared_data: parsed json (list of records) = output from func
        prepare_data; every record carries object_id, source_node_name and
        target_node_name. Records of one object are expected to be
        adjacent and in step order.
    node_color, link_color: lists of colour strings. They are cycled, so
        every distinct node name gets one node colour and one link colour;
        a link takes the link colour of its source node.
    title_width, title_height: stored as layout "width" and "height".
    The remaining arguments are copied into the output unchanged.

    The node label list may hold the same name more than once: a name is
    appended again when an object reaches it as a target and it is not in
    the window of labels still ahead of that object's current position.

    output = {
        "data": {
            "orientation": "h" | "v",
            "valueformat": ".0f" | str,
            "valuesuffix": str,
            "node": {
                "pad": int,
                "thickness": int,
                "line": {
                    "color": str,
                    "width": float
                    },
                "label": [str, ...],
                "color": ["rgba(0-255, 0-255, 0-255, 0.0 - 1.0)", ...]
                },
            "link": {
                "source": [int, ...],
                "target": [int, ...],
                "value": [int, ...],
                "color": ["rgba(0-255, 0-255, 0-255, 0.0 - 1.0)", ...],
                "label": []
                }
            },
        "layout": {
            "title": {"text": str},
            "width": int,
            "height": int,
            "font": {"size": int}
            }
        }
    '''
    prepared_data_df = pd.json_normalize(prepared_data)

    def _prepare_lstv(prepared_data_df):
        # Derive node labels plus (source, target, value) link triples.
        transitions = [
            (row['object_id'], row['source_node_name'], row['target_node_name'])
            for row in prepared_data_df.to_dict(orient='records')
        ]
        # Private marker for "no previous object": unlike None it can never
        # compare equal to a real (possibly null) object_id, so the first
        # row always initialises the per-object state.
        no_object = object()

        # label
        label = []
        label_trimmed = label
        object_id_prev = no_object
        for object_id, source_elem, target_elem in transitions:
            if object_id_prev != object_id:
                # New object: it may reuse any label collected so far.
                label_trimmed = label
                object_id_prev = object_id
            if source_elem not in label_trimmed:
                label = [source_elem] + label
            if target_elem not in label_trimmed:
                label = label + [target_elem]
                label_trimmed = label_trimmed + [target_elem]
            # Keep only the labels from the reached target onwards.
            label_trimmed = label_trimmed[label_trimmed.index(target_elem):]

        # source, target, value
        position = 0
        object_id_prev = no_object
        link_counts = {}
        for object_id, source_elem, target_elem in transitions:
            if object_id_prev != object_id:
                position = 0
                object_id_prev = object_id
            # Search forward from the object's current position so that a
            # repeated name resolves to its next occurrence in `label`.
            source_indx = position + label[position:].index(source_elem)
            target_indx = source_indx + 1 + label[source_indx + 1:].index(target_elem)
            position = target_indx
            link = (source_indx, target_indx)
            link_counts[link] = link_counts.get(link, 0) + 1

        source = [source_indx for source_indx, _ in link_counts]
        target = [target_indx for _, target_indx in link_counts]
        value = list(link_counts.values())

        return label, source, target, value

    def _set_colors(node_color, link_color, label, source):
        # Give every distinct name the palette entry at its first-seen rank,
        # wrapping around when the palette is shorter than the name list.
        rank = {name: index for index, name in enumerate(dict.fromkeys(label))}
        node_color_label = [node_color[rank[name] % len(node_color)] for name in label]
        link_color_label = [link_color[rank[name] % len(link_color)] for name in label]
        # A link is coloured after the node it starts from.
        link_color_source = [link_color_label[index] for index in source]

        return node_color_label, link_color_source

    label, source, target, value = _prepare_lstv(prepared_data_df)
    node_color_label, link_color_source = _set_colors(node_color, link_color, label, source)

    output = {
        "data": {
            "orientation": orientation,
            "valueformat": valueformat,
            "valuesuffix": valuesuffix,
            "node": {
                "pad": node_pad,
                "thickness": node_thickness,
                "line": {"color": node_line_color, "width": node_line_width},
                "label": label,
                "color": node_color_label
                },
            "link": {
                "source": source,
                "target": target,
                "value": value,
                "color": link_color_source,
                "label": []
                }
            },
        "layout": {
            "title": {"text": title},
            "width": title_width,
            "height": title_height,
            "font": {"size": title_font_size}
            }
        }
    
    return output


In [107]:
def sankey_fig(sankey_data):
    '''
    Create a plotly figure holding one Sankey trace.

    sankey_data: dict with a "data" section (valueformat, valuesuffix,
        orientation, node, link) and a "layout" section (title text and
        font size), as produced by construct_sankey_data.

    The node and link sections are passed through with fixed hover-label
    styling added; links additionally get an arrow length of 10. Only the
    title text and the font size are taken from "layout"; its "width" and
    "height" entries are not read here.

    Returns the figure object.
    '''
    trace_spec = sankey_data['data']

    # Hover styling is fixed here; it overrides any 'hoverlabel' entry that
    # the incoming node / link sections might already carry.
    node_spec = dict(
        trace_spec['node'],
        hoverlabel={'align': 'left', 'bgcolor': 'lightgreen', 'bordercolor': 'white'},
    )
    link_spec = dict(
        trace_spec['link'],
        arrowlen=10,
        hoverlabel={'bgcolor': 'lightgreen', 'bordercolor': 'black', 'align': 'left'},
    )

    sankey_trace = go.Sankey(
        valueformat=trace_spec['valueformat'],
        valuesuffix=trace_spec['valuesuffix'],
        domain={"x": [0, 1], "y": [0, 1]},
        arrangement='freeform',
        orientation=trace_spec['orientation'],
        node=node_spec,
        link=link_spec,
        textfont={'color': 'black'},
    )
    fig = go.Figure(data=[sankey_trace])

    layout_spec = sankey_data['layout']
    fig.update_layout(
        title_text=layout_spec['title']['text'],
        font_size=layout_spec['font']['size'],
    )
    return fig

In [137]:
# --- Colour palettes --------------------------------------------------------
# Two palettes are loaded; only the "duel" one is used for the diagram below.
duel_colors_node, duel_colors_link = get_colors(
    fp='/home/ivan/projects/visual_experiments/duel.txt',
    line_start=3,
    a_node=1,
    a_link=0.3,
    link_color_shift=1,
    color_step=8,
    cut=0,
)

dewdrop_colors_node, dewdrop_colors_link = get_colors(
    fp='/home/ivan/projects/visual_experiments/dewdrop-dynasty-40.txt',
    line_start=0,
    a_node=0.5,
    a_link=0.3,
    link_color_shift=1,
    color_step=2,
    cut=6,
)

# --- Input tables -----------------------------------------------------------
# Each sheet goes Excel -> JSON text -> list of dicts, which is the record
# format prepare_data documents for its two arguments.
raw_data = json.loads(
    pd.read_excel('/home/ivan/projects/visual_experiments/data_sankey.xlsx')
    .to_json(indent=4, orient='records', force_ascii=False)
)

cond_data = json.loads(
    pd.read_excel('/home/ivan/projects/visual_experiments/data_sankey_cond.xlsx', )
    .to_json(indent=4, orient='records', force_ascii=False)
)

# --- Transitions ------------------------------------------------------------
# prepare_data returns a JSON string, so it is parsed back into records.
prepared_data = json.loads(prepare_data(raw_data, cond_data))

# --- Diagram ----------------------------------------------------------------
# title_width / title_height end up in the layout section of sankey_data;
# sankey_fig only reads the title text and the font size from that section.
# NOTE(review): the construct_sankey_data docstring gives ".0f" as the
# example valueformat -- confirm that 'int' is a format plotly accepts.
sankey_data = construct_sankey_data(
    prepared_data=prepared_data,
    node_color=duel_colors_node,
    node_pad=50,
    node_thickness=20,
    node_line_color='black',
    node_line_width=0.5,
    link_color=duel_colors_link,
    title='Test Diagram',
    title_width=10,
    title_height=50,
    title_font_size=10,
    orientation='h',
    valueformat='int',
    valuesuffix='чел',
)

sankey_fig(sankey_data).show()
