# `hops-util-py` Integration Tests

This notebook can be converted to a Python file and submitted as a Spark job to run the integration tests.

## Imports

In [1]:
from hops import experiment, hdfs, tensorboard, devices, kafka, featurestore, tls, util
import stat
import os
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, IntegerType, FloatType
import pandas as pd
import numpy as np
import datetime
import time
from pyspark.sql import DataFrame
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql import SparkSession
import tensorflow as tf
import sys

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1554468059973_0008,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Experiments Tests

##### Test Custom Experiments

- `experiment.begin()`
- `experiment.end()`

In [3]:
# Custom experiment started with local_logdir=False: the TensorBoard logdir
# must be set and must be an hdfs:// path.
experiment.begin(name='some custom thing 1', local_logdir=False)
# Identity comparison is the idiomatic None check (PEP 8).
assert tensorboard.logdir() is not None
assert "hdfs://" in tensorboard.logdir()
pi = 1+3+0.14  # arbitrary value handed to experiment.end()
experiment.end(pi)

In [4]:
# Custom experiment started with local_logdir=True: the TensorBoard logdir
# must be set and must NOT be an hdfs:// path.
experiment.begin(name='some custom thing 2', local_logdir=True, description='i am making custom exp on hops')
# Identity comparison is the idiomatic None check (PEP 8).
assert tensorboard.logdir() is not None
assert "hdfs://" not in tensorboard.logdir()
pi = 1337  # arbitrary value handed to experiment.end()
experiment.end(pi)

##### Test `experiment.launch`

In [8]:
def wrapper():
    """Argument-less function passed to ``experiment.launch``.

    Asserts that a TensorBoard logdir is set, that the reported GPU count is
    non-negative, that ``hdfs.project_path()`` equals the path resolved for
    ``hdfs.project_name()``, and that the logdir contains ``hdfs://`` exactly
    when ``tensorboard.local_logdir_bool`` is falsy.
    """
    # Identity comparison is the idiomatic None check (PEP 8).
    assert tensorboard.logdir() is not None
    assert devices.get_num_gpus() >= 0
    assert hdfs.project_path() == hdfs.project_path(hdfs.project_name())
    if tensorboard.local_logdir_bool:
        assert "hdfs://" not in tensorboard.logdir()
    else:
        assert "hdfs://" in tensorboard.logdir()

In [9]:
# Launch `wrapper` as an experiment with local_logdir=True.
experiment.launch(wrapper, local_logdir=True)

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/launcher/run.3'

In [10]:
# Launch `wrapper` again, this time with a description and local_logdir=False.
experiment.launch(wrapper, description='very interesting description', local_logdir=False)

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/launcher/run.4'

In [14]:
def parameter_wrapper(a, b):
    """Parameterised function used by the launch and search experiments.

    Asserts that a TensorBoard logdir is set, that the reported GPU count is
    non-negative, and that the logdir contains ``hdfs://`` exactly when
    ``tensorboard.local_logdir_bool`` is falsy.

    Args:
        a: first hyperparameter value.
        b: second hyperparameter value.

    Returns:
        ``a + b``.
    """
    # Identity comparison is the idiomatic None check (PEP 8).
    assert tensorboard.logdir() is not None
    assert devices.get_num_gpus() >= 0
    if tensorboard.local_logdir_bool:
        assert "hdfs://" not in tensorboard.logdir()
    else:
        assert "hdfs://" in tensorboard.logdir()
    return a + b

In [15]:
# Launch `parameter_wrapper` with an argument dict for a and b, using local_logdir=True.
experiment.launch(parameter_wrapper, {'a': [1,2], 'b': [-1,1]}, local_logdir=True)

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/launcher/run.7'

In [16]:
# Same launch as above, with a description and local_logdir=False.
experiment.launch(parameter_wrapper, {'a': [1,2], 'b': [-1,1]}, description='very interesting description', local_logdir=False)

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/launcher/run.8'

##### Test Parallel Experiments `experiment.grid_search`, `experiment.random_search`, `experiment.differential_evolution`

In [17]:
# Grid search over the listed values of a and b with direction='min'.
experiment.grid_search(parameter_wrapper, {'a': [1,2], 'b': [-1,1]}, direction='min', name='test')


------ Grid Search results ------ direction(min) 
BEST combination a=1.b=-1 -- metric 0.0
WORST combination a=2.b=1 -- metric 3.0
AVERAGE metric -- 1.5
Total job time 1 seconds

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/grid_search/run.1'

In [18]:
# Grid search with direction='max' and local_logdir=True.
experiment.grid_search(parameter_wrapper, {'a': [1,2], 'b': [-1,1]}, direction='max', name='test', local_logdir=True)


------ Grid Search results ------ direction(max) 
BEST combination a=2.b=1 -- metric 3.0
WORST combination a=1.b=-1 -- metric 0.0
AVERAGE metric -- 1.5
Total job time 1 seconds

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/grid_search/run.2'

In [19]:
# Random search with direction='max'; no explicit `samples` argument is passed here.
experiment.random_search(parameter_wrapper, {'a': [1,2], 'b': [-1,1]}, direction='max', name='test')


------ Random Search results ------ direction(max) 
BEST combination a=2.b=1 -- metric 3.0
WORST combination a=1.b=0 -- metric 1.0
AVERAGE metric -- 1.8
Total job time 2 seconds

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/random_search/run.1'

In [20]:
# Random search with direction='min', samples=3 and local_logdir=True.
experiment.random_search(parameter_wrapper, {'a': [1,2], 'b': [-1,1]}, direction='min', samples=3, name='test', local_logdir=True)


------ Random Search results ------ direction(min) 
BEST combination a=2.b=-1 -- metric 1.0
WORST combination a=2.b=0 -- metric 2.0
AVERAGE metric -- 1.5
Total job time 7 seconds

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0024/random_search/run.2'

In [23]:
# Differential evolution with direction='max': the best combination is
# expected to be a=2, b=1.
logdir1, result_dict1 = experiment.differential_evolution(
    parameter_wrapper,
    {'a': [1,2], 'b': [-1,1]},
    local_logdir=True,
    direction='max',
    generations=8,
    population=12
)
# One assertion per parameter so a failure points at the offending value.
assert int(result_dict1['a']) == 2
assert int(result_dict1['b']) == 1

