Fetch cnode endpoints on IPFSClient init (#2280)
* Fetch cnode endpoints on IPFSClient init

* lint

* lint

* more lint

* alphabetical imports is dumb

* alphabetical imports is very dumb

* Final lint

* Final one

* Code review comments

* Modify FQDN to work locally + unit test
dmanjunath committed Jan 13, 2022
1 parent 26ed12c commit 982ab6d
Showing 8 changed files with 145 additions and 79 deletions.
15 changes: 11 additions & 4 deletions discovery-provider/src/app.py
@@ -507,14 +507,20 @@ def configure_celery(celery, test_config=None):
         database_url, ast.literal_eval(shared_config["db"]["engine_args_literal"])
     )
     logger.info("Database instance initialized!")
-    # Initialize IPFS client for celery task context
-    ipfs_client = IPFSClient(
-        shared_config["ipfs"]["host"], shared_config["ipfs"]["port"]
-    )
 
     # Initialize Redis connection
     redis_inst = redis.Redis.from_url(url=redis_url)
 
+    # Initialize IPFS client for celery task context
+    ipfs_client = IPFSClient(
+        shared_config["ipfs"]["host"],
+        shared_config["ipfs"]["port"],
+        eth_web3,
+        shared_config,
+        redis_inst,
+        eth_abi_values,
+    )
+
     # Clear last scanned redis block on startup
     delete_last_scanned_eth_block_redis(redis_inst)
 
@@ -547,6 +553,7 @@ def __init__(self, *args, **kwargs):
             db=db,
             web3=web3,
             abi_values=abi_values,
+            eth_abi_values=eth_abi_values,
             shared_config=shared_config,
             ipfs_client=ipfs_client,
             redis=redis_inst,
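The net effect in configure_celery is an ordering change: the Redis connection is now created before the IPFS client so its handle can be passed to the constructor. A minimal sketch of the resulting initialization order (shared_config, eth_web3, eth_abi_values, and redis_url are assumed to be in scope, as they are in app.py):

# Redis comes first: IPFSClient needs the handle to cache content node lookups
redis_inst = redis.Redis.from_url(url=redis_url)

# The client can now query the L1 registry during __init__
ipfs_client = IPFSClient(
    shared_config["ipfs"]["host"],
    shared_config["ipfs"]["port"],
    eth_web3,
    shared_config,
    redis_inst,
    eth_abi_values,
)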
6 changes: 6 additions & 0 deletions discovery-provider/src/database_task.py
@@ -10,6 +10,7 @@ def __init__(
         db=None,
         web3=None,
         abi_values=None,
+        eth_abi_values=None,
         shared_config=None,
         ipfs_client=None,
         redis=None,
@@ -20,6 +21,7 @@ def __init__(
         self._db = db
         self._web3_provider = web3
         self._abi_values = abi_values
+        self._eth_abi_values = eth_abi_values
         self._shared_config = shared_config
         self._ipfs_client = ipfs_client
         self._redis = redis
@@ -31,6 +33,10 @@ def __init__(
     def abi_values(self):
         return self._abi_values
 
+    @property
+    def eth_abi_values(self):
+        return self._eth_abi_values
+
     @property
     def web3(self):
         return self._web3_provider
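DatabaseTask stores each injected resource on a private attribute and exposes it through a read-only property, so celery tasks bound to it can reach shared state through the task object. A short sketch of the access pattern this enables, mirroring how index_network_peers.py (next file) reads it:

# Inside a task module: shared resources hang off the bound task object
eth_abi_values = update_network_peers.eth_abi_values
registry_abi = eth_abi_values["Registry"]["abi"]  # ABI layout as used in eth_contracts_helpers.py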
74 changes: 6 additions & 68 deletions discovery-provider/src/tasks/index_network_peers.py
@@ -1,18 +1,13 @@
 import concurrent.futures
 import logging
 
-from src.app import get_eth_abi_values
 from src.models import User
 from src.tasks.celery_app import celery
-from src.utils.helpers import get_ipfs_info_from_cnode_endpoint, is_fqdn
-from src.utils.redis_cache import get_pickled_key, get_sp_id_key, pickle_and_set
+from src.utils.eth_contracts_helpers import fetch_all_registered_content_nodes
+from src.utils.helpers import get_ipfs_info_from_cnode_endpoint
 
 logger = logging.getLogger(__name__)
 
-sp_factory_registry_key = bytes("ServiceProviderFactory", "utf-8")
-content_node_service_type = bytes("content-node", "utf-8")
-
-cnode_info_redis_ttl = 1800
 
 # What is a "Peer" in this context?
 # A peer represents another known entity in the network
@@ -24,72 +19,15 @@
 # concept is very much distinct.
 
 
-# Perform eth web3 call to fetch endpoint info
-def fetch_cnode_info(sp_id, sp_factory_instance):
-    redis = update_network_peers.redis
-    sp_id_key = get_sp_id_key(sp_id)
-    sp_info_cached = get_pickled_key(redis, sp_id_key)
-    if sp_info_cached:
-        logger.info(
-            f"index_network_peers.py | Found cached value for spID={sp_id} - {sp_info_cached}"
-        )
-        return sp_info_cached
-
-    cn_endpoint_info = sp_factory_instance.functions.getServiceEndpointInfo(
-        content_node_service_type, sp_id
-    ).call()
-    pickle_and_set(redis, sp_id_key, cn_endpoint_info, cnode_info_redis_ttl)
-    logger.info(
-        f"index_network_peers.py | Configured redis {sp_id_key} - {cn_endpoint_info} - TTL {cnode_info_redis_ttl}"
-    )
-    return cn_endpoint_info
-
-
 # Query the L1 set of audius protocol contracts and retrieve a list of peer endpoints
 def retrieve_peers_from_eth_contracts(self):
     shared_config = update_network_peers.shared_config
     eth_web3 = update_network_peers.eth_web3
-    eth_registry_address = update_network_peers.eth_web3.toChecksumAddress(
-        shared_config["eth_contracts"]["registry"]
-    )
-    eth_registry_instance = eth_web3.eth.contract(
-        address=eth_registry_address, abi=get_eth_abi_values()["Registry"]["abi"]
-    )
-    sp_factory_address = eth_registry_instance.functions.getContract(
-        sp_factory_registry_key
-    ).call()
-    sp_factory_inst = eth_web3.eth.contract(
-        address=sp_factory_address,
-        abi=get_eth_abi_values()["ServiceProviderFactory"]["abi"],
+    redis = update_network_peers.redis
+    eth_abi_values = update_network_peers.eth_abi_values
+    return fetch_all_registered_content_nodes(
+        eth_web3, shared_config, redis, eth_abi_values
     )
-    total_cn_type_providers = sp_factory_inst.functions.getTotalServiceTypeProviders(
-        content_node_service_type
-    ).call()
-    ids_list = list(range(1, total_cn_type_providers + 1))
-    eth_cn_endpoints_set = set()
-    # Given the total number of nodes in the network we can now fetch node info in parallel
-    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
-        fetch_cnode_futures = {
-            executor.submit(fetch_cnode_info, i, sp_factory_inst): i for i in ids_list
-        }
-        for future in concurrent.futures.as_completed(fetch_cnode_futures):
-            single_cnode_fetch_op = fetch_cnode_futures[future]
-            try:
-                cn_endpoint_info = future.result()
-                # Validate the endpoint on chain
-                # As endpoints get deregistered, this peering system must not slow down with failed connections
-                # or unanticipated load
-                eth_sp_endpoint = cn_endpoint_info[1]
-                valid_endpoint = is_fqdn(eth_sp_endpoint)
-                # Only valid FQDN strings are worth validating
-                if valid_endpoint:
-                    eth_cn_endpoints_set.add(cn_endpoint_info[1])
-            except Exception as exc:
-                logger.error(
-                    f"index_network_peers.py | ERROR in fetch_cnode_futures {single_cnode_fetch_op} generated {exc}"
-                )
-    # Return dictionary with key = endpoint, formatted as { endpoint: True }
-    return eth_cn_endpoints_set
 
 
 # Determine the known set of distinct peers currently within a user replica set
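With the contract-querying logic relocated to src/utils/eth_contracts_helpers.py, the function above collapses to a thin wrapper. Reassembled from the added lines of this hunk, the post-commit body reads:

# Query the L1 set of audius protocol contracts and retrieve a list of peer endpoints
def retrieve_peers_from_eth_contracts(self):
    shared_config = update_network_peers.shared_config
    eth_web3 = update_network_peers.eth_web3
    redis = update_network_peers.redis
    eth_abi_values = update_network_peers.eth_abi_values
    return fetch_all_registered_content_nodes(
        eth_web3, shared_config, redis, eth_abi_values
    )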
4 changes: 2 additions & 2 deletions discovery-provider/src/tasks/user_replica_set.py
@@ -4,11 +4,11 @@
 from sqlalchemy.orm.session import make_transient
 from src.app import get_contract_addresses, get_eth_abi_values
 from src.models import URSMContentNode
-from src.tasks.index_network_peers import (
+from src.tasks.users import invalidate_old_user, lookup_user_record
+from src.utils.eth_contracts_helpers import (
     content_node_service_type,
     sp_factory_registry_key,
 )
-from src.tasks.users import invalidate_old_user, lookup_user_record
 from src.utils.indexing_errors import IndexingError
 from src.utils.redis_cache import get_pickled_key, get_sp_id_key
 from src.utils.user_event_constants import (
76 changes: 76 additions & 0 deletions discovery-provider/src/utils/eth_contracts_helpers.py
@@ -0,0 +1,76 @@
+import concurrent.futures
+import logging
+
+from src.utils.helpers import is_fqdn
+from src.utils.redis_cache import get_pickled_key, get_sp_id_key, pickle_and_set
+
+logger = logging.getLogger(__name__)
+
+sp_factory_registry_key = bytes("ServiceProviderFactory", "utf-8")
+content_node_service_type = bytes("content-node", "utf-8")
+
+cnode_info_redis_ttl = 1800
+
+
+def fetch_cnode_info(sp_id, sp_factory_instance, redis):
+    sp_id_key = get_sp_id_key(sp_id)
+    sp_info_cached = get_pickled_key(redis, sp_id_key)
+    if sp_info_cached:
+        logger.info(
+            f"eth_contract_helpers.py | Found cached value for spID={sp_id} - {sp_info_cached}"
+        )
+        return sp_info_cached
+
+    cn_endpoint_info = sp_factory_instance.functions.getServiceEndpointInfo(
+        content_node_service_type, sp_id
+    ).call()
+    pickle_and_set(redis, sp_id_key, cn_endpoint_info, cnode_info_redis_ttl)
+    logger.info(
+        f"eth_contract_helpers.py | Configured redis {sp_id_key} - {cn_endpoint_info} - TTL {cnode_info_redis_ttl}"
+    )
+    return cn_endpoint_info
+
+
+def fetch_all_registered_content_nodes(
+    eth_web3, shared_config, redis, eth_abi_values
+) -> set:
+    eth_registry_address = eth_web3.toChecksumAddress(
+        shared_config["eth_contracts"]["registry"]
+    )
+    eth_registry_instance = eth_web3.eth.contract(
+        address=eth_registry_address, abi=eth_abi_values["Registry"]["abi"]
+    )
+    sp_factory_address = eth_registry_instance.functions.getContract(
+        sp_factory_registry_key
+    ).call()
+    sp_factory_inst = eth_web3.eth.contract(
+        address=sp_factory_address, abi=eth_abi_values["ServiceProviderFactory"]["abi"]
+    )
+    total_cn_type_providers = sp_factory_inst.functions.getTotalServiceTypeProviders(
+        content_node_service_type
+    ).call()
+    ids_list = list(range(1, total_cn_type_providers + 1))
+    eth_cn_endpoints_set = set()
+    # Given the total number of nodes in the network we can now fetch node info in parallel
+    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
+        fetch_cnode_futures = {
+            executor.submit(fetch_cnode_info, i, sp_factory_inst, redis): i
+            for i in ids_list
+        }
+        for future in concurrent.futures.as_completed(fetch_cnode_futures):
+            single_cnode_fetch_op = fetch_cnode_futures[future]
+            try:
+                cn_endpoint_info = future.result()
+                # Validate the endpoint on chain
+                # As endpoints get deregistered, this peering system must not slow down with failed connections
+                # or unanticipated load
+                eth_sp_endpoint = cn_endpoint_info[1]
+                valid_endpoint = is_fqdn(eth_sp_endpoint)
+                # Only valid FQDN strings are worth validating
+                if valid_endpoint:
+                    eth_cn_endpoints_set.add(eth_sp_endpoint)
+            except Exception as exc:
+                logger.error(
+                    f"eth_contract_helpers.py | ERROR in fetch_cnode_futures {single_cnode_fetch_op} generated {exc}"
+                )
+    return eth_cn_endpoints_set
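A hedged usage sketch of the new helper; the provider and redis URLs are placeholders, shared_config must supply shared_config["eth_contracts"]["registry"], and eth_abi_values must contain the "Registry" and "ServiceProviderFactory" ABIs:

import redis
from web3 import Web3

from src.utils.eth_contracts_helpers import fetch_all_registered_content_nodes

eth_web3 = Web3(Web3.HTTPProvider("https://eth-node.example.com"))  # placeholder URL
redis_inst = redis.Redis.from_url(url="redis://localhost:6379/0")   # placeholder URL

# Returns a set of endpoint strings that passed is_fqdn; per-spID lookups are
# cached in redis for cnode_info_redis_ttl (1800s) before hitting the chain again
cnode_endpoints = fetch_all_registered_content_nodes(
    eth_web3, shared_config, redis_inst, eth_abi_values
)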
2 changes: 1 addition & 1 deletion discovery-provider/src/utils/helpers.py
@@ -124,7 +124,7 @@ def bytes32_to_str(bytes32input):
 
 # Regex used to verify valid FQDN
 fqdn_regex = re.compile(
-    r"^(?:^|[ \t])((https?:\/\/)?(?:localhost|[\w-]+(?:\.[\w-]+)+)(:\d+)?(\/\S*)?)$"
+    r"^(?:^|[ \t])((https?:\/\/)?(?:localhost|(cn[0-9]_creator-node_1:[0-9]+)|[\w-]+(?:\.[\w-]+)+)(:\d+)?(\/\S*)?)$"
 )
 
 
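The regex change widens FQDN validation to accept docker-compose hostnames of the form cn<digit>_creator-node_1 plus a required port, which local development uses and which the old dotted-hostname alternative rejected (no dot). A standalone illustration against the new pattern:

import re

fqdn_regex = re.compile(
    r"^(?:^|[ \t])((https?:\/\/)?(?:localhost|(cn[0-9]_creator-node_1:[0-9]+)|[\w-]+(?:\.[\w-]+)+)(:\d+)?(\/\S*)?)$"
)

assert fqdn_regex.match("https://validurl1.domain.com")    # dotted hostnames still pass
assert fqdn_regex.match("http://cn2_creator-node_1:4001")  # local docker name now passes
assert not fqdn_regex.match("cn10_creator-node_1:4001")    # two digits: no alternative matches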
9 changes: 8 additions & 1 deletion discovery-provider/src/utils/helpers_unit_test.py
@@ -1,4 +1,4 @@
-from src.utils.helpers import create_track_slug
+from src.utils.helpers import create_track_slug, is_fqdn
 
 
 def test_create_track_slug_normal_title():
@@ -55,3 +55,10 @@ def test_create_track_slug_trailing_spaces():
     assert create_track_slug(" some track title ", 0, 0) == "some-track-title"
     assert create_track_slug("some ( original mix )", 0, 0) == "some-original-mix"
     assert create_track_slug("( some track title )", 0, 0) == "some-track-title"
+
+
+def test_is_fqdn_url():
+    assert is_fqdn("https://validurl1.domain.com") == True
+    assert is_fqdn("http://validurl2.subdomain.domain.com") == True
+    assert is_fqdn("http://cn2_creator-node_1:4001") == True
+    assert is_fqdn("http://www.example.$com\and%26here.html") == False
38 changes: 35 additions & 3 deletions discovery-provider/src/utils/ipfs_lib.py
@@ -6,6 +6,7 @@
 
 import ipfshttpclient
 import requests
+from src.utils.eth_contracts_helpers import fetch_all_registered_content_nodes
 from src.utils.helpers import get_valid_multiaddr_from_id_json
 
 logger = logging.getLogger(__name__)
@@ -15,11 +16,38 @@
 class IPFSClient:
     """Helper class for Audius Discovery Provider + IPFS interaction"""
 
-    def __init__(self, ipfs_peer_host, ipfs_peer_port):
+    def __init__(
+        self,
+        ipfs_peer_host,
+        ipfs_peer_port,
+        eth_web3=None,
+        shared_config=None,
+        redis=None,
+        eth_abi_values=None,
+    ):
         self._api = ipfshttpclient.connect(
             f"/dns/{ipfs_peer_host}/tcp/{ipfs_peer_port}/http"
         )
-        self._cnode_endpoints = []
+        logger.warning("IPFSCLIENT | initializing")
+
+        # Fetch list of registered content nodes to use during init.
+        # During indexing, if ipfs fetch fails, _cnode_endpoints and user_replica_set are empty
+        # it might fail to find content and throw an error. To prevent race conditions between
+        # indexing starting and this getting populated, run this on init in the instance
+        # in the celery worker
+        if eth_web3 and shared_config and redis and eth_abi_values:
+            self._cnode_endpoints = list(
+                fetch_all_registered_content_nodes(
+                    eth_web3, shared_config, redis, eth_abi_values
+                )
+            )
+            logger.warning(
+                f"IPFSCLIENT | fetch _cnode_endpoints on init got {self._cnode_endpoints}"
+            )
+        else:
+            self._cnode_endpoints = []
+            logger.warning("IPFSCLIENT | couldn't fetch _cnode_endpoints on init")
+
         self._ipfsid = self._api.id()
         self._multiaddr = get_valid_multiaddr_from_id_json(self._ipfsid)
 
@@ -258,7 +286,11 @@ def connect_peer(self, peer):
             logger.error(e)
 
     def update_cnode_urls(self, cnode_endpoints):
-        self._cnode_endpoints = cnode_endpoints
+        if len(cnode_endpoints):
+            logger.info(
+                f"IPFSCLIENT | update_cnode_urls with endpoints {cnode_endpoints}"
+            )
+            self._cnode_endpoints = cnode_endpoints
 
     def ipfs_id_multiaddr(self):
         return self._multiaddr
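Since the four new constructor arguments default to None and are checked with a single conjunction, both construction styles stay valid. A hedged sketch (host, port, and the surrounding objects are illustrative):

# Old-style call still works; _cnode_endpoints falls back to []
ipfs_client = IPFSClient("localhost", 5001)

# Worker-style call: all four extras present, so the registered content node
# set is fetched eagerly during __init__
ipfs_client = IPFSClient(
    "localhost", 5001, eth_web3, shared_config, redis_inst, eth_abi_values
)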
