[Release 0.3.39 -> Master] Ensure failure prevents progress (#1650)
* Ensure forward progress is prevented for any thrown exceptions
* Add query exposing latest plays written to solana
* Cherrypick of 355a943
hareeshnagaraj committed Jul 15, 2021
1 parent 5629dc0 commit c3bbe17
Showing 4 changed files with 97 additions and 9 deletions.
20 changes: 20 additions & 0 deletions discovery-provider/src/queries/get_sol_plays.py
@@ -23,6 +23,26 @@ def get_sol_play(sol_tx_signature):

return sol_play

# Get last x sol specific plays
def get_latest_sol_plays(limit=10):
db = get_db_read_replica()

# Cap max returned db entries
limit = min(limit, 100)

sol_plays = None
with db.scoped_session() as session:
base_query = (
session.query(Play)
.order_by(desc(Play.slot))
.filter(Play.slot != None)
.limit(limit)
)
query_results = base_query.all()
if query_results:
sol_plays = helpers.query_result_to_list(query_results)

return sol_plays

# For the n most recently listened to tracks, return the all time listen counts for those tracks
def get_track_listen_milestones(limit=100):
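The get_latest_sol_plays helper added above returns at most 100 slot-ordered play records as dictionaries. Below is a minimal usage sketch (not part of the commit) showing how a caller might read the two fields the new health check relies on, slot and created_at; any other field names would be assumptions and are not used here.

from src.queries.get_sol_plays import get_latest_sol_plays

def print_latest_sol_plays(limit=5):
    plays = get_latest_sol_plays(limit)
    if not plays:
        print("No Solana plays indexed yet")
        return
    for play in plays:
        # Each entry is a dict built by helpers.query_result_to_list
        print(f"slot={play['slot']} created_at={play['created_at']}")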
41 changes: 40 additions & 1 deletion discovery-provider/src/queries/health_check.py
@@ -4,8 +4,11 @@
from src.queries.get_latest_play import get_latest_play
from src.queries.queries import parse_bool_param
from src.queries.get_health import get_health, get_latest_ipld_indexed_block
from src.queries.get_sol_plays import get_latest_sol_plays
from src.api_helpers import success_response
from src.utils import helpers
from src.utils import helpers, redis_connection
from src.utils.redis_cache import get_pickled_key
from src.utils.redis_constants import latest_sol_play_tx_key

logger = logging.getLogger(__name__)

@@ -67,6 +70,42 @@ def play_check():
return success_response(latest_play, 500 if error else 200, sign_response=False)


# Health check for the latest sol play stored in the db
@bp.route("/sol_play_check", methods=["GET"])
def sol_play_check():
"""
limit: number of latest plays to return
max_drift: maximum duration in seconds between `now` and the
latest recorded play record to be considered healthy
"""
limit = request.args.get("limit", type=int, default=20)
max_drift = request.args.get("max_drift", type=int)
redis = redis_connection.get_redis()

latest_db_sol_plays = get_latest_sol_plays(limit)
latest_cached_sol_tx = get_pickled_key(redis, latest_sol_play_tx_key)

response = {
'chain_tx': latest_cached_sol_tx,
'db_info': latest_db_sol_plays
}

error = None

if latest_db_sol_plays:
latest_db_play = latest_db_sol_plays[0]
latest_created_at = latest_db_play['created_at']
drift = (datetime.now() - latest_created_at).total_seconds()

# Error if max drift was provided and the drift is greater than max_drift
error = max_drift and drift > max_drift

return success_response(
response,
500 if error else 200,
sign_response=False
)

@bp.route("/ipld_block_check", methods=["GET"])
def ipld_block_check():
use_redis_cache = parse_bool_param(request.args.get("use_cache"))
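A hedged client-side sketch (not part of the commit) of how a monitor could poll the new sol_play_check route; the host and the blueprint mount path are assumptions, and the response envelope produced by success_response is not inspected beyond the status code.

import requests

DISCOVERY_NODE = "https://discovery.example.com"  # hypothetical host

def is_sol_play_indexing_healthy(limit=5, max_drift=300):
    # Assumes the blueprint exposes the route at /sol_play_check
    resp = requests.get(
        f"{DISCOVERY_NODE}/sol_play_check",
        params={"limit": limit, "max_drift": max_drift},
        timeout=10,
    )
    # The route returns 500 when drift exceeds max_drift, otherwise 200
    return resp.status_code == 200, resp.json()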
44 changes: 36 additions & 8 deletions discovery-provider/src/tasks/index_solana_plays.py
@@ -8,6 +8,8 @@
from src.models import Play
from src.tasks.celery_app import celery
from src.utils.config import shared_config
from src.utils.redis_cache import pickle_and_set
from src.utils.redis_constants import latest_sol_play_tx_key

TRACK_LISTEN_PROGRAM = shared_config["solana"]["track_listen_count_address"]
SIGNER_GROUP = shared_config["solana"]["signer_group_address"]
@@ -94,7 +96,17 @@ def get_sol_tx_info(solana_client, tx_sig):
)
retries -= 1
logger.error(f"index_solana_plays.py | Retrying tx fetch: {tx_sig}")
raise Exception(f"index_solana_plays.py | Failed to fetch {tx_sig}")

# Cache the latest value in redis
def cache_latest_tx_redis(solana_client, redis, tx):
try:
tx_sig = tx['signature']
tx_slot = tx['slot']
pickle_and_set(redis, latest_sol_play_tx_key, {'signature': tx_sig, 'slot': tx_slot})
except Exception as e:
logger.error(f"index_solana_plays.py | Failed to cache latest transaction {tx}, {e}")
raise e

# Check for both SECP and SignerGroup
# Ensures that a signature recovery was performed within the expected SignerGroup
@@ -110,7 +122,14 @@ def is_valid_tx(account_keys):
def parse_sol_play_transaction(session, solana_client, tx_sig):
try:
tx_info = get_sol_tx_info(solana_client, tx_sig)
logger.info(f"index_solana_plays.py | Got transaction: {tx_sig} | {tx_info}")
logger.info(
f"index_solana_plays.py | Got transaction: {tx_sig} | {tx_info}"
)
meta = tx_info['result']['meta']
error = meta['err']
if error:
logger.info(f"index_solana_plays.py | Skipping error transaction from chain {tx_info}")
return
if is_valid_tx(tx_info["result"]["transaction"]["message"]["accountKeys"]):
audius_program_index = tx_info["result"]["transaction"]["message"][
"accountKeys"
@@ -153,6 +172,7 @@ def parse_sol_play_transaction(session, solana_client, tx_sig):
logger.error(
f"index_solana_plays.py | Error processing {tx_sig}, {e}", exc_info=True
)
raise e


# Query the highest traversed solana slot
@@ -265,9 +285,7 @@ def get_tx_in_db(session, tx_sig):
This is performed by simply slicing the tx_batches array and discarding the newest transactions until an intersection
is found - these limiting parameters are defined as TX_SIGNATURES_MAX_BATCHES, TX_SIGNATURES_RESIZE_LENGTH
"""


def process_solana_plays(solana_client):
def process_solana_plays(solana_client, redis):
try:
base58.b58decode(TRACK_LISTEN_PROGRAM)
except ValueError:
@@ -294,13 +312,15 @@ def process_solana_plays(solana_client):

last_tx_signature = None

# Current batch
page_count = 0

# Traverse recent records until an intersection is found with existing Plays table
while not intersection_found:
transactions_history = solana_client.get_confirmed_signature_for_address2(
TRACK_LISTEN_PROGRAM, before=last_tx_signature, limit=100
)
transactions_array = transactions_history["result"]

if not transactions_array:
# This is considered an 'intersection' since there are no further transactions to process but
# really represents the end of known history for this ProgramId
@@ -309,6 +329,9 @@ def process_solana_plays(solana_client):
f"index_solana_plays.py | No transactions found before {last_tx_signature}"
)
else:
# Cache latest transaction from chain
if page_count == 0:
cache_latest_tx_redis(solana_client, redis, transactions_array[0])
with db.scoped_session() as read_session:
for tx in transactions_array:
tx_sig = tx["signature"]
@@ -356,9 +379,13 @@ def process_solana_plays(solana_client):

# Reset batch state
transaction_signature_batch = []

logger.info(
f"index_solana_plays.py | intersection_found={intersection_found}, last_tx_signature={last_tx_signature}"
f"index_solana_plays.py | intersection_found={intersection_found},\
last_tx_signature={last_tx_signature},\
page_count={page_count}"
)
page_count = page_count + 1

logger.info(
f"index_solana_plays.py | {transaction_signatures}, {len(transaction_signatures)} entries"
@@ -371,7 +398,7 @@ def process_solana_plays(solana_client):
num_txs_processed = 0

for tx_sig_batch in transaction_signatures:
logger.error(f"index_solana_plays.py | processing {tx_sig_batch}")
logger.info(f"index_solana_plays.py | processing {tx_sig_batch}")
batch_start_time = time.time()
# Process each batch in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
@@ -389,6 +416,7 @@ def process_solana_plays(solana_client):
num_txs_processed += 1
except Exception as exc:
logger.error(f"index_solana_plays.py | {exc}")
raise exc

batch_end_time = time.time()
batch_duration = batch_end_time - batch_start_time
@@ -416,7 +444,7 @@ def index_solana_plays(self):
have_lock = update_lock.acquire(blocking=False)
if have_lock:
logger.info("index_solana_plays.py | Acquired lock")
process_solana_plays(solana_client)
process_solana_plays(solana_client, redis)
else:
logger.info("index_solana_plays.py | Failed to acquire lock")
except Exception as e:
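The core of the "failure prevents progress" change in this file is that exceptions from transaction parsing are re-raised rather than swallowed, so the Celery task aborts before the slot cursor moves past the failure and the same range is retried on the next run. A distilled, self-contained sketch of that pattern (names here are illustrative, not the module's own):

import concurrent.futures

def process_batch(parse_fn, tx_sig_batch):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(parse_fn, sig): sig for sig in tx_sig_batch}
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                # Re-raising (instead of only logging) is what stops the
                # indexer from advancing past a failed transaction.
                raise exc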
1 change: 1 addition & 0 deletions discovery-provider/src/utils/redis_constants.py
@@ -10,3 +10,4 @@
trending_playlists_last_completion_redis_key = "trending-playlists:last-completion"
challenges_last_processed_event_redis_key = "challenges:last-processed-event"
user_balances_refresh_last_completion_redis_key = "user_balances:last-completion"
latest_sol_play_tx_key = "latest_sol_play_tx_key"
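The pickle_and_set and get_pickled_key helpers used above are not shown in this diff; the sketch below assumes they are thin wrappers around redis-py set/get plus pickle, which is how cache_latest_tx_redis and sol_play_check appear to use them.

import pickle
import redis

def pickle_and_set_sketch(redis_client, key, obj):
    redis_client.set(key, pickle.dumps(obj))

def get_pickled_key_sketch(redis_client, key):
    raw = redis_client.get(key)
    return pickle.loads(raw) if raw is not None else None

# Round trip mirroring cache_latest_tx_redis / sol_play_check
client = redis.Redis()  # connection details are hypothetical
pickle_and_set_sketch(client, "latest_sol_play_tx_key", {"signature": "abc", "slot": 123})
print(get_pickled_key_sketch(client, "latest_sol_play_tx_key"))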
