Skip to content

Commit

Permalink
Support for Schedule Spark Application (#59)
Browse files Browse the repository at this point in the history
* Support for Schedule Spark Application

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Remove the need of returning job as response

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Add e2e test

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix flag typo in spark ingestion job

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix linting

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix kubernetes api calls

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Add verification for e2e test

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix image tag for jobservice

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix schedule sparkapplication namespace

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix unscheduling

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Remove unused argument

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Remove schedule job id from label, add truncation

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* formatting

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
  • Loading branch information
khorshuheng and khorshuheng committed Apr 29, 2021
1 parent bb0fd44 commit 94b7446
Show file tree
Hide file tree
Showing 19 changed files with 523 additions and 6 deletions.
2 changes: 2 additions & 0 deletions infra/scripts/helm/k8s-jobservice.tpl.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
feast-jobservice:
image:
tag: ${IMAGE_TAG}
envOverrides:
FEAST_CORE_URL: feast-release-feast-core:6565
FEAST_SPARK_LAUNCHER: k8s
Expand Down
2 changes: 1 addition & 1 deletion infra/scripts/test-end-to-end-aws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ PYTHONPATH=sdk/python pytest tests/e2e/ \
--redis-url $NODE_IP:32379 \
--emr-region us-west-2 \
--kafka-brokers $NODE_IP:30092 \
-m "not bq"
-m "not bq and not k8s"
2 changes: 1 addition & 1 deletion infra/scripts/test-end-to-end-gcp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ CMD=$(printf '%s' \
"--core-url feast-release-feast-core:6565 " \
"--serving-url feast-release-feast-online-serving:6566 " \
"--job-service-url js-feast-jobservice:6568 " \
"--kafka-brokers 10.128.0.103:9094 --bq-project kf-feast --feast-version dev")
"--kafka-brokers 10.128.0.103:9094 --bq-project kf-feast --feast-version dev -m \"not k8s\"")

# Delete old test running pod if it exists
kubectl delete pod -n "$NAMESPACE" ci-test-runner 2>/dev/null || true
Expand Down
2 changes: 1 addition & 1 deletion infra/scripts/test-end-to-end-local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ PYTHONPATH=sdk/python pytest tests/e2e/ \
--staging-path s3a://feast-staging \
--redis-url sparkop-redis-master.sparkop.svc.cluster.local:6379 \
--kafka-brokers sparkop-kafka.sparkop.svc.cluster.local:9092 \
-m "not bq"
-m "not bq and not k8s"
1 change: 1 addition & 0 deletions infra/scripts/test-end-to-end-sparkop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ CMD=$(printf '%s' \
"--core-url feast-release-feast-core:6565 " \
"--serving-url feast-release-feast-online-serving:6566 " \
"--job-service-url js-feast-jobservice:6568 " \
"--k8s-namespace sparkop-e2e " \
"--kafka-brokers feast-release-kafka-headless:9092 --bq-project kf-feast --feast-version dev")

