Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions backend/app/api/teams.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,42 +37,44 @@

TEAMS_MSG_LIMIT = 28000 # Teams message char limit (approx 28KB)

# In-memory cache for OAuth tokens
_teams_tokens: dict[str, dict] = {} # agent_id -> {access_token, expires_at}


async def _get_teams_access_token(config: ChannelConfig) -> str | None:
"""Get or refresh Microsoft Teams access token.

Supports:
- Client credentials (app_id + app_secret) - default
- Managed Identity (when use_managed_identity is True in extra_config)
Token is cached in Redis (preferred) with in-memory fallback.
Key: clawith:token:teams:{agent_id}
"""
from app.core.token_cache import get_cached_token, set_cached_token

agent_id = str(config.agent_id)
cached = _teams_tokens.get(agent_id)
if cached and cached["expires_at"] > time.time() + 60: # Refresh 60s before expiry
cache_key = f"clawith:token:teams:{agent_id}"

cached = await get_cached_token(cache_key)
if cached:
logger.debug(f"Teams: Using cached access token for agent {agent_id}")
return cached["access_token"]
return cached

# Check if managed identity should be used
use_managed_identity = config.extra_config.get("use_managed_identity", False)

if use_managed_identity:
# Use Azure Managed Identity
try:
from azure.identity.aio import DefaultAzureCredential
from azure.core.credentials import AccessToken

credential = DefaultAzureCredential()
# For Bot Framework, we need the token for the Bot Framework API
# Managed identity needs to be granted permissions to the Bot Framework API
scope = "https://api.botframework.com/.default"
token: AccessToken = await credential.get_token(scope)

_teams_tokens[agent_id] = {
"access_token": token.token,
"expires_at": token.expires_on,
}

# expires_on is a Unix timestamp; TTL = expires_on - now - 60s buffer
ttl = max(int(token.expires_on - time.time()) - 60, 60)
await set_cached_token(cache_key, token.token, ttl)
logger.info(f"Teams: Successfully obtained access token via managed identity for agent {agent_id}, expires at {token.expires_on}")
await credential.close()
return token.token
Expand All @@ -82,7 +84,7 @@ async def _get_teams_access_token(config: ChannelConfig) -> str | None:
except Exception as e:
logger.exception(f"Teams: Failed to get access token via managed identity for agent {agent_id}: {e}")
return None

# Use client credentials (app_id + app_secret)
app_id = config.app_id
app_secret = config.app_secret
Expand Down Expand Up @@ -117,11 +119,10 @@ async def _get_teams_access_token(config: ChannelConfig) -> str | None:
access_token = token_data["access_token"]
expires_in = token_data["expires_in"]

_teams_tokens[agent_id] = {
"access_token": access_token,
"expires_at": time.time() + expires_in,
}
logger.info(f"Teams: Successfully obtained access token for agent {agent_id}, expires in {expires_in}s")
# TTL = expires_in - 60s buffer
ttl = max(expires_in - 60, 60)
await set_cached_token(cache_key, access_token, ttl)
logger.info(f"Teams: Successfully obtained access token for agent {agent_id}, expires in {expires_in}s, TTL={ttl}s")
return access_token
except httpx.HTTPStatusError as e:
error_body = e.response.text if hasattr(e, 'response') and e.response else "No response body"
Expand Down
44 changes: 33 additions & 11 deletions backend/app/api/wecom.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,34 @@
router = APIRouter(tags=["wecom"])


async def _get_wecom_token_cached(corp_id: str, corp_secret: str) -> str:
"""Get WeCom access_token with Redis (preferred) + memory fallback caching.

Key: clawith:token:wecom:{corp_id}
TTL: 6900s (7200s validity - 5 min early refresh)
"""
from app.core.token_cache import get_cached_token, set_cached_token
import httpx as _httpx

cache_key = f"clawith:token:wecom:{corp_id}"
cached = await get_cached_token(cache_key)
if cached:
return cached

async with _httpx.AsyncClient(timeout=10) as _client:
_resp = await _client.get(
"https://qyapi.weixin.qq.com/cgi-bin/gettoken",
params={"corpid": corp_id, "corpsecret": corp_secret},
)
_data = _resp.json()
token = _data.get("access_token", "")
expires_in = int(_data.get("expires_in") or 7200)
if token:
ttl = max(expires_in - 300, 300)
await set_cached_token(cache_key, token, ttl)
return token


# ─── WeCom AES Crypto ──────────────────────────────────

