In [1]:
# Second example notebook for the ODSC West 2023 Workship:
# https://odsc.com/speakers/using-graphs-for-large-feature-engineering-pipelines/

In [1]:
import datetime

import pandas as pd

from graphreduce.node import GraphReduceNode
from graphreduce.graph_reduce import GraphReduce
from graphreduce.enum import ComputeLayerEnum, PeriodUnit

In [2]:
# In this notebook we'll look at constructing a slightly larger
# graph reduce pipeline.  This time we'll use 7 tables.

In [3]:
class CustomerNode(GraphReduceNode):
    def do_annotate(self):
        self.df[self.colabbr('name_length')] = self.df[self.colabbr('name')].apply(lambda x: len(x))
    
    def do_filters(self):
        pass
    
    def do_normalize(self):
        pass
    
    def do_post_join_annotate(self):
        pass
    
    def do_reduce(self, reduce_key, *args, **kwargs):
        pass
    
    def do_labels(self, reduce_key, *args, **kwargs):
        pass
    
    

In [4]:
class OrderNode(GraphReduceNode):
    def do_annotate(self):
        pass
    
    def do_filters(self):
        pass
    
    def do_normalize(self):
        pass
    
    def do_post_join_annotate(self):
        pass
    
    def do_reduce(self, reduce_key):
        return self.prep_for_features().groupby(self.colabbr(reduce_key)).agg(
            **{
                self.colabbr(f'{self.pk}_count') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count'),
                self.colabbr(f'amount_sum'): pd.NamedAgg(column=self.colabbr('amount'), aggfunc='sum')
            }
        ).reset_index()
    
    def do_labels(self, reduce_key):
        return self.prep_for_labels().groupby(self.colabbr(reduce_key)).agg(
            **{
                self.colabbr(f'{self.pk}_had_order') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count')
            }
        ).reset_index()

In [5]:
class OrderProductNode(GraphReduceNode):
    def do_annotate(self):
        pass
    
    def do_filters(self):
        pass
    
    def do_normalize(self):
        pass
    
    def do_post_join_annotate(self):
        pass
    
    def do_reduce(self, reduce_key):
        return self.prep_for_features().groupby(self.colabbr(reduce_key)).agg(
            **{
                self.colabbr(f'{self.pk}_count') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count')
            }
        ).reset_index()
    
    def do_labels(self, key):
        pass

In [6]:
class OrderEventNode(GraphReduceNode):
    def do_annotate(self):
        pass
    
    def do_filters(self):
        pass
    
    def do_normalize(self):
        pass
    
    def do_post_join_annotate(self):
        pass
    
    def do_reduce(self, reduce_key, *args, **kwargs):
        return self.prep_for_features().groupby(self.colabbr(reduce_key)).agg(
            **{
                self.colabbr(f'{self.pk}_count') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count')
            }
        ).reset_index()
    
    def do_labels(self, key):
        pass

In [7]:
class NotificationNode(GraphReduceNode):
    def do_annotate(self):
        pass
    
    def do_filters(self):
        pass
    
    def do_normalize(self):
        pass

    def do_post_join_annotate(self):
        pass
    
    def do_reduce(self, reduce_key, *args, **kwargs):
        df = self.prep_for_features().groupby(self.colabbr(reduce_key)).agg(
            **{
                self.colabbr(f'{self.pk}_count') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count'),
                #'ni_num_notification_interactions' : pd.NamedAgg(column='ni_num_interactions', aggfunc='sum')
            }
        ).reset_index()
        return df
        
    
    def do_labels(self, key):
        pass

