# Funnel builder (Dynamic)
This funnel builder should work for any bigquery table with any parameter and event names, if you noticed any problem, contact DE in DQ.

# **Chapter 1**: General set up

In [38]:
# Please enter the path your source table is located
SOURCE_PROJECT = 'marketing-stickerapp'
SOURCE_DATASET = 'analytics_266718104'  #analytics_382372468, analytics_320615837, analytics_305832606, analytics_382461651
SOURCE_TABLE_PREFIX = 'events_'

# Please enter the path you want the working table and result should be sent to:
WORKING_DATASET = 'maximus'      # A new dataset with this name will be created and all data sent to it
DEST_PROJECT = 'conversionista-se'  # The working dataset will be created to this project
DEST_DATASET = WORKING_DATASET
DEST_TABLE = 'stickerapp_analysis'      # The destination table will be created with this name


In [39]:
# Please enter the path your source table is located
SOURCE_PROJECT = 'ga-data-242308'
SOURCE_DATASET = 'analytics_305832606'  #analytics_382372468, analytics_320615837, analytics_305832606, analytics_382461651
SOURCE_TABLE_PREFIX = 'events_'

# Please enter the path you want the working table and result should be sent to:
WORKING_DATASET = 'Siwens_NoN1'      # A new dataset with this name will be created and all data sent to it
DEST_PROJECT = 'conversionista-se'  # The working dataset will be created to this project
DEST_DATASET = WORKING_DATASET
DEST_TABLE = 'Extracted_table'      # The destination table will be created with this name
FUNNEL_TYPE: str = 'Dynamic'        # Dashboard

##1.1 Global variable set up

In [40]:
from google.cloud import bigquery
from datetime import datetime, timedelta
import ipywidgets as widgets
from IPython.display import display, clear_output
import pandas as pd

# Initialize BigQuery client
client = bigquery.Client()

# Intermediate variables
SELECTED_START_DATE: datetime | None = None
SELECTED_END_DATE: datetime | None = None
SELECTED_DATES: list[datetime] = []
SELECTED_DIMENSIONS: list = []
SELECTED_FILTERS: dict = {}
SELECTED_EVENTS: dict = {}
CHOSEN_PARAMETER: str | None = None
CHOSEN_PARAMETER_NAME: str | None = None
PROJECT: str = DEST_PROJECT


# Updated BigQuery cost per TiB
COST_PER_TIB: float = 6.25

In [41]:
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

client = bigquery.Client()

def create_or_check_working_dataset(project_id: str, dataset_id: str, location: str = "EU") -> str:
    """
    Create a working dataset if it doesn't exist, or check its location if it does.
    Returns the location of the dataset.
    """
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)

    try:
        try:
            dataset = client.get_dataset(dataset_ref)
            print(f"Dataset {project_id}.{dataset_id} already exists in location {dataset.location}")
            return dataset.location
        except NotFound:
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = location
            dataset = client.create_dataset(dataset)
            print(f"Dataset {project_id}.{dataset_id} created in location {dataset.location}")
            return dataset.location
    except Exception as e:
        print(f"An error occurred while creating or checking the dataset: {str(e)}")
        return None

def get_dataset_location(project_id: str, dataset_id: str) -> str:
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    try:
        dataset = client.get_dataset(dataset_ref)
        return dataset.location
    except NotFound:
        print(f"Dataset {project_id}.{dataset_id} not found.")
        return None
    except Exception as e:
        print(f"Error getting dataset location: {str(e)}")
        return None

# First, ensure the destination dataset exists and get its location
DEST_DATASET_LOCATION = create_or_check_working_dataset(DEST_PROJECT, DEST_DATASET,location="EU")
if DEST_DATASET_LOCATION is None:
    raise Exception(f"Unable to create or determine location for destination dataset {DEST_PROJECT}.{DEST_DATASET}")


# Then, get the source dataset location
SOURCE_DATASET_LOCATION = get_dataset_location(SOURCE_PROJECT, SOURCE_DATASET)
if SOURCE_DATASET_LOCATION is None:
    raise Exception(f"Unable to determine location for source dataset {SOURCE_PROJECT}.{SOURCE_DATASET}")



Dataset conversionista-se.Siwens_NoN1 already exists in location EU


# **Chapter 2**: User choices


##2.1 Fetch the dates from source
Get the dates from the source table (partitioned table), save it into a list, let the user be able to choose from the starting date and ending date from the list, save into another list for later fetching query.

In [42]:
from datetime import timedelta
from typing import List, Callable
from google.cloud import bigquery
import ipywidgets as widgets
from IPython.display import display, clear_output

def get_available_dates() -> List[datetime.date]:
    """Fetch available dates from the BigQuery table."""
    query = f"""
    SELECT DISTINCT
      PARSE_DATE('%Y%m%d', SUBSTR(table_name, LENGTH('{SOURCE_TABLE_PREFIX}') + 1)) as date
    FROM
      `{SOURCE_PROJECT}.{SOURCE_DATASET}.INFORMATION_SCHEMA.TABLES`
    WHERE
      table_name LIKE '{SOURCE_TABLE_PREFIX}________'
      AND table_name NOT LIKE '{SOURCE_TABLE_PREFIX}fresh_%'
    ORDER BY date
    """
    query_job = client.query(query)
    results = query_job.result()
    return [row.date for row in results]


def generate_date_filter() -> str:
    """Generate date filter string for BigQuery."""
    if SELECTED_START_DATE is None or SELECTED_END_DATE is None:
        raise ValueError("Date range has not been selected. Please run select_date_range() first.")

    return f"_TABLE_SUFFIX BETWEEN '{SELECTED_START_DATE.strftime('%Y%m%d')}' AND '{SELECTED_END_DATE.strftime('%Y%m%d')}'"

def estimate_query_cost(query: str) -> float:
    """Estimate the cost of a BigQuery query."""
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    query_job = client.query(query, job_config=job_config)
    return query_job.total_bytes_processed / 1e12 * COST_PER_TIB

In [43]:
def select_date_range(callback):
    global SELECTED_START_DATE, SELECTED_END_DATE, SELECTED_DATES

    available_dates = get_available_dates()
    start_date = widgets.DatePicker(description='Start Date')
    end_date = widgets.DatePicker(description='End Date')

    # Set initial values
    latest_date = max(available_dates)
    initial_start_date = max(min(available_dates), latest_date - timedelta(days=3))

    start_date.value = initial_start_date
    end_date.value = latest_date

    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def update_end_date(change):
        new_end_date = min(change['new'] + timedelta(days=3), max(available_dates))
        end_date.value = new_end_date

    start_date.observe(update_end_date, names='value')

    def on_button_clicked(b):
        global SELECTED_START_DATE, SELECTED_END_DATE, SELECTED_DATES
        with output:
            clear_output()
            if start_date.value > end_date.value:
                print("Error: Start date cannot be after end date.")
            else:
                SELECTED_START_DATE = start_date.value
                SELECTED_END_DATE = end_date.value
                SELECTED_DATES = [date for date in available_dates if SELECTED_START_DATE <= date <= SELECTED_END_DATE]
                days_chosen = len(SELECTED_DATES)

                if days_chosen == 0:
                    print(f"Error: No available dates found in the selected range ({SELECTED_START_DATE} to {SELECTED_END_DATE}).")
                else:
                    print(f"Selected date range: {SELECTED_START_DATE} to {SELECTED_END_DATE}")
                    print(f"Number of days with available data: {days_chosen}")
                    print(f"First available date: {min(SELECTED_DATES)}")
                    print(f"Last available date: {max(SELECTED_DATES)}")
                    print("Processing... Please wait. This may take a while for large date ranges.")
                    callback()

    confirm_button.on_click(on_button_clicked)

    display(widgets.VBox([start_date, end_date, confirm_button, output]))

##2.2 Fetch the event names
Get the list of all event_names, check how many rows for each name, make it a list for user to choose, and thereafter check if the order is good or not for funnel building.

In [44]:
from ipywidgets import Button, Output, VBox, HBox, IntText, Dropdown, Text
from google.cloud import bigquery
import pandas as pd
from typing import List, Dict, Tuple

def get_event_data() -> pd.DataFrame:
    """Fetch event data from BigQuery."""
    print("Fetching event data... This may take a while for large date ranges.")
    date_filter = generate_date_filter()
    query = f"""
    SELECT event_name, COUNT(*) as count
    FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}*`
    WHERE {date_filter}
    GROUP BY event_name
    ORDER BY count DESC
    """
    job_config = bigquery.QueryJobConfig(use_query_cache=True)
    job = client.query(query, job_config=job_config, location=SOURCE_DATASET_LOCATION)
    return job.to_dataframe()

def get_event_names(levels: int, df: pd.DataFrame) -> None:
    """Process event names and counts, then build the funnel."""
    event_names = [f"{row['event_name']} ({row['count']})" for _, row in df.iterrows()]
    event_counts = dict(zip(df['event_name'], df['count']))
    funnel_dict: Dict[str, str] = {}
    build_funnel(event_names, funnel_dict, levels, 1, event_counts)

def check_funnel_order(funnel_dict: Dict[str, str], event_counts: Dict[str, int]) -> List[str]:
    """Check the order of events in the funnel and return warnings if any."""
    warnings = []
    events = list(funnel_dict.keys())
    for i in range(1, len(events)):
        if event_counts[events[i]] > event_counts[events[i-1]]:
            warnings.append(f"The count of level {i+1} ({events[i]}) is higher than the count of level {i} ({events[i-1]})")
    return warnings

def suggest_field_name(field_name: str) -> str:
    """Suggest a formatted field name."""
    return ' '.join(word.capitalize() for word in field_name.replace('.', '_').split('_'))

def create_working_dataset() -> None:
    """Create a working dataset in BigQuery."""
    dataset_id = f"{PROJECT}.{WORKING_DATASET}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = DEST_DATASET_LOCATION  # Use the fetched location

    try:
        dataset = client.create_dataset(dataset, exists_ok=True)
        print(f"Dataset {dataset_id} created or already exists in {DEST_DATASET_LOCATION}.")
    except Exception as e:
        raise Exception(f"An error occurred while creating the dataset: {str(e)}")



In [45]:
def get_funnel_levels(callback: Callable) -> None:
    """
    Display interface for selecting funnel levels and initiate funnel building.

    Args:
        callback (Callable): Function to call after funnel levels are selected.
    """
    df = get_event_data()
    print("\n")

    level_input = IntText(
        value=5,
        description='Funnel Levels:',
        disabled=False,
        min=1,
        max=len(df)
    )
    confirm_button = Button(description="Confirm")
    output = Output()

    def on_button_clicked(b):
        with output:
            clear_output()
            levels = level_input.value
            if 0 < levels <= len(df):
                print(f"\nFunnel Levels: {levels}")
                print("Please select the data source for each level:")
                event_names = [f"{row['event_name']} ({row['count']})" for _, row in df.iterrows()]
                event_counts = dict(zip(df['event_name'], df['count']))
                build_funnel(event_names, {}, levels, 1, event_counts, callback)
            else:
                print(f"Please enter a positive number not exceeding {len(df)}.")

    confirm_button.on_click(on_button_clicked)

    display(VBox([
        HBox([level_input, confirm_button]),
        output
    ]))

In [46]:
from typing import Dict, Callable

def display_final_funnel(funnel_dict: Dict[str, str], event_counts: Dict[str, int], callback: Callable) -> None:
    """
    Display the final funnel and handle warnings if any.

    Args:
        funnel_dict (Dict[str, str]): Dictionary of events and their names in the funnel.
        event_counts (Dict[str, int]): Dictionary of event counts.
        callback (Callable): Function to call after funnel is confirmed.
    """
    global SELECTED_EVENTS
    SELECTED_EVENTS = funnel_dict.copy()

    print("Final Funnel:")
    print("{:<10} {:<30} {:<30} {:<15}".format("Level", "Name", "Event", "Count"))
    print("-" * 85)
    for i, (event, name) in enumerate(funnel_dict.items(), 1):
        count = event_counts[event]
        print("{:<10} {:<30} {:<30} {:>15,}".format(i, name, event, count))

    warnings = check_funnel_order(funnel_dict, event_counts)
    if warnings:
        print("\nWarnings:")
        for warning in warnings:
            print(f"- {warning}")
        print("These issues might make the funnel less trustworthy.")

        proceed_button = widgets.Button(description="Proceed anyway")
        cancel_button = widgets.Button(description="Cancel")
        output = widgets.Output()

        def on_proceed(b):
            with output:
                clear_output()
                print("Proceeding with funnel creation despite warnings...")
                print("Events selected. Please proceed to select a parameter for extraction.")
            proceed_button.close()
            cancel_button.close()
            callback()  # This will call start_field_selection_for_extraction

        def on_cancel(b):
            with output:
                clear_output()
                print("Funnel building cancelled.")
            proceed_button.close()
            cancel_button.close()

        proceed_button.on_click(on_proceed)
        cancel_button.on_click(on_cancel)

        display(widgets.VBox([
            widgets.HBox([proceed_button, cancel_button]),
            output
        ]))
    else:
        print("Events selected. Please proceed to select a parameter for extraction.")
        callback()  # This will call start_field_selection_for_extraction

