[SPARK-22626][SQL] It deals with wrong Hive's statistics (zero rowCount) #19831

Closed
wants to merge 4 commits into from

wangyum
Member

@wangyum wangyum commented Nov 28, 2017

What changes were proposed in this pull request?

This PR ensures that when Hive's statistics report totalSize (or rawDataSize) > 0, rowCount is only used if it is also > 0. Otherwise, the wrong statistics may cause an OOM when CBO is enabled.

How was this patch tested?

unit tests

@wangyum changed the title from "[SPARK-22489][SQL] Wrong Hive table statistics may trigger OOM if enables join reorder in CBO" to "[SPARK-22626][SQL] Wrong Hive table statistics may trigger OOM if enables join reorder in CBO" on Nov 28, 2017
@SparkQA

SparkQA commented Nov 28, 2017

Test build #84255 has finished for PR 19831 at commit b16f88e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 28, 2017

Test build #84259 has finished for PR 19831 at commit 5c43b2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Nov 28, 2017

cc @wzhfy

@@ -418,7 +418,7 @@ private[hive] class HiveClientImpl(
       // Note that this statistics could be overridden by Spark's statistics if that's available.
       val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
       val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
-      val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ >= 0)
+      val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ > 0)
Contributor

Hive has a flag called StatsSetupConst.COLUMN_STATS_ACCURATE. If I remember correctly, this flag will become false if user changes table properties or table data. Can you check if the flag exists in your case? If so, we can use the flag to decide whether to read statistics from Hive.

Contributor

The root problem is that user can set "wrong" table properties. So if we want to prevent using wrong stats, we need to detect changes in properties. Otherwise your case can't be avoided.

Member Author

StatsSetupConst.COLUMN_STATS_ACCURATE only indicates that the statistics have been updated; it does not guarantee that they are correct:

cat <<EOF > data
1,1
2,2
3,3
4,4
5,5
EOF

hive -e "CREATE TABLE spark_22626(c1 int, c2 int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';"

hive -e "LOAD DATA local inpath 'data' into table spark_22626;"

hive -e "INSERT INTO table spark_22626 values(6, 6);"

hive -e "desc extended spark_22626;"

The result is:

parameters:{totalSize=24, numRows=1, rawDataSize=3, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}

numRows should be 6, but got 1.

Member Author

Maybe this would be clearer:

val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_))
val stats =
  if (totalSize.isDefined && totalSize.get > 0L) {
    Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
  } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
    Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0)))
  } else {
    None
  }
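
A self-contained sketch of the same read logic may help when trying it outside Spark. It assumes a plain `Map[String, String]` in place of the Hive table properties and a stand-in case class `SimpleStats` instead of `CatalogStatistics`; `HiveStatsSketch` and `readStats` are hypothetical names, and the property keys are the ones shown in the `desc extended` output above.

```scala
// Stand-in for Spark's CatalogStatistics; only the two fields used here.
final case class SimpleStats(sizeInBytes: BigInt, rowCount: Option[BigInt])

object HiveStatsSketch {
  // Property keys as they appear in the `desc extended` output in this thread.
  val TotalSize = "totalSize"
  val RawDataSize = "rawDataSize"
  val RowCount = "numRows"

  def readStats(properties: Map[String, String]): Option[SimpleStats] = {
    val totalSize = properties.get(TotalSize).map(BigInt(_))
    val rawDataSize = properties.get(RawDataSize).map(BigInt(_))
    // A zero (or negative) row count is dropped instead of being trusted.
    val rowCount = properties.get(RowCount).map(BigInt(_)).filter(_ > 0)

    // Prefer totalSize, fall back to rawDataSize; use either only when positive.
    totalSize.filter(_ > 0)
      .orElse(rawDataSize.filter(_ > 0))
      .map(size => SimpleStats(size, rowCount))
  }
}
```

With `totalSize=26596461123` and `numRows=0`, `readStats` keeps the size and returns no row count. With the `spark_22626` properties above (`numRows=1`), the row count passes the filter unchanged, so this check only catches the zero case.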

Contributor

Thanks for the investigation. It seems Hive can't protect its stats properties.

@wzhfy
Contributor

wzhfy commented Nov 29, 2017

BTW, the case here is not about join reorder; it's actually about the broadcast decision. Could you update the title of this PR?

@wzhfy
Contributor

wzhfy commented Nov 29, 2017

Besides, if the size stats totalSize or rawDataSize is wrong, the problem exists whether CBO is enabled or not. We need to change that in the title too.

@wangyum
Member Author

wangyum commented Nov 30, 2017

If CBO is enabled and outputRowCount == 0, getOutputSize returns 1, so sizeInBytes is 1 and this side can be broadcast:

def getOutputSize(
    attributes: Seq[Attribute],
    outputRowCount: BigInt,
    attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = {
  // We assign a generic overhead for a Row object, the actual overhead is different for different
  // Row format.
  val sizePerRow = 8 + attributes.map { attr =>
    if (attrStats.contains(attr)) {
      attr.dataType match {
        case StringType =>
          // UTF8String: base + offset + numBytes
          attrStats(attr).avgLen + 8 + 4
        case _ =>
          attrStats(attr).avgLen
      }
    } else {
      attr.dataType.defaultSize
    }
  }.sum
  // Output size can't be zero, or sizeInBytes of BinaryNode will also be zero
  // (simple computation of statistics returns product of children).
  if (outputRowCount > 0) outputRowCount * sizePerRow else 1
}

def estimate: Option[Statistics] = {
  if (childStats.rowCount.isEmpty) return None
  // Estimate selectivity of this filter predicate, and update column stats if needed.
  // For not-supported condition, set filter selectivity to a conservative estimate 100%
  val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1))
  val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
  val newColStats = if (filteredRowCount == 0) {
    // The output is empty, we don't need to keep column stats.
    AttributeMap[ColumnStat](Nil)
  } else {
    colStatsMap.outputColumnStats(rowsBeforeFilter = childStats.rowCount.get,
      rowsAfterFilter = filteredRowCount)
  }
  val filteredSizeInBytes: BigInt = getOutputSize(plan.output, filteredRowCount, newColStats)
  Some(childStats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCount),
    attributeStats = newColStats))
}

If CBO is disabled, sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSize, and this side can't be broadcast:

/**
 * A default, commonly used estimation for unary nodes. We assume the input row number is the
 * same as the output row number, and compute sizes based on the column types.
 */
private def visitUnaryNode(p: UnaryNode): Statistics = {
  // There should be some overhead in Row object, the size should not be zero when there is
  // no columns, this help to prevent divide-by-zero error.
  val childRowSize = p.child.output.map(_.dataType.defaultSize).sum + 8
  val outputRowSize = p.output.map(_.dataType.defaultSize).sum + 8
  // Assume there will be the same number of rows as child has.
  var sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSize
  if (sizeInBytes == 0) {
    // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
    // (product of children).
    sizeInBytes = 1
  }
  // Don't propagate rowCount and attributeStats, since they are not estimated here.
  Statistics(sizeInBytes = sizeInBytes, hints = p.child.stats.hints)
}
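
The size-only path can be sketched the same way. This is a simplified stand-in for `visitUnaryNode`, assuming column widths are passed in as plain integers; `SizeOnlySketch` and `unaryNodeSize` are hypothetical names, not Spark APIs.

```scala
object SizeOnlySketch {
  // Simplified visitUnaryNode: scale the child's size by the ratio of the
  // output row width to the child row width (each with an 8-byte overhead).
  def unaryNodeSize(
      childSizeInBytes: BigInt,
      childColumnWidths: Seq[Int],
      outputColumnWidths: Seq[Int]): BigInt = {
    val childRowSize = childColumnWidths.sum + 8
    val outputRowSize = outputColumnWidths.sum + 8
    val scaled = (childSizeInBytes * outputRowSize) / childRowSize
    // Never report zero, for the same reason as in the original method.
    if (scaled == 0) BigInt(1) else scaled
  }
}
```

Because the row count is never consulted here, a child of 26596461123 bytes that keeps both of its int columns stays at 26596461123 bytes, well above the default 10 MB broadcast threshold.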

@wangyum changed the title from "[SPARK-22626][SQL] Wrong Hive table statistics may trigger OOM if enables join reorder in CBO" to "[SPARK-22626][SQL] Wrong Hive table statistics may trigger OOM if enables CBO" on Nov 30, 2017
@wzhfy
Contributor

wzhfy commented Dec 1, 2017

Besides, if the size stats totalSize or rawDataSize is wrong, the problem exists whether CBO is enabled or not.

If CBO is enabled and outputRowCount == 0, getOutputSize returns 1, so sizeInBytes is 1 and this side can be broadcast:
If CBO is disabled, sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSize, and this side can't be broadcast:

@wangyum totalSize or rawDataSize can also be wrong, right?

@wzhfy
Contributor

wzhfy commented Dec 1, 2017

Since Hive doesn't prevent users from setting wrong stats properties, I think this solution can alleviate the problem. Besides, it's consistent with what we do for totalSize and rawDataSize (only use the stats when > 0).

@wangyum
Member Author

wangyum commented Dec 1, 2017

Yes, I saw some of these tables in my cluster, but the user did not manually modify this parameter:

# Detailed Table Information		
Database	dw	
Table	prod	
Owner	bi	
Created Time	Tue Nov 03 16:33:52 CST 2015	
Last Access	Thu Jan 01 08:00:00 CST 1970	
Created By	Spark 2.2 or prior	
Type	EXTERNAL	
Provider	hive	
Comment	Product list	
Table Properties	[transient_lastDdlTime=1508260780, last_modified_time=1473154014, last_modified_by=bi]	
Statistics	26596461123 bytes, 0 rows	
Location	viewfs://cluster9/user/hive/warehouse/dw.db/prod	
Serde Library	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe	
InputFormat	org.apache.hadoop.mapred.TextInputFormat	
OutputFormat	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat	
Storage Properties	[serialization.format=1]	
Partition Provider	Catalog	
Time taken: 1.241 seconds, Fetched 70 row(s)

@wangyum
Member Author

wangyum commented Dec 1, 2017

cc @gatorsmile @cloud-fan

@cloud-fan
Contributor

Is it really an issue? If you manually set wrong statistics, what would you expect the system to do? I think data source tables don't allow you to set the statistics manually, so this problem is inherited from Hive. cc @wzhfy to confirm.

This PR treats a 0 row count as invalid, which is arguable: if we analyze an empty table, the 0 row count is valid.

@wangyum
Member Author

wangyum commented Dec 1, 2017

I'm not suggesting that users set table statistics manually; I only do it here to simulate the statistics of these tables.
If totalSize (or rawDataSize) > 0 and rowCount = 0, at least one parameter is incorrect, and we should not optimize based on these incorrect statistics.
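
The distinction between a genuinely empty table and contradictory statistics can be written as a small predicate. This is only an illustration of the reasoning, not what the patch does (the patch simply drops a non-positive rowCount); `StatsConsistencySketch` and its members are hypothetical names, and it assumes an analyzed empty table reports a size of 0.

```scala
object StatsConsistencySketch {
  sealed trait Verdict
  case object Plausible extends Verdict
  case object Contradictory extends Verdict

  // A positive size together with zero rows cannot both be right;
  // zero size with zero rows is what an analyzed empty table would report.
  def check(sizeInBytes: BigInt, rowCount: BigInt): Verdict =
    if (sizeInBytes > 0 && rowCount == 0) Contradictory else Plausible
}
```

Statistics such as "26596461123 bytes, 0 rows" come out as `Contradictory`, while 0 bytes with 0 rows comes out as `Plausible`.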

@wzhfy
Contributor

wzhfy commented Dec 2, 2017

@cloud-fan Yes, Spark doesn't allow users to set (Spark's) statistics manually.

This PR only changes how a 0 row count in Hive's stats is treated; it doesn't affect the logic for Spark's stats. Besides, Spark currently only uses Hive's totalSize and rawDataSize when they are > 0. This PR changes the behavior for rowCount to be consistent with that, so I think it's fine. But the title of the PR should be more specific, i.e. it deals with wrong Hive's statistics (zero rowCount).

@@ -1187,6 +1187,22 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
}
}
}

test("Wrong Hive table statistics may trigger OOM if enables join reorder in CBO") {
Contributor

IMHO you can just test the read logic for Hive's stats properties in StatisticsSuite instead of adding an end-to-end test case; developers may not understand what is going on from this test case.

@wangyum changed the title from "[SPARK-22626][SQL] Wrong Hive table statistics may trigger OOM if enables CBO" to "[SPARK-22626][SQL] t deals with wrong Hive's statistics (zero rowCount)" on Dec 3, 2017
@wangyum changed the title from "[SPARK-22626][SQL] t deals with wrong Hive's statistics (zero rowCount)" to "[SPARK-22626][SQL] It deals with wrong Hive's statistics (zero rowCount)" on Dec 3, 2017
@SparkQA

SparkQA commented Dec 3, 2017

Test build #84394 has finished for PR 19831 at commit 5b744e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in dff440f Dec 3, 2017