Skip to content
This repository has been archived by the owner on Jul 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #15 from aminekaabachi/feature/clusters
Browse files Browse the repository at this point in the history
New clusters api endpoints
  • Loading branch information
aminekaabachi committed Sep 30, 2020
2 parents 6dd972b + 270f40e commit 97d96d8
Show file tree
Hide file tree
Showing 14 changed files with 798 additions and 42 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -30,7 +30,7 @@ Please refer to the progress below:

| API | Progress |
| :--- | :---: |
| Clusters API | 0% |
| Clusters API | 80% |
| Clusters Policies API | 0% |
| DBFS API | 0% |
| Groups API | 0% |
Expand Down
52 changes: 52 additions & 0 deletions azure_databricks_sdk_python/api.py
@@ -1,6 +1,7 @@
import json
import requests
import urllib.parse
from cattr import structure, unstructure

from azure_databricks_sdk_python.types import AuthMethods

Expand Down Expand Up @@ -108,6 +109,57 @@ def _handle_error(self, res):
res.json().get('error_code'),
res.json().get('message')))

def _safe_handle(self, res, value, type=None):
"""Helper method to safely handle http response
Args:
res (Response): http response.
value (any): value to return.
Returns:
any: the returned object. Raise exception if code is not 200.
"""

if res.status_code == 200:
if type:
return structure(value, type)
else:
return value
else:
self._handle_error(res)

def _validate(self, req, type, validate=True):
"""Validates users input to be passed to api
Args:
req (object): user input.
type (object): the type to be validated against.
validate (bool): to validate or not the input against the type.
Raises:
ValueError: if validates=True, Raises in case the input is not type serializable.
ValueError: if validates=True,Raises in case the input is not a dict.
Returns:
dict: the input data in dict format.
"""
data = req

if validate:
print(type)
if not isinstance(req, type):
try:
data = structure(req, type)
except Exception as err:
raise ValueError(
'Request is a valid {0}: {1}'.format(type.__name__, err))
return unstructure(data)
else:
if not isinstance(req, dict):
raise ValueError(
'Request is not a dict. {0}: {1} passed instead.'.format(type(req), req))
return data


class APIWithPersonalAccessToken(APIWithAuth):
"""API composers for PersonalAccessToken auth"""
Expand Down
7 changes: 4 additions & 3 deletions azure_databricks_sdk_python/client.py
@@ -1,12 +1,13 @@
from azure_databricks_sdk_python.types import AuthMethods
from azure_databricks_sdk_python.tokens import Tokens
from azure_databricks_sdk_python.clusters import Clusters


# Current API version
API_VERSION = 2.0

class Composer:
""" Composer that aggregates API wrappers.
"""
""" Composer that aggregates API wrappers """
def compose(self, args):
"""composes self with API wrappers.
Expand All @@ -17,11 +18,11 @@ def compose(self, args):
Composer: return new composed object.
"""
self.tokens = Tokens(**args)
self.clusters = Clusters(**args)
return self


class BaseClient:

""" Base Class for API Clients """

def __init__(self, databricks_instance: str, composer: Composer, config={}):
Expand Down
219 changes: 219 additions & 0 deletions azure_databricks_sdk_python/clusters.py
@@ -0,0 +1,219 @@
from azure_databricks_sdk_python.api import API
from azure_databricks_sdk_python.types.clusters import *

from cattr import structure, unstructure
from typing import List


class Clusters(API):
    """The Clusters API allows you to create, start, edit, list,
    terminate, and delete clusters.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _post_cluster_id(self, endpoint, cluster_id):
        """POST a {'cluster_id': ...} payload and return it as a ClusterId.

        Shared implementation for the six endpoints that take only a
        cluster_id and return it back on success.

        Args:
            endpoint (str): the API endpoint to POST to.
            cluster_id (str): the target cluster identifier.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        data = {'cluster_id': cluster_id}
        res = self._post(endpoint, data)
        return self._safe_handle(res, data, ClusterId)

    def list(self):
        """Return information about all pinned clusters, active clusters,
        up to 70 of the most recently terminated all-purpose clusters in the past 30 days,
        and up to 30 of the most recently terminated job clusters in the past 30 days.

        Returns:
            [ClusterInfo]: A list of clusters.
        """
        endpoint = '/clusters/list'
        res = self._get(endpoint)
        return self._safe_handle(res, res.json().get('clusters'), List[ClusterInfo])

    def list_node_types(self):
        """Return a list of supported Spark node types.
        These node types can be used to launch a cluster.

        Returns:
            [NodeType]: The list of available Spark node types.
        """
        endpoint = '/clusters/list-node-types'
        res = self._get(endpoint)
        return self._safe_handle(res, res.json().get('node_types'), List[NodeType])

    def spark_versions(self):
        """Return the list of available runtime versions.
        These versions can be used to launch a cluster.

        Returns:
            [SparkVersion]: All the available runtime versions.
        """
        endpoint = '/clusters/spark-versions'
        res = self._get(endpoint)
        return self._safe_handle(res, res.json().get('versions'), List[SparkVersion])

    def get(self, cluster_id):
        """Retrieve the information for a cluster given its identifier.
        Clusters can be described while they are running or up to 30 days after they are terminated.

        Args:
            cluster_id (str): The cluster about which to retrieve information. This field is required.

        Returns:
            ClusterInfo: Metadata about a cluster.
        """
        endpoint = '/clusters/get'
        data = {'cluster_id': cluster_id}
        res = self._get(endpoint, data)
        return self._safe_handle(res, res.json(), ClusterInfo)

    def events(self, req: ClusterEventRequest, force: bool = True):
        """Retrieve a list of events about the activity of a cluster.

        Args:
            req (ClusterEventRequest): Cluster event request structure. This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterEventResponse: Cluster event request response structure.
        """
        endpoint = '/clusters/events'
        # _validate already returns a plain dict, so no extra unstructure needed.
        data = self._validate(req, ClusterEventRequest, force)
        res = self._post(endpoint, data)
        return self._safe_handle(res, res.json(), ClusterEventResponse)

    def pin(self, cluster_id):
        """Ensure that an all-purpose cluster configuration is retained
        even after a cluster has been terminated for more than 30 days.
        Pinning ensures that the cluster is always returned by the List API.
        Pinning a cluster that is already pinned has no effect.

        Args:
            cluster_id (str): The cluster to pin. This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        return self._post_cluster_id('/clusters/pin', cluster_id)

    def unpin(self, cluster_id):
        """Allows the cluster to eventually be removed from the list returned
        by the List API. Unpinning a cluster that is not pinned has no effect.

        Args:
            cluster_id (str): The cluster to unpin. This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        return self._post_cluster_id('/clusters/unpin', cluster_id)

    def delete(self, cluster_id):
        """Terminate a cluster given its ID.

        Args:
            cluster_id (str): The cluster to be terminated.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        return self._post_cluster_id('/clusters/delete', cluster_id)

    def permanent_delete(self, cluster_id):
        """Permanently delete a cluster.

        Args:
            cluster_id (str): The cluster to be permanently deleted.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        return self._post_cluster_id('/clusters/permanent-delete', cluster_id)

    def resize(self, req: ClusterResizeRequest, force: bool = True):
        """Resize a cluster to have a desired number of workers.
        The cluster must be in the RUNNING state.

        Args:
            req (ClusterResizeRequest): Cluster resize request structure. This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/resize'
        data = self._validate(req, ClusterResizeRequest, force)
        res = self._post(endpoint, data)
        # Consistent with the sibling methods: let _safe_handle structure the
        # cluster_id payload into a ClusterId (previously it was hand-built
        # and _safe_handle was called without a type).
        return self._safe_handle(res, {'cluster_id': data.get('cluster_id')}, ClusterId)

    def restart(self, cluster_id):
        """Restart a cluster given its ID.
        The cluster must be in the RUNNING state.

        Args:
            cluster_id (str): The cluster to be restarted.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        return self._post_cluster_id('/clusters/restart', cluster_id)

    def start(self, cluster_id):
        """Start a terminated cluster given its ID.

        Args:
            cluster_id (str): The cluster to be started.
                This field is required.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        return self._post_cluster_id('/clusters/start', cluster_id)

    def create(self, req: ClusterAttributes, force: bool = True):
        """Create a new Apache Spark cluster.
        This method acquires new instances from the cloud provider if necessary.

        Args:
            req (ClusterAttributes): Common set of attributes set during cluster creation. This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/create'
        data = self._validate(req, ClusterAttributes, force)
        res = self._post(endpoint, data)
        return self._safe_handle(res, res.json(), ClusterId)

    def edit(self, req: ClusterAttributes, force: bool = True):
        """Edit the configuration of a cluster
        to match the provided attributes and size.

        Args:
            req (ClusterAttributes): Common set of attributes set during cluster creation. This field is required.
            force (bool): If false, it will check that req is a dict
                then pass it as is, with no type validation.

        Returns:
            ClusterId: in case of success or will raise an exception.
        """
        endpoint = '/clusters/edit'
        data = self._validate(req, ClusterAttributes, force)
        res = self._post(endpoint, data)
        return self._safe_handle(res, res.json(), ClusterId)
28 changes: 12 additions & 16 deletions azure_databricks_sdk_python/tokens.py
@@ -1,5 +1,8 @@
from azure_databricks_sdk_python.api import API
from azure_databricks_sdk_python.types.tokens import PublicTokenInfo
from azure_databricks_sdk_python.types.tokens import PublicTokenInfo, TokenId

from cattr import structure
from typing import List


class Tokens(API):
Expand All @@ -17,14 +20,12 @@ def list(self):
[PublicTokenInfo]: A list of token information for a user-workspace pair.
"""
endpoint = '/token/list'

res = self._get(endpoint)
if res.status_code == 200:
return [PublicTokenInfo(**token) for token in res.json().get('token_infos')]
else:
self._handle_error(res)
return self._safe_handle(res, structure(res.json().get('token_infos'), List[PublicTokenInfo]))

def create(self, comment: str = None, lifetime_seconds: int = 7776000):
"""Createsand return a token.
"""Create and return a token.
Args:
comment (str, optional): Optional description to attach to the token.
Expand All @@ -39,12 +40,10 @@ def create(self, comment: str = None, lifetime_seconds: int = 7776000):
endpoint = '/token/create'
data = {'lifetime_seconds': lifetime_seconds,
'comment': comment}

res = self._post(endpoint, data)
if res.status_code == 200:
return {'token_value': res.json().get('token_value'),
'token_info': PublicTokenInfo(**res.json().get('token_info'))}
else:
self._handle_error(res)
return self._safe_handle(res, {'token_value': res.json().get('token_value'),
'token_info': PublicTokenInfo(**res.json().get('token_info'))})

def delete(self, token_id: str):
"""Revoke an access token.
Expand All @@ -53,12 +52,9 @@ def delete(self, token_id: str):
token_id (str): The ID of the token to be revoked.
Returns:
Boolean: In case of success or will raise an exception.
TokenId: in case of success or will raise an exception.
"""
endpoint = '/token/delete'
data = {'token_id': token_id}
res = self._post(endpoint, data)
if res.status_code == 200:
return True
else:
self._handle_error(res)
return self._safe_handle(res, data, TokenId)

0 comments on commit 97d96d8

Please sign in to comment.