### Apache Iceberg Performance Evaluation

This notebook compares the performance of an Apache Iceberg table. We will explore different types table organization (wide, narrow, partitioned) as well as differnet search engines (DubckDB, Trino, pyarrow).

Load aa dataset. This a large MultiQC real-life run split into 8 different sub-runs to demonstrate working with multiple parquet files.

In [1]:
# DATASET = "reallife"
DATASET = "simulated"
FORMAT = "wide"
EXPERIMENT_NAME = f"{FORMAT}_10runs_10mod_5sec_100samples_50metrics"
BUCKET_NAME = "vlad-megaqc"
# EXPERIMENT_NAME = "Petropoulus_2016_wide"
# LOCAL_PARQUET_GLOB = "/Users/vlad/git/evals/data/reallife/run*/multiqc_data/multiqc.parquet"
# LOCAL_PARQUET_GLOB = f"/Users/vlad/git/evals/data/{DATASET}/{EXPERIMENT_NAME}/{FORMAT}_format/*.parquet"

In [13]:
import glob
import os
import natsort

import boto3
import pyarrow.dataset as ds
import pyarrow.fs as fs
from dotenv import load_dotenv
from pyiceberg.catalog import load_catalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.catalog.glue import GlueCatalog

load_dotenv()

CATALOG_NAME = "megaqc-test"
NAMESPACE = DATASET
TABLE_NAME = EXPERIMENT_NAME

# Configure Cloudflare R2 connection
TOKEN = os.environ["CLOUDFLARE_TOKEN"]
ACCESS_KEY_ID = os.environ["CLOUDFLARE_ACCESS_KEY_ID"]
SECRET_ACCESS_KEY = os.environ["CLOUDFLARE_SECRET_ACCESS_KEY"]
WAREHOUSE = "8234d07c9c28a6f6c380fe45731ba8e4_megaqc-test"
CATALOG_URI = "https://catalog.cloudflarestorage.com/8234d07c9c28a6f6c380fe45731ba8e4/megaqc-test"
R2_ENDPOINT_URL = "https://8234d07c9c28a6f6c380fe45731ba8e4.r2.cloudflarestorage.com"
R2_BUCKET_NAME = "megaqc-test"

# Configure AWS S3 connection
S3_BUCKET_NAME = "vlad-megaqc"
s3_prefix = f"{DATASET}/{EXPERIMENT_NAME}/"
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
AWS_REGION = os.environ["AWS_REGION"]
S3_WAREHOUSE_LOCATION = f"s3://{S3_BUCKET_NAME}/warehouse/{DATASET}/{EXPERIMENT_NAME}/"

### Upload data to r2

In [7]:
print(f"Uploading data from {LOCAL_PARQUET_GLOB} to R2 bucket {BUCKET_NAME}")

# Create boto3 S3 client with Cloudflare R2 configuration
s3_client = boto3.client(
    "s3",
    endpoint_url=R2_ENDPOINT_URL,
    aws_access_key_id=ACCESS_KEY_ID,
    aws_secret_access_key=SECRET_ACCESS_KEY,
    region_name="auto",
)

# Delete existing files in the bucket with the current prefix
print(f"Deleting existing files with prefix '{s3_prefix}' from bucket '{BUCKET_NAME}'")
paginator = s3_client.get_paginator("list_objects_v2")
deleted_count = 0
for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=s3_prefix):
    if "Contents" in page:
        objects_to_delete = [{"Key": obj["Key"]} for obj in page["Contents"]]
        if objects_to_delete:
            s3_client.delete_objects(Bucket=BUCKET_NAME, Delete={"Objects": objects_to_delete})
            deleted_count += len(objects_to_delete)

print(f"Deleted {deleted_count} objects from R2 bucket")

files_to_upload = glob.glob(str(LOCAL_PARQUET_GLOB))
# natsort the files
files_to_upload = natsort.natsorted(files_to_upload)
print(f"Uploading {len(files_to_upload)} files to R2")

