Skip to content

Commit

Permalink
chore: use name in pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
v3g42 committed Jan 20, 2023
1 parent ee28aa3 commit 1add773
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions dozer-sql/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn statement_to_pipeline(
query_to_pipeline(&query_name, &query, &mut pipeline, &mut pipeline_map, false)?;
};
let node = pipeline_map
.get(&query_name)
.get(&query_name.0)
.expect("query should have been initialized")
.to_owned();
Ok((pipeline, node))
Expand All @@ -53,7 +53,7 @@ fn query_to_pipeline(
processor_name: &NameOrAlias,
query: &Query,
pipeline: &mut AppPipeline,
pipeline_map: &mut HashMap<NameOrAlias, (String, PortHandle)>,
pipeline_map: &mut HashMap<String, (String, PortHandle)>,
stateful: bool,
) -> Result<(), PipelineError> {
// Attach the first pipeline if there is with clause
Expand Down Expand Up @@ -109,7 +109,7 @@ fn select_to_pipeline(
processor_name: &NameOrAlias,
select: Select,
pipeline: &mut AppPipeline,
pipeline_map: &mut HashMap<NameOrAlias, (String, PortHandle)>,
pipeline_map: &mut HashMap<String, (String, PortHandle)>,
stateful: bool,
) -> Result<(), PipelineError> {
// FROM clause
Expand All @@ -132,7 +132,7 @@ fn select_to_pipeline(

let input_names = get_input_names(&input_tables);
for (port_index, table_name) in input_names.iter().enumerate() {
if let Some((processor_name, processor_port)) = pipeline_map.get(table_name) {
if let Some((processor_name, processor_port)) = pipeline_map.get(&table_name.0) {
pipeline.connect_nodes(
processor_name,
Some(*processor_port),
Expand Down Expand Up @@ -176,7 +176,10 @@ fn select_to_pipeline(
)?;
}

pipeline_map.insert(processor_name.clone(), (gen_agg_name, DEFAULT_PORT_HANDLE));
pipeline_map.insert(
processor_name.0.clone(),
(gen_agg_name, DEFAULT_PORT_HANDLE),
);

Ok(())
}
Expand All @@ -189,7 +192,7 @@ fn select_to_pipeline(
pub fn get_input_tables(
from: &TableWithJoins,
pipeline: &mut AppPipeline,
pipeline_map: &mut HashMap<NameOrAlias, (String, PortHandle)>,
pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<IndexedTabelWithJoins, PipelineError> {
let mut input_tables = vec![];

Expand Down Expand Up @@ -220,15 +223,15 @@ pub fn get_input_names(input_tables: &IndexedTabelWithJoins) -> Vec<NameOrAlias>
}
pub fn get_entry_points(
input_tables: &IndexedTabelWithJoins,
pipeline_map: &mut HashMap<NameOrAlias, (String, PortHandle)>,
pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
let mut endpoints = vec![];

let input_names = get_input_names(input_tables);

for (input_port, table) in input_names.iter().enumerate() {
if !pipeline_map.contains_key(table) {
let name = table.0.clone();
let name = table.0.clone();
if !pipeline_map.contains_key(&name) {
endpoints.push(PipelineEntryPoint::new(
AppSourceId::new(name, None),
input_port as PortHandle,
Expand All @@ -242,7 +245,7 @@ pub fn get_entry_points(
pub fn get_from_source(
relation: &TableFactor,
pipeline: &mut AppPipeline,
pipeline_map: &mut HashMap<NameOrAlias, (String, PortHandle)>,
pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<NameOrAlias, PipelineError> {
match relation {
TableFactor::Table { name, alias, .. } => {
Expand Down

0 comments on commit 1add773

Please sign in to comment.