Skip to content
70 changes: 68 additions & 2 deletions adminforth/dataConnectors/clickhouse.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn } from '../types/Back.js';
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
import AdminForthBaseConnector from './baseConnector.js';
import dayjs from 'dayjs';
import { createClient } from '@clickhouse/client'
Expand Down Expand Up @@ -444,13 +444,79 @@ class ClickhouseConnector extends AdminForthBaseConnector implements IAdminForth
return { where, params };
}

async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
resource: AdminForthResource;
filters: IAdminForthAndOrFilter;
aggregations: { [alias: string]: IAggregationRule };
groupBy?: IGroupByRule;
}): Promise <Array<{ group?: string, [key: string]: any }>> {

const tableName = `${this.dbName}.${resource.table}`;

const selectParts: string[] = [];
let groupExpr: string | null = null;

if (groupBy?.type === 'date_trunc') {
const g = groupBy as IGroupByDateTrunc;
const tz = g.timezone ?? 'UTC';

const field = `toTimeZone(${g.field}, '${tz}')`;

switch (g.truncation) {
case 'day': groupExpr = `toDate(toStartOfDay(${field}))`; break;
case 'month': groupExpr = `toDate(toStartOfMonth(${field}))`; break;
case 'week': groupExpr = `toDate(toStartOfWeek(${field}))`; break;
case 'year': groupExpr = `toDate(toStartOfYear(${field}))`; break;
}

selectParts.push(`${groupExpr} AS \`group\``);

} else if (groupBy?.type === 'field') {
const g = groupBy as IGroupByField;
groupExpr = `${g.field}`;
selectParts.push(`${groupExpr} AS \`group\``);
}

for (const [alias, rule] of Object.entries(aggregations)) {
switch (rule.operation) {
case 'count': selectParts.push(`count() AS \`${alias}\``); break;
case 'sum': selectParts.push(`sum(${rule.field}) AS \`${alias}\``); break;
case 'avg': selectParts.push(`avg(${rule.field}) AS \`${alias}\``); break;
case 'min': selectParts.push(`min(${rule.field}) AS \`${alias}\``); break;
case 'max': selectParts.push(`max(${rule.field}) AS \`${alias}\``); break;
case 'median': selectParts.push(`quantile(0.5)(${rule.field}) AS \`${alias}\``); break;
Comment thread
kulikp1 marked this conversation as resolved.
}
}

const { where, params } = this.whereClause(resource, filters);

let query = `SELECT ${selectParts.join(', ')} FROM ${tableName} ${where}`;

if (groupExpr) {
query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
}

const result = await this.client.query({
query,
format: 'JSONEachRow',
query_params: params,
});

const rows = await result.json();

return rows.map((r: any) => ({
group: r.group,
...r,
}));
Comment thread
kulikp1 marked this conversation as resolved.
}

