📝 Walkthrough

This PR implements an on-chain webhook event system with anti-sybil detection. It adds two database migrations to create anti-sybil detection tables.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)

Important: Merge conflicts detected (Beta)
Actionable comments posted: 24
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/alembic/versions/006_add_anti_sybil_tables.py`:
- Around line 91-124: Add a DB-level foreign key for sybil_appeals.flag_id to
enforce referential integrity: update the op.create_table call that defines
table sybil_appeals (the sa.Column("flag_id", ...) entry) to include a
ForeignKey pointing to sybil_flags.id (e.g., use sa.ForeignKey("sybil_flags.id",
ondelete="CASCADE") in the sa.Column) or, if you prefer to keep columns
unchanged, call op.create_foreign_key after creating sybil_appeals to reference
sybil_flags.id; ensure the constraint name is unique and choose an appropriate
ondelete behavior (CASCADE or RESTRICT) for your deletion semantics.
- Around line 148-167: The migration creates a redundant index for
wallet_funding_map.wallet: since the column is declared unique in the
op.create_table call (wallet_funding_map, column "wallet" with unique=True),
drop the explicit op.create_index("ix_wallet_funding_map_wallet",
"wallet_funding_map", ["wallet"]) call (the index name
ix_wallet_funding_map_wallet) to avoid duplicating the unique index; leave the
other indexes (ix_wallet_funding_map_funding_source and
ix_wallet_funding_map_user_id) intact.
In `@backend/alembic/versions/007_outbound_webhook_batching.py`:
- Around line 18-42: The migration uses PostgreSQL-only UUID and server-default
generation; replace postgresql.UUID(as_uuid=True) usages in upgrade() for the
"outbound_webhook_queue" table (columns "id" and "user_id") with the portable
GUID type from app.database (import GUID) and remove
server_default=sa.text("gen_random_uuid()") for "id" so UUIDs are generated in
application code (e.g., uuid.uuid4()) instead of DB defaults; keep the table
name outbound_webhook_queue and other columns as-is, and ensure any DB-specific
server_default for UUIDs is removed to restore SQLite compatibility.
In `@backend/app/api/anti_sybil.py`:
- Around line 205-216: The admin_list_appeals endpoint is comparing the string
parameter appeal_status to an Enum DB column (SybilAppealTable.status); change
appeal_status from str to Optional[AppealStatus] (or validate/convert the
incoming string to the AppealStatus enum) so FastAPI/Pydantic will enforce the
enum type and the DB comparison uses an enum value; update the function
signature in admin_list_appeals, add the AppealStatus import, and adjust any
callers/tests that pass raw strings or handle None accordingly.
- Around line 86-89: The db parameter in the my_flags endpoint lacks a type
annotation; update the my_flags signature to type-hint db as AsyncSession (from
sqlalchemy.ext.asyncio) and ensure AsyncSession is imported at the top of the
file; keep the dependency injection via db=Depends(get_db) but change the
annotation so the function reads with db: AsyncSession = Depends(get_db) and
maintain the return type of list[SybilFlagResponse].
- Around line 130-147: The admin_list_flags handler compares a plain str to an
Enum column (SybilFlagTable.flag_type), causing mismatches; fix by using the
FlagType enum for the query: import the FlagType enum and either change the
flag_type parameter to Optional[FlagType] (so FastAPI validates/coerces it) or,
if keeping a str param, convert/validate it with FlagType(flag_type) before
using it in the q.where(...) expression so the comparison is enum-to-enum
(reference symbols: admin_list_flags, flag_type param, SybilFlagTable.flag_type,
FlagType).
In `@backend/app/api/contributor_webhooks.py`:
- Around line 134-141: The endpoint get_webhook_stats currently calls
ContributorWebhookService.get_delivery_stats(webhook_id) without verifying
ownership or handling missing webhooks; update the endpoint to pass the
authenticated user_id (from get_current_user_id) into the service call (e.g.,
service.get_delivery_stats(webhook_id, user_id)), then update the
ContributorWebhookService.get_delivery_stats signature/implementation to accept
user_id, check that the webhook belongs to that user (raise WebhookNotFoundError
or a permission error if not found/not owned) before querying logs, and add
exception handling in get_webhook_stats to catch WebhookNotFoundError and raise
an HTTPException with status_code=404 so unauthorized users cannot view other
users' stats and missing webhooks return 404.
- Around line 157-165: The endpoint function test_webhook currently doesn't
handle WebhookNotFoundError thrown by ContributorWebhookService.test_webhook
causing a 500 instead of the documented 404; update test_webhook to catch
WebhookNotFoundError (from wherever it is defined) around the await
service.test_webhook(...) call and raise an HTTPException(status_code=404,
detail=...) or reformat the response accordingly so the OpenAPI 404 is honored;
ensure you reference test_webhook and ContributorWebhookService.test_webhook
when making the change so the handler is added in the correct function.
In `@backend/app/api/webhooks/solana.py`:
- Around line 44-109: Add idempotency for incoming transactions by checking and
recording processed transaction signatures before dispatching events: use the
`signature` extracted from each `tx` in the loop in
`backend/app/api/webhooks/solana.py`, query a persistent or fast cache
(DB/Redis) to see if the signature was already processed and skip/continue if
so, and after successfully calling `wh_service.dispatch_event` for the
tokenTransfers store the signature as processed (with an expiry if using cache).
Alternatively or additionally, make `wh_service.dispatch_event` idempotent by
having it ignore duplicate `tx_signature` values, but ensure at least one of
these mechanisms is implemented to prevent dispatching duplicate events for the
same `signature`.
- Around line 56-81: The code unconditionally dispatches both escrow and stake
events in the transfer handling (see wh_service.dispatch_event calls for
"escrow.locked", "stake.deposited", "escrow.released", "stake.withdrawn")
causing duplicate/incorrect events; update the transfer handling logic to
discriminate treasury escrow vs staking actions by inspecting transaction
metadata (e.g., memo field), specific mint id, sender/recipient patterns, or
other heuristics available in the webhook payload (use to_user, from_user, mint,
amount, signature, slot), and only call the appropriate dispatch_event for that
case; add a brief comment in the transfer-handling block explaining the chosen
discriminator and fallback behavior (log and skip ambiguous cases) so
stakeholders know why only one of the events is emitted.
- Around line 22-37: The helius_webhook handler lacks signature verification and
improperly returns 200 on JSON parse errors; fix by reading the raw request body
(await request.body()), obtain the X-Helius-Signature header, compute the
HMAC-SHA256 (or the algorithm Helius uses) over the raw body using the
configured webhook secret (e.g., HELIUS_WEBHOOK_SECRET), perform a constant-time
comparison against the header and return an HTTP 401/403 JSON response if
verification fails, then parse JSON only after verification; also change the
JSON parse error path in helius_webhook to return an HTTP 400 Bad Request JSON
response instead of HTTP 200.
In `@backend/app/models/anti_sybil.py`:
- Line 62: The PostgreSQL-only server_default using "'{}'::jsonb" on the details
Column breaks SQLite table creation; update the Column definition in the
AntiSybil model (the details = Column(...)) to use a dialect-compatible default
such as sa.text("'{}'") (or remove the cast) so the default works for both
PostgreSQL JSONB and SQLite JSON/text variants, ensuring the
JSONB().with_variant(sa.JSON, "sqlite") column retains cross-dialect
compatibility.
- Around line 79-102: SybilAppealTable.flag_id is missing a ForeignKey
constraint; update the flag_id Column in the SybilAppealTable class to reference
sybil_flags.id (e.g., add ForeignKey("sybil_flags.id", ondelete="CASCADE" or
desired behavior)) so DB-level referential integrity prevents orphaned appeals,
and ensure sqlalchemy.ForeignKey is imported where Column, GUID, SAEnum, etc.
are defined; keep the existing nullable=False and index=True settings.
In `@backend/app/models/contributor_webhook.py`:
- Around line 117-120: The module raises NameError because sa is not imported
where OutboundWebhookLogDB defines webhook_id using sa.ForeignKey; fix by adding
the missing import (e.g., add "import sqlalchemy as sa" near the top imports or
alternatively import ForeignKey directly via "from sqlalchemy import
ForeignKey") so that the webhook_id Column(...) call in OutboundWebhookLogDB can
resolve sa.ForeignKey.
In `@backend/app/services/anti_sybil_service.py`:
- Around line 529-600: The resolve_appeal and resolve_flag helpers currently
allow overwriting prior moderation data; update resolve_appeal (function
resolve_appeal) to first check the existing SybilAppealTable.status and reject
if it is not AppealStatus.PENDING (raise an error), and validate the incoming
status is a terminal value (only allow APPROVED or REJECTED, disallow setting
back to PENDING); similarly update resolve_flag (function resolve_flag) to check
SybilFlagTable.resolved and raise if already True to prevent re-resolving.
Preserve the existing audit_event calls after successful commits and ensure
reviewer/resolver fields, notes, and timestamps are only set when the checks
pass.
- Around line 282-303: record_wallet_funding currently only inserts and never
updates WalletFundingMapTable, so wallets first saved with funding_source=None
(or wrong user_id/funding_source) never get corrected and check_wallet_cluster()
sees stale data; change record_wallet_funding to perform an upsert: after
selecting the existing row from WalletFundingMapTable (using wallet) update its
funding_source and user_id when the incoming values differ (including updating
None→value) and commit, otherwise insert as today; apply the same upsert logic
to the analogous function/block referenced (the later block handling user/funder
mapping) so corrections and late-discovered funding sources are persisted.
In `@backend/app/services/contributor_webhook_service.py`:
- Around line 329-379: get_delivery_stats currently accepts only webhook_id and
reads OutboundWebhookLogDB rows directly, creating an authorization gap unlike
test_webhook which enforces ownership; change get_delivery_stats to accept the
caller's user_id (or fetch the ContributorWebhook record and verify its owner)
and explicitly filter OutboundWebhookLogDB queries by the verified webhook owner
(or raise a permission error) before returning WebhookDeliveryStats; ensure you
reuse the same UUID(webhook_id) conversion and apply the ownership check prior
to counting/slicing history so callers cannot read other subscribers' delivery
history.
- Around line 205-221: periodic_batch_dispatch currently selects unprocessed
OutboundWebhookQueueDB rows, sets processed=True and commits before any HTTP
delivery, which causes lost or duplicated deliveries; instead perform an atomic
claim for work (e.g., using an UPDATE ... WHERE processed=False ... RETURNING
id,... or SELECT ... FOR UPDATE SKIP LOCKED inside a transaction) so rows are
marked/locked as claimed in the same DB operation/transaction, then commit only
after successful delivery and mark processed=True (or delete/archival)
post-delivery; update the logic in periodic_batch_dispatch (and the similar
blocks around the referenced code paths) to use a DB-level claim pattern and
avoid committing before HTTP delivery.
- Around line 329-335: The service methods (e.g., get_delivery_stats) currently
call UUID(webhook_id) directly on an unvalidated string which raises ValueError
and produces 500s for malformed IDs; wrap the conversion in a try/except or
accept a UUID-typed parameter at the API layer and return a controlled 400 on
failure: import uuid and replace UUID(webhook_id) with a safe parse (try: uid =
uuid.UUID(webhook_id) except ValueError: raise HTTPException(status_code=400,
detail="invalid webhook_id")) and use uid in the query (same fix for the other
method referenced around lines 381-396); this ensures proper 4xx error handling
and keeps the UUID object typed for the OutboundWebhookLogDB.where() call.
- Around line 57-81: The file is missing imports for WebhookPayload and the uuid
module causing NameError in _build_single_payload and _build_batch_payload;
update the import block to import WebhookPayload (from
app.models.contributor_webhook) and either add import uuid or replace uses of
uuid.UUID/getnode/uuid4 with the already-imported UUID where appropriate; ensure
_build_single_payload and _build_batch_payload reference the newly-imported
WebhookPayload and that batch_id generation uses a valid uuid call (e.g.,
uuid.getnode()/uuid.UUID()/uuid.uuid4()) after adding the uuid module import.
- Around line 245-252: Concurrent calls to _deliver_batch() use the shared
self._db session causing transaction corruption; change each delivery task to
use its own AsyncSession instead of the shared session by instantiating a new
session from your async_sessionmaker (e.g., self._async_sessionmaker()) inside
the task creation loop and pass that session into _deliver_batch and
_record_delivery (update those methods to accept a session parameter and use it
for execute/add/commit), ensuring each task commits on its own session and
closes it when done; if refactoring is not possible, serialize DB access by
running deliveries sequentially or protect DB-mutating calls with an
asyncio.Lock in _record_delivery.
In `@backend/tests/test_anti_sybil.py`:
- Around line 665-676: Replace the deprecated
asyncio.get_event_loop().run_until_complete(_init()) call with
asyncio.run(_init()) and ensure test isolation by adding cleanup to drop the
tables after the test: call asyncio.run to create the tables via the existing
_init() (which uses engine and Base), run the test client
(TestClient(test_app,...)), then call a teardown coroutine that uses async with
engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) to remove
created tables (or use a dedicated test-only engine/fixture that creates and
disposes its own Base metadata) so the test does not pollute the shared
in-memory DB.
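Of the findings above, the Helius signature-verification fix benefits most from a concrete illustration. Below is a minimal stdlib sketch, assuming the provider sends a hex-encoded HMAC-SHA256 of the raw request body in a signature header; the secret name and the exact signing scheme are assumptions to verify against the Helius documentation, not confirmed details of this codebase.

```python
import hashlib
import hmac

# Hypothetical secret for illustration; the real service would load it from
# configuration (e.g. a HELIUS_WEBHOOK_SECRET setting), never hard-code it.
WEBHOOK_SECRET = b"test-secret"

def verify_signature(raw_body: bytes, signature_header: str) -> bool:
    """Constant-time comparison of an HMAC-SHA256 hex digest over the raw body."""
    expected = hmac.new(WEBHOOK_SECRET, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)

body = b'{"type": "TRANSFER"}'
good_sig = hmac.new(WEBHOOK_SECRET, body, hashlib.sha256).hexdigest()
print(verify_signature(body, good_sig))    # True
print(verify_signature(body, "deadbeef"))  # False
```

In the endpoint, the order matters: read `await request.body()` first, verify against the header and return 401/403 on mismatch, and only then `json.loads` the raw bytes, returning 400 (not 200) on parse errors.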
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 851a0bcd-283f-42e9-999d-6d0093790418
📒 Files selected for processing (19)
- backend/alembic/versions/006_add_anti_sybil_tables.py
- backend/alembic/versions/007_outbound_webhook_batching.py
- backend/app/api/anti_sybil.py
- backend/app/api/bounties.py
- backend/app/api/contributor_webhooks.py
- backend/app/api/webhooks/solana.py
- backend/app/core/anti_sybil_config.py
- backend/app/database.py
- backend/app/main.py
- backend/app/models/anti_sybil.py
- backend/app/models/contributor_webhook.py
- backend/app/services/anti_sybil_service.py
- backend/app/services/auth_service.py
- backend/app/services/contributor_webhook_service.py
- backend/app/services/reputation_service.py
- backend/docs/webhooks_catalog.md
- backend/tests/test_anti_sybil.py
- frontend/src/components/ContributorDashboard.tsx
- frontend/src/components/webhooks/WebhookSettings.tsx
```python
# sybil_appeals
op.create_table(
    "sybil_appeals",
    sa.Column("id", sa.CHAR(36), primary_key=True),
    sa.Column("user_id", sa.String(100), nullable=False),
    sa.Column("flag_id", sa.CHAR(36), nullable=False),
    sa.Column("reason", sa.Text(), nullable=False),
    sa.Column(
        "status",
        sa.Enum(
            "pending",
            "approved",
            "rejected",
            name="appeal_status_enum",
            create_constraint=False,
        ),
        nullable=False,
        server_default="pending",
    ),
    sa.Column("reviewer_note", sa.Text(), nullable=True),
    sa.Column("reviewed_by", sa.String(100), nullable=True),
    sa.Column(
        "created_at",
        sa.DateTime(timezone=True),
        nullable=False,
        server_default=sa.func.now(),
    ),
    sa.Column("resolved_at", sa.DateTime(timezone=True), nullable=True),
)
op.create_index("ix_sybil_appeals_user_id", "sybil_appeals", ["user_id"])
op.create_index("ix_sybil_appeals_flag_id", "sybil_appeals", ["flag_id"])
op.create_index(
    "ix_sybil_appeals_user_flag", "sybil_appeals", ["user_id", "flag_id"]
)
```
🛠️ Refactor suggestion | 🟠 Major
Missing ForeignKey constraint on sybil_appeals.flag_id.
The flag_id column in sybil_appeals (line 96) lacks a ForeignKey constraint to sybil_flags.id. Without this:
- No referential integrity at the database level
- Appeals can reference non-existent flags
- Orphaned appeals remain when flags are deleted
♻️ Proposed fix: Add ForeignKey
```diff
-    sa.Column("flag_id", sa.CHAR(36), nullable=False),
+    sa.Column("flag_id", sa.CHAR(36), sa.ForeignKey("sybil_flags.id", ondelete="CASCADE"), nullable=False),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/alembic/versions/006_add_anti_sybil_tables.py` around lines 91 - 124,
Add a DB-level foreign key for sybil_appeals.flag_id to enforce referential
integrity: update the op.create_table call that defines table sybil_appeals (the
sa.Column("flag_id", ...) entry) to include a ForeignKey pointing to
sybil_flags.id (e.g., use sa.ForeignKey("sybil_flags.id", ondelete="CASCADE") in
the sa.Column) or, if you prefer to keep columns unchanged, call
op.create_foreign_key after creating sybil_appeals to reference sybil_flags.id;
ensure the constraint name is unique and choose an appropriate ondelete behavior
(CASCADE or RESTRICT) for your deletion semantics.
```python
op.create_table(
    "wallet_funding_map",
    sa.Column("id", sa.CHAR(36), primary_key=True),
    sa.Column("wallet", sa.String(64), nullable=False, unique=True),
    sa.Column("funding_source", sa.String(64), nullable=True),
    sa.Column("user_id", sa.String(100), nullable=False),
    sa.Column(
        "created_at",
        sa.DateTime(timezone=True),
        nullable=False,
        server_default=sa.func.now(),
    ),
)
op.create_index("ix_wallet_funding_map_wallet", "wallet_funding_map", ["wallet"])
op.create_index(
    "ix_wallet_funding_map_funding_source",
    "wallet_funding_map",
    ["funding_source"],
)
op.create_index("ix_wallet_funding_map_user_id", "wallet_funding_map", ["user_id"])
```
🧹 Nitpick | 🔵 Trivial
Redundant index on wallet_funding_map.wallet column.
The wallet column is defined with unique=True (line 151), which automatically creates a unique index. Then line 161 explicitly creates another index ix_wallet_funding_map_wallet on the same column. This is redundant and wastes storage.
♻️ Proposed fix: Remove redundant index
Just remove line 161:

```diff
-    op.create_index("ix_wallet_funding_map_wallet", "wallet_funding_map", ["wallet"])
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/alembic/versions/006_add_anti_sybil_tables.py` around lines 148 -
167, The migration creates a redundant index for wallet_funding_map.wallet:
since the column is declared unique in the op.create_table call
(wallet_funding_map, column "wallet" with unique=True), drop the explicit
op.create_index("ix_wallet_funding_map_wallet", "wallet_funding_map",
["wallet"]) call (the index name ix_wallet_funding_map_wallet) to avoid
duplicating the unique index; leave the other indexes
(ix_wallet_funding_map_funding_source and ix_wallet_funding_map_user_id) intact.
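The duplication is easy to confirm with stdlib `sqlite3`: a UNIQUE column already gets an automatic index, so the explicit `create_index` call produces a second index over the same column (PostgreSQL similarly backs UNIQUE constraints with an implicit index). A minimal demonstration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# UNIQUE already creates a sqlite_autoindex behind the scenes.
conn.execute("CREATE TABLE wallet_funding_map (wallet TEXT UNIQUE)")
# Mirrors the redundant op.create_index call from the migration:
conn.execute("CREATE INDEX ix_wallet_funding_map_wallet ON wallet_funding_map (wallet)")

index_names = [row[1] for row in conn.execute("PRAGMA index_list(wallet_funding_map)")]
print(len(index_names))  # 2 -- the explicit index plus the autoindex from UNIQUE
```

Both indexes cover the identical column, so every write pays the maintenance cost twice for no query benefit.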
```python
def upgrade() -> None:
    # outbound_webhook_queue
    op.create_table(
        "outbound_webhook_queue",
        sa.Column(
            "id",
            postgresql.UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("event_type", sa.String(50), nullable=False),
        sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("payload", sa.Text, nullable=False),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("now()"),
        ),
        sa.Column("processed", sa.Boolean(), nullable=False, server_default=sa.text("false")),
    )
    op.create_index("ix_outbound_webhook_queue_user_id", "outbound_webhook_queue", ["user_id"])
    op.create_index("ix_outbound_webhook_queue_processed", "outbound_webhook_queue", ["processed"])
    op.create_index("ix_outbound_webhook_queue_created_at", "outbound_webhook_queue", ["created_at"])
```
PostgreSQL-specific constructs break SQLite test compatibility.
This migration uses several PostgreSQL-only features:
- `postgresql.UUID(as_uuid=True)` (lines 24, 29, 48, 54): not supported on SQLite
- `server_default=sa.text("gen_random_uuid()")` (lines 26, 50): a PostgreSQL 13+ function that doesn't exist in SQLite
The existing codebase uses GUID type from app.database (see backend/app/database.py lines 69-126) which handles cross-database UUID compatibility. The anti-sybil migration (006) correctly uses sa.CHAR(36) for UUIDs.
🐛 Proposed fix: Use portable UUID handling
```diff
-from sqlalchemy.dialects import postgresql
-
 def upgrade() -> None:
     # outbound_webhook_queue
     op.create_table(
         "outbound_webhook_queue",
-        sa.Column(
-            "id",
-            postgresql.UUID(as_uuid=True),
-            primary_key=True,
-            server_default=sa.text("gen_random_uuid()"),
-        ),
+        sa.Column("id", sa.CHAR(36), primary_key=True),
         sa.Column("event_type", sa.String(50), nullable=False),
-        sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("user_id", sa.CHAR(36), nullable=True),
```

UUID generation should be handled in application code (Python's `uuid.uuid4()`) rather than database defaults for portability.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/alembic/versions/007_outbound_webhook_batching.py` around lines 18 -
42, The migration uses PostgreSQL-only UUID and server-default generation;
replace postgresql.UUID(as_uuid=True) usages in upgrade() for the
"outbound_webhook_queue" table (columns "id" and "user_id") with the portable
GUID type from app.database (import GUID) and remove
server_default=sa.text("gen_random_uuid()") for "id" so UUIDs are generated in
application code (e.g., uuid.uuid4()) instead of DB defaults; keep the table
name outbound_webhook_queue and other columns as-is, and ensure any DB-specific
server_default for UUIDs is removed to restore SQLite compatibility.
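The portable approach is straightforward to demonstrate with stdlib `sqlite3`: generate the UUID in application code and store it as a 36-character string, which works identically on SQLite and PostgreSQL (the `GUID` type in `app.database` wraps the same idea). Table and column names below follow the migration; everything else is illustrative.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbound_webhook_queue ("
    "  id CHAR(36) PRIMARY KEY,"
    "  event_type VARCHAR(50) NOT NULL)"
)

# App-side default replaces server_default=sa.text("gen_random_uuid()"):
row_id = str(uuid.uuid4())
conn.execute(
    "INSERT INTO outbound_webhook_queue VALUES (?, ?)",
    (row_id, "bounty.created"),
)

stored = conn.execute("SELECT id FROM outbound_webhook_queue").fetchone()[0]
print(len(stored))                              # 36
print(uuid.UUID(stored) == uuid.UUID(row_id))   # True -- round-trips losslessly
```

Since no DB function is involved, the same model code runs unchanged against the SQLite test database and production PostgreSQL.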
```python
async def my_flags(
    user_id: str = Depends(get_current_user_id),
    db=Depends(get_db),
) -> list[SybilFlagResponse]:
```
🧹 Nitpick | 🔵 Trivial
Missing type annotation for db parameter.
The db parameter lacks a type hint. For consistency with other endpoints and IDE support:
```diff
 async def my_flags(
     user_id: str = Depends(get_current_user_id),
-    db=Depends(get_db),
+    db: AsyncSession = Depends(get_db),
 ) -> list[SybilFlagResponse]:
```

Add the import at the top if needed: `from sqlalchemy.ext.asyncio import AsyncSession`
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/anti_sybil.py` around lines 86 - 89, The db parameter in the
my_flags endpoint lacks a type annotation; update the my_flags signature to
type-hint db as AsyncSession (from sqlalchemy.ext.asyncio) and ensure
AsyncSession is imported at the top of the file; keep the dependency injection
via db=Depends(get_db) but change the annotation so the function reads with db:
AsyncSession = Depends(get_db) and maintain the return type of
list[SybilFlagResponse].
```python
async def admin_list_flags(
    user_id: Optional[str] = Query(None),
    flag_type: Optional[str] = Query(None),
    resolved: Optional[bool] = Query(None),
    skip: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1, le=200)] = 50,
    actor: str = Depends(require_admin),
    db=Depends(get_db),
) -> list[SybilFlagResponse]:
    """List sybil flags with optional filters."""
    q = select(SybilFlagTable).order_by(SybilFlagTable.created_at.desc())
    if user_id:
        q = q.where(SybilFlagTable.user_id == user_id)
    if flag_type:
        q = q.where(SybilFlagTable.flag_type == flag_type)
    if resolved is not None:
        q = q.where(SybilFlagTable.resolved.is_(resolved))
    q = q.offset(skip).limit(limit)
```
Enum filter comparison may fail: string vs Enum type mismatch.
The flag_type parameter is a str (line 132), but SybilFlagTable.flag_type is an Enum column. The comparison SybilFlagTable.flag_type == flag_type (line 144) compares an Enum column to a plain string, which may not match correctly depending on SQLAlchemy's handling.
Consider validating and converting the string to FlagType enum:
♻️ Proposed fix
```diff
+from app.models.anti_sybil import FlagType
+
 @router.get(
     "/admin/sybil/flags",
     ...
 )
 async def admin_list_flags(
     user_id: Optional[str] = Query(None),
-    flag_type: Optional[str] = Query(None),
+    flag_type: Optional[FlagType] = Query(None),
     ...
 ):
     ...
     if flag_type:
-        q = q.where(SybilFlagTable.flag_type == flag_type)
+        q = q.where(SybilFlagTable.flag_type == flag_type)  # Now type-safe
```

Using FlagType as the parameter type provides automatic validation and proper enum comparison.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/anti_sybil.py` around lines 130 - 147, The admin_list_flags
handler compares a plain str to an Enum column (SybilFlagTable.flag_type),
causing mismatches; fix by using the FlagType enum for the query: import the
FlagType enum and either change the flag_type parameter to Optional[FlagType]
(so FastAPI validates/coerces it) or, if keeping a str param, convert/validate
it with FlagType(flag_type) before using it in the q.where(...) expression so
the comparison is enum-to-enum (reference symbols: admin_list_flags, flag_type
param, SybilFlagTable.flag_type, FlagType).
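The validate-and-coerce path the prompt describes can be sketched without FastAPI. The `FlagType` members below are hypothetical placeholders (the real enum lives in `app.models.anti_sybil`); the point is that `FlagType(raw)` either yields a proper enum member for the query or raises, which FastAPI would surface as a validation error rather than a silent non-match.

```python
from enum import Enum
from typing import Optional

class FlagType(str, Enum):
    # Hypothetical members for illustration only.
    WALLET_CLUSTER = "wallet_cluster"
    VELOCITY = "velocity"

def parse_flag_type(raw: Optional[str]) -> Optional[FlagType]:
    """Coerce a raw query-string value to the enum, as Optional[FlagType] would."""
    if raw is None:
        return None
    try:
        return FlagType(raw)
    except ValueError as exc:
        # FastAPI would reject this at the boundary with a 422 validation error.
        raise ValueError(f"invalid flag_type: {raw!r}") from exc

print(parse_flag_type("velocity") is FlagType.VELOCITY)  # True
print(parse_flag_type(None))                             # None
```

Declaring the parameter as `Optional[FlagType]` gets this behavior for free, plus the allowed values appear in the OpenAPI schema.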
```python
result = await self._db.execute(
    select(OutboundWebhookQueueDB)
    .where(OutboundWebhookQueueDB.processed.is_(False))
    .order_by(OutboundWebhookQueueDB.created_at.asc())
)
queue_items = result.scalars().all()
if not queue_items:
    return

# 2. Mark as processed immediately to prevent double-processing
item_ids = [item.id for item in queue_items]
await self._db.execute(
    update(OutboundWebhookQueueDB)
    .where(OutboundWebhookQueueDB.id.in_(item_ids))
    .values(processed=True)
)
await self._db.commit()
```
Queue items are acknowledged before delivery, so this path can both lose and duplicate events.
periodic_batch_dispatch() marks every fetched queue row processed=True and commits before any HTTP delivery happens. If the process crashes, gets cancelled, or a batch task fails after Line 221, those events are already acknowledged and will never be retried. The same SELECT unprocessed → UPDATE processed split is also not an atomic claim, so if more than one dispatcher loop runs at the same time, both workers can read the same rows and deliver duplicates. That is a reliability break in the new batched-delivery pipeline.
Also applies to: 251-252, 439-451
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/contributor_webhook_service.py` around lines 205 - 221,
periodic_batch_dispatch currently selects unprocessed OutboundWebhookQueueDB
rows, sets processed=True and commits before any HTTP delivery, which causes
lost or duplicated deliveries; instead perform an atomic claim for work (e.g.,
using an UPDATE ... WHERE processed=False ... RETURNING id,... or SELECT ... FOR
UPDATE SKIP LOCKED inside a transaction) so rows are marked/locked as claimed in
the same DB operation/transaction, then commit only after successful delivery
and mark processed=True (or delete/archival) post-delivery; update the logic in
periodic_batch_dispatch (and the similar blocks around the referenced code
paths) to use a DB-level claim pattern and avoid committing before HTTP
delivery.
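The claim-then-ack shape can be modeled with stdlib `sqlite3`. On PostgreSQL the claim would be a single `UPDATE ... WHERE processed = false RETURNING ...` or `SELECT ... FOR UPDATE SKIP LOCKED`; here a hypothetical `claimed_by` token column stands in for the atomic claim, and the schema is a simplified sketch rather than the service's real table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbound_webhook_queue ("
    "  id INTEGER PRIMARY KEY,"
    "  payload TEXT NOT NULL,"
    "  processed INTEGER NOT NULL DEFAULT 0,"
    "  claimed_by TEXT)"
)
conn.executemany(
    "INSERT INTO outbound_webhook_queue (payload) VALUES (?)",
    [("a",), ("b",), ("c",)],
)

def claim_batch(worker: str) -> list:
    """Atomically claim unclaimed rows: one UPDATE flips them to this worker."""
    conn.execute(
        "UPDATE outbound_webhook_queue SET claimed_by = ? "
        "WHERE processed = 0 AND claimed_by IS NULL",
        (worker,),
    )
    return conn.execute(
        "SELECT id, payload FROM outbound_webhook_queue "
        "WHERE claimed_by = ? AND processed = 0",
        (worker,),
    ).fetchall()

def ack(ids: list) -> None:
    """Mark processed only AFTER successful HTTP delivery; on failure the
    worker would clear claimed_by instead, making the rows retryable."""
    conn.executemany(
        "UPDATE outbound_webhook_queue SET processed = 1 WHERE id = ?",
        [(i,) for i in ids],
    )

batch = claim_batch("worker-1")
print(len(batch))               # 3
print(claim_batch("worker-2"))  # [] -- a concurrent dispatcher claims nothing
ack([row_id for row_id, _ in batch])  # only after delivery succeeds
```

Because acknowledgment happens after delivery, a crash between claim and ack leaves rows retryable (once the stale claim is released) instead of silently dropping events.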
```python
tasks = []
for wh in webhooks:
    events = webhook_to_events.get(wh.id, [])
    if events:
        tasks.append(self._deliver_batch(wh, events))

tasks = [self._deliver_with_retry(wh, event, payload_bytes) for wh in webhooks]
if tasks:
    await asyncio.gather(*tasks, return_exceptions=True)
```
🧩 Analysis chain

🏁 Scripts executed (repository: SolFoundry/solfoundry):

```shell
nl -ba backend/app/services/contributor_webhook_service.py | sed -n '193,330p'

# First, let's get the file size and then read the relevant sections
wc -l backend/app/services/contributor_webhook_service.py

# Read lines 240-330 to see the full context of the concern
sed -n '240,330p' backend/app/services/contributor_webhook_service.py | cat -n

# Also check the _deliver_batch method and any database session initialization
sed -n '1,100p' backend/app/services/contributor_webhook_service.py | cat -n

grep -n "_record_delivery" backend/app/services/contributor_webhook_service.py

# Read the _record_delivery method implementation
sed -n '330,451p' backend/app/services/contributor_webhook_service.py | cat -n

# Find database configuration
fd -t f "database" backend/ | head -20

# Check the get_db_session function
grep -r "def get_db_session" backend/

# Read database configuration
find backend -name "database.py" -o -name "*db*" -type f | head -5

cat backend/app/database.py

# Find the periodic_batch_dispatch method in the service
sed -n '200,250p' backend/app/services/contributor_webhook_service.py | cat -n
```
Multiple concurrent webhook delivery tasks share a single database session, causing transaction corruption.
Lines 245-252 create concurrent _deliver_batch() tasks via asyncio.gather(), but all tasks mutate delivery records through the shared self._db session. Each task calls _record_delivery() (which executes SQL and commits) and then directly executes self._db.add() and self._db.commit(). SQLAlchemy's AsyncSession is not safe for concurrent access—the interleaved execute(), add(), and commit() calls will corrupt transaction state or fail with "Session is already executing" errors.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/contributor_webhook_service.py` around lines 245 - 252,
Concurrent calls to _deliver_batch() use the shared self._db session causing
transaction corruption; change each delivery task to use its own AsyncSession
instead of the shared session by instantiating a new session from your
async_sessionmaker (e.g., self._async_sessionmaker()) inside the task creation
loop and pass that session into _deliver_batch and _record_delivery (update
those methods to accept a session parameter and use it for execute/add/commit),
ensuring each task commits on its own session and closes it when done; if
refactoring is not possible, serialize DB access by running deliveries
sequentially or protect DB-mutating calls with an asyncio.Lock in
_record_delivery.
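The session-per-task shape the prompt describes can be sketched as follows. This is a minimal stdlib-only illustration of the pattern, assuming the service can hold an `async_sessionmaker`-style factory; `FakeSession` stands in for SQLAlchemy's `AsyncSession`, and the real delivery/record logic is elided.

```python
import asyncio

class FakeSession:
    """Stand-in for an AsyncSession; each delivery task gets its own instance."""
    def __init__(self):
        self.committed = False

    async def commit(self):
        self.committed = True

    async def close(self):
        pass

def session_factory():
    # Stand-in for self._async_sessionmaker()
    return FakeSession()

async def deliver_batch(batch, make_session):
    # Each task opens a dedicated session instead of sharing self._db,
    # so execute/add/commit calls from sibling tasks never interleave.
    session = make_session()
    try:
        # ... HTTP delivery and _record_delivery(session, ...) would go here ...
        await session.commit()
        return session.committed
    finally:
        await session.close()

async def dispatch(batches, make_session):
    # Safe to gather: no two tasks ever touch the same session object.
    return await asyncio.gather(*(deliver_batch(b, make_session) for b in batches))

results = asyncio.run(dispatch([1, 2, 3], session_factory))
```

The same structure carries over directly once `FakeSession` is replaced by sessions produced from the application's real `async_sessionmaker`.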
```python
async def get_delivery_stats(self, webhook_id: str) -> WebhookDeliveryStats:
    """Calculate delivery stats and return recent history for the dashboard."""
    from app.models.contributor_webhook import OutboundWebhookLogDB

    # Total attempted
    count_result = await self._db.execute(
        select(func.count()).select_from(OutboundWebhookLogDB).where(OutboundWebhookLogDB.webhook_id == UUID(webhook_id))
    )
    total = count_result.scalar_one()

    if total == 0:
        return WebhookDeliveryStats(
            total_deliveries=0,
            success_rate=0.0,
            failure_rate=0.0,
            last_10_deliveries=[],
        )

    # Success count
    success_result = await self._db.execute(
        select(func.count()).select_from(OutboundWebhookLogDB).where(
            OutboundWebhookLogDB.webhook_id == UUID(webhook_id),
            OutboundWebhookLogDB.status == "success"
        )
    )
    success_count = success_result.scalar_one()

    # Last 10
    history_result = await self._db.execute(
        select(OutboundWebhookLogDB)
        .where(OutboundWebhookLogDB.webhook_id == UUID(webhook_id))
        .order_by(OutboundWebhookLogDB.delivered_at.desc())
        .limit(10)
    )
    history = history_result.scalars().all()

    return WebhookDeliveryStats(
        total_deliveries=total,
        success_rate=round(success_count / total, 4),
        failure_rate=round((total - success_count) / total, 4),
        last_10_deliveries=[
            {
                "batch_id": h.batch_id,
                "status": h.status,
                "response_code": h.response_code,
                "delivered_at": h.delivered_at.isoformat(),
                "error": h.error_message,
            }
            for h in history
        ],
    )
```
get_delivery_stats() cannot enforce webhook ownership.
Unlike test_webhook(), which receives user_id and filters by both webhook ID and owner, this method only accepts webhook_id and reads logs directly. Unless the new stats route performs a separate ownership lookup before calling it, any caller with another webhook UUID can read that subscriber’s delivery history and response codes. The service API itself has no authorization boundary here. As per coding guidelines, backend/**: Python FastAPI backend. Analyze thoroughly: Authentication/authorization gaps.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/contributor_webhook_service.py` around lines 329 - 379,
get_delivery_stats currently accepts only webhook_id and reads
OutboundWebhookLogDB rows directly, creating an authorization gap unlike
test_webhook which enforces ownership; change get_delivery_stats to accept the
caller's user_id (or fetch the ContributorWebhook record and verify its owner)
and explicitly filter OutboundWebhookLogDB queries by the verified webhook owner
(or raise a permission error) before returning WebhookDeliveryStats; ensure you
reuse the same UUID(webhook_id) conversion and apply the ownership check prior
to counting/slicing history so callers cannot read other subscribers' delivery
history.
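An ownership gate of the kind the prompt asks for can be sketched with in-memory stand-ins. The dictionaries below are hypothetical substitutes for the `ContributorWebhook` and `OutboundWebhookLogDB` tables; in the service the same check would be a query filtered by both webhook id and owner, mirroring `test_webhook()`.

```python
from uuid import UUID, uuid4

# Hypothetical stand-ins for the webhook table and delivery logs.
WEBHOOK_OWNERS: dict = {}
DELIVERY_LOGS: dict = {}

def get_delivery_stats(webhook_id: UUID, user_id: str):
    """Refuse to return delivery history unless the caller owns the webhook."""
    if WEBHOOK_OWNERS.get(webhook_id) != user_id:
        # Unknown and foreign webhooks look identical to the caller (404-style).
        raise LookupError("webhook not found")
    return DELIVERY_LOGS.get(webhook_id, [])

wid = uuid4()
WEBHOOK_OWNERS[wid] = "alice"
DELIVERY_LOGS[wid] = ["success", "failed"]

owner_view = get_delivery_stats(wid, "alice")
try:
    get_delivery_stats(wid, "mallory")
    leaked = True
except LookupError:
    leaked = False
```

Raising the same error for "does not exist" and "not yours" also avoids leaking which webhook UUIDs are in use.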
```python
async def get_delivery_stats(self, webhook_id: str) -> WebhookDeliveryStats:
    """Calculate delivery stats and return recent history for the dashboard."""
    from app.models.contributor_webhook import OutboundWebhookLogDB

    # Total attempted
    count_result = await self._db.execute(
        select(func.count()).select_from(OutboundWebhookLogDB).where(OutboundWebhookLogDB.webhook_id == UUID(webhook_id))
```
Malformed webhook IDs will currently surface as 500s.
Both new service methods call UUID(webhook_id) on raw strings. The provided test route snippet in backend/app/api/contributor_webhooks.py:156-164 declares webhook_id as str, so a non-UUID path value will raise ValueError inside the service instead of returning a controlled 4xx. The same risk exists for the new stats path if it is wired the same way. As per coding guidelines, backend/**: Python FastAPI backend. Analyze thoroughly: Input validation and SQL injection vectors; Error handling and edge case coverage.
Also applies to: 381-396
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/contributor_webhook_service.py` around lines 329 - 335,
The service methods (e.g., get_delivery_stats) currently call UUID(webhook_id)
directly on an unvalidated string which raises ValueError and produces 500s for
malformed IDs; wrap the conversion in a try/except or accept a UUID-typed
parameter at the API layer and return a controlled 400 on failure: import uuid
and replace UUID(webhook_id) with a safe parse (try: uid = uuid.UUID(webhook_id)
except ValueError: raise HTTPException(status_code=400, detail="invalid
webhook_id")) and use uid in the query (same fix for the other method referenced
around lines 381-396); this ensures proper 4xx error handling and keeps the UUID
object typed for the OutboundWebhookLogDB.where() call.
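The safe-parse half of that fix can be sketched with the standard library alone. `parse_webhook_id` is a hypothetical helper; in the route itself, declaring the parameter as `webhook_id: UUID` would let FastAPI reject malformed values with a 422 before the service is ever called.

```python
import uuid

def parse_webhook_id(webhook_id: str) -> uuid.UUID:
    """Parse a path parameter, mapping bad input to a controlled client error."""
    try:
        return uuid.UUID(webhook_id)
    except ValueError:
        # In the route this would instead be:
        # raise HTTPException(status_code=400, detail="invalid webhook_id")
        raise ValueError("invalid webhook_id") from None

good = parse_webhook_id("12345678-1234-5678-1234-567812345678")
try:
    parse_webhook_id("not-a-uuid")
    bad_accepted = True
except ValueError:
    bad_accepted = False
```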
```python
async def _init():
    from app.database import engine, Base
    from app.models.anti_sybil import SybilFlagTable, SybilAppealTable, IpAccountMapTable, WalletFundingMapTable  # noqa
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
asyncio.get_event_loop().run_until_complete(_init())

client = TestClient(test_app, raise_server_exceptions=True)
resp = client.get("/anti-sybil/my-flags", headers={"Authorization": "Bearer token"})
assert resp.status_code == 200
assert resp.json() == []
```
🧹 Nitpick | 🔵 Trivial
Deprecated asyncio.get_event_loop() pattern and test isolation concern.
Line 670 uses asyncio.get_event_loop().run_until_complete(_init()) which is deprecated in Python 3.10+ and may cause warnings or errors. Use asyncio.run(_init()) instead.
Additionally, this test creates tables in a shared in-memory database during test execution, which can cause test pollution if other tests run in the same process.
♻️ Proposed fix

```diff
- asyncio.get_event_loop().run_until_complete(_init())
+ asyncio.run(_init())
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/tests/test_anti_sybil.py` around lines 665 - 676, Replace the
deprecated asyncio.get_event_loop().run_until_complete(_init()) call with
asyncio.run(_init()) and ensure test isolation by adding cleanup to drop the
tables after the test: call asyncio.run to create the tables via the existing
_init() (which uses engine and Base), run the test client
(TestClient(test_app,...)), then call a teardown coroutine that uses async with
engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) to remove
created tables (or use a dedicated test-only engine/fixture that creates and
disposes its own Base metadata) so the test does not pollute the shared
in-memory DB.
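The setup/teardown shape the prompt suggests can be sketched without SQLAlchemy. The dictionary below stands in for the shared in-memory database; the two coroutines are hypothetical substitutes for `conn.run_sync(Base.metadata.create_all)` and `conn.run_sync(Base.metadata.drop_all)`.

```python
import asyncio

STATE = {"tables_created": False}

async def create_tables():
    # stands in for: async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)
    STATE["tables_created"] = True

async def drop_tables():
    # stands in for: await conn.run_sync(Base.metadata.drop_all)
    STATE["tables_created"] = False

def run_isolated(test_body):
    """Create tables, run the test body, and always drop tables afterwards."""
    asyncio.run(create_tables())  # replaces deprecated get_event_loop().run_until_complete
    try:
        return test_body()
    finally:
        asyncio.run(drop_tables())

result = run_isolated(lambda: STATE["tables_created"])
```

In a pytest suite this would more naturally live in a fixture so every test in the module gets the same create/drop bracket.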
Description
This PR extends the existing webhook system to trigger notifications for various on-chain events on Solana, handles enriched transaction data from indexers (Helius/Shyft), and implements high-efficiency batched delivery with a monitoring dashboard.
Closes #508
Solana Wallet for Payout
Wallet: 4QhseKvBuaCQhdkP248iXoUxohPzVC5m8pE9hAv4nMYw
Type of Change
Checklist
No `console.log` or debugging code left behind

Testing
Screenshots (if applicable)
Additional Notes
- Added support for `escrow.*` and `stake.*` events.
- Added `/api/webhooks/solana/helius` to map on-chain transfers to platform events.
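The mapping step described in those notes can be sketched as a simple lookup. The transaction-type strings and resulting event names below are hypothetical; real Helius/Shyft enriched-payload fields and the platform's actual `escrow.*`/`stake.*` event names may differ.

```python
# Hypothetical mapping from an indexer's enriched transaction type
# to the platform's webhook event names.
EVENT_MAP = {
    "ESCROW_CREATE": "escrow.created",
    "ESCROW_RELEASE": "escrow.released",
    "STAKE_DELEGATE": "stake.delegated",
    "STAKE_WITHDRAW": "stake.withdrawn",
}

def map_indexer_event(tx: dict):
    """Return the platform event for an enriched transaction, or None if untracked."""
    return EVENT_MAP.get(tx.get("type"))

mapped = map_indexer_event({"type": "ESCROW_RELEASE"})
unmapped = map_indexer_event({"type": "NFT_SALE"})
```

Keeping the mapping in one table makes it easy to extend when the `/api/webhooks/solana/helius` endpoint gains new on-chain event types.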