In [16]:
from IPython.core.display import HTML
# Inject a stylesheet so every rendered element in this notebook uses a
# 12px monospace font; the HTML object is the cell's last expression, so
# the frontend renders it.
HTML(
    data=r"""
<style>
    * {
        font-family: monospace;
        font-size: 12px;
        line-height: normal;
    }
</style>
"""
)

# Imports

In [26]:
import base64
import json
import logging
import os
from datetime import timedelta

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.parquet.encryption as pe
import requests
from pyarrow.tests.parquet.encryption import InMemoryKmsClient

# Logger for this notebook, named after the current module.
logger = logging.getLogger(__name__)

# KMS Client

## Mock Client

In [18]:
# Demo-only master keys: three 16-byte (128-bit) AES keys kept in plain text
# because the mock KMS below is an in-memory stand-in, not a real service.
footer_key = b"1234567890123450"
pii1_key = b"2345678901234501"
pii2_key = b"3456789012345012"

# The master keys are handed to the client through ``custom_kms_conf``,
# keyed by master-key identifier and decoded to text.
mock_kms_connection_config = pe.KmsConnectionConfig(
    key_access_token="MyToken",
    kms_instance_url="https://URL1",
    kms_instance_id="python_client",
    custom_kms_conf={
        master_key_id: key_material.decode("UTF-8")
        for master_key_id, key_material in (
            ("parquet_footer_key", footer_key),
            ("confidential_encrypt_pii1", pii1_key),
            ("confidential_encrypt_pii2", pii2_key),
        )
    },
)

def mock_kms_factory(kms_connection_configuration):
    """Build the in-memory KMS client used by the mock examples.

    Parameters
    ----------
    kms_connection_configuration : KmsConnectionConfig
        Connection settings forwarded unchanged to ``InMemoryKmsClient``.

    Returns
    -------
    InMemoryKmsClient
        A new client instance for the given configuration.
    """
    in_memory_client = InMemoryKmsClient(kms_connection_configuration)
    return in_memory_client

In [19]:
# Sanity check of the mock client: wrap a short byte string with the
# "confidential_encrypt_pii1" master key and display the wrapped value.
mock_kms_client = mock_kms_factory(mock_kms_connection_config)
text1 = mock_kms_client.wrap_key(
    master_key_identifier="confidential_encrypt_pii1",
    key_bytes=b"hello",
)
text1

b'MjM0NTY3ODkwMTIzNDUwMWhlbGxv'

In [20]:
# Reverse of the previous cell: unwrapping with the same master key should
# give back the original bytes.
mock_kms_client.unwrap_key(
    master_key_identifier="confidential_encrypt_pii1",
    wrapped_key=text1,
)

b'hello'

In [21]:
# Encryption layout for the mock example: "name" and "birth_day" each get
# their own master key, the remaining columns are not listed, and the
# footer is left in plain text.
mock_encryption_config = pe.EncryptionConfiguration(
    footer_key="parquet_footer_key",
    column_keys={
        "confidential_encrypt_pii1": ["name"],
        "confidential_encrypt_pii2": ["birth_day"],
    },
    encryption_algorithm="AES_GCM_V1",
    data_key_length_bits=128,
    plaintext_footer=True,
    double_wrapping=True,
    cache_lifetime=timedelta(minutes=10.0),
)

# The factory creates KMS clients on demand through ``mock_kms_factory``.
crypto_factory = pe.CryptoFactory(mock_kms_factory)

# Writer-side properties derived from the connection and encryption settings.
file_encryption_properties = crypto_factory.file_encryption_properties(
    mock_kms_connection_config,
    mock_encryption_config,
)

In [22]:
# Three-row sample frame: two columns to be encrypted ("name", "birth_day")
# and one left as is ("favorite_color").
data = dict(
    name=["john", "bob", "alice"],
    birth_day=["1990-01-01"] * 3,
    favorite_color=["red", "purple", "navy"],
)
df = pd.DataFrame(data)
print(df.head())

# Arrow representation of the same frame, used by the Parquet writer below.
table = pa.Table.from_pandas(df)
print(table.schema)

    name   birth_day favorite_color
0   john  1990-01-01            red
1    bob  1990-01-01         purple
2  alice  1990-01-01           navy
name: string
birth_day: string
favorite_color: string
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 613


In [23]:
mock_file = "./data/mock_data.parquet"

# The Parquet writer does not create missing parent directories, so make sure
# the target folder exists; otherwise a fresh checkout fails on "Run All".
os.makedirs(os.path.dirname(mock_file), exist_ok=True)

# Write the sample table with the mock encryption properties; the context
# manager closes the writer (and flushes the footer) on exit.
with pq.ParquetWriter(
    where=mock_file,
    schema=table.schema,
    encryption_properties=file_encryption_properties,
) as writer:
    writer.write_table(table)

