# Rice Simple Feature Store Demo

This is a demo of the rice-sfs system. In this notebook, we will see how to use the system to manage features. Specifically, we will ingest some features into the system and retrieve them later. Along the way, you will see the workflow of a typical user of the system.

## Setup

This section helps you set up the environment needed to run the notebook.

All computation and storage happen on your local machine.

### Local DynamoDB

You need a local DynamoDB server running on your machine.

To run DynamoDB locally, go to the [AWS DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.DownloadingAndRunning.html), download the archive, and put the extracted `DynamoDBLocal.jar` and `DynamoDBLocal_lib/` in the `dynamodb/` directory.

Then run:

```shell
bash dynamodb/start_dynamo_local.sh
```

Use aws-cli to access DynamoDB:

```shell
aws dynamodb --endpoint-url http://localhost:8000 <command> [args]
```

### Local PostgreSQL Server

You need a local PostgreSQL server running on your machine.

You can download PostgreSQL for your OS [here](https://www.postgresql.org/download/) and start the server in the background.

By default, the PostgreSQL server listens on `localhost:5432`.

After getting PostgreSQL running, you should create a user and a database:

```shell
# create user "rice"
createuser --createdb rice

# create database "sfs"
createdb --username rice --no-password sfs
```

We will use the user `rice` to access the database `sfs`.

### Local Spark

You need a local Spark cluster running on your machine.

You can find instructions on installing and deploying Spark on your local machine [here](https://spark.apache.org/docs/latest/).

After getting Spark running in the background, you can use `spark-shell` to start a Scala shell:

```bash
spark-shell --packages io.delta:delta-core_2.12:1.1.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```

Note that we add the Delta Lake package so that we can later query Delta tables directly.

### Download Dataset

We use the MovieLens 100K Dataset. You can download it [here](https://grouplens.org/datasets/movielens/100k/).

After downloading the dataset, put the extracted `ml-100k/` directory in the `example/data/` directory.
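
Each line of `u.user` describes one user as user id, age, gender, occupation, and zip code, separated by `|`, with no header row. These are the same columns the ingestion job is later told to expect (`id, age, gender, occupation, zip_code`). To sanity-check the download before ingesting, here is a minimal standard-library sketch; the helper `load_users` is hypothetical (not part of rice-sfs), and the relative path assumes the notebook runs from `example/`.

```python
import csv
import os
from typing import Dict, Iterable, List

# Column names in file order, matching spark.sfs.source.batch.cols used later
USER_COLS = ["id", "age", "gender", "occupation", "zip_code"]


def load_users(lines: Iterable[str]) -> List[Dict[str, str]]:
    """Parse headerless, pipe-delimited u.user lines into dicts keyed by USER_COLS."""
    reader = csv.reader(lines, delimiter="|")
    return [dict(zip(USER_COLS, row)) for row in reader if row]


# Assumed location, relative to the example/ directory
u_user_path = os.path.join("data", "ml-100k", "u.user")
if os.path.exists(u_user_path):
    # latin-1 can decode any byte sequence, so reading never fails on encoding
    with open(u_user_path, newline="", encoding="latin-1") as f:
        users = load_users(f)
    print(len(users), "users; first:", users[0])
```

All values stay strings here; converting `age` to an integer is left to whatever consumes the rows.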

### Start the Services

Go to the root directory of the project and run `mvn clean package` to build and package the project.

Go to the `sfs-registry/` directory and run `mvn spring-boot:run` to start the feature registry service. By default, it listens on port `8081`.

Open another terminal, go to the `sfs-serving/` directory, and run `mvn spring-boot:run` to start the feature serving service. By default, it listens on port `8082`.
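
Before running the cells that follow, it can help to confirm that every service is actually listening. Here is a minimal sketch using Python's `socket` module; the ports are the defaults from the setup steps (DynamoDB Local on `8000`, PostgreSQL on `5432`, registry on `8081`, serving on `8082`), and the helpers `is_listening` and `check_services` are hypothetical. A successful TCP connection only shows that something accepts connections on the port, not that it is the right service.

```python
import socket
from typing import Dict

# Default ports used in this demo
SERVICES = {
    "DynamoDB Local": 8000,
    "PostgreSQL": 5432,
    "sfs-registry": 8081,
    "sfs-serving": 8082,
}


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or host unreachable
        return False


def check_services(host: str = "localhost") -> Dict[str, bool]:
    """Map each service name to whether its port accepts connections."""
    return {name: is_listening(host, port) for name, port in SERVICES.items()}


for name, up in check_services().items():
    print(f"{name:15} {'up' if up else 'DOWN'}")
```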

In [None]:
import pandas as pd
import requests
import os

%load_ext autoreload
%autoreload 2

## Feature Registry

We first register an entity, some features, and a feature table in the system.
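
The requests in this notebook create a small data model: an entity (`user`), features that each name a column in the Delta table and an attribute in the DynamoDB table, a feature table that groups features of one entity, and feature table views that select some of a table's features. Here is a minimal sketch of that model as Python dataclasses, assuming the field names of the JSON bodies used in this notebook. The classes and the `validate` check are hypothetical client-side helpers, not the rice-sfs API, and the registry may enforce different rules.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Feature:
    name: str
    description: str
    valueType: str           # e.g. "INT" or "STRING", as in the requests in this notebook
    deltaTableColName: str   # column name in the offline Delta table
    dynamoTableColName: str  # attribute name in the online DynamoDB table


@dataclass
class FeatureTable:
    name: str
    description: str
    entity: str
    features: List[str]      # names of registered features
    dynamoTableName: str
    deltaTablePath: str


@dataclass
class FeatureTableView:
    name: str
    featureTableName: str
    featureNames: List[str]  # subset of the table's features


def validate(features: Dict[str, Feature],
             tables: Dict[str, FeatureTable],
             views: List[FeatureTableView]) -> List[str]:
    """Return consistency problems (empty list if none).

    Hypothetical rules: a table may only list registered features, and a
    view may only select features contained in its table.
    """
    problems = []
    for table in tables.values():
        for f in table.features:
            if f not in features:
                problems.append(f"table {table.name}: unknown feature {f}")
    for view in views:
        table = tables.get(view.featureTableName)
        if table is None:
            problems.append(f"view {view.name}: unknown table {view.featureTableName}")
            continue
        for f in view.featureNames:
            if f not in table.features:
                problems.append(f"view {view.name}: {f} not in table {table.name}")
    return problems
```

Because the field names mirror the JSON bodies, `dataclasses.asdict(obj)` could be passed as the `json=` argument of `requests.post`.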

In [None]:
registry_endpoint = "http://localhost:8081/api/v1"
serving_endpoint = "http://localhost:8082/api/v1"

In [None]:
# register the user entity
requests.post(registry_endpoint + "/entities", json={"name":"user","description":"user in the system"}).json()

In [None]:
# register some features; each one names its column in the Delta table and in the DynamoDB table
features = [
    {"name":"user_age","description":"user age","valueType":"INT","deltaTableColName":"age","dynamoTableColName":"age"},
    {"name":"user_gender","description":"user gender","valueType":"STRING","deltaTableColName":"gender","dynamoTableColName":"gender"},
    {"name":"user_occupation","description":"user occupation","valueType":"STRING","deltaTableColName":"occupation","dynamoTableColName":"occupation"},
]
# print every response; a notebook cell only displays the value of its last expression
for feature in features:
    print(requests.post(registry_endpoint + "/features", json=feature).json())

In [None]:
# register a feature table
path = os.getcwd()
pj_root = os.path.join(path, os.pardir)
delta_root = os.path.join(pj_root, "delta/")
body = {
    "name":"user_feat",
    "description":"user feature",
    "entity":"user",
    "features":["user_age","user_gender"],
    "dynamoTableName":"user_feat",
    "deltaTablePath":os.path.abspath(os.path.join(delta_root, "user_feat/"))
}
requests.post(registry_endpoint + "/featureTables", json=body).json()

## Feature Ingestion

We now ingest some user features stored in the `ml-100k/u.user` file by submitting the batch ingestion job to a local Spark.

In [None]:
!spark-submit \
  --master "local[4]" \
  --deploy-mode client \
  --class edu.rice.sfs.ingest.BatchIngestApp \
  --name BatchIngestApp \
  --packages com.audienceproject:spark-dynamodb_2.12:1.1.2,io.delta:delta-core_2.12:1.1.0 \
  --conf "spark.driver.extraJavaOptions=-Daws.dynamodb.endpoint=http://localhost:8000/" \
  --conf "spark.sfs.source.batch.options=delimiter:|" \
  --conf "spark.sfs.source.batch.format=csv" \
  --conf "spark.sfs.source.batch.path=$(pwd)/data/ml-100k/u.user" \
  --conf "spark.sfs.source.batch.cols=id, age, gender, occupation, zip_code" \
  --conf "spark.sfs.source.batch.feature-table=user_feat" \
  --conf "spark.sfs.source.batch.entity-col=id" \
  --conf "spark.sfs.source.batch.feature-col-map=user_age:age,user_gender:gender" \
  --driver-java-options "-Daws.dynamodb.endpoint=http://localhost:8000/" \
  --driver-memory 1024M \
  --driver-cores 1 \
  --executor-memory 1G \
  $(dirname `pwd`)/sfs-ingest/target/sfs-ingest-0.0.1-SNAPSHOT-jar-with-dependencies.jar
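
The `spark.sfs.source.batch.*` settings describe how file columns become features: `cols` names the columns of the headerless file, `entity-col` picks the entity key, and `feature-col-map` maps each feature to a source column (here only `user_age` and `user_gender` are mapped, so `occupation` and `zip_code` are not ingested into `user_feat`). Here is a minimal pure-Python sketch of that mapping for a single row, assuming comma-separated `feature:column` pairs as in the command above. It only illustrates the configuration; it is not how `BatchIngestApp` is implemented, and `parse_col_map` and `to_feature_record` are hypothetical names.

```python
from typing import Dict, List, Tuple


def parse_col_map(spec: str) -> Dict[str, str]:
    """Parse "feat_a:col_a,feat_b:col_b" into {"feat_a": "col_a", ...}."""
    pairs = (item.split(":", 1) for item in spec.split(",") if item.strip())
    return {feat.strip(): col.strip() for feat, col in pairs}


def to_feature_record(line: str, cols: str, entity_col: str,
                      col_map: str, delimiter: str = "|") -> Tuple[str, Dict[str, str]]:
    """Turn one delimited line into (entity key, {feature name: raw value})."""
    names: List[str] = [c.strip() for c in cols.split(",")]
    row = dict(zip(names, line.rstrip("\n").split(delimiter)))
    features = {feat: row[col] for feat, col in parse_col_map(col_map).items()}
    return row[entity_col], features


# Values copied from the spark-submit command above
COLS = "id, age, gender, occupation, zip_code"
COL_MAP = "user_age:age,user_gender:gender"
```

In this sketch every value stays a string; since `user_age` is registered with `valueType` `INT`, the real job presumably converts types, which is not shown here.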

You can now use `spark-shell` to check that the features were actually ingested.

## Feature Serving

We now query the serving service to get online features from DynamoDB.

In [None]:
# first register a feature table view.
body = {
    "name":"user_feat_view",
    "featureTableName":"user_feat",
    "featureNames":["user_age"] # only care about the ages
}
requests.post(registry_endpoint + "/featureTableViews", json=body).json()
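
Conceptually, a view tells the serving service which of a feature table's features to return for an entity. Here is a minimal sketch of that projection, assuming the online record for one entity is a mapping from DynamoDB attribute names (each feature's `dynamoTableColName`) to values. `project_view` and the record shape are hypothetical, and the real serving service's response format may differ.

```python
from typing import Dict, List, Optional


def project_view(record: Dict[str, object],
                 feature_names: List[str],
                 dynamo_col: Dict[str, str]) -> Dict[str, Optional[object]]:
    """Select the view's features from one online record.

    record:        attributes of one DynamoDB item, keyed by dynamoTableColName
    feature_names: the view's featureNames
    dynamo_col:    feature name -> dynamoTableColName, as registered
    Attributes missing from the record come back as None.
    """
    return {name: record.get(dynamo_col[name]) for name in feature_names}


# Feature-to-attribute mapping as registered earlier in this notebook
DYNAMO_COL = {"user_age": "age", "user_gender": "gender", "user_occupation": "occupation"}
```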

In [None]:
requests.get(serving_endpoint + "/getFeature", params={"featureTableView": "user_feat_view", "entity": "42"}).json()
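
`requests` encodes `params` as a URL query string, so the call above is a plain HTTP GET. Here is a minimal standard-library sketch that builds the same URL, which can be handy for trying the endpoint with `curl` or a browser; `get_feature_url` is a hypothetical helper.

```python
from urllib.parse import urlencode


def get_feature_url(serving_endpoint: str, view: str, entity: str) -> str:
    """Build the getFeature URL that requests.get(..., params=...) sends."""
    query = urlencode({"featureTableView": view, "entity": entity})
    return f"{serving_endpoint}/getFeature?{query}"


print(get_feature_url("http://localhost:8082/api/v1", "user_feat_view", "42"))
# → http://localhost:8082/api/v1/getFeature?featureTableView=user_feat_view&entity=42
```

`urlencode` escapes special characters, so entity keys containing spaces or `&` are still sent safely.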

In [None]:
# if we care about more features, we can use a different view
body = {
    "name":"user_feat_view_2",
    "featureTableName":"user_feat",
    "featureNames":["user_age", "user_gender"] # ages and genders
}
requests.post(registry_endpoint + "/featureTableViews", json=body).json()

In [None]:
requests.get(serving_endpoint + "/getFeature", params={"featureTableView": "user_feat_view_2", "entity": "42"}).json()