##2.3 Fetch the parameters
Get top level of field name for user to choose, if chosen on a nestled field, will check what type that nestle it is, and thereafter un-nest depending on the specific type.

In [47]:
from google.cloud import bigquery
import pandas as pd
from typing import List, Tuple, Dict, Optional

def identify_nested_type(field: bigquery.SchemaField) -> str:
    """Identify the nested type of a BigQuery schema field."""
    if field.field_type == 'RECORD':
        if field.mode == 'REPEATED':
            return 'TYPE2' if any(f.name == 'key' for f in field.fields) else 'TYPE3'
        elif field.mode == 'NULLABLE':
            return 'TYPE1'
    return 'NOT_NESTED'

def get_type2_keys(field_name: str) -> pd.Series:
    """Get keys for TYPE2 nested fields."""
    if not SELECTED_DATES:
        raise ValueError("No dates have been selected. Please run select_date_range() first.")

    first_available_date = min(SELECTED_DATES)
    query = f"""
    SELECT DISTINCT {field_name}.key as param_name
    FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}{first_available_date.strftime('%Y%m%d')}`,
    UNNEST({field_name}) AS {field_name}
    WHERE {field_name}.key IS NOT NULL
    ORDER BY param_name
    LIMIT 1000
    """
    query_job = client.query(query)
    results = query_job.result()
    keys = [row['param_name'] for row in results]
    return pd.Series(keys, name='param_name')

def analyze_source_table_schema() -> Tuple[Optional[List[bigquery.SchemaField]], Optional[Dict]]:
    """Analyze the schema of the source table."""
    if not SELECTED_DATES:
        raise ValueError("No dates have been selected. Please run select_date_range() first.")

    first_available_date = min(SELECTED_DATES)
    table_ref = f"{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}{first_available_date.strftime('%Y%m%d')}"

    try:
        table = client.get_table(table_ref)
        schema = table.schema
    except Exception as e:
        print(f"Error accessing table: {e}")
        return None, None

    nested_fields = {}
    for field in schema:
        nested_type = identify_nested_type(field)
        if nested_type != 'NOT_NESTED':
            field_info = {
                'type': nested_type,
                'subfields': [subfield.name for subfield in field.fields] if field.fields else []
            }
            if nested_type == 'TYPE2':
                try:
                    field_info['keys'] = get_type2_keys(field.name)
                except Exception as e:
                    print(f"Error getting keys for {field.name}: {str(e)}")
                    field_info['keys'] = pd.Series([], name='key')
            nested_fields[field.name] = field_info

    return schema, nested_fields

def get_value_types_for_key(field_name: str, key: str) -> List[str]:
    """Get value types for a specific key in a TYPE2 nested field."""
    if not SELECTED_DATES:
        raise ValueError("No dates have been selected. Please run select_date_range() first.")

    first_available_date = min(SELECTED_DATES)
    table_suffix = first_available_date.strftime('%Y%m%d')

    query = f"""
    SELECT
        '{key}' as key,
        COUNTIF(value.string_value IS NOT NULL) > 0 AS has_string,
        COUNTIF(value.int_value IS NOT NULL) > 0 AS has_int,
        COUNTIF(value.float_value IS NOT NULL) > 0 AS has_float,
        COUNTIF(value.double_value IS NOT NULL) > 0 AS has_double
    FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}{table_suffix}`,
    UNNEST({field_name}) AS params
    WHERE params.key = '{key}'
    """
    query_job = client.query(query)
    result = list(query_job.result())[0]

    value_types = []
    if result['has_string']:
        value_types.append('string_value')
    if result['has_int']:
        value_types.append('int_value')
    if result['has_float']:
        value_types.append('float_value')
    if result['has_double']:
        value_types.append('double_value')

    return value_types

###2.3.1 Type1 nestled field treatment

In [48]:
def display_type1_options(parent_field: str, subfields: List[str], nested_fields: Dict, callback: Callable) -> None:
    """
    Display options for TYPE1 nested fields and handle user selection.

    Args:
        parent_field (str): The name of the parent field.
        subfields (List[str]): List of subfields to choose from.
        nested_fields (Dict): Dictionary containing nested field information.
        callback (Callable): Function to call after field selection.
    """
    options = subfields
    sub_schema = [bigquery.SchemaField(name, 'STRING') for name in options]

    dropdown = widgets.Dropdown(
        options=options,
        description='Subfield:',
        style={'description_width': 'initial'}
    )

    suggested_name = widgets.Text(
        value=suggest_field_name(options[0]),
        description='Field name:',
        style={'description_width': 'initial'}
    )

    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def on_field_select(change):
        selected = change['new']
        suggested_name.value = suggest_field_name(selected)

    dropdown.observe(on_field_select, names='value')

    def on_confirm_field_selection(b):
        with output:
            clear_output()
            selected = dropdown.value
            field_name = suggested_name.value
            print(f"Selected: {parent_field}.{selected}")
            print(f"Field name: {field_name}")

            final_selection = f"{parent_field}.{selected}"
            generate_extraction_query(final_selection, field_name, callback)

    confirm_button.on_click(on_confirm_field_selection)

    display(widgets.VBox([dropdown, suggested_name, confirm_button, output]))

###2.3.2 Type2 nestled field treatment

In [49]:
def display_type2_options(parent_field: str, keys: pd.Series, callback: Callable) -> None:
    """
    Display options for TYPE2 nested fields and handle user selection.

    Args:
        parent_field (str): The name of the parent field.
        keys (pd.Series): Series of keys to choose from.
        callback (Callable): Function to call after field selection.
    """
    if keys.empty:
        print(f"No keys found for {parent_field}. Please select another field.")
        return

    key_dropdown = widgets.Dropdown(
        options=keys.tolist(),
        description='Select key:',
        style={'description_width': 'initial'}
    )

    value_type_dropdown = widgets.Dropdown(
        options=['string_value', 'int_value', 'float_value', 'double_value'],
        description='Value type:',
        style={'description_width': 'initial'}
    )

    suggested_name = widgets.Text(
        value=suggest_field_name(keys.iloc[0]),
        description='Field name:',
        style={'description_width': 'initial'}
    )

    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def on_key_select(change):
        selected_key = change['new']
        try:
            value_types = get_value_types_for_key(parent_field, selected_key)
            if len(value_types) == 1:
                value_type_dropdown.options = value_types
                value_type_dropdown.value = value_types[0]
                value_type_dropdown.disabled = True
            elif len(value_types) > 1:
                value_type_dropdown.options = value_types
                value_type_dropdown.disabled = False
            else:
                value_type_dropdown.options = ['No data available']
                value_type_dropdown.disabled = True
        except Exception as e:
            print(f"Error getting value types: {str(e)}")
            value_type_dropdown.options = ['Error occurred']
            value_type_dropdown.disabled = True

        suggested_name.value = suggest_field_name(selected_key)

        with output:
            clear_output()
            print(f"Selected key: {selected_key}")
            print(f"Available value types: {', '.join(value_types)}")

    key_dropdown.observe(on_key_select, names='value')

    def on_confirm(b):
        with output:
            clear_output()
            selected_key = key_dropdown.value
            selected_type = value_type_dropdown.value
            field_name = suggested_name.value
            if selected_type == 'No data available':
                print("No data available for this key. Please select another key.")
            else:
                final_selection = f"{parent_field}.{selected_key}.{selected_type}"
                print(f"Final selection: {final_selection}")
                print(f"Field name: {field_name}")
                callback(final_selection, field_name)

    confirm_button.on_click(on_confirm)

    display(widgets.VBox([key_dropdown, value_type_dropdown, suggested_name, confirm_button, output]))

###2.3.3 Type3 nestled field treatment

In [50]:
import ipywidgets as widgets
from IPython.display import display, clear_output
import pandas as pd
from typing import List, Tuple

def select_keys(field: str, keys: pd.Series) -> List[str]:
    """
    Display a multi-select widget for choosing keys and return the selected keys.

    Args:
        field (str): The name of the field.
        keys (pd.Series): Series of keys to choose from.

    Returns:
        List[str]: List of selected keys.
    """
    key_selector = widgets.SelectMultiple(options=keys.tolist(), description='Select keys:')
    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def on_confirm(b):
        with output:
            clear_output()
            selected = list(key_selector.value)
            print(f"Selected keys for {field}: {', '.join(selected)}")
            confirm_button.close()

    confirm_button.on_click(on_confirm)

    display(widgets.VBox([key_selector, confirm_button, output]))

    wait_for_event(confirm_button)
    return list(key_selector.value)

def select_subfield_filters(field: str, subfields: List[str]) -> List[str]:
    """
    Display widgets for selecting subfield filters and return the selected filters.

    Args:
        field (str): The name of the field.
        subfields (List[str]): List of subfields to choose from.

    Returns:
        List[str]: List of filter strings.
    """
    subfield_selector = widgets.Dropdown(options=subfields, description='Select subfield:')
    value_input = widgets.Text(description='Enter value:')
    add_button = widgets.Button(description="Add Filter")
    done_button = widgets.Button(description="Done")
    output = widgets.Output()
    filters: List[Tuple[str, str]] = []

    def on_add(b):
        with output:
            clear_output()
            subfield = subfield_selector.value
            value = value_input.value
            if value:
                filters.append((subfield, value))
                print(f"Added filter: {field}.{subfield} = {value}")
                value_input.value = ''
            else:
                print("Please enter a value.")
            display_current_filters()

    def on_done(b):
        with output:
            clear_output()
            print(f"Finished adding filters for {field}")
            done_button.close()

    def display_current_filters():
        print("Current filters:")
        for subfield, value in filters:
            print(f"{field}.{subfield} = {value}")

    add_button.on_click(on_add)
    done_button.on_click(on_done)

    display(widgets.VBox([subfield_selector, value_input, add_button, done_button, output]))

    wait_for_event(done_button)
    return [f"{field}.{subfield} = '{value}'" for subfield, value in filters]

In [51]:
from google.cloud import bigquery
import ipywidgets as widgets
from IPython.display import display
import time
from typing import Callable, Dict, Optional, List

def select_filter_values(field: str, callback: Callable[[List[str]], None], field_info: Optional[Dict] = None) -> None:
    """
    Display a widget for selecting filter values and execute the callback with selected values.

    Args:
        field (str): The field to filter on.
        callback (Callable[[List[str]], None]): Function to call with selected values.
        field_info (Optional[Dict]): Information about the field if it's a nested field.
    """
    if field_info and field_info['type'] == 'TYPE2':
        # Handle Type 2 nested fields (like event_params)
        parent_field = field_info['parent_field']
        key = field_info['key']
        value_type = field_info['value_type']
        query = f"""
        SELECT DISTINCT
          (SELECT value.{value_type} FROM UNNEST({parent_field}) WHERE key = '{key}') AS value
        FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}*`
        WHERE _TABLE_SUFFIX BETWEEN '{SELECTED_START_DATE.strftime('%Y%m%d')}' AND '{SELECTED_END_DATE.strftime('%Y%m%d')}'
          AND event_name IN ({', '.join([f"'{event}'" for event in SELECTED_EVENTS.keys()])})
          AND (SELECT value.{value_type} FROM UNNEST({parent_field}) WHERE key = '{key}') IS NOT NULL
        ORDER BY value
        LIMIT 1000
        """
    else:
        # Original query for non-nested fields
        query = f"""
        SELECT DISTINCT {field} AS value
        FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}*`
        WHERE _TABLE_SUFFIX BETWEEN '{SELECTED_START_DATE.strftime('%Y%m%d')}' AND '{SELECTED_END_DATE.strftime('%Y%m%d')}'
          AND event_name IN ({', '.join([f"'{event}'" for event in SELECTED_EVENTS.keys()])})
          AND {field} IS NOT NULL
        ORDER BY {field}
        LIMIT 1000
        """

    query_job = client.query(query)
    results = query_job.result()
    distinct_values = [str(row.value) for row in results]

    value_selector = widgets.SelectMultiple(
        options=distinct_values,
        description='Select values (multi-select):',
        rows=10  # Adjust this value to show more or fewer options at once
    )
    confirm_button = widgets.Button(description="Confirm")

    def on_confirm(b):
        selected_values = list(value_selector.value)
        confirm_button.close()
        value_selector.close()
        callback(selected_values)

    confirm_button.on_click(on_confirm)
    display(widgets.VBox([widgets.Label("Hold Ctrl (Cmd on Mac) to select multiple values:"), value_selector, confirm_button]))

    # Wait for user to confirm
    wait_for_event(confirm_button)

def wait_for_event(widget: widgets.Widget, event: str = 'click') -> None:
    """
    Wait for an event to occur on a widget.

    Args:
        widget (widgets.Widget): The widget to observe.
        event (str): The event to wait for. Defaults to 'click'.
    """
    result = [None]
    def on_event(change):
        result[0] = change
    widget.observe(on_event, event)
    while result[0] is None:
        time.sleep(0.1)

In [52]:
import ipywidgets as widgets
from IPython.display import display, clear_output
from typing import List, Dict, Callable, Optional

