Skip to content

Commit

Permalink
Add UserBalanceChange table and add code to handle tier change notifi…
Browse files Browse the repository at this point in the history
…cations (#1930)

* Add UserBalanceChange table and add code to handle tier change notifications

* Update user balance change calculation
  • Loading branch information
Kyle-Shanks committed Nov 22, 2021
1 parent 8feda2f commit 3dbb5a1
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""create_user_balance_changes_table
Revision ID: 591a66d44e5b
Revises: e2a8aea2e2e1
Create Date: 2021-10-01 16:24:24.444296
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '591a66d44e5b'
down_revision = 'e2a8aea2e2e1'
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"user_balance_changes",
sa.Column("user_id", sa.Integer(), nullable=False, primary_key=True),
sa.Column("blocknumber", sa.Integer(), nullable=False),
sa.Column("current_balance", sa.String(), nullable=False),
sa.Column("previous_balance", sa.String(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False, default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(), nullable=False, default=sa.func.now()),
)


def downgrade():
op.drop_table('user_balance_changes')
2 changes: 2 additions & 0 deletions discovery-provider/src/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
URSMContentNode,
User,
UserBalance,
UserBalanceChange,
UserChallenge,
)
from .related_artist import RelatedArtist
Expand Down Expand Up @@ -111,6 +112,7 @@
"URSMContentNode",
"User",
"UserBalance",
"UserBalanceChange",
"UserChallenge",
"UserBankTransaction",
"UserBankAccount",
Expand Down
58 changes: 39 additions & 19 deletions discovery-provider/src/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ def __repr__(self):
is_current={self.is_current},\
is_delete={self.is_delete},\
updated_at={self.updated_at},\
created_at={self.created_at}>"
created_at={self.created_at})>"


class RepostType(str, enum.Enum):
Expand Down Expand Up @@ -470,7 +470,7 @@ def __repr__(self):
followee_user_id={self.followee_user_id},\
is_current={self.is_current},\
is_delete={self.is_delete},\
created_at={self.created_at}>"
created_at={self.created_at})>"


class SaveType(str, enum.Enum):
Expand Down Expand Up @@ -505,7 +505,7 @@ def __repr__(self):
created_at={self.created_at},\
save_type={self.save_type},\
is_current={self.is_current},\
is_delete={self.is_delete}>"
is_delete={self.is_delete})>"


class Stem(Base):
Expand All @@ -529,7 +529,7 @@ class Remix(Base):

def __repr__(self):
return f"<Remix(parent_track_id={self.parent_track_id},\
child_track_id={self.child_track_id}>"
child_track_id={self.child_track_id})>"


class Play(Base):
Expand Down Expand Up @@ -565,7 +565,7 @@ def __repr__(self):
slot={self.slot}\
signature={self.signature}\
updated_at={self.updated_at}\
created_at={self.created_at}>"
created_at={self.created_at})>"


class AggregatePlays(Base):
Expand All @@ -579,7 +579,7 @@ class AggregatePlays(Base):
def __repr__(self):
return f"<AggregatePlays(\
play_item_id={self.play_item_id},\
count={self.count}>"
count={self.count})>"


class RouteMetrics(Base):
Expand Down Expand Up @@ -763,7 +763,7 @@ def __repr__(self):
return f"<RouteMetricsDayMatview(\
unique_count={self.unique_count},\
count={self.count},\
time={self.time}>"
time={self.time})>"


class RouteMetricsMonthMatview(Base):
Expand All @@ -777,7 +777,7 @@ def __repr__(self):
return f"<RouteMetricsMonthMatview(\
unique_count={self.unique_count},\
count={self.count},\
time={self.time}>"
time={self.time})>"


class RouteMetricsTrailingWeek(Base):
Expand All @@ -790,7 +790,7 @@ class RouteMetricsTrailingWeek(Base):
def __repr__(self):
return f"<RouteMetricsTrailingWeek(\
unique_count={self.unique_count},\
count={self.count}>"
count={self.count})>"


class RouteMetricsTrailingMonth(Base):
Expand All @@ -803,7 +803,7 @@ class RouteMetricsTrailingMonth(Base):
def __repr__(self):
return f"<RouteMetricsTrailingMonth(\
unique_count={self.unique_count},\
count={self.count}>"
count={self.count})>"


