Commit 0aa7336

Refactor fetching CID metadata into util (#2819)

isaacsolo committed Apr 4, 2022
1 parent 1bbe885 commit 0aa7336
Showing 13 changed files with 117 additions and 75 deletions.
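For orientation, a minimal sketch of the renamed client's surface as these files exercise it. The class and method names are taken from the diff below; the constructor parameters beyond the three visible in the src/app.py hunk, and every method body, are assumptions rather than the actual implementation in src/utils/cid_metadata_client.py.

# Illustrative sketch only -- names from the diff, bodies and extra parameters assumed.
class CIDMetadataClient:
    """Fetches CID-addressed metadata from content node gateways."""

    def __init__(self, eth_web3=None, shared_config=None, redis=None, *args, **kwargs):
        # The first three arguments mirror the src/app.py call site; the rest of that
        # call is collapsed in the diff, so any remaining parameters are unknown here.
        self._cnode_endpoints = []

    def update_cnode_urls(self, cnode_endpoints):
        # Refreshed periodically from src/tasks/index_network_peers.py.
        self._cnode_endpoints = cnode_endpoints

    def _get_gateway_endpoints(self, *args, **kwargs):
        # Patched in the integration tests to return a single test content node.
        return self._cnode_endpoints

    def get_metadata(self, multihash, format, endpoint):
        # Fetch one CID's metadata; the integration test helper stubs this out.
        raise NotImplementedError

    def fetch_metadata_from_gateway_endpoints(
        self,
        fetched_cids,
        cids_txhash_set,
        cid_to_user_id,
        user_to_replica_set,
        cid_type,
        should_fetch_from_replica_set=True,
    ):
        # Batch fetch: each user's replica set first, then other known content nodes.
        # src/tasks/index.py now calls this without an asyncio.run wrapper, so the
        # sketch assumes the method manages any event loop internally.
        raise NotImplementedError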
@@ -10,7 +10,7 @@ def __init__(self, *args, **kwargs):
self.__dict__ = self


class IPFSClient:
class CIDMetadataClient:
def __init__(self, metadata_dict):
self.metadata_dict = metadata_dict

@@ -19,8 +19,8 @@ def get_metadata(self, multihash, format, endpoint):


class UpdateTask:
def __init__(self, ipfs_client, web3, challenge_event_bus, redis=None):
self.ipfs_client = ipfs_client
def __init__(self, cid_metadata_client, web3, challenge_event_bus, redis=None):
self.cid_metadata_client = cid_metadata_client
self.web3 = web3
self.challenge_event_bus: ChallengeEventBus = challenge_event_bus
self.redis = redis
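The get_metadata body of this test helper is collapsed in the hunk above. A sketch of the stub under the assumption that it simply serves entries from the seeded metadata_dict:

# Test helper from index_helpers.py -- only __init__ appears in the diff; get_metadata is assumed.
class CIDMetadataClient:
    def __init__(self, metadata_dict):
        self.metadata_dict = metadata_dict

    def get_metadata(self, multihash, format, endpoint):
        # Assumed behavior: return the seeded entry for this multihash.
        return self.metadata_dict.get(multihash, {})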

@@ -211,7 +211,7 @@ def test_index_operations(celery_app, celery_app_contracts, mocker):
new_user_id = seed_data["new_user_id"]

mocker.patch(
"src.utils.ipfs_lib.IPFSClient._get_gateway_endpoints",
"src.utils.cid_metadata_client.CIDMetadataClient._get_gateway_endpoints",
return_value=["https://test-content-node.audius.co"],
autospec=True,
)
@@ -260,7 +260,7 @@ def fetch_metadata_stub(*_, should_fetch_from_replica_set):
raise Exception("Broken fetch")

mocker.patch(
"src.utils.ipfs_lib.IPFSClient.fetch_metadata_from_gateway_endpoints",
"src.utils.cid_metadata_client.CIDMetadataClient.fetch_metadata_from_gateway_endpoints",
side_effect=fetch_metadata_stub,
)

@@ -380,7 +380,7 @@ def parse_track_event(*_):

seed_data = seed_contract_data(task, celery_app_contracts, web3)
mocker.patch(
"src.utils.ipfs_lib.IPFSClient._get_gateway_endpoints",
"src.utils.cid_metadata_client.CIDMetadataClient._get_gateway_endpoints",
return_value=["https://test-content-node.audius.co"],
autospec=True,
)
@@ -436,7 +436,7 @@ def test_index_operations_indexing_error_on_commit(

seed_data = seed_contract_data(task, celery_app_contracts, web3)
mocker.patch(
"src.utils.ipfs_lib.IPFSClient._get_gateway_endpoints",
"src.utils.cid_metadata_client.CIDMetadataClient._get_gateway_endpoints",
return_value=["https://test-content-node.audius.co"],
autospec=True,
)
@@ -508,7 +508,7 @@ def test_index_operations_skip_block(celery_app, celery_app_contracts, mocker):

seed_data = seed_contract_data(task, celery_app_contracts, web3)
mocker.patch(
"src.utils.ipfs_lib.IPFSClient._get_gateway_endpoints",
"src.utils.cid_metadata_client.CIDMetadataClient._get_gateway_endpoints",
return_value=["https://test-content-node.audius.co"],
autospec=True,
)

@@ -1,7 +1,11 @@
import random
from datetime import datetime

from integration_tests.challenges.index_helpers import AttrDict, IPFSClient, UpdateTask
from integration_tests.challenges.index_helpers import (
AttrDict,
CIDMetadataClient,
UpdateTask,
)
from src.challenges.challenge_event_bus import setup_challenge_bus
from src.models import Block, Playlist, SkippedTransaction, SkippedTransactionLevel
from src.tasks.playlists import (
@@ -133,10 +137,10 @@ def test_index_playlist(app):
"""Tests that playlists are indexed correctly"""
with app.app_context():
db = get_db()
ipfs_client = IPFSClient({})
cid_metadata_client = CIDMetadataClient({})
web3 = Web3()
challenge_event_bus = setup_challenge_bus()
update_task = UpdateTask(ipfs_client, web3, challenge_event_bus)
update_task = UpdateTask(cid_metadata_client, web3, challenge_event_bus)

with db.scoped_session() as session:
# ================= Test playlist_created Event =================
@@ -344,10 +348,10 @@ def test_playlist_indexing_skip_tx(app, mocker):
"""Tests that playlists skip cursed txs without throwing an error and are able to process other tx in block"""
with app.app_context():
db = get_db()
ipfs_client = IPFSClient({})
cid_metadata_client = CIDMetadataClient({})
web3 = Web3()
challenge_event_bus = setup_challenge_bus()
update_task = UpdateTask(ipfs_client, web3, challenge_event_bus)
update_task = UpdateTask(cid_metadata_client, web3, challenge_event_bus)

class TestPlaylistTransaction:
pass

24 changes: 17 additions & 7 deletions discovery-provider/integration_tests/tasks/test_index_tracks.py
@@ -2,7 +2,11 @@
from datetime import datetime
from unittest.mock import patch

from integration_tests.challenges.index_helpers import AttrDict, IPFSClient, UpdateTask
from integration_tests.challenges.index_helpers import (
AttrDict,
CIDMetadataClient,
UpdateTask,
)
from src.challenges.challenge_event_bus import ChallengeEventBus, setup_challenge_bus
from src.models import (
Block,
@@ -86,7 +90,7 @@ def get_delete_track_event():
+ b"\xba3\xf8\xc8<|%*{\x11\xc1\xe2/\xd7\xee\xd7q"
)

ipfs_client = IPFSClient(
cid_metadata_client = CIDMetadataClient(
{
multihash: {
"owner_id": 1,
@@ -194,7 +198,7 @@ def test_index_tracks(mock_index_task, app):
db = get_db()
challenge_event_bus: ChallengeEventBus = setup_challenge_bus()
web3 = Web3()
update_task = UpdateTask(ipfs_client, web3, challenge_event_bus)
update_task = UpdateTask(cid_metadata_client, web3, challenge_event_bus)

pending_track_routes = []

@@ -250,7 +254,7 @@ def test_index_tracks(mock_index_task, app):
b"@\xfe\x1f\x02\xf3i%\xa5+\xec\x8dh\x82\xc5}"
+ b"\x17\x91\xb9\xa1\x8dg j\xc0\xcd\x879K\x80\xf2\xdbg"
)
track_metadata = update_task.ipfs_client.get_metadata(entry_multihash, "", "")
track_metadata = update_task.cid_metadata_client.get_metadata(
entry_multihash, "", ""
)

track_record = parse_track_event(
None, # self - not used
@@ -330,7 +336,9 @@ def test_index_tracks(mock_index_task, app):
b"\x93\x7f\xa2\xe6\xf0\xe5\xb5f\xca\x14(4m.B"
+ b"\xba3\xf8\xc8<|%*{\x11\xc1\xe2/\xd7\xee\xd7q"
)
track_metadata = update_task.ipfs_client.get_metadata(entry_multihash, "", "")
track_metadata = update_task.cid_metadata_client.get_metadata(
entry_multihash, "", ""
)

parse_track_event(
None,
@@ -396,7 +404,9 @@ def test_index_tracks(mock_index_task, app):
b"@\xfe\x1f\x02\xf3i%\xa5+\xec\x8dh\x82\xc5}"
+ b"\x17\x91\xb9\xa1\x8dg j\xc0\xcd\x879K\x80\xf2\xdbg"
)
track_metadata = update_task.ipfs_client.get_metadata(entry_multihash, "", "")
track_metadata = update_task.cid_metadata_client.get_metadata(
entry_multihash, "", ""
)

parse_track_event(
None,
@@ -530,7 +540,7 @@ def test_track_indexing_skip_tx(app, mocker):
db = get_db()
challenge_event_bus: ChallengeEventBus = setup_challenge_bus()
web3 = Web3()
update_task = UpdateTask(ipfs_client, web3, challenge_event_bus)
update_task = UpdateTask(cid_metadata_client, web3, challenge_event_bus)

class TestTrackTransaction:
pass

@@ -1,6 +1,10 @@
from datetime import datetime

from integration_tests.challenges.index_helpers import AttrDict, IPFSClient, UpdateTask
from integration_tests.challenges.index_helpers import (
AttrDict,
CIDMetadataClient,
UpdateTask,
)
from src.challenges.challenge_event_bus import setup_challenge_bus
from src.models import (
Block,
@@ -20,10 +24,10 @@ def test_user_replica_set_indexing_skip_tx(app, mocker):
"""Tests that URSM indexing skips cursed txs without throwing an error and are able to process other tx in block"""
with app.app_context():
db = get_db()
ipfs_client = IPFSClient({})
cid_metadata_client = CIDMetadataClient({})
web3 = Web3()
challenge_event_bus = setup_challenge_bus()
update_task = UpdateTask(ipfs_client, web3, challenge_event_bus)
update_task = UpdateTask(cid_metadata_client, web3, challenge_event_bus)

class TestUserReplicaSetTransaction:
pass

14 changes: 8 additions & 6 deletions discovery-provider/integration_tests/tasks/test_index_users.py
@@ -1,7 +1,7 @@
from datetime import datetime
from unittest import mock

from integration_tests.challenges.index_helpers import AttrDict, IPFSClient
from integration_tests.challenges.index_helpers import AttrDict, CIDMetadataClient
from src.challenges.challenge_event import ChallengeEvent
from src.database_task import DatabaseTask
from src.models import (
@@ -142,7 +142,7 @@ def get_update_creator_node_endpoint_event():
+ b"\x93\xdf\x9bf?\xe7h\xb3y\xa6\x19\x0c\x81\xb0"
)

ipfs_client = IPFSClient(
cid_metadata_client = CIDMetadataClient(
{
multihash: {
"is_creator": True,
@@ -216,7 +216,7 @@ def test_index_users(bus_mock: mock.MagicMock, app):
web3 = Web3()
bus_mock(redis)
update_task = DatabaseTask(
ipfs_client=ipfs_client,
cid_metadata_client=cid_metadata_client,
web3=web3,
challenge_event_bus=bus_mock,
redis=redis,
@@ -467,7 +467,7 @@ def test_index_users(bus_mock: mock.MagicMock, app):
entry, # Contains the event args used for updating
event_type, # String that should one of user_event_types_lookup
user_record, # User ORM instance
update_task.ipfs_client.get_metadata(
update_task.cid_metadata_client.get_metadata(
helpers.multihash_digest_to_cid(entry.args._multihashDigest),
user_metadata_format,
"",
@@ -477,7 +477,9 @@ def test_index_users(bus_mock: mock.MagicMock, app):
session.flush()

entry_multihash = helpers.multihash_digest_to_cid(entry.args._multihashDigest)
ipfs_metadata = update_task.ipfs_client.get_metadata(entry_multihash, "", "")
ipfs_metadata = update_task.cid_metadata_client.get_metadata(
entry_multihash, "", ""
)

assert user_record.profile_picture == ipfs_metadata["profile_picture"]
assert user_record.cover_photo == ipfs_metadata["cover_photo"]
@@ -566,7 +568,7 @@ def test_user_indexing_skip_tx(bus_mock: mock.MagicMock, app, mocker):
web3 = Web3()
bus_mock(redis)
update_task = DatabaseTask(
ipfs_client=ipfs_client,
cid_metadata_client=cid_metadata_client,
web3=web3,
challenge_event_bus=bus_mock,
redis=redis,

8 changes: 4 additions & 4 deletions discovery-provider/src/app.py
@@ -36,8 +36,8 @@
from src.solana.solana_client_manager import SolanaClientManager
from src.tasks import celery_app
from src.utils import helpers
from src.utils.cid_metadata_client import CIDMetadataClient
from src.utils.config import ConfigIni, config_files, shared_config
from src.utils.ipfs_lib import IPFSClient
from src.utils.multi_provider import MultiProvider
from src.utils.redis_metrics import METRICS_INTERVAL, SYNCHRONIZE_METRICS_INTERVAL
from src.utils.session_manager import SessionManager
@@ -529,8 +529,8 @@ def configure_celery(celery, test_config=None):
# Initialize Redis connection
redis_inst = redis.Redis.from_url(url=redis_url)

# Initialize IPFS client for celery task context
ipfs_client = IPFSClient(
# Initialize CIDMetadataClient for celery task context
cid_metadata_client = CIDMetadataClient(
eth_web3,
shared_config,
redis_inst,
@@ -573,7 +573,7 @@ def __init__(self, *args, **kwargs):
abi_values=abi_values,
eth_abi_values=eth_abi_values,
shared_config=shared_config,
ipfs_client=ipfs_client,
cid_metadata_client=cid_metadata_client,
redis=redis_inst,
eth_web3_provider=eth_web3,
solana_client_manager=solana_client_manager,

8 changes: 4 additions & 4 deletions discovery-provider/src/database_task.py
@@ -12,7 +12,7 @@ def __init__(
abi_values=None,
eth_abi_values=None,
shared_config=None,
ipfs_client=None,
cid_metadata_client=None,
redis=None,
eth_web3_provider=None,
solana_client_manager=None,
@@ -23,7 +23,7 @@ def __init__(
self._abi_values = abi_values
self._eth_abi_values = eth_abi_values
self._shared_config = shared_config
self._ipfs_client = ipfs_client
self._cid_metadata_client = cid_metadata_client
self._redis = redis
self._eth_web3_provider = eth_web3_provider
self._solana_client_manager = solana_client_manager
@@ -50,8 +50,8 @@ def shared_config(self):
return self._shared_config

@property
def ipfs_client(self):
return self._ipfs_client
def cid_metadata_client(self):
return self._cid_metadata_client

@property
def redis(self) -> Redis:
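As the test_index_users.py hunks above show, DatabaseTask now takes the renamed keyword argument and hands the client back through the new read-only property. A minimal round trip under that setup; the seeded metadata, placeholder CID, and None stand-ins are illustrative, and any constructor arguments collapsed in the diff are omitted:

from web3 import Web3

from integration_tests.challenges.index_helpers import CIDMetadataClient
from src.database_task import DatabaseTask

some_multihash = "QmExampleCid"  # placeholder CID, not a real multihash
cid_metadata_client = CIDMetadataClient({some_multihash: {"title": "example"}})

update_task = DatabaseTask(
    cid_metadata_client=cid_metadata_client,
    web3=Web3(),
    challenge_event_bus=None,  # the tests pass a challenge event bus or mock here
    redis=None,
)

# Task and test code now reach the client through the property instead of .ipfs_client.
metadata = update_task.cid_metadata_client.get_metadata(some_multihash, "", "")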

32 changes: 14 additions & 18 deletions discovery-provider/src/tasks/index.py
@@ -335,15 +335,13 @@ def fetch_cid_metadata(
# first attempt - fetch all CIDs from replica set
try:
cid_metadata.update(
asyncio.run(
update_task.ipfs_client.fetch_metadata_from_gateway_endpoints(
cid_metadata.keys(),
cids_txhash_set,
cid_to_user_id,
user_to_replica_set,
cid_type,
should_fetch_from_replica_set=True,
)
update_task.cid_metadata_client.fetch_metadata_from_gateway_endpoints(
cid_metadata.keys(),
cids_txhash_set,
cid_to_user_id,
user_to_replica_set,
cid_type,
should_fetch_from_replica_set=True,
)
)
except asyncio.TimeoutError:
@@ -353,15 +351,13 @@
# second attempt - fetch missing CIDs from other cnodes
if len(cid_metadata) != len(cids_txhash_set):
cid_metadata.update(
asyncio.run(
update_task.ipfs_client.fetch_metadata_from_gateway_endpoints(
cid_metadata.keys(),
cids_txhash_set,
cid_to_user_id,
user_to_replica_set,
cid_type,
should_fetch_from_replica_set=False,
)
update_task.cid_metadata_client.fetch_metadata_from_gateway_endpoints(
cid_metadata.keys(),
cids_txhash_set,
cid_to_user_id,
user_to_replica_set,
cid_type,
should_fetch_from_replica_set=False,
)
)
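Because old and new lines interleave in this copy, the resulting control flow is easier to read in one piece: fetch every CID from its owner's replica set first, then retry whatever is still missing against the other known content nodes. A condensed sketch, assuming the renamed method returns a dict of CID to metadata and can be called directly now that the asyncio.run wrapper at this call site is gone; the timeout handler body is collapsed in the diff, so its contents are assumed:

import asyncio

# Condensed sketch of the two-pass fetch above; only the call shape comes from the diff.
def fetch_cid_metadata_sketch(
    update_task, cid_metadata, cids_txhash_set, cid_to_user_id, user_to_replica_set, cid_type
):
    client = update_task.cid_metadata_client

    # First attempt: fetch all CIDs from each owning user's replica set.
    try:
        cid_metadata.update(
            client.fetch_metadata_from_gateway_endpoints(
                cid_metadata.keys(),
                cids_txhash_set,
                cid_to_user_id,
                user_to_replica_set,
                cid_type,
                should_fetch_from_replica_set=True,
            )
        )
    except asyncio.TimeoutError:
        # Presumably logged by the real task before falling through to the retry.
        pass

    # Second attempt: fetch any still-missing CIDs from other content nodes.
    if len(cid_metadata) != len(cids_txhash_set):
        cid_metadata.update(
            client.fetch_metadata_from_gateway_endpoints(
                cid_metadata.keys(),
                cids_txhash_set,
                cid_to_user_id,
                user_to_replica_set,
                cid_type,
                should_fetch_from_replica_set=False,
            )
        )
    return cid_metadata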

1 change: 1 addition & 0 deletions discovery-provider/src/tasks/index_blacklist.py
@@ -168,6 +168,7 @@ def index_blocks(self, db, blocks_list):
ipld_blacklist_factory_txs,
block_number,
block_timestamp,
get_contract_addresses(),
)

# Add the block number of the most recently processed ipld block to redis

6 changes: 3 additions & 3 deletions discovery-provider/src/tasks/index_network_peers.py
@@ -58,7 +58,7 @@ def update_network_peers(self):
# Details regarding custom task context can be found in wiki
# Custom Task definition can be found in src/app.py
redis = update_network_peers.redis
ipfs_client = update_network_peers.ipfs_client
cid_metadata_client = update_network_peers.cid_metadata_client
# Define lock acquired boolean
have_lock = False
# Define redis lock object
@@ -87,9 +87,9 @@

logger.info(f"index_network_peers.py | All known peers {all_peers}")
peers_list = list(all_peers)
# Update creator node url list in IPFS Client
# Update creator node url list in CID Metadata Client
# This list of known nodes is used to traverse and retrieve metadata from gateways
ipfs_client.update_cnode_urls(peers_list)
cid_metadata_client.update_cnode_urls(peers_list)
else:
logger.info(
"index_network_peers.py | Failed to acquire update_network_peers"
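The peer-refresh task now hands the discovered content node endpoints to the CID metadata client rather than the old IPFS client. A compact sketch of that hand-off; discover_peers() is a placeholder for the task's real peer discovery, and only the update_cnode_urls call comes from the diff:

def discover_peers():
    # Placeholder for the real peer discovery in update_network_peers.
    return {"https://test-content-node.audius.co"}

def refresh_cid_metadata_peers(task):
    cid_metadata_client = task.cid_metadata_client
    peers_list = list(discover_peers())
    # The client later traverses these known nodes when retrieving metadata from gateways.
    cid_metadata_client.update_cnode_urls(peers_list)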
