# Copyright 2023-2024 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.

# Preprocessing for SPADE Anomaly Detection

### This is an example of preprocessing an input dataset so that it is suitable for SPADE. Different use cases may require different types of preprocessing.


### Some use cases require a Colab runtime with at least 64GB of RAM and up to 256GB of RAM (for some preprocessing configurations). GPUs are not required.

This Notebook will read your BigQuery table and preprocess the features for SPADE training. In summary:

1. It will verify that the label column exists and is of type boolean, integer or float. It will verify its cardinality to be 2 (Positive+Negative samples) or 3 (Positive+Negative+Unlabeled samples).
2. All NaNs (Pandas and Numpy) will be replaced with an appropriate fill value for that type of column.
3. All datetime columns will be converted to string columns.
4. All string columns will be converted to categorical columns. *Optionally*, their maximum cardinality will be reduced to 5 categories (including an 'infrequent' category). All values with frequency less than 5% will be assigned to the 'infrequent' category. Then they will be [one-hot encoded](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html).
5. Then all columns except the label column (integer, float, one-hot encoded) will be normalized in the range [0, 1] with [min-max scaling](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html).
6. The resulting table will be uploaded to the same GCP Project and BigQuery Dataset as the input table.
7. The resulting table is then randomly split into a `train` and `test` table specified by the fractional size of the `test` table.
8. A specified fraction of the labels in the `train` table are discarded. This is for unsupervised training in Stage 1 of SPADE.
9. Both the `train` and `test` tables are uploaded to the same GCP Project and BigQuery Dataset as the input table.

In [None]:
# @title Install packages for preprocessing - it is safe to ignore any `subprocess` errors in this cell

!pip install -qq -U pandas-gbq pyarrow google-cloud-bigquery-storage
!pip install -qq -U dask dask-bigquery

# Upgrade Sklearn to the latest version - needed for preprocessing.
!pip install -qq -U sklearn

### Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.


**Note: Once this cell has finished running, continue on. You do not need to re-run any of the cells above.**


In [None]:
import os

# Skip the restart when the IS_TESTING environment variable is set to a
# non-empty value.
if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

### Set your project ID

Set your project ID below. If you don't know your project ID, leave the field blank and the following cells may be able to find it. Optionally, you may also set a service account in the cell below.

In [None]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string", default:"[your-project-id]"}

# Fall back to the gcloud configuration when the field is empty or still holds
# the placeholder value.
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    # NOTE(review): indexing assumes gcloud printed at least one line; an empty
    # result would raise IndexError here - confirm a default project is set.
    PROJECT_ID = shell_output[0]
print("Project ID:", PROJECT_ID)

In [None]:
# Run the gcloud login flow for Application Default Credentials.
!gcloud auth application-default login

In [None]:
# Authenticate the current user through the Colab auth helper.
from google.colab import auth as google_auth
google_auth.authenticate_user()

In [None]:
# @title Imports

from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np


In [None]:

import pandas as pd
import pandas_gbq

import dask
import dask_bigquery

from sklearn import model_selection, preprocessing
from google.cloud import bigquery


In [None]:
# Use PROJECT_ID both as the Application Default Credentials quota project and
# as the default project of the gcloud CLI.
!gcloud auth application-default set-quota-project $PROJECT_ID
!gcloud config set project $PROJECT_ID

In [None]:
# @title The BigQuery location of your source and preprocessed data tables.

# Replace the bracketed placeholders below with your own values.
dataset_id = '[your-dataset]'  # @param {type:"string"}
source_table_id = '[source-source-table]'  # @param {type:"string"}
label_column = '[your-label-column]'  # @param {type:"string"}
preprocessed_table_id = '[your-preprocessed-table]'  # @param {type:"string"}

location = 'us-central1'  # @param {type:"string"}

# Notebook-wide BigQuery client; the upload helper defined further down reads
# this global.
client = bigquery.Client(project=PROJECT_ID, location=location)

In [None]:
# @title Preprocessing Functions - do not edit if not needed

def get_number_of_rows(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame]
) -> int:
    """Return the number of rows of a pandas or Dask dataframe.

    A Dask dataframe reports its row count lazily, so it is materialized with
    ``compute()``. Any other input is measured with ``len``.
    """
    if not isinstance(df, dask.dataframe.core.DataFrame):
        return len(df)
    # The first element of a Dask dataframe's shape has to be computed.
    return df.shape[0].compute()

