Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: administration functions #3236

Merged
merged 17 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "519b1d0757404c8ff1eeb2a68d29f5ade54a1752" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96f1f0404f421ee560a4310c73c5071e49168168" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
5 changes: 5 additions & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ version.workspace = true
license.workspace = true

[dependencies]
api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
build-data = "0.1"
chrono-tz = "0.6"
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
Expand All @@ -22,6 +26,7 @@ paste = "1.0"
session.workspace = true
snafu.workspace = true
statrs = "0.16"
table.workspace = true

[dev-dependencies]
ron = "0.7"
Expand Down
4 changes: 4 additions & 0 deletions src/common/function/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::VectorRef;
use session::context::{QueryContextBuilder, QueryContextRef};

use crate::state::FunctionState;

/// The function execution context
#[derive(Clone)]
pub struct FunctionContext {
pub query_ctx: QueryContextRef,
pub state: Arc<FunctionState>,
}

impl Default for FunctionContext {
fn default() -> Self {
Self {
query_ctx: QueryContextBuilder::default().build(),
state: Arc::new(FunctionState::default()),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/common/function/src/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::system::SystemFunction;
use crate::table::TableFunction;

#[derive(Default)]
pub struct FunctionRegistry {
Expand Down Expand Up @@ -74,13 +75,19 @@ impl FunctionRegistry {
pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
let function_registry = FunctionRegistry::default();

// Utility functions
MathFunction::register(&function_registry);
NumpyFunction::register(&function_registry);
TimestampFunction::register(&function_registry);
DateFunction::register(&function_registry);

// Aggregate functions
AggregateFunctions::register(&function_registry);

// System and administration functions
SystemFunction::register(&function_registry);
TableFunction::register(&function_registry);

Arc::new(function_registry)
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::ProcedureStateResponse;
use async_trait::async_trait;
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};

use crate::error::Result;

pub type AffectedRows = usize;

/// A trait for handling table mutations in `QueryEngine`.
Expand All @@ -30,6 +31,24 @@ pub trait TableMutationHandler: Send + Sync {

/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;

/// Migrate a region from source peer to target peer, returns the procedure id if success.
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
async fn migrate_region(
&self,
region_id: u64,
from_peer: u64,
to_peer: u64,
replay_timeout: Duration,
) -> Result<String>;
}

/// A trait for handling meta service requests in `QueryEngine`.
#[async_trait]
pub trait MetaServiceHandler: Send + Sync {
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}

pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
5 changes: 4 additions & 1 deletion src/common/function/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
// limitations under the License.

pub mod scalars;
pub mod system;
mod system;
mod table;

pub mod function;
pub mod function_registry;
pub mod handlers;
pub mod helper;
pub mod state;
14 changes: 10 additions & 4 deletions src/common/function/src/scalars/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ use session::context::QueryContextRef;
use snafu::ResultExt;

use crate::function::{FunctionContext, FunctionRef};

/// Create a ScalarUdf from function and query context.
pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf {
use crate::state::FunctionState;

/// Create a ScalarUdf from function, query context and state.
pub fn create_udf(
func: FunctionRef,
query_ctx: QueryContextRef,
state: Arc<FunctionState>,
) -> ScalarUdf {
let func_cloned = func.clone();
let return_type: ReturnTypeFunction = Arc::new(move |input_types: &[ConcreteDataType]| {
Ok(Arc::new(func_cloned.return_type(input_types)?))
Expand All @@ -38,6 +43,7 @@ pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf {
let fun: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| {
let func_ctx = FunctionContext {
query_ctx: query_ctx.clone(),
state: state.clone(),
};

let len = args
Expand Down Expand Up @@ -101,7 +107,7 @@ mod tests {
}

// create a udf and test it again
let udf = create_udf(f.clone(), query_ctx);
let udf = create_udf(f.clone(), query_ctx, Arc::new(FunctionState::default()));

assert_eq!("test_and", udf.name);
assert_eq!(f.signature(), udf.signature);
Expand Down
25 changes: 25 additions & 0 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};

/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
#[derive(Clone, Default)]
pub struct FunctionState {
// The table mutation handler
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The meta service handler
pub meta_service_handler: Option<MetaServiceHandlerRef>,
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
}
10 changes: 8 additions & 2 deletions src/common/function/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod build;
pub mod version;
mod build;
mod database;
mod timezone;
mod version;

use std::sync::Arc;

use build::BuildFunction;
use database::DatabaseFunction;
use timezone::TimezoneFunction;
use version::VersionFunction;

use crate::function_registry::FunctionRegistry;
Expand All @@ -28,5 +32,7 @@ impl SystemFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(BuildFunction));
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(TimezoneFunction));
}
}
92 changes: 92 additions & 0 deletions src/common/function/src/system/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self};
use std::sync::Arc;

use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, VectorRef};

use crate::function::{Function, FunctionContext};

/// A function to return current schema name.
#[derive(Clone, Debug, Default)]
pub struct DatabaseFunction;

const NAME: &str = "database";

impl Function for DatabaseFunction {
fn name(&self) -> &str {
NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}

fn signature(&self) -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
}

fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let db = func_ctx.query_ctx.current_schema();

Ok(Arc::new(StringVector::from_slice(&[db])) as _)
}
}

impl fmt::Display for DatabaseFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DATABASE")
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_query::prelude::TypeSignature;
use session::context::QueryContextBuilder;

use super::*;
#[test]
fn test_build_function() {
let build = DatabaseFunction;
assert_eq!("database", build.name());
assert_eq!(
ConcreteDataType::string_datatype(),
build.return_type(&[]).unwrap()
);
assert!(matches!(build.signature(),
Signature {
type_signature: TypeSignature::Uniform(0, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![]
));

let query_ctx = QueryContextBuilder::default()
.current_schema("test_db".to_string())
.build();

let func_ctx = FunctionContext {
query_ctx,
..Default::default()
};
let vector = build.eval(func_ctx, &[]).unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_db"]));
assert_eq!(expect, vector);
}
}