Skip to content

Commit

Permalink
Configurable templates by job type for spark kubernetes jobs (#99)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Oct 11, 2021
1 parent e55a145 commit 3169830
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ metadata:
data:
jobTemplate.yaml: |
{{- toYaml .Values.sparkOperator.jobTemplate | nindent 4 }}
batchJobTemplate.yaml: |
{{- toYaml .Values.sparkOperator.batchJobTemplate | nindent 4 }}
streamJobTemplate.yaml: |
{{- toYaml .Values.sparkOperator.streamJobTemplate | nindent 4 }}
historicalJobTemplate.yaml: |
{{- toYaml .Values.sparkOperator.historicalJobTemplate | nindent 4 }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ spec:
{{- if .Values.sparkOperator.enabled }}
- name: FEAST_SPARK_K8S_JOB_TEMPLATE_PATH
value: /etc/configs/jobTemplate.yaml
- name: SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH
value: /etc/configs/batchJobTemplate.yaml
- name: SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH
value: /etc/configs/streamJobTemplate.yaml
- name: SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH
value: /etc/configs/historicalJobTemplate.yaml
{{- end }}
{{- range $key, $value := .Values.envOverrides }}
- name: {{ printf "%s" $key | replace "." "_" | upper | quote }}
Expand Down
4 changes: 4 additions & 0 deletions infra/charts/feast-spark/charts/feast-jobservice/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ sparkOperator:
enabled: false
# sparkOperator.jobTemplate -- Content of the job template, in yaml format
jobTemplate: {}
# Specialized job templates by job type (batch ingestion, stream ingestion, historical retrieval)
batchJobTemplate: {}
streamJobTemplate: {}
historicalJobTemplate: {}

prometheus:
# prometheus.enabled -- Flag to enable scraping of metrics
Expand Down
9 changes: 9 additions & 0 deletions python/feast_spark/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ class ConfigOptions(metaclass=ConfigMeta):
# SparkApplication resource template
SPARK_K8S_JOB_TEMPLATE_PATH = None

# SparkApplication resource template for Batch Ingestion Jobs
SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH: Optional[str] = ""

# SparkApplication resource template for Stream Ingestion Jobs
SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH: Optional[str] = ""

# SparkApplication resource template for Historical Retrieval Jobs
SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH: Optional[str] = ""

#: File format of historical retrieval features
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"

Expand Down
11 changes: 10 additions & 1 deletion python/feast_spark/pyspark/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,16 @@ def _k8s_launcher(config: Config) -> JobLauncher:

return k8s.KubernetesJobLauncher(
namespace=config.get(opt.SPARK_K8S_NAMESPACE),
resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None),
generic_resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH),
batch_ingestion_resource_template_path=config.get(
opt.SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH, None
),
stream_ingestion_resource_template_path=config.get(
opt.SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH, None
),
historical_retrieval_resource_template_path=config.get(
opt.SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH, None
),
staging_location=staging_location,
incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG),
staging_client=get_staging_client(staging_uri.scheme, config),
Expand Down
42 changes: 32 additions & 10 deletions python/feast_spark/pyspark/launchers/k8s/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
)


def _load_resource_template(job_template_path: Path) -> Dict[str, Any]:
def _load_resource_template(job_template_path: Optional[str]) -> Dict[str, Any]:
    """Load a SparkApplication resource template from a YAML file.

    Args:
        job_template_path: Filesystem path to the template YAML, or
            None/"" when no specialized template is configured.

    Returns:
        The parsed template as a dict, or an empty dict when the path is
        unset, the file does not exist, or the file parses to nothing
        (empty / comments-only YAML). Returning {} (falsy) lets callers
        fall back to a default template with ``... or default``.
    """
    if not job_template_path or not Path(job_template_path).exists():
        return {}

    with open(job_template_path, "rt") as f:
        # safe_load returns None for an empty document; coerce to {} so the
        # declared Dict return type holds and callers' falsy-fallback works.
        return yaml.safe_load(f) or {}

Expand Down Expand Up @@ -189,7 +192,10 @@ def __init__(
namespace: str,
incluster: bool,
staging_location: str,
resource_template_path: Optional[Path],
generic_resource_template_path: Optional[str],
batch_ingestion_resource_template_path: Optional[str],
stream_ingestion_resource_template_path: Optional[str],
historical_retrieval_resource_template_path: Optional[str],
staging_client: AbstractStagingClient,
azure_account_name: str,
azure_account_key: str,
Expand All @@ -200,10 +206,26 @@ def __init__(
self._staging_client = staging_client
self._azure_account_name = azure_account_name
self._azure_account_key = azure_account_key
if resource_template_path is not None:
self._resource_template = _load_resource_template(resource_template_path)
else:
self._resource_template = yaml.safe_load(DEFAULT_JOB_TEMPLATE)

generic_template = _load_resource_template(
generic_resource_template_path
) or yaml.safe_load(DEFAULT_JOB_TEMPLATE)

self._batch_ingestion_template = (
_load_resource_template(batch_ingestion_resource_template_path)
or generic_template
)

self._stream_ingestion_template = (
_load_resource_template(stream_ingestion_resource_template_path)
or generic_template
)

self._historical_retrieval_template = (
_load_resource_template(historical_retrieval_resource_template_path)
or generic_template
)

self._scheduled_resource_template = yaml.safe_load(
DEFAULT_SCHEDULED_JOB_TEMPLATE
)
Expand Down Expand Up @@ -281,7 +303,7 @@ def historical_feature_retrieval(
job_id = _generate_job_id()

resource = _prepare_job_resource(
job_template=self._resource_template,
job_template=self._historical_retrieval_template,
job_id=job_id,
job_type=HISTORICAL_RETRIEVAL_JOB_TYPE,
main_application_file=pyspark_script_path,
Expand Down Expand Up @@ -341,7 +363,7 @@ def offline_to_online_ingestion(
job_id = _generate_job_id()

resource = _prepare_job_resource(
job_template=self._resource_template,
job_template=self._batch_ingestion_template,
job_id=job_id,
job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
main_application_file=jar_s3_path,
Expand Down Expand Up @@ -394,7 +416,7 @@ def schedule_offline_to_online_ingestion(
scheduled_job_template=self._scheduled_resource_template,
scheduled_job_id=schedule_job_id,
job_schedule=ingestion_job_params.get_job_schedule(),
job_template=self._resource_template,
job_template=self._batch_ingestion_template,
job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
main_application_file=jar_s3_path,
main_class=ingestion_job_params.get_class_name(),
Expand Down Expand Up @@ -454,7 +476,7 @@ def start_stream_to_online_ingestion(
job_id = _generate_job_id()

resource = _prepare_job_resource(
job_template=self._resource_template,
job_template=self._stream_ingestion_template,
job_id=job_id,
job_type=STREAM_TO_ONLINE_JOB_TYPE,
main_application_file=jar_s3_path,
Expand Down

0 comments on commit 3169830

Please sign in to comment.