### Install required packages

In [None]:
pip install --upgrade pip setuptools wheel

In [None]:
%pip install feast==0.27.0
%pip install feast[cassandra]
%pip install psycopg2-binary
%pip install boto3
!pip install typeguard==2.13.3
%pip install "dask[dataframe]" --upgrade
%pip install pip --upgrade
%pip install pyopenssl --upgrade

In [1]:
import os
import warnings

warnings.filterwarnings('ignore')

### Set environment settings

### Look on data from database sources 

In [2]:
import sqlalchemy
import pandas as pd

def get_sqlalchemy_engine():
    url = "postgresql://postgres:postgres@cassandra-postgresql.feast-db:5432/FEAST_OFFLINE_STORE"
    return sqlalchemy.create_engine(url, client_encoding='utf8', connect_args={'options': '-c search_path={}'.format("public")})

con = get_sqlalchemy_engine()

In [3]:
pd.read_sql_query('select * from credit_history limit 5',con=con)

Unnamed: 0,event_timestamp,dob_ssn,credit_card_due,mortgage_due,student_loan_due,vehicle_loan_due,hard_pulls,missed_payments_2y,missed_payments_1y,missed_payments_6m,bankruptcies,created_timestamp
0,2020-04-26 18:01:04.746575,19530219_5179,8419,91803,22328,15078,0,1,0,0,0,2020-04-26 18:01:04.746575
1,2020-04-26 18:01:04.746575,19781116_7723,2944,741165,2515,28605,0,3,3,1,0,2020-04-26 18:01:04.746575
2,2020-04-26 18:01:04.746575,19931128_5771,833,976522,33000,21733,9,7,0,0,0,2020-04-26 18:01:04.746575
3,2020-04-26 18:01:04.746575,19500806_6783,5936,1553523,48955,26219,1,0,0,0,0,2020-04-26 18:01:04.746575
4,2020-04-26 18:01:04.746575,19620322_7692,1575,1067381,9501,15814,1,1,0,0,0,2020-04-26 18:01:04.746575


In [4]:
pd.read_sql_query('select * from zipcode limit 5',con=con)

Unnamed: 0,zipcode,city,state,location_type,tax_returns_filed,population,total_wages,event_timestamp,created_timestamp
0,7675,WESTWOOD,NJ,PRIMARY,13245,24083,1089095041,2017-01-01 12:00:00+00:00,2017-01-01 12:00:00+00:00
1,7677,WOODCLIFF LAKE,NJ,PRIMARY,2945,5471,325436960,2017-01-01 12:00:00+00:00,2017-01-01 12:00:00+00:00
2,7885,WHARTON,NJ,PRIMARY,5273,8999,240827990,2017-01-01 12:00:00+00:00,2017-01-01 12:00:00+00:00
3,7981,WHIPPANY,NJ,PRIMARY,4585,8057,292096795,2017-01-01 12:00:00+00:00,2017-01-01 12:00:00+00:00
4,8889,WHITEHOUSE STATION,NJ,PRIMARY,4691,8570,401312434,2017-01-01 12:00:00+00:00,2017-01-01 12:00:00+00:00


In [5]:
pd.read_sql_query('select * from income_history limit 5',con=con)

Unnamed: 0,dob_ssn,date,income,event_timestamp
0,19530219_5179,2020-01-01,19252,2023-02-12 21:23:14.347669
1,19520816_8737,2020-01-01,10351,2023-02-12 21:23:14.347669
2,19860413_2537,2020-01-01,17100,2023-02-12 21:23:14.347669
3,19760701_8090,2020-01-01,13424,2023-02-12 21:23:14.347669
4,19830125_8297,2020-01-01,19240,2023-02-12 21:23:14.347669


### Create FeatureView based on the clients credit history data

![image.png](attachment:99be1c8b-3294-4181-87b4-f917a4510a29.png)

In [6]:
from datetime import timedelta

