
Ignore additional field in kafka record if not present in BQ table #230

Open · subham611 opened this issue Aug 10, 2022 · 1 comment
I have a BQ table with the fields {"id": "string", "amount": "long"}.
When I send a Kafka record like {"id": "1", "amount": 1, "extraField": "someField"}, the connector fails with the following error:

Task failed with java.lang.NullPointerException error: null (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)
Exception in thread "pool-5-thread-2" java.lang.NullPointerException
	at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:104)
	at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:46)
	at com.wepay.kafka.connect.bigquery.SchemaManager.getBigQuerySchema(SchemaManager.java:479)
	at com.wepay.kafka.connect.bigquery.SchemaManager.convertRecordSchema(SchemaManager.java:315)
	at com.wepay.kafka.connect.bigquery.SchemaManager.getAndValidateProposedSchema(SchemaManager.java:286)
	at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:269)
	at com.wepay.kafka.connect.bigquery.SchemaManager.updateSchema(SchemaManager.java:242)
	at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptSchemaUpdate(AdaptiveBigQueryWriter.java:159)
	at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:110)
	at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
	at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2022-08-05 05:22:34,130] ERROR [client_event_receiver_bq_connector|task-0] Task failed with java.lang.NullPointerException error: null (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)
Exception in thread "pool-5-thread-1" java.lang.NullPointerException
	(same stack trace as above)

My connector config is as follows:

{
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "tasks.max": "1",
  "topics": "kafka_bq_receiver-1",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "key.converter.schemas.enable": "false",
  "errors.deadletterqueue.topic.name": "kafka-dq-sink",
  "errors.log.include.messages": "true",
  "sanitizeTopics": "true",
  "autoCreateTables": "false",
  "autoUpdateSchemas": "false",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "defaultDataset": "df",
  "allowNewBigQueryFields": "true",
  "allowBigQueryRequiredFieldRelaxation": "true",
  "keyfile": "key.json",
  "bigQueryPartitionDecorator": "false",
  "timestamp": "UTC",
  "transforms": "setTopic",
  "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.setTopic.field": "id"
}

Is there any way to ignore this additional field in the Kafka record instead of failing with an error?
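One possible workaround (my own suggestion, not something the connector documents for this case) is to drop the extra field before it reaches the connector, using Kafka Connect's built-in ReplaceField SMT chained after the existing setTopic transform. Note the option is named `exclude` in recent Kafka versions; older versions call it `blacklist`:

```json
"transforms": "setTopic,dropExtra",
"transforms.dropExtra.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropExtra.exclude": "extraField"
```

This only helps when the unwanted field names are known up front, since the SMT needs an explicit list.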

subham611 commented Aug 10, 2022

protected InsertAllRequest createInsertAllRequest(PartitionedTableId tableId,
                                                  Collection<InsertAllRequest.RowToInsert> rows) {
  return InsertAllRequest.newBuilder(tableId.getFullTableId(), rows)
      .setIgnoreUnknownValues(false)
      .setSkipInvalidRows(false)
      .build();
}

Here setIgnoreUnknownValues is hard-coded to false. Can we make it true, or add a configuration option so the user can set it?
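For context, this is roughly what setting it to true would change: with ignoreUnknownValues, BigQuery drops record fields that are not in the table schema instead of rejecting the row. A minimal, self-contained sketch of that filtering behaviour in plain Java (illustrative only, not the connector's or BigQuery's actual code):

```java
import java.util.*;

public class IgnoreUnknownFields {
    // Return a copy of the record keeping only the fields that exist in the
    // table schema, mimicking what ignoreUnknownValues=true asks BigQuery to do.
    static Map<String, Object> filterToSchema(Map<String, Object> record,
                                              Set<String> schemaFields) {
        Map<String, Object> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            if (schemaFields.contains(e.getKey())) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        // Same shapes as in the issue: table has {id, amount}, record has an extra field.
        Set<String> schema = new HashSet<>(Arrays.asList("id", "amount"));
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", "1");
        record.put("amount", 1L);
        record.put("extraField", "someField");
        System.out.println(filterToSchema(record, schema)); // prints {id=1, amount=1}
    }
}
```

With the hard-coded false, the extra field instead surfaces as an error; exposing the flag as a connector config would let users opt into the dropping behaviour.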
