Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 136 additions & 217 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ import com.sksamuel.elastic4s.ElasticApi.{
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.searches.aggs.{
Aggregation,
CardinalityAggregation,
ExtendedStatsAggregation,
FilterAggregation,
NestedAggregation,
StatsAggregation,
TermsAggregation,
TermsOrder
}
Expand All @@ -71,6 +74,14 @@ case class ElasticAggregation(
) {
val nested: Boolean = nestedElement.nonEmpty
val filtered: Boolean = filteredAgg.nonEmpty

// CHECK if it is a "global" metric (cardinality, etc.) or a bucket metric (avg, sum, etc.)
val isGlobalMetric: Boolean = agg match {
case _: CardinalityAggregation => true
case _: StatsAggregation => true
case _: ExtendedStatsAggregation => true
case _ => false
}
}

object ElasticAggregation {
Expand Down Expand Up @@ -186,17 +197,6 @@ object ElasticAggregation {
topHits
}

val filteredAggName = "filtered_agg"

def filtered(): Unit =
having match {
case Some(_) =>
aggPath ++= Seq(filteredAggName)
aggPath ++= Seq(aggName)
case _ =>
aggPath ++= Seq(aggName)
}

val nestedElement = identifier.nestedElement

val nestedElements: Seq[NestedElement] =
Expand All @@ -205,6 +205,7 @@ object ElasticAggregation {
val nestedAgg =
nestedElements match {
case Nil =>
aggPath ++= Seq(aggName)
None
case nestedElements =>
def buildNested(n: NestedElement): NestedAggregation = {
Expand All @@ -231,11 +232,16 @@ object ElasticAggregation {
}
}

Some(buildNested(nestedElements.head))
val root = nestedElements.head
val nestedAgg = buildNested(root) subaggs Seq(_agg)
having match {
case Some(_) => aggPath ++= Seq("filtered_agg")
case _ =>
}
aggPath ++= Seq(aggName)
Some(nestedAgg)
}

filtered()

ElasticAggregation(
aggPath.mkString("."),
field,
Expand All @@ -254,19 +260,24 @@ object ElasticAggregation {
bucketsDirection: Map[String, SortOrder],
aggregations: Seq[Aggregation],
aggregationsDirection: Map[String, SortOrder],
having: Option[Criteria]
having: Option[Criteria],
nested: Option[NestedElement],
allElasticAggregations: Seq[ElasticAggregation]
): Option[TermsAggregation] = {
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
// Determine the bucketPath of the current bucket
val currentBucketPath = bucket.identifier.path

var agg = {
bucketsDirection.get(bucket.identifier.identifierName) match {
case Some(direction) =>
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
termsAgg(bucket.name, s"$currentBucketPath.keyword")
.order(Seq(direction match {
case Asc => TermsOrder("_key", asc = true)
case _ => TermsOrder("_key", asc = false)
}))
case None =>
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
termsAgg(bucket.name, s"$currentBucketPath.keyword")
}
}
bucket.size.foreach(s => agg = agg.size(s))
Expand Down Expand Up @@ -304,22 +315,212 @@ object ElasticAggregation {
agg
val withHaving = having match {
case Some(criteria) =>
val script = MetricSelectorScript.metricSelector(criteria)
val bucketsPath = criteria.extractMetricsPath

val bucketSelector =
bucketSelectorAggregation(
"having_filter",
Script(script.replaceAll("1 == 1 &&", "").replaceAll("&& 1 == 1", "").trim),
bucketsPath
)

withAggregationOrders.copy(subaggs = aggregations :+ bucketSelector)
val script = metricSelectorForBucket(
criteria,
nested,
allElasticAggregations
)

if (script.nonEmpty) {
val bucketSelector =
bucketSelectorAggregation(
"having_filter",
Script(script),
extractMetricsPathForBucket(
criteria,
nested,
allElasticAggregations
)
)
withAggregationOrders.copy(subaggs = aggregations :+ bucketSelector)
} else {
withAggregationOrders.copy(subaggs = aggregations)
}
case None => withAggregationOrders.copy(subaggs = aggregations)
}
Some(withHaving)
}
}
}

/** Generates the bucket_selector script for a given bucket
*/
def metricSelectorForBucket(
criteria: Criteria,
nested: Option[NestedElement],
allElasticAggregations: Seq[ElasticAggregation]
): String = {

val currentBucketPath = nested.map(_.bucketPath).getOrElse("")

// No filtering
val fullScript = MetricSelectorScript
.metricSelector(criteria)
.replaceAll("1 == 1 &&", "")
.replaceAll("&& 1 == 1", "")
.replaceAll("1 == 1", "")
.trim

// println(s"[DEBUG] currentBucketPath = $currentBucketPath")
// println(s"[DEBUG] fullScript (complete) = $fullScript")

if (fullScript.isEmpty) {
return ""
}

// Parse the script to extract the conditions
val conditions = parseConditions(fullScript)
// println(s"[DEBUG] conditions = $conditions")

// Filter based on availability in buckets_path
val relevantConditions = conditions.filter { condition =>
val metricNames = extractMetricNames(condition)
// println(s"[DEBUG] condition = $condition, metricNames = $metricNames")

metricNames.forall { metricName =>
allElasticAggregations.find(agg =>
agg.aggName == metricName || agg.field == metricName
) match {
case Some(elasticAgg) =>
val metricBucketPath = elasticAgg.nestedElement
.map(_.bucketPath)
.getOrElse("")

// println(
// s"[DEBUG] metricName = $metricName, metricBucketPath = $metricBucketPath, aggType = ${elasticAgg.agg.getClass.getSimpleName}"
// )

val belongsToLevel = metricBucketPath == currentBucketPath

val isDirectChildAndAccessible =
if (isDirectChild(metricBucketPath, currentBucketPath)) {
// Check if it's a "global" metric (cardinality, etc.)
elasticAgg.isGlobalMetric
} else {
false
}

val result = belongsToLevel || isDirectChildAndAccessible

// println(
// s"[DEBUG] belongsToLevel = $belongsToLevel, isDirectChildAndAccessible = $isDirectChildAndAccessible, result = $result"
// )
result

case None =>
// println(s"[DEBUG] metricName = $metricName NOT FOUND")
currentBucketPath.isEmpty
}
}
}

// println(s"[DEBUG] relevantConditions = $relevantConditions")

if (relevantConditions.isEmpty) {
""
} else {
relevantConditions.mkString(" && ")
}
}

/** HELPER: Parse the conditions of a script (separated by &&)
*/
private def parseConditions(script: String): Seq[String] = {
// Simple parsing : split by " && "
// ⚠️ This simple implementation does not handle parentheses.
script.split(" && ").map(_.trim).toSeq
}

/** HELPER: Extracts the metric names from a condition Example: "params.ingredient_count >= 3" =>
* Seq("ingredient_count")
*/
private def extractMetricNames(condition: String): Seq[String] = {
// Pattern to extract "params.XXX"
val pattern = "params\\.([a-zA-Z_][a-zA-Z0-9_]*)".r
pattern.findAllMatchIn(condition).map(_.group(1)).toSeq
}

// HELPER: Check if a path is a direct child
private def isDirectChild(childPath: String, parentPath: String): Boolean = {
if (parentPath.isEmpty) {
childPath.nonEmpty && !childPath.contains(">")
} else {
childPath.startsWith(parentPath + ">") &&
childPath.count(_ == '>') == parentPath.count(_ == '>') + 1
}
}

/** Extracts the buckets_path for a given bucket
*/
def extractMetricsPathForBucket(
criteria: Criteria,
nested: Option[NestedElement],
allElasticAggregations: Seq[ElasticAggregation]
): Map[String, String] = {

val currentBucketPath = nested.map(_.bucketPath).getOrElse("")

// Extract ALL metrics paths
val allMetricsPaths = criteria.extractAllMetricsPath

// println(s"[DEBUG extractMetricsPath] currentBucketPath = $currentBucketPath")
// println(s"[DEBUG extractMetricsPath] allMetricsPaths = $allMetricsPaths")

// Filter and adapt the paths for this bucket
val result = allMetricsPaths.flatMap { case (metricName, metricPath) =>
allElasticAggregations.find(agg =>
agg.aggName == metricName || agg.field == metricName
) match {
case Some(elasticAgg) =>
val metricBucketPath = elasticAgg.nestedElement
.map(_.bucketPath)
.getOrElse("")

// println(
// s"[DEBUG extractMetricsPath] metricName = $metricName, metricBucketPath = $metricBucketPath, aggType = ${elasticAgg.agg.getClass.getSimpleName}"
// )

if (metricBucketPath == currentBucketPath) {
// Metric of the same level
// println(s"[DEBUG extractMetricsPath] Same level: $metricName -> $metricName")
Some(metricName -> metricName)

} else if (isDirectChild(metricBucketPath, currentBucketPath)) {
// Metric of a direct child

// CHECK if it is a "global" metric (cardinality, etc.) or a bucket metric (avg, sum, etc.)
val isGlobalMetric = elasticAgg.isGlobalMetric

if (isGlobalMetric) {
// Global metric: can be referenced from the parent
val childNestedName = elasticAgg.nestedElement
.map(_.innerHitsName)
.getOrElse("")
// println(
// s"[DEBUG extractMetricsPath] Direct child (global metric): $metricName -> $childNestedName>$metricName"
// )
Some(metricName -> s"$childNestedName>$metricName")
} else {
// Bucket metric: cannot be referenced from the parent
// println(
// s"[DEBUG extractMetricsPath] Direct child (bucket metric): $metricName -> SKIP (bucket-level metric)"
// )
None
}

} else {
// A different level of metric
// println(s"[DEBUG extractMetricsPath] Other level: $metricName -> SKIP")
None
}

case None =>
// println(s"[DEBUG extractMetricsPath] Not found: $metricName -> SKIP")
None
}
}

// println(s"[DEBUG extractMetricsPath] result = $result")
result
}
}
Loading