feat: Add an experimental lambda-based materialization engine (#2923)
* feat: Add an experimental lambda-based materialization engine

Signed-off-by: Achal Shah <achals@gmail.com>

* setup and teardown lambda func

Signed-off-by: Achal Shah <achals@gmail.com>

* actually get the test working correctly

Signed-off-by: Achal Shah <achals@gmail.com>

* actually get the test working correctly

Signed-off-by: Achal Shah <achals@gmail.com>

* parallelize with threads

Signed-off-by: Achal Shah <achals@gmail.com>

* super call

Signed-off-by: Achal Shah <achals@gmail.com>

* fix bugs

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* undo unintended changes

Signed-off-by: Achal Shah <achals@gmail.com>
achals committed Jul 14, 2022
1 parent 054446c commit 6f79069
Showing 12 changed files with 655 additions and 17 deletions.
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/aws.py
@@ -106,6 +106,15 @@ def update_infra(

            self._deploy_feature_server(project, image_uri)

        if self.batch_engine:
            self.batch_engine.update(
                project,
                tables_to_delete,
                tables_to_keep,
                entities_to_delete,
                entities_to_keep,
            )

    def _deploy_feature_server(self, project: str, image_uri: str):
        _logger.info("Deploying feature server...")

@@ -198,8 +207,7 @@ def _deploy_feature_server(self, project: str, image_uri: str):
    def teardown_infra(
        self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
    ) -> None:
        if self.online_store:
            self.online_store.teardown(self.repo_config, tables, entities)
        super(AwsProvider, self).teardown_infra(project, tables, entities)

        if (
            self.repo_config.feature_server is not None
25 changes: 25 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/Dockerfile
@@ -0,0 +1,25 @@
FROM public.ecr.aws/lambda/python:3.9

RUN yum install -y git


# Copy app handler code
COPY sdk/python/feast/infra/materialization/lambda/app.py ${LAMBDA_TASK_ROOT}

# Copy necessary parts of the Feast codebase
COPY sdk/python sdk/python
COPY protos protos
COPY go go
COPY setup.py setup.py
COPY pyproject.toml pyproject.toml
COPY README.md README.md

# Install Feast for AWS with Lambda dependencies
# We need this bind mount because setuptools_scm needs access to the
# .git directory to infer the version of Feast being installed.
# https://github.com/pypa/setuptools_scm#usage-from-docker
# This also assumes that the Dockerfile is built from the repository root.
RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,redis]'

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]
11 changes: 11 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/__init__.py
@@ -0,0 +1,11 @@
from .lambda_engine import (
    LambdaMaterializationEngine,
    LambdaMaterializationEngineConfig,
    LambdaMaterializationJob,
)

__all__ = [
    "LambdaMaterializationEngineConfig",
    "LambdaMaterializationJob",
    "LambdaMaterializationEngine",
]
82 changes: 82 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/app.py
@@ -0,0 +1,82 @@
import base64
import json
import sys
import tempfile
import traceback
from pathlib import Path

import pyarrow.parquet as pq

from feast import FeatureStore
from feast.constants import FEATURE_STORE_YAML_ENV_NAME
from feast.infra.materialization.local_engine import DEFAULT_BATCH_SIZE
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping


def handler(event, context):
"""Provide an event that contains the following keys:
- operation: one of the operations in the operations dict below
- tableName: required for operations that interact with DynamoDB
- payload: a parameter to pass to the operation being performed
"""
print("Received event: " + json.dumps(event, indent=2), flush=True)

try:

config_base64 = event[FEATURE_STORE_YAML_ENV_NAME]

config_bytes = base64.b64decode(config_base64)

# Create a new unique directory for writing feature_store.yaml
repo_path = Path(tempfile.mkdtemp())

with open(repo_path / "feature_store.yaml", "wb") as f:
f.write(config_bytes)

# Initialize the feature store
store = FeatureStore(repo_path=str(repo_path.resolve()))

view_name = event["view_name"]
view_type = event["view_type"]
path = event["path"]

bucket = path[len("s3://") :].split("/", 1)[0]
key = path[len("s3://") :].split("/", 1)[1]
print(f"Inferred Bucket: `{bucket}` Key: `{key}`", flush=True)

if view_type == "batch":
# TODO: This probably needs to be become `store.get_batch_feature_view` at some point.
feature_view = store.get_feature_view(view_name)
else:
feature_view = store.get_stream_feature_view(view_name)

print(f"Got Feature View: `{feature_view}`", flush=True)

table = pq.read_table(path)
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

written_rows = 0

for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(
batch, feature_view, join_key_to_value_type
)
store._provider.online_write_batch(
store.config, feature_view, rows_to_write, lambda x: None,
)
written_rows += len(rows_to_write)
return {"written_rows": written_rows}
except Exception as e:
print(f"Exception: {e}", flush=True)
print("Traceback:", flush=True)
print(traceback.format_exc(), flush=True)
sys.exit(1)
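
For reference, a minimal sketch of the event payload this handler expects, assembled the same way the engine sends it below; the feature view name and S3 path are hypothetical placeholders, and actually running the handler requires a deployed online store and a readable parquet file:

import base64
import json

from feast.constants import FEATURE_STORE_YAML_ENV_NAME

# Base64-encode the repo's feature_store.yaml, mirroring what the engine sends.
with open("feature_store.yaml", "rb") as f:
    config_base64 = base64.b64encode(f.read()).decode("utf-8")

event = {
    FEATURE_STORE_YAML_ENV_NAME: config_base64,
    "view_name": "driver_hourly_stats",  # hypothetical feature view name
    "view_type": "batch",
    "path": "s3://my-bucket/materialization/part-0000.parquet",  # placeholder path
}
print(json.dumps(event, indent=2))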
238 changes: 238 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/lambda_engine.py
@@ -0,0 +1,238 @@
import base64
import json
import logging
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union

import boto3
from pydantic import StrictStr
from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.constants import FEATURE_STORE_YAML_ENV_NAME
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.materialization.batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names
from feast.version import get_version

DEFAULT_BATCH_SIZE = 10_000

logger = logging.getLogger(__name__)


class LambdaMaterializationEngineConfig(FeastConfigBaseModel):
    """Batch Materialization Engine config for lambda based engine"""

    type: Literal["lambda"] = "lambda"
    """ Type selector"""

    materialization_image: StrictStr
    """ The URI of a container image in the Amazon ECR registry, which should be used for materialization. """

    lambda_role: StrictStr
    """ Role that should be used by the materialization lambda """


@dataclass
class LambdaMaterializationJob(MaterializationJob):
    def __init__(self, job_id: str, status: MaterializationJobStatus) -> None:
        super().__init__()
        self._job_id: str = job_id
        self._status = status
        self._error = None

    def status(self) -> MaterializationJobStatus:
        return self._status

    def error(self) -> Optional[BaseException]:
        return self._error

    def should_be_retried(self) -> bool:
        return False

    def job_id(self) -> str:
        return self._job_id

    def url(self) -> Optional[str]:
        return None


class LambdaMaterializationEngine(BatchMaterializationEngine):
    """
    WARNING: This engine should be considered "Alpha" functionality.
    """

    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        # This should be setting up the lambda function.
        r = self.lambda_client.create_function(
            FunctionName=self.lambda_name,
            PackageType="Image",
            Role=self.repo_config.batch_engine.lambda_role,
            Code={"ImageUri": self.repo_config.batch_engine.materialization_image},
            Timeout=600,
            Tags={
                "feast-owned": "True",
                "project": project,
                "feast-sdk-version": get_version(),
            },
        )
        logger.info("Creating lambda function %s, %s", self.lambda_name, r)

        logger.info("Waiting for function %s to be active", self.lambda_name)
        waiter = self.lambda_client.get_waiter("function_active")
        waiter.wait(FunctionName=self.lambda_name)

    def teardown_infra(
        self,
        project: str,
        fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
        entities: Sequence[Entity],
    ):
        # This should be tearing down the lambda function.
        logger.info("Tearing down lambda %s", self.lambda_name)
        r = self.lambda_client.delete_function(FunctionName=self.lambda_name)
        logger.info("Finished tearing down lambda %s: %s", self.lambda_name, r)

    def __init__(
        self,
        *,
        repo_config: RepoConfig,
        offline_store: OfflineStore,
        online_store: OnlineStore,
        **kwargs,
    ):
        super().__init__(
            repo_config=repo_config,
            offline_store=offline_store,
            online_store=online_store,
            **kwargs,
        )
        repo_path = self.repo_config.repo_path
        assert repo_path
        feature_store_path = repo_path / "feature_store.yaml"
        self.feature_store_base64 = str(
            base64.b64encode(bytes(feature_store_path.read_text(), "UTF-8")), "UTF-8"
        )

        self.lambda_name = f"feast-materialize-{self.repo_config.project}"
        if len(self.lambda_name) > 64:
            self.lambda_name = self.lambda_name[:64]
        self.lambda_client = boto3.client("lambda")

    def materialize(
        self, registry, tasks: List[MaterializationTask]
    ) -> List[MaterializationJob]:
        return [
            self._materialize_one(
                registry,
                task.feature_view,
                task.start_time,
                task.end_time,
                task.project,
                task.tqdm_builder,
            )
            for task in tasks
        ]

    def _materialize_one(
        self,
        registry: BaseRegistry,
        feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
        start_date: datetime,
        end_date: datetime,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ):
        entities = []
        for entity_name in feature_view.entities:
            entities.append(registry.get_entity(entity_name, project))

        (
            join_key_columns,
            feature_name_columns,
            timestamp_field,
            created_timestamp_column,
        ) = _get_column_names(feature_view, entities)

        job_id = f"{feature_view.name}-{start_date}-{end_date}"

        offline_job = self.offline_store.pull_latest_from_table_or_query(
            config=self.repo_config,
            data_source=feature_view.batch_source,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
        )

        paths = offline_job.to_remote_storage()
        max_workers = len(paths) if len(paths) <= 20 else 20
        executor = ThreadPoolExecutor(max_workers=max_workers)
        futures = []

        for path in paths:
            payload = {
                FEATURE_STORE_YAML_ENV_NAME: self.feature_store_base64,
                "view_name": feature_view.name,
                "view_type": "batch",
                "path": path,
            }
            # Invoke a lambda to materialize this file.

            logger.info("Invoking materialization for %s", path)
            futures.append(
                executor.submit(
                    self.lambda_client.invoke,
                    FunctionName=self.lambda_name,
                    InvocationType="RequestResponse",
                    Payload=json.dumps(payload),
                )
            )

        done, not_done = wait(futures)
        logger.info("Done: %s Not Done: %s", done, not_done)
        for f in done:
            response = f.result()
            output = json.loads(response["Payload"].read())

            logger.info(
                f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, "
                f"rows written: {output['written_rows']}"
            )

        for f in not_done:
            response = f.result()
            logger.error(f"Ingestion failed: {response}")

        return LambdaMaterializationJob(
            job_id=job_id,
            status=MaterializationJobStatus.SUCCEEDED
            if not not_done
            else MaterializationJobStatus.ERROR,
        )
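
A minimal sketch of constructing the engine config directly; because the new package directory is named lambda (a Python keyword), a plain import statement cannot reach it, so importlib is used here, and the image URI and role ARN are illustrative placeholders rather than values from this commit:

import importlib

# The module path contains the keyword "lambda", so import it dynamically.
lambda_engine = importlib.import_module(
    "feast.infra.materialization.lambda.lambda_engine"
)

config = lambda_engine.LambdaMaterializationEngineConfig(
    # Placeholder ECR image URI and IAM role ARN.
    materialization_image="<account>.dkr.ecr.<region>.amazonaws.com/feast-materializer:latest",
    lambda_role="arn:aws:iam::<account>:role/feast-materialization-role",
)
print(config.type)  # "lambda"

In practice these two settings would typically come from the batch_engine section of feature_store.yaml, which is how self.repo_config.batch_engine gets populated above.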
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/online_stores/dynamodb.py
@@ -229,7 +229,8 @@ def online_read(
                    break
                batch_entity_ids = {
                    table_instance.name: {
                        "Keys": [{"entity_id": entity_id} for entity_id in batch]
                        "Keys": [{"entity_id": entity_id} for entity_id in batch],
                        "ConsistentRead": True,
                    }
                }
                with tracing_span(name="remote_call"):
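
The DynamoDB change above asks for strongly consistent reads on the batch lookup. A hedged sketch of the resulting boto3 request shape, with a placeholder table name and entity ids:

import boto3

dynamodb = boto3.resource("dynamodb")

# Same request shape online_read builds, now with ConsistentRead enabled.
# Table name and entity ids are placeholders, not values from this commit.
response = dynamodb.batch_get_item(
    RequestItems={
        "my_project.driver_hourly_stats": {
            "Keys": [{"entity_id": "1001"}, {"entity_id": "1002"}],
            "ConsistentRead": True,
        }
    }
)
items = response["Responses"]["my_project.driver_hourly_stats"]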
