Skip to content

Commit

Permalink
Append a random postfix to the app name (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
edingroot committed Oct 17, 2023
1 parent d247e46 commit 48cc792
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 25 deletions.
8 changes: 2 additions & 6 deletions service_configuration_lib/spark_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
import logging
import math
import os
import random
import re
import string
import time
from typing import Any
from typing import Dict
Expand Down Expand Up @@ -1080,16 +1078,15 @@ def get_spark_conf(
)

spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)}
random_postfix = utils.get_random_string(4)

if aws_creds[2] is not None:
spark_conf['spark.hadoop.fs.s3a.aws.credentials.provider'] = AWS_ENV_CREDENTIALS_PROVIDER

# app_name from env is already appended port and time to make it unique
app_name = (spark_opts_from_env or {}).get('spark.app.name')
if not app_name:
# We want to make the app name more unique so that we can search it
# from history server.
app_name = f'{app_base_name}_{ui_port}_{int(time.time())}'
app_name = f'{app_base_name}_{ui_port}_{int(time.time())}_{random_postfix}'
is_jupyter = _is_jupyterhub_job(app_name)

# Explicitly setting app id: replace special characters to '_' to make it consistent
Expand All @@ -1099,7 +1096,6 @@ def get_spark_conf(
if is_jupyter:
raw_app_id = app_name
else:
random_postfix = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(4))
raw_app_id = f'{paasta_service}__{paasta_instance}__{random_postfix}'
app_id = re.sub(r'[\.,-]', '_', _get_k8s_resource_name_limit_size_with_hash(raw_app_id))

Expand Down
6 changes: 6 additions & 0 deletions service_configuration_lib/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import contextlib
import errno
import logging
import random
import string
from socket import error as SocketError
from socket import SO_REUSEADDR
from socket import socket
Expand Down Expand Up @@ -79,3 +81,7 @@ def ephemeral_port_reserve_range(preferred_port_start: int, preferred_port_end:
sock, _ = s.accept()
with contextlib.closing(sock):
return sockname[1]


def get_random_string(length: int) -> str:
    """Return a random string of ``length`` lowercase ASCII letters and digits.

    Uses the ``random`` module, so the result is NOT cryptographically
    secure — it is intended for human-readable unique suffixes (e.g.
    Spark app names / resource IDs), not for secrets or tokens.

    :param length: number of characters to generate; ``0`` (or a negative
        value) yields the empty string.
    :return: the generated string.
    """
    # Build the alphabet once instead of re-concatenating it per character.
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

setup(
name='service-configuration-lib',
version='2.18.9',
version='2.18.10',
provides=['service_configuration_lib'],
description='Start, stop, and inspect Yelp SOA services',
url='https://github.com/Yelp/service_configuration_lib',
Expand Down
47 changes: 29 additions & 18 deletions tests/spark_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
TEST_ACCOUNT_ID = '123456789'
TEST_USER = 'UNIT_TEST_USER'

UI_PORT_RETURN_VALUE = 65432
EPHEMERAL_PORT_RETURN_VALUE = '12345'
TIME_RETURN_VALUE = 123.456
RANDOM_STRING_RETURN_VALUE = 'do1re2mi3fa4sol4'


@pytest.fixture
def mock_log(monkeypatch):
Expand All @@ -26,8 +31,14 @@ def mock_log(monkeypatch):

@pytest.fixture
def mock_time():
with mock.patch.object(spark_config.time, 'time', return_value=123.456):
yield 123.456
with mock.patch.object(spark_config.time, 'time', return_value=TIME_RETURN_VALUE):
yield TIME_RETURN_VALUE


@pytest.fixture
def mock_get_random_string():
    # Patch utils.get_random_string to a fixed value so app names/IDs that
    # embed the random postfix are deterministic in tests.
    patcher = mock.patch.object(
        utils,
        'get_random_string',
        return_value=RANDOM_STRING_RETURN_VALUE,
    )
    patcher.start()
    try:
        yield RANDOM_STRING_RETURN_VALUE
    finally:
        # Always undo the patch, even if the test body raises.
        patcher.stop()


class TestGetAWSCredentials:
Expand Down Expand Up @@ -1083,11 +1094,10 @@ def test_convert_user_spark_opts_value_str(self):

@pytest.fixture
def mock_ephemeral_port_reserve_range(self):
port = '12345'
with mock.patch.object(utils, 'ephemeral_port_reserve_range', return_value=port):
yield port
with mock.patch.object(utils, 'ephemeral_port_reserve_range', return_value=EPHEMERAL_PORT_RETURN_VALUE):
yield EPHEMERAL_PORT_RETURN_VALUE

@pytest.fixture(params=[None, '23456'])
@pytest.fixture(params=[None, str(UI_PORT_RETURN_VALUE)])
def ui_port(self, request):
return request.param

Expand Down Expand Up @@ -1115,13 +1125,21 @@ def user_spark_opts(self, request):
return request.param

@pytest.fixture
def assert_app_name(self, spark_opts_from_env, user_spark_opts, ui_port, mock_ephemeral_port_reserve_range):
def assert_app_name(
self,
spark_opts_from_env,
user_spark_opts,
ui_port,
mock_ephemeral_port_reserve_range,
mock_get_random_string,
):
expected_output = (spark_opts_from_env or {}).get('spark.app.name')

if not expected_output:
expected_output = (
(user_spark_opts or {}).get('spark.app.name') or
self.spark_app_base_name
) + '_' + (ui_port or mock_ephemeral_port_reserve_range) + '_123'
base_name = (user_spark_opts or {}).get('spark.app.name') or self.spark_app_base_name
port = ui_port or mock_ephemeral_port_reserve_range
time_int = int(TIME_RETURN_VALUE)
expected_output = f'{base_name}_{port}_{time_int}_{mock_get_random_string}'

def verify(output):
key = 'spark.app.name'
Expand Down Expand Up @@ -1477,13 +1495,6 @@ def test_adjust_cpu_mem_ratio_thresh_non_regular_pool(
assert int(result_dict['spark.task.cpus']) == 1


def test_stringify_spark_env():
    # Each key/value pair should be rendered as a "--conf key=value" flag,
    # joined by single spaces in insertion order.
    conf = {'spark.mesos.leader': '1234', 'spark.mesos.principal': 'spark'}
    expected = '--conf spark.mesos.leader=1234 --conf spark.mesos.principal=spark'
    actual = spark_config.stringify_spark_env(conf)
    assert actual == expected


@pytest.mark.parametrize(
'memory_string,expected_output', [
('1g', 1024),
Expand Down

0 comments on commit 48cc792

Please sign in to comment.