Skip to content

Commit

Permalink
connectd: separate routine to inject message without closing connection.
Browse files Browse the repository at this point in the history
We will want this to send private channel_updates direct to peer.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
  • Loading branch information
rustyrussell committed Jan 31, 2024
1 parent dac8964 commit db6f0da
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 54 deletions.
18 changes: 9 additions & 9 deletions connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1835,24 +1835,24 @@ static void start_shutdown(struct daemon *daemon, const u8 *msg)
take(towire_connectd_start_shutdown_reply(NULL)));
}

/* lightningd tells us to send a msg and disconnect. */
static void peer_final_msg(struct io_conn *conn,
/* lightningd tells us to send a msg. */
static void peer_send_msg(struct io_conn *conn,
struct daemon *daemon, const u8 *msg)
{
struct peer *peer;
struct node_id id;
u64 counter;
u8 *finalmsg;
u8 *sendmsg;

if (!fromwire_connectd_peer_final_msg(tmpctx, msg, &id, &counter,
&finalmsg))
master_badmsg(WIRE_CONNECTD_PEER_FINAL_MSG, msg);
if (!fromwire_connectd_peer_send_msg(tmpctx, msg, &id, &counter,
&sendmsg))
master_badmsg(WIRE_CONNECTD_PEER_SEND_MSG, msg);

/* This can happen if peer hung up on us (or wrong counter
* if it reconnected). */
peer = peer_htable_get(daemon->peers, &id);
if (peer && peer->counter == counter)
multiplex_final_msg(peer, take(finalmsg));
inject_peer_msg(peer, take(sendmsg));
}

static void dev_connect_memleak(struct daemon *daemon, const u8 *msg)
Expand Down Expand Up @@ -2078,8 +2078,8 @@ static struct io_plan *recv_req(struct io_conn *conn,
peer_discard(daemon, msg);
goto out;

case WIRE_CONNECTD_PEER_FINAL_MSG:
peer_final_msg(conn, daemon, msg);
case WIRE_CONNECTD_PEER_SEND_MSG:
peer_send_msg(conn, daemon, msg);
goto out;

case WIRE_CONNECTD_PING:
Expand Down
12 changes: 6 additions & 6 deletions connectd/connectd_wire.csv
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ msgtype,connectd_discard_peer,2015
msgdata,connectd_discard_peer,id,node_id,
msgdata,connectd_discard_peer,counter,u64,

# master -> connectd: give message to peer and disconnect.
msgtype,connectd_peer_final_msg,2003
msgdata,connectd_peer_final_msg,id,node_id,
msgdata,connectd_peer_final_msg,counter,u64,
msgdata,connectd_peer_final_msg,len,u16,
msgdata,connectd_peer_final_msg,msg,u8,len
# master -> connectd: give message to peer.
msgtype,connectd_peer_send_msg,2003
msgdata,connectd_peer_send_msg,id,node_id,
msgdata,connectd_peer_send_msg,counter,u64,
msgdata,connectd_peer_send_msg,len,u16,
msgdata,connectd_peer_send_msg,msg,u8,len

# master -> connectd: do you have a memleak?
msgtype,connectd_dev_memleak,2033
Expand Down
9 changes: 2 additions & 7 deletions connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,6 @@ void inject_peer_msg(struct peer *peer, const u8 *msg TAKES)
msg_enqueue(peer->peer_outq, msg);
}

void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES)
{
inject_peer_msg(peer, final_msg);
drain_peer(peer);
}

/* Send warning, close connection to peer */
static void send_warning(struct peer *peer, const char *fmt, ...)
{
Expand All @@ -187,7 +181,8 @@ static void send_warning(struct peer *peer, const char *fmt, ...)
msg = towire_warningfmtv(NULL, NULL, fmt, ap);
va_end(ap);

multiplex_final_msg(peer, take(msg));
inject_peer_msg(peer, take(msg));
drain_peer(peer);
}

/* Kicks off write_to_peer() to look for more gossip to send from store */
Expand Down
4 changes: 0 additions & 4 deletions connectd/multiplex.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ struct feature_set;
struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
struct peer *peer);

/* Send this message to peer and disconnect. */
void multiplex_final_msg(struct peer *peer,
const u8 *final_msg TAKES);

/* Inject a message into the output stream. Unlike a raw msg_enqueue,
* this does io logging. */
void inject_peer_msg(struct peer *peer, const u8 *msg TAKES);
Expand Down
12 changes: 8 additions & 4 deletions lightningd/channel_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -1108,10 +1108,14 @@ static void peer_got_shutdown(struct channel *channel, const u8 *msg)

