# Upload Raw GA360 Data to BigQuery and GCS

In [1]:
%load_ext autoreload
%autoreload 2

::: {.content-hidden}
Import necessary Python modules
:::

In [2]:
import os
import sys
from datetime import datetime, timedelta
from glob import glob
from time import time
from typing import Any, Dict

import pandas as pd
import pytz
from google.cloud import bigquery, storage
from google.oauth2 import service_account

::: {.content-hidden}
Get relative path to project root directory
:::

In [3]:
# The project root is taken to be the parent of the current working directory.
# Its "src" sub-folder is appended to the module search path so that modules
# stored there can be imported by name.
PROJ_ROOT_DIR = os.pardir
src_dir = os.path.join(PROJ_ROOT_DIR, "src")
sys.path.append(src_dir)

::: {.content-hidden}
Import custom Python modules
:::

In [4]:
%aimport bigquery_auth_helpers
from bigquery_auth_helpers import auth_to_bigquery

%aimport sql_helpers
import sql_helpers as sqlh

%aimport transform_helpers
import transform_helpers as th

Show datatypes and number of missing values for all columns in a `DataFrame`

In [5]:
def summarize_df(df: pd.DataFrame) -> None:
    """Display a per-column overview of a DataFrame.

    Renders a table with one row per column of ``df`` holding the column
    label, its datatype (``dtype``) and the number of missing values found
    in it (``missing``).

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to be summarized.

    Returns
    -------
    None
        The overview is shown with ``display``; nothing is returned.
    """
    column_dtypes = df.dtypes.rename("dtype").to_frame()
    missing_counts = df.isna().sum().rename("missing")
    # Both pieces are indexed by the column labels of df, so an index-on-index
    # left join lines the two attributes up per column
    overview = (
        column_dtypes.join(missing_counts, how="left")
        .reset_index()
        .rename(columns={"index": "column"})
    )
    display(overview)

## About

## User Inputs

Define the following

1. list of nested columns in the raw GA360 tracking data that are accessible in the sample merchandise store dataset
2. GCP
   - dataset id
   - table id
   - bucket name
3. start (inclusive) and end (exclusive) index of the local raw data files to be uploaded to the BigQuery table

In [6]:
#| echo: true
# 1. list of nested columns in raw data (top-level GA360 column names)
nested_cols = [
    'totals',
    'trafficSource',
    'device',
    'geoNetwork',
    'customDimensions',
    'hits',
]

# 2. GCP resources: BigQuery dataset id, BigQuery table id and GCS bucket name
gbq_dataset_id = 'mydemo2asdf'
gbq_table_id = 'ecwa'
bucket_name = 'ecwa-raw'

# 3. subset of local files to be uploaded to BigQuery table
#    (used as a list slice: start index is inclusive, end index is exclusive)
start_idx_upload = 28
end_idx_upload = 55

::: {.content-hidden}
Get path to data sub-folders
:::

In [7]:
# Data folders are resolved relative to the project root; raw inputs live in data/raw
data_dir = os.path.join(PROJ_ROOT_DIR, "data")
raw_data_dir = os.path.join(data_dir, "raw")

Set datatypes for non-nested fields in raw GA360 tracking data

In [8]:
# Maps each top-level (non-nested) GA360 column name to a pandas extension
# dtype. Int64Dtype and StringDtype are the nullable variants, so a missing
# value in an integer column does not force the column to float/object.
dtypes_dict_raw = {
    "visitorId": pd.Int64Dtype(),
    "visitNumber": pd.Int64Dtype(),
    "visitId": pd.Int64Dtype(),
    "visitStartTime": pd.Int64Dtype(),
    "date": pd.StringDtype(),
    "fullVisitorId": pd.StringDtype(),
    "userId": pd.StringDtype(),
    "channelGrouping": pd.StringDtype(),
    "socialEngagementType": pd.StringDtype(),
}

::: {.content-hidden}
Load Google Cloud authentication credentials for use with the native BigQuery Python client
:::

In [9]:
# The GCP project id is read from the environment rather than hard-coded here
gcp_proj_id = os.environ["GCP_PROJECT_ID"]

# The service account key is expected as a JSON file in the raw data folder.
# Sorting makes the choice deterministic should several JSON files be present
# (glob itself returns matches in arbitrary order).
gcp_creds_candidates = sorted(glob(os.path.join(raw_data_dir, "*.json")))
if not gcp_creds_candidates:
    # Fail with an actionable message instead of a bare IndexError
    raise FileNotFoundError(
        f"No service account JSON key file found in {raw_data_dir}"
    )
gcp_creds_fpath = gcp_creds_candidates[0]
gcp_creds = service_account.Credentials.from_service_account_file(
    gcp_creds_fpath
)

::: {.content-hidden}
Get fully resolved name of the BigQuery table
:::

In [10]:
# Fully resolved table name has the form <project id>.<dataset id>.<table id>
gbq_table_fully_resolved = f"{gcp_proj_id}.{gbq_dataset_id}.{gbq_table_id}"

::: {.content-hidden}
Get path to all raw data files that were previously exported
:::

In [11]:
# Collect the exported parquet files for this table id. Sorting keeps the list
# order stable, so positional indices into the list are reproducible.
raw_data_files = sorted(glob(f"{raw_data_dir}/ga_data/d2i_{gbq_table_id}*.parquet.gzip"))

::: {.content-hidden}
Select subset of local raw data files to be uploaded to BigQuery table
:::

In [12]:
# Plain list slice: start_idx_upload is inclusive, end_idx_upload is exclusive
raw_data_files_upload = raw_data_files[start_idx_upload: end_idx_upload]

::: {.content-hidden}
Create authenticated native BigQuery Python client
:::

In [13]:
# BigQuery client bound to the GCP project and the service account credentials
client = bigquery.Client(project=gcp_proj_id, credentials=gcp_creds)

::: {.content-hidden}
Create authenticated native GCS Python client
:::

In [14]:
# Cloud Storage client using the same project and service account credentials
storage_client = storage.Client(project=gcp_proj_id, credentials=gcp_creds)

::: {.content-hidden}
Authenticate to `BigQuery` using Google Cloud Service Account, in order to query BigQuery table using `pandas` (which uses the `pandas_gbq` Python library)
:::

In [15]:
# auth_to_bigquery is a project helper imported from bigquery_auth_helpers.
# NOTE(review): presumably it locates the service account key under
# raw_data_dir and returns the settings needed for pandas-based queries - confirm.
gcp_auth_dict = auth_to_bigquery(raw_data_dir)

## Read Data from Local Disk

### Compare Raw Data on Local Disk to BigQuery Public Dataset

::: {.content-hidden}
Get summary of raw data in local GA360 files
:::

