Skip to content

Commit

Permalink
AppConfig/advertiseSubscriptions as a safety switch for SDK (#295)
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
  • Loading branch information
dorjesinpo committed May 24, 2024
1 parent da8e609 commit a9116fc
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 78 deletions.
3 changes: 3 additions & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ const char MessagePropertiesFeatures::k_FIELD_NAME[] = "MPS";
const char MessagePropertiesFeatures::k_MESSAGE_PROPERTIES_EX[] =
"MESSAGE_PROPERTIES_EX";

const char SubscriptionsFeatures::k_FIELD_NAME[] = "SUBSCRIPTIONS";
const char SubscriptionsFeatures::k_CONFIGURE_STREAM[] = "CONFIGURE_STREAM";

// -----------------
// struct OptionType
// -----------------
Expand Down
9 changes: 9 additions & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,15 @@ struct MessagePropertiesFeatures {
static const char k_MESSAGE_PROPERTIES_EX[];
};

/// TEMPORARILY. This struct defines feature names related to Subscriptions
struct SubscriptionsFeatures {
/// Field name of the encoding features
static const char k_FIELD_NAME[];

// CONSTANTS
static const char k_CONFIGURE_STREAM[];
};

// =================
// struct OptionType
// =================
Expand Down
18 changes: 14 additions & 4 deletions src/groups/mqb/mqba/mqba_sessionnegotiator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ void loadBrokerIdentity(bmqp_ctrlmsg::ClientIdentity* identity,
.append(bmqp::MessagePropertiesFeatures::k_FIELD_NAME)
.append(":")
.append(bmqp::MessagePropertiesFeatures::k_MESSAGE_PROPERTIES_EX);

const mqbcfg::AppConfig& theConfig = mqbcfg::BrokerConfig::get();

if (theConfig.brokerVersion() == 999999 ||
(theConfig.configureStream() &&
theConfig.advertiseSubscriptions())) {
features.append(";")
.append(bmqp::SubscriptionsFeatures::k_FIELD_NAME)
.append(":")
.append(bmqp::SubscriptionsFeatures::k_CONFIGURE_STREAM);
}
}

identity->protocolVersion() = bmqp::Protocol::k_VERSION;
Expand All @@ -172,14 +183,13 @@ void loadBrokerIdentity(bmqp_ctrlmsg::ClientIdentity* identity,
// as a means to prevent SDK from generating 'V2'.
// Regardless of SDK, brokers now decompress MPs and send ConfigureStream.

if (mqbcfg::BrokerConfig::get().brokerVersion() == 999999) {
const mqbcfg::AppConfig& theConfig = mqbcfg::BrokerConfig::get();
if (theConfig.brokerVersion() == 999999) {
// Always advertise v2 (EX) support in test build (developer workflow,
// CI, Jenkins, etc).
shouldExtendMessageProperties = true;
}
else if (mqbcfg::BrokerConfig::get()
.messagePropertiesV2()
.advertiseV2Support()) {
else if (theConfig.messagePropertiesV2().advertiseV2Support()) {
// In non test build (i.e., dev and non-dev deployments, advertise v2
// (EX) support only if configured like that.

Expand Down
4 changes: 3 additions & 1 deletion src/groups/mqb/mqbcfg/mqbcfg.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
bmqconfConfig........: configuration for bmqconf
plugins..............: configuration for the plugins
msgPropertiesSupport.: information about if/how to advertise support for v2 message properties
configureStream......: send new ConfigureStream instead of old ConfigureQueue/>
configureStream......: send new ConfigureStream instead of old ConfigureQueue
advertiseSubscriptions.: temporarily control use of ConfigureStream in SDK/>
</documentation>
</annotation>
<sequence>
Expand All @@ -97,6 +98,7 @@
<element name='plugins' type='tns:Plugins'/>
<element name='messagePropertiesV2' type='tns:MessagePropertiesV2'/>
<element name='configureStream' type='boolean' default='false'/>
<element name='advertiseSubscriptions' type='boolean' default='false'/>
</sequence>
</complexType>

Expand Down
88 changes: 53 additions & 35 deletions src/groups/mqb/mqbcfg/mqbcfg_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5158,6 +5158,8 @@ const char AppConfig::DEFAULT_INITIALIZER_LATENCY_MONITOR_DOMAIN[] =

const bool AppConfig::DEFAULT_INITIALIZER_CONFIGURE_STREAM = false;

const bool AppConfig::DEFAULT_INITIALIZER_ADVERTISE_SUBSCRIPTIONS = false;

const bdlat_AttributeInfo AppConfig::ATTRIBUTE_INFO_ARRAY[] = {
{ATTRIBUTE_ID_BROKER_INSTANCE_NAME,
"brokerInstanceName",
Expand Down Expand Up @@ -5243,14 +5245,19 @@ const bdlat_AttributeInfo AppConfig::ATTRIBUTE_INFO_ARRAY[] = {
"configureStream",
sizeof("configureStream") - 1,
"",
bdlat_FormattingMode::e_TEXT},
{ATTRIBUTE_ID_ADVERTISE_SUBSCRIPTIONS,
"advertiseSubscriptions",
sizeof("advertiseSubscriptions") - 1,
"",
bdlat_FormattingMode::e_TEXT}};

// CLASS METHODS

const bdlat_AttributeInfo* AppConfig::lookupAttributeInfo(const char* name,
int nameLength)
{
for (int i = 0; i < 17; ++i) {
for (int i = 0; i < 18; ++i) {
const bdlat_AttributeInfo& attributeInfo =
AppConfig::ATTRIBUTE_INFO_ARRAY[i];

Expand Down Expand Up @@ -5300,6 +5307,8 @@ const bdlat_AttributeInfo* AppConfig::lookupAttributeInfo(int id)
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_MESSAGE_PROPERTIES_V2];
case ATTRIBUTE_ID_CONFIGURE_STREAM:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CONFIGURE_STREAM];
case ATTRIBUTE_ID_ADVERTISE_SUBSCRIPTIONS:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_ADVERTISE_SUBSCRIPTIONS];
default: return 0;
}
}
Expand All @@ -5325,6 +5334,7 @@ AppConfig::AppConfig(bslma::Allocator* basicAllocator)
, d_logsObserverMaxSize()
, d_isRunningOnDev()
, d_configureStream(DEFAULT_INITIALIZER_CONFIGURE_STREAM)
, d_advertiseSubscriptions(DEFAULT_INITIALIZER_ADVERTISE_SUBSCRIPTIONS)
{
}

Expand All @@ -5347,6 +5357,7 @@ AppConfig::AppConfig(const AppConfig& original,
, d_logsObserverMaxSize(original.d_logsObserverMaxSize)
, d_isRunningOnDev(original.d_isRunningOnDev)
, d_configureStream(original.d_configureStream)
, d_advertiseSubscriptions(original.d_advertiseSubscriptions)
{
}

Expand All @@ -5369,7 +5380,8 @@ AppConfig::AppConfig(AppConfig&& original) noexcept
d_configVersion(bsl::move(original.d_configVersion)),
d_logsObserverMaxSize(bsl::move(original.d_logsObserverMaxSize)),
d_isRunningOnDev(bsl::move(original.d_isRunningOnDev)),
d_configureStream(bsl::move(original.d_configureStream))
d_configureStream(bsl::move(original.d_configureStream)),
d_advertiseSubscriptions(bsl::move(original.d_advertiseSubscriptions))
{
}

Expand All @@ -5393,6 +5405,7 @@ AppConfig::AppConfig(AppConfig&& original, bslma::Allocator* basicAllocator)
, d_logsObserverMaxSize(bsl::move(original.d_logsObserverMaxSize))
, d_isRunningOnDev(bsl::move(original.d_isRunningOnDev))
, d_configureStream(bsl::move(original.d_configureStream))
, d_advertiseSubscriptions(bsl::move(original.d_advertiseSubscriptions))
{
}
#endif
Expand All @@ -5406,23 +5419,24 @@ AppConfig::~AppConfig()
AppConfig& AppConfig::operator=(const AppConfig& rhs)
{
if (this != &rhs) {
d_brokerInstanceName = rhs.d_brokerInstanceName;
d_brokerVersion = rhs.d_brokerVersion;
d_configVersion = rhs.d_configVersion;
d_etcDir = rhs.d_etcDir;
d_hostName = rhs.d_hostName;
d_hostTags = rhs.d_hostTags;
d_hostDataCenter = rhs.d_hostDataCenter;
d_isRunningOnDev = rhs.d_isRunningOnDev;
d_logsObserverMaxSize = rhs.d_logsObserverMaxSize;
d_brokerInstanceName = rhs.d_brokerInstanceName;
d_brokerVersion = rhs.d_brokerVersion;
d_configVersion = rhs.d_configVersion;
d_etcDir = rhs.d_etcDir;
d_hostName = rhs.d_hostName;
d_hostTags = rhs.d_hostTags;
d_hostDataCenter = rhs.d_hostDataCenter;
d_isRunningOnDev = rhs.d_isRunningOnDev;
d_logsObserverMaxSize = rhs.d_logsObserverMaxSize;
d_latencyMonitorDomain = rhs.d_latencyMonitorDomain;
d_dispatcherConfig = rhs.d_dispatcherConfig;
d_stats = rhs.d_stats;
d_networkInterfaces = rhs.d_networkInterfaces;
d_bmqconfConfig = rhs.d_bmqconfConfig;
d_plugins = rhs.d_plugins;
d_messagePropertiesV2 = rhs.d_messagePropertiesV2;
d_configureStream = rhs.d_configureStream;
d_dispatcherConfig = rhs.d_dispatcherConfig;
d_stats = rhs.d_stats;
d_networkInterfaces = rhs.d_networkInterfaces;
d_bmqconfConfig = rhs.d_bmqconfConfig;
d_plugins = rhs.d_plugins;
d_messagePropertiesV2 = rhs.d_messagePropertiesV2;
d_configureStream = rhs.d_configureStream;
d_advertiseSubscriptions = rhs.d_advertiseSubscriptions;
}

return *this;
Expand All @@ -5433,23 +5447,24 @@ AppConfig& AppConfig::operator=(const AppConfig& rhs)
AppConfig& AppConfig::operator=(AppConfig&& rhs)
{
if (this != &rhs) {
d_brokerInstanceName = bsl::move(rhs.d_brokerInstanceName);
d_brokerVersion = bsl::move(rhs.d_brokerVersion);
d_configVersion = bsl::move(rhs.d_configVersion);
d_etcDir = bsl::move(rhs.d_etcDir);
d_hostName = bsl::move(rhs.d_hostName);
d_hostTags = bsl::move(rhs.d_hostTags);
d_hostDataCenter = bsl::move(rhs.d_hostDataCenter);
d_isRunningOnDev = bsl::move(rhs.d_isRunningOnDev);
d_logsObserverMaxSize = bsl::move(rhs.d_logsObserverMaxSize);
d_brokerInstanceName = bsl::move(rhs.d_brokerInstanceName);
d_brokerVersion = bsl::move(rhs.d_brokerVersion);
d_configVersion = bsl::move(rhs.d_configVersion);
d_etcDir = bsl::move(rhs.d_etcDir);
d_hostName = bsl::move(rhs.d_hostName);
d_hostTags = bsl::move(rhs.d_hostTags);
d_hostDataCenter = bsl::move(rhs.d_hostDataCenter);
d_isRunningOnDev = bsl::move(rhs.d_isRunningOnDev);
d_logsObserverMaxSize = bsl::move(rhs.d_logsObserverMaxSize);
d_latencyMonitorDomain = bsl::move(rhs.d_latencyMonitorDomain);
d_dispatcherConfig = bsl::move(rhs.d_dispatcherConfig);
d_stats = bsl::move(rhs.d_stats);
d_networkInterfaces = bsl::move(rhs.d_networkInterfaces);
d_bmqconfConfig = bsl::move(rhs.d_bmqconfConfig);
d_plugins = bsl::move(rhs.d_plugins);
d_messagePropertiesV2 = bsl::move(rhs.d_messagePropertiesV2);
d_configureStream = bsl::move(rhs.d_configureStream);
d_dispatcherConfig = bsl::move(rhs.d_dispatcherConfig);
d_stats = bsl::move(rhs.d_stats);
d_networkInterfaces = bsl::move(rhs.d_networkInterfaces);
d_bmqconfConfig = bsl::move(rhs.d_bmqconfConfig);
d_plugins = bsl::move(rhs.d_plugins);
d_messagePropertiesV2 = bsl::move(rhs.d_messagePropertiesV2);
d_configureStream = bsl::move(rhs.d_configureStream);
d_advertiseSubscriptions = bsl::move(rhs.d_advertiseSubscriptions);
}

return *this;
Expand All @@ -5475,6 +5490,7 @@ void AppConfig::reset()
bdlat_ValueTypeFunctions::reset(&d_plugins);
bdlat_ValueTypeFunctions::reset(&d_messagePropertiesV2);
d_configureStream = DEFAULT_INITIALIZER_CONFIGURE_STREAM;
d_advertiseSubscriptions = DEFAULT_INITIALIZER_ADVERTISE_SUBSCRIPTIONS;
}

// ACCESSORS
Expand Down Expand Up @@ -5502,6 +5518,8 @@ AppConfig::print(bsl::ostream& stream, int level, int spacesPerLevel) const
printer.printAttribute("plugins", this->plugins());
printer.printAttribute("messagePropertiesV2", this->messagePropertiesV2());
printer.printAttribute("configureStream", this->configureStream());
printer.printAttribute("advertiseSubscriptions",
this->advertiseSubscriptions());
printer.end();
return stream;
}
Expand Down Expand Up @@ -5819,7 +5837,7 @@ Configuration::print(bsl::ostream& stream, int level, int spacesPerLevel) const
} // close package namespace
} // close enterprise namespace

// GENERATED BY BLP_BAS_CODEGEN_2024.04.18
// GENERATED BY @BLP_BAS_CODEGEN_VERSION@
// USING bas_codegen.pl -m msg --noAggregateConversion --noExternalization
// --noIdent --package mqbcfg --msgComponent messages mqbcfg.xsd
// ----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit a9116fc

Please sign in to comment.