Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support printing postgresql's bytea data type in its "hex" and "escape" format #3567

Merged
merged 19 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c7c29ea
feat: support set variable statement of session
J0HN50N133 Mar 21, 2024
be1e49e
feat: support printing postgresql's bytea data type in its "hex" and …
J0HN50N133 Mar 21, 2024
e3bcb44
refactor: add 'SessionConfigValue' type and unify the name
J0HN50N133 Mar 22, 2024
3077937
Merge branch 'main' into feature_bytea
J0HN50N133 Mar 22, 2024
cae1430
doc: add license header
J0HN50N133 Mar 22, 2024
d9f0260
refactor: confine coupling with 'sql::ast::Value' in SessionConfigValue
J0HN50N133 Mar 22, 2024
5a31138
refactor: move all bytea wrapper into bytea.rs
J0HN50N133 Mar 22, 2024
03e8bd0
fix: remove unused import in context.rs and postgres.rs
J0HN50N133 Mar 22, 2024
0f3ab19
refactor: rename 'set_configuration_parameter' to 'set_session_config'
J0HN50N133 Mar 22, 2024
3a5fbfc
refactor: use mod to organize options via macro
J0HN50N133 Mar 22, 2024
5d1880e
refactor: re-model the session config value with static type
J0HN50N133 Mar 22, 2024
170a378
test: add integration test
J0HN50N133 Mar 24, 2024
7c6b5b4
refactor: move the encode bytea by format type logic into encoder
J0HN50N133 Mar 25, 2024
12b972f
test: add ut for byte related type
J0HN50N133 Mar 25, 2024
61cb858
doc: remove TODO of bytea_output
J0HN50N133 Mar 25, 2024
817149a
refactor: simplify the implementation with simple struct instead of c…
J0HN50N133 Mar 26, 2024
f1defe9
fix: typo of 'Available'
J0HN50N133 Mar 26, 2024
f6f0a9d
Merge branch 'main' into feature_bytea
tisonkun Mar 26, 2024
6ae783f
fix compile
tisonkun Mar 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion src/operator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion::parquet;
use datatypes::arrow::error::ArrowError;
use datatypes::value::Value;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use sql::ast::Value;

