In [1]:
import sys
# Install/upgrade the notebook's dependencies using the interpreter that runs this
# kernel (sys.executable), so the packages are installed for that same interpreter.
!{sys.executable} -m pip install --upgrade google-cloud-bigquery matplotlib numpy scipy

In [1]:
%%time

from google.cloud import bigquery
import warnings

# PROJECT and DATASET are interpolated into the fully-qualified table names of the query.
PROJECT="" # Enter your Google Project ID
# Passed as `location=` when the query is submitted.
LOCATION="US"
DATASET="cromwell_monitoring"
# Interpolated after the SQL keyword INTERVAL; bounds how far back the query looks.
DATE_RANGE="30 DAY"
# The query is only run if its dry-run estimate does not exceed this many GiB (bytes / 1024**3).
MAX_GB_PROCESSED=1
# Numerator of the random-sampling probability in the query (MAX_ROWS / total row count).
MAX_ROWS=100000

# Silence warnings raised from google.auth whose message matches the
# "user credentials from Google Cloud SDK" pattern.
warnings.filterwarnings('ignore', '.*user credentials from Google Cloud SDK.*', module='google.auth')

def monitoring_query(dry_run=False):
    """Submit the resource-usage sampling query to BigQuery and return the job.

    The SQL aggregates the `metrics` table per `instance_id`, joins the result
    with the `runtime` table (on `instance_id`) and the `metadata` table (on
    `instance_name`), restricts all three to the last DATE_RANGE, and then keeps
    each row with probability MAX_ROWS / (number of joined rows), dropping rows
    whose `inputs_size_gb` is NULL.

    PROJECT, DATASET, DATE_RANGE and MAX_ROWS are module-level constants that are
    interpolated directly into the SQL text; LOCATION is passed to the client.

    Args:
        dry_run: Forwarded to `bigquery.QueryJobConfig(dry_run=...)`.

    Returns:
        The object returned by `bigquery.Client.query` for the submitted query.
    """
    # A new client is built on every call; no project or credentials are passed explicitly.
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(dry_run=dry_run)
    # NOTE: everything inside the f-string is sent to BigQuery verbatim after interpolation.
    query = f"""
        WITH metrics AS (
          SELECT
            instance_id,
            TIMESTAMP_DIFF(MAX(timestamp), MIN(timestamp), SECOND) runtime_duration_sec,
            AVG((SELECT AVG(p) FROM UNNEST(cpu_used_percent) p)) cpu_used_percent_avg,
            MAX(mem_used_gb) mem_used_gb_max,
            MAX(disk_used_gb[OFFSET(0)]) disk_used_gb_max,
            AVG(disk_read_iops[OFFSET(0)]) disk_read_iops_avg,
            AVG(disk_write_iops[OFFSET(0)]) disk_write_iops_avg
          FROM
            `{PROJECT}.{DATASET}.metrics`
          WHERE
            timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {DATE_RANGE})
          GROUP BY
            instance_id
        ),
        
        results AS (
            SELECT
              r.project_id, r.zone, r.preemptible,
              r.workflow_id, workflow_name, r.task_call_name, r.shard, r.attempt, execution_status,
              m.start_time metadata_start_time, TIMESTAMP_DIFF(m.end_time, m.start_time, SECOND) metadata_duration_sec, runtime_duration_sec,
              cpu_platform, r.cpu_count cpu_total_cores,
              (r.cpu_count * cpu_used_percent_avg / 100) cpu_used_cores_avg,
              r.mem_total_gb, mem_used_gb_max,
              r.disk_mounts,
              disk_types[OFFSET(0)] disk_type,
              r.disk_total_gb[OFFSET(0)] disk_total_gb,
              disk_used_gb_max, disk_read_iops_avg, disk_write_iops_avg,
              (SELECT SUM(CAST(value AS FLOAT64)) FROM UNNEST(inputs) WHERE type = 'file') inputs_size_gb,
              docker_image
            FROM
              `{PROJECT}.{DATASET}.runtime` r
            JOIN
              metrics
            USING (instance_id)
            JOIN
              `{PROJECT}.{DATASET}.metadata` m
            USING (instance_name)
            WHERE
              r.start_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {DATE_RANGE})
              AND
              m.start_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {DATE_RANGE})
            ORDER BY
              r.start_time DESC
        )
        
        SELECT *
        FROM results
        WHERE RAND() < {MAX_ROWS}/(SELECT COUNT(*) FROM results)
        AND inputs_size_gb IS NOT NULL
    """
    # The job object is returned as-is; callers decide whether to read its
    # statistics (dry run) or iterate over its rows.
    return client.query(
        query,
        location=LOCATION,
        job_config=job_config,
    )