def get_column_unique_counts(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame]
) -> Dict[str, int]:
    """Return a mapping from column name to its number of unique values.

    The counts are expensive to obtain, so callers should compute this mapping
    once and reuse it.
    """
    is_lazy = isinstance(df, dask.dataframe.core.DataFrame)
    counts = df.nunique()
    # Dask only builds a task graph; run it to get concrete numbers.
    counts = counts.compute() if is_lazy else counts
    return counts.to_dict()

def get_columns_to_drop(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
    columns_to_nuniques: Dict[str, int],
    len_df: int,
) -> list[str]:
    """List the columns that carry no usable signal.

    A column is selected when its unique-value count is 0 (only missing
    values), 1 (a single constant value) or ``len_df`` (every row distinct).
    The result is grouped in that order, each group in dataframe column order.
    """
    all_columns = df.columns.to_list()
    # One pass over the columns per cardinality that triggers a drop.
    cardinalities_to_drop = (0, 1, len_df)
    return [
        name
        for cardinality in cardinalities_to_drop
        for name in all_columns
        if columns_to_nuniques[name] == cardinality
    ]

def set_datetime_on_timestamp_columns(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
    timestamp_columns: List[str],
) -> Union[pd.DataFrame, dask.dataframe.core.DataFrame]:
    """Parse the given columns as datetimes, assigning them back onto `df`.

    The same dataframe object that was passed in is returned.
    """
    for column_name in timestamp_columns:
        # 'utc' may need to be True for your use case.
        parsed = pd.to_datetime(df[column_name], utc=False)
        df[column_name] = parsed
    return df

def convert_datetime_columns_to_string(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
) -> Union[pd.DataFrame, dask.dataframe.core.DataFrame]:
    """Render every datetime column as 'YYYY-MM-DD HH:MM:SS' text.

    The columns are assigned back onto `df`, which is also returned.
    """
    datetime_kinds = ['datetime', 'datetimetz', 'datetime64', np.datetime64]
    datetime_columns = df.select_dtypes(include=datetime_kinds).columns
    for column_name in datetime_columns:
        formatted = df[column_name].dt.strftime("%Y-%m-%d %H:%M:%S")
        # Force a plain string dtype on the formatted values.
        df[column_name] = formatted.astype(str)
    return df

def replace_na(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
) -> Union[pd.DataFrame, dask.dataframe.core.DataFrame]:
    """Fill missing values with a per-column default chosen by dtype.

    Numeric columns are filled with 0.0, object (string) columns with the
    empty string, and datetime columns with the string form of the local
    timestamp for POSIX time 0. Columns of other dtypes are left untouched.
    """
    datetime_kinds = ['datetime', 'datetimetz', 'datetime64', np.datetime64]
    datetime_cols = df.select_dtypes(include=datetime_kinds).columns
    numeric_cols = df.select_dtypes(include='number').columns
    string_cols = df.select_dtypes(include='object').columns

    epoch_text = str(pd.to_datetime(datetime.fromtimestamp(0.0)))

    # On a key clash the later group wins: numeric, then string, then datetime.
    fill_values = {
        **dict.fromkeys(numeric_cols, 0.0),
        **dict.fromkeys(string_cols, ''),
        **dict.fromkeys(datetime_cols, epoch_text),
    }
    return df.fillna(fill_values)

def get_numeric_columns(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
) -> list[str]:
    """Return the labels of the columns whose dtype is numeric."""
    numeric_part = df.select_dtypes(include=['number'])
    return numeric_part.columns

def get_string_columns(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
) -> list[str]:
    """Return the labels of the columns whose dtype is object (strings)."""
    object_part = df.select_dtypes(include=['object'])
    return object_part.columns

def select_categorical_columns(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
    columns_to_nuniques: Dict[str, int],
    cat_cutoff: int,
) -> list[str]:
    """Pick the string columns with at most `cat_cutoff` unique values."""
    selected = []
    for column_name in get_string_columns(df):
        if columns_to_nuniques[column_name] <= cat_cutoff:
            selected.append(column_name)
    return selected

def get_binary_columns(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
    columns_to_nuniques: Dict[str, int],
) -> list[str]:
    """Return the columns that have exactly two unique values.

    Only `columns_to_nuniques` is consulted; `df` is not read.
    """
    binary = []
    for column_name, cardinality in columns_to_nuniques.items():
        if cardinality == 2:
            binary.append(column_name)
    return binary

