Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions src/chat_sdk/state/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,35 +213,27 @@ async def acquire_lock(self, thread_id: str, ttl_ms: int) -> Lock | None:
self._ensure_connected()

token = _generate_token()
expires_at = _pg_timestamp_from_ms(ttl_ms)

# Two-step approach to prevent race condition when lock just expired.
# Step 1: Try INSERT for new locks (no existing row).
# Atomic upsert: INSERT succeeds for new rows; ON CONFLICT DO UPDATE
# fires only when the existing row is expired (WHERE expires_at <= now()).
# Postgres acquires a row lock on the conflicting row, so only one
# concurrent caller can win — eliminating the TOCTOU race that existed
# in the previous two-step INSERT-then-UPDATE approach.
row = await self._pool.fetchrow(
"""INSERT INTO chat_state_locks (key_prefix, thread_id, token, expires_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (key_prefix, thread_id) DO NOTHING
VALUES ($1, $2, $3, now() + make_interval(secs => $4::float / 1000))
ON CONFLICT (key_prefix, thread_id) DO UPDATE
SET token = EXCLUDED.token,
expires_at = EXCLUDED.expires_at,
updated_at = now()
WHERE chat_state_locks.expires_at <= now()
RETURNING thread_id, token, expires_at""",
self._key_prefix,
thread_id,
token,
expires_at,
ttl_ms,
)

if row is None:
# Step 2: Row exists — try UPDATE only if expired.
# UPDATE acquires a row lock, so only one concurrent caller wins.
row = await self._pool.fetchrow(
"""UPDATE chat_state_locks
SET token = $3, expires_at = $4, updated_at = now()
WHERE key_prefix = $1 AND thread_id = $2 AND expires_at <= now()
RETURNING thread_id, token, expires_at""",
self._key_prefix,
thread_id,
token,
expires_at,
)

if row is None:
return None

Expand Down
Empty file added tests/fixtures/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions tests/fixtures/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Fixture helpers for loading replay test fixtures.

Fixtures are JSON files from the TS Chat SDK integration tests.
They live in tests/fixtures/replay/ (copied from the TS repo).
"""

from __future__ import annotations

import json
from pathlib import Path

FIXTURE_DIR = Path(__file__).parent / "replay"

# Fallback to TS repo path if fixtures haven't been copied yet
_TS_FIXTURE_DIR = Path("/tmp/vercel-chat/packages/integration-tests/fixtures/replay")


def load_fixture(relative_path: str) -> dict:
"""Load a JSON fixture file by relative path (e.g., 'slack.json')."""
local_path = FIXTURE_DIR / relative_path
if local_path.exists():
return json.loads(local_path.read_text())

ts_path = _TS_FIXTURE_DIR / relative_path
if ts_path.exists():
return json.loads(ts_path.read_text())

raise FileNotFoundError(
f"Fixture not found: {relative_path}\n"
f" Looked in: {FIXTURE_DIR}\n"
f" Fallback: {_TS_FIXTURE_DIR}\n"
f" Run: python tests/fixtures/copy_fixtures.py"
)
36 changes: 36 additions & 0 deletions tests/fixtures/copy_fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""One-time script to copy fixture files from TS repo.

Run: python tests/fixtures/copy_fixtures.py
"""

import json
import os
import shutil
import sys

SRC = "/tmp/vercel-chat/packages/integration-tests/fixtures/replay"
DST = os.path.join(os.path.dirname(__file__), "replay")


def main():
if not os.path.isdir(SRC):
print(f"Source not found: {SRC}", file=sys.stderr)
sys.exit(1)

count = 0
for root, _dirs, files in os.walk(SRC):
for fname in sorted(files):
if not fname.endswith(".json"):
continue
rel = os.path.relpath(os.path.join(root, fname), SRC)
dst_path = os.path.join(DST, rel)
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
shutil.copy2(os.path.join(root, fname), dst_path)
count += 1
print(f" Copied: {rel}")

print(f"\nCopied {count} fixture files to {DST}")


