Skip to content

Commit

Permalink
Trending fix web3 (#4626)
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlee committed Jan 18, 2023
1 parent 1b6c693 commit 94c624e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ def test_index_trending_notification(app):
"users": [{}] * 2,
}
populate_mock_db(db, entities)
base_time_int = int(round(BASE_TIME.timestamp()))

index_trending_notifications(db, BASE_TIME)
index_trending_notifications(db, base_time_int)
with db.scoped_session() as session:
notifications = (
session.query(Notification).order_by(asc(Notification.specifier)).all()
Expand Down Expand Up @@ -105,8 +106,9 @@ def test_index_trending_notification(app):
trending_tracks = (trending_tracks, track_ids)
set_json_cached_key(redis, trending_key, trending_tracks)
updated_time = BASE_TIME + timedelta(hours=1)
updated_time_int = int(round(updated_time.timestamp()))

index_trending_notifications(db, updated_time)
index_trending_notifications(db, updated_time_int)
with db.scoped_session() as session:
all_notifications = session.query(Notification).all()
assert len(all_notifications) == 6
Expand All @@ -127,8 +129,9 @@ def test_index_trending_notification(app):
}

updated_time = BASE_TIME + timedelta(hours=24)
updated_time_int = int(round(updated_time.timestamp()))

index_trending_notifications(db, updated_time)
index_trending_notifications(db, updated_time_int)
with db.scoped_session() as session:
all_notifications = session.query(Notification).all()
assert len(all_notifications) == 9
Expand Down
26 changes: 16 additions & 10 deletions discovery-provider/src/tasks/index_trending.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from src.utils.redis_cache import set_json_cached_key
from src.utils.redis_constants import trending_tracks_last_completion_redis_key
from src.utils.session_manager import SessionManager
from src.utils.web3_provider import get_web3
from web3 import Web3

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -215,7 +216,7 @@ def index_trending(self, db: SessionManager, redis: Redis, timestamp):
index_trending_notifications(db, timestamp)


def index_trending_notifications(db: SessionManager, timestamp: datetime):
def index_trending_notifications(db: SessionManager, timestamp: int):
# Get the top 5 trending tracks from the new trending calculations
# Get the most recent trending tracks notifications
# Calculate any diff and write the new notifications if the trending track has moved up in rank
Expand Down Expand Up @@ -268,7 +269,6 @@ def index_trending_notifications(db: SessionManager, timestamp: datetime):
previous_trending = {
n[0]: {"timestamp": n[1], **n[2]} for n in previous_trending_notifications
}
logger.info(previous_trending)

notifications = []

Expand All @@ -279,23 +279,22 @@ def index_trending_notifications(db: SessionManager, timestamp: datetime):
track_id = track["track_id"]
rank = index + 1
previous_track_notification = previous_trending.get(str(track["track_id"]))
logger.info(previous_track_notification)
if previous_track_notification is not None:
current_datetime = datetime.fromtimestamp(timestamp)
prev_notification_datetime = datetime.fromtimestamp(
previous_track_notification["timestamp"].timestamp()
)
if (
timestamp - prev_notification_datetime
current_datetime - prev_notification_datetime
).total_seconds() < NOTIFICATION_INTERVAL_SEC:
continue
prev_rank = previous_track_notification["rank"]
if prev_rank <= rank:
continue

notifications.append(
{
"owner_id": track["owner_id"],
"group_id": f"trending:time_range:week:genre:all:rank:{rank}:track_id:{track_id}:timestamp:{int(timestamp.timestamp())}",
"group_id": f"trending:time_range:week:genre:all:rank:{rank}:track_id:{track_id}:timestamp:{timestamp}",
"track_id": track_id,
"rank": rank,
}
Expand All @@ -305,7 +304,7 @@ def index_trending_notifications(db: SessionManager, timestamp: datetime):
[
Notification(
user_ids=[n["owner_id"]],
timestamp=timestamp,
timestamp=datetime.fromtimestamp(timestamp),
type="trending",
group_id=n["group_id"],
specifier=n["track_id"],
Expand All @@ -319,6 +318,10 @@ def index_trending_notifications(db: SessionManager, timestamp: datetime):
for n in notifications
]
)
logger.info(
"index_trending.py | Created trending notifications",
extra={"job": "index_trending", "subtask": "trending notification"},
)


last_trending_timestamp = "last_trending_timestamp"
Expand Down Expand Up @@ -364,14 +367,17 @@ def find_min_block_above_timestamp(block_number: int, min_timestamp: datetime, w
if prev_timestamp >= min_timestamp:
block = prev_block
curr_block_number -= 1
else:
return block

return block


def get_block(web3, block_number: int):
final_poa_block = helpers.get_final_poa_block(shared_config)
if final_poa_block:
nethermin_block_number = block_number - final_poa_block
return web3.eth.get_block(nethermin_block_number, True)
nethermind_block_number = block_number - final_poa_block
return web3.eth.get_block(nethermind_block_number, True)
else:
return web3.eth.get_block(block_number, True)

Expand Down Expand Up @@ -423,7 +429,7 @@ def index_trending_task(self):
"""Caches all trending combination of time-range and genre (including no genre)."""
db = index_trending_task.db
redis = index_trending_task.redis
web3 = index_trending_task.web3
web3 = get_web3()
have_lock = False
timeout = 60 * 60 * 2
update_lock = redis.lock("index_trending_lock", timeout=timeout)
Expand Down

0 comments on commit 94c624e

Please sign in to comment.