async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: {
resource: AdminForthResource,
limit: number,
offset: number,
sort: { field: string, direction: AdminForthSortDirections }[],
filters: IAdminForthAndOrFilter,
}): Promise<any[]> {
}): Promise<Array<{ group?: string, [key: string]: any }>> {
Comment thread
kulikp1 marked this conversation as resolved.
const columns = resource.dataSourceColumns.map((col) => {
Comment thread
kulikp1 marked this conversation as resolved.
// for decimal cast to string
if (col.type == AdminForthDataTypes.DECIMAL) {
Expand Down
104 changes: 103 additions & 1 deletion adminforth/dataConnectors/mongo.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dayjs from 'dayjs';
import { MongoClient } from 'mongodb';
import { Decimal128, Double } from 'bson';
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource } from '../types/Back.js';
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
import AdminForthBaseConnector from './baseConnector.js';
import { afLogger } from '../modules/logger.js';
import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js';
Expand Down Expand Up @@ -305,6 +305,108 @@ class MongoConnector extends AdminForthBaseConnector implements IAdminForthDataS
.filter((f) => (f as IAdminForthSingleFilter).insecureRawSQL === undefined)
.map((f) => this.getFilterQuery(resource, f)));
}

async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
resource: AdminForthResource;
filters: IAdminForthAndOrFilter;
aggregations: { [alias: string]: IAggregationRule };
groupBy?: IGroupByRule;
}): Promise<Array<{ group?: string, [key: string]: any }>> {
Comment thread
kulikp1 marked this conversation as resolved.

const collection = this.client.db().collection(resource.table);

const match = filters?.subFilters?.length ? this.getFilterQuery(resource, filters) : {};

let groupId: any = null;

if (groupBy?.type === 'field') {
const g = groupBy as IGroupByField;
groupId = `$${g.field}`;
}

if (groupBy?.type === 'date_trunc') {
const g = groupBy as IGroupByDateTrunc;
const tz = g.timezone ?? 'UTC';
const dateTruncSpec: any = {
date: `$${g.field}`,
unit: g.truncation,
timezone: tz,
};
if (g.truncation === 'week') {
dateTruncSpec.startOfWeek = 'Mon';
}
groupId = { $dateTrunc: dateTruncSpec };
}

const groupStage: Record<string, any> = {
_id: groupId,
};

for (const [alias, rule] of Object.entries(aggregations)) {
switch (rule.operation) {
case 'count': groupStage[alias] = { $sum: 1 }; break;
case 'sum': groupStage[alias] = { $sum: { $toDouble: `$${rule.field}` } }; break;
case 'avg': groupStage[alias] = { $avg: { $toDouble: `$${rule.field}` } }; break;
case 'min': groupStage[alias] = { $min: { $toDouble: `$${rule.field}` } }; break;
case 'max': groupStage[alias] = { $max: { $toDouble: `$${rule.field}` } }; break;
case 'median': groupStage[alias] = { $push: { $toDouble: `$${rule.field}` } }; break;
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mongo median implementation uses $group + $push to accumulate all values for the field into an array per group, then sorts to compute the median. This can easily exceed MongoDB's 16MB document limit for a group and/or the aggregation memory limits on large datasets, causing the query to fail or become unstable. Please compute the median fully inside MongoDB without materializing the entire value list (e.g., using a server-side median/percentile operator when available, or a window-function based approach), or otherwise implement a bounded/approximate median strategy.

Suggested change
case 'median': groupStage[alias] = { $push: { $toDouble: `$${rule.field}` } }; break;
case 'median': groupStage[alias] = { $median: { input: { $toDouble: `$${rule.field}` }, method: 'approximate' } }; break;

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is important

}
}

const pipeline: any[] = [];

if (Object.keys(match).length) {
pipeline.push({ $match: match });
}

pipeline.push({ $group: groupStage });

pipeline.push({
$project: {
_id: 0,
group: !groupBy ? "$$REMOVE" : (groupBy.type === 'date_trunc' ? {
$cond: {
if: { $eq: [{ $type: "$_id" }, "date"] },
then: {
$dateToString: {
format: "%Y-%m-%d",
date: "$_id",
timezone: (groupBy as IGroupByDateTrunc).timezone ?? 'UTC'
}
},
else: "$_id"
}
} : "$_id"),
...Object.fromEntries(
Object.keys(groupStage)
.filter(k => k !== '_id')
.map(k => [k, `$${k}`])
),
},
});

const calculateMedian = (arr: any[]) => {
if (!Array.isArray(arr) || arr.length === 0) return null;
const sorted = [...arr].sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 === 0
? (sorted[mid - 1] + sorted[mid]) / 2
: sorted[mid];
};

const result = await collection.aggregate(pipeline).toArray();

const medianAliases = Object.keys(aggregations).filter(
alias => aggregations[alias].operation === 'median'
);

return result.map(row => {
medianAliases.forEach(alias => {
row[alias] = calculateMedian(row[alias]);
});
return row;
});
}

async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }:
{
Expand Down
125 changes: 120 additions & 5 deletions adminforth/dataConnectors/mysql.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dayjs from 'dayjs';
import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig } from '../types/Back.js';
import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js';
import AdminForthBaseConnector from './baseConnector.js';
import mysql from 'mysql2/promise';
Expand Down Expand Up @@ -338,13 +338,128 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
} : { sql: '', values: [] };
}

