Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ def submit_indexing_job(

self.log.info("Druid ingestion spec: %s", json_index_spec)
req_index = requests.post(
url, data=json_index_spec, headers=self.header, auth=self.get_auth(), verify=self.get_verify()
url, data=json_index_spec, headers=self.header, auth=self.get_auth(), verify=self.get_verify(),
timeout=30,
)

code = req_index.status_code
Expand All @@ -165,14 +166,15 @@ def submit_indexing_job(

sec = 0
while running:
req_status = requests.get(druid_task_status_url, auth=self.get_auth(), verify=self.get_verify())
req_status = requests.get(druid_task_status_url, auth=self.get_auth(), verify=self.get_verify(), timeout=30)

self.log.info("Job still running for %s seconds...", sec)

if self.max_ingestion_time and sec > self.max_ingestion_time:
# ensure that the job gets killed if the max ingestion time is exceeded
requests.post(
f"{url}/{druid_task_id}/shutdown", auth=self.get_auth(), verify=self.get_verify()
f"{url}/{druid_task_id}/shutdown", auth=self.get_auth(), verify=self.get_verify(),
timeout=30,
)
raise AirflowException(f"Druid ingestion took more than {self.max_ingestion_time} seconds")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ def check_kueue_deployment_running(
@staticmethod
def get_yaml_content_from_file(kueue_yaml_url) -> list[dict]:
"""Download content of YAML file and separate it into several dictionaries."""
response = requests.get(kueue_yaml_url, allow_redirects=True)
response = requests.get(kueue_yaml_url, allow_redirects=True, timeout=30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be okay to apply a 30s timeout across the K8s provider, as usually all calls should be short. But this here is a simple search-and-add. The K8s provider uses a K8s API client; if a timeout should be applied (and is not already), then it needs to be passed down to the API client (where a generic retry layer was added recently).

if response.status_code != 200:
raise AirflowException("Was not able to read the yaml file from given URL")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def __init__(self, appbuilder):
def _get_authentik_jwks(self, jwks_url) -> dict:
import requests

resp = requests.get(jwks_url)
resp = requests.get(jwks_url, timeout=30)
if resp.status_code == 200:
return resp.json()
return {}
Expand Down Expand Up @@ -2326,7 +2326,7 @@ def _rotate_session_id(self) -> None:
def _get_microsoft_jwks(self) -> list[dict[str, Any]]:
import requests

return requests.get(MICROSOFT_KEY_SET_URL).json()
return requests.get(MICROSOFT_KEY_SET_URL, timeout=30).json()

def _decode_and_validate_azure_jwt(self, id_token: str) -> dict[str, str]:
verify_signature = self.oauth_remotes["azure"].client_kwargs.get("verify_signature", False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_jobs_for_job_group(self, job_id: int) -> dict[str, Any]:
"""
endpoint_path = f"{self.api_version}/jobGroups/{job_id}/jobs"
url: str = urljoin(self._base_url, endpoint_path)
response = requests.get(url, headers=self._headers)
response = requests.get(url, headers=self._headers, timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -117,7 +117,7 @@ def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) ->
params: dict[str, Any] = {"embed": embed, "includeDeleted": include_deleted}
endpoint_path = f"{self.api_version}/jobGroups/{job_group_id}"
url: str = urljoin(self._base_url, endpoint_path)
response = requests.get(url, headers=self._headers, params=params)
response = requests.get(url, headers=self._headers, params=params, timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -135,7 +135,7 @@ def run_job_group(self, body_request: dict) -> dict[str, Any]:
"""
endpoint_path = f"{self.api_version}/jobGroups"
url: str = urljoin(self._base_url, endpoint_path)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -149,7 +149,7 @@ def create_flow(self, *, body_request: dict) -> dict:
"""
endpoint = f"/{self.api_version}/flows"
url: str = urljoin(self._base_url, endpoint)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -172,7 +172,7 @@ def copy_flow(
"description": description,
"copyDatasources": copy_datasources,
}
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -185,7 +185,7 @@ def delete_flow(self, *, flow_id: int) -> None:
"""
endpoint_path = f"{self.api_version}/flows/{flow_id}"
url: str = urljoin(self._base_url, endpoint_path)
response = requests.delete(url, headers=self._headers)
response = requests.delete(url, headers=self._headers, timeout=30)
self._raise_for_status(response)

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
Expand All @@ -198,7 +198,7 @@ def run_flow(self, *, flow_id: int, body_request: dict) -> dict:
"""
endpoint = f"{self.api_version}/flows/{flow_id}/run"
url: str = urljoin(self._base_url, endpoint)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -211,7 +211,7 @@ def get_job_group_status(self, *, job_group_id: int) -> JobGroupStatuses:
"""
endpoint = f"/{self.api_version}/jobGroups/{job_group_id}/status"
url: str = urljoin(self._base_url, endpoint)
response = requests.get(url, headers=self._headers)
response = requests.get(url, headers=self._headers, timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -232,7 +232,7 @@ def create_imported_dataset(self, *, body_request: dict) -> dict:
"""
endpoint = f"/{self.api_version}/importedDatasets"
url: str = urljoin(self._base_url, endpoint)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -247,7 +247,7 @@ def create_wrangled_dataset(self, *, body_request: dict) -> dict:
"""
endpoint = f"/{self.api_version}/wrangledDatasets"
url: str = urljoin(self._base_url, endpoint)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -262,7 +262,7 @@ def create_output_object(self, *, body_request: dict) -> dict:
"""
endpoint = f"/{self.api_version}/outputObjects"
url: str = urljoin(self._base_url, endpoint)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -277,7 +277,7 @@ def create_write_settings(self, *, body_request: dict) -> dict:
"""
endpoint = f"/{self.api_version}/writeSettings"
url: str = urljoin(self._base_url, endpoint)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
response = requests.post(url, headers=self._headers, data=json.dumps(body_request), timeout=30)
self._raise_for_status(response)
return response.json()

Expand All @@ -290,5 +290,5 @@ def delete_imported_dataset(self, *, dataset_id: int) -> None:
"""
endpoint = f"/{self.api_version}/importedDatasets/{dataset_id}"
url: str = urljoin(self._base_url, endpoint)
response = requests.delete(url, headers=self._headers)
response = requests.delete(url, headers=self._headers, timeout=30)
self._raise_for_status(response)
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def deploy_function(self, overwrite_function_if_exist: bool, body: dict[str, Any
else:
url = self.get_conn().host + self.DEPLOY_FUNCTION
self.log.info("Deploying function %s", url)
response = requests.post(url, body)
response = requests.post(url, body, timeout=30)
if response.status_code != OK_STATUS_CODE:
self.log.error("Response status %d", response.status_code)
self.log.error("Failed to deploy")
Expand All @@ -82,7 +82,7 @@ def invoke_async_function(self, body: dict[str, Any]) -> None:
"""Invoke function asynchronously."""
url = self.get_conn().host + self.INVOKE_ASYNC_FUNCTION + self.function_name
self.log.info("Invoking function asynchronously %s", url)
response = requests.post(url, body)
response = requests.post(url, body, timeout=30)
if response.ok:
self.log.info("Invoked %s", self.function_name)
else:
Expand All @@ -93,7 +93,7 @@ def invoke_function(self, body: dict[str, Any]) -> None:
"""Invoke function synchronously. This will block until function completes and returns."""
url = self.get_conn().host + self.INVOKE_FUNCTION + self.function_name
self.log.info("Invoking function synchronously %s", url)
response = requests.post(url, body)
response = requests.post(url, body, timeout=30)
if response.ok:
self.log.info("Invoked %s", self.function_name)
self.log.info("Response code %s", response.status_code)
Expand All @@ -106,7 +106,7 @@ def update_function(self, body: dict[str, Any]) -> None:
"""Update OpenFaaS function."""
url = self.get_conn().host + self.UPDATE_FUNCTION
self.log.info("Updating function %s", url)
response = requests.put(url, body)
response = requests.put(url, body, timeout=30)
if response.status_code != OK_STATUS_CODE:
self.log.error("Response status %d", response.status_code)
self.log.error("Failed to update response %s", response.content.decode("utf-8"))
Expand All @@ -117,7 +117,7 @@ def does_function_exist(self) -> bool:
"""Whether OpenFaaS function exists or not."""
url = self.get_conn().host + self.GET_FUNCTION + self.function_name

response = requests.get(url)
response = requests.get(url, timeout=30)
if response.ok:
return True
self.log.error("Failed to find function %s", self.function_name)
Expand Down