Simulating MultiQC runs with exporting of data into two types of intermediate format:
- columnar data lake (parquet)
- document DB (MongoDB)

In [5]:
import os
import random
import string
import time
from datetime import datetime

import shutil
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pymongo
import pyarrow.dataset as ds

# Configuration parameters
NUM_RUNS = 1000  # Can be scaled up to millions in real case
NUM_MODULES = 10  # Fixed across runs
NUM_SAMPLES_PER_MODULE = 100  # Can be 10 to 1000
NUM_METRICS_PER_MODULE = 20  # Can be 10 to 50

# MongoDB setup
MONGO_DB_NAME = "data_comparison"
MONGO_COLLECTION_NAME = "runs"
MONGO_URI = "mongodb://localhost:27017/"

# Parquet setup
PARQUET_DIR = "parquet_data"
os.makedirs(PARQUET_DIR, exist_ok=True)


def generate_random_string(length=10):
    """Generate a random string of fixed length"""
    return "".join(random.choices(string.ascii_letters, k=length))


def generate_metric_metadata():
    """Generate metadata for a metric"""
    return {
        "min": random.uniform(0, 10),
        "max": random.uniform(90, 100),
        "scale": random.choice(["linear", "log"]),
        "color": f"#{random.randint(0, 0xFFFFFF):06x}",
        "type": random.choice(["numeric", "categorical", "percentage"]),
        "namespace": random.choice(["performance", "quality", "resource"]),
        "placement": random.choice(["primary", "secondary", "tertiary"]),
    }


def generate_value_metadata(value):
    """Generate metadata for a value"""
    return {
        "unmodified_value": value,
        "formatted_value": f"{value:.2f}" if isinstance(value, float) else str(value),
    }


def generate_sample_data(num_metrics):
    """Generate data for a single sample"""
    sample_id = generate_random_string()
    metrics = {}

    for i in range(num_metrics):
        metric_name = f"metric_{i}"
        value = random.uniform(0, 100)
        metrics[metric_name] = {
            "value": value,
            "metadata": generate_value_metadata(value),
        }

    return {"sample_id": sample_id, "metrics": metrics}


def generate_module_data(module_index, num_samples, num_metrics):
    """Generate data for a single module"""
    samples = [generate_sample_data(num_metrics) for _ in range(num_samples)]

    metrics_metadata = {}
    for i in range(num_metrics):
        metric_name = f"metric_{i}"
        metrics_metadata[metric_name] = generate_metric_metadata()

    return {
        "module_id": f"module_{module_index}",
        "name": f"Module {module_index}",
        "url": f"http://example.com/module/{module_index}",
        "comment": f"This is module {module_index}",
        "metrics_metadata": metrics_metadata,
        "samples": samples,
    }


def generate_run_data(
    run_index, num_modules, num_samples_per_module, num_metrics_per_module
):
    """Generate data for a single run"""
    modules = [
        generate_module_data(i, num_samples_per_module, num_metrics_per_module)
        for i in range(num_modules)
    ]

    return {
        "run_id": f"run_{run_index}",
        "timestamp": datetime.now().isoformat(),
        "modules": modules,
    }


def generate_all_data(
    num_runs, num_modules, num_samples_per_module, num_metrics_per_module
):
    """Generate all runs data"""
    return [
        generate_run_data(
            i, num_modules, num_samples_per_module, num_metrics_per_module
        )
        for i in range(num_runs)
    ]


In [6]:
def store_in_mongodb(data, uri=MONGO_URI, db_name=MONGO_DB_NAME, collection_name=MONGO_COLLECTION_NAME):
    """Store the hierarchical data in MongoDB"""
    client = pymongo.MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]

    # Drop the collection if it already exists
    collection.drop()

    # Insert the data
    start_time = time.time()
    collection.insert_many(data)
    end_time = time.time()

    # Create indexes for common queries
    collection.create_index([("run_id", pymongo.ASCENDING)])
    collection.create_index([("modules.module_id", pymongo.ASCENDING)])
    collection.create_index([("modules.samples.sample_id", pymongo.ASCENDING)])

    return end_time - start_time


