In [1]:
#default_exp richext

# Extensions To Rich

Extensions to rich for ghtop.

In [2]:
#export
import time,random
from dataclasses import dataclass
from typing import List
from collections import deque, OrderedDict, namedtuple
from ghtop.all_rich import (Console, Color, FixedPanel, box, Segments, Live,
                            grid, ConsoleOptions, Progress, BarColumn, Spinner)
from ghapi.event import *
from fastcore.all import *
console = Console()


In [3]:
evts = load_sample_events()
exs = [first(evts, risinstance(o)) for o in described_evts]

## Animated Stats

This section outlines how we can display statistics and visualizations such as sparklines and status bars that are animated as events are received.

### `EProg` - Progress Bar

In [4]:
#export
class EProg:
    "Progress bar with a heading `hdg`."
    def __init__(self, hdg='Quota', width=10):
        self.prog = Progress(BarColumn(bar_width=width), "[progress.percentage]{task.percentage:>3.0f}%")
        self.task = self.prog.add_task("",total=100, visible=False)
        store_attr()
    def update(self, completed): self.prog.update(self.task, completed=completed)
    def __rich_console__(self, console: Console, options: ConsoleOptions):
        self.prog.update(self.task, visible=True)
        yield grid([["Quota"], [self.prog.get_renderable()]], width=self.width+2, expand=False)


When you instantiate `Eprog` the starting progress is set to 0%:

In [5]:
p = EProg()
console.print(p)

Output()

You can update the progress bar with the `update` method:

In [6]:
p.update(10)
console.print(p)

### `Espark` - A sparkline combined with an EventTimer

fastcore's `EventTimer` calculates frequency metrics aggregated by slices of time specified by the argument `span`.  The `EventTimer` can produce a sparkline that shows the last n time slices, where n is specified by the parameter `store`:

In [7]:
#export
class ESpark(EventTimer):
    "An `EventTimer` that displays a sparkline with a heading `nm`."
    def __init__(self, nm:str, color:str, ghevts:List[GhEvent]=None, store=5, span=.2, stacked=True): 
        super().__init__(store=store, span=span)
        self.ghevts=L(ghevts)
        store_attr('nm,color,store,span,stacked')
    def _spark(self): return f"[{self.color}]{self.freq:.0f} {sparkline(self.hist)}[/]"
    def _nm(self): return f"[{self.color}]{self.nm}[/]"
    def __rich_console__(self, console: Console, options: ConsoleOptions): 
        yield grid([[self._nm()], [self._spark()]]) if self.stacked else f'{self._nm()}  {self._spark()}'
    __repr__ = basic_repr('nm,color,ghevts')

By default `nm` will be stacked on top of the sparkline.  We simulate adding events to `ESpark` and render the result:

In [8]:
e = ESpark(nm='💌Issue', color='blue')

def _r(): return random.randint(1,30)

def _sim(e, steps=8, sleep=.2):
    for i in range(steps):
        e.add(_r())
        time.sleep(sleep)

_sim(e)
console.print(e)

If you would prefer `nm` and the sparkline to be on one line instead, you can set `stacked` to `false`:

In [9]:
e = ESpark(color='blue', nm='💌Issue', stacked=False)
_sim(e)
console.print(e)

In [166]:
None in ['foo', 'bar']

False

## New

In [176]:
class SpkMap:
    def __init__(self, spks:List[ESpark], default:str=None): 
        self.ev2spk={ev:s for s in spks for ev in s.ghevts}
        self.names = [s.nm for s in spks]
        if default: assert default in self.names, f'default value must be one of SpkCfg names: {self.names}'
        self.defaultevt = first([s for s in spks if s.nm==default])
        store_attr()
    def __getitem__(self, k): return self.ev2spk[k] if not self.default else self.ev2spk.get(k, self.defaultevt)
    def add_events(self, evs:GhEvent): L(evs).map(lambda x: self[x].add(1))
    __repr__ = basic_repr('spks,default')

