Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,8 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
continue
if line.lower().startswith('fulltext'):
continue
if line.lower().startswith('spatial'):
continue
if line.lower().startswith('primary key'):
# Define identifier to match column names, handling backticks and unquoted names
identifier = (Suppress('`') + Word(alphas + alphanums + '_') + Suppress('`')) | Word(
Expand Down
16 changes: 9 additions & 7 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,17 @@ def test_runner():
name varchar(255),
age int,
rate decimal(10,4),
coordinate point NOT NULL,
KEY `IDX_age` (`age`),
FULLTEXT KEY `IDX_name` (`name`),
PRIMARY KEY (id)
PRIMARY KEY (id),
SPATIAL KEY `coordinate` (`coordinate`)
) ENGINE=InnoDB AUTO_INCREMENT=2478808 DEFAULT CHARSET=latin1;
''')


mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Ivan', 42, POINT(10.0, 20.0));", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Peter', 33, POINT(10.0, 20.0));", commit=True)

run_all_runner = RunAllRunner()
run_all_runner.run()
Expand All @@ -335,7 +337,7 @@ def test_runner():
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Filipp', 50);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Filipp', 50, POINT(10.0, 20.0));", commit=True)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)

Expand All @@ -346,7 +348,7 @@ def test_runner():
kill_process(binlog_repl_pid)
kill_process(db_repl_pid, force=True)

mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, rate) VALUES ('John', 12.5);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, rate, coordinate) VALUES ('John', 12.5, POINT(10.0, 20.0));", commit=True)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0]['rate'] == 12.5)

Expand All @@ -362,14 +364,14 @@ def test_runner():
mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=88 WHERE name='Ivan'", commit=True)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 88)

mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Vlad', 99);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Vlad', 99, POINT(10.0, 20.0));", commit=True)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)

mysql.execute(
command=f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES (%s, %s);",
command=f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES (%s, %s, POINT(10.0, 20.0));",
args=(b'H\xe4llo'.decode('latin-1'), 1912),
commit=True,
)
Expand Down
Loading