Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ccan/README
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CCAN imported from http://ccodearchive.net.

CCAN version: init-2595-ge43c61c4
CCAN version: init-2597-gc4760b30
6 changes: 4 additions & 2 deletions ccan/ccan/io/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,10 @@ struct io_plan *io_sock_shutdown(struct io_conn *conn)
if (shutdown(io_conn_fd(conn), SHUT_WR) != 0)
return io_close(conn);

/* And leave unset .*/
return &conn->plan[IO_IN];
/* We need to make sure we don't try to write again, so
* "wait" on something which will never be woken: otherwise
* we will try to write, fail, and immediately close. */
return io_wait_dir(conn, io_sock_shutdown, IO_OUT, io_never, NULL);
}

bool io_flush_sync(struct io_conn *conn)
Expand Down
94 changes: 56 additions & 38 deletions connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,11 @@ static struct connecting *find_connecting(struct daemon *daemon,
return connecting_htable_get(daemon->connecting, id);
}

/*~ When we free a peer, we remove it from the daemon's hashtable.
* We also call this manually if we want to elegantly drain peer's
* queues. */
void destroy_peer(struct peer *peer)
/*~ When we free a peer, we remove it from the daemon's hashtable. */
static void destroy_peer(struct peer *peer)
{
assert(!peer->draining);

if (!peer_htable_del(peer->daemon->peers, peer))
abort();

/* Tell gossipd to stop asking this peer gossip queries */
daemon_conn_send(peer->daemon->gossipd,
take(towire_gossipd_peer_gone(NULL, &peer->id)));

/* Tell lightningd it's really disconnected */
daemon_conn_send(peer->daemon->master,
take(towire_connectd_peer_disconnect_done(NULL,
&peer->id,
peer->counter)));
/* This makes multiplex.c routines not feed us more, but
* *also* means that if we're freed directly, the ->to_peer
* destructor won't call drain_peer(). */
peer->draining = true;

schedule_reconnect_if_important(peer->daemon, &peer->id);
}

/*~ This is where we create a new peer. */
Expand All @@ -146,7 +126,7 @@ static struct peer *new_peer(struct daemon *daemon,
peer->peer_in = NULL;
peer->sent_to_peer = NULL;
peer->urgent = false;
peer->draining = false;
peer->draining_state = NOT_DRAINING;
peer->peer_in_lastmsg = -1;
peer->peer_outq = msg_queue_new(peer, false);
peer->last_recv_time = time_now();
Expand Down Expand Up @@ -277,6 +257,22 @@ static void reset_reconnect_timer(struct peer *peer)
imp->reconnect_secs = INITIAL_WAIT_SECONDS;
}

void send_disconnected(struct daemon *daemon,
const struct node_id *id,
u64 connectd_counter)
{
/* lightningd: it's gone */
daemon_conn_send(daemon->master,
take(towire_connectd_peer_disconnected(NULL, id, connectd_counter)));

/* Tell gossipd to stop asking this peer gossip queries */
daemon_conn_send(daemon->gossipd,
take(towire_gossipd_peer_gone(NULL, id)));

/* Start reconnection process if we care */
schedule_reconnect_if_important(daemon, id);
}