def flatten_hierarchical_data(data):
    """Convert hierarchical data to flat format for Parquet"""
    flat_records = []

    for run in data:
        run_id = run["run_id"]
        timestamp = run["timestamp"]

        for module in run["modules"]:
            module_id = module["module_id"]
            module_name = module["name"]
            module_url = module["url"]
            module_comment = module["comment"]

            for sample in module["samples"]:
                sample_id = sample["sample_id"]

                for metric_name, metric_data in sample["metrics"].items():
                    value = metric_data["value"]
                    unmodified_value = metric_data["metadata"]["unmodified_value"]
                    formatted_value = metric_data["metadata"]["formatted_value"]

                    # Get metric metadata
                    metric_metadata = module["metrics_metadata"].get(metric_name, {})

                    flat_records.append(
                        {
                            "run_id": run_id,
                            "timestamp": timestamp,
                            "module_id": module_id,
                            "module_name": module_name,
                            "module_url": module_url,
                            "module_comment": module_comment,
                            "sample_id": sample_id,
                            "metric_name": metric_name,
                            "value": value,
                            "unmodified_value": unmodified_value,
                            "formatted_value": formatted_value,
                            "metric_min": metric_metadata.get("min"),
                            "metric_max": metric_metadata.get("max"),
                            "metric_scale": metric_metadata.get("scale"),
                            "metric_color": metric_metadata.get("color"),
                            "metric_type": metric_metadata.get("type"),
                            "metric_namespace": metric_metadata.get("namespace"),
                            "metric_placement": metric_metadata.get("placement"),
                        }
                    )

    return flat_records


def store_in_parquet(data, parquet_dir):
    """Store the flattened data in Parquet format"""
    flat_data = flatten_hierarchical_data(data)
    df = pd.DataFrame(flat_data)

    # Clean parquet_dir
    if os.path.exists(parquet_dir):
        shutil.rmtree(parquet_dir)

    # Create table from DataFrame
    table = pa.Table.from_pandas(df)

    # Write to Parquet file with partitioning
    start_time = time.time()
    pq.write_to_dataset(
        table,
        root_path=parquet_dir,
        partition_cols=["run_id", "module_id"],  # Partition by run_id and module_id
    )
    end_time = time.time()

    return end_time - start_time


def query_single_item_mongodb(uri=MONGO_URI, db_name=MONGO_DB_NAME, collection_name=MONGO_COLLECTION_NAME):
    """Query MongoDB to retrieve specific metric values"""

    client = pymongo.MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]

    # Constructing the MongoDB query to find runs matching criteria
    query = {"run_id": "run_0"}
    projection = {"run_id": 1, "modules": {"$elemMatch": {"samples.metrics": 1}}}

    start_time = time.time()
    results = list(collection.find(query, projection))
    end_time = time.time()

    return results, end_time - start_time


def query_single_metric_mongodb(uri=MONGO_URI, db_name=MONGO_DB_NAME, collection_name=MONGO_COLLECTION_NAME):
    """Query MongoDB to retrieve specific metric values"""
    start_time = time.time()

    client = pymongo.MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]

    # Example query: Get all values for metric_0 in module_0 for runs with specific criteria
    metric_name = "metric_0"

    # Using MongoDB's aggregation pipeline to unwind arrays and extract specific metric values
    pipeline = [
        # Unwind modules array
        {"$unwind": "$modules"},
        # Unwind samples array
        {"$unwind": "$modules.samples"},
        # Create a field for each metric and its value using $objectToArray to convert metrics to array
        {"$addFields": {
            "metricsArray": {"$objectToArray": "$modules.samples.metrics"}
        }},
        # Unwind metrics array
        {"$unwind": "$metricsArray"},
        # Filter to keep only the specific metric we want
        {"$match": {"metricsArray.k": metric_name}},
        # Project the fields we need
        {"$project": {
            "run_id": 1,
            "sample_id": "$modules.samples.sample_id",
            "metric_name": "$metricsArray.k",
            "metric_value": "$metricsArray.v.value"
        }}
    ]  
    start_time = time.time()    
    results = list(collection.aggregate(pipeline))
    end_time = time.time()

    return results, end_time - start_time


