# Description
BigQuery DataFrames provides a set of [pre-defined transformers](https://cloud.google.com/python/docs/reference/bigframes/latest/bigframes.ml.preprocessing) for ML workflows.

BigQuery DataFrames also provides a way to turn custom Python code into a Cloud Run function and run it at scale on a DataFrame. To use it in a project, certain APIs must be enabled and IAM permissions set up. See the following links for details:

1. [remote_function](https://cloud.google.com/python/docs/reference/bigframes/latest/bigframes.pandas#bigframes_pandas_remote_function) API
1. Cloud artifacts and lifecycle management [details](https://cloud.google.com/bigquery/docs/use-bigquery-dataframes#remote-functions)
1. Project [set-up](https://cloud.google.com/bigquery/docs/use-bigquery-dataframes#remote-function-requirements).

In this notebook we propose three approaches, based on remote functions, to support custom transformer use cases:

1. ### `RemoteFunctionTransformer`
You can deploy a remote function independently and instantiate a `RemoteFunctionTransformer` with its name.

1. ### `BaseCustomTransformer` + `to_gbq` within
Provides the familiar fit/transform abstraction, which you override with your own implementation. In the `fit` phase, any statistics can be derived from the existing data; the `transform` phase then uses them to produce the transformation. It also provides a `to_gbq` method that (optionally) persists the computed statistics and the custom transform logic as a remote function, which can then be reused via `RemoteFunctionTransformer`.

1. ### `BaseCustomTransformer` + `to_gbq` outside
Provides the same fit/transform abstraction, which you override with your own implementation. In the `fit` phase, any statistics can be derived from the existing data; the `transform` phase then uses them to produce the transformation. Instead of a method, a module-level `to_gbq` function is provided to (optionally) persist the computed statistics and the custom transform logic as a remote function, which can then be reused via `RemoteFunctionTransformer`.
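All three approaches share one contract: a row-wise function returns a JSON-serialized dictionary, and `RemoteFunctionTransformer` expands its keys into columns. The pandas sketch below emulates that contract locally, without BigQuery. `expand_json_rows` and `to_hours` are hypothetical names, not part of BigQuery DataFrames, and unlike the BigQuery version, `json.loads` here restores numeric types instead of returning strings.

```python
import json

import pandas as pd


def expand_json_rows(df: pd.DataFrame, row_fn) -> pd.DataFrame:
    """Apply `row_fn` to every row and expand its JSON output into columns.

    Hypothetical local helper: column names come from the first row's keys,
    mirroring the prototype, so `df` must not be empty.
    """
    json_rows = df.apply(row_fn, axis=1)  # one JSON string per input row
    columns = list(json.loads(json_rows.iloc[0]).keys())
    records = [json.loads(s) for s in json_rows]
    return pd.DataFrame(records, columns=columns, index=df.index)


def to_hours(row: pd.Series) -> str:
    # Hypothetical row function: keep the id, convert minutes to hours.
    return json.dumps({
        "gameId": row["gameId"],
        "duration_hours": float(row["duration_minutes"]) / 60,
    })


games = pd.DataFrame({"gameId": ["g1", "g2"], "duration_minutes": [150, 180]})
print(expand_json_rows(games, to_hours))
```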

## Current limitations

This prototype has the following limitations, which will be addressed in the final implementation.

1. All columns of the transformed DataFrame are currently strings, but users can convert them with [astype](https://cloud.google.com/python/docs/reference/bigframes/latest/bigframes.dataframe.DataFrame#bigframes_dataframe_DataFrame_astype). We expect the final implementation to handle this by serializing the type information in the remote function's return value.
1. Transforming an empty DataFrame fails, because the output column names are derived from the transformation output and therefore require at least one output row. We expect the final implementation to handle this by letting the user set a schema in the transformer constructor.
1. The `BaseCustomTransformer.to_gbq` method adds "bigframes" to the list of package dependencies when deploying the remote function. This is a workaround for what is likely a bug that needs more investigation; the module-level `to_gbq` variant does not need it.
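The remedies proposed for the first two limitations, casting the string outputs and supplying a schema up front, can be sketched together in pandas. This is only an illustration under the assumption that the schema is a mapping from column name to pandas dtype; `expand_with_schema` is a hypothetical helper, not the planned API.

```python
import json

import pandas as pd


def expand_with_schema(json_rows: pd.Series, schema: dict) -> pd.DataFrame:
    """Expand JSON strings into columns whose names and dtypes come from `schema`.

    Hypothetical helper: because nothing is inferred from the data, an empty
    input still produces correctly named and typed (empty) columns.
    """
    records = [json.loads(s) for s in json_rows]
    frame = pd.DataFrame(records, columns=list(schema), index=json_rows.index)
    return frame.astype(schema)  # cast string-valued columns to the target dtypes


# Values arrive as strings, as they do from JSON_VALUE in the prototype.
schema = {"year": "int64", "duration_minutes": "float64"}
rows = pd.Series(['{"year": "2016", "duration_minutes": "0.2"}'])
print(expand_with_schema(rows, schema).dtypes)
print(expand_with_schema(pd.Series([], dtype=object), schema).shape)  # (0, 2)
```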

# Set up

In [None]:
import bigframes.pandas as bpd
bpd.options.bigquery.project = "bigframes-dev"
bpd.options.bigquery.location = "US"

import pandas as pd

In [None]:
df = bpd.read_gbq("bigquery-public-data.baseball.schedules")
df = df[["gameId", "year", "homeTeamName", "awayTeamName", "duration_minutes"]]
train_df = df.iloc[:100]
test_df = df.iloc[100:200]
train_df.peek()

|    | gameId | year | homeTeamName | awayTeamName | duration_minutes |
|---:|---|---:|---|---|---:|
| 30 | 49aed28c-a6cb-40f6-a99f-cc621b727818 | 2016 | Athletics | Rangers | 148 |
| 43 | a8e5300f-9c50-47c8-815b-842ae568bd16 | 2016 | Cardinals | Rangers | 178 |
| 3 | 198f4eed-a29f-41e2-8623-cb261e5ab370 | 2016 | Rockies | Giants | 182 |
| 28 | 27eac34f-66e3-4981-b37b-69ef17f0c66f | 2016 | Red Sox | Astros | 229 |
| 52 | 428b245f-8ba6-4962-9575-29d2391eff98 | 2016 | Nationals | Twins | 159 |


# RemoteFunctionTransformer

## BigFrames provided

In [None]:
# This is supposed to be an implementation in the BigFrames library
import json
import textwrap

import bigframes.core as core
import bigframes.core.blocks
import bigframes.core.schema as bf_schema
import bigframes.dtypes


class RemoteFunctionTransformer:
    def __init__(self, remote_function: str):
        """
        A custom transformer backed by a BigQuery DataFrames remote function.

        Args:
            remote_function:
                Name of a deployed remote function (e.g. `dataset.routine`)
                that accepts a row as input and returns a JSON-serialized
                dictionary representation of the transformed row.
        """
        self.is_fitted = True
        self.remote_function = bpd.read_gbq_function(remote_function, is_row_processor=True)

    def fit(self, X: bpd.DataFrame):
        # No-op: any fitted state is already baked into the remote function.
        return self

    def transform(self, X: bpd.DataFrame):
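        # apply the remote function: one JSON string per input row
        train_df_tx = X.apply(self.remote_function, axis=1)
        # derive the output column names from the first row (fails on empty input)
        schema = json.loads(train_df_tx.iloc[0]).keys()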

        # preserve ordering
        expr, ordering_column_name = train_df_tx._block.expr.promote_offsets()
        expr_sql = train_df_tx._block.session._executor.to_sql(expr)

        # construct sql
        select_columns = []
        select_columns.extend(train_df_tx._block.index_columns)
        select_columns.append(ordering_column_name)
        fields_selected = [f"{col}" for col in select_columns]
        fields_selected.extend(
            [
                f"JSON_VALUE({train_df_tx._block.value_columns[0]}, '$.{col}') AS {col}"
                for col in schema
            ]
        )
        fields_selected_sql = ",\n".join(fields_selected)
        sql = f"""
WITH T0 AS (
{textwrap.indent(expr_sql, "    ")}
)
SELECT
{textwrap.indent(fields_selected_sql, "    ")}
FROM T0
"""

        # materialize
        destination, _ = train_df_tx._block.session._loader._query_to_destination(
                    sql, index_cols=[ordering_column_name], api_name="transform"
                )

        # updated schema
        new_schema = train_df_tx._block.expr.schema.select(
            [*train_df_tx._block.index_columns]
        )
        for col in schema:
            new_schema = new_schema.append(
                bf_schema.SchemaItem(col, bigframes.dtypes.STRING_DTYPE)
            )
        new_schema = new_schema.append(
                        bf_schema.SchemaItem(ordering_column_name, bigframes.dtypes.INT_DTYPE)
        )

        # construct ArrayValue object
        expr = core.ArrayValue.from_table(
                    train_df_tx._block.session.bqclient.get_table(destination),
                    schema=new_schema,
                    session=train_df_tx._block.session,
                    offsets_col=ordering_column_name,
                ).drop_columns([ordering_column_name])

        # construct Block
        block = core.blocks.Block(
                    expr,
                    index_columns=train_df_tx._block.index_columns,
                    column_labels=schema,
                    index_labels=train_df_tx._block._index_labels,
                )

        # construct dataframe and return
        return bpd.DataFrame(block)
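The heart of `transform` is the SQL it builds: the original query becomes a CTE, and each JSON key is projected into its own column with `JSON_VALUE`. The standalone sketch below isolates just that step; the helper name `build_expand_sql` and the column names in the example are hypothetical. Keys are interpolated unquoted, so this assumes they are valid BigQuery identifiers, as they are in this notebook.

```python
import textwrap


def build_expand_sql(inner_sql: str, passthrough_cols: list,
                     json_col: str, keys: list) -> str:
    """Return a query that keeps `passthrough_cols` and projects each JSON key."""
    fields = list(passthrough_cols)
    # JSON_VALUE extracts a scalar as STRING, which is why every output
    # column of the prototype is a string.
    fields += [f"JSON_VALUE({json_col}, '$.{key}') AS {key}" for key in keys]
    fields_sql = ",\n".join(fields)
    return (
        "WITH T0 AS (\n"
        f"{textwrap.indent(inner_sql, '    ')}\n"
        ")\n"
        "SELECT\n"
        f"{textwrap.indent(fields_sql, '    ')}\n"
        "FROM T0\n"
    )


print(build_expand_sql("SELECT idx, rf_out FROM t", ["idx"], "rf_out",
                       ["year", "duration_minutes"]))
```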

## User Journey

In [None]:
# Example transformation that depends on statistics computed from the training dataframe
duration_minutes_min = train_df["duration_minutes"].min()
duration_minutes_max = train_df["duration_minutes"].max()
awayTeamNames = train_df["awayTeamName"].unique().to_list()

@bpd.remote_function()
def row_transformer(row: pd.Series) -> str:
    row["duration_minutes"] = (
        (row["duration_minutes"] - duration_minutes_min) /
        (duration_minutes_max - duration_minutes_min)
    )
    vocab_dict = {word: idx for idx, word in enumerate(awayTeamNames)}
    row['awayTeamNameLabel'] = vocab_dict.get(row['awayTeamName'])
    return json.dumps(row.to_dict())



In [None]:
rf_transformer = RemoteFunctionTransformer(row_transformer.bigframes_remote_function)
transformed_df = rf_transformer.transform(train_df)
transformed_df.peek()





|    | gameId | year | homeTeamName | awayTeamName | duration_minutes | awayTeamNameLabel |
|---:|---|---:|---|---|---:|---:|
| 75 | 37071542-7af7-484c-b059-a96969d447ad | 2016 | Indians | Angels | 0.3586206896551724 | 19 |
| 95 | 19fdde1c-1d68-4036-ba18-4d22dcd12416 | 2016 | Rangers | Angels | 0.1724137931034483 | 19 |
| 64 | e49412b1-1673-43cd-a732-41347a76a343 | 2016 | Yankees | Angels | 0.096551724137931 | 19 |
| 34 | 138adc90-d564-4e90-be92-41dedb96fa50 | 2016 | Tigers | Angels | 0.2 | 19 |
| 47 | a7fc942e-8f2d-4d1c-82e5-e93248a899ae | 2016 | Rangers | Astros | 0.1448275862068965 | 15 |
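Because the remote function only runs after deployment, it can help to check the same row logic locally first. The sketch below is a plain-Python twin of `row_transformer` that receives its statistics as arguments instead of capturing them as globals, and builds the vocabulary once rather than on every row; `transform_row` is a hypothetical name.

```python
import json

import pandas as pd


def transform_row(row: pd.Series, minimum: float, maximum: float,
                  vocab: dict) -> str:
    """Local twin of `row_transformer` that receives its statistics as arguments."""
    out = row.to_dict()
    out["duration_minutes"] = (float(row["duration_minutes"]) - minimum) / (maximum - minimum)
    # Team names outside the vocabulary map to None, i.e. JSON null.
    out["awayTeamNameLabel"] = vocab.get(row["awayTeamName"])
    return json.dumps(out)


# Build the vocabulary once, not once per row.
vocab = {name: idx for idx, name in enumerate(["Rangers", "Giants"])}
row = pd.Series({"awayTeamName": "Giants", "duration_minutes": 150})
print(transform_row(row, 100.0, 200.0, vocab))
```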


# CustomTransformer (persistence achieved via `to_gbq` in base class)

## BigFrames provided

In [None]:
# This is supposed to be an implementation in the BigFrames library
import abc
import cloudpickle
import json

class BaseCustomTransformer(abc.ABC):
    def __init__(self):
        self.is_fitted = False

    @abc.abstractmethod
    def fit(self, X: pd.DataFrame):
        raise NotImplementedError()

    @abc.abstractmethod
    def transform(self, X: pd.DataFrame):
        raise NotImplementedError()

    def to_gbq(self, dataset: str = None, routine_name: str = None, packages: list[str] = None):
        if not self.is_fitted:
            raise RuntimeError("The transformer has not been fitted yet. Please call 'fit' before 'to_gbq'.")

        def dataframe_transformer(series: pd.Series) -> str:
            df = series.to_frame().T
            transformed = self.transform(df)
            return json.dumps(transformed.iloc[0].to_dict())

        # THIS IS A HACK!
        # For some reason cloudpickle.load() is failing in the cloud function
        # code due to bigframes module not found error.
        # For now let's make sure bigframes package is installed in the cloud
        # function environment. Ideally we should not need this.
        # TODO: investigate the root cause and remove bigframes from the
        # required packages
        required_packages = ["bigframes"]

        if packages:
            required_packages.extend(packages)

        remote_transform = bpd.remote_function(dataset=dataset, name=routine_name, packages=required_packages)(dataframe_transformer)
        return remote_transform.bigframes_remote_function
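The nested `dataframe_transformer` is the bridge between the DataFrame-level `transform` and the row-level remote function contract. Here is the same idea in isolation, run locally against a toy transformer; `ToyScaler` and `make_row_adapter` are hypothetical names, and nothing is deployed.

```python
import json

import pandas as pd


class ToyScaler:
    """Hypothetical fitted transformer used only to exercise the adapter."""

    def fit(self, df: pd.DataFrame) -> "ToyScaler":
        self.lo = float(df["x"].min())
        self.hi = float(df["x"].max())
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.copy()
        out["x"] = (out["x"].astype(float) - self.lo) / (self.hi - self.lo)
        return out


def make_row_adapter(transformer):
    """Wrap a fitted DataFrame transformer as a row -> JSON string function."""
    def row_fn(series: pd.Series) -> str:
        one_row = series.to_frame().T  # the row as a one-row DataFrame
        transformed = transformer.transform(one_row)
        return json.dumps(transformed.iloc[0].to_dict())
    return row_fn


scaler = ToyScaler().fit(pd.DataFrame({"x": [10, 20, 30]}))
row_fn = make_row_adapter(scaler)
print(row_fn(pd.Series({"id": "a", "x": 20})))
```

When deployed, the wrapper and the fitted transformer it closes over are serialized (with cloudpickle, per the comment in `to_gbq`) and shipped to the cloud function, so anything they reference must also be importable there.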

## User Journey

In [None]:
# This is the customer's implementation
class CustomTransformer(BaseCustomTransformer):
    def __init__(self):
        self.is_fitted = False
        self.duration_minutes_min = None
        self.duration_minutes_max = None
        self.away_team_name = None

    def fit(self, dataframe: pd.DataFrame):
        """
        Fit the transformer: record the min/max of `duration_minutes` and the
        vocabulary of `awayTeamName`.

        :param dataframe: Input dataframe to fit the transformer on
        """
        self.duration_minutes_min = dataframe["duration_minutes"].min()
        self.duration_minutes_max = dataframe["duration_minutes"].max()
        self.away_team_name = dataframe["awayTeamName"].unique().to_list()
        self.is_fitted = True

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """
        Apply the transformation to the input dataframe.

        :param dataframe: Input dataframe to apply the transformation
        :return: Transformed copy of the dataframe; the input is left unchanged
        """
        if not self.is_fitted:
            raise RuntimeError("The transformer has not been fitted yet. Please call 'fit' before 'transform'.")

        # Work on a copy so that transforming the same dataframe twice does not
        # rescale `duration_minutes` a second time.
        dataframe = dataframe.copy()
        dataframe["duration_minutes"] = (dataframe["duration_minutes"] - self.duration_minutes_min) / (
            self.duration_minutes_max - self.duration_minutes_min)
        vocab_dict = {word: idx for idx, word in enumerate(self.away_team_name)}
        dataframe['label_encoded_away_team_name'] = dataframe['awayTeamName'].map(vocab_dict)
        return dataframe
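A `transform` that assigns into its argument changes the caller's DataFrame, so transforming the same frame again (for instance locally and then through the deployed function) rescales `duration_minutes` twice and can push values outside [0, 1]. A minimal pandas sketch of the copy-first pattern follows; `CopyingScaler` is a hypothetical example class.

```python
import pandas as pd


class CopyingScaler:
    """Illustrative transformer whose transform never mutates its input."""

    def fit(self, df: pd.DataFrame) -> "CopyingScaler":
        self.lo = df["duration_minutes"].min()
        self.hi = df["duration_minutes"].max()
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.copy()  # work on a copy so the caller's frame is untouched
        out["duration_minutes"] = (out["duration_minutes"] - self.lo) / (self.hi - self.lo)
        return out


games = pd.DataFrame({"duration_minutes": [140, 170, 200]})
scaler = CopyingScaler().fit(games)
first = scaler.transform(games)
second = scaler.transform(games)  # same input, so same result
print(first["duration_minutes"].tolist())
```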

In [None]:
tx = CustomTransformer()
tx.fit(train_df)
tx.transform(train_df).peek()

|    | gameId | year | homeTeamName | awayTeamName | duration_minutes | label_encoded_away_team_name |
|---:|---|---:|---|---|---:|---:|
| 99 | 26b728ab-ddd0-473a-8c75-d27181adac0f | 2016 | Cardinals | Reds | 0.158621 | 14 |
| 12 | bca90342-7ddc-468e-b189-d43fad7528ec | 2016 | Astros | Rays | 0.365517 | 1 |
| 62 | e27bdd74-2b5c-4657-aeb6-286efe0395df | 2016 | Dodgers | Giants | 0.282759 | 2 |
| 95 | 19fdde1c-1d68-4036-ba18-4d22dcd12416 | 2016 | Rangers | Angels | 0.172414 | 19 |
| 79 | 6eba83cd-89bd-4a9c-bb5c-96d915f00cd6 | 2016 | Astros | Rangers | 0.662069 | 16 |


In [None]:
bqrf = tx.to_gbq(dataset="zzz_shobs_us", routine_name="my_custom_transformer")
bqrf



'bigframes-dev.zzz_shobs_us.my_custom_transformer'

In [None]:
rftx = RemoteFunctionTransformer("zzz_shobs_us.my_custom_transformer")
train_df_tx = rftx.transform(train_df)
train_df_tx.peek()





|    | gameId | year | homeTeamName | awayTeamName | duration_minutes | label_encoded_away_team_name |
|---:|---|---:|---|---|---:|---:|
| 46 | 92f43f65-bb36-4d12-8dca-728285aca1b4 | 2016 | Athletics | Brewers | -0.9690368608799048 | 0 |
| 1 | af72a0b9-65f7-49fb-9b30-d505068bdf6d | 2016 | Reds | Brewers | -0.9709393579072532 | 0 |
| 0 | d60c6036-0ce1-4c90-8dd9-de3b403c92a8 | 2016 | Nationals | Brewers | -0.9711771700356716 | 0 |
| 85 | 8751e809-17cf-44bb-9c8b-b705eda3f104 | 2016 | Braves | Brewers | -0.970178359096314 | 0 |
| 40 | 02a721f3-1074-48a0-9d47-0a1e918ea7c4 | 2016 | Marlins | Rays | -0.9695600475624256 | 1 |


# CustomTransformer (persistence achieved via module level `to_gbq`)

## BigFrames provided

In [None]:
# This is supposed to be an implementation in the BigFrames library
import abc
import cloudpickle
import json

class BaseCustomTransformer(abc.ABC):
    def __init__(self):
        self.is_fitted = False

    @abc.abstractmethod
    def fit(self, X: pd.DataFrame):
        raise NotImplementedError()

    @abc.abstractmethod
    def transform(self, X: pd.DataFrame):
        raise NotImplementedError()


def to_gbq(transformer, dataset: str = None, routine_name: str = None, packages: list[str] = None):
    if not transformer.is_fitted:
        raise RuntimeError("The transformer has not been fitted yet. Please call 'fit' before 'to_gbq'.")

    def dataframe_transformer(series: pd.Series) -> str:
        df = series.to_frame().T
        transformed = transformer.transform(df)
        return json.dumps(transformed.iloc[0].to_dict())

    remote_transform = bpd.remote_function(dataset=dataset, name=routine_name, packages=packages)(dataframe_transformer)
    return remote_transform.bigframes_remote_function
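Since `RemoteFunctionTransformer` takes its output columns from the first row only, keys that appear only in later rows are silently dropped and keys missing from later rows come back as NULL from `JSON_VALUE`. Before calling `to_gbq`, one option is to exercise a row-level wrapper equivalent to the one `to_gbq` builds on a small local pandas sample (for example one pulled with `head(...).to_pandas()`). `check_consistent_keys`, `good`, and `bad` below are hypothetical names for this sketch.

```python
import json

import pandas as pd


def check_consistent_keys(row_fn, sample: pd.DataFrame) -> list:
    """Run `row_fn` on every row of a local sample and return the shared keys.

    Hypothetical pre-deployment check: raises ValueError if any row's JSON
    keys differ (as a set) from the first row's keys.
    """
    key_lists = [list(json.loads(row_fn(row)).keys()) for _, row in sample.iterrows()]
    if not key_lists:
        raise ValueError("sample is empty; cannot infer output columns")
    first = key_lists[0]
    for i, keys in enumerate(key_lists[1:], start=1):
        if set(keys) != set(first):
            raise ValueError(f"row {i} has keys {keys}, expected {first}")
    return first


def good(row: pd.Series) -> str:
    return json.dumps({"a": 1, "b": 2})


def bad(row: pd.Series) -> str:
    # Emits key "b" only for non-positive x, so rows disagree.
    return json.dumps({"a": 1} if row["x"] > 0 else {"a": 1, "b": 2})


sample = pd.DataFrame({"x": [1, -1]})
print(check_consistent_keys(good, sample))
```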

## User Journey

In [None]:
# This is the customer's implementation
class CustomTransformer(BaseCustomTransformer):
    def __init__(self):
        self.is_fitted = False
        self.duration_minutes_min = None
        self.duration_minutes_max = None
        self.away_team_name = None

    def fit(self, dataframe: pd.DataFrame):
        """
        Fit the transformer: record the min/max of `duration_minutes` and the
        vocabulary of `awayTeamName`.

        :param dataframe: Input dataframe to fit the transformer on
        """
        self.duration_minutes_min = dataframe["duration_minutes"].min()
        self.duration_minutes_max = dataframe["duration_minutes"].max()
        self.away_team_name = dataframe["awayTeamName"].unique().to_list()
        self.is_fitted = True

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """
        Apply the transformation to the input dataframe.

        :param dataframe: Input dataframe to apply the transformation
        :return: Transformed copy of the dataframe; the input is left unchanged
        """
        if not self.is_fitted:
            raise RuntimeError("The transformer has not been fitted yet. Please call 'fit' before 'transform'.")

        # Work on a copy so that transforming the same dataframe twice does not
        # rescale `duration_minutes` a second time.
        dataframe = dataframe.copy()
        dataframe["duration_minutes"] = (dataframe["duration_minutes"] - self.duration_minutes_min) / (
            self.duration_minutes_max - self.duration_minutes_min)
        vocab_dict = {word: idx for idx, word in enumerate(self.away_team_name)}
        dataframe['label_encoded_away_team_name'] = dataframe['awayTeamName'].map(vocab_dict)
        return dataframe

In [None]:
tx = CustomTransformer()
tx.fit(train_df)
tx.transform(train_df).peek()

|    | gameId | year | homeTeamName | awayTeamName | duration_minutes | label_encoded_away_team_name |
|---:|---|---:|---|---|---:|---:|
| 2 | f57e1271-d217-400a-aea6-2e2d7d6a59a0 | 2016 | Orioles | Rays | 0.172414 | 1 |
| 40 | 02a721f3-1074-48a0-9d47-0a1e918ea7c4 | 2016 | Marlins | Rays | 0.413793 | 1 |
| 88 | 5164e2ac-8872-45e2-a4c0-bacb91dcb111 | 2016 | Royals | Red Sox | 0.227586 | 27 |
| 4 | cb3ef033-dd57-41fd-b206-cdd3bc12c74f | 2016 | Twins | Indians | 0.434483 | 3 |
| 45 | d53d0458-842b-4993-bed6-2465b9dd5e4a | 2016 | Giants | Orioles | 0.365517 | 4 |


In [None]:
bqrf = to_gbq(tx, dataset="zzz_shobs_us", routine_name="my_custom_transformer")
bqrf



'bigframes-dev.zzz_shobs_us.my_custom_transformer'

In [None]:
rftx = RemoteFunctionTransformer("zzz_shobs_us.my_custom_transformer")
train_df_tx = rftx.transform(train_df)
train_df_tx.peek()





|    | gameId | year | homeTeamName | awayTeamName | duration_minutes | label_encoded_away_team_name |
|---:|---|---:|---|---|---:|---:|
| 34 | 138adc90-d564-4e90-be92-41dedb96fa50 | 2016 | Tigers | Angels | 0.2 | 19 |
| 64 | e49412b1-1673-43cd-a732-41347a76a343 | 2016 | Yankees | Angels | 0.096551724137931 | 19 |
| 75 | 37071542-7af7-484c-b059-a96969d447ad | 2016 | Indians | Angels | 0.3586206896551724 | 19 |
| 95 | 19fdde1c-1d68-4036-ba18-4d22dcd12416 | 2016 | Rangers | Angels | 0.1724137931034483 | 19 |
| 47 | a7fc942e-8f2d-4d1c-82e5-e93248a899ae | 2016 | Rangers | Astros | 0.1448275862068965 | 15 |
