Keep a table of cluster members #3234
Comments
Also, the protocol by which we do member discovery must support symmetric encryption with a shared key. This will allow us to work in untrusted networks.
This will break backward compatibility. We can add a synonym for "cluster" and deprecate "cluster" in future major releases. Will that work?
@kyukhin 2.x is not released yet, so I don't see anything criminal in breaking backward compatibility there. The current "cluster" thing just confuses me, to be honest.
box/info.h defines the info_handler interface with a set of virtual functions. It allows hiding Lua from code that does not depend on this language, and is used in things like index:info() and box.info() to build a Lua table with some info. But it does not depend on box/, so move it to src/. Also, this API is needed for the forthcoming SWIM module, which is going to be placed into src/. Needed for #3234
First, the global strategy is to get rid of C++. Second, the coming SWIM module needs a socket API wrapped with Tarantool diagnostics, and some socket utilities that ended up in sio by coincidence. Needed for #3234
SWIM - Scalable Weakly-consistent Infection-style process group Membership protocol. The original SWIM is described in the paper of the same name by Abhinandan Das, Indranil Gupta, and Ashish Motivala. Only a superficial description is presented here. SWIM consists of two components: dissemination and failure detection. Both work over UDP on each cluster member. The dissemination component discovers cluster members: their statuses, addresses, and some additional info. Each member periodically sends a part of its known members table to another random member. The failure detection component detects which members are dead, meaning they do not respond to requests. The failure detector sends pings to randomly selected members. When a member does not respond to requests for some time, it is marked as dead and deleted from the members tables of other cluster participants. This patch implements only the dissemination component and some basic things. Part of #3234
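The dissemination step described above can be sketched in Python roughly as follows. This is a toy in-memory model, not Tarantool's code: the `Node` class, the `gossip_step` function, and the `fanout` parameter are hypothetical names, and a method call stands in for a UDP packet.

```python
import random


class Node:
    """Toy cluster member: its table maps a member address to a status."""

    def __init__(self, addr, known=()):
        self.addr = addr
        self.table = {addr: "alive"}
        for other in known:
            self.table[other] = "alive"

    def pick_part(self, rng, fanout):
        # Choose a random part of the known members table to send.
        addrs = rng.sample(sorted(self.table), min(fanout, len(self.table)))
        return {a: self.table[a] for a in addrs}

    def receive(self, part):
        # Merge: learn members that were unknown so far.
        for addr, status in part.items():
            self.table.setdefault(addr, status)


def gossip_step(nodes, rng, fanout=2):
    """Every node sends a part of its table to one random known member."""
    for node in nodes.values():
        targets = [a for a in node.table if a != node.addr]
        if targets:
            nodes[rng.choice(targets)].receive(node.pick_part(rng, fanout))


rng = random.Random(0)
# Each node initially knows only one other node.
nodes = {
    "a:1": Node("a:1", known=["b:2"]),
    "b:2": Node("b:2", known=["c:3"]),
    "c:3": Node("c:3", known=["a:1"]),
}
for _ in range(20):
    gossip_step(nodes, rng)
```

In this model a node only ever adds members. Status changes and removal of dead members belong to the failure detection component and are left out.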
The failure detection component finds which members are already dead. Public API:
swim.cfg({server = <uri>, members = <array of uris>, heartbeat = <seconds>}) Configures the SWIM module.
@server - URI of the UDP server to which other cluster members will send SWIM messages. It should have the format "ip:port".
@members - array of URIs explicitly defined by a user. These members are never deleted from the members table until they are removed from the configuration explicitly. SWIM downloads their members tables from them, merges them with its own, and repeats.
@heartbeat - how often to send a part of the members table to another member. Note that it is not how often to send the whole table, nor how often to send the table to all members. It is only one step of the protocol.
swim.stop() Stops the SWIM module: shuts down the server, closes the socket, destroys queues, frees memory. Note that swim.cfg can be called again after it.
swim.info() Shows info about each known member in the format: { ["ip:port"] = { status = <alive/dead>, incarnation = <growing unsigned number> } }
Closes #3234
SWIM - Scalable Weakly-consistent Infection-style Process Group Membership Protocol. It consists of 2 components: events dissemination and failure detection, and stores in memory a table of known remote hosts - members. Some SWIM implementations also have an additional component: anti-entropy - a periodic broadcast of a random subset of the members table.
Each SWIM component differs from the others in both message structure and goals, and they could even be sent in different messages. But SWIM describes piggybacking of messages: a ping message can piggyback a dissemination one. SWIM has a main operating cycle during which it randomly chooses members from the members table and sends them events + ping. Answers are processed asynchronously, outside of the main cycle.
Random selection provides an even network load of about 1 message per member per protocol step, regardless of the cluster size. Without randomness, a member could receive N messages in a protocol step, where N is the cluster size, since all other members would choose the same member on each step.
SWIM also describes a kind of fairness: when selecting the next member to ping, the protocol prefers LRU (least recently used) members. In code it would be too complicated, so Tarantool's implementation is slightly different and easier. Tarantool splits protocol operation into rounds. At the beginning of a round all members are randomly reordered and linked into a list. At each round step a member is popped from the list head, a message is sent to it, and it waits for the next round. In such an implementation all the random selection of the original SWIM is executed once per round. The round is effectively 'planned'. A list is used instead of an array since new members can be added to its tail without realloc, and dead members can be removed just as easily.
Tarantool also implements the third component - anti-entropy. Why is it needed and even vital? Consider an example: two SWIM nodes, both alive. Nothing happens, so the events list is empty, and only pings are sent periodically. Then a third node appears. It knows about one of the existing nodes. How should it learn about the other one? The cluster is stable, there are no new events, so the only chance is to wait until another server stops and an event about it is broadcast. Anti-entropy is an extra simple component: it just piggybacks a random part of the members table with each regular ping. In the example above, the new node will learn about the remaining node via anti-entropy messages of the node it already knows.
This commit introduces the first component - anti-entropy. With this component a member can discover other members, but can not detect which of them are already dead. That is a part of the next commit. Part of #3234
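The round planning described above (shuffle once per round, pop from the list head at each step) might look like the following Python sketch. It is an illustration under assumptions, not Tarantool's C code: `RoundScheduler` and its methods are hypothetical names, and a `deque` stands in for the linked list.

```python
import random
from collections import deque


class RoundScheduler:
    """Plans a round by shuffling all members once, then pops one per step."""

    def __init__(self, members, seed=None):
        self.members = list(members)
        self.rng = random.Random(seed)
        self.queue = deque()  # members still waiting in the current round

    def _plan_round(self):
        order = self.members[:]
        self.rng.shuffle(order)  # all random selection happens here
        self.queue = deque(order)

    def add_member(self, member):
        self.members.append(member)
        if self.queue:
            # A round is in progress: the new member joins its tail.
            self.queue.append(member)

    def remove_member(self, member):
        self.members.remove(member)
        if member in self.queue:
            self.queue.remove(member)

    def next_target(self):
        """Return the member to message at this round step, or None."""
        if not self.queue:
            self._plan_round()
        return self.queue.popleft() if self.queue else None


sched = RoundScheduler(["a", "b", "c", "d"], seed=1)
first_round = [sched.next_target() for _ in range(4)]
second_round = [sched.next_target() for _ in range(4)]
```

Each member is visited exactly once per round, which gives the fairness the original protocol gets from preferring least recently used members, while the order still changes from round to round.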
The failure detection component finds which members are already dead. Part of #3234
The dissemination component broadcasts events about member status updates. Public API:
swim.cfg({server = <uri>, members = <array of uris>, heartbeat = <seconds>}) Configures the SWIM module.
@server - URI of the UDP server to which other cluster members will send SWIM messages. It should have the format "ip:port".
@members - array of URIs explicitly defined by a user. These members are never deleted from the members table until they are removed from the configuration explicitly. SWIM downloads their members tables from them, merges them with its own, and repeats.
@heartbeat - how often to send a part of the members table to another member. Note that it is not how often to send the whole table, nor how often to send the table to all members. It is only one step of the protocol.
swim.stop() Stops the SWIM module: shuts down the server, closes the socket, destroys queues, frees memory. Note that swim.cfg can be called again after it.
swim.info() Shows info about each known member in the format: { ["ip:port"] = { status = <alive/dead>, incarnation = <growing unsigned number> } }
Closes #3234
Now a member dies "gradually". After some failed pings it is declared suspected. After more failed pings it is finally declared dead. New members in a config are declared suspected because the instance can not be sure whether they are alive or not. Follow up #3234
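The gradual transition can be pictured as a small state machine. The Python sketch below is only an illustration: the thresholds `suspect_after` and `dead_after` and the method names are assumptions, not values or names taken from the patch, and incarnation handling is left out.

```python
ALIVE, SUSPECTED, DEAD = "alive", "suspected", "dead"


class MemberState:
    """Tracks one remote member as seen by the local instance."""

    def __init__(self, suspect_after=2, dead_after=5):
        # Hypothetical thresholds: numbers of failed pings in a row.
        self.suspect_after = suspect_after
        self.dead_after = dead_after
        self.failed_pings = 0
        # A freshly configured member is not known to be alive yet.
        self.status = SUSPECTED

    def on_ack(self):
        """The member answered a ping."""
        self.failed_pings = 0
        self.status = ALIVE

    def on_ping_timeout(self):
        """A ping was not answered in time."""
        self.failed_pings += 1
        if self.failed_pings >= self.dead_after:
            self.status = DEAD
        elif self.failed_pings >= self.suspect_after:
            self.status = SUSPECTED


member = MemberState()
history = [member.status]
member.on_ack()
history.append(member.status)
for _ in range(5):
    member.on_ping_timeout()
    history.append(member.status)
```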
During a SWIM round, a message consisting of at most 3 sections is handed out. Parts of the message change rarely: on a member attribute update or on removal of a member. So it is possible to cache the message and send it during several round steps in a row, or even not rebuild it for the whole round. Follow up #3234
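A minimal sketch of that caching idea in Python, assuming a hypothetical `RoundMessageCache` class and using JSON as a stand-in for the real packet encoding:

```python
import json


class RoundMessageCache:
    """Rebuilds the round message only after the member table changes."""

    def __init__(self, members):
        self.members = members  # address -> status
        self._cached = None
        self.builds = 0  # counts how many times the message was rebuilt

    def _build(self):
        self.builds += 1
        # JSON is only a placeholder for the actual wire format.
        return json.dumps(sorted(self.members.items())).encode()

    def update_member(self, addr, status):
        self.members[addr] = status
        self._cached = None  # attributes changed: invalidate

    def remove_member(self, addr):
        self.members.pop(addr, None)
        self._cached = None  # a member was removed: invalidate

    def message(self):
        if self._cached is None:
            self._cached = self._build()
        return self._cached


cache = RoundMessageCache({"10.0.0.1:3301": "alive", "10.0.0.2:3301": "alive"})
first = cache.message()
second = cache.message()  # served from the cache, no rebuild
cache.update_member("10.0.0.2:3301", "dead")
third = cache.message()  # rebuilt after the update
```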
Similar methods validate their arguments: add_member, remove_member. Validate here as well for consistency. Part of #3234
SWIM as a library can be useful not only for server internals, but for users as well. This commit exposes Lua bindings to SWIM C API. Here only basic bindings are introduced to create, delete, quit, check a SWIM instance. With sanity tests. Part of #3234
Expose methods to add, remove, probe members by uri, uuid. Expose broadcast method to probe multiple members by port. Part of #3234
Expose API to search members by UUID, to read their attributes, to set payload. Part of #3234
Expose an iterators API to be able to iterate over a member table in a 'for' loop as if it were just a Lua table. Part of #3234
Sometimes, especially in tests, it is useful to make something like this: s:add_member({uuid = member:uuid(), uri = member:uri()}) But member:uuid() is cdata struct tt_uuid. This commit allows that. Part of #3234
Users of the Lua SWIM module will likely use Lua objects as a payload. Lua objects are serialized into MessagePack automatically and deserialized back on other instances. But deserialization of a 1.2 KB payload on each member:payload() invocation is quite a heavy operation. This commit caches decoded payloads to return them again until they change. A microbenchmark showed that a cached payload is returned ~100 times faster than one decoded each time, even though the tested payload was quite small and simple: s:set_payload({a = 100, b = 200}) Even this payload is returned 100 times faster and does not affect GC. Part of #3234
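The caching of decoded payloads can be illustrated with a short Python model. The `Member` class and its method names are hypothetical, and JSON stands in for MessagePack because the Python standard library has no MessagePack codec.

```python
import json


class Member:
    """Decodes the raw payload once and reuses the result until it changes."""

    def __init__(self, raw_payload):
        self._raw = raw_payload
        self._decoded = None
        self._is_cached = False
        self.decode_count = 0  # how many times decoding actually ran

    def set_raw_payload(self, raw):
        # A new payload arrived from the network: drop the cached object.
        self._raw = raw
        self._is_cached = False

    def payload(self):
        if not self._is_cached:
            self._decoded = json.loads(self._raw)
            self.decode_count += 1
            self._is_cached = True
        return self._decoded


member = Member(json.dumps({"a": 100, "b": 200}).encode())
p1 = member.payload()
p2 = member.payload()  # the same object, no second decode
```

Returning the same object on repeated calls is also what avoids extra garbage: no new table is allocated per invocation.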
Each time a member was returned from a SWIM instance object, it was wrapped in a table with a special metatable and a cached payload. But the next identical lookup returned a new table. This:
* created garbage in the form of a new member wrapper;
* lost the cached decoded payload.
This commit caches all wrapped members in a private table and returns an existing wrapper on the next lookup. A microbenchmark showed that retrieving a cached result is 10 times faster than creating a new table each time. The cache table keeps weak references, which means that when a member object loses all its references in a user's application, it is automatically dropped from the table. Part of #3234
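Python's `weakref.WeakValueDictionary` behaves much like a Lua table with weak values, so the wrapper cache can be sketched as below. `Swim`, `MemberWrapper`, and `member_by_uuid` are illustrative names for this model only, not the module's real API.

```python
import gc
import weakref


class MemberWrapper:
    """Stands in for the Lua table that wraps a member."""

    def __init__(self, uuid):
        self.uuid = uuid
        self.cached_payload = None


class Swim:
    def __init__(self):
        # Values are held weakly: an entry disappears once the user's
        # application drops its last reference to the wrapper.
        self._wrappers = weakref.WeakValueDictionary()

    def member_by_uuid(self, uuid):
        wrapper = self._wrappers.get(uuid)
        if wrapper is None:
            wrapper = MemberWrapper(uuid)
            self._wrappers[uuid] = wrapper
        return wrapper


s = Swim()
first = s.member_by_uuid("uuid-1")
second = s.member_by_uuid("uuid-1")
same_wrapper = first is second  # the second lookup reuses the wrapper
cached_while_referenced = len(s._wrappers)

del first, second
gc.collect()
cached_after_release = len(s._wrappers)
```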
At this moment swim_scheduler_on_output() is a relatively simple function. It takes a task, builds its meta, and flushes the result into the network. But soon SWIM will be able to encrypt messages. It means that in addition to regular preprocessing, like building meta headers, a new phase will appear - encryption. What is more, it is conditional encryption, because a user may not want to encrypt messages. The same applies to swim_scheduler_on_input(): if a SWIM instance uses encryption, it should decrypt incoming messages before forwarding them into the SWIM core logic. The chosen strategy: let's reuse the on_output/on_input virtuality and create two versions of the on_input/on_output functions:
swim_on_plain_input() | swim_on_encrypted_input()
swim_on_plain_output() | swim_on_encrypted_output()
One of these pairs is chosen depending on whether the instance uses encryption. To make these 4 functions as simple and short as possible, this commit creates two sets of functions that do all the logic except encryption:
swim_begin_send() swim_do_send() swim_complete_send()
swim_begin_recv() swim_do_recv() swim_complete_recv()
These functions will be used by the on_input/on_output functions with different arguments. Part of #3234
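The strategy of picking one pair of handlers once, while sharing every phase except encryption, could be modelled in Python as follows. All names are hypothetical analogues of the C functions above, the `HEADER` constant is a placeholder for the real meta header, and `XorCodec` is a toy transform, not real encryption.

```python
HEADER = b"SWIM"  # hypothetical stand-in for the real meta header


class XorCodec:
    """Toy reversible transform; NOT real encryption, illustration only."""

    def __init__(self, key):
        self.key = key

    def encrypt(self, data):
        return bytes(b ^ self.key for b in data)

    decrypt = encrypt  # XOR with the same key is its own inverse


class Scheduler:
    def __init__(self, codec=None):
        self.codec = codec
        self.sent = []  # stands in for the network
        # Choose one pair of handlers once, depending on encryption.
        if codec is None:
            self.on_output = self._on_plain_output
            self.on_input = self._on_plain_input
        else:
            self.on_output = self._on_encrypted_output
            self.on_input = self._on_encrypted_input

    # Shared phases: everything except encryption.
    def _begin_send(self, body):
        return HEADER + body

    def _do_send(self, packet):
        self.sent.append(packet)

    def _complete_recv(self, packet):
        if not packet.startswith(HEADER):
            raise ValueError("bad packet header")
        return packet[len(HEADER):]

    def _on_plain_output(self, body):
        self._do_send(self._begin_send(body))

    def _on_encrypted_output(self, body):
        self._do_send(self.codec.encrypt(self._begin_send(body)))

    def _on_plain_input(self, packet):
        return self._complete_recv(packet)

    def _on_encrypted_input(self, packet):
        return self._complete_recv(self.codec.decrypt(packet))


plain = Scheduler()
plain.on_output(b"ping")
secure = Scheduler(XorCodec(0x5A))
secure.on_output(b"ping")
```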
SWIM is going to be used in and between datacenters, which means that its packets will go through public networks. Therefore raw SWIM packets are vulnerable to attacks. An attacker can do any and all of the following things:
1) Extract secret information from member payloads, like credentials to Tarantool binary ports;
2) Change UUIDs and addresses in the packets and break a topology;
3) Catch the packets and pretend to be a Tarantool instance, which could lead to undefined behaviour depending on the application logic.
SWIM packets need a protection layer. This commit introduces it. The SWIM transport level allows choosing an encryption algorithm and a private key, and encrypts each packet with that key. Besides, each packet is encrypted using a random public key prepended to the packet. SWIM now provides a public API to choose an encryption algorithm and a private key. Part of #3234
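The packet layout described above, a random public value prepended to data encrypted with the shared private key, resembles the usual initialization vector scheme. The Python sketch below shows only that layout. The SHA-256 keystream is an assumption made to keep the example within the standard library. It is not the algorithm SWIM uses and is not meant for production use. `IV_SIZE`, `encrypt_packet`, and `decrypt_packet` are hypothetical names.

```python
import hashlib
import os

IV_SIZE = 16  # assumed size of the random public part


def _keystream(key, iv, length):
    # Illustrative keystream: hash of key, IV, and a block counter.
    out = bytearray()
    counter = 0
    while len(out) < length:
        out += hashlib.sha256(key + iv + counter.to_bytes(8, "big")).digest()
        counter += 1
    return bytes(out[:length])


def encrypt_packet(key, plaintext):
    iv = os.urandom(IV_SIZE)  # fresh random public value per packet
    stream = _keystream(key, iv, len(plaintext))
    # The public value is prepended so the receiver can decrypt.
    return iv + bytes(p ^ s for p, s in zip(plaintext, stream))


def decrypt_packet(key, packet):
    iv, body = packet[:IV_SIZE], packet[IV_SIZE:]
    stream = _keystream(key, iv, len(body))
    return bytes(c ^ s for c, s in zip(body, stream))


key = b"k" * 32
packet_a = encrypt_packet(key, b"member table")
packet_b = encrypt_packet(key, b"member table")
```

Because the public part is random, the same plaintext produces different packets each time, while any holder of the private key can still decrypt them.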
Empty string as a no-payload-flag was not a good idea, because then a user can't write something like: if not member:payload() then ... Follow up #3234
They are caused by:
* a too slow network, when SWIM tests are run under high load;
* late arrival or drop of UDP packets.
Follow up #3234
First of all, the problem in a nutshell was that an ev_timer with a non-zero 'repeat' field is in fact an ev_periodic. It is restarted *automatically*, even if a user calls neither ev_timer_again() nor ev_timer_start(). This led to a situation where a round message send was scheduled, and the next round step timer alarm fired before the message was actually sent. That, in turn, led to an assertion failure on an attempt to schedule a task twice. This patch fixes the swim test harness to behave like ev_timer with 'repeat' > 0, and stops the timer on the first idle round step - it will be restarted once the currently hanging task is finally sent. Follow up #3234
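The fix can be modelled without libev. In the Python sketch below, `RoundStepper` is a hypothetical class: `on_timer()` plays the role of the self-repeating timer callback, and `on_send_complete()` is called when the hanging message finally leaves.

```python
class RoundStepper:
    """Models a round step timer that repeats on its own."""

    def __init__(self):
        self.timer_active = True
        self.task_pending = False
        self.scheduled = 0  # how many send tasks were scheduled

    def on_timer(self):
        if not self.timer_active:
            return
        if self.task_pending:
            # Idle step: the previous message is still not sent.
            # Stop the timer instead of scheduling the task twice.
            self.timer_active = False
            return
        self.task_pending = True
        self.scheduled += 1

    def on_send_complete(self):
        self.task_pending = False
        self.timer_active = True  # restart the round step timer


stepper = RoundStepper()
stepper.on_timer()  # schedules the first send
stepper.on_timer()  # fires again too early: the timer is stopped
active_while_hanging = stepper.timer_active
stepper.on_send_complete()
stepper.on_timer()  # schedules the second send
```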
Another problem discovered with the UDP broadcast test is that it can affect other tests, even after termination. When doing swim:broadcast() in one test, a programmer can't be sure who will hear it, answer, and break the test scenario. This commit reduces the probability of such a problem by:
* allowing a codec to be set before swim:cfg(). This protects SWIM nodes of different tests from each other: they will not understand messages from other tests. By the way, the same problem can appear in real applications too;
* not binding again to a URI that was passed by test-run into the test and closed here. If a test closes a URI given to it, it can't be sure that the next bind() will be successful, because test-run could have already reused it.
Follow up #3234
FFI can't survive yields. A yield in ffi.C.func() leads to a crash; a yield in ffi.gc is not documented as allowed. A yield in any GC function leaves the garbage collector stuck until the yield is finished. This patch makes the SWIM GC callback non-yielding. Now the yielding swim_delete() is called in a separate fiber, which is created in the GC callback but started only at the end of the event loop. Follow up #3234
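The same pattern, a finalizer that only schedules the slow work instead of doing it, can be shown in Python. This is an analogy under assumptions: `EventLoop`, `gc_callback`, and the Python `swim_delete` stand-in are hypothetical, and a deferred callable replaces the fiber.

```python
from collections import deque


class EventLoop:
    """Minimal loop that runs deferred work at the end of an iteration."""

    def __init__(self):
        self._deferred = deque()

    def defer(self, fn):
        self._deferred.append(fn)

    def run_once(self):
        # ... regular event processing would happen here ...
        while self._deferred:
            self._deferred.popleft()()


deleted = []


def swim_delete(name):
    # Stand-in for the real, potentially yielding, deletion.
    deleted.append(name)


def gc_callback(loop, name):
    # Must not block or yield: only schedule the deletion.
    loop.defer(lambda: swim_delete(name))


loop = EventLoop()
gc_callback(loop, "swim-1")
deleted_inside_callback = list(deleted)  # nothing is deleted yet
loop.run_once()
```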
In order to develop high-level cluster management functionality (including UI, routers, sharding, blue-green app upgrades, etc) I propose to add a low-level mechanism of discovery: a non-replicated, persistent table with cluster members. This table may get populated by a discovery protocol (like SWIM or other infection-style protocols) and kept updated to reflect node state.
This functionality will allow higher level code to achieve:
--cluster-name=SOME_CLUSTER
will automatically discover and join) This probably means that we need to rename our current "cluster" settings to something like "replicaset" to better reflect reality.
Many existing distributed databases have such a thing, and we probably should too, especially because we allow building distributed apps.