Skip to content

Commit

Permalink
[DPE-2082] User-friendly output when no primary account is defined (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
deusebio committed Jun 21, 2023
1 parent c52dd37 commit 8ea6156
Show file tree
Hide file tree
Showing 10 changed files with 770 additions and 577 deletions.
1,017 changes: 534 additions & 483 deletions poetry.lock

Large diffs are not rendered by default.

33 changes: 25 additions & 8 deletions spark8t/cli/pyspark.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/env python3

import re
from argparse import Namespace
from logging import Logger
from typing import Optional

from spark8t.cli.params import (
Expand All @@ -10,19 +12,15 @@
get_kube_interface,
k8s_parser,
parse_arguments_with,
setup_logging,
spark_user_parser,
)
from spark8t.domain import ServiceAccount
from spark8t.exceptions import AccountNotFound, PrimaryAccountNotFound
from spark8t.services import K8sServiceAccountRegistry, SparkInterface
from spark8t.utils import setup_logging

if __name__ == "__main__":
args, extra_args = parse_arguments_with(
[add_logging_arguments, k8s_parser, spark_user_parser, add_config_arguments]
).parse_known_args()

logger = setup_logging(args.log_level, args.log_conf_file, "spark8t.cli.pyspark")

def main(args: Namespace, logger: Logger):
kube_interface = get_kube_interface(args)

registry = K8sServiceAccountRegistry(
Expand All @@ -38,10 +36,29 @@
)

if service_account is None:
raise ValueError("Service account provided does not exist.")
raise AccountNotFound(
args.username
) if args.username else PrimaryAccountNotFound()

SparkInterface(
service_account=service_account,
kube_interface=kube_interface,
defaults=defaults,
).pyspark_shell(args.conf, args.properties_file, extra_args)


if __name__ == "__main__":
args, extra_args = parse_arguments_with(
[add_logging_arguments, k8s_parser, spark_user_parser, add_config_arguments]
).parse_known_args()

logger = setup_logging(args.log_level, args.log_conf_file, "spark8t.cli.pyspark")

try:
main(args, logger)
exit(0)
except (AccountNotFound, PrimaryAccountNotFound) as e:
logger.error(str(e))
exit(1)
except Exception as e:
raise e
56 changes: 34 additions & 22 deletions spark8t/cli/service_account_registry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/env python3

from argparse import ArgumentParser
from argparse import ArgumentParser, Namespace
from enum import Enum
from logging import Logger

from spark8t.cli.params import (
add_config_arguments,
Expand All @@ -12,12 +13,12 @@
spark_user_parser,
)
from spark8t.domain import PropertyFile, ServiceAccount
from spark8t.exceptions import NoAccountFound
from spark8t.exceptions import AccountNotFound, PrimaryAccountNotFound
from spark8t.services import K8sServiceAccountRegistry, parse_conf_overrides
from spark8t.utils import setup_logging


def build_service_account_from_args(args) -> ServiceAccount:
def build_service_account_from_args(args, registry) -> ServiceAccount:
return ServiceAccount(
name=args.username,
namespace=args.namespace,
Expand Down Expand Up @@ -95,15 +96,7 @@ def create_service_account_registry_parser(parser: ArgumentParser):
return parser


if __name__ == "__main__":
args = create_service_account_registry_parser(
ArgumentParser(description="Spark Client Setup")
).parse_args()

logger = setup_logging(
args.log_level, args.log_conf_file, "spark8t.cli.service_account_registry"
)

def main(args: Namespace, logger: Logger):
kube_interface = get_kube_interface(args)
context = args.context or kube_interface.context_name

Expand All @@ -112,7 +105,7 @@ def create_service_account_registry_parser(parser: ArgumentParser):
registry = K8sServiceAccountRegistry(kube_interface.with_context(context))

if args.action == Actions.CREATE:
service_account = build_service_account_from_args(args)
service_account = build_service_account_from_args(args, registry)
service_account.extra_confs = (
PropertyFile.read(args.properties_file)
if args.properties_file is not None
Expand All @@ -122,17 +115,17 @@ def create_service_account_registry_parser(parser: ArgumentParser):
registry.create(service_account)

elif args.action == Actions.DELETE:
user_id = build_service_account_from_args(args).id
user_id = build_service_account_from_args(args, registry).id
logger.info(user_id)
registry.delete(user_id)

elif args.action == Actions.ADD_CONFIG:
input_service_account = build_service_account_from_args(args)
input_service_account = build_service_account_from_args(args, registry)

service_account_in_registry = registry.get(input_service_account.id)

if service_account_in_registry is None:
raise NoAccountFound(input_service_account.id)
raise AccountNotFound(input_service_account.id)

account_configuration = (
service_account_in_registry.configurations
Expand All @@ -147,38 +140,38 @@ def create_service_account_registry_parser(parser: ArgumentParser):
registry.set_configurations(input_service_account.id, account_configuration)

elif args.action == Actions.REMOVE_CONFIG:
input_service_account = build_service_account_from_args(args)
input_service_account = build_service_account_from_args(args, registry)

service_account_in_registry = registry.get(input_service_account.id)

if service_account_in_registry is None:
raise NoAccountFound(input_service_account.id)
raise AccountNotFound(input_service_account.id)

registry.set_configurations(
input_service_account.id,
service_account_in_registry.configurations.remove(args.conf),
)

elif args.action == Actions.GET_CONFIG:
input_service_account = build_service_account_from_args(args)
input_service_account = build_service_account_from_args(args, registry)

maybe_service_account = registry.get(input_service_account.id)

if maybe_service_account is None:
raise NoAccountFound(input_service_account.id)
raise AccountNotFound(input_service_account.id)

maybe_service_account.configurations.log(logger.info)

elif args.action == Actions.CLEAR_CONFIG:
registry.set_configurations(
build_service_account_from_args(args).id, PropertyFile.empty()
build_service_account_from_args(args, registry).id, PropertyFile.empty()
)

elif args.action == Actions.PRIMARY:
maybe_service_account = registry.get_primary()

if maybe_service_account is None:
raise NoAccountFound()
raise PrimaryAccountNotFound()

logger.info(maybe_service_account.id)

Expand All @@ -187,3 +180,22 @@ def create_service_account_registry_parser(parser: ArgumentParser):
logger.info(
str.expandtabs(f"{service_account.id}\t{service_account.primary}")
)


if __name__ == "__main__":
args = create_service_account_registry_parser(
ArgumentParser(description="Spark Client Setup")
).parse_args()

logger = setup_logging(
args.log_level, args.log_conf_file, "spark8t.cli.service_account_registry"
)

try:
main(args, logger)
exit(0)
except (AccountNotFound, PrimaryAccountNotFound) as e:
logger.error(str(e))
exit(1)
except Exception as e:
raise e
35 changes: 26 additions & 9 deletions spark8t/cli/spark_shell.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/env python3

import re
from argparse import Namespace
from logging import Logger
from typing import Optional

from spark8t.cli.params import (
Expand All @@ -13,18 +15,12 @@
spark_user_parser,
)
from spark8t.domain import ServiceAccount
from spark8t.exceptions import AccountNotFound, PrimaryAccountNotFound
from spark8t.services import K8sServiceAccountRegistry, SparkInterface
from spark8t.utils import setup_logging

if __name__ == "__main__":
args, extra_args = parse_arguments_with(
[add_logging_arguments, k8s_parser, spark_user_parser, add_config_arguments]
).parse_known_args()

logger = setup_logging(
args.log_level, args.log_conf_file, "spark8t.cli.spark_shell"
)

def main(args: Namespace, logger: Logger):
kube_interface = get_kube_interface(args)

registry = K8sServiceAccountRegistry(
Expand All @@ -40,10 +36,31 @@
)

if service_account is None:
raise ValueError("Service account provided does not exist.")
raise AccountNotFound(
args.username
) if args.username else PrimaryAccountNotFound()

SparkInterface(
service_account=service_account,
kube_interface=kube_interface,
defaults=defaults,
).spark_shell(args.conf, args.properties_file, extra_args)


if __name__ == "__main__":
args, extra_args = parse_arguments_with(
[add_logging_arguments, k8s_parser, spark_user_parser, add_config_arguments]
).parse_known_args()

logger = setup_logging(
args.log_level, args.log_conf_file, "spark8t.cli.spark_shell"
)

try:
main(args, logger)
exit(0)
except (AccountNotFound, PrimaryAccountNotFound) as e:
logger.error(str(e))
exit(1)
except Exception as e:
raise e
47 changes: 32 additions & 15 deletions spark8t/cli/spark_submit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/env python3

import re
from argparse import Namespace
from logging import Logger
from typing import Optional

from spark8t.cli.params import (
Expand All @@ -14,24 +16,12 @@
spark_user_parser,
)
from spark8t.domain import ServiceAccount
from spark8t.exceptions import AccountNotFound, PrimaryAccountNotFound
from spark8t.services import K8sServiceAccountRegistry, SparkInterface
from spark8t.utils import setup_logging

if __name__ == "__main__":
args, extra_args = parse_arguments_with(
[
add_logging_arguments,
k8s_parser,
spark_user_parser,
add_deploy_arguments,
add_config_arguments,
]
).parse_known_args()

logger = setup_logging(
args.log_level, args.log_conf_file, "spark8t.cli.spark_submit"
)

def main(args: Namespace, logger: Logger):
kube_interface = get_kube_interface(args)

registry = K8sServiceAccountRegistry(
Expand All @@ -47,10 +37,37 @@
)

if service_account is None:
raise ValueError("Service account provided does not exist.")
raise AccountNotFound(
args.username
) if args.username else PrimaryAccountNotFound()

SparkInterface(
service_account=service_account,
kube_interface=kube_interface,
defaults=defaults,
).spark_submit(args.deploy_mode, args.conf, args.properties_file, extra_args)


if __name__ == "__main__":
args, extra_args = parse_arguments_with(
[
add_logging_arguments,
k8s_parser,
spark_user_parser,
add_deploy_arguments,
add_config_arguments,
]
).parse_known_args()

logger = setup_logging(
args.log_level, args.log_conf_file, "spark8t.cli.spark_submit"
)

try:
main(args, logger)
exit(0)
except (AccountNotFound, PrimaryAccountNotFound) as e:
logger.error(str(e))
exit(1)
except Exception as e:
raise e
48 changes: 41 additions & 7 deletions spark8t/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,50 @@
class K8sClusterNotReachable(Exception):
"""Kubernetes cluster cannot be reached successfully by the client."""

def __init__(self, k8s_master: str):
self.k8s_master = k8s_master


class NoAccountFound(Exception):
pass
class ResourceNotFound(FileNotFoundError):
"""General Exception representing a general resource not found."""

def __init__(self, resource_name: str):
self.resource_name = resource_name

class FormatError(SyntaxError):
pass

class K8sResourceNotFound(ResourceNotFound):
"""Requested resource in K8s cannot be found."""

class NoResourceFound(FileNotFoundError):
def __init__(self, resource_name: str):
self.resource_name = resource_name
def __init__(self, resource_name: str, resource_type: str):
super().__init__(resource_name)
self.resource_type = resource_type


class PrimaryAccountNotFound(ResourceNotFound):
"""Requested primary account cannot be fetched as there exists no account labeled as primary."""

def __init__(self):
super().__init__("primary")

def __str__(self) -> str:
return "Primary account not found. Please create or tag an account as primary."


class AccountNotFound(ResourceNotFound):
"""Requested Spark account that does not exist."""

def __init__(self, account: str):
super().__init__(account)

@property
def account(self):
return self.resource_name

def __str__(self) -> str:
return f"Account {self.account} not found"


class FormatError(SyntaxError):
"""Exception to be used when input provided by the user cannot be parsed."""

pass
Loading

0 comments on commit 8ea6156

Please sign in to comment.