In [8]:
class NotificationInteractionNode(GraphReduceNode):
    def do_annotate(self):
        self.df[self.colabbr(self.date_key)] = self.df[self.colabbr(self.date_key)].apply(
            lambda x: datetime.datetime.strptime(x, '%Y-%m-%d'))
    
    def do_filters(self):
        pass
    
    def do_normalize(self):
        pass
    
    def do_post_join_annotate(self):
        pass
    
    def do_reduce(self, 
                  reduce_key : str,
                  additional_agg : dict = {}
                 ):
        
        return self.prep_for_features().groupby(self.colabbr(reduce_key)).agg(
            **{
                self.colabbr(f'{self.pk}_counts') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count'),
                self.colabbr(f'{self.pk}_min') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='min'),
                self.colabbr(f'{self.pk}_min'): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='max'),
                self.colabbr(f'num_interactions') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count'),

            }
        ).reset_index()
    
    def do_labels(self, reduce_key, *args, **kwargs):
        label_df = self.prep_for_labels().groupby(self.colabbr(reduce_key)).agg(
            **{
                # add a label / target for a model predicting the number of interactions
                self.colabbr(f'{self.pk}_num_interactions') : pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count'),
            }
        ).reset_index()
        return label_df

In [9]:
class ProductNode(GraphReduceNode):
    def do_annotate(self):
        pass
    
    def do_filters(self):
        pass
    
    def do_normalize(self):
        pass
    
    def do_post_join_annotate(self):
        pass
    
    def do_reduce(self, 
                  reduce_key : str,
                 ):
        pass
    
    def do_labels(self, reduce_key, *args, **kwargs):
        pass

In [17]:
cust = CustomerNode(pk='id', prefix='cust',fpath='dat/cust.csv', fmt='csv')
order = OrderNode(pk='id', prefix='order', fpath='dat/orders.csv', fmt='csv', date_key='ts')
order_event = OrderEventNode(pk='id', prefix='oe',fpath='dat/order_events.csv', fmt='csv')
order_product = OrderProductNode(pk='id', prefix='op',fpath='dat/order_products.csv', fmt='csv')
notification = NotificationNode(pk='id', prefix='no', fpath='dat/notifications.csv', fmt='csv', date_key='ts')
notification_interaction = NotificationInteractionNode(pk='id', prefix='ni',fpath='dat/notification_interactions.csv', fmt='csv',
                                                      date_key='ts')
product = ProductNode(pk='id',prefix='prod',fpath='dat/products.csv',fmt='csv')



In [18]:
gr = GraphReduce(
    name='odsc_example_2',
    parent_node=cust,
    cut_date=datetime.datetime(2023, 5, 6),
    compute_period_val=365,
    compute_period_unit=PeriodUnit.day,
    compute_layer=ComputeLayerEnum.pandas,
    has_labels=False,
    label_period_val=30,
    label_period_unit=PeriodUnit.day
)

In [19]:
gr.add_node(cust)
gr.add_node(order)
gr.add_node(order_event)
gr.add_node(order_product)
gr.add_node(notification)
gr.add_node(notification_interaction)
gr.add_node(product)

In [20]:
gr.add_entity_edge(
    parent_node=cust,
    relation_node=order,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=order,
    relation_node=order_product,
    parent_key='id',
    relation_key='order_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=order,
    relation_node=order_event,
    parent_key='id',
    relation_key='order_id',
    reduce=True
)


gr.add_entity_edge(
    parent_node=cust,
    relation_node=notification,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=notification,
    relation_node=notification_interaction,
    parent_key='id',
    relation_key='notification_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=order_product,
    relation_node=product,
    parent_key='product_id',
    relation_key='id',
    reduce=True
)

In [21]:
#gr.plot_graph('odsc_graph2.html')

2023-10-25 18:54:36 [info     ] plotted graph at odsc_graph2.html


In [34]:
#!open odsc_graph2.html

In [23]:
gr.do_transformations()