from feast import (Entity, Field, FeatureView, ValueType, FeatureService)
from feast.types import Int64, String, Float64
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource,
)

zipcode = Entity(name="zipcode", value_type=ValueType.INT64)

zipcode_source = PostgreSQLSource(
    name="zipcode",
    query="SELECT * FROM zipcode",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

zipcode_features = FeatureView(
    name="zipcode_features",
    entities=[zipcode],
    ttl=timedelta(days=3650),
    schema=[
        Field(name="city", dtype=String),
        Field(name="state", dtype=String),
        Field(name="location_type", dtype=String),
        Field(name="tax_returns_filed", dtype=Int64),
        Field(name="population", dtype=Int64),
        Field(name="total_wages", dtype=Int64),
    ],
    source=zipcode_source,
)

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
)

credit_history_source = PostgreSQLSource(
    name="credit_history",
    query="SELECT * FROM credit_history",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

credit_history = FeatureView(
    name="credit_history",
    entities=[dob_ssn],
    ttl=timedelta(days=300),
    schema=[
        Field(name="credit_card_due", dtype=Int64),
        Field(name="mortgage_due", dtype=Int64),
        Field(name="student_loan_due", dtype=Int64),
        Field(name="vehicle_loan_due", dtype=Int64),
        Field(name="hard_pulls", dtype=Int64),
        Field(name="missed_payments_2y", dtype=Int64),
        Field(name="missed_payments_1y", dtype=Int64),
        Field(name="missed_payments_6m", dtype=Int64),
        Field(name="bankruptcies", dtype=Int64),
    ],
    source=credit_history_source,
)

client_source = PostgreSQLSource(
    name="client_stats",
    query="""SELECT dob_ssn, CAST(date AS timestamp) AS event_timestamp, 
    SUM(income) OVER(
        PARTITION BY dob_ssn
        ORDER BY date ASC
        RANGE BETWEEN INTERVAL '11' MONTH PRECEDING AND CURRENT ROW
    ) AS incomeAmount12M,
    MAX(date) OVER(
        PARTITION BY dob_ssn
        ORDER BY date ASC
        RANGE BETWEEN INTERVAL '11' MONTH PRECEDING AND CURRENT ROW
    ) AS created_timestamp
    FROM income_history""",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

id_entity = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING)

scoring_features = FeatureView(
    name="client_features",
    entities=[
        id_entity,
    ],
    ttl=timedelta(days=3650),
    schema=[
        Field(name="incomeamount12m", dtype=Float64),
    ],
    source=client_source,
)

feature_service = FeatureService(
    name='loan_features',
    features=[zipcode_features, credit_history, scoring_features]
)

Feast is an open source project that collects anonymized error reporting and usage statistics. To opt out or learn more see https://docs.feast.dev/reference/usage


  import pkg_resources
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)


In [7]:
feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
    "zipcode_features:location_type",
    "zipcode_features:tax_returns_filed",
    "zipcode_features:population",
    "zipcode_features:total_wages",
    "credit_history:credit_card_due",
    "credit_history:mortgage_due",
    "credit_history:student_loan_due",
    "credit_history:vehicle_loan_due",
    "credit_history:hard_pulls",
    "credit_history:missed_payments_2y",
    "credit_history:missed_payments_1y",
    "credit_history:missed_payments_6m",
    "credit_history:bankruptcies",
]

  and should_run_async(code)


In [9]:
from feast import FeatureStore
store = FeatureStore(repo_path=".")
store.apply([zipcode, zipcode_source, zipcode_features, dob_ssn, credit_history_source, credit_history,
             client_source, id_entity, scoring_features, feature_service])

### Prepare dataset and test our features description

In [10]:
import pandas as pd

loans = pd.read_parquet("loan_table.parquet")

  and should_run_async(code)


In [11]:
loans.head(5)

  and should_run_async(code)