# Submit the query as a dry run first and read the job's reported byte count,
# so an over-budget query is rejected before the real query is issued.
q = monitoring_query(dry_run=True)
gb_processed = q.total_bytes_processed / 1024**3
if gb_processed > MAX_GB_PROCESSED:
    # Raise instead of calling exit(): inside a notebook `exit` is IPython's
    # kernel-exit hook, not a way to abort the current cell. An exception stops
    # this cell (and a "Run All") while keeping the kernel and its state alive.
    raise RuntimeError(
        f"This query will process {gb_processed:.1f} GB when run. Please adjust DATE_RANGE and retry."
    )

# result() blocks until the job finishes; the rows are materialised once so that
# later cells can iterate over them repeatedly.
rows = list(monitoring_query().result())
print(f"Sample size: {len(rows)} rows.")

In [1]:
%%time

%config InlineBackend.figure_formats = ['png']

from enum import Enum
import matplotlib.pyplot as plt
import numpy as np
from scipy.spatial import ConvexHull

# Task calls with fewer samples than this are skipped by the plotting loop.
MIN_SAMPLE_SIZE = 5
# Task calls whose samples ALL have inputs smaller than this (GB) are skipped.
MIN_INPUTS_SIZE_GB = 0.1
# Added to every observed disk usage value before the disk fit is computed.
DISK_OFFSET_GB = 0.1

# Hourly unit prices: multiplied by (cores x hours) for CPU and by (amount x hours)
# for memory when estimating cost. The *_PREEMPT variants are applied to samples
# flagged as preemptible.
# NOTE(review): presumably USD list prices for one cloud region/machine family --
# confirm against current pricing before trusting the absolute figures.
CPU_HOUR_PRICE = 0.033174
CPU_HOUR_PRICE_PREEMPT = 0.00698
MEM_HOUR_PRICE = 0.004446
MEM_HOUR_PRICE_PREEMPT = 0.00094

# Disk prices are written as a monthly figure divided by the number of hours in a
# month, giving an hourly unit price per disk type.
# NOTE(review): the monthly figures are presumably per GB -- confirm.
MONTH_HOURS = 730
HDD_HOUR_PRICE = 0.04 / MONTH_HOURS
SSD_HOUR_PRICE = 0.17 / MONTH_HOURS
LOCAL_SSD_HOUR_PRICE = 0.08 / MONTH_HOURS
LOCAL_SSD_HOUR_PRICE_PREEMPT = 0.048 / MONTH_HOURS

# Bucket the sampled rows by task call name, keeping the rows of each bucket
# in the order they were returned.
calls = {}
for row in rows:
    calls.setdefault(row.task_call_name, []).append(row)

def to_str(f: float):
    """Format a number in fixed-point notation with trailing zeros removed.

    The value is first rendered with six decimals; trailing zeros are then
    stripped, followed by a dangling decimal point if one is left over.
    """
    fixed_point = '{:f}'.format(f)
    without_zeros = fixed_point.rstrip('0')
    return without_zeros.rstrip('.')

# Kind of resource a plotted metric refers to; the members keep the values 1, 2, 3.
Metric = Enum('Metric', [('CPU', 1), ('MEM', 2), ('DISK', 3)])

def get_metric_type(metric: str):
    """Map a metric column name to its `Metric` kind by its prefix.

    Args:
        metric: Column name starting with 'cpu', 'mem' or 'disk'.

    Returns:
        Metric.CPU, Metric.MEM or Metric.DISK respectively.

    Raises:
        ValueError: If the name has none of the known prefixes. Returning None
            here would only defer the failure to the fitting/pricing code, which
            has no branch for an unknown kind.
    """
    if metric.startswith('cpu'):
        return Metric.CPU
    if metric.startswith('mem'):
        return Metric.MEM
    if metric.startswith('disk'):
        return Metric.DISK
    raise ValueError(
        f"Unknown metric {metric!r}: expected a name starting with 'cpu', 'mem' or 'disk'"
    )

