class TestHistogram:
    """Tests for the histogram calculations in ``_histogram_utils``."""

    @staticmethod
    def assert_histograms_match_dataframe(histograms, df):
        """
        Asserts that `histograms` is consistent with the data in `df`.

        Parameters
        ----------
        histograms : dict
            Result of ``_histogram_utils.calculate_histograms(df)``.
        df : pandas.DataFrame
            Data the histograms were calculated from.

        """
        np = pytest.importorskip("numpy")

        per_feature = histograms['features']

        # one histogram per column, and no extras
        assert set(per_feature.keys()) == set(df.columns)
        # total covers every row
        assert histograms['total_count'] == len(df.index)

        for name, feature in per_feature.items():
            column = df[name]
            kind = feature['type']
            payload = feature['histogram'][kind]

            # every data point landed in some bucket
            counts = payload['count']
            assert sum(counts) == len(column)

            if kind == "binary":
                falses = sum(~column)
                trues = sum(column)

                assert counts == [falses, trues]
            elif kind == "discrete":
                bucket_values = payload['bucket_values']

                # ascending order
                assert bucket_values == list(sorted(bucket_values))

                # no data outside the outermost buckets
                assert all(bucket_values[0] <= column)
                assert all(column <= bucket_values[-1])

                # outermost buckets are exactly the data's extremes
                assert bucket_values[0] == column.min()
                assert bucket_values[-1] == column.max()

                # no empty buckets
                # NOTE: this might not be behavior that we want in the future
                assert all(counts)

                # each bucket's count equals the number of matching data points
                for bucket_value, expected in zip(bucket_values, counts):
                    assert sum(column == bucket_value) == expected
            elif kind == "float":
                limits = payload['bucket_limits']

                # ascending order
                assert limits == list(sorted(limits))

                # no data outside the outermost limits
                assert all(limits[0] <= column)
                assert all(column <= limits[-1])

                # outermost limits are (approximately) the data's extremes
                assert np.isclose(limits[0], column.min())
                assert np.isclose(limits[-1], column.max())

                # equal-width buckets
                widths = np.diff(limits)
                assert np.allclose(widths, widths[0])

                # ten buckets means eleven limits
                assert len(limits) == 11

                # all but the last bucket are half-open: [left, right)
                inner_windows = zip(limits[:-2], limits[1:-1])
                for idx, (left, right) in enumerate(inner_windows):
                    assert sum((left <= column) & (column < right)) == counts[idx]
                # the last bucket also includes the maximum
                assert sum(limits[-2] <= column) == counts[-1]

    def test_binary(self):
        """Random boolean columns are binned as binary histograms."""
        np = pytest.importorskip("numpy")
        pd = pytest.importorskip("pandas")
        num_rows = 90

        columns = [
            pd.Series(np.random.random(size=num_rows).round().astype(bool), name=name)
            for name in ("A", "B", "C")
        ]
        df = pd.concat(objs=columns, axis='columns')
        histograms = _histogram_utils.calculate_histograms(df)

        for feature in histograms['features'].values():
            assert feature['type'] == "binary"
        self.assert_histograms_match_dataframe(histograms, df)

    def test_discrete(self):
        """Random small-range integer columns are binned as discrete histograms."""
        np = pytest.importorskip("numpy")
        pd = pytest.importorskip("pandas")
        num_rows = 90

        ranges = [("A", 6, 12), ("B", -12, -6), ("C", -3, 3)]
        columns = [
            pd.Series(np.random.randint(low, high, size=num_rows), name=name)
            for name, low, high in ranges
        ]
        df = pd.concat(objs=columns, axis='columns')
        histograms = _histogram_utils.calculate_histograms(df)

        for feature in histograms['features'].values():
            assert feature['type'] == "discrete"
        self.assert_histograms_match_dataframe(histograms, df)

    def test_float(self):
        """Random normally-distributed columns are binned as float histograms."""
        np = pytest.importorskip("numpy")
        pd = pytest.importorskip("pandas")
        num_rows = 90

        distributions = [
            ("A", dict(loc=9)),
            ("B", dict(scale=12)),
            ("C", dict(loc=-3, scale=6)),
        ]
        columns = [
            pd.Series(np.random.normal(size=num_rows, **params), name=name)
            for name, params in distributions
        ]
        df = pd.concat(objs=columns, axis='columns')
        histograms = _histogram_utils.calculate_histograms(df)

        for feature in histograms['features'].values():
            assert feature['type'] == "float"
        self.assert_histograms_match_dataframe(histograms, df)

    def test_integration(self, experiment_run):
        """Histograms logged via ``log_training_data()`` match the locally calculated ones."""
        np = pytest.importorskip("numpy")
        pd = pytest.importorskip("pandas")

        binary_col_name = 'binary col'
        discrete_col_name = 'discrete col'
        float_col_name = 'float col'

        binary_col = pd.Series([True]*10 + [False]*20, name=binary_col_name)
        discrete_col = pd.Series([0]*5 + [1]*10 + [2]*15, name=discrete_col_name)
        float_col = pd.Series(range(30), name=float_col_name)
        df = pd.concat(objs=[binary_col, discrete_col, float_col], axis='columns')
        histograms = _histogram_utils.calculate_histograms(df)

        # log the data, then read the stored reference histograms back
        experiment_run.log_training_data(df[[binary_col_name, discrete_col_name]], df[float_col_name])
        endpoint = "{}://{}/api/v1/monitoring/data/references/{}".format(
            experiment_run._conn.scheme,
            experiment_run._conn.socket,
            experiment_run.id,
        )
        response = _utils.make_request("GET", endpoint, experiment_run._conn)
        _utils.raise_for_http_error(response)
        retrieved_histograms = response.json()

        # same set of features on both sides
        local = histograms['features']
        remote = retrieved_histograms['features']
        assert set(local.keys()) == set(remote.keys())

        # binary matches
        local_binary = local[binary_col_name]['histogram']['binary']
        remote_binary = remote[binary_col_name]['histogram']['binary']
        assert local_binary['count'] == remote_binary['count']

        # discrete matches
        local_discrete = local[discrete_col_name]['histogram']['discrete']
        remote_discrete = remote[discrete_col_name]['histogram']['discrete']
        assert local_discrete['bucket_values'] == remote_discrete['bucket_values']
        assert local_discrete['count'] == remote_discrete['count']

        # float matches (limits compared approximately)
        local_float = local[float_col_name]['histogram']['float']
        remote_float = remote[float_col_name]['histogram']['float']
        assert all(np.isclose(local_float['bucket_limits'], remote_float['bucket_limits']))
        assert local_float['count'] == remote_float['count']
