Skip to content

Commit

Permalink
network: Use a byte size for the memory cache limit instead of age
Browse files Browse the repository at this point in the history
Instead of pruning items from the in-memory cache when they reach a
certain age, they are now pruned if the cache reaches a certain size.
Everytime an in-memory item is accessed it is moved to the front of
the list so that only the least-recently accessed ones are removed.

The advantage of this is that it is less likely to fill up the memory.
For version 3 of the protocol the old approach won’t be possible
anyway because it’s not possible to know the age of an object, only
when it will expire.
  • Loading branch information
bpeel committed Nov 30, 2017
1 parent 99cf919 commit 76381db
Showing 1 changed file with 112 additions and 49 deletions.
161 changes: 112 additions & 49 deletions src/ntb-network.c
Expand Up @@ -58,10 +58,12 @@ ntb_network_error;
* from talking to any one else. */
#define NTB_NETWORK_NUM_OUTGOING_PEERS 8

/* If an object is older than this in seconds then we won't bother
* keeping it in memory. It will need to be retrieved from disk if
* something requests it */
#define NTB_NETWORK_INV_CACHE_AGE (10 * 60)
/* Amount of memory to use in bytes for in-memory cached objects. If
* this gets full then the least-recently used ones will be removed
* from memory. If they are needed again they will have to requested
* from the disk cache. */
#define NTB_NETWORK_MEMORY_CACHE_SIZE (64 * 1024 * 1024)

