
Help with Reading Kafka topic written using Debezium Connector - Deltastreamer #2149

Closed
ashishmgofficial opened this issue Oct 6, 2020 · 50 comments


@ashishmgofficial

ashishmgofficial commented Oct 6, 2020

Hi Team,

I'm facing a use case where I need to ingest data with DeltaStreamer from a Kafka topic that is loaded by the Debezium connector, so the topic's schema contains fields like before, after, ts_ms, op, source, etc. I'm providing the record key as after.id and the precombine key as after.timestamp, but the entire Debezium envelope is still being ingested.

Please find my properties below:

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=after.inc_id
hoodie.datasource.write.partitionpath.field=date
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
# Schema provider props (change to absolute path based on your installation)
#hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
#hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc
# Kafka Source
hoodie.deltastreamer.source.kafka.topic=airflow.public.motor_crash_violation_incidents
#Kafka props
bootstrap.servers=http://xxxxx:29092
auto.offset.reset=earliest
hoodie.deltastreamer.schemaprovider.registry.url=http://xxxxx:8081/subjects/airflow.public.motor_crash_violation_incidents-value/versions/latest
#hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://xxxxx:8081/subjects/airflow.public.motor_crash_violation_incidents-value/versions/latest
schema.registry.url=http://xxxxx:8081
validate.non.null = false
@bvaradar
Contributor

bvaradar commented Oct 6, 2020

@ashishmgofficial : You need to plug in a transformer class to select only the columns you need, and a record payload to handle deletions. We are currently in the process of adding the transformer to OSS Hudi, but broadly here is how it will look
(thanks to @joshk-kang).

Gist:

package org.apache.hudi.utilities.transform;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DebeziumTransformer implements Transformer {

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
      TypedProperties properties) {

    Dataset<Row> insertedOrUpdatedData = rowDataset
        .select("op", "ts_ms", "after.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").notEqual("d"));

    Dataset<Row> deletedData = rowDataset
        .select("op", "ts_ms", "before.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").equalTo("d"));

    Dataset<Row> transformedData = insertedOrUpdatedData.union(deletedData);

    return transformedData;
  }
}
And the record payload to handle deletions:

package org.apache.hudi.common.model;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;

public class DebeziumAvroPayload extends OverwriteWithLatestAvroPayload {

  // Field is prefixed with a underscore by transformer to indicate metadata field
  public static final String OP_FIELD = "_op";
  public static final String DELETE_OP = "d";

  public DebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public DebeziumAvroPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  @Override
  protected boolean isDeleteRecord(GenericRecord genericRecord) {
    return genericRecord.get(OP_FIELD) != null && genericRecord.get(OP_FIELD).toString().equalsIgnoreCase(
        DELETE_OP);
  }
}
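
For reference, these classes get wired into DeltaStreamer via its command-line flags; a minimal sketch (jar and properties paths are placeholders, class names as in the gist above):

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  <hudi-utilities-bundle.jar> \
  --transformer-class org.apache.hudi.utilities.transform.DebeziumTransformer \
  --payload-class org.apache.hudi.common.model.DebeziumAvroPayload \
  --props <path-to-properties>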

@ashishmgofficial
Author

@bvaradar So in this case we should be giving an updated schema for the target?

@bvaradar
Contributor

bvaradar commented Oct 6, 2020

@ashishmgofficial : Yes, you are correct. You could create a custom SchemaProvider that inherits from, say, the Confluent Schema Registry based schema provider. Please see below for an example implementation.

package org.apache.hudi.utilities.schema;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.api.java.JavaSparkContext;

public class DebeziumSchemaRegistryProvider extends SchemaRegistryProvider {

  public DebeziumSchemaRegistryProvider(TypedProperties props,
      JavaSparkContext jssc) {
    super(props, jssc);
  }

  /**
   * Debezium target schema is a nested structure with many metadata fields. This will
   * flatten the schema structure and only require necessary metadata information
   * @return
   */
  @Override
  public Schema getTargetSchema() {
    Schema registrySchema = super.getTargetSchema();

    Field dataField = registrySchema.getField("after");
    Field tsField = registrySchema.getField("ts_ms");
    Field opField = registrySchema.getField("op");

    // Initialize with metadata columns
    FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
        .record("formatted_debezium_payload")
        .fields()
        .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
        .name("_" + opField.name()).type(tsField.schema()).withDefault(null);

    // Add data columns to schema
    dataField.schema()
        .getTypes()
        // "after" field is a union with data schema and null schema, so we need to extract only the data schema portion
        .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() + ".Value"))
        .getFields()
        .forEach(field -> {
          payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null);
        });

    return payloadFieldAssembler.endRecord();
  }
}
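
To make the flattening concrete, here is roughly what the emitted target schema is meant to look like for the table in this issue (an illustrative sketch; the remaining data columns from the "after" record follow the same pattern):

{
  "type": "record",
  "name": "formatted_debezium_payload",
  "fields": [
    {"name": "_ts_ms", "type": ["null", "long"], "default": null},
    {"name": "_op", "type": "string"},
    {"name": "inc_id", "type": "int"},
    {"name": "year", "type": ["null", "int"], "default": null},
    ...
  ]
}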

@ashishmgofficial
Author

ashishmgofficial commented Oct 7, 2020

@bvaradar : Thanks for the code. I followed your instructions, but also tried to add an _is_hoodie_deleted column to the dataset using the following code, for testing.

I'm getting the following error with that code:

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
        at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
        at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:349)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:238)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:163)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:161)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:466)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NegativeArraySizeException
createexternalrow(input[0, bigint, true], input[1, string, false].toString, input[2, string, false].toString, input[3, int, false], input[4, int, true], input[5, string, true].toString, input[6, string, true].toString, input[7, int, true], input[8, string, true].toString, input[9, bigint, false], StructField(_ts_ms,LongType,true), StructField(_op,StringType,false), StructField(_hoodie_is_deleted,StringType,false), StructField(inc_id,IntegerType,false), StructField(year,IntegerType,true), StructField(violation_desc,StringType,true), StructField(violation_code,StringType,true), StructField(case_individual_id,IntegerType,true), StructField(flag,StringType,true), StructField(last_modified_ts,LongType,false))
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
        at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
        at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
        at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
        at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1226)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:302)
        ... 28 more

Transformer

package org.apache.hudi.utilities.transform;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.lit;

public class DebeziumCustomTransformer implements Transformer {

  private static final Logger LOG = LogManager.getLogger(DebeziumCustomTransformer.class);

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
      TypedProperties properties) {
    
    Dataset<Row> insertedOrUpdatedData = rowDataset
        .select("op", "ts_ms", "after.*")
        .withColumnRenamed("op", "_op")
        .withColumn("_is_hoodie_deleted",lit("false"))
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").notEqual("d"));
    
    Dataset<Row> deletedData = rowDataset
        .select("op", "ts_ms", "before.*")
        .withColumnRenamed("op", "_op")
        .withColumn("_is_hoodie_deleted",lit("true"))
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").equalTo("d"));
    
    Dataset<Row> transformedData = insertedOrUpdatedData.union(deletedData);
    return transformedData;
  }
}

Schema Provider

package org.apache.hudi.utilities.schema;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.api.java.JavaSparkContext;

public class DebeziumRegistryProvider extends SchemaRegistryProvider {
  public DebeziumRegistryProvider(TypedProperties props,
      JavaSparkContext jssc) {
    super(props, jssc);
  }
  
  /**
   * Debezium target schema is a nested structure with many metadata fields. This will
   * flatten the schema structure and only require necessary metadata information
   * @return
   */
  @Override
  public Schema getTargetSchema() {
    Schema registrySchema = super.getTargetSchema();
  
    Field dataField = registrySchema.getField("after");
    Field tsField = registrySchema.getField("ts_ms");
    Field opField = registrySchema.getField("op");
    Field hoodieDeleteField = registrySchema.getField("op");
  
    // Initialize with metadata columns
    FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
        .record("formatted_debezium_payload")
        .fields()
        .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
        .name("_" + opField.name()).type(opField.schema()).withDefault(null)
        .name("_hoodie_is_deleted").type(hoodieDeleteField.schema()).withDefault(null);
  
    // Add data columns to schema
    dataField.schema()
        .getTypes()
        // "after" field is a union with data schema and null schema, so we need to extract only the data schema portion
        .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() + ".Value"))
        .getFields()
        .forEach(field -> {
          payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null);
        });
    return payloadFieldAssembler.endRecord();
  }
}

@bvaradar
Contributor

bvaradar commented Oct 7, 2020

@ashishmgofficial : You don't need _hoodie_is_deleted if you are using the custom transformer: the DebeziumAvroPayload already detects deletes from the _op field in isDeleteRecord().

@ashishmgofficial
Author

@bvaradar I changed the code back to the earlier version and ran the DeltaStreamer, but something is causing an error and the commit is getting rolled back:

20/10/07 18:34:18 ERROR Client: Application diagnostics message: User class threw exception: org.apache.hudi.exception.HoodieException: Commit 20201007183359 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:449)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:249)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:163)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:161)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:466)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)

Exception in thread "main" org.apache.spark.SparkException: Application application_1601158208025_9743 finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1149)
        at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

@ashishmgofficial
Author

[attachment: hudi-kafka]

@ashishmgofficial
Author

ashishmgofficial commented Oct 7, 2020

I found this error message in the logs:

 INFO DAGScheduler: Job 11 finished: sum at DeltaSync.java:406, took 0.108824 s
20/10/07 20:27:07 ERROR DeltaSync: Delta Sync found errors when writing. Errors/Total=16/16
20/10/07 20:27:07 ERROR DeltaSync: Printing out the top 100 errors

I'm able to see the transformed dataset properly when I print it from the transformer:

+---+-------------+------+----+-----------------------------------+--------------+------------------+----+----------------+
|_op|       _ts_ms|inc_id|year|                     violation_desc|violation_code|case_individual_id|flag|last_modified_ts|
+---+-------------+------+----+-----------------------------------+--------------+------------------+----+----------------+
|  r|1601531771650|     1|2016|          DRIVING WHILE INTOXICATED|         11923|          17475366|   I|1600945380000000|
|  r|1601531771650|     3|2016|AGGRAVATED UNLIC OPER 2ND/PREV CONV|        5112A1|          17475367|   U|1600959600000000|
|  r|1601531771650|     4|2019|     AGGRAVATED UNLIC OPER 2ND/PREV|        5112A2|          17475368|   I|1569337200000000|
|  r|1601531771651|     5|2018| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475569|   I|1601374200000000|
|  r|1601531771651|     7|2018| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475571|   I|1601375400000000|
|  r|1601531771651|     2|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         2180F|          17475569|   U|1601377200000000|
|  r|1601531771651|     6|2018| UNREASONABLE SPEED/SPECIAL HAZARDS|         2180F|          17475570|   I|1601374800000000|
|  r|1601531771651|     8|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475572|   I|1601376600000000|
|  r|1601531771652|     9|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475573|   I|1601377200000000|
|  r|1601531771652|    10|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180D|          17475574|   I|1601377800000000|
|  r|1601531771652|    11|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180D|          17475574|   I|1601381400000000|
|  r|1601531771652|    12|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475574|   I|1601385000000000|
|  r|1601531771652|    13|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475574|   I|1601388600000000|
|  r|1601531771653|    34|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475574|   I|1601392200000000|
|  c|1601531913459|    35|2020| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475574|   I|1601395800000000|
|  r|1601532941011|     1|2016|          DRIVING WHILE INTOXICATED|         11923|          17475366|   I|1600945380000000|
|  r|1601532941011|     3|2016|AGGRAVATED UNLIC OPER 2ND/PREV CONV|        5112A1|          17475367|   U|1600959600000000|
|  r|1601532941012|     4|2019|     AGGRAVATED UNLIC OPER 2ND/PREV|        5112A2|          17475368|   I|1569337200000000|
|  r|1601532941012|     5|2018| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475569|   I|1601374200000000|
|  r|1601532941012|     7|2018| UNREASONABLE SPEED/SPECIAL HAZARDS|         1180E|          17475571|   I|1601375400000000|
+---+-------------+------+----+-----------------------------------+--------------+------------------+----+----------------+

@bvaradar
Contributor

bvaradar commented Oct 8, 2020

@ashishmgofficial : The exception you pasted is not the real root cause. You should see the root-cause exceptions in the executor logs as well; it would be easier to debug if we knew the root cause. Can you find those exceptions and paste them here?

But from the logs you pasted, it should be coming from the payload class.

@ashishmgofficial
Author

@bvaradar Please find the logs below:

log.txt

@bvaradar
Contributor

bvaradar commented Oct 8, 2020

@ashishmgofficial : The one you pasted contains only driver-side logs. Do you have the executor logs? If you have the Spark history server set up, you can look at the tasks section of the failed stage to find the exception. You can also simply collect all the logs (executor + driver) and attach them (e.g. yarn logs -applicationId <application-id>).
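
For example, using the application id from the failure message earlier in this thread:

yarn logs -applicationId application_1601158208025_9743 > yarn-logs.txt

(redirecting to a file is just a convenient way to attach the combined driver and executor logs)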

@ashishmgofficial
Author

@bvaradar My bad... I'm attaching the logs:
yarn-logs.txt

@bvaradar
Contributor

bvaradar commented Oct 9, 2020

@ashishmgofficial : This looks like a schema mismatch issue. There might be a bug in the schema provider implementation that I pasted. Can you also attach the schema from the schema registry to help debug (I guess you are using http://xxxxx:8081/subjects/airflow.public.motor_crash_violation_incidents-value/versions/latest)?

@ashishmgofficial
Author

ashishmgofficial commented Oct 9, 2020

@bvaradar Yes, I'm using the above-mentioned URL for the schema:


{
  "connect.name": "airflow.public.motor_crash_violation_incidents.Envelope",
  "fields": [
    {
      "default": null,
      "name": "before",
      "type": [
        "null",
        {
          "connect.name": "airflow.public.motor_crash_violation_incidents.Value",
          "fields": [
            {
              "name": "inc_id",
              "type": "int"
            },
            {
              "default": null,
              "name": "year",
              "type": [
                "null",
                "int"
              ]
            },
            {
              "default": null,
              "name": "violation_desc",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "violation_code",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "case_individual_id",
              "type": [
                "null",
                "int"
              ]
            },
            {
              "default": null,
              "name": "flag",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "name": "last_modified_ts",
              "type": {
                "connect.name": "io.debezium.time.MicroTimestamp",
                "connect.version": 1,
                "type": "long"
              }
            }
          ],
          "name": "Value",
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "after",
      "type": [
        "null",
        "Value"
      ]
    },
    {
      "name": "source",
      "type": {
        "connect.name": "io.debezium.connector.postgresql.Source",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "default": "false",
            "name": "snapshot",
            "type": [
              {
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum",
                "connect.parameters": {
                  "allowed": "true,last,false"
                },
                "connect.version": 1,
                "type": "string"
              },
              "null"
            ]
          },
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "table",
            "type": "string"
          },
          {
            "default": null,
            "name": "txId",
            "type": [
              "null",
              "long"
            ]
          },
          {
            "default": null,
            "name": "lsn",
            "type": [
              "null",
              "long"
            ]
          },
          {
            "default": null,
            "name": "xmin",
            "type": [
              "null",
              "long"
            ]
          }
        ],
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "type": "record"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "default": null,
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "transaction",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "total_order",
              "type": "long"
            },
            {
              "name": "data_collection_order",
              "type": "long"
            }
          ],
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "type": "record"
        }
      ]
    }
  ],
  "name": "Envelope",
  "namespace": "airflow.public.motor_crash_violation_incidents",
  "type": "record"
}

@bvaradar
Contributor

bvaradar commented Oct 9, 2020

@ashishmgofficial : I tried to repro with the schema you sent but was unable to. I suspect the schema provider may not have been set correctly. Can you paste the entire spark-submit command here, along with all hoodie configs (the one in the ticket description is old, without the new changes that were proposed)?

@ashishmgofficial
Author

ashishmgofficial commented Oct 9, 2020

@bvaradar Please find the details:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --jars s3://xxxx/hudi/jars/hudi-spark-bundle_2.11-0.6.1-SNAPSHOT.jar \
  --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.hadoop:hadoop-aws:2.7.3 \
  --master yarn --deploy-mode client \
  s3://xxxx/hudi/jars/hudi-utilities-bundle_2.11-0.6.1-SNAPSHOT.jar \
  --table-type COPY_ON_WRITE \
  --source-ordering-field last_modified_ts \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --target-base-path s3a://xxxx/warehouse/hudi_dms_acc_kafka \
  --target-table hudi_dms_acc_kafka \
  --props s3://xxxx/hudi/conf/hudi-kafka.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.DebeziumRegistryProvider \
  --payload-class org.apache.hudi.common.model.DebeziumAvroPayload \
  --transformer-class org.apache.hudi.utilities.transform.DebeziumCustomTransformer

hudi-kafka.properties

hoodie.upsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.delete.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=inc_id
hoodie.datasource.write.precombine.field=last_modified_ts
hoodie.datasource.write.partitionpath.field=violation_code
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
# Schema provider props (change to absolute path based on your installation)
#hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
#hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc
# Kafka Source
hoodie.deltastreamer.source.kafka.topic=airflow.public.motor_crash_violation_incidents
#Kafka props
bootstrap.servers=http://xxxx:29092
auto.offset.reset=earliest
hoodie.deltastreamer.schemaprovider.registry.url=http://xxxx:8081/subjects/airflow.public.motor_crash_violation_incidents-value/versions/latest
#hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://xxxx:8081/subjects/airflow.public.motor_crash_violation_incidents-value/versions/latest
schema.registry.url=http://xxxx:8081
validate.non.null = false

@bvaradar
Contributor

bvaradar commented Oct 9, 2020

From your earlier link: #2149 (comment)

I do see that in the SchemaProvider, hoodieDeleteField is set wrongly:

Field hoodieDeleteField = registrySchema.getField("op");

Have you removed hoodieDeleteField from both the SchemaProvider and the Transformer?
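
Concretely, these are the lines to delete (quoting from the versions posted earlier in this thread):

// in DebeziumRegistryProvider.getTargetSchema():
Field hoodieDeleteField = registrySchema.getField("op");
.name("_hoodie_is_deleted").type(hoodieDeleteField.schema()).withDefault(null);

// in DebeziumCustomTransformer.apply():
.withColumn("_is_hoodie_deleted",lit("false"))
.withColumn("_is_hoodie_deleted",lit("true"))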

@ashishmgofficial
Author

@bvaradar I have changed all the code back to what you had sent earlier, so the hoodieDeleteField is not present now.

@ashishmgofficial
Author

Avro Payload:

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.Option;
import org.apache.avro.generic.GenericRecord;

public class DebeziumAvroPayload extends OverwriteWithLatestAvroPayload {

  // Field is prefixed with a underscore by transformer to indicate metadata field
  public static final String OP_FIELD = "_op";
  public static final String DELETE_OP = "d";

  public DebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public DebeziumAvroPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  @Override
  protected boolean isDeleteRecord(GenericRecord genericRecord) {
    return (genericRecord.get(OP_FIELD) != null && genericRecord.get(OP_FIELD).toString().equalsIgnoreCase(
        DELETE_OP));
  }
}

SchemaProvider:

package org.apache.hudi.utilities.schema;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.spark.api.java.JavaSparkContext;

public class DebeziumSchemaRegistryProvider extends SchemaRegistryProvider {  
  public DebeziumSchemaRegistryProvider(TypedProperties props,
      JavaSparkContext jssc) {
    super(props, jssc);
  }

  /**
   * Debezium target schema is a nested structure with many metadata fields. This will
   * flatten the schema structure and only require necessary metadata information
   * @return
   */
  @Override
  public Schema getTargetSchema() {
    Schema registrySchema = super.getTargetSchema();

    Field dataField = registrySchema.getField("after");
    Field tsField = registrySchema.getField("ts_ms");
    Field opField = registrySchema.getField("op");

    // Initialize with metadata columns
    FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
        .record("formatted_debezium_payload")
        .fields()
        .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
        .name("_" + opField.name()).type(tsField.schema()).withDefault(null);

    // Add data columns to schema
    dataField.schema()
        .getTypes()
        // "after" field is a union with data schema and null schema, so we need to extract only the data schema portion
        .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() + ".Value"))
        .getFields()
        .forEach(field -> {
          payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null);
        });

    return payloadFieldAssembler.endRecord();
  }
}

Transformer:

package org.apache.hudi.utilities.transform;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DebeziumTransformer implements Transformer {

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
      TypedProperties properties) {

    Dataset<Row> insertedOrUpdatedData = rowDataset
        .select("op", "ts_ms", "after.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").notEqual("d"));

    Dataset<Row> deletedData = rowDataset
        .select("op", "ts_ms", "before.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").equalTo("d"));

    Dataset<Row> transformedData = insertedOrUpdatedData.union(deletedData);
    System.out.println(transformedData.showString(20, 50, false));
    return transformedData;
  }
}

I have added these three classes to hudi-utilities and hudi-common and built the jars.

@bvaradar
Contributor

bvaradar commented Oct 9, 2020

@ashishmgofficial : I think there is a bug in the Schema provider implementation:

FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
    .record("formatted_debezium_payload")
    .fields()
    .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
    .name("_" + opField.name()).type(tsField.schema()).withDefault(null);

should have been

FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
    .record("formatted_debezium_payload")
    .fields()
    .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
    .name("_" + opField.name()).type(opField.schema()).withDefault(null);

The _op field was being declared with the ts_ms field's schema (a nullable long) while the transformer emits a string there. Can you try with that change?

@ashishmgofficial
Author

ashishmgofficial commented Oct 9, 2020

@bvaradar Thanks for noticing it. I think that solved the previous error, but it's producing the following error now:

20/10/09 10:32:09 INFO AppInfoParser: Kafka version : 2.0.0
20/10/09 10:32:09 INFO AppInfoParser: Kafka commitId : 3402a8361b734732
20/10/09 10:32:09 INFO InternalKafkaConsumer: Initial fetch for spark-executor-null airflow.public.motor_crash_violation_incidents-0 0
20/10/09 10:32:09 INFO Metadata: Cluster ID: cA3sXVaIR-qlM1MPNNYnCw
20/10/09 10:32:09 ERROR Executor: Exception in task 0.3 in stage 2.0 (TID 5)
java.lang.RuntimeException: Error while decoding: java.lang.NegativeArraySizeException
createexternalrow(input[0, bigint, true], input[1, string, false].toString, input[2, int, false], input[3, int, true], input[4, string, true].toString, input[5, string, true].toString, input[6, int, true], input[7, string, true].toString, input[8, bigint, false], StructField(_ts_ms,LongType,true), StructField(_op,StringType,false), StructField(inc_id,IntegerType,false), StructField(year,IntegerType,true), StructField(violation_desc,StringType,true), StructField(violation_code,StringType,true), StructField(case_individual_id,IntegerType,true), StructField(flag,StringType,true), StructField(last_modified_ts,LongType,false))
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
	at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
	at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
	at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
	at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1226)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:302)
	... 28 more
 

I think I hit this error earlier in the same thread as well, when I was trying to add the _hoodie_is_deleted field.

@ashishmgofficial
Author

Following is the Kafka data as consumed using kafkacat:

{"before": null, "after": {"Value": {"inc_id": 1, "year": {"int": 2016}, "violation_desc": {"string": "DRIVING WHILE INTOXICATED"}, "violation_code": {"string": "11923"}, "case_individual_id": {"int": 17475366}, "flag": {"string": "I"}, "last_modified_ts": 1600945380000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771650, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771650}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 3, "year": {"int": 2016}, "violation_desc": {"string": "AGGRAVATED UNLIC OPER 2ND/PREV CONV"}, "violation_code": {"string": "5112A1"}, "case_individual_id": {"int": 17475367}, "flag": {"string": "U"}, "last_modified_ts": 1600959600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771650, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771650}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 4, "year": {"int": 2019}, "violation_desc": {"string": "AGGRAVATED UNLIC OPER 2ND/PREV"}, "violation_code": {"string": "5112A2"}, "case_individual_id": {"int": 17475368}, "flag": {"string": "I"}, "last_modified_ts": 1569337200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771650, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771650}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 5, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475569}, "flag": {"string": "I"}, "last_modified_ts": 1601374200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771651, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771651}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 7, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475571}, "flag": {"string": "I"}, "last_modified_ts": 1601375400000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771651, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771651}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 2, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "2180F"}, "case_individual_id": {"int": 17475569}, "flag": {"string": "U"}, "last_modified_ts": 1601377200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771651, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771651}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 6, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "2180F"}, "case_individual_id": {"int": 17475570}, "flag": {"string": "I"}, "last_modified_ts": 1601374800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771651, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771651}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 8, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475572}, "flag": {"string": "I"}, "last_modified_ts": 1601376600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771651, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771651}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 9, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475573}, "flag": {"string": "I"}, "last_modified_ts": 1601377200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771652, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771652}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 10, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180D"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601377800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771652, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771652}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 11, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180D"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601381400000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771652, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771652}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 12, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601385000000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771652, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771652}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 13, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601388600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771652, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771652}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 34, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601392200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531771652, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8853671}, "lsn": {"long": 135217816}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601531771653}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 35, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601395800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601531913118, "snapshot": {"string": "false"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8854199}, "lsn": {"long": 135339584}, "xmin": null}, "op": "c", "ts_ms": {"long": 1601531913459}, "transaction": {"ConnectDefault": {"id": "8854199", "total_order": 1, "data_collection_order": 1}}}
{"before": null, "after": {"Value": {"inc_id": 1, "year": {"int": 2016}, "violation_desc": {"string": "DRIVING WHILE INTOXICATED"}, "violation_code": {"string": "11923"}, "case_individual_id": {"int": 17475366}, "flag": {"string": "I"}, "last_modified_ts": 1600945380000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941011, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941011}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 3, "year": {"int": 2016}, "violation_desc": {"string": "AGGRAVATED UNLIC OPER 2ND/PREV CONV"}, "violation_code": {"string": "5112A1"}, "case_individual_id": {"int": 17475367}, "flag": {"string": "U"}, "last_modified_ts": 1600959600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941011, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941011}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 4, "year": {"int": 2019}, "violation_desc": {"string": "AGGRAVATED UNLIC OPER 2ND/PREV"}, "violation_code": {"string": "5112A2"}, "case_individual_id": {"int": 17475368}, "flag": {"string": "I"}, "last_modified_ts": 1569337200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941011, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941012}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 5, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475569}, "flag": {"string": "I"}, "last_modified_ts": 1601374200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941012, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941012}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 7, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475571}, "flag": {"string": "I"}, "last_modified_ts": 1601375400000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941012, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941012}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 2, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "2180F"}, "case_individual_id": {"int": 17475569}, "flag": {"string": "U"}, "last_modified_ts": 1601377200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941012, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941012}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 6, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "2180F"}, "case_individual_id": {"int": 17475570}, "flag": {"string": "I"}, "last_modified_ts": 1601374800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941012, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941012}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 8, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475572}, "flag": {"string": "I"}, "last_modified_ts": 1601376600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941012, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941012}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 9, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475573}, "flag": {"string": "I"}, "last_modified_ts": 1601377200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941013, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941013}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 10, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180D"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601377800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941013, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941013}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 11, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180D"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601381400000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941013, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941013}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 12, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601385000000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941013, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941013}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 13, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601388600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941013, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941013}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 34, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601392200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941013, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941013}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 35, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601395800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601532941013, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858001}, "lsn": {"long": 136229808}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601532941013}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 36, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601398800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601533208794, "snapshot": {"string": "false"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 8858996}, "lsn": {"long": 136463936}, "xmin": null}, "op": "c", "ts_ms": {"long": 1601533208915}, "transaction": {"ConnectDefault": {"id": "8858996", "total_order": 1, "data_collection_order": 1}}}
{"before": null, "after": {"Value": {"inc_id": 1, "year": {"int": 2016}, "violation_desc": {"string": "DRIVING WHILE INTOXICATED"}, "violation_code": {"string": "11923"}, "case_individual_id": {"int": 17475366}, "flag": {"string": "I"}, "last_modified_ts": 1600945380000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 3, "year": {"int": 2016}, "violation_desc": {"string": "AGGRAVATED UNLIC OPER 2ND/PREV CONV"}, "violation_code": {"string": "5112A1"}, "case_individual_id": {"int": 17475367}, "flag": {"string": "U"}, "last_modified_ts": 1600959600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 4, "year": {"int": 2019}, "violation_desc": {"string": "AGGRAVATED UNLIC OPER 2ND/PREV"}, "violation_code": {"string": "5112A2"}, "case_individual_id": {"int": 17475368}, "flag": {"string": "I"}, "last_modified_ts": 1569337200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 5, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475569}, "flag": {"string": "I"}, "last_modified_ts": 1601374200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 7, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475571}, "flag": {"string": "I"}, "last_modified_ts": 1601375400000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 2, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "2180F"}, "case_individual_id": {"int": 17475569}, "flag": {"string": "U"}, "last_modified_ts": 1601377200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 6, "year": {"int": 2018}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "2180F"}, "case_individual_id": {"int": 17475570}, "flag": {"string": "I"}, "last_modified_ts": 1601374800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 8, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475572}, "flag": {"string": "I"}, "last_modified_ts": 1601376600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 9, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475573}, "flag": {"string": "I"}, "last_modified_ts": 1601377200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963857, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963857}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 10, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180D"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601377800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963858, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963858}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 11, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180D"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601381400000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963858, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963858}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 12, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601385000000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963858, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963858}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 13, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601388600000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963858, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963858}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 34, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601392200000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963858, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963858}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 35, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601395800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963858, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963858}, "transaction": null}
{"before": null, "after": {"Value": {"inc_id": 36, "year": {"int": 2020}, "violation_desc": {"string": "UNREASONABLE SPEED/SPECIAL HAZARDS"}, "violation_code": {"string": "1180E"}, "case_individual_id": {"int": 17475574}, "flag": {"string": "I"}, "last_modified_ts": 1601398800000000}}, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1601985963858, "snapshot": {"string": "true"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 10537240}, "lsn": {"long": 529139832}, "xmin": null}, "op": "r", "ts_ms": {"long": 1601985963858}, "transaction": null}

@bvaradar
Copy link
Contributor

bvaradar commented Oct 9, 2020

Let me try with the sample data you provided and get back over the weekend.

@bvaradar
Copy link
Contributor

@ashishmgofficial : It looks like the json data and the avro schema are not matching correctly. When I read the file through spark directly (please see below), I am getting a different schema than the one you provided. This is because Debezium is configured to write in "JSON_SCHEMA" mode, which I think is the default. This mode inlines both data and schema in every record and is inefficient in space.

Since you are actually managing avro schemas, can you configure Debezium to write avro records directly rather than json? In my experiments (with a custom schema), I saw an 8x speedup in Debezium by changing the format from json_schema to avro. If you still want to write as json, disable the inline schema by setting the below debezium configs to false:
key.converter.schemas.enable
value.converter.schemas.enable
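
For reference, here is a hedged sketch of the relevant Kafka Connect converter settings (hosts follow the redaction used elsewhere in this thread; values are illustrative, not your exact config):

# avro output:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://xxxxx:8081
value.converter.schema.registry.url=http://xxxxx:8081
# or keep json output but drop the inlined schema:
# key.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
# key.converter.schemas.enable=false
# value.converter.schemas.enable=false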

==========

scala> val df = spark.read.json("file:///var/hoodie/ws/docker/inp.json")
df: org.apache.spark.sql.DataFrame = [after: struct<Value: struct<case_individual_id: struct<int: bigint>, flag: struct<string: string> ... 5 more fields>>, before: string ... 4 more fields]

scala> df.printSchema()
root
|-- after: struct (nullable = true)
| |-- Value: struct (nullable = true)
| | |-- case_individual_id: struct (nullable = true)
| | | |-- int: long (nullable = true)
| | |-- flag: struct (nullable = true)
| | | |-- string: string (nullable = true)
| | |-- inc_id: long (nullable = true)
| | |-- last_modified_ts: long (nullable = true)
| | |-- violation_code: struct (nullable = true)
| | | |-- string: string (nullable = true)
| | |-- violation_desc: struct (nullable = true)
| | | |-- string: string (nullable = true)
| | |-- year: struct (nullable = true)
| | | |-- int: long (nullable = true)
|-- before: string (nullable = true)
|-- op: string (nullable = true)
|-- source: struct (nullable = true)
| |-- connector: string (nullable = true)
| |-- db: string (nullable = true)
| |-- lsn: struct (nullable = true)
| | |-- long: long (nullable = true)
| |-- name: string (nullable = true)
| |-- schema: string (nullable = true)
| |-- snapshot: struct (nullable = true)
| | |-- string: string (nullable = true)
| |-- table: string (nullable = true)
| |-- ts_ms: long (nullable = true)
| |-- txId: struct (nullable = true)
| | |-- long: long (nullable = true)
| |-- version: string (nullable = true)
| |-- xmin: string (nullable = true)
|-- transaction: string (nullable = true)
|-- ts_ms: struct (nullable = true)
| |-- long: long (nullable = true)
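
As an illustration of the mismatch, here is a minimal spark-shell sketch (assuming, hypothetically, that the kafkacat dump sits at /tmp/inp.json): each avro union such as ["null","int"] surfaces as a one-field struct in the JSON dump, so the wrapped member has to be selected explicitly per column before the data resembles the registry schema.

// in spark-shell, `spark` is already available
import spark.implicits._

val df = spark.read.json("file:///tmp/inp.json")
// unwrap the union-typed fields (year.int, violation_desc.string, ...) one by one
val flat = df.filter($"op" =!= "d").select(
  $"after.Value.inc_id".as("inc_id"),
  $"after.Value.year.int".as("year"),
  $"after.Value.violation_desc.string".as("violation_desc"),
  $"after.Value.violation_code.string".as("violation_code"),
  $"after.Value.case_individual_id.int".as("case_individual_id"),
  $"after.Value.flag.string".as("flag"),
  $"after.Value.last_modified_ts".as("last_modified_ts"),
  $"ts_ms.long".as("_ts_ms"),
  $"op".as("_op"))
flat.printSchema()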

@ashishmgofficial
Copy link
Author

@bvaradar The json I had provided is the output of the kafkacat utility, which outputs as json. In our process we have Key = String and Value = AVRO for Kafka. The different schema comes from the inline data types in kafkacat's json output, which spark reads as-is.
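
For reference, a hedged sketch of the kind of kafkacat invocation that produces such a dump (assuming a kafkacat build with Avro/Schema Registry support; hosts redacted as elsewhere in this thread):

kafkacat -b xxxxx:29092 -t airflow.public.motor_crash_violation_incidents -C -o beginning -e -s value=avro -r http://xxxxx:8081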

@bvaradar
Copy link
Contributor

@ashishmgofficial : Would it be possible to dump the avro records (value) as-is to a file and attach it?

@ashishmgofficial
Copy link
Author

@bvaradar PFA the files below:
Downloads.zip

@bvaradar
Copy link
Contributor

@ashishmgofficial : It took a while to debug this. Basically, the problem is in how spark deduces the avro schema from Row (code in spark-avro). This is incompatible with the schema passed through the schema registry. Here is a gist which avoids the problem with a workaround:

https://gist.github.com/bvaradar/f2dbb50f7c7a82178c04d41603269306

Please try this and see if you are able to ingest successfully...

@ashishmgofficial
Copy link
Author

@bvaradar I'm getting the following error when applying the patch:

error: corrupt patch at line 252

This is my first time applying a git patch, so I might be doing something silly:

git apply file.patch

@bvaradar
Copy link
Contributor

@ashishmgofficial : I found a simpler way to work around this and updated the gist: https://gist.github.com/bvaradar/f2dbb50f7c7a82178c04d41603269306

Can you refresh the above link?

You should download it to a local file and apply it with:

patch -p1 < <file_path>

@ashishmgofficial
Copy link
Author

@bvaradar Thanks!!! It seems to ingest properly now. I will test all the scenarios (deletes, etc.) and let you know. Thanks for such amazing support!

@ashishmgofficial
Copy link
Author

ashishmgofficial commented Oct 14, 2020

@bvaradar The patch worked successfully for inserts and upserts, but not for deletes.
I think the issue comes from the way Debezium writes delete changes to Kafka.

Attaching the executor logs and the kafkacat output for reference:

logs.txt

kafkacat.json.txt

I had issued a delete for the record with inc_id = 3.

@ashishmgofficial
Copy link
Author

@bvaradar I changed the Postgres configuration, and now the Debezium delete action no longer produces a null value in "before":

{"before": {"Value": {"inc_id": 3, "year": {"int": 2016}, "violation_desc": {"string": "AGGRAVATED UNLIC OPER 2ND/PREV CONV"}, "violation_code": {"string": "5112A1"}, "case_individual_id": {"int": 17475367}, "flag": {"string": "U"}, "last_modified_ts": 1600959600000000}}, "after": null, "source": {"version": "1.2.2.Final", "connector": "postgresql", "name": "airflow", "ts_ms": 1602690384912, "snapshot": {"string": "false"}, "db": "airflow", "schema": "public", "table": "motor_crash_violation_incidents", "txId": {"long": 13145245}, "lsn": {"long": 1138834144}, "xmin": null}, "op": "d", "ts_ms": {"long": 1602690385412}, "transaction": {"ConnectDefault": {"id": "13145245", "total_order": 1, "data_collection_order": 1}}}

But Hudi still raises the earlier error.

@bvaradar
Copy link
Contributor

This is clearly the error: "Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:"
Looking at the code, it's not very clear how this can happen. Just to be paranoid: when you tested the second time, did you make sure the kafka topic did not still contain an old record with a null value?

It would be helpful if you could share the dataset in avro format so I can try reproducing it.

@ashishmgofficial
Copy link
Author

ashishmgofficial commented Oct 15, 2020

@bvaradar I thought that at first. To rule it out, I retried the scenario multiple times, and I'm getting the same error every time, only during deletes.

airflow.public.motor_crash_violation_incidents+0+0000000000 (1).avro.zip

The following is the table I get when I read the above avro file:

+--------------------+--------------------+--------------------+---+-------------+----------------+
|              before|               after|              source| op|        ts_ms|     transaction|
+--------------------+--------------------+--------------------+---+-------------+----------------+
|                null|[1, 2016, DRIVING...|[1.2.2.Final, pos...|  c|1602791105042|[13519353, 1, 1]|
|                null|[3, 2016, AGGRAVA...|[1.2.2.Final, pos...|  c|1602791105042|[13519354, 1, 1]|
|                null|[4, 2019, AGGRAVA...|[1.2.2.Final, pos...|  c|1602791105043|[13519355, 1, 1]|
|                null|[2, 2020, UNREASO...|[1.2.2.Final, pos...|  c|1602791105043|[13519356, 1, 1]|
|                null|[9, 2020, UNREASO...|[1.2.2.Final, pos...|  c|1602791105043|[13519357, 1, 1]|
|                null|[10, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105043|[13519358, 1, 1]|
|                null|[11, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105044|[13519359, 1, 1]|
|                null|[12, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105044|[13519360, 1, 1]|
|                null|[13, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105044|[13519361, 1, 1]|
|                null|[34, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105044|[13519362, 1, 1]|
|                null|[35, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105045|[13519363, 1, 1]|
|                null|[36, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105045|[13519364, 1, 1]|
|                null|[37, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105045|[13519365, 1, 1]|
|                null|[38, 2020, UNREAS...|[1.2.2.Final, pos...|  c|1602791105045|[13519366, 1, 1]|
|[3, 2016, AGGRAVA...|                null|[1.2.2.Final, pos...|  d|1602791141537|[13519503, 1, 1]|
+--------------------+--------------------+--------------------+---+-------------+----------------+
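
For reference, a minimal snippet along these lines reproduces a table like the one above (assuming, hypothetically, the attached file is extracted to /tmp/data/incidents.avro and spark-avro is on the classpath):

val avroDf = spark.read.format("avro").load("file:///tmp/data/incidents.avro")
avroDf.show()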

@bvaradar
Copy link
Contributor

@ashishmgofficial : With your provided avro file, I am able to ingest without any errors.

spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE   --table-type COPY_ON_WRITE   --source-class org.apache.hudi.utilities.sources.AvroDFSSource   --source-ordering-field _ts_ms    --target-base-path /tmp/hudi_data   --target-table stock_ticks_cow --props /Users/balaji.varadarajan/code/oss/upstream_hudi/docker/demo/config/cases_dfs_source.properties   --schemaprovider-class org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider   --transformer-class org.apache.hudi.utilities.transform.DebeziumTransformer   --hoodie-conf hoodie.compaction.payload.class=org.apache.hudi.common.model.DebeziumAvroPayload   --hoodie-conf hoodie.deltastreamer.source.dfs.root=file:///tmp/data/ 2>&1 | tee o.log

Logs

+-------------+---+------+----+-----------------------------------+--------------+------------------+----+----------------+
|_ts_ms       |_op|inc_id|year|violation_desc                     |violation_code|case_individual_id|flag|last_modified_ts|
+-------------+---+------+----+-----------------------------------+--------------+------------------+----+----------------+
|1602791105042|c  |1     |2016|DRIVING WHILE INTOXICATED          |11923         |17475366          |I   |1600945380000000|
|1602791105042|c  |3     |2016|AGGRAVATED UNLIC OPER 2ND/PREV CONV|5112A1        |17475367          |U   |1600959600000000|
|1602791105043|c  |4     |2019|AGGRAVATED UNLIC OPER 2ND/PREV     |5112A2        |17475368          |I   |1569337200000000|
|1602791105043|c  |2     |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |2180F         |17475569          |U   |1601377200000000|
|1602791105043|c  |9     |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475573          |I   |1601377200000000|
|1602791105043|c  |10    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180D         |17475574          |I   |1601377800000000|
|1602791105044|c  |11    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180D         |17475574          |I   |1601381400000000|
|1602791105044|c  |12    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601385000000000|
|1602791105044|c  |13    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601388600000000|
|1602791105044|c  |34    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601392200000000|
|1602791105045|c  |35    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601395800000000|
|1602791105045|c  |36    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601398800000000|
|1602791105045|c  |37    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601399400000000|
|1602791105045|c  |38    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180D         |17475574          |I   |1601402400000000|
|1602791141537|d  |3     |2016|AGGRAVATED UNLIC OPER 2ND/PREV CONV|5112A1        |17475367          |U   |1600959600000000|
+-------------+---+------+----+-----------------------------------+--------------+------------------+----+----------------+

....
20/10/15 21:56:44 INFO DeltaSync: Commit 20201015215641 successful!
......

I am able to read the newly added data successfully too:

scala> df.show(100, false)
+-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+-------------+---+------+----+-----------------------------------+--------------+------------------+----+----------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                    |_ts_ms       |_op|inc_id|year|violation_desc                     |violation_code|case_individual_id|flag|last_modified_ts|
+-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+-------------+---+------+----+-----------------------------------+--------------+------------------+----+----------------+
|20201015215641     |20201015215641_0_1  |34                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105044|c  |34    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601392200000000|
|20201015215641     |20201015215641_0_2  |10                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105043|c  |10    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180D         |17475574          |I   |1601377800000000|
|20201015215641     |20201015215641_0_3  |1                 |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105042|c  |1     |2016|DRIVING WHILE INTOXICATED          |11923         |17475366          |I   |1600945380000000|
|20201015215641     |20201015215641_0_4  |3                 |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791141537|d  |3     |2016|AGGRAVATED UNLIC OPER 2ND/PREV CONV|5112A1        |17475367          |U   |1600959600000000|
|20201015215641     |20201015215641_0_5  |36                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105045|c  |36    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601398800000000|
|20201015215641     |20201015215641_0_6  |12                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105044|c  |12    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601385000000000|
|20201015215641     |20201015215641_0_7  |9                 |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105043|c  |9     |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475573          |I   |1601377200000000|
|20201015215641     |20201015215641_0_8  |38                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105045|c  |38    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180D         |17475574          |I   |1601402400000000|
|20201015215641     |20201015215641_0_9  |2                 |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105043|c  |2     |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |2180F         |17475569          |U   |1601377200000000|
|20201015215641     |20201015215641_0_10 |4                 |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105043|c  |4     |2019|AGGRAVATED UNLIC OPER 2ND/PREV     |5112A2        |17475368          |I   |1569337200000000|
|20201015215641     |20201015215641_0_11 |13                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105044|c  |13    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601388600000000|
|20201015215641     |20201015215641_0_12 |11                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105044|c  |11    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180D         |17475574          |I   |1601381400000000|
|20201015215641     |20201015215641_0_13 |37                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105045|c  |37    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601399400000000|
|20201015215641     |20201015215641_0_14 |35                |                      |738db628-641f-4d60-b7e5-c9c971a371f8-0_0-27-29_20201015215641.parquet|1602791105045|c  |35    |2020|UNREASONABLE SPEED/SPECIAL HAZARDS |1180E         |17475574          |I   |1601395800000000|
+-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+-------------+---+------+----+-----------------------------------+--------------+------------------+----+----------------+

Do you see anything I am missing here?

@bvaradar
Copy link
Contributor

BTW, it looks like both the create and the delete have the same last_modified_ts, which means that precombine would not have deleted the records. Is this fake data? If so, can you set the deletion timestamp to be higher?
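
To see why equal timestamps matter, here is a hedged sketch of precombine semantics (a simplified model, not Hudi's actual code): for two records with the same key, the one with the greater ordering value survives, so a delete carrying the same ordering value as the insert it should cancel can lose the tie.

// Simplified model: keep the record with the larger ordering value.
case class Rec(key: Int, orderingVal: Long, op: String)
def precombine(a: Rec, b: Rec): Rec = if (a.orderingVal >= b.orderingVal) a else b

val insert = Rec(3, 1600959600000000L, "c")
val delete = Rec(3, 1600959600000000L, "d")
// With identical ordering values the tie-break is arbitrary; here the insert
// wins the tie and the delete is silently lost.
println(precombine(insert, delete))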

@ashishmgofficial
Copy link
Author

ashishmgofficial commented Oct 16, 2020

@bvaradar Isn't the source-ordering field _ts_ms? Then precombine should be looking at _ts_ms for deletion, right?

Deletes worked fine for me once I replaced --hoodie-conf hoodie.compaction.payload.class=org.apache.hudi.common.model.DebeziumAvroPayload with --payload-class org.apache.hudi.common.model.DebeziumAvroPayload

I checked the same scenario using the spark-submit you had provided with AvroDFSSource and I'm getting the same results, but when I run it against AvroKafkaSource the earlier error comes up:

+-------------------+--------------------+------------------+----------------------+--------------------+-------------+---+------+----+--------------------+--------------+------------------+----+----------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|       _ts_ms|_op|inc_id|year|      violation_desc|violation_code|case_individual_id|flag|last_modified_ts|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------+---+------+----+--------------------+--------------+------------------+----+----------------+
|     20201016125655|  20201016125655_0_1|                12|                      |b2699714-2916-4c4...|1602791105044|  c|    12|2020|UNREASONABLE SPEE...|         1180E|          17475574|   I|1601385000000000|
|     20201016125655|  20201016125655_0_2|                36|                      |b2699714-2916-4c4...|1602791105045|  c|    36|2020|UNREASONABLE SPEE...|         1180E|          17475574|   I|1601398800000000|
|     20201016125655|  20201016125655_0_3|                11|                      |b2699714-2916-4c4...|1602791105044|  c|    11|2020|UNREASONABLE SPEE...|         1180D|          17475574|   I|1601381400000000|
|     20201016125655|  20201016125655_0_4|                37|                      |b2699714-2916-4c4...|1602791105045|  c|    37|2020|UNREASONABLE SPEE...|         1180E|          17475574|   I|1601399400000000|
|     20201016125655|  20201016125655_0_5|                13|                      |b2699714-2916-4c4...|1602791105044|  c|    13|2020|UNREASONABLE SPEE...|         1180E|          17475574|   I|1601388600000000|
|     20201016125655|  20201016125655_0_6|                 4|                      |b2699714-2916-4c4...|1602791105043|  c|     4|2019|AGGRAVATED UNLIC ...|        5112A2|          17475368|   I|1569337200000000|
|     20201016125655|  20201016125655_0_7|                10|                      |b2699714-2916-4c4...|1602791105043|  c|    10|2020|UNREASONABLE SPEE...|         1180D|          17475574|   I|1601377800000000|
|     20201016125655|  20201016125655_0_8|                34|                      |b2699714-2916-4c4...|1602791105044|  c|    34|2020|UNREASONABLE SPEE...|         1180E|          17475574|   I|1601392200000000|
|     20201016125655|  20201016125655_0_9|                35|                      |b2699714-2916-4c4...|1602791105045|  c|    35|2020|UNREASONABLE SPEE...|         1180E|          17475574|   I|1601395800000000|
|     20201016125655| 20201016125655_0_10|                38|                      |b2699714-2916-4c4...|1602791105045|  c|    38|2020|UNREASONABLE SPEE...|         1180D|          17475574|   I|1601402400000000|
|     20201016125655| 20201016125655_0_11|                 1|                      |b2699714-2916-4c4...|1602791105042|  c|     1|2016|DRIVING WHILE INT...|         11923|          17475366|   I|1600945380000000|
|     20201016125655| 20201016125655_0_12|                 2|                      |b2699714-2916-4c4...|1602791105043|  c|     2|2020|UNREASONABLE SPEE...|         2180F|          17475569|   U|1601377200000000|
|     20201016125655| 20201016125655_0_13|                 9|                      |b2699714-2916-4c4...|1602791105043|  c|     9|2020|UNREASONABLE SPEE...|         1180E|          17475573|   I|1601377200000000|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------+---+------+----+--------------------+--------------+------------------+----+----------------+

@ashishmgofficial
Copy link
Author

ashishmgofficial commented Oct 16, 2020

I followed these steps:

- Took a fresh clone of the release-0.6.0 branch
- Applied the patch provided
- Built the jar and used it to run the commands below

AvroKafkaSource :

spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer s3://xxxx/hudi/jars/hudi-utilities-bundle_2.11-0.6.0.jar   --table-type COPY_ON_WRITE   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource   --source-ordering-field _ts_ms    --target-base-path s3://xxxx/warehouse/hudi_dms_acc_kafka   --target-table hudi_dms_acc_kafka --props s3://xxxx/hudi/conf/hudi-kafka.properties   --schemaprovider-class org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider   --transformer-class org.apache.hudi.utilities.transform.DebeziumTransformer   --payload-class org.apache.hudi.common.model.DebeziumAvroPayload 2>&1 | tee o.log

o.log

AvroDFSSource :

spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer s3://xxxx/hudi/jars/hudi-utilities-bundle_2.11-0.6.0.jar   --table-type COPY_ON_WRITE   --source-class org.apache.hudi.utilities.sources.AvroDFSSource   --source-ordering-field _ts_ms    --target-base-path s3://xxxx/warehouse/hudi_dms_acc_kafka   --target-table hudi_dms_acc_kafka --props s3://xxxx/hudi/conf/hudi-conf.properties   --schemaprovider-class org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider   --transformer-class org.apache.hudi.utilities.transform.DebeziumTransformer   --payload-class org.apache.hudi.common.model.DebeziumAvroPayload 2>&1 | tee p.log

p.log

@bvaradar
Copy link
Contributor

@ashishmgofficial : If I need to test with Kafka, I would need a way to generate both the key and value payloads. Do you have some script to publish records to Kafka? BTW, yeah, you are right about the _ts_ms ordering field.

@ashishmgofficial
Copy link
Author

@bvaradar We are using the Debezium Postgres connector with Confluent Kafka.

@ashishmgofficial
Copy link
Author

@bvaradar I can provide all the SQL statements in Postgres that I'm using to reproduce this, though:

DROP TABLE public.motor_crash_violation_incidents;

CREATE TABLE public.motor_crash_violation_incidents (
	inc_id serial ,
	"year" int4 NULL,
	violation_desc varchar(100) NULL,
	violation_code varchar(20) NULL,
	case_individual_id int4 NULL,
	flag varchar(1) NULL,
	last_modified_ts timestamp not NULL,
	CONSTRAINT motor_crash_violation_incidents_pkey PRIMARY KEY (inc_id)
);

ALTER TABLE public.motor_crash_violation_incidents REPLICA IDENTITY FULL;

Insert records :

INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(1, 2016, 'DRIVING WHILE INTOXICATED', '11923', 17475366, 'I', '2020-09-24 11:03:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(3, 2016, 'AGGRAVATED UNLIC OPER 2ND/PREV CONV', '5112A1', 17475367, 'U', '2020-09-24 15:00:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(4, 2019, 'AGGRAVATED UNLIC OPER 2ND/PREV', '5112A2', 17475368, 'I', '2019-09-24 15:00:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(2, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '2180F', 17475569, 'U', '2020-09-29 11:00:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(9, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180E', 17475573, 'I', '2020-09-29 11:00:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(10, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180D', 17475574, 'I', '2020-09-29 11:10:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(11, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180D', 17475574, 'I', '2020-09-29 12:10:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(12, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180E', 17475574, 'I', '2020-09-29 13:10:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(13, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180E', 17475574, 'I', '2020-09-29 14:10:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(34, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180E', 17475574, 'I', '2020-09-29 15:10:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(35, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180E', 17475574, 'I', '2020-09-29 16:10:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(36, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180E', 17475574, 'I', '2020-09-29 17:00:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(37, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180E', 17475574, 'I', '2020-09-29 17:10:00.000');
commit;
INSERT INTO public.motor_crash_violation_incidents
(inc_id, "year", violation_desc, violation_code, case_individual_id, flag, last_modified_ts)
VALUES(38, 2020, 'UNREASONABLE SPEED/SPECIAL HAZARDS', '1180D', 17475574, 'I', '2020-09-29 18:00:00.000');
commit;

Issue Delete :

DELETE FROM public.motor_crash_violation_incidents
WHERE inc_id=3;

These changes are automatically picked up by Confluent Kafka's Debezium Postgres connector and written to the topic.

@ashishmgofficial
Copy link
Author

ashishmgofficial commented Oct 19, 2020

Not sure if this is going to be of any help, but attaching the latest logs. I can see these messages towards the end:

at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at org.apache.hudi.AvroConversionUtils$$anonfun$2.apply(AvroConversionUtils.scala:46)
at org.apache.hudi.AvroConversionUtils$$anonfun$2.apply(AvroConversionUtils.scala:45)

o.log

@bvaradar
Copy link
Contributor

@ashishmgofficial : This turned out to be unrelated to Hudi. I tested with the local Debezium setup. Debezium writes 2 kafka records for each delete, with one of the records having its value set to "null". You can inspect the kafka topic using kafka-avro-console-consumer. This "null" record is causing the spark row encoding to fail.

root@schemaregistry:/# kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic debezium.public.motor_crash_violation_incidents --offset 'earliest' --partition 0 --property schema.registry.url=http://localhost:8085 --property print.key=true

{"inc_id":3}	{"before":{"debezium.public.motor_crash_violation_incidents.Value":{"inc_id":3,"year":{"int":2016},"violation_desc":{"string":"AGGRAVATED UNLIC OPER 2ND/PREV CONV"},"violation_code":{"string":"5112A1"},"case_individual_id":{"int":17475367},"flag":{"string":"U"},"last_modified_ts":1600959600000000}},"after":null,"source":{"version":"1.3.0.Final","connector":"postgresql","name":"debezium","ts_ms":1603179674623,"snapshot":{"string":"false"},"db":"debezium","schema":"public","table":"motor_crash_violation_incidents","txId":{"long":638},"lsn":{"long":34213136},"xmin":null},"op":"d","ts_ms":{"long":1603179675108},"transaction":null}
{"inc_id":3}	null

@ashishmgofficial
Copy link
Author

ashishmgofficial commented Oct 20, 2020

@bvaradar Yes, I think that's the tombstone event. You can disable it with a config, I believe:

tombstones.on.delete = false
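
For context, a hedged sketch of where this setting sits alongside the rest of the connector registration (the connector name is illustrative, not from this setup):

name=airflow-postgres-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tombstones.on.delete=false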

@bvaradar
Copy link
Contributor

@ashishmgofficial : Let us know whether things work fine end to end after you update the Debezium setting.

@ashishmgofficial
Copy link
Author

@bvaradar You are correct. It worked fine once the config was added. For some reason, kafkacat was not showing the tombstone record.

@bvaradar bvaradar closed this as completed Nov 9, 2020
@toninis
Copy link

toninis commented Jan 18, 2021

@bvaradar @ashishmgofficial I used the patch mentioned in #2149 (comment) and the instructions from #2149 (comment), but I got:

Caused by: java.lang.NoSuchMethodException: org.apache.hudi.common.model.DebeziumAvroPayload.<init>(org.apache.avro.generic.GenericRecord, java.lang.Comparable)
  at java.lang.Class.getConstructor0(Class.java:3082)
  at java.lang.Class.getConstructor(Class.java:1825)
  at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
... 28 more

I suppose something is wrong with my build?

Either way, is there a timeline for if and when the community will integrate a DebeziumAvroPayload into Hudi?

Thanks, and sorry for commenting on a closed issue.

@vinothchandar
Copy link
Member

@toninis this is kind of weird, given that the snippet has the constructor; the class seems to be there in the build.
Do you have a branch where you have the code stashed? We can open a new issue or JIRA and work through this. cc @nsivabalan
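
One quick way to verify this from a scala shell with the built bundle jar on the classpath (a hedged debugging sketch, not a fix):

// List the constructors actually compiled into the jar; ReflectionUtils needs
// the two-argument (GenericRecord, Comparable) constructor to be present.
val clazz = Class.forName("org.apache.hudi.common.model.DebeziumAvroPayload")
clazz.getConstructors.foreach(println)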

@toninis
Copy link

toninis commented Feb 25, 2021

@vinothchandar
Sorry I took so long to respond. It had worked and compiled successfully; I probably had missed something at the time.

Thanks for your response at the time.
