In [1]:
import dataclasses
import random
from random import randint
import decimal

import perspective
from perspective import Table, PerspectiveWidget
import pandas as pd


from hazelcast import HazelcastClient
from hazelcast.serialization.api import (
    CompactSerializer,
    CompactWriter,
    CompactReader,
)


@dataclasses.dataclass(init=True, repr=True, eq=True)
class Portfolio:
    """One portfolio record stored as a value in the ``portfolios`` map.

    The field names are the same ones the compact serializer reads and
    writes, so renaming a field here requires the same rename there.
    """

    # Ticker symbol of the stock.
    stock: str
    # Quantity sold.
    sold: int
    # Quantity bought.
    bought: int
    # Change figure stored alongside the two quantities.
    change: int


class PortfolioSerializer(CompactSerializer[Portfolio]):
    """Compact serializer for :class:`Portfolio`.

    Reads and writes one string field (``stock``) and three int32 fields
    (``sold``, ``bought``, ``change``) under the type name ``"Portfolio"``.
    """

    def read(self, reader: CompactReader):
        """Build a ``Portfolio`` from the fields available on ``reader``."""
        # Keyword arguments are evaluated left to right, so the fields are
        # read in the order stock, sold, bought, change.
        return Portfolio(
            stock=reader.read_string("stock"),
            sold=reader.read_int32("sold"),
            bought=reader.read_int32("bought"),
            change=reader.read_int32("change"),
        )

    def write(self, writer: CompactWriter, obj: Portfolio):
        """Write every field of ``obj`` to ``writer``."""
        writer.write_string("stock", obj.stock)
        # The remaining three fields are all written as int32.
        for field_name in ("sold", "bought", "change"):
            writer.write_int32(field_name, getattr(obj, field_name))

    def get_type_name(self):
        """Return the compact type name used for ``Portfolio`` values."""
        return "Portfolio"

    def get_class(self):
        """Return the Python class this serializer handles."""
        return Portfolio

# Create the client with the Portfolio compact serializer registered. No
# cluster address is configured here, so the client's defaults apply.
client = HazelcastClient(compact_serializers=[PortfolioSerializer()])

# SQL mapping that exposes the "portfolios" IMap to SQL queries: integer keys,
# compact-serialized values of type "Portfolio".
create_mapping_sql = """
CREATE MAPPING IF NOT EXISTS portfolios (
    __key INT,
    stock VARCHAR,
    sold INT,
    bought INT,
    change INT
)
TYPE IMap
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'compact',
    'valueCompactTypeName' = 'Portfolio'
)
"""
# Calling .result() on the returned future waits for the statement's outcome.
client.sql.execute(create_mapping_sql).result()

# Blocking proxy for the map that the mapping above refers to.
portfolios = client.get_map("portfolios").blocking()
# Ticker symbols that data_source() picks from at random.
stock_list = [
    'AAPL', 'MSFT', 'GOOG',
    'JPM', 'IBM', 'TPL',
    'NDP', 'PBA', 'ESTE',
    'DWAC', 'IBA', 'MTRN',
    'LPI', 'CVX', 'EOG',
    'GHC', 'FRPT', 'EXPD',
]


def data_source(num_rows=20):
    """Write fresh random portfolios to the map and return the map's contents.

    Keys ``0 .. num_rows - 1`` of the ``portfolios`` map are overwritten with
    new ``Portfolio`` values: a random ticker from ``stock_list``, ``sold``
    drawn from [-10, 0], ``bought`` drawn from [0, 10], and
    ``change = bought - sold``. The map is then read back through SQL.

    Args:
        num_rows: Number of keys to (over)write on this call. Defaults to 20,
            which is the count the notebook originally hard-coded.

    Returns:
        A list of dicts with the keys ``'stock'``, ``'sold'``, ``'bought'``
        and ``'change'``, one per row returned by the query. The query selects
        every entry in the map, so entries with keys outside
        ``range(num_rows)`` that already exist are included as well.
    """
    for key in range(num_rows):
        # Draw order (sold, bought, ticker) is kept as-is so a seeded run
        # produces the same sequence as before.
        sold = randint(-10, 0)
        bought = randint(0, 10)
        portfolios.set(
            key,
            Portfolio(random.choice(stock_list), sold, bought, bought - sold),
        )

    # The result is used as a context manager so it is released once all rows
    # have been consumed, including when iteration fails part-way.
    with client.sql.execute("SELECT * FROM portfolios").result() as rows:
        return [
            {
                'stock': row['stock'],
                'sold': row['sold'],
                'bought': row['bought'],
                'change': row['change'],
            }
            for row in rows
        ]



# Seed the Perspective table with one batch of rows from Hazelcast.
streaming_table = Table(data_source())

# Datagrid view over the table: grouped by ticker, sorted by "change" descending.
widget = PerspectiveWidget(
    streaming_table,
    server=True,
    plugin="Datagrid",
    group_by=["stock"],
    sort=[["change", "desc"]],
    theme="Material Dark",
)
# Last expression of the cell, so the notebook renders the widget.
widget




PerspectiveWidget(columns=['stock', 'sold', 'bought', 'change'], group_by=['stock'], server=True, sort=[['chan…

In [2]:
import asyncio
import threading

async def _update():
    """Push a fresh batch from ``data_source()`` into the table, forever.

    The coroutine never returns on its own; it pauses 0.05 seconds between
    batches and ends only when it is cancelled or a call inside it raises.
    """
    while True:
        fresh_rows = data_source()
        streaming_table.update(fresh_rows)
        await asyncio.sleep(0.05)
        
def update_table(duration=60):
    """Run the ``_update()`` streaming coroutine on a private event loop.

    A new event loop is created, ``_update()`` is scheduled on it, and a timer
    cancels the task after ``duration`` seconds. The call blocks until the
    task ends, so it is meant to be used as a thread target.

    Args:
        duration: Seconds to stream before the task is cancelled. Defaults to
            60, the value the notebook originally hard-coded.

    Raises:
        Any exception other than ``asyncio.CancelledError`` raised inside
        ``_update()`` propagates to the caller after the loop has been closed.
    """
    loop = asyncio.new_event_loop()
    task = loop.create_task(_update())
    stop_handle = loop.call_later(duration, task.cancel)

    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        # Normal exit path: the timer above cancelled the streaming task.
        print("Stopped streaming!")
    finally:
        # Drop the timer if the task ended before it fired, then release the
        # loop's resources; previously the loop was never closed.
        stop_handle.cancel()
        loop.close()



In [3]:
# Run update_table() on a separate thread; it creates its own event loop and
# blocks until streaming stops, so presumably this keeps the cell from blocking.
thread = threading.Thread(target=update_table)
thread.start()

In [None]:
# thread.join()  # uncomment to block this cell until the streaming thread has finished