/*~ Note the lack of static: this is called by peer_exchange_initmsg.c once the
* INIT messages are exchanged, and also by the retry code above. */
struct io_plan *peer_connected(struct io_conn *conn,
Expand All @@ -290,17 +286,20 @@ struct io_plan *peer_connected(struct io_conn *conn,
bool incoming)
{
u8 *msg;
struct peer *peer;
struct peer *peer, *oldpeer;
int unsup;
size_t depender, missing;
int subd_fd;
bool option_gossip_queries;
struct connecting *connect;
u64 prev_connectd_counter;

/* We remove any previous connection immediately, on the assumption it's dead */
peer = peer_htable_get(daemon->peers, id);
if (peer)
tal_free(peer);
oldpeer = peer_htable_get(daemon->peers, id);
if (oldpeer) {
prev_connectd_counter = oldpeer->counter;
destroy_peer_immediately(oldpeer);
}

/* We promised we'd take it by marking it TAKEN above; prepare to free it. */
if (taken(their_features))
Expand All @@ -318,6 +317,9 @@ struct io_plan *peer_connected(struct io_conn *conn,
unsup = features_unsupported(daemon->our_features, their_features,
INIT_FEATURE);
if (unsup != -1) {
/* We were going to send a reconnect message, but not now! */
if (oldpeer)
send_disconnected(daemon, id, prev_connectd_counter);
status_peer_unusual(id, "Unsupported feature %u", unsup);
msg = towire_warningfmt(NULL, NULL, "Unsupported feature %u",
unsup);
Expand All @@ -326,6 +328,9 @@ struct io_plan *peer_connected(struct io_conn *conn,
}

if (!feature_check_depends(their_features, &depender, &missing)) {
/* We were going to send a reconnect message, but not now! */
if (oldpeer)
send_disconnected(daemon, id, prev_connectd_counter);
status_peer_unusual(id, "Feature %zu requires feature %zu",
depender, missing);
msg = towire_warningfmt(NULL, NULL,
Expand Down Expand Up @@ -362,23 +367,37 @@ struct io_plan *peer_connected(struct io_conn *conn,
peer = new_peer(daemon, id, cs, their_features, is_websocket, conn,
&subd_fd);
/* Only takes over conn if it succeeds. */
if (!peer)
if (!peer) {
/* We were going to send a reconnect message, but not now! */
if (oldpeer)
send_disconnected(daemon, id, prev_connectd_counter);
return io_close(conn);
}

/* Tell gossipd it can ask query this new peer for gossip */
option_gossip_queries = feature_negotiated(daemon->our_features,
their_features,
OPT_GOSSIP_QUERIES);
msg = towire_gossipd_new_peer(NULL, id, option_gossip_queries);
daemon_conn_send(daemon->gossipd, take(msg));

/* Get ready for streaming gossip from the store */
setup_peer_gossip_store(peer, daemon->our_features, their_features);

/* Create message to tell master peer has connected. */
msg = towire_connectd_peer_connected(NULL, id, peer->counter,
addr, remote_addr,
incoming, their_features);
/* Create message to tell master peer has connected/reconnected. */
if (oldpeer) {
msg = towire_connectd_peer_reconnected(NULL, id,
prev_connectd_counter,
peer->counter,
addr, remote_addr,
incoming, their_features);
} else {
/* Tell gossipd about new peer */
msg = towire_gossipd_new_peer(NULL, id, option_gossip_queries);
daemon_conn_send(daemon->gossipd, take(msg));

msg = towire_connectd_peer_connected(NULL, id, peer->counter,
addr, remote_addr,
incoming, their_features);
}

/*~ daemon_conn is a message queue for inter-daemon communication: we
* queue up the `connect_peer_connected` message to tell lightningd
Expand Down Expand Up @@ -1923,9 +1942,7 @@ static void peer_disconnect(struct daemon *daemon, const u8 *msg)
if (peer->counter != counter)
return;

/* We make sure any final messages from the subds are sent! */
status_peer_debug(&id, "disconnect_peer");
drain_peer(peer);
disconnect_peer(peer);
}

/* lightningd tells us a peer is no longer "important". */
Expand Down Expand Up @@ -2371,7 +2388,8 @@ static struct io_plan *recv_req(struct io_conn *conn,
case WIRE_CONNECTD_PING_DONE:
case WIRE_CONNECTD_GOT_ONIONMSG_TO_US:
case WIRE_CONNECTD_CUSTOMMSG_IN:
case WIRE_CONNECTD_PEER_DISCONNECT_DONE:
case WIRE_CONNECTD_PEER_DISCONNECTED:
case WIRE_CONNECTD_PEER_RECONNECTED:
case WIRE_CONNECTD_START_SHUTDOWN_REPLY:
case WIRE_CONNECTD_INJECT_ONIONMSG_REPLY:
case WIRE_CONNECTD_ONIONMSG_FORWARD_FAIL:
Expand Down
26 changes: 20 additions & 6 deletions connectd/connectd.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ enum pong_expect_type {
PONG_EXPECTED_PROBING = 2,
};

enum draining_state {
/* Normal state */
NOT_DRAINING,
/* First, reading remaining messages from subds */
READING_FROM_SUBDS,
/* Finally, writing any queued messages to peer */
WRITING_TO_PEER,
};

/*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are
* already connected (by peer->id). */
struct peer {
Expand All @@ -60,15 +69,15 @@ struct peer {
/* Counters and keys for symmetric crypto */
struct crypto_state cs;

/* Connection to the peer */
/* Connection to the peer (NULL if it's disconnected and we're flushing) */
struct io_conn *to_peer;

/* Non-zero if shutting down. */
enum draining_state draining_state;

/* Counter to distinguish this connection from the next re-connection */
u64 counter;

/* Is this draining? If so, just keep writing until queue empty */
bool draining;

/* Connections to the subdaemons */
struct subd **subds;

Expand Down Expand Up @@ -376,8 +385,13 @@ struct io_plan *peer_connected(struct io_conn *conn,
enum is_websocket is_websocket,
bool incoming);

/* Removes peer from hash table, tells gossipd and lightningd. */
void destroy_peer(struct peer *peer);
/* Tell gossipd and lightningd that this peer is gone. */
void send_disconnected(struct daemon *daemon,
const struct node_id *id,
u64 connectd_counter);

/* Free peer immediately (don't wait for draining). */
void destroy_peer_immediately(struct peer *peer);

/* Remove a random connection, when under stress. */
void close_random_connection(struct daemon *daemon);
Expand Down
17 changes: 14 additions & 3 deletions connectd/connectd_wire.csv
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,20 @@ msgdata,connectd_peer_connected,flen,u16,
msgdata,connectd_peer_connected,features,u8,flen

# connectd -> master: peer disconnected.
msgtype,connectd_peer_disconnect_done,2006
msgdata,connectd_peer_disconnect_done,id,node_id,
msgdata,connectd_peer_disconnect_done,counter,u64,
msgtype,connectd_peer_disconnected,2006
msgdata,connectd_peer_disconnected,id,node_id,
msgdata,connectd_peer_disconnected,counter,u64,

# Connectd -> master: peer reconnected (disconnect & connect)
msgtype,connectd_peer_reconnected,2010
msgdata,connectd_peer_reconnected,id,node_id,
msgdata,connectd_peer_reconnected,prev_counter,u64,
msgdata,connectd_peer_reconnected,counter,u64,
msgdata,connectd_peer_reconnected,addr,wireaddr_internal,
msgdata,connectd_peer_reconnected,remote_addr,?wireaddr,
msgdata,connectd_peer_reconnected,incoming,bool,
msgdata,connectd_peer_reconnected,flen,u16,
msgdata,connectd_peer_reconnected,features,u8,flen

# Master -> connectd: make peer active immediately (we want to talk) (+ fd to subd).
msgtype,connectd_peer_connect_subd,2004
Expand Down
Loading
Loading