Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support HTTP&gRPC&pg set timezone #3125

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a31ea166fc015ea7ff111ac94e26c3a5d64364d2" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c1f17dce7af748c9a1255e82d6ceb7959f8919b" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
14 changes: 14 additions & 0 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub struct Database {
// The dbname follows naming rule as out mysql, postgres and http
// protocol. The server treat dbname in priority of catalog/schema.
dbname: String,
// The time zone indicates the time zone where the user is located.
// Some queries need to be aware of the user's time zone to perform some specific actions.
timezone: String,

client: Client,
ctx: FlightContext,
Expand All @@ -59,6 +62,7 @@ impl Database {
catalog: catalog.into(),
schema: schema.into(),
dbname: "".to_string(),
timezone: "".to_string(),
Taylor-lagrange marked this conversation as resolved.
Show resolved Hide resolved
client,
ctx: FlightContext::default(),
}
Expand All @@ -75,6 +79,7 @@ impl Database {
Self {
catalog: "".to_string(),
schema: "".to_string(),
timezone: "".to_string(),
Taylor-lagrange marked this conversation as resolved.
Show resolved Hide resolved
dbname: dbname.into(),
client,
ctx: FlightContext::default(),
Expand Down Expand Up @@ -105,6 +110,14 @@ impl Database {
self.dbname = dbname.into();
}

pub fn timezone(&self) -> &String {
&self.timezone
}

pub fn set_timezone(&mut self, timezone: impl Into<String>) {
self.timezone = timezone.into();
}

pub fn set_auth(&mut self, auth: AuthScheme) {
self.ctx.auth_header = Some(AuthHeader {
auth_scheme: Some(auth),
Expand Down Expand Up @@ -161,6 +174,7 @@ impl Database {
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
timezone: self.timezone.clone(),
// TODO(Taylor-lagrange): add client grpc tracing
tracing_context: W3cTrace::new(),
}),
Expand Down
10 changes: 10 additions & 0 deletions src/common/time/src/timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ pub fn get_timezone(tz: Option<Timezone>) -> Timezone {
})
}

#[inline(always)]
/// If the `tz = Some("") || None || Some(Invalid timezone)`, return system timezone,
/// or return parsed `tz` as timezone.
pub fn parse_timezone(tz: Option<&str>) -> Timezone {
waynexia marked this conversation as resolved.
Show resolved Hide resolved
match tz {
None | Some("") => Timezone::Named(Tz::UTC),
Some(tz) => Timezone::from_tz_string(tz).unwrap_or(Timezone::Named(Tz::UTC)),
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Timezone {
Offset(FixedOffset),
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ pub fn check_permission(
// show create table and alter are not supported yet
Statement::ShowCreateTable(_) | Statement::CreateExternalTable(_) | Statement::Alter(_) => {
}
// set/show variable now only alter/show variable in session
Statement::SetVariables(_) | Statement::ShowVariables(_) => {}

Statement::Insert(insert) => {
validate_param(insert.table_name(), query_ctx)?;
Expand Down
49 changes: 45 additions & 4 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use common_time::{Timestamp, Timezone};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
Expand All @@ -45,14 +45,14 @@ use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sql::util::format_raw_object_name;
use sqlparser::ast::ObjectName;
use sqlparser::ast::{Expr, ObjectName, Value};
use table::engine::TableReference;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::TableRef;

use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
Result, TableNotFoundSnafu,
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::insert::InserterRef;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
Expand Down Expand Up @@ -188,6 +188,20 @@ impl StatementExecutor {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
Statement::SetVariables(set_var) => {
let var_name = set_var.variable.to_string().to_uppercase();
match var_name.as_str() {
"TIMEZONE" | "TIME_ZONE" => set_timezone(set_var.value, query_ctx)?,
_ => {
return NotSupportedSnafu {
feat: format!("Unsupported set variable {}", var_name),
}
.fail()
}
}
Ok(Output::AffectedRows(0))
}
Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
}
}

Expand Down Expand Up @@ -228,6 +242,33 @@ impl StatementExecutor {
}
}

fn set_timezone(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
let tz_expr = exprs.first().context(NotSupportedSnafu {
feat: "No timezone find in set variable statement",
})?;
match tz_expr {
Expr::Value(Value::SingleQuotedString(tz)) | Expr::Value(Value::DoubleQuotedString(tz)) => {
match Timezone::from_tz_string(tz.as_str()) {
Ok(timezone) => ctx.set_timezone(timezone),
Err(_) => {
return NotSupportedSnafu {
feat: format!("Invalid timezone expr {} in set variable statement", tz),
}
.fail()
}
}
Ok(())
}
expr => NotSupportedSnafu {
feat: format!(
"Unsupported timezone expr {} in set variable statement",
expr
),
}
.fail(),
}
}

fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
let direction = match stmt {
CopyTable::To(_) => CopyDirection::Export,
Expand Down
7 changes: 6 additions & 1 deletion src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::show::{ShowDatabases, ShowTables};
use sql::statements::show::{ShowDatabases, ShowTables, ShowVariables};
use sql::{statements, MAXVALUE};
use table::TableRef;

Expand Down Expand Up @@ -71,6 +71,11 @@ impl StatementExecutor {
query::sql::show_create_table(table, partitions, query_ctx)
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
}
}

fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
Expand Down
4 changes: 4 additions & 0 deletions src/query/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String, location: Location },

#[snafu(display("Unsupported show variable: {}", name))]
UnsupportedVariable { name: String, location: Location },

#[snafu(display("Operation {} not implemented yet", operation))]
Unimplemented {
operation: String,
Expand Down Expand Up @@ -274,6 +277,7 @@ impl ErrorExt for Error {
| ConvertSchema { .. }
| AddSystemTimeOverflow { .. }
| ColumnSchemaIncompatible { .. }
| UnsupportedVariable { .. }
| ColumnSchemaNoDefault { .. } => StatusCode::InvalidArguments,

BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
Expand Down
25 changes: 23 additions & 2 deletions src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_datasource::util::find_dir_and_filename;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timezone::get_timezone;
use common_time::Timestamp;
use datatypes::prelude::*;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
Expand All @@ -38,12 +39,12 @@ use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables, ShowVariables};
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use table::TableRef;

use crate::datafusion::execute_show_with_filter;
use crate::error::{self, Result};
use crate::error::{self, Result, UnsupportedVariableSnafu};

const SCHEMAS_COLUMN: &str = "Schemas";
const TABLES_COLUMN: &str = "Tables";
Expand Down Expand Up @@ -229,6 +230,26 @@ pub async fn show_tables(
}
}

pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
let variable = stmt.variable.to_string().to_uppercase();
let value = match variable.as_str() {
"SYSTEM_TIME_ZONE" => get_timezone(None).to_string(),
Taylor-lagrange marked this conversation as resolved.
Show resolved Hide resolved
"TIME_ZONE" => query_ctx.timezone().to_string(),
_ => return UnsupportedVariableSnafu { name: variable }.fail(),
};
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
variable,
ConcreteDataType::string_datatype(),
false,
)]));
let records = RecordBatches::try_from_columns(
schema,
vec![Arc::new(StringVector::from(vec![value])) as _],
)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::RecordBatches(records))
}

