Skip to content

Commit

Permalink
Fix TODO in AmqpConsumerActor; add "to" AMQP property.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Oct 31, 2019
1 parent 5a51063 commit ea5fac6
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,15 @@ private void throttleMessageConsumer() {
return new ThrottleState(interval, nextMessages);
});
if (state.currentMessagePerInterval >= throttlingLimit) {
// TODO: add monitoring logs after merge
log.info("Stopping message consumer, message limit of {}/{} exceeded.", throttlingLimit,
throttlingInterval);
stopMessageConsumer();
// calculate timestamp of next interval when the consumer should be restarted
final long restartConsumerAt = (interval + 1) * throttlingInterval.toMillis();
getSelf().tell(new RestartMessageConsumer(restartConsumerAt), ActorRef.noSender());
inboundMonitor.getCounter().recordFailure();
inboundMonitor.getLogger()
.failure("Source <{0}> is rate-limited due to excessive messaging.", sourceAddress);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class AmqpPublisherActor extends BasePublisherActor<AmqpTarget> {
JMS_HEADER_MAPPING.put("message-id", wrap(Message::setJMSMessageID));
JMS_HEADER_MAPPING.put("reply-to", wrap((message, value) -> message.setJMSReplyTo(new JmsQueue(value))));
JMS_HEADER_MAPPING.put("subject", wrap(Message::setJMSType));
JMS_HEADER_MAPPING.put("to", wrap(Message::setJMSType));
JMS_HEADER_MAPPING.put(DittoHeaderDefinition.CONTENT_TYPE.getKey(), wrap((message, value) -> {
if (message instanceof JmsMessage) {
final JmsMessageFacade facade = ((JmsMessage) message).getFacade();
Expand Down

0 comments on commit ea5fac6

Please sign in to comment.