Databricks ClusterState & Cluster GET API #34071
Seokyun-Ha wants to merge 26 commits into apache:main
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Force-pushed from a5c4beb to 77376fb
Lee-W left a comment
I think we might need some test cases as well
We are writing mock tests with @kyeonghoon-kim and will let you know once they are done 😄
Hello, @Lee-W @hussein-awala, we've applied all your feedback! 😄 We also implemented mock tests. Please take a look 🙏 Thanks!
* add TestClusterState
* rename a_ to async_
* add get_cluster_state test
* add GET_CLUSTER_RESPONSE
* add async
* add activate_cluster test
* review
Force-pushed from 1e921cc to 1b30c4e
We found that some tests are failing :(
* fix lint
* fix test
* fix line too long
We verified that our code works using mock tests and validation 😄 Please take a look 🙏
cc: @alexott?
# Conflicts: # airflow/providers/databricks/hooks/databricks.py
Hello, @alexott. We wrote some code for get
api_called = False
time_start = time.time()

while True:
Do we want to add an option for users to simply trigger the API without waiting for it?
I think just triggering the API is already implemented in the start_cluster method on line 562.
If that's the case, should we consolidate them into one method? I cannot tell the difference in logic just from the names start_cluster and activate_cluster. This is not a huge issue, though.
When you start a Databricks cluster, it first enters the PENDING state, so the cluster cannot be used until it is ready. That is what the start_cluster() method does: it only triggers the start.
Meanwhile, the activate_cluster() method guarantees that the cluster is in the RUNNING state, so once it returns we can use the cluster immediately.
Sometimes users just want to start the cluster, and sometimes they want to bring it to a usable state.
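The distinction above comes down to which lifecycle state the cluster is in. Below is a minimal sketch of a state wrapper exposing `is_running` and `is_terminal`, the two properties used in the snippet under discussion. The class name `ClusterStateSketch` is hypothetical, and the grouping of state names is an assumption based on the states listed in the Databricks Clusters API; the PR's `ClusterState` may classify them differently.

```python
from dataclasses import dataclass

# Assumed groupings; the PR's ClusterState may classify states differently.
RUNNING_STATES = frozenset({"RUNNING", "RESIZING"})
TERMINAL_STATES = frozenset({"TERMINATING", "TERMINATED", "ERROR", "UNKNOWN"})


@dataclass(frozen=True)
class ClusterStateSketch:
    """Hypothetical stand-in for the PR's ClusterState class."""

    state: str
    state_message: str = ""

    @property
    def is_running(self) -> bool:
        # The cluster can accept work right now.
        return self.state in RUNNING_STATES

    @property
    def is_terminal(self) -> bool:
        # The cluster will not become usable without a new start request.
        return self.state in TERMINAL_STATES


# A freshly started cluster is neither running nor terminal.
pending = ClusterStateSketch("PENDING", "Starting Spark")
print(pending.is_running, pending.is_terminal)  # prints: False False
```

Under these assumptions, PENDING is the in-between case: start_cluster() returns while the cluster is still in it, whereas activate_cluster() keeps polling until `is_running` becomes true.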
Yep, I know it's the desired behavior. I'm just wondering whether we could merge the methods and toggle the waiting behavior through a parameter.
Yes, maybe. I now understand your suggestion. I think calling the activate_cluster() method without polling and timeout is the same as calling start_cluster(). So, if I change the code, it looks like this:
def start_cluster(self, json: dict, polling: int | None = None, timeout: int | None = None) -> None:
    cluster_id = json["cluster_id"]
    api_called = False
    time_start = time.time()
    while True:
        run_state = self.get_cluster_state(cluster_id)
        if run_state.is_running:
            return
        elif run_state.is_terminal:
            if api_called:
                raise AirflowException(
                    f"Cluster {cluster_id} start failed with '{run_state.state}' "
                    f"state: {run_state.state_message}"
                )
            # This part changed
            # self.start_cluster(json)
            self._do_api_call(START_CLUSTER_ENDPOINT, json)
            api_called = True
        # This part changed
        if polling:
            # wait for cluster to start
            time.sleep(polling)
        else:
            return
        elapsed_time = time.time() - time_start
        if timeout and elapsed_time > timeout:
            raise AirflowException(f"Cluster {cluster_id} start timed out after {timeout} seconds")
Please take a look and share your opinions 🙏
+ I think we can then apply this to the restart_cluster() method too!
I would prefer to have only one method that returns immediately by default but can wait until the start/restart completes when the corresponding options are specified.
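The single-method design suggested here can be sketched without any Airflow dependency. In the sketch below, `get_state`, `trigger_start`, `sleep`, and `ClusterStartError` are hypothetical stand-ins (for the hook's state lookup, the start API call, `time.sleep`, and `AirflowException`), and the set of terminal states is simplified. It is an illustration of the control flow, not the hook's actual implementation.

```python
import time
from typing import Callable, Optional


class ClusterStartError(Exception):
    """Stand-in for AirflowException so the sketch has no Airflow dependency."""


def start_cluster(
    get_state: Callable[[], str],
    trigger_start: Callable[[], None],
    polling: Optional[float] = None,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Trigger a start; wait for RUNNING only when `polling` is given."""
    api_called = False
    time_start = time.monotonic()
    while True:
        state = get_state()
        if state == "RUNNING":
            return
        if state in ("TERMINATED", "ERROR"):
            if api_called:
                # The cluster fell back to a terminal state after our request.
                raise ClusterStartError(f"start failed with state {state!r}")
            trigger_start()
            api_called = True
        if polling is None:
            return  # default: fire and forget
        sleep(polling)
        if timeout is not None and time.monotonic() - time_start > timeout:
            raise ClusterStartError(f"start timed out after {timeout} seconds")


# Fake cluster that needs two polls after the start request before it runs.
states = iter(["TERMINATED", "PENDING", "PENDING", "RUNNING"])
calls = []
start_cluster(
    get_state=lambda: next(states),
    trigger_start=lambda: calls.append("start"),
    polling=0,
    sleep=lambda seconds: None,  # skip real waiting in the example
)
print(calls)  # prints: ['start']
```

Checking `polling is None` rather than the truthiness of `polling` is a deliberate choice in this sketch, so that `polling=0` still means "wait, without a delay between checks".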
- async def a_get_run_page_url(self, run_id: int) -> str:
+ async def async_get_run_page_url(self, run_id: int) -> str:
This and other similar changes could be considered breaking, since the hook is used directly quite heavily. I suggest that we rename the functions but keep the original names as wrappers that call the new ones and emit deprecation warnings.
I agree. The best approach is to rename it as you did, then add a deprecated method with the old name that calls the new method:
async def a_get_run_page_url(self, run_id: int) -> str:
    warnings.warn(
        "This method is deprecated. Please use `<path to the new method>` instead.",
        AirflowProviderDeprecationWarning,
        stacklevel=2,
    )
    return await self.async_get_run_page_url(run_id=run_id)
Then we can remove it in the next major version.
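A self-contained version of this deprecation pattern, runnable outside Airflow, might look like the following. `HookSketch` and the placeholder URL are hypothetical, and the built-in `DeprecationWarning` stands in for `AirflowProviderDeprecationWarning`.

```python
import asyncio
import warnings


class HookSketch:
    """Hypothetical hook; only the renaming pattern matters here."""

    async def async_get_run_page_url(self, run_id: int) -> str:
        # Placeholder for the real API call.
        return f"https://example.invalid/run/{run_id}"

    async def a_get_run_page_url(self, run_id: int) -> str:
        # Old name kept as a thin wrapper so existing callers keep working.
        warnings.warn(
            "a_get_run_page_url is deprecated; use async_get_run_page_url instead.",
            DeprecationWarning,  # Airflow would use AirflowProviderDeprecationWarning
            stacklevel=2,
        )
        return await self.async_get_run_page_url(run_id=run_id)


# Record warnings so the example can show that the old name still works
# while signalling the deprecation.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    url = asyncio.run(HookSketch().a_get_run_page_url(42))

print(url)
print(any(issubclass(w.category, DeprecationWarning) for w in caught))
```

Note that the wrapper calls the new method through `self`; a bare call to `async_get_run_page_url(...)` would raise a `NameError` inside the class.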
    """
    self._do_api_call(START_CLUSTER_ENDPOINT, json)

def activate_cluster(self, json: dict, polling: int, timeout: int | None = None) -> None:
Maybe call it start_cluster_and_wait?
Hello all, @kyeonghoon-kim and I discussed activate_cluster and this PR. We noticed that this PR makes too many changes to DatabricksHook and other code, so we decided to split it into smaller PRs. Kind of changes
Thanks for your kind and sincere reviews 🙏 We will close this PR after we open a new, separate PR 😄
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
closes: #19490
* ClusterState class
* get_cluster()
* async_get_cluster()
* activate_cluster()
* a_get_... methods to async_get_...
* RunState, ClusterState on __init__() level.
Organization: @bagelcode-data
Co-workers: @Seokyun-Ha, @bskim45, @kyeonghoon-kim