New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-30244][SQL] Emit pre/post events for "Partition" methods in ExternalCatalogWithListener #27030
Conversation
@hvanhovell, Could you please review this pull request? |
ok to test |
* Event fired before a partition is created. | ||
*/ | ||
case class CreatePartitionPreEvent(database: String, name: String, | ||
parts: Seq[CatalogTablePartition]) extends PartitionEvent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indentation? You had better follow the existing style used in this file.
@@ -202,3 +203,62 @@ case class RenameFunctionEvent( | |||
name: String, | |||
newName: String) | |||
extends FunctionEvent | |||
|
|||
trait PartitionEvent extends DatabaseEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since Partition
belongs to Table
, TableEvent
is better than DatabaseEvent
?
/** | ||
* Name of the table that was touched. | ||
*/ | ||
val name: String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parts: Seq[CatalogTablePartition]
instead of val name: String
? If we extends TableEvent
, Table name is already inherited in TableEvent
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to use parts: Seq[TablePartitionSpec]
?
CatalogTablePartition
contains TablePartitionSpec
inside.
- Create and Alter partition have as parameter
Seq[CatalogTablePartition]
- Rename and Drop partition have as parameter
Seq[TablePartitionSpec]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I was referring only a partial part of your code. In that case, we don't need anything here in PartitionEvent
.
parts: Seq[CatalogTablePartition]) extends PartitionEvent | ||
case class CreatePartitionPreEvent( | ||
database: String, | ||
name: String) extends PartitionEvent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ur? What I meant was we don't need to have the partition information at PartitionEvent
. We need partition information in this event, don't we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the indentation, shall we put extends PartitionEvent
into a new line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean to create such a event:
case class CreatePartitionPreEvent(
database: String,
name: String,
parts: Seq[CatalogTablePartition]) extends PartitionEvent
but PartitionEvent will be empty ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we put
extends PartitionEvent
into a new line
yes, I'll put it into a new line.
Test build #116410 has finished for PR 27030 at commit
|
Test build #116418 has finished for PR 27030 at commit
|
Test build #116419 has finished for PR 27030 at commit
|
Test build #116464 has finished for PR 27030 at commit
|
Test build #116492 has finished for PR 27030 at commit
|
val dbDefinition = createDbDefinition() | ||
|
||
val storage = CatalogStorageFormat.empty.copy( | ||
locationUri = Option(uri1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this table storage pointing db path? Maybe, do you want the following?
val path1 = Files.createTempDirectory("db_")
val path2 = Files.createTempDirectory(path1, "tbl_")
val uri1 = preparePath(path1)
val uri2 = preparePath(path2)
// CREATE
val dbDefinition = createDbDefinition(uri1)
val storage = CatalogStorageFormat.empty.copy(
locationUri = Option(uri2))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, you are right
Test build #116570 has finished for PR 27030 at commit
|
catalog.createTable(tableDefinition, ignoreIfExists = false) | ||
checkEvents(CreateTablePreEvent("db5", "tbl1") :: CreateTableEvent("db5", "tbl1") :: Nil) | ||
|
||
catalog.createPartitions("db5", "tbl1", Seq(partition), ignoreIfExists = false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ur, is this a valid test case? What happens when we try to create CatalogTypes.emptyTablePartitionSpec
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree with you, CatalogTypes.emptyTablePartitionSpec
is not good option for unit test. I have changed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Despite this, I think that empty partition creation should also produce the event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it happen in the real world? Can you give me an SQL example to create the empty partition?
Despite this, I think that empty partition creation should also produce the event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, I cannot find an example, that's why I also think it isn't happening in the real world.
val newPartition = CatalogTablePartition(spec = Map("key1" -> "1", "key2" -> "2"), | ||
storage = CatalogStorageFormat.empty) | ||
|
||
catalog.renamePartitions("db5", "tbl1", Seq(partition.spec), Seq(newPartition.spec)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also sounds like empty spec
to key1=1 , key2=2
. Is this valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now it looks like key=0
to key1=1, key2=2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you give me the real SQL statement for that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, I could not find the real SQL statement for that, that's why I changed this unit tests.
Test build #116661 has finished for PR 27030 at commit
|
Retest this please. |
Test build #116758 has finished for PR 27030 at commit
|
Please don't forget the above comment.
|
@dongjoon-hyun, sorry, I have added a comment |
Hi @dongjoon-hyun , Do you have still any suggestions or comments on this pull request? |
Sorry for being late, @ayudovin . I'll try to review again tonight. |
Retest this please. |
Test build #117321 has finished for PR 27030 at commit
|
|
||
catalog.createPartitions("db5", "tbl1", Seq(partition), ignoreIfExists = true) | ||
checkEvents(CreatePartitionPreEvent("db5", "tbl1", Seq(partition)) :: | ||
CreatePartitionEvent("db5", "tbl1", Seq(partition)) :: Nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ur, is this correct? We already created the same partition
at line 240 and we tried with ignoreIfExists=true
here. I guess CreatePartitionEvent
should not occur.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, it looks incorrect, but we can not check if the partition has been created or not created in case of using ignoreIfExists=true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If then, this is not helpful in that case because we cannot distinguish the difference.
RenamePartitionEvent("db5", "tbl1", Seq(partition.spec), Seq(newPartition.spec)) :: | ||
Nil) | ||
|
||
// ALTER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ALTER nothing
? In this case, I guess no event might be natural.
BTW, if there is no valid SQL query for this, I guess we can remove this test case.
intercept[AnalysisException] { | ||
catalog.dropPartitions("db5", "tbl2", Seq(newPartition.spec), | ||
ignoreIfNotExists = false, purge = true, retainData = false) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's always a good practice to check the error message. Sometime, AnalysisException
is changed unexpectedly.
Also, I'm worrying the situation where this implementation causes a sabotage on the event message pipeline. Sometimes, the changed number of partition can be huge in the production. Hi, @hvanhovell . How do you think about this PR? Could you give us some advice? |
Test build #117401 has finished for PR 27030 at commit
|
@ayudovin . The current status is that we are waiting the original author of this framework, @hvanhovell . We need to check whether this is not added intentionally or not. Also, #27030 (comment) is another concern no this approach because it can be misleading. |
I have got, It makes sense |
ok to test |
WDYT @hvanhovell? |
Test build #118893 has finished for PR 27030 at commit
|
Hi, @gatorsmile and @hvanhovell . |
@ayudovin @dongjoon-hyun @HyukjinKwon def calculateDynamicPartitions(
loadPath: Path,
partitionSpec: TablePartitionSpec,
numDP: Int): Array[TablePartitionSpec] = {
assert(numDP > 0)
try {
val fs = loadPath.getFileSystem(new Configuration())
fs.listStatus(loadPath).flatMap { fileStatus =>
val dirName = fileStatus.getPath.getName
if (dirName.contains("=")) {
val Array(partitionColumn, partitionValue) = dirName.split("=")
val newPartitionSpec = partitionSpec + (partitionColumn -> partitionValue)
if (numDP == 1) {
Array[TablePartitionSpec](newPartitionSpec)
} else {
calculateDynamicPartitions(fileStatus.getPath, newPartitionSpec, numDP - 1)
}
} else {
Array[TablePartitionSpec]()
}
}
} catch {
case ex: Exception => Array[TablePartitionSpec]()
}
}
} |
Can one of the admins verify this patch? |
would love a status update on this 😄 is it still possible for Spark 3.1.0? (current tag on the ticket) |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
Add events to CREATE, DROP, RENAME and ALTER partitions in
ExternalCatalogWithListener
.Why are the changes needed?
This changes needs for adding hooks for partitions in
ExternalCatalogWithListener
.Does this PR introduce any user-facing change?
No
How was this patch tested?
This functionality is covered by unit tests.