diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index af4088f..126d686 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -434,11 +434,14 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens): raise Exception('add multiple columns not implemented', tokens) column_after = None + column_first = False if tokens[-2].lower() == 'after': column_after = strip_sql_name(tokens[-1]) tokens = tokens[:-2] if len(tokens) < 2: raise Exception('wrong tokens count', tokens) + elif tokens[-1].lower() == 'first': + column_first = True column_name = strip_sql_name(tokens[0]) column_type_mysql = tokens[1] @@ -452,21 +455,32 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens): mysql_table_structure: TableStructure = table_structure[0] ch_table_structure: TableStructure = table_structure[1] - if column_after is None: - column_after = strip_sql_name(mysql_table_structure.fields[-1].name) + if column_first: + mysql_table_structure.add_field_first( + TableField(name=column_name, field_type=column_type_mysql) + ) + + ch_table_structure.add_field_first( + TableField(name=column_name, field_type=column_type_ch) + ) + else: + if column_after is None: + column_after = strip_sql_name(mysql_table_structure.fields[-1].name) - mysql_table_structure.add_field_after( - TableField(name=column_name, field_type=column_type_mysql), - column_after, - ) + mysql_table_structure.add_field_after( + TableField(name=column_name, field_type=column_type_mysql), + column_after, + ) - ch_table_structure.add_field_after( - TableField(name=column_name, field_type=column_type_ch), - column_after, - ) + ch_table_structure.add_field_after( + TableField(name=column_name, field_type=column_type_ch), + column_after, + ) query = f'ALTER TABLE {db_name}.{table_name} ADD COLUMN {column_name} {column_type_ch}' - if column_after is not None: + if column_first: + query += ' FIRST' + else: query += f' AFTER {column_after}' if self.db_replicator: diff --git a/mysql_ch_replicator/table_structure.py b/mysql_ch_replicator/table_structure.py index 3ffdce9..336e2ce 100644 --- a/mysql_ch_replicator/table_structure.py +++ b/mysql_ch_replicator/table_structure.py @@ -25,6 +25,11 @@ def preprocess(self): field_names.index(key) for key in self.primary_keys ] + def add_field_first(self, new_field: TableField): + + self.fields.insert(0, new_field) + self.preprocess() + def add_field_after(self, new_field: TableField, after: str): idx_to_insert = None @@ -36,11 +41,13 @@ def add_field_after(self, new_field: TableField, after: str): raise Exception('field after not found', after) self.fields.insert(idx_to_insert, new_field) + self.preprocess() def remove_field(self, field_name): for idx, field in enumerate(self.fields): if field.name == field_name: del self.fields[idx] + self.preprocess() return raise Exception(f'field {field_name} not found') diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index efa1c4e..b712e17 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1173,6 +1173,87 @@ def test_percona_migration(monkeypatch): binlog_replicator_runner.stop() +def test_add_column_first_after_and_drop_column(monkeypatch): + monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1) + + cfg = config.Settings() + cfg.load(CONFIG_FILE) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute(f''' +CREATE TABLE {TEST_TABLE_NAME} ( + `id` int NOT NULL, + PRIMARY KEY (`id`)); + ''') + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)", + commit=True, + ) + + binlog_replicator_runner = BinlogReplicatorRunner() + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) + db_replicator_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + + ch.execute_command(f'USE {TEST_DB_NAME}') + + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) + + # Test adding a column as the new first column, after another column, and dropping a column + # These all move the primary key column to a different index and test the table structure is + # updated correctly. + + # Test add column first + mysql.execute( + f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c1 INT FIRST") + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 11)", + commit=True, + ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=43")) == 1) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=43")[0]['c1'] == 11) + + # Test add column after + mysql.execute( + f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c2 INT AFTER c1") + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id, c1, c2) VALUES (44, 111, 222)", + commit=True, + ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=44")) == 1) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=44")[0]['c1'] == 111) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=44")[0]['c2'] == 222) + + # Test drop column + mysql.execute( + f"ALTER TABLE {TEST_TABLE_NAME} DROP COLUMN c2") + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (45, 1111)", + commit=True, + ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=45")) == 1) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=45")[0]['c1'] == 1111) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=45")[0].get('c2') is None) + + db_replicator_runner.stop() + binlog_replicator_runner.stop() + + def test_parse_mysql_table_structure(): query = "CREATE TABLE IF NOT EXISTS user_preferences_portal (\n\t\t\tid char(36) NOT NULL,\n\t\t\tcategory varchar(50) DEFAULT NULL,\n\t\t\tdeleted tinyint(1) DEFAULT 0,\n\t\t\tdate_entered datetime DEFAULT NULL,\n\t\t\tdate_modified datetime DEFAULT NULL,\n\t\t\tassigned_user_id char(36) DEFAULT NULL,\n\t\t\tcontents longtext DEFAULT NULL\n\t\t ) ENGINE=InnoDB DEFAULT CHARSET=utf8"