From ad918d5682b90364e7b79427b98689f67093a900 Mon Sep 17 00:00:00 2001 From: Joy Gao <17896160+jgao54@users.noreply.github.com> Date: Tue, 19 Aug 2025 14:20:08 -1000 Subject: [PATCH 1/2] mongo: add a quick-start page for working with json --- .../clickpipes/mongodb/quickstart.md | 315 ++++++++++++++++++ sidebars.js | 1 + 2 files changed, 316 insertions(+) create mode 100644 docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md diff --git a/docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md b/docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md new file mode 100644 index 00000000000..cac0348acec --- /dev/null +++ b/docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md @@ -0,0 +1,315 @@ +--- +title: 'Working with JSON in ClickHouse' +sidebar_label: 'Working with JSON' +slug: /integrations/clickpipes/mongodb/quickstart +description: 'Common patterns for working with JSON data replicated from MongoDB to ClickHouse via ClickPipes' +--- + +# Working with JSON in ClickHouse + +This guide provides common patterns for working with JSON data replicated from MongoDB to ClickHouse via ClickPipes. + +Suppose we created a collection `t1` in MongoDB to track customer orders: + +```javascript +db.t1.insertOne({ + "order_id": "ORD-001234", + "customer_id": 98765, + "status": "completed", + "total_amount": 299.97, + "order_date": new Date(), + "shipping": { + "method": "express", + "city": "Seattle", + "cost": 19.99 + }, + "items": [ + { + "category": "electronics", + "price": 149.99 + }, + { + "category": "accessories", + "price": 24.99 + } + ] +}) +``` + +MongoDB CDC Connector replicates MongoDB documents to ClickHouse using the native JSON data type. The replicated table `t1` in ClickHouse will contain the following row: + +```shell +Row 1: +────── +_id: "68a4df4b9fe6c73b541703b0" +_full_document: {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97} +_peerdb_synced_at: 2025-08-19 20:50:42.005000000 +_peerdb_is_deleted: 0 +_peerdb_version: 0 +``` + +## Table schema {#table-schema} + +The replicated tables use this standard schema: + +```shell +┌─name───────────────┬─type──────────┐ +│ _id │ String │ +│ _full_document │ JSON │ +│ _peerdb_synced_at │ DateTime64(9) │ +│ _peerdb_version │ Int64 │ +│ _peerdb_is_deleted │ Int8 │ +└────────────────────┴───────────────┘ +``` + +- `_id`: Primary key from MongoDB +- `_full_document`: MongoDB document replicated as JSON data type +- `_peerdb_synced_at`: Records when the row was last synced +- `_peerdb_version`: Tracks the version of the row; incremented when the row is updated or deleted +- `_peerdb_is_deleted`: Marks whether the row is deleted + +### ReplacingMergeTree table engine {#replacingmergetree-table-engine} + +ClickPipes maps MongoDB collections into ClickHouse using the `ReplacingMergeTree` table engine family. With this engine, updates are modeled as inserts with a newer version (`_peerdb_version`) of the document for a given primary key (`_id`), enabling efficient handling of updates, replaces, and deletes as versioned inserts. + +`ReplacingMergeTree` clears out duplicates asynchronously in the background. To guarantee the absence of duplicates for the same row, use the [`FINAL` modifier](https://clickhouse.com/docs/sql-reference/statements/select/from#final-modifier). For example: + +```sql +SELECT * FROM t1 FINAL; +``` + +### Handling deletes {#handling-deletes} + +Deletes from MongoDB are propagated as new rows marked as deleted using the `_peerdb_is_deleted` column. You typically want to filter these out in your queries: + +```sql +SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0; +``` + +You can also create a row-level policy to automatically filter out deleted rows instead of specifying the filter in each query: + +```sql +CREATE ROW POLICY policy_name ON t1 +FOR SELECT USING _peerdb_is_deleted = 0; +``` + +## Querying JSON data {#querying-json-data} + +You can directly query JSON fields using dot syntax: + +```sql +SELECT + _full_document.order_id, + _full_document.shipping.method +FROM t1; +``` + +Result: +```shell +┌─_full_document.order_id─┬─_full_document.shipping.method─┐ +│ ORD-001234 │ express │ +└─────────────────────────┴────────────────────────────────┘ +``` + +### Dynamic type {#dynamic-type} + +In ClickHouse, each field in JSON has `Dynamic` type. Dynamic type allows ClickHouse to store values of any type without knowing the type in advance. You can verify this with the `toTypeName` function: + +```sql +SELECT toTypeName(_full_document.customer_id) AS type FROM t1; +``` + +Result: +```shell +┌─type────┐ +│ Dynamic │ +└─────────┘ +``` + +To examine the underlying data type(s) for a field, you can check with the `dynamicType` function. Note that it's possible to have different data types for the same field name in different rows: + +```sql +SELECT dynamicType(_full_document.customer_id) AS type FROM t1; +``` + +Result: +```shell +┌─type──┐ +│ Int64 │ +└───────┘ +``` + +[Regular functions](https://clickhouse.com/docs/sql-reference/functions/regular-functions) work for dynamic type just like they do for regular columns: + +**Example 1: Date parsing** +```sql +SELECT parseDateTimeBestEffortOrNull(_full_document.order_date) AS order_date FROM t1; +``` + +Result: +```shell +┌─order_date──────────┐ +│ 2025-08-19 20:32:11 │ +└─────────────────────┘ +``` + +**Example 2: Conditional logic** +```sql +SELECT multiIf( + _full_document.total_amount < 100, 'less_than_100', + _full_document.total_amount < 1000, 'less_than_1000', + '1000+') AS spendings +FROM t1; +``` + +Result: +```shell +┌─spendings──────┐ +│ less_than_1000 │ +└────────────────┘ +``` + +**Example 3: Array operations** +```sql +SELECT length(_full_document.items) AS item_count FROM t1; +``` + +Result: +```shell +┌─item_count─┐ +│ 2 │ +└────────────┘ +``` + +### Field casting {#field-casting} + +[Aggregation functions](https://clickhouse.com/docs/sql-reference/aggregate-functions/combinators) in ClickHouse don't work with dynamic type directly. For example, if you attempt to directly use the `sum` function on a dynamic type, you get the following error: + +```sql +SELECT sum(_full_document.shipping.cost) AS shipping_cost FROM t1; +-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT) +``` + +To use aggregation functions, cast the field to the appropriate type with the `CAST` function or `::` syntax: + +```sql +SELECT sum(_full_document.shipping.cost::Float32) AS shipping_cost FROM t1; +``` + +Result: +```shell +┌─shipping_cost─┐ +│ 19.99 │ +└───────────────┘ +``` + +:::note +Casting from dynamic type to the underlying data type (determined by `dynamicType`) is very performant, as ClickHouse already stores the value in its underlying type internally. +::: + +## Flattening JSON {#flattening-json} + +### Normal view {#normal-view} + +You can create normal views on top of the JSON table to encapsulate flattening/casting/transformation logic in order to query data similar to a relational table. Normal views are lightweight as they only store the query itself, not the underlying data. For example: + +```sql +CREATE VIEW v1 AS +SELECT + CAST(_full_document._id, 'String') AS object_id, + CAST(_full_document.order_id, 'String') AS order_id, + CAST(_full_document.customer_id, 'Int64') AS customer_id, + CAST(_full_document.status, 'String') AS status, + CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount, + CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DATETIME(3)') AS order_date, + CAST(_full_document.shipping.method, 'String') AS shipping_method, + CAST(_full_document.shipping.city, 'String') AS shipping_city, + CAST(_full_document.shipping.cost, 'Decimal64(2)') AS shipping_cost, + _full_document.items AS items +FROM t1 FINAL +WHERE _peerdb_is_deleted = 0; +``` + +This view will have the following schema: + +```shell +┌─name────────────┬─type───────────┐ +│ object_id │ String │ +│ order_id │ String │ +│ customer_id │ Int64 │ +│ status │ String │ +│ total_amount │ Decimal(18, 2) │ +│ order_date │ DateTime64(3) │ +│ shipping_method │ String │ +│ shipping_city │ String │ +│ shipping_cost │ Decimal(18, 2) │ +│ items │ Dynamic │ +└─────────────────┴────────────────┘ +``` + +You can now query the view similar to how you would query a flattened table: + +```sql +SELECT + customer_id, + sum(total_amount) +FROM v1 +WHERE shipping_city = 'Seattle' +GROUP BY customer_id +ORDER BY customer_id DESC +LIMIT 10; +``` + +### Refreshable materialized view {#refreshable-materialized-view} + +You can also create [Refreshable Materialized Views](https://clickhouse.com/docs/materialized-view/refreshable-materialized-view), which enable you to schedule query execution for deduplicating rows and storing the results in a flattened destination table. With each scheduled refresh, the destination table is replaced with the latest query results. + +The key advantage of this method is that the query using the `FINAL` keyword runs only once during the refresh, eliminating the need for subsequent queries on the destination table to use `FINAL`. + +However, a drawback is that the data in the destination table is only as up-to-date as the most recent refresh. For many use cases, refresh intervals ranging from several minutes to a few hours provide a good balance between data freshness and query performance. + +```sql +CREATE TABLE flattened_t1 ( + `_id` String, + `order_id` String, + `customer_id` Int64, + `status` String, + `total_amount` Decimal(18, 2), + `order_date` DateTime64(3), + `shipping_method` String, + `shipping_city` String, + `shipping_cost` Decimal(18, 2), + `items` Dynamic +) +ENGINE = ReplacingMergeTree() +PRIMARY KEY _id +ORDER BY _id; + +CREATE MATERIALIZED VIEW mv1 REFRESH EVERY 1 HOUR TO flattened_t1 AS +SELECT + CAST(_full_document._id, 'String') AS _id, + CAST(_full_document.order_id, 'String') AS order_id, + CAST(_full_document.customer_id, 'Int64') AS customer_id, + CAST(_full_document.status, 'String') AS status, + CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount, + CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DATETIME(3)') AS order_date, + CAST(_full_document.shipping.method, 'String') AS shipping_method, + CAST(_full_document.shipping.city, 'String') AS shipping_city, + CAST(_full_document.shipping.cost, 'Decimal64(2)') AS shipping_cost, + _full_document.items AS items +FROM t1 FINAL +WHERE _peerdb_is_deleted = 0; +``` + +You can now query the table `flattened_t1` directly without the `FINAL` modifier: + +```sql +SELECT + customer_id, + sum(total_amount) +FROM flattened_t1 +WHERE shipping_city = 'Seattle' +GROUP BY customer_id +ORDER BY customer_id DESC +LIMIT 10; +``` diff --git a/sidebars.js b/sidebars.js index 187de23ad80..cccee0c2e5a 100644 --- a/sidebars.js +++ b/sidebars.js @@ -810,6 +810,7 @@ const sidebars = { items: [ "integrations/data-ingestion/clickpipes/mongodb/index", "integrations/data-ingestion/clickpipes/mongodb/datatypes", + "integrations/data-ingestion/clickpipes/mongodb/quickstart", { type: "category", label: "Operations", From f2e35e1a5bbae0270aa1f511c49e9029bf802358 Mon Sep 17 00:00:00 2001 From: Shaun Struwig <41984034+Blargian@users.noreply.github.com> Date: Wed, 20 Aug 2025 11:09:54 +0200 Subject: [PATCH 2/2] add title parameter to code blocks for nicer rendering of query and result blocks --- .../clickpipes/mongodb/quickstart.md | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md b/docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md index cac0348acec..acc3cda5254 100644 --- a/docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md +++ b/docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md @@ -97,15 +97,14 @@ FOR SELECT USING _peerdb_is_deleted = 0; You can directly query JSON fields using dot syntax: -```sql +```sql title="Query" SELECT _full_document.order_id, _full_document.shipping.method FROM t1; ``` -Result: -```shell +```shell title="Result" ┌─_full_document.order_id─┬─_full_document.shipping.method─┐ │ ORD-001234 │ express │ └─────────────────────────┴────────────────────────────────┘ @@ -115,12 +114,11 @@ Result: In ClickHouse, each field in JSON has `Dynamic` type. Dynamic type allows ClickHouse to store values of any type without knowing the type in advance. You can verify this with the `toTypeName` function: -```sql +```sql title="Query" SELECT toTypeName(_full_document.customer_id) AS type FROM t1; ``` -Result: -```shell +```shell title="Result" ┌─type────┐ │ Dynamic │ └─────────┘ @@ -128,12 +126,11 @@ Result: To examine the underlying data type(s) for a field, you can check with the `dynamicType` function. Note that it's possible to have different data types for the same field name in different rows: -```sql +```sql title="Query" SELECT dynamicType(_full_document.customer_id) AS type FROM t1; ``` -Result: -```shell +```shell title="Result" ┌─type──┐ │ Int64 │ └───────┘ @@ -142,19 +139,20 @@ Result: [Regular functions](https://clickhouse.com/docs/sql-reference/functions/regular-functions) work for dynamic type just like they do for regular columns: **Example 1: Date parsing** -```sql + +```sql title="Query" SELECT parseDateTimeBestEffortOrNull(_full_document.order_date) AS order_date FROM t1; ``` -Result: -```shell +```shell title="Result" ┌─order_date──────────┐ │ 2025-08-19 20:32:11 │ └─────────────────────┘ ``` **Example 2: Conditional logic** -```sql + +```sql title="Query" SELECT multiIf( _full_document.total_amount < 100, 'less_than_100', _full_document.total_amount < 1000, 'less_than_1000', @@ -162,20 +160,19 @@ SELECT multiIf( FROM t1; ``` -Result: -```shell +```shell title="Result" ┌─spendings──────┐ │ less_than_1000 │ └────────────────┘ ``` **Example 3: Array operations** -```sql + +```sql title="Query" SELECT length(_full_document.items) AS item_count FROM t1; ``` -Result: -```shell +```shell title="Result" ┌─item_count─┐ │ 2 │ └────────────┘ @@ -192,12 +189,11 @@ SELECT sum(_full_document.shipping.cost) AS shipping_cost FROM t1; To use aggregation functions, cast the field to the appropriate type with the `CAST` function or `::` syntax: -```sql +```sql title="Query" SELECT sum(_full_document.shipping.cost::Float32) AS shipping_cost FROM t1; ``` -Result: -```shell +```shell title="Result" ┌─shipping_cost─┐ │ 19.99 │ └───────────────┘