Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: handle error for http format #3548

Merged
merged 6 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,6 @@ impl Server for HttpServer {
async fn handle_error(err: BoxError) -> Json<HttpResponse> {
error!(err; "Unhandled internal error");
Json(HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::GreptimedbV1,
StatusCode::Unexpected,
format!("Unhandled internal error: {err}"),
)))
Expand Down
43 changes: 20 additions & 23 deletions src/servers/src/http/arrow_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,49 +60,46 @@ async fn write_arrow_bytes(
}

impl ArrowResponse {
pub async fn from_output(mut outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
if outputs.len() != 1 {
pub async fn from_output(mut outputs: Vec<error::Result<Output>>) -> HttpResponse {
if outputs.len() > 1 {
return HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Arrow,
StatusCode::InvalidArguments,
"Multi-statements and empty query are not allowed".to_string(),
"cannot output multi-statements result in arrow format".to_string(),
));
}

match outputs.remove(0) {
Ok(output) => match output.data {
OutputData::AffectedRows(_rows) => HttpResponse::Arrow(ArrowResponse {
match outputs.pop() {
None => HttpResponse::Arrow(ArrowResponse {
data: vec![],
execution_time_ms: 0,
}),
Some(Ok(output)) => match output.data {
OutputData::AffectedRows(_) => HttpResponse::Arrow(ArrowResponse {
data: vec![],
execution_time_ms: 0,
}),
OutputData::RecordBatches(recordbatches) => {
let schema = recordbatches.schema();
match write_arrow_bytes(recordbatches.as_stream(), schema.arrow_schema()).await
{
OutputData::RecordBatches(batches) => {
let schema = batches.schema();
match write_arrow_bytes(batches.as_stream(), schema.arrow_schema()).await {
Ok(payload) => HttpResponse::Arrow(ArrowResponse {
data: payload,
execution_time_ms: 0,
}),
Err(e) => {
HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e))
}
Err(e) => HttpResponse::Error(ErrorResponse::from_error(e)),
}
}

OutputData::Stream(recordbatches) => {
let schema = recordbatches.schema();
match write_arrow_bytes(recordbatches, schema.arrow_schema()).await {
OutputData::Stream(batches) => {
let schema = batches.schema();
match write_arrow_bytes(batches, schema.arrow_schema()).await {
Ok(payload) => HttpResponse::Arrow(ArrowResponse {
data: payload,
execution_time_ms: 0,
}),
Err(e) => {
HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e))
}
Err(e) => HttpResponse::Error(ErrorResponse::from_error(e)),
}
}
},
Err(e) => HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e)),
Some(Err(e)) => HttpResponse::Error(ErrorResponse::from_error(e)),
}
}

Expand All @@ -127,7 +124,7 @@ impl IntoResponse for ArrowResponse {
),
(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("ARROW"),
HeaderValue::from_static(ResponseFormat::Arrow.as_str()),
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
),
(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down
15 changes: 5 additions & 10 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use session::context::QueryContextBuilder;
use snafu::{ensure, OptionExt, ResultExt};

use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME};
use super::{ResponseFormat, PUBLIC_APIS};
use super::PUBLIC_APIS;
use crate::error::{
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
Expand Down Expand Up @@ -88,7 +88,7 @@ pub async fn inner_auth<B>(
crate::metrics::METRIC_AUTH_FAILURE
.with_label_values(&[e.status_code().as_ref()])
.inc();
return Err(err_response(is_influxdb_request(&req), e).into_response());
return Err(err_response(e));
}
};

Expand All @@ -112,7 +112,7 @@ pub async fn inner_auth<B>(
crate::metrics::METRIC_AUTH_FAILURE
.with_label_values(&[e.status_code().as_ref()])
.inc();
Err(err_response(is_influxdb_request(&req), e).into_response())
Err(err_response(e))
}
}
}
Expand All @@ -128,13 +128,8 @@ pub async fn check_http_auth<B>(
}
}

fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse {
let ty = if is_influxdb {
ResponseFormat::InfluxdbV1
} else {
ResponseFormat::GreptimedbV1
};
(StatusCode::UNAUTHORIZED, ErrorResponse::from_error(ty, err))
fn err_response(err: impl ErrorExt) -> Response {
(StatusCode::UNAUTHORIZED, ErrorResponse::from_error(err)).into_response()
}

pub fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
Expand Down
11 changes: 6 additions & 5 deletions src/servers/src/http/csv_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ pub struct CsvResponse {

impl CsvResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
match handler::from_output(outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Csv,
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
"cannot output multi-statements result in csv format".to_string(),
))
} else {
HttpResponse::Csv(CsvResponse {
Expand Down Expand Up @@ -100,8 +99,10 @@ impl IntoResponse for CsvResponse {
payload,
)
.into_response();
resp.headers_mut()
.insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static("CSV"));
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static(ResponseFormat::Csv.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
Expand Down
15 changes: 4 additions & 11 deletions src/servers/src/http/error_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,17 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::ResponseFormat;
use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME;

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct ErrorResponse {
#[serde(skip)]
ty: ResponseFormat,
code: u32,
error: String,
execution_time_ms: u64,
}

impl ErrorResponse {
pub fn from_error(ty: ResponseFormat, error: impl ErrorExt) -> Self {
pub fn from_error(error: impl ErrorExt) -> Self {
let code = error.status_code();

if code.should_log_error() {
Expand All @@ -44,12 +41,11 @@ impl ErrorResponse {
debug!("Failed to handle HTTP request, err: {:?}", error);
}

Self::from_error_message(ty, code, error.output_msg())
Self::from_error_message(code, error.output_msg())
}

pub fn from_error_message(ty: ResponseFormat, code: StatusCode, msg: String) -> Self {
pub fn from_error_message(code: StatusCode, msg: String) -> Self {
ErrorResponse {
ty,
code: code as u32,
error: msg,
execution_time_ms: 0,
Expand All @@ -76,14 +72,11 @@ impl ErrorResponse {

impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let ty = self.ty.as_str();
let code = self.code;
let execution_time = self.execution_time_ms;
let mut resp = Json(self).into_response();
resp.headers_mut()
.insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code));
resp.headers_mut()
.insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static(ty));
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/http/greptime_result_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct GreptimedbV1Response {

impl GreptimedbV1Response {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::GreptimedbV1, outputs).await {
match handler::from_output(outputs).await {
Ok((output, resp_metrics)) => HttpResponse::GreptimedbV1(Self {
output,
execution_time_ms: 0,
Expand Down Expand Up @@ -77,7 +77,7 @@ impl IntoResponse for GreptimedbV1Response {

resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("greptimedb_v1"),
HeaderValue::from_static(ResponseFormat::GreptimedbV1.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down
13 changes: 6 additions & 7 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub async fn sql(
let outputs = match result {
Err((status, msg)) => {
return HttpResponse::Error(
ErrorResponse::from_error_message(format, status, msg)
ErrorResponse::from_error_message(status, msg)
.with_execution_time(start.elapsed().as_millis() as u64),
);
}
Expand All @@ -130,7 +130,6 @@ pub async fn sql(

/// Create a response from query result
pub async fn from_output(
ty: ResponseFormat,
outputs: Vec<crate::error::Result<Output>>,
) -> Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
// TODO(sunng87): this api response structure cannot represent error well.
Expand All @@ -154,11 +153,11 @@ pub async fn from_output(
Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
Ok(rows) => rows,
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
},
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
};
if let Some(physical_plan) = o.meta.plan {
Expand All @@ -180,14 +179,14 @@ pub async fn from_output(
results.push(GreptimeQueryOutput::Records(rows));
}
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
}
}
},

Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
}
}
Expand Down Expand Up @@ -239,7 +238,7 @@ pub async fn promql(
let resp = if let Some((status, msg)) =
validate_schema(sql_handler.clone(), query_ctx.clone()).await
{
let resp = ErrorResponse::from_error_message(ResponseFormat::GreptimedbV1, status, msg);
let resp = ErrorResponse::from_error_message(status, msg);
HttpResponse::Error(resp)
} else {
let prom_query = params.into();
Expand Down
15 changes: 5 additions & 10 deletions src/servers/src/http/influxdb_result_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_recordbatch::{util, RecordBatch};
use schemars::JsonSchema;
Expand Down Expand Up @@ -143,10 +142,6 @@ impl InfluxdbV1Response {
outputs: Vec<crate::error::Result<Output>>,
epoch: Option<Epoch>,
) -> HttpResponse {
fn make_error_response(error: impl ErrorExt) -> HttpResponse {
HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::InfluxdbV1, error))
}

// TODO(sunng87): this api response structure cannot represent error well.
// It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
Expand All @@ -172,11 +167,11 @@ impl InfluxdbV1Response {
});
}
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
},
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
}
}
Expand All @@ -189,14 +184,14 @@ impl InfluxdbV1Response {
});
}
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
}
}
}
}
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
}
}
Expand All @@ -222,7 +217,7 @@ impl IntoResponse for InfluxdbV1Response {
let mut resp = Json(self).into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("influxdb_v1"),
HeaderValue::from_static(ResponseFormat::InfluxdbV1.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down
10 changes: 3 additions & 7 deletions src/servers/src/http/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,15 @@ use snafu::ResultExt;

use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu};
use crate::http::error_result::ErrorResponse;
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse, ResponseFormat};
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse};

macro_rules! json_err {
($e: expr) => {{
return HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::GreptimedbV1, $e));
return HttpResponse::Error(ErrorResponse::from_error($e));
}};

($msg: expr, $code: expr) => {{
return HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::GreptimedbV1,
$code,
$msg.to_string(),
));
return HttpResponse::Error(ErrorResponse::from_error_message($code, $msg.to_string()));
}};
}

Expand Down
7 changes: 3 additions & 4 deletions src/servers/src/http/table_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@ pub struct TableResponse {

impl TableResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
match handler::from_output(outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Table,
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
"cannot output multi-statements result in table format".to_string(),
))
} else {
HttpResponse::Table(TableResponse {
Expand Down Expand Up @@ -143,7 +142,7 @@ impl IntoResponse for TableResponse {
.into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("TABLE"),
HeaderValue::from_static(ResponseFormat::Table.as_str()),
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down