# License

In [None]:
# Copyright 2022 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#      https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [Run in Colab](https://colab.research.google.com/github/google/profit-bidder/blob/main/solution_test/profit_bidder_quickstart.ipynb)

# Overview

This notebook is a quick-start guide that walks you through the steps involved in the solution. Unlike the production pipeline that you can set up with the complete solution, the notebook runs every step in one place using synthesized test data. Please note that you will **not be able to test the final step**, because the data is synthesized.

## Scope of this notebook
### Dataset
We provide synthesized datasets in the Git repository that you will clone and use in the notebook. There are three CSV files:
* p_Campaign_43939335402485897.csv
* p_Conversion_43939335402485897.csv
* client_profit.csv

In addition, we provide JSON schema files, which the notebook uses to create the tables in BigQuery.

### Objective
This notebook helps you become familiar with the following:
1. Set up your environment (install the libraries, initialize the variables, authenticate to Google Cloud, etc.)
1. Create a service account and two BigQuery datasets
1. Transform the data, create batches of the data, and push the data through a REST API call to CM360

### Costs
This tutorial uses billable components of Google Cloud:
* [BigQuery](https://cloud.google.com/bigquery)

Use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

## Before you begin
For this reference guide, you need a [Google Cloud project](https://console.cloud.google.com/cloud-resource-manager).

You can create a new one, or select a project you already created.
The following steps are required regardless of where you run your notebook (locally or in a Cloud AI Platform Notebook).
* [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.
* [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project). 
* (When using a non-Google Cloud local environment) Install the [Google Cloud SDK](https://cloud.google.com/sdk/).

### Mandatory variables
You must set the below variables:
* PB_GCP_PROJECT to [Your Google Cloud Project]
* PB_GCP_APPLICATION_CREDENTIALS to [Full path with the file name to the Service Account json file, if you chose to use Service Account to authenticate to Google Cloud]
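
A quick sanity check can catch placeholder values before any cloud call is made. The sketch below is illustrative only: `find_unset_variables` and its placeholder list are hypothetical helpers that are not part of the solution, and the credentials path is a made-up example.

```python
def find_unset_variables(settings, placeholders=("", "my-project")):
    """Return the names whose values are missing or still a placeholder."""
    return sorted(
        name for name, value in settings.items()
        if value is None or str(value).strip() in placeholders
    )


# Example values; replace them with your own project and key file path.
example_settings = {
    "PB_GCP_PROJECT": "my-project",  # still the notebook default
    "PB_GCP_APPLICATION_CREDENTIALS": "/path/to/key.json",  # hypothetical path
}
unset = find_unset_variables(example_settings)
print(unset)
```

If the printed list is not empty, set those variables before running the remaining cells.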

# Setup environment

## *PIP install appropriate packages*

In [None]:
%pip install google-cloud-storage # for Storage Account
%pip install google-cloud # for cloud sdk
%pip install google-cloud-bigquery # for BigQuery
%pip install google-cloud-bigquery-storage # for BigQuery Storage client
%pip install google-api-python-client # for Key management
%pip install oauth2client # for Key management

## *Initialize all the variables*

### *Remove all environment variables*
Clearing previously defined variables comes in handy when troubleshooting.
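
The cell that follows removes every name that does not start with an underscore. If that is broader than you need, a narrower variant could remove only the solution's own variables. This is a minimal sketch under that assumption; `clear_prefixed` is a hypothetical helper and is demonstrated on a plain dictionary rather than on the live notebook namespace.

```python
def clear_prefixed(namespace, prefix="PB_"):
    """Delete every key in `namespace` that starts with `prefix`; return the removed names."""
    removed = [name for name in list(namespace) if name.startswith(prefix)]
    for name in removed:
        del namespace[name]
    return removed


# Demonstrated on a plain dict; in a notebook you would pass globals().
demo_namespace = {"PB_GCP_PROJECT": "my-project", "PB_BATCH_SIZE": 100, "df": None}
removed_names = clear_prefixed(demo_namespace)
print(removed_names, list(demo_namespace))
```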

In [None]:
# remove all local and global variables that do not start with an underscore
# ^^^^^^^^^^^^^^^^^^^^^
# beg utils
# ^^^^^^^^^^^^^^^^^^^^^
# local scope
myvar = [key for key in locals().keys() if not key.startswith('_')]
print (len(locals().keys()))
print (len(myvar))
# print (myvar)
for eachvar in myvar:
    print (eachvar)
    del locals()[eachvar]
print (len(locals().keys()))
# global scope
myvar = [key for key in globals().keys() if not key.startswith('_')]
print (len(globals().keys()))
print (len(myvar))
# print (myvar)
for eachvar in myvar:
    print (eachvar)
    del globals()[eachvar]
print (len(globals().keys()))
# ^^^^^^^^^^^^^^^^^^^^^
# end utils
# ^^^^^^^^^^^^^^^^^^^^^

### *Create Python and Shell environment variables*
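
As a rough illustration of how the variables cell derives the service account name and email from the solution prefix and the project, consider the sketch below. The function name `service_account_email` is hypothetical; the string logic mirrors the assignments to `PB_SERVICE_ACCOUNT_NAME` and `PB_SA_EMAIL`.

```python
def service_account_email(prefix, project, base_name="profit-bidder"):
    """Mirror the naming logic used in the variables cell."""
    # Underscores are replaced with hyphens, as the notebook does for the account name.
    account_name = (prefix + base_name).replace("_", "-")
    return f"{account_name}@{project}.iam.gserviceaccount.com"


print(service_account_email("pb_", "my-project"))
```

With the notebook defaults this should print `pb-profit-bidder@my-project.iam.gserviceaccount.com`.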

In [None]:
# GCP Project
PB_GCP_PROJECT = "my-project" #@param {type:"string"}

# Default values
PB_SOLUTION_PREFIX="pb_"  #@param {type:"string"}
# service account
PB_SERVICE_ACCOUNT_NAME=PB_SOLUTION_PREFIX+"profit-bidder" #@param {type:"string"}
PB_SERVICE_ACCOUNT_NAME=PB_SERVICE_ACCOUNT_NAME.replace('_','-')
PB_SA_ROLES="roles/bigquery.dataViewer roles/pubsub.publisher roles/iam.serviceAccountTokenCreator"
PB_SA_EMAIL=PB_SERVICE_ACCOUNT_NAME + '@' + PB_GCP_PROJECT + '.iam.gserviceaccount.com'

# BQ DS for SA360/CM360
PB_DS_SA360=PB_SOLUTION_PREFIX + "sa360_data" #@param {type:"string"}
# BQ DS for Business data 
PB_DS_BUSINESS_DATA=PB_SOLUTION_PREFIX + "business_data" #@param {type:"string"}
# Client margin table
PB_CLIENT_MARGIN_DATA_TABLE_NAME="client_margin_data_table" #@param {type:"string"}
# Transformed data table
PB_CM360_TABLE="my_transformed_data" #@param {type:"string"}
PB_CM360_PROFILE_ID="my_cm_profileid" #@param {type:"string"}
PB_CM360_FL_ACTIVITY_ID="my_fl_activity_id" #@param {type:"string"}
PB_CM360_FL_CONFIG_ID="my_fl_config_id" #@param {type:"string"}

# DON'T CHANGE THE BELOW VARIABLES; they are hardcoded to match the test dataset
PB_SQL_TRANSFORM_ADVERTISER_ID="43939335402485897" # synthesized id for testing.
PB_CAMPAIGN_TABLE_NAME="p_Campaign_" + PB_SQL_TRANSFORM_ADVERTISER_ID
PB_CONVERSION_TABLE_NAME="p_Conversion_" + PB_SQL_TRANSFORM_ADVERTISER_ID

PB_TIMEZONE="America/New_York"

PB_REQUIRED_KEYS = [
    'conversionId',
    'conversionQuantity',
    'conversionRevenue',
    'conversionTimestamp',
    'conversionVisitExternalClickId',
]
PB_API_SCOPES = ['https://www.googleapis.com/auth/dfareporting',
              'https://www.googleapis.com/auth/dfatrafficking',
              'https://www.googleapis.com/auth/ddmconversions',
              'https://www.googleapis.com/auth/devstorage.read_write']
PB_CM360_API_NAME = 'dfareporting'
PB_CM360_API_VERSION = 'v3.5'

PB_BATCH_SIZE=100

# create a variable that you can pass to the bq Cell magic
# import the variables to the shell
import os
PB_all_args = [key for key in locals().keys() if not key.startswith('_')]
# print (PB_all_args)
PB_BQ_ARGS = {}
for PB_each_key in PB_all_args:
    # print (f"{PB_each_key}:{locals()[PB_each_key]}")
    if PB_each_key.upper().startswith(PB_SOLUTION_PREFIX.upper()):
      PB_BQ_ARGS[PB_each_key] = locals()[PB_each_key]
      os.environ[PB_each_key] = str(PB_BQ_ARGS[PB_each_key])
print (PB_BQ_ARGS)

## *Setup your Google Cloud project*

In [None]:
# set the desired Google Cloud project
!gcloud config set project $PB_GCP_PROJECT
import os
os.environ['GOOGLE_CLOUD_PROJECT'] = PB_GCP_PROJECT
# validate that the Google Cloud project has been set properly.
!echo 'gcloud will use the below project:'
!gcloud info --format='value(config.project)'

## *Authenticate with Google Cloud*

### Authenticate using ServiceAccount Key file

In [None]:
# download the ServiceAccount key and provide the path to the file below
# PB_GCP_APPLICATION_CREDENTIALS = "<Full path with the file name to the above downloaded json file>"
# PB_GCP_APPLICATION_CREDENTIALS = "/Users/dpani/Downloads/dpani-sandbox-2-3073195cd132.json"

# uncomment the below code in a Colab environment
# authenticate using service account
# from google.colab import files
# # Upload service account key
# keyfile_upload = files.upload()
# PB_GCP_APPLICATION_CREDENTIALS = list(keyfile_upload.keys())[0]

# import os
# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = PB_GCP_APPLICATION_CREDENTIALS
# # set the account
# !echo "Setting Service Account:" $PB_GCP_APPLICATION_CREDENTIALS
# !gcloud auth activate-service-account --key-file=$PB_GCP_APPLICATION_CREDENTIALS

### Authenticate using OAuth

In [None]:
# authenticate using OAuth (runs only in a Colab environment)
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth as google_auth
  google_auth.authenticate_user()

## *Enable the below Google Cloud Services for the solution*

In [None]:
# enable the Google Cloud services that the solution requires
!gcloud services enable \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com \
  bigquerydatatransfer.googleapis.com \
  doubleclickbidmanager.googleapis.com \
  doubleclicksearch.googleapis.com \
  storage-api.googleapis.com 

# Utility functions

## *Delete a dataset in BigQuery (DDL)*

In [None]:
# delete the BigQuery dataset...!!! BE CAREFUL !!!
def delete_dataset(dataset_id):
    """Deletes a BigQuery dataset
    This is not recommended for use in a production environment.
    It comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
        dataset_id(:obj:`str`): The BigQuery dataset name that we want to delete
    """
    # [START bigquery_delete_dataset]
    from google.cloud import bigquery
    # Construct a BigQuery client object.
    client = bigquery.Client()
    # dataset_id = 'your-project.your_dataset'
    # Use the delete_contents parameter to delete a dataset and its contents.
    # Use the not_found_ok parameter to not receive an error if the
    #     dataset has already been deleted.
    client.delete_dataset(
        dataset_id, delete_contents=True, not_found_ok=True
    )  # Make an API request.
    print("Deleted dataset '{}'.".format(dataset_id))

## *Delete a table in BigQuery (DDL)*

In [None]:
# delete BigQuery table if not needed...!!! BE CAREFUL !!!
def delete_table(table_id):
  """Deletes a BigQuery table
    This is not recommended for use in a production environment.
    It comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
      table_id(:obj:`str`): The BigQuery table name that we want to delete
  """
  from google.cloud import bigquery
  # Construct a BigQuery client object.
  client = bigquery.Client()
  # client.delete_table(table_id, not_found_ok=True)  # Make an API request.
  client.delete_table(table_id)  # Make an API request.
  print("Deleted table '{}'.".format(table_id))

## *Deletes a Service Account* 

In [None]:
# delete a service account
def delete_service_account(PB_GCP_PROJECT: str,
                 PB_ACCOUNT_NAME: str
                 ):
  """The function deletes a service account

    This is not recommended for use in a production environment.
    It comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!

    Args:
      PB_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      PB_ACCOUNT_NAME:(:obj:`str`): Name of the service account.
  """

  from googleapiclient import discovery
  from oauth2client.client import GoogleCredentials

  credentials = GoogleCredentials.get_application_default()

  service = discovery.build('iam', 'v1', credentials=credentials)

  # The resource name of the service account in the following format:
  # `projects/{PROJECT_ID}/serviceAccounts/{ACCOUNT}`.
  # Using `-` as a wildcard for the `PROJECT_ID` will infer the project from
  # the account. The `ACCOUNT` value can be the `email` address or the
  # `unique_id` of the service account.
  name = f'projects/{PB_GCP_PROJECT}/serviceAccounts/{PB_ACCOUNT_NAME}@{PB_GCP_PROJECT}.iam.gserviceaccount.com'

  print("Going to delete service account '{}'.".format(name))  
  request = service.projects().serviceAccounts().delete(name=name)
  request.execute() 
  print("Account deleted")

# Profit bid solution

## *Create the service account and BigQuery datasets:* 
*   Service account (the same one used to push the conversion to the SA360/CM360)
*   BQ DS for SA360/CM360
*   BQ DS for Business data 
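
The shell cell below creates the datasets only when they do not exist yet. For readers who prefer Python, the same idempotent behaviour can be sketched with the BigQuery client's `create_dataset(..., exists_ok=True)` call. The helper name `ensure_datasets` is hypothetical, and the function accepts any client object with a compatible `create_dataset` method, so the real call is shown only as a comment.

```python
def ensure_datasets(client, project, dataset_names):
    """Create each dataset if it does not exist yet and return the full IDs."""
    dataset_ids = [f"{project}.{name}" for name in dataset_names]
    for dataset_id in dataset_ids:
        # exists_ok=True makes the call a no-op when the dataset is already there.
        client.create_dataset(dataset_id, exists_ok=True)
    return dataset_ids


# Usage in the notebook (requires credentials, so it is left commented out):
# from google.cloud import bigquery
# ensure_datasets(bigquery.Client(project=PB_GCP_PROJECT), PB_GCP_PROJECT,
#                 [PB_DS_SA360, PB_DS_BUSINESS_DATA])
```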


In [None]:
%%bash
# create the service account
# and add necessary iam roles
function get_roles {
  gcloud projects get-iam-policy ${PB_GCP_PROJECT} --flatten="bindings[].members" --format='table(bindings.role)' --filter="bindings.members:${PB_SA_EMAIL}"
}
function create_service_account {
  echo "Creating service account $PB_SA_EMAIL"
  gcloud iam service-accounts describe $PB_SA_EMAIL > /dev/null 2>&1
  RETVAL=$?
  if (( ${RETVAL} != "0" )); then
    gcloud iam service-accounts create ${PB_SERVICE_ACCOUNT_NAME} --description 'Profit Bidder Service Account' --project ${PB_GCP_PROJECT}
  fi
  for role in ${PB_SA_ROLES}; do
    echo -n "Adding ${PB_SERVICE_ACCOUNT_NAME} to ${role} "
    if get_roles | grep $role &> /dev/null; then
      echo "already added."
    else
      gcloud projects add-iam-policy-binding ${PB_GCP_PROJECT} --member="serviceAccount:${PB_SA_EMAIL}" --role="${role}"
      echo "added."
    fi
  done   
}
# Creates the service account and adds necessary permissions
create_service_account

function create_bq_ds {
  dataset=$1
  echo "Creating BQ dataset: '${dataset}'" 
  bq --project_id=${PB_GCP_PROJECT} show --dataset ${dataset} > /dev/null 2>&1
  RETVAL=$?
  if (( ${RETVAL} != "0" )); then
    bq --project_id=${PB_GCP_PROJECT} mk --dataset ${dataset}
  else
    echo "Reusing ${dataset}."
  fi
}
#create the BQ DSs
create_bq_ds $PB_DS_SA360
create_bq_ds $PB_DS_BUSINESS_DATA

## *Download the test data*
The test data is in the `solution_test` folder of the repository.
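
Once the download cell below has run, a check along these lines could confirm that the expected files are present. It is a sketch only: `missing_test_files` is a hypothetical helper, and the file list is assembled from the dataset description and the load commands used later in this notebook.

```python
import os
from pathlib import Path

# File names taken from the dataset description and the load commands in this notebook.
EXPECTED_TEST_FILES = (
    "p_Campaign_43939335402485897.csv",
    "p_Conversion_43939335402485897.csv",
    "client_profit.csv",
    "p_Campaign_schema.json",
    "p_Conversion_schema.json",
)


def missing_test_files(data_dir, expected=EXPECTED_TEST_FILES):
    """Return the expected file names that are not present in `data_dir`."""
    root = Path(data_dir)
    return [name for name in expected if not (root / name).is_file()]


# The path matches the clone location used by the download cell.
print(missing_test_files(os.path.expanduser("~/solutions/profit-bidder/solution_test")))
```

An empty list means every expected file was found.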

In [None]:
%%bash
# Download the test data from gitrepo
DIR=$HOME/solutions/profit-bidder
if [ -d "$DIR" ]
then
  echo $DIR already exists.
else
  mkdir -p $HOME/solutions/profit-bidder
  cd $HOME/solutions/profit-bidder
  git clone https://github.com/google/profit-bidder.git .
fi
export PB_TEST_DATA_DIR=$DIR/solution_test
ls -ltrah $PB_TEST_DATA_DIR
echo $PB_TEST_DATA_DIR folder contains the test data.

## *Uploads Test data to BigQuery* 
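
The shell helpers in the next cell check whether a table exists by counting matching rows in `__TABLES_SUMMARY__`. The sketch below rebuilds that query in Python and parses the count from the result. Both function names are hypothetical, and the `cnt` header followed by a number is an assumption about the CSV output format of `bq query`.

```python
def table_exists_query(project, dataset, table_name):
    """Build the Standard SQL statement that counts matching entries in __TABLES_SUMMARY__."""
    return (
        "SELECT COUNT(1) AS cnt "
        f"FROM `{project}`.{dataset}.__TABLES_SUMMARY__ "
        f'WHERE table_id = "{table_name}"'
    )


def count_from_csv(csv_output):
    """Parse the count from CSV output that has a header line followed by one number."""
    lines = [line.strip() for line in csv_output.splitlines() if line.strip()]
    return int(lines[-1])


print(table_exists_query("my-project", "pb_sa360_data", "p_Campaign_43939335402485897"))
print(count_from_csv("cnt\n1\n") > 0)
```

Note that the shell helpers decide by looking for the character `1` anywhere in the captured output, which also contains the echoed dataset and table names. A name that contains a `1` may therefore be reported as existing; comparing the parsed count, as above, avoids that.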

In [None]:
%%bash
# uploads the test data into BigQuery
function create_bq_table {
  dataset=$1
  table_name=$2
  schema_name=$3

  sql_result=$(list_bq_table $1 $2)
  echo "Creating BQ table: '${dataset}.${table_name}'" 
  if [[ "$sql_result" == *"1"* ]]; then
    echo "Reusing ${dataset}.${table_name}."
  else
    bq --project_id=${PB_GCP_PROJECT} mk -t --schema ${schema_name} --time_partitioning_type DAY ${dataset}.${table_name}
  fi  
}

function delete_bq_table {
  dataset=$1
  table_name=$2
  sql_result=$(list_bq_table $1 $2)
  echo "Deleting BQ table: '${dataset}.${table_name}'" 
  if [[ "$sql_result" == *"1"* ]]; then
    bq rm -f -t $PB_GCP_PROJECT:$dataset.$table_name
  else
    echo "${dataset}.${table_name} doesn't exist."
  fi  
}

function list_bq_table {
  dataset=$1
  table_name=$2
  echo "Checking whether BQ table exists: '${dataset}.${table_name}'" 
  sql_query='SELECT
    COUNT(1) AS cnt
  FROM 
    `<myproject>`.<mydataset>.__TABLES_SUMMARY__
  WHERE table_id = "<mytable_name>"'
  sql_query="${sql_query/<myproject>/${PB_GCP_PROJECT}}"
  sql_query="${sql_query/<mydataset>/${dataset}}"
  sql_query="${sql_query/<mytable_name>/${table_name}}"

  bq_qry_cmd="bq query --use_legacy_sql=false --format=csv '<mysql_qery>'"
  bq_qry_cmd="${bq_qry_cmd/<mysql_qery>/${sql_query}}"
  sql_result=$(eval $bq_qry_cmd)  
  if [[ "$sql_result" == *"1"* ]]; then
    echo "${dataset}.${table_name} exists"
    echo "1"
  else
    echo "${dataset}.${table_name} doesn't exist"
    echo "0"
  fi   
}

function load_bq_table {
  dataset=$1
  table_name=$2
  data_file=$3
  schema_name=$4
  sql_result=$(list_bq_table $1 $2)
  echo "Loading data to BQ table: '${dataset}.${table_name}'" 
  if [[ "$sql_result" == *"1"* ]]; then
    delete_bq_table $dataset $table_name
  fi  
  if [[ "$schema_name" == *"autodetect"* ]]; then
    bq --project_id=${PB_GCP_PROJECT} load \
    --autodetect \
    --source_format=CSV \
    $dataset.$table_name \
    $data_file 
  else
    create_bq_table $dataset $table_name $schema_name
    bq --project_id=${PB_GCP_PROJECT} load \
      --source_format=CSV \
      --time_partitioning_type=DAY \
      --skip_leading_rows=1 \
      ${dataset}.${table_name} \
      ${data_file}
  fi  
}

# save the current working directory
current_working_dir=`pwd`

# change to the test data directory
DIR=$HOME/solutions/profit-bidder
export PB_TEST_DATA_DIR=$DIR/solution_test
ls -ltrah $PB_TEST_DATA_DIR
echo $PB_TEST_DATA_DIR folder contains the test data.
cd $PB_TEST_DATA_DIR
pwd

# create campaign table
# load test data to campaign table
load_bq_table $PB_DS_SA360 $PB_CAMPAIGN_TABLE_NAME "p_Campaign_${PB_SQL_TRANSFORM_ADVERTISER_ID}.csv" "p_Campaign_schema.json"
# create conversion table
# load test data to conversion
load_bq_table $PB_DS_SA360 $PB_CONVERSION_TABLE_NAME "p_Conversion_${PB_SQL_TRANSFORM_ADVERTISER_ID}.csv" "${PB_TEST_DATA_DIR}/p_Conversion_schema.json"
# load test profit data
load_bq_table $PB_DS_BUSINESS_DATA $PB_CLIENT_MARGIN_DATA_TABLE_NAME "client_profit.csv" "autodetect"

# change to original working directory
cd $current_working_dir
pwd



## *Create a BigQuery client, import the libraries, load the bigquery Cell magic*

In [None]:
# create a BigQuery client
from google.cloud import bigquery
bq_client = bigquery.Client(project=PB_GCP_PROJECT)
# load the bigquery Cell magic
# %load_ext google.cloud.bigquery
%reload_ext google.cloud.bigquery

In [None]:
# test that BigQuery client works
sql = """
    SELECT name
    FROM `bigquery-public-data.usa_names.usa_1910_current`
    WHERE state = 'TX'
    LIMIT 100
"""

# Run a Standard SQL query using the environment's default project
df = bq_client.query(sql).to_dataframe()
df

## *Transform and aggregate* 
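
The query in the next cell extracts the product u-variables (`u9`, `u10`, `u11`) from the Floodlight request string, pairs them up by position, and sums cost times margin per conversion. The Python sketch below imitates that logic on one made-up request string so the steps are easier to follow. The function names, the sample string, and the margin values are all hypothetical; the query itself remains the reference.

```python
import re


def split_u_variable(request_string, name):
    """Return the '|'-separated values of a u-variable such as 'u9', or [] if it is absent."""
    match = re.search(rf"{name}=(.*?);", request_string)
    return match.group(1).split("|") if match else []


def conversion_profit(request_string, margins):
    """Sum cost * margin over the products of one conversion, rounded to 2 decimals."""
    skus = split_u_variable(request_string, "u9")
    quantities = split_u_variable(request_string, "u10")
    costs = split_u_variable(request_string, "u11")
    profit = 0.0
    # zip() keeps only positions present in all three lists, like pos1 = pos2 = pos3.
    for sku, _quantity, cost in zip(skus, quantities, costs):
        if sku == "":
            continue  # the query drops rows with an empty SKU
        margin = margins.get(sku, 0.0)  # unmatched SKUs get a margin of 0.0
        profit += float(cost or "0") * margin  # an empty cost is treated as 0
    return round(profit, 2)


# Made-up request string and margins, for illustration only.
sample = "u9=sku1|sku2;u10=1|2;u11=10.0|20.0;"
print(conversion_profit(sample, {"sku1": 0.5}))
```

As in the query, the quantity is carried along but does not enter the profit sum, and the final `SELECT` keeps only conversions whose calculated profit is greater than zero.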

In [None]:
# The below query transforms the data from Campaign, Conversion, 
#   and profit tables.
aggregate_sql = f"""
-- Copyright 2021 Google LLC
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--      http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- ******    TEMPLATE CODE    ******
-- NOTE: Please thoroughly review and test your version of this query before launching your pipeline
-- The resulting data from this script should provide all the necessary columns for upload via 
-- the CM360 API and the SA360 API

-- 
-- the below placeholders must be replaced with appropriate values.
--      install.sh does so
-- project_id as: {PB_GCP_PROJECT}
-- sa360_dataset_name as: {PB_DS_SA360}
-- advertiser_id as: {PB_SQL_TRANSFORM_ADVERTISER_ID}
-- timezone as: America/New_York e.g. America/New_York
-- floodlight_name as: My Sample Floodlight Activity
-- account_type as: Other engines
-- gmc_dataset_name as: pb_gmc_data
-- gmc_account_id as: mygmc_account_id
-- business_dataset_name as: {PB_DS_BUSINESS_DATA}
-- client_margin_data_table as: {PB_CLIENT_MARGIN_DATA_TABLE_NAME}
-- client_profit_data_sku_col as: sku
-- client_profit_data_profit_col as: profit
-- target_floodlight_name as: My Sample Floodlight Activity
-- product_sku_var as: u9
-- product_quantity_var as: u10
-- product_unit_price_var as: u11
-- product_sku_regex as: (.*?);
-- product_quantity_regex as: (.*?);
-- product_unit_price_regex as: (.*?);
-- product_sku_delim as: |
-- product_quantity_delim as: |
-- product_unit_price_delim as: |
-- 

WITH
campaigns AS (
    -- Example: Extracting all campaign names and IDs if needed for filtering for
    -- conversions for a subset of campaigns
    SELECT
        campaign,
        campaignId,
        row_number() OVER (partition BY campaignId ORDER BY lastModifiedTimestamp DESC) as row_num -- for de-duping
    FROM `{PB_GCP_PROJECT}.{PB_DS_SA360}.p_Campaign_{PB_SQL_TRANSFORM_ADVERTISER_ID}`
    -- Be sure to replace the Timezone with what is appropriate for your use case
    WHERE EXTRACT(DATE FROM _PARTITIONTIME) >= DATE_SUB(CURRENT_DATE('America/New_York'), INTERVAL 7 DAY)
)
,expanded_conversions AS (
    -- Parses out all relevant product data from a conversion request string
    SELECT
        conv.*,
        campaign,
        -- example of U-Variables that are parsed to extract product purchase data
        SPLIT(REGEXP_EXTRACT(floodlightEventRequestString, "u9=(.*?);"),"|") AS u9,
        SPLIT(REGEXP_EXTRACT(floodlightEventRequestString, "u10=(.*?);"),"|") AS u10,
        SPLIT(REGEXP_EXTRACT(floodlightEventRequestString, "u11=(.*?);"),"|") AS u11,
    FROM `{PB_GCP_PROJECT}.{PB_DS_SA360}.p_Conversion_{PB_SQL_TRANSFORM_ADVERTISER_ID}` AS conv
    LEFT JOIN (
        SELECT campaign, campaignId
        FROM campaigns
        WHERE row_num = 1
        GROUP BY 1,2
    ) AS camp
    USING (campaignId)
    WHERE
        -- Filter for conversions that occurred in the previous day
        -- Be sure to replace the Timezone with what is appropriate for your use case
        floodlightActivity IN ('My Sample Floodlight Activity')
        AND accountType = 'Other engines' -- filter by Account Type as needed
)
,flattened_conversions AS (
    -- Flattens the extracted product data for each conversion which leaves us with a row
    -- of data for each product purchased as part of a given conversion
    SELECT
        advertiserId,
        campaignId,
        conversionId,
        skuId,
        pos1,
        quantity,
        pos2,
        cost,
        pos3
    FROM expanded_conversions,
    UNNEST(expanded_conversions.u9) AS skuId WITH OFFSET pos1,
    UNNEST(expanded_conversions.u10) AS quantity WITH OFFSET pos2,
    UNNEST(expanded_conversions.u11) AS cost WITH OFFSET pos3
    WHERE pos1 = pos2 AND pos1 = pos3 AND skuId != ''
    GROUP BY 1,2,3,4,5,6,7,8,9
    ORDER BY conversionId
)
,inject_gmc_margin AS (
    -- Merges Margin data with the products found in the conversion data
    SELECT 
        advertiserId,
        campaignId,
        conversionId,
        skuId,
        quantity,
        IF(cost = '', '0', cost) as cost,
        pos1,
        pos2,
        pos3,
        -- PLACEHOLDER MARGIN, X% for unclassified items
        CASE
        WHEN profit IS NULL THEN 0.0
        ELSE profit
        END AS margin,
        sku,
    FROM flattened_conversions
    LEFT JOIN `{PB_GCP_PROJECT}.{PB_DS_BUSINESS_DATA}.{PB_CLIENT_MARGIN_DATA_TABLE_NAME}`
    ON flattened_conversions.skuId = sku
group by 1,2,3,4,5,6,7,8,9,10,11
)
,all_conversions as (
    -- Rolls up all previously expanded conversion data while calculating profit based on the matched 
    -- margin value. Also assigns timestamp in millis and micros 
    SELECT
        e.account,
        e.accountId,
        e.accountType,
        e.advertiser,
        igm.advertiserId,
        e.agency,
        e.agencyId,
        igm.campaignId,
        e.campaign,
        e.conversionAttributionType,
        e.conversionDate,
        -- '00' may be changed to any string value that will help you identify these
        -- new conversions in reporting
        CONCAT(igm.conversionId, '00') as conversionId,
        e.conversionLastModifiedTimestamp,
        -- Note:Rounds float quantity and casts to INT, change based on use case
        -- This is done to support CM360 API
        CAST(ROUND(e.conversionQuantity) AS INT64) AS conversionQuantity,
        e.conversionRevenue,
        SUM(
            FLOOR(CAST(igm.cost AS FLOAT64))
        ) AS CALCULATED_REVENUE,
        -- PROFIT CALCULATED HERE, ADJUST LOGIC AS NEEDED FOR YOUR USE CASE
        ROUND(
            SUM(
                -- multiply item cost by class margin
                SAFE_MULTIPLY(
                    CAST(igm.cost AS FLOAT64),
                    igm.margin)
            ),2
        ) AS CALCULATED_PROFIT,
        e.conversionSearchTerm,
        e.conversionTimestamp,
        -- SA360 timestamp should be in millis
        UNIX_MILLIS(e.conversionTimestamp) as conversionTimestampMillis,
        -- CM360 Timestamp should be in micros
        UNIX_MICROS(e.conversionTimestamp) as conversionTimestampMicros,
        e.conversionType,
        e.conversionVisitExternalClickId,
        e.conversionVisitId,
        e.conversionVisitTimestamp,
        e.deviceSegment,
        e.floodlightActivity,
        e.floodlightActivityId,
        e.floodlightActivityTag,
        e.floodlightEventRequestString,
        e.floodlightOrderId,
        e.floodlightOriginalRevenue,
        status
    FROM inject_gmc_margin AS igm
    LEFT JOIN expanded_conversions AS e
    ON igm.advertiserID = e.advertiserId AND igm.campaignId = e.campaignID AND igm.conversionId = e.conversionId
    GROUP BY 1,2,3,4,5,6,8,7,9,10,11,12,13,14,15,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33
)
-- The columns below represent the original conversion data with their new profit
-- values calculated (assigned to conversionRevenue column) along with any original 
-- floodlight data that the client wishes to keep for troubleshooting.
SELECT 
    account,
    accountId,
    accountType,
    advertiser,
    advertiserId,
    agency,
    agencyId,
    campaignId,
    campaign,
    conversionId,
    conversionAttributionType,
    conversionDate,
    conversionTimestamp,
    conversionTimestampMillis,
    conversionTimestampMicros,
    CALCULATED_PROFIT AS conversionRevenue,
    conversionQuantity,
    -- The value below is used only for troubleshooting purposes.
    "My Sample Floodlight Activity" AS floodlightActivity,
    conversionSearchTerm,
    conversionType,
    conversionVisitExternalClickId,
    conversionVisitId,
    conversionVisitTimestamp,
    deviceSegment,
    CALCULATED_PROFIT,
    CALCULATED_REVENUE,
    -- Please prefix any original conversion values you wish to keep with "original". 
    -- These values may help with troubleshooting
    conversionRevenue AS originalConversionRevenue,
    floodlightActivity AS originalFloodlightActivity,
    floodlightActivityId AS originalFloodlightActivityId,
    floodlightActivityTag AS originalFloodlightActivityTag,
    floodlightOriginalRevenue AS originalFloodlightRevenue,
    floodlightEventRequestString,
    floodlightOrderId
FROM all_conversions
WHERE CALCULATED_PROFIT > 0.0
ORDER BY account ASC
"""
# execute the transform query 
df = bq_client.query(aggregate_sql).to_dataframe()
# print a couple of records of the transformed query
df.head()

In [None]:
# write the data to a table
df.to_gbq(f'{PB_DS_BUSINESS_DATA}.{PB_CM360_TABLE}', 
          project_id=PB_GCP_PROJECT,
          if_exists='replace', 
          progress_bar=True,)

## *Formulate the payload and push to CM360* 
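
The cell below reads the transformed rows in batches and sends each batch to CM360. As a sketch of the batching idea, the request body can also be assembled from plain dictionaries instead of concatenated JSON strings. The helper names `chunked` and `build_payload` and the demo rows are hypothetical; the field names are the ones used in the upload function.

```python
def chunked(rows, size):
    """Yield consecutive slices of `rows` with at most `size` items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def build_payload(rows, fl_activity_id, fl_configuration_id):
    """Build one batchinsert request body as a plain dict."""
    return {
        "kind": "dfareporting#conversionsBatchInsertRequest",
        "conversions": [
            {
                "kind": "dfareporting#conversion",
                "gclid": row["conversionVisitExternalClickId"],
                "floodlightActivityId": fl_activity_id,
                "floodlightConfigurationId": fl_configuration_id,
                "ordinal": row["conversionId"],
                "timestampMicros": row["conversionTimestampMicros"],
                "value": row["conversionRevenue"],
                "quantity": row["conversionQuantity"],
            }
            for row in rows
        ],
    }


# Made-up rows; real rows come from the transformed BigQuery table.
demo_rows = [
    {
        "conversionVisitExternalClickId": f"gclid-{i}",
        "conversionId": f"{i}00",
        "conversionTimestampMicros": 1_600_000_000_000_000 + i,
        "conversionRevenue": 1.5,
        "conversionQuantity": 1,
    }
    for i in range(250)
]
payloads = [build_payload(batch, "my_fl_activity_id", "my_fl_config_id")
            for batch in chunked(demo_rows, 100)]
print([len(p["conversions"]) for p in payloads])
```

Building a fresh dictionary per batch means nothing is carried over from one request to the next.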

In [None]:
# Reads from the transformed table, chunks the data, 
#   and uploads the data to CM360
# We need to chunk the data so as to adhere 
#   to the payload limit of the CM360 REST API.
import pytz
import datetime
import decimal
import logging
import json
import google.auth
import google.auth.impersonated_credentials
import google_auth_httplib2
from googleapiclient import discovery

def today_date(timezone):
    """Returns today's date using the timezone
    Args:
        timezone(:obj:`str`): The timezone name, e.g. America/New_York
    Returns:
      Date: today's date
    """
    tz = pytz.timezone(timezone)
    return datetime.datetime.now(tz).date()

def time_now_str(timezone):
    """Returns the current date and time in the given timezone as a string
    Args:
        timezone(:obj:`str`): The timezone name, e.g. America/New_York
    Returns:
      str: the current time formatted as MM-DD-YYYY, HH:MM:SS
    """
    # set correct timezone for datetime check
    tz = pytz.timezone(timezone)
    return datetime.datetime.now(tz).strftime("%m-%d-%Y, %H:%M:%S")

def pluralize(count):
    """Returns the plural suffix for a count
    Args:
        count(:obj:`int`): A number
    Returns:
      str: 's' or empty
    """
    if count > 1:
        return 's'
    return ''  

def get_data(table_ref_name, cloud_client, batch_size):
    """Returns the data from the transformed table.
    Args:
        table_ref_name(:obj:`google.cloud.bigquery.table.Table`): Reference to the table
        cloud_client(:obj:`google.cloud.bigquery.client.Client`): BigQuery client
        batch_size(:obj:`int`): Batch size
    Returns:
      Array[]: list/rows of data
    """

    current_batch = []
    table = cloud_client.get_table(table_ref_name)
    print(f'Downloading {table.num_rows} rows from table {table_ref_name}')
    skip_stats = {}
    for row in cloud_client.list_rows(table_ref_name):
        missing_keys = []
        for key in PB_REQUIRED_KEYS:
            val = row.get(key)
            if val is None:
                missing_keys.append(key)
                count = skip_stats.get(key, 0)
                count += 1
                skip_stats[key] = count
        if len(missing_keys) > 0:
            row_as_dict = dict(row.items())
            logging.debug(f'Skipped row: missing values for keys {missing_keys} in row {row_as_dict}')
            continue
        result = {}
        conversionTimestamp = row.get('conversionTimestamp')
        # convert floating point seconds to microseconds since the epoch
        result['conversionTimestampMicros'] = int(conversionTimestamp.timestamp() * 1_000_000)
        for key in row.keys():
            value = row.get(key)
            if type(value) == datetime.datetime or type(value) == datetime.date:
                result[key] = value.strftime("%y-%m-%d ")
            elif type(value) == decimal.Decimal:
                result[key] = float(value)
            else:
                result[key] = value
        current_batch.append(result)
        if len(current_batch) >= batch_size:
            yield current_batch
            current_batch = []
    if len(current_batch) > 0:
        yield current_batch
    pretty_skip_stats = ', '.join([f'{val} row{pluralize(val)} missing key "{key}"' for key, val in skip_stats.items()])
    logging.info(f'Processed {table.num_rows} from table {table_ref_name} skipped {pretty_skip_stats}')

def setup(sa_email, api_scopes, api_name, api_version):
    """Impersonates a service account, authenticates with Google services,
      and returns a discovery client for further communication with Google services.
    Args:
        sa_email(:obj:`str`): Service account to impersonate
        api_scopes(:obj:`Any`): A list of scopes that the service account
          is expected to have permission for in CM360
        api_name(:obj:`str`): CM360 API name
        api_version(:obj:`str`): CM360 API version
    Returns:
      module:discovery: to interact with Google services.
    """

    source_credentials, project_id = google.auth.default()

    target_credentials = google.auth.impersonated_credentials.Credentials(
        source_credentials=source_credentials,
        target_principal=sa_email,
        target_scopes=api_scopes,
        delegates=[],
        lifetime=500)

    http = google_auth_httplib2.AuthorizedHttp(target_credentials)
    # setup API service here
    try: 
      return discovery.build(
          api_name,
          api_version,
          cache_discovery=False,
          http=http)
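    except Exception as error:
        # Report the underlying error instead of hiding it; the function then returns None.
        print(f'Could not authenticate: {error}')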


def upload_data(timezone, rows, profile_id, fl_configuration_id, fl_activity_id):
    """POSTs the conversion data using CM360 API
    Args:
        timezone(:obj:`str`): The timezone name, e.g. America/New_York
        rows(:obj:`Any`): An array of conversion data
        profile_id(:obj:`str`): Profile id - should be gathered from the CM360
        fl_configuration_id(:obj:`str`): Floodlight config id - should be gathered from the CM360
        fl_activity_id(:obj:`str`): Floodlight activity id - should be gathered from the CM360
    """
  
    print('Starting conversions for ' + time_now_str(timezone))
    if not fl_activity_id or not fl_configuration_id:
        print('Please make sure to provide a value for both floodlightActivityId and floodlightConfigurationId!!')
        return
    # Build the API connection
    try:       
      service = setup(PB_SA_EMAIL, PB_API_SCOPES,
                      PB_CM360_API_NAME, PB_CM360_API_VERSION)
      if service is None:
          # setup() already reported the failure; there is nothing to upload with.
          return
      print('Authorization successful')
      currentrow = 0
      all_conversions = """{"kind": "dfareporting#conversionsBatchInsertRequest", "conversions": ["""
      while currentrow < len(rows):
          for row in rows[currentrow:min(currentrow+100, len(rows))]:
              conversion = json.dumps({
                  'kind': 'dfareporting#conversion',
                  'gclid': row['conversionVisitExternalClickId'],
                  'floodlightActivityId': fl_activity_id, # (Use short form CM Floodlight Activity Id )
                  'floodlightConfigurationId': fl_configuration_id, # (Can be found in CM UI)
                  'ordinal': row['conversionId'],
                  'timestampMicros': row['conversionTimestampMicros'],
                  'value': row['conversionRevenue'],
                  'quantity': row['conversionQuantity'] #(Alternatively, this can be hardcoded to 1)
              })
              # print('Conversion: ', conversion) # uncomment if you want to output each conversion
              all_conversions = all_conversions + conversion + ','
          all_conversions = all_conversions[:-1] + ']}'
          payload = json.loads(all_conversions)
          print(f'CM360 request payload: {payload}')
          request = service.conversions().batchinsert(profileId=profile_id, body=payload)
          print('[{}] - CM360 API Request: '.format(time_now_str(timezone)), request)
          response = request.execute()
          print('[{}] - CM360 API Response: '.format(time_now_str(timezone)), response)
          if not response['hasFailures']:
              print(f"Successfully inserted batch of {len(payload['conversions'])} conversions.")
          else:
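              status = response['status']
              for line in status:
                  # A status entry carries a non-empty 'errors' list only when
                  # that conversion failed; otherwise it was inserted.
                  errors = line.get('errors')
                  if errors:
                      for error in errors:
                          print('Error in line ' + json.dumps(line['conversion']))
                          print('\t[%s]: %s' % (error['code'], error['message']))
                  else:
                      gclid = line.get('conversion', {}).get('gclid', line.get('gclid'))
                      print(f'Conversion with gclid {gclid} inserted.')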
          print('Either finished or found errors.')
          currentrow += 100
          all_conversions = """{"kind": "dfareporting#conversionsBatchInsertRequest", "conversions": ["""
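    except Exception as e:
        # Any failure in the block above lands here, not only authentication errors.
        print(f'Could not upload conversions: {e}')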

def partition_and_distribute(cloud_client, table_ref_name, batch_size, timezone,
                             profile_id, fl_configuration_id, fl_activity_id):
    """Partitions the data into chunks of batch_size rows and
        uploads them to CM360
    Args:
        cloud_client(:obj:`google.cloud.bigquery.client.Client`): BigQuery client
        table_ref_name(:obj:`str`): Fully qualified table name (project.dataset.table)
        batch_size(:obj:`int`): Batch size
        timezone(:obj:`Timezone`): Current timezone or defaulted to America/New_York
        profile_id(:obj:`str`): Profile id - should be gathered from the CM360
        fl_configuration_id(:obj:`str`): Floodlight config id - should be gathered from the CM360
        fl_activity_id(:obj:`str`): Floodlight activity id - should be gathered from the CM360
    """
    for batch in get_data(table_ref_name, cloud_client, batch_size):
        # print(f'Batch size: {len(batch)} batch: {batch}')
        upload_data(timezone, batch, profile_id, fl_configuration_id, 
                    fl_activity_id)
        # DEBUG BREAK!
        if batch_size == 1:
            break

table_name = f'{PB_DS_BUSINESS_DATA}.{PB_CM360_TABLE}'
try:
    table = bq_client.get_table(table_name)
except Exception as e:
    print(f'Could not find table with the provided table name: {table_name}. ({e})')
    table = None

todays_date = today_date(PB_TIMEZONE)

if table is not None:
    table_ref_name = table.full_table_id.replace(':', '.')
    if table.modified.date() == todays_date or table.created.date() == todays_date:
        print('[{}] is up-to-date. Continuing with upload...'.format(table_ref_name))
        partition_and_distribute(bq_client, table_ref_name, PB_BATCH_SIZE,
                                 PB_TIMEZONE, PB_CM360_PROFILE_ID, 
                                 PB_CM360_FL_CONFIG_ID, PB_CM360_FL_ACTIVITY_ID) 
    else:
        print('[{}] data may be stale. Please check the workflow to verify that it has run correctly. Upload aborted!'.format(table_ref_name))
else:
    print('Table not found! Please double check your workflow for any errors.')
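
The batching and response handling in `upload_data` can also be expressed with plain dictionaries instead of string concatenation. The block below is a minimal sketch of that idea, not the notebook's implementation: the helper names `chunked`, `build_batch_payload`, and `summarize_response` are hypothetical, the sample rows and IDs are made up, and the response shape (a `status` list whose entries hold a `conversion` and an optional `errors` list) is an assumption based on the fields the code above reads. It makes no API call.

```python
def chunked(rows, size=100):
    """Yield successive slices of `rows` with at most `size` items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def build_batch_payload(rows, fl_configuration_id, fl_activity_id):
    """Build one batch insert request body as a plain dict (hypothetical helper)."""
    return {
        'kind': 'dfareporting#conversionsBatchInsertRequest',
        'conversions': [
            {
                'kind': 'dfareporting#conversion',
                'gclid': row['conversionVisitExternalClickId'],
                'floodlightActivityId': fl_activity_id,
                'floodlightConfigurationId': fl_configuration_id,
                'ordinal': row['conversionId'],
                'timestampMicros': row['conversionTimestampMicros'],
                'value': row['conversionRevenue'],
                'quantity': row['conversionQuantity'],
            }
            for row in rows
        ],
    }


def summarize_response(response):
    """Split a response into inserted gclids and (gclid, code, message) errors.

    Assumes each status entry has a 'conversion' dict and, on failure,
    a non-empty 'errors' list.
    """
    inserted, errors = [], []
    for line in response.get('status', []):
        gclid = line.get('conversion', {}).get('gclid')
        line_errors = line.get('errors') or []
        if line_errors:
            for error in line_errors:
                errors.append((gclid, error.get('code'), error.get('message')))
        else:
            inserted.append(gclid)
    return inserted, errors


# Made-up rows and IDs, for illustration only.
sample_rows = [
    {
        'conversionVisitExternalClickId': f'gclid-{i}',
        'conversionId': str(i),
        'conversionTimestampMicros': '1650000000000000',
        'conversionRevenue': 10.0,
        'conversionQuantity': 1,
    }
    for i in range(250)
]

payloads = [build_batch_payload(chunk, '1111', '2222')
            for chunk in chunked(sample_rows)]
print([len(p['conversions']) for p in payloads])  # [100, 100, 50]

# A hand-written response used only to exercise summarize_response.
fake_response = {
    'hasFailures': True,
    'status': [
        {'conversion': {'gclid': 'gclid-0'}},
        {'conversion': {'gclid': 'gclid-1'},
         'errors': [{'code': 'NOT_FOUND', 'message': 'illustrative error'}]},
    ],
}
inserted, errors = summarize_response(fake_response)
print(inserted, errors)
```

Building the body as a dictionary avoids the trailing-comma trimming and the `json.loads` round trip, and each payload can be passed as `body=` to the batch insert request in the same way as before.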

# Clean up - !!! BE CAREFUL!!!

## Delete the transformed table

In [None]:
# deletes the transformed table
delete_table(f'{PB_DS_BUSINESS_DATA}.{PB_CM360_TABLE}')

## Delete the service account (SA) and BigQuery datasets (BQ DSs):
*   Service account (the same one used to push the conversions to SA360/CM360)
*   BQ DS for SA360/CM360
*   BQ DS for Business data


In [None]:
# deletes the service account
delete_service_account(PB_GCP_PROJECT, PB_SERVICE_ACCOUNT_NAME)
# deletes the datasets
delete_dataset(PB_DS_SA360)
delete_dataset(PB_DS_BUSINESS_DATA)

## Delete the Google Cloud Project
To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, **delete the project**.

The easiest way to eliminate billing is to delete the project you created for the tutorial.

**Caution**: Deleting a project has the following effects:
* *Everything in the project is deleted.* If you used an existing project for this tutorial, when you delete it, you also delete any other work you've done in the project.
* <b>Custom project IDs are lost.</b> When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an <b>appspot.com</b> URL, delete selected resources inside the project instead of deleting the whole project.

If you plan to explore multiple tutorials and quickstarts, reusing projects can help you avoid exceeding project quota limits.
<br>
<ol type="1">
    <li>In the Cloud Console, go to the <a href="https://console.cloud.google.com/iam-admin/projects">Manage resources page</a>.</li>
    <li>In the project list, select the project that you want to delete and then click <b>Delete</b>.</li>
    <li>In the dialog, type the project ID and then click <b>Shut down</b> to delete the project.</li>
</ol>
