In [None]:
import pandas as pd
import polars as pl
import numpy as np
import random
import rtsvg
rt = rtsvg.RACETrack()

# Sampling configuration.  A fixed seed makes the sample identical on every
# Restart & Run All; the size is capped below so a short file does not raise.
N_SAMPLES   = 100_000
SAMPLE_SEED = 42

# Netflow columns that are not used by the rest of the notebook
NETFLOW_UNUSED_COLUMNS = ['TimeSeconds',
                          'dateTimeStr',
                          'ipLayerProtocolCode',
                          'moreFragments',
                          'contFragments',
                          'firstSeenSrcPayloadBytes',
                          'firstSeenDestPayloadBytes',
                          'recordForceOut']

# Netflow columns that are kept, mapped to the short names used downstream
NETFLOW_SHORT_NAMES = {'parsedDate':               'timestamp',
                       'ipLayerProtocol':          'pro',
                       'firstSeenSrcIp':           'sip',
                       'firstSeenDestIp':          'dip',
                       'firstSeenSrcPort':         'spt',
                       'firstSeenDestPort':        'dpt',
                       'durationSeconds':          'dur',
                       'firstSeenSrcTotalBytes':   'soct',
                       'firstSeenDestTotalBytes':  'doct',
                       'firstSeenSrcPacketCount':  'spkt',
                       'firstSeenDestPacketCount': 'dpkt'}

df_raw = pl.read_csv('../../data/2013_vast_challenge/mc3_netflow/nf/nf-chunk1.csv')
# NOTE(review): presumably converts the 'parsedDate' strings into a timestamp dtype -- confirm against rtsvg
df_raw = rt.columnsAreTimestamps(df_raw, 'parsedDate')

# Dropping and then renaming yields the same columns, in the same order, as
# renaming the unused columns to placeholders and dropping the placeholders.
df = df_raw.drop(NETFLOW_UNUSED_COLUMNS).rename(NETFLOW_SHORT_NAMES)

# Sampling without replacement cannot return more rows than the frame holds
df = df.sample(n=min(N_SAMPLES, len(df)), seed=SAMPLE_SEED)

In [None]:
#
# spreadLines() - attempt to implement this visualization
#
# Based on:
#
# @misc{kuo2024spreadlinevisualizingegocentricdynamic,
#       title={SpreadLine: Visualizing Egocentric Dynamic Influence}, 
#       author={Yun-Hsin Kuo and Dongyu Liu and Kwan-Liu Ma},
#       year={2024},
#       eprint={2408.08992},
#       archivePrefix={arXiv},
#       primaryClass={cs.HC},
#       url={https://arxiv.org/abs/2408.08992}, 
# }
# 
def spreadLines(rt_self,
                df,
                relationships,
                node_focus,
                ts_field        = None,  # Will attempt to guess based on datatypes
                every           = '1d',  # "the every field for the group_by_dynamic" ... 1d, 1h, 1m
                color_by        = None,
                count_by        = None,
                count_by_set    = False,
                widget_id       = None,
                w               = 1024,
                h               = 512,
                txt_h           = 12):
    """
    Factory for a SpreadLines instance.

    Every argument is forwarded unchanged to the SpreadLines constructor; the
    only work done here is rejecting dataframes that rt_self does not
    recognize as polars.

    Raises:
        Exception: when rt_self.isPolars(df) compares equal to False.
    """
    _polars_check_ = rt_self.isPolars(df)
    if _polars_check_ == False:
        raise Exception('spreadLines() - only supports polars dataframe')
    return SpreadLines(rt_self,
                       df            = df,
                       relationships = relationships,
                       node_focus    = node_focus,
                       ts_field      = ts_field,
                       every         = every,
                       color_by      = color_by,
                       count_by      = count_by,
                       count_by_set  = count_by_set,
                       widget_id     = widget_id,
                       w             = w,
                       h             = h,
                       txt_h         = txt_h)

