Unified Schema (#184)
* Unified Schema

This modifies merlin-models to be compatible with the new 'core' schema class that
will be shared with NVTabular

* Move to merlin.schema package instead of merlin.graph

* Update requirements to pull in merlin-core

* Remove loader.dispatch in favour of merlin.core.dispatch

* Add create_continuous_column

* remove merlin_standard_lib

* Update music_streaming notebook

* Update tests/tf/test_core.py
benfred committed Feb 18, 2022
1 parent 1249ac9 commit c7fad98
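
The change that repeats across these files is the schema import swap: `merlin_standard_lib` is gone, and the shared merlin-core package provides the `Schema` class and a `Tags` enum in its place. A minimal runnable sketch of the before/after pattern (the tiny hand-built schema is hypothetical; real schemas come from NVTabular or dataset metadata):

```python
from merlin.schema import ColumnSchema, Schema, Tags

# Hypothetical two-column schema standing in for real dataset metadata
schema = Schema(
    [
        ColumnSchema("user_id", tags=[Tags.USER, Tags.USER_ID]),
        ColumnSchema("item_id", tags=[Tags.ITEM, Tags.ITEM_ID]),
    ]
)

# Old (merlin_standard_lib): schema.select_by_tag(Tag.USER)
# New (merlin-core): the Tags enum replaces Tag
user_columns = schema.select_by_tag(Tags.USER)
print(user_columns.column_names)  # ["user_id"]
```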
Showing 83 changed files with 559 additions and 2,900 deletions.
3 changes: 1 addition & 2 deletions Makefile
@@ -33,8 +33,7 @@ clean:
rm -r docs dist build *.egg-info

docstrings:
-sphinx-apidoc -f -o docs/source/api merlin_models
-sphinx-apidoc -f -o docs/source/api/merlin_standard_lib merlin_standard_lib
+sphinx-apidoc -f -o docs/source/api models

docs:
cd docs && make html
14 changes: 7 additions & 7 deletions README.md
@@ -59,10 +59,10 @@ Low-level API:

```python
import merlin_models.tf as ml
-from merlin_standard_lib import Tag
+from merlin.schema import Tags

-user_tower = ml.InputBlock(schema.select_by_tag(Tag.USER), ml.MLPBlock([512, 256]))
-item_tower = ml.InputBlock(schema.select_by_tag(Tag.ITEM), ml.MLPBlock([512, 256]))
+user_tower = ml.InputBlock(schema.select_by_tag(Tags.USER), ml.MLPBlock([512, 256]))
+item_tower = ml.InputBlock(schema.select_by_tag(Tags.ITEM), ml.MLPBlock([512, 256]))
two_tower = ml.ParallelBlock({"user": user_tower, "item": item_tower})
model = two_tower.connect(ml.ItemRetrievalTask())
```
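
The connected two-tower model behaves like a regular Keras model from here on. A hypothetical training call, assuming `train_ds` is a batched `(features, targets)` dataset prepared elsewhere and that the retrieval task wires up its default loss:

```python
# Sketch only: `model` is the two-tower model built above.
model.compile(optimizer="adam")
model.fit(train_ds, epochs=1)
```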
@@ -83,9 +83,9 @@ High-level API:
import merlin_models.tf as ml

dlrm = ml.DLRMBlock(
    schema,
    embedding_dim=32,
    bottom_block=ml.MLPBlock([512, 128]),
    top_block=ml.MLPBlock([512, 128])
)
model = dlrm.connect(ml.BinaryClassificationTask(schema))
@@ -158,7 +158,7 @@ import merlin_models.tf as ml

inputs = ml.InputBlock(schema)
prediction_tasks = ml.PredictionTasks(schema)
block = ml.MLPBlock([64])
cgc = ml.CGCBlock(
    prediction_tasks, expert_block=ml.MLPBlock([64]), num_task_experts=2, num_shared_experts=2
)
1 change: 0 additions & 1 deletion docs/README.md
@@ -25,7 +25,6 @@ Refer to the following instructions to build the docs.

```sh
sphinx-apidoc -f -o source/api ../merlin_models
-sphinx-apidoc -f -o source/api/merlin_standard_lib ../merlin_standard_lib
```

4. Navigate to `models/docs/` and transform the documentation to HTML output:
172 changes: 50 additions & 122 deletions examples/music_streaming.ipynb

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions examples/music_streaming.py
@@ -1,9 +1,8 @@
# import tensorflow as tf
#
# import merlin_models.tf as ml
-# import merlin_standard_lib as msl
# from merlin_models.data import SyntheticDataset
-# from merlin_standard_lib import Schema, Tag
+# from merlin.schema import Schema, Tags
#
# # RETRIEVAL
#
@@ -15,7 +14,7 @@
#
#
# def build_youtube_dnn(schema: Schema, dims=(512, 256), num_sampled=50) -> ml.Model:
-# user_schema = schema.select_by_tag(Tag.USER)
+# user_schema = schema.select_by_tag(Tags.USER)
# dnn = ml.inputs(user_schema, post="continuous-powers").apply(ml.MLPBlock(dims))
# prediction_task = ml.SampledItemPredictionTask(schema, dim=dims[-1], num_sampled=num_sampled)
#
@@ -29,25 +28,25 @@
# return ml.TwoTowerBlock(schema, ml.MLPBlock(dims)).to_model(schema.select_by_name(target))
#
# def method_2() -> ml.Model:
-# user_tower = ml.inputs(schema.select_by_tag(Tag.USER), ml.MLPBlock([512, 256]))
-# item_tower = ml.inputs(schema.select_by_tag(Tag.ITEM), ml.MLPBlock([512, 256]))
+# user_tower = ml.inputs(schema.select_by_tag(Tags.USER), ml.MLPBlock([512, 256]))
+# item_tower = ml.inputs(schema.select_by_tag(Tags.ITEM), ml.MLPBlock([512, 256]))
# two_tower = ml.merge({"user": user_tower, "item": item_tower}, aggregation="cosine")
# model = two_tower.to_model(schema.select_by_name(target))
#
# return model
#
# def method_3() -> ml.Model:
# def routes_verbose(inputs, schema: Schema):
-# user_features = schema.select_by_tag(Tag.USER).filter_columns_from_dict(inputs)
-# item_features = schema.select_by_tag(Tag.ITEM).filter_columns_from_dict(inputs)
+# user_features = schema.select_by_tag(Tags.USER).filter_columns_from_dict(inputs)
+# item_features = schema.select_by_tag(Tags.ITEM).filter_columns_from_dict(inputs)
#
# user_tower = ml.MLPBlock(dims)(user_features)
# item_tower = ml.MLPBlock(dims)(item_features)
#
# return ml.ParallelBlock(dict(user=user_tower, item=item_tower), aggregation="cosine")
#
-# user_tower = ml.MLPBlock(dims, filter=Tag.USER).as_tabular("user")
-# item_tower = ml.MLPBlock(dims, filter=Tag.ITEM).as_tabular("item")
+# user_tower = ml.MLPBlock(dims, filter=Tags.USER).as_tabular("user")
+# item_tower = ml.MLPBlock(dims, filter=Tags.ITEM).as_tabular("item")
#
# two_tower = ml.inputs(schema).branch(user_tower, item_tower, aggregation="cosine")
# model = two_tower.to_model(schema.select_by_name(target))
@@ -139,7 +138,7 @@
# targets = {"item_id": data_df.pop("item_id")}
# else:
# targets = {}
-# for target in schema.select_by_tag(Tag.BINARY_CLASSIFICATION):
+# for target in schema.select_by_tag(Tags.BINARY_CLASSIFICATION):
# targets[target.name] = data_df.pop(target.name)
#
# dataset = tf.data.Dataset.from_tensor_slices((dict(data_df), targets))
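
The target-extraction loop above now walks tagged columns directly instead of string tag names. A self-contained sketch of the same pattern (the data frame and column names are hypothetical):

```python
import pandas as pd
from merlin.schema import ColumnSchema, Schema, Tags

# Hypothetical schema and interactions frame
schema = Schema([ColumnSchema("click", tags=[Tags.BINARY_CLASSIFICATION])])
data_df = pd.DataFrame({"click": [0, 1, 1], "item_id": [3, 7, 2]})

targets = {}
for target in schema.select_by_tag(Tags.BINARY_CLASSIFICATION):
    # Iterating a Schema yields ColumnSchema objects, so .name is available
    targets[target.name] = data_df.pop(target.name)
```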
4 changes: 2 additions & 2 deletions merlin_models/config/schema.py
@@ -15,7 +15,7 @@
#
from typing import Optional

-from merlin_standard_lib import Schema
+from merlin.schema import Schema, Tags


class SchemaMixin:
@@ -54,7 +54,7 @@ def _maybe_set_schema(self, input, schema):
input.set_schema(schema)

def get_item_ids_from_inputs(self, inputs):
-    return inputs[self.schema.item_id_column_name]
+    return inputs[self.schema.select_by_tag(Tags.ITEM_ID).first.name]

def get_padding_mask_from_item_id(self, inputs, pad_token=0):
item_id_inputs = self.get_item_ids_from_inputs(inputs)
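
The tag lookup replaces the old `item_id_column_name` helper from `merlin_standard_lib`. A small self-contained sketch of the new access pattern (the one-column schema is hypothetical):

```python
from merlin.schema import ColumnSchema, Schema, Tags

# Hypothetical schema with a single tagged item-id column
schema = Schema([ColumnSchema("item_id", tags=[Tags.ITEM_ID])])

# select_by_tag returns a sub-schema; .first yields its first
# ColumnSchema, whose name indexes the inputs dict at runtime
item_id_name = schema.select_by_tag(Tags.ITEM_ID).first.name
assert item_id_name == "item_id"
```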
Binary file modified merlin_models/data/music_streaming/data.parquet
73 changes: 42 additions & 31 deletions merlin_models/data/synthetic.py
@@ -21,10 +21,15 @@
from random import randint
from typing import Optional, Union

+import numpy as np
import pandas as pd
+from merlin.schema import Schema, Tags
+from merlin.schema.io.tensorflow_metadata import TensorflowMetadata

-from merlin_standard_lib import Schema, Tag
-from merlin_standard_lib.utils.proto_utils import has_field
+from merlin_models.utils.schema import (
+    schema_to_tensorflow_metadata_json,
+    tensorflow_metadata_json_to_schema,
+)

LOG = logging.getLogger("merlin-models")
HERE = pathlib.Path(__file__).parent
@@ -86,7 +91,7 @@ def from_schema(
output_dir = tempfile.mkdtemp()

if not os.path.exists(os.path.join(output_dir, "schema.json")):
-    schema.to_json(os.path.join(output_dir, "schema.json"))
+    schema_to_tensorflow_metadata_json(schema, os.path.join(output_dir, "schema.json"))

output = cls(output_dir, device=device)
output.generate_interactions(num_rows, min_session_length, max_session_length)
@@ -99,9 +104,9 @@ def read_schema(cls, path: Union[str, Path]) -> Schema:
os.path.join(str(path), "schema.json") if os.path.isdir(str(path)) else str(path)
)
if _schema_path.endswith(".pb") or _schema_path.endswith(".pbtxt"):
-    return Schema().from_proto_text(_schema_path)
+    TensorflowMetadata.from_from_proto_text(_schema_path).to_merlin_schema()

-    return Schema().from_json(_schema_path)
+    return tensorflow_metadata_json_to_schema(_schema_path)

@property
def schema(self) -> Schema:
@@ -164,7 +169,7 @@ def torch_dataloader(self, batch_size=50):
raise NotImplementedError()

def _pull_out_targets(self, inputs):
target_names = self.schema.select_by_tag("target").column_names
target_names = self.schema.select_by_tag(Tags.TARGET).column_names
targets = {}

for target_name in target_names:
@@ -185,9 +190,9 @@ def generate_user_item_interactions(
it supports the generation of conditional user, item and session features.
The schema should include a few tags:
-    - `Tag.SESSION_ID` to tag the session-id feature.
-    - `Tag.USER_ID` for user-id feature.
-    - `Tag.ITEM_ID` for item-id feature.
+    - `Tags.SESSION_ID` to tag the session-id feature.
+    - `Tags.USER_ID` for user-id feature.
+    - `Tags.ITEM_ID` for item-id feature.
It supports both GPU-based and CPU-based generation.
@@ -220,16 +225,16 @@
data = _frame.DataFrame()
processed_cols = []
# get session cols
-    session_id_col = schema.select_by_tag(Tag.SESSION_ID)
+    session_id_col = list(schema.select_by_tag(Tags.SESSION_ID))
if session_id_col:
-        session_id_col = session_id_col.feature[0]
+        session_id_col = session_id_col[0]
data[session_id_col.name] = _array.clip(
_array.random.lognormal(3.0, 1.0, num_interactions).astype(_array.int32),
1,
session_id_col.int_domain.max,
).astype(_array.int64)

-    features = schema.select_by_tag(Tag.SESSION).remove_by_tag(Tag.SESSION_ID).feature
+    features = list(schema.select_by_tag(Tags.SESSION).remove_by_tag(Tags.SESSION_ID))
data = generate_conditional_features(
data,
features,
@@ -239,16 +244,17 @@
device=device,
)
processed_cols += [f.name for f in features] + [session_id_col.name]

# get USER cols
-    user_id_col = schema.select_by_tag(Tag.USER_ID).feature
-    if user_id_col:
-        user_id_col = user_id_col[0]
+    user_id_cols = list(schema.select_by_tag(Tags.USER_ID))
+    if user_id_cols:
+        user_id_col = user_id_cols[0]
data[user_id_col.name] = _array.clip(
_array.random.lognormal(3.0, 1.0, num_interactions).astype(_array.int32),
1,
user_id_col.int_domain.max,
).astype(_array.int64)
-    features = schema.select_by_tag(Tag.USER).remove_by_tag(Tag.USER_ID).feature
+    features = list(schema.select_by_tag(Tags.USER).remove_by_tag(Tags.USER_ID))
data = generate_conditional_features(
data,
features,
@@ -260,8 +266,9 @@
processed_cols += [f.name for f in features] + [user_id_col.name]

# get ITEM cols
-    item_id_col = schema.select_by_tag(Tag.ITEM_ID).feature[0]
-    is_list_feature = has_field(item_id_col, "value_count")
+    item_id_col = list(schema.select_by_tag(Tags.ITEM_ID))[0]
+
+    is_list_feature = item_id_col.is_list
if not is_list_feature:
shape = num_interactions
else:
Expand All @@ -275,7 +282,7 @@ def generate_user_item_interactions(
.astype(_array.int64)
.tolist()
)
-    features = schema.select_by_tag(Tag.ITEM).remove_by_tag(Tag.ITEM_ID).feature
+    features = list(schema.select_by_tag(Tags.ITEM).remove_by_tag(Tags.ITEM_ID))
data = generate_conditional_features(
data,
features,
@@ -287,28 +294,32 @@
processed_cols += [f.name for f in features] + [item_id_col.name]

# Get remaining features
-    remaining = schema.remove_by_name(processed_cols)
+    remaining = schema.without(processed_cols)

-    for feature in remaining.select_by_tag(Tag.BINARY_CLASSIFICATION).feature:
+    for feature in remaining.select_by_tag(Tags.BINARY_CLASSIFICATION):
data[feature.name] = _array.random.randint(0, 2, num_interactions).astype(_array.int64)

-    for feature in remaining.remove_by_tag(Tag.BINARY_CLASSIFICATION).feature:
-        is_int_feature = has_field(feature, "int_domain")
-        is_list_feature = has_field(feature, "value_count")
+    for feature in remaining.remove_by_tag(Tags.BINARY_CLASSIFICATION):
+        is_int_feature = np.issubdtype(feature.dtype, np.integer)
+        is_list_feature = feature.is_list
if is_list_feature:
data[feature.name] = generate_random_list_feature(
feature, num_interactions, min_session_length, max_session_length, device
)

elif is_int_feature:
+            domain = feature.int_domain
+            min_value, max_value = (domain.min, domain.max) if domain else (0, 1)
+
            data[feature.name] = _array.random.randint(
-                1, feature.int_domain.max, num_interactions
+                min_value, max_value, num_interactions
            ).astype(_array.int64)

else:
-            data[feature.name] = _array.random.uniform(
-                feature.float_domain.min, feature.float_domain.max, num_interactions
-            )
+            domain = feature.float_domain
+            min_value, max_value = (domain.min, domain.max) if domain else (0.0, 1.0)
+
+            data[feature.name] = _array.random.uniform(min_value, max_value, num_interactions)

return data

@@ -333,8 +344,8 @@

num_interactions = data.shape[0]
for feature in features:
-        is_int_feature = has_field(feature, "int_domain")
-        is_list_feature = has_field(feature, "value_count")
+        is_int_feature = np.issubdtype(feature.dtype, np.integer)
+        is_list_feature = feature.is_list

if is_list_feature:
data[feature.name] = generate_random_list_feature(
@@ -368,7 +379,7 @@ def generate_random_list_feature(
else:
import cupy as _array

-    is_int_feature = has_field(feature, "int_domain")
+    is_int_feature = np.issubdtype(feature.dtype, np.integer)
if is_int_feature:
if max_session_length:
padded_array = []
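
The guarded `domain` lookups above replace the old proto-based `has_field` checks: when a column carries no explicit domain metadata, generation now falls back to a default range instead of raising. A minimal sketch of that pattern in isolation (the column and its `dtype` kwarg are hypothetical; the `(0, 1)` fallback mirrors the diff):

```python
import numpy as np
from merlin.schema import ColumnSchema

# Hypothetical integer column with no int_domain metadata attached
feature = ColumnSchema("play_count", dtype=np.int64)

is_int_feature = np.issubdtype(feature.dtype, np.integer)
domain = feature.int_domain  # None when the schema carries no domain info
min_value, max_value = (domain.min, domain.max) if domain else (0, 1)

if is_int_feature:
    values = np.random.randint(min_value, max_value, 10).astype(np.int64)
```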