Skip to content

[SPARK-38444][SQL]Automatically calculate the upper and lower bounds of partitions when no specified partition related params#35764

Closed
caican00 wants to merge 7 commits intoapache:masterfrom
caican00:optimize-jdbc-scan
Closed

[SPARK-38444][SQL]Automatically calculate the upper and lower bounds of partitions when no specified partition related params#35764
caican00 wants to merge 7 commits intoapache:masterfrom
caican00:optimize-jdbc-scan

Conversation

@caican00
Copy link
Contributor

@caican00 caican00 commented Mar 8, 2022

What changes were proposed in this pull request?

when access rdbms, such as mysql, this patch can automatically calculate upper and lower bounds according to the primary key to improve parallelism and speed up query.

Why are the changes needed?

when access rdbms, such as mysql, if partitionColumn, lowerBound, upperBound, numPartitions are not specified, by default only one partition to scan database is working.

It makes load data from database slow and makes it difficult for users to configure multiple parameters to improve parallelism.

This patch can automatically calculate upper and lower bounds according to the primary key to improve parallelism and speed up query.

Does this PR introduce any user-facing change?

yes. new config defaultNumPartitions in JDBCOptions. It is used to set the default parallelism.

How was this patch tested?

new testing

…of partitions when no specified partition related params
@github-actions github-actions bot added the SQL label Mar 8, 2022
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@caican00
Copy link
Contributor Author

caican00 commented Mar 8, 2022

@MaxGekk HI, could you help to review this patch? thanks

@HyukjinKwon
Copy link
Member

cc @maropu too FYI who looked into these code paths.

@caican00
Copy link
Contributor Author

caican00 commented Mar 9, 2022

cc @cloud-fan

val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)

// the default number of partitions
val defaultNumPartitions = parameters.getOrElse(DEFAULT_NUM_PARTITIONS, "10").toInt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between JDBC_NUM_PARTITIONS?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here :)
Do we really need a default? If the users want to have multiple partitions, shouldn't they specify this explicitly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between JDBC_NUM_PARTITIONS?

@wangyum

  1. partitionColumn, lowerBound, upperBound and numPartitions must be specified together. If an unreasonable numPartitions was specified by users, such as 1, the parallelism is still very small.
  2. therefore, we(jdbc) should rezoning partition nums using another config, not JDBC_NUM_PARTITIONS(it is specified by users and its value maybe very small)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here :) Do we really need a default? If the users want to have multiple partitions, shouldn't they specify this explicitly?

@huaxingao

  1. if users want to change partition nums, they only need to specify DEFAULT_NUM_PARTITIONS explicitly and do not need to specify any more parameters, jdbc will automatically calculate the upper and lower bounds of partitions using primary key
  2. If users do not specify any parameters, then we(jdbc) need a default value to determine the base number of partitions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replied below. I think if this is about default number of partitions, we can just set the default to JDBC_NUM_PARTITIONS flag.

* @return JDBCPartitioningInfo
*/
def getPartitionBound(
schema: StructType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 4 space indentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, updated it.

sql(s"insert into h2.test.employee values($id, 'a')")
}
val df = sql("select id, name from h2.test.employee")
// default partition num is 15
Copy link
Contributor

@huaxingao huaxingao Mar 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean default partition num is 10?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my mistake. updated it, thanks

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 26, 2022
@cloud-fan cloud-fan removed the Stale label Sep 26, 2022
@cloud-fan
Copy link
Contributor

@sadikovi can you take a look?

}

/**
* get the min and max value by the column
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Get the min and max values for the column.

resolver: Resolver,
timeZoneId: String,
jdbcOptions: JDBCOptions,
filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we return Option[JDBCPartitioningInfo] instead?

val dataType = prk.dataType
var lBound: String = null
var uBound: String = null
val sql = s"select min(${prk.name}) as lBound, max(${prk.name}) as uBound " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this logic in the javadoc for this method? Also, what happens if the table is empty?

statement.setQueryTimeout(jdbcOptions.queryTimeout)
val resultSet = statement.executeQuery()
while (resultSet.next()) {
lBound = resultSet.getString("lBound")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it work for primary keys that are integers or timestamps?

uBound = resultSet.getString("uBound")
}
} catch {
case _: SQLException =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is worth at least logging the exception but I would consider re-throwing it.

filters.map(filter => filter.references.distinct.map(r => filterColumns.add(r)))
// primary keys used for partitioning
val prks = schema.fields.filter(
f => f.metadata.getBoolean("isIndexKey") &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the code handle composite primary keys or any multi-column indexes, e.g. with 2 or more columns?

val columnType =
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
getCatalystType(dataType, fieldSize, fieldScale, isSigned))
list.contains(columnName) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the same as:

metadata.putBoolean("isIndexKey", list.contains(columnName))

Also, can we make list a set?

// the number of partitions
val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)

// the default number of partitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update this comment? It is unclear what default number of partition it is - is it for overall number of partitions in the RDD or is it specifically for primary keys in the table and pushed filters?

Copy link
Contributor

@sadikovi sadikovi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the PR title and description to reflect the changes? I think we should just have a flag to enable/disable partitioning based on available primary keys.

val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)

// the default number of partitions
val defaultNumPartitions = parameters.getOrElse(DEFAULT_NUM_PARTITIONS, "10").toInt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name of the config is misleading, this is essentially the default value of numPartitions configs:

val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt).getOrElse(10)

@github-actions
Copy link

github-actions bot commented Jan 6, 2023

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 6, 2023
@github-actions github-actions bot closed this Jan 7, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants

Comments