Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,35 @@ def get_table_create_statement(self, table_name) -> str:

def get_records(self, table_name, order_by, limit, start_value=None, worker_id=None, total_workers=None):
self.reconnect_if_required()
order_by_str = ','.join(order_by)

# Escape column names with backticks to avoid issues with reserved keywords like "key"
order_by_escaped = [f'`{col}`' for col in order_by]
order_by_str = ','.join(order_by_escaped)

where = ''
if start_value is not None:
start_value = ','.join(map(str, start_value))
where = f'WHERE ({order_by_str}) > ({start_value}) '
# Build the start_value condition for pagination
start_value_str = ','.join(map(str, start_value))
where = f'WHERE ({order_by_str}) > ({start_value_str}) '

# Add partitioning filter for parallel processing if needed
# Add partitioning filter for parallel processing (e.g., sharded crawling)
if worker_id is not None and total_workers is not None and total_workers > 1:
# Use a list comprehension to build the COALESCE expressions with proper quoting
coalesce_expressions = [f"COALESCE({key}, '')" for key in order_by]
# Escape column names in COALESCE expressions
coalesce_expressions = [f"COALESCE(`{key}`, '')" for key in order_by]
concat_keys = f"CONCAT_WS('|', {', '.join(coalesce_expressions)})"
hash_condition = f"CRC32({concat_keys}) % {total_workers} = {worker_id}"

if where:
where += f'AND {hash_condition} '
else:
where = f'WHERE {hash_condition} '


# Construct final query
query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'

# Execute the actual query
# print("Executing query:", query)

# Execute the query
self.cursor.execute(query)
res = self.cursor.fetchall()
records = [x for x in res]
Expand Down