#[derive(Snafu)]
#[snafu(visibility(pub))]
Expand Down Expand Up @@ -528,6 +528,12 @@ pub enum Error {

#[snafu(display("Invalid partition rule: {}", reason))]
InvalidPartitionRule { reason: String, location: Location },

#[snafu(display("Invalid configuration value."))]
InvalidConfigValue {
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved
source: session::session_config::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -536,6 +542,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InvalidSql { .. }
| Error::InvalidConfigValue { .. }
| Error::InvalidInsertRequest { .. }
| Error::InvalidDeleteRequest { .. }
| Error::IllegalPrimaryKeysDef { .. }
Expand Down
27 changes: 23 additions & 4 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use session::session_config::PGByteaOutputValue;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument};
Expand All @@ -52,8 +53,8 @@ use table::table_reference::TableReference;
use table::TableRef;

use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidConfigValueSnafu,
InvalidSqlSnafu, NotSupportedSnafu, PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
Expand Down Expand Up @@ -219,8 +220,7 @@ impl StatementExecutor {
// so we just ignore it here instead of returning an error to break the connection.
// Since the "bytea_output" only determines the output format of binary values,
// it won't cause much trouble if we do so.
// TODO(#3438): Remove this temporary workaround after the feature is implemented.
"BYTEA_OUTPUT" => (),
"BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,

// Same as "bytea_output", we just ignore it here.
// Not harmful since it only relates to how date is viewed in client app's output.
Expand Down Expand Up @@ -339,6 +339,25 @@ fn set_timezone(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
}
}

/// Applies a PostgreSQL `SET bytea_output = ...` statement to the session
/// configuration.
///
/// Exactly one value expression is required, and it must be a literal value;
/// any other shape is rejected as unsupported. The literal is converted via
/// `PGByteaOutputValue::try_from` (surfacing invalid settings as
/// `InvalidConfigValue`) before being stored on the session's configuration
/// parameters.
fn set_bytea_output(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
    // Accept exactly one expression — no more, no fewer.
    let var_value = match exprs.as_slice() {
        [single] => single,
        _ => {
            return NotSupportedSnafu {
                feat: "Set variable value must have one and only one value for bytea_output",
            }
            .fail()
        }
    };
    // Only a literal value can be assigned to bytea_output.
    if let Expr::Value(value) = var_value {
        let output =
            PGByteaOutputValue::try_from(value.clone()).context(InvalidConfigValueSnafu)?;
        ctx.configuration_parameter().set_postgres_bytea_output(output);
        Ok(())
    } else {
        NotSupportedSnafu {
            feat: "Set variable value must be a value",
        }
        .fail()
    }
}

fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
let direction = match stmt {
CopyTable::To(_) => CopyDirection::Export,
Expand Down
1 change: 1 addition & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
dashmap.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datatypes.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/mysql/federated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ mod test {

#[test]
fn test_check() {
let session = Arc::new(Session::new(None, Channel::Mysql));
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));
let query = "select 1";
let result = check(query, QueryContext::arc(), session.clone());
assert!(result.is_none());
Expand All @@ -320,7 +320,7 @@ mod test {
assert!(output.is_none());

fn test(query: &str, expected: &str) {
let session = Arc::new(Session::new(None, Channel::Mysql));
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));
let output = check(query, QueryContext::arc(), session.clone());
match output.unwrap().data {
OutputData::RecordBatches(r) => {
Expand Down
6 changes: 5 additions & 1 deletion src/servers/src/mysql/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ impl MysqlInstanceShim {
MysqlInstanceShim {
query_handler,
salt: scramble,
session: Arc::new(Session::new(Some(client_addr), Channel::Mysql)),
session: Arc::new(Session::new(
Some(client_addr),
Channel::Mysql,
Default::default(),
)),
user_provider,
prepared_stmts: Default::default(),
prepared_stmts_counter: AtomicU32::new(1),
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub(crate) struct MakePostgresServerHandler {

impl MakePostgresServerHandler {
fn make(&self, addr: Option<SocketAddr>) -> PostgresServerHandler {
let session = Arc::new(Session::new(addr, Channel::Postgres));
let session = Arc::new(Session::new(addr, Channel::Postgres, Default::default()));
PostgresServerHandler {
query_handler: self.query_handler.clone(),
login_verifier: PgLoginVerifier::new(self.user_provider.clone()),
Expand Down
27 changes: 20 additions & 7 deletions src/servers/src/postgres/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use pgwire::api::stmt::{QueryParser, StoredStatement};
use pgwire::api::{ClientInfo, Type};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use query::query_engine::DescribeResult;
use session::context::QueryContextRef;
use session::Session;
use sql::dialect::PostgreSqlDialect;
use sql::parser::{ParseOptions, ParserContext};
Expand Down Expand Up @@ -63,7 +64,7 @@ impl SimpleQueryHandler for PostgresServerHandler {
let mut results = Vec::with_capacity(outputs.len());

for output in outputs {
let resp = output_to_query_response(output, &Format::UnifiedText)?;
let resp = output_to_query_response(query_ctx.clone(), output, &Format::UnifiedText)?;
results.push(resp);
}

Expand All @@ -72,6 +73,7 @@ impl SimpleQueryHandler for PostgresServerHandler {
}

fn output_to_query_response<'a>(
query_ctx: QueryContextRef,
output: Result<Output>,
field_format: &Format,
) -> PgWireResult<Response<'a>> {
Expand All @@ -82,11 +84,16 @@ fn output_to_query_response<'a>(
}
OutputData::Stream(record_stream) => {
let schema = record_stream.schema();
recordbatches_to_query_response(record_stream, schema, field_format)
recordbatches_to_query_response(query_ctx, record_stream, schema, field_format)
}
OutputData::RecordBatches(recordbatches) => {
let schema = recordbatches.schema();
recordbatches_to_query_response(recordbatches.as_stream(), schema, field_format)
recordbatches_to_query_response(
query_ctx,
recordbatches.as_stream(),
schema,
field_format,
)
}
},
Err(e) => Ok(Response::Error(Box::new(ErrorInfo::new(
Expand All @@ -98,6 +105,7 @@ fn output_to_query_response<'a>(
}

fn recordbatches_to_query_response<'a, S>(
query_ctx: QueryContextRef,
recordbatches_stream: S,
schema: SchemaRef,
field_format: &Format,
Expand Down Expand Up @@ -125,7 +133,7 @@ where
row.and_then(|row| {
let mut encoder = DataRowEncoder::new(pg_schema_ref.clone());
for value in row.iter() {
encode_value(value, &mut encoder)?;
encode_value(&query_ctx, value, &mut encoder)?;
}
encoder.finish()
})
Expand Down Expand Up @@ -224,7 +232,9 @@ impl ExtendedQueryHandler for PostgresServerHandler {
let plan = plan
.replace_params_with_values(parameters_to_scalar_values(plan, portal)?.as_ref())
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
self.query_handler.do_exec_plan(plan, query_ctx).await
self.query_handler
.do_exec_plan(plan, query_ctx.clone())
.await
} else {
// manually replace variables in prepared statement when no
// logical_plan is generated. This happens when logical plan is not
Expand All @@ -234,10 +244,13 @@ impl ExtendedQueryHandler for PostgresServerHandler {
sql = sql.replace(&format!("${}", i + 1), &parameter_to_string(portal, i)?);
}

self.query_handler.do_query(&sql, query_ctx).await.remove(0)
self.query_handler
.do_query(&sql, query_ctx.clone())
.await
.remove(0)
};

output_to_query_response(output, &portal.result_column_format)
output_to_query_response(query_ctx, output, &portal.result_column_format)
}

async fn do_describe_statement<C>(
Expand Down
25 changes: 22 additions & 3 deletions src/servers/src/postgres/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod bytea;
mod interval;

use std::collections::HashMap;
Expand All @@ -28,7 +29,10 @@ use pgwire::api::results::{DataRowEncoder, FieldInfo};
use pgwire::api::Type;
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use session::session_config::PGByteaOutputValue;

use self::bytea::{EscapeOutputBytea, HexOutputBytea};
use self::interval::PgInterval;
use crate::error::{self, Error, Result};
use crate::SqlPlan;
Expand All @@ -50,7 +54,11 @@ pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Ve
.collect::<Result<Vec<FieldInfo>>>()
}

pub(super) fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()> {
pub(super) fn encode_value(
query_ctx: &QueryContextRef,
value: &Value,
builder: &mut DataRowEncoder,
) -> PgWireResult<()> {
match value {
Value::Null => builder.encode_field(&None::<&i8>),
Value::Boolean(v) => builder.encode_field(v),
Expand All @@ -65,7 +73,13 @@ pub(super) fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWir
Value::Float32(v) => builder.encode_field(&v.0),
Value::Float64(v) => builder.encode_field(&v.0),
Value::String(v) => builder.encode_field(&v.as_utf8()),
Value::Binary(v) => builder.encode_field(&v.deref()),
Value::Binary(v) => {
let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
match *bytea_output {
PGByteaOutputValue::ESCAPE => builder.encode_field(&EscapeOutputBytea(v.deref())),
PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
}
}
Value::Date(v) => {
if let Some(date) = v.to_chrono_date() {
builder.encode_field(&date)
Expand Down Expand Up @@ -563,6 +577,7 @@ mod test {
use datatypes::value::ListValue;
use pgwire::api::results::{FieldFormat, FieldInfo};
use pgwire::api::Type;
use session::context::QueryContextBuilder;

use super::*;

Expand Down Expand Up @@ -784,12 +799,16 @@ mod test {
Value::Timestamp(1000001i64.into()),
Value::Interval(1000001i128.into()),
];
let query_context = QueryContextBuilder::default()
.configuration_parameter(Default::default())
.build();
let mut builder = DataRowEncoder::new(Arc::new(schema));
for i in values.iter() {
encode_value(i, &mut builder).unwrap();
encode_value(&query_context, i, &mut builder).unwrap();
}

let err = encode_value(
&query_context,
&Value::List(ListValue::new(
Some(Box::default()),
ConcreteDataType::int16_datatype(),
Expand Down