lock manager for job service (#135)
* Update version to v0.2.24 (#132)

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>

* distributed lock for starting/stopping streaming ingestion jobs

Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>

* unit tests for lock manager

Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>

* Perform data type conversion automatically (#133)

* Perform data type conversion automatically

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Use wheel installation for local setup to avoid module not found issue

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>

* format python files

Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>

* install feast-spark from setup.py

Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>

Co-authored-by: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com>
Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
Co-authored-by: KeshavSharma <keshav.sharma@gojek.com>
4 people committed Apr 12, 2022
1 parent 7d3aa9d commit 6499fb2
Showing 6 changed files with 247 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -34,8 +34,8 @@ install-python-ci-dependencies:

# Supports feast-dev repo master branch
install-python: install-python-ci-dependencies
-	pip install --user --upgrade setuptools wheel grpcio-tools mypy-protobuf
-	cd ${ROOT_DIR}/python; rm -rf dist; python setup.py bdist_wheel; pip install --find-links=dist feast-spark
+	pip install --user --upgrade setuptools wheel
+	cd ${ROOT_DIR}/python; rm -rf dist; python setup.py install

lint-python:
cd ${ROOT_DIR}/python ; mypy feast_spark/ tests/
9 changes: 9 additions & 0 deletions python/feast_spark/constants.py
@@ -111,6 +111,15 @@ class ConfigOptions(metaclass=ConfigMeta):
    #: Default Redis port to Redis Instance which stores Spark Ingestion Job metrics
    SPARK_METRICS_REDIS_PORT: Optional[str] = None

    #: Host to Redis Instance which stores locks for job management
    LOCK_MGR_REDIS_HOST: Optional[str] = None

    #: Port to Redis Instance which stores locks for job management
    LOCK_MGR_REDIS_PORT: Optional[str] = None

    #: TTL for locks for job management
    LOCK_EXPIRY: Optional[str] = "60"

    #: File format of historical retrieval features
    HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"

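These options flow through the regular feast-spark client configuration. Below is a minimal sketch (not part of this commit) of wiring them up, assuming a Redis instance on localhost:6379; the lower-cased option keys mirror the ConfigOptions names above and match the test fixtures later in this commit.

# Sketch only: configuring the lock manager options via the Feast client.
# Host/port values are assumptions; LOCK_EXPIRY defaults to "60" seconds.
from feast import Client

client = Client(
    options={
        "lock_mgr_redis_host": "localhost",
        "lock_mgr_redis_port": "6379",
        "lock_expiry": "60",
    }
)
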
31 changes: 25 additions & 6 deletions python/feast_spark/job_service.py
@@ -40,6 +40,7 @@
    UnscheduleOfflineToOnlineIngestionJobResponse,
)
from feast_spark.constants import ConfigOptions as opt
from feast_spark.lock_manager import JobOperation, JobOperationLock
from feast_spark.metrics import (
    job_schedule_count,
    job_submission_count,
@@ -548,14 +549,28 @@ def ensure_stream_ingestion_jobs(client: Client, all_projects: bool):
         f"expected_job_hashes = {sorted(list(expected_job_hashes))}"
     )
 
+    lock_config = {
+        "redis_host": client.config.get(opt.LOCK_MGR_REDIS_HOST),
+        "redis_port": client.config.getint(opt.LOCK_MGR_REDIS_PORT),
+        "lock_expiry": client.config.getint(opt.LOCK_EXPIRY),
+    }
+
     for job_hash in job_hashes_to_start:
         # Any job that we wish to start should be among expected table refs map
         project, feature_table = expected_job_hash_to_tables[job_hash]
-        logger.warning(
-            f"Starting a stream ingestion job for project={project}, "
-            f"table_name={feature_table.name} with job_hash={job_hash}"
-        )
-        client.start_stream_to_online_ingestion(feature_table, [], project=project)
+
+        # start the job if lock is available
+        with JobOperationLock(
+            job_hash=job_hash, operation=JobOperation.START, **lock_config
+        ) as lock:
+            if lock:
+                logger.warning(
+                    f"Starting a stream ingestion job for project={project}, "
+                    f"table_name={feature_table.name} with job_hash={job_hash}"
+                )
+                client.start_stream_to_online_ingestion(
+                    feature_table, [], project=project
+                )
 
         # prevent scheduler from peak load
         time.sleep(client.config.getint(opt.JOB_SERVICE_PAUSE_BETWEEN_JOBS))
@@ -572,6 +587,10 @@ def ensure_stream_ingestion_jobs(client: Client, all_projects: bool):
             f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}"
         )
         try:
-            job.cancel()
+            with JobOperationLock(
+                job_hash=job_hash, operation=JobOperation.CANCEL, **lock_config
+            ) as lock:
+                if lock:
+                    job.cancel()
         except FailedPrecondition as exc:
             logger.error(f"Job canceling failed with exception {exc}")
115 changes: 115 additions & 0 deletions python/feast_spark/lock_manager.py
@@ -0,0 +1,115 @@
"""
Classes to manage distributed locks
"""
import enum
import logging
import secrets
import time

import redis
from redis.exceptions import ConnectionError

# retries for acquiring lock
LOCK_ACQUIRE_RETRIES = 3
# wait between retries
LOCK_ACQUIRE_WAIT = 1
LOCK_KEY_PREFIX = "lock"

logger = logging.getLogger(__name__)


class JobOperation(enum.Enum):
"""
Enum for job operations
"""

START = "st"
CANCEL = "cn"


class JobOperationLock:
"""
Lock for starting and cancelling spark ingestion jobs.
Implemented as a context manager to automatically release lock after operation.
Usage:
with JobOperationLock(job_hash, <start/cancel>):
client.start_stream_to_online_ingestion(feature_table, [], project=project)
"""

def __init__(
self,
redis_host: str,
redis_port: int,
lock_expiry: int,
job_hash: str,
operation: JobOperation = JobOperation.START,
):
"""
Init method, initialized redis key for the lock
Args:
redis_host: host to redis instance to store locks
redis_port: port to redis instance to store locks
lock_expiry: time in seconds for auto releasing lock
job_hash: job hash string for the job which needs to be operated upon
operation: operation to be performed <START/CANCEL>
"""
self._redis = redis.Redis(host=redis_host, port=redis_port)
self._lock_expiry = lock_expiry
self._lock_key = f"{LOCK_KEY_PREFIX}_{operation.value}_{job_hash}"
self._lock_value = secrets.token_hex(nbytes=8)

def __enter__(self):
"""
Context manager method for setup - acquire lock
lock_key is a combination of a prefix, job hash and operation(start/cancel)
lock_value is a randomly generated 8 byte hexadecimal, this is to ensure
that lock can be deleted only by the agent who created it
NX option is used only set the key if it does not already exist,
this will ensure that locks are not overwritten
EX option is used to set the specified expire time to release the lock automatically after TTL
"""
# Retry acquiring lock on connection failures
retry_attempts = 0
while retry_attempts < LOCK_ACQUIRE_RETRIES:
try:
if self._redis.set(
name=self._lock_key,
value=self._lock_value,
nx=True,
ex=self._lock_expiry,
):
return self._lock_value
else:
logger.info(f"lock not available: {self._lock_key}")
return False
except ConnectionError:
# wait before attempting to retry
logger.warning(
f"connection error while acquiring lock: {self._lock_key}"
)
time.sleep(LOCK_ACQUIRE_WAIT)
retry_attempts += 1
logger.warning(f"Can't acquire lock, backing off: {self._lock_key}")
return False

def __exit__(self, *args, **kwargs):
"""
context manager method for teardown - release lock
safe release - delete lock key only if value exists and is same as set by this object
otherwise rely on auto-release on expiry
"""
try:
lock_value = self._redis.get(self._lock_key)
if lock_value and lock_value.decode() == self._lock_value:
self._redis.delete(self._lock_key)
except ConnectionError:
logger.warning(
f"connection error while deleting lock: {self._lock_key}."
f"rely on auto-release after {self._lock_expiry} seconds"
)
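The lock itself is the usual single-node Redis pattern: SET with NX (only if absent) and EX (TTL in seconds) to acquire, then compare-before-delete on release so that a lock that has already expired and been re-acquired by another agent is never deleted. A sketch of the equivalent raw redis-py calls (not part of this commit; key, token, and connection details are illustrative only):

# Sketch only: the underlying Redis operations used by JobOperationLock.
import redis

r = redis.Redis(host="localhost", port=6379)

token = "6f1c2a9b0d4e7f31"  # JobOperationLock uses secrets.token_hex(nbytes=8)
# Acquire: returns True if the key was absent, None if another holder already set it.
acquired = r.set(name="lock_st_abc123", value=token, nx=True, ex=60)

# Release: delete only if the stored value is still our token;
# otherwise leave the key for its current holder (or for the TTL to expire it).
if acquired:
    current = r.get("lock_st_abc123")
    if current and current.decode() == token:
        r.delete("lock_st_abc123")
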
71 changes: 71 additions & 0 deletions python/tests/test_lock_manager.py
@@ -0,0 +1,71 @@
from unittest.mock import patch

import pytest

from feast_spark.lock_manager import JobOperation, JobOperationLock

job_hash = "dummy_hash"


class MockRedis:
    def __init__(self, cache=dict()):
        self.cache = cache

    def get(self, name):
        if name in self.cache:
            return self.cache[name]
        return None

    def set(self, name, value, *args, **kwargs):
        if name not in self.cache:
            self.cache[name] = value.encode("utf-8")
            return "OK"

    def delete(self, name):
        if name in self.cache:
            self.cache.pop(name)
        return None


@pytest.fixture
def lock_config():
    return {"redis_host": "localhost", "redis_port": 0, "lock_expiry": 5}


@patch("redis.Redis")
def test_lock_manager_context(mock_redis, lock_config):
    mock_redis_connection = MockRedis()
    mock_redis.return_value = mock_redis_connection
    with JobOperationLock(
        job_hash=job_hash, operation=JobOperation.START, **lock_config
    ) as lock:
        # test lock acquired
        assert lock
        # verify lock key in cache
        assert (
            f"lock_{JobOperation.START.value}_{job_hash}" in mock_redis_connection.cache
        )
    # verify release
    assert (
        f"lock_{JobOperation.START.value}_{job_hash}" not in mock_redis_connection.cache
    )


@patch("redis.Redis")
def test_lock_manager_lock_not_available(mock_redis, lock_config):
    cache = {"lock_st_dummy_hash": b"127a32aaf729dc87"}
    mock_redis_connection = MockRedis(cache)
    mock_redis.return_value = mock_redis_connection
    with JobOperationLock(
        job_hash=job_hash, operation=JobOperation.START, **lock_config
    ) as lock:
        # test lock not acquired
        assert not lock


def test_lock_manager_connection_error(lock_config):
    with JobOperationLock(
        job_hash=job_hash, operation=JobOperation.START, **lock_config
    ) as lock:
        # test lock not acquired
        assert not lock
35 changes: 25 additions & 10 deletions python/tests/test_streaming_job_scheduling.py
@@ -2,7 +2,7 @@
 import hashlib
 import json
 from datetime import datetime
-from unittest.mock import Mock
+from unittest.mock import Mock, patch
 
 import pytest
 
@@ -21,7 +21,11 @@
 def feast_client():
     c = FeastClient(
         job_service_pause_between_jobs=0,
-        options={"whitelisted_projects": "default,ride"},
+        options={
+            "whitelisted_projects": "default,ride",
+            "lock_mgr_redis_host": "localhost",
+            "lock_mgr_redis_port": "0",
+        },
     )
     c.list_projects = Mock(return_value=["default", "ride", "invalid_project"])
     c.list_feature_tables = Mock()
@@ -88,7 +92,8 @@ def get_start_time(self) -> datetime:
         pass
 
 
-def test_new_job_creation(spark_client, feature_table):
+@patch("redis.Redis")
+def test_new_job_creation(mock_redis, spark_client, feature_table):
     """ No job existed prior to call """
 
     spark_client.feature_store.list_feature_tables.return_value = [feature_table]
@@ -118,7 +123,8 @@ def test_no_changes(spark_client, feature_table):
     spark_client.start_stream_to_online_ingestion.assert_not_called()
 
 
-def test_update_existing_job(spark_client, feature_table):
+@patch("redis.Redis")
+def test_update_existing_job(mock_redis, spark_client, feature_table):
     """ Feature Table spec was updated """
 
     new_ft = copy.deepcopy(feature_table)
@@ -136,7 +142,8 @@ def test_update_existing_job(spark_client, feature_table):
     assert spark_client.start_stream_to_online_ingestion.call_count == 2
 
 
-def test_not_cancelling_starting_job(spark_client, feature_table):
+@patch("redis.Redis")
+def test_not_cancelling_starting_job(mock_redis, spark_client, feature_table):
     """ Feature Table spec was updated but previous version is still starting """
 
     new_ft = copy.deepcopy(feature_table)
@@ -154,7 +161,8 @@ def test_not_cancelling_starting_job(spark_client, feature_table):
     assert spark_client.start_stream_to_online_ingestion.call_count == 2
 
 
-def test_not_retrying_failed_job(spark_client, feature_table):
+@patch("redis.Redis")
+def test_not_retrying_failed_job(mock_redis, spark_client, feature_table):
     """ Job has failed on previous try """
 
     job = SimpleStreamingIngestionJob(
@@ -173,7 +181,8 @@ def test_not_retrying_failed_job(spark_client, feature_table):
     )
 
 
-def test_restarting_completed_job(spark_client, feature_table):
+@patch("redis.Redis")
+def test_restarting_completed_job(mock_redis, spark_client, feature_table):
     """ Job has successfully finished on previous try """
     job = SimpleStreamingIngestionJob(
         "", "default", feature_table, SparkJobStatus.COMPLETED
@@ -187,7 +196,8 @@ def test_restarting_completed_job(spark_client, feature_table):
     assert spark_client.start_stream_to_online_ingestion.call_count == 2
 
 
-def test_stopping_running_job(spark_client, feature_table):
+@patch("redis.Redis")
+def test_stopping_running_job(mock_redis, spark_client, feature_table):
     """ Streaming source was deleted """
     new_ft = copy.deepcopy(feature_table)
     new_ft.stream_source = None
@@ -205,13 +215,18 @@ def test_stopping_running_job(spark_client, feature_table):
     spark_client.start_stream_to_online_ingestion.assert_not_called()
 
 
-def test_restarting_failed_jobs(feature_table):
+@patch("redis.Redis")
+def test_restarting_failed_jobs(mock_redis, feature_table):
     """ If configured - restart failed jobs """
 
     feast_client = FeastClient(
         job_service_pause_between_jobs=0,
         job_service_retry_failed_jobs=True,
-        options={"whitelisted_projects": "default,ride"},
+        options={
+            "whitelisted_projects": "default,ride",
+            "lock_mgr_redis_host": "localhost",
+            "lock_mgr_redis_port": "0",
+        },
     )
     feast_client.list_projects = Mock(return_value=["default"])
     feast_client.list_feature_tables = Mock()