Skip to content

Commit

Permalink
feat(cubesql): Support pg_catalog.pg_statio_user_tables meta table
Browse files Browse the repository at this point in the history
  • Loading branch information
MazterQyou committed Jul 15, 2022
1 parent 7ba3148 commit a4d9050
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod pg_range;
mod pg_roles;
mod pg_settings;
mod pg_stat_activity;
mod pg_statio_user_tables;
mod pg_tables;
mod pg_type;
mod role_column_grants;
Expand All @@ -52,6 +53,7 @@ pub use pg_range::*;
pub use pg_roles::*;
pub use pg_settings::*;
pub use pg_stat_activity::*;
pub use pg_statio_user_tables::*;
pub use pg_tables::*;
pub use pg_type::*;
pub use role_column_grants::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::{any::Any, sync::Arc};

use async_trait::async_trait;

use datafusion::{
arrow::{
array::{Array, ArrayRef, Int64Builder, StringBuilder, UInt32Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
},
datasource::{datasource::TableProviderFilterPushDown, TableProvider, TableType},
error::DataFusionError,
logical_plan::Expr,
physical_plan::{memory::MemoryExec, ExecutionPlan},
};

use crate::compile::CubeMetaTable;

/// Column builders for the `pg_catalog.pg_statio_user_tables` emulation table.
///
/// Mirrors PostgreSQL's per-table block-I/O statistics view. One value is
/// appended to every builder per table so all columns stay the same length.
struct PgCatalogStatioUserTablesBuilder {
    // OID of the table this row describes
    relid: UInt32Builder,
    // Name of the schema containing the table
    schemaname: StringBuilder,
    // Name of the table
    relname: StringBuilder,
    // Disk blocks read / buffer hits for the table's heap
    heap_blks_read: Int64Builder,
    heap_blks_hit: Int64Builder,
    // Disk blocks read / buffer hits across all indexes on the table
    idx_blks_read: Int64Builder,
    idx_blks_hit: Int64Builder,
    // TOAST-table block counters (nullable — NULL when there is no TOAST table)
    toast_blks_read: Int64Builder,
    toast_blks_hit: Int64Builder,
    // TOAST-index block counters (nullable, same as above)
    tidx_blks_read: Int64Builder,
    tidx_blks_hit: Int64Builder,
}

impl PgCatalogStatioUserTablesBuilder {
    /// Creates a set of column builders, each pre-sized for `row_capacity` rows.
    fn new(row_capacity: usize) -> Self {
        Self {
            relid: UInt32Builder::new(row_capacity),
            schemaname: StringBuilder::new(row_capacity),
            relname: StringBuilder::new(row_capacity),
            heap_blks_read: Int64Builder::new(row_capacity),
            heap_blks_hit: Int64Builder::new(row_capacity),
            idx_blks_read: Int64Builder::new(row_capacity),
            idx_blks_hit: Int64Builder::new(row_capacity),
            toast_blks_read: Int64Builder::new(row_capacity),
            toast_blks_hit: Int64Builder::new(row_capacity),
            tidx_blks_read: Int64Builder::new(row_capacity),
            tidx_blks_hit: Int64Builder::new(row_capacity),
        }
    }

    /// Appends one row describing a user table.
    ///
    /// Cube does not track PostgreSQL-style buffer statistics, so the heap and
    /// index counters are reported as zero, while the TOAST counters are NULL
    /// (as PostgreSQL itself does for tables without a TOAST relation).
    fn add_table(&mut self, relid: u32, schemaname: impl AsRef<str>, relname: impl AsRef<str>) {
        self.relid.append_value(relid).unwrap();
        self.schemaname.append_value(schemaname).unwrap();
        self.relname.append_value(relname).unwrap();
        // Heap/index counters: always present, always zero.
        self.heap_blks_read.append_value(0).unwrap();
        self.heap_blks_hit.append_value(0).unwrap();
        self.idx_blks_read.append_value(0).unwrap();
        self.idx_blks_hit.append_value(0).unwrap();
        // TOAST counters: always NULL.
        self.toast_blks_read.append_null().unwrap();
        self.toast_blks_hit.append_null().unwrap();
        self.tidx_blks_read.append_null().unwrap();
        self.tidx_blks_hit.append_null().unwrap();
    }

    /// Finalizes every builder into an arrow array, in the column order
    /// declared by the provider's schema.
    fn finish(mut self) -> Vec<Arc<dyn Array>> {
        vec![
            Arc::new(self.relid.finish()),
            Arc::new(self.schemaname.finish()),
            Arc::new(self.relname.finish()),
            Arc::new(self.heap_blks_read.finish()),
            Arc::new(self.heap_blks_hit.finish()),
            Arc::new(self.idx_blks_read.finish()),
            Arc::new(self.idx_blks_hit.finish()),
            Arc::new(self.toast_blks_read.finish()),
            Arc::new(self.toast_blks_hit.finish()),
            Arc::new(self.tidx_blks_read.finish()),
            Arc::new(self.tidx_blks_hit.finish()),
        ]
    }
}

/// In-memory table provider backing `pg_catalog.pg_statio_user_tables`.
pub struct PgCatalogStatioUserTablesProvider {
    // Pre-built column arrays, built once at construction and shared
    // cheaply (Arc clone) across scans.
    data: Arc<Vec<ArrayRef>>,
}

impl PgCatalogStatioUserTablesProvider {
    /// Builds the provider from the current set of Cube meta tables.
    ///
    /// Every known cube becomes one row in the `public` schema, keyed by its
    /// OID. All heap/index block counters are zero and the TOAST counters are
    /// NULL, since Cube does not track PostgreSQL buffer statistics.
    ///
    /// Takes a slice rather than `&Vec<_>` so any contiguous collection of
    /// tables can be passed; existing `&Vec` call sites coerce implicitly.
    pub fn new(cube_tables: &[CubeMetaTable]) -> Self {
        let mut builder = PgCatalogStatioUserTablesBuilder::new(cube_tables.len());

        for table in cube_tables {
            builder.add_table(table.oid, "public", &table.name);
        }

        Self {
            data: Arc::new(builder.finish()),
        }
    }
}

#[async_trait]
impl TableProvider for PgCatalogStatioUserTablesProvider {
    /// Exposes the concrete type for downcasting (used to resolve the
    /// provider back to its catalog name).
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Reported as a view, matching PostgreSQL's pg_statio_user_tables.
    fn table_type(&self) -> TableType {
        TableType::View
    }

    /// Schema of pg_statio_user_tables: heap/index counters are non-null,
    /// TOAST counters are nullable. Must match the column order produced by
    /// `PgCatalogStatioUserTablesBuilder::finish`.
    fn schema(&self) -> SchemaRef {
        let fields = vec![
            Field::new("relid", DataType::UInt32, false),
            Field::new("schemaname", DataType::Utf8, false),
            Field::new("relname", DataType::Utf8, false),
            Field::new("heap_blks_read", DataType::Int64, false),
            Field::new("heap_blks_hit", DataType::Int64, false),
            Field::new("idx_blks_read", DataType::Int64, false),
            Field::new("idx_blks_hit", DataType::Int64, false),
            Field::new("toast_blks_read", DataType::Int64, true),
            Field::new("toast_blks_hit", DataType::Int64, true),
            Field::new("tidx_blks_read", DataType::Int64, true),
            Field::new("tidx_blks_hit", DataType::Int64, true),
        ];

        Arc::new(Schema::new(fields))
    }

    /// Serves the pre-built arrays through a single-partition MemoryExec,
    /// applying any column projection requested by the planner.
    async fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        let schema = self.schema();
        let batch = RecordBatch::try_new(schema.clone(), self.data.to_vec())?;
        let exec = MemoryExec::try_new(&[vec![batch]], schema, projection.clone())?;

        Ok(Arc::new(exec))
    }

    /// Filters are never pushed down; DataFusion evaluates them itself.
    fn supports_filter_pushdown(
        &self,
        _filter: &Expr,
    ) -> Result<TableProviderFilterPushDown, DataFusionError> {
        Ok(TableProviderFilterPushDown::Unsupported)
    }
}
10 changes: 9 additions & 1 deletion rust/cubesql/cubesql/src/compile/engine/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ use super::information_schema::postgres::{
PgCatalogDescriptionProvider, PgCatalogEnumProvider, PgCatalogIndexProvider,
PgCatalogMatviewsProvider, PgCatalogNamespaceProvider, PgCatalogProcProvider,
PgCatalogRangeProvider, PgCatalogRolesProvider, PgCatalogSettingsProvider,
PgCatalogStatActivityProvider, PgCatalogTableProvider, PgCatalogTypeProvider,
PgCatalogStatActivityProvider, PgCatalogStatioUserTablesProvider, PgCatalogTableProvider,
PgCatalogTypeProvider,
};

