This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Fix: spark roll back scheduling disable #653

Merged · 8 commits · Aug 29, 2018
28 changes: 12 additions & 16 deletions aztk/models/__init__.py
@@ -1,22 +1,18 @@
from .application_log import ApplicationLog
from .cluster import Cluster
from .cluster_configuration import ClusterConfiguration
from .custom_script import CustomScript
from .file_share import FileShare
from .toolkit import TOOLKIT_MAP, Toolkit
from .user_configuration import UserConfiguration
from .secrets_configuration import (
SecretsConfiguration,
ServicePrincipalConfiguration,
SharedKeyConfiguration,
DockerConfiguration,
)
from .file import File
from .file_share import FileShare
from .node_output import NodeOutput
from .plugins import *
# from .scheduling_target import SchedulingTarget
from .port_forward_specification import PortForwardingSpecification
from .remote_login import RemoteLogin
from .secrets_configuration import (DockerConfiguration, SecretsConfiguration, ServicePrincipalConfiguration,
SharedKeyConfiguration)
from .software import Software
from .ssh_log import SSHLog
from .toolkit import TOOLKIT_MAP, Toolkit
from .user_configuration import UserConfiguration
from .vm_image import VmImage
from .node_output import NodeOutput
from .software import Software
from .cluster import Cluster
from .scheduling_target import SchedulingTarget
from .port_forward_specification import PortForwardingSpecification
from .application_log import ApplicationLog
from .plugins import *
9 changes: 5 additions & 4 deletions aztk/models/cluster_configuration.py
@@ -5,7 +5,7 @@
from .custom_script import CustomScript
from .file_share import FileShare
from .plugins import PluginConfiguration
from .scheduling_target import SchedulingTarget
# from .scheduling_target import SchedulingTarget
from .toolkit import Toolkit
from .user_configuration import UserConfiguration

@@ -37,7 +37,8 @@ class ClusterConfiguration(Model):
plugins = fields.List(PluginConfiguration)
file_shares = fields.List(FileShare)
user_configuration = fields.Model(UserConfiguration, default=None)
scheduling_target = fields.Enum(SchedulingTarget, default=None)

# scheduling_target = fields.Enum(SchedulingTarget, default=None)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -73,5 +74,5 @@ def __validate__(self) -> bool:
"You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes). "
"Set the VNET's subnet_id in your cluster.yaml or with a parameter (--subnet-id).")

if self.scheduling_target == SchedulingTarget.Dedicated and self.size == 0:
raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
# if self.scheduling_target == SchedulingTarget.Dedicated and self.size == 0:
# raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
4 changes: 2 additions & 2 deletions aztk/node_scripts/install/install.py
@@ -6,7 +6,7 @@
from core import config
from install import create_user, pick_master, plugins, spark, spark_container

from .node_scheduling import setup_node_scheduling
# from .node_scheduling import setup_node_scheduling


def read_cluster_config():
@@ -48,7 +48,7 @@ def setup_host(docker_repo: str, docker_run_options: str):

cluster_conf = read_cluster_config()

setup_node_scheduling(client, cluster_conf, is_master)
# setup_node_scheduling(client, cluster_conf, is_master)

# TODO pass azure file shares
spark_container.start_spark_container(
6 changes: 5 additions & 1 deletion aztk/node_scripts/install/node_scheduling.py
@@ -1,7 +1,11 @@
from azure import batch
from aztk.models import ClusterConfiguration, SchedulingTarget

from aztk.models import ClusterConfiguration
from core import config, log

# from aztk.models import SchedulingTarget
SchedulingTarget = "SchedulingTarget" # this code isn't used anywhere until scheduling_target reenabled


def disable_scheduling(batch_client: batch.BatchServiceClient):
"""
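The placeholder binding above deserves a note. Below is a minimal, self-contained sketch (hypothetical function name, not from the PR) of why a dead import can still need a stand-in: a parameter annotation is evaluated when the `def` statement runs at import time, so with the real import commented out, *some* binding for `SchedulingTarget` must exist for the module to import at all; body-only references would instead fail lazily at call time and trip undefined-name linters.

```python
# from aztk.models import SchedulingTarget   # real import, disabled for now
SchedulingTarget = "SchedulingTarget"        # placeholder binding


def select_target(target: SchedulingTarget):  # annotation resolves at def time
    # Unreachable until scheduling_target is re-enabled.
    raise NotImplementedError("scheduling_target is disabled")
```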
15 changes: 7 additions & 8 deletions aztk/spark/client/cluster/helpers/create.py
@@ -12,19 +12,18 @@
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin))


def _default_scheduling_target(vm_count: int):
if vm_count == 0:
return models.SchedulingTarget.Any
else:
return models.SchedulingTarget.Dedicated
# def _default_scheduling_target(vm_count: int):
# if vm_count == 0:
# return models.SchedulingTarget.Any
# else:
# return models.SchedulingTarget.Dedicated


def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration):
cluster_conf = models.ClusterConfiguration()
cluster_conf.merge(configuration)
if cluster_conf.scheduling_target is None:
cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
# if cluster_conf.scheduling_target is None:
# cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
return cluster_conf


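For context, the surviving lines of `_apply_default_for_cluster_config` keep the merge-defaults idiom: construct a fresh model so every declared field default applies, then overlay the caller's values. A minimal sketch of that idiom with a hypothetical `SimpleConfig` (not the real aztk `Model` class):

```python
class SimpleConfig:
    FIELDS = ("size", "vm_size")

    def __init__(self, size=None, vm_size=None):
        self.size = size
        self.vm_size = vm_size

    def merge(self, other):
        # Overlay only the fields the caller actually set.
        for name in self.FIELDS:
            value = getattr(other, name)
            if value is not None:
                setattr(self, name, value)


def apply_defaults(user_conf):
    conf = SimpleConfig(size=0)    # declared default: zero dedicated nodes
    conf.merge(user_conf)          # caller-supplied values win
    return conf
```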
14 changes: 7 additions & 7 deletions aztk/spark/client/job/helpers/submit.py
@@ -53,16 +53,16 @@ def generate_job_manager_task(core_job_operations, job, application_tasks):
return task


def _default_scheduling_target(vm_count: int):
if vm_count == 0:
return models.SchedulingTarget.Any
else:
return models.SchedulingTarget.Dedicated
# def _default_scheduling_target(vm_count: int):
# if vm_count == 0:
# return models.SchedulingTarget.Any
# else:
# return models.SchedulingTarget.Dedicated


def _apply_default_for_job_config(job_conf: models.JobConfiguration):
if job_conf.scheduling_target is None:
job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes)
# if job_conf.scheduling_target is None:
# job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes)

return job_conf

12 changes: 6 additions & 6 deletions aztk/spark/models/models.py
@@ -103,7 +103,7 @@ class PluginConfiguration(aztk.models.PluginConfiguration):
pass


SchedulingTarget = aztk.models.SchedulingTarget
# SchedulingTarget = aztk.models.SchedulingTarget


class ClusterConfiguration(aztk.models.ClusterConfiguration):
@@ -198,7 +198,7 @@ def __init__(
max_dedicated_nodes=0,
max_low_pri_nodes=0,
subnet_id=None,
scheduling_target: SchedulingTarget = None,
# scheduling_target: SchedulingTarget = None,
worker_on_master=None,
):

@@ -214,7 +214,7 @@ def __init__(
self.max_low_pri_nodes = max_low_pri_nodes
self.subnet_id = subnet_id
self.worker_on_master = worker_on_master
self.scheduling_target = scheduling_target
# self.scheduling_target = scheduling_target

def to_cluster_config(self):
return ClusterConfiguration(
@@ -226,7 +226,7 @@ def to_cluster_config(self):
subnet_id=self.subnet_id,
worker_on_master=self.worker_on_master,
spark_configuration=self.spark_configuration,
scheduling_target=self.scheduling_target,
# scheduling_target=self.scheduling_target,
)

def mixed_mode(self) -> bool:
@@ -263,8 +263,8 @@ def validate(self) -> bool:
"You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes) "
"and pass the subnet_id in your configuration..")

if self.scheduling_target == SchedulingTarget.Dedicated and self.max_dedicated_nodes == 0:
raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")
# if self.scheduling_target == SchedulingTarget.Dedicated and self.max_dedicated_nodes == 0:
# raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")


class JobState:
12 changes: 7 additions & 5 deletions aztk_cli/config.py
@@ -5,7 +5,9 @@
import aztk.spark
from aztk.models import Toolkit
from aztk.models.plugins.internal import PluginReference
from aztk.spark.models import (ClusterConfiguration, SchedulingTarget, SecretsConfiguration)
from aztk.spark.models import ClusterConfiguration, SecretsConfiguration

# from aztk.spark.models import SchedulingTarget


def load_aztk_secrets() -> SecretsConfiguration:
@@ -179,7 +181,7 @@ def __init__(self):
self.core_site_xml = None
self.subnet_id = None
self.worker_on_master = None
self.scheduling_target = None
# self.scheduling_target = None
self.jars = []

def _merge_dict(self, config):
@@ -198,9 +200,9 @@ def _merge_dict(self, config):
self.max_low_pri_nodes = cluster_configuration.get("size_low_priority")
self.subnet_id = cluster_configuration.get("subnet_id")
self.worker_on_master = cluster_configuration.get("worker_on_master")
scheduling_target = cluster_configuration.get("scheduling_target")
if scheduling_target:
self.scheduling_target = SchedulingTarget(scheduling_target)
# scheduling_target = cluster_configuration.get("scheduling_target")
# if scheduling_target:
# self.scheduling_target = SchedulingTarget(scheduling_target)

applications = config.get("applications")
if applications:
2 changes: 0 additions & 2 deletions aztk_cli/config/cluster.yaml
@@ -61,8 +61,6 @@ plugins:
# Allow master node to also be a worker <true/false> (Default: true)
# worker_on_master: true

# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
# scheduling_target: dedicated

# wait: <true/false>
wait: false
2 changes: 0 additions & 2 deletions aztk_cli/config/job.yaml
@@ -25,8 +25,6 @@ job:
# Optional command line options to pass to `docker run`
# docker_run_options: <additional command line options to pass to `docker run` (for more information, see https://github.com/Azure/aztk/blob/master/docs/12-docker-image.md)>

# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
# scheduling_target: dedicated

spark_configuration:
spark_defaults_conf: .aztk/spark-defaults.conf
2 changes: 1 addition & 1 deletion aztk_cli/spark/endpoints/job/submit.py
@@ -48,7 +48,7 @@ def execute(args: typing.NamedTuple):
max_low_pri_nodes=job_conf.max_low_pri_nodes,
subnet_id=job_conf.subnet_id,
worker_on_master=job_conf.worker_on_master,
scheduling_target=job_conf.scheduling_target,
# scheduling_target=job_conf.scheduling_target,
)

# TODO: utils.print_job_conf(job_configuration)
6 changes: 4 additions & 2 deletions docs/13-configuration.md
@@ -47,12 +47,14 @@ plugins:
# Allow master node to also be a worker <true/false> (Default: true)
# worker_on_master: true

# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
# scheduling_target: dedicated

# wait: <true/false>
wait: true
```
<!--- this goes about wait: true
Review comment (Member), on the line above:

?

Reply (Member, Author):
This should read "this goes after wait: true". The reason this is here is because you can't comment out things inside a code block in markdown, so I moved the scheduling stuff outside the code block and into a comment. This is just meant to point to where it should go when it is uncommented.

# Where do you want to run the driver <dedicated/master/any> (Default: dedicated if at least one dedicated node or any otherwise)
# scheduling_target: dedicated
-->

Running `aztk spark cluster create` will create a cluster of 4 **Standard\_A2** nodes called 'spark\_cluster' with a linux user named 'spark'. This is equivalent to running the command

25 changes: 13 additions & 12 deletions tests/models/test_cluster_configuration.py
@@ -1,7 +1,8 @@
import pytest

from aztk.error import InvalidModelError
from aztk.models import (ClusterConfiguration, SchedulingTarget, Toolkit, UserConfiguration)
from aztk.models import ClusterConfiguration, Toolkit, UserConfiguration
# from aztk.models import SchedulingTarget
from aztk.spark.models.plugins import HDFSPlugin, JupyterPlugin


@@ -60,15 +61,15 @@ def test_cluster_configuration():
assert config.plugins[1].name == 'hdfs'


def test_scheduling_target_dedicated_with_no_dedicated_nodes_raise_error():
with pytest.raises(InvalidModelError, match="Scheduling target cannot be Dedicated if dedicated vm size is 0"):
conf = ClusterConfiguration(
cluster_id="abc",
scheduling_target=SchedulingTarget.Dedicated,
vm_size="standard_a2",
size=0,
size_low_priority=2,
toolkit=Toolkit(software="spark", version="1.6.3"),
)
# def test_scheduling_target_dedicated_with_no_dedicated_nodes_raise_error():
# with pytest.raises(InvalidModelError, match="Scheduling target cannot be Dedicated if dedicated vm size is 0"):
# conf = ClusterConfiguration(
# cluster_id="abc",
# scheduling_target=SchedulingTarget.Dedicated,
# vm_size="standard_a2",
# size=0,
# size_low_priority=2,
# toolkit=Toolkit(software="spark", version="1.6.3"),
# )

conf.validate()
# conf.validate()
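The regression test above is commented out wholesale. As a sketch of an alternative (not what this PR does), pytest's `skip` marker would keep the test collected, visible in reports, and trivial to re-enable:

```python
import pytest


@pytest.mark.skip(reason="scheduling_target temporarily rolled back (#653)")
def test_scheduling_target_dedicated_with_no_dedicated_nodes_raise_error():
    ...  # original assertions would go here unchanged
```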