From 0a1f79b774d5afd58a6f0325066be1aa8be23471 Mon Sep 17 00:00:00 2001 From: MeredithAnya Date: Tue, 5 Mar 2024 15:51:35 -0800 Subject: [PATCH] ref(ch-upgrades): update query_comparer (#5584) * ref(ch-upgrades): update query_comparer * have two separate reports * lotsa updates * update docstring for matched_pairs --- snuba/cli/query_comparer.py | 449 ++++++++++-------- snuba/clickhouse/upgrades/comparisons.py | 66 ++- tests/clickhouse/upgrades/test_comparisons.py | 158 ++++++ 3 files changed, 466 insertions(+), 207 deletions(-) diff --git a/snuba/cli/query_comparer.py b/snuba/cli/query_comparer.py index fb0a33104c..a43236f194 100644 --- a/snuba/cli/query_comparer.py +++ b/snuba/cli/query_comparer.py @@ -1,136 +1,66 @@ -import csv import itertools from dataclasses import dataclass -from typing import Any, NamedTuple, Optional, Sequence, Union +from typing import Any, MutableMapping, Optional, Sequence, Tuple import click import structlog from snuba import settings from snuba.admin.notifications.slack.client import SlackClient +from snuba.clickhouse.upgrades.comparisons import ( + BlobGetter, + DataMismatchResult, + FileFormat, + FileManager, + PerfMismatchResult, + QueryMeasurementResult, +) from snuba.environment import setup_logging, setup_sentry +from snuba.utils.gcs import GCSUploader logger = structlog.get_logger().bind(module=__name__) - -@dataclass(frozen=True) -class QueryMeasurement: - query_id: str - query_duration_ms: int - result_rows: int - result_bytes: int - read_rows: int - read_bytes: int - - -@dataclass(frozen=True) -class MismatchedValues: - base_value: int - new_value: int - delta: int - - -class PerformanceMismatchResult(NamedTuple): - query_id: str - duration_ms_base: int - duration_ms_new: int - read_rows_base: int - read_rows_new: int - read_bytes_base: int - read_bytes_new: int - duration_ms_delta: int - read_rows_delta: int - read_bytes_delta: int - - -class DataMismatchResult(NamedTuple): - query_id: str - result_rows_base: int - result_rows_new: int - result_bytes_base: int - result_bytes_new: int - result_rows_delta: int - result_bytes_delta: int +MismatchResultSet = Sequence[PerfMismatchResult | DataMismatchResult] @dataclass -class QueryMismatch: - query_id: str - query_duration_ms: MismatchedValues - result_rows: MismatchedValues - result_bytes: MismatchedValues - read_rows: MismatchedValues - read_bytes: MismatchedValues - - def performance_mismatches(self) -> PerformanceMismatchResult: - return PerformanceMismatchResult( - self.query_id, - self.query_duration_ms.base_value, - self.query_duration_ms.new_value, - self.read_rows.base_value, - self.read_rows.new_value, - self.read_bytes.base_value, - self.read_bytes.new_value, - self.query_duration_ms.delta, - self.read_rows.delta, - self.read_bytes.delta, - ) - - def data_mismatches(self) -> DataMismatchResult: - return DataMismatchResult( - self.query_id, - self.result_rows.base_value, - self.result_rows.new_value, - self.result_bytes.base_value, - self.result_bytes.new_value, - self.result_rows.delta, - self.result_bytes.delta, - ) - +class TableTotals: + total_queries: int = 0 + total_data_mismatches: int = 0 + total_perf_mismatches: int = 0 -MEASUREMENTS = [ - "query_duration_ms", - "result_rows", - "result_bytes", - "read_rows", - "read_bytes", -] + def add(self, queries: int, data_mismatches: int, perf_mismatches: int) -> None: + self.total_queries += queries + self.total_data_mismatches += data_mismatches + self.total_perf_mismatches += perf_mismatches -def write_querylog_comparison_results_to_csv( - 
results: Union[Sequence[DataMismatchResult], Sequence[PerformanceMismatchResult]],
-    filename: str,
-    header_row: Sequence[str],
-) -> None:
-    with open(filename, mode="w") as file:
-        writer = csv.writer(file)
-        writer.writerow(header_row)
-        for row in results:
-            writer.writerow(row)
+TOTALS_BY_TABLES: MutableMapping[str, TableTotals] = {}
 
 
 @click.command()
 @click.option(
-    "--base-file",
-    help="Clickhouse queries results from base version.",
+    "--gcs-bucket",
+    help="Name of the GCS bucket to read query results from and upload compared files to.",
     required=True,
 )
 @click.option(
-    "--upgrade-file",
-    help="Clickhouse queries results from upgrade version.",
-    required=True,
+    "--compare-type",
+    help="Run comparisons on performance, data consistency, or both (the default).",
+    type=click.Choice(["perf", "data"]),
 )
 @click.option(
-    "--table",
-    help="Clickhouse queries results from upgrade version.",
-    required=True,
+    "--override",
+    help="Re-run comparisons and overwrite any previously saved results.",
+    is_flag=True,
+    default=False,
 )
 @click.option("--log-level", help="Logging level to use.")
 def query_comparer(
     *,
-    base_file: str,
-    upgrade_file: str,
-    table: str,
+    gcs_bucket: str,
+    compare_type: Optional[str],
+    override: bool,
     log_level: Optional[str] = None,
 ) -> None:
     """
@@ -139,86 +69,242 @@ def query_comparer(
     setup_logging(log_level)
     setup_sentry()
 
-    base = open(base_file)
-    upgrade = open(upgrade_file)
+    uploader = GCSUploader(gcs_bucket)
+    file_manager = FileManager(uploader)
+    blob_getter = BlobGetter(uploader)
 
-    base_reader = csv.reader(base)
-    upgrade_reader = csv.reader(upgrade)
+    blob_prefixes = blob_getter._get_sub_dir_prefixes(prefix="")
+    result_prefixes = sorted([p for p in blob_prefixes if p.startswith("results")])
 
-    def query_measurement(row: Any) -> QueryMeasurement:
-        return QueryMeasurement(
-            query_id=row[0],
-            query_duration_ms=int(row[1]),
-            result_rows=int(row[2]),
-            result_bytes=int(row[3]),
-            read_rows=int(row[4]),
-            read_bytes=int(row[5]),
-        )
+    if len(result_prefixes) != 2:
+        raise Exception("Expected exactly two sets of results to compare.")
 
-    mismatches = []
-    total_rows = 0
-    for v1_row, v2_row in itertools.zip_longest(base_reader, upgrade_reader):
-        if v1_row[0] == "query_id":
-            # csv header row
-            continue
+    def get_matched_pairs() -> Sequence[Tuple[str, str]]:
+        """
+        In order to compare results, we need results from both
+        ClickHouse versions, e.g. 21.8 & 22.8. This function finds
+        the matched blob pairs of results by looking at the blob names.
 
-        assert v1_row[0] == v2_row[0], "Invalid query_ids: ids must match"
-        total_rows += 1
+        Returns a sequence of pairs:
+        [
+            ("results-21-8/2024_02_15/meredith_test_22.csv", "results-22-8/2024_02_15/meredith_test_22.csv"),
+            ("results-21-8/2024_02_15/meredith_test_23.csv", "results-22-8/2024_02_15/meredith_test_23.csv"),
+        ]
+        """
+        matches = []
+        v1_prefix, v2_prefix = result_prefixes
+        for v1_name in blob_getter.get_all_names(prefix=v1_prefix):
+            # the blobs are named the same except for the prefix,
+            # e.g. 
results-21-8/ vs results-22-8/ + v2_name = v1_name.replace(v1_prefix, v2_prefix) + if uploader.blob_exists(v2_name): + matches.append((v1_name, v2_name)) + return matches + + matched_pairs = get_matched_pairs() + for (v1_blob, v2_blob) in matched_pairs: + blob_suffix = v1_blob.split("/", 1)[-1] + compared_perf_blob = f"compared-perf/{blob_suffix}" + compared_data_blob = f"compared-data/{blob_suffix}" + perf_blob_exists = uploader.blob_exists(compared_perf_blob) + data_blob_exists = uploader.blob_exists(compared_data_blob) + + if perf_blob_exists and data_blob_exists and not override: + continue - mismatches_exist = any( - [1 for i in range(1, len(v1_row)) if v1_row[i] != v2_row[i]] - ) - if not mismatches_exist: + if compare_type == "perf" and perf_blob_exists and not override: continue - v1_data = query_measurement(v1_row) - v2_data = query_measurement(v2_row) + if compare_type == "data" and data_blob_exists and not override: + continue - mismatches.append(_create_mismatch(v1_data, v2_data)) + result_file_format = file_manager.parse_blob_name(v1_blob) + v1_results = file_manager.download(v1_blob) + v2_results = file_manager.download(v2_blob) + + perf_mismatches = [] + data_mismatches = [] + total_rows = 0 + for v1_result, v2_result in itertools.zip_longest(v1_results, v2_results): + assert ( + v1_result.query_id == v2_result.query_id + ), f"Invalid query_ids: {v1_result.query_id} and {v2_result.query_id} must match" + total_rows += 1 + if v1_result == v2_result: + continue + + assert isinstance(v1_result, QueryMeasurementResult) + assert isinstance(v2_result, QueryMeasurementResult) + + perf_mismatch = create_perf_mismatch(v1_result, v2_result) + data_mismatch = create_data_mismatch(v1_result, v2_result) + if perf_mismatch: + perf_mismatches.append(perf_mismatch) + if data_mismatch: + data_mismatches.append(data_mismatch) + + def save_comparisons(compare_type: str, mismatches: MismatchResultSet) -> None: + compared_file_format = FileFormat( + f"compared-{compare_type}", + result_file_format.date, + result_file_format.table, + result_file_format.hour, + ) + file_manager.save(compared_file_format, mismatches, header_row=True) + if mismatches: + # need the filename for the slack report + filename = file_manager.format_filename(compared_file_format) + message = _format_file_slack_overview( + total_rows, mismatches, result_file_format.table, compare_type + ) + _send_slack_report(message, filename) + + totals_by_table = TOTALS_BY_TABLES.get(result_file_format.table) + if not totals_by_table: + totals_by_table = TableTotals() + TOTALS_BY_TABLES[result_file_format.table] = totals_by_table + + totals_by_table.add( + queries=total_rows, + data_mismatches=len(data_mismatches), + perf_mismatches=len(perf_mismatches), + ) + if compare_type == "perf": + save_comparisons(compare_type, perf_mismatches) + elif compare_type == "data": + save_comparisons(compare_type, data_mismatches) + else: + # no compare type == run both reports + save_comparisons("perf", perf_mismatches) + save_comparisons("data", data_mismatches) + + for table in TOTALS_BY_TABLES.keys(): + message = _format_table_slack_overview(table, TOTALS_BY_TABLES[table]) + _send_slack_report(message) + + +PERF_PERCENT_THRESHOLD = 10 + +DATA_THRESHOLDS = { + "result_rows": 0, + "result_bytes": 0, +} + + +def create_perf_mismatch( + v1_data: QueryMeasurementResult, v2_data: QueryMeasurementResult +) -> Optional[PerfMismatchResult]: + duration_ms = v1_data.query_duration_ms + duration_ms_new = v2_data.query_duration_ms + duration_ms_delta = 
duration_ms_new - duration_ms
+
+    read_rows = v1_data.read_rows
+    read_rows_new = v2_data.read_rows
+    read_rows_delta = read_rows_new - read_rows
+
+    read_bytes = v1_data.read_bytes
+    read_bytes_new = v2_data.read_bytes
+    read_bytes_delta = read_bytes_new - read_bytes
+
+    # guard against a ZeroDivisionError for 0ms base durations
+    duration_percent = (
+        (duration_ms_delta / duration_ms) * 100 if duration_ms else float("inf")
+    )
+    # we check the absolute delta as well as the percent threshold: for
+    # super fast queries a 10ms diff is not that big of a deal, but the
+    # percentage alone would exceed the threshold.
+    if duration_percent > PERF_PERCENT_THRESHOLD and (duration_ms_delta > 10):
+        return PerfMismatchResult(
+            v1_data.query_id,
+            duration_ms=duration_ms,
+            duration_ms_new=duration_ms_new,
+            read_rows=read_rows,
+            read_rows_new=read_rows_new,
+            read_bytes=read_bytes,
+            read_bytes_new=read_bytes_new,
+            duration_ms_delta=duration_ms_delta,
+            read_rows_delta=read_rows_delta,
+            read_bytes_delta=read_bytes_delta,
+        )
+    return None
 
-    _send_slack_report(total_rows, mismatches, table)
 
+def create_data_mismatch(
+    v1_data: QueryMeasurementResult, v2_data: QueryMeasurementResult
+) -> Optional[DataMismatchResult]:
+    result_rows = v1_data.result_rows
+    result_rows_new = v2_data.result_rows
+    result_rows_delta = result_rows_new - result_rows
 
-def _create_mismatch(
-    v1_data: QueryMeasurement, v2_data: QueryMeasurement
-) -> QueryMismatch:
-    mismatch = {}
-    for m in MEASUREMENTS:
-        v1_value = v1_data.__getattribute__(m)
-        v2_value = v2_data.__getattribute__(m)
+    result_bytes = v1_data.result_bytes
+    result_bytes_new = v2_data.result_bytes
+    result_bytes_delta = result_bytes_new - result_bytes
 
-        delta = v2_value - v1_value
-        mismatched_values = MismatchedValues(
-            v1_value,
-            v2_value,
-            delta,
+    # either a higher or a lower number of rows/bytes is considered a mismatch
+    if (abs(result_rows_delta) > DATA_THRESHOLDS["result_rows"]) or abs(
+        result_bytes_delta
+    ) > DATA_THRESHOLDS["result_bytes"]:
+        return DataMismatchResult(
+            v1_data.query_id,
+            result_rows=result_rows,
+            result_rows_new=result_rows_new,
+            result_bytes=result_bytes,
+            result_bytes_new=result_bytes_new,
+            result_rows_delta=result_rows_delta,
+            result_bytes_delta=result_bytes_delta,
         )
-        mismatch[m] = mismatched_values
-    return QueryMismatch(
-        query_id=v1_data.query_id,
-        query_duration_ms=mismatch["query_duration_ms"],
-        result_rows=mismatch["result_rows"],
-        result_bytes=mismatch["result_bytes"],
-        read_rows=mismatch["read_rows"],
-        read_bytes=mismatch["read_bytes"],
-    )
+    return None
 
 
-def _format_slack_overview(
-    total_rows: int, mismatches: Sequence[QueryMismatch], table: str
+def _format_table_slack_overview(
+    table: str,
+    totals: TableTotals,
+) -> Any:
+    per_data_mismatches = round(
+        (totals.total_data_mismatches / totals.total_queries) * 100, 2
+    )
+    per_perf_mismatches = round(
+        (totals.total_perf_mismatches / totals.total_queries) * 100, 2
+    )
+    total_queries = f"Total Queries: `{totals.total_queries}`\n"
+    total_data_mismatches = f"Total Data Mismatches: `{totals.total_data_mismatches}`\n"
+    total_perf_mismatches = f"Total Perf Mismatches: `{totals.total_perf_mismatches}`\n"
+    percent_data_mismatches = f"% Data Mismatch: `{per_data_mismatches}`\n"
+    percent_perf_mismatches = f"% Perf Mismatch: `{per_perf_mismatches}`\n"
+    overview_text = f"{total_queries} {total_data_mismatches} {percent_data_mismatches} {total_perf_mismatches} {percent_perf_mismatches}"
+    return {
+        "blocks": [
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": f"*Table Comparison Overview - {table} `21.8` v `22.8`*\n",
+                },
+            },
+            {
+                "type": "section",
+ "text": { + "type": "mrkdwn", + "text": overview_text, + }, + }, + ] + } + + +def _format_file_slack_overview( + total_rows: int, + mismatches: MismatchResultSet, + table: str, + compare_type: str, ) -> Any: queries = total_rows num_mismatches = len(mismatches) per_mismatches = len(mismatches) / total_rows - overview_text = f"*Overview: `{table}`*\n Total Queries: `{queries}`\n Total Mismatches: `{num_mismatches}`\n% Mismatch: `{per_mismatches}`" + overview_text = f"*`{table}`*\n Total Queries: `{queries}`\n Total Mismatches: `{num_mismatches}`\n% Mismatch: `{per_mismatches}`" return { "blocks": [ { "type": "section", "text": { "type": "mrkdwn", - "text": "*Querylog Comparison `21.8` v `22.8`*\n", + "text": f"*Querylog Comparison - {compare_type.upper()} `21.8` v `22.8`*\n", }, }, { @@ -233,60 +319,19 @@ def _format_slack_overview( def _send_slack_report( - total_rows: int, mismatches: Sequence[QueryMismatch], table: str + message: Any, + filename: Optional[str] = None, ) -> None: slack_client = SlackClient( channel_id=settings.SNUBA_SLACK_CHANNEL_ID, token=settings.SLACK_API_TOKEN ) - slack_client.post_message( - message=_format_slack_overview(total_rows, mismatches, table) - ) + slack_client.post_message(message=message) - if not mismatches: - return - - p_header_row = [ - "query_id", - "ms_1", - "ms_2", - "rows1", - "rows2", - "bytes1", - "bytes2", - "duration_delta", - "read_rows_delta", - "read_bytes_delta", - ] - p_results: Sequence[PerformanceMismatchResult] = [ - m.performance_mismatches() for m in mismatches - ] - - d_header_row = [ - "query_id", - "rows1", - "rows2", - "bytes1", - "bytes2", - "result_rows_delta", - "result_bytes_delta", - ] - d_results: Sequence[DataMismatchResult] = [m.data_mismatches() for m in mismatches] - - p_filename = f"perf_{table}.csv" - d_filename = f"data_{table}.csv" - - for results, header_row, filename in [ - (p_results, p_header_row, p_filename), - (d_results, d_header_row, d_filename), - ]: - assert isinstance(results, (PerformanceMismatchResult, DataMismatchResult)) - write_querylog_comparison_results_to_csv( - results, f"/tmp/{filename}", header_row - ) + if filename: slack_client.post_file( file_name=filename, file_path=f"/tmp/{filename}", file_type="csv", - initial_comment=f"Querylog Result Report: {filename}", + initial_comment=f"report from: {filename}", ) diff --git a/snuba/clickhouse/upgrades/comparisons.py b/snuba/clickhouse/upgrades/comparisons.py index 38a29c70b7..e03db6e027 100644 --- a/snuba/clickhouse/upgrades/comparisons.py +++ b/snuba/clickhouse/upgrades/comparisons.py @@ -11,6 +11,38 @@ logger = structlog.get_logger().bind(module=__name__) +@dataclass(frozen=True) +class MismatchedValues: + base_value: int + new_value: int + delta: int + + +@dataclass +class PerfMismatchResult: + query_id: str + duration_ms: int + duration_ms_new: int + read_rows: int + read_rows_new: int + read_bytes: int + read_bytes_new: int + duration_ms_delta: int + read_rows_delta: int + read_bytes_delta: int + + +@dataclass +class DataMismatchResult: + query_id: str + result_rows: int + result_rows_new: int + result_bytes: int + result_bytes_new: int + result_rows_delta: int + result_bytes_delta: int + + @dataclass class QueryInfoResult: query_str: str @@ -26,12 +58,31 @@ class QueryMeasurementResult: read_rows: int read_bytes: int - -Results = Union[QueryInfoResult, QueryMeasurementResult] + def __eq__(self, other: object) -> bool: + if not isinstance(other, QueryMeasurementResult): + return NotImplemented + if self.query_duration_ms != 
other.query_duration_ms:
+            return False
+        if self.result_rows != other.result_rows:
+            return False
+        if self.result_bytes != other.result_bytes:
+            return False
+        if self.read_rows != other.read_rows:
+            return False
+        if self.read_bytes != other.read_bytes:
+            return False
+        return True
+
+
+Results = Union[
+    QueryInfoResult, QueryMeasurementResult, DataMismatchResult, PerfMismatchResult
+]
 
 DIRECTORY_RESULT_TYPES: Dict[str, Type[Results]] = {
     "queries": QueryInfoResult,
     "results": QueryMeasurementResult,
+    "compared-data": DataMismatchResult,
+    "compared-perf": PerfMismatchResult,
 }
 
 
@@ -81,7 +132,7 @@ def _save_to_csv(
         with open(self._full_path(filename), mode="w") as file:
             writer = csv.writer(file)
             if header_row:
-                fields = list(dataclasses.fields(result_type))  # mypy ig
+                fields = [f.name for f in dataclasses.fields(result_type)]
                 writer.writerow(fields)
             for row in results:
                 writer.writerow(dataclasses.astuple(row))
@@ -126,13 +177,18 @@ def parse_blob_name(self, blob_name: str) -> FileFormat:
             directory, datetime.strptime(date, "%Y_%m_%d"), table, int(hour)
         )
 
-    def save(self, file_format: FileFormat, results: Sequence[Results]) -> None:
+    def save(
+        self,
+        file_format: FileFormat,
+        results: Sequence[Results],
+        header_row: bool = False,
+    ) -> None:
         """
         First save the results to local csv file,
         then upload the file to gcs bucket.
         """
         filename = self.format_filename(file_format)
-        self._save_to_csv(filename, results)
+        self._save_to_csv(filename, results, header_row)
         blob_name = self._format_blob_name(file_format)
         self._save_to_gcs(filename, blob_name)
 
diff --git a/tests/clickhouse/upgrades/test_comparisons.py b/tests/clickhouse/upgrades/test_comparisons.py
index dfb3f9ee13..d6c4761f18 100644
--- a/tests/clickhouse/upgrades/test_comparisons.py
+++ b/tests/clickhouse/upgrades/test_comparisons.py
@@ -2,11 +2,19 @@
 from typing import Optional
 from unittest.mock import Mock, patch
 
+import pytest
+
+from snuba.cli.query_comparer import (
+    TableTotals,
+    create_data_mismatch,
+    create_perf_mismatch,
+)
 from snuba.clickhouse.upgrades.comparisons import (
     BlobGetter,
     FileFormat,
     FileManager,
     QueryInfoResult,
+    QueryMeasurementResult,
 )
 from snuba.utils.gcs import Blobs
 
@@ -127,3 +135,153 @@ def test_blob_getter(mock_uploader: Mock) -> None:
         "queries/2024_02_16/meredith_test_1.csv",
         "queries/2024_02_16/meredith_test_2.csv",
     ]
+
+
+perf_mismatch_tests = [
+    pytest.param(
+        30,
+        45,
+        True,
+        id="Slower duration - over threshold, over 10ms delta",
+    ),
+    pytest.param(
+        30,
+        35,
+        False,
+        id="Slower duration - over threshold, under 10ms delta",
+    ),
+    pytest.param(
+        30,
+        32,
+        False,
+        id="Slower duration - under threshold, under 10ms delta",
+    ),
+    pytest.param(
+        30,
+        20,
+        False,
+        id="Faster duration",
+    ),
+]
+
+
+@pytest.mark.parametrize(
+    "first_duration, second_duration, is_mismatch", perf_mismatch_tests
+)
+def test_perf_mismatch(
+    first_duration: int, second_duration: int, is_mismatch: bool
+) -> None:
+    first_measurement = QueryMeasurementResult(
+        query_id="xxxx-xxxx-xxxx-xxxx",
+        query_duration_ms=first_duration,
+        result_rows=0,
+        result_bytes=0,
+        read_rows=0,
+        read_bytes=0,
+    )
+    second_measurement = QueryMeasurementResult(
+        query_id="xxxx-xxxx-xxxx-xxxx",
+        query_duration_ms=second_duration,
+        result_rows=0,
+        result_bytes=0,
+        read_rows=0,
+        read_bytes=0,
+    )
+
+    mismatch = create_perf_mismatch(first_measurement, second_measurement)
+    if is_mismatch:
+        assert mismatch
+    else:
+        assert mismatch is None
+
+
+data_mismatch_tests = [
+    pytest.param(
+        0,
+        0,
+        
12,
+        16,
+        True,
+        id="Higher result_bytes",
+    ),
+    pytest.param(
+        0,
+        0,
+        12,
+        10,
+        True,
+        id="Lower result_bytes",
+    ),
+    pytest.param(
+        10,
+        12,
+        0,
+        0,
+        True,
+        id="Higher result_rows",
+    ),
+    pytest.param(
+        10,
+        8,
+        0,
+        0,
+        True,
+        id="Lower result_rows",
+    ),
+    pytest.param(
+        10,
+        10,
+        12,
+        12,
+        False,
+        id="Equal result rows & bytes",
+    ),
+]
+
+
+@pytest.mark.parametrize(
+    "first_result_rows, second_result_rows, first_result_bytes, second_result_bytes, is_mismatch",
+    data_mismatch_tests,
+)
+def test_data_mismatch(
+    first_result_rows: int,
+    second_result_rows: int,
+    first_result_bytes: int,
+    second_result_bytes: int,
+    is_mismatch: bool,
+) -> None:
+    first_measurement = QueryMeasurementResult(
+        query_id="xxxx-xxxx-xxxx-xxxx",
+        query_duration_ms=0,
+        result_rows=first_result_rows,
+        result_bytes=first_result_bytes,
+        read_rows=0,
+        read_bytes=0,
+    )
+    second_measurement = QueryMeasurementResult(
+        query_id="xxxx-xxxx-xxxx-xxxx",
+        query_duration_ms=0,
+        result_rows=second_result_rows,
+        result_bytes=second_result_bytes,
+        read_rows=0,
+        read_bytes=0,
+    )
+
+    mismatch = create_data_mismatch(first_measurement, second_measurement)
+    if is_mismatch:
+        assert mismatch
+    else:
+        assert mismatch is None
+
+
+def test_table_totals() -> None:
+    totals = TableTotals()
+    totals.add(10, 5, 6)
+    assert totals.total_queries == 10
+    assert totals.total_data_mismatches == 5
+    assert totals.total_perf_mismatches == 6
+
+    totals.add(10, 5, 6)
+    assert totals.total_queries == 20
+    assert totals.total_data_mismatches == 10
+    assert totals.total_perf_mismatches == 12
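
Below is a quick, illustrative sketch of how the two new mismatch helpers
behave end to end. The measurement values are invented for the example, but
the functions, module paths, and dataclass fields all come from this patch
(PERF_PERCENT_THRESHOLD = 10 plus the 10ms absolute-delta check, and
all-zero DATA_THRESHOLDS):

    from snuba.cli.query_comparer import create_data_mismatch, create_perf_mismatch
    from snuba.clickhouse.upgrades.comparisons import QueryMeasurementResult

    base = QueryMeasurementResult(
        query_id="abc",
        query_duration_ms=100,
        result_rows=5,
        result_bytes=80,
        read_rows=10,
        read_bytes=160,
    )
    upgrade = QueryMeasurementResult(
        query_id="abc",
        query_duration_ms=130,
        result_rows=5,
        result_bytes=80,
        read_rows=10,
        read_bytes=160,
    )

    # 30% slower with a 30ms absolute delta: both the percent threshold and
    # the 10ms delta check are exceeded, so a PerfMismatchResult is returned.
    assert create_perf_mismatch(base, upgrade) is not None

    # result_rows and result_bytes are identical, so there is no data
    # mismatch: the thresholds are zero, but so are the deltas.
    assert create_data_mismatch(base, upgrade) is None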
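
And a small sketch of the new header_row flag on FileManager.save(). The
directory, table name, and timestamp below are made up; the positional
FileFormat fields (directory, date, table, hour) follow how this patch
constructs them:

    from datetime import datetime

    from snuba.clickhouse.upgrades.comparisons import FileFormat

    compared_format = FileFormat("compared-perf", datetime(2024, 2, 15), "errors", 22)

    # With header_row=True, _save_to_csv now writes the dataclass field names
    # as the first CSV row before the file is uploaded to GCS:
    #   file_manager.save(compared_format, perf_mismatches, header_row=True)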