[SUPPORT] - Partial Update : update few columns of a table #2637

Closed

Sugamber opened this issue Mar 5, 2021 · 20 comments
Sugamber commented Mar 5, 2021

We have one table with more than 300 columns, and we would like to update only a few fields.
I read the configuration docs, and they suggest that we have to use HoodieRecordPayload and provide our own merge logic.
I didn't see any example in the Hudi documentation.
Please help me with this.

Sugamber changed the title [SUPPORT] to [SUPPORT] - Update partial record on Mar 5, 2021
Sugamber changed the title [SUPPORT] - Update partial record to [SUPPORT] - Partial Update : update record with few columns of a table on Mar 5, 2021
Sugamber changed the title [SUPPORT] - Partial Update : update record with few columns of a table to [SUPPORT] - Partial Update : update few columns of a table on Mar 5, 2021
bvaradar (Contributor) commented Mar 9, 2021

@n3nash: Can you please take a look at this request when you get a chance?

n3nash (Contributor) commented Mar 11, 2021

@Sugamber The HoodieRecordPayload interface provides two APIs for performing custom merges:

  1. preCombine
  2. combineAndGetUpdateValue

Each of these APIs is a callback that hands you the other payload. For preCombine, that is another incoming payload encountered during the in-memory reduce-by-key (dedup) step; for combineAndGetUpdateValue, the payload provided in the callback is the latest one in storage.

See an example here -> https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java#L50

This payload simply takes the latest value; you can add your custom logic in the above two methods to achieve the desired merge behavior.
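
For illustration, here is a minimal sketch (not an existing Hudi class) of a partial-update payload built on top of OverwriteWithLatestAvroPayload, assuming Hudi 0.7.x APIs and assuming the incoming record is written with the full table schema, with the untouched columns left null. The class name PartialUpdateAvroPayload and the "non-null incoming values win" rule are only an example of where the custom merge logic would go:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

public class PartialUpdateAvroPayload extends OverwriteWithLatestAvroPayload {

    public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
    }

    public PartialUpdateAvroPayload(Option<GenericRecord> record) {
        super(record);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
        // Deserialize the incoming record with the schema Hudi passes in.
        Option<IndexedRecord> incomingOpt = getInsertValue(schema);
        if (!incomingOpt.isPresent()) {
            return Option.empty();
        }
        GenericRecord incoming = (GenericRecord) incomingOpt.get();
        GenericRecord existing = (GenericRecord) currentValue;
        // Start from the record already on storage and overwrite only the
        // fields the incremental feed actually supplied (non-null here).
        for (Schema.Field field : schema.getFields()) {
            Object incomingValue = incoming.get(field.name());
            if (incomingValue != null) {
                existing.put(field.name(), incomingValue);
            }
        }
        return Option.of(existing);
    }
}

Such a class would then be passed to the writer via hoodie.datasource.write.payload.class.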

Sugamber (Author):

@n3nash How can we pass the custom payload class name? I copied the same class as CustomRecordUpdate and tried to set it during save, but it throws a ClassNotFoundException.

writeDF.write.format("org.apache.hudi")
  .options(hudiConfMap)
  .option("hoodie.datasource.write.payload.class", classOf[CustomRecordUpdate].getName)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col1")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "col2")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
  .option(HoodieWriteConfig.TABLE_NAME, tableName.toString)
  .mode(SaveMode.Append)
  .save(tablePath.toString)

I tried all three Scala ways of getting the class name, but none of them worked:

classOf[CustomRecordUpdate].getCanonicalName
classOf[CustomRecordUpdate].getName
classOf[CustomRecordUpdate].getSimpleName

nsivabalan (Contributor):

@Sugamber: did you confirm that your class exists on the classpath when you run your Spark job? classOf[CustomRecordUpdate].getName should have worked.

Sugamber (Author):

@nsivabalan Yes,
I have added the jar file to both the driver and executor classpaths.

spark-submit \
  --jars /path/lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar,/path/hudi-support-jars/org.apache.avro_avro-1.8.2.jar,/path/hudi-support-jars/spark-avro_2.11-2.4.4.jar,/path/hudi-support-jars/hudi-spark-bundle_2.11-0.7.0.jar \
  --master yarn --deploy-mode cluster \
  --num-executors 2 --executor-cores 4 --executor-memory 8g --driver-memory=8g \
  --queue=default \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.driver.extraClassPath=org.apache.avro_avro-1.8.2.jar:spark-avro_2.11-2.4.4.jar:hudi-spark-bundle_2.11-0.7.0.jar:/path/lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar \
  --conf spark.executor.extraClassPath=org.apache.avro_avro-1.8.2.jar:spark-avro_2.11-2.4.4.jar:hudi-spark-bundle_2.11-0.7.0.jar:/path/lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar \
  --files /path/hive-site.xml,/path/resources/hudiConf.conf \
  --class com.app.workflows.RecordPartialUpdate \
  lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar

I'm able to find the class name in the jar using a Linux command:

find /path/orders-poc-1.0.41-SNAPSHOT-shaded.jar | xargs grep CustomRecordUpdate
Binary file /path/orders-poc-1.0.41-SNAPSHOT-shaded.jar matches

Sugamber (Author):

I was able to resolve the ClassNotFoundException.

nsivabalan (Contributor):

What was the issue/fix? Do you mind posting it here?

Sugamber (Author):

@nsivabalan, I had created a shaded jar and it was causing the issue, as a few dependency versions were conflicting.

Sugamber (Author) commented Mar 23, 2021

I have created a class implementing HoodieRecordPayload. There are three methods for which we have to write our own logic:

  1. preCombine
  2. combineAndGetUpdateValue
  3. getInsertValue

@n3nash As per your explanation above, preCombine provides the current record coming in from the incremental load, and combineAndGetUpdateValue provides the latest record from the Hudi table. Please correct me if my understanding is incorrect.

In my use case, I'm only getting a few columns out of 20 in the incremental data. I have built the schema in the constructor, since the preCombine method does not have any schema details.

For example: the Hudi table is built with 20 columns. Now the requirement is to update only 3 of them, and only those columns' data comes in the incremental data feed, along with the RECORDKEY_FIELD_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, and PRECOMBINE_FIELD_OPT_KEY columns.

I have implemented the class as below. Please let me know in which method I'll be getting the full schema of the table.
I also found that the getInsertValue method gets invoked even though it is not called from combineAndGetUpdateValue.
I would like to know more about these three methods and their flow.

Sugamber (Author):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.Logger;

public class PartialColumnUpdate implements HoodieRecordPayload<PartialColumnUpdate> {

    private static final Logger logger = Logger.getLogger(PartialColumnUpdate.class);

    private byte[] recordBytes;
    private Schema schema;
    private Comparable orderingVal;

    public PartialColumnUpdate(GenericRecord genericRecord, Comparable orderingVal) {
        logger.info("Inside two parameter cons");
        try {
            if (genericRecord != null) {
                this.recordBytes = HoodieAvroUtils.avroToBytes(genericRecord);
                this.schema = genericRecord.getSchema();
                this.orderingVal = orderingVal;
            } else {
                this.recordBytes = new byte[0];
            }
        } catch (Exception io) {
            throw new RuntimeException("Cannot convert record to bytes ", io);
        }
    }

    public PartialColumnUpdate(Option<GenericRecord> record) {
        this(record.isPresent() ? record.get() : null, 0);
    }

    @Override
    public PartialColumnUpdate preCombine(PartialColumnUpdate anotherRecord) {
        logger.info("Inside PreCombine");
        logger.info("preCombine => " + anotherRecord);
        logger.info("another_ordering value" + anotherRecord.orderingVal);
        logger.info("another_schema value" + anotherRecord.schema);
        logger.info("another_record bytes value" + anotherRecord.recordBytes);
        // Keep the record with the greater ordering (precombine) value.
        if (anotherRecord.orderingVal.compareTo(orderingVal) > 0) {
            return anotherRecord;
        } else {
            return this;
        }
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord indexedRecord, Schema currentSchema) throws IOException {
        logger.info("Inside combineAndGetUpdateValue");
        logger.info("current schema" + currentSchema);
        logger.info("combineUpdate - >" + Option.of(indexedRecord));
        getInsertValue(currentSchema);
        return Option.empty();
    }

    @Override
    public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
        logger.info("Inside getInsertValue");
        if (recordBytes.length == 0) {
            return Option.empty();
        }
        IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
        if (isDeleteRecord((GenericRecord) indexedRecord)) {
            return Option.empty();
        } else {
            return Option.of(indexedRecord);
        }
    }

    protected boolean isDeleteRecord(GenericRecord genericRecord) {
        final String isDeleteKey = "_hoodie_is_deleted";
        if (genericRecord.getSchema().getField(isDeleteKey) == null) {
            return false;
        }
        Object deleteMarker = genericRecord.get(isDeleteKey);
        return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
    }
}
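
To answer the schema question concretely, here is a hypothetical variant of the combineAndGetUpdateValue above (a sketch only, reusing the recordBytes, schema, and logger members of the class): the record handed in from storage carries the full table schema, while the schema saved in the constructor is the incremental one. Note that the record returned here must still be writable with the write schema Hudi uses for the commit, which is why the incremental feed generally needs to carry a schema compatible with the table schema.

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord indexedRecord, Schema currentSchema) throws IOException {
    GenericRecord stored = (GenericRecord) indexedRecord;
    // stored.getSchema() is the full schema of the record on storage (the table side);
    // this.schema, captured in the constructor, is the incremental (partial) schema;
    // currentSchema is the write schema Hudi passes in for this upsert.
    logger.info("table-side schema: " + stored.getSchema());
    logger.info("incoming payload schema: " + this.schema);
    if (recordBytes.length == 0) {
        // Nothing incoming for this key; keep the record already on storage.
        return Option.of(stored);
    }
    // Decode the incoming partial record with the schema it was serialized with.
    GenericRecord incoming = (GenericRecord) HoodieAvroUtils.bytesToAvro(recordBytes, this.schema);
    // Overwrite only the fields the incremental feed actually carries.
    for (Schema.Field field : this.schema.getFields()) {
        stored.put(field.name(), incoming.get(field.name()));
    }
    return Option.of(stored);
}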

Sugamber (Author) commented Mar 23, 2021

Can this use case be achieved using Hudi, given that the target schema and the incremental schema are not the same?

Sugamber (Author):

@n3nash Please confirm whether this use case can be achieved. If yes, please provide a few pointers.

Sugamber (Author) commented Mar 24, 2021

There is an open pull request for partial updates on CoW tables:
#1929

It looks like my use case is similar to that one.

Sugamber (Author):

@nsivabalan Do we have any timeline for these pull requests?
Pull request 1: #1929
Pull request 2: #2666

nsivabalan (Contributor) commented Mar 26, 2021

@Sugamber: we are currently busy with an upcoming release. Once it is completed, I will start reviewing this work item. And yes, the linked PRs are similar to your ask. I guess there are a few other folks who are interested in this. We can target it for the next release.

n3nash (Contributor) commented Mar 30, 2021

@Sugamber Your code looks correct. Here is the flow:

  1. InputDF -> DF -> DF<HoodieRecord(PartialUpdatePayload(bytes))>
  2. In-batch dedup: records with the same record key are combined -> preCombine(..) -> getInsertValue(incremental_schema)
  3. Perform the upsert
  4. combineAndGetUpdateValue(record_from_disk, incremental_schema) -> getInsertValue(incremental_schema)

Now, if your target schema (the schema of record_from_disk) is different from the incremental_schema, that is not a problem as long as target_schema and incremental_schema are backwards compatible.

At a high level, the incremental_schema should always be a superset (all existing fields + new fields) of the target schema.
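
For a quick sanity check of that compatibility requirement, here is a small self-contained example using Avro's built-in SchemaCompatibility checker (the two inline schemas are made up for illustration, not taken from this issue):

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class SchemaCompatCheck {
    public static void main(String[] args) {
        // Target (table) schema with two fields.
        Schema target = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
                + "{\"name\":\"col1\",\"type\":\"string\"},"
                + "{\"name\":\"col3\",\"type\":\"long\"}]}");
        // Incremental schema: same fields plus one new nullable field with a default.
        Schema incremental = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
                + "{\"name\":\"col1\",\"type\":\"string\"},"
                + "{\"name\":\"col3\",\"type\":\"long\"},"
                + "{\"name\":\"new_col\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Readers on the existing table schema must be able to read data
        // written with the incremental schema (backwards compatibility).
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(target, incremental);
        System.out.println(result.getType()); // COMPATIBLE
    }
}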

Sugamber (Author):

Thank you!!!

n3nash (Contributor) commented Apr 2, 2021

@Sugamber If your issue is addressed, please close this issue.

n3nash closed this as completed Apr 2, 2021
n3nash reopened this Apr 2, 2021
Sugamber (Author) commented Apr 4, 2021

@n3nash I'm waiting for the pull request below to be merged. Please let me know if we can close this once the pull request is merged.
#2666

nsivabalan (Contributor):

We can close out this issue, as we have a tracking JIRA and a PR being actively reviewed.
