In [None]:
import logging
import warnings

from pyspark.sql import SparkSession

# Silence all Python warnings so they do not clutter the cell outputs.
warnings.filterwarnings("ignore")

# Install the default root handler first, then raise the root logger to INFO.
# Kept as two steps: basicConfig() is a no-op when handlers already exist,
# whereas setLevel() always takes effect.
logging.basicConfig()
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

# Local Spark session using the "local[4]" master and 8g of driver memory.
spark = (
    SparkSession.builder
    .master("local[4]")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# Restrict Spark's own log output to errors.
spark.sparkContext.setLogLevel("ERROR")

## Execute an ML pipeline written with pyspark/SparkML and set up a view generator

We execute a complex [ML pipeline for product review classification](/edit/classify_amazonreviews_sparkml.py) written with pyspark. During execution, we track provenance and capture intermediates in order to set up a `view_generator`, which allows us to debug the pipeline data later.

In [2]:
from classify_amazonreviews_sparkml import run_pipeline

# Date bounds handed to the pipeline. Presumably the start of the review window
# and the train/test split point -- confirm in classify_amazonreviews_sparkml.py.
start_date, split_date = "2014-01-01", "2014-12-01"

# Run the pipeline on the Spark session configured in the first cell.
run_pipeline(spark, start_date, split_date)



Accuracy: 0.8175150408977219


                                                                                

In [4]:
from freamon.adapters.pyspark.provenance import from_trace

# Build the view generator used by the debugging cells below. from_trace() takes
# no arguments here; presumably it picks up the provenance and intermediates
# recorded while run_pipeline executed -- confirm in freamon.adapters.pyspark.provenance.
view_generator = from_trace()

INFO:root:Registering source 0 with columns: ['marketplace', 'customer_id', 'review_id', 'product_id', 'vine', 'third_party', 'review_headline', 'review_body', 'review_date', 'prov_id_source_0']
INFO:root:Registering source 1 with columns: ['product_id', 'product_parent', 'product_title', 'category_id', 'prov_id_source_1']
INFO:root:Registering source 2 with columns: ['id', 'category', 'prov_id_source_2']
INFO:root:Registering source 3 with columns: ['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'prov_id_source_3']
INFO:root:Computing train features and predictions...
INFO:root:Computing test features and predictions...                            
INFO:root:Generating virtual train view via                                     


CREATE OR REPLACE VIEW _freamon_virtual_train_view AS 
  SELECT * EXCLUDE (prov_id_source_0, prov_id_source_1, prov_id_source_2, prov_id_source_3)
  FROM _freamon_train t
    JOIN _freamon_source_0_with_prov s0 ON
    s0.prov_id_source_0 = t.prov_

## Generate and materialize a view for data debugging

Next, we generate and materialize a view over the test labels and predictions of the pipeline, sliceable by the `category` and `star_rating` attributes from two input tables.

In [5]:
# Materialize the test view with true labels and predictions but without the
# feature columns, sliceable by `category` and `star_rating`.
materialized_view = view_generator.test_view(
    sliceable_by=["category", "star_rating"],
    with_features=False,
    with_y_true=True,
    with_y_pred=True,
)

# Show the resulting frame as the cell output.
materialized_view

Unnamed: 0,category,star_rating,y_true,y_pred
0,Digital_Software,5,0,0.0
1,Digital_Software,4,0,0.0
2,Digital_Software,5,0,0.0
3,Digital_Video_Games,5,0,0.0
4,Digital_Software,5,0,0.0
...,...,...,...,...
29581,Digital_Video_Games,3,0,0.0
29582,Digital_Video_Games,5,1,0.0
29583,Digital_Software,3,1,1.0
29584,Digital_Video_Games,3,0,0.0


## Feed the materialized view into the fairlearn library to compute fairness metrics

The materialized view can be used directly by external data debugging libraries such as [Fairlearn](https://fairlearn.org). We can, for example, compute the recall and false positive rate for different groups of reviews in the data (e.g., based on the product category and rating).

In [6]:
from fairlearn.metrics import MetricFrame, false_positive_rate
from sklearn.metrics import recall_score

# Label every review as low or highly rated (more than 3 stars).
# The label is written with a single `.loc[rows, column]` assignment: the chained
# form `frame['rating'].loc[rows] = ...` may assign to a temporary copy, and it
# leaves the frame unchanged when pandas Copy-on-Write is enabled.
is_highly_rated = materialized_view.star_rating.astype(int) > 3
materialized_view['rating'] = '(low rated)'
materialized_view.loc[is_highly_rated, 'rating'] = '(highly rated)'

# Combined group key, e.g. "Digital_Software (low rated)".
materialized_view['category_and_rating'] = materialized_view.category + ' ' + materialized_view.rating

# Recall and false positive rate for each category/rating group.
fairness_metrics = MetricFrame(
    metrics={'recall': recall_score, 'false_positive_rate': false_positive_rate},
    y_true=materialized_view.y_true,
    y_pred=materialized_view.y_pred,
    sensitive_features=materialized_view.category_and_rating,
)

# Per-group metric table shown as the cell output.
fairness_metrics.by_group

Unnamed: 0_level_0,recall,false_positive_rate
category_and_rating,Unnamed: 1_level_1,Unnamed: 2_level_1
Digital_Software (highly rated),0.259978,0.009565
Digital_Software (low rated),0.436495,0.030914
Digital_Video_Games (highly rated),0.274116,0.010854
Digital_Video_Games (low rated),0.514019,0.091127


## Data-debugging a la SliceFinder via an aggregation query

In addition, we can directly run SQL queries against a virtual internal view over the inputs and intermediates for model training and testing in the pipeline. 

We can for example compute the mean and variance of the cross-entropy loss of the pipeline predictions for different slices of the data, analogous to [SliceFinder](https://research.google/pubs/pub47966/).

In [7]:
# Mean and variance of the per-example loss, plus slice size, for each
# (rating bucket, category) pair and for each of the two attributes on its own
# (via GROUPING SETS). The 0.00001 offset keeps log() away from zero.
# NOTE(review): the displayed averages look consistent with log() being base-10
# (a hard misprediction then costs -log10(1e-5) = 5) -- confirm against the SQL
# engine if a natural-log cross-entropy is intended.
slice_loss_query = """
SELECT 
    category,
    star_rating > 3 as highly_rated,
    AVG(-(y_true * log(y_pred + 0.00001) + (1 - y_true) * log(1.0 - y_pred + 0.00001))) AS avg_loss,
    VARIANCE(-(y_true * log(y_pred + 0.00001) + (1 - y_true) * log(1.0 - y_pred + 0.00001))) AS var_loss,    
    COUNT(*) as size
FROM _freamon_virtual_test_view    
GROUP BY GROUPING SETS ((star_rating > 3, category), (star_rating > 3), (category))
"""

# Run the query against the virtual test view; the result is the cell output.
view_generator.execute_query(slice_loss_query)

Unnamed: 0,category,highly_rated,avg_loss,var_loss,size
0,Digital_Video_Games,True,0.491107,2.214586,10181
1,Digital_Video_Games,False,1.087249,4.255307,3679
2,Digital_Software,False,1.636103,5.504518,6708
3,Digital_Software,True,0.778439,3.286612,9018
4,,True,0.62607,2.73855,19199
5,,False,1.441703,5.130517,10387
6,Digital_Video_Games,,0.649347,2.825306,13860
7,Digital_Software,,1.14428,4.412321,15726