def remap_to_binary_columns(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
    binary_columns: list[str],
) -> Union[pd.DataFrame, dask.dataframe.core.DataFrame]:
    """Replace each two-valued column with a boolean indicator column.

    For every column in `binary_columns`, the second of its two distinct
    non-null values (in the order reported by ``unique()``) is arbitrarily
    chosen as the "on" value. The column is replaced by a boolean column of the
    same name that is True where the original equals that value. Missing
    entries map to False.

    Args:
      df: pandas or Dask dataframe to transform. It is not modified in place.
      binary_columns: names of the columns to remap.

    Returns:
      A dataframe in which each remapped column keeps its name but is moved to
      the end. `df` itself is returned when `binary_columns` is empty.

    Raises:
      ValueError: if a listed column does not have exactly two distinct
        non-null values.
    """
    if not binary_columns:
        return df

    is_dask = isinstance(df, dask.dataframe.core.DataFrame)
    for column in binary_columns:
        distinct = df[column].dropna().unique()
        if is_dask:
            distinct = distinct.compute()
        # list() works for the NumPy array, extension array or Series that
        # unique() may return.
        unique_values = list(distinct)
        if len(unique_values) != 2:
            raise ValueError(
                f'Column "{column}" must have exactly 2 distinct non-null '
                f'values to be remapped, but has {len(unique_values)}.'
            )
        # Arbitrarily drop the first value and keep the second.
        kept_value = unique_values[1]
        # Comparing directly avoids get_dummies, whose output column names
        # would need to be matched back to the original column.
        indicator = (df[column] == kept_value).fillna(False).astype(bool)
        # drop() returns a new frame, so the caller's dataframe is untouched.
        df = df.drop(column, axis=1)
        df[column] = indicator
    return df

def drop_duplicate_columns(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
) -> pd.DataFrame:
    """Keep only the first occurrence of every column name."""
    repeated = df.columns.duplicated()
    keep_mask = ~repeated
    return df.iloc[:, keep_mask]

# One-Hot Encoding
def _one_hot_encode_categorical_features(
    data: pd.DataFrame,
    original_columns: list[str],
    categorical_columns: list[str],
    ohe: preprocessing.OneHotEncoder = None,
    # How to handle unknown categories: one of 'ignore', 'infrequent_if_exist'
    handle_unknown: str = 'infrequent_if_exist',
    # Minimum fraction of samples below which a category will be considered
    # infrequent.
    min_frequency: float = 0.05,
    # Upper limit to the number of output features for each input feature,
    # including an infrequent category.
    max_categories: int = 5,
) -> Tuple[pd.DataFrame, preprocessing.OneHotEncoder]:
  """One-hot encodes every column of `data`.

  Args:
    data: frame holding only the categorical columns to encode.
    original_columns: not used by this function; kept for interface
      compatibility.
    categorical_columns: not used by this function; kept for interface
      compatibility.
    ohe: a previously fitted encoder to reuse. When None, a new encoder is
      created from the remaining arguments and fitted on `data`.
    handle_unknown: forwarded to a newly created encoder.
    min_frequency: forwarded to a newly created encoder.
    max_categories: forwarded to a newly created encoder.

  Returns:
    The encoded frame, carrying the row index of `data` and the encoder's
    output feature names as columns, together with the fitted encoder.
  """
  # Setup the One-Hot Encoder. If it is already provided, that means that the
  # encoder was already previously fitted to training data. In that case we will
  # just apply the transform using the fitted encoder.
  if ohe is None:
    # Need to fit a new One-Hot Encoder.
    ohe = preprocessing.OneHotEncoder(
        sparse_output=False,
        drop='first',
        handle_unknown=handle_unknown,
        min_frequency=min_frequency,
        max_categories=max_categories)

    # One-hot-encode the categorical columns, fitting the new encoder.
    data_onehot_encoded = ohe.fit_transform(data)
  else:
    # One-hot-encode the categorical columns using the previously fitted
    # encoder.
    data_onehot_encoded = ohe.transform(data)

  # Keep the input's row index so that the result stays aligned with the other
  # columns of the source dataframe when they are concatenated side by side.
  data_encoded = pd.DataFrame(
      data=data_onehot_encoded,
      columns=ohe.get_feature_names_out(),
      index=getattr(data, 'index', None),
  )
  return data_encoded, ohe

