# Distributed Profiling of Model Features with Whylogs & Fugue

It is common practice in the Machine Learning world to log incoming model inference requests and outgoing predictions. These logs are later processed and aggregated for monitoring and drift detection. However, consuming this raw data presents several pain points:
+ Machine Learning models vary widely in the number and nature of features and predictions. Some have 10 features and emit probability scores while others may have 30 features and emit a ranking. 
+ They also differ significantly in the type of features, with some having more categorical features and others having more numerical features.

We therefore need a uniform way of processing these logs; maintaining separate monitoring logic for each model does not scale.

In this tutorial we show how to use [Whylogs](https://whylabs.ai/whylogs) to profile the features and predictions and extract only the essential metrics from these profiles, regardless of the scale of the data.

The purposes of profiling are:
+ To normalize and compress metric data while retaining as much information as possible.
+ To unify data from entirely different models so that the next step can process them with the same pipeline.
+ To reduce what the subsequent steps must handle to purely numerical time series.
+ To significantly reduce the scale of the problem, making the compute more efficient and cost-effective.

We also use the open source framework [Fugue](https://fugue-tutorials.readthedocs.io/index.html) for its abstraction layer, which unifies the computing logic over Pandas, Spark, Ray or Dask. One of Fugue's most popular features is the ability to use a simple Python function call to distribute logic across many partitions of a larger dataframe. Users provide functions with type-annotated inputs and outputs, and Fugue converts the data based on those annotations. This keeps the custom logic independent of Fugue and Spark, so writing it requires no prior knowledge of either.
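
To make the idea of "one function, many partitions" concrete, here is a minimal plain-Python sketch of the pattern. It is only a conceptual stand-in and not Fugue's implementation: `apply_per_partition`, `count_rows` and the sample rows are hypothetical names and data invented for illustration.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def count_rows(rows: List[Row]) -> List[Row]:
    """Unit logic for ONE partition: emit a single summary row."""
    first = rows[0]
    return [{"ds": first["ds"], "hour": first["hour"], "sample_records": len(rows)}]


def apply_per_partition(
    rows: List[Row], by: List[str], func: Callable[[List[Row]], List[Row]]
) -> List[Row]:
    """Hypothetical helper: group rows by the partition keys, then run func on each group."""
    groups: Dict[tuple, List[Row]] = defaultdict(list)
    for row in rows:
        groups[tuple(row[key] for key in by)].append(row)
    output: List[Row] = []
    for key in sorted(groups):  # a distributed engine would process groups in parallel
        output.extend(func(groups[key]))
    return output


# Made-up log rows, only to exercise the sketch.
logs = [
    {"ds": "2023-02-10", "hour": 5, "score": 0.1},
    {"ds": "2023-02-10", "hour": 5, "score": 0.7},
    {"ds": "2023-02-10", "hour": 6, "score": 0.4},
]
result = apply_per_partition(logs, by=["ds", "hour"], func=count_rows)
print(result)
```

The unit function knows nothing about how the data was split or where it runs, which is the property that lets the same logic move between engines.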

![](images/scale-up-ad.png)

In [None]:
import seaborn as sns
from matplotlib import pyplot as plt

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# this allows plots to appear directly in the notebook
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [None]:
import pandas as pd

pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', 100)

In [None]:
demo_df = pd.read_parquet('addemo23/demo_raw_data_20.parquet')

In [None]:
demo_df.shape

In [None]:
demo_df.head(5)

## Load Model Feature and Prediction Logs

### Extract Features and Predictions from model logs

In [None]:
import json
import pandas as pd

def extract_features(df: pd.DataFrame) -> pd.DataFrame:
    json_str = "[" + (",".join(df.features)) + "]"
    feature_df = pd.DataFrame(json.loads(json_str))
    #feature_df = feature_df.reset_index(drop=True)
    return feature_df[sorted(feature_df.columns)]
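
The function above parses every row's JSON string in one call by joining the strings into a single JSON array. The sketch below shows that trick in isolation with the standard library only; the feature names and values are made up, and it assumes every entry is a valid JSON object string (a null or malformed entry would break the joined document).

```python
import json

# Made-up stand-in for the `features` column: one JSON object string per logged request.
raw_features = [
    '{"feature_b": 1.5, "feature_a": "red"}',
    '{"feature_b": 2.0, "feature_a": "blue"}',
]

# Join the strings into one JSON array and parse it in a single call ...
batch = json.loads("[" + ",".join(raw_features) + "]")

# ... which yields the same records as parsing row by row.
one_by_one = [json.loads(s) for s in raw_features]
assert batch == one_by_one

# Sorting the keys gives a stable column order regardless of the order inside each JSON object.
columns = sorted({key for record in batch for key in record})
table = [[record.get(col) for col in columns] for record in batch]
print(columns)
print(table)
```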

In [None]:
extracted_features_df = extract_features(demo_df)

In [None]:
extracted_features_df.head(5)

In [None]:
extracted_features_df.shape, demo_df.shape

In [None]:
pd.concat([demo_df, extracted_features_df], axis=1, ignore_index=True).head(5)

### A unit function to work on a partition of data

In [None]:
import json
import pandas as pd

def extract_features(model_logs_df: pd.DataFrame) -> pd.DataFrame:
    # Work on a copy with a fresh 0..n-1 index: the caller's dataframe is left untouched,
    # and the rows line up positionally with the parsed features in the concat below.
    model_logs_df = model_logs_df.reset_index(drop=True)
    # Parse all JSON feature strings in one pass by joining them into a single JSON array.
    json_str = "[" + (",".join(model_logs_df.features)) + "]"
    extracted_features_df = pd.DataFrame(json.loads(json_str))
    extracted_features_df = extracted_features_df[sorted(extracted_features_df.columns)]
    # Truncate timestamps to whole seconds, then derive the date and hour partition keys.
    model_logs_df['occurred_at'] = model_logs_df['occurred_at'].apply(lambda x: x.replace(microsecond=0))
    model_logs_df['ds'] = model_logs_df['occurred_at'].apply(lambda x: x.strftime("%Y-%m-%d"))
    model_logs_df['hour'] = model_logs_df['occurred_at'].apply(lambda x: x.hour)
    # ignore_index=True discards the column labels, so they are reassigned explicitly.
    features_df = pd.concat([model_logs_df[['occurred_at', 'ds', 'hour', 'model_name', 'version', 'predictions']], extracted_features_df], axis=1, ignore_index=True)
    features_df.columns = ['occurred_at', 'ds', 'hour', 'model_name', 'version', 'predictions', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5', 'feature_6']
    return features_df

In [None]:
features_df = extract_features(demo_df)

In [None]:
features_df.shape

In [None]:
features_df.head(5)

In [None]:
features_df.tail(5)

In [None]:
features_df.dtypes

In [None]:
len(features_df.ds.unique())

In [None]:
features_df.hour.unique()

In [None]:
features_df[(features_df['ds'] == '2023-02-10') & (features_df['hour'] == 5)].head(5)

### !!PAUSE!! Questions ?

### Generate Whylogs Profiles for February and March

In [None]:
import json
import numpy as np

import whylogs as why
from whylogs import DatasetProfileView

In [None]:
feb_test_df = features_df[(features_df['ds'] == '2023-02-10') & (features_df['hour'] == 5)]

In [None]:
feb_test_df.head(5)

In [None]:
feb_whylogs_prof = why.log(feb_test_df[['feature_5', 'feature_6']]).view()

In [None]:
mar_test_df = features_df[(features_df['ds'] == '2023-03-10') & (features_df['hour'] == 5)]

In [None]:
mar_test_df.head(5)

In [None]:
mar_whylogs_prof = why.log(mar_test_df[['feature_5', 'feature_6']]).view()

In [None]:
feb_whylogs_prof.to_pandas()

In [None]:
mar_whylogs_prof.to_pandas()

### Visualize Whylogs Profiles

In [None]:
from whylogs.viz import NotebookProfileVisualizer

from whylogs.viz.utils.histogram_calculations import histogram_from_view
from whylogs.viz.utils.frequent_items_calculations import frequent_items_from_view

In [None]:
visualization = NotebookProfileVisualizer()
visualization.set_profiles(target_profile_view=feb_whylogs_prof, reference_profile_view=mar_whylogs_prof)

In [None]:
visualization.double_histogram(feature_name="feature_6")

### Serialize Whylogs Profiles

In [None]:
feb_whylogs_prof.serialize()[0:100]

### !!PAUSE!! Questions ?

### Generate Hourly Profiles using Fugue

The unit function below profiles one partition of data and serializes the resulting profiles in a single step.

### A unit function to work on a partition of data

In [None]:
import json
import pandas as pd

def profile_features(features_df: pd.DataFrame) -> pd.DataFrame:
    # Profile the features and the predictions separately, and serialize each profile to bytes.
    features_buf = why.log(features_df[['feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5', 'feature_6']]).view().serialize()
    predictions_buf = why.log(features_df[['predictions']]).view().serialize()
    # Emit one row per partition: the first row supplies the partition keys
    # (its other columns are carried along unchanged).
    profiled_features = features_df.head(1).copy()
    profiled_features = profiled_features.drop(['occurred_at'], axis=1)
    profiled_features = profiled_features.assign(features_profile=features_buf, predictions_profile=predictions_buf, sample_records=len(features_df))
    return profiled_features

In [None]:
feb_test_df.shape

In [None]:
profile_features(feb_test_df[(feb_test_df['ds'] == '2023-02-10') & (feb_test_df['hour'] == 5)])

### SCALE up the unit function to work on ALL partitions of data [Takes 3 mins]

In [None]:
from fugue import transform

hourly_feature_profile_df = transform(
    df=features_df, 
    using=profile_features, 
    schema="*-occurred_at+features_profile:binary,predictions_profile:binary,sample_records:long",
    partition=dict(by=['ds', 'hour', 'model_name', 'version']), 
    engine=None
)
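
The `schema` argument above is a compact expression: `*` keeps every input column, `-occurred_at` drops one, and `+name:type` appends new columns. As a reading aid, the sketch below works out which output columns that expression describes. `resolve_output_columns` is a hypothetical helper written for this illustration; it is not part of Fugue and does not parse the expression itself.

```python
from typing import List


def resolve_output_columns(
    input_columns: List[str], dropped: List[str], added: List[str]
) -> List[str]:
    """Hypothetical helper: keep all inputs ("*"), remove the dropped ones, append the added ones."""
    kept = [col for col in input_columns if col not in dropped]
    return kept + added


# Columns of features_df as produced by extract_features earlier in this notebook.
input_columns = ["occurred_at", "ds", "hour", "model_name", "version", "predictions"] + [
    f"feature_{i}" for i in range(1, 7)
]

# "*-occurred_at+features_profile:binary,predictions_profile:binary,sample_records:long"
output_columns = resolve_output_columns(
    input_columns,
    dropped=["occurred_at"],
    added=["features_profile", "predictions_profile", "sample_records"],
)
print(output_columns)
```

The resulting list matches what `profile_features` returns: the first row of the partition without `occurred_at`, plus the two serialized profiles and the record count.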

In [None]:
hourly_feature_profile_df

### !!PAUSE!! Questions ?

### Merge Whylogs Profiles

We already have the hourly profiles. Can we reuse them to build the daily profiles, and does that allow incremental merging?
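
The property that makes this possible is mergeability: combining two summaries gives the same answer as summarizing the combined data. The sketch below demonstrates it with a toy summary of counts, sums and extremes. `ToyProfile` is a hypothetical stand-in and not whylogs' data structure; whylogs profiles also hold approximate sketches, so estimated statistics such as quantiles may differ slightly between a merged profile and one computed directly.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class ToyProfile:
    """Hypothetical fixed-size summary of a numeric column."""
    count: int
    total: float
    minimum: float
    maximum: float

    @classmethod
    def from_values(cls, values: Iterable[float]) -> "ToyProfile":
        values = list(values)
        return cls(len(values), sum(values), min(values), max(values))

    def merge(self, other: "ToyProfile") -> "ToyProfile":
        # Each field combines independently, so merging never needs the raw rows.
        return ToyProfile(
            self.count + other.count,
            self.total + other.total,
            min(self.minimum, other.minimum),
            max(self.maximum, other.maximum),
        )


# Made-up values for two consecutive hours.
hour_5 = [1.0, 4.0, 2.0]
hour_6 = [3.0, 8.0]

merged = ToyProfile.from_values(hour_5).merge(ToyProfile.from_values(hour_6))
direct = ToyProfile.from_values(hour_5 + hour_6)
print(merged == direct)
```

The cells that follow run the same comparison with real whylogs profiles: one merged from the February and March views, and one computed directly from the combined rows.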

In [None]:
type(feb_whylogs_prof)

In [None]:
feb_whylogs_prof.to_pandas()

In [None]:
mar_whylogs_prof.to_pandas()

In [None]:
merged_prof_view = feb_whylogs_prof.merge(mar_whylogs_prof)
merged_prof_view.to_pandas()

In [None]:
merge_test_df = features_df[((features_df['ds'] == '2023-02-10') | (features_df['ds'] == '2023-03-10')) & (features_df['hour'] == 5)]

In [None]:
merge_test_df['ds'].unique()

In [None]:
merged_whylogs_prof = why.log(merge_test_df[['feature_5', 'feature_6']]).view()

In [None]:
merged_whylogs_prof.to_pandas()

### Generate Daily Profiles

### A unit function to work on a partition of data

In [None]:
from functools import reduce

def profile_reduce(hourly_profiles_df: pd.DataFrame) -> pd.DataFrame:
    # Deserialize the hourly profiles and fold them into one profile with pairwise merges.
    # reduce is called without an initial value, so the partition must contain at least one row.
    features_buf = reduce(
        lambda acc, x: acc.merge(x),
        hourly_profiles_df.features_profile.apply(DatasetProfileView.deserialize),
    ).serialize()
    predictions_buf = reduce(
        lambda acc, x: acc.merge(x),
        hourly_profiles_df.predictions_profile.apply(DatasetProfileView.deserialize),
    ).serialize()
    records = hourly_profiles_df.sample_records.sum()
    # Emit one row per partition, dropping the hour now that the profile covers the whole day.
    daily_profiles_df = hourly_profiles_df.head(1).copy()
    daily_profiles_df = daily_profiles_df.drop(['hour'], axis=1)
    daily_profiles_df = daily_profiles_df.assign(features_profile=features_buf, predictions_profile=predictions_buf, sample_records=records)
    return daily_profiles_df
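
The function above folds many hourly profiles into one with `functools.reduce`. The sketch below shows how that fold behaves, using plain frequency counts as a hypothetical stand-in for profiles (`merge_counts` and the sample data are invented for illustration). Note that `reduce` without an initial value raises `TypeError` on an empty sequence, which can happen when the function is called by hand on a filter that matches no rows.

```python
from functools import reduce
from typing import Dict


def merge_counts(acc: Dict[str, int], x: Dict[str, int]) -> Dict[str, int]:
    """Hypothetical merge: add up per-category counts without mutating either input."""
    merged = dict(acc)
    for key, value in x.items():
        merged[key] = merged.get(key, 0) + value
    return merged


# Made-up hourly summaries of a categorical feature.
hourly = [{"a": 2, "b": 1}, {"a": 1}, {"b": 4, "c": 1}]

daily = reduce(merge_counts, hourly)       # folds left to right: ((h0 + h1) + h2)
single = reduce(merge_counts, hourly[:1])  # one element is returned as-is, no merge call

try:
    reduce(merge_counts, [])
    empty_raises = False
except TypeError:
    empty_raises = True

print(daily, single, empty_raises)
```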

In [None]:
hourly_feature_profile_df

In [None]:
profile_reduce(hourly_feature_profile_df[hourly_feature_profile_df['ds'] == '2023-01-01'])

In [None]:
from fugue import transform

daily_feature_profile_df = transform(
    df=hourly_feature_profile_df, 
    using=profile_reduce, 
    schema="*-hour",
    partition=dict(by=['ds', 'model_name', 'version']), 
    engine=None
)

In [None]:
daily_feature_profile_df

### !!PAUSE!! Questions ?

### Scaling up with Fugue & Dask

#### DASK

In [None]:
from fugue import transform

hourly_feature_profile_df = transform(
    df=features_df, 
    using=profile_features, 
    schema="*-occurred_at+features_profile:binary,predictions_profile:binary,sample_records:long",
    partition=dict(by=['ds', 'hour', 'model_name', 'version']), 
    engine="dask"
)

In [None]:
hourly_feature_profile_df.compute().head(5)

Similarly, we can pass `engine="ray"` or `engine="spark"` to scale the same logic seamlessly with `Ray` or `Spark`.