In [1]:
import prometheus_client
import pandas as pd

In [2]:
# Keep a handle on prometheus_client's module-level REGISTRY object; the cells
# below pass it to generate_latest() to dump its contents.
registry = prometheus_client.registry.REGISTRY

In [3]:
# Render everything currently held by `registry` in the Prometheus text
# exposition format (generate_latest returns bytes, hence the decode) and print it.
print(prometheus_client.exposition.generate_latest(registry).decode())

# HELP python_gc_objects_collected_total Objects collected during gc
# TYPE python_gc_objects_collected_total counter
python_gc_objects_collected_total{generation="0"} 1146.0
python_gc_objects_collected_total{generation="1"} 818.0
python_gc_objects_collected_total{generation="2"} 0.0
# HELP python_gc_objects_uncollectable_total Uncollectable object found during GC
# TYPE python_gc_objects_uncollectable_total counter
python_gc_objects_uncollectable_total{generation="0"} 0.0
python_gc_objects_uncollectable_total{generation="1"} 0.0
python_gc_objects_uncollectable_total{generation="2"} 0.0
# HELP python_gc_collections_total Number of times this generation was collected
# TYPE python_gc_collections_total counter
python_gc_collections_total{generation="0"} 249.0
python_gc_collections_total{generation="1"} 22.0
python_gc_collections_total{generation="2"} 2.0
# HELP python_info Python platform information
# TYPE python_info gauge
python_info{implementation="CPython",major="3",minor="7",patchl

In [4]:
class HistogramError(TypeError):
    """Raised when a value cannot be interpreted as a binary (false/true) value."""
def process_name(name):
    """Turn a feature name into a valid Prometheus metric-name fragment.

    Prometheus metric names may only contain ASCII letters, digits,
    underscores and colons. Every other character (``-`` and ``>`` as before,
    but also spaces, dots, slashes, ...) is replaced by an underscore so the
    metric constructors do not reject the resulting name.

    Args:
        name: Feature/column name as a string.

    Returns:
        ``name`` with each disallowed character replaced by ``_``.
    """
    import re  # local import keeps this cell runnable on its own

    return re.sub(r"[^a-zA-Z0-9_:]", "_", name)

In [5]:
import six

class BinaryHistogram:
    """Prometheus counter tracking how often a binary feature is false vs. true.

    One ``Counter`` is created per feature with two labels: ``value``
    ("false"/"true") and ``type`` ("reference" for the baseline counts,
    "live" for values reported through :meth:`observe`).
    """

    def __init__(self, name, reference=None):
        """Create the counter and optionally preload the reference counts.

        Args:
            name: Feature name; passed through ``process_name`` to build the
                metric name.
            reference: Optional two-element sequence
                ``[false_count, true_count]`` added to the "reference" series.
        """
        self.name = name
        self.counter = prometheus_client.Counter("data_monitoring_binary_histogram_"+process_name(name), "", ["value", "type"])
        # Touch both live label combinations up front, before any observation.
        self.counter.labels("false", "live")
        self.counter.labels("true", "live")
        if reference:
            self.counter.labels("false", "reference").inc(reference[0])
            self.counter.labels("true", "reference").inc(reference[1])

    @staticmethod
    def _parse_binary(value):
        """Interpret ``value`` as a boolean using the rules shared by the class.

        Accepted inputs: real booleans, the strings "true"/"false" (any case),
        numeric strings equal to 0 or 1, and ints/floats equal to 0 or 1.

        Returns:
            ``False`` for zero-like values, ``True`` for one-like values.

        Raises:
            HistogramError: if the value is not binary-like.
        """
        if isinstance(value, bool):
            return value

        if isinstance(value, six.string_types):
            # handle bool-like strings
            lowered = value.lower()
            if lowered == "false":
                return False
            if lowered == "true":
                return True

            # handle num-like strings (falls through to the numeric case)
            try:
                value = float(value)
            except ValueError:
                pass

        if isinstance(value, (six.integer_types, float)):
            if value == 0:
                return False
            if value == 1:
                return True

        # unsupported value
        raise HistogramError("invalid binary value {}".format(value))

    @staticmethod
    def from_list(name, values):
        """Build a histogram whose reference counts come from ``values``.

        Args:
            name: Feature name.
            values: Iterable of binary-like values (see ``_parse_binary``).

        Returns:
            A ``BinaryHistogram`` with ``reference=[zeros, ones]``.

        Raises:
            HistogramError: on the first value that is not binary-like. No
                metric has been created at that point.
        """
        zeros = 0
        ones = 0
        for value in values:
            if BinaryHistogram._parse_binary(value):
                ones += 1
            else:
                zeros += 1

        return BinaryHistogram(name, reference=[zeros, ones])

    def observe(self, val):
        """Count one live value as "true" or "false".

        The value is parsed with the same rules as ``from_list`` so that,
        e.g., the strings "false" and "0" are counted as false rather than as
        truthy non-empty strings. Values those rules reject fall back to
        plain truthiness, so this method never raises ``HistogramError``.
        """
        try:
            is_true = BinaryHistogram._parse_binary(val)
        except HistogramError:
            is_true = bool(val)

        if is_true:
            self.counter.labels("true", "live").inc()
        else:
            self.counter.labels("false", "live").inc()

In [6]:
class FloatHistogram:
    """Prometheus histogram tracking the distribution of a numeric feature.

    One ``Histogram`` metric is created per feature with a single ``type``
    label: "reference" for the baseline distribution and "live" for values
    reported through :meth:`observe`.
    """

    def __init__(self, name, buckets, reference=None):
        """Create the histogram and optionally replay the reference counts.

        Args:
            name: Feature name; passed through ``process_name`` to build the
                metric name.
            buckets: Bucket boundaries handed to ``prometheus_client.Histogram``.
            reference: Optional per-bin counts; ``reference[i]`` is the number
                of baseline values between ``buckets[i]`` and ``buckets[i+1]``.
        """
        self.name = name
        self.histogram = prometheus_client.Histogram("data_monitoring_float_histogram_"+process_name(name), "", ["type"], buckets=buckets)
        self.histogram.labels("live")
        if reference:
            for index, val in enumerate(reference):
                # Replay each bin by observing its upper edge `val` times, so
                # the count lands in the bucket bounded by buckets[index+1].
                # The reference series therefore sums bin edges, not the
                # original values.
                for _ in range(val):
                    self.histogram.labels("reference").observe(buckets[index+1])

    @staticmethod
    def from_list(name, values, num_bins=10):
        """Build a histogram with equal-width bins fitted to ``values``.

        Args:
            name: Feature name.
            values: Iterable of numbers (or strings parseable by ``float``).
            num_bins: Number of equal-width bins spanning min..max of ``values``.

        Returns:
            A ``FloatHistogram`` whose reference series holds the per-bin
            counts of ``values``.

        Raises:
            TypeError: if any value cannot be converted to ``float``.
            ValueError: if ``values`` is empty or ``num_bins`` is less than 1.
        """
        try:
            values = list(map(float, values))
        except (TypeError, ValueError):
            # TypeError covers e.g. None; ValueError covers unparsable strings.
            raise TypeError(
                "unable to generate histogram from non-numeric column {}".format(name)
            )
        if not values:
            raise ValueError(
                "unable to generate histogram from empty column {}".format(name)
            )
        if num_bins < 1:
            raise ValueError("num_bins must be at least 1, got {}".format(num_bins))

        # calculate bin boundaries
        start, stop = min(values), max(values)
        space = (stop - start)/num_bins
        bin_boundaries = [start + space*i for i in range(num_bins)]
        # ensure last bin covers max value
        bin_boundaries.append(stop)

        # fit the values into half-open bins [left, right); last bin handled below
        reference_counts = [
            sum(1 for value in values if left <= value < right)
            for left, right in zip(bin_boundaries[:-2], bin_boundaries[1:-1])
        ]
        # last bin is closed on the right so that it includes the max value
        reference_counts.append(
            sum(1 for value in values if bin_boundaries[-2] <= value)
        )

        return FloatHistogram(name, bin_boundaries, reference=reference_counts)

    def observe(self, val):
        """Record one live value in the "live" series."""
        self.histogram.labels("live").observe(val)

In [7]:
class Histogram:
    """Collection of per-feature histograms keyed by DataFrame column name."""

    def __init__(self, feature_histograms):
        """Store the mapping ``{column name: histogram object}``.

        Each histogram object must expose ``observe(value)``.
        """
        self.feature_histograms = feature_histograms

    @staticmethod
    def from_dataframe(df):
        """Build one histogram per column of ``df`` (see ``from_series``)."""
        features = {}
        # DataFrame.items() replaces iteritems(), which pandas 2.0 removed.
        for column, vals in df.items():
            features[column] = Histogram.from_series(column, vals)
        return Histogram(features)

    @staticmethod
    def from_series(name, series):
        """Pick the histogram type for one column.

        Tries ``BinaryHistogram`` first; if any value is not binary-like
        (``HistogramError``), falls back to ``FloatHistogram``.
        """
        values = series.tolist()
        try:
            return BinaryHistogram.from_list(name, values)
        except HistogramError:
            pass

        return FloatHistogram.from_list(name, values)

    def observe(self, df):
        """Feed every value of every column of ``df`` to that column's histogram.

        Raises:
            KeyError: if ``df`` has a column with no registered histogram.
        """
        for column, series in df.items():
            h = self.feature_histograms[column]
            for val in series.tolist():
                h.observe(val)

In [8]:
# import pandas as pd
# d = {'col1': [i % 2 for i in range(10)], 'col2': [i for i in range(10)]}
# df = pd.DataFrame(data=d)
# print(df)

In [9]:
# h = Histogram.from_dataframe(df)
# h.observe(df)
# h.observe(df)

In [10]:
# Load the training CSV. The path is relative, so the notebook has to be run
# from the directory that contains the `census/` folder.
df_train = pd.read_csv("census/census-train.csv")

In [11]:
# Preview the first rows to check the column layout before building the monitor.
df_train.head()

Unnamed: 0,age,capital-gain,capital-loss,hours-per-week,workclass_local-gov,workclass_private,workclass_self-emp-inc,workclass_self-emp-not-inc,workclass_state-gov,workclass_without-pay,...,occupation_handlers-cleaners,occupation_machine-op-inspct,occupation_other-service,occupation_priv-house-serv,occupation_prof-specialty,occupation_protective-serv,occupation_sales,occupation_tech-support,occupation_transport-moving,>50k
0,44,0,0,40,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,21,0,0,40,0,1,0,0,0,0,...,0,0,1,0,0,0,0,0,0,0
2,53,7298,0,60,0,1,0,0,0,0,...,0,0,0,0,0,0,1,0,0,1
3,49,0,0,40,0,1,0,0,0,0,...,0,0,1,0,0,0,0,0,0,0
4,53,0,1485,40,0,1,0,0,0,0,...,0,0,0,0,0,0,1,0,0,1


In [12]:
# from verta.monitoring import BinaryHistogram, Histogram

class Monitor:
    """Tracks feature distributions and missing-value rates for a DataFrame.

    Attributes are built from a reference DataFrame at construction time and
    updated with live data through :meth:`process`.
    """

    def __init__(self, df):
        """Create per-column histograms and missing-value counters from ``df``.

        Args:
            df: Reference DataFrame. Column names must be strings, because
                they are concatenated into metric names.
        """
        self.feature_histograms = Histogram.from_dataframe(df)
        self.missing_features = {}
        # DataFrame.items() replaces iteritems(), which pandas 2.0 removed.
        for column, vals in df.items():
            # Cast to plain ints so the counters are not handed numpy scalars.
            missing = int(vals.isnull().sum())
            present = int(vals.shape[0]) - missing
            # "false" = value present, "true" = value missing
            self.missing_features[column] = BinaryHistogram(name="missing_"+column, reference=[present, missing])

    def process(self, inputs, outputs):
        """Record one batch of model inputs and outputs as live observations.

        Args:
            inputs: Column-oriented data accepted by ``pd.DataFrame.from_dict``.
            outputs: Column-oriented data for the output column(s); joined to
                ``inputs`` on the index.

        Raises:
            KeyError: if the joined frame lacks a column seen at construction,
                or contains a column that was not.
        """
        inputs = pd.DataFrame.from_dict(inputs, orient="columns")
        outputs = pd.DataFrame.from_dict(outputs, orient="columns")
        values = inputs.join(outputs)
        self.feature_histograms.observe(values)

        for feature, hist in self.missing_features.items():
            for val in values[feature].isnull().tolist():
                hist.observe(val)

In [13]:
# Build the monitor from the training data: Monitor.__init__ creates one
# histogram per column plus a "missing_<column>" counter per column.
mon = Monitor(df_train)

In [14]:
# Split off the last column as the output, then replay the training data
# through the monitor so it is recorded under the "live" label.
X_train = df_train.iloc[:,:-1]
y_train = df_train.iloc[:, -1]
mon.process(X_train, y_train)

In [15]:
# Dump the registry again, after the monitor has been created and fed data,
# in the Prometheus text exposition format.
print(prometheus_client.exposition.generate_latest(registry).decode())

# HELP python_gc_objects_collected_total Objects collected during gc
# TYPE python_gc_objects_collected_total counter
python_gc_objects_collected_total{generation="0"} 1351.0
python_gc_objects_collected_total{generation="1"} 858.0
python_gc_objects_collected_total{generation="2"} 0.0
# HELP python_gc_objects_uncollectable_total Uncollectable object found during GC
# TYPE python_gc_objects_uncollectable_total counter
python_gc_objects_uncollectable_total{generation="0"} 0.0
python_gc_objects_uncollectable_total{generation="1"} 0.0
python_gc_objects_uncollectable_total{generation="2"} 0.0
# HELP python_gc_collections_total Number of times this generation was collected
# TYPE python_gc_collections_total counter
python_gc_collections_total{generation="0"} 262.0
python_gc_collections_total{generation="1"} 23.0
python_gc_collections_total{generation="2"} 2.0
# HELP python_info Python platform information
# TYPE python_info gauge
python_info{implementation="CPython",major="3",minor="7",patchl