Skip to content

Commit

Permalink
Create and use an in-memory list of records to be added (#1665)
Browse files Browse the repository at this point in the history
* Create an in-memory list of records to be added and check that in addition to the DB

* Use arguments to maintain state, keep state in tracks.py
  • Loading branch information
rickyrombo committed Jul 19, 2021
1 parent cc4200e commit 7a3935a
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 51 deletions.
167 changes: 118 additions & 49 deletions discovery-provider/src/tasks/tracks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
import functools
from datetime import datetime
from typing import Optional
from typing import List, Optional
from sqlalchemy.orm.session import make_transient
from sqlalchemy.sql import null, functions
from src.app import contract_addresses
Expand Down Expand Up @@ -49,6 +49,7 @@ def track_state_update(
track_contract = update_task.web3.eth.contract(
address=contract_addresses["track_factory"], abi=track_abi
)
pending_track_routes = List[TrackRoute]
track_events = {}
for tx_receipt in track_factory_txs:
txhash = update_task.web3.toHex(tx_receipt.transactionHash)
Expand Down Expand Up @@ -87,6 +88,7 @@ def track_state_update(
event_type,
track_events[track_id]["track"],
block_timestamp,
pending_track_routes,
)

# If track record object is None, it has a blacklisted metadata CID
Expand Down Expand Up @@ -206,7 +208,7 @@ def wrapper(*args, **kargs):


@time_method
def add_old_style_route(session, track_record, track_metadata):
def add_old_style_route(session, track_record, track_metadata, pending_track_routes):
"""Temporary method to add the old style routes to the track_routes db while we
transition the clients to use the new routing API.
"""
Expand All @@ -221,14 +223,24 @@ def add_old_style_route(session, track_record, track_metadata):
new_track_slug_title = f"{new_track_slug_title}-{track_record.track_id}"

# Check to make sure the route doesn't exist
existing_track_route = (
session.query(TrackRoute)
.filter(
TrackRoute.slug == new_track_slug_title,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
existing_track_route = next(
(
route
for route in pending_track_routes
if route.slug == new_track_slug_title
and route.owner_id == track_record.owner_id
),
None,
)
if existing_track_route is None:
existing_track_route = (
session.query(TrackRoute)
.filter(
TrackRoute.slug == new_track_slug_title,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
)

if existing_track_route is None:
# Add the new track route
Expand All @@ -246,8 +258,8 @@ def add_old_style_route(session, track_record, track_metadata):
new_track_route.txhash = track_record.txhash
session.add(new_track_route)

# Commit this so it gets in before the new route creation
session.commit()
# Add to in-memory store to make sure we don't try to add it twice
pending_track_routes.append(new_track_route)
else:
logger.error(
f"Cannot add 'old-style' track_route to Track={track_record}\
Expand All @@ -256,7 +268,9 @@ def add_old_style_route(session, track_record, track_metadata):


@time_method
def update_track_routes_table(session, track_record, track_metadata):
def update_track_routes_table(
session, track_record, track_metadata, pending_track_routes
):
"""Creates the route for the given track and commits it to the track_routes table"""

# Check if the title is staying the same, and if so, return early
Expand All @@ -269,13 +283,25 @@ def update_track_routes_table(session, track_record, track_metadata):
new_track_slug = new_track_slug_title

# Find the current route for the track
prev_track_route_record = (
session.query(TrackRoute)
.filter(
TrackRoute.track_id == track_record.track_id, TrackRoute.is_current == True
) # noqa: E712
.one_or_none()
# Check the pending track route updates first
prev_track_route_record = next(
(
route
for route in pending_track_routes
if route.is_current and route.track_id == track_record.track_id
),
None,
)
# Then query the DB if necessary
if prev_track_route_record is None:
prev_track_route_record = (
session.query(TrackRoute)
.filter(
TrackRoute.track_id == track_record.track_id,
TrackRoute.is_current == True,
) # noqa: E712
.one_or_none()
)

if prev_track_route_record is not None:
if prev_track_route_record.title_slug == new_track_slug_title:
Expand All @@ -285,27 +311,51 @@ def update_track_routes_table(session, track_record, track_metadata):
prev_track_route_record.is_current = False

# Check for collisions by slug titles, and get the max collision_id
max_collision_id = (
session.query(functions.max(TrackRoute.collision_id))
.filter(
TrackRoute.title_slug == new_track_slug_title,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
)[0]
max_collision_id: Optional[int] = None
# Check pending updates first
for route in pending_track_routes:
if (
route.title_slug == new_track_slug_title
and route.owner_id == track_record.owner_id
):
max_collision_id = (
route.collision_id
if max_collision_id is None
else max(max_collision_id, route.collision_id)
)
# Check DB if necessary
if max_collision_id is None:
max_collision_id = (
session.query(functions.max(TrackRoute.collision_id))
.filter(
TrackRoute.title_slug == new_track_slug_title,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
)[0]

existing_track_route: Optional[TrackRoute] = None
# If the new track_slug ends in a digit, there's a possibility it collides
# with an existing route when the collision_id is appended to its title_slug
if new_track_slug[-1].isdigit():
existing_track_route = (
session.query(TrackRoute)
.filter(
TrackRoute.slug == new_track_slug,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
existing_track_route = next(
(
route
for route in pending_track_routes
if route.slug == new_track_slug
and route.owner_id == track_record.owner_id
),
None,
)
if existing_track_route is None:
existing_track_route = (
session.query(TrackRoute)
.filter(
TrackRoute.slug == new_track_slug,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
)

new_collision_id = 0
has_collisions = existing_track_route is not None
Expand Down Expand Up @@ -340,14 +390,24 @@ def update_track_routes_table(session, track_record, track_metadata):
# - Use collision_id: 1, slug: 'track-1-1'
#
# This may be expensive with many collisions, but should be rare.
existing_track_route = (
session.query(TrackRoute)
.filter(
TrackRoute.slug == new_track_slug,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
existing_track_route = next(
(
route
for route in pending_track_routes
if route.slug == new_track_slug
and route.owner_id == track_record.owner_id
),
None,
)
if existing_track_route is None:
existing_track_route = (
session.query(TrackRoute)
.filter(
TrackRoute.slug == new_track_slug,
TrackRoute.owner_id == track_record.owner_id,
)
.one_or_none()
)
has_collisions = existing_track_route is not None

# Add the new track route
Expand All @@ -363,12 +423,19 @@ def update_track_routes_table(session, track_record, track_metadata):
new_track_route.txhash = track_record.txhash
session.add(new_track_route)

# Make sure to commit so we don't add the same route twice
session.commit()
# Add to pending track routes so we don't add the same route twice
pending_track_routes.append(new_track_route)


def parse_track_event(
self, session, update_task, entry, event_type, track_record, block_timestamp
self,
session,
update_task,
entry,
event_type,
track_record,
block_timestamp,
pending_track_routes,
):
event_args = entry["args"]
# Just use block_timestamp as integer
Expand Down Expand Up @@ -410,9 +477,10 @@ def parse_track_event(
track_metadata_multihash, track_metadata_format, creator_node_endpoint
)

# Note: These will commit the session
add_old_style_route(session, track_record, track_metadata)
update_track_routes_table(session, track_record, track_metadata)
add_old_style_route(session, track_record, track_metadata, pending_track_routes)
update_track_routes_table(
session, track_record, track_metadata, pending_track_routes
)
track_record = populate_track_record_metadata(
track_record, track_metadata, handle
)
Expand Down Expand Up @@ -471,9 +539,10 @@ def parse_track_event(
upd_track_metadata_multihash, track_metadata_format, creator_node_endpoint
)

# Note: These will commit the session
add_old_style_route(session, track_record, track_metadata)
update_track_routes_table(session, track_record, track_metadata)
add_old_style_route(session, track_record, track_metadata, pending_track_routes)
update_track_routes_table(
session, track_record, track_metadata, pending_track_routes
)
track_record = populate_track_record_metadata(
track_record, track_metadata, handle
)
Expand Down
19 changes: 17 additions & 2 deletions discovery-provider/tests/test_index_tracks.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def test_index_tracks(mock_index_task, app):
db = get_db()

update_task = UpdateTask(ipfs_client, web3)
pending_track_routes = []

with db.scoped_session() as session:
# ================== Test New Track Event ==================
Expand Down Expand Up @@ -236,6 +237,7 @@ def test_index_tracks(mock_index_task, app):
event_type, # String that should one of user_event_types_lookup
track_record, # User ORM instance
block_timestamp, # Used to update the user.updated_at field
pending_track_routes,
)

# updated_at should be updated every parse_track_event
Expand Down Expand Up @@ -302,7 +304,14 @@ def test_index_tracks(mock_index_task, app):

event_type, entry = get_update_track_event()
parse_track_event(
None, session, update_task, entry, event_type, track_record, block_timestamp
None,
session,
update_task,
entry,
event_type,
track_record,
block_timestamp,
pending_track_routes,
)

# Check that track routes are updated appropriately
Expand Down Expand Up @@ -371,6 +380,7 @@ def test_index_tracks(mock_index_task, app):
event_type,
track_record_dupe,
block_timestamp,
pending_track_routes,
)

# Check that track routes are assigned appropriately
Expand Down Expand Up @@ -406,6 +416,7 @@ def test_index_tracks(mock_index_task, app):
event_type,
track_record_dupe,
block_timestamp,
pending_track_routes,
)

# Check that track routes are assigned appropriately
Expand All @@ -430,8 +441,11 @@ def test_index_tracks(mock_index_task, app):
)
assert track_route

# Make sure the blocks are committed
session.commit()
pending_track_routes.clear()
revert_blocks(mock_index_task, db, [second_block])

# Commit the revert
session.commit()

track_routes = session.query(TrackRoute).all()
Expand Down Expand Up @@ -466,6 +480,7 @@ def test_index_tracks(mock_index_task, app):
event_type, # String that should one of user_event_types_lookup
track_record, # User ORM instance
block_timestamp, # Used to update the user.updated_at field
pending_track_routes,
)

# updated_at should be updated every parse_track_event
Expand Down

0 comments on commit 7a3935a

Please sign in to comment.