# Delete old test running pod if it exists
Expand Down
29 changes: 29 additions & 0 deletions protos/feast_spark/api/JobService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ service JobService {
// Start job to ingest data from offline store into online store
rpc StartOfflineToOnlineIngestionJob (StartOfflineToOnlineIngestionJobRequest) returns (StartOfflineToOnlineIngestionJobResponse);

// Start scheduled job to ingest data from offline store into online store
rpc ScheduleOfflineToOnlineIngestionJob (ScheduleOfflineToOnlineIngestionJobRequest) returns (ScheduleOfflineToOnlineIngestionJobResponse);

// Unschedule job to ingest data from offline store into online store
rpc UnscheduleOfflineToOnlineIngestionJob(UnscheduleOfflineToOnlineIngestionJobRequest) returns (UnscheduleOfflineToOnlineIngestionJobResponse);

// Produce a training dataset, return a job id that will provide a file reference
rpc GetHistoricalFeatures (GetHistoricalFeaturesRequest) returns (GetHistoricalFeaturesResponse);

Expand Down Expand Up @@ -127,6 +133,29 @@ message StartOfflineToOnlineIngestionJobResponse {
string log_uri = 4;
}

message ScheduleOfflineToOnlineIngestionJobRequest {
// Feature table to ingest
string project = 1;
string table_name = 2;


// Timespan of the ingested data per job, in days. The data from end of the day - timespan till end of the day will be ingested. Eg. if the job execution date is 10/4/2021, and ingestion timespan is 2, then data from 9/4/2021 00:00 to 10/4/2021 23:59 (inclusive) will be ingested.
int32 ingestion_timespan = 3;

// Crontab string. Eg. 0 13 * * *
string cron_schedule = 4;

}

message ScheduleOfflineToOnlineIngestionJobResponse {}

message UnscheduleOfflineToOnlineIngestionJobRequest {
string project = 1;
string table_name = 2;
}

message UnscheduleOfflineToOnlineIngestionJobResponse {}

message GetHistoricalFeaturesRequest {
// List of feature references that are being retrieved
repeated string feature_refs = 1;
Expand Down
60 changes: 60 additions & 0 deletions python/feast_spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import List, Optional, Union, cast

import pandas as pd
from croniter import croniter

import feast
from feast.config import Config
Expand All @@ -21,19 +22,23 @@
GetHistoricalFeaturesRequest,
GetJobRequest,
ListJobsRequest,
ScheduleOfflineToOnlineIngestionJobRequest,
StartOfflineToOnlineIngestionJobRequest,
StartStreamToOnlineIngestionJobRequest,
UnscheduleOfflineToOnlineIngestionJobRequest,
)
from feast_spark.api.JobService_pb2_grpc import JobServiceStub
from feast_spark.constants import ConfigOptions as opt
from feast_spark.pyspark.abc import RetrievalJob, SparkJob
from feast_spark.pyspark.launcher import (
get_job_by_id,
list_jobs,
schedule_offline_to_online_ingestion,
start_historical_feature_retrieval_job,
start_historical_feature_retrieval_spark_session,
start_offline_to_online_ingestion,
start_stream_to_online_ingestion,
unschedule_offline_to_online_ingestion,
)
from feast_spark.remote_job import (
RemoteBatchIngestionJob,
Expand Down Expand Up @@ -305,6 +310,61 @@ def start_offline_to_online_ingestion(
response.log_uri,
)

def schedule_offline_to_online_ingestion(
self,
feature_table: feast.FeatureTable,
ingestion_timespan: int,
cron_schedule: str,
):
"""
Launch Scheduled Ingestion Job from Batch Source to Online Store for given feature table
Args:
feature_table: FeatureTable that will be ingested into the online store
ingestion_timespan: Days of data which will be ingestion per job. The boundaries
on which to filter the source are [end of day of execution date - ingestion_timespan (days) ,
end of day of execution date)
cron_schedule: Cron schedule expression
Returns: Spark Job Proxy object
"""
if not croniter.is_valid(cron_schedule):
raise RuntimeError(f"{cron_schedule} is not a valid cron expression")
if not self._use_job_service:
schedule_offline_to_online_ingestion(
client=self,
project=self._feast.project,
feature_table=feature_table,
ingestion_timespan=ingestion_timespan,
cron_schedule=cron_schedule,
)
else:
request = ScheduleOfflineToOnlineIngestionJobRequest(
project=self._feast.project,
table_name=feature_table.name,
ingestion_timespan=ingestion_timespan,
cron_schedule=cron_schedule,
)
self._job_service.ScheduleOfflineToOnlineIngestionJob(request)

def unschedule_offline_to_online_ingestion(
self, feature_table: feast.FeatureTable, project=None
):
feature_table_project = self._feast.project if project is None else project

if not self._use_job_service:
unschedule_offline_to_online_ingestion(
client=self,
project=feature_table_project,
feature_table=feature_table.name,
)
else:
request = UnscheduleOfflineToOnlineIngestionJobRequest(
project=feature_table_project, table_name=feature_table.name,
)

self._job_service.UnscheduleOfflineToOnlineIngestionJob(request)

def start_stream_to_online_ingestion(
self,
feature_table: feast.FeatureTable,
Expand Down
34 changes: 34 additions & 0 deletions python/feast_spark/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
JobStatus,
JobType,
ListJobsResponse,
ScheduleOfflineToOnlineIngestionJobRequest,
ScheduleOfflineToOnlineIngestionJobResponse,
StartOfflineToOnlineIngestionJobRequest,
StartOfflineToOnlineIngestionJobResponse,
StartStreamToOnlineIngestionJobRequest,
StartStreamToOnlineIngestionJobResponse,
UnscheduleOfflineToOnlineIngestionJobRequest,
UnscheduleOfflineToOnlineIngestionJobResponse,
)
from feast_spark.constants import ConfigOptions as opt
from feast_spark.pyspark.abc import (
Expand All @@ -45,9 +49,11 @@
get_job_by_id,
get_stream_to_online_ingestion_params,
list_jobs,
schedule_offline_to_online_ingestion,
start_historical_feature_retrieval_job,
start_offline_to_online_ingestion,
start_stream_to_online_ingestion,
unschedule_offline_to_online_ingestion,
)
from feast_spark.third_party.grpc.health.v1.HealthService_pb2 import (
HealthCheckResponse,
Expand Down Expand Up @@ -142,6 +148,34 @@ def StartOfflineToOnlineIngestionJob(
log_uri=job.get_log_uri(), # type: ignore
)

def ScheduleOfflineToOnlineIngestionJob(
self, request: ScheduleOfflineToOnlineIngestionJobRequest, context
):
"""Schedule job to ingest data from offline store into online store periodically"""
feature_table = self.client.feature_store.get_feature_table(
request.table_name, request.project
)
schedule_offline_to_online_ingestion(
client=self.client,
project=request.project,
feature_table=feature_table,
ingestion_timespan=request.ingestion_timespan,
cron_schedule=request.cron_schedule,
)

return ScheduleOfflineToOnlineIngestionJobResponse()

def UnscheduleOfflineToOnlineIngestionJob(
self, request: UnscheduleOfflineToOnlineIngestionJobRequest, context
):
feature_table = self.client.feature_store.get_feature_table(
request.table_name, request.project
)
unschedule_offline_to_online_ingestion(
client=self.client, project=request.project, feature_table=feature_table,
)
return UnscheduleOfflineToOnlineIngestionJobResponse()

def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""

Expand Down
78 changes: 78 additions & 0 deletions python/feast_spark/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class SparkJobType(Enum):
HISTORICAL_RETRIEVAL = 0
BATCH_INGESTION = 1
STREAM_INGESTION = 2
SCHEDULED_BATCH_INGESTION = 3

def to_pascal_case(self):
return self.name.title().replace("_", "")
Expand Down Expand Up @@ -483,6 +484,60 @@ def get_arguments(self) -> List[str]:
]


class ScheduledBatchIngestionJobParameters(IngestionJobParameters):
def __init__(
self,
feature_table: Dict,
source: Dict,
ingestion_timespan: int,
cron_schedule: str,
jar: str,
redis_host: Optional[str],
redis_port: Optional[int],
redis_ssl: Optional[bool],
bigtable_project: Optional[str],
bigtable_instance: Optional[str],
cassandra_host: Optional[str] = None,
cassandra_port: Optional[int] = None,
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
stencil_url: Optional[str] = None,
):
super().__init__(
feature_table,
source,
jar,
redis_host,
redis_port,
redis_ssl,
bigtable_project,
bigtable_instance,
cassandra_host,
cassandra_port,
statsd_host,
statsd_port,
deadletter_path,
stencil_url,
)
self._ingestion_timespan = ingestion_timespan
self._cron_schedule = cron_schedule

def get_name(self) -> str:
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"

def get_job_type(self) -> SparkJobType:
return SparkJobType.SCHEDULED_BATCH_INGESTION

def get_arguments(self) -> List[str]:
return super().get_arguments() + [
"--mode",
"offline",
"--ingestion-timespan",
str(self._ingestion_timespan),
]


class StreamIngestionJobParameters(IngestionJobParameters):
def __init__(
self,
Expand Down Expand Up @@ -636,6 +691,29 @@ def offline_to_online_ingestion(
"""
raise NotImplementedError

@abc.abstractmethod
def schedule_offline_to_online_ingestion(
self, ingestion_job_params: ScheduledBatchIngestionJobParameters
):
"""
Submits a scheduled batch ingestion job to a Spark cluster.
Raises:
SparkJobFailure: The spark job submission failed, encountered error
during execution, or timeout.
Returns:
ScheduledBatchIngestionJob: wrapper around remote job that can be used to check when job completed.
"""
raise NotImplementedError

@abc.abstractmethod
def unschedule_offline_to_online_ingestion(self, project: str, feature_table: str):
"""
Unschedule a scheduled batch ingestion job.
"""
raise NotImplementedError

@abc.abstractmethod
def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
Expand Down
49 changes: 49 additions & 0 deletions python/feast_spark/pyspark/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
JobLauncher,
RetrievalJob,
RetrievalJobParameters,
ScheduledBatchIngestionJobParameters,
SparkJob,
StreamIngestionJob,
StreamIngestionJobParameters,
Expand Down Expand Up @@ -293,6 +294,54 @@ def start_offline_to_online_ingestion(
)


def schedule_offline_to_online_ingestion(
client: "Client",
project: str,
feature_table: FeatureTable,
ingestion_timespan: int,
cron_schedule: str,
):

launcher = resolve_launcher(client.config)

launcher.schedule_offline_to_online_ingestion(
ScheduledBatchIngestionJobParameters(
jar=client.config.get(opt.SPARK_INGESTION_JAR),
source=_source_to_argument(feature_table.batch_source, client.config),
feature_table=_feature_table_to_argument(client, project, feature_table),
ingestion_timespan=ingestion_timespan,
cron_schedule=cron_schedule,
redis_host=client.config.get(opt.REDIS_HOST),
redis_port=bool(client.config.get(opt.REDIS_HOST))
and client.config.getint(opt.REDIS_PORT),
redis_ssl=client.config.getboolean(opt.REDIS_SSL),
bigtable_project=client.config.get(opt.BIGTABLE_PROJECT),
bigtable_instance=client.config.get(opt.BIGTABLE_INSTANCE),
cassandra_host=client.config.get(opt.CASSANDRA_HOST),
cassandra_port=bool(client.config.get(opt.CASSANDRA_HOST))
and client.config.getint(opt.CASSANDRA_PORT),
statsd_host=(
client.config.getboolean(opt.STATSD_ENABLED)
and client.config.get(opt.STATSD_HOST)
),
statsd_port=(
client.config.getboolean(opt.STATSD_ENABLED)
and client.config.getint(opt.STATSD_PORT)
),
deadletter_path=client.config.get(opt.DEADLETTER_PATH),
stencil_url=client.config.get(opt.STENCIL_URL),
)
)


def unschedule_offline_to_online_ingestion(
client: "Client", project: str, feature_table: FeatureTable,
):

launcher = resolve_launcher(client.config)
launcher.unschedule_offline_to_online_ingestion(project, feature_table.name)


def get_stream_to_online_ingestion_params(
client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str]
) -> StreamIngestionJobParameters:
Expand Down

0 comments on commit 94b7446

Please sign in to comment.