In [1]:
STREAM_NAME = "TEST"

In [2]:
from jsm.api.client import Client as JS

In [3]:
# Build the client with no arguments, then open the connection that every later cell reuses via `js`.
js = JS()
await js.connect()

- Create a new stream

In [4]:
response = await js.stream_create("TEST", subjects=["test.>"], max_consumers=10, raise_on_error=True)
response.config.dict()

{'name': 'TEST',
 'subjects': ['test.>'],
 'retention': <Retention.limits: 'limits'>,
 'max_consumers': 10,
 'max_msgs': -1,
 'max_msgs_per_subject': -1,
 'max_bytes': -1,
 'max_age': 0,
 'max_msg_size': -1,
 'storage': <Storage.file: 'file'>,
 'num_replicas': 1,
 'no_ack': False,
 'template_owner': None,
 'discard': <Discard.old: 'old'>,
 'duplicate_window': 120000000000,
 'placement': None,
 'mirror': None,
 'sources': None}

- List stream names

In [5]:
response = await js.stream_names()
response.streams

['TEST']

- List streams

In [6]:
response = await js.stream_list()

In [7]:
response.streams[0].dict()

{'config': {'name': 'TEST',
  'subjects': ['test.>'],
  'retention': <Retention.limits: 'limits'>,
  'max_consumers': 10,
  'max_msgs': -1,
  'max_msgs_per_subject': -1,
  'max_bytes': -1,
  'max_age': 0,
  'max_msg_size': -1,
  'storage': <Storage.file: 'file'>,
  'num_replicas': 1,
  'no_ack': False,
  'template_owner': None,
  'discard': <Discard.old: 'old'>,
  'duplicate_window': 120000000000,
  'placement': None,
  'mirror': None,
  'sources': None},
 'state': {'messages': 0,
  'bytes': 0,
  'first_seq': 0,
  'first_ts': datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
  'last_seq': 0,
  'last_ts': datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
  'deleted': None,
  'num_deleted': None,
  'lost': None,
  'consumer_count': 0},
 'created': '2021-08-09T00:55:58.1222356Z',
 'mirror': None,
 'sources': None,
 'cluster': None}

- Get stream info

In [8]:
response = await js.stream_info(STREAM_NAME)

In [9]:
response.dict()

{'type': 'io.nats.jetstream.api.v1.stream_info_response',
 'config': {'name': 'TEST',
  'subjects': ['test.>'],
  'retention': <Retention.limits: 'limits'>,
  'max_consumers': 10,
  'max_msgs': -1,
  'max_msgs_per_subject': -1,
  'max_bytes': -1,
  'max_age': 0,
  'max_msg_size': -1,
  'storage': <Storage.file: 'file'>,
  'num_replicas': 1,
  'no_ack': False,
  'template_owner': None,
  'discard': <Discard.old: 'old'>,
  'duplicate_window': 120000000000,
  'placement': None,
  'mirror': None,
  'sources': None},
 'state': {'messages': 0,
  'bytes': 0,
  'first_seq': 0,
  'first_ts': datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
  'last_seq': 0,
  'last_ts': datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
  'deleted': None,
  'num_deleted': None,
  'lost': None,
  'consumer_count': 0},
 'created': '2021-08-09T00:55:58.1222356Z',
 'mirror': None,
 'sources': None,
 'cluster': None}

- By default, errors are not raised, but this can easily be changed using the `raise_on_error` keyword argument:

In [10]:
# No stream named "toto" exists; without raise_on_error the 404 comes back on the response's `error` field.
response = await js.stream_info("toto")
print(response)

type='io.nats.jetstream.api.v1.stream_info_response' error=IoNatsJetstreamApiV1ErrorItem(code=404, description='stream not found')


In [11]:
from jsm.api.errors import JetStreamError

try:
    # Same lookup of a missing stream, but raise_on_error=True surfaces the failure as an exception.
    await js.stream_info("toto", raise_on_error=True)
except JetStreamError as err:
    # The exception exposes the response type plus the error code and description.
    print(f"Type: {err.type}")
    print(f"Code: {err.code}")
    print(f"Description: {err.description}")

Type: io.nats.jetstream.api.v1.stream_info_response
Code: 404
Description: stream not found


- Update a stream

In [12]:
response = await js.stream_update(STREAM_NAME, subjects=["test.>", "demo.>"])

In [13]:
response.config

Config(name='TEST', subjects=['test.>', 'demo.>'], retention=<Retention.limits: 'limits'>, max_consumers=10, max_msgs=-1, max_msgs_per_subject=-1, max_bytes=-1, max_age=0, max_msg_size=-1, storage=<Storage.file: 'file'>, num_replicas=1, no_ack=False, template_owner=None, discard=<Discard.old: 'old'>, duplicate_window=120000000000, placement=None, mirror=None, sources=None)

Checking that configuration was indeed updated can be done easily:

In [14]:
assert (await js.stream_info(STREAM_NAME)).config.subjects == ["test.>", "demo.>"]

- Delete a stream

In [15]:
await js.stream_delete(STREAM_NAME)

IoNatsJetstreamApiV1StreamDeleteResponse(type='io.nats.jetstream.api.v1.stream_delete_response', success=True)

- Recreate the stream

In [16]:
# max_consumers is not passed this time, so the recreated stream reports max_consumers=-1.
await js.stream_create(STREAM_NAME, subjects=["test.>"])

IoNatsJetstreamApiV1StreamCreateResponse(type='io.nats.jetstream.api.v1.stream_create_response', config=Config(name='TEST', subjects=['test.>'], retention=<Retention.limits: 'limits'>, max_consumers=-1, max_msgs=-1, max_msgs_per_subject=-1, max_bytes=-1, max_age=0, max_msg_size=-1, storage=<Storage.file: 'file'>, num_replicas=1, no_ack=False, template_owner=None, discard=<Discard.old: 'old'>, duplicate_window=120000000000, placement=None, mirror=None, sources=None), state=State(messages=0, bytes=0, first_seq=0, first_ts=datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), last_seq=0, last_ts=datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), deleted=None, num_deleted=None, lost=None, consumer_count=0), created='2021-08-09T00:55:59.1782441Z', mirror=None, sources=None, cluster=None)

- Publish a message to the stream

In [17]:
await js.publish("test.demo", b"hello world")

- Check the stream state

In [18]:
response = await js.stream_info(STREAM_NAME)
response.state

State(messages=1, bytes=50, first_seq=1, first_ts=datetime.datetime(2021, 8, 9, 0, 55, 59, 225812, tzinfo=datetime.timezone.utc), last_seq=1, last_ts=datetime.datetime(2021, 8, 9, 0, 55, 59, 225812, tzinfo=datetime.timezone.utc), deleted=None, num_deleted=None, lost=None, consumer_count=0)

We can see that there is 1 message, with both `first_seq` and `last_seq` equal to 1. We can fetch a message by its sequence number:

In [19]:
response = await js.stream_msg_get(STREAM_NAME, 1)

In [20]:
# `data` is base64-encoded: 'aGVsbG8gd29ybGQ=' decodes to the published payload b"hello world".
response.message

Message(subject='test.demo', seq=1, data='aGVsbG8gd29ybGQ=', time=datetime.datetime(2021, 8, 9, 0, 55, 59, 225812, tzinfo=datetime.timezone.utc), hdrs=None)

- Delete a message

In [21]:
response = await js.stream_msg_delete(STREAM_NAME, 1)

In [22]:
response.success

True

- Check the stream state again

In [23]:
response = await js.stream_info(STREAM_NAME)

In [24]:
response.state

State(messages=0, bytes=0, first_seq=2, first_ts=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), last_seq=1, last_ts=datetime.datetime(2021, 8, 9, 0, 55, 59, 225812, tzinfo=datetime.timezone.utc), deleted=None, num_deleted=None, lost=None, consumer_count=0)

As you can see, `first_seq` is now 2 because we deleted the previous first message. Attempting to fetch a message that has not been received yet results in a JetStream error:

In [25]:
try:
    # Nothing has been published at sequence 2 yet, so this lookup is expected to fail.
    await js.stream_msg_get(STREAM_NAME, 2, raise_on_error=True)
except JetStreamError as err:  # JetStreamError was imported in an earlier cell
    print(f"Type: {err.type}")
    print(f"Code: {err.code}")
    print(f"Description: {err.description}")

Type: io.nats.jetstream.api.v1.stream_msg_get_response
Code: 404
Description: no message found


- Let's publish a new message to the stream

In [26]:
await js.publish("test.demo", b"hello again")

In [27]:
response = await js.stream_info(STREAM_NAME)
response.state

State(messages=1, bytes=50, first_seq=2, first_ts=datetime.datetime(2021, 8, 9, 0, 56, 0, 477100, tzinfo=datetime.timezone.utc), last_seq=2, last_ts=datetime.datetime(2021, 8, 9, 0, 56, 0, 477100, tzinfo=datetime.timezone.utc), deleted=None, num_deleted=None, lost=None, consumer_count=0)

- The second message is now available

In [28]:
await js.stream_msg_get(STREAM_NAME, 2)

IoNatsJetstreamApiV1StreamMsgGetResponse(type='io.nats.jetstream.api.v1.stream_msg_get_response', message=Message(subject='test.demo', seq=2, data='aGVsbG8gYWdhaW4=', time=datetime.datetime(2021, 8, 9, 0, 56, 0, 477100, tzinfo=datetime.timezone.utc), hdrs=None))

- Let's purge the stream

In [29]:
await js.stream_purge(STREAM_NAME)

IoNatsJetstreamApiV1StreamPurgeResponse(type='io.nats.jetstream.api.v1.stream_purge_response', success=True, purged=1)

- And fetch the stream info one last time

In [30]:
response = await js.stream_info(STREAM_NAME)
response.state

State(messages=0, bytes=0, first_seq=3, first_ts=datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), last_seq=2, last_ts=datetime.datetime(2021, 8, 9, 0, 56, 0, 477100, tzinfo=datetime.timezone.utc), deleted=None, num_deleted=None, lost=None, consumer_count=0)

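After the purge, `first_ts` is reset to its zero value, but `last_ts` still holds the timestamp of the last published message. Likewise, `first_seq` advances to 3 while `last_seq` stays at 2.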

- Backup a whole stream

TODO