In [16]:
# Fail early with a clear message instead of an opaque KeyError/ValueError below
if not raw_data_files:
    raise FileNotFoundError(
        f"No raw GA360 data files found in {os.path.join(raw_data_dir, 'ga_data')}"
    )

# Read the visitor id column of every file exactly once. It is reused for the
# per-file row count, the per-file unique visitor count and the overall unique
# visitor count. Selecting the column by name (instead of squeezing the
# single-column DataFrame) always yields a Series, even for a one-row file.
visitor_ids_by_file = {
    raw_fpath: pd.read_parquet(raw_fpath, columns=['fullVisitorId'])['fullVisitorId']
    for raw_fpath in raw_data_files
}

# File names have the form d2i_<table id>__<YYYYMMDD start>__<YYYYMMDD end>__<suffix>.
# The table id is taken from gbq_table_id so that the parsing stays in step
# with the glob pattern used to find the files.
fname_prefix = f"{gbq_table_id}__"
file_summary_records = []
for raw_fpath, visitor_ids in visitor_ids_by_file.items():
    date_start_str, date_end_str = (
        os.path.basename(raw_fpath).split(fname_prefix)[-1].split("__", 2)[:2]
    )
    file_summary_records.append(
        {
            "filepath": raw_fpath,
            "size_mb": os.stat(raw_fpath).st_size/1_000_000,
            "date_start": datetime.strptime(date_start_str, "%Y%m%d"),
            "date_end": datetime.strptime(date_end_str, "%Y%m%d"),
            "num_rows": len(visitor_ids),
            "num_unique_visitors": visitor_ids.nunique(),
        }
    )
df_file_visits_summary = (
    pd.DataFrame.from_records(file_summary_records)
    # inclusive number of days covered by each file
    .assign(num_days=lambda df: (df['date_end']-df['date_start']).dt.days+1)
)

total_visits = df_file_visits_summary['num_rows'].sum()
total_size = df_file_visits_summary['size_mb'].sum()
start_date = df_file_visits_summary['date_start'].min().strftime('%Y-%m-%d')
# Latest covered day is the largest *end* date, not the largest start date
end_date = df_file_visits_summary['date_end'].max().strftime('%Y-%m-%d')
# Sum of per-file day counts; this equals the calendar span only if the files
# neither overlap nor leave gaps between them
num_days = df_file_visits_summary['num_days'].sum()

# Visitors can appear in several files, so uniqueness is computed across the
# concatenation of all files rather than by summing the per-file counts
num_unique_visitors = len(
    pd.concat(list(visitor_ids_by_file.values())).drop_duplicates()
)
print(
    f"Found {total_visits:,} visits covering "
    f"{num_unique_visitors:,} unique visitors "
    f"from {start_date} to {end_date} ({num_days:,} days) "
    f"in {len(df_file_visits_summary)} GA360 "
    f"raw data files taking up {total_size:.1f} MB of disk space"
)
with pd.option_context('display.max_colwidth', None):
    display(df_file_visits_summary)

Found 970,532 visits covering 741,721 unique visitors from 2016-08-01 to 2017-08-01 (366 days) in 55 GA360 raw data files taking up 225.4 MB of disk space


Unnamed: 0,filepath,size_mb,date_start,date_end,num_rows,num_unique_visitors,num_days
0,../data/raw/ga_data/d2i_ecwa__20160801__20160807__20230611_200354.parquet.gzip,4.660864,2016-08-01,2016-08-07,15889,13923,7
1,../data/raw/ga_data/d2i_ecwa__20160808__20160814__20230611_200726.parquet.gzip,5.095205,2016-08-08,2016-08-14,17106,14811,7
2,../data/raw/ga_data/d2i_ecwa__20160815__20160821__20230611_201100.parquet.gzip,5.052166,2016-08-15,2016-08-21,17213,14981,7
3,../data/raw/ga_data/d2i_ecwa__20160822__20160828__20230611_201420.parquet.gzip,4.852887,2016-08-22,2016-08-28,16199,14161,7
4,../data/raw/ga_data/d2i_ecwa__20160829__20160831__20230611_201631.parquet.gzip,2.51759,2016-08-29,2016-08-31,8352,7497,3
5,../data/raw/ga_data/d2i_ecwa__20160901__20160907__20230611_110546.parquet.gzip,4.185168,2016-09-01,2016-09-07,15928,14175,7
6,../data/raw/ga_data/d2i_ecwa__20160908__20160914__20230611_110929.parquet.gzip,4.501941,2016-09-08,2016-09-14,16086,14080,7
7,../data/raw/ga_data/d2i_ecwa__20160915__20160921__20230611_111509.parquet.gzip,5.266326,2016-09-15,2016-09-21,17396,15027,7
8,../data/raw/ga_data/d2i_ecwa__20160922__20160928__20230611_111820.parquet.gzip,4.95172,2016-09-22,2016-09-28,16722,14565,7
9,../data/raw/ga_data/d2i_ecwa__20160929__20160930__20230611_200041.parquet.gzip,1.411386,2016-09-29,2016-09-30,4900,4431,2


::: {.content-hidden}
Get summary of raw data in BigQuery public dataset
:::

In [17]:
# Single aggregate query over the whole public table: number of rows with a
# non-null fullVisitorId, number of distinct visitors, earliest and latest
# visit date (stored as YYYYMMDD strings, hence PARSE_DATE) and the inclusive
# number of days between the two dates (DATE_DIFF + 1).
query_infer = f"""
              SELECT COUNT(fullVisitorId) AS num_visits,
                     COUNT(DISTINCT(fullVisitorId)) AS num_unique_visitors,
                     MIN(date) AS date_start,
                     MAX(date) AS date_end,
                     DATE_DIFF(PARSE_DATE('%Y%m%d', MAX(date)), PARSE_DATE('%Y%m%d', MIN(date)), DAY)+1 as num_days
              FROM `data-to-insights.ecommerce.web_analytics`
              """
# th.extract_data is a project helper from transform_helpers.
# NOTE(review): presumably it runs the query with the given auth settings and
# returns the result as a DataFrame - confirm against the helper.
df_all = th.extract_data(query_infer, gcp_auth_dict)
df_all

Query execution start time = 2023-06-12 16:27:54.551...done at 2023-06-12 16:27:55.822 (1.271 seconds).
Query returned 1 rows


Unnamed: 0,num_visits,num_unique_visitors,date_start,date_end,num_days
0,970532,741721,20160801,20170801,366


::: {.content-hidden}
Verify that the following summary attributes in local GA360 files are identical to those in BigQuery public dataset

1. number of visits (number of rows)
2. number of unique visitors
3. earliest visit date
4. latest visit date
5. number of days covered by visits
:::