def calculate_histograms(df):
    """
    Calculates histograms for the columns of `df`.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to be binned.

    Returns
    -------
    histograms : dict
        Dictionary with two items: ``'total_count'``, the number of rows in `df`,
        and ``'features'``, a mapping from each column name (converted to ``str``)
        to the result of :func:`calculate_single_histogram` for that column.

    """
    histograms = {'total_count': len(df.index), 'features': {}}
    for colname in df:
        histogram = calculate_single_histogram(df[colname])
        histograms['features'][str(colname)] = histogram  # TODO: directly store non-str column names

    return histograms


def calculate_single_histogram(data):
    """
    Calculates a histogram for `data`.

    A binary histogram is attempted first, then a discrete one; if both reject
    `data` with a :class:`HistogramError`, a float histogram is calculated.

    Parameters
    ----------
    data : pandas.Series
        Data to be binned.

    Returns
    -------
    histogram : dict
        Dictionary with a ``'type'`` item (``"binary"``, ``"discrete"``, or
        ``"float"``) and a ``'histogram'`` item holding the data under that type's key.

    Raises
    ------
    TypeError
        If `data` cannot be interpreted as numeric at all.

    """
    try:  # binary
        return calculate_binary_histogram(data)
    except HistogramError:
        pass

    try:  # discrete
        return calculate_discrete_histogram(data)
    except HistogramError:
        pass

    # continuous
    return calculate_float_histogram(data)


def calculate_binary_histogram(data):
    """
    Calculates a histogram for binary `data`.

    Accepted values are booleans, the numbers 0 and 1, the strings ``"true"`` and
    ``"false"`` (case-insensitive), and strings that parse as the numbers 0 or 1.

    Parameters
    ----------
    data : pandas.Series
        Binary data to be binned.

    Returns
    -------
    histogram : dict
        ``{'histogram': {'binary': {'count': [zeros, ones]}}, 'type': "binary"}``

    Raises
    ------
    HistogramError
        If a binary histogram cannot be calculated from `data`.

    """
    zeros = 0
    ones = 0
    for value in data.values.tolist():
        if isinstance(value, bool):
            if value:
                ones += 1
            else:
                zeros += 1
            continue

        if isinstance(value, six.string_types):
            # handle bool-like strings
            lowered = value.lower()
            if lowered == "false":
                zeros += 1
                continue
            elif lowered == "true":
                ones += 1
                continue

            # handle num-like strings (falls through to numeric case)
            try:
                value = float(value)
            except ValueError:
                pass

        if isinstance(value, (six.integer_types, float)):
            if value == 0:
                zeros += 1
                continue
            elif value == 1:
                ones += 1
                continue

        # unsupported value
        raise HistogramError("invalid binary value {}".format(value))

    return {
        'histogram': {
            'binary': {
                'count': [zeros, ones],
            },
        },
        'type': "binary",
    }


