# Introduction 
This tutorial walks through how to set up a featurization pipeline in `ralf`. We'll set up a pipeline that computes user features from a data stream of user ratings, then query those features to predict the rating a user will give a movie.

To do so, we'll do the following: 
1. Create feature tables from the MovieLens dataset, which are incrementally maintained by `ralf`
2. Create a ralf client which queries the feature tables 
3. Implement load shedding policies to reduce feature computation cost

In [1]:
!pip uninstall -y ralf && pip install git+https://github.com/feature-store/ralf.git@api-for-tutorial

Found existing installation: ralf 0.0.1
Uninstalling ralf-0.0.1:
  Successfully uninstalled ralf-0.0.1
Collecting git+https://github.com/feature-store/ralf.git@api-for-tutorial
  Cloning https://github.com/feature-store/ralf.git (to revision api-for-tutorial) to /tmp/pip-req-build-0pftqwst
  Running command git clone -q https://github.com/feature-store/ralf.git /tmp/pip-req-build-0pftqwst
  Running command git checkout -b api-for-tutorial --track origin/api-for-tutorial
  Switched to a new branch 'api-for-tutorial'
  Branch 'api-for-tutorial' set up to track remote branch 'api-for-tutorial' from 'origin'.
Building wheels for collected packages: ralf
  Building wheel for ralf (setup.py) ... done
  Created wheel for ralf: filename=ralf-0.0.1-py3-none-any.whl size=16461 sha256=a7085b1cd5ef0f33714df533f0574a43e453904e886e82d639535ae695d7ff30
  Stored in directory: /tmp/pip-ephem-wheel-cache-q1b8wcqn/wheels/d1/cd/60/38fb8c5337bf668a95de0d6476d458cb419246dfc6e11671d2
Successfully

# Creating a featurization pipeline 
We create an instance of ralf so that we can start creating tables.

In [2]:
from ralf import Ralf

In [3]:
ralf_server = Ralf()

Storing operators metrics at /tmp/ralf/1636007707_metrics


### Creating Source Tables
Source tables define the raw data sources that are run through ralf to become features. `ralf` lets you create both static batch (e.g. from a CSV) and dynamic streaming sources (e.g. from Kafka). 

To define a source, we implement a `SourceOperator`. 

In [4]:
from ralf.operators.source import SourceOperator
from ralf import Record
import random
import time

In [5]:
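class RatingsSource(SourceOperator):
    def __init__(self, schema, kafka_topic):
        # The topic is stored but unused: this source simulates a stream.
        self.topic = kafka_topic

        super().__init__(schema)

    def next(self):
        # Emit one random rating roughly every 10 ms.
        time.sleep(0.01)
        user_id = random.randint(1, 10)
        movie_id = random.randint(100, 200)
        # Cast to float to match the rating type declared in the schema.
        rating = float(random.randint(1, 5))
        return [Record(user=str(user_id), movie=movie_id, rating=rating)]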

We specify a schema using ralf's `Schema` object. 

In [6]:
from ralf import Schema

source_schema = Schema(
    primary_key="user", columns={"user": str, "movie": int, "rating": float}
)

We can now add the source to our ralf instance. 

In [7]:
source = ralf_server.create_source(RatingsSource, args=(source_schema, "ratings_topic"))

### Creating Feature Tables 
Now that we have data streaming into ralf through the source table, we can define derived feature tables from the source table. 

Feature tables follow an API similar to pandas dataframes. We define a feature table in terms of one or two parent tables and an operator that specifies how to transform the parent data.


For example, we can calculate the average rating for each user with an `AverageRating` operator: 

In [8]:
from collections import defaultdict
import numpy as np

from ralf import Operator, Record

In [9]:
class AverageRating(Operator):
    def __init__(self, schema):
        self.user_ratings = defaultdict(list)

        super().__init__(schema)

    def on_record(self, record: Record):
        self.user_ratings[record.user].append(record.rating)
        ratings = np.array(self.user_ratings[record.user])
        output_record = Record(user=record.user, average=ratings.mean())
        return output_record  

The `AverageRating` operator can be used to define a feature table containing the average rating for each user. 

In [10]:
average_rating_schema = Schema(
    primary_key="user", columns={"user": str, "average": float}
)
average_rating = source.map(AverageRating, args=(average_rating_schema,))
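
The `AverageRating` operator above keeps every rating for each user and recomputes the mean on each record. As a sketch of an alternative, assuming only the mean is needed, the same value can be maintained from a running count and sum. The `RunningAverage` class below is a hypothetical stand-alone helper, not part of ralf; it only illustrates the update that could go inside `on_record`.

```python
from collections import defaultdict


class RunningAverage:
    """Hypothetical helper: per-key mean using O(1) memory per key."""

    def __init__(self):
        self.count = defaultdict(int)
        self.total = defaultdict(float)

    def update(self, key, value):
        # Fold the new value into the running sum and return the new mean.
        self.count[key] += 1
        self.total[key] += value
        return self.total[key] / self.count[key]


averages = RunningAverage()
for user, rating in [("1", 4), ("1", 2), ("2", 5), ("1", 3)]:
    latest = averages.update(user, rating)
    print(user, latest)
```

The trade-off is that the raw ratings are no longer available, so this only fits features that can be updated from a small summary.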

### Adding Processing Policies
In many cases, a sub-sample of the data is enough to compute the features we need. We can add a simple load shedding policy to the `average_rating` table so that only about half of the incoming records are processed.

In [11]:
from ralf import LoadSheddingPolicy, Record

In [12]:
class SampleHalf(LoadSheddingPolicy):
    
    def process(self, candidate_record: Record, current_record: Record) -> bool:
        return random.random() < 0.5

average_rating.add_load_shedding(SampleHalf)
average_rating.as_queryable("average")

Table(AverageRating)
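
To get a feel for what a policy like `SampleHalf` does, the sketch below simulates it outside ralf. `keep_half` mirrors the body of `process`, and `simulate` is a hypothetical helper that counts how many records survive. It assumes, as the `bool` return type suggests, that `True` means the record is processed and `False` means it is shed.

```python
import random


def keep_half(rng):
    # Same decision rule as SampleHalf.process: keep with probability 0.5.
    return rng.random() < 0.5


def simulate(num_records, seed=0):
    """Hypothetical helper: count records that survive load shedding."""
    rng = random.Random(seed)
    return sum(keep_half(rng) for _ in range(num_records))


kept = simulate(10_000)
print(f"kept {kept} of 10000 records ({kept / 10_000:.1%})")
```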

In [13]:
ralf_server.run()

{'Table(AverageRating)': {'actor_pool_size': 1,
                          'actor_state': [{'cache_size': 0,
                                           'lazy': False,
                                           'process': {'cpu_percent': 2.3,
                                                       'memory_mb': 102.5546875},
                                           'queue_size': {},
                                           'table': {'num_deletes': 0,
                                                     'num_records': 0,
                                                     'num_updates': 0},
                                           'thread_pool_size': 4}],
                          'children': [],
                          'is_source': False,
                          'operator_args': ['<ralf.state.Schema object at '
                                            '0x7f7d7aca9d50>'],
                          'operator_kwargs': {},
                          'operator_name': 'Table(Average

[2m[36m(pid=1460)[0m 2021-11-04 06:35:22,725	INFO checkpoint_path.py:15 -- Using RayInternalKVStore for controller checkpoint and recovery.
[2m[36m(pid=1460)[0m 2021-11-04 06:35:22,730	INFO http_state.py:101 -- Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:PDeMtb:SERVE_PROXY_ACTOR-node:172.28.0.2-0' on node 'node:172.28.0.2-0' listening on '127.0.0.1:8000'
2021-11-04 06:35:23,605	INFO api.py:441 -- Started Serve instance in namespace 'a4268eab-789d-4b73-a75e-c280d6828003'.
[2m[36m(pid=1489)[0m INFO:     Started server process [1489]
2021-11-04 06:35:23,648	INFO api.py:240 -- Updating deployment 'QueryableServer'. component=serve deployment=QueryableServer
[2m[36m(pid=1460)[0m 2021-11-04 06:35:23,688	INFO backend_state.py:910 -- Adding 1 replicas to deployment 'QueryableServer'. component=serve deployment=QueryableServer
2021-11-04 06:35:24,655	INFO api.py:248 -- Deployment 'QueryableServer' is ready at `http://127.0.0.1:8000/`. component=serve deployment=QueryableSe

## Creating a `ralf` Client 
Now that we have a simple pipeline, we can query the ralf server for features. 

In [14]:
from ralf import RalfClient
ralf_client = RalfClient()

In [15]:
ralf_client.point_query(table_name="average", key=1)

querying http://localhost:8000/table/average/1...


{'average': 3.2222222222222223, 'user': '1'}

In [16]:
ralf_client.bulk_query(table_name="average")

querying http://localhost:8000/table/average...


[{'average': 3.206896551724138, 'user': '1'},
 {'average': 3.0, 'user': '2'},
 {'average': 3.1666666666666665, 'user': '8'},
 {'average': 3.3125, 'user': '6'},
 {'average': 3.225806451612903, 'user': '10'},
 {'average': 3.2962962962962963, 'user': '7'},
 {'average': 3.0, 'user': '4'},
 {'average': 3.081081081081081, 'user': '9'},
 {'average': 2.78125, 'user': '3'},
 {'average': 3.0, 'user': '5'}]
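
The log lines printed by the client suggest that each query is an HTTP request against the queryable server: `/table/<name>/<key>` for a point query and `/table/<name>` for a bulk query. Assuming that layout holds, the URLs can be built as in the sketch below. `feature_url` is a hypothetical helper, not a ralf function, and the default base URL is taken from the log output above.

```python
from urllib.parse import quote


def feature_url(table_name, key=None, base="http://localhost:8000"):
    """Hypothetical helper: build the URL seen in the client's log output."""
    url = f"{base}/table/{quote(table_name, safe='')}"
    if key is not None:
        # Assumed: the key is a path segment, so the integer 1 becomes '1'.
        url += f"/{quote(str(key), safe='')}"
    return url


print(feature_url("average", key=1))
print(feature_url("average"))
```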

# Advanced: Maintaining user vectors 
Now that we've set up a simple feature table and run some queries, we can create a more realistic feature table: a user vector representing each user's movie tastes.

In this example, we'll assume we already have pre-computed movie vectors which are held constant. User vectors are updated over time as new rating information is received.

In [17]:
import sys
!{sys.executable} -m pip install pandas



In [18]:
!wget https://raw.githubusercontent.com/feature-store/risecamp-2021/main/user_active_time.csv
!wget https://raw.githubusercontent.com/feature-store/risecamp-2021/main/movie_vectors.csv

--2021-11-04 06:35:38--  https://raw.githubusercontent.com/feature-store/risecamp-2021/main/user_active_time.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 80 [text/plain]
Saving to: ‘user_active_time.csv.1’


2021-11-04 06:35:38 (3.65 MB/s) - ‘user_active_time.csv.1’ saved [80/80]

--2021-11-04 06:35:38--  https://raw.githubusercontent.com/feature-store/risecamp-2021/main/movie_vectors.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5201 (5.1K) [text/plain]
Saving to: ‘movie_vectors.csv.1’


2021-11-04 06:35:38 (38.2 MB/

In [19]:
import pandas as pd

class UserVector(Operator):
    
    def __init__(self, schema, movie_vectors_file): 
        self.user_ratings = {}
        self.movie_vectors = pd.read_csv(movie_vectors_file)
        
        super().__init__(schema)
    
    def on_record(self, record: Record):
        # TODO: add ALS thing 
        output_record = Record(user=record.user, user_vector=np.array([1, 2, 3]))
        return output_record  
    
user_schema = Schema(
    primary_key="user", columns={"user": str, "user_vector": np.array}
)
user_vectors = source.map(UserVector, args=(user_schema, "movie_vectors.csv"))
user_vectors.as_queryable("user_vectors")

Table(UserVector)

In [20]:
ralf_server.run()

2021-11-04 06:35:45,058	INFO api.py:393 -- Connecting to existing Serve instance in namespace 'a4268eab-789d-4b73-a75e-c280d6828003'.
2021-11-04 06:35:45,128	INFO api.py:240 -- Updating deployment 'QueryableServer'. component=serve deployment=QueryableServer


{'Table(AverageRating)': {'actor_pool_size': 1,
                          'actor_state': [{'cache_size': 0,
                                           'lazy': False,
                                           'process': {'cpu_percent': 19.3,
                                                       'memory_mb': 105.7421875},
                                           'queue_size': {'1': 0,
                                                          '10': 0,
                                                          '2': 0,
                                                          '3': 0,
                                                          '4': 0,
                                                          '5': 0,
                                                          '6': 0,
                                                          '7': 0,
                                                          '8': 0,
                                                          '9': 0},
              

[2m[36m(pid=1460)[0m 2021-11-04 06:35:45,204	INFO backend_state.py:872 -- Stopping 1 replicas of deployment 'QueryableServer' with outdated versions. component=serve deployment=QueryableServer
[2m[36m(pid=1517)[0m 2021-11-04 06:35:47,207	ERROR replica.py:372 -- Exception during graceful shutdown of replica: This event loop is already running component=serve deployment=QueryableServer replica=QueryableServer#ZxCHRe
[2m[36m(pid=1517)[0m Traceback (most recent call last):
[2m[36m(pid=1517)[0m   File "/usr/local/lib/python3.7/dist-packages/ray/serve/replica.py", line 369, in prepare_for_shutdown
[2m[36m(pid=1517)[0m     self.callable.__del__()
[2m[36m(pid=1517)[0m   File "/usr/local/lib/python3.7/dist-packages/ray/serve/api.py", line 591, in __del__
[2m[36m(pid=1517)[0m     self._serve_asgi_lifespan.shutdown())
[2m[36m(pid=1517)[0m   File "/usr/lib/python3.7/asyncio/base_events.py", line 563, in run_until_complete
[2m[36m(pid=1517)[0m     self._check_runnung()
[

In [21]:
ralf_client.bulk_query(table_name="user_vectors")

querying http://localhost:8000/table/user_vectors...


[{'user': '3', 'user_vector': [1, 2, 3]},
 {'user': '9', 'user_vector': [1, 2, 3]},
 {'user': '7', 'user_vector': [1, 2, 3]},
 {'user': '8', 'user_vector': [1, 2, 3]},
 {'user': '4', 'user_vector': [1, 2, 3]},
 {'user': '2', 'user_vector': [1, 2, 3]},
 {'user': '5', 'user_vector': [1, 2, 3]},
 {'user': '10', 'user_vector': [1, 2, 3]},
 {'user': '1', 'user_vector': [1, 2, 3]},
 {'user': '6', 'user_vector': [1, 2, 3]}]
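
The operator above returns the placeholder vector `[1, 2, 3]` for every user. One way to fill in the update, sketched here under the assumption that movie vectors stay fixed, is a single stochastic gradient step on the squared rating error for each incoming rating. This is plain matrix-factorization SGD rather than ALS, and `sgd_update`, the learning rate, the regularization strength, and the example movie vector are all illustrative choices, not values from ralf or the tutorial data.

```python
def sgd_update(user_vector, movie_vector, rating, lr=0.05, reg=0.01):
    """Illustrative helper: one SGD step on (rating - u.v)^2 + reg*|u|^2."""
    prediction = sum(u * v for u, v in zip(user_vector, movie_vector))
    error = rating - prediction
    # Step against the gradient (the factor of 2 is folded into lr).
    return [
        u + lr * (error * v - reg * u)
        for u, v in zip(user_vector, movie_vector)
    ]


movie_vector = [0.5, 1.0, -0.5]  # made-up vector for one movie
user_vector = [0.0, 0.0, 0.0]    # a new user starts at the origin
for _ in range(200):
    user_vector = sgd_update(user_vector, movie_vector, rating=4.0)

predicted = sum(u * v for u, v in zip(user_vector, movie_vector))
print(round(predicted, 2))
```

Inside `UserVector.on_record`, a step like this would be applied to the stored vector for `record.user` using the movie vector looked up for `record.movie`.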

## Prioritizing Active Users 
Ralf allows for key-level prioritization policies. Say that we want to prioritize computing updates to user vectors for users who are especially active. We can use activity data to implement a prioritized lottery scheduling policy.

In [22]:
user_activity = pd.read_csv("user_active_time.csv")
user_activity

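| Unnamed: 0.1 | Unnamed: 0 | user_id | activity |
|---|---|---|---|
| 0 | 0 | 1 | 0 |
| 1 | 1 | 2 | 0 |
| 2 | 2 | 3 | 0 |
| 3 | 3 | 4 | 0 |
| 4 | 4 | 5 | 0 |
| 5 | 5 | 6 | 3 |
| 6 | 6 | 7 | 13 |
| 7 | 7 | 8 | 1 |
| 8 | 8 | 9 | 5 |
| 9 | 9 | 10 | 9 |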


For example, we can keep a user's records with a probability that grows with how active the user is, so that updates for less active users are shed more often.

In [23]:
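class SampleActiveUsers(LoadSheddingPolicy):

    def __init__(self, user_activity_csv):
        user_activity = pd.read_csv(user_activity_csv)
        activity = user_activity.set_index("user_id")["activity"]
        # Scale activity counts into keep-probabilities in [0, 1] (one possible
        # normalization). Keys are strings because the source emits string IDs.
        max_activity = max(activity.max(), 1)
        self.weights = {str(u): a / max_activity for u, a in activity.items()}

    def process(self, candidate_record: Record, current_record: Record) -> bool:
        # Keep a record with probability proportional to the user's activity.
        return random.random() < self.weights.get(candidate_record.user, 0.0)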

Alternatively, we can create a key prioritization policy which prioritizes keys using lottery scheduling.

In [24]:
from ralf import PrioritizationPolicy
from typing import List

class LotteryScheduling(PrioritizationPolicy): 
    
    def __init__(self, user_activity_csv): 
        user_activity = pd.read_csv(user_activity_csv)
        self.weights = user_activity.set_index("user_id")["activity"].to_dict()
        
    def choose(self, keys: List): 
        # TODO: implement prioritized lottery scheduling
        # (placeholder: pick a key uniformly at random)
        return random.choice(keys)

user_vectors.add_prioritization_policy(LotteryScheduling, "user_active_time.csv")

Table(UserVector)
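
The `choose` method above is left as a TODO and picks keys uniformly. A minimal sketch of prioritized lottery scheduling, assuming each key holds tickets equal to its activity count plus one (so inactive users are not starved), could use `random.choices` from the standard library. The `lottery_choose` function and the one-ticket floor are illustrative assumptions; the activity counts are copied from the `user_activity` output above.

```python
import random
from collections import Counter

# Activity counts from user_active_time.csv, keyed by user ID as a string.
ACTIVITY = {"1": 0, "2": 0, "3": 0, "4": 0, "5": 0,
            "6": 3, "7": 13, "8": 1, "9": 5, "10": 9}


def lottery_choose(keys, activity, rng=random):
    """Illustrative helper: draw one key, weighted by activity + 1 tickets."""
    tickets = [activity.get(key, 0) + 1 for key in keys]
    return rng.choices(keys, weights=tickets, k=1)[0]


rng = random.Random(0)
keys = list(ACTIVITY)
draws = Counter(lottery_choose(keys, ACTIVITY, rng) for _ in range(10_000))
print(draws.most_common(3))
```

With these ticket counts, user 7 holds 14 of 41 tickets, so it should win roughly a third of the draws while every user still has a nonzero chance of being picked.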