Transfer EPICS Channel Access (CA) messages into Kafka via the Kafka Connect interface.
Leverages Kafka as infrastructure: building on the Kafka Connect API provides a degree of fault tolerance, scalability, and security that would be hard to achieve with an ad-hoc implementation using the Kafka Producer API directly.
- Grab Project
git clone https://github.com/JeffersonLab/epics2kafka
cd epics2kafka
- Launch Compose
docker compose up
- Listen to the Kafka topic "topic1"
docker exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic topic1 --from-beginning --property "key.separator==" --property print.key=true
- Put a value into the "channel1" EPICS channel
Single:
docker exec softioc caput channel1 1
Continuous:
docker exec -it softioc /scripts/feed-ca.sh channel1
Note: The Docker Compose project creates the following containers:
- softioc
- kafka
- connect
Note: If you run the project multiple times and your containers retain state you do not wish to keep, use the command docker compose down to remove the containers.
Note: The docker containers require significant resources; tested with 4 CPUs and 4GB memory allocated.
Three steps are required to deploy the CA Source Connector to an existing Kafka installation in standalone mode:
- Copy the connector and dependency jar files into a plugin directory:
mkdir /opt/kafka/plugins/epics2kafka
cp /tmp/epics2kafka.jar /opt/kafka/plugins/epics2kafka
cp /tmp/jca-2.4.6.jar /opt/kafka/plugins/epics2kafka
cp /tmp/kafka-common*.jar /opt/kafka/plugins/epics2kafka
Note: Dependencies already found in the kafka/libs directory should NOT be included in the plugin subdirectory. Jars needed by multiple plugins should be placed in kafka/libs instead of in each plugin subdirectory to avoid odd class loader and symbol resolution errors.
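After copying, the plugin directory should look roughly like this (the kafka-common jar name will vary with its version):

/opt/kafka/plugins/
└── epics2kafka/
    ├── epics2kafka.jar
    ├── jca-2.4.6.jar
    └── kafka-common-&lt;version&gt;.jar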
- Update the Kafka config (standalone environment shown):
# Edit existing config file for server
vi config/connect-standalone.properties
# Uncomment property and set value:
plugin.path=/opt/kafka/plugins
# Create new config file for connector
vi config/ca-source.properties
Example ca-source.properties
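A minimal sketch of what ca-source.properties might contain; the connector class name, addresses, and task count below are illustrative assumptions, so consult the example file shipped with the project for the authoritative version:

# Illustrative minimal connector configuration; values are placeholders
name=ca-source
# Assumed connector class name; verify against the class actually shipped in epics2kafka.jar
connector.class=org.jlab.kafka.connect.CASourceConnector
tasks.max=1
# Site-specific address(es) for PV resolution (see the options table below)
monitor.addr.list=localhost:5064
# Brokers used to query the command topic
command.bootstrap.servers=localhost:9092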
- Launch the Kafka Connect server:
cd /opt/kafka
bin/connect-standalone.sh config/connect-standalone.properties config/ca-source.properties
For distributed mode you must copy the connector and dependency jar files to all nodes in the cluster, and ensure the plugin path is set in config/connect-distributed.properties instead of config/connect-standalone.properties. Connectors in distributed mode are controlled through a REST API. For example, first start a distributed node:
bin/connect-distributed.sh config/connect-distributed.properties
Next start the connector:
curl -X POST -H "Content-Type:application/json" -d @./examples/connect-config/distributed/ca-source.json http://localhost:8083/connectors
Example ca-source.json
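The distributed-mode JSON wraps the same options in the name/config envelope expected by the Connect REST API; as with the properties sketch above, the class name and values are illustrative assumptions:

{
  "name": "ca-source",
  "config": {
    "connector.class": "org.jlab.kafka.connect.CASourceConnector",
    "tasks.max": "1",
    "monitor.addr.list": "localhost:5064",
    "command.bootstrap.servers": "localhost:9092"
  }
}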
The Connect runtime is configured via either connect-standalone.properties or connect-distributed.properties, which also include the Connect internal topic configurations. Besides the plugin.path discussed in the Install section, other important settings are the max.message.bytes config of the connect-configs topic and the max.request.size property in connect-*.properties. If these aren't large enough the connector will silently fail.
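For example, the topic-level limit can be raised with the stock kafka-configs.sh tool (the 10 MB value is illustrative):

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name connect-configs --add-config max.message.bytes=10485760

with a matching max.request.size=10485760 entry in connect-*.properties (depending on your Connect version this may need the producer. prefix, i.e. producer.max.request.size).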
All of the common Connect options apply, plus the following Connector specific ones:
Option | Description | Default |
---|---|---|
monitor.addr.list | Space-separated list of addresses for PV resolution. Each address should be of the form ip:port or hostname:port | |
monitor.auto.addr.list | Whether or not network interfaces should be discovered at runtime for the purpose of PV resolution | true |
monitor.connection.timeout | Time in seconds between CA beacons before client initiates health-check query which may result in disconnect if not promptly responded to | 30.0 |
monitor.repeater.port | Port number to use for the beacon repeater process, which will automatically be started if not already running | 5065 |
monitor.max.array.bytes | Maximum size of CA array, in bytes | 16384 |
monitor.thread.pool.size | Number of threads for processing network events | 5 |
monitor.poll.millis | Milliseconds between polls for CA changes - sets max CA monitor update frequency | 1000 |
command.topic | Name of Kafka command topic to monitor for channels list | epics-channels |
command.group | Name of Kafka consumer group to use when monitoring the command topic | ca-source |
command.poll.millis | Milliseconds between polls for command topic changes | 5000 |
command.max.poll.records | The maximum number of records returned in a single call to poll(), and also the maximum batch size returned in the batch call-back. | 5000 |
command.bootstrap.servers | Comma-separated list of host and port pairs that are the addresses of the Kafka brokers used to query the command topic | localhost:9092 |
command.settle.seconds | How many seconds to wait loading the initial channels list before timeout | 30 |
command.load.timeout.seconds | How many seconds to wait after a command is issued for no more commands before requesting reconfigure. This is done because reconfigure is a heavy operation so batching commands is ideal. | 15 |
Note: The monitor options (except poll.millis) map to Java Channel Access (JCA) options.
Options are specified in JSON format when running the connector in distributed mode (ca-source.json). In standalone mode the options are specified in a Java properties file (ca-source.properties).
The internal Schema structure can be serialized to various forms using Converters. The following are common converters:
Converter | Description |
---|---|
org.apache.kafka.connect.storage.StringConverter | Use the underlying connector struct schema in String form |
org.apache.kafka.connect.converters.ByteArrayConverter | Use the underlying connector struct schema in byte form |
org.apache.kafka.connect.json.JsonConverter | JSON formatted, by default the schema is embedded and top level keys are schema and payload. Disable embedded schema with additional option value.converter.schemas.enable=false |
io.confluent.connect.json.JsonSchemaConverter | Confluent Schema Registry backed JSON format |
io.confluent.connect.avro.AvroConverter | Confluent Schema Registry backed AVRO format |
io.confluent.connect.protobuf.ProtobufConverter | Confluent Schema Registry backed Protocol Buffers format |
You can also create your own or use other Converters - they're pluggable.
You can control the value schema using the option value.converter. For example, to set the converter to JSON with an implicit schema (i.e. not included in the message or available to lookup in a registry):
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Note: Confluent Schema Registry backed converters require a schema registry server specified with an additional option: value.converter.schema.registry.url
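For example, to select the Avro converter backed by a hypothetical registry at http://registry:8081:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://registry:8081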
Note: Output topics use the channel name as the message key (with a String key schema) by default, but if outkey is set then that is used instead. A key is required to support topic compaction and is especially useful when using a shared output topic.
The connector determines which EPICS channels to publish into Kafka by listening to a Kafka topic for commands, by default the topic "epics-channels" (configurable). The command topic is Event Sourced so that it can be treated like a database. Tombstone records are honored, topic compaction should be configured, and clients should rewind and replay messages to determine the full configuration.
{"topic":"Kafka topic name","channel":"EPICS CA channel name"}={"mask":"v, a, or va","outkey":"optional - output message key, defaults to channel"}
Each message key on the command topic is a JSON object containing the topic to produce messages on and the EPICS channel name to monitor. It is acceptable to re-use the same topic with multiple EPICS channels (merge updates). It is also possible to establish multiple monitors on a single channel by specifying unique topics for messages to be produced on.
Note: Kafka topic names may only contain alphanumeric characters, periods, underscores, and hyphens (and mixing periods with underscores is discouraged, as they can collide in internal metric names). Therefore, some EPICS channel names may be invalid Kafka topic names (such as channels containing the colon character).
The message value is a JSON object containing the EPICS CA event mask, which should be specified as either "v" or "a" or "va" representing value, alarm, or both. By default, messages are produced using the channel name as key. You can set this to a string of your choice using the optional value parameter outkey. Therefore it is possible to map multiple EPICS channels to a single message key if desired, or otherwise rename a channel.
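For example, two hypothetical channels could share one output topic, with the second message renamed via outkey:

{"topic":"motors","channel":"motor1:pos"}={"mask":"va"}
{"topic":"motors","channel":"motor2:pos"}={"mask":"va","outkey":"motor2"}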
There are various ways to produce command messages:
- Interactive kafka-console-producer.sh
You can command the connector to monitor a new EPICS CA channel with a JSON formatted message such as:
docker exec -it kafka /kafka/bin/kafka-console-producer.sh --bootstrap-server kafka:9092 --topic epics-channels --property "parse.key=true" --property "key.separator=="
> {"topic":"channel1","channel":"channel1"}={"mask":"va"}
>
- Bulk from File
Channels can be batch loaded from a file using shell file redirection such as with the example channels file found in the Connect docker image:
/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka:9092 --topic epics-channels --property "parse.key=true" --property "key.separator==" --property "linger.ms=100" --property "compression.type=snappy" < /config/channels
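Each line of the channels file is one command message in the same key=value format; a sketch with made-up channel names:

{"topic":"channel1","channel":"channel1"}={"mask":"va"}
{"topic":"channel2","channel":"channel2"}={"mask":"v"}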
- Adding Channels by Script
Alternatively, a bash script can be used to simplify the process for individual channels. For example to execute the script in the provided docker example:
docker exec connect /scripts/set-monitored.sh -t channel1 -c channel1 -m va
- Removing Channels by Script
You can command the connector to stop listening to a channel by writing a tombstone record (key with null value) or use the example bash script to unset (-u) the record:
docker exec connect /scripts/set-monitored.sh -t channel1 -c channel1 -u
Note: The kafka-console-producer.sh script currently does not support producing tombstone records.
- Listing Channels by Script
You can list the currently monitored channels using the example bash script:
docker exec connect /scripts/list-monitored.sh
The connector listens to the command topic and re-configures the connector tasks dynamically, so no manual restart is required. Kafka Incremental Cooperative Rebalancing attempts to avoid a stop-the-world restart of the connector, but some EPICS CA events can be missed. When an EPICS monitor is established (or re-established) it always reports the current state, so although changes during a rebalance may be missed, the state of the system will be re-reported at the end of the rebalance. Channel monitors are divided as evenly as possible among the configured number of tasks. It is recommended to populate the initial set of channels to monitor before starting the connector, to avoid triggering task rebalancing repeatedly. You may wish to configure scheduled.rebalance.max.delay.ms to a small number to avoid long periods of waiting to see whether an assigned task is coming back in a task failure scenario.
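For example, in config/connect-distributed.properties (the 30-second value is illustrative; the Connect default is 5 minutes):

scheduled.rebalance.max.delay.ms=30000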
The following environment variables are required by the scripts:
Name | Description |
---|---|
BOOTSTRAP_SERVERS | Comma-separated list of host and port pairs used to bootstrap the client connection to a Kafka cluster; example: kafka:9092 |
KAFKA_HOME | Path to Kafka installation; example: /opt/kafka |
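For example, in a Bash session before invoking the scripts:

export BOOTSTRAP_SERVERS=kafka:9092
export KAFKA_HOME=/opt/kafka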
Note: The scripts are Bash (Linux) and wrap the scripts provided with Kafka; two single-file Java apps are also included due to limitations of the Kafka-provided scripts: TombstoneProducer.java produces tombstone messages and SnapshotConsumer.java consumes a snapshot of a topic (rewinds to the beginning, replays all messages, and exits once the last message is read).
This project is built with Java 17 (compiled to Java 11 bytecode), and uses the Gradle 7 build tool to automatically download dependencies and build the project from source:
git clone https://github.com/JeffersonLab/epics2kafka
cd epics2kafka
gradlew installDist
Note: If you do not already have Gradle installed, it will be installed automatically by the wrapper script included in the source
Note for JLab On-Site Users: Jefferson Lab has an intercepting proxy
See: Docker Development Quick Reference
Set up the build environment following the Build instructions.
In order to avoid installing Kafka locally and managing start/stop/reset of connect service you can use a Docker build:
docker compose -f build.yml up
Note: To deploy new changes you must stop (docker compose -f build.yml rm -svf connect) and re-start (docker compose -f build.yml up -d connect) the connect container. A full reset of all containers can be done with docker compose -f build.yml down followed by docker compose -f build.yml up.
Note: Alternative methods of development (with potentially quicker iteration) include (1) using bind.yml to mount the built app artifact into a connect container and (2) running the app directly on the local workstation with service dependencies running via deps.yml.
Unit tests are run automatically upon building the project.
The integration tests depend on a running Kafka instance, generally in Docker. The tests run automatically via the CI GitHub Action on every commit (unless [no ci] is included in the commit message). The tests can be run locally during development. Set up the development environment following the Develop instructions. Then with the build.yml Docker containers running execute:
gradlew integrationTest
- Bump the version number in build.gradle and commit and push to GitHub (using Semantic Versioning).
- Create a new release on the GitHub Releases page corresponding to the same version in build.gradle (enumerate changes and link issues). Run the dist build and attach the zip to the release.
- Publish to DockerHub GitHub Action should run automatically.
- Bump and commit the quick start image version after confirming the new image works.