diff --git a/src/bentoml/_internal/cloud/bentocloud.py b/src/bentoml/_internal/cloud/bentocloud.py index 169c6469e34..9118139f455 100644 --- a/src/bentoml/_internal/cloud/bentocloud.py +++ b/src/bentoml/_internal/cloud/bentocloud.py @@ -10,7 +10,7 @@ from tempfile import NamedTemporaryFile import fs -import requests +import httpx from rich.live import Live from simple_di import Provide from simple_di import inject @@ -266,7 +266,7 @@ def filter_( ) try: if presigned_upload_url is not None: - resp = requests.put(presigned_upload_url, data=tar_io) + resp = httpx.put(presigned_upload_url, content=tar_io) if resp.status_code != 200: finish_req = FinishUploadBentoSchema( status=BentoUploadStatus.FAILED, @@ -321,8 +321,8 @@ def chunk_upload( ) with CallbackIOWrapper(chunk, read_cb=io_cb) as chunk_io: - resp = requests.put( - remote_bento.presigned_upload_url, data=chunk_io + resp = httpx.put( + remote_bento.presigned_upload_url, content=chunk_io ) if resp.status_code != 200: return FinishUploadBentoSchema( @@ -510,27 +510,28 @@ def pull_model(model_tag: Tag): name, version ) presigned_download_url = remote_bento.presigned_download_url - response = requests.get(presigned_download_url, stream=True) - if response.status_code != 200: - raise BentoMLException( - f'Failed to download bento "{_tag}": {response.text}' - ) - total_size_in_bytes = int(response.headers.get("content-length", 0)) - block_size = 1024 # 1 Kibibyte with NamedTemporaryFile() as tar_file: - self.transmission_progress.update( - download_task_id, - completed=0, - total=total_size_in_bytes, - visible=True, - ) - self.transmission_progress.start_task(download_task_id) - for data in response.iter_content(block_size): + with httpx.stream("GET", presigned_download_url) as response: + if response.status_code != 200: + raise BentoMLException( + f'Failed to download bento "{_tag}": {response.text}' + ) + total_size_in_bytes = int(response.headers.get("content-length", 0)) + block_size = 1024 # 1 Kibibyte self.transmission_progress.update( - download_task_id, advance=len(data) + download_task_id, + completed=0, + total=total_size_in_bytes, + visible=True, ) - tar_file.write(data) + self.transmission_progress.start_task(download_task_id) + for data in response.iter_bytes(block_size): + self.transmission_progress.update( + download_task_id, advance=len(data) + ) + tar_file.write(data) + self.log_progress.add_task( f'[bold green]Finished downloading all bento "{_tag}" files' ) @@ -707,7 +708,7 @@ def io_cb(x: int): ) try: if presigned_upload_url is not None: - resp = requests.put(presigned_upload_url, data=tar_io) + resp = httpx.put(presigned_upload_url, content=tar_io) if resp.status_code != 200: finish_req = FinishUploadModelSchema( status=ModelUploadStatus.FAILED, @@ -763,8 +764,8 @@ def chunk_upload( ) with CallbackIOWrapper(chunk, read_cb=io_cb) as chunk_io: - resp = requests.put( - remote_model.presigned_upload_url, data=chunk_io + resp = httpx.put( + remote_model.presigned_upload_url, content=chunk_io ) if resp.status_code != 200: return FinishUploadModelSchema( @@ -833,6 +834,7 @@ def chunk_upload( version=version, req=finish_req, ) + if finish_req.status != ModelUploadStatus.SUCCESS: self.log_progress.add_task( f'[bold red]Failed pushing model "{model.tag}" : {finish_req.reason}' @@ -954,25 +956,28 @@ def _do_pull_model( ) presigned_download_url = remote_model.presigned_download_url - response = requests.get(presigned_download_url, stream=True) - if response.status_code != 200: - raise BentoMLException( - f'Failed to download model "{_tag}": {response.text}' + with NamedTemporaryFile() as tar_file: + with httpx.stream("GET", presigned_download_url) as response: + if response.status_code != 200: + raise BentoMLException( + f'Failed to download model "{_tag}": {response.text}' + ) + + total_size_in_bytes = int(response.headers.get("content-length", 0)) + block_size = 1024 # 1 Kibibyte + self.transmission_progress.update( + download_task_id, + description=f'Downloading model "{_tag}"', + total=total_size_in_bytes, + visible=True, ) + self.transmission_progress.start_task(download_task_id) + for data in response.iter_bytes(block_size): + self.transmission_progress.update( + download_task_id, advance=len(data) + ) + tar_file.write(data) - total_size_in_bytes = int(response.headers.get("content-length", 0)) - block_size = 1024 # 1 Kibibyte - with NamedTemporaryFile() as tar_file: - self.transmission_progress.update( - download_task_id, - description=f'Downloading model "{_tag}"', - total=total_size_in_bytes, - visible=True, - ) - self.transmission_progress.start_task(download_task_id) - for data in response.iter_content(block_size): - self.transmission_progress.update(download_task_id, advance=len(data)) - tar_file.write(data) self.log_progress.add_task( f'[bold green]Finished downloading model "{_tag}" files' ) diff --git a/src/bentoml/_internal/cloud/client.py b/src/bentoml/_internal/cloud/client.py index 60e1fb49d96..3de0e8ed358 100644 --- a/src/bentoml/_internal/cloud/client.py +++ b/src/bentoml/_internal/cloud/client.py @@ -4,7 +4,7 @@ import typing as t from urllib.parse import urljoin -import requests +import httpx from ...exceptions import CloudRESTApiClientError from ..configuration import BENTOML_VERSION @@ -41,7 +41,7 @@ class RestApiClient: def __init__(self, endpoint: str, api_token: str) -> None: self.endpoint = endpoint - self.session = requests.Session() + self.session = httpx.Client() self.session.headers.update( { "X-YATAI-API-TOKEN": api_token, @@ -50,7 +50,7 @@ def __init__(self, endpoint: str, api_token: str) -> None: } ) - def _is_not_found(self, resp: requests.Response) -> bool: + def _is_not_found(self, resp: httpx.Response) -> bool: # We used to return 400 for record not found, handle both cases return ( resp.status_code == 404 @@ -58,7 +58,7 @@ def _is_not_found(self, resp: requests.Response) -> bool: and "record not found" in resp.text ) - def _check_resp(self, resp: requests.Response) -> None: + def _check_resp(self, resp: httpx.Response) -> None: if resp.status_code != 200: raise CloudRESTApiClientError( f"request failed with status code {resp.status_code}: {resp.text}" @@ -96,7 +96,7 @@ def create_bento_repository( self, req: CreateBentoRepositorySchema ) -> BentoRepositorySchema: url = urljoin(self.endpoint, "/api/v1/bento_repositories") - resp = self.session.post(url, data=schema_to_json(req)) + resp = self.session.post(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, BentoRepositorySchema) @@ -117,7 +117,7 @@ def create_bento( url = urljoin( self.endpoint, f"/api/v1/bento_repositories/{bento_repository_name}/bentos" ) - resp = self.session.post(url, data=schema_to_json(req)) + resp = self.session.post(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, BentoSchema) @@ -128,7 +128,7 @@ def update_bento( self.endpoint, f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}", ) - resp = self.session.patch(url, data=schema_to_json(req)) + resp = self.session.patch(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, BentoSchema) @@ -175,7 +175,7 @@ def presign_bento_multipart_upload_url( self.endpoint, f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}/presign_multipart_upload_url", ) - resp = self.session.patch(url, data=schema_to_json(req)) + resp = self.session.patch(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, BentoSchema) @@ -189,7 +189,7 @@ def complete_bento_multipart_upload( self.endpoint, f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}/complete_multipart_upload", ) - resp = self.session.patch(url, data=schema_to_json(req)) + resp = self.session.patch(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, BentoSchema) @@ -211,7 +211,7 @@ def finish_upload_bento( self.endpoint, f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}/finish_upload", ) - resp = self.session.patch(url, data=schema_to_json(req)) + resp = self.session.patch(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, BentoSchema) @@ -224,7 +224,7 @@ def upload_bento( ) resp = self.session.put( url, - data=data, + content=data, headers=dict( self.session.headers, **{"Content-Type": "application/octet-stream"} ), @@ -234,14 +234,14 @@ def upload_bento( def download_bento( self, bento_repository_name: str, version: str - ) -> requests.Response: + ) -> httpx.Response: url = urljoin( self.endpoint, f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}/download", ) - resp = self.session.get(url, stream=True) - self._check_resp(resp) - return resp + with self.session.stream("GET", url) as resp: + self._check_resp(resp) + return resp def get_model_repository( self, model_repository_name: str @@ -259,7 +259,7 @@ def create_model_repository( self, req: CreateModelRepositorySchema ) -> ModelRepositorySchema: url = urljoin(self.endpoint, "/api/v1/model_repositories") - resp = self.session.post(url, data=schema_to_json(req)) + resp = self.session.post(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, ModelRepositorySchema) @@ -280,7 +280,7 @@ def create_model( url = urljoin( self.endpoint, f"/api/v1/model_repositories/{model_repository_name}/models" ) - resp = self.session.post(url, data=schema_to_json(req)) + resp = self.session.post(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, ModelSchema) @@ -327,7 +327,7 @@ def presign_model_multipart_upload_url( self.endpoint, f"/api/v1/model_repositories/{model_repository_name}/models/{version}/presign_multipart_upload_url", ) - resp = self.session.patch(url, data=schema_to_json(req)) + resp = self.session.patch(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, ModelSchema) @@ -341,7 +341,7 @@ def complete_model_multipart_upload( self.endpoint, f"/api/v1/model_repositories/{model_repository_name}/models/{version}/complete_multipart_upload", ) - resp = self.session.patch(url, data=schema_to_json(req)) + resp = self.session.patch(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, ModelSchema) @@ -363,7 +363,7 @@ def finish_upload_model( self.endpoint, f"/api/v1/model_repositories/{model_repository_name}/models/{version}/finish_upload", ) - resp = self.session.patch(url, data=schema_to_json(req)) + resp = self.session.patch(url, content=schema_to_json(req)) self._check_resp(resp) return schema_from_json(resp.text, ModelSchema) @@ -376,7 +376,7 @@ def upload_model( ) resp = self.session.put( url, - data=data, + content=data, headers=dict( self.session.headers, **{"Content-Type": "application/octet-stream"} ), @@ -386,14 +386,14 @@ def upload_model( def download_model( self, model_repository_name: str, version: str - ) -> requests.Response: + ) -> httpx.Response: url = urljoin( self.endpoint, f"/api/v1/model_repositories/{model_repository_name}/models/{version}/download", ) - resp = self.session.get(url, stream=True) - self._check_resp(resp) - return resp + with self.session.stream("GET", url) as resp: + self._check_resp(resp) + return resp def get_bento_repositories_list( self, bento_repository_name: str @@ -435,7 +435,7 @@ def create_deployment( self, cluster_name: str, create_schema: CreateDeploymentSchema ) -> DeploymentSchema | None: url = urljoin(self.endpoint, f"/api/v1/clusters/{cluster_name}/deployments") - resp = self.session.post(url, data=schema_to_json(create_schema)) + resp = self.session.post(url, content=schema_to_json(create_schema)) self._check_resp(resp) return schema_from_json(resp.text, DeploymentSchema) @@ -463,7 +463,7 @@ def update_deployment( self.endpoint, f"/api/v1/clusters/{cluster_name}/namespaces/{kube_namespace}/deployments/{deployment_name}", ) - resp = self.session.patch(url, data=schema_to_json(update_schema)) + resp = self.session.patch(url, content=schema_to_json(update_schema)) if self._is_not_found(resp): return None self._check_resp(resp) diff --git a/src/bentoml/_internal/cloud/yatai.py b/src/bentoml/_internal/cloud/yatai.py index 8f0c8b36a22..888d9c025ea 100644 --- a/src/bentoml/_internal/cloud/yatai.py +++ b/src/bentoml/_internal/cloud/yatai.py @@ -10,7 +10,7 @@ from tempfile import NamedTemporaryFile import fs -import requests +import httpx from rich.live import Live from simple_di import Provide from simple_di import inject @@ -254,7 +254,7 @@ def filter_( ) try: if presigned_upload_url is not None: - resp = requests.put(presigned_upload_url, data=tar_io) + resp = httpx.put(presigned_upload_url, data=tar_io) if resp.status_code != 200: finish_req = FinishUploadBentoSchema( status=BentoUploadStatus.FAILED, @@ -309,7 +309,7 @@ def chunk_upload( ) with CallbackIOWrapper(chunk, read_cb=io_cb) as chunk_io: - resp = requests.put( + resp = httpx.put( remote_bento.presigned_upload_url, data=chunk_io ) if resp.status_code != 200: @@ -498,27 +498,27 @@ def pull_model(model_tag: Tag): name, version ) presigned_download_url = remote_bento.presigned_download_url - response = requests.get(presigned_download_url, stream=True) - if response.status_code != 200: - raise BentoMLException( - f'Failed to download bento "{_tag}": {response.text}' - ) - total_size_in_bytes = int(response.headers.get("content-length", 0)) - block_size = 1024 # 1 Kibibyte with NamedTemporaryFile() as tar_file: - self.transmission_progress.update( - download_task_id, - completed=0, - total=total_size_in_bytes, - visible=True, - ) - self.transmission_progress.start_task(download_task_id) - for data in response.iter_content(block_size): + with httpx.stream("GET", presigned_download_url) as response: + if response.status_code != 200: + raise BentoMLException( + f'Failed to download bento "{_tag}": {response.text}' + ) + total_size_in_bytes = int(response.headers.get("content-length", 0)) + block_size = 1024 # 1 Kibibyte self.transmission_progress.update( - download_task_id, advance=len(data) + download_task_id, + completed=0, + total=total_size_in_bytes, + visible=True, ) - tar_file.write(data) + self.transmission_progress.start_task(download_task_id) + for data in response.iter_bytes(block_size): + self.transmission_progress.update( + download_task_id, advance=len(data) + ) + tar_file.write(data) self.log_progress.add_task( f'[bold green]Finished downloading all bento "{_tag}" files' ) @@ -690,7 +690,7 @@ def io_cb(x: int): ) try: if presigned_upload_url is not None: - resp = requests.put(presigned_upload_url, data=tar_io) + resp = httpx.put(presigned_upload_url, data=tar_io) if resp.status_code != 200: finish_req = FinishUploadModelSchema( status=ModelUploadStatus.FAILED, @@ -746,8 +746,8 @@ def chunk_upload( ) with CallbackIOWrapper(chunk, read_cb=io_cb) as chunk_io: - resp = requests.put( - remote_model.presigned_upload_url, data=chunk_io + resp = httpx.put( + remote_model.presigned_upload_url, content=chunk_io ) if resp.status_code != 200: return FinishUploadModelSchema( @@ -937,25 +937,26 @@ def _do_pull_model( ) presigned_download_url = remote_model.presigned_download_url - response = requests.get(presigned_download_url, stream=True) - if response.status_code != 200: - raise BentoMLException( - f'Failed to download model "{_tag}": {response.text}' - ) - - total_size_in_bytes = int(response.headers.get("content-length", 0)) - block_size = 1024 # 1 Kibibyte with NamedTemporaryFile() as tar_file: - self.transmission_progress.update( - download_task_id, - description=f'Downloading model "{_tag}"', - total=total_size_in_bytes, - visible=True, - ) - self.transmission_progress.start_task(download_task_id) - for data in response.iter_content(block_size): - self.transmission_progress.update(download_task_id, advance=len(data)) - tar_file.write(data) + with httpx.stream("GET", presigned_download_url) as response: + if response.status_code != 200: + raise BentoMLException( + f'Failed to download model "{_tag}": {response.text}' + ) + total_size_in_bytes = int(response.headers.get("content-length", 0)) + block_size = 1024 # 1 Kibibyte + self.transmission_progress.update( + download_task_id, + description=f'Downloading model "{_tag}"', + total=total_size_in_bytes, + visible=True, + ) + self.transmission_progress.start_task(download_task_id) + for data in response.iter_bytes(block_size): + self.transmission_progress.update( + download_task_id, advance=len(data) + ) + tar_file.write(data) self.log_progress.add_task( f'[bold green]Finished downloading model "{_tag}" files' )