Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions api/integrations/azure_devops/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
from integrations.azure_devops.client.api import list_projects
from integrations.azure_devops.client.api import (
list_projects,
list_pull_requests,
list_repositories,
list_work_items,
)
from integrations.azure_devops.client.exceptions import (
AzureDevOpsAuthError,
AzureDevOpsError,
AzureDevOpsNotFoundError,
)
from integrations.azure_devops.client.types import AdoProject, AdoProjectsPage
from integrations.azure_devops.client.types import (
AdoProject,
AdoProjectsPage,
AdoPullRequest,
AdoPullRequestsPage,
AdoRepository,
AdoWorkItem,
AdoWorkItemsPage,
)

__all__ = [
"AdoProject",
"AdoProjectsPage",
"AdoPullRequest",
"AdoPullRequestsPage",
"AdoRepository",
"AdoWorkItem",
"AdoWorkItemsPage",
"AzureDevOpsAuthError",
"AzureDevOpsError",
"AzureDevOpsNotFoundError",
"list_projects",
"list_pull_requests",
"list_repositories",
"list_work_items",
]
184 changes: 182 additions & 2 deletions api/integrations/azure_devops/client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@
AzureDevOpsAuthError,
AzureDevOpsNotFoundError,
)
from integrations.azure_devops.client.types import AdoProject, AdoProjectsPage
from integrations.azure_devops.client.types import (
AdoProject,
AdoProjectsPage,
AdoPullRequest,
AdoPullRequestsPage,
AdoRepository,
AdoWorkItem,
AdoWorkItemsPage,
)
from integrations.azure_devops.constants import (
AZURE_DEVOPS_API_VERSION,
AZURE_DEVOPS_CLIENT_TIMEOUT_SECONDS,
Expand All @@ -23,12 +31,19 @@ def _ado_request(
json_body: dict[str, Any] | None = None,
) -> requests.Response:
base = organisation_url.rstrip("/")
# `path` may be either a bare segment ("projects") or already contain
# "_apis/" (for project-scoped endpoints like
# "{ado_project_id}/_apis/git/..."). Honour both.
if "_apis/" in path:
url = f"{base}/{path}"
else:
url = f"{base}/_apis/{path}"
query: dict[str, Any] = {"api-version": AZURE_DEVOPS_API_VERSION}
if params:
query.update(params)
response = requests.request(
method,
f"{base}/_apis/{path}",
url,
auth=("", pat),
params=query,
json=json_body,
Expand Down Expand Up @@ -73,3 +88,168 @@ def list_projects(
]
next_token = response.headers.get("x-ms-continuationtoken")
return AdoProjectsPage(results=results, continuation_token=next_token)


def list_repositories(
*,
organisation_url: str,
pat: str,
ado_project_id: str,
) -> list[AdoRepository]:
response = _ado_request(
"GET",
organisation_url,
pat,
path=f"{ado_project_id}/_apis/git/repositories",
)
payload = response.json()
return [
AdoRepository(
id=item["id"],
name=item["name"],
default_branch=item.get("defaultBranch", ""),
)
for item in payload.get("value", [])
]


def list_pull_requests(
*,
organisation_url: str,
pat: str,
ado_project_id: str,
state: str = "active",
top: int | None = None,
continuation_token: str | None = None,
) -> AdoPullRequestsPage:
params: dict[str, Any] = {"searchCriteria.status": state}
if top is not None:
params["$top"] = top
if continuation_token is not None:
params["continuationToken"] = continuation_token

response = _ado_request(
"GET",
organisation_url,
pat,
path=f"{ado_project_id}/_apis/git/pullrequests",
params=params,
)
payload = response.json()
results: list[AdoPullRequest] = [
AdoPullRequest(
id=item["pullRequestId"],
title=item["title"],
state=item["status"],
is_draft=item.get("isDraft", False),
web_url=item.get("_links", {}).get("web", {}).get("href", ""),
repository_name=item.get("repository", {}).get("name", ""),
)
for item in payload.get("value", [])
]
next_token = response.headers.get("x-ms-continuationtoken")
return AdoPullRequestsPage(results=results, continuation_token=next_token)


_WORK_ITEM_FIELDS = [
"System.Id",
"System.Title",
"System.State",
"System.WorkItemType",
]


def _escape_wiql_string(value: str) -> str:
# WIQL escapes single quotes by doubling them. There is no other
# escape character. We control the column names; only user-supplied
# values need this.
return value.replace("'", "''")


def _wiql_query_for_work_items(
*,
search_text: str | None,
state: str | None,
work_item_type: str | None,
) -> str:
clauses = ["[System.TeamProject] = @project"]
if state:
clauses.append(f"[System.State] = '{_escape_wiql_string(state)}'")
if work_item_type:
clauses.append(
f"[System.WorkItemType] = '{_escape_wiql_string(work_item_type)}'"
)
if search_text:
clauses.append(f"[System.Title] CONTAINS '{_escape_wiql_string(search_text)}'")
where = " AND ".join(clauses)
return (
"SELECT [System.Id] FROM WorkItems "
f"WHERE {where} "
"ORDER BY [System.ChangedDate] DESC"
)


def list_work_items(
*,
organisation_url: str,
pat: str,
ado_project_id: str,
search_text: str | None = None,
state: str | None = None,
work_item_type: str | None = None,
top: int = 100,
continuation_token: str | None = None,
) -> AdoWorkItemsPage:
"""List ADO work items in a project, filterable by title text, state,
and work-item type. Implemented as a WIQL query for the IDs followed
by a batch fetch for the rows we want to display.

Pagination is offset-based on the WIQL ID list (ADO returns up to
20,000 IDs in one WIQL response per the docs). ``continuation_token``
encodes the offset into the WIQL ID list as a string integer; the
response's ``continuation_token`` is the offset to ask for next, or
``None`` if no further pages remain.
"""
query = _wiql_query_for_work_items(
search_text=search_text,
state=state,
work_item_type=work_item_type,
)
wiql_response = _ado_request(
"POST",
organisation_url,
pat,
path=f"{ado_project_id}/_apis/wit/wiql",
json_body={"query": query},
)
wiql_payload = wiql_response.json()
all_ids: list[int] = [item["id"] for item in wiql_payload.get("workItems", [])]
if not all_ids:
return AdoWorkItemsPage(results=[], continuation_token=None)

offset = int(continuation_token) if continuation_token is not None else 0
end = offset + top
page_ids = all_ids[offset:end]
if not page_ids:
return AdoWorkItemsPage(results=[], continuation_token=None)

batch_response = _ado_request(
"POST",
organisation_url,
pat,
path="wit/workitemsbatch",
json_body={"ids": page_ids, "fields": _WORK_ITEM_FIELDS},
)
batch_payload = batch_response.json()
results: list[AdoWorkItem] = [
AdoWorkItem(
id=item["id"],
title=item.get("fields", {}).get("System.Title", ""),
state=item.get("fields", {}).get("System.State", ""),
work_item_type=item.get("fields", {}).get("System.WorkItemType", ""),
web_url=item.get("_links", {}).get("html", {}).get("href", ""),
)
for item in batch_payload.get("value", [])
]
next_token = str(end) if end < len(all_ids) else None
return AdoWorkItemsPage(results=results, continuation_token=next_token)
33 changes: 33 additions & 0 deletions api/integrations/azure_devops/client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,36 @@ class AdoProject(TypedDict):
class AdoProjectsPage(TypedDict):
results: list[AdoProject]
continuation_token: str | None


class AdoRepository(TypedDict):
id: str
name: str
default_branch: str


class AdoPullRequest(TypedDict):
id: int
title: str
state: str
is_draft: bool
web_url: str
repository_name: str


class AdoPullRequestsPage(TypedDict):
results: list[AdoPullRequest]
continuation_token: str | None


class AdoWorkItem(TypedDict):
id: int
title: str
state: str
work_item_type: str
web_url: str


class AdoWorkItemsPage(TypedDict):
results: list[AdoWorkItem]
continuation_token: str | None
30 changes: 30 additions & 0 deletions api/integrations/azure_devops/serializers/browse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from rest_framework import serializers

_PR_STATE_CHOICES = ("active", "completed", "abandoned", "all")


class AdoBrowseQueryParamsSerializer(serializers.Serializer[None]):
top = serializers.IntegerField(default=100, min_value=1, max_value=200)
continuation_token = serializers.CharField(required=False, allow_blank=True)


class AdoRepositoriesQueryParamsSerializer(AdoBrowseQueryParamsSerializer):
ado_project_id = serializers.CharField()
Comment on lines +1 to +12
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Security Vulnerability: Path Traversal in ado_project_id

The ado_project_id query parameter is interpolated directly into the Azure DevOps API request paths (e.g., f"{ado_project_id}/_apis/git/repositories"). Since there is no validation on ado_project_id, an attacker with access to the browse endpoints could pass path traversal sequences (such as ../../) to manipulate the request URL. This would allow them to perform unauthorized API requests against other organizations or projects using the stored Personal Access Token (PAT).

Remediation:
Add a RegexValidator to ado_project_id in AdoRepositoriesQueryParamsSerializer to ensure it does not contain slashes (/) or backslashes (\\), which are forbidden in valid Azure DevOps project IDs and names anyway.

Suggested change
from rest_framework import serializers
_PR_STATE_CHOICES = ("active", "completed", "abandoned", "all")
class AdoBrowseQueryParamsSerializer(serializers.Serializer[None]):
top = serializers.IntegerField(default=100, min_value=1, max_value=200)
continuation_token = serializers.CharField(required=False, allow_blank=True)
class AdoRepositoriesQueryParamsSerializer(AdoBrowseQueryParamsSerializer):
ado_project_id = serializers.CharField()
from django.core.validators import RegexValidator
from rest_framework import serializers
_PR_STATE_CHOICES = ("active", "completed", "abandoned", "all")
class AdoBrowseQueryParamsSerializer(serializers.Serializer[None]):
top = serializers.IntegerField(default=100, min_value=1, max_value=200)
continuation_token = serializers.CharField(required=False, allow_blank=True)
class AdoRepositoriesQueryParamsSerializer(AdoBrowseQueryParamsSerializer):
ado_project_id = serializers.CharField(
validators=[
RegexValidator(
regex=r"^[^/\\\\]+$",
message="Invalid Azure DevOps project ID or name.",
)
]
)



class AdoPullRequestsQueryParamsSerializer(AdoRepositoriesQueryParamsSerializer):
state = serializers.ChoiceField(choices=_PR_STATE_CHOICES, default="active")


class AdoWorkItemsQueryParamsSerializer(AdoRepositoriesQueryParamsSerializer):
# Override the base CharField — the work-items client interprets the
# token as a non-negative integer offset into the WIQL ID list.
# Validating here prevents negative-slice leaks and ValueErrors.
# The ignore below is for the intentional field-type narrowing; DRF
# supports replacing inherited field types but Mypy flags the variance.
continuation_token = serializers.IntegerField( # type: ignore[assignment]
required=False, min_value=0
)
search_text = serializers.CharField(required=False, allow_blank=True)
state = serializers.CharField(required=False, allow_blank=True)
work_item_type = serializers.CharField(required=False, allow_blank=True)
14 changes: 13 additions & 1 deletion api/integrations/azure_devops/views/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
from integrations.azure_devops.views.browse_azure_devops import (
BrowseAdoProjects,
BrowseAdoPullRequests,
BrowseAdoRepositories,
BrowseAdoWorkItems,
)
from integrations.azure_devops.views.configuration import (
AzureDevOpsConfigurationViewSet,
)

__all__ = ["AzureDevOpsConfigurationViewSet"]
__all__ = [
"AzureDevOpsConfigurationViewSet",
"BrowseAdoProjects",
"BrowseAdoPullRequests",
"BrowseAdoRepositories",
"BrowseAdoWorkItems",
]
Loading
Loading