Authenticate

In [1]:
from google.colab import auth
auth.authenticate_user()

Methods for processing data

CONFIGURATION_COLUMNS_DEFAULT_VALUES is a dict where keys are columns defining the performance test configuration and values are their default values (if any is defined). The default values are dedicated for configuration columns which were added at a later point and are missing from older test attempts.




In [111]:
import json
import warnings
from typing import Dict, List, Optional, Set, Tuple

import numpy as np
import pandas as pd
from google.cloud import bigquery
from tabulate import tabulate


PREFIX_LABEL_INDICATOR = "_prefix"
PRECISION = 4

# we assume here that configuration column value can only be missing
# if that column did not exist at the time of the test attempt
CONFIGURATION_COLUMNS_DEFAULT_VALUES = {
    "composer_version": None,
    "airflow_version": None,
    "python_version": None,
    "environment_size": None,
    "elastic_dag_configuration_type": None,
    "private_ip_enabled": None,
    "drs_enabled": False,
    "AIRFLOW__CORE__STORE_SERIALIZED_DAGS": False,
}


def assign_time_series_metrics(
    resources_dict: Dict, time_series_metrics: Set[str]
) -> None:
    """
    Assigns time series metrics to proper resource type and metric category

    :param resources_dict: dict containing information about analyzed resource types.
    :type resources_dict: Dict
    :param time_series_metrics: set of column names containing time series metrics data.
    :type time_series_metrics: Set[str]
    """

    for resource_type in resources_dict:
        resources_dict[resource_type]["gauge_metrics"] = []
        resources_dict[resource_type]["cumulative_metrics"] = []

    # by iterating this way we make sure every metric ends up in only one of the resource types
    # and metric kinds
    for metric in time_series_metrics:

        metric_elements = metric.split("__")
        assigned = False

        for resource_type in sorted(resources_dict):

            if resource_type not in metric_elements:
                continue

            if "CUMULATIVE" in metric_elements:
                resources_dict[resource_type]["cumulative_metrics"].append(metric)
                assigned = True
                break

            if "GAUGE" in metric_elements:
                resources_dict[resource_type]["gauge_metrics"].append(metric)
                assigned = True
                break

        if not assigned:
            warnings.warn(
                f"Metric {metric} does not match any of resource types or metric kinds."
            )


