Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-11644: [Python][Parquet] Low-level Parquet decryption in Python #9631

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f40ae8c
ARROW-11644: [Python][Parquet] Expose Parquet encryption headers to C…
itamarst Feb 9, 2021
6a41e21
ARROW-11644: [Python][Parquet] Expose Parquet decryption headers to C…
itamarst Feb 9, 2021
d9f5d82
ARROW-11644: [Python][Parquet] For now we're doing decryption only.
itamarst Feb 16, 2021
284b162
ARROW-11644: [Python][Parquet] Fix typo.
itamarst Feb 16, 2021
7bab699
ARROW-11644: [Python][Parquet] Sketch of Python API for decryption.
itamarst Feb 16, 2021
8369a1c
It compiles now.
itamarst Feb 18, 2021
eef1f07
Sketch of encryption, initially just for testing decryption.
itamarst Feb 26, 2021
e828a91
ARROW-11644: [Python][Parquet] Basic encryption support compiles now.
itamarst Mar 2, 2021
1ff110d
ARROW-11644: [Python][Parquet] First working round-trip test.
itamarst Mar 4, 2021
23e2feb
ARROW-11644: [Python][Parquet] Support for column keys.
itamarst Mar 12, 2021
219e581
ARROW-11644: [Python][Parquet] Test for allowing plain text files.
itamarst Mar 12, 2021
3eac091
ARROW-11644: [Python][Parquet] Test for disabling footer verification.
itamarst Mar 12, 2021
2ca0390
ARROW-11644: [Python][Parquet] Lint fixes.
itamarst Mar 15, 2021
d6fd90e
ARROW-11644: [Python][Parquet] Sketch of DecryptionKeyRetriever suppo…
itamarst Mar 29, 2021
e925794
ARROW-11644: [Python][Parquet] DecryptionKeyRetriever support for Pyt…
itamarst Mar 29, 2021
707a94e
ARROW-11644: [Python][Parquet] Just rely on C++ error handling.
itamarst Mar 29, 2021
079f635
ARROW-11644: [Python][Parquet] Fix GIL handling.
itamarst Mar 29, 2021
2bd1aba
ARROW-11644: [Python][Parquet] A test for the key retriever.
itamarst Mar 29, 2021
0d1ed15
ARROW-11644: [Python][Parquet] Lint fixes.
itamarst Mar 29, 2021
d739f52
ARROW-11644: [Python][Parquet] Enable encyrption in wheel.
itamarst Mar 29, 2021
622e21c
Merge remote-tracking branch 'upstream/master' into ARROW-11644
itamarst Mar 29, 2021
1b5b93b
ARROW-11644: [Python][Parquet] Make it compile.
itamarst Apr 1, 2021
2b35340
ARROW-11644: [Python][Parquet] Fix typo in name.
itamarst Apr 1, 2021
922681c
ARROW-11644: [Python][Parquet] Make it compile.
itamarst Apr 1, 2021
992c170
ARROW-11644: [Python][Parquet] Test for null bytes in metadata.
itamarst Apr 1, 2021
945fcef
ARROW-11644: [Python][Parquet] Support arbitrary bytes, not just UTF-…
itamarst Apr 2, 2021
1a8d801
ARROW-11644: [Python][Parquet] Always set metadata.
itamarst Apr 2, 2021
2a44cd8
ARROW-11644: [Python][Parquet] If footer key isn't set, need retriever.
itamarst Apr 2, 2021
c3ebf6d
ARROW-11644: [Python][Parquet] Fix lints.
itamarst Apr 6, 2021
2a5bd3f
Merge remote-tracking branch 'upstream/master' into ARROW-11644
itamarst Apr 6, 2021
9a5f4c7
Merge remote-tracking branch 'upstream/master' into ARROW-11644
itamarst May 6, 2021
7267bd4
ARROW-11644: [Python][Parquet] Switch to simpler pure-Cython key retr…
itamarst May 6, 2021
7cfb720
ARROW-11644: [Python][Parquet] Fix bad merge.
itamarst May 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/scripts/python_wheel_manylinux_build.sh
Expand Up @@ -56,6 +56,7 @@ echo "=== (${PYTHON_VERSION}) Building Arrow C++ libraries ==="
: ${ARROW_MIMALLOC:=ON}
: ${ARROW_ORC:=ON}
: ${ARROW_PARQUET:=ON}
: ${PARQUET_REQUIRE_ENCRYPTION:=ON}
: ${ARROW_PLASMA:=ON}
: ${ARROW_S3:=ON}
: ${ARROW_TENSORFLOW:=ON}
Expand Down Expand Up @@ -89,6 +90,7 @@ cmake \
-DARROW_ORC=${ARROW_ORC} \
-DARROW_PACKAGE_KIND="python-wheel-manylinux${MANYLINUX_VERSION}" \
-DARROW_PARQUET=${ARROW_PARQUET} \
-DPARQUET_REQUIRE_ENCRYPTION=ON \
-DARROW_PLASMA=${ARROW_PLASMA} \
-DARROW_PYTHON=ON \
-DARROW_RPATH_ORIGIN=ON \
Expand Down
111 changes: 109 additions & 2 deletions python/pyarrow/_parquet.pxd
Expand Up @@ -18,6 +18,7 @@
# distutils: language = c++
# cython: language_level = 3

