In [1]:
import base64
from dotenv import load_dotenv

from constants import *
from utility_functions import *
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa

In [2]:
# Load variables from a local .env file into the process environment.
# NOTE(review): presumably the AWS helper functions used below read their
# settings from the environment — confirm against utility_functions/constants.
load_dotenv()

True

In [3]:
# Obtain one set of credentials per role, in the order ENGINEER, HR, ADMIN.
# Despite the `_s3_` in the names, the same credentials are reused further
# down to build the KMS clients as well.
engineer_s3_credentials, hr_s3_credentials, admin_s3_credentials = (
    get_aws_credentials(role) for role in (ROLE_ENGINEER, ROLE_HR, ROLE_ADMIN)
)

Assumed ENGINEER role successfully.
Assumed HR role successfully.
Assumed ADMIN role successfully.


In [4]:
# One S3 client per role, each built from that role's credentials.
engineer_s3_client, hr_s3_client, admin_s3_client = (
    create_aws_client_for(role_credentials, "s3")
    for role_credentials in (
        engineer_s3_credentials,
        hr_s3_credentials,
        admin_s3_credentials,
    )
)

In [5]:
# List the bucket contents once per role so the listings can be compared.
for role_s3_client in (engineer_s3_client, hr_s3_client, admin_s3_client):
    list_s3_bucket_objects(role_s3_client)

Objects in bucket 's3-rbac-in-data-lakes-experiments':
 - employee_data_encrypted_locally.parquet
 - employee_data_encrypted_modular.parquet
 - employee_data_raw.parquet
Objects in bucket 's3-rbac-in-data-lakes-experiments':
 - employee_data_encrypted_locally.parquet
 - employee_data_encrypted_modular.parquet
 - employee_data_raw.parquet
Objects in bucket 's3-rbac-in-data-lakes-experiments':
 - employee_data_encrypted_locally.parquet
 - employee_data_encrypted_modular.parquet
 - employee_data_raw.parquet


In [6]:
# One KMS client per role, reusing the credentials obtained for each role above.
engineer_kms_client, hr_kms_client, admin_kms_client = (
    create_aws_client_for(role_credentials, "kms")
    for role_credentials in (
        engineer_s3_credentials,
        hr_s3_credentials,
        admin_s3_credentials,
    )
)

In [7]:
# Show the raw KMS ListKeys response as seen by the ADMIN role's client.
# NOTE(review): the response contains key ARNs that embed the AWS account ID —
# consider clearing this output before sharing the notebook.
admin_kms_client.list_keys()