Unnamed: 0,loan_id,dob_ssn,zipcode,person_age,person_income,person_home_ownership,person_emp_length,loan_intent,loan_amnt,loan_int_rate,loan_status,event_timestamp,created_timestamp
0,10000,19530219_5179,76104,22,59000,RENT,123.0,PERSONAL,35000,16.02,1,2021-08-25 20:34:41.361000+00:00,2021-08-25 20:34:41.361000+00:00
1,10001,19520816_8737,70380,21,9600,OWN,5.0,EDUCATION,1000,11.14,0,2021-08-25 20:16:20.128000+00:00,2021-08-25 20:16:20.128000+00:00
2,10002,19860413_2537,97039,25,9600,MORTGAGE,1.0,MEDICAL,5500,12.87,1,2021-08-25 19:57:58.896000+00:00,2021-08-25 19:57:58.896000+00:00
3,10003,19760701_8090,63785,23,65500,RENT,4.0,MEDICAL,35000,15.23,1,2021-08-25 19:39:37.663000+00:00,2021-08-25 19:39:37.663000+00:00
4,10004,19830125_8297,82223,24,54400,RENT,8.0,MEDICAL,35000,14.27,1,2021-08-25 19:21:16.430000+00:00,2021-08-25 19:21:16.430000+00:00


In [12]:
loan_features = store.get_feature_service('loan_features')

  and should_run_async(code)


Enrich loans tables by the features from zipcode_features and credit_history (feast_features). (join loans table with zipcode and credit_history tables by zipcode (for zipcode_features), dob_ssn (for credit_history) columns)

In [13]:
training_features = store.get_historical_features(
    entity_df=loans, 
    features=loan_features
)

training_df = training_features.to_df()

training_df

Unnamed: 0,loan_id,dob_ssn,zipcode,person_age,person_income,person_home_ownership,person_emp_length,loan_intent,loan_amnt,loan_int_rate,...,credit_card_due,mortgage_due,student_loan_due,vehicle_loan_due,hard_pulls,missed_payments_2y,missed_payments_1y,missed_payments_6m,bankruptcies,incomeamount12m
0,10002,19860413_2537,97039,25,9600,MORTGAGE,1.0,MEDICAL,5500,12.87,...,5508,1010133,8533,16130,1,7,1,1,0,205200.0
1,10003,19760701_8090,63785,23,65500,RENT,4.0,MEDICAL,35000,15.23,...,2345,1619904,1429,20339,8,4,0,0,0,161088.0
2,10004,19830125_8297,82223,24,54400,RENT,8.0,MEDICAL,35000,14.27,...,9294,1485576,16009,11457,10,6,0,1,0,230880.0
3,10006,19650101_1416,73724,26,77100,RENT,8.0,EDUCATION,35000,12.42,...,5067,61660,39760,11878,10,6,3,0,0,182976.0
4,10009,19780808_9123,71260,21,10000,OWN,6.0,VENTURE,1600,14.74,...,9321,873285,39759,22508,7,3,3,0,1,231024.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
28663,38629,19710814_9007,40380,52,52000,OWN,0.0,PERSONAL,9600,8.49,...,3773,1522996,25688,15724,1,2,2,0,2,130184.0
28664,38631,19980307_2303,44068,52,65004,RENT,4.0,PERSONAL,20000,15.58,...,9581,776458,46628,12529,7,4,2,1,0,112656.0
28665,38633,19491126_1487,43205,57,53000,MORTGAGE,1.0,PERSONAL,5800,13.16,...,388,438726,41429,12736,6,4,0,1,0,97264.0
28666,38635,19880422_2592,68826,65,76000,RENT,3.0,HOMEIMPROVEMENT,35000,10.99,...,4089,1384134,31368,28279,3,7,0,0,0,101360.0


In [14]:
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource, SavedDatasetPostgreSQLStorage
)

from datetime import datetime
datestr = datetime.now().strftime("%Y%m%d")