In [24]:
# Reader-side settings: only the cache lifetime is configured here.
decryption_config = pe.DecryptionConfiguration(
    cache_lifetime=timedelta(minutes=10.0),
)

# Decryption properties come from the same factory and connection settings
# that were used for writing.
file_decryption_properties = crypto_factory.file_decryption_properties(
    mock_kms_connection_config,
    decryption_config,
)

# Open the encrypted file; the data itself is read in the next cell.
result = pq.ParquetFile(
    decryption_properties=file_decryption_properties,
    source=mock_file,
)

In [25]:
# Read the whole decrypted file and show it as a pandas DataFrame.
result.read().to_pandas()

Unnamed: 0,name,birth_day,favorite_color
0,john,1990-01-01,red
1,bob,1990-01-01,purple
2,alice,1990-01-01,navy


## Vault Client

In [267]:
class VaultClient(pe.KmsClient):
    """An example of a KmsClient implementation with master keys
    managed by Hashicorp Vault KMS.
    See Vault documentation: https://www.vaultproject.io/api/secret/transit
    Not for production use!
    """
    JSON_MEDIA_TYPE = "application/json; charset=utf-8"
    DEFAULT_TRANSIT_ENGINE = "/v1/transit/"
    WRAP_ENDPOINT = "encrypt/"
    UNWRAP_ENDPOINT = "decrypt/"
    TOKEN_HEADER = "X-Vault-Token"
    # ``requests`` waits indefinitely unless a timeout is given, so every call
    # to Vault is bounded by this many seconds.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, kms_connection_config):
        """Create a VaultClient instance.

        Parameters
        ----------
        kms_connection_config : KmsConnectionConfig
           configuration parameters to connect to vault,
           e.g. URL and access token
        """
        pe.KmsClient.__init__(self)
        # Drop trailing slashes from the instance URL so that joining it with
        # the transit path never yields "//v1/transit/".
        self.kms_url = kms_connection_config.kms_instance_url.rstrip("/") + \
            VaultClient.DEFAULT_TRANSIT_ENGINE
        self.kms_connection_config = kms_connection_config

    def _call_transit(self, operation_endpoint, master_key_identifier,
                      payload):
        """POST ``payload`` to a transit endpoint and return the ``data``
        object of the JSON response.

        Parameters
        ----------
        operation_endpoint : str
            ``WRAP_ENDPOINT`` or ``UNWRAP_ENDPOINT``.
        master_key_identifier : str
            Name of the key, appended to the endpoint URL.
        payload : dict
            Form fields sent as the request body.

        Raises
        ------
        requests.HTTPError
            If Vault answers with an error status.
        requests.Timeout
            If Vault does not answer within ``REQUEST_TIMEOUT_SECONDS``.
        """
        url = self.kms_url + operation_endpoint + master_key_identifier
        headers = {VaultClient.TOKEN_HEADER:
                   self.kms_connection_config.key_access_token}
        response = requests.post(
            url,
            headers=headers,
            data=payload,
            timeout=VaultClient.REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json()['data']

    def wrap_key(self, key_bytes, master_key_identifier):
        """Call Vault to wrap key key_bytes with key
        identified by master_key_identifier.

        The key bytes are base64-encoded before being sent; the value of the
        ``ciphertext`` field of the response is returned unchanged.
        """
        response_data = self._call_transit(
            VaultClient.WRAP_ENDPOINT,
            master_key_identifier,
            {'plaintext': base64.b64encode(key_bytes)},
        )
        return response_data['ciphertext']

    def unwrap_key(self, wrapped_key, master_key_identifier):
        """Call Vault to unwrap wrapped_key with key
        identified by master_key_identifier.

        The ``plaintext`` field of the response is base64-decoded and the
        resulting bytes are returned.
        """
        response_data = self._call_transit(
            VaultClient.UNWRAP_ENDPOINT,
            master_key_identifier,
            {'ciphertext': wrapped_key},
        )
        return base64.b64decode(response_data['plaintext'])

# Vault connection settings are taken from the environment so that no URL or
# access token is stored in the notebook. When a variable is unset,
# ``os.environ.get`` returns None and the corresponding setting keeps its
# library default, exactly as with an argument-less KmsConnectionConfig().
kms_connection_config = pe.KmsConnectionConfig(
    kms_instance_url=os.environ.get("VAULT_ADDR"),
    key_access_token=os.environ.get("VAULT_TOKEN"),
)

def kms_factory(kms_connection_configuration):
    """Build a Vault-backed KMS client.

    Parameters
    ----------
    kms_connection_configuration : KmsConnectionConfig
        Connection settings forwarded unchanged to ``VaultClient``.

    Returns
    -------
    VaultClient
        A new client instance for the given configuration.
    """
    vault_client = VaultClient(kms_connection_configuration)
    return vault_client

## AWS KMS Client

In [38]:
from src.parquet_modular_encryption.pme_utils import (
    kms_factory,
    connect_to_hdfs,
    decrypt_file,
    encrypt_to_file,
)

In [39]:
# Location of the JSON file holding the AWS settings. It can be overridden
# with the PME_KEY_FILE environment variable; the default is the original
# developer-machine path.
secret_keys_path = os.environ.get(
    "PME_KEY_FILE",
    "/Users/lap14443/secret_keys/pme_key.json",
)
# Explicit encoding so parsing does not depend on the platform default.
with open(file=secret_keys_path, mode="r", encoding="utf-8") as fi:
    secret_keys = json.load(fp=fi)

# Only the three AWS settings are forwarded to the KMS client; a missing
# entry raises KeyError here rather than later inside the client.
kms_connection_config = pe.KmsConnectionConfig(
    custom_kms_conf={
        setting: secret_keys[setting]
        for setting in (
            "region_name",
            "aws_access_key_id",
            "aws_secret_access_key",
        )
    }
)
crypto_factory = pe.CryptoFactory(kms_factory)

In [31]:
# Encryption layout for the AWS KMS example: "name" and "birth_day" each get
# their own master key and the footer is left in plain text.
encryption_config = pe.EncryptionConfiguration(
    footer_key="parquet_footer_key",
    encryption_algorithm="AES_GCM_V1",
    data_key_length_bits=128,
    plaintext_footer=True,
    double_wrapping=True,
    column_keys={
        "confidential_encrypt_multiple": ["name"],
        "confidential_encrypt_pii2": ["birth_day"],
        },
    cache_lifetime=timedelta(minutes=10.0),
)

# Rebuild the writer properties from this section's factory, connection and
# encryption settings. Without this, the write cell further down would still
# pick up the ``file_encryption_properties`` created for the mock KMS client,
# and the file could not be decrypted with the AWS-backed factory.
file_encryption_properties = crypto_factory.file_encryption_properties(
    kms_connection_config,
    encryption_config,
)

In [19]:
import os


# Hadoop client settings are passed through the process environment and must
# be set before the connection is opened.
os.environ.update(
    {
        "HADOOP_CONF_DIR": '/usr/hdp/3.1.0.0-78/hadoop/etc/hadoop',
        "HADOOP_USER_NAME": "zdeploy",
    }
)
hdfs = connect_to_hdfs(
    host="10.60.37.61",
    port=8020,
    extra_conf={},
)

In [16]:
import pandas as pd
import pyarrow as pa


# Same three-row sample frame as in the mock example, rebuilt here for the
# HDFS round trip.
data = dict(
    name=["john", "bob", "alice"],
    birth_day=["1990-01-01"] * 3,
    favorite_color=["red", "purple", "navy"],
)
df = pd.DataFrame(data)
print(df.head())

# Arrow representation of the frame, used by the Parquet writer.
table = pa.Table.from_pandas(df)
print(table.schema)

    name   birth_day favorite_color
0   john  1990-01-01            red
1    bob  1990-01-01         purple
2  alice  1990-01-01           navy
name: string
birth_day: string
favorite_color: string
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 613


In [17]:
# Target path on HDFS; the ``filesystem`` argument makes the writer resolve
# it against the HDFS connection instead of the local disk.
mock_file = "/tmp/hungnd8/test_kms/mock.parquet"
with pq.ParquetWriter(
    where=mock_file,
    filesystem=hdfs,
    encryption_properties=file_encryption_properties,
    schema=table.schema,
) as writer:
    writer.write_table(table)

In [21]:
%%bash
# List the HDFS target directory to confirm that the encrypted file exists.
hdfs dfs -ls hdfs://10.60.37.61:8020//tmp/hungnd8/test_kms/

Found 3 items
-rw-r--r--   3 zdeploy hdfs       6029 2024-05-10 17:21 hdfs://10.60.37.61:8020/tmp/hungnd8/test_kms/mock.parquet
-rw-r--r--   3 zdeploy hdfs       6029 2024-05-10 15:00 hdfs://10.60.37.61:8020/tmp/hungnd8/test_kms/mock_2.parquet
-rw-r--r--   3 zdeploy hdfs          0 2024-05-10 17:18 hdfs://10.60.37.61:8020/tmp/hungnd8/test_kms/mock_3.parquet


In [18]:
# Read the encrypted file back from HDFS through the project helper and
# convert the result to pandas. Note that this rebinds ``df``.
df = decrypt_file(
    fs=hdfs,
    location=mock_file,
    kms_connection_config=kms_connection_config,
    crypto_factory=crypto_factory,
).to_pandas()

In [20]:
# Show the first rows of the decrypted frame for a visual check.
df.head()

Unnamed: 0,name,birth_day,favorite_color
0,john,1990-01-01,red
1,bob,1990-01-01,purple
2,alice,1990-01-01,navy
