-
Notifications
You must be signed in to change notification settings - Fork 453
/
postgres_consistency_test.py
136 lines (117 loc) · 4.87 KB
/
postgres_consistency_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import argparse
from pg8000 import Connection
from pg8000.exceptions import InterfaceError
from materialize.output_consistency.common.configuration import (
ConsistencyTestConfiguration,
)
from materialize.output_consistency.execution.evaluation_strategy import (
DataFlowRenderingEvaluation,
EvaluationStrategy,
)
from materialize.output_consistency.execution.sql_executor import create_sql_executor
from materialize.output_consistency.execution.sql_executors import SqlExecutors
from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
GenericInconsistencyIgnoreFilter,
)
from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
EvaluationScenario,
)
from materialize.output_consistency.input_data.test_input_data import (
ConsistencyTestInputData,
)
from materialize.output_consistency.output.output_printer import OutputPrinter
from materialize.output_consistency.output_consistency_test import (
OutputConsistencyTest,
connect,
)
from materialize.output_consistency.validation.result_comparator import ResultComparator
from materialize.postgres_consistency.custom.predefined_pg_queries import (
create_custom_pg_consistency_queries,
)
from materialize.postgres_consistency.execution.pg_evaluation_strategy import (
PgEvaluation,
)
from materialize.postgres_consistency.execution.pg_sql_executors import PgSqlExecutors
from materialize.postgres_consistency.ignore_filter.pg_inconsistency_ignore_filter import (
PgInconsistencyIgnoreFilter,
)
from materialize.postgres_consistency.validation.pg_result_comparator import (
PostgresResultComparator,
)
class PostgresConsistencyTest(OutputConsistencyTest):
    """Output consistency test that runs queries against Materialize and Postgres.

    The Postgres connection is not handed in through the overridden factory
    methods; it has to be assigned to ``pg_connection`` before
    ``create_sql_executors`` is invoked.
    """

    def __init__(self) -> None:
        # Starts unset; ``main`` assigns it once the Postgres connection is open.
        self.pg_connection: Connection | None = None

    def get_scenario(self) -> EvaluationScenario:
        """Return the evaluation scenario of this test (Postgres consistency)."""
        return EvaluationScenario.POSTGRES_CONSISTENCY

    def create_sql_executors(
        self,
        config: ConsistencyTestConfiguration,
        connection: Connection,
        output_printer: OutputPrinter,
    ) -> SqlExecutors:
        """Create one SQL executor per database and bundle them.

        Args:
            config: Test configuration forwarded to both executors.
            connection: Connection used for the "mz" executor.
            output_printer: Printer forwarded to both executors.

        Returns:
            A ``PgSqlExecutors`` holding the "mz" executor and the "pg" executor.

        Raises:
            RuntimeError: If ``pg_connection`` has not been set.
        """
        pg_connection = self.pg_connection
        if pg_connection is None:
            raise RuntimeError("Postgres connection is not initialized")

        mz_executor = create_sql_executor(config, connection, output_printer, "mz")
        pg_executor = create_sql_executor(
            config, pg_connection, output_printer, "pg", is_mz=False
        )
        return PgSqlExecutors(mz_executor, pg_executor)

    def create_result_comparator(
        self, ignore_filter: GenericInconsistencyIgnoreFilter
    ) -> ResultComparator:
        """Return a ``PostgresResultComparator`` built on ``ignore_filter``."""
        comparator = PostgresResultComparator(ignore_filter)
        return comparator

    def create_inconsistency_ignore_filter(
        self, sql_executors: SqlExecutors
    ) -> GenericInconsistencyIgnoreFilter:
        """Return a new ``PgInconsistencyIgnoreFilter``; ``sql_executors`` is unused."""
        return PgInconsistencyIgnoreFilter()

    def create_evaluation_strategies(
        self, sql_executors: SqlExecutors
    ) -> list[EvaluationStrategy]:
        """Return the two strategies to compare: Materialize first, then Postgres.

        The Materialize side is a ``DataFlowRenderingEvaluation`` whose display
        name and database object name are overridden; ``sql_executors`` is unused.
        """
        materialize_strategy = DataFlowRenderingEvaluation()
        materialize_strategy.name = "Materialize evaluation"
        materialize_strategy.simple_db_object_name = "mz_evaluation"

        postgres_strategy = PgEvaluation()
        return [materialize_strategy, postgres_strategy]

    def create_input_data(self) -> ConsistencyTestInputData:
        """Return the base input data with the custom Postgres queries appended."""
        data = super().create_input_data()
        predefined = data.predefined_queries
        predefined.extend(create_custom_pg_consistency_queries())
        return data
def main() -> int:
    """Run the Postgres consistency test as a command-line program.

    Builds an argument parser with the Materialize and Postgres connection
    options, lets the test parse the command line with it, opens both
    database connections and runs the output consistency tests.

    Returns:
        0 if all tests passed; 1 if a database connection could not be
        established or if not all tests passed.
    """
    # Local import: only needed to report connection failures on stderr.
    import sys

    test = PostgresConsistencyTest()

    parser = argparse.ArgumentParser(
        prog="postgres-consistency-test",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Test the consistency of Materialize and Postgres",
    )
    parser.add_argument("--mz-host", default="localhost", type=str)
    parser.add_argument("--mz-port", default=6875, type=int)
    parser.add_argument("--pg-host", default="localhost", type=str)
    parser.add_argument("--pg-port", default=5432, type=int)
    parser.add_argument("--pg-password", default=None, type=str)

    args = test.parse_output_consistency_input_args(parser)

    try:
        mz_db_user = "materialize"
        mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)

        pg_db_user = "postgres"
        test.pg_connection = connect(
            args.pg_host, args.pg_port, pg_db_user, args.pg_password
        )
    except InterfaceError as error:
        # Say why we are giving up instead of exiting with a bare status code.
        print(f"Failed to connect to the database: {error}", file=sys.stderr)
        return 1

    result = test.run_output_consistency_tests(mz_connection, args)
    return 0 if result.all_passed() else 1
if __name__ == "__main__":
    # The `exit` builtin is injected by the `site` module for interactive use
    # and is missing when that module is not loaded (e.g. `python -S`).
    # Raising SystemExit yields the same exit status and is always available.
    raise SystemExit(main())