Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work on allowing federates to restart in a dynamic way #2624

Merged
merged 21 commits into from Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3833f35
Work on allowing federates to restart in a dynamic way
phlptp Feb 29, 2024
8e04a5f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 29, 2024
2c27b70
clang-tidy fixes
phlptp Mar 1, 2024
4b1c248
run a disconnect operation to allow reset of a new connection for dyn…
phlptp Mar 2, 2024
c2d7e66
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 2, 2024
2b6a9c7
fix disconnect infinite loop
phlptp Mar 2, 2024
41658c1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 2, 2024
f6df716
clean up the test to be more consistent
phlptp Mar 3, 2024
4cada67
get reconnectable interfaces working
phlptp Mar 5, 2024
f587443
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2024
f1ccb35
pass endpoint reentrant test
phlptp Mar 6, 2024
33b5355
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 6, 2024
9f96868
allow cores to have dynamic federates after init.
phlptp Mar 7, 2024
c523714
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 7, 2024
cfa7db6
update docs with new features
phlptp Mar 8, 2024
21a8259
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 8, 2024
1db41d1
Update configuration_options_reference.md
trevorhardy Mar 12, 2024
aed276e
Update configuration_options_reference.md
trevorhardy Mar 12, 2024
839344c
resolve merge conflicts
phlptp Mar 13, 2024
de3199f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 13, 2024
e38df70
Apply suggestions from code review
phlptp Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Expand Up @@ -17,14 +17,14 @@ if(DEFINED ENV{VCPKG_ROOT} AND NOT DEFINED CMAKE_TOOLCHAIN_FILE AND NOT HELICS_D
set(CMAKE_TOOLCHAIN_FILE "$ENV{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake" CACHE STRING "")
endif()

project(HELICS VERSION 3.5.0)
project(HELICS VERSION 3.5.1)

# -----------------------------------------------------------------------------
# HELICS Version number
# -----------------------------------------------------------------------------
set(HELICS_VERSION_BUILD)
# use ISO date YYYY-MM-DD
set(HELICS_DATE "2024-02-06")
set(HELICS_DATE "2024-03-08")

set(HELICS_VERSION_UNDERSCORE
"${HELICS_VERSION_MAJOR}_${HELICS_VERSION_MINOR}_${HELICS_VERSION_PATCH}"
Expand Down
27 changes: 27 additions & 0 deletions docs/references/configuration_options_reference.md
Expand Up @@ -76,6 +76,7 @@ An example of one publication, subscription, named input, endpoint, and filter i
"terminate_on_error": false,
"source_only": false,
"observer": false,
"dynamic":false,
"only_update_on_change": false,
"only_transmit_on_change": false,
"broker_key": "",
Expand Down Expand Up @@ -394,6 +395,20 @@ Used to indicate to the federation that this federate produces no data and only

---

### `reentrant` [false]

_API:_ `helicsFederateInfoSetFlagOption`
[C++](https://docs.helics.org/en/latest/doxygen/classhelics_1_1CoreFederateInfo.html#a63efa7762fdc8a9d9869bbed6939448e)
| [C](api-reference/C_API.md#federateinfo)
| [Python](https://python.helics.org/api/capi-py/#helicsFederateInfoSetFlagOption)
| [Julia](https://julia.helics.org/latest/api/#HELICS.helicsFederateInfoSetFlagOption-Tuple{HELICS.FederateInfo,%20Union{Int64,%20HELICS.Lib.HelicsFederateFlags},%20Bool})

_Property's enumerated name:_ `HELICS_FLAG_REENTRANT` [38]

Used to indicate to the broker that this federate may disconnect and reconnect at a later time using the same federate name. Without setting this flag, the federate would have to rejoin under a different name and would be considered a new federate by the federation. This flag only has an effect if the "dynamic" flag is also set on the broker.

---

### `broker_key` [null]

_API:_ `helicsFederateSetBrokerKey`
Expand Down Expand Up @@ -912,6 +927,18 @@ _Property's enumerated name:_ `HELICS_HANDLE_OPTION_CONNECTION_OPTIONAL` [402]

When an interface requests a target it tries to find a match in the federation. If it cannot find a match at the time the federation is initialized, then the default is to generate a warning. This will not halt the federation but will display a log message. If the `connections_optional` flag is set on a federate all subsequent `addTarget` calls on any interface will not generate any message if the target is not available.

### `reconnectable` [false]

_API:_ `helicsFederateInfoSetFlagOption`
[C++](https://docs.helics.org/en/latest/doxygen/classhelics_1_1CoreFederateInfo.html#a63efa7762fdc8a9d9869bbed6939448e)
| [C](api-reference/C_API.md#federateinfo)
| [Python](https://python.helics.org/api/capi-py/#helicsFederateInfoSetFlagOption)
| [Julia](https://julia.helics.org/latest/api/#HELICS.helicsFederateInfoSetFlagOption-Tuple{HELICS.FederateInfo,%20Union{Int64,%20HELICS.Lib.HelicsFederateFlags},%20Bool})

_Property's enumerated name:_ `HELICS_HANDLE_OPTION_RECONNECTABLE` [412]

When used to connect to reentrant federates, the reconnectable option can be used to allow automatic reconnection to specific interfaces. This should be used on the interface that is not reentrant.

---

### `default_global` [false]
Expand Down
7 changes: 6 additions & 1 deletion docs/user-guide/advanced_topics/dynamic_federations.md
Expand Up @@ -10,8 +10,9 @@ Dynamic federations can be thought of as being composed of features in increasin
2. Allowing federates that only receive information ("observers") to join the federation after execution has begun. As a part of joining the co-simulation the observer would need the functionality in level one to successfully subscribe to the necessary publications of other federates.
3. Allowing the creation of new publications, endpoints, or filters by existing federates which other members of the federation could then connect to as targets.
4. Allowing federates to join the co-simulation after execution has begun and create arbitrary interfaces (publications, subscriptions, endpoints, etc). This relies on all previous levels of complexity being implemented.
5. Allowing federate to disconnect and reconnect throughout the simulation.

HELICS v3.1 supported levels 1 and 2. HELICS v3.4 supports full dynamic federations (level 4).
HELICS v3.1 supported levels 1 and 2. HELICS v3.4 supports full dynamic federations (level 4). Level 5 is supported as of version v3.5.1.

## Dynamic Subscriptions

Expand Down Expand Up @@ -59,6 +60,10 @@ Dynamic publications and endpoints are implemented as of HELICS v3.4. For any la

Given the above limitations, as of HELICS v3.4 fully dynamic federations are supported. By setting the `--dynamic` flag on the root broker of a federation and any intermediate brokers or cores to which dynamic federates may be added, federates may join the federation late. (HELICS have always been able to leave a federation early.) And as with observer federates, after calling `helicsFederateEnterExecutingMode()`

## Reentrant federates

As of v3.5.1 reentrant federates are allowed. They must be specified with the reentrant flag (`--reentrant`) and be used in a dynamic federation. This allows a federate to disconnect, then rejoin with the same name at a later time in the co-simulation. The second and later joins are like a dynamic federate in terms of timing. Interfaces connecting to a reentrant federate may use the HELICS_OPTION_FLAG_RECONNECTABLE to allow for dynamic/automatic reconnections when an interface with the same name is created from a reentrant federate. This may be done on the same core or a new/different core. The new (reentrant) federate does not inherit any properties (other than the name) from the previous federate with the same name.

## Example

An example of dynamic federation operation is under development though HELICS makes it very easy to support a dynamic federation. Simply add `--dynamic` to the broker initialization string for the root broker (if you are employing a [broker hierarchy](./broker_hierarchies.md)). For example, in a federation with four federates (one of which will be joining late), the call to start the broker is
Expand Down
7 changes: 7 additions & 0 deletions src/helics/application_api/FederateInfo.cpp
Expand Up @@ -147,6 +147,7 @@ static constexpr frozen::unordered_map<std::string_view, int, 95> flagStringsTra
{"interruptible", HELICS_FLAG_INTERRUPTIBLE},
{"debugging", HELICS_FLAG_DEBUGGING},
{"profiling", HELICS_FLAG_PROFILING},
{"reentrant", HELICS_FLAG_REENTRANT},
{"local_profiling_capture", HELICS_FLAG_LOCAL_PROFILING_CAPTURE},
{"profiling_marker", HELICS_FLAG_PROFILING_MARKER},
{"only_update_on_change", HELICS_FLAG_ONLY_UPDATE_ON_CHANGE},
Expand Down Expand Up @@ -215,6 +216,7 @@ static constexpr frozen::unordered_map<std::string_view, int, 95> flagStringsTra
{"buffer_data", HELICS_HANDLE_OPTION_BUFFER_DATA},
{"bufferdata", HELICS_HANDLE_OPTION_BUFFER_DATA},
{"bufferData", HELICS_HANDLE_OPTION_BUFFER_DATA},
{"reconnectable", HELICS_HANDLE_OPTION_RECONNECTABLE},
{"connection_required", HELICS_HANDLE_OPTION_CONNECTION_REQUIRED},
{"connectionrequired", HELICS_HANDLE_OPTION_CONNECTION_REQUIRED},
{"connectionRequired", HELICS_HANDLE_OPTION_CONNECTION_REQUIRED},
Expand All @@ -235,6 +237,7 @@ static constexpr frozen::unordered_map<std::string_view, int, 41> optionStringsT
{"buffer_data", HELICS_HANDLE_OPTION_BUFFER_DATA},
{"bufferdata", HELICS_HANDLE_OPTION_BUFFER_DATA},
{"bufferData", HELICS_HANDLE_OPTION_BUFFER_DATA},
{"reconnectable", HELICS_HANDLE_OPTION_RECONNECTABLE},
{"connectionoptional", HELICS_HANDLE_OPTION_CONNECTION_OPTIONAL},
{"connection_optional", HELICS_HANDLE_OPTION_CONNECTION_OPTIONAL},
{"connectionOptional", HELICS_HANDLE_OPTION_CONNECTION_OPTIONAL},
Expand Down Expand Up @@ -567,6 +570,10 @@ std::unique_ptr<helicsCLI11App> FederateInfo::makeCLIApp()
"--json",
useJsonSerialization,
"tell the core and federate to use JSON based serialization for all messages, to ensure compatibility");
app->add_flag_callback(
"--reentrant",
[this]() { setFlagOption(HELICS_FLAG_REENTRANT, true); },
"specify that the federate can be reentrant (meaning it can stop and be restarted with the same name");
app->add_option(
"--profiler",
profilerFileName,
Expand Down
11 changes: 11 additions & 0 deletions src/helics/core/BaseTimeCoordinator.cpp
Expand Up @@ -229,6 +229,11 @@ void BaseTimeCoordinator::removeDependent(GlobalFederateId fedID)
dependencies.removeDependent(fedID);
}

void BaseTimeCoordinator::resetDependency(GlobalFederateId fedID)
{
dependencies.resetDependency(fedID);
}

const DependencyInfo* BaseTimeCoordinator::getDependencyInfo(GlobalFederateId ofed) const
{
return dependencies.getDependencyInfo(ofed);
Expand Down Expand Up @@ -256,6 +261,12 @@ std::vector<GlobalFederateId> BaseTimeCoordinator::getDependents() const
return deps;
}

Time BaseTimeCoordinator::getLastGrant(GlobalFederateId fedId) const
{
const auto* dep = dependencies.getDependencyInfo(fedId);
return (dep == nullptr) ? timeZero : dep->lastGrant;
}

bool BaseTimeCoordinator::hasActiveTimeDependencies() const
{
return dependencies.hasActiveTimeDependencies();
Expand Down
7 changes: 6 additions & 1 deletion src/helics/core/BaseTimeCoordinator.hpp
Expand Up @@ -56,7 +56,9 @@ class BaseTimeCoordinator {
std::vector<GlobalFederateId> getDependencies() const;
/** get a reference to the dependents vector*/
std::vector<GlobalFederateId> getDependents() const;

/** get the last time grant from a federate */
Time getLastGrant(GlobalFederateId fedId) const;
/** set the source id for the time coordinator*/
void setSourceId(GlobalFederateId sourceId) { mSourceId = sourceId; }
GlobalFederateId sourceId() const { return mSourceId; }
/** compute updates to time values
Expand Down Expand Up @@ -103,6 +105,9 @@ class BaseTimeCoordinator {
/** remove a dependent
@param fedID the identifier of the federate to remove*/
virtual void removeDependent(GlobalFederateId fedID);
/** reset a dependency that has been reintroduced
@param fedID the identifier of the federate to reset*/
virtual void resetDependency(GlobalFederateId fedID);

void setAsChild(GlobalFederateId fedID);
void setAsParent(GlobalFederateId fedID);
Expand Down
78 changes: 55 additions & 23 deletions src/helics/core/CommonCore.cpp
Expand Up @@ -363,9 +363,11 @@ bool CommonCore::isConfigured() const
bool CommonCore::isOpenToNewFederates() const
{
auto cBrokerState = getBrokerState();
return ((cBrokerState != BrokerState::CREATED) && (cBrokerState < BrokerState::OPERATING) &&
(maxFederateCount == std::numeric_limits<int32_t>::max() ||
(federates.lock_shared()->size() < static_cast<size_t>(maxFederateCount))));
return (
(cBrokerState != BrokerState::CREATED) &&
(cBrokerState < (dynamicFederation ? BrokerState::TERMINATING : BrokerState::OPERATING)) &&
(maxFederateCount == std::numeric_limits<int32_t>::max() ||
(federates.lock_shared()->size() < static_cast<size_t>(maxFederateCount))));
}

bool CommonCore::hasError() const
Expand Down Expand Up @@ -726,6 +728,7 @@ LocalFederateId CommonCore::registerFederate(std::string_view name, const CoreFe
}
FederateState* fed = nullptr;
bool checkProperties{false};
bool newFed{true};
LocalFederateId local_id;
{
auto feds = federates.lock();
Expand All @@ -738,36 +741,54 @@ LocalFederateId CommonCore::registerFederate(std::string_view name, const CoreFe
local_id = LocalFederateId(static_cast<int32_t>(*fedid));
fed = (*feds)[*fedid];
} else {
throw(RegistrationFailure(
fmt::format("duplicate names {} detected: multiple federates with the same name",
name)));
if (dynamicFederation) {
fed = feds->find(std::string(name));
local_id = fed->local_id;
if (!fed->getOptionFlag(HELICS_FLAG_REENTRANT) ||
fed->getState() != FederateStates::FINISHED) {
throw(RegistrationFailure(fmt::format(
"duplicate names {} detected: multiple federates with the same name",
name)));
}
newFed = false;
} else {
throw(RegistrationFailure(fmt::format(
"duplicate names {} detected: multiple federates with the same name", name)));
}
}

if (feds->size() == 1) {
if (feds->size() == 1 && newFed) {
checkProperties = true;
}
}
if (fed == nullptr) {
throw(RegistrationFailure("unknown allocation error occurred"));
}
// setting up the Logger
// auto ptr = fed.get();
// if we are using the Logger, log all messages coming from the federates so they can control
// the level*/
fed->setLogger([this](int level, std::string_view ident, std::string_view message) {
sendToLogger(parent_broker_id, LogLevels::FED + level, ident, message);
});
if (newFed) {
// setting up the Logger
// auto ptr = fed.get();
// if we are using the Logger, log all messages coming from the federates so they can
// control the level*/
fed->setLogger([this](int level, std::string_view ident, std::string_view message) {
sendToLogger(parent_broker_id, LogLevels::FED + level, ident, message);
});

fed->local_id = local_id;
fed->setParent(this);
if (enable_profiling) {
fed->setOptionFlag(defs::PROFILING, true);
fed->local_id = local_id;
fed->setParent(this);
if (enable_profiling) {
fed->setOptionFlag(defs::PROFILING, true);
}
} else {
fed->reset(info);
}
ActionMessage reg(CMD_REG_FED);
reg.name(name);
if (observer || fed->getOptionFlag(HELICS_FLAG_OBSERVER)) {
setActionFlag(reg, observer_flag);
}
if (fed->getOptionFlag(HELICS_FLAG_REENTRANT)) {
setActionFlag(reg, reentrant_flag);
}
if (fed->indexGroup > 0) {
reg.counter = static_cast<int16_t>(fed->indexGroup);
}
Expand Down Expand Up @@ -3159,9 +3180,15 @@ void CommonCore::processPriorityCommand(ActionMessage&& command)
routeMessage(pngrep);
}
break;
case CMD_REG_FED:
case CMD_REG_FED: {
// this one in the core needs to be the thread-safe version of getFederate
loopFederates.insert(command.name(), no_search, getFederate(command.name()));
auto insertRes =
loopFederates.insert(command.name(), no_search, getFederate(command.name()));
if (!insertRes && checkActionFlag(command, reentrant_flag)) {
auto lfed = loopFederates.find(command.name());
lfed->state = OperatingState::OPERATING;
}

if (global_broker_id_local != parent_broker_id) {
// forward on to Broker
command.source_id = global_broker_id_local;
Expand All @@ -3170,7 +3197,8 @@ void CommonCore::processPriorityCommand(ActionMessage&& command)
// this will get processed when this core is assigned a global hid
delayTransmitQueue.push(std::move(command));
}
break;

} break;
case CMD_BROKER_LOCATION: {
command.setAction(CMD_PROTOCOL);
command.messageID = NEW_BROKER_INFORMATION;
Expand Down Expand Up @@ -4610,7 +4638,7 @@ void CommonCore::processInitRequest(ActionMessage& cmd)
}
cmd.source_id = global_broker_id_local;
transmit(parent_route_id, cmd);
} else if (checkActionFlag(cmd, observer)) {
} else if (checkActionFlag(cmd, observer) || dynamicFederation) {
cmd.source_id = global_broker_id_local;
transmit(parent_route_id, cmd);
}
Expand Down Expand Up @@ -4659,7 +4687,11 @@ void CommonCore::processInitRequest(ActionMessage& cmd)
}
} else if (checkActionFlag(cmd, observer_flag) ||
checkActionFlag(cmd, dynamic_join_flag)) {
routeMessage(cmd);
loopFederates.apply([&cmd](auto& fed) {
if (fed->getState() == FederateStates::CREATED) {
fed->addAction(cmd);
}
});
}
}

Expand Down