@@ -4,16 +4,20 @@ excerpt: "WebSockets with the Tornado web framework is a simple, robust way to
handle streaming data. I walk through a minimal example and discuss why these
tools are good for the job."
tags:
- python
- streaming
- tornado
- websocket
header:
overlay_image: /assets/images/cool-backgrounds/cool-background8.png
caption: 'Photo credit: [coolbackgrounds.io](https://coolbackgrounds.io/)'
last_modified_at: 2021-09-27
last_modified_at: 2021-06-13
search: false
---

{% if page.noindex == true %}
<meta name="robots" content="noindex">
{% endif %}

A lot of data science and machine learning practice assumes a static dataset,
maybe with some MLOps tooling for rerunning a model pipeline with the freshest
version of the dataset.
@@ -31,20 +35,20 @@ requests with REST endpoints). Of course, Tornado has pretty good support for
WebSockets as well.

In this blog post I'll give a minimal example of using Tornado and WebSockets
to handle streaming data. The toy example I have is one app (`server.py`)
writing samples of a Bernoulli to a WebSocket, and another app (`client.py`)
to handle streaming data. The toy example I have is one app (`transmitter.py`)
writing samples of a Bernoulli to a WebSocket, and another app (`receiver.py`)
listening to the WebSocket and keeping track of the posterior distribution for
a [Beta-Binomial conjugate model](https://eigenfoo.xyz/bayesian-bandits/).
After walking through the code, I'll discuss these tools, and why they're good
choices for working with streaming data.
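
To make the conjugate update concrete, here is a minimal sketch of the bookkeeping the listening app has to do for each Bernoulli sample. It assumes a uniform Beta(1, 1) prior; the `BetaPosterior` class and its attribute names are hypothetical and are not taken from the gists in this post.

```python
from dataclasses import dataclass


@dataclass
class BetaPosterior:
    """Beta posterior over the success probability of a Bernoulli stream."""

    alpha: float = 1.0  # prior pseudo-count of successes (assumed Beta(1, 1) prior)
    beta: float = 1.0  # prior pseudo-count of failures

    def update(self, sample: int) -> None:
        """Fold one Bernoulli sample (0 or 1) into the posterior."""
        if sample not in (0, 1):
            raise ValueError(f"expected 0 or 1, got {sample!r}")
        self.alpha += sample
        self.beta += 1 - sample

    @property
    def mean(self) -> float:
        """Posterior mean of the success probability."""
        return self.alpha / (self.alpha + self.beta)


posterior = BetaPosterior()
for sample in [1, 0, 1, 1]:
    posterior.update(sample)
print(posterior)  # BetaPosterior(alpha=4.0, beta=2.0)
```

Because the Beta distribution is conjugate to the Bernoulli likelihood, each message only increments one of two counters, so the update is cheap enough to run once per WebSocket message.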

For another tutorial on this same topic, you can check out [`proft`'s blog
For another good tutorial on this same topic, you can check out [`proft`'s blog
post](https://en.proft.me/2014/05/16/realtime-web-application-tornado-and-websocket/).

## Server
## Transmitter

- When `WebSocketServer` is registered to a REST endpoint (in `main`), it keeps
track of any processes that are listening to that endpoint, and pushes
- When `WebSocketHandler` is registered to a REST endpoint (on line 44), it
keeps track of any processes that are listening to that endpoint, and pushes
messages to them when `send_message` is called.
* Note that `clients` is a class variable, so `send_message` is a class
method.
@@ -56,20 +60,12 @@ post](https://en.proft.me/2014/05/16/realtime-web-application-tornado-and-websoc
case. For example, you could watch a file for any modifications using
[`watchdog`](https://pythonhosted.org/watchdog/), and dump the changes into
the WebSocket.
- The [`websocket_ping_interval` and `websocket_ping_timeout` arguments to
`tornado.Application`](https://www.tornadoweb.org/en/stable/web.html?highlight=websocket_ping#tornado.web.Application.settings)
configure periodic pings of WebSocket connections, keeping connections alive
and allowing dropped connections to be detected and closed.
- It's also worth noting that there's a
[`tornado.websocket.WebSocketHandler.websocket_max_message_size`](https://www.tornadoweb.org/en/stable/websocket.html?highlight=websocket_max_message_size#tornado.websocket.WebSocketHandler)
attribute. While this is set to a generous 10 MiB, it's important that the
WebSocket messages don't exceed this limit!
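
The points above can be sketched roughly as follows. This is not the code from the gist: the handler name `BernoulliHandler`, the `/websocket/` route, port 8888, the 100 ms sampling period, and the ping values are all illustrative assumptions, and the sketch assumes Tornado 6.

```python
import asyncio
import random

import tornado.ioloop
import tornado.web
import tornado.websocket


class BernoulliHandler(tornado.websocket.WebSocketHandler):
    """Hypothetical handler that tracks every open connection."""

    # Class variable: one registry shared by all connections to this endpoint.
    clients = set()

    def open(self):
        BernoulliHandler.clients.add(self)

    def on_close(self):
        BernoulliHandler.clients.discard(self)

    @classmethod
    def send_message(cls, message: str) -> None:
        """Push a message to every listener currently registered."""
        for client in list(cls.clients):
            try:
                client.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                # The connection went away between registration and sending.
                cls.clients.discard(client)


def make_app() -> tornado.web.Application:
    return tornado.web.Application(
        [(r"/websocket/", BernoulliHandler)],
        websocket_ping_interval=10,  # seconds between pings (assumed value)
        websocket_ping_timeout=30,  # seconds before a silent peer is dropped
    )


async def main() -> None:
    make_app().listen(8888)
    # Every 100 ms, draw one Bernoulli(0.5) sample and broadcast it.
    sampler = tornado.ioloop.PeriodicCallback(
        lambda: BernoulliHandler.send_message(str(random.randint(0, 1))), 100
    )
    sampler.start()
    await asyncio.Event().wait()  # keep serving until interrupted
```

Calling `asyncio.run(main())` would start the server. Since `clients` lives on the class rather than on an instance, `send_message` can be called from anywhere (here, a periodic callback) without holding a reference to any particular connection.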

<script src="https://gist.github.com/eigenfoo/22f46166fa6924d684d68ca06e08b055.js"></script>
<script src="https://gist.github.com/eigenfoo/cb07fe6f026d544b013b29143e125a38.js"></script>

## Client
## Receiver

- `WebSocketClient` is a class that:
- `WebSocketReceiver` is a class that:
1. Can be `start`ed and `stop`ped to connect to/disconnect from the WebSocket and
start/stop listening to it in a separate thread
2. Can process every message (`on_message`) it hears from the WebSocket: in
@@ -78,39 +74,17 @@ post](https://en.proft.me/2014/05/16/realtime-web-application-tornado-and-websoc
but this processing could theoretically be anything. For example, you
could do some further processing of the message and then dump that into a
separate WebSocket for other apps (or even users!) to subscribe to.
- To connect to the WebSocket, we need to use a WebSocket library: thankfully
Tornado has a built-in WebSocket functionality (`tornado.websocket`), but
we're also free to use other libraries such as the creatively named
[`websockets`](https://github.com/aaugustin/websockets) or
- To connect to the WebSocket, we need to use a WebSocket client, such as the
creatively named
[`websocket-client`](https://github.com/websocket-client/websocket-client).
- Note that we run `on_message` on the same thread as we run
`connect_and_read`. This isn't a problem so long as `on_message` is fast
enough, but a potentially wiser choice would be to offload `connect_and_read`
to a separate thread by instantiating a
[`concurrent.futures.ThreadPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor)
and calling
[`tornado.ioloop.IOLoop.run_in_executor`](https://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.run_in_executor),
so as not to block the thread where the `on_message` processing happens.
- The `io_loop` instantiated in `main` (as well as in `server.py`) is
important: it's how Tornado schedules tasks (a.k.a. _callbacks_) for delayed
- Note that we run `read` in a separate thread, so as not to block the main
thread (where the `on_message` processing happens).
- The `io_loop` instantiated on line 50 (as well as in `transmitter.py`) is
important - it's how Tornado schedules tasks (a.k.a. _callbacks_) for delayed
(a.k.a. _asynchronous_) execution. To add a callback, we simply call
`io_loop.add_callback()`.
- The [`ping_interval` and `ping_timeout` arguments to
`websocket_connect`](https://www.tornadoweb.org/en/stable/websocket.html?highlight=ping_#tornado.websocket.websocket_connect)
configure periodic pings of the WebSocket connection, keeping connections
alive and allowing dropped connections to be detected and closed.
- The `callback=self.maybe_retry_connection` is [run on a future
`WebSocketClientConnection`](https://github.com/tornadoweb/tornado/blob/1db5b45918da8303d2c6958ee03dbbd5dc2709e9/tornado/websocket.py#L1654-L1655).
Here, we simply get the `future.result()` (i.e. the WebSocket client
connection itself) — I don't actually do anything with the `self.connection`,
but you could if you wanted. In the event of an exception while doing that,
we assume there's a problem with the WebSocket connection and retry
`connect_and_read` after 3 seconds. This all has the effect of recovering
gracefully if the WebSocket is dropped or `server.py` experiences a brief
outage for whatever reason (both of which are probably inevitable for
long-running apps using WebSockets).
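
As a rough illustration of the connect, read, and retry cycle described above, here is a sketch that uses Tornado's built-in `websocket_connect` with `async`/`await` rather than the callback- or thread-based structure of the gists. The function name `listen`, the ping values, and the choice of which exceptions to treat as a dropped connection are assumptions made for this example.

```python
import asyncio
from typing import Callable, Union

import tornado.httpclient
import tornado.websocket


async def listen(
    url: str,
    on_message: Callable[[Union[str, bytes]], None],
    retry_seconds: float = 3.0,
) -> None:
    """Read messages forever, reconnecting whenever the connection drops."""
    while True:
        try:
            connection = await tornado.websocket.websocket_connect(
                url,
                ping_interval=10,  # seconds between pings (assumed value)
                ping_timeout=30,  # seconds before a silent peer is dropped
            )
            while True:
                message = await connection.read_message()
                if message is None:  # None signals that the connection closed
                    break
                on_message(message)
        except (OSError, tornado.httpclient.HTTPError):
            # Assumed failure modes: connection refused, or a failed handshake.
            pass
        # Wait before reconnecting so a brief outage does not cause a busy loop.
        await asyncio.sleep(retry_seconds)
```

Something like `asyncio.run(listen("ws://localhost:8888/websocket/", print))` would print each message as it arrives. Note that `on_message` runs on the event loop here, so slow processing would delay reads; that is the same trade-off the bullet about threads describes.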

<script src="https://gist.github.com/eigenfoo/341f6c6c578d34120bccc4229e434377.js"></script>

<script src="https://gist.github.com/eigenfoo/a693b67167c775f7fe67329f3797595d.js"></script>

## Why Tornado?

@@ -153,21 +127,6 @@ SSE)](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/):
it seems to be a cleaner protocol for unidirectional data flow, which is really
all that we need.

Additionally, [Armin
Ronacher](https://lucumr.pocoo.org/2012/9/24/websockets-101/) has a much
starker view of WebSockets, seeing no value in using WebSockets over TCP/IP
sockets for this application:

> Websockets make you sad. [...] Websockets are complex, way more complex than I
> anticipated. I can understand that they work that way but I definitely don't
> see a value in using websockets instead of regular TCP connections if all you
> want is to exchange data between different endpoints and neither is a browser.

My thought after reading these criticisms is that perhaps WebSockets aren't the
ideal technology for handling streaming data (from a maintainability or
architectural point of view), but that doesn't mean that they aren't good
scalable technologies when they do work.

---

[^1]: There is [technically a difference](https://sqlstream.com/real-time-vs-streaming-a-short-explanation/) between "real-time" and "streaming": "real-time" refers to data that comes in as it is created, whereas "streaming" refers to a system that processes data continuously. You stream your TV show from Netflix, but since the show was created long before you watched it, you aren't viewing it in real-time.