# Pseudo nvTabular
What follows is a demonstration of a lightweight re-imagining of the nvTabular library, meant to show some potential new conceptual relationships between its objects. In doing so, I've tried to keep the code as simple as possible by following a few guiding principles:
- writing Python code is preferable to writing config files
- an extra function call is preferable to a shim
- explicit, and possibly longer, method and attribute names are preferable to shorthand
- stateless objects are preferable to stateful ones
    - as a corollary, use properties instead of stored attributes where you can
- classes that inherit from a `namedtuple` are preferable to `__init__` methods that just save their initialization arguments
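To make the last two principles concrete, here is a small sketch contrasting the two styles. `Scale` and `ScaleVerbose` are hypothetical names used only for illustration and are not part of nvTabular.

```python
from collections import namedtuple


# Discouraged: an __init__ whose only job is to store its arguments.
class ScaleVerbose:
    def __init__(self, columns, factor):
        self.columns = columns
        self.factor = factor


# Preferred: inherit from a namedtuple, which generates __init__, __repr__
# and __eq__, and makes the fields read-only.
class Scale(namedtuple("Scale", ["columns", "factor"])):
    __slots__ = ()  # no per-instance __dict__, so no stray attributes either

    @property
    def name(self):
        # Derived on demand instead of stored, so it can never go stale.
        return "scale_" + "_".join(self.columns)


op = Scale(columns=["user_age"], factor=2.0)
print(op)       # Scale(columns=['user_age'], factor=2.0)
print(op.name)  # scale_user_age
bigger = op._replace(factor=3.0)  # "modifying" means making a new copy
```

Because the fields are read-only, any change produces a new object, which is what keeps ops like this stateless.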

In [1]:
import nv_tabular as nvt

# Dataset Schema
We'll use an imaginary dataset I made to demonstrate the principles at work here. Note that it has two target labels, a common setup in recommendation systems for multi-task models like MMoE. Here we'll define the dataset **schema**: a high-level description of the features the dataset uses and their types (continuous or categorical), along with its labels. This schema is conceptually distinct from any particular realization of it, e.g. the one found in `this_data.csv`.

In [2]:
CATEGORICAL_COLUMNS = ['uid', 'iid', 'location', 'has_interacted_before']
CONTINUOUS_COLUMNS = ['timestamp', 'user_age', 'item_average_rating']
LABEL_COLUMNS = ['click', 'purchase']

# `Workflow`
A `Workflow` represents this schema together with a chain of transformations, or `Op`s, performed on it. Just as the schema is not tied to any particular realization of itself, neither are the transformations applied to it: the same `Workflow` should apply just as easily to one realization as to another, provided both conform to the schema.

By default, a `Workflow` is instantiated with no `Op`s, which for the sake of generality we can think of as a pipeline holding a single identity operation. There are a few ways to add `Op`s to a workflow, shown below.
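A minimal sketch of the identity idea, treating a pipeline as left-to-right function composition over a batch stored as a dict of column lists. `compose` and `double_age` are hypothetical helpers, not the library's API.

```python
from functools import reduce


def compose(ops):
    """Return one function that applies `ops` to a batch, left to right.

    With an empty list, reduce() just hands back its initial value, so an
    empty pipeline behaves exactly like the identity operation.
    """
    return lambda batch: reduce(lambda data, op: op(data), ops, batch)


def double_age(batch):
    # Hypothetical op: builds a new dict instead of mutating its input.
    return {**batch, "user_age": [2 * age for age in batch["user_age"]]}


batch = {"user_age": [19, 39]}
print(compose([])(batch))            # {'user_age': [19, 39]}
print(compose([double_age])(batch))  # {'user_age': [38, 78]}
```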

In [3]:
workflow = nvt.ops.Workflow(
    cat_names=CATEGORICAL_COLUMNS,
    cont_names=CONTINUOUS_COLUMNS,
    label_names=LABEL_COLUMNS,
    ops=[] # initialize with no ops
)

`workflow` now represents a `Workflow` acting on the schema given by the designated columns. It can be applied to a `Dataset` that has _more_ columns than these (the extras are dropped), but every `Dataset` it is applied to must contain all of them. In other words, the `Dataset` schema must be a **superset** of the `Workflow` schema; a **subset** will not do.
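The superset rule boils down to a set comparison. A minimal sketch, assuming a dataset is described only by its list of column names; `columns_to_drop` is a hypothetical helper:

```python
CATEGORICAL_COLUMNS = ['uid', 'iid', 'location', 'has_interacted_before']
CONTINUOUS_COLUMNS = ['timestamp', 'user_age', 'item_average_rating']
LABEL_COLUMNS = ['click', 'purchase']
WORKFLOW_SCHEMA = CATEGORICAL_COLUMNS + CONTINUOUS_COLUMNS + LABEL_COLUMNS


def columns_to_drop(dataset_columns, schema=WORKFLOW_SCHEMA):
    """Return the extra dataset columns, or raise if any schema column is missing."""
    missing = set(schema) - set(dataset_columns)
    if missing:
        # The dataset schema is not a superset of the workflow schema.
        raise ValueError(f"dataset is missing columns: {sorted(missing)}")
    return [name for name in dataset_columns if name not in schema]


# The header of this_data.csv carries one extra column, which gets dropped.
print(columns_to_drop(['Unnamed: 0'] + WORKFLOW_SCHEMA))  # ['Unnamed: 0']
```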

# Ops
`Op`s are stateless representations of transforms applied to the schema in a `Workflow`. They can be instantiated by specifying the columns they act on, or left generic so that they apply to an entire category of variable (continuous or categorical) in *any* `Workflow` they are applied to. In the generic case, what gets added to the `Workflow` is not the `Op` itself, but a copy of the `Op` with `columns` set to the corresponding group of columns in that `Workflow`.

Alternatively, we can pass `columns` a function that takes a column name and returns a boolean indicating whether the `Op` should apply to that column.
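One way the three forms of `columns` might be resolved against a group of schema columns, as a sketch; `resolve_columns` is a hypothetical helper, and the library may well do this differently:

```python
def resolve_columns(columns, group_columns):
    """Turn an op's `columns` setting into a concrete list of names.

    `columns` may be None (use the whole group), an explicit list, or a
    predicate that takes a column name and returns a bool.
    """
    if columns is None:
        return list(group_columns)
    if callable(columns):
        return [name for name in group_columns if columns(name)]
    return list(columns)


CATEGORICAL_COLUMNS = ['uid', 'iid', 'location', 'has_interacted_before']

print(resolve_columns(None, CATEGORICAL_COLUMNS))
# ['uid', 'iid', 'location', 'has_interacted_before']
print(resolve_columns(lambda name: name.endswith('id'), CATEGORICAL_COLUMNS))
# ['uid', 'iid']
print(resolve_columns(['location'], CATEGORICAL_COLUMNS))
# ['location']
```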

In [4]:
categorify = nvt.ops.Categorify(columns=['uid', 'iid', 'location'], replace=True)

Since `Op`s are operators acting on an existing `Workflow` schema, they are added to a `Workflow` by *calling* them on it, like so:

In [5]:
workflow = categorify(workflow)
print(workflow.ops)

[Categorify(columns=['uid', 'iid', 'location'], replace=True, name=None)]


In fact, the `Workflow` class itself inherits from `Op`, so we can add all the ops from one workflow to another by calling the first on the second:

In [6]:
log = nvt.ops.Log() # instantiate generically
normalize = nvt.ops.Normalize()
print(log.columns, normalize.columns)

log_and_normalize_workflow = nvt.ops.Workflow(ops=[log, normalize])  # the whole workflow is instantiated generically
print(log_and_normalize_workflow.ops)

None None
[Log(columns=[], replace=True, name=None), Normalize(columns=[], replace=True, name=None)]


In [7]:
workflow = log_and_normalize_workflow(workflow)
print(workflow.ops)

[Categorify(columns=['uid', 'iid', 'location'], replace=True, name=None), Log(columns=['timestamp', 'user_age', 'item_average_rating'], replace=True, name=None), Normalize(columns=['timestamp', 'user_age', 'item_average_rating'], replace=True, name=None)]


Notice that while the `Log` `Op` in our workflow has explicit columns now:

In [8]:
print(workflow.ops[1].columns)

['timestamp', 'user_age', 'item_average_rating']


the original `Log` `Op` doesn't:

In [9]:
print(log.columns)

None


This means that the same `Op` can be reused between different `Workflow`s without sacrificing its generality.
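The calling convention from this section can be sketched in plain Python. This is an assumption-laden toy, not nvTabular's code: `Op`, `ColumnOp`, and `Workflow` are hypothetical stand-ins, ops are namedtuples, and `group` is a made-up field saying whether a generic op targets categorical (`'cat'`) or continuous (`'cont'`) columns.

```python
from collections import namedtuple


class Op:
    """Hypothetical base class: an Op is anything callable on a Workflow."""
    __slots__ = ()

    def __call__(self, workflow):
        raise NotImplementedError


class ColumnOp(namedtuple("ColumnOp", ["name", "group", "columns"], defaults=(None,)), Op):
    """A stateless op; columns=None means "every column in `group`"."""
    __slots__ = ()

    def __call__(self, workflow):
        columns = self.columns
        if columns is None:
            columns = workflow.cat_names if self.group == "cat" else workflow.cont_names
        # Append a resolved *copy*; neither `self` nor `workflow` is modified.
        resolved = self._replace(columns=list(columns))
        return workflow._replace(ops=tuple(workflow.ops) + (resolved,))


class Workflow(
    namedtuple("Workflow", ["cat_names", "cont_names", "ops"], defaults=((), (), ())), Op
):
    """A Workflow is itself an Op: calling it replays its ops on another workflow."""
    __slots__ = ()

    def __call__(self, other):
        for op in self.ops:
            other = op(other)
        return other


categorify = ColumnOp("categorify", "cat", columns=["uid", "iid", "location"])
log = ColumnOp("log", "cont")  # generic: no columns yet
normalize = ColumnOp("normalize", "cont")

workflow = Workflow(
    cat_names=("uid", "iid", "location", "has_interacted_before"),
    cont_names=("timestamp", "user_age", "item_average_rating"),
)
workflow = categorify(workflow)
workflow = Workflow(ops=(log, normalize))(workflow)

print([op.name for op in workflow.ops])  # ['categorify', 'log', 'normalize']
print(workflow.ops[1].columns)           # ['timestamp', 'user_age', 'item_average_rating']
print(log.columns)                       # None
```

Because every call returns a new namedtuple rather than mutating an existing one, the generic `log` stays reusable across workflows.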

# Dataset
A `Dataset` object represents the file or files that make up one realization of a dataset schema. It is an iterable that steps through that dataset in batches. This is probably the crudest of my recreations here, but conceptually it should line up.

In [10]:
dataset = nvt.dataset('this_data.csv', batch_size=128)
next(iter(dataset))

| Unnamed: 0 | uid | iid | location | has_interacted_before | timestamp | user_age | item_average_rating | click | purchase |
|---|---|---|---|---|---|---|---|---|---|
| 0 | qbdcpblvlt | 58866 | San Francisco | 0 | 1586992816 | 19 | 3.874372 | 0 | 1 |
| 1 | atgapomeop | 29207 | Copenhagen | 0 | 1587045716 | 39 | 4.417085 | 1 | 1 |
| 2 | pmgfvettto | 21504 | Los Angeles | 0 | 1587003510 | 40 | 3.070352 | 1 | 0 |
| 3 | lebsdtjdvx | 33679 | San Francisco | 1 | 1586976933 | 33 | 1.643216 | 1 | 1 |
| 4 | veuvxtfefp | 94304 | San Francisco | 1 | 1587054833 | 30 | 4.638191 | 0 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 123 | kltkhskjzo | 3916 | Los Angeles | 0 | 1587030892 | 28 | 4.698492 | 1 | 0 |
| 124 | ejknasvbtb | 44802 | Berkeley | 1 | 1587002114 | 38 | 2.849246 | 0 | 0 |
| 125 | oehgtxbklu | 35596 | Los Angeles | 0 | 1586982383 | 28 | 2.648241 | 1 | 1 |
| 126 | ladbxduoqt | 17234 | Copenhagen | 1 | 1586998553 | 20 | 3.110553 | 0 | 0 |
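As a rough, standard-library-only stand-in for the `Dataset` idea, here is a re-iterable object that reads a CSV file and yields batches of row dicts. `CsvDataset` is hypothetical. It uses a frozen dataclass instead of a namedtuple because overriding `__iter__` on a tuple subclass would break tuple behaviour such as `_replace`.

```python
import csv
import os
import tempfile
from dataclasses import dataclass
from itertools import islice


@dataclass(frozen=True)
class CsvDataset:
    path: str
    batch_size: int

    def __iter__(self):
        # Re-open the file on every iteration so the dataset can be
        # iterated any number of times (e.g. once to fit, once to train).
        with open(self.path, newline="") as f:
            reader = csv.DictReader(f)
            while True:
                batch = list(islice(reader, self.batch_size))
                if not batch:
                    return
                yield batch


# Usage: a tiny throwaway file standing in for this_data.csv.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("uid,user_age\n" + "".join(f"u{i},{20 + i}\n" for i in range(5)))

dataset = CsvDataset(tmp.name, batch_size=2)
sizes = [len(batch) for batch in dataset]
first = next(iter(dataset))[0]
print(sizes)  # [2, 2, 1]
print(first)  # {'uid': 'u0', 'user_age': '20'}
os.remove(tmp.name)
```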


# Stats Context
Some of the `Op`s in a `Workflow` may need statistics about a `Dataset` before they can be used. For example, the `Normalize` op above needs a feature-wise mean and standard deviation calculated from a `Dataset`, and the `Categorify` op needs to know every category that appears in a `Dataset` in order to map each one to a contiguous integer. The `Stat`s an `Op` requires are available via its `stats_required` property:

In [11]:
print(workflow.ops[0].stats_required)

[<nv_tabular.stats.DLLabelEncoder object at 0x0000021B64F52F98>]


The important thing is that a `Stat` is, in and of itself, stateless: it describes how to calculate a value from a `Dataset`, but it is not the value itself. It is not inherently *tied* to that value, in the same way that a dataset schema is not *tied* to a particular `Dataset`.
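To illustrate a stateless `Stat`, here is a hypothetical `MeanStd` recipe (not nvTabular's class): the same object evaluated on two datasets yields two values, while the object itself holds nothing but a column name. Batches are assumed to be lists of row dicts.

```python
from collections import namedtuple
from statistics import mean, pstdev


class MeanStd(namedtuple("MeanStd", ["column"])):
    """Hypothetical stateless stat: a recipe for a value, not the value."""
    __slots__ = ()

    def compute(self, batches):
        # Gathers the column into memory for brevity; a real implementation
        # would more likely stream running sums over the batches.
        values = [row[self.column] for batch in batches for row in batch]
        return {"mean": mean(values), "std": pstdev(values)}


stat = MeanStd("user_age")
train = [[{"user_age": 20}, {"user_age": 30}], [{"user_age": 40}]]
other = [[{"user_age": 10}, {"user_age": 10}]]

print(stat.compute(train))  # mean 30, population std of about 8.165
print(stat.compute(other))  # {'mean': 10, 'std': 0.0}
print(stat)                 # MeanStd(column='user_age')
```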

The `StatsContext`, on the other hand, is an object associated with a particular `Workflow` that maintains state: the actual values the `Workflow` can use. However, it is a distinct entity, and the relationship is not bi-directional. A `StatsContext` is associated with *one* `Workflow`, which tells it which statistics to calculate and maintain, while a `Workflow` is agnostic to whichever `StatsContext` is used during its `apply` method, as long as that context holds every statistic the `Workflow` needs. Let's see what this looks like in code to clarify things a bit.

In [12]:
stats_context = nvt.stats.StatsContext(workflow)
stats_context.fit(dataset)
print(stats_context.state['categorify'][0]['location']['encoder'])

Berkeley         0
Copenhagen       1
Los Angeles      2
San Francisco    3
dtype: int32
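A hypothetical sketch of the split between stateless stats and a stateful context. `Categories`, `Categorify`, and `StatsContext` here are illustrative stand-ins, and the state layout is simpler than the nested `state['categorify'][0]['location']['encoder']` lookup above. Sorting the categories before numbering them is one deterministic choice; for these four cities it produces the same codes as the output above.

```python
from collections import namedtuple


class Categories(namedtuple("Categories", ["column"])):
    """Stateless recipe: map each distinct value to a contiguous integer."""
    __slots__ = ()

    def compute(self, batches):
        seen = {row[self.column] for batch in batches for row in batch}
        return {value: code for code, value in enumerate(sorted(seen))}


class Categorify(namedtuple("Categorify", ["columns"])):
    __slots__ = ()

    @property
    def stats_required(self):
        # Derived from `columns` on demand; nothing is stored on the op.
        return [Categories(column) for column in self.columns]


class StatsContext:
    """The only object in this sketch that holds fitted state."""

    def __init__(self, ops):
        self.ops = ops
        self.state = {}

    def fit(self, dataset):
        # Each stat makes its own pass, so `dataset` must be re-iterable.
        for op in self.ops:
            self.state[type(op).__name__.lower()] = {
                stat.column: stat.compute(dataset) for stat in op.stats_required
            }
        return self


dataset = [
    [{"location": "San Francisco"}, {"location": "Copenhagen"}],
    [{"location": "Los Angeles"}, {"location": "Berkeley"}],
]
ctx = StatsContext([Categorify(columns=["location"])]).fit(dataset)
print(ctx.state["categorify"]["location"])
# {'Berkeley': 0, 'Copenhagen': 1, 'Los Angeles': 2, 'San Francisco': 3}
```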


Now that we have a workflow *and* the stats we need in order to apply it, we can map it to a dataset, which will apply the ops defined by `workflow` and parameterized by `stats_context` at each iteration.

In [13]:
dataset.map(workflow, stats_context=stats_context)

What's nice about this formulation is that it decouples the dataset from the operations we want to apply to it, and vice versa, so that any `Workflow` can be recycled to apply to any new dataset that comes along, and can be used with statistics from any dataset we've fit a `StatsContext` to in the past. Meanwhile, any dataset can be remapped to apply some reformulated `Workflow` with a totally different `StatsContext`.

In [14]:
next(iter(dataset))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self[k1] = value[k2]


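| Unnamed: 0 | uid | iid | location | has_interacted_before | timestamp | user_age | item_average_rating | click | purchase |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 68 | 36 | 3 | 0 | -0.915828 | -1.737907 | 0.759322 | 0 | 1 |
| 1 | 3 | 17 | 1 | 0 | 1.232985 | 1.005439 | 1.054142 | 1 | 1 |
| 2 | 65 | 13 | 2 | 0 | -0.481429 | 1.102022 | 0.236251 | 1 | 0 |
| 3 | 44 | 22 | 3 | 1 | -1.561014 | 0.368152 | -1.169607 | 1 | 1 |
| 4 | 90 | 45 | 3 | 1 | 1.603313 | 0.004558 | 1.163987 | 0 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 123 | 38 | 2 | 2 | 0 | 0.630838 | -0.258639 | 1.193037 | 1 | 0 |
| 124 | 16 | 29 | 0 | 1 | -0.538135 | 0.906346 | 0.068175 | 0 | 0 |
| 125 | 55 | 24 | 2 | 0 | -1.339628 | -0.258639 | -0.096350 | 1 | 1 |
| 126 | 41 | 10 | 1 | 1 | -0.682786 | -1.542231 | 0.265505 | 0 | 0 |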
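The decoupling between data, workflow, and statistics can be sketched as a dataset that stores a transform and applies it lazily, one batch at a time. `LazyDataset` and `normalize_with` are hypothetical. One deliberate difference from the `dataset.map` call above is that this `map` returns a new object instead of changing the dataset in place, in keeping with the preference for stateless objects.

```python
from dataclasses import dataclass, replace
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class LazyDataset:
    """A re-iterable source of batches plus an optional per-batch transform."""
    batches: Iterable
    transform: Optional[Callable] = None

    def map(self, transform):
        # Returns a new dataset; the source batches are never modified.
        return replace(self, transform=transform)

    def __iter__(self):
        for batch in self.batches:
            yield batch if self.transform is None else self.transform(batch)


def normalize_with(stats, column):
    """Build a batch transform from one step plus its fitted statistics."""
    def transform(batch):
        return [
            {**row, column: (row[column] - stats["mean"]) / stats["std"]}
            for row in batch
        ]
    return transform


raw = LazyDataset(batches=[[{"user_age": 20}, {"user_age": 40}]])
fitted_a = {"mean": 30, "std": 10}
fitted_b = {"mean": 20, "std": 5}

print(next(iter(raw.map(normalize_with(fitted_a, "user_age")))))
# [{'user_age': -1.0}, {'user_age': 1.0}]
print(next(iter(raw.map(normalize_with(fitted_b, "user_age")))))
# [{'user_age': 0.0}, {'user_age': 4.0}]
print(next(iter(raw)))  # [{'user_age': 20}, {'user_age': 40}]
```

The same transform recipe works with statistics fitted on any dataset, and the raw data can be re-mapped at any time.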


# Writer
Now that we have a `Dataset` and some transformations we'd like to apply to it, we need to decide what to do with the result. One route is to apply those transformations online during model training, but if our `Op`s are complex enough, that may become a performance bottleneck. If we're confident enough in the `Workflow` we've defined to reuse it across multiple training runs, it might make sense to transform the `Dataset` up front, possibly with some shuffling, and save it to disk; our training runs can then read it back later without doing any preprocessing.

`Writer` objects are associated with a particular file pattern or location, and save transformed and shuffled `Dataset`s out as Parquet files.

In [15]:
writer = nvt.writer.Writer('this_data.parquet')
# I won't actually write because I don't have a parquet engine locally
# writer.write(dataset)
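A minimal sketch of what a writer does, under loud assumptions: `write_shuffled` is hypothetical, CSV stands in for Parquet so that only the standard library is needed, and the shuffle is a global in-memory shuffle, which only works when the whole dataset fits in memory.

```python
import csv
import os
import random
import tempfile


def write_shuffled(batches, path, seed=0):
    """Hypothetical writer: gather rows, shuffle them, and save them as CSV.

    Returns the number of rows written.
    """
    rows = [row for batch in batches for row in batch]
    if not rows:
        return 0
    random.Random(seed).shuffle(rows)  # seeded so the output is reproducible
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    return len(rows)


batches = [[{"uid": 68, "click": 0}, {"uid": 3, "click": 1}], [{"uid": 65, "click": 1}]]
path = os.path.join(tempfile.mkdtemp(), "this_data.csv")
n_written = write_shuffled(batches, path)
with open(path, newline="") as f:
    read_back = list(csv.DictReader(f))
print(n_written, sorted(row["uid"] for row in read_back))  # 3 ['3', '65', '68']
```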

What if we instead want to apply our `Workflow` online at training time, using the `StatsContext` we've just taken the time to compute? The Python objects these are built on are very simple, so we can use Python's `pickle` module to save them to disk and load them back later to apply to any new dataset that comes our way.

In [16]:
import pickle
with open('workflow.pickle', 'wb') as f:
    pickle.dump(workflow, f)
with open('stats.pickle', 'wb') as f:
    pickle.dump(stats_context, f)
del workflow, stats_context, dataset

In [17]:
with open('workflow.pickle', 'rb') as f:
    workflow = pickle.load(f)
with open('stats.pickle', 'rb') as f:
    stats_context = pickle.load(f)

dataset = nvt.dataset('this_data.csv', batch_size=128)
dataset.map(workflow, stats_context=stats_context)
next(iter(dataset))

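| Unnamed: 0 | uid | iid | location | has_interacted_before | timestamp | user_age | item_average_rating | click | purchase |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 68 | 36 | 3 | 0 | -0.915828 | -1.737907 | 0.759322 | 0 | 1 |
| 1 | 3 | 17 | 1 | 0 | 1.232985 | 1.005439 | 1.054142 | 1 | 1 |
| 2 | 65 | 13 | 2 | 0 | -0.481429 | 1.102022 | 0.236251 | 1 | 0 |
| 3 | 44 | 22 | 3 | 1 | -1.561014 | 0.368152 | -1.169607 | 1 | 1 |
| 4 | 90 | 45 | 3 | 1 | 1.603313 | 0.004558 | 1.163987 | 0 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 123 | 38 | 2 | 2 | 0 | 0.630838 | -0.258639 | 1.193037 | 1 | 0 |
| 124 | 16 | 29 | 0 | 1 | -0.538135 | 0.906346 | 0.068175 | 0 | 0 |
| 125 | 55 | 24 | 2 | 0 | -1.339628 | -0.258639 | -0.096350 | 1 | 1 |
| 126 | 41 | 10 | 1 | 1 | -0.682786 | -1.542231 | 0.265505 | 0 | 0 |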
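Pickling works here because namedtuples and plain dicts pickle cleanly. A small sketch with a hypothetical namedtuple op shows the round trip, plus one caveat: a lambda passed as `columns` cannot be pickled, since pickle stores functions by reference to a module-level name. Unpickling also requires the class definitions to be importable in the process that loads them.

```python
import pickle
from collections import namedtuple


class Normalize(namedtuple("Normalize", ["columns"])):
    """Hypothetical op used only to demonstrate pickling."""
    __slots__ = ()


op = Normalize(columns=["user_age"])
state = {"normalize": {"user_age": {"mean": 30.0, "std": 10.0}}}

# Round-trip both through bytes; files work the same way via pickle.dump/load.
restored_op = pickle.loads(pickle.dumps(op))
restored_state = pickle.loads(pickle.dumps(state))
print(restored_op == op, restored_state == state)  # True True

# Caveat: a predicate passed as `columns` should be a named, module-level
# function; a lambda cannot be pickled by reference.
try:
    pickle.dumps(Normalize(columns=lambda name: name.startswith("user")))
except (pickle.PicklingError, AttributeError) as err:
    print("not picklable:", type(err).__name__)
```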


# All Together Now
Let's run that back in one place to see how this code looks all together:

In [18]:
import pickle

import nv_tabular as nvt

CATEGORICAL_COLUMNS = ['uid', 'iid', 'location', 'has_interacted_before']
CONTINUOUS_COLUMNS = ['timestamp', 'user_age', 'item_average_rating']
LABEL_COLUMNS = ['click', 'purchase']

workflow = nvt.ops.Workflow(
    cat_names=CATEGORICAL_COLUMNS,
    cont_names=CONTINUOUS_COLUMNS,
    label_names=LABEL_COLUMNS,
    ops=[nvt.ops.Categorify(columns=['uid', 'iid', 'location'], replace=True)]
)

log_and_normalize_workflow = nvt.ops.Workflow(ops=[nvt.ops.Log(), nvt.ops.Normalize()])
workflow = log_and_normalize_workflow(workflow)

dataset = nvt.dataset('this_data.csv', batch_size=128)
stats_context = nvt.stats.StatsContext(workflow)
stats_context.fit(dataset)
dataset.map(workflow, stats_context=stats_context)

writer = nvt.writer.Writer('this_data.parquet')
# writer.write(dataset)

with open('workflow.pickle', 'wb') as f:
    pickle.dump(workflow, f)
with open('stats.pickle', 'wb') as f:
    pickle.dump(stats_context, f)