[improvement](spark-load) support parquet and orc file (apache#13438)
Add support for parquet/orc in SparkDpp.java
Fixed SparkDpp checkstyle issue
liujinhui1994 authored and chenjie committed Oct 20, 2022
1 parent 6025e2a commit 563dc29
Showing 1 changed file with 26 additions and 2 deletions.
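
The change short-circuits loadDataFromPath for self-describing formats: parquet and ORC files are read with Spark's native readers (which carry their own schema), and any values extracted from the file path are appended as literal columns. Below is a minimal standalone sketch of that pattern, assuming a local SparkSession and illustrative file paths and column names, not SparkDpp's exact fields:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class ParquetOrcLoadSketch {

    // Read a parquet or ORC file with Spark's built-in reader and append
    // values parsed from the file path as constant columns.
    static Dataset<Row> loadSelfDescribingFile(SparkSession spark, String fileUrl, String fileFormat,
                                               List<String> columnsFromPath, List<String> columnValueFromPath) {
        Dataset<Row> dataFrame = "orc".equalsIgnoreCase(fileFormat)
                ? spark.read().orc(fileUrl)
                : spark.read().parquet(fileUrl);
        // Both formats are self-describing, so no source schema is constructed here;
        // only the path-derived columns are added on top of the file's own columns.
        for (int k = 0; k < columnsFromPath.size(); k++) {
            dataFrame = dataFrame.withColumn(columnsFromPath.get(k),
                    functions.lit(columnValueFromPath.get(k)));
        }
        return dataFrame;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("parquet-orc-load-sketch").master("local[*]").getOrCreate();
        // Hypothetical partitioned path; "dt" is recovered from the directory name.
        Dataset<Row> df = loadSelfDescribingFile(spark,
                "file:///tmp/load/dt=2022-10-20/data.parquet", "parquet",
                Arrays.asList("dt"), Arrays.asList("2022-10-20"));
        df.show();
        spark.stop();
    }
}
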
@@ -22,6 +22,7 @@

import com.google.common.base.Strings;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
@@ -75,7 +76,6 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// This class is a Spark-based data preprocessing program,
// which will make use of Spark's distributed compute framework to
// do the ETL, sort and pre-aggregation work as a Spark job
@@ -87,6 +87,7 @@
// 2. repartition data by using the Doris data model (partition and bucket)
// 3. process aggregation if needed
// 4. write data to parquet file

public final class SparkDpp implements java.io.Serializable {
private static final Logger LOG = LoggerFactory.getLogger(SparkDpp.class);

@@ -212,7 +213,6 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
continue;
}


String curBucketKey = keyColumns.get(0).toString();
List<Object> columnObjects = new ArrayList<>();
for (int i = 1; i < keyColumns.size(); ++i) {
@@ -620,6 +620,30 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
if (fileGroup.columnsFromPath != null) {
srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
}

if (fileGroup.fileFormat.equalsIgnoreCase("parquet")) {
// parquet has its own schema, so just use it; perhaps we could add some validation in the future.
Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
if (!CollectionUtils.isEmpty(columnValueFromPath)) {
for (int k = 0; k < columnValueFromPath.size(); k++) {
dataFrame = dataFrame.withColumn(
fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
}
}
return dataFrame;
}

if (fileGroup.fileFormat.equalsIgnoreCase("orc")) {
Dataset<Row> dataFrame = spark.read().orc(fileUrl);
if (!CollectionUtils.isEmpty(columnValueFromPath)) {
for (int k = 0; k < columnValueFromPath.size(); k++) {
dataFrame = dataFrame.withColumn(
fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
}
}
return dataFrame;
}

StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
int columnSize = dataSrcColumns.size();
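
The class comment above describes the wider pipeline (repartition by the Doris partition/bucket model, aggregate, then write parquet), and the writeRepartitionAndSortedRDDToParquet hunk shows the bucket key sitting at index 0 of each key list. As a rough illustration of that repartition-and-sort step, here is a self-contained sketch; the partitioner, comparator, partition count and sample rows are all assumptions for illustration, not SparkDpp's actual implementation:

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class BucketRepartitionSketch {

    // Routes a row to a partition based only on the bucket key at index 0 of the key list.
    static class BucketPartitioner extends Partitioner {
        private final int partitions;

        BucketPartitioner(int partitions) {
            this.partitions = partitions;
        }

        @Override
        public int numPartitions() {
            return partitions;
        }

        @Override
        public int getPartition(Object key) {
            List<?> keyColumns = (List<?>) key;
            return Math.floorMod(keyColumns.get(0).hashCode(), partitions);
        }
    }

    // Orders rows within a partition by the string form of the key list (bucket key first);
    // a simplification for the sketch.
    static class KeyComparator implements Comparator<List<Object>>, Serializable {
        @Override
        public int compare(List<Object> a, List<Object> b) {
            return a.toString().compareTo(b.toString());
        }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "bucket-repartition-sketch");

        // Key layout mirrors the code above: element 0 is the bucket key,
        // the remaining elements are sort-key columns; the value is the row payload.
        List<Tuple2<List<Object>, Object[]>> sample = Arrays.asList(
                new Tuple2<List<Object>, Object[]>(Arrays.asList("10020_3", 2), new Object[]{"b", 200L}),
                new Tuple2<List<Object>, Object[]>(Arrays.asList("10020_3", 1), new Object[]{"a", 100L}),
                new Tuple2<List<Object>, Object[]>(Arrays.asList("10021_0", 1), new Object[]{"c", 300L}));
        JavaPairRDD<List<Object>, Object[]> rows = sc.parallelizePairs(sample);

        // All rows of one bucket land in one partition and come out sorted,
        // which is the shape the downstream parquet writer expects.
        JavaPairRDD<List<Object>, Object[]> repartitioned =
                rows.repartitionAndSortWithinPartitions(new BucketPartitioner(2), new KeyComparator());

        repartitioned.foreach(kv ->
                System.out.println(kv._1() + " -> " + Arrays.toString(kv._2())));
        sc.stop();
    }
}
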
