Skip to content

Commit

Permalink
Fail task if FailedToNavigateToUrl (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
ykeremy committed Apr 19, 2024
1 parent d059619 commit 393b3fb
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 52 deletions.
2 changes: 2 additions & 0 deletions skyvern/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ def __init__(self, workflow_parameter_id: str) -> None:

class FailedToNavigateToUrl(SkyvernException):
def __init__(self, url: str, error_message: str) -> None:
self.url = url
self.error_message = error_message
super().__init__(f"Failed to navigate to url {url}. Error message: {error_message}")


Expand Down
141 changes: 89 additions & 52 deletions skyvern/forge/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from skyvern import analytics
from skyvern.exceptions import (
BrowserStateMissingPage,
FailedToNavigateToUrl,
FailedToSendWebhook,
InvalidWorkflowTaskURLState,
MissingBrowserStatePage,
Expand Down Expand Up @@ -309,6 +310,28 @@ async def execute_step(
step=step,
)
return step, detailed_output, next_step
except FailedToNavigateToUrl as e:
# Fail the task if we can't navigate to the URL and send the response
LOG.error(
"Failed to navigate to URL, marking task as failed, and sending webhook response",
task_id=task.task_id,
step_id=step.step_id,
url=e.url,
error_message=e.error_message,
)
task = await self.update_task(
task,
status=TaskStatus.failed,
failure_reason=f"Failed to navigate to URL. URL:{e.url}, Error:{e.error_message}",
)
await self.send_task_response(
task=task,
last_step=step,
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
skip_artifacts=True,
)
return step, detailed_output, next_step

async def agent_step(
self,
Expand Down Expand Up @@ -711,6 +734,7 @@ async def send_task_response(
last_step: Step,
api_key: str | None = None,
close_browser_on_completion: bool = True,
skip_artifacts: bool = False,
) -> None:
"""
send the task response to the webhook callback url
Expand All @@ -727,6 +751,14 @@ async def send_task_response(
task = refreshed_task
# log the task status as an event
analytics.capture("skyvern-oss-agent-task-status", {"status": task.status})
# We skip the artifacts and send the webhook response directly only when there is an issue with the browser
# initialization. In this case, we don't have any artifacts to send and we can't take final screenshots etc.
# since the browser is not initialized properly or the proxy is not working.
if skip_artifacts:
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id)
await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key)
return

# Take one last screenshot and create an artifact before closing the browser to see the final state
browser_state: BrowserState = await app.BROWSER_MANAGER.get_or_create_for_task(task)
await browser_state.get_or_create_page()
Expand Down Expand Up @@ -760,7 +792,9 @@ async def send_task_response(

await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key)

async def execute_task_webhook(self, task: Task, last_step: Step, api_key: str | None) -> None:
async def execute_task_webhook(
self, task: Task, last_step: Step, api_key: str | None, skip_artifacts: bool = False
) -> None:
if not api_key:
LOG.warning(
"Request has no api key. Not sending task response",
Expand All @@ -775,65 +809,68 @@ async def execute_task_webhook(self, task: Task, last_step: Step, api_key: str |
)
return

# get the artifact of the screenshot and get the screenshot_url
screenshot_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.SCREENSHOT_FINAL,
organization_id=task.organization_id,
)
screenshot_url = None
if screenshot_artifact:
screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact)
if not skip_artifacts:
# get the artifact of the screenshot and get the screenshot_url
screenshot_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.SCREENSHOT_FINAL,
organization_id=task.organization_id,
)
screenshot_url = None
if screenshot_artifact:
screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact)

recording_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.RECORDING,
organization_id=task.organization_id,
)
recording_url = None
if recording_artifact:
recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
recording_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.RECORDING,
organization_id=task.organization_id,
)
recording_url = None
if recording_artifact:
recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)

# get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url
latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts(
task_id=task.task_id,
organization_id=task.organization_id,
artifact_types=[ArtifactType.SCREENSHOT_ACTION],
n=SettingsManager.get_settings().TASK_RESPONSE_ACTION_SCREENSHOT_COUNT,
)
latest_action_screenshot_urls = []
if latest_action_screenshot_artifacts:
for artifact in latest_action_screenshot_artifacts:
screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(artifact)
if screenshot_url:
latest_action_screenshot_urls.append(screenshot_url)
else:
LOG.error(
"Failed to get share link for action screenshot",
artifact_id=artifact.artifact_id,
)
else:
LOG.error("Failed to get latest action screenshots")
# get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url
latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts(
task_id=task.task_id,
organization_id=task.organization_id,
artifact_types=[ArtifactType.SCREENSHOT_ACTION],
n=SettingsManager.get_settings().TASK_RESPONSE_ACTION_SCREENSHOT_COUNT,
)
latest_action_screenshot_urls = []
if latest_action_screenshot_artifacts:
for artifact in latest_action_screenshot_artifacts:
screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(artifact)
if screenshot_url:
latest_action_screenshot_urls.append(screenshot_url)
else:
LOG.error(
"Failed to get share link for action screenshot",
artifact_id=artifact.artifact_id,
)
else:
LOG.error("Failed to get latest action screenshots")

# get the latest task from the db to get the latest status, extracted_information, and failure_reason
task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id)
if not task_from_db:
LOG.error("Failed to get task from db when sending task response")
raise TaskNotFound(task_id=task.task_id)

# get the latest task from the db to get the latest status, extracted_information, and failure_reason
task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id)
if not task_from_db:
LOG.error("Failed to get task from db when sending task response")
raise TaskNotFound(task_id=task.task_id)
task = task_from_db
task_response = task.to_task_response(
action_screenshot_urls=latest_action_screenshot_urls,
screenshot_url=screenshot_url,
recording_url=recording_url,
)
else:
task_response = task.to_task_response()

task = task_from_db
if not task.webhook_callback_url:
LOG.info("Task has no webhook callback url. Not sending task response")
return

task_response = task.to_task_response(
action_screenshot_urls=latest_action_screenshot_urls,
screenshot_url=screenshot_url,
recording_url=recording_url,
)

# send task_response to the webhook callback url
# TODO: use async requests (httpx)
timestamp = str(int(datetime.utcnow().timestamp()))
Expand Down

0 comments on commit 393b3fb

Please sign in to comment.