
Commit

[Improve](spark-load) update spark version for spark load to resolve cve problem (apache#30368)
gnehil committed Mar 18, 2024
1 parent c2e155e commit 06801d5
Showing 2 changed files with 4 additions and 9 deletions.
2 changes: 1 addition & 1 deletion fe/pom.xml
@@ -292,7 +292,7 @@ under the License.
         <javax.activation.version>1.2.0</javax.activation.version>
         <jaxws-api.version>2.3.0</jaxws-api.version>
         <RoaringBitmap.version>0.8.13</RoaringBitmap.version>
-        <spark.version>2.4.6</spark.version>
+        <spark.version>3.4.1</spark.version>
         <hive.version>3.1.3</hive.version>
         <hive.common.version>2.3.9</hive.common.version>
         <nimbusds.version>9.35</nimbusds.version>
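The functional change in fe/pom.xml is the single property bump: Spark Load's DPP job moves off the end-of-life Spark 2.4 line (the source of the CVEs the title refers to) to 3.4.1. For orientation, a sketch of how such a version property is typically consumed further down the same pom; the dependency below is illustrative, not part of this diff, and the spark-sql_2.12 artifact id is an assumption (Spark 3.4.1 ships builds for Scala 2.12 and 2.13):

<!-- illustrative consumer of the property; the real entries live outside this hunk -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId> <!-- assumed artifact id -->
    <version>${spark.version}</version>
</dependency>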
@@ -47,8 +47,6 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
@@ -190,7 +188,6 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
         // TODO(wb) should deal largeint as BigInteger instead of string when using biginteger as key,
         // data type may affect sorting logic
         StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
-        ExpressionEncoder encoder = RowEncoder.apply(dstSchema);

         resultRDD.repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
                 .foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
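The deleted encoder line is the Spark 2-to-3 incompatibility behind this commit: ExpressionEncoder.toRow was removed in Spark 3.0, where an encoder instead hands out a serializer via createSerializer(). The commit sidesteps encoders entirely (see the next hunk); the sketch below only illustrates the API shift, assuming Spark 3.4 APIs, with an illustrative class name, schema, and row:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class EncoderMigrationSketch {
    public static void main(String[] args) {
        StructType schema = new StructType()
                .add("id", DataTypes.LongType)
                .add("name", DataTypes.StringType);
        Row row = RowFactory.create(1L, "doris");

        // Spark 2.4 style, as deleted above (no longer compiles on 3.x):
        //   InternalRow internal = RowEncoder.apply(schema).toRow(row);

        // Spark 3.x style: the encoder hands out a serializer, which also
        // converts external values to internal ones (e.g. String -> UTF8String).
        InternalRow internal = RowEncoder.apply(schema).createSerializer().apply(row);
        System.out.println(internal.numFields()); // 2
    }
}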
@@ -254,15 +251,13 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
                     conf.set("spark.sql.parquet.outputTimestampType", "INT96");
                     ParquetWriteSupport.setSchema(dstSchema, conf);
                     ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
-                    parquetWriter = new ParquetWriter<InternalRow>(new Path(tmpPath), parquetWriteSupport,
+                    parquetWriter = new ParquetWriter<>(new Path(tmpPath), parquetWriteSupport,
                             CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024, true, false,
                             WriterVersion.PARQUET_1_0, conf);
-                    if (parquetWriter != null) {
-                        LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
-                    }
+                    LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
                     lastBucketKey = curBucketKey;
                 }
-                InternalRow internalRow = encoder.toRow(rowWithoutBucketKey);
+                InternalRow internalRow = InternalRow.apply(rowWithoutBucketKey.toSeq());
                 parquetWriter.write(internalRow);
             }
             if (parquetWriter != null) {
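The replacement conversion avoids encoders altogether: InternalRow.apply is a varargs factory that wraps the row's values in a GenericInternalRow as-is, with no per-field conversion to Spark's internal types, so this path relies on the values already being in the form ParquetWriteSupport expects. (The hunk also drops a redundant null check: the ParquetWriter constructor never returns null.) A minimal sketch of what the new line does, assuming Spark 3.4; the class name and sample values are illustrative:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.InternalRow;

public class InternalRowSketch {
    public static void main(String[] args) {
        // Numeric fields pass through unchanged, so the resulting InternalRow
        // can be handed straight to an internal-row Parquet writer.
        Row row = RowFactory.create(1L, 42L);

        // Effectively new GenericInternalRow over row.toSeq(): no encoder involved.
        InternalRow internal = InternalRow.apply(row.toSeq());
        System.out.println(internal.numFields()); // 2
        System.out.println(internal.getLong(1));  // 42
    }
}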

0 comments on commit 06801d5
