Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 68 additions & 78 deletions plugins/clickhouse/api/jobs/EventDeduplicationJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@ const QueryHelpers = require('../QueryHelpers');
const ClusterManager = require('../managers/ClusterManager');
const countlyConfig = require('../../../../api/config');

// Row cap for one duplicate-discovery query; bound to the {batch_size:UInt32} LIMIT parameter.
const BATCH_SIZE = 1000;
// Upper bound on discovery/dispatch loop iterations within a single run.
const MAX_BATCHES = 100;
const DISCOVERY_LIMIT = 100000; // Max duplicate groups to discover per run
const MUTATION_BATCH_SIZE = 200; // IDs per DELETE statement (~40KB SQL)
// Duplicate-to-total row ratio at or above which the run stops with status 'anomaly_abort'.
const ANOMALY_THRESHOLD = 0.1;
// NOTE(review): presumably the default lookback window in hours; its use is not visible here — confirm.
const WINDOW_HOURS = 26;
const MAX_WINDOW_HOURS = 7 * 24; // 168 hours max lookback cap
// NOTE(review): presumably the key under which the job checkpoint is stored — confirm against #saveCheckpoint.
const CHECKPOINT_ID = '_eventDeduplicationCheckpoint';

// Safety-net ClickHouse setting to prevent query size errors.
// http_max_field_value_size is a server-level setting (not query-overridable),
// but the single-pass design keeps query params small enough to not need it.
// This object is attached as `clickhouse_settings` to the JSONEachRow queries issued by #queryJSON.
const CH_SETTINGS = {
// 10 * 1024 * 1024 = 10,485,760 (10 MiB, assuming the setting is measured in bytes — confirm against ClickHouse docs).
max_query_size: 10 * 1024 * 1024
};

/**
* Get cluster-aware table configuration.
* Returns both the fully-resolved mutation table (with _local suffix) for SQL building,
Expand Down Expand Up @@ -104,8 +111,8 @@ class EventDeduplicationJob extends job.Job {
}

this.log.d("Config", {
BATCH_SIZE,
MAX_BATCHES,
DISCOVERY_LIMIT,
MUTATION_BATCH_SIZE,
ANOMALY_THRESHOLD,
WINDOW_HOURS,
MAX_WINDOW_HOURS,
Expand All @@ -117,12 +124,12 @@ class EventDeduplicationJob extends job.Job {
const { windowStart, windowEnd } = this.#getWindowBounds(checkpoint);
this.log.d("Window", { windowStart, windowEnd });

// ---- 2) Parallel: count total rows + discover first batch with excess count ----
// ---- 2) Single-pass: count total rows + discover ALL duplicates in parallel ----
await progress(0, 0, "Scanning window for rows and duplicates...");
const windowParams = { start: windowStart, end: windowEnd };
const [totalRows, firstBatch] = await Promise.all([
const [totalRows, discovery] = await Promise.all([
this.#getTotalRowsInWindow(queryService, tableConfig, windowParams),
this.#discoverDuplicatesWithExcess(queryService, tableConfig, windowParams, [])
this.#discoverDuplicates(queryService, tableConfig, windowParams)
]);

if (totalRows === 0) {
Expand All @@ -133,16 +140,22 @@ class EventDeduplicationJob extends job.Job {

this.log.i(`Total rows in window: ${totalRows.toLocaleString()}`);

const hitDiscoveryLimit = discovery.rows.length > DISCOVERY_LIMIT;
const allDuplicates = hitDiscoveryLimit ? discovery.rows.slice(0, DISCOVERY_LIMIT) : discovery.rows;
if (hitDiscoveryLimit) {
this.log.w(`Discovery limit reached (${DISCOVERY_LIMIT}), anomaly ratio may be approximate`);
}

// ---- 3) Anomaly check ----
if (firstBatch.totalExcess > 0) {
const dupRatio = firstBatch.totalExcess / totalRows;
this.log.i(`Duplicate rows: ${firstBatch.totalExcess} / ${totalRows} (${(dupRatio * 100).toFixed(2)}%)`);
if (discovery.totalExcess > 0) {
const dupRatio = discovery.totalExcess / totalRows;
this.log.i(`Duplicate rows: ${discovery.totalExcess} / ${totalRows} (${(dupRatio * 100).toFixed(2)}%)`);
if (dupRatio >= ANOMALY_THRESHOLD) {
const res = {
status: 'anomaly_abort',
message: `Duplicate ratio ${(dupRatio * 100).toFixed(2)}% exceeds threshold ${(ANOMALY_THRESHOLD * 100).toFixed(2)}%`,
totalRows,
totalDuplicateRows: firstBatch.totalExcess,
totalDuplicateRows: discovery.totalExcess,
duplicateRatio: dupRatio,
windowStart,
windowEnd,
Expand All @@ -153,66 +166,44 @@ class EventDeduplicationJob extends job.Job {
}
}

// ---- 4) Discovery + dispatch loop ----
let batchNum = 0;
let totalDuplicateIds = 0;
let totalDuplicateRows = 0;
let mutationsDispatched = 0;
const processedIds = [];
let duplicates = firstBatch.rows;

while (batchNum < MAX_BATCHES) {
await progress(MAX_BATCHES, batchNum, `Processing duplicates (batch ${batchNum + 1})...`);
if (allDuplicates.length === 0) {
const res = { status: 'completed', message: 'No duplicates found', totalRowsInWindow: totalRows, windowStart, windowEnd, durationMs: Date.now() - t0 };
this.log.i(res.message, res);
await this.#saveCheckpoint(db, windowEnd, res);
await progress(1, 1, "Complete");
return done(null, res);
}

if (duplicates.length === 0) {
this.log.i(`No more duplicates found after batch ${batchNum}`);
break;
}
// ---- 4) Chunk duplicates into small mutation batches and dispatch ----
const totalDuplicateIds = allDuplicates.length;
const totalDuplicateRows = allDuplicates.reduce((sum, d) => sum + (Number(d.cnt) - 1), 0);
const totalChunks = Math.ceil(totalDuplicateIds / MUTATION_BATCH_SIZE);
let mutationsDispatched = 0;

const batchDupRows = duplicates.reduce((sum, d) => sum + (Number(d.cnt) - 1), 0);
totalDuplicateIds += duplicates.length;
totalDuplicateRows += batchDupRows;
this.log.i(`Discovered ${totalDuplicateIds} duplicate _id groups (${totalDuplicateRows} extra rows), dispatching ${totalChunks} mutations`);

this.log.i(`Batch ${batchNum + 1}: ${duplicates.length} duplicate _id groups (${batchDupRows} extra rows)`);
for (let i = 0; i < totalDuplicateIds; i += MUTATION_BATCH_SIZE) {
const chunkIndex = Math.floor(i / MUTATION_BATCH_SIZE);
await progress(totalChunks, chunkIndex, `Dispatching mutation ${chunkIndex + 1}/${totalChunks}...`);

// Build DELETE SQL and dispatch through mutationManager (awaited to catch MongoDB errors)
const deleteSQL = this.#buildDeleteSQL(tableConfig, duplicates, windowStart, windowEnd);
const chunk = allDuplicates.slice(i, i + MUTATION_BATCH_SIZE);
const chunkDupRows = chunk.reduce((sum, d) => sum + (Number(d.cnt) - 1), 0);
const deleteSQL = this.#buildDeleteSQL(tableConfig, chunk, windowStart, windowEnd);

await plugins.dispatchAsPromise("/core/execute_native_ch_mutation", {
sql: deleteSQL,
db: tableConfig.dbName,
collection: tableConfig.baseTable,
metadata: { source: 'EventDeduplicationJob', batch: batchNum, idsCount: duplicates.length, rowsToDelete: batchDupRows }
metadata: { source: 'EventDeduplicationJob', batch: chunkIndex, idsCount: chunk.length, rowsToDelete: chunkDupRows }
});
mutationsDispatched++;

this.log.i(`Batch ${batchNum + 1} dispatched to mutationManager`);

// Track processed IDs to exclude from next discovery (passed as CH array param)
for (const d of duplicates) {
processedIds.push(String(d._id));
}
batchNum++;

// Stop if we've reached the batch limit
if (batchNum >= MAX_BATCHES) {
this.log.w(`Stopping after MAX_BATCHES=${MAX_BATCHES} — remaining duplicates may exist`);
break;
}

// Discover next batch
const nextBatch = await this.#discoverDuplicatesWithExcess(
queryService, tableConfig, windowParams, processedIds
);
duplicates = nextBatch.rows;
}

// ---- 5) Result ----
const maxBatchesReached = batchNum >= MAX_BATCHES;
const status = hitDiscoveryLimit ? 'discovery_limit_reached' : 'completed';
const result = {
status: maxBatchesReached ? 'max_batches_reached' : 'completed',
maxBatchesReached,
batchesProcessed: batchNum,
status,
hitDiscoveryLimit,
mutationsDispatched,
totalDuplicateIds,
totalDuplicateRows,
Expand All @@ -221,15 +212,20 @@ class EventDeduplicationJob extends job.Job {
windowStart,
windowEnd,
durationMs: Date.now() - t0,
config: { BATCH_SIZE, MAX_BATCHES, ANOMALY_THRESHOLD, WINDOW_HOURS, MAX_WINDOW_HOURS }
config: { DISCOVERY_LIMIT, MUTATION_BATCH_SIZE, ANOMALY_THRESHOLD, WINDOW_HOURS, MAX_WINDOW_HOURS }
};

// Persist checkpoint only on fully completed runs
if (result.status === 'completed') {
// Persist checkpoint only on fully completed runs — if the discovery limit
// was reached there may be remaining duplicates, so the next run must
// re-scan from the same starting point.
if (status === 'completed') {
await this.#saveCheckpoint(db, windowEnd, result);
}
else {
this.log.w(`Discovery limit reached (${DISCOVERY_LIMIT}) — checkpoint NOT advanced, next run will re-scan`);
}

await progress(MAX_BATCHES, MAX_BATCHES, "Complete");
await progress(totalChunks, totalChunks, "Complete");
this.log.i("Event deduplication job: complete", result);
return done(null, result);
}
Expand All @@ -250,7 +246,7 @@ class EventDeduplicationJob extends job.Job {
* @returns {Promise<object[]>} Parsed rows from JSONEachRow format
*/
async #queryJSON(queryService, query, queryParams) {
const opts = { query, format: 'JSONEachRow' };
const opts = { query, format: 'JSONEachRow', clickhouse_settings: CH_SETTINGS };
if (queryParams) {
opts.query_params = queryParams;
}
Expand Down Expand Up @@ -296,7 +292,8 @@ class EventDeduplicationJob extends job.Job {
$set: {
windowEnd,
completedAt: Date.now(),
batchesProcessed: result.batchesProcessed || 0,
batchesProcessed: result.mutationsDispatched || 0,
mutationsDispatched: result.mutationsDispatched || 0,
totalDuplicateRows: result.totalDuplicateRows || 0
}
},
Expand Down Expand Up @@ -354,21 +351,17 @@ class EventDeduplicationJob extends job.Job {
}

/**
* Discover duplicate _id groups and compute total excess row count in a single fused query.
* Uses a window function in the outer SELECT (where cnt is a materialized column) to compute
* Discover all duplicate _id groups in a single pass and compute total excess row count.
* Uses a window function in the outer SELECT to compute
* total_excess = sum(cnt) - count(*) across all duplicate groups.
* Previously-processed _id values are excluded via a parameterized Array(String) to avoid
* query size limits with large exclusion lists.
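* The query requests DISCOVERY_LIMIT + 1 rows so the caller can tell "exactly at the limit" from
* "more exist"; when more than DISCOVERY_LIMIT rows come back, the caller trims the list and
* should NOT advance the checkpoint so the next run re-scans the same window.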
* @param {object} queryService - ClickHouse query service instance
* @param {object} tableConfig - Table configuration from {@link getTableConfig}
* @param {{start: number, end: number}} windowParams - Window bounds as epoch milliseconds (UTC)
* @param {string[]} excludeIds - _id values already processed in prior batches
* @returns {Promise<{rows: object[], totalExcess: number}>} Duplicate groups (with _id, cnt, keep_ts, keep_cd) and total excess count
* @returns {Promise<{rows: object[], totalExcess: number}>} Duplicate groups (with _id, cnt, keep_ts_ms, keep_cd_ms) and total excess count
*/
async #discoverDuplicatesWithExcess(queryService, tableConfig, windowParams, excludeIds) {
const hasExclusions = excludeIds.length > 0;
const excludeClause = hasExclusions ? 'AND _id NOT IN {exclude_ids:Array(String)}' : '';

async #discoverDuplicates(queryService, tableConfig, windowParams) {
const q = `
SELECT
_id, cnt, keep_ts_ms, keep_cd_ms,
Expand All @@ -381,17 +374,14 @@ class EventDeduplicationJob extends job.Job {
toUnixTimestamp64Milli(argMin(cd, (ts, cd))) AS keep_cd_ms
FROM ${tableConfig.selectFull}
WHERE cd >= fromUnixTimestamp64Milli({start:Int64}, 'UTC') AND cd < fromUnixTimestamp64Milli({end:Int64}, 'UTC')
${excludeClause}
GROUP BY _id
HAVING cnt > 1
)
ORDER BY cnt DESC
LIMIT {batch_size:UInt32}
LIMIT {discovery_limit:UInt32}
`;
const params = { ...windowParams, batch_size: BATCH_SIZE };
if (hasExclusions) {
params.exclude_ids = excludeIds;
}
// Fetch one extra row to distinguish "exactly at limit" from "more exist"
const params = { ...windowParams, discovery_limit: DISCOVERY_LIMIT + 1 };
const rows = await this.#queryJSON(queryService, q, params);
const totalExcess = rows.length > 0 ? Number(rows[0].total_excess || 0) : 0;
return { rows, totalExcess };
Expand Down
Loading