Skip to content

Commit

Permalink
refactor: handle error for http format (#3548)
Browse files Browse the repository at this point in the history
* refactor: handle error for http format

Signed-off-by: tison <wander4096@gmail.com>

* finish format handling

Signed-off-by: tison <wander4096@gmail.com>

* simplify auth error

Signed-off-by: tison <wander4096@gmail.com>

* fix

Signed-off-by: tison <wander4096@gmail.com>

* clippy format

Signed-off-by: tison <wander4096@gmail.com>

* no longer set greptime-db-format on influxdb error

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Mar 21, 2024
1 parent 856a4e1 commit 8b7a5aa
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 89 deletions.
1 change: 0 additions & 1 deletion src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,6 @@ impl Server for HttpServer {
async fn handle_error(err: BoxError) -> Json<HttpResponse> {
error!(err; "Unhandled internal error");
Json(HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::GreptimedbV1,
StatusCode::Unexpected,
format!("Unhandled internal error: {err}"),
)))
Expand Down
43 changes: 20 additions & 23 deletions src/servers/src/http/arrow_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,49 +60,46 @@ async fn write_arrow_bytes(
}

impl ArrowResponse {
pub async fn from_output(mut outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
if outputs.len() != 1 {
pub async fn from_output(mut outputs: Vec<error::Result<Output>>) -> HttpResponse {
if outputs.len() > 1 {
return HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Arrow,
StatusCode::InvalidArguments,
"Multi-statements and empty query are not allowed".to_string(),
"cannot output multi-statements result in arrow format".to_string(),
));
}

match outputs.remove(0) {
Ok(output) => match output.data {
OutputData::AffectedRows(_rows) => HttpResponse::Arrow(ArrowResponse {
match outputs.pop() {
None => HttpResponse::Arrow(ArrowResponse {
data: vec![],
execution_time_ms: 0,
}),
Some(Ok(output)) => match output.data {
OutputData::AffectedRows(_) => HttpResponse::Arrow(ArrowResponse {
data: vec![],
execution_time_ms: 0,
}),
OutputData::RecordBatches(recordbatches) => {
let schema = recordbatches.schema();
match write_arrow_bytes(recordbatches.as_stream(), schema.arrow_schema()).await
{
OutputData::RecordBatches(batches) => {
let schema = batches.schema();
match write_arrow_bytes(batches.as_stream(), schema.arrow_schema()).await {
Ok(payload) => HttpResponse::Arrow(ArrowResponse {
data: payload,
execution_time_ms: 0,
}),
Err(e) => {
HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e))
}
Err(e) => HttpResponse::Error(ErrorResponse::from_error(e)),
}
}

OutputData::Stream(recordbatches) => {
let schema = recordbatches.schema();
match write_arrow_bytes(recordbatches, schema.arrow_schema()).await {
OutputData::Stream(batches) => {
let schema = batches.schema();
match write_arrow_bytes(batches, schema.arrow_schema()).await {
Ok(payload) => HttpResponse::Arrow(ArrowResponse {
data: payload,
execution_time_ms: 0,
}),
Err(e) => {
HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e))
}
Err(e) => HttpResponse::Error(ErrorResponse::from_error(e)),
}
}
},
Err(e) => HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::Arrow, e)),
Some(Err(e)) => HttpResponse::Error(ErrorResponse::from_error(e)),
}
}

Expand All @@ -127,7 +124,7 @@ impl IntoResponse for ArrowResponse {
),
(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("ARROW"),
HeaderValue::from_static(ResponseFormat::Arrow.as_str()),
),
(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down
15 changes: 5 additions & 10 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use session::context::QueryContextBuilder;
use snafu::{ensure, OptionExt, ResultExt};

use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME};
use super::{ResponseFormat, PUBLIC_APIS};
use super::PUBLIC_APIS;
use crate::error::{
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
Expand Down Expand Up @@ -88,7 +88,7 @@ pub async fn inner_auth<B>(
crate::metrics::METRIC_AUTH_FAILURE
.with_label_values(&[e.status_code().as_ref()])
.inc();
return Err(err_response(is_influxdb_request(&req), e).into_response());
return Err(err_response(e));
}
};

Expand All @@ -112,7 +112,7 @@ pub async fn inner_auth<B>(
crate::metrics::METRIC_AUTH_FAILURE
.with_label_values(&[e.status_code().as_ref()])
.inc();
Err(err_response(is_influxdb_request(&req), e).into_response())
Err(err_response(e))
}
}
}
Expand All @@ -128,13 +128,8 @@ pub async fn check_http_auth<B>(
}
}

fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse {
let ty = if is_influxdb {
ResponseFormat::InfluxdbV1
} else {
ResponseFormat::GreptimedbV1
};
(StatusCode::UNAUTHORIZED, ErrorResponse::from_error(ty, err))
fn err_response(err: impl ErrorExt) -> Response {
(StatusCode::UNAUTHORIZED, ErrorResponse::from_error(err)).into_response()
}

pub fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
Expand Down
11 changes: 6 additions & 5 deletions src/servers/src/http/csv_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ pub struct CsvResponse {

impl CsvResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
match handler::from_output(outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Csv,
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
"cannot output multi-statements result in csv format".to_string(),
))
} else {
HttpResponse::Csv(CsvResponse {
Expand Down Expand Up @@ -100,8 +99,10 @@ impl IntoResponse for CsvResponse {
payload,
)
.into_response();
resp.headers_mut()
.insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static("CSV"));
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static(ResponseFormat::Csv.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
Expand Down
15 changes: 4 additions & 11 deletions src/servers/src/http/error_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,17 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::ResponseFormat;
use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME;

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct ErrorResponse {
#[serde(skip)]
ty: ResponseFormat,
code: u32,
error: String,
execution_time_ms: u64,
}

impl ErrorResponse {
pub fn from_error(ty: ResponseFormat, error: impl ErrorExt) -> Self {
pub fn from_error(error: impl ErrorExt) -> Self {
let code = error.status_code();

if code.should_log_error() {
Expand All @@ -44,12 +41,11 @@ impl ErrorResponse {
debug!("Failed to handle HTTP request, err: {:?}", error);
}

Self::from_error_message(ty, code, error.output_msg())
Self::from_error_message(code, error.output_msg())
}

pub fn from_error_message(ty: ResponseFormat, code: StatusCode, msg: String) -> Self {
pub fn from_error_message(code: StatusCode, msg: String) -> Self {
ErrorResponse {
ty,
code: code as u32,
error: msg,
execution_time_ms: 0,
Expand All @@ -76,14 +72,11 @@ impl ErrorResponse {

impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let ty = self.ty.as_str();
let code = self.code;
let execution_time = self.execution_time_ms;
let mut resp = Json(self).into_response();
resp.headers_mut()
.insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code));
resp.headers_mut()
.insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static(ty));
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/http/greptime_result_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct GreptimedbV1Response {

impl GreptimedbV1Response {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::GreptimedbV1, outputs).await {
match handler::from_output(outputs).await {
Ok((output, resp_metrics)) => HttpResponse::GreptimedbV1(Self {
output,
execution_time_ms: 0,
Expand Down Expand Up @@ -77,7 +77,7 @@ impl IntoResponse for GreptimedbV1Response {

resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("greptimedb_v1"),
HeaderValue::from_static(ResponseFormat::GreptimedbV1.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down
13 changes: 6 additions & 7 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub async fn sql(
let outputs = match result {
Err((status, msg)) => {
return HttpResponse::Error(
ErrorResponse::from_error_message(format, status, msg)
ErrorResponse::from_error_message(status, msg)
.with_execution_time(start.elapsed().as_millis() as u64),
);
}
Expand All @@ -130,7 +130,6 @@ pub async fn sql(

/// Create a response from query result
pub async fn from_output(
ty: ResponseFormat,
outputs: Vec<crate::error::Result<Output>>,
) -> Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
// TODO(sunng87): this api response structure cannot represent error well.
Expand All @@ -154,11 +153,11 @@ pub async fn from_output(
Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
Ok(rows) => rows,
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
},
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
};
if let Some(physical_plan) = o.meta.plan {
Expand All @@ -180,14 +179,14 @@ pub async fn from_output(
results.push(GreptimeQueryOutput::Records(rows));
}
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
}
}
},

Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
return Err(ErrorResponse::from_error(err));
}
}
}
Expand Down Expand Up @@ -239,7 +238,7 @@ pub async fn promql(
let resp = if let Some((status, msg)) =
validate_schema(sql_handler.clone(), query_ctx.clone()).await
{
let resp = ErrorResponse::from_error_message(ResponseFormat::GreptimedbV1, status, msg);
let resp = ErrorResponse::from_error_message(status, msg);
HttpResponse::Error(resp)
} else {
let prom_query = params.into();
Expand Down
15 changes: 5 additions & 10 deletions src/servers/src/http/influxdb_result_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_recordbatch::{util, RecordBatch};
use schemars::JsonSchema;
Expand Down Expand Up @@ -143,10 +142,6 @@ impl InfluxdbV1Response {
outputs: Vec<crate::error::Result<Output>>,
epoch: Option<Epoch>,
) -> HttpResponse {
fn make_error_response(error: impl ErrorExt) -> HttpResponse {
HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::InfluxdbV1, error))
}

// TODO(sunng87): this api response structure cannot represent error well.
// It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
Expand All @@ -172,11 +167,11 @@ impl InfluxdbV1Response {
});
}
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
},
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
}
}
Expand All @@ -189,14 +184,14 @@ impl InfluxdbV1Response {
});
}
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
}
}
}
}
Err(err) => {
return make_error_response(err);
return HttpResponse::Error(ErrorResponse::from_error(err));
}
}
}
Expand All @@ -222,7 +217,7 @@ impl IntoResponse for InfluxdbV1Response {
let mut resp = Json(self).into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("influxdb_v1"),
HeaderValue::from_static(ResponseFormat::InfluxdbV1.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down
10 changes: 3 additions & 7 deletions src/servers/src/http/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,15 @@ use snafu::ResultExt;

use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu};
use crate::http::error_result::ErrorResponse;
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse, ResponseFormat};
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse};

macro_rules! json_err {
($e: expr) => {{
return HttpResponse::Error(ErrorResponse::from_error(ResponseFormat::GreptimedbV1, $e));
return HttpResponse::Error(ErrorResponse::from_error($e));
}};

($msg: expr, $code: expr) => {{
return HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::GreptimedbV1,
$code,
$msg.to_string(),
));
return HttpResponse::Error(ErrorResponse::from_error_message($code, $msg.to_string()));
}};
}

Expand Down
7 changes: 3 additions & 4 deletions src/servers/src/http/table_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@ pub struct TableResponse {

impl TableResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
match handler::from_output(outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Table,
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
"cannot output multi-statements result in table format".to_string(),
))
} else {
HttpResponse::Table(TableResponse {
Expand Down Expand Up @@ -143,7 +142,7 @@ impl IntoResponse for TableResponse {
.into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("TABLE"),
HeaderValue::from_static(ResponseFormat::Table.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
Expand Down

0 comments on commit 8b7a5aa

Please sign in to comment.