# Feature Store - A unified storage of curated features

This notebook helps you get started with Feature Store in the H2O AI Cloud using Python.

* **Product Documentation:** https://h2oai.github.io/featurestore/

In [3]:
from featurestore import CSVFile, Schema
from pyspark.sql import SparkSession
from h2o_ai_cloud import fs_client

## Configure User Spark session for Feature Store

### Set up Java Environment for Spark 

In [4]:
import os
from jdk4py import JAVA_HOME
os.environ['JAVA_HOME'] = str(JAVA_HOME)
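
Before starting Spark, it can help to confirm that `JAVA_HOME` really points at a Java installation. The following is a minimal sketch using only the standard library. The helper name `find_java` is hypothetical and is not part of jdk4py or Feature Store.

```python
import os
from pathlib import Path
from typing import Optional


def find_java(java_home: Optional[str] = None) -> Optional[Path]:
    """Return the path to the java launcher under JAVA_HOME, or None."""
    # Fall back to the JAVA_HOME environment variable when no path is given.
    home = java_home or os.environ.get("JAVA_HOME")
    if not home:
        return None
    # The launcher is bin/java on Linux/macOS and bin/java.exe on Windows.
    for name in ("java", "java.exe"):
        candidate = Path(home) / "bin" / name
        if candidate.is_file():
            return candidate
    return None


java = find_java()
print(java if java else "No java executable found under JAVA_HOME")
```

If nothing is found, Spark will most likely fail to start its JVM, so it is worth checking this before building the session.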

In [5]:
spark_dependencies_jar = "https://s3.amazonaws.com/artifacts.h2o.ai/releases/ai/h2o/feature-store/release/0.15.0/spark-dependencies/featurestore-azure-gen2-spark-dependencies-0.15.0.jar"
spark = SparkSession.builder \
    .master("local") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-core_2.12:2.2.0") \
    .config("spark.jars", spark_dependencies_jar) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/opt/conda/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-azure added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-67b05020-a1d9-41be-9fae-9f8b614c61c3;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.hadoop#hadoop-azure;3.3.1 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.11 in central
	found com.microsoft.azure#azure-storage;7.0.1 in central
	found com.fasterxml.jackson.core#jackson-core;2.10.5 in central
	found org.slf4j#slf4j-ap

23/04/17 17:15:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
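
The `spark.jars.packages` value used above is a comma-separated list of Maven coordinates (`group:artifact:version`), which is easy to mistype by hand. A minimal sketch of assembling it from named versions, so that the Hadoop and Delta versions each live in one place, could look like this. `maven_packages` is a hypothetical helper, and the versions are the ones from the cell above.

```python
from typing import Iterable, Tuple

# Versions copied from the Spark session cell above.
HADOOP_VERSION = "3.3.1"
DELTA_VERSION = "2.2.0"


def maven_packages(coords: Iterable[Tuple[str, str, str]]) -> str:
    """Join (group, artifact, version) triples into a spark.jars.packages value."""
    return ",".join(
        f"{group}:{artifact}:{version}" for group, artifact, version in coords
    )


packages = maven_packages([
    ("org.apache.hadoop", "hadoop-aws", HADOOP_VERSION),
    ("org.apache.hadoop", "hadoop-azure", HADOOP_VERSION),
    ("io.delta", "delta-core_2.12", DELTA_VERSION),
])
print(packages)
```

The resulting string can be passed to the builder as `.config("spark.jars.packages", packages)`.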


## Connect to Feature Store
We first connect to the Feature Store cloud endpoint using the H2O Cloud discovery service in order to initialize the client. Then we can log in to Feature Store.

In [6]:
client = fs_client()
client.auth.login()

17-04-2023 05:16:00 : INFO : client : Connecting to the server featurestore-api.cloud-qa.h2o.ai ...
17-04-2023 05:16:02 : ERROR : auth : Browser is not supported: Please visit https://auth.demo.h2o.ai/auth/realms/q8s-qa/protocol/openid-connect/auth?client_id=feature-store-qa&code_challenge=-IFutm4_E4ZeZiDe_Iqf35D1BPBCYcKGyKJGuxPWtwM&code_challenge_method=S256&redirect_uri=https://featurestore.cloud-qa.h2o.ai/Callback&response_type=code&scope=openid%20offline_access&state=gK8R62SM7l to continue authentication.


## Understand the environment

In [7]:
client.get_version()

'0.15.0'

## Define data source 
Feature Store supports several kinds of data sources; see https://h2oai.github.io/featurestore/supported_data_sources for the list.

In [8]:
source = CSVFile("s3a://h2o-public-test-data/smalldata/gbm_test/titanic.csv")

## Extract schema from data source
The schema represents the features of the feature set

In [9]:
schema = client.extract_schema_from_source(source)

17-04-2023 05:16:52 : INFO : interactive_console : Job ID: 01gy83ewsh7zhcmg8zcjewmh7e, Status: Finished setting up spark session.
17-04-2023 05:17:06 : INFO : interactive_console : Job ID: 01gy83ewsh7zhcmg8zcjewmh7e, Status: Finished reading data from source location to extract schema.
17-04-2023 05:17:06 : INFO : interactive_console : Job ID: 01gy83ewsh7zhcmg8zcjewmh7e, Status: Schema generation completed.
17-04-2023 05:17:06 : INFO : interactive_console : 

Time taken - 60.696 seconds


## Create a project
You can follow the naming conventions described at https://h2oai.github.io/featurestore/api/naming_conventions

In [12]:
project = client.projects.create("sample_project")

## Create a feature set

In [13]:
feature_set = project.feature_sets.register(schema, "sample_fs")

## Ingest data from source
Upload the data into Feature Store.

In [14]:
feature_set.ingest(source)

17-04-2023 05:18:14 : INFO : interactive_console : Job ID: 01gy83j2wbpdkg9cfsqt77v5pq, Status: Finished setting up spark session.
17-04-2023 05:18:14 : INFO : interactive_console : Job ID: 01gy83j2wbpdkg9cfsqt77v5pq, Status: Finished reading data to ingest.
17-04-2023 05:18:22 : INFO : interactive_console : Job ID: 01gy83j2wbpdkg9cfsqt77v5pq, Status: Finished extracting scope from the data.
17-04-2023 05:18:42 : INFO : interactive_console : Job ID: 01gy83j2wbpdkg9cfsqt77v5pq, Status: Finished computation of incremental statistics.
17-04-2023 05:19:57 : INFO : interactive_console : Job ID: 01gy83j2wbpdkg9cfsqt77v5pq, Status: Finished writing data to main storage.
17-04-2023 05:20:01 : INFO : interactive_console : 

Time taken - 131.536 seconds


{
  "rawCacheLocation": "01879039-0b43-d3e8-f649-3007b595fde7/01879039-0b5c-68d9-de71-de543814f45b-f6b4c38b-f891-4346-acef-4bd76e0d0476-raw",
  "ingestionTimestamp": "2023-04-17T17:18:13.096298028Z",
  "ingestScope": {
    "startDateTime": "2023-04-17T17:18:13.096298028Z",
    "endDateTime": "2023-04-17T17:18:13.096298028Z"
  },
  "ingestId": "01gy83js39km6f7ke8tjqgy8d0",
  "cacheLocation": "",
  "message": ""
}
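
The ingest summary printed above is JSON, and its timestamps carry nanosecond precision, while Python's `datetime` stores at most microseconds. A minimal sketch of reading such a summary follows, assuming you have it as JSON text. `parse_timestamp` is a hypothetical helper, and the sample copies only fields shown in the output above.

```python
import json
import re
from datetime import datetime, timezone


def parse_timestamp(value: str) -> datetime:
    """Parse a UTC timestamp ending in 'Z', truncating fractions to microseconds."""
    match = re.fullmatch(
        r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z", value
    )
    if match is None:
        raise ValueError(f"unsupported timestamp: {value!r}")
    base, fraction = match.groups()
    # Keep at most six fractional digits and pad shorter fractions with zeros.
    micros = int((fraction or "0")[:6].ljust(6, "0"))
    parsed = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S")
    return parsed.replace(microsecond=micros, tzinfo=timezone.utc)


# Sample text with two fields copied from the ingest output above.
summary_json = """
{
  "ingestionTimestamp": "2023-04-17T17:18:13.096298028Z",
  "ingestId": "01gy83js39km6f7ke8tjqgy8d0"
}
"""
summary = json.loads(summary_json)
ingested_at = parse_timestamp(summary["ingestionTimestamp"])
print(summary["ingestId"], ingested_at.isoformat())
```

Truncating rather than rounding keeps the parsed value from ever landing after the original instant.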

## Retrieve the data

In [15]:
reference = feature_set.retrieve()

## Download features
Download the files from Feature Store

In [16]:
reference.download()

17-04-2023 05:20:28 : INFO : interactive_console : Job ID: 01gy83p3bk6x9qgegky2n3pqbe, Status: Finished setting up spark session.
17-04-2023 05:21:06 : INFO : interactive_console : Job ID: 01gy83p3bk6x9qgegky2n3pqbe, Status: Finished reading data from main storage.
17-04-2023 05:21:34 : INFO : interactive_console : Job ID: 01gy83p3bk6x9qgegky2n3pqbe, Status: Finished writing data to retrieve storage.
17-04-2023 05:21:34 : INFO : interactive_console : Job ID: 01gy83p3bk6x9qgegky2n3pqbe, Status: Finished generating pre-signed urls.


'/tmp/tmpbbu36pmt'
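
The path returned by `download()` above looks like a temporary directory. A minimal sketch for seeing what landed there, using only the standard library, is shown below. `list_files` is a hypothetical helper, and nothing is assumed about the format of the downloaded files.

```python
from pathlib import Path
from typing import List, Tuple


def list_files(directory: str) -> List[Tuple[str, int]]:
    """Return (relative path, size in bytes) for every file under directory."""
    root = Path(directory)
    return sorted(
        (str(path.relative_to(root)), path.stat().st_size)
        for path in root.rglob("*")
        if path.is_file()
    )


# Example use with the value returned by reference.download():
# download_dir = reference.download()
# for name, size in list_files(download_dir):
#     print(f"{size:>12}  {name}")
```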

## Obtain data as a Spark Frame 
Load the features as a Spark DataFrame.

In [17]:
reference.as_spark_frame(spark).show()


+------+--------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+----+----+--------------------+---------------------------------+
|pclass|survived|                name|   sex| age|sibsp|parch|          ticket|    fare|      cabin|embarked|boat|body|           home.dest|time_travel_column_auto_generated|
+------+--------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+----+----+--------------------+---------------------------------+
|     1|       1|Cardeza  Mr. Thom...|  male|36.0|    0|    1|        PC 17755|512.3292|B51 B53 B55|       C|   3|null|Austria-Hungary /...|              2023-04-17 17:18:13|
|     2|       0|Hickman  Mr. Stan...|  male|21.0|    2|    0|    S.O.C. 14879|    73.5|       null|       S|null|null|West Hampstead  L...|              2023-04-17 17:18:13|
|     2|       0|   Hold  Mr. Stephen|  male|44.0|    1|    0|           26707|    26.0|       null|       S|null|null|Englan

                                                                                

### Prepare a schema from a string
A schema can also be created from a string.

In [18]:
schema_str = "id integer, value string"
schema = Schema.create_from(schema_str)
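
The string above lists one feature per comma-separated entry, each written as a name followed by a type. To make that layout concrete, here is an illustrative stand-alone parser. It is not how `Schema.create_from` is implemented, `parse_schema_string` is a hypothetical name, and it handles only simple types without embedded commas or spaces.

```python
from typing import List, Tuple


def parse_schema_string(schema_str: str) -> List[Tuple[str, str]]:
    """Split 'name type, name type' into (name, type) pairs (simple types only)."""
    pairs = []
    for entry in schema_str.split(","):
        parts = entry.split()
        if len(parts) != 2:
            raise ValueError(f"expected 'name type', got {entry.strip()!r}")
        pairs.append((parts[0], parts[1]))
    return pairs


print(parse_schema_string("id integer, value string"))
# [('id', 'integer'), ('value', 'string')]
```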

### Create another feature set

In [19]:
fs_online = project.feature_sets.register(schema, "sample_fs_online", primary_key="id")

## Ingest data into Online Feature Store

In [20]:
fs_online.ingest_online('{"id": 1, "value": "test"}')
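
`ingest_online` is given a JSON string above. Hand-written JSON becomes error-prone once values contain quotes or non-ASCII characters, so a small sketch of building the same payload from a dictionary with the standard `json` module may help. The variable names are illustrative.

```python
import json

# The same row as in the cell above, written as a Python dictionary.
row = {"id": 1, "value": "test"}

# json.dumps takes care of quoting and escaping.
payload = json.dumps(row)
print(payload)  # {"id": 1, "value": "test"}
```

The resulting string can then be passed on as `fs_online.ingest_online(payload)`.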

## Retrieve data from Online Feature Store

In [22]:
fs_online.retrieve_online(1)

{'id': 1, 'value': 'test'}

## Delete a feature set

In [23]:
fs_online.delete()

17-04-2023 05:22:06 : INFO : feature_set : Feature set 'sample_fs_online' is deleted


## Delete a project

In [24]:
project.delete()

17-04-2023 05:22:06 : INFO : project : Project 'sample_project' is deleted
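
If an earlier cell fails, the feature set and project created above stay behind. One way to guard against that is a context manager that always calls `delete()` on the objects it was given. This is a generic sketch: `delete_on_exit` is a hypothetical helper, and it assumes only that each object exposes a `delete()` method, as `fs_online` and `project` do above.

```python
from contextlib import contextmanager


@contextmanager
def delete_on_exit(*resources):
    """Yield the resources, then delete them in reverse order, even on error."""
    try:
        yield resources
    finally:
        # Reverse order: things created last (feature sets) go before projects.
        for resource in reversed(resources):
            resource.delete()


# Example use with the objects from this notebook:
# with delete_on_exit(project, fs_online):
#     fs_online.retrieve_online(1)
```

If one `delete()` call raises, the remaining resources are not deleted; a more defensive version would catch and report each failure separately.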