def calculate_discrete_histogram(data):
    """
    Calculates a histogram for discrete `data`.

    `data` is rejected unless every distinct value is an integral number, there
    are at most 10 distinct values, and each distinct value appears at least 10
    times on average.

    Parameters
    ----------
    data : pandas.Series of int
        Discrete data to be binned.

    Returns
    -------
    histogram : dict
        ``{'histogram': {'discrete': {'bucket_values': [...], 'count': [...]}}, 'type': "discrete"}``
        with bucket values in ascending order.

    Raises
    ------
    HistogramError
        If a discrete histogram cannot be calculated from `data`.

    """
    # reject values that cannot be counted or ordered (e.g. unhashable or mixed types)
    try:
        value_counts = data.value_counts().sort_index()
    except TypeError:
        raise HistogramError(
            "values must be hashable and mutually comparable"
        )
    values = value_counts.index.tolist()
    counts = value_counts.values.tolist()

    # reject non-numbers
    # float() raises ValueError for unparseable strings but TypeError for other
    # objects; both must become HistogramError so callers can fall through
    try:
        values = [float(value) for value in values]
    except (TypeError, ValueError):
        raise HistogramError(
            "values must be castable to numbers"
        )

    # reject non-integral floats
    if not all(value.is_integer() for value in values):
        raise HistogramError(
            "values must be integers"
        )
    values = [int(value) for value in values]

    # heuristic: reject if too many values
    if len(values) > 10:
        raise HistogramError(
            "got {} possible discrete values but heuristic says the maximum is 10".format(len(values))
        )

    # heuristic: reject if counts don't seem high enough
    if value_counts.mean() < 10:  # `value_counts` instead of `counts` for mean() method
        raise HistogramError(
            "heuristic says that each discrete value should average at least 10 appearances"
        )

    return {
        'histogram': {
            'discrete': {
                'bucket_values': values,
                'count': counts,
            },
        },
        'type': "discrete",
    }


def calculate_float_histogram(data, num_bins=10):
    """
    Calculates a histogram for continuous `data`.

    The range between the smallest and largest value is split into `num_bins`
    equal-width bins. Every bin is half-open (``left <= value < right``) except
    the last, which also includes the maximum. NaN values are ignored when
    determining the range and are not counted in any bin.

    Parameters
    ----------
    data : pandas.Series of float
        Continuous data to be binned.
    num_bins : int, default 10
        Number of bins to use.

    Returns
    -------
    histogram : dict
        ``{'histogram': {'float': {'bucket_limits': [...], 'count': [...]}}, 'type': "float"}``
        with ``num_bins + 1`` limits and ``num_bins`` counts.

    Raises
    ------
    TypeError
        If `data` contains values that cannot be cast to ``float``.
    ValueError
        If `num_bins` is less than 1 or `data` is empty.

    """
    if num_bins < 1:
        raise ValueError(
            "`num_bins` must be at least 1, not {}".format(num_bins)
        )

    # reject non-numbers
    # float() raises ValueError for unparseable strings but TypeError for other objects
    try:
        values = [float(value) for value in data.values.tolist()]
    except (TypeError, ValueError):
        raise TypeError(
            "unable to generate histogram from non-numeric column {}".format(data.name)
        )
    if not values:
        raise ValueError(
            "unable to generate histogram from empty column {}".format(data.name)
        )

    # calculate bin boundaries
    # NaN != NaN, so this drops NaNs, which would otherwise make min()/max()
    # depend on row order; fall back to all values if nothing else remains
    comparable = [value for value in values if value == value] or values
    start, stop = min(comparable), max(comparable)
    space = (stop - start)/num_bins
    bin_boundaries = [start + space*i for i in range(num_bins)]
    # ensure last bin covers max value
    bin_boundaries.append(stop)

    # fit `data` into bins
    reference_counts = [
        sum(1 for value in values if left <= value < right)
        for left, right in zip(bin_boundaries[:-2], bin_boundaries[1:-1])  # handle last bin shortly
    ]
    # ensure last bin includes max value
    reference_counts.append(sum(1 for value in values if bin_boundaries[-2] <= value))

    return {
        'histogram': {
            'float': {
                'bucket_limits': bin_boundaries,
                'count': reference_counts,
            },
        },
        'type': "float",
    }


class HistogramError(TypeError):  # TODO: move to exceptions submodule
    """Raised when a particular type of histogram cannot be calculated from the given data."""
overwrite=overwrite) + endpoint = "{}://{}/api/v1/monitoring/data/references/{}".format( + self._conn.scheme, + self._conn.socket, + self.id, + ) + response = _utils.make_request("PUT", endpoint, self._conn, json=histograms) + _utils.raise_for_http_error(response) def fetch_artifacts(self, keys): """