Unified Schema #184

Merged: 15 commits, Feb 18, 2022
Makefile (3 changes: 1 addition & 2 deletions)

@@ -33,8 +33,7 @@ clean:
 	rm -r docs dist build *.egg-info

 docstrings:
-	sphinx-apidoc -f -o docs/source/api merlin_models
-	sphinx-apidoc -f -o docs/source/api/merlin_standard_lib merlin_standard_lib
+	sphinx-apidoc -f -o docs/source/api models

 docs:
 	cd docs && make html
README.md (14 changes: 7 additions & 7 deletions)

@@ -59,10 +59,10 @@ Low-level API:

 ```python
 import merlin_models.tf as ml
-from merlin_standard_lib import Tag
+from merlin.schema import Tags

-user_tower = ml.InputBlock(schema.select_by_tag(Tag.USER), ml.MLPBlock([512, 256]))
-item_tower = ml.InputBlock(schema.select_by_tag(Tag.ITEM), ml.MLPBlock([512, 256]))
+user_tower = ml.InputBlock(schema.select_by_tag(Tags.USER), ml.MLPBlock([512, 256]))
+item_tower = ml.InputBlock(schema.select_by_tag(Tags.ITEM), ml.MLPBlock([512, 256]))
 two_tower = ml.ParallelBlock({"user": user_tower, "item": item_tower})
 model = two_tower.connect(ml.ItemRetrievalTask())
 ```
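For orientation, a minimal sketch of how the new `merlin.schema` `Tags` enum drives column selection; the `schema` object and the example column names here are assumptions for illustration, not part of this diff:

```python
from merlin.schema import Tags

# Columns in a merlin.schema.Schema carry tags; select_by_tag returns the
# sub-schema whose columns match the given Tags member.
user_cols = schema.select_by_tag(Tags.USER).column_names  # e.g. ["user_id", "user_age"]
item_cols = schema.select_by_tag(Tags.ITEM).column_names  # e.g. ["item_id", "item_genre"]
```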
@@ -83,9 +83,9 @@ High-level API:

 import merlin_models.tf as ml

 dlrm = ml.DLRMBlock(
     schema,
     embedding_dim=32,
     bottom_block=ml.MLPBlock([512, 128]),
     top_block=ml.MLPBlock([512, 128])
 )
 model = dlrm.connect(ml.BinaryClassificationTask(schema))

@@ -158,7 +158,7 @@
 import merlin_models.tf as ml

 inputs = ml.InputBlock(schema)
 prediction_tasks = ml.PredictionTasks(schema)
 block = ml.MLPBlock([64])
 cgc = ml.CGCBlock(
     prediction_tasks, expert_block=ml.MLPBlock([64]), num_task_experts=2, num_shared_experts=2
 )
docs/README.md (1 change: 0 additions & 1 deletion)

@@ -25,7 +25,6 @@ Refer to the following instructions to build the docs.

 ```sh
 sphinx-apidoc -f -o source/api ../merlin_models
-sphinx-apidoc -f -o source/api/merlin_standard_lib ../merlin_standard_lib
 ```

4. Navigate to `models/docs/` and transform the documentation to HTML output:
examples/music_streaming.ipynb (172 changes: 50 additions & 122 deletions)

Large diffs are not rendered by default.
examples/music_streaming.py (19 changes: 9 additions & 10 deletions)

@@ -1,9 +1,8 @@
 # import tensorflow as tf
 #
 # import merlin_models.tf as ml
-# import merlin_standard_lib as msl
 # from merlin_models.data import SyntheticDataset
-# from merlin_standard_lib import Schema, Tag
+# from merlin.schema import Schema, Tags
 #
 # # RETRIEVAL
 #
@@ -15,7 +14,7 @@
 #
 #
 # def build_youtube_dnn(schema: Schema, dims=(512, 256), num_sampled=50) -> ml.Model:
-#     user_schema = schema.select_by_tag(Tag.USER)
+#     user_schema = schema.select_by_tag(Tags.USER)
 #     dnn = ml.inputs(user_schema, post="continuous-powers").apply(ml.MLPBlock(dims))
 #     prediction_task = ml.SampledItemPredictionTask(schema, dim=dims[-1], num_sampled=num_sampled)
 #
@@ -29,25 +28,25 @@
 #     return ml.TwoTowerBlock(schema, ml.MLPBlock(dims)).to_model(schema.select_by_name(target))
 #
 # def method_2() -> ml.Model:
-#     user_tower = ml.inputs(schema.select_by_tag(Tag.USER), ml.MLPBlock([512, 256]))
-#     item_tower = ml.inputs(schema.select_by_tag(Tag.ITEM), ml.MLPBlock([512, 256]))
+#     user_tower = ml.inputs(schema.select_by_tag(Tags.USER), ml.MLPBlock([512, 256]))
+#     item_tower = ml.inputs(schema.select_by_tag(Tags.ITEM), ml.MLPBlock([512, 256]))
 #     two_tower = ml.merge({"user": user_tower, "item": item_tower}, aggregation="cosine")
 #     model = two_tower.to_model(schema.select_by_name(target))
 #
 #     return model
 #
 # def method_3() -> ml.Model:
 #     def routes_verbose(inputs, schema: Schema):
-#         user_features = schema.select_by_tag(Tag.USER).filter_columns_from_dict(inputs)
-#         item_features = schema.select_by_tag(Tag.ITEM).filter_columns_from_dict(inputs)
+#         user_features = schema.select_by_tag(Tags.USER).filter_columns_from_dict(inputs)
+#         item_features = schema.select_by_tag(Tags.ITEM).filter_columns_from_dict(inputs)
 #
 #         user_tower = ml.MLPBlock(dims)(user_features)
 #         item_tower = ml.MLPBlock(dims)(item_features)
 #
 #         return ml.ParallelBlock(dict(user=user_tower, item=item_tower), aggregation="cosine")
 #
-#     user_tower = ml.MLPBlock(dims, filter=Tag.USER).as_tabular("user")
-#     item_tower = ml.MLPBlock(dims, filter=Tag.ITEM).as_tabular("item")
+#     user_tower = ml.MLPBlock(dims, filter=Tags.USER).as_tabular("user")
+#     item_tower = ml.MLPBlock(dims, filter=Tags.ITEM).as_tabular("item")
 #
 #     two_tower = ml.inputs(schema).branch(user_tower, item_tower, aggregation="cosine")
 #     model = two_tower.to_model(schema.select_by_name(target))
@@ -139,7 +138,7 @@
 #         targets = {"item_id": data_df.pop("item_id")}
 #     else:
 #         targets = {}
-#     for target in schema.select_by_tag(Tag.BINARY_CLASSIFICATION):
+#     for target in schema.select_by_tag(Tags.BINARY_CLASSIFICATION):
 #         targets[target.name] = data_df.pop(target.name)
 #
 #     dataset = tf.data.Dataset.from_tensor_slices((dict(data_df), targets))
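The commented-out block above pops target columns off a dataframe and feeds the remainder to tf.data. A standalone sketch of that pattern, with toy column names and only pandas and TensorFlow assumed:

```python
import pandas as pd
import tensorflow as tf

# Toy interaction frame; "click" stands in for a binary-classification target.
data_df = pd.DataFrame({"item_id": [1, 2, 3], "user_age": [25, 31, 40], "click": [0, 1, 1]})

# Split features and targets, mirroring the commented example above.
targets = {"click": data_df.pop("click")}
dataset = tf.data.Dataset.from_tensor_slices((dict(data_df), targets))

for features, labels in dataset.batch(2):
    print(features["item_id"], labels["click"])
```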
merlin_models/config/schema.py (4 changes: 2 additions & 2 deletions)

@@ -15,7 +15,7 @@
 #
 from typing import Optional

-from merlin_standard_lib import Schema
+from merlin.schema import Schema, Tags


 class SchemaMixin:
@@ -54,7 +54,7 @@ def _maybe_set_schema(self, input, schema):
         input.set_schema(schema)

     def get_item_ids_from_inputs(self, inputs):
-        return inputs[self.schema.item_id_column_name]
+        return inputs[self.schema.select_by_tag(Tags.ITEM_ID).first.name]

     def get_padding_mask_from_item_id(self, inputs, pad_token=0):
         item_id_inputs = self.get_item_ids_from_inputs(inputs)
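The change above replaces merlin_standard_lib's item_id_column_name property with tag-based selection. A small sketch of the new lookup; the single-column schema here is a made-up example:

```python
from merlin.schema import ColumnSchema, Schema, Tags

schema = Schema([ColumnSchema("item_id", tags=[Tags.ITEM, Tags.ITEM_ID])])

# Old (merlin_standard_lib): schema.item_id_column_name -> "item_id"
# New (merlin.schema): select by tag, then take the first matching column.
item_id_name = schema.select_by_tag(Tags.ITEM_ID).first.name  # "item_id"
```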
Binary file modified merlin_models/data/music_streaming/data.parquet
Binary file not shown.
merlin_models/data/synthetic.py (73 changes: 42 additions & 31 deletions)

@@ -21,10 +21,15 @@
 from random import randint
 from typing import Optional, Union

+import numpy as np
+import pandas as pd
+from merlin.schema import Schema, Tags
+from merlin.schema.io.tensorflow_metadata import TensorflowMetadata

-from merlin_standard_lib import Schema, Tag
-from merlin_standard_lib.utils.proto_utils import has_field
+from merlin_models.utils.schema import (
+    schema_to_tensorflow_metadata_json,
+    tensorflow_metadata_json_to_schema,
+)

 LOG = logging.getLogger("merlin-models")
 HERE = pathlib.Path(__file__).parent
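The two helpers imported from merlin_models.utils.schema are not shown in this diff. A plausible sketch of what they could wrap, assuming TensorflowMetadata exposes from_merlin_schema/to_merlin_schema and a JSON round trip as the names suggest:

```python
from merlin.schema import Schema
from merlin.schema.io.tensorflow_metadata import TensorflowMetadata

def schema_to_tensorflow_metadata_json(schema: Schema, path: str) -> None:
    # Convert the Merlin schema to TF-metadata form, then write it out as JSON.
    json_str = TensorflowMetadata.from_merlin_schema(schema).to_json()
    with open(path, "w") as f:
        f.write(json_str)

def tensorflow_metadata_json_to_schema(path: str) -> Schema:
    # Inverse: parse TF-metadata JSON and convert back to a Merlin schema.
    with open(path) as f:
        return TensorflowMetadata.from_json(f.read()).to_merlin_schema()
```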
@@ -86,7 +91,7 @@ def from_schema(
         output_dir = tempfile.mkdtemp()

         if not os.path.exists(os.path.join(output_dir, "schema.json")):
-            schema.to_json(os.path.join(output_dir, "schema.json"))
+            schema_to_tensorflow_metadata_json(schema, os.path.join(output_dir, "schema.json"))

         output = cls(output_dir, device=device)
         output.generate_interactions(num_rows, min_session_length, max_session_length)
@@ -99,9 +104,9 @@ def read_schema(cls, path: Union[str, Path]) -> Schema:
         os.path.join(str(path), "schema.json") if os.path.isdir(str(path)) else str(path)
     )
     if _schema_path.endswith(".pb") or _schema_path.endswith(".pbtxt"):
-        return Schema().from_proto_text(_schema_path)
+        TensorflowMetadata.from_from_proto_text(_schema_path).to_merlin_schema()

Review comment (Contributor): I think this should be from_proto_text_file? Also not sure this works with the way that method is written, since I think the current implementation wants the directory and assumes the file is called schema.pbtxt unless you pass a different value of the filename param. Now wondering if this is covered by tests too...

-    return Schema().from_json(_schema_path)
+    return tensorflow_metadata_json_to_schema(_schema_path)
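Following up on the review comment, a hedged sketch of the proto-text branch using from_proto_text_file, which per the comment takes a directory plus an optional file name rather than a full path; the exact signature is an assumption here:

```python
import os

from merlin.schema import Schema
from merlin.schema.io.tensorflow_metadata import TensorflowMetadata

def read_schema_sketch(path: str) -> Schema:
    if path.endswith((".pb", ".pbtxt")):
        # Split the path, since the loader reportedly wants the directory
        # and defaults the file name to "schema.pbtxt".
        dirname, fname = os.path.split(path)
        return TensorflowMetadata.from_proto_text_file(dirname, fname).to_merlin_schema()
    return tensorflow_metadata_json_to_schema(path)  # JSON fallback, as in the diff
```

(tensorflow_metadata_json_to_schema is the helper imported at the top of this file.)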

     @property
     def schema(self) -> Schema:
@@ -164,7 +169,7 @@ def torch_dataloader(self, batch_size=50):
         raise NotImplementedError()

     def _pull_out_targets(self, inputs):
-        target_names = self.schema.select_by_tag("target").column_names
+        target_names = self.schema.select_by_tag(Tags.TARGET).column_names
         targets = {}

         for target_name in target_names:
@@ -185,9 +190,9 @@ def generate_user_item_interactions(
     it supports the generation of conditional user, item and session features.

     The schema should include a few tags:
-    - `Tag.SESSION_ID` to tag the session-id feature.
-    - `Tag.USER_ID` for user-id feature.
-    - `Tag.ITEM_ID` for item-id feature.
+    - `Tags.SESSION_ID` to tag the session-id feature.
+    - `Tags.USER_ID` for user-id feature.
+    - `Tags.ITEM_ID` for item-id feature.

     It supports both GPU-based and CPU-based generation.

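A hedged example of a schema satisfying these tag requirements; the column names, domains, and the properties layout used for int_domain are illustrative assumptions:

```python
from merlin.schema import ColumnSchema, Schema, Tags

schema = Schema(
    [
        ColumnSchema("session_id", tags=[Tags.SESSION_ID],
                     properties={"domain": {"min": 1, "max": 5000}}),
        ColumnSchema("user_id", tags=[Tags.USER_ID],
                     properties={"domain": {"min": 1, "max": 1000}}),
        ColumnSchema("item_id", tags=[Tags.ITEM_ID],
                     properties={"domain": {"min": 1, "max": 2000}}),
    ]
)
# The generator below clips sampled IDs to each column's int_domain.max.
```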
@@ -220,16 +225,16 @@
     data = _frame.DataFrame()
     processed_cols = []
     # get session cols
-    session_id_col = schema.select_by_tag(Tag.SESSION_ID)
+    session_id_col = list(schema.select_by_tag(Tags.SESSION_ID))
     if session_id_col:
-        session_id_col = session_id_col.feature[0]
+        session_id_col = session_id_col[0]
         data[session_id_col.name] = _array.clip(
             _array.random.lognormal(3.0, 1.0, num_interactions).astype(_array.int32),
             1,
             session_id_col.int_domain.max,
         ).astype(_array.int64)

-    features = schema.select_by_tag(Tag.SESSION).remove_by_tag(Tag.SESSION_ID).feature
+    features = list(schema.select_by_tag(Tags.SESSION).remove_by_tag(Tags.SESSION_ID))
     data = generate_conditional_features(
         data,
         features,
@@ -239,16 +244,17 @@
         device=device,
     )
     processed_cols += [f.name for f in features] + [session_id_col.name]

     # get USER cols
-    user_id_col = schema.select_by_tag(Tag.USER_ID).feature
-    if user_id_col:
-        user_id_col = user_id_col[0]
+    user_id_cols = list(schema.select_by_tag(Tags.USER_ID))
+    if user_id_cols:
+        user_id_col = user_id_cols[0]
         data[user_id_col.name] = _array.clip(
             _array.random.lognormal(3.0, 1.0, num_interactions).astype(_array.int32),
             1,
             user_id_col.int_domain.max,
         ).astype(_array.int64)
-    features = schema.select_by_tag(Tag.USER).remove_by_tag(Tag.USER_ID).feature
+    features = list(schema.select_by_tag(Tags.USER).remove_by_tag(Tags.USER_ID))
     data = generate_conditional_features(
         data,
         features,
@@ -260,8 +266,9 @@
     processed_cols += [f.name for f in features] + [user_id_col.name]

     # get ITEM cols
-    item_id_col = schema.select_by_tag(Tag.ITEM_ID).feature[0]
-    is_list_feature = has_field(item_id_col, "value_count")
+    item_id_col = list(schema.select_by_tag(Tags.ITEM_ID))[0]
+
+    is_list_feature = item_id_col.is_list
     if not is_list_feature:
         shape = num_interactions
     else:
@@ -275,7 +282,7 @@
             .astype(_array.int64)
             .tolist()
         )
-    features = schema.select_by_tag(Tag.ITEM).remove_by_tag(Tag.ITEM_ID).feature
+    features = list(schema.select_by_tag(Tags.ITEM).remove_by_tag(Tags.ITEM_ID))
     data = generate_conditional_features(
         data,
         features,
@@ -287,28 +294,32 @@
     processed_cols += [f.name for f in features] + [item_id_col.name]

     # Get remaining features
-    remaining = schema.remove_by_name(processed_cols)
+    remaining = schema.without(processed_cols)

-    for feature in remaining.select_by_tag(Tag.BINARY_CLASSIFICATION).feature:
+    for feature in remaining.select_by_tag(Tags.BINARY_CLASSIFICATION):
         data[feature.name] = _array.random.randint(0, 2, num_interactions).astype(_array.int64)

-    for feature in remaining.remove_by_tag(Tag.BINARY_CLASSIFICATION).feature:
-        is_int_feature = has_field(feature, "int_domain")
-        is_list_feature = has_field(feature, "value_count")
+    for feature in remaining.remove_by_tag(Tags.BINARY_CLASSIFICATION):
+        is_int_feature = np.issubdtype(feature.dtype, np.integer)
+        is_list_feature = feature.is_list
         if is_list_feature:
             data[feature.name] = generate_random_list_feature(
                 feature, num_interactions, min_session_length, max_session_length, device
             )

         elif is_int_feature:
+            domain = feature.int_domain
+            min_value, max_value = (domain.min, domain.max) if domain else (0, 1)
+
             data[feature.name] = _array.random.randint(
-                1, feature.int_domain.max, num_interactions
+                min_value, max_value, num_interactions
             ).astype(_array.int64)

         else:
-            data[feature.name] = _array.random.uniform(
-                feature.float_domain.min, feature.float_domain.max, num_interactions
-            )
+            domain = feature.float_domain
+            min_value, max_value = (domain.min, domain.max) if domain else (0.0, 1.0)
+
+            data[feature.name] = _array.random.uniform(min_value, max_value, num_interactions)

     return data

@@ -333,8 +344,8 @@ def generate_conditional_features(

     num_interactions = data.shape[0]
     for feature in features:
-        is_int_feature = has_field(feature, "int_domain")
-        is_list_feature = has_field(feature, "value_count")
+        is_int_feature = np.issubdtype(feature.dtype, np.integer)
+        is_list_feature = feature.is_list

         if is_list_feature:
             data[feature.name] = generate_random_list_feature(
@@ -368,7 +379,7 @@ def generate_random_list_feature(
     else:
         import cupy as _array

-    is_int_feature = has_field(feature, "int_domain")
+    is_int_feature = np.issubdtype(feature.dtype, np.integer)
     if is_int_feature:
         if max_session_length:
             padded_array = []
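Throughout this file, protobuf-field checks via has_field are replaced by ColumnSchema.is_list for list-ness and NumPy dtype introspection for integer-ness. A quick self-contained demonstration of the dtype check used above:

```python
import numpy as np

# np.issubdtype matches any integer width against the abstract np.integer type.
print(np.issubdtype(np.dtype("int32"), np.integer))    # True
print(np.issubdtype(np.dtype("int64"), np.integer))    # True
print(np.issubdtype(np.dtype("float32"), np.integer))  # False
```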