Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
matrix:
os: [ubuntu-latest]
python-version:
- "3.7.1"
- "3.7"
- "3.8"
- "3.9"
- "3.10"
Expand All @@ -34,7 +34,7 @@ jobs:
python-version: ${{ matrix.python-version }}

- name: Build the stack
run: docker-compose up -d mysql postgres presto trino
run: docker-compose up -d mysql postgres presto trino clickhouse

- name: Install Poetry
run: pip install poetry
Expand All @@ -47,4 +47,5 @@ jobs:
DATADIFF_SNOWFLAKE_URI: '${{ secrets.DATADIFF_SNOWFLAKE_URI }}'
DATADIFF_PRESTO_URI: '${{ secrets.DATADIFF_PRESTO_URI }}'
DATADIFF_TRINO_URI: '${{ secrets.DATADIFF_TRINO_URI }}'
DATADIFF_CLICKHOUSE_URI: 'clickhouse://clickhouse:Password1@localhost:9000/clickhouse'
run: poetry run unittest-parallel -j 16
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ $ data-diff \

| Database | Connection string | Status |
|---------------|-------------------------------------------------------------------------------------------------------------------------------------|--------|
| PostgreSQL >=10 | `postgresql://<user>:<password>@<host>:5432/<database>` | 💚 |
| PostgreSQL >=10 | `postgresql://<user>:<password>@<host>:5432/<database>` | 💚 |
| MySQL | `mysql://<user>:<password>@<hostname>:3306/<database>` | 💚 |
| Snowflake | `"snowflake://<user>[:<password>]@<account>/<database>/<SCHEMA>?warehouse=<WAREHOUSE>&role=<role>[&authenticator=externalbrowser]"` | 💚 |
| Oracle | `oracle://<username>:<password>@<hostname>/database` | 💛 |
Expand All @@ -128,9 +128,9 @@ $ data-diff \
| Presto | `presto://<username>:<password>@<hostname>:8080/<database>` | 💛 |
| Databricks | `databricks://<http_path>:<access_token>@<server_hostname>/<catalog>/<schema>` | 💛 |
| Trino | `trino://<username>:<password>@<hostname>:8080/<database>` | 💛 |
| Clickhouse | `clickhouse://<username>:<password>@<hostname>:9000/<database>` | 💛 |
| ElasticSearch | | 📝 |
| Planetscale | | 📝 |
| Clickhouse | | 📝 |
| Pinot | | 📝 |
| Druid | | 📝 |
| Kafka | | 📝 |
Expand Down Expand Up @@ -171,6 +171,8 @@ While you may install them manually, we offer an easy way to install them along

- `pip install 'data-diff[trino]'`

- `pip install 'data-diff[clickhouse]'`

- For BigQuery, see: https://pypi.org/project/google-cloud-bigquery/


Expand Down
1 change: 1 addition & 0 deletions data_diff/databases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
from .presto import Presto
from .databricks import Databricks
from .trino import Trino
from .clickhouse import Clickhouse

from .connect import connect_to_uri
152 changes: 152 additions & 0 deletions data_diff/databases/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from typing import Optional, Type

from .base import (
MD5_HEXDIGITS,
CHECKSUM_HEXDIGITS,
TIMESTAMP_PRECISION_POS,
ThreadedDatabase,
import_helper,
ConnectError,
)
from .database_types import ColType, Decimal, Float, Integer, FractionalType, Native_UUID, TemporalType, Text, Timestamp


@import_helper("clickhouse")
def import_clickhouse():
    """Lazily import and return the `clickhouse_driver` package.

    Decorated with the project's `import_helper`, which presumably turns a
    missing driver into an actionable "install the extra" error.
    """
    import clickhouse_driver as driver

    return driver


