In [None]:
import os
import time
from dataclasses import dataclass
from datetime import date, timedelta
from enum import Enum
from typing import List, Optional

from dotenv import load_dotenv
from google.cloud import bigquery
from sqlalchemy import Column, Integer, select, String, literal_column, text
from sqlalchemy.engine import create_engine, Engine
from sqlalchemy.schema import Table, MetaData
from sqlalchemy.sql import func as F
from sqlalchemy.sql.expression import Select, CompoundSelect, union_all
from sqlalchemy_bigquery import ARRAY, STRUCT, INTEGER, FLOAT, FLOAT64 as DOUBLE, STRING

load_dotenv()

## Fetch GCP data

In [None]:
# GCP project ID comes from the environment (populated by load_dotenv() above);
# os.environ[...] raises KeyError if GCP_PROJECT_ID is not set.
# NOTE: `project` is also read as a notebook-level global by construct_query below.
project = os.environ["GCP_PROJECT_ID"]
# BigQuery API client, used further down for dry-run estimates and table metadata.
client = bigquery.Client(project=project)
# SQLAlchemy engine on the BigQuery dialect, used to reflect tables and compile queries.
engine = create_engine(f"bigquery://{project}")

In [None]:
@dataclass
class Site:
    """A site whose GA4 export is queried by this notebook."""
    # GA4 property ID; construct_event_table_name interpolates it into the
    # dataset name as `analytics_<ga4_property_id>`.
    ga4_property_id: str
    
# Site used throughout this notebook; its property ID is read from the environment
# (KeyError if DALLAS_FREE_PRESS_GA4_PROPERTY_ID is not set).
DALLAS_FREE_PRESS = Site(ga4_property_id=os.environ["DALLAS_FREE_PRESS_GA4_PROPERTY_ID"])

In [None]:
def enumerate_query_dates(end_date: date, num_days: int) -> List[date]:
    """
    List `num_days` consecutive dates, walking backwards from `end_date`.

    The first element is `end_date` itself and each following element is one
    day earlier. A `num_days` of zero or less yields an empty list.
    """
    dates: List[date] = []
    for days_back in range(num_days):
        dates.append(end_date - timedelta(days=days_back))
    return dates

def construct_event_table_name(gcp_project_id: str, site: Site, dt: date) -> str:
    """
    Build the fully qualified name of a site's daily events table.

    The result has the shape `<project>.analytics_<ga4_property_id>.events_<YYYYMMDD>`,
    where the date suffix is `dt` formatted as year, month, day.
    """
    dataset_name = f"analytics_{site.ga4_property_id}"
    day_suffix = dt.strftime('%Y%m%d')
    return f"{gcp_project_id}.{dataset_name}.events_{day_suffix}"


In [None]:
# Look-back window: 5 daily event tables, counted backwards from (and including) 2023-10-15.
num_days = 5
query_dates = enumerate_query_dates(date(2023, 10, 15), num_days)

In [None]:
class StrEnum(str, Enum):
    """Enum base whose members are strings and whose repr is the bare member value."""

    def __repr__(self) -> str:
        # Show only the underlying string rather than the default
        # `<ClassName.MEMBER: 'value'>` form.
        return self.value

class ColumnBigQuery(StrEnum):
    """Names of the events-table columns that construct_query_single_table declares explicitly."""
    EVENT_TIMESTAMP = "event_timestamp"
    EVENT_NAME = "event_name"
    # Declared below as an array of key/value structs.
    EVENT_PARAMS = "event_params"
    USER_PSEUDO_ID = "user_pseudo_id"

class ColumnNew(StrEnum):
    """Labels for output columns derived from `event_params` rather than read directly from the table."""
    # Label of the subquery selecting the "page_location" parameter's string value.
    EVENT_PAGE_LOCATION = "event_page_location"
    # Label of the subquery selecting the "engagement_time_msec" parameter's int value.
    EVENT_ENGAGEMENT_TIME_MSEC = "event_engagement_time_msec"

def _event_param_subquery(table: Table, param_key: str, value_field: str, label: str):
    """
    Build a labelled scalar subquery extracting one value from `event_params`.

    Unnests `table.c.event_params` (aliased `params`), keeps the entry whose
    `key` equals `param_key`, and selects `value.<value_field>` from it. The
    result is labelled `label` so it can be used as a column of an outer SELECT.
    """
    return select(
        literal_column(f"value.{value_field}")
    ).select_from(
        F.unnest(table.c.event_params).alias("params")
    ).where(
        literal_column("params.key") == param_key
    ).label(label)


def construct_query_single_table(table_name: str, engine: Engine) -> Select:
    """
    Build the SELECT for one daily events table.

    The query returns, per event: `event_timestamp`, `event_name`, the
    `page_location` parameter (as `event_page_location`), the
    `engagement_time_msec` parameter (as `event_engagement_time_msec`) and
    `user_pseudo_id`, ordered by user and then by timestamp.

    Args:
        table_name: Name of the events table to query (see construct_event_table_name).
        engine: Engine used to reflect the table definition; this makes a
            round trip to the database each time the function is called.

    Returns:
        An unexecuted SQLAlchemy Select.
    """
    # No `bind=` on MetaData: bound metadata is deprecated in SQLAlchemy 1.4 and
    # rejected in 2.0, and reflection already gets its connection via `autoload_with`.
    table = Table(
        table_name,
        MetaData(),
        # The columns used below are declared explicitly so their types are known
        # up front; the remaining columns come from reflection.
        Column(ColumnBigQuery.EVENT_TIMESTAMP, Integer),
        Column(ColumnBigQuery.EVENT_NAME, String),
        Column(ColumnBigQuery.EVENT_PARAMS, ARRAY(STRUCT(key=STRING, value=STRUCT(string_value=STRING, int_value=INTEGER, float_value=DOUBLE)))),
        Column(ColumnBigQuery.USER_PSEUDO_ID, String),
        autoload_with=engine
    )

    subquery_page_location = _event_param_subquery(
        table, "page_location", "string_value", ColumnNew.EVENT_PAGE_LOCATION
    )
    subquery_engagement_time = _event_param_subquery(
        table, "engagement_time_msec", "int_value", ColumnNew.EVENT_ENGAGEMENT_TIME_MSEC
    )

    query = select(
        table.c.event_timestamp,
        table.c.event_name,
        subquery_page_location,
        subquery_engagement_time,
        table.c.user_pseudo_id
    ).order_by(
        table.c.user_pseudo_id,
        table.c.event_timestamp
    )

    return query