def estimate_cost_and_confirm(
    selected_dimensions: List[Dict],
    selected_filters: Dict,
    callback: Callable[[], None]
) -> None:
    global CHOSEN_SAMPLE_PERCENTAGE, DEST_DATASET_LOCATION

    # Generate full query
    estimate_query = generate_combined_query(SELECTED_EVENTS, selected_dimensions, selected_filters, CHOSEN_SAMPLE_PERCENTAGE,for_estimation=True)

    # Estimate full query size and cost
    full_bytes = estimate_query_size(estimate_query)
    full_gb = full_bytes / (1024 * 1024 * 1024)
    full_cost = full_bytes / 1e12 * COST_PER_TIB

    # Create widgets
    sample_slider = widgets.FloatSlider(
        value=1.0, min=1.0, max=50.0, step=1.0,
        description='Sample:',
        disabled=False, continuous_update=True,
        orientation='horizontal', readout=True, readout_format='.0f',
        layout=widgets.Layout(width='300px')
    )

    percent_label = widgets.Label('%')
    sample_slider_with_percent = widgets.HBox([sample_slider, percent_label])

    interval_marks = {i: f'{i}' for i in range(5, 51, 5)}
    sample_slider.style.handle_color = 'lightblue'
    sample_slider.observe(lambda change: setattr(sample_slider, 'marks', interval_marks), names='value')

    sample_size_label = widgets.Label()
    sample_cost_label = widgets.Label()

    proceed_button = widgets.Button(description="Proceed with Full Query")
    sample_button = widgets.Button(description="Take Sample")
    cancel_button = widgets.Button(description="Cancel")
    output = widgets.Output()

    def update_sample_estimates(change):
        sample_percentage = change['new']
        sample_gb = full_gb * (sample_percentage / 100)
        sample_cost = full_cost * (sample_percentage / 100)
        sample_size_label.value = f"Estimated data to be processed (sample): {sample_gb:.2f} GB"
        sample_cost_label.value = f"Estimated cost (sample): ${sample_cost:.2f}"

    sample_slider.observe(update_sample_estimates, names='value')

    def on_proceed(b):
        with output:
            clear_output()
            print("Proceeding with full query execution...")
        CHOSEN_SAMPLE_PERCENTAGE = 100.0
        callback()

    def on_sample(b):
        global CHOSEN_SAMPLE_PERCENTAGE
        CHOSEN_SAMPLE_PERCENTAGE = int(sample_slider.value)
        with output:
            clear_output()
            print(f"Executing sample query with {CHOSEN_SAMPLE_PERCENTAGE}% sample...")
        callback()

    def on_cancel(b):
        with output:
            clear_output()
            print("Operation cancelled.")

    proceed_button.on_click(on_proceed)
    sample_button.on_click(on_sample)
    cancel_button.on_click(on_cancel)

    print(f"Estimated data to be processed (full): {full_gb:.2f} GB")
    print(f"Estimated cost (full): ${full_cost:.2f}")

    dimension_display = ", ".join([f"{dim['field']} ({dim['alias']})" for dim in selected_dimensions])

    display(widgets.VBox([
        widgets.Label("Current selections:"),
        widgets.Label(f"Dimensions: {dimension_display}"),
        widgets.Label(f"Filters: {', '.join(f'{k}: {v}' for k, v in selected_filters.items())}"),
        widgets.Label(f"Estimated data to be processed (full): {full_gb:.2f} GB"),
        widgets.Label(f"Estimated cost (full): ${full_cost:.2f}"),
        sample_slider_with_percent,
        sample_size_label,
        sample_cost_label,
        widgets.HBox([proceed_button, sample_button, cancel_button]),
        output
    ]))

    # Initial update of sample estimates
    update_sample_estimates({'new': sample_slider.value})

In [53]:
import ipywidgets as widgets
from IPython.display import display, clear_output
from typing import List, Dict, Callable
import pandas as pd

def add_non_nested_dimension(field: str) -> None:
    """Add a non-nested dimension to SELECTED_DIMENSIONS."""
    SELECTED_DIMENSIONS.append({
        'field': field,
        'nested_type': 'NOT_NESTED',
        'alias': field
    })

def select_type1_subfield(field: str, subfields: List[str], callback: Callable[[Dict], None]) -> None:
    """Display interface for selecting a TYPE1 subfield and handle user input."""
    subfield_dropdown = widgets.Dropdown(
        options=subfields,
        description='Select subfield:',
        style={'description_width': 'initial'}
    )
    alias_input = widgets.Text(
        description='Alias:',
        style={'description_width': 'initial'}
    )
    confirm_button = widgets.Button(description="Confirm")
    cancel_button = widgets.Button(description="Cancel")
    output = widgets.Output()

    def on_confirm(b):
        with output:
            clear_output()
            selected_subfield = subfield_dropdown.value
            alias = alias_input.value or f"{field}_{selected_subfield}"
            print(f"Selected subfield: {selected_subfield}, Alias: {alias}")
            callback({
                'field': f"{field}.{selected_subfield}",
                'nested_type': 'TYPE1',
                'parent_field': field,
                'subfield': selected_subfield,
                'alias': alias
            })

    def on_cancel(b):
        cancel_current_selection(output)
        subfield_dropdown.close()
        alias_input.close()
        confirm_button.close()
        cancel_button.close()

    confirm_button.on_click(on_confirm)
    cancel_button.on_click(on_cancel)
    display(widgets.VBox([subfield_dropdown, alias_input, widgets.HBox([confirm_button, cancel_button]), output]))

def select_type2_key(field: str, keys: pd.Series, callback: Callable[[Dict], None]) -> None:
    """Display interface for selecting a TYPE2 key and handle user input."""
    if keys.empty:
        print(f"No keys found for {field}. Please select another field.")
        return

    key_dropdown = widgets.Dropdown(
        options=keys.tolist(),
        description='Select key:',
        style={'description_width': 'initial'}
    )
    value_type_dropdown = widgets.Dropdown(
        description='Value type:',
        style={'description_width': 'initial'}
    )
    alias_input = widgets.Text(
        description='Alias:',
        style={'description_width': 'initial'}
    )
    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def suggest_alias(key: str) -> str:
        return ' '.join(word.capitalize() for word in key.replace('_', ' ').split())

    def update_value_types_and_alias(change):
        selected_key = change['new']
        value_types = get_value_types_for_key(field, selected_key)
        value_type_dropdown.options = value_types
        if value_types:
            value_type_dropdown.value = value_types[0]
        alias_input.value = suggest_alias(selected_key)

    key_dropdown.observe(update_value_types_and_alias, names='value')

    def on_confirm(b):
        with output:
            clear_output()
            selected_key = key_dropdown.value
            selected_value_type = value_type_dropdown.value
            alias = alias_input.value or suggest_alias(selected_key)
            print(f"Selected key: {selected_key}, Value type: {selected_value_type}, Alias: {alias}")
            confirm_button.close()
            key_dropdown.close()
            value_type_dropdown.close()
            alias_input.close()
            callback({
                'field': f"{field}.{selected_key}",
                'nested_type': 'TYPE2',
                'parent_field': field,
                'key': selected_key,
                'value_type': selected_value_type,
                'alias': alias
            })

    confirm_button.on_click(on_confirm)
    if not keys.empty:
        update_value_types_and_alias({'new': keys.iloc[0]})
    display(widgets.VBox([key_dropdown, value_type_dropdown, alias_input, confirm_button, output]))

def select_type3_subfield(field: str, subfields: List[str], callback: Callable[[Dict], None]) -> None:
    """Display interface for selecting a TYPE3 subfield and handle user input."""
    subfield_dropdown = widgets.Dropdown(
        options=subfields,
        description='Select subfield:',
        style={'description_width': 'initial'}
    )
    alias_input = widgets.Text(
        description='Alias:',
        style={'description_width': 'initial'}
    )
    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def on_confirm(b):
        with output:
            clear_output()
            selected_subfield = subfield_dropdown.value
            alias = alias_input.value or f"{field}_{selected_subfield}"
            print(f"Selected subfield: {selected_subfield}, Alias: {alias}")
            confirm_button.close()
            subfield_dropdown.close()
            alias_input.close()
            callback({
                'field': f"{field}.{selected_subfield}",
                'nested_type': 'TYPE3',
                'parent_field': field,
                'subfield': selected_subfield,
                'alias': alias
            })

    confirm_button.on_click(on_confirm)
    display(widgets.VBox([subfield_dropdown, alias_input, confirm_button, output]))

In [54]:
from google.cloud import bigquery
import ipywidgets as widgets
from IPython.display import display, clear_output
from typing import Dict, List, Callable, Any

def estimate_query_size(query: str) -> int:
    """Estimate the size of a BigQuery query in bytes."""
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    query_job = client.query(query, job_config=job_config, location=SOURCE_DATASET_LOCATION)
    print(f"Query job details: ID={query_job.job_id}, Location={query_job.location}")
    return query_job.total_bytes_processed


def update_filter_list() -> str:
    """Generate an HTML string representing the current filters."""
    filter_text = "<b>Applied Filters:</b><br>"
    for field, value in SELECTED_FILTERS.items():
        if isinstance(value, dict):  # For nested fields
            for subfield, filter_values in value.items():
                filter_text += f"{field}.{subfield}: {', '.join(str(v) for v in filter_values)}<br>"
        else:  # For non-nested fields
            filter_text += f"{field}: {', '.join(str(v) for v in value)}<br>"
    return filter_text

def generate_where_clause() -> str:
    """Generate a WHERE clause based on the selected filters."""
    clauses = []
    for field, value in SELECTED_FILTERS.items():
        if isinstance(value, dict):  # For nested fields
            for subfield, filter_values in value.items():
                clauses.append(f"{field}.{subfield} IN ({', '.join(repr(v) for v in filter_values)})")
        else:  # For non-nested fields
            clauses.append(f"{field} IN ({', '.join(repr(v) for v in value)})")
    return " AND ".join(clauses)

def start_field_selection_for_extraction(callback: Callable[[], None]) -> None:
    """Start the process of selecting fields for extraction."""
    global SELECTED_DIMENSIONS, SELECTED_FILTERS
    SELECTED_DIMENSIONS = []
    SELECTED_FILTERS = {}

    schema, nested_fields = analyze_source_table_schema()
    all_fields = [field.name for field in schema if field.name not in ['event_date', 'user_pseudo_id', 'event_timestamp', 'event_name', 'ga_session_id', 'purchase_revenue']]

    field_selector = widgets.Dropdown(options=all_fields, description='Select field:')
    add_dimension_button = widgets.Button(description="Add Dimension")
    add_filter_button = widgets.Button(description="Add Filter")
    finish_button = widgets.Button(description="Finish Selection")
    cancel_button = widgets.Button(description="Cancel")
    output = widgets.Output()

    def cancel_current_selection():
        with output:
            clear_output()
            print("Selection cancelled.")
            display_current_selections()

    def select_subfield(field: str, subfields: List[str], callback: Callable[[str], None]) -> None:
        subfield_selector = widgets.Dropdown(options=subfields, description='Select subfield:')
        confirm_button = widgets.Button(description="Confirm")

        def on_confirm(b):
            selected_subfield = subfield_selector.value
            confirm_button.close()
            subfield_selector.close()
            callback(selected_subfield)

        confirm_button.on_click(on_confirm)
        display(widgets.VBox([subfield_selector, confirm_button]))

    def select_filter_values(field: str, callback: Callable[[List[str]], None], field_info: Dict[str, Any] = None) -> None:
        query = generate_filter_value_query(field, field_info)
        query_job = client.query(query)
        results = query_job.result()
        distinct_values = [str(row.value) for row in results]

        value_selector = widgets.SelectMultiple(
            options=distinct_values,
            description='Select values (multi-select):',
            rows=10
        )
        confirm_button = widgets.Button(description="Confirm")

        def on_confirm(b):
            selected_values = list(value_selector.value)
            confirm_button.close()
            value_selector.close()
            callback(selected_values)

        confirm_button.on_click(on_confirm)
        display(widgets.VBox([widgets.Label("Hold Ctrl (Cmd on Mac) to select multiple values:"), value_selector, confirm_button]))

    def on_add_dimension(b):
        field = field_selector.value
        if field in nested_fields:
            field_info = nested_fields[field]
            nested_type = field_info['type']

            if nested_type == 'TYPE1':
                select_type1_subfield(field, field_info['subfields'], lambda dim: SELECTED_DIMENSIONS.append(dim))
            elif nested_type == 'TYPE2':
                select_type2_key(field, field_info['keys'], lambda dim: SELECTED_DIMENSIONS.append(dim))
            elif nested_type == 'TYPE3':
                select_type3_subfield(field, field_info['subfields'], lambda dim: SELECTED_DIMENSIONS.append(dim))
            else:
                print(f"Unknown nested type for {field}. Please check the field structure.")
        else:
            add_non_nested_dimension(field)

    def on_add_filter(b):
        field = field_selector.value
        if field in nested_fields:
            field_info = nested_fields[field]
            nested_type = field_info['type']

            if nested_type == 'TYPE2':
                def on_key_selected(key):
                    value_types = get_value_types_for_key(field, key)
                    if value_types:
                        value_type = value_types[0]  # Use the first available value type
                        select_filter_values(
                            f"{field}.{key}",
                            lambda values: add_filter(f"{field}.{key}", values, {'nested_type': 'TYPE2', 'parent_field': field, 'key': key, 'value_type': value_type}),
                            {'type': 'TYPE2', 'parent_field': field, 'key': key, 'value_type': value_type}
                        )
                    else:
                        print(f"No value types found for {field}.{key}")

                select_type2_key_for_filter(field, field_info['keys'], on_key_selected)
            elif nested_type in ['TYPE1', 'TYPE3']:
                select_subfield(field, field_info['subfields'], lambda subfield: select_filter_values(f"{field}.{subfield}", lambda values: add_filter(f"{field}.{subfield}", values, {'nested_type': nested_type, 'field': f"{field}.{subfield}"})))
            else:
                print(f"Unknown nested type for {field}. Please check the field structure.")
        else:
            select_filter_values(field, lambda values: add_filter(field, values, {'nested_type': 'NOT_NESTED'}))

    def add_filter(field: str, values: List[str], field_info: Dict[str, Any]) -> None:
        global SELECTED_FILTERS
        if field_info['nested_type'] == 'TYPE2':
            parent = field_info['parent_field']
            key = field_info['key']
            if parent not in SELECTED_FILTERS:
                SELECTED_FILTERS[parent] = {}
            SELECTED_FILTERS[parent][key] = values
        elif field_info['nested_type'] in ['TYPE1', 'TYPE3']:
            SELECTED_FILTERS[field] = values
        else:
            SELECTED_FILTERS[field] = values

        with output:
            clear_output()
            print(f"Added filter: {field} = {values}")
            display_current_selections()

    def on_finish(b):
        with output:
            clear_output()
            display_current_selections()
            estimate_cost_and_confirm(SELECTED_DIMENSIONS, SELECTED_FILTERS, callback)

    def on_cancel(b):
        cancel_current_selection()

    def display_current_selections():
        print("Current selections:")
        print("Dimensions:")
        for dim in SELECTED_DIMENSIONS:
            print(f"  - Field: {dim['field']}, Type: {dim['nested_type']}, Alias: {dim['alias']}")
        print("Filters:")
        for field, values in SELECTED_FILTERS.items():
            if isinstance(values, dict):
                for subfield, subvalues in values.items():
                    print(f"  {field}.{subfield}: {', '.join(map(str, subvalues))}")
            else:
                print(f"  {field}: {', '.join(map(str, values))}")

    add_dimension_button.on_click(on_add_dimension)
    add_filter_button.on_click(on_add_filter)
    finish_button.on_click(on_finish)
    cancel_button.on_click(on_cancel)

    display(widgets.VBox([
        field_selector,
        widgets.HBox([add_dimension_button, add_filter_button, finish_button, cancel_button]),
        output
    ]))

