This repository shows you how to connect to one or more Kafka topics to stream data into Deephaven. It uses an example producer, which connects to a DevExperts DXFeed demo feed and publishes its quote and trade events as JSON messages through Kafka.
The demo feed contains a handful of symbols, with publication delayed 15 minutes during trading hours. To provide events outside of trading hours, the demo feed replays random old events every few seconds.
The Kafka producer used in this guide creates `quotes` and `trades` topics using the local system as the broker.
To launch the latest release, clone the repository:

```bash
git clone https://github.com/deephaven-examples/kafka-sample-producer.git
cd kafka-sample-producer
```
A Docker container containing a Confluent Kafka Community Edition broker and the DXFeed Kafka producer is available in this repository as `kafka_sample_producer.tar.gz`.

To load this container, run the following inside the repository directory:

```bash
docker load < kafka_sample_producer.tar.gz
```
Note: if you use Git LFS, you might need to run the following to download the full file:

```bash
git lfs pull
```
Next, run the broker. To do this, you need to find your IP address. On a Mac, the command is:

```bash
ipconfig getifaddr en0
```

To run the broker, execute:

```bash
docker run -e HOSTIP=<IP_address> -dp 9092:9092 ghcr.io/kafka/sample_producer:latest
```
- `IP_address` is an address of your Docker host that is reachable from your Deephaven Community Core container.
- `9092:9092` exposes port 9092 (the default port for Kafka) from the sample producer container to the host's IP.
- `-d` runs the container detached, rather than echoing container output back to the Docker host's console.
In the examples below, this container was started with `HOSTIP=192.168.86.155`, and the queries were then able to connect to it to obtain the `quotes` and `trades` topics.
This app runs using Deephaven with Docker. See our Quickstart for more information.

To run:

```bash
docker-compose pull
docker-compose up -d
```

Navigate to http://localhost:10000/ide for the Deephaven IDE.
The following query creates the `quotes` table:
```python
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dh

quotes = ck.consume({'bootstrap.servers': '192.168.86.155:9092'},
                    'quotes',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=ck.json_spec([('Sym', dh.string),
                                             ('AskSize', dh.int_),
                                             ('AskPrice', dh.double),
                                             ('BidSize', dh.int_),
                                             ('BidPrice', dh.double),
                                             ('AskExchange', dh.string),
                                             ('BidExchange', dh.string),
                                             ('AskTime', dh.int_),
                                             ('BidTime', dh.int_)]),
                    table_type=TableType.Append) \
    .update_view(["AskTime = millisToTime(AskTime)"]) \
    .update_view(["BidTime = millisToTime(BidTime)"])
```
Let's walk through the query step-by-step:

- `kafka_consumer`, imported as `ck`, provides the `consume` method, which is used to connect to a Kafka broker and receive events.
- The `dtypes` module, imported as `dh`, contains data type definitions to be used when adding columns/fields for parsing from JSON messages to a Deephaven table. Note the trailing underscore, which is a required part of the name for the int type used here.
- `quotes` is the name of the table to create from the `consume` call.
- The `consume` call includes the following:
  - The first argument is a dictionary which can contain Kafka client properties. The one property we need to set is the `bootstrap.servers` list (the broker to connect to), including its name or IP address, and port. 9092 is the default port for Kafka.
  - The next argument is the name of the topic to connect to (`quotes`).
  - The next two arguments specify how to parse the key and value of each Kafka message. In this case, we ignore the keys (`KeyValueSpec.IGNORE`) and parse the values as JSON (`ck.json_spec`), with the argument to `ck.json_spec` being the list of columns/fields and their data types.
- The last two statements (`.update_view`) operate on the `quotes` table to convert the epoch-milliseconds Ask and Bid time values to Deephaven `DateTime` values.
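Having walked through the pieces, a quick sanity check can confirm that each JSON field was parsed with the type given in `json_spec`. This is a minimal sketch using the standard `meta_table` attribute; `consume` also adds `KafkaPartition`, `KafkaOffset`, and `KafkaTimestamp` columns automatically:

```python
# Schema of the quotes table: one column per json_spec field, plus the
# KafkaPartition/KafkaOffset/KafkaTimestamp columns added by consume().
quotes_schema = quotes.meta_table
```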
The result is a `quotes` table which populates as new events arrive.
New messages are added to the bottom of the table by default, so you may want to `.reverse()` it, or sort in descending order by `KafkaTimestamp`, to see new events tick in.
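For example (a short sketch using standard table operations; the result table names are arbitrary):

```python
# Newest quote events first.
latest_quotes = quotes.reverse()

# Equivalent view, sorted by the Kafka ingestion timestamp.
latest_by_time = quotes.sort_descending(order_by=["KafkaTimestamp"])
```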
A similar query gets the `trades` topic data from Kafka:
```python
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dh

trades = ck.consume({'bootstrap.servers': '192.168.86.155:9092'},
                    'trades',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=ck.json_spec([('Sym', dh.string),
                                             ('Size', dh.int_),
                                             ('Price', dh.double),
                                             ('DayVolume', dh.int_),
                                             ('Exchange', dh.string),
                                             ('Time', dh.int_)]),
                    table_type=TableType.Append) \
    .update_view(["Time = millisToTime(Time)"])
```
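With both streams available, standard table operations apply. For instance (a minimal sketch using Deephaven's built-in `last_by` aggregation; `last_trades` is an arbitrary name):

```python
# Most recent trade event for each symbol; ticks as new trades arrive.
last_trades = trades.last_by(by=["Sym"])
```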
We can then write other queries which combine and/or aggregate data from these streams. This query uses an as-of join to correlate trade events with the most recent bid for the same symbol.
```python
relatedQuotes = trades.aj(table=quotes, on=["Sym", "Time=BidTime"], joins=["BidTime", "BidPrice", "BidSize", "BidExchange"])
```
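The joined columns can then feed further calculations; for example (a sketch; `PriceMinusBid` and `slippage` are hypothetical names):

```python
# How far each trade printed above (or below) the prevailing bid.
slippage = relatedQuotes.update_view(["PriceMinusBid = Price - BidPrice"])
```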
The next query calculates the volume-weighted average price (VWAP) for each symbol on a per-minute basis, using the start of the minute as the binning value.
```python
from deephaven import agg

vwap = trades.view(formulas=["Sym", "Size", "Price",
                             "TimeBin=lowerBin(Time, 1 * MINUTE)",
                             "GrossPrice=Price*Size"]) \
    .agg_by([agg.avg(["AvgPrice = Price"]),
             agg.sum_(["Volume = Size"]),
             agg.sum_(["TotalGross = GrossPrice"])],
            by=["Sym", "TimeBin"]) \
    .update_view(["VWAP=TotalGross/Volume"])
```
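To focus on a single name, filter the result (a sketch; AAPL is assumed to be among the demo feed's symbols):

```python
# Per-minute VWAP history for one symbol.
aapl_vwap = vwap.where(filters=["Sym == `AAPL`"])
```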
:::note
The `relatedQuotes` and `vwap` tables do not work well outside of trading hours, because the before/after-hours events lack real timestamps.
:::
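One workaround (a sketch; the cutoff date and the `recent_trades` name are assumptions, not part of the original example) is to filter to events with plausible timestamps before joining or aggregating:

```python
# Discard replayed off-hours events whose timestamps are not meaningful.
recent_trades = trades.where(filters=["Time > '2021-06-01T00:00:00 NY'"])
```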