diff --git a/connectd/connectd.c b/connectd/connectd.c index 986ea2511050..5f889ee9436f 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -173,6 +173,8 @@ struct connecting { struct daemon *daemon; + struct io_conn *conn; + /* The ID of the peer (not necessarily unique, in transit!) */ struct node_id id; @@ -276,16 +278,26 @@ static void connected_to_peer(struct daemon *daemon, struct io_conn *conn, const struct node_id *id) { - /* Don't call destroy_io_conn */ - io_set_finish(conn, NULL, NULL); + struct connecting *outgoing; /* We allocate 'conn' as a child of 'connect': we don't want to free * it just yet though. tal_steal() it onto the permanent 'daemon' * struct. */ tal_steal(daemon, conn); - /* Now free the 'connecting' struct. */ - tal_free(find_connecting(daemon, id)); + /* This is either us (if conn is an outgoing connection), or + * NULL or a separate attempt (if we're an incoming): in + * that case, kill the outgoing in favor of our successful + * incoming connection. */ + outgoing = find_connecting(daemon, id); + if (outgoing) { + /* Don't call destroy_io_conn, since we're done. */ + if (outgoing->conn) + io_set_finish(outgoing->conn, NULL, NULL); + + /* Now free the 'connecting' struct. */ + tal_free(outgoing); + } } /*~ Every per-peer daemon needs a connection to the gossip daemon; this allows @@ -669,7 +681,7 @@ void add_errors_to_error_list(struct connecting *connect, const char *error) "%s. ", error); } -/*~ This is the destructor for the (unsuccessful) connection. We accumulate +/*~ This is the destructor for the (unsuccessful) outgoing connection. We accumulate * the errors which occurred, so we can report to lightningd properly in case * they all fail, and try the next address. * @@ -782,6 +794,9 @@ static void try_connect_one_addr(struct connecting *connect) bool use_proxy = connect->daemon->use_proxy_always; const struct wireaddr_internal *addr = &connect->addrs[connect->addrnum]; + /* In case we fail without a connection, make destroy_io_conn happy */ + connect->conn = NULL; + /* Out of addresses? */ if (connect->addrnum == tal_count(connect->addrs)) { connect_failed(connect->daemon, &connect->id, @@ -860,9 +875,9 @@ static void try_connect_one_addr(struct connecting *connect) /* This creates the new connection using our fd, with the initialization * function one of the above. */ if (use_proxy) - notleak(io_new_conn(connect, fd, conn_proxy_init, connect)); + connect->conn = io_new_conn(connect, fd, conn_proxy_init, connect); else - notleak(io_new_conn(connect, fd, conn_init, connect)); + connect->conn = io_new_conn(connect, fd, conn_init, connect); } /*~ connectd is responsible for incoming connections, but it's the process of @@ -1457,8 +1472,17 @@ static void try_connect_peer(struct daemon *daemon, return; /* If we're trying to connect it right now, that's OK. */ - if (find_connecting(daemon, id)) + if ((connect = find_connecting(daemon, id))) { + /* If we've been passed in new connection details + * for this connection, update our addrhint + add + * to addresses to check */ + if (addrhint) { + connect->addrhint = tal_steal(connect, addrhint); + tal_arr_expand(&connect->addrs, *addrhint); + } + return; + } /* Start an array of addresses to try. */ addrs = tal_arr(tmpctx, struct wireaddr_internal, 0); diff --git a/tests/plugins/slow_start.py b/tests/plugins/slow_start.py new file mode 100755 index 000000000000..f40addbdcd8b --- /dev/null +++ b/tests/plugins/slow_start.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +"""This plugin is used to check that updated connection hints work properly. + +""" +from pyln.client import Plugin + +import socket +import time + +plugin = Plugin() + + +@plugin.async_method('waitconn') +def wait_connection(request, plugin): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('localhost', 0)) + sock.listen(1) + print("listening for connections on port {}".format(sock.getsockname()[1])) + + # We are a one and done socket connection! + conn, client_addr = sock.accept() + try: + print("connection from {}".format(client_addr)) + time.sleep(3) + + finally: + conn.close() + + print("closing socket") + sock.close() + + +plugin.run() diff --git a/tests/test_connection.py b/tests/test_connection.py index 9e733a7560dd..09f3d2c7768a 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -111,6 +111,30 @@ def test_reconnect_channel_peers(node_factory, executor): fut3.result(10) +def test_connection_moved(node_factory, executor): + slow_start = os.path.join(os.getcwd(), 'tests/plugins/slow_start.py') + options = {'may_reconnect': True, 'plugin': slow_start} + l1, l2 = node_factory.get_nodes(2, opts=options) + + # Set up the plugin to wait for a connection + executor.submit(l1.rpc.waitconn) + log = l1.daemon.wait_for_log('listening for connections') + match = re.search(r'on port (\d*)', log) + assert match and len(match.groups()) == 1 + hang_port = match.groups()[0] + + # Attempt connection + fut_hang = executor.submit(l1.rpc.connect, l2.info['id'], + 'localhost', hang_port) + l1.daemon.wait_for_log('connection from') + + # Provide correct connection details + l1.rpc.connect(l2.info['id'], 'localhost', l2.port) + + # If we failed to update the connection, this call will error + fut_hang.result(TIMEOUT) + + def test_balance(node_factory): l1, l2 = node_factory.line_graph(2, fundchannel=True) p1 = only_one(l1.rpc.getpeer(peer_id=l2.info['id'], level='info')['channels']) @@ -429,7 +453,7 @@ def test_connect_stresstest(node_factory, executor): # We fire off random connect/disconnect commands. actions = [ (l2.rpc.connect, l1.info['id'], 'localhost', l1.port), - (l3.rpc.connect, l1.info['id'], 'localhost', l3.port), + (l3.rpc.connect, l1.info['id'], 'localhost', l1.port), (l1.rpc.connect, l2.info['id'], 'localhost', l2.port), (l1.rpc.connect, l3.info['id'], 'localhost', l3.port), (l1.rpc.disconnect, l2.info['id'])