In [16]:
%pip install google-cloud-bigquery-storage pyarrow 

Collecting google-cloud-bigquery-storage
  Downloading google_cloud_bigquery_storage-2.36.0-py3-none-any.whl.metadata (10 kB)
Downloading google_cloud_bigquery_storage-2.36.0-py3-none-any.whl (303 kB)
Installing collected packages: google-cloud-bigquery-storage
Successfully installed google-cloud-bigquery-storage-2.36.0
Note: you may need to restart the kernel to use updated packages.


In [14]:
from google.cloud import bigquery

import os
import pandas as pd
import numpy as np
import datetime
import warnings
warnings.filterwarnings('ignore')
from typing import List

# 2. Load the BigQuery magic extension into your notebook
%load_ext google.cloud.bigquery

pd.set_option('display.max_columns', None)

client = bigquery.Client(project='moloco-ods')
def process_query(input_query):
    job_config = bigquery.QueryJobConfig()
    query_job = client.query(input_query, job_config=job_config)
    df_return = query_job.result().to_dataframe()
    return df_return

def process_query_be(input_query):
    job_config = bigquery.QueryJobConfig()
    query_job = client.query(input_query, job_config=job_config)
    print(f"Submitted job: {query_job.job_id}")
    return query_job

def fetch_result(query_job):
    """
    Return DataFrame only if the query is complete and successful.
    If still running, return None.
    If failed, return a dict with error info (so next loop can still check other jobs).
    """
    try:
        if query_job.done():
            if query_job.error_result:
                # Query finished but failed
                print(f"Job {query_job.job_id} failed: {query_job.error_result}")
                return {"status": "error", "job_id": query_job.job_id, "error": query_job.error_result}
            else:
                # Query finished successfully
                print(f"Job {query_job.job_id} is complete!")
                df_return = query_job.result().to_dataframe()
                return {"status": "success", "job_id": query_job.job_id, "data": df_return}
        else:
            # Query still running
            print(f"Job {query_job.job_id} is still running...")
            return None
    except Exception as e:
        # Unexpected error while fetching results
        print(f"Job {query_job.job_id} raised an exception: {e}")
        return {"status": "exception", "job_id": query_job.job_id, "error": str(e)}



