This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Feature: add brief flag to debug tool #634

Merged 6 commits on Aug 16, 2018
aztk/spark/client/cluster/helpers/diagnostics.py (8 additions, 8 deletions)
```diff
@@ -6,11 +6,11 @@
 from aztk.utils import helpers
 
 
-def _run(spark_cluster_operations, cluster_id, output_directory=None):
+def _run(spark_cluster_operations, cluster_id, output_directory=None, brief=False):
     # copy debug program to each node
     output = spark_cluster_operations.copy(
         cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
-    ssh_cmd = _build_diagnostic_ssh_command()
+    ssh_cmd = _build_diagnostic_ssh_command(brief)
     run_output = spark_cluster_operations.run(cluster_id, ssh_cmd, host=True)
     remote_path = "/tmp/debug.zip"
     result = None
```

Review discussion on the new `_run` signature:

Member: Is `brief` the right name for this?

Member (author): I'm not sure... I feel like it makes sense, but I'm open to anything. This flag is meant to only grab the most important logs for an initial investigation, whereas without the flag the output is very verbose.

Member: `--silent` or `--quiet`, maybe?

Member (author): `--quiet` and `--silent` to me mean controlling the amount of output to the console, not "write a subset of the files to disk", which is what we are doing here.
```diff
@@ -19,25 +19,25 @@ def _run(spark_cluster_operations, cluster_id, output_directory=None):
         result = spark_cluster_operations.download(cluster_id, remote_path, local_path, host=True)
 
         # write run output to debug/ directory
-        with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
-            [f.write(line + '\n') for node_output in run_output for line in node_output.output]
+        with open(os.path.join(output_directory, "debug-output.txt"), 'w', encoding="UTF-8") as f:
+            [f.write(node_output.output + '\n') for node_output in run_output]
     else:
         result = spark_cluster_operations.download(cluster_id, remote_path, host=True)
 
     return result
 
 
-def _build_diagnostic_ssh_command():
+def _build_diagnostic_ssh_command(brief):
     return "sudo rm -rf /tmp/debug.zip; "\
            "sudo apt-get install -y python3-pip; "\
            "sudo -H pip3 install --upgrade pip; "\
            "sudo -H pip3 install docker; "\
-           "sudo python3 /tmp/debug.py"
+           "sudo python3 /tmp/debug.py {}".format(brief)
 
 
-def run_cluster_diagnostics(spark_cluster_operations, cluster_id, output_directory=None):
+def run_cluster_diagnostics(spark_cluster_operations, cluster_id, output_directory=None, brief=False):
     try:
-        output = _run(spark_cluster_operations, cluster_id, output_directory)
+        output = _run(spark_cluster_operations, cluster_id, output_directory, brief)
         return output
     except batch_error.BatchErrorException as e:
         raise error.AztkError(helpers.format_batch_exception(e))
```
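One detail worth noting about the change above: the flag crosses the SSH boundary as plain text. `_build_diagnostic_ssh_command` interpolates the Python bool into the command string, and the node-side script recovers it by comparing its argument against the literal string "True". A minimal standalone sketch of that round trip, with illustrative values only:

```python
# Client side: str.format renders the bool as the text "True" or "False".
brief = True
ssh_cmd = "sudo python3 /tmp/debug.py {}".format(brief)
print(ssh_cmd)  # sudo python3 /tmp/debug.py True

# Node side: debug.py receives the token as a string in sys.argv,
# so the boolean is recovered with a string comparison, not bool().
argv_value = ssh_cmd.split()[-1]
brief_again = argv_value == "True"
print(brief_again)  # True
```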
aztk/spark/client/cluster/operations.py (2 additions, 2 deletions)
```diff
@@ -194,7 +194,7 @@ def download(self,
         return download.cluster_download(self._core_cluster_operations, id, source_path, destination_path, host,
                                          internal, timeout)
 
-    def diagnostics(self, id, output_directory=None):
+    def diagnostics(self, id, output_directory: str = None, brief: bool = False):
         """Download a file from every node in a cluster.
 
         Args:
@@ -206,7 +206,7 @@ def diagnostics(self, id, output_directory=None):
         Returns:
             :obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output of the copy command.
         """
-        return diagnostics.run_cluster_diagnostics(self, id, output_directory)
+        return diagnostics.run_cluster_diagnostics(self, id, output_directory, brief)
 
     def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0):
         """Get the log for a running or completed application
```
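For SDK callers (as opposed to the CLI shown further down), the new keyword threads straight through `diagnostics`. A minimal sketch of invoking it, assuming `spark_client` is an already-configured `aztk.spark.Client`; client construction and secrets loading are omitted, and the cluster id and output path are illustrative:

```python
# Hypothetical usage: collect only the key logs from each node of an existing cluster.
# `spark_client` is assumed to be an aztk.spark.Client built from valid secrets.
node_outputs = spark_client.cluster.diagnostics(
    id="my-spark-cluster",
    output_directory="./debug-my-spark-cluster",
    brief=True)  # new flag introduced by this PR

for node_output in node_outputs:  # List[aztk.spark.models.NodeOutput], per the docstring above
    print(node_output)
```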
aztk/spark/utils/debug.py (31 additions, 8 deletions)
```diff
@@ -6,6 +6,7 @@
 import json
 import os
 import socket
+import sys
 import tarfile
 from subprocess import STDOUT, CalledProcessError, check_output
 from zipfile import ZIP_DEFLATED, ZipFile
@@ -14,16 +15,24 @@
 
 
 def main():
-    zipf = create_zip_archive()
-
-    # general node diagnostics
-    zipf.writestr("hostname.txt", data=get_hostname())
-    zipf.writestr("df.txt", data=get_disk_free())
+    brief = sys.argv[1] == "True"
 
     # docker container diagnostics
     docker_client = docker.from_env()
-    for filename, data in get_docker_diagnostics(docker_client):
-        zipf.writestr(filename, data=data)
+
+    zipf = create_zip_archive()
+
+    if brief:
+        for filename, data in get_brief_diagnostics():
+            print("writing {} to zip", filename)
+            zipf.writestr(filename, data=data)
+    else:
+        # general node diagnostics
+        zipf.writestr("hostname.txt", data=get_hostname())
+        zipf.writestr("df.txt", data=get_disk_free())
+
+        for filename, data in get_docker_diagnostics(docker_client):
+            zipf.writestr(filename, data=data)
 
     zipf.close()
 
@@ -99,7 +108,7 @@ def get_docker_containers(docker_client):
 
 def get_docker_process_status(container):
     try:
-        exit_code, output = container.exec_run("ps -auxw", tty=True, privileged=True)
+        exit_code, output = container.exec_run("ps faux", privileged=True)
         out_file_name = container.name + "/ps_aux.txt"
         if exit_code == 0:
             return (out_file_name, output)
@@ -159,5 +168,19 @@ def extract_tar_in_memory(container, data):
     return logs
 
 
+def get_brief_diagnostics():
+    batch_dir = "/mnt/batch/tasks/startup/"
+    files = ["stdout.txt", "stderr.txt", "wd/logs/docker.log"]
+    logs = []
+    for file_name in files:
+        try:
+            logs.append((file_name, open(batch_dir + file_name, 'rb').read()))
+            # print("LOG:", (file_name, open(batch_dir+file_name, 'rb').read()))
+        except FileNotFoundError as e:
+            print("file not found", e)
+            logs.append((file_name, bytes(e.__str__(), encoding="utf-8")))
+    return logs
+
+
 if __name__ == "__main__":
     main()
```
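For orientation, brief mode therefore zips just the three startup-task files named in `get_brief_diagnostics`. A standalone sketch of that collection step, assuming the same Azure Batch startup directory layout; this variant uses context managers so file handles are closed promptly, whereas the PR reads the files directly:

```python
from zipfile import ZIP_DEFLATED, ZipFile

batch_dir = "/mnt/batch/tasks/startup/"                     # Azure Batch startup task directory
files = ["stdout.txt", "stderr.txt", "wd/logs/docker.log"]  # same list as get_brief_diagnostics

with ZipFile("/tmp/debug.zip", "w", ZIP_DEFLATED) as zipf:
    for name in files:
        try:
            with open(batch_dir + name, "rb") as log_file:
                zipf.writestr(name, log_file.read())        # store each log under its relative name
        except FileNotFoundError as e:
            zipf.writestr(name, str(e))                     # record the miss instead of failing
```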
aztk_cli/spark/endpoints/cluster/cluster_debug.py (5 additions, 2 deletions)
```diff
@@ -1,7 +1,7 @@
 import argparse
 import os
-import typing
 import time
+import typing
 
 import aztk.spark
 from aztk_cli import config, utils
@@ -11,6 +11,9 @@ def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id', dest='cluster_id', required=True, help='The unique id of your spark cluster')
 
     parser.add_argument('--output', '-o', required=False, help='the directory for the output folder')
+    parser.add_argument(
+        '--brief', '-b', required=False, action='store_true', help='Only gets a small subset of key logs')
+    parser.set_defaults(brief=False)
 
 
 def execute(args: typing.NamedTuple):
@@ -20,5 +23,5 @@ def execute(args: typing.NamedTuple):
     if not args.output:
         args.output = os.path.join(os.getcwd(), "debug-{0}-{1}".format(args.cluster_id, timestr))
     with utils.Spinner():
-        spark_client.cluster.diagnostics(id=args.cluster_id, output_directory=args.output)
+        spark_client.cluster.diagnostics(id=args.cluster_id, output_directory=args.output, brief=args.brief)
     # TODO: analyze results, display some info about status
```
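A small note on the parser addition: `action='store_true'` already gives the destination a default of `False`, so the explicit `set_defaults(brief=False)` is redundant but harmless. A standalone sketch of how the flag parses, mirroring the arguments added above:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--brief', '-b', required=False, action='store_true',
                    help='Only gets a small subset of key logs')

print(parser.parse_args([]).brief)           # False: store_true defaults to False
print(parser.parse_args(['--brief']).brief)  # True: long form
print(parser.parse_args(['-b']).brief)       # True: short form
```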
docs/10-clusters.md (9 additions, 0 deletions)
````diff
@@ -182,6 +182,15 @@ The debug utility will pull logs from all nodes in the cluster. The utility will
 
 __Please be careful sharing the output of the `debug` command as secrets and application code are present in the output.__
 
+Pass the `--brief` flag to only download the most essential logs from each node:
+```sh
+aztk spark cluster debug --id <cluster-id> --output </path/to/output/directory/> --brief
+```
+This command will retrieve:
+- stdout file from the node's startup
+- stderr file from the node's startup
+- the docker log for the spark container
+
 
 ### Interact with your Spark cluster
 By default, the `aztk spark cluster ssh` command port forwards the Spark Web UI to *localhost:8080*, Spark Jobs UI to *localhost:4040*, and Spark History Server to your *localhost:18080*. This can be [configured in *.aztk/ssh.yaml*](../docs/13-configuration.html#sshyaml).
````
requirements.txt (6 additions, 6 deletions)
```diff
@@ -1,11 +1,11 @@
 # Distribution
-azure-batch==4.1.3
-azure-mgmt-batch==5.0.0
-azure-mgmt-storage==1.5.0
-azure-storage-blob==1.1.0
-pyyaml==3.12
+azure-batch~=4.1.3
+azure-mgmt-batch~=5.0.0
+azure-mgmt-storage~=2.0.0
+azure-storage-blob~=1.3.1
+pyyaml>=3.12
 pycryptodomex>=3.4
-paramiko==2.4.0
+paramiko>=2.4
 
 # Development
 yapf==0.22.0
```
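The requirements change swaps exact pins for looser specifiers: `~=` is PEP 440's "compatible release" operator (patch releases allowed, the next minor release excluded), while `>=` accepts anything newer. A quick illustration using the `packaging` library that pip itself builds on (assumed to be installed):

```python
from packaging.specifiers import SpecifierSet

print("4.1.9" in SpecifierSet("~=4.1.3"))  # True: ~=4.1.3 means >=4.1.3, ==4.1.*
print("4.2.0" in SpecifierSet("~=4.1.3"))  # False: the next minor release is excluded
print("2.7.1" in SpecifierSet(">=2.4"))    # True: >= places no upper bound
```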