Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48359][SQL] Built-in functions for Zstd compression and decompression #46672

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2577,6 +2577,11 @@
"message" : [
"expects %1$, %2$ and so on, but got %0$."
]
},
"ZSTD_DECOMPRESS_INPUT" : {
"message" : [
"expects valid zstd-compressed data."
]
}
},
"sqlState" : "22023"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3650,6 +3650,77 @@ object functions {
def try_aes_decrypt(input: Column, key: Column): Column =
Column.fn("try_aes_decrypt", input, key)

/**
* Returns a compressed value of `expr` using Zstandard with the specified compression `level`.
* The and default level is 3. Uses single-pass mode by default.
*
* @param input
* The binary value to compress
* @param level
* Optional integer argument that represents the compression level. The compression level controls
* the trade-off between compression speed and compression ratio. Valid values: between 1 and 22
* inclusive, where 1 means fastest but lowest compression ratio, and 22 means slowest but highest
* compression ratio. The default level is 3 if not specified
* @param streamingMode
* Optional boolean argument that represents whether to use streaming mode. If true, the function
* will compress the input in streaming mode. The default value is false.
*
* @group misc_funcs
* @since 4.0.0
*/
def zstd_compress(input: Column, level: Column, streamingMode: Column): Column =
Column.fn("zstd_compress", input, level, streamingMode)

/**
* Returns the compressed value of `input`.
*
* @see
* `org.apache.spark.sql.functions.zstd_compress(Column, Column, Column)`
*
* @group misc_funcs
* @since 4.0.0
*/
def zstd_compress(input: Column, level: Column): Column =
Column.fn("zstd_compress", input, level)

/**
* Returns the compressed value of `input`.
*
* @see
* `org.apache.spark.sql.functions.zstd_compress(Column, Column)`
*
* @group misc_funcs
* @since 4.0.0
*/
def zstd_compress(input: Column): Column =
Column.fn("zstd_compress", input)

/**
* Returns the decompressed value of `expr` using Zstandard.
* On decompression failure, it throws an exception.
*
* @param input
* The binary value to decompress
*
* @group misc_funcs
* @since 4.0.0
*/
def zstd_decompress(input: Column): Column =
Column.fn("zstd_decompress", input)

/**
* Returns the decompressed value of `expr` using Zstandard.
* On decompression failure, it returns NULL.
*
* @param input
* The binary value to decompress
*
* @group misc_funcs
* @since 4.0.0
*/
def try_zstd_decompress(input: Column): Column =
Column.fn("try_zstd_decompress", input)

/**
* Returns a sha1 hash value as a hex string of the `col`.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2914,6 +2914,26 @@ class PlanGenerationTestSuite
fn.try_aes_decrypt(fn.col("g"), fn.col("g"))
}

functionTest("zstd_compress") {
fn.zstd_compress(fn.col("g"))
}

functionTest("zstd_compress with level") {
fn.zstd_compress(fn.col("g"), fn.lit(5))
}

functionTest("zstd_compress with level streaming_mode") {
fn.zstd_compress(fn.col("g"), fn.lit(5), fn.lit(true))
}

functionTest("zstd_decompress") {
fn.zstd_decompress(fn.col("g"))
}

functionTest("try_zstd_decompress") {
fn.try_zstd_decompress(fn.col("g"))
}

functionTest("sha") {
fn.sha(fn.col("g"))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [try_zstd_decompress(cast(g#0 as binary)) AS try_zstd_decompress(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [zstd_compress(cast(g#0 as binary), 3, false) AS zstd_compress(g, 3, false)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [zstd_compress(cast(g#0 as binary), 5, false) AS zstd_compress(g, 5, false)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [zstd_compress(cast(g#0 as binary), 5, true) AS zstd_compress(g, 5, true)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [zstd_decompress(cast(g#0 as binary)) AS zstd_decompress(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "try_zstd_decompress",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "g"
}
}]
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "zstd_compress",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "g"
}
}]
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "zstd_compress",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "g"
}
}, {
"literal": {
"integer": 5
}
}]
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "zstd_compress",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "g"
}
}, {
"literal": {
"integer": 5
}
}, {
"literal": {
"boolean": true
}
}]
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "zstd_decompress",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "g"
}
}]
}
}]
}
}
Binary file not shown.
31 changes: 31 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3963,6 +3963,37 @@ def try_aes_decrypt(
try_aes_decrypt.__doc__ = pysparkfuncs.try_aes_decrypt.__doc__


def zstd_compress(
input: "ColumnOrName",
level: Optional["ColumnOrName"] = None,
streaming_mode: Optional["ColumnOrName"] = None,
) -> Column:
_level = lit(3) if level is None else level
_streaming_mode = lit(False) if streaming_mode is None else streaming_mode
return _invoke_function_over_columns("zstd_compress", input, _level, _streaming_mode)


zstd_compress.__doc__ = pysparkfuncs.zstd_compress.__doc__


def zstd_decompress(
input: "ColumnOrName",
) -> Column:
return _invoke_function_over_columns("zstd_decompress", input)


zstd_decompress.__doc__ = pysparkfuncs.zstd_decompress.__doc__


def try_zstd_decompress(
input: "ColumnOrName",
) -> Column:
return _invoke_function_over_columns("try_zstd_decompress", input)


try_zstd_decompress.__doc__ = pysparkfuncs.try_zstd_decompress.__doc__


def sha(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sha", col)

Expand Down
Loading