def one_hot_encode_categorical_features(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
    categorical_columns: list[str],
    label_column: str,
    ohe: preprocessing.OneHotEncoder = None,
    # How to handle unknown categories: one of 'ignore', 'infrequent_if_exist'
    handle_unknown: str = 'infrequent_if_exist',
    # Minimum fraction of samples below which a category will be considered
    # infrequent.
    min_frequency: float = 0.05,
    # Upper limit to the number of output features for each input feature,
    # including an infrequent category.
    max_categories: int = 5,
) -> Tuple[pd.DataFrame, preprocessing.OneHotEncoder]:
  """One-hot encodes categorical feature columns in the input DataFrame.

  Args:
    df: pandas or Dask dataframe. A Dask dataframe is computed into memory.
    categorical_columns: names of the columns to one-hot encode.
    label_column: name of the label column, which is passed through unencoded.
    ohe: a previously fitted encoder to reuse, or None to fit a new one.
    handle_unknown: forwarded to a newly created encoder.
    min_frequency: forwarded to a newly created encoder.
    max_categories: forwarded to a newly created encoder.

  Returns:
    A pandas dataframe laid out as [non-categorical features, encoded
    features, label], together with the fitted encoder.

  Raises:
    TypeError: if `df` is neither a pandas nor a Dask dataframe.
  """
  # The scikit-learn encoder is used, because the Dask DummyEncoder does not
  # support setting the minimum frequency or the maximum categories.
  df_columns = df.columns.to_list()
  categorical = set(categorical_columns)
  cols_excl_cat_cols = [
      c for c in df_columns if c != label_column and c not in categorical
  ]
  # Split the dataframe into categorical features, other features and label.
  if isinstance(df, dask.dataframe.core.DataFrame):
    df_cat = df.loc[:, categorical_columns].compute()
    df_noncat = df.loc[:, cols_excl_cat_cols].compute()
    df_label = df.loc[:, label_column].compute()
  elif isinstance(df, pd.DataFrame):
    df_cat = df.loc[:, categorical_columns]
    df_noncat = df.loc[:, cols_excl_cat_cols]
    df_label = df.loc[:, label_column]
  else:
    raise TypeError(f'Unsupported dataframe type: {type(df)!r}')
  # Encode the categorical part, honoring the caller's encoder and settings.
  df_encoded, ohe = _one_hot_encode_categorical_features(
      data=df_cat,
      original_columns=df_columns,
      categorical_columns=categorical_columns,
      ohe=ohe,
      handle_unknown=handle_unknown,
      min_frequency=min_frequency,
      max_categories=max_categories,
  )
  # The column-wise concat aligns rows on the index, so the encoded frame must
  # carry the same index as the other two parts.
  df_encoded.index = df_cat.index
  result = pd.concat([df_noncat, df_encoded, df_label], axis=1)
  return result, ohe

# Min-Max Scaling
def _min_max_scale_features(
    data: pd.DataFrame,
    mms: preprocessing.MinMaxScaler = None,
    # Whether to clip transformed data to the same range as the fitted data
    clip: bool = True,
) -> Tuple[pd.DataFrame, preprocessing.MinMaxScaler]:
  """Min-max scales every column of `data`.

  Args:
    data: frame holding only the feature columns to scale.
    mms: a previously fitted scaler to reuse. When None, a new scaler is
      created with `clip` and fitted on `data`.
    clip: forwarded to a newly created scaler.

  Returns:
    The scaled frame, with the same column names and row index as `data`,
    together with the fitted scaler.
  """
  columns = data.columns.to_list()
  # Setup the Min-Max Scaler. If it is already provided, that means that the
  # scaler was already previously fitted to training data. In that case we will
  # just apply the transform using the fitted scaler.
  if mms is None:
    # Need to fit a new Min-Max Scaler.
    mms = preprocessing.MinMaxScaler(clip=clip)

    # Min-max-scale the feature columns, fitting the new scaler.
    data_minmax_scaled = mms.fit_transform(data)
  else:
    # Min-max-scale the feature columns using the previously fitted scaler.
    data_minmax_scaled = mms.transform(data)

  # Keep the input's row index so that the result stays aligned with the label
  # column when the two are concatenated side by side.
  data_scaled = pd.DataFrame(
      data=data_minmax_scaled, columns=columns, index=data.index)
  return data_scaled, mms

