## Imports

In [1]:
import io.hops.util.Hops
import scala.collection.JavaConversions._
import collection.JavaConverters._

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
47,application_1545231463715_0067,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
import io.hops.util.Hops
import scala.collection.JavaConversions._
import collection.JavaConverters._


## Get Project Featurestore

Each project with the featurestore enabled gets its own Hive database for the featurestore. The database is named 'projectname_featurestore', and the name can be retrieved through the hops-util featurestore API:

In [2]:
Hops.getProjectFeaturestore

res1: String = fs_demo_featurestore


## Get all Featurestores Accessible in the Current Project

Feature stores can be shared across projects just like other Hopsworks datasets. You can use this API function to list all the featurestores accessible in the project programmatically.

In [3]:
Hops.getProjectFeaturestores

res2: java.util.List[String] = [fs_demo_featurestore]


## Get Individual Feature

When retrieving a single feature from the featurestore, the hops-util library infers which featuregroup the feature belongs to by querying the metastore, but you can also explicitly specify which featuregroup and version to query. If there are multiple features of the same name in the featurestore, you must specify enough information to uniquely identify the feature (e.g. which featuregroup and which version). If no featurestore is provided, the call defaults to the project's featurestore.

Without specifying featuregroup:

In [4]:
Hops.getFeature(spark, "action", Hops.getProjectFeaturestore).show(5)

+------+
|action|
+------+
|     0|
|     0|
|     0|
|     0|
|     0|
+------+
only showing top 5 rows



With specified featuregroup and version:

In [5]:
Hops.getFeature(spark, "action", Hops.getProjectFeaturestore, "web_logs_features", 1).show(5)

+------+
|action|
+------+
|     0|
|     0|
|     0|
|     0|
|     0|
+------+
only showing top 5 rows
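
The lookup described in this section can be pictured with a small, self-contained sketch. It is not the library's implementation: the `metastore` map and the `findFeaturegroup` helper are hypothetical stand-ins, and the column sets are made up for illustration. It only shows why a feature name that occurs in a single featuregroup resolves cleanly, while a name shared by several featuregroups needs an explicit featuregroup.

```scala
object FeatureLookupSketch {
  // Hypothetical stand-in for the metastore: table name -> column names.
  // The column sets are illustrative, not the real schemas.
  val metastore: Map[String, Set[String]] = Map(
    "web_logs_features_1" -> Set("cust_id", "action", "web_id"),
    "trx_summary_features_1" -> Set("cust_id", "avg_trx", "count_trx")
  )

  // Right(table) if exactly one table contains the feature, Left(reason) otherwise.
  def findFeaturegroup(feature: String): Either[String, String] = {
    val matches = metastore.collect {
      case (table, columns) if columns.contains(feature) => table
    }.toList.sorted
    matches match {
      case table :: Nil => Right(table)
      case Nil          => Left("feature '" + feature + "' not found")
      case many         => Left("feature '" + feature + "' is ambiguous: " + many.mkString(", "))
    }
  }
}
```

With this stand-in, `findFeaturegroup("action")` resolves to a single table, whereas `findFeaturegroup("cust_id")` reports an ambiguity because both tables contain that column.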



## Get Featuregroup

You can get an entire featuregroup from the API. If no featurestore is provided, the API defaults to the project's featurestore; if no version is provided, it defaults to version 1 of the featuregroup.

In [6]:
Hops.getFeaturegroup(spark, "trx_summary_features", Hops.getProjectFeaturestore, 1).show(5)

+---------+---------+-------+---------+---------+
|  avg_trx|count_trx|cust_id|  max_trx|  min_trx|
+---------+---------+-------+---------+---------+
| 1090.509|       16|    148|2094.9958| 390.4109|
| 738.1404|       16|    496|1464.5397| 9.235389|
|899.89594|       30|    463|1828.2426|33.797318|
|607.17773|        4|    471|636.18713|578.16833|
| 698.5791|       28|    243| 1582.427|119.73669|
+---------+---------+-------+---------+---------+
only showing top 5 rows



## Get Set of Features

When retrieving a list of features from the featurestore, the hops-util library infers which featuregroups the features belong to by querying the metastore. If the features reside in different featuregroups, the library will also **try** to infer how to join the features together based on common columns. If the JOIN query cannot be inferred, either because several features share the same name or because the join is not obvious, you need to supply enough information in the API call for the library to be able to query the featurestore. If you already know the JOIN query, you can also pass it directly to `Hops.queryFeaturestore` (an example of this approach is shown further down in this notebook). If no featurestore is provided, the call defaults to the project's featurestore.

In [7]:
val features = List("pagerank", "triangle_count", "avg_trx")

features: List[String] = List(pagerank, triangle_count, avg_trx)


In [8]:
Hops.getFeatures(spark, features, Hops.getProjectFeaturestore).show(5)

java.lang.IllegalArgumentException: Found the feature with name: avg_trx in more than one of the featuregroups of the featurestore fs_demo_featurestore please specify featuregroup that you want to get the feature from. The matched featuregroups are: pep_lookup_1, customer_type_lookup_1, trx_type_lookup_1, gender_lookup_1, industry_sector_lookup_1, country_lookup_1, alert_type_lookup_1, rule_name_lookup_1, browser_action_lookup_1, web_address_lookup_1, demographic_features_1, alert_features_1, trx_graph_summary_features_1, trx_features_1, trx_summary_features_1, hipo_features_1, trx_graph_edge_list_1, police_report_features_1, web_logs_features_1, trx_summary_features_2_1
  at io.hops.util.featurestore.FeaturestoreHelper.findFeature(FeaturestoreHelper.java:298)
  at io.hops.util.featurestore.FeaturestoreHelper.findFeaturegroupsThatContainsFeatures(FeaturestoreHelper.java:255)
  at io.hops.util.Hops.getFeatures(Hops.java:1504)
  ... 54 elided



Without specifying the join key but specifying featuregroups:

In [9]:
val featuregroupsMap = Map[String, Integer]("trx_graph_summary_features"->1,"trx_summary_features"->1)
val javaFeaturegroupsMap = new java.util.HashMap[String, Integer](featuregroupsMap)

featuregroupsMap: scala.collection.immutable.Map[String,Integer] = Map(trx_graph_summary_features -> 1, trx_summary_features -> 1)
javaFeaturegroupsMap: java.util.HashMap[String,Integer] = {trx_summary_features=1, trx_graph_summary_features=1}
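
The cell above relies on the implicit conversions from `scala.collection.JavaConversions._` to turn the Scala map into a Java map. As a sketch of an alternative, assuming a Scala version where `scala.collection.JavaConverters` is available (it is already imported at the top of this notebook), the conversion can be made explicit with `.asJava`. The object name `MapConversionSketch` is only for illustration.

```scala
import scala.collection.JavaConverters._

object MapConversionSketch {
  // Versions are boxed explicitly as java.lang.Integer, since the Java map holds Integer values.
  val featuregroupsMap: Map[String, Integer] = Map[String, Integer](
    "trx_graph_summary_features" -> Int.box(1),
    "trx_summary_features" -> Int.box(1)
  )

  // .asJava wraps the Scala map as a java.util.Map view;
  // copying it into a HashMap yields an independent, mutable Java map.
  val javaFeaturegroupsMap: java.util.HashMap[String, Integer] =
    new java.util.HashMap[String, Integer](featuregroupsMap.asJava)
}
```

Making the conversion explicit shows at the call site exactly where a Scala collection crosses into Java.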


In [10]:
Hops.getFeatures(spark, features, Hops.getProjectFeaturestore, javaFeaturegroupsMap).show(5)

+--------+--------------+---------+
|pagerank|triangle_count|  avg_trx|
+--------+--------------+---------+
|     1.0|           3.0|963.64233|
|     1.0|          12.0| 746.5783|
|     1.0|           7.0|687.91376|
|     1.0|          12.0| 732.6695|
|     1.0|           4.0|  641.785|
+--------+--------------+---------+
only showing top 5 rows



Specifying both featuregroups and join key:

In [11]:
Hops.getFeatures(spark, features, Hops.getProjectFeaturestore, javaFeaturegroupsMap, "cust_id").show(5)

+--------+--------------+---------+
|pagerank|triangle_count|  avg_trx|
+--------+--------------+---------+
|     1.0|           3.0|963.64233|
|     1.0|          12.0| 746.5783|
|     1.0|           7.0|687.91376|
|     1.0|          12.0| 732.6695|
|     1.0|           4.0|  641.785|
+--------+--------------+---------+
only showing top 5 rows
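
To make the idea of inferring a join from common columns concrete, here is a minimal sketch in plain Scala. It is an assumption-laden illustration, not the library's algorithm: `inferJoinKey` and `buildQuery` are hypothetical helpers, and the only facts taken from this notebook are the `featuregroup_version` table naming and the shared `cust_id` column.

```scala
object JoinSketch {
  // Pick a column that every table has in common, if any
  // (alphabetically first, so the result is deterministic).
  def inferJoinKey(tables: Map[String, Set[String]]): Option[String] =
    tables.values.reduceOption(_ intersect _).flatMap(_.toList.sorted.headOption)

  // Build an inner-join query, joining every table to the first one on the shared key.
  def buildQuery(features: Seq[String], tables: Seq[String], key: String): String = {
    require(tables.nonEmpty, "at least one table is needed")
    val first = tables.head
    val columns = features.mkString(", ")
    val joins = tables.tail.map(t => s"JOIN $t ON $first.$key = $t.$key")
    (s"SELECT $columns FROM $first" +: joins).mkString(" ")
  }
}
```

A string built this way could then be passed to `Hops.queryFeaturestore`, as shown in the free text query section of this notebook.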



### Advanced examples

Getting 10 features from two different featuregroups without specifying the featuregroups:

In [12]:
val features1 = List("pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx", "min_trx", "balance", "birthdate", "join_date", "number_of_accounts")
Hops.getFeatures(spark, features1, Hops.getProjectFeaturestore).show(5)

java.lang.IllegalArgumentException: Found the feature with name: avg_trx in more than one of the featuregroups of the featurestore fs_demo_featurestore please specify featuregroup that you want to get the feature from. The matched featuregroups are: pep_lookup_1, customer_type_lookup_1, trx_type_lookup_1, gender_lookup_1, industry_sector_lookup_1, country_lookup_1, alert_type_lookup_1, rule_name_lookup_1, browser_action_lookup_1, web_address_lookup_1, demographic_features_1, alert_features_1, trx_graph_summary_features_1, trx_features_1, trx_summary_features_1, hipo_features_1, trx_graph_edge_list_1, police_report_features_1, web_logs_features_1, trx_summary_features_2_1
  at io.hops.util.featurestore.FeaturestoreHelper.findFeature(FeaturestoreHelper.java:298)
  at io.hops.util.featurestore.FeaturestoreHelper.findFeaturegroupsThatContainsFeatures(FeaturestoreHelper.java:255)
  at io.hops.util.Hops.getFeatures(Hops.java:1504)
  ... 54 elided



If you try to get features that exist in multiple featuregroups, the library cannot infer which featuregroup to read them from, so you must pass the featuregroups explicitly as an argument. The following call omits them and fails:

In [13]:
val features2 = List("pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx", "min_trx", "balance", "birthdate", "join_date", "number_of_accounts", "pep")
Hops.getFeatures(spark, features2, Hops.getProjectFeaturestore).show(5)

java.lang.IllegalArgumentException: Found the feature with name: avg_trx in more than one of the featuregroups of the featurestore fs_demo_featurestore please specify featuregroup that you want to get the feature from. The matched featuregroups are: pep_lookup_1, customer_type_lookup_1, trx_type_lookup_1, gender_lookup_1, industry_sector_lookup_1, country_lookup_1, alert_type_lookup_1, rule_name_lookup_1, browser_action_lookup_1, web_address_lookup_1, demographic_features_1, alert_features_1, trx_graph_summary_features_1, trx_features_1, trx_summary_features_1, hipo_features_1, trx_graph_edge_list_1, police_report_features_1, web_logs_features_1, trx_summary_features_2_1
  at io.hops.util.featurestore.FeaturestoreHelper.findFeature(FeaturestoreHelper.java:298)
  at io.hops.util.featurestore.FeaturestoreHelper.findFeaturegroupsThatContainsFeatures(FeaturestoreHelper.java:255)
  at io.hops.util.Hops.getFeatures(Hops.java:1504)
  ... 54 elided



If we specify the featuregroups to read from, the library can resolve the feature that exists in multiple featuregroups and infer how to get the features:

In [14]:
val featuregroupsMap1 = Map[String, Integer](
    "trx_graph_summary_features"->1,
    "trx_summary_features"->1,
    "demographic_features" ->1
)
val javaFeaturegroupsMap1 = new java.util.HashMap[String, Integer](featuregroupsMap1)
Hops.getFeatures(spark, features2, Hops.getProjectFeaturestore, javaFeaturegroupsMap1).show(5)

featuregroupsMap1: scala.collection.immutable.Map[String,Integer] = Map(trx_graph_summary_features -> 1, trx_summary_features -> 1, demographic_features -> 1)
javaFeaturegroupsMap1: java.util.HashMap[String,Integer] = {demographic_features=1, trx_summary_features=1, trx_graph_summary_features=1}
+--------+--------------+---------+---------+---------+---------+---------+-------------------+-------------------+------------------+-------------+
|pagerank|triangle_count|  avg_trx|count_trx|  max_trx|  min_trx|  balance|          birthdate|          join_date|number_of_accounts|          pep|
+--------+--------------+---------+---------+---------+---------+---------+-------------------+-------------------+------------------+-------------+
|     1.0|           5.0| 1090.509|       16|2094.9958| 390.4109|12920.496|2003-04-12 00:00:00|1998-09-06 00:00:00|                10| 309237645312|
|     1.0|           5.0| 738.1404|       16|1464.5397| 9.235389| 11096.28|1985-09-14 00:00:00|2016-07-06 0

Example of getting 19 features from 5 different featuregroups:

In [15]:
val features3 = List("pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx", "min_trx",
    "balance", "birthdate", "join_date", "number_of_accounts", "pep", "customer_type", "gender", "web_id",
    "time_spent_seconds", "address", "action", "report_date", "report_id")
val featuregroupsMap2 = Map[String, Integer](
    "trx_graph_summary_features"->1,
    "trx_summary_features"->1,
    "demographic_features" ->1,
    "web_logs_features" -> 1,
    "police_report_features" -> 1
)
val javaFeaturegroupsMap2 = new java.util.HashMap[String, Integer](featuregroupsMap2)
Hops.getFeatures(spark, features3, Hops.getProjectFeaturestore, javaFeaturegroupsMap2).show(5)

features3: List[String] = List(pagerank, triangle_count, avg_trx, count_trx, max_trx, min_trx, balance, birthdate, join_date, number_of_accounts, pep, customer_type, gender, web_id, time_spent_seconds, address, action, report_date, report_id)
featuregroupsMap2: scala.collection.immutable.Map[String,Integer] = Map(police_report_features -> 1, web_logs_features -> 1, trx_graph_summary_features -> 1, trx_summary_features -> 1, demographic_features -> 1)
javaFeaturegroupsMap2: java.util.HashMap[String,Integer] = {demographic_features=1, police_report_features=1, web_logs_features=1, trx_summary_features=1, trx_graph_summary_features=1}
+--------+--------------+---------+---------+---------+---------+---------+-------------------+-------------------+------------------+------------+-------------+------------+------+------------------+-------+------+-------------------+---------+
|pagerank|triangle_count|  avg_trx|count_trx|  max_trx|  min_trx|  balance|          birthdate|          join_date

Sometimes you want a feature that exists in several featuregroups, and all of those featuregroups are part of your query. In that case you can specify which featuregroup to take the feature from by prefixing the feature name with the featuregroup name and version ('featuregroupname_version'), e.g. 'demographic_features_1.cust_id'. If you don't, the query fails because the library cannot tell which of the specified featuregroups to get the feature from:

In [16]:
val features4 = List("pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx", "min_trx",
    "balance", "birthdate", "join_date", "number_of_accounts", "pep", "customer_type", "gender", "web_id",
    "time_spent_seconds", "address", "action", "report_date", "report_id", "cust_id")
Hops.getFeatures(spark, features4, Hops.getProjectFeaturestore, javaFeaturegroupsMap2).show(5)

org.apache.spark.sql.AnalysisException: Reference 'cust_id' is ambiguous, could be: demographic_features_1.cust_id, police_report_features_1.cust_id, web_logs_features_1.cust_id, trx_summary_features_1.cust_id, trx_graph_summary_features_1.cust_id.; line 1 pos 219
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:213)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:97)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$37.apply(Analyzer.scala:826)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$37.apply(Analyzer.scala:828)
  at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:825)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$ano

If we change 'cust_id' to 'featuregroupname_version.cust_id' the library knows where to get the feature from and the query works:

In [17]:
val features5 = List("pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx", "min_trx",
    "balance", "birthdate", "join_date", "number_of_accounts", "pep", "customer_type", "gender", "web_id",
    "time_spent_seconds", "address", "action", "report_date", "report_id", "demographic_features_1.cust_id")
Hops.getFeatures(spark, features5, Hops.getProjectFeaturestore, javaFeaturegroupsMap2).show(5)

features5: List[String] = List(pagerank, triangle_count, avg_trx, count_trx, max_trx, min_trx, balance, birthdate, join_date, number_of_accounts, pep, customer_type, gender, web_id, time_spent_seconds, address, action, report_date, report_id, demographic_features_1.cust_id)
+--------+--------------+---------+---------+---------+---------+---------+-------------------+-------------------+------------------+------------+-------------+------------+------+------------------+-------+------+-------------------+---------+-------+
|pagerank|triangle_count|  avg_trx|count_trx|  max_trx|  min_trx|  balance|          birthdate|          join_date|number_of_accounts|         pep|customer_type|      gender|web_id|time_spent_seconds|address|action|        report_date|report_id|cust_id|
+--------+--------------+---------+---------+---------+---------+---------+-------------------+-------------------+------------------+------------+-------------+------------+------+------------------+-------+------+--
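
The qualified naming used above ('featuregroupname_version.feature') is easy to generate programmatically. The helpers below are hypothetical and not part of hops-util; they are a minimal sketch of how a feature list could be rewritten so that one ambiguous name points at a chosen featuregroup.

```scala
object QualifiedFeatureSketch {
  // Table names in the featurestore follow "<featuregroup>_<version>".
  def tableName(featuregroup: String, version: Int): String =
    s"${featuregroup}_${version}"

  // "<featuregroup>_<version>.<feature>", e.g. demographic_features_1.cust_id
  def qualify(featuregroup: String, version: Int, feature: String): String =
    s"${tableName(featuregroup, version)}.${feature}"

  // Replace every occurrence of one ambiguous feature name with its qualified form.
  def disambiguate(features: List[String], ambiguous: String,
                   featuregroup: String, version: Int): List[String] =
    features.map(f => if (f == ambiguous) qualify(featuregroup, version, f) else f)
}
```

For example, `disambiguate(List("pagerank", "cust_id"), "cust_id", "demographic_features", 1)` leaves `pagerank` untouched and rewrites `cust_id` to `demographic_features_1.cust_id`.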

## Free Text Query from Feature Store

For complex queries that cannot be inferred by the helper functions, pass the SQL directly to the method `Hops.queryFeaturestore()`. It defaults to the project-specific featurestore, but you can also specify the featurestore explicitly.

Passing `null` as the featurestore makes the call default to the project-specific featurestore:

In [18]:
Hops.queryFeaturestore(
    spark,
    "SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5",
    null
).show(5)

+-------+--------+--------------+
|cust_id|pagerank|triangle_count|
+-------+--------+--------------+
|     29|     1.0|          12.0|
|    474|     1.0|           7.0|
|     65|     1.0|          12.0|
|    222|     1.0|          13.0|
|    270|     1.0|           8.0|
+-------+--------+--------------+
only showing top 5 rows



You can also specify the featurestore to query explicitly:

In [19]:
Hops.queryFeaturestore(
    spark,
    "SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5",
    Hops.getProjectFeaturestore
).show(5)

+-------+--------+--------------+
|cust_id|pagerank|triangle_count|
+-------+--------+--------------+
|     29|     1.0|          12.0|
|    474|     1.0|           7.0|
|     65|     1.0|          12.0|
|    222|     1.0|          13.0|
|    270|     1.0|           8.0|
+-------+--------+--------------+
only showing top 5 rows



## Write to the Feature Store

Let's first create some sample data to insert:

In [20]:
val sampleDataMap = Map("hops_customer_1"-> 3, "hops_customer_2"-> 4)
val sampleDataDf = sampleDataMap.toSeq.toDF("customer_type", "id")

sampleDataMap: scala.collection.immutable.Map[String,Int] = Map(hops_customer_1 -> 3, hops_customer_2 -> 4)
sampleDataDf: org.apache.spark.sql.DataFrame = [customer_type: string, id: int]


In [21]:
sampleDataDf.show()

+---------------+---+
|  customer_type| id|
+---------------+---+
|hops_customer_1|  3|
|hops_customer_2|  4|
+---------------+---+



Let's inspect the contents of the featuregroup 'customer_type_lookup' that we are going to insert the sample data into:

In [22]:
val sparkDf = Hops.getFeaturegroup(spark, "customer_type_lookup", Hops.getProjectFeaturestore, 1)

sparkDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_type: string, id: bigint]


In [23]:
sparkDf.show()

+---------------+---+
|  customer_type| id|
+---------------+---+
|hops_customer_1|  3|
|hops_customer_2|  4|
+---------------+---+



In [24]:
sparkDf.count()

res19: Long = 2


Now we can insert the sample data and verify the new contents of the featuregroup. By default the insert mode is "append", the featurestore is the project's featurestore, and the version is 1 (the statistics arguments are covered later in the notebook).

In [25]:
val featuregroup = "customer_type_lookup"
val featurestore = Hops.getProjectFeaturestore 
val featuregroupVersion = 1 
val mode = "append"
val descriptiveStats = false
val featureCorr = false
val featureHistograms = false
val clusterAnalysis = false
val statColumns = List[String]().asJava
val numBins = null
val corrMethod = null
val numClusters = null
val description = "trx_summary_features without the column count_trx"

featuregroup: String = customer_type_lookup
featurestore: String = fs_demo_featurestore
featuregroupVersion: Int = 1
mode: String = append
descriptiveStats: Boolean = false
featureCorr: Boolean = false
featureHistograms: Boolean = false
clusterAnalysis: Boolean = false
statColumns: java.util.List[String] = []
numBins: Null = null
corrMethod: Null = null
numClusters: Null = null
description: String = trx_summary_features without the column count_trx


In [26]:
Hops.insertIntoFeaturegroup(
    sampleDataDf, 
    spark, 
    featuregroup,
    featurestore,
    featuregroupVersion,
    mode,
    descriptiveStats, 
    featureCorr,
    featureHistograms, 
    clusterAnalysis, 
    statColumns, 
    numBins,
    corrMethod, 
    numClusters
)

In [27]:
Hops.getFeaturegroup(spark, "customer_type_lookup", Hops.getProjectFeaturestore, 1).show()

+---------------+---+
|  customer_type| id|
+---------------+---+
|hops_customer_1|  3|
|hops_customer_1|  3|
|hops_customer_2|  4|
|hops_customer_2|  4|
+---------------+---+



In [28]:
Hops.getFeaturegroup(spark, "customer_type_lookup", Hops.getProjectFeaturestore, 1).count

res22: Long = 4


The two supported insert modes are "append" and "overwrite":

In [29]:
val mode = "overwrite"

mode: String = overwrite


In [30]:
Hops.insertIntoFeaturegroup(
    sampleDataDf, 
    spark, 
    featuregroup,
    featurestore,
    featuregroupVersion,
    mode,
    descriptiveStats, 
    featureCorr,
    featureHistograms, 
    clusterAnalysis, 
    statColumns, 
    numBins,
    corrMethod, 
    numClusters
)

In [31]:
Hops.getFeaturegroup(spark, "customer_type_lookup", Hops.getProjectFeaturestore, 1).show()

+---------------+---+
|  customer_type| id|
+---------------+---+
|hops_customer_1|  3|
|hops_customer_2|  4|
+---------------+---+



In [32]:
Hops.getFeaturegroup(spark, "customer_type_lookup", Hops.getProjectFeaturestore, 1).count

res25: Long = 2
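
The row counts above (4 after "append", 2 after "overwrite") follow directly from the semantics of the two modes. As a minimal sketch on plain Scala collections, not on Spark or the featurestore itself, and with a hypothetical `insert` helper:

```scala
object InsertModeSketch {
  // Models a featuregroup as a plain sequence of rows.
  def insert[A](existing: Seq[A], incoming: Seq[A], mode: String): Seq[A] = mode match {
    case "append"    => existing ++ incoming // keep the old rows and add the new ones
    case "overwrite" => incoming             // discard the old rows
    case other       => throw new IllegalArgumentException("unsupported insert mode: " + other)
  }

  // The same two rows as the sample data in this notebook.
  val rows: Seq[(String, Int)] = Seq(("hops_customer_1", 3), ("hops_customer_2", 4))
}
```

Appending `rows` to a featuregroup that already holds the same two rows gives four rows with duplicates, while overwriting leaves exactly the two incoming rows.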


## Create a Featuregroup From a Spark Dataframe

In most cases it is recommended that featuregroups are created in the UI on Hopsworks and that care is taken in documenting the featuregroup. However, sometimes it is practical to create a featuregroup directly from a Spark DataFrame and fill in the metadata about the featuregroup later in the UI. This can be done through the API function `Hops.createFeaturegroup`.

Let's create a new featuregroup with the same contents as the featuregroup trx_summary_features, except that the column count_trx is dropped:

In [33]:
val trxSummaryDf = Hops.getFeaturegroup(spark, "trx_summary_features", Hops.getProjectFeaturestore, 1)
val trxSummaryDf1 = trxSummaryDf.drop("count_trx")

trxSummaryDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [avg_trx: float, count_trx: bigint ... 3 more fields]
trxSummaryDf1: org.apache.spark.sql.DataFrame = [avg_trx: float, cust_id: int ... 2 more fields]


In [34]:
trxSummaryDf1.show(5)

+---------+-------+---------+---------+
|  avg_trx|cust_id|  max_trx|  min_trx|
+---------+-------+---------+---------+
| 1090.509|    148|2094.9958| 390.4109|
| 738.1404|    496|1464.5397| 9.235389|
|899.89594|    463|1828.2426|33.797318|
|607.17773|    471|636.18713|578.16833|
| 698.5791|    243| 1582.427|119.73669|
+---------+-------+---------+---------+
only showing top 5 rows



When a feature group is created you can specify metadata about the feature group or set it to null and fill it in later in the feature registry UI. (The statistics part will be explained later on in this notebook)

In [35]:
val jobId = null
val dependencies = List[String]().asJava
val primaryKey = null
val descriptiveStats = false
val featureCorr = false
val featureHistograms = false
val clusterAnalysis = false
val statColumns = List[String]().asJava
val numBins = null
val corrMethod = null
val numClusters = null
val description = "trx_summary_features without the column count_trx"

jobId: Null = null
dependencies: java.util.List[String] = []
primaryKey: Null = null
descriptiveStats: Boolean = false
featureCorr: Boolean = false
featureHistograms: Boolean = false
clusterAnalysis: Boolean = false
statColumns: java.util.List[String] = []
numBins: Null = null
corrMethod: Null = null
numClusters: Null = null
description: String = trx_summary_features without the column count_trx


Let's now create a new featuregroup using the transformed dataframe:

In [36]:
Hops.createFeaturegroup(
    spark, trxSummaryDf1, "trx_summary_features_2", Hops.getProjectFeaturestore,
    1, description, jobId,
    dependencies, primaryKey, descriptiveStats, featureCorr,
      featureHistograms, clusterAnalysis, statColumns, numBins,
      corrMethod, numClusters)

## Compute Featuregroup Statistics

Statistics about a featuregroup can be useful in the stage of feature engineering and when deciding which features to use for training.

To compute statistics about an existing, non-empty featuregroup, you can use the API call `Hops.updateFeaturegroupStats`. By default it computes all statistics (descriptive, feature correlation, histograms, and cluster analysis), uses the project's featurestore, uses version 1 of the featuregroup, and uses all columns for computing statistics:

In [37]:
val featuregroup = "trx_summary_features"
val featurestore = Hops.getProjectFeaturestore
val featuregroupVersion = 1
val descriptiveStats = true
val featureCorr = true
val featureHistograms = true
val clusterAnalysis = true
val statColumns = null // null means all columns will be used
val numBins = 20
val corrMethod = "pearson"
val numClusters = 5

featuregroup: String = trx_summary_features
featurestore: String = fs_demo_featurestore
featuregroupVersion: Int = 1
descriptiveStats: Boolean = true
featureCorr: Boolean = true
featureHistograms: Boolean = true
clusterAnalysis: Boolean = true
statColumns: Null = null
numBins: Int = 20
corrMethod: String = pearson
numClusters: Int = 5


In [38]:
Hops.updateFeaturegroupStats(
    spark, featuregroup, Hops.getProjectFeaturestore, featuregroupVersion,
    descriptiveStats, featureCorr, featureHistograms, clusterAnalysis, statColumns,
    numBins, corrMethod, numClusters
)
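
For readers unfamiliar with the `corrMethod = "pearson"` setting, the sketch below shows what a Pearson correlation coefficient between two numeric columns is. It is a plain-Scala illustration of the formula only; it makes no claim about how the library computes its statistics, and `pearson` is a hypothetical helper.

```scala
object PearsonSketch {
  // Pearson correlation: covariance of x and y divided by the product of their standard deviations.
  // The 1/n factors cancel, so sums of deviations are used directly.
  // Returns NaN when either column has zero variance.
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.nonEmpty && xs.size == ys.size, "columns must be non-empty and of equal length")
    val meanX = xs.sum / xs.size
    val meanY = ys.sum / ys.size
    val covariance = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
    val spreadX = math.sqrt(xs.map(x => (x - meanX) * (x - meanX)).sum)
    val spreadY = math.sqrt(ys.map(y => (y - meanY) * (y - meanY)).sum)
    covariance / (spreadX * spreadY)
  }
}
```

Two columns that rise together in a perfectly linear way give a value close to 1, and columns that move in exactly opposite directions give a value close to -1.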

## Create Managed Training Datasets From Sets of Features

After you have found the features you need in the featurestore, you can materialize them into a training dataset and use it to train a machine learning model. Just as for featuregroups, it is useful to version and document training datasets. For this reason HopsML supports **managed training datasets**, which enable you to easily version, document and automate the materialization of training datasets.

Metadata for a training dataset can be created from the Hopsworks UI or directly from the API with the function `Hops.createTrainingDataset`. The training datasets in a project are stored in a top-level dataset called Training_Datasets (i.e. `hdfs:///Projects/<ProjectName>/Training_Datasets`).

Once a training dataset has been created, you can find it in the featurestore UI in Hopsworks under the tab Training datasets, where you can also edit the metadata if necessary. After a training dataset has been created with the necessary metadata, you can save the actual data in the training dataset by using the API function `Hops.insertIntoTrainingDataset`.

Let's create a dataset called `AML_dataset` by using a set of relevant features from the featurestore.

First we select the features (and/or labels) that we want:

In [42]:
val features = List("pagerank", 
                    "triangle_count", 
                    "avg_trx", 
                    "count_trx", 
                    "max_trx", 
                    "min_trx", 
                    "balance", 
                    "number_of_accounts", 
                    "pep")
val featuregroupsToVersionMap = Map[String, Integer](
    "trx_graph_summary_features"->1,
    "trx_summary_features"->1,
    "demographic_features" ->1
)
val javaFeaturegroupsMap = new java.util.HashMap[String, Integer](featuregroupsToVersionMap)

features: List[String] = List(pagerank, triangle_count, avg_trx, count_trx, max_trx, min_trx, balance, number_of_accounts, pep)
featuregroupsToVersionMap: scala.collection.immutable.Map[String,Integer] = Map(trx_graph_summary_features -> 1, trx_summary_features -> 1, demographic_features -> 1)
javaFeaturegroupsMap: java.util.HashMap[String,Integer] = {demographic_features=1, trx_summary_features=1, trx_graph_summary_features=1}


In [43]:
val datasetDf = Hops.getFeatures(spark, features, Hops.getProjectFeaturestore, javaFeaturegroupsMap)

datasetDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [pagerank: float, triangle_count: float ... 7 more fields]


In [44]:
datasetDf.show(5)

+--------+--------------+---------+---------+---------+---------+---------+------------------+-------------+
|pagerank|triangle_count|  avg_trx|count_trx|  max_trx|  min_trx|  balance|number_of_accounts|          pep|
+--------+--------------+---------+---------+---------+---------+---------+------------------+-------------+
|     1.0|           5.0| 1090.509|       16|2094.9958| 390.4109|12920.496|                10| 309237645312|
|     1.0|           5.0| 738.1404|       16|1464.5397| 9.235389| 11096.28|                 7|1331439861760|
|     1.0|           6.0|899.89594|       30|1828.2426|33.797318|1868.0168|                14| 309237645312|
|     1.0|           4.0|607.17773|        4|636.18713|578.16833| 9278.589|                 1| 309237645312|
|     1.0|           9.0| 698.5791|       28| 1582.427|119.73669| 593.9806|                11|1331439861760|
+--------+--------------+---------+---------+---------+---------+---------+------------------+-------------+
only showing top 5 

When a training dataset is created you can specify metadata about the training dataset or set it to null and fill it in later in the feature registry UI.

In [45]:
val trainingDatasetName = "AML_dataset"
val jobId = null
val dependencies = List[String]().asJava
val primaryKey = null
val dataFormat = "tfrecords"
val descriptiveStats = false
val featureCorr = false
val featureHistograms = false
val clusterAnalysis = false
val statColumns = List[String]().asJava
val numBins = null
val corrMethod = null
val numClusters = null
val description = "Dataset with features for training an AML model"

trainingDatasetName: String = AML_dataset
jobId: Null = null
dependencies: java.util.List[String] = []
primaryKey: Null = null
dataFormat: String = tfrecords
descriptiveStats: Boolean = false
featureCorr: Boolean = false
featureHistograms: Boolean = false
clusterAnalysis: Boolean = false
statColumns: java.util.List[String] = []
numBins: Null = null
corrMethod: Null = null
numClusters: Null = null
description: String = Dataset with features for training an AML model


In [46]:
Hops.createTrainingDataset(
    spark, datasetDf, trainingDatasetName, Hops.getProjectFeaturestore,
    1, description, jobId, dataFormat,
    dependencies, descriptiveStats, featureCorr,
      featureHistograms, clusterAnalysis, statColumns, numBins,
      corrMethod, numClusters)

In [47]:
val trainingDatasetName = "TestDataset"
val jobId = null
val dependencies = List[String]().asJava
val primaryKey = null
val dataFormat = "csv"
val descriptiveStats = false
val featureCorr = false
val featureHistograms = false
val clusterAnalysis = false
val statColumns = List[String]().asJava
val numBins = null
val corrMethod = null
val numClusters = null
val description = "Dataset for Demo purposes"

trainingDatasetName: String = TestDataset
jobId: Null = null
dependencies: java.util.List[String] = []
primaryKey: Null = null
dataFormat: String = csv
descriptiveStats: Boolean = false
featureCorr: Boolean = false
featureHistograms: Boolean = false
clusterAnalysis: Boolean = false
statColumns: java.util.List[String] = []
numBins: Null = null
corrMethod: Null = null
numClusters: Null = null
description: String = Dataset for Demo purposes


In [48]:
Hops.createTrainingDataset(
    spark, datasetDf, trainingDatasetName, Hops.getProjectFeaturestore,
    1, description, jobId, dataFormat,
    dependencies, descriptiveStats, featureCorr,
      featureHistograms, clusterAnalysis, statColumns, numBins,
      corrMethod, numClusters)

## Inserting Into an Existing Training Dataset

Once a dataset has been created, its metadata is browsable in the featurestore registry in the Hopsworks UI. If you don't want to create a new training dataset but just want to overwrite an existing training dataset with new data, you can use the API function `insertIntoTrainingDataset`. Training datasets are immutable and generally stored in binary formats, so modifying an existing training dataset in place is not supported; the insert overwrites the existing data.

In [49]:
val trainingDataset = "TestDataset"
val featurestore = Hops.getProjectFeaturestore 
val trainingDatasetVersion = 1 
val mode = "append"
val descriptiveStats = false
val featureCorr = false
val featureHistograms = false
val clusterAnalysis = false
val statColumns = List[String]().asJava
val numBins = null
val corrMethod = null
val numClusters = null
val description = "trx_summary_features without the column count_trx"

trainingDataset: String = TestDataset
featurestore: String = fs_demo_featurestore
trainingDatasetVersion: Int = 1
mode: String = append
descriptiveStats: Boolean = false
featureCorr: Boolean = false
featureHistograms: Boolean = false
clusterAnalysis: Boolean = false
statColumns: java.util.List[String] = []
numBins: Null = null
corrMethod: Null = null
numClusters: Null = null
description: String = trx_summary_features without the column count_trx


In [50]:
Hops.insertIntoTrainingDataset(
    datasetDf, 
    spark,
    trainingDataset,
    featurestore,
    trainingDatasetVersion,
    descriptiveStats, 
    featureCorr,
    featureHistograms, 
    clusterAnalysis, 
    statColumns, 
    numBins,
    corrMethod, 
    numClusters)

## Get Training Dataset Path

After a **managed** dataset has been created, it is easy to share it and re-use it for training various models. For example, if the dataset has been materialized in tfrecords format, you can call the method `Hops.getTrainingDatasetPath` to get the HDFS path and read it directly in your TensorFlow/Keras/PyTorch code. By default the library will look for the training dataset in the project's featurestore and use version 1.

In [51]:
Hops.getTrainingDatasetPath("AML_dataset", Hops.getProjectFeaturestore, 1)

res34: String = /Projects/fs_demo/Training_Datasets/AML_dataset_1/AML_dataset
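
The returned path appears to follow the pattern `/Projects/<project>/Training_Datasets/<name>_<version>/<name>`. The sketch below reconstructs that pattern from the single output shown above; it is an inference from this one example rather than a documented guarantee, and `trainingDatasetPath` is a hypothetical helper. In real code, prefer the value returned by `Hops.getTrainingDatasetPath`.

```scala
object TrainingDatasetPathSketch {
  // Layout inferred from the output above:
  // /Projects/<project>/Training_Datasets/<name>_<version>/<name>
  def trainingDatasetPath(project: String, dataset: String, version: Int): String =
    s"/Projects/${project}/Training_Datasets/${dataset}_${version}/${dataset}"
}
```

Calling `trainingDatasetPath("fs_demo", "AML_dataset", 1)` reproduces the path printed by the cell above.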


## Get Featurestore Metadata

To explore the contents of the featurestore we recommend using the featurestore page in the Hopsworks UI, but you can also get the metadata programmatically from the REST API:

### List all Feature Stores Accessible In the Project

In [52]:
Hops.getProjectFeaturestores()

res35: java.util.List[String] = [fs_demo_featurestore]


### List all Feature Groups in a Feature Store

In [53]:
Hops.getFeaturegroups(Hops.getProjectFeaturestore)

res36: java.util.List[String] = [pep_lookup, customer_type_lookup, trx_type_lookup, gender_lookup, industry_sector_lookup, country_lookup, alert_type_lookup, rule_name_lookup, browser_action_lookup, web_address_lookup, demographic_features, alert_features, trx_graph_summary_features, trx_features, trx_summary_features, hipo_features, trx_graph_edge_list, police_report_features, web_logs_features, trx_summary_features_2]
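
The metadata calls return `java.util.List[String]`, which does not offer Scala's collection methods directly. As a sketch, assuming `scala.collection.JavaConverters` is available (it is imported at the top of this notebook), `.asScala` converts the result so it can be filtered like any Scala collection. The list below is a hard-coded stand-in for the API result, so the example runs without a cluster.

```scala
import scala.collection.JavaConverters._

object ListInteropSketch {
  // Stand-in for the result of Hops.getFeaturegroups(...): a few names from the output above.
  val featuregroups: java.util.List[String] = java.util.Arrays.asList(
    "pep_lookup", "customer_type_lookup", "trx_features", "web_logs_features"
  )

  // Convert to an immutable Scala List, then keep only the lookup tables.
  val lookups: List[String] = featuregroups.asScala.toList.filter(_.endsWith("_lookup"))
}
```

Here `lookups` contains only `pep_lookup` and `customer_type_lookup`, in their original order.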


### List all Training Datasets in a Feature Store

In [54]:
Hops.getTrainingDatasets(Hops.getProjectFeaturestore)

res37: java.util.List[String] = [AML_dataset, TestDataset]


### Get All Metadata (Features, Feature groups, Training Datasets) for a Feature Store

In [55]:
Hops.getFeaturestoreMetadata(Hops.getProjectFeaturestore)

res38: io.hops.util.featurestore.FeaturegroupsAndTrainingDatasetsDTO = FeaturegroupsAndTrainingDatasetsDTO{featuregroups=[FeaturegroupDTO{, hdfsStorePaths=[hdfs://10.0.2.15:8020/apps/hive/warehouse/fs_demo_featurestore.db/pep_lookup_1]}, FeaturegroupDTO{, hdfsStorePaths=[hdfs://10.0.2.15:8020/apps/hive/warehouse/fs_demo_featurestore.db/customer_type_lookup_1]}, FeaturegroupDTO{, hdfsStorePaths=[hdfs://10.0.2.15:8020/apps/hive/warehouse/fs_demo_featurestore.db/trx_type_lookup_1]}, FeaturegroupDTO{, hdfsStorePaths=[hdfs://10.0.2.15:8020/apps/hive/warehouse/fs_demo_featurestore.db/gender_lookup_1]}, FeaturegroupDTO{, hdfsStorePaths=[hdfs://10.0.2.15:8020/apps/hive/warehouse/fs_demo_featurestore.db/industry_sector_lookup_1]}, FeaturegroupDTO{, hdfsStorePaths=[hdfs://10.0.2.15:8020/apps/hive...