Skip to content

Commit

Permalink
Add bridge_session_expiry_interval option for MQTT v5.0 bridges.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Nov 14, 2021
1 parent cf3be2e commit 99eddeb
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 8 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Expand Up @@ -85,6 +85,7 @@ Broker:
- Allow multiple instances of mosquitto to run as services on Windows. See
README-windows.txt.
- Add ability to deny wildcard subscriptions for a role to the dynsec plugin.
- Add bridge_session_expiry_interval option for MQTT v5.0 bridges.

Client library:
- Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair
Expand Down
15 changes: 15 additions & 0 deletions man/mosquitto.conf.5.xml
Expand Up @@ -1828,6 +1828,21 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
<replaceable>mqttv311</replaceable>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_session_expiry_interval</option> <replaceable>interval</replaceable></term>
<listitem>
<para>
If the bridge is using MQTT v5.0 then use
<option>bridge_session_expiry_interval</option>
to set the session expiry interval. Set to
<replaceable>0</replaceable> to have the session expire
immediately when the connection drops. Set to
<replaceable>0xFFFFFFFF</replaceable> to have an infinite
expiry interval. Otherwise the expiry interval is set to
the number of seconds that you specify. Defaults to 0.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_tcp_keepalive</option> <replaceable>idle</replaceable> <replaceable>interval</replaceable> <replaceable>counter</replaceable></term>
<listitem>
Expand Down
8 changes: 8 additions & 0 deletions mosquitto.conf
Expand Up @@ -778,6 +778,14 @@
# remote broker, and delivered when the bridge reconnects.
#cleansession false

# If the bridge is using MQTT v5.0 then use bridge_session_expiry_interval to
# set the session expiry interval. Set to 0 to have the session expire
# immediately when the connection drops. Set to 0xFFFFFFFF to have an infinite
# expiry interval. Otherwise the expiry interval is set to the number of
# seconds that you specify.
# Defaults to 0.
#bridge_session_expiry_interval 0

# Set the amount of time a bridge using the lazy start type must be idle before
# it will be stopped. Defaults to 60 seconds.
#idle_timeout 60
Expand Down
34 changes: 26 additions & 8 deletions src/bridge.c
Expand Up @@ -366,7 +366,9 @@ static int bridge__connect_step2(struct mosquitto *context)
int bridge__connect_step3(struct mosquitto *context)
{
int rc;
mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL;
mosquitto_property topic_alias_max;
mosquitto_property session_expiry_interval;
mosquitto_property *properties = NULL;

rc = net__socket_connect_step3(context, context->bridge->addresses[context->bridge->cur_address].address);
if(rc > 0){
Expand Down Expand Up @@ -394,14 +396,21 @@ int bridge__connect_step3(struct mosquitto *context)
#endif

if(context->bridge->max_topic_alias != 0){
topic_alias_max.next = NULL;
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max_prop = &topic_alias_max;
topic_alias_max.next = properties;
properties = &topic_alias_max;
}
if(context->bridge->session_expiry_interval != 0){
session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
session_expiry_interval.client_generated = false;
session_expiry_interval.next = properties;
properties = &session_expiry_interval;
}

rc = send__connect(context, context->keepalive, context->clean_start, topic_alias_max_prop);
rc = send__connect(context, context->keepalive, context->clean_start, properties);
if(rc == MOSQ_ERR_SUCCESS){
return MOSQ_ERR_SUCCESS;
}else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){
Expand Down Expand Up @@ -429,7 +438,9 @@ int bridge__connect(struct mosquitto *context)
uint8_t notification_payload;
struct mosquitto__bridge_topic *cur_topic;
uint8_t qos;
mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL;
mosquitto_property topic_alias_max;
mosquitto_property session_expiry_interval;
mosquitto_property *properties = NULL;

if(!context || !context->bridge) return MOSQ_ERR_INVAL;

Expand Down Expand Up @@ -550,14 +561,21 @@ int bridge__connect(struct mosquitto *context)
#endif

if(context->bridge->max_topic_alias){
topic_alias_max.next = NULL;
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max_prop = &topic_alias_max;
topic_alias_max.next = properties;
properties = &topic_alias_max;
}
if(context->bridge->session_expiry_interval != 0){
session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
session_expiry_interval.client_generated = false;
session_expiry_interval.next = properties;
properties = &session_expiry_interval;
}

rc2 = send__connect(context, context->keepalive, context->clean_start, topic_alias_max_prop);
rc2 = send__connect(context, context->keepalive, context->clean_start, properties);
if(rc2 == MOSQ_ERR_SUCCESS){
return rc;
}else if(rc2 == MOSQ_ERR_ERRNO && errno == ENOTCONN){
Expand Down
19 changes: 19 additions & 0 deletions src/conf.c
Expand Up @@ -1292,6 +1292,25 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
}
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_session_expiry_interval")){
#if defined(WITH_BRIDGE)
if(reload) continue; /* Bridges not valid for reloading. */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
if(conf__parse_int(&token, "bridge_session_expiry_interval", &tmp_int, &saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: bridge_session_expiry_interval must not be negative.");
return MOSQ_ERR_INVAL;
}else if((uint64_t)tmp_int > (uint64_t)UINT32_MAX){
log__printf(NULL, MOSQ_LOG_ERR, "Error: bridge_session_expiry_interval must be lower than %u.", UINT32_MAX);
return MOSQ_ERR_INVAL;
}
cur_bridge->session_expiry_interval = (uint32_t)tmp_int;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_tcp_keepalive")){
#ifdef WITH_BRIDGE
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto_broker_internal.h
Expand Up @@ -581,6 +581,7 @@ struct mosquitto__bridge{
int backoff_cap;
int threshold;
uint32_t maximum_packet_size;
uint32_t session_expiry_interval;
bool lazy_reconnect;
bool attempt_unsubscribe;
bool initial_notification_done;
Expand Down

0 comments on commit 99eddeb

Please sign in to comment.