class Clickhouse(ThreadedDatabase):
    """Connector for ClickHouse, built on `clickhouse_driver`'s DB-API layer.

    Each worker thread gets its own (non-thread-safe) connection — see
    `ThreadedDatabase` in `.base` for the threading model.
    """

    # Server-reported type name -> generic column type.
    # Parametrized forms (e.g. "Decimal(38, 6)", "FixedString(16)",
    # "DateTime64(3)") are reduced to their base name by _parse_type_repr().
    TYPE_CLASSES = {
        "Int8": Integer,
        "Int16": Integer,
        "Int32": Integer,
        "Int64": Integer,
        "Int128": Integer,
        "Int256": Integer,
        "UInt8": Integer,
        "UInt16": Integer,
        "UInt32": Integer,
        "UInt64": Integer,
        "UInt128": Integer,
        "UInt256": Integer,
        "Float32": Float,
        "Float64": Float,
        "Decimal": Decimal,
        "UUID": Native_UUID,
        "String": Text,
        "FixedString": Text,
        "DateTime": Timestamp,
        "DateTime64": Timestamp,
    }
    # NOTE(review): assumed ClickHouse truncates (does not round) on precision
    # loss — confirm; normalize_timestamp relies on this flag via coltype.rounds.
    ROUNDS_ON_PREC_LOSS = False

    def __init__(self, *, thread_count: int, **kw):
        """Store connection kwargs; actual connections are made per-thread.

        `kw` is passed verbatim to clickhouse_driver's DB-API Connection and
        must include "database".
        """
        super().__init__(thread_count=thread_count)

        self._args = kw
        # In Clickhouse database and schema are the same
        self.default_schema = kw["database"]

    def create_connection(self):
        """Create one connection for the calling worker thread."""
        clickhouse = import_clickhouse()

        class SingleConnection(clickhouse.dbapi.connection.Connection):
            """Not thread-safe connection to Clickhouse"""

            def cursor(self, cursor_factory=None):
                # The driver appends every new cursor to self.cursors; create
                # one on first use, then always hand back that same cursor
                # (hence "not thread-safe" — one cursor per connection).
                if not len(self.cursors):
                    _ = super().cursor()
                return self.cursors[0]

        try:
            return SingleConnection(**self._args)
        # NOTE(review): verify `OperationError` exists on the clickhouse_driver
        # package root — DB-API drivers conventionally name it OperationalError
        # (under clickhouse_driver.dbapi); an AttributeError here would mask
        # the real connection failure.
        except clickhouse.OperationError as e:
            raise ConnectError(*e.args) from e

    def _parse_type_repr(self, type_repr: str) -> Optional[Type[ColType]]:
        """Map a server-reported type string (e.g. "Nullable(Decimal(38, 6))")
        to one of TYPE_CLASSES, or None if the type is unsupported."""
        nullable_prefix = "Nullable("
        if type_repr.startswith(nullable_prefix):
            # Drop the Nullable(...) wrapper. rstrip removes *all* trailing
            # ')' — harmless, since parametrized types are matched by prefix.
            type_repr = type_repr[len(nullable_prefix):].rstrip(")")

        # Collapse parametrized types onto their base name.
        if type_repr.startswith("Decimal"):
            type_repr = "Decimal"
        elif type_repr.startswith("FixedString"):
            type_repr = "FixedString"
        elif type_repr.startswith("DateTime64"):
            type_repr = "DateTime64"

        return self.TYPE_CLASSES.get(type_repr)

    def quote(self, s: str) -> str:
        """Quote an identifier for use in SQL."""
        return f'"{s}"'

    def md5_to_int(self, s: str) -> str:
        """Return SQL reducing MD5({s}) to an integer checksum.

        Keeps only the low CHECKSUM_HEXDIGITS hex digits of the digest;
        reverse() before reinterpretAsUInt128 compensates for ClickHouse's
        little-endian byte reinterpretation (so the value matches the other
        connectors' big-endian interpretation — TODO confirm).
        """
        substr_idx = 1 + MD5_HEXDIGITS - CHECKSUM_HEXDIGITS
        return f"reinterpretAsUInt128(reverse(unhex(lowerUTF8(substr(hex(MD5({s})), {substr_idx})))))"

    def to_string(self, s: str) -> str:
        """Return SQL casting an expression to String."""
        return f"toString({s})"

    def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
        """Return SQL rendering a timestamp as a canonical string at the
        column's precision ("YYYY-MM-DD HH:MM:SS.ffffff" style)."""
        prec = coltype.precision
        if coltype.rounds:
            # Rounding path: round at microsecond resolution to `prec` digits,
            # then render the DateTime64 directly.
            timestamp = f"toDateTime64(round(toUnixTimestamp64Micro(toDateTime64({value}, 6)) / 1000000, {prec}), 6)"
            return self.to_string(timestamp)

        # Truncating path: take the microsecond remainder at the declared
        # precision, left-pad it to 6 digits, append to the whole-seconds
        # part, and right-pad the result to a fixed total width.
        fractional = f"toUnixTimestamp64Micro(toDateTime64({value}, {prec})) % 1000000"
        fractional = f"lpad({self.to_string(fractional)}, 6, '0')"
        value = f"formatDateTime({value}, '%Y-%m-%d %H:%M:%S') || '.' || {self.to_string(fractional)}"
        return f"rpad({value}, {TIMESTAMP_PRECISION_POS + 6}, '0')"

    def _convert_db_precision_to_digits(self, p: int) -> int:
        """Convert a binary float precision (bits) to decimal digits,
        with a safety margin."""
        # Done the same as for PostgreSQL but need to rewrite in another way
        # because it does not help for float with a big integer part.
        return super()._convert_db_precision_to_digits(p) - 2

    def normalize_number(self, value: str, coltype: FractionalType) -> str:
        """Return SQL rendering a number as a string with exactly
        `coltype.precision` fractional digits (trailing zeros preserved)."""
        # If a decimal value has trailing zeros in a fractional part, when casting to string they are dropped.
        # For example:
        #     select toString(toDecimal128(1.10, 2)); -- the result is 1.1
        #     select toString(toDecimal128(1.00, 2)); -- the result is 1
        # So, we should use some custom approach to save these trailing zeros.
        # To avoid it, we can add a small value like 0.000001 to prevent dropping of zeros from the end when casting.
        # For examples above it looks like:
        #     select toString(toDecimal128(1.10, 2 + 1) + toDecimal128(0.001, 3)); -- the result is 1.101
        # After that, cut an extra symbol from the string, i.e. 1.101 -> 1.10
        # So, the algorithm is:
        # 1. Cast to decimal with precision + 1
        # 2. Add a small value 10^(-precision-1)
        # 3. Cast the result to string
        # 4. Drop the extra digit from the string. To do that, we need to slice the string
        #    with length = digits in an integer part + 1 (symbol of ".") + precision

        if coltype.precision == 0:
            # No fractional part wanted — a plain round() suffices.
            return self.to_string(f"round({value})")

        precision = coltype.precision
        # TODO: too complex, is there better performance way?
        # Sign is emitted separately because the slice length below is
        # computed from abs(value)'s integer-digit count.
        value = f"""
            if({value} >= 0, '', '-') || left(
                toString(
                    toDecimal128(
                        round(abs({value}), {precision}),
                        {precision} + 1
                    )
                    +
                    toDecimal128(
                        exp10(-{precision + 1}),
                        {precision} + 1
                    )
                ),
                toUInt8(
                    greatest(
                        floor(log10(abs({value}))) + 1,
                        1
                    )
                ) + 1 + {precision}
            )
        """
        return value
