
Bigquery kafka sink connect not accepting boolean values after SMT processing #176

Closed
saumyasuhagiya opened this issue Jan 25, 2022 · 3 comments
Labels
wontfix This will not be worked on

Comments

@saumyasuhagiya

saumyasuhagiya commented Jan 25, 2022

With schemaless data, the BigQuery Kafka sink connector fails to write records to BigQuery when a field targets a BOOLEAN column, after SMT processing.

Output of MapsUtil.debugPrint on recordValue just before returning from apply(R record):

active = true java.lang.String
Schema definition

{
  "mode": "NULLABLE",
  "name": "active",
  "type": "BOOLEAN"
}

Deserialiser

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

public class BooleanDeserialiser extends JsonDeserializer<Boolean> {

    @Override
    public Boolean deserialize(JsonParser parser, DeserializationContext context)
            throws IOException {
        // Treat anything other than "0" as true.
        return !"0".equals(parser.getText());
    }
}

Serialiser

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

public class BooleanSerialiser extends JsonSerializer<Boolean> {

    @Override
    public void serialize(Boolean value, JsonGenerator gen, SerializerProvider serializers)
            throws IOException {
        // Write the boolean as the string "true"/"false" rather than a JSON boolean.
        gen.writeString(value ? "true" : "false");
    }
}

Error

[row index 76]: invalid: Cannot convert string value to boolean: 1
at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
@saumyasuhagiya saumyasuhagiya changed the title Bigquery kafka sink connect not accepting boolean values Bigquery kafka sink connect not accepting boolean values after SMT processing Jan 25, 2022
@C0urante

The connector does very little for schemaless data before passing it to the BigQuery client API. It's difficult to tell from this description, but it seems like the issue is that the upstream active field is a string while the corresponding downstream column is a boolean.

It's not impossible to implement more sophisticated logic that first retrieves the schema of the table and then examines the contents of each incoming sink record to make sure they adhere to that schema (and, if they don't, tries to modify them so that they do). However, this would likely incur a performance penalty and be fairly complicated to implement for relatively little benefit.
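As a rough illustration only (this is not connector code; the class, the field set, and the accepted string values are all hypothetical), such per-record coercion might look like:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BooleanCoercer {

    // Field names whose target columns are BOOLEAN (assumption for this sketch).
    private static final Set<String> BOOLEAN_FIELDS = Set.of("active");

    // Returns a copy of the schemaless record value with string-typed
    // boolean fields coerced to java.lang.Boolean.
    public static Map<String, Object> coerce(Map<String, Object> value) {
        Map<String, Object> out = new HashMap<>(value);
        for (String field : BOOLEAN_FIELDS) {
            Object v = out.get(field);
            if (v instanceof String) {
                String s = ((String) v).trim();
                // Accept "1"/"true" (case-insensitive) as true; everything else as false.
                out.put(field, "1".equals(s) || "true".equalsIgnoreCase(s));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(coerce(Map.of("active", "1", "name", "x")));
    }
}
```

Even this simple version has to make policy decisions (which strings count as true, whether to recurse into nested maps), which is part of why it is better handled upstream where the data's semantics are known.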

This should instead be addressed with upstream changes. Some options include:

  • The Cast SMT (docs available from Apache and Confluent)
  • A custom SMT (if the Cast SMT is not sufficient because of, e.g., its lack of support for nested fields)
  • A Kafka Streams or KSQL job that preprocesses data before it is sent to the connector
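For the first option, a Cast SMT configuration for this case might look like the following (property names follow the standard Apache Kafka Cast transform; castActive is an arbitrary alias):

```properties
transforms=castActive
transforms.castActive.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castActive.spec=active:boolean
```

One caveat: the stock Cast transform generally recognizes only "true"/"false" string values as booleans, so if the upstream field can arrive as "1"/"0", a custom SMT may still be needed.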

@saumyasuhagiya If I've misunderstood the issue here let me know, otherwise I'll close as won't fix.

@saumyasuhagiya
Author

Thanks for the details. This might be a straightforward thing to do. However, is there any chance we can improve the logging to provide the exact field name in the error message? @C0urante This would be helpful for debugging issues faster as well.

I can take the schema and compare against it. However, when I sent the same payload via the command bq insert dev.table_name data.json, it worked fine, even with active as the string "true".

@C0urante

However, is there any chance we can improve the logging to provide the exact field name in error?

I don't think so; we already log the exact error messages that BigQuery returns from our call to the insert all API. You might consider reaching out to the BigQuery team to see if they would consider adding field names to those error messages.

I sent the same payload via the command bq insert dev.table_name data.json and it worked fine. It had active as the string "true".

The bq insert docs explain why:

Data types are converted to match the column types of the destination table. This command is intended for testing purposes only. To stream data into BigQuery, use the insertAll API method.

Going to close this as won't fix.

@C0urante C0urante added the wontfix This will not be worked on label Jan 25, 2022