def min_max_scale_features(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],
    label_column: str,
    mms: preprocessing.MinMaxScaler = None,
    # Whether to clip transformed data to the same range as the fitted data
    clip: bool = True,
) -> Tuple[pd.DataFrame, preprocessing.MinMaxScaler]:
  """Min-max scales all feature columns in the input DataFrame.

  Args:
    df: pandas or Dask dataframe. A Dask dataframe is computed into memory.
    label_column: name of the label column. It is not scaled, only cast to
      float64.
    mms: a previously fitted scaler to reuse, or None to fit a new one.
    clip: forwarded to a newly created scaler.

  Returns:
    A pandas dataframe laid out as [scaled features, label], together with
    the fitted scaler.

  Raises:
    TypeError: if `df` is neither a pandas nor a Dask dataframe.
  """
  df_columns = df.columns.to_list()
  cols_excl_label = [c for c in df_columns if c != label_column]
  # Split the dataframe into the feature columns and the label column.
  if isinstance(df, dask.dataframe.core.DataFrame):
    df_features = df.loc[:, cols_excl_label].compute()
    df_label = df.loc[:, label_column].compute()
  elif isinstance(df, pd.DataFrame):
    df_features = df.loc[:, cols_excl_label]
    df_label = df.loc[:, label_column]
  else:
    raise TypeError(f'Unsupported dataframe type: {type(df)!r}')
  # Scale the features, honoring a scaler supplied by the caller.
  df_scaled, mms = _min_max_scale_features(
      data=df_features,
      mms=mms,
      clip=clip,
  )
  # The column-wise concat aligns rows on the index, so the scaled frame must
  # carry the same index as the label.
  df_scaled.index = df_features.index
  df_label = df_label.astype(np.float64)
  result = pd.concat([df_scaled, df_label], axis=1)
  return result, mms


In [None]:
# @title Load the data using Pandas or Dask - Dask is not as well tested

%%time

use_pandas = True  # @param {type:"boolean"}

# Both branches read the table `dataset_id.source_table_id` of PROJECT_ID into
# the notebook-wide `df`.
if use_pandas:
  df = pandas_gbq.read_gbq(
      query_or_table=f'{dataset_id}.{source_table_id}',
      project_id=PROJECT_ID,
      use_bqstorage_api=True,
  )
else:
  df = dask_bigquery.read_gbq(
      project_id=PROJECT_ID,
      dataset_id=dataset_id,
      table_id=source_table_id,
  )

In [None]:
# Print a summary of the loaded dataframe.
df.info()

In [None]:
# Preview the first three rows of the loaded dataframe.
df.head(3)

In [None]:
# @title Get counts of unique values for each column

# Warning: this cell can take a long time to run.

%%time

# Computed once here and reused by the label check and the column-drop cell.
column_nuniques = get_column_unique_counts(df=df)

In [None]:
# @title Get length of dataframe

# Warning: this cell can take a long time to run.

%%time

# Row count, used below to detect columns whose values are all unique.
len_df = get_number_of_rows(df)
print(f'Number of rows: {len_df}')

In [None]:
# @title Verify the type and cardinality of the label column

# A missing key means the label column is absent from the loaded table.
label_col_cardinality = column_nuniques.get(label_column, -1)
if label_col_cardinality == -1:
  raise ValueError(
      f'Label column "{label_column}" is not found in input table '
      f'{PROJECT_ID}.{dataset_id}.{source_table_id}.'
  )
if label_col_cardinality < 2 or label_col_cardinality > 3:
  raise ValueError(
      f'Label column "{label_column}" must have either 2 (Positive-Negative) '
      'or 3 (Positive-Negative-Unlabeled) unique values.'
  )

label_col_type = df[label_column].dtype
# Use the pandas dtype predicates so that both NumPy-backed columns (e.g.
# int64, float64, bool) and nullable extension columns (e.g. Int64, boolean)
# are accepted.
label_type_is_allowed = (
    pd.api.types.is_bool_dtype(label_col_type)
    or pd.api.types.is_integer_dtype(label_col_type)
    or pd.api.types.is_float_dtype(label_col_type)
)
if not label_type_is_allowed:
  raise TypeError(
      f'The type {label_col_type} of label column "{label_column}" is not in '
      'the allowed types [bool, int, float].'
  )


In [None]:
# @title Drop the columns that we want to exclude

%%time

# Columns with all NaNs, with cardinality=1 (only 1 unique value) or
# cardinality=number of rows (all unique values) should be dropped.
# NOTE(review): the "all unique values" rule is applied regardless of dtype,
# so a numeric feature whose values are all distinct is dropped too - confirm
# that this is intended for your data.

