Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,16 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
*/
public abstract O delete(K keys, final String instantTime);

/**
 * Deletes records from a Hoodie table based on the {@link HoodieKey} and {@link HoodieRecordLocation}
 * already set on the input records ("prepped" records: each record is empty except for its key and
 * its target file location, so no index lookup is required to find the records to delete).
 *
 * @param preppedRecords Empty records with key and locator set.
 * @param instantTime    Commit time handle.
 * @return Collection of WriteStatus to inspect errors and counts.
 */
public abstract O deletePrepped(I preppedRecords, final String instantTime);

/**
* Common method containing steps to be performed before write (upsert/insert/...
* @param instantTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
*/
public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys);

/**
 * Deletes records from the Hoodie table whose {@link HoodieKey} and record location are already set
 * on the input records ("prepped" records), at the supplied instantTime.
 *
 * @param context        {@link HoodieEngineContext}.
 * @param instantTime    Instant time for the action.
 * @param preppedRecords Hoodie records to be deleted, already tagged with key and location.
 * @return {@link HoodieWriteMetadata}
 */
public abstract HoodieWriteMetadata<O> deletePrepped(HoodieEngineContext context, String instantTime, I preppedRecords);

/**
* Deletes all data of partitions.
* @param context HoodieEngineContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
return postWrite(result, instantTime, table);
}

/**
 * Prepped-record delete is not implemented for this write client yet.
 *
 * @param preppedRecords Records carrying only the key and the record location.
 * @param instantTime    Commit time handle.
 * @throws HoodieNotSupportedException always — no supported code path reaches past the throw.
 */
@Override
public List<WriteStatus> deletePrepped(List<HoodieRecord<T>> preppedRecords, final String instantTime) {
  throw new HoodieNotSupportedException("DeletePrepped operation is not supported yet");
}

public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
throw new HoodieNotSupportedException("This method should not be invoked");
}

/**
 * Prepped-record delete is not implemented for this table type yet; callers must not invoke it.
 *
 * @throws HoodieNotSupportedException always.
 */
@Override
public HoodieWriteMetadata<List<WriteStatus>> deletePrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
  throw new HoodieNotSupportedException("This method should not be invoked");
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
return new FlinkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ public List<WriteStatus> delete(List<HoodieKey> keys,
return postWrite(result, instantTime, table);
}

/**
 * Deletes records whose {@link HoodieKey} and record location are already set ("prepped"),
 * at the supplied instantTime.
 *
 * @param preppedRecords Records carrying only the key and the record location.
 * @param instantTime    Commit time handle.
 * @return List of WriteStatus to inspect errors and counts.
 */
@Override
public List<WriteStatus> deletePrepped(List<HoodieRecord<T>> preppedRecords, final String instantTime) {
  // NOTE(review): the operation type is recorded as plain DELETE rather than a dedicated
  // prepped variant — confirm this is how commit metadata should classify prepped deletes.
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
  preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
  HoodieWriteMetadata<List<WriteStatus>> result = table.deletePrepped(context, instantTime, preppedRecords);
  return postWrite(result, instantTime, table);
}

@Override
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaDeletePreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor;
Expand Down Expand Up @@ -120,6 +121,12 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
return new JavaDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute();
}

/**
 * Runs the prepped-delete commit action for this table: the input records already carry
 * their key and location, and are handed directly to the commit action executor.
 */
@Override
public HoodieWriteMetadata<List<WriteStatus>> deletePrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
  HoodieJavaEngineContext javaContext = (HoodieJavaEngineContext) context;
  JavaDeletePreppedCommitActionExecutor<T> executor =
      new JavaDeletePreppedCommitActionExecutor<>(javaContext, config, this, instantTime, preppedRecords);
  return executor.execute();
}

@Override
public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
throw new HoodieNotSupportedException("Delete partitions is not supported yet");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.util.List;

/**
 * Commit action executor that deletes a batch of already-prepped records (records carrying
 * their key and location) on the Java engine, delegating to the shared commit flow.
 */
public class JavaDeletePreppedCommitActionExecutor<T>
    extends BaseJavaCommitActionExecutor<T> {

  /** Records to delete; each is expected to already be tagged with key and location. */
  private final List<HoodieRecord<T>> preppedRecords;

  public JavaDeletePreppedCommitActionExecutor(HoodieJavaEngineContext context,
                                               HoodieWriteConfig config,
                                               HoodieTable table,
                                               String instantTime,
                                               List<HoodieRecord<T>> preppedRecords) {
    super(context, config, table, instantTime, WriteOperationType.DELETE);
    this.preppedRecords = preppedRecords;
  }

  @Override
  public HoodieWriteMetadata<List<WriteStatus>> execute() {
    // Hand the prepped records straight to the base commit flow.
    return super.execute(preppedRecords);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime)
return postWrite(resultRDD, instantTime, table);
}

/**
 * Deletes records whose {@link HoodieKey} and record location are already set ("prepped"),
 * at the supplied instantTime.
 *
 * @param preppedRecords RDD of records carrying only the key and the record location.
 * @param instantTime    Commit time handle.
 * @return JavaRDD of WriteStatus to inspect errors and counts.
 */
@Override
public JavaRDD<WriteStatus> deletePrepped(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
  // NOTE(review): the operation type is recorded as plain DELETE rather than a dedicated
  // prepped variant — confirm this is how commit metadata should classify prepped deletes.
  HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
      initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
  preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
  HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.deletePrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords));
  HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
  return postWrite(resultRDD, instantTime, table);
}

public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
Expand Down Expand Up @@ -83,6 +81,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
*/
private final transient StructType schema;

private boolean isDeleted;
public HoodieSparkRecord(UnsafeRow data) {
this(data, null);
}
Expand All @@ -93,6 +92,8 @@ public HoodieSparkRecord(InternalRow data, StructType schema) {
validateRow(data, schema);
this.copy = false;
this.schema = schema;

isDeleted = data == null;
}

public HoodieSparkRecord(HoodieKey key, UnsafeRow data, boolean copy) {
Expand All @@ -105,6 +106,7 @@ public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, boo
validateRow(data, schema);
this.copy = copy;
this.schema = schema;
isDeleted = data == null;
}

private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
Expand All @@ -113,6 +115,7 @@ private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, Ho
validateRow(data, schema);
this.copy = copy;
this.schema = schema;
isDeleted = data == null;
}

public HoodieSparkRecord(
Expand All @@ -128,6 +131,10 @@ public HoodieSparkRecord(
this.schema = schema;
}

public boolean isDeleted() {
return isDeleted;
}

@Override
public HoodieSparkRecord newInstance() {
return new HoodieSparkRecord(this.key, this.data, this.schema, this.operation, this.copy);
Expand Down Expand Up @@ -428,10 +435,11 @@ private static void validateRow(InternalRow data, StructType schema) {
// In case provided row is anything but [[UnsafeRow]], it's expected that the
// corresponding schema has to be provided as well so that it could be properly
// serialized (in case it would need to be)
boolean isValid = data instanceof UnsafeRow
|| schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
// akl_todo: data can be null (and this is valid)
// boolean isValid = data instanceof UnsafeRow
// || schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));

ValidationUtils.checkState(isValid);
// ValidationUtils.checkState(isValid);
}

private static Object getValue(StructType structType, String fieldName, InternalRow row) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeletePartitionCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeletePreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
Expand Down Expand Up @@ -126,6 +127,11 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext c
return new SparkDeleteCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
}

/**
 * Runs the prepped-delete commit action for this Spark table: the input records already
 * carry their key and location, and are handed directly to the commit action executor.
 */
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> deletePrepped(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
  HoodieSparkEngineContext sparkContext = (HoodieSparkEngineContext) context;
  return new SparkDeletePreppedCommitActionExecutor<>(sparkContext, config, this, instantTime, preppedRecords)
      .execute();
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
return new SparkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkBulkInsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeletePreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkInsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor;
Expand Down Expand Up @@ -105,6 +106,11 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext c
return new SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
}

/**
 * Runs the prepped-delete delta-commit action for this merge-on-read table: the input
 * records already carry their key and location, and go straight to the executor.
 */
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> deletePrepped(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
  HoodieSparkEngineContext sparkContext = (HoodieSparkEngineContext) context;
  return new SparkDeletePreppedDeltaCommitActionExecutor<>(sparkContext, config, this, instantTime, preppedRecords)
      .execute();
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

/**
 * Commit action executor that deletes a batch of already-prepped records (records carrying
 * their key and location) on the Spark engine, delegating to the shared commit flow.
 */
public class SparkDeletePreppedCommitActionExecutor<T>
    extends BaseSparkCommitActionExecutor<T> {

  /** Records to delete; each is expected to already be tagged with key and location. */
  private final HoodieData<HoodieRecord<T>> preppedRecords;

  public SparkDeletePreppedCommitActionExecutor(HoodieSparkEngineContext context,
                                                HoodieWriteConfig config,
                                                HoodieTable table,
                                                String instantTime,
                                                HoodieData<HoodieRecord<T>> preppedRecords) {
    super(context, config, table, instantTime, WriteOperationType.DELETE);
    this.preppedRecords = preppedRecords;
  }

  @Override
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    // Hand the prepped records straight to the base commit flow.
    return super.execute(preppedRecords);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.deltacommit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

/**
 * Delta-commit action executor that deletes a batch of already-prepped records (records
 * carrying their key and location) on a Spark merge-on-read table, delegating to the
 * shared delta-commit flow.
 */
public class SparkDeletePreppedDeltaCommitActionExecutor<T>
    extends BaseSparkDeltaCommitActionExecutor<T> {

  /** Records to delete; each is expected to already be tagged with key and location. */
  private final HoodieData<HoodieRecord<T>> preppedRecords;

  public SparkDeletePreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                     HoodieWriteConfig config,
                                                     HoodieTable table,
                                                     String instantTime,
                                                     HoodieData<HoodieRecord<T>> preppedRecords) {
    super(context, config, table, instantTime, WriteOperationType.DELETE);
    this.preppedRecords = preppedRecords;
  }

  @Override
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    // Hand the prepped records straight to the delta-commit flow.
    return super.execute(preppedRecords);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public static boolean isOverwrite(WriteOperationType operationType) {
public static boolean isDataChange(WriteOperationType operation) {
return operation == WriteOperationType.INSERT
|| operation == WriteOperationType.UPSERT
|| operation == WriteOperationType.UPSERT_PREPPED
|| operation == WriteOperationType.DELETE
|| operation == WriteOperationType.BULK_INSERT
|| operation == WriteOperationType.DELETE_PARTITION
Expand Down
Loading