2023-10-25 18:54:40 [info     ] hydrating graph attributes
2023-10-25 18:54:40 [info     ] hydrating attributes for CustomerNode
2023-10-25 18:54:40 [info     ] hydrating attributes for OrderNode
2023-10-25 18:54:40 [info     ] hydrating attributes for OrderEventNode
2023-10-25 18:54:40 [info     ] hydrating attributes for OrderProductNode
2023-10-25 18:54:40 [info     ] hydrating attributes for NotificationNode
2023-10-25 18:54:40 [info     ] hydrating attributes for NotificationInteractionNode
2023-10-25 18:54:40 [info     ] hydrating attributes for ProductNode
2023-10-25 18:54:40 [info     ] hydrating graph data
2023-10-25 18:54:40 [info     ] checking for prefix uniqueness
2023-10-25 18:54:40 [info     ] running filters, normalize, and annotations for <GraphReduceNode: fpath=dat/cust.csv fmt=csv>
2023-10-25 18:54:40 [info     ] running filters, normalize, and annotations for <GraphReduceNode: fpath=dat/orders.csv fmt=csv>
2023-10-25 18:54:40 [info     ] running filters, normalize, 

In [24]:
gr.parent_node.df

Unnamed: 0,cust_id,cust_name,cust_name_length,no_customer_id,no_id_count,order_customer_id,order_id_count,order_amount_sum
0,1,wes,3,1,3,,,
1,2,john,4,2,3,2.0,2.0,250.0


In [27]:
# let's aggregate all of the features by
# using dynamic propagation

In [29]:
cust = CustomerNode(pk='id', prefix='cust',fpath='dat/cust.csv', fmt='csv')
order = OrderNode(pk='id', prefix='order', fpath='dat/orders.csv', fmt='csv', date_key='ts')
order_event = OrderEventNode(pk='id', prefix='oe',fpath='dat/order_events.csv', fmt='csv')
order_product = OrderProductNode(pk='id', prefix='op',fpath='dat/order_products.csv', fmt='csv')
notification = NotificationNode(pk='id', prefix='no', fpath='dat/notifications.csv', fmt='csv', date_key='ts')
notification_interaction = NotificationInteractionNode(pk='id', prefix='ni',fpath='dat/notification_interactions.csv', fmt='csv',
                                                      date_key='ts')
product = ProductNode(pk='id',prefix='prod',fpath='dat/products.csv',fmt='csv')


gr = GraphReduce(
    name='odsc_example_2',
    parent_node=cust,
    cut_date=datetime.datetime(2023, 5, 6),
    compute_period_val=365,
    compute_period_unit=PeriodUnit.day,
    compute_layer=ComputeLayerEnum.pandas,
    has_labels=True,
    label_period_val=30,
    label_period_unit=PeriodUnit.day,
    dynamic_propagation=True
)

gr.add_node(cust)
gr.add_node(order)
gr.add_node(order_event)
gr.add_node(order_product)
gr.add_node(notification)
gr.add_node(notification_interaction)
gr.add_node(product)

gr.add_entity_edge(
    parent_node=cust,
    relation_node=order,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=order,
    relation_node=order_product,
    parent_key='id',
    relation_key='order_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=order,
    relation_node=order_event,
    parent_key='id',
    relation_key='order_id',
    reduce=True
)


gr.add_entity_edge(
    parent_node=cust,
    relation_node=notification,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=notification,
    relation_node=notification_interaction,
    parent_key='id',
    relation_key='notification_id',
    reduce=True
)

gr.add_entity_edge(
    parent_node=order_product,
    relation_node=product,
    parent_key='product_id',
    relation_key='id',
    reduce=True
)



In [30]:
gr.do_transformations()

2023-10-25 22:21:02 [info     ] hydrating graph attributes
2023-10-25 22:21:02 [info     ] hydrating attributes for CustomerNode
2023-10-25 22:21:02 [info     ] hydrating attributes for OrderNode
2023-10-25 22:21:02 [info     ] hydrating attributes for OrderEventNode
2023-10-25 22:21:02 [info     ] hydrating attributes for OrderProductNode
2023-10-25 22:21:02 [info     ] hydrating attributes for NotificationNode
2023-10-25 22:21:02 [info     ] hydrating attributes for NotificationInteractionNode
2023-10-25 22:21:02 [info     ] hydrating attributes for ProductNode
2023-10-25 22:21:02 [info     ] hydrating graph data
2023-10-25 22:21:02 [info     ] checking for prefix uniqueness
2023-10-25 22:21:02 [info     ] running filters, normalize, and annotations for <GraphReduceNode: fpath=dat/cust.csv fmt=csv>
2023-10-25 22:21:02 [info     ] running filters, normalize, and annotations for <GraphReduceNode: fpath=dat/orders.csv fmt=csv>
2023-10-25 22:21:02 [info     ] running filters, normalize, 