In [179]:
s1 = ESpark('Issues', 'blue', [IssueCommentEvent, IssuesEvent])
s2 = ESpark('PR', 'red', [PullRequestEvent, PullRequestReviewCommentEvent, PullRequestReviewEvent])
s3 = ESpark('Other', 'red', span=60)

sm = SpkMap(spks=[s1,s2,s3], default='Other')

In [180]:
test_eq(sm[IssuesEvent], s1)
test_eq(sm[PullRequestReviewCommentEvent], s2)
test_eq(sm[ReleaseEvent], s3)
test_eq(sm[WatchEvent], s3)

In [181]:
for i in range(5): sm.add_events(WatchEvent)
test_eq(sm[WatchEvent].events,5)

### SparkGrp - Sparklines, Progress bars and Statistics Combined

We may want to combine sparklines (with `ESpark`), spinners, and progress bars (with `EProg`) to display organized information concerning an event stream.  `SparkGrp` helps you create, group, display and update these elements together.

In [141]:
#export
Sfg = namedtuple('Sparkcfg', ['color', 'nm'])

# TODO refactor this to incorporate SpkMap
class SparkGrp:
    "Renders a group of `ESpark` along with a spinner and progress bar that are dynamically sized."
    def __init__(self, spks:List[SparkCfg], store=5, span=.2, stacked=True, max_width=console.width-5, spin:str='earth', spn_lbl="/min"):
        self.spn = Spinner(spin)
        self.sparklines = OrderedDict({s.nm: ESpark(color=s.color, nm=s.nm, store=store, span=span, stacked=stacked) for s in spks})
        self.slen = len(self.sparklines) * max(15, store*2)
        self.plen = max(max_width-self.slen-15, 15)
        self.progbar = EProg(width=self.plen)
        store_attr()
        
    def get_spk(self): 
        return grid([self.sparklines.values()], width=min(console.width-15, self.slen), expand=False)
    
    def get_spinner(self): return grid([[self.spn], [self.spn_lbl]])
    
    def update(self, events_dict:Dict[str,int], pct_complete:int=None):
        for k in events_dict: 
            if k in self.sparklines: self.sparklines[k].add(events_dict[k])
        if pct_complete: self.progbar.update(pct_complete)
        
    def __rich_console__(self, console: Console, options: ConsoleOptions): 
        yield grid([[self.get_spinner(), self.get_spk(), grid([[self.progbar]], width=self.plen+5) ]], width=self.max_width)


NameError: name 'SparkCfg' is not defined

In [140]:
scs = [SpkCfg('a', 'blue', [1,2,3,4]), SpkCfg('d', 'blue', [5,6,7,8])]



{1: 'a', 2: 'a', 3: 'a', 4: 'a', 5: 'd', 6: 'd', 7: 'd', 8: 'd'}

In [207]:
od = OrderedDict({'a':1, 'b':2})
1 in od.values()

True

In [134]:
c.events