columns_to_drop = get_columns_to_drop(df, column_nuniques, len_df)
df = df.drop(columns_to_drop, axis='columns')

In [None]:
# @title Set datetime on timestamp columns

%%time

# Columns are selected purely by name: every column whose name contains the
# substring 'timestamp' is parsed as a datetime.
df = set_datetime_on_timestamp_columns(
    df=df, timestamp_columns=[c for c in df.columns if 'timestamp' in c]
)

In [None]:
# @title Replace all NaNs with zero.

%%time

# Only numeric columns are filled with 0.0. replace_na fills string columns
# with '' and datetime columns with the timestamp for POSIX time 0.
df = replace_na(df=df)


In [None]:
# @title Convert datetime columns to string columns.

# Warning: this cell can take a long time to run.

%%time

# Datetime columns become text here, so the next cell treats them like any
# other string column.
df = convert_datetime_columns_to_string(df)

In [None]:
# @title Handle categorical columns

# Warning: this cell can take a long time to run.

%%time

# @markdown ### Set minimum frequency and maximum categories here.
# These values override the function defaults (0.05 and 5) that the
# introduction of this notebook describes.
min_frequency = 0.01
# Set maximum categories to None if no upper limit to the number of categories.
max_categories = None

# Every column with the object dtype is treated as categorical.
df_dtypes = {c: df[c].dtype for c in df.columns}
categorical_columns = [c for c, t in df_dtypes.items() if t == np.dtypes.ObjectDType]

df_final, ohe = one_hot_encode_categorical_features(
    df=df,
    categorical_columns=categorical_columns,
    label_column=label_column,
    ohe=None,
    # How to handle unknown categories: one of ‘ignore’, ‘infrequent_if_exist’
    handle_unknown='infrequent_if_exist',
    # Minimum fraction of samples below which a category will be considered
    # infrequent.
    min_frequency=min_frequency,
    # Upper limit to the number of output features for each input feature,
    # including an infrequent category.
    max_categories=max_categories)

In [None]:
# @title Min-Max scale all features columns.

# Warning: this cell can take a long time to run.

%%time

# Every column except the label is scaled; the label is only cast to float64
# inside min_max_scale_features.
df_final, mms = min_max_scale_features(
    df=df_final,
    label_column=label_column,
    mms=None,
    clip=True,
)

In [None]:
# @title Column names are mangled so rename for BigQuery compatibility.

# This means that only alphanumeric characters and underscores are allowed.
# Remap special characters to underscores.

%%time

# Each character listed in `replace_chars` maps to a single underscore.
replace_chars = '/\\.,:;@#{}()[]!?$%^&*+-=<>|\'"`~ '
replace_with = ''.join(['_'] * len(replace_chars))
translation = str.maketrans(replace_chars, replace_with)


# NOTE(review): two different column names can become identical after the
# translation (e.g. 'a-b' and 'a.b'); nothing here checks for that - confirm
# that it cannot happen with your data.
df_final = df_final.rename(
    columns={
        c: c.translate(translation)
        for c in df_final.columns.to_list()
    }
)

In [None]:
# Show the column names after renaming.
df_final.columns

In [None]:
# Preview the first three rows of the preprocessed dataframe.
df_final.head(3)

In [None]:
# Map every column of the preprocessed dataframe to its dtype and display it.
df_final_dtypes = {c: df_final[c].dtype for c in df_final.columns}
df_final_dtypes

In [None]:
# @title Upload preprocessed table to BigQuery

%%time

def _get_schema_from_dataframe(
    df: Union[pd.DataFrame, dask.dataframe.core.DataFrame]
) -> List[bigquery.SchemaField]:
  """Builds the BigQuery schema matching the columns of a dataframe.

  Every column becomes a REQUIRED field. Only the boolean, float64 and Int64
  dtypes are accepted.

  Raises:
    ValueError: if any column has a dtype other than those three.
  """
  # BOOL, FLOAT64, INT64 are the only allowed dtypes after preprocessing.
  pd_to_bq_dtype_map = {
      pd.BooleanDtype(): bigquery.enums.SqlTypeNames.BOOL,
      np.dtypes.Float64DType(): bigquery.enums.SqlTypeNames.FLOAT64,
      pd.Int64Dtype(): bigquery.enums.SqlTypeNames.INT64,
  }
  allowed_dtypes = list(pd_to_bq_dtype_map)

  df_dtypes = {}
  for column in df.columns:
    df_dtypes[column] = df[column].dtype

  # Collect the offending columns so that the error can report all of them.
  df_dtypes_filt = {}
  for column, dtype in df_dtypes.items():
    if dtype not in allowed_dtypes:
      df_dtypes_filt[column] = dtype
  if df_dtypes_filt:
    raise ValueError(f'Preprocessing did not handle all dtypes: {df_dtypes_filt}')

  schema = []
  for column, dtype in df_dtypes.items():
    field = bigquery.SchemaField(
        column, pd_to_bq_dtype_map[dtype], mode='REQUIRED'
    )
    schema.append(field)
  return schema