Generation 0 || average metric: 1.66666666667, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 1 || average metric: 2.25, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 2 || average metric: 2.75, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 3 || average metric: 2.75, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 4 || average metric: 2.91666666667, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 5 || average metric: 2.91666666667, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 6 || average metric: 3.0, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 7 || average metric: 3.0, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Generation 8 || average metric: 3.0, best metric: 3.0, best parameter combination: ['a=2', 'b=1']

Finished Experiment

In [24]:
# Differential evolution with direction='min': the best combination is
# expected to be a=1, b=-1.
logdir2, result_dict2 = experiment.differential_evolution(
    parameter_wrapper,
    {'a': [1,2], 'b': [-1,1]},
    generations=8,
    population=10,
    direction='min'
)
# One assertion per parameter so a failure points at the offending value.
assert int(result_dict2['a']) == 1
assert int(result_dict2['b']) == -1

Generation 0 || average metric: 1.1, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 1 || average metric: 0.8, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 2 || average metric: 0.4, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 3 || average metric: 0.4, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 4 || average metric: 0.3, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 5 || average metric: 0.2, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 6 || average metric: 0.0, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 7 || average metric: 0.0, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Generation 8 || average metric: 0.0, best metric: 0.0, best parameter combination: ['a=1', 'b=-1']

Finished Experiment

## HopsFS Tests

##### Test HopsFS operations

- `hdfs.project_user()`
- `hdfs.project_name()`
- `hdfs.project_path()`
- `hdfs.exists()`
- `hdfs.load()`
- `hdfs.copy_to_local()`
- `hdfs.ls()`
- `hdfs.lsl()`
- `hdfs.glob()`
- `hdfs.cp()`
- `hdfs.rmr()`
- `hdfs.rename()`
- `hdfs.stat()`

In [None]:
# The project user is expected to contain the project name.
project_user = hdfs.project_user()
project_name = hdfs.project_name()
assert project_name in project_user
# The project path is expected to contain the project name as well.
project_path = hdfs.project_path()
assert project_name in project_path

In [None]:
# Load Logs/README.md and check that it is non-empty.
logs_README = hdfs.load("Logs/README.md")
assert len(logs_README) > 0

In [None]:
# Write the string "test" to a new file and check that the file now exists.
hdfs.dump("test", "Logs/README_dump_test.md")
assert hdfs.exists("Logs/README_dump_test.md")

In [None]:
# Read the dumped file back; the loaded value is decoded as UTF-8 before comparing.
logs_README_dumped = hdfs.load("Logs/README_dump_test.md")
assert logs_README_dumped.decode("utf-8") == "test"

In [None]:
# Create a small local file and copy it to Resources/test.txt.
with open('test.txt', 'w') as f:
    f.write("test")
hdfs.copy_to_hdfs("test.txt", "Resources/test.txt", overwrite=True)
assert hdfs.exists("Resources/test.txt")