# Upload each file to R2
for i, file_path in enumerate(sorted(files_to_upload)):
    file_name = os.path.basename(file_path)
    s3_key = f"{s3_prefix}{i + 1}/{file_name}"
    print(f"Uploading {file_path} to {s3_key}")

    with open(file_path, "rb") as file_data:
        s3_client.upload_fileobj(file_data, BUCKET_NAME, s3_key)

print(f"Upload of {len(files_to_upload)} files completed successfully")


Uploading data from /Users/vlad/git/evals/data/simulated/wide_2runs_10mod_5sec_100samples_50metrics/wide_format/*.parquet to R2 bucket megaqc-test
Deleting existing files with prefix 'simulated/wide_2runs_10mod_5sec_100samples_50metrics/parquet/' from bucket 'megaqc-test'
Deleted 0 objects from R2 bucket
Uploading 2 files to R2
Uploading /Users/vlad/git/evals/data/simulated/wide_2runs_10mod_5sec_100samples_50metrics/wide_format/multiqc_run_1.parquet to simulated/wide_2runs_10mod_5sec_100samples_50metrics/parquet/1/multiqc_run_1.parquet
Uploading /Users/vlad/git/evals/data/simulated/wide_2runs_10mod_5sec_100samples_50metrics/wide_format/multiqc_run_2.parquet to simulated/wide_2runs_10mod_5sec_100samples_50metrics/parquet/2/multiqc_run_2.parquet
Upload of 2 files completed successfully


### Create catalog

In [8]:
DROP_FIRST = True
TYPE = "glue"

In [16]:
if TYPE == "rest":
    # Connect to R2 Data Catalog
    catalog = RestCatalog(
        name=CATALOG_NAME,
        token=TOKEN,
        warehouse=WAREHOUSE,
        uri=CATALOG_URI,
    )
elif TYPE == "glue":
    catalog = GlueCatalog(
        name="glue",
        boto3_session=boto3.Session(
            aws_access_key_id=ACCESS_KEY_ID,
            aws_secret_access_key=SECRET_ACCESS_KEY,
            region_name=AWS_REGION,
        ),
        warehouse=S3_WAREHOUSE_LOCATION,
    )
else:
    raise ValueError(f"Invalid catalog type: {TYPE}")

# Verify auth is working by trying to list namespaces
try:
    namespaces = catalog.list_namespaces()
    print(f"Successfully authenticated. Available namespaces: {namespaces}")
except Exception as e:
    print(f"Authentication error: {e}")
    raise

# Create namespace if it doesn't exist
catalog.create_namespace_if_not_exists(NAMESPACE)

try:
    print(f"Dropping existing table: {NAMESPACE}.{TABLE_NAME}")
    # Use direct API call with headers if needed
    catalog.drop_table((NAMESPACE, TABLE_NAME))
    print(f"Table {NAMESPACE}.{TABLE_NAME} dropped successfully")
except NoSuchTableError:
    print(f"Table {NAMESPACE}.{TABLE_NAME} does not exist, nothing to drop")
except Exception as e:
    print(f"Error dropping table: {e}")
    # Continue with the script even if drop fails

if TYPE == "rest":
    # Create S3 filesystem for PyArrow to access R2
    s3_fs = fs.S3FileSystem(
        endpoint_override=R2_ENDPOINT_URL, access_key=ACCESS_KEY_ID, secret_key=SECRET_ACCESS_KEY, region="auto"
    )
    # Load from R2 bucket using PyArrow
    r2_path = f"{BUCKET_NAME}/{s3_prefix}"
    print(f"Creating PyArrow dataset from R2 bucket at {r2_path}")

    # Create dataset from R2 bucket
    dataset = ds.dataset(r2_path, format="parquet", filesystem=s3_fs)
else:
    # Create S3 filesystem for PyArrow to access S3
    s3_fs = fs.S3FileSystem(
        access_key=AWS_ACCESS_KEY_ID, secret_key=AWS_SECRET_ACCESS_KEY, region=AWS_REGION
    )
    # Load from S3 bucket using PyArrow
    s3_path = f"{S3_BUCKET_NAME}/{s3_prefix}"
    print(f"Creating PyArrow dataset from S3 bucket at {s3_path}")
    dataset = ds.dataset(s3_path, format="parquet", filesystem=s3_fs)

# Convert to PyArrow table
pa_table = dataset.to_table()
print(f"Loaded PyArrow table with schema: {pa_table.schema}")

# Instead of dropping, try to recreate by first checking if exists
table_exists = False
try:
    catalog.load_table((NAMESPACE, TABLE_NAME))
    table_exists = True
    print(f"Table {NAMESPACE}.{TABLE_NAME} already exists")
except NoSuchTableError:
    table_exists = False
    print(f"Table {NAMESPACE}.{TABLE_NAME} doesn't exist, will create new")

# If table exists and we couldn't drop it, try to create a new table with a different name
if table_exists and DROP_FIRST:
    # Generate a unique name with timestamp
    import datetime

    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    new_table_name = f"{TABLE_NAME}_{timestamp}"
    print(f"Creating new table with unique name: {new_table_name}")
    TABLE_NAME = new_table_name

# Create the table
print(f"Creating Iceberg table: {NAMESPACE}.{TABLE_NAME}")
try:
    # Create new table with explicit properties
    iceberg_table = catalog.create_table(
        identifier=(NAMESPACE, TABLE_NAME),
        schema=pa_table.schema,
    )
    print(f"Successfully created table {NAMESPACE}.{TABLE_NAME}")
except Exception as e:
    print(f"Error creating table: {e}")
    # Try fallback approach
    print("Trying alternate approach with create_table_if_not_exists...")
    iceberg_table = catalog.create_table_if_not_exists(
        identifier=(NAMESPACE, TABLE_NAME),
        schema=pa_table.schema,
    )

Successfully authenticated. Available namespaces: [('megaqc-test',), ('simulated',)]
Dropping existing table: simulated.wide_10runs_10mod_5sec_100samples_50metrics
Table simulated.wide_10runs_10mod_5sec_100samples_50metrics does not exist, nothing to drop
Creating PyArrow dataset from S3 bucket at vlad-megaqc/simulated/wide_10runs_10mod_5sec_100samples_50metrics/
Loaded PyArrow table with schema: type: string
creation_date: timestamp[us]
run_id: int64
anchor: string
plot_type: string
plot_input_data: string
section_key: string
sample_name: string
data_sources: string
multiqc_version: string
modules: double
col_ZBreGuni: double
col_ZBreGuni_str: string
col_RGqTtNIX: double
col_RGqTtNIX_str: string
col_QugIEqBf: double
col_QugIEqBf_str: string
col_hOReKEuV: double
col_hOReKEuV_str: string
col_GnncXIZv: double
col_GnncXIZv_str: string
col_wCrCQpWh: double
col_wCrCQpWh_str: string
col_eBBOazsk: double
col_eBBOazsk_str: string
col_inNHoQSp: double
col_inNHoQSp_str: string
col_zYLsGhci: doub

### Create Glue catalog

### Add table to the index

In [18]:
iceberg_table = catalog.load_table((NAMESPACE, TABLE_NAME))
print(f"Appending {pa_table.num_rows} rows to Iceberg table")
iceberg_table.append(pa_table)

Appending 50510 rows to Iceberg table


### Querying

In [19]:
if TYPE == "rest":
    catalog = load_catalog(
        CATALOG_NAME,
        type="rest",
        uri=CATALOG_URI,
        warehouse=WAREHOUSE,
        token=TOKEN,
    )
elif TYPE == "glue":
    catalog = load_catalog(
        CATALOG_NAME,
        type="glue",
        uri=CATALOG_URI,
        warehouse=S3_WAREHOUSE_LOCATION,
    )
iceberg_table = catalog.load_table((NAMESPACE, TABLE_NAME))

#### With Polars

In [20]:
import polars as pl
df = iceberg_table.to_polars()
df.count().collect()

type,creation_date,run_id,anchor,plot_type,plot_input_data,section_key,sample_name,data_sources,multiqc_version,modules,col_ZBreGuni,col_ZBreGuni_str,col_RGqTtNIX,col_RGqTtNIX_str,col_QugIEqBf,col_QugIEqBf_str,col_hOReKEuV,col_hOReKEuV_str,col_GnncXIZv,col_GnncXIZv_str,col_wCrCQpWh,col_wCrCQpWh_str,col_eBBOazsk,col_eBBOazsk_str,col_inNHoQSp,col_inNHoQSp_str,col_zYLsGhci,col_zYLsGhci_str,col_ajnHDGFy,col_ajnHDGFy_str,col_RPCLrgvU,col_RPCLrgvU_str,col_DTSIvdOU,col_DTSIvdOU_str,col_oMDkdXtN,col_oMDkdXtN_str,…,col_EFtkrjSL_str,col_HfzVbXXE,col_HfzVbXXE_str,col_yStqGyjL,col_yStqGyjL_str,col_tzFuKFhp,col_tzFuKFhp_str,col_yzYxuKcY,col_yzYxuKcY_str,col_xWuwaZdv,col_xWuwaZdv_str,col_XtFwlRoq,col_XtFwlRoq_str,col_HnGGRhaY,col_HnGGRhaY_str,col_XDkJvVsU,col_XDkJvVsU_str,col_zPAmKlAF,col_zPAmKlAF_str,col_MRGOMtQg,col_MRGOMtQg_str,col_LdIPzavf,col_LdIPzavf_str,col_kvPHJeDF,col_kvPHJeDF_str,col_cepRQZWz,col_cepRQZWz_str,col_eFTfQyyL,col_eFTfQyyL_str,col_LDCFNczn,col_LDCFNczn_str,col_RvwhILHh,col_RvwhILHh_str,col_OQDoxvlo,col_OQDoxvlo_str,col_pLinLScc,col_pLinLScc_str
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,…,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
50510,50510,50510,50500,50500,500,50000,50000,50000,50000,0,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,…,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000,50000


In [60]:
df.filter(pl.col("type") == "table_row").select(pl.col("col_total_sequences_val")).describe()

statistic,col_total_sequences_val
str,f64
"""count""",1529.0
"""null_count""",30580.0
"""mean""",7.15351
"""std""",5.383347
"""min""",0.068885
"""25%""",3.596049
"""50%""",6.41474
"""75%""",9.392508
"""max""",68.397426


In [None]:
print("Unique sections in the table:")
unique_sections = df.filter(pl.col('type') == 'plot_input').select(pl.col('anchor')).unique()
print(unique_sections.collect())

Unique sections in the table:
shape: (32, 1)
┌─────────────────────────────────┐
│ anchor                          │
│ ---                             │
│ str                             │
╞═════════════════════════════════╡
│ fastqc_sequence_length_distrib… │
│ cutadapt_trimmed_sequences_plo… │
│ rseqc_bam_stat                  │
│ rseqc_read_distribution_plot    │
│ fastqc_per_sequence_gc_content… │
│ …                               │
│ fastqc_sequence_counts_plot     │
│ fastqc_per_base_n_content_plot  │
│ qualimap_genomic_origin         │
│ fastqc_overrepresented_sequenc… │
│ fastqc_top_overrepresented_seq… │
└─────────────────────────────────┘


In [67]:
print("Runs in the table:")
runs = df.filter(pl.col('type') == 'run_metadata').select(pl.col('creation_date'))
print(runs.collect())
print("Unique samples in the table:")
unique_samples = df.filter(pl.col('type') == 'table_row').select(pl.col('sample_name')).unique()
print(unique_samples.collect())

Runs in the table:
shape: (8, 1)
┌────────────────────────────┐
│ creation_date              │
│ ---                        │
│ datetime[μs]               │
╞════════════════════════════╡
│ 2025-04-30 18:48:37.306406 │
│ 2025-04-30 18:56:01.104408 │
│ 2025-04-30 18:57:46.955920 │
│ 2025-04-30 18:59:31.437127 │
│ 2025-04-30 21:48:24.557758 │
│ 2025-04-30 22:05:15.876615 │
│ 2025-04-30 22:15:21.886231 │
│ 2025-04-30 22:25:28.638526 │
└────────────────────────────┘
Unique samples in the table:
shape: (1_529, 1)
┌─────────────┐
│ sample_name │
│ ---         │
│ str         │
╞═════════════╡
│ ERX1121432  │
│ ERX1121055  │
│ ERX1121047  │
│ ERX1120943  │
│ ERX1121843  │
│ …           │
│ ERX1122294  │
│ ERX1121574  │
│ ERX1122212  │
│ ERX1121129  │
│ ERX1121249  │
└─────────────┘


In [75]:
print("Metrics in the table:")
columns = [col for col in df.columns if col.startswith('col_') and col.endswith('_val')]
print("Columns:", "\n".join(columns))

Metrics in the table:
Columns: col_total_records_val
col_qc_failed_val
col_optical_pcr_duplicate_val
col_non_primary_hits_val
col_unmapped_reads_val
col_mapq_gte_mapq_cut_unique_val
col_reads_map_to_sense_val
col_reads_map_to_antisense_val
col_non-splice_reads_val
col_splice_reads_val
col_raw_total_sequences_val
col_reads_mapped_and_paired_val
col_reads_properly_paired_val
col_reads_duplicated_val
col_reads_QC_failed_val
col_reads_MQ0_val
col_bases_mapped_(cigar)_val
col_bases_trimmed_val
col_bases_duplicated_val
col_pairs_on_different_chromosomes_val
col_pairs_with_other_orientation_val
col_inward_oriented_pairs_val
col_outward_oriented_pairs_val
col_flagstat_total_val
col_total_passed_val
col_mapped_passed_val
col_secondary_passed_val
col_duplicates_passed_val
col_paired in sequencing_passed_val
col_properly paired_passed_val
col_with itself and mate mapped_passed_val
col_singletons_passed_val
col_with mate mapped to a different chr_passed_val
col_with mate mapped to a different chr 

  columns = [col for col in df.columns if col.startswith('col_') and col.endswith('_val')]


In [76]:
metrics = df.filter(pl.col('type') == 'table_row').select(columns).describe()
print(metrics)

shape: (9, 89)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ statistic ┆ col_total ┆ col_qc_fa ┆ col_optic ┆ … ┆ col_avg_s ┆ col_media ┆ col_perce ┆ col_tota │
│ ---       ┆ _records_ ┆ iled_val  ┆ al_pcr_du ┆   ┆ equence_l ┆ n_sequenc ┆ nt_fails_ ┆ l_sequen │
│ str       ┆ val       ┆ ---       ┆ plicate_v ┆   ┆ ength_val ┆ e_length_ ┆ val       ┆ ces_val  │
│           ┆ ---       ┆ f64       ┆ al        ┆   ┆ ---       ┆ val       ┆ ---       ┆ ---      │
│           ┆ f64       ┆           ┆ ---       ┆   ┆ f64       ┆ ---       ┆ f64       ┆ f64      │
│           ┆           ┆           ┆ f64       ┆   ┆           ┆ f64       ┆           ┆          │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ count     ┆ 1529.0    ┆ 1529.0    ┆ 1529.0    ┆ … ┆ 1529.0    ┆ 1529.0    ┆ 1529.0    ┆ 1529.0   │
│ null_coun ┆ 30580.0   ┆ 30580.0   ┆ 30580.0   ┆ … ┆ 30580.0   ┆ 30580.0   

#### DuckDB

In [42]:
table_name = f'{NAMESPACE}.{TABLE_NAME}'
con = iceberg_table.scan().to_duckdb(table_name=table_name)

In [47]:
# min, max, median, mean on col_total_sequences_val column but with DuckDB
con.execute(f"""
    SELECT 
        COUNT(col_total_sequences_val), 
        MEAN(col_total_sequences_val), 
        STDDEV(col_total_sequences_val), 
        MIN(col_total_sequences_val), 
        MAX(col_total_sequences_val) 
    FROM "{table_name}"
    WHERE type='table_row'
""").fetchall()

[(1529, 7.153509624591234, 5.38334733048627, 0.068885, 68.397426)]

#### With Trino

In [None]:
conn = trino.dbapi.connect(
    host="localhost",          # or EC2 / k8s hostname
    port=8080,
    user="vlad",
    catalog="r2",              # <- matches r2.properties
    schema="default",
)
cursor = conn.cursor()
cursor.execute("SELECT count(*) FROM multiqc_metrics_partitioned")
cursor.execute("""
CREATE TABLE IF NOT EXISTS metrics (
    run_id VARCHAR,
    module_id VARCHAR,
    metric_name VARCHAR,
    value DOUBLE
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['run_id']
)
""")

TrinoConnectionError: failed to execute: HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /v1/statement (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1543a8f30>: Failed to establish a new connection: [Errno 61] Connection refused'))

In [5]:
def query_single_metric_parquet(parquet_dir, metric_name="metric_0"):
    """Query Parquet files to retrieve specific metric values using PyArrow"""
    print(f"Querying Parquet files for metrics with name: {metric_name}")
    start_time = time.time()

    # Read the Parquet files with partitioning information
    dataset = ds.dataset(parquet_dir, format="parquet", partitioning="hive")

    # Define filter condition for the metric name
    filter_expr = (ds.field("metric_name") == metric_name)
    # Read the filtered data
    table = dataset.to_table(filter=filter_expr)
    # Convert to pandas DataFrame if needed
    df = table.to_pandas()

    end_time = time.time()
    elapsed = end_time - start_time
    
    print(f"Query found {len(df)} records in {elapsed:.4f} seconds")
    return df, elapsed


def query_single_module_parquet(parquet_dir, run_id="run_0", module_id="module_0"):
    """Query Parquet files to retrieve specific module data using PyArrow"""
    print(f"Querying Parquet files for run_id={run_id} and module_id={module_id}")
    start_time = time.time()

    # Read the Parquet files with partitioning information
    dataset = ds.dataset(parquet_dir, format="parquet", partitioning="hive")

    # Define filter condition for the run_id and module_id
    filter_expr = (ds.field("run_id") == run_id) & (ds.field("module_id") == module_id)
    # Read the filtered data
    table = dataset.to_table(filter=filter_expr)
    # Convert to pandas DataFrame if needed
    df = table.to_pandas()
    
    end_time = time.time()
    elapsed = end_time - start_time
    
    print(f"Query found {len(df)} records in {elapsed:.4f} seconds")
    return df, elapsed


def run_parquet_benchmark(parquet_dir, num_runs=10, num_modules=10, 
                         num_samples_per_module=100, num_metrics_per_module=20):
    """Run a complete Parquet benchmark"""
    print("-" * 80)
    print("PARQUET BENCHMARK")
    print("-" * 80)
    
    print("\nGenerating sample data with:")
    print(f"- {num_runs} runs")
    print(f"- {num_modules} modules per run")
    print(f"- {num_samples_per_module} samples per module")
    print(f"- {num_metrics_per_module} metrics per module")
    
    # Storage benchmark
    current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    benchmark_dir = f"{parquet_dir}/{current_time}"
    
    # Query benchmarks
    print("\nRunning query benchmarks:")
    
    # Query by metric name
    _, metric_query_time = query_single_metric_parquet(benchmark_dir)
    
    # Query by run_id and module_id
    _, module_query_time = query_single_module_parquet(benchmark_dir)
    
    # Summary
    print("\nPARQUET BENCHMARK SUMMARY:")
    print(f"Query by metric time: {metric_query_time:.4f} seconds")
    print(f"Query by module time: {module_query_time:.4f} seconds")
    
    return {
        "metric_query_time": metric_query_time,
        "module_query_time": module_query_time
    }


# MinIO/S3 configuration
PARQUET_BUCKET = "s3://megaqc-test/parquet_data"

# Set MinIO credentials for local testing
# If using AWS S3 directly, these would be your AWS credentials
# os.environ["AWS_ACCESS_KEY_ID"] = "minio"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "minio123"
# os.environ["AWS_ENDPOINT_URL"] = "http://minio:9000"
# os.environ["AWS_REGION"] = "us-east-1"

# Run the benchmark with smaller dataset for testing
results = run_parquet_benchmark(
    PARQUET_BUCKET,
    num_runs=5,
    num_modules=5,
    num_samples_per_module=10,
    num_metrics_per_module=5
)

--------------------------------------------------------------------------------
PARQUET BENCHMARK
--------------------------------------------------------------------------------

Generating sample data with:
- 5 runs
- 5 modules per run
- 10 samples per module
- 5 metrics per module
Flattening data...
Creating DataFrame...
Creating Parquet Table...
Writing to Parquet file s3://megaqc-test/parquet_data/2025-04-23-13-46-14...
Parquet storage time: 7.2234 seconds

Running query benchmarks:
Querying Parquet files for metrics with name: metric_0
Query found 250 records in 1.2624 seconds
Querying Parquet files for run_id=run_0 and module_id=module_0
Query found 50 records in 1.2209 seconds

PARQUET BENCHMARK SUMMARY:
Storage time: 7.2234 seconds
Query by metric time: 1.2624 seconds
Query by module time: 1.2209 seconds


In [32]:
from pyiceberg.catalog import load_catalog

cat = load_catalog(
    "multiqc",
    type="sql",
    uri="sqlite:////Users/vlad/multiqc_iceberg.db",
    warehouse="s3://megaqc-test/iceberg_data/2025-04-23-13-46-14",
)

arrow_tbl = pa.Table.from_pandas(df, preserve_index=False)

cat.create_namespace("multiqc")
cat.create_table(
    identifier="multiqc.metrics",
    schema=arrow_tbl.schema,
)

# Load the table and append the new snapshot
table = cat.load_table("multiqc.metrics")
table.append(arrow_tbl)  # fast-append commit

In [6]:
# Benchmark script for Iceberg storage and querying with Trino

def create_trino_connection(
        host="trino-coordinator", port=8080, 
        user="trino", catalog="iceberg", schema="default"
    ):
    """Create a connection to Trino"""
    try:
        conn = trino.dbapi.connect(
            host=host,
            port=port,
            user=user,
            catalog=catalog,
            schema=schema,
        )
        print("Connected to Trino successfully!")
        return conn
    except Exception as e:
        print(f"Error connecting to Trino: {e}")
        return None


def query_single_metric_iceberg(conn, metric_name="metric_0"):
    """Query Iceberg table to retrieve specific metric values using Trino"""
    print(f"Querying Iceberg table for metrics with name: {metric_name}")
    start_time = time.time()

    # Execute query through Trino
    cursor = conn.cursor()
    query = f"""
        SELECT * FROM metrics
        WHERE metric_name = '{metric_name}'
    """
    
    try:
        cursor.execute(query)
        # Fetch all results
        results = cursor.fetchall()
        
        # Convert to DataFrame
        columns = [desc[0] for desc in cursor.description]
        df = pd.DataFrame(results, columns=columns)
        
        end_time = time.time()
        elapsed = end_time - start_time
        
        print(f"Query found {len(df)} records in {elapsed:.4f} seconds")
        return df, elapsed
    
    except Exception as e:
        print(f"Error querying data: {e}")
        return pd.DataFrame(), -1


def query_single_module_iceberg(conn, run_id="run_0", module_id="module_0"):
    """Query Iceberg table to retrieve specific module data using Trino"""
    print(f"Querying Iceberg table for run_id={run_id} and module_id={module_id}")
    start_time = time.time()

    # Execute query through Trino
    cursor = conn.cursor()
    query = f"""
        SELECT * FROM metrics
        WHERE run_id = '{run_id}' AND module_id = '{module_id}'
    """
    
    try:
        cursor.execute(query)
        # Fetch all results
        results = cursor.fetchall()
        
        # Convert to DataFrame
        columns = [desc[0] for desc in cursor.description]
        df = pd.DataFrame(results, columns=columns)
        
        end_time = time.time()
        elapsed = end_time - start_time
        
        print(f"Query found {len(df)} records in {elapsed:.4f} seconds")
        return df, elapsed
    
    except Exception as e:
        print(f"Error querying data: {e}")
        return pd.DataFrame(), -1


def run_iceberg_benchmark(num_runs=10, num_modules=10, 
                         num_samples_per_module=100, num_metrics_per_module=20):
    """Run a complete Iceberg benchmark"""
    print("-" * 80)
    print("ICEBERG BENCHMARK")
    print("-" * 80)
    
    print("\nGenerating sample data with:")
    print(f"- {num_runs} runs")
    print(f"- {num_modules} modules per run")
    print(f"- {num_samples_per_module} samples per module")
    print(f"- {num_metrics_per_module} metrics per module")
    
    # Generate test data
    data = generate_all_data(
        num_runs, num_modules, num_samples_per_module, num_metrics_per_module
    )
    
    # Connect to Trino
    conn = create_trino_connection()
    if not conn:
        print("Failed to connect to Trino. Aborting Iceberg benchmark.")
        return {
            "storage_time": -1,
            "metric_query_time": -1,
            "module_query_time": -1
        }
    
    # Initialize Iceberg schema
    if not init_iceberg_schema(conn):
        print("Failed to initialize Iceberg schema. Aborting Iceberg benchmark.")
        return {
            "storage_time": -1,
            "metric_query_time": -1,
            "module_query_time": -1
        }
    
    # Clear existing data
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM metrics")
        print("Cleared existing data from metrics table")
    except Exception as e:
        print(f"Error clearing metrics table: {e}")
    
    # Storage benchmark
    storage_time = store_in_iceberg(data, conn)
    
    # Query benchmarks
    print("\nRunning query benchmarks:")
    
    # Query by metric name
    _, metric_query_time = query_single_metric_iceberg(conn)
    
    # Query by run_id and module_id
    _, module_query_time = query_single_module_iceberg(conn)
    
    # Summary
    print("\nICEBERG BENCHMARK SUMMARY:")
    print(f"Storage time: {storage_time:.4f} seconds")
    print(f"Query by metric time: {metric_query_time:.4f} seconds")
    print(f"Query by module time: {module_query_time:.4f} seconds")
    
    return {
        "storage_time": storage_time,
        "metric_query_time": metric_query_time,
        "module_query_time": module_query_time
    }


# Run the benchmark with smaller dataset for testing
results = run_iceberg_benchmark(
    num_runs=5,
    num_modules=5,
    num_samples_per_module=10,
    num_metrics_per_module=5
) 

--------------------------------------------------------------------------------
ICEBERG BENCHMARK
--------------------------------------------------------------------------------

Generating sample data with:
- 5 runs
- 5 modules per run
- 10 samples per module
- 5 metrics per module
Connected to Trino successfully!
Error creating Iceberg table: failed to execute: HTTPConnectionPool(host='trino-coordinator', port=8080): Max retries exceeded with url: /v1/statement (Caused by NameResolutionError("<urllib3.connection.HTTPConnection object at 0x113ee9760>: Failed to resolve 'trino-coordinator' ([Errno 8] nodename nor servname provided, or not known)"))
Failed to initialize Iceberg schema. Aborting Iceberg benchmark.


In [9]:
conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="trino",
    catalog="iceberg",
    schema="default"
)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS metrics (
    run_id VARCHAR,
    module_id VARCHAR,
    metric_name VARCHAR,
    value DOUBLE
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['run_id']
)
""")

TrinoConnectionError: failed to execute: HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /v1/statement (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1081c19d0>: Failed to establish a new connection: [Errno 61] Connection refused'))