Skip to content

Commit bcea233

Browse files
authored
Merge pull request #25 from SOFTNETWORK-APP/fix/nestedAggregations
- fix all nested aggregations with SQL
2 parents 828a23a + 631cec2 commit bcea233

File tree

15 files changed

+1037
-481
lines changed

15 files changed

+1037
-481
lines changed

README.md

Lines changed: 136 additions & 217 deletions
Large diffs are not rendered by default.

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 229 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,11 @@ import com.sksamuel.elastic4s.ElasticApi.{
4646
import com.sksamuel.elastic4s.requests.script.Script
4747
import com.sksamuel.elastic4s.requests.searches.aggs.{
4848
Aggregation,
49+
CardinalityAggregation,
50+
ExtendedStatsAggregation,
4951
FilterAggregation,
5052
NestedAggregation,
53+
StatsAggregation,
5154
TermsAggregation,
5255
TermsOrder
5356
}
@@ -71,6 +74,14 @@ case class ElasticAggregation(
7174
) {
7275
val nested: Boolean = nestedElement.nonEmpty
7376
val filtered: Boolean = filteredAgg.nonEmpty
77+
78+
// Flags whether `agg` is a "global" metric (cardinality, stats, extended stats)
// as opposed to a bucket metric (avg, sum, etc.).
val isGlobalMetric: Boolean = agg match {
  case _: CardinalityAggregation | _: StatsAggregation | _: ExtendedStatsAggregation => true
  case _                                                                            => false
}
7485
}
7586

7687
object ElasticAggregation {
@@ -186,17 +197,6 @@ object ElasticAggregation {
186197
topHits
187198
}
188199

189-
val filteredAggName = "filtered_agg"
190-
191-
def filtered(): Unit =
192-
having match {
193-
case Some(_) =>
194-
aggPath ++= Seq(filteredAggName)
195-
aggPath ++= Seq(aggName)
196-
case _ =>
197-
aggPath ++= Seq(aggName)
198-
}
199-
200200
val nestedElement = identifier.nestedElement
201201

202202
val nestedElements: Seq[NestedElement] =
@@ -205,6 +205,7 @@ object ElasticAggregation {
205205
val nestedAgg =
206206
nestedElements match {
207207
case Nil =>
208+
aggPath ++= Seq(aggName)
208209
None
209210
case nestedElements =>
210211
def buildNested(n: NestedElement): NestedAggregation = {
@@ -231,11 +232,16 @@ object ElasticAggregation {
231232
}
232233
}
233234

234-
Some(buildNested(nestedElements.head))
235+
val root = nestedElements.head
236+
val nestedAgg = buildNested(root) subaggs Seq(_agg)
237+
having match {
238+
case Some(_) => aggPath ++= Seq("filtered_agg")
239+
case _ =>
240+
}
241+
aggPath ++= Seq(aggName)
242+
Some(nestedAgg)
235243
}
236244

237-
filtered()
238-
239245
ElasticAggregation(
240246
aggPath.mkString("."),
241247
field,
@@ -254,19 +260,24 @@ object ElasticAggregation {
254260
bucketsDirection: Map[String, SortOrder],
255261
aggregations: Seq[Aggregation],
256262
aggregationsDirection: Map[String, SortOrder],
257-
having: Option[Criteria]
263+
having: Option[Criteria],
264+
nested: Option[NestedElement],
265+
allElasticAggregations: Seq[ElasticAggregation]
258266
): Option[TermsAggregation] = {
259267
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
268+
// Determine the bucketPath of the current bucket
269+
val currentBucketPath = bucket.identifier.path
270+
260271
var agg = {
261272
bucketsDirection.get(bucket.identifier.identifierName) match {
262273
case Some(direction) =>
263-
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
274+
termsAgg(bucket.name, s"$currentBucketPath.keyword")
264275
.order(Seq(direction match {
265276
case Asc => TermsOrder("_key", asc = true)
266277
case _ => TermsOrder("_key", asc = false)
267278
}))
268279
case None =>
269-
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
280+
termsAgg(bucket.name, s"$currentBucketPath.keyword")
270281
}
271282
}
272283
bucket.size.foreach(s => agg = agg.size(s))
@@ -304,22 +315,212 @@ object ElasticAggregation {
304315
agg
305316
val withHaving = having match {
306317
case Some(criteria) =>
307-
val script = MetricSelectorScript.metricSelector(criteria)
308-
val bucketsPath = criteria.extractMetricsPath
309-
310-
val bucketSelector =
311-
bucketSelectorAggregation(
312-
"having_filter",
313-
Script(script.replaceAll("1 == 1 &&", "").replaceAll("&& 1 == 1", "").trim),
314-
bucketsPath
315-
)
316-
317-
withAggregationOrders.copy(subaggs = aggregations :+ bucketSelector)
318+
val script = metricSelectorForBucket(
319+
criteria,
320+
nested,
321+
allElasticAggregations
322+
)
318323

324+
if (script.nonEmpty) {
325+
val bucketSelector =
326+
bucketSelectorAggregation(
327+
"having_filter",
328+
Script(script),
329+
extractMetricsPathForBucket(
330+
criteria,
331+
nested,
332+
allElasticAggregations
333+
)
334+
)
335+
withAggregationOrders.copy(subaggs = aggregations :+ bucketSelector)
336+
} else {
337+
withAggregationOrders.copy(subaggs = aggregations)
338+
}
319339
case None => withAggregationOrders.copy(subaggs = aggregations)
320340
}
321341
Some(withHaving)
322342
}
323343
}
324344
}
345+
346+
/** Generates the bucket_selector script for a given bucket.
  *
  * The full script produced by `MetricSelectorScript.metricSelector` is stripped of its
  * `1 == 1` placeholders, split into individual conditions, and only the conditions whose
  * metrics are all reachable from the current bucket level are kept.
  *
  * A metric is considered reachable when its aggregation lives at the same bucket path as
  * `nested` (the empty path when `nested` is `None`), or when it lives in a direct child
  * path and is a global metric (see `ElasticAggregation.isGlobalMetric`).
  *
  * @param criteria
  *   the HAVING criteria the script is generated from
  * @param nested
  *   the nested element of the current bucket, `None` for the root level
  * @param allElasticAggregations
  *   every aggregation of the request, used to resolve the level of each metric
  * @return
  *   the remaining conditions joined with `" && "`, or an empty string when none applies
  */
def metricSelectorForBucket(
  criteria: Criteria,
  nested: Option[NestedElement],
  allElasticAggregations: Seq[ElasticAggregation]
): String = {

  val currentBucketPath = nested.map(_.bucketPath).getOrElse("")

  // Remove the `1 == 1` placeholders (presumably emitted for criteria that do not filter on a
  // metric -- TODO confirm against MetricSelectorScript). The lookarounds make sure only a
  // standalone `1 == 1` token is removed: a plain literal replacement would also rewrite real
  // conditions such as `params.x1 == 10` (-> `params.x0`) or `params.count1 == 1 && ...`.
  val fullScript = MetricSelectorScript
    .metricSelector(criteria)
    .replaceAll("(?<![\\w.])1 == 1 &&", "")
    .replaceAll("&& 1 == 1(?![\\w.])", "")
    .replaceAll("(?<![\\w.])1 == 1(?![\\w.])", "")
    .trim

  // Whether the metric can be referenced by a bucket_selector placed at the current level.
  def isAccessible(metricName: String): Boolean =
    allElasticAggregations.find(agg =>
      agg.aggName == metricName || agg.field == metricName
    ) match {
      case Some(elasticAgg) =>
        val metricBucketPath = elasticAgg.nestedElement
          .map(_.bucketPath)
          .getOrElse("")

        val belongsToLevel = metricBucketPath == currentBucketPath

        // A metric of a direct child is only reachable when it is a "global" metric
        val isDirectChildAndAccessible =
          isDirectChild(metricBucketPath, currentBucketPath) && elasticAgg.isGlobalMetric

        belongsToLevel || isDirectChildAndAccessible

      case None =>
        // Unknown metrics are only kept at the root level.
        // NOTE(review): extractMetricsPathForBucket skips metrics it cannot resolve, so a
        // condition kept by this branch may reference a param missing from buckets_path --
        // confirm this is intended.
        currentBucketPath.isEmpty
    }

  if (fullScript.isEmpty) {
    ""
  } else {
    // Keep a condition only when every metric it references is reachable;
    // mkString on an empty sequence already yields "".
    parseConditions(fullScript)
      .filter(condition => extractMetricNames(condition).forall(isAccessible))
      .mkString(" && ")
  }
}
425+
426+
/** HELPER: Parse the conditions of a script (separated by &&).
  *
  * Only top-level `&&` operators are treated as separators: an `&&` nested inside
  * parentheses stays within its condition, so a grouped sub-expression such as
  * `(a > 1 && b < 2) || c == 3` is never cut in half. String literals are not interpreted,
  * so an `&&` or a parenthesis inside a quoted string is treated like any other.
  *
  * @param script
  *   the script to split
  * @return
  *   the trimmed, non-empty top-level conditions, in their original order
  */
private def parseConditions(script: String): Seq[String] = {
  val conditions = Vector.newBuilder[String]
  val current = new StringBuilder
  var depth = 0
  var i = 0
  while (i < script.length) {
    if (depth == 0 && script.startsWith("&&", i)) {
      // Top-level separator: close the current condition
      conditions += current.toString
      current.clear()
      i += 2
    } else {
      script.charAt(i) match {
        case '(' => depth += 1
        // Never go below zero so an unbalanced ')' cannot disable splitting for good
        case ')' => depth = math.max(0, depth - 1)
        case _   =>
      }
      current.append(script.charAt(i))
      i += 1
    }
  }
  conditions += current.toString
  conditions.result().map(_.trim).filter(_.nonEmpty)
}
433+
434+
/** HELPER: Collects the names of the metrics referenced by a condition, i.e. every
  * identifier following a `params.` prefix, in order of appearance (duplicates included).
  *
  * Example: "params.ingredient_count >= 3" => Seq("ingredient_count")
  */
private def extractMetricNames(condition: String): Seq[String] = {
  // Matches "params.XXX" and captures XXX
  val paramReference = "params\\.([a-zA-Z_][a-zA-Z0-9_]*)".r
  val names =
    for (reference <- paramReference.findAllMatchIn(condition))
      yield reference.group(1)
  names.toSeq
}
442+
443+
// HELPER: Tells whether `childPath` sits exactly one level below `parentPath`,
// levels being separated by '>' (an empty parent path denotes the root).
private def isDirectChild(childPath: String, parentPath: String): Boolean = {
  def separators(path: String): Int = path.count(_ == '>')

  parentPath match {
    case "" =>
      // Root parent: any non-empty path without a separator is a direct child
      childPath.nonEmpty && separators(childPath) == 0
    case parent =>
      // Must extend the parent path by exactly one more separator
      childPath.startsWith(parent + ">") && separators(childPath) == separators(parent) + 1
  }
}
452+
453+
/** Extracts the buckets_path for a given bucket.
  *
  * Every metric returned by `criteria.extractAllMetricsPath` is resolved against
  * `allElasticAggregations` (by aggregation name or field) and mapped as follows:
  *   - same bucket path as the current bucket: `metric -> metric`
  *   - direct child path and global metric: `metric -> <child innerHitsName>>metric`
  *   - anything else (bucket metric of a child, other level, unknown metric): skipped
  *
  * @param criteria
  *   the HAVING criteria whose metrics are looked up
  * @param nested
  *   the nested element of the current bucket, `None` for the root level
  * @param allElasticAggregations
  *   every aggregation of the request, used to resolve the level of each metric
  * @return
  *   the metric name to buckets_path mapping usable from the current bucket
  */
def extractMetricsPathForBucket(
  criteria: Criteria,
  nested: Option[NestedElement],
  allElasticAggregations: Seq[ElasticAggregation]
): Map[String, String] = {

  val currentBucketPath = nested.map(_.bucketPath).getOrElse("")

  val pathsForBucket = criteria.extractAllMetricsPath.flatMap { case (metricName, _) =>
    allElasticAggregations
      .find(candidate => candidate.aggName == metricName || candidate.field == metricName)
      .flatMap { elasticAgg =>
        val metricBucketPath = elasticAgg.nestedElement.map(_.bucketPath).getOrElse("")

        if (metricBucketPath == currentBucketPath) {
          // Same level: the metric is referenced by its own name
          Some(metricName -> metricName)
        } else if (
          isDirectChild(metricBucketPath, currentBucketPath) && elasticAgg.isGlobalMetric
        ) {
          // Global metric of a direct child: referenced through the child nested aggregation
          val childNestedName = elasticAgg.nestedElement.map(_.innerHitsName).getOrElse("")
          Some(metricName -> s"$childNestedName>$metricName")
        } else {
          // Bucket metric of a direct child, or metric of another level: not referenceable
          None
        }
      }
  }

  pathsForBucket
}
325526
}

0 commit comments

Comments
 (0)