
UDFs that return non-optional data types silently fail in persistent queries #5364

Closed
MichaelDrogalis opened this issue May 14, 2020 · 9 comments · Fixed by #6084
Labels: bug · P0 (Denotes must-have for a given milestone) · user-defined-functions (Tickets about UDF, UDAF, UDTF)

Comments

@MichaelDrogalis
Contributor

MichaelDrogalis commented May 14, 2020

Describe the bug

When a custom UDTF returns a Struct whose schema has non-optional data types, push queries are able to work with the results, but persistent queries silently swallow the output.

To Reproduce

Confluent Platform 5.5.0, ksqlDB 0.9.0.

docker-compose.yml for the components:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.9.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    volumes:
      - "./confluent-hub-components/:/usr/share/kafka/plugins/"
      - ./target:/etc/ksqldb/ext
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_EXTENSION_DIR: "/etc/ksqldb/ext/"
      # Configuration to embed Kafka Connect support.
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.9.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

pom.xml for the UDTF:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.estruxture</groupId>
    <artifactId>goldberg</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Specify the repository for Confluent dependencies -->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <!-- Specify build properties -->
    <properties>
        <exec.mainClass>com.estruxture.goldberg.Cal1SplitPanel</exec.mainClass>
        <java.version>1.8</java.version>
        <kafka.version>5.5.0-ccs</kafka.version>
        <kafka.scala.version>2.12</kafka.scala.version>
        <scala.version>${kafka.scala.version}.8</scala.version>
        <confluent.version>5.5.0</confluent.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <!-- Specify the ksqldb-udf dependency -->
    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-udf</artifactId>
            <version>5.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.5.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <!-- Build boilerplate -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                    <encoding>UTF-8</encoding>
                    <compilerArgs>
                        <arg>-parameters</arg>
                    </compilerArgs>
                </configuration>
            </plugin>

            <!-- Package all dependencies as one jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.2</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>${exec.mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create the file src/main/java/io/confluent/developer/TestUdf.java. Note the Schema.INT32_SCHEMA, which should be Schema.OPTIONAL_INT32_SCHEMA to work with ksqlDB:

package io.confluent.developer;

import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@UdtfDescription(name="ksql_struct_test", description="Test function for debugging.")
public class TestUdf {

   public static final String OUTPUT_STRUCT_DESCRIPTOR = "STRUCT<" +
           "TEST1 INT," +
           "TEST2 INT" +
           ">";

   public static final Schema OUTPUT_SCHEMA = SchemaBuilder.struct().optional()
           .field("TEST1", Schema.INT32_SCHEMA)
           .field("TEST2", Schema.INT32_SCHEMA)
           .build();

   @Udtf(description = "Test function for debugging", schema = OUTPUT_STRUCT_DESCRIPTOR)
   public List<Struct> ksql_struct_test(@UdfParameter final int myValue) {

       // Here, we are testing the output of a function in a structure format.
       // We are simulating the split of one Kafka message into multiple, separate messages
       // each with a potentially complex structure.

       List<Struct> myList = new ArrayList<>();

       myList.add(new Struct(OUTPUT_SCHEMA)
               .put("TEST1", 1)
               .put("TEST2", 2)
       );

       return myList;
   }

}

Build the jar:

mvn clean compile assembly:single

Start Docker Compose:

docker-compose up

Start the CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

And run:

set 'auto.offset.reset' = 'earliest';

-- Any stream will do to reveal the bug.
create stream s1 (x varchar) with (kafka_topic = 's1', partitions = 1, value_format = 'avro');

insert into s1 (x) values ('a');

Now, use the UDTF in a push query:

SELECT ksql_struct_test(1) AS mytest FROM s1 EMIT CHANGES;

Which yields:

+----------------------------------------------------------------------------------------------------------------+
|MYTEST                                                                                                          |
+----------------------------------------------------------------------------------------------------------------+
|{TEST1=1, TEST2=2}                                                                                              |
^CQuery terminated

Now turn that statement into a persistent query:

CREATE STREAM s2 AS SELECT ksql_struct_test(1) AS mytest FROM s1 EMIT CHANGES; 

And query:

select * from s2 emit changes;
+-------------------------------------------------------+-------------------------------------------------------+
|ROWKEY                                                 |MYTEST                                                 |
+-------------------------------------------------------+-------------------------------------------------------+

This one hangs and returns nothing.

Expected behavior

The last select should either return the same data as the push query did, or both should emit some kind of error about bad data types. Nothing is visible in the logs or processing log.

@MichaelDrogalis added this to the 0.10.0 milestone May 14, 2020
@MichaelDrogalis changed the title from "UDTF that returns a struct fails in persistent query" to "UDFs that return non-optional data types silently fail in persistent queries" May 14, 2020
@vpapavas
Member

Hey @MichaelDrogalis! I looked into this with current master.
I followed your example, but without Docker. I just created a UDF in our code like so:

@UdfDescription(name = "AS_STRUCT", description = "Construct a list based on some inputs")
public class AsStruct {

  public static final String OUTPUT_STRUCT_DESCRIPTOR =
      "STRUCT<"
      + "TEST1 INT,"
      + "TEST2 INT"
      + ">";

  public static final Schema OUTPUT_SCHEMA = SchemaBuilder.struct().optional()
      .field("TEST1", Schema.INT32_SCHEMA)
      .field("TEST2", Schema.INT32_SCHEMA)
      .build();

  @Udf(description = "Test function for debugging", schema = OUTPUT_STRUCT_DESCRIPTOR)
  public Struct ksql_struct_test(@UdfParameter final int myValue) {
    return new Struct(OUTPUT_SCHEMA).put("TEST1", 1).put("TEST2", 2);
  }
}

and then created stream S1:

create stream s1 (x varchar) with (kafka_topic = 's1', partitions = 1, value_format = 'json');

and inserted a row into it

insert into s1 (x) values ('a');

Then, I created stream S2:

CREATE STREAM S2 WITH (KAFKA_TOPIC='S2', PARTITIONS=1, REPLICAS=1) AS SELECT   S1.ROWKEY ROWKEY,   AS_STRUCT(1) MYTEST FROM S1 S1 EMIT CHANGES;

And printed the output topic of the CSAS:

print S2 from beginning;

which returned nothing. There was also no error or exception in the logs.

But then I inserted another row into stream S1 like so:

insert into s1 (x) values ('a')

and that caused this exception in the logs:

[2020-05-27 17:16:48,563] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CSAS_S2_0-90d42a04-15db-402f-92b5-565c29d7f036-StreamThread-1. (io.confluent.ksql.util.QueryMetadata:134)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic S2 for task 0_0 due to:
org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:166)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:128)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:88)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:225)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:205)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:157)
	at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:225)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:205)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:157)
	at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:225)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:205)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:157)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:100)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$2(StreamTask.java:590)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:590)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:851)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:694)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:247)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:282)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:251)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:146)
	... 27 more

However, no error is returned to the CLI. Moreover, if you try

select * from s2 emit changes;

again, there is no error in the CLI.

@derekjn modified the milestones: 0.10.0, 0.11.0 Jun 12, 2020
@satamrajuveeresh

I am facing a similar issue while trying to write data to a stream.
Infra: Confluent Platform 5.5 on Docker.

code.txt
When this code is deployed, the SELECT statement works, but the query fails when creating a CSAS or CTAS.
Similarly, while writing a UDAF (similar to COLLECT_LIST) that accepts a struct as input and produces a list of structs, I am facing an error like the one in the attached screenshot.
Source code for the UDAF is in code1.txt.
Please let us know when this bug could be fixed.
++ @MichaelDrogalis please provide your comments on this one

@big-andy-coates added the user-defined-functions (Tickets about UDF, UDAF, UDTF) label Jun 19, 2020
@agavra added the P0 (Denotes must-have for a given milestone) label and removed the needs-triage label Jun 23, 2020
@big-andy-coates
Contributor

big-andy-coates commented Jun 23, 2020

The strategic fix here is to move away from using the Connect Struct type in our UDF framework, as it has a different type system than ksqlDB, and switch to a type that we control and that matches ksqlDB's type system.

This is certainly something we need to do before v1.0.

There may be a quicker fix/hack we can put in to work around this issue for now. Or maybe we should just bite the bullet and fix the source of the issue.

Somewhat related to #3624
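
Purely as an illustration of that direction (nothing below exists in ksqlDB; the name and shape are invented for this sketch), a UDF-facing struct type that matches ksqlDB's type system, where every field is nullable, might look roughly like:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch only, not ksqlDB code: a UDF-facing struct type whose fields are
// all implicitly nullable, so UDF authors never touch Connect's optional/non-optional
// distinction when declaring return values.
public final class KsqlStructSketch {

    private final Map<String, Object> values = new LinkedHashMap<>();

    // Every field is nullable by definition, mirroring ksqlDB's type system.
    public KsqlStructSketch set(final String field, final Object value) {
        values.put(field, value);
        return this;
    }

    public Object get(final String field) {
        return values.get(field);
    }
}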

@big-andy-coates
Contributor

Strategic fix to be covered under #3413. This ticket will be used for the short-term (i.e. 3 years) fix.

@vcrfxia
Contributor

vcrfxia commented Aug 19, 2020

Had a look at this today. Current state of affairs (on 0.11.0) is as follows:

  • The reason push queries succeed but persistent queries fail when using UD(*)Fs with non-optional return types is that the error is encountered during serialization when producing to the output topic (the stack trace is shown in Vicky's comment above).
  • The reason there's a serialization exception is that the Struct schema created by ksqlDB has optional field types, whereas the Struct object returned by the UDF has non-optional field types. These two schemas are considered incompatible, so this call in the serializer (for creating the row to be serialized) fails as a result (see the sketch after this list):
    struct.put(schema.fields().get(i), data.get(i));
  • This results in an error with message org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic <topic_name> for task 0_0 due to: org.apache.kafka.connect.errors.DataException: Struct schemas do not match. being logged in the server logs, and no messages are produced to the sink topic. This explains why subsequent push queries on the sink return no data (and no error).
  • The reason this error is not logged in the processing log (in contrast to analogous deserialization errors, which are) is that only ksqlDB's deserializer is integrated with the processing logger, and not ksqlDB's serializer.
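
To make the failure concrete, here is a minimal, standalone sketch (the class name and wiring are mine, not ksqlDB code) that reproduces the same DataException using only the Connect data classes; the two struct schemas mirror the ones from the original report:

package io.confluent.developer;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Standalone illustration of the serializer failure described in the list above.
public class SchemaMismatchDemo {

    public static void main(String[] args) {
        // Schema as declared by the UDF: non-optional INT32 fields.
        Schema udfStructSchema = SchemaBuilder.struct().optional()
                .field("TEST1", Schema.INT32_SCHEMA)
                .field("TEST2", Schema.INT32_SCHEMA)
                .build();

        // Schema as ksqlDB derives it from STRUCT<TEST1 INT, TEST2 INT>: all fields optional.
        Schema ksqlStructSchema = SchemaBuilder.struct().optional()
                .field("TEST1", Schema.OPTIONAL_INT32_SCHEMA)
                .field("TEST2", Schema.OPTIONAL_INT32_SCHEMA)
                .build();

        // Row schema with a single MYTEST column of the ksqlDB-derived struct type.
        Schema rowSchema = SchemaBuilder.struct()
                .field("MYTEST", ksqlStructSchema)
                .build();

        Struct udfResult = new Struct(udfStructSchema).put("TEST1", 1).put("TEST2", 2);

        // Struct.put delegates to ConnectSchema.validateValue, which compares the value's
        // schema to the field's schema; the two struct schemas are not equal, so this throws
        // org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
        new Struct(rowSchema).put("MYTEST", udfResult);
    }
}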

In terms of what we can do to make this better:

  • We can't reject the UD(*)F upfront (while loading UDFs) since there's no way for ksqlDB to know what the actual Struct schema returned by the UDF is without invoking the UDF method, which doesn't seem great. The best we could do is invoke the method with a few input values as a smoke test of sorts, but this feels incomplete in terms of addressing the underlying issue.
  • We could integrate the ksqlDB serializer with the processing logger so these error messages show up in the processing log. This makes sense to do anyway, but it's unclear how useful this is for the specific issue at hand (non-optional return types in UDFs) since the error message Struct schemas do not match. isn't great. To improve the error message, we could catch the error in the ksqlDB serializer, check the Struct schema for non-optional types, and show a more specific error message if found (a rough sketch of such a check follows this list). A bit of a hack, but it could be worthwhile depending on how often users get tripped up by this. This fix would be relatively quick (a couple of days).
  • A better, long-term fix (as Andy suggested) is to switch away from the Connect Struct entirely, and instead introduce a KsqlStruct type for use in constructing return types in UDFs. We'd like to do this eventually anyway, and it would address the confusion in this issue along the way, but it would be an investment of at least a week (potentially more -- can refine the estimate as needed).
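
Purely as an illustration of that error-message hack (the class and method names below are hypothetical, not existing ksqlDB code), the check could look roughly like this:

package io.confluent.developer;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Hypothetical helper: when a "Struct schemas do not match." serialization error is
// caught, inspect the value's schema and report any non-optional fields so the error
// message can point at the likely cause (a UDF schema built with non-optional types).
public class NonOptionalFieldCheck {

    public static List<String> nonOptionalFields(final Schema structSchema) {
        final List<String> offenders = new ArrayList<>();
        for (final Field field : structSchema.fields()) {
            if (!field.schema().isOptional()) {
                offenders.add(field.name());
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        // The schema from the original report: both fields are non-optional.
        Schema udfSchema = SchemaBuilder.struct().optional()
                .field("TEST1", Schema.INT32_SCHEMA)
                .field("TEST2", Schema.INT32_SCHEMA)
                .build();

        // Prints [TEST1, TEST2], i.e. the fields a more specific error message would mention.
        System.out.println(nonOptionalFields(udfSchema));
    }
}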

@MichaelDrogalis do you have thoughts on what sort of fix we'd like to pursue? I don't have a sense of how prevalent this particular issue is and what the priority is as a result.

Also note that there is a workaround for this in ksqlDB today: specify optional return types in the Struct, rather than non-optional return types (e.g., Schema.OPTIONAL_INT32_SCHEMA rather than Schema.INT32_SCHEMA). cc @satamrajuveeresh, who is perhaps blocked on this?
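
For the TestUdf class from the reproduction steps above (everything else unchanged), the workaround amounts to switching the field schemas to the optional constants, roughly:

   // Workaround sketch: declare every field with an optional schema so it matches the
   // all-optional Struct schema that ksqlDB derives from the STRUCT<...> descriptor.
   public static final Schema OUTPUT_SCHEMA = SchemaBuilder.struct().optional()
           .field("TEST1", Schema.OPTIONAL_INT32_SCHEMA)
           .field("TEST2", Schema.OPTIONAL_INT32_SCHEMA)
           .build();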

@MichaelDrogalis
Contributor Author

Thanks for the breakdown @vcrfxia. It seems like if we want to fix this, we just need to bite the bullet and do it properly. We can kick this out a little further, but we should probably queue it up soon. The issue with the workaround is that it's almost impossible to discover. All you get is a hanging query with no feedback, so there's no way for you to know what the problem is, much less what to do about it.

@vcrfxia
Contributor

vcrfxia commented Aug 20, 2020

The issue with the workaround is that it's almost impossible to discover. All you get is a hanging query with no feedback, so there's no way for you to know what the problem is, much less what to do about it.

Needing to check the processing log to discover processing errors is common across many different types of errors in ksqlDB: inability to deserialize source records, various types of production errors (e.g., record size too large), etc. In my mind, if a query isn't producing the expected results, the processing log is a natural first place to look, so I'm surprised by the statement that this is "almost impossible to discover." Are you suggesting we should think about replacing the processing log as the first step in debugging?

I agree, though, that figuring out what to do about the error (post-discovery) is pretty rough, unless we add the additional hack of checking for non-optional schema types upon hitting schema incompatibility, in order to improve the error message.

@MichaelDrogalis
Contributor Author

My comment stems from the fact that this error wasn't showing up in the processing log. :) If that's fixed, then we're in a better place.

@vcrfxia
Contributor

vcrfxia commented Aug 24, 2020

Opened a PR to add serialization exceptions to the processing log, and throw a custom error message calling out the possibility of serialization exceptions (for struct fields) being caused by non-optional schemas from UDFs: #6084

The long-term fix of switching away from Connect types to remove the possibility of encountering this error will be tracked in #4961 instead.
