Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def post_connection(*, session: Session = NEW_SESSION) -> APIResponse:
raise AlreadyExists(detail=f"Connection already exist. ID: {conn_id}")


@mark_fastapi_migration_done
@security.requires_access_connection("POST")
def test_connection() -> APIResponse:
"""
Expand Down
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ class ConnectionCollectionResponse(BaseModel):
total_entries: int


class ConnectionTestResponse(BaseModel):
"""Connection Test serializer for responses."""

status: bool
message: str


# Request Models
class ConnectionBody(BaseModel):
"""Connection Serializer for requests body."""
Expand Down
61 changes: 61 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,53 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/connections/test:
post:
tags:
- Connection
summary: Test Connection
description: 'Test an API connection.


This method first creates an in-memory transient conn_id & exports that to
an env var,

as some hook classes tries to find out the `conn` from their __init__ method
& errors out if not found.

It also deletes the conn id env variable after the test.'
operationId: test_connection
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionBody'
required: true
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionTestResponse'
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
'403':
description: Forbidden
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}:
get:
tags:
Expand Down Expand Up @@ -3568,6 +3615,20 @@ components:
- extra
title: ConnectionResponse
description: Connection serializer for responses.
ConnectionTestResponse:
properties:
status:
type: boolean
title: Status
message:
type: string
title: Message
type: object
required:
- status
- message
title: ConnectionTestResponse
description: Connection Test serializer for responses.
DAGCollectionResponse:
properties:
dags:
Expand Down
46 changes: 46 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import os
from typing import Annotated

from fastapi import Depends, HTTPException, Query, status
Expand All @@ -29,10 +30,14 @@
ConnectionBody,
ConnectionCollectionResponse,
ConnectionResponse,
ConnectionTestResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.configuration import conf
from airflow.models import Connection
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
from airflow.utils import helpers
from airflow.utils.strings import get_random_string

connections_router = AirflowRouter(tags=["Connection"], prefix="/connections")

Expand Down Expand Up @@ -181,3 +186,44 @@ def patch_connection(
for key, val in data.items():
setattr(connection, key, val)
return ConnectionResponse.model_validate(connection, from_attributes=True)


@connections_router.post(
"/test",
responses=create_openapi_http_exception_doc(
[
status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN,
]
),
)
def test_connection(
test_body: ConnectionBody,
) -> ConnectionTestResponse:
"""
Test an API connection.

This method first creates an in-memory transient conn_id & exports that to an env var,
as some hook classes tries to find out the `conn` from their __init__ method & errors out if not found.
It also deletes the conn id env variable after the test.
"""
if conf.get("core", "test_connection", fallback="Disabled").lower().strip() != "enabled":
raise HTTPException(
403,
"Testing connections is disabled in Airflow configuration. "
"Contact your deployment admin to enable it.",
)

transient_conn_id = get_random_string()
conn_env_var = f"{CONN_ENV_PREFIX}{transient_conn_id.upper()}"
try:
data = test_body.model_dump(by_alias=True)
data["conn_id"] = transient_conn_id
conn = Connection(**data)
os.environ[conn_env_var] = conn.get_uri()
test_status, test_message = conn.test_connection()
return ConnectionTestResponse.model_validate(
{"status": test_status, "message": test_message}, from_attributes=True
)
finally:
os.environ.pop(conn_env_var, None)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,9 @@ export type BackfillServiceCreateBackfillMutationResult = Awaited<
export type ConnectionServicePostConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.postConnection>
>;
export type ConnectionServiceTestConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.testConnection>
>;
export type PoolServicePostPoolMutationResult = Awaited<
ReturnType<typeof PoolService.postPool>
>;
Expand Down
43 changes: 43 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,49 @@ export const useConnectionServicePostConnection = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Test Connection
* Test an API connection.
*
* This method first creates an in-memory transient conn_id & exports that to an env var,
* as some hook classes tries to find out the `conn` from their __init__ method & errors out if not found.
* It also deletes the conn id env variable after the test.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionTestResponse Successful Response
* @throws ApiError
*/
export const useConnectionServiceTestConnection = <
TData = Common.ConnectionServiceTestConnectionMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: ConnectionBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: ConnectionBody;
},
TContext
>({
mutationFn: ({ requestBody }) =>
ConnectionService.testConnection({
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Post Pool
* Create a Pool.
Expand Down
17 changes: 17 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,23 @@ export const $ConnectionResponse = {
description: "Connection serializer for responses.",
} as const;

export const $ConnectionTestResponse = {
properties: {
status: {
type: "boolean",
title: "Status",
},
message: {
type: "string",
title: "Message",
},
},
type: "object",
required: ["status", "message"],
title: "ConnectionTestResponse",
description: "Connection Test serializer for responses.",
} as const;

export const $DAGCollectionResponse = {
properties: {
dags: {
Expand Down
30 changes: 30 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import type {
GetConnectionsResponse,
PostConnectionData,
PostConnectionResponse,
TestConnectionData,
TestConnectionResponse,
GetDagRunData,
GetDagRunResponse,
DeleteDagRunData,
Expand Down Expand Up @@ -772,6 +774,34 @@ export class ConnectionService {
},
});
}

/**
* Test Connection
* Test an API connection.
*
* This method first creates an in-memory transient conn_id & exports that to an env var,
* as some hook classes tries to find out the `conn` from their __init__ method & errors out if not found.
* It also deletes the conn id env variable after the test.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionTestResponse Successful Response
* @throws ApiError
*/
public static testConnection(
data: TestConnectionData,
): CancelablePromise<TestConnectionResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/connections/test",
body: data.requestBody,
mediaType: "application/json",
errors: {
401: "Unauthorized",
403: "Forbidden",
422: "Validation Error",
},
});
}
}

export class DagRunService {
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ export type ConnectionResponse = {
extra: string | null;
};

/**
* Connection Test serializer for responses.
*/
export type ConnectionTestResponse = {
status: boolean;
message: string;
};

/**
* DAG Collection serializer for responses.
*/
Expand Down Expand Up @@ -1005,6 +1013,12 @@ export type PostConnectionData = {

export type PostConnectionResponse = ConnectionResponse;

export type TestConnectionData = {
requestBody: ConnectionBody;
};

export type TestConnectionResponse = ConnectionTestResponse;

export type GetDagRunData = {
dagId: string;
dagRunId: string;
Expand Down Expand Up @@ -1845,6 +1859,29 @@ export type $OpenApiTs = {
};
};
};
"/public/connections/test": {
post: {
req: TestConnectionData;
res: {
/**
* Successful Response
*/
200: ConnectionTestResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/dags/{dag_id}/dagRuns/{dag_run_id}": {
get: {
req: GetDagRunData;
Expand Down
Loading