/* Get connectd to send warning, and then allow reconnect. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL,
&channel->peer->id,
channel->peer->connectd_counter,
warning)));
take(towire_connectd_peer_send_msg(NULL,
&channel->peer->id,
channel->peer->connectd_counter,
warning)));
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL,
&channel->peer->id,
channel->peer->connectd_counter)));
channel_fail_transient(channel, true, "Bad shutdown scriptpubkey %s",
tal_hex(tmpctx, scriptpubkey));
return;
Expand Down
2 changes: 1 addition & 1 deletion lightningd/connect_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd
case WIRE_CONNECTD_DEV_MEMLEAK:
case WIRE_CONNECTD_DEV_SUPPRESS_GOSSIP:
case WIRE_CONNECTD_DEV_REPORT_FDS:
case WIRE_CONNECTD_PEER_FINAL_MSG:
case WIRE_CONNECTD_PEER_SEND_MSG:
case WIRE_CONNECTD_PEER_CONNECT_SUBD:
case WIRE_CONNECTD_PING:
case WIRE_CONNECTD_SEND_ONIONMSG:
Expand Down
12 changes: 8 additions & 4 deletions lightningd/dual_open_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -1576,10 +1576,14 @@ static void handle_peer_wants_to_close(struct subd *dualopend,

/* Get connectd to send warning, and kill subd. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL,
&channel->peer->id,
channel->peer->connectd_counter,
warning)));
take(towire_connectd_peer_send_msg(NULL,
&channel->peer->id,
channel->peer->connectd_counter,
warning)));
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL,
&channel->peer->id,
channel->peer->connectd_counter)));
channel_fail_transient(channel, true, "Bad shutdown scriptpubkey %s",
tal_hex(tmpctx, scriptpubkey));
return;
Expand Down
42 changes: 29 additions & 13 deletions lightningd/peer_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -1318,9 +1318,13 @@ static void connect_activate_subd(struct lightningd *ld, struct channel *channel
tal_hex(tmpctx, error));
/* Get connectd to send error and close. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &channel->peer->id,
channel->peer->connectd_counter,
error)));
take(towire_connectd_peer_send_msg(NULL, &channel->peer->id,
channel->peer->connectd_counter,
error)));
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL,
&channel->peer->id,
channel->peer->connectd_counter)));
}

static void peer_connected_hook_final(struct peer_connected_hook_payload *payload STEALS)
Expand Down Expand Up @@ -1367,12 +1371,16 @@ static void peer_connected_hook_final(struct peer_connected_hook_payload *payloa
/* Developer hack to fail all channels on permfail line. */
if (dev_disconnect_permanent(ld)) {
list_for_each(&peer->channels, channel, list) {
subd_send_msg(ld->connectd,
take(towire_connectd_peer_send_msg(NULL, &peer->id,
peer->connectd_counter,
channel->error)));
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL,
&peer->id,
peer->connectd_counter)));
channel_fail_permanent(channel, REASON_LOCAL,
"dev_disconnect permfail");
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
peer->connectd_counter,
channel->error)));
}
return;
}
Expand All @@ -1394,9 +1402,13 @@ static void peer_connected_hook_final(struct peer_connected_hook_payload *payloa
tal_hex(tmpctx, error));
/* Get connectd to send error and close. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
peer->connectd_counter,
error)));
take(towire_connectd_peer_send_msg(NULL, &peer->id,
peer->connectd_counter,
error)));
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL,
&peer->id,
peer->connectd_counter)));
}

static bool
Expand Down Expand Up @@ -1764,9 +1776,13 @@ void peer_spoke(struct lightningd *ld, const u8 *msg)
tal_hex(tmpctx, error));
/* Get connectd to send error and close. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
peer->connectd_counter,
error)));
take(towire_connectd_peer_send_msg(NULL, &peer->id,
peer->connectd_counter,
error)));
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL,
&peer->id,
peer->connectd_counter)));
return;

tell_connectd:
Expand Down
9 changes: 6 additions & 3 deletions lightningd/test/run-invoice-select-inchan.c
Original file line number Diff line number Diff line change
Expand Up @@ -930,12 +930,15 @@ u8 *towire_channeld_dev_memleak(const tal_t *ctx UNNEEDED)
/* Generated stub for towire_channeld_dev_reenable_commit */
u8 *towire_channeld_dev_reenable_commit(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_channeld_dev_reenable_commit called!\n"); abort(); }
/* Generated stub for towire_connectd_discard_peer */
u8 *towire_connectd_discard_peer(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED)
{ fprintf(stderr, "towire_connectd_discard_peer called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_connect_subd */
u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_final_msg */
u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_final_msg called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_send_msg */
u8 *towire_connectd_peer_send_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_send_msg called!\n"); abort(); }
/* Generated stub for towire_dualopend_dev_memleak */
u8 *towire_dualopend_dev_memleak(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_dualopend_dev_memleak called!\n"); abort(); }
Expand Down
9 changes: 6 additions & 3 deletions wallet/test/run-wallet.c
Original file line number Diff line number Diff line change
Expand Up @@ -993,12 +993,15 @@ u8 *towire_channeld_offer_htlc(const tal_t *ctx UNNEEDED, struct amount_msat amo
/* Generated stub for towire_channeld_sending_commitsig_reply */
u8 *towire_channeld_sending_commitsig_reply(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_channeld_sending_commitsig_reply called!\n"); abort(); }
/* Generated stub for towire_connectd_discard_peer */
u8 *towire_connectd_discard_peer(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED)
{ fprintf(stderr, "towire_connectd_discard_peer called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_connect_subd */
u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_final_msg */
u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_final_msg called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_send_msg */
u8 *towire_connectd_peer_send_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_send_msg called!\n"); abort(); }
/* Generated stub for towire_dualopend_dev_memleak */
u8 *towire_dualopend_dev_memleak(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_dualopend_dev_memleak called!\n"); abort(); }
Expand Down

0 comments on commit db6f0da

Please sign in to comment.