feat(cubesql): Support performance_schema.session_variables & global_variables
ovr committed Dec 6, 2021
1 parent 52f683e commit a807858
Showing 5 changed files with 120 additions and 2 deletions.
1 change: 1 addition & 0 deletions rust/cubesql/src/compile/engine/information_schema/mod.rs
@@ -1,3 +1,4 @@
pub mod columns;
pub mod statistics;
pub mod tables;
pub mod variables;
2 changes: 2 additions & 0 deletions rust/cubesql/src/compile/engine/information_schema/tables.rs
@@ -157,6 +157,8 @@ impl InfoSchemaTableProvider {
let mut builder = InformationSchemaTablesBuilder::new();
builder.add_table("def", "information_schema", "tables");
builder.add_table("def", "information_schema", "columns");
builder.add_table("def", "performance_schema", "session_variables");
builder.add_table("def", "performance_schema", "global_variables");

for cube in cubes {
builder.add_table("def", "db", cube.name.clone());
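Registering the two virtual tables here makes them visible to clients that enumerate information_schema.tables on connect. A sketch of the same registration grouped into a loop (equivalent behavior, assuming add_table accepts a plain &str table name as the existing calls suggest; not part of this commit):

for table in &["session_variables", "global_variables"] {
    builder.add_table("def", "performance_schema", *table);
}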
80 changes: 80 additions & 0 deletions rust/cubesql/src/compile/engine/information_schema/variables.rs
@@ -0,0 +1,80 @@
use std::{any::Any, collections::HashMap, sync::Arc};

use async_trait::async_trait;
use datafusion::{
arrow::{
array::{Array, StringBuilder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
},
datasource::{datasource::TableProviderFilterPushDown, TableProvider, TableType},
error::DataFusionError,
logical_plan::Expr,
physical_plan::{memory::MemoryExec, ExecutionPlan},
};

pub struct PerfSchemaVariablesProvider {
variables: HashMap<String, String>,
}

impl PerfSchemaVariablesProvider {
pub fn new() -> Self {
let mut variables = HashMap::new();
variables.insert("max_allowed_packet".to_string(), "67108864".to_string());

Self { variables }
}
}

#[async_trait]
impl TableProvider for PerfSchemaVariablesProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_type(&self) -> TableType {
TableType::View
}

fn schema(&self) -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("VARIABLE_NAME", DataType::Utf8, false),
Field::new("VARIABLE_VALUE", DataType::Utf8, false),
]))
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut names = StringBuilder::new(100);
let mut values = StringBuilder::new(100);

for (key, value) in self.variables.iter() {
names.append_value(key.clone()).unwrap();
values.append_value(value.clone()).unwrap();
}

let mut data: Vec<Arc<dyn Array>> = vec![];
data.push(Arc::new(names.finish()));
data.push(Arc::new(values.finish()));

let batch = RecordBatch::try_new(self.schema(), data)?;

Ok(Arc::new(MemoryExec::try_new(
&[vec![batch]],
self.schema(),
projection.clone(),
)?))
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown, DataFusionError> {
Ok(TableProviderFilterPushDown::Unsupported)
}
}
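The provider currently knows only max_allowed_packet, and the same table backs both the session and global variants. A minimal sketch of how the map could be seeded with more entries (hypothetical constructor and values, not part of this commit):

impl PerfSchemaVariablesProvider {
    // Hypothetical: seed the provider with an arbitrary variable map so the
    // session and global tables could diverge later.
    pub fn with_variables(variables: HashMap<String, String>) -> Self {
        Self { variables }
    }
}

fn build_session_variables() -> PerfSchemaVariablesProvider {
    let mut variables = HashMap::new();
    variables.insert("max_allowed_packet".to_string(), "67108864".to_string());
    // Illustrative extra entry; a real value would come from server configuration.
    variables.insert("wait_timeout".to_string(), "28800".to_string());
    PerfSchemaVariablesProvider::with_variables(variables)
}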
10 changes: 9 additions & 1 deletion rust/cubesql/src/compile/engine/provider.rs
@@ -10,7 +10,7 @@ use datafusion::{

use super::information_schema::{
columns::InfoSchemaColumnsProvider, statistics::InfoSchemaStatisticsProvider,
tables::InfoSchemaTableProvider,
tables::InfoSchemaTableProvider, variables::PerfSchemaVariablesProvider,
};

pub struct CubeContext<'a> {
@@ -55,6 +55,14 @@ impl<'a> ContextProvider for CubeContext<'a> {
if tp.eq_ignore_ascii_case("information_schema.statistics") {
return Some(Arc::new(InfoSchemaStatisticsProvider::new()));
}

if tp.eq_ignore_ascii_case("performance_schema.global_variables") {
return Some(Arc::new(PerfSchemaVariablesProvider::new()));
}

if tp.eq_ignore_ascii_case("performance_schema.session_variables") {
return Some(Arc::new(PerfSchemaVariablesProvider::new()));
}
};

None
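Both schema-qualified names resolve to the same provider for now. The two checks could equally be folded into one match on the lowered name, sketched here for illustration (same behavior, not part of the commit):

let lowered = tp.to_lowercase();
match lowered.as_str() {
    "performance_schema.session_variables"
    | "performance_schema.global_variables" => {
        return Some(Arc::new(PerfSchemaVariablesProvider::new()));
    }
    _ => {}
}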
29 changes: 28 additions & 1 deletion rust/cubesql/src/compile/mod.rs
@@ -1256,7 +1256,9 @@ impl QueryPlanner {
}
};

if schema_name.to_lowercase() == "information_schema" {
if schema_name.to_lowercase() == "information_schema"
|| schema_name.to_lowercase() == "performance_schema"
{
return self.create_df_logical_plan(stmt.clone(), props);
}
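With this check, any statement whose leading table lives in information_schema or performance_schema skips the Cube-specific rewriting and is planned directly by DataFusion against the virtual providers above. A small sketch that computes the lowercased schema name once instead of twice (same behavior, not part of the commit):

let schema = schema_name.to_lowercase();
if schema == "information_schema" || schema == "performance_schema" {
    return self.create_df_logical_plan(stmt.clone(), props);
}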

@@ -3067,6 +3069,8 @@ mod tests {
+---------------+--------------------+---------------------------+------------+--------+---------+------------+-------------+----------------+-------------+-----------------+--------------+-----------+----------------+-------------+-------------+------------+-----------------+----------+----------------+---------------+\n\
| def | information_schema | tables | BASE TABLE | InnoDB | 10 | Dynamic | 0 | 0 | 16384 | | | | | | | | | | | |\n\
| def | information_schema | columns | BASE TABLE | InnoDB | 10 | Dynamic | 0 | 0 | 16384 | | | | | | | | | | | |\n\
| def | performance_schema | session_variables | BASE TABLE | InnoDB | 10 | Dynamic | 0 | 0 | 16384 | | | | | | | | | | | |\n\
| def | performance_schema | global_variables | BASE TABLE | InnoDB | 10 | Dynamic | 0 | 0 | 16384 | | | | | | | | | | | |\n\
| def | db | KibanaSampleDataEcommerce | BASE TABLE | InnoDB | 10 | Dynamic | 0 | 0 | 16384 | | | | | | | | | | | |\n\
| def | db | Logs | BASE TABLE | InnoDB | 10 | Dynamic | 0 | 0 | 16384 | | | | | | | | | | | |\n\
+---------------+--------------------+---------------------------+------------+--------+---------+------------+-------------+----------------+-------------+-----------------+--------------+-----------+----------------+-------------+-------------+------------+-----------------+----------+----------------+---------------+"
@@ -3109,4 +3113,27 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_performance_schema_variables() -> Result<(), CubeError> {
assert_eq!(
execute_df_query("SELECT * FROM performance_schema.session_variables WHERE VARIABLE_NAME = 'max_allowed_packet'".to_string()).await?,
"+--------------------+----------------+\n\
| VARIABLE_NAME | VARIABLE_VALUE |\n\
+--------------------+----------------+\n\
| max_allowed_packet | 67108864 |\n\
+--------------------+----------------+"
);

assert_eq!(
execute_df_query("SELECT * FROM performance_schema.global_variables WHERE VARIABLE_NAME = 'max_allowed_packet'".to_string()).await?,
"+--------------------+----------------+\n\
| VARIABLE_NAME | VARIABLE_VALUE |\n\
+--------------------+----------------+\n\
| max_allowed_packet | 67108864 |\n\
+--------------------+----------------+"
);

Ok(())
}
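// A possible follow-up test, sketched here and not part of this commit: a variable
// that is absent from the provider's map should match no rows, so filtering on an
// unknown name returns only the printed header. Only the absence of a data row is
// asserted, since the exact empty-result formatting is not shown in this commit.
#[tokio::test]
async fn test_performance_schema_unknown_variable() -> Result<(), CubeError> {
    let result = execute_df_query(
        "SELECT * FROM performance_schema.session_variables WHERE VARIABLE_NAME = 'sql_mode'"
            .to_string(),
    )
    .await?;
    assert!(!result.contains("sql_mode"));
    Ok(())
}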
}
