In [256]:
from rexerclient.rql import *
from rexerclient import client
import time

In [257]:
# Endpoint URL handed to the client. The commented-out assignments are
# alternative endpoints (another ELB hostname and a local instance); swap the
# active line to point the notebook at a different deployment.
#url = 'http://a3333e07f31614dd988930ad78f3f994-727687896.ap-south-1.elb.amazonaws.com/data'
url = 'http://ad3946bfc322c4a23ac0f2546f1f6ea9-559200516.ap-south-1.elb.amazonaws.com/data'
#url = 'http://localhost:2425'
# Client instance used by every later cell in this notebook.
c = client.Client(url)

In [258]:
import requests
import functools
# Give the client a requests.Session whose request() is pre-bound with
# timeout=30, so any call that does not pass its own timeout gets a 30-second
# one (requests applies no timeout by default).
# NOTE(review): presumably the client issues its HTTP calls through `c.http`;
# confirm against the rexerclient implementation.
c.http = requests.Session()
c.http.request = functools.partial(c.http.request, timeout=30)


In [259]:
# Time a single profile write as a quick round-trip check against the server.
# perf_counter() is a monotonic, high-resolution clock, so the measured
# interval cannot be skewed by wall-clock adjustments the way a difference of
# two time.time() readings can.
start = time.perf_counter()
c.set_profile('unicorn', 12312, 'city', 'lahore')
print('duration: ', time.perf_counter() - start)

duration:  0.5895018577575684


In [260]:
# Feature 1 -- number of views of a trail lasting at least 10 seconds in the last 30 days
# Count aggregate over a 30-day window (duration is expressed in seconds).
options = dict(aggregate_type='count', duration=30 * 24 * 3600)

# Pipeline: keep 'view' actions, then keep those whose watch_time is >= 10,
# and finally group what is left by the action's target_id.
q = Var('args').actions.apply(
    Ops.std.filter(where=it.action_type == 'view'),
    Ops.std.filter(where=it.metadata.watch_time >= 10),
    Ops.std.addField(name='groupkey', value=it.target_id),
)

c.store_aggregate('trail_num_10sec_views_30days_v2', q, options)

In [261]:
# Feature 2 (7-day half) -- number of 'follow' actions per followed user over
# the trailing 7 days. The 28-day counterpart is stored by store_aggregates().
options_7day = dict(aggregate_type='count', duration=7 * 24 * 3600)

# Keep only follow actions and group them by the action's target_id.
q = Var('args').actions.apply(
    Ops.std.filter(where=it.action_type == 'follow'),
    Ops.std.addField(name='groupkey', value=it.target_id),
)

c.store_aggregate('user_num_follows_7day_v2', q, options_7day)

In [262]:
def store_aggregates(c):
    """Store the aggregate definitions for features 2 (28-day), 3, 4 and 5.

    Every aggregate is described by an RQL pipeline over
    ``Var('args').actions`` plus an options dict holding the aggregate type
    and the window length (``duration``, in seconds). Each pipeline filters
    the actions, optionally adds profile/time fields, and sets ``groupkey``
    (and ``value`` for the average aggregates).

    The ``Ops.std.profile`` steps presumably attach the profile value stored
    under ``(otype, oid, key)`` to each action as ``field`` -- confirm against
    the rexerclient documentation.

    Args:
        c: client object exposing ``store_aggregate(name, query, options)``.
    """
    # Feature 2 (28-day half) -- number of follows per followed user in the
    # last 28 days.
    q = Var('args').actions.apply(
        Ops.std.filter(where=it.action_type == 'follow'),
        Ops.std.addField(name='groupkey', value=it.target_id),
    )

    options_28day = {'aggregate_type': 'count', 'duration': 3600*24*28}
    c.store_aggregate('user_num_follows_28day_v2', q, options_28day)

    # Feature 3 -- Avg-watchtime of a Trail for given country+OS+city_state+mobile_brand+gender at
    # {Sunday 11 to 12 am/current time} in the last 30 days

    q = Var('args').actions.apply(
        Ops.std.filter(where=it.action_type == 'view'),
        Ops.std.profile(field='country', otype='user', oid=it.actor_id, key='country'),
        Ops.std.profile(field='os', otype='user',oid=it.actor_id, key='os'),
        Ops.std.profile(field='city_state',otype='user',oid=it.actor_id, key='city_state'),
        Ops.std.profile(field='mobile_brand', otype='user', oid=it.actor_id, key='mobile_brand'),
        Ops.std.profile(field='gender', otype='user', oid=it.actor_id, key='gender'),
        Ops.time.addDayOfWeek(name='day_of_week', timestamp=it.timestamp),
        # One-hour buckets (bucket size is given in seconds).
        Ops.time.addTimeBucketOfDay(name='time_bucket', timestamp=it.timestamp, bucket=3600),
        Ops.std.addField(name='groupkey', value=[
            it.target_id, it.country, it.os, it.city_state, it.mobile_brand, it.gender, it.day_of_week, it.time_bucket
        ]),
        Ops.std.addField(name='value', value=it.metadata.watch_time)
    )

    options = {'aggregate_type': 'average', 'duration': 3600*24*30}
    c.store_aggregate('complex_feature_v2', q, options)

    # feature  4 -- total views gained by a Trail on last 2 days for given city+gender+age_group
    q = Var('args').actions.apply(
        Ops.std.filter(where=it.action_type == 'view'),
        Ops.std.profile(field='city', otype='user', oid=it.actor_id, key='city'),
        Ops.std.profile(field='gender', otype='user', oid=it.actor_id, key='gender'),
        Ops.std.profile(field='age_group', otype='user', oid=it.actor_id, key='age_group'),
        Ops.std.addField(name='groupkey', value=[it.target_id, it.city, it.gender, it.age_group]),
    )

    options = {'duration': 3600*24*2, 'aggregate_type': 'count', }
    c.store_aggregate('trail_view_by_city_gender_agegroup_2days_v2', q, options)

    # feature 5 - avg-watchtime of a user id  for creatorId in 2-hour window averaged over 30 days
    #
    # Two fixes relative to the earlier definition:
    #   * 'creatorId' is stored on the viewed trell's profile (otype 'trell',
    #     keyed by the trell id), not on the viewing user, so it is looked up
    #     through the action's target_id rather than its actor_id.
    #   * the 2-hour time bucket was computed but never used; it is now part
    #     of the groupkey so the average is really kept per 2-hour window.
    # NOTE(review): this changes the definition stored under an existing
    # aggregate name; if the server refuses to redefine an existing aggregate,
    # bump the version suffix of the name.
    q = Var('args').actions.apply(
        Ops.std.filter(where=it.action_type == 'view'),
        Ops.std.profile(field='creator_id', otype='trell', oid=it.target_id, key='creatorId'),
        Ops.time.addTimeBucketOfDay(name='time_bucket', timestamp=it.timestamp, bucket=2*3600),
        Ops.std.addField(name='groupkey', value=[it.actor_id, it.creator_id, it.time_bucket]),
        Ops.std.addField(name='value', value=it.metadata.watch_time),
    )
    options = {'aggregate_type': 'average', 'duration': 3600*24*30}
    c.store_aggregate('user_creator_avg_watchtime_by_2hour_windows_30days_v2', q, options)



