Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions backend/app/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,13 @@ async def bind_identity(
user_info = await auth_provider.get_user_info(access_token)

# Check if identity is already linked to another user
existing_user = await sso_service.check_duplicate_identity(db, provider, user_info.provider_user_id)
lookup_provider_user_id = user_info.provider_union_id or user_info.provider_user_id
existing_user = await sso_service.check_duplicate_identity(
db,
provider,
lookup_provider_user_id,
identity_data=user_info.raw_data,
)
if existing_user and existing_user.id != current_user.id:
raise HTTPException(
status_code=409,
Expand All @@ -983,7 +989,7 @@ async def bind_identity(
db,
str(current_user.id),
provider,
user_info.provider_user_id,
lookup_provider_user_id,
user_info.raw_data,
)

Expand Down
6 changes: 4 additions & 2 deletions backend/app/api/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,8 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession
"email": sender_email,
"mobile": _user_info.get("mobile"),
"avatar_url": _avatar_url,
"unionid": _user_info.get("user_id"), # tenant-level user_id
"external_id": _user_info.get("user_id"),
"unionid": _user_info.get("union_id"),
"open_id": sender_open_id,
}
logger.info(f"[Feishu] Resolved sender: {sender_name} (user_id={sender_user_id_feishu})")
Expand Down Expand Up @@ -1185,7 +1186,8 @@ async def _handle_feishu_file(db, agent_id, config, message, sender_open_id, cha
"avatar_url": _avatar_url,
"email": _user_info.get("email"),
"mobile": _user_info.get("mobile"),
"unionid": _user_info.get("user_id"),
"external_id": _user_info.get("user_id"),
"unionid": _user_info.get("union_id"),
"open_id": sender_open_id,
}
except Exception:
Expand Down
17 changes: 5 additions & 12 deletions backend/app/services/auth_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,12 @@ async def find_or_create_user(
# 1. Try lookup via sso_service (which now uses OrgMember)
provider_user_id = user_info.provider_union_id or user_info.provider_user_id
user = await sso_service.resolve_user_identity(
db, provider_user_id, self.provider_type, tenant_id=tenant_id
db,
provider_user_id,
self.provider_type,
tenant_id=tenant_id,
identity_data=user_info.raw_data,
)

# Feishu: fallback to open_id if union_id lookup misses
if (
not user
and self.provider_type == "feishu"
and user_info.provider_union_id
and user_info.provider_user_id
):
user = await sso_service.resolve_user_identity(
db, user_info.provider_user_id, self.provider_type, tenant_id=tenant_id
)

is_new = False
if not user:
Expand Down
44 changes: 34 additions & 10 deletions backend/app/services/channel_user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@
class ChannelUserService:
"""Service for resolving channel users via OrgMember and SSO patterns."""

def _get_channel_ids(
self,
channel_type: str,
external_user_id: str,
extra_info: dict[str, Any],
) -> tuple[str | None, str | None, str | None]:
unionid = (extra_info.get("unionid") or extra_info.get("union_id") or "").strip() or None
open_id = (extra_info.get("open_id") or "").strip() or None
external_id = (extra_info.get("external_id") or external_user_id or "").strip() or None

if channel_type == "feishu":
open_id = open_id or external_user_id
external_id = (extra_info.get("external_id") or "").strip() or None
elif channel_type == "dingtalk":
open_id = open_id or None
elif channel_type == "wecom":
unionid = None
open_id = open_id or None

return unionid, open_id, external_id

async def resolve_channel_user(
self,
db: AsyncSession,
Expand Down Expand Up @@ -104,13 +125,15 @@ async def resolve_channel_user(
db, user.id, provider.id, tenant_id
)
if existing_member:
# Reuse the org-synced record: update its channel-specific IDs
# so future lookups by external_id work without a new shell.
if channel_type == "feishu":
if external_user_id.startswith("on_"):
existing_member.unionid = existing_member.unionid or external_user_id
elif external_user_id.startswith("ou_"):
existing_member.open_id = existing_member.open_id or external_user_id
unionid, open_id, external_id = self._get_channel_ids(
channel_type, external_user_id, extra_info
)
if unionid and not existing_member.unionid:
existing_member.unionid = unionid
if open_id and not existing_member.open_id:
existing_member.open_id = open_id
if external_id and not existing_member.external_id:
existing_member.external_id = external_id
logger.info(
f"[{channel_type}] Reusing org-synced OrgMember {existing_member.id} "
f"for user {user.id} instead of creating a duplicate shell"
Expand Down Expand Up @@ -232,16 +255,17 @@ async def _create_org_member_shell(
) -> OrgMember:
"""Create a shell OrgMember record for this identity."""
name = extra_info.get("name") or f"{channel_type.capitalize()} User {external_user_id[:8]}"
unionid, open_id, external_id = self._get_channel_ids(channel_type, external_user_id, extra_info)

member = OrgMember(
name=name,
email=extra_info.get("email"),
provider_id=provider.id,
user_id=linked_user_id,
tenant_id=provider.tenant_id,
external_id=external_user_id,
unionid=extra_info.get("unionid"),
open_id=extra_info.get("open_id"),
external_id=external_id,
unionid=unionid,
open_id=open_id,
avatar_url=extra_info.get("avatar_url"),
phone=extra_info.get("mobile"),
title=extra_info.get("title", ""),
Expand Down
99 changes: 79 additions & 20 deletions backend/app/services/org_sync_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ async def sync_org_structure(self, db: AsyncSession) -> dict[str, Any]:
user_count = 0
profile_count = 0
sync_start = datetime.now()
partial_failure = False

# Ensure provider exists
provider = await self._ensure_provider(db)
Expand All @@ -149,6 +150,7 @@ async def sync_org_structure(self, db: AsyncSession) -> dict[str, Any]:
await self._upsert_department(db, provider, dept)
dept_count += 1
except Exception as e:
partial_failure = True
errors.append(f"Department {dept.external_id}: {str(e)}")
logger.error(f"[OrgSync] Failed to sync department {dept.external_id}: {e}")

Expand All @@ -157,6 +159,7 @@ async def sync_org_structure(self, db: AsyncSession) -> dict[str, Any]:
try:
users = await self.fetch_users(dept.external_id)
except Exception as e:
partial_failure = True
logger.error(f"[OrgSync] Failed to fetch users in department {dept.external_id}: {e}")
errors.append(f"Fetch users in dept {dept.external_id}: {str(e)}")
continue
Expand All @@ -171,6 +174,7 @@ async def sync_org_structure(self, db: AsyncSession) -> dict[str, Any]:
profile_count += 1
member_count += 1
except Exception as e:
partial_failure = True
logger.error(f"[OrgSync] Failed to sync member {user.external_id} ({user.name}): {e}")
errors.append(f"Member {user.external_id}: {str(e)}")

Expand All @@ -181,9 +185,15 @@ async def sync_org_structure(self, db: AsyncSession) -> dict[str, Any]:
self.provider.config = config
await db.flush()

# Reconciliation: mark records not updated in this sync as deleted
await self._reconcile(db, provider.id, sync_start)
await db.flush()
if partial_failure:
logger.warning(
f"[OrgSync] Skipping reconcile for provider {provider.id} because this sync had partial failures"
)
errors.append("Reconcile skipped due to partial sync failures")
else:
# Reconciliation: mark records not updated in this sync as deleted
await self._reconcile(db, provider.id, sync_start)
await db.flush()

# Recalculate member counts for all departments (crucial for DingTalk/WeCom)
await self._update_member_counts(db, provider.id)
Expand Down Expand Up @@ -386,6 +396,7 @@ async def _upsert_member(
) -> dict[str, Any]:
"""Insert or update a member, platform user, and identity."""
stats = {"user_created": False, "profile_synced": False}
self._validate_member_identifiers(provider, user)

# Find department using user's actual department list.
# DingTalk's dept_id_list last item is the most specific (leaf) department.
Expand Down Expand Up @@ -413,23 +424,7 @@ async def _upsert_member(
)
department = dept_result.scalars().first()

# Check if exists by unionid or external_id or open_id (any matches), and provider
conditions = []
if user.unionid:
conditions.append(OrgMember.unionid == user.unionid)
if user.external_id:
conditions.append(OrgMember.external_id == user.external_id)
if user.open_id:
conditions.append(OrgMember.open_id == user.open_id)

if conditions:
result = await db.execute(
select(OrgMember).where(
OrgMember.provider_id == provider.id,
or_(*conditions)
)
)
existing_member = result.scalars().first()
existing_member = await self._find_existing_member(db, provider, user)

now = datetime.now()

Expand Down Expand Up @@ -535,6 +530,70 @@ async def _upsert_member(
await db.flush()
return stats

def _provider_requires_unionid(self, provider: IdentityProvider) -> bool:
provider_type = (provider.provider_type or self.provider_type or "").lower()
return provider_type in {"feishu", "dingtalk"}

def _validate_member_identifiers(self, provider: IdentityProvider, user: ExternalUser) -> None:
user.unionid = (user.unionid or "").strip()
user.external_id = (user.external_id or "").strip()
user.open_id = (user.open_id or "").strip()

if self._provider_requires_unionid(provider) and not user.unionid:
raise ValueError(
f"unionid is required for {provider.provider_type} org sync user {user.external_id or user.name}"
)
Comment on lines +542 to +545
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent reconcile deletion on unionid validation failures

Raising on missing unionid here causes _upsert_member to fail for that user, but sync_org_structure continues and then runs reconciliation, which marks non-updated members as deleted. That means a transient provider response/permission issue that omits unionid can soft-delete previously valid Feishu/DingTalk members in bulk on a single sync run, which is a data-loss regression introduced by this strict validation path.

Useful? React with 👍 / 👎.


if user.unionid and user.external_id and user.unionid == user.external_id:
raise ValueError(
f"invalid unionid for org sync user {user.external_id or user.name}: unionid must not equal external_id"
)

async def _find_existing_member(
self,
db: AsyncSession,
provider: IdentityProvider,
user: ExternalUser,
) -> OrgMember | None:
if user.unionid:
result = await db.execute(
select(OrgMember).where(
OrgMember.provider_id == provider.id,
OrgMember.unionid == user.unionid,
)
)
existing_member = result.scalars().first()
if existing_member:
return existing_member

fallback_conditions = []
if user.external_id:
fallback_conditions.append(OrgMember.external_id == user.external_id)
if user.open_id:
fallback_conditions.append(OrgMember.open_id == user.open_id)

if not fallback_conditions:
return None

fallback_query = select(OrgMember).where(
OrgMember.provider_id == provider.id,
or_(*fallback_conditions),
)

# When unionid is required, only allow external/open id fallback to attach
# shell records that do not have a conflicting unionid yet.
if self._provider_requires_unionid(provider) and user.unionid:
fallback_query = fallback_query.where(
or_(
OrgMember.unionid.is_(None),
OrgMember.unionid == "",
OrgMember.unionid == user.unionid,
)
Comment on lines +585 to +591
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Match legacy members when reconciling corrected unionid

The new fallback filter only allows external_id/open_id matches when existing unionid is null/empty or already equal to the incoming value. That blocks updates to legacy rows with incorrect non-empty unionid values (a known historical state), so _upsert_member creates a new row instead of updating the existing linked one; if email/mobile is absent, the replacement row loses user_id, and a later reconcile can delete the previously linked row. This introduces account-link breakage during normal sync migration.

Useful? React with 👍 / 👎.

)

result = await db.execute(fallback_query)
return result.scalars().first()

async def _resolve_platform_user(self, db: AsyncSession, user: ExternalUser) -> User | None:
"""Resolve platform user from external user info."""
# 1. Try by Email matching (primary way now)
Expand Down
24 changes: 18 additions & 6 deletions backend/app/services/registration_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,14 @@ async def handle_sso_registration(
tenant_id = tenant.id if tenant else None

# Check if identity already exists
existing = await sso_service.resolve_user_identity(db, provider_user_id, provider_type, tenant_id=tenant_id)
lookup_provider_user_id = user_info.get("union_id") or user_info.get("unionId") or provider_user_id
existing = await sso_service.resolve_user_identity(
db,
lookup_provider_user_id,
provider_type,
tenant_id=tenant_id,
identity_data=user_info,
)

if existing:
# Identity already linked
Expand All @@ -279,7 +286,7 @@ async def handle_sso_registration(
db,
str(existing_user.id),
provider_type,
provider_user_id,
lookup_provider_user_id,
user_info,
tenant_id=str(existing_user.tenant_id) if existing_user.tenant_id else tenant_id,
)
Expand Down Expand Up @@ -360,8 +367,13 @@ async def register_with_sso(
tenant_id = tenant.id if tenant else None

# Try to find existing user by identity
lookup_provider_user_id = user_info_obj.provider_union_id or user_info_obj.provider_user_id
existing_user = await sso_service.resolve_user_identity(
db, user_info_obj.provider_user_id, provider_type, tenant_id=tenant_id
db,
lookup_provider_user_id,
provider_type,
tenant_id=tenant_id,
identity_data=user_info,
)

if existing_user:
Expand All @@ -377,7 +389,7 @@ async def register_with_sso(
db,
str(existing_by_email.id),
provider_type,
user_info_obj.provider_user_id,
lookup_provider_user_id,
user_info,
tenant_id=str(existing_by_email.tenant_id) if existing_by_email.tenant_id else tenant_id,
)
Expand All @@ -387,7 +399,7 @@ async def register_with_sso(
user, is_new = await self.handle_sso_registration(
db,
provider_type,
user_info_obj.provider_user_id,
lookup_provider_user_id,
user_info,
)

Expand Down Expand Up @@ -525,4 +537,4 @@ async def sync_org_member_contact_from_user(


# Global registration service
registration_service = RegistrationService()
registration_service = RegistrationService()
Loading