<a href="https://colab.research.google.com/github/aliasoblomov/Backfill-GA4-to-BigQuery/blob/main/GA4_tables_backfill.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install google-analytics-data==0.18.4
!pip install google-cloud-bigquery
!pip install google-auth==2.27.0
!pip install google-auth-oauthlib
!pip install google-auth-httplib2

In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# config.json is opened relative to the current working directory. Keys read
# in this cell: SERVICE_ACCOUNT_FILE, CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID,
# INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE and DATASET_ID.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# creds1 is only handed to the BigQuery client here; the GA4 client in main()
# is built from the OAuth credentials returned by authenticate_ga4() instead.
# The BigQuery project comes from creds1.project_id, not from config.json.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, caching them on disk.

    When ``token.pickle`` exists in the working directory its unpickled
    contents are returned unchanged. Otherwise an interactive OAuth flow is
    started from ``config['CLIENT_SECRET_FILE']`` and ``config['SCOPES']``:
    the consent URL is printed, the authorization code is read from stdin,
    and the resulting credentials are written to ``token.pickle`` before
    being returned.

    Returns:
        The cached or freshly obtained credentials object.
    """
    # NOTE(review): pickle.load can execute code embedded in the file, so
    # token.pickle must only ever be a file this script wrote itself.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as cached:
            return pickle.load(cached)

    # No cached token: run the manual copy-and-paste consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    oauth_flow.fetch_token(code=input('Enter the authorization code: '))

    fresh_creds = oauth_flow.credentials
    with open('token.pickle', 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Fetch every row of a GA4 report by paging through ``client.run_report``.

    The request is mutated in place: its ``offset`` and ``limit`` fields are
    overwritten before each call. One progress line is printed per page.

    Paging stops once the number of rows collected reaches the response's
    ``row_count`` (the total size of the result, independent of paging), or
    when a page comes back empty. Using ``row_count`` instead of "page shorter
    than limit" avoids a wasted extra request when the total is an exact
    multiple of ``limit``, and avoids truncating the result if the server
    returns fewer rows per page than were asked for.

    Args:
        client: Object exposing ``run_report(request)``.
        request: Report request whose ``offset``/``limit`` can be assigned.
        limit: Number of rows to ask for per page.

    Returns:
        list: The rows of all pages, in the order they were returned.
    """
    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # Apply offset and limit to request
        request.offset = offset
        request.limit = limit

        # Fetch report data
        response = client.run_report(request)
        fetched = len(response.rows)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {fetched}")

        # Advance by what was actually received so no row is skipped when a
        # page is shorter than requested.
        offset += fetched

        # An empty page guards against looping forever if row_count is off.
        if fetched == 0 or offset >= response.row_count:
            break

        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Fetch per-transaction item rows from GA4.

    Builds one report request for the property and date range named in
    ``config`` (PROPERTY_ID, INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE) and hands
    it to ``run_report_with_pagination`` with that function's default page
    size.

    Args:
        client: GA4 Data API client passed through to the pagination helper.

    Returns:
        Whatever ``run_report_with_pagination`` returns for the request.
    """
    # Order matters: response_to_dataframe reads values by position.
    dimension_names = ('transactionId', 'itemName', 'date')
    metric_names = ('itemPurchaseQuantity', 'itemRevenue')

    property_path = f'properties/{config["PROPERTY_ID"]}'
    fetch_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[fetch_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a DataFrame of purchased items.

    Each row must carry three dimension values (transactionId, itemName, date,
    in that order) and two metric values (itemPurchaseQuantity, itemRevenue).
    The date is kept as the raw string found in the row.

    Metric strings that cannot be parsed as numbers are stored as 0.

    Args:
        response: Iterable of rows exposing ``dimension_values`` and
            ``metric_values`` sequences whose items have a ``.value``.

    Returns:
        pandas.DataFrame: One record per input row. An empty ``response``
        yields an empty frame with no columns.
    """
    def metric_or_zero(metric):
        number = pd.to_numeric(metric.value, errors='coerce')
        # NaN is truthy, so a plain ``number or 0`` would keep it; test for
        # the failed conversion explicitly.
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        dims = row.dimension_values
        mets = row.metric_values
        list_rows.append({
            'transactionId': dims[0].value,
            'itemName': dims[1].value,
            'date': dims[2].value,
            'itemPurchaseQuantity': metric_or_zero(mets[0]),
            'itemRevenue': metric_or_zero(mets[1]),
        })
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load *df* into ``<project>.<DATASET_ID>.<table_id>``, replacing the table.

    The load uses an explicit schema and WRITE_TRUNCATE, so existing table
    contents are overwritten. No partitioning is configured for this table.

    An empty DataFrame is not uploaded: a frame built from zero rows has no
    columns to match the schema, and truncating the table with nothing would
    discard existing data.

    Args:
        df: DataFrame with the columns named in the schema below.
        table_id: Table name inside the dataset given by ``config['DATASET_ID']``.
    """
    if df.empty:
        print(f"No rows to upload to {table_id}; skipping BigQuery load")
        return

    # Define BigQuery schema
    schema = [
        bigquery.SchemaField("transactionId", "STRING"),
        bigquery.SchemaField("itemName", "STRING"),
        bigquery.SchemaField("date", "STRING"),  # date stays a raw string here
        bigquery.SchemaField("itemPurchaseQuantity", "INTEGER"),
        bigquery.SchemaField("itemRevenue", "FLOAT")
    ]

    # Configure the load job: explicit schema, replace existing contents
    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
    )

    # Upload the DataFrame to BigQuery and wait for the job to finish
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded to {table_ref}")

# Main function
def main():
    """Run the transaction-items backfill end to end.

    Authenticates against GA4, downloads the report, writes it to
    ``ga4_transaction_items.csv`` and loads it into the BigQuery table of the
    same name. Any exception raised along the way is caught and printed
    rather than propagated.
    """
    # The CSV file is named after the destination table.
    table_id = 'ga4_transaction_items'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before touching BigQuery
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# config.json is opened relative to the current working directory. Keys read
# in this cell: SERVICE_ACCOUNT_FILE, CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID,
# INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE and DATASET_ID.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# creds1 is only handed to the BigQuery client here; the GA4 client in main()
# is built from the OAuth credentials returned by authenticate_ga4() instead.
# The BigQuery project comes from creds1.project_id, not from config.json.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, caching them on disk.

    When ``token.pickle`` exists in the working directory its unpickled
    contents are returned unchanged. Otherwise an interactive OAuth flow is
    started from ``config['CLIENT_SECRET_FILE']`` and ``config['SCOPES']``:
    the consent URL is printed, the authorization code is read from stdin,
    and the resulting credentials are written to ``token.pickle`` before
    being returned.

    Returns:
        The cached or freshly obtained credentials object.
    """
    # NOTE(review): pickle.load can execute code embedded in the file, so
    # token.pickle must only ever be a file this script wrote itself.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as cached:
            return pickle.load(cached)

    # No cached token: run the manual copy-and-paste consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    oauth_flow.fetch_token(code=input('Enter the authorization code: '))

    fresh_creds = oauth_flow.credentials
    with open('token.pickle', 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Fetch every row of a GA4 report by paging through ``client.run_report``.

    The request is mutated in place: its ``offset`` and ``limit`` fields are
    overwritten before each call. One progress line is printed per page.

    Paging stops once the number of rows collected reaches the response's
    ``row_count`` (the total size of the result, independent of paging), or
    when a page comes back empty. Using ``row_count`` instead of "page shorter
    than limit" avoids a wasted extra request when the total is an exact
    multiple of ``limit``, and avoids truncating the result if the server
    returns fewer rows per page than were asked for.

    Args:
        client: Object exposing ``run_report(request)``.
        request: Report request whose ``offset``/``limit`` can be assigned.
        limit: Number of rows to ask for per page.

    Returns:
        list: The rows of all pages, in the order they were returned.
    """
    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # Apply offset and limit to request
        request.offset = offset
        request.limit = limit

        # Fetch report data
        response = client.run_report(request)
        fetched = len(response.rows)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {fetched}")

        # Advance by what was actually received so no row is skipped when a
        # page is shorter than requested.
        offset += fetched

        # An empty page guards against looping forever if row_count is off.
        if fetched == 0 or offset >= response.row_count:
            break

        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Fetch daily session and purchase metrics per default channel group.

    Builds one report request for the property and date range named in
    ``config`` (PROPERTY_ID, INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE) and hands
    it to ``run_report_with_pagination`` with that function's default page
    size.

    Args:
        client: GA4 Data API client passed through to the pagination helper.

    Returns:
        Whatever ``run_report_with_pagination`` returns for the request.
    """
    # Order matters: response_to_dataframe reads values by position.
    dimension_names = ('date', 'sessionDefaultChannelGroup')
    metric_names = (
        'sessions',
        'totalUsers',
        'newUsers',
        'ecommercePurchases',
        'purchaseRevenue',
    )

    property_path = f'properties/{config["PROPERTY_ID"]}'
    fetch_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[fetch_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a DataFrame of per-channel daily metrics.

    Each row must carry two dimension values (date as ``YYYYMMDD``, channel
    group) and five metric values (sessions, totalUsers, newUsers,
    ecommercePurchases, purchaseRevenue), in that order.

    Dates that do not match ``%Y%m%d`` become ``NaT``; metric strings that
    cannot be parsed as numbers are stored as 0.

    Args:
        response: Iterable of rows exposing ``dimension_values`` and
            ``metric_values`` sequences whose items have a ``.value``.

    Returns:
        pandas.DataFrame: One record per input row. An empty ``response``
        yields an empty frame with no columns.
    """
    def metric_or_zero(metric):
        number = pd.to_numeric(metric.value, errors='coerce')
        # NaN is truthy, so a plain ``number or 0`` would keep it; test for
        # the failed conversion explicitly.
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        dims = row.dimension_values
        mets = row.metric_values
        try:
            date_value = pd.to_datetime(dims[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        list_rows.append({
            'date': date_value,
            # NOTE(review): get_ga4_report in this cell requests the
            # 'sessionDefaultChannelGroup' dimension, yet the column is named
            # 'sessionPrimaryChannelGroup' -- confirm which one is intended.
            'sessionPrimaryChannelGroup': dims[1].value,
            'sessions': metric_or_zero(mets[0]),
            'totalUsers': metric_or_zero(mets[1]),
            'newUsers': metric_or_zero(mets[2]),
            'ecommercePurchases': metric_or_zero(mets[3]),
            'purchaseRevenue': metric_or_zero(mets[4]),
        })
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load *df* into ``<project>.<DATASET_ID>.<table_id>``, replacing the table.

    The load uses an explicit schema, WRITE_TRUNCATE (existing contents are
    overwritten) and daily time partitioning on the ``date`` column.

    An empty DataFrame is not uploaded: a frame built from zero rows has no
    columns to match the schema, and truncating the table with nothing would
    discard existing data.

    Args:
        df: DataFrame with the columns named in the schema below.
        table_id: Table name inside the dataset given by ``config['DATASET_ID']``.
    """
    if df.empty:
        print(f"No rows to upload to {table_id}; skipping BigQuery load")
        return

    # Define BigQuery schema
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("sessionPrimaryChannelGroup", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload the DataFrame to BigQuery and wait for the job to finish
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the session-channel-group backfill end to end.

    Authenticates against GA4, downloads the report, writes it to
    ``ga4_data_session_channel_group.csv`` and loads it into the BigQuery
    table of the same name. Any exception raised along the way is caught and
    printed rather than propagated.
    """
    # The CSV file is named after the destination table.
    table_id = 'ga4_data_session_channel_group'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before touching BigQuery
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# config.json is opened relative to the current working directory. Keys read
# in this cell: SERVICE_ACCOUNT_FILE, CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID,
# INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE and DATASET_ID.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# creds1 is only handed to the BigQuery client here; the GA4 client in main()
# is built from the OAuth credentials returned by authenticate_ga4() instead.
# The BigQuery project comes from creds1.project_id, not from config.json.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, caching them on disk.

    When ``token.pickle`` exists in the working directory its unpickled
    contents are returned unchanged. Otherwise an interactive OAuth flow is
    started from ``config['CLIENT_SECRET_FILE']`` and ``config['SCOPES']``:
    the consent URL is printed, the authorization code is read from stdin,
    and the resulting credentials are written to ``token.pickle`` before
    being returned.

    Returns:
        The cached or freshly obtained credentials object.
    """
    # NOTE(review): pickle.load can execute code embedded in the file, so
    # token.pickle must only ever be a file this script wrote itself.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as cached:
            return pickle.load(cached)

    # No cached token: run the manual copy-and-paste consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    oauth_flow.fetch_token(code=input('Enter the authorization code: '))

    fresh_creds = oauth_flow.credentials
    with open('token.pickle', 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Fetch every row of a GA4 report by paging through ``client.run_report``.

    The request is mutated in place: its ``offset`` and ``limit`` fields are
    overwritten before each call. One progress line is printed per page.

    Paging stops once the number of rows collected reaches the response's
    ``row_count`` (the total size of the result, independent of paging), or
    when a page comes back empty. Using ``row_count`` instead of "page shorter
    than limit" avoids a wasted extra request when the total is an exact
    multiple of ``limit``, and avoids truncating the result if the server
    returns fewer rows per page than were asked for.

    Args:
        client: Object exposing ``run_report(request)``.
        request: Report request whose ``offset``/``limit`` can be assigned.
        limit: Number of rows to ask for per page.

    Returns:
        list: The rows of all pages, in the order they were returned.
    """
    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # Apply offset and limit to request
        request.offset = offset
        request.limit = limit

        # Fetch report data
        response = client.run_report(request)
        fetched = len(response.rows)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {fetched}")

        # Advance by what was actually received so no row is skipped when a
        # page is shorter than requested.
        offset += fetched

        # An empty page guards against looping forever if row_count is off.
        if fetched == 0 or offset >= response.row_count:
            break

        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Fetch daily session and purchase metrics per source/campaign/medium.

    Builds one report request for the property and date range named in
    ``config`` (PROPERTY_ID, INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE) and hands
    it to ``run_report_with_pagination`` with that function's default page
    size.

    Args:
        client: GA4 Data API client passed through to the pagination helper.

    Returns:
        Whatever ``run_report_with_pagination`` returns for the request.
    """
    # Order matters: response_to_dataframe reads values by position.
    dimension_names = (
        'date',
        'sessionSource',
        'sessionCampaignName',
        'sessionMedium',
    )
    metric_names = (
        'sessions',
        'totalUsers',
        'newUsers',
        'ecommercePurchases',
        'purchaseRevenue',
    )

    property_path = f'properties/{config["PROPERTY_ID"]}'
    fetch_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[fetch_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a DataFrame of per-source daily metrics.

    Each row must carry four dimension values (date as ``YYYYMMDD``,
    sessionSource, sessionCampaignName, sessionMedium) and five metric values
    (sessions, totalUsers, newUsers, ecommercePurchases, purchaseRevenue), in
    that order.

    Dates that do not match ``%Y%m%d`` become ``NaT``; metric strings that
    cannot be parsed as numbers are stored as 0.

    Args:
        response: Iterable of rows exposing ``dimension_values`` and
            ``metric_values`` sequences whose items have a ``.value``.

    Returns:
        pandas.DataFrame: One record per input row. An empty ``response``
        yields an empty frame with no columns.
    """
    def metric_or_zero(metric):
        number = pd.to_numeric(metric.value, errors='coerce')
        # NaN is truthy, so a plain ``number or 0`` would keep it; test for
        # the failed conversion explicitly.
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        dims = row.dimension_values
        mets = row.metric_values
        try:
            date_value = pd.to_datetime(dims[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        list_rows.append({
            'date': date_value,
            'sessionSource': dims[1].value,
            'sessionCampaignName': dims[2].value,
            'sessionMedium': dims[3].value,
            'sessions': metric_or_zero(mets[0]),
            'totalUsers': metric_or_zero(mets[1]),
            'newUsers': metric_or_zero(mets[2]),
            'ecommercePurchases': metric_or_zero(mets[3]),
            'purchaseRevenue': metric_or_zero(mets[4]),
        })
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load *df* into ``<project>.<DATASET_ID>.<table_id>``, replacing the table.

    The load uses an explicit schema, WRITE_TRUNCATE (existing contents are
    overwritten) and daily time partitioning on the ``date`` column.

    An empty DataFrame is not uploaded: a frame built from zero rows has no
    columns to match the schema, and truncating the table with nothing would
    discard existing data.

    Args:
        df: DataFrame with the columns named in the schema below.
        table_id: Table name inside the dataset given by ``config['DATASET_ID']``.
    """
    if df.empty:
        print(f"No rows to upload to {table_id}; skipping BigQuery load")
        return

    # Define BigQuery schema
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("sessionSource", "STRING"),
        bigquery.SchemaField("sessionCampaignName", "STRING"),
        bigquery.SchemaField("sessionMedium", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload the DataFrame to BigQuery and wait for the job to finish
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the source/campaign/medium backfill end to end.

    Authenticates against GA4, downloads the report, writes it to
    ``ga4_data_session_source_campaign_medium.csv`` and loads it into the
    BigQuery table of the same name. Any exception raised along the way is
    caught and printed rather than propagated.
    """
    # The CSV file is named after the destination table.
    table_id = 'ga4_data_session_source_campaign_medium'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before touching BigQuery
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# config.json is opened relative to the current working directory. Keys read
# in this cell: SERVICE_ACCOUNT_FILE, CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID,
# INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE and DATASET_ID.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# creds1 is only handed to the BigQuery client here; the GA4 client in main()
# is built from the OAuth credentials returned by authenticate_ga4() instead.
# The BigQuery project comes from creds1.project_id, not from config.json.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, caching them on disk.

    When ``token.pickle`` exists in the working directory its unpickled
    contents are returned unchanged. Otherwise an interactive OAuth flow is
    started from ``config['CLIENT_SECRET_FILE']`` and ``config['SCOPES']``:
    the consent URL is printed, the authorization code is read from stdin,
    and the resulting credentials are written to ``token.pickle`` before
    being returned.

    Returns:
        The cached or freshly obtained credentials object.
    """
    # NOTE(review): pickle.load can execute code embedded in the file, so
    # token.pickle must only ever be a file this script wrote itself.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as cached:
            return pickle.load(cached)

    # No cached token: run the manual copy-and-paste consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    oauth_flow.fetch_token(code=input('Enter the authorization code: '))

    fresh_creds = oauth_flow.credentials
    with open('token.pickle', 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Fetch every row of a GA4 report by paging through ``client.run_report``.

    The request is mutated in place: its ``offset`` and ``limit`` fields are
    overwritten before each call. One progress line is printed per page.

    Paging stops once the number of rows collected reaches the response's
    ``row_count`` (the total size of the result, independent of paging), or
    when a page comes back empty. Using ``row_count`` instead of "page shorter
    than limit" avoids a wasted extra request when the total is an exact
    multiple of ``limit``, and avoids truncating the result if the server
    returns fewer rows per page than were asked for.

    Args:
        client: Object exposing ``run_report(request)``.
        request: Report request whose ``offset``/``limit`` can be assigned.
        limit: Number of rows to ask for per page.

    Returns:
        list: The rows of all pages, in the order they were returned.
    """
    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # Apply offset and limit to request
        request.offset = offset
        request.limit = limit

        # Fetch report data
        response = client.run_report(request)
        fetched = len(response.rows)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {fetched}")

        # Advance by what was actually received so no row is skipped when a
        # page is shorter than requested.
        offset += fetched

        # An empty page guards against looping forever if row_count is off.
        if fetched == 0 or offset >= response.row_count:
            break

        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Fetch daily traffic and purchase metrics per country/language/city.

    Builds one report request for the property and date range named in
    ``config`` (PROPERTY_ID, INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE) and hands
    it to ``run_report_with_pagination`` with that function's default page
    size.

    Args:
        client: GA4 Data API client passed through to the pagination helper.

    Returns:
        Whatever ``run_report_with_pagination`` returns for the request.
    """
    # Order matters: response_to_dataframe reads values by position.
    dimension_names = ('date', 'country', 'language', 'city')
    metric_names = (
        'sessions',
        'screenPageViews',
        'totalUsers',
        'newUsers',
        'ecommercePurchases',
        'purchaseRevenue',
    )

    property_path = f'properties/{config["PROPERTY_ID"]}'
    fetch_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[fetch_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a DataFrame of per-location daily metrics.

    Each row must carry four dimension values (date as ``YYYYMMDD``, country,
    language, city) and six metric values (sessions, screenPageViews,
    totalUsers, newUsers, ecommercePurchases, purchaseRevenue), in that order.

    Dates that do not match ``%Y%m%d`` become ``NaT``; metric strings that
    cannot be parsed as numbers are stored as 0.

    Args:
        response: Iterable of rows exposing ``dimension_values`` and
            ``metric_values`` sequences whose items have a ``.value``.

    Returns:
        pandas.DataFrame: One record per input row. An empty ``response``
        yields an empty frame with no columns.
    """
    def metric_or_zero(metric):
        number = pd.to_numeric(metric.value, errors='coerce')
        # NaN is truthy, so a plain ``number or 0`` would keep it; test for
        # the failed conversion explicitly.
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        dims = row.dimension_values
        mets = row.metric_values
        try:
            date_value = pd.to_datetime(dims[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        list_rows.append({
            'date': date_value,
            'country': dims[1].value,
            'language': dims[2].value,
            'city': dims[3].value,
            'sessions': metric_or_zero(mets[0]),
            'screenPageViews': metric_or_zero(mets[1]),
            'totalUsers': metric_or_zero(mets[2]),
            'newUsers': metric_or_zero(mets[3]),
            'ecommercePurchases': metric_or_zero(mets[4]),
            'purchaseRevenue': metric_or_zero(mets[5]),
        })
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load *df* into ``<project>.<DATASET_ID>.<table_id>``, replacing the table.

    The load uses an explicit schema, WRITE_TRUNCATE (existing contents are
    overwritten) and daily time partitioning on the ``date`` column.

    An empty DataFrame is not uploaded: a frame built from zero rows has no
    columns to match the schema, and truncating the table with nothing would
    discard existing data.

    Args:
        df: DataFrame with the columns named in the schema below.
        table_id: Table name inside the dataset given by ``config['DATASET_ID']``.
    """
    if df.empty:
        print(f"No rows to upload to {table_id}; skipping BigQuery load")
        return

    # Define BigQuery schema
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("country", "STRING"),
        bigquery.SchemaField("language", "STRING"),
        bigquery.SchemaField("city", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("screenPageViews", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload the DataFrame to BigQuery and wait for the job to finish
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the country/language/city backfill end to end.

    Authenticates against GA4, downloads the report, writes it to
    ``ga4_data_country_language_city.csv`` and loads it into the BigQuery
    table of the same name. Any exception raised along the way is caught and
    printed rather than propagated.
    """
    # The CSV file is named after the destination table.
    table_id = 'ga4_data_country_language_city'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before touching BigQuery
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# config.json is opened relative to the current working directory. Keys read
# in this cell: SERVICE_ACCOUNT_FILE, CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID,
# INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE and DATASET_ID.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# creds1 is only handed to the BigQuery client here; the GA4 client in main()
# is built from the OAuth credentials returned by authenticate_ga4() instead.
# The BigQuery project comes from creds1.project_id, not from config.json.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, caching them on disk.

    When ``token.pickle`` exists in the working directory its unpickled
    contents are returned unchanged. Otherwise an interactive OAuth flow is
    started from ``config['CLIENT_SECRET_FILE']`` and ``config['SCOPES']``:
    the consent URL is printed, the authorization code is read from stdin,
    and the resulting credentials are written to ``token.pickle`` before
    being returned.

    Returns:
        The cached or freshly obtained credentials object.
    """
    # NOTE(review): pickle.load can execute code embedded in the file, so
    # token.pickle must only ever be a file this script wrote itself.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as cached:
            return pickle.load(cached)

    # No cached token: run the manual copy-and-paste consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    oauth_flow.fetch_token(code=input('Enter the authorization code: '))

    fresh_creds = oauth_flow.credentials
    with open('token.pickle', 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=1000):
    """Fetch every row of a GA4 report by paging through ``client.run_report``.

    The request is mutated in place: its ``offset`` and ``limit`` fields are
    overwritten before each call. One progress line is printed per page.

    Paging stops once the number of rows collected reaches the response's
    ``row_count`` (the total size of the result, independent of paging), or
    when a page comes back empty. Using ``row_count`` instead of "page shorter
    than limit" avoids a wasted extra request when the total is an exact
    multiple of ``limit``, and avoids truncating the result if the server
    returns fewer rows per page than were asked for.

    Args:
        client: Object exposing ``run_report(request)``.
        request: Report request whose ``offset``/``limit`` can be assigned.
        limit: Number of rows to ask for per page.

    Returns:
        list: The rows of all pages, in the order they were returned.
    """
    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # Apply offset and limit to request
        request.offset = offset
        request.limit = limit

        # Fetch report data
        response = client.run_report(request)
        fetched = len(response.rows)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {fetched}")

        # Advance by what was actually received so no row is skipped when a
        # page is shorter than requested.
        offset += fetched

        # An empty page guards against looping forever if row_count is off.
        if fetched == 0 or offset >= response.row_count:
            break

        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Fetch daily purchase quantity and revenue per item name.

    Builds one report request for the property and date range named in
    ``config`` (PROPERTY_ID, INITIAL_FETCH_FROM_DATE, FETCH_TO_DATE) and hands
    it to ``run_report_with_pagination`` with that function's default page
    size.

    Args:
        client: GA4 Data API client passed through to the pagination helper.

    Returns:
        Whatever ``run_report_with_pagination`` returns for the request.
    """
    # Order matters: response_to_dataframe reads values by position.
    dimension_names = ('date', 'itemName')
    metric_names = ('itemPurchaseQuantity', 'itemRevenue')

    property_path = f'properties/{config["PROPERTY_ID"]}'
    fetch_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[fetch_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a DataFrame of per-item daily metrics.

    Each row must carry two dimension values (date as ``YYYYMMDD``, itemName)
    and two metric values (itemPurchaseQuantity, itemRevenue), in that order.

    Dates that do not match ``%Y%m%d`` become ``NaT``; metric strings that
    cannot be parsed as numbers are stored as 0.

    Args:
        response: Iterable of rows exposing ``dimension_values`` and
            ``metric_values`` sequences whose items have a ``.value``.

    Returns:
        pandas.DataFrame: One record per input row. An empty ``response``
        yields an empty frame with no columns.
    """
    def metric_or_zero(metric):
        number = pd.to_numeric(metric.value, errors='coerce')
        # NaN is truthy, so a plain ``number or 0`` would keep it; test for
        # the failed conversion explicitly.
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        dims = row.dimension_values
        mets = row.metric_values
        try:
            date_value = pd.to_datetime(dims[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        list_rows.append({
            'date': date_value,
            'itemName': dims[1].value,
            'itemPurchaseQuantity': metric_or_zero(mets[0]),
            'itemRevenue': metric_or_zero(mets[1]),
        })
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load *df* into ``<project>.<DATASET_ID>.<table_id>``, replacing the table.

    The load uses an explicit schema, WRITE_TRUNCATE (existing contents are
    overwritten) and daily time partitioning on the ``date`` column.

    An empty DataFrame is not uploaded: a frame built from zero rows has no
    columns to match the schema, and truncating the table with nothing would
    discard existing data.

    Args:
        df: DataFrame with the columns named in the schema below.
        table_id: Table name inside the dataset given by ``config['DATASET_ID']``.
    """
    if df.empty:
        print(f"No rows to upload to {table_id}; skipping BigQuery load")
        return

    # Define BigQuery schema
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("itemName", "STRING"),
        bigquery.SchemaField("itemPurchaseQuantity", "INTEGER"),
        bigquery.SchemaField("itemRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload the DataFrame to BigQuery and wait for the job to finish
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the item-name backfill end to end.

    Authenticates against GA4, downloads the report, writes it to
    ``ga4_data_item_name.csv`` and loads it into the BigQuery table of the
    same name. Any exception raised along the way is caught and printed
    rather than propagated.
    """
    # The CSV file is named after the destination table.
    table_id = 'ga4_data_item_name'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before touching BigQuery
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# config.json is opened relative to the current working directory. Keys read
# by the code visible in this cell: SERVICE_ACCOUNT_FILE, CLIENT_SECRET_FILE,
# SCOPES, PROPERTY_ID, INITIAL_FETCH_FROM_DATE and FETCH_TO_DATE.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# creds1 is handed to the BigQuery client below; the project is taken from
# creds1.project_id, not from config.json.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, caching them on disk.

    When ``token.pickle`` exists in the working directory its unpickled
    contents are returned unchanged. Otherwise an interactive OAuth flow is
    started from ``config['CLIENT_SECRET_FILE']`` and ``config['SCOPES']``:
    the consent URL is printed, the authorization code is read from stdin,
    and the resulting credentials are written to ``token.pickle`` before
    being returned.

    Returns:
        The cached or freshly obtained credentials object.
    """
    # NOTE(review): pickle.load can execute code embedded in the file, so
    # token.pickle must only ever be a file this script wrote itself.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as cached:
            return pickle.load(cached)

    # No cached token: run the manual copy-and-paste consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    oauth_flow.fetch_token(code=input('Enter the authorization code: '))

    fresh_creds = oauth_flow.credentials
    with open('token.pickle', 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=1000):
    """Collect every row of a report by paging through it with offset/limit.

    ``request`` is modified in place: its ``offset`` and ``limit`` attributes
    are overwritten before each ``client.run_report(request)`` call.  One
    progress line is printed per page.  Paging stops after the first page that
    returns fewer than ``limit`` rows.

    Returns a list holding the rows of all pages in the order received.
    """
    collected = []
    page_number, offset = 1, 0
    more_pages = True

    while more_pages:
        # Point the request at the current page.
        request.offset, request.limit = offset, limit

        response = client.run_report(request)
        page_rows = response.rows
        collected += page_rows

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short page means the report is exhausted.
        more_pages = len(page_rows) >= limit
        offset, page_number = offset + limit, page_number + 1

    return collected

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Run the date / browser / operating system / device report.

    The property ID and the date range are read from the module-level
    ``config``; paging is delegated to ``run_report_with_pagination``, whose
    row list is returned unchanged.
    """
    property_path = f'properties/{config["PROPERTY_ID"]}'
    report_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    dimension_names = ('date', 'browser', 'operatingSystem', 'deviceCategory')
    metric_names = (
        'sessions',
        'screenPageViews',
        'totalUsers',
        'newUsers',
        'ecommercePurchases',
        'purchaseRevenue',
    )
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[report_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Helper: parse one GA4 metric string, substituting 0 when it is not numeric
def _metric_or_zero(raw_value):
    """Return ``raw_value`` parsed as a number, or 0 when it cannot be parsed.

    ``pd.to_numeric(..., errors='coerce')`` yields NaN for unparseable input,
    and NaN is truthy, so a plain ``value or 0`` never applies the fallback.
    The NaN case is therefore tested explicitly.
    """
    number = pd.to_numeric(raw_value, errors='coerce')
    return 0 if pd.isna(number) else number

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: iterable of rows exposing ``dimension_values`` (date, browser,
            operating system, device category) and ``metric_values`` (sessions,
            screenPageViews, totalUsers, newUsers, ecommercePurchases,
            purchaseRevenue); each entry carries its text in ``.value``.

    Returns:
        DataFrame with one record per row.  ``date`` is parsed from ``YYYYMMDD``
        (``NaT`` when parsing fails); metrics that are not numeric become 0.
    """
    dimension_columns = ('browser', 'operatingSystem', 'deviceCategory')
    metric_columns = (
        'sessions',
        'screenPageViews',
        'totalUsers',
        'newUsers',
        'ecommercePurchases',
        'purchaseRevenue',
    )
    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {'date': date_value}
        # Dimension 0 is the date, so the remaining dimensions start at index 1.
        for index, column in enumerate(dimension_columns, start=1):
            record[column] = row.dimension_values[index].value
        for index, column in enumerate(metric_columns):
            record[column] = _metric_or_zero(row.metric_values[index].value)
        list_rows.append(record)
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into a BigQuery table, replacing whatever the table holds.

    The destination is ``<bq_client.project>.<config['DATASET_ID']>.<table_id>``.
    The load job uses an explicit schema, the WRITE_TRUNCATE disposition and
    daily time partitioning on the ``date`` column; the call blocks until the
    job has finished, then prints a confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = (
        ("date", "DATE"),
        ("browser", "STRING"),
        ("operatingSystem", "STRING"),
        ("deviceCategory", "STRING"),
        ("sessions", "INTEGER"),
        ("screenPageViews", "INTEGER"),
        ("totalUsers", "INTEGER"),
        ("newUsers", "INTEGER"),
        ("ecommercePurchases", "INTEGER"),
        ("purchaseRevenue", "FLOAT"),
    )
    schema = [bigquery.SchemaField(column, kind) for column, kind in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    # Partition the destination table by day on the 'date' column.
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field='date'),
    )

    # Start the load and wait for it to complete.
    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Fetch the browser/OS/device report, save it as CSV and load it to BigQuery.

    Any exception raised along the way is caught and reported with a printed
    message instead of propagating.
    """
    # The CSV file is named after the BigQuery table.
    table_id = 'ga4_data_browser_os_device'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 user credentials drive the GA4 client.
        ga4_client = BetaAnalyticsDataClient(credentials=authenticate_ga4())

        # Fetch all report rows and tabulate them.
        ga4_df = response_to_dataframe(get_ga4_report(ga4_client))

        # Keep a local CSV copy of what is about to be uploaded.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() only when this code runs as the top-level program,
# not when it is imported as a module.
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# "config.json" is opened relative to the current working directory; the
# functions in this cell read their settings (credential file paths, property
# ID, date range, dataset ID) from the resulting `config` dict.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# The key file path comes from config['SERVICE_ACCOUNT_FILE']; the credentials
# are created with both the Analytics read-only scope and the BigQuery scope.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# BigQuery client bound to the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 user credentials for the GA4 Analytics Data API.

    If ``token.pickle`` exists in the working directory, the object pickled in
    it is returned as-is.  Otherwise the user is taken through a manual
    authorization-code flow (URL printed, code typed in) and the resulting
    credentials are pickled to ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse the credentials cached by an earlier run.
    # NOTE: unpickling executes arbitrary code, so token.pickle must be trusted.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached_file:
            return pickle.load(cached_file)

    # No cache yet: run the interactive consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/',
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_file:
        pickle.dump(fresh_creds, cache_file)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Collect every row of a report by paging through it with offset/limit.

    ``request`` is modified in place: its ``offset`` and ``limit`` attributes
    are overwritten before each ``client.run_report(request)`` call.  One
    progress line is printed per page.  Paging stops after the first page that
    returns fewer than ``limit`` rows.

    Returns a list holding the rows of all pages in the order received.
    """
    collected = []
    page_number, offset = 1, 0
    more_pages = True

    while more_pages:
        # Point the request at the current page.
        request.offset, request.limit = offset, limit

        response = client.run_report(request)
        page_rows = response.rows
        collected += page_rows

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short page means the report is exhausted.
        more_pages = len(page_rows) >= limit
        offset, page_number = offset + limit, page_number + 1

    return collected

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Run the date / first-user medium, source and campaign report.

    The property ID and the date range are read from the module-level
    ``config``; paging is delegated to ``run_report_with_pagination``, whose
    row list is returned unchanged.
    """
    property_path = f'properties/{config["PROPERTY_ID"]}'
    report_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    dimension_names = ('date', 'firstUserMedium', 'firstUserSource', 'firstUserCampaignName')
    metric_names = ('totalUsers', 'newUsers', 'ecommercePurchases', 'purchaseRevenue')
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[report_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Helper: parse one GA4 metric string, substituting 0 when it is not numeric
def _metric_or_zero(raw_value):
    """Return ``raw_value`` parsed as a number, or 0 when it cannot be parsed.

    ``pd.to_numeric(..., errors='coerce')`` yields NaN for unparseable input,
    and NaN is truthy, so a plain ``value or 0`` never applies the fallback.
    The NaN case is therefore tested explicitly.
    """
    number = pd.to_numeric(raw_value, errors='coerce')
    return 0 if pd.isna(number) else number

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: iterable of rows exposing ``dimension_values`` (date,
            firstUserMedium, firstUserSource, firstUserCampaignName) and
            ``metric_values`` (totalUsers, newUsers, ecommercePurchases,
            purchaseRevenue); each entry carries its text in ``.value``.

    Returns:
        DataFrame with one record per row.  ``date`` is parsed from ``YYYYMMDD``
        (``NaT`` when parsing fails); metrics that are not numeric become 0.
    """
    dimension_columns = ('firstUserMedium', 'firstUserSource', 'firstUserCampaignName')
    metric_columns = ('totalUsers', 'newUsers', 'ecommercePurchases', 'purchaseRevenue')
    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {'date': date_value}
        # Dimension 0 is the date, so the remaining dimensions start at index 1.
        for index, column in enumerate(dimension_columns, start=1):
            record[column] = row.dimension_values[index].value
        for index, column in enumerate(metric_columns):
            record[column] = _metric_or_zero(row.metric_values[index].value)
        list_rows.append(record)
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into a BigQuery table, replacing whatever the table holds.

    The destination is ``<bq_client.project>.<config['DATASET_ID']>.<table_id>``.
    The load job uses an explicit schema, the WRITE_TRUNCATE disposition and
    daily time partitioning on the ``date`` column; the call blocks until the
    job has finished, then prints a confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = (
        ("date", "DATE"),
        ("firstUserMedium", "STRING"),
        ("firstUserSource", "STRING"),
        ("firstUserCampaignName", "STRING"),
        ("totalUsers", "INTEGER"),
        ("newUsers", "INTEGER"),
        ("ecommercePurchases", "INTEGER"),
        ("purchaseRevenue", "FLOAT"),
    )
    schema = [bigquery.SchemaField(column, kind) for column, kind in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    # Partition the destination table by day on the 'date' column.
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field='date'),
    )

    # Start the load and wait for it to complete.
    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Fetch the first-user source/medium report, save it as CSV and load it to BigQuery.

    Any exception raised along the way is caught and reported with a printed
    message instead of propagating.
    """
    # The CSV file is named after the BigQuery table.
    table_id = 'ga4_data_first_user_source_medium'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 user credentials drive the GA4 client.
        ga4_client = BetaAnalyticsDataClient(credentials=authenticate_ga4())

        # Fetch all report rows and tabulate them.
        ga4_df = response_to_dataframe(get_ga4_report(ga4_client))

        # Keep a local CSV copy of what is about to be uploaded.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() only when this code runs as the top-level program,
# not when it is imported as a module.
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# "config.json" is opened relative to the current working directory; the
# functions in this cell read their settings (credential file paths, property
# ID, date range, dataset ID) from the resulting `config` dict.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# The key file path comes from config['SERVICE_ACCOUNT_FILE']; the credentials
# are created with both the Analytics read-only scope and the BigQuery scope.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# BigQuery client bound to the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 user credentials for the GA4 Analytics Data API.

    If ``token.pickle`` exists in the working directory, the object pickled in
    it is returned as-is.  Otherwise the user is taken through a manual
    authorization-code flow (URL printed, code typed in) and the resulting
    credentials are pickled to ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse the credentials cached by an earlier run.
    # NOTE: unpickling executes arbitrary code, so token.pickle must be trusted.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached_file:
            return pickle.load(cached_file)

    # No cache yet: run the interactive consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/',
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_file:
        pickle.dump(fresh_creds, cache_file)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Collect every row of a report by paging through it with offset/limit.

    ``request`` is modified in place: its ``offset`` and ``limit`` attributes
    are overwritten before each ``client.run_report(request)`` call.  One
    progress line is printed per page.  Paging stops after the first page that
    returns fewer than ``limit`` rows.

    Returns a list holding the rows of all pages in the order received.
    """
    collected = []
    page_number, offset = 1, 0
    more_pages = True

    while more_pages:
        # Point the request at the current page.
        request.offset, request.limit = offset, limit

        response = client.run_report(request)
        page_rows = response.rows
        collected += page_rows

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short page means the report is exhausted.
        more_pages = len(page_rows) >= limit
        offset, page_number = offset + limit, page_number + 1

    return collected

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Run the date / first-user default channel group report.

    The property ID and the date range are read from the module-level
    ``config``; paging is delegated to ``run_report_with_pagination``, whose
    row list is returned unchanged.
    """
    property_path = f'properties/{config["PROPERTY_ID"]}'
    report_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    dimension_names = ('date', 'firstUserDefaultChannelGroup')
    metric_names = ('totalUsers', 'newUsers', 'ecommercePurchases', 'purchaseRevenue')
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[report_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Helper: parse one GA4 metric string, substituting 0 when it is not numeric
def _metric_or_zero(raw_value):
    """Return ``raw_value`` parsed as a number, or 0 when it cannot be parsed.

    ``pd.to_numeric(..., errors='coerce')`` yields NaN for unparseable input,
    and NaN is truthy, so a plain ``value or 0`` never applies the fallback.
    The NaN case is therefore tested explicitly.
    """
    number = pd.to_numeric(raw_value, errors='coerce')
    return 0 if pd.isna(number) else number

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: iterable of rows exposing ``dimension_values`` (date,
            firstUserDefaultChannelGroup) and ``metric_values`` (totalUsers,
            newUsers, ecommercePurchases, purchaseRevenue); each entry carries
            its text in ``.value``.

    Returns:
        DataFrame with one record per row.  ``date`` is parsed from ``YYYYMMDD``
        (``NaT`` when parsing fails); metrics that are not numeric become 0.
    """
    metric_columns = ('totalUsers', 'newUsers', 'ecommercePurchases', 'purchaseRevenue')
    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {
            'date': date_value,
            'firstUserDefaultChannelGroup': row.dimension_values[1].value,
        }
        for index, column in enumerate(metric_columns):
            record[column] = _metric_or_zero(row.metric_values[index].value)
        list_rows.append(record)
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into a BigQuery table, replacing whatever the table holds.

    The destination is ``<bq_client.project>.<config['DATASET_ID']>.<table_id>``.
    The load job uses an explicit schema, the WRITE_TRUNCATE disposition and
    daily time partitioning on the ``date`` column; the call blocks until the
    job has finished, then prints a confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = (
        ("date", "DATE"),
        ("firstUserDefaultChannelGroup", "STRING"),
        ("totalUsers", "INTEGER"),
        ("newUsers", "INTEGER"),
        ("ecommercePurchases", "INTEGER"),
        ("purchaseRevenue", "FLOAT"),
    )
    schema = [bigquery.SchemaField(column, kind) for column, kind in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    # Partition the destination table by day on the 'date' column.
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field='date'),
    )

    # Start the load and wait for it to complete.
    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Fetch the first-user channel group report, save it as CSV and load it to BigQuery.

    Any exception raised along the way is caught and reported with a printed
    message instead of propagating.
    """
    # The CSV file is named after the BigQuery table.
    table_id = 'ga4_data_first_user_channel_group'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 user credentials drive the GA4 client.
        ga4_client = BetaAnalyticsDataClient(credentials=authenticate_ga4())

        # Fetch all report rows and tabulate them.
        ga4_df = response_to_dataframe(get_ga4_report(ga4_client))

        # Keep a local CSV copy of what is about to be uploaded.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() only when this code runs as the top-level program,
# not when it is imported as a module.
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# "config.json" is opened relative to the current working directory; the
# functions in this cell read their settings (credential file paths, property
# ID, date range, dataset ID) from the resulting `config` dict.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# The key file path comes from config['SERVICE_ACCOUNT_FILE']; the credentials
# are created with both the Analytics read-only scope and the BigQuery scope.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# BigQuery client bound to the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 user credentials for the GA4 Analytics Data API.

    If ``token.pickle`` exists in the working directory, the object pickled in
    it is returned as-is.  Otherwise the user is taken through a manual
    authorization-code flow (URL printed, code typed in) and the resulting
    credentials are pickled to ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse the credentials cached by an earlier run.
    # NOTE: unpickling executes arbitrary code, so token.pickle must be trusted.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached_file:
            return pickle.load(cached_file)

    # No cache yet: run the interactive consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/',
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_file:
        pickle.dump(fresh_creds, cache_file)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Collect every row of a report by paging through it with offset/limit.

    ``request`` is modified in place: its ``offset`` and ``limit`` attributes
    are overwritten before each ``client.run_report(request)`` call.  One
    progress line is printed per page.  Paging stops after the first page that
    returns fewer than ``limit`` rows.

    Returns a list holding the rows of all pages in the order received.
    """
    collected = []
    page_number, offset = 1, 0
    more_pages = True

    while more_pages:
        # Point the request at the current page.
        request.offset, request.limit = offset, limit

        response = client.run_report(request)
        page_rows = response.rows
        collected += page_rows

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short page means the report is exhausted.
        more_pages = len(page_rows) >= limit
        offset, page_number = offset + limit, page_number + 1

    return collected

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Run the date / session source, medium and campaign ads report.

    The property ID and the date range are read from the module-level
    ``config``; paging is delegated to ``run_report_with_pagination``, whose
    row list is returned unchanged.
    """
    property_path = f'properties/{config["PROPERTY_ID"]}'
    report_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    dimension_names = ('date', 'sessionSource', 'sessionMedium', 'sessionCampaignName')
    metric_names = (
        'ecommercePurchases',
        'averagePurchaseRevenue',
        'purchaseRevenue',
        'advertiserAdClicks',
        'advertiserAdCost',
        'advertiserAdCostPerClick',
        'returnOnAdSpend',
    )
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[report_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Helper: parse one GA4 metric string, substituting 0 when it is not numeric
def _metric_or_zero(raw_value):
    """Return ``raw_value`` parsed as a number, or 0 when it cannot be parsed.

    ``pd.to_numeric(..., errors='coerce')`` yields NaN for unparseable input,
    and NaN is truthy, so a plain ``value or 0`` never applies the fallback.
    The NaN case is therefore tested explicitly.
    """
    number = pd.to_numeric(raw_value, errors='coerce')
    return 0 if pd.isna(number) else number

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: iterable of rows exposing ``dimension_values`` (date,
            sessionSource, sessionMedium, sessionCampaignName) and
            ``metric_values`` (ecommercePurchases, averagePurchaseRevenue,
            purchaseRevenue, advertiserAdClicks, advertiserAdCost,
            advertiserAdCostPerClick, returnOnAdSpend); each entry carries its
            text in ``.value``.

    Returns:
        DataFrame with one record per row.  ``date`` is parsed from ``YYYYMMDD``
        (``NaT`` when parsing fails); metrics that are not numeric become 0.
    """
    dimension_columns = ('sessionSource', 'sessionMedium', 'sessionCampaignName')
    metric_columns = (
        'ecommercePurchases',
        'averagePurchaseRevenue',
        'purchaseRevenue',
        'advertiserAdClicks',
        'advertiserAdCost',
        'advertiserAdCostPerClick',
        'returnOnAdSpend',
    )
    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {'date': date_value}
        # Dimension 0 is the date, so the remaining dimensions start at index 1.
        for index, column in enumerate(dimension_columns, start=1):
            record[column] = row.dimension_values[index].value
        for index, column in enumerate(metric_columns):
            record[column] = _metric_or_zero(row.metric_values[index].value)
        list_rows.append(record)
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into a BigQuery table, replacing whatever the table holds.

    The destination is ``<bq_client.project>.<config['DATASET_ID']>.<table_id>``.
    The load job uses an explicit schema, the WRITE_TRUNCATE disposition and
    daily time partitioning on the ``date`` column; the call blocks until the
    job has finished, then prints a confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = (
        ("date", "DATE"),
        ("sessionSource", "STRING"),
        ("sessionMedium", "STRING"),
        ("sessionCampaignName", "STRING"),
        ("ecommercePurchases", "INTEGER"),
        ("averagePurchaseRevenue", "FLOAT"),
        ("purchaseRevenue", "FLOAT"),
        ("advertiserAdClicks", "INTEGER"),
        ("advertiserAdCost", "FLOAT"),
        ("advertiserAdCostPerClick", "FLOAT"),
        ("returnOnAdSpend", "FLOAT"),
    )
    schema = [bigquery.SchemaField(column, kind) for column, kind in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    # Partition the destination table by day on the 'date' column.
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field='date'),
    )

    # Start the load and wait for it to complete.
    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Fetch the ads report, save it as CSV and load it to BigQuery.

    Any exception raised along the way is caught and reported with a printed
    message instead of propagating.
    """
    # The CSV file is named after the BigQuery table.
    table_id = 'ga4_ads_data'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 user credentials drive the GA4 client.
        ga4_client = BetaAnalyticsDataClient(credentials=authenticate_ga4())

        # Fetch all report rows and tabulate them.
        ga4_df = response_to_dataframe(get_ga4_report(ga4_client))

        # Keep a local CSV copy of what is about to be uploaded.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() only when this code runs as the top-level program,
# not when it is imported as a module.
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# "config.json" is opened relative to the current working directory; the
# functions in this cell read their settings (credential file paths, property
# ID, date range, dataset ID) from the resulting `config` dict.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# The key file path comes from config['SERVICE_ACCOUNT_FILE']; the credentials
# are created with both the Analytics read-only scope and the BigQuery scope.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# BigQuery client bound to the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 user credentials for the GA4 Analytics Data API.

    If ``token.pickle`` exists in the working directory, the object pickled in
    it is returned as-is.  Otherwise the user is taken through a manual
    authorization-code flow (URL printed, code typed in) and the resulting
    credentials are pickled to ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse the credentials cached by an earlier run.
    # NOTE: unpickling executes arbitrary code, so token.pickle must be trusted.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached_file:
            return pickle.load(cached_file)

    # No cache yet: run the interactive consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/',
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_file:
        pickle.dump(fresh_creds, cache_file)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Collect every row of a report by paging through it with offset/limit.

    ``request`` is modified in place: its ``offset`` and ``limit`` attributes
    are overwritten before each ``client.run_report(request)`` call.  One
    progress line is printed per page.  Paging stops after the first page that
    returns fewer than ``limit`` rows.

    Returns a list holding the rows of all pages in the order received.
    """
    collected = []
    page_number, offset = 1, 0
    more_pages = True

    while more_pages:
        # Point the request at the current page.
        request.offset, request.limit = offset, limit

        response = client.run_report(request)
        page_rows = response.rows
        collected += page_rows

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short page means the report is exhausted.
        more_pages = len(page_rows) >= limit
        offset, page_number = offset + limit, page_number + 1

    return collected

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Run the transaction ID / item name report.

    The property ID and the date range are read from the module-level
    ``config``; paging is delegated to ``run_report_with_pagination``, whose
    row list is returned unchanged.
    """
    property_path = f'properties/{config["PROPERTY_ID"]}'
    report_window = DateRange(
        start_date=config['INITIAL_FETCH_FROM_DATE'],
        end_date=config['FETCH_TO_DATE'],
    )
    dimension_names = ('transactionId', 'itemName')
    metric_names = ('itemPurchaseQuantity', 'itemRevenue')
    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[report_window],
        dimensions=[Dimension(name=label) for label in dimension_names],
        metrics=[Metric(name=label) for label in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Helper: parse one GA4 metric string, substituting 0 when it is not numeric
def _metric_or_zero(raw_value):
    """Return ``raw_value`` parsed as a number, or 0 when it cannot be parsed.

    ``pd.to_numeric(..., errors='coerce')`` yields NaN for unparseable input,
    and NaN is truthy, so a plain ``value or 0`` never applies the fallback.
    The NaN case is therefore tested explicitly.
    """
    number = pd.to_numeric(raw_value, errors='coerce')
    return 0 if pd.isna(number) else number

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: iterable of rows exposing ``dimension_values``
            (transactionId, itemName) and ``metric_values``
            (itemPurchaseQuantity, itemRevenue); each entry carries its text
            in ``.value``.

    Returns:
        DataFrame with one record per row; metrics that are not numeric
        become 0.
    """
    list_rows = []
    for row in response:
        transaction_id, item_name = (row.dimension_values[i].value for i in (0, 1))
        list_rows.append({
            'transactionId': transaction_id,
            'itemName': item_name,
            'itemPurchaseQuantity': _metric_or_zero(row.metric_values[0].value),
            'itemRevenue': _metric_or_zero(row.metric_values[1].value),
        })
    return pd.DataFrame(list_rows)

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into a BigQuery table, replacing whatever the table holds.

    The destination is ``<bq_client.project>.<config['DATASET_ID']>.<table_id>``.
    The load job uses an explicit schema and the WRITE_TRUNCATE disposition;
    no partitioning is configured.  The call blocks until the job has
    finished, then prints a confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = (
        ("transactionId", "STRING"),
        ("itemName", "STRING"),
        ("itemPurchaseQuantity", "INTEGER"),
        ("itemRevenue", "FLOAT"),
    )
    schema = [bigquery.SchemaField(column, kind) for column, kind in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    # Overwrite the destination table on every run.
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    # Start the load and wait for it to complete.
    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded to {table_ref}")

# Main function
def main():
    """Fetch the transaction items report, save it as CSV and load it to BigQuery.

    Any exception raised along the way is caught and reported with a printed
    message instead of propagating.
    """
    # The CSV file is named after the BigQuery table.
    table_id = 'ga4_transaction_items'
    csv_filename = f"{table_id}.csv"

    try:
        # OAuth2 user credentials drive the GA4 client.
        ga4_client = BetaAnalyticsDataClient(credentials=authenticate_ga4())

        # Fetch all report rows and tabulate them.
        ga4_df = response_to_dataframe(get_ga4_report(ga4_client))

        # Keep a local CSV copy of what is about to be uploaded.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() only when this code runs as the top-level program,
# not when it is imported as a module.
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file.
# "config.json" is opened relative to the current working directory; the
# functions in this cell read their settings (credential file paths, property
# ID, date range) from the resulting `config` dict.
with open("config.json", "r") as f:
    config = json.load(f)

# Authenticate with service account for BigQuery.
# The key file path comes from config['SERVICE_ACCOUNT_FILE']; the credentials
# are created with both the Analytics read-only scope and the BigQuery scope.
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# BigQuery client bound to the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 user credentials for the GA4 Analytics Data API.

    If ``token.pickle`` exists in the working directory, the object pickled in
    it is returned as-is.  Otherwise the user is taken through a manual
    authorization-code flow (URL printed, code typed in) and the resulting
    credentials are pickled to ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse the credentials cached by an earlier run.
    # NOTE: unpickling executes arbitrary code, so token.pickle must be trusted.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached_file:
            return pickle.load(cached_file)

    # No cache yet: run the interactive consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/',
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_file:
        pickle.dump(fresh_creds, cache_file)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Collect every row of a report by paging through it with offset/limit.

    ``request`` is modified in place: its ``offset`` and ``limit`` attributes
    are overwritten before each ``client.run_report(request)`` call.  One
    progress line is printed per page.  Paging stops after the first page that
    returns fewer than ``limit`` rows.

    Returns a list holding the rows of all pages in the order received.
    """
    collected = []
    page_number, offset = 1, 0
    more_pages = True

    while more_pages:
        # Point the request at the current page.
        request.offset, request.limit = offset, limit

        response = client.run_report(request)
        page_rows = response.rows
        collected += page_rows

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short page means the report is exhausted.
        more_pages = len(page_rows) >= limit
        offset, page_number = offset + limit, page_number + 1

    return collected

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Request the per-date metrics report from GA4 and return all of its rows.

    The property ID and the date range come from ``config``; the single
    dimension is ``date`` and the metrics are requested in the order listed
    in ``metric_names``.
    """
    metric_names = [
        'sessions',
        'totalUsers',
        'newUsers',
        'ecommercePurchases',
        'purchaseRevenue',
        'screenPageViews',
        'eventCount',
        'averageSessionDuration',
        'engagedSessions',
        'engagementRate',
    ]
    property_path = f'properties/{config["PROPERTY_ID"]}'
    period = DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])

    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[period],
        dimensions=[Dimension(name='date')],
        metrics=[Metric(name=metric_name) for metric_name in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: Iterable of rows. Each row exposes ``dimension_values``
            (index 0 holds the date as ``YYYYMMDD``) and ``metric_values``
            (ten entries, in the order of ``metric_columns`` below); every
            entry carries its text in ``.value``.

    Returns:
        pandas.DataFrame: One record per input row with a ``date`` column
        followed by the metric columns. Dates that fail to parse become
        ``NaT``; metric values that fail to parse become ``0``. Empty input
        yields an empty frame that still has all columns.
    """
    metric_columns = [
        'sessions',
        'totalUsers',
        'newUsers',
        'ecommercePurchases',
        'purchaseRevenue',
        'screenPageViews',
        'eventCount',
        'averageSessionDuration',
        'engagedSessions',
        'engagementRate',
    ]

    def to_number(raw):
        # errors='coerce' yields NaN for unparseable text. NaN is truthy, so
        # an ``... or 0`` fallback would let it through; test it explicitly.
        number = pd.to_numeric(raw, errors='coerce')
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {'date': date_value}
        for position, column in enumerate(metric_columns):
            record[column] = to_number(row.metric_values[position].value)
        list_rows.append(record)

    # Explicit columns keep the frame's layout stable even when there are no rows.
    return pd.DataFrame(list_rows, columns=['date', *metric_columns])

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into ``<project>.<DATASET_ID>.<table_id>`` in BigQuery.

    The load job uses an explicit schema, the WRITE_TRUNCATE write
    disposition and daily time partitioning on the ``date`` column. The
    function waits on the job via ``.result()`` before printing a
    confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = [
        ("date", "DATE"),
        ("sessions", "INTEGER"),
        ("totalUsers", "INTEGER"),
        ("newUsers", "INTEGER"),
        ("ecommercePurchases", "INTEGER"),
        ("purchaseRevenue", "FLOAT"),
        ("screenPageViews", "INTEGER"),
        ("eventCount", "INTEGER"),
        ("averageSessionDuration", "FLOAT"),
        ("engagedSessions", "INTEGER"),
        ("engagementRate", "FLOAT"),
    ]
    bq_schema = [bigquery.SchemaField(column, bq_type) for column, bq_type in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date'
    )
    load_config = bigquery.LoadJobConfig(
        schema=bq_schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning
    )

    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the whole backfill: authenticate, fetch, convert, save CSV, upload.

    Any exception raised along the way is caught and printed rather than
    propagated.
    """
    try:
        # The CSV file is named after the destination table.
        table_id = 'ga4_all_metrics_data'
        csv_filename = f"{table_id}.csv"

        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame.
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before pushing to BigQuery.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() when this code runs as the __main__ module.
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file
with open("config.json", "r") as f:
    # Keys read from this dict later in the cell: SERVICE_ACCOUNT_FILE,
    # CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID, INITIAL_FETCH_FROM_DATE,
    # FETCH_TO_DATE and DATASET_ID.
    config = json.load(f)

# Authenticate with service account for BigQuery
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# The BigQuery client targets the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, cached in ``token.pickle``.

    If ``token.pickle`` exists in the working directory its unpickled content
    is returned as-is (no validity check is made here). Otherwise an
    interactive flow is run: the consent URL is printed, the authorization
    code is read from stdin, and the resulting credentials are pickled to
    ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse whatever was cached by a previous run.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached:
            return pickle.load(cached)

    # No cache yet: walk the user through the OAuth consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=10000):
    """Fetch every row of a report by paging through it with offset/limit.

    Args:
        client: Object exposing ``run_report(request)``; the response must
            have a ``rows`` sequence.
        request: Report request. Its ``offset`` and ``limit`` attributes are
            overwritten for every page.
        limit: Page size; must be greater than zero.

    Returns:
        list: The rows of all pages, concatenated in fetch order.

    Raises:
        ValueError: If ``limit`` is not greater than zero.
    """
    # A non-positive page size can never satisfy the "short page" stop test
    # below and never advances the offset, so the loop would not terminate.
    if limit <= 0:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")

    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # The same request object is reused and mutated for each page.
        request.offset = offset
        request.limit = limit

        response = client.run_report(request)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short (or empty) page means there is nothing left to fetch.
        if len(response.rows) < limit:
            break

        offset += limit
        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Request the per-date, per-event metrics report from GA4 and return all rows.

    The property ID and the date range come from ``config``; dimensions and
    metrics are requested in the order listed below.
    """
    dimension_names = ['date', 'eventName']
    metric_names = ['eventCount', 'eventCountPerUser', 'eventValue']

    property_path = f'properties/{config["PROPERTY_ID"]}'
    period = DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])

    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[period],
        dimensions=[Dimension(name=dimension_name) for dimension_name in dimension_names],
        metrics=[Metric(name=metric_name) for metric_name in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: Iterable of rows. Each row exposes ``dimension_values``
            (index 0 holds the date as ``YYYYMMDD``, index 1 the event name)
            and ``metric_values`` (three entries, in the order of
            ``metric_columns`` below); every entry carries its text in
            ``.value``.

    Returns:
        pandas.DataFrame: One record per input row with columns ``date``,
        ``eventName`` and the metric columns. Dates that fail to parse become
        ``NaT``; metric values that fail to parse become ``0``. Empty input
        yields an empty frame that still has all columns.
    """
    metric_columns = ['eventCount', 'eventCountPerUser', 'eventValue']

    def to_number(raw):
        # errors='coerce' yields NaN for unparseable text. NaN is truthy, so
        # an ``... or 0`` fallback would let it through; test it explicitly.
        number = pd.to_numeric(raw, errors='coerce')
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {
            'date': date_value,
            'eventName': row.dimension_values[1].value,
        }
        for position, column in enumerate(metric_columns):
            record[column] = to_number(row.metric_values[position].value)
        list_rows.append(record)

    # Explicit columns keep the frame's layout stable even when there are no rows.
    return pd.DataFrame(list_rows, columns=['date', 'eventName', *metric_columns])

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into ``<project>.<DATASET_ID>.<table_id>`` in BigQuery.

    The load job uses an explicit schema, the WRITE_TRUNCATE write
    disposition and daily time partitioning on the ``date`` column. The
    function waits on the job via ``.result()`` before printing a
    confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = [
        ("date", "DATE"),
        ("eventName", "STRING"),
        ("eventCount", "INTEGER"),
        ("eventCountPerUser", "FLOAT"),
        ("eventValue", "FLOAT"),
    ]
    bq_schema = [bigquery.SchemaField(column, bq_type) for column, bq_type in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date'
    )
    load_config = bigquery.LoadJobConfig(
        schema=bq_schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning
    )

    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the whole backfill: authenticate, fetch, convert, save CSV, upload.

    Any exception raised along the way is caught and printed rather than
    propagated.
    """
    try:
        # The CSV file is named after the destination table.
        table_id = 'ga4_event_metrics_data'
        csv_filename = f"{table_id}.csv"

        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame.
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before pushing to BigQuery.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() when this code runs as the __main__ module.
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file
with open("config.json", "r") as f:
    # Keys read from this dict later in the cell: SERVICE_ACCOUNT_FILE,
    # CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID, INITIAL_FETCH_FROM_DATE,
    # FETCH_TO_DATE and DATASET_ID.
    config = json.load(f)

# Authenticate with service account for BigQuery
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# The BigQuery client targets the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, cached in ``token.pickle``.

    If ``token.pickle`` exists in the working directory its unpickled content
    is returned as-is (no validity check is made here). Otherwise an
    interactive flow is run: the consent URL is printed, the authorization
    code is read from stdin, and the resulting credentials are pickled to
    ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse whatever was cached by a previous run.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached:
            return pickle.load(cached)

    # No cache yet: walk the user through the OAuth consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=250000):
    """Fetch every row of a report by paging through it with offset/limit.

    Args:
        client: Object exposing ``run_report(request)``; the response must
            have a ``rows`` sequence.
        request: Report request. Its ``offset`` and ``limit`` attributes are
            overwritten for every page.
        limit: Page size; must be greater than zero.

    Returns:
        list: The rows of all pages, concatenated in fetch order.

    Raises:
        ValueError: If ``limit`` is not greater than zero.
    """
    # A non-positive page size can never satisfy the "short page" stop test
    # below and never advances the offset, so the loop would not terminate.
    if limit <= 0:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")

    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # The same request object is reused and mutated for each page.
        request.offset = offset
        request.limit = limit

        response = client.run_report(request)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short (or empty) page means there is nothing left to fetch.
        if len(response.rows) < limit:
            break

        offset += limit
        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Request the per-date, per-page-location metrics report from GA4 and return all rows.

    The property ID and the date range come from ``config``; dimensions and
    metrics are requested in the order listed below.
    """
    dimension_names = ['date', 'pageLocation']
    metric_names = [
        'totalUsers',
        'ecommercePurchases',
        'purchaseRevenue',
        'screenPageViews',
        'eventCount',
        'engagementRate',
    ]

    property_path = f'properties/{config["PROPERTY_ID"]}'
    period = DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])

    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[period],
        dimensions=[Dimension(name=dimension_name) for dimension_name in dimension_names],
        metrics=[Metric(name=metric_name) for metric_name in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: Iterable of rows. Each row exposes ``dimension_values``
            (index 0 holds the date as ``YYYYMMDD``, index 1 the page
            location) and ``metric_values`` (six entries, in the order of
            ``metric_columns`` below); every entry carries its text in
            ``.value``.

    Returns:
        pandas.DataFrame: One record per input row with columns ``date``,
        ``pageLocation`` and the metric columns. Dates that fail to parse
        become ``NaT``; metric values that fail to parse become ``0``. Empty
        input yields an empty frame that still has all columns.
    """
    metric_columns = [
        'totalUsers',
        'ecommercePurchases',
        'purchaseRevenue',
        'screenPageViews',
        'eventCount',
        'engagementRate',
    ]

    def to_number(raw):
        # errors='coerce' yields NaN for unparseable text. NaN is truthy, so
        # an ``... or 0`` fallback would let it through; test it explicitly.
        number = pd.to_numeric(raw, errors='coerce')
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {
            'date': date_value,
            'pageLocation': row.dimension_values[1].value,
        }
        for position, column in enumerate(metric_columns):
            record[column] = to_number(row.metric_values[position].value)
        list_rows.append(record)

    # Explicit columns keep the frame's layout stable even when there are no rows.
    return pd.DataFrame(list_rows, columns=['date', 'pageLocation', *metric_columns])

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into ``<project>.<DATASET_ID>.<table_id>`` in BigQuery.

    The load job uses an explicit schema, the WRITE_TRUNCATE write
    disposition and daily time partitioning on the ``date`` column. The
    function waits on the job via ``.result()`` before printing a
    confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = [
        ("date", "DATE"),
        ("pageLocation", "STRING"),
        ("totalUsers", "INTEGER"),
        ("ecommercePurchases", "INTEGER"),
        ("purchaseRevenue", "FLOAT"),
        ("screenPageViews", "INTEGER"),
        ("eventCount", "INTEGER"),
        ("engagementRate", "FLOAT"),
    ]
    bq_schema = [bigquery.SchemaField(column, bq_type) for column, bq_type in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date'
    )
    load_config = bigquery.LoadJobConfig(
        schema=bq_schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning
    )

    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the whole backfill: authenticate, fetch, convert, save CSV, upload.

    Any exception raised along the way is caught and printed rather than
    propagated.
    """
    try:
        # The CSV file is named after the destination table.
        table_id = 'ga4_page_location_data'
        csv_filename = f"{table_id}.csv"

        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame.
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before pushing to BigQuery.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() when this code runs as the __main__ module.
if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from google_auth_oauthlib.flow import Flow
import json
import os
import pickle

# Load configuration from a JSON file
with open("config.json", "r") as f:
    # Keys read from this dict later in the cell: SERVICE_ACCOUNT_FILE,
    # CLIENT_SECRET_FILE, SCOPES, PROPERTY_ID, INITIAL_FETCH_FROM_DATE,
    # FETCH_TO_DATE and DATASET_ID.
    config = json.load(f)

# Authenticate with service account for BigQuery
creds1 = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'],
    scopes=['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
)
# The BigQuery client targets the project ID carried by the service-account credentials.
bq_client = bigquery.Client(credentials=creds1, project=creds1.project_id)

# Authenticate for GA4 Analytics Data API using OAuth2
def authenticate_ga4():
    """Return OAuth2 credentials for the GA4 Data API, cached in ``token.pickle``.

    If ``token.pickle`` exists in the working directory its unpickled content
    is returned as-is (no validity check is made here). Otherwise an
    interactive flow is run: the consent URL is printed, the authorization
    code is read from stdin, and the resulting credentials are pickled to
    ``token.pickle`` before being returned.
    """
    token_path = 'token.pickle'

    # Fast path: reuse whatever was cached by a previous run.
    if os.path.exists(token_path):
        with open(token_path, 'rb') as cached:
            return pickle.load(cached)

    # No cache yet: walk the user through the OAuth consent flow.
    oauth_flow = Flow.from_client_secrets_file(
        config['CLIENT_SECRET_FILE'],
        scopes=config['SCOPES'],
        redirect_uri='http://localhost:8080/'
    )
    consent_url, _state = oauth_flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: ', consent_url)
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    fresh_creds = oauth_flow.credentials
    with open(token_path, 'wb') as cache_out:
        pickle.dump(fresh_creds, cache_out)
    return fresh_creds

# Function to paginate and fetch GA4 report data with logging
def run_report_with_pagination(client, request, limit=250000):
    """Fetch every row of a report by paging through it with offset/limit.

    Args:
        client: Object exposing ``run_report(request)``; the response must
            have a ``rows`` sequence.
        request: Report request. Its ``offset`` and ``limit`` attributes are
            overwritten for every page.
        limit: Page size; must be greater than zero.

    Returns:
        list: The rows of all pages, concatenated in fetch order.

    Raises:
        ValueError: If ``limit`` is not greater than zero.
    """
    # A non-positive page size can never satisfy the "short page" stop test
    # below and never advances the offset, so the loop would not terminate.
    if limit <= 0:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")

    all_rows = []
    offset = 0
    page_number = 1

    while True:
        # The same request object is reused and mutated for each page.
        request.offset = offset
        request.limit = limit

        response = client.run_report(request)
        all_rows.extend(response.rows)

        print(f"Fetching data... Page {page_number}, Offset: {offset}, Rows fetched: {len(response.rows)}")

        # A short (or empty) page means there is nothing left to fetch.
        if len(response.rows) < limit:
            break

        offset += limit
        page_number += 1

    return all_rows

# Function to fetch GA4 data using pagination
def get_ga4_report(client):
    """Request the per-date, per-landing-page metrics report from GA4 and return all rows.

    The property ID and the date range come from ``config``; dimensions and
    metrics are requested in the order listed below.
    """
    dimension_names = ['date', 'landingPage']
    metric_names = [
        'totalUsers',
        'ecommercePurchases',
        'purchaseRevenue',
        'sessions',
        'eventCount',
        'engagementRate',
    ]

    property_path = f'properties/{config["PROPERTY_ID"]}'
    period = DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])

    report_request = RunReportRequest(
        property=property_path,
        date_ranges=[period],
        dimensions=[Dimension(name=dimension_name) for dimension_name in dimension_names],
        metrics=[Metric(name=metric_name) for metric_name in metric_names],
    )
    return run_report_with_pagination(client, report_request)

# Function to convert GA4 response to a DataFrame
def response_to_dataframe(response):
    """Convert GA4 report rows into a pandas DataFrame.

    Args:
        response: Iterable of rows. Each row exposes ``dimension_values``
            (index 0 holds the date as ``YYYYMMDD``, index 1 the landing
            page) and ``metric_values`` (six entries, in the order of
            ``metric_columns`` below); every entry carries its text in
            ``.value``.

    Returns:
        pandas.DataFrame: One record per input row with columns ``date``,
        ``landingPage`` and the metric columns. Dates that fail to parse
        become ``NaT``; metric values that fail to parse become ``0``. Empty
        input yields an empty frame that still has all columns.
    """
    metric_columns = [
        'totalUsers',
        'ecommercePurchases',
        'purchaseRevenue',
        'sessions',
        'eventCount',
        'engagementRate',
    ]

    def to_number(raw):
        # errors='coerce' yields NaN for unparseable text. NaN is truthy, so
        # an ``... or 0`` fallback would let it through; test it explicitly.
        number = pd.to_numeric(raw, errors='coerce')
        return 0 if pd.isna(number) else number

    list_rows = []
    for row in response:
        try:
            date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
        except ValueError:
            date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
        record = {
            'date': date_value,
            'landingPage': row.dimension_values[1].value,
        }
        for position, column in enumerate(metric_columns):
            record[column] = to_number(row.metric_values[position].value)
        list_rows.append(record)

    # Explicit columns keep the frame's layout stable even when there are no rows.
    return pd.DataFrame(list_rows, columns=['date', 'landingPage', *metric_columns])

# Function to upload data to BigQuery
def upload_to_bigquery(df, table_id):
    """Load ``df`` into ``<project>.<DATASET_ID>.<table_id>`` in BigQuery.

    The load job uses an explicit schema, the WRITE_TRUNCATE write
    disposition and daily time partitioning on the ``date`` column. The
    function waits on the job via ``.result()`` before printing a
    confirmation.
    """
    # (column name, BigQuery type) pairs, in table order.
    column_types = [
        ("date", "DATE"),
        ("landingPage", "STRING"),
        ("totalUsers", "INTEGER"),
        ("ecommercePurchases", "INTEGER"),
        ("purchaseRevenue", "FLOAT"),
        ("sessions", "INTEGER"),
        ("eventCount", "INTEGER"),
        ("engagementRate", "FLOAT"),
    ]
    bq_schema = [bigquery.SchemaField(column, bq_type) for column, bq_type in column_types]

    table_ref = f"{bq_client.project}.{config['DATASET_ID']}.{table_id}"

    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date'
    )
    load_config = bigquery.LoadJobConfig(
        schema=bq_schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning
    )

    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

# Main function
def main():
    """Run the whole backfill: authenticate, fetch, convert, save CSV, upload.

    Any exception raised along the way is caught and printed rather than
    propagated.
    """
    try:
        # The CSV file is named after the destination table.
        table_id = 'ga4_landing_page_data'
        csv_filename = f"{table_id}.csv"

        # OAuth2 credentials -> GA4 client -> raw rows -> DataFrame.
        client_ga4 = BetaAnalyticsDataClient(credentials=authenticate_ga4())
        ga4_df = response_to_dataframe(get_ga4_report(client_ga4))

        # Keep a local copy before pushing to BigQuery.
        ga4_df.to_csv(csv_filename, index=False)
        print(f"Data saved to {csv_filename}")

        upload_to_bigquery(ga4_df, table_id)
    except Exception as e:
        print(f"Error occurred: {e}")

# Entry point: call main() when this code runs as the __main__ module.
if __name__ == '__main__':
    main()