{'Keys': [{'KeyId': '5722129f-2136-4ef4-8b53-5a242b553f34',
   'KeyArn': 'arn:aws:kms:eu-north-1:501994300007:key/5722129f-2136-4ef4-8b53-5a242b553f34'},
  {'KeyId': '86c41c3f-fc21-4730-a20b-b755e5b63ebb',
   'KeyArn': 'arn:aws:kms:eu-north-1:501994300007:key/86c41c3f-fc21-4730-a20b-b755e5b63ebb'},
  {'KeyId': 'd229ff0a-b839-4732-9dd8-602c38a4487b',
   'KeyArn': 'arn:aws:kms:eu-north-1:501994300007:key/d229ff0a-b839-4732-9dd8-602c38a4487b'}],
 'Truncated': False,
 'ResponseMetadata': {'RequestId': '12d5791e-a477-4a8a-add0-bb4a5c935145',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '12d5791e-a477-4a8a-add0-bb4a5c935145',
   'cache-control': 'no-cache, no-store, must-revalidate, private',
   'expires': '0',
   'pragma': 'no-cache',
   'date': 'Mon, 08 Dec 2025 20:37:31 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '452',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

In [9]:
# Key IDs reported as usable for the ENGINEER role's KMS client.
# NOTE(review): usable_keys is an imported helper; how it decides "usable"
# is not visible here — presumably a per-key permission probe, confirm.
usable_keys(engineer_kms_client)

['d229ff0a-b839-4732-9dd8-602c38a4487b']

In [10]:
# Key IDs reported as usable for the HR role's KMS client.
# NOTE(review): usable_keys is an imported helper; how it decides "usable"
# is not visible here — presumably a per-key permission probe, confirm.
usable_keys(hr_kms_client)

['86c41c3f-fc21-4730-a20b-b755e5b63ebb',
 'd229ff0a-b839-4732-9dd8-602c38a4487b']

In [11]:
# Key IDs reported as usable for the ADMIN role's KMS client.
# NOTE(review): usable_keys is an imported helper; how it decides "usable"
# is not visible here — presumably a per-key permission probe, confirm.
usable_keys(admin_kms_client)

['5722129f-2136-4ef4-8b53-5a242b553f34',
 '86c41c3f-fc21-4730-a20b-b755e5b63ebb',
 'd229ff0a-b839-4732-9dd8-602c38a4487b']

## Create a Parquet file with encrypted columns (Parquet modular encryption)

In [12]:
# Crypto factory backed by the ADMIN role's KMS client; it is used below to
# derive the file encryption properties for the Parquet writer.
admin_crypto_factory = make_crypto_factory_for_kms(admin_kms_client)

In [13]:
# Map each master-key identifier to the columns it protects.
column_master_keys = {
    "salary-key": ["Salary"],
    "password-key": ["Password"],
}

# The footer is encrypted too (plaintext_footer=False) under "file-access-key".
encryption_config = pe.EncryptionConfiguration(
    footer_key="file-access-key",
    column_keys=column_master_keys,
    plaintext_footer=False,
)

In [None]:
# KMS connection settings handed to the crypto factories; both the instance
# id and the endpoint URL are derived from the configured REGION.
kms_conn_config = pe.KmsConnectionConfig(
    kms_instance_url=f"https://kms.{REGION}.amazonaws.com",
    kms_instance_id=f"aws-kms-{REGION}",
)

In [15]:
# Combine the KMS connection settings with the per-column key layout into the
# encryption properties consumed by the Parquet writer below.
file_encryption_props = admin_crypto_factory.file_encryption_properties(
    kms_conn_config, encryption_config
)

In [16]:
# Load the raw (plaintext) employee table from CSV. `df` is the source for
# both the encrypted-Parquet write and the per-value encryption further down.
df = pd.read_csv(EMPLOYEE_DATA_RAW_CSV_PATH)
df

Unnamed: 0,ID,Name,Email,Department,Salary,Password
0,1,Alice,alice@example.com,HR,55000,DummyPassword1
1,2,Bob,bob@example.com,Engineering,72000,DummyPassword2
2,3,Charlie,charlie@example.com,Marketing,63000,DummyPassword3
3,4,David,david@example.com,Finance,80000,DummyPassword4
4,5,Eva,eva@example.com,Engineering,75000,DummyPassword5
5,6,Frank,frank@example.com,HR,70000,DummyPassword6
6,7,Grace,grace@example.com,Sales,85000,DummyPassword7
7,8,Hannah,hannah@example.com,Finance,65000,DummyPassword8
8,9,Ian,ian@example.com,Marketing,54800,DummyPassword9
9,10,Julia,julia@example.com,Sales,74500,DummyPassword10


In [17]:
# Convert the DataFrame to an Arrow table, then write it through a writer that
# applies the encryption properties built above.
table = pa.Table.from_pandas(df)

with pq.ParquetWriter(
    where=EMPLOYEE_DATA_ENCRYPTED_MODULAR_PATH,
    schema=table.schema,
    encryption_properties=file_encryption_props,
) as parquet_writer:
    parquet_writer.write_table(table)


In [18]:
def read_parquet_best_effort_for_role(
    kms_client,
    parquet_path: str,
    skip_unreadable_columns: bool = True,
    kms_connection_config=None,
):
    """
    Read an encrypted Parquet file with the KMS permissions of one role.

    Parameters
    ----------
    kms_client
        KMS client of the role; passed to ``make_crypto_factory_for_kms``.
    parquet_path : str
        Path of the encrypted Parquet file to read.
    skip_unreadable_columns : bool, default True
        * True: every column is probed on its own and only the columns that
          could be read are returned.
        * False: the whole table is read in a single call; if that read
          fails, the error is printed and None is returned.
    kms_connection_config : optional
        KMS connection config used to build the decryption properties.
        When omitted, the notebook-level ``kms_conn_config`` is used.

    Returns
    -------
    pandas.DataFrame or None
        The readable data, or None when
        * the file cannot be opened (e.g. the role cannot decrypt the footer),
        * the full read fails while ``skip_unreadable_columns`` is False, or
        * not a single column could be read.

    Notes
    -----
    Errors raised while opening or reading the file are reported with
    ``print`` and converted into a None return; they are not re-raised.
    """
    print(f"\n=== Reading {parquet_path} with role KMS client {kms_client} ===")

    if kms_connection_config is None:
        # Backward-compatible default: fall back to the notebook-level config.
        kms_connection_config = kms_conn_config

    crypto_factory = make_crypto_factory_for_kms(kms_client)
    decryption_props = crypto_factory.file_decryption_properties(kms_connection_config)

    try:
        pf = pq.ParquetFile(
            parquet_path,
            decryption_properties=decryption_props
        )
    except Exception as e:
        # Deliberately broad: any failure to open means "no access" for this demo.
        print(f"Failed to open Parquet file: {e}")
        return None

    schema = pf.schema
    print("File columns:", schema.names)

    if not skip_unreadable_columns:
        # All-or-nothing mode: a single undecryptable column fails the whole read.
        try:
            table = pf.read()
            return table.to_pandas()
        except Exception as e:
            print(f"Failed to read full table: {e}")
            return None

    # Probe columns one by one; the probe result is discarded, only success matters.
    readable_cols = []
    for name in schema.names:
        try:
            pf.read(columns=[name])
            readable_cols.append(name)
        except Exception as e:
            print(f"Skipping column {name} due to decryption error: {e}")

    if not readable_cols:
        print("No readable columns for this role.")
        return None

    # Read the surviving columns together so the result is one coherent table.
    table = pf.read(columns=readable_cols)
    # Named readable_df so it does not shadow the notebook-level `df`.
    readable_df = table.to_pandas()
    print(f"Readable columns for this role: {readable_cols}")
    return readable_df

In [19]:
print("\n--- ADMIN READ (should see all columns) ---")
admin_df = read_parquet_best_effort_for_role(
    kms_client=admin_kms_client,
    parquet_path=EMPLOYEE_DATA_ENCRYPTED_MODULAR_PATH,
)
# The reader returns None when nothing could be read; only preview real data.
if admin_df is not None:
    print(admin_df.head())


--- ADMIN READ (should see all columns) ---

=== Reading ../data/employee_data_encrypted_modular.parquet with role KMS client <botocore.client.KMS object at 0x0000017F5F356E30> ===
File columns: ['ID', 'Name', 'Email', 'Department', 'Salary', 'Password']
Readable columns for this role: ['ID', 'Name', 'Email', 'Department', 'Salary', 'Password']
   ID     Name                Email   Department  Salary        Password
0   1    Alice    alice@example.com           HR   55000  DummyPassword1
1   2      Bob      bob@example.com  Engineering   72000  DummyPassword2
2   3  Charlie  charlie@example.com    Marketing   63000  DummyPassword3
3   4    David    david@example.com      Finance   80000  DummyPassword4
4   5      Eva      eva@example.com  Engineering   75000  DummyPassword5


In [20]:
print("\n--- HR READ (depends on KMS policy: likely Salary but not Password) ---")
hr_df = read_parquet_best_effort_for_role(
    kms_client=hr_kms_client,
    parquet_path=EMPLOYEE_DATA_ENCRYPTED_MODULAR_PATH,
)
# The reader returns None when nothing could be read; only preview real data.
if hr_df is not None:
    print(hr_df.head())


--- HR READ (depends on KMS policy: likely Salary but not Password) ---

=== Reading ../data/employee_data_encrypted_modular.parquet with role KMS client <botocore.client.KMS object at 0x0000017F5E28E650> ===
File columns: ['ID', 'Name', 'Email', 'Department', 'Salary', 'Password']
Skipping column Password due to decryption error: An error occurred (AccessDeniedException) when calling the Decrypt operation: User: arn:aws:sts::501994300007:assumed-role/HR/HR-notebook-session is not authorized to perform: kms:Decrypt on resource: arn:aws:kms:eu-north-1:501994300007:key/5722129f-2136-4ef4-8b53-5a242b553f34 because no identity-based policy allows the kms:Decrypt action
Readable columns for this role: ['ID', 'Name', 'Email', 'Department', 'Salary']
   ID     Name                Email   Department  Salary
0   1    Alice    alice@example.com           HR   55000
1   2      Bob      bob@example.com  Engineering   72000
2   3  Charlie  charlie@example.com    Marketing   63000
3   4    David   

In [21]:
print("\n--- ENGINEER READ (depends on KMS policy) ---")
engineer_df = read_parquet_best_effort_for_role(
    kms_client=engineer_kms_client,
    parquet_path=EMPLOYEE_DATA_ENCRYPTED_MODULAR_PATH,
)
# The reader returns None when nothing could be read; only preview real data.
if engineer_df is not None:
    print(engineer_df.head())


--- ENGINEER READ (depends on KMS policy) ---

=== Reading ../data/employee_data_encrypted_modular.parquet with role KMS client <botocore.client.KMS object at 0x0000017F5E249E70> ===
File columns: ['ID', 'Name', 'Email', 'Department', 'Salary', 'Password']
Skipping column Salary due to decryption error: An error occurred (AccessDeniedException) when calling the Decrypt operation: User: arn:aws:sts::501994300007:assumed-role/ENGINEER/ENGINEER-notebook-session is not authorized to perform: kms:Decrypt on resource: arn:aws:kms:eu-north-1:501994300007:key/86c41c3f-fc21-4730-a20b-b755e5b63ebb because no identity-based policy allows the kms:Decrypt action
Skipping column Password due to decryption error: An error occurred (AccessDeniedException) when calling the Decrypt operation: User: arn:aws:sts::501994300007:assumed-role/ENGINEER/ENGINEER-notebook-session is not authorized to perform: kms:Decrypt on resource: arn:aws:kms:eu-north-1:501994300007:key/5722129f-2136-4ef4-8b53-5a242b553f34 b

In [22]:
# Push the column-encrypted Parquet file to the bucket using the ADMIN role.
admin_s3_client.upload_file(
    Filename=EMPLOYEE_DATA_ENCRYPTED_MODULAR_PATH,
    Bucket=BUCKET_NAME,
    Key=EMPLOYEE_DATA_ENCRYPTED_MODULAR_KEY,
)
print(f"\nUploaded encrypted Parquet to s3://{BUCKET_NAME}/{EMPLOYEE_DATA_ENCRYPTED_MODULAR_KEY}")


Uploaded encrypted Parquet to s3://s3-rbac-in-data-lakes-experiments/employee_data_encrypted_modular.parquet


## Encrypt employee data locally

In [23]:
def encrypt_with_kms(value, key_id, kms):
    """
    Encrypt a single value with a KMS key.

    The value is converted with ``str()`` and UTF-8 encoded before being sent
    to ``kms.encrypt``. The returned ciphertext blob is base64-encoded and
    returned as text. Missing values (as judged by ``pd.isna``) are not
    encrypted; None is returned for them.
    """
    if not pd.isna(value):
        plaintext_bytes = str(value).encode("utf-8")
        response = kms.encrypt(KeyId=key_id, Plaintext=plaintext_bytes)
        encoded_blob = base64.b64encode(response["CiphertextBlob"])
        return encoded_blob.decode("utf-8")

    # Missing input stays missing.
    return None


In [24]:
# Create encrypted columns
df_encrypted = df.copy()
df_encrypted["Salary"] = df["Salary"].apply(lambda x: encrypt_with_kms(x, SALARY_KEY_ID, admin_kms_client))
df_encrypted["Password"] = df["Password"].apply(lambda x: encrypt_with_kms(x, PASSWORD_KEY_ID, admin_kms_client))

df_encrypted

Unnamed: 0,ID,Name,Email,Department,Salary,Password
0,1,Alice,alice@example.com,HR,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
1,2,Bob,bob@example.com,Engineering,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
2,3,Charlie,charlie@example.com,Marketing,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
3,4,David,david@example.com,Finance,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
4,5,Eva,eva@example.com,Engineering,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
5,6,Frank,frank@example.com,HR,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
6,7,Grace,grace@example.com,Sales,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
7,8,Hannah,hannah@example.com,Finance,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
8,9,Ian,ian@example.com,Marketing,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...
9,10,Julia,julia@example.com,Sales,AQICAHh3ZcDR7KW29w3WBXby4fktfVh0IQ0If41GjOu1gT...,AQICAHhEEOu0doKRSA3zwSAezttR/w1AceU4yqUZ0bKbGI...


In [25]:
def decrypt_with_kms(ciphertext_b64, kms):
    """
    Decrypt a base64-encoded KMS ciphertext and return the plaintext as text.

    Counterpart of ``encrypt_with_kms``: the input is base64-decoded, sent to
    ``kms.decrypt`` and the resulting plaintext bytes are decoded as UTF-8.
    No key id is passed to ``kms.decrypt``; only the ciphertext blob is sent.

    Missing values (None/NaN, which ``encrypt_with_kms`` emits for missing
    inputs) are passed through as None instead of failing in ``b64decode``.

    Errors raised by ``kms.decrypt`` (for example when the role lacks
    permission on the key) are not caught here.
    """
    if pd.isna(ciphertext_b64):
        return None

    blob = base64.b64decode(ciphertext_b64)
    resp = kms.decrypt(CiphertextBlob=blob)
    return resp["Plaintext"].decode("utf-8")

In [26]:
# Sanity check: decrypt the first row's encrypted Salary value using the HR
# role's KMS client.
decrypt_with_kms(df_encrypted.loc[0, "Salary"], hr_kms_client)

'55000'

In [27]:
# Persist the frame with per-value encrypted columns as an ordinary Parquet
# file; the DataFrame index is not written.
output_parquet_path = EMPLOYEE_DATA_ENCRYPTED_LOCALLY_PATH
df_encrypted.to_parquet(path=output_parquet_path, index=False)
print(f"Written Parquet to {output_parquet_path}")

Written Parquet to ../data/employee_data_encrypted_locally.parquet


In [28]:
# Upload the locally encrypted Parquet file to the bucket using the ADMIN role.
admin_s3_client.upload_file(output_parquet_path, BUCKET_NAME, EMPLOYEE_DATA_ENCRYPTED_LOCALLY_KEY)
# Report the object key that was actually used for the upload; the local file
# path is not part of the S3 URI.
print(f"Uploaded to s3://{BUCKET_NAME}/{EMPLOYEE_DATA_ENCRYPTED_LOCALLY_KEY}")

Uploaded to s3://s3-rbac-in-data-lakes-experiments/../data/employee_data_encrypted_locally.parquet