/* If any objects claim to be created this far in the future then
* we'll ignore them */
#define NTB_NETWORK_INV_FUTURE_AGE (30 * 60)
Expand Down Expand Up @@ -188,7 +190,10 @@ struct ntb_network {

struct ntb_hash_table *inventory_hash;

struct ntb_list accepted_inventories;
size_t in_memory_cache_size;

struct ntb_list in_memory_inventories;
struct ntb_list on_disk_inventories;
struct ntb_list rejected_inventories;
int n_rejected_inventories;

Expand All @@ -211,10 +216,14 @@ enum ntb_network_inv_state {
* we don't care about, such as those whose proof-of-work is
* too low or that have a bad time stamp */
NTB_NETWORK_INV_STATE_REJECTED,
/* Accepted objects are those that we are willing to
* distribute. These will either be in memory or on the disk
* cache */
NTB_NETWORK_INV_STATE_ACCEPTED

/* The next two states represent those that we are willing to
* distribute */

/* Stored in the disk cache and will need to be retrieved */
NTB_NETWORK_INV_STATE_ON_DISK,
/* Stored in the in-memory cache */
NTB_NETWORK_INV_STATE_IN_MEMORY
};

struct ntb_network_inventory {
Expand Down Expand Up @@ -340,9 +349,12 @@ static void
free_inventory(struct ntb_network_inventory *inv)
{
switch (inv->state) {
case NTB_NETWORK_INV_STATE_ACCEPTED:
if (inv->blob)
ntb_blob_unref(inv->blob);
case NTB_NETWORK_INV_STATE_IN_MEMORY:
assert(inv->blob);
ntb_blob_unref(inv->blob);
break;
case NTB_NETWORK_INV_STATE_ON_DISK:
assert(inv->blob == NULL);
break;
case NTB_NETWORK_INV_STATE_STUB:
case NTB_NETWORK_INV_STATE_REJECTED:
Expand Down Expand Up @@ -722,23 +734,33 @@ send_addresses(struct ntb_network *nw,
}

static void
send_inventory(struct ntb_network *nw,
struct ntb_network_peer *peer)
send_inventory_list(int64_t now,
struct ntb_list *list,
struct ntb_network_peer *peer)
{
struct ntb_network_inventory *inv;
int64_t now = ntb_main_context_get_wall_clock(NULL);
int64_t age;

ntb_connection_begin_inv(peer->connection);

ntb_list_for_each(inv, &nw->accepted_inventories, link) {
ntb_list_for_each(inv, list, link) {
age = now - inv->timestamp;

if (age >= ntb_proto_get_max_age_for_type(inv->type))
continue;

ntb_connection_add_inv_hash(peer->connection, inv->hash);
}
}

static void
send_inventory(struct ntb_network *nw,
struct ntb_network_peer *peer)
{
int64_t now = ntb_main_context_get_wall_clock(NULL);

ntb_connection_begin_inv(peer->connection);

send_inventory_list(now, &nw->in_memory_inventories, peer);
send_inventory_list(now, &nw->on_disk_inventories, peer);

ntb_connection_end_inv(peer->connection);
}
Expand Down Expand Up @@ -967,6 +989,19 @@ handle_addr(struct ntb_network *nw,
return true;
}

static void
use_inventory(struct ntb_network *nw,
struct ntb_network_inventory *inv)
{
if (inv->state != NTB_NETWORK_INV_STATE_IN_MEMORY)
return;

/* Move the item to the beginning of the list to make it the
* last thing to be removed when the cache is full. */
ntb_list_remove(&inv->link);
ntb_list_insert(&nw->in_memory_inventories, &inv->link);
}

static bool
handle_getdata(struct ntb_network *nw,
struct ntb_network_peer *peer,
Expand All @@ -980,6 +1015,7 @@ handle_getdata(struct ntb_network *nw,
event->hashes + i *
NTB_PROTO_HASH_LENGTH);
if (inv && inv->state != NTB_NETWORK_INV_STATE_REJECTED) {
use_inventory(nw, inv);
ntb_connection_send_blob(peer->connection,
inv->hash,
inv->blob);
Expand Down Expand Up @@ -1065,6 +1101,40 @@ reject_inventory(struct ntb_network *nw,
ntb_list_insert(&nw->rejected_inventories, &inv->link);
}

static void
add_in_memory_inventory(struct ntb_network *nw,
struct ntb_network_inventory *inv)
{
struct ntb_network_inventory *last_inv;

assert(inv->blob);

nw->in_memory_cache_size += inv->blob->size;
ntb_list_insert(&nw->in_memory_inventories, &inv->link);
inv->state = NTB_NETWORK_INV_STATE_IN_MEMORY;

/* Prune the cache. Keep removing the last entry (ie, the
* least-recently used) until the cache size is again below
* the threshold or there is only one item left. */
while (nw->in_memory_cache_size > NTB_NETWORK_MEMORY_CACHE_SIZE) {
last_inv = ntb_container_of(nw->in_memory_inventories.prev,
struct ntb_network_inventory,
link);

if (last_inv == inv)
break;

assert(last_inv->blob);

nw->in_memory_cache_size -= last_inv->blob->size;
ntb_blob_unref(last_inv->blob);
last_inv->blob = NULL;
ntb_list_remove(&last_inv->link);
ntb_list_insert(&nw->on_disk_inventories, &last_inv->link);
last_inv->state = NTB_NETWORK_INV_STATE_ON_DISK;
}
}

static void
add_object(struct ntb_network *nw,
enum ntb_proto_inv_type type,
Expand Down Expand Up @@ -1138,26 +1208,15 @@ add_object(struct ntb_network *nw,

ntb_store_save_blob(NULL, hash, inv->blob);

ntb_list_insert(&nw->accepted_inventories, &inv->link);
add_in_memory_inventory(nw, inv);
inv->type = type;
inv->state = NTB_NETWORK_INV_STATE_ACCEPTED;

if ((flags & NTB_NETWORK_DELAY))
broadcast_delayed_inv(nw, hash);
else
broadcast_inv(nw, hash);

ntb_signal_emit(&nw->new_object_signal, inv->blob);

/* If the blob is not quite new then we won't bother
* keeping it in memory under the assumption that's
* less likely that a peer will request it. If
* something does request it we'll have to load it
* from disk */
if (age >= NTB_NETWORK_INV_CACHE_AGE) {
ntb_blob_unref(inv->blob);
inv->blob = NULL;
}
}
}

Expand Down Expand Up @@ -1263,10 +1322,12 @@ gc_inventories(struct ntb_network *nw,

ntb_list_for_each_safe(inv, tmp, list, link) {
age = now - inv->timestamp;
if (inv->state == NTB_NETWORK_INV_STATE_ACCEPTED)
if (inv->state == NTB_NETWORK_INV_STATE_ON_DISK ||
inv->state == NTB_NETWORK_INV_STATE_IN_MEMORY) {
type = inv->type;
else
} else {
type = NTB_PROTO_INV_TYPE_MSG;
}

if (age <= -NTB_NETWORK_INV_FUTURE_AGE ||
age >= (ntb_proto_get_max_age_for_type(type) +
Expand All @@ -1278,11 +1339,6 @@ gc_inventories(struct ntb_network *nw,
ntb_list_remove(&inv->link);
ntb_hash_table_remove(nw->inventory_hash, inv);
free_inventory(inv);
} else if (age >= NTB_NETWORK_INV_CACHE_AGE &&
inv->blob &&
inv->state != NTB_NETWORK_INV_STATE_REJECTED) {
ntb_blob_unref(inv->blob);
inv->blob = NULL;
}
}
}
Expand Down Expand Up @@ -1311,7 +1367,8 @@ gc_timeout_cb(struct ntb_main_context_source *source,
ntb_list_for_each(peer, &nw->peers, link)
gc_requested_inventories(nw, peer);

gc_inventories(nw, &nw->accepted_inventories);
gc_inventories(nw, &nw->in_memory_inventories);
gc_inventories(nw, &nw->on_disk_inventories);
gc_inventories(nw, &nw->rejected_inventories);

gc_addrs(nw);
Expand Down Expand Up @@ -1371,9 +1428,9 @@ store_for_each_blob_cb(enum ntb_proto_inv_type type,
inv->timestamp = timestamp;
inv->blob = NULL;
ntb_hash_table_set(nw->inventory_hash, inv);
ntb_list_insert(&nw->accepted_inventories, &inv->link);
ntb_list_insert(&nw->on_disk_inventories, &inv->link);
inv->type = type;
inv->state = NTB_NETWORK_INV_STATE_ACCEPTED;
inv->state = NTB_NETWORK_INV_STATE_ON_DISK;
}

static void
Expand Down Expand Up @@ -1532,8 +1589,10 @@ ntb_network_new(bool add_default_nodes)
ntb_list_init(&nw->listen_sockets);
ntb_list_init(&nw->peers);
ntb_list_init(&nw->addrs);
ntb_list_init(&nw->accepted_inventories);
ntb_list_init(&nw->on_disk_inventories);
ntb_list_init(&nw->in_memory_inventories);
ntb_list_init(&nw->rejected_inventories);
nw->in_memory_cache_size = 0;
nw->n_rejected_inventories = 0;
ntb_list_init(&nw->delayed_broadcasts);

Expand Down Expand Up @@ -1714,18 +1773,21 @@ ntb_network_get_object(struct ntb_network *nw,

inv = ntb_hash_table_get(nw->inventory_hash, hash);

if (inv == NULL ||
inv->state != NTB_NETWORK_INV_STATE_ACCEPTED)
if (inv == NULL)
return NTB_NETWORK_OBJECT_LOCATION_NOWHERE;

if (inv->blob) {
if (blob)
*blob = inv->blob;
return NTB_NETWORK_OBJECT_LOCATION_MEMORY;
} else {
switch (inv->state) {
case NTB_NETWORK_INV_STATE_ON_DISK:
if (blob)
*blob = NULL;
return NTB_NETWORK_OBJECT_LOCATION_STORE;
case NTB_NETWORK_INV_STATE_IN_MEMORY:
assert(inv->blob);
if (blob)
*blob = inv->blob;
return NTB_NETWORK_OBJECT_LOCATION_MEMORY;
default:
return NTB_NETWORK_OBJECT_LOCATION_NOWHERE;
}
}

Expand Down Expand Up @@ -1789,7 +1851,8 @@ ntb_network_free(struct ntb_network *nw)
free_peers(nw);
free_addrs(nw);
free_listen_sockets(nw);
free_inventories_in_list(&nw->accepted_inventories);
free_inventories_in_list(&nw->in_memory_inventories);
free_inventories_in_list(&nw->on_disk_inventories);
free_inventories_in_list(&nw->rejected_inventories);

free_delayed_broadcasts(&nw->delayed_broadcasts);
Expand Down

0 comments on commit 76381db

Please sign in to comment.