Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,55 @@ def parse_mysql_point(binary):
return (x, y)


def parse_mysql_polygon(binary):
"""
Parses the binary representation of a MySQL POLYGON data type
and returns a list of tuples [(x1,y1), (x2,y2), ...] representing the polygon vertices.

:param binary: The binary data representing the POLYGON.
:return: A list of tuples with the coordinate values.
"""
if binary is None:
return []

# Determine if SRID is present (25 bytes for header with SRID, 21 without)
has_srid = len(binary) > 25
offset = 4 if has_srid else 0

# Read byte order
byte_order = binary[offset]
if byte_order == 0:
endian = '>'
elif byte_order == 1:
endian = '<'
else:
raise ValueError("Invalid byte order in WKB POLYGON")

# Read WKB Type
wkb_type = struct.unpack(endian + 'I', binary[offset + 1:offset + 5])[0]
if wkb_type != 3: # WKB type 3 means POLYGON
raise ValueError("Not a WKB POLYGON type")

# Read number of rings (polygons can have holes)
num_rings = struct.unpack(endian + 'I', binary[offset + 5:offset + 9])[0]
if num_rings == 0:
return []

# Read the first ring (outer boundary)
ring_offset = offset + 9
num_points = struct.unpack(endian + 'I', binary[ring_offset:ring_offset + 4])[0]
points = []

# Read each point in the ring
for i in range(num_points):
point_offset = ring_offset + 4 + (i * 16) # 16 bytes per point (8 for x, 8 for y)
x = struct.unpack(endian + 'd', binary[point_offset:point_offset + 8])[0]
y = struct.unpack(endian + 'd', binary[point_offset + 8:point_offset + 16])[0]
points.append((x, y))

return points


def strip_sql_name(name):
name = name.strip()
if name.startswith('`'):
Expand Down Expand Up @@ -201,6 +250,9 @@ def convert_type(self, mysql_type, parameters):
if mysql_type == 'point':
return 'Tuple(x Float32, y Float32)'

if mysql_type == 'polygon':
return 'Array(Tuple(x Float32, y Float32))'

# Correctly handle numeric types
if mysql_type.startswith('numeric'):
# Determine if parameters are specified via parentheses:
Expand Down Expand Up @@ -433,6 +485,9 @@ def convert_record(
if mysql_field_type.startswith('point'):
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)

if mysql_field_type.startswith('polygon'):
clickhouse_field_value = parse_mysql_polygon(clickhouse_field_value)

if mysql_field_type.startswith('enum('):
enum_values = mysql_structure.fields[idx].additional_data
field_name = mysql_structure.fields[idx].name if idx < len(mysql_structure.fields) else "unknown"
Expand Down
143 changes: 143 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,149 @@ def test_enum_conversion():
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert('Traceback' not in read_logs(TEST_DB_NAME))


def test_polygon_type():
"""
Test that polygon type is properly converted and handled between MySQL and ClickHouse.
Tests both the type conversion and data handling for polygon values.
"""
config_file = CONFIG_FILE
cfg = config.Settings()
cfg.load(config_file)
mysql_config = cfg.mysql
clickhouse_config = cfg.clickhouse
mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=mysql_config
)
ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=clickhouse_config
)

prepare_env(cfg, mysql, ch)

