From be4999ee65cb257c6dfc44958b007201c544e379 Mon Sep 17 00:00:00 2001 From: liandong Date: Tue, 24 Jun 2025 11:20:08 +0800 Subject: [PATCH 1/2] Fix the conflict issue with reserved keywords. --- mysql_ch_replicator/mysql_api.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index 2b1cc80..fd4e103 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -95,26 +95,35 @@ def get_table_create_statement(self, table_name) -> str: def get_records(self, table_name, order_by, limit, start_value=None, worker_id=None, total_workers=None): self.reconnect_if_required() - order_by_str = ','.join(order_by) + + # Escape column names with backticks to avoid issues with reserved keywords like "key" + order_by_escaped = [f'`{col}`' for col in order_by] + order_by_str = ','.join(order_by_escaped) + where = '' if start_value is not None: - start_value = ','.join(map(str, start_value)) - where = f'WHERE ({order_by_str}) > ({start_value}) ' + # Build the start_value condition for pagination + start_value_str = ','.join(map(str, start_value)) + where = f'WHERE ({order_by_str}) > ({start_value_str}) ' - # Add partitioning filter for parallel processing if needed + # Add partitioning filter for parallel processing (e.g., sharded crawling) if worker_id is not None and total_workers is not None and total_workers > 1: - # Use a list comprehension to build the COALESCE expressions with proper quoting - coalesce_expressions = [f"COALESCE({key}, '')" for key in order_by] + # Escape column names in COALESCE expressions + coalesce_expressions = [f"COALESCE(`{key}`, '')" for key in order_by] concat_keys = f"CONCAT_WS('|', {', '.join(coalesce_expressions)})" hash_condition = f"CRC32({concat_keys}) % {total_workers} = {worker_id}" + if where: where += f'AND {hash_condition} ' else: where = f'WHERE {hash_condition} ' - + + # Construct final query query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}' - # Execute the actual query + print("Executing query:", query) + + # Execute the query self.cursor.execute(query) res = self.cursor.fetchall() records = [x for x in res] From 41214a61c129f7f2c98fe10d4e3162d8145bee56 Mon Sep 17 00:00:00 2001 From: liandong Date: Tue, 24 Jun 2025 11:31:23 +0800 Subject: [PATCH 2/2] Remove the SQL query print statement used for debugging --- mysql_ch_replicator/mysql_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index fd4e103..1ba8ae1 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -121,7 +121,7 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N # Construct final query query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}' - print("Executing query:", query) +# print("Executing query:", query) # Execute the query self.cursor.execute(query)