def generate_filter_value_query(field: str, field_info: Dict[str, Any] = None) -> str:
    """Generate a query to fetch distinct values for a field."""
    if field_info and field_info['type'] == 'TYPE2':
        parent_field = field_info['parent_field']
        key = field_info['key']
        value_type = field_info['value_type']
        return f"""
        SELECT DISTINCT
          (SELECT value.{value_type} FROM UNNEST({parent_field}) WHERE key = '{key}') AS value
        FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}*`
        WHERE _TABLE_SUFFIX BETWEEN '{SELECTED_START_DATE.strftime('%Y%m%d')}' AND '{SELECTED_END_DATE.strftime('%Y%m%d')}'
          AND event_name IN ({', '.join([f"'{event}'" for event in SELECTED_EVENTS.keys()])})
          AND (SELECT value.{value_type} FROM UNNEST({parent_field}) WHERE key = '{key}') IS NOT NULL
        ORDER BY value
        LIMIT 1000
        """
    else:
        return f"""
        SELECT DISTINCT {field} AS value
        FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}*`
        WHERE _TABLE_SUFFIX BETWEEN '{SELECTED_START_DATE.strftime('%Y%m%d')}' AND '{SELECTED_END_DATE.strftime('%Y%m%d')}'
          AND event_name IN ({', '.join([f"'{event}'" for event in SELECTED_EVENTS.keys()])})
          AND {field} IS NOT NULL
        ORDER BY {field}
        LIMIT 1000
        """

In [55]:
from typing import Dict, List, Callable
import ipywidgets as widgets
from IPython.display import display, clear_output
import pandas as pd

def generate_dimension_select(dimension: Dict[str, str]) -> str:
    """
    Generate the SELECT clause for a dimension based on its nested type.

    Args:
        dimension (Dict[str, str]): A dictionary containing dimension information.

    Returns:
        str: The SELECT clause for the dimension.

    Raises:
        ValueError: If an unknown nested type is encountered.
    """
    if dimension['nested_type'] in ['NOT_NESTED', 'TYPE1']:
        return f"{dimension['field']} AS {dimension['alias']}"
    elif dimension['nested_type'] == 'TYPE2':
        return f"(SELECT value.{dimension['value_type']} FROM UNNEST({dimension['parent_field']}) WHERE key = '{dimension['key']}') AS {dimension['alias']}"
    elif dimension['nested_type'] == 'TYPE3':
        return f"(SELECT ARRAY_AGG({dimension['subfield']}) FROM UNNEST({dimension['parent_field']})) AS {dimension['alias']}"
    else:
        raise ValueError(f"Unknown nested type: {dimension['nested_type']}")

def select_type2_key_for_filter(field: str, keys: pd.Series, callback: Callable[[str], None]) -> None:
    """
    Display a dropdown for selecting a key from a TYPE2 nested field and handle the selection.

    Args:
        field (str): The name of the field.
        keys (pd.Series): A series of keys to choose from.
        callback (Callable[[str], None]): Function to call with the selected key.
    """
    key_dropdown = widgets.Dropdown(
        options=keys.tolist(),
        description='Select key:',
        style={'description_width': 'initial'}
    )

    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def on_confirm(b):
        with output:
            clear_output()
            selected_key = key_dropdown.value
            print(f"Selected key: {selected_key}")
            confirm_button.close()
            key_dropdown.close()
            callback(selected_key)

    confirm_button.on_click(on_confirm)
    display(widgets.VBox([key_dropdown, confirm_button, output]))

In [56]:
from typing import List, Dict, Callable, Optional
import ipywidgets as widgets
from IPython.display import display, clear_output
import pandas as pd
from google.cloud import bigquery

def display_type3_options(parent_field: str, subfields: List[str], nested_fields: Dict, callback: Callable) -> None:
    """Display options for TYPE3 nested fields and handle user selection."""
    options = subfields
    dropdown = widgets.Dropdown(
        options=options,
        description='Subfield:',
        style={'description_width': 'initial'}
    )
    suggested_name = widgets.Text(
        value=suggest_field_name(options[0]),
        description='Field name:',
        style={'description_width': 'initial'}
    )
    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def on_field_select(change):
        selected = change['new']
        suggested_name.value = suggest_field_name(selected)

    dropdown.observe(on_field_select, names='value')

    def on_confirm_field_selection(b):
        with output:
            clear_output()
            selected = dropdown.value
            field_name = suggested_name.value
            print(f"Selected: {parent_field}.{selected}")
            print(f"Field name: {field_name}")
            print("Note: This is an array field. The query will return the first non-null value.")
            final_selection = f"{parent_field}.{selected}"
            generate_extraction_query(final_selection, field_name, callback, is_array=True)

    confirm_button.on_click(on_confirm_field_selection)
    print("Warning: This field contains multiple values (array). The query will return the first non-null value.")
    display(widgets.VBox([dropdown, suggested_name, confirm_button, output]))

def generate_extraction_query(selected_field: str, field_name: str, callback: Callable, is_array: bool = False) -> None:
    """Generate and execute the extraction query."""
    global CHOSEN_PARAMETER, CHOSEN_PARAMETER_NAME
    CHOSEN_PARAMETER = selected_field
    CHOSEN_PARAMETER_NAME = field_name

    print(f"Parameter selected: {CHOSEN_PARAMETER}")
    print(f"Parameter name: {CHOSEN_PARAMETER_NAME}")
    print("Ready to create the final funnel table.")

    combined_query = generate_combined_query(SELECTED_EVENTS, SELECTED_DIMENSIONS, SELECTED_FILTERS, CHOSEN_SAMPLE_PERCENTAGE,for_estimation=False)
    execute_combined_query(combined_query, callback)

def get_value_type_for_key(field_name: str, key: str) -> Optional[str]:
    """Get the value type for a specific key in a field."""
    if not SELECTED_DATES:
        raise ValueError("No dates have been selected. Please run select_date_range() first.")

    first_available_date = min(SELECTED_DATES)
    table_suffix = first_available_date.strftime('%Y%m%d')

    query = f"""
    SELECT
        CASE
            WHEN value.string_value IS NOT NULL THEN 'string_value'
            WHEN value.int_value IS NOT NULL THEN 'int_value'
            WHEN value.float_value IS NOT NULL THEN 'float_value'
            WHEN value.double_value IS NOT NULL THEN 'double_value'
        END AS value_type
    FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}{table_suffix}`,
    UNNEST({field_name}) AS params
    WHERE params.key = '{key}'
    AND (value.string_value IS NOT NULL OR value.int_value IS NOT NULL OR value.float_value IS NOT NULL OR value.double_value IS NOT NULL)
    LIMIT 1
    """

    try:
        query_job = client.query(query)
        results = query_job.result()
        for row in results:
            return row.value_type
    except Exception as e:
        print(f"Error getting value type for {key}: {str(e)}")

    return None

def display_standard_features() -> None:
    """Display standard features that are automatically included."""
    standard_features = [
        widgets.Checkbox(value=True, description=feature, disabled=True)
        for feature in ['event_date', 'user_pseudo_id', 'event_timestamp', 'event_name', 'ga_session_id', 'purchase_revenue']
    ]
    display(widgets.VBox(standard_features, description='Standard Features (Automatically Included)'))

def analyze_source_table_schema() -> Tuple[Optional[List[bigquery.SchemaField]], Optional[Dict]]:
    """Analyze the schema of the source table."""
    if not SELECTED_DATES:
        raise ValueError("No dates have been selected. Please run select_date_range() first.")

    first_available_date = min(SELECTED_DATES)
    table_ref = f"{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}{first_available_date.strftime('%Y%m%d')}"

    try:
        table = client.get_table(table_ref)
        schema = table.schema
    except Exception as e:
        print(f"Error accessing table: {e}")
        return None, None

    nested_fields = {}
    for field in schema:
        nested_type = identify_nested_type(field)
        if nested_type != 'NOT_NESTED':
            field_info = {
                'type': nested_type,
                'subfields': [subfield.name for subfield in field.fields] if field.fields else []
            }
            if nested_type == 'TYPE2':
                try:
                    field_info['keys'] = get_type2_keys(field.name)
                except Exception as e:
                    print(f"Error getting keys for {field.name}: {str(e)}")
                    field_info['keys'] = pd.Series([], name='key')
            nested_fields[field.name] = field_info

    return schema, nested_fields


In [57]:
from typing import List, Dict, Callable, Optional, Tuple
import ipywidgets as widgets
from IPython.display import display, clear_output
import pandas as pd
from google.cloud import bigquery

def select_and_filter_fields(selected_fields: List[str], nested_fields: Dict, callback: Callable) -> None:
    """
    Select and filter fields based on their nested structure.

    Args:
        selected_fields (List[str]): List of selected field names.
        nested_fields (Dict): Dictionary containing information about nested fields.
        callback (Callable): Function to call after field selection and filtering.
    """
    global SELECTED_FILTERS
    SELECTED_FILTERS = []

    def process_field(field: str) -> Optional[List[str]]:
        if field in nested_fields:
            field_info = nested_fields[field]
            if field_info['type'] in ['TYPE1', 'TYPE3']:
                return select_subfield(field, field_info['subfields'])
            elif field_info['type'] == 'TYPE2':
                return select_key_value(field, field_info['keys'])
        else:
            return filter_field_values(field)

    for field in selected_fields:
        filters = process_field(field)
        if filters:
            SELECTED_FILTERS.extend(filters)

    print(f"Final selected filters: {SELECTED_FILTERS}")
    callback()

def generate_and_preview_query(filters: List[str], callback: Callable) -> None:
    """
    Generate and preview the query, allowing user confirmation or cancellation.

    Args:
        filters (List[str]): List of filter strings to be applied in the query.
        callback (Callable): Function to call after query confirmation or cancellation.
    """
    query = generate_combined_query(SELECTED_EVENTS, SELECTED_DIMENSIONS, SELECTED_FILTERS, CHOSEN_SAMPLE_PERCENTAGE,for_estimation=False)
    print("Preview of the generated query:")
    print(query)

    confirm_button = widgets.Button(description="Confirm Query")
    cancel_button = widgets.Button(description="Cancel")
    output = widgets.Output()

    def on_confirm(b):
        with output:
            clear_output()
            print("Query confirmed. Proceeding to execution...")
            execute_combined_query(query, callback)

    def on_cancel(b):
        with output:
            clear_output()
            print("Query cancelled. Returning to field selection...")
            start_field_selection_for_extraction(callback)

    confirm_button.on_click(on_confirm)
    cancel_button.on_click(on_cancel)

    display(widgets.VBox([confirm_button, cancel_button, output]))

    # Wait for user to confirm or cancel
    done = widgets.Button(description="Hidden")
    confirm_button.on_click(lambda b: done.click())
    cancel_button.on_click(lambda b: done.click())
    done.on_click(lambda b: b.close())
    done.wait_for_click()

