
Feature: Spark scheduling target #661

Merged: 81 commits into Azure:master on Oct 23, 2018

Conversation

@jafreck (Member) commented Sep 20, 2018

Reintroduce scheduling_target with a direct-to-node implementation that does not rely on Batch scheduling.

Requirements:

  • Submit application to master node
  • Resource file ingress
  • Get application status
  • Get application log
  • Output / error file egress
  • Get application exit_code
  • Get application execution time
  • Add integration test for scheduling_target

This implementation should be aligned as much as possible with the standard Batch Task scheduling feature to maximize code reuse.

Fix #670
Fix #527

@@ -68,7 +68,9 @@ def get_log(batch_client, blob_client, cluster_id: str, application_name: str, t

    task = __wait_for_app_to_be_running(batch_client, cluster_id, application_name)

    if not __check_task_node_exist(batch_client, cluster_id, task):
        # TODO: find a better way to detect ghost tasks -- metadata
Member Author (@jafreck):

This needs to be done before merging. Only detecting TaskState.completed is too flimsy, especially around many concurrent application submits.
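For reference, a node-existence check along these lines is what the guard above implies (a minimal sketch, assuming the azure-batch SDK that aztk uses; the helper name comes from the diff, everything past the signature is illustrative):

import azure.batch.models as batch_models


def __check_task_node_exist(batch_client, cluster_id, task) -> bool:
    # A task with no node_info was never scheduled onto a node (a "ghost" task).
    if task.node_info is None:
        return False
    try:
        # In aztk the Batch pool id matches the cluster id.
        batch_client.compute_node.get(cluster_id, task.node_info.node_id)
        return True
    except batch_models.BatchErrorException:
        # The node the task ran on has since been removed from the pool.
        return False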



def _download_resource_file(task_id, resource_file):
    # timeout = 30  # set to default blob download timeout
Member:

uncomment or remove?

    if resource_file.file_path:
        write_path = os.path.join(os.environ.get("AZ_BATCH_TASK_WORKING_DIR"), resource_file.file_path)
        with open(write_path, 'wb') as stream:
            for chunk in response.iter_content(chunk_size=16777216):
Member:

16777216 is a magic number; can you add a comment explaining why this value?
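One way to address both comments on this helper (a sketch, not the PR's code: it assumes the file is fetched with requests and that resource_file carries a blob_source URL; DOWNLOAD_TIMEOUT and CHUNK_SIZE are names introduced here for illustration):

import os

import requests

DOWNLOAD_TIMEOUT = 30  # seconds; the default blob download timeout from the commented-out line
CHUNK_SIZE = 16 * 1024 * 1024  # 16 MiB, i.e. the 16777216 literal spelled out


def _download_resource_file(task_id, resource_file):
    response = requests.get(resource_file.blob_source, stream=True, timeout=DOWNLOAD_TIMEOUT)
    if resource_file.file_path:
        write_path = os.path.join(os.environ.get("AZ_BATCH_TASK_WORKING_DIR"), resource_file.file_path)
        with open(write_path, 'wb') as stream:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                stream.write(chunk)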

serialized_task_sas_url = sys.argv[1]

try:
    return_code = ssh_submit(serialized_task_sas_url)
Member:

what happens if the ssh connection is killed?

Member Author (@jafreck):

The SSH session only exists to kick off this process; it does not live until completion, so this return_code is not sent back to the client.

It could be written to storage for retrieval, though. Spark application status, execution time, and return value are still open issues here.
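A sketch of that storage write, assuming the legacy azure-storage BlockBlobService client that aztk used at the time (the container and blob naming here are made up for illustration):

from azure.storage.blob import BlockBlobService


def upload_return_code(blob_client: BlockBlobService, cluster_id: str, task_id: str, return_code: int):
    # Persist the exit code so the client can fetch it later, since the
    # short-lived SSH session cannot report it back directly.
    blob_client.create_blob_from_text(
        container_name=cluster_id,
        blob_name="{}/exit_code".format(task_id),
        text=str(return_code),
    )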

Member:

ok sounds good then

@@ -13,7 +13,7 @@
     ],
     "python.formatting.provider": "yapf",
     "python.venvPath": "${workspaceFolder}/.venv/",
-    "python.pythonPath": "${workspaceFolder}/.venv/Scripts/python.exe",
+    "python.pythonPath": ".venv\\Scripts\\python.exe",
Member:

Can you make this a Windows-specific setting? I think it works if you do:

"windows": {
  "python.pythonPath": ".venv\\Scripts\\python.exe"
}

Member Author (@jafreck):

VS Code doesn't like the "windows" key, so I'm not sure this is possible. Do you have any docs on this? It would be really nice to have OS-specific settings.


def get_task_status(core_cluster_operations, cluster_id: str, task_id: str):
    try:
        # TODO: return TaskState object instead of str
Member:

todo?
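Resolving that TODO might look like the following (a hypothetical sketch: get_task is an assumed helper on core_cluster_operations, and the enum is the azure-batch TaskState):

from azure.batch.models import TaskState


def get_task_status(core_cluster_operations, cluster_id: str, task_id: str) -> TaskState:
    # Return the TaskState enum member rather than its string form,
    # and let callers format it for display.
    task = core_cluster_operations.get_task(cluster_id, task_id)  # assumed helper
    return task.state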

Resolved review threads:

  • aztk/client/base/helpers/get_application_log.py (outdated)
  • aztk/client/base/helpers/list_tasks.py (outdated)
  • aztk/utils/retry.py
  • tests/integration_tests/spark/sdk/clean_up_cluster.py (outdated)
  • tests/integration_tests/spark/sdk/cluster/test_cluster.py (outdated)

try:
    tasks.append(yaml.load(stream))
except yaml.YAMLError as exc:
    print(exc)
Member:

print => log
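With the standard logging module the suggested change is small (a sketch; load_tasks and the surrounding names are illustrative, mirroring the fragment above):

import logging

import yaml

log = logging.getLogger(__name__)


def load_tasks(stream):
    tasks = []
    try:
        tasks.append(yaml.load(stream))
    except yaml.YAMLError as exc:
        # Surface the parse failure through the logging pipeline instead of stdout.
        log.error("Failed to parse task definition: %s", exc)
    return tasks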

Member Author (@jafreck):

This is captured in the task output and uploaded to storage. For user visibility into task errors, I think we should upload the errors as well.

@jafreck merged commit 4408c4f into Azure:master on Oct 23, 2018