diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index 2b1cc80..1ba8ae1 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -95,26 +95,35 @@ def get_table_create_statement(self, table_name) -> str: def get_records(self, table_name, order_by, limit, start_value=None, worker_id=None, total_workers=None): self.reconnect_if_required() - order_by_str = ','.join(order_by) + + # Escape column names with backticks to avoid issues with reserved keywords like "key" + order_by_escaped = [f'`{col}`' for col in order_by] + order_by_str = ','.join(order_by_escaped) + where = '' if start_value is not None: - start_value = ','.join(map(str, start_value)) - where = f'WHERE ({order_by_str}) > ({start_value}) ' + # Build the start_value condition for pagination + start_value_str = ','.join(map(str, start_value)) + where = f'WHERE ({order_by_str}) > ({start_value_str}) ' - # Add partitioning filter for parallel processing if needed + # Add partitioning filter for parallel processing (e.g., sharded crawling) if worker_id is not None and total_workers is not None and total_workers > 1: - # Use a list comprehension to build the COALESCE expressions with proper quoting - coalesce_expressions = [f"COALESCE({key}, '')" for key in order_by] + # Escape column names in COALESCE expressions + coalesce_expressions = [f"COALESCE(`{key}`, '')" for key in order_by] concat_keys = f"CONCAT_WS('|', {', '.join(coalesce_expressions)})" hash_condition = f"CRC32({concat_keys}) % {total_workers} = {worker_id}" + if where: where += f'AND {hash_condition} ' else: where = f'WHERE {hash_condition} ' - + + # Construct final query query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}' - # Execute the actual query +# print("Executing query:", query) + + # Execute the query self.cursor.execute(query) res = self.cursor.fetchall() records = [x for x in res]