Skip to content

Commit

Permalink
[AUD-223] Set a TTL for latest block cache in redis (#1269)
Browse files Browse the repository at this point in the history
* Set a TTL for latest block in redis

* Move this variable definition

* Rename env var to include units with _sec

* Refactor this logic into itss own function

* lint

* lint

* Cleanup
  • Loading branch information
dmanjunath committed Mar 11, 2021
1 parent a440004 commit ba56bec
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 24 deletions.
1 change: 1 addition & 0 deletions discovery-provider/default_config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ loglevel_flask = DEBUG
; set log level via command line in docker yml files instead
; loglevel_celery = INFO
block_processing_window = 20
block_processing_interval_sec = 5
blacklist_block_processing_window = 600
blacklist_block_indexing_interval = 60
peer_refresh_interval = 3000
Expand Down
4 changes: 3 additions & 1 deletion discovery-provider/src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ def configure_celery(flask_app, celery, test_config=None):
database_url = test_config["db"]["url"]

ipld_interval = int(shared_config["discprov"]["blacklist_block_indexing_interval"])
# default is 5 seconds
indexing_interval_sec = int(shared_config["discprov"]["block_processing_interval_sec"])

# Update celery configuration
celery.conf.update(
Expand All @@ -312,7 +314,7 @@ def configure_celery(flask_app, celery, test_config=None):
beat_schedule={
"update_discovery_provider": {
"task": "update_discovery_provider",
"schedule": timedelta(seconds=5),
"schedule": timedelta(seconds=indexing_interval_sec),
},
"update_ipld_blacklist": {
"task": "update_ipld_blacklist",
Expand Down
8 changes: 5 additions & 3 deletions discovery-provider/src/api_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
from flask import jsonify

# pylint: disable=R0401
from src.utils import helpers
from src.utils import helpers, web3_provider
from src.utils.config import shared_config
from src.utils.redis_constants import latest_block_redis_key, most_recent_indexed_block_redis_key
from src.utils.redis_constants import most_recent_indexed_block_redis_key
from src.queries.get_health import get_latest_chain_block_set_if_nx

redis_url = shared_config["redis"]["url"]
redis = redis.Redis.from_url(url=redis_url)
web3_connection = web3_provider.get_web3()
logger = logging.getLogger(__name__)
disc_prov_version = helpers.get_discovery_provider_version()

Expand Down Expand Up @@ -50,7 +52,7 @@ def response_dict_with_metadata(response_dictionary, sign_response):
response_dictionary['success'] = True

latest_indexed_block = redis.get(most_recent_indexed_block_redis_key)
latest_chain_block = redis.get(latest_block_redis_key)
latest_chain_block, _ = get_latest_chain_block_set_if_nx(redis, web3_connection)

response_dictionary['latest_indexed_block'] = (int(latest_indexed_block) if latest_indexed_block else None)
response_dictionary['latest_chain_block'] = (int(latest_chain_block) if latest_chain_block else None)
Expand Down
81 changes: 63 additions & 18 deletions discovery-provider/src/queries/get_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

default_healthy_block_diff = int(
shared_config["discprov"]["healthy_block_diff"])
default_indexing_interval_seconds = int(
shared_config["discprov"]["block_processing_interval_sec"])

# Returns DB block state & diff
def _get_db_block_state():
Expand Down Expand Up @@ -110,27 +112,14 @@ def get_health(args, use_redis_cache=True):

latest_block_num = None
latest_block_hash = None

# Get latest web block info
if use_redis_cache:
stored_latest_block_num = redis.get(latest_block_redis_key)
if stored_latest_block_num is not None:
latest_block_num = int(stored_latest_block_num)

stored_latest_blockhash = redis.get(latest_block_hash_redis_key)
if stored_latest_blockhash is not None:
latest_block_hash = stored_latest_blockhash.decode("utf-8")

if latest_block_num is None or latest_block_hash is None:
latest_block = web3.eth.getBlock("latest", True)
latest_block_num = latest_block.number
latest_block_hash = latest_block.hash.hex()

latest_indexed_block_num = None
latest_indexed_block_hash = None

# Get latest indexed block info
if use_redis_cache:
# get latest blockchain state from redis cache, or fallback to chain if None
latest_block_num, latest_block_hash = get_latest_chain_block_set_if_nx(redis, web3)

# get latest db state from redis cache
latest_indexed_block_num = redis.get(
most_recent_indexed_block_redis_key)
if latest_indexed_block_num is not None:
Expand All @@ -142,7 +131,19 @@ def get_health(args, use_redis_cache=True):
latest_indexed_block_hash = latest_indexed_block_hash.decode(
"utf-8")

if latest_indexed_block_num is None or latest_indexed_block_hash is None:
# fetch latest blockchain state from web3 if:
# we explicitly don't want to use redis cache or
# value from redis cache is None
if not use_redis_cache or latest_block_num is None or latest_block_hash is None:
# get latest blockchain state from web3
latest_block = web3.eth.getBlock("latest", True)
latest_block_num = latest_block.number
latest_block_hash = latest_block.hash.hex()

# fetch latest db state if:
# we explicitly don't want to use redis cache or
# value from redis cache is None
if not use_redis_cache or latest_indexed_block_num is None or latest_indexed_block_hash is None:
db_block_state = _get_db_block_state()
latest_indexed_block_num = db_block_state["number"] or 0
latest_indexed_block_hash = db_block_state["blockhash"]
Expand Down Expand Up @@ -192,3 +193,47 @@ def get_health(args, use_redis_cache=True):
return health_results, True

return health_results, False

def get_latest_chain_block_set_if_nx(redis=None, web3=None):
"""
Retrieves the latest block number and blockhash from redis if the keys exist.
Otherwise it sets these values in redis by querying web3 and returns them
:param redis: redis connection
:param web3: web3 connection
:rtype (int, string)
"""

latest_block_num = None
latest_block_hash = None

if redis is None or web3 is None:
raise Exception(f"Invalid arguments for get_latest_chain_block_set_if_nx")

# also check for 'eth' attribute in web3 which means it's initialized and connected to a provider
if not hasattr(web3, 'eth'):
raise Exception(f"Invalid web3 argument for get_latest_chain_block_set_if_nx, web3 is not initialized")

stored_latest_block_num = redis.get(latest_block_redis_key)
if stored_latest_block_num is not None:
latest_block_num = int(stored_latest_block_num)

stored_latest_blockhash = redis.get(latest_block_hash_redis_key)
if stored_latest_blockhash is not None:
latest_block_hash = stored_latest_blockhash.decode("utf-8")

if latest_block_num is None or latest_block_hash is None:
latest_block = web3.eth.getBlock("latest", True)
latest_block_num = latest_block.number
latest_block_hash = latest_block.hash.hex()

# if we had attempted to use redis cache and the values weren't there, set the values now
try:
# ex sets expiration time and nx only sets if key doesn't exist in redis
redis.set(latest_block_redis_key, latest_block_num, ex=default_indexing_interval_seconds, nx=True)
redis.set(latest_block_hash_redis_key, latest_block_hash, ex=default_indexing_interval_seconds, nx=True)
except Exception as e:
logger.error(f"Could not set values in redis for get_latest_chain_block_set_if_nx: {e}")

return latest_block_num, latest_block_hash
6 changes: 4 additions & 2 deletions discovery-provider/src/tasks/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ def get_latest_block(db):

def update_latest_block_redis():
latest_block_from_chain = update_task.web3.eth.getBlock('latest', True)
default_indexing_interval_seconds = int(update_task.shared_config["discprov"]["block_processing_interval_sec"])
redis = update_task.redis
redis.set(latest_block_redis_key, latest_block_from_chain.number)
redis.set(latest_block_hash_redis_key, latest_block_from_chain.hash.hex())
# these keys have a TTL which is the indexing interval
redis.set(latest_block_redis_key, latest_block_from_chain.number, ex=default_indexing_interval_seconds)
redis.set(latest_block_hash_redis_key, latest_block_from_chain.hash.hex(), ex=default_indexing_interval_seconds)

def fetch_tx_receipt(transaction):
web3 = update_task.web3
Expand Down

0 comments on commit ba56bec

Please sign in to comment.