The Pulsar Beam project is an HTTP-based streaming and queueing system for use with Apache Pulsar.
With Pulsar Beam, you can send messages over HTTP, push messages to a webhook or cloud function, chain webhooks and functions together, or stream messages via server-sent events.
In this guide we will install a minimal DataStax Pulsar Helm chart that includes Pulsar Beam.
You will need the following prerequisites in place to complete this guide:
- Helm 3 CLI (this example uses version 3.8.0)
- Enough access to a K8s cluster to create a namespace, deployments, and pods
- Kubectl CLI (this example uses version 1.23.4)
In a new terminal window, run the following curl command to begin streaming server-sent events. See configuring your local environment for Astra Streaming.
TOPIC="my-beam-topic"
curl "http://127.0.0.1:8085/v2/sse/persistent/public/default/$TOPIC?SubscriptionInitialPosition=earliest&SubscriptionType=exclusive&SubscriptionName=my-beam-subscription"
Note the use of SubscriptionInitialPosition=earliest in the message consumer.
This instructs Beam to create a subscription on the topic starting at the earliest message.
Try changing the value to latest to only receive new messages that arrive.
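The same consumer can be sketched in Python using only the standard library. This is a minimal illustration, not part of the Beam project: the helper names `build_sse_url`, `parse_sse`, and `stream_events` are hypothetical, the address and query parameters are copied from the curl command above, and the parser is simplified (it keeps one value per field and does not join multi-line `data` fields).

```python
from urllib.parse import urlencode
from urllib.request import urlopen

BEAM_URL = "http://127.0.0.1:8085"  # address taken from the curl example


def build_sse_url(topic, position="earliest", name="my-beam-subscription"):
    """Return the SSE URL for a topic in the public/default namespace."""
    query = urlencode({
        "SubscriptionInitialPosition": position,  # "earliest" or "latest"
        "SubscriptionType": "exclusive",
        "SubscriptionName": name,
    })
    return f"{BEAM_URL}/v2/sse/persistent/public/default/{topic}?{query}"


def parse_sse(lines):
    """Group 'field: value' lines into one dict per event.

    Simplified: a blank line ends an event, and a repeated field
    overwrites the earlier value.
    """
    event = {}
    for line in lines:
        line = line.rstrip("\r\n")
        if not line:
            if event:
                yield event
                event = {}
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        event[field] = value[1:] if value.startswith(" ") else value
    if event:
        yield event


def stream_events(topic, position="earliest"):
    """Hypothetical helper: print events until interrupted (needs a running Beam)."""
    with urlopen(build_sse_url(topic, position)) as response:
        text_lines = (raw.decode("utf-8") for raw in response)
        for event in parse_sse(text_lines):
            print(event.get("id"), event.get("data"))
```

Calling `stream_events("my-beam-topic", position="latest")` would correspond to the "latest" variant described above; nothing runs until one of the functions is called.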
Return to the original terminal window and run the following command to produce a new message. Don’t worry about creating the new topic: if a topic doesn’t exist, it is created automatically.
TOPIC="my-beam-topic"
curl --request POST \
-d "Hi there" \
http://127.0.0.1:8085/v2/firehose/persistent/public/default/$TOPIC
The message consumer will output the data of the new message just produced.
id: {9 0 0 0 <nil> 0xc002287ad0}
data: Hi there
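For comparison, here is a rough standard-library equivalent of the producer curl command. The helper names `build_publish_request` and `publish` are hypothetical, and the endpoint path is assumed to match the curl example exactly.

```python
from urllib.request import Request, urlopen

BEAM_URL = "http://127.0.0.1:8085"  # address taken from the curl example


def build_publish_request(topic, message):
    """Build (but do not send) a POST to the firehose endpoint for the topic."""
    url = f"{BEAM_URL}/v2/firehose/persistent/public/default/{topic}"
    return Request(url, data=message.encode("utf-8"), method="POST")


def publish(topic, message):
    """Send the message and return the HTTP status code (needs a running Beam)."""
    with urlopen(build_publish_request(topic, message)) as response:
        return response.status
```

With Beam reachable at the address above, `publish("my-beam-topic", "Hi there")` would send the same request as the curl command. Separating request construction from sending makes the URL and body easy to inspect without a running cluster.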
You have now completed the basics of using Beam in a Pulsar cluster. Refer to the project’s readme to see all the possibilities!
This is another example of producing and consuming messages using Beam. Instead of using curl, this example will use the "requests" Python library to issue HTTP requests.
- Create a new file to hold the Python code.
  touch beam.py
- Paste the following code into "beam.py".
  link:{luna-streaming-examples-repo}/beam/produce-consume.py[role=include]
- Run the Python example (we used Python version 3.8.10).
  python3 beam.py
- The output should show 1 message was published and 1 message was consumed.
Published 1 message (200)
Consumed messages (200):
{
  "limit": 10,
  "size": 1,
  "messages": [
    {
      "payload": "SGkgdGhlcmUh",
      "topic": "persistent://public/default/my-beam-topic",
      "eventTime": "2023-01-03T15:52:24.054Z",
      "publishTime": "2023-01-03T15:52:24.054Z",
      "messageId": "&{ledgerID:16 entryID:0 batchIdx:0 partitionIdx:0 tracker:<nil> consumer:0xc002287e10}",
      "key": ""
    }
  ]
}
Note: The payload is base64 encoded. Decode the string to see the actual message data.
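As a small illustration of that decoding step, the sketch below extracts and decodes the payload from a trimmed copy of the response shown above. The function name `decode_payloads` is hypothetical, and the payload is assumed to be UTF-8 text.

```python
import base64
import json

# Trimmed copy of the response shown above; only the fields used here are kept.
response_text = (
    '{"limit": 10, "size": 1, "messages": ['
    '{"payload": "SGkgdGhlcmUh", '
    '"topic": "persistent://public/default/my-beam-topic"}]}'
)


def decode_payloads(body):
    """Return the decoded text of every message payload in a response body."""
    return [
        base64.b64decode(msg["payload"]).decode("utf-8")
        for msg in json.loads(body)["messages"]
    ]


print(decode_payloads(response_text))  # ['Hi there!']
```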
Here are links to resources and guides you might be interested in:
- Learn more about the Pulsar Beam project
- Pulsar Beam API