In [1]:
from ipywidgets import VBox, HBox
from pyspark.sql import SparkSession
from perspective import PerspectiveWidget

from helpers.data import (
    HOST,
    MACHINES_PORT,
    USAGE_PORT,
    STATUS_PORT,
    JOBS_PORT,
)
from helpers.spark import (
    MACHINE_SCHEMA,
    MACHINE_SCHEMA_SPARK,
    USAGE_SCHEMA,
    USAGE_SCHEMA_SPARK,
    STATUS_SCHEMA,
    STATUS_SCHEMA_SPARK,
    JOBS_SCHEMA,
    JOBS_SCHEMA_SPARK,
)

In [2]:
# Important imports
from helpers.spark import (
    get_df_from_server,
    push_to_perspective,
)
from helpers.fastapi import (
    perspective_spark_bridge,
    start_server,
)

In [3]:
from helpers.data import machines, usage

In [4]:
# Fetch the machine records from helpers.data and display the first one.
# Per the cell output, a record is a dict with machine_id, kind, cores,
# region and zone. `m` is reused by the next cell, so keep the name.
m = machines()
m[0]

{'machine_id': '4332a1fd80ab',
 'kind': 'edge',
 'cores': 4,
 'region': 'eu',
 'zone': 'B'}

In [5]:
# Build a usage record from the first machine record and display it.
# Per the cell output, the result carries the machine's own fields plus
# cpu / mem / free / network / disk readings.
u = usage(m[0])
u

{'machine_id': '4332a1fd80ab',
 'kind': 'edge',
 'cores': 4,
 'region': 'eu',
 'zone': 'B',
 'cpu': 0,
 'mem': 0,
 'free': 100,
 'network': 0,
 'disk': 0}

In [6]:
# Feed the previous usage record back into `usage` and display the result;
# the output shows the readings changing from their initial values.
# NOTE(review): this rebinds `u` from itself, so the cell is not idempotent:
# every re-run starts from the previous run's record, not from `m[0]`.
u = usage(u)
u

{'machine_id': '4332a1fd80ab',
 'kind': 'edge',
 'cores': 4,
 'region': 'eu',
 'zone': 'B',
 'cpu': 38.75,
 'mem': 59.79,
 'free': 40.21,
 'network': 74.57,
 'disk': 69.98}

In [7]:
# Obtain the Spark session for this demo: reuse the active one when it
# exists, otherwise create it under the application name below.
spark = (
    SparkSession.builder
    .appName("Perspective Demo")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/11 21:58:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
def _stream_from(schema, port):
    """Return the DataFrame that `get_df_from_server` builds for HOST:port."""
    return get_df_from_server(spark, schema, HOST, port)


# One streaming DataFrame per data feed, each with its own schema and port.
machines_df = _stream_from(MACHINE_SCHEMA_SPARK, MACHINES_PORT)
usage_df = _stream_from(USAGE_SCHEMA_SPARK, USAGE_PORT)
status_df = _stream_from(STATUS_SCHEMA_SPARK, STATUS_PORT)
jobs_df = _stream_from(JOBS_SCHEMA_SPARK, JOBS_PORT)

24/06/11 21:58:29 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
24/06/11 21:58:29 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
24/06/11 21:58:29 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
24/06/11 21:58:29 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [9]:
# Four independent Perspective widgets, one per feed; each keeps its own
# internal table. The first three are indexed on machine_id, while the jobs
# widget has no index. Status and jobs are sorted newest-first.
machines_widget = PerspectiveWidget(
    MACHINE_SCHEMA,
    index="machine_id",
    settings=False,
)
usage_widget = PerspectiveWidget(
    USAGE_SCHEMA,
    index="machine_id",
    settings=False,
)
status_widget = PerspectiveWidget(
    STATUS_SCHEMA,
    index="machine_id",
    sort=[["last_update", "desc"]],
    settings=False,
)
jobs_widget = PerspectiveWidget(
    JOBS_SCHEMA,
    sort=[["start_time", "desc"]],
    settings=False,
)

In [10]:
# Lay the widgets out as a 2x2 grid with ipywidgets:
# machines | usage on the top row, status | jobs on the bottom row.
top_row = HBox(children=[machines_widget, usage_widget])
bottom_row = HBox(children=[status_widget, jobs_widget])
VBox(children=[top_row, bottom_row])

VBox(children=(HBox(children=(PerspectiveWidget(columns=['machine_id', 'kind', 'cores', 'region', 'zone'], set…

In [35]:
# Table name -> widget that should display that table. The same names are
# used later when pushing the Spark streams.
widgets_by_table = {
    "machines": machines_widget,
    "usage": usage_widget,
    "status": status_widget,
    "jobs": jobs_widget,
}

# Build the bridge application over those widgets and start serving it;
# the value returned by start_server is kept as `port` for the push step.
app = perspective_spark_bridge(widgets_by_table)
port = start_server(app)

CRITICAL:root:Listening on http://localhost:51701
INFO:     Started server process [22]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:51701 (Press CTRL+C to quit)


In [57]:
# Send each streaming DataFrame to the bridge server on localhost:`port`
# (the value returned by start_server). The table names used here match the
# keys of the mapping given to perspective_spark_bridge.
# NOTE(review): must run after the server cell, since it relies on `port`.
push_to_perspective(machines_df, "machines", "localhost", port)
push_to_perspective(usage_df, "usage", "localhost", port)
push_to_perspective(status_df, "status", "localhost", port)
push_to_perspective(jobs_df, "jobs", "localhost", port)

24/06/10 23:15:09 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-57efb414-d05a-4fb1-baaf-629d5b6ccb6f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/10 23:15:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/06/10 23:15:10 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d8beda44-1cbc-4ffe-95f8-b46b4bb8e235. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/10 23:15:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not support

In [12]:
# Reconfigure the status widget in place: render it with the "X Bar" plugin,
# group rows by status, and show the machine_id column.
status_widget.plugin = "X Bar"
status_widget.group_by = ["status"]
status_widget.columns = ["machine_id"]
# NOTE(review): this aggregate targets "status", which is not among the
# displayed columns above — confirm it is intended.
status_widget.aggregates = {"status": "last"}

In [13]:
# Summarise jobs per machine: group rows by machine_id and show one
# aggregated value per displayed column.
jobs_widget.group_by = ["machine_id"]
# `columns` must be a flat list of column names. A trailing comma after the
# list would wrap it in a 1-tuple, handing the widget a nested sequence.
jobs_widget.columns = ["job_id", "name", "units", "start_time", "end_time"]
# Per-column aggregation: count the jobs, sum their units, and keep the last
# value seen for the remaining columns.
jobs_widget.aggregates = {
    "job_id": "count",
    "name": "last",
    "units": "sum",
    "start_time": "last",
    "end_time": "last",
}