def compare_performance_of_two_configurations(
    all_perf_test_data: pd.DataFrame,
    resources_dict: Dict,
    general_metrics: Set[str],
    env_conf_1: Dict,
    env_conf_2: Dict,
    include_test_attempts_with_failed_dag_runs: Optional[bool] = False,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Collects results for test attempts matching two different configurations,
    calculates average values of metrics specified in resources_dict
    and returns a dataframe with comparison of these averages.

    :param all_perf_test_data: pandas Dataframe with all performance test data collected from BQ.
    :type all_perf_test_data: pd.DataFrame
    :param resources_dict: dict containing information about time series metrics that should be
        analyzed for different resource types and optional filter that control resource aggregation.
    :type resources_dict: Dict
    :param general_metrics: set with metrics referring to a whole test attempt. Average of each of
        these metrics will be calculated across all matching test attempts.
    :type general_metrics: Set[str]
    :param env_conf_1: dict with configuration column values for first environment. If one of the
        configuration columns is missing, then all its values apply.
    :type env_conf_1: Dict
    :param env_conf_2: dict with configuration column values for second environment. Performance of
        this configuration will be compared in regard to configuration from env_conf_1.
    :type env_conf_2: Dict
    :param include_test_attempts_with_failed_dag_runs: set to True if you want the test attempts
        with failed dag runs to be included in analysis.
    :type include_test_attempts_with_failed_dag_runs: bool

    :return: three pandas DataFrames:
        first: containing average values of general and time series metrics for both
            configurations as well as percentage difference between env_conf_2 and env_conf_1.
        second: containing number of test_attempts per set of configuration columns missing from
            env_conf_1. None of none of configuration columns are missing from env_conf_1.
        third: containing number of test_attempts per set of configuration columns missing from
            env_conf_2. None of none of configuration columns are missing from env_conf_2.
    :rtype: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
    """

    missing_conf_columns_1, missing_conf_columns_2 = validate_configurations(
        env_conf_1, env_conf_2
    )

    # TODO: with column missing from ENVIRONMENT_CONFIGURATION make a total summary and a grouping
    #  (per every value of missing column)
    #  -> make it an option - either collect all matching or do the grouping;
    #  when doing the grouping, the groups should be chosen so that configurations
    #  differ only with one column - this difference in one column should be checked
    #  before doing the grouping on missing columns

    configuration_data = collect_results_for_configuration(
        all_perf_test_data, env_conf_1, include_test_attempts_with_failed_dag_runs
    )

    test_attempts_per_missing_conf_1 = (
        get_test_attempts_count_for_missing_conf_columns(
            configuration_data, missing_conf_columns_1
        )
        if missing_conf_columns_1
        else None
    )

    _, conf_1_total_results = calculate_statistics_for_configuration(
        configuration_data, resources_dict, general_metrics
    )

    configuration_data = collect_results_for_configuration(
        all_perf_test_data, env_conf_2, include_test_attempts_with_failed_dag_runs
    )

    test_attempts_per_missing_conf_2 = (
        get_test_attempts_count_for_missing_conf_columns(
            configuration_data, missing_conf_columns_2
        )
        if missing_conf_columns_2
        else None
    )

    _, conf_2_total_results = calculate_statistics_for_configuration(
        configuration_data, resources_dict, general_metrics
    )

    categories = []
    conf_1_values = []
    conf_2_values = []
    differences = []

    all_categories = set(conf_2_total_results).union(set(conf_1_total_results))

    for category in sorted(all_categories):

        if category not in conf_1_total_results or category not in conf_2_total_results:

            conf_1_value = round(conf_1_total_results.get(category, np.nan), PRECISION)
            conf_2_value = round(conf_2_total_results.get(category, np.nan), PRECISION)
            # display nan in difference in case of category missing from one configuration
            percentage_change_str = np.nan

        else:
            # it should be impossible for categories present for both configurations to be np.nan
            conf_1_value = round(conf_1_total_results[category], PRECISION)
            conf_2_value = round(conf_2_total_results[category], PRECISION)

            # also covers the case when they are both zero
            if conf_1_value == conf_2_value:
                percentage_change_str = "0.0%"
            # if the first value is zero
            elif not conf_1_value:
                percentage_change_str = "+inf" if conf_2_value > 0 else "-inf"
            else:
                percentage_change = round(
                    abs((conf_2_value - conf_1_value) / conf_1_value * 100), PRECISION
                )
                change_sign = "+" if conf_2_value > conf_1_value else "-"
                percentage_change_str = f"{change_sign}{percentage_change}%"

        categories.append(category)
        conf_1_values.append(conf_1_value)
        conf_2_values.append(conf_2_value)
        differences.append(percentage_change_str)

    comparison_df = pd.DataFrame(
        {
            "category": categories,
            "configuration_1": conf_1_values,
            "configuration_2": conf_2_values,
            "difference": differences,
        }
    )

    return (
        comparison_df,
        test_attempts_per_missing_conf_1,
        test_attempts_per_missing_conf_2,
    )


def validate_configurations(
    env_conf_1: Dict, env_conf_2: Dict
) -> Tuple[List[str], List[str]]:
    """
    Checks the configuration that are to be compared and reports discrepancies.

    :param env_conf_1: dict with configuration column values for first environment..
    :type env_conf_1: Dict
    :param env_conf_2: dict with configuration column values for second environment..
    :type env_conf_2: Dict

    :return: two lists of configuration columns missing from both of configuration dicts.
    :rtype: Tuple[List[str], List[str]]
    """
    missing_conf_columns_1 = check_configuration(env_conf_1, 1)
    missing_conf_columns_2 = check_configuration(env_conf_2, 2)

    if env_conf_1 == env_conf_2:
        warnings.warn("Configurations are identical.")

    else:

        missing_columns = env_conf_1.keys() - env_conf_2.keys()
        if missing_columns:
            warnings.warn(
                f"Configuration 1 contains columns missing from configuration 2: {missing_columns}."
            )

        missing_columns = env_conf_2.keys() - env_conf_1.keys()
        if missing_columns:
            warnings.warn(
                f"Configuration 2 contains columns missing from configuration 1: {missing_columns}."
            )

        # they should differ with only one configuration column:
        # - composer_version
        # - airflow version (if composer_version is set to None
        #   which indicates vanilla airflow tests)
        different_columns = {
            column
            for column in env_conf_1
            if column in env_conf_2 and env_conf_1[column] != env_conf_2[column]
        }

        if len(different_columns) > 1:
            warnings.warn(
                f"Configurations differ in more than one column: {different_columns}."
            )

    return missing_conf_columns_1, missing_conf_columns_2


def check_configuration(env_conf: Dict, number: Optional[int] = None) -> List[str]:
    """
    Checks if there are discrepancies in the environments configuration that should be
    collected for performance analysis.

    :param env_conf: dict with configuration column values.
    :type env_conf: Dict
    :param number: optional number identifying the configuration.
    :type number: int

    :return: list with configuration columns missing from given configuration.
    :rtype: List[str]
    """

    missing_conf_columns = list(
        sorted(CONFIGURATION_COLUMNS_DEFAULT_VALUES.keys() - env_conf.keys())
    )

    for column in env_conf:

        if column not in CONFIGURATION_COLUMNS_DEFAULT_VALUES:
            warnings.warn(
                f"Configuration {number if number is not None else ''} contains column '{column}' "
                f"which is not a configuration column."
            )

    if "composer_version" not in env_conf:
        warnings.warn(
            f"composer_version column is missing "
            f"from configuration {number if number is not None else ''}."
        )

    return missing_conf_columns


def collect_results_for_configuration(
    all_perf_test_data: pd.DataFrame,
    env_conf: Dict,
    include_test_attempts_with_failed_dag_runs: Optional[bool] = False,
) -> pd.DataFrame:
    """
    Collects results matching configuration columns provided in env_conf dictionary.

    :param all_perf_test_data: pandas Dataframe with all performance test data collected from BQ.
    :type all_perf_test_data: pd.DataFrame
    :param env_conf: dict with configuration column values. If one of the configuration columns
        is missing from the dictionary, then it is not included in query and all its values apply.
    :type env_conf: Dict
    :param include_test_attempts_with_failed_dag_runs: set to True if you want the test attempts
        with failed dag runs to be included in analysis.
    :type include_test_attempts_with_failed_dag_runs: bool

    :return: pandas DataFrame containing performance metrics for test attempts that match the
        provided configuration.
    :rtype: pd.DataFrame
    """

    query_parts = []

    for column_name, column_value in env_conf.items():

        if isinstance(column_value, str):
            query_parts.append(f'{column_name} == "{column_value}"')
        else:
            query_parts.append(f"{column_name} == {column_value}")

    if not include_test_attempts_with_failed_dag_runs:
        query_parts.append("dag_run_failed_count == 0")

    query = " & ".join(query_parts)

    return all_perf_test_data.query(query)


def get_test_attempts_count_for_missing_conf_columns(
    configuration_data: pd.DataFrame, missing_conf_columns: List[str]
) -> Optional[pd.DataFrame]:
    """
    For provided subset of test attempts checks how many of them belong to different set of values
    of missing_conf_columns.

    :param configuration_data: pandas DataFrame containing performance metrics for test attempts
        that match certain configuration dict.
    :type configuration_data: pd.DataFrame
    :param: missing_conf_columns: list with configuration columns missing from configuration dict.
    :type missing_conf_columns: List[str]

    :return: pandas DataFrame that for every group of values of missing_conf_columns present in
        configuration_data contains number of test_attempts representing this group
    :rtype: pd.DataFrame
    """

    if not missing_conf_columns:
        warnings.warn("missing_conf_columns is empty.")
        return None

    df_rows = []

    for group_name, group in configuration_data.groupby(missing_conf_columns):

        test_attempts_in_group = len(group.groupby("uuid"))
        missing_conf_columns_values = list(group_name)

        df_rows.append(missing_conf_columns_values + [test_attempts_in_group])

    test_attempts_per_missing_conf = pd.DataFrame(
        df_rows, columns=missing_conf_columns + ["test_attempts"]
    )

    return test_attempts_per_missing_conf


def calculate_statistics_for_configuration(
    configuration_data: pd.DataFrame, resources_dict: Dict, general_metrics: Set[str]
) -> Tuple[Dict, Dict]:
    """
    For provided subset of test attempts calculates average growth of cumulative metrics
    and average values of gauge and general metrics, grouping resources based on resource types and
    filters from resources_dict.

    :param configuration_data: pandas DataFrame containing performance metrics for test attempts
        that match certain configuration.
    :type configuration_data: pd.DataFrame
    :param resources_dict: dict containing information about time series metrics that should be
        analyzed for different resource types and optional filter that control resource aggregation.
    :type resources_dict: Dict
    :param general_metrics: set with metrics referring to a whole test attempt. Average of each of
        these metrics will be calculated across all test attempts from configuration_data.
    :type general_metrics: Set[str]

    :return: pandas DataFrame containing average values of general and time series metrics for both
        configurations as well as percentage difference between env_conf_2 and env_conf_1.
    :rtype: pd.DataFrame
    """

    uuid_results_dict = {}

    for resource_type in resources_dict:

        analyzed_columns = (
            resources_dict[resource_type]["group_by_columns"]
            + ["timestamp"]
            + resources_dict[resource_type]["cumulative_metrics"]
            + resources_dict[resource_type]["gauge_metrics"]
        )

        configuration_data_grouped = configuration_data[analyzed_columns].groupby(
            resources_dict[resource_type]["group_by_columns"]
        )

        uuid_index = resources_dict[resource_type]["group_by_columns"].index("uuid")

        group_filters = resources_dict[resource_type].get("group_filters")

        for group_name, single_resource_data in configuration_data_grouped:

            if group_filters:
                group_filter_name = check_if_group_filters_apply(
                    group_name,
                    group_filters,
                    resources_dict[resource_type]["group_by_columns"],
                )
                if group_filter_name is None:
                    continue
            else:
                group_filter_name = "all"

            uuid = group_name[uuid_index]

            if uuid not in uuid_results_dict:
                uuid_results_dict[uuid] = {}

            if group_filter_name not in uuid_results_dict[uuid]:
                uuid_results_dict[uuid][group_filter_name] = {}

            # in case of pod metrics it is possible to get duplicated entries
            # due to multiple containers on given pod, but with a correct
            # subset of columns we can simply remove duplicated rows;
            # after removing duplicates we should have a single row per
            # timestamp in every group
            single_resource_data = single_resource_data.sort_values(
                ["timestamp"]
            ).drop_duplicates()

            for cumulative_metric in resources_dict[resource_type][
                "cumulative_metrics"
            ]:
                # add _per_second suffix to cumulative metric names
                # to indicate the change in their meaning
                cumulative_metric_growth = f"{cumulative_metric}_per_second"

                if (
                    cumulative_metric_growth
                    not in uuid_results_dict[uuid][group_filter_name]
                ):
                    uuid_results_dict[uuid][group_filter_name][
                        cumulative_metric_growth
                    ] = []

                average_growth = process_cumulative_metric_for_single_resource(
                    single_resource_data,
                    cumulative_metric,
                    resources_dict[resource_type]["group_by_columns"],
                )

                uuid_results_dict[uuid][group_filter_name][
                    cumulative_metric_growth
                ].append(average_growth)

            for gauge_metric in resources_dict[resource_type]["gauge_metrics"]:
                if gauge_metric not in uuid_results_dict[uuid][group_filter_name]:
                    uuid_results_dict[uuid][group_filter_name][gauge_metric] = []

                average_metric_value = process_gauge_metric_for_single_resource(
                    single_resource_data,
                    gauge_metric,
                    resources_dict[resource_type]["group_by_columns"],
                )

                uuid_results_dict[uuid][group_filter_name][gauge_metric].append(
                    average_metric_value
                )

    final_results_dict = {}

    for uuid in uuid_results_dict:

        for group_filter_name in uuid_results_dict[uuid]:

            for metric_name in uuid_results_dict[uuid][group_filter_name]:

                # sum of metric values across resources of specific category
                # (for example airflow-worker containers)
                sum_of_values = np.sum(
                    uuid_results_dict[uuid][group_filter_name][metric_name]
                )

                uuid_results_dict[uuid][group_filter_name][metric_name] = sum_of_values

                if np.isnan(sum_of_values):
                    warnings.warn(
                        f"Nan value encountered when calculating value of {metric_name} "
                        f"for {group_filter_name} category of resources for uuid {uuid}. "
                        f"Skipping this uuid in calculating mean value of this metric "
                        f"for this resource category."
                    )
                    continue

                category = "__".join([group_filter_name, metric_name])

                if category not in final_results_dict:
                    final_results_dict[category] = []

                final_results_dict[category].append(sum_of_values)

    # finally, calculate mean of values across different test attempts (different uuids)
    for category in final_results_dict:

        final_results_dict[category] = np.mean(final_results_dict[category])

    # first row for every test attempt
    general_results = configuration_data.groupby(["uuid"]).first()

    # calculate mean for non-time-series metrics
    for general_metric in general_metrics:
        general_metric_mean = general_results[general_metric].mean()
        if np.isnan(general_metric_mean):
            warnings.warn(
                f"{general_metric} for all test attempts in one of configurations is nan."
            )
            continue
        final_results_dict[general_metric] = general_metric_mean

    # TODO: add information about amount of uuids that contributed to each of averages

    final_results_dict["test_attempts"] = len(general_results)

    return uuid_results_dict, final_results_dict


def check_if_group_filters_apply(
    group_name: Tuple[str, ...],
    group_filters: Dict[str, Dict],
    group_by_columns: List[str],
) -> Optional[str]:
    """
    Returns first of group_filters specified for the resource type that applies to given group_name
    or None if none of them applies to this group.

    :param group_name: tuple that identifies a single group created as a result of grouping
        configuration_data by group_by_columns.
    :type group_name: Tuple[str, ...]
    :param group_filters: a dictionary containing filters which control how groups of given resource
        type should be aggregated.
    :type group_filters: Dict[str, Dict]
    :param group_by_columns: list of columns by which configuration_data was grouped to calculate
        performance of given resource type.
    :type group_by_columns: List[str]

    :return: name of the first filter that fully applies to group_name or None
        if none of them applies.
    :rtype: Optional[str]

    :raises: ValueError: if one of group filters contains filter on column that is not
        amongst columns by which metrics of single test attempt were grouped.
    """

    for filter_name in group_filters:

        for column_filter in group_filters[filter_name]:

            if column_filter.endswith(PREFIX_LABEL_INDICATOR):
                column_name = column_filter[: -(len(PREFIX_LABEL_INDICATOR))]
                expected_column_value = group_filters[filter_name][column_filter]
                matching_function = column_value_starts_with
            else:
                column_name = column_filter
                expected_column_value = group_filters[filter_name][column_filter]
                matching_function = column_value_equals

            if column_name not in group_by_columns:
                raise ValueError(
                    f"Group filter {filter_name} contains filter on column {column_name} "
                    f"that is not amongst columns "
                    f"by which this resource is grouped: {group_by_columns}."
                )

            index = group_by_columns.index(column_name)

            column_value = group_name[index]

            # if at least one column filter does not apply, then check next group of filters
            if not matching_function(column_value, expected_column_value):
                break

        # if all column_filter from given group_filter apply then we have found
        # a match
        else:
            return filter_name

    return None


def column_value_starts_with(column_value: str, expected_value: str) -> bool:
    """
    Returns True if column_value starts with expected_value and False otherwise
    """
    return column_value.startswith(expected_value)


def column_value_equals(column_value: str, expected_value: str) -> bool:
    """
    Returns True if column_value is equal to expected_value and False otherwise
    """
    return column_value == expected_value


def process_gauge_metric_for_single_resource(
    single_resource_data: pd.DataFrame, metric: str, group_by_columns: List[str]
) -> float:
    """
    Calculates average value of gauge metric for single resource
    (node, pod or container) in given test attempt.

    :param single_resource_data: pandas DataFrame which contains time series data of single resource
        (single node, pod or container).
    :type single_resource_data: pd.DataFrame
    :param metric: name of gauge metric average of which should be calculated.
    :type metric: str
    :param group_by_columns: list of columns by which configuration_data was grouped to calculate
        performance of given resource type.
    :type group_by_columns: List[str]

    :return: float value with average of provided metric or np.nan if DataFrame did not contain a
        single value of that metric.
    :rtype: float
    """

    count = single_resource_data[metric].count()

    # count is 0 if metric column consists only of nans
    if count == 0:
        warnings.warn(
            f"It seems that {metric} contains only nan values for "
            f"{identify_resource(single_resource_data, group_by_columns)}. Returning nan."
        )
        return np.nan

    return single_resource_data[metric].sum() / count


def process_cumulative_metric_for_single_resource(
    single_resource_data: pd.DataFrame, metric: str, group_by_columns: List[str]
) -> float:
    """
    Calculates average growth of cumulative metric over time for single resource
    (node, pod or container) in given test attempt.

    :param single_resource_data: pandas DataFrame which contains time series data of single resource
        (single node, pod or container). Must be sorted by timestamp.
    :type single_resource_data: pd.DataFrame
    :param metric: name of cumulative metric average growth of which should be calculated.
    :type metric: str
    :param group_by_columns: list of columns by which configuration_data was grouped to calculate
        performance of given resource type.
    :type group_by_columns: List[str]

    :return: float value with average growth per second of provided metric or np.nan if DataFrame
        contains less than two values of that metric.
    :rtype: float

    :raises: ValueError: if metric does not follow the constraints of cumulative metric
    """

    min_value = np.nan

    for _, row in single_resource_data.iterrows():
        metric_value = row[metric]

        if np.isnan(metric_value):
            continue

        if np.isnan(min_value):
            min_value = metric_value
            current_max_value = metric_value

            min_timestamp = row["timestamp"]
            current_max_timestamp = row["timestamp"]
            continue

        # the same value as in previous non-nan timestamp is allowed
        if metric_value < current_max_value:
            # TODO: check what happens when pod/container gets restarted
            raise ValueError(
                f"Error when calculating {metric} for "
                f"{identify_resource(single_resource_data, group_by_columns)}. "
                f"The value in latter timestamp is smaller than in a previous one."
            )

        current_max_value = metric_value
        current_max_timestamp = row["timestamp"]

    if np.isnan(min_value):
        warnings.warn(
            f"It seems that metric {metric} contains only nan values for "
            f"{identify_resource(single_resource_data, group_by_columns)}. Returning nan."
        )
        return np.nan

    if min_timestamp == current_max_timestamp:
        warnings.warn(
            f"It seems that metric {metric} contains only a single non-nan value for "
            f"{identify_resource(single_resource_data, group_by_columns)}. and its growth "
            f"cannot be calculated. Returning nan."
        )
        return np.nan

    growth = (current_max_value - min_value) / (current_max_timestamp - min_timestamp)

    return growth

    # in case of 'breaks' in rising of cumulative metric
    # (for example due to pod restart) we could probably take
    # the total increase of metric value divided by total timespan

    #
    # 7
    # 6       /
    # 5      /
    # 4     /
    # 3    /              /
    # 2   /              /
    # 1  /              /
    # 0  1  2  3  4  5  6  7  8

    # so in this case:
    # increase: 6 + 3
    # timespan: 2 + 1


def identify_resource(
    single_resource_data: pd.DataFrame, group_by_columns: List[str]
) -> str:
    """
    Returns a string identifying the resource data of which is stored in single_resource_data.

    :param single_resource_data: pandas DataFrame which contains time series data of single resource
        (single node, pod or container).
    :type single_resource_data: pd.DataFrame
    :param group_by_columns: list of columns by which configuration_data was grouped to calculate
        performance of this resource type.
    :type group_by_columns: List[str]

    :return: string identifying the resource.
    :rtype: str
    """
    string_parts = []

    for column in group_by_columns:

        string_parts.append(f"{column}: {single_resource_data[column].iloc[0]}")

    return ", ".join(string_parts)


Environment setup

ENVIRONMENT_CONFIGURATION (1 and 2) are dicts specifying the configuration for which results summary should be done. Removing one of the keys will cause the test attempts with any value in the removed column to be included in the calculations.

TIME_SERIES_METRICS is a set of column names with time series metrics which should be included in the summary of performance. If a column is not present for one of the test attempts (for example because corresponding metric was added recently) then given test attempt will simply not partake in calculating average of this metric.

GENERAL_METRICS is a set of column names that contain metrics which refer to the whole test attempt. Similarly to TIME_SERIES_METRICS, if a column is not present for some uuids, then they will be excluded from calculation of average.

RESOURCES_DICT is a dictionary that describes which time series metrics apply to which resource type, as well as by which columns data should be grouped in order to calculate average performance for given resource type correctly. Optionally, every resource type can also specify group_filters, which describe how the created groups should be aggregated. For example, you can specify that average of container metrics should be calculated separately only for 'airflow-worker' and 'airflow-scheduler' containers. Not specifying any group_filter causes all resorces of given type to be included in calculations.

In [112]:
PROJECT_ID = "polidea-airflow"
DATASET = "characteristics_dataset"

ENVIRONMENT_CONFIGURATION_1 = {
    "composer_version": "1.10.3",
    "airflow_version": "1.10.6",
    "python_version": "2.7.12",
    "environment_size": "small",
    "elastic_dag_configuration_type": "no_structure__200_dags__1_tasks__1_dag_runs__0.0_sleep__python_operator",
    "private_ip_enabled": False,
    "drs_enabled": False,
    "AIRFLOW__CORE__STORE_SERIALIZED_DAGS": False,
}

ENVIRONMENT_CONFIGURATION_2 = {
    "composer_version": "1.10.4",
    "airflow_version": "1.10.6",
    "python_version": "2.7.12",
    "environment_size": "small",
    "elastic_dag_configuration_type": "no_structure__200_dags__1_tasks__1_dag_runs__0.0_sleep__python_operator",
    "private_ip_enabled": False,
    "drs_enabled": False,
    "AIRFLOW__CORE__STORE_SERIALIZED_DAGS": False,
}

# set of metrics that refer to time series data
TIME_SERIES_METRICS = {
    "k8s_node__CUMULATIVE__kubernetes_io_node_cpu_core_usage_time",
    "k8s_node__CUMULATIVE__kubernetes_io_node_network_received_bytes_count",
    "k8s_node__CUMULATIVE__kubernetes_io_node_network_sent_bytes_count",
    "k8s_node__GAUGE__kubernetes_io_node_memory_used_bytes__memory_type_evictable", 
    "k8s_node__GAUGE__kubernetes_io_node_memory_used_bytes__memory_type_non_evictable",
    "k8s_pod__CUMULATIVE__kubernetes_io_pod_network_received_bytes_count",
    "k8s_pod__CUMULATIVE__kubernetes_io_pod_network_sent_bytes_count",
    "k8s_container__CUMULATIVE__kubernetes_io_container_cpu_core_usage_time",
    "k8s_container__GAUGE__kubernetes_io_container_memory_used_bytes__memory_type_evictable",
    "k8s_container__GAUGE__kubernetes_io_container_memory_used_bytes__memory_type_non_evictable",   
}

# set of metrics that refer to the whole test attempt;
# their average across diferent test attempts will be calculated
GENERAL_METRICS = {
    "test_duration",
    "dag_run_average_duration",
    "task_instance_average_duration",
}

# group filters should only include the columns from group_by_keys list
RESOURCES_DICT = {
    "k8s_node": {
        "group_by_columns": ["uuid", "node_name"]
    },
    "k8s_pod": {
        "group_by_columns": ["uuid", "pod_name"],
        "group_filters": {
            "airflow-scheduler": {
                "pod_name_prefix": "airflow-scheduler"
            },
            "airflow-worker": {
                "pod_name_prefix": "airflow-worker"
            },
        }
    },
    "k8s_container": {
        "group_by_columns": ["uuid", "pod_name", "container_name"],
        "group_filters": {
            "airflow-scheduler": {
                "container_name": "airflow-scheduler"
            },
            "airflow-worker": {
                "container_name": "airflow-worker"
            },
        }
    }
}

assign_time_series_metrics(RESOURCES_DICT, TIME_SERIES_METRICS)

Collect data from BQ and save it in DF variable


In [31]:
CLIENT = bigquery.Client(project=PROJECT_ID)

QUERY = f'SELECT * FROM `{PROJECT_ID}.{DATASET}.*`'
PERFORMANCE_TEST_DATA = CLIENT.query(QUERY).to_dataframe()

# replace nans in configuration columns added at later point with their default values
for column in CONFIGURATION_COLUMNS_DEFAULT_VALUES:
    if CONFIGURATION_COLUMNS_DEFAULT_VALUES[column] is not None:
        PERFORMANCE_TEST_DATA[column] = PERFORMANCE_TEST_DATA[column].fillna(
            CONFIGURATION_COLUMNS_DEFAULT_VALUES[column]
        )

Compare results for two configurations

In [115]:
comparison_df, test_attempts_per_missing_conf_1, test_attempts_per_missing_conf_2 = compare_performance_of_two_configurations(
    all_perf_test_data=PERFORMANCE_TEST_DATA,
    resources_dict=RESOURCES_DICT,
    general_metrics=GENERAL_METRICS,
    env_conf_1=ENVIRONMENT_CONFIGURATION_1,
    env_conf_2=ENVIRONMENT_CONFIGURATION_2,
    include_test_attempts_with_failed_dag_runs=False,
)

print(tabulate(comparison_df, headers='keys', tablefmt='psql', floatfmt=f".{PRECISION}f"))

if test_attempts_per_missing_conf_1 is not None:
    print()
    print("Detailed information about configurations belonging to first configuration dict.")
    print(tabulate(test_attempts_per_missing_conf_1, headers='keys', tablefmt='psql'))

if test_attempts_per_missing_conf_2 is not None:
    print()
    print("Detailed information about configurations belonging to second configuration dict.")
    print(tabulate(test_attempts_per_missing_conf_2, headers='keys', tablefmt='psql'))

+----+---------------------------------------------------------------------------------------------------------------+-------------------+-------------------+--------------+
|    | category                                                                                                      |   configuration_1 |   configuration_2 | difference   |
|----+---------------------------------------------------------------------------------------------------------------+-------------------+-------------------+--------------|
|  0 | airflow-scheduler__k8s_container__CUMULATIVE__kubernetes_io_container_cpu_core_usage_time_per_second          |            0.2589 |            0.2524 | -2.5106%     |
|  1 | airflow-scheduler__k8s_container__GAUGE__kubernetes_io_container_memory_used_bytes__memory_type_evictable     |      1105011.8095 |      1183470.3915 | +7.1002%     |
|  2 | airflow-scheduler__k8s_container__GAUGE__kubernetes_io_container_memory_used_bytes__memory_type_non_evictable |    24530784

Analyze results for single configuration

In [None]:
configuration_data = collect_results_for_configuration(PERFORMANCE_TEST_DATA, ENVIRONMENT_CONFIGURATION_1)

uuid_results_1, configuration_1_total_results = calculate_statistics_for_configuration(configuration_data, RESOURCES_DICT, GENERAL_METRICS)

print(json.dumps(configuration_1_total_results, indent=4, sort_keys=True))
print()
print(json.dumps(uuid_results_1, indent=4, sort_keys=True))