From 76fb5efe67f9ac57554665b75c3d66f8c4f503b6 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Thu, 25 Jul 2024 11:53:04 -0400 Subject: [PATCH] remove caching of blockheight (just save in db) --- indexer_app/tasks.py | 30 ++++++++++++------------------ indexer_app/utils.py | 28 ++++++++++------------------ 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/indexer_app/tasks.py b/indexer_app/tasks.py index c0853c1..dfeed8a 100644 --- a/indexer_app/tasks.py +++ b/indexer_app/tasks.py @@ -8,7 +8,6 @@ from celery import shared_task from celery.signals import task_revoked, worker_shutdown from django.conf import settings -from django.core.cache import cache from django.db import transaction from django.db.models import Count, DecimalField, Q, Sum, Value from django.db.models.functions import Cast, NullIf @@ -21,7 +20,7 @@ from pots.models import Pot, PotPayout from .logging import logger -from .utils import cache_block_height, get_block_height +from .utils import get_block_height, save_block_height CURRENT_BLOCK_HEIGHT_KEY = "current_block_height" @@ -62,23 +61,16 @@ async def indexer(from_block: int, to_block: int): block_count += 1 # Log time before caching block height - cache_start_time = time.time() - # Fire and forget the cache update - asyncio.create_task( - cache_block_height( - CURRENT_BLOCK_HEIGHT_KEY, - streamer_message.block.header.height, - block_count, - streamer_message.block.header.timestamp, - ) # current block height - # cache.aset( - # CURRENT_BLOCK_HEIGHT_KEY, streamer_message.block.header.height - # ) + save_start_time = time.time() + # Update current block height + await save_block_height( + streamer_message.block.header.height, + streamer_message.block.header.timestamp, ) - cache_end_time = time.time() + save_end_time = time.time() logger.info( - f"Time to cache block height: {cache_end_time - cache_start_time:.4f} seconds" + f"Time to save block height: {save_end_time - save_start_time:.4f} seconds" ) # Log time before handling the streamer message @@ -107,7 +99,7 @@ def listen_to_near_events(): try: # Update below with desired network & block height - start_block = get_block_height(CURRENT_BLOCK_HEIGHT_KEY) + start_block = get_block_height() # start_block = 119_568_113 logger.info(f"what's the start block, pray tell? {start_block-1}") loop.run_until_complete(indexer(start_block - 1, None)) @@ -219,7 +211,9 @@ def update_pot_statistics(): jobs_logger.info(f"Total matching pool USD: {pot.total_matching_pool_usd}") # matching pool balance (get from contract) - url = f"{settings.FASTNEAR_RPC_URL}/account/{pot.account.id}/view/get_config" + url = ( + f"{settings.FASTNEAR_RPC_URL}/account/{pot.account.id}/view/get_config" + ) response = requests.get(url) if response.status_code != 200: jobs_logger.error( diff --git a/indexer_app/utils.py b/indexer_app/utils.py index 7a2a143..20f8a21 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -2,9 +2,9 @@ import json from datetime import datetime from math import log -from asgiref.sync import sync_to_async import requests +from asgiref.sync import sync_to_async from django.conf import settings from django.core.cache import cache from django.forms.models import model_to_dict @@ -691,7 +691,7 @@ async def handle_set_payouts(data: dict, receiver_id: str, receipt: Receipt): else: data = response.json() pot = await Pot.objects.aget(account=receiver_id) - pot.cooldown_end =datetime.fromtimestamp(data["cooldown_end_ms"] / 1000) + pot.cooldown_end = datetime.fromtimestamp(data["cooldown_end_ms"] / 1000) await pot.asave() except Exception as e: logger.error(f"Failed to set payouts, Error: {e}") @@ -822,7 +822,7 @@ async def handle_add_factory_deployers(data, receiverId): except Exception as e: logger.error(f"Failed to add factory whitelisted deployers, Error: {e}") - + async def handle_set_factory_configs(data, receiverId): logger.info(f"setting factory configs...: {data}, {receiverId}") try: @@ -832,6 +832,7 @@ async def handle_set_factory_configs(data, receiverId): except Exception as e: logger.error(f"Failed to update factory configs, Error: {e}") + # # TODO: Need to abstract some actions. # async def handle_batch_donations( # receiver_id: str, @@ -1185,18 +1186,11 @@ async def handle_new_group(data: dict, created_at: datetime): logger.error(f"Failed to create group, because: {e}") -async def cache_block_height( - key: str, height: int, block_count: int, block_timestamp: int -): - logger.info(f"caching block height: {height}") - await cache.aset(key, height) - # the cache os the default go to for the restart block, the db is a backup if the redis server crashes. - # if (block_count % int(settings.BLOCK_SAVE_HEIGHT or 400)) == 0: - # logger.info(f"saving daylight, {height}") +async def save_block_height(block_height: int, block_timestamp: int): await BlockHeight.objects.aupdate_or_create( id=1, defaults={ - "block_height": height, + "block_height": block_height, "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000), "updated_at": timezone.now(), }, @@ -1204,9 +1198,7 @@ async def cache_block_height( # return height -def get_block_height(key: str) -> int: - block_height = cache.get(key) - if not block_height: - record = BlockHeight.objects.filter(id=1).first() - block_height = 104_922_190 if not record else record.block_height - return block_height +def get_block_height() -> int: + record = BlockHeight.objects.filter(id=1).first() + if record: + return record.block_height