def select_field_for_extraction(schema: List[bigquery.SchemaField], nested_fields: Dict, callback: Callable, level: int = 0) -> None:
    """
    Display interface for selecting a field for extraction and handle nested fields.

    Args:
        schema (List[bigquery.SchemaField]): List of schema fields.
        nested_fields (Dict): Dictionary containing information about nested fields.
        callback (Callable): Function to call after field selection.
        level (int, optional): Current nesting level. Defaults to 0.
    """
    options = [f"{field.name} (nested)" if field.name in nested_fields else field.name for field in schema]

    dropdown = widgets.Dropdown(
        options=options,
        description='Select field:',
        style={'description_width': 'initial'}
    )

    suggested_name = widgets.Text(
        value=suggest_field_name(options[0].split(' (')[0]),
        description='Field name:',
        style={'description_width': 'initial'}
    )

    confirm_button = widgets.Button(description="Confirm")
    output = widgets.Output()

    def on_field_select(change):
        selected = change['new'].split(' (')[0]
        suggested_name.value = suggest_field_name(selected)

    dropdown.observe(on_field_select, names='value')

    def on_confirm_field_selection(b):
        with output:
            clear_output()
            selected = dropdown.value.split(' (')[0]
            field_name = suggested_name.value
            print(f"Selected: {selected}")
            print(f"Field name: {field_name}")
            if selected not in nested_fields:
                generate_extraction_query(selected, field_name, callback)
            else:
                field_info = nested_fields[selected]
                nested_type = field_info['type']

                if nested_type == 'TYPE1':
                    print(f"Opening subfields for {selected} (Type 1 nested)...")
                    display_type1_options(selected, field_info['subfields'], nested_fields, callback)
                elif nested_type == 'TYPE2':
                    print(f"Opening key selection for {selected} (Type 2 nested)...")
                    display_type2_options(selected, field_info['keys'], callback)
                elif nested_type == 'TYPE3':
                    print(f"Opening array subfields for {selected} (Type 3 nested)...")
                    display_type3_options(selected, field_info['subfields'], nested_fields, callback)
                else:
                    print(f"Unknown nested type for {selected}. Please check the field structure.")

    confirm_button.on_click(on_confirm_field_selection)

    display(widgets.VBox([dropdown, suggested_name, confirm_button, output]))

##2.4 Final step for user interface
In this step we will set all together for the interface, including orders, selections, cost estimations etc.


###2.4.1 Function on choosing steps
Choosing steps on funnel levels (event names)

In [58]:
from typing import List, Dict, Callable
import ipywidgets as widgets
from IPython.display import display, clear_output

def build_funnel(
    event_names: List[str],
    initial_funnel_dict: Dict[str, str],
    total_levels: int,
    current_level: int,
    event_counts: Dict[str, int],
    callback: Callable
) -> None:
    """
    Build a funnel by allowing the user to select and order events.

    Args:
        event_names (List[str]): List of available event names.
        initial_funnel_dict (Dict[str, str]): Initial funnel dictionary.
        total_levels (int): Total number of levels in the funnel.
        current_level (int): Current level being built.
        event_counts (Dict[str, int]): Dictionary of event counts.
        callback (Callable): Function to call after funnel is built.
    """
    funnel_dict = initial_funnel_dict.copy()

    if current_level > total_levels:
        display_final_funnel(funnel_dict, event_counts, callback)
        return

    dropdown = widgets.Dropdown(options=event_names, description='Event name: ')
    name_input = widgets.Text(description='Name:')

    def update_selected_events():
        nonlocal selected_events
        selected_events.options = [
            (f"{level}. {name} ({event_counts[event]})", event)
            for level, (event, name) in enumerate(funnel_dict.items(), 1)
        ]

    selected_events = widgets.SelectMultiple(
        options=[
            (f"{level}. {name} ({event_counts[event]})", event)
            for level, (event, name) in enumerate(funnel_dict.items(), 1)
        ],
        description='Selected:',
        disabled=False
    )

    def on_select(change):
        selected_event = change['new'].split(' (')[0]
        formatted_name = suggest_field_name(selected_event)
        name_input.value = formatted_name

    dropdown.observe(on_select, names='value')

    initial_event = dropdown.value.split(' (')[0]
    formatted_initial_event = suggest_field_name(initial_event)
    name_input.value = formatted_initial_event

    add_button = widgets.Button(description='Add')
    remove_button = widgets.Button(description='Remove')
    move_up_button = widgets.Button(description='Move Up')
    move_down_button = widgets.Button(description='Move Down')
    next_button = widgets.Button(description='Next')
    output = widgets.Output()

    def on_add(b):
        nonlocal funnel_dict
        selected_event = dropdown.value.split(' (')[0]
        custom_name = name_input.value
        if selected_event not in funnel_dict:
            funnel_dict[selected_event] = custom_name
            update_selected_events()

    def on_remove(b):
        nonlocal funnel_dict
        if selected_events.value:
            event_to_remove = selected_events.value[0]
            del funnel_dict[event_to_remove]
            update_selected_events()

    def on_move_up(b):
        nonlocal funnel_dict
        if selected_events.value:
            event_to_move = selected_events.value[0]
            items = list(funnel_dict.items())
            index = next((i for i, (k, v) in enumerate(items) if k == event_to_move), None)
            if index is not None and index > 0:
                items[index-1], items[index] = items[index], items[index-1]
                funnel_dict = dict(items)
                update_selected_events()

    def on_move_down(b):
        nonlocal funnel_dict
        if selected_events.value:
            event_to_move = selected_events.value[0]
            items = list(funnel_dict.items())
            index = next((i for i, (k, v) in enumerate(items) if k == event_to_move), None)
            if index is not None and index < len(items) - 1:
                items[index], items[index+1] = items[index+1], items[index]
                funnel_dict = dict(items)
                update_selected_events()

    def on_next(b):
        with output:
            clear_output()
            if len(funnel_dict) == total_levels:
                display_final_funnel(funnel_dict, event_counts, callback)
            else:
                build_funnel(event_names, funnel_dict, total_levels, len(funnel_dict) + 1, event_counts, callback)

    add_button.on_click(on_add)
    remove_button.on_click(on_remove)
    move_up_button.on_click(on_move_up)
    move_down_button.on_click(on_move_down)
    next_button.on_click(on_next)

    display(widgets.VBox([
        widgets.HBox([dropdown, name_input]),
        widgets.HBox([add_button, remove_button, move_up_button, move_down_button]),
        selected_events,
        next_button,
        output
    ]))

###2.4.2 Generation of the final query
Generation for final query for extraction table


In [59]:
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets
from google.cloud import bigquery
from typing import Callable

# Global variables
CHOSEN_SAMPLE_PERCENTAGE: float = 1.0
COST_PER_TIB: float = 6.25  # latest price updated NOV2024

def cancel_current_selection(output_widget: widgets.Output) -> None:
    """Cancel the current selection and display a cancellation message."""
    with output_widget:
        clear_output()
        print("Selection cancelled.")

def sample_size_selector(gb_processed: float, estimated_cost: float) -> None:
    """
    Display a slider for selecting sample size and update cost estimates.

    Args:
        gb_processed (float): Gigabytes of data to be processed.
        estimated_cost (float): Estimated cost for processing the data.
    """
    global CHOSEN_SAMPLE_PERCENTAGE

    sample_slider = widgets.FloatSlider(
        value=1, min=1, max=50, step=0.1,
        description='Sample %:',
        disabled=False, continuous_update=False,
        orientation='horizontal', readout=True, readout_format='.1f',
    )

    estimate_label = widgets.Label(value="Estimated data to be processed: calculating...")
    cost_label = widgets.Label(value="Estimated cost: calculating...")

    def update_estimate(change):
        percentage = change['new']
        sample_gb = gb_processed * (percentage / 100)
        sample_cost = estimated_cost * (percentage / 100)
        estimate_label.value = f"Estimated data to be processed: {sample_gb:.2f} GB"
        cost_label.value = f"Estimated cost: ${sample_cost:.4f}"

    sample_slider.observe(update_estimate, names='value')

    confirm_button = widgets.Button(description="Confirm Sample Size")

    def on_button_clicked(b):
        global CHOSEN_SAMPLE_PERCENTAGE
        CHOSEN_SAMPLE_PERCENTAGE = sample_slider.value
        print(f"Sample size confirmed: {CHOSEN_SAMPLE_PERCENTAGE}%")

    confirm_button.on_click(on_button_clicked)

    display(widgets.VBox([sample_slider, estimate_label, cost_label, confirm_button]))
    update_estimate({'new': sample_slider.value})

def color_text(text: str, color: str) -> str:
    """Wrap text in a colored HTML span."""
    return f'<span style="color: {color};">{text}</span>'

def execute_combined_query(query: str, callback: Callable) -> None:
    """
    Execute a combined query, displaying cost estimates and confirmation options.

    Args:
        query (str): The query to execute.
        callback (Callable): Function to call after query execution.
    """
    client = bigquery.Client(project=DEST_PROJECT, location=DEST_DATASET_LOCATION)
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    query_job = client.query(query, job_config=job_config)
    bytes_processed = query_job.total_bytes_processed
    gb_processed = bytes_processed / (1024 * 1024 * 1024)
    estimated_cost = bytes_processed / 1e12 * COST_PER_TIB

    color = 'blue' if gb_processed < 1 else 'green' if 1 <= gb_processed <= 3 else 'red'
    data_size_text = f"<span style='color: {color};'>Estimated data to be processed: {gb_processed:.2f} GB</span>"
    display(HTML(data_size_text))

    print(f"Current price per TB: ${COST_PER_TIB:.2f}")
    print(f"Estimated cost for query: ${estimated_cost:.2f}")
    print("\nNOTE: This is only an estimate. No costs have been incurred yet.")
    print("Actual query execution and associated costs will only occur if you choose to proceed.")

    confirm_button = widgets.Button(description="Execute Query")
    cancel_button = widgets.Button(description="Cancel")
    output = widgets.Output()

    def on_confirm(b):
        with output:
            clear_output()
            print("Executing query. This will incur costs...")
            execute_query(query, callback)

    def on_cancel(b):
        cancel_current_selection(output)

    confirm_button.on_click(on_confirm)
    cancel_button.on_click(on_cancel)

    print("Choose an option:")
    display(widgets.VBox([
        widgets.HBox([confirm_button, cancel_button]),
        output
    ]))

    # Wait for user to make a choice
    done = widgets.Button(description="Hidden")
    confirm_button.on_click(lambda b: done.click())
    cancel_button.on_click(lambda b: done.click())
    done.on_click(lambda b: b.close())
    done.wait_for_click()

In [60]:
from typing import Dict, List, Union, Optional
from datetime import date

def generate_combined_query(
    selected_events: Dict[str, str],
    dimensions: List[Dict[str, str]],
    filters: Dict[str, Union[str, List[str], Dict[str, List[str]]]],
    sample_percentage: Optional[float] = None,
    for_estimation: bool = False
) -> str:
    """
    Generate a combined BigQuery query based on selected events, dimensions, and filters.

    Args:
        selected_events (Dict[str, str]): Dictionary of selected events.
        dimensions (List[Dict[str, str]]): List of dimension dictionaries.
        filters (Dict[str, Union[str, List[str], Dict[str, List[str]]]]): Dictionary of filters.
        sample_percentage (Optional[float]): Percentage of data to sample, if applicable.

    Returns:
        str: The generated BigQuery query.
    """
    events = ", ".join(f"'{event}'" for event in selected_events.keys())

    def handle_filter(field: str, value: Union[str, List[str]], is_nested: bool = False) -> str:
        if is_nested:
            formatted_values = ", ".join(f"'{v}'" for v in value)
            return f"(SELECT value.string_value FROM UNNEST({field}) WHERE key = '{value[0]}') IN ({formatted_values})"
        elif isinstance(value, (list, tuple)):
            formatted_values = ", ".join(f"'{v}'" for v in value)
            return f"{field} IN ({formatted_values})"
        else:
            return f"{field} = '{value}'"

    # Prepare WHERE clause
    where_conditions = [f"event_name IN ({events})"]
    for field, value in filters.items():
        if field == 'event_params':
            for nested_key, nested_value in value.items():
                where_conditions.append(handle_filter('event_params', [nested_key] + nested_value, is_nested=True))
        else:
            where_conditions.append(handle_filter(field, value))

    where_clause = " AND ".join(where_conditions)

    # Handle dimensions and filters in SELECT clause
    select_items = []

    # Add dimensions to select items
    for dim in dimensions:
        if dim['nested_type'] == 'TYPE2':
            select_items.append(f"(SELECT value.{dim['value_type']} FROM UNNEST({dim['parent_field']}) WHERE key = '{dim['key']}') AS `{dim['alias']}`")
        else:
            select_items.append(f"{dim['field']} AS `{dim['alias']}`")

    # Add filters to select items
    for field, value in filters.items():
        if field == 'event_params':
            for nested_key in value.keys():
                select_items.append(f"(SELECT value.string_value FROM UNNEST(event_params) WHERE key = '{nested_key}') AS `{nested_key}`")
        else:
            select_items.append(f"{field} AS `{field.split('.')[-1]}`")

    # Remove duplicates while preserving order
    select_items = list(dict.fromkeys(select_items))

    # Prepare the SELECT clause
    select_clause = f"""
        event_date, user_pseudo_id, event_timestamp, event_name,
        (SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id') AS ga_session_id,
        (SELECT purchase_revenue FROM UNNEST([ecommerce])) AS purchase_revenue
    """

    if select_items:
        select_clause += ", " + ", ".join(select_items)

    # Create a query that unions all selected dates
    date_queries = []
    for date in SELECTED_DATES:
        date_str = date.strftime('%Y%m%d')
        date_query = f"""
        SELECT
            {select_clause}
        FROM `{SOURCE_PROJECT}.{SOURCE_DATASET}.{SOURCE_TABLE_PREFIX}{date_str}`
        WHERE {where_clause}
        """
        date_queries.append(date_query)

    # Combine all date queries with UNION ALL
    combined_query = " UNION ALL ".join(date_queries)

    if for_estimation:
        return combined_query

    else:
        # Wrap the combined query in a CREATE OR REPLACE TABLE statement
        final_query = f"""
        CREATE OR REPLACE TABLE `{DEST_PROJECT}.{DEST_DATASET}.{DEST_TABLE}` AS
        {combined_query}
        """
        return final_query

