Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,14 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
raise Exception('add multiple columns not implemented', tokens)

column_after = None
column_first = False
if tokens[-2].lower() == 'after':
column_after = strip_sql_name(tokens[-1])
tokens = tokens[:-2]
if len(tokens) < 2:
raise Exception('wrong tokens count', tokens)
elif tokens[-1].lower() == 'first':
column_first = True

column_name = strip_sql_name(tokens[0])
column_type_mysql = tokens[1]
Expand All @@ -452,21 +455,32 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
mysql_table_structure: TableStructure = table_structure[0]
ch_table_structure: TableStructure = table_structure[1]

if column_after is None:
column_after = strip_sql_name(mysql_table_structure.fields[-1].name)
if column_first:
mysql_table_structure.add_field_first(
TableField(name=column_name, field_type=column_type_mysql)
)

ch_table_structure.add_field_first(
TableField(name=column_name, field_type=column_type_ch)
)
else:
if column_after is None:
column_after = strip_sql_name(mysql_table_structure.fields[-1].name)

mysql_table_structure.add_field_after(
TableField(name=column_name, field_type=column_type_mysql),
column_after,
)
mysql_table_structure.add_field_after(
TableField(name=column_name, field_type=column_type_mysql),
column_after,
)

ch_table_structure.add_field_after(
TableField(name=column_name, field_type=column_type_ch),
column_after,
)
ch_table_structure.add_field_after(
TableField(name=column_name, field_type=column_type_ch),
column_after,
)

query = f'ALTER TABLE {db_name}.{table_name} ADD COLUMN {column_name} {column_type_ch}'
if column_after is not None:
if column_first:
query += ' FIRST'
else:
query += f' AFTER {column_after}'

if self.db_replicator:
Expand Down
7 changes: 7 additions & 0 deletions mysql_ch_replicator/table_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ def preprocess(self):
field_names.index(key) for key in self.primary_keys
]

def add_field_first(self, new_field: TableField):

self.fields.insert(0, new_field)
self.preprocess()

def add_field_after(self, new_field: TableField, after: str):

idx_to_insert = None
Expand All @@ -36,11 +41,13 @@ def add_field_after(self, new_field: TableField, after: str):
raise Exception('field after not found', after)

self.fields.insert(idx_to_insert, new_field)
self.preprocess()

def remove_field(self, field_name):
for idx, field in enumerate(self.fields):
if field.name == field_name:
del self.fields[idx]
self.preprocess()
return
raise Exception(f'field {field_name} not found')

Expand Down
81 changes: 81 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,87 @@ def test_percona_migration(monkeypatch):
binlog_replicator_runner.stop()


def test_add_column_first_after_and_drop_column(monkeypatch):
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)

cfg = config.Settings()
cfg.load(CONFIG_FILE)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE {TEST_TABLE_NAME} (
`id` int NOT NULL,
PRIMARY KEY (`id`));
''')

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)",
commit=True,
)

binlog_replicator_runner = BinlogReplicatorRunner()
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

ch.execute_command(f'USE {TEST_DB_NAME}')

assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

# Test adding a column as the new first column, after another column, and dropping a column
# These all move the primary key column to a different index and test the table structure is
# updated correctly.

# Test add column first
mysql.execute(
f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c1 INT FIRST")
mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 11)",
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=43")) == 1)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=43")[0]['c1'] == 11)

# Test add column after
mysql.execute(
f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c2 INT AFTER c1")
mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id, c1, c2) VALUES (44, 111, 222)",
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=44")) == 1)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=44")[0]['c1'] == 111)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=44")[0]['c2'] == 222)

# Test drop column
mysql.execute(
f"ALTER TABLE {TEST_TABLE_NAME} DROP COLUMN c2")
mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (45, 1111)",
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=45")) == 1)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=45")[0]['c1'] == 1111)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=45")[0].get('c2') is None)

db_replicator_runner.stop()
binlog_replicator_runner.stop()


def test_parse_mysql_table_structure():
query = "CREATE TABLE IF NOT EXISTS user_preferences_portal (\n\t\t\tid char(36) NOT NULL,\n\t\t\tcategory varchar(50) DEFAULT NULL,\n\t\t\tdeleted tinyint(1) DEFAULT 0,\n\t\t\tdate_entered datetime DEFAULT NULL,\n\t\t\tdate_modified datetime DEFAULT NULL,\n\t\t\tassigned_user_id char(36) DEFAULT NULL,\n\t\t\tcontents longtext DEFAULT NULL\n\t\t ) ENGINE=InnoDB DEFAULT CHARSET=utf8"

Expand Down
Loading