# Create a table with polygon type
mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(50) NOT NULL,
area POLYGON NOT NULL,
nullable_area POLYGON,
PRIMARY KEY (id)
)
''')

# Insert test data with polygons
# Using ST_GeomFromText to create polygons from WKT (Well-Known Text) format
mysql.execute(f'''
INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES
('Square', ST_GeomFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))')),
('Triangle', ST_GeomFromText('POLYGON((0 0, 1 0, 0.5 1, 0 0))'), NULL),
('Complex', ST_GeomFromText('POLYGON((0 0, 0 3, 3 3, 3 0, 0 0))'), ST_GeomFromText('POLYGON((1 1, 1 2, 2 2, 2 1, 1 1))'));
''', commit=True)

run_all_runner = RunAllRunner(cfg_file=config_file)
run_all_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

# Get the ClickHouse data
results = ch.select(TEST_TABLE_NAME)

# Verify the data
assert len(results) == 3

# Check first row (Square)
assert results[0]['name'] == 'Square'
assert len(results[0]['area']) == 5 # Square has 5 points (including closing point)
assert len(results[0]['nullable_area']) == 5
# Verify some specific points
assert results[0]['area'][0] == {'x': 0.0, 'y': 0.0}
assert results[0]['area'][1] == {'x': 0.0, 'y': 1.0}
assert results[0]['area'][2] == {'x': 1.0, 'y': 1.0}
assert results[0]['area'][3] == {'x': 1.0, 'y': 0.0}
assert results[0]['area'][4] == {'x': 0.0, 'y': 0.0} # Closing point

# Check second row (Triangle)
assert results[1]['name'] == 'Triangle'
assert len(results[1]['area']) == 4 # Triangle has 4 points (including closing point)
assert results[1]['nullable_area'] == [] # NULL values are returned as empty list
# Verify some specific points
assert results[1]['area'][0] == {'x': 0.0, 'y': 0.0}
assert results[1]['area'][1] == {'x': 1.0, 'y': 0.0}
assert results[1]['area'][2] == {'x': 0.5, 'y': 1.0}
assert results[1]['area'][3] == {'x': 0.0, 'y': 0.0} # Closing point

# Check third row (Complex)
assert results[2]['name'] == 'Complex'
assert len(results[2]['area']) == 5 # Outer square
assert len(results[2]['nullable_area']) == 5 # Inner square
# Verify some specific points
assert results[2]['area'][0] == {'x': 0.0, 'y': 0.0}
assert results[2]['area'][2] == {'x': 3.0, 'y': 3.0}
assert results[2]['nullable_area'][0] == {'x': 1.0, 'y': 1.0}
assert results[2]['nullable_area'][2] == {'x': 2.0, 'y': 2.0}

# Test realtime replication by adding more records
mysql.execute(f'''
INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES
('Pentagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 1, 0.5 1.5, 0 0))'), ST_GeomFromText('POLYGON((0.2 0.2, 0.8 0.2, 1 0.8, 0.5 1, 0.2 0.2))')),
('Hexagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 0.5, 1 1, 0.5 1, 0 0))'), NULL),
('Circle', ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), ST_GeomFromText('POLYGON((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))'));
''', commit=True)

# Wait for new records to be replicated
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6)

# Verify the new records using WHERE clauses
# Check Pentagon
pentagon = ch.select(TEST_TABLE_NAME, where="name='Pentagon'")[0]
assert pentagon['name'] == 'Pentagon'
assert len(pentagon['area']) == 5 # Pentagon has 5 points
assert len(pentagon['nullable_area']) == 5 # Inner pentagon
assert abs(pentagon['area'][0]['x'] - 0.0) < 1e-6
assert abs(pentagon['area'][0]['y'] - 0.0) < 1e-6
assert abs(pentagon['area'][2]['x'] - 1.5) < 1e-6
assert abs(pentagon['area'][2]['y'] - 1.0) < 1e-6
assert abs(pentagon['nullable_area'][0]['x'] - 0.2) < 1e-6
assert abs(pentagon['nullable_area'][0]['y'] - 0.2) < 1e-6
assert abs(pentagon['nullable_area'][2]['x'] - 1.0) < 1e-6
assert abs(pentagon['nullable_area'][2]['y'] - 0.8) < 1e-6

# Check Hexagon
hexagon = ch.select(TEST_TABLE_NAME, where="name='Hexagon'")[0]
assert hexagon['name'] == 'Hexagon'
assert len(hexagon['area']) == 6 # Hexagon has 6 points
assert hexagon['nullable_area'] == [] # NULL values are returned as empty list
assert abs(hexagon['area'][0]['x'] - 0.0) < 1e-6
assert abs(hexagon['area'][0]['y'] - 0.0) < 1e-6
assert abs(hexagon['area'][2]['x'] - 1.5) < 1e-6
assert abs(hexagon['area'][2]['y'] - 0.5) < 1e-6
assert abs(hexagon['area'][4]['x'] - 0.5) < 1e-6
assert abs(hexagon['area'][4]['y'] - 1.0) < 1e-6

# Check Circle
circle = ch.select(TEST_TABLE_NAME, where="name='Circle'")[0]
assert circle['name'] == 'Circle'
assert len(circle['area']) == 5 # Outer square
assert len(circle['nullable_area']) == 5 # Inner square
assert abs(circle['area'][0]['x'] - 0.0) < 1e-6
assert abs(circle['area'][0]['y'] - 0.0) < 1e-6
assert abs(circle['area'][2]['x'] - 2.0) < 1e-6
assert abs(circle['area'][2]['y'] - 2.0) < 1e-6
assert abs(circle['nullable_area'][0]['x'] - 0.5) < 1e-6
assert abs(circle['nullable_area'][0]['y'] - 0.5) < 1e-6
assert abs(circle['nullable_area'][2]['x'] - 1.5) < 1e-6
assert abs(circle['nullable_area'][2]['y'] - 1.5) < 1e-6

run_all_runner.stop()
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert('Traceback' not in read_logs(TEST_DB_NAME))

@pytest.mark.parametrize("query,expected", [
("CREATE TABLE `mydb`.`mytable` (id INT)", "mydb"),
("CREATE TABLE mydb.mytable (id INT)", "mydb"),
Expand Down