Skip to content

Commit

Permalink
swim: allow to set codec before cfg
Browse files Browse the repository at this point in the history
One another problem discovered with UDP broadcast test is that
it can affect other tests, even after termination. Doing
swim:broadcast() on one test a programmer can't be sure, who will
listen it, answer, and break the test scenario.

This commit reduces probability of such a problem by

    * allowance to set a codec before swim:cfg(). It allows to
      protect SWIM nodes of different tests from each other -
      they will not understand messages from other tests. By the
      way, the same problem can appear in real applications too;

    * do not binding again a URI passed by test-run into the
      test and closed here. If a test closes a URI given to it,
      it can't be sure, that next bind() will be successful -
      test-run could already reuse it.

Follow up #3234
  • Loading branch information
Gerold103 committed May 22, 2019
1 parent 7a5ac3d commit c0a7556
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/lua/swim.lua
Expand Up @@ -804,6 +804,7 @@ local swim_not_configured_mt = {
__index = {
delete = swim_delete,
is_configured = swim_is_configured,
set_codec = swim_set_codec,
},
__serialize = swim_serialize
}
Expand Down
22 changes: 22 additions & 0 deletions test/swim/box.lua
Expand Up @@ -7,6 +7,28 @@ listen_port = require('uri').parse(listen_uri).service

box.cfg{}

--
-- SWIM instances, using broadcast, should protect themselves
-- with encryption. Otherwise they can accidentally discover
-- SWIM instances from other tests.
--
enc_key = box.info.uuid
enc_algo = 'aes128'

--
-- Wrap swim.new with a codec to prevent test workers affecting
-- each other.
--
local original_new = swim.new
swim.new = function(...)
local s, err = original_new(...)
if s == nil then
return s, err
end
assert(s:set_codec({algo = enc_algo, key = enc_key, key_size = 16}))
return s
end

function uuid(i)
local min_valid_prefix = '00000000-0000-1000-8000-'
if i < 10 then
Expand Down
26 changes: 15 additions & 11 deletions test/swim/swim.result
Expand Up @@ -333,7 +333,7 @@ s1:delete()
s1 = swim.new({uuid = uuid(1), uri = uri()})
---
...
s2 = swim.new({uuid = uuid(2), uri = listen_uri})
s2 = swim.new({uuid = uuid(2), uri = uri()})
---
...
s1.probe_member()
Expand Down Expand Up @@ -361,7 +361,7 @@ s1:size()
---
- 1
...
s1:probe_member(listen_uri)
s1:probe_member(s2:self():uri())
---
- true
...
Expand Down Expand Up @@ -698,10 +698,10 @@ self:is_dropped()
s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
---
...
s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01})
s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
---
...
s1:add_member({uuid = uuid(2), uri = listen_port})
s1:add_member({uuid = uuid(2), uri = s2:self():uri()})
---
- true
...
Expand Down Expand Up @@ -828,7 +828,7 @@ s:delete()
--
-- Payload caching.
--
s1 = swim.new({uuid = uuid(1), uri = uri(listen_port), heartbeat_rate = 0.01})
s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
---
...
s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
Expand Down Expand Up @@ -902,7 +902,7 @@ s1_view:payload()
...
s1_view:incarnation()
---
- 2
- 3
...
s1:cfg({heartbeat_rate = 0.01})
---
Expand All @@ -928,7 +928,7 @@ p
...
s1_view:incarnation()
---
- 2
- 3
...
s1:delete()
---
Expand Down Expand Up @@ -1099,7 +1099,7 @@ s1:set_codec(cfg)
---
- true
...
-- S2 does not use encryption and can't decode the ping.
-- S2 uses different encryption and can't decode the ping.
s1:probe_member(s2:self():uri())
---
- true
Expand Down Expand Up @@ -1199,12 +1199,16 @@ s1:probe_member(s2:self():uri())
---
- true
...
while s1:size() ~= 2 do fiber.sleep(0.01) end
while s1:member_by_uuid(s2:self():uuid()) == nil do fiber.sleep(0.01) end
---
...
s2:size()
s2:member_by_uuid(s1:self():uuid())
---
- 2
- uri: 127.0.0.1:<port>
status: alive
incarnation: 1
uuid: 00000000-0000-1000-8000-000000000001
payload_size: 0
...
s1:delete()
---
Expand Down
16 changes: 8 additions & 8 deletions test/swim/swim.test.lua
Expand Up @@ -114,15 +114,15 @@ s1:remove_member(uuid(1))
s1:delete()

s1 = swim.new({uuid = uuid(1), uri = uri()})
s2 = swim.new({uuid = uuid(2), uri = listen_uri})
s2 = swim.new({uuid = uuid(2), uri = uri()})
s1.probe_member()
s1:probe_member()
s1:probe_member(true)
-- Not existing URI is ok - nothing happens.
s1:probe_member('127.0.0.1:1')
fiber.yield()
s1:size()
s1:probe_member(listen_uri)
s1:probe_member(s2:self():uri())
while s1:size() ~= 2 do fiber.sleep(0.01) end
s2:size()

Expand Down Expand Up @@ -233,8 +233,8 @@ self:is_dropped()
-- Check payload dissemination.
--
s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01})
s1:add_member({uuid = uuid(2), uri = listen_port})
s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
s1:add_member({uuid = uuid(2), uri = s2:self():uri()})
while s2:size() ~= 2 do fiber.sleep(0.01) end
s1_view = s2:member_by_uuid(uuid(1))
s1_view:payload()
Expand Down Expand Up @@ -269,7 +269,7 @@ s:delete()
--
-- Payload caching.
--
s1 = swim.new({uuid = uuid(1), uri = uri(listen_port), heartbeat_rate = 0.01})
s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
s1_self = s1:self()
_ = s1:add_member({uuid = s2:self():uuid(), uri = s2:self():uri()})
Expand Down Expand Up @@ -371,7 +371,7 @@ cfg.key = '1234567812345678'
cfg.key_size = 16
s1:set_codec(cfg)

-- S2 does not use encryption and can't decode the ping.
-- S2 uses different encryption and can't decode the ping.
s1:probe_member(s2:self():uri())
fiber.sleep(0.01)
s1:size()
Expand Down Expand Up @@ -407,8 +407,8 @@ s2:size()
s1:set_codec({algo = 'none'})
s2:set_codec({algo = 'none'})
s1:probe_member(s2:self():uri())
while s1:size() ~= 2 do fiber.sleep(0.01) end
s2:size()
while s1:member_by_uuid(s2:self():uuid()) == nil do fiber.sleep(0.01) end
s2:member_by_uuid(s1:self():uuid())

s1:delete()
s2:delete()
Expand Down

0 comments on commit c0a7556

Please sign in to comment.