pub fn show_create_table(
table: TableRef,
partitions: Option<Partitions>,
Expand Down
9 changes: 6 additions & 3 deletions src/servers/src/grpc/greptime_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_error::status_code::StatusCode;
use common_query::Output;
use common_runtime::Runtime;
use common_telemetry::logging;
use common_time::timezone::parse_timezone;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -161,11 +162,13 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
}
})
.unwrap_or((DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME));

QueryContextBuilder::default()
let timezone = parse_timezone(header.map(|h| h.timezone.as_str()));
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build()
.build();
ctx.set_timezone(timezone);
Taylor-lagrange marked this conversation as resolved.
Show resolved Hide resolved
ctx
}

/// Histogram timer for handling gRPC request.
Expand Down
24 changes: 21 additions & 3 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_telemetry::warn;
use common_time::timezone::parse_timezone;
use common_time::Timezone;
use headers::Header;
use secrecy::SecretString;
use session::context::QueryContext;
use session::context::QueryContextBuilder;
use snafu::{ensure, OptionExt, ResultExt};

use super::header::GreptimeDbName;
use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME};
use super::{ResponseFormat, PUBLIC_APIS};
use crate::error::{
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
Expand Down Expand Up @@ -56,7 +58,12 @@ pub async fn inner_auth<B>(
) -> std::result::Result<Request<B>, Response> {
// 1. prepare
let (catalog, schema) = extract_catalog_and_schema(&req);
let query_ctx = QueryContext::with(catalog, schema);
let timezone = extract_timezone(&req);
let query_ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build();
query_ctx.set_timezone(timezone);
Taylor-lagrange marked this conversation as resolved.
Show resolved Hide resolved
let need_auth = need_auth(&req);
let is_influxdb = req.uri().path().contains("influxdb");

Expand Down Expand Up @@ -142,6 +149,17 @@ pub fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
parse_catalog_and_schema_from_db_string(dbname)
}

fn extract_timezone<B>(request: &Request<B>) -> Timezone {
// parse timezone from header
let timezone = request
.headers()
.get(&GREPTIME_TIMEZONE_HEADER_NAME)
// eat this invalid ascii error and give user the final IllegalParam error
.and_then(|header| header.to_str().ok())
.unwrap_or("");
parse_timezone(Some(timezone))
}

fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username, Password)>> {
// compat with influxdb v2 and v1
if let Some(header) = request.headers().get(http::header::AUTHORIZATION) {
Expand Down
3 changes: 3 additions & 0 deletions src/servers/src/http/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub const GREPTIME_DB_HEADER_FORMAT: &str = "x-greptime-format";
pub const GREPTIME_DB_HEADER_EXECUTION_TIME: &str = "x-greptime-execution-time";

pub static GREPTIME_DB_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-name");
pub static GREPTIME_DB_NAME_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-db-name");
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
pub static GREPTIME_TIMEZONE_HEADER_NAME: HeaderName =
HeaderName::from_static("x-greptime-timezone");

pub struct GreptimeDbName(Option<String>);

Expand Down