def load_dataframe_to_bq_table(
    df: pd.DataFrame,
    table_id: str,
):
  """Uploads a dataframe to a BigQuery table and prints a short summary.

  The load job runs through the notebook-level `client` with the
  WRITE_TRUNCATE disposition, and this function blocks until it has finished.
  """
  schema = _get_schema_from_dataframe(df)
  load_config = bigquery.LoadJobConfig(
      schema=schema,
      write_disposition='WRITE_TRUNCATE',
  )

  # Start the load job (API request) and wait for it to complete.
  load_job = client.load_table_from_dataframe(
      df, table_id, job_config=load_config
  )
  load_job.result()

  # Fetch the table metadata (API request) to report what was written.
  loaded_table = client.get_table(table_id)
  summary = "Loaded {} rows and {} columns to {}".format(
      loaded_table.num_rows, len(loaded_table.schema), table_id
  )
  print(summary)


# Fully qualified destination in the form project.dataset.table.
table_id = f'{PROJECT_ID}.{dataset_id}.{preprocessed_table_id}'

load_dataframe_to_bq_table(df=df_final, table_id=table_id)

In [None]:
# @title Split the final dataframe into a train and test set

%%time

test_fraction = 0.2  # @param {type:"number"}
assert 0.0 < test_fraction <= 0.5, "'test_fraction' should be between 0 an 0.5"

df_train, df_test = model_selection.train_test_split(
    df_final,
    test_size=test_fraction,
    random_state=42,
    shuffle=True,
    stratify=None,
)

In [None]:
# Replace the shuffled row labels with a fresh 0..n-1 index, in place. The
# unlabeling cell below draws positions from np.arange(len(df_train)) and uses
# them as `.loc` labels, which relies on this.
df_train.reset_index(drop=True, inplace=True)
df_test.reset_index(drop=True, inplace=True)

In [None]:
# @title For the training set, set a large fraction of labels to 'unlabeled'.

%%time

# Set the same value for the unlabeled data here as you use in the training script.
unlabeled_data_value = -1  # @param {type:"integer"}

# This fraction of labels will be discarded for unsupervised training.
unlabeled_data_fraction = 0.98  # @param {type:"number"}

# NOTE(review): the `+ 1` means at least one row is always unlabeled, even for
# a fraction of 0 - confirm that this is intended.
unlabeled_size = int(len(df_train) * unlabeled_data_fraction) + 1
# Never ask for more rows than the training set holds.
unlabeled_size = min(len(df_train), unlabeled_size)
print(f'Setting {unlabeled_size} rows to unlabeled.')

# NOTE(review): this draw is not seeded, unlike the train/test split above, so
# the set of unlabeled rows differs between runs - confirm that this is
# acceptable.
unlabeled_idxs = np.random.choice(np.arange(len(df_train)), size=unlabeled_size, replace=False)

# The drawn positions are used as index labels of df_train.
df_train.loc[unlabeled_idxs, label_column] = unlabeled_data_value

In [None]:
# Preview the first three rows of the training set.
df_train.head(3)

In [None]:
# Preview the first three rows of the test set.
df_test.head(3)

In [None]:
# @title Upload both train and test tables to BigQuery

%%time

# Both tables go to the same project and dataset as the preprocessed table,
# with a `_train` / `_test` suffix appended to its name.

# Train table
train_table_id = f'{PROJECT_ID}.{dataset_id}.{preprocessed_table_id}_train'
load_dataframe_to_bq_table(df=df_train, table_id=train_table_id)

# Test table
test_table_id = f'{PROJECT_ID}.{dataset_id}.{preprocessed_table_id}_test'
load_dataframe_to_bq_table(df=df_test, table_id=test_table_id)