dataset = store.create_saved_dataset(
    from_=training_features,
    name=f'test_training_dataset_{datestr}',
    storage=SavedDatasetPostgreSQLStorage(table_ref=f'test_training_dataset_{datestr}')
)

  and should_run_async(code)


In [17]:
dataset = store.get_saved_dataset(f'test_training_dataset_{datestr}')
dataset.to_df()

dropped


### Materializing all feature view into Online Feature Store

In [18]:
from feast import FeatureStore

store = FeatureStore(repo_path=f".")

In [19]:
from datetime import datetime, timedelta
store.materialize(
    feature_views=["credit_history"],
    start_date=datetime.utcnow() - timedelta(days=1000), end_date=datetime.utcnow() - timedelta(minutes=10)
)

Materializing [1m[32m1[0m feature views from [1m[32m2021-03-24 11:00:06+00:00[0m to [1m[32m2023-12-19 10:50:06+00:00[0m into the [1m[32mcassandra[0m online store.

[1m[32mcredit_history[0m:


28636it [01:31, 311.28it/s]                                                                         


In [20]:
from datetime import datetime, timedelta
store.materialize(
    feature_views=["zipcode_features"],
    start_date=datetime.utcnow() - timedelta(days=3650), end_date=datetime.utcnow() - timedelta(minutes=10)
)

  and should_run_async(code)


Materializing [1m[32m1[0m feature views from [1m[32m2013-12-21 11:01:42+00:00[0m to [1m[32m2023-12-19 10:51:42+00:00[0m into the [1m[32mcassandra[0m online store.

[1m[32mzipcode_features[0m:


28847it [00:59, 481.73it/s]                                                                         


In [21]:
store.materialize(
    feature_views=["client_features"],
    start_date=datetime.utcnow() - timedelta(days=3650), end_date=datetime.utcnow() - timedelta(minutes=10)
)

  and should_run_async(code)


Materializing [1m[32m1[0m feature views from [1m[32m2013-12-21 11:02:42+00:00[0m to [1m[32m2023-12-19 10:52:42+00:00[0m into the [1m[32mcassandra[0m online store.

[1m[32mclient_features[0m:


28636it [00:10, 2711.56it/s]                                                                        


### Get online features

In [22]:
loans.head(1)

  and should_run_async(code)


Unnamed: 0,loan_id,dob_ssn,zipcode,person_age,person_income,person_home_ownership,person_emp_length,loan_intent,loan_amnt,loan_int_rate,loan_status,event_timestamp,created_timestamp
0,10000,19530219_5179,76104,22,59000,RENT,123.0,PERSONAL,35000,16.02,1,2021-08-25 20:34:41.361000+00:00,2021-08-25 20:34:41.361000+00:00


In [24]:
zipcode = loans['zipcode'].values
dob_ssn = loans['dob_ssn'].values

In [25]:
zipcode

0        76104
1        70380
2        97039
3        63785
4        82223
         ...  
28633    43205
28634    24872
28635    68826
28636    92014
28637    69033
Name: zipcode, Length: 28638, dtype: int64

In [27]:
online_features = store.get_online_features(
    features = loan_features,
    entity_rows=[
        {
            'zipcode': zipcode,
            'dob_ssn': dob_ssn
        }
    ]
)

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

In [16]:
def del_table(table): #parquet_name: str, table: str):

    def get_sqlalchemy_engine():
        import sqlalchemy
        url =  f"postgresql://postgres:postgres@cassandra-postgresql.feast-db:5432/FEAST_OFFLINE_STORE"
        return sqlalchemy.create_engine(url, client_encoding='utf8', connect_args={'options': '-c search_path={}'.format("public")})

    import pandas as pd
    con = None
    raw_con = None
    try:
        con = get_sqlalchemy_engine()

        con.execute("DROP TABLE IF EXISTS " + table)
    finally:
        print("dropped")

del_table(f'test_training_dataset_{datestr}')