#
#
#
class SpreadLines(object):
    """
    Time-binned bookkeeping of the nodes surrounding a single focus node.

    The constructor bins the dataframe with group_by_dynamic() and, for every
    bin in which the focus node appears, records:

      bin_to_timestamps - bin index -> key timestamp of that bin
      bin_to_alter1s    - bin index -> {'fm': set, 'to': set} of first alters.
                          'to' holds nodes found in the to-column of rows whose
                          from-column is the focus; 'fm' is the reverse.
      bin_to_alter2s    - bin index -> {'fm': set, 'to': set} of second alters,
                          keyed by the side of the first alters they touch
      df_alter1s        - rows that contain the focus, tagged with bookkeeping columns
      df_alter2s        - rows linking a first alter to a second alter, tagged likewise

    Bins where the focus has no match are counted (they consume a bin index)
    but leave no entry in any of the dictionaries.
    """
    #
    # __transformFields__() - transform all fields (if they are a t-field)
    # - replace those fields w/ the new versions (i actually don't think the names change...)
    # - relies on transformFieldListAndDataFrame() returning the new names in the
    #   same order (and count) as the names that were passed in
    # - endpoints may be a str, a tuple of str, or a list of str; any other type
    #   is left untouched
    #
    def __transformFields__(self):
        # Gather up all of the fields that are going to be used
        _all_columns_ = [self.ts_field]
        if self.color_by is not None: _all_columns_.append(self.color_by)
        if self.count_by is not None: _all_columns_.append(self.count_by)
        for _relationship_ in self.relationships:
            for _endpoint_ in (_relationship_[0], _relationship_[1]):
                if   isinstance(_endpoint_, str):           _all_columns_.append(_endpoint_)
                elif isinstance(_endpoint_, (tuple, list)): _all_columns_.extend(_endpoint_)

        # Transform the fields
        self.df, _new_columns_ = self.rt_self.transformFieldListAndDataFrame(self.df, _all_columns_)

        # Remap them -- consume the new names in exactly the order they were gathered
        _remapped_ = iter(_new_columns_)
        self.ts_field = next(_remapped_)
        if self.color_by is not None: self.color_by = next(_remapped_)
        if self.count_by is not None: self.count_by = next(_remapped_)

        def _remapEndpoint_(_endpoint_):
            if isinstance(_endpoint_, str):   return next(_remapped_)
            if isinstance(_endpoint_, tuple): return tuple([next(_remapped_) for _ in _endpoint_])
            if isinstance(_endpoint_, list):  return [next(_remapped_) for _ in _endpoint_]
            return _endpoint_

        _new_relationships_ = []
        for _relationship_ in self.relationships:
            _fm_ = _remapEndpoint_(_relationship_[0]) # from-side first, matching the gather order
            _to_ = _remapEndpoint_(_relationship_[1])
            _new_relationships_.append((_fm_, _to_))
        self.relationships = _new_relationships_

    #
    # __consolidateRelationships__() - simplify the relationship fields into a single field
    # ... and use standard naming
    # ... replaces the "relationships" field w/ the consolidated field names
    # ... use (__fm0__, __to0__),( __fm1__, __to1__), etc.
    # ... single-field endpoints are copied; multi-field endpoints go through createConcatColumn()
    #
    def __consolidateRelationships__(self):
        new_relationships = []
        for i, (_fm_, _to_) in enumerate(self.relationships):
            _pair_ = []
            for _endpoint_, _name_ in ((_fm_, f'__fm{i}__'), (_to_, f'__to{i}__')):
                if isinstance(_endpoint_, str): self.df = self.df.with_columns(pl.col(_endpoint_).alias(_name_))
                else:                           self.df = self.rt_self.createConcatColumn(self.df, _endpoint_, _name_)
                _pair_.append(_name_)
            new_relationships.append(tuple(_pair_))
        self.relationships = new_relationships

    #
    # __tagRows__() - add the bookkeeping columns to a slice of a bin
    # - _roles_ is a list of (column name, value) pairs that record which
    #   relationship column played which role for these rows
    #
    def __tagRows__(self, _df_, _roles_, _level_, _bin_, _timestamp_, _side_):
        _exprs_ = [pl.lit(_value_).alias(_name_) for _name_, _value_ in _roles_]
        _exprs_.append(pl.lit(_level_).alias('__alter_level__'))
        _exprs_.append(pl.lit(_bin_).alias('__bin__'))
        _exprs_.append(pl.lit(_timestamp_).alias('__bin_ts__'))
        _exprs_.append(pl.lit(_side_).alias('__alter_side__'))
        return _df_.with_columns(*_exprs_)

    #
    # __concatOrEmpty__() - concatenate the collected pieces
    # - pl.concat() raises on an empty list, which happens when the focus never
    #   appears in the data (or has no second alters); in that case return a
    #   zero-row frame that carries the same column names instead
    # - NOTE(review): in the empty case '__bin_ts__' takes the dtype of the timestamp
    #   column, which may differ in time unit from a populated result -- confirm if it matters
    #
    def __concatOrEmpty__(self, _dfs_, _role_names_):
        if len(_dfs_) > 0: return pl.concat(_dfs_)
        _exprs_ = [pl.lit('').alias(_name_) for _name_ in _role_names_]
        _exprs_.append(pl.lit(0).alias('__alter_level__'))
        _exprs_.append(pl.lit(0).alias('__bin__'))
        _exprs_.append(pl.col(self.ts_field).alias('__bin_ts__'))
        _exprs_.append(pl.lit('').alias('__alter_side__'))
        return self.df.head(0).with_columns(*_exprs_)

    #
    # __findAlter1s__() - within one bin, find the rows that touch the focus
    # - appends the (non-empty) tagged rows to _dfs_
    # - fills in self.bin_to_alter1s[_bin_]
    # - returns True if the focus matched anything in this bin
    #
    def __findAlter1s__(self, k_df, _bin_, _timestamp_, _dfs_):
        _found_matches_ = False
        for _fm_, _to_ in self.relationships:
            # (column holding the focus, column holding the alter, side of the alter)
            # ... "from is focus" is handled before "to is focus" for every relationship
            for _focus_col_, _alter_col_, _side_ in ((_fm_, _to_, 'to'), (_to_, _fm_, 'fm')):
                _df_ = k_df.filter(pl.col(_focus_col_) == self.node_focus)
                if len(_df_) == 0: continue
                _roles_ = [('__focus_col__', _focus_col_), ('__alter_col__', _alter_col_)]
                _dfs_.append(self.__tagRows__(_df_, _roles_, 1, _bin_, _timestamp_, _side_))
                if _bin_  not in self.bin_to_alter1s:        self.bin_to_alter1s[_bin_]         = {}
                if _side_ not in self.bin_to_alter1s[_bin_]: self.bin_to_alter1s[_bin_][_side_] = set()
                self.bin_to_alter1s[_bin_][_side_] |= set(_df_[_alter_col_])
                _found_matches_ = True
        return _found_matches_

    #
    # __findAlter2s__() - within one bin, find the nodes one step beyond the first alters
    # - only call for bins that have an entry in self.bin_to_alter1s
    # - second alters exclude the focus and every first alter (from either side)
    # - appends the tagged alter1 <-> alter2 rows to _dfs_ (possibly zero-row frames)
    # - fills in self.bin_to_alter2s[_bin_]
    #
    def __findAlter2s__(self, k_df, _bin_, _timestamp_, _dfs_):
        _alter1s_by_side_ = self.bin_to_alter1s[_bin_]
        _not_alter2s_     = set([self.node_focus])
        for _side_ in ('fm', 'to'):
            if _side_ in _alter1s_by_side_: _not_alter2s_ = _not_alter2s_ | _alter1s_by_side_[_side_]

        # Go through all the relationships
        for _fm_, _to_ in self.relationships:
            for _side_ in ('fm', 'to'):
                if _side_ not in _alter1s_by_side_: continue
                _alter1s_     = _alter1s_by_side_[_side_]
                _df_          = k_df.filter(pl.col(_fm_).is_in(_alter1s_) | pl.col(_to_).is_in(_alter1s_))
                _set_alter2s_ = (set(_df_[_fm_]) | set(_df_[_to_])) - _not_alter2s_
                if len(_set_alter2s_) == 0: continue

                if _bin_  not in self.bin_to_alter2s:        self.bin_to_alter2s[_bin_]         = {}
                if _side_ not in self.bin_to_alter2s[_bin_]: self.bin_to_alter2s[_bin_][_side_] = set()
                self.bin_to_alter2s[_bin_][_side_] |= _set_alter2s_

                # alter1 -> alter2 rows first, then alter2 -> alter1 rows
                for _alter1_col_, _alter2_col_ in ((_fm_, _to_), (_to_, _fm_)):
                    _df_    = k_df.filter(pl.col(_alter1_col_).is_in(_alter1s_) & pl.col(_alter2_col_).is_in(_set_alter2s_))
                    _roles_ = [('__alter1_col__', _alter1_col_), ('__alter2_col__', _alter2_col_)]
                    _dfs_.append(self.__tagRows__(_df_, _roles_, 2, _bin_, _timestamp_, _side_))

    #
    # __init__() - copy the dataframe, normalize the fields, then walk the time bins
    # - kwargs must contain: df, relationships, node_focus, ts_field, every, color_by,
    #   count_by, count_by_set, widget_id, w, h, txt_h
    # - ts_field of None defers to rt_self.guessTimestampField()
    # - widget_id of None is replaced w/ a randomly numbered 'spreadlines_<n>' id
    #
    def __init__(self, rt_self, **kwargs):
        self.rt_self       = rt_self
        self.df            = rt_self.copyDataFrame(kwargs['df'])
        self.relationships = kwargs['relationships']
        self.node_focus    = kwargs['node_focus']
        self.ts_field      = self.rt_self.guessTimestampField(self.df) if kwargs['ts_field'] is None else kwargs['ts_field']
        self.every         = kwargs['every']
        self.color_by      = kwargs['color_by']
        self.count_by      = kwargs['count_by']
        self.count_by_set  = kwargs['count_by_set']
        self.widget_id     = f'spreadlines_{random.randint(0,65535)}' if kwargs['widget_id'] is None else kwargs['widget_id']
        self.w             = kwargs['w']
        self.h             = kwargs['h']
        self.txt_h         = kwargs['txt_h']
        # Unwrap any fields w/ the appropriate transforms
        self.__transformFields__()
        # Consolidate the fm's and to's into a simple field (__fm0__, __to0__),( __fm1__, __to1__), etc.
        self.__consolidateRelationships__()
        # How many bins?  And what's in those bins for nodes next to the focus?
        self.df = self.df.sort(self.ts_field)
        _dfs_containing_focus_   = [] # focus  -> alter1 or alter1 -> focus
        _dfs_containing_alter2s_ = [] # alter1 -> alter2 or alter2 -> alter1  ... note does not include focus or alter1 <-> alter1
        self.bin_to_timestamps   = {}
        self.bin_to_alter1s      = {}
        self.bin_to_alter2s      = {}
        # Every group advances the bin index, whether or not the focus appears in it
        for _bin_, (k, k_df) in enumerate(self.df.group_by_dynamic(self.ts_field, every=self.every)):
            _timestamp_ = k[0]
            if self.__findAlter1s__(k_df, _bin_, _timestamp_, _dfs_containing_focus_):
                self.__findAlter2s__(k_df, _bin_, _timestamp_, _dfs_containing_alter2s_)
                self.bin_to_timestamps[_bin_] = _timestamp_

        # Concatenate the pieces and parts
        self.df_alter1s = self.__concatOrEmpty__(_dfs_containing_focus_,   ['__focus_col__',  '__alter_col__'])
        self.df_alter2s = self.__concatOrEmpty__(_dfs_containing_alter2s_, ['__alter1_col__', '__alter2_col__'])

#
# spreadLines() - build the structure for the sampled netflow
# ... one relationship (source ip -> destination ip), focused on a single address
#
sl = spreadLines(rt, df, relationships=[('sip', 'dip')], node_focus='172.30.0.4')

In [None]:
# Scratch check: the union operation builds a new set and leaves its operands alone
_set_ = {1, 3, 5}
_set_ = _set_.union({7})
_set_