def find_best_fit(x, y, offset, m_type):
    """Choose a straight-line fit of `y + offset` over `x` from the data's hull facets.

    Candidate lines are taken from the facets of the convex hull of the points
    that are visible from an artificial observer point placed at
    (min(x), 2 * max(y)). Each candidate's coefficients are quantised according
    to `m_type`, turned into a step function (the form printed in the label),
    and evaluated on a 1000-point grid spanning [min(x), max(x)]. The candidate
    with the smallest sum over that grid wins; on ties the later candidate wins.

    If every x value is identical the points are collinear and no hull can be
    built, so a single flat candidate through max(y) is used instead.

    Args:
        x: Input sizes, one per sample.
        y: Observed metric values, one per sample.
        offset: Constant added to every y value before fitting.
        m_type: `Metric` member selecting the quantisation and label format.

    Returns:
        Tuple `(fit_x, best_fit_y, best_fit_y0, best_fit_label)`:
        the evaluation grid, the winning fit evaluated on that grid, the same
        fit evaluated at the original `x` values, and a label beginning with
        'best fit: '. The last three are None when there was no candidate.
    """
    x = np.array(x)
    y = np.add(y, offset)

    if x.min() == x.max():
        # Degenerate input: all samples share one input size, so Qhull would
        # reject the (collinear) point set. A flat line through the largest
        # observation is the only bound the data supports.
        candidate_fits = [np.array([0.0, y.max()])]
    else:
        # Point 0 is the observer; 'QG0' asks Qhull to flag (via hull.good) the
        # facets visible from it.
        px = np.insert(x, 0, x.min())
        py = np.insert(y, 0, 2 * y.max())
        points = np.array([px, py]).transpose()

        hull = ConvexHull(points, qhull_options='QG0')
        candidate_fits = [
            # Degree-1 polynomial through the facet's two end points: [slope, intercept].
            np.polyfit(hull.points[facet, 0], hull.points[facet, 1], 1)
            for facet in hull.simplices[hull.good]
        ]

    fit_x = np.linspace(x.min(), x.max(), 1000)
    best_fit_y0 = None
    best_fit_y = None
    best_fit_label = None
    best_sum = np.inf

    for fit in candidate_fits:
        # Quantise slope and intercept to one decimal, pre-scaled so that the
        # per-metric step function below (x2 for CPU, x0.25 for memory) yields
        # the final value.
        if m_type == Metric.CPU:
            fit = np.round(fit / 2 * 10) / 10
        elif m_type == Metric.MEM:
            fit = np.ceil(fit / 0.25 * 10) / 10
        elif m_type == Metric.DISK:
            fit = np.ceil(fit * 10) / 10

        fit_yy0 = fit[0] * x + fit[1]
        fit_yy = fit[0] * fit_x + fit[1]
        fit_formula = f"{fit[0]:.1f} * size(inputs, 'G') + {fit[1]:.1f}"

        if m_type == Metric.CPU:
            # Below the threshold the result is 1; otherwise round and double.
            thresh = 0.75
            fit_y0 = np.array([1 if yy0 < thresh else np.round(yy0) * 2 for yy0 in fit_yy0])
            fit_y = np.array([1 if yy < thresh else np.round(yy) * 2 for yy in fit_yy])
            if fit_yy.max() < thresh:
                fit_label = '1'
            elif fit_yy.min() > thresh:
                if fit_y.min() == fit_y.max():
                    fit_label = to_str(fit_y[0])
                else:
                    fit_label = f'round({fit_formula}) * 2'
            else:
                fit_label = f'if {fit_formula} < {thresh} then 1 else round({fit_formula}) * 2'
        elif m_type == Metric.MEM:
            # At or below the threshold the result is 1; otherwise round up and
            # scale by 0.25.
            thresh = 3
            fit_y0 = np.array([1 if yy0 <= thresh else np.ceil(yy0) * 0.25 for yy0 in fit_yy0])
            fit_y = np.array([1 if yy <= thresh else np.ceil(yy) * 0.25 for yy in fit_yy])
            if fit_yy.max() <= thresh:
                fit_label = '1'
            elif fit_yy.min() > thresh:
                if fit_y.min() == fit_y.max():
                    fit_label = to_str(fit_y[0])
                else:
                    fit_label = f'ceil({fit_formula}) * 0.25'
            else:
                fit_label = f'if {fit_formula} <= {thresh} then 1 else ceil({fit_formula}) * 0.25'
        elif m_type == Metric.DISK:
            fit_y0 = np.ceil(fit_yy0)
            fit_y = np.ceil(fit_yy)
            if fit_y.min() == fit_y.max():
                fit_label = to_str(fit_y[0])
            else:
                fit_label = f'ceil({fit_formula})'

        # Smaller total over the grid means less is requested overall.
        s = fit_y.sum()
        if s <= best_sum:
            best_sum = s
            best_fit_y0 = fit_y0
            best_fit_y = fit_y
            best_fit_label = f'best fit: {fit_label}'

    return fit_x, best_fit_y, best_fit_y0, best_fit_label

