Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 0 additions & 132 deletions packages/discovery-provider/src/tasks/entity_manager/entities/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from src.models.users.user_events import UserEvent
from src.models.users.user_payout_wallet_history import UserPayoutWalletHistory
from src.queries.get_balances import enqueue_immediate_balance_refresh
from src.solana.solana_client_manager import SolanaClientManager
from src.solana.solana_helpers import SPL_TOKEN_ID
from src.tasks.entity_manager.utils import (
CHARACTER_LIMIT_USER_BIO,
Expand Down Expand Up @@ -117,7 +116,6 @@ def validate_user_tx(params: ManageEntityParameters):

def validate_user_metadata(
session: Session,
solana_client_manager: SolanaClientManager,
user_record: User,
user_metadata: Dict,
):
Expand Down Expand Up @@ -218,7 +216,6 @@ def create_user(
if user_metadata is not None:
validate_user_metadata(
params.session,
params.solana_client_manager,
user_record,
user_metadata,
)
Expand Down Expand Up @@ -260,7 +257,6 @@ def update_user(

validate_user_metadata(
params.session,
params.solana_client_manager,
user_record,
params.metadata,
)
Expand Down Expand Up @@ -312,8 +308,6 @@ def update_user_metadata(
user_record: User, metadata: Dict, params: ManageEntityParameters
):
session = params.session
redis = params.redis
web3 = params.web3
challenge_event_bus = params.challenge_bus
# Iterate over the user_record keys
user_record_attributes = user_record.get_attributes_dict()
Expand All @@ -335,51 +329,6 @@ def update_user_metadata(
if user_record.verified_with_tiktok:
user_record.tiktok_handle = user_record.handle

if "collectibles" in metadata:
# Dual-write for collectibles data to support legacy indexing
# TODO: Remove after clients updated to use new transactions
# https://linear.app/audius/issue/PAY-3894/remove-collection-and-other-cid-metadata-indexing
collectibles = Collectibles(
user_id=user_record.user_id,
data=metadata["collectibles"],
blockhash=params.event_blockhash,
blocknumber=params.block_number,
)

# We can just add_record here. Outer EM logic will take care
# of deleting previous record if it exists
params.add_record(user_record.user_id, collectibles, EntityType.COLLECTIBLES)

if (
metadata["collectibles"]
and isinstance(metadata["collectibles"], dict)
and metadata["collectibles"].items()
):
user_record.has_collectibles = True
else:
user_record.has_collectibles = False

if "associated_wallets" in metadata:
update_user_associated_wallets(
session,
web3,
redis,
user_record,
metadata["associated_wallets"],
"eth",
params,
)

if "associated_sol_wallets" in metadata:
update_user_associated_wallets(
session,
web3,
redis,
user_record,
metadata["associated_sol_wallets"],
"sol",
params,
)
if "events" in metadata and metadata["events"]:
update_user_events(user_record, metadata["events"], challenge_event_bus, params)

Expand Down Expand Up @@ -509,87 +458,6 @@ def update_user_events(
raise e


def update_user_associated_wallets(
session, web3, redis, user_record, associated_wallets, chain, params
):
"""Updates the user associated wallets table"""
try:
if not isinstance(associated_wallets, dict):
# With malformed associated wallets, we update the associated wallets
# to be an empty dict. This has the effect of generating new rows for the
# already associated wallets and marking them as deleted.
associated_wallets = {}
previous_wallets = []
for _, wallet in params.existing_records[EntityType.ASSOCIATED_WALLET].items():
if wallet.chain == chain and wallet.user_id == user_record.user_id:
previous_wallets.append(wallet)

# Verify the wallet signatures and create the user id to wallet associations
current_wallets = []
for associated_wallet, wallet_metadata in associated_wallets.items():
if "signature" not in wallet_metadata or not isinstance(
wallet_metadata["signature"], str
):
continue
is_valid_signature = validate_signature(
chain,
web3,
user_record.user_id,
associated_wallet,
wallet_metadata["signature"],
)

if is_valid_signature:
# Check that the wallet doesn't already exist
associated_wallet_entry = AssociatedWallet(
user_id=user_record.user_id,
wallet=associated_wallet,
chain=chain,
is_current=True,
is_delete=False,
blocknumber=user_record.blocknumber,
blockhash=user_record.blockhash,
)
current_wallets.append(associated_wallet_entry)

# Create wallet address sets for each list
previous_wallets_set = set([wallet.wallet for wallet in previous_wallets])
current_wallets_set = set()
for wallet in current_wallets:
# Check and throw if current_wallets has duplicate wallet addresses
if wallet.wallet in current_wallets_set:
raise Exception("Duplicate wallet in list of current wallets")

current_wallets_set.add(wallet.wallet)

# Get the net new and removed wallet addresses
added_wallets_set = current_wallets_set - previous_wallets_set
removed_wallets_set = previous_wallets_set - current_wallets_set

# Make the added and removed wallet lists for updates
added_wallets = [
wallet for wallet in current_wallets if wallet.wallet in added_wallets_set
]
removed_wallets = [
wallet
for wallet in previous_wallets
if wallet.wallet in removed_wallets_set
]

if added_wallets or removed_wallets:
for wallet in added_wallets:
session.add(wallet)
for wallet in removed_wallets:
session.delete(wallet)

enqueue_immediate_balance_refresh(redis, [user_record.user_id])
except Exception as e:
logger.error(
f"index.py | users.py | Fatal updating user associated wallets while indexing {e}",
exc_info=True,
)


def add_associated_wallet(
params: ManageEntityParameters,
):
Expand Down