Skip to content

Commit

Permalink
[PAY-175][PAY-173][PAY-146] Discovery: Reaction Notifications (#3078)
Browse files Browse the repository at this point in the history
  • Loading branch information
piazzatron committed May 13, 2022
1 parent f07e199 commit ac2a331
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Change reaction column names
Revision ID: 0d2067242dd5
Revises: f11f9e83b28b
Create Date: 2022-05-09 22:03:16.838837
"""
import inspect

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "0d2067242dd5"
down_revision = "f11f9e83b28b"
branch_labels = None
depends_on = None


def column_exists(table_name, column_name):
bind = op.get_context().bind
insp = sa.inspect(bind)
columns = insp.get_columns(table_name)
return any(c["name"] == column_name for c in columns)


def upgrade():
# Handle lack of idempotency for this migration
if column_exists("reactions", "entity_id"):
op.alter_column("reactions", "entity_id", new_column_name="reacted_to")
if column_exists("reactions", "entity_type"):
op.alter_column("reactions", "entity_type", new_column_name="reaction_type")
if column_exists("reactions", "reaction"):
op.alter_column("reactions", "reaction", new_column_name="reaction_value")
op.create_index(
op.f("ix_reactions_reacted_to_reaction_type"),
"reactions",
["reacted_to", "reaction_type"],
unique=False,
info={"if_not_exists": True},
)


def downgrade():
if column_exists("reactions", "reacted_to"):
op.alter_column("reactions", "reacted_to", new_column_name="entity_id")
if column_exists("reactions", "reaction_type"):
op.alter_column("reactions", "reaction_type", new_column_name="entity_type")
if column_exists("reactions", "reaction_value"):
op.alter_column("reactions", "reaction_value", new_column_name="reaction")
op.drop_index(
op.f("ix_reactions_reacted_to_reaction_type"),
table_name="reaction",
info={"if_exists": True},
)
2 changes: 2 additions & 0 deletions discovery-provider/src/api/v1/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from src.api.v1.models.users import ns as models_ns
from src.api.v1.playlists import full_ns as full_playlists_ns
from src.api.v1.playlists import ns as playlists_ns
from src.api.v1.reactions import ns as reactions_ns
from src.api.v1.resolve import ns as resolve_ns
from src.api.v1.search import full_ns as full_search_ns
from src.api.v1.tips import full_ns as full_tips_ns
Expand Down Expand Up @@ -46,3 +47,4 @@ def specs_url(self):
api_v1_full.add_namespace(full_users_ns)
api_v1_full.add_namespace(full_search_ns)
api_v1_full.add_namespace(full_tips_ns)
api_v1_full.add_namespace(reactions_ns)
7 changes: 7 additions & 0 deletions discovery-provider/src/api/v1/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from src.queries.get_challenges import ChallengeResponse
from src.queries.get_support_for_user import SupportResponse
from src.queries.get_undisbursed_challenges import UndisbursedChallengeResponse
from src.queries.reactions import ReactionResponse
from src.utils.config import shared_config
from src.utils.helpers import decode_string_id, encode_int_id
from src.utils.spl_audio import to_wei_string
Expand Down Expand Up @@ -306,6 +307,12 @@ def extend_supporting(support: SupportResponse):
}


def extend_reaction(reaction: ReactionResponse):
new_reaction = reaction.copy()
new_reaction["sender_user_id"] = encode_int_id(reaction["sender_user_id"])
return new_reaction


def extend_tip(tip):
new_tip = tip.copy()
new_tip["amount"] = to_wei_string(tip["amount"])
Expand Down
13 changes: 13 additions & 0 deletions discovery-provider/src/api/v1/models/reactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from flask_restx import fields

from .common import ns

reaction = ns.model(
"reaction",
{
"reaction_value": fields.String(required=True),
"reaction_type": fields.String(required=True),
"sender_user_id": fields.String(required=True),
"reacted_to": fields.String(required=True),
},
)
50 changes: 50 additions & 0 deletions discovery-provider/src/api/v1/reactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from flask_restx import Namespace, Resource, fields, marshal_with, reqparse
from src.api.v1.helpers import (
DescriptiveArgument,
extend_reaction,
make_response,
success_response,
)
from src.api.v1.models.reactions import reaction
from src.queries.reactions import get_reactions
from src.utils.db_session import get_db_read_replica
from src.utils.redis_cache import cache
from src.utils.redis_metrics import record_metrics

ns = Namespace("reactions", description="Reaction related operations")

get_reactions_parser = reqparse.RequestParser(argument_class=DescriptiveArgument)
get_reactions_parser.add_argument(
"type", required=False, description="The type of reactions for which to query."
)
get_reactions_parser.add_argument(
"tx_signatures",
required=True,
action="split",
description="The `reacted_to` transaction id(s) of the reactions in question.",
)

get_reactions_response = make_response(
"reactions", ns, fields.List(fields.Nested(reaction))
)


@ns.route("")
class BulkReactions(Resource):
@record_metrics
@ns.doc(
id="Bulk get Reactions",
description="Gets reactions by transaction_id and type",
responses={200: "Success", 400: "Bad request", 500: "Server error"},
)
@ns.expect(get_reactions_parser)
@marshal_with(get_reactions_response)
@cache(ttl_sec=5)
def get(self):
args = get_reactions_parser.parse_args()
tx_ids, type = args.get("tx_signatures"), args.get("type")
db = get_db_read_replica()
with db.scoped_session() as session:
reactions = get_reactions(session, tx_ids, type)
reactions = list(map(extend_reaction, reactions))
return success_response(reactions)
6 changes: 3 additions & 3 deletions discovery-provider/src/models/reaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ class Reaction(Base, RepresentableMixin):

id = Column(Integer, nullable=False, primary_key=True)
slot = Column(Integer, nullable=False)
reaction = Column(Integer, nullable=False)
reaction_value = Column(Integer, nullable=False)
sender_wallet = Column(String, nullable=False)
entity_type = Column(String, nullable=False)
entity_id = Column(String, nullable=False)
reaction_type = Column(String, nullable=False)
reacted_to = Column(String, nullable=False)
timestamp = Column(DateTime, nullable=False)
tx_signature = Column(String, nullable=True)
30 changes: 29 additions & 1 deletion discovery-provider/src/queries/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from redis import Redis
from sqlalchemy import desc
from src import api_helpers
from src.api.v1.users import User
from src.models import (
AggregateUser,
Block,
Expand All @@ -23,6 +24,7 @@
UserBalanceChange,
UserTip,
)
from src.models.reaction import Reaction
from src.queries import response_name_constants as const
from src.queries.get_prev_track_entries import get_prev_track_entries
from src.queries.query_helpers import (
Expand Down Expand Up @@ -1030,7 +1032,7 @@ def solana_notifications():
Response - Json object w/ the following fields
notifications: Array of notifications of shape:
type: 'ChallengeReward' | 'MilestoneListen' | 'SupporterRankUp'
type: 'ChallengeReward' | 'MilestoneListen' | 'SupporterRankUp' | 'Reaction'
slot: (int) slot number of notification
initiator: (int) the user id that caused this notification
metadata?: (any) additional information about the notification
Expand Down Expand Up @@ -1158,10 +1160,36 @@ def solana_notifications():
}
)

reaction_results: List[Reaction] = (
session.query(Reaction, User.user_id)
.join(User, User.wallet == Reaction.sender_wallet)
.filter(
Reaction.slot >= min_slot_number,
Reaction.slot <= max_slot_number,
User.is_current == True,
)
.all()
)

reactions = []
for (reaction, user_id) in reaction_results:
reactions.append(
{
const.solana_notification_type: const.solana_notification_type_reaction,
const.solana_notification_slot: reaction.slot,
const.notification_initiator: user_id,
const.solana_notification_metadata: {
const.solana_notification_reaction_type: reaction.reaction_type,
const.solana_notification_reaction_reaction_value: reaction.reaction_value,
const.solana_notification_reaction_reacted_to: reaction.reacted_to,
},
}
)
notifications_unsorted.extend(challenge_reward_notifications)
notifications_unsorted.extend(track_listen_milestones)
notifications_unsorted.extend(supporter_rank_ups)
notifications_unsorted.extend(tips)
notifications_unsorted.extend(reactions)

# Final sort
sorted_notifications = sorted(
Expand Down
39 changes: 39 additions & 0 deletions discovery-provider/src/queries/reactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import List, Optional, Tuple, TypedDict

from sqlalchemy.orm.session import Session
from src.models.models import User
from src.models.reaction import Reaction


class ReactionResponse(TypedDict):
reaction_value: int
reaction_type: str
reacted_to: str
sender_user_id: int


def get_reactions(
session: Session, transaction_ids: List[str], type: Optional[str]
) -> List[ReactionResponse]:
filters = [Reaction.reacted_to.in_(transaction_ids), User.is_current == True]
if type:
filters.append(Reaction.reaction_type == type)

results: List[Tuple[Reaction, int]] = (
session.query(Reaction, User.user_id)
.join(User, User.wallet == Reaction.sender_wallet)
.filter(
*filters,
)
.all()
)

return [
{
"reaction_value": r.reaction_value,
"reaction_type": r.reaction_type,
"reacted_to": r.reacted_to,
"sender_user_id": user_id,
}
for (r, user_id) in results
]
7 changes: 7 additions & 0 deletions discovery-provider/src/queries/response_name_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,12 @@
solana_notification_tip_rank = "rank"
solana_notification_tip_amount = "amount"

solana_notification_type_reaction = "reaction"
solana_notification_reaction_type = "reaction_type"
solana_notification_reaction_type_tip = "tip"
solana_notification_reaction_reacted_to = "reacted_to"
solana_notification_reaction_reaction_value = "reaction_value"


# Trending
owner_follower_count = "owner_follower_count"
12 changes: 6 additions & 6 deletions discovery-provider/src/tasks/index_reactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
class ReactionResponse(TypedDict):
id: int
slot: int
reaction: int
reactionValue: int
senderWallet: str
entityId: str
entityType: str
reactedTo: str
reactionType: str
createdAt: str
updatedAt: str

Expand All @@ -46,10 +46,10 @@ def reaction_dict_to_model(reaction: ReactionResponse) -> Union[Reaction, None]:
try:
reaction_model = Reaction(
slot=reaction["slot"],
reaction=reaction["reaction"],
reaction_value=reaction["reactionValue"],
sender_wallet=reaction["senderWallet"],
entity_type=reaction["entityType"],
entity_id=reaction["entityId"],
reaction_type=reaction["reactionType"],
reacted_to=reaction["reactedTo"],
timestamp=cast(datetime, reaction["createdAt"]),
tx_signature=None, # no tx_signature for now
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'

module.exports = {
up: (queryInterface, Sequelize) => {
return queryInterface.renameColumn('Reactions', 'entityId', 'reactedTo')
.then(() => queryInterface.renameColumn('Reactions', 'entityType', 'reactionType'))
.then(() => queryInterface.renameColumn('Reactions', 'reaction', 'reactionValue'))
},

down: (queryInterface, Sequelize) => {
return queryInterface.renameColumn('Reactions', 'reactedTo', 'entityId')
.then(() => queryInterface.renameColumn('Reactions', 'reactionType', 'entityType'))
.then(() => queryInterface.renameColumn('Reactions', 'reactionValue', 'reaction'))
}
}
6 changes: 3 additions & 3 deletions identity-service/src/models/reaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ module.exports = (sequelize, DataTypes) => {
type: DataTypes.INTEGER,
allowNull: false
},
reaction: {
reactionValue: {
type: DataTypes.INTEGER,
allowNull: false
},
senderWallet: {
type: DataTypes.STRING,
allowNull: false
},
entityId: {
reactedTo: {
type: DataTypes.STRING,
allowNull: false
},
entityType: {
reactionType: {
type: DataTypes.STRING,
allowNull: false
},
Expand Down
Loading

0 comments on commit ac2a331

Please sign in to comment.