
[SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions #8010

Closed · wants to merge 9 commits into master from marmbrus/fsWriting

Conversation

@marmbrus (Contributor) commented Aug 6, 2015

Previously, we would open a new file for each new dynamic partition written out using HadoopFsRelation. For formats like Parquet this is very costly due to the buffers required to get good compression. In this PR I refactor the code so that we can fall back on an external sort when many partitions are seen; as a result, each task will open no more than spark.sql.sources.maxFiles files (a sketch of the resulting control flow follows the list below). I also did the following cleanup:

  • Instead of keying the file HashMap on an expensive-to-compute string representation of the partition, we now use a fairly cheap UnsafeProjection that avoids heap allocations.
  • The control flow for instantiating and invoking a writer container has been simplified. Instead of switching in two places based on whether partitioning is used, each writer container now implements a single method, writeRows, that is invoked using runJob.
  • InternalOutputWriter has been removed. Instead we have a private[sql] method, writeInternal, that converts each row and calls the public method. Internal data sources can override this method to avoid the conversion. This change removes a lot of code duplication and per-row asInstanceOf checks.
  • commands.scala has been split up.
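To make the fallback concrete, here is a minimal, self-contained sketch of the two-phase strategy. This is an illustration, not the actual Spark internals: Row, Writer, and newWriter are simplified stand-ins, and a plain in-memory sort stands in for Spark's external, spilling sort.

```scala
import scala.collection.mutable

// Simplified stand-ins for the real types (assumptions, not Spark's API).
case class Row(partitionKey: String, value: String)

trait Writer {
  def write(row: Row): Unit
  def close(): Unit
}

class DynamicPartitionWriteTask(maxFiles: Int, newWriter: String => Writer) {

  def execute(rows: Iterator[Row]): Unit = {
    // Phase 1: hash-based dispatch, one open writer per partition key.
    val writers = mutable.Map.empty[String, Writer]
    val overflow = mutable.ArrayBuffer.empty[Row]
    var fellBack = false

    while (rows.hasNext && !fellBack) {
      val row = rows.next()
      writers.get(row.partitionKey) match {
        case Some(w) => w.write(row)
        case None if writers.size < maxFiles =>
          val w = newWriter(row.partitionKey)
          writers(row.partitionKey) = w
          w.write(row)
        case None =>
          // Threshold hit: close every open file and switch strategies.
          writers.values.foreach(_.close())
          writers.clear()
          overflow += row
          fellBack = true
      }
    }

    if (fellBack) {
      // Phase 2: sort the remaining rows by partition key and write them
      // sequentially, so at most one file is open at a time. (Spark uses
      // an external, spilling sort here; an in-memory sort stands in.)
      overflow ++= rows
      var current: Option[(String, Writer)] = None
      for (row <- overflow.sortBy(_.partitionKey)) {
        if (!current.exists(_._1 == row.partitionKey)) {
          current.foreach(_._2.close())
          current = Some((row.partitionKey, newWriter(row.partitionKey)))
        }
        current.get._2.write(row)
      }
      current.foreach(_._2.close())
    } else {
      writers.values.foreach(_.close())
    }
  }
}
```

The point of the design is that the common case (few partitions) keeps the cheap hash-dispatch path, while pathological inputs degrade to a sort instead of exhausting memory with many open compression buffers.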

@SparkQA commented Aug 6, 2015

Test build #40089 has finished for PR 8010 at commit 71cc717.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s"Using output committer class $
    • logInfo(s"Using user defined output committer class $
    • s"Using output committer class $


  currentWriter.writeInternal(sortedIterator.getValue)
}
currentWriter.close()
Contributor
If an exception occurs before this line, currentWriter will not be closed.
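The follow-up commit (7e2d0a4, "make sure we close current writer") addresses exactly this concern; here is a minimal sketch of the usual pattern, with the loop shape assumed from the diff context above:

```scala
// A sketch of the fix, not the actual patch: the loop shape and the
// Boolean-returning next() are assumed from the diff context above.
try {
  while (sortedIterator.next()) {
    currentWriter.writeInternal(sortedIterator.getValue)
  }
} finally {
  // Runs whether or not writeInternal throws, so the file is always closed.
  if (currentWriter != null) {
    currentWriter.close()
  }
}
```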

@SparkQA commented Aug 6, 2015

Test build #40104 has finished for PR 8010 at commit 7e2d0a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 7, 2015

Test build #40117 has finished for PR 8010 at commit f5675bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val PARTITION_COLUMN_TYPE_INFERENCE =
  booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
    defaultValue = Some(true),
    doc = "When true, automatically infer the data types for partitioned columns.")

val PARTITION_MAX_FILES =
  intConf("spark.sql.sources.maxFiles",
Contributor
sources.maxConcurrentWrites?
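As an aside, the knob defined above can be set like any other SQL conf. A hypothetical usage sketch (the key name comes from the snippet above; the value 10 is arbitrary):

```scala
// Lower the open-writer threshold so the sort-based fallback kicks in
// after fewer concurrently open files.
sqlContext.setConf("spark.sql.sources.maxFiles", "10")
```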

@liancheng (Contributor)

LGTM except for some minor issues.

@SparkQA commented Aug 7, 2015

Test build #40183 has finished for PR 8010 at commit 40f0372.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 7, 2015

Test build #40189 has finished for PR 8010 at commit 17b690e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 7, 2015

Test build #40193 has finished for PR 8010 at commit 00804fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Aug 7, 2015
[SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions

Previously, we would open a new file for each new dynamic partition written out using `HadoopFsRelation`. For formats like Parquet this is very costly due to the buffers required to get good compression. In this PR I refactor the code allowing us to fall back on an external sort when many partitions are seen. As such each task will open no more than `spark.sql.sources.maxFiles` files. I also did the following cleanup:

 - Instead of keying the file HashMap on an expensive-to-compute string representation of the partition, we now use a fairly cheap UnsafeProjection that avoids heap allocations.
 - The control flow for instantiating and invoking a writer container has been simplified. Now instead of switching in two places based on the use of partitioning, the specific writer container must implement a single method `writeRows` that is invoked using `runJob`.
 - `InternalOutputWriter` has been removed. Instead we have a `private[sql]` method `writeInternal` that converts and calls the public method. This method can be overridden by internal datasources to avoid the conversion. This change removes a lot of code duplication and per-row `asInstanceOf` checks.
 - `commands.scala` has been split up.

Author: Michael Armbrust <michael@databricks.com>

Closes #8010 from marmbrus/fsWriting and squashes the following commits:

00804fe [Michael Armbrust] use shuffleMemoryManager.pageSizeBytes
775cc49 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into fsWriting
17b690e [Michael Armbrust] remove comment
40f0372 [Michael Armbrust] address comments
f5675bd [Michael Armbrust] char -> string
7e2d0a4 [Michael Armbrust] make sure we close current writer
8100100 [Michael Armbrust] delete empty commands.scala
71cc717 [Michael Armbrust] update comment
8ec75ac [Michael Armbrust] [SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions

(cherry picked from commit 49702bd)
Signed-off-by: Michael Armbrust <michael@databricks.com>
@marmbrus (Contributor, Author) commented Aug 7, 2015

Thanks for reviewing. Merged to master and 1.5.

    |DataFrame schema: ${df.schema}
    |Relation schema: ${relation.schema}
  """.stripMargin)
val partitionColumnsInSpec = relation.partitionColumns.fieldNames
Member
It seems partitionColumnsInSpec and partitionColumns point to the same thing. Do you mean the partition spec in InsertIntoTable? If so, it's already checked by PreWriteCheck.

Contributor Author
Hmm, that's possible. I just copied this from the earlier code and moved it to a better place. If you want to add an analysis test to make sure this error works, and then remove this code if it's redundant, that would be great!
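For context, a sketch of the check under discussion, reconstructed from the diff fragment above (the `partitionColumns` argument and the exact message text are assumptions):

```scala
// Reconstructed sanity check: the partition columns named in the INSERT's
// partition spec must match the partition columns of the target relation.
// The question above is whether PreWriteCheck already guarantees this
// during analysis, making this runtime check redundant.
val partitionColumnsInSpec = relation.partitionColumns.fieldNames
require(
  partitionColumnsInSpec.sameElements(partitionColumns),
  s"""Specified partition columns (${partitionColumns.mkString(", ")}) do not
     |match the partition columns of the relation
     |(${partitionColumnsInSpec.mkString(", ")}).
   """.stripMargin)
```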

CodingCat pushed a commit to CodingCat/spark that referenced this pull request Aug 17, 2015
[SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions

@marmbrus marmbrus deleted the fsWriting branch March 8, 2016 00:04