Skip to content

Commit

Permalink
Add a new feature that allows extractor args to be specified at proce…
Browse files Browse the repository at this point in the history
…ss time
  • Loading branch information
JohnVinyard committed Oct 17, 2017
1 parent c7153f9 commit 8524c13
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 9 deletions.
2 changes: 1 addition & 1 deletion featureflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '2.3.4'
__version__ = '2.4.4'

from model import BaseModel, ModelExistsError

Expand Down
9 changes: 9 additions & 0 deletions featureflow/eventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def gen():

return gen()

def unsubscribe(self):
raise NotImplementedError()

def publish(self, _id, message):
message = json.dumps({'_id': _id, 'message': message})
for generator in self.generators:
Expand All @@ -39,6 +42,9 @@ def __init__(self, channel, host='localhost', port=6379):
self.r = redis.StrictRedis(host=host, port=port)
self.p = self.r.pubsub(ignore_subscribe_messages=True)

def unsubscribe(self):
self.p.unsubscribe()

def subscribe(self, raise_when_empty=False):
if raise_when_empty:
raise NotImplementedError(
Expand Down Expand Up @@ -78,6 +84,9 @@ def append(self, data):
self.channel.publish(_id, data)
return _id

def unsubscribe(self):
self.channel.unsubscribe()

def subscribe(self, last_id='', raise_when_empty=False):
subscription = self.channel.subscribe(raise_when_empty=raise_when_empty)

Expand Down
23 changes: 18 additions & 5 deletions featureflow/feature.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from var import Var
from collections import OrderedDict
import inspect
from datawriter import DataWriter, StringIODataWriter
Expand Down Expand Up @@ -257,27 +258,39 @@ def _partial(self, _id, features=None, persistence=None):

return features

def _depends_on(self, _id, graph, persistence):
def _depends_on(self, _id, graph, persistence, **kwargs):
needs = OrderedDict()

for k, f in self.needs.iteritems():
if f.key in graph:
needs[k] = graph[f.key]
continue

e = f._build_extractor(_id, graph, persistence)
e = f._build_extractor(_id, graph, persistence, **kwargs)
needs[k] = e

return needs

def _build_extractor(self, _id, graph, persistence):
def _build_extractor(self, _id, graph, persistence, **kwargs):
try:
return graph[self.key]
except KeyError:
pass

needs = self._depends_on(_id, graph, persistence)
e = self.extractor(needs=needs, **self.extractor_args)
needs = self._depends_on(_id, graph, persistence, **kwargs)

extractor_args = dict()
for k, v in self.extractor_args.iteritems():
if isinstance(v, Var) and v.name == k:
try:
extractor_args[k] = kwargs[k]
except KeyError:
raise ValueError('{k} is a Var, but it was not provided'
.format(**locals()))
else:
extractor_args[k] = v

e = self.extractor(needs=needs, **extractor_args)

if isinstance(e, DecoderNode):
reader = self.reader(_id, self.key, persistence)
Expand Down
11 changes: 8 additions & 3 deletions featureflow/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ def __getattribute__(self, key):
return decoded

@classmethod
def _build_extractor(cls, _id):
def _build_extractor(cls, _id, **kwargs):
g = Graph()
for feature in cls.features.itervalues():
feature._build_extractor(_id, g, cls)
feature._build_extractor(_id, g, cls, **kwargs)
return g

@classmethod
Expand Down Expand Up @@ -125,7 +125,12 @@ def process(cls, raise_if_exists=False, **kwargs):
'{_id} is already stored in the database'.format(
**locals()))

graph = cls._build_extractor(_id)
try:
del kwargs['_id']
except KeyError:
pass

graph = cls._build_extractor(_id, **kwargs)
graph.remove_dead_nodes(cls.features.itervalues())
try:
graph.process(**kwargs)
Expand Down
22 changes: 22 additions & 0 deletions featureflow/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import time

from var import Var
from extractor import NotEnoughData, Aggregator, Node, InvalidProcessMethod
from iteratornode import IteratorNode
from model import BaseModel, NoPersistenceSettingsError, ModelExistsError
Expand Down Expand Up @@ -1259,6 +1260,27 @@ class Numbers(BaseModel, self.Settings):
doc = Numbers(_id)
self.assertEqual('2468101214161820', doc.sumup.read())

def test_feature_that_takes_a_variable(self):

class Numbers(BaseModel, self.Settings):
stream = Feature(NumberStream, store=False)
add1 = Feature(Add, needs=stream, store=False, rhs=Var('rhs'))
stringify = Feature(
lambda x: ''.join(map(str, x)), needs=add1, store=True)

_id = Numbers.process(stream='numbers', rhs=2)
doc = Numbers(_id)
self.assertEqual('234567891011', doc.stringify.read())

def test_raises_if_necessary_variable_is_not_provided(self):
class Numbers(BaseModel, self.Settings):
stream = Feature(NumberStream, store=False)
add1 = Feature(Add, needs=stream, store=False, rhs=Var('rhs'))
stringify = Feature(
lambda x: ''.join(map(str, x)), needs=add1, store=True)

self.assertRaises(ValueError, lambda: Numbers.process(stream='numbers'))

def test_feature_with_multiple_inputs_using_a_tuple(self):
class Numbers(BaseModel, self.Settings):
stream = Feature(NumberStream, store=False)
Expand Down
5 changes: 5 additions & 0 deletions featureflow/var.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

class Var(object):
def __init__(self, name):
super(Var, self).__init__()
self.name = name

0 comments on commit 8524c13

Please sign in to comment.