diff --git a/ccan/README b/ccan/README index 022df6bf65a6..8b589bc9eaae 100644 --- a/ccan/README +++ b/ccan/README @@ -1,3 +1,3 @@ CCAN imported from http://ccodearchive.net. -CCAN version: init-2605-gc47bf0d9 +CCAN version: init-2606-g5f219f03 diff --git a/ccan/ccan/io/poll.c b/ccan/ccan/io/poll.c index 7fe9e2c5e0bb..656cc0e3a33c 100644 --- a/ccan/ccan/io/poll.c +++ b/ccan/ccan/io/poll.c @@ -373,6 +373,8 @@ static void restore_pollfds(void) void *io_loop(struct timers *timers, struct timer **expired) { void *ret; + /* This ensures we don't always service lower fds first */ + static int fairness_counter; /* if timers is NULL, expired must be. If not, not. */ assert(!timers == !expired); @@ -384,17 +386,12 @@ void *io_loop(struct timers *timers, struct timer **expired) while (!io_loop_return) { int i, r, ms_timeout = -1; - if (handle_always()) { - /* Could have started/finished more. */ - continue; - } - /* Everything closed? */ if (num_fds == 0) break; /* You can't tell them all to go to sleep! */ - assert(num_waiting); + assert(num_waiting || num_always); if (timers) { struct timemono now, first; @@ -417,6 +414,10 @@ void *io_loop(struct timers *timers, struct timer **expired) } } + /* Don't wait if we have always requests pending! */ + if (num_always != 0) + ms_timeout = 0; + /* We do this temporarily, assuming exclusive is unusual */ exclude_pollfds(); r = pollfn(pollfds, num_fds, ms_timeout); @@ -430,15 +431,29 @@ void *io_loop(struct timers *timers, struct timer **expired) break; } - for (i = 0; i < num_fds && !io_loop_return; i++) { - struct io_conn *c = (void *)fds[i]; - int events = pollfds[i].revents; + fairness_counter++; + for (size_t rotation = 0; rotation < num_fds && !io_loop_return; rotation++) { + struct io_conn *c; + int events; + + i = (rotation + fairness_counter) % num_fds; + c = (void *)fds[i]; /* Clear so we don't get confused if exclusive next time */ + events = pollfds[i].revents; pollfds[i].revents = 0; - if (r == 0) + /* Timeout? 
*/ + if (r == 0) { + handle_always(); break; + } + + /* We interleave always before the first fd */ + if (i == 0 && handle_always()) { + /* Could have started/finished more. */ + break; + } if (fds[i]->listener) { struct io_listener *l = (void *)fds[i]; diff --git a/common/json_parse_simple.c b/common/json_parse_simple.c index f3b3da808f39..348be9ef6976 100644 --- a/common/json_parse_simple.c +++ b/common/json_parse_simple.c @@ -486,24 +486,24 @@ bool json_parse_input(jsmn_parser *parser, again: ret = jsmn_parse(parser, input, len, *toks, tal_count(*toks) - 1); - - switch (ret) { - case JSMN_ERROR_INVAL: + if (ret == JSMN_ERROR_INVAL) return false; - case JSMN_ERROR_NOMEM: - tal_resize(toks, tal_count(*toks) * 2); - goto again; - } /* Check whether we read at least one full root element, i.e., root * element has its end set. */ if ((*toks)[0].type == JSMN_UNDEFINED || (*toks)[0].end == -1) { + /* If it ran out of tokens, provide more. */ + if (ret == JSMN_ERROR_NOMEM) { + tal_resize(toks, tal_count(*toks) * 2); + goto again; + } + /* Otherwise, must be incomplete */ *complete = false; return true; } /* If we read a partial element at the end of the stream we'll get a - * ret=JSMN_ERROR_PART, but due to the previous check we know we read at + * error, but due to the previous check we know we read at * least one full element, so count tokens that are part of this root * element. */ ret = json_next(*toks) - *toks; diff --git a/common/json_stream.h b/common/json_stream.h index 05e2df74cedb..7756c013d98f 100644 --- a/common/json_stream.h +++ b/common/json_stream.h @@ -9,6 +9,7 @@ # include #include +#include #include #include #include @@ -36,6 +37,8 @@ struct wireaddr; struct wireaddr_internal; struct json_stream { + struct list_node list; + struct json_out *jout; /* Who is writing to this buffer now; NULL if nobody is. 
*/ diff --git a/common/jsonrpc_io.c b/common/jsonrpc_io.c index 48c46c42c608..e302434e9b43 100644 --- a/common/jsonrpc_io.c +++ b/common/jsonrpc_io.c @@ -8,7 +8,7 @@ #include #include -#define READ_CHUNKSIZE 64 +#define READ_CHUNKSIZE (1024*1024) struct jsonrpc_io { MEMBUF(char) membuf; @@ -22,13 +22,14 @@ struct jsonrpc_io { struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx) { struct jsonrpc_io *json_in; + const size_t bufsize = READ_CHUNKSIZE * 2; json_in = tal(ctx, struct jsonrpc_io); json_in->bytes_read = 0; membuf_init(&json_in->membuf, - tal_arr(json_in, char, READ_CHUNKSIZE), - READ_CHUNKSIZE, membuf_tal_resize); + tal_arr(json_in, char, bufsize), + bufsize, membuf_tal_resize); json_in->toks = toks_alloc(json_in); jsmn_init(&json_in->parser); diff --git a/common/trace.c b/common/trace.c index 793c72e21777..6c1d607cadeb 100644 --- a/common/trace.c +++ b/common/trace.c @@ -1,7 +1,10 @@ #include "config.h" #include +#include +#include #include #include +#include #include #include #include @@ -67,7 +70,25 @@ struct span { bool suspended; }; -static struct span *active_spans = NULL; +static size_t span_keyof(const struct span *span) +{ + return span->key; +} + +static size_t span_key_hash(size_t key) +{ + return siphash24(siphash_seed(), &key, sizeof(key)); +} + +static bool span_key_eq(const struct span *span, size_t key) +{ + return span->key == key; +} +HTABLE_DEFINE_NODUPS_TYPE(struct span, span_keyof, span_key_hash, span_key_eq, + span_htable); + +static struct span fixed_spans[8]; +static struct span_htable *spans = NULL; static struct span *current; static void init_span(struct span *s, @@ -83,6 +104,8 @@ static void init_span(struct span *s, s->parent = parent; s->name = name; s->suspended = false; + for (size_t i = 0; i < SPAN_MAX_TAGS; i++) + s->tags[i].name = NULL; /* If this is a new root span we also need to associate a new * trace_id with it. 
*/ @@ -93,6 +116,7 @@ static void init_span(struct span *s, s->trace_id_hi = current->trace_id_hi; s->trace_id_lo = current->trace_id_lo; } + span_htable_add(spans, s); } /* FIXME: forward decls for minimal patch size */ @@ -118,7 +142,7 @@ static void trace_inject_traceparent(void) current = trace_span_slot(); assert(current); - init_span(current, trace_key(&active_spans), "", NULL); + init_span(current, trace_key(&spans), "", NULL); assert(current && !current->parent); if (!hex_decode(traceparent + 3, 16, &trace_hi, sizeof(trace_hi)) @@ -135,58 +159,31 @@ static void trace_inject_traceparent(void) } } -#ifdef TRACE_DEBUG - -/** Quickly print out the entries in the `active_spans`. */ -static void trace_spans_print(void) -{ - for (size_t j = 0; j < tal_count(active_spans); j++) { - struct span *s = &active_spans[j], *parent = s->parent; - TRACE_DBG(" > %zu: %s (key=%zu, parent=%s, " - "parent_key=%zu)\n", - j, s->name, s->key, parent ? parent->name : "-", - parent ? parent->key : 0); - } -} - -/** Small helper to check for consistency in the linking. The idea is - * that we should be able to reach the root (a span without a - * `parent`) in less than the number of spans. */ -static void trace_check_tree(void) +static void memleak_scan_spans(struct htable *memtable, struct span_htable *spantable) { - /* `current` is either NULL or a valid entry. */ - - /* Walk the tree structure from leaves to their roots. It - * should not take more than the number of spans. 
*/ - struct span *c; - for (size_t i = 0; i < tal_count(active_spans); i++) { - c = &active_spans[i]; - for (int j = 0; j < tal_count(active_spans); j++) - if (c->parent == NULL) - break; - else - c = c->parent; - if (c->parent != NULL) { - TRACE_DBG("Cycle in the trace tree structure!\n"); - trace_spans_print(); - abort(); - } - - assert(c->parent == NULL); + struct span_htable_iter i; + const struct span *span; + + for (span = span_htable_first(spantable, &i); + span; + span = span_htable_next(spantable, &i)) { + memleak_ptr(memtable, span); + memleak_scan_region(memtable, span, sizeof(*span)); } } -#else -static inline void trace_check_tree(void) {} -#endif static void trace_init(void) { const char *dev_trace_file; - if (active_spans) + if (spans) return; - active_spans = notleak(tal_arrz(NULL, struct span, 1)); + /* We can't use new_htable here because we put non-tal + * objects in our htable, and that breaks memleak_scan_htable! */ + spans = notleak(tal(NULL, struct span_htable)); + memleak_add_helper(spans, memleak_scan_spans); + span_htable_init(spans); current = NULL; dev_trace_file = getenv("CLN_DEV_TRACE_FILE"); @@ -210,14 +207,7 @@ static size_t trace_key(const void *key) static struct span *trace_span_find(size_t key) { - for (size_t i = 0; i < tal_count(active_spans); i++) - if (active_spans[i].key == key) - return &active_spans[i]; - - /* Return NULL to signal that there is no such span yet. Used - * to check for accidental collisions that'd reuse the span - * `key`. */ - return NULL; + return span_htable_get(spans, key); } /** @@ -225,44 +215,14 @@ static struct span *trace_span_find(size_t key) */ static struct span *trace_span_slot(void) { - /* Empty slots are defined as having `key=NULL`, so search for - * that, and we should get an empty slot. */ - struct span *s = trace_span_find(0); - - /* In the unlikely case this fails, double it */ - if (!s) { - /* Adjust current and parents when we reallocate! 
*/ - size_t num_active = tal_count(active_spans); - size_t current_off COMPILER_WANTS_INIT("11.4.0-1ubuntu1~22.04 -03"); - size_t parent_off[num_active]; - if (current) - current_off = current - active_spans; - for (size_t i = 0; i < num_active; i++) { - if (!active_spans[i].parent) - continue; - parent_off[i] = active_spans[i].parent - active_spans; - } - TRACE_DBG("%u: out of %zu spans, doubling!\n", - getpid(), tal_count(active_spans)); - tal_resizez(&active_spans, tal_count(active_spans) * 2); - s = trace_span_find(0); - if (current) - current = active_spans + current_off; - for (size_t i = 0; i < num_active; i++) { - if (!active_spans[i].parent) - continue; - active_spans[i].parent = active_spans + parent_off[i]; - } + /* Look for a free fixed slot. */ + for (size_t i = 0; i < ARRAY_SIZE(fixed_spans); i++) { + if (fixed_spans[i].key == 0) + return &fixed_spans[i]; } - assert(s->parent == NULL); - /* Be extra careful not to create cycles. If we return the - * position that is pointed at by current then we can only - * stub the trace by removing the parent link here. */ - if (s == current) - current = NULL; - - return s; + /* Those are used up, we have to allocate. */ + return tal(spans, struct span); } #define MAX_BUF_SIZE 2048 @@ -323,7 +283,17 @@ static void trace_emit(struct span *s) */ static void trace_span_clear(struct span *s) { - memset(s, 0, sizeof(*s)); + if (!span_htable_del(spans, s)) + abort(); + + /* If s is actually in fixed_spans, just zero it out. 
*/ + if (s >= fixed_spans && s < fixed_spans + ARRAY_SIZE(fixed_spans)) { + s->key = 0; + return; + } + + /* Dynamically allocated, so we need to free it */ + tal_free(s); } void trace_span_start_(const char *name, const void *key) @@ -333,7 +303,6 @@ void trace_span_start_(const char *name, const void *key) if (disable_trace) return; trace_init(); - trace_check_tree(); assert(trace_span_find(numkey) == NULL); struct span *s = trace_span_slot(); @@ -341,7 +310,6 @@ void trace_span_start_(const char *name, const void *key) return; init_span(s, numkey, name, current); current = s; - trace_check_tree(); DTRACE_PROBE1(lightningd, span_start, s->id); if (trace_to_file) { fprintf(trace_to_file, "span_start %016"PRIx64"\n", s->id); @@ -364,8 +332,6 @@ void trace_span_end(const void *key) assert(s && "Span to end not found"); assert(s == current && "Ending a span that isn't the current one"); - trace_check_tree(); - struct timeabs now = time_now(); /* discouraged: but tracing wants non-dev time */ s->end_time = (now.ts.tv_sec * 1000000) + now.ts.tv_nsec / 1000; DTRACE_PROBE1(lightningd, span_end, s->id); @@ -380,7 +346,6 @@ void trace_span_end(const void *key) /* Now reset the span */ trace_span_clear(s); - trace_check_tree(); } void trace_span_tag(const void *key, const char *name, const char *value) @@ -473,7 +438,7 @@ void trace_span_resume_(const void *key, const char *lbl) void trace_cleanup(void) { - active_spans = tal_free(active_spans); + spans = tal_free(spans); } #else /* HAVE_USDT */ diff --git a/contrib/pyln-client/pyln/client/plugin.py b/contrib/pyln-client/pyln/client/plugin.py index 24277dca3065..5dfbaa208d75 100644 --- a/contrib/pyln-client/pyln/client/plugin.py +++ b/contrib/pyln-client/pyln/client/plugin.py @@ -59,6 +59,7 @@ def __init__(self, name: str, func: Callable[..., JSONType], self.description = description self.before: List[str] = [] self.after: List[str] = [] + self.filters: Optional[List[Union[str, int]]] = None def get_usage(self): # Handles 
out-of-order use of parameters like: @@ -546,7 +547,8 @@ def decorator(f: Callable[..., JSONType]) -> Callable[..., JSONType]: def add_hook(self, name: str, func: Callable[..., JSONType], background: bool = False, before: Optional[List[str]] = None, - after: Optional[List[str]] = None) -> None: + after: Optional[List[str]] = None, + filters: Optional[List[Union[str, int]]] = None) -> None: """Register a hook that is called synchronously by lightningd on events """ if name in self.methods: @@ -574,17 +576,19 @@ def add_hook(self, name: str, func: Callable[..., JSONType], method.after = [] if after: method.after = after + method.filters = filters self.methods[name] = method def hook(self, method_name: str, before: List[str] = None, - after: List[str] = None) -> JsonDecoratorType: + after: List[str] = None, + filters: List[Union[str, int]] = None) -> JsonDecoratorType: """Decorator to add a plugin hook to the dispatch table. Internally uses add_hook. """ def decorator(f: Callable[..., JSONType]) -> Callable[..., JSONType]: - self.add_hook(method_name, f, background=False, before=before, after=after) + self.add_hook(method_name, f, background=False, before=before, after=after, filters=filters) return f return decorator @@ -961,9 +965,12 @@ def _getmanifest(self, **kwargs) -> JSONType: continue if method.mtype == MethodType.HOOK: - hooks.append({'name': method.name, - 'before': method.before, - 'after': method.after}) + hook = {'name': method.name, + 'before': method.before, + 'after': method.after} + if method.filters: + hook['filters'] = method.filters + hooks.append(hook) continue # For compatibility with lightningd prior to 24.08, we must diff --git a/contrib/pyln-testing/pyln/testing/db.py b/contrib/pyln-testing/pyln/testing/db.py index 7930e6a37d71..373df50f3742 100644 --- a/contrib/pyln-testing/pyln/testing/db.py +++ b/contrib/pyln-testing/pyln/testing/db.py @@ -51,13 +51,17 @@ def query(self, query: str) -> Union[List[Dict[str, Union[int, bytes]]], List[Di 
db.close() return result - def execute(self, query: str) -> None: - db = sqlite3.connect(self.path) - c = db.cursor() - c.execute(query) - db.commit() - c.close() - db.close() + def execute(self, query: str, params: tuple = ()) -> None: + """Execute a single statement with bound params. Placeholders: '?'""" + with sqlite3.connect(self.path) as db: + db.execute("PRAGMA busy_timeout = 5000") + db.execute(query, params) + + def executemany(self, query: str, seq_of_params: list[tuple]) -> None: + """Batch execute with bound params. Placeholders: '?'""" + with sqlite3.connect(self.path) as db: + db.execute("PRAGMA busy_timeout = 5000") + db.executemany(query, seq_of_params) def stop(self): pass @@ -100,9 +104,15 @@ def query(self, query): cur.close() return res - def execute(self, query): + def execute(self, query: str, params: tuple = ()) -> None: + """Execute a single statement with bound params. Placeholders: '%s'""" + with self.conn, self.conn.cursor() as cur: + cur.execute(query, params) + + def executemany(self, query: str, seq_of_params: list[tuple]) -> None: + """Batch execute with bound params. Placeholders: '%s'""" with self.conn, self.conn.cursor() as cur: - cur.execute(query) + cur.executemany(query.replace('?', '%s'), seq_of_params) def stop(self): """Clean up the database. 
diff --git a/contrib/pyln-testing/pyln/testing/utils.py b/contrib/pyln-testing/pyln/testing/utils.py index b47da3e8e71e..59ebd5259ffc 100644 --- a/contrib/pyln-testing/pyln/testing/utils.py +++ b/contrib/pyln-testing/pyln/testing/utils.py @@ -1027,7 +1027,7 @@ def fundwallet(self, sats, addrtype="bech32", mine_block=True): addr = self.rpc.newaddr(addrtype)[addrtype] if mine_block: txid = self.bitcoin.send_and_mine_block(addr, sats) - self.daemon.wait_for_log('Owning output .* txid {} CONFIRMED'.format(txid)) + wait_for(lambda: any([t['hash'] == txid for t in self.rpc.listtransactions()['transactions']])) else: txid = self.bitcoin.rpc.sendtoaddress(addr, sats / 10**8) @@ -1778,7 +1778,8 @@ def join_nodes(self, nodes, fundchannel=True, fundamount=FUNDAMOUNT, wait_for_an # getpeers. if not fundchannel: for src, dst in connections: - dst.daemon.wait_for_log(r'{}-connectd: Handed peer, entering loop'.format(src.info['id'])) + wait_for(lambda: src.rpc.listpeers(dst.info['id'])['peers'] != []) + wait_for(lambda: dst.rpc.listpeers(src.info['id'])['peers'] != []) return bitcoind = nodes[0].bitcoin diff --git a/db/common.h b/db/common.h index 829a3d2cf932..2533d7be1238 100644 --- a/db/common.h +++ b/db/common.h @@ -29,6 +29,8 @@ struct db { char *filename; const char *in_transaction; + /* For lazy transaction activation */ + bool transaction_started; /* DB-specific context */ void *conn; diff --git a/db/db_sqlite3.c b/db/db_sqlite3.c index 5df248b6fadb..ed63989d4f66 100644 --- a/db/db_sqlite3.c +++ b/db/db_sqlite3.c @@ -474,12 +474,14 @@ static char **prepare_table_manip(const tal_t *ctx, /* But core insists we're "in a transaction" for all ops, so fake it */ db->in_transaction = "Not really"; + db->transaction_started = true; /* Turn off foreign keys first. 
*/ db_prepare_for_changes(db); db_exec_prepared_v2(take(db_prepare_untranslated(db, "PRAGMA foreign_keys = OFF;"))); db_report_changes(db, NULL, 0); db->in_transaction = NULL; + db->transaction_started = false; db_begin_transaction(db); cmd = tal_fmt(tmpctx, "ALTER TABLE %s RENAME TO temp_%s;", @@ -525,11 +527,13 @@ static bool complete_table_manip(struct db *db, /* Allow links between them (esp. cascade deletes!) */ db->in_transaction = "Not really"; + db->transaction_started = true; db_prepare_for_changes(db); db_exec_prepared_v2(take(db_prepare_untranslated(db, "PRAGMA foreign_keys = ON;"))); db_report_changes(db, NULL, 0); db->in_transaction = NULL; + db->transaction_started = false; /* migrations are performed inside transactions, so start one. */ db_begin_transaction(db); diff --git a/db/exec.c b/db/exec.c index 5ab6e9b0403f..382ad9f15e95 100644 --- a/db/exec.c +++ b/db/exec.c @@ -121,19 +121,29 @@ static void db_data_version_incr(struct db *db) void db_begin_transaction_(struct db *db, const char *location) { - bool ok; if (db->in_transaction) db_fatal(db, "Already in transaction from %s", db->in_transaction); + db->in_transaction = location; /* No writes yet. */ db->dirty = false; +} + +void db_need_transaction(struct db *db, const char *location) +{ + bool ok; + + if (!db->in_transaction) + db_fatal(db, "Not in a transaction for %s", location); + + if (db->transaction_started) + return; db_prepare_for_changes(db); ok = db->config->begin_tx_fn(db); if (!ok) db_fatal(db, "Failed to start DB transaction: %s", db->error); - - db->in_transaction = location; + db->transaction_started = true; } bool db_in_transaction(struct db *db) @@ -150,6 +160,13 @@ void db_commit_transaction(struct db *db) { bool ok; assert(db->in_transaction); + + if (!db->transaction_started) { + db->in_transaction = NULL; + assert(!db->dirty); + return; + } + db_assert_no_outstanding_statements(db); /* Increment before reporting changes to an eventual plugin. 
*/ @@ -164,4 +181,5 @@ void db_commit_transaction(struct db *db) db->in_transaction = NULL; db->dirty = false; + db->transaction_started = false; } diff --git a/db/exec.h b/db/exec.h index d923ae0f6bb0..c852d9501e9a 100644 --- a/db/exec.h +++ b/db/exec.h @@ -40,6 +40,11 @@ void db_begin_transaction_(struct db *db, const char *location); bool db_in_transaction(struct db *db); +/** + * db_need_transaction: we now need to actually enable the transaction, if not + * already. */ +void db_need_transaction(struct db *db, const char *location); + /** * db_commit_transaction - Commit a running transaction * diff --git a/db/utils.c b/db/utils.c index 25bce2f4f003..d6234179df5a 100644 --- a/db/utils.c +++ b/db/utils.c @@ -3,6 +3,7 @@ #include #include #include +#include #include /* Matches the hash function used in devtools/sql-rewrite.py */ @@ -174,9 +175,12 @@ bool db_step(struct db_stmt *stmt) void db_exec_prepared_v2(struct db_stmt *stmt TAKES) { + bool ret; + + db_need_transaction(stmt->db, stmt->query->query); trace_span_start("db_exec_prepared", stmt); trace_span_tag(stmt, "query", stmt->query->query); - bool ret = stmt->db->config->exec_fn(stmt); + ret = stmt->db->config->exec_fn(stmt); trace_span_end(stmt); if (stmt->db->readonly) @@ -358,6 +362,7 @@ struct db *db_open_(const tal_t *ctx, const char *filename, db_fatal(db, "Unable to find DB queries for %s", db->config->name); db->in_transaction = NULL; + db->transaction_started = false; db->changes = NULL; /* This must be outside a transaction, so catch it */ diff --git a/doc/developers-guide/plugin-development/hooks.md b/doc/developers-guide/plugin-development/hooks.md index ba2c53daa45e..9c2299538932 100644 --- a/doc/developers-guide/plugin-development/hooks.md +++ b/doc/developers-guide/plugin-development/hooks.md @@ -506,7 +506,8 @@ The `htlc_accepted` hook is a chained hook, i.e., multiple plugins can register ### `rpc_command` -The `rpc_command` hook allows a plugin to take over any RPC command. 
It sends the received JSON-RPC request (for any method!) to the registered plugin, +The `rpc_command` hook allows a plugin to take over any RPC command. It sends the received JSON-RPC request to the registered plugin. You can optionally specify a "filters" array, containing the command names you want to intercept: without this, all commands will be sent to this hook. + ```json { @@ -584,7 +585,7 @@ Note: The `rpc_command` hook is chainable. If two or more plugins try to replace ### `custommsg` -The `custommsg` plugin hook is the receiving counterpart to the [`sendcustommsg`](ref:sendcustommsg) RPC method and allows plugins to handle messages that are not handled internally. The goal of these two components is to allow the implementation of custom protocols or prototypes on top of a Core Lightning node, without having to change the node's implementation itself. +The `custommsg` plugin hook is the receiving counterpart to the [`sendcustommsg`](ref:sendcustommsg) RPC method and allows plugins to handle messages that are not handled internally. The goal of these two components is to allow the implementation of custom protocols or prototypes on top of a Core Lightning node, without having to change the node's implementation itself. Note that if the hook registration specifies "filters" then that should be a JSON array of message numbers, and the hook will only be called for those. Otherwise, the hook is called for all messages not handled internally. 
The payload for a call follows this format: diff --git a/lightningd/connect_control.c b/lightningd/connect_control.c index e57bc975a0b2..e374ae961606 100644 --- a/lightningd/connect_control.c +++ b/lightningd/connect_control.c @@ -375,11 +375,11 @@ static void custommsg_payload_serialize(struct custommsg_payload *payload, json_add_node_id(stream, "peer_id", &payload->peer_id); } -REGISTER_PLUGIN_HOOK(custommsg, - custommsg_cb, - custommsg_final, - custommsg_payload_serialize, - struct custommsg_payload *); +REGISTER_PLUGIN_HOOK_INTFILTER(custommsg, + custommsg_cb, + custommsg_final, + custommsg_payload_serialize, + struct custommsg_payload *); static void handle_custommsg_in(struct lightningd *ld, const u8 *msg) { @@ -393,7 +393,7 @@ static void handle_custommsg_in(struct lightningd *ld, const u8 *msg) } notify_custommsg(ld, &p->peer_id, p->msg); - plugin_hook_call_custommsg(ld, NULL, p); + plugin_hook_call_custommsg(ld, fromwire_peektype(p->msg), NULL, p); } static void handle_onionmsg_forward_fail(struct lightningd *ld, const u8 *msg) diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 5520fdcec53f..27d7efb3d39a 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -96,8 +96,15 @@ struct json_connection { /* Our json_streams (owned by the commands themselves while running). * Since multiple streams could start returning data at once, we - * always service these in order, freeing once empty. */ - struct json_stream **js_arr; + * always service these in order. */ + struct list_head jsouts; +}; + +/* We don't put usage inside struct json_command as it's good practice + * to have those const. 
*/ +struct cmd_and_usage { + const struct json_command *command; + const char *usage; }; /** @@ -108,11 +115,9 @@ struct json_connection { */ struct jsonrpc { struct io_listener *rpc_listener; - struct json_command **commands; - /* Map from json command names to usage strings: we don't put this inside - * struct json_command as it's good practice to have those const. */ - STRMAP(const char *) usagemap; + /* Can't be const: we set ->usage later */ + STRMAP(struct cmd_and_usage *) cmdmap; }; /* The command itself usually owns the stream, because jcon may get closed. @@ -125,25 +130,10 @@ static struct json_stream *jcon_new_json_stream(const tal_t *ctx, /* Wake writer to start streaming, in case it's not already. */ io_wake(jcon); - - /* FIXME: Keep streams around for recycling. */ - tal_arr_expand(&jcon->js_arr, js); + list_add_tail(&jcon->jsouts, &js->list); return js; } -static void jcon_remove_json_stream(struct json_connection *jcon, - struct json_stream *js) -{ - for (size_t i = 0; i < tal_count(jcon->js_arr); i++) { - if (js != jcon->js_arr[i]) - continue; - - tal_arr_remove(&jcon->js_arr, i); - return; - } - abort(); -} - /* jcon and cmd have separate lifetimes: we detach them on either destruction */ static void destroy_jcon(struct json_connection *jcon) { @@ -415,54 +405,42 @@ static const struct json_command dev_command = { }; AUTODATA(json_command, &dev_command); -static size_t num_cmdlist; - -static struct json_command **get_cmdlist(void) +static struct json_command **get_cmdlist(size_t *num_cmdlist) { static struct json_command **cmdlist; if (!cmdlist) - cmdlist = autodata_get(json_command, &num_cmdlist); + cmdlist = autodata_get(json_command, num_cmdlist); return cmdlist; } -static void json_add_help_command(struct command *cmd, - struct json_stream *response, - struct json_command *json_command) +struct json_help_info { + struct command *cmd; + struct json_stream *response; +}; + +/* Used as a strmap_iterate function: returns true to continue */ 
+static bool json_add_help_command(const char *cmdname, + struct cmd_and_usage *cmd, + struct json_help_info *hinfo) { char *usage; /* If they disallow deprecated APIs, don't even list them */ - if (!command_deprecated_out_ok(cmd, NULL, - json_command->depr_start, - json_command->depr_end)) { - return; - } - - usage = tal_fmt(cmd, "%s%s %s", - json_command->name, - json_command->depr_start ? " (DEPRECATED!)" : "", - strmap_get(&cmd->ld->jsonrpc->usagemap, - json_command->name)); - json_object_start(response, NULL); - json_add_string(response, "command", usage); - json_object_end(response); -} - -static const struct json_command *find_command(struct json_command **commands, - const char *cmdname) -{ - for (size_t i = 0; i < tal_count(commands); i++) { - if (streq(cmdname, commands[i]->name)) - return commands[i]; + if (!command_deprecated_out_ok(hinfo->cmd, NULL, + cmd->command->depr_start, + cmd->command->depr_end)) { + return true; } - return NULL; -} -static int compare_commands_name(struct json_command *const *a, - struct json_command *const *b, void *unused) -{ - return strcmp((*a)->name, (*b)->name); + usage = tal_fmt(tmpctx, "%s%s %s", + cmd->command->name, + cmd->command->depr_start ? 
" (DEPRECATED!)" : "", + cmd->usage); + json_object_start(hinfo->response, NULL); + json_add_string(hinfo->response, "command", usage); + json_object_end(hinfo->response); + return true; } static struct command_result *json_help(struct command *cmd, @@ -470,30 +448,28 @@ static struct command_result *json_help(struct command *cmd, const jsmntok_t *obj UNNEEDED, const jsmntok_t *params) { - struct json_stream *response; const char *cmdname; - struct json_command **commands; - const struct json_command *one_cmd; + struct cmd_and_usage *one_cmd; + struct json_help_info hinfo; if (!param_check(cmd, buffer, params, p_opt("command", param_string, &cmdname), NULL)) return command_param_failed(); - commands = cmd->ld->jsonrpc->commands; if (cmdname) { - one_cmd = find_command(commands, cmdname); + one_cmd = strmap_get(&cmd->ld->jsonrpc->cmdmap, cmdname); if (!one_cmd) return command_fail(cmd, JSONRPC2_METHOD_NOT_FOUND, "Unknown command %s", cmdname); if (!command_deprecated_in_ok(cmd, NULL, - one_cmd->depr_start, - one_cmd->depr_end)) + one_cmd->command->depr_start, + one_cmd->command->depr_end)) return command_fail(cmd, JSONRPC2_METHOD_NOT_FOUND, "Deprecated command %s", cmdname); - if (!cmd->ld->developer && one_cmd->dev_only) + if (!cmd->ld->developer && one_cmd->command->dev_only) return command_fail(cmd, JSONRPC2_METHOD_NOT_FOUND, "Developer-only command %s", cmdname); @@ -503,31 +479,32 @@ static struct command_result *json_help(struct command *cmd, if (command_check_only(cmd)) return command_check_done(cmd); - asort(commands, tal_count(commands), compare_commands_name, NULL); - - response = json_stream_success(cmd); - json_array_start(response, "help"); - for (size_t i = 0; i < tal_count(commands); i++) { - if (!one_cmd || one_cmd == commands[i]) - json_add_help_command(cmd, response, commands[i]); + hinfo.cmd = cmd; + hinfo.response = json_stream_success(cmd); + json_array_start(hinfo.response, "help"); + if (one_cmd) + json_add_help_command(cmdname, one_cmd, 
&hinfo); + else { + strmap_iterate(&cmd->ld->jsonrpc->cmdmap, + json_add_help_command, &hinfo); } - json_array_end(response); + json_array_end(hinfo.response); /* Tell cli this is simple enough to be formatted flat for humans */ - json_add_string(response, "format-hint", "simple"); + json_add_string(hinfo.response, "format-hint", "simple"); - return command_success(cmd, response); + return command_success(cmd, hinfo.response); } static const struct json_command *find_cmd(const struct jsonrpc *rpc, const char *buffer, const jsmntok_t *tok) { - struct json_command **commands = rpc->commands; + const struct cmd_and_usage *cmd; - for (size_t i = 0; i < tal_count(commands); i++) - if (json_tok_streq(buffer, tok, commands[i]->name)) - return commands[i]; + cmd = strmap_getn(&rpc->cmdmap, buffer + tok->start, tok->end - tok->start); + if (cmd) + return cmd->command; return NULL; } @@ -801,11 +778,6 @@ static struct command_result *command_exec(struct json_connection *jcon, if (res == &pending) assert(cmd->pending); - /* The command might outlive the connection. */ - if (jcon) - list_for_each(&jcon->commands, cmd, list) - assert(cmd->pending); - return res; } @@ -1023,18 +995,19 @@ rpc_command_hook_callback(struct rpc_command_hook_payload *p, return true; } -REGISTER_PLUGIN_HOOK(rpc_command, - rpc_command_hook_callback, - rpc_command_hook_final, - rpc_command_hook_serialize, - struct rpc_command_hook_payload *); +REGISTER_PLUGIN_HOOK_STRFILTER(rpc_command, + rpc_command_hook_callback, + rpc_command_hook_final, + rpc_command_hook_serialize, + struct rpc_command_hook_payload *); /* We return struct command_result so command_fail return value has a natural * sink; we don't actually use the result. 
*/ static struct command_result * parse_request(struct json_connection *jcon, const char *buffer, - const jsmntok_t tok[]) + const jsmntok_t tok[], + const char **methodname) { const jsmntok_t *method, *id, *params, *filter, *jsonrpc; struct command *c; @@ -1142,9 +1115,11 @@ parse_request(struct json_connection *jcon, rpc_hook->custom_replace = NULL; rpc_hook->custom_buffer = NULL; + *methodname = c->json_cmd->name; trace_span_start("lightningd/jsonrpc", &c); trace_span_tag(&c, "method", c->json_cmd->name); - completed = plugin_hook_call_rpc_command(jcon->ld, c->id, rpc_hook); + /* They can filter by command name */ + completed = plugin_hook_call_rpc_command(jcon->ld, c->json_cmd->name, c->id, rpc_hook); trace_span_end(&c); /* If it's deferred, mark it (otherwise, it's completed) */ @@ -1161,13 +1136,16 @@ static struct io_plan *stream_out_complete(struct io_conn *conn, static struct io_plan *start_json_stream(struct io_conn *conn, struct json_connection *jcon) { + struct json_stream *js; + /* If something has created an output buffer, start streaming. */ - if (tal_count(jcon->js_arr)) { + js = list_top(&jcon->jsouts, struct json_stream, list); + if (js) { size_t len; - const char *p = json_out_contents(jcon->js_arr[0]->jout, &len); + const char *p = json_out_contents(js->jout, &len); if (len) log_io(jcon->log, LOG_IO_OUT, NULL, "", p, len); - return json_stream_output(jcon->js_arr[0], conn, + return json_stream_output(js, conn, stream_out_complete, jcon); } @@ -1191,7 +1169,7 @@ static struct io_plan *stream_out_complete(struct io_conn *conn, struct json_stream *js, struct json_connection *jcon) { - jcon_remove_json_stream(jcon, js); + list_del_from(&jcon->jsouts, &js->list); tal_free(js); /* Wait for more output. 
*/ @@ -1206,13 +1184,15 @@ static struct io_plan *read_json(struct io_conn *conn, size_t len_read; const jsmntok_t *toks; const char *buffer, *error; + size_t num_parsed = 0; + const char *last_method = NULL; buffer = jsonrpc_newly_read(jcon->json_in, &len_read); if (len_read) log_io(jcon->log, LOG_IO_IN, NULL, "", buffer, len_read); /* We wait for pending output to be consumed, to avoid DoS */ - if (tal_count(jcon->js_arr) != 0) { + if (!list_empty(&jcon->jsouts)) { return io_wait(conn, conn, read_json, jcon); } @@ -1232,23 +1212,26 @@ static struct io_plan *read_json(struct io_conn *conn, db_begin_transaction(jcon->ld->wallet->db); in_transaction = true; } - parse_request(jcon, buffer, toks); + parse_request(jcon, buffer, toks, &last_method); jsonrpc_io_parse_done(jcon->json_in); + /* Don't ever process for more than 100 commands or 250 msec + * without giving others a chance */ + if (num_parsed++ == 100 + || time_greater(timemono_between(time_mono(), start_time), + time_from_msec(250))) { + db_commit_transaction(jcon->ld->wallet->db); + log_debug(jcon->log, "Pausing parsing after %zu requests and %"PRIu64"msec (last method=%s)", + num_parsed, + time_to_msec(timemono_between(time_mono(), start_time)), + last_method ? last_method : "NONE"); + /* Call us back, as if we read nothing new */ + return io_always(conn, read_json, jcon); + } + if (!jcon->db_batching) { db_commit_transaction(jcon->ld->wallet->db); in_transaction = false; - } else { - /* FIXME: io_always() should interleave with - * real IO, and then we should rotate order we - * service fds in, to avoid starvation. 
*/ - if (time_greater(timemono_between(time_mono(), - start_time), - time_from_msec(250))) { - db_commit_transaction(jcon->ld->wallet->db); - /* Call us back, as if we read nothing new */ - return io_always(conn, read_json, jcon); - } } goto again; @@ -1267,7 +1250,7 @@ static struct io_plan *jcon_connected(struct io_conn *conn, jcon = notleak(tal(conn, struct json_connection)); jcon->conn = conn; jcon->ld = ld; - jcon->js_arr = tal_arr(jcon, struct json_stream *, 0); + list_head_init(&jcon->jsouts); jcon->json_in = jsonrpc_io_new(jcon); jcon->notifications_enabled = false; jcon->db_batching = false; @@ -1301,27 +1284,26 @@ static struct io_plan *incoming_jcon_connected(struct io_conn *conn, static void destroy_json_command(struct json_command *command, struct jsonrpc *rpc) { - strmap_del(&rpc->usagemap, command->name, NULL); - for (size_t i = 0; i < tal_count(rpc->commands); i++) { - if (rpc->commands[i] == command) { - tal_arr_remove(&rpc->commands, i); - return; - } - } - abort(); + struct cmd_and_usage *cmd; + + if (!strmap_del(&rpc->cmdmap, command->name, &cmd)) + abort(); + tal_free(cmd); } -static bool command_add(struct jsonrpc *rpc, struct json_command *command) +static struct cmd_and_usage *command_add(struct jsonrpc *rpc, struct json_command *command) { - size_t count = tal_count(rpc->commands); + struct cmd_and_usage *cmd; /* Check that we don't clobber a method */ - for (size_t i = 0; i < count; i++) - if (streq(rpc->commands[i]->name, command->name)) - return false; + if (strmap_get(&rpc->cmdmap, command->name)) + return NULL; - tal_arr_expand(&rpc->commands, command); - return true; + cmd = tal(rpc, struct cmd_and_usage); + cmd->command = command; + cmd->usage = NULL; + strmap_add(&rpc->cmdmap, command->name, cmd); + return cmd; } /* Built-in commands get called to construct usage string via param() */ @@ -1338,22 +1320,23 @@ static void setup_command_usage(struct lightningd *ld, dummy->json_cmd = command; res = command->dispatch(dummy, NULL, NULL, 
NULL); assert(res == ¶m_failed); - assert(strmap_get(&ld->jsonrpc->usagemap, command->name)); + assert(strmap_get(&ld->jsonrpc->cmdmap, command->name)->usage); } bool jsonrpc_command_add(struct jsonrpc *rpc, struct json_command *command, const char *usage TAKES) { - const char *unescaped; + struct cmd_and_usage *cmd; - if (!command_add(rpc, command)) + cmd = command_add(rpc, command); + if (!cmd) return false; - unescaped = json_escape_unescape_len(command, usage, strlen(usage)); - if (!unescaped) + cmd->usage = json_escape_unescape_len(cmd, usage, strlen(usage)); + if (!cmd->usage) { + tal_free(cmd); return false; - - strmap_add(&rpc->usagemap, command->name, unescaped); + } tal_add_destructor2(command, destroy_json_command, rpc); return true; } @@ -1370,22 +1353,22 @@ static bool jsonrpc_command_add_perm(struct lightningd *ld, static void destroy_jsonrpc(struct jsonrpc *jsonrpc) { - strmap_clear(&jsonrpc->usagemap); + strmap_clear(&jsonrpc->cmdmap); } static void memleak_help_jsonrpc(struct htable *memtable, struct jsonrpc *jsonrpc) { - memleak_scan_strmap(memtable, &jsonrpc->usagemap); + memleak_scan_strmap(memtable, &jsonrpc->cmdmap); } void jsonrpc_setup(struct lightningd *ld) { - struct json_command **commands = get_cmdlist(); + size_t num_cmdlist; + struct json_command **commands = get_cmdlist(&num_cmdlist); ld->jsonrpc = tal(ld, struct jsonrpc); - strmap_init(&ld->jsonrpc->usagemap); - ld->jsonrpc->commands = tal_arr(ld->jsonrpc, struct json_command *, 0); + strmap_init(&ld->jsonrpc->cmdmap); for (size_t i=0; ijsonrpc, commands[i])) fatal("Cannot add duplicate command %s", @@ -1422,9 +1405,11 @@ void command_log(struct command *cmd, enum log_level level, void command_set_usage(struct command *cmd, const char *usage TAKES) { - usage = tal_strdup(cmd->ld, usage); - if (!strmap_add(&cmd->ld->jsonrpc->usagemap, cmd->json_cmd->name, usage)) - fatal("Two usages for command %s?", cmd->json_cmd->name); + struct cmd_and_usage *cmd_and_usage; + + cmd_and_usage = 
strmap_get(&cmd->ld->jsonrpc->cmdmap, cmd->json_cmd->name); + assert(!cmd_and_usage->usage); + cmd_and_usage->usage = tal_strdup(cmd_and_usage, usage); } bool command_check_only(const struct command *cmd) diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 9ce2fc44954d..f4a2f1a6bf1f 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -124,7 +124,8 @@ static bool plugins_all_in_state(const struct plugins *plugins, } /* Once they've all replied with their manifests, we can order them. */ -static void check_plugins_manifests(struct plugins *plugins) +static void check_plugins_manifests(struct plugins *plugins, + struct logger *log) { struct plugin *plugin; struct plugin **depfail; @@ -133,7 +134,7 @@ static void check_plugins_manifests(struct plugins *plugins) return; /* Now things are settled, try to order hooks. */ - depfail = plugin_hooks_make_ordered(tmpctx); + depfail = plugin_hooks_make_ordered(tmpctx, log); for (size_t i = 0; i < tal_count(depfail); i++) { /* Only complain and free plugins! 
*/ if (depfail[i]->plugin_state != NEEDS_INIT) @@ -284,7 +285,7 @@ static void destroy_plugin(struct plugin *p) /* If this was last one manifests were waiting for, handle deps */ if (p->plugin_state == AWAITING_GETMANIFEST_RESPONSE) - check_plugins_manifests(p->plugins); + check_plugins_manifests(p->plugins, p->plugins->ld->log); /* Daemon shutdown overrules plugin's importance; aborts init checks */ if (p->plugins->ld->state == LD_STATE_SHUTDOWN) { @@ -371,7 +372,7 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES, p->can_check = false; p->plugin_state = UNCONFIGURED; - p->js_arr = tal_arr(p, struct json_stream *, 0); + list_head_init(&p->jsouts); p->notification_topics = tal_arr(p, const char *, 0); p->subscriptions = NULL; p->dynamic = false; @@ -473,8 +474,8 @@ void plugin_kill(struct plugin *plugin, enum log_level loglevel, */ static void plugin_send(struct plugin *plugin, struct json_stream *stream) { - tal_steal(plugin->js_arr, stream); - tal_arr_expand(&plugin->js_arr, stream); + tal_steal(plugin, stream); + list_add_tail(&plugin->jsouts, &stream->list); io_wake(plugin); } @@ -709,6 +710,7 @@ static struct io_plan *plugin_read_json(struct io_conn *conn, const char *new_bytes, *buffer; const jsmntok_t *toks; size_t new_bytes_len; + size_t num_responses = 0; /* wallet is NULL in really early code */ bool want_transaction = (plugin->plugins->want_db_transaction && wallet != NULL); @@ -802,6 +804,12 @@ static struct io_plan *plugin_read_json(struct io_conn *conn, } jsonrpc_io_parse_done(plugin->json_in); + /* Don't let it flood us with logs/responses and starve everyone else */ + if (num_responses++ == 100) { + log_debug(plugin->log, "Pausing response parsing after %zu response", num_responses); + /* Call us back, as if we read nothing new */ + return io_always(conn, plugin_read_json, plugin); + } } /* Now read more from the connection */ @@ -815,10 +823,7 @@ static struct io_plan *plugin_write_json(struct io_conn *conn, static 
struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_stream *js, struct plugin *plugin) { - assert(tal_count(plugin->js_arr) > 0); - /* Remove js and shift all remainig over */ - tal_arr_remove(&plugin->js_arr, 0); - + list_del_from(&plugin->jsouts, &js->list); /* It got dropped off the queue, free it. */ tal_free(js); @@ -828,8 +833,11 @@ static struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_ static struct io_plan *plugin_write_json(struct io_conn *conn, struct plugin *plugin) { - if (tal_count(plugin->js_arr)) { - return json_stream_output(plugin->js_arr[0], plugin->stdin_conn, plugin_stream_complete, plugin); + struct json_stream *js; + + js = list_top(&plugin->jsouts, struct json_stream, list); + if (js) { + return json_stream_output(js, plugin->stdin_conn, plugin_stream_complete, plugin); } return io_out_wait(conn, plugin, plugin_write_json, plugin); @@ -1471,7 +1479,7 @@ static const char *plugin_subscriptions_add(struct plugin *plugin, static const char *plugin_hooks_add(struct plugin *plugin, const char *buffer, const jsmntok_t *resulttok) { - const jsmntok_t *t, *hookstok, *beforetok, *aftertok; + const jsmntok_t *t, *hookstok, *beforetok, *aftertok, *filterstok; size_t i; hookstok = json_get_member(buffer, resulttok, "hooks"); @@ -1481,6 +1489,7 @@ static const char *plugin_hooks_add(struct plugin *plugin, const char *buffer, json_for_each_arr(i, t, hookstok) { char *name; struct plugin_hook *hook; + const char *err; if (t->type == JSMN_OBJECT) { const jsmntok_t *nametok; @@ -1493,21 +1502,16 @@ static const char *plugin_hooks_add(struct plugin *plugin, const char *buffer, name = json_strdup(tmpctx, buffer, nametok); beforetok = json_get_member(buffer, t, "before"); aftertok = json_get_member(buffer, t, "after"); + filterstok = json_get_member(buffer, t, "filters"); } else { /* FIXME: deprecate in 3 releases after v0.9.2! 
*/ name = json_strdup(tmpctx, buffer, t); - beforetok = aftertok = NULL; - } - - hook = plugin_hook_register(plugin, name); - if (!hook) { - return tal_fmt(plugin, - "could not register hook '%s', either the " - "name doesn't exist or another plugin " - "already registered it.", - name); + beforetok = aftertok = filterstok = NULL; } + err = plugin_hook_register(plugin, name, buffer, filterstok, &hook); + if (err) + return err; plugin_hook_add_deps(hook, plugin, buffer, beforetok, aftertok); tal_free(name); } @@ -1843,7 +1847,7 @@ static void plugin_manifest_cb(const char *buffer, plugin_kill(plugin, LOG_INFORM, "Not a dynamic plugin"); else - check_plugins_manifests(plugin->plugins); + check_plugins_manifests(plugin->plugins, plugin->log); } /* If this is a valid plugin return full path name, otherwise NULL */ diff --git a/lightningd/plugin.h b/lightningd/plugin.h index fb16d4b252e4..23b7554b3702 100644 --- a/lightningd/plugin.h +++ b/lightningd/plugin.h @@ -78,10 +78,9 @@ struct plugin { /* Stuff we read */ struct jsonrpc_io *json_in; - /* Our json_streams. Since multiple streams could start - * returning data at once, we always service these in order, - * freeing once empty. */ - struct json_stream **js_arr; + /* Our plugin_jstream_out list. Since multiple streams could start + * returning data at once, we always service these in order. */ + struct list_head jsouts; struct logger *log; diff --git a/lightningd/plugin_hook.c b/lightningd/plugin_hook.c index 934fa16355e1..5c561e1f7f3e 100644 --- a/lightningd/plugin_hook.c +++ b/lightningd/plugin_hook.c @@ -1,5 +1,6 @@ #include "config.h" #include +#include #include #include #include @@ -9,36 +10,30 @@ * dispatch an eventual plugin_hook response. 
*/ struct plugin_hook_request { const char *cmd_id; - const struct plugin_hook *hook; + struct plugin_hook *hook; void *cb_arg; /* db_hook doesn't have ld yet */ struct db *db; struct lightningd *ld; - /* Where are we up to in the hooks[] array below */ + /* Only one of these can be non-NULL */ + const char *strfilterfield; + u64 intfilterfield; + + /* Where are we up to in the hook->hooks[] array */ size_t hook_index; - /* A snapshot taken at the start: destructors may NULL some out! */ - struct hook_instance **hooks; }; -static void destroy_hook_in_ph_req(struct hook_instance *hook, - struct plugin_hook_request *ph_req) -{ - for (size_t i = 0; i < tal_count(ph_req->hooks); i++) { - if (ph_req->hooks[i] == hook) { - ph_req->hooks[i] = NULL; - return; - } - } - abort(); -} - struct hook_instance { /* What plugin registered */ struct plugin *plugin; /* Dependencies it asked for. */ const char **before, **after; + + /* Optional filter fields. */ + const char **strfilters; + const u64 *intfilters; }; static struct plugin_hook **get_hooks(size_t *num) @@ -62,36 +57,140 @@ static struct plugin_hook *plugin_hook_by_name(const char *name) return NULL; } -/* When we destroy a plugin, we remove its hooks */ -static void destroy_hook_instance(struct hook_instance *h, - struct plugin_hook *hook) +/* When we destroy a plugin, we NULL out any hooks it registered */ +static void remove_hook_instance(const struct hook_instance *h, + struct hook_instance **hookarr) { - for (size_t i = 0; i < tal_count(hook->hooks); i++) { - if (h == hook->hooks[i]) { - tal_arr_remove(&hook->hooks, i); + for (size_t i = 0; i < tal_count(hookarr); i++) { + if (h == hookarr[i]) { + hookarr[i] = NULL; return; } } abort(); } -struct plugin_hook *plugin_hook_register(struct plugin *plugin, const char *method) +static void destroy_hook_instance(struct hook_instance *h, + struct plugin_hook *hook) { - struct hook_instance *h; - struct plugin_hook *hook = plugin_hook_by_name(method); - if (!hook) { - /* No 
such hook name registered */ + /* NULL it out. */ + remove_hook_instance(h, hook->hooks); + + /* If there's a pending set of hooks, remove ourselves there too! */ + if (hook->new_hooks) + remove_hook_instance(h, hook->new_hooks); +} + +/* Filters in an array of strings */ +static const char *parse_str_filters(const tal_t *ctx, + const char *buffer, + const jsmntok_t *filterstok, + const char ***filters) +{ + size_t i; + const jsmntok_t *t; + + if (!filterstok) { + *filters = NULL; return NULL; } + if (filterstok->type != JSMN_ARRAY) + return tal_fmt(ctx, "filters token must be an array"); + + *filters = tal_arr(ctx, const char *, filterstok->size); + json_for_each_arr(i, t, filterstok) { + if (t->type != JSMN_STRING) + return tal_fmt(ctx, "filters must be array of strings, not '%.*s'", + json_tok_full_len(t), + json_tok_full(buffer, t)); + (*filters)[i] = json_strdup(*filters, buffer, t); + } + return NULL; +} + +/* Filters in an array of ints */ +static const char *parse_int_filters(const tal_t *ctx, + const char *buffer, + const jsmntok_t *filterstok, + u64 **filters) +{ + size_t i; + const jsmntok_t *t; + + if (!filterstok) { + *filters = NULL; + return NULL; + } + + if (filterstok->type != JSMN_ARRAY) + return tal_fmt(ctx, "filters token must be an array"); + + *filters = tal_arr(ctx, u64, filterstok->size); + json_for_each_arr(i, t, filterstok) { + if (!json_to_u64(buffer, t, &(*filters)[i])) + return tal_fmt(ctx, "filters must be array of unsigned integers, not '%.*s'", + json_tok_full_len(t), + json_tok_full(buffer, t)); + } + return NULL; +} + +const char *plugin_hook_register(struct plugin *plugin, + const char *method, + const char *buf, const jsmntok_t *filterstok, + struct plugin_hook **plugin_hook) +{ + struct hook_instance *h; + struct plugin_hook *hook; + const char *err; + const char **strfilters; + u64 *intfilters; + + hook = plugin_hook_by_name(method); + if (!hook) + return tal_fmt(plugin, "Unknown hook name %s", method); + + switch 
(hook->filter_type) { + case JSMN_UNDEFINED: + if (filterstok) + return tal_fmt(plugin, "Hook %s does not allow filters", method); + intfilters = NULL; + strfilters = NULL; + break; + case JSMN_PRIMITIVE: + strfilters = NULL; + err = parse_int_filters(plugin, buf, filterstok, &intfilters); + if (err) + return err; + break; + case JSMN_STRING: + intfilters = NULL; + err = parse_str_filters(plugin, buf, filterstok, &strfilters); + if (err) + return err; + break; + + /* Nothing else is valid (yet?) */ + default: + abort(); + } + /* Make sure the hook_elements array is initialized. */ - if (hook->hooks == NULL) + if (hook->hooks == NULL) { hook->hooks = notleak(tal_arr(NULL, struct hook_instance *, 0)); + hook->new_hooks = NULL; + hook->num_users = 0; + } /* Ensure we don't register the same plugin multple times. */ - for (size_t i=0; ihooks); i++) + for (size_t i=0; ihooks); i++) { + if (!hook->hooks[i]) + continue; if (hook->hooks[i]->plugin == plugin) - return NULL; + return tal_fmt(plugin, "Registered for hook %s multiple times", + method); + } /* Ok, we're sure they can register and they aren't yet registered, so * register them. 
*/ @@ -99,14 +198,17 @@ struct plugin_hook *plugin_hook_register(struct plugin *plugin, const char *meth h->plugin = plugin; h->before = tal_arr(h, const char *, 0); h->after = tal_arr(h, const char *, 0); + h->strfilters = tal_steal(h, strfilters); + h->intfilters = tal_steal(h, intfilters); tal_add_destructor2(h, destroy_hook_instance, hook); tal_arr_expand(&hook->hooks, h); - return hook; + *plugin_hook = hook; + return NULL; } /* Mutual recursion */ -static void plugin_hook_call_next(struct plugin_hook_request *ph_req); +static bool plugin_hook_call_next(struct plugin_hook_request *ph_req); static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks, const jsmntok_t *idtok, struct plugin_hook_request *r); @@ -117,19 +219,31 @@ bool plugin_hook_continue(void *unused, const char *buffer, const jsmntok_t *tok return resrestok && json_tok_streq(buffer, resrestok, "continue"); } -static void cleanup_ph_req(struct plugin_hook_request *ph_req) +static void hook_start(struct plugin_hook *hook) { - /* We need to remove the destructors from the remaining - * call-chain, otherwise they'd still be called when the - * plugin dies or we shut down. */ - for (size_t i=0; ihooks); i++) { - tal_del_destructor2(ph_req->hooks[i], - destroy_hook_in_ph_req, ph_req); + hook->num_users++; +} + +static void hook_done(struct lightningd *ld, + struct plugin_hook *hook, + void *cb_arg) +{ + /* If we're the last one out, we can update hooks */ + if (--hook->num_users == 0) { + if (hook->new_hooks) { + log_unusual(ld->log, "Updating hooks for %s now usage is done.", + hook->name); + /* Free this later (after final_cb) if not already done */ + tal_steal(tmpctx, hook->hooks); + hook->hooks = hook->new_hooks; + hook->new_hooks = NULL; + } } - tal_free(ph_req); + hook->final_cb(cb_arg); } + /** * Callback to be passed to the jsonrpc_request. 
* @@ -144,8 +258,9 @@ static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks, const struct hook_instance *h; enum jsonrpc_errcode ecode; - assert(ph_req->hook_index < tal_count(ph_req->hooks)); - h = ph_req->hooks[ph_req->hook_index]; + assert(ph_req->hook_index < tal_count(ph_req->hook->hooks)); + /* NULL if it vanished */ + h = ph_req->hook->hooks[ph_req->hook_index]; /* destructor NULLs out hooks[], but we get called first at the moment. * We handle either */ @@ -173,7 +288,7 @@ static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks, if (!ph_req->hook->deserialize_cb(ph_req->cb_arg, buffer, resulttok)) { tal_free(ph_req->cb_arg); - cleanup_ph_req(ph_req); + tal_free(ph_req); return; } } else { @@ -184,7 +299,38 @@ static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks, plugin_hook_call_next(ph_req); } -static void plugin_hook_call_next(struct plugin_hook_request *ph_req) +static bool hook_callable(const struct hook_instance *hook, + const char *strfilterfield, + u64 intfilterfield) +{ + /* NULL? Skip */ + if (!hook) + return false; + + /* String filters? If there are some we must match one. */ + if (hook->strfilters) { + for (size_t i = 0; i < tal_count(hook->strfilters); i++) { + if (streq(strfilterfield, hook->strfilters[i])) + return true; + } + return false; + } + + /* Integer filters? */ + if (hook->intfilters) { + for (size_t i = 0; i < tal_count(hook->intfilters); i++) { + if (intfilterfield == hook->intfilters[i]) + return true; + } + return false; + } + + /* No filters: always call. 
*/ + return true; +} + +/* Returns true if it finished all the hooks (and thus didn't call anything) */ +static bool plugin_hook_call_next(struct plugin_hook_request *ph_req) { struct jsonrpc_request *req; const struct plugin_hook *hook = ph_req->hook; @@ -193,14 +339,16 @@ static void plugin_hook_call_next(struct plugin_hook_request *ph_req) /* Find next non-NULL hook: call final if we're done */ do { ph_req->hook_index++; - if (ph_req->hook_index >= tal_count(ph_req->hooks)) { - ph_req->hook->final_cb(ph_req->cb_arg); - cleanup_ph_req(ph_req); - return; + if (ph_req->hook_index >= tal_count(hook->hooks)) { + hook_done(ph_req->ld, ph_req->hook, ph_req->cb_arg); + tal_free(ph_req); + return true; } - } while (ph_req->hooks[ph_req->hook_index] == NULL); + } while (!hook_callable(hook->hooks[ph_req->hook_index], + ph_req->strfilterfield, + ph_req->intfilterfield)); - plugin = ph_req->hooks[ph_req->hook_index]->plugin; + plugin = hook->hooks[ph_req->hook_index]->plugin; log_trace(ph_req->ld->log, "Calling %s hook of plugin %s", ph_req->hook->name, plugin->shortname); req = jsonrpc_request_start(NULL, hook->name, ph_req->cmd_id, @@ -216,12 +364,17 @@ static void plugin_hook_call_next(struct plugin_hook_request *ph_req) req->stream); plugin_request_send(plugin, req); + return false; } -bool plugin_hook_call_(struct lightningd *ld, const struct plugin_hook *hook, +bool plugin_hook_call_(struct lightningd *ld, + struct plugin_hook *hook, + const char *strfilterfield TAKES, + u64 intfilterfield, const char *cmd_id TAKES, tal_t *cb_arg STEALS) { + hook_start(hook); if (tal_count(hook->hooks)) { /* If we have a plugin that has registered for this * hook, serialize and call it */ @@ -236,23 +389,17 @@ bool plugin_hook_call_(struct lightningd *ld, const struct plugin_hook *hook, ph_req->db = ld->wallet->db; ph_req->ld = ld; ph_req->cmd_id = tal_strdup_or_null(ph_req, cmd_id); - ph_req->hooks = tal_dup_talarr(ph_req, - struct hook_instance *, - hook->hooks); - /* If hook goes 
away, NULL out our snapshot */ - for (size_t i=0; ihooks); i++) - tal_add_destructor2(ph_req->hooks[i], - destroy_hook_in_ph_req, ph_req); ph_req->hook_index = -1; - plugin_hook_call_next(ph_req); - return false; + ph_req->strfilterfield = tal_strdup_or_null(ph_req, strfilterfield); + ph_req->intfilterfield = intfilterfield; + return plugin_hook_call_next(ph_req); } else { /* If no plugin has registered for this hook, just * call the callback with a NULL result. Saves us the * roundtrip to the serializer and deserializer. If we * were expecting a default response it should have * been part of the `cb_arg`. */ - hook->final_cb(cb_arg); + hook_done(ld, hook, cb_arg); return true; } } @@ -315,38 +462,45 @@ static void db_hook_response(const char *buffer, const jsmntok_t *toks, void plugin_hook_db_sync(struct db *db) { - const struct plugin_hook *hook = &db_write_hook; + struct plugin_hook *hook = &db_write_hook; struct jsonrpc_request *req; struct plugin_hook_request *ph_req; void *ret; struct plugin **plugin_arr; struct plugins *plugins; size_t i; - size_t num_hooks; - + size_t num_live_hooks; const char **changes = db_changes(db); - num_hooks = tal_count(hook->hooks); - if (num_hooks == 0) + + /* Common fast path */ + if (tal_count(hook->hooks) == 0) return; - plugin_arr = notleak(tal_arr(NULL, struct plugin *, - num_hooks)); - for (i = 0; i < num_hooks; ++i) - plugin_arr[i] = hook->hooks[i]->plugin; + /* Could still have no non-NULL ones, if a plugin was removed. 
*/ + plugin_arr = tal_arr(NULL, struct plugin *, 0); + for (i = 0; i < tal_count(hook->hooks); ++i) { + if (hook->hooks[i]) + tal_arr_expand(&plugin_arr, hook->hooks[i]->plugin); + } + num_live_hooks = tal_count(plugin_arr); + if (num_live_hooks == 0) { + tal_free(plugin_arr); + return; + } plugins = plugin_arr[0]->plugins; ph_req = notleak(tal(hook->hooks, struct plugin_hook_request)); ph_req->hook = hook; ph_req->db = db; - ph_req->cb_arg = &num_hooks; + ph_req->cb_arg = &num_live_hooks; - for (i = 0; i < num_hooks; ++i) { + for (i = 0; i < num_live_hooks; ++i) { /* Create an object for this plugin. */ struct db_write_hook_req *dwh_req; dwh_req = tal(ph_req, struct db_write_hook_req); dwh_req->plugin = plugin_arr[i]; dwh_req->ph_req = ph_req; - dwh_req->num_hooks = &num_hooks; + dwh_req->num_hooks = &num_live_hooks; /* FIXME: id_prefix from caller? */ /* FIXME: do IO logging for this! */ @@ -378,7 +532,7 @@ void plugin_hook_db_sync(struct db *db) log_debug(plugins->ld->log, "io_break: %s", __func__); io_break(ret); } - assert(num_hooks == 0); + assert(num_live_hooks == 0); tal_free(plugin_arr); tal_free(ph_req); } @@ -407,6 +561,8 @@ void plugin_hook_add_deps(struct plugin_hook *hook, /* We just added this, it must exist */ for (size_t i = 0; i < tal_count(hook->hooks); i++) { + if (!hook->hooks[i]) + continue; if (hook->hooks[i]->plugin == plugin) { h = hook->hooks[i]; break; @@ -453,20 +609,29 @@ static struct hook_node *get_best_candidate(struct hook_node *graph) } static struct plugin **plugin_hook_make_ordered(const tal_t *ctx, + struct logger *log, struct plugin_hook *hook) { struct hook_node *graph, *n; struct hook_instance **done; /* Populate graph nodes */ - graph = tal_arr(tmpctx, struct hook_node, tal_count(hook->hooks)); - for (size_t i = 0; i < tal_count(graph); i++) { - graph[i].finished = false; - graph[i].hook = hook->hooks[i]; - graph[i].num_incoming = 0; - graph[i].outgoing = tal_arr(graph, struct hook_node *, 0); + graph = tal_arr(tmpctx, 
struct hook_node, 0); + for (size_t i = 0; i < tal_count(hook->hooks); i++) { + struct hook_node hn; + + if (!hook->hooks[i]) + continue; + hn.finished = false; + hn.hook = hook->hooks[i]; + hn.num_incoming = 0; + hn.outgoing = tal_arr(graph, struct hook_node *, 0); + tal_arr_expand(&graph, hn); } + if (tal_count(graph) == 0) + return NULL; + /* Add edges. */ for (size_t i = 0; i < tal_count(graph); i++) { for (size_t j = 0; j < tal_count(graph[i].hook->before); j++) { @@ -505,7 +670,7 @@ static struct plugin **plugin_hook_make_ordered(const tal_t *ctx, n->outgoing[i]->num_incoming--; } - if (tal_count(done) != tal_count(hook->hooks)) { + if (tal_count(done) != tal_count(graph)) { struct plugin **ret = tal_arr(ctx, struct plugin *, 0); for (size_t i = 0; i < tal_count(graph); i++) { if (!graph[i].finished) @@ -514,9 +679,20 @@ static struct plugin **plugin_hook_make_ordered(const tal_t *ctx, return ret; } - /* Success! Copy ordered hooks back. */ - if (hook->hooks) - memcpy(hook->hooks, done, tal_bytelen(hook->hooks)); + /* If we had previous update pending, this subsumes it */ + tal_free(hook->new_hooks); + hook->new_hooks = notleak(tal_steal(NULL, done)); + + /* If nobody is using it now, we can just replace the hooks array. + * Otherwise defer. */ + if (hook->num_users == 0) { + tal_free(hook->hooks); + hook->hooks = hook->new_hooks; + hook->new_hooks = NULL; + } else + /* If this ever live locks, we will see this in the log! 
*/ + log_unusual(log, "Deferring registration of hook %s until it's not in use.", + hook->name); return NULL; } @@ -530,14 +706,15 @@ static void append_plugin_once(struct plugin ***ret, struct plugin *p) tal_arr_expand(ret, p); } -struct plugin **plugin_hooks_make_ordered(const tal_t *ctx) +struct plugin **plugin_hooks_make_ordered(const tal_t *ctx, + struct logger *log) { size_t num_hooks; struct plugin_hook **hooks = get_hooks(&num_hooks); struct plugin **ret = tal_arr(ctx, struct plugin *, 0); for (size_t i=0; i #include /** @@ -47,9 +48,19 @@ struct plugin_hook { void (*serialize_payload)(void *src, struct json_stream *dest, struct plugin *plugin); + /* Type of filters we allow (JSMN_UNDEFINED means none) */ + jsmntype_t filter_type; + /* Which plugins have registered this hook? This is a `tal_arr` * initialized at creation. */ struct hook_instance **hooks; + + /* Reference count for using the hook right now */ + size_t num_users; + + /* If someone was using the hooks while we were trying to update, + * we put the hook here for later use. */ + struct hook_instance **new_hooks; }; AUTODATA_TYPE(hooks, struct plugin_hook); @@ -60,7 +71,9 @@ AUTODATA_TYPE(hooks, struct plugin_hook); * still waiting on a plugin response. */ bool plugin_hook_call_(struct lightningd *ld, - const struct plugin_hook *hook, + struct plugin_hook *hook, + const char *strfilterfield TAKES, + u64 intfilterfield, const char *cmd_id TAKES, tal_t *cb_arg STEALS); @@ -73,11 +86,25 @@ bool plugin_hook_continue(void *arg, const char *buffer, const jsmntok_t *toks); * the method-name is correct for the call. 
*/ /* FIXME: Find a way to avoid back-to-back declaration and definition */ -#define PLUGIN_HOOK_CALL_DEF(name, cb_arg_type) \ +#define PLUGIN_HOOK_CALL_DEF_NOFILTER(name, cb_arg_type) \ UNNEEDED static inline bool plugin_hook_call_##name( \ struct lightningd *ld, const char *cmd_id TAKES, cb_arg_type cb_arg STEALS) \ { \ - return plugin_hook_call_(ld, &name##_hook_gen, cmd_id, cb_arg); \ + return plugin_hook_call_(ld, &name##_hook_gen, NULL, 0, cmd_id, cb_arg); \ + } + +#define PLUGIN_HOOK_CALL_DEF_STRFILTER(name, cb_arg_type) \ + UNNEEDED static inline bool plugin_hook_call_##name( \ + struct lightningd *ld, const char *strfilterfield, const char *cmd_id TAKES, cb_arg_type cb_arg STEALS) \ + { \ + return plugin_hook_call_(ld, &name##_hook_gen, strfilterfield, 0, cmd_id, cb_arg); \ + } + +#define PLUGIN_HOOK_CALL_DEF_INTFILTER(name, cb_arg_type) \ + UNNEEDED static inline bool plugin_hook_call_##name( \ + struct lightningd *ld, u64 intfilterfield, const char *cmd_id TAKES, cb_arg_type cb_arg STEALS) \ + { \ + return plugin_hook_call_(ld, &name##_hook_gen, NULL, intfilterfield, cmd_id, cb_arg); \ } /* Typechecked registration of a plugin hook. We check that the @@ -88,7 +115,7 @@ bool plugin_hook_continue(void *arg, const char *buffer, const jsmntok_t *toks); * response_cb function accepts the deserialized response format and * an arbitrary extra argument used to maintain context. 
*/ -#define REGISTER_PLUGIN_HOOK(name, deserialize_cb, final_cb, \ +#define REGISTER_PLUGIN_HOOK2(name, filter_type, deserialize_cb, final_cb, \ serialize_payload, cb_arg_type) \ struct plugin_hook name##_hook_gen = { \ stringify(name), \ @@ -102,13 +129,31 @@ bool plugin_hook_continue(void *arg, const char *buffer, const jsmntok_t *toks); void (*)(void *, struct json_stream *, struct plugin *), \ void (*)(cb_arg_type, struct json_stream *, struct plugin *), \ serialize_payload), \ - NULL, /* .plugins */ \ + (filter_type), \ + NULL, /* .plugins */ \ }; \ - AUTODATA(hooks, &name##_hook_gen); \ - PLUGIN_HOOK_CALL_DEF(name, cb_arg_type) + AUTODATA(hooks, &name##_hook_gen) + +#define REGISTER_PLUGIN_HOOK(name, deserialize_cb, final_cb, \ + serialize_payload, cb_arg_type) \ + REGISTER_PLUGIN_HOOK2(name, JSMN_UNDEFINED, deserialize_cb, final_cb, serialize_payload, cb_arg_type); \ + PLUGIN_HOOK_CALL_DEF_NOFILTER(name, cb_arg_type) + +#define REGISTER_PLUGIN_HOOK_STRFILTER(name, deserialize_cb, final_cb, \ + serialize_payload, cb_arg_type) \ + REGISTER_PLUGIN_HOOK2(name, JSMN_STRING, deserialize_cb, final_cb, serialize_payload, cb_arg_type) \ + PLUGIN_HOOK_CALL_DEF_STRFILTER(name, cb_arg_type) + +#define REGISTER_PLUGIN_HOOK_INTFILTER(name, deserialize_cb, final_cb, \ + serialize_payload, cb_arg_type) \ + REGISTER_PLUGIN_HOOK2(name, JSMN_PRIMITIVE, deserialize_cb, final_cb, serialize_payload, cb_arg_type) \ + PLUGIN_HOOK_CALL_DEF_INTFILTER(name, cb_arg_type) -struct plugin_hook *plugin_hook_register(struct plugin *plugin, - const char *method); +/* Returns the error, or NULL and populates *plugin_hook */ +const char *plugin_hook_register(struct plugin *plugin, + const char *method, + const char *buf, const jsmntok_t *filterstok, + struct plugin_hook **plugin_hook); /* Special sync plugin hook for db. 
*/ void plugin_hook_db_sync(struct db *db); @@ -121,6 +166,7 @@ void plugin_hook_add_deps(struct plugin_hook *hook, const jsmntok_t *after); /* Returns array of plugins which cannot be ordered (empty on success) */ -struct plugin **plugin_hooks_make_ordered(const tal_t *ctx); +struct plugin **plugin_hooks_make_ordered(const tal_t *ctx, + struct logger *log); #endif /* LIGHTNING_LIGHTNINGD_PLUGIN_HOOK_H */ diff --git a/lightningd/test/run-find_my_abspath.c b/lightningd/test/run-find_my_abspath.c index b03fcfda012e..80c2a42c60bc 100644 --- a/lightningd/test/run-find_my_abspath.c +++ b/lightningd/test/run-find_my_abspath.c @@ -163,7 +163,9 @@ void onchaind_replay_channels(struct lightningd *ld UNNEEDED) { fprintf(stderr, "onchaind_replay_channels called!\n"); abort(); } /* Generated stub for plugin_hook_call_ */ bool plugin_hook_call_(struct lightningd *ld UNNEEDED, - const struct plugin_hook *hook UNNEEDED, + struct plugin_hook *hook UNNEEDED, + const char *strfilterfield TAKES UNNEEDED, + u64 intfilterfield UNNEEDED, const char *cmd_id TAKES UNNEEDED, tal_t *cb_arg STEALS UNNEEDED) { fprintf(stderr, "plugin_hook_call_ called!\n"); abort(); } diff --git a/lightningd/test/run-invoice-select-inchan.c b/lightningd/test/run-invoice-select-inchan.c index 64e4168e67b1..97bee8f73a84 100644 --- a/lightningd/test/run-invoice-select-inchan.c +++ b/lightningd/test/run-invoice-select-inchan.c @@ -583,7 +583,9 @@ bool peer_start_openingd(struct peer *peer UNNEEDED, { fprintf(stderr, "peer_start_openingd called!\n"); abort(); } /* Generated stub for plugin_hook_call_ */ bool plugin_hook_call_(struct lightningd *ld UNNEEDED, - const struct plugin_hook *hook UNNEEDED, + struct plugin_hook *hook UNNEEDED, + const char *strfilterfield TAKES UNNEEDED, + u64 intfilterfield UNNEEDED, const char *cmd_id TAKES UNNEEDED, tal_t *cb_arg STEALS UNNEEDED) { fprintf(stderr, "plugin_hook_call_ called!\n"); abort(); } diff --git a/lightningd/test/run-jsonrpc.c b/lightningd/test/run-jsonrpc.c index 
c25cb535d7e6..c28ca212ae88 100644 --- a/lightningd/test/run-jsonrpc.c +++ b/lightningd/test/run-jsonrpc.c @@ -81,7 +81,9 @@ u32 penalty_feerate(struct chain_topology *topo UNNEEDED) { fprintf(stderr, "penalty_feerate called!\n"); abort(); } /* Generated stub for plugin_hook_call_ */ bool plugin_hook_call_(struct lightningd *ld UNNEEDED, - const struct plugin_hook *hook UNNEEDED, + struct plugin_hook *hook UNNEEDED, + const char *strfilterfield TAKES UNNEEDED, + u64 intfilterfield UNNEEDED, const char *cmd_id TAKES UNNEEDED, tal_t *cb_arg STEALS UNNEEDED) { fprintf(stderr, "plugin_hook_call_ called!\n"); abort(); } diff --git a/plugins/bkpr/bookkeeper.c b/plugins/bkpr/bookkeeper.c index 7521b210c016..87caf8786add 100644 --- a/plugins/bkpr/bookkeeper.c +++ b/plugins/bkpr/bookkeeper.c @@ -88,8 +88,7 @@ static void parse_and_log_channel_move(struct command *cmd, const char *buf, const jsmntok_t *channelmove, - struct refresh_info *rinfo, - bool log); + struct refresh_info *rinfo); static struct command_result *datastore_done(struct command *cmd, const char *method, @@ -110,6 +109,7 @@ static struct fee_sum *find_sum_for_txid(struct fee_sum **sums, return NULL; } +#define LISTCHANNELMOVES_LIMIT 10000 static struct command_result *listchannelmoves_done(struct command *cmd, const char *method, const char *buf, @@ -122,15 +122,8 @@ static struct command_result *listchannelmoves_done(struct command *cmd, be64 be_index; moves = json_get_member(buf, result, "channelmoves"); - if (moves->size > 2) { - plugin_log(cmd->plugin, LOG_DBG, - "%u channelmoves, only logging first and last", - moves->size); - } - json_for_each_arr(i, t, moves) - parse_and_log_channel_move(cmd, buf, t, rinfo, - i == 0 || i == moves->size - 1); + parse_and_log_channel_move(cmd, buf, t, rinfo); be_index = cpu_to_be64(bkpr->channelmoves_index); jsonrpc_set_datastore_binary(cmd, "bookkeeper/channelmoves_index", @@ -139,7 +132,7 @@ static struct command_result *listchannelmoves_done(struct command *cmd, 
datastore_done, NULL, use_rinfo(rinfo)); /* If there might be more, try asking for more */ - if (moves->size != 0) + if (moves->size == LISTCHANNELMOVES_LIMIT) limited_listchannelmoves(cmd, rinfo); return rinfo_one_done(cmd, rinfo); @@ -158,7 +151,7 @@ static struct command_result *limited_listchannelmoves(struct command *cmd, use_rinfo(rinfo)); json_add_string(req->js, "index", "created"); json_add_u64(req->js, "start", bkpr->channelmoves_index + 1); - json_add_u64(req->js, "limit", 1000); + json_add_u64(req->js, "limit", LISTCHANNELMOVES_LIMIT); return send_outreq(req); } @@ -238,12 +231,12 @@ getblockheight_done(struct command *cmd, if (!blockheight_tok) plugin_err(cmd->plugin, "getblockheight: " "getinfo gave no 'blockheight'? '%.*s'", - result->end - result->start, buf); + result->end - result->start, buf + result->start); if (!json_to_u32(buf, blockheight_tok, &blockheight)) plugin_err(cmd->plugin, "getblockheight: " "getinfo gave non-unsigned-32-bit 'blockheight'? '%.*s'", - result->end - result->start, buf); + result->end - result->start, buf + result->start); /* Get the income events */ apys = compute_channel_apys(cmd, bkpr, cmd, @@ -1013,7 +1006,7 @@ listinvoices_done(struct command *cmd, "listinvoices:" " description/bolt11/bolt12" " not found (%.*s)", - result->end - result->start, buf); + result->end - result->start, buf + result->start); return rinfo_one_done(cmd, phinfo->rinfo); } @@ -1053,7 +1046,7 @@ listsendpays_done(struct command *cmd, plugin_log(cmd->plugin, LOG_DBG, "listpays: bolt11/bolt12 not found:" "(%.*s)", - result->end - result->start, buf); + result->end - result->start, buf + result->start); return rinfo_one_done(cmd, phinfo->rinfo); } @@ -1286,8 +1279,7 @@ static void parse_and_log_channel_move(struct command *cmd, const char *buf, const jsmntok_t *channelmove, - struct refresh_info *rinfo, - bool log) + struct refresh_info *rinfo) { struct channel_event *e = tal(cmd, struct channel_event); struct account *acct; @@ -1334,12 +1326,11 
@@ parse_and_log_channel_move(struct command *cmd, err = tal_free(err); } - if (log) - plugin_log(cmd->plugin, LOG_DBG, "coin_move 2 (%s) %s -%s %s %"PRIu64, - e->tag, - fmt_amount_msat(tmpctx, e->credit), - fmt_amount_msat(tmpctx, e->debit), - CHANNEL_MOVE, e->timestamp); + plugin_log(cmd->plugin, LOG_DBG, "coin_move 2 (%s) %s -%s %s %"PRIu64, + e->tag, + fmt_amount_msat(tmpctx, e->credit), + fmt_amount_msat(tmpctx, e->debit), + CHANNEL_MOVE, e->timestamp); /* Go find the account for this event */ acct = find_account(bkpr, acct_name); diff --git a/plugins/chanbackup.c b/plugins/chanbackup.c index 3b0d128606a9..cebd2201328f 100644 --- a/plugins/chanbackup.c +++ b/plugins/chanbackup.c @@ -1126,10 +1126,14 @@ static const struct plugin_notification notifs[] = { }, }; +static u64 custommsg_types[] = { WIRE_PEER_STORAGE, WIRE_PEER_STORAGE_RETRIEVAL }; + static const struct plugin_hook hooks[] = { { - "custommsg", - handle_your_peer_storage, + .name = "custommsg", + .handle = handle_your_peer_storage, + .intfilters = custommsg_types, + .num_intfilters = ARRAY_SIZE(custommsg_types), }, { "peer_connected", diff --git a/plugins/commando.c b/plugins/commando.c index f5022be8dd14..d0d2e91c8bd0 100644 --- a/plugins/commando.c +++ b/plugins/commando.c @@ -603,10 +603,19 @@ static struct command_result *handle_custommsg(struct command *cmd, return command_hook_success(cmd); } +static u64 custommsg_types[] = { + COMMANDO_MSG_CMD_CONTINUES, + COMMANDO_MSG_CMD_TERM, + COMMANDO_MSG_REPLY_CONTINUES, + COMMANDO_MSG_REPLY_TERM, +}; + static const struct plugin_hook hooks[] = { { - "custommsg", - handle_custommsg + .name = "custommsg", + .handle = handle_custommsg, + .intfilters = custommsg_types, + .num_intfilters = ARRAY_SIZE(custommsg_types), }, }; diff --git a/plugins/libplugin.c b/plugins/libplugin.c index db49590d62f8..22d4995402e2 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -1018,7 +1018,8 @@ static void destroy_cmd_mark_freed(struct command *cmd, bool 
*cmd_freed) *cmd_freed = true; } -static void handle_rpc_reply(struct plugin *plugin, const char *buf, const jsmntok_t *toks) +static void handle_rpc_reply(const tal_t *working_ctx, + struct plugin *plugin, const char *buf, const jsmntok_t *toks) { const jsmntok_t *idtok, *contenttok; struct out_req *out; @@ -1042,7 +1043,7 @@ static void handle_rpc_reply(struct plugin *plugin, const char *buf, const jsmnt } /* We want to free this if callback doesn't. */ - tal_steal(tmpctx, out); + tal_steal(working_ctx, out); /* If they return complete, cmd should have been freed! */ cmd_freed = false; @@ -1302,6 +1303,19 @@ handle_getmanifest(struct command *getmanifest_cmd, p->hook_subs[i].after[j]); json_array_end(params); } + if (p->hook_subs[i].num_strfilters) { + json_array_start(params, "filters"); + for (size_t j = 0; j < p->hook_subs[i].num_strfilters; j++) + json_add_string(params, NULL, + p->hook_subs[i].strfilters[j]); + json_array_end(params); + } else if (p->hook_subs[i].num_intfilters) { + json_array_start(params, "filters"); + for (size_t j = 0; j < p->hook_subs[i].num_intfilters; j++) + json_add_u64(params, NULL, + p->hook_subs[i].intfilters[j]); + json_array_end(params); + } json_object_end(params); } json_array_end(params); @@ -1343,6 +1357,8 @@ static void rpc_conn_finished(struct io_conn *conn, static struct io_plan *rpc_conn_read_response(struct io_conn *conn, struct plugin *plugin) { + const tal_t *working_ctx = tal(NULL, char); + /* Gather an parse any new bytes */ for (;;) { const jsmntok_t *toks; @@ -1358,10 +1374,14 @@ static struct io_plan *rpc_conn_read_response(struct io_conn *conn, if (!toks) break; - handle_rpc_reply(plugin, buf, toks); + handle_rpc_reply(working_ctx, plugin, buf, toks); jsonrpc_io_parse_done(plugin->jsonrpc_in); } + /* Explicitly free any expired requests now; xpay uses this to + * fire more commands! 
*/ + tal_free(working_ctx); + /* Read more */ return jsonrpc_io_read(conn, plugin->jsonrpc_in, rpc_conn_read_response, diff --git a/plugins/libplugin.h b/plugins/libplugin.h index cec028b2fed5..bcc68dc03948 100644 --- a/plugins/libplugin.h +++ b/plugins/libplugin.h @@ -99,6 +99,14 @@ struct plugin_hook { const jsmntok_t *params); /* If non-NULL, these are NULL-terminated arrays of deps */ const char **before, **after; + + /* String filters (you can only set this *or* intfilters) */ + const char **strfilters; + size_t num_strfilters; + + /* Integer filters */ + const u64 *intfilters; + size_t num_intfilters; }; /* Return the feature set of the current lightning node */ diff --git a/plugins/sql.c b/plugins/sql.c index 17c63faf7614..7f8631b000e5 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -111,6 +112,8 @@ struct table_desc { const char *name; /* e.g. "payments" for listsendpays */ const char *arrname; + /* name if we need to wait for changes */ + const char *waitname; struct column **columns; char *update_stmt; /* If we are a subtable */ @@ -127,18 +130,30 @@ struct table_desc { struct db_query *dbq); /* some refresh functions maintain changed and created indexes */ u64 last_created_index; + /* Do we need a refresh? */ + bool needs_refresh; /* Are we refreshing now? */ bool refreshing; + /* When did we start refreshing? 
*/ + struct timemono refresh_start; /* Any other commands waiting for the refresh completion */ struct list_head refresh_waiters; }; -static STRMAP(struct table_desc *) tablemap; -static size_t max_dbmem = 500000000; -static struct sqlite3 *db; -static char *dbfilename; -static int gosstore_fd = -1; -static size_t gosstore_nodes_off = 0, gosstore_channels_off = 0; -static u64 next_rowid = 1; + +typedef STRMAP(struct table_desc *) tablemap; +struct sql { + tablemap tablemap; + struct sqlite3 *db; + char *dbfilename; + int gosstore_fd ; + size_t gosstore_nodes_off, gosstore_channels_off; + u64 next_rowid; +}; + +static struct sql *sql_of(struct plugin *plugin) +{ + return plugin_get_data(plugin, struct sql); +} /* It was tempting to put these in the schema, but they're really * just for our usage. Though that would allow us to autogen the @@ -217,9 +232,10 @@ static struct sqlite3 *sqlite_setup(struct plugin *plugin) int err; struct sqlite3 *db; char *errmsg; + struct sql *sql = sql_of(plugin); - if (dbfilename) { - err = sqlite3_open_v2(dbfilename, &db, + if (sql->dbfilename) { + err = sqlite3_open_v2(sql->dbfilename, &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL); } else { @@ -252,17 +268,21 @@ static struct sqlite3 *sqlite_setup(struct plugin *plugin) sqlite3_limit(db, SQLITE_LIMIT_TRIGGER_DEPTH, 1); sqlite3_limit(db, SQLITE_LIMIT_WORKER_THREADS, 1); - /* Default is now 4k pages, so allow 500MB */ - err = sqlite3_exec(db, tal_fmt(tmpctx, "PRAGMA max_page_count = %zu;", - max_dbmem / 4096), - NULL, NULL, &errmsg); - if (err != SQLITE_OK) - plugin_err(plugin, "Could not set max_page_count: %s", errmsg); - err = sqlite3_exec(db, "PRAGMA foreign_keys = ON;", NULL, NULL, &errmsg); if (err != SQLITE_OK) plugin_err(plugin, "Could not set foreign_keys: %s", errmsg); + if (sql->dbfilename) { + err = sqlite3_exec(db, + "PRAGMA synchronous = OFF;" + "PRAGMA journal_mode = OFF;" + "PRAGMA temp_store = MEMORY;" + , NULL, NULL, + &errmsg); + if (err != SQLITE_OK) + 
plugin_err(plugin, "Could not disable sync: %s", errmsg); + } + return db; } @@ -299,7 +319,8 @@ static int sqlite_authorize(void *dbq_, int code, /* You can do a column read: takes a table name, column name */ if (code == SQLITE_READ) { - struct table_desc *td = strmap_get(&tablemap, a); + struct sql *sql = sql_of(dbq->cmd->plugin); + struct table_desc *td = strmap_get(&sql->tablemap, a); struct column *col; if (!td) { dbq->authfail = tal_fmt(dbq, "Unknown table %s", a); @@ -400,6 +421,7 @@ static int sqlite_authorize(void *dbq_, int code, static struct command_result *refresh_complete(struct command *cmd, struct db_query *dbq) { + struct sql *sql = sql_of(cmd->plugin); char *errmsg; int err, num_cols; size_t num_rows; @@ -464,7 +486,7 @@ static struct command_result *refresh_complete(struct command *cmd, } if (err != SQLITE_DONE) errmsg = tal_fmt(cmd, "Executing statement: %s", - sqlite3_errmsg(db)); + sqlite3_errmsg(sql->db)); sqlite3_finalize(dbq->stmt); @@ -491,6 +513,8 @@ static struct command_result *refresh_complete(struct command *cmd, static void init_indices(struct plugin *plugin, const struct table_desc *td) { + struct sql *sql = sql_of(plugin); + for (size_t i = 0; i < ARRAY_SIZE(indices); i++) { char *errmsg, *cmd; int err; @@ -505,7 +529,7 @@ static void init_indices(struct plugin *plugin, const struct table_desc *td) if (indices[i].fields[1]) tal_append_fmt(&cmd, ", %s", indices[i].fields[1]); tal_append_fmt(&cmd, ");"); - err = sqlite3_exec(db, cmd, NULL, NULL, &errmsg); + err = sqlite3_exec(sql->db, cmd, NULL, NULL, &errmsg); if (err != SQLITE_OK) plugin_err(plugin, "Failed '%s': %s", cmd, errmsg); } @@ -515,20 +539,68 @@ static void init_indices(struct plugin *plugin, const struct table_desc *td) static struct command_result *refresh_tables(struct command *cmd, struct db_query *dbq); +static struct command_result *next_refresh(struct command *cmd, + struct db_query *dbq) +{ + /* Remove that, iterate */ + tal_arr_remove(&dbq->tables, 0); + return 
refresh_tables(cmd, dbq); +} + +static struct command_result *wait_done(struct command *cmd, + const char *method, + const char *buf, + const jsmntok_t *result, + struct table_desc *td) +{ + td->needs_refresh = true; + return aux_command_done(cmd); +} + static struct command_result *one_refresh_done(struct command *cmd, - struct db_query *dbq) + struct db_query *dbq, + bool was_limited) { struct table_desc *td = dbq->tables[0]; struct list_head waiters; struct refresh_waiter *rw; + struct timerel refresh_duration = timemono_since(td->refresh_start); + + /* If we may have more, keep going. */ + if (was_limited) + return td->refresh(cmd, dbq->tables[0], dbq); /* We are no longer refreshing */ assert(td->refreshing); td->refreshing = false; + plugin_log(cmd->plugin, LOG_DBG, + "Time to refresh %s: %"PRIu64".%09"PRIu64" seconds (last=%"PRIu64")", + td->name, + (u64)refresh_duration.ts.tv_sec, + (u64)refresh_duration.ts.tv_nsec, + td->last_created_index); if (!td->indices_created) { init_indices(cmd->plugin, td); td->indices_created = 1; + refresh_duration = timemono_since(td->refresh_start); + plugin_log(cmd->plugin, LOG_DBG, + "Time to refresh + create indices for %s: %"PRIu64".%09"PRIu64" seconds", + td->name, + (u64)refresh_duration.ts.tv_sec, + (u64)refresh_duration.ts.tv_nsec); + } + + /* We put in a wait command to we get told when we need to refresh */ + if (td->waitname) { + struct out_req *req; + td->needs_refresh = false; + req = jsonrpc_request_start(aux_command(cmd), "wait", wait_done, + plugin_broken_cb, td); + json_add_string(req->js, "subsystem", td->waitname); + json_add_string(req->js, "indexname", "created"); + json_add_u64(req->js, "nextvalue", td->last_created_index+1); + send_outreq(req); } /* Transfer refresh waiters onto local list */ @@ -545,10 +617,7 @@ static struct command_result *one_refresh_done(struct command *cmd, tal_arr_remove(&rwdbq->tables, 0); refresh_tables(rwcmd, rwdbq); } - - /* Remove that, iterate */ - 
tal_arr_remove(&dbq->tables, 0); - return refresh_tables(cmd, dbq); + return next_refresh(cmd, dbq); } /* Mutual recursion */ @@ -605,6 +674,7 @@ static struct command_result *process_json_obj(struct command *cmd, sqlite3_stmt *stmt, u64 *last_created_index) { + struct sql *sql = sql_of(cmd->plugin); int err; /* Subtables have row, arrindex as first two columns. */ @@ -748,7 +818,7 @@ static struct command_result *process_json_obj(struct command *cmd, "Error executing %s on row %zu: %s", td->update_stmt, row, - sqlite3_errmsg(db)); + sqlite3_errmsg(sql->db)); } return process_json_subobjs(cmd, buf, t, td, this_rowid, last_created_index); @@ -762,17 +832,18 @@ static struct command_result *process_json_list(struct command *cmd, const struct table_desc *td, u64 *last_created_index) { + struct sql *sql = sql_of(cmd->plugin); size_t i; const jsmntok_t *t; int err; sqlite3_stmt *stmt; struct command_result *ret = NULL; - err = sqlite3_prepare_v2(db, td->update_stmt, -1, &stmt, NULL); + err = sqlite3_prepare_v2(sql->db, td->update_stmt, -1, &stmt, NULL); if (err != SQLITE_OK) { return command_fail(cmd, LIGHTNINGD, "preparing '%s' failed: %s", td->update_stmt, - sqlite3_errmsg(db)); + sqlite3_errmsg(sql->db)); } json_for_each_arr(i, t, arr) { @@ -781,7 +852,7 @@ static struct command_result *process_json_list(struct command *cmd, u64 this_rowid; if (!td->has_created_index) { - this_rowid = next_rowid++; + this_rowid = sql->next_rowid++; /* First entry is always the rowid */ sqlite3_bind_int64(stmt, off++, this_rowid); } else { @@ -807,11 +878,20 @@ static struct command_result *process_json_result(struct command *cmd, const char *buf, const jsmntok_t *result, const struct table_desc *td, - u64 *last_created_index) + u64 *last_created_index, + size_t *num_entries) { - return process_json_list(cmd, buf, - json_get_member(buf, result, td->arrname), - NULL, td, last_created_index); + const jsmntok_t *arr; + struct timerel so_far = timemono_since(td->refresh_start); + 
plugin_log(cmd->plugin, LOG_DBG, + "Time to call %s: %"PRIu64".%09"PRIu64" seconds", + td->cmdname, + (u64)so_far.ts.tv_sec, (u64)so_far.ts.tv_nsec); + + arr = json_get_member(buf, result, td->arrname); + if (num_entries) + *num_entries = arr->size; + return process_json_list(cmd, buf, arr, NULL, td, last_created_index); } static struct command_result *default_list_done(struct command *cmd, @@ -820,24 +900,25 @@ static struct command_result *default_list_done(struct command *cmd, const jsmntok_t *result, struct db_query *dbq) { + struct sql *sql = sql_of(cmd->plugin); const struct table_desc *td = dbq->tables[0]; struct command_result *ret; int err; char *errmsg; /* FIXME: this is where a wait / pagination API is useful! */ - err = sqlite3_exec(db, tal_fmt(tmpctx, "DELETE FROM %s;", td->name), + err = sqlite3_exec(sql->db, tal_fmt(tmpctx, "DELETE FROM %s;", td->name), NULL, NULL, &errmsg); if (err != SQLITE_OK) { return command_fail(cmd, LIGHTNINGD, "cleaning '%s' failed: %s", td->name, errmsg); } - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index); + ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); if (ret) return ret; - return one_refresh_done(cmd, dbq); + return one_refresh_done(cmd, dbq, false); } static struct command_result *default_refresh(struct command *cmd, @@ -888,10 +969,11 @@ static bool extract_scid(int gosstore_fd, size_t off, u16 type, static void delete_channel_from_db(struct command *cmd, struct short_channel_id scid) { + struct sql *sql = sql_of(cmd->plugin); int err; char *errmsg; - err = sqlite3_exec(db, + err = sqlite3_exec(sql->db, tal_fmt(tmpctx, "DELETE FROM channels" " WHERE short_channel_id = '%s'", @@ -915,7 +997,7 @@ static struct command_result *listchannels_one_done(struct command *cmd, const struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index); + ret = process_json_result(cmd, buf, result, 
td, dbq->last_created_index, NULL); if (ret) return ret; @@ -927,42 +1009,43 @@ static struct command_result *channels_refresh(struct command *cmd, const struct table_desc *td, struct db_query *dbq) { + struct sql *sql = sql_of(cmd->plugin); struct out_req *req; size_t msglen; u16 type, flags; - if (gosstore_fd == -1) { - gosstore_fd = open("gossip_store", O_RDONLY); - if (gosstore_fd == -1) + if (sql->gosstore_fd == -1) { + sql->gosstore_fd = open("gossip_store", O_RDONLY); + if (sql->gosstore_fd == -1) plugin_err(cmd->plugin, "Could not open gossip_store: %s", strerror(errno)); } /* First time, set off to end and load from scratch */ - if (gosstore_channels_off == 0) { - gosstore_channels_off = find_gossip_store_end(gosstore_fd, 1); + if (sql->gosstore_channels_off == 0) { + sql->gosstore_channels_off = find_gossip_store_end(sql->gosstore_fd, 1); return default_refresh(cmd, td, dbq); } plugin_log(cmd->plugin, LOG_DBG, "Refreshing channels @%zu...", - gosstore_channels_off); + sql->gosstore_channels_off); /* OK, try catching up! 
*/ - while (gossip_store_readhdr(gosstore_fd, gosstore_channels_off, + while (gossip_store_readhdr(sql->gosstore_fd, sql->gosstore_channels_off, &msglen, NULL, &flags, &type)) { struct short_channel_id scid; - size_t off = gosstore_channels_off; + size_t off = sql->gosstore_channels_off; - gosstore_channels_off += sizeof(struct gossip_hdr) + msglen; + sql->gosstore_channels_off += sizeof(struct gossip_hdr) + msglen; if (flags & GOSSIP_STORE_DELETED_BIT) continue; if (type == WIRE_GOSSIP_STORE_ENDED) { /* Force a reopen */ - gosstore_channels_off = gosstore_nodes_off = 0; - close(gosstore_fd); - gosstore_fd = -1; + sql->gosstore_channels_off = sql->gosstore_nodes_off = 0; + close(sql->gosstore_fd); + sql->gosstore_fd = -1; return channels_refresh(cmd, td, dbq); } @@ -971,8 +1054,8 @@ static struct command_result *channels_refresh(struct command *cmd, if (type == WIRE_CHANNEL_UPDATE || type == WIRE_GOSSIP_STORE_PRIVATE_UPDATE_OBS) { /* This can fail if entry not fully written yet. */ - if (!extract_scid(gosstore_fd, off, type, &scid)) { - gosstore_channels_off = off; + if (!extract_scid(sql->gosstore_fd, off, type, &scid)) { + sql->gosstore_channels_off = off; break; } @@ -989,8 +1072,8 @@ static struct command_result *channels_refresh(struct command *cmd, return send_outreq(req); } else if (type == WIRE_GOSSIP_STORE_DELETE_CHAN) { /* This can fail if entry not fully written yet. 
*/ - if (!extract_scid(gosstore_fd, off, type, &scid)) { - gosstore_channels_off = off; + if (!extract_scid(sql->gosstore_fd, off, type, &scid)) { + sql->gosstore_channels_off = off; break; } plugin_log(cmd->plugin, LOG_DBG, "Deleting channel: %s", @@ -999,7 +1082,7 @@ static struct command_result *channels_refresh(struct command *cmd, } } - return one_refresh_done(cmd, dbq); + return one_refresh_done(cmd, dbq, false); } static struct command_result *nodes_refresh(struct command *cmd, @@ -1015,7 +1098,7 @@ static struct command_result *listnodes_one_done(struct command *cmd, const struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index); + ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); if (ret) return ret; @@ -1026,10 +1109,11 @@ static struct command_result *listnodes_one_done(struct command *cmd, static void delete_node_from_db(struct command *cmd, const struct node_id *id) { + struct sql *sql = sql_of(cmd->plugin); int err; char *errmsg; - err = sqlite3_exec(db, + err = sqlite3_exec(sql->db, tal_fmt(tmpctx, "DELETE FROM nodes" " WHERE nodeid = X'%s'", @@ -1073,46 +1157,47 @@ static struct command_result *nodes_refresh(struct command *cmd, const struct table_desc *td, struct db_query *dbq) { + struct sql *sql = sql_of(cmd->plugin); struct out_req *req; size_t msglen; u16 type, flags; - if (gosstore_fd == -1) { - gosstore_fd = open("gossip_store", O_RDONLY); - if (gosstore_fd == -1) + if (sql->gosstore_fd == -1) { + sql->gosstore_fd = open("gossip_store", O_RDONLY); + if (sql->gosstore_fd == -1) plugin_err(cmd->plugin, "Could not open gossip_store: %s", strerror(errno)); } /* First time, set off to end and load from scratch */ - if (gosstore_nodes_off == 0) { - gosstore_nodes_off = find_gossip_store_end(gosstore_fd, 1); + if (sql->gosstore_nodes_off == 0) { + sql->gosstore_nodes_off = find_gossip_store_end(sql->gosstore_fd, 1); return 
default_refresh(cmd, td, dbq); } /* OK, try catching up! */ - while (gossip_store_readhdr(gosstore_fd, gosstore_nodes_off, + while (gossip_store_readhdr(sql->gosstore_fd, sql->gosstore_nodes_off, &msglen, NULL, &flags, &type)) { struct node_id id; - size_t off = gosstore_nodes_off; + size_t off = sql->gosstore_nodes_off; - gosstore_nodes_off += sizeof(struct gossip_hdr) + msglen; + sql->gosstore_nodes_off += sizeof(struct gossip_hdr) + msglen; if (flags & GOSSIP_STORE_DELETED_BIT) continue; if (type == WIRE_GOSSIP_STORE_ENDED) { /* Force a reopen */ - gosstore_nodes_off = gosstore_channels_off = 0; - close(gosstore_fd); - gosstore_fd = -1; + sql->gosstore_nodes_off = sql->gosstore_channels_off = 0; + close(sql->gosstore_fd); + sql->gosstore_fd = -1; return nodes_refresh(cmd, td, dbq); } if (type == WIRE_NODE_ANNOUNCEMENT) { /* This can fail if entry not fully written yet. */ - if (!extract_node_id(gosstore_fd, off, type, &id)) { - gosstore_nodes_off = off; + if (!extract_node_id(sql->gosstore_fd, off, type, &id)) { + sql->gosstore_nodes_off = off; break; } @@ -1129,7 +1214,7 @@ static struct command_result *nodes_refresh(struct command *cmd, /* FIXME: Add WIRE_GOSSIP_STORE_DELETE_NODE marker! 
*/ } - return one_refresh_done(cmd, dbq); + return one_refresh_done(cmd, dbq, false); } static struct command_result *refresh_tables(struct command *cmd, @@ -1153,8 +1238,12 @@ static struct command_result *refresh_tables(struct command *cmd, return command_still_pending(cmd); } + if (!td->needs_refresh) + return next_refresh(cmd, dbq); + dbq->last_created_index = &dbq->tables[0]->last_created_index; td->refreshing = true; + td->refresh_start = time_mono(); return td->refresh(cmd, dbq->tables[0], dbq); } @@ -1162,6 +1251,7 @@ static struct command_result *json_sql(struct command *cmd, const char *buffer, const jsmntok_t *params) { + struct sql *sql = sql_of(cmd->plugin); struct db_query *dbq = tal(cmd, struct db_query); const char *query; int err; @@ -1181,17 +1271,18 @@ static struct command_result *json_sql(struct command *cmd, /* This both checks we're not altering, *and* tells us what * tables to refresh. */ - err = sqlite3_set_authorizer(db, sqlite_authorize, dbq); + err = sqlite3_set_authorizer(sql->db, sqlite_authorize, dbq); if (err != SQLITE_OK) { plugin_err(cmd->plugin, "Could not set authorizer: %s", - sqlite3_errmsg(db)); + sqlite3_errmsg(sql->db)); } - err = sqlite3_prepare_v2(db, query, -1, &dbq->stmt, NULL); - sqlite3_set_authorizer(db, NULL, NULL); + err = sqlite3_prepare_v2(sql->db, query, -1, &dbq->stmt, NULL); + sqlite3_set_authorizer(sql->db, NULL, NULL); if (err != SQLITE_OK) { - char *errmsg = tal_fmt(tmpctx, "query failed with %s", sqlite3_errmsg(db)); + char *errmsg = tal_fmt(tmpctx, "query failed with %s", + sqlite3_errmsg(sql->db)); if (dbq->authfail) tal_append_fmt(&errmsg, " (%s)", dbq->authfail); return command_fail(cmd, LIGHTNINGD, "%s", errmsg); @@ -1215,7 +1306,8 @@ static struct command_result *param_tablename(struct command *cmd, const jsmntok_t *tok, struct table_desc **td) { - *td = strmap_getn(&tablemap, buffer + tok->start, + struct sql *sql = sql_of(cmd->plugin); + *td = strmap_getn(&sql->tablemap, buffer + tok->start, 
tok->end - tok->start); if (!*td) return command_fail_badparam(cmd, name, buffer, tok, @@ -1298,6 +1390,7 @@ static struct command_result *json_listsqlschemas(struct command *cmd, const char *buffer, const jsmntok_t *params) { + struct sql *sql = sql_of(cmd->plugin); struct table_desc *td; struct json_stream *ret; @@ -1311,7 +1404,7 @@ static struct command_result *json_listsqlschemas(struct command *cmd, if (td) json_add_schema(ret, td); else - strmap_iterate(&tablemap, add_one_schema, ret); + strmap_iterate(&sql->tablemap, add_one_schema, ret); json_array_end(ret); return command_finished(cmd, ret); } @@ -1356,6 +1449,7 @@ static const char *primary_key_name(const struct table_desc *td) /* Creates sql statements, initializes table */ static void finish_td(struct plugin *plugin, struct table_desc *td) { + struct sql *sql = sql_of(plugin); char *create_stmt; int err; char *errmsg; @@ -1409,7 +1503,7 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) tal_append_fmt(&create_stmt, ");"); tal_append_fmt(&td->update_stmt, ");"); - err = sqlite3_exec(db, create_stmt, NULL, NULL, &errmsg); + err = sqlite3_exec(sql->db, create_stmt, NULL, NULL, &errmsg); if (err != SQLITE_OK) plugin_err(plugin, "Could not create %s: %s", td->name, errmsg); @@ -1461,6 +1555,8 @@ static const char *db_table_name(const tal_t *ctx, const char *cmdname) return ret; } +#define LIMIT_PER_LIST 10000 + static struct command_result *limited_list_done(struct command *cmd, const char *method, const char *buf, @@ -1469,12 +1565,15 @@ static struct command_result *limited_list_done(struct command *cmd, { struct table_desc *td = dbq->tables[0]; struct command_result *ret; + size_t num_entries; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index); + ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, + &num_entries); if (ret) return ret; - return one_refresh_done(cmd, dbq); + /* If we got the number we asked for, we need to ask again. 
*/ + return one_refresh_done(cmd, dbq, num_entries == LIMIT_PER_LIST); } /* The simplest case: append-only lists */ @@ -1488,6 +1587,7 @@ static struct command_result *refresh_by_created_index(struct command *cmd, dbq); json_add_string(req->js, "index", "created"); json_add_u64(req->js, "start", *dbq->last_created_index + 1); + json_add_u64(req->js, "limit", LIMIT_PER_LIST); return send_outreq(req); } @@ -1496,29 +1596,30 @@ struct refresh_funcs { struct command_result *(*refresh)(struct command *cmd, const struct table_desc *td, struct db_query *dbq); + const char *waitname; }; static const struct refresh_funcs refresh_funcs[] = { /* These are special, using gossmap */ - { "listchannels", channels_refresh }, - { "listnodes", nodes_refresh }, - /* FIXME: These support wait and full pagination */ - { "listhtlcs", default_refresh }, - { "listforwards", default_refresh }, - { "listinvoices", default_refresh }, - { "listsendpays", default_refresh }, + { "listchannels", channels_refresh, NULL }, + { "listnodes", nodes_refresh, NULL }, + /* FIXME: These support wait and full pagination, but we need to watch for deletes, too! 
*/ + { "listhtlcs", default_refresh, NULL }, + { "listforwards", default_refresh, NULL }, + { "listinvoices", default_refresh, NULL }, + { "listsendpays", default_refresh, NULL }, /* These are never changed or deleted */ - { "listchainmoves", refresh_by_created_index }, - { "listchannelmoves", refresh_by_created_index }, + { "listchainmoves", refresh_by_created_index, "chainmoves" }, + { "listchannelmoves", refresh_by_created_index, "channelmoves" }, /* No pagination support */ - { "listoffers", default_refresh }, - { "listpeers", default_refresh }, - { "listpeerchannels", default_refresh }, - { "listclosedchannels", default_refresh }, - { "listtransactions", default_refresh }, - { "bkpr-listaccountevents", default_refresh }, - { "bkpr-listincome", default_refresh }, - { "listnetworkevents", default_refresh }, + { "listoffers", default_refresh, NULL }, + { "listpeers", default_refresh, NULL }, + { "listpeerchannels", default_refresh, NULL }, + { "listclosedchannels", default_refresh, NULL }, + { "listtransactions", default_refresh, NULL }, + { "bkpr-listaccountevents", default_refresh, NULL }, + { "bkpr-listincome", default_refresh, NULL }, + { "listnetworkevents", default_refresh, NULL }, }; static const struct refresh_funcs *find_command_refresh(const char *cmdname) @@ -1531,6 +1632,7 @@ static const struct refresh_funcs *find_command_refresh(const char *cmdname) } static struct table_desc *new_table_desc(const tal_t *ctx, + tablemap *tablemap, struct table_desc *parent, const jsmntok_t *cmd, const jsmntok_t *arrname, @@ -1553,6 +1655,7 @@ static struct table_desc *new_table_desc(const tal_t *ctx, td->columns = tal_arr(td, struct column *, 0); td->last_created_index = 0; td->has_created_index = false; + td->needs_refresh = true; td->refreshing = false; td->indices_created = false; list_head_init(&td->refresh_waiters); @@ -1561,17 +1664,19 @@ static struct table_desc *new_table_desc(const tal_t *ctx, if (!parent) { refresh_func = find_command_refresh(td->cmdname); 
td->refresh = refresh_func->refresh; + td->waitname = refresh_func->waitname; } /* sub-objects are a JSON thing, not a real table! */ if (!td->is_subobject) - strmap_add(&tablemap, td->name, td); + strmap_add(tablemap, td->name, td); return td; } /* Recursion */ -static void add_table_object(struct table_desc *td, const jsmntok_t *tok); +static void add_table_object(tablemap *tablemap, + struct table_desc *td, const jsmntok_t *tok); /* Simple case for arrays of a simple type. */ static void add_table_singleton(struct table_desc *td, @@ -1618,7 +1723,8 @@ static bool add_deprecated(const char *buffer, const jsmntok_t *tok, return true; } -static void add_table_properties(struct table_desc *td, +static void add_table_properties(tablemap *tablemap, + struct table_desc *td, const jsmntok_t *properties) { const jsmntok_t *t; @@ -1650,15 +1756,15 @@ static void add_table_properties(struct table_desc *td, items = json_get_member(schemas, t+1, "items"); type = json_get_member(schemas, items, "type"); - col->sub = new_table_desc(col, td, t, t, false); + col->sub = new_table_desc(col, tablemap, td, t, t, false); /* Array of primitives? 
Treat as single-entry obj */ if (!json_tok_streq(schemas, type, "object")) add_table_singleton(col->sub, t, items); else - add_table_object(col->sub, items); + add_table_object(tablemap, col->sub, items); } else if (json_tok_streq(schemas, type, "object")) { - col->sub = new_table_desc(col, td, t, t, true); - add_table_object(col->sub, t+1); + col->sub = new_table_desc(col, tablemap, td, t, t, true); + add_table_object(tablemap, col->sub, t+1); } else { col->ftype = find_fieldtype(type); col->sub = NULL; @@ -1675,7 +1781,8 @@ static void add_table_properties(struct table_desc *td, } /* tok is the JSON schema member for an object */ -static void add_table_object(struct table_desc *td, const jsmntok_t *tok) +static void add_table_object(tablemap *tablemap, + struct table_desc *td, const jsmntok_t *tok) { const jsmntok_t *t, *properties, *allof, *cond; size_t i; @@ -1683,24 +1790,24 @@ static void add_table_object(struct table_desc *td, const jsmntok_t *tok) /* This might not exist inside allOf, for example */ properties = json_get_member(schemas, tok, "properties"); if (properties) - add_table_properties(td, properties); + add_table_properties(tablemap, td, properties); allof = json_get_member(schemas, tok, "allOf"); if (allof) { json_for_each_arr(i, t, allof) - add_table_object(td, t); + add_table_object(tablemap, td, t); } /* We often find interesting things in then and else branches! 
*/ cond = json_get_member(schemas, tok, "then"); if (cond) - add_table_object(td, cond); + add_table_object(tablemap, td, cond); cond = json_get_member(schemas, tok, "else"); if (cond) - add_table_object(td, cond); + add_table_object(tablemap, td, cond); } /* plugin is NULL if we're just doing --print-docs */ -static void init_tablemap(struct plugin *plugin) +static void init_tablemap(struct plugin *plugin, tablemap *tablemap) { const jsmntok_t *toks, *t; const tal_t *ctx; @@ -1711,7 +1818,7 @@ static void init_tablemap(struct plugin *plugin) else ctx = tmpctx; - strmap_init(&tablemap); + strmap_init(tablemap); toks = json_parse_simple(tmpctx, schemas, strlen(schemas)); json_for_each_obj(i, t, toks) { @@ -1726,8 +1833,8 @@ static void init_tablemap(struct plugin *plugin) type = json_get_member(schemas, items, "type"); assert(json_tok_streq(schemas, type, "object")); - td = new_table_desc(ctx, NULL, t, cmd, false); - add_table_object(td, items); + td = new_table_desc(ctx, tablemap, NULL, t, cmd, false); + add_table_object(tablemap, td, items); td->has_created_index = find_column(td, "created_index"); if (plugin) @@ -1737,16 +1844,17 @@ static void init_tablemap(struct plugin *plugin) static void memleak_mark_tablemap(struct plugin *p, struct htable *memtable) { - memleak_ptr(memtable, dbfilename); - memleak_scan_strmap(memtable, &tablemap); + struct sql *sql = sql_of(p); + memleak_scan_strmap(memtable, &sql->tablemap); } static const char *init(struct command *init_cmd, const char *buf UNUSED, const jsmntok_t *config UNUSED) { struct plugin *plugin = init_cmd->plugin; - db = sqlite_setup(plugin); - init_tablemap(plugin); + struct sql *sql = sql_of(plugin); + sql->db = sqlite_setup(plugin); + init_tablemap(plugin, &sql->tablemap); plugin_set_memleak_handler(plugin, memleak_mark_tablemap); return NULL; @@ -1861,23 +1969,32 @@ static bool print_one_table(const char *member, int main(int argc, char *argv[]) { + struct sql *sql; setup_locale(); if (argc == 2 && 
streq(argv[1], "--print-docs")) { + tablemap tablemap; common_setup(argv[0]); + /* plugin is NULL, so just sets up tables */ - init_tablemap(NULL); + init_tablemap(NULL, &tablemap); printf("The following tables are currently supported:\n"); strmap_iterate(&tablemap, print_one_table, NULL); common_shutdown(); return 0; } - plugin_main(argv, init, NULL, PLUGIN_RESTARTABLE, true, NULL, commands, ARRAY_SIZE(commands), + + sql = tal(NULL, struct sql); + sql->dbfilename = NULL; + sql->gosstore_fd = -1; + sql->gosstore_nodes_off = sql->gosstore_channels_off = 0; + sql->next_rowid = 1; + plugin_main(argv, init, take(sql), PLUGIN_RESTARTABLE, true, NULL, commands, ARRAY_SIZE(commands), NULL, 0, NULL, 0, NULL, 0, plugin_option_dev("dev-sqlfilename", "string", "Use on-disk sqlite3 file instead of in memory (e.g. debugging)", - charp_option, NULL, &dbfilename), + charp_option, NULL, &sql->dbfilename), NULL); } diff --git a/plugins/xpay/xpay.c b/plugins/xpay/xpay.c index 78bb17cda43e..70001888d69c 100644 --- a/plugins/xpay/xpay.c +++ b/plugins/xpay/xpay.c @@ -2500,10 +2500,13 @@ static struct command_result *handle_rpc_command(struct command *cmd, return command_hook_success(cmd); } +static const char *cmd_hook_filters[] = {"pay"}; static const struct plugin_hook hooks[] = { { - "rpc_command", - handle_rpc_command, + .name = "rpc_command", + .handle = handle_rpc_command, + .strfilters = cmd_hook_filters, + .num_strfilters = ARRAY_SIZE(cmd_hook_filters), }, }; diff --git a/pyproject.toml b/pyproject.toml index aa34f7cca33d..4003937614da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ dev = [ "flaky>=3.7.0", "requests>=2.32.0", "flask-socketio>=5", + "tqdm", + "pytest-benchmark", ] [project.optional-dependencies] diff --git a/tests/benchmark.py b/tests/benchmark.py index 030cb85781de..ec0062e82f7a 100644 --- a/tests/benchmark.py +++ b/tests/benchmark.py @@ -1,17 +1,41 @@ from concurrent import futures from fixtures import * # noqa: F401,F403 -from time import 
time from tqdm import tqdm +from utils import (wait_for, TIMEOUT) +import os import pytest import random +import statistics +import threading +import time num_workers = 480 num_payments = 10000 +def get_bench_node(node_factory, extra_options={}): + """Get a node which is optimized for benchmarking""" + options = extra_options.copy() + # The normal log-level trace makes for a lot of IO. + options['log-level'] = 'info' + node = node_factory.get_node(start=False, options=options) + # Memleak detection here creates significant overhead! + del node.daemon.env["LIGHTNINGD_DEV_MEMLEAK"] + # Don't bother recording all our io. + del node.daemon.opts['dev-save-plugin-io'] + node.start() + return node + + +def get_bench_line_graph(node_factory, num_nodes, wait_for_announce=False): + nodes = [get_bench_node(node_factory) for _ in range(num_nodes)] + node_factory.join_nodes(nodes, wait_for_announce=wait_for_announce) + return nodes + + @pytest.fixture def executor(): ex = futures.ThreadPoolExecutor(max_workers=num_workers) @@ -20,8 +44,8 @@ def executor(): def test_single_hop(node_factory, executor): - l1 = node_factory.get_node() - l2 = node_factory.get_node() + l1 = get_bench_node(node_factory) + l2 = get_bench_node(node_factory) l1.rpc.connect(l2.rpc.getinfo()['id'], 'localhost:%d' % l2.port) l1.openchannel(l2, 4000000) @@ -53,7 +77,7 @@ def do_pay(i, s): def test_single_payment(node_factory, benchmark): - l1, l2 = node_factory.line_graph(2) + l1, l2 = get_bench_line_graph(node_factory, 2) def do_pay(l1, l2): invoice = l2.rpc.invoice(1000, 'invoice-{}'.format(random.random()), 'desc')['bolt11'] @@ -63,7 +87,7 @@ def do_pay(l1, l2): def test_forward_payment(node_factory, benchmark): - l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True) + l1, l2, l3 = get_bench_line_graph(node_factory, 3, wait_for_announce=True) def do_pay(src, dest): invoice = dest.rpc.invoice(1000, 'invoice-{}'.format(random.random()), 'desc')['bolt11'] @@ -73,7 +97,7 @@ def do_pay(src, dest): 
def test_long_forward_payment(node_factory, benchmark): - nodes = node_factory.line_graph(21, wait_for_announce=True) + nodes = get_bench_line_graph(node_factory, 21, wait_for_announce=True) def do_pay(src, dest): invoice = dest.rpc.invoice(1000, 'invoice-{}'.format(random.random()), 'desc')['bolt11'] @@ -83,7 +107,7 @@ def do_pay(src, dest): def test_invoice(node_factory, benchmark): - l1 = node_factory.get_node() + l1 = get_bench_node(node_factory) def bench_invoice(): l1.rpc.invoice(1000, 'invoice-{}'.format(time()), 'desc') @@ -92,7 +116,7 @@ def bench_invoice(): def test_pay(node_factory, benchmark): - l1, l2 = node_factory.line_graph(2) + l1, l2 = get_bench_line_graph(node_factory, 2) invoices = [] for _ in range(1, 100): @@ -107,3 +131,100 @@ def do_pay(l1, l2): def test_start(node_factory, benchmark): benchmark(node_factory.get_node) + + +def test_generate_coinmoves(node_factory, bitcoind, executor, benchmark): + l1, l2, l3 = get_bench_line_graph(node_factory, 3, wait_for_announce=True) + + # Route some payments + l1.rpc.xpay(l3.rpc.invoice(1, "test_generate_coinmoves", "test_generate_coinmoves")['bolt11']) + # Make some payments + l2.rpc.xpay(l3.rpc.invoice(1, "test_generate_coinmoves3", "test_generate_coinmoves3")['bolt11']) + # Receive some payments + l1.rpc.xpay(l2.rpc.invoice(1, "test_generate_coinmoves", "test_generate_coinmoves")['bolt11']) + wait_for(lambda: all([c['htlcs'] == [] for c in l1.rpc.listpeerchannels()['channels']])) + + l2.stop() + entries = l2.db.query('SELECT * FROM channel_moves ORDER BY id;') + assert len(entries) == 4 + next_id = entries[-1]['id'] + 1 + next_timestamp = entries[-1]['timestamp'] + 1 + + batch = [] + # Let's make 5 million entries. 
+ for _ in range(5_000_000 // len(entries)): + # Random payment_hash + entries[0]['payment_hash'] = entries[1]['payment_hash'] = random.randbytes(32) + entries[2]['payment_hash'] = random.randbytes(32) + entries[3]['payment_hash'] = random.randbytes(32) + # Incrementing timestamps + for e in entries: + e['timestamp'] = next_timestamp + next_timestamp += 1 + + for e in entries: + batch.append(( + next_id, + e['account_channel_id'], + e['account_nonchannel_id'], + e['tag_bitmap'], + e['credit_or_debit'], + e['timestamp'], + e['payment_hash'], + e['payment_part_id'], + e['payment_group_id'], + e['fees'], + )) + next_id += 1 + + l2.db.executemany("INSERT INTO channel_moves" + " (id, account_channel_id, account_nonchannel_id, tag_bitmap, credit_or_debit," + " timestamp, payment_hash, payment_part_id, payment_group_id, fees)" + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + batch) + l2.start() + + def measure_latency(node, stop_event): + latencies = [] + + while not stop_event.is_set(): + time.sleep(0.1) + + start = time.time() + node.rpc.help() + end = time.time() + + latencies.append(end - start) + + return latencies + + stopme = threading.Event() + fut = executor.submit(measure_latency, l2, stopme) + + # This makes bkpr parse it all. + benchmark(l2.rpc.bkpr_listbalances) + + stopme.set() + latencies = fut.result(TIMEOUT) + + # FIXME: Print this somewhere! 
+ benchmark.extra_info = {"title": "Latency details:", + "min": min(latencies), + "median": statistics.median(latencies), + "max": max(latencies)} + + +def test_spam_commands(node_factory, bitcoind, benchmark): + plugin = os.path.join(os.getcwd(), "tests/plugins/test_libplugin") + l1 = get_bench_node(node_factory, extra_options={"plugin": plugin}) + + # This calls "batch" 1M times (which doesn't need a transaction) + benchmark(l1.rpc.spamcommand, 1_000_000) + + +def test_spam_listcommands(node_factory, bitcoind, benchmark): + plugin = os.path.join(os.getcwd(), "tests/plugins/test_libplugin") + l1 = get_bench_node(node_factory, extra_options={"plugin": plugin}) + + # This calls "listinvoice" 100,000 times (which doesn't need a transaction commit) + benchmark(l1.rpc.spamlistcommand, 100_000) diff --git a/tests/plugins/custommsg_b.py b/tests/plugins/custommsg_b.py index 6282701f4e9f..63ddcaf8d4bb 100755 --- a/tests/plugins/custommsg_b.py +++ b/tests/plugins/custommsg_b.py @@ -4,7 +4,7 @@ plugin = Plugin() -@plugin.hook('custommsg') +@plugin.hook('custommsg', filters=[0xaaff]) def on_custommsg(peer_id, payload, plugin, message=None, **kwargs): plugin.log("Got custommessage_b {msg} from peer {peer_id}".format( msg=payload, diff --git a/tests/plugins/dep_a.py b/tests/plugins/dep_a.py index be1af29ed5b1..10b9c2e7ecb6 100755 --- a/tests/plugins/dep_a.py +++ b/tests/plugins/dep_a.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 from pyln.client import Plugin +import time + """A simple plugin that must come before dep_b. 
""" @@ -8,6 +10,7 @@ @plugin.hook('htlc_accepted', before=['dep_b.py']) def on_htlc_accepted(htlc, plugin, **kwargs): + time.sleep(1) print("htlc_accepted called") return {'result': 'continue'} diff --git a/tests/plugins/test_libplugin.c b/tests/plugins/test_libplugin.c index f73378e625a8..c51ab55be7ec 100644 --- a/tests/plugins/test_libplugin.c +++ b/tests/plugins/test_libplugin.c @@ -197,6 +197,64 @@ static struct command_result *json_checkthis(struct command *cmd, return send_outreq(req); } +static struct command_result *spam_done(struct command *cmd, void *unused) +{ + return command_success(cmd, json_out_obj(cmd, NULL, NULL)); +} + +static struct command_result *spam_errcb(struct command *cmd, + const char *method, + const char *buf, + const jsmntok_t *tok, + void *unused) +{ + plugin_err(cmd->plugin, "%.*s", + json_tok_full_len(tok), + json_tok_full(buf, tok)); +} + +static struct command_result *json_spamcommand(struct command *cmd, + const char *buf, + const jsmntok_t *params) +{ + u64 *iterations; + struct request_batch *batch; + + if (!param(cmd, buf, params, + p_req("iterations", param_u64, &iterations), + NULL)) + return command_param_failed(); + + batch = request_batch_new(cmd, NULL, spam_errcb, spam_done, NULL); + for (size_t i = 0; i < *iterations; i++) { + struct out_req *req = add_to_batch(cmd, batch, "batching"); + json_add_bool(req->js, "enable", true); + send_outreq(req); + } + return batch_done(cmd, batch); +} + +static struct command_result *json_spamlistcommand(struct command *cmd, + const char *buf, + const jsmntok_t *params) +{ + u64 *iterations; + struct request_batch *batch; + + if (!param(cmd, buf, params, + p_req("iterations", param_u64, &iterations), + NULL)) + return command_param_failed(); + + batch = request_batch_new(cmd, NULL, spam_errcb, spam_done, NULL); + for (size_t i = 0; i < *iterations; i++) { + struct out_req *req = add_to_batch(cmd, batch, "listinvoices"); + send_outreq(req); + } + return batch_done(cmd, batch); +} + + 
static char *set_dynamic(struct plugin *plugin, const char *arg, bool check_only, @@ -270,6 +328,14 @@ static const struct plugin_command commands[] = { { "checkthis", json_checkthis, }, + { + "spamcommand", + json_spamcommand, + }, + { + "spamlistcommand", + json_spamlistcommand, + }, }; static const char *before[] = { "dummy", NULL }; diff --git a/tests/test_misc.py b/tests/test_misc.py index 6adcfad86520..31a9c3c9e600 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -2903,6 +2903,12 @@ def test_sendcustommsg(node_factory): msg=msg, peer_id=l2.info['id']), ]) + # custommessage_b plugin only registers for 0xaaff msgs, so it won't see this one: + msg2 = 'aa' + ('fd' * 30) + 'bb' + l2.rpc.sendcustommsg(l4.info['id'], msg2) + l4.daemon.wait_for_log(f'Got custommessage_a {msg2} from peer') + assert not l4.daemon.is_in_log(f'Got custommessage_b {msg2} from peer') + def test_custommsg_triggers_notification(node_factory): """Check that a notification is triggered when a node receives diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 0989e89c5e22..a036ecc2c59a 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -2315,7 +2315,7 @@ def test_important_plugin(node_factory): n = node_factory.get_node(options={"important-plugin": os.path.join(pluginsdir, "nonexistent")}, may_fail=True, expect_fail=True, # Other plugins can complain as lightningd stops suddenly: - broken_log='Plugin marked as important, shutting down lightningd|Reading JSON input: Connection reset by peer|Lost connection to the RPC socket', + broken_log='Plugin marked as important, shutting down lightningd|Reading sync lightningd: Connection reset by peer|Lost connection to the RPC socket', start=False) n.daemon.start(wait_for_initialized=False, stderr_redir=True) @@ -2610,6 +2610,39 @@ def test_htlc_accepted_hook_failonion(node_factory): l1.rpc.pay(inv) +def test_hook_in_use(node_factory): + """If a hook is in use when we add a plugin to it, we have to defer""" + dep_a = 
os.path.join(os.path.dirname(__file__), 'plugins/dep_a.py') + dep_b = os.path.join(os.path.dirname(__file__), 'plugins/dep_b.py') + + l1, l2 = node_factory.line_graph(2, opts=[{}, {'plugin': [dep_a]}]) + + NUM_ITERATIONS = 10 + + invs = [l2.rpc.invoice(1, f'test_hook_in_use{i}', 'test_hook_in_use') for i in range(NUM_ITERATIONS)] + + route = [{'amount_msat': 1, + 'id': l2.info['id'], + 'delay': 20, + 'channel': l1.get_channel_scid(l2)}] + + for i in range(NUM_ITERATIONS): + l1.rpc.sendpay(route, + amount_msat=1, + payment_hash=invs[i]['payment_hash'], + payment_secret=invs[i]['payment_secret']) + if i % 2 == 1: + l1.rpc.waitsendpay(payment_hash=invs[i - 1]['payment_hash']) + l1.rpc.waitsendpay(payment_hash=invs[i]['payment_hash']) + else: + l2.rpc.plugin_start(plugin=dep_b) + l2.rpc.plugin_stop(plugin=dep_b) + + # We should have deferred hook update at least once! + l2.daemon.wait_for_log("UNUSUAL plugin-dep_b.py: Deferring registration of hook htlc_accepted until it's not in use.") + l2.daemon.wait_for_log("UNUSUAL lightningd: Updating hooks for htlc_accepted now usage is done.") + + def test_htlc_accepted_hook_fwdto(node_factory): plugin = os.path.join(os.path.dirname(__file__), 'plugins/htlc_accepted-fwdto.py') l1, l2, l3 = node_factory.line_graph(3, opts=[{}, {'plugin': plugin}, {}], wait_for_announce=True) diff --git a/tests/test_xpay.py b/tests/test_xpay.py index 087ade6bbe40..ed24a4b76909 100644 --- a/tests/test_xpay.py +++ b/tests/test_xpay.py @@ -387,8 +387,14 @@ def test_xpay_takeover(node_factory, executor): # Simple bolt11/bolt12 payment. inv = l3.rpc.invoice(100000, "test_xpay_takeover1", "test_xpay_takeover1")['bolt11'] l1.rpc.pay(inv) + l1.daemon.wait_for_log('Calling rpc_command hook of plugin cln-xpay') l1.daemon.wait_for_log('Redirecting pay->xpay') + # Quickly test that xpay does NOT receive other commands now. 
+ l1.rpc.help() + assert not l1.daemon.is_in_log('Calling rpc_command hook of plugin cln-xpay', + start=l1.daemon.logsearch_start) + # Array version inv = l3.rpc.invoice(100000, "test_xpay_takeover2", "test_xpay_takeover2")['bolt11'] subprocess.check_output(['cli/lightning-cli', diff --git a/uv.lock b/uv.lock index 0ebadbe0b8cf..e00e2b9e0bd2 100644 --- a/uv.lock +++ b/uv.lock @@ -415,11 +415,13 @@ dev = [ { name = "flaky" }, { name = "flask-socketio" }, { name = "pytest" }, + { name = "pytest-benchmark" }, { name = "pytest-custom-exit-code" }, { name = "pytest-test-groups" }, { name = "pytest-timeout" }, { name = "pytest-xdist" }, { name = "requests" }, + { name = "tqdm" }, ] [package.metadata] @@ -444,11 +446,13 @@ dev = [ { name = "flaky", specifier = ">=3.7.0" }, { name = "flask-socketio", specifier = ">=5" }, { name = "pytest", specifier = ">=8.0.0" }, + { name = "pytest-benchmark" }, { name = "pytest-custom-exit-code", specifier = "==0.3.0" }, { name = "pytest-test-groups", specifier = ">=1.2.0" }, { name = "pytest-timeout", specifier = ">=2.4.0" }, { name = "pytest-xdist", specifier = ">=3.6.0" }, { name = "requests", specifier = ">=2.32.0" }, + { name = "tqdm" }, ] [[package]] @@ -1244,6 +1248,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ad/53/73196ebc19d6fbfc22427b982fbc98698b7b9c361e5e7707e3a3247cf06d/psycopg2_binary-2.9.10-cp39-cp39-win_amd64.whl", hash = "sha256:30e34c4e97964805f715206c7b789d54a78b70f3ff19fbe590104b71c45600e5", size = 1163958, upload-time = "2024-10-16T11:24:51.882Z" }, ] +[[package]] +name = "py-cpuinfo" +version = "9.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/37/a8/d832f7293ebb21690860d2e01d8115e5ff6f2ae8bbdc953f0eb0fa4bd2c7/py-cpuinfo-9.0.0.tar.gz", hash = "sha256:3cdbbf3fac90dc6f118bfd64384f309edeadd902d7c8fb17f02ffa1fc3f49690", size = 104716, upload-time = "2022-10-25T20:38:06.303Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/e0/a9/023730ba63db1e494a271cb018dcd361bd2c917ba7004c3e49d5daf795a2/py_cpuinfo-9.0.0-py3-none-any.whl", hash = "sha256:859625bc251f64e21f077d099d4162689c762b5d6a4c3c97553d56241c9674d5", size = 22335, upload-time = "2022-10-25T20:38:27.636Z" }, +] + [[package]] name = "pycodestyle" version = "2.14.0" @@ -1506,6 +1519,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a8/a4/20da314d277121d6534b3a980b29035dcd51e6744bd79075a6ce8fa4eb8d/pytest-8.4.2-py3-none-any.whl", hash = "sha256:872f880de3fc3a5bdc88a11b39c9710c3497a547cfa9320bc3c5e62fbf272e79", size = 365750, upload-time = "2025-09-04T14:34:20.226Z" }, ] +[[package]] +name = "pytest-benchmark" +version = "5.2.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "py-cpuinfo" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/24/34/9f732b76456d64faffbef6232f1f9dbec7a7c4999ff46282fa418bd1af66/pytest_benchmark-5.2.3.tar.gz", hash = "sha256:deb7317998a23c650fd4ff76e1230066a76cb45dcece0aca5607143c619e7779", size = 341340, upload-time = "2025-11-09T18:48:43.215Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/33/29/e756e715a48959f1c0045342088d7ca9762a2f509b945f362a316e9412b7/pytest_benchmark-5.2.3-py3-none-any.whl", hash = "sha256:bc839726ad20e99aaa0d11a127445457b4219bdb9e80a1afc4b51da7f96b0803", size = 45255, upload-time = "2025-11-09T18:48:39.765Z" }, +] + [[package]] name = "pytest-custom-exit-code" version = "0.3.0" @@ -1840,6 +1866,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/6e/c2/61d3e0f47e2b74ef40a68b9e6ad5984f6241a942f7cd3bbfbdbd03861ea9/tomli-2.2.1-py3-none-any.whl", hash = "sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc", size = 14257, upload-time = "2024-11-27T22:38:35.385Z" }, ] +[[package]] +name = "tqdm" +version = "4.67.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = 
"sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a8/4b/29b4ef32e036bb34e4ab51796dd745cdba7ed47ad142a9f4a1eb8e0c744d/tqdm-4.67.1.tar.gz", hash = "sha256:f8aef9c52c08c13a65f30ea34f4e5aac3fd1a34959879d7e59e63027286627f2", size = 169737, upload-time = "2024-11-24T20:12:22.481Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl", hash = "sha256:26445eca388f82e72884e0d580d5464cd801a3ea01e63e5601bdff9ba6a48de2", size = 78540, upload-time = "2024-11-24T20:12:19.698Z" }, +] + [[package]] name = "types-protobuf" version = "6.32.1.20250918" diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index dd5856fa8707..74e02307fe54 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -604,7 +604,9 @@ bool peer_start_openingd(struct peer *peer UNNEEDED, { fprintf(stderr, "peer_start_openingd called!\n"); abort(); } /* Generated stub for plugin_hook_call_ */ bool plugin_hook_call_(struct lightningd *ld UNNEEDED, - const struct plugin_hook *hook UNNEEDED, + struct plugin_hook *hook UNNEEDED, + const char *strfilterfield TAKES UNNEEDED, + u64 intfilterfield UNNEEDED, const char *cmd_id TAKES UNNEEDED, tal_t *cb_arg STEALS UNNEEDED) { fprintf(stderr, "plugin_hook_call_ called!\n"); abort(); }