In [None]:
# Copyright 2021 Google LLC.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Design Goals
1. Prepare datasets for ML workflow.
1. Decide on and execute balancing and splitting strategies.
1. Create separate datasets for TRAIN/VALID/TEST.

## Requirements
1. Instances and features tables created with the ML Windowing Pipeline (MLWP).

### Imports

In [None]:
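# Uncomment to install gps_building_blocks if it is not already available
# (%pip installs into the environment of the running kernel).
# %pip install gps_building_blocks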

In [None]:
import google.auth
from google.cloud import bigquery
from gps_building_blocks.cloud.utils import bigquery as bigquery_utils

import numpy as np
import pandas as pd

import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

### Settings

In [None]:
# Prints all the outputs from cell (instead of using display each time).
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Plotting configuration.
%matplotlib inline
plt.rcParams["figure.figsize"] = [20, 20]

# Formatting to show full numbers and suppress scientific notation.
pd.set_option('display.float_format', lambda x: '%.10f' % x)

## Configuration

In [None]:
# --- Project ------------------------------------------------------------------
# Google Cloud project we are working in.
PROJECT_ID = 'project-id'

# --- Datasets -----------------------------------------------------------------
# BigQuery dataset holding the ML Windowing Pipeline (MLWP) tables.
DATASET_SOURCE = 'dataset'
# Destination dataset where the ML datasets below are written.
DATASET_DESTINATION = 'experiment'

# --- Source tables (MLWP output, in DATASET_SOURCE) ---------------------------
TABLE_INSTANCES = 'instances_01'
TABLE_FEATURES = 'features_01'

# --- ML dataset tables (created in DATASET_DESTINATION) -----------------------
TABLE_DS_SPLIT = 'ds_split'
TABLE_DS_TRAINING_BALANCED = 'ds_training_balanced'
TABLE_DS_TEST_TABLE = 'ds_test_table'

In [None]:
# gps_building_blocks helper used to run every SQL query in this notebook
# (via run_query), scoped to PROJECT_ID.
bq_client = bigquery_utils.BigQueryUtils(
    project_id=PROJECT_ID,
)

## Check underlying data.

- check imbalance
- choose the cut-off dates for the ML datasets based on positive-rate trends
- look at the difference in instances and features dataset in terms of total records and labels
- determine right splitting strategy

### Check Instances.

In [None]:
# Per-day label summary of the instances table, newest day first: record
# count, positive/negative counts and the positive-to-negative ratio.
sql = f"""
SELECT
  EXTRACT(DATE
  FROM
    (snapshot_ts)) AS effective_date,
  COUNT(*) AS total_records,
  COUNTIF(label=TRUE) AS count_positive,
  COUNTIF(label=FALSE) AS count_negative,
  SAFE_DIVIDE(COUNTIF(label=TRUE),COUNTIF(label=FALSE)) AS ratio_positive_to_negative
FROM
  `{DATASET_SOURCE}.{TABLE_INSTANCES}`
GROUP BY
  1
ORDER BY
  1 desc;
"""
print(sql)
instances_summary_rows = bq_client.run_query(sql)
df_raw = instances_summary_rows.to_dataframe()
df_raw.head()

In [None]:
# Keep the instances summary under its own name; df_raw is reassigned by the
# following query cells. DataFrame.copy() is deep by default.
df_instances_check = df_raw.copy()

In [None]:
# Daily positive-to-negative ratio in the instances table; hovering a point
# shows that day's number of positive instances.
fig = px.line(
    df_instances_check,
    x='effective_date',
    y='ratio_positive_to_negative',
    hover_name='count_positive',
    title='Positive instances',
    height=800,
)
fig.show()
df_instances_check.head(20)

### Check Features dataset.

In [None]:
# Pull a handful of rows from the features table to see which feature
# columns are available.
sql = f"""
SELECT
 *
FROM
  `{DATASET_SOURCE}.{TABLE_FEATURES}`
LIMIT 10;
"""
print(sql)
features_sample_rows = bq_client.run_query(sql)
df_raw = features_sample_rows.to_dataframe()
df_raw.head()

In [None]:
# Column names, dtypes and non-null counts of the sampled feature rows
# (df_raw holds the LIMIT 10 sample queried in the previous cell).
df_raw.info()

### Proportion of positive instances in features table.

In [None]:
# Per-day label summary of the features table (positive/negative counts and
# their ratio), newest day first.
sql = f"""
SELECT
  EXTRACT(DATE FROM snapshot_ts) AS effective_date,
  COUNT(*) AS total_records,
  COUNTIF(label=TRUE) AS count_positive,
  COUNTIF(label=FALSE) AS count_negative,
  SAFE_DIVIDE(COUNTIF(label=TRUE),
  COUNTIF(label=FALSE)) AS ratio_positive_to_negative
FROM
  `{DATASET_SOURCE}.{TABLE_FEATURES}`
GROUP BY
  1
ORDER BY
  1 DESC;
"""
print(sql)
features_summary_rows = bq_client.run_query(sql)
df_raw = features_summary_rows.to_dataframe()
df_raw.head()

In [None]:
# Keep the features summary under its own name; df_raw is reassigned by
# later query cells. DataFrame.copy() is deep by default.
df_features_check = df_raw.copy()

In [None]:
# Same daily ratio chart for the features table, for comparison with the
# instances chart above.
fig = px.line(
    df_features_check,
    x='effective_date',
    y='ratio_positive_to_negative',
    hover_name='count_positive',
    title='Positive instances',
    height=800,
)
fig.show()
df_features_check.head(20)

### Comparison of instances and features tables.
 
This allows us to assess the impact of applying a look-back window when creating features. The instances table uses all previous sessions, whereas the features table only uses sessions inside the look-back window (for example 90 days). This reduces the number of users that are considered and matched with a label, so applying a look-back window can reduce the number of positive instances. At the same time, we would expect the ratio of positive instances to the number of records to be higher for more recent data, so in general the further back in the past we go, the fewer additional positive instances we gain.

Here are a few selected fields that help in understanding the impact of the selected look-back period:

- `total_records_diff` - difference between the number of records in the instances table and in the features table
- `count_positive_diff` - number of positive instances in the instances table that are not in the features table
- `other_positive_instances_pct` - percentage of positive instances we are not including due to the selection of the look-back window
- `features_by_instances_positive_ratio` - how many times larger the positive ratio is in the features table than in the instances table
- `features_by_instances_total_records` - number of records in the features table divided by the number in the instances table (how many times smaller the features table is)

In [None]:
# Align the per-day instances and features summaries on effective_date. The
# outer join keeps days that appear in only one of the two tables; the
# features columns get the "_features" suffix.
df_combined = df_instances_check.set_index('effective_date').join(
    df_features_check.set_index('effective_date'),
    how='outer',
    rsuffix='_features')

# For each metric add "<metric>_diff" = instances value - features value.
for metric in ('total_records', 'count_positive', 'ratio_positive_to_negative'):
  df_combined[f'{metric}_diff'] = (
      df_combined[metric] - df_combined[f'{metric}_features'])

df_combined.head(5)

In [None]:
# Counts to total over all days (instances, features and their differences).
cols = ['total_records',
        'total_records_features',
        'count_positive',
        'count_positive_features',
        'total_records_diff',
        'count_positive_diff']

# One-row frame (row label 'value') with one column per total.
df_summary = df_combined[cols].sum().to_frame(name='value').T

# Derived ratios. assign() evaluates the callables in order, so later ones
# can use the columns created by earlier ones.
df_summary = df_summary.assign(
    positive_ratio=lambda d: d['count_positive'] / d['total_records'],
    positive_ratio_features=lambda d: (
        d['count_positive_features'] / d['total_records_features']),
    features_by_instances_positive_ratio=lambda d: (
        d['positive_ratio_features'] / d['positive_ratio']),
    features_by_instances_total_records=lambda d: (
        d['total_records_features'] / d['total_records']),
    other_positive_instances_pct=lambda d: (
        d['count_positive_diff'] / d['count_positive'] * 100),
)
df_summary.T

## Create dataset with split.

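There are multiple ways to split the dataset:

- random sample of rows
- deterministic pseudo-random sample of rows (based on a hash of an ID)
- temporal split

This notebook combines the last two. Rows from the most recent effective dates go to TEST. The remaining rows are assigned by a hash of `user_id`: 80% TRAIN, 10% VALIDATE and 10% TEST.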

### Create dataset with split on effective dates.

#### Get recent effective dates.

In [None]:
# Number of most recent effective dates to hold out as TEST dates.
n_last_dates = 3
# Effective dates of the features table, newest first; keep the top n.
dates_newest_first = df_features_check['effective_date'].sort_values(
    ascending=False)
recent_dates = dates_newest_first.head(n_last_dates).values

In [None]:
# By default, use the most recent effective dates of the features table
# (recent_dates) as the TEST dates.
test_dates = [str(x) for x in recent_dates]

# Define dates here if you want to overwrite with curated dates.
# It is useful to keep looking at a date that was used when
# evaluating original model so we can make sure all performs as expected.
# test_dates = ('2021-05-15', '2021-05-09')

# Normalize each date to 'YYYY-MM-DD'. str() of a numpy datetime64 value can
# include a time part (e.g. '2021-05-15T00:00:00.000000000'), which is not a
# valid DATE literal for BigQuery.
test_dates_iso = [pd.Timestamp(d).strftime('%Y-%m-%d') for d in test_dates]
if not test_dates_iso:
  # `IN ()` is not valid SQL, so fail early with a clear message.
  raise ValueError('test_dates is empty: at least one TEST date is required.')

# Render a SQL list literal for use after `IN` in the split queries, e.g.
# ('2021-05-15', '2021-05-14'). This is the same text as the repr of a tuple
# of date strings, and it also works for a single date: ('2021-05-15').
test_dates = '(' + ', '.join(f"'{d}'" for d in test_dates_iso) + ')'
test_dates

In [None]:
# Create the destination dataset if it doesn't exist yet. With exists_ok=True
# an existing dataset is returned unchanged. Dataset management goes through
# a google.cloud.bigquery client; bq_client is only used to run SQL.
bigquery_client = bigquery.Client(project=PROJECT_ID)

# Create the dataset in the same location as the source dataset. The
# CREATE TABLE ... AS SELECT statements below read from DATASET_SOURCE and
# write to DATASET_DESTINATION, and BigQuery cannot run a single query across
# different locations.
source_dataset = bigquery_client.get_dataset(f'{PROJECT_ID}.{DATASET_SOURCE}')
destination_dataset = bigquery.Dataset(f'{PROJECT_ID}.{DATASET_DESTINATION}')
destination_dataset.location = source_dataset.location
dataset = bigquery_client.create_dataset(destination_dataset, exists_ok=True)
print(f'https://console.cloud.google.com/bigquery?project={PROJECT_ID}&p={PROJECT_ID}&d={DATASET_DESTINATION}&page=dataset')

### Create dataset with columns indicating allocation to TRAIN/VALIDATE/TEST.

In [None]:
# Build TABLE_DS_SPLIT: every features row plus a `key` column
# (FARM_FINGERPRINT(user_id)) and a `split` column using AutoML-style
# TRAIN / VALIDATE / TEST labels:
#   - rows whose snapshot date is one of test_dates -> TEST;
#   - other rows are bucketed by the user_id hash into 10 buckets:
#     0-7 -> TRAIN, 8 -> VALIDATE, 9 -> TEST.
# Because the hash is deterministic, all of a user's rows on non-test dates
# land in the same split.
# Notes:
#   - The snapshot date uses EXTRACT(DATE FROM snapshot_ts), the same
#     expression that produced effective_date (and hence test_dates).
#   - ABS(MOD(key, 10)) equals MOD(ABS(key), 10), but it cannot overflow
#     when key is the minimum INT64 value.
#   - There must be no `;` inside the parenthesised AS (...) query.
sql = f"""
# Add additional columns to dataset to indicate which rows are in which part (train/validate/test).
# Compliant with automl split conventions.
CREATE OR REPLACE TABLE `{PROJECT_ID}.{DATASET_DESTINATION}.{TABLE_DS_SPLIT}` AS (
WITH
  ds_features_key AS (
  SELECT
    *,
    FARM_FINGERPRINT(user_id) AS key,
  FROM
    `{PROJECT_ID}.{DATASET_SOURCE}.{TABLE_FEATURES}`)
SELECT
*,
  CASE
    WHEN EXTRACT(DATE FROM snapshot_ts) in {test_dates} THEN 'TEST'
    WHEN EXTRACT(DATE FROM snapshot_ts) not in {test_dates} and ABS(MOD(key, 10)) in (0,1,2,3,4,5,6,7) THEN 'TRAIN'
    WHEN EXTRACT(DATE FROM snapshot_ts) not in {test_dates} and ABS(MOD(key, 10)) in (8) THEN 'VALIDATE'
    WHEN EXTRACT(DATE FROM snapshot_ts) not in {test_dates} and ABS(MOD(key, 10)) in (9) THEN 'TEST'
END as split,
FROM
  ds_features_key
  );
"""
print(sql)
df_raw = bq_client.run_query(sql).to_dataframe()
df_raw.head()

## Create test dataset

In [None]:
# Show the name of the test table created by the next cell.
TABLE_DS_TEST_TABLE

In [None]:
# Materialize the held-out test set: every row of the split table whose
# snapshot date is one of test_dates. Rows sent to TEST by the user_id hash
# on other dates are not included here.
# The date uses EXTRACT(DATE FROM snapshot_ts), the same expression that
# produced effective_date (and hence test_dates) from the features table.
sql = f"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.{DATASET_DESTINATION}.{TABLE_DS_TEST_TABLE}` AS (
SELECT * FROM
`{PROJECT_ID}.{DATASET_DESTINATION}.{TABLE_DS_SPLIT}`
WHERE EXTRACT(DATE FROM snapshot_ts) in {test_dates}
)
"""
print(sql)
df_raw = bq_client.run_query(sql).to_dataframe()
df_raw.head()

## Create balanced Train dataset.

The SQL script below takes the TRAIN rows of the split table, keeps all positive instances and randomly samples records with negative labels. `balance_ratio` controls how many negative instances are kept per positive instance in the resulting dataset. For example, `balance_ratio=99` means that, in expectation, 99 negative instances are sampled for every positive instance. If there are not enough negatives, all of them are kept.

In [None]:
# Build TABLE_DS_TRAINING_BALANCED from the TRAIN rows of the split table:
# keep every positive row and keep each negative row with probability
#   (count_positive / count_negative) * balance_ratio,
# i.e. about `balance_ratio` negatives per positive in expectation.
# The ratio is kept as a full-precision FLOAT64. Rounding it to a few decimals
# would distort the sampling rate when positives are rare. SAFE_DIVIDE yields
# NULL instead of a division-by-zero error when there are no negatives.
# Note: RAND() is unseeded, so the sampled negatives differ between runs.
balance_ratio = 99
sql = f"""
# Automatically balances and downsamples the TRAIN rows to create the training dataset.
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.{DATASET_DESTINATION}.{TABLE_DS_TRAINING_BALANCED}` AS (
WITH
  dataset AS (
  SELECT
    *
  FROM
    `{PROJECT_ID}.{DATASET_DESTINATION}.{TABLE_DS_SPLIT}`
  WHERE
    split='TRAIN' ),
  -- Single-row label statistics over all TRAIN rows.
  imbalance_stats AS (
  SELECT
    COUNT(*) AS total_records,
    COUNTIF(label=TRUE) AS count_positive,
    COUNTIF(label=FALSE) AS count_negative,
    SAFE_DIVIDE(COUNTIF(label=TRUE), COUNTIF(label=FALSE)) AS ratio_positive_to_negative
  FROM
    dataset),
  -- Add a random seed to the negative examples.
  negatives AS (
  SELECT
    *,
    RAND() AS rand_seed
  FROM
    dataset
  WHERE
    label = FALSE),
  negative_sampled AS (
  SELECT
    * EXCEPT (rand_seed)
  FROM
    negatives
  WHERE
    -- Keep each negative with probability ratio_positive_to_negative * balance_ratio.
    rand_seed <= (
    SELECT
      ratio_positive_to_negative
    FROM
      imbalance_stats)*{balance_ratio} )
  -- Union all together.
SELECT
  *
FROM
  negative_sampled
UNION ALL
  -- Add all positive examples.
SELECT
  *
FROM
  dataset
WHERE
  label = TRUE
  );
"""
print(sql)
df_raw = bq_client.run_query(sql).to_dataframe()
df_raw.head()

### Check label balance in the new balanced training dataset.

In [None]:
# Label summary of the balanced training table for each snapshot_ts, newest
# first, with the ratios formatted to 7 decimals.
sql = f"""
SELECT
  snapshot_ts,
  COUNT(*) AS total_records,
  COUNTIF(label=TRUE) AS count_positive,
  COUNTIF(label=FALSE) AS count_negative,
  FORMAT("%.7f", SAFE_DIVIDE(COUNTIF(label=TRUE),COUNTIF(label=FALSE))) AS ratio_positive_to_negative,
  FORMAT("%.7f", SAFE_DIVIDE(COUNTIF(label=TRUE), COUNT(*))) AS ratio_positive
FROM
  `{DATASET_DESTINATION}.{TABLE_DS_TRAINING_BALANCED}`
GROUP BY
  1
ORDER BY
  1 DESC;
"""
print(sql)
balanced_summary_rows = bq_client.run_query(sql)
df_raw = balanced_summary_rows.to_dataframe()
df_raw.head()