In [31]:
gr.parent_node.df

Unnamed: 0,cust_id,cust_name,cust_name_length,no_customer_id,no_id_count,no_id_min,no_id_max,no_id_sum,no_customer_id_min,no_customer_id_max,...,prod_id_sum_max_min,prod_id_sum_max_max,prod_id_sum_max_sum,prod_id_sum_sum_min,prod_id_sum_sum_max,prod_id_sum_sum_sum,prod_name_first_first_first,prod_category_first_first_first,order_customer_id_dupe,order_id_had_order
0,1,wes,3,1,3,101,103,306,1,1,...,,,,,,,,,1.0,2.0
1,2,john,4,2,3,107,109,324,2,2,...,4.0,4.0,8.0,10.0,10.0,20.0,butter,food,,


In [32]:
pd.set_option('display.max_columns', 200)

In [33]:
gr.parent_node.df

Unnamed: 0,cust_id,cust_name,cust_name_length,no_customer_id,no_id_count,no_id_min,no_id_max,no_id_sum,no_customer_id_min,no_customer_id_max,no_customer_id_sum,no_ts_first,ni_notification_id_min,ni_notification_id_max,ni_notification_id_sum,ni_id_counts_min,ni_id_counts_max,ni_id_counts_sum,ni_id_min_min,ni_id_min_max,ni_id_min_sum,ni_num_interactions_min,ni_num_interactions_max,ni_num_interactions_sum,ni_id_min_dupe_min,ni_id_min_dupe_max,ni_id_min_dupe_sum,ni_id_max_min,ni_id_max_max,ni_id_max_sum,ni_id_sum_min,ni_id_sum_max,ni_id_sum_sum,ni_notification_id_min_min,ni_notification_id_min_max,ni_notification_id_min_sum,ni_notification_id_max_min,ni_notification_id_max_max,ni_notification_id_max_sum,ni_notification_id_sum_min,ni_notification_id_sum_max,ni_notification_id_sum_sum,ni_interaction_type_id_min_min,ni_interaction_type_id_min_max,ni_interaction_type_id_min_sum,ni_interaction_type_id_max_min,ni_interaction_type_id_max_max,ni_interaction_type_id_max_sum,ni_interaction_type_id_sum_min,ni_interaction_type_id_sum_max,ni_interaction_type_id_sum_sum,ni_notification_id_dupe_min,ni_notification_id_dupe_max,ni_notification_id_dupe_sum,ni_id_num_interactions_min,ni_id_num_interactions_max,ni_id_num_interactions_sum,order_customer_id,order_id_count,order_amount_sum,order_id_min,order_id_max,order_id_sum,order_customer_id_min,order_customer_id_max,order_customer_id_sum,order_ts_first,order_amount_min,order_amount_max,order_amount_sum_dupe,oe_order_id_min,oe_order_id_max,oe_order_id_sum,oe_id_count_min,oe_id_count_max,oe_id_count_sum,oe_id_min_min,oe_id_min_max,oe_id_min_sum,oe_id_max_min,oe_id_max_max,oe_id_max_sum,oe_id_sum_min,oe_id_sum_max,oe_id_sum_sum,oe_order_id_min_min,oe_order_id_min_max,oe_order_id_min_sum,oe_order_id_max_min,oe_order_id_max_max,oe_order_id_max_sum,oe_order_id_sum_min,oe_order_id_sum_max,oe_order_id_sum_sum,oe_event_id_min_min,oe_event_id_min_max,oe_event_id_min_sum,oe_event_id_max_min,oe_event_id_max_max,oe_event_id_max_sum,oe_event_id_sum_min,oe_event_id_sum_max,oe_event_id_sum_sum,op_order_id_min,op_order_id_max,op_order_id_sum,op_id_count_min,op_id_count_max,op_id_count_sum,op_id_min_min,op_id_min_max,op_id_min_sum,op_id_max_min,op_id_max_max,op_id_max_sum,op_id_sum_min,op_id_sum_max,op_id_sum_sum,op_order_id_min_min,op_order_id_min_max,op_order_id_min_sum,op_order_id_max_min,op_order_id_max_max,op_order_id_max_sum,op_order_id_sum_min,op_order_id_sum_max,op_order_id_sum_sum,op_product_id_min_min,op_product_id_min_max,op_product_id_min_sum,op_product_id_max_min,op_product_id_max_max,op_product_id_max_sum,op_product_id_sum_min,op_product_id_sum_max,op_product_id_sum_sum,prod_id_min_min,prod_id_min_max,prod_id_min_sum,prod_id_max_min,prod_id_max_max,prod_id_max_sum,prod_id_sum_min,prod_id_sum_max,prod_id_sum_sum,prod_id_min_min_min,prod_id_min_min_max,prod_id_min_min_sum,prod_id_min_max_min,prod_id_min_max_max,prod_id_min_max_sum,prod_id_min_sum_min,prod_id_min_sum_max,prod_id_min_sum_sum,prod_id_max_min_min,prod_id_max_min_max,prod_id_max_min_sum,prod_id_max_max_min,prod_id_max_max_max,prod_id_max_max_sum,prod_id_max_sum_min,prod_id_max_sum_max,prod_id_max_sum_sum,prod_id_sum_min_min,prod_id_sum_min_max,prod_id_sum_min_sum,prod_id_sum_max_min,prod_id_sum_max_max,prod_id_sum_max_sum,prod_id_sum_sum_min,prod_id_sum_sum_max,prod_id_sum_sum_sum,prod_name_first_first_first,prod_category_first_first_first,order_customer_id_dupe,order_id_had_order
0,1,wes,3,1,3,101,103,306,1,1,3,2022-08-05,101.0,103.0,306.0,1.0,3.0,7.0,1002.0,1006.0,3013.0,1.0,3.0,7.0,1000.0,1006.0,3009.0,1002.0,1006.0,3013.0,1006.0,3012.0,7021.0,101.0,103.0,306.0,101.0,103.0,306.0,103.0,306.0,712.0,1500.0,1500.0,4500.0,1500.0,1700.0,4900.0,1500.0,4800.0,11100.0,103.0,103.0,103.0,1.0,1.0,1.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,1.0,2.0
1,2,john,4,2,3,107,109,324,2,2,6,2022-09-05,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,,,0.0,107.0,107.0,107.0,1.0,1.0,1.0,2.0,2.0,250.0,3.0,4.0,7.0,2.0,2.0,4.0,2023-01-01,100.0,150.0,250.0,3.0,3.0,3.0,6.0,6.0,6.0,21.0,21.0,21.0,26.0,26.0,26.0,141.0,141.0,141.0,3.0,3.0,3.0,3.0,3.0,3.0,18.0,18.0,18.0,21.0,21.0,21.0,26.0,26.0,26.0,141.0,141.0,141.0,3.0,4.0,7.0,4.0,4.0,8.0,9.0,13.0,22.0,12.0,16.0,28.0,42.0,58.0,100.0,3.0,4.0,7.0,3.0,4.0,7.0,12.0,16.0,28.0,1.0,1.0,2.0,4.0,4.0,8.0,10.0,10.0,20.0,1.0,1.0,2.0,4.0,4.0,8.0,10.0,10.0,20.0,1.0,1.0,2.0,4.0,4.0,8.0,10.0,10.0,20.0,1.0,1.0,2.0,4.0,4.0,8.0,10.0,10.0,20.0,1.0,1.0,2.0,4.0,4.0,8.0,10.0,10.0,20.0,butter,food,,
