Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support HTTP&gRPC&pg set timezone #3125

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a31ea166fc015ea7ff111ac94e26c3a5d64364d2" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c1f17dce7af748c9a1255e82d6ceb7959f8919b" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr {
catalog_name: CATALOG_NAME.to_string(),
schema_name: SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
desc: "".to_string(),
desc: String::default(),
column_defs: vec![
ColumnDef {
name: "VendorID".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ mod tests {
assert_eq!(
StatusCode::StorageUnavailable,
Error::SystemCatalog {
msg: "".to_string(),
msg: String::default(),
location: Location::generate(),
}
.status_code()
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl KvBackendCatalogManager {
catalog_manager: me.clone(),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
// The catalog name is not used in system_catalog, so let it empty
"".to_string(),
String::default(),
me.clone(),
)),
},
Expand Down
2 changes: 1 addition & 1 deletion src/client/examples/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn run() {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "test_logical_dist_exec".to_string(),
desc: "".to_string(),
desc: String::default(),
column_defs: vec![
ColumnDef {
name: "timestamp".to_string(),
Expand Down
20 changes: 17 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub struct Database {
// The dbname follows naming rule as out mysql, postgres and http
// protocol. The server treat dbname in priority of catalog/schema.
dbname: String,
// The time zone indicates the time zone where the user is located.
// Some queries need to be aware of the user's time zone to perform some specific actions.
timezone: String,

client: Client,
ctx: FlightContext,
Expand All @@ -58,7 +61,8 @@ impl Database {
Self {
catalog: catalog.into(),
schema: schema.into(),
dbname: "".to_string(),
dbname: String::default(),
timezone: String::default(),
client,
ctx: FlightContext::default(),
}
Expand All @@ -73,8 +77,9 @@ impl Database {
/// environment
pub fn new_with_dbname(dbname: impl Into<String>, client: Client) -> Self {
Self {
catalog: "".to_string(),
schema: "".to_string(),
catalog: String::default(),
schema: String::default(),
timezone: String::default(),
dbname: dbname.into(),
client,
ctx: FlightContext::default(),
Expand Down Expand Up @@ -105,6 +110,14 @@ impl Database {
self.dbname = dbname.into();
}

pub fn timezone(&self) -> &String {
&self.timezone
}

pub fn set_timezone(&mut self, timezone: impl Into<String>) {
self.timezone = timezone.into();
}

pub fn set_auth(&mut self, auth: AuthScheme) {
self.ctx.auth_header = Some(AuthHeader {
auth_scheme: Some(auth),
Expand Down Expand Up @@ -161,6 +174,7 @@ impl Database {
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
timezone: self.timezone.clone(),
// TODO(Taylor-lagrange): add client grpc tracing
tracing_context: W3cTrace::new(),
}),
Expand Down
4 changes: 2 additions & 2 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ mod test {
let result = check_response_header(Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Success as u32,
err_msg: "".to_string(),
err_msg: String::default(),
}),
}));
assert!(result.is_ok());

let result = check_response_header(Some(ResponseHeader {
status: Some(PbStatus {
status_code: u32::MAX,
err_msg: "".to_string(),
err_msg: String::default(),
}),
}));
assert!(matches!(
Expand Down
2 changes: 1 addition & 1 deletion src/common/base/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ mod tests {
let bytes = StringBytes::from(hello.clone());
assert_eq!(bytes.len(), hello.len());

let zero = "".to_string();
let zero = String::default();
let bytes = StringBytes::from(zero);
assert!(bytes.is_empty());
}
Expand Down
10 changes: 5 additions & 5 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ mod tests {
#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),

kind: Some(Kind::AddColumns(AddColumns {
Expand Down Expand Up @@ -186,8 +186,8 @@ mod tests {
#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),

kind: Some(Kind::AddColumns(AddColumns {
Expand All @@ -204,7 +204,7 @@ mod tests {
}),
location: Some(Location {
location_type: LocationType::First.into(),
after_column_name: "".to_string(),
after_column_name: String::default(),
}),
},
AddColumn {
Expand Down
2 changes: 1 addition & 1 deletion src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ pub struct ManagerConfig {
impl Default for ManagerConfig {
fn default() -> Self {
Self {
parent_path: "".to_string(),
parent_path: String::default(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
Expand Down
2 changes: 1 addition & 1 deletion src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl From<&AddColumnLocation> for Location {
match value {
AddColumnLocation::First => Location {
location_type: LocationType::First.into(),
after_column_name: "".to_string(),
after_column_name: String::default(),
},
AddColumnLocation::After { column_name } => Location {
location_type: LocationType::After.into(),
Expand Down
8 changes: 4 additions & 4 deletions src/common/time/src/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl IntervalFormat {
return "PT0S".to_string();
}
let fract_str = match self.microseconds {
0 => "".to_string(),
0 => String::default(),
_ => format!(".{:06}", self.microseconds)
.trim_end_matches('0')
.to_string(),
Expand Down Expand Up @@ -446,7 +446,7 @@ impl IntervalFormat {
if self.is_zero() {
return "00:00:00".to_string();
}
let mut result = "".to_string();
let mut result = String::default();
if self.has_year_month() {
if self.years != 0 {
result.push_str(&format!("{} year ", self.years));
Expand All @@ -464,7 +464,7 @@ impl IntervalFormat {

/// get postgres time part(include hours, minutes, seconds, microseconds)
fn get_postgres_time_part(&self) -> String {
let mut time_part = "".to_string();
let mut time_part = String::default();
if self.has_time_part() {
let sign = if !self.has_time_part_positive() {
"-"
Expand Down Expand Up @@ -516,7 +516,7 @@ fn get_time_part(
is_time_part_positive: bool,
is_only_time: bool,
) -> String {
let mut interval = "".to_string();
let mut interval = String::default();
if is_time_part_positive && is_only_time {
interval.push_str(&format!("{}:{:02}:{:02}", hours, mins, secs));
} else {
Expand Down
10 changes: 10 additions & 0 deletions src/common/time/src/timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ pub fn get_timezone(tz: Option<Timezone>) -> Timezone {
})
}

#[inline(always)]
/// If the `tz = Some("") || None || Some(Invalid timezone)`, return system timezone,
/// or return parsed `tz` as timezone.
pub fn parse_timezone(tz: Option<&str>) -> Timezone {
waynexia marked this conversation as resolved.
Show resolved Hide resolved
match tz {
None | Some("") => Timezone::Named(Tz::UTC),
Some(tz) => Timezone::from_tz_string(tz).unwrap_or(Timezone::Named(Tz::UTC)),
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Timezone {
Offset(FixedOffset),
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ pub fn check_permission(
// show create table and alter are not supported yet
Statement::ShowCreateTable(_) | Statement::CreateExternalTable(_) | Statement::Alter(_) => {
}
// set/show variable now only alter/show variable in session
Statement::SetVariables(_) | Statement::ShowVariables(_) => {}

Statement::Insert(insert) => {
validate_param(insert.table_name(), query_ctx)?;
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ mod tests {
let nodes = (1..=region_distribution.len())
.map(|id| Peer {
id: id as u64,
addr: "".to_string(),
addr: String::default(),
})
.collect();
Arc::new(RandomNodeSelector { nodes })
Expand Down Expand Up @@ -742,7 +742,7 @@ mod tests {
peers: Arc::new(Mutex::new(vec![
Some(Peer {
id: 42,
addr: "".to_string(),
addr: String::default(),
}),
None,
])),
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub mod mock {
header: Some(ResponseHeader {
status: Some(PbStatus {
status_code: 0,
err_msg: "".to_string(),
err_msg: String::default(),
}),
}),
affected_rows: 0,
Expand Down
4 changes: 2 additions & 2 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub(crate) async fn create_external_expr(
catalog_name,
schema_name,
table_name,
desc: "".to_string(),
desc: String::default(),
column_defs,
time_index,
primary_keys,
Expand Down Expand Up @@ -198,7 +198,7 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
catalog_name,
schema_name,
table_name,
desc: "".to_string(),
desc: String::default(),
column_defs: columns_to_expr(&create.columns, &time_index, &primary_keys)?,
time_index,
primary_keys,
Expand Down
49 changes: 45 additions & 4 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use common_time::{Timestamp, Timezone};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
Expand All @@ -45,14 +45,14 @@ use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sql::util::format_raw_object_name;
use sqlparser::ast::ObjectName;
use sqlparser::ast::{Expr, ObjectName, Value};
use table::engine::TableReference;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::TableRef;

use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
Result, TableNotFoundSnafu,
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::insert::InserterRef;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
Expand Down Expand Up @@ -188,6 +188,20 @@ impl StatementExecutor {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
Statement::SetVariables(set_var) => {
let var_name = set_var.variable.to_string().to_uppercase();
match var_name.as_str() {
"TIMEZONE" | "TIME_ZONE" => set_timezone(set_var.value, query_ctx)?,
_ => {
return NotSupportedSnafu {
feat: format!("Unsupported set variable {}", var_name),
}
.fail()
}
}
Ok(Output::AffectedRows(0))
}
Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
}
}

Expand Down Expand Up @@ -228,6 +242,33 @@ impl StatementExecutor {
}
}

fn set_timezone(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
let tz_expr = exprs.first().context(NotSupportedSnafu {
feat: "No timezone find in set variable statement",
})?;
match tz_expr {
Expr::Value(Value::SingleQuotedString(tz)) | Expr::Value(Value::DoubleQuotedString(tz)) => {
match Timezone::from_tz_string(tz.as_str()) {
Ok(timezone) => ctx.set_timezone(timezone),
Err(_) => {
return NotSupportedSnafu {
feat: format!("Invalid timezone expr {} in set variable statement", tz),
}
.fail()
}
}
Ok(())
}
expr => NotSupportedSnafu {
feat: format!(
"Unsupported timezone expr {} in set variable statement",
expr
),
}
.fail(),
}
}

fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
let direction = match stmt {
CopyTable::To(_) => CopyDirection::Export,
Expand Down
7 changes: 6 additions & 1 deletion src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::show::{ShowDatabases, ShowTables};
use sql::statements::show::{ShowDatabases, ShowTables, ShowVariables};
use sql::{statements, MAXVALUE};
use table::TableRef;

Expand Down Expand Up @@ -71,6 +71,11 @@ impl StatementExecutor {
query::sql::show_create_table(table, partitions, query_ctx)
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
}
}

fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
Expand Down