def query_single_metric_parquet(parquet_dir=PARQUET_DIR):
    """Query Parquet files to retrieve specific metric values using DuckDB"""
    start_time = time.time()

    metric_name = "metric_0"

    # Use DuckDB to query the Parquet files directly
    con = duckdb.connect(database=':memory:')
    query = f"""
        SELECT * FROM '{parquet_dir}/**/*.parquet'
        WHERE metric_name = '{metric_name}'
    """
    filtered_df = con.execute(query).fetchdf()
    
    end_time = time.time()

    return filtered_df, end_time - start_time


def query_single_metric_parquet_pyarrow(parquet_dir=PARQUET_DIR):
    start_time = time.time()

    metric_name = "metric_0"

    dataset = ds.dataset(parquet_dir, format="parquet")
    # Define filter condition for the metric name
    filter_expr = (ds.field("metric_name") == metric_name)
    # Read the filtered data
    table = dataset.to_table(filter=filter_expr)
    # Convert to pandas DataFrame if needed
    df = table.to_pandas()

    end_time = time.time()

    return df, end_time - start_time


def query_single_item_parquet(parquet_dir=PARQUET_DIR):
    """Query Parquet files to retrieve specific items using DuckDB"""
    start_time = time.time()

    # Filter by run_id and module_id
    run_id = "run_0"
    module_id = "module_0"

    # Use DuckDB to query the Parquet files directly
    con = duckdb.connect(database=':memory:')
    query = f"""
        SELECT * FROM '{parquet_dir}/**/*.parquet'
        WHERE run_id = '{run_id}' AND module_id = '{module_id}'
    """
    filtered_df = con.execute(query).fetchdf()
    
    end_time = time.time()

    return filtered_df, end_time - start_time



In [7]:
def mongo_size():
    client = pymongo.MongoClient(MONGO_URI)
    db = client[MONGO_DB_NAME]
    stats = db.command("collstats", MONGO_COLLECTION_NAME)
    size_bytes = stats['storageSize']
    return size_bytes

In [8]:
def dir_size(source=PARQUET_DIR):
    total_size = os.path.getsize(source)
    for item in os.listdir(source):
        itempath = os.path.join(source, item)
        if os.path.isfile(itempath):
            total_size += os.path.getsize(itempath)
        elif os.path.isdir(itempath):
            total_size += dir_size(itempath)
    return total_size

dir_size(PARQUET_DIR)

64

In [None]:
print("Generating sample data...")
data = generate_all_data(
    NUM_RUNS, NUM_MODULES, NUM_SAMPLES_PER_MODULE, NUM_METRICS_PER_MODULE
)
print(f"Generated {NUM_RUNS} runs with {NUM_MODULES} modules each")
print(
    f"Each module has {NUM_SAMPLES_PER_MODULE} samples with {NUM_METRICS_PER_MODULE} metrics"
)

print("\n--- MongoDB Performance ---")
mongo_store_time = store_in_mongodb(data)
print(f"MongoDB storage time: {mongo_store_time:.4f} seconds")

mongo_results, mongo_query_time = query_single_metric_mongodb()
print(f"MongoDB query single metric time: {mongo_query_time:.4f} seconds")
print(f"MongoDB results count: {len(mongo_results)}")
mongo_results, mongo_query_time = query_single_item_mongodb()
print(f"MongoDB query single module time: {mongo_query_time:.4f} seconds")
print(f"MongoDB results count: {len(mongo_results)}")

# Parquet comparison
print("\n--- Parquet Performance ---")
parquet_store_time = store_in_parquet(data, PARQUET_DIR)
print(f"Parquet storage time: {parquet_store_time:.4f} seconds")

