/
identifiers.py
211 lines (178 loc) · 8.97 KB
/
identifiers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from random import Random
from textwrap import dedent
from typing import Any
from pg8000.converters import literal # type: ignore
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion
from materialize.util import naughty_strings
def dq(ident: str) -> str:
    """Wrap ``ident`` in double quotes, doubling any embedded double quote.

    Args:
        ident: Raw identifier text.

    Returns:
        The identifier surrounded by ``"``, with each inner ``"`` written
        as ``""``.
    """
    # Splitting on the quote and re-joining with a doubled quote is the
    # same transformation as replacing every '"' with '""'.
    pieces = ident.split('"')
    return '"' + '""'.join(pieces) + '"'
def dq_print(ident: str) -> str:
    """Wrap ``ident`` in double quotes using backslash escaping.

    Every backslash in the input is doubled and every double quote is
    prefixed with a backslash; the result is then surrounded by double
    quotes.

    Args:
        ident: Raw text to render.

    Returns:
        The escaped text enclosed in ``"``.
    """
    # A single translate pass escapes both characters at once; backslashes
    # introduced for quotes are never re-escaped, matching a
    # "backslashes first, then quotes" replacement order.
    escape_table = {ord("\\"): "\\\\", ord('"'): '\\"'}
    return '"' + ident.translate(escape_table) + '"'
def sq(ident: str) -> Any:
    """Convert ``ident`` with ``pg8000.converters.literal`` and return the result.

    Args:
        ident: Text to be embedded as a value in a SQL statement.

    Returns:
        Whatever ``literal`` produces for the given string.
    """
    rendered = literal(ident)
    return rendered
def schemas() -> str:
    """Return ``KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD`` with its common leading whitespace removed."""
    raw_schema = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
    return dedent(raw_schema)
def cluster() -> str:
    """Return the testdrive line that creates the ``identifiers`` cluster.

    Returns:
        A single newline-terminated testdrive SQL command.
    """
    create_cluster_cmd = "> CREATE CLUSTER identifiers SIZE '4'\n"
    return create_cluster_cmd
class Identifiers(Check):
    """Check that creates and queries objects whose names come from ``naughty_strings()``.

    Each key in ``IDENT_KEYS`` is assigned one sampled string. Those strings
    are then used as database, schema, type, table, column, connection,
    secret, materialized view, sink, alias and role names, as inserted
    values, and as comment texts in the generated testdrive scripts.
    """

    def _can_run(self, e: Executor) -> bool:
        """Return True when ``self.base_version`` is at least v0.47.0-dev.

        Args:
            e: The executor; not consulted by this implementation.
        """
        # CREATE ROLE not compatible with older releases
        return self.base_version >= MzVersion.parse_mz("v0.47.0-dev")

    # Slots that receive a sampled string in __init__. The commented-out
    # entries ("source", "secret_value") are not sampled; __init__ assigns
    # them fixed values instead.
    IDENT_KEYS = [
        "db",
        "schema",
        "type",
        "table",
        "column",
        "value1",
        "value2",
        # "source",
        "source_view",
        "kafka_conn",
        "csr_conn",
        "secret",
        # "secret_value",
        "mv0",
        "mv1",
        "mv2",
        "sink0",
        "sink1",
        "sink2",
        "alias",
        "role",
        "comment_table",
        "comment_column",
    ]

    def __init__(self, base_version: MzVersion, rng: Random | None) -> None:
        """Pick one string per ``IDENT_KEYS`` entry and store them in ``self.ident``.

        Args:
            base_version: Forwarded to ``Check.__init__``.
            rng: Random generator used for sampling; when it is falsy
                (e.g. ``None``) a ``Random(0)`` instance is used instead.
        """
        strings = naughty_strings()
        # sample() draws len(IDENT_KEYS) distinct positions from the list.
        values = (rng or Random(0)).sample(strings, len(self.IDENT_KEYS))
        self.ident = {
            # Keep at most the first 255 bytes of the UTF-8 encoding; decoding
            # with "ignore" drops a multi-byte character cut by the slice.
            key: value.encode("utf-8")[:255].decode("utf-8", "ignore")
            for key, value in zip(self.IDENT_KEYS, values)
        }
        # Fixed value instead of a sampled one; the error noted when this was
        # tried with sampled strings:
        # ERROR: invalid input syntax for type bytea: invalid escape sequence
        self.ident["secret_value"] = "secret_value"
        # Fixed source name instead of a sampled one, see:
        # https://github.com/MaterializeInc/materialize/issues/22535
        self.ident["source"] = "source"
        # self.ident is populated before the base initializer runs.
        super().__init__(base_version, rng)

    def initialize(self) -> Testdrive:
        """Build the testdrive script that creates the initial set of objects.

        The script creates the role, database, schema, list type, table
        (with one inserted row), ``mv0``, a Kafka topic with 1000 ingested
        records, the Kafka and schema-registry connections, the source,
        the source view and ``sink0``. A secret is added for base versions
        >= 0.44.0 and table/column comments for base versions >= 0.72.0.

        Returns:
            A ``Testdrive`` wrapping ``schemas() + cluster()`` followed by
            the dedented commands.
        """
        # Identifiers go through dq(), values through sq(). Doubled braces
        # emit literal braces, so "${{...}}" reaches testdrive as "${...}".
        cmds = f"""
> SET cluster=identifiers;
> CREATE ROLE {dq(self.ident["role"])};
> CREATE DATABASE {dq(self.ident["db"])};
> SET DATABASE={dq(self.ident["db"])};
> CREATE SCHEMA {dq(self.ident["schema"])};
> CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text);
> CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])});
> INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
> CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS
SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
$ kafka-create-topic topic=sink-source-ident
$ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000
{{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
> CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()};
> CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
> CREATE SOURCE {dq(self.ident["source"])}
IN CLUSTER identifiers
FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
ENVELOPE UPSERT;
> CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS
SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1);
> CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])}
IN CLUSTER identifiers
FROM {dq(self.ident["source_view"])}
INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
ENVELOPE DEBEZIUM;
"""
        # NOTE(review): _can_run already requires >= v0.47.0-dev, so this
        # guard looks always true whenever the check runs — confirm.
        if self.base_version >= MzVersion(0, 44, 0):
            cmds += f"""
> CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
"""
        # COMMENT ON statements are only emitted for base versions >= 0.72.0.
        if self.base_version >= MzVersion(0, 72, 0):
            cmds += f"""
> COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};
> COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
"""
        return Testdrive(schemas() + cluster() + dedent(cmds))

    def manipulate(self) -> list[Testdrive]:
        """Build two testdrive scripts, one for suffix "1" and one for "2".

        Each script creates ``mv<i>`` over the table, inserts one more row
        with the same values as in ``initialize``, and creates ``sink<i>``
        from the source view.

        Returns:
            A list with one ``Testdrive`` per suffix, in the order "1", "2".
        """
        cmds = [
            f"""
> SET CLUSTER=identifiers;
> SET DATABASE={dq(self.ident["db"])};
> CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS
SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
> INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
> CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])}
IN CLUSTER identifiers
FROM {dq(self.ident["source_view"])}
INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
ENVELOPE DEBEZIUM;
"""
            # "mv" + i / "sink" + i select the mv1/mv2 and sink1/sink2 slots.
            for i in ["1", "2"]
        ]
        return [Testdrive(dedent(s)) for s in cmds]

    def validate(self) -> Testdrive:
        """Build the testdrive script that verifies the created objects.

        Queries are written with ``dq()``/``sq()`` while expected output
        rows are rendered with ``dq_print()``. The expected count of 3 in
        ``mv0`` and the three rows in ``mv1``/``mv2`` correspond to the one
        INSERT in ``initialize`` plus one INSERT in each of the two
        ``manipulate`` scripts. Comment checks are added for base versions
        >= 0.72.0 and the secret check for base versions >= 0.44.0.

        Returns:
            A ``Testdrive`` wrapping the dedented validation commands.
        """
        cmds = f"""
> SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db';
materialize
{dq_print(self.ident["db"])}
> SET DATABASE={dq(self.ident["db"])};
> SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])}
{dq_print(self.ident["role"])}
> SHOW TYPES;
{dq_print(self.ident["type"])}
> SHOW SCHEMAS FROM {dq(self.ident["db"])};
public
information_schema
mz_catalog
mz_unsafe
mz_internal
pg_catalog
{dq_print(self.ident["schema"])}
> SHOW SINKS FROM {dq(self.ident["schema"])};
{dq_print(self.ident["sink0"])} kafka 4 identifiers
{dq_print(self.ident["sink1"])} kafka 4 identifiers
{dq_print(self.ident["sink2"])} kafka 4 identifiers
> SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])};
3
> SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])};
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
> SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])};
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
> SELECT * FROM {dq(self.ident["source_view"])};
U2 A 1000
"""
        # Mirrors the version gate used for COMMENT ON in initialize().
        if self.base_version >= MzVersion(0, 72, 0):
            cmds += f"""
> SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
<null> {dq_print(self.ident["comment_table"])}
1 {dq_print(self.ident["comment_column"])}
"""
        # Mirrors the version gate used for CREATE SECRET in initialize().
        if self.base_version >= MzVersion(0, 44, 0):
            cmds += f"""
> SHOW SECRETS;
{dq_print(self.ident["secret"])}
"""
        return Testdrive(dedent(cmds))