# Add value stream example

To use this example, start a Diffusion server and update the connection details below as needed. Then create a topic that matches `topic_selector` and `topic_type` (by default, a string topic at `foo/bar`) and run all the cells in this notebook.

Changing the topic's value (e.g. in the console, or via another session) triggers the stream's update callback. The callback prints to the output of the last cell for as long as the session stays open (`session_duration` seconds).
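
As a rough illustration of the callback contract described above, the sketch below imitates a stream locally, without a server or the `diffusion` package. The names `describe_update` and `simulate_updates`, the sample values, and the `extra` keyword are hypothetical. The only behaviour taken from this notebook is that the update callback receives the keyword-only arguments `old_value`, `topic_path`, and `topic_value`, and that `old_value` is `None` for the first value.

```python
# Hypothetical stand-in for the notebook's update callback. It returns a
# message instead of printing, so the result is easy to inspect.
def describe_update(*, old_value, topic_path, topic_value, **kwargs):
    if old_value is None:
        return f"{topic_path}: initial value {topic_value!r}"
    return f"{topic_path}: {old_value!r} -> {topic_value!r}"


# Local imitation of a value stream (an assumption, not library code):
# each new value is passed along with the previous one.
def simulate_updates(topic_path, values, callback):
    messages = []
    old_value = None  # no previous value before the first delivery
    for value in values:
        messages.append(
            callback(
                old_value=old_value,
                topic_path=topic_path,
                topic_value=value,
                extra="ignored",  # hypothetical extra keyword, absorbed by **kwargs
            )
        )
        old_value = value
    return messages


messages = simulate_updates("foo/bar", ["hello", "world"], describe_update)
for message in messages:
    print(message)
```

Accepting `**kwargs` keeps a callback tolerant of keyword arguments it does not use, which is why the handlers in this notebook declare it as well.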

In [1]:
import asyncio
import diffusion

In [2]:
server_url = "ws://localhost:8080"
principal = "admin"
credentials = diffusion.Credentials("password")

In [3]:
topic_selector = "foo/bar"
topic_type = diffusion.datatypes.STRING

session_duration = 30

In [4]:
def on_update(*, old_value, topic_path, topic_value, **kwargs):
    print("Topic:", topic_path)
    if old_value is None:
        print("  Initial value:", topic_value)
    else:
        print("  Value updated")
        print("    Old value:", old_value)
        print("    New value:", topic_value)

def on_subscribe(*, topic_path, **kwargs):
    print(f"Subscribed to {topic_path}")

def on_unsubscribe(*, reason, topic_path, **kwargs):
    print(f"Unsubscribed from {topic_path} because {reason!s}")

In [5]:
value_stream = diffusion.topics.ValueStreamHandler(
    data_type=topic_type, update=on_update, subscribe=on_subscribe, unsubscribe=on_unsubscribe
)

In [6]:
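# Create the session; it is closed automatically when the block exits.
# Top-level `async with` works here because Jupyter runs cells in an event loop.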
async with diffusion.Session(url=server_url, principal=principal, credentials=credentials) as session:

    print("Adding value stream")
    session.topics.add_value_stream(topic_selector=topic_selector, stream=value_stream)

    print(f"Subscribing to {topic_selector}")
    await session.topics.subscribe(topic_selector)

    await asyncio.sleep(session_duration)

    print(f"Unsubscribing from {topic_selector}")
    await session.topics.unsubscribe(topic_selector)

    await asyncio.sleep(1)  # keep alive to display the unsubscription message

Adding value stream
Subscribing to foo/bar
Unsubscribing from foo/bar
