
Customize field separators #7252

Open
rui-mo opened this issue Oct 26, 2023 · 17 comments
Labels
enhancement New feature or request

Comments

@rui-mo (Collaborator) commented Oct 26, 2023

Description

Previously, field separators were added as a parameter of Subfield in #6014. The only usage of Subfield in Gluten is to create SubfieldFilters, as in SubstraitToVeloxPlan.cpp#L718-L726. We found it is not enough to simply customize the separators there to disable tokenizing field names by dot.
There are many usages of Subfield in Velox scan, including HiveDataSource.cpp#L762, TableHandle.cpp#L193-L194, and ScanSpec.cpp#L422, etc. We cannot control them, so the default separators are still used.
To make it work, we currently need to modify the default separators in a hard-coded way, changing the dot to '\0'. Do you think it is possible to add a flag to Velox flags, so people can change the default separators?
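
For reference, this is roughly what Gluten can already do at its single call site via #6014. The sketch below assumes the Separators struct exposes a public dot member and that Subfield lives in velox/type/Subfield.h; the member and header names are assumptions, not verified against the current code:

#include <memory>
#include <string>
#include "velox/type/Subfield.h"

using facebook::velox::common::Separators;
using facebook::velox::common::Subfield;

// Build a Subfield that keeps "a.b" as a single field by disabling the dot
// separator. This only helps where Gluten constructs the Subfield itself;
// Subfields created inside Velox scan still use the default separators.
Subfield makeSparkSubfield(const std::string& path) {
  auto separators = std::make_shared<Separators>();
  separators->dot = '\0'; // assumption: 'dot' is a public member
  return Subfield(path, separators);
}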

rui-mo added the enhancement label on Oct 26, 2023
@rui-mo (Collaborator, Author) commented Oct 26, 2023

@Yuhta Could you please take a look at this issue? Thanks.

@Yuhta (Contributor) commented Oct 26, 2023

I think it can be a part of the connector config or query config. Is it different from query to query, or does it depend on the connector?

@rui-mo (Collaborator, Author) commented Oct 27, 2023

In Spark, it does not vary from query to query. Field names containing dots are allowed:
apache/spark@79826f3

@Yuhta (Contributor) commented Oct 27, 2023

I see, so it is specific to certain files. I would put it as a connector config and set it in all connectors that read these files.
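
A hedged sketch of what such a connector-level knob could look like. The property name is hypothetical (not an existing Velox config key), and the plain string map below stands in for the real connector config object:

#include <string>
#include <unordered_map>

// Hypothetical connector property naming the characters treated as subfield
// separators; a Spark-oriented deployment would set it to a value without '.'.
static const std::string kSubfieldSeparators = "hive.subfield-separators";

std::string subfieldSeparators(
    const std::unordered_map<std::string, std::string>& connectorConfig) {
  auto it = connectorConfig.find(kSubfieldSeparators);
  // Illustrative default that mirrors today's behavior of splitting on '.'.
  return it == connectorConfig.end() ? "." : it->second;
}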

@Yuhta (Contributor) commented Oct 27, 2023

When you generate subfield prunings, you will need to use the customized separator, though.

@rui-mo (Collaborator, Author) commented Oct 31, 2023

@Yuhta Thank you. I will check how to add this config.

@zhli1142015 (Contributor) commented

Hello @rui-mo,
We encountered the same issue. Is there any update on this item?
Thanks.

@rui-mo (Collaborator, Author) commented Nov 7, 2023

@zhli1142015 No update from my side recently. I plan to try a config as suggested when I get free time.
Please feel free to contribute if you want, thanks.

@zhli1142015 (Contributor) commented

Sure, let me work on this.

@zhli1142015 (Contributor) commented Nov 8, 2023

Hello @rui-mo and @Yuhta,
I did some investigation into this case; it looks like the dot is not the only difference. From the sample below, we can observe that '.', '', '*', '^', '[', ']', '"' are not separators for Spark. I think we may need to provide a custom Tokenizer for Spark:
Add static std::unique_ptr<Tokenizer> getTokenizer(std::string path); and static void registerTokenizerFactory(std::function<std::unique_ptr<Tokenizer>(std::string)> tokenizerFactory); methods to allow Gluten to plug a Spark-specific custom Tokenizer into Velox (a rough sketch follows below).
Do you think this is OK?
Thanks.
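
For concreteness, a rough sketch of the registration API proposed above; the Factory alias and the way the registered factory is stored are assumptions, not existing Velox code:

#include <functional>
#include <memory>
#include <string>

namespace facebook::velox::common {

class Tokenizer {
 public:
  virtual ~Tokenizer() = default;
  // ... existing hasNext()/next() interface elided ...

  using Factory = std::function<std::unique_ptr<Tokenizer>(const std::string&)>;

  // Called once by the engine (e.g. Gluten during CPP initialization) to
  // install a custom tokenizer such as a SparkTokenizer.
  static void registerTokenizerFactory(Factory factory) {
    factoryRef() = std::move(factory);
  }

  // Used by Velox call sites (HiveDataSource, ScanSpec, ...) instead of
  // constructing the default Presto-style tokenizer directly.
  static std::unique_ptr<Tokenizer> getTokenizer(const std::string& path) {
    return factoryRef()(path);
  }

 private:
  static Factory& factoryRef() {
    // Assumed to be pre-populated with a factory for the default tokenizer.
    static Factory factory;
    return factory;
  }
};

} // namespace facebook::velox::common
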
Sample:

spark.read.json(
"""
          {"a": {"b''1*^[]\\#\"": 1, "c.e": 'random'}, "d": 1}
          {"a": {"b''1*^[]\\#\"": 3, "c.e": 'string'}, "d": 2}"""
        .split("\n").toSeq.toDS()).write.format("parquet").saveAsTable("`target23`")
val df = spark.sql("select * from `target23`")
df.show(false)
df.explain(true)

Spark output:

+-----------+---+
|a          |d  |
+-----------+---+
|{1, random}|1  |
|{3, string}|2  |
+-----------+---+

== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation [target23], [], false

== Analyzed Logical Plan ==
a: struct<b''1*^[]\#":bigint,c.e:string>, d: bigint
Project [a#122, d#123L]
+- SubqueryAlias spark_catalog.default.target23
   +- Relation default.target23[a#122,d#123L] parquet

== Optimized Logical Plan ==
Relation default.target23[a#122,d#123L] parquet

== Physical Plan ==
FileScan parquet default.target23[a#122,d#123L] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[abfss://tpcds@veloxscan.dfs.core.windows.net/synapse/workspaces/veloxs..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:struct<**b''1*^[]\#"**:bigint,c.e:string>,d:bigint>

df: org.apache.spark.sql.DataFrame = [a: struct<**b''1*^[]\#"**: bigint, c.e: string>, d: bigint]

Gluten-Velox output:

Plan Node: 
-- TableScan[table: hive_table] -> n0_0:ROW<"b''1*^[]\#""":BIGINT,"c.e":VARCHAR>, n0_1:BIGINT

java.lang.RuntimeException: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: UNREACHABLE_CODE
Retriable: False
Function: computeNext
File: /home/velox/velox/type/Tokenizer.cpp
Line: 81
Stack trace:
# 0  _ZN8facebook5velox7process10StackTraceC1Ei
# 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorENS1_22CompileTimeEmptyStringEEEvRKNS1_18VeloxCheckFailArgsET0_
# 3  _ZN8facebook5velox6common9Tokenizer11computeNextEv
# 4  _ZN8facebook5velox6common9Tokenizer16tryToComputeNextEv
# 5  _ZN8facebook5velox6common8SubfieldC2ERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEb
# 6  _ZN8facebook5velox6common8ScanSpec8addFieldERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEj
# 7  _ZN8facebook5velox6common8ScanSpec19addFieldRecursivelyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS0_4TypeEj
# 8  _ZN8facebook5velox6common8ScanSpec17addAllChildFieldsERKNS0_4TypeE
# 9  _ZN8facebook5velox6common8ScanSpec19addFieldRecursivelyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS0_4TypeEj
# 10 _ZN8facebook5velox9connector4hive14HiveDataSource12makeScanSpecERKSt13unordered_mapINS0_6common8SubfieldESt10unique_ptrINS5_6FilterESt14default_deleteIS8_EESt4hashIS6_ESt8equal_toIS6_ESaISt4pairIKS6_SB_EEERKSt10shared_ptrIKNS0_7RowTypeEERKSt6vectorIPKNS2_16HiveColumnHandleESaISW_EERKST_IS6_SaIS6_EEPNS0_6memory10MemoryPoolE
# 11 _ZN8facebook5velox9connector4hive14HiveDataSourceC2ERKSt10shared_ptrIKNS0_7RowTypeEERKS4_INS1_20ConnectorTableHandleEERKSt13unordered_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES4_INS1_12ColumnHandleEESt4hashISK_ESt8equal_toISK_ESaISt4pairIKSK_SM_EEEPNS0_13CachedFactoryISK_S4_INS0_10FileHandleEENS0_19FileHandleGeneratorEEEPNS0_6memory10MemoryPoolEPNS0_4core19ExpressionEvaluatorEPNS14_15MemoryAllocatorERSS_PN5folly8ExecutorEPKNS0_6ConfigE
# 12 _ZN8facebook5velox9connector4hive13HiveConnector16createDataSourceERKSt10shared_ptrIKNS0_7RowTypeEERKS4_INS1_20ConnectorTableHandleEERKSt13unordered_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES4_INS1_12ColumnHandleEESt4hashISK_ESt8equal_toISK_ESaISt4pairIKSK_SM_EEEPNS1_17ConnectorQueryCtxE
# 13 _ZN8facebook5velox4exec9TableScan9getOutputEv
# 14 _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE
# 15 _ZN8facebook5velox4exec6Driver4nextERSt10shared_ptrINS1_13BlockingStateEE
# 16 _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE
# 17 _ZN6gluten24WholeStageResultIterator4nextEv
# 18 Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeHasNext
# 19 0x00007ff8d21c8607

	at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
	at io.glutenproject.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:61)
	at io.glutenproject.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:32)
	at io.glutenproject.backendsapi.velox.IteratorHandler$$anon$1.hasNext(IteratorHandler.scala:174)
	at io.glutenproject.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:40)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at org.apache.spark.InterruptibleIterator.isEmpty(InterruptibleIterator.scala:28)
	at io.glutenproject.execution.VeloxColumnarToRowExec$.toRowIterator(VeloxColumnarToRowExec.scala:95)
	at io.glutenproject.execution.VeloxColumnarToRowExec.$anonfun$doExecuteInternal$1(VeloxColumnarToRowExec.scala:79)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:862)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:862)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

@rui-mo (Collaborator, Author) commented Nov 9, 2023

@zhli1142015 Thank you. I agree we need to allow a customized tokenizer in Velox, though. One question: with a SparkTokenizer, how can we decide which one to use in the code? Or should the default one be changed accordingly?

@zhli1142015 (Contributor) commented Nov 9, 2023

We should only use the SparkTokenizer when running Spark queries, and it should be registered during Gluten CPP initialization. I don't think we need to change it dynamically.
Thanks.

@Yuhta (Contributor) commented Nov 20, 2023

@zhli1142015 How hard is it to make the current Tokenizer work with Spark configuration?

@zhli1142015 (Contributor) commented Nov 21, 2023

Hello @Yuhta,
I think for Spark the Tokenizer is not needed, or it should be like the one I added for the UT (below), in which path_ is returned directly.

  // Spark-style next(): return the remaining path as a single NestedField
  // instead of splitting the path on separators.
  std::unique_ptr<Subfield::PathElement> next() override {
    if (!hasNext()) {
      VELOX_USER_FAIL("No more tokens");
    }
    state = State::kDone;
    return std::make_unique<Subfield::NestedField>(path_);
  }

If we want to make the current Tokenizer work for Spark, we need to pass down some config to tell it to work in Spark mode, and we also need to add some if-else logic to distinguish the different behaviors for all the separators defined in Tokenizer. I think a pluggable tokenizer may be better, as different languages may require tokenizers with different semantics. Please let me know if you have any comments on this.
Thanks.

@Yuhta (Contributor) commented Nov 21, 2023

@zhli1142015 The tokenizer is needed to pass down subfield pruning information, so Spark would still need it if it wants to do subfield pruning. We need to find out some character values that can be used as field separators in Spark to do this, and fix the tokenizer to work with these customized separators.

I also see that we can take Subfields directly instead of a string in Velox. Maybe Spark should not use the Subfield(const std::string&, const std::shared_ptr<Separators>&) constructor (which is very Presto-specific), but construct the Subfield directly from std::vector<std::unique_ptr<PathElement>>&&. How feasible is that? @rui-mo
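
A rough sketch of constructing a Subfield directly from path elements, as suggested above, so that no Tokenizer is involved. The PathElement-based constructor and the NestedField class are the ones referenced in this thread; the header path is an assumption:

#include <memory>
#include <string>
#include <vector>
#include "velox/type/Subfield.h"

using facebook::velox::common::Subfield;

// The whole Spark column name, dots and all, becomes a single path element,
// so nothing is tokenized on '.' or other Presto-style separators.
Subfield subfieldForSparkColumn(const std::string& columnName) {
  std::vector<std::unique_ptr<Subfield::PathElement>> path;
  path.push_back(std::make_unique<Subfield::NestedField>(columnName));
  return Subfield(std::move(path));
}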

@yingsu00 (Collaborator) commented Aug 21, 2024

@rui-mo @zhli1142015 I have recently been working on similar issues and encountered this as well in Prestissimo (#10785). In my understanding, whether to allow dot (.) or other special chars to be part of a column name is not a Spark vs. Presto thing. How column names can be defined is determined by the table format specification. For Hive, all Unicode chars except dot (.) and colon (:) should be allowed, as specified in https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL:

Table names and column names are case insensitive but SerDe and property names are case sensitive.

  • In Hive 0.12 and earlier, only alphanumeric and underscore characters are allowed in table and column names.
  • In Hive 0.13 and later, column names can contain any Unicode character (see HIVE-6013), however, dot (.) and colon (:) yield errors on querying, so they are disallowed in Hive 1.2.0 (see HIVE-10120). Any column name that is specified within backticks (`) is treated literally. Within a backtick string, use double backticks (``) to represent a backtick character. Backtick quotation also enables the use of reserved keywords for table and column identifiers.
  • To revert to pre-0.13.0 behavior and restrict column names to alphanumeric and underscore characters, set the configuration property hive.support.quoted.identifiers to none. In this configuration, backticked names are interpreted as regular expressions. For details, see Supporting Quoted Identifiers in Column Names.

According to https://issues.apache.org/jira/browse/HIVE-10120, a column name like emp.no is not allowed, even if it is backtick-quoted:

CREATE TABLE a (`emp.no` string);
select `emp.no` from a; fails with an error.

Iceberg normalizes special chars into other ASCII chars (see apache/iceberg#10120). For example, an Iceberg table with a TEST:A1B2.RAW.ABC-GG-1-A column is transformed into TEST_x3AA1B2_x2ERAW_x2EABC_x2DGG_x2D1_x2DA. This behavior is different from Hive, so Iceberg should use a specialized Tokenizer.

So it seems to me that:

  1. If the table is a Hive table, having a dot in the column name should yield a user error saying dot is not allowed.
  2. We should create different Tokenizers, like HiveTokenizer and IcebergTokenizer, and implement them according to the spec. For Hive, both behaviours should be supported: pre-0.13 (only alphanumeric and underscore characters) and 0.13 and later (all Unicode chars, with backtick quotation). A column name that is specified within backticks (`) is treated literally.
  3. Introduce hive.support.quoted.identifiers (https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.support.quoted.identifiers) for Hive.

If it is really impossible to keep users from using dots in column names, maybe we can force them to quote such names with backticks, while keeping the plain dot as the delimiter. E.g., task.name is a struct-type column:

select * from t where `task.name`.`suffix` = '.finished' and `task.name`.`prefix` = 'high_priority';

This does not conform to the Hive spec 100%, but it is worth discussing.
What do you think? Is this reasonable?

@rui-mo (Collaborator, Author) commented Aug 22, 2024

If the table is a Hive table, having a dot in the column name should yield a user error saying dot is not allowed.

@yingsu00 Thanks for sharing this knowledge. Spark allows dots in column names, but as #10693 does, the child scan creation will not use the Tokenizer, and Gluten can pass a Subfield created from path elements to Velox so as to avoid the use of the Tokenizer.