In [None]:
# Copy the file back to the local file system. The empty target path
# presumably means the current working directory, since the file is then
# opened as 'test.txt' -- TODO confirm.
hdfs.copy_to_local("Resources/test.txt", "", overwrite=True)
hdfs_copied_file = hdfs.load("Resources/test.txt")
with open('test.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == "test"
assert local_copied_file == "test"
# isinstance is the idiomatic type check and matches the other type
# assertions in this notebook.
assert isinstance(hdfs.ls("Logs/"), list)

In [None]:
# Copy the whole Logs directory locally and check that a local directory named Logs exists.
hdfs.copy_to_local("Logs", "", overwrite=True)
assert os.path.exists("Logs")
assert os.path.isdir("Logs")

In [None]:
# glob() and lsl() are only called here; their results are not asserted on.
logs_files_md = hdfs.glob("Logs/*.md")
logs_path_names = hdfs.lsl("Logs/")
# Remove Logs/test.txt if it is already there, then check that it is absent.
if hdfs.exists("Logs/test.txt"):
    hdfs.rmr("Logs/test.txt")
assert not hdfs.exists("Logs/test.txt")

In [None]:
# Copy Resources/test.txt into Logs/ and check that it appears in the listing.
hdfs.cp("Resources/test.txt", "Logs/")
logs_files = hdfs.ls("Logs/")
# Substring match against the comma-joined listing.
assert "test.txt" in ",".join(logs_files)

In [None]:
# Create a directory and check that it exists.
hdfs.mkdir("Logs/test_dir")
assert hdfs.exists("Logs/test_dir")

In [None]:
# Removing Logs/test_dir should shrink the Logs/ listing.
logs_files_prior_delete = hdfs.ls("Logs/")
hdfs.rmr("Logs/test_dir")
logs_files_after_delete = hdfs.ls("Logs/")
assert len(logs_files_prior_delete) > len(logs_files_after_delete)

In [None]:
# Precondition for the move test: the dumped file is present in Logs/.
logs_files_prior_move = hdfs.ls("Logs/")
assert "README_dump_test.md" in ",".join(logs_files_prior_move)

In [None]:
# After the move the old name must be gone and the new name present.
hdfs.move("Logs/README_dump_test.md", "Logs/README_dump_test2.md")
logs_files_after_move = hdfs.ls("Logs/")
assert "README_dump_test.md" not in ",".join(logs_files_after_move)
assert "README_dump_test2.md" in ",".join(logs_files_after_move)

In [None]:
# Precondition for the rename test: the moved file is present in Logs/.
logs_files_prior_rename = hdfs.ls("Logs/")
assert "README_dump_test2.md" in ",".join(logs_files_prior_rename)

In [None]:
# Rename the file back to its original name and check the listing.
hdfs.rename("Logs/README_dump_test2.md", "Logs/README_dump_test.md")
logs_files_after_rename = hdfs.ls("Logs/")
# These checks include the "Logs/" prefix, unlike the move checks above in this notebook.
assert "Logs/README_dump_test2.md" not in ",".join(logs_files_after_rename)
assert "Logs/README_dump_test.md" in ",".join(logs_files_after_rename)

In [None]:
# stat() is called once before chmod; that first result is overwritten without being used.
file_stat = hdfs.stat("Logs/README.md")
# NOTE(review): 775 is a decimal literal, not the octal 0o775 -- confirm this is
# the form hdfs.chmod()/st_mode expect.
hdfs.chmod("Logs/README.md", 775)
file_stat = hdfs.stat("Logs/README.md")
assert 775 == file_stat.st_mode

In [None]:
# Same round-trip check with mode 777 (decimal literal, see NOTE on 775 -- confirm).
hdfs.chmod("Logs/README.md", 777)
file_stat = hdfs.stat("Logs/README.md")
assert 777 == file_stat.st_mode

In [None]:
# Relies on `file_stat` set by an earlier cell; `file_owner` is read but not asserted on.
file_owner = file_stat.st_uid
assert hdfs.exists("Logs/")
assert not hdfs.exists("Not_Existing/neither_am_i")

## Training Tests

##### Test Distributed Training MirroredStrategy (this may fail if MirroredStrategy is not configured)

- `experiment.mirrored()`

In [None]:
def mirrored():
    """Function run by ``experiment.mirrored``.

    Asserts that ``TF_CONFIG`` is present in the environment and that the
    reported GPU count is non-negative.
    """
    tf_config_present = 'TF_CONFIG' in os.environ
    assert tf_config_present
    gpu_count = devices.get_num_gpus()
    assert gpu_count >= 0

In [None]:
# Run `mirrored` through experiment.mirrored with local_logdir=True.
experiment.mirrored(mirrored, local_logdir=True)

In [None]:
# Same run with a name, a description and local_logdir=False.
experiment.mirrored(mirrored, name='mirrortime', description='such desc', local_logdir=False)

##### Test Distributed Training CollectiveAllReduceStrategy (this may fail if CollectiveAllReduceStrategy is not configured)

- `experiment.collective_all_reduce()`

In [2]:
def collective():
    """Function run by ``experiment.collective_all_reduce``.

    Asserts that ``TF_CONFIG`` is present in the environment and that the
    reported GPU count is non-negative.
    """
    tf_config_present = 'TF_CONFIG' in os.environ
    assert tf_config_present
    gpu_count = devices.get_num_gpus()
    assert gpu_count >= 0

In [3]:
# Run `collective` through experiment.collective_all_reduce with local_logdir=True.
experiment.collective_all_reduce(collective, local_logdir=True)

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0019/collective_all_reduce/run.1'

In [4]:
# Same run with a name, a description and local_logdir=False.
experiment.collective_all_reduce(collective, name='collectivetime', description='such desc', local_logdir=False)

Finished Experiment 

u'hdfs://10.0.2.15:8020/Projects/project_bde67d4d2ef6dcfc/Experiments/application_1552647327939_0019/collective_all_reduce/run.2'

##### Test Distributed Training ParameterServerStrategy (this may fail if ParameterServerStrategy is not configured)

- `experiment.parameter_server()`

In [5]:
def ps():
    """Function intended for ``experiment.parameter_server``.

    Asserts that ``TF_CONFIG`` is present in the environment and that the
    reported GPU count is non-negative.
    """
    tf_config_present = 'TF_CONFIG' in os.environ
    assert tf_config_present
    gpu_count = devices.get_num_gpus()
    assert gpu_count >= 0

In [6]:
# TODO: these ps tests do not complete; they wait indefinitely
#experiment.parameter_server(ps, local_logdir=False)

In [7]:
#experiment.parameter_server(ps, local_logdir=True)

In [8]:
#experiment.parameter_server(ps, name='mirrortime', description='such desc')

## Feature Store Tests

These tests require that you have the following files in the Resources directory:

- `attendances_features.csv`
- `games_features.csv`
- `players_features.csv`
- `season_scores_features.csv`
- `teams_features.csv`

These files can be downloaded from here: `http://snurran.sics.se/hops/hops-util-py_test/`

In [10]:
# Rebind `spark` via hops' helper. NOTE(review): `_find_spark` is underscore-prefixed,
# i.e. a private API -- confirm there is no public equivalent.
spark = util._find_spark()

##### Test Featurestore Create Feature Group Operations (`featurestore.create_featuregroup()`)

In [11]:
def load_fs_sample_data():
    """Load the five sample feature CSV files from the project's Resources directory.

    Every file is read with the ``header`` and ``inferSchema`` options set to
    ``"true"``.

    Returns:
        A tuple of dataframes in the order (games, players, teams,
        season_scores, attendances).
    """
    resources_path = hdfs.project_path() + "Resources/"

    def _read_features_csv(file_name):
        # Single place for the reader options so every file is parsed the same way.
        return (spark.read.format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load(resources_path + file_name))

    games_features_df = _read_features_csv("games_features.csv")
    players_features_df = _read_features_csv("players_features.csv")
    teams_features_df = _read_features_csv("teams_features.csv")
    season_scores_features_df = _read_features_csv("season_scores_features.csv")
    attendances_features_df = _read_features_csv("attendances_features.csv")
    return games_features_df,players_features_df,teams_features_df,season_scores_features_df, attendances_features_df
# Load the sample dataframes once; later cells use them to create feature groups.
games_features_df,players_features_df,teams_features_df,season_scores_features_df, attendances_features_df = load_fs_sample_data()

In [12]:
# Create the "games_features" feature group with the default statistics settings.
# NOTE(review): the description is identical to the one used for
# season_scores_features -- confirm it is intended for this group.
featurestore.create_featuregroup(
    games_features_df,
    "games_features",
    description="Features of average season scores for football teams"
)

computing descriptive statistics for : games_features
computing feature correlation for: games_features
computing feature histograms for: games_features
computing cluster analysis for: games_features
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [13]:
# Create the "teams_features" feature group with the default statistics settings.
# The previous description ("a spanish version of teams_features") was copied
# from the teams_features_spanish group and did not describe this one.
featurestore.create_featuregroup(
    teams_features_df,
    "teams_features",
    description="Features of football teams"
)

computing descriptive statistics for : teams_features
computing feature correlation for: teams_features
computing feature histograms for: teams_features
computing cluster analysis for: teams_features
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [14]:
# Create the "season_scores_features" feature group with the default statistics settings.
featurestore.create_featuregroup(
    season_scores_features_df,
    "season_scores_features",
    description="Features of average season scores for football teams"
)

computing descriptive statistics for : season_scores_features
computing feature correlation for: season_scores_features
computing feature histograms for: season_scores_features
computing cluster analysis for: season_scores_features
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [15]:
# Create the "attendances_features" feature group with the default statistics settings.
featurestore.create_featuregroup(
    attendances_features_df,
    "attendances_features",
    description="Features of average attendance of games of football teams"
)

computing descriptive statistics for : attendances_features
computing feature correlation for: attendances_features
computing feature histograms for: attendances_features
computing cluster analysis for: attendances_features
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [16]:
# Read teams_features back from the feature store and derive a copy whose
# three columns carry Spanish names.
teams_features_1_df = featurestore.get_featuregroup("teams_features")
teams_features_2_df = (
    teams_features_1_df
    .withColumnRenamed("team_id", "equipo_id")
    .withColumnRenamed("team_budget", "equipo_presupuesto")
    .withColumnRenamed("team_position", "equipo_posicion")
)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_1

In [17]:
# Create "teams_features_spanish" with every statistics computation switched off.
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [18]:
# Same call with the optional arguments (featurestore, version, job name,
# dependencies) spelled out explicitly.
# NOTE(review): this targets version 1 again, which the previous cell already
# created -- confirm that re-creating an existing version is intended.
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    featurestore=featurestore.project_featurestore(),
    featuregroup_version=1,
    job_name=None,
    dependencies=[]
)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [19]:
# Create version 2 of "teams_features_spanish" from the same dataframe.
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    featuregroup_version=2
)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [20]:
# Fetch the feature group listing once instead of once per assertion, then
# check that every group created above is listed under its versioned name.
created_featuregroups = featurestore.get_featuregroups()
for expected_featuregroup in [
    "games_features_1",
    "teams_features_1",
    "season_scores_features_1",
    "attendances_features_1",
    "teams_features_spanish_1",
    "teams_features_spanish_2"
]:
    assert expected_featuregroup in created_featuregroups, \
        "feature group %s was not found in the feature store" % expected_featuregroup