def quote_identifier(name: str) -> str:
    """Wrap an identifier in backticks."""
    return f"`{name}`"

###2.4.3 Summary interface
Summary interface before execution


In [61]:
from google.cloud import bigquery
from typing import Callable

def execute_query(query: str, callback: Callable[[], None]) -> None:
    """
    Execute a BigQuery query and call a callback function upon successful completion.

    Args:
        query (str): The BigQuery query to execute.
        callback (Callable[[], None]): Function to call after successful query execution.

    Raises:
        Exception: If an error occurs during query execution or table retrieval.
    """
    client = bigquery.Client(project=DEST_PROJECT, location=DEST_DATASET_LOCATION)
    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)

    try:
        query_job = client.query(query, job_config=job_config, location=DEST_DATASET_LOCATION)
        query_job.result()  # Wait for the job to complete
        print("Query executed successfully.")

        # Get table info
        table_ref = client.dataset(DEST_DATASET).table(DEST_TABLE)
        table = client.get_table(table_ref)
        print(f"Table {DEST_TABLE} created with {table.num_rows:,} rows and {len(table.schema)} columns.")

        callback()  # Call the callback function after successful query execution
    except Exception as e:
        print(f"An error occurred while executing the query: {str(e)}")
        raise  # Re-raise the exception for proper error handling upstream

#Chapter 3: Funnel table building
In this chapter we will build up the Open User, Open Session, Close User and Close Session funnel tables from the extracted table data.

In [62]:
def handle_nested_field(field: str) -> str:
    """
    Handle nested field names by wrapping each part in backticks.

    This function takes a field name, which may be nested (containing dots),
    and wraps each part of the field name in backticks. This is useful for
    properly quoting field names in BigQuery queries.

    Args:
        field (str): The field name, potentially containing dots for nested fields.

    Returns:
        str: The field name with each part wrapped in backticks.

    Examples:
        >>> handle_nested_field('simple_field')
        '`simple_field`'
        >>> handle_nested_field('parent.child')
        '`parent`.`child`'
        >>> handle_nested_field('grand.parent.child')
        '`grand`.`parent`.`child`'
    """
    if '.' in field:
        parts = field.split('.')
        return '.'.join(f'`{part}`' for part in parts)
    return f'`{field}`'

##3.1 Open User Funnel building

In [63]:
from typing import List, Dict, Any
from google.cloud import bigquery

def generate_open_user_funnel_query() -> str:
    """
    Generate a BigQuery SQL query for creating an open user funnel.

    Returns:
        str: The generated SQL query.
    """
    event_steps = list(SELECTED_EVENTS.keys())
    event_aliases = list(SELECTED_EVENTS.values())
    purchase_event = next((event for event, alias in SELECTED_EVENTS.items() if 'purchase' in alias.lower()), None)

    def quote_identifier(name: str) -> str:
        return f"`{name}`"

    def handle_nested_field(field: Any) -> str:
        if isinstance(field, dict):
            return field['alias']
        elif isinstance(field, str) and '.' in field:
            parent, child = field.split('.', 1)
            return child if parent == 'event_params' else field.split('.')[-1]
        return field

    def generate_filter_condition(key: str, values: Any) -> str:
        if key == 'event_params':
            nested_conditions = [f"{quote_identifier(nested_key)} IN ({', '.join(map(repr, nested_values))})"
                                 for nested_key, nested_values in values.items()]
            return " AND ".join(nested_conditions)
        else:
            field = handle_nested_field(key)
            return f"{quote_identifier(field)} IN ({', '.join(map(repr, values))})"

    dimension_fields = [dim['alias'] for dim in SELECTED_DIMENSIONS]
    filter_fields = [handle_nested_field(key) if key != 'event_params' else value.keys()
                     for key, value in SELECTED_FILTERS.items()]
    filter_fields = [item for sublist in filter_fields for item in (sublist if isinstance(sublist, list) else [sublist])]
    all_fields = list(set(dimension_fields + filter_fields))

    field_select = ", ".join(quote_identifier(handle_nested_field(field)) for field in all_fields) if all_fields else ""
    field_group_by = field_select  # Since we're using aliases, select and group by are the same

    filter_conditions = " AND ".join([generate_filter_condition(k, v) for k, v in SELECTED_FILTERS.items()])
    where_clause = f"WHERE {filter_conditions}" if filter_conditions else ""

    base_table_cte = f"""
    WITH base_table AS (
      SELECT
        user_pseudo_id,
        event_timestamp,
        event_date,
        event_name,
        purchase_revenue
        {', ' + field_select if field_select else ''}
      FROM `{PROJECT}.{WORKING_DATASET}.{DEST_TABLE}`
      {where_clause}
    )
    """

    funnel_steps = [f"SUM(CASE WHEN event_name = '{event}' THEN 1 ELSE 0 END) AS {quote_identifier(alias)}"
                    for event, alias in zip(event_steps, event_aliases)]

    funnel_steps_cte = f"""
    , funnel_steps AS (
      SELECT
        event_date
        {', ' + field_select if field_select else ''}
        , {', '.join(funnel_steps)},
        SUM(CASE WHEN event_name = '{purchase_event}' THEN 1 ELSE 0 END) AS transaction_count,
        SUM(CASE WHEN event_name = '{purchase_event}' THEN purchase_revenue ELSE 0 END) AS total_revenue
      FROM base_table
      GROUP BY event_date{', ' + field_group_by if field_group_by else ''}
    )
    """

    final_select = f"""
    SELECT
      event_date
      {', ' + ", ".join(quote_identifier(handle_nested_field(field)) for field in all_fields) if all_fields else ''}
      , {', '.join([quote_identifier(alias) for alias in event_aliases])},
      total_revenue AS revenue,
      transaction_count
    FROM funnel_steps
    ORDER BY event_date{', ' + ", ".join(quote_identifier(handle_nested_field(field)) for field in all_fields) if all_fields else ''}
    """

    full_query = f"{base_table_cte}\n{funnel_steps_cte}\n{final_select}"
    return full_query

def create_open_user_funnel_table() -> None:
    """
    Create an open user funnel table in BigQuery.
    """
    query = generate_open_user_funnel_query()
    table_name = f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_open_user_funnel"
    create_table_query = f"""
    CREATE OR REPLACE TABLE `{table_name}` AS
    {query}
    """

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    client = bigquery.Client()

    try:
        query_job = client.query(create_table_query, job_config=job_config)
        query_job.result()  # Wait for the job to complete
        print(f"Table {table_name} created successfully.")

        # Get table info
        table = client.get_table(table_name)
        print(f"Table has {table.num_rows} rows and {len(table.schema)} columns.")

    except Exception as e:
        print(f"An error occurred: {e}")
        raise  # Re-raise the exception for proper error handling upstream

##3.2 Open Session Funnel building

In [64]:
from typing import List, Dict, Any
from google.cloud import bigquery

def generate_open_session_funnel_query() -> str:
    """
    Generate a BigQuery SQL query for creating an open session funnel.

    Returns:
        str: The generated SQL query.
    """
    event_steps = list(SELECTED_EVENTS.keys())
    event_aliases = list(SELECTED_EVENTS.values())
    purchase_event = next((event for event, alias in SELECTED_EVENTS.items() if 'purchase' in alias.lower()), None)

    def quote_identifier(name: str) -> str:
        return f"`{name}`"

    def handle_nested_field(field: Any) -> str:
        if isinstance(field, dict):
            return field['alias']
        elif isinstance(field, str) and '.' in field:
            parent, child = field.split('.', 1)
            return child if parent == 'event_params' else field.split('.')[-1]
        return field

    def generate_filter_condition(key: str, values: Any) -> str:
        if key == 'event_params':
            nested_conditions = [f"{quote_identifier(nested_key)} IN ({', '.join(map(repr, nested_values))})"
                                 for nested_key, nested_values in values.items()]
            return " AND ".join(nested_conditions)
        else:
            field = handle_nested_field(key)
            return f"{quote_identifier(field)} IN ({', '.join(map(repr, values))})"

    dimension_fields = [dim['alias'] for dim in SELECTED_DIMENSIONS]
    filter_fields = [handle_nested_field(key) if key != 'event_params' else value.keys()
                     for key, value in SELECTED_FILTERS.items()]
    filter_fields = [item for sublist in filter_fields for item in (sublist if isinstance(sublist, list) else [sublist])]
    all_fields = list(set(dimension_fields + filter_fields))

    field_select = ", ".join(quote_identifier(handle_nested_field(field)) for field in all_fields) if all_fields else ""
    field_group_by = field_select  # Since we're using aliases, select and group by are the same

    filter_conditions = " AND ".join([generate_filter_condition(k, v) for k, v in SELECTED_FILTERS.items()])
    where_clause = f"WHERE {filter_conditions}" if filter_conditions else ""

    base_table_cte = f"""
    WITH base_table AS (
      SELECT
        ga_session_id AS session_id,
        event_timestamp,
        event_date,
        event_name,
        purchase_revenue
        {', ' + field_select if field_select else ''}
      FROM `{PROJECT}.{WORKING_DATASET}.{DEST_TABLE}`
      {where_clause}
    )
    """

    purchase_aggregation_cte = f"""
    , purchase_aggregation AS (
      SELECT
        session_id,
        event_date
        {', ' + ", ".join([f"COALESCE({quote_identifier(handle_nested_field(field))}, 'Unknown') AS {quote_identifier(handle_nested_field(field))}" for field in all_fields]) if all_fields else ''}
        , COUNT(*) AS transaction_count,
        SUM(purchase_revenue) AS total_revenue
      FROM base_table
      WHERE event_name = '{purchase_event}'
      GROUP BY session_id, event_date{', ' + field_group_by if field_group_by else ''}
    )
    """

    funnel_steps = [f"MAX(CASE WHEN event_name = '{original_event}' THEN 1 ELSE 0 END) AS {quote_identifier(alias)}"
                    for original_event, alias in zip(event_steps, event_aliases)]

    funnel_steps_cte = f"""
    , funnel_steps AS (
      SELECT
        session_id,
        event_date
        {', ' + field_select if field_select else ''}
        , {', '.join(funnel_steps)}
      FROM base_table
      GROUP BY session_id, event_date{', ' + field_group_by if field_group_by else ''}
    )
    """

    funnel_counts_cte = f"""
    , funnel_counts AS (
      SELECT
        event_date
        {', ' + field_select if field_select else ''}
        , {', '.join([f"SUM({quote_identifier(alias)}) AS {quote_identifier(alias)}" for alias in event_aliases])}
      FROM funnel_steps
      GROUP BY event_date{', ' + field_group_by if field_group_by else ''}
    )
    """

    join_conditions = " AND ".join([f"COALESCE(fc.{quote_identifier(handle_nested_field(field))}, 'Unknown') = COALESCE(pa.{quote_identifier(handle_nested_field(field))}, 'Unknown')" for field in all_fields]) if all_fields else ''

    final_select = f"""
    SELECT
      fc.event_date
      {', ' + ", ".join(f"fc.{quote_identifier(handle_nested_field(field))}" for field in all_fields) if all_fields else ''}
      , {', '.join([f"fc.{quote_identifier(alias)}" for alias in event_aliases])},
      COALESCE(SUM(pa.total_revenue), 0) AS revenue,
      COALESCE(SUM(pa.transaction_count), 0) AS transaction_count
    FROM funnel_counts fc
    LEFT JOIN purchase_aggregation pa
      ON fc.event_date = pa.event_date
      {' AND ' + join_conditions if join_conditions else ''}
    GROUP BY
      fc.event_date
      {', ' + ", ".join(f"fc.{quote_identifier(handle_nested_field(field))}" for field in all_fields) if all_fields else ''}
      , {', '.join([f"fc.{quote_identifier(alias)}" for alias in event_aliases])}
    ORDER BY
      fc.event_date
      {', ' + ", ".join(f"fc.{quote_identifier(handle_nested_field(field))}" for field in all_fields) if all_fields else ''}
    """

    full_query = f"{base_table_cte}\n{purchase_aggregation_cte}\n{funnel_steps_cte}\n{funnel_counts_cte}\n{final_select}"
    return full_query