In [18]:
# Pair every summary attribute returned by BigQuery with the value computed
# from the local files. The BigQuery dates are compared as YYYYMMDD strings, so
# the dashes are stripped from the local YYYY-MM-DD dates first.
local_summary = {
    'num_visits': total_visits,
    'num_unique_visitors': num_unique_visitors,
    'date_start': start_date.replace("-", ""),
    'date_end': end_date.replace("-", ""),
    'num_days': num_days,
}
for summary_attr, local_value in local_summary.items():
    assert df_all[summary_attr].squeeze() == local_value

## Upload Data to Storage Bucket

### Create Bucket

In [19]:
#| echo: true
try:
    bucket = storage_client.create_bucket(bucket_name, location='us')
except Exception as e:
    # The message check singles out the "bucket exists and is owned by this
    # project" case; any other failure (e.g. name taken by someone else,
    # missing permissions) must not be swallowed, since the cells that follow
    # need a usable bucket.
    if "you already own it" not in str(e):
        print(f"Got response: {str(e)}")
        raise
    print(f"Bucket {bucket_name} already exists")
    # Keep `bucket` defined on this path too (builds a local handle only)
    bucket = storage_client.bucket(bucket_name)
else:
    print(f"Created bucket {bucket_name}")

Created bucket ecwa-raw


### Upload Raw Data Files to Bucket

In [20]:
bucket = storage_client.bucket(bucket_name)
# Timezone used for the progress timestamps; built once instead of per file
eastern_tz = pytz.timezone("US/Eastern")
for k, raw_source_ga_filepath in enumerate(raw_data_files):
    destination_blob_name = os.path.basename(raw_source_ga_filepath)
    blob = bucket.blob(destination_blob_name)
    progress_prefix = f"{k+1}/{len(raw_data_files)} | {destination_blob_name}: "

    # Skip files that are already in the bucket so that the cell can be re-run.
    # Without this check the if_generation_match=0 precondition below makes the
    # upload of an existing object fail and aborts the whole loop.
    if blob.exists():
        print(f"{progress_prefix}already in bucket {bucket_name}, skipping upload.")
        continue

    start_time = datetime.now(eastern_tz)
    start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S.%f")
    # [:-3] trims microseconds to milliseconds; end="" keeps the "done" message
    # on the same output line
    print(
        f"{progress_prefix}Upload execution start time = {start_time_str[:-3]}...",
        end=""
    )
    # if_generation_match=0 only lets the upload succeed when no object with
    # this name exists yet, guarding against overwriting a concurrent upload
    blob.upload_from_filename(
        raw_source_ga_filepath,
        if_generation_match=0,
    )
    end_time = datetime.now(eastern_tz)
    end_time_str = end_time.strftime("%Y-%m-%d %H:%M:%S.%f")
    # total_seconds() covers the full elapsed time, including any days component
    duration = (end_time - start_time).total_seconds()
    print(f"done at {end_time_str[:-3]} ({duration:.3f} seconds).")

1/55 | d2i_ecwa__20160801__20160807__20230611_200354.parquet.gzip: Upload execution start time = 2023-06-12 14:42:20.334...done at 2023-06-12 14:42:20.728 (0.394 seconds).
2/55 | d2i_ecwa__20160808__20160814__20230611_200726.parquet.gzip: Upload execution start time = 2023-06-12 14:42:20.728...done at 2023-06-12 14:42:21.177 (0.449 seconds).
3/55 | d2i_ecwa__20160815__20160821__20230611_201100.parquet.gzip: Upload execution start time = 2023-06-12 14:42:21.178...done at 2023-06-12 14:42:21.545 (0.367 seconds).
4/55 | d2i_ecwa__20160822__20160828__20230611_201420.parquet.gzip: Upload execution start time = 2023-06-12 14:42:21.545...done at 2023-06-12 14:42:21.976 (0.431 seconds).
5/55 | d2i_ecwa__20160829__20160831__20230611_201631.parquet.gzip: Upload execution start time = 2023-06-12 14:42:21.976...done at 2023-06-12 14:42:22.352 (0.376 seconds).
6/55 | d2i_ecwa__20160901__20160907__20230611_110546.parquet.gzip: Upload execution start time = 2023-06-12 14:42:22.352...done at 2023-06-1

### List Files in Bucket

In [21]:
# Materialize the listing so that it can be tested for emptiness and reused
blobs = list(storage_client.list_blobs(bucket_name))
if not blobs:
    print(f"Found no files in bucket {bucket_name}")
else:
    # One record per object stored in the bucket
    blob_records = [
        {
            "name": gcs_blob.name,
            "bucket": gcs_blob.bucket.name,
            "size_mb": gcs_blob.size / 1_000_000,
            "id": gcs_blob.id,
            "time_created": gcs_blob.time_created,
        }
        for gcs_blob in blobs
    ]
    df_blobs = pd.DataFrame.from_records(blob_records)
    # Show the creation timestamps in US/Eastern time
    df_blobs = df_blobs.assign(
        time_created=df_blobs['time_created'].dt.tz_convert('US/Eastern')
    )
    print(
        f"Showing {len(df_blobs):,} files found in bucket {bucket_name}, "
        f"taking up {df_blobs['size_mb'].sum():.1f} MB of space"
    )
    # Lift the column width limit so that long names and ids are not truncated
    with pd.option_context('display.max_colwidth', None):
        display(df_blobs)

Showing 55 files found in bucket ecwa-raw, taking up 225.4 MB of space


