This repository has been archived by the owner on Jul 7, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
clusters.py
219 lines (179 loc) · 8.22 KB
/
clusters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from azure_databricks_sdk_python.api import API
from azure_databricks_sdk_python.types.clusters import *
from cattr import structure, unstructure
from typing import List
class Clusters(API):
    """Client for the Databricks Clusters API.

    The Clusters API allows you to create, start, edit, list,
    terminate, and delete clusters. All requests are dispatched
    through the base ``API`` helpers (``_get``/``_post``), validated
    with ``_validate`` and converted to typed results with
    ``_safe_handle``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def list(self):
        """Return information about all pinned clusters, active clusters,
        up to 70 of the most recently terminated all-purpose clusters in
        the past 30 days, and up to 30 of the most recently terminated
        job clusters in the past 30 days.

        Returns:
            [ClusterInfo]: A list of clusters.
        """
        endpoint = '/clusters/list'
        res = self._get(endpoint)
        # 'clusters' is absent from the payload when the workspace has none.
        return self._safe_handle(res, res.json().get('clusters', []), List[ClusterInfo])

    def list_node_types(self):
        """Return a list of supported Spark node types.

        These node types can be used to launch a cluster.

        Returns:
            [NodeType]: The list of available Spark node types.
        """
        endpoint = '/clusters/list-node-types'
        res = self._get(endpoint)
        return self._safe_handle(res, res.json().get('node_types', []), List[NodeType])

    def spark_versions(self):
        """Return the list of available runtime versions.

        These versions can be used to launch a cluster.

        Returns:
            [SparkVersion]: All the available runtime versions.
        """
        endpoint = '/clusters/spark-versions'
        res = self._get(endpoint)
        return self._safe_handle(res, res.json().get('versions', []), List[SparkVersion])

    def get(self, cluster_id):
        """Retrieve the information for a cluster given its identifier.

        Clusters can be described while they are running or up to 30 days
        after they are terminated.

        Args:
            cluster_id (str): The cluster about which to retrieve
                information. This field is required.

        Returns:
            ClusterInfo: Metadata about a cluster.
        """
        endpoint = '/clusters/get'
        data = {'cluster_id': cluster_id}
        res = self._get(endpoint, data)
        return self._safe_handle(res, res.json(), ClusterInfo)

    def events(self, req: ClusterEventRequest, force: bool = False):
        """Retrieve a list of events about the activity of a cluster.

        Args:
            req (ClusterEventRequest): Cluster event request structure.
                This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterEventResponse: Cluster event request response structure.
        """
        endpoint = '/clusters/events'
        data = self._validate(req, ClusterEventRequest, not force)
        res = self._post(endpoint, data)
        return self._safe_handle(res, res.json(), ClusterEventResponse)

    def pin(self, cluster_id):
        """Ensure that an all-purpose cluster configuration is retained
        even after a cluster has been terminated for more than 30 days.

        Pinning ensures that the cluster is always returned by the List
        API. Pinning a cluster that is already pinned has no effect.

        Args:
            cluster_id (str): The cluster to pin. This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/pin'
        data = {'cluster_id': cluster_id}
        res = self._post(endpoint, data)
        return self._safe_handle(res, data, ClusterId)

    def unpin(self, cluster_id):
        """Allow the cluster to eventually be removed from the list
        returned by the List API.

        Unpinning a cluster that is not pinned has no effect.

        Args:
            cluster_id (str): The cluster to unpin. This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/unpin'
        data = {'cluster_id': cluster_id}
        res = self._post(endpoint, data)
        return self._safe_handle(res, data, ClusterId)

    def delete(self, cluster_id):
        """Terminate a cluster given its ID.

        Args:
            cluster_id (str): The cluster to be terminated.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/delete'
        data = {'cluster_id': cluster_id}
        res = self._post(endpoint, data)
        return self._safe_handle(res, data, ClusterId)

    def permanent_delete(self, cluster_id):
        """Permanently delete a cluster.

        Args:
            cluster_id (str): The cluster to be permanently deleted.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/permanent-delete'
        data = {'cluster_id': cluster_id}
        res = self._post(endpoint, data)
        return self._safe_handle(res, data, ClusterId)

    def resize(self, req: ClusterResizeRequest, force: bool = False):
        """Resize a cluster to have a desired number of workers.

        The cluster must be in the RUNNING state.

        Args:
            req (ClusterResizeRequest): Cluster resize request structure.
                This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/resize'
        data = self._validate(req, ClusterResizeRequest, not force)
        res = self._post(endpoint, data)
        # The API responds with an empty body; echo the id back as a
        # ClusterId built from the validated request payload.
        return self._safe_handle(res, ClusterId(cluster_id=data.get('cluster_id')))

    def restart(self, cluster_id):
        """Restart a cluster given its ID.

        The cluster must be in the RUNNING state.

        Args:
            cluster_id (str): The cluster to be restarted.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/restart'
        data = {'cluster_id': cluster_id}
        res = self._post(endpoint, data)
        return self._safe_handle(res, data, ClusterId)

    def start(self, cluster_id):
        """Start a terminated cluster given its ID.

        Args:
            cluster_id (str): The cluster to be started.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/start'
        data = {'cluster_id': cluster_id}
        res = self._post(endpoint, data)
        return self._safe_handle(res, data, ClusterId)

    def create(self, req: ClusterAttributes, force: bool = False):
        """Create a new Apache Spark cluster.

        This method acquires new instances from the cloud provider if
        necessary.

        Args:
            req (ClusterAttributes): Common set of attributes set during
                cluster creation. This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/create'
        data = self._validate(req, ClusterAttributes, not force)
        res = self._post(endpoint, data)
        return self._safe_handle(res, res.json(), ClusterId)

    def edit(self, req: ClusterAttributes, force: bool = False):
        """Edit the configuration of a cluster to match the provided
        attributes and size.

        Args:
            req (ClusterAttributes): Common set of attributes set during
                cluster creation. This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/edit'
        data = self._validate(req, ClusterAttributes, not force)
        res = self._post(endpoint, data)
        # Keyword construction for consistency with resize(); the edit
        # endpoint returns an empty body, so the id comes from the request.
        return self._safe_handle(res, ClusterId(cluster_id=data.get('cluster_id')))