diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py index 8426093..5a5fe56 100644 --- a/mysql_ch_replicator/clickhouse_api.py +++ b/mysql_ch_replicator/clickhouse_api.py @@ -14,7 +14,7 @@ CREATE_TABLE_QUERY = ''' -CREATE TABLE {db_name}.{table_name} +CREATE TABLE {if_not_exists} {db_name}.{table_name} ( {fields}, `_version` UInt64, @@ -165,6 +165,7 @@ def create_table(self, structure: TableStructure, additional_indexes: list | Non primary_key = f'({primary_key})' query = CREATE_TABLE_QUERY.format(**{ + 'if_not_exists': 'IF NOT EXISTS' if structure.if_not_exists else '', 'db_name': self.database, 'table_name': structure.table_name, 'fields': fields, diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index 076d8c8..99db0d1 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -262,6 +262,7 @@ def convert_field_type(self, mysql_type, mysql_parameters): def convert_table_structure(self, mysql_structure: TableStructure) -> TableStructure: clickhouse_structure = TableStructure() clickhouse_structure.table_name = mysql_structure.table_name + clickhouse_structure.if_not_exists = mysql_structure.if_not_exists for field in mysql_structure.fields: clickhouse_field_type = self.convert_field_type(field.field_type, field.parameters) clickhouse_structure.fields.append(TableField( diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 0d4f4fa..1250fc3 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -499,8 +499,16 @@ def handle_drop_table_query(self, query, db_name): tokens = query.split() if tokens[0].lower() != 'drop' or tokens[1].lower() != 'table': raise Exception('wrong drop table query', query) + + if_exists = (len(tokens) > 4 and + tokens[2].lower() == 'if' and + tokens[3].lower() == 'exists') + if if_exists: + del tokens[2:4] # Remove the 'IF', 'EXISTS' tokens + if len(tokens) != 3: raise Exception('wrong token count', query) + table_name = tokens[2] if '.' in table_name: db_name, table_name = table_name.split('.') @@ -508,8 +516,9 @@ def handle_drop_table_query(self, query, db_name): db_name = self.target_database table_name = strip_sql_name(table_name) db_name = strip_sql_name(db_name) - self.state.tables_structure.pop(table_name) - self.clickhouse_api.execute_command(f'DROP TABLE {db_name}.{table_name}') + if table_name in self.state.tables_structure: + self.state.tables_structure.pop(table_name) + self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}') def log_stats_if_required(self): curr_time = time.time()