if __name__ == "__main__":
main()
106 changes: 106 additions & 0 deletions tests/fixtures/replay/actions-reactions/gchat.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"botName": "Chat SDK Demo",
"botUserId": "users/100000000000000000002",
"mention": {
"commonEventObject": {
"userLocale": "en",
"hostApp": "CHAT",
"platform": "WEB"
},
"chat": {
"user": {
"name": "users/100000000000000000001",
"displayName": "Test User",
"type": "HUMAN"
},
"eventTime": "2026-01-02T23:32:28.751807Z",
"messagePayload": {
"space": {
"name": "spaces/AAQAO1heGsE",
"type": "ROOM",
"displayName": "Test Chat SDK 4",
"spaceType": "SPACE"
},
"message": {
"name": "spaces/AAQAO1heGsE/messages/A6woZAHrIjs.A6woZAHrIjs",
"sender": {
"name": "users/100000000000000000001",
"displayName": "Test User",
"type": "HUMAN"
},
"createTime": "2026-01-02T23:32:28.751807Z",
"text": "@Chat SDK Demo Hey",
"annotations": [
{
"type": "USER_MENTION",
"startIndex": 0,
"length": 14,
"userMention": {
"user": {
"name": "users/100000000000000000002",
"displayName": "Chat SDK Demo",
"type": "BOT"
},
"type": "MENTION"
}
}
],
"thread": { "name": "spaces/AAQAO1heGsE/threads/A6woZAHrIjs" },
"space": { "name": "spaces/AAQAO1heGsE" }
}
}
}
},
"action": {
"commonEventObject": {
"userLocale": "en",
"hostApp": "CHAT",
"platform": "WEB",
"parameters": {
"actionId": "hello"
}
},
"chat": {
"user": {
"name": "users/100000000000000000001",
"displayName": "Test User",
"type": "HUMAN"
},
"eventTime": "2026-01-02T23:32:34.175996Z",
"buttonClickedPayload": {
"space": {
"name": "spaces/AAQAO1heGsE",
"type": "ROOM",
"displayName": "Test Chat SDK 4",
"spaceType": "SPACE"
},
"message": {
"name": "spaces/AAQAO1heGsE/messages/A6woZAHrIjs.9qQCF6JGUNg",
"sender": {
"name": "users/100000000000000000002",
"displayName": "Chat SDK Demo",
"type": "BOT"
},
"createTime": "2026-01-02T23:32:31.646079Z",
"thread": { "name": "spaces/AAQAO1heGsE/threads/A6woZAHrIjs" },
"space": { "name": "spaces/AAQAO1heGsE" }
}
}
}
},
"reaction": {
"message": {
"attributes": {
"ce-datacontenttype": "application/json",
"ce-id": "spaces/AAQADmenPpY/spaceEvents/MTc2NzMyNjEwMzIzODkyNF81OF9jcmVhdGVk",
"ce-source": "//workspaceevents.googleapis.com/subscriptions/chat-spaces-czpBQVFBRG1lblBwWToxMTc5OTQ4NzMzNTQzNzU4NjAwODk6MTEzOTc3OTE2MjAxNTUyMzQ2MTQ2",
"ce-specversion": "1.0",
"ce-subject": "//chat.googleapis.com/spaces/AAQADmenPpY",
"ce-time": "2026-01-02T03:55:03.238924Z",
"ce-type": "google.workspace.chat.reaction.v1.created"
},
"data": "eyJyZWFjdGlvbiI6eyJuYW1lIjoic3BhY2VzL0FBUUFEbWVuUHBZL21lc3NhZ2VzL2hDQ3lET2hhNy1vLjRsUXZNdUVHTTBnL3JlYWN0aW9ucy8xMDAwMDAwMDAwMDAwMDAwMDAwMDEuVlU1SlEwOUVSUzd3bjVHTiIsInVzZXIiOnsibmFtZSI6InVzZXJzLzEwMDAwMDAwMDAwMDAwMDAwMDAwMSIsInR5cGUiOiJIVU1BTiJ9LCJlbW9qaSI6eyJ1bmljb2RlIjoi8J+RjSJ9fX0="
},
"subscription": "projects/example-chat-project-123456/subscriptions/chat-messages-push"
}
}
Loading
Loading