
[SPARK-48359][SQL] Built-in functions for Zstd compression and decompression #46672

Open · xi-db wants to merge 4 commits into master
Conversation

@xi-db (Contributor) commented on May 20, 2024

What changes were proposed in this pull request?

Some users rely on UDFs for Zstd compression and decompression, which performs poorly. Providing native built-in functions improves performance by running the compression and decompression entirely within the JVM.

This PR introduces three new built-in functions:

zstd_compress(input: binary [, level: int [, streaming_mode: bool]])

zstd_decompress(input: binary)

try_zstd_decompress(input: binary)

where

  • input: The binary value to compress or decompress.
  • level: Optional integer argument representing the compression level, which controls the trade-off between compression speed and compression ratio. The default level is 3; valid values are 1 through 22, inclusive.
  • streaming_mode: Optional boolean argument indicating whether to compress with Zstd's streaming API rather than the single-shot API (illustrated in the sketch after the examples below).

Examples:

> SELECT base64(zstd_compress(repeat("Apache Spark ", 10)));
  KLUv/SCCpQAAaEFwYWNoZSBTcGFyayABABLS+QU=
> SELECT base64(zstd_compress(repeat("Apache Spark ", 10), 3, true));
  KLUv/QBYpAAAaEFwYWNoZSBTcGFyayABABLS+QUBAAA=
> SELECT string(zstd_decompress(unbase64("KLUv/SCCpQAAaEFwYWNoZSBTcGFyayABABLS+QU=")));
  Apache Spark Apache Spark Apache Spark Apache Spark Apache Spark Apache Spark Apache Spark Apache Spark Apache Spark Apache Spark
> SELECT zstd_decompress(zstd_compress("Apache Spark"));
  Apache Spark
> SELECT try_zstd_decompress("invalid input");
  NULL
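
For intuition, the following is a minimal sketch of the two compression modes, written against zstd-jni (com.github.luben.zstd), the Zstd binding Spark already ships; the method names here are illustrative, not the PR's actual implementation. It also shows why the two base64 outputs above differ: the single-shot frame records the decompressed size in its header, while the streaming frame does not.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.github.luben.zstd.{Zstd, ZstdInputStream, ZstdOutputStream}

// Single-shot: the whole input is one buffer, so the frame header can
// record the decompressed size up front.
def compress(input: Array[Byte], level: Int = 3): Array[Byte] =
  Zstd.compress(input, level)

// Streaming: bytes are pushed through an OutputStream, so the final size
// is unknown when the frame header is written, yielding a different (but
// still standard) Zstd frame.
def compressStreaming(input: Array[Byte], level: Int = 3): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val zos = new ZstdOutputStream(bos, level)
  try zos.write(input) finally zos.close()
  bos.toByteArray
}

// Streaming decompression handles both frame types, since it never needs
// the decompressed size in advance.
def decompress(input: Array[Byte]): Array[Byte] = {
  val zis = new ZstdInputStream(new ByteArrayInputStream(input))
  try zis.readAllBytes() finally zis.close()
}
```

A try_zstd_decompress variant would presumably wrap the same decompression in an error handler and return NULL on malformed input, matching the last example above.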

These three built-in functions are also available in Python and Scala.
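
As a quick sketch of the Scala side (assuming a local SparkSession built with this patch; selectExpr reaches the registered SQL functions, so no dedicated functions-object helper is needed for the example):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Round-trip a string through the new functions via selectExpr.
Seq("Apache Spark " * 10).toDF("payload")
  .selectExpr("zstd_compress(cast(payload AS binary), 3) AS compressed")
  .selectExpr("string(zstd_decompress(compressed)) AS roundtripped")
  .show(truncate = false)
```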

Why are the changes needed?

Users no longer need UDFs for Zstd compression and decompression; they can call the built-in SQL functions directly, which run entirely within the JVM.

Does this PR introduce any user-facing change?

Yes. Three SQL functions are introduced: zstd_compress, zstd_decompress, and try_zstd_decompress.

How was this patch tested?

Added new unit tests and end-to-end tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@xi-db xi-db changed the title [WIP][SPARK-48359][SQL] Built-in functions for Zstd compression and decompression [SPARK-48359][SQL] Built-in functions for Zstd compression and decompression May 20, 2024
@yaooqinn (Member)
Instead of adding (de)compression functions for different codecs, how about adding the compression and decompression directly, like,

@xi-db (Contributor, Author) commented on May 21, 2024

> Instead of adding (de)compression functions for different codecs, how about adding the compression and decompression directly, like,

Hi @yaooqinn, yes, that can be one way of implementing them. However, based on the following,

  • The compress methods in MySQL and SQL Server accept only one argument, so users can't specify the compression algorithm or level. Moreover, MySQL's compress does not document which algorithm it uses, and SQL Server's uses only gzip, which differs from our case. Reusing the name compress in Apache Spark could therefore confuse users familiar with those databases.
  • Looking at our SQL Function Reference, there is no precedent for integrating multiple algorithms into one SQL function, and doing so might make the function more complicated to use. Following the naming convention of aes_encrypt, url_encode, and regexp_replace, these functions include the algorithm name, e.g. zstd_compress.

Thus, the functions in this PR are named zstd_compress, zstd_decompress, and try_zstd_decompress, explicitly indicating the algorithm they use, to keep them simple to understand and use.

@yaooqinn (Member)

> The compress methods in MySQL and SQL Server accept only one argument, so users can't specify the compression algorithm or level. Moreover, MySQL's compress does not document which algorithm it uses, and SQL Server's uses only gzip, which differs from our case. Reusing the name compress in Apache Spark could therefore confuse users familiar with those databases.

A parameter with a default value can achieve this. The default can be either hard-coded or made configurable via a session conf.
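
To make that alternative concrete, a generic function could dispatch on a codec argument, as in the sketch below; genericCompress, the codec strings, and the conf name in the comment are hypothetical illustrations, not anything proposed in this PR.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
import com.github.luben.zstd.Zstd

// Hypothetical: one compress entry point with a codec parameter. The
// default codec could be hard-coded or read from a session conf such as
// a (made-up) spark.sql.functions.defaultCompressionCodec.
def genericCompress(input: Array[Byte],
                    codec: String = "zstd",
                    level: Int = 3): Array[Byte] = codec.toLowerCase match {
  case "zstd" => Zstd.compress(input, level)
  case "gzip" =>
    // java.util.zip's stream constructor takes no level, so `level`
    // only applies to the zstd branch in this sketch.
    val bos = new ByteArrayOutputStream()
    val gos = new GZIPOutputStream(bos)
    try gos.write(input) finally gos.close()
    bos.toByteArray
  case other => throw new IllegalArgumentException(s"Unsupported codec: $other")
}
```

Under this shape, adding or retiring a codec changes one match arm and perhaps a default, rather than adding or removing public SQL functions.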

If zstd is ever replaced or dropped, we'd have to remove these functions first, which would be a breaking change. I understand that is unlikely for zstd. But if we add compression functions in the same naming pattern for the other existing codecs, does the likelihood increase? And if we add a new codec, do we need to add similar functions for self-consistency? Will that increase the maintenance cost?

> Looking at our SQL Function Reference, there is no precedent for integrating multiple algorithms into one SQL function, and doing so might make the function more complicated to use. Following the naming convention of aes_encrypt, url_encode, and regexp_replace, these functions include the algorithm name, e.g. zstd_compress.

Most of the existing SQL functions are derived from other systems: Apache Hive, Postgres, MySQL, etc. AFAIK, Spark currently does not have such a naming convention of its own; "supported by many other modern platforms" and "defined in ANSI" are the criteria we have mostly used when adding new SQL functions.
