Adding upsert functionality#4012
Adding upsert functionality#4012devinbost wants to merge 7 commits intoapache:masterfrom devinbost:master
Conversation
|
It looks like there were some failing tests that I didn't see locally, so I'll work on fixing those now. However, I'd still like some guidance on where to update documentation for this feature. |
jerrypeng
left a comment
There was a problem hiding this comment.
I would first like to thank @devinbost for actively participating in improving pulsar functions!
I have some concerns regarding the implementation/design of this upsert functionality in regards to increasing complexity of the function core code.
- Why does this need to be part of the REST API? Can't users create their own wrapper libraries to do this logic outside ? Something like:
if ( function exists) {
createFunction()
} else {
updateFunction()
}
Is this inconvenient enough for users to warrant an additional API endpoint for doing this?
We can even add this functionality into the pulsar-admin CLI.
- Even if we add a REST API endpoint for upserts, can't we also just use the above logic i.e. a combination of existing APIs to implement the functionality. Do we need to modify core class like FunctionRuntimeManager and FunctionMetaDataManager to do so? I would rather not unnecessarily increase the complexity of those class.
|
@jerrypeng Thank you for asking these questions. I will explain my reasoning behind these changes and then answer your questions. The GoalMy primary goal is to improve the continuous deployment process for Pulsar environments operating at-scale. In large projects where multiple teams are collaborating to develop functions, sinks, and sources, we need an automated deployment process that allows us to quickly and easily update a production Pulsar environment without downtime for cases where we may have hundreds or thousands of inter-dependent Pulsar functions, sinks, and sources that need to be deployed simultaneously. My hope is to allow Pulsar to handle more of the deployment logic to simplify the implementation of Pulsar in large/high-performance production environments. The ProblemBecause our pulsar-admin commands are dockerized to allow them to operate at scale, there is performance overhead with every pulsar-admin command that must be executed. Each pulsar-admin command takes approximately 3-5 seconds to execute. In a deployment with 300 Pulsar functions, if each pulsar-admin command must be executed in series (rather than in parallel), executing 300 pulsar-admin commands to update these objects takes 15-25 minutes. Because Pulsar is in a broken state while these commands are being executed, this deployment approach could result in a production Pulsar environment being down for 15-25 minutes, far beyond our SLA of 300 milliseconds of downtime. Furthermore, if we need to execute a get-status check before each update command (so that we can conditionally execute a create command instead), we double our wait time and significantly increase the complexity of the deployment code that we must maintain. The HopeIf we can offload more of the deployment logic to Pulsar, then we could easily parallelize the deployment process in a way that avoids the overhead and complexity associated with repeatedly executing pulsar admin commands and handling the text output. The Current ApproachThe way we have handled the deployment so far is to use SaltStack to read a YAML manifest file with component (function, sink, and source) metadata to generate pulsar-admin commands to construct the create statements for these components. Here is an obfuscated example of this YAML: The IdeaIf we could pass a YAML file like this to Pulsar and have Pulsar ensure that its state matches our YAML file, it would make large-scale continuous deployments seamless. The Upsert functionality is one step in this direction, but it's not anywhere near as important as a bigger-picture solution to this problem. My team wants to build changes into Pulsar to simplify deployments, but we need architectural guidance about where to make these changes in Pulsar so that we don't violate architectural expectations. Answering your questionsRegarding your Question 1:My current understanding is that the Admin CLI creates REST calls that hit the Pulsar REST API. If my understanding of the behavior of the Admin CLI is incorrect, then Upsert would not need to be added to the REST API. Regarding your Question 2:I agree that it would be better to use the existing APIs and not modify FunctionRuntimeManager and FunctionMetaDataManager, especially because the current PR introduces a considerable amount of code duplication. However, I didn't want to create new classes to handle the new functionality without getting architectural guidance because I wasn't sure where to put the new classes. What are your thoughts? If we can solve the broader deployment problem with a YAML based approach, then we could create a robust solution that wouldn't need Upsert to be added to the REST API to simplify the deployment process. |
|
This relates to issue #4021 (Pulsar admin commands do not support parallelization.) |
|
@devinbost thanks sharing your use case and the hurdles you are trying to overcome! In regards to:
Is there a reason why you can't just submit/update functions via the REST endpoints instead of using the pulsar-admin CLI from docker containers? Submitting/Updating functions by just making a HTTP REST call will be a lot faster than start up a docker container every time to execute commands via command line
Do you have 300 individual functions or is there a function with 300 instances or a group of functions that total 300 instances? There will be a huge submission time difference depending on which scenario. Submitting one function with 300 instances will take much less time that submitting 300 functions with one instance each.
What do you mean by this? The cluster will be running as it should when submitting functions.
In a situation, that somehow your whole pulsar cluster is down and all your functions disappeared, it is unrealistic to expect the downtime to be less that 300 milliseconds. As you probably already know, starting up a pulsar cluster regardless of functions will take longer than that. If you are just talking about resubmitting 300 functions, I am not sure its realistic to expect all the JARs/Packages for 300 functions can be upload in 300 milliseconds. If you are trying to avoid a situation in which you suffer downtime because a catastrophic event happened to your cluster, i would recommend having redundancy. Have geo-replicated clusters across multiple regions. So you can seamlessly cut traffic from your downed cluster to another cluster. If you have 300 functions, I don't think its going to be the norm for you to need to update all 300 functions. Its more likely that its going to be a subset of that. I think functionality you are looking is bulk create, update, or upserts. You want to bring a cluster from a potentially unknown state into a known consistent state in regards to functions. I am I understanding you correctly? While we can add upserts and even bulk upserts. I would suggest you to try just creating/updating functions directly using the REST endpoint first to see if that is good enough. I would still very much like to see features like bulk create/update/upserts in Pulsar functions. I do believe we can accomplish them by just adding/modifying the "front end" code i.e. the REST the endpoints and ComponentImpl.java to implement the bulk actions. Please reference the code in registerFunction and updateFunction and when can probably just run that in a loop for bulk actions. In regards, to this PR and implementing upserts, I think you can just do something like the following in ComponentImpl.java The caveat in the above logic is that a function can be deleted after "containsFunction" is called. To handle that scenario I would suggest you looking at how the updateFunction code works and basically copy that code and modify it to also allow functions that don't exist to also proceed in the logic. |
|
@jerrypeng Thank you for your very detailed response. I appreciate your time and attention to this matter. Regarding:
I appreciate your guidance. Based on advice from @merlimat earlier today, I am currently working on an implementation using the REST endpoints. Regarding:
At the current moment, all of our functions are individual because they represent different use cases. However, we appreciate your advice about the performance improvement that we will get from deploying function instances, so we will examine ways that we can refactor to obtain those benefits. Regarding:
I may have been unintentionally misleading, and I apologize for that. Please let me clarify. When I said:
I didn't mean that the Pulsar cluster is not running. What I meant is that our end-to-end production message pipelines will be in a broken state. (i.e. Our customers will experience problems.) Regarding:
That is exactly right. Regarding:
Thank you also for the guidance and example change to ComponentImpl.java for the Upsert functionality for this PR. |
The purpose of the messaging system is to buffer up for these scenarios. Functions use a subscription associated that is used to retain the data when no consumers are connected. You have to plan the amount of disk space based on the time buffer you want to have in case of issues. During an update of the function there will be a quick restart of the process. If you want to minimize that time, you just need to have >1 instance per function. During the rolling upgrade there'll be always one function instance consuming from the topic. |
|
@jerrypeng I wrote a Java method to performance test the process of creating components with the Pulsar Java Admin API, and it's orders of magnitude faster. (I was running Pulsar standalone locally in a docker container, but it's still indicative of the speedup.) Here's what I got:
So, it definitely appears that we can use the REST API to implement bulk operations because that performance is acceptable. |
Motivation
Upsert functionality is intended to simplify continuous deployment processes. This simplification is especially desirable because there is no Python Admin API available that provides typed objects. Rather, users are required to build workflows with the Admin CLI, which returns strings that can be messy to handle. Upsert helps solve this complexity by allowing Pulsar to handle behavior according to whether the component has already been created or not.
Motivation (with future in mind)
Upsert is a dependency of BulkUpsert (to be added), which will greatly simplify seamless continuous deployments by allowing Pulsar to rapidly update Functions, Sinks, and Sources (and leverage parallelization). BulkUpsert will enable automated deployments to handle all but deletes on components that are no longer used. (For that, functionality will need to be added to allow the user to easily compare their expected component tree with the component tree in Pulsar to identify components that need to be deleted.)
Modifications
My contributions add Upsert functionality for Functions, Sinks, and Sources to the REST API and to the Admin CLI. Upsert conditionally combines the functionality of Create and Update operations. If the component already exists in Pulsar, Upsert updates the component; if the component doesn't already exist in Pulsar, Upsert creates the component.
Commits 29e2a66 and 7749419 were just to get the latest changes from the Pulsar repo.
My primary code is in commit 441ba03, and the unit tests are in commit 441ba03.
Verifying this change
This commit is largely covered already by unit tests for registration and updates for Functions, Sinks, and Sources. However, additional unit tests have been added for Upsert specifically. Tests were added to:
When running the build tests, I did notice these unrelated test failures:
1.
It appears that these test failures are unrelated to any changes that I made to this repo. However, I ran these tests before merging in commit 29e2a66. (Sorry if that's confusing. I was working in a different directory.)
This pull request affects:
Documentation