Skip to content

Commit

Permalink
Merge pull request #416 from aadant/add_max_datetime_value
Browse files Browse the repository at this point in the history
Add --max_datetime_value to compare with CH DateTime64
  • Loading branch information
aadant committed Dec 14, 2023
2 parents 9860722 + d82a8fb commit 2620aec
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions sink-connector/python/db_compare/mysql_table_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def get_table_checksum_query(table, conn, binary_encoding):
first_column = True
min_date_value = args.min_date_value
max_date_value = args.max_date_value
max_datetime_value = args.max_datetime_value
for row in rowset:
column_name = '`'+row['column_name']+'`'
data_type = row['data_type']
Expand All @@ -82,10 +83,10 @@ def get_table_checksum_query(table, conn, binary_encoding):
nullables.append(column_name)
if 'datetime' == data_type or 'datetime(1)' == data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
select += f"case when {column_name} >= substr('{max_datetime_value}', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('{max_datetime_value}' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'datetime(4)' == data_type or 'datetime(5)' == data_type or 'datetime(6)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/ii
select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999999', 1, length({column_name})) then substr(TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999999' AS datetime(6)))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
select += f"case when {column_name} >= substr('{max_datetime_value}', 1, length({column_name})) then substr(TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{max_datetime_value}' AS datetime(6)))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'time' == data_type or 'time(1)' == data_type or 'time(2)' == data_type or 'time(3)' == data_type or 'time(4)' == data_type or 'time(5)' == data_type or 'time(6)' == data_type:
select += f"substr(cast({column_name} as time(6)),1,length({column_name}))"
elif 'timestamp' == data_type or 'timestamp(1)' == data_type or 'timestamp(2)' == data_type or 'timestamp(3)' == data_type or 'timestamp(4)' == data_type or 'timestamp(5)' == data_type or 'timestamp(6)' == data_type:
Expand Down Expand Up @@ -162,12 +163,12 @@ def select_table_statements(table, query, select_query, order_by, external_colum
coalesce(max(c),0) as c,
coalesce(max(d),0) as d
from (
select @md5sum :=md5( convert(concat_ws('#',{select_query}) using utf8mb4 )) as `hash`,
@a:=@a+cast(conv(substring(@md5sum, 1, 8), -16, 10) as signed) as a,
select @md5sum :=md5( convert(concat_ws('#',{select_query}) using utf8mb4 )) as `hash`,
@a:=@a+cast(conv(substring(@md5sum, 1, 8), -16, 10) as signed) as a,
@b:=@b+cast(conv(substring(@md5sum, 9, 8), -16, 10) as signed) as b,
@c:=@c+cast(conv(substring(@md5sum, 17, 8), -16, 10) as signed) as c,
@d:=@d+cast(conv(substring(@md5sum, 25, 8), -16, 10) as signed) as d
from {schema}.{table} where {where}
from {schema}.{table} where {where}
) as t;
""".format(select_query=select_query, schema=args.mysql_database, table=table, where=where, order_by=order_by, limit=limit)

Expand Down Expand Up @@ -270,6 +271,8 @@ def main():
'--min_date_value', help='Minimum Date32/DateTime64 date', default='1900-01-01', required=False)
parser.add_argument(
'--max_date_value', help='Maximum Date32/Datetime64 date', default='2299-12-31', required=False)
parser.add_argument(
'--max_datetime_value', help='Maximum Datetime64 datetime', default='2299-12-31 23:59:59', required=False)
parser.add_argument('--debug', dest='debug',
action='store_true', default=False)
parser.add_argument('--exclude_columns', help='columns exclude',
Expand Down

0 comments on commit 2620aec

Please sign in to comment.