##### Test Featurestore Utility Operations

- `featurestore.get_featurestore_metadata()`
- `featurestore.project_featurestore()`
- `featurestore.get_latest_featuregroup_version()`
- `featurestore.get_features_list()`

In [21]:
# Fetch the feature store metadata with update_cache=True. The bare expression
# makes the notebook display the full returned value.
featurestore.get_featurestore_metadata(update_cache=True)

{u'trainingDatasets': [], u'featuregroups': [{u'lastComputed': None, u'featuresHistogram': {u'featureDistributions': [{u'frequencyDistribution': [{u'bin': u'37.75', u'frequency': 3}, {u'bin': u'3.45', u'frequency': 2}, {u'bin': u'1.0', u'frequency': 6}, {u'bin': u'20.6', u'frequency': 2}, {u'bin': u'15.7', u'frequency': 0}, {u'bin': u'40.2', u'frequency': 1}, {u'bin': u'5.9', u'frequency': 4}, {u'bin': u'27.95', u'frequency': 0}, {u'bin': u'47.55', u'frequency': 4}, {u'bin': u'30.4', u'frequency': 4}, {u'bin': u'23.05', u'frequency': 1}, {u'bin': u'8.35', u'frequency': 2}, {u'bin': u'35.3', u'frequency': 1}, {u'bin': u'25.5', u'frequency': 3}, {u'bin': u'10.8', u'frequency': 5}, {u'bin': u'13.25', u'frequency': 4}, {u'bin': u'18.15', u'frequency': 0}, {u'bin': u'32.85', u'frequency': 3}, {u'bin': u'45.1', u'frequency': 2}, {u'bin': u'42.65', u'frequency': 2}], u'statisticType': u'featureDistributions', u'featureName': u'away_team_id'}, {u'frequencyDistribution': [{u'bin': u'37.75', u'f

In [22]:
# The project feature store name is expected to be "<project name>_featurestore".
assert featurestore.project_featurestore() == hdfs.project_name() + "_featurestore"

In [23]:
# The project's own feature store must be among the feature stores listed for the project.
assert featurestore.project_featurestore() in featurestore.get_project_featurestores()

In [24]:
# Expects exactly one feature store to be listed for the project.
# NOTE(review): presumably this fails if further feature stores are shared with the project -- confirm.
assert len(featurestore.get_project_featurestores()) == 1

In [25]:
# Versions 1 and 2 of teams_features_spanish were created above, so the latest is 2.
assert featurestore.get_latest_featuregroup_version("teams_features_spanish") == 2

In [26]:
# Only version 1 of teams_features was created above.
assert featurestore.get_latest_featuregroup_version("teams_features") == 1

In [27]:
# "away_team_id" is expected in the list of all features.
assert "away_team_id" in featurestore.get_features_list()

In [28]:
# "home_team_id" is expected in the list of all features.
assert "home_team_id" in featurestore.get_features_list()

##### Test Read Operations of Features and Feature Groups

- `featurestore.get_feature()`
- `featurestore.get_features()`
- `featurestore.get_featuregroup()`

In [29]:
# Read a single feature by name only; expect 50 rows and exactly one column.
tmp = featurestore.get_feature("team_budget")
assert tmp.count() == 50
assert len(tmp.columns) == 1
assert "team_budget" in tmp.columns

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget FROM teams_features_1

In [30]:
# Same read with feature store, feature group, version and dataframe type given explicitly.
tmp = featurestore.get_feature(
    "team_budget", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup="teams_features", 
    featuregroup_version = 1,
    dataframe_type = "spark"
)
assert tmp.count() == 50
assert len(tmp.columns) == 1
assert "team_budget" in tmp.columns

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget FROM teams_features_1

In [31]:
# Read a whole feature group by name; expect 50 rows and the three team columns.
tmp = featurestore.get_featuregroup("teams_features")
assert tmp.count() == 50
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_1

In [32]:
# Same read with feature store, version and dataframe type given explicitly.
tmp = featurestore.get_featuregroup(
    "teams_features", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup_version = 1,
    dataframe_type = "spark"
)
assert tmp.count() == 50
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_1

In [33]:
# Request two features by name only; the result must have exactly these columns and 50 rows.
features = ["team_budget", "average_attendance"]
tmp = featurestore.get_features(
    features
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [34]:
# Request the same features with "<featuregroup>_<version>." prefixes; the
# resulting column names are expected without the prefix.
features = ["teams_features_1.team_budget", "attendances_features_1.average_attendance"]
tmp = featurestore.get_features(features)
assert set(["team_budget", "average_attendance"]) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT teams_features_1.team_budget, attendances_features_1.average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [35]:
# Same request with the feature store and a feature group -> version dict given explicitly.
features = ["team_budget", "average_attendance"]
tmp = featurestore.get_features(
    features,
    featurestore=featurestore.project_featurestore(),
    featuregroups_version_dict={
        "teams_features": 1, 
        "attendances_features": 1
    }
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [36]:
# Same request with an explicit join key and dataframe type.
# Relies on `features` as defined in the previous cell.
tmp = featurestore.get_features(
    features,
    featurestore=featurestore.project_featurestore(),
    featuregroups_version_dict={
        "teams_features": 1, 
        "attendances_features": 1
    },
    join_key = "team_id",
    dataframe_type = "spark"
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [37]:
# Request four features by name only.
features = ["team_budget", "average_attendance",
    "team_position", "sum_attendance"
    ]
tmp = featurestore.get_features(
   features
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance, team_position, sum_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [38]:
# Request two features with only teams_features listed in the version dict.
features = ["team_budget", "team_id"]
tmp = featurestore.get_features(
    features,
    featuregroups_version_dict = {
        "teams_features" : 1
    }
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, team_id FROM teams_features_1

In [39]:
# Run a hand-written SQL join between teams_features_1 and games_features_1.
tmp = featurestore.sql(
    "SELECT team_budget, score " \
    "FROM teams_features_1 JOIN games_features_1 ON " \
    "games_features_1.home_team_id = teams_features_1.team_id")
features = ['team_budget', 'score']
assert set(features) == set(tmp.columns)
# NOTE(review): 49 presumably reflects the contents of the sample CSV files -- confirm.
assert tmp.count() == 49
assert len(tmp.columns) == len(features)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, score FROM teams_features_1 JOIN games_features_1 ON games_features_1.home_team_id = teams_features_1.team_id

In [40]:
# Run a filtered SELECT and verify the filter on every returned row.
tmp = featurestore.sql("SELECT * FROM teams_features_1 WHERE team_position < 5")
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns
# Convert to pandas to check each team_position value individually.
for x in tmp.toPandas()["team_position"].values:
    assert x < 5

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_1 WHERE team_position < 5

In [41]:
# Same query with the feature store and dataframe type given explicitly.
tmp = featurestore.sql("SELECT * FROM teams_features_1 WHERE team_position < 5",
                featurestore=featurestore.project_featurestore(), 
                 dataframe_type = "spark")
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns
# Convert to pandas to check each team_position value individually.
for x in tmp.toPandas()["team_position"].values:
    assert x < 5

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_1 WHERE team_position < 5

##### Test Insert Operations in Existing Feature Groups, `featurestore.insert_into_featuregroup()`

In [42]:
# Build a three-row dataframe whose column names match the Spanish-named
# columns of teams_features_spanish.
sqlContext = SQLContext(spark.sparkContext)
schema = StructType([StructField("equipo_id", IntegerType(), True),
                     StructField("equipo_presupuesto", FloatType(), True),
                     StructField("equipo_posicion", IntegerType(), True)
                        ])
sample_df = sqlContext.createDataFrame([(999, 41251.52, 1), (998, 1319.4, 8), (997, 21219.1, 2)], schema)
insert_count = sample_df.count()
assert insert_count == 3

In [43]:
# Baseline row count before any insert; 50 rows are expected.
spanish_team_features_df = featurestore.get_featuregroup(
    "teams_features_spanish")
pre_insert_count = spanish_team_features_df.count()
assert pre_insert_count == 50

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_spanish_1

In [44]:
# Insert without an explicit `mode`; the row count is expected to grow by the
# three inserted rows (50 -> 53).
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish", 
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
spanish_team_features_df_updated = featurestore.get_featuregroup(
    "teams_features_spanish")

after_insert_count = spanish_team_features_df_updated.count()
assert after_insert_count == 53

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_spanish_1

In [45]:
# Insert again with mode="append" and the optional arguments spelled out;
# the row count is expected to grow by another three rows (53 -> 56).
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup_version=1, 
    mode="append",
    descriptive_statistics=False, 
    feature_correlation=False, 
    feature_histograms=False,
    cluster_analysis=False, 
    stat_columns=None, 
    num_bins=20, 
    corr_method='pearson',
    num_clusters=5
)

after_insert_count2 = featurestore.get_featuregroup("teams_features_spanish").count()
assert after_insert_count2 == 56

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_spanish_1

In [46]:
# Insert with mode="overwrite"; only the three sample rows are expected to remain.
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish",
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")

count_after_overwrite = featurestore.get_featuregroup("teams_features_spanish").count()
assert count_after_overwrite == 3

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_spanish_1

##### Test integration of feature store with Numpy, Pandas and plain Python

In [47]:
# Request the features as a pandas DataFrame and check its shape and type.
pandas_df = featurestore.get_features(["team_budget", "average_attendance"], dataframe_type="pandas")
assert "team_budget" in pandas_df.columns.values
assert "average_attendance" in pandas_df.columns.values
assert len(pandas_df) == 50
assert len(pandas_df.columns.values) == 2
assert isinstance(pandas_df, pd.DataFrame)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [48]:
# Request the features as a numpy array; a 50 x 2 ndarray is expected.
numpy_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="numpy")
assert numpy_df.shape[0] == 50
assert numpy_df.shape[1] == 2
assert isinstance(numpy_df, np.ndarray)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [49]:
# Request the features as a plain Python list with 50 entries.
python_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="python")
assert len(python_df) == 50
assert isinstance(python_df, list)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [50]:
# Request the features as a Spark DataFrame with 50 rows.
spark_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="spark")
assert spark_df.count() == 50
assert isinstance(spark_df, DataFrame)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [51]:
# Let's rename the columns to differentiate this feature group from existing ones in the feature store
# (this mutates `pandas_df` in place, so re-running the earlier pandas assertions would fail).
pandas_df.columns = ["team_budget_test", "average_attendance_test"]

# Create a feature group directly from the pandas DataFrame, with statistics switched off.
featurestore.create_featuregroup(
    pandas_df,
    "pandas_test_example",
    description="test featuregroup created from pandas dataframe",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
assert "pandas_test_example_1" in featurestore.get_featuregroups()

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [52]:
# Overwriting the feature group with the same pandas DataFrame must leave the row count unchanged.
count_pre_pandas_insert_overwrite = featurestore.get_featuregroup("pandas_test_example").count()
featurestore.insert_into_featuregroup(
    pandas_df, 
    "pandas_test_example",
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")
count_after_pandas_insert_overwrite = featurestore.get_featuregroup("pandas_test_example").count()
assert count_pre_pandas_insert_overwrite == count_after_pandas_insert_overwrite

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM pandas_test_example_1
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM pandas_test_example_1

In [53]:
# Create a feature group directly from the numpy array, with statistics switched off.
featurestore.create_featuregroup(
    numpy_df,
    "numpy_test_example",
    description="test featuregroup created from numpy matrix",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
assert "numpy_test_example_1" in featurestore.get_featuregroups()

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully

In [54]:
# Row count of the feature group (read as a spark dataframe) before inserting.
numpy_test_df_count_pre_insert_overwrite = featurestore.get_featuregroup("numpy_test_example", dataframe_type="spark").count()

# Insert the same numpy matrix again in "overwrite" mode, with every
# statistics flag set to False.
featurestore.insert_into_featuregroup(
    numpy_df,
    "numpy_test_example",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")

# Row count after inserting.
numpy_test_df_count_after_insert_overwrite = featurestore.get_featuregroup("numpy_test_example", dataframe_type="spark").count()

# Compare the count from BEFORE the insert with the count from AFTER it.
# (The original asserted the "pre" count against itself, which always passed
# and therefore verified nothing.)
assert numpy_test_df_count_pre_insert_overwrite == numpy_test_df_count_after_insert_overwrite

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM numpy_test_example_1
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM numpy_test_example_1

In [55]:
# Create a feature group from the python 2D list; every statistics flag is
# explicitly set to False.
featurestore.create_featuregroup(
    python_df, "python_test_example",
    description="test featuregroup created from python 2D list",
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

# Record the row count right after creation; the overwrite-insert cell that
# follows compares against this value.
python_test_df_count_pre_insert_overwrite = featurestore.get_featuregroup(
    "python_test_example", dataframe_type="spark"
).count()

# The listing is expected to contain the group's name with a "_1" suffix.
assert "python_test_example_1" in featurestore.get_featuregroups()

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Feature group created successfully
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM python_test_example_1

In [56]:
# Insert the same python 2D list again in "overwrite" mode, with every
# statistics flag set to False.
featurestore.insert_into_featuregroup(
    python_df, "python_test_example",
    mode="overwrite",
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

# Row count after inserting; the test expects it to equal the count recorded
# when the feature group was created.
python_test_df_count_after_insert_overwrite = featurestore.get_featuregroup(
    "python_test_example", dataframe_type="spark"
).count()
assert python_test_df_count_pre_insert_overwrite == python_test_df_count_after_insert_overwrite

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM python_test_example_1

##### Test update Feature Store Statistics `featurestore.update_featuregroup_stats()`

In [57]:
# Update the statistics of the "teams_features" feature group, passing only
# the name so that every other argument takes its default value. The cell
# passes if the call completes without raising.
featurestore.update_featuregroup_stats("teams_features")

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_1
computing descriptive statistics for : teams_features
computing feature correlation for: teams_features
computing feature histograms for: teams_features
computing cluster analysis for: teams_features

In [58]:
# Same statistics update as the previous cell, but with every argument spelled
# out: version 1 in the project's own feature store, all four statistics
# enabled, and no restriction on the columns (stat_columns=None).
featurestore.update_featuregroup_stats(
    "teams_features",
    featurestore=featurestore.project_featurestore(),
    featuregroup_version=1,
    stat_columns=None,
    descriptive_statistics=True, feature_correlation=True,
    feature_histograms=True, cluster_analysis=True)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT * FROM teams_features_1
computing descriptive statistics for : teams_features
computing feature correlation for: teams_features
computing feature histograms for: teams_features
computing cluster analysis for: teams_features

##### Test Write Training Dataset Operations 

- `featurestore.get_latest_training_dataset_version()`
- `featurestore.create_training_dataset()`

In [59]:
# Fetch the three features; `features_df` is the input to every
# create_training_dataset() call in the cells below.
features_df = featurestore.get_features(["team_budget", "average_attendance", "team_position"])

# Exercise the version lookup for a training dataset name. The cells below
# pass training_dataset_version=1 explicitly rather than using this value.
latest_version = featurestore.get_latest_training_dataset_version(
    "team_position_prediction"
)

Running sql: use project_bde67d4d2ef6dcfc_featurestore
Running sql: SELECT team_budget, average_attendance, team_position FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id`

In [60]:
# Create version 1 of the "team_position_prediction" training dataset without
# specifying a data format (the library default applies); every statistics
# flag is set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction",
    training_dataset_version=1,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [61]:
# Create version 1 of a training dataset in "csv" format from `features_df`,
# in the project's own feature store, with every statistics flag set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_csv",
    data_format="csv",
    training_dataset_version=1,
    featurestore=featurestore.project_featurestore(),
    description="a dataset with features for football teams, used for training a model to predict league-position",
    job_name=None, dependencies=[], stat_columns=None,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [62]:
# Create version 1 of a training dataset in "tsv" format from `features_df`,
# in the project's own feature store, with every statistics flag set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_tsv",
    data_format="tsv",
    training_dataset_version=1,
    featurestore=featurestore.project_featurestore(),
    description="a dataset with features for football teams, used for training a model to predict league-position",
    job_name=None, dependencies=[], stat_columns=None,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [63]:
# Create version 1 of a training dataset in "parquet" format from `features_df`,
# in the project's own feature store, with every statistics flag set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_parquet",
    data_format="parquet",
    training_dataset_version=1,
    featurestore=featurestore.project_featurestore(),
    description="a dataset with features for football teams, used for training a model to predict league-position",
    job_name=None, dependencies=[], stat_columns=None,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [64]:
# Create version 1 of a training dataset in "orc" format from `features_df`,
# in the project's own feature store, with every statistics flag set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_orc",
    data_format="orc",
    training_dataset_version=1,
    featurestore=featurestore.project_featurestore(),
    description="a dataset with features for football teams, used for training a model to predict league-position",
    job_name=None, dependencies=[], stat_columns=None,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [65]:
# Create version 1 of a training dataset in "avro" format from `features_df`,
# in the project's own feature store, with every statistics flag set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_avro",
    data_format="avro",
    training_dataset_version=1,
    featurestore=featurestore.project_featurestore(),
    description="a dataset with features for football teams, used for training a model to predict league-position",
    job_name=None, dependencies=[], stat_columns=None,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [66]:
# Create version 1 of a training dataset in "hdf5" format from `features_df`,
# in the project's own feature store, with every statistics flag set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_hdf5",
    data_format="hdf5",
    training_dataset_version=1,
    featurestore=featurestore.project_featurestore(),
    description="a dataset with features for football teams, used for training a model to predict league-position",
    job_name=None, dependencies=[], stat_columns=None,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [67]:
# Create version 1 of a training dataset in "npy" format from `features_df`,
# in the project's own feature store, with every statistics flag set to False.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_npy",
    data_format="npy",
    training_dataset_version=1,
    featurestore=featurestore.project_featurestore(),
    description="a dataset with features for football teams, used for training a model to predict league-position",
    job_name=None, dependencies=[], stat_columns=None,
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False)

Training Dataset created successfully

In [68]:
# Petastorm is only supported in python 3; on python 2 this cell does nothing.
if sys.version_info[0] >= 3:
    # One scalar field per feature column.
    # NOTE(review): the trailing `False` is presumably petastorm's `nullable`
    # flag -- confirm against the UnischemaField signature.
    petastorm_fields = [
        UnischemaField('team_budget', np.float32, (), ScalarCodec(FloatType()), False),
        UnischemaField('average_attendance', np.float32, (), ScalarCodec(FloatType()), False),
        UnischemaField('team_position', np.int32, (), ScalarCodec(IntegerType()), False),
    ]
    PetastormSchema = Unischema('team_position_prediction_petastorm_schema', petastorm_fields)

    # Extra arguments forwarded to the petastorm writer: only the schema.
    petastorm_args = dict(schema=PetastormSchema)

    # Create version 1 of a training dataset in "petastorm" format from
    # `features_df`, with every statistics flag set to False.
    featurestore.create_training_dataset(
        features_df,
        "team_position_prediction_petastorm",
        data_format="petastorm",
        petastorm_args=petastorm_args,
        training_dataset_version=1,
        featurestore=featurestore.project_featurestore(),
        description="a dataset with features for football teams, used for training a model to predict league-position",
        job_name=None, dependencies=[], stat_columns=None,
        descriptive_statistics=False, feature_correlation=False,
        feature_histograms=False, cluster_analysis=False)

Petastorm is only supported in python 3

In [3]:
# Every training dataset created above must appear in the listing under its
# name with a "_1" suffix.
tds = featurestore.get_training_datasets()
for expected_td in (
        'team_position_prediction_1',
        'team_position_prediction_csv_1',
        'team_position_prediction_tsv_1',
        'team_position_prediction_parquet_1',
        'team_position_prediction_orc_1',
        'team_position_prediction_avro_1',
        'team_position_prediction_hdf5_1',
        'team_position_prediction_npy_1'):
    assert expected_td in tds

# The petastorm dataset is only created when running on python 3.
if sys.version_info[0] >= 3:
    assert 'team_position_prediction_petastorm_1' in tds

##### Test Insert into an existing training dataset, `featurestore.insert_into_training_dataset()`

In [74]:
# Row count of the csv training dataset before inserting.
count_pre_insert = featurestore.get_training_dataset("team_position_prediction_csv").count()

# Insert the same features into the latest version of the csv training
# dataset, with every statistics flag set to False.
featurestore.insert_into_training_dataset(
    features_df, "team_position_prediction_csv",
    descriptive_statistics=False, feature_correlation=False,
    feature_histograms=False, cluster_analysis=False,
    training_dataset_version=featurestore.get_latest_training_dataset_version(
        "team_position_prediction_csv"))

# Training datasets only support overwrites, so inserting the same data must
# leave the row count unchanged.
count_after_insert = featurestore.get_training_dataset("team_position_prediction_csv").count()
assert count_pre_insert == count_after_insert

##### Test Training Dataset Utility Methods

- `featurestore.get_training_dataset_path()`
- `featurestore.get_training_dataset_tf_record_schema()`

In [75]:
# The path of the csv training dataset must contain the project's path.
assert hdfs.project_path() in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [76]:
# The path must also contain "<project name>_Training_Datasets".
assert hdfs.project_name() + "_Training_Datasets" in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [77]:
# The path must contain the training dataset's own name.
assert "team_position_prediction_csv" in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [78]:
# TF record schema looked up by training dataset name.
tf_schema = featurestore.get_training_dataset_tf_record_schema("team_position_prediction")

# Expected schema: scalar float32 features for the two numeric inputs and a
# scalar int64 feature for the position, none with a default value.
expected_tf_schema = {
    'team_budget': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'average_attendance': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'team_position': tf.FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
}
assert tf_schema == expected_tf_schema

In [79]:
# NOTE: this rebinds the notebook-global `features_df` to the training dataset
# read back from the feature store.
features_df = featurestore.get_training_dataset("team_position_prediction")

# TF record schema inferred from the dataframe itself.
tf_schema = featurestore.get_dataframe_tf_record_schema(features_df)

# Expected schema: scalar float32 features for the two numeric inputs and a
# scalar int64 feature for the position, none with a default value.
expected_tf_schema = {
    'team_budget': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'average_attendance': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'team_position': tf.FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
}
assert tf_schema == expected_tf_schema

##### Test update Training Dataset stats

- `featurestore.update_training_dataset_stats()`

In [80]:
# Update the statistics of the "team_position_prediction" training dataset,
# passing only the name so that every other argument takes its default value.
# The cell passes if the call completes without raising.
featurestore.update_training_dataset_stats("team_position_prediction")

computing descriptive statistics for : team_position_prediction
computing feature correlation for: team_position_prediction
computing feature histograms for: team_position_prediction
computing cluster analysis for: team_position_prediction

In [81]:
# Same statistics update as the previous cell, but with every argument spelled
# out: version 1 in the project's own feature store, all four statistics
# enabled, and no restriction on the columns (stat_columns=None).
featurestore.update_training_dataset_stats(
    "team_position_prediction",
    featurestore=featurestore.project_featurestore(),
    training_dataset_version=1,
    stat_columns=None,
    descriptive_statistics=True, feature_correlation=True,
    feature_histograms=True, cluster_analysis=True)

computing descriptive statistics for : team_position_prediction
computing feature correlation for: team_position_prediction
computing feature histograms for: team_position_prediction
computing cluster analysis for: team_position_prediction

##### Test Read Training Datasets API `featurestore.get_training_dataset()`

In [82]:
# Expected column names; the read-back cells below reuse `cols`.
cols = [
    'team_budget',
    'average_attendance',
    'team_position',
]

# Read the csv training dataset back and check its columns (order-insensitive)
# and its row count.
tmp = featurestore.get_training_dataset("team_position_prediction_csv")
assert set(cols) == set(tmp.columns)
assert tmp.count() == 50

In [83]:
# Read the hdf5 training dataset back; only the row count is checked here
# (no column-name assertion for this format).
tmp = featurestore.get_training_dataset("team_position_prediction_hdf5")
assert tmp.count() == 50

In [90]:
# The petastorm dataset only exists when the notebook runs on python 3.
if sys.version_info[0] >= 3:
    # Read it back and check its columns (order-insensitive) and row count.
    tmp = featurestore.get_training_dataset("team_position_prediction_petastorm")
    assert set(tmp.columns) == set(cols)
    assert tmp.count() == 50

In [91]:
# Read the avro training dataset back and check its columns
# (order-insensitive) and row count.
tmp = featurestore.get_training_dataset("team_position_prediction_avro")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [92]:
# Read the orc training dataset back and check its columns
# (order-insensitive) and row count.
tmp = featurestore.get_training_dataset("team_position_prediction_orc")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [93]:
# Read the tsv training dataset back and check its columns
# (order-insensitive) and row count.
tmp = featurestore.get_training_dataset("team_position_prediction_tsv")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [94]:
# Read the npy training dataset back; only the row count is checked here
# (no column-name assertion for this format).
tmp = featurestore.get_training_dataset("team_position_prediction_npy")
assert tmp.count() == 50

In [95]:
# Read the parquet training dataset back and check its columns
# (order-insensitive) and row count.
tmp = featurestore.get_training_dataset("team_position_prediction_parquet")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

##### Cleanup (Delete FS Contents so that next test run works the same)

In [20]:
# Delete feature groups
spark.sql('use ' + featurestore.project_featurestore())
for fg in featurestore.get_featuregroups():
    spark.sql("drop table " + fg)

DataFrame[]
DataFrame[]
DataFrame[]
DataFrame[]
DataFrame[]
DataFrame[]
DataFrame[]
DataFrame[]
DataFrame[]

In [21]:
# Delete training datasets
td_dir = hdfs.project_name() + "_Training_Datasets/"
for td in featurestore.get_training_datasets():
    hdfs.rmr(td_dir + td)

In [27]:
# Re-fetch the feature store metadata with update_cache=True so the listings
# below reflect the deletions made in the previous cleanup cells.
featurestore.get_featurestore_metadata(update_cache=True)
# After cleanup, no feature groups and no training datasets may remain.
assert featurestore.get_featuregroups() == []
assert featurestore.get_training_datasets() == []

## Kafka Tests

##### Test default config 

- `kafka.get_kafka_default_config()`
- `kafka.get_security_protocol()`
- `kafka.get_broker_endpoints_list()`

In [96]:
# The default Kafka config must contain the broker list and all TLS-related
# settings.
config = kafka.get_kafka_default_config()
for required_key in (
        "bootstrap.servers",
        "security.protocol",
        "ssl.ca.location",
        "ssl.key.location",
        "ssl.certificate.location"):
    assert required_key in config

In [97]:
# Both the security protocol and the broker endpoint list must be non-empty.
assert len(kafka.get_security_protocol()) > 0
assert len(kafka.get_broker_endpoints_list()) > 0

## TLS Tests

##### Test access to TLS tokens

- `tls.get_key_store()`
- `tls.get_trust_store()`
- `tls.get_key_store_pwd()`
- `tls.get_trust_store_pwd()`
- `tls.get_client_certificate_location()`
- `tls.get_client_key_location()`
- `tls.get_ca_chain_location()`

In [98]:
# Every TLS accessor must return a non-empty value. The accessors are called
# in the same order as they are listed in the section header.
for tls_accessor in (
        tls.get_key_store,
        tls.get_trust_store,
        tls.get_key_store_pwd,
        tls.get_trust_store_pwd,
        tls.get_client_certificate_location,
        tls.get_client_key_location,
        tls.get_ca_chain_location):
    assert len(tls_accessor()) > 0