NATS JetStream output not acknowledging delivered messages #1433

Closed
davidandradeduarte opened this issue Sep 5, 2022 · 3 comments
Labels: needs more info, outputs, question

Comments

@davidandradeduarte
Contributor

davidandradeduarte commented Sep 5, 2022

Hi 👋

I'm trying to use the nats_jetstream output, but it seems like it's not acknowledging delivered messages.
This simple pipeline doesn't work for me:

input:
  generate:
    mapping: root = {"test":"message","id":uuid_v4()}
    count: 5

output:
  nats_jetstream:
    urls:
      - nats://localhost:4222
    subject: foo

The first 5 messages are delivered without any error. After that, it keeps trying to deliver (this time with a timeout error) in an infinite loop.

I have a repository with repro steps, if you'd like to use it.
Not sure if I'm doing something wrong, as this is such a simple pipeline.
I've tried to look at how this output acks messages, but had no luck. Also, looking at the integration tests for this output, they don't seem to send anything to the input; they just test connectivity.

Anyway, hope someone can help. 🤗

@mihaitodor
Collaborator

Works for me on my MacBook...

> # Remove everything from `~/.nsc` and ~/.config/nats
> nats-server --version
nats-server: v2.8.4
> nats-server -js
[5013] 2022/09/05 16:32:31.876112 [INF] Starting nats-server
[5013] 2022/09/05 16:32:31.876253 [INF]   Version:  2.8.4
[5013] 2022/09/05 16:32:31.876256 [INF]   Git:      [not set]
[5013] 2022/09/05 16:32:31.876261 [INF]   Name:     NDI4H73VAL6L4IGAFQYQT46TYPLGVLAQJHRHZMIA5WE5AWMV3VQGOPU6
[5013] 2022/09/05 16:32:31.876267 [INF]   Node:     iKRCTI23
[5013] 2022/09/05 16:32:31.876269 [INF]   ID:       NDI4H73VAL6L4IGAFQYQT46TYPLGVLAQJHRHZMIA5WE5AWMV3VQGOPU6
[5013] 2022/09/05 16:32:31.880842 [INF] Starting JetStream
[5013] 2022/09/05 16:32:31.881162 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[5013] 2022/09/05 16:32:31.881170 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[5013] 2022/09/05 16:32:31.881174 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[5013] 2022/09/05 16:32:31.881175 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[5013] 2022/09/05 16:32:31.881177 [INF]
[5013] 2022/09/05 16:32:31.881179 [INF]          https://docs.nats.io/jetstream
[5013] 2022/09/05 16:32:31.881180 [INF]
[5013] 2022/09/05 16:32:31.881182 [INF] ---------------- JETSTREAM ----------------
[5013] 2022/09/05 16:32:31.881186 [INF]   Max Memory:      12.00 GB
[5013] 2022/09/05 16:32:31.881189 [INF]   Max Storage:     16.71 GB
[5013] 2022/09/05 16:32:31.881191 [INF]   Store Directory: "/var/folders/6d/qn3_3gvs5vb3t1cpmjkcm_jc0000gn/T/nats/jetstream"
[5013] 2022/09/05 16:32:31.883365 [INF] -------------------------------------------
[5013] 2022/09/05 16:32:31.884692 [INF] Listening for client connections on 0.0.0.0:4222
[5013] 2022/09/05 16:32:31.884904 [INF] Server is ready
^C[5013] 2022/09/05 16:35:55.104226 [INF] Initiating Shutdown...
[5013] 2022/09/05 16:35:55.104988 [INF] Initiating JetStream Shutdown...
[5013] 2022/09/05 16:35:55.107473 [INF] JetStream Shutdown
[5013] 2022/09/05 16:35:55.108339 [INF] Server Exiting..
> nats stream info foo
Information for Stream foo created 2022-09-05T16:33:10+01:00

Configuration:

             Subjects: foo
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 1
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited


State:

             Messages: 20
                Bytes: 1.9 KiB
             FirstSeq: 1 @ 2022-09-05T15:33:24 UTC
              LastSeq: 20 @ 2022-09-05T15:35:34 UTC
     Active Consumers: 0

> nats sub foo
16:35:18 Subscribing on foo
[#1] Received on "foo" with reply "_INBOX.73qGS98drYD0yi7N1c6wFI.ZWtlP5iv"
{"id":"87e2960a-a746-4c97-ba09-5761cf2b89de","test":"message"}

[#2] Received on "foo" with reply "_INBOX.73qGS98drYD0yi7N1c6wFI.Sed7y8h4"
{"id":"f9ddbf1e-019d-40ad-ae29-139b92ab88ff","test":"message"}

[#3] Received on "foo" with reply "_INBOX.73qGS98drYD0yi7N1c6wFI.WP6KzLBn"
{"id":"e1f59ae2-aa9e-4f8d-aea1-aecfc06d872b","test":"message"}

[#4] Received on "foo" with reply "_INBOX.73qGS98drYD0yi7N1c6wFI.uXYIUHIu"
{"id":"b0c0779d-b0fc-4add-b64e-880800da5cf6","test":"message"}

[#5] Received on "foo" with reply "_INBOX.73qGS98drYD0yi7N1c6wFI.uAGyeVJy"
{"id":"bd2c59be-3b3c-4fe6-aad1-3c46d9a2d7d4","test":"message"}

[#6] Received on "foo" with reply "_INBOX.pIWtdp4RsPHKKCi3hg0nwy.plqO6JkN"
{"id":"38a6d14e-54c7-48a9-a813-8451a26f3cbd","test":"message"}

[#7] Received on "foo" with reply "_INBOX.pIWtdp4RsPHKKCi3hg0nwy.axyrMDXH"
{"id":"b427d512-5014-4632-9b89-d3e3176e3ea2","test":"message"}

[#8] Received on "foo" with reply "_INBOX.pIWtdp4RsPHKKCi3hg0nwy.CaYmLZLQ"
{"id":"acd82570-ac77-48f8-801b-b77307832b62","test":"message"}

[#9] Received on "foo" with reply "_INBOX.pIWtdp4RsPHKKCi3hg0nwy.tqAWzdxA"
{"id":"0213b021-4688-44bc-bd92-38fe6bd9bd04","test":"message"}

[#10] Received on "foo" with reply "_INBOX.pIWtdp4RsPHKKCi3hg0nwy.XOqYhBzf"
{"id":"c7cc1fe5-75d7-4d3d-8d17-b065a41925c1","test":"message"}

16:35:55 Disconnected due to: EOF, will attempt reconnect
^C
> benthos --version
Version: 4.6.0
> cat config.yaml
input:
  generate:
    mapping: root = {"test":"message","id":uuid_v4()}
    count: 5

output:
  nats_jetstream:
    urls:
      - nats://localhost:4222
    subject: foo
> benthos -c ./config.yaml
INFO Running main config from specified file       @service=benthos path=./config.yaml
INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Sending NATS messages to JetStream subject: foo  @service=benthos label="" path=root.output
INFO Pipeline has terminated. Shutting down the service  @service=benthos
> benthos -c ./config.yaml
INFO Running main config from specified file       @service=benthos path=./config.yaml
INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Sending NATS messages to JetStream subject: foo  @service=benthos label="" path=root.output
INFO Pipeline has terminated. Shutting down the service  @service=benthos

Would you mind trying again? Maybe you're using an outdated version of JetStream, or you have some specific JetStream configuration that's causing trouble...

@mihaitodor mihaitodor added outputs Any tasks or issues relating specifically to outputs needs more info An issue that may be a bug or useful feature, but requires more information question labels Sep 5, 2022
@davidandradeduarte
Contributor Author

Thanks, @mihaitodor.

I've found that the issue only happens when there's no stream for that subject.
So I guess I'll need to manually create the NATS streams beforehand.
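
For anyone landing here with the same symptom, a minimal sketch of creating the stream up front with the `nats` CLI used elsewhere in this thread. `ensure_stream` is a hypothetical helper name, not part of the CLI or Benthos. The sketch assumes that `nats stream info` exits non-zero when the stream is missing, and that the installed CLI is recent enough to support `--defaults` on `nats stream add`.

```shell
#!/bin/sh
# Sketch: make sure a JetStream stream captures a subject before publishing to it.
# ensure_stream is a hypothetical helper, not part of the nats CLI or Benthos.
ensure_stream() {
  stream="$1"   # stream name, e.g. foo
  subject="$2"  # subject the stream should capture, e.g. foo

  # Assumption: `nats stream info` exits non-zero when the stream does not exist.
  if nats stream info "$stream" >/dev/null 2>&1; then
    echo "stream $stream already exists"
  else
    # --defaults accepts the CLI's default for every setting not given here.
    nats stream add "$stream" --subjects "$subject" --defaults >/dev/null &&
      echo "stream $stream created"
  fi
}
```

With the config from this issue, this could be run as `ensure_stream foo foo && benthos -c ./config.yaml`, so the output only starts publishing once a stream exists for `foo`.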

I'll probably add a processor to handle stream creation, since I want the streams to be dynamically named.
Anyway, thank you.

Closing the issue 👍

@mihaitodor
Collaborator

@davidandradeduarte Yeah, I think that's a JetStream gotcha. I think NATS itself doesn't require it, but I haven't checked.
