-
Notifications
You must be signed in to change notification settings - Fork 128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature - Add support for RabbitMQ source #1911
Feature - Add support for RabbitMQ source #1911
Conversation
- Fixed failing tests
- Fixed failing tests
- Fixed lint - Added go_test_stub.txt
- Reverted docs.go
- Update required field on host and queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nitzangoldfeder, FYI
internal/pkg/pubsub/amqp/client.go
Outdated
amqp "github.com/rabbitmq/amqp091-go" | ||
) | ||
|
||
var ErrInvalidCredentials = errors.New("your kafka credentials are invalid. please verify you're providing the correct credentials") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nitzangoldfeder, we can remove this.
internal/pkg/pubsub/amqp/client.go
Outdated
msgs, err := ch.Consume( | ||
q.Name, // queue | ||
"", // consumer | ||
true, // auto-ack |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nitzangoldfeder, preferably, we should ack only when we've successfully written it to our internal queue. We should do this in the for loop when reading the message.
- Fixed PR comments - Added DLQ in case of failure - Ack / Nack messages - Updated UI
- Reverted docs.go
- Reverted api/ui/build/go_test_stub.txt
- Cleaned files
return err | ||
} | ||
defer ch.Close() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'll also be a good idea to close conn
here to as well
defer conn.Close()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nitzangoldfeder We closed ch
, but we didn't close conn
here.
internal/pkg/pubsub/amqp/client.go
Outdated
} | ||
} | ||
|
||
<-forever |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will block forever till the process exists. I don't think you need this, since the Consume()
function already runs in a goroutine.
The cancellation context is also not being used to signal to the Consume()
function when to return/end so that the process can release the connection
and channel
resources.
One way to approach this is to check if k.ctx
has been cancelled.
for d := range msgs {
if errors.Is(k.ctx.Err(), context.Canceled) {
return
}
// rest of code
}
And then close the connection
and channel
before you return from the Consume()
function.
func (k *Amqp) Consume() {
conn, err := k.dialer()
if err != nil {
log.WithError(err).Error("failed to instanciate a connection")
return
}
ch, err := conn.Channel()
if err != nil {
log.WithError(err).Error("failed to instanciate a channel")
return
}
defer conn.Close()
defer ch.Close()
// rest of code
}
- Using consume with ctx - Checking k.ctx is error and exiting function to release connections
- Removed check for ctx in msgs loop
|
||
for d := range msgs { | ||
ctx := context.Background() | ||
if err := k.handler(ctx, k.source, string(d.Body)); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nitzangoldfeder, we should prefer k.ctx
here instead of context.Background()
return err | ||
} | ||
defer ch.Close() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nitzangoldfeder We closed ch
, but we didn't close conn
here.
@Dotunj, this lgtm!, do you have any other requests? |
- Fixed linter
LGTM |
Hello Team
I've added support for RabbitMQ source for outgoing projects
The UI will enable to create a new pubsub called RabbitMQ / AMQP
Its required to provide shcema / host / port and queue name
On the backend:
Tested creation of this source on my local machine and validated that messages are being consumed on the backend