In [1]:
import pandas as pd
import math
import asyncio
import vega
import ipywidgets as widgets
from ipywidgets import VBox, HBox, Label, Button, interact
from vega.widget import VegaWidget

In [2]:
# Read the penguin table, shuffle the rows reproducibly (fixed seed), and
# reduce every row to a two-item list: [first column, second-to-last column].
# NOTE(review): downstream code treats these as species and body mass (g) --
# confirm against the CSV header. Rows with missing values are not filtered here.
dataframe = pd.read_csv("./data/penguins.csv")
data = [
    [row[0], row[len(row) - 2]]
    for row in dataframe.sample(n=len(dataframe), random_state=42).reset_index(drop=True).values
]

In [3]:
# data2 = pd.read_json("./data/summary.json")
# data2

In [4]:
# Vega-Lite specification for a horizontal box plot with no inline data.
# Rows are streamed into the named dataset "penguins"; each row is expected to
# carry the fields referenced below: species, lower, q1, median, q3, upper, outliers.
spec_no_data = {
  "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
  "title": "Body Mass of Penguin species (g)",
  "width": 400,
  "height": 300,
  "autosize": {
    "type": "pad",
    # Must be a real boolean: the string "true" is not a valid value for this flag.
    "resize": True
  },
  "data": {
    "name": "penguins"
  },
  # Shared by every layer: one row per species on the y axis.
  "encoding": {
    "y": {
      "field": "species",
      "type": "nominal",
      "title": None
    }
  },
  "layer": [
    # Whiskers: a rule from "lower" to "upper".
    {
      "mark": {
        "type": "rule"
      },
      "encoding": {
        "x": {
          "field": "lower",
          "type": "quantitative",
          "scale": {
            # Must be a real boolean: the non-empty string "false" is truthy,
            # so the axis would still be forced to include zero.
            "zero": False
          },
          "title": None
        },
        "x2": {
          "field": "upper"
        }
      }
    },
    # Box: a bar from "q1" to "q3", coloured by species.
    {
      "mark": {
        "type": "bar",
        "size": 14
      },
      "encoding": {
        "x": {
          "field": "q1",
          "type": "quantitative"
        },
        "x2": {
          "field": "q3"
        },
        "color": {
          "field": "species",
          "type": "nominal",
          "legend": None
        }
      }
    },
    # Median: a white tick drawn over the box.
    {
      "mark": {
        "type": "tick",
        "color": "white",
        "size": 14
      },
      "encoding": {
        "x": {
          "field": "median",
          "type": "quantitative"
        }
      }
    },
    # Outliers: the "outliers" array is flattened to one point per value.
    {
      "transform": [
        {
          "flatten": [
            "outliers"
          ]
        }
      ],
      "mark": {
        "type": "point",
        "style": "boxplot-outliers"
      },
      "encoding": {
        "x": {
          "field": "outliers",
          "type": "quantitative"
        }
      }
    }
  ]
}

In [5]:
def quantileSorted(arrSorted, q):
    """Return the q-quantile of an ascending-sorted sequence.

    The fractional position ``len(arrSorted) * q - 0.5`` is located and the
    elements at its floor and ceiling are averaged.

    Args:
        arrSorted: Non-empty sequence of numbers in ascending order.
        q: Quantile in the range [0, 1].

    Returns:
        The mean of the two neighbouring elements, as a float.

    Raises:
        IndexError: If ``arrSorted`` is empty.
    """
    length = len(arrSorted)
    base = length * q - 0.5
    last = length - 1
    # Clamp both neighbours into [0, last]. Without this, a one-element input
    # with q=0.75 (or any input with q=1) indexes past the end, and q=0 wraps
    # around to the last element through index -1.
    upperIdx = min(max(math.ceil(base), 0), last)
    lowerIdx = min(max(math.floor(base), 0), last)
    return (arrSorted[upperIdx] + arrSorted[lowerIdx]) / 2

In [6]:

def calculateSummary(key, sortedArr):
    """Compute the box-plot statistics for one group of values.

    Args:
        key: Group label, stored under the "species" field of the result.
        sortedArr: The group's values in ascending order (may be empty).

    Returns:
        A dict with the keys "species", "lower", "q1", "median", "q3", "upper"
        and "outliers". "lower"/"upper" are the smallest and largest values
        within 1.5 * IQR of the quartiles. "outliers" is always a list holding
        the values outside that range. For an empty group all statistics are 0
        and "outliers" is an empty list.
    """
    if len(sortedArr) <= 0:
        # Same field names and types as the non-empty result, so consumers
        # (the chart reads "species" and flattens "outliers") see one schema.
        return {
            "species": key,
            "lower": 0,
            "q1": 0,
            "median": 0,
            "q3": 0,
            "upper": 0,
            "outliers": [],
        }
    q1 = quantileSorted(sortedArr, 0.25)
    median = quantileSorted(sortedArr, 0.5)
    q3 = quantileSorted(sortedArr, 0.75)

    iqr = q3 - q1
    top = q3 + 1.5 * iqr
    bottom = q1 - 1.5 * iqr

    # Split the values first, then take the whiskers from the inliers only.
    # Seeding the whiskers with sortedArr[0] would let a low outlier become
    # the lower whisker.
    inliers = []
    outliers = []
    for curNum in sortedArr:
        if (curNum < bottom) or (curNum > top):
            outliers.append(curNum)
        else:
            inliers.append(curNum)

    # Fall back to the quartiles in the unlikely case nothing lies inside the fences.
    lower = min(inliers) if inliers else q1
    upper = max(inliers) if inliers else q3

    return {
        "species": key,
        "lower": lower,
        "q1": q1,
        "median": median,
        "q3": q3,
        "upper": upper,
        "outliers": outliers,
    }


In [7]:
def summaryData(dataMerged):
    """Build one summary record per group.

    Args:
        dataMerged: Mapping of group label to that group's list of values.

    Returns:
        A list with the result of ``calculateSummary(label, values)`` for each
        entry, in the mapping's iteration order.
    """
    return [calculateSummary(label, values) for label, values in dataMerged.items()]

In [8]:
def group(data):
    """Bucket ``[label, value]`` pairs by label.

    Args:
        data: Iterable of indexable items; item[0] is the label, item[1] the value.

    Returns:
        A dict with exactly the keys 'Adelie', 'Chinstrap' and 'Gentoo', each
        mapped to the list of values seen for that label, in input order.

    Raises:
        KeyError: If an item's label is not one of the three known keys.
    """
    buckets = {name: [] for name in ('Adelie', 'Chinstrap', 'Gentoo')}
    for entry in data:
        buckets[entry[0]].append(entry[1])
    return buckets

In [9]:
def mergeSortedData(oldData, newData):
    """Merge two ascending lists into a single ascending list.

    Args:
        oldData: Ascending list of previously merged values.
        newData: Ascending list of values to merge in.

    Returns:
        A new list containing every element of both inputs in ascending order;
        on ties the element from ``oldData`` comes first. If either input is
        empty, the other input is returned as-is (the same object, not a copy).
    """
    if len(newData) == 0:
        return oldData
    if len(oldData) == 0:
        return newData

    merged = []
    i = 0
    j = 0
    oldCount = len(oldData)
    newCount = len(newData)
    # Standard two-pointer merge: take the smaller head until one side runs out.
    while i < oldCount and j < newCount:
        if oldData[i] <= newData[j]:
            merged.append(oldData[i])
            i += 1
        else:
            merged.append(newData[j])
            j += 1

    # Exactly one side still has elements; the other slice is empty.
    merged.extend(oldData[i:])
    merged.extend(newData[j:])
    return merged


def mergeData(dataMerged, dataSegment):
    """Fold one batch of grouped values into the running sorted lists.

    Args:
        dataMerged: Mapping of group label to an ascending list of all values
            merged so far. Updated in place.
        dataSegment: Mapping of group label to the (unsorted) values of the
            current batch. Left unmodified; labels missing here are treated as
            having no new values.

    Returns:
        ``dataMerged`` itself, with every list replaced by the merge of its
        previous contents and the batch's sorted values.
    """
    for key in dataMerged.keys():
        # sorted() works on a copy, so the caller's batch lists keep their order.
        valuesNew = sorted(dataSegment.get(key, []))
        dataMerged[key] = mergeSortedData(dataMerged[key], valuesNew)
    return dataMerged


In [10]:
# Sanity check: merging two ascending lists should yield one ascending list.
mergeSortedData([1, 3, 4], [2, 6])

[1, 2, 3, 4, 6]

In [11]:
# Chart widget built from the data-less spec; rows are pushed into it later.
widget = VegaWidget(spec=spec_no_data)
# Toggles between pausing and resuming the computation and the chart refresh.
stopButton = Button(
    description='Stop/Continue',
    disabled=False,
    button_style='',
    tooltip='click to stop/continue calculation and view update'
)
# Starts the computation and the chart refresh over from scratch.
restartButton = Button(
    description='Restart',
    disabled=False,
    button_style='',
    tooltip='click to restart calculation and view update'
)

In [12]:
"""shared data"""
# Pause flag flipped by the Stop/Continue button.
stop = False
# Latest result from 'progressiveSummary'; read by 'updateWidgetAsync' to refresh the chart.
summary = []
# Gate the worker tasks pass through before each step; it starts out unlocked.
activeLock = asyncio.Lock()
# Handles of the three background tasks, kept so they can be cancelled on restart.
progressiveSummaryTask = None
updateWidgetAsyncTask = None
syncActiveTask = None

In [13]:
async def progressiveSummary():
    """
    Task1
    Walk through the global 'data' in batches of 30 rows. After each batch,
    merge it into the running per-species sorted lists, recompute the global
    'summary', then sleep for one second.
    """
    global summary
    batchSize = 30
    batchCount = math.ceil(len(data) / batchSize)
    merged = {'Adelie': [], 'Chinstrap': [], 'Gentoo': []}

    for start in range(0, batchCount * batchSize, batchSize):
        # Pass through the gate: blocks here for as long as 'activeLock' is held.
        async with activeLock:
            pass
        segment = data[start:start + batchSize]  # rows of the current batch
        merged = mergeData(merged, group(segment))  # group by species, merge with earlier batches
        summary = summaryData(merged)  # publish the refreshed statistics
        print("calculated")
        await asyncio.sleep(1)

In [14]:
async def updateWidgetAsync():
    """
    Task2
    Once per second, replace the rows of the widget's "penguins" dataset with
    the current contents of the global 'summary'. Runs until cancelled.
    """
    while True:
        # Pass through the gate: blocks here for as long as 'activeLock' is held.
        async with activeLock:
            pass
        # remove="true" is an expression string that matches every existing row.
        widget.update("penguins", remove="true", insert=summary)
        print("widget updated")
        await asyncio.sleep(1)

In [15]:
async def syncActiveLock():
    """
    Task3
    Every 0.1 s, bring "activeLock" in line with the global "stop" flag:
    hold the lock while "stop" is set, release it once "stop" is cleared.
    Runs until cancelled.
    """
    while True:
        await asyncio.sleep(0.1)
        held = activeLock.locked()
        if stop:
            if not held:
                await activeLock.acquire()
        elif held:
            activeLock.release()
            
            

In [16]:
def stopButtonListener(b):
    """Click handler: flip the global 'stop' flag and print the state just entered.

    Args:
        b: The clicked button (unused).
    """
    global stop
    stop = not stop
    if stop:
        print("************* stop **************")
    else:
        print("************* continue **************")

# Register the handler so each click on the Stop/Continue button toggles 'stop'.
stopButton.on_click(stopButtonListener)

In [17]:
def restartButtonListener(b):
    """Click handler: reset the shared state and start the three tasks afresh.

    Clears 'summary' and 'stop', releases 'activeLock' if it is held, cancels
    any existing tasks, then schedules new ones on the current event loop.

    Args:
        b: The clicked button (unused).
    """
    global stop, summary
    global syncActiveTask, progressiveSummaryTask, updateWidgetAsyncTask

    # Back to the initial, un-paused state.
    summary = []
    stop = False
    if activeLock.locked():
        activeLock.release()

    # Stop whatever is still running from the previous round.
    for task in (syncActiveTask, progressiveSummaryTask, updateWidgetAsyncTask):
        if task:
            task.cancel()

    loop = asyncio.get_event_loop()
    syncActiveTask = loop.create_task(syncActiveLock())
    progressiveSummaryTask = loop.create_task(progressiveSummary())
    updateWidgetAsyncTask = loop.create_task(updateWidgetAsync())
    print("************* restart **************")

# Register the handler so each click on the Restart button restarts the tasks.
restartButton.on_click(restartButtonListener)

In [18]:
# Show the chart with the two control buttons in a row underneath it.
VBox([widget, HBox([stopButton, restartButton])])

VBox(children=(VegaWidget(), HBox(children=(Button(description='Stop/Continue', style=ButtonStyle(), tooltip='…

In [19]:
# Schedule the three background tasks on the notebook's event loop and keep
# their handles in the shared variables so the Restart button can cancel them.
# (These are plain module-level assignments; no 'global' declaration is needed here.)
loop = asyncio.get_event_loop()
syncActiveTask = loop.create_task(syncActiveLock())
progressiveSummaryTask = loop.create_task(progressiveSummary())
updateWidgetAsyncTask = loop.create_task(updateWidgetAsync())