# Goal

This notebook takes one of our medium-large datasets, stored as a Delta Lake table, and copies the Parquet files that make up its latest version to a separate S3 prefix. The goal is to test data sharding with `ShardedByS3Key` in a SageMaker Processing Job.

The selected dataset is stored at `s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/`.
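In a Processing Job, the distribution mode is set per input. Below is a minimal sketch of one `ProcessingInputs` entry for boto3's `create_processing_job`, assuming the low-level API is used. The input name is hypothetical, and the S3 prefix is the copy destination used later in this notebook. With `ShardedByS3Key`, each of N instances receives roughly 1/N of the S3 objects instead of a full copy, so the number and size of the files determine how evenly the work is split.

```python
# Sketch of one ProcessingInputs entry for boto3's create_processing_job.
# Assumption: the input name below is illustrative, not an existing resource.

def sharded_processing_input(input_name: str, s3_uri: str,
                             local_path: str = "/opt/ml/processing/input") -> dict:
    """Build a Processing Job input that shards S3 objects across instances."""
    return {
        "InputName": input_name,
        "S3Input": {
            "S3Uri": s3_uri,
            "LocalPath": local_path,       # must live under /opt/ml/processing/
            "S3DataType": "S3Prefix",      # every object under the prefix is an input
            "S3InputMode": "File",         # objects are downloaded before the script starts
            # Each instance gets roughly 1/N of the objects instead of all of them.
            "S3DataDistributionType": "ShardedByS3Key",
        },
    }


processing_input = sharded_processing_input(
    "vector_corpus",  # hypothetical input name
    "s3://mvp-mlops-platform/poc-multi-instance-data-prep/",
)
print(processing_input)
```

With the SageMaker Python SDK, the equivalent is `ProcessingInput(source=..., destination=..., s3_data_distribution_type="ShardedByS3Key")`.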

## 1. Authentication to AWS

In [1]:
import boto3

ml_session = boto3.Session(profile_name="ml", region_name="us-east-1")

In [2]:
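import os

# Export the profile's temporary credentials as the standard AWS environment
# variables, so that libraries that don't use the boto3 session (deltalake, s3fs)
# can authenticate with them.
credentials = ml_session.get_credentials().get_frozen_credentials()
os.environ["AWS_ACCESS_KEY_ID"] = credentials.access_key
os.environ["AWS_SECRET_ACCESS_KEY"] = credentials.secret_key
if credentials.token:  # long-lived IAM user keys come without a session token
    os.environ["AWS_SESSION_TOKEN"] = credentials.token
# Setting the region explicitly lets deltalake resolve it without querying
# EC2 instance metadata (IMDS).
os.environ["AWS_REGION"] = ml_session.region_name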
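Another option avoids mutating the process environment: pass the credentials through `storage_options` instead. This is a minimal sketch. It assumes deltalake's S3 backend accepts the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN` and `AWS_REGION` keys, so check the delta-rs S3 storage documentation for your version. `build_storage_options` is a hypothetical helper.

```python
from typing import Optional


def build_storage_options(access_key: str, secret_key: str, token: Optional[str],
                          region: str, timeout: str = "3600s") -> dict:
    """Return a deltalake storage_options dict carrying explicit AWS credentials.

    Assumption: the key names follow the delta-rs S3 storage options.
    """
    options = {
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        "AWS_REGION": region,  # explicit region, no IMDS lookup needed
        "timeout": timeout,    # same timeout the notebook passes to DeltaTable
    }
    # Only temporary credentials carry a session token.
    if token:
        options["AWS_SESSION_TOKEN"] = token
    return options
```

Usage would be `DeltaTable(table_uri=..., storage_options=build_storage_options(credentials.access_key, credentials.secret_key, credentials.token, ml_session.region_name))`.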

## 2. Copy latest Delta Lake table to destination

In [3]:
from deltalake import DeltaTable

ORIGINAL_DATASET_S3_URI = "s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/"

dt = DeltaTable(
    table_uri=ORIGINAL_DATASET_S3_URI,
    storage_options={"timeout": "3600s"}
)

[90m[[0m2025-07-28T14:38:54Z [33mWARN [0m aws_config::imds::region[90m][0m failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: client error (Connect): HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper_util::client::legacy::Error(Connect, HttpTimeoutError { kind: "HTTP connect", duration: 1s }), connection: Unknown } }) }))
[90m[[0m2025-07-28T14:38:55Z [33mWARN [0m aws_config::imds::region[90m][0m failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: client error (Connect): HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper_util::client::legacy::Error(Connect, HttpTimeoutError { kind: "HTTP connect", duration: 1s }), conne

Retrieve the Parquet files that form the latest version of the table:

In [4]:
parquet_files = dt.file_uris()

print(f"Number of Parquet files forming the latest version of the Delta table: {len(parquet_files)}\n")
print(parquet_files[0])  # print only one sample

Number of Parquet files forming the latest version of the Delta table: 601

s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00001-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet
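`file_uris()` returns only the files that are live in the latest version. That matters because the table prefix can also hold files removed by older versions that have not been vacuumed yet, so copying everything under the prefix would be wrong. Conceptually, a reader replays the `_delta_log` commits: an `add` action brings a file into the table and a `remove` action drops it. Below is a simplified sketch of that replay. It assumes the commit files are already loaded as strings and ignores checkpoints and deletion vectors. `active_files` is a hypothetical helper, not part of `deltalake`.

```python
import json


def active_files(commit_logs: list) -> set:
    """Replay Delta commit files (oldest first) and return the paths still in the table.

    Each commit is newline-delimited JSON; only `add` and `remove` actions matter here.
    Checkpoints, deletion vectors and other action types are ignored in this sketch.
    """
    files = set()
    for commit in commit_logs:
        for line in commit.splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


# Two hypothetical commits: v0 writes two files, v1 rewrites one of them.
v0 = "\n".join([
    json.dumps({"add": {"path": "part-00001-a.snappy.parquet"}}),
    json.dumps({"add": {"path": "part-00002-a.snappy.parquet"}}),
])
v1 = "\n".join([
    json.dumps({"remove": {"path": "part-00002-a.snappy.parquet"}}),
    json.dumps({"add": {"path": "part-00001-b.snappy.parquet"}}),
])
print(sorted(active_files([v0, v1])))
# ['part-00001-a.snappy.parquet', 'part-00001-b.snappy.parquet']
```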


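Estimate the size of the Parquet files from their footer metadata. Note that `RowGroupMetaData.total_byte_size` is the size of the *uncompressed* column data in a row group. The figures below therefore approximate the decompressed data size, not the Snappy-compressed bytes stored on S3: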

In [5]:
import pyarrow.parquet as pq

size_per_parquet = []

for parquet_file in parquet_files[0:20]:  # sample only the first 20 files to speed things up, assuming all files have a similar size
    pa_parquet_file = pq.ParquetFile(parquet_file)
    metadata = pa_parquet_file.metadata

    total_row_groups_size_mb = 0
    for i in range(pa_parquet_file.num_row_groups):
        row_group = metadata.row_group(i)
        size_bytes = row_group.total_byte_size
        
        size_kb = size_bytes / 1024
        size_mb = size_kb / 1024

        total_row_groups_size_mb = total_row_groups_size_mb + size_mb

    size_per_parquet.append(total_row_groups_size_mb)


print(f"Avg. size (MB) per Parquet file = {sum(size_per_parquet) / len(size_per_parquet)}")
print(f"Max. size (MB) of a Parquet file = {max(size_per_parquet)}")
print(f"Min. size (MB) of a Parquet file = {min(size_per_parquet)}")

Avg. size (MB) per Parquet file = 97.03176679611207
Max. size (MB) of a Parquet file = 101.11428356170654
Min. size (MB) of a Parquet file = 71.83601951599121


Extrapolating from this sample to all the files in the table:

In [6]:
avg_size_per_parquet_mb = sum(size_per_parquet) / len(size_per_parquet)
num_parquet_files = len(parquet_files)

total_size_mb = avg_size_per_parquet_mb * num_parquet_files
total_size_gb = total_size_mb / 1024

print(f"Estimated total size (GB) of ALL the Parquet files (uncompressed) = {total_size_gb}")

Estimated total size (GB) of ALL the Parquet files (uncompressed) = 56.94930844185874
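The extrapolation above is just average times count. Below is a sketch that also reports a range, using the sample's min and max as bounds. These bounds only hold if the 581 unsampled files fall within the sampled range, which is an assumption because only the first 20 files were inspected. `extrapolate_total_gb` is a hypothetical helper.

```python
def extrapolate_total_gb(avg_mb: float, min_mb: float, max_mb: float, num_files: int) -> dict:
    """Scale per-file sample statistics (MB) up to the whole table (GB)."""
    return {
        "estimate_gb": avg_mb * num_files / 1024,
        # Valid only if every unsampled file lies within the sampled [min, max] range.
        "low_gb": min_mb * num_files / 1024,
        "high_gb": max_mb * num_files / 1024,
    }


# Sample statistics printed earlier for the first 20 files; 601 files in total.
estimates = extrapolate_total_gb(97.03176679611207, 71.83601951599121, 101.11428356170654, 601)
print({k: round(v, 1) for k, v in estimates.items()})
# {'estimate_gb': 56.9, 'low_gb': 42.2, 'high_gb': 59.3}
```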


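That is the uncompressed data size, which is not the same as the memory footprint once the data is loaded. To see how the two relate, measure the process's resident memory (RSS) before and after loading one file: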

In [7]:
import psutil

def log_memory():
    process = psutil.Process(os.getpid())
    musage = process.memory_info().rss / 1024**2
    print(f"Memory usage: {musage:.2f} MB")
    return musage

log_memory()

Memory usage: 163.16 MB


163.15625

In [8]:
import pandas as pd

df = pd.read_parquet(parquet_files[0])
log_memory()


Memory usage: 335.70 MB


335.703125

So, from this quick experiment: loading a single file increased the process's resident memory by about 172.5 MB (335.70 MB - 163.16 MB). Extrapolated to all 601 files, just holding the whole dataset in memory, with nothing else, would take roughly 601 × 172.5 MB ≈ 101 GB. In practice there is more work than loading the dataset (tokenization, etc.), so the real requirement is higher still. It grows further if several datasets must be held in memory at once.
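The RAM extrapolation can be written out explicitly. This sketch assumes the RSS growth from one file is representative of every file. `overhead_factor` is a hypothetical multiplier for extra work such as tokenization. RSS is a coarse, process-wide measure, so treat the result as an order of magnitude.

```python
def estimate_dataset_ram_gb(rss_before_mb: float, rss_after_mb: float,
                            num_files: int, overhead_factor: float = 1.0) -> float:
    """Extrapolate RAM (GB) needed to hold every file, from the RSS growth after one file."""
    per_file_mb = rss_after_mb - rss_before_mb
    return per_file_mb * num_files * overhead_factor / 1024


# RSS values returned by log_memory() before and after pd.read_parquet(parquet_files[0]).
print(round(estimate_dataset_ram_gb(163.15625, 335.703125, 601), 1))  # → 101.3
```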

Let's copy all the Parquet files of the latest version of the Delta table to a new destination:

In [10]:
print(parquet_files)

['s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00001-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet', 's3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00002-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet', 's3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00003-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet', 's3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00001-422c2dc0-a348-4896-b80c-8cbcc4ecbf0f-c000.snappy.parquet', 's3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00002-422c2dc0-a348-4896-b80c-8cbcc4ecbf0f-c000.snappy.parquet', 's3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00003-422c2dc0-a348-4896-b80c-8cbcc4ecbf0f-c000.snappy.parquet', 's3://ml-rd-ml-datasets/generateVectorEmbed/Q

In [None]:
s3_client = ml_session.client("s3")

source_bucket = "ml-rd-ml-datasets"
destination_bucket = "mvp-mlops-platform"
destination_prefix = "poc-multi-instance-data-prep/"

for parquet_file in parquet_files:
    print(f"Copying {parquet_file}...")

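    # Strip the "s3://<bucket>/" prefix to get the object key, then keep only the
    # file name so every file lands directly under the destination prefix.
    source_key = parquet_file.replace(f"s3://{source_bucket}/", "")
    destination_key = destination_prefix + source_key.split("/")[-1]

    # Server-side copy, no download needed; a single CopyObject call handles objects up to 5 GB.
    s3_client.copy_object(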
        Bucket=destination_bucket,
        CopySource={'Bucket': source_bucket, 'Key': source_key},
        Key=destination_key
    )

    print(f"{parquet_file} successfully copied!")

Copying s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00001-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet...
s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00001-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet successfully copied!
Copying s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00002-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet...
s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00002-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet successfully copied!
Copying s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00003-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snappy.parquet...
s3://ml-rd-ml-datasets/generateVectorEmbed/Qwen3-Embedding-0.6B/miracl/fr/vector_corpus/part-00003-84b8c9cf-8478-4437-a9a4-020ad0539959-c000.snap
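The loop above copies the 601 files one at a time. boto3 documents low-level clients as thread-safe, so the copies can also run in a thread pool. Below is a minimal sketch. `copy_all` and `destination_key` are hypothetical helpers, `max_workers=16` is an arbitrary choice, and the actual copy is injected as a function so the logic can be checked without S3. The sketch also guards against key collisions: flattening the paths drops directories, and two source files with the same name would silently overwrite each other.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def destination_key(parquet_uri: str, source_bucket: str, destination_prefix: str) -> str:
    """Map s3://<source_bucket>/<dirs>/<file> to <destination_prefix><file>."""
    source_key = parquet_uri.removeprefix(f"s3://{source_bucket}/")
    return destination_prefix + source_key.rsplit("/", 1)[-1]


def copy_all(parquet_uris: Iterable[str], source_bucket: str, destination_prefix: str,
             copy_fn: Callable[[str, str], None], max_workers: int = 16) -> list:
    """Run copy_fn(source_key, destination_key) concurrently for every file."""
    pairs = [(uri.removeprefix(f"s3://{source_bucket}/"),
              destination_key(uri, source_bucket, destination_prefix))
             for uri in parquet_uris]
    dest_keys = [dst for _, dst in pairs]
    if len(set(dest_keys)) != len(dest_keys):
        raise ValueError("Destination key collision after flattening file paths")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() drains the iterator so exceptions raised in workers surface here.
        list(pool.map(lambda pair: copy_fn(*pair), pairs))
    return dest_keys


# Possible usage with the notebook's client (not executed here):
# copy_all(parquet_files, "ml-rd-ml-datasets", "poc-multi-instance-data-prep/",
#          lambda src, dst: s3_client.copy_object(
#              Bucket="mvp-mlops-platform",
#              CopySource={"Bucket": "ml-rd-ml-datasets", "Key": src},
#              Key=dst))
```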

## 3. Create larger shards (~1,500 MB per shard)

Each shard concatenates 15 of the original files (15 × ~97 MB ≈ 1,455 MB of uncompressed data).

In [1]:
# s3 = ml_session.resource("s3")

# destination_bucket = "mvp-mlops-platform"
# destination_prefix = "poc-multi-instance-data-prep/"

# dest_bucket = s3.Bucket(destination_bucket)
# pq_files = [f"s3://{destination_bucket}/{x.key}" for x in dest_bucket.objects.filter(Prefix=destination_prefix)]
# pq_files = list(filter(lambda x: x.endswith(".parquet"), pq_files))
# pq_files

In [2]:
# import gc
# import math
#
# import pyarrow.dataset as ds
# import pyarrow.parquet as pq
# import s3fs
#
# s3 = s3fs.S3FileSystem()  # picks up the AWS_* environment variables set above
#
# NUM_FILES_PER_SHARD = 15  # 15 files x ~97 MB ≈ 1,500 MB per shard
# num_shards = math.ceil(len(pq_files) / NUM_FILES_PER_SHARD)
#
# for num_shard, i in enumerate(range(0, len(pq_files), NUM_FILES_PER_SHARD)):
#     print(f"Writing shard {num_shard} ({num_shard + 1}/{num_shards})")
#
#     # Read this group of files into a single in-memory Arrow table.
#     files_per_shard = pq_files[i: i + NUM_FILES_PER_SHARD]
#     dataset = ds.dataset(files_per_shard, format="parquet", filesystem=s3)
#     shard_table = dataset.to_table()
#
#     # Write the group back out as one Parquet file.
#     output_file = f"s3://{destination_bucket}/poc-multi-instance-data-prep-shards/shard_{num_shard}.parquet"
#     pq.write_table(shard_table, output_file, filesystem=s3)
#     print(f"Shard {num_shard} successfully written!")
#
#     # Free the table before reading the next group.
#     del shard_table, dataset
#     gc.collect()
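The cell above groups a fixed number of files (15) per shard, which lands near the 1,500 MB target only because the files have similar sizes. If sizes vary more, grouping by cumulative size keeps shards closer to the target. `ShardedByS3Key` hands out whole objects, so similarly sized shards should help balance the work across instances. Below is a greedy sketch. It assumes per-file sizes are known, for example from the `Size` field of an S3 listing. `group_into_shards` is a hypothetical helper.

```python
import math


def group_into_shards(file_sizes_mb: dict, target_mb: float = 1500) -> list:
    """Greedily pack files (in the given order) into shards of at most ~target_mb each.

    A single file larger than target_mb still gets a shard of its own.
    """
    shards, current, current_mb = [], [], 0.0
    for name, size_mb in file_sizes_mb.items():
        # Close the current shard if this file would push it over the target.
        if current and current_mb + size_mb > target_mb:
            shards.append(current)
            current, current_mb = [], 0.0
        current.append(name)
        current_mb += size_mb
    if current:
        shards.append(current)
    return shards


# Hypothetical sizes: 601 files of 100 MB each.
sizes = {f"part-{i:05d}.parquet": 100.0 for i in range(601)}
shards = group_into_shards(sizes)
print(len(shards), len(shards[0]), len(shards[-1]))  # → 41 15 1
print(math.ceil(601 / 15))  # fixed-count grouping with 15 files per shard → 41
```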
