Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ struct connecting {

struct daemon *daemon;

struct io_conn *conn;

/* The ID of the peer (not necessarily unique, in transit!) */
struct node_id id;

Expand Down Expand Up @@ -276,16 +278,26 @@ static void connected_to_peer(struct daemon *daemon,
struct io_conn *conn,
const struct node_id *id)
{
/* Don't call destroy_io_conn */
io_set_finish(conn, NULL, NULL);
struct connecting *outgoing;

/* We allocate 'conn' as a child of 'connect': we don't want to free
* it just yet though. tal_steal() it onto the permanent 'daemon'
* struct. */
tal_steal(daemon, conn);

/* Now free the 'connecting' struct. */
tal_free(find_connecting(daemon, id));
/* This is either us (if conn is an outgoing connection), or
* NULL or a separate attempt (if we're an incoming): in
* that case, kill the outgoing in favor of our successful
* incoming connection. */
outgoing = find_connecting(daemon, id);
if (outgoing) {
/* Don't call destroy_io_conn, since we're done. */
if (outgoing->conn)
io_set_finish(outgoing->conn, NULL, NULL);

/* Now free the 'connecting' struct. */
tal_free(outgoing);
}
}

/*~ Every per-peer daemon needs a connection to the gossip daemon; this allows
Expand Down Expand Up @@ -669,7 +681,7 @@ void add_errors_to_error_list(struct connecting *connect, const char *error)
"%s. ", error);
}

/*~ This is the destructor for the (unsuccessful) connection. We accumulate
/*~ This is the destructor for the (unsuccessful) outgoing connection. We accumulate
* the errors which occurred, so we can report to lightningd properly in case
* they all fail, and try the next address.
*
Expand Down Expand Up @@ -782,6 +794,9 @@ static void try_connect_one_addr(struct connecting *connect)
bool use_proxy = connect->daemon->use_proxy_always;
const struct wireaddr_internal *addr = &connect->addrs[connect->addrnum];

/* In case we fail without a connection, make destroy_io_conn happy */
connect->conn = NULL;

/* Out of addresses? */
if (connect->addrnum == tal_count(connect->addrs)) {
connect_failed(connect->daemon, &connect->id,
Expand Down Expand Up @@ -860,9 +875,9 @@ static void try_connect_one_addr(struct connecting *connect)
/* This creates the new connection using our fd, with the initialization
* function one of the above. */
if (use_proxy)
notleak(io_new_conn(connect, fd, conn_proxy_init, connect));
connect->conn = io_new_conn(connect, fd, conn_proxy_init, connect);
else
notleak(io_new_conn(connect, fd, conn_init, connect));
connect->conn = io_new_conn(connect, fd, conn_init, connect);
}

/*~ connectd is responsible for incoming connections, but it's the process of
Expand Down Expand Up @@ -1457,8 +1472,17 @@ static void try_connect_peer(struct daemon *daemon,
return;

/* If we're trying to connect it right now, that's OK. */
if (find_connecting(daemon, id))
if ((connect = find_connecting(daemon, id))) {
/* If we've been passed in new connection details
* for this connection, update our addrhint + add
* to addresses to check */
if (addrhint) {
connect->addrhint = tal_steal(connect, addrhint);
tal_arr_expand(&connect->addrs, *addrhint);
}

return;
}

/* Start an array of addresses to try. */
addrs = tal_arr(tmpctx, struct wireaddr_internal, 0);
Expand Down
33 changes: 33 additions & 0 deletions tests/plugins/slow_start.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
"""This plugin is used to check that updated connection hints work properly.

"""
from pyln.client import Plugin

import socket
import time

plugin = Plugin()


@plugin.async_method('waitconn')
def wait_connection(request, plugin):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', 0))
sock.listen(1)
print("listening for connections on port {}".format(sock.getsockname()[1]))

# We are a one and done socket connection!
conn, client_addr = sock.accept()
try:
print("connection from {}".format(client_addr))
time.sleep(3)

finally:
conn.close()

print("closing socket")
sock.close()


plugin.run()
26 changes: 25 additions & 1 deletion tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ def test_reconnect_channel_peers(node_factory, executor):
fut3.result(10)


def test_connection_moved(node_factory, executor):
slow_start = os.path.join(os.getcwd(), 'tests/plugins/slow_start.py')
options = {'may_reconnect': True, 'plugin': slow_start}
l1, l2 = node_factory.get_nodes(2, opts=options)

# Set up the plugin to wait for a connection
executor.submit(l1.rpc.waitconn)
log = l1.daemon.wait_for_log('listening for connections')
match = re.search(r'on port (\d*)', log)
assert match and len(match.groups()) == 1
hang_port = match.groups()[0]

# Attempt connection
fut_hang = executor.submit(l1.rpc.connect, l2.info['id'],
'localhost', hang_port)
l1.daemon.wait_for_log('connection from')

# Provide correct connection details
l1.rpc.connect(l2.info['id'], 'localhost', l2.port)

# If we failed to update the connection, this call will error
fut_hang.result(TIMEOUT)


def test_balance(node_factory):
l1, l2 = node_factory.line_graph(2, fundchannel=True)
p1 = only_one(l1.rpc.getpeer(peer_id=l2.info['id'], level='info')['channels'])
Expand Down Expand Up @@ -429,7 +453,7 @@ def test_connect_stresstest(node_factory, executor):
# We fire off random connect/disconnect commands.
actions = [
(l2.rpc.connect, l1.info['id'], 'localhost', l1.port),
(l3.rpc.connect, l1.info['id'], 'localhost', l3.port),
(l3.rpc.connect, l1.info['id'], 'localhost', l1.port),
(l1.rpc.connect, l2.info['id'], 'localhost', l2.port),
(l1.rpc.connect, l3.info['id'], 'localhost', l3.port),
(l1.rpc.disconnect, l2.info['id'])
Expand Down