Add Forest Inference Operator #118

Merged
merged 15 commits into from
Jun 14, 2022

Changes from 9 commits
180 changes: 175 additions & 5 deletions merlin/systems/dag/ops/fil.py
@@ -1,3 +1,18 @@
#
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import pathlib
import pickle
@@ -10,7 +25,136 @@
from merlin.dag import ColumnSelector # noqa
from merlin.schema import ColumnSchema, Schema # noqa
from merlin.systems.dag.ops.compat import cuml_ensemble, lightgbm, sklearn_ensemble, xgboost
from merlin.systems.dag.ops.operator import InferenceOperator
from merlin.systems.dag.ops.operator import (
InferenceDataFrame,
InferenceOperator,
PipelineableInferenceOperator,
)

try:
import triton_python_backend_utils as pb_utils
except ImportError:
pb_utils = None


class Forest(PipelineableInferenceOperator):
Contributor

This is less an issue with this PR and more an issue for the library generally, which I don't think should block this PR but is probably a worthwhile point of discussion on which I'm open to suggestions:

We should probably come up with a consistent naming scheme for the operators that make actual model predictions in separate back-ends (like FIL and Tensorflow). Originally I was thinking of making the convention FrameworkNamePredict, which I thought read pretty nicely in the operator graph definition. On second thought, I realized it might be nice for the prediction operators to be conceptually and alphabetically grouped together, so we changed the name to PredictTensorflow, which would imply this operator might be called PredictForest or PredictFIL. I'm not entirely happy with either convention though, so I'm open to suggestions for how we might do this in a consistent way.

Contributor


(Also, as a point of order for interpreting my PR comments: I mean this as literally as you can possibly take it, and I usually do. This is not a polite way of saying something else; I am not making a veiled suggestion for changing the code herein. I honestly don't know what to do here, and could be persuaded to go in any direction someone makes a halfway compelling case for. I just think we should probably, at some point, not necessarily now or in this PR, figure out how to name these operators consistently. I can't myself make a halfway compelling case for any particular convention, so I don't really have a suggestion here, just an issue to raise for your consideration and future pondering. Happy to address this someday, when we think of something smarter than I have so far.)

Member Author


For consistency with the current operators, I've updated to use the name PredictForest. There seems to be a similar pattern of <Type of Operator><Type of Data or Model> (QueryFaiss, QueryFeast, PredictTensorflow, FilterCandidates, UnrollFeatures, TransformWorkflow).
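
For illustration, here is how the two conventions read at a call site (hypothetical names and graph wiring, not code from this PR):

# FrameworkNamePredict convention:
#   predictions = features >> ForestPredict(model, input_schema)
# PredictFrameworkName convention (the one adopted here):
#   predictions = features >> PredictForest(model, input_schema)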

"""Operator for running Forest models.

Uses the Forest Inference Library (FIL) backend for inference.
"""

def __init__(self, model, input_schema, *, backend="python", **fil_params):
"""Instantiate a FIL inference operator.

Parameters
----------
model : Forest model instance
A trained forest model. Supports XGBoost, LightGBM, and Scikit-Learn models.
input_schema : merlin.schema.Schema
The schema representing the input columns expected by the model.
backend : str
The Triton backend to use when running this operator.
**fil_params
The parameters to pass to the FIL operator.
"""
if model is not None:
self.fil_op = FIL(model, **fil_params)
self.backend = backend
self.input_schema = input_schema
self._fil_model_name = None

def compute_output_schema(
self,
input_schema: Schema,
col_selector: ColumnSelector,
prev_output_schema: Schema = None,
) -> Schema:
"""Return the output schema representing the columns this operator returns."""
return self.fil_op.compute_output_schema(
input_schema, col_selector, prev_output_schema=prev_output_schema
)

def compute_input_schema(
self,
root_schema: Schema,
parents_schema: Schema,
deps_schema: Schema,
selector: ColumnSelector,
) -> Schema:
"""Return the input schema representing the input columns this operator expects to use."""
return self.input_schema

def export(self, path, input_schema, output_schema, params=None, node_id=None, version=1):
"""Export the class and related files to the path specified."""
fil_model_config = self.fil_op.export(
path,
input_schema,
output_schema,
params=params,
node_id=node_id,
version=version,
)
params = params or {}
params = {**params, "fil_model_name": fil_model_config.name}
return super().export(
path,
input_schema,
output_schema,
params=params,
node_id=node_id,
version=version,
backend=self.backend,
)

@classmethod
def from_config(cls, config: dict) -> "Forest":
"""Instantiate the class from a dictionary representation.

Expected structure:
{
"input_dict": str # JSON dict with input names and schemas
"params": str # JSON dict with params saved at export
}

"""
column_schemas = [
ColumnSchema(name, **schema_properties)
for name, schema_properties in json.loads(config["input_dict"]).items()
]
input_schema = Schema(column_schemas)
cls_instance = cls(None, input_schema)
params = json.loads(config["params"])
cls_instance.set_fil_model_name(params["fil_model_name"])
return cls_instance

@property
def fil_model_name(self):
return self._fil_model_name

def set_fil_model_name(self, fil_model_name):
self._fil_model_name = fil_model_name

def transform(self, df: InferenceDataFrame) -> InferenceDataFrame:
"""Transform the dataframe by applying this FIL operator to the set of input columns.

Parameters
----------
df: InferenceDataFrame
An InferenceDataFrame of input tensors that this operator will run inference on

Returns
-------
InferenceDataFrame
Returns a transformed dataframe for this operator"""
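# Stack the input columns into a (n_rows, n_features) float32 matrix and send it to the
# exported FIL model through Triton's BLS API (pb_utils.InferenceRequest).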
input0 = np.array([x.ravel() for x in df.tensors.values()]).astype(np.float32).T
inference_request = pb_utils.InferenceRequest(
model_name=self.fil_model_name,
requested_output_names=["output__0"],
inputs=[pb_utils.Tensor("input__0", input0)],
)
inference_response = inference_request.exec()
output0 = pb_utils.get_output_tensor_by_name(inference_response, "output__0")
return InferenceDataFrame({"output__0": output0})


class FIL(InferenceOperator):
@@ -32,6 +176,7 @@ def __init__(
threads_per_tree=1,
blocks_per_sm=0,
transfer_threshold=0,
instance_group="AUTO",
):
"""Instantiate a FIL inference operator.

@@ -88,6 +233,9 @@ def __init__(
to the GPU for processing) will provide optimal latency and throughput, but
for low-latency deployments with the use_experimental_optimizations flag set
to true, higher values may be desirable.
instance_group : str
One of "AUTO", "GPU", "CPU". Default value is "AUTO". Specifies whether
inference will take place on the GPU or CPU.
"""
self.max_batch_size = max_batch_size
self.parameters = dict(
Expand All @@ -98,6 +246,7 @@ def __init__(
blocks_per_sm=blocks_per_sm,
storage_type=storage_type,
threshold=threshold,
instance_group=instance_group,
)
self.fil_model = get_fil_model(model)
super().__init__()
@@ -121,7 +270,15 @@ def compute_output_schema(
"""Returns output schema for FIL op"""
return Schema([ColumnSchema("output__0", dtype=np.float32)])

def export(self, path, input_schema, output_schema, node_id=None, version=1):
def export(
self,
path,
input_schema,
output_schema,
params: dict = None,
node_id=None,
version=1,
):
"""Export the model to the supplied path. Returns the config"""
node_name = f"{node_id}_{self.export_name}" if node_id is not None else self.export_name
node_export_path = pathlib.Path(path) / node_name
@@ -391,6 +548,7 @@ def fil_config(
blocks_per_sm=0,
threads_per_tree=1,
transfer_threshold=0,
instance_group="AUTO",
) -> model_config.ModelConfig:
"""Construct and return a FIL ModelConfig protobuf object.

@@ -453,6 +611,9 @@ def fil_config(
to the GPU for processing) will provide optimal latency and throughput, but
for low-latency deployments with the use_experimental_optimizations flag set
to true, higher values may be desirable.
instance_group : str
One of "AUTO", "GPU", "CPU". Default value is "AUTO". Specifies whether
inference will take place on the GPU or CPU.

Returns
model_config.ModelConfig
@@ -485,6 +646,17 @@ def fil_config(
"transfer_threshold": f"{transfer_threshold:d}",
}

supported_instance_groups = {"auto", "cpu", "gpu"}
instance_group = instance_group.lower() if isinstance(instance_group, str) else instance_group
if instance_group == "auto":
instance_group_kind = model_config.ModelInstanceGroup.Kind.KIND_AUTO
elif instance_group == "cpu":
instance_group_kind = model_config.ModelInstanceGroup.Kind.KIND_CPU
elif instance_group == "gpu":
instance_group_kind = model_config.ModelInstanceGroup.Kind.KIND_GPU
else:
raise ValueError(f"instance_group must be one of {supported_instance_groups}")

config = model_config.ModelConfig(
name=name,
backend="fil",
Expand All @@ -501,9 +673,7 @@ def fil_config(
name="output__0", data_type=model_config.TYPE_FP32, dims=[output_dim]
)
],
instance_group=[
model_config.ModelInstanceGroup(kind=model_config.ModelInstanceGroup.Kind.KIND_AUTO)
],
instance_group=[model_config.ModelInstanceGroup(kind=instance_group_kind)],
)

for parameter_key, parameter_value in parameters.items():
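
For illustration only (not part of this diff), a minimal sketch of how the new operator might be constructed and exported, assuming a trained XGBoost model; the column names, export path, and hyperparameters are placeholders, and the class is named Forest as of this commit (PredictForest after the rename discussed above):

import numpy as np
import xgboost as xgb

from merlin.schema import ColumnSchema, Schema
from merlin.systems.dag.ops.fil import Forest

# Hypothetical two-feature training data and model.
X = np.random.rand(100, 2).astype(np.float32)
y = np.random.randint(2, size=100)
model = xgb.XGBClassifier(n_estimators=10).fit(X, y)

input_schema = Schema(
    [ColumnSchema("f0", dtype=np.float32), ColumnSchema("f1", dtype=np.float32)]
)
# The FIL op always emits a single float32 output column named "output__0".
output_schema = Schema([ColumnSchema("output__0", dtype=np.float32)])

# Extra keyword arguments such as instance_group are forwarded to the underlying FIL op.
op = Forest(model, input_schema, instance_group="GPU")
op.export("/tmp/models", input_schema, output_schema, node_id=0)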
3 changes: 2 additions & 1 deletion merlin/systems/dag/ops/operator.py
@@ -166,6 +166,7 @@ def export(
params: dict = None,
node_id: int = None,
version: int = 1,
backend: str = "python",
):
"""
Export the class object as a config and all related files to the user-defined path.
@@ -200,7 +201,7 @@
node_export_path = pathlib.Path(path) / node_name
node_export_path.mkdir(parents=True, exist_ok=True)

config = model_config.ModelConfig(name=node_name, backend="python", platform="op_runner")
config = model_config.ModelConfig(name=node_name, backend=backend, platform="op_runner")

config.parameters["operator_names"].string_value = json.dumps([node_name])

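
A hypothetical sketch of how an operator can use the new backend argument (the class name, schemas, and path are placeholders; this mirrors what Forest.export() does in fil.py above):

from merlin.systems.dag.ops.operator import PipelineableInferenceOperator

class MyOperator(PipelineableInferenceOperator):
    def export(self, path, input_schema, output_schema, params=None, node_id=None, version=1):
        # Forward a specific Triton backend into the generated model config;
        # the default remains "python".
        return super().export(
            path,
            input_schema,
            output_schema,
            params=params,
            node_id=node_id,
            version=version,
            backend="python",
        )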
52 changes: 45 additions & 7 deletions merlin/systems/triton/oprunner_model.py
@@ -42,11 +42,48 @@


class TritonPythonModel:
"""Model for Triton Python Backend.

Every Python model must have "TritonPythonModel" as the class name
"""

def initialize(self, args):
"""Called only once when the model is being loaded. Allowing
the model to initialize any state associated with this model.

Parameters
----------
args : dict
Both keys and values are strings. The dictionary keys and values are:
* model_config: A JSON string containing the model configuration
* model_instance_kind: A string containing model instance kind
* model_instance_device_id: A string containing model instance device ID
* model_repository: Model repository path
* model_version: Model version
* model_name: Model name
"""
self.model_config = json.loads(args["model_config"])
self.runner = OperatorRunner(self.model_config)

def execute(self, requests: List[InferenceRequest]) -> List[InferenceResponse]:
"""Receives a list of pb_utils.InferenceRequest as the only argument. This
function is called when an inference is requested for this model. Depending on the
batching configuration (e.g. Dynamic Batching) used, `requests` may contain
multiple requests. Every Python model must create one pb_utils.InferenceResponse
for every pb_utils.InferenceRequest in `requests`. If there is an error, you can
set the error argument when creating a pb_utils.InferenceResponse.

Parameters
----------
requests : list
A list of pb_utils.InferenceRequest

Returns
-------
list
A list of pb_utils.InferenceResponse. The length of this list must
be the same as `requests`
"""
params = self.model_config["parameters"]
op_names = json.loads(params["operator_names"]["string_value"])
first_operator_name = op_names[0]
@@ -67,14 +104,15 @@ def execute(self, requests: List[InferenceRequest]) -> List[InferenceResponse]:

raw_tensor_tuples = self.runner.execute(inf_df)

tensors = {
name: (data.get() if hasattr(data, "get") else data)
for name, data in raw_tensor_tuples
}

result = [Tensor(name, data) for name, data in tensors.items()]
output_tensors = []
for name, data in raw_tensor_tuples:
    # Tensors returned by the operator pass through unchanged; device arrays are
    # moved to host (via .get()) and wrapped in a Triton Tensor.
    if isinstance(data, Tensor):
        output_tensors.append(data)
        continue
    data = data.get() if hasattr(data, "get") else data
    tensor = Tensor(name, data)
    output_tensors.append(tensor)

responses.append(InferenceResponse(result))
responses.append(InferenceResponse(output_tensors))

except Exception: # pylint: disable=broad-except
exc_type, exc_value, exc_traceback = sys.exc_info()
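
For context (not part of this diff), a hedged sketch of how a client might query the exported model once the ensemble is deployed to Triton; the URL, model name, and column names are assumptions:

import numpy as np
import tritonclient.http as triton_http

client = triton_http.InferenceServerClient(url="localhost:8000")

# Hypothetical feature columns matching the operator's input schema.
inputs = []
for name in ["f0", "f1"]:
    data = np.random.rand(4, 1).astype(np.float32)
    tensor = triton_http.InferInput(name, list(data.shape), "FP32")
    tensor.set_data_from_numpy(data)
    inputs.append(tensor)

response = client.infer(
    "ensemble_model",  # assumed name of the exported ensemble
    inputs=inputs,
    outputs=[triton_http.InferRequestedOutput("output__0")],
)
print(response.as_numpy("output__0"))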