def get_price(y, preemptible, hours, units, preemptible_units):
    """Total cost of per-sample amounts `y` held for `hours`.

    Samples whose `preemptible` flag is False are charged at `units`, the rest
    at `preemptible_units`. `preemptible` must be a boolean array; `hours`,
    `units` and `preemptible_units` are arrays aligned with it.
    """
    usage = np.multiply(y, hours)
    on_demand = ~preemptible
    on_demand_cost = np.multiply(usage[on_demand], units[on_demand]).sum()
    preemptible_cost = np.multiply(usage[preemptible], preemptible_units[preemptible]).sum()
    return on_demand_cost + preemptible_cost

def get_disk_price(s):
    """Hourly unit price for the disk type recorded on sample `s`.

    'HDD' and 'SSD' have a single price; 'LOCAL' has separate prices depending
    on `s.preemptible`. Any other disk type yields None.
    """
    kind = s.disk_type
    if kind == 'LOCAL':
        return LOCAL_SSD_HOUR_PRICE_PREEMPT if s.preemptible else LOCAL_SSD_HOUR_PRICE
    if kind == 'SSD':
        return SSD_HOUR_PRICE
    if kind == 'HDD':
        return HDD_HOUR_PRICE
    return None

def get_price_savings(samples, total, fit, m_type):
    """Estimate the cost of the provisioned amounts and the saving from the fit.

    Args:
        samples: Rows providing `preemptible` and `runtime_duration_sec`
            (and the attributes `get_disk_price` reads, for disk).
        total: Provisioned amount per sample.
        fit: Fitted amount per sample.
        m_type: `Metric` member selecting which unit prices apply.

    Returns:
        `(price_total, savings)` where savings is the cost of `total` minus the
        cost of `fit`. For disk, `total` is priced at each sample's own disk
        price while `fit` is always priced at the HDD rate.
    """
    count = len(samples)
    preemptible = np.array([s.preemptible for s in samples])
    hours = np.array([s.runtime_duration_sec / 3600.0 for s in samples])

    if m_type == Metric.DISK:
        # get_disk_price already accounts for preemptibility, so the same array
        # serves both the on-demand and the preemptible slots.
        current_rates = np.array([get_disk_price(s) for s in samples])
        current_rates_preempt = current_rates
        fitted_rates = np.full(count, HDD_HOUR_PRICE)
        fitted_rates_preempt = fitted_rates
    elif m_type == Metric.CPU or m_type == Metric.MEM:
        if m_type == Metric.CPU:
            on_demand_rate, preempt_rate = CPU_HOUR_PRICE, CPU_HOUR_PRICE_PREEMPT
        else:
            on_demand_rate, preempt_rate = MEM_HOUR_PRICE, MEM_HOUR_PRICE_PREEMPT
        # CPU and memory are priced identically before and after the fit.
        current_rates = np.full(count, on_demand_rate)
        current_rates_preempt = np.full(count, preempt_rate)
        fitted_rates = current_rates
        fitted_rates_preempt = current_rates_preempt

    price_total = get_price(total, preemptible, hours, current_rates, current_rates_preempt)
    price_fit = get_price(fit, preemptible, hours, fitted_rates, fitted_rates_preempt)

    return price_total, price_total - price_fit