In [5]:
query = """
  DECLARE start_date DATE DEFAULT DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY);
  DECLARE end_date DATE DEFAULT CURRENT_DATE();
  DECLARE authors ARRAY<STRING> DEFAULT ['haewon.yum@moloco.com'];

  WITH completed_tests AS (
    SELECT
      -- Experiment-level
      d.id                      AS experiment_id,
      d.name                    AS experiment_name,
      d.experiment_type,
      d.layer_id,
      d.status,
      d.schedule.start          AS schedule_start,
      d.schedule.end            AS schedule_end,
      author,                -- array of authors

      -- Group-level
      gr.group_id               AS group_id,
      gr.control_group_id,      -- 0 if this group *is* the control
      gr.description            AS group_name,
      gr.bucket_size,           -- 0–1000 buckets
      gr.bucket_size / 10.0     AS nominal_traffic_pct   -- ≈ intended % traffic
    FROM `explab-298609.exp_prod.experiment_digest_v2` AS d
    CROSS JOIN UNNEST(d.groups) AS gr, UNNEST(d.authors) author
    WHERE 
      -- d.id = 'devsisters_usa_20260113_psa'
      TRUE
      AND DATE(timestamp) BETWEEN start_date AND end_date
      AND author IN UNNEST(authors)
      AND d.status LIKE '%COMPLETED%'
    QUALIFY
      -- keep only the latest config per group
      ROW_NUMBER() OVER (PARTITION BY gr.group_id ORDER BY d.timestamp DESC) = 1
    ORDER BY experiment_id
  ),
  completed_tests_summary AS (
    SELECT 
      experiment_id,
      experiment_name,
      schedule_start,
      schedule_end,
      ARRAY_AGG(group_id) AS group_id_array,
      MAX(control_group_id) AS control_group_id
    FROM completed_tests
    GROUP BY ALL
  ),

  experiments AS (
    WITH tmp AS(

      SELECT 
        experiment_id,
        experiment_name,
        schedule_start,
        schedule_end,
        ARRAY_AGG(group_id) AS group_ids
      FROM completed_tests_summary, UNNEST(group_id_array) group_id
      -- WHERE control_group_id <> test_group_id
      GROUP BY ALL
    )

    SELECT 
      experiment_id,
      experiment_name,
      schedule_start,
      schedule_end,
      group_id
    FROM tmp, UNNEST(group_ids) group_id
  )


  SELECT
    experiments.experiment_id,
    experiments.experiment_name,
    experiments.schedule_start,
    experiments.schedule_end,
    exp_group_id,
    SUM(count_imp) AS impressions,
    SUM(count_install) AS installs,
    SUM(total_moloco_spent) AS spend,
    SUM(total_revenue_kpi_d7) AS d7_revenue,
    -- Derived metrics
    SAFE_DIVIDE(SUM(count_install), SUM(count_imp)) AS cvr,
    SAFE_DIVIDE(SUM(total_moloco_spent), SUM(count_install)) AS cpi,
    SAFE_DIVIDE(SUM(total_revenue_kpi_d7), SUM(total_moloco_spent)) AS d7_roas
  FROM `explab-298609.summary_view.experiment_summary` summary
    JOIN experiments 
    ON summary.exp_group_id = experiments.group_id
  WHERE
    utc_date BETWEEN start_date AND end_date   -- required partition filter
    -- AND exp_group_id IN (15180, 15181)        -- your control & test groups
  GROUP BY ALL
  ORDER BY
    exp_group_id;

"""

In [6]:
df = process_query(query)
df

Unnamed: 0,experiment_id,experiment_name,schedule_start,schedule_end,exp_group_id,impressions,installs,spend,d7_revenue,cvr,cpi,d7_roas
0,1521236603-max_roas_under_cpi_constraint_beta-...,CPI balancer TT IOS T1,2025-09-10 00:00:00+00:00,2026-01-25 06:35:40+00:00,12485,,3,,,,,
1,1521236603-max_roas_under_cpi_constraint_beta-...,CPI balancer TT IOS T1,2025-09-10 00:00:00+00:00,2026-01-25 06:35:40+00:00,12486,,4,,,,,
2,1215933788-max_roas_under_cpi_constraint_beta-...,(ODSB-14105)SCOPELY_SqNaY3vftWzQ3nfp_CPI_Test_...,2025-10-28 00:00:00+00:00,2026-01-25 00:00:00+00:00,12789,3694,7,42.108162,7.077924,0.001895,6.015452,0.168089
3,1215933788-max_roas_under_cpi_constraint_beta-...,(ODSB-14105)SCOPELY_SqNaY3vftWzQ3nfp_CPI_Test_...,2025-10-28 00:00:00+00:00,2026-01-25 00:00:00+00:00,12790,119948,240,740.889292,79.766346,0.002001,3.087039,0.107663
4,d28_cpa_beta_test_design1_InDrive_NRGez0wvz5x4...,D28 CPA Beta Test InDrive sinet.startup.inDriver,2025-10-07 16:00:00+00:00,2026-01-26 15:00:00+00:00,13115,9013740,4518,811.839535,118.427138,0.000501,0.179690,0.145875
...,...,...,...,...,...,...,...,...,...,...,...,...
146,CREATIVE_TEST-N6VYzs3RN6uNL0eh-ADGROUP-EX3cdXb...,AB Test,2026-01-22 18:00:46+00:00,2026-01-23 18:01:00+00:00,4187195828,2140,21,10.030592,,0.009813,0.477647,
147,CREATIVE_TEST-mvEpBGMlh8ZCUA6E-ADGROUP-kjPHGxg...,AB Test,2026-01-22 18:00:29+00:00,2026-01-23 18:01:00+00:00,4210813160,6171,102,10.289496,,0.016529,0.100877,
148,CREATIVE_TEST-MnCh7PV9NllJoh87-ADGROUP-Yw3JeMX...,AB Test,2026-01-22 18:00:25+00:00,2026-01-23 18:01:00+00:00,4213115990,2248,39,9.615488,,0.017349,0.246551,
149,CREATIVE_TEST-MnCh7PV9NllJoh87-ADGROUP-Yw3JeMX...,AB Test,2026-01-22 18:00:25+00:00,2026-01-23 18:01:00+00:00,4263331937,2322,57,9.469702,,0.024548,0.166135,


In [11]:
query_statistics = """
    DECLARE start_date DATE DEFAULT '2025-09-10';
    DECLARE end_date DATE DEFAULT '2026-01-25';
    DECLARE _ctrl_id INT64 DEFAULT 12485;   -- control group_id
    DECLARE _test_id INT64 DEFAULT 12486;   -- test group_id
    -- Adjust these if needed
    DECLARE _campaign_goal   STRING DEFAULT 'ALL';   -- or 'OPTIMIZE_ROAS_FOR_APP_UA', etc.
    DECLARE _traffic_type    STRING DEFAULT 'ALL';   -- 'ALL' / 'IDFA' / 'LAT'
    DECLARE _os              STRING DEFAULT 'ALL';   -- 'ALL' / 'IOS' / 'ANDROID' / 'CTV'
    DECLARE _aggregation_ent STRING DEFAULT 'ALL';   -- 'ALL','PLATFORM','ADVERTISER','CAMPAIGN','BUNDLE';
    DECLARE _filtering_unit  STRING DEFAULT NULL;    -- NULL or 'PLATFORM'/'ADVERTISER'/'CAMPAIGN'
    DECLARE _whitelist       STRING DEFAULT NULL;    -- comma‑sep list, e.g. 'adv1,adv2'
    DECLARE _blacklist       STRING DEFAULT NULL;    -- optional blacklist



    SELECT
        metric_type,                 -- 'CPD_INSTALL' / 'CPD_DIST_ACT_D7' / 'cROAS_D7'
        entity_id,
        stat_result.camp_count,
        stat_result.estimate,
        stat_result.log_transform_std_err,
        stat_result.ci_lb,
        stat_result.ci_ub
    FROM `explab-298609.summary_supplement.pred_cpd_multi_metrics`(
        start_date,
        end_date,
        FALSE,                 -- is_global_view (TRUE for global_conduct)
        _ctrl_id,
        _test_id,
        _campaign_goal,
        _traffic_type,
        _os,
        _aggregation_ent,
        _filtering_unit,
        _whitelist,
        _blacklist,
        20                     -- total_bin
    )
    WHERE metric_type IN ('CPD_INSTALL', 'CPD_DIST_ACT_D7', 'cROAS_D7');

"""

In [12]:
df_statistics = process_query(query_statistics)
df_statistics

Unnamed: 0,metric_type,entity_id,camp_count,estimate,log_transform_std_err,ci_lb,ci_ub
0,CPD_INSTALL,ALL,1,1.3068,0.01902,1.259,1.3564
1,CPD_DIST_ACT_D7,ALL,1,0.8819,0.13093,0.6823,1.14
2,cROAS_D7,ALL,1,1.0566,0.25997,0.6348,1.7587


In [None]:
%%bigquery --project moloco-ods 

-- =================================================================================
-- Unified Query to Automatically Fetch Statistics for Multiple Experiments (Corrected)
-- =================================================================================
--
-- This query first discovers completed experiments by a given author and then
-- dynamically calls the statistics function for each control/test group pair.
--
-- =================================================================================

-- Configuration: Set the author and the lookback period for finding experiments.
DECLARE _authors ARRAY<STRING> DEFAULT ['haewon.yum@moloco.com'];
DECLARE _lookback_days INT64 DEFAULT 7;

-- This is the main logic that finds experiments and their groups.
WITH completed_experiments AS (
  SELECT
    exp.experiment_id,
    exp.experiment_name,
    -- Use the actual start/end dates from the experiment schedule for the function call
    DATE(exp.schedule_start) AS schedule_start,
    DATE(exp.schedule_end)   AS schedule_end,
    ctrl.group_id            AS control_group_id,
    test.group_id            AS test_group_id
  FROM (
    -- Inner subquery to find unique experiments and their properties
    SELECT DISTINCT
      d.id AS experiment_id,
      d.name AS experiment_name,
      d.schedule.start AS schedule_start,
      d.schedule.end AS schedule_end
    FROM `explab-298609.exp_prod.experiment_digest_v2` AS d,
    UNNEST(d.authors) AS author
    WHERE
      DATE(d.timestamp) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL _lookback_days DAY) AND CURRENT_DATE()
      AND author IN UNNEST(_authors)
      AND d.status LIKE '%COMPLETED%'
  ) AS exp
  -- Join to find the control group for the experiment
  JOIN `explab-298609.exp_prod.experiment_digest_v2` AS ctrl_digest
    ON exp.experiment_id = ctrl_digest.id
  CROSS JOIN UNNEST(ctrl_digest.groups) AS ctrl
  -- Join to find the test group(s) for the experiment
  JOIN `explab-298609.exp_prod.experiment_digest_v2` AS test_digest
    ON exp.experiment_id = test_digest.id
  CROSS JOIN UNNEST(test_digest.groups) AS test
  WHERE
    -- A group is a control if its control_group_id is 0
    ctrl.control_group_id = 0
    -- A group is a test if its control_group_id points to the actual control group's ID
    AND test.control_group_id = ctrl.group_id
  QUALIFY
    -- Ensure we only have the latest configuration for each group
    ROW_NUMBER() OVER (PARTITION BY test.group_id ORDER BY test_digest.timestamp DESC) = 1
)

-- Final SELECT: Iterate over each experiment and call the statistics function
SELECT
  -- Experiment identifiers
  ex.experiment_id,
  ex.experiment_name,
  ex.control_group_id,
  ex.test_group_id,

  -- Results from the function call
  stats.metric_type,
  stats.entity_id,
  stats.stat_result.camp_count,
  stats.stat_result.estimate,
  stats.stat_result.log_transform_std_err,
  stats.stat_result.ci_lb,
  stats.stat_result.ci_ub

FROM completed_experiments AS ex,
-- Use a LATERAL JOIN (comma) to call the function for each row from 'ex'
LATERAL JOIN `explab-298609.summary_supplement.pred_cpd_multi_metrics`(
    ex.schedule_start,
    ex.schedule_end,
    FALSE,                 -- is_global_view
    ex.control_group_id,
    ex.test_group_id,
    'ALL',                 -- _campaign_goal
    'ALL',                 -- _traffic_type
    'ALL',                 -- _os
    'ALL',                 -- _aggregation_ent
    NULL,                  -- _filtering_unit
    NULL,                  -- _whitelist
    NULL,                  -- _blacklist
    20                     -- total_bin
) AS stats
WHERE stats.metric_type IN ('CPD_INSTALL', 'CPD_DIST_ACT_D7', 'cROAS_D7')
ORDER BY
  ex.experiment_id,
  ex.test_group_id,
  stats.metric_type;

Executing query with job ID: bc147a7c-606c-44a5-95b6-f72a5ea11db0
Query executing: 0.42s


ERROR:
 400 Syntax error: Unexpected keyword JOIN at [75:9]; reason: invalidQuery, location: query, message: Syntax error: Unexpected keyword JOIN at [75:9]

Location: US
Job ID: bc147a7c-606c-44a5-95b6-f72a5ea11db0

