<a href="https://colab.research.google.com/github/alizvak/Backfill-GA4-to-BigQuery/blob/main/AIOA_PUP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install google-analytics-data==0.18.4
!pip install google-cloud-bigquery
!pip install google-auth==2.27.0
!pip install google-auth-oauthlib
!pip install google-auth-httplib2



In [15]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='country'),
                    Dimension(name='language')
                    ],
        metrics=[
            Metric(name='sessions'),
            Metric(name='screenPageViews'),
            Metric(name='totalUsers'),
            Metric(name='newUsers')
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:sessions'},
                        {'expression': 'ga:pageviews'},
                        {'expression': 'ga:users'},
                        {'expression': 'ga:newUsers'}
                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:country'},
                                   {'name': 'ga:language'}
                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {
        'ga:sessions': 'sessions',
        'ga:pageviews': 'screenPageViews',
        'ga:users': 'totalUsers',
        'ga:newUsers': 'newUsers'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value
            dim2 = row.dimension_values[2].value
            list_rows.append({
                'date': date_value,
                'country': dim1,
                'language': dim2,
                'sessions': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'screenPageViews': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0,
                'totalUsers': pd.to_numeric(row.metric_values[2].value, errors='coerce') or 0,
                'newUsers': pd.to_numeric(row.metric_values[3].value, errors='coerce') or 0
            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]
                dim2 = row['dimensions'][2]
                row_data = {'date': date_value, 'country': dim1 , 'language': dim2 }
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({
        'sessions': 'int64',
        'screenPageViews': 'int64',
        'totalUsers': 'int64',
        'newUsers': 'int64'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("country", "STRING"),
        bigquery.SchemaField("language", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("screenPageViews", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_geo_network'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date               datetime64[ns]
country                    object
language                   object
sessions                    int64
screenPageViews             int64
totalUsers                  int64
newUsers                    int64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_geo_network


In [16]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='sessionSource'),
                    Dimension(name='eventName'),
                    Dimension(name='sessionMedium')],
        metrics=[
            Metric(name='sessions'),
            Metric(name='eventCount'),
            Metric(name='totalUsers')
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:goal4Completions'},
                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:source'},
                                   {'name': 'ga:hostname'},
                                   {'name': 'ga:medium'},],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {
        'ga:goal4Completions': 'eventCount'   }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value
            dim2 = row.dimension_values[2].value
            dim3 = row.dimension_values[3].value
            list_rows.append({
                'date': date_value,
                'sessionSource': dim1,
                'eventName': dim2,
                'sessionMedium': dim3,
                'eventCount': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0
            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]
                dim2 = row['dimensions'][2]
                dim3 = row['dimensions'][3]
                row_data = {'date': date_value, 'sessionSource': dim1 , 'eventName': dim2 , 'sessionMedium': dim3}
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)

def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({
        'eventCount': 'int64'   })

    print(unified_df.dtypes)

    # Schema and upload as before
    # You can continue from here...


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("sessionSource", "STRING"),
        bigquery.SchemaField("eventName", "STRING"),
        bigquery.SchemaField("sessionMedium", "STRING"),
        bigquery.SchemaField("eventCount", "INTEGER")   ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_goal_events'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date             datetime64[ns]
sessionSource            object
eventName                object
sessionMedium            object
eventCount                int64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_goal_events


In [17]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='itemName')

                    ],
        metrics=[

            Metric(name='itemPurchaseQuantity'),
            Metric(name='itemRevenue'),
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [

                        {'expression': 'ga:itemQuantity'},
                        {'expression': 'ga:itemRevenue'}
                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:productName'}

                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {

        'ga:itemQuantity': 'itemPurchaseQuantity',
        'ga:itemRevenue': 'itemRevenue'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value

            list_rows.append({
                'date': date_value,
                'itemName': dim1,

                'itemPurchaseQuantity': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'itemRevenue': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0

            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]

                row_data = {'date': date_value, 'itemName': dim1 }
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({

        'itemPurchaseQuantity': 'int64',
        'itemRevenue': 'float'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("itemName", "STRING"),

        bigquery.SchemaField("itemPurchaseQuantity", "INTEGER"),
        bigquery.SchemaField("itemRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_items'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

Error occurred: 'date'


In [4]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='landingPage')

                    ],
        metrics=[
            Metric(name='sessions'),
            Metric(name='totalUsers'),
            Metric(name='newUsers'),
            Metric(name='ecommercePurchases'),
            Metric(name='purchaseRevenue'),
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:sessions'},
                        {'expression': 'ga:users'},
                        {'expression': 'ga:newUsers'},
                        {'expression': 'ga:uniquePurchases'},
                        {'expression': 'ga:transactionRevenue'}
                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:landingPagePath'}

                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {
        'ga:sessions': 'sessions',
        'ga:users': 'totalUsers',
        'ga:newUsers': 'newUsers',
        'ga:uniquePurchases': 'ecommercePurchases',
        'ga:transactionRevenue': 'purchaseRevenue'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value

            list_rows.append({
                'date': date_value,
                'landingPage': dim1,

                'sessions': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'totalUsers': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0,
                 'newUsers': pd.to_numeric(row.metric_values[2].value, errors='coerce') or 0,
                'ecommercePurchases': pd.to_numeric(row.metric_values[3].value, errors='coerce') or 0,
                'purchaseRevenue': pd.to_numeric(row.metric_values[4].value, errors='coerce') or 0

            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]

                row_data = {'date': date_value, 'landingPage': dim1 }
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({
        'sessions': 'int64',
        'totalUsers': 'int64',
        'newUsers': 'int64',
        'ecommercePurchases': 'int64',
        'purchaseRevenue': 'float'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("landingPage", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_landingpagepath'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date                  datetime64[ns]
landingPage                   object
sessions                       int64
totalUsers                     int64
newUsers                       int64
ecommercePurchases             int64
purchaseRevenue              float64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_landingpagepath


In [None]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='pagePathPlusQueryString')

                    ],
        metrics=[
            Metric(name='screenPageViews'),
            Metric(name='totalUsers')

        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:pageviews'},
                        {'expression': 'ga:users'}

                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:pagePath'}

                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {
        'ga:pageviews': 'screenPageViews',
        'ga:users': 'totalUsers'

    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value

            list_rows.append({
                'date': date_value,
                'pagePathPlusQueryString': dim1,

                'screenPageViews': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'totalUsers': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0


            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]

                row_data = {'date': date_value, 'pagePathPlusQueryString': dim1 }
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({
        'screenPageViews': 'int64',
        'totalUsers': 'int64'

    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("pagePathPlusQueryString", "STRING"),
        bigquery.SchemaField("screenPageViews", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER")

    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_pagepath'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

In [6]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date')],
        metrics=[Metric(name='screenPageViews')],
        limit=100000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [{'expression': 'ga:pageviews'}],
                    'dimensions': [{'name': 'ga:date'}],
                    'pageSize': 10000
                }
            ]
        }
    ).execute()



def response_to_dataframe(response, api_type='GA4'):
    if api_type == 'GA4':
        list_rows = []
        for row in response.rows:
            row_data = {
                'date': row.dimension_values[0].value,
                'screenPageViews': int(row.metric_values[0].value)
            }
            list_rows.append(row_data)
        return pd.DataFrame(list_rows)
    else:
        list_rows = []
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                row_data = {header: row['dimensions'][i] for i, header in enumerate(dimensionHeaders)}
                for metric, values in zip(metricHeaders, row['metrics'][0]['values']):
                    row_data[metric['name']] = values
                list_rows.append(row_data)
        return pd.DataFrame(list_rows)

def unify_and_upload_data(ga4_df, ua_df):
    # Standardize column names for UA
    ua_df.columns = ['date', 'screenPageViews']

    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Convert the 'date' column from string to datetime
    unified_df['date'] = pd.to_datetime(unified_df['date']).dt.date  # Ensuring date format

    # Ensure data types match expected BigQuery types
    unified_df['screenPageViews'] = unified_df['screenPageViews'].astype(int)  # Ensuring integers

    # Print data types for verification
    print(unified_df.dtypes)

    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("screenPageViews", "INTEGER")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_pageviews'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")


def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date               object
screenPageViews     int64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_pageviews


In [8]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='browser'),
                    Dimension(name='operatingSystem'),
                    Dimension(name='deviceCategory')],
        metrics=[
            Metric(name='sessions'),
            Metric(name='screenPageViews'),
            Metric(name='totalUsers'),
            Metric(name='newUsers')
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:sessions'},
                        {'expression': 'ga:pageviews'},
                        {'expression': 'ga:users'},
                        {'expression': 'ga:newUsers'}
                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:browser'},
                                   {'name': 'ga:operatingSystem'},
                                   {'name': 'ga:deviceCategory'},],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {
        'ga:sessions': 'sessions',
        'ga:pageviews': 'screenPageViews',
        'ga:users': 'totalUsers',
        'ga:newUsers': 'newUsers'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value
            dim2 = row.dimension_values[2].value
            dim3 = row.dimension_values[3].value
            list_rows.append({
                'date': date_value,
                'browser': dim1,
                'operatingSystem': dim2,
                'deviceCategory': dim3,
                'sessions': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'screenPageViews': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0,
                'totalUsers': pd.to_numeric(row.metric_values[2].value, errors='coerce') or 0,
                'newUsers': pd.to_numeric(row.metric_values[3].value, errors='coerce') or 0
            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]
                dim2 = row['dimensions'][2]
                dim3 = row['dimensions'][3]
                row_data = {'date': date_value, 'browser': dim1 , 'operatingSystem': dim2 , 'deviceCategory': dim3}
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({
        'sessions': 'int64',
        'screenPageViews': 'int64',
        'totalUsers': 'int64',
        'newUsers': 'int64'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("browser", "STRING"),
        bigquery.SchemaField("operatingSystem", "STRING"),
        bigquery.SchemaField("deviceCategory", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("screenPageViews", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_platform_device'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date               datetime64[ns]
browser                    object
operatingSystem            object
deviceCategory             object
sessions                    int64
screenPageViews             int64
totalUsers                  int64
newUsers                    int64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_platform_device


In [7]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date')


                    ],
        metrics=[

            Metric(name='ecommercePurchases'),
            Metric(name='purchaseRevenue'),
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [

                        {'expression': 'ga:uniquePurchases'},
                        {'expression': 'ga:transactionRevenue'}
                    ],
                    'dimensions': [{'name': 'ga:date'}


                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {

        'ga:uniquePurchases': 'ecommercePurchases',
        'ga:transactionRevenue': 'purchaseRevenue'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert

            list_rows.append({
                'date': date_value,


                'ecommercePurchases': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'purchaseRevenue': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0

            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT


                row_data = {'date': date_value }
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0

                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({

        'ecommercePurchases': 'int64',
        'purchaseRevenue': 'float'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),

        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_purchases'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

Error occurred: 'date'


In [9]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date')


                    ],
        metrics=[

            Metric(name='ecommercePurchases'),
            Metric(name='purchaseRevenue'),
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [

                        {'expression': 'ga:uniquePurchases'},
                        {'expression': 'ga:transactionRevenue'}
                    ],
                    'dimensions': [{'name': 'ga:date'}


                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {

        'ga:uniquePurchases': 'ecommercePurchases',
        'ga:transactionRevenue': 'purchaseRevenue'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert

            list_rows.append({
                'date': date_value,


                'ecommercePurchases': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'purchaseRevenue': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0

            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT


                row_data = {'date': date_value }
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0

                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({

        'ecommercePurchases': 'int64',
        'purchaseRevenue': 'float'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),

        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_purchases'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

Error occurred: 'date'


In [10]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='sessionPrimaryChannelGroup')

                    ],
        metrics=[
            Metric(name='sessions'),
            Metric(name='totalUsers'),
            Metric(name='newUsers'),
            Metric(name='ecommercePurchases'),
            Metric(name='purchaseRevenue'),
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:sessions'},
                        {'expression': 'ga:users'},
                        {'expression': 'ga:newUsers'},
                        {'expression': 'ga:uniquePurchases'},
                        {'expression': 'ga:transactionRevenue'}
                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:channelGrouping'}

                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {
        'ga:sessions': 'sessions',
        'ga:users': 'totalUsers',
        'ga:newUsers': 'newUsers',
        'ga:uniquePurchases': 'ecommercePurchases',
        'ga:transactionRevenue': 'purchaseRevenue'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value

            list_rows.append({
                'date': date_value,
                'sessionPrimaryChannelGroup': dim1,

                'sessions': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'totalUsers': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0,
                'newUsers': pd.to_numeric(row.metric_values[2].value, errors='coerce') or 0,
                'ecommercePurchases': pd.to_numeric(row.metric_values[3].value, errors='coerce') or 0,
                'purchaseRevenue': pd.to_numeric(row.metric_values[4].value, errors='coerce') or 0

            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]

                row_data = {'date': date_value, 'sessionPrimaryChannelGroup': dim1 }
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({
        'sessions': 'int64',
        'totalUsers': 'int64',
        'newUsers': 'int64',
        'ecommercePurchases': 'int64',
        'purchaseRevenue': 'float'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("sessionPrimaryChannelGroup", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_session_channel_group'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date                          datetime64[ns]
sessionPrimaryChannelGroup            object
sessions                               int64
totalUsers                             int64
newUsers                               int64
ecommercePurchases                     int64
purchaseRevenue                      float64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_session_channel_group


In [12]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date')],
        metrics=[Metric(name='sessions')],
        limit=100000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [{'expression': 'ga:sessions'}],
                    'dimensions': [{'name': 'ga:date'}],
                    'pageSize': 10000
                }
            ]
        }
    ).execute()



def response_to_dataframe(response, api_type='GA4'):
    if api_type == 'GA4':
        list_rows = []
        for row in response.rows:
            row_data = {
                'date': row.dimension_values[0].value,
                'Sessions': int(row.metric_values[0].value)
            }
            list_rows.append(row_data)
        return pd.DataFrame(list_rows)
    else:
        list_rows = []
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                row_data = {header: row['dimensions'][i] for i, header in enumerate(dimensionHeaders)}
                for metric, values in zip(metricHeaders, row['metrics'][0]['values']):
                    row_data[metric['name']] = values
                list_rows.append(row_data)
        return pd.DataFrame(list_rows)

def unify_and_upload_data(ga4_df, ua_df):
    # Standardize column names for UA
    ua_df.columns = ['date', 'Sessions']

    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Convert the 'date' column from string to datetime
    unified_df['date'] = pd.to_datetime(unified_df['date']).dt.date  # Ensuring date format

    # Ensure data types match expected BigQuery types
    unified_df['Sessions'] = unified_df['Sessions'].astype(int)  # Ensuring integers

    # Print data types for verification
    print(unified_df.dtypes)

    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("Sessions", "INTEGER")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_sessions'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")


def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()


date        object
Sessions     int64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_sessions


In [13]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date'),
                    Dimension(name='sessionSource'), Dimension(name='sessionCampaignName'), Dimension(name='sessionMedium')

                    ],
        metrics=[
            Metric(name='sessions'),
            Metric(name='totalUsers'),
            Metric(name='newUsers'),
            Metric(name='ecommercePurchases'),
            Metric(name='purchaseRevenue'),
        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:sessions'},
                        {'expression': 'ga:users'},
                        {'expression': 'ga:newUsers'},
                        {'expression': 'ga:uniquePurchases'},
                        {'expression': 'ga:transactionRevenue'}
                    ],
                    'dimensions': [{'name': 'ga:date'},
                                   {'name': 'ga:source'},{'name': 'ga:campaign'},{'name': 'ga:medium'}

                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {
        'ga:sessions': 'sessions',
        'ga:users': 'totalUsers',
        'ga:newUsers': 'newUsers',
        'ga:uniquePurchases': 'ecommercePurchases',
        'ga:transactionRevenue': 'purchaseRevenue'
    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert
            dim1 = row.dimension_values[1].value
            dim2 = row.dimension_values[2].value
            dim3 = row.dimension_values[3].value

            list_rows.append({
                'date': date_value,
                'sessionSource': dim1,
                'sessionCampaignName': dim2,
                'sessionMedium': dim3,

                'sessions': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                'totalUsers': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0,
                 'newUsers': pd.to_numeric(row.metric_values[2].value, errors='coerce') or 0,
                'ecommercePurchases': pd.to_numeric(row.metric_values[3].value, errors='coerce') or 0,
                'purchaseRevenue': pd.to_numeric(row.metric_values[4].value, errors='coerce') or 0

            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT
                dim1 = row['dimensions'][1]
                dim2 = row['dimensions'][2]
                dim3 = row['dimensions'][3]

                row_data = {'date': date_value,'sessionSource': dim1,
                'sessionCampaignName': dim2,
                'sessionMedium': dim3}
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({
        'sessions': 'int64',
        'totalUsers': 'int64',
        'newUsers': 'int64',
        'ecommercePurchases': 'int64',
        'purchaseRevenue': 'float'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("sessionSource", "STRING"),
        bigquery.SchemaField("sessionCampaignName", "STRING"),
        bigquery.SchemaField("sessionMedium", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("ecommercePurchases", "INTEGER"),
        bigquery.SchemaField("purchaseRevenue", "FLOAT")
    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_source_campaign_medium'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date                   datetime64[ns]
sessionSource                  object
sessionCampaignName            object
sessionMedium                  object
sessions                        int64
totalUsers                      int64
newUsers                        int64
ecommercePurchases              int64
purchaseRevenue               float64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_source_campaign_medium


In [14]:
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from google.oauth2 import service_account
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json
import logging

# Load configuration from a JSON file
with open("config.json", "r") as f:
    config = json.load(f)

# GA4 Authentication and Client Setup
SCOPES_GA = ['https://www.googleapis.com/auth/analytics.readonly', 'https://www.googleapis.com/auth/bigquery']
creds_ga = service_account.Credentials.from_service_account_file(
    config['SERVICE_ACCOUNT_FILE'], scopes=SCOPES_GA)
bq_client = bigquery.Client(credentials=creds_ga, project=config['BIGQUERY_PROJECT'])
client_ga4 = BetaAnalyticsDataClient(credentials=creds_ga)

# UA Variables
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Initializes the Google Analytics Reporting API client."""
    credentials = ServiceAccountCredentials.from_json_keyfile_name(KEY_FILE_LOCATION, SCOPES_UA)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics

def get_ga4_report():
    """Fetches GA4 data based on the defined dimensions and metrics."""
    request = RunReportRequest(
        property=f'properties/{config["PROPERTY_ID_GA4"]}',
        date_ranges=[DateRange(start_date=config['INITIAL_FETCH_FROM_DATE'], end_date=config['FETCH_TO_DATE'])],
        dimensions=[Dimension(name='date')
                    ],
        metrics=[
            Metric(name='totalUsers'),
            Metric(name='newUsers')

        ],
        limit=10000000
    )
    return client_ga4.run_report(request)

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA."""
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
                    'metrics': [
                        {'expression': 'ga:users'},
                        {'expression': 'ga:newUsers'}

                    ],
                    'dimensions': [{'name': 'ga:date'}

                                   ],
                    'pageSize': 10000000
                }
            ]
        }
    ).execute()

def response_to_dataframe(response, api_type='GA4'):
    list_rows = []
    metric_map = {

        'ga:users': 'totalUsers',
        'ga:newUsers': 'newUsers'

    }

    if api_type == 'GA4':
        for row in response.rows:
            try:
                date_value = pd.to_datetime(row.dimension_values[0].value, format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT  # Use Not-a-Time for dates that fail to convert


            list_rows.append({
                'date': date_value,

                'totalUsers': pd.to_numeric(row.metric_values[0].value, errors='coerce') or 0,
                 'newUsers': pd.to_numeric(row.metric_values[1].value, errors='coerce') or 0


            })
    else:
        for report in response.get('reports', []):
            columnHeader = report.get('columnHeader', {})
            dimensionHeaders = columnHeader.get('dimensions', [])
            metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
            for row in report.get('data', {}).get('rows', []):
                try:
                    date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
                except ValueError:
                    date_value = pd.NaT

                row_data = {'date': date_value}
                for i, metric in enumerate(metricHeaders):
                    mapped_name = metric_map.get(metric['name'])
                    if mapped_name:
                        row_data[mapped_name] = pd.to_numeric(row['metrics'][0]['values'][i], errors='coerce', downcast='float') or 0
                list_rows.append(row_data)
    return pd.DataFrame(list_rows)


def unify_and_upload_data(ga4_df, ua_df):
    # Concatenate GA4 and UA data
    unified_df = pd.concat([ga4_df, ua_df], ignore_index=True)

    # Fill NaN values with 0 to maintain uniformity
    unified_df.fillna(0, inplace=True)

    # Ensure 'date' remains as date object and not converted mistakenly
    unified_df['date'] = pd.to_datetime(unified_df['date'])

    # Convert data types
    unified_df = unified_df.astype({

        'totalUsers': 'int64',
        'newUsers': 'int64'
    })

    print(unified_df.dtypes)


    # Create schema for the unified DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER")

    ]

    # Configure BigQuery job to partition the table by the 'date' column
    table_id = 'unified_data_users_newusers'
    table_ref = f"{bq_client.project}.{BIGQUERY_DATASET}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery
    bq_client.load_table_from_dataframe(unified_df, table_ref, job_config=job_config).result()
    print(f"Unified data uploaded and partitioned by date to {table_ref}")

def main():
    try:
        # GA4 processing
        ga4_response = get_ga4_report()
        ga4_df = response_to_dataframe(ga4_response, api_type='GA4')

        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response, api_type='UA')

        # Unify and upload the data
        unify_and_upload_data(ga4_df, ua_df)
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

date          datetime64[ns]
totalUsers             int64
newUsers               int64
dtype: object
Unified data uploaded and partitioned by date to pup-data-warehouse.Siavak_Backfill_Data.unified_data_users_newusers