class RouteMetricsAllTime(Base):
Expand All @@ -816,7 +816,7 @@ class RouteMetricsAllTime(Base):
def __repr__(self):
return f"<RouteMetricsTrailingAllTime(\
unique_count={self.unique_count},\
count={self.count}>"
count={self.count})>"


class AppMetricsTrailingWeek(Base):
Expand All @@ -828,7 +828,7 @@ class AppMetricsTrailingWeek(Base):
def __repr__(self):
return f"<AppMetricsTrailingWeek(\
name={self.name},\
count={self.count}>"
count={self.count})>"


class AppMetricsTrailingMonth(Base):
Expand All @@ -840,7 +840,7 @@ class AppMetricsTrailingMonth(Base):
def __repr__(self):
return f"<AppMetricsTrailingMonth(\
name={self.name},\
count={self.count}>"
count={self.count})>"


class AppMetricsAllTime(Base):
Expand All @@ -852,7 +852,7 @@ class AppMetricsAllTime(Base):
def __repr__(self):
return f"<AppMetricsAllTime(\
name={self.name},\
count={self.count}>"
count={self.count})>"


class TagTrackUserMatview(Base):
Expand All @@ -868,7 +868,7 @@ def __repr__(self):
return f"<TagTrackUserMatview(\
tag={self.tag},\
track_id={self.track_id},\
owner_id={self.owner_id}>"
owner_id={self.owner_id})>"


class URSMContentNode(Base):
Expand Down Expand Up @@ -927,7 +927,27 @@ def __repr__(self):
balance={self.balance},\
associated_wallets_balance={self.associated_wallets_balance}\
associated_sol_wallets_balance={self.associated_sol_wallets_balance}\
waudio={self.waudio}>"
waudio={self.waudio})>"


class UserBalanceChange(Base):
__tablename__ = "user_balance_changes"

user_id = Column(Integer, nullable=False, primary_key=True)
blocknumber = Column(Integer, ForeignKey("blocks.number"), nullable=False)
current_balance = Column(String, nullable=False)
previous_balance = Column(String, nullable=False)
created_at = Column(DateTime, nullable=False, default=func.now())
updated_at = Column(
DateTime, nullable=False, default=func.now(), onupdate=func.now()
)

def __repr__(self):
return f"<UserBalanceChange(\
user_id={self.user_id},\
blocknumber={self.blocknumber},\
current_balance={self.current_balance},\
previous_balance={self.previous_balance})>"


class WalletChain(str, enum.Enum):
Expand Down Expand Up @@ -980,7 +1000,7 @@ def __repr__(self):
follower_count={self.follower_count},\
following_count={self.following_count},\
repost_count={self.repost_count},\
track_save_count={self.track_save_count}>"
track_save_count={self.track_save_count})>"


class AggregateTrack(Base):
Expand All @@ -996,7 +1016,7 @@ def __repr__(self):
return f"<AggregateTrack(\
track_id={self.track_id},\
repost_count={self.repost_count},\
save_count={self.save_count}>"
save_count={self.save_count})>"


class AggregatePlaylist(Base):
Expand All @@ -1014,7 +1034,7 @@ def __repr__(self):
playlist_id={self.playlist_id},\
is_album={self.is_album},\
repost_count={self.repost_count},\
save_count={self.save_count}>"
save_count={self.save_count})>"


class SkippedTransaction(Base):
Expand Down
43 changes: 43 additions & 0 deletions discovery-provider/src/queries/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Remix,
AggregateUser,
ChallengeDisbursement,
UserBalanceChange,
)
from src.models.milestone import Milestone
from src.queries import response_name_constants as const
Expand Down Expand Up @@ -442,6 +443,48 @@ def notifications():
const.playlists
] = playlist_favorite_dict

#
# Query relevant tier change information
#
balance_change_query = session.query(UserBalanceChange)

# Impose min block number restriction
balance_change_query = balance_change_query.filter(
UserBalanceChange.blocknumber > min_block_number,
UserBalanceChange.blocknumber <= max_block_number,
)

balance_change_results = balance_change_query.all()
tier_change_notifications = []

for entry in balance_change_results:
prev = int(entry.previous_balance)
current = int(entry.current_balance)
# Check for a tier change and add to tier_change_notification
tier = None
if prev < 100000 <= current:
tier = 'platinum'
elif prev < 10000 <= current:
tier = 'gold'
elif prev < 100 <= current:
tier = 'silver'
elif prev < 10 <= current:
tier = 'bronze'

