Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 79 additions & 9 deletions pyobas/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

REDIRECT_MSG = (
"pyobas detected a {status_code} ({reason!r}) redirection. You must update "
"your OpenBVAS URL to the correct URL to avoid issues. The redirection was from: "
"your OpenBAS URL to the correct URL to avoid issues. The redirection was from: "
"{source!r} to {target!r}"
)

Expand Down Expand Up @@ -202,25 +202,95 @@ def http_request(
if 200 <= result.status_code < 300:
return result.response

error_message = result.content
# Extract a meaningful error message from the server response
error_message: Any = None

# First, try to get the raw text content
try:
error_json = result.json()
for k in ("message", "error"):
if k in error_json:
error_message = error_json[k]
except (KeyError, ValueError, TypeError):
raw_text = result.content.decode("utf-8", errors="ignore").strip()
# If it's a simple text message (not JSON), use it directly
if (
raw_text
and not raw_text.startswith("{")
and not raw_text.startswith("[")
):
error_message = raw_text[:500]
except Exception:
pass

# If we don't have a message yet, try JSON parsing
if not error_message:
try:
error_json = result.json()
# Common fields
if isinstance(error_json, dict):
# First priority: look for a 'message' field (most specific)
if "message" in error_json:
error_message = error_json.get("message")
elif "execution_message" in error_json:
error_message = error_json.get("execution_message")
elif "error" in error_json:
err = error_json.get("error")
if isinstance(err, dict) and "message" in err:
error_message = err.get("message")
elif err and err not in [
"Internal Server Error",
"Bad Request",
"Not Found",
"Unauthorized",
"Forbidden",
]:
# Only use 'error' field if it's not a generic HTTP status
error_message = str(err)
elif "errors" in error_json:
errs = error_json.get("errors")
if isinstance(errs, list) and errs:
# Join any messages in the list
messages = []
for item in errs:
if isinstance(item, dict) and "message" in item:
messages.append(str(item.get("message")))
else:
messages.append(str(item))
error_message = "; ".join(messages)
elif isinstance(error_json, str):
error_message = error_json
# Fallback to serialized json if we still have nothing
if not error_message:
error_message = utils.json_dumps(error_json)[:500]
except Exception:
# If JSON parsing fails, use the raw text we might have
if not error_message:
try:
error_message = result.response.text[:500]
except Exception:
try:
error_message = result.content.decode(errors="ignore")[
:500
]
except Exception:
error_message = str(result.content)[:500]

# If still no message or a generic HTTP status, use status text
if not error_message or error_message == result.response.reason:
error_message = result.response.reason or "Unknown error"

if result.status_code == 401:
raise exceptions.OpenBASAuthenticationError(
response_code=result.status_code,
error_message=error_message,
error_message=error_message or "Authentication failed",
response_body=result.content,
)

# Use the extracted error message, not the HTTP reason
final_error_message = error_message
if not final_error_message or final_error_message == result.response.reason:
# Only use HTTP reason as last resort
final_error_message = result.response.reason or "Unknown error"

raise exceptions.OpenBASHttpError(
response_code=result.status_code,
error_message=error_message,
error_message=final_error_message,
response_body=result.content,
)

Expand Down
85 changes: 83 additions & 2 deletions pyobas/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,90 @@ def __init__(
self.error_message = error_message

def __str__(self) -> str:
# Start with the provided error message
message = self.error_message

# List of generic HTTP status messages that indicate we should look deeper
generic_messages = (
"Internal Server Error",
"Bad Request",
"Not Found",
"Unauthorized",
"Forbidden",
"Service Unavailable",
"Gateway Timeout",
"Unknown error",
)

# Only try to extract from response body if message is truly generic
# Don't override if we already have a specific error message
if (
not message or (message in generic_messages and len(message) < 30)
) and self.response_body is not None:
try:
import json

body = self.response_body.decode(errors="ignore")
data = json.loads(body)
extracted_msg = None

if isinstance(data, dict):
# Try various common error fields
if "error" in data:
err = data.get("error")
if isinstance(err, dict) and "message" in err:
extracted_msg = err.get("message")
elif isinstance(err, str):
extracted_msg = err
elif "message" in data:
extracted_msg = data.get("message")
elif "execution_message" in data:
extracted_msg = data.get("execution_message")
elif "detail" in data:
extracted_msg = data.get("detail")
elif "errors" in data:
errs = data.get("errors")
if isinstance(errs, list) and errs:
# Join any messages in the list
parts = []
for item in errs:
if isinstance(item, dict) and "message" in item:
parts.append(str(item.get("message")))
else:
parts.append(str(item))
extracted_msg = "; ".join(parts)
elif isinstance(errs, str):
extracted_msg = errs

# Use extracted message if it's better than what we have
if extracted_msg and extracted_msg not in generic_messages:
message = str(extracted_msg)
elif not message:
# Last resort: use the raw body
message = body[:500]

except json.JSONDecodeError:
# Not JSON, use raw text if we don't have a good message
if not message or message in generic_messages:
try:
decoded = self.response_body.decode(errors="ignore")[:500]
if decoded and decoded not in generic_messages:
message = decoded
except Exception:
pass
except Exception:
pass

# Final fallback
if not message:
message = "Unknown error"

# Clean up the message - remove extra whitespace and newlines
message = " ".join(message.split())

if self.response_code is not None:
return f"{self.response_code}: {self.error_message}"
return f"{self.error_message}"
return f"{self.response_code}: {message}"
return f"{message}"


class OpenBASAuthenticationError(OpenBASError):
Expand Down