<a href="https://colab.research.google.com/github/evwhiz/tecton-training/blob/main/01_End_To_End_Scenario_Snowflake.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lesson: Tecton End-To-End Scenario
In this lesson you will:
* Build an end-to-end feature pipeline using the core Tecton components:
  * Data Source
  * Entity
  * Feature View
  * Feature Service
* Construct a training dataset by querying a Feature Service for historical feature values.
* Train a machine learning model.
* Make real-time predictions using features retrieved from the Tecton online feature store.

In this scenario we will work with a dataset of sample purchase transactions from an e-commerce website with the goal of predicting fraud.

In [None]:
%pip install tecton[rift] scikit-learn snowflake-snowpark-python[pandas]

## Connect to Tecton
To get started, we need to import the Tecton module and log in to the Tecton workspace.

In [None]:
# Import the tecton module
import tecton
tecton.version.summary()

# Automatically validate new tecton objects (recommended).
tecton.set_validation_mode("auto")

# Login to Tecton, if not already logged-in.
tecton.login("lab.tecton.ai")

## Sample Data
The following dataset simulates credit card transactions.

In [None]:
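# Set Snowflake connection parameters
%env SNOWFLAKE_USER=NAB_MFT
%env SNOWFLAKE_ACCOUNT=tectonpartner

# Prompt for the password instead of hard-coding a secret in the notebook
import os, getpass
os.environ["SNOWFLAKE_PASSWORD"] = getpass.getpass("Snowflake password: ")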

In [None]:
# Create a Snowflake connection
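import os
# Import the connector submodule explicitly; `import snowflake` alone does not load it
import snowflake.connector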
connection_parameters = {
    'user': os.environ['SNOWFLAKE_USER'],
    'password': os.environ['SNOWFLAKE_PASSWORD'],
    'account': os.environ['SNOWFLAKE_ACCOUNT'],
    'warehouse': 'NAB_WH',
    'database': 'NAB',
    'schema': 'PUBLIC',
}
snowflake_conn = snowflake.connector.connect(**connection_parameters)

# Configure Tecton to use this Snowflake connection for all interactive queries
tecton.snowflake_context.set_connection(snowflake_conn)

# A quick helper function to query snowflake from a notebook
def query_snowflake(query):
    df = snowflake_conn.cursor().execute(query).fetch_pandas_all()
    return df

# Here's our Snowflake data
query_snowflake("SELECT * FROM transactions_2 LIMIT 5")

## Key Concept: DataSource
A **Tecton DataSource** tells Tecton where to obtain input data.

In [None]:
from tecton import BatchSource, SnowflakeConfig

transactions = BatchSource(
    name='transactions',
    batch_config=SnowflakeConfig(
        url="https://tectonpartner.snowflakecomputing.com/",
        warehouse="NAB_WH",
        database="NAB",
        schema="PUBLIC",
        table="TRANSACTIONS_2",
        timestamp_field="TIMESTAMP"
    ),
    owner='dbateman@tecton.ai',
    tags={'release': 'production'},
)

# Display sample data
display(transactions.get_dataframe().to_pandas().head(20))

## Key Concept: Entity
A **Tecton Entity** defines the business concept for which we are modeling features. Its join keys are used to aggregate, join, and retrieve features.

In [None]:
from tecton import Entity, batch_feature_view, Aggregation
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta

user = Entity(name="user", join_keys=["user_id"])
user

## Key Concept: FeatureView
A **Tecton FeatureView** is how feature ETL pipelines are defined in Tecton:
* It is the fundamental unit of feature materialization and storage in Tecton.
* SQL or DataFrame transformations shape the raw data, and Tecton aggregations
  efficiently and accurately compute metrics across raw events.
* The Feature View decorator accepts a wide range of attributes for materializing,
  cataloging, and monitoring features.

In [None]:
@batch_feature_view(
    description="User transaction metrics over 1, 3 and 7 days",
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(function="mean", column="amt", time_window=timedelta(days=1)),
        Aggregation(function="mean", column="amt", time_window=timedelta(days=3)),
        Aggregation(function="mean", column="amt", time_window=timedelta(days=7)),
        Aggregation(function="count", column="amt", time_window=timedelta(days=1)),
        Aggregation(function="count", column="amt", time_window=timedelta(days=3)),
        Aggregation(function="count", column="amt", time_window=timedelta(days=7)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 1, 1),
    batch_schedule=timedelta(days=1),
    environment="tecton-rift-core-0.9.0",
)
def user_transaction_metrics(transactions):
    # ATTENTION: Snowflake stores unquoted column names in uppercase, so the aliases
    # must be quoted to match the lowercase field names in the schema above.
    return f'''
        SELECT USER_ID AS "user_id",
               TIMESTAMP AS "timestamp",
               AMT AS "amt"
        FROM {transactions}
    '''

# Display Sample Data
start = datetime(2021, 1, 1)
end = datetime(2099, 1, 1)
df = user_transaction_metrics.get_features_in_range(start_time=start, end_time=end).to_pandas()
display(df.sort_values(["user_id", "_valid_from"]).head(5))
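Roughly speaking, `aggregation_interval=timedelta(days=1)` means Tecton computes partial aggregates for each 1-day interval and combines them to answer the 1-, 3-, and 7-day windows. The plain-Python sketch below illustrates that idea only; it is not Tecton's implementation, and the events, user ID, and `window_features` helper are hypothetical. As a simplifying assumption, a window here is the N full days before `as_of`. It keeps a (sum, count) pair per day rather than a per-day mean, because sums and counts can be added across days exactly while means cannot.

```python
from collections import defaultdict
from datetime import date, timedelta

# Hypothetical raw events: (user_id, day, amount). Illustrative data only.
events = [
    ("user_a", date(2021, 1, 1), 10.0),
    ("user_a", date(2021, 1, 1), 30.0),
    ("user_a", date(2021, 1, 3), 20.0),
    ("user_a", date(2021, 1, 6), 40.0),
]

# Build one partial aggregate [sum, count] per user per 1-day interval ("tile").
tiles = defaultdict(lambda: [0.0, 0])
for user_id, day, amt in events:
    tile = tiles[(user_id, day)]
    tile[0] += amt
    tile[1] += 1


def window_features(user_id, as_of, window_days):
    """Combine the daily tiles for the `window_days` days before `as_of`."""
    total, count = 0.0, 0
    for offset in range(1, window_days + 1):
        # .get() does not trigger the defaultdict factory, so empty days add nothing.
        s, c = tiles.get((user_id, as_of - timedelta(days=offset)), (0.0, 0))
        total += s
        count += c
    mean = total / count if count else None  # no events -> no mean
    return {"amt_mean": mean, "amt_count": count}


for days in (1, 3, 7):
    print(days, window_features("user_a", date(2021, 1, 7), days))
```

With this design, answering a 7-day window reads seven small tiles instead of rescanning every raw event.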

## Key Concept: Feature Service
A **Tecton FeatureService** defines an API endpoint for querying features from one or more Feature Views.

In [None]:
from tecton import FeatureService

fraud_detection_feature_service = FeatureService(
    name="fraud_detection_feature_service", features=[user_transaction_metrics]
)
fraud_detection_feature_service.get_feature_columns()

## Constructing a Training Dataset
To perform a batch query of a FeatureService and construct a training dataset,
we must first build a DataFrame specifying the entities and the points in time
for which we want historical feature values.
We also include any training labels we want to associate with the features.
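The key property of this lookup is point-in-time correctness: each event should receive the feature values that were valid at its timestamp, never values computed from later data, which would leak the future into training. A minimal sketch of that lookup in plain Python, using hypothetical data and a hypothetical `point_in_time_lookup` helper; it illustrates the idea, not how Tecton executes `get_features_for_events`.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history per user: (valid_from, feature value), sorted by time.
feature_history = {
    "user_a": [
        (datetime(2021, 1, 2), 20.0),
        (datetime(2021, 1, 4), 15.0),
        (datetime(2021, 1, 7), 25.0),
    ],
}


def point_in_time_lookup(user_id, event_time):
    """Return the latest feature value whose valid_from is at or before event_time."""
    history = feature_history.get(user_id, [])
    times = [valid_from for valid_from, _ in history]
    idx = bisect_right(times, event_time)  # count of entries valid at event_time
    return history[idx - 1][1] if idx > 0 else None


# Hypothetical training events: (user_id, event timestamp, label).
events = [
    ("user_a", datetime(2021, 1, 1, 12), 0),  # before any feature value exists
    ("user_a", datetime(2021, 1, 5, 9), 1),
    ("user_a", datetime(2021, 1, 7, 0), 0),
]
training_rows = [
    (user_id, ts, point_in_time_lookup(user_id, ts), label)
    for user_id, ts, label in events
]
print(training_rows)
```

The first event gets `None` because no feature value was valid yet, which is the situation the `fillna(0)` on the training data later papers over.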

In [None]:
training_events = query_snowflake('''
    SELECT USER_ID AS "user_id",
           TIMESTAMP AS "timestamp",
           AMT AS "amt",
           IS_FRAUD AS "is_fraud"
    FROM TRANSACTIONS_2
    LIMIT 1000
''')

display(training_events.head(5))

In [None]:
training_data = fraud_detection_feature_service.get_features_for_events(training_events).to_pandas().fillna(0)
display(training_data.head(5))

## Use scikit-learn to train the model

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn import metrics


# Drop the join key, event timestamp, and raw amount; keep the features and the label
df = training_data.drop(["user_id", "timestamp", "amt"], axis=1)

X = df.drop("is_fraud", axis=1)
y = df["is_fraud"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

num_cols = X_train.select_dtypes(exclude=["object"]).columns.tolist()
cat_cols = X_train.select_dtypes(include=["object"]).columns.tolist()

num_pipe = make_pipeline(SimpleImputer(strategy="median"), StandardScaler())

cat_pipe = make_pipeline(
    # `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and removed in 1.4
    SimpleImputer(strategy="constant", fill_value="N/A"), OneHotEncoder(handle_unknown="ignore", sparse_output=False)
)

full_pipe = ColumnTransformer([("num", num_pipe, num_cols), ("cat", cat_pipe, cat_cols)])

model = make_pipeline(full_pipe, LogisticRegression(max_iter=1000, random_state=42))

model.fit(X_train, y_train)

y_predict = model.predict(X_test)

print(metrics.classification_report(y_test, y_predict, zero_division=0))
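Fraud labels are usually heavily imbalanced, so the precision and recall of class `1` in the report above say more than overall accuracy. As a reminder of what those numbers mean, here is a small sketch (a hypothetical `binary_report` helper with made-up labels) that computes them from true positives, false positives, and false negatives, reporting 0.0 where a denominator is zero in the spirit of `zero_division=0`.

```python
def binary_report(y_true, y_pred, positive=1):
    """Precision, recall, and F1 for the positive (fraud) class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    # Report 0.0 instead of dividing by zero, similar to zero_division=0.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Hypothetical labels and predictions for eight transactions.
y_true = [0, 0, 0, 0, 0, 1, 1, 1]
y_pred = [0, 0, 0, 1, 0, 1, 0, 1]
print(binary_report(y_true, y_pred))
```

A model that never predicts fraud would score 62.5% accuracy on these labels yet 0.0 recall, which is why the per-class rows matter.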

## Deploy to a live Tecton Workspace

We'd now like to query the feature service to make predictions. This involves querying the online feature store. Before we can query the online feature store, we must copy our feature definitions to a feature repository and then run `tecton apply` to deploy it to a live Tecton workspace and launch the materialization jobs.


### Step 1.
Copy the code above into a local Python project. (The consolidated code is included below as `features.py`.)


### Step 2.
Run these shell commands (skip the commented-out `#` line unless you need to create the workspace):
```
pip install tecton
tecton init
tecton login lab.tecton.ai
# tecton workspace create "[your-name]-quickstart" --live
tecton workspace select "[your-name]-quickstart"
tecton apply
tecton service-account create --name "[your-name]-quickstart" --description "Quickstart service account"
tecton access-control assign-role -r consumer -w "[your-name]-quickstart" -s "[service-account-id from last command]"
```

### File: `features.py`
```
import os
from datetime import datetime, timedelta

import snowflake.connector
import tecton
from tecton import BatchSource, SnowflakeConfig
from tecton import Entity, batch_feature_view, Aggregation, FeatureService
from tecton.types import Field, String, Timestamp, Float64


connection_parameters = {
    'user': 'NAB_MFT',
    # Read the password from the environment rather than committing it to the repo
    'password': os.environ['SNOWFLAKE_PASSWORD'],
    'account': 'tectonpartner',
    'warehouse': 'NAB_WH',
    'database': 'NAB',
    'schema': 'PUBLIC',
}
snowflake_conn = snowflake.connector.connect(**connection_parameters)
tecton.snowflake_context.set_connection(snowflake_conn)

def query_snowflake(query):
    df = snowflake_conn.cursor().execute(query).fetch_pandas_all()
    return df

transactions = BatchSource(
    name='transactions',
    batch_config=SnowflakeConfig(
        url="https://tectonpartner.snowflakecomputing.com/",
        warehouse="NAB_WH",
        database="NAB",
        schema="PUBLIC",
        table="TRANSACTIONS_2",
        timestamp_field="TIMESTAMP"
    ),
    owner='dbateman@tecton.ai',
    tags={'release': 'production'},
)

user = Entity(name="user", join_keys=["user_id"])

@batch_feature_view(
    description="User transaction metrics over 1, 3 and 7 days",
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(function="mean", column="amt", time_window=timedelta(days=1)),
        Aggregation(function="mean", column="amt", time_window=timedelta(days=3)),
        Aggregation(function="mean", column="amt", time_window=timedelta(days=7)),
        Aggregation(function="count", column="amt", time_window=timedelta(days=1)),
        Aggregation(function="count", column="amt", time_window=timedelta(days=3)),
        Aggregation(function="count", column="amt", time_window=timedelta(days=7)),
    ],
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 1, 1),
    batch_schedule=timedelta(days=1),
)
def user_transaction_metrics(transactions):
    return f'''
        SELECT USER_ID AS "user_id",
               TIMESTAMP AS "timestamp",
               AMT AS "amt"
        FROM {transactions}
    '''

fraud_detection_feature_service = FeatureService(
    name="fraud_detection_feature_service", features=[user_transaction_metrics]
)
```

## Query for features in real-time
Query the online feature store through the FeatureService to retrieve the features needed to make a prediction for a user.

In [None]:
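import os

import requests


def get_online_feature_data(user_id):
    # API key of the service account created in Step 2, read from the environment
    TECTON_API_KEY = os.environ["TECTON_API_KEY"]
    # Use the workspace you deployed to in Step 2, e.g. "[your-name]-quickstart"
    WORKSPACE_NAME = "tecton-quickstart"
    ACCOUNT_NAME = "lab"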

    online_feature_data = requests.request(
        method="POST",
        url=f"https://{ACCOUNT_NAME}.tecton.ai/api/v1/feature-service/get-features",
        headers={
            "Authorization": "Tecton-key " + TECTON_API_KEY,
        },
        json={
            "params": {
                "feature_service_name": "fraud_detection_feature_service",
                "join_key_map": {"user_id": user_id},
                "metadata_options": {"include_names": True},
                "workspace_name": WORKSPACE_NAME,
            }
        },
    )

    return online_feature_data.json()


user_id = "user_502567604689"
feature_data = get_online_feature_data(user_id)
if "result" not in feature_data:
    print("Feature data is not materialized")
else:
    print(feature_data["result"])
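The online response pairs a list of values (`result.features`) with a parallel list of names (`metadata.features`, returned because `metadata_options` sets `include_names`). Online names take the form `<feature_view>.<feature>`, while the training columns from `get_features_for_events` use `<feature_view>__<feature>`, which is why the inference code renames them. A standalone sketch of that conversion on a hypothetical response; the feature names, values, and the `to_training_row` helper are illustrative, and filling missing values with 0 mirrors the `fillna(0)` applied to the training data.

```python
# Hypothetical response shaped like the one get_online_feature_data() returns
# (feature names and values are illustrative, not real output).
response = {
    "result": {"features": [42.5, 3, None]},
    "metadata": {
        "features": [
            {"name": "user_transaction_metrics.amt_mean_1d_1d"},
            {"name": "user_transaction_metrics.amt_count_1d_1d"},
            {"name": "user_transaction_metrics.amt_mean_3d_1d"},
        ]
    },
}


def to_training_row(response, fill_value=0):
    """Rename online features to training column names and fill missing values."""
    names = [f["name"].replace(".", "__") for f in response["metadata"]["features"]]
    values = response["result"]["features"]
    return {
        name: (fill_value if value is None else value)
        for name, value in zip(names, values)
    }


row = to_training_row(response)
print(row)
```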

## Perform real-time inferencing
Make a prediction by feeding the features into the model.

In [None]:
import pandas as pd

def get_prediction_from_model(feature_data):
    # Online names use "fv.feature"; training columns use "fv__feature"
    columns = [f["name"].replace(".", "__") for f in feature_data["metadata"]["features"]]
    data = [feature_data["result"]["features"]]
    # Match the training column order and fill missing values as in training
    features = pd.DataFrame(data, columns=columns)[X.columns].fillna(0)
    # Probability of the positive (fraud) class
    return model.predict_proba(features)[0][1]

def evaluate_transaction(user_id):
    online_feature_data = get_online_feature_data(user_id)
    fraud_probability = get_prediction_from_model(online_feature_data)
    if fraud_probability <= 0.5:
        return "Transaction accepted."
    else:
        return "Transaction denied."

evaluate_transaction("user_502567604689")
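`evaluate_transaction` hard-codes 0.5 as the cutoff on the predicted fraud probability. Lowering the cutoff denies more transactions, typically catching more fraud at the cost of more false alarms; raising it does the opposite. A minimal sketch, using a hypothetical `decide` helper, that keeps the same rule (accept at or below the cutoff, deny above it) but makes the cutoff a parameter so it can be tuned against the precision and recall from the classification report.

```python
def decide(fraud_probability, threshold=0.5):
    """Accept at or below the threshold, deny above it (same rule as evaluate_transaction)."""
    if not 0.0 <= fraud_probability <= 1.0:
        raise ValueError("fraud_probability must be in [0, 1]")
    if fraud_probability > threshold:
        return "Transaction denied."
    return "Transaction accepted."


print(decide(0.42))                 # default cutoff of 0.5
print(decide(0.42, threshold=0.3))  # stricter cutoff
```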