(#4) [1,2,3,4]

Instantiate a `SparkGrp` by passing in a list of `SparkCfg`.  This builds a group of `ESpark` sparklines which you can display like so:

In [11]:
s = SparkGrp([SparkCfg('blue', '⭐Push'), 
              SparkCfg('red', '🎁Issue'), 
              SparkCfg('purple', '💌PRs'),
              SparkCfg('green', '🚀Release'), 
             ], stacked=True)

console.print(s)

Output()

You can add events to all `ESpark` instances of the `SparkGrp` at once by passing a dictionary to the `update` method.  Furthermore, you can pass the `pct_complete` argument to `update` to advance the progress bar:

In [12]:
def _d(): return {'⭐Push':_r(), '🎁Issue':_r(), '💌PRs':_r(), '🚀Release':_r()}

s.update(_d())
time.sleep(.1)
s.update(_d(), pct_complete=40)

console.print(s)

Here is what this looks like when animated using `Live`:

In [13]:
def _sim_spark(s, sleep=.05):
    with Live(refresh_per_second=20) as live:
        for i in range(101):
            evts = {k: _r() for k in s.sparklines.keys()}
            s.update(evts, pct_complete=i)
            live.update(s)
            time.sleep(sleep)
            
_sim_spark(s)

Output()

The progress bar will automatically resize depending on the number of sparklines you add to `SparkGrp`.  For example, if we only have one sparkline the width of objects dynamically change to look like this:

In [14]:
s = SparkGrp([SparkCfg('blue', 'foo')], store=20, stacked=True)
_sim_spark(s)

Output()

Output()

## Event Panel

Display GitHub events in a `FixedPanel`, which is a frame of fixed height that displays streaming data.

In [15]:
#export
@patch
def __rich_console__(self:GhEvent, console, options):
    res = Segments(options)
    kw = {'color': colors[self.type]}
    res.add(f'{self.emoji}  ')
    res.add(self.actor.login, pct=0.25, bold=True, **kw)
    res.add(self.description, pct=0.5, **kw)
    res.add(self.repo.name, pct=0.5 if self.text else 1, space = ': ' if self.text else '', italic=True, **kw)
    if self.text: 
        clean_text = self.text.replace('\n', ' ').replace('\r', ' ')
        res.add (f'"{clean_text}"', pct=1, space='', **kw)
    res.add('\n')
    return res

In [16]:
#export
colors = dict(
    PushEvent=None, CreateEvent=Color.red, IssueCommentEvent=Color.green, WatchEvent=Color.yellow,
    PullRequestEvent=Color.blue, PullRequestReviewEvent=Color.magenta, PullRequestReviewCommentEvent=Color.cyan,
    DeleteEvent=Color.bright_red, ForkEvent=Color.bright_green, IssuesEvent=Color.bright_magenta,
    ReleaseEvent=Color.bright_blue, MemberEvent=Color.bright_yellow, CommitCommentEvent=Color.bright_cyan,
    GollumEvent=Color.white, PublicEvent=Color.turquoise4)

colors2 = dict(
    PushEvent=None, CreateEvent=Color.dodger_blue1, IssueCommentEvent=Color.tan, WatchEvent=Color.steel_blue1,
    PullRequestEvent=Color.deep_pink1, PullRequestReviewEvent=Color.slate_blue1, PullRequestReviewCommentEvent=Color.tan,
    DeleteEvent=Color.light_pink1, ForkEvent=Color.orange1, IssuesEvent=Color.medium_violet_red,
    ReleaseEvent=Color.green1, MemberEvent=Color.orchid1, CommitCommentEvent=Color.tan,
    GollumEvent=Color.sea_green1, PublicEvent=Color.magenta2)

In [17]:
p = FixedPanel(15, box=box.HORIZONTALS, title='ghtop')
for e in evts[:163]: p.append(e)
p

#### Using `grid` with `FixedPanel`

We can use `grid` to arrange multiple `FixedPanel` instances in rows and columns.  Below is an example of how two `FixedPanel` instances can be arranged in a row:

In [18]:
p = FixedPanel(15, box=box.HORIZONTALS, title='ghtop')
for e in exs: p.append(e)
grid([[p,p]])

Here is another example of a four `FixedPanel` instances arranged in two rows and two columns:

In [19]:
types = IssueCommentEvent,IssuesEvent,PullRequestEvent,PullRequestReviewEvent
ps = {o:FixedPanel(15, box=box.HORIZONTALS, title=camel2words(remove_suffix(o.__name__,'Event'))) for o in types}

In [20]:
for k,v in ps.items(): v.extend(evts.filter(risinstance(k)))
isc,iss,prs,prrs = ps.values()
grid([[isc,iss],[prs,prrs]], width=110)

## Export -

In [21]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 00_ghtop.ipynb.
Converted Prototype_Rich.ipynb.
Converted index.ipynb.
Converted richext.ipynb.


TypeError: unhashable type: 'list'