From 1aea3fd56da72fedf317f9f8685f7a637ce29603 Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Wed, 26 Mar 2025 21:12:38 +0400 Subject: [PATCH] Polygon type support --- mysql_ch_replicator/converter.py | 55 ++++++++++++ test_mysql_ch_replicator.py | 143 +++++++++++++++++++++++++++++++ 2 files changed, 198 insertions(+) diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index f40a104..2efd201 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -133,6 +133,55 @@ def parse_mysql_point(binary): return (x, y) +def parse_mysql_polygon(binary): + """ + Parses the binary representation of a MySQL POLYGON data type + and returns a list of tuples [(x1,y1), (x2,y2), ...] representing the polygon vertices. + + :param binary: The binary data representing the POLYGON. + :return: A list of tuples with the coordinate values. + """ + if binary is None: + return [] + + # Determine if SRID is present (25 bytes for header with SRID, 21 without) + has_srid = len(binary) > 25 + offset = 4 if has_srid else 0 + + # Read byte order + byte_order = binary[offset] + if byte_order == 0: + endian = '>' + elif byte_order == 1: + endian = '<' + else: + raise ValueError("Invalid byte order in WKB POLYGON") + + # Read WKB Type + wkb_type = struct.unpack(endian + 'I', binary[offset + 1:offset + 5])[0] + if wkb_type != 3: # WKB type 3 means POLYGON + raise ValueError("Not a WKB POLYGON type") + + # Read number of rings (polygons can have holes) + num_rings = struct.unpack(endian + 'I', binary[offset + 5:offset + 9])[0] + if num_rings == 0: + return [] + + # Read the first ring (outer boundary) + ring_offset = offset + 9 + num_points = struct.unpack(endian + 'I', binary[ring_offset:ring_offset + 4])[0] + points = [] + + # Read each point in the ring + for i in range(num_points): + point_offset = ring_offset + 4 + (i * 16) # 16 bytes per point (8 for x, 8 for y) + x = struct.unpack(endian + 'd', binary[point_offset:point_offset + 8])[0] + y = struct.unpack(endian + 'd', binary[point_offset + 8:point_offset + 16])[0] + points.append((x, y)) + + return points + + def strip_sql_name(name): name = name.strip() if name.startswith('`'): @@ -201,6 +250,9 @@ def convert_type(self, mysql_type, parameters): if mysql_type == 'point': return 'Tuple(x Float32, y Float32)' + if mysql_type == 'polygon': + return 'Array(Tuple(x Float32, y Float32))' + # Correctly handle numeric types if mysql_type.startswith('numeric'): # Determine if parameters are specified via parentheses: @@ -433,6 +485,9 @@ def convert_record( if mysql_field_type.startswith('point'): clickhouse_field_value = parse_mysql_point(clickhouse_field_value) + if mysql_field_type.startswith('polygon'): + clickhouse_field_value = parse_mysql_polygon(clickhouse_field_value) + if mysql_field_type.startswith('enum('): enum_values = mysql_structure.fields[idx].additional_data field_name = mysql_structure.fields[idx].name if idx < len(mysql_structure.fields) else "unknown" diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 7f30026..44bc65c 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -1620,6 +1620,149 @@ def test_enum_conversion(): assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) assert('Traceback' not in read_logs(TEST_DB_NAME)) + +def test_polygon_type(): + """ + Test that polygon type is properly converted and handled between MySQL and ClickHouse. + Tests both the type conversion and data handling for polygon values. + """ + config_file = CONFIG_FILE + cfg = config.Settings() + cfg.load(config_file) + mysql_config = cfg.mysql + clickhouse_config = cfg.clickhouse + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=mysql_config + ) + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=clickhouse_config + ) + + prepare_env(cfg, mysql, ch) + + # Create a table with polygon type + mysql.execute(f''' + CREATE TABLE `{TEST_TABLE_NAME}` ( + id INT NOT NULL AUTO_INCREMENT, + name VARCHAR(50) NOT NULL, + area POLYGON NOT NULL, + nullable_area POLYGON, + PRIMARY KEY (id) + ) + ''') + + # Insert test data with polygons + # Using ST_GeomFromText to create polygons from WKT (Well-Known Text) format + mysql.execute(f''' + INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES + ('Square', ST_GeomFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))')), + ('Triangle', ST_GeomFromText('POLYGON((0 0, 1 0, 0.5 1, 0 0))'), NULL), + ('Complex', ST_GeomFromText('POLYGON((0 0, 0 3, 3 3, 3 0, 0 0))'), ST_GeomFromText('POLYGON((1 1, 1 2, 2 2, 2 1, 1 1))')); + ''', commit=True) + + run_all_runner = RunAllRunner(cfg_file=config_file) + run_all_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) + + # Get the ClickHouse data + results = ch.select(TEST_TABLE_NAME) + + # Verify the data + assert len(results) == 3 + + # Check first row (Square) + assert results[0]['name'] == 'Square' + assert len(results[0]['area']) == 5 # Square has 5 points (including closing point) + assert len(results[0]['nullable_area']) == 5 + # Verify some specific points + assert results[0]['area'][0] == {'x': 0.0, 'y': 0.0} + assert results[0]['area'][1] == {'x': 0.0, 'y': 1.0} + assert results[0]['area'][2] == {'x': 1.0, 'y': 1.0} + assert results[0]['area'][3] == {'x': 1.0, 'y': 0.0} + assert results[0]['area'][4] == {'x': 0.0, 'y': 0.0} # Closing point + + # Check second row (Triangle) + assert results[1]['name'] == 'Triangle' + assert len(results[1]['area']) == 4 # Triangle has 4 points (including closing point) + assert results[1]['nullable_area'] == [] # NULL values are returned as empty list + # Verify some specific points + assert results[1]['area'][0] == {'x': 0.0, 'y': 0.0} + assert results[1]['area'][1] == {'x': 1.0, 'y': 0.0} + assert results[1]['area'][2] == {'x': 0.5, 'y': 1.0} + assert results[1]['area'][3] == {'x': 0.0, 'y': 0.0} # Closing point + + # Check third row (Complex) + assert results[2]['name'] == 'Complex' + assert len(results[2]['area']) == 5 # Outer square + assert len(results[2]['nullable_area']) == 5 # Inner square + # Verify some specific points + assert results[2]['area'][0] == {'x': 0.0, 'y': 0.0} + assert results[2]['area'][2] == {'x': 3.0, 'y': 3.0} + assert results[2]['nullable_area'][0] == {'x': 1.0, 'y': 1.0} + assert results[2]['nullable_area'][2] == {'x': 2.0, 'y': 2.0} + + # Test realtime replication by adding more records + mysql.execute(f''' + INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES + ('Pentagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 1, 0.5 1.5, 0 0))'), ST_GeomFromText('POLYGON((0.2 0.2, 0.8 0.2, 1 0.8, 0.5 1, 0.2 0.2))')), + ('Hexagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 0.5, 1 1, 0.5 1, 0 0))'), NULL), + ('Circle', ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), ST_GeomFromText('POLYGON((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))')); + ''', commit=True) + + # Wait for new records to be replicated + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6) + + # Verify the new records using WHERE clauses + # Check Pentagon + pentagon = ch.select(TEST_TABLE_NAME, where="name='Pentagon'")[0] + assert pentagon['name'] == 'Pentagon' + assert len(pentagon['area']) == 5 # Pentagon has 5 points + assert len(pentagon['nullable_area']) == 5 # Inner pentagon + assert abs(pentagon['area'][0]['x'] - 0.0) < 1e-6 + assert abs(pentagon['area'][0]['y'] - 0.0) < 1e-6 + assert abs(pentagon['area'][2]['x'] - 1.5) < 1e-6 + assert abs(pentagon['area'][2]['y'] - 1.0) < 1e-6 + assert abs(pentagon['nullable_area'][0]['x'] - 0.2) < 1e-6 + assert abs(pentagon['nullable_area'][0]['y'] - 0.2) < 1e-6 + assert abs(pentagon['nullable_area'][2]['x'] - 1.0) < 1e-6 + assert abs(pentagon['nullable_area'][2]['y'] - 0.8) < 1e-6 + + # Check Hexagon + hexagon = ch.select(TEST_TABLE_NAME, where="name='Hexagon'")[0] + assert hexagon['name'] == 'Hexagon' + assert len(hexagon['area']) == 6 # Hexagon has 6 points + assert hexagon['nullable_area'] == [] # NULL values are returned as empty list + assert abs(hexagon['area'][0]['x'] - 0.0) < 1e-6 + assert abs(hexagon['area'][0]['y'] - 0.0) < 1e-6 + assert abs(hexagon['area'][2]['x'] - 1.5) < 1e-6 + assert abs(hexagon['area'][2]['y'] - 0.5) < 1e-6 + assert abs(hexagon['area'][4]['x'] - 0.5) < 1e-6 + assert abs(hexagon['area'][4]['y'] - 1.0) < 1e-6 + + # Check Circle + circle = ch.select(TEST_TABLE_NAME, where="name='Circle'")[0] + assert circle['name'] == 'Circle' + assert len(circle['area']) == 5 # Outer square + assert len(circle['nullable_area']) == 5 # Inner square + assert abs(circle['area'][0]['x'] - 0.0) < 1e-6 + assert abs(circle['area'][0]['y'] - 0.0) < 1e-6 + assert abs(circle['area'][2]['x'] - 2.0) < 1e-6 + assert abs(circle['area'][2]['y'] - 2.0) < 1e-6 + assert abs(circle['nullable_area'][0]['x'] - 0.5) < 1e-6 + assert abs(circle['nullable_area'][0]['y'] - 0.5) < 1e-6 + assert abs(circle['nullable_area'][2]['x'] - 1.5) < 1e-6 + assert abs(circle['nullable_area'][2]['y'] - 1.5) < 1e-6 + + run_all_runner.stop() + assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME)) + assert('Traceback' not in read_logs(TEST_DB_NAME)) + @pytest.mark.parametrize("query,expected", [ ("CREATE TABLE `mydb`.`mytable` (id INT)", "mydb"), ("CREATE TABLE mydb.mytable (id INT)", "mydb"),