from libcpp.map cimport map as c_map
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CChunkedArray, CSchema, CStatus,
CTable, CMemoryPool, CBuffer,
Expand Down Expand Up @@ -336,7 +337,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
void disable_buffered_stream()
void set_buffer_size(int64_t buf_size)
int64_t buffer_size() const

void file_decryption_properties(
shared_ptr[FileDecryptionProperties] decryption)
CReaderProperties default_reader_properties()

cdef cppclass ArrowReaderProperties:
Expand Down Expand Up @@ -376,6 +378,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* encoding(const c_string& path,
ParquetEncoding encoding)
Builder* write_batch_size(int64_t batch_size)
Builder* encryption(
shared_ptr[FileEncryptionProperties] file_encryption_properties
)
shared_ptr[WriterProperties] build()

cdef cppclass ArrowWriterProperties:
Expand All @@ -393,6 +398,55 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
shared_ptr[ArrowWriterProperties] build()
c_bool support_deprecated_int96_timestamps()

cdef enum ParquetCipherType:
AES_GCM_V1 "parquet::ParquetCipher::AES_GCM_V1"
AES_GCM_CTR_V1 "parquet::ParquetCipher::AES_GCM_CTR_V1"

cdef cppclass ColumnEncryptionProperties:
cppclass Builder:
Builder(const c_string& name)
Builder(const shared_ptr[ColumnPath]& path)
Builder* key(c_string column_key)
Builder* key_metadata(const c_string& key_metadata)
Builder* key_id(const c_string& key_id)
shared_ptr[ColumnEncryptionProperties] build()
c_string column_path()
c_bool is_encrypted()
c_bool is_encrypted_with_footer_key()
c_string key()
c_string key_metadata()
WipeOutEncryptionKey()
c_bool is_utilized()
shared_ptr[ColumnEncryptionProperties] DeepClone()

cdef cppclass FileEncryptionProperties:
cppclass Builder:
Builder(const c_string& footer_key)
Builder* set_plaintext_footer()
Builder* algorithm(ParquetCipherType parquet_cipher)
Builder* footer_key_id(const c_string& key_id)
Builder* footer_key_metadata(const c_string& footer_key_metadata)
Builder* aad_prefix(const c_string& aad_prefix)
Builder* disable_aad_prefix_storage()
Builder* encrypted_columns(
const c_map[c_string, shared_ptr[
ColumnEncryptionProperties]]& encrypted_columns)
shared_ptr[FileEncryptionProperties] build()
c_bool encrypted_footer()
# EncryptionAlgorithm algorithm()
c_string footer_key()
c_string footer_key_metadata()
c_string file_aad()
shared_ptr[c_map[c_string, shared_ptr[
ColumnEncryptionProperties]]] column_encryption_properties(
const c_string& column_path)
c_bool is_utilized()
set_utilized()
WipeOutEncryptionKeys()
shared_ptr[FileEncryptionProperties] DeepClone(c_string new_aad_prefix)
c_map[c_string, shared_ptr[
ColumnEncryptionProperties]] encrypted_columns()


cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
cdef cppclass FileReader:
Expand Down Expand Up @@ -450,6 +504,58 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
shared_ptr[CSchema]* out)

cdef extern from "parquet/encryption/encryption.h" namespace "parquet" nogil:
cdef cppclass DecryptionKeyRetriever:
c_string GetKey(const c_string& key_metadata)

cdef cppclass IntegerKeyIdReceiver:
PutKey(uint32_t key_id, const c_string& key)
c_string GetKey(const c_string& key_metadata)

cdef cppclass StringKeyIdReceiver:
PutKey(const c_string& key_id, const c_string& key)
c_string GetKey(const c_string& key_metadata)

cdef cppclass ColumnDecryptionProperties:
cppclass Builder:
Builder(const c_string& name)
Builder(const shared_ptr[ColumnPath]& path)
Builder* key(const c_string& key)
shared_ptr[ColumnDecryptionProperties] build()
c_string column_path()
c_string key()
c_bool is_utilized()
c_bool set_utilized()
WipeOutDecryptionKey()
shared_ptr[ColumnDecryptionProperties] DeepClone()

