<a href="https://colab.research.google.com/github/Vaibhav-sa30/Workshop-Notes/blob/main/Tecton_Rift_Lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🧪 Lab: Productionizing Real-Time Features with Tecton and Rift

In this lab, we will explore how to develop and test real-time features for a fraud detection use case using Tecton and Rift.

Rift is Tecton's Python-first compute engine for efficiently computing batch, stream, and real-time features using Python and SQL. With Rift we can develop and test features locally in any Python environment and then productionize with a single step.

Let's try it out!

## ⚙️ Install Pre-Reqs

Run the following commands to install Tecton and other pre-requisites.

**After installation, be sure to restart your session via "Runtime -> Restart Session" in the menu above.**

In [None]:
!pip install virtualenv
!virtualenv tecton
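# Note: each `!` command runs in its own subshell, so this activation does not carry over to the next line
!source tecton/bin/activate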
!pip install --pre 'tecton[rift]' s3fs fsspec scikit-learn

created virtual environment CPython3.10.12.final.0-64 in 359ms
  creator CPython3Posix(dest=/content/tecton, clear=False, no_vcs_ignore=False, global=False)
  seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
    added seed packages: pip==24.0, setuptools==69.1.0, wheel==0.42.0
  activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator


✅ Restart your session via "Runtime -> Restart Session"

---



## 👩‍💻 Log into a Tecton account

In [None]:
import tecton, os, requests, json
import pandas as pd
from pprint import pprint
from tecton import *
from tecton.types import *
from datetime import datetime, timedelta

tecton.set_validation_mode('auto')

tecton.login('lab.tecton.ai')

Please visit the following link to login and access the authentication code:
https://login.tecton.ai/oauth2/default/v1/authorize?response_type=code&client_id=0oasw3q3gyNq0Guf8357&redirect_uri=https%3A%2F%2Fwww.tecton.ai%2Fauthorization-callback&state=16546799035455486616&scope=openid+offline_access+profile+email&code_challenge_method=S256&code_challenge=A8fhRJMOp4MLJhfOXM5gSY6HDlNqD2HdVSyUNx7riIU
Paste the authentication code here:Wnir0fso7y-DJwy0dXcu0gz96ShWg2itw-QuwREytgk
✅ Authentication successful!


## 🔎 Examine Raw Data

On S3 we have a historical log of a transaction stream representing transactions that users made at different merchants in the last few years.

We can use this data to brainstorm streaming features and even test them out with Tecton!

In [None]:
df = pd.read_parquet("s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq", storage_options={'anon': True})

display(df)

| | user_id | transaction_id | category | amt | is_fraud | merchant | merch_lat | merch_long | timestamp |
|---|---|---|---|---|---|---|---|---|---|
| 0 | user_884240387242 | 3eb88afb219c9a10f5130d0b89a13451 | gas_transport | 68.23 | 0 | fraud_Kutch, Hermiston and Farrell | 42.710006 | -78.338644 | 2023-06-20 10:26:41 |
| 1 | user_268514844966 | 72e23b9193f97c2ba654854a66890432 | misc_pos | 32.98 | 0 | fraud_Lehner, Reichert and Mills | 39.153572 | -122.364270 | 2023-06-20 12:57:20 |
| 2 | user_722584453020 | db7a41ce2d16a4452c973418d9e544b1 | home | 4.50 | 0 | fraud_Koss, Hansen and Lueilwitz | 33.033236 | -105.745700 | 2023-06-20 14:49:59 |
| 3 | user_337750317412 | edfc42f7bc4b86d8c142acefb88c4565 | misc_pos | 7.68 | 0 | fraud_Buckridge PLC | 40.682842 | -88.808371 | 2023-06-20 14:50:13 |
| 4 | user_934384811883 | 93d28b6d2e5afebf9c40304aa709ab29 | kids_pets | 68.97 | 1 | fraud_Lubowitz-Walter | 39.144282 | -96.125035 | 2023-06-20 15:55:09 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 33804 | user_650387977076 | 951fca1d5c06841f8a8dc2af9cfdfb7a | entertainment | 77.07 | 1 | fraud_Johns Inc | 37.668954 | -120.963155 | 2021-10-29 05:20:40 |
| 33805 | user_722584453020 | 5240efa75db0dec37eb43d8f285649df | home | 41.01 | 0 | fraud_Reilly LLC | 31.768261 | -107.206737 | 2021-10-29 05:28:04 |
| 33806 | user_394495759023 | 7f4fb01bf4f4c1174746d1128d7dca65 | shopping_pos | 8.07 | 0 | fraud_Bahringer, Schoen and Corkery | 30.789731 | -81.583807 | 2021-10-29 05:39:33 |
| 33807 | user_687958452057 | ab5057aca2f336a38e7877d52e30da18 | home | 122.95 | 0 | fraud_Altenwerth-Kilback | 41.540639 | -85.859128 | 2021-10-29 09:31:12 |


## 🌊 Define and Test Streaming Features

Streaming features can be tested offline in a notebook and used to train a model. Tecton uses the historical log of a stream to compute accurate historical feature values.

✅ Try extending the definition below with more features, such as:

- The total dollar amount of transactions a user has made in the last 1 minute, 5 minutes, and 1 year.
- The total number of transactions a user has made in the last 1 minute, 5 minutes, and 1 year.

You may find [this documentation](https://docs.tecton.ai/docs/beta/defining-features/feature-views/aggregation-engine/aggregation-functions) helpful.
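
As a rough illustration of what a trailing time-window aggregation computes, the sketch below reproduces a 5-minute sum, count, and mean per user with plain pandas. The toy rows and the `amt_*_5m` column names are made up for this example, and the pandas window boundaries (the current row is included, a row exactly 5 minutes earlier is not) are an assumption that may not match Tecton's aggregation engine exactly.

```python
import pandas as pd

# Toy transaction log (hypothetical values, not taken from the lab dataset)
toy_transactions = pd.DataFrame({
    "user_id": ["a", "b", "a", "a"],
    "timestamp": pd.to_datetime([
        "2023-01-01 00:00:00",
        "2023-01-01 00:01:00",
        "2023-01-01 00:02:00",
        "2023-01-01 00:10:00",
    ]),
    "amt": [10.0, 7.0, 30.0, 5.0],
}).sort_values("timestamp")

# For every row, look back over that user's trailing 5 minutes of transactions
windows = (
    toy_transactions.set_index("timestamp")
    .groupby("user_id")["amt"]
    .rolling("5min")
)

toy_features = pd.DataFrame({
    "amt_sum_5m": windows.sum(),
    "amt_count_5m": windows.count(),
    "amt_mean_5m": windows.mean(),
}).reset_index()

print(toy_features)
```

User `a`'s second transaction sees both of its first two amounts in the window, while the third one, eight minutes later, sees only itself.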

In [None]:
# Define a stream source, including the historical log of the stream
transactions_stream = StreamSource(
    name='transactions_stream',
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri='s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq',
        file_format='parquet',
        timestamp_field='timestamp'
    ),
    schema=[Field('user_id', String), Field('timestamp', Timestamp), Field('amt', Float64)]
)

# Define the entity we are creating features for
user = Entity(name='user', join_keys=['user_id'])

# Define features
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='pandas',
    aggregations=[
        Aggregation(function='mean', column='amt', time_window=timedelta(minutes=1)),
        Aggregation(function='mean', column='amt', time_window=timedelta(minutes=5)),
        Aggregation(function='mean', column='amt', time_window=timedelta(days=365)),
        # Aggregation(function='sum', column='amt', time_window=timedelta(minutes=1)),
        # Aggregation(function='sum', column='amt', time_window=timedelta(minutes=5)),
        # Aggregation(function='sum', column='amt', time_window=timedelta(days=365)),
        # Aggregation(function='count', column='amt', time_window=timedelta(minutes=1)),
        # Aggregation(function='count', column='amt', time_window=timedelta(minutes=5)),
        # Aggregation(function='count', column='amt', time_window=timedelta(days=365))
    ],
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)]
)
def user_transaction_features(transactions):
    return transactions[['user_id', 'timestamp', 'amt']]


# Compute features
start = datetime(2023,1,1)
end = datetime(2023,6,1)

feature_df = user_transaction_features.get_historical_features(start_time=start, end_time=end).to_pandas()

display(feature_df)

StreamFeatureView 'user_transaction_features': Validating 3 dependencies.
    StreamSource 'transactions_stream': Deriving schema.
    StreamSource 'transactions_stream': Successfully validated.
    Entity 'user': Successfully validated.
    Transformation 'user_transaction_features': Successfully validated.
StreamFeatureView 'user_transaction_features': Successfully validated.
---- Executing stage: Reading Data Sources                               00:02

## ⏱️ Define and Test Real-Time Features

Now let's define a feature that checks if the current transaction amount a user is seeking to make is higher than their historical average.

Because this feature depends on real-time info (the current transaction amount), we need to compute it at the time of the request. That's exactly where on-demand features come in.

✅ Try changing the definition below to compare the transaction to the 1 year average instead of the 5 minute average.

In [None]:
# Define on-demand features
transaction_request = RequestSource(schema=[Field("amt", Float64)])

@on_demand_feature_view(
    sources=[transaction_request, user_transaction_features],
    mode="python",
    schema=[Field("transaction_amount_is_higher_than_average", Bool)],
)
def transaction_amount_is_higher_than_average(transaction_request, user_transaction_features):
    amount_mean = user_transaction_features["amt_mean_5m_continuous"]
    amount_mean = 0 if amount_mean is None else amount_mean
    return {"transaction_amount_is_higher_than_average": transaction_request["amt"] > amount_mean}


# Test on-demand features
averages = feature_df.drop(columns=['user_id', 'timestamp', '_effective_timestamp']).iloc[0].to_dict()
request = {'amt': 10.4}
features = transaction_amount_is_higher_than_average.run(transaction_request=request, user_transaction_features=averages)

print('\nRequest amount: ' + str(request['amt']))
print('Average: ' + str(averages['amt_mean_5m_continuous']))
print(str(features))
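
The comparison itself can be reasoned about without Tecton. The sketch below restates the same logic as a plain Python function (the name `is_higher_than_average` is hypothetical) so the edge cases are easy to see.

```python
from typing import Optional


def is_higher_than_average(amt: float, average: Optional[float]) -> bool:
    # Mirror the lab's fallback: a missing average is treated as 0
    baseline = 0 if average is None else average
    return amt > baseline


print(is_higher_than_average(10.4, 50.0))  # False
print(is_higher_than_average(10.4, None))  # True
print(is_higher_than_average(10.4, 10.4))  # False
```

Note the consequence of the fallback: a user with no transactions in the window has no average, so any positive amount is flagged as higher than average. Whether that is the right default depends on how the model should treat new or inactive users.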

## 🧮 Generate Training Data

Now that we've created some features, it's time to join them into a training data set so we can train a model.

First let's load up a list of historical training events. These events represent labeled historical user transactions.

In [None]:
training_events = pd.read_parquet("s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq", storage_options={'anon': True}) \
                    [['user_id', 'timestamp', 'amt', 'is_fraud']]

display(training_events)

Now that we have our training events, we can get features for those events by adding our feature views to a Feature Service and calling `get_historical_features(training_events)`.

The feature service defines the set of features we want to serve to our model offline and online.
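
For training data to be trustworthy, each event should only see feature values that were known at or before its own timestamp. The sketch below illustrates that idea with `pandas.merge_asof` on made-up data. It is a conceptual illustration only, assuming a point-in-time join, and is not how Tecton computes the result internally; the frames and the `amt_mean` column are hypothetical.

```python
import pandas as pd

# Hypothetical feature values, stamped with the time each value became known
feature_log = pd.DataFrame({
    "user_id": ["a", "a", "b"],
    "timestamp": pd.to_datetime(["2023-01-01", "2023-01-05", "2023-01-02"]),
    "amt_mean": [10.0, 20.0, 7.0],
})

# Hypothetical labeled events we want features for
labeled_events = pd.DataFrame({
    "user_id": ["a", "a", "b"],
    "timestamp": pd.to_datetime(["2023-01-03", "2023-01-06", "2023-01-01"]),
    "is_fraud": [0, 1, 0],
})

# For each event, take the latest feature row for the same user whose
# timestamp is at or before the event's timestamp
point_in_time = pd.merge_asof(
    labeled_events.sort_values("timestamp"),
    feature_log.sort_values("timestamp"),
    on="timestamp",
    by="user_id",
    direction="backward",
)

print(point_in_time)
```

User `b`'s event predates any feature value for that user, so its `amt_mean` is missing rather than borrowed from the future.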

In [None]:
from tecton import FeatureService

fraud_detection_feature_service = FeatureService(
    name="fraud_detection_feature_service",
    features=[user_transaction_features, transaction_amount_is_higher_than_average]
)

training_data = fraud_detection_feature_service.get_historical_features(training_events).to_pandas()

display(training_data)

FeatureService 'fraud_detection_feature_service': Successfully validated.
---- Executing stage: Reading Data Sources                               00:04
---- Executing stage: Evaluating Feature View pipelines                  00:00
---- Executing stage: Computing aggregated features & joining results    00:18
---- Executing stage: Evaluating On-Demand Feature Views                 00:01


| | user_id | timestamp | amt | is_fraud | user_transaction_features__amt_count_1m_continuous | user_transaction_features__amt_count_365d_continuous | user_transaction_features__amt_count_5m_continuous | user_transaction_features__amt_mean_1m_continuous | user_transaction_features__amt_mean_365d_continuous | user_transaction_features__amt_mean_5m_continuous | user_transaction_features__amt_sum_1m_continuous | user_transaction_features__amt_sum_365d_continuous | user_transaction_features__amt_sum_5m_continuous | transaction_amount_is_higher_than_average__transaction_amount_is_higher_than_average |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | user_205125746682 | 2023-06-24 14:09:31 | 6.98 | 0 | 1 | 107 | 1 | 6.98 | 72.763738 | 6.98 | 6.98 | 7785.72 | 6.98 | False |
| 1 | user_205125746682 | 2023-06-27 22:45:07 | 5.55 | 1 | 1 | 107 | 1 | 5.55 | 72.736355 | 5.55 | 5.55 | 7782.79 | 5.55 | False |
| 2 | user_205125746682 | 2023-06-29 04:12:08 | 7.12 | 0 | 1 | 108 | 1 | 7.12 | 72.128796 | 7.12 | 7.12 | 7789.91 | 7.12 | False |
| 3 | user_205125746682 | 2023-06-29 21:09:28 | 60.71 | 0 | 1 | 108 | 1 | 60.71 | 71.297500 | 60.71 | 60.71 | 7700.13 | 60.71 | False |
| 4 | user_205125746682 | 2023-07-08 20:51:07 | 59.19 | 1 | 1 | 107 | 1 | 59.19 | 71.900561 | 59.19 | 59.19 | 7693.36 | 59.19 | False |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 33804 | user_782510788708 | 2021-10-03 01:50:31 | 42.66 | 0 | 1 | 31 | 1 | 42.66 | 55.538387 | 42.66 | 42.66 | 1721.69 | 42.66 | False |
| 33805 | user_782510788708 | 2021-10-07 21:48:03 | 70.77 | 0 | 1 | 32 | 1 | 70.77 | 56.014375 | 70.77 | 70.77 | 1792.46 | 70.77 | False |
| 33806 | user_782510788708 | 2021-10-11 04:54:37 | 50.27 | 0 | 1 | 33 | 1 | 50.27 | 55.840303 | 50.27 | 50.27 | 1842.73 | 50.27 | False |
| 33807 | user_782510788708 | 2021-10-15 05:17:52 | 41.80 | 1 | 1 | 34 | 1 | 41.80 | 55.427353 | 41.80 | 41.80 | 1884.53 | 41.80 | False |


## 🧠 Train a Model

With a training dataset full of features, we can now train a simple logistic regression model to detect fraudulent transactions.

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn import metrics

df = training_data.drop(['user_id', 'timestamp', 'amt'], axis=1)

X = df.drop('is_fraud', axis=1)
y = df['is_fraud']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

num_cols = X_train.select_dtypes(exclude=['object']).columns.tolist()
cat_cols = X_train.select_dtypes(include=['object']).columns.tolist()

num_pipe = make_pipeline(
    SimpleImputer(strategy='median'),
    StandardScaler()
)

cat_pipe = make_pipeline(
    SimpleImputer(strategy='constant', fill_value='N/A'),
    OneHotEncoder(handle_unknown='ignore', sparse_output=False)
)

full_pipe = ColumnTransformer([
    ('num', num_pipe, num_cols),
    ('cat', cat_pipe, cat_cols)
])

model = make_pipeline(full_pipe, LogisticRegression(max_iter=1000, random_state=42))

model.fit(X_train,y_train)

y_predict = model.predict(X_test)

print(model)

Pipeline(steps=[('columntransformer',
                 ColumnTransformer(transformers=[('num',
                                                  Pipeline(steps=[('simpleimputer',
                                                                   SimpleImputer(strategy='median')),
                                                                  ('standardscaler',
                                                                   StandardScaler())]),
                                                  ['user_transaction_features__amt_mean_1m_continuous',
                                                   'user_transaction_features__amt_mean_365d_continuous',
                                                   'user_transaction_features__amt_mean_5m_continuous',
                                                   'user_tran...
                                                                   SimpleImputer(fill_value='N/A',
                                                                    

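The training cell computes `y_predict` and imports `sklearn.metrics` but never scores the model. Fraud labels are usually imbalanced, so precision, recall, and ROC AUC tend to be more informative than plain accuracy. The sketch below shows one way to summarize them; the `summarize` helper and the synthetic `demo_*` data are hypothetical stand-ins so the block runs on its own.

```python
import numpy as np
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler


def summarize(y_true, y_pred, y_score):
    """Collect a few classification metrics suited to imbalanced labels."""
    return {
        "precision": metrics.precision_score(y_true, y_pred, zero_division=0),
        "recall": metrics.recall_score(y_true, y_pred, zero_division=0),
        "roc_auc": metrics.roc_auc_score(y_true, y_score),
    }


# Synthetic stand-in for the lab's training data (hypothetical, for illustration)
rng = np.random.default_rng(42)
demo_X = rng.normal(size=(500, 3))
demo_y = (demo_X[:, 0] + 0.5 * rng.normal(size=500) > 1.0).astype(int)

demo_X_train, demo_X_test, demo_y_train, demo_y_test = train_test_split(
    demo_X, demo_y, test_size=0.3, random_state=42, stratify=demo_y
)

demo_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000, random_state=42))
demo_model.fit(demo_X_train, demo_y_train)

demo_summary = summarize(
    demo_y_test,
    demo_model.predict(demo_X_test),
    demo_model.predict_proba(demo_X_test)[:, 1],  # probability of the positive class
)
print(demo_summary)
```

With the lab's own objects, the equivalent call would be `summarize(y_test, y_predict, model.predict_proba(X_test)[:, 1])`.
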
## 🚀 Apply Features to Production

**NOTE: This step has been done for you already.**

Productionizing features with Tecton is easy. Simply paste the definitions into a repo of Python files, select a workspace, and run `tecton apply`.

Create a feature repo:
```bash
mkdir feature-repo && cd feature-repo
tecton init
touch features.py
```

Apply features to production:
```bash
tecton login lab.tecton.ai
tecton workspace select prod
tecton apply
```

You can check out the applied features in Tecton's web UI [here](https://lab.tecton.ai/app/repo/prod/features).


## ⚡️ Ingest Streaming Events and Read Real-Time Features

Once we've productionized our Stream Source, we can start sending events to it. Any features defined against this source will be updated in real time!

Try adding your own name as the `user_id` below and watch how feature values update immediately.

In [None]:
tecton.set_credentials(tecton_api_key='3fcbbfb66c3c4d1a7ce1b9e02f410a1f')
os.environ['TECTON_API_KEY'] = '3fcbbfb66c3c4d1a7ce1b9e02f410a1f'

✅ Successfully set credentials.


In [None]:
ws = tecton.get_workspace('prod')
registered_transactions_stream = ws.get_data_source('transactions_stream')

In [None]:
registered_transactions_stream.ingest({
    'user_id': 'mahesh',
    'timestamp': datetime.utcnow(),
    'amt': 50.00
})

{'workspaceName': 'prod',
 'ingestMetrics': {'featureViewIngestMetrics': [{'featureViewName': 'user_transaction_features',
    'onlineRecordIngestCount': '1'}]}}

In [None]:
fs = ws.get_feature_service('fraud_detection_feature_service')
features = fs.get_online_features(join_keys={'user_id': 'mahesh'}, request_data={'amt': 50}).to_dict()

pprint(features)

{'transaction_amount_is_higher_than_average.transaction_amount_is_higher_than_average': False,
 'user_transaction_features.amt_count_1h_continuous': 1,
 'user_transaction_features.amt_count_1m_continuous': 1,
 'user_transaction_features.amt_count_365d_continuous': 2,
 'user_transaction_features.amt_mean_1m_continuous': 50.0,
 'user_transaction_features.amt_mean_365d_continuous': 75.0,
 'user_transaction_features.amt_mean_5m_continuous': 50.0,
 'user_transaction_features.amt_sum_1h_continuous': 50.0,
 'user_transaction_features.amt_sum_1m_continuous': 50.0,
 'user_transaction_features.amt_sum_365d_continuous': 150.0}


## 🔥 Define Online Prediction Pipeline

Now that we have online feature values, we can create a prediction pipeline to determine if a transaction is fraudulent and whether we should accept or reject it.

To do this we will define three functions to:

1. Get features from Tecton
2. Use the real-time features to make a prediction with the model
3. Use the model prediction to accept or reject a transaction

In [None]:
# Get features from Tecton
def get_online_feature_data(user_id, amt):
    headers = {"Authorization": "Tecton-key " + os.environ['TECTON_API_KEY']}

    # Build the payload as a dict and let json.dumps handle quoting and escaping
    request_data = json.dumps({
        "params": {
            "feature_service_name": "fraud_detection_feature_service",
            "join_key_map": {"user_id": user_id},
            "metadata_options": {"include_names": True},
            "request_context_map": {"amt": amt},
            "workspace_name": "prod",
        }
    })

    online_feature_data = requests.request(
        method="POST",
        headers=headers,
        url="https://lab.tecton.ai/api/v1/feature-service/get-features",
        data=request_data,
    )

    online_feature_data_json = json.loads(online_feature_data.text)

    return online_feature_data_json

# Use the real-time features to make a prediction with the model
def get_prediction_from_model(feature_data):
    columns = [f["name"].replace(".", "__") for f in feature_data["metadata"]["features"]]
    data = [feature_data["result"]["features"]]

    features = pd.DataFrame(data, columns=columns)

    return model.predict(features)[0]

# Use the model prediction to accept or reject a transaction
def evaluate_transaction(user_id, amt):
    online_feature_data = get_online_feature_data(user_id, amt)
    is_predicted_fraud = get_prediction_from_model(online_feature_data)

    print('Features: ' + str(online_feature_data["result"]["features"]))
    print('Model Score: ' + str(is_predicted_fraud))

    if is_predicted_fraud == 0:
        print('Transaction accepted.')
    else:
        print('Transaction denied.')

## ⭐️ Evaluate Transactions in Real-Time

Now we have a single decision API to evaluate transactions in real-time!

Let's test it out.

In [None]:
evaluate_transaction('mahesh', 182.46)

KeyError: "['user_transaction_features__amt_sum_5m_continuous'] not in index"
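
The `KeyError` indicates that the model was trained on a column the online response does not contain. Judging by the outputs in this notebook, the training data included 5-minute sum and count features, while the online feature service returned 1-hour ones instead, so the two definitions appear to have drifted apart. The sketch below shows one way to detect and bridge such a mismatch with pandas. The column lists are a shortened, hand-copied subset of the outputs in this notebook, and the variable names are hypothetical.

```python
import pandas as pd

# Columns the model was trained on (subset copied from the training data output)
training_columns = [
    "user_transaction_features__amt_mean_5m_continuous",
    "user_transaction_features__amt_sum_5m_continuous",
    "user_transaction_features__amt_sum_1m_continuous",
]

# Feature names and values as served online (subset copied from the online output)
online_values = {
    "user_transaction_features.amt_mean_5m_continuous": 50.0,
    "user_transaction_features.amt_sum_1h_continuous": 50.0,
    "user_transaction_features.amt_sum_1m_continuous": 50.0,
}

# Same renaming the lab applies: "view.feature" becomes "view__feature"
online_row = pd.DataFrame(
    [{name.replace(".", "__"): value for name, value in online_values.items()}]
)

missing_online = sorted(set(training_columns) - set(online_row.columns))
unused_online = sorted(set(online_row.columns) - set(training_columns))
print("Missing online:", missing_online)
print("Not used by model:", unused_online)

# reindex keeps the training column order, drops extras, and fills gaps with NaN
aligned_row = online_row.reindex(columns=training_columns)
print(aligned_row)
```

Reindexing lets a pipeline with an imputer, like the one trained earlier, produce a prediction, but it hides the mismatch by imputing a feature the model expected to be real. A more durable fix is probably to retrain on the features the production service actually serves, or to re-apply feature definitions that match the training run.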