In [2]:
!pip install feast[aws]

Collecting feast[aws]
  Downloading feast-0.19.3-py3-none-any.whl (289 kB)
[K     |████████████████████████████████| 289 kB 33.3 MB/s 
Collecting dask<2022.02.0,>=2021.*
  Downloading dask-2022.1.1-py3-none-any.whl (1.1 MB)
[K     |████████████████████████████████| 1.1 MB 50.0 MB/s 
Collecting mmh3
  Downloading mmh3-3.0.0-cp37-cp37m-manylinux2010_x86_64.whl (50 kB)
[K     |████████████████████████████████| 50 kB 6.2 MB/s 
[?25hCollecting uvicorn[standard]>=0.14.0
  Downloading uvicorn-0.17.6-py3-none-any.whl (53 kB)
[K     |████████████████████████████████| 53 kB 1.8 MB/s 
Collecting fastavro>=1.1.0
  Downloading fastavro-1.4.10-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.3 MB)
[K     |████████████████████████████████| 2.3 MB 59.8 MB/s 
Collecting proto-plus<1.19.7
  Downloading proto_plus-1.19.6-py3-none-any.whl (45 kB)
[K     |████████████████████████████████| 45 kB 3.1 MB/s 
[?25hCollecting PyYAML>=5.4.*
  Downloading PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86

In [None]:
!feast init -t aws customer_segmentation

Feast is an open source project that collects anonymized error reporting and usage statistics. To opt out or learn more see https://docs.feast.dev/reference/usage
  from numpy.dual import register_func
  supported_dtypes = [np.typeDict[x] for x in supported_dtypes]
AWS Region (e.g. us-west-2): us-east-1
Redshift Cluster ID: feast-demo-mar-2022
Redshift Database Name: dev
Redshift User Name: awsuser
Redshift S3 Staging Location (s3://*): s3://feast-demo-mar-2022/staging
Redshift IAM Role for S3 (arn:aws:iam::*:role/*): arn:aws:iam::<account_numer>:role/feast-demo-mar-2022-spectrum-role
Should I upload example data to Redshift (overwriting 'feast_driver_hourly_stats' table)? [Y/n]: n

Creating a new Feast repository in [1m[32m/content/customer_segmentation[0m.



In [None]:
!ls -R /content/customer_segmentation/

/content/customer_segmentation/:
driver_repo.py	feature_store.yaml  __init__.py  test.py


In [None]:
%cd customer_segmentation/
!rm -rf driver_repo.py test.py

/content/customer_segmentation


In [None]:
!cat /content/customer_segmentation/feature_store.yaml

project: customer_segmentation
registry: data/registry.db
provider: aws
online_store:
  type: dynamodb
  region: us-east-1
offline_store:
  type: redshift
  cluster_id: feast-demo-mar-2022
  region: us-east-1
  database: dev
  user: awsuser
  s3_staging_location: s3://feast-demo-mar-2022/staging
  iam_role: arn:aws:iam::<account_numer>:role/feast-demo-mar-2022-spectrum-role


In [None]:
!feast apply

  from numpy.dual import register_func
  supported_dtypes = [np.typeDict[x] for x in supported_dtypes]
[1m[94mNo changes to registry


In [None]:
from feast import FeatureStore
store = FeatureStore(repo_path=".")

  from numpy.dual import register_func
  supported_dtypes = [np.typeDict[x] for x in supported_dtypes]


In [None]:
print(f"List of entities: {store.list_entities()}")
print(f"List of FeatureViews: {store.list_feature_views()}")

List of entities: []
List of FeatureViews: []


## **Apply Feature definition after creating all the resources and mapping external database**

In [28]:
import os
os.environ["AWS_ACCESS_KEY_ID"] = "<aws_key>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<aws_secret>"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

In [None]:
%cd customer_segmentation/
!feast apply

[Errno 2] No such file or directory: 'customer_segmentation/'
/content/customer_segmentation
  from numpy.dual import register_func
  supported_dtypes = [np.typeDict[x] for x in supported_dtypes]
03/14/2022 03:01:38 AM INFO:Found credentials in environment variables.
Created data source [1m[32m[0m
Created entity [1m[32mcustomer[0m
Created feature view [1m[32mcustomer_rfm_features[0m

Deploying infrastructure for [1m[32mcustomer_rfm_features[0m


## **After Feast Apply**

In [6]:
%cd customer_segmentation/
from feast import FeatureStore
store = FeatureStore(repo_path=".")

/content/customer_segmentation


  from numpy.dual import register_func
  supported_dtypes = [np.typeDict[x] for x in supported_dtypes]


In [9]:
print("-----------------------Entity----------------------------")
for entity in store.list_entities():
  print(f"entity: {entity}")
print("--------------------Feature Views----------------------------")
for feature_view in store.list_feature_views():
  print(f"List of FeatureViews: {feature_view}")

-----------------------Entity----------------------------
entity: {
  "spec": {
    "name": "customer",
    "valueType": "STRING",
    "description": "Id of the customer",
    "joinKey": "CustomerID"
  },
  "meta": {
    "createdTimestamp": "2022-03-14T23:41:33.070471Z",
    "lastUpdatedTimestamp": "2022-03-14T23:41:33.070471Z"
  }
}
--------------------Feature Views----------------------------
List of FeatureViews: {
  "spec": {
    "name": "customer_rfm_features",
    "entities": [
      "customer"
    ],
    "features": [
      {
        "name": "Recency",
        "valueType": "INT32"
      },
      {
        "name": "Frequency",
        "valueType": "INT32"
      },
      {
        "name": "MonetaryValue",
        "valueType": "DOUBLE"
      },
      {
        "name": "R",
        "valueType": "INT32"
      },
      {
        "name": "F",
        "valueType": "INT32"
      },
      {
        "name": "M",
        "valueType": "INT32"
      },
      {
        "name": "RFMScore",
    



## **Query Data**

In [23]:
import pandas as pd
from datetime import datetime, timedelta
entity_df = pd.DataFrame.from_dict(
    {
        "CustomerID": ["12747.0", "12748.0", "12749.0"],
        "event_timestamp": [datetime.now()]*3
    }
)
entity_df.head()

Unnamed: 0,CustomerID,event_timestamp
0,12747.0,2022-03-15 00:58:45.594038
1,12748.0,2022-03-15 00:58:45.594038
2,12749.0,2022-03-15 00:58:45.594038


In [24]:
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
              "customer_rfm_features:recency", 
              "customer_rfm_features:frequency", 
              "customer_rfm_features:MonetaryValue", 
              "customer_rfm_features:R", 
              "customer_rfm_features:F", 
              "customer_rfm_features:M"]
    )



In [25]:
df = job.to_df()

In [27]:
df.head()

Unnamed: 0,customerid,event_timestamp,recency,frequency,monetaryvalue,r,f,m
0,12747.0,2022-03-15 00:58:45.594038,7,35,1082.09,3,2,3
1,12749.0,2022-03-15 00:58:45.594038,8,54,782.1,3,3,3
2,12748.0,2022-03-15 00:58:45.594038,1,582,4336.73,3,3,3