if tier is not None:
tier_change_notif = {
const.notification_type: const.notification_type_tier_change,
const.notification_blocknumber: entry.blocknumber,
const.notification_timestamp: datetime.now(),
const.notification_initiator: entry.user_id,
const.notification_metadata: {
const.notification_tier: tier,
},
}
tier_change_notifications.append(tier_change_notif)

notifications_unsorted.extend(tier_change_notifications)

#
# Query relevant repost information
#
Expand Down
3 changes: 3 additions & 0 deletions discovery-provider/src/queries/response_name_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
notification_type_remix_create = "RemixCreate"
notification_type_remix_cosign = "RemixCosign"
notification_type_playlist_update = "PlaylistUpdate"
notification_type_tier_change = "TierChange"

notification_blocknumber = "blocknumber"
notification_initiator = "initiator"
Expand All @@ -98,6 +99,8 @@
notification_playlist_update_timestamp = "playlist_update_timestamp"
notification_playlist_update_users = "playlist_update_users"

notification_tier = "tier"

# solana notification metadata
solana_notification_type = "type"
solana_notification_type_challenge_reward = "ChallengeReward"
Expand Down
39 changes: 36 additions & 3 deletions discovery-provider/src/tasks/cache_user_balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from sqlalchemy import and_
from sqlalchemy.orm.session import Session
from spl.token.client import Token
from web3 import Web3
from solana.rpc.api import Client
from solana.publickey import PublicKey

from src.utils.session_manager import SessionManager
from src.app import eth_abi_values
from src.tasks.celery_app import celery
from src.models import UserBalance, User, AssociatedWallet, UserBankAccount
from src.models import UserBalance, UserBalanceChange, User, AssociatedWallet, UserBankAccount
from src.queries.get_balances import (
does_user_balance_need_refresh,
IMMEDIATE_REFRESH_REDIS_PREFIX,
Expand Down Expand Up @@ -74,7 +75,6 @@ def get_immediate_refresh_user_ids(redis: Redis) -> List[int]:
redis_user_ids = redis.smembers(IMMEDIATE_REFRESH_REDIS_PREFIX)
return [int(user_id.decode()) for user_id in redis_user_ids]


# *Explanation of user balance caching*
# In an effort to minimize eth calls, we look up users embedded in track metadata once per user,
# and current users (logged in dapp users, who might be changing their balance) on an interval.
Expand Down Expand Up @@ -272,9 +272,42 @@ def refresh_user_ids(
)
waudio_balance = bal_info["result"]["value"]["amount"]


# update the balance on the user model
user_balance = user_balances[user_id]
user_balance.balance = owner_wallet_balance

# Sol balances have 8 decimals, so they need to be increased to 18 to match eth balances
waudio_in_wei = int(waudio_balance) * (10 ** 10)
assoc_sol_balance_in_wei = associated_sol_balance * (10 ** 10)

user_waudio_in_wei = int(user_balance.waudio) * (10 ** 10)
user_assoc_sol_balance_in_wei = int(user_balance.associated_sol_wallets_balance) * (10 ** 10)

# Get values for user balance change
current_total_balance = (
owner_wallet_balance
+ associated_balance
+ waudio_in_wei
+ assoc_sol_balance_in_wei
)
prev_total_balance = (
int(user_balance.balance)
+ int(user_balance.associated_wallets_balance)
+ user_waudio_in_wei
+ user_assoc_sol_balance_in_wei
)

# Write to user_balance_changes table
session.add(
UserBalanceChange(
user_id=user_id,
blocknumber=Web3.eth.blockNumber,
current_balance=str(current_total_balance),
previous_balance=str(prev_total_balance),
)
)

user_balance.balance = str(owner_wallet_balance)
user_balance.associated_wallets_balance = str(associated_balance)
user_balance.waudio = waudio_balance
user_balance.associated_sol_wallets_balance = str(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ async function fetchNotificationMetadata (audius, userIds = [], notifications, f

for (let notification of notifications) {
switch (notification.type) {
case NotificationType.Follow: {
case NotificationType.Follow:
case NotificationType.ChallengeReward:
case NotificationType.TierChange: {
userIdsToFetch.push(
...notification.actions
.map(({ actionEntityId }) => actionEntityId).slice(0, USER_FETCH_LIMIT)
Expand Down

0 comments on commit 3dbb5a1

Please sign in to comment.