
[Python][Dataset] to_batches crash after calling 'count_rows' using dataset to read encrypted parquet #41431

Closed
RyogaWan opened this issue Apr 29, 2024 · 10 comments


Describe the bug, including details regarding any error messages, version, and platform.

I'm using pyarrow.dataset.dataset to read an encrypted parquet file, with decryption_config set in ParquetFragmentScanOptions, and using to_batches to generate a RecordBatchReader. I've found that after calling count_rows, to_batches crashes.

I also tried CSV and plaintext parquet; both can still generate a reader after count_rows.

pyarrow version is 16.0.0

Example code

import base64

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.csv
import pyarrow.parquet as pq
import pyarrow.parquet.encryption as pe

rb = pa.record_batch([[1, 2, 3], ["a", "b", "c"], [None, 1.2, 3.2]], ["id", "name", "value"])

FOOTER_KEY = b"0123456789112345"
FOOTER_KEY_NAME = "footer_key"


class PyarrowInMemoryKmsClient(pe.KmsClient):
    def __init__(self):
        """Create an InMemoryKmsClient instance."""
        pe.KmsClient.__init__(self)
        self.master_keys_map = {
            FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
        }

    def wrap_key(self, key_bytes, master_key_identifier):
        """Not a secure cipher - the wrapped key
        is just the master key concatenated with key bytes"""
        print(f"key_bytes is {key_bytes}, {master_key_identifier=}")
        master_key_bytes = self.master_keys_map[master_key_identifier].encode(
            'utf-8')
        wrapped_key = b"".join([master_key_bytes, key_bytes])
        result = base64.b64encode(wrapped_key)
        return result

    def unwrap_key(self, wrapped_key, master_key_identifier):
        """Not a secure cipher - just extract the key from
        the wrapped key"""
        print(f"wrapped_key is {wrapped_key}, {master_key_identifier=}")
        expected_master_key = self.master_keys_map[master_key_identifier]
        decoded_wrapped_key = base64.b64decode(wrapped_key)
        master_key_bytes = decoded_wrapped_key[:16]
        decrypted_key = decoded_wrapped_key[16:]
        if (expected_master_key == master_key_bytes.decode('utf-8')):
            return decrypted_key
        raise ValueError("Incorrect master key used",
                         master_key_bytes, decrypted_key)


def kms_client_factory(kms_connection_configuration):
    return PyarrowInMemoryKmsClient()


def test_csv_dataset():
    source_csv = "./test.csv"
    with pa.csv.CSVWriter(source_csv, rb.schema) as c:
        for _ in range(6):
            c.write(rb)

    dataset = ds.dataset(source_csv, format="csv")
    reader = pa.RecordBatchReader.from_batches(dataset.schema, dataset.to_batches())

    count = 0
    for record in reader:
        count += record.num_rows

    print(dataset.count_rows())
    reader = pa.RecordBatchReader.from_batches(dataset.schema, dataset.to_batches())

    count = 0
    for record in reader:
        count += record.num_rows


def test_parquet_dataset():
    source_parquet = "./test.parquet"
    with pq.ParquetWriter(source_parquet, rb.schema) as c:
        for _ in range(6):
            c.write(rb)

    dataset = ds.dataset(source_parquet, format="parquet")
    reader = pa.RecordBatchReader.from_batches(dataset.schema, dataset.to_batches())

    count = 0
    for record in reader:
        count += record.num_rows

    print(dataset.count_rows())
    reader = pa.RecordBatchReader.from_batches(dataset.schema, dataset.to_batches())

    count = 0
    for record in reader:
        count += record.num_rows


def test_encrypted_parquet_dataset():
    source_enc_parquet = "./test.enc.parquet"
    crypt_factory = pe.CryptoFactory(kms_client_factory)
    encryption_config = pe.EncryptionConfiguration(
        footer_key=FOOTER_KEY_NAME,
        column_keys={
            FOOTER_KEY_NAME: rb.schema.names,
        },
        encryption_algorithm="AES_GCM_V1",
        data_key_length_bits=256,
    )
    kms_connection_config = pe.KmsConnectionConfig()

    with pq.ParquetWriter(source_enc_parquet, rb.schema,
                          encryption_properties=crypt_factory.file_encryption_properties(kms_connection_config,
                                                                                         encryption_config)) as c:
        for _ in range(6):
            c.write(rb)

    scan_options = ds.ParquetFragmentScanOptions(
        decryption_config=ds.ParquetDecryptionConfig(
            crypto_factory=crypt_factory, kms_connection_config=kms_connection_config,
            decryption_config=pe.DecryptionConfiguration()
        )
    )
    file_format = ds.ParquetFileFormat(default_fragment_scan_options=scan_options)

    dataset = ds.dataset(source_enc_parquet, format=file_format)
    reader = pa.RecordBatchReader.from_batches(dataset.schema, dataset.to_batches())

    count = 0
    for record in reader:
        count += record.num_rows

    print(dataset.count_rows())
    reader = pa.RecordBatchReader.from_batches(dataset.schema, dataset.to_batches())

    count = 0
    for record in reader:
        count += record.num_rows


if __name__ == '__main__':
    test_csv_dataset()
    test_parquet_dataset()
    test_encrypted_parquet_dataset()

Traceback

Traceback (most recent call last):
  File "/Users/yangyu/Library/Application Support/JetBrains/PyCharm2022.1/scratches/scratch_3.py", line 138, in <module>
    test_encrypted_parquet_dataset()
  File "/Users/yangyu/Library/Application Support/JetBrains/PyCharm2022.1/scratches/scratch_3.py", line 131, in test_encrypted_parquet_dataset
    for record in reader:
  File "pyarrow/ipc.pxi", line 666, in pyarrow.lib.RecordBatchReader.__next__
  File "pyarrow/ipc.pxi", line 700, in pyarrow.lib.RecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 88, in pyarrow.lib.check_status
  File "pyarrow/_dataset.pyx", line 3769, in _iterator
  File "pyarrow/_dataset.pyx", line 3387, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
OSError: RowGroup is noted as encrypted but no file decryptor

Component(s)

Python


AlenkaF commented May 6, 2024

I can reproduce the issue locally.

@wgtmac @pitrou would you have time and any idea what the issue could be? Both dataset.to_table() and dataset.to_batches() return the error after calling count_rows if the parquet file is encrypted.


wgtmac commented May 6, 2024

From the message OSError: RowGroup is noted as encrypted but no file decryptor, it seems that the decryption config is not correctly created. I'll take some time to investigate this as I'm not that familiar with python.

cc @tolleybot to see if you have any insight.


wgtmac commented May 6, 2024

def test_encrypted_parquet_dataset():
    source_enc_parquet = "./test.enc.parquet"
    crypt_factory = pe.CryptoFactory(kms_client_factory)
    encryption_config = pe.EncryptionConfiguration(
        footer_key=FOOTER_KEY_NAME,
        column_keys={
            FOOTER_KEY_NAME: rb.schema.names,  # <-- something wrong here
        },
        encryption_algorithm="AES_GCM_V1",
        data_key_length_bits=256,
    )
    kms_connection_config = pe.KmsConnectionConfig()

@RyogaWan Could you double-check the suspicious line above? It seems that we need to use a COL_KEY_NAME list, as in:

encryption_config = pe.EncryptionConfiguration(
    footer_key=FOOTER_KEY_NAME,
    plaintext_footer=False,
    # Use COL_KEY_NAME to encrypt `n_legs` and `animal` columns.
    column_keys={
        COL_KEY_NAME: ["n_legs", "animal"],
    },
    encryption_algorithm="AES_GCM_V1",
    # requires timedelta or an assertion is raised
    cache_lifetime=timedelta(minutes=5.0),
    data_key_length_bits=256)


RyogaWan commented May 6, 2024

> @RyogaWan Could you double-check the suspicious line above? It seems that we need to use a COL_KEY_NAME list, as in: […]

I think this is an identifier for column_keys, used by the KmsClient to fetch the key that wraps or unwraps the actual key encrypting the dataset. In this example I want to use the same key for the columns and the footer, so I just used FOOTER_KEY_NAME in the dict. Is there anything I'm misunderstanding? I apologize for any confusion I've caused.
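
For reference, rb.schema.names in the original example already evaluates to a plain Python list of column names, so the column_keys mapping does have the expected {key_name: [column, ...]} shape:

import pyarrow as pa

rb = pa.record_batch([[1, 2, 3], ["a", "b", "c"], [None, 1.2, 3.2]],
                     ["id", "name", "value"])
print(rb.schema.names)  # ['id', 'name', 'value'] -- already a list of str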


wgtmac commented May 6, 2024

You may use FOOTER_KEY_NAME as the key name, but the column names should be wrapped in a list. Please also check that the column names (i.e. rb.schema.names) are correct.


RyogaWan commented May 6, 2024

> You may use FOOTER_KEY_NAME as the key name, but the column names should be wrapped in a list. Please also check that the column names (i.e. rb.schema.names) are correct.

Yes, I think it's correct. In my test, to_batches before count_rows works normally, but it always fails after count_rows.

@tolleybot

> From the message OSError: RowGroup is noted as encrypted but no file decryptor, it seems that the decryption config is not correctly created. I'll take some time to investigate this as I'm not that familiar with python.
>
> cc @tolleybot to see if you have any insight.

@wgtmac Sure thing


wgtmac commented May 6, 2024

The error does not come from print(dataset.count_rows()); it successfully prints 18 on my end. The issue arises when the code snippet tries to create the reader again from the same dataset. It seems that the file decryption config cannot be reused.
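
Distilled, the failing sequence is just this (with dataset built over the encrypted file exactly as in test_encrypted_parquet_dataset above):

# dataset is created once with the decryption scan options, as above
for batch in dataset.to_batches():   # 1st scan: succeeds
    pass
print(dataset.count_rows())          # succeeds, prints 18
for batch in dataset.to_batches():   # 2nd scan over the same dataset object:
    pass                             # OSError: RowGroup is noted as encrypted
                                     # but no file decryptor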


wgtmac commented May 6, 2024

I have found the root cause.

  1. When the dataset scanner runs for the first time, it caches the FileMetaData in the ParquetFileFragment:
    return SetMetadata(reader->parquet_reader()->metadata(), std::move(manifest));
  2. When the dataset creates a new scanner, the internal parquet reader reuses the cached FileMetaData instead of parsing it from the footer:
    file->set_metadata(std::move(metadata));
  3. Because we don't parse the FileMetaData again, file_decryptor_ is no longer created. (In the first run it was created here: file_decryptor_ = std::make_shared<InternalFileDecryptor>(…).)
  4. Since file_decryptor_ is null, an error is reported when we access encrypted data:
    throw ParquetException("RowGroup is noted as encrypted but no file decryptor");
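
Given this root cause, a possible workaround until the fix is released (an untested sketch inferred from the analysis above, reusing source_enc_parquet and file_format from the example code, not something verified in this thread) is to rebuild the dataset for each scan, so no fragment carries cached FileMetaData:

import pyarrow as pa
import pyarrow.dataset as ds

def open_encrypted_dataset():
    # A fresh dataset creates fresh ParquetFileFragments with no cached
    # FileMetaData, so the footer is re-parsed and the internal file
    # decryptor is recreated for every scan.
    return ds.dataset(source_enc_parquet, format=file_format)

print(open_encrypted_dataset().count_rows())
dataset = open_encrypted_dataset()
reader = pa.RecordBatchReader.from_batches(dataset.schema, dataset.to_batches())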

wgtmac added a commit to wgtmac/arrow that referenced this issue May 6, 2024
wgtmac added a commit that referenced this issue May 8, 2024
…set (#41550)

### Rationale for this change

When a parquet dataset is reused to create multiple scanners, `FileMetaData` objects are cached to avoid parsing them again. However, this caused issues on encrypted files, since internal file decryptors were no longer created from the cached `FileMetaData` objects.

### What changes are included in this PR?

Expose file_decryptor from FileMetaData and set it properly.

### Are these changes tested?

Yes, the test is modified to reproduce the issue and ensure it is fixed.

### Are there any user-facing changes?

No.
* GitHub Issue: #41431

Authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: Gang Wu <ustcwg@gmail.com>
@wgtmac wgtmac added this to the 17.0.0 milestone May 8, 2024

wgtmac commented May 8, 2024

Issue resolved by pull request #41550.

@wgtmac wgtmac closed this as completed May 8, 2024
@raulcd raulcd modified the milestones: 17.0.0, 16.1.0 May 9, 2024
raulcd pushed a commit that referenced this issue May 9, 2024
…set (#41550)
vibhatha pushed a commit to vibhatha/arrow that referenced this issue May 25, 2024
…d dataset (apache#41550)