def _pad(text: bytes) -> bytes:
Expand Down Expand Up @@ -445,13 +473,11 @@ async def _process_wecom_kf_event(agent_id: uuid.UUID, config_obj: ChannelConfig
if not config:
return

async with httpx.AsyncClient(timeout=10) as client:
tok_resp = await client.get("https://qyapi.weixin.qq.com/cgi-bin/gettoken", params={"corpid": config.app_id, "corpsecret": config.app_secret})
token_data = tok_resp.json()
access_token = token_data.get("access_token")
if not access_token:
return
access_token = await _get_wecom_token_cached(config.app_id, config.app_secret)
if not access_token:
return

async with httpx.AsyncClient(timeout=10) as client:
current_cursor = token
has_more = 1
current_ts = int(time.time())
Expand Down Expand Up @@ -596,12 +622,8 @@ async def _process_wecom_text(
# Send reply via WeCom API
wecom_agent_id = (config.extra_config or {}).get("wecom_agent_id", "")
try:
access_token = await _get_wecom_token_cached(config.app_id, config.app_secret)
async with httpx.AsyncClient(timeout=10) as client:
tok_resp = await client.get(
"https://qyapi.weixin.qq.com/cgi-bin/gettoken",
params={"corpid": config.app_id, "corpsecret": config.app_secret},
)
access_token = tok_resp.json().get("access_token", "")
if access_token:
if is_kf and open_kfid:
# For KF messages, need to bridge/trans state first then send via kf/send_msg
Expand Down
64 changes: 64 additions & 0 deletions backend/app/core/token_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Unified Redis-backed token cache with in-memory fallback.

Key naming convention: clawith:token:{type}:{identifier}
Examples:
clawith:token:dingtalk_corp:{app_key}
clawith:token:feishu_tenant:{app_id}
clawith:token:wecom:{corp_id}
clawith:token:teams:{agent_id}
"""
import time
from typing import Optional

# In-memory fallback store: {key: (value, expire_at)}
_memory_cache: dict[str, tuple[str, float]] = {}


async def get_cached_token(key: str) -> Optional[str]:
"""Get token from Redis (preferred) or memory fallback."""
# Try Redis first
try:
from app.core.events import get_redis
redis = await get_redis()
if redis:
val = await redis.get(key)
if val:
return val if isinstance(val, str) else val.decode()
except Exception:
pass

# Fallback to memory
if key in _memory_cache:
val, expire_at = _memory_cache[key]
if time.time() < expire_at:
return val
del _memory_cache[key]
return None


async def set_cached_token(key: str, value: str, ttl_seconds: int) -> None:
"""Set token in Redis (preferred) and memory fallback."""
# Try Redis first
try:
from app.core.events import get_redis
redis = await get_redis()
if redis:
await redis.setex(key, ttl_seconds, value)
except Exception:
pass

# Always set in memory as fallback
_memory_cache[key] = (value, time.time() + ttl_seconds)


async def delete_cached_token(key: str) -> None:
"""Delete token from both Redis and memory."""
try:
from app.core.events import get_redis
redis = await get_redis()
if redis:
await redis.delete(key)
except Exception:
pass
_memory_cache.pop(key, None)
24 changes: 20 additions & 4 deletions backend/app/services/auth_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,33 @@ async def get_authorization_url(self, redirect_uri: str, state: str) -> str:
return f"{base_url}?{params}"

async def get_app_access_token(self) -> str:
if self._app_access_token:
return self._app_access_token
"""Get or refresh the Feishu app access token.

Cached in Redis (preferred) with in-memory fallback.
Key: clawith:token:feishu_tenant:{app_id}
TTL: 6900s (7200s validity - 5 min early refresh)
"""
from app.core.token_cache import get_cached_token, set_cached_token

cache_key = f"clawith:token:feishu_tenant:{self.app_id}"
cached = await get_cached_token(cache_key)
if cached:
self._app_access_token = cached
return cached

async with httpx.AsyncClient() as client:
resp = await client.post(
self.FEISHU_APP_TOKEN_URL,
json={"app_id": self.app_id, "app_secret": self.app_secret},
)
data = resp.json()
self._app_access_token = data.get("app_access_token", "")
return self._app_access_token
token = data.get("app_access_token", "") or data.get("tenant_access_token", "")
expire = data.get("expire", 7200)
if token:
ttl = max(expire - 300, 60)
await set_cached_token(cache_key, token, ttl)
self._app_access_token = token
return token

async def exchange_code_for_token(self, code: str) -> dict:
app_token = await self.get_app_access_token()
Expand Down
30 changes: 24 additions & 6 deletions backend/app/services/feishu_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,39 @@ async def get_app_access_token(self) -> str:
return await self.get_tenant_access_token(self.app_id, self.app_secret)

async def get_tenant_access_token(self, app_id: str = None, app_secret: str = None) -> str:
"""Get or refresh the app-level access token (tenant_access_token)."""
"""Get or refresh the app-level access token (tenant_access_token).

Cached in Redis (preferred) with in-memory fallback.
Key: clawith:token:feishu_tenant:{app_id}
TTL: 6900s (7200s validity - 5 min early refresh)
"""
from app.core.token_cache import get_cached_token, set_cached_token

target_app_id = app_id or self.app_id
target_app_secret = app_secret or self.app_secret

cache_key = f"clawith:token:feishu_tenant:{target_app_id}"

cached = await get_cached_token(cache_key)
if cached:
if not app_id:
self._app_access_token = cached
return cached

async with httpx.AsyncClient() as client:
resp = await client.post(FEISHU_APP_TOKEN_URL, json={
"app_id": target_app_id,
"app_secret": target_app_secret,
})
data = resp.json()

token = data.get("tenant_access_token") or data.get("app_access_token", "")
if not app_id: # only cache default app token
self._app_access_token = token

expire = data.get("expire", 7200)
if token:
ttl = max(expire - 300, 60)
await set_cached_token(cache_key, token, ttl)
if not app_id: # only update instance var for default app token
self._app_access_token = token

return token

async def exchange_code_for_user(self, code: str) -> dict:
Expand Down
40 changes: 32 additions & 8 deletions backend/app/services/org_sync_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,23 @@ def api_base_url(self) -> str:
return self.DINGTALK_API_URL

async def get_access_token(self) -> str:
if self._access_token and self._token_expires_at and datetime.now() < self._token_expires_at:
return self._access_token
"""Get or refresh the DingTalk access token.

Cached in Redis (preferred) with in-memory fallback.
Key: clawith:token:dingtalk_corp:{app_key}
TTL: expires_in - 300s (5 min early refresh)
"""
from app.core.token_cache import get_cached_token, set_cached_token

if not self.app_key or not self.app_secret:
raise ValueError("DingTalk app_key/app_secret missing in provider config")

cache_key = f"clawith:token:dingtalk_corp:{self.app_key}"
cached = await get_cached_token(cache_key)
if cached:
self._access_token = cached
return cached

async with httpx.AsyncClient() as client:
resp = await client.get(
self.DINGTALK_TOKEN_URL,
Expand All @@ -797,9 +808,11 @@ async def get_access_token(self) -> str:
raise RuntimeError(f"DingTalk token error: {data.get('errmsg') or data}")
token = data.get("access_token") or ""
expires_in = int(data.get("expires_in") or 7200)
self._access_token = token
# refresh a bit earlier
self._token_expires_at = datetime.now() + timedelta(seconds=max(expires_in - 60, 60))
if token:
ttl = max(expires_in - 300, 60)
await set_cached_token(cache_key, token, ttl)
self._access_token = token
self._token_expires_at = datetime.now() + timedelta(seconds=max(expires_in - 60, 60))
return token

async def fetch_departments(self) -> list[ExternalDepartment]:
Expand Down Expand Up @@ -990,19 +1003,30 @@ def api_base_url(self) -> str:
async def get_access_token(self) -> str:
"""Get valid access token using the 通讯录同步 (contact-sync) secret.

Cached in Redis (preferred) with in-memory fallback.
Key: clawith:token:wecom:{corp_id}
TTL: 6900s (7200s validity - 5 min early refresh)

This token can call department/simplelist and user/list_id.
It cannot call user/list or user/get (those raise errcode 48009).
Full user profiles are obtained passively via SSO login instead.
"""
if self._access_token and self._token_expires_at and datetime.now() < self._token_expires_at:
return self._access_token
from app.core.token_cache import get_cached_token, set_cached_token

if not self.corp_id or not self.secret:
raise ValueError("WeCom corp_id or secret missing in provider config")

cache_key = f"clawith:token:wecom:{self.corp_id}"
cached = await get_cached_token(cache_key)
if cached:
self._access_token = cached
return cached

token = await self._fetch_token(self.corp_id, self.secret)
if token:
ttl = max(7200 - 300, 300)
await set_cached_token(cache_key, token, ttl)
self._access_token = token
# Refresh slightly before true expiry to avoid clock-skew issues
self._token_expires_at = datetime.now() + timedelta(seconds=7200 - 300)
return token

Expand Down