Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions aixplain/modules/pipeline/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,35 +161,6 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict:
resp = {"status": "FAILED"}
return resp

def _should_fallback_to_v2(self, response: Dict, version: str) -> bool:
"""Determine if the pipeline should fallback to version 2.0 based on the response.

Args:
response (Dict): The response from the pipeline call.
version (str): The version of the pipeline being used.

Returns:
bool: True if fallback is needed, False otherwise.
"""
# If the version is not 3.0, no fallback is needed
if version != self.VERSION_3_0:
return False

should_fallback = False
if "status" not in response or response["status"] == "FAILED":
should_fallback = True
elif response["status"] == "SUCCESS" and ("data" not in response or not response["data"]):
should_fallback = True
# Check for conditions that require a fallback

if should_fallback:
logging.warning(
f"Pipeline Run Error: Failed to run pipeline {self.id} with version {version}. "
f"Trying with version {self.VERSION_2_0}."
)

return should_fallback

def run(
self,
data: Union[Text, Dict],
Expand All @@ -198,7 +169,6 @@ def run(
timeout: float = 20000.0,
wait_time: float = 1.0,
batch_mode: bool = True,
version: str = None,
**kwargs,
) -> Dict:
"""Runs a pipeline call.
Expand All @@ -215,15 +185,13 @@ def run(
Returns:
Dict: parsed output from pipeline
"""
version = version or self.VERSION_3_0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be VERSION_3_0 VERSION_2_0 constants at somewhere if you also want to remove.

start = time.time()
try:
response = self.run_async(
data,
data_asset=data_asset,
name=name,
batch_mode=batch_mode,
version=version,
**kwargs,
)

Expand All @@ -236,16 +204,6 @@ def run(
end = time.time()
response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time)

if self._should_fallback_to_v2(response, version):
return self.run(
data,
data_asset=data_asset,
name=name,
batch_mode=batch_mode,
version=self.VERSION_2_0,
**kwargs,
)
response["version"] = version
return response
except Exception as e:
error_message = f"Error in request for {name}: {str(e)}"
Expand All @@ -256,7 +214,6 @@ def run(
"status": "FAILED",
"error": error_message,
"elapsed_time": end - start,
"version": version,
}

def __prepare_payload(
Expand Down Expand Up @@ -373,7 +330,6 @@ def run_async(
data_asset: Optional[Union[Text, Dict]] = None,
name: Text = "pipeline_process",
batch_mode: bool = True,
version: str = None,
**kwargs,
) -> Dict:
"""Runs asynchronously a pipeline call.
Expand All @@ -388,15 +344,13 @@ def run_async(
Returns:
Dict: polling URL in response
"""
version = version or self.VERSION_3_0
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json",
}

payload = self.__prepare_payload(data=data, data_asset=data_asset)
payload["batchmode"] = batch_mode
payload["version"] = version
payload.update(kwargs)
payload = json.dumps(payload)
call_url = f"{self.url}/{self.id}"
Expand Down Expand Up @@ -433,16 +387,6 @@ def run_async(
if resp is not None:
response["error"] = resp

if self._should_fallback_to_v2(response, version):
return self.run_async(
data,
data_asset=data_asset,
name=name,
batch_mode=batch_mode,
version=self.VERSION_2_0,
**kwargs,
)
response["version"] = version
return response

def update(
Expand Down
15 changes: 0 additions & 15 deletions tests/functional/pipelines/fallback_test.py

This file was deleted.