
Commit

Merge 222bde8 into fe8a9d2
lydian committed Oct 16, 2020
2 parents fe8a9d2 + 222bde8 commit 52bcb4c
Showing 8 changed files with 686 additions and 1,146 deletions.
427 changes: 143 additions & 284 deletions paasta_tools/cli/cmds/spark_run.py

Large diffs are not rendered by default.

187 changes: 2 additions & 185 deletions paasta_tools/spark_tools.py
@@ -1,197 +1,14 @@
import logging
import os
import socket
import sys
from typing import Mapping
from typing import Optional
from typing import Tuple

import boto3
from boto3 import Session
from ruamel.yaml import YAML
from typing_extensions import TypedDict

from paasta_tools.clusterman import get_clusterman_metrics
from paasta_tools.utils import PaastaColors

AWS_CREDENTIALS_DIR = "/etc/boto_cfg/"
DEFAULT_SPARK_MESOS_SECRET_FILE = "/nail/etc/paasta_spark_secret"
DEFAULT_SPARK_RUN_CONFIG = "/nail/srv/configs/spark.yaml"
DEFAULT_SPARK_SERVICE = "spark"
clusterman_metrics, CLUSTERMAN_YAML_FILE_PATH = get_clusterman_metrics()
log = logging.getLogger(__name__)


class DockerVolumeDict(TypedDict):
hostPath: str
containerPath: str
mode: str


def _load_aws_credentials_from_yaml(yaml_file_path) -> Tuple[str, str, Optional[str]]:
with open(yaml_file_path, "r") as yaml_file:
try:
credentials_yaml = YAML().load(yaml_file.read())
except Exception as e:
print(
PaastaColors.red(
"Encountered %s when trying to parse AWS credentials yaml %s. "
"Suppressing further output to avoid leaking credentials."
% (type(e), yaml_file_path)
)
)
sys.exit(1)

return (
credentials_yaml["aws_access_key_id"],
credentials_yaml["aws_secret_access_key"],
credentials_yaml.get("aws_session_token", None),
)


def get_aws_credentials(
service: str = DEFAULT_SPARK_SERVICE,
no_aws_credentials: bool = False,
aws_credentials_yaml: Optional[str] = None,
profile_name: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
if no_aws_credentials:
return None, None, None
elif aws_credentials_yaml:
return _load_aws_credentials_from_yaml(aws_credentials_yaml)
elif service != DEFAULT_SPARK_SERVICE:
service_credentials_path = os.path.join(AWS_CREDENTIALS_DIR, f"{service}.yaml")
if os.path.exists(service_credentials_path):
return _load_aws_credentials_from_yaml(service_credentials_path)
else:
print(
PaastaColors.yellow(
"Did not find service AWS credentials at %s. Falling back to "
"user credentials." % (service_credentials_path)
)
)

creds = Session(profile_name=profile_name).get_credentials()
return (
creds.access_key,
creds.secret_key,
creds.token,
)
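# Illustrative only, not part of this commit: the resolution order implemented by
# get_aws_credentials() above, using a hypothetical service name and yaml path:
#
#   get_aws_credentials(no_aws_credentials=True)                 # -> (None, None, None)
#   get_aws_credentials(aws_credentials_yaml="/tmp/creds.yaml")  # -> keys parsed from that file
#   get_aws_credentials(service="my_batch_service")              # -> /etc/boto_cfg/my_batch_service.yaml if it
#                                                                 #    exists, else the default boto3 session
#   get_aws_credentials()                                         # -> credentials from Session(profile_name=None)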


def get_default_event_log_dir(**kwargs) -> str:
if "access_key" not in kwargs or "secret_key" not in kwargs:
access_key, secret_key, session_token = get_aws_credentials(**kwargs)
else:
access_key, secret_key, session_token = (
kwargs["access_key"],
kwargs["secret_key"],
kwargs.get("session_token", None),
)
if access_key is None:
log.warning(
"Since no AWS credentials were provided, spark event logging "
"will be disabled"
)
return None

try:
with open(DEFAULT_SPARK_RUN_CONFIG) as fp:
spark_run_conf = YAML().load(fp.read())
except Exception as e:
log.warning(f"Failed to load {DEFAULT_SPARK_RUN_CONFIG}: {e}")
log.warning("Returning empty default configuration")
spark_run_conf = {}

try:
account_id = (
boto3.client(
"sts",
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
aws_session_token=session_token,
)
.get_caller_identity()
.get("Account")
)
except Exception as e:
log.warning("Failed to identify account ID, error: {}".format(str(e)))
return None

for conf in spark_run_conf.get("environments", {}).values():
if account_id == conf["account_id"]:
default_event_log_dir = conf["default_event_log_dir"]
print(f"default event logging at: {default_event_log_dir}")
return default_event_log_dir
return None
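# Illustrative only, not part of this commit: the parsed shape of
# DEFAULT_SPARK_RUN_CONFIG that the lookup above assumes, with made-up account
# IDs and S3 locations. The caller's STS account ID is matched against each
# entry's "account_id"; on a match the entry's "default_event_log_dir" is
# returned, otherwise the function falls through to None.
#
#   spark_run_conf = {
#       "environments": {
#           "dev": {
#               "account_id": "111111111111",
#               "default_event_log_dir": "s3a://example-dev-bucket/spark-events",
#           },
#           "prod": {
#               "account_id": "222222222222",
#               "default_event_log_dir": "s3a://example-prod-bucket/spark-events",
#           },
#       }
#   }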


def load_mesos_secret_for_spark():
try:
with open(DEFAULT_SPARK_MESOS_SECRET_FILE, "r") as f:
return f.read()
except IOError as e:
print(
"Cannot load mesos secret from %s" % DEFAULT_SPARK_MESOS_SECRET_FILE,
file=sys.stderr,
)
raise e


def _calculate_memory_per_executor(spark_memory_string, memory_overhead):
# expected to be in the format "<n>g" where <n> is an integer number of gigabytes, e.g. "4g"
base_memory_per_executor = 1024 * int(spark_memory_string[:-1])

# by default, spark adds an overhead of 10% of the executor memory, with
# a minimum of 384mb
if memory_overhead is None:
memory_overhead = max(384, int(0.1 * base_memory_per_executor))
else:
memory_overhead = int(memory_overhead)

return base_memory_per_executor + memory_overhead
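# Illustrative only, not part of this commit: two worked examples of the
# calculation above (values in MB):
#
#   _calculate_memory_per_executor("4g", None)    # 4096 + max(384, int(0.1 * 4096)) = 4096 + 409 = 4505
#   _calculate_memory_per_executor("2g", "1024")  # 2048 + 1024 = 3072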


def get_spark_resource_requirements(
spark_config_dict: Mapping[str, str], webui_url: str,
) -> Mapping[str, Tuple[str, int]]:
if not clusterman_metrics:
return {}
num_executors = int(spark_config_dict["spark.cores.max"]) / int(
spark_config_dict["spark.executor.cores"]
)
memory_per_executor = _calculate_memory_per_executor(
spark_config_dict["spark.executor.memory"],
spark_config_dict.get("spark.mesos.executor.memoryOverhead"),
)

desired_resources = {
"cpus": int(spark_config_dict["spark.cores.max"]),
"mem": memory_per_executor * num_executors,
# rough guess since spark does not collect this information
"disk": memory_per_executor * num_executors,
}
dimensions = {
"framework_name": spark_config_dict["spark.app.name"],
"webui_url": webui_url,
}
qualified_resources = {}
for resource, quantity in desired_resources.items():
qualified_resources[resource] = (
clusterman_metrics.generate_key_with_dimensions(
f"requested_{resource}", dimensions
),
desired_resources[resource],
)

return qualified_resources
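# Illustrative only, not part of this commit: with spark.cores.max=32,
# spark.executor.cores=8 and spark.executor.memory="4g" (default overhead), the
# function above computes:
#
#   num_executors       = 32 / 8 = 4.0
#   memory_per_executor = 4096 + 409 = 4505
#   desired_resources   = {"cpus": 32, "mem": 18020.0, "disk": 18020.0}
#
# Each value is then paired with a key produced by
# clusterman_metrics.generate_key_with_dimensions(), whose exact key format is
# owned by clusterman_metrics and not shown here.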


def get_webui_url(port: int) -> str:
def get_webui_url(port: str) -> str:
return f"http://{socket.getfqdn()}:{port}"


def inject_spark_conf_str(original_docker_cmd, spark_conf_str):
def inject_spark_conf_str(original_docker_cmd: str, spark_conf_str: str) -> str:
for base_cmd in ("pyspark", "spark-shell", "spark-submit"):
if base_cmd in original_docker_cmd:
return original_docker_cmd.replace(
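The body of inject_spark_conf_str is collapsed in the commit view above. A minimal sketch of what the replacement presumably does, splicing the Spark config string directly after the first matching base command (the exact separator and replacement count in the repository may differ):

def inject_spark_conf_str(original_docker_cmd: str, spark_conf_str: str) -> str:
    # Sketch: insert the Spark config flags right after the first
    # pyspark / spark-shell / spark-submit token found in the command.
    for base_cmd in ("pyspark", "spark-shell", "spark-submit"):
        if base_cmd in original_docker_cmd:
            return original_docker_cmd.replace(base_cmd, f"{base_cmd} {spark_conf_str}", 1)
    return original_docker_cmd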
104 changes: 51 additions & 53 deletions paasta_tools/tron_tools.py
@@ -28,9 +28,10 @@
import yaml
from service_configuration_lib import read_extra_service_information
from service_configuration_lib import read_yaml_file
from service_configuration_lib.spark_config import get_k8s_spark_env
from service_configuration_lib.spark_config import get_mesos_spark_auth_env
from service_configuration_lib.spark_config import get_mesos_spark_env
from service_configuration_lib.spark_config import generate_clusterman_metrics_entries
from service_configuration_lib.spark_config import get_aws_credentials
from service_configuration_lib.spark_config import get_resources_requested
from service_configuration_lib.spark_config import get_spark_conf
from service_configuration_lib.spark_config import K8S_AUTH_FOLDER
from service_configuration_lib.spark_config import stringify_spark_env

@@ -41,8 +42,7 @@
except ImportError: # pragma: no cover (no libyaml-dev / pypy)
Dumper = yaml.SafeDumper # type: ignore

from paasta_tools.spark_tools import get_aws_credentials
from paasta_tools.spark_tools import get_default_event_log_dir
from paasta_tools.clusterman import get_clusterman_metrics
from paasta_tools.tron.client import TronClient
from paasta_tools.tron import tron_command_context
from paasta_tools.utils import DEFAULT_SOA_DIR
@@ -57,7 +57,6 @@
from paasta_tools.utils import NoDeploymentsAvailable
from paasta_tools.utils import time_cache
from paasta_tools.utils import filter_templates_from_config
from paasta_tools.spark_tools import get_spark_resource_requirements
from paasta_tools.spark_tools import get_webui_url
from paasta_tools.spark_tools import inject_spark_conf_str

@@ -79,6 +78,7 @@
)
MESOS_EXECUTOR_NAMES = ("paasta", "spark")
DEFAULT_AWS_REGION = "us-west-2"
clusterman_metrics, _ = get_clusterman_metrics()


class TronNotConfigured(Exception):
@@ -213,52 +213,42 @@ def __init__(
soa_dir=soa_dir,
)
self.job, self.action = decompose_instance(instance)
self.spark_ui_port = pick_spark_ui_port(service, instance)
# Indicate whether this config object is created for validation
self.for_validation = for_validation

def get_spark_config_dict(self):
spark_config_dict = getattr(self, "_spark_config_dict", None)
# cache the created dict so that we don't need to process it multiple
# times and end up with inconsistent results
if spark_config_dict is not None:
return spark_config_dict

if self.get_spark_cluster_manager() == "mesos":
spark_env = get_mesos_spark_env(
spark_app_name=f"tron_spark_{self.get_service()}_{self.get_instance()}",
spark_ui_port=self.spark_ui_port,
mesos_leader=(
f"zk://{load_system_paasta_config().get_zk_hosts()}"
if not self.for_validation
else "N/A"
),
paasta_cluster=self.get_spark_paasta_cluster(),
paasta_pool=self.get_spark_paasta_pool(),
paasta_service=self.get_service(),
paasta_instance=self.get_instance(),
docker_img=self.get_docker_url(),
volumes=[
f"{v['hostPath']}:{v['containerPath']}:{v['mode'].lower()}"
for v in self.get_volumes(load_system_paasta_config().get_volumes())
],
user_spark_opts=self.config_dict.get("spark_args", {}),
event_log_dir=get_default_event_log_dir(
service=self.get_service(),
aws_credentials_yaml=self.config_dict.get("aws_credentials_yaml"),
),
mesos_leader = (
f"zk://{load_system_paasta_config().get_zk_hosts()}"
if not self.for_validation
else "N/A"
)
else:
spark_env = get_k8s_spark_env(
spark_app_name=f"tron_spark_{self.get_service()}_{self.get_instance()}",
spark_ui_port=self.spark_ui_port,
paasta_cluster=self.get_spark_paasta_cluster(),
paasta_service=self.get_service(),
paasta_instance=self.get_instance(),
paasta_pool=self.get_spark_paasta_pool(),
docker_img=self.get_docker_url(),
volumes=self.get_volumes(load_system_paasta_config().get_volumes()),
user_spark_opts=self.config_dict.get("spark_args", {}),
event_log_dir=get_default_event_log_dir(
service=self.get_service(),
aws_credentials_yaml=self.config_dict.get("aws_credentials_yaml"),
),
)
return spark_env
mesos_leader = None

aws_creds = get_aws_credentials(
aws_credentials_yaml=self.config_dict.get("aws_credentials_yaml")
)
self._spark_config_dict = get_spark_conf(
cluster_manager=self.get_spark_cluster_manager(),
spark_app_base_name=f"tron_spark_{self.get_service()}_{self.get_instance()}",
user_spark_opts=self.config_dict.get("spark_args", {}),
paasta_cluster=self.get_spark_paasta_cluster(),
paasta_pool=self.get_spark_paasta_pool(),
paasta_service=self.get_service(),
paasta_instance=self.get_instance(),
docker_img=self.get_docker_url(),
extra_volumes=self.get_volumes(load_system_paasta_config().get_volumes()),
mesos_leader=mesos_leader,
aws_creds=aws_creds,
)
return self._spark_config_dict
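# Illustrative only, not part of this commit: the memoization idiom used by
# get_spark_config_dict() above, reduced to its essentials. The first call
# computes and stores the config; later calls return the stored copy so every
# consumer of the object sees one consistent Spark config.
#
#   class Example:
#       def get_conf(self):
#           conf = getattr(self, "_conf", None)
#           if conf is not None:
#               return conf
#           self._conf = {"spark.app.name": "example"}  # stand-in for get_spark_conf(...)
#           return self._conf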

def get_job_name(self):
return self.job
@@ -308,18 +298,26 @@ def get_spark_cluster_manager(self):
def get_env(self):
env = super().get_env()
if self.get_executor() == "spark":
spark_config_dict = self.get_spark_config_dict()
env["EXECUTOR_CLUSTER"] = self.get_spark_paasta_cluster()
env["EXECUTOR_POOL"] = self.get_spark_paasta_pool()
env["SPARK_OPTS"] = stringify_spark_env(self.get_spark_config_dict())
env.update(get_mesos_spark_auth_env())
env["CLUSTERMAN_RESOURCES"] = json.dumps(
dict(
get_spark_resource_requirements(
spark_config_dict=self.get_spark_config_dict(),
webui_url=get_webui_url(self.spark_ui_port),
).values()
env["SPARK_OPTS"] = stringify_spark_env(spark_config_dict)
# The actual mesos secret will be decrypted and injected on mesos master when assigning
# tasks.
env["SPARK_MESOS_SECRET"] = "SHARED_SECRET(SPARK_MESOS_SECRET)"
env["CLUSTERMAN_RESOURCES"] = (
json.dumps(
generate_clusterman_metrics_entries(
clusterman_metrics,
get_resources_requested(spark_config_dict),
spark_config_dict["spark.app.name"],
get_webui_url(spark_config_dict["spark.ui.port"]),
)
)
if clusterman_metrics
else {}
)

if "AWS_ACCESS_KEY_ID" not in env or "AWS_SECRET_ACCESS_KEY" not in env:
try:
access_key, secret_key, session_token = get_aws_credentials(
2 changes: 1 addition & 1 deletion requirements-minimal.txt
@@ -49,7 +49,7 @@ requests-cache >= 0.4.10,<= 0.5.0
retry
ruamel.yaml
sensu-plugin
service-configuration-lib >= 2.4.1
service-configuration-lib >= 2.5.0
signalfx
slackclient >= 1.2.1
sticht >= 1.1.0
2 changes: 1 addition & 1 deletion requirements.txt
@@ -92,7 +92,7 @@ rsa==3.4.2
ruamel.yaml==0.15.96
s3transfer==0.3.3
sensu-plugin==0.3.1
service-configuration-lib==2.4.7
service-configuration-lib==2.5.0
setuptools==39.0.1
signalfx==1.0.17
simplejson==3.10.0