def create_open_session_funnel_table() -> None:
    """
    Create an open session funnel table in BigQuery.
    """
    query = generate_open_session_funnel_query()
    table_name = f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_open_session_funnel"
    create_table_query = f"""
    CREATE OR REPLACE TABLE `{table_name}` AS
    {query}
    """

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    client = bigquery.Client()

    try:
        query_job = client.query(create_table_query, job_config=job_config)
        query_job.result()  # Wait for the job to complete
        print(f"Table {table_name} created successfully.")

        # Get table info
        table = client.get_table(table_name)
        print(f"Table has {table.num_rows} rows and {len(table.schema)} columns.")

    except Exception as e:
        print(f"An error occurred: {e}")
        raise  # Re-raise the exception for proper error handling upstream

##3.3 Close User Funnel building

In [65]:
from typing import List, Dict, Any
from google.cloud import bigquery

def generate_close_user_funnel_query() -> str:
    """
    Generate a BigQuery SQL query for creating a close user funnel.

    Returns:
        str: The generated SQL query.
    """
    event_steps = list(SELECTED_EVENTS.keys())
    event_aliases = list(SELECTED_EVENTS.values())
    purchase_event = next((event for event, alias in SELECTED_EVENTS.items() if 'purchase' in alias.lower()), None)

    def quote_identifier(name: str) -> str:
        return f"`{name}`"

    def handle_nested_field(field: Any) -> str:
        if isinstance(field, dict):
            return field['alias']
        elif isinstance(field, str) and '.' in field:
            parent, child = field.split('.', 1)
            return child if parent == 'event_params' else field.split('.')[-1]
        return field

    def generate_filter_condition(key: str, values: Any) -> str:
        if key == 'event_params':
            nested_conditions = [f"{quote_identifier(nested_key)} IN ({', '.join(map(repr, nested_values))})"
                                 for nested_key, nested_values in values.items()]
            return " AND ".join(nested_conditions)
        else:
            field = handle_nested_field(key)
            return f"{quote_identifier(field)} IN ({', '.join(map(repr, values))})"

    dimension_fields = [dim['alias'] for dim in SELECTED_DIMENSIONS]
    filter_fields = [handle_nested_field(key) if key != 'event_params' else value.keys()
                     for key, value in SELECTED_FILTERS.items()]
    filter_fields = [item for sublist in filter_fields for item in (sublist if isinstance(sublist, list) else [sublist])]
    all_fields = list(set(dimension_fields + filter_fields))

    field_select = ", ".join(quote_identifier(handle_nested_field(field)) for field in all_fields) if all_fields else ""
    field_group_by = field_select  # Since we're using aliases, select and group by are the same

    filter_conditions = " AND ".join([generate_filter_condition(k, v) for k, v in SELECTED_FILTERS.items()])
    where_clause = f"WHERE {filter_conditions}" if filter_conditions else ""

    base_table_cte = f"""
    WITH base_table AS (
      SELECT
        user_pseudo_id,
        event_timestamp,
        event_date,
        event_name,
        purchase_revenue
        {', ' + field_select if field_select else ''}
      FROM `{PROJECT}.{WORKING_DATASET}.{DEST_TABLE}`
      {where_clause}
    )
    """

    funnel_analysis_cte = f"""
    , funnel_analysis AS (
      SELECT
        user_pseudo_id,
        event_date
        {', ' + field_select if field_select else ''}
        , ARRAY_AGG(STRUCT(event_name, event_timestamp, purchase_revenue) ORDER BY event_timestamp) AS events
      FROM base_table
      GROUP BY user_pseudo_id, event_date{', ' + field_group_by if field_group_by else ''}
    )
    """

    funnel_steps = []
    for i, (event, alias) in enumerate(zip(event_steps, event_aliases)):
        previous_events = event_steps[:i]
        previous_events_condition = " AND ".join([f"'{prev_event}' IN (SELECT event_name FROM UNNEST(events) WHERE event_timestamp < current_event.event_timestamp)" for prev_event in previous_events])
        step = f"""
        COUNT(DISTINCT CASE WHEN EXISTS (
          SELECT 1
          FROM UNNEST(events) AS current_event
          WHERE current_event.event_name = '{event}'
          {f'AND {previous_events_condition}' if previous_events else ''}
        ) THEN user_pseudo_id END) AS {quote_identifier(alias)}"""
        funnel_steps.append(step)

    final_select = f"""
    SELECT
      event_date
      {', ' + field_select if field_select else ''}
      , COUNT(DISTINCT user_pseudo_id) AS total_users,
      {', '.join(funnel_steps)},
      COUNT(DISTINCT CASE WHEN '{purchase_event}' IN (SELECT event_name FROM UNNEST(events)) THEN user_pseudo_id END) AS total_transactions,
      SUM((SELECT SUM(purchase_revenue) FROM UNNEST(events) WHERE event_name = '{purchase_event}')) AS total_revenue
    FROM funnel_analysis
    GROUP BY event_date{', ' + field_group_by if field_group_by else ''}
    ORDER BY event_date{', ' + ", ".join(quote_identifier(handle_nested_field(field)) for field in all_fields) if all_fields else ''}
    """

    full_query = base_table_cte + funnel_analysis_cte + final_select
    return full_query

def create_close_user_funnel_table() -> None:
    """
    Create a close user funnel table in BigQuery.
    """
    query = generate_close_user_funnel_query()
    table_name = f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_close_user_funnel"
    create_table_query = f"""
    CREATE OR REPLACE TABLE `{table_name}` AS
    {query}
    """

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    client = bigquery.Client()

    try:
        query_job = client.query(create_table_query, job_config=job_config)
        query_job.result()  # Wait for the job to complete
        print(f"Table {table_name} created successfully.")

        # Fetch and display a sample of the data
        sample_query = f"""
        SELECT *
        FROM `{table_name}`
        LIMIT 5
        """
        sample_job = client.query(sample_query)
        sample_results = sample_job.result()

        # Print the sample results
        #print("Sample data:")
        #for row in sample_results:
         #   print(row)

    except Exception as e:
        print(f"An error occurred: {e}")
        raise  # Re-raise the exception for proper error handling upstream

##3.4 Close Session Funnel building

In [66]:
from typing import List, Dict, Any
from google.cloud import bigquery

def generate_close_session_funnel_query() -> str:
    """
    Generate a BigQuery SQL query for creating a close session funnel.

    Returns:
        str: The generated SQL query.
    """
    event_steps = list(SELECTED_EVENTS.keys())
    event_aliases = list(SELECTED_EVENTS.values())
    purchase_event = next((event for event, alias in SELECTED_EVENTS.items() if 'purchase' in alias.lower()), None)

    def quote_identifier(name: str) -> str:
        return f"`{name}`"

    def handle_nested_field(field: Any) -> str:
        if isinstance(field, dict):
            return field['alias']
        elif isinstance(field, str) and '.' in field:
            parent, child = field.split('.', 1)
            return child if parent == 'event_params' else field.split('.')[-1]
        return field

    def generate_filter_condition(key: str, values: Any) -> str:
        if key == 'event_params':
            nested_conditions = [f"{quote_identifier(nested_key)} IN ({', '.join(map(repr, nested_values))})"
                                 for nested_key, nested_values in values.items()]
            return " AND ".join(nested_conditions)
        else:
            field = handle_nested_field(key)
            return f"{quote_identifier(field)} IN ({', '.join(map(repr, values))})"

    dimension_fields = [dim['alias'] for dim in SELECTED_DIMENSIONS]
    filter_fields = [handle_nested_field(key) if key != 'event_params' else value.keys()
                     for key, value in SELECTED_FILTERS.items()]
    filter_fields = [item for sublist in filter_fields for item in (sublist if isinstance(sublist, list) else [sublist])]
    all_fields = list(set(dimension_fields + filter_fields))

    field_select = ", ".join(quote_identifier(handle_nested_field(field)) for field in all_fields) if all_fields else ""
    field_group_by = field_select  # Since we're using aliases, select and group by are the same

    filter_conditions = " AND ".join([generate_filter_condition(k, v) for k, v in SELECTED_FILTERS.items()])
    where_clause = f"WHERE {filter_conditions}" if filter_conditions else ""

    base_table_cte = f"""
    WITH all_events AS (
      SELECT
        ga_session_id AS session_id,
        event_timestamp,
        event_date,
        event_name,
        purchase_revenue
        {', ' + field_select if field_select else ''}
      FROM `{PROJECT}.{WORKING_DATASET}.{DEST_TABLE}`
      {where_clause}
    ),
    base_table AS (
      SELECT *
      FROM all_events
      WHERE session_id IS NOT NULL
    ),
    dropped_purchases AS (
      SELECT
        event_date
        {', ' + ", ".join([f"COALESCE({quote_identifier(handle_nested_field(field))}, 'Unknown') AS {quote_identifier(handle_nested_field(field))}" for field in all_fields]) if all_fields else ''}
        , COUNT(*) AS dropped_purchase_count
      FROM all_events
      WHERE event_name = '{purchase_event}' AND session_id IS NULL
      GROUP BY event_date{', ' + field_group_by if field_group_by else ''}
    )
    """

    funnel_steps = []
    for event, alias, next_event in zip(event_steps, event_aliases, event_steps[1:] + [None]):
        step = f"""
        COUNT(DISTINCT CASE WHEN '{event}' IN (
          SELECT event_name
          FROM UNNEST(events)
          WHERE event_timestamp <= (
            SELECT MIN(event_timestamp)
            FROM UNNEST(events)
            WHERE event_name = '{next_event if next_event else ''}'
          ) OR '{next_event if next_event else ''}' NOT IN (SELECT event_name FROM UNNEST(events))
        ) THEN session_id END) AS {quote_identifier(alias)}"""
        funnel_steps.append(step)

    funnel_analysis_cte = f"""
    , funnel_analysis AS (
      SELECT
        session_id,
        event_date
        {', ' + field_select if field_select else ''}
        , ARRAY_AGG(STRUCT(event_name, event_timestamp, purchase_revenue) ORDER BY event_timestamp) AS events
      FROM base_table
      GROUP BY session_id, event_date{', ' + field_group_by if field_group_by else ''}
    ),
    funnel_steps AS (
      SELECT
        event_date
        {', ' + field_select if field_select else ''}
        , COUNT(DISTINCT session_id) AS total_sessions,
        {', '.join(funnel_steps)},
        COUNT(CASE WHEN '{purchase_event}' IN (SELECT event_name FROM UNNEST(events)) THEN 1 END) AS total_transactions,
        SUM((SELECT SUM(purchase_revenue) FROM UNNEST(events) WHERE event_name = '{purchase_event}')) AS revenue
      FROM funnel_analysis
      GROUP BY event_date{', ' + field_group_by if field_group_by else ''}
    )
    """

    join_conditions = " AND ".join([f"COALESCE(fs.{quote_identifier(handle_nested_field(field))}, 'Unknown') = COALESCE(dp.{quote_identifier(handle_nested_field(field))}, 'Unknown')" for field in all_fields]) if all_fields else ''

    final_select = f"""
    SELECT
      fs.event_date
      {', ' + ", ".join(f"fs.{quote_identifier(handle_nested_field(field))}" for field in all_fields) if all_fields else ''}
      , fs.total_sessions,
      {', '.join([f"fs.{quote_identifier(alias)}" for alias in event_aliases])},
      fs.total_transactions,
      fs.revenue,
      COALESCE(dp.dropped_purchase_count, 0) AS dropped_purchase_null_session_id
    FROM funnel_steps fs
    LEFT JOIN dropped_purchases dp
      ON fs.event_date = dp.event_date
      {' AND ' + join_conditions if join_conditions else ''}
    ORDER BY
      fs.event_date
      {', ' + ", ".join(f"fs.{quote_identifier(handle_nested_field(field))}" for field in all_fields) if all_fields else ''}
    """

    full_query = base_table_cte + funnel_analysis_cte + final_select
    return full_query

def create_close_session_funnel_table() -> None:
    """
    Create a close session funnel table in BigQuery.
    """
    query = generate_close_session_funnel_query()
    table_name = f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_close_session_funnel"
    create_table_query = f"""
    CREATE OR REPLACE TABLE `{table_name}` AS
    {query}
    """

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    client = bigquery.Client()

    try:
        query_job = client.query(create_table_query, job_config=job_config)
        query_job.result()  # Wait for the job to complete
        print(f"Table {table_name} created successfully.")

        # Get table info
        table = client.get_table(table_name)
        print(f"Table has {table.num_rows} rows and {len(table.schema)} columns.")

    except Exception as e:
        print(f"An error occurred: {e}")
        raise  # Re-raise the exception for proper error handling upstream