cdef cppclass FileDecryptionProperties:
cppclass Builder:
Builder()
Builder* footer_key(const c_string footer_key)
Builder* column_keys(const c_map[c_string, shared_ptr[
ColumnDecryptionProperties]]& column_decryption_properties)
Builder* key_retriever(
const shared_ptr[DecryptionKeyRetriever]& key_retriever)
Builder* disable_footer_signature_verification()
Builder* aad_prefix(const c_string& aad_prefix)
# Builder* aad_prefix_verifier(shared_ptr[
# AADPrefixVerifier] aad_prefix_verifier]
Builder* plaintext_files_allowed()
shared_ptr[FileDecryptionProperties] build()
c_string column_key()
c_string footer_key()
c_string aad_prefix()
const shared_ptr[DecryptionKeyRetriever]& key_retriever()
c_bool check_plaintext_footer_integrity()
c_bool plaintext_files_allowed()
# shared_ptr[AADPrefixVerifier]& aad_prefix_verifier()
WipeOutDecryptionKeys()
c_bool is_utilized()
set_utilized()
shared_ptr[FileDecryptionProperties] DeepClone()


cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:

CStatus ToParquetSchema(
Expand Down Expand Up @@ -501,7 +607,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
data_page_size=*,
compression_level=*,
use_byte_stream_split=*,
data_page_version=*) except *
data_page_version=*,
lowlevel_encryption_properties=*) except *


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
Expand Down
138 changes: 128 additions & 10 deletions python/pyarrow/_parquet.pyx
Expand Up @@ -913,6 +913,68 @@ cdef ParquetCompression compression_from_name(name):
return ParquetCompression_UNCOMPRESSED


cdef cppclass PythonKeyRetriever(DecryptionKeyRetriever):
# Retrieve a decryption key from a Python function.
object pyobject

__init__(object pyobject):
this.pyobject = pyobject

c_string GetKey(const c_string& key_metadata) nogil:
with gil:
return this.pyobject(key_metadata)


cdef class LowLevelDecryptionProperties(_Weakrefable):
"""Wrapper for FileDecryptionProperties."""
cdef:
shared_ptr[FileDecryptionProperties] decryption_properties

def __cinit__(self,
footer_key=None,
# Map column name (str) to key (bytes):
column_keys=None,
# Callable that takes key metadata (bytes), returns the key
# (bytes); equivalent to DecryptionKeyRetriever in C++ API.
retrieve_key=None,
disable_footer_signature_verification=False,
plaintext_files_allowed=False):
cdef c_map[c_string, shared_ptr[
ColumnDecryptionProperties]] c_column_keys
cdef FileDecryptionProperties.Builder builder
cdef c_string column, key
cdef shared_ptr[DecryptionKeyRetriever] key_retriever

builder = FileDecryptionProperties.Builder()

if retrieve_key is not None:
if not callable(retrieve_key):
raise ValueError(
"retrieve_key %s is not callable" % (retrieve_key,))
key_retriever = shared_ptr[DecryptionKeyRetriever](
new PythonKeyRetriever(retrieve_key)
)
builder.key_retriever(key_retriever)

if footer_key is not None:
builder.footer_key(footer_key)

if column_keys is not None:
for pycolumn, key in column_keys.items():
column = pycolumn.encode("utf-8")
c_column_keys[column] = ColumnDecryptionProperties.Builder(
column).key(key).build()
builder.column_keys(c_column_keys)

if disable_footer_signature_verification:
builder.disable_footer_signature_verification()

if plaintext_files_allowed:
builder.plaintext_files_allowed()

self.decryption_properties = builder.build()


cdef class ParquetReader(_Weakrefable):
cdef:
object source
Expand All @@ -929,7 +991,8 @@ cdef class ParquetReader(_Weakrefable):

def open(self, object source not None, bint use_memory_map=True,
read_dictionary=None, FileMetaData metadata=None,
int buffer_size=0, bint pre_buffer=False):
int buffer_size=0, bint pre_buffer=False,
LowLevelDecryptionProperties lldecrypt=None):
cdef:
shared_ptr[CRandomAccessFile] rd_handle
shared_ptr[CFileMetaData] c_metadata
Expand All @@ -938,6 +1001,7 @@ cdef class ParquetReader(_Weakrefable):
default_arrow_reader_properties())
c_string path
FileReaderBuilder builder
shared_ptr[FileDecryptionProperties] decryption_properties

if metadata is not None:
c_metadata = metadata.sp_metadata
Expand All @@ -950,6 +1014,10 @@ cdef class ParquetReader(_Weakrefable):
else:
raise ValueError('Buffer size must be larger than zero')

