## Feathr Quick Start Notebook

1. Start the ZooKeeper server (keep it running in its own terminal):

    sudo /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties

2. Start the Kafka server (also in its own terminal):

    sudo /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

3. Create the topic `nyc_driver_test`:

    /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic nyc_driver_test

4. Check that the topic was created by listing all topics and describing `nyc_driver_test`:

    /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --list

    /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe --topic nyc_driver_test

5. Run the producer script to send test records to the topic:

    python nyc_taxi_kafka_producer.py
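The producer script itself is not part of this notebook. As a hedged sketch of the payload it has to publish, the stdlib-only code below Avro-encodes one record of the `DriverTrips` schema that the Kafka source declares later on. It assumes the messages are plain schemaless Avro binary (no schema-registry header), which is an assumption about `nyc_taxi_kafka_producer.py` rather than something this notebook confirms; `encode_long`, `timestamp_micros`, and `encode_driver_trip` are hypothetical helper names.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def encode_long(n: int) -> bytes:
    """Avro int/long encoding: zig-zag, then base-128 varint (low 7 bits first)."""
    z = (n << 1) ^ (n >> 63)  # zig-zag maps 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # 7 payload bits plus the continuation bit
        z >>= 7
    out.append(z)  # last byte has the continuation bit cleared
    return bytes(out)


def timestamp_micros(dt: datetime) -> int:
    """Microseconds since the Unix epoch (logicalType timestamp-micros); dt must be timezone-aware."""
    delta = dt - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def encode_driver_trip(driver_id: int, trips_today: int, dt: datetime) -> bytes:
    """Schemaless Avro binary for one DriverTrips record: fields concatenated in schema order."""
    return encode_long(driver_id) + encode_long(trips_today) + encode_long(timestamp_micros(dt))


if __name__ == "__main__":
    payload = encode_driver_trip(9, 9, datetime.now(timezone.utc))
    print(payload.hex())
    # Hypothetical send step, assuming the kafka-python package is installed:
    # from kafka import KafkaProducer
    # producer = KafkaProducer(bootstrap_servers="localhost:9092")
    # producer.send("nyc_driver_test", value=payload)
    # producer.flush()
```

Hand-rolling the encoder keeps the sketch dependency-free; a real producer would more likely use an Avro library with the same schema string.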


## 1. Install Feathr and Necessary Dependencies

If you haven't installed Feathr and its dependencies yet, install them first (for example, with `pip install feathr`), then run the following cell to import the modules used in this notebook:

In [5]:
from datetime import timedelta
import os
from pathlib import Path


import pyspark.sql.functions as F

import feathr
from feathr import (
    FeathrClient,
    # Feature data types
    BOOLEAN, FLOAT, INT32, ValueType,
    # Feature data sources
    INPUT_CONTEXT, HdfsSource,
    # Feature aggregations
    TypedKey, WindowAggTransformation,
    # Feature types and anchor
    DerivedFeature, Feature, FeatureAnchor,
    # Materialization
    BackfillTime, MaterializationSettings, RedisSink,
    # Offline feature computation
    FeatureQuery, ObservationSettings,
)
from feathr.datasets import nyc_taxi
from feathr.spark_provider.feathr_configurations import SparkExecutionConfiguration
from feathr.utils.config import generate_config
from feathr.utils.job_utils import get_result_df
from feathr.utils.platform import is_databricks, is_jupyter

print(f"Feathr version: {feathr.__version__}")

Feathr version: 1.0.0


In [6]:
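import glob

os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"
os.environ['REDIS_PASSWORD'] = "foobared"  # must match the `requirepass` value of your Redis server

# Get the Feathr runtime jar, assuming exactly one jar file is in the current folder.
jar_files = glob.glob("./*.jar")
if not jar_files:
    raise FileNotFoundError("No Feathr runtime jar (*.jar) found in the current folder.")
jar_name = jar_files[0]

PROJECT_NAME = "nyc_taxi_kafka"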
yaml_config = f"""
api_version: 1
project_config:
  project_name: {PROJECT_NAME}
  
spark_config:
  # Spark runtime to use. Currently supported: azure_synapse, databricks, local
  spark_cluster: 'local'
  spark_result_output_parts: '1'
  local:
    master: 'local[*]'
    feathr_runtime_location: '{jar_name}'

online_store:
  redis:
    # Redis configs to access Redis cluster
    host: '127.0.0.1'
    port: 6379
    ssl_enabled: False

feature_registry:
  # The API endpoint of the registry service
  api_endpoint: "http://127.0.0.1:8000/api/v1"
"""
feathr_workspace_folder = Path(f"./{PROJECT_NAME}_feathr_config.yaml")
feathr_workspace_folder.parent.mkdir(exist_ok=True, parents=True)
feathr_workspace_folder.write_text(yaml_config)
print(yaml_config)


api_version: 1
project_config:
  project_name: nyc_taxi_kafka
  
spark_config:
  # Spark runtime to use. Currently supported: azure_synapse, databricks, local
  spark_cluster: 'local'
  spark_result_output_parts: '1'
  local:
    master: 'local[*]'
    feathr_runtime_location: './vnpt_feathr-0.0.1.jar'

online_store:
  redis:
    # Redis configs to access Redis cluster
    host: '127.0.0.1'
    port: 6379
    ssl_enabled: False

feature_registry:
  # The API endpoint of the registry service
  api_endpoint: "http://127.0.0.1:8000/api/v1"



Any configuration value can be overridden by an environment variable whose name joins the nested config keys with `__` (double underscore). The same naming is used for the keyword arguments you pass to the `generate_config` utility function.

For example, the `feathr_runtime_location` setting in the Databricks config can be overridden by setting the `spark_config__databricks__feathr_runtime_location` environment variable.
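A minimal sketch of that naming rule, using the hypothetical helpers `config_env_var` and `resolve`. They only illustrate the precedence that the client's log messages describe (environment variable before configuration file); they are not Feathr's own config reader.

```python
import os


def config_env_var(*keys: str) -> str:
    """Join nested config keys with '__' to form the override variable name."""
    return "__".join(keys)


def resolve(config: dict, *keys: str):
    """Illustrative lookup: an environment variable wins over the YAML value."""
    env_value = os.environ.get(config_env_var(*keys))
    if env_value is not None:
        return env_value
    node = config
    for key in keys:  # walk the nested dict, e.g. spark_config -> local -> master
        node = node[key]
    return node


if __name__ == "__main__":
    cfg = {"spark_config": {"local": {"master": "local[*]"}}}
    print(config_env_var("spark_config", "databricks", "feathr_runtime_location"))
    print(resolve(cfg, "spark_config", "local", "master"))
```

In this notebook you would set such a variable with `os.environ`, for example `os.environ["spark_config__local__master"] = "local[2]"`, presumably before `FeathrClient` reads its configuration.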

### Initialize Feathr client

In [7]:
client = FeathrClient(str(feathr_workspace_folder))

2024-08-08 14:02:55.579 | INFO     | feathr.utils._env_config_reader:get:62 - Config secrets__azure_key_vault__name is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2024-08-08 14:02:55.580 | INFO     | feathr.utils._env_config_reader:get:62 - Config offline_store__s3__s3_enabled is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2024-08-08 14:02:55.581 | INFO     | feathr.utils._env_config_reader:get:62 - Config offline_store__adls__adls_enabled is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2024-08-08 14:02:55.581 | INFO     | feathr.utils._env_config_reader:get:62 - Config offline_store__wasb__wasb_enabled is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.

### Define the Kafka streaming data source

In [8]:
TIMESTAMP_COL = "lpep_dropoff_datetime"
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"
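`TIMESTAMP_FORMAT` uses Spark's datetime pattern letters, not Python's. If you inspect or generate such timestamps in plain Python, this sketch shows the equivalent `strptime`/`strftime` format; `PY_TIMESTAMP_FORMAT`, `parse_timestamp`, and `format_timestamp` are illustrative names, and the sample string is made up.

```python
from datetime import datetime

# Spark pattern letters -> Python format codes:
# yyyy -> %Y, MM -> %m, dd -> %d, HH -> %H (00-23), mm -> %M, ss -> %S
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"
PY_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_timestamp(value: str) -> datetime:
    """Parse a string written in TIMESTAMP_FORMAT into a naive datetime."""
    return datetime.strptime(value, PY_TIMESTAMP_FORMAT)


def format_timestamp(dt: datetime) -> str:
    """Render a datetime back into TIMESTAMP_FORMAT."""
    return dt.strftime(PY_TIMESTAMP_FORMAT)


if __name__ == "__main__":
    sample = "2020-04-01 00:44:02"  # made-up example value
    print(parse_timestamp(sample))
```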

In [9]:
from feathr import (
    AvroJsonSchema,
    KafKaSource,
    KafkaConfig
)

In [10]:
# Define input data schema
schema = AvroJsonSchema(schemaStr="""
{
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
        "name": "datetime",
        "type": {"type": "long", "logicalType": "timestamp-micros"}
        }
    ]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
                            kafkaConfig=KafkaConfig(brokers=["localhost:9092"],
                                                    topics=["nyc_driver_test"],
                                                    schema=schema)
                            )
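The schema tells the consumer how to read each Kafka message: `driver_id` and `trips_today` are zig-zag varints, and `datetime` is a `long` counting microseconds since the Unix epoch. The sketch below decodes one such message with the standard library, assuming the messages are schemaless Avro binary. It only illustrates the wire format; Feathr performs the actual decoding inside its Spark job, and `read_long` and `decode_driver_trip` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one Avro zig-zag varint starting at pos; return (value, next_pos)."""
    shift = 0
    z = 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift  # accumulate 7 payload bits, low-order first
        if not byte & 0x80:  # continuation bit cleared: this was the last byte
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos  # undo zig-zag


def decode_driver_trip(buf: bytes) -> dict:
    """Read the DriverTrips fields in the order the schema declares them."""
    driver_id, pos = read_long(buf, 0)
    trips_today, pos = read_long(buf, pos)
    micros, pos = read_long(buf, pos)
    return {
        "driver_id": driver_id,
        "trips_today": trips_today,
        "datetime": EPOCH + timedelta(microseconds=micros),  # timestamp-micros
    }


if __name__ == "__main__":
    print(decode_driver_trip(b"\x02\x04\x80\x89\x7a"))
```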

In [11]:
driver_id = TypedKey(key_column="driver_id",
                     key_column_type=ValueType.INT64,
                     description="driver id",
                     full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
                            source=stream_source,
                            features=[Feature(name="f_modified_streaming_count",
                                              feature_type=INT32,
                                              transform="trips_today + 1",
                                              key=driver_id),
                                      Feature(name="f_modified_streaming_count2",
                                              feature_type=FLOAT,  # randn() makes this value floating-point
                                              transform="trips_today + randn() * cos(trips_today)",
                                              key=driver_id)]
                            )
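To see what the two `transform` expressions compute, here is a plain-Python sketch of them for a single record. `random.Random.gauss(0.0, 1.0)` stands in for Spark SQL's `randn()` (a sample from the standard normal distribution), and `math.cos` takes radians like Spark's `cos`; the function name `modified_streaming_counts` is hypothetical.

```python
import math
import random


def modified_streaming_counts(trips_today: int, rng: random.Random) -> tuple:
    """Mimic the two anchor transforms for one record (not Feathr's execution path)."""
    f1 = trips_today + 1  # "trips_today + 1"
    f2 = trips_today + rng.gauss(0.0, 1.0) * math.cos(trips_today)  # "trips_today + randn() * cos(trips_today)"
    return f1, f2


if __name__ == "__main__":
    print(modified_streaming_counts(9, random.Random(0)))
```

Because of the `randn()` term, the second feature is a non-deterministic floating-point value, which matches values such as `9.53262992576499` returned from the online store later in this notebook, while the first is simply `trips_today + 1`.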

In [12]:
client.build_features(
    anchor_list=[kafkaAnchor],
)

In [13]:
REGISTER_FEATURES = False
if REGISTER_FEATURES:
    try:
        client.register_features()
    except Exception as e:
        print(e)  
    print(client.list_registered_features(project_name=PROJECT_NAME))
    # You can get the actual features too by calling client.get_features_from_registry(PROJECT_NAME)

In [20]:
redisSink = RedisSink(table_name="kafkaSampleDemoFeature", streaming=True, streamingTimeoutMs=10000)
# You may need to increase 'streamingTimeoutMs' (for example, to 10 minutes or longer) to make sure some data from Kafka is captured.
settings = MaterializationSettings(name="kafkaSampleDemo",
                                   sinks=[redisSink],
                                   feature_names=['f_modified_streaming_count', 'f_modified_streaming_count2']
                                   )
# Streams for 10 seconds, since streamingTimeoutMs is 10000
client.materialize_features(settings,
                            allow_materialize_non_agg_feature=True)
client.wait_job_to_finish(timeout_sec=1800)

2024-08-08 14:14:54.326 | INFO     | feathr.utils._env_config_reader:get:62 - Config monitoring__database__sql__url is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2024-08-08 14:14:54.327 | INFO     | feathr.utils._env_config_reader:get:62 - Config monitoring__database__sql__user is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2024-08-08 14:14:54.328 | INFO     | feathr.spark_provider._localspark_submission:_get_debug_file_name:292 - Spark log path is debug/nyc_taxi_kafka_feathr_feature_materialization_job20240808141454
2024-08-08 14:14:54.329 | INFO     | feathr.spark_provider._localspark_submission:_init_args:267 - Spark job: nyc_taxi_kafka_feathr_feature_materialization_job is running on local spark with master: local[*].
2024-08-08 14:14:54.339 | INFO     | feathr.spark_provider._localspark_submission:submit_feathr_job:147 -


2024-08-08 14:15:26.378 | INFO     | feathr.spark_provider._localspark_submission:wait_for_completion:233 - Spark job with pid 15076 finished in: 32 seconds                     with returncode 0
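If you raise `streamingTimeoutMs` as suggested, make sure the `timeout_sec` passed to `wait_job_to_finish` still covers the streaming window plus job startup; in the run above, a 10-second stream took 32 seconds end to end. This arithmetic sketch uses hypothetical helpers and an arbitrary 60-second margin.

```python
def minutes_to_ms(minutes: float) -> int:
    """Convert minutes into the milliseconds expected by streamingTimeoutMs."""
    return int(minutes * 60 * 1000)


def wait_timeout_covers_stream(streaming_timeout_ms: int,
                               wait_timeout_sec: int,
                               margin_sec: int = 60) -> bool:
    """True if waiting wait_timeout_sec seconds outlasts the stream plus a margin.

    margin_sec is an arbitrary allowance for Spark startup and shutdown.
    """
    return wait_timeout_sec * 1000 >= streaming_timeout_ms + margin_sec * 1000


if __name__ == "__main__":
    print(minutes_to_ms(10))
    print(wait_timeout_covers_stream(minutes_to_ms(10), 1800))
    print(wait_timeout_covers_stream(minutes_to_ms(30), 1800))
```

For a 10-minute stream you would pass `streamingTimeoutMs=minutes_to_ms(10)` (600000) to `RedisSink`, and `timeout_sec=1800` still leaves plenty of headroom; a 30-minute stream would need a larger `timeout_sec`.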


In [21]:
res = client.get_online_features('kafkaSampleDemoFeature', '9',
                                 ['f_modified_streaming_count',
                                  'f_modified_streaming_count2'])
res

[10, 9.53262992576499]

In [22]:
# Get features for multiple feature keys
res = client.multi_get_online_features('kafkaSampleDemoFeature', ['1', '2'],
                                       ['f_modified_streaming_count',
                                        'f_modified_streaming_count2'])
res

{'1': [2, 0.9999112386259574], '2': [3, 2.3240336962830472]}
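Both calls return values positionally, in the order of the requested feature names: a list for a single key, and a dict of lists keyed by entity key for several keys. A small hypothetical helper can attach the names for readability; the test values below are the outputs recorded above.

```python
from typing import Dict, List, Sequence


def label_features(names: Sequence[str], values: Sequence) -> Dict[str, object]:
    """Pair each requested feature name with the value returned in the same position."""
    if len(names) != len(values):
        raise ValueError("expected one value per feature name")
    return dict(zip(names, values))


def label_multi(names: Sequence[str], results: Dict[str, List]) -> Dict[str, Dict[str, object]]:
    """Apply label_features to every key returned by multi_get_online_features."""
    return {key: label_features(names, values) for key, values in results.items()}


if __name__ == "__main__":
    feature_names = ['f_modified_streaming_count', 'f_modified_streaming_count2']
    print(label_features(feature_names, [10, 9.53262992576499]))
```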

Now, you can retrieve features for online scoring as follows: