30 changes: 12 additions & 18 deletions indexer_app/tasks.py
@@ -8,7 +8,6 @@
 from celery import shared_task
 from celery.signals import task_revoked, worker_shutdown
 from django.conf import settings
-from django.core.cache import cache
 from django.db import transaction
 from django.db.models import Count, DecimalField, Q, Sum, Value
 from django.db.models.functions import Cast, NullIf
@@ -21,7 +20,7 @@
 from pots.models import Pot, PotPayout

 from .logging import logger
-from .utils import cache_block_height, get_block_height
+from .utils import get_block_height, save_block_height

 CURRENT_BLOCK_HEIGHT_KEY = "current_block_height"
@@ -62,23 +61,16 @@ async def indexer(from_block: int, to_block: int):
        block_count += 1

        # Log time before caching block height
-        cache_start_time = time.time()
-        # Fire and forget the cache update
-        asyncio.create_task(
-            cache_block_height(
-                CURRENT_BLOCK_HEIGHT_KEY,
-                streamer_message.block.header.height,
-                block_count,
-                streamer_message.block.header.timestamp,
-            )  # current block height
-            # cache.aset(
-            #     CURRENT_BLOCK_HEIGHT_KEY, streamer_message.block.header.height
-            # )
-        )
-        cache_end_time = time.time()
+        save_start_time = time.time()
+        # Update current block height
+        await save_block_height(
+            streamer_message.block.header.height,
+            streamer_message.block.header.timestamp,
+        )
+        save_end_time = time.time()

        logger.info(
-            f"Time to cache block height: {cache_end_time - cache_start_time:.4f} seconds"
+            f"Time to save block height: {save_end_time - save_start_time:.4f} seconds"
        )

        # Log time before handling the streamer message
@@ -107,7 +99,7 @@ def listen_to_near_events():

    try:
        # Update below with desired network & block height
-        start_block = get_block_height(CURRENT_BLOCK_HEIGHT_KEY)
+        start_block = get_block_height()
        # start_block = 119_568_113
        logger.info(f"what's the start block, pray tell? {start_block-1}")
        loop.run_until_complete(indexer(start_block - 1, None))
@@ -219,7 +211,9 @@ def update_pot_statistics():
            jobs_logger.info(f"Total matching pool USD: {pot.total_matching_pool_usd}")

            # matching pool balance (get from contract)
-            url = f"{settings.FASTNEAR_RPC_URL}/account/{pot.account.id}/view/get_config"
+            url = (
+                f"{settings.FASTNEAR_RPC_URL}/account/{pot.account.id}/view/get_config"
+            )
            response = requests.get(url)
            if response.status_code != 200:
                jobs_logger.error(
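The net effect of the tasks.py changes: the indexer loop no longer fires off a background cache write via asyncio.create_task(), but awaits the database save directly, so the logged timing now measures the real cost of persisting the block height. The update_pot_statistics hunk is only a black-style line wrap; for readers unfamiliar with the call it reformats, here is a minimal sketch of that FastNEAR view request. fetch_pot_config is a hypothetical helper name, and the exact response shape of the FastNEAR endpoint is an assumption here:

import requests
from django.conf import settings


def fetch_pot_config(pot_account_id: str) -> dict | None:
    # FastNEAR serves contract view calls over plain HTTP; get_config
    # returns the pot's on-chain configuration (URL pattern taken from
    # update_pot_statistics above).
    url = f"{settings.FASTNEAR_RPC_URL}/account/{pot_account_id}/view/get_config"
    response = requests.get(url)
    if response.status_code != 200:
        # update_pot_statistics logs an error and skips on non-200 responses.
        return None
    # Assumed JSON-decodable, matching the response.json() usage in
    # handle_set_payouts in utils.py below.
    return response.json()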
28 changes: 10 additions & 18 deletions indexer_app/utils.py
@@ -2,9 +2,9 @@
 import json
 from datetime import datetime
 from math import log
-from asgiref.sync import sync_to_async

 import requests
+from asgiref.sync import sync_to_async
 from django.conf import settings
 from django.core.cache import cache
 from django.forms.models import model_to_dict
@@ -691,7 +691,7 @@ async def handle_set_payouts(data: dict, receiver_id: str, receipt: Receipt):
        else:
            data = response.json()
            pot = await Pot.objects.aget(account=receiver_id)
-            pot.cooldown_end =datetime.fromtimestamp(data["cooldown_end_ms"] / 1000)
+            pot.cooldown_end = datetime.fromtimestamp(data["cooldown_end_ms"] / 1000)
            await pot.asave()
    except Exception as e:
        logger.error(f"Failed to set payouts, Error: {e}")
@@ -822,7 +822,7 @@ async def handle_add_factory_deployers(data, receiverId):
    except Exception as e:
        logger.error(f"Failed to add factory whitelisted deployers, Error: {e}")

-
+
async def handle_set_factory_configs(data, receiverId):
    logger.info(f"setting factory configs...: {data}, {receiverId}")
    try:
@@ -832,6 +832,7 @@ async def handle_set_factory_configs(data, receiverId):
    except Exception as e:
        logger.error(f"Failed to update factory configs, Error: {e}")

+
# # TODO: Need to abstract some actions.
# async def handle_batch_donations(
#     receiver_id: str,
@@ -1185,28 +1186,19 @@ async def handle_new_group(data: dict, created_at: datetime):
        logger.error(f"Failed to create group, because: {e}")


-async def cache_block_height(
-    key: str, height: int, block_count: int, block_timestamp: int
-):
-    logger.info(f"caching block height: {height}")
-    await cache.aset(key, height)
-    # the cache os the default go to for the restart block, the db is a backup if the redis server crashes.
-    # if (block_count % int(settings.BLOCK_SAVE_HEIGHT or 400)) == 0:
-    #     logger.info(f"saving daylight, {height}")
+async def save_block_height(block_height: int, block_timestamp: int):
    await BlockHeight.objects.aupdate_or_create(
        id=1,
        defaults={
-            "block_height": height,
+            "block_height": block_height,
            "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000),
            "updated_at": timezone.now(),
        },
    )  # better than ovverriding model's save method to get a singleton? we need only one entry
-    # return height


-def get_block_height(key: str) -> int:
-    block_height = cache.get(key)
-    if not block_height:
-        record = BlockHeight.objects.filter(id=1).first()
-        block_height = 104_922_190 if not record else record.block_height
-    return block_height
+def get_block_height() -> int:
+    record = BlockHeight.objects.filter(id=1).first()
+    if record:
+        return record.block_height
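Net effect of the utils.py changes: the restart position now lives only in the BlockHeight singleton row (id=1) rather than in Redis, so a cache flush or Redis crash no longer loses it. A minimal usage sketch, assuming Django is already configured, that block timestamps arrive in nanoseconds (hence the division by 1000000000 above), and that the truncated get_block_height ends by returning a default start height when no row exists, as the removed version did with 104_922_190:

import asyncio

from indexer_app.utils import get_block_height, save_block_height


async def checkpoint(height: int, timestamp_ns: int) -> None:
    # aupdate_or_create(id=1, ...) keeps exactly one BlockHeight row,
    # overwritten on every call: a simple singleton checkpoint.
    await save_block_height(height, timestamp_ns)


def resume_start_block() -> int:
    # Called synchronously from listen_to_near_events() at startup;
    # the indexer then resumes from this value minus one.
    return get_block_height()


asyncio.run(checkpoint(104_922_191, 1_700_000_000_000_000_000))
print(resume_start_block())  # -> 104_922_191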