# Similar query in Parquet
parquet_results, parquet_query_time = query_single_metric_parquet()
print(f"Parquet query single metric time (with DuckDB): {parquet_query_time:.4f} seconds")
print(f"Parquet results count: {len(parquet_results)}")
parquet_results, parquet_query_time = query_single_metric_parquet_pyarrow()
print(f"Parquet query single metric time (with pyarrow): {parquet_query_time:.4f} seconds")
print(f"Parquet results count: {len(parquet_results)}")
parquet_results, parquet_query_time = query_single_item_parquet()
print(f"Parquet query single module time: {parquet_query_time:.4f} seconds")
print(f"Parquet results count: {len(parquet_results)}")

print("\n--- Size Comparison ---")
print(f"MongoDB storage size: {mongo_size() / (1024 * 1024):.2f} MB")
print(f"Parquet storage size: {dir_size(PARQUET_DIR) / (1024 * 1024):.2f} MB")

print("\n--- Summary ---")
print("MongoDB advantages:")
print("- Flexible schema allows for easier updates and schema evolution")
print("- Better for nested/hierarchical data without flattening")
print("- Powerful query capabilities and indexing")
print("- Good for low-latency individual record access")

print("\nParquet advantages:")
print("- Columnar storage optimizes for analytical queries on specific metrics")
print("- Better compression ratio (typically 2-4x smaller than JSON)")
print("- Efficient for batch processing and analytics")
print("- Schema enforcement and type safety")
print("- Works well with big data processing tools (Spark, Presto, etc.)")

print("\nRecommendation:")
if mongo_query_time < parquet_query_time:
    print("For your use case, MongoDB might be more efficient for queries.")
else:
    print("For your use case, Parquet might be more efficient for queries.")

print("\nConsiderations:")
print(
    "1. If you need to query across many runs and metrics, Parquet's columnar nature should be better"
)
print(
    "2. If you primarily access complete records for specific runs, MongoDB should be better"
)
print("3. For very large datasets (millions of runs), consider a hybrid approach:")
print("   - Use MongoDB for recent/hot data with frequent access patterns")
print(
    "   - Archive older data to Parquet for long-term storage and batch analytics"
)
print("4. Partitioning strategies are critical for both approaches as data grows")
print("5. Consider using MongoDB for OLTP workloads and Parquet for OLAP workloads")

Generating sample data...
Generated 1000 runs with 10 modules each
Each module has 100 samples with 20 metrics

--- MongoDB Performance ---
MongoDB storage time: 21.9993 seconds
MongoDB query single metric time: 32.6960 seconds
MongoDB results count: 1000000
MongoDB query single module time: 0.0100 seconds
MongoDB results count: 1

--- Parquet Performance ---
Parquet storage time: 15.9431 seconds


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Parquet query single metric time (with DuckDB): 9.7788 seconds
Parquet results count: 1000000
Parquet query single metric time (with pyarrow): 3.4799 seconds
Parquet results count: 1000000
Parquet query single module time: 0.6225 seconds
Parquet results count: 2000

--- Size Comparison ---
MongoDB storage size: 507.88 MB
Parquet storage size: 600.40 MB

--- Summary ---
MongoDB advantages:
- Flexible schema allows for easier updates and schema evolution
- Better for nested/hierarchical data without flattening
- Powerful query capabilities and indexing
- Good for low-latency individual record access

Parquet advantages:
- Columnar storage optimizes for analytical queries on specific metrics
- Better compression ratio (typically 2-4x smaller than JSON)
- Efficient for batch processing and analytics
- Schema enforcement and type safety
- Works well with big data processing tools (Spark, Presto, etc.)

Recommendation:
For your use case, MongoDB might be more efficient for queries.

Consider

In [12]:
import clickhouse_connect
import json

if __name__ == '__main__':
    client = clickhouse_connect.get_client(
        host='v8u27izgoz.eu-central-1.aws.clickhouse.cloud',
        user='default',
        password='RxvHiATGn60~a',
        secure=True
    )
    print("Result:", client.query("SELECT 1").result_set[0][0])

