# Feathr Kafka Streaming Feature Ingestion

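This notebook illustrates Kafka streaming ingestion in Feathr. It defines features on a Kafka topic (here an Azure Event Hubs endpoint), materializes them to Redis with a streaming job, and reads them back online.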



## Prerequisite: Install Feathr

Install Feathr using pip:

`pip install -U feathr pandavro scikit-learn`

Or if you want to use the latest Feathr code from GitHub:

`pip install -I git+https://github.com/linkedin/feathr.git#subdirectory=feathr_project pandavro scikit-learn`

In [None]:
%pip install -U feathr pandavro scikit-learn

## Prerequisite: Configure the required environment

In the first step (Provision cloud resources), you should have provisioned all the required cloud resources. If you used the Feathr CLI to create a workspace, you should have a folder containing a `feathr_config.yaml` file with all the required configurations. Otherwise, update the configuration below.

The code below writes this configuration string to a temporary file, which is then passed to the Feathr client. Still treat [feathr_config.yaml](https://github.com/linkedin/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) as the source of truth; it also explains the meaning of each field.

In [None]:
import tempfile
yaml_config = """
# Please refer to https://github.com/linkedin/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml for explanations on the meaning of each field.
api_version: 1
project_config:
  project_name: 'feathr_home_credit'
  required_environment_variables:
    - 'REDIS_PASSWORD'
    - 'AZURE_CLIENT_ID'
    - 'AZURE_TENANT_ID'
    - 'AZURE_CLIENT_SECRET'
offline_store:
  adls:
    adls_enabled: true
  wasb:
    wasb_enabled: true
  s3:
    s3_enabled: false
    s3_endpoint: 's3.amazonaws.com'
  jdbc:
    jdbc_enabled: false
    jdbc_database: 'feathrtestdb'
    jdbc_table: 'feathrtesttable'
  snowflake:
    url: "dqllago-ol19457.snowflakecomputing.com"
    user: "feathrintegration"
    role: "ACCOUNTADMIN"
spark_config:
  spark_cluster: 'azure_synapse'
  spark_result_output_parts: '1'
  azure_synapse:
    dev_url: "https://feathrhomecreditcaspark.dev.azuresynapse.net"
    pool_name: "spark31"
    # workspace dir for storing all the required configuration files and the jar resources
    workspace_dir: "abfss://feathrhomecreditcafs@feathrhomecreditcasto.dfs.core.windows.net/"
    executor_size: "Small"
    executor_num: 4
    feathr_runtime_location: wasbs://public@azurefeathrstorage.blob.core.windows.net/feathr-assembly-LATEST.jar
  databricks:
    workspace_instance_url: 'https://adb-6885802458123232.12.azuredatabricks.net/'
    workspace_token_value: ''
    config_template: {'run_name':'','new_cluster':{'spark_version':'9.1.x-scala2.12','node_type_id':'Standard_D3_v2','num_workers':2,'spark_conf':{}},'libraries':[{'jar':''}],'spark_jar_task':{'main_class_name':'','parameters':['']}}
    work_dir: 'dbfs:/feathr_getting_started'
    feathr_runtime_location: wasbs://public@azurefeathrstorage.blob.core.windows.net/feathr-assembly-LATEST.jar
online_store:
  redis:
    host: 'feathrhomecreditcaredis.redis.cache.windows.net'
    port: 6380
    ssl_enabled: True
feature_registry:
  purview:
    type_system_initialization: true
    purview_name: 'feathrhomecreditcapurview'
    delimiter: '__'
"""
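tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)

# Write through the already-open handle instead of reopening the file by name
# (reopening fails on Windows); delete=False keeps the file after it is closed.
with tmp as text_file:
    text_file.write(yaml_config)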
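A misspelled boolean such as `tru` is not rejected by YAML: PyYAML, for example, loads it as the string `'tru'`, so the flag may not behave as intended. The stdlib-only sketch below scans configuration text for `*_enabled` keys whose values are not YAML boolean spellings. `find_invalid_flags` is a hypothetical helper, not part of Feathr, and it is a line-based heuristic rather than a YAML parser.

```python
import re

# YAML 1.1 boolean spellings; compared case-insensitively here, so the check is slightly lenient.
YAML_BOOLEANS = {"true", "false", "yes", "no", "on", "off"}

# Matches lines such as "    adls_enabled: true" and captures the key and the value.
FLAG_LINE = re.compile(r"^\s*(\w+_enabled)\s*:\s*(\S+)\s*$")


def find_invalid_flags(config_text):
    """Return (line_number, key, value) for *_enabled keys whose value is not a YAML boolean.

    Hypothetical helper: a line-based heuristic, not a full YAML parser.
    """
    problems = []
    for lineno, line in enumerate(config_text.splitlines(), start=1):
        match = FLAG_LINE.match(line)
        if match and match.group(2).lower() not in YAML_BOOLEANS:
            problems.append((lineno, match.group(1), match.group(2)))
    return problems


sample = """offline_store:
  adls:
    adls_enabled: tru
  wasb:
    wasb_enabled: true
"""
print(find_invalid_flags(sample))
```

Running `find_invalid_flags(yaml_config)` in the notebook should return an empty list once every `*_enabled` flag holds a valid boolean.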


## Import the required libraries


In [None]:
import glob
import os
import tempfile
from datetime import datetime, timedelta
from math import sqrt

import pandas as pd
import pandavro as pdx
from feathr import FeathrClient
from feathr import BOOLEAN, FLOAT, INT32, ValueType, STRING
from feathr import Feature, DerivedFeature, FeatureAnchor
from feathr import BackfillTime, MaterializationSettings
from feathr import FeatureQuery, ObservationSettings
from feathr import RedisSink
from feathr import INPUT_CONTEXT, HdfsSource
from feathr import WindowAggTransformation
from feathr import TypedKey
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

## Set up the required environment variables

Set the environment variables below before running this sample. Additional environment variables are documented in [feathr_config.yaml](https://github.com/linkedin/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml); treat that file as the source of truth, as it also explains the meaning of each variable.

In [None]:
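# Secrets for the variables listed under required_environment_variables in the config
os.environ['REDIS_PASSWORD'] = ''
os.environ['AZURE_CLIENT_ID'] = ''
os.environ['AZURE_TENANT_ID'] = ''
os.environ['AZURE_CLIENT_SECRET'] = ''

# SASL/JAAS configuration used to authenticate to the Kafka endpoint
os.environ['KAFKA_SASL_JAAS_CONFIG'] = ''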
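Empty values such as the `''` placeholders above are easy to overlook. As a minimal sketch (the helper `missing_env_vars` is hypothetical, not a Feathr API), the check below lists which of the variables named under `required_environment_variables` in the configuration are still unset or empty, before the client is created.

```python
import os


def missing_env_vars(names, env=None):
    """Return the names in `names` that are unset or set to an empty string.

    Hypothetical helper (not part of Feathr); checks os.environ unless another mapping is given.
    """
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# Taken from required_environment_variables in the config above; add others
# (for example the Kafka credential variable) as needed.
REQUIRED = [
    "REDIS_PASSWORD",
    "AZURE_CLIENT_ID",
    "AZURE_TENANT_ID",
    "AZURE_CLIENT_SECRET",
]

missing = missing_env_vars(REQUIRED)
if missing:
    print("Fill in these environment variables before continuing:", missing)
```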

Next, initialize a Feathr client:


In [None]:
client = FeathrClient(config_path=tmp.name)

## Define Kafka streaming input source

In [None]:
from feathr import AvroJsonSchema, KafKaSource, KafkaConfig

KAFKA_BROKER = "<EVENTHUB_HOST_NAME>:9093"
KAFKA_TOPIC = "<EVENTHUB_TOPIC>"

schema = AvroJsonSchema(schemaStr="""
{
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"}
        }
    ]
}
""")

stream_source = KafKaSource(name="kafkaStreamingSource",
    kafkaConfig=KafkaConfig(brokers=[KAFKA_BROKER], 
        topics=[KAFKA_TOPIC], 
        schema=schema
    )
)
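Each Kafka message is expected to carry the three fields declared in the Avro schema above. The `datetime` field uses the Avro `timestamp-micros` logical type, a `long` counting microseconds since the Unix epoch (UTC). The stdlib-only sketch below converts a Python `datetime` to that representation and checks a sample record against the schema's field names and integer ranges. The helper names and sample values are illustrative assumptions; Avro-encoding records and producing them to the Event Hubs topic are not shown.

```python
import json
from datetime import datetime, timedelta, timezone

# Same schema string as passed to AvroJsonSchema above.
SCHEMA_STR = """
{
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"}
        }
    ]
}
"""

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Avro int is 32-bit signed and long is 64-bit signed; this schema uses only these two types.
RANGES = {"int": (-2**31, 2**31 - 1), "long": (-2**63, 2**63 - 1)}


def to_timestamp_micros(dt):
    """Convert a timezone-aware datetime to microseconds since the Unix epoch (exact integer math)."""
    return (dt - EPOCH) // timedelta(microseconds=1)


def validate_record(record, schema_str=SCHEMA_STR):
    """Hypothetical check: every schema field is present and is an in-range integer."""
    schema = json.loads(schema_str)
    for field in schema["fields"]:
        ftype = field["type"]
        base = ftype["type"] if isinstance(ftype, dict) else ftype
        value = record.get(field["name"])
        low, high = RANGES[base]
        if not isinstance(value, int) or isinstance(value, bool) or not low <= value <= high:
            raise ValueError(f"field {field['name']!r} is not a valid Avro {base}: {value!r}")
    return True


record = {
    "driver_id": 1,
    "trips_today": 7,
    "datetime": to_timestamp_micros(datetime(2022, 6, 1, 12, 0, tzinfo=timezone.utc)),
}
print(validate_record(record), record["datetime"])
```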

## Define features on the Kafka source

In [None]:
driver_id = TypedKey(key_column="driver_id",
    key_column_type=ValueType.INT64,
    description="driver_id",
    full_name="nyc driver id"
)

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
    source=stream_source,
    features=[
        Feature(name="f_modified_streaming_count",
            feature_type=INT32,
            transform="trips_today + 1",
            key=driver_id
        ),
        Feature(name="f_modified_streaming_count2",
            feature_type=INT32,
            transform="trips_today + randn() * cos(trips_today)",
            key=driver_id
        )
    ]
)
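The two `transform` strings are Spark SQL expressions evaluated on each incoming record. As a rough illustration of what they compute, the plain-Python sketch below mirrors `trips_today + 1` and `trips_today + randn() * cos(trips_today)`, where Spark's `randn()` draws from the standard normal distribution. The function names are hypothetical, the noise term is passed in explicitly so the arithmetic is reproducible, and how Feathr casts the floating-point result to the declared `INT32` type is not modeled.

```python
import math
import random


def modified_streaming_count(trips_today):
    """Mirrors the Spark SQL transform "trips_today + 1"."""
    return trips_today + 1


def modified_streaming_count2(trips_today, noise):
    """Mirrors "trips_today + randn() * cos(trips_today)"; `noise` stands in for randn()."""
    return trips_today + noise * math.cos(trips_today)


rng = random.Random(42)  # seeded so the illustration is repeatable
for trips in [0, 3, 10]:
    noise = rng.gauss(0.0, 1.0)  # standard normal draw, like Spark's randn()
    print(trips, modified_streaming_count(trips), round(modified_streaming_count2(trips, noise), 3))
```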

## Build features

In [None]:
client.build_features(anchor_list=[kafkaAnchor])

## Start streaming job

In [None]:
redisSink = RedisSink(table_name="kafkaSampleDemoFeature", streaming=True, streamingTimeoutMs=300000) # 5 minutes
settings = MaterializationSettings(name="kafkaSampleDemo",
    sinks=[redisSink],
    feature_names=["f_modified_streaming_count", "f_modified_streaming_count2"]
)
client.materialize_features(settings)
client.wait_job_to_finish(timeout_sec=600)
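The two timeouts in this cell interact: `streamingTimeoutMs=300000` on the sink is 300,000 ms = 300 s = 5 minutes, while `wait_job_to_finish(timeout_sec=600)` waits up to 600 s. Assuming the streaming job stops once `streamingTimeoutMs` elapses, the wait timeout should exceed it, leaving headroom for job startup. The hypothetical helper below makes that check explicit.

```python
def wait_headroom_sec(streaming_timeout_ms, wait_timeout_sec):
    """Seconds of the wait timeout left over after the streaming window.

    Hypothetical helper; assumes the job runs for roughly streaming_timeout_ms.
    """
    streaming_sec = streaming_timeout_ms / 1000
    headroom = wait_timeout_sec - streaming_sec
    if headroom <= 0:
        raise ValueError(
            f"wait timeout ({wait_timeout_sec}s) does not cover the streaming window ({streaming_sec:.0f}s)"
        )
    return headroom


# Values used in the cell above: 5-minute streaming window, 10-minute wait.
print(wait_headroom_sec(300000, 600))
```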

## Fetch streaming feature values

In [None]:
# Get features for a single key
res = client.get_online_features('kafkaSampleDemoFeature', '1', ['f_modified_streaming_count', 'f_modified_streaming_count2'])
print(res)
# Get features for multiple keys
res = client.multi_get_online_features('kafkaSampleDemoFeature', ['5','10'], ['f_modified_streaming_count', 'f_modified_streaming_count2'])
print(res)
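The online lookups above use the string form of `driver_id` (`'1'`, `'5'`, `'10'`). Assuming `get_online_features` returns a list of values in the order of the requested feature names, and `multi_get_online_features` returns a dict mapping each key to such a list, the sketch below pairs values with their feature names for easier reading. The helper names and the sample values are illustrative placeholders, not real output.

```python
FEATURES = ["f_modified_streaming_count", "f_modified_streaming_count2"]


def name_values(feature_names, values):
    """Pair one key's feature values with their names (assumes matching order)."""
    if len(feature_names) != len(values):
        raise ValueError("expected one value per feature name")
    return dict(zip(feature_names, values))


def name_multi(feature_names, results):
    """Apply name_values to every key of a multi-key lookup result."""
    return {key: name_values(feature_names, values) for key, values in results.items()}


# Illustrative placeholder values only; real values come from the Redis table.
single = [8, 6]
multi = {"5": [3, 2], "10": [12, 11]}
print(name_values(FEATURES, single))
print(name_multi(FEATURES, multi))
```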