Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: SDK refactor (#622)
Browse files Browse the repository at this point in the history
* start refactor

* continue refactor for cluster and job functions

* fix imports

* fixes

* fixes

* refactor integration test secrets management

* fix cluster create, add new test

* add tests for new sdk api and fix bugs

* fix naming and bugs

* update job operations naming, bug fixes

* fix cluster tests

* fix joboperations and tests

* update cli and fix some bugs

* start fixes

* fix pylint errors, bugs

* add deprecated warning checks, rename tests

* add docstrings for baseoperations

* add docstrings

* docstrings, add back compat for coreclient, fix init for spark client

* whitespace

* docstrings, whitespace

* docstrings, fixes

* docstrings, fixes

* fix the sdk documentation, bugs

* fix method call

* pool_id->id

* rename ids

* cluster_id->id

* cluster_id->id

* add todo

* fixes

* add some todos

* rename pool to cluster, add todo for nodes params

* add todos for nodes param removal

* update functions names

* remove deprecated function calls

* update docs and docstrings

* update docstrings

* get rid of TODOs, fix docstrings

* remove unused setting

* inheritance -> composition

* fix models bugs

* fix create_user bug

* update sdk_example.py

* fix create user argument issue

* update sdk_example.py

* update doc

* use Software model instead of string

* add job wait flag, add cluster application wait functions

* add docs for wait, update tests

* fix bug

* add clientrequesterror catch to fix tests
  • Loading branch information
jafreck committed Aug 3, 2018
1 parent c9fd8bb commit b18eb69
Show file tree
Hide file tree
Showing 111 changed files with 3,711 additions and 842 deletions.
2 changes: 1 addition & 1 deletion .style.yapf
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ based_on_style=pep8
spaces_before_comment=4
split_before_logical_operator=True
indent_width=4
column_limit=140
column_limit=120
split_arguments_when_comma_terminated=True
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
"python.formatting.provider": "yapf",
"python.venvPath": "${workspaceFolder}/.venv/",
"python.pythonPath": "${workspaceFolder}/.venv/Scripts/python.exe",
"python.unitTest.pyTestEnabled": true
"python.unitTest.pyTestEnabled": true,
}
1 change: 1 addition & 0 deletions aztk/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .client import CoreClient
1 change: 1 addition & 0 deletions aztk/client/base/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .base_operations import BaseOperations
223 changes: 223 additions & 0 deletions aztk/client/base/base_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from aztk import models
from aztk.internal import cluster_data
from aztk.utils import ssh as ssh_lib

from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node,
generate_user_on_cluster, generate_user_on_node, get_application_log, get_remote_login_settings,
node_run, run, ssh_into_node)


class BaseOperations:
    """Base operations that all other operations have as an attribute

    Most methods are thin wrappers that delegate to the helper module of the same name imported from
    ``.helpers``, passing this object as the helper's first argument.

    Attributes:
        batch_client (:obj:`azure.batch.batch_service_client.BatchServiceClient`): Client used to interact with the
            Azure Batch service.
        blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
            Blob service.
        secrets_configuration (:obj:`aztk.models.SecretsConfiguration`): Model that holds AZTK secrets used to
            authenticate with Azure and the clusters.
    """

    def __init__(self, context):
        """Store the service clients and secrets found in ``context``

        Args:
            context: mapping that must provide the keys ``'batch_client'``, ``'blob_client'`` and
                ``'secrets_configuration'``.
        """
        self.batch_client = context['batch_client']
        self.blob_client = context['blob_client']
        self.secrets_configuration = context['secrets_configuration']

    def get_cluster_config(self, id: str) -> models.ClusterConfiguration:
        """Read the stored configuration of a cluster

        Args:
            id (:obj:`str`): the id of the cluster whose configuration should be read

        Returns:
            :obj:`aztk.models.ClusterConfiguration`: Object representing the cluster's configuration
        """
        return self.get_cluster_data(id).read_cluster_config()

    def get_cluster_data(self, id: str) -> cluster_data.ClusterData:
        """Gets the ClusterData object to manage data related to the given cluster

        Args:
            id (:obj:`str`): the id of the cluster to get

        Returns:
            :obj:`aztk.internal.cluster_data.ClusterData`: Object used to manage the data and storage functions
                for a cluster
        """
        return cluster_data.ClusterData(self.blob_client, id)

    def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
        """Open an ssh tunnel to a node

        Args:
            id (:obj:`str`): the id of the cluster the node is in
            node_id (:obj:`str`): the id of the node to open the ssh tunnel to
            username (:obj:`str`): the username to authenticate the ssh session
            ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
                Defaults to None.
            password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
            port_forward_list (:obj:`List[PortForwardingSpecification]`, optional): list of
                PortForwardingSpecifications. The defined ports will be forwarded to the client.
            internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Defaults to False.

        Returns:
            :obj:`None`
        """
        ssh_into_node.ssh_into_node(self, id, node_id, username, ssh_key, password, port_forward_list, internal)

    def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None):
        """Create a user on a node

        Args:
            id (:obj:`str`): id of the cluster to create the user on.
            node_id (:obj:`str`): id of the node in the cluster to create the user on.
            username (:obj:`str`): name of the user to create.
            ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
            password (:obj:`str`, optional): password for the user, must use ssh_key or password.

        Returns:
            :obj:`None`
        """
        return create_user_on_node.create_user_on_node(self, id, node_id, username, ssh_key, password)

    #TODO: remove nodes as param
    def create_user_on_cluster(self, id, nodes, username, ssh_pub_key=None, password=None):
        """Create a user on every node in the cluster

        Args:
            id (:obj:`str`): id of the cluster to create the user on.
            nodes (:obj:`List[ComputeNode]`): list of nodes to create the user on
            username (:obj:`str`): name of the user to create.
            ssh_pub_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_pub_key
                or password. Defaults to None.
            password (:obj:`str`, optional): password for the user, must use ssh_pub_key or password.
                Defaults to None.

        Returns:
            :obj:`None`
        """
        return create_user_on_cluster.create_user_on_cluster(self, id, nodes, username, ssh_pub_key, password)

    def generate_user_on_node(self, id, node_id):
        """Create a user with an autogenerated username and ssh_key on the given node.

        Args:
            id (:obj:`str`): the id of the cluster to generate the user on.
            node_id (:obj:`str`): the id of the node in the cluster to generate the user on.

        Returns:
            :obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
        """
        return generate_user_on_node.generate_user_on_node(self, id, node_id)

    #TODO: remove nodes as param
    def generate_user_on_cluster(self, id, nodes):
        """Create a user with an autogenerated username and ssh_key on the cluster

        Args:
            id (:obj:`str`): the id of the cluster to generate the user on.
            nodes (:obj:`List[ComputeNode]`): list of nodes to generate the user on.

        Returns:
            :obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
        """
        return generate_user_on_cluster.generate_user_on_cluster(self, id, nodes)

    # NOTE(review): the return annotation says ``str`` but the delete_user helper has no return value.
    def delete_user_on_node(self, id: str, node_id: str, username: str) -> str:
        """Delete a user on a node

        Args:
            id (:obj:`str`): the id of the cluster to delete the user on.
            node_id (:obj:`str`): the id of the node in the cluster to delete the user on.
            username (:obj:`str`): the name of the user to delete.

        Returns:
            :obj:`None`
        """
        return delete_user_on_node.delete_user(self, id, node_id, username)

    #TODO: remove nodes as param
    def delete_user_on_cluster(self, username, id, nodes):
        """Delete a user on every node in the cluster

        Note that, unlike the other cluster-level methods, ``username`` is the first parameter here.

        Args:
            username (:obj:`str`): the name of the user to delete.
            id (:obj:`str`): the id of the cluster to delete the user on.
            nodes (:obj:`List[ComputeNode]`): list of nodes to delete the user on.

        Returns:
            :obj:`None`
        """
        # The helper is declared as (base_client, id, nodes, username), which differs from this method's
        # parameter order. Forward by keyword so each value reaches the parameter it is meant for.
        return delete_user_on_cluster.delete_user_on_cluster(self, id=id, nodes=nodes, username=username)

    def node_run(self, id, node_id, command, internal, container_name=None, timeout=None):
        """Run a bash command on the given node

        Args:
            id (:obj:`str`): the id of the cluster to run the command on.
            node_id (:obj:`str`): the id of the node in the cluster to run the command on.
            command (:obj:`str`): the bash command to execute on the node.
            internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Required, there is no default.
            container_name (:obj:`str`, optional): the name of the container to run the command in.
                If None, the command will run on the host VM. Defaults to None.
            timeout (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`aztk.models.NodeOutput`: object containing the output of the run command
        """
        return node_run.node_run(self, id, node_id, command, internal, container_name, timeout)

    def get_remote_login_settings(self, id: str, node_id: str):
        """Get the remote login information for a node in a cluster

        Args:
            id (:obj:`str`): the id of the cluster the node is in
            node_id (:obj:`str`): the id of the node in the cluster

        Returns:
            :obj:`aztk.models.RemoteLogin`: Object that contains the ip address and port combination to login
                to a node
        """
        return get_remote_login_settings.get_remote_login_settings(self, id, node_id)

    def run(self, id, command, internal, container_name=None, timeout=None):
        """Run a bash command on every node in the cluster

        Args:
            id (:obj:`str`): the id of the cluster to run the command on.
            command (:obj:`str`): the bash command to execute on the node.
            internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
                Only use this if running within the same VNET as the cluster. Required, there is no default.
            container_name (:obj:`str`, optional): the name of the container to run the command in.
                If None, the command will run on the host VM. Defaults to None.
            timeout (:obj:`str`, optional): The timeout in seconds for establishing a connection to the node.
                Defaults to None.

        Returns:
            :obj:`List[aztk.models.NodeOutput]`: list of NodeOutput objects containing the output of the run command
        """
        return run.cluster_run(self, id, command, internal, container_name, timeout)

    def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0):
        """Get the log for a running or completed application

        Args:
            id (:obj:`str`): the id of the cluster the application was submitted to.
            application_name (:obj:`str`): the name of the application to get the log for.
            tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes. Otherwise, the
                whole log will be retrieved. Only use this if streaming the log as it is being written.
                Defaults to False.
            current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes are
                retrieved. Only useful if streaming the log as it is being written. Only used if tail is True.

        Returns:
            :obj:`aztk.models.ApplicationLog`: a model representing the output of the application.
        """
        return get_application_log.get_application_log(self, id, application_name, tail, current_bytes)
Empty file.
11 changes: 11 additions & 0 deletions aztk/client/base/helpers/create_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import concurrent.futures


#TODO: remove nodes param
def create_user_on_cluster(base_operations, id, nodes, username, ssh_pub_key=None, password=None):
    """Create the same user on every node in ``nodes``, one thread-pool task per node.

    Args:
        base_operations: object exposing ``create_user_on_node(id, node_id, username, ssh_key, password)``.
        id: id of the cluster the nodes belong to.
        nodes: iterable of objects with an ``id`` attribute.
        username: name of the user to create.
        ssh_pub_key: ssh public key forwarded to every per-node call. Defaults to None.
        password: password forwarded to every per-node call. Defaults to None.

    Returns:
        None. The call blocks until every task has finished; the results of the individual
        futures are not inspected here.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = []
        for node in nodes:
            task = pool.submit(base_operations.create_user_on_node, id, node.id, username, ssh_pub_key, password)
            pending.append(task)
        concurrent.futures.wait(pending)
42 changes: 42 additions & 0 deletions aztk/client/base/helpers/create_user_on_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from datetime import datetime, timedelta, timezone

import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error

from aztk import models
from aztk.utils import get_ssh_key


def __create_user(self, id: str, node_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
    """
    Add an admin user to a single node through the Batch client
        :param self: object providing ``batch_client`` and ``secrets_configuration``
        :param id: id of the pool/cluster the node belongs to
        :param node_id: id of the node to add the user to
        :param username: username of the user to add
        :param password: password of the user to add
        :param ssh_key: ssh_key of the user to add; the public key actually sent is whatever
            ``get_ssh_key.get_user_public_key`` returns for it and the secrets configuration
    """
    public_key = get_ssh_key.get_user_public_key(ssh_key, self.secrets_configuration)
    # The account is set to expire 365 days from now (UTC).
    expires_at = datetime.now(timezone.utc) + timedelta(days=365)
    node_user = batch_models.ComputeNodeUser(
        name=username,
        is_admin=True,
        password=password,
        ssh_public_key=public_key,
        expiry_time=expires_at,
    )
    # Create new ssh user for the given node
    self.batch_client.compute_node.add_user(id, node_id, node_user)


def create_user_on_node(base_client, id, node_id, username, ssh_key=None, password=None):
    """Create a user on a node, replacing the user if the first attempt is rejected.

    Args:
        base_client: object providing ``batch_client``, ``secrets_configuration``,
            ``delete_user_on_node`` and ``create_user_on_node``.
        id (:obj:`str`): id of the cluster the node is in.
        node_id (:obj:`str`): id of the node to create the user on.
        username (:obj:`str`): name of the user to create.
        ssh_key (:obj:`str`, optional): ssh public key for the user. Defaults to None.
        password (:obj:`str`, optional): password for the user. Defaults to None.

    Returns:
        :obj:`None`

    Raises:
        batch_error.BatchErrorException: if deleting the user or the second creation attempt fails.
    """
    try:
        __create_user(
            base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
    except batch_error.BatchErrorException:
        # Presumably the user already exists on the node (assumption, the error is not inspected):
        # delete it and create it again. Both credentials must be forwarded on the retry, otherwise
        # a password-only user would be recreated without its password.
        base_client.delete_user_on_node(id, node_id, username)
        base_client.create_user_on_node(
            id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
7 changes: 7 additions & 0 deletions aztk/client/base/helpers/delete_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import concurrent.futures

#TODO: remove nodes param
def delete_user_on_cluster(base_client, id, nodes, username):
    """Delete ``username`` from every node in ``nodes``, one thread-pool task per node.

    Args:
        base_client: object exposing ``delete_user_on_node(id, node_id, username)``.
        id: id of the cluster the nodes belong to.
        nodes: iterable of objects with an ``id`` attribute.
        username: name of the user to delete.

    Returns:
        None. The call blocks until every task has finished; the results of the individual
        futures are not inspected here.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = []
        for node in nodes:
            pending.append(pool.submit(base_client.delete_user_on_node, id, node.id, username))
        concurrent.futures.wait(pending)
9 changes: 9 additions & 0 deletions aztk/client/base/helpers/delete_user_on_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
def delete_user(self, pool_id: str, node_id: str, username: str) -> str:
    """
    Delete a user from a single node through the Batch client
        :param self: object providing ``batch_client``
        :param pool_id: id of the pool the node belongs to
        :param node_id: id of the node to delete the user from
        :param username: username of the user to delete
    Nothing is returned explicitly.
    """
    compute_node_operations = self.batch_client.compute_node
    compute_node_operations.delete_user(pool_id, node_id, username)
20 changes: 20 additions & 0 deletions aztk/client/base/helpers/generate_user_on_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import concurrent.futures

from Cryptodome.PublicKey import RSA

from aztk.utils import secure_utils


#TODO: remove nodes param
def generate_user_on_cluster(base_operations, id, nodes):
    """Create a user with a random name and a fresh 2048-bit RSA key on every node in ``nodes``.

    Args:
        base_operations: object exposing ``create_user_on_node(id, node_id, username, ssh_key)``.
        id: id of the cluster the nodes belong to.
        nodes: iterable of objects with an ``id`` attribute.

    Returns:
        tuple: ``(generated_username, ssh_key)`` where ``ssh_key`` is the generated RSA key object;
        only its OpenSSH-formatted public part is handed to the per-node calls.
    """
    user_name = secure_utils.generate_random_string()
    key_pair = RSA.generate(2048)
    public_part = key_pair.publickey().exportKey('OpenSSH').decode('utf-8')

    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(base_operations.create_user_on_node, id, node.id, user_name, public_part)
            for node in nodes
        ]
        # Block until every per-node task has finished; the futures' results are not inspected.
        concurrent.futures.wait(pending)

    return user_name, key_pair
11 changes: 11 additions & 0 deletions aztk/client/base/helpers/generate_user_on_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from Cryptodome.PublicKey import RSA

from aztk.utils import secure_utils


def generate_user_on_node(base_client, pool_id, node_id):
    """Create a user with a random name and a fresh 2048-bit RSA key on one node.

    Args:
        base_client: object exposing ``create_user_on_node(id, node_id, username, ssh_key)``.
        pool_id: id of the pool/cluster the node belongs to.
        node_id: id of the node to create the user on.

    Returns:
        tuple: ``(generated_username, ssh_key)`` where ``ssh_key`` is the generated RSA key object;
        only its OpenSSH-formatted public part is handed to ``create_user_on_node``.
    """
    user_name = secure_utils.generate_random_string()
    key_pair = RSA.generate(2048)
    exported = key_pair.publickey().exportKey('OpenSSH')
    public_part = exported.decode('utf-8')
    base_client.create_user_on_node(pool_id, node_id, user_name, public_part)
    return user_name, key_pair
Loading

0 comments on commit b18eb69

Please sign in to comment.