
[SPARK-18934][SQL] Writing to dynamic partitions does not preserve sort order if spills occur #16347

Closed

Conversation

junegunn

What changes were proposed in this pull request?

Make the dynamic partition writer perform a stable sort by the partition key, so that the sort order within each partition, as specified via sortWithinPartitions or SORT BY, is preserved even when a spill occurs.
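As a plain-Scala sketch (no Spark involved) of the property this change relies on: the standard library's TimSort-backed sortBy is stable, so sorting by the partition key alone leaves the pre-existing in-partition order untouched. The names below are illustrative only.

```scala
object StableSortDemo extends App {
  case class Row(part: Int, value: Int)

  // Rows already sorted by `value` within each partition, interleaved
  // the way a writer task might see them after a shuffle.
  val rows = Seq(Row(0, 2), Row(1, 1), Row(0, 4), Row(1, 3), Row(0, 6), Row(1, 5))

  // Scala's sortBy is stable: ties on `part` keep their input order.
  val sorted = rows.sortBy(_.part)

  // Within each partition the `value` order survives the sort.
  assert(sorted.filter(_.part == 0).map(_.value) == Seq(2, 4, 6))
  assert(sorted.filter(_.part == 1).map(_.value) == Seq(1, 3, 5))
}
```

An unstable sort by `part` alone would be free to reorder rows within a partition, which is exactly what happens today when the sorter spills.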

How was this patch tested?

Manually tested with the following code snippet and orcdump.

// FileFormatWriter
sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within")

spark.read.table("test_sort_within").filter('part === 0).show
spark.read.table("test_sort_within").filter('part === 1).show

// SparkHiveDynamicPartitionWriterContainer
//   Insert into an existing Hive table with dynamic partitions
//     CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT) STORED AS ORC
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").insertInto("test_sort_within_hive")

spark.read.table("test_sort_within_hive").filter('part === 0).show
spark.read.table("test_sort_within_hive").filter('part === 1).show

It was not straightforward to come up with a unit test as the problem is only reproducible if spill occurs due to memory constraint. I'd appreciate any suggestions or pointers.
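One possible direction for a test, sketched with heavy caveats: force spills on a tiny input instead of relying on memory pressure. The setting below, spark.shuffle.spill.numElementsForceSpillThreshold, is an internal knob used by Spark's own test suites; its name, availability, and whether it can be set after session creation are assumptions here.

```scala
// Hypothetical test sketch: force the external sorter to spill after a
// handful of rows so the bug reproduces without a huge data set.
// The threshold config is internal and may need to be set on SparkConf
// before the session starts; treat this as an assumption, not a recipe.
spark.conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", "100")

sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within_small")

// The written files could then be read back per partition and checked
// for monotonically increasing `value`.
```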

@AmplabJenkins

Can one of the admins verify this patch?

@rxin
Contributor

rxin commented Dec 20, 2016

Thanks for submitting the ticket. In general I don't think the sortWithinPartitions property can carry over to writing out data, because one partition actually corresponds to more than one file.

Can your use case be satisfied by adding an explicit sortBy?

df.write.sortBy(col).parquet(...)

@junegunn
Author

junegunn commented Dec 20, 2016

Thanks for the comment. I was trying to implement the following Hive QL in Spark SQL/API:

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.mapred.mode = nonstrict;

insert overwrite table target_table
partition (day)
select * from source_table
distribute by day sort by id;

In Hive, distribute by day ensures that records with the same "day" go to the same reducer, and sort by id ensures that the input to each reducer is sorted by "id". It works as expected: the number of reducers is no more than the cardinality of the "day" column, and I could confirm that the generated ORC file in each partition is sorted by "id".
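The reducer model described above can be sketched in plain Scala (no Hive involved): route rows by day, sort each reducer's input by id, and each reducer emits one sorted file per partition. All names here are illustrative.

```scala
object DistributeSortDemo extends App {
  case class Rec(id: Int, day: String)

  val source = Seq(Rec(3, "mon"), Rec(1, "tue"), Rec(2, "mon"), Rec(4, "tue"), Rec(1, "mon"))

  // distribute by day: every record with the same day goes to one "reducer"
  val byReducer: Map[String, Seq[Rec]] = source.groupBy(_.day)

  // sort by id: each reducer's input is sorted, so the single file it
  // writes for its partition is globally sorted by id
  val files: Map[String, Seq[Int]] =
    byReducer.map { case (day, recs) => day -> recs.map(_.id).sorted }

  assert(files("mon") == Seq(1, 2, 3))
  assert(files("tue") == Seq(1, 4))
}
```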

However, if I run the same query, or its Spark API equivalent (repartition('day) for distribute by day, and sortWithinPartitions('id) for sort by id), we get the right number of writer tasks, one per partition, and each task generates a single output file, but the generated ORC file is not properly sorted by "id", making the ORC index ineffective.

Can your use case be satisfied by adding an explicit sortBy?

sortBy is for bucketed tables and requires bucketBy, so it's not exactly what I'm looking for. Anyway, this issue is more about Hive compatibility.

@chpritchard-expedia

chpritchard-expedia commented Jan 5, 2017

@junegunn I ran into the same issue, using partitionBy; missed it completely during my testing. Do you have any workarounds for stock Spark 2.x? I'm at a loss: I currently can't apply the patch to the cluster, and bucketing seems mostly unavailable on S3, though I could do the bucketing and then move the output files.

@junegunn
Author

junegunn commented Jan 5, 2017

@chpritchard-expedia The patch here fixes the problem. I don't think it's possible to work around the issue by using the Spark API in some different way, because we can't completely avoid memory spills at the writers. Hive doesn't have the problem, so you might consider running the same statement on Hive if this is not something Spark wants to address.

Anyway, for anyone who's interested, I could confirm that for a sorted ORC table built with this patch, point/range lookups on the sorted column can be several times faster. The final size of the table also turned out to be significantly smaller (60% of the size of the improperly sorted table) due to the temporal locality in our data.

@rxin
Contributor

rxin commented Jan 5, 2017

Maybe we should make DataFrameWriter.sortBy work here.

@chpritchard-expedia

@rxin - sortBy is somewhat tied in with bucketing, which is also a little difficult to work with. First, bucketing often relies on a column being present, whereas in Hive (and with repartition), I may use a formula, to split the data into appropriate buckets that are evenly distributed.

Overall, bucketing is not well supported throughout the ecosystem. Even with all of that, Spark doesn't offer semantics to declare that a data set is already sorted.

In Hive, I've had to do a lot of PARTITION BY (datefield, bucket) CLUSTERED BY (key) SORTED BY (key) INTO 1 BUCKETS. That gets us stable, totally sorted files for GUIDs.

In the case of Spark, this issue of partitionBy destroying sorting is a painful bug. I'm now using some very large data sets, searching for keys, and instead of returning data in a few seconds (thanks to predicate pushdown with Parquet), it has to scan entire files.

@rxin
Contributor

rxin commented Jan 5, 2017

What I was suggesting was to allow sort by without bucketing.

@chpritchard-expedia

@rxin - Oh, yes that'd be fantastic, partitionBy.sortBy is just about all I need to survive in this crazy world. In the meantime, I think there ought to be a big warning label on partitionBy in the Spark docs.

@junegunn force-pushed the dynamic-partition-writer-stable-sort branch from bfeccd8 to 3e2bb08 on January 20, 2017
@junegunn
Author

junegunn commented Jan 20, 2017

Rebased to current master. The patch is simpler thanks to the refactoring made in SPARK-18243.

Anyway, I can understand the rationale for wanting an explicit API on the writer side, but then either the sort specification from sortWithinPartitions should be automatically propagated to the writer, or the method is no longer compatible with SORT BY in Hive and its documentation should be corrected accordingly. Care should also be taken so that the INSERT OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY ... statement in Spark SQL stays compatible with the same HiveQL.

@Downchuck

Is there anyone on the Spark team taking this up? This bug is painful; it has affected a hundred TB of data I've stacked up, and I'm really trying to avoid more manual work. "INSERT OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY" is how I live my life these days.

@cloud-fan
Contributor

cloud-fan commented Apr 10, 2017

is this still a problem after #16898 ?

@junegunn force-pushed the dynamic-partition-writer-stable-sort branch 2 times, most recently from cd9c7f3 to 3e2bb08 on April 11, 2017
@junegunn
Author

@cloud-fan Unfortunately, yes.

sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within")

For the above case, requiredOrdering is part and actualOrdering is value, so SortExec runs anyway and the ordering within the partition is not respected if spill occurs.

However, we now have a workaround; prepend partition columns to sortWithinPartitions call or SORT BY clause, i.e. .sortWithinPartitions("part", "value"), to bypass SortExec.
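A plain-Scala check of why the workaround holds: sorting by (part, value), which is the ordering requested when the partition column is prepended, already leaves each partition internally sorted by value, so no further sort is needed that could disturb it. The names are illustrative.

```scala
object WorkaroundDemo extends App {
  // (part, value) pairs in arbitrary shuffle order
  val rows = Seq((1, 5), (0, 2), (1, 3), (0, 6), (0, 4), (1, 1))

  // Sorting by the composite key (part, value) is what
  // .sortWithinPartitions("part", "value") asks for.
  val sorted = rows.sortBy { case (part, value) => (part, value) }

  // Each partition's values come out fully sorted.
  assert(sorted.filter(_._1 == 0).map(_._2) == Seq(2, 4, 6))
  assert(sorted.filter(_._1 == 1).map(_._2) == Seq(1, 3, 5))
}
```

Because the actual ordering now satisfies the required ordering, SortExec is skipped entirely, spill or no spill.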

@cloud-fan
Contributor

cloud-fan commented Apr 11, 2017

@junegunn I think it's not a problem: df.write.xxx is not guaranteed to retain the ordering of df when writing data out.

Currently the DataFrameWriter only provides one interface to specify the ordering of the output: bucketBy and sortBy.

@junegunn
Author

junegunn commented Apr 11, 2017

@cloud-fan It's not a problem in the context of the DataFrame API. But when it comes to Spark SQL, it makes Spark SQL incompatible with the equivalent HiveQL in a subtle way. And at the very least we may need to revisit the documentation, which gives the wrong impression that sortWithinPartitions and SORT BY of HiveQL are equivalent.

https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L990

@cloud-fan
Contributor

is this still needed after #16898 ?

@junegunn
Author

See my answer above.

@cloud-fan
Contributor

@junegunn Can you check the query plan of hive for INSERT OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY ...?

In Spark SQL, the query plan looks like

'InsertIntoTable  `table1`
+- 'Sort ['j ASC NULLS FIRST], false
   +- 'RepartitionByExpression ['i], 200
      +- table

The sort happens before the insertion, which explains why Spark doesn't preserve the sort order. If in Hive the sort is included in the insertion, we should follow that.

In the meantime, the behavior of DataFrameWriter looks reasonable (we need to fix the documentation of sortWithinPartitions), but we should add a new API to let users specify a sort ordering during writing, to be consistent with the SQL API.

@HyukjinKwon
Member

gentle ping @junegunn on ^.

@junegunn
Author

Hive makes sure that the output file is properly sorted by the column specified in the SORT BY clause by having only one reduce task, and thus one output file, for each partition.

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: __________________
            Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
            Select Operator
              expressions: __ (type: bigint), ________ (type: string), ___ (type: string), _________ (type: string), _______________ (type: string), _______ (type: string), _____________ (type: string), ________ (type: string), __ (type: string), _________ (type: string), ________ (type: string), _______________ (type: string), _____________ (type: string), _____________ (type: string), ____________ (type: string), __________ (type: string), _____ (type: string), __________________ (type: string), ___ (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18
              Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
              Reduce Output Operator
                key expressions: _col0 (type: bigint)
                sort order: +
                Map-reduce partition columns: _col18 (type: string)
                Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
                value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: string), _col12 (type: string), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: string), _col18 (type: string)
      Execution mode: vectorized
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: bigint), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: string), VALUE._col4 (type: string), VALUE._col5 (type: string), VALUE._col6 (type: string), VALUE._col7 (type: string), VALUE._col8 (type: string), VALUE._col9 (type: string), VALUE._col10 (type: string), VALUE._col11 (type: string), VALUE._col12 (type: string), VALUE._col13 (type: string), VALUE._col14 (type: string), VALUE._col15 (type: string), VALUE._col16 (type: string), VALUE._col17 (type: string)
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18
          Statistics: Num rows: 183663543 Data size: 33794091912 Basic stats: COMPLETE Column stats: PARTIAL
          File Output Operator
            compressed: false
            Statistics: Num rows: 183663543 Data size: 33794091912 Basic stats: COMPLETE Column stats: PARTIAL
            table:
                input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
                output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                name: _______________.________________

The later stage simply moves the files to the corresponding directories.

Since the patch no longer merges cleanly and I think I have made my point, I'm closing this.

@junegunn junegunn closed this Jun 19, 2017