#[derive(Clone)]
Expand Down Expand Up @@ -317,6 +318,8 @@ impl DatabaseProtocol {
"pg_catalog.pg_roles".to_string()
} else if let Some(_) = any.downcast_ref::<PgCatalogStatActivityProvider>() {
"pg_catalog.pg_stat_activity".to_string()
} else if let Some(_) = any.downcast_ref::<PgCatalogStatioUserTablesProvider>() {
"pg_catalog.pg_statio_user_tables".to_string()
} else if let Some(_) = any.downcast_ref::<PostgresSchemaConstraintColumnUsageProvider>() {
"information_schema.constraint_column_usage".to_string()
} else if let Some(_) = any.downcast_ref::<PostgresSchemaViewsProvider>() {
Expand Down Expand Up @@ -479,6 +482,11 @@ impl DatabaseProtocol {
context.sessions.clone(),
)))
}
"pg_statio_user_tables" => {
return Some(Arc::new(PgCatalogStatioUserTablesProvider::new(
&context.meta.tables,
)))
}
_ => return None,
},
_ => return None,
Expand Down
14 changes: 14 additions & 0 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7428,6 +7428,20 @@ ORDER BY \"COUNT(count)\" DESC"
Ok(())
}

#[tokio::test]
async fn test_pgcatalog_pgstatiousertables_postgres() -> Result<(), CubeError> {
    // Snapshot-test the pg_statio_user_tables emulation: each cube should
    // appear as one row in the `public` schema with zeroed heap/index
    // counters and NULL TOAST counters, ordered by OID for determinism.
    insta::assert_snapshot!(
        "pgcatalog_pgstatiousertables_postgres",
        execute_query(
            "SELECT * FROM pg_catalog.pg_statio_user_tables ORDER BY relid ASC".to_string(),
            DatabaseProtocol::PostgreSQL
        )
        .await?
    );

    Ok(())
}

#[tokio::test]
async fn test_constraint_column_usage_postgres() -> Result<(), CubeError> {
insta::assert_snapshot!(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
source: cubesql/src/compile/mod.rs
expression: "execute_query(\"SELECT * FROM pg_catalog.pg_statio_user_tables ORDER BY relid ASC\".to_string(),\n DatabaseProtocol::PostgreSQL).await?"
---
+-------+------------+---------------------------+----------------+---------------+---------------+--------------+-----------------+----------------+----------------+---------------+
| relid | schemaname | relname | heap_blks_read | heap_blks_hit | idx_blks_read | idx_blks_hit | toast_blks_read | toast_blks_hit | tidx_blks_read | tidx_blks_hit |
+-------+------------+---------------------------+----------------+---------------+---------------+--------------+-----------------+----------------+----------------+---------------+
| 18000 | public | KibanaSampleDataEcommerce | 0 | 0 | 0 | 0 | NULL | NULL | NULL | NULL |
| 18013 | public | Logs | 0 | 0 | 0 | 0 | NULL | NULL | NULL | NULL |
+-------+------------+---------------------------+----------------+---------------+---------------+--------------+-----------------+----------------+----------------+---------------+

0 comments on commit a4d9050

Please sign in to comment.