Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip store optimizations II: the gossipd parts #2545

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
486ce62
gossipd: use u32 for timestamp.
rustyrussell Apr 9, 2019
46323ea
gossip_store: avoid gratuitous copy on load.
rustyrussell Apr 10, 2019
267eef7
gossipd: embed broadcast information into each structure.
rustyrussell Apr 10, 2019
5867d45
gossipd: make gossip loading stats accurate.
rustyrussell Apr 10, 2019
d656746
gossipd: use file offset within store as broadcast index.
rustyrussell Apr 10, 2019
65ae60b
gossipd: gossip_store_get() to load a single store entry.
rustyrussell Apr 10, 2019
db2c21c
broadcast: don't keep payload pointer.
rustyrussell Apr 10, 2019
3248de1
gossipd: make routing_add_channel_update discard old timestamps.
rustyrussell Apr 10, 2019
47ce6e9
gossipd: handle duplicate nodes from unverified channel_announces pro…
rustyrussell Apr 10, 2019
8229aea
patch refine-test_gossip_persistence.patch
rustyrussell Apr 11, 2019
865433f
gossipd: don't create struct chan for yet-to-be-updated channels.
rustyrussell Apr 11, 2019
dbd903b
gossipd: don't use cached node_announcement for redundancy checking
rustyrussell Apr 11, 2019
f425e62
gossipd: free tmpctx children in gossip_store_load loop.
rustyrussell Apr 11, 2019
3769f66
gossipd: don't keep node_announcement messages in memory.
rustyrussell Apr 11, 2019
252f70a
gossipd: remove info fields from struct node.
rustyrussell Apr 11, 2019
b4eef30
gossipd: unify is_chan_public / is_chan_announced.
rustyrussell Apr 11, 2019
2f0f859
gossipd: don't keep channel_announcement messages in memory.
rustyrussell Apr 11, 2019
2368255
gossipd: remove some fields from struct chan.
rustyrussell Apr 11, 2019
843c54b
gossipd: allow reading from the store during load.
rustyrussell Apr 11, 2019
d2a8545
gossipd: use pread in the store.
rustyrussell Apr 11, 2019
118ab4a
gossipd: don't keep channel_updates in memory.
rustyrussell Apr 11, 2019
169207a
gossipd: adjust peers' broadcast_offset when compacting store.
rustyrussell Apr 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 0 additions & 6 deletions cli/test/run-large-input.c
Expand Up @@ -25,12 +25,6 @@ int test_printf(const char *format, ...);
#undef main

/* AUTOGENERATED MOCKS START */
/* Generated stub for amount_sat_eq */
bool amount_sat_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED)
{ fprintf(stderr, "amount_sat_eq called!\n"); abort(); }
/* Generated stub for amount_sat_less */
bool amount_sat_less(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED)
{ fprintf(stderr, "amount_sat_less called!\n"); abort(); }
/* Generated stub for version_and_exit */
char *version_and_exit(const void *unused UNNEEDED)
{ fprintf(stderr, "version_and_exit called!\n"); abort(); }
Expand Down
143 changes: 91 additions & 52 deletions gossipd/broadcast.c
Expand Up @@ -8,88 +8,126 @@
#include <common/pseudorand.h>
#include <common/status.h>
#include <common/type_to_string.h>
#include <errno.h>
#include <gossipd/broadcast.h>
#include <gossipd/gossip_store.h>
#include <wire/gen_peer_wire.h>

struct queued_message {
/* Broadcast index. */
u64 index;

/* Timestamp, for filtering. */
u32 timestamp;

/* Serialized payload */
const u8 *payload;
};
static void destroy_broadcast_state(struct broadcast_state *bstate)
{
uintmap_clear(&bstate->broadcasts);
}

struct broadcast_state *new_broadcast_state(tal_t *ctx)
struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs,
struct list_head *peers)
{
struct broadcast_state *bstate = tal(ctx, struct broadcast_state);
struct broadcast_state *bstate = tal(rstate, struct broadcast_state);
uintmap_init(&bstate->broadcasts);
/* Skip 0 because we initialize peers with 0 */
bstate->next_index = 1;
bstate->count = 0;
bstate->gs = gs;
bstate->peers = peers;
tal_add_destructor(bstate, destroy_broadcast_state);
return bstate;
}

void broadcast_del(struct broadcast_state *bstate, u64 index, const u8 *payload)
void broadcast_del(struct broadcast_state *bstate,
struct broadcastable *bcast)
{
const struct queued_message *q = uintmap_del(&bstate->broadcasts, index);
if (q != NULL) {
assert(q->payload == payload);
tal_free(q);
const struct broadcastable *b
= uintmap_del(&bstate->broadcasts, bcast->index);
if (b != NULL) {
assert(b == bcast);
bstate->count--;
broadcast_state_check(bstate, "broadcast_del");
bcast->index = 0;
}
}

static void destroy_queued_message(struct queued_message *msg,
struct broadcast_state *bstate)
static void add_broadcast(struct broadcast_state *bstate,
struct broadcastable *bcast)
{
broadcast_del(bstate, msg->index, msg->payload);
bstate->count--;
assert(bcast);
assert(bcast->index);
if (!uintmap_add(&bstate->broadcasts, bcast->index, bcast))
abort();
bstate->count++;
}

static struct queued_message *new_queued_message(const tal_t *ctx,
struct broadcast_state *bstate,
const u8 *payload,
u32 timestamp,
u64 index)
void insert_broadcast_nostore(struct broadcast_state *bstate,
struct broadcastable *bcast)
{
struct queued_message *msg = tal(ctx, struct queued_message);
assert(payload);
msg->payload = payload;
msg->index = index;
msg->timestamp = timestamp;
uintmap_add(&bstate->broadcasts, index, msg);
tal_add_destructor2(msg, destroy_queued_message, bstate);
bstate->count++;
return msg;
add_broadcast(bstate, bcast);
broadcast_state_check(bstate, "insert_broadcast");
}

u64 insert_broadcast(struct broadcast_state *bstate,
const u8 *payload, u32 timestamp)
void insert_broadcast(struct broadcast_state **bstate,
const u8 *msg,
struct broadcastable *bcast)
{
/* Free payload, free index. */
new_queued_message(payload, bstate, payload, timestamp,
bstate->next_index);
broadcast_state_check(bstate, "insert_broadcast");
return bstate->next_index++;
u32 offset;

/* If we're loading from the store, we already have index */
if (!bcast->index) {
u64 idx;

bcast->index = idx = gossip_store_add((*bstate)->gs, msg);
if (!idx)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not add to gossip store: %s",
strerror(errno));
/* We assume we can fit in 32 bits for now! */
assert(idx == bcast->index);
}

insert_broadcast_nostore(*bstate, bcast);

/* If it compacts, it replaces *bstate */
gossip_store_maybe_compact((*bstate)->gs, bstate, &offset);
if (offset)
update_peers_broadcast_index((*bstate)->peers, offset);
}

struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
u32 *last_index)
{
struct broadcastable *b;
u64 idx = *last_index;

b = uintmap_after(&bstate->broadcasts, &idx);
if (!b)
return NULL;
/* Assert no overflow */
*last_index = idx;
assert(*last_index == idx);
return b;
}

const u8 *next_broadcast(struct broadcast_state *bstate,
const u8 *next_broadcast(const tal_t *ctx,
struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max,
u64 *last_index)
u32 *last_index)
{
struct queued_message *m;
struct broadcastable *b;

while ((m = uintmap_after(&bstate->broadcasts, last_index)) != NULL) {
if (m->timestamp >= timestamp_min
&& m->timestamp <= timestamp_max)
return m->payload;
while ((b = next_broadcast_raw(bstate, last_index)) != NULL) {
if (b->timestamp >= timestamp_min
&& b->timestamp <= timestamp_max) {
return gossip_store_get(ctx, bstate->gs, b->index);
}
}
return NULL;
}

u64 broadcast_final_index(const struct broadcast_state *bstate)
{
u64 idx;

if (!uintmap_last(&bstate->broadcasts, &idx))
return 0;
return idx;
}

#ifdef PEDANTIC
static const struct pubkey *
pubkey_keyof(const struct pubkey *pk)
Expand Down Expand Up @@ -134,7 +172,8 @@ struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
struct pubkey node_id_1, node_id_2, bitcoin_key;
u32 timestamp, fees;
u16 flags, expiry;
u64 index = 0, htlc_minimum_msat;
u32 index = 0;
u64 htlc_minimum_msat;
struct pubkey_set pubkeys;
/* We actually only need a set, not a map. */
UINTMAP(u64 *) channels;
Expand Down
56 changes: 46 additions & 10 deletions gossipd/broadcast.h
Expand Up @@ -9,31 +9,67 @@

/* Common functionality to implement staggered broadcasts with replacement. */

struct routing_state;

/* This is nested inside a node, chan or half_chan; rewriting the store can
* cause it to change! */
struct broadcastable {
/* This is also the offset within the gossip_store; even with 1M
* channels we still have a factor of 8 before this wraps. */
u32 index;
u32 timestamp;
};

struct broadcast_state {
u64 next_index;
UINTMAP(struct queued_message *) broadcasts;
UINTMAP(struct broadcastable *) broadcasts;
size_t count;
struct gossip_store *gs;
struct list_head *peers;
};

struct broadcast_state *new_broadcast_state(tal_t *ctx);
static inline void broadcastable_init(struct broadcastable *bcast)
{
bcast->index = 0;
}

struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs,
struct list_head *peers);

/* Append a queued message for broadcast. Freeing the msg will remove it. */
u64 insert_broadcast(struct broadcast_state *bstate, const u8 *msg,
u32 timestamp);
/* Append a queued message for broadcast. Must be explicitly deleted.
* Also adds it to the gossip store. */
void insert_broadcast(struct broadcast_state **bstate,
const u8 *msg,
struct broadcastable *bcast);

/* Manually delete a broadcast: not usually needed, since destructor does it */
void broadcast_del(struct broadcast_state *bstate, u64 index, const u8 *payload);
/* Add to broadcast, but not store: for gossip store compaction. */
void insert_broadcast_nostore(struct broadcast_state *bstate,
struct broadcastable *bcast);

/* Delete a broadcast: not usually needed, since destructor does it */
void broadcast_del(struct broadcast_state *bstate,
struct broadcastable *bcast);

/* Return the next broadcastable entry; doesn't load it. */
struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
u32 *last_index);

/* Return the broadcast with index >= *last_index, timestamp >= min and <= max
* and update *last_index.
* There's no broadcast with index 0. */
const u8 *next_broadcast(struct broadcast_state *bstate,
const u8 *next_broadcast(const tal_t *ctx,
struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max,
u64 *last_index);
u32 *last_index);

/* index of last entry. */
u64 broadcast_final_index(const struct broadcast_state *bstate);

/* Returns b if all OK, otherwise aborts if abortstr non-NULL, otherwise returns
* NULL. */
struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
const char *abortstr);

/* Callback for after we compacted the store */
void update_peers_broadcast_index(struct list_head *peers, u32 offset);
#endif /* LIGHTNING_GOSSIPD_BROADCAST_H */