CLICKHOUSE_DATABASE = 'data_comparison'

# Create database if it doesn't exist
client.query(f'CREATE DATABASE IF NOT EXISTS {CLICKHOUSE_DATABASE}')

Result: 1


<clickhouse_connect.driver.query.QueryResult at 0x5980cc9e0>

In [11]:
client.query(f'DROP DATABASE {CLICKHOUSE_DATABASE}')

<clickhouse_connect.driver.query.QueryResult at 0x5a74429c0>

In [17]:
# Create runs table with semi-structured data
client.query(f'''
CREATE TABLE IF NOT EXISTS {CLICKHOUSE_DATABASE}.runs (
    run_id String,
    timestamp DateTime,
    modules Nested(
        module_id String,
        name String,
        url String,
        comment String,
        metrics_metadata String  -- JSON string
    ),
    samples Nested(
        sample_id String,
        module_id String,
        metrics String  -- JSON string
    )
) ENGINE = MergeTree()
ORDER BY (run_id)
''')

# Create a materialized view for faster metric queries
client.query(f'''
CREATE MATERIALIZED VIEW IF NOT EXISTS {CLICKHOUSE_DATABASE}.metric_values
ENGINE = MergeTree()
ORDER BY (metric_name, run_id, module_id, sample_id)
AS WITH 
    extracted AS (
        SELECT
            run_id,
            modules.module_id AS module_id,
            samples.sample_id AS sample_id,
            tuple.1 AS metric_name,
            tuple.2 AS metric_value
        FROM {CLICKHOUSE_DATABASE}.runs
        ARRAY JOIN 
            modules,
            samples,
            JSONExtractKeysAndValues(samples.metrics, 'String', 'Float64') AS tuple
        WHERE samples.module_id = modules.module_id
    )
SELECT 
    run_id,
    module_id,
    sample_id,
    metric_name,
    metric_value
FROM extracted
''')

<clickhouse_connect.driver.query.QueryResult at 0x5a343d340>

In [18]:
"""Store hierarchical data in ClickHouse"""

# Prepare data for insertion
rows = []
for run in data:
    run_id = run["run_id"]
    timestamp = run["timestamp"]
    
    modules_data = {
        'module_id': [],
        'name': [],
        'url': [],
        'comment': [],
        'metrics_metadata': []
    }
    
    samples_data = {
        'sample_id': [],
        'module_id': [],
        'metrics': []
    }
    
    for module in run["modules"]:
        module_id = module["module_id"]
        
        # Add module info
        modules_data['module_id'].append(module_id)
        modules_data['name'].append(module["name"])
        modules_data['url'].append(module["url"])
        modules_data['comment'].append(module["comment"])
        modules_data['metrics_metadata'].append(json.dumps(module["metrics_metadata"]))
        
        # Add samples info
        for sample in module["samples"]:
            samples_data['sample_id'].append(sample["sample_id"])
            samples_data['module_id'].append(module_id)
            samples_data['metrics'].append(json.dumps(sample["metrics"]))
    
    rows.append({
        'run_id': run_id,
        'timestamp': timestamp,
        'modules': modules_data,
        'samples': samples_data
    })

# Insert data
start_time = time.time()
client.query(
    f'INSERT INTO {CLICKHOUSE_DATABASE}.runs VALUES',
    rows
)
end_time = time.time()
store_time = end_time - start_time
print(f"ClickHouse storage time: {store_time:.4f} seconds")

TypeError: not all arguments converted during string formatting

In [None]:
"""Query a specific metric across all runs and samples"""
metric_name = "metric_0"
start_time = time.time()
# Query using the materialized view
results = client.execute(f'''
SELECT run_id, module_id, sample_id, metric_value
FROM {CLICKHOUSE_DATABASE}.metric_values
WHERE metric_name = %s
''', (metric_name,))
end_time = time.time()
query_time = end_time - start_time
print(f"ClickHouse query time: {query_time:.4f} seconds")