diff --git a/ccan/README b/ccan/README index 5aee71080460..8dbc5520b270 100644 --- a/ccan/README +++ b/ccan/README @@ -1,3 +1,3 @@ CCAN imported from http://ccodearchive.net. -CCAN version: init-2595-ge43c61c4 +CCAN version: init-2597-gc4760b30 diff --git a/ccan/ccan/io/io.c b/ccan/ccan/io/io.c index 58ae19bdbd83..baa9a497cc56 100644 --- a/ccan/ccan/io/io.c +++ b/ccan/ccan/io/io.c @@ -583,8 +583,10 @@ struct io_plan *io_sock_shutdown(struct io_conn *conn) if (shutdown(io_conn_fd(conn), SHUT_WR) != 0) return io_close(conn); - /* And leave unset .*/ - return &conn->plan[IO_IN]; + /* We need to make sure we don't try to write again, so + * "wait" on something which will never be woken: otherwise + * we will try to write, fail, and immediately close. */ + return io_wait_dir(conn, io_sock_shutdown, IO_OUT, io_never, NULL); } bool io_flush_sync(struct io_conn *conn) diff --git a/connectd/connectd.c b/connectd/connectd.c index de841c3ff3c1..f04f2caa798d 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -100,31 +100,11 @@ static struct connecting *find_connecting(struct daemon *daemon, return connecting_htable_get(daemon->connecting, id); } -/*~ When we free a peer, we remove it from the daemon's hashtable. - * We also call this manually if we want to elegantly drain peer's - * queues. */ -void destroy_peer(struct peer *peer) +/*~ When we free a peer, we remove it from the daemon's hashtable. */ +static void destroy_peer(struct peer *peer) { - assert(!peer->draining); - if (!peer_htable_del(peer->daemon->peers, peer)) abort(); - - /* Tell gossipd to stop asking this peer gossip queries */ - daemon_conn_send(peer->daemon->gossipd, - take(towire_gossipd_peer_gone(NULL, &peer->id))); - - /* Tell lightningd it's really disconnected */ - daemon_conn_send(peer->daemon->master, - take(towire_connectd_peer_disconnect_done(NULL, - &peer->id, - peer->counter))); - /* This makes multiplex.c routines not feed us more, but - * *also* means that if we're freed directly, the ->to_peer - * destructor won't call drain_peer(). */ - peer->draining = true; - - schedule_reconnect_if_important(peer->daemon, &peer->id); } /*~ This is where we create a new peer. */ @@ -146,7 +126,7 @@ static struct peer *new_peer(struct daemon *daemon, peer->peer_in = NULL; peer->sent_to_peer = NULL; peer->urgent = false; - peer->draining = false; + peer->draining_state = NOT_DRAINING; peer->peer_in_lastmsg = -1; peer->peer_outq = msg_queue_new(peer, false); peer->last_recv_time = time_now(); @@ -277,6 +257,22 @@ static void reset_reconnect_timer(struct peer *peer) imp->reconnect_secs = INITIAL_WAIT_SECONDS; } +void send_disconnected(struct daemon *daemon, + const struct node_id *id, + u64 connectd_counter) +{ + /* lightningd: it's gone */ + daemon_conn_send(daemon->master, + take(towire_connectd_peer_disconnected(NULL, id, connectd_counter))); + + /* Tell gossipd to stop asking this peer gossip queries */ + daemon_conn_send(daemon->gossipd, + take(towire_gossipd_peer_gone(NULL, id))); + + /* Start reconnection process if we care */ + schedule_reconnect_if_important(daemon, id); +} + /*~ Note the lack of static: this is called by peer_exchange_initmsg.c once the * INIT messages are exchanged, and also by the retry code above. */ struct io_plan *peer_connected(struct io_conn *conn, @@ -290,17 +286,20 @@ struct io_plan *peer_connected(struct io_conn *conn, bool incoming) { u8 *msg; - struct peer *peer; + struct peer *peer, *oldpeer; int unsup; size_t depender, missing; int subd_fd; bool option_gossip_queries; struct connecting *connect; + u64 prev_connectd_counter; /* We remove any previous connection immediately, on the assumption it's dead */ - peer = peer_htable_get(daemon->peers, id); - if (peer) - tal_free(peer); + oldpeer = peer_htable_get(daemon->peers, id); + if (oldpeer) { + prev_connectd_counter = oldpeer->counter; + destroy_peer_immediately(oldpeer); + } /* We promised we'd take it by marking it TAKEN above; prepare to free it. */ if (taken(their_features)) @@ -318,6 +317,9 @@ struct io_plan *peer_connected(struct io_conn *conn, unsup = features_unsupported(daemon->our_features, their_features, INIT_FEATURE); if (unsup != -1) { + /* We were going to send a reconnect message, but not now! */ + if (oldpeer) + send_disconnected(daemon, id, prev_connectd_counter); status_peer_unusual(id, "Unsupported feature %u", unsup); msg = towire_warningfmt(NULL, NULL, "Unsupported feature %u", unsup); @@ -326,6 +328,9 @@ struct io_plan *peer_connected(struct io_conn *conn, } if (!feature_check_depends(their_features, &depender, &missing)) { + /* We were going to send a reconnect message, but not now! */ + if (oldpeer) + send_disconnected(daemon, id, prev_connectd_counter); status_peer_unusual(id, "Feature %zu requires feature %zu", depender, missing); msg = towire_warningfmt(NULL, NULL, @@ -362,23 +367,37 @@ struct io_plan *peer_connected(struct io_conn *conn, peer = new_peer(daemon, id, cs, their_features, is_websocket, conn, &subd_fd); /* Only takes over conn if it succeeds. */ - if (!peer) + if (!peer) { + /* We were going to send a reconnect message, but not now! */ + if (oldpeer) + send_disconnected(daemon, id, prev_connectd_counter); return io_close(conn); + } /* Tell gossipd it can ask query this new peer for gossip */ option_gossip_queries = feature_negotiated(daemon->our_features, their_features, OPT_GOSSIP_QUERIES); - msg = towire_gossipd_new_peer(NULL, id, option_gossip_queries); - daemon_conn_send(daemon->gossipd, take(msg)); /* Get ready for streaming gossip from the store */ setup_peer_gossip_store(peer, daemon->our_features, their_features); - /* Create message to tell master peer has connected. */ - msg = towire_connectd_peer_connected(NULL, id, peer->counter, - addr, remote_addr, - incoming, their_features); + /* Create message to tell master peer has connected/reconnected. */ + if (oldpeer) { + msg = towire_connectd_peer_reconnected(NULL, id, + prev_connectd_counter, + peer->counter, + addr, remote_addr, + incoming, their_features); + } else { + /* Tell gossipd about new peer */ + msg = towire_gossipd_new_peer(NULL, id, option_gossip_queries); + daemon_conn_send(daemon->gossipd, take(msg)); + + msg = towire_connectd_peer_connected(NULL, id, peer->counter, + addr, remote_addr, + incoming, their_features); + } /*~ daemon_conn is a message queue for inter-daemon communication: we * queue up the `connect_peer_connected` message to tell lightningd @@ -1923,9 +1942,7 @@ static void peer_disconnect(struct daemon *daemon, const u8 *msg) if (peer->counter != counter) return; - /* We make sure any final messages from the subds are sent! */ - status_peer_debug(&id, "disconnect_peer"); - drain_peer(peer); + disconnect_peer(peer); } /* lightningd tells us a peer is no longer "important". */ @@ -2371,7 +2388,8 @@ static struct io_plan *recv_req(struct io_conn *conn, case WIRE_CONNECTD_PING_DONE: case WIRE_CONNECTD_GOT_ONIONMSG_TO_US: case WIRE_CONNECTD_CUSTOMMSG_IN: - case WIRE_CONNECTD_PEER_DISCONNECT_DONE: + case WIRE_CONNECTD_PEER_DISCONNECTED: + case WIRE_CONNECTD_PEER_RECONNECTED: case WIRE_CONNECTD_START_SHUTDOWN_REPLY: case WIRE_CONNECTD_INJECT_ONIONMSG_REPLY: case WIRE_CONNECTD_ONIONMSG_FORWARD_FAIL: diff --git a/connectd/connectd.h b/connectd/connectd.h index d3fb0c33abf7..5774c220f4da 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -46,6 +46,15 @@ enum pong_expect_type { PONG_EXPECTED_PROBING = 2, }; +enum draining_state { + /* Normal state */ + NOT_DRAINING, + /* First, reading remaining messages from subds */ + READING_FROM_SUBDS, + /* Finally, writing any queued messages to peer */ + WRITING_TO_PEER, +}; + /*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are * already connected (by peer->id). */ struct peer { @@ -60,15 +69,15 @@ struct peer { /* Counters and keys for symmetric crypto */ struct crypto_state cs; - /* Connection to the peer */ + /* Connection to the peer (NULL if it's disconnected and we're flushing) */ struct io_conn *to_peer; + /* Non-zero if shutting down. */ + enum draining_state draining_state; + /* Counter to distinguish this connection from the next re-connection */ u64 counter; - /* Is this draining? If so, just keep writing until queue empty */ - bool draining; - /* Connections to the subdaemons */ struct subd **subds; @@ -376,8 +385,13 @@ struct io_plan *peer_connected(struct io_conn *conn, enum is_websocket is_websocket, bool incoming); -/* Removes peer from hash table, tells gossipd and lightningd. */ -void destroy_peer(struct peer *peer); +/* Tell gossipd and lightningd that this peer is gone. */ +void send_disconnected(struct daemon *daemon, + const struct node_id *id, + u64 connectd_counter); + +/* Free peer immediately (don't wait for draining). */ +void destroy_peer_immediately(struct peer *peer); /* Remove a random connection, when under stress. */ void close_random_connection(struct daemon *daemon); diff --git a/connectd/connectd_wire.csv b/connectd/connectd_wire.csv index cd9b65353df2..23b7d37da131 100644 --- a/connectd/connectd_wire.csv +++ b/connectd/connectd_wire.csv @@ -84,9 +84,20 @@ msgdata,connectd_peer_connected,flen,u16, msgdata,connectd_peer_connected,features,u8,flen # connectd -> master: peer disconnected. -msgtype,connectd_peer_disconnect_done,2006 -msgdata,connectd_peer_disconnect_done,id,node_id, -msgdata,connectd_peer_disconnect_done,counter,u64, +msgtype,connectd_peer_disconnected,2006 +msgdata,connectd_peer_disconnected,id,node_id, +msgdata,connectd_peer_disconnected,counter,u64, + +# Connectd -> master: peer reconnected (disconnect & connect) +msgtype,connectd_peer_reconnected,2010 +msgdata,connectd_peer_reconnected,id,node_id, +msgdata,connectd_peer_reconnected,prev_counter,u64, +msgdata,connectd_peer_reconnected,counter,u64, +msgdata,connectd_peer_reconnected,addr,wireaddr_internal, +msgdata,connectd_peer_reconnected,remote_addr,?wireaddr, +msgdata,connectd_peer_reconnected,incoming,bool, +msgdata,connectd_peer_reconnected,flen,u16, +msgdata,connectd_peer_reconnected,features,u8,flen # Master -> connectd: make peer active immediately (we want to talk) (+ fd to subd). msgtype,connectd_peer_connect_subd,2004 diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 99137a0f5b9a..77101e207603 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -65,6 +65,9 @@ struct subd { bool rcvd_tx_abort; }; +/* FIXME: reorder! */ +static void destroy_connected_subd(struct subd *subd); + static struct subd *find_subd(struct peer *peer, const struct channel_id *channel_id) { @@ -90,18 +93,6 @@ static struct subd *find_subd(struct peer *peer, return NULL; } -/* Except for a reconnection, we finally free a peer when the io_conn - * is closed and all subds are gone. */ -static void maybe_free_peer(struct peer *peer) -{ - if (peer->to_peer) - return; - if (tal_count(peer->subds) != 0) - return; - status_debug("maybe_free_peer freeing peer!"); - tal_free(peer); -} - /* We try to send the final messages, but if buffer is full and they're * not reading, we have to give up. */ static void close_peer_io_timeout(struct peer *peer) @@ -112,60 +103,61 @@ static void close_peer_io_timeout(struct peer *peer) static void close_subd_timeout(struct subd *subd) { - status_peer_debug(&subd->peer->id, "Subd did not close, forcing close"); + status_peer_broken(&subd->peer->id, "Subd did not close, forcing close"); io_close(subd->conn); } -void drain_peer(struct peer *peer) +void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) { - status_debug("drain_peer"); - assert(!peer->draining); + status_peer_io(LOG_IO_OUT, &peer->id, msg); + msg_enqueue(peer->peer_outq, msg); +} + +static void drain_peer(struct peer *peer) +{ + assert(tal_count(peer->subds) == 0); + + /* You have five seconds to drain. */ + peer->draining_state = WRITING_TO_PEER; + status_peer_debug(&peer->id, "disconnect_peer: draining with 5 second timer."); + notleak(new_reltimer(&peer->daemon->timers, + peer->to_peer, time_from_sec(5), + close_peer_io_timeout, peer)); + io_wake(peer->peer_outq); - /* Since we immediately free any subds we didn't connect yet, - * we need peer->to_peer set so it won't free peer! */ - assert(peer->to_peer); + /* We will discard what they send us, but listen so we catch closes */ + io_wake(&peer->peer_in); +} + +void disconnect_peer(struct peer *peer) +{ + peer->draining_state = READING_FROM_SUBDS; - /* Give the subds 5 seconds to close their fds to us. */ for (size_t i = 0; i < tal_count(peer->subds); i++) { - if (!peer->subds[i]->conn) { - /* Deletes itself from array, so be careful! */ - tal_free(peer->subds[i]); + /* Start timer in case it doesn't close by itself */ + if (peer->subds[i]->conn) { + status_peer_debug(&peer->id, "disconnect_peer: setting 5 second timer for subd %zu/%zu.", + i, tal_count(peer->subds)); + notleak(new_reltimer(&peer->daemon->timers, peer->subds[i], + time_from_sec(5), + close_subd_timeout, peer->subds[i])); + } else { + /* We told lightningd that peer spoke, but it hasn't returned yet. */ + tal_arr_remove(&peer->subds, i); i--; - continue; } - status_debug("drain_peer draining subd!"); - notleak(new_reltimer(&peer->daemon->timers, - peer->subds[i], time_from_sec(5), - close_subd_timeout, peer->subds[i])); - /* Wake any outgoing queued on subd */ - io_wake(peer->subds[i]->outq); } - /* Wake them to ensure they notice the close! */ - io_wake(&peer->subds); - - if (peer->to_peer) { - /* You have 5 seconds to drain... */ - notleak(new_reltimer(&peer->daemon->timers, - peer->to_peer, time_from_sec(5), - close_peer_io_timeout, peer)); + if (tal_count(peer->subds) != 0) { + status_peer_debug(&peer->id, "disconnect_peer: waking %zu subds.", + tal_count(peer->subds)); + /* Wake them up so we read again */ + io_wake(&peer->subds); + } else { + status_peer_debug(&peer->id, "disconnect_peer: no subds, draining now."); + /* No subds left, start draining peer */ + drain_peer(peer); } - - /* Clean peer from hashtable; we no longer exist. */ - destroy_peer(peer); - tal_del_destructor(peer, destroy_peer); - - /* This is a 5-second leak, worst case! */ - notleak(peer); - - /* Start draining process! */ - io_wake(peer->peer_outq); -} - -void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) -{ - status_peer_io(LOG_IO_OUT, &peer->id, msg); - msg_enqueue(peer->peer_outq, msg); } /* Send warning, close connection to peer */ @@ -183,7 +175,20 @@ static void send_warning(struct peer *peer, const char *fmt, ...) va_end(ap); inject_peer_msg(peer, take(msg)); - drain_peer(peer); + + /* Free all the subds immediately */ + for (size_t i = 0; i < tal_count(peer->subds); i++) { + /* Once conn exists, subd is a child of the conn. Free conn, free subd. */ + if (peer->subds[i]->conn) { + tal_del_destructor(peer->subds[i], destroy_connected_subd); + tal_free(peer->subds[i]->conn); + } else { + /* We told lightningd that peer spoke, but it hasn't returned yet. */ + tal_free(peer->subds[i]); + } + } + tal_resize(&peer->subds, 0); + disconnect_peer(peer); } /* Kicks off write_to_peer() to look for more gossip to send from store */ @@ -1081,16 +1086,15 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, /* Still nothing to send? */ if (!msg) { - /* Draining? We're done when subds are done. */ - if (peer->draining && tal_count(peer->subds) == 0) { + /* Draining? Shutdown socket (to avoid losing msgs) */ + if (peer->draining_state == WRITING_TO_PEER) { + status_peer_debug(&peer->id, "draining done, shutting down"); io_wake(&peer->peer_in); return io_sock_shutdown(peer_conn); } /* If they want us to send gossip, do so now. */ - if (!peer->draining) - msg = maybe_gossip_msg(NULL, peer); - + msg = maybe_gossip_msg(NULL, peer); if (!msg) { /* Tell them to read again, */ io_wake(&peer->subds); @@ -1102,6 +1106,10 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, } } + if (peer->draining_state == WRITING_TO_PEER) + status_peer_debug(&peer->id, "draining, but sending %s.", + peer_wire_name(fromwire_peektype(msg))); + /* dev_disconnect can disable writes */ if (peer->dev_writes_enabled) { if (*peer->dev_writes_enabled == 0) { @@ -1174,7 +1182,7 @@ static struct io_plan *write_to_subd(struct io_conn *subd_conn, return io_write_wire(subd_conn, take(msg), write_to_subd, subd); } -static void destroy_subd(struct subd *subd) +static void destroy_connected_subd(struct subd *subd) { struct peer *peer = subd->peer; size_t pos; @@ -1188,13 +1196,15 @@ static void destroy_subd(struct subd *subd) * have been waiting for write_to_subd) */ io_wake(&peer->peer_in); - /* If this is the last subd, and we're draining, wake outgoing - * now (it will start shutdown). */ - if (tal_count(peer->subds) == 0 && peer->to_peer && peer->draining) - msg_wake(peer->peer_outq); - - /* Maybe we were last subd out? */ - maybe_free_peer(peer); + if (tal_count(peer->subds) == 0) { + if (!peer->to_peer) { + /* Nothing left */ + tal_free(peer); + } else if (peer->draining_state == READING_FROM_SUBDS) { + /* We've finished draining subds, start draining peer */ + drain_peer(peer); + } + } } static struct subd *new_subd(struct peer *peer, @@ -1213,8 +1223,6 @@ static struct subd *new_subd(struct peer *peer, /* Connect it to the peer */ tal_arr_expand(&peer->subds, subd); - tal_add_destructor(subd, destroy_subd); - return subd; } @@ -1264,7 +1272,7 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, peer->last_recv_time = time_now(); /* Don't process packets while we're closing */ - if (peer->draining) + if (peer->draining_state != NOT_DRAINING) return next_read(peer_conn, peer); /* If we swallow this, just try again. */ @@ -1384,23 +1392,53 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn, /* subd is a child of the conn: free when it closes! */ tal_steal(subd->conn, subd); + tal_add_destructor(subd, destroy_connected_subd); return io_duplex(subd_conn, read_from_subd(subd_conn, subd), write_to_subd(subd_conn, subd)); } +/* Peer disconnected (we remove this if *we* close). */ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer) { assert(peer->to_peer == peer_conn); - /* If subds need cleaning, this will do it */ - if (!peer->draining) - drain_peer(peer); - + /* We are no longer connected. Tell lightningd & gossipd*/ peer->to_peer = NULL; + send_disconnected(peer->daemon, &peer->id, peer->counter); + + /* Wake subds: give them 5 seconds to flush. */ + for (size_t i = 0; i < tal_count(peer->subds); i++) { + /* Might not be connected yet (no destructor, simple) */ + if (!peer->subds[i]->conn) { + tal_arr_remove(&peer->subds, i); + i--; + continue; + } + /* Wake the writers to subd: you have 5 seconds */ + notleak(new_reltimer(&peer->daemon->timers, + peer->subds[i], time_from_sec(5), + close_subd_timeout, peer->subds[i])); + io_wake(peer->subds[i]->outq); + } + + /* If there were no subds, free peer immediately. */ + if (tal_count(peer->subds) == 0) + tal_free(peer); +} - /* Or if there were no subds, this will free the peer. */ - maybe_free_peer(peer); +/* When peer reconnects, we close the old connection unceremoniously. */ +void destroy_peer_immediately(struct peer *peer) +{ + /* Forgo normal destructors which involve timeouts */ + if (peer->to_peer) + tal_del_destructor2(peer->to_peer, destroy_peer_conn, peer); + + for (size_t i = 0; i < tal_count(peer->subds); i++) { + if (peer->subds[i]->conn) + tal_del_destructor(peer->subds[i], destroy_connected_subd); + } + tal_free(peer); } struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, @@ -1456,7 +1494,7 @@ void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd) } /* Could be disconnecting now */ - if (!peer->to_peer) { + if (!peer->to_peer || peer->draining_state != NOT_DRAINING) { close(fd); return; } diff --git a/connectd/multiplex.h b/connectd/multiplex.h index 5988049c7538..1a20143a6764 100644 --- a/connectd/multiplex.h +++ b/connectd/multiplex.h @@ -34,7 +34,10 @@ void set_custommsgs(struct daemon *daemon, const u8 *msg); /* Lightningd wants to talk to you. */ void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd); -/* Start shutting down peer. */ -void drain_peer(struct peer *peer); +/* Disconnect peer: give outgoing msgs time to drain though. */ +void disconnect_peer(struct peer *peer); + +/* Get rid of this immediately. */ +void destroy_peer_immediately(struct peer *peer); #endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */ diff --git a/lightningd/connect_control.c b/lightningd/connect_control.c index 2fef1503c998..2cdeb610ba48 100644 --- a/lightningd/connect_control.c +++ b/lightningd/connect_control.c @@ -246,7 +246,7 @@ static struct command_result *json_connect(struct command *cmd, subd_send_msg(cmd->ld->connectd, take(towire_connectd_connect_to_peer(NULL, &id_addr.id, addr, true))); - /* Leave this here for peer_connected, connect_failed or peer_disconnect_done. */ + /* Leave this here for peer_connected, connect_failed or peer_disconnected. */ new_connect(cmd->ld, &id_addr.id, cmd); return command_still_pending(cmd); } @@ -500,15 +500,16 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd break; case WIRE_CONNECTD_PEER_CONNECTED: - peer_connected(connectd->ld, msg); + case WIRE_CONNECTD_PEER_RECONNECTED: + handle_peer_connected(connectd->ld, msg); break; case WIRE_CONNECTD_PEER_SPOKE: - peer_spoke(connectd->ld, msg); + handle_peer_spoke(connectd->ld, msg); break; - case WIRE_CONNECTD_PEER_DISCONNECT_DONE: - peer_disconnect_done(connectd->ld, msg); + case WIRE_CONNECTD_PEER_DISCONNECTED: + handle_peer_disconnected(connectd->ld, msg); break; case WIRE_CONNECTD_CONNECT_FAILED: diff --git a/lightningd/dual_open_control.c b/lightningd/dual_open_control.c index 3ad5cfb1dbb7..1e46c06a991c 100644 --- a/lightningd/dual_open_control.c +++ b/lightningd/dual_open_control.c @@ -3696,6 +3696,28 @@ static void handle_commit_received(struct subd *dualopend, abort(); } +static void handle_dualopend_got_announcement(struct subd *dualopend, const u8 *msg) +{ + struct channel *channel = dualopend->channel; + secp256k1_ecdsa_signature remote_ann_node_sig; + secp256k1_ecdsa_signature remote_ann_bitcoin_sig; + struct short_channel_id scid; + + if (!fromwire_dualopend_got_announcement(msg, + &scid, + &remote_ann_node_sig, + &remote_ann_bitcoin_sig)) { + channel_internal_error(channel, + "bad dualopend_got_announcement %s", + tal_hex(tmpctx, msg)); + return; + } + + channel_gossip_got_announcement_sigs(channel, scid, + &remote_ann_node_sig, + &remote_ann_bitcoin_sig); +} + static unsigned int dual_opend_msg(struct subd *dualopend, const u8 *msg, const int *fds) { @@ -3758,6 +3780,9 @@ static unsigned int dual_opend_msg(struct subd *dualopend, case WIRE_DUALOPEND_UPDATE_REQUIRE_CONFIRMED: handle_update_require_confirmed(dualopend, msg); return 0; + case WIRE_DUALOPEND_GOT_ANNOUNCEMENT: + handle_dualopend_got_announcement(dualopend, msg); + return 0; /* Messages we send */ case WIRE_DUALOPEND_INIT: case WIRE_DUALOPEND_REINIT: diff --git a/lightningd/notification.c b/lightningd/notification.c index 22761aabd396..75134ecdea91 100644 --- a/lightningd/notification.c +++ b/lightningd/notification.c @@ -90,7 +90,7 @@ static void disconnect_notification_serialize(struct json_stream *stream, } REGISTER_NOTIFICATION(disconnect); -void notify_disconnect(struct lightningd *ld, struct node_id *nodeid) +void notify_disconnect(struct lightningd *ld, const struct node_id *nodeid) { struct jsonrpc_notification *n = notify_start(ld, "disconnect"); if (!n) diff --git a/lightningd/notification.h b/lightningd/notification.h index 20794a93a845..91454a550ceb 100644 --- a/lightningd/notification.h +++ b/lightningd/notification.h @@ -25,7 +25,7 @@ void notify_connect(struct lightningd *ld, const struct node_id *nodeid, bool incoming, const struct wireaddr_internal *addr); -void notify_disconnect(struct lightningd *ld, struct node_id *nodeid); +void notify_disconnect(struct lightningd *ld, const struct node_id *nodeid); void notify_warning(struct lightningd *ld, struct log_entry *l); diff --git a/lightningd/peer_control.c b/lightningd/peer_control.c index 35b88089991f..19780fae7e7a 100644 --- a/lightningd/peer_control.c +++ b/lightningd/peer_control.c @@ -74,6 +74,12 @@ #include #include +/* FIXME: Reorder! */ +static void peer_disconnected(struct lightningd *ld, + const struct node_id *id, + u64 connectd_counter, + bool fail_connect_attempts); + /* Common pattern: create a sockpair for this channel, return one as a peer_fd */ struct peer_fd *sockpair(const tal_t *ctx, struct channel *channel, int *otherfd, const u8 **warning) @@ -1722,9 +1728,9 @@ REGISTER_PLUGIN_HOOK(peer_connected, peer_connected_serialize, struct peer_connected_hook_payload *); -/* Connectd tells us a peer has connected: it never hands us duplicates, since +/* Connectd tells us a peer has connected/reconnected: it never hands us duplicates, since * it holds them until we say peer_disconnected. */ -void peer_connected(struct lightningd *ld, const u8 *msg) +void handle_peer_connected(struct lightningd *ld, const u8 *msg) { struct node_id id; u8 *their_features; @@ -1738,13 +1744,27 @@ void peer_connected(struct lightningd *ld, const u8 *msg) hook_payload->ld = ld; hook_payload->error = NULL; if (!fromwire_connectd_peer_connected(hook_payload, msg, - &id, &connectd_counter, - &hook_payload->addr, - &hook_payload->remote_addr, - &hook_payload->incoming, - &their_features)) - fatal("Connectd gave bad CONNECT_PEER_CONNECTED message %s", - tal_hex(msg, msg)); + &id, &connectd_counter, + &hook_payload->addr, + &hook_payload->remote_addr, + &hook_payload->incoming, + &their_features)) { + u64 prev_connectd_counter; + if (!fromwire_connectd_peer_reconnected(hook_payload, msg, + &id, &prev_connectd_counter, + &connectd_counter, + &hook_payload->addr, + &hook_payload->remote_addr, + &hook_payload->incoming, + &their_features)) { + fatal("Connectd gave bad CONNECT_PEER_(RE)CONNECTED message %s", + tal_hex(msg, msg)); + } + /* Reconnect? Mark the disconnect *first*, but don't + * fail any connect attempts: this is a race. */ + log_peer_debug(ld->log, &id, "peer reconnected"); + peer_disconnected(ld, &id, prev_connectd_counter, false); + } /* If we connected, and it's a normal address */ if (!hook_payload->incoming @@ -1881,7 +1901,7 @@ static void send_reestablish(struct peer *peer, * a subd. Normally this is a race, but it happens for real when opening * a new channel, or referring to a channel we no longer want to talk to * it about. */ -void peer_spoke(struct lightningd *ld, const u8 *msg) +void handle_peer_spoke(struct lightningd *ld, const u8 *msg) { struct node_id id; u16 msgtype; @@ -2081,41 +2101,40 @@ static void destroy_disconnect_command(struct disconnect_command *dc) list_del(&dc->list); } -void peer_disconnect_done(struct lightningd *ld, const u8 *msg) +/* This is also called when we reconnect, but we don't fail connect attempts */ +static void peer_disconnected(struct lightningd *ld, + const struct node_id *id, + u64 connectd_counter, + bool fail_connect_attempts) { - struct node_id id; - u64 connectd_counter; struct disconnect_command *i, *next; struct peer *p; - if (!fromwire_connectd_peer_disconnect_done(msg, &id, &connectd_counter)) - fatal("Connectd gave bad PEER_DISCONNECT_DONE message %s", - tal_hex(msg, msg)); - /* If we still have peer, it's disconnected now */ - /* FIXME: We should keep peers until it tells us they're disconnected, - * and not free when no more channels. */ - p = peer_by_id(ld, &id); + p = peer_by_id(ld, id); if (p) { struct channel *channel; assert(p->connectd_counter == connectd_counter); - log_peer_debug(ld->log, &id, "peer_disconnect_done"); + log_peer_debug(ld->log, id, "peer_disconnected"); p->connected = PEER_DISCONNECTED; + /* Note: we don't force subds to stop. They should exit soon, + * but if we get a (re)connection we'll force them to stop */ list_for_each(&p->channels, channel, list) channel_gossip_channel_disconnect(channel); } /* If you were trying to connect, it failed. */ - connect_failed_disconnect(ld, &id, - p && !p->connected_incoming ? &p->addr : NULL); + if (fail_connect_attempts) + connect_failed_disconnect(ld, id, + p && !p->connected_incoming ? &p->addr : NULL); /* Fire off plugin notifications */ - notify_disconnect(ld, &id); + notify_disconnect(ld, id); /* Wake any disconnect commands (removes self from list) */ list_for_each_safe(&ld->disconnect_commands, i, next, list) { - if (!node_id_eq(&i->id, &id)) + if (!node_id_eq(&i->id, id)) continue; was_pending(command_success(i->cmd, @@ -2127,6 +2146,18 @@ void peer_disconnect_done(struct lightningd *ld, const u8 *msg) maybe_delete_peer(p); } +void handle_peer_disconnected(struct lightningd *ld, const u8 *msg) +{ + struct node_id id; + u64 connectd_counter; + + if (!fromwire_connectd_peer_disconnected(msg, &id, &connectd_counter)) + fatal("Connectd gave bad PEER_DISCONNECTED message %s", + tal_hex(msg, msg)); + + peer_disconnected(ld, &id, connectd_counter, true); +} + void update_channel_from_inflight(struct lightningd *ld, struct channel *channel, const struct channel_inflight *inflight, diff --git a/lightningd/peer_control.h b/lightningd/peer_control.h index 2a28183fdace..382d0826baf1 100644 --- a/lightningd/peer_control.h +++ b/lightningd/peer_control.h @@ -87,9 +87,9 @@ struct peer *peer_from_json(struct lightningd *ld, const jsmntok_t *peeridtok); /* connectd tells us what peer is doing */ -void peer_connected(struct lightningd *ld, const u8 *msg); -void peer_disconnect_done(struct lightningd *ld, const u8 *msg); -void peer_spoke(struct lightningd *ld, const u8 *msg); +void handle_peer_connected(struct lightningd *ld, const u8 *msg); +void handle_peer_disconnected(struct lightningd *ld, const u8 *msg); +void handle_peer_spoke(struct lightningd *ld, const u8 *msg); /* Could be configurable. */ #define OUR_CHANNEL_FLAGS CHANNEL_FLAGS_ANNOUNCE_CHANNEL diff --git a/lightningd/test/run-invoice-select-inchan.c b/lightningd/test/run-invoice-select-inchan.c index fc7c642496ab..00eef5e44b70 100644 --- a/lightningd/test/run-invoice-select-inchan.c +++ b/lightningd/test/run-invoice-select-inchan.c @@ -318,9 +318,12 @@ bool fromwire_channeld_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNE /* Generated stub for fromwire_connectd_peer_connected */ bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); } -/* Generated stub for fromwire_connectd_peer_disconnect_done */ -bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED) -{ fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); } +/* Generated stub for fromwire_connectd_peer_disconnected */ +bool fromwire_connectd_peer_disconnected(const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED) +{ fprintf(stderr, "fromwire_connectd_peer_disconnected called!\n"); abort(); } +/* Generated stub for fromwire_connectd_peer_reconnected */ +bool fromwire_connectd_peer_reconnected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *prev_counter UNNEEDED, u64 *counter UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) +{ fprintf(stderr, "fromwire_connectd_peer_reconnected called!\n"); abort(); } /* Generated stub for fromwire_connectd_peer_spoke */ bool fromwire_connectd_peer_spoke(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED, wirestring **error UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_spoke called!\n"); abort(); } @@ -775,7 +778,7 @@ void notify_connect(struct lightningd *ld UNNEEDED, const struct wireaddr_internal *addr UNNEEDED) { fprintf(stderr, "notify_connect called!\n"); abort(); } /* Generated stub for notify_disconnect */ -void notify_disconnect(struct lightningd *ld UNNEEDED, struct node_id *nodeid UNNEEDED) +void notify_disconnect(struct lightningd *ld UNNEEDED, const struct node_id *nodeid UNNEEDED) { fprintf(stderr, "notify_disconnect called!\n"); abort(); } /* Generated stub for notify_invoice_creation */ void notify_invoice_creation(struct lightningd *ld UNNEEDED, diff --git a/openingd/dualopend.c b/openingd/dualopend.c index d140d37b6c4b..8e902fa59670 100644 --- a/openingd/dualopend.c +++ b/openingd/dualopend.c @@ -4169,12 +4169,33 @@ static u8 *handle_master_in(struct state *state) case WIRE_DUALOPEND_VALIDATE_LEASE: case WIRE_DUALOPEND_VALIDATE_INPUTS: case WIRE_DUALOPEND_UPDATE_REQUIRE_CONFIRMED: + case WIRE_DUALOPEND_GOT_ANNOUNCEMENT: break; } status_failed(STATUS_FAIL_MASTER_IO, "Unknown msg %s", tal_hex(tmpctx, msg)); } +static void handle_announcement_signatures(struct state *state, const u8 *msg) +{ + struct channel_id chanid; + struct short_channel_id remote_scid; + secp256k1_ecdsa_signature remote_node_sig, remote_bitcoin_sig; + + if (!fromwire_announcement_signatures(msg, + &chanid, + &remote_scid, + &remote_node_sig, + &remote_bitcoin_sig)) + open_err_fatal(state, "Bad announcement_signatures %s", tal_hex(msg, msg)); + + wire_sync_write(REQ_FD, + take(towire_dualopend_got_announcement(NULL, + remote_scid, + &remote_node_sig, + &remote_bitcoin_sig))); +} + /*~ Standard "peer sent a message, handle it" demuxer. Though it really only * handles a few messages, we use the standard form as principle of least * surprise. */ @@ -4221,6 +4242,9 @@ static u8 *handle_peer_in(struct state *state) case WIRE_TX_ABORT: handle_tx_abort(state, msg); return NULL; + case WIRE_ANNOUNCEMENT_SIGNATURES: + handle_announcement_signatures(state, msg); + return NULL; /* Otherwise we fall through */ case WIRE_INIT: case WIRE_ERROR: @@ -4239,7 +4263,6 @@ static u8 *handle_peer_in(struct state *state) case WIRE_UPDATE_FEE: case WIRE_UPDATE_BLOCKHEIGHT: case WIRE_CHANNEL_REESTABLISH: - case WIRE_ANNOUNCEMENT_SIGNATURES: case WIRE_GOSSIP_TIMESTAMP_FILTER: case WIRE_ONION_MESSAGE: case WIRE_ACCEPT_CHANNEL2: diff --git a/openingd/dualopend_wire.csv b/openingd/dualopend_wire.csv index 1c2589abf9f0..34841392ffa7 100644 --- a/openingd/dualopend_wire.csv +++ b/openingd/dualopend_wire.csv @@ -288,3 +288,8 @@ msgdata,dualopend_validate_lease,their_pubkey,pubkey, msgtype,dualopend_validate_lease_reply,7127 msgdata,dualopend_validate_lease_reply,err_msg,?wirestring, + +msgtype,dualopend_got_announcement,7031 +msgdata,dualopend_got_announcement,scid,short_channel_id, +msgdata,dualopend_got_announcement,remote_ann_node_sig,secp256k1_ecdsa_signature, +msgdata,dualopend_got_announcement,remote_ann_bitcoin_sig,secp256k1_ecdsa_signature, diff --git a/tests/test_connection.py b/tests/test_connection.py index 8f10c2555cff..e3701acfe633 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1397,7 +1397,8 @@ def test_funding_external_wallet_corners(node_factory, bitcoind): @pytest.mark.openchannel('v2') def test_funding_v2_corners(node_factory, bitcoind): - l1 = node_factory.get_node(may_reconnect=True) + # dualopend doesn't listen :( + l1 = node_factory.get_node(may_reconnect=True, broken_log='Subd did not close, forcing close') l2 = node_factory.get_node(may_reconnect=True) # We have wumbo, it's OK @@ -2971,7 +2972,7 @@ def test_opener_feerate_reconnect(node_factory, bitcoind): # Wait until they reconnect. l1.daemon.wait_for_logs(['Peer transient failure in CHANNELD_NORMAL', - 'peer_disconnect_done']) + 'peer_disconnected']) wait_for(lambda: l1.rpc.getpeer(l2.info['id'])['connected']) # Should work normally. @@ -4506,7 +4507,7 @@ def test_reconnect_no_additional_transient_failure(node_factory, bitcoind): l1.stop() # We wait for l2 to disconnect, ofc we also see an expected "Peer transient failure" here. l2.daemon.wait_for_logs([f"{l1id}-channeld-chan#1: Peer connection lost", - f"{l1id}-lightningd: peer_disconnect_done", + f"{l1id}-lightningd: peer_disconnected", f"{l1id}-chan#1: Peer transient failure in CHANNELD_NORMAL: channeld: Owning subdaemon channeld died"]) # When we restart l1 we should not see another Peer transient failure message. diff --git a/tests/test_opening.py b/tests/test_opening.py index e533d1f873bf..f2208d5ccc5c 100644 --- a/tests/test_opening.py +++ b/tests/test_opening.py @@ -931,7 +931,7 @@ def test_rbf_reconnect_tx_construct(node_factory, bitcoind, chainparams): l1.daemon.wait_for_logs([r'Got dualopend reestablish', r'No commitment, not sending our sigs', r'dev_disconnect: -WIRE_COMMITMENT_SIGNED', - 'peer_disconnect_done']) + 'peer_disconnected']) assert not l1.rpc.getpeer(l2.info['id'])['connected'] l1.rpc.connect(l2.info['id'], 'localhost', l2.port) diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index c1cd3b066f25..5a6ad596c3b7 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -314,9 +314,12 @@ bool fromwire_channeld_sending_commitsig(const tal_t *ctx UNNEEDED, const void * /* Generated stub for fromwire_connectd_peer_connected */ bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); } -/* Generated stub for fromwire_connectd_peer_disconnect_done */ -bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED) -{ fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); } +/* Generated stub for fromwire_connectd_peer_disconnected */ +bool fromwire_connectd_peer_disconnected(const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED) +{ fprintf(stderr, "fromwire_connectd_peer_disconnected called!\n"); abort(); } +/* Generated stub for fromwire_connectd_peer_reconnected */ +bool fromwire_connectd_peer_reconnected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *prev_counter UNNEEDED, u64 *counter UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) +{ fprintf(stderr, "fromwire_connectd_peer_reconnected called!\n"); abort(); } /* Generated stub for fromwire_connectd_peer_spoke */ bool fromwire_connectd_peer_spoke(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED, wirestring **error UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_spoke called!\n"); abort(); } @@ -773,7 +776,7 @@ void notify_connect(struct lightningd *ld UNNEEDED, const struct wireaddr_internal *addr UNNEEDED) { fprintf(stderr, "notify_connect called!\n"); abort(); } /* Generated stub for notify_disconnect */ -void notify_disconnect(struct lightningd *ld UNNEEDED, struct node_id *nodeid UNNEEDED) +void notify_disconnect(struct lightningd *ld UNNEEDED, const struct node_id *nodeid UNNEEDED) { fprintf(stderr, "notify_disconnect called!\n"); abort(); } /* Generated stub for notify_forward_event */ void notify_forward_event(struct lightningd *ld UNNEEDED,