Commit

support spark3.1.x
xiarixiaoyao committed Mar 1, 2022
1 parent 84b622b commit fa9cee1
Showing 11 changed files with 1,463 additions and 4 deletions.
@@ -46,7 +46,6 @@ import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
import org.apache.spark.SPARK_VERSION
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
37 changes: 36 additions & 1 deletion hudi-spark-datasource/hudi-spark3.1.x/pom.xml
@@ -158,7 +158,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark3.version}</version>
<version>${spark31.version}</version>
<optional>true</optional>
</dependency>

@@ -182,16 +182,36 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-client</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Hoodie - Test -->
@@ -203,14 +223,22 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-client</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
@@ -219,13 +247,20 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hudi.client.utils.SparkSchemaUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.IOException;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

public class Spark312HoodieVectorizedParquetRecordReader extends VectorizedParquetRecordReader {

// Saves the column type change info: column index -> pair of data types.
private Map<Integer, Pair<DataType, DataType>> typeChangeInfos;

private ColumnarBatch columnarBatch;

private Map<Integer, WritableColumnVector> idToColumnVectors;

private WritableColumnVector[] columnVectors;

// The capacity of the vectorized batch.
private int capacity;

// If true, this class returns batches instead of rows.
private boolean returnColumnarBatch;

// The memory mode of the columnarBatch.
private final MemoryMode memoryMode;

/**
* Batch of rows that we assemble and the current index we've returned. Every time this
* batch is used up (batchIdx == numBatched), we populate the batch.
*/
private int batchIdx = 0;
private int numBatched = 0;

public Spark312HoodieVectorizedParquetRecordReader(
ZoneId convertTz,
String datetimeRebaseMode,
String int96RebaseMode,
boolean useOffHeap,
int capacity,
Map<Integer, Pair<DataType, DataType>> typeChangeInfos) {
super(convertTz, datetimeRebaseMode, int96RebaseMode, useOffHeap, capacity);
memoryMode = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.typeChangeInfos = typeChangeInfos;
this.capacity = capacity;
}

@Override
public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
super.initBatch(partitionColumns, partitionValues);
if (columnVectors == null) {
columnVectors = new WritableColumnVector[sparkSchema.length() + partitionColumns.length()];
}
if (idToColumnVectors == null) {
idToColumnVectors = new HashMap<>();
// pre-allocate a replacement vector, typed with the target data type, for each type-changed column
typeChangeInfos.entrySet()
.stream()
.forEach(f -> {
WritableColumnVector vector =
memoryMode == MemoryMode.OFF_HEAP ? new OffHeapColumnVector(capacity, f.getValue().getLeft()) : new OnHeapColumnVector(capacity, f.getValue().getLeft());
idToColumnVectors.put(f.getKey(), vector);
});
}
}

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext);
}

@Override
public void close() throws IOException {
super.close();
for (Map.Entry<Integer, WritableColumnVector> e : idToColumnVectors.entrySet()) {
e.getValue().close();
}
idToColumnVectors = null;
columnarBatch = null;
columnVectors = null;
}

@Override
public ColumnarBatch resultBatch() {
ColumnarBatch currentColumnBatch = super.resultBatch();
boolean changed = false;
// convert the file-typed vectors into the query's expected types; if any column was rewritten, return a patched batch
for (Map.Entry<Integer, Pair<DataType, DataType>> entry : typeChangeInfos.entrySet()) {
boolean rewrite = SparkSchemaUtils
.convertColumnVectorType((WritableColumnVector) currentColumnBatch.column(entry.getKey()),
idToColumnVectors.get(entry.getKey()), currentColumnBatch.numRows());
if (rewrite) {
changed = true;
columnVectors[entry.getKey()] = idToColumnVectors.get(entry.getKey());
}
}
if (changed) {
if (columnarBatch == null) {
// fill the remaining vectors from the current batch
for (int i = 0; i < columnVectors.length; i++) {
if (columnVectors[i] == null) {
columnVectors[i] = (WritableColumnVector) currentColumnBatch.column(i);
}
}
columnarBatch = new ColumnarBatch(columnVectors);
}
columnarBatch.setNumRows(currentColumnBatch.numRows());
return columnarBatch;
} else {
return currentColumnBatch;
}
}

@Override
public boolean nextBatch() throws IOException {
boolean result = super.nextBatch();
if (idToColumnVectors != null) {
idToColumnVectors.entrySet().stream().forEach(e -> e.getValue().reset());
}
numBatched = resultBatch().numRows();
batchIdx = 0;
return result;
}

@Override
public void enableReturningBatches() {
returnColumnarBatch = true;
super.enableReturningBatches();
}

@Override
public Object getCurrentValue() {
if (typeChangeInfos == null || typeChangeInfos.isEmpty()) {
return super.getCurrentValue();
}

if (returnColumnarBatch) {
return columnarBatch == null ? super.getCurrentValue() : columnarBatch;
}

return columnarBatch == null ? super.getCurrentValue() : columnarBatch.getRow(batchIdx - 1);
}

@Override
public boolean nextKeyValue() throws IOException {
resultBatch();

if (returnColumnarBatch) {
return nextBatch();
}

if (batchIdx >= numBatched) {
if (!nextBatch()) {
return false;
}
}
++batchIdx;
return true;
}
}
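
For context, a minimal usage sketch (not part of this commit) of how such a reader might be constructed when one column's type has evolved. The column index, concrete types, rebase-mode strings and batch capacity below are illustrative assumptions; the pair is assumed to hold (type expected by the query, type stored in the file), since the left side is what initBatch allocates the replacement vector with.

import java.time.ZoneId
import java.util.{HashMap => JHashMap}

import org.apache.hudi.common.util.collection.Pair
import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieVectorizedParquetRecordReader
import org.apache.spark.sql.types.{DataType, DataTypes}

// Hypothetical schema evolution: column 2 was written as INT but the current
// table schema expects LONG, so the reader swaps in a LONG column vector.
val typeChangeInfos = new JHashMap[Integer, Pair[DataType, DataType]]()
typeChangeInfos.put(Integer.valueOf(2), Pair.of(DataTypes.LongType, DataTypes.IntegerType))

val reader = new Spark312HoodieVectorizedParquetRecordReader(
  ZoneId.systemDefault(), // convertTz
  "CORRECTED",            // datetimeRebaseMode (assumed)
  "CORRECTED",            // int96RebaseMode (assumed)
  false,                  // useOffHeap = false -> OnHeapColumnVector
  4096,                   // batch capacity (assumed)
  typeChangeInfos)

// Driven like the stock vectorized reader: initialize(split, context), initBatch(partitionSchema, partitionValues),
// optionally enableReturningBatches(), then loop on nextKeyValue() / getCurrentValue().
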
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.types.DataType

/**
* ALTER TABLE ... ADD COLUMNS command, as parsed from SQL.
*/
case class HoodieAlterTableAddColumnsStatement(
tableName: Seq[String],
columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement

/**
* ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
*/
case class HoodieAlterTableAlterColumnStatement(
tableName: Seq[String],
column: Seq[String],
dataType: Option[DataType],
nullable: Option[Boolean],
comment: Option[String],
position: Option[ColumnPosition]) extends ParsedStatement


/**
* ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
*/
case class HoodieAlterTableRenameColumnStatement(
tableName: Seq[String],
column: Seq[String],
newName: String) extends ParsedStatement

/**
* ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
*/
case class HoodieAlterTableDropColumnsStatement(
tableName: Seq[String], columnsToDrop: Seq[Seq[String]]) extends ParsedStatement

/**
* ALTER TABLE ... SET TBLPROPERTIES command, as parsed from SQL.
*/
case class HoodieAlterTableSetPropertiesStatement(
tableName: Seq[String], properties: Map[String, String]) extends ParsedStatement

/**
* ALTER TABLE ... UNSET TBLPROPERTIES command, as parsed from SQL.
*/
case class HoodieAlterTableUnsetPropertiesStatement(
tableName: Seq[String], propertyKeys: Seq[String], ifExists: Boolean) extends ParsedStatement

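For illustration only (not part of this commit), a rough sketch of how the SQL text maps to these parsed statements and how a downstream rule might pattern-match them; the table and column names are assumptions:

import org.apache.spark.sql.catalyst.plans.logical.{HoodieAlterTableDropColumnsStatement, HoodieAlterTableRenameColumnStatement, LogicalPlan}

// e.g. "ALTER TABLE db.tbl RENAME COLUMN price TO amount" would be parsed into:
val parsed: LogicalPlan =
  HoodieAlterTableRenameColumnStatement(Seq("db", "tbl"), Seq("price"), "amount")

// A resolution rule can then pattern-match the statement and translate it into an executable Hudi command:
parsed match {
  case HoodieAlterTableRenameColumnStatement(table, column, newName) =>
    println(s"rename ${column.mkString(".")} to $newName on ${table.mkString(".")}")
  case HoodieAlterTableDropColumnsStatement(table, cols) =>
    println(s"drop ${cols.map(_.mkString(".")).mkString(", ")} from ${table.mkString(".")}")
  case _ => // other ALTER TABLE variants handled elsewhere
}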