
'Non-aggregate SELECT expression(s) not part of GROUP BY' Error When Using LATEST_BY_OFFSET #5114

Closed
archy-bold opened this issue Apr 20, 2020 · 18 comments · Fixed by #5271

Comments

@archy-bold

archy-bold commented Apr 20, 2020

Describe the bug
I wanted to create a table that collapses several rows into one, collecting a single column into an array with COLLECT_LIST. I also wanted that table to be keyed by a single column while still exposing the other columns, so a large GROUP BY over all the columns wasn't an option.

I figured the aggregate function LATEST_BY_OFFSET was the closest to what I wanted, as it would return the latest value of each column, even though the values within a group would always be the same. However, I ran the following statement:

CREATE TABLE ALBUMS AS
  SELECT LATEST_BY_OFFSET(id) AS id, LATEST_BY_OFFSET(title) AS title, upc, LATEST_BY_OFFSET(release_date) AS release_date,
  COLLECT_LIST(artist) AS artists
  FROM ALBUMS_SRC
  GROUP BY upc;

It failed with the following error:

Non-aggregate SELECT expression(s) not part of GROUP BY: [LATEST_BY_OFFSET(ALBUMS_SRC.ID), LATEST_BY_OFFSET(ALBUMS_SRC.RELEASE_DATE), LATEST_BY_OFFSET(ALBUMS_SRC.TITLE)]

Ideally I'd be able to group by one column and still use non-aggregated columns in the same query. I thought wrapping those columns in an aggregate function would work here, but it appears LATEST_BY_OFFSET isn't considered to produce an aggregated result in this case.

Using TOPK does work, however:

CREATE TABLE ALBUMS AS
  SELECT TOPK(id, 1)[1] AS id, TOPK(title, 1)[1] AS title, upc, TOPK(release_date, 1)[1] AS release_date,
  COLLECT_LIST(artist) AS artists
  FROM ALBUMS_SRC
  GROUP BY upc;

To Reproduce
Steps to reproduce the behavior:

  1. The version of KSQL: 0.7.0

Sample queries above.

Sample data (note multiple rows can exist for a single album representing each artist):

id,title,upc,release_date,artist
2245,Chinampa,191515612908,17396,"0|5151|El Búho|1|performer"
2515,Twilight Zone,3700604727675,18355,0|4248|Pumpkin|1|performer
2515,Twilight Zone,3700604727675,18355,"1|4249|Vin'S da Cuero|1|performer"
2508,"Mes anciens",3700604727606,18355,0|4923|Keusty|1|performer
2508,"Mes anciens",3700604727606,18355,"1|5481|Tom Dettinger|0|producer"
2508,"Mes anciens",3700604727606,18355,"2|5482|Noham Saad-Saoud|0|producer"
2508,"Mes anciens",3700604727606,18355,"3|5483|Chinhan Trieu|0|producer"

Expected behavior

I expect the table to be created without error.

Actual behavior

I get the following error:

Non-aggregate SELECT expression(s) not part of GROUP BY: [LATEST_BY_OFFSET(ALBUMS_SRC.ID), LATEST_BY_OFFSET(ALBUMS_SRC.RELEASE_DATE), LATEST_BY_OFFSET(ALBUMS_SRC.TITLE)]

@archy-bold archy-bold added the bug label Apr 20, 2020
@archy-bold
Author

archy-bold commented Apr 20, 2020

Tried in ksqlDB 0.8.1 and, whilst the query runs successfully, the table is not populated and these errors appear in the logs:

ksqldb-server      | [2020-04-20 11:41:20,665] ERROR stream-thread [_confluent-ksql-default_query_CTAS_MYSQL_ALBUMS_5-47a67775-f6c2-4deb-b9a1-9c202a1a0f31-StreamThread-3] task [1_2] Failed to flush state store Aggregate-Aggregate-Materialize:  (org.apache.kafka.streams.processor.internals.ProcessorStateManager:287)
ksqldb-server      | java.lang.NullPointerException
ksqldb-server      |    at io.confluent.ksql.function.udaf.latest.LatestByOffset$1.map(LatestByOffset.java:155)
ksqldb-server      |    at io.confluent.ksql.function.udaf.latest.LatestByOffset$1.map(LatestByOffset.java:125)
ksqldb-server      |    at io.confluent.ksql.function.UdafAggregateFunction.lambda$null$3(UdafAggregateFunction.java:174)
ksqldb-server      |    at io.confluent.ksql.function.UdafAggregateFunction.timed(UdafAggregateFunction.java:180)
ksqldb-server      |    at io.confluent.ksql.function.UdafAggregateFunction.lambda$getResultMapper$4(UdafAggregateFunction.java:174)
ksqldb-server      |    at io.confluent.ksql.execution.function.udaf.KudafAggregator$ResultTransformer.transform(KudafAggregator.java:135)
ksqldb-server      |    at io.confluent.ksql.execution.function.udaf.KudafAggregator$ResultTransformer.transform(KudafAggregator.java:112)
ksqldb-server      |    at io.confluent.ksql.execution.streams.transform.KsTransformer.transform(KsTransformer.java:53)
ksqldb-server      |    at io.confluent.ksql.execution.streams.transform.KsTransformer.transform(KsTransformer.java:36)
ksqldb-server      |    at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:107)
ksqldb-server      |    at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:81)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
ksqldb-server      |    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
ksqldb-server      |    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
ksqldb-server      |    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
ksqldb-server      |    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

@MichaelDrogalis MichaelDrogalis added this to the 0.9.0 milestone Apr 21, 2020
@apurvam
Contributor

apurvam commented Apr 22, 2020

Are you saying that in 0.7.0 the query is rejected at compile time? That would be because latest_by_offset was only introduced in 0.8.0. Also, we recently fixed this in #4975, which will be released in 0.9.0 and which I think will address your problem.

@archy-bold
Author

I can't say for sure, but I think this will probably fix the error, yes.

Sorry, I hadn't realised that the function didn't exist in 0.7.0. However, the error doesn't say that the function doesn't exist, only that the expression should be part of the GROUP BY clause. That's what caused the confusion, and the lack of clarity means it could catch people out again.

Non-aggregate SELECT expression(s) not part of GROUP BY: [LATEST_BY_OFFSET(ALBUMS_SRC.ID), LATEST_BY_OFFSET(ALBUMS_SRC.RELEASE_DATE), LATEST_BY_OFFSET(ALBUMS_SRC.TITLE)]

@spena spena modified the milestones: 0.9.0, 0.10.0 Apr 27, 2020
@ZachtimusPrime

I'm running into the same error message when trying to use LATEST_BY_OFFSET() as part of a table. For reference we are running the official ksql docker image confluentinc/cp-ksql-server:5.4.1.

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue May 5, 2020
fixes: confluentinc#5114

Added test to ensure the latest version of ksqlDb gives a meaningful error message if an unknown function is called with a non-aggregate select expression.  Previous versions failed with somewhat confusing error e.g.

```
Non-aggregate SELECT expression(s) not part of GROUP BY: [WONT_FIND_ME(ID)]
```

Now fails with:

```
Can't find any functions with the name 'WONT_FIND_ME'
```
@big-andy-coates
Contributor

I'm running into the same error message when trying to use LATEST_BY_OFFSET() as part of a table. For reference we are running the official ksql docker image confluentinc/cp-ksql-server:5.4.1.

5.4.1 does not include LATEST_BY_OFFSET.

@big-andy-coates
Contributor

Hey all. The issues here seem to pertain only to versions of KSQL that don't have LATEST_BY_OFFSET, or possibly to using the function on a table source, which isn't supported.

Hence I've raised a PR to ensure a more helpful error message is returned if the function doesn't exist. Merging that PR will close this issue. Please feel free to raise another issue if you feel your specific issue has not been addressed.

@brightneuron

brightneuron commented May 5, 2020

Hi @big-andy-coates, thanks for clarifying that this function doesn't work with tables; the documentation I read wasn't clear on this point.
There don't appear to be any other functions which would allow a table column (not in the GROUP BY) to reflect the most recent value only. For example: given an incoming stream of location data for each (keyed) device ID, being able to reflect the last known location of each device.
I'd appreciate it if you could point me to any Confluent documentation that explains how to achieve this, because I haven't found anything to date.

@ZachtimusPrime

ZachtimusPrime commented May 5, 2020

@big-andy-coates @brightneuron I think my larger problem is LATEST_BY_OFFSET not being in the latest confluentinc/cp-ksql-server image, but it's also problematic if the function cannot be used within the CREATE statement for a table; I saw no mention of that in the documentation. I believe I have a somewhat similar use case to @brightneuron: to avoid adding the value I'm interested in to the GROUP BY clause, I have to perform some aggregation on the field, and LATEST_BY_OFFSET would solve that for me.

To be clear, the source I would be creating the table from is a stream.

@big-andy-coates
Contributor

LATEST_BY_OFFSET is available in the 0.9 confluentinc/ksqldb-server image.

It can be used in a CREATE TABLE statement where at least one source is a stream. The documentation does mention this (though it's not super clear). See https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/#latest_by_offset, which states:

LATEST_BY_OFFSET(col1)

Stream

Return the latest value for a given column. Latest here is defined as the value in the partition with the greatest offset. Rows that have col1 set to null are ignored.

Notice the 'Stream' on the line after the function name? That means it works on stream sources. Other functions may list 'Stream, Table'.

The reason the function doesn't work on table sources is that tables can have data retracted from them, requiring the derived table to undo a previous update. That is not feasible for many aggregate functions, including LATEST_BY_OFFSET, as they would need to maintain a full history of all values ever seen.
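
To illustrate the difference, here's a rough sketch (the stream, table, and column names here are made up):

```
-- Works: the source is a stream, so the aggregate only ever sees new rows.
CREATE TABLE LATEST_READING AS
  SELECT device_id, LATEST_BY_OFFSET(temperature) AS last_temperature
  FROM READINGS_STREAM
  GROUP BY device_id;

-- Not supported: the source is a table, so rows can be updated or retracted,
-- and LATEST_BY_OFFSET has no way to undo a value it has already applied.
CREATE TABLE LATEST_FROM_TABLE AS
  SELECT device_id, LATEST_BY_OFFSET(temperature) AS last_temperature
  FROM READINGS_TABLE
  GROUP BY device_id;
```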

@big-andy-coates
Contributor

There don't appear to be any other functions which would allow a table column (not in the GROUP BY) to reflect the most recent value only.

What does your source data look like? If each record contains the values you want then you may be able to just import it as a change log topic, i.e. via a CREATE TABLE statement, rather than a CREATE STREAM.

If the topic is not correctly keyed, you may need to first import it as a stream, repartition it, and then re-import the sink topic as a table.
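
For example, a rough sketch of that stream → repartition → table pattern (topic, column, and key names are all hypothetical, and the exact key-declaration syntax varies between ksqlDB versions):

```
-- 1. Import the raw topic as a stream.
CREATE STREAM LOCATIONS_RAW (device_id VARCHAR, lat DOUBLE, lon DOUBLE)
  WITH (KAFKA_TOPIC='locations', VALUE_FORMAT='JSON');

-- 2. Repartition so records are keyed by device_id; this writes a new sink topic.
CREATE STREAM LOCATIONS_BY_DEVICE AS
  SELECT * FROM LOCATIONS_RAW PARTITION BY device_id;

-- 3. Re-import the correctly keyed sink topic as a table (changelog semantics).
CREATE TABLE LOCATIONS_TABLE (device_id VARCHAR, lat DOUBLE, lon DOUBLE)
  WITH (KAFKA_TOPIC='LOCATIONS_BY_DEVICE', VALUE_FORMAT='JSON', KEY='device_id');
```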

@aabrams

aabrams commented May 6, 2020

Hi, I'm getting the same error using CREATE TABLE with the source being a stream.

big-andy-coates added a commit that referenced this issue May 6, 2020
…5271)

* test: add group by test to ensure useful error on unknown function

fixes: #5114

Added test to ensure the latest version of ksqlDb gives a meaningful error message if an unknown function is called with a non-aggregate select expression.  Previous versions failed with somewhat confusing error e.g.

```
Non-aggregate SELECT expression(s) not part of GROUP BY: [WONT_FIND_ME(ID)]
```

Now fails with:

```
Can't find any functions with the name 'WONT_FIND_ME'
```

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
@aabrams

aabrams commented May 6, 2020

Example below, should this be giving an error (using 0.8.1)?

CREATE STREAM 600_customer_events_stream WITH (PARTITIONS=6) AS SELECT *
FROM customer_events_raw
WHERE tenant=600 AND customer is not NULL
EMIT CHANGES;

CREATE TABLE 600_customer_events_agg WITH (PARTITIONS=6) AS SELECT
customer AS customer_id
,LATEST_BY_OFFSET(
CASE
WHEN event = 'subscribe_to_notification' AND EXTRACTJSONFIELD(context,'$.category')='water' THEN 'True'
WHEN event = 'unsubscribe_to_notification' AND EXTRACTJSONFIELD(context,'$.category')='water' THEN 'False'
END
) AS water_reminder_subscription
FROM 600_customer_events_stream
WINDOW HOPPING (SIZE 2 DAYS, ADVANCE BY 1 DAY)
GROUP BY customer
EMIT CHANGES;

@brightneuron

brightneuron commented May 6, 2020

Hi Andy, my table is based on a stream of incoming flight position events ("incoming_flight_events"). Each event in this stream contains only partial flight position data, represented in columns such as longitude, latitude, ground speed, altitude, etc.

So my hope was to create a table that summarises each detected flight (contact) with a combination of aggregate data (e.g. the number of associated events) and the last known location data.

My table statement looks something like this (example error-inducing statements "commented out"):

CREATE TABLE table_flightcontacts
    WITH ( 
        kafka_topic='table.flightcontacts',
        value_format='AVRO',
        TIMESTAMP='last_messageDateTime'
        )    
    AS SELECT 
        aircraftID,
        COUNT(*) AS messages,
        -- LATEST_BY_OFFSET(altitude) AS last_altitude,
        -- LATEST_BY_OFFSET(groundSpeed) AS last_groundSpeed,
        MIN(messageGenDateTime) AS first_messageDateTime,
        MAX(messageGenDateTime) AS last_messageDateTime
    FROM incoming_flight_events
    WINDOW SESSION (60 MINUTES)
    GROUP BY aircraftID
    EMIT CHANGES;

Unfortunately, if I include the LATEST_BY_OFFSET expressions, the statement fails as above. This seems counter-intuitive, but I'm a relative ksqlDB newbie.

@brightneuron

Hi @big-andy-coates, shouldn't this issue be reopened?
Based on your comments above, my use case shouldn't be failing.
I'm running ksqldb v5.5.0

@brightneuron

Never mind. FYI for others with the same confusion: I clarified via Community Slack that this UDAF is available in standalone ksqlDB 0.8.1; however, that version isn't yet packaged into Confluent Platform 5.5.0. And unfortunately KSQL throws that unhelpful error message rather than reporting that the UDAF doesn't exist.
UDAF source is here: https://github.com/confluentinc/ksql/blob/master/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/latest/LatestByOffset.java
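
For anyone else checking, one way to confirm whether the UDAF exists in a given deployment is to ask the server from the CLI:

```
-- Lists every function the server knows about; LATEST_BY_OFFSET should appear if available.
SHOW FUNCTIONS;

-- Or describe it directly; if the function isn't available this returns an error.
DESCRIBE FUNCTION LATEST_BY_OFFSET;
```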

@almazik

almazik commented Jun 2, 2020

Hit the same problem. The documentation should clearly state in which version of ksqlDB each function was introduced. It is also NOT clear which version of ksqlDB is packaged in Confluent Platform 5.5; either the Docker images should be versioned according to the source version, or there should be a clear way to retrieve the version from Confluent Control Center.
Spent half a day wondering what I had missed, eventually getting down to the simplest aggregation like

create stream test (id string (KEY), name string) with (KAFKA_TOPIC='test', VALUE_FORMAT='JSON', partitions = 2);

create table test_table as
select id, LATEST_BY_OFFSET(name)
from test
group by id;

just to find out that LATEST_BY_OFFSET is not included in Confluent Platform 5.5.

PLEASE invest more time into documentation; the TODOs in the docs look ridiculous.

@jrtm885

jrtm885 commented Aug 10, 2020

The LATEST_BY_OFFSET aggregate function works well in Confluent Platform 5.5.1-2.12.
I tested this feature and it worked.

@chuck-confluent

I'm getting this error in Confluent Cloud years later, where I know LATEST_BY_OFFSET is supported.

Non-aggregate SELECT expression(s) not part of GROUP BY: CREDIT_CARD_NUMBER, FULL_NAME, EMAIL, TRANSACTION_TIMESTAMP
Either add the column(s) to the GROUP BY or remove them from the SELECT.
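
For anyone hitting that message on a version where the function is available: it usually means the listed columns appear in the SELECT without being grouped or aggregated. A sketch of one fix, wrapping them in LATEST_BY_OFFSET (the grouping column and stream name here are assumed, only the listed column names come from the error):

```
CREATE TABLE CUSTOMER_LATEST AS
  SELECT
    CUSTOMER_ID,
    LATEST_BY_OFFSET(CREDIT_CARD_NUMBER) AS CREDIT_CARD_NUMBER,
    LATEST_BY_OFFSET(FULL_NAME) AS FULL_NAME,
    LATEST_BY_OFFSET(EMAIL) AS EMAIL,
    LATEST_BY_OFFSET(TRANSACTION_TIMESTAMP) AS TRANSACTION_TIMESTAMP
  FROM TRANSACTIONS_STREAM
  GROUP BY CUSTOMER_ID;
```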