def plot(i, j, samples, title, metric, total, ylabel, color, offset=0):
    """Draw one subplot comparing a used metric with its provisioned total.

    Used and provisioned values are plotted against input size for samples whose
    `execution_status` is 'Failed' or 'Done'; the best fit (computed from ALL
    samples, whatever their status) is overlaid, and the title reports the
    estimated spending and savings.

    The subplot grid has `n` rows and 3 columns, where `n` is read from the
    module-level variable of that name, which must be set before calling.

    Args:
        i: Zero-based row of the subplot.
        j: One-based column of the subplot (1..3).
        samples: Rows for one task call.
        title: Task call name used in the title and in messages.
        metric: Name of the "used" column; its prefix selects the metric kind.
        total: Name of the "provisioned" column.
        ylabel: Y-axis label.
        color: xkcd colour name; 'light <color>' is used for the totals.
        offset: Constant added to the used values before fitting.
    """
    failed = [s for s in samples if s.execution_status == 'Failed']
    done = [s for s in samples if s.execution_status == 'Done']

    inputs_size_gb_all = [s.inputs_size_gb for s in samples]
    inputs_size_gb_failed = [s.inputs_size_gb for s in failed]
    inputs_size_gb_done = [s.inputs_size_gb for s in done]

    metric_all = [s[metric] for s in samples]
    metric_failed = [s[metric] for s in failed]
    metric_done = [s[metric] for s in done]

    total_all = [s[total] for s in samples]
    total_failed = [s[total] for s in failed]
    total_done = [s[total] for s in done]

    m_type = get_metric_type(metric)
    fit_x, fit_y, fit_y0, fit_label = find_best_fit(inputs_size_gb_all, metric_all, offset, m_type)

    if fit_y0 is None:
        # No fit means no basis for a savings estimate. Pricing the totals
        # against themselves still yields the spending figure without handing
        # None to the price arithmetic.
        spending, _ = get_price_savings(samples, total_all, total_all, m_type)
        savings_text = 'n/a'
    else:
        spending, savings = get_price_savings(samples, total_all, fit_y0, m_type)
        savings_text = rf'~\${savings:0.2f}'

    plt.subplot(n, 3, i * 3 + j)

    if len(inputs_size_gb_failed) > 0:
        plt.plot(inputs_size_gb_failed, metric_failed, 'o', label=f'{metric} (failure)', color='xkcd:orange')
        plt.plot(inputs_size_gb_failed, total_failed, 'o', label=f'{total} (failure)', color='xkcd:light orange')

    plt.plot(inputs_size_gb_done, metric_done, '.', label=f'{metric} (success)', color=f'xkcd:{color}')
    plt.plot(inputs_size_gb_done, total_done, '.', label=f'{total} (success)', color=f'xkcd:light {color}')

    if fit_y is None:
        print(f'Failed to find a fit for {metric} of "{title}" task call')
    else:
        plt.plot(fit_x, fit_y, label=fit_label)

    plt.xlabel('Inputs (GB)')
    plt.ylabel(ylabel)
    # Raw f-string: the backslash before '$' must reach matplotlib literally so
    # the dollar sign is not read as a math-mode delimiter, and '\$' is not a
    # valid Python escape sequence in a normal string.
    plt.title(rf'{title}: spending ~\${spending:0.2f}, savings {savings_text}')
    plt.legend()

# Decide which task calls can be plotted BEFORE sizing the figure, so that
# skipped calls do not leave empty rows at the bottom of it.
plottable = []
for name, samples in calls.items():
    call = f'"{name}" task call'
    if len(samples) < MIN_SAMPLE_SIZE:
        print(f'Skipping {call}: sample too small')
        continue

    if all(s.inputs_size_gb < MIN_INPUTS_SIZE_GB for s in samples):
        print(f'Skipping {call}: inputs size too small')
        continue

    plottable.append((name, samples))

# plot() reads `n` as a module-level variable to lay out its subplot grid.
n = len(plottable)
if n == 0:
    # A zero-height figure is rejected by matplotlib, so there is nothing to draw.
    print('No task calls with enough data to plot.')
else:
    fig = plt.figure(figsize=(30, 10 * n))

    # One row per task call: CPU, memory and disk side by side.
    for i, (name, samples) in enumerate(plottable):
        plot(i, 1, samples, name, 'cpu_used_cores_avg', 'cpu_total_cores', 'CPU (cores)', 'green')
        plot(i, 2, samples, name, 'mem_used_gb_max', 'mem_total_gb', 'Memory (GB)', 'blue')
        plot(i, 3, samples, name, 'disk_used_gb_max', 'disk_total_gb', 'Disk (GB)', 'magenta', DISK_OFFSET_GB)

    plt.show()