Skip to content

Commit

Permalink
feat(cubesql): PowerBI basic queries support
Browse files — browse the repository at this point in the history
  • Loading branch information
paveltiunov committed May 27, 2022
1 parent 38c2b2e commit 455ae07
Show file tree
Hide file tree
Showing 9 changed files with 689 additions and 196 deletions.
1 change: 1 addition & 0 deletions packages/cubejs-backend-native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/cubesql/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/cubesql/cubesql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ comfy-table = "4.1.1"
bitflags = "1.3.2"
egg = "0.7.1"
paste = "1.0.6"
csv = "1.1.6"

[dev-dependencies]
pretty_assertions = "1.0.0"
Expand Down
28 changes: 28 additions & 0 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3683,6 +3683,34 @@ ORDER BY \"COUNT(count)\" DESC"
);
}

#[test]
fn power_bi_dimension_only() {
init_logger();

let query_plan = convert_select_to_query_plan(
"select \"_\".\"customer_gender\"\r\nfrom \r\n(\r\n select \"rows\".\"customer_gender\" as \"customer_gender\"\r\n from \r\n (\r\n select \"customer_gender\"\r\n from \"public\".\"KibanaSampleDataEcommerce\" \"$Table\"\r\n ) \"rows\"\r\n group by \"customer_gender\"\r\n) \"_\"\r\norder by \"_\".\"customer_gender\"\r\nlimit 1001".to_string(),
DatabaseProtocol::PostgreSQL,
);

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
segments: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
time_dimensions: None,
order: Some(vec![vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
"asc".to_string(),
],],),
limit: Some(1001),
offset: None,
filters: None,
}
);
}