3 changes: 3 additions & 0 deletions data_diff/databases/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .presto import Presto
from .databricks import Databricks
from .trino import Trino
from .clickhouse import Clickhouse


@dataclass
Expand Down Expand Up @@ -85,6 +86,7 @@ def match_path(self, dsn):
help_str="databricks://:access_token@server_name/http_path",
),
"trino": MatchUriPath(Trino, ["catalog", "schema"], help_str="trino://<user>@<host>/<catalog>/<schema>"),
"clickhouse": MatchUriPath(Clickhouse, ["database?"], help_str="clickhouse://<user>:<pass>@<host>/<database>"),
}


Expand All @@ -110,6 +112,7 @@ def connect_to_uri(db_uri: str, thread_count: Optional[int] = 1) -> Database:
- presto
- databricks
- trino
- clickhouse
"""

dsn = dsnparse.parse(db_uri)
Expand Down
2 changes: 1 addition & 1 deletion data_diff/table_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def count_and_checksum(self) -> Tuple[int, int]:

if count:
assert checksum, (count, checksum)
return count or 0, checksum if checksum is None else int(checksum)
return count or 0, int(checksum) if count else None

def query_key_range(self) -> Tuple[int, int]:
"""Query database for minimum and maximum key. This is used for setting the initial bounds."""
Expand Down
7 changes: 6 additions & 1 deletion dev/dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ POSTGRES_DB=postgres
MYSQL_DATABASE=mysql
MYSQL_USER=mysql
MYSQL_PASSWORD=Password1
MYSQL_ROOT_PASSWORD=RootPassword1
MYSQL_ROOT_PASSWORD=RootPassword1

CLICKHOUSE_USER=clickhouse
CLICKHOUSE_PASSWORD=Password1
CLICKHOUSE_DB=clickhouse
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
24 changes: 24 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,29 @@ services:
networks:
- local

clickhouse:
container_name: clickhouse
image: clickhouse/clickhouse-server:21.12.3.32
restart: always
volumes:
- clickhouse-data:/var/lib/clickhouse:delegated
ulimits:
nproc: 65535
nofile:
soft: 262144
hard: 262144
ports:
- '8123:8123'
- '9000:9000'
expose:
- '8123'
- '9000'
env_file:
- dev/dev.env
tty: true
networks:
- local

# prestodb.dbapi.connect(host="127.0.0.1", user="presto").cursor().execute('SELECT * FROM system.runtime.nodes')
presto:
build:
Expand All @@ -77,6 +100,7 @@ services:
volumes:
postgresql-data:
mysql-data:
clickhouse-data:

networks:
local:
Expand Down
Loading