Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6d1c232
fix terms agg
fupelaqu Nov 19, 2025
2181ac2
remove .keyword
fupelaqu Nov 19, 2025
9db439f
fix fields duplicates
fupelaqu Nov 21, 2025
c39d9b9
minor refactoring for window functions
fupelaqu Nov 21, 2025
4b421ff
fix macros for COUNT(*), update ElasticResponse adding optional SQL a…
fupelaqu Nov 22, 2025
96d8c4e
add full support for window functions
fupelaqu Nov 22, 2025
d69d691
to fix COUNT(*)
fupelaqu Nov 23, 2025
599694e
add isAggregation and asAggregation functions, fix _source for pure a…
fupelaqu Nov 24, 2025
4c4f124
to fix compilation bug
fupelaqu Nov 24, 2025
fcfce6a
to fix count using _index
fupelaqu Nov 24, 2025
716b556
to fix unrelated code to java client api
fupelaqu Nov 24, 2025
8a1a7bb
handle buckets with painless script, fix min_doc_count for all buckets
fupelaqu Nov 24, 2025
8c9c744
fix sql query specifications
fupelaqu Nov 24, 2025
ece2def
fix sql script fields with aggregations, shouldBeScripted with arithm…
fupelaqu Nov 24, 2025
313b82a
fix param and metric name for count all
fupelaqu Nov 26, 2025
6e50bb1
add support for bucket script
fupelaqu Nov 26, 2025
393cc62
upgrade testContainers
fupelaqu Nov 27, 2025
e73b7d9
update elastic conversion in order to load aggs top hits
fupelaqu Nov 27, 2025
6555fef
rename bucketPath to nestedPath
fupelaqu Nov 27, 2025
081027a
add flag indicating whether or not an aggregate is a window function …
fupelaqu Nov 27, 2025
97c7faf
update logs for scroll and search
fupelaqu Nov 27, 2025
98080d9
update logs for scroll and search
fupelaqu Nov 27, 2025
8dc0082
do not update order by for window functions
fupelaqu Nov 27, 2025
0ca06f9
add support for multiple bucket trees (for windowing) and add aggrega…
fupelaqu Nov 28, 2025
ab72700
update specifications for top hits aggregation, update to v 0.14.0
fupelaqu Nov 28, 2025
422e221
fix lint
fupelaqu Nov 28, 2025
b9d784e
fix jest tests
fupelaqu Nov 28, 2025
fe9cf7f
update README.md
fupelaqu Nov 28, 2025
1097401
finalize support for window functions with multi-partitioning
fupelaqu Nov 30, 2025
4d49bad
add specifications for window functions
fupelaqu Nov 30, 2025
9533843
add support for count, min, max, avg and sum with partitions
fupelaqu Nov 30, 2025
2b85c83
to fix sql query specifications
fupelaqu Nov 30, 2025
5b27725
to fix window function parsing + mapping within bucket scripts
fupelaqu Dec 1, 2025
3123ebc
to fix aggregate functions that should be scripted
fupelaqu Dec 1, 2025
d18742b
to fix parser bug
fupelaqu Dec 1, 2025
aac63c4
to fix script with date parse function
fupelaqu Dec 1, 2025
336af08
to fix aggregations with multiple partition by, update README.md + ke…
fupelaqu Dec 3, 2025
5515d6a
to fix regression with window functions
fupelaqu Dec 3, 2025
dc226a7
update README.md
fupelaqu Dec 3, 2025
ff1349c
update query normalization before parsing it
fupelaqu Dec 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
728 changes: 706 additions & 22 deletions README.md

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package app.softnetwork.elastic.sql.bridge

import app.softnetwork.elastic.sql.query.{Bucket, Criteria, Except, Field}
import app.softnetwork.elastic.sql.query.{Bucket, Criteria, Except, Field, FieldSort}
import com.sksamuel.elastic4s.requests.searches.{SearchBodyBuilderFn, SearchRequest}