Unnamed: 0,name,bucket,size_mb,id,time_created
0,d2i_ecwa__20160801__20160807__20230611_200354.parquet.gzip,ecwa-raw,4.660864,ecwa-raw/d2i_ecwa__20160801__20160807__20230611_200354.parquet.gzip/1686595340628697,2023-06-12 14:42:20.667000-04:00
1,d2i_ecwa__20160808__20160814__20230611_200726.parquet.gzip,ecwa-raw,5.095205,ecwa-raw/d2i_ecwa__20160808__20160814__20230611_200726.parquet.gzip/1686595341075283,2023-06-12 14:42:21.115000-04:00
2,d2i_ecwa__20160815__20160821__20230611_201100.parquet.gzip,ecwa-raw,5.052166,ecwa-raw/d2i_ecwa__20160815__20160821__20230611_201100.parquet.gzip/1686595341443138,2023-06-12 14:42:21.483000-04:00
3,d2i_ecwa__20160822__20160828__20230611_201420.parquet.gzip,ecwa-raw,4.852887,ecwa-raw/d2i_ecwa__20160822__20160828__20230611_201420.parquet.gzip/1686595341878293,2023-06-12 14:42:21.916000-04:00
4,d2i_ecwa__20160829__20160831__20230611_201631.parquet.gzip,ecwa-raw,2.51759,ecwa-raw/d2i_ecwa__20160829__20160831__20230611_201631.parquet.gzip/1686595342243319,2023-06-12 14:42:22.289000-04:00
5,d2i_ecwa__20160901__20160907__20230611_110546.parquet.gzip,ecwa-raw,4.185168,ecwa-raw/d2i_ecwa__20160901__20160907__20230611_110546.parquet.gzip/1686595342665193,2023-06-12 14:42:22.703000-04:00
6,d2i_ecwa__20160908__20160914__20230611_110929.parquet.gzip,ecwa-raw,4.501941,ecwa-raw/d2i_ecwa__20160908__20160914__20230611_110929.parquet.gzip/1686595343071890,2023-06-12 14:42:23.116000-04:00
7,d2i_ecwa__20160915__20160921__20230611_111509.parquet.gzip,ecwa-raw,5.266326,ecwa-raw/d2i_ecwa__20160915__20160921__20230611_111509.parquet.gzip/1686595343503050,2023-06-12 14:42:23.550000-04:00
8,d2i_ecwa__20160922__20160928__20230611_111820.parquet.gzip,ecwa-raw,4.95172,ecwa-raw/d2i_ecwa__20160922__20160928__20230611_111820.parquet.gzip/1686595343919037,2023-06-12 14:42:23.962000-04:00
9,d2i_ecwa__20160929__20160930__20230611_200041.parquet.gzip,ecwa-raw,1.411386,ecwa-raw/d2i_ecwa__20160929__20160930__20230611_200041.parquet.gzip/1686595344265518,2023-06-12 14:42:24.305000-04:00


## Upload Data to BigQuery Table

### Create Empty BigQuery Table

In [22]:
#| echo: true
table = bigquery.Table(gbq_table_fully_resolved)
try:
    table = client.create_table(table)
except Exception as e:
    # Only the "table already exists" case is expected and safe to continue
    # from; any other failure (e.g. missing dataset or permissions) is
    # re-raised so that later cells do not run against a missing table.
    if "Already Exists" not in str(e):
        print(str(e))
        raise
    print(f"Found existing table {gbq_table_fully_resolved}")
else:
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Created table demoabc-381618.mydemo2asdf.ecwa


::: {.content-hidden}
### Define BigQuery Table Schema
:::