def construct_query(query_dates: List[date], site: Site, engine: Engine, gcp_project_id: Optional[str] = None) -> Select:
    """
    Build one query covering a site's events tables for all of `query_dates`.

    A per-day SELECT is built for each date (see construct_query_single_table),
    the per-day queries are combined with UNION ALL, and the union is wrapped in
    an outer SELECT.

    Args:
        query_dates: Dates whose daily events tables should be queried; must not be empty.
        site: Site whose GA4 export is queried.
        engine: Engine used to reflect each daily table.
        gcp_project_id: GCP project that owns the tables. When omitted, the
            notebook-level `project` variable is used, which is what this
            function always did before the parameter existed.

    Returns:
        An unexecuted SQLAlchemy Select over the union of the per-day queries.

    Raises:
        ValueError: If `query_dates` is empty.
    """
    if not query_dates:
        raise ValueError("query_dates must contain at least one date")
    if gcp_project_id is None:
        # Backward-compatible fallback to the notebook-level global.
        gcp_project_id = project

    per_day_queries = [
        construct_query_single_table(construct_event_table_name(gcp_project_id, site, dt), engine)
        for dt in query_dates
    ]
    # NOTE(review): each per-day query carries its own ORDER BY; row order of the
    # combined result is presumably not guaranteed after UNION ALL -- confirm if
    # downstream code relies on ordering.
    combined = union_all(*per_day_queries).subquery()
    # Wrap the union in an outer SELECT so callers receive a plain Select (the
    # original note here says a bare CompoundSelect could not be stringified directly).
    return select(combined)

def stringify_query(query: Select, engine: Engine) -> str:
    """
    Render `query` as a SQL string compiled against `engine`.

    Compilation is asked to use literal binds, so parameter values appear
    inline in the text instead of as placeholders.
    """
    render_options = {"literal_binds": True}
    compiled_query = query.compile(engine, compile_kwargs=render_options)
    return str(compiled_query)

In [None]:
query = construct_query(query_dates, site=DALLAS_FREE_PRESS, engine=engine)

In [None]:
query_str = stringify_query(query, engine)

In [None]:
print(query_str)

**NOTE:** Even if a query processes less than 10 MiB of data, BigQuery still bills it as if it had processed 10 MiB. When individual queries fall below that minimum, it is cheaper to batch them together so that the combined query processes at least 10 MiB.

In [None]:
def calculate_table_query_tib_processed(query_str: str, client: bigquery.Client) -> int:
    """
    Estimate how many bytes `query_str` would process, using a dry-run job.

    Despite the "tib" in the name (kept so existing callers still work), the
    value returned is a byte count, not TiB; convert at the call site, e.g.
    `result / 1024**4` for TiB.

    Args:
        query_str: SQL text to estimate.
        client: BigQuery client used to submit the dry-run job.

    Returns:
        The job's `total_bytes_processed`.
    """
    # dry_run=True asks BigQuery for the estimate only.
    dry_run_config = bigquery.QueryJobConfig(dry_run=True)
    dry_run_job = client.query(query_str, job_config=dry_run_config)
    return dry_run_job.total_bytes_processed



In [None]:
# Dry-run estimate for the combined query. Despite the helper's name, the value is in
# bytes; the cells below convert it to MiB and TiB.
bytes_processed = calculate_table_query_tib_processed(query_str, client)

In [None]:
# MiBs processed
f"{bytes_processed / 1024**2:.2f} MiB"

In [None]:
# TiBs processed
f"{bytes_processed / 1024**4} TiB"

In [54]:
# Stored size, in bytes, of the events table for query_dates[0] (the end date of the window).
client.get_table(construct_event_table_name(project, DALLAS_FREE_PRESS, dt=query_dates[0])).num_bytes

1082787

One strategy to make sure we're not wasting money, given the 10 MiB minimum charge per query:
**TODO:** Write a function that, given a site name, a list of dates, a query-bytes floor `FLOOR` = 10 MiB, and some ceiling `CEIL`:
- first, iterates along the list of dates until the cumulative total storage of the tables is greater than `FLOOR`
- then, keeps iterating and builds the query string as it goes, until the total bytes processed is just under `CEIL`
- returns the query string
- if there are more dates left, repeats the process


Another option is BigQuery's built-in query cache (https://cloud.google.com/bigquery/docs/cached-results), but cached results are valid for at most 24 hours.
We could instead build our own cache: a dedicated events table that keeps only fetched events up to a certain age threshold (say, 1 month if 1 month of events is needed to train recs; anything older would be deleted).
For small sites (and even bigger ones) this is probably overkill, though it would still be nice to have.