[C++] Substrait join results in all zeroes on the righthand side of the join #34484

Closed
paleolimbot opened this issue Mar 7, 2023 · 10 comments

@paleolimbot
Member

Describe the bug, including details regarding any error messages, version, and platform.

It's very possible that there's something wrong with my plan here! Here is a reproducer using the R bindings:

cities <- tibble::tibble(
  city = c("Halifax", "Lancaster", "Chicago"),
  country = c("Canada", "United Kingdom", "United States")
)

countries <- tibble::tibble(
  country = c("United States", "Canada", "United Kingdom", "Morocco"),
  continent = c("North America", "North America", "Europe", "Africa")
)


tmp_left <- tempfile()
tmp_right <- tempfile()
arrow::write_parquet(cities, tmp_left)
arrow::write_parquet(countries, tmp_right)

plan <- sprintf('{
  "extensionUris": [
    {
      "extensionUriAnchor": 1,
      "uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
    },
    {
      "extensionUriAnchor": 2,
      "uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_comparison.yaml"
    }
  ],
  "extensions": [
    {
      "extensionFunction": {
        "extensionUriReference": 2,
        "functionAnchor": 3,
        "name": "equal"
      }
    }
  ],
  "relations": [
    {
      "rel": {
        "join": {
          "common": {
            "emit": {
              "outputMapping": [
                0,
                1,
                2,
                3
              ]
            }
          },
          "left": {
            "read": {
              "baseSchema": {
                "names": [
                  "city",
                  "country"
                ],
                "struct": {
                  "types": [
                    {
                      "string": {
                        "nullability": "NULLABILITY_NULLABLE"
                      }
                    },
                    {
                      "string": {
                        "nullability": "NULLABILITY_NULLABLE"
                      }
                    }
                  ]
                }
              },
              "localFiles": {
                "items": [
                  {
                    "uriFile": "file://%s",
                    "parquet": {

                    }
                  }
                ]
              }
            }
          },
          "right": {
            "read": {
              "baseSchema": {
                "names": [
                  "country",
                  "continent"
                ],
                "struct": {
                  "types": [
                    {
                      "string": {
                        "nullability": "NULLABILITY_NULLABLE"
                      }
                    },
                    {
                      "string": {
                        "nullability": "NULLABILITY_NULLABLE"
                      }
                    }
                  ]
                }
              },
              "localFiles": {
                "items": [
                  {
                    "uriFile": "file://%s",
                    "parquet": {

                    }
                  }
                ]
              }
            }
          },
          "expression": {
            "scalarFunction": {
              "functionReference": 3,
              "outputType": {
                "bool": {
                  "nullability": "NULLABILITY_NULLABLE"
                }
              },
              "arguments": [
                {
                  "value": {
                    "selection": {
                      "directReference": {
                        "structField": {
                          "field": 1
                        }
                      },
                      "rootReference": {

                      }
                    }
                  }
                },
                {
                  "value": {
                    "selection": {
                      "directReference": {
                        "structField": {
                          "field": 2
                        }
                      },
                      "rootReference": {

                      }
                    }
                  }
                }
              ]
            }
          },
          "type": "JOIN_TYPE_INNER"
        }
      }
    }
  ]
}', tmp_left, tmp_right)

arrow:::do_exec_plan_substrait(plan) |> as.data.frame()
#> # A tibble: 3 × 4
#>   `FieldPath(0)` `FieldPath(1)` `FieldPath(2)` `FieldPath(3)`
#>   <chr>          <chr>                   <int>          <int>
#> 1 Halifax        Canada                      0              0
#> 2 Lancaster      United Kingdom              0              0
#> 3 Chicago        United States               0              0

Created on 2023-03-07 with reprex v2.0.2

The Substrait producer interface I'm using to generate this plan:

# remotes::install_github("voltrondata/substrait-r#225")
library(substrait, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

cities <- tibble::tibble(
  city = c("Halifax", "Lancaster", "Chicago"),
  country = c("Canada", "United Kingdom", "United States")
)

countries <- tibble::tibble(
  country = c("United States", "Canada", "United Kingdom", "Morocco"),
  continent = c("North America", "North America", "Europe", "Africa")
)

compiler <- cities |> 
  arrow_substrait_compiler() |> 
  substrait_join(countries)

# Obviously very wrong
compiler |> collect()
#> # A tibble: 3 × 4
#>   city      country.x      country.y continent
#>   <chr>     <chr>              <int>     <int>
#> 1 Halifax   Canada                 0         0
#> 2 Lancaster United Kingdom         0         0
#> 3 Chicago   United States          0         0

# Anything wrong here?
compiler$plan()
#> message of type 'substrait.Plan' with 3 fields set
#> extension_uris {
#>   extension_uri_anchor: 1
#>   uri: "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
#> }
#> extension_uris {
#>   extension_uri_anchor: 2
#>   uri: "https://github.com/substrait-io/substrait/blob/main/extensions/functions_comparison.yaml"
#> }
#> extensions {
#>   extension_function {
#>     extension_uri_reference: 2
#>     function_anchor: 3
#>     name: "equal"
#>   }
#> }
#> relations {
#>   root {
#>     input {
#>       join {
#>         common {
#>           emit {
#>             output_mapping: 0
#>             output_mapping: 1
#>             output_mapping: 2
#>             output_mapping: 3
#>           }
#>         }
#>         left {
#>           read {
#>             base_schema {
#>               names: "city"
#>               names: "country"
#>               struct {
#>                 types {
#>                   string {
#>                     nullability: NULLABILITY_NULLABLE
#>                   }
#>                 }
#>                 types {
#>                   string {
#>                     nullability: NULLABILITY_NULLABLE
#>                   }
#>                 }
#>               }
#>             }
#>             named_table {
#>               names: "named_table_1"
#>             }
#>           }
#>         }
#>         right {
#>           read {
#>             base_schema {
#>               names: "country"
#>               names: "continent"
#>               struct {
#>                 types {
#>                   string {
#>                     nullability: NULLABILITY_NULLABLE
#>                   }
#>                 }
#>                 types {
#>                   string {
#>                     nullability: NULLABILITY_NULLABLE
#>                   }
#>                 }
#>               }
#>             }
#>             named_table {
#>               names: "named_table_2"
#>             }
#>           }
#>         }
#>         expression {
#>           scalar_function {
#>             function_reference: 3
#>             output_type {
#>               bool {
#>                 nullability: NULLABILITY_NULLABLE
#>               }
#>             }
#>             arguments {
#>               value {
#>                 selection {
#>                   direct_reference {
#>                     struct_field {
#>                       field: 1
#>                     }
#>                   }
#>                   root_reference {
#>                   }
#>                 }
#>               }
#>             }
#>             arguments {
#>               value {
#>                 selection {
#>                   direct_reference {
#>                     struct_field {
#>                       field: 2
#>                     }
#>                   }
#>                   root_reference {
#>                   }
#>                 }
#>               }
#>             }
#>           }
#>         }
#>         type: JOIN_TYPE_INNER
#>       }
#>     }
#>     names: "city"
#>     names: "country.x"
#>     names: "country.y"
#>     names: "continent"
#>   }
#> }

Created on 2023-03-07 with reprex v2.0.2

Component(s)

C++

@westonpace
Member

Your plan is good. It seems we are doing way too much testing with in-memory tables 😰

Since you are actually scanning files, the plan uses a scan node. The scan node is obnoxiously (and incorrectly) appending the augmented fields (__fragment_index, __batch_index, __last_in_fragment, __filename).

This is a bug in Acero's Substrait handling of scans.
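To see why integer zeroes show up on the right-hand side, here is a minimal sketch, assuming the scan node appends the four augmented fields after each side's baseSchema columns and that the join hands the emit all left columns followed by all right columns. Under those assumptions, output_mapping 0 to 3 lands on city, country, __fragment_index and __batch_index instead of the right-hand country and continent. The helpers ScanOutput and ApplyEmit are hypothetical and not part of Acero.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Augmented fields named in the comment above; appending them after the base
// columns is an assumption made for this illustration.
const std::vector<std::string> kAugmentedFields = {
    "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"};

// Hypothetical helper: the columns one scan emits when it appends augmented fields.
std::vector<std::string> ScanOutput(const std::vector<std::string>& base_schema) {
  std::vector<std::string> columns = base_schema;
  columns.insert(columns.end(), kAugmentedFields.begin(), kAugmentedFields.end());
  return columns;
}

// Hypothetical helper: concatenate left and right columns (the join output
// before the emit), then pick columns by position as output_mapping does.
std::vector<std::string> ApplyEmit(const std::vector<std::string>& left,
                                   const std::vector<std::string>& right,
                                   const std::vector<int>& output_mapping) {
  std::vector<std::string> joined = left;
  joined.insert(joined.end(), right.begin(), right.end());
  std::vector<std::string> emitted;
  for (int index : output_mapping) {
    emitted.push_back(joined.at(static_cast<std::size_t>(index)));
  }
  return emitted;
}
```

With plain base schemas on both sides, positions 2 and 3 resolve to the right-hand country and continent, which is what the producer intended.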

@westonpace
Member

westonpace commented Mar 7, 2023

CC @vibhatha if you have time to take a look. For now we should just hide the augmented fields entirely from Substrait. At the moment I'm thinking the simplest way to do this might be to place a project node after any scan node. The project node can then hide these fields.

Then we can wait to fix it properly until we have the new scan node.

I think a more "proper" fix would be to key on the ReadRel's baseSchema property. If a column is named __filename (or similar), then we assume the plan is asking for the augmented field and we deliver it. This means a user can't use a column named __filename, but that seems like a reasonable workaround until we decide to introduce the concept of "augmented fields" to Substrait (I can't imagine this happening anytime soon).
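A minimal sketch of the projection that would sit after the scan node, keyed on the ReadRel baseSchema names as suggested above: it keeps exactly the declared columns, in declared order, so augmented fields disappear unless the plan names one of them (for example __filename). ProjectionIndices is a hypothetical helper, and matching purely by column name is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical helper: given the columns the scan emits and the names declared
// in the ReadRel baseSchema, return the indices a projection should keep, in
// baseSchema order. Declaring "__filename" simply selects the augmented column.
// Returns std::nullopt if a declared name is not produced by the scan.
std::optional<std::vector<int>> ProjectionIndices(
    const std::vector<std::string>& scan_output,
    const std::vector<std::string>& base_schema_names) {
  std::vector<int> indices;
  for (const auto& name : base_schema_names) {
    int found = -1;
    for (std::size_t i = 0; i < scan_output.size(); ++i) {
      if (scan_output[i] == name) {
        found = static_cast<int>(i);
        break;
      }
    }
    if (found < 0) {
      return std::nullopt;
    }
    indices.push_back(found);
  }
  return indices;
}
```

Because the projection is driven entirely by the declared names, the producer and consumer agree on the column count whether or not the scan adds augmented fields.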

@vibhatha
Collaborator

vibhatha commented Mar 8, 2023

take

@vibhatha
Collaborator

vibhatha commented Mar 8, 2023


Sure, @westonpace I will work on this.

@paleolimbot
Member Author

It seems we are doing way too much testing with in-memory tables

The flip side of that is that we really should implement a proper TableProvider instead of writing temporary Parquet files! Glad that it at least exposed this before it was used somewhere else 😬

@EpsilonPrime
Contributor

EpsilonPrime commented May 7, 2024

I've identified a smaller reproduction case for this bug: a single read relation that consumes a local Parquet file. The issue does not occur for named tables.

Here's a test that properly catches the issue:

TEST(Substrait, ExecReadRelWithLocalFiles) {
    ASSERT_OK_AND_ASSIGN(std::string dir_string,
                         arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));

    std::string substrait_json = R"({
        "relations": [
        {
          "root": {
            "input": {
              "read": {
                "common": {
                  "direct": {}
                },
                "baseSchema": {
                  "names": [
                    "f32",
                    "f64"
                  ],
                  "struct": {
                    "types": [
                      {
                        "fp32": {
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      },
                      {
                        "fp64": {
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }
                    ],
                    "nullability": "NULLABILITY_REQUIRED"
                  }
                },
                "localFiles": {
                  "items": [
                    {
                      "uriFile": "file://[DIRECTORY_PLACEHOLDER]/byte_stream_split.zstd.parquet",
                      "parquet": {}
                    }
                  ]
                }
              }
            },
            "names": [
              "f32",
              "f64"
            ]
          }
        }
        ],
        "version": {
        "minorNumber": 42,
        "producer": "my-producer"
        }
    })";
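    // Substitute the real test-data directory into the file URI.
    const std::string placeholder = "[DIRECTORY_PLACEHOLDER]";
    const auto pos = substrait_json.find(placeholder);
    ASSERT_NE(pos, std::string::npos);
    substrait_json.replace(pos, placeholder.size(), dir_string);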

    ASSERT_OK_AND_ASSIGN(auto buf,
                         internal::SubstraitFromJSON("Plan", substrait_json,
                                 /*ignore_unknown_fields=*/false));

    ASSERT_OK_AND_ASSIGN(auto declarations,
                         DeserializePlans(*buf, acero::NullSinkNodeConsumer::Make));
    ASSERT_EQ(declarations.size(), 1);
    acero::Declaration* decl = &declarations[0];
    ASSERT_EQ(decl->factory_name, "consuming_sink");
    ASSERT_OK_AND_ASSIGN(auto plan, acero::ExecPlan::Make());
    ASSERT_OK_AND_ASSIGN(auto sink_node, declarations[0].AddToPlan(plan.get()));
    ASSERT_STREQ(sink_node->kind_name(), "ConsumingSinkNode");
    ASSERT_EQ(sink_node->num_inputs(), 1);
    auto& prev_node = sink_node->inputs()[0];
    ASSERT_STREQ(prev_node->kind_name(), "SourceNode");

    plan->StartProducing();
    ASSERT_FINISHES_OK(plan->finished());
}

@EpsilonPrime
Contributor

It feels to me that augmented fields should never leave the read rel. If the contents of the augmented fields are interesting for the rest of the plan then an expression could be used in the read relation to reference those fields (thus preserving them for future consumption). With that design the processing schema additionally includes the augmented fields but the output schema includes only the normal fields.
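One way to read this proposal, as a sketch: the read relation works with a processing schema (base columns plus augmented fields) but only exposes the base columns, plus any augmented field that an expression inside the read relation explicitly references. ReadRelSchemas and DeriveSchemas are hypothetical names used only for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical pair of schemas for a read relation.
struct ReadRelSchemas {
  std::vector<std::string> processing;  // what the scan works with internally
  std::vector<std::string> output;      // what leaves the read relation
};

// Processing schema = base + all augmented fields. Output schema = base + only
// those augmented fields an expression in the read relation references.
ReadRelSchemas DeriveSchemas(const std::vector<std::string>& base,
                             const std::vector<std::string>& augmented,
                             const std::vector<std::string>& referenced_augmented) {
  ReadRelSchemas schemas;
  schemas.processing = base;
  schemas.processing.insert(schemas.processing.end(), augmented.begin(), augmented.end());
  schemas.output = base;
  for (const auto& name : augmented) {
    if (std::find(referenced_augmented.begin(), referenced_augmented.end(), name) !=
        referenced_augmented.end()) {
      schemas.output.push_back(name);
    }
  }
  return schemas;
}
```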

@westonpace
Member


I'm not entirely sure I follow. What is the "processing schema"? Mentally, I think of the augmented fields as fields that are always present in every file. In other words, if you create a file with one column X, then you have 5 fields (X plus the four augmented fields). The base schema of the file has 5 fields, and you can pick which ones to include in the projection.

I think the issue at hand here might be better described as the fact that the producer and consumer disagree about what fields are present in the file.

zeroshade pushed a commit that referenced this issue May 14, 2024
### Rationale for this change

Augmented fields interfere with the schema passing between nodes. When enabled, they cause a mismatch between the names and the schema at the end of the plan.

### What changes are included in this PR?

Adds an option to disable augmented fields (defaulting to adding them), connects it everywhere it is called, and disables it in ReadRel conversion.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

There are no API-related changes; however, this will allow Substrait plans that consume local files to work without requiring a project/emit relation after the read relation to remove the unexpected fields.

* GitHub Issue: #34484

Authored-by: David Sisson <EpsilonPrime@users.noreply.github.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
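A minimal sketch of the behavior this change describes, assuming an option that defaults to adding augmented fields and that the ReadRel conversion turns off. The option name add_augmented_fields and the helpers below are hypothetical stand-ins that may not match Arrow's actual API; RootNamesMatch mirrors the names/schema mismatch from the rationale by comparing the RelRoot names with the scan's output columns.

```cpp
#include <cassert>
#include <initializer_list>
#include <string>
#include <vector>

// Hypothetical option struct; the real option name in Arrow may differ.
struct ScanSketchOptions {
  bool add_augmented_fields = true;  // default keeps the previous behavior
};

// Columns a scan would emit for a given base schema under these options.
std::vector<std::string> ScanOutputFields(const std::vector<std::string>& base,
                                          const ScanSketchOptions& options) {
  std::vector<std::string> fields = base;
  if (options.add_augmented_fields) {
    for (const char* name : {"__fragment_index", "__batch_index",
                             "__last_in_fragment", "__filename"}) {
      fields.emplace_back(name);
    }
  }
  return fields;
}

// The RelRoot lists one name per output column, so a count mismatch is the
// "names/schema mismatching at the end of the plan" described above.
bool RootNamesMatch(const std::vector<std::string>& root_names,
                    const std::vector<std::string>& fields) {
  return root_names.size() == fields.size();
}
```

For the f32/f64 test plan above, the default options produce six columns for two root names, while disabling the option during ReadRel conversion lines them up again.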
@zeroshade
Member

Issue resolved by pull request 41583
#41583

@zeroshade zeroshade added this to the 17.0.0 milestone May 14, 2024
@EpsilonPrime
Contributor

Here's how I would implement augmented fields in Substrait:

A consumer can define some number of extra fields that aren't strictly part of the data. These hidden fields are known as "augmented" fields. To access them, a Substrait plan can use a new FieldReference reference_type, augmented_reference. Since there is no enum or registry for augmented types, they are accessed by name.

    oneof reference_type {
      ReferenceSegment direct_reference = 1;
      MaskExpression masked_reference = 2;
      AugmentedReference augmented_reference = 3;
    }

    message AugmentedReference {
      string reference = 1;
    }

Augmented fields are only valid in the read relation and in the short chain of non-join relations that follow it (including FilterRel and, most importantly, ProjectRel), unless we want to define a new "steps out" option. If these fields need to be accessed after that point, a ProjectRel should expose them as normal fields so they are preserved for the rest of the computation.

A consumer should reject plans that ask for augmented fields that are not supported.
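A sketch of how a consumer might handle references under this proposal, assuming a reference is either a direct field index or an augmented field looked up by name: an unsupported augmented name resolves to nothing so the plan can be rejected, and augmented references are only allowed when no join sits between the read relation and the referencing relation. All types and functions here (DirectReference, AugmentedReference, Resolve, AugmentedReferenceAllowed) are hypothetical C++ stand-ins for the proposed protobuf messages.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <set>
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-ins for direct_reference and the proposed augmented_reference.
struct DirectReference {
  int field;
};
struct AugmentedReference {
  std::string reference;  // augmented fields are looked up by name
};
using FieldReference = std::variant<DirectReference, AugmentedReference>;

struct ResolvedField {
  std::string name;
  bool augmented;
};

// Resolve a reference against a read relation's base schema. Returns std::nullopt
// for an out-of-range index or an augmented field this consumer does not
// support, so the caller can reject the plan.
std::optional<ResolvedField> Resolve(const FieldReference& ref,
                                     const std::vector<std::string>& base_schema,
                                     const std::set<std::string>& supported_augmented) {
  if (const auto* direct = std::get_if<DirectReference>(&ref)) {
    if (direct->field < 0 ||
        static_cast<std::size_t>(direct->field) >= base_schema.size()) {
      return std::nullopt;
    }
    return ResolvedField{base_schema[static_cast<std::size_t>(direct->field)], false};
  }
  const auto& augmented = std::get<AugmentedReference>(ref);
  if (supported_augmented.count(augmented.reference) == 0) {
    return std::nullopt;
  }
  return ResolvedField{augmented.reference, true};
}

enum class RelKind { kRead, kFilter, kProject, kJoin };

// Augmented references are only valid in the read relation and the chain of
// non-join relations above it; path lists relations from the read upward.
bool AugmentedReferenceAllowed(const std::vector<RelKind>& path) {
  for (RelKind kind : path) {
    if (kind == RelKind::kJoin) {
      return false;
    }
  }
  return true;
}
```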

vibhatha pushed a commit to vibhatha/arrow that referenced this issue May 25, 2024
…apache#41583)

JerAguilon pushed a commit to JerAguilon/arrow that referenced this issue May 29, 2024
…apache#41583)