async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
resource: AdminForthResource;
filters: IAdminForthAndOrFilter;
aggregations: { [alias: string]: IAggregationRule };
groupBy?: IGroupByRule;
}): Promise<Array<{ group?: string, [key: string]: any }>> {
const tableName = resource.table;
const selectParts: string[] = [];
const medianFields: { alias: string; field: string }[] = [];
let groupExpr: string | null = null;

if (groupBy?.type === 'field') {
groupExpr = `\`${groupBy.field}\``;
selectParts.push(`${groupExpr} AS \`group\``);
} else if (groupBy?.type === 'date_trunc') {
const g = groupBy as IGroupByDateTrunc;
const tz = g.timezone ?? 'UTC';
if (!/^[A-Za-z0-9/_+\-]+$/.test(tz)) {
throw new Error(`Invalid timezone value: ${tz}`);
}
const innerExpr = `COALESCE(CONVERT_TZ(\`${g.field}\`, 'UTC', '${tz}'), \`${g.field}\`)`;
switch (g.truncation) {
case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break;
case 'month': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-01')`; break;
case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break;
case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break;
}
selectParts.push(`${groupExpr} AS \`group\``);
}

for (const [alias, rule] of Object.entries(aggregations)) {
const f = `\`${rule.field}\``;
switch (rule.operation) {
case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break;
case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break;
case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break;
case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break;
case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break;
case 'median': medianFields.push({ alias, field: rule.field }); break;
}
}

const { sql: where, values: filterValues } = this.whereClauseAndValues(filters);

type AggRow = { group?: string } & Record<string, number | string | null>;

// Run non-median aggregations
let rows: AggRow[] = [];
const hasNonMedian = selectParts.length > (groupExpr ? 1 : 0);
if (hasNonMedian) {
let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`;
if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
dbLogger.trace(`🪲📜 MySQL AGG Q: ${query} values: ${JSON.stringify(filterValues)}`);
const [result] = await this.client.execute(query, filterValues);
rows = result as AggRow[];
}

// Run each median via window functions (MySQL 8+) — no session variables, no memory pressure
for (const { alias, field } of medianFields) {
const f = `\`${field}\``;
const nullGuard = where ? `${where} AND ${f} IS NOT NULL` : `WHERE ${f} IS NOT NULL`;

let medianQuery: string;
if (groupExpr) {
medianQuery = `
SELECT \`group\`, AVG(${f}) AS \`${alias}\`
FROM (
SELECT ${groupExpr} AS \`group\`, ${f},
ROW_NUMBER() OVER (PARTITION BY ${groupExpr} ORDER BY ${f}) AS rn,
COUNT(*) OVER (PARTITION BY ${groupExpr}) AS cnt
FROM \`${tableName}\` ${nullGuard}
) t
WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0))
GROUP BY \`group\`
ORDER BY \`group\` ASC
`;
} else {
medianQuery = `
SELECT AVG(${f}) AS \`${alias}\`
FROM (
SELECT ${f},
ROW_NUMBER() OVER (ORDER BY ${f}) AS rn,
COUNT(*) OVER () AS cnt
FROM \`${tableName}\` ${nullGuard}
) t
WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0))
`;
}

dbLogger.trace(`🪲📜 MySQL MEDIAN Q: ${medianQuery} values: ${JSON.stringify(filterValues)}`);
const [medianResult] = await this.client.execute(medianQuery, filterValues);
const medianRows = medianResult as AggRow[];

if (groupExpr) {
if (rows.length === 0) {
rows = medianRows.map((r) => ({ group: r.group, [alias]: r[alias] }));
} else {
const byGroup = new Map(medianRows.map((r) => [String(r.group), r[alias]]));
for (const row of rows) {
row[alias] = byGroup.get(String(row.group)) ?? null;
}
}
} else {
const medianVal = medianRows[0]?.[alias] ?? null;
if (rows.length === 0) {
rows = [{ [alias]: medianVal }];
} else {
rows[0][alias] = medianVal;
}
}
}

return rows;
}

async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise<any[]> {
const columns = resource.dataSourceColumns.map((col) => `${col.name}`).join(', ');
const columns = resource.dataSourceColumns.map((col: { name: string }) => `${col.name}`).join(', ');
const tableName = resource.table;

const { sql: where, values: filterValues } = this.whereClauseAndValues(filters);

const orderBy = sort.length ? `ORDER BY ${sort.map((s) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : '';
const orderBy = sort.length ? `ORDER BY ${sort.map((s: { field: string; direction: AdminForthSortDirections }) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : '';
let selectQuery = `SELECT ${columns} FROM ${tableName}`;
if (where) selectQuery += ` ${where}`;
if (orderBy) selectQuery += ` ${orderBy}`;
Expand Down Expand Up @@ -385,7 +500,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
async getMinMaxForColumnsWithOriginalTypes({ resource, columns }) {
const tableName = resource.table;
const result = {};
await Promise.all(columns.map(async (col) => {
await Promise.all(columns.map(async (col: { name: string }) => {
const q = `SELECT MIN(${col.name}) as min, MAX(${col.name}) as max FROM ${tableName}`;
dbLogger.trace(`🪲📜 MySQL Q: ${q}`);
const [results] = await this.client.execute(q);
Expand All @@ -410,7 +525,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS

async updateRecordOriginalValues({ resource, recordId, newValues }) {
const values = [...Object.values(newValues), recordId];
const columnsWithPlaceholders = Object.keys(newValues).map((col, i) => `${col} = ?`).join(', ');
const columnsWithPlaceholders = Object.keys(newValues).map((col) => `${col} = ?`).join(', ');
const q = `UPDATE ${resource.table} SET ${columnsWithPlaceholders} WHERE ${this.getPrimaryKey(resource)} = ?`;
dbLogger.trace(`🪲📜 MySQL Q: ${q} values: ${JSON.stringify(values)}`);
await this.client.execute(q, values);
Expand Down