
[SPARK-24906][SQL] Adaptively enlarge split / partition size for Parq… #21868

Closed

wants to merge 2 commits

Conversation


@habren habren commented Jul 25, 2018

Please refer to https://issues.apache.org/jira/browse/SPARK-24906 for more details and test results.

For columnar file formats, when Spark SQL reads a table, each split is 128 MB by default because spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when the user sets it to a large value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL query needs only a few of them, and Spark prunes the unnecessary columns.

In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively.

For example, suppose a table has 40 columns: 20 are integer and 20 are long. When a query reads one integer column and one long column, maxPartitionBytes should be 20 times larger: (20 * 4 + 20 * 8) / (4 + 8) = 240 / 12 = 20.

With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it will relieve the ResourceManager's scheduling pressure.
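As a rough sketch of the intended scaling (standalone Scala, not the PR's actual code; the numbers mirror the example above):

// Hypothetical schema: 20 int columns (4 bytes each) and 20 long columns (8 bytes each).
val totalColumnSize = 20 * 4 + 20 * 8                   // 240: estimated per-row size of all columns
val selectedColumnSize = 4 + 8                          // 12: one int column plus one long column
val multiplier = totalColumnSize / selectedColumnSize   // 240 / 12 = 20

val maxPartitionBytes = 128L * 1024 * 1024              // spark.sql.files.maxPartitionBytes default
val enlargedSplitBytes = maxPartitionBytes * multiplier // each split grows to about 2.5 GB of file data,
                                                        // so the pruned columns read per task stay near 128 MB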

Here is the test

The table named test2 has more than 40 columns and more than 5 TB of data per hour.

When we issue a very simple query:

select count(device_id) from test2 where date=20180708 and hour='23'

There are 72176 tasks and the duration of the job is 4.8 minutes.

Most tasks last less than 1 second and read less than 1.5 MB of data.

After the optimization, there are only 1615 tasks and the job lasts only 30 seconds, almost 10 times faster.

The median read size is 44.2 MB.

https://issues.apache.org/jira/browse/SPARK-24906

@maropu
Member

maropu commented Jul 25, 2018

Thanks for the work, but we probably first need consensus to work on this because this part is pretty performance-sensitive... As @viirya described in the JIRA, I think we need a more general approach than the current fix (for example, I'm not sure that this fix doesn't introduce any performance regression).


val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
  .intConf
  .createWithDefault(ArrayType.defaultConcreteType.defaultSize)
Member

ArrayType.defaultConcreteType is ArrayType(NullType, containsNull = true). I think using this you won't get a reasonable number.

Author

Thanks for your comments. I set the default value to StringType.defaultSize (8). It is only a default size; users should configure it according to the real data.

val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
val multiplier = totalColumnSize / selectedColumnSize
Member

It seems that here you can only get the ratio of selected columns to total columns. The actual type sizes are not taken into consideration.

Author

There are many data types: CalendarIntervalType, StructType, MapType, NullType, UserDefinedType, AtomicType (TimestampType, StringType, HiveStringType, BooleanType, DateType, BinaryType, NumericType), ObjectType, and ArrayType. For AtomicType, the size is fixed to the defaultSize. For complex types, such as StructType, MapType, and ArrayType, the size is variable, so I make it configurable with a default value. With the data type sizes, the multiplier is not just the ratio of the number of selected columns to the total number of columns, but the ratio of the total size of the selected columns to the total size of all columns.

Author

@viirya As defined in getTypeLength, users can configure the complex types' lengths according to the data statistics, and the length of an AtomicType is determined by AtomicType.defaultSize. So the multiplier is the ratio of the total length of the selected columns to the total length of all columns.

def getTypeLength(dataType: DataType): Int = {
  if (dataType.isInstanceOf[StructType]) {
    fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
  } else if (dataType.isInstanceOf[ArrayType]) {
    fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
  } else if (dataType.isInstanceOf[MapType]) {
    fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
  } else {
    dataType.defaultSize
  }
}
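For illustration, here is a self-contained sketch of how a getTypeLength-style estimate and the multiplier fit together (plain Scala outside Spark's planner; the fixed lengths are hypothetical stand-ins for the proposed spark.sql.parquet.struct/array/map.length configs):

import org.apache.spark.sql.types._

// Hypothetical stand-ins for the proposed complex-type length configs.
val structTypeLength = 8
val arrayTypeLength = 8
val mapTypeLength = 8

def getTypeLength(dataType: DataType): Int = dataType match {
  case _: StructType => structTypeLength
  case _: ArrayType  => arrayTypeLength
  case _: MapType    => mapTypeLength
  case other         => other.defaultSize   // atomic types use their fixed defaultSize
}

// A wide table of 20 int and 20 long columns, of which the query selects only two.
val dataSchema = StructType(
  (1 to 20).map(i => StructField(s"i$i", IntegerType)) ++
  (1 to 20).map(i => StructField(s"l$i", LongType)))
val requiredSchema = StructType(Seq(
  StructField("i1", IntegerType), StructField("l1", LongType)))

val totalColumnSize = dataSchema.map(_.dataType).map(getTypeLength).sum
val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength).sum
val multiplier = totalColumnSize / selectedColumnSize   // 240 / 12 = 20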

Author

@viirya Now it also supports ORC. Please help review.

@HyukjinKwon
Member

BTW, why does this PR target branch-2.3? I think it should be master.

@habren
Author

habren commented Jul 26, 2018

@maropu If I understand correctly, your concern is about how to calculate the multiplier.

Here is my idea. A Parquet file has several row groups, and each row group consists of many columns. Typically, we set maxPartitionBytes to 128 MB, which is the same as the block size and the row group size. As recommended by Parquet, the row group size and the HDFS block size should be 512 MB or 1 GB; in our environment, it is 512 MB.

In DataSourceScanExec, all files are split according to maxPartitionBytes (default 128 MB) and openCostInBytes (4 MB), no matter whether the file is columnar or row-based, and each task reads one split / partition (typically 128 MB). For a row-based file, the bytes read for a split are 128 MB. But for a columnar file, the bytes read are typically much smaller than 128 MB, because a split / row group consists of several columns and only a few of them are read due to column pruning. As a result, the bytes read by a task are very small. So I want to make sure a task reads about 128 MB (or whatever value maxPartitionBytes and openCostInBytes imply), and the idea is to enlarge the split / partition according to how the columns are pruned.

I don't think there will be a performance downgrade if the bytes read are approximately equal to maxPartitionBytes, because with this fix the task will read the same data volume no matter whether the file is columnar or row-based.
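For reference, a rough sketch of how the split size discussed here is derived (paraphrased from the file-splitting logic in Spark 2.x's FileSourceScanExec; the values are illustrative, and the last two lines show this PR's proposed adjustment, not existing Spark behavior):

// How Spark derives the target split size (simplified):
val defaultMaxSplitBytes = 128L * 1024 * 1024     // spark.sql.files.maxPartitionBytes
val openCostInBytes      = 4L * 1024 * 1024       // spark.sql.files.openCostInBytes
val defaultParallelism   = 2000                   // illustrative number of cores
val totalBytes           = 5L * 1024 * 1024 * 1024 * 1024  // ~5 TB of input files
val bytesPerCore  = totalBytes / defaultParallelism
val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore)) // 128 MB here

// This PR's idea: scale the split by the column-size ratio so that, after pruning,
// the bytes each task actually reads stay close to maxPartitionBytes.
val multiplier = 20L
val adaptiveSplitBytes = maxSplitBytes * multiplier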


@habren habren closed this Jul 26, 2018
@habren habren reopened this Jul 26, 2018
@habren
Author

habren commented Jul 30, 2018

Hi @maropu and @viirya, do you agree with the basic idea that we should take column pruning into consideration when splitting the input files?

@habren
Author

habren commented Aug 8, 2018

Hi @maropu and @viirya, do you agree with the basic idea that we should take column pruning into consideration when splitting the input files?

@maropu
Member

maropu commented Aug 9, 2018

Is this a parquet-specific issue? e.g., how about ORC?

@habren
Author

habren commented Aug 9, 2018

@maropu Thanks for your comments. ORC can also benefit from this change since ORC is also a columnar file format. Do you think I should add ORC support by changing the line below

if (fsRelation.fileFormat.isInstanceOf[ParquetSource])

to
if (fsRelation.fileFormat.isInstanceOf[ParquetSource] || fsRelation.fileFormat.isInstanceOf[OrcFileFormat])

@HyukjinKwon
Member

??? Why does this still target branch-2.3? Is this a backport?

@habren
Author

habren commented Aug 10, 2018

@HyukjinKwon Thanks for your comments. I will submit it to master soon

@habren habren changed the base branch from branch-2.3 to master August 16, 2018 01:29
@habren
Author

habren commented Aug 16, 2018

Hi @HyukjinKwon, I moved the change to the master branch just now. Please help review.

// Estimated per-row size of the columns actually selected after pruning.
val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
// Estimated per-row size of all columns in the table schema.
val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
  .reduceOption(_ + _).getOrElse(StringType.defaultSize)
Member

The type-based estimation is very rough. It is still hard for end users to decide the initial size.

Author

@gatorsmile The goal of this change is not to make it easier for users to set the partition size. Instead, when the user sets the partition size, this change tries its best to make the actual read size close to the value the user set. Without this change, when the user sets the partition size to 128 MB, the actual read size may be 1 MB or even smaller because of column pruning.

Member

I think his point is that the estimation is super rough, which I agree with. I am less sure whether we should go ahead or not, partly for this reason as well.

Author

@HyukjinKwon I agree that the estimation is rough, especially for complex types. For AtomicType it works better, and at least it takes column pruning into consideration.

@@ -25,17 +25,16 @@ import java.util.zip.Deflater
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.util.matching.Regex

Member

Please don't remove these blank lines. Can you revert them?

Author

@viirya OK, I added them back.

val PARQUET_ARRAY_LENGTH = buildConf("spark.sql.parquet.array.length")
  .doc("Set the default size of array column")
  .intConf
  .createWithDefault(StringType.defaultSize)
Member

This feature introduces many configs; my concern is that it is hard for end users to set them.

val PARQUET_STRUCT_LENGTH = buildConf("spark.sql.parquet.struct.length")
  .doc("Set the default size of struct column")
  .intConf
  .createWithDefault(StringType.defaultSize)
Member

And these configs assume that different storage formats use the same size?
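For context, using the configs proposed in this patch might look like the following sketch (these keys exist only in this PR, not in upstream Spark, and the name of the enable flag changed during review; the values are illustrative per-column size estimates for a particular dataset):

spark.conf.set("spark.sql.parquet.adaptiveFileSplit", "true")
// Rough per-column size estimates for complex types in this particular table:
spark.conf.set("spark.sql.parquet.struct.length", "128")
spark.conf.set("spark.sql.parquet.array.length", "256")
spark.conf.set("spark.sql.parquet.map.length", "64")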

@HyukjinKwon
Member

@habren, BTW, just for clarification, you can set a bigger number for spark.sql.files.maxPartitionBytes explicitly and that resolves your issue. This one is to handle it dynamically, right?
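The manual workaround mentioned here, sketched for a spark-shell session (spark.sql.files.maxPartitionBytes is an existing Spark SQL config; the value is illustrative):

// Enlarge the split size explicitly for this session, then run the query.
spark.conf.set("spark.sql.files.maxPartitionBytes", (512L * 1024 * 1024).toString)
spark.sql("select count(device_id) from test2 where date=20180708 and hour='23'").show()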

@@ -459,6 +460,29 @@ object SQLConf {
.intConf
.createWithDefault(4096)

val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
Member

This configuration doesn't look specific to parquet anymore.

@habren habren Aug 17, 2018

@HyukjinKwon I changed the configuration name just now

@@ -459,6 +460,29 @@ object SQLConf {
.intConf
.createWithDefault(4096)

val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
  .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
Member

Regarding "it's": I would avoid the abbreviation.

Author

I updated it accordingly

val IS_PARQUET_PARTITION_ADAPTIVE_ENABLED = buildConf("spark.sql.parquet.adaptiveFileSplit")
  .doc("For columnar file format (e.g., Parquet), it's possible that only few (not all) " +
    "columns are needed. So, it's better to make sure that the total size of the selected " +
    "columns is about 128 MB "
Member

This does not sound like it describes what the configuration actually does.

Author

I updated the description just now. Please help review it again. Thanks a lot, @HyukjinKwon.

.intConf
.createWithDefault(StringType.defaultSize)

val PARQUET_MAP_LENGTH = buildConf("spark.sql.parquet.map.length")
Member

I wouldn't do this. It makes things more complicated, and I would just set a bigger number for maxPartitionBytes.

Member

Yeah, I was thinking that.

Author

@HyukjinKwon @viirya Setting spark.sql.files.maxPartitionBytes explicitly does work. For you or other advanced users, it is convenient to set a bigger maxPartitionBytes.

But for ad-hoc queries, the selected columns differ from query to query, and it is not convenient, or even impossible, for users to set a different maxPartitionBytes for each query.

And for general (non-advanced) users, it is not easy to calculate a proper value of maxPartitionBytes.

In many big companies, there may be only one or a few teams familiar with the details of Spark, and they maintain the Spark cluster. Other teams are general users of Spark who care more about their business, such as data warehouse build-up and recommendation algorithms. This feature tries to handle it dynamically even when the users are not familiar with Spark.

@habren
Author

habren commented Aug 17, 2018

@HyukjinKwon Yes, this is to handle it dynamically.
For ad-hoc queries, the selected columns differ from query to query, and it is not convenient, or even impossible, for users to set a different maxPartitionBytes for each query.
And for general (non-advanced) users, it is not easy to set a proper value of maxPartitionBytes.
So this change makes it easier.

Quoting the question above: BTW, just for clarification, you can set a bigger number for spark.sql.files.maxPartitionBytes explicitly and that resolves your issue. This one is to handle it dynamically, right?

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

I don't think we should fix this. Basically the dynamic estimation logic is too flaky, and I don't think we need this for the current status. Let's not add it for now.

While I am revisiting old PRs, I am trying to suggest closing PRs that look unlikely to be merged. Let me suggest closing this one for now, but please feel free to recreate a PR if you strongly think this is needed in Spark. No objection.
