Skip to content
Permalink
Browse files

gossipd: adjust peers' broadcast_offset when compacting store.

When we compact the store, we need to adjust the broadast index for
peers so they know where they're up to.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
  • Loading branch information...
rustyrussell committed Apr 11, 2019
1 parent 118ab4a commit 169207a27eeddc15fd4da416b97d411a4cdef88d
@@ -25,12 +25,6 @@ int test_printf(const char *format, ...);
#undef main

/* AUTOGENERATED MOCKS START */
/* Generated stub for amount_sat_eq */
bool amount_sat_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED)
{ fprintf(stderr, "amount_sat_eq called!\n"); abort(); }
/* Generated stub for amount_sat_less */
bool amount_sat_less(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED)
{ fprintf(stderr, "amount_sat_less called!\n"); abort(); }
/* Generated stub for version_and_exit */
char *version_and_exit(const void *unused UNNEEDED)
{ fprintf(stderr, "version_and_exit called!\n"); abort(); }
@@ -19,12 +19,14 @@ static void destroy_broadcast_state(struct broadcast_state *bstate)
}

struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs)
struct gossip_store *gs,
struct list_head *peers)
{
struct broadcast_state *bstate = tal(rstate, struct broadcast_state);
uintmap_init(&bstate->broadcasts);
bstate->count = 0;
bstate->gs = gs;
bstate->peers = peers;
tal_add_destructor(bstate, destroy_broadcast_state);
return bstate;
}
@@ -63,6 +65,8 @@ void insert_broadcast(struct broadcast_state **bstate,
const u8 *msg,
struct broadcastable *bcast)
{
u32 offset;

/* If we're loading from the store, we already have index */
if (!bcast->index) {
u64 idx;
@@ -79,7 +83,9 @@ void insert_broadcast(struct broadcast_state **bstate,
insert_broadcast_nostore(*bstate, bcast);

/* If it compacts, it replaces *bstate */
gossip_store_maybe_compact((*bstate)->gs, bstate);
gossip_store_maybe_compact((*bstate)->gs, bstate, &offset);
if (offset)
update_peers_broadcast_index((*bstate)->peers, offset);
}

struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
@@ -24,6 +24,7 @@ struct broadcast_state {
UINTMAP(struct broadcastable *) broadcasts;
size_t count;
struct gossip_store *gs;
struct list_head *peers;
};

static inline void broadcastable_init(struct broadcastable *bcast)
@@ -32,7 +33,8 @@ static inline void broadcastable_init(struct broadcastable *bcast)
}

struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs);
struct gossip_store *gs,
struct list_head *peers);

/* Append a queued message for broadcast. Must be explicitly deleted.
* Also adds it to the gossip store. */
@@ -68,4 +70,6 @@ u64 broadcast_final_index(const struct broadcast_state *bstate);
struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
const char *abortstr);

/* Callback for after we compacted the store */
void update_peers_broadcast_index(struct list_head *peers, u32 offset);
#endif /* LIGHTNING_GOSSIPD_BROADCAST_H */
@@ -253,9 +253,13 @@ static bool add_local_unnannounced(int in_fd, int out_fd,
*
* Creates a new file, writes all the updates from the `broadcast_state`, and
* then atomically swaps the files.
*
* Returns the amount of shrinkage in @offset on success, otherwise @offset
* is unchanged.
*/
bool gossip_store_compact(struct gossip_store *gs,
struct broadcast_state **bs)
struct broadcast_state **bs,
u32 *offset)
{
size_t count = 0;
int fd;
@@ -274,7 +278,7 @@ bool gossip_store_compact(struct gossip_store *gs,
"Compacting gossip_store with %zu entries, %zu of which are stale",
gs->count, gs->count - oldb->count);

newb = new_broadcast_state(gs->rstate, gs);
newb = new_broadcast_state(gs->rstate, gs, oldb->peers);
fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600);

if (fd < 0) {
@@ -349,6 +353,7 @@ bool gossip_store_compact(struct gossip_store *gs,
"Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64,
gs->count - count, count, len);
gs->count = count;
*offset = gs->len - len;
gs->len = len;
close(gs->fd);
gs->fd = fd;
@@ -368,8 +373,11 @@ bool gossip_store_compact(struct gossip_store *gs,
}

void gossip_store_maybe_compact(struct gossip_store *gs,
struct broadcast_state **bs)
struct broadcast_state **bs,
u32 *offset)
{
*offset = 0;

/* Don't compact while loading! */
if (!gs->writable)
return;
@@ -378,7 +386,7 @@ void gossip_store_maybe_compact(struct gossip_store *gs,
if (gs->count < (*bs)->count * 1.25)
return;

gossip_store_compact(gs, bs);
gossip_store_compact(gs, bs, offset);
}

u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg)
@@ -51,12 +51,20 @@ const u8 *gossip_store_get(const tal_t *ctx,
* If we need to compact the gossip store, do so.
* @gs: the gossip store.
* @bs: a pointer to the broadcast state: replaced if we compact it.
* @offset: the change in the store, if any.
*
* If @offset is non-zero on return, caller must update peers.
*/
void gossip_store_maybe_compact(struct gossip_store *gs,
struct broadcast_state **bs);
struct broadcast_state **bs,
u32 *offset);


/* Expose for dev-compact-gossip-store to force compaction. */
bool gossip_store_compact(struct gossip_store *gs,
struct broadcast_state **bs);
struct broadcast_state **bs,
u32 *offset);