#[test]
fn non_cube_filters_cast_kept() {
init_logger();
Expand Down
14 changes: 14 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,20 @@ impl LogicalPlanAnalysis {

Some(vec)
}
LogicalPlanLanguage::AggregateGroupExpr(params) => {
for p in params.iter() {
vec.extend(referenced_columns(*p)?.into_iter());
}

Some(vec)
}
LogicalPlanLanguage::AggregateAggrExpr(params) => {
for p in params.iter() {
vec.extend(referenced_columns(*p)?.into_iter());
}

Some(vec)
}
_ => None,
}
}
Expand Down
9 changes: 7 additions & 2 deletions rust/cubesql/cubesql/src/compile/rewrite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ crate::plan_to_language! {
members: Vec<LogicalPlan>,
aliases: Vec<(String, String)>,
table_name: Option<String>,
target_table_name: Option<String>,
},
InnerAggregateSplitReplacer {
members: Vec<LogicalPlan>,
Expand Down Expand Up @@ -605,9 +606,13 @@ fn filter(expr: impl Display, input: impl Display) -> String {
fn column_alias_replacer(
members: impl Display,
aliases: impl Display,
cube: impl Display,
table_name: impl Display,
target_table_name: impl Display,
) -> String {
format!("(ColumnAliasReplacer {} {} {})", members, aliases, cube)
format!(
"(ColumnAliasReplacer {} {} {} {})",
members, aliases, table_name, target_table_name
)
}

fn member_replacer(members: impl Display, aliases: impl Display) -> String {
Expand Down
262 changes: 257 additions & 5 deletions rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,226 @@ use crate::{
CubeError,
};
use datafusion::{logical_plan::LogicalPlan, physical_plan::planner::DefaultPhysicalPlanner};
use egg::{EGraph, Extractor, Id, Rewrite, Runner};
use std::sync::Arc;
use egg::{EGraph, Extractor, Id, IterationData, Language, Rewrite, Runner};
use itertools::Itertools;
use std::{env, ffi::OsStr, fs, io::Write, sync::Arc};

pub struct Rewriter {
graph: EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
cube_context: Arc<CubeContext>,
}

type CubeRunner = Runner<LogicalPlanLanguage, LogicalPlanAnalysis, IterInfo>;

#[derive(Debug)]
pub struct IterDebugInfo {
svg_file: String,
formatted_egraph: String,
formatted_nodes_csv: Vec<Vec<String>>,
formatted_edges_csv: Vec<Vec<String>>,
}

impl IterDebugInfo {
pub fn format_egraph(graph: &EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>) -> String {
let clusters = graph
.classes()
.map(|class| {
let node_names = class
.nodes
.iter()
.map(|n| format!("{:?}", format!("{:?}", n)))
.collect::<Vec<_>>();
let links = node_names
.iter()
.map(|n| {
format!(
" {} [shape=rect];\n {:?} -> {};\n",
n,
format!("#{}", class.id),
n
)
})
.join("\n");
let external_links = class
.nodes
.iter()
.flat_map(|n| {
n.children().iter().map(move |c| {
format!(" {:?} -> {:?};", format!("{:?}", n), format!("#{}", c))
})
})
.collect::<Vec<_>>();
(
format!(
" subgraph cluster_{} {{\
\n style=filled;\
\n color=lightgrey;\
\n node [style=filled,color=white];\
\n{}\
\n }}",
class.id, links
),
external_links,
)
})
.collect::<Vec<_>>();
format!(
"digraph Egraph {{\
\n{}\
\n{}\
}}",
clusters
.iter()
.map(|(cluster, _)| cluster.to_string())
.join("\n"),
clusters
.iter()
.map(|(_, links)| links.join("\n"))
.join("\n"),
)
}

pub fn format_nodes_csv(
graph: &EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
iteration_id: usize,
) -> Vec<Vec<String>> {
let mut res = Vec::new();
for class in graph.classes() {
res.push(vec![
class.id.to_string(),
format!("#{}", class.id),
class.id.to_string(),
format!("<[{}.0, {}.0]>", iteration_id, iteration_id),
]);
res.extend(
class
.nodes
.iter()
.map(|n| {
vec![
format!("{:?}", n),
format!("{:?}", n),
class.id.to_string(),
format!("<[{}.0, {}.0]>", iteration_id, iteration_id),
]
})
.collect::<Vec<_>>(),
);
}
res
}

pub fn format_edges_csv(
graph: &EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
iteration_id: usize,
) -> Vec<Vec<String>> {
let mut res = Vec::new();
for class in graph.classes() {
res.extend(
class
.nodes
.iter()
.map(|n| {
vec![
class.id.to_string(),
format!("{:?}", n),
"directed".to_string(),
format!("<[{}.0, {}.0]>", iteration_id, iteration_id),
]
})
.collect::<Vec<_>>(),
);

res.extend(
class
.nodes
.iter()
.flat_map(|n| {
n.children().iter().map(move |c| {
vec![
format!("{:?}", n),
c.to_string(),
"directed".to_string(),
format!("<[{}.0, {}.0]>", iteration_id, iteration_id),
]
})
})
.collect::<Vec<_>>(),
);
}
res
}

pub fn run_dot<S, I>(graph: String, args: I) -> Result<(), CubeError>
where
S: AsRef<OsStr>,
I: IntoIterator<Item = S>,
{
use std::process::{Command, Stdio};
let mut child = Command::new("dot")
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::null())
.spawn()?;
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
write!(stdin, "{}", graph)?;
match child.wait()?.code() {
Some(0) => Ok(()),
Some(e) => Err(CubeError::internal(format!(
"dot program returned error code {}",
e
))),
None => Err(CubeError::internal(
"dot program was killed by a signal".to_string(),
)),
}
}

pub fn export_svg(&self) -> Result<(), CubeError> {
Self::run_dot(
self.formatted_egraph.to_string(),
&["-Tsvg", "-o", self.svg_file.as_str()],
)
}

fn make(runner: &CubeRunner) -> Self {
let iteration_id = runner.iterations.len();
let svg_file = format!("egraph-debug/iteration-{}.svg", iteration_id);
let formatted_egraph = Self::format_egraph(&runner.egraph);
IterDebugInfo {
svg_file,
formatted_egraph,
formatted_nodes_csv: Self::format_nodes_csv(&runner.egraph, iteration_id),
formatted_edges_csv: Self::format_edges_csv(&runner.egraph, iteration_id),
}
}
}

#[derive(Debug)]
pub struct IterInfo {
debug_info: Option<IterDebugInfo>,
}

impl IterInfo {
pub fn egraph_debug_enabled() -> bool {
env::var("CUBESQL_DEBUG_EGRAPH")
.map(|v| v.to_lowercase() == "true")
.unwrap_or(false)
}
}

impl IterationData<LogicalPlanLanguage, LogicalPlanAnalysis> for IterInfo {
fn make(runner: &CubeRunner) -> Self {
IterInfo {
debug_info: if Self::egraph_debug_enabled() {
Some(IterDebugInfo::make(runner))
} else {
None
},
}
}
}

impl Rewriter {
pub fn new(
graph: EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
Expand All @@ -35,8 +247,8 @@ impl Rewriter {
}
}

pub fn rewrite_runner(&self) -> Runner<LogicalPlanLanguage, LogicalPlanAnalysis> {
Runner::<LogicalPlanLanguage, LogicalPlanAnalysis>::new(LogicalPlanAnalysis::new(
pub fn rewrite_runner(&self) -> CubeRunner {
CubeRunner::new(LogicalPlanAnalysis::new(
self.cube_context.clone(),
Arc::new(DefaultPhysicalPlanner::default()),
))
Expand All @@ -54,10 +266,50 @@ impl Rewriter {
let rules = self.rewrite_rules();
let runner = runner.run(rules.iter());
log::debug!("Iterations: {:?}", runner.iterations);
if IterInfo::egraph_debug_enabled() {
let _ = fs::remove_dir_all("egraph-debug");
let _ = fs::create_dir_all("egraph-debug");
let mut nodes = csv::Writer::from_path("egraph-debug/nodes.csv")
.map_err(|e| CubeError::internal(e.to_string()))?;
let mut edges = csv::Writer::from_path("egraph-debug/edges.csv")
.map_err(|e| CubeError::internal(e.to_string()))?;
nodes
.write_record(&["Id", "Label", "Cluster", "Timeset"])
.map_err(|e| CubeError::internal(e.to_string()))?;
edges
.write_record(&["Source", "Target", "Type", "Timeset"])
.map_err(|e| CubeError::internal(e.to_string()))?;
for i in runner.iterations {
i.data.debug_info.as_ref().unwrap().export_svg()?;
for node in i
.data
.debug_info
.as_ref()
.unwrap()
.formatted_nodes_csv
.iter()
{
nodes
.write_record(node)
.map_err(|e| CubeError::internal(e.to_string()))?;
}
for edge in i
.data
.debug_info
.as_ref()
.unwrap()
.formatted_edges_csv
.iter()
{
edges
.write_record(edge)
.map_err(|e| CubeError::internal(e.to_string()))?;
}
}
}
let extractor = Extractor::new(&runner.egraph, BestCubePlan);
let (_, best) = extractor.find_best(root);
let new_root = Id::from(best.as_ref().len() - 1);
//log::debug!("Egraph: {:#?}", runner.egraph);
log::debug!("Best: {:?}", best);
self.graph = runner.egraph.clone();
let converter =
Expand Down

0 comments on commit 455ae07

Please sign in to comment.