if lldecrypt is not None:
decryption_properties = lldecrypt.decryption_properties
properties.file_decryption_properties(decryption_properties)

arrow_props.set_pre_buffer(pre_buffer)

self.source = source
Expand Down Expand Up @@ -1175,19 +1243,61 @@ cdef class ParquetReader(_Weakrefable):
return pyarrow_wrap_chunked_array(out)


cdef class LowLevelEncryptionProperties(_Weakrefable):
"""Wrapper for FileEncryptionProperties.

Not intended for direct use by Python users, currently, just wrapped to
enable testing of low-level decryption.
"""
cdef:
shared_ptr[FileEncryptionProperties] encryption_properties

def __cinit__(self, footer_key, column_keys=None, plaintext_footer=False,
footer_key_metadata=None, column_keys_metadata=None):
cdef FileEncryptionProperties.Builder* builder
cdef c_string column
cdef c_map[c_string, shared_ptr[
ColumnEncryptionProperties]] c_column_keys

builder = new FileEncryptionProperties.Builder(footer_key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a possibility of a memory leak here? Did you consider using a unique_ptr here instead?

if footer_key_metadata is not None:
builder.footer_key_metadata(footer_key_metadata)

if column_keys is not None:
for pycolumn, key in column_keys.items():
column = pycolumn.encode("utf-8")
if column_keys_metadata is not None:
key_id = column_keys_metadata[pycolumn]
else:
key_id = column
c_column_keys[column] = ColumnEncryptionProperties.Builder(
column).key(key).key_metadata(key_id).build()
builder.encrypted_columns(c_column_keys)

if plaintext_footer:
builder.set_plaintext_footer()

self.encryption_properties = builder.build()
del builder


cdef shared_ptr[WriterProperties] _create_writer_properties(
use_dictionary=None,
compression=None,
version=None,
write_statistics=None,
data_page_size=None,
compression_level=None,
use_byte_stream_split=False,
data_page_version=None) except *:
use_dictionary=None,
compression=None,
version=None,
write_statistics=None,
data_page_size=None,
compression_level=None,
use_byte_stream_split=False,
data_page_version=None,
lowlevel_encryption_properties=None
) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
WriterProperties.Builder props
LowLevelEncryptionProperties llep
shared_ptr[FileEncryptionProperties] encryption_properties

# data_page_version

Expand Down Expand Up @@ -1266,6 +1376,12 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
if data_page_size is not None:
props.data_pagesize(data_page_size)

# encryption
if lowlevel_encryption_properties is not None:
llep = lowlevel_encryption_properties
encryption_properties = llep.encryption_properties
props.encryption(encryption_properties)

properties = props.build()

return properties
Expand Down Expand Up @@ -1365,6 +1481,7 @@ cdef class ParquetWriter(_Weakrefable):
use_byte_stream_split=False,
writer_engine_version=None,
data_page_version=None,
lowlevel_encryption_properties=None,
use_compliant_nested_type=False):
cdef:
shared_ptr[WriterProperties] properties
Expand All @@ -1391,7 +1508,8 @@ cdef class ParquetWriter(_Weakrefable):
data_page_size=data_page_size,
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
data_page_version=data_page_version
data_page_version=data_page_version,
lowlevel_encryption_properties=lowlevel_encryption_properties,
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
Expand Down
11 changes: 8 additions & 3 deletions python/pyarrow/parquet.py
Expand Up @@ -35,7 +35,8 @@
from pyarrow._parquet import (ParquetReader, Statistics, # noqa
FileMetaData, RowGroupMetaData,
ColumnChunkMetaData,
ParquetSchema, ColumnSchema)
ParquetSchema, ColumnSchema,
LowLevelDecryptionProperties)
from pyarrow.fs import (LocalFileSystem, FileSystem,
_resolve_filesystem_and_path, _ensure_filesystem)
from pyarrow import filesystem as legacyfs
Expand Down Expand Up @@ -213,15 +214,19 @@ class ParquetFile:
Coalesce and issue file reads in parallel to improve performance on
high-latency filesystems (e.g. S3). If True, Arrow will use a
background I/O thread pool.
low_level_decryption: LowLevelDecryptionProperties, default None
Low-level decryption properties. Note that a nicer high-level API
should be available eventually.
"""

def __init__(self, source, metadata=None, common_metadata=None,
read_dictionary=None, memory_map=False, buffer_size=0,
pre_buffer=False):
pre_buffer=False, low_level_decryption=None):
self.reader = ParquetReader()
self.reader.open(source, use_memory_map=memory_map,
buffer_size=buffer_size, pre_buffer=pre_buffer,
read_dictionary=read_dictionary, metadata=metadata)
read_dictionary=read_dictionary, metadata=metadata,
lldecrypt=low_level_decryption)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()

Expand Down