case class ElasticSearchRequest(
sql: String,
fields: Seq[Field],
except: Option[Except],
sources: Seq[String],
Expand All @@ -28,7 +29,8 @@ case class ElasticSearchRequest(
offset: Option[Int],
search: SearchRequest,
buckets: Seq[Bucket] = Seq.empty,
aggregations: Seq[ElasticAggregation] = Seq.empty
having: Option[Criteria] = None,
sorts: Seq[FieldSort] = Seq.empty
) {
def minScore(score: Option[Double]): ElasticSearchRequest = {
score match {
Expand Down
134 changes: 96 additions & 38 deletions bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

package app.softnetwork.elastic.sql

import app.softnetwork.elastic.sql.`type`.{SQLBigInt, SQLDouble, SQLTemporal, SQLVarchar}
import app.softnetwork.elastic.sql.`type`.{
SQLBigInt,
SQLDouble,
SQLNumeric,
SQLTemporal,
SQLVarchar
}
import app.softnetwork.elastic.sql.function.aggregate.COUNT
import app.softnetwork.elastic.sql.function.geo.{Distance, Meters}
import app.softnetwork.elastic.sql.operator._
Expand All @@ -27,14 +33,14 @@ import com.sksamuel.elastic4s.requests.common.FetchSourceContext
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.script.ScriptType.Source
import com.sksamuel.elastic4s.requests.searches.aggs.{
Aggregation,
AbstractAggregation,
FilterAggregation,
NestedAggregation,
TermsAggregation
}
import com.sksamuel.elastic4s.requests.searches.queries.compound.BoolQuery
import com.sksamuel.elastic4s.requests.searches.queries.{InnerHit, Query}
import com.sksamuel.elastic4s.requests.searches.sort.FieldSort
import com.sksamuel.elastic4s.requests.searches.sort.{FieldSort, ScriptSort, ScriptSortType}
import com.sksamuel.elastic4s.requests.searches.{
MultiSearchRequest,
SearchBodyBuilderFn,
Expand Down Expand Up @@ -142,10 +148,10 @@ package object bridge {
implicit def requestToRootAggregations(
request: SQLSearchRequest,
aggregations: Seq[ElasticAggregation]
): Seq[Aggregation] = {
): Seq[AbstractAggregation] = {
val notNestedAggregations = aggregations.filterNot(_.nested)

val notNestedBuckets = request.buckets.filterNot(_.nested)
val notNestedBuckets = request.bucketTree.filterNot(_.bucket.nested)

val rootAggregations = notNestedAggregations match {
case Nil =>
Expand All @@ -158,8 +164,8 @@ package object bridge {
None,
aggregations
) match {
case Some(b) => Seq(b)
case _ => Seq.empty
case Nil => Seq.empty
case aggs => aggs
}
buckets
case aggs =>
Expand All @@ -173,14 +179,18 @@ package object bridge {
val buckets = ElasticAggregation.buildBuckets(
notNestedBuckets,
request.sorts -- directions.keys,
aggregations,
aggs,
directions,
request.having.flatMap(_.criteria),
None,
aggs
) match {
case Some(b) => Seq(b)
case _ => aggregations
case Nil => aggs.map(_.agg)
case aggs =>
if (request.groupBy.isEmpty && request.windowFunctions.exists(_.isWindowing))
notNestedAggregations.filter(_.bucketPath.isEmpty).map(_.agg) ++ aggs
else
aggs
}
buckets
}
Expand All @@ -204,12 +214,14 @@ package object bridge {

// Group nested buckets by their nested path
val nestedGroupedBuckets =
request.buckets
.filter(_.nested)
.groupBy(
_.nestedBucket.getOrElse(
throw new IllegalArgumentException(
"Nested bucket must have a nested element"
request.bucketTree
.filter(_.bucket.nested)
.map(tree =>
tree.groupBy(
_.bucket.nestedBucket.getOrElse(
throw new IllegalArgumentException(
"Nested bucket must have a nested element"
)
)
)
)
Expand All @@ -229,17 +241,16 @@ package object bridge {

// Get the buckets for this nested element
val nestedBuckets =
nestedGroupedBuckets.getOrElse(n.innerHitsName, Seq.empty)
nestedGroupedBuckets.map(_.getOrElse(n.innerHitsName, Seq.empty))

val notRelatedAggregationsToBuckets = elasticAggregations
.filterNot { ea =>
nestedBuckets.exists(nb => nb.identifier.path == ea.sourceField)
nestedBuckets.flatten.exists(nb => nb.bucket.identifier.path == ea.sourceField)
}
.map(_.agg)

val relatedAggregationsToBuckets = elasticAggregations
.filter { ea =>
nestedBuckets.exists(nb => nb.identifier.path == ea.sourceField)
nestedBuckets.flatten.exists(nb => nb.bucket.identifier.path == ea.sourceField)
}
.map(_.agg)

Expand All @@ -257,7 +268,7 @@ package object bridge {
requestToNestedFilterAggregation(request, n.innerHitsName)

// Build buckets for this nested aggregation
val buckets: Seq[Aggregation] =
val buckets: Seq[AbstractAggregation] =
ElasticAggregation.buildBuckets(
nestedBuckets,
request.sorts -- directions.keys,
Expand All @@ -267,8 +278,12 @@ package object bridge {
Some(n),
aggregations
) match {
case Some(b) => Seq(b)
case _ => notRelatedAggregationsToBuckets
case Nil => notRelatedAggregationsToBuckets.map(_.agg)
case aggs =>
if (request.groupBy.isEmpty && request.windowFunctions.exists(_.isWindowing))
notRelatedAggregationsToBuckets.filter(_.bucketPath.isEmpty).map(_.agg) ++ aggs
else
aggs
}

val children = n.children
Expand Down Expand Up @@ -373,7 +388,7 @@ package object bridge {
}

private def addNestedAggregationsToTermsAggregation(
agg: Aggregation,
agg: AbstractAggregation,
nested: Seq[NestedAggregation]
): Option[TermsAggregation] = {
agg match {
Expand All @@ -397,24 +412,29 @@ package object bridge {

implicit def requestToElasticSearchRequest(request: SQLSearchRequest): ElasticSearchRequest =
ElasticSearchRequest(
request.sql,
request.select.fields,
request.select.except,
request.sources,
request.where.flatMap(_.criteria),
request.limit.map(_.limit),
request.limit.flatMap(_.offset.map(_.offset)),
request.limit.flatMap(_.offset.map(_.offset)).orElse(Some(0)),
request,
request.buckets,
request.aggregates.map(
ElasticAggregation(_, request.having.flatMap(_.criteria), request.sorts)
)
request.having.flatMap(_.criteria),
request.orderBy.map(_.sorts).getOrElse(Seq.empty)
).minScore(request.score)

implicit def requestToSearchRequest(request: SQLSearchRequest): SearchRequest = {
import request._

val aggregations = request.aggregates.map(
ElasticAggregation(_, request.having.flatMap(_.criteria), request.sorts)
ElasticAggregation(
_,
request.having.flatMap(_.criteria),
request.sorts,
request.sqlAggregations
)
)

val rootAggregations = requestToRootAggregations(request, aggregations)
Expand Down Expand Up @@ -457,7 +477,7 @@ package object bridge {
_search
}

_search = scriptFields.filterNot(_.aggregation) match {
_search = scriptFields.filterNot(_.isAggregation) match {
case Nil => _search
case _ =>
_search scriptfields scriptFields.map { field =>
Expand All @@ -478,17 +498,55 @@ package object bridge {

_search = orderBy match {
case Some(o) if aggregates.isEmpty && buckets.isEmpty =>
_search sortBy o.sorts.map(sort =>
sort.order match {
case Some(Desc) => FieldSort(sort.field).desc()
case _ => FieldSort(sort.field).asc()
_search sortBy o.sorts.map { sort =>
if (sort.isScriptSort) {
val context = PainlessContext()
val painless = sort.field.painless(Some(context))
val painlessScript = s"$context$painless"
val script =
sort.out match {
case _: SQLTemporal if !painless.endsWith("toEpochMilli()") =>
val parts = painlessScript.split(";").toSeq
if (parts.size > 1) {
val lastPart = parts.last.trim.stripPrefix("return ")
if (lastPart.split(" ").toSeq.size == 1) {
val newLastPart =
s"""($lastPart != null) ? $lastPart.toInstant().toEpochMilli() : null"""
s"${parts.dropRight(1).mkString(";")}; return $newLastPart"
} else {
painlessScript
}
} else {
s"$painlessScript.toInstant().toEpochMilli()"
}
case _ => painlessScript
}
val scriptSort =
ScriptSort(
script = Script(script = script)
.lang("painless")
.scriptType(Source),
scriptSortType = sort.field.out match {
case _: SQLTemporal | _: SQLNumeric => ScriptSortType.Number
case _ => ScriptSortType.String
}
)
sort.order match {
case Some(Desc) => scriptSort.desc()
case _ => scriptSort.asc()
}
} else {
sort.order match {
case Some(Desc) => FieldSort(sort.field.aliasOrName).desc()
case _ => FieldSort(sort.field.aliasOrName).asc()
}
}
)
}
case _ => _search
}

if (allAggregations.nonEmpty && fields.isEmpty) {
_search size 0
_search size 0 fetchSource false
} else {
limit match {
case Some(l) => _search limit l.limit from l.offset.map(_.offset).getOrElse(0)
Expand All @@ -512,7 +570,7 @@ package object bridge {

implicit def expressionToQuery(expression: GenericExpression): Query = {
import expression._
if (aggregation)
if (isAggregation)
return matchAllQuery()
if (
identifier.functions.nonEmpty && (identifier.functions.size > 1 || (identifier.functions.head match {
Expand Down Expand Up @@ -946,7 +1004,7 @@ package object bridge {
case Left(l) =>
val filteredAgg: Option[FilterAggregation] = requestToFilterAggregation(l)
l.aggregates
.map(ElasticAggregation(_, l.having.flatMap(_.criteria), l.sorts))
.map(ElasticAggregation(_, l.having.flatMap(_.criteria), l.sorts, l.sqlAggregations))
.map(aggregation => {
val queryFiltered =
l.where
Expand Down
Loading