[WIP] [PROOF OF CONCEPT] [SPARK] [SQL] Collation Mode #46917
GideonPotok wants to merge 10 commits into apache:master from
latest review
added checkinputdatatype to not support complex types containing nonbinary collations
added checkinputdatatype to not support complex types containing nonbinary collations
added struct test stuff
Tests pass
test structs
fix scalastyle
Collation Support for Mode
…essions/aggregate/Mode.scala Co-authored-by: Uros Bojanic <157381213+uros-db@users.noreply.github.com>
@dbatomic so do you think we should proceed with this approach?
aeb7ac0 to a52a5e4
    )
      extends OpenHashMap[K, V](initialCapacity) {
      override def getOpenHashSet: OpenHashSet[K] =
        new CollationAwareOpenHashSet[K, X](initialCapacity, 0.7, hashering, equalsFunction)
what's 0.7 here? since I see it in multiple lines, we should consider separating it out into one place, like a constant
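One way to act on this, assuming the `0.7` is the hash set's load factor, is to name it once and reuse it. The sketch below is self-contained and uses hypothetical names (`LoadFactorSketch`, `DefaultLoadFactor`, `SketchHashSet`); it is not the PR's code.

```scala
object LoadFactorSketch {
  // Assumption: 0.7 is the load factor, i.e. the fraction of slots
  // that may fill up before the table grows.
  val DefaultLoadFactor: Double = 0.7

  // Hypothetical stand-in for a hash set that takes a load factor.
  class SketchHashSet[K](val initialCapacity: Int, val loadFactor: Double) {
    require(loadFactor > 0.0 && loadFactor < 1.0, "load factor must be in (0, 1)")

    // Number of entries at which a resize would be triggered.
    def growThreshold: Int = (initialCapacity * loadFactor).toInt
  }

  // Every call site goes through this factory instead of repeating the literal.
  def newSet[K](initialCapacity: Int): SketchHashSet[K] =
    new SketchHashSet[K](initialCapacity, DefaultLoadFactor)
}
```

With a single named constant, a later change to the value touches one line rather than every constructor call.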
    case ClassTag.Long => _data(pos) equals k
    case ClassTag.Int => _data(pos) equals k
    case ClassTag.Double => _data(pos) equals k
    case ClassTag.Float => _data(pos) equals k
ClassTag.Long | ClassTag.Int | ClassTag.Double | ClassTag.Float
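The suggestion amounts to a single `case` with pattern alternatives. A minimal, self-contained sketch of that shape follows; the helper name `isSpecializedPrimitive` is hypothetical and only illustrates the matching, not the PR's `_data(pos) equals k` logic.

```scala
import scala.reflect.ClassTag

object ClassTagMatchSketch {
  // The four primitive tags share one branch through pattern alternatives,
  // instead of four cases with identical bodies.
  def isSpecializedPrimitive(tag: ClassTag[_]): Boolean = tag match {
    case ClassTag.Long | ClassTag.Int | ClassTag.Double | ClassTag.Float => true
    case _ => false
  }
}
```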
    case ClassTag.Int => _data(pos) equals k
    case ClassTag.Double => _data(pos) equals k
    case ClassTag.Float => _data(pos) equals k
    case _ => nonClassTagKeyExistsAtPos(k, _data(pos))
wait, since this is specialized(Long, Int, Double, Float), what else could be here in case _?
@uros-db the `@specialized` annotation is a performance optimization: the compiler generates extra versions of the generic class for the listed primitive types, which avoids boxing for those types. The type parameter can still be instantiated with any other type; those cases go through the generic version, where boxing and unboxing will occur.
Source: https://www.scala-lang.org/api/current/scala/specialized.html
https://www.waitingforcode.com/scala-types/type-specialization-scala/read#:~:text=Scala%20uses%20the%20%40specialized%20class,classes%20for%20the%20specific%20types.
https://www.baeldung.com/scala/specialized-annotation
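To make the point concrete, here is a small sketch with a hypothetical `Box` class (not part of the PR). It is specialized for the same four primitives, yet it can still be instantiated with `String`, which is why a `case _` branch remains reachable.

```scala
object SpecializedSketch {
  // Hypothetical container. On Scala 2 the compiler generates extra variants
  // for the four listed primitives; any other T uses the generic class.
  class Box[@specialized(Long, Int, Double, Float) T](val value: T) {
    def sameAs(other: T): Boolean = value == other
  }
}
```

`new SpecializedSketch.Box[Int](1)` and `new SpecializedSketch.Box[String]("a")` both compile; only the first benefits from specialization.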
    @annotation.nowarn("cat=other-non-cooperative-equals")
    protected def nonClassTagKeyExistsAtPos(k: T, dataAtPos: T): Boolean = {
      dataAtPos equals k
    }
related to the comment above, could you explain what this does?
@uros-db please see comment above and let me know if we are on the same page
are these results updated?
@uros-db the benchmark results are now updated (jdk17 only at the moment). Relative to each other, the collations look good. This approach is about as performant as the other one.
@uros-db @dbatomic Which approach should we go with?
This PR:
    OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
    AMD EPYC 7763 64-Core Processor
    collation e2e benchmarks - mode - 10000 elements:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
    ----------------------------------------------------------------------------------------------------------------------------
    mode df column with collation - UTF8_BINARY_LCASE              58            69          7        0.2       5757.5      1.0X
    mode df column with collation - UNICODE                        52            58          5        0.2       5233.2      1.1X
    mode df column with collation - UTF8_BINARY                    45            50          5        0.2       4462.9      1.3X
    mode df column with collation - UNICODE_CI                     46            50          5        0.2       4570.9      1.3X
The original approach ([GroupMapReduce](https://github.com//pull/46597)):
    OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
    AMD EPYC 7763 64-Core Processor
    collation e2e benchmarks - mode - 10000 elements:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
    ----------------------------------------------------------------------------------------------------------------------------
    mode df column with collation - UTF8_BINARY_LCASE              56            68          7        0.2       5571.2      1.0X
    mode df column with collation - UNICODE                        47            52          5        0.2       4659.6      1.2X
    mode df column with collation - UTF8_BINARY                    44            48          3        0.2       4423.5      1.3X
    mode df column with collation - UNICODE_CI                     43            47          4        0.2       4316.9      1.3X
if we want to proceed with this approach, I will get it out of "prototype" mode over the weekend! let me know.
so let's recap the current approach (OHM, i.e. OpenHashMap):
- slightly less performant than GroupMapReduce
- considerably more complicated implementation
with all this in mind, I suggest we close this discussion and choose GroupMapReduce
@dbatomic final opinion?
Ack. Thanks for this investigation, and sorry for the delayed response. Let's continue with the GroupMapReduce implementation.
@dbatomic sounds good. My pleasure and no worries.
    inputAggBufferOffset: Int = 0)
      extends TypedAggregateWithHashMapAsBuffer
looks like an unnecessary change, but I suppose it's a prototyping leftover, so I won't comment on these further for now
    object CollationAwareFunctionRegistry {
      def bytesToHashFunction(dataType: DataType): AnyRef => Long = {
        val hashFunction = dataType match {
          case s: StringType => a: AnyRef => CollationFactory.fetchCollation(s.collationId)
            .hashFunction.applyAsLong(a.asInstanceOf[UTF8String])
          case nb: StructType if !UnsafeRowUtils.isBinaryStable(nb) => a: AnyRef =>
            a.asInstanceOf[InternalRow].toSeq(nb).zip(nb.fields.toSeq).foldLeft(0L)((acc, b)
            => acc ^ bytesToHashFunction(b._2.dataType)(b._1.asInstanceOf[AnyRef]))
          case _ => a: AnyRef => a.hashCode().toLong
        }
        hashFunction
      }
      def bytesToEqualFunction(dataType: DataType): (AnyRef, AnyRef) => Boolean = {
        val equalFunction = dataType match {
          case s: StringType => (a: AnyRef, b: AnyRef) =>
            a.asInstanceOf[UTF8String].semanticEquals(b.asInstanceOf[UTF8String], s.collationId)
          case nb: StructType if !UnsafeRowUtils.isBinaryStable(nb) => (a: AnyRef, b: AnyRef) =>
            a.asInstanceOf[InternalRow].toSeq(nb)
              .zip(b.asInstanceOf[InternalRow].toSeq(nb)).zipWithIndex
              .forall { case ((a, b), i) =>
                bytesToEqualFunction(
                  nb.fields(i).dataType
                )(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef])
              }
          case _ => (a: AnyRef, b: AnyRef) =>
            a.equals(b)
        }
        equalFunction
      }
    }
these look like general collation-related utility methods; perhaps we should place them elsewhere (for example, in CollationFactory)
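For readers without the Spark internals at hand, the idea behind the two functions can be sketched in plain Scala. Everything here is an assumption made for illustration: lowercasing stands in for a case-insensitive collation, a `Seq` stands in for a struct row, and the names (`CollationFunctionsSketch`, `hashOf`, `equalOf`) are hypothetical. The property worth noticing is that values considered equal also hash equally.

```scala
import java.util.Locale

object CollationFunctionsSketch {
  // Stand-in for a collation key: lowercasing approximates a
  // case-insensitive collation. Not Spark's CollationFactory.
  private def key(s: String): String = s.toLowerCase(Locale.ROOT)

  // Hash is derived from the key, so equal-under-collation strings collide on purpose.
  def hashOf(value: Any): Long = value match {
    case s: String => key(s).hashCode.toLong
    // Nested values: XOR the per-field hashes, mirroring the fold in the snippet above.
    case fields: Seq[_] => fields.foldLeft(0L)((acc, f) => acc ^ hashOf(f))
    case other => other.hashCode.toLong
  }

  // Equality compares keys for strings and recurses field by field for nested values.
  def equalOf(a: Any, b: Any): Boolean = (a, b) match {
    case (x: String, y: String) => key(x) == key(y)
    case (xs: Seq[_], ys: Seq[_]) =>
      xs.length == ys.length && xs.zip(ys).forall { case (x, y) => equalOf(x, y) }
    case _ => a == b
  }
}
```

Because XOR is symmetric, this way of combining field hashes does not depend on field order; that only costs extra collisions, since equality still compares fields position by position.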
### What changes were proposed in this pull request?

[SPARK-47353](https://issues.apache.org/jira/browse/SPARK-47353)

#### Pull requests

[Scala TreeMap (RB Tree)](#46404)
[GroupMapReduce](#46526) <- Most performant
[GroupMapReduce (Cleaned up) (This PR)](#46597) <- Most performant
[Comparing Experimental Approaches](#46488)

https://github.com/apache/spark/pull/46597/files#r1626058908 -> #46917 (comment)

#### Central Change to Mode `eval` Algorithm:

- Update to `eval` Method: The `eval` method now checks if the column being looked at is string with non-default collation and if so, uses a grouping

```
buff.toSeq.groupMapReduce {
  case (key: String, _) =>
    CollationFactory.getCollationKey(UTF8String.fromString(key), collationId)
  case (key: UTF8String, _) =>
    CollationFactory.getCollationKey(key, collationId)
  case (key, _) => key
}(x => x)((x, y) => (x._1, x._2 + y._2)).values
```

#### Minor Change to Mode:

- Introduction of `collationId`: A new lazy value `collationId` is computed from the `dataType` of the `child` expression, used to fetch the appropriate collation comparator when `collationEnabled` is true.

This PR will fail for complex types containing collated strings
Follow up PR will implement that

#### Unit Test Enhancements:

Significant additions to `CollationStringExpressionsSuite` to test new functionality including:

- Tests for the `Mode` function when handling strings with different collation settings.

#### Benchmark Updates:

- Enhanced the `CollationBenchmark` classes to include benchmarks for the new mode functionality with and without collation settings, as well as numerical types.

### Why are the changes needed?

1. Ensures consistency in handling string comparisons under various collation settings.
2. Improves global usability by enabling compatibility with different collation standards.

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces the following user-facing changes:

1. Adds a new `collationEnabled` property to the `Mode` expression.
2. Users can now specify collation settings for the `Mode` expression to customize its behavior.

### How was this patch tested?

This patch was tested through a combination of new and existing unit and end-to-end SQL tests.

1. **Unit Tests:**
   - **CollationStringExpressionsSuite:**
     - Make the newly added tests more in the same design pattern as the existing tests
     - Added multiple test cases to verify that the `Mode` function correctly handles strings with different collation settings.

   Out of scope: Special Unicode Cases higher planes
   Tests do not need to include Null Handling.

3. **Benchmark Tests:**

4. **Manual Testing:**

```
./build/mvn -DskipTests clean package
export SPARK_HOME=/Users/gideon/repos/spark
$SPARK_HOME/bin/spark-shell
spark.sqlContext.setConf("spark.sql.collation.enabled", "true")
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions
import spark.implicits._
val data = Seq(("Def"), ("def"), ("DEF"), ("abc"), ("abc"))
val df = data.toDF("word")
val dfLC = df.withColumn("word", col("word").cast(StringType("UTF8_BINARY_LCASE")))
val dfLCA = dfLC.agg(org.apache.spark.sql.functions.mode(functions.col("word")).as("count"))
dfLCA.show()
/*
BEFORE:
-----+
|count|
+-----+
|  abc|
+-----+
AFTER:
+-----+
|count|
+-----+
|  Def|
+-----+
*/
```

6. **Continuous Integration (CI):**
   - The patch passed all relevant Continuous Integration (CI) checks, including:
     - Unit test suite
     - Benchmark suite
   - Consider moving the new benchmark to the catalyst module

### Was this patch authored or co-authored using generative AI tooling?

Nope!

Closes #46597 from GideonPotok/spark_47353_3_clean.

Lead-authored-by: GideonPotok <g.potok4@gmail.com>
Co-authored-by: Gideon Potok <31429832+GideonPotok@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
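The grouping step in the description can be tried without a Spark build. The sketch below is an approximation under stated assumptions: lowercasing stands in for `CollationFactory.getCollationKey` with a case-insensitive collation, a plain `Map[String, Long]` stands in for the aggregation buffer, and `ModeSketch` and `mode` are hypothetical names. It needs Scala 2.13 or later for `groupMapReduce`.

```scala
import java.util.Locale

object ModeSketch {
  // Stand-in for a collation key under a case-insensitive collation (assumption).
  private def collationKey(s: String): String = s.toLowerCase(Locale.ROOT)

  // buffer maps each distinct raw value to its count.
  def mode(buffer: Map[String, Long]): Option[(String, Long)] = {
    val merged = buffer.toSeq
      .groupMapReduce { case (k, _) => collationKey(k) }(x => x) {
        // Keep one representative per group and sum the counts.
        (x, y) => (x._1, x._2 + y._2)
      }
      .values
    // The mode is the group with the largest summed count.
    if (merged.isEmpty) None else Some(merged.maxBy(_._2))
  }
}
```

For the manual-test data (`Def`, `def`, `DEF` once each and `abc` twice), the three case variants merge into one group with count 3, which beats `abc` with count 2. Which of the three spellings is returned as the representative depends on the iteration order of the buffer.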