In [263]:
def set_profiles(c, profiles):
    """Write every profile entry in ``profiles`` through the client.

    Args:
        c: client object exposing ``set_profile(otype, oid, key, value)``.
        profiles: iterable of mappings, each holding the keys ``'otype'``,
            ``'oid'``, ``'key'`` and ``'value'``.

    Raises:
        KeyError: if an entry lacks one of the four required keys.
    """
    for entry in profiles:
        # Read the four fields in call order before touching the client.
        otype, oid, key, value = (
            entry[field] for field in ('otype', 'oid', 'key', 'value')
        )
        c.set_profile(otype, oid, key, value)


In [264]:
# Store the feature definitions (more technically known as "aggregates")
# declared in store_aggregates() using the client.
store_aggregates(c)

In [265]:

# Several features read profile data. The full profile data is not available
# yet, so a handful of values are written by hand here.

# Object ids used throughout the rest of the notebook.
uid = 123
trell_id = 567
creator_id = 789

# Attributes of the sample user.
city = 'delhi'
gender = 1
age_group = '18_24'
country = 'IN'
os = 'android'
city_state = 'dehradun_uttarakhand'
mobile_brand = 'samsung'

# Seven attributes on the user, plus the creator of the trell.
set_profiles(c, [
    dict(otype='user', oid=uid, key='city', value=city),
    dict(otype='user', oid=uid, key='gender', value=gender),
    dict(otype='user', oid=uid, key='age_group', value=age_group),
    dict(otype='user', oid=uid, key='country', value=country),
    dict(otype='user', oid=uid, key='os', value=os),
    dict(otype='user', oid=uid, key='city_state', value=city_state),
    dict(otype='user', oid=uid, key='mobile_brand', value=mobile_brand),
    dict(otype='trell', oid=trell_id, key='creatorId', value=creator_id),
])


# Now log a few actions. As these actions are logged, they will be picked up by the
# stored aggregates in near realtime (with a lag of maybe a couple of minutes) and processed.

# This action denotes that the user with id `uid` watched the trell for 31 seconds.
# Note that any action has actor_type, actor_id, target_type, target_id, action_type, and timestamp,
# but we can log any additional data with metadata, which can be arbitrary JSON-able data.
c.log(actor_type='user', actor_id=uid, target_type='trail', target_id=trell_id, action_type='view',
      timestamp=int(time.time()), request_id=1, metadata={'watch_time': 31})

# Another view action, but this time the watch time is < 10 sec.
c.log(actor_type='user', actor_id=uid, target_type='trail', target_id=trell_id, action_type='view',
      timestamp=int(time.time()), request_id=1, metadata={'watch_time': 7})

# Now a follow action: `uid` follows `creator_id`. Metadata is passed here as
# well, although the follow aggregates in this notebook only filter on
# action_type and group by target_id, so they never read watch_time.
# NOTE(review): an earlier version of this comment said metadata is optional
# and omitted here -- confirm whether c.log() accepts a call without it.
c.log(actor_type='user', actor_id=uid, target_type='user', target_id=creator_id, action_type='follow',
      timestamp=int(time.time()), request_id=1, metadata = {'watch_time': 25})



In [None]:
# Read back the 28-day follow count. The follow aggregates are grouped by the
# follow action's target_id, which for the follow logged in this notebook is
# creator_id (the followed user). trell_id never appears as a follow target,
# so querying with it could not match any group.
f1 = c.aggregate_value('user_num_follows_28day_v2', creator_id)
print(f1)
