Refresh discprov cnodes list from new identity route, to ensure newly added cnodes are integrated on registration (#963)
SidSethi committed Oct 22, 2020
1 parent 728e09d commit f7c8be7
Showing 3 changed files with 36 additions and 2 deletions.
27 changes: 26 additions & 1 deletion discovery-provider/src/tasks/index.py
@@ -1,4 +1,6 @@
from urllib.parse import urljoin
import logging
import requests

from src import contract_addresses
from src.models import Block, User, Track, Repost, Follow, Playlist, Save
@@ -13,7 +15,7 @@
    latest_block_hash_redis_key, most_recent_indexed_block_hash_redis_key, \
    most_recent_indexed_block_redis_key
from src.utils.redis_cache import remove_cached_user_ids, \
    remove_cached_track_ids, remove_cached_playlist_ids
    remove_cached_track_ids, remove_cached_playlist_ids, use_redis_cache

logger = logging.getLogger(__name__)

@@ -427,6 +429,22 @@ def revert_blocks(self, db, revert_blocks_list):

# TODO - if we enable revert, need to set the most_recent_indexed_block_redis_key key in redis

# calls GET identityservice/registered_creator_nodes to retrieve creator nodes currently registered on chain
def fetch_cnode_endpoints_from_chain(task_context):
    try:
        identity_url = task_context.shared_config['discprov']['identity_service_url']
        identity_endpoint = urljoin(identity_url, 'registered_creator_nodes')

        r = requests.get(identity_endpoint, timeout=3)
        if r.status_code != 200:
            raise Exception(f"Query to identity_endpoint failed with status code {r.status_code}")

        registered_cnodes = r.json()
        logger.info(f"Fetched registered creator nodes from chain via {identity_endpoint}")
        return registered_cnodes
    except Exception as e:
        logger.error(f"Identity fetch failed {e}")
        return []

######## IPFS PEER REFRESH ########
def refresh_peer_connections(task_context):
@@ -456,6 +474,13 @@ def refresh_peer_connections(task_context):
    user_node_url = task_context.shared_config["discprov"]["user_metadata_service_url"]
    cnode_endpoints[user_node_url] = True

    # update cnode_endpoints with list of creator nodes registered on chain, on 30s refresh interval
    registered_cnodes = use_redis_cache(
        'registered_cnodes_from_identity', 30, lambda: fetch_cnode_endpoints_from_chain(task_context)
    )
    for node_info in registered_cnodes:
        cnode_endpoints[node_info['endpoint']] = True

    # Update creator node list
    ipfs_client.update_cnode_urls(list(cnode_endpoints.keys()))

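For illustration, the refresh loop above only relies on each registered-cnode entry exposing an 'endpoint' key. The minimal sketch below uses hypothetical sample data (the 'spID' field and the URLs are made up) to show how the fetched list is folded into cnode_endpoints:

# Hypothetical sample of what the identity route might return; only the
# 'endpoint' key is actually read by refresh_peer_connections above.
registered_cnodes = [
    {'endpoint': 'https://creatornode1.example.com', 'spID': 1},
    {'endpoint': 'https://creatornode2.example.com', 'spID': 2},
]

cnode_endpoints = {}
for node_info in registered_cnodes:
    cnode_endpoints[node_info['endpoint']] = True

print(list(cnode_endpoints.keys()))
# ['https://creatornode1.example.com', 'https://creatornode2.example.com']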
2 changes: 1 addition & 1 deletion discovery-provider/src/utils/redis_cache.py
@@ -20,7 +20,7 @@ def extract_key(path, arg_items):
    return key

def use_redis_cache(key, ttl_sec, work_func):
    """Attemps to return value by key, otherwise cahces and returns `work_func`"""
    """Attemps to return value by key, otherwise caches and returns `work_func`"""
    redis = redis_connection.get_redis()
    cached_value = redis.get(key)

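The use_redis_cache helper touched here is what index.py uses above to rate-limit the identity fetch to one call per 30 seconds. A minimal read-through cache along those lines, shown as a sketch rather than the repository's exact implementation (the JSON serialization and the bare redis-py client are assumptions), would look roughly like:

import json
import redis

redis_client = redis.Redis()  # assumption: default local Redis connection

def use_redis_cache(key, ttl_sec, work_func):
    # Return the cached value for `key` if present; otherwise compute it with
    # `work_func`, store it for `ttl_sec` seconds, and return it.
    cached_value = redis_client.get(key)
    if cached_value is not None:
        return json.loads(cached_value)
    value = work_func()
    redis_client.set(key, json.dumps(value), ex=ttl_sec)
    return value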
9 changes: 9 additions & 0 deletions identity-service/src/routes/ethRelay.js
@@ -45,4 +45,13 @@ module.exports = function (app) {
    let gasInfo = await ethTxRelay.getProdGasInfo(req.app.get('redis'), req.logger)
    return successResponse(gasInfo)
  }))

  /**
   * Queries and returns all registered content nodes from chain
   */
  app.get('/registered_creator_nodes', handleResponse(async (req, res, next) => {
    const audiusLibsInstance = req.app.get('audiusLibs')
    const creatorNodes = await audiusLibsInstance.ethContracts.ServiceProviderFactoryClient.getServiceProviderList('creator-node')
    return successResponse(creatorNodes)
  }))
}
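As a quick way to exercise the new route, it can be hit over HTTP in the same way the discovery provider does in fetch_cnode_endpoints_from_chain above. The sketch below assumes a locally running identity service at http://localhost:7000 (the host and port are assumptions, not part of this commit) and mirrors the consumption pattern from the Python side:

import requests

identity_url = 'http://localhost:7000'  # assumed local identity-service URL

r = requests.get(f'{identity_url}/registered_creator_nodes', timeout=3)
r.raise_for_status()

for node_info in r.json():
    # Each entry is expected to carry an 'endpoint' key, as read by
    # refresh_peer_connections in discovery-provider/src/tasks/index.py.
    print(node_info['endpoint'])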