/* Callback for when gossip_store indexes move */

#endif /* LIGHTNING_GOSSIPD_GOSSIP_STORE_H */
@@ -695,6 +695,21 @@ static u8 *handle_gossip_timestamp_filter(struct peer *peer, const u8 *msg)
return NULL;
}

/*~ When we compact the gossip store, all the broadcast indexs move.
* We simply offset everyone, which means in theory they could retransmit
* some, but that's a lesser evil than skipping some. */
void update_peers_broadcast_index(struct list_head *peers, u32 offset)
{
struct peer *peer;

list_for_each(peers, peer, list) {
if (peer->broadcast_index < offset)
peer->broadcast_index = 0;
else
peer->broadcast_index -= offset;
}
}

/*~ We can send multiple replies when the peer queries for all channels in
* a given range of blocks; each one indicates the range of blocks it covers. */
static void reply_channel_range(struct peer *peer,
@@ -1956,6 +1971,7 @@ static struct io_plan *gossip_init(struct io_conn *conn,
chainparams_by_chainhash(&daemon->chain_hash),
&daemon->id,
update_channel_interval * 2,
&daemon->peers,
dev_gossip_time,
dev_unknown_channel_satoshis);

@@ -2565,8 +2581,15 @@ static struct io_plan *dev_compact_store(struct io_conn *conn,
struct daemon *daemon,
const u8 *msg)
{
u32 offset;
bool done = gossip_store_compact(daemon->rstate->broadcasts->gs,
&daemon->rstate->broadcasts);
&daemon->rstate->broadcasts,
&offset);

/* Peers keep an offset into where they are with gossip. */
if (done)
update_peers_broadcast_index(&daemon->peers, offset);

daemon_conn_send(daemon->master,
take(towire_gossip_dev_compact_store_reply(NULL,
done)));
@@ -181,12 +181,14 @@ struct routing_state *new_routing_state(const tal_t *ctx,
const struct chainparams *chainparams,
const struct node_id *local_id,
u32 prune_timeout,
struct list_head *peers,
const u32 *dev_gossip_time,
const struct amount_sat *dev_unknown_channel_satoshis)
{
struct routing_state *rstate = tal(ctx, struct routing_state);
rstate->nodes = empty_node_map(rstate);
rstate->broadcasts = new_broadcast_state(rstate, gossip_store_new(rstate));
rstate->broadcasts
= new_broadcast_state(rstate, gossip_store_new(rstate), peers);
rstate->chainparams = chainparams;
rstate->local_id = *local_id;
rstate->prune_timeout = prune_timeout;
@@ -226,6 +226,7 @@ struct routing_state *new_routing_state(const tal_t *ctx,
const struct chainparams *chainparams,
const struct node_id *local_id,
u32 prune_timeout,
struct list_head *peers,
const u32 *dev_gossip_time,
const struct amount_sat *dev_unknown_channel_satoshis);

@@ -97,6 +97,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l
/* Generated stub for towire_gossip_store_node_announcement */
u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED)
{ fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); }
/* Generated stub for update_peers_broadcast_index */
void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED)
{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); }
/* Generated stub for wire_type_name */
const char *wire_type_name(int e UNNEEDED)
{ fprintf(stderr, "wire_type_name called!\n"); abort(); }
@@ -214,7 +217,7 @@ int main(int argc, char *argv[])
setup_tmpctx();

me = nodeid(0);
rstate = new_routing_state(tmpctx, NULL, &me, 0, NULL, NULL);
rstate = new_routing_state(tmpctx, NULL, &me, 0, NULL, NULL, NULL);
opt_register_noarg("--perfme", opt_set_bool, &perfme,
"Run perfme-start and perfme-stop around benchmark");

@@ -86,6 +86,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l
/* Generated stub for towire_gossip_store_node_announcement */
u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED)
{ fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); }
/* Generated stub for update_peers_broadcast_index */
void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED)
{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); }
/* Generated stub for wire_type_name */
const char *wire_type_name(int e UNNEEDED)
{ fprintf(stderr, "wire_type_name called!\n"); abort(); }
@@ -171,7 +174,7 @@ int main(void)
strlen("02cca6c5c966fcf61d121e3a70e03a1cd9eeeea024b26ea666ce974d43b242e636"),
&d);

rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL);
rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL, NULL);

/* [{'active': True, 'short_id': '6990:2:1/1', 'fee_per_kw': 10, 'delay': 5, 'message_flags': 0, 'channel_flags': 1, 'destination': '0230ad0e74ea03976b28fda587bb75bdd357a1938af4424156a18265167f5e40ae', 'source': '02ea622d5c8d6143f15ed3ce1d501dd0d3d09d3b1c83a44d0034949f8a9ab60f06', 'last_update': 1504064344}, */

@@ -84,6 +84,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l
/* Generated stub for towire_gossip_store_node_announcement */
u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED)
{ fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); }
/* Generated stub for update_peers_broadcast_index */
void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED)
{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); }
/* Generated stub for wire_type_name */
const char *wire_type_name(int e UNNEEDED)
{ fprintf(stderr, "wire_type_name called!\n"); abort(); }
@@ -205,7 +208,7 @@ int main(void)

memset(&tmp, 'a', sizeof(tmp));
node_id_from_privkey(&tmp, &a);
rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL);
rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL, NULL);

new_node(rstate, &a);

0 comments on commit 169207a

Please sign in to comment.
You can’t perform that action at this time.