# Chapter 4: Final Orchestration
Here will will tailor up all functions so they will run in correct order

## 4.1 Orchester function

In [67]:
from typing import List, Dict, Optional

def create_funnel_tables(dimensions: Optional[List[Dict]] = None,
                         filters: Optional[Dict] = None,
                         sample_percentage: Optional[float] = None) -> None:
    """
    Create funnel tables based on the provided dimensions, filters, and sample percentage.

    This function creates an extracted table and then generates various funnel tables
    (open user, open session, close session, and close user) based on that extracted data.

    Args:
        dimensions (Optional[List[Dict]]): List of dimension dictionaries. Defaults to an empty list.
        filters (Optional[Dict]): Dictionary of filters. Defaults to an empty dictionary.
        sample_percentage (Optional[float]): Percentage of data to sample. Defaults to None (full dataset).
    """
    if dimensions is None:
        dimensions = []
    if filters is None:
        filters = {}

    def after_extracted_table_creation():
        print(f"Creating funnel tables...")

        #print("\nOpen User Funnel Query:")
        open_user_query = generate_open_user_funnel_query()
        #print(open_user_query)
        create_open_user_funnel_table()

        #print("\nOpen Session Funnel Query:")
        open_session_query = generate_open_session_funnel_query()
        #print(open_session_query)
        create_open_session_funnel_table()

        #print("\nClose Session Funnel Query:")
        close_session_query = generate_close_session_funnel_query()
        #print(close_session_query)
        create_close_session_funnel_table()

        #print("\nClose User Funnel Query:")
        close_user_query = generate_close_user_funnel_query()
        #print(close_user_query)
        create_close_user_funnel_table()

        print(f"Funnel tables created successfully.")

    print(f"Creating {'sample ' if sample_percentage is not None else ''}Extracted_table...")
    query = generate_combined_query(SELECTED_EVENTS, SELECTED_DIMENSIONS, SELECTED_FILTERS, CHOSEN_SAMPLE_PERCENTAGE,for_estimation=False)
    execute_query(query, after_extracted_table_creation)

In [68]:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from typing import Optional
def start_funnel_builder():
    global SELECTED_START_DATE, SELECTED_END_DATE, SELECTED_DIMENSIONS, SELECTED_FILTERS

    print(f"Current BigQuery settings:")
    print(f"Project: {SOURCE_PROJECT}, Dataset: {SOURCE_DATASET}, Table prefix: {SOURCE_TABLE_PREFIX}")
    print(f"Destination Project: {DEST_PROJECT}, Dataset: {DEST_DATASET}, Table: {DEST_TABLE}")
    print()
    print(f"Destination dataset location: {DEST_DATASET_LOCATION}")
    print(f"Source dataset location: {SOURCE_DATASET_LOCATION}")
    print("\nStarting funnel builder...")

    def continue_funnel_building():
        get_funnel_levels(start_field_selection)

    def start_field_selection():
        start_field_selection_for_extraction(create_funnel_tables)

    # Call select_date_range with continue_funnel_building as the callback
    select_date_range(continue_funnel_building)




##4.2 Run all

In [69]:
start_funnel_builder()

Current BigQuery settings:
Project: ga-data-242308, Dataset: analytics_305832606, Table prefix: events_
Destination Project: conversionista-se, Dataset: Siwens_NoN1, Table: Extracted_table

Destination dataset location: EU
Source dataset location: EU

Starting funnel builder...


VBox(children=(DatePicker(value=datetime.date(2024, 11, 11), description='Start Date'), DatePicker(value=datet…

VBox(children=(Dropdown(description='Select field:', options=('event_params', 'event_previous_timestamp', 'eve…

VBox(children=(Dropdown(description='Select subfield:', options=('category', 'mobile_brand_name', 'mobile_mode…

Creating Extracted_table...
Query executed successfully.
Table Extracted_table created with 752,731 rows and 7 columns.
Creating funnel tables...
Table conversionista-se.Siwens_NoN1.Dynamic_open_user_funnel created successfully.
Table has 4 rows and 9 columns.
Table conversionista-se.Siwens_NoN1.Dynamic_open_session_funnel created successfully.
Table has 4 rows and 9 columns.
Table conversionista-se.Siwens_NoN1.Dynamic_close_session_funnel created successfully.
Table has 4 rows and 11 columns.
Table conversionista-se.Siwens_NoN1.Dynamic_close_user_funnel created successfully.
Funnel tables created successfully.


In [70]:
DEST_DATASET_LOCATION

'EU'

In [71]:
SOURCE_DATASET_LOCATION

'EU'

# Plots

In [74]:

def run_the_plot():
    from google.cloud import bigquery
    import pandas as pd
    import plotly.graph_objects as go
    from plotly.offline import init_notebook_mode, iplot
    import plotly.io as pio
    from typing import Any
    from typing import Optional, List, Dict
    from IPython.display import display, HTML
    import traceback


    # Set default renderer for plotly
    pio.renderers.default = "colab"

    # Initialize BigQuery client
    client = bigquery.Client()

    def custom_show(fig: go.Figure) -> None:
        """
        Custom function to display plotly figures in Google Colab.

        Args:
            fig (go.Figure): The plotly figure to display.
        """
        pio.show(fig, renderer="colab")

    # Initialize plotly for notebook use
    init_notebook_mode(connected=True)



    def fetch_data(table_name: str) -> pd.DataFrame:
        """
        Fetch data from a BigQuery table.

        Args:
            table_name (str): The full name of the BigQuery table.

        Returns:
            pd.DataFrame: A DataFrame containing the fetched data, or an empty DataFrame if an error occurs.
        """
        client = bigquery.Client()

        try:
            # Get the schema of the table
            table = client.get_table(table_name)
            table_columns = [field.name for field in table.schema]

            # Check if event_date exists
            date_col = "event_date" if "event_date" in table_columns else "NULL as event_date"

            # Prepare and execute the query
            query = f"SELECT {date_col}, * FROM `{table_name}`"
            df = client.query(query).to_dataframe()
            return df
        except Exception as e:
            print(f"Error executing query for table {table_name}: {str(e)}")
            return pd.DataFrame()  # Return an empty DataFrame if there's an error

    def process_funnel_data(table_name: str) -> Optional[pd.DataFrame]:
        """
        Process funnel data from a BigQuery table.

        Args:
            table_name (str): The full name of the BigQuery table.

        Returns:
            Optional[pd.DataFrame]: A DataFrame containing the processed funnel data, or None if no data is fetched.
        """
        df = fetch_data(table_name)
        if df.empty:
            print(f"No data fetched for {table_name}")
            return None

        # Get the event names and their corresponding column names
        event_columns: List[str] = list(SELECTED_EVENTS.values())

        # Sum the values for each event column
        total_data = df[event_columns].sum()

        # Sort values in descending order based on the order in SELECTED_EVENTS
        total_data = total_data.reindex(event_columns)

        # Create DataFrame with stage and value columns
        funnel_df = pd.DataFrame({'stage': total_data.index, 'value': total_data.values})

        print(f"Funnel data:\n{funnel_df}")

        return funnel_df


    def create_funnel_plot(funnel_data: pd.DataFrame) -> go.Figure:
        """
        Create a funnel plot from the given funnel data.

        Args:
            funnel_data (pd.DataFrame): DataFrame containing 'stage' and 'value' columns.

        Returns:
            go.Figure: A plotly Figure object representing the funnel plot.
        """
        fig = go.Figure()

        fig.add_trace(go.Funnel(
            y=funnel_data['stage'],
            x=funnel_data['value'],
            textposition="inside",
            textinfo="value+percent initial",
            opacity=0.65,
            marker={"color": ["royalblue", "lightskyblue", "lightcyan", "lightblue", "lightgreen"]},
            connector={"line": {"color": "royalblue", "dash": "solid", "width": 3}}
        ))

        # Add stage names and drop rate arrows
        for i, row in funnel_data.iterrows():
            percent = (row['value'] / funnel_data['value'].iloc[0]) * 100
            # Add stage name
            fig.add_annotation(
                x=0,  # Moved further left
                y=row['stage'],
                text=f"{row['stage']}: {row['value']} ({percent:.1f}%)",
                showarrow=False,
                xanchor="right",
                xref="paper",
                xshift=888,
                font=dict(size=12)
            )

            # Add drop rate arrow (except for the first stage)
            if i > 0:
                previous_value = funnel_data['value'].iloc[i-1]
                drop_rate = ((previous_value - row['value']) / previous_value) * 100

                # Adjust arrow position based on drop rate
                x_pos = previous_value * -0.5
                ax = 0

                fig.add_annotation(
                    x=x_pos,
                    y=row['stage'],
                    text=f"{drop_rate:.1f}% drop",
                    showarrow=True,
                    arrowhead=2,
                    arrowsize=1,
                    arrowwidth=2,
                    arrowcolor="red",
                    ax=ax,
                    ay=-40,  # Positive value to point downward
                    xanchor="right",
                    yanchor="bottom",
                    font=dict(size=10, color="red")
                )

        fig.update_layout(
            title_text="Session-based Funnel",
            title_x=0.3,  # Move title to the left
            font_size=12,
            width=1400,  # Increased width
            height=600,  # Decreased height
            margin=dict(l=350, r=50, t=50, b=50),  # Adjusted margins
            yaxis=dict(showticklabels=False, title=None)
        )

        return fig

    # List of funnel tables
    funnel_tables: List[str] = [
        f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_open_user_funnel",
        f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_open_session_funnel",
        f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_close_user_funnel",
        f"{PROJECT}.{WORKING_DATASET}.{FUNNEL_TYPE}_close_session_funnel"
    ]


    def process_and_display_funnels(funnel_tables: List[str], selected_events: Dict[str, str], selected_dimensions: List[Dict]) -> None:
        """
        Process and display funnel plots for the given funnel tables.

        Args:
            funnel_tables (List[str]): List of funnel table names to process.
            selected_events (Dict[str, str]): Dictionary of selected events.
            selected_dimensions (List[Dict]): List of selected dimensions.
        """
        print(f"SELECTED_EVENTS: {selected_events}")
        print(f"SELECTED_DIMENSIONS: {selected_dimensions}")

        for table in funnel_tables:
            funnel_name = table.split('.')[-1]
            print(f"\n{'='*50}")
            print(f"Processing {funnel_name}")
            print(f"{'='*50}")

            try:
                funnel_data = process_funnel_data(table)

                if funnel_data is not None and not funnel_data.empty:
                    print(f"\nCreating plot for {funnel_name}")
                    fig = create_funnel_plot(funnel_data)
                    fig.update_layout(title_text=f"Funnel - {funnel_name}")

                    html = fig.to_html(full_html=False, include_plotlyjs='cdn')
                    display(HTML(html))

                    print("Plot should be displayed above.")
                else:
                    print(f"Unable to create plot for {funnel_name}: No data or empty DataFrame")
            except Exception as e:
                print(f"Error processing {funnel_name}: {str(e)}")
                traceback.print_exc()

            print(f"{'='*50}\n")

    process_and_display_funnels(funnel_tables,SELECTED_EVENTS,SELECTED_DIMENSIONS)



## final plots

In [75]:
# OBS! run this function after you have chosen and done all from "4.2 Run all"
run_the_plot()

SELECTED_EVENTS: {'session_start': 'Session Start', 'user_engagement': 'User Engagement', 'view_item': 'View Item', 'add_to_cart': 'Add To Cart', 'purchase': 'Purchase'}
SELECTED_DIMENSIONS: [{'field': 'device.category', 'nested_type': 'TYPE1', 'parent_field': 'device', 'subfield': 'category', 'alias': 'device_category'}]

Processing Dynamic_open_user_funnel
Funnel data:
             stage   value
0    Session Start  199849
1  User Engagement  270088
2        View Item  268103
3      Add To Cart   12863
4         Purchase    1828

Creating plot for Dynamic_open_user_funnel


Plot should be displayed above.


Processing Dynamic_open_session_funnel
Funnel data:
             stage  value
0    Session Start  93397
1  User Engagement  70850
2        View Item  62280
3      Add To Cart   6461
4         Purchase   1744

Creating plot for Dynamic_open_session_funnel


Plot should be displayed above.


Processing Dynamic_close_user_funnel
Funnel data:
             stage   value
0    Session Start  150501
1  User Engagement   95487
2        View Item   35921
3      Add To Cart    3440
4         Purchase    1352

Creating plot for Dynamic_close_user_funnel


Plot should be displayed above.


Processing Dynamic_close_session_funnel
Funnel data:
             stage  value
0    Session Start  93319
1  User Engagement  34554
2        View Item  61685
3      Add To Cart   6457
4         Purchase   1741

Creating plot for Dynamic_close_session_funnel


Plot should be displayed above.



Template for funnel dashboard (need to fix)


https://lookerstudio.google.com/u/0/reporting/796f20a8-99f1-4215-9055-8694a7fb8199/page/p_1o7xfswgld/edit