In [19]:
job_config = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("visitorId", "INTEGER", mode='NULLABLE'),
    bigquery.SchemaField("visitNumber", "INTEGER", mode='NULLABLE'),
    bigquery.SchemaField("visitId", "INTEGER", mode='NULLABLE'),
    bigquery.SchemaField("visitStartTime", "INTEGER", mode='NULLABLE'),
    bigquery.SchemaField("date", "STRING", mode='NULLABLE'),
    bigquery.SchemaField(
        "totals",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("visits", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("hits", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("pageviews", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("timeOnSite", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("bounces", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("transactions", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("transactionRevenue", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("newVisits", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("screenviews", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("UniqueScreenViews", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("timeOnScreen", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("totalTransactionRevenue", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("sessionQualityDim", "INTEGER", mode="NULLABLE"),
        ],
    ),
    bigquery.SchemaField(
        "trafficSource",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("referralPath", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("campaign", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("source", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("medium", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("keyword", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("adContent", "STRING", mode="NULLABLE"),
            bigquery.SchemaField(
                "adwordsClickInfo",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("campaignId", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adGroupId", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("creativeId", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("criteriaId", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("page", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("slot", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("criteriaParameters", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("gclId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("customerId", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adNetworkType", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField(
                        "targetingCriteria",
                        "RECORD",
                        mode="NULLABLE",
                        fields=[bigquery.SchemaField("boomUserlistId", "INTEGER", mode="NULLABLE")],
                    ),
                    bigquery.SchemaField("isVideoId", "BOOLEAN", mode="NULLABLE"),
                ],
            ),
            bigquery.SchemaField("isTrueDirect", "BOOLEAN", mode="NULLABLE"),
            bigquery.SchemaField("campaignCode", "STRING", mode="NULLABLE"),
        ],
    ),
    bigquery.SchemaField(
        "device",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("browser", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("browserVersion", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("browserSize", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("operatingSystem", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("operatingSystemVersion", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("isMobile", "BOOLEAN", mode="NULLABLE"),
            bigquery.SchemaField("mobileDeviceBranding", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("mobileDeviceModel", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("mobileInputSelector", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("mobileDeviceInfo", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("mobileDeviceMarketingName", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("flashVersion", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("javaEnabled", "BOOLEAN", mode="NULLABLE"),
            bigquery.SchemaField("language", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("screenColors", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("screenResolution", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("deviceCategory", "STRING", mode="NULLABLE"),
        ],
    ),
    bigquery.SchemaField(
        "geoNetwork",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("continent", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("subContinent", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("country", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("region", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("metro", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("city", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("cityId", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("networkDomain", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("latitude", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("longitude", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("networkLocation", "STRING", mode="NULLABLE"),
        ],
    ),
    bigquery.SchemaField(
        "customDimensions",
        "RECORD",
        mode="REPEATED",
        fields=[
            bigquery.SchemaField("index", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("value", "STRING", mode="NULLABLE"),
        ],
    ),
    # hits
    bigquery.SchemaField(
        "hits",
        "RECORD",
        mode="REPEATED",
        fields=[
            bigquery.SchemaField("hitNumber", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("time", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("hour", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("minute", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("isSecure", "BOOLEAN", mode="NULLABLE"),
            bigquery.SchemaField("isInteraction", "BOOLEAN", mode="NULLABLE"),
            bigquery.SchemaField("isEntrance", "BOOLEAN", mode="NULLABLE"),
            bigquery.SchemaField("isExit", "BOOLEAN", mode="NULLABLE"),
            bigquery.SchemaField("referer", "STRING", mode="NULLABLE"),
            # page (NESTED)
            bigquery.SchemaField(
                "page",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("pagePath", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("hostname", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("pageTitle", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("searchKeyword", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("searchCategory", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("pagePathLevel1", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("pagePathLevel2", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("pagePathLevel3", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("pagePathLevel4", "STRING", mode="NULLABLE"),
                ],
            ),
            # transaction (NESTED)
            bigquery.SchemaField(
                "transaction",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("transactionId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("transactionRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("transactionTax", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("transactionShipping", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("affiliation", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("currencyCode", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("localTransactionRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("localTransactionTax", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("localTransactionShipping", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("transactionCoupon", "STRING", mode="NULLABLE"),
                ],
            ),
            # item (NESTED)
            bigquery.SchemaField(
                "item",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("transactionId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("productName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("productCategory", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("productSku", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("itemQuantity", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("itemRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("currencyCode", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("localItemRevenue", "INTEGER", mode="NULLABLE"),
                ],
            ),
            # contentInfo (NESTED)
            bigquery.SchemaField(
                "contentInfo",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("contentDescription", "STRING", mode="NULLABLE"),
                ],
            ),
            # appInfo (NESTED)
            bigquery.SchemaField(
                "appInfo",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("version", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("id", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("installerId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("appInstallerId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("appName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("appVersion", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("appId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("screenName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("landingScreenName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("exitScreenName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("screenDepth", "STRING", mode="NULLABLE"),
                ],
            ),
            # exceptionInfo (NESTED)
            bigquery.SchemaField(
                "exceptionInfo",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("isFatal", "BOOLEAN", mode="NULLABLE"),
                    bigquery.SchemaField("exceptions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("fatalExceptions", "INTEGER", mode="NULLABLE"),
                ],
            ),
            # eventInfo (NESTED)
            bigquery.SchemaField(
                "eventInfo",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("eventCategory", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("eventAction", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("eventLabel", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("eventValue", "INTEGER", mode="NULLABLE"),
                ],
            ),
            # product (NESTED)
            bigquery.SchemaField(
                "product",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("productSKU", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("v2ProductName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("v2ProductCategory", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("productVariant", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("productBrand", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("productRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("localProductRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("productPrice", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("localProductPrice", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("productQuantity", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("productRefundAmount", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("localProductRefundAmount", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("isImpression", "BOOLEAN", mode="NULLABLE"),
                    bigquery.SchemaField("isClick", "BOOLEAN", mode="NULLABLE"),
                    bigquery.SchemaField(
                        "customDimensions",
                        "RECORD",
                        mode="REPEATED",
                        fields=[
                            bigquery.SchemaField("index", "INTEGER", mode="NULLABLE"),
                            bigquery.SchemaField("value", "STRING", mode="NULLABLE"),
                        ],
                    ),
                    bigquery.SchemaField(
                        "customMetrics",
                        "RECORD",
                        mode="REPEATED",
                        fields=[
                            bigquery.SchemaField("index", "INTEGER", mode="NULLABLE"),
                            bigquery.SchemaField("value", "INTEGER", mode="NULLABLE"),
                        ],
                    ),
                    bigquery.SchemaField("productListName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("productListPosition", "INTEGER", mode="NULLABLE"),
                ],
            ),
            # promotion (NESTED)
            bigquery.SchemaField(
                "promotion",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("promoId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("promoName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("promoCreative", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("promoPosition", "STRING", mode="NULLABLE"),
                ],
            ),
            # promotionActionInfo (NESTED)
            bigquery.SchemaField(
                "promotionActionInfo",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("promoIsView", "BOOLEAN", mode="NULLABLE"),
                    bigquery.SchemaField("promoIsClick", "BOOLEAN", mode="NULLABLE"),
                ],
            ),
            # refund (NESTED)
            bigquery.SchemaField(
                "refund",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("refundAmount", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("localRefundAmount", "INTEGER", mode="NULLABLE"),
                ],
            ),
            # eCommerceAction (NESTED)
            bigquery.SchemaField(
                "eCommerceAction",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("action_type", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("step", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("option", "STRING", mode="NULLABLE"),
                ],
            ),
            # experiment (NESTED)
            bigquery.SchemaField(
                "experiment",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("experimentId", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("experimentVariant", "STRING", mode="NULLABLE"),
                ],
            ),
            # publisher (NESTED)
            bigquery.SchemaField(
                "publisher",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("dfpClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpRevenueCpm", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpRevenueCpc", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpRevenueCpm", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpRevenueCpc", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpRevenueCpm", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpRevenueCpc", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsUnitsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsUnitsMatched", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("viewableAdsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("measurableAdsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsClicked", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpAdGroup", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("dfpAdUnits", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("dfpNetworkId", "STRING", mode="NULLABLE"),
                ],
            ),
            # customVariables (NESTED)
            bigquery.SchemaField(
                "customVariables",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("index", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("customVarName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("customVarValue", "STRING", mode="NULLABLE"),
                ],
            ),
            # customDimensions (NESTED)
            bigquery.SchemaField(
                "customDimensions",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("index", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("value", "STRING", mode="NULLABLE"),
                ],
            ),
            # customMetrics (NESTED)
            bigquery.SchemaField(
                "customMetrics",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("index", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("value", "INTEGER", mode="NULLABLE"),
                ],
            ),
            bigquery.SchemaField("type", "STRING", mode="NULLABLE"),
            # social (NESTED)
            bigquery.SchemaField(
                "social",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("socialInteractionNetwork", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("socialInteractionAction", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("socialInteractions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("socialInteractionTarget", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("socialNetwork", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("uniqueSocialInteractions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("hasSocialSourceReferral", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("socialInteractionNetworkAction", "STRING", mode="NULLABLE"),
                ],
            ),
            # latencyTracking (NESTED)
            bigquery.SchemaField(
                "latencyTracking",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("pageLoadSample", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("pageLoadTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("pageDownloadTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("redirectionTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("speedMetricsSample", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("domainLookupTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("serverConnectionTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("serverResponseTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("domLatencyMetricsSample", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("domInteractiveTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("domContentLoadedTime", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("userTimingValue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("userTimingSample", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("userTimingVariable", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("userTimingCategory", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("userTimingLabel", "STRING", mode="NULLABLE"),
                ],
            ),
            # sourcePropertyInfo (NESTED)
            bigquery.SchemaField(
                "sourcePropertyInfo",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("sourcePropertyDisplayName", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("sourcePropertyTrackingId", "STRING", mode="NULLABLE"),
                ],
            ),
            # contentGroup (NESTED)
            bigquery.SchemaField(
                "contentGroup",
                "RECORD",
                mode="NULLABLE",
                fields=[
                    bigquery.SchemaField("contentGroup1", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroup2", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroup3", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroup4", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroup5", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("previousContentGroup1", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("previousContentGroup2", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("previousContentGroup3", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("previousContentGroup4", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("previousContentGroup5", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroupUniqueViews1", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroupUniqueViews2", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroupUniqueViews3", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroupUniqueViews4", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("contentGroupUniqueViews5", "INTEGER", mode="NULLABLE"),
                ],
            ),
            bigquery.SchemaField("dataSource", "STRING", mode="NULLABLE"),
            # publisher_infos (NESTED)
            bigquery.SchemaField(
                "publisher_infos",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("dfpClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpRevenueCpm", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpRevenueCpc", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpRevenueCpm", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpRevenueCpc", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsenseBackfillDfpPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpRevenueCpm", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpRevenueCpc", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxBackfillDfpPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxClicks", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxMatchedQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxMeasurableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxQueries", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxViewableImpressions", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adxPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsUnitsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsUnitsMatched", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("viewableAdsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("measurableAdsViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsPagesViewed", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsClicked", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("adsRevenue", "INTEGER", mode="NULLABLE"),
                    bigquery.SchemaField("dfpAdGroup", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("dfpAdUnits", "STRING", mode="NULLABLE"),
                    bigquery.SchemaField("dfpNetworkId", "STRING", mode="NULLABLE"),
                ],
            ),
        ],
    ),
    bigquery.SchemaField("fullVisitorId", "STRING", mode='NULLABLE'),
    bigquery.SchemaField("userId", "STRING", mode='NULLABLE'),
    bigquery.SchemaField("channelGrouping", "STRING", mode='NULLABLE'),
    bigquery.SchemaField("socialEngagementType", "STRING", mode='NULLABLE'),
])
job_config.write_disposition = 'WRITE_APPEND'

::: {.content-hidden}
Get list of columns in the table
:::

In [20]:
# Names of the top-level fields of the load-job schema, in schema order
cols_to_use = [schema_field.name for schema_field in job_config.schema]

### Appending Batch(es) of Data to BigQuery Table

For each selected local file, load its data into a `DataFrame` and append it to the newly-created table in the BigQuery dataset

In [21]:
#| echo: true
# Values that do not change between iterations are computed once up front
file_suffix = ".parquet.gzip"
tz_eastern = pytz.timezone("US/Eastern")
num_files_total = start_idx_upload + len(raw_data_files_upload)
upload_destination = f"{gbq_dataset_id}.{gbq_table_id}"

for k, f in enumerate(raw_data_files_upload):
    # Remove the exact file suffix for the progress label. str.rstrip() takes
    # a *set of characters*, so it could also strip trailing characters that
    # belong to the file name itself.
    file_label = os.path.basename(f)
    if file_label.endswith(file_suffix):
        file_label = file_label[: -len(file_suffix)]
    print(f"{start_idx_upload+k+1}/{num_files_total} | {file_label}: ", end="")

    # Read only the columns that appear in the load-job schema, timing the read
    load_start = time()
    df = pd.read_parquet(f, columns=cols_to_use)
    load_duration = time() - load_start

    start_time = datetime.now(tz_eastern)
    start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S.%f")
    print(
        f"Loaded local data in {load_duration:.3f} seconds. "
        # [:-3] truncates microseconds to milliseconds
        f"Upload execution start time = {start_time_str[:-3]}...",
        end=""
    )

    # Submit the load job (write disposition is taken from job_config) and
    # wait for it to finish before moving on to the next file
    job = client.load_table_from_dataframe(
        df,
        destination=upload_destination,
        job_config=job_config,
    )
    job.result()

    end_time = datetime.now(tz_eastern)
    end_time_str = end_time.strftime("%Y-%m-%d %H:%M:%S.%f")
    # total_seconds() also accounts for the timedelta's days component, which
    # seconds + microseconds alone would silently drop
    duration = (end_time - start_time).total_seconds()
    print(f"done at {end_time_str[:-3]} ({duration:.3f} seconds).")

29/55 | d2i_ecwa__20170131__20170206__20230605_183843: Loaded local data in 2.878 seconds. Upload execution start time = 2023-06-12 16:28:22.466...done at 2023-06-12 16:28:38.768 (16.303 seconds).
30/55 | d2i_ecwa__20170207__20170213__20230606_115635: Loaded local data in 2.930 seconds. Upload execution start time = 2023-06-12 16:28:41.698...done at 2023-06-12 16:29:07.062 (25.364 seconds).
31/55 | d2i_ecwa__20170214__20170220__20230606_115920: Loaded local data in 2.984 seconds. Upload execution start time = 2023-06-12 16:29:10.047...done at 2023-06-12 16:30:05.325 (55.279 seconds).
32/55 | d2i_ecwa__20170221__20170227__20230606_120157: Loaded local data in 2.845 seconds. Upload execution start time = 2023-06-12 16:30:08.170...done at 2023-06-12 16:30:32.254 (24.083 seconds).
33/55 | d2i_ecwa__20170228__20170306__20230606_185806: Loaded local data in 3.026 seconds. Upload execution start time = 2023-06-12 16:30:35.280...done at 2023-06-12 16:30:50.066 (14.786 seconds).
34/55 | d2i_ecw

Count number of rows in BigQuery table using

1. newly created table
2. table in public dataset

In [22]:
#| echo: true
# Tables to compare, keyed by a label for the dataset each one lives in
tables_by_dataset_type = {
    'new': gbq_table_fully_resolved,
    'public-data': 'data-to-insights.ecommerce.web_analytics',
}

table_summaries = []
for dataset_label, full_table_id in tables_by_dataset_type.items():
    tbl = client.get_table(full_table_id)
    # Identifier without the project prefix, used in the log line and summary
    short_table_id = f"{tbl.dataset_id}.{tbl.table_id}"
    n_rows = tbl.num_rows
    n_cols = len(tbl.schema)
    print(f"Found {n_rows:,} rows and {n_cols:,} columns in table {short_table_id}")
    table_summaries.append(
        dict(
            table_id_full=short_table_id,
            num_rows=n_rows,
            num_columns=n_cols,
            dataset_type=dataset_label,
        )
    )

# One row per table; compared in the next cell
df_table_summaries = pd.DataFrame.from_records(table_summaries)
df_table_summaries

Found 970,532 rows and 15 columns in table mydemo2asdf.ecwa
Found 970,532 rows and 15 columns in table ecommerce.web_analytics


Unnamed: 0,table_id_full,num_rows,num_columns,dataset_type
0,mydemo2asdf.ecwa,970532,15,new
1,ecommerce.web_analytics,970532,15,public-data


::: {.content-hidden}
Verify that the number of rows and the number of columns are identical for the table in both datasets
:::

In [23]:
# Columns that identify a table rather than describe its size; they are
# dropped so that only the row and column counts are compared
label_cols = ['table_id_full', 'dataset_type']
summary_new = (
    df_table_summaries.query("dataset_type == 'new'")
    .drop(columns=label_cols)
    .reset_index(drop=True)
)
summary_public = (
    df_table_summaries.query("dataset_type == 'public-data'")
    .drop(columns=label_cols)
    .reset_index(drop=True)
)
assert summary_new.equals(summary_public)

## Read Data From Newly-Created BigQuery Table

### Basic Query

Read a sample (400 rows) of raw GA360 tracking data from the newly created table and change datatypes (the nested columns are converted to strings). Note that this query does not filter by date or drop duplicated rows.

In [24]:
#| echo: true
# Select every top-level column explicitly and cap the result at 400 rows
query = f"""
        SELECT visitorId,
               visitNumber,
               visitId,
               visitStartTime,
               date,
               totals,
               trafficSource,
               device,
               geoNetwork,
               customDimensions,
               hits,
               fullVisitorId,
               userId,
               channelGrouping,
               socialEngagementType
        FROM `{gbq_table_fully_resolved}`
        LIMIT 400
        """

# Every nested column is mapped to the pandas string dtype
nested_dtypes = dict.fromkeys(nested_cols, pd.StringDtype())

# Run the query, then apply the raw dtypes followed by the nested-column dtypes
df_extracted = th.extract_data(query, gcp_auth_dict)
df_raw_typed = th.set_datatypes(df_extracted, dtypes_dict_raw)
df_query = th.set_datatypes(df_raw_typed, nested_dtypes)
df_query

Query execution start time = 2023-06-12 16:38:47.615...done at 2023-06-12 16:38:50.718 (3.103 seconds).
Query returned 400 rows


Unnamed: 0,visitorId,visitNumber,visitId,visitStartTime,date,totals,trafficSource,device,geoNetwork,customDimensions,hits,fullVisitorId,userId,channelGrouping,socialEngagementType
0,,1,1501607173,1501607173,20170801,"{'visits': 1, 'hits': 1, 'pageviews': 1, 'time...","{'referralPath': '/analytics/web/', 'campaign'...","{'browser': 'Chrome', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 10, 'minu...",4073710137623180310,,Referral,Not Socially Engaged
1,,1,1501625398,1501625398,20170801,"{'visits': 1, 'hits': 1, 'pageviews': 1, 'time...","{'referralPath': None, 'campaign': 'AW - Dynam...","{'browser': 'Chrome', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 15, 'minu...",3760205947897353033,,Paid Search,Not Socially Engaged
2,,1,1501644116,1501644116,20170801,"{'visits': 1, 'hits': 1, 'pageviews': 1, 'time...","{'referralPath': None, 'campaign': 'AW - Dynam...","{'browser': 'Chrome', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 20, 'minu...",3678020209252369953,,Paid Search,Not Socially Engaged
3,,1,1501604704,1501604704,20170801,"{'visits': 1, 'hits': 1, 'pageviews': 1, 'time...","{'referralPath': None, 'campaign': '(not set)'...","{'browser': 'Chrome', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 9, 'minut...",3876257535022064438,,Organic Search,Not Socially Engaged
4,,1,1501603274,1501603274,20170801,"{'visits': 1, 'hits': 1, 'pageviews': 1, 'time...",{'referralPath': '/youtube/forum/AAAAiuErobUfI...,"{'browser': 'Safari', 'browserVersion': 'not a...","{'continent': 'Europe', 'subContinent': 'North...","[{'index': 4, 'value': 'EMEA'}]","[{'hitNumber': 1, 'time': 0, 'hour': 9, 'minut...",1521501189098314714,,Referral,Not Socially Engaged
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
395,,1,1501618371,1501618371,20170801,"{'visits': 1, 'hits': 2, 'pageviews': 2, 'time...","{'referralPath': '/', 'campaign': '(not set)',...","{'browser': 'Chrome', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 13, 'minu...",5626141748667947030,,Referral,Not Socially Engaged
396,,3,1501622651,1501622651,20170801,"{'visits': 1, 'hits': 2, 'pageviews': 2, 'time...","{'referralPath': '/', 'campaign': '(not set)',...","{'browser': 'Chrome', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 14, 'minu...",101877320439056275,,Referral,Not Socially Engaged
397,,1,1501600680,1501600680,20170801,"{'visits': 1, 'hits': 2, 'pageviews': 2, 'time...","{'referralPath': None, 'campaign': '(not set)'...","{'browser': 'Safari', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 8, 'minut...",5203365220731190903,,Direct,Not Socially Engaged
398,,2,1501604624,1501604624,20170801,"{'visits': 1, 'hits': 2, 'pageviews': 2, 'time...","{'referralPath': '/', 'campaign': '(not set)',...","{'browser': 'Chrome', 'browserVersion': 'not a...","{'continent': 'Americas', 'subContinent': 'Nor...","[{'index': 4, 'value': 'North America'}]","[{'hitNumber': 1, 'time': 0, 'hour': 9, 'minut...",4198073474422567104,,Referral,Not Socially Engaged


::: {.content-hidden}
Show the datatypes and number of missing values in the newly created table
:::

In [25]:
#| output: false
# Display the dtype and number of missing values for each column of the sample
summarize_df(df_query)

Unnamed: 0,column,dtype,missing
0,visitorId,Int64,400
1,visitNumber,Int64,0
2,visitId,Int64,0
3,visitStartTime,Int64,0
4,date,string[python],0
5,totals,string[python],0
6,trafficSource,string[python],0
7,device,string[python],0
8,geoNetwork,string[python],0
9,customDimensions,string[python],0


::: {.content-hidden}
Show the number of

1. visits (rows)
2. distinct visits (rows that are unique across all non-nested columns)
3. unique visitors
:::

In [26]:
#| output: false
# Duplicates are checked on the non-nested columns only
non_nested_cols = [c for c in cols_to_use if c not in nested_cols]

num_rows = len(df_query)
num_distinct_rows = len(df_query.drop_duplicates(non_nested_cols))
num_unique_visitors = df_query['fullVisitorId'].nunique()
print(num_rows, num_distinct_rows, num_unique_visitors)

400 400 379


### QL Queries

Task 1

In [27]:
#| echo: true
# Run the same conversion-rate query against the new table and the public
# table, labelling each single-row result with the dataset it came from
tables_by_dataset_type = {
    'new': gbq_table_fully_resolved,
    'public-data': 'data-to-insights.ecommerce.web_analytics',
}

conversion_frames = []
for dataset_label, full_table_id in tables_by_dataset_type.items():
    # Distinct visitors overall vs. distinct visitors with a non-NULL
    # totals.transactions, combined into a percentage
    conversion_query = f"""
            WITH visitors AS(
                SELECT COUNT(DISTINCT fullVisitorId) AS total_visitors
                FROM `{full_table_id}`
            ),
            purchasers AS (
                SELECT COUNT(DISTINCT fullVisitorId) AS total_purchasers
                FROM `{full_table_id}`
                WHERE totals.transactions IS NOT NULL
            )
            SELECT total_visitors,
                   total_purchasers,
                   100* (total_purchasers / total_visitors) AS conversion_rate
            FROM visitors, purchasers
            """
    conversion_stats = th.extract_data(conversion_query, gcp_auth_dict)
    conversion_frames.append(conversion_stats.assign(dataset_type=dataset_label))

df_queries_combo = pd.concat(conversion_frames, ignore_index=True)
df_queries_combo

Query execution start time = 2023-06-12 16:38:55.486...done at 2023-06-12 16:38:57.510 (2.024 seconds).
Query returned 1 rows
Query execution start time = 2023-06-12 16:38:57.511...done at 2023-06-12 16:38:59.852 (2.341 seconds).
Query returned 1 rows


Unnamed: 0,total_visitors,total_purchasers,conversion_rate,dataset_type
0,741721,20015,2.698454,new
1,741721,20015,2.698454,public-data


::: {.content-hidden}
Verify that the outputs of task 1 are identical for the table in both datasets
:::

In [28]:
# Split the combined results by dataset type and discard the label column so
# that only the query outputs themselves are compared
stats_new = (
    df_queries_combo.query("dataset_type == 'new'")
    .drop(columns=['dataset_type'])
    .reset_index(drop=True)
)
stats_public = (
    df_queries_combo.query("dataset_type == 'public-data'")
    .drop(columns=['dataset_type'])
    .reset_index(drop=True)
)
assert stats_new.equals(stats_public)

Task 2

In [29]:
#| echo: true
# Top 5 products by revenue in the newly created table: flatten the repeated
# `hits` field and its repeated `product` field, then total the quantity and
# revenue per (product name, product category).
# NOTE(review): localProductRevenue is divided by 1,000,000 before rounding,
# presumably because it is stored scaled by 10^6 -- confirm against the GA360
# export schema.
query = f"""
        WITH t1 AS (
            SELECT hits
            FROM `{gbq_table_fully_resolved}`
        )
        SELECT p.v2ProductName,
               p.v2ProductCategory,
               SUM(p.productQuantity) AS units_sold,
               ROUND(SUM(p.localProductRevenue/1000000),2) AS revenue
        FROM t1,
        UNNEST(hits) AS h,
        UNNEST(h.product) AS p
        GROUP BY 1, 2
        ORDER BY revenue DESC
        LIMIT 5
        """
# Run the query and display the five returned rows
df_query = th.extract_data(query, gcp_auth_dict)
df_query

Query execution start time = 2023-06-12 16:39:04.252...done at 2023-06-12 16:39:06.720 (2.467 seconds).
Query returned 5 rows


Unnamed: 0,v2ProductName,v2ProductCategory,units_sold,revenue
0,Nest® Learning Thermostat 3rd Gen-USA - Stainl...,Nest-USA,17651,870976.95
1,Nest® Cam Outdoor Security Camera - USA,Nest-USA,16930,684034.55
2,Nest® Cam Indoor Security Camera - USA,Nest-USA,14155,548104.47
3,Nest® Protect Smoke + CO White Wired Alarm-USA,Nest-USA,6394,178937.6
4,Nest® Protect Smoke + CO White Battery Alarm-USA,Nest-USA,6340,178572.4


Task 3

In [30]:
#| echo: true
# Per visitor, flag whether any row has transactions > 0 while newVisits is
# NULL, then count the distinct visitors for each flag value
query = f"""
        # visitors who bought on a return visit (could have bought on first as well
        WITH all_visitor_stats AS (
            SELECT fullvisitorid,
                   IF(COUNTIF(totals.transactions > 0 AND totals.newVisits IS NULL) > 0, 1, 0) AS will_buy_on_return_visit
            FROM `{gbq_table_fully_resolved}`
            GROUP BY fullvisitorid
        )
        SELECT will_buy_on_return_visit,
               COUNT(DISTINCT fullvisitorid) AS total_visitors
        FROM all_visitor_stats
        GROUP BY will_buy_on_return_visit
        """
visitor_counts = th.extract_data(query, gcp_auth_dict)

# Share of all visitors in each group, expressed as a percentage
visitors_overall = visitor_counts['total_visitors'].sum()
df_query = visitor_counts.assign(
    frac_visitors=100 * (visitor_counts['total_visitors'] / visitors_overall)
)
df_query

Query execution start time = 2023-06-12 16:39:08.050...done at 2023-06-12 16:39:10.808 (2.758 seconds).
Query returned 2 rows


Unnamed: 0,will_buy_on_return_visit,total_visitors,frac_visitors
0,0,729848,98.399263
1,1,11873,1.600737


Tasks 4 and 5

In [31]:
#| echo: true
# Combine per-visit attributes with a per-visitor label:
# - t1: rows where totals.newVisits = 1, with bounces and timeOnSite set to 0
#   when they are NULL
# - t2: one row per visitor, flagged 1 if any of their rows has
#   transactions > 0 while newVisits is NULL, otherwise 0
# The two are joined on fullVisitorId and sorted by time on site, descending.
# NOTE(review): newVisits = 1 presumably marks a visitor's first visit --
# confirm against the GA360 export schema.
query = f"""
        WITH t1 AS (
            SELECT fullVisitorId,
                   IFNULL(totals.bounces, 0) AS bounces,
                   IFNULL(totals.timeOnSite, 0) AS time_on_site
            FROM `{gbq_table_fully_resolved}`
            WHERE totals.newVisits = 1
        ),
        t2 AS (
            SELECT fullvisitorid,
                   IF(COUNTIF(totals.transactions > 0 AND totals.newVisits IS NULL) > 0, 1, 0) AS will_buy_on_return_visit
            FROM `{gbq_table_fully_resolved}`
            GROUP BY fullvisitorid
        )
        SELECT *
        FROM t1 JOIN t2 USING (fullVisitorId)
        ORDER BY time_on_site DESC
        """
# Run the query; only the first 10 rows of the result are displayed
df_query = th.extract_data(query, gcp_auth_dict)
df_query.head(10)

Query execution start time = 2023-06-12 16:39:12.980...done at 2023-06-12 16:39:41.367 (28.386 seconds).
Query returned 734,704 rows


Unnamed: 0,fullVisitorId,bounces,time_on_site,will_buy_on_return_visit
0,2706961341001088633,0,15047,0
1,6957245643416321514,0,12136,0
2,5208937953046059083,0,11201,0
3,3924372865099736100,0,10046,0
4,434744388841615987,0,9974,0
5,5564610750564086192,0,9564,0
6,4691667039083430712,0,9520,0
7,5037208788065339806,0,9275,1
8,5531986444834608429,0,9138,0
9,6879336706336070910,0,8872,0


::: {.content-hidden}
From the output of tasks 4. and 5., show the number of

1. visits (rows)
2. unique visitors
:::

In [32]:
#| output: false
# Row count, rows left after de-duplicating on visitor ID, and the number of
# distinct visitor IDs
num_rows = len(df_query)
num_rows_deduped = len(df_query.drop_duplicates(['fullVisitorId']))
num_unique_visitors = df_query['fullVisitorId'].nunique()
print(num_rows, num_rows_deduped, num_unique_visitors)

734704 733085 733085


## Next Step

The next step is to train an ML model to predict future (return) visit purchase propensity using the GA360 visits data.