/
mysql_cdc_actions.py
89 lines (74 loc) · 3.33 KB
/
mysql_cdc_actions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from textwrap import dedent
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.mysql import MySql
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.mysql_capabilities import MySqlRunning, MySqlTableExists
from materialize.zippy.mysql_cdc_capabilities import MySqlCdcTableExists
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.replica_capabilities import source_capable_clusters
from materialize.zippy.storaged_capabilities import StoragedRunning
class CreateMySqlCdcTable(Action):
"""Creates a MySQL CDC source in Materialized."""
@classmethod
def requires(cls) -> set[type[Capability]]:
return {
BalancerdIsRunning,
MzIsRunning,
StoragedRunning,
MySqlRunning,
MySqlTableExists,
}
def __init__(self, capabilities: Capabilities) -> None:
mysql_table = random.choice(capabilities.get(MySqlTableExists))
mysql_pg_cdc_name = f"mysql_{mysql_table.name}"
this_mysql_cdc_table = MySqlCdcTableExists(name=mysql_pg_cdc_name)
cluster_name = random.choice(source_capable_clusters(capabilities))
existing_mysql_cdc_tables = [
s
for s in capabilities.get(MySqlCdcTableExists)
if s.name == this_mysql_cdc_table.name
]
if len(existing_mysql_cdc_tables) == 0:
self.new_mysql_cdc_table = True
self.mysql_cdc_table = this_mysql_cdc_table
self.mysql_cdc_table.mysql_table = mysql_table
self.cluster_name = cluster_name
elif len(existing_mysql_cdc_tables) == 1:
self.new_mysql_cdc_table = False
self.mysql_cdc_table = existing_mysql_cdc_tables[0]
else:
assert False
super().__init__(capabilities)
def run(self, c: Composition) -> None:
if self.new_mysql_cdc_table:
assert self.mysql_cdc_table is not None
assert self.mysql_cdc_table.mysql_table is not None
name = self.mysql_cdc_table.name
c.testdrive(
dedent(
f"""
> CREATE SECRET {name}_password AS '{MySql.DEFAULT_ROOT_PASSWORD}'
> CREATE CONNECTION {name}_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET {name}_password
)
> CREATE SOURCE {name}_source
IN CLUSTER {self.cluster_name}
FROM MYSQL CONNECTION {name}_conn
FOR TABLES (public.{self.mysql_cdc_table.mysql_table.name} AS {name})
"""
)
)
def provides(self) -> list[Capability]:
return [self.mysql_cdc_table] if self.new_mysql_cdc_table else []