diff --git a/src/backend/kubernetes_backend.py b/src/backend/kubernetes_backend.py index c10f2dc7b..8b998c1ad 100644 --- a/src/backend/kubernetes_backend.py +++ b/src/backend/kubernetes_backend.py @@ -265,6 +265,12 @@ def ln_cli(self, tank: Tank, command: list[str]): self.log.debug(f"Running lncli {cmd=:} on {tank.index=:}") return self.exec_run(tank.index, ServiceType.LIGHTNING, cmd) + def ln_pub_key(self, tank) -> str: + if tank.lnnode is None: + raise Exception("No LN node configured for tank") + self.log.debug(f"Getting pub key for tank {tank.index}") + return tank.lnnode.get_pub_key() + def get_bitcoin_cli(self, tank: Tank, method: str, params=None): if params: cmd = f"bitcoin-cli -regtest -rpcuser={tank.rpc_user} -rpcport={tank.rpc_port} -rpcpassword={tank.rpc_password} {method} {' '.join(map(str, params))}" @@ -468,14 +474,21 @@ def remove_prometheus_service_monitors(self, tanks): def get_lnnode_hostname(self, index: int) -> str: return f"{self.get_service_name(index, ServiceType.LIGHTNING)}.{self.namespace}" - def create_lnd_container( - self, tank, bitcoind_service_name, volume_mounts - ) -> client.V1Container: + def create_ln_container(self, tank, bitcoind_service_name, volume_mounts) -> client.V1Container: # These args are appended to the Dockerfile `ENTRYPOINT ["lnd"]` bitcoind_rpc_host = f"{bitcoind_service_name}.{self.namespace}" lightning_dns = self.get_lnnode_hostname(tank.index) args = tank.lnnode.get_conf(lightning_dns, bitcoind_rpc_host) self.log.debug(f"Creating lightning container for tank {tank.index} using {args=:}") + lightning_ready_probe = "" + if tank.lnnode.impl == "lnd": + lightning_ready_probe = "lncli --network=regtest getinfo" + elif tank.lnnode.impl == "cln": + lightning_ready_probe = "lightning-cli --network=regtest getinfo" + else: + raise Exception( + f"Lightning node implementation {tank.lnnode.impl} for tank {tank.index} not supported" + ) lightning_container = client.V1Container( name=LN_CONTAINER_NAME, image=tank.lnnode.image, @@ -486,12 +499,10 @@ def create_lnd_container( readiness_probe=client.V1Probe( failure_threshold=1, success_threshold=3, - initial_delay_seconds=1, + initial_delay_seconds=10, period_seconds=2, timeout_seconds=2, - _exec=client.V1ExecAction( - command=["/bin/sh", "-c", "lncli --network=regtest getinfo"] - ), + _exec=client.V1ExecAction(command=["/bin/sh", "-c", lightning_ready_probe]), ), security_context=client.V1SecurityContext( privileged=True, @@ -681,7 +692,7 @@ def deploy_pods(self, warnet): raise e self.client.create_namespaced_service(namespace=self.namespace, body=bitcoind_service) - # Create and deploy LND pod + # Create and deploy a lightning pod if tank.lnnode: conts = [] vols = [] @@ -700,9 +711,9 @@ def deploy_pods(self, warnet): ) # Add circuit breaker container conts.append(self.create_circuitbreaker_container(tank, volume_mounts)) - # Add lnd container + # Add lightning container conts.append( - self.create_lnd_container(tank, bitcoind_service.metadata.name, volume_mounts) + self.create_ln_container(tank, bitcoind_service.metadata.name, volume_mounts) ) # Put it all together in a pod lnd_pod = self.create_pod_object( diff --git a/src/cli/main.py b/src/cli/main.py index f145d8d49..8c0a3a9e7 100644 --- a/src/cli/main.py +++ b/src/cli/main.py @@ -87,6 +87,21 @@ def lncli(node: int, command: tuple, network: str): ) +@cli.command(context_settings={"ignore_unknown_options": True}) +@click.argument("node", type=int) +@click.option("--network", default="warnet", show_default=True, type=str) +def ln_pub_key(node: int, 
network: str):
+    """
+    Get lightning node pub key on <node> in [network]
+    """
+    print(
+        rpc_call(
+            "tank_ln_pub_key",
+            {"network": network, "node": node},
+        )
+    )
+
+
 @cli.command()
 @click.argument("node", type=int, required=True)
 @click.option("--network", default="warnet", show_default=True)
diff --git a/src/scenarios/ln_init.py b/src/scenarios/ln_init.py
index ef949dac7..4bc9dc2fa 100644
--- a/src/scenarios/ln_init.py
+++ b/src/scenarios/ln_init.py
@@ -4,7 +4,6 @@
 from scenarios.utils import ensure_miner
 from warnet.test_framework_bridge import WarnetTestFramework
-from warnet.utils import channel_match


 def cli_help():
@@ -51,7 +50,7 @@ def funded_lnnodes():
             for tank in self.warnet.tanks:
                 if tank.lnnode is None:
                     continue
-                if int(tank.lnnode.get_wallet_balance()["confirmed_balance"]) < (split * 100000000):
+                if int(tank.lnnode.get_wallet_balance()) < (split * 100000000):
                     return False
             return True
@@ -117,13 +116,13 @@ def funded_lnnodes():
             self.log.info(f"Waiting for graph gossip sync, LN nodes remaining: {ln_nodes_gossip}")
             for index in ln_nodes_gossip:
                 lnnode = self.warnet.tanks[index].lnnode
-                my_edges = len(lnnode.lncli("describegraph")["edges"])
-                my_nodes = len(lnnode.lncli("describegraph")["nodes"])
-                if my_edges == len(chan_opens) and my_nodes == len(ln_nodes):
+                count_channels = len(lnnode.get_graph_channels())
+                count_graph_nodes = len(lnnode.get_graph_nodes())
+                if count_channels == len(chan_opens) and count_graph_nodes == len(ln_nodes):
                     ln_nodes_gossip.remove(index)
                 else:
                     self.log.info(
-                        f" node {index} not synced (channels: {my_edges}/{len(chan_opens)}, nodes: {my_nodes}/{len(ln_nodes)})"
+                        f" node {index} not synced (channels: {count_channels}/{len(chan_opens)}, nodes: {count_graph_nodes}/{len(ln_nodes)})"
                     )
             sleep(5)
@@ -142,14 +141,31 @@ def funded_lnnodes():
             score = 0
             for tank_index, me in enumerate(ln_nodes):
                 you = (tank_index + 1) % len(ln_nodes)
-                my_channels = self.warnet.tanks[me].lnnode.lncli("describegraph")["edges"]
-                your_channels = self.warnet.tanks[you].lnnode.lncli("describegraph")["edges"]
+                my_channels = self.warnet.tanks[me].lnnode.get_graph_channels()
+                your_channels = self.warnet.tanks[you].lnnode.get_graph_channels()
                 match = True
-                for chan_index, my_chan in enumerate(my_channels):
-                    your_chan = your_channels[chan_index]
-                    if not channel_match(my_chan, your_chan, allow_flip=False):
+                for _chan_index, my_chan in enumerate(my_channels):
+                    your_chan = [
+                        chan
+                        for chan in your_channels
+                        if chan.short_chan_id == my_chan.short_chan_id
+                    ][0]
+                    if not your_chan:
+                        print(f"Channel policy missing for channel: {my_chan.short_chan_id}")
+                        match = False
+                        break
+
+                    try:
+                        if not my_chan.channel_match(your_chan):
+                            print(
+                                f"Channel policy doesn't match between tanks {me} & {you}: {my_chan.short_chan_id}"
+                            )
+                            match = False
+                            break
+                    except Exception as e:
+                        print(f"Error comparing channel policies: {e}")
                         print(
-                            f"Channel policy doesn't match between tanks {me} & {you}: {my_chan['channel_id']}"
+                            f"Channel policy doesn't match between tanks {me} & {you}: {my_chan.short_chan_id}"
                         )
                         match = False
                         break
diff --git a/src/schema/graph_schema.json b/src/schema/graph_schema.json
index e9ddd3a39..1c8ff6cb6 100644
--- a/src/schema/graph_schema.json
+++ b/src/schema/graph_schema.json
@@ -43,7 +43,7 @@
     "comment": "A string of configure options used when building Bitcoin Core from source code, e.g. 
'--without-gui --disable-tests'"}, "ln": { "type": "string", - "comment": "Attach a lightning network node of this implementation (currently only supports 'lnd')"}, + "comment": "Attach a lightning network node of this implementation (currently only supports 'lnd' or 'cln')"}, "ln_image": { "type": "string", "comment": "Specify a lightning network node image from Dockerhub with the format repository/image:tag"}, diff --git a/src/warnet/cln.py b/src/warnet/cln.py new file mode 100644 index 000000000..caf45b2c9 --- /dev/null +++ b/src/warnet/cln.py @@ -0,0 +1,181 @@ +import io +import tarfile + +from backend.kubernetes_backend import KubernetesBackend +from warnet.services import ServiceType +from warnet.utils import exponential_backoff, generate_ipv4_addr, handle_json + +from .lnchannel import LNChannel +from .lnnode import LNNode +from .status import RunningStatus + +CLN_CONFIG_BASE = " ".join( + [ + "--network=regtest", + "--database-upgrade=true", + "--bitcoin-retry-timeout=600", + "--bind-addr=0.0.0.0:9735", + "--developer", + "--dev-fast-gossip", + "--log-level=debug", + ] +) + + +class CLNNode(LNNode): + def __init__(self, warnet, tank, backend: KubernetesBackend, options): + self.warnet = warnet + self.tank = tank + self.backend = backend + self.image = options["ln_image"] + self.cb = options["cb_image"] + self.ln_config = options["ln_config"] + self.ipv4 = generate_ipv4_addr(self.warnet.subnet) + self.rpc_port = 10009 + self.impl = "cln" + + @property + def status(self) -> RunningStatus: + return super().status + + @property + def cb_status(self) -> RunningStatus: + return super().cb_status + + def get_conf(self, ln_container_name, tank_container_name) -> str: + conf = CLN_CONFIG_BASE + conf += f" --alias={self.tank.index}" + conf += f" --grpc-port={self.rpc_port}" + conf += f" --bitcoin-rpcuser={self.tank.rpc_user}" + conf += f" --bitcoin-rpcpassword={self.tank.rpc_password}" + conf += f" --bitcoin-rpcconnect={tank_container_name}" + conf += f" --bitcoin-rpcport={self.tank.rpc_port}" + conf += f" --announce-addr=dns:{ln_container_name}:9735" + return conf + + @exponential_backoff(max_retries=20, max_delay=300) + @handle_json + def lncli(self, cmd) -> dict: + cli = "lightning-cli" + cmd = f"{cli} --network=regtest {cmd}" + return self.backend.exec_run(self.tank.index, ServiceType.LIGHTNING, cmd) + + def getnewaddress(self): + return self.lncli("newaddr")["bech32"] + + def get_pub_key(self): + res = self.lncli("getinfo") + return res["id"] + + def getURI(self): + res = self.lncli("getinfo") + if len(res["address"]) < 1: + return None + return f'{res["id"]}@{res["address"][0]["address"]}:{res["address"][0]["port"]}' + + def get_wallet_balance(self) -> int: + res = self.lncli("listfunds") + return int(sum(o["amount_msat"] for o in res["outputs"]) / 1000) + + # returns the channel point in the form txid:output_index + def open_channel_to_tank(self, index: int, channel_open_data: str) -> str: + tank = self.warnet.tanks[index] + [pubkey, host] = tank.lnnode.getURI().split("@") + res = self.lncli(f"fundchannel id={pubkey} {channel_open_data}") + return f"{res['txid']}:{res['outnum']}" + + def update_channel_policy(self, chan_point: str, policy: str) -> str: + return self.lncli(f"setchannel {chan_point} {policy}") + + def get_graph_nodes(self) -> list[str]: + return list(n["nodeid"] for n in self.lncli("listnodes")["nodes"]) + + def get_graph_channels(self) -> list[LNChannel]: + cln_channels = self.lncli("listchannels")["channels"] + # CLN lists channels twice, once for each direction. 
This finds the unique channel ids. + short_channel_ids = {chan["short_channel_id"]: chan for chan in cln_channels}.keys() + channels = [] + for short_channel_id in short_channel_ids: + node1, node2 = ( + chans for chans in cln_channels if chans["short_channel_id"] == short_channel_id + ) + channels.append(self.lnchannel_from_json(node1, node2)) + return channels + + @staticmethod + def lnchannel_from_json(node1: object, node2: object) -> LNChannel: + assert node1["short_channel_id"] == node2["short_channel_id"] + assert node1["direction"] != node2["direction"] + return LNChannel( + node1_pub=node1["source"], + node2_pub=node2["source"], + capacity_msat=node1["amount_msat"], + short_chan_id=node1["short_channel_id"], + node1_min_htlc=node1["htlc_minimum_msat"], + node2_min_htlc=node2["htlc_minimum_msat"], + node1_max_htlc=node1["htlc_maximum_msat"], + node2_max_htlc=node2["htlc_maximum_msat"], + node1_base_fee_msat=node1["base_fee_millisatoshi"], + node2_base_fee_msat=node2["base_fee_millisatoshi"], + node1_fee_rate_milli_msat=node1["fee_per_millionth"], + node2_fee_rate_milli_msat=node2["fee_per_millionth"], + ) + + def get_peers(self) -> list[str]: + return list(p["id"] for p in self.lncli("listpeers")["peers"]) + + def connect_to_tank(self, index): + return super().connect_to_tank(index) + + def generate_cli_command(self, command: list[str]): + network = f"--network={self.tank.warnet.bitcoin_network}" + cmd = f"{network} {' '.join(command)}" + cmd = f"lightning-cli {cmd}" + return cmd + + def export(self, config: object, tar_file): + # Retrieve the credentials + ca_cert = self.backend.get_file( + self.tank.index, + ServiceType.LIGHTNING, + "/root/.lightning/regtest/ca.pem", + ) + client_cert = self.backend.get_file( + self.tank.index, + ServiceType.LIGHTNING, + "/root/.lightning/regtest/client.pem", + ) + client_key = self.backend.get_file( + self.tank.index, + ServiceType.LIGHTNING, + "/root/.lightning/regtest/client-key.pem", + ) + name = f"ln-{self.tank.index}" + ca_cert_filename = f"{name}_ca_cert.pem" + client_cert_filename = f"{name}_client_cert.pem" + client_key_filename = f"{name}_client_key.pem" + host = self.backend.get_lnnode_hostname(self.tank.index) + + # Add the files to the in-memory tar archive + tarinfo1 = tarfile.TarInfo(name=ca_cert_filename) + tarinfo1.size = len(ca_cert) + fileobj1 = io.BytesIO(ca_cert) + tar_file.addfile(tarinfo=tarinfo1, fileobj=fileobj1) + tarinfo2 = tarfile.TarInfo(name=client_cert_filename) + tarinfo2.size = len(client_cert) + fileobj2 = io.BytesIO(client_cert) + tar_file.addfile(tarinfo=tarinfo2, fileobj=fileobj2) + tarinfo3 = tarfile.TarInfo(name=client_key_filename) + tarinfo3.size = len(client_key) + fileobj3 = io.BytesIO(client_key) + tar_file.addfile(tarinfo=tarinfo3, fileobj=fileobj3) + + config["nodes"].append( + { + "id": name, + "address": f"https://{host}:{self.rpc_port}", + "ca_cert": f"/simln/{ca_cert_filename}", + "client_cert": f"/simln/{client_cert_filename}", + "client_key": f"/simln/{client_key_filename}", + } + ) diff --git a/src/warnet/lnchannel.py b/src/warnet/lnchannel.py new file mode 100644 index 000000000..90518e6f9 --- /dev/null +++ b/src/warnet/lnchannel.py @@ -0,0 +1,130 @@ +import logging + + +class LNChannel: + def __init__( + self, + node1_pub: str, + node2_pub: str, + capacity_msat: int = 0, + short_chan_id: str = "", + node1_min_htlc: int = 0, + node2_min_htlc: int = 0, + node1_max_htlc: int = 0, + node2_max_htlc: int = 0, + node1_base_fee_msat: int = 0, + node2_base_fee_msat: int = 0, + 
node1_fee_rate_milli_msat: int = 0, + node2_fee_rate_milli_msat: int = 0, + node1_time_lock_delta: int = 0, + node2_time_lock_delta: int = 0, + ) -> None: + # Ensure that the node with the lower pubkey is node1 + if node1_pub > node2_pub: + node1_pub, node2_pub = node2_pub, node1_pub + node1_min_htlc, node2_min_htlc = node2_min_htlc, node1_min_htlc + node1_max_htlc, node2_max_htlc = node2_max_htlc, node1_max_htlc + node1_base_fee_msat, node2_base_fee_msat = node2_base_fee_msat, node1_base_fee_msat + node1_fee_rate_milli_msat, node2_fee_rate_milli_msat = ( + node2_fee_rate_milli_msat, + node1_fee_rate_milli_msat, + ) + node1_time_lock_delta, node2_time_lock_delta = ( + node2_time_lock_delta, + node1_time_lock_delta, + ) + self.node1_pub = node1_pub + self.node2_pub = node2_pub + self.capacity_msat = capacity_msat + self.short_chan_id = short_chan_id + self.node1_min_htlc = node1_min_htlc + self.node2_min_htlc = node2_min_htlc + self.node1_max_htlc = node1_max_htlc + self.node2_max_htlc = node2_max_htlc + self.node1_base_fee_msat = node1_base_fee_msat + self.node2_base_fee_msat = node2_base_fee_msat + self.node1_fee_rate_milli_msat = node1_fee_rate_milli_msat + self.node2_fee_rate_milli_msat = node2_fee_rate_milli_msat + self.node1_time_lock_delta = node1_time_lock_delta + self.node2_time_lock_delta = node2_time_lock_delta + self.logger = logging.getLogger("lnchan") + + def __str__(self) -> str: + return ( + f"LNChannel(short_chan_id={self.short_chan_id}, " + f"capacity_msat={self.capacity_msat}, " + f"node1_pub={self.node1_pub[:8]}..., " + f"node2_pub={self.node2_pub[:8]}..., " + f"node1_policy=(min_htlc={self.node1_min_htlc}, " + f"max_htlc={self.node1_max_htlc}, " + f"base_fee={self.node1_base_fee_msat}, " + f"fee_rate={self.node1_fee_rate_milli_msat}, " + f"time_lock_delta={self.node1_time_lock_delta}), " + f"node2_policy=(min_htlc={self.node2_min_htlc}, " + f"max_htlc={self.node2_max_htlc}, " + f"base_fee={self.node2_base_fee_msat}, " + f"fee_rate={self.node2_fee_rate_milli_msat}, " + f"time_lock_delta={self.node2_time_lock_delta}))" + ) + + # Only used to compare warnet channels imported from a mainnet source file + # because pubkeys are unpredictable and node 1/2 might be swapped + def flip(self) -> "LNChannel": + return LNChannel( + # Keep the old pubkeys so the constructor doesn't just flip it back + node1_pub=self.node1_pub, + node2_pub=self.node2_pub, + capacity_msat=self.capacity_msat, + short_chan_id=self.short_chan_id, + # Flip the policies + node1_min_htlc=self.node2_min_htlc, + node2_min_htlc=self.node1_min_htlc, + node1_max_htlc=self.node2_max_htlc, + node2_max_htlc=self.node1_max_htlc, + node1_base_fee_msat=self.node2_base_fee_msat, + node2_base_fee_msat=self.node1_base_fee_msat, + node1_fee_rate_milli_msat=self.node2_fee_rate_milli_msat, + node2_fee_rate_milli_msat=self.node1_fee_rate_milli_msat, + node1_time_lock_delta=self.node2_time_lock_delta, + node2_time_lock_delta=self.node1_time_lock_delta, + ) + + def policy_match(self, ch2: "LNChannel") -> bool: + assert isinstance(ch2, LNChannel) + + def compare_attributes(attr1, attr2, min_value=0, attr_name=""): + if attr1 == 0 or attr2 == 0: + return True + result = max(int(attr1), min_value) == max(int(attr2), min_value) + if not result: + self.logger.debug(f"Mismatch in {attr_name}: {attr1} != {attr2}") + return result + + attributes_to_compare = [ + (self.node1_time_lock_delta, ch2.node1_time_lock_delta, 18, "node1_time_lock_delta"), + (self.node2_time_lock_delta, ch2.node2_time_lock_delta, 18, "node2_time_lock_delta"), + 
(self.node1_min_htlc, ch2.node1_min_htlc, 1, "node1_min_htlc"), + (self.node2_min_htlc, ch2.node2_min_htlc, 1, "node2_min_htlc"), + (self.node1_base_fee_msat, ch2.node1_base_fee_msat, 0, "node1_base_fee_msat"), + (self.node2_base_fee_msat, ch2.node2_base_fee_msat, 0, "node2_base_fee_msat"), + ( + self.node1_fee_rate_milli_msat, + ch2.node1_fee_rate_milli_msat, + 0, + "node1_fee_rate_milli_msat", + ), + ( + self.node2_fee_rate_milli_msat, + ch2.node2_fee_rate_milli_msat, + 0, + "node2_fee_rate_milli_msat", + ), + ] + + return all(compare_attributes(*attrs) for attrs in attributes_to_compare) + + def channel_match(self, ch2: "LNChannel") -> bool: + if self.capacity_msat != ch2.capacity_msat: + self.logger.debug(f"Capacity mismatch: {self.capacity_msat} != {ch2.capacity_msat}") + return False + return self.policy_match(ch2) diff --git a/src/warnet/lnd.py b/src/warnet/lnd.py new file mode 100644 index 000000000..85869425e --- /dev/null +++ b/src/warnet/lnd.py @@ -0,0 +1,191 @@ +import io +import tarfile + +from backend.kubernetes_backend import KubernetesBackend +from warnet.services import ServiceType +from warnet.utils import exponential_backoff, generate_ipv4_addr, handle_json + +from .lnchannel import LNChannel +from .lnnode import LNNode, lnd_to_cl_scid +from .status import RunningStatus + +LND_CONFIG_BASE = " ".join( + [ + "--noseedbackup", + "--norest", + "--debuglevel=debug", + "--accept-keysend", + "--bitcoin.active", + "--bitcoin.regtest", + "--bitcoin.node=bitcoind", + "--maxpendingchannels=64", + "--trickledelay=1", + ] +) + + +class LNDNode(LNNode): + def __init__(self, warnet, tank, backend: KubernetesBackend, options): + self.warnet = warnet + self.tank = tank + self.backend = backend + self.image = options["ln_image"] + self.cb = options["cb_image"] + self.ln_config = options["ln_config"] + self.ipv4 = generate_ipv4_addr(self.warnet.subnet) + self.rpc_port = 10009 + self.impl = "lnd" + + @property + def status(self) -> RunningStatus: + return super().status + + @property + def cb_status(self) -> RunningStatus: + return super().cb_status + + def get_conf(self, ln_container_name, tank_container_name) -> str: + conf = LND_CONFIG_BASE + conf += f" --bitcoind.rpcuser={self.tank.rpc_user}" + conf += f" --bitcoind.rpcpass={self.tank.rpc_password}" + conf += f" --bitcoind.rpchost={tank_container_name}:{self.tank.rpc_port}" + conf += f" --bitcoind.zmqpubrawblock=tcp://{tank_container_name}:{self.tank.zmqblockport}" + conf += f" --bitcoind.zmqpubrawtx=tcp://{tank_container_name}:{self.tank.zmqtxport}" + conf += f" --rpclisten=0.0.0.0:{self.rpc_port}" + conf += f" --alias={self.tank.index}" + conf += f" --externalhosts={ln_container_name}" + conf += f" --tlsextradomain={ln_container_name}" + conf += " " + self.ln_config + return conf + + @exponential_backoff(max_retries=20, max_delay=300) + @handle_json + def lncli(self, cmd) -> dict: + cli = "lncli" + cmd = f"{cli} --network=regtest {cmd}" + return self.backend.exec_run(self.tank.index, ServiceType.LIGHTNING, cmd) + + def getnewaddress(self): + return self.lncli("newaddress p2wkh")["address"] + + def get_pub_key(self): + res = self.lncli("getinfo") + return res["identity_pubkey"] + + def getURI(self): + res = self.lncli("getinfo") + if len(res["uris"]) < 1: + return None + return res["uris"][0] + + def get_wallet_balance(self) -> int: + res = self.lncli("walletbalance")["confirmed_balance"] + return res + + # returns the channel point in the form txid:output_index + def open_channel_to_tank(self, index: int, channel_open_data: str) -> 
str: + tank = self.warnet.tanks[index] + [pubkey, host] = tank.lnnode.getURI().split("@") + txid = self.lncli(f"openchannel --node_key={pubkey} --connect={host} {channel_open_data}")[ + "funding_txid" + ] + # Why doesn't LND return the output index as well? + # Do they charge by the RPC call or something?! + pending = self.lncli("pendingchannels") + for chan in pending["pending_open_channels"]: + if txid in chan["channel"]["channel_point"]: + return chan["channel"]["channel_point"] + raise Exception(f"Opened channel with txid {txid} not found in pending channels") + + def update_channel_policy(self, chan_point: str, policy: str) -> str: + ret = self.lncli(f"updatechanpolicy --chan_point={chan_point} {policy}") + if len(ret["failed_updates"]) == 0: + return ret + else: + raise Exception(ret) + + def get_graph_nodes(self) -> list[str]: + return list(n["pub_key"] for n in self.lncli("describegraph")["nodes"]) + + def get_graph_channels(self) -> list[LNChannel]: + edges = self.lncli("describegraph")["edges"] + return [self.lnchannel_from_json(edge) for edge in edges] + + @staticmethod + def lnchannel_from_json(edge: object) -> LNChannel: + return LNChannel( + node1_pub=edge["node1_pub"], + node2_pub=edge["node2_pub"], + capacity_msat=(int(edge["capacity"]) * 1000), + short_chan_id=lnd_to_cl_scid(edge["channel_id"]), + node1_min_htlc=int(edge["node1_policy"]["min_htlc"]) if edge["node1_policy"] else 0, + node2_min_htlc=int(edge["node2_policy"]["min_htlc"]) if edge["node2_policy"] else 0, + node1_max_htlc=int(edge["node1_policy"]["max_htlc_msat"]) + if edge["node1_policy"] + else 0, + node2_max_htlc=int(edge["node2_policy"]["max_htlc_msat"]) + if edge["node2_policy"] + else 0, + node1_base_fee_msat=int(edge["node1_policy"]["fee_base_msat"]) + if edge["node1_policy"] + else 0, + node2_base_fee_msat=int(edge["node2_policy"]["fee_base_msat"]) + if edge["node2_policy"] + else 0, + node1_fee_rate_milli_msat=int(edge["node1_policy"]["fee_rate_milli_msat"]) + if edge["node1_policy"] + else 0, + node2_fee_rate_milli_msat=int(edge["node2_policy"]["fee_rate_milli_msat"]) + if edge["node2_policy"] + else 0, + node1_time_lock_delta=int(edge["node1_policy"]["time_lock_delta"]) + if edge["node1_policy"] + else 0, + node2_time_lock_delta=int(edge["node2_policy"]["time_lock_delta"]) + if edge["node2_policy"] + else 0, + ) + + def get_peers(self) -> list[str]: + return list(p["pub_key"] for p in self.lncli("listpeers")["peers"]) + + def connect_to_tank(self, index): + return super().connect_to_tank(index) + + def generate_cli_command(self, command: list[str]): + network = f"--network={self.tank.warnet.bitcoin_network}" + cmd = f"{network} {' '.join(command)}" + cmd = f"lncli {cmd}" + return cmd + + def export(self, config: object, tar_file): + # Retrieve the credentials + macaroon = self.backend.get_file( + self.tank.index, + ServiceType.LIGHTNING, + "/root/.lnd/data/chain/bitcoin/regtest/admin.macaroon", + ) + cert = self.backend.get_file(self.tank.index, ServiceType.LIGHTNING, "/root/.lnd/tls.cert") + name = f"ln-{self.tank.index}" + macaroon_filename = f"{name}_admin.macaroon" + cert_filename = f"{name}_tls.cert" + host = self.backend.get_lnnode_hostname(self.tank.index) + + # Add the files to the in-memory tar archive + tarinfo1 = tarfile.TarInfo(name=macaroon_filename) + tarinfo1.size = len(macaroon) + fileobj1 = io.BytesIO(macaroon) + tar_file.addfile(tarinfo=tarinfo1, fileobj=fileobj1) + tarinfo2 = tarfile.TarInfo(name=cert_filename) + tarinfo2.size = len(cert) + fileobj2 = io.BytesIO(cert) + 
tar_file.addfile(tarinfo=tarinfo2, fileobj=fileobj2) + + config["nodes"].append( + { + "id": name, + "address": f"https://{host}:{self.rpc_port}", + "macaroon": f"/simln/{macaroon_filename}", + "cert": f"/simln/{cert_filename}", + } + ) diff --git a/src/warnet/lnnode.py b/src/warnet/lnnode.py index ed0c86592..9d27f8138 100644 --- a/src/warnet/lnnode.py +++ b/src/warnet/lnnode.py @@ -1,38 +1,16 @@ -import io -import tarfile +from abc import ABC, abstractmethod from backend.kubernetes_backend import KubernetesBackend from warnet.services import ServiceType -from warnet.utils import exponential_backoff, generate_ipv4_addr, handle_json +from warnet.utils import exponential_backoff, handle_json from .status import RunningStatus -LND_CONFIG_BASE = " ".join( - [ - "--noseedbackup", - "--norest", - "--debuglevel=debug", - "--accept-keysend", - "--bitcoin.active", - "--bitcoin.regtest", - "--bitcoin.node=bitcoind", - "--maxpendingchannels=64", - "--trickledelay=1", - ] -) - - -class LNNode: + +class LNNode(ABC): + @abstractmethod def __init__(self, warnet, tank, backend: KubernetesBackend, options): - self.warnet = warnet - self.tank = tank - self.backend = backend - self.impl = options["impl"] - self.image = options["ln_image"] - self.cb = options["cb_image"] - self.ln_config = options["ln_config"] - self.ipv4 = generate_ipv4_addr(self.warnet.subnet) - self.rpc_port = 10009 + pass @property def status(self) -> RunningStatus: @@ -46,65 +24,52 @@ def cb_status(self) -> RunningStatus: self.tank.index, ServiceType.CIRCUITBREAKER ) + @abstractmethod def get_conf(self, ln_container_name, tank_container_name) -> str: - if self.impl == "lnd": - conf = LND_CONFIG_BASE - conf += f" --bitcoind.rpcuser={self.tank.rpc_user}" - conf += f" --bitcoind.rpcpass={self.tank.rpc_password}" - conf += f" --bitcoind.rpchost={tank_container_name}:{self.tank.rpc_port}" - conf += ( - f" --bitcoind.zmqpubrawblock=tcp://{tank_container_name}:{self.tank.zmqblockport}" - ) - conf += f" --bitcoind.zmqpubrawtx=tcp://{tank_container_name}:{self.tank.zmqtxport}" - conf += f" --rpclisten=0.0.0.0:{self.rpc_port}" - conf += f" --alias={self.tank.index}" - conf += f" --externalhosts={ln_container_name}" - conf += f" --tlsextradomain={ln_container_name}" - conf += " " + self.ln_config - return conf - return "" + pass @exponential_backoff(max_retries=20, max_delay=300) @handle_json + @abstractmethod def lncli(self, cmd) -> dict: - cmd = f"lncli --network=regtest {cmd}" - return self.backend.exec_run(self.tank.index, ServiceType.LIGHTNING, cmd) + pass + @abstractmethod def getnewaddress(self): - res = self.lncli("newaddress p2wkh") - return res["address"] + pass + + @abstractmethod + def get_pub_key(self): + pass + @abstractmethod def getURI(self): - res = self.lncli("getinfo") - if len(res["uris"]) < 1: - return None - return res["uris"][0] + pass - def get_wallet_balance(self): - res = self.lncli("walletbalance") - return res + @abstractmethod + def get_wallet_balance(self) -> int: + pass - # returns the channel point in the form txid:output_index - def open_channel_to_tank(self, index: int, policy: str) -> str: - tank = self.warnet.tanks[index] - [pubkey, host] = tank.lnnode.getURI().split("@") - txid = self.lncli(f"openchannel --node_key={pubkey} --connect={host} {policy}")[ - "funding_txid" - ] - # Why doesn't LND return the output index as well? - # Do they charge by the RPC call or something?! 
-        pending = self.lncli("pendingchannels")
-        for chan in pending["pending_open_channels"]:
-            if txid in chan["channel"]["channel_point"]:
-                return chan["channel"]["channel_point"]
-        raise Exception(f"Opened channel with txid {txid} not found in pending channels")
+    @abstractmethod
+    def open_channel_to_tank(self, index: int, channel_open_data: str) -> str:
+        """Return the channel point in the form txid:output_index"""
+        pass

+    @abstractmethod
     def update_channel_policy(self, chan_point: str, policy: str) -> str:
-        ret = self.lncli(f"updatechanpolicy --chan_point={chan_point} {policy}")
-        if len(ret["failed_updates"]) == 0:
-            return ret
-        else:
-            raise Exception(ret)
+        pass
+
+    @abstractmethod
+    def get_graph_nodes(self) -> list[str]:
+        pass
+
+    @abstractmethod
+    def get_graph_channels(self) -> list[dict]:
+        pass
+
+    @abstractmethod
+    def get_peers(self) -> list[str]:
+        pass

     def connect_to_tank(self, index):
         tank = self.warnet.tanks[index]
@@ -112,46 +77,23 @@ def connect_to_tank(self, index):
         res = self.lncli(f"connect {uri}")
         return res

+    @abstractmethod
     def generate_cli_command(self, command: list[str]):
-        network = f"--network={self.tank.warnet.bitcoin_network}"
-        cmd = f"{network} {' '.join(command)}"
-        match self.impl:
-            case "lnd":
-                cmd = f"lncli {cmd}"
-            case "cln":
-                cmd = f"lightning-cli {cmd}"
-            case _:
-                raise Exception(f"Unsupported LN implementation: {self.impl}")
-        return cmd
+        pass

+    @abstractmethod
     def export(self, config: object, tar_file):
-        # Retrieve the credentials
-        macaroon = self.backend.get_file(
-            self.tank.index,
-            ServiceType.LIGHTNING,
-            "/root/.lnd/data/chain/bitcoin/regtest/admin.macaroon",
-        )
-        cert = self.backend.get_file(self.tank.index, ServiceType.LIGHTNING, "/root/.lnd/tls.cert")
-        name = f"ln-{self.tank.index}"
-        macaroon_filename = f"{name}_admin.macaroon"
-        cert_filename = f"{name}_tls.cert"
-        host = self.backend.get_lnnode_hostname(self.tank.index)
-
-        # Add the files to the in-memory tar archive
-        tarinfo1 = tarfile.TarInfo(name=macaroon_filename)
-        tarinfo1.size = len(macaroon)
-        fileobj1 = io.BytesIO(macaroon)
-        tar_file.addfile(tarinfo=tarinfo1, fileobj=fileobj1)
-        tarinfo2 = tarfile.TarInfo(name=cert_filename)
-        tarinfo2.size = len(cert)
-        fileobj2 = io.BytesIO(cert)
-        tar_file.addfile(tarinfo=tarinfo2, fileobj=fileobj2)
-
-        config["nodes"].append(
-            {
-                "id": name,
-                "address": f"https://{host}:{self.rpc_port}",
-                "macaroon": f"/simln/{macaroon_filename}",
-                "cert": f"/simln/{cert_filename}",
-            }
-        )
+        pass
+
+
+def lnd_to_cl_scid(id) -> str:
+    s = int(id, 10)
+    block = s >> 40
+    tx = s >> 16 & 0xFFFFFF
+    output = s & 0xFFFF
+    return f"{block}x{tx}x{output}"
+
+
+def cl_to_lnd_scid(s) -> int:
+    s = [int(i) for i in s.split("x")]
+    return (s[0] << 40) | (s[1] << 16) | s[2]
diff --git a/src/warnet/server.py b/src/warnet/server.py
index e7522c9e0..5bb9f5190 100644
--- a/src/warnet/server.py
+++ b/src/warnet/server.py
@@ -142,6 +142,7 @@ def setup_rpc(self):
         self.jsonrpc.register(self.tank_lncli)
         self.jsonrpc.register(self.tank_debug_log)
         self.jsonrpc.register(self.tank_messages)
+        self.jsonrpc.register(self.tank_ln_pub_key)
         # Scenarios
         self.jsonrpc.register(self.scenarios_available)
         self.jsonrpc.register(self.scenarios_run)
@@ -208,6 +209,18 @@ def tank_lncli(self, node: int, command: list[str], network: str = "warnet") ->
         self.logger.error(msg)
         raise ServerError(message=msg) from e

+    def tank_ln_pub_key(self, node: int, network: str = "warnet") -> str:
+        """
+        Get lightning pub key on <node> in [network]
+        """
+        wn = self.get_warnet(network)
+        try:
+
return wn.container_interface.ln_pub_key(wn.tanks[node]) + except Exception as e: + msg = f"Error getting pub key: {e}" + self.logger.error(msg) + raise ServerError(message=msg) from e + def tank_debug_log(self, network: str, node: int) -> str: """ Fetch the Bitcoin Core debug log from diff --git a/src/warnet/tank.py b/src/warnet/tank.py index 0b64142ad..64b73e1de 100644 --- a/src/warnet/tank.py +++ b/src/warnet/tank.py @@ -93,13 +93,20 @@ def parse_graph_node(self, node): if "ln" in node: options = { "impl": node["ln"], - "ln_image": node.get("ln_image", "lightninglabs/lnd:v0.18.0-beta"), "cb_image": node.get("ln_cb_image", None), "ln_config": node.get("ln_config", ""), } - from warnet.lnnode import LNNode - - self.lnnode = LNNode(self.warnet, self, self.warnet.container_interface, options) + from warnet.cln import CLNNode + from warnet.lnd import LNDNode + + if options["impl"] == "lnd": + options["ln_image"] = node.get("ln_image", "lightninglabs/lnd:v0.18.0-beta") + self.lnnode = LNDNode(self.warnet, self, self.warnet.container_interface, options) + elif options["impl"] == "cln": + options["ln_image"] = node.get("ln_image", "elementsproject/lightningd:v23.11") + self.lnnode = CLNNode(self.warnet, self, self.warnet.container_interface, options) + else: + raise Exception(f"Unsupported Lightning Network implementation: {options['impl']}") logger.debug( f"Parsed graph node: {self.index} with attributes: {[f'{key}={value}' for key, value in graph_properties.items()]}" diff --git a/src/warnet/utils.py b/src/warnet/utils.py index aba6b0bd7..f0be46eeb 100644 --- a/src/warnet/utils.py +++ b/src/warnet/utils.py @@ -478,29 +478,3 @@ def validate_graph_schema(graph: nx.Graph): validate(instance=graph.nodes[n], schema=graph_schema["node"]) for e in list(graph.edges): validate(instance=graph.edges[e], schema=graph_schema["edge"]) - - -def policy_match(pol1, pol2): - return ( - max(int(pol1["time_lock_delta"]), 18) == max(int(pol2["time_lock_delta"]), 18) - and max(int(pol1["min_htlc"]), 1) == max(int(pol2["min_htlc"]), 1) - and pol1["fee_base_msat"] == pol2["fee_base_msat"] - and pol1["fee_rate_milli_msat"] == pol2["fee_rate_milli_msat"] - # Ignoring this for now since we use capacity/2 - # and pol1["max_htlc_msat"] == pol2["max_htlc_msat"] - ) - - -def channel_match(ch1, ch2, allow_flip=False): - if ch1["capacity"] != ch2["capacity"]: - return False - if policy_match(ch1["node1_policy"], ch2["node1_policy"]) and policy_match( - ch1["node2_policy"], ch2["node2_policy"] - ): - return True - if not allow_flip: - return False - else: - return policy_match(ch1["node1_policy"], ch2["node2_policy"]) and policy_match( - ch1["node2_policy"], ch2["node1_policy"] - ) diff --git a/test/data/ln.graphml b/test/data/ln.graphml index 310bfa651..2eb054039 100644 --- a/test/data/ln.graphml +++ b/test/data/ln.graphml @@ -38,6 +38,12 @@ --bitcoin.timelockdelta=33 + 27.0 + -uacomment=w2 + cln + --cltv-delta=33 + + 27.0 -uacomment=w3 @@ -45,15 +51,20 @@ - + + - + --local_amt=100000 --base_fee_msat=2200 --fee_rate_ppm=13 --time_lock_delta=20 - + --local_amt=100000 --push_amt=50000 --base_fee_msat=5500 --fee_rate_ppm=3 --time_lock_delta=40 + + amount=100000 push_msat=50000000 + feebase=5500 feeppm=3 + - + \ No newline at end of file diff --git a/test/graph_test.py b/test/graph_test.py index 3119963e8..458e29b82 100755 --- a/test/graph_test.py +++ b/test/graph_test.py @@ -7,81 +7,108 @@ from pathlib import Path from test_base import TestBase -from warnet.utils import DEFAULT_TAG, channel_match - -graph_file_path = 
Path(os.path.dirname(__file__)) / "data" / "services.graphml" -json_file_path = Path(os.path.dirname(__file__)) / "data" / "LN_10.json" -NUM_IMPORTED_NODES = 10 - -base = TestBase() - -# Does not require a running Warnet RPC server yet -test_dir = tempfile.TemporaryDirectory() -tf_create = f"{test_dir.name}/{str(uuid.uuid4())}.graphml" -tf_import = f"{test_dir.name}/{str(uuid.uuid4())}.graphml" - -print(f"\nCLI tool creating test graph file: {tf_create}") -print(base.warcli(f"graph create 10 --outfile={tf_create} --version={DEFAULT_TAG}", network=False)) -base.wait_for_predicate(lambda: Path(tf_create).exists()) - -print(f"\nCLI tool importing json and writing test graph file: {tf_import}") -print( - base.warcli( - f"graph import-json {json_file_path} --outfile={tf_import} --ln_image=carlakirkcohen/lnd:attackathon --cb=carlakirkcohen/circuitbreaker:attackathon-test", - network=False, - ) -) -base.wait_for_predicate(lambda: Path(tf_import).exists()) - -# Validate the graph schema -assert "invalid" not in base.warcli(f"graph validate {Path(tf_create)}", False) -assert "invalid" not in base.warcli(f"graph validate {Path(tf_import)}", False) -assert "invalid" not in base.warcli(f"graph validate {graph_file_path}", False) - -# Test that the graphs actually work... now we need a server -base.start_server() - - -print("\nTesting graph with optional services...") -print(base.warcli(f"network start {graph_file_path}")) -base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() -base.warcli("rpc 0 getblockcount") - -print("\nChecking services...") -base.warcli("network down") -base.wait_for_all_tanks_status(target="stopped") - -print("\nTesting created graph...") -print(base.warcli(f"network start {Path(tf_create)} --force")) -base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() -base.warcli("rpc 0 getblockcount") -base.warcli("network down") -base.wait_for_all_tanks_status(target="stopped") - -print("\nTesting imported graph...") -print(base.warcli(f"network start {Path(tf_import)} --force")) -base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() -base.warcli("rpc 0 getblockcount") -base.warcli("scenarios run ln_init") -base.wait_for_all_scenarios() - -print("Ensuring warnet LN channel policies match imported JSON description") -with open(json_file_path) as file: - actual = json.loads(base.warcli("lncli 0 describegraph"))["edges"] - expected = json.loads(file.read())["edges"] - expected = sorted(expected, key=lambda chan: int(chan["channel_id"])) - for chan_index, actual_chan in enumerate(actual): - expected_chan = expected[chan_index] - if not channel_match(actual_chan, expected_chan, allow_flip=True): - raise Exception( - f"Channel policy doesn't match source: {actual_chan['channel_id']}\n" - + "Actual:\n" - + json.dumps(actual_chan, indent=2) - + "Expected:\n" - + json.dumps(expected_chan, indent=2) +from warnet.lnd import LNDNode +from warnet.utils import DEFAULT_TAG + + +class GraphTest(TestBase): + def __init__(self): + super().__init__() + self.graph_file_path = Path(os.path.dirname(__file__)) / "data" / "services.graphml" + self.json_file_path = Path(os.path.dirname(__file__)) / "data" / "LN_10.json" + self.NUM_IMPORTED_NODES = 10 + self.test_dir = tempfile.TemporaryDirectory() + self.tf_create = f"{self.test_dir.name}/{str(uuid.uuid4())}.graphml" + self.tf_import = f"{self.test_dir.name}/{str(uuid.uuid4())}.graphml" + + def run_test(self): + self.test_graph_creation_and_import() + self.validate_graph_schema() + + 
self.start_server() + try: + self.test_graph_with_optional_services() + self.test_created_graph() + self.test_imported_graph() + finally: + self.stop_server() + + def test_graph_creation_and_import(self): + self.log.info(f"CLI tool creating test graph file: {self.tf_create}") + self.log.info( + self.warcli( + f"graph create 10 --outfile={self.tf_create} --version={DEFAULT_TAG}", network=False ) - -base.stop_server() + ) + self.wait_for_predicate(lambda: Path(self.tf_create).exists()) + + self.log.info(f"CLI tool importing json and writing test graph file: {self.tf_import}") + self.log.info( + self.warcli( + f"graph import-json {self.json_file_path} --outfile={self.tf_import} --ln_image=carlakirkcohen/lnd:attackathon --cb=carlakirkcohen/circuitbreaker:attackathon-test", + network=False, + ) + ) + self.wait_for_predicate(lambda: Path(self.tf_import).exists()) + + def validate_graph_schema(self): + self.log.info("Validating graph schema") + assert "invalid" not in self.warcli(f"graph validate {Path(self.tf_create)}", False) + assert "invalid" not in self.warcli(f"graph validate {Path(self.tf_import)}", False) + assert "invalid" not in self.warcli(f"graph validate {self.graph_file_path}", False) + + def test_graph_with_optional_services(self): + self.log.info("Testing graph with optional services...") + self.log.info(self.warcli(f"network start {self.graph_file_path}")) + self.wait_for_all_tanks_status(target="running") + self.wait_for_all_edges() + self.warcli("rpc 0 getblockcount") + + self.log.info("Checking services...") + self.warcli("network down") + self.wait_for_all_tanks_status(target="stopped") + + def test_created_graph(self): + self.log.info("Testing created graph...") + self.log.info(self.warcli(f"network start {Path(self.tf_create)} --force")) + self.wait_for_all_tanks_status(target="running") + self.wait_for_all_edges() + self.warcli("rpc 0 getblockcount") + self.warcli("network down") + self.wait_for_all_tanks_status(target="stopped") + + def test_imported_graph(self): + self.log.info("Testing imported graph...") + self.log.info(self.warcli(f"network start {Path(self.tf_import)} --force")) + self.wait_for_all_tanks_status(target="running") + self.wait_for_all_edges() + self.warcli("rpc 0 getblockcount") + self.warcli("scenarios run ln_init") + self.wait_for_all_scenarios() + + self.verify_ln_channel_policies() + + def verify_ln_channel_policies(self): + self.log.info("Ensuring warnet LN channel policies match imported JSON description") + with open(self.json_file_path) as file: + actual = json.loads(self.warcli("lncli 0 describegraph"))["edges"] + expected = json.loads(file.read())["edges"] + expected = sorted(expected, key=lambda chan: int(chan["channel_id"])) + for chan_index, actual_chan_json in enumerate(actual): + expected_chan = LNDNode.lnchannel_from_json(expected[chan_index]) + actual_chan = LNDNode.lnchannel_from_json(actual_chan_json) + if not expected_chan.channel_match(actual_chan): + self.log.info( + f"Channel {chan_index} policy mismatch, testing flipped channel: {actual_chan.short_chan_id}" + ) + if not expected_chan.channel_match(actual_chan.flip()): + raise Exception( + f"Channel policy doesn't match source: {actual_chan.short_chan_id}\n" + + f"Actual:\n{actual_chan}\n" + + f"Expected:\n{expected_chan}\n" + ) + + +if __name__ == "__main__": + test = GraphTest() + test.run_test()
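
Reviewer note: the cross-implementation channel comparison in ln_init.py and graph_test.py hinges on the new module-level helpers lnd_to_cl_scid() / cl_to_lnd_scid() added to src/warnet/lnnode.py, which give LND and CLN graph views a common short channel ID. Below is a minimal standalone sketch (not part of the diff) of that round trip; the block/tx/output values are made up for illustration.

# scid_sketch.py -- standalone sketch mirroring the SCID helpers added in src/warnet/lnnode.py.
# LND identifies a channel with a single 64-bit integer (block << 40 | tx_index << 16 | output),
# while CLN uses the "BLOCKxTXxOUTPUT" string form. The sample values below are hypothetical.


def lnd_to_cl_scid(channel_id: str) -> str:
    # Unpack LND's decimal channel_id string into CLN's short_channel_id format.
    s = int(channel_id, 10)
    block = s >> 40
    tx = s >> 16 & 0xFFFFFF
    output = s & 0xFFFF
    return f"{block}x{tx}x{output}"


def cl_to_lnd_scid(scid: str) -> int:
    # Pack CLN's "BxTxO" string back into LND's 64-bit channel id.
    block, tx, output = (int(i) for i in scid.split("x"))
    return (block << 40) | (tx << 16) | output


if __name__ == "__main__":
    # Hypothetical channel: funding tx confirmed at block 227, tx index 1, output 0.
    lnd_id = (227 << 40) | (1 << 16) | 0
    cl_id = lnd_to_cl_scid(str(lnd_id))
    assert cl_id == "227x1x0"
    assert cl_to_lnd_scid(cl_id) == lnd_id
    print(f"LND channel_id {lnd_id} <-> CLN short_channel_id {cl_id}")

Both LNDNode.get_graph_channels() and CLNNode.get_graph_channels() normalize to the CLN-style string (LND via lnd_to_cl_scid), which is what lets ln_init.py pair channels from different implementations by short_chan_id before calling LNChannel.channel_match().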