alwitt/httpmq

HTTP MQ

HTTP/2 based message broker built around NATS JetStream.

License: Apache 2

NATS JetStream is, as its documentation describes it, a persistent message streaming solution designed to address "problems identified with streaming in technology today - complexity, fragility, and a lack of scalability". With JetStream at its core, httpmq is a thin HTTP/2 API layer that exposes some of the core features of JetStream.

The httpmq application serves two API groups:

  • management: allows users to administer JetStream streams, consumers, and subjects.
  • dataplane: allows users to publish and subscribe through JetStream subjects.

Through the management API group, users can:

  • For streams:

    • Define a new stream.
    • Fetch parameters of all defined streams.
    • Fetch parameters of one defined stream.
    • Change a stream's data retention policy.
    • Change a stream's subjects of interest.
    • Delete a stream.
  • For consumers:

    • Define a new consumer on a stream.
      • Supports delivery / queue groups.
    • Fetch parameters of all defined consumers of a stream.
    • Fetch parameters of one defined consumer of a stream.
    • Delete a consumer.

Through the dataplane API group, users can:

  • Publish messages to a subject.
  • Push-subscribe for messages on a particular subject as a consumer of a stream.
    • Supports delivery / queue groups.

The httpmq application hosts the two API groups as different runtime modes; the application is either serving the management API group, or serving the dataplane API group.

A helper Makefile is included to automate the common development tasks. The available make targets are:

$ make help
lint                           Lint the files
fix                            Lint and fix violations
compose                        Run docker-compose to create the DEV ENV
doc                            Generate the OpenAPI spec
mock                           Generate test mock interfaces
test                           Run unittests
build                          Build project binaries
clean                          Clean up DEV ENV
help                           Display this help screen

First, start the local development NATS server with JetStream enabled:

$ make compose
Removing docker_nats_1 ... done
Removing network docker_httpmq-test
Removing volume docker_nats_js_store
Creating network "docker_httpmq-test" with driver "bridge"
Creating volume "docker_nats_js_store" with default driver
Creating docker_nats_1 ... done
$ docker logs docker_nats_1
[1] 2021/12/08 18:15:54.793626 [INF] Starting nats-server
[1] 2021/12/08 18:15:54.793669 [INF]   Version:  2.6.2
[1] 2021/12/08 18:15:54.793673 [INF]   Git:      [f7c3ac5]
[1] 2021/12/08 18:15:54.793675 [INF]   Name:     dev-nats
[1] 2021/12/08 18:15:54.793677 [INF]   Node:     EUUGZUxq
[1] 2021/12/08 18:15:54.793678 [INF]   ID:       ND77HNUBFZG5HCF6N7AOSWO2NOZAU23DTQAB2GZ56JCQDMTZ2RH4YR32
[1] 2021/12/08 18:15:54.793681 [INF] Using configuration file: nats-server.conf
[1] 2021/12/08 18:15:54.794314 [INF] Starting JetStream
[1] 2021/12/08 18:15:54.794547 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[1] 2021/12/08 18:15:54.794558 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[1] 2021/12/08 18:15:54.794559 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[1] 2021/12/08 18:15:54.794560 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[1] 2021/12/08 18:15:54.794561 [INF]
[1] 2021/12/08 18:15:54.794562 [INF]          https://docs.nats.io/jetstream
[1] 2021/12/08 18:15:54.794564 [INF]
[1] 2021/12/08 18:15:54.794565 [INF] ---------------- JETSTREAM ----------------
[1] 2021/12/08 18:15:54.794571 [INF]   Max Memory:      64.00 MB
[1] 2021/12/08 18:15:54.794573 [INF]   Max Storage:     256.00 MB
[1] 2021/12/08 18:15:54.794574 [INF]   Store Directory: "/mnt/nats/jetstream"
[1] 2021/12/08 18:15:54.794575 [INF] -------------------------------------------
[1] 2021/12/08 18:15:54.795736 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/12/08 18:15:54.795840 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/12/08 18:15:54.796280 [INF] Server is ready

Verify that the project builds and passes its unit tests:

$ make
$ make test
?   	github.com/alwitt/httpmq	[no test files]
?   	github.com/alwitt/httpmq/apis	[no test files]
?   	github.com/alwitt/httpmq/cmd	[no test files]
ok  	github.com/alwitt/httpmq/common	0.323s
?   	github.com/alwitt/httpmq/core	[no test files]
ok  	github.com/alwitt/httpmq/dataplane	0.967s
ok  	github.com/alwitt/httpmq/management	0.081s

Start httpmq serving the management API group

$ ./httpmq.bin -l info management
2021/12/28 15:21:00  info Created JetStream client  component=jetstream-backend instance=nats://127.0.0.1:4222 module=core
2021/12/28 15:21:00  info Started HTTP server on http://127.0.0.1:3000 component=management instance=dvm-personal module=cmd

Start httpmq serving the dataplane API group

$ ./httpmq.bin -l info dataplane
2021/12/28 15:21:19  info Created JetStream client  component=jetstream-backend instance=nats://127.0.0.1:4222 module=core
2021/12/28 15:21:19  info Started HTTP server on http://127.0.0.1:3001 component=dataplane instance=dvm-personal module=cmd
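
Because the two API groups listen on different ports, a client has to pick the right base URL for each call. The sketch below collects the endpoint paths used in the rest of this walkthrough in one place. The helper names are hypothetical, and the addresses are simply the ones shown in the log output above.

```python
from urllib.parse import quote, urlencode

# Addresses copied from the startup logs above; adjust for other deployments.
MANAGEMENT_URL = "http://127.0.0.1:3000"
DATAPLANE_URL = "http://127.0.0.1:3001"


def stream_url(stream: str) -> str:
    """Management endpoint for one stream."""
    return f"{MANAGEMENT_URL}/v1/admin/stream/{quote(stream, safe='')}"


def consumer_url(stream: str, consumer: str) -> str:
    """Management endpoint for one consumer of a stream."""
    return f"{stream_url(stream)}/consumer/{quote(consumer, safe='')}"


def publish_url(subject: str) -> str:
    """Dataplane endpoint for publishing to a subject."""
    return f"{DATAPLANE_URL}/v1/data/subject/{quote(subject, safe='')}"


def subscribe_url(stream: str, consumer: str, subject: str) -> str:
    """Dataplane endpoint for push-subscribing as a consumer."""
    path = f"/v1/data/stream/{quote(stream, safe='')}/consumer/{quote(consumer, safe='')}"
    return f"{DATAPLANE_URL}{path}?{urlencode({'subject_name': subject})}"
```

Keeping the management and dataplane bases separate mirrors the fact that httpmq serves only one API group per process.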

Define a test stream

$ curl -X POST 'http://127.0.0.1:3000/v1/admin/stream' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "testStream00",
    "max_age": 300000000000,
    "subjects": [
        "test-subject.00",
        "test-subject.01"
    ]
}'
{"success":true}
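
The same request can be prepared from Python. This is a minimal sketch using only the standard library; the function names are hypothetical, and it assumes that `max_age` is expressed in nanoseconds, so that the `300000000000` above corresponds to five minutes.

```python
import json
from datetime import timedelta
from urllib.request import Request


def stream_definition(name: str, subjects: list, max_age: timedelta) -> dict:
    """Build the JSON body for defining a stream."""
    # Assumption: max_age is an integer number of nanoseconds.
    max_age_ns = (max_age // timedelta(microseconds=1)) * 1000
    return {"name": name, "max_age": max_age_ns, "subjects": list(subjects)}


def define_stream_request(base_url: str, definition: dict) -> Request:
    """Prepare (but do not send) the POST request for a new stream."""
    return Request(
        f"{base_url}/v1/admin/stream",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


definition = stream_definition(
    "testStream00", ["test-subject.00", "test-subject.01"], timedelta(minutes=5)
)
request = define_stream_request("http://127.0.0.1:3000", definition)
```

Passing `request` to `urllib.request.urlopen` would send it to a running management server.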

Verify the stream is defined

$ curl 'http://127.0.0.1:3000/v1/admin/stream/testStream00' | jq '.'
{
  "success": true,
  "stream": {
    "config": {
      "name": "testStream00",
      "subjects": [
        "test-subject.00",
        "test-subject.01"
      ],
      "max_consumers": -1,
      "max_msgs": -1,
      "max_bytes": -1,
      "max_age": 300000000000,
      "max_msgs_per_subject": -1,
      "max_msg_size": -1
    },
    "created": "2021-12-27T18:26:48.419816409Z",
    "state": {
      "messages": 0,
      "bytes": 0,
      "first_seq": 0,
      "first_ts": "0001-01-01T00:00:00Z",
      "last_seq": 0,
      "last_ts": "0001-01-01T00:00:00Z",
      "consumer_count": 0
    }
  }
}

Define a test consumer on this stream

$ curl -X POST 'http://127.0.0.1:3000/v1/admin/stream/testStream00/consumer' \
--header 'Content-Type: application/json' \
--data-raw '{
    "max_inflight": 4,
    "mode": "push",
    "name": "testConsumer00",
    "filter_subject": "test-subject.01"
}'
{"success":true}
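
A consumer definition can be prepared the same way. The sketch below only models the four fields used in the request above; the class and function names are hypothetical, and any other parameters the API may accept are left out.

```python
import json
from dataclasses import asdict, dataclass
from urllib.request import Request


@dataclass
class ConsumerDefinition:
    """The fields sent in the example request above."""
    name: str
    filter_subject: str
    max_inflight: int
    mode: str = "push"


def define_consumer_request(base_url: str, stream: str,
                            definition: ConsumerDefinition) -> Request:
    """Prepare (but do not send) the POST request for a new consumer."""
    return Request(
        f"{base_url}/v1/admin/stream/{stream}/consumer",
        data=json.dumps(asdict(definition)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


consumer = ConsumerDefinition(
    name="testConsumer00", filter_subject="test-subject.01", max_inflight=4
)
request = define_consumer_request("http://127.0.0.1:3000", "testStream00", consumer)
```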

Verify the consumer is defined

$ curl 'http://127.0.0.1:3000/v1/admin/stream/testStream00/consumer/testConsumer00' | jq '.'
{
  "success": true,
  "consumer": {
    "stream_name": "testStream00",
    "name": "testConsumer00",
    "created": "2021-12-27T18:27:58.055568398Z",
    "config": {
      "deliver_subject": "_INBOX.NZbAf8BCfeTA5s4Yxwnxnh",
      "max_deliver": -1,
      "ack_wait": 30000000000,
      "filter_subject": "test-subject.01",
      "max_ack_pending": 4
    },
    "delivered": {
      "consumer_seq": 0,
      "stream_seq": 0
    },
    "ack_floor": {
      "consumer_seq": 0,
      "stream_seq": 0
    },
    "num_ack_pending": 0,
    "num_redelivered": 0,
    "num_waiting": 0,
    "num_pending": 0
  }
}
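
In this response, the `max_inflight` value of 4 from the request shows up as `max_ack_pending`, and `ack_wait` is reported as a large integer. The sketch below reads those two fields from a parsed response; it assumes `ack_wait` is in nanoseconds (so the value above would be 30 seconds), and the function names are hypothetical.

```python
from datetime import timedelta


def ack_wait(consumer_info: dict) -> timedelta:
    """Return the ACK wait of a consumer as a timedelta."""
    # Assumption: ack_wait is an integer number of nanoseconds.
    return timedelta(microseconds=consumer_info["config"]["ack_wait"] // 1000)


def inflight_capacity(consumer_info: dict) -> int:
    """Number of further messages that may be outstanding before ACKs are needed."""
    config = consumer_info["config"]
    return config["max_ack_pending"] - consumer_info["num_ack_pending"]


# Trimmed copy of the "consumer" object from the response above.
info = {
    "config": {"ack_wait": 30000000000, "max_ack_pending": 4},
    "num_ack_pending": 0,
}
```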

Publish a message for a subject

$ curl -X POST 'http://127.0.0.1:3001/v1/data/subject/test-subject.01' --header 'Content-Type: text/plain' --data-raw "$(echo 'Hello World' | base64)"
{"success":true}

IMPORTANT: The message body must be Base64 encoded.

$ echo "Hello World" | base64
SGVsbG8gV29ybGQK
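
Note that `echo` appends a trailing newline, so the encoded value above represents `Hello World` followed by a newline. A minimal Python sketch of the same encoding step, with hypothetical helper names, makes the difference visible:

```python
import base64


def encode_body(message: bytes) -> str:
    """Base64-encode a message for use as a publish request body."""
    return base64.b64encode(message).decode("ascii")


def decode_body(b64_msg: str) -> bytes:
    """Decode the Base64 payload of a received message."""
    return base64.b64decode(b64_msg)


with_newline = encode_body(b"Hello World\n")  # what `echo ... | base64` produces
without_newline = encode_body(b"Hello World")  # the bare text
```

Encoding raw bytes rather than text keeps the helper usable for binary payloads as well.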

Subscribe to messages for a consumer on a stream

$ curl 'http://127.0.0.1:3001/v1/data/stream/testStream00/consumer/testConsumer00?subject_name=test-subject.01' --http2-prior-knowledge
{"success":true,"stream":"testStream00","subject":"test-subject.01","consumer":"testConsumer00","sequence":{"stream":1,"consumer":1},"b64_msg":"SGVsbG8gV29ybGQK"}
...
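
Each delivered message carries its sequence numbers and a Base64 payload. The sketch below parses one such message, assuming (based on the output above) that every message arrives as a single JSON object on its own line. It covers parsing only: Python's standard library has no HTTP/2 client, so reading the stream itself is left to an HTTP/2 capable library. The type and function names are hypothetical.

```python
import base64
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ReceivedMessage:
    stream: str
    subject: str
    consumer: str
    stream_seq: int
    consumer_seq: int
    payload: bytes


def parse_message(line: str) -> ReceivedMessage:
    """Parse one JSON line from the subscription response."""
    doc = json.loads(line)
    if not doc.get("success"):
        # The shape of error responses is not shown here, so surface it raw.
        raise ValueError(f"subscription reported a failure: {doc!r}")
    return ReceivedMessage(
        stream=doc["stream"],
        subject=doc["subject"],
        consumer=doc["consumer"],
        stream_seq=doc["sequence"]["stream"],
        consumer_seq=doc["sequence"]["consumer"],
        payload=base64.b64decode(doc["b64_msg"]),
    )


sample = ('{"success":true,"stream":"testStream00","subject":"test-subject.01",'
          '"consumer":"testConsumer00","sequence":{"stream":1,"consumer":1},'
          '"b64_msg":"SGVsbG8gV29ybGQK"}')
message = parse_message(sample)
```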

After receiving a message, acknowledge it with:

$ curl -X POST 'http://127.0.0.1:3001/v1/data/stream/testStream00/consumer/testConsumer00/ack' --header 'Content-Type: application/json' --data-raw '{"consumer": 1,"stream": 1}'
{"success":true}

The consumer and stream fields are the sequence numbers which arrived with the message.
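
In other words, the acknowledgement body is the `sequence` object of the received message, sent back unchanged. A minimal sketch of preparing that request in Python, with a hypothetical function name:

```python
import json
from urllib.request import Request


def ack_request(base_url: str, stream: str, consumer: str, sequence: dict) -> Request:
    """Prepare (but do not send) the ACK for a received message.

    `sequence` is the "sequence" object that arrived with the message.
    """
    body = {"consumer": sequence["consumer"], "stream": sequence["stream"]}
    return Request(
        f"{base_url}/v1/data/stream/{stream}/consumer/{consumer}/ack",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = ack_request(
    "http://127.0.0.1:3001", "testStream00", "testConsumer00",
    {"stream": 1, "consumer": 1},
)
```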

If an acknowledgement is not sent within the consumer's configured maximum ACK wait duration, the message is delivered again through this consumer's subscription. On redelivery, the stream sequence number is unchanged, but the consumer sequence number is increased by one.

{"success":true,"stream":"testStream00","subject":"test-subject.01","consumer":"testConsumer00","sequence":{"stream":1,"consumer":2},"b64_msg":"SGVsbG8gV29ybGQK"}
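
Since a redelivered message keeps its stream sequence number but gets a new consumer sequence number, a client can spot redeliveries by remembering the stream sequence numbers it has already seen. The tracker below is a hypothetical client-side sketch of that idea, not part of httpmq itself.

```python
class DeliveryTracker:
    """Remember the latest consumer sequence seen for each stream sequence."""

    def __init__(self) -> None:
        self._latest = {}  # stream sequence -> latest consumer sequence

    def observe(self, sequence: dict) -> bool:
        """Record a delivery; return True if it is a redelivery."""
        stream_seq = sequence["stream"]
        redelivered = stream_seq in self._latest
        self._latest[stream_seq] = sequence["consumer"]
        return redelivered

    def ack_payload(self, stream_seq: int) -> dict:
        """ACK body for the most recent delivery of a message."""
        return {"consumer": self._latest[stream_seq], "stream": stream_seq}

    def acknowledged(self, stream_seq: int) -> None:
        """Forget a message once its ACK has been accepted."""
        self._latest.pop(stream_seq, None)


tracker = DeliveryTracker()
first = tracker.observe({"stream": 1, "consumer": 1})
second = tracker.observe({"stream": 1, "consumer": 2})
payload = tracker.ack_payload(1)
```

Keying on the stream sequence lets the client process a message only once while still acknowledging the delivery it received most recently.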

When acknowledging this message now, use '{"consumer": 2,"stream": 1}' as the payload.

$ curl -X POST 'http://127.0.0.1:3001/v1/data/stream/testStream00/consumer/testConsumer00/ack' --header 'Content-Type: application/json' --data-raw '{"consumer": 2,"stream": 1}'
{"success":true}

Language  SDK            Notes
Golang    httpmq-go
Python    httpmq-python  Requires asyncio