[WIP][HUDI-3088] Use Spark 3.2 as default Spark version #5419

Closed. Wants to merge 5 commits.
29 changes: 15 additions & 14 deletions azure-pipelines.yml
@@ -23,9 +23,10 @@ pool:

variables:
MAVEN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true'
SPARK_VERSION: '2.4.4'
HADOOP_VERSION: '2.7'
SPARK_VERSION: '3.2.1'
HADOOP_VERSION: '3.2'
SPARK_ARCHIVE: spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION)
SPARK_PROFILE: scala-2.12,spark3
EXCLUDE_TESTED_MODULES: '!hudi-examples/hudi-examples-common,!hudi-examples/hudi-examples-flink,!hudi-examples/hudi-examples-java,!hudi-examples/hudi-examples-spark,!hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync'

stages:
@@ -40,7 +41,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'clean install'
options: -T 2.5C -DskipTests
options: -T 2.5C -DskipTests -P $(SPARK_PROFILE)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -49,7 +50,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl hudi-common,hudi-flink-datasource/hudi-flink,hudi-client/hudi-spark-client
options: -P $(SPARK_PROFILE),unit-tests -pl hudi-common,hudi-flink-datasource/hudi-flink,hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -58,7 +59,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl hudi-common,hudi-flink-datasource/hudi-flink
options: -P $(SPARK_PROFILE),functional-tests -pl hudi-common,hudi-flink-datasource/hudi-flink
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -71,7 +72,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'clean install'
options: -T 2.5C -DskipTests
options: -T 2.5C -DskipTests -P $(SPARK_PROFILE)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -80,7 +81,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl hudi-client/hudi-spark-client
options: -P $(SPARK_PROFILE),functional-tests -pl hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -93,7 +94,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'clean install'
options: -T 2.5C -DskipTests
options: -T 2.5C -P $(SPARK_PROFILE) -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -102,7 +103,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
options: -P $(SPARK_PROFILE),unit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -111,7 +112,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
options: -P $(SPARK_PROFILE),functional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -124,7 +125,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'clean install'
options: -T 2.5C -DskipTests
options: -T 2.5C -DskipTests -P $(SPARK_PROFILE)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -133,7 +134,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl $(EXCLUDE_TESTED_MODULES)
options: -P $(SPARK_PROFILE),unit-tests -pl $(EXCLUDE_TESTED_MODULES)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -142,7 +143,7 @@
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl $(EXCLUDE_TESTED_MODULES)
options: -P $(SPARK_PROFILE),functional-tests -pl $(EXCLUDE_TESTED_MODULES)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
@@ -162,5 +163,5 @@ stages:
tar -xvf $(Pipeline.Workspace)/$(SPARK_ARCHIVE).tgz -C $(Pipeline.Workspace)/
mkdir /tmp/spark-events/
- script: |
mvn $(MAVEN_OPTS) -Pintegration-tests verify
mvn $(MAVEN_OPTS) -P $(SPARK_PROFILE),integration-tests verify
displayName: IT
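
For reference, every Maven step in this pipeline now runs with the Spark 3 build profile (SPARK_PROFILE: scala-2.12,spark3), and the integration-test stage downloads the spark-3.2.1-bin-hadoop3.2 archive instead of the Spark 2.4.4 bundle. Assuming the same profiles exist in the root pom, a roughly equivalent local build would be:

    mvn clean install -DskipTests -P scala-2.12,spark3
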
@@ -19,6 +19,7 @@

package org.apache.hudi.io.storage;

import org.apache.avro.AvroRuntimeException;
import org.apache.hudi.common.bloom.BloomFilter;

import org.apache.avro.Schema;
@@ -49,21 +50,20 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Abstract class for unit tests of {@link HoodieFileReader} and {@link HoodieFileWriter}
* for different file format
* Abstract class for unit tests of {@link HoodieFileReader} and {@link HoodieFileWriter} for
* different file format
*/
public abstract class TestHoodieReaderWriterBase {
protected static final int NUM_RECORDS = 50;
@TempDir
protected File tempDir;
@TempDir protected File tempDir;

protected abstract Path getFilePath();

protected abstract HoodieFileWriter<GenericRecord> createWriter(
Schema avroSchema, boolean populateMetaFields) throws Exception;

protected abstract HoodieFileReader<GenericRecord> createReader(
Configuration conf) throws Exception;
protected abstract HoodieFileReader<GenericRecord> createReader(Configuration conf)
throws Exception;

protected abstract void verifyMetadata(Configuration conf) throws IOException;

@@ -80,7 +80,8 @@ public void clearTempFile() {

@Test
public void testWriteReadMetadata() throws Exception {
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
Schema avroSchema =
getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
writeFileWithSimpleSchema();

Configuration conf = new Configuration();
@@ -145,10 +146,12 @@ public void testWriteReadWithEvolvedSchema() throws Exception {

Configuration conf = new Configuration();
HoodieFileReader<GenericRecord> hoodieReader = createReader(conf);
String[] schemaList = new String[] {
"/exampleEvolvedSchema.avsc", "/exampleEvolvedSchemaChangeOrder.avsc",
"/exampleEvolvedSchemaColumnRequire.avsc", "/exampleEvolvedSchemaColumnType.avsc",
"/exampleEvolvedSchemaDeleteColumn.avsc"};
String[] schemaList =
new String[] {
"/exampleEvolvedSchema.avsc", "/exampleEvolvedSchemaChangeOrder.avsc",
"/exampleEvolvedSchemaColumnRequire.avsc", "/exampleEvolvedSchemaColumnType.avsc",
"/exampleEvolvedSchemaDeleteColumn.avsc"
};

for (String evolvedSchemaPath : schemaList) {
verifyReaderWithSchema(evolvedSchemaPath, hoodieReader);
@@ -164,7 +167,8 @@ public void testReaderFilterRowKeys() throws Exception {
}

protected void writeFileWithSimpleSchema() throws Exception {
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
Schema avroSchema =
getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
HoodieFileWriter<GenericRecord> writer = createWriter(avroSchema, true);
for (int i = 0; i < NUM_RECORDS; i++) {
GenericRecord record = new GenericData.Record(avroSchema);
@@ -217,15 +221,24 @@ protected void verifyComplexRecords(Iterator<GenericRecord> iterator) {
}

private void verifyFilterRowKeys(HoodieFileReader<GenericRecord> hoodieReader) {
Set<String> candidateRowKeys = IntStream.range(40, NUM_RECORDS * 2)
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toCollection(TreeSet::new));
List<String> expectedKeys = IntStream.range(40, NUM_RECORDS)
.mapToObj(i -> "key" + String.format("%02d", i)).sorted().collect(Collectors.toList());
assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys)
.stream().sorted().collect(Collectors.toList()));
Set<String> candidateRowKeys =
IntStream.range(40, NUM_RECORDS * 2)
.mapToObj(i -> "key" + String.format("%02d", i))
.collect(Collectors.toCollection(TreeSet::new));
List<String> expectedKeys =
IntStream.range(40, NUM_RECORDS)
.mapToObj(i -> "key" + String.format("%02d", i))
.sorted()
.collect(Collectors.toList());
assertEquals(
expectedKeys,
hoodieReader.filterRowKeys(candidateRowKeys).stream()
.sorted()
.collect(Collectors.toList()));
}

private void verifyReaderWithSchema(String schemaPath, HoodieFileReader<GenericRecord> hoodieReader) throws IOException {
private void verifyReaderWithSchema(
String schemaPath, HoodieFileReader<GenericRecord> hoodieReader) throws IOException {
Schema evolvedSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, schemaPath);
Iterator<GenericRecord> iter = hoodieReader.getRecordIterator(evolvedSchema);
int index = 0;
@@ -242,10 +255,18 @@ private void verifyRecord(String schemaPath, GenericRecord record, int index) {
if ("/exampleEvolvedSchemaColumnType.avsc".equals(schemaPath)) {
assertEquals(Integer.toString(index), record.get("number").toString());
} else if ("/exampleEvolvedSchemaDeleteColumn.avsc".equals(schemaPath)) {
assertNull(record.get("number"));
assertIfFieldExistsInRecord(record, "number");
} else {
assertEquals(index, record.get("number"));
}
assertNull(record.get("added_field"));
assertIfFieldExistsInRecord(record, "added_field");
}

private void assertIfFieldExistsInRecord(GenericRecord record, String field) {
try {
assertNull(record.get(field));
} catch (AvroRuntimeException e) {
assertEquals("Not a valid schema field: " + field, e.getMessage());
}
}
}
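
A note on the new assertIfFieldExistsInRecord helper above: newer Avro releases (the ones that ship with Spark 3.x) changed GenericRecord.get(String) so that it throws an AvroRuntimeException for a field name that is not in the schema, where the Avro bundled with Spark 2.4 simply returned null. The helper accepts either behavior. Below is a minimal, self-contained sketch of that difference; the schema and field names are made up for illustration, not taken from the Hudi test resources.

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroMissingFieldSketch {
  public static void main(String[] args) {
    // Hypothetical one-field schema, only for demonstration.
    Schema schema = SchemaBuilder.record("Example").fields()
        .requiredString("_row_key")
        .endRecord();
    GenericRecord record = new GenericData.Record(schema);
    record.put("_row_key", "key00");

    try {
      // Older Avro (e.g. 1.8.x with Spark 2.4) returns null for an unknown field;
      // newer Avro (e.g. 1.10.x with Spark 3.2) throws instead.
      Object missing = record.get("number");
      System.out.println("Older Avro behavior, value = " + missing);
    } catch (AvroRuntimeException e) {
      // Newer Avro behavior; the message is "Not a valid schema field: number".
      System.out.println(e.getMessage());
    }
  }
}
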
34 changes: 34 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -48,10 +48,30 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Parquet -->
@@ -60,6 +80,14 @@
<artifactId>parquet-avro</artifactId>
</dependency>

<!-- org.codehaus.jackson test -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>${codehaus-jackson.version}</version>
<scope>test</scope>
</dependency>

<!-- Hoodie - Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
@@ -174,6 +202,12 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -108,6 +108,10 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
@BeforeAll
public static void init() throws Exception {
// Initialize HbaseMiniCluster
System.setProperty("zookeeper.preAllocSize", "100");
System.setProperty("zookeeper.maxCnxns", "60");
System.setProperty("zookeeper.4lw.commands.whitelist", "*");

hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("zookeeper.znode.parent", "/hudi-hbase-test");

@@ -409,11 +409,15 @@ protected void initDFSMetaClient() throws IOException {
protected void cleanupDFS() throws IOException {
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
hdfsTestService = null;
}

if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
dfs = null;
}

// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
// same JVM
FileSystem.closeAll();
@@ -510,14 +510,15 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
try {
for (; i < parts.length; i++) {
String part = parts[i];
Field field = valueNode.getSchema().getField(part);
Object val = valueNode.get(part);
if (val == null) {
if (field == null || val == null) {
break;
}

// return, if last part of name
if (i == parts.length - 1) {
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
Schema fieldSchema = field.schema();
return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
} else {
// VC: Need a test here
@@ -18,15 +18,15 @@

package org.apache.hudi.common.model.debezium;

import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nullable;
import java.io.IOException;

/**
@@ -72,11 +72,21 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue

protected abstract boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException;

@Nullable
private static Object getFieldVal(GenericRecord record, String fieldName) {
Schema.Field recordField = record.getSchema().getField(fieldName);
if (recordField == null) {
return null;
}

return record.get(recordField.pos());
}

private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord) {
boolean delete = false;
if (insertRecord instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertRecord;
Object value = record.get(DebeziumConstants.FLATTENED_OP_COL_NAME);
Object value = getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME);
delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
}

@@ -86,4 +96,4 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord)
private IndexedRecord getInsertRecord(Schema schema) throws IOException {
return super.getInsertValue(schema).get();
}
}
}