
Feature: managed storage for clusters and jobs #443

Merged: 6 commits, Mar 20, 2018
7 changes: 6 additions & 1 deletion aztk/client.py
@@ -39,7 +39,7 @@ def _get_cluster_data(self, cluster_id: str) -> cluster_data.ClusterData:
         General Batch Operations
     '''
 
-    def __delete_pool_and_job(self, pool_id: str):
+    def __delete_pool_and_job(self, pool_id: str, keep_logs: bool = False):
         """
             Delete a pool and its associated job
             :param pool_id: the id of the pool (and associated job) to delete
@@ -61,6 +61,10 @@ def __delete_pool_and_job(self, pool_id: str):
 
         if pool_exists:
             self.batch_client.pool.delete(pool_id)
 
+        if not keep_logs:
+            cluster_data = self._get_cluster_data(pool_id)
+            cluster_data.delete_container(pool_id)
+
         return job_exists or pool_exists
 
@@ -256,6 +260,7 @@ def __cluster_copy(self, cluster_id, container_name, source_path, destination_pa
                 self.__delete_user_on_pool('aztk', pool.id, nodes)
         except (OSError, batch_error.BatchErrorException) as exc:
             raise exc
+
     def __submit_job(self,
                      job_configuration,
                      start_task,
3 changes: 3 additions & 0 deletions aztk/internal/cluster_data/cluster_data.py
@@ -55,3 +55,6 @@ def upload_node_data(self, node_data: NodeData) -> BlobData:
 
     def _ensure_container(self):
         self.blob_client.create_container(self.cluster_id, fail_on_exist=False)
+
+    def delete_container(self, container_name: str):
+        self.blob_client.delete_container(container_name)
8 changes: 4 additions & 4 deletions aztk/spark/client.py
@@ -62,9 +62,9 @@ def create_clusters_in_parallel(self, cluster_confs):
         for cluster_conf in cluster_confs:
             self.create_cluster(cluster_conf)
 
-    def delete_cluster(self, cluster_id: str):
+    def delete_cluster(self, cluster_id: str, keep_logs: bool = False):
         try:
-            return self.__delete_pool_and_job(cluster_id)
+            return self.__delete_pool_and_job(cluster_id, keep_logs)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
 
@@ -239,9 +239,9 @@ def stop_job(self, job_id):
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
 
-    def delete_job(self, job_id):
+    def delete_job(self, job_id: str, keep_logs: bool = False):
         try:
-            return job_submit_helper.delete(self, job_id)
+            return job_submit_helper.delete(self, job_id, keep_logs)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
7 changes: 6 additions & 1 deletion aztk/spark/helpers/job_submission.py
@@ -122,7 +122,7 @@ def stop(spark_client, job_id):
     spark_client.batch_client.job_schedule.terminate(job_id)
 
 
-def delete(spark_client, job_id):
+def delete(spark_client, job_id, keep_logs: bool = False):
     recent_run_job = __get_recent_job(spark_client, job_id)
     deleted_job_or_job_schedule = False
     # delete job
@@ -138,6 +138,11 @@ def delete(spark_client, job_id):
     except batch_models.batch_error.BatchErrorException:
         pass
 
+    # delete storage container
+    if not keep_logs:
+        cluster_data = spark_client._get_cluster_data(job_id)
+        cluster_data.delete_container(job_id)
+
     return deleted_job_or_job_schedule
14 changes: 11 additions & 3 deletions aztk_cli/spark/endpoints/cluster/cluster_delete.py
@@ -9,26 +9,34 @@ def setup_parser(parser: argparse.ArgumentParser):
                         dest='cluster_id',
                         required=True,
                         help='The unique id of your spark cluster')
-    parser.add_argument('--force',
+    parser.add_argument('--force', '-f',
                         dest='force',
                         required=False,
                         action='store_true',
                         help='Do not prompt for confirmation, force deletion of cluster.')
-    parser.set_defaults(force=False)
+    parser.add_argument('--keep-logs', '-k',
+                        dest='keep_logs',
+                        action='store_true',
+                        required=False,
+                        help='Prevent logs in storage from being deleted.')
+    parser.set_defaults(force=False, keep_logs=False)
 
 
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
     cluster_id = args.cluster_id
 
     if not args.force:
+        if not args.keep_logs:
+            log.warn("All logs persisted for this cluster will be deleted.")
+
         confirmation_cluster_id = input("Please confirm the id of the cluster you wish to delete: ")
 
         if confirmation_cluster_id != cluster_id:
             log.error("Confirmation cluster id does not match. Please try again.")
             return
 
-    if spark_client.delete_cluster(cluster_id):
+    if spark_client.delete_cluster(cluster_id, args.keep_logs):
         log.info("Deleting cluster %s", cluster_id)
     else:
         log.error("Cluster with id '%s' doesn't exist or was already deleted.", cluster_id)
14 changes: 11 additions & 3 deletions aztk_cli/spark/endpoints/job/delete.py
@@ -9,12 +9,17 @@ def setup_parser(parser: argparse.ArgumentParser):
                         dest='job_id',
                         required=True,
                         help='The unique id of your AZTK Job')
-    parser.add_argument('--force',
+    parser.add_argument('--force', '-f',
                         dest='force',
                         required=False,
                         action='store_true',
                         help='Do not prompt for confirmation, force deletion of the Job.')
-    parser.set_defaults(force=False)
+    parser.add_argument('--keep-logs', '-k',
+                        dest='keep_logs',
+                        action='store_true',
+                        required=False,
+                        help='Prevent logs in storage from being deleted.')
+    parser.set_defaults(force=False, keep_logs=False)
 
 
@@ -25,13 +30,16 @@ def execute(args: typing.NamedTuple):
         # check if job exists before prompting for confirmation
         spark_client.get_job(job_id)
 
+        if not args.keep_logs:
+            log.warn("All logs persisted for this job will be deleted.")
+
         confirmation_job_id = input("Please confirm the id of the Job you wish to delete: ")
 
         if confirmation_job_id != job_id:
             log.error("Confirmation Job id does not match. Please try again.")
             return
 
-    if spark_client.delete_job(job_id):
+    if spark_client.delete_job(job_id, args.keep_logs):
         log.info("Deleting Job %s", job_id)
     else:
         log.error("Job with id '%s' doesn't exist or was already deleted.", job_id)
3 changes: 2 additions & 1 deletion docs/10-clusters.md
@@ -20,7 +20,7 @@ aztk spark cluster create --id spark --vm-size standard_a2 --size 4
 
 You can find more information on VM sizes [here.](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/sizes) Please note that you must use the official SKU name when setting your VM size - they usually come in the form: "standard_d2_v2".
 
-NOTE: The cluster id (`--id`) can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each cluster **must** have a unique cluster id.
+_Note: The cluster id (`--id`) can only contain alphanumeric characters, hyphens, and underscores, and cannot contain more than 64 characters. Each cluster **must** have a unique cluster id._
 
 By default, you cannot create clusters of more than 20 cores in total. Visit [this page](https://docs.microsoft.com/en-us/azure/batch/batch-quota-limit#view-batch-quotas) to request a core quota increase.
 
@@ -74,6 +74,7 @@ To delete a cluster run:
 ```sh
 aztk spark cluster delete --id <your_cluster_id>
 ```
+Deleting a cluster also permanently deletes any data or logs associated with that cluster. If you wish to persist this data, use the `--keep-logs` flag.
 
 __You are charged for the cluster as long as the nodes are provisioned in your account.__ Make sure to delete any clusters you are not using to avoid unwanted costs.
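With the new flag in place, retaining a deleted cluster's logs looks like the following (reusing the placeholder id from the docs):

```sh
aztk spark cluster delete --id <your_cluster_id> --keep-logs
```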
10 changes: 7 additions & 3 deletions docs/50-sdk.md
@@ -36,14 +36,16 @@ Find some samples and getting started tutorial in the `examples/sdk/` directory o
 
 - None
 
-- `delete_cluster(self, cluster_id: str)`
+- `delete_cluster(self, cluster_id: str, keep_logs: bool = False)`
 
     Delete an AZTK cluster with the given ID
 
     Parameters:
 
     - cluster_id: str
         - The ID of the cluster to delete
+    - keep_logs: bool
+        - If true, the logs associated with this cluster will not be deleted.
 
     Returns:
 
@@ -291,15 +293,17 @@ Find some samples and getting started tutorial in the `examples/sdk/` directory o
 
 - None
 
-- `delete_job(self, job_id)`
+- `delete_job(self, job_id, keep_logs: bool = False)`
 
     Delete the AZTK Spark Job with id job_id
 
     Parameters:
 
     - job_id: str
         - The id of the Job
 
+    - keep_logs: bool
+        - If true, the logs associated with this Job will not be deleted.
 
     Returns:
 
     - bool
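A minimal sketch of the two updated calls through the SDK. The client is built the same way the CLI endpoints in this PR build it; the import path for `config` follows the CLI module layout and both ids are hypothetical:

```python
import aztk.spark
from aztk_cli import config  # assumed import path; the CLI endpoints call config.load_aztk_secrets()

# Build a client the same way the CLI endpoints do
client = aztk.spark.Client(config.load_aztk_secrets())

# Tear down the cluster but keep its logs in the storage container
client.delete_cluster("my-spark-cluster", keep_logs=True)

# Likewise for a Job: the schedule is deleted, the logs are retained
client.delete_job("my-aztk-job", keep_logs=True)
```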
1 change: 1 addition & 0 deletions docs/70-jobs.md
@@ -137,6 +137,7 @@ To delete a Job run:
 ```sh
 aztk spark job delete --id <your_job_id>
 ```
+Deleting a Job also permanently deletes any data or logs associated with that Job. If you wish to persist this data, use the `--keep-logs` flag.
 
 __You are only charged for the Job while it is active; Jobs handle provisioning and destroying infrastructure, so you are only charged for the time that your applications are running.__
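As with clusters, keeping a Job's logs on delete looks like this (placeholder id as above):

```sh
aztk spark job delete --id <your_job_id> --keep-logs
```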