In [1]:
import os, sys
import pandas as pd
import geopandas as gpd
import shapely
import numpy as np
from kafka import KafkaProducer
import json
import time
import bokeh.models as bokeh_models

sys.path.append(os.path.abspath('../../src')) # Ideally ST_Visions will be installed as a module, therefore this is more of a development setting for quick testing.

from st_visions.visualization.st_visualizer import st_visualizer
from st_visions.streaming.st_vizstream import ST_KafkaStream

%load_ext autoreload
%autoreload 2
os.environ["BOKEH_ALLOW_WS_ORIGIN"] = "*" # DEV ENVIROMENT BANDAID FOR VSCODE VISUALIZATIONS, will be edited


In [2]:
def simulate_kafka_stream(
    csv_path,
    topic="test_topic",
    bootstrap_servers="localhost:9092",
    key_field=None,
    delay=0.5
):
    df = pd.read_csv(csv_path).head(30)
    print(f"Loaded CSV: {len(df)} rows. Streaming to '{topic}'...")

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    records = df.to_dict(orient="records")  #TODO: orient(list)
    for record in records:
        producer.send(topic, value=record)
        print(f"[KAFKA PRODUCER] Sending the following:  {record}")
        producer.flush()
        time.sleep(delay) 

    producer.close()
    print("Test stream finalized")

In [3]:
st_viz = st_visualizer(limit=5000) # Initialize a VISIONS Instance (ST Visualizer Object)

In [4]:
stream = ST_KafkaStream(topic_name='st-viz-topic')

Topic 'st-viz-topic' exists.
Stream thread initialized
Connected to stream successfully!
---------------------------------
Listening to topic 'st-viz-topic'...


In [5]:
simulate_kafka_stream(r'..\..\data\unipi_ais_dynamic_2017\unipi_ais_dynamic_dec2017.csv', 'st-viz-topic')

Loaded CSV: 30 rows. Streaming to 'st-viz-topic'...
[KAFKA PRODUCER] Sending the following:  {'t': 1512132540000, 'vessel_id': '7517504dc74ad38fd05748602a97e112f19a54bbb36cd764e6a463f9abe8f018', 'lon': 23.5557483333333, 'lat': 37.963016666666704, 'heading': nan, 'speed': 4.6, 'course': 211.0}
[KAFKA PRODUCER] Sending the following:  {'t': 1512132540000, 'vessel_id': '698b77dbaed14cc7fe67ee1e5017095b904643454fde596c1ec902c404aa100c', 'lon': 23.62548166666669, 'lat': 37.9407766666667, 'heading': 153.0, 'speed': 0.0, 'course': 0.0}
[KAFKA PRODUCER] Sending the following:  {'t': 1512132541000, 'vessel_id': '11e59aaa59860630711db1173860c9b322292d9da5a3311c469093a8d754c058', 'lon': 23.640975, 'lat': 37.9448533333333, 'heading': nan, 'speed': 0.1, 'course': 304.9}
[KAFKA PRODUCER] Sending the following:  {'t': 1512132542000, 'vessel_id': 'aa42d53ba2d4bd0cc50cb096190a4312347569661f629b4f1f4f897d48534f8d', 'lon': 23.6386733333333, 'lat': 37.947715, 'heading': 179.0, 'speed': 0.0, 'course': 0.0}

In [6]:
st_viz.get_data_stream(stream)

In [7]:
st_viz.create_canvas(title=f'Prototype Plot', tile_provider="CARTODBPOSITRON", sizing_mode='scale_width', height=540, tools="pan, box_zoom, lasso_select, wheel_zoom, hover, save, reset")

In [8]:
circ = st_viz.add_marker(marker='circle', size=10, color='royalblue', alpha=0.7, fill_alpha=0.5, muted_alpha=0, legend_label=f'Vessel GPS Locations')

In [9]:
st_viz.add_lasso_select()

st_viz.figure.legend.location = "top_left"
st_viz.figure.legend.click_policy = "mute"
st_viz.figure.toolbar.active_scroll = st_viz.figure.select_one(bokeh_models.WheelZoomTool)

In [10]:
st_viz.show_figures(notebook=True, height=300, width=300, sizing_mode='stretch_both')

In [11]:
stream.stop()

Stopping stream...
Stream stopped
