Skip to content

Commit

Permalink
Add registered discovery nodes as signers (#4562)
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacsolo committed Jan 17, 2023
1 parent 14a0529 commit b9484dd
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 10 deletions.
1 change: 1 addition & 0 deletions discovery-provider/default_config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ infra_setup =
indexing_transaction_index_sort_order_start_block =
get_users_cnode_ttl_sec = 5
enable_save_cid = false
max_signers = 0

[flask]
debug = true
Expand Down
91 changes: 82 additions & 9 deletions discovery-provider/src/tasks/index_network_peers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import logging

import requests
from src.tasks.celery_app import celery
from src.utils.eth_contracts_helpers import fetch_all_registered_content_nodes
from src.utils.get_all_other_nodes import get_all_other_nodes
from src.utils.prometheus_metric import save_duration_metric

logger = logging.getLogger(__name__)

LOCAL_RPC = "http://chain:8545"
DOUBLE_CAST_ERROR_CODE = -32603

# What is a "Peer" in this context?
# A peer represents another known entity in the network
Expand All @@ -15,15 +19,89 @@


# Query the L1 set of audius protocol contracts and retrieve a list of peer endpoints
def retrieve_peers_from_eth_contracts(self):
def index_content_node_peers(self):
cid_metadata_client = update_network_peers.cid_metadata_client
shared_config = update_network_peers.shared_config
eth_web3 = update_network_peers.eth_web3
redis = update_network_peers.redis
eth_abi_values = update_network_peers.eth_abi_values
return fetch_all_registered_content_nodes(
content_nodes = fetch_all_registered_content_nodes(
eth_web3, shared_config, redis, eth_abi_values
)

content_peers = list(content_nodes)
# Update creator node url list in CID Metadata Client
# This list of known nodes is used to traverse and retrieve metadata from gateways
cid_metadata_client.update_cnode_urls(content_peers)
logger.info(f"index_network_peers.py | All known content peers {content_nodes}")


def clique_propose(wallet: str, vote: bool):
propose_data = (
'{"method":"clique_propose","params":["'
+ wallet
+ '", '
+ str(vote).lower()
+ "]}"
)
response = requests.post(LOCAL_RPC, data=propose_data)
return response.json()


def index_discovery_node_peers(self):
shared_config = update_network_peers.shared_config
current_wallet = shared_config["delegate"]["owner_wallet"].lower()

# the maximum signers in addition to the registered static nodes
max_signers = int(shared_config["discprov"]["max_signers"])

other_wallets = set([wallet.lower() for wallet in get_all_other_nodes()[1]])
logger.info(
f"index_network_peers.py | Other registered discovery addresses: {other_wallets}"
)

# get current signers
get_signers_data = '{"method":"clique_getSigners","params":[]}'
signers_response = requests.post(LOCAL_RPC, data=get_signers_data)
signers_response_dict = signers_response.json()
current_signers = set(
[wallet.lower() for wallet in signers_response_dict["result"]]
)
logger.info(f"index_network_peers.py | Current chain signers: {current_signers}")

# only signers can propose
if current_wallet not in current_signers:
return

# propose registered nodes as signers
current_signers.remove(current_wallet)
add_wallets = sorted(list(other_wallets - current_signers))[:max_signers]
for wallet in add_wallets:
response_dict = clique_propose(wallet, True)
if (
"error" in response_dict
and response_dict["error"]["code"] != DOUBLE_CAST_ERROR_CODE
):
logger.error(
f"index_network_peers.py | Failed to add signer {wallet} with error {response_dict['error']['message']}"
)
else:
logger.info(f"index_network_peers.py | Proposed to add signer {wallet}")

# remove unregistered nodes as signers
remove_wallets = sorted(list(current_signers - other_wallets))
for wallet in remove_wallets:
response_dict = clique_propose(wallet, False)
if (
"error" in response_dict
and response_dict["error"]["code"] != DOUBLE_CAST_ERROR_CODE
):
logger.error(
f"index_network_peers.py | Failed to remove signer {wallet} with error {response_dict['error']['message']}"
)
else:
logger.info(f"index_network_peers.py | Proposed to remove signer {wallet}")


# ####### CELERY TASKS ####### #
@celery.task(name="update_network_peers", bind=True)
Expand All @@ -33,7 +111,6 @@ def update_network_peers(self):
# Details regarding custom task context can be found in wiki
# Custom Task definition can be found in src/app.py
redis = update_network_peers.redis
cid_metadata_client = update_network_peers.cid_metadata_client
# Define lock acquired boolean
have_lock = False
# Define redis lock object
Expand All @@ -43,13 +120,9 @@ def update_network_peers(self):
have_lock = update_lock.acquire(blocking=False)
if have_lock:
# An object returned from web3 chain queries
all_peers = retrieve_peers_from_eth_contracts(self)
index_content_node_peers(self)

logger.info(f"index_network_peers.py | All known peers {all_peers}")
peers_list = list(all_peers)
# Update creator node url list in CID Metadata Client
# This list of known nodes is used to traverse and retrieve metadata from gateways
cid_metadata_client.update_cnode_urls(peers_list)
index_discovery_node_peers(self)
else:
logger.info(
"index_network_peers.py | Failed to acquire update_network_peers"
Expand Down
2 changes: 1 addition & 1 deletion discovery-provider/src/utils/get_all_other_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ async def fetch_results():
wallet = node_info[3]
if wallet != shared_config["delegate"]["owner_wallet"]:
endpoint = node_info[1]
all_other_wallets.append(wallet)
if is_fqdn(endpoint):
all_other_nodes.append(endpoint)
all_other_wallets.append(wallet)
except Exception as e:
logger.error(
f"get_all_other_nodes.py | ERROR in fetching node info {node_info} generated {e}"
Expand Down

0 comments on commit b9484dd

Please sign in to comment.