<a href="https://colab.research.google.com/github/aliasoblomov/unify_test/blob/main/BFJ_UABackup.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Notebook shell commands: install the Google client libraries into the runtime.
# NOTE(review): the cells below also import `googleapiclient` and `oauth2client`,
# which are not installed here - presumably supplied by the Colab runtime;
# confirm before running this notebook anywhere else.
!pip install google-analytics-data==0.18.4
!pip install google-cloud-bigquery
!pip install google-auth==2.27.0
!pip install google-auth-oauthlib
!pip install google-auth-httplib2

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file.
# The encoding is pinned to UTF-8 (the JSON interchange encoding) instead of
# being left to the platform's locale default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables
# Read-only scope: this cell only fetches reports.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']  # path of the service-account key file
VIEW_ID = config['VIEW_ID_UA']  # UA view the report is requested for
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']  # project part of the destination table id
BIGQUERY_DATASET = config['DATASET_ID']  # dataset part of the destination table id

# Setting up the environment variable for Google Application Credentials
# (bigquery.Client() is created below without explicit credentials).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authorised Analytics Reporting API v4 client.

    Credentials are read from the service-account key file at
    KEY_FILE_LOCATION and restricted to the SCOPES_UA scopes.

    Returns:
        The ``analyticsreporting`` v4 service object created by ``build``.
    """
    return build(
        'analyticsreporting',
        'v4',
        credentials=ServiceAccountCredentials.from_json_keyfile_name(
            KEY_FILE_LOCATION, SCOPES_UA
        ),
    )

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA.

    Requests the per-day (``ga:date``) engagement metrics for VIEW_ID over the
    configured date range and follows ``nextPageToken`` until every page has
    been retrieved.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        dict: The first ``batchGet`` response with its ``'reports'`` list
        replaced by the reports of all pages, in page order.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:bounceRate'},
            {'expression': 'ga:sessionDuration'},
            {'expression': 'ga:goalCompletionsAll'},
            {'expression': 'ga:pageviewsPerSession'},
            {'expression': 'ga:avgTimeOnPage'},
            {'expression': 'ga:totalEvents'},
            {'expression': 'ga:hits'}
        ],
        'dimensions': [{'name': 'ga:date'}],
        # The API serves at most 100,000 rows per request no matter how many
        # are asked for, so request that maximum and page through the rest.
        'pageSize': 100000
    }

    first_response = None
    all_reports = []
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        if first_response is None:
            first_response = response
        page_reports = response.get('reports', [])
        all_reports.extend(page_reports)

        # A single report is requested, so its token alone drives the paging.
        next_page_token = page_reports[0].get('nextPageToken') if page_reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    merged = dict(first_response)
    merged['reports'] = all_reports
    return merged

def response_to_dataframe(response):
    """Flatten a Reporting API response into one DataFrame row per report row.

    Columns are named after the dimension headers and metric header entries of
    each report. Metric values are taken from the first date range and stored
    as floats; a row whose first metrics entry has no ``'values'`` gets 0.0 for
    every metric.

    Args:
        response: Response dict containing an optional ``'reports'`` list.

    Returns:
        pandas.DataFrame: Empty when the response holds no rows.
    """
    records = []

    for report in response.get('reports', []):
        header = report.get('columnHeader', {})
        dimension_names = header.get('dimensions', [])
        metric_entries = header.get('metricHeader', {}).get('metricHeaderEntries', [])

        for row in report.get('data', {}).get('rows', []):
            record = {}
            for position, dimension_name in enumerate(dimension_names):
                record[dimension_name] = row['dimensions'][position]
            for position, entry in enumerate(metric_entries):
                first_range = row['metrics'][0]
                if 'values' in first_range:
                    value = float(first_range['values'][position])
                else:
                    value = 0.0
                record[entry['name']] = value
            records.append(record)

    return pd.DataFrame(records)

def upload_to_bigquery(df, table_name):
    """Load the daily UA metrics frame into a date-partitioned BigQuery table.

    The ``ga:`` prefixed columns are renamed, ``date`` is converted to date
    objects, the metric columns are cast to their schema types, and the table
    BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name> is replaced (WRITE_TRUNCATE).

    Args:
        df: DataFrame with the ``ga:`` prefixed columns listed below.
        table_name: Destination table name within the configured dataset.

    Raises:
        ValueError: If a schema column is missing after renaming.
    """
    # (column, BigQuery type, pandas cast) - a cast of None leaves the column alone here
    column_specs = [
        ("date", "DATE", None),
        ("bounceRate", "FLOAT", float),
        ("sessionDuration", "FLOAT", float),
        ("goalCompletionsAll", "FLOAT", float),
        ("pageviewsPerSession", "FLOAT", float),
        ("avgTimeOnPage", "FLOAT", float),
        ("totalEvents", "INTEGER", int),
        ("hits", "INTEGER", int),
    ]

    # Strip the "ga:" prefix so the columns match the BigQuery schema
    df = df.rename(columns={f"ga:{name}": name for name, _, _ in column_specs})

    schema = [bigquery.SchemaField(name, bq_type) for name, bq_type, _ in column_specs]

    # Every schema field has to be present before anything is converted
    missing = {field.name for field in schema} - set(df.columns)
    if missing:
        raise ValueError(f"Schema fields not present in DataFrame: {missing}")

    # The DATE column is loaded from plain date objects
    df['date'] = pd.to_datetime(df['date']).dt.date

    for name, _, cast in column_specs:
        if cast is not None:
            df[name] = df[name].astype(cast)

    # Print data types for verification
    print(df.dtypes)

    # Replace the table on every run and partition it by day on 'date'
    partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date'
    )
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=partitioning
    )
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"

    # Block until the load job has finished
    load_job = bigquery.Client().load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the daily UA metrics report and load it into 'ua_metrics_data'.

    Any exception raised along the way is caught and reported with a printed
    message.
    """
    try:
        # UA processing
        client = initialize_analyticsreporting()
        ua_df = response_to_dataframe(get_ua_report(client))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        upload_to_bigquery(ua_df, 'ua_metrics_data')
    except Exception as exc:
        print(f"Error occurred: {exc}")


if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file.
# The encoding is pinned to UTF-8 (the JSON interchange encoding) instead of
# being left to the platform's locale default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables
# Read-only scope: this cell only fetches reports.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']  # path of the service-account key file
VIEW_ID = config['VIEW_ID_UA']  # UA view the report is requested for
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']  # project part of the destination table id
BIGQUERY_DATASET = config['DATASET_ID']  # dataset part of the destination table id

# Setting up the environment variable for Google Application Credentials
# (bigquery.Client() is created below without explicit credentials).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authorised Analytics Reporting API v4 client.

    Credentials are read from the service-account key file at
    KEY_FILE_LOCATION and restricted to the SCOPES_UA scopes.

    Returns:
        The ``analyticsreporting`` v4 service object created by ``build``.
    """
    return build(
        'analyticsreporting',
        'v4',
        credentials=ServiceAccountCredentials.from_json_keyfile_name(
            KEY_FILE_LOCATION, SCOPES_UA
        ),
    )

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA.

    Requests user/session metrics broken down by date, age bracket and gender
    for VIEW_ID over the configured date range, and follows ``nextPageToken``
    until every page has been retrieved.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        dict: The first ``batchGet`` response with its ``'reports'`` list
        replaced by the reports of all pages, in page order.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:users'},
            {'expression': 'ga:sessions'},
            {'expression': 'ga:bounceRate'},
            {'expression': 'ga:pageviews'},
            {'expression': 'ga:pageviewsPerSession'}
        ],
        'dimensions': [
            {'name': 'ga:date'},
            {'name': 'ga:userAgeBracket'},
            {'name': 'ga:userGender'}
        ],
        # The API serves at most 100,000 rows per request no matter how many
        # are asked for, so request that maximum and page through the rest.
        'pageSize': 100000
    }

    first_response = None
    all_reports = []
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        if first_response is None:
            first_response = response
        page_reports = response.get('reports', [])
        all_reports.extend(page_reports)

        # A single report is requested, so its token alone drives the paging.
        next_page_token = page_reports[0].get('nextPageToken') if page_reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    merged = dict(first_response)
    merged['reports'] = all_reports
    return merged

def response_to_dataframe(response):
    """Flatten a Reporting API response into one DataFrame row per report row.

    Columns are named after the dimension headers and metric header entries of
    each report. Metric values are taken from the first date range and stored
    as floats; a row whose first metrics entry has no ``'values'`` gets 0.0 for
    every metric.

    Args:
        response: Response dict containing an optional ``'reports'`` list.

    Returns:
        pandas.DataFrame: Empty when the response holds no rows.
    """
    records = []

    for report in response.get('reports', []):
        header = report.get('columnHeader', {})
        dimension_names = header.get('dimensions', [])
        metric_entries = header.get('metricHeader', {}).get('metricHeaderEntries', [])

        for row in report.get('data', {}).get('rows', []):
            record = {}
            for position, dimension_name in enumerate(dimension_names):
                record[dimension_name] = row['dimensions'][position]
            for position, entry in enumerate(metric_entries):
                first_range = row['metrics'][0]
                if 'values' in first_range:
                    value = float(first_range['values'][position])
                else:
                    value = 0.0
                record[entry['name']] = value
            records.append(record)

    return pd.DataFrame(records)

def upload_to_bigquery(df, table_name):
    """Load the demographics frame into a date-partitioned BigQuery table.

    The ``ga:`` prefixed columns are renamed, ``date`` is converted to date
    objects, the metric columns are cast to their schema types, and the table
    BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name> is replaced (WRITE_TRUNCATE).

    Args:
        df: DataFrame with the ``ga:`` prefixed columns listed below.
        table_name: Destination table name within the configured dataset.

    Raises:
        ValueError: If a schema column is missing after renaming.
    """
    # (column, BigQuery type, pandas cast) - a cast of None leaves the column alone here
    column_specs = [
        ("date", "DATE", None),
        ("userAgeBracket", "STRING", None),
        ("userGender", "STRING", None),
        ("users", "INTEGER", int),
        ("sessions", "INTEGER", int),
        ("bounceRate", "FLOAT", float),
        ("pageviews", "INTEGER", int),
        ("pageviewsPerSession", "FLOAT", float),
    ]

    # Strip the "ga:" prefix so the columns match the BigQuery schema
    df = df.rename(columns={f"ga:{name}": name for name, _, _ in column_specs})

    schema = [bigquery.SchemaField(name, bq_type) for name, bq_type, _ in column_specs]

    # Every schema field has to be present before anything is converted
    missing = {field.name for field in schema} - set(df.columns)
    if missing:
        raise ValueError(f"Schema fields not present in DataFrame: {missing}")

    # The DATE column is loaded from plain date objects
    df['date'] = pd.to_datetime(df['date']).dt.date

    for name, _, cast in column_specs:
        if cast is not None:
            df[name] = df[name].astype(cast)

    # Print data types for verification
    print(df.dtypes)

    # Replace the table on every run and partition it by day on 'date'
    partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date'
    )
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=partitioning
    )
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"

    # Block until the load job has finished
    load_job = bigquery.Client().load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the UA demographics report and load it into 'ua_user_metrics_data'.

    Any exception raised along the way is caught and reported with a printed
    message.
    """
    try:
        # UA processing
        client = initialize_analyticsreporting()
        ua_df = response_to_dataframe(get_ua_report(client))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        upload_to_bigquery(ua_df, 'ua_user_metrics_data')
    except Exception as exc:
        print(f"Error occurred: {exc}")


if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file.
# The encoding is pinned to UTF-8 (the JSON interchange encoding) instead of
# being left to the platform's locale default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables
# Read-only scope: this cell only fetches reports.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']  # path of the service-account key file
VIEW_ID = config['VIEW_ID_UA']  # UA view the report is requested for
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']  # project part of the destination table id
BIGQUERY_DATASET = config['DATASET_ID']  # dataset part of the destination table id

# Setting up the environment variable for Google Application Credentials
# (bigquery.Client() is created below without explicit credentials).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authorised Analytics Reporting API v4 client.

    Credentials are read from the service-account key file at
    KEY_FILE_LOCATION and restricted to the SCOPES_UA scopes.

    Returns:
        The ``analyticsreporting`` v4 service object created by ``build``.
    """
    return build(
        'analyticsreporting',
        'v4',
        credentials=ServiceAccountCredentials.from_json_keyfile_name(
            KEY_FILE_LOCATION, SCOPES_UA
        ),
    )

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA.

    Requests session, user and purchase metrics broken down by date and
    channel grouping for VIEW_ID over the configured date range, and follows
    ``nextPageToken`` until every page has been retrieved.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        dict: The first ``batchGet`` response with its ``'reports'`` list
        replaced by the reports of all pages, in page order.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:sessions'},
            {'expression': 'ga:users'},
            {'expression': 'ga:newUsers'},
            {'expression': 'ga:uniquePurchases'},
            {'expression': 'ga:transactionRevenue'}
        ],
        'dimensions': [{'name': 'ga:date'},
                       {'name': 'ga:channelGrouping'}],
        # The API serves at most 100,000 rows per request no matter how many
        # are asked for, so request that maximum and page through the rest.
        'pageSize': 100000
    }

    first_response = None
    all_reports = []
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        if first_response is None:
            first_response = response
        page_reports = response.get('reports', [])
        all_reports.extend(page_reports)

        # A single report is requested, so its token alone drives the paging.
        next_page_token = page_reports[0].get('nextPageToken') if page_reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    merged = dict(first_response)
    merged['reports'] = all_reports
    return merged

def response_to_dataframe(response):
    """Flatten a channel-grouping report response into a DataFrame.

    Rows are read positionally: dimensions are (date, channelGrouping) and the
    first date range's values are (sessions, users, newUsers, uniquePurchases,
    transactionRevenue), the order in which get_ua_report requests them.

    Args:
        response: Response dict containing an optional ``'reports'`` list.

    Returns:
        pandas.DataFrame: One row per report row. ``date`` is a Timestamp, or
        NaT when the value is not in ``YYYYMMDD`` form. Empty when the
        response holds no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            values = row['metrics'][0]['values']
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                # Keep the row, but without a usable date
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'channelGrouping': dimensions[1],
                'sessions': int(values[0]),
                'users': int(values[1]),
                'newUsers': int(values[2]),
                'uniquePurchases': int(values[3]),
                'transactionRevenue': float(values[4])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the channel-grouping frame into a date-partitioned BigQuery table.

    The table BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name> is replaced
    (WRITE_TRUNCATE) and partitioned by day on ``date``.

    Args:
        df: DataFrame holding every column named in the schema below. It is
            left unmodified; the conversions are applied to a copy.
        table_name: Destination table name within the configured dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # Create schema for the DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("channelGrouping", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("users", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("uniquePurchases", "INTEGER"),
        bigquery.SchemaField("transactionRevenue", "FLOAT")
    ]

    # Check that all schema fields are in the DataFrame
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Convert on a copy so the caller's DataFrame keeps its original columns
    df = df.copy()

    # Reduce the 'date' column to plain date objects for the DATE field
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Ensure data types match expected BigQuery types
    df['sessions'] = df['sessions'].astype(int)
    df['users'] = df['users'].astype(int)
    df['newUsers'] = df['newUsers'].astype(int)
    df['uniquePurchases'] = df['uniquePurchases'].astype(int)
    df['transactionRevenue'] = df['transactionRevenue'].astype(float)

    # Print data types for verification
    print(df.dtypes)

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery and wait for the load job to finish
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the UA channel-grouping report and load it into 'ua_channel_group_data'.

    Any exception raised along the way is caught and reported with a printed
    message.
    """
    try:
        # UA processing
        client = initialize_analyticsreporting()
        ua_df = response_to_dataframe(get_ua_report(client))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        upload_to_bigquery(ua_df, 'ua_channel_group_data')
    except Exception as exc:
        print(f"Error occurred: {exc}")


if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file.
# The encoding is pinned to UTF-8 (the JSON interchange encoding) instead of
# being left to the platform's locale default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables
# Read-only scope: this cell only fetches reports.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']  # path of the service-account key file
VIEW_ID = config['VIEW_ID_UA']  # UA view the report is requested for
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']  # project part of the destination table id
BIGQUERY_DATASET = config['DATASET_ID']  # dataset part of the destination table id

# Setting up the environment variable for Google Application Credentials
# (bigquery.Client() is created below without explicit credentials).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authorised Analytics Reporting API v4 client.

    Credentials are read from the service-account key file at
    KEY_FILE_LOCATION and restricted to the SCOPES_UA scopes.

    Returns:
        The ``analyticsreporting`` v4 service object created by ``build``.
    """
    return build(
        'analyticsreporting',
        'v4',
        credentials=ServiceAccountCredentials.from_json_keyfile_name(
            KEY_FILE_LOCATION, SCOPES_UA
        ),
    )

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA.

    Requests session, pageview and user metrics broken down by date, country
    and language for VIEW_ID over the configured date range, and follows
    ``nextPageToken`` until every page has been retrieved.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        dict: The first ``batchGet`` response with its ``'reports'`` list
        replaced by the reports of all pages, in page order.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:sessions'},
            {'expression': 'ga:pageviews'},
            {'expression': 'ga:users'},
            {'expression': 'ga:newUsers'}
        ],
        'dimensions': [{'name': 'ga:date'},
                       {'name': 'ga:country'},
                       {'name': 'ga:language'}
                       ],
        # The API serves at most 100,000 rows per request no matter how many
        # are asked for, so request that maximum and page through the rest.
        'pageSize': 100000
    }

    first_response = None
    all_reports = []
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        if first_response is None:
            first_response = response
        page_reports = response.get('reports', [])
        all_reports.extend(page_reports)

        # A single report is requested, so its token alone drives the paging.
        next_page_token = page_reports[0].get('nextPageToken') if page_reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    merged = dict(first_response)
    merged['reports'] = all_reports
    return merged

def response_to_dataframe(response):
    """Flatten a geo/language report response into a DataFrame.

    Rows are read positionally: dimensions are (date, country, language) and
    the first date range's values are (sessions, pageviews, users, newUsers),
    the order in which get_ua_report requests them.

    Args:
        response: Response dict containing an optional ``'reports'`` list.

    Returns:
        pandas.DataFrame: One row per report row. ``date`` is a Timestamp, or
        NaT when the value is not in ``YYYYMMDD`` form. Empty when the
        response holds no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            values = row['metrics'][0]['values']
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                # Keep the row, but without a usable date
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'country': dimensions[1],
                'language': dimensions[2],
                'sessions': int(values[0]),
                'pageviews': int(values[1]),
                'users': int(values[2]),
                'newUsers': int(values[3])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the geo/language frame into a date-partitioned BigQuery table.

    The table BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name> is replaced
    (WRITE_TRUNCATE) and partitioned by day on ``date``.

    Args:
        df: DataFrame holding every column named in the schema below. It is
            left unmodified; the date conversion is applied to a copy.
        table_name: Destination table name within the configured dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # Create schema for the DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("country", "STRING"),
        bigquery.SchemaField("language", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("pageviews", "INTEGER"),
        bigquery.SchemaField("users", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER")
    ]

    # Check that all schema fields are in the DataFrame, so a malformed or
    # column-less frame fails here with a clear message instead of in the load job
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Reduce the 'date' column to plain date objects for the DATE field,
    # on a copy so the caller's DataFrame is untouched
    df = df.copy()
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery and wait for the load job to finish
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the UA geo/language report and load it into 'ua_data_geo_network'.

    An empty report is reported and skipped rather than uploaded, because the
    load job runs with WRITE_TRUNCATE. Any exception raised along the way is
    caught and reported with a printed message.
    """
    try:
        # UA processing
        analytics = initialize_analyticsreporting()
        ua_response = get_ua_report(analytics)
        ua_df = response_to_dataframe(ua_response)

        # Check DataFrame content before touching the destination table
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_data_geo_network')
    except Exception as e:
        print(f"Error occurred: {e}")


if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file.
# The encoding is pinned to UTF-8 (the JSON interchange encoding) instead of
# being left to the platform's locale default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables
# Read-only scope: this cell only fetches reports.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']  # path of the service-account key file
VIEW_ID = config['VIEW_ID_UA']  # UA view the report is requested for
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']  # project part of the destination table id
BIGQUERY_DATASET = config['DATASET_ID']  # dataset part of the destination table id

# Setting up the environment variable for Google Application Credentials
# (bigquery.Client() is created below without explicit credentials).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authorised Analytics Reporting API v4 client.

    Credentials are read from the service-account key file at
    KEY_FILE_LOCATION and restricted to the SCOPES_UA scopes.

    Returns:
        The ``analyticsreporting`` v4 service object created by ``build``.
    """
    return build(
        'analyticsreporting',
        'v4',
        credentials=ServiceAccountCredentials.from_json_keyfile_name(
            KEY_FILE_LOCATION, SCOPES_UA
        ),
    )

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA.

    Requests event metrics broken down by date, event category, action and
    label for VIEW_ID over the configured date range, and follows
    ``nextPageToken`` until every page has been retrieved.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        dict: The first ``batchGet`` response with its ``'reports'`` list
        replaced by the reports of all pages, in page order.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:totalEvents'},
            {'expression': 'ga:uniqueEvents'},
            {'expression': 'ga:eventValue'},
            {'expression': 'ga:sessionsWithEvent'}
        ],
        'dimensions': [
            {'name': 'ga:date'},
            {'name': 'ga:eventCategory'},
            {'name': 'ga:eventAction'},
            {'name': 'ga:eventLabel'}
        ],
        # The API serves at most 100,000 rows per request no matter how many
        # are asked for, so request that maximum and page through the rest.
        'pageSize': 100000
    }

    first_response = None
    all_reports = []
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        if first_response is None:
            first_response = response
        page_reports = response.get('reports', [])
        all_reports.extend(page_reports)

        # A single report is requested, so its token alone drives the paging.
        next_page_token = page_reports[0].get('nextPageToken') if page_reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    merged = dict(first_response)
    merged['reports'] = all_reports
    return merged

def response_to_dataframe(response):
    """Flatten a Reporting API response into one DataFrame row per report row.

    Columns are named after the dimension headers and metric header entries of
    each report. Metric values are taken from the first date range and parsed
    with ``int``; a row whose first metrics entry has no ``'values'`` gets 0
    for every metric.

    Args:
        response: Response dict containing an optional ``'reports'`` list.

    Returns:
        pandas.DataFrame: Empty when the response holds no rows.
    """
    records = []

    for report in response.get('reports', []):
        header = report.get('columnHeader', {})
        dimension_names = header.get('dimensions', [])
        metric_entries = header.get('metricHeader', {}).get('metricHeaderEntries', [])

        for row in report.get('data', {}).get('rows', []):
            record = {}
            for position, dimension_name in enumerate(dimension_names):
                record[dimension_name] = row['dimensions'][position]
            for position, entry in enumerate(metric_entries):
                first_range = row['metrics'][0]
                if 'values' in first_range:
                    value = int(first_range['values'][position])
                else:
                    value = 0
                record[entry['name']] = value
            records.append(record)

    return pd.DataFrame(records)

def upload_to_bigquery(df, table_name):
    """Load the event frame into a date-partitioned BigQuery table.

    The ``ga:`` prefixed columns are renamed, ``date`` is converted to date
    objects, the metric columns are cast to int, and the table
    BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name> is replaced (WRITE_TRUNCATE).

    Args:
        df: DataFrame with the ``ga:`` prefixed columns listed below.
        table_name: Destination table name within the configured dataset.

    Raises:
        ValueError: If a schema column is missing after renaming.
    """
    # (column, BigQuery type, pandas cast) - a cast of None leaves the column alone here
    column_specs = [
        ("date", "DATE", None),
        ("eventCategory", "STRING", None),
        ("eventAction", "STRING", None),
        ("eventLabel", "STRING", None),
        ("totalEvents", "INTEGER", int),
        ("uniqueEvents", "INTEGER", int),
        ("eventValue", "INTEGER", int),
        ("sessionsWithEvent", "INTEGER", int),
    ]

    # Strip the "ga:" prefix so the columns match the BigQuery schema
    df = df.rename(columns={f"ga:{name}": name for name, _, _ in column_specs})

    schema = [bigquery.SchemaField(name, bq_type) for name, bq_type, _ in column_specs]

    # Every schema field has to be present before anything is converted
    missing = {field.name for field in schema} - set(df.columns)
    if missing:
        raise ValueError(f"Schema fields not present in DataFrame: {missing}")

    # The DATE column is loaded from plain date objects
    df['date'] = pd.to_datetime(df['date']).dt.date

    for name, _, cast in column_specs:
        if cast is not None:
            df[name] = df[name].astype(cast)

    # Print data types for verification
    print(df.dtypes)

    # Replace the table on every run and partition it by day on 'date'
    partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date'
    )
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=partitioning
    )
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"

    # Block until the load job has finished
    load_job = bigquery.Client().load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the UA event report and load it into 'ua_event_data'.

    Any exception raised along the way is caught and reported with a printed
    message.
    """
    try:
        # UA processing
        client = initialize_analyticsreporting()
        ua_df = response_to_dataframe(get_ua_report(client))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        upload_to_bigquery(ua_df, 'ua_event_data')
    except Exception as exc:
        print(f"Error occurred: {exc}")


if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file.
# The encoding is pinned to UTF-8 (the JSON interchange encoding) instead of
# being left to the platform's locale default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables
# Read-only scope: this cell only fetches reports.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']  # path of the service-account key file
VIEW_ID = config['VIEW_ID_UA']  # UA view the report is requested for
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']  # project part of the destination table id
BIGQUERY_DATASET = config['DATASET_ID']  # dataset part of the destination table id

# Setting up the environment variable for Google Application Credentials
# (bigquery.Client() is created below without explicit credentials).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authorised Analytics Reporting API v4 client.

    Credentials are read from the service-account key file at
    KEY_FILE_LOCATION and restricted to the SCOPES_UA scopes.

    Returns:
        The ``analyticsreporting`` v4 service object created by ``build``.
    """
    return build(
        'analyticsreporting',
        'v4',
        credentials=ServiceAccountCredentials.from_json_keyfile_name(
            KEY_FILE_LOCATION, SCOPES_UA
        ),
    )

def get_ua_report(analytics):
    """Fetches the report data from Google Analytics UA.

    Requests the completions of goals 1-5 broken down by date, source,
    campaign and medium for VIEW_ID over the configured date range, and
    follows ``nextPageToken`` until every page has been retrieved.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        dict: The first ``batchGet`` response with its ``'reports'`` list
        replaced by the reports of all pages, in page order.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:goal1Completions'},
            {'expression': 'ga:goal2Completions'},
            {'expression': 'ga:goal3Completions'},
            {'expression': 'ga:goal4Completions'},
            {'expression': 'ga:goal5Completions'},
        ],
        'dimensions': [
            {'name': 'ga:date'},
            {'name': 'ga:source'},
            {'name': 'ga:campaign'},
            {'name': 'ga:medium'}
        ],
        # The API serves at most 100,000 rows per request no matter how many
        # are asked for, so request that maximum and page through the rest.
        'pageSize': 100000
    }

    first_response = None
    all_reports = []
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        if first_response is None:
            first_response = response
        page_reports = response.get('reports', [])
        all_reports.extend(page_reports)

        # A single report is requested, so its token alone drives the paging.
        next_page_token = page_reports[0].get('nextPageToken') if page_reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    merged = dict(first_response)
    merged['reports'] = all_reports
    return merged

def response_to_dataframe(response):
    """Flatten a goal-completions report response into a DataFrame.

    Rows are read positionally: dimensions are (date, source, campaign,
    medium) and the first date range's values are the completions of goals 1
    to 5, the order in which get_ua_report requests them.

    Args:
        response: Response dict containing an optional ``'reports'`` list.

    Returns:
        pandas.DataFrame: One row per report row. ``date`` is a Timestamp, or
        NaT when the value is not in ``YYYYMMDD`` form. Empty when the
        response holds no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            values = row['metrics'][0]['values']
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                # Keep the row, but without a usable date
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'source': dimensions[1],
                'campaign': dimensions[2],
                'medium': dimensions[3],
                'goal1Completions': int(values[0]),
                'goal2Completions': int(values[1]),
                'goal3Completions': int(values[2]),
                'goal4Completions': int(values[3]),
                'goal5Completions': int(values[4])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the goal-completions frame into a date-partitioned BigQuery table.

    The table BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name> is replaced
    (WRITE_TRUNCATE) and partitioned by day on ``date``.

    Args:
        df: DataFrame holding every column named in the schema below. It is
            left unmodified; the date conversion is applied to a copy.
        table_name: Destination table name within the configured dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # Create schema for the DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("source", "STRING"),
        bigquery.SchemaField("campaign", "STRING"),
        bigquery.SchemaField("medium", "STRING"),
        bigquery.SchemaField("goal1Completions", "INTEGER"),
        bigquery.SchemaField("goal2Completions", "INTEGER"),
        bigquery.SchemaField("goal3Completions", "INTEGER"),
        bigquery.SchemaField("goal4Completions", "INTEGER"),
        bigquery.SchemaField("goal5Completions", "INTEGER")
    ]

    # Check that all schema fields are in the DataFrame
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Reduce the 'date' column to plain date objects for the DATE field,
    # on a copy so the caller's DataFrame is untouched
    df = df.copy()
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery and wait for the load job to finish
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the UA goal-completions report and upload it to BigQuery.

    Any exception raised along the way is caught and reported with a
    single printed line; nothing is re-raised.
    """
    try:
        # UA processing: client -> raw report -> DataFrame
        ua_df = response_to_dataframe(get_ua_report(initialize_analyticsreporting()))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_goal_completions')
    except Exception as e:
        print(f"Error occurred: {e}")

# Run the goal-completions export when this code executes as the top-level module
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from config.json in the working directory; the encoding
# is explicit so decoding does not depend on the platform's default locale
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables: read-only Analytics scope, service-account key path, the UA
# view to export, and the BigQuery project/dataset that receive the data
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Export the key path so clients created without explicit credentials
# (e.g. bigquery.Client()) can locate the service-account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authenticated Analytics Reporting API v4 client.

    Credentials come from the service-account key at ``KEY_FILE_LOCATION``
    and are requested for the scopes in ``SCOPES_UA``.

    Returns:
        The ``analyticsreporting`` v4 service object produced by ``build``.
    """
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_creds)

def get_ua_report(analytics):
    """Fetch the complete per-product item report from Google Analytics UA.

    Requests ``ga:itemQuantity`` and ``ga:itemRevenue`` broken down by
    ``ga:date`` and ``ga:productName`` for the date range in ``config``.
    The Reporting API returns at most 100,000 rows per request, so the
    report is fetched page by page (following ``nextPageToken``) and the
    rows of every page are merged into the first page's report.

    Args:
        analytics: Reporting API v4 service object, e.g. the one returned by
            ``initialize_analyticsreporting``.

    Returns:
        dict: The first page's response, whose report holds the rows of all
        pages.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:itemQuantity'},
            {'expression': 'ga:itemRevenue'}
        ],
        'dimensions': [{'name': 'ga:date'},
                       {'name': 'ga:productName'}],
        # 100,000 is the documented per-request maximum; asking for more
        # does not return more rows
        'pageSize': 100000
    }

    combined = None
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        reports = response.get('reports', [])

        if combined is None:
            combined = response
        elif reports:
            # Append this page's rows to the report kept from the first page
            page_rows = reports[0].get('data', {}).get('rows', [])
            combined['reports'][0].setdefault('data', {}).setdefault('rows', []).extend(page_rows)

        next_page_token = reports[0].get('nextPageToken') if reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    # The merged report is complete, so a leftover page token would mislead
    if combined.get('reports'):
        combined['reports'][0].pop('nextPageToken', None)
    return combined

def response_to_dataframe(response):
    """Flatten a Reporting API v4 response into a per-product DataFrame.

    Each report row becomes one record with the columns ``date``,
    ``productName``, ``itemQuantity`` and ``itemRevenue``. Only the first
    metric set of a row (``row['metrics'][0]``) is read.

    Args:
        response: Response dict with an optional ``reports`` list.

    Returns:
        pandas.DataFrame: One row per report row; empty when the response
        carries no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            values = row['metrics'][0]['values']
            # A first dimension that is not a YYYYMMDD date becomes NaT
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'productName': dimensions[1],
                'itemQuantity': int(values[0]),
                'itemRevenue': float(values[1])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the item-data DataFrame into a day-partitioned BigQuery table.

    The destination ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>`` is
    written with WRITE_TRUNCATE and partitioned by day on its ``date`` column.

    Args:
        df: DataFrame holding at least every column named in the schema below.
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # (column name, BigQuery type) pairs, in destination-table order
    column_types = [
        ("date", "DATE"),
        ("productName", "STRING"),
        ("itemQuantity", "INTEGER"),
        ("itemRevenue", "FLOAT"),
    ]
    schema = [bigquery.SchemaField(name, bq_type) for name, bq_type in column_types]

    # Refuse to upload when the DataFrame lacks any schema column
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Overwrite the table on every run, partitioning it by day on 'date'
    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date',
    )
    load_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning,
    )
    destination = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"

    # Start the load job and wait for it to finish
    load_job = bigquery.Client().load_table_from_dataframe(df, destination, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {destination}")

def main():
    """Fetch the UA item report and upload it to BigQuery.

    Any exception raised along the way is caught and reported with a
    single printed line; nothing is re-raised.
    """
    try:
        # UA processing: client -> raw report -> DataFrame
        ua_df = response_to_dataframe(get_ua_report(initialize_analyticsreporting()))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_item_data')
    except Exception as e:
        print(f"Error occurred: {e}")

# Run the item-data export when this code executes as the top-level module
if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from config.json in the working directory; the encoding
# is explicit so decoding does not depend on the platform's default locale
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables: read-only Analytics scope, service-account key path, the UA
# view to export, and the BigQuery project/dataset that receive the data
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Export the key path so clients created without explicit credentials
# (e.g. bigquery.Client()) can locate the service-account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authenticated Analytics Reporting API v4 client.

    Credentials come from the service-account key at ``KEY_FILE_LOCATION``
    and are requested for the scopes in ``SCOPES_UA``.

    Returns:
        The ``analyticsreporting`` v4 service object produced by ``build``.
    """
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_creds)

def get_ua_report(analytics):
    """Fetch the complete landing-page report from Google Analytics UA.

    Requests sessions, users, new users, unique purchases and transaction
    revenue broken down by ``ga:date`` and ``ga:landingPagePath`` for the
    date range in ``config``. The Reporting API returns at most 100,000
    rows per request, so the report is fetched page by page (following
    ``nextPageToken``) and the rows of every page are merged into the first
    page's report.

    Args:
        analytics: Reporting API v4 service object, e.g. the one returned by
            ``initialize_analyticsreporting``.

    Returns:
        dict: The first page's response, whose report holds the rows of all
        pages.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:sessions'},
            {'expression': 'ga:users'},
            {'expression': 'ga:newUsers'},
            {'expression': 'ga:uniquePurchases'},
            {'expression': 'ga:transactionRevenue'}
        ],
        'dimensions': [{'name': 'ga:date'},
                       {'name': 'ga:landingPagePath'}],
        # 100,000 is the documented per-request maximum; asking for more
        # does not return more rows
        'pageSize': 100000
    }

    combined = None
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        reports = response.get('reports', [])

        if combined is None:
            combined = response
        elif reports:
            # Append this page's rows to the report kept from the first page
            page_rows = reports[0].get('data', {}).get('rows', [])
            combined['reports'][0].setdefault('data', {}).setdefault('rows', []).extend(page_rows)

        next_page_token = reports[0].get('nextPageToken') if reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    # The merged report is complete, so a leftover page token would mislead
    if combined.get('reports'):
        combined['reports'][0].pop('nextPageToken', None)
    return combined

def response_to_dataframe(response):
    """Flatten a Reporting API v4 response into a landing-page DataFrame.

    Each report row becomes one record with the columns ``date``,
    ``landingPage``, ``sessions``, ``totalUsers``, ``newUsers``,
    ``ecommercePurchases`` and ``purchaseRevenue``. Only the first metric
    set of a row (``row['metrics'][0]``) is read.

    Args:
        response: Response dict with an optional ``reports`` list.

    Returns:
        pandas.DataFrame: One row per report row; empty when the response
        carries no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            values = row['metrics'][0]['values']
            # A first dimension that is not a YYYYMMDD date becomes NaT
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'landingPage': dimensions[1],
                'sessions': int(values[0]),
                'totalUsers': int(values[1]),
                'newUsers': int(values[2]),
                'ecommercePurchases': int(values[3]),
                'purchaseRevenue': float(values[4])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the landing-page DataFrame into a day-partitioned BigQuery table.

    The destination ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>`` is
    written with WRITE_TRUNCATE and partitioned by day on its ``date`` column.

    Args:
        df: DataFrame holding at least every column named in the schema below.
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # (column name, BigQuery type) pairs, in destination-table order
    column_types = [
        ("date", "DATE"),
        ("landingPage", "STRING"),
        ("sessions", "INTEGER"),
        ("totalUsers", "INTEGER"),
        ("newUsers", "INTEGER"),
        ("ecommercePurchases", "INTEGER"),
        ("purchaseRevenue", "FLOAT"),
    ]
    schema = [bigquery.SchemaField(name, bq_type) for name, bq_type in column_types]

    # Refuse to upload when the DataFrame lacks any schema column
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Overwrite the table on every run, partitioning it by day on 'date'
    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date',
    )
    load_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning,
    )
    destination = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"

    # Start the load job and wait for it to finish
    load_job = bigquery.Client().load_table_from_dataframe(df, destination, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {destination}")

def main():
    """Fetch the UA landing-page report and upload it to BigQuery.

    Any exception raised along the way is caught and reported with a
    single printed line; nothing is re-raised.
    """
    try:
        # UA processing: client -> raw report -> DataFrame
        ua_df = response_to_dataframe(get_ua_report(initialize_analyticsreporting()))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_landingpage_data')
    except Exception as e:
        print(f"Error occurred: {e}")

# Run the landing-page export when this code executes as the top-level module
if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from config.json in the working directory; the encoding
# is explicit so decoding does not depend on the platform's default locale
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables: read-only Analytics scope, service-account key path, the UA
# view to export, and the BigQuery project/dataset that receive the data
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Export the key path so clients created without explicit credentials
# (e.g. bigquery.Client()) can locate the service-account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authenticated Analytics Reporting API v4 client.

    Credentials come from the service-account key at ``KEY_FILE_LOCATION``
    and are requested for the scopes in ``SCOPES_UA``.

    Returns:
        The ``analyticsreporting`` v4 service object produced by ``build``.
    """
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_creds)

def get_ua_report(analytics):
    """Fetch the complete page-path report from Google Analytics UA.

    Requests ``ga:pageviews`` and ``ga:users`` broken down by ``ga:date``
    and ``ga:pagePath`` for the date range in ``config``. The Reporting API
    returns at most 100,000 rows per request, so the report is fetched page
    by page (following ``nextPageToken``) and the rows of every page are
    merged into the first page's report.

    Args:
        analytics: Reporting API v4 service object, e.g. the one returned by
            ``initialize_analyticsreporting``.

    Returns:
        dict: The first page's response, whose report holds the rows of all
        pages.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:pageviews'},
            {'expression': 'ga:users'}
        ],
        'dimensions': [{'name': 'ga:date'},
                       {'name': 'ga:pagePath'}],
        # 100,000 is the documented per-request maximum; asking for more
        # does not return more rows
        'pageSize': 100000
    }

    combined = None
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        reports = response.get('reports', [])

        if combined is None:
            combined = response
        elif reports:
            # Append this page's rows to the report kept from the first page
            page_rows = reports[0].get('data', {}).get('rows', [])
            combined['reports'][0].setdefault('data', {}).setdefault('rows', []).extend(page_rows)

        next_page_token = reports[0].get('nextPageToken') if reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    # The merged report is complete, so a leftover page token would mislead
    if combined.get('reports'):
        combined['reports'][0].pop('nextPageToken', None)
    return combined

def response_to_dataframe(response):
    """Flatten a Reporting API v4 response into a page-path DataFrame.

    Each report row becomes one record with the columns ``date``,
    ``pagePath``, ``pageviews`` and ``users``. Only the first metric set of
    a row (``row['metrics'][0]``) is read.

    Args:
        response: Response dict with an optional ``reports`` list.

    Returns:
        pandas.DataFrame: One row per report row; empty when the response
        carries no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            values = row['metrics'][0]['values']
            # A first dimension that is not a YYYYMMDD date becomes NaT
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'pagePath': dimensions[1],
                'pageviews': int(values[0]),
                'users': int(values[1])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the page-path DataFrame into a day-partitioned BigQuery table.

    The destination ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>`` is
    written with WRITE_TRUNCATE and partitioned by day on its ``date`` column.

    Args:
        df: DataFrame holding at least every column named in the schema below.
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # (column name, BigQuery type) pairs, in destination-table order
    column_types = [
        ("date", "DATE"),
        ("pagePath", "STRING"),
        ("pageviews", "INTEGER"),
        ("users", "INTEGER"),
    ]
    schema = [bigquery.SchemaField(name, bq_type) for name, bq_type in column_types]

    # Refuse to upload when the DataFrame lacks any schema column
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Overwrite the table on every run, partitioning it by day on 'date'
    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date',
    )
    load_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning,
    )
    destination = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"

    # Start the load job and wait for it to finish
    load_job = bigquery.Client().load_table_from_dataframe(df, destination, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {destination}")

def main():
    """Fetch the UA page-path report and upload it to BigQuery.

    Any exception raised along the way is caught and reported with a
    single printed line; nothing is re-raised.
    """
    try:
        # UA processing: client -> raw report -> DataFrame
        ua_df = response_to_dataframe(get_ua_report(initialize_analyticsreporting()))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_pagepath_data')
    except Exception as e:
        print(f"Error occurred: {e}")

# Run the page-path export when this code executes as the top-level module
if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from config.json in the working directory; the encoding
# is explicit so decoding does not depend on the platform's default locale
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables: read-only Analytics scope, service-account key path, the UA
# view to export, and the BigQuery project/dataset that receive the data
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Export the key path so clients created without explicit credentials
# (e.g. bigquery.Client()) can locate the service-account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authenticated Analytics Reporting API v4 client.

    Credentials come from the service-account key at ``KEY_FILE_LOCATION``
    and are requested for the scopes in ``SCOPES_UA``.

    Returns:
        The ``analyticsreporting`` v4 service object produced by ``build``.
    """
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_creds)

def get_ua_report(analytics):
    """Request ``ga:pageviews`` per ``ga:date`` from Google Analytics UA.

    Args:
        analytics: Reporting API v4 service object.

    Returns:
        The result of executing the ``reports().batchGet`` request for the
        configured view and date range.
    """
    date_range = {
        'startDate': config['UA_INITIAL_FETCH_FROM_DATE'],
        'endDate': config['UA_FETCH_TO_DATE'],
    }
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [date_range],
        'metrics': [{'expression': 'ga:pageviews'}],
        'dimensions': [{'name': 'ga:date'}],
        'pageSize': 10000000,
    }
    batch_request = analytics.reports().batchGet(body={'reportRequests': [report_request]})
    return batch_request.execute()

def response_to_dataframe(response):
    """Flatten a Reporting API v4 response into a DataFrame keyed by header names.

    Columns are named after the report's own dimension headers and metric
    header entries (e.g. ``ga:date``). Dimension values are kept as returned;
    metric values are converted with ``int``, and a row whose first metric
    set has no ``values`` key gets 0 for every metric.

    Args:
        response: Response dict with an optional ``reports`` list.

    Returns:
        pandas.DataFrame: One row per report row.
    """
    records = []

    for report in response.get('reports', []):
        header = report.get('columnHeader', {})
        dimension_names = header.get('dimensions', [])
        metric_entries = header.get('metricHeader', {}).get('metricHeaderEntries', [])

        for row in report.get('data', {}).get('rows', []):
            record = {}
            for position, dimension_name in enumerate(dimension_names):
                record[dimension_name] = row['dimensions'][position]

            for position, entry in enumerate(metric_entries):
                metric_set = row['metrics'][0]
                if 'values' in metric_set:
                    record[entry['name']] = int(metric_set['values'][position])
                else:
                    record[entry['name']] = 0

            records.append(record)

    return pd.DataFrame(records)

def upload_to_bigquery(df, table_name):
    """Load the daily pageviews DataFrame into a day-partitioned BigQuery table.

    The ``ga:``-prefixed columns are renamed to ``date`` and ``pageviews``,
    the values are coerced to the schema's types, and the result is written
    with WRITE_TRUNCATE to ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>``.

    Args:
        df: DataFrame with ``ga:date``/``ga:pageviews`` columns (or already
            renamed ``date``/``pageviews`` columns).
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a schema column is missing after renaming.
    """
    # Rename columns to match BigQuery schema (rename returns a new frame)
    renamed = df.rename(columns={'ga:date': 'date', 'ga:pageviews': 'pageviews'})

    schema = [
        bigquery.SchemaField(name, bq_type)
        for name, bq_type in (("date", "DATE"), ("pageviews", "INTEGER"))
    ]

    # Refuse to upload when the DataFrame lacks any schema column
    missing_fields = {field.name for field in schema} - set(renamed.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Parse the date values and keep only the calendar date
    renamed['date'] = pd.to_datetime(renamed['date']).dt.date

    # Ensure data types match expected BigQuery types
    renamed['pageviews'] = renamed['pageviews'].astype(int)

    # Print data types for verification
    print(renamed.dtypes)

    # Overwrite the table on every run, partitioning it by day on 'date'
    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field='date',
    )
    load_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=daily_partitioning,
    )
    destination = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"

    # Start the load job and wait for it to finish
    load_job = bigquery.Client().load_table_from_dataframe(renamed, destination, job_config=load_config)
    load_job.result()
    print(f"Data uploaded and partitioned by date to {destination}")

def main():
    """Fetch the UA daily pageviews report and upload it to BigQuery.

    Any exception raised along the way is caught and reported with a
    single printed line; nothing is re-raised.
    """
    try:
        # UA processing: client -> raw report -> DataFrame
        ua_df = response_to_dataframe(get_ua_report(initialize_analyticsreporting()))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_pageviews_data')
    except Exception as e:
        print(f"Error occurred: {e}")

# Run the pageviews export when this code executes as the top-level module
if __name__ == '__main__':
    main()


In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from config.json in the working directory; the encoding
# is explicit so decoding does not depend on the platform's default locale
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables: read-only Analytics scope, service-account key path, the UA
# view to export, and the BigQuery project/dataset that receive the data
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Export the key path so clients created without explicit credentials
# (e.g. bigquery.Client()) can locate the service-account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authenticated Analytics Reporting API v4 client.

    Credentials come from the service-account key at ``KEY_FILE_LOCATION``
    and are requested for the scopes in ``SCOPES_UA``.

    Returns:
        The ``analyticsreporting`` v4 service object produced by ``build``.
    """
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_creds)

def get_ua_report(analytics):
    """Fetch the complete platform/device report from Google Analytics UA.

    Requests sessions, pageviews, users and new users broken down by
    ``ga:date``, ``ga:browser``, ``ga:operatingSystem`` and
    ``ga:deviceCategory`` for the date range in ``config``. The Reporting
    API returns at most 100,000 rows per request, so the report is fetched
    page by page (following ``nextPageToken``) and the rows of every page
    are merged into the first page's report.

    Args:
        analytics: Reporting API v4 service object, e.g. the one returned by
            ``initialize_analyticsreporting``.

    Returns:
        dict: The first page's response, whose report holds the rows of all
        pages.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:sessions'},
            {'expression': 'ga:pageviews'},
            {'expression': 'ga:users'},
            {'expression': 'ga:newUsers'}
        ],
        'dimensions': [{'name': 'ga:date'},
                       {'name': 'ga:browser'},
                       {'name': 'ga:operatingSystem'},
                       {'name': 'ga:deviceCategory'}],
        # 100,000 is the documented per-request maximum; asking for more
        # does not return more rows
        'pageSize': 100000
    }

    combined = None
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        reports = response.get('reports', [])

        if combined is None:
            combined = response
        elif reports:
            # Append this page's rows to the report kept from the first page
            page_rows = reports[0].get('data', {}).get('rows', [])
            combined['reports'][0].setdefault('data', {}).setdefault('rows', []).extend(page_rows)

        next_page_token = reports[0].get('nextPageToken') if reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    # The merged report is complete, so a leftover page token would mislead
    if combined.get('reports'):
        combined['reports'][0].pop('nextPageToken', None)
    return combined

def response_to_dataframe(response):
    """Flatten a Reporting API v4 response into a platform/device DataFrame.

    Each report row becomes one record with the columns ``date``,
    ``browser``, ``operatingSystem``, ``deviceCategory``, ``sessions``,
    ``pageviews``, ``users`` and ``newUsers``. Only the first metric set of
    a row (``row['metrics'][0]``) is read.

    Args:
        response: Response dict with an optional ``reports`` list.

    Returns:
        pandas.DataFrame: One row per report row; empty when the response
        carries no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            values = row['metrics'][0]['values']
            # A first dimension that is not a YYYYMMDD date becomes NaT
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'browser': dimensions[1],
                'operatingSystem': dimensions[2],
                'deviceCategory': dimensions[3],
                'sessions': int(values[0]),
                'pageviews': int(values[1]),
                'users': int(values[2]),
                'newUsers': int(values[3])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the platform/device DataFrame into a day-partitioned BigQuery table.

    The values are coerced to the schema's types on a copy, so the caller's
    DataFrame is left untouched, and the result is written with
    WRITE_TRUNCATE to ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>``,
    partitioned by day on ``date``.

    Args:
        df: DataFrame holding at least every column named in the schema below.
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # Create schema for the DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("browser", "STRING"),
        bigquery.SchemaField("operatingSystem", "STRING"),
        bigquery.SchemaField("deviceCategory", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("pageviews", "INTEGER"),
        bigquery.SchemaField("users", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER")
    ]

    # Check that all schema fields are in the DataFrame
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Build the upload frame with assign() so the caller's DataFrame is not
    # modified: 'date' is reduced to calendar dates and the metric columns
    # are cast to int to match the BigQuery schema
    prepared = df.assign(
        date=pd.to_datetime(df['date']).dt.date,
        sessions=df['sessions'].astype(int),
        pageviews=df['pageviews'].astype(int),
        users=df['users'].astype(int),
        newUsers=df['newUsers'].astype(int),
    )

    # Print data types for verification
    print(prepared.dtypes)

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery and wait for the load job to finish
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(prepared, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the UA platform/device report and upload it to BigQuery.

    Any exception raised along the way is caught and reported with a
    single printed line; nothing is re-raised.
    """
    try:
        # UA processing: client -> raw report -> DataFrame
        ua_df = response_to_dataframe(get_ua_report(initialize_analyticsreporting()))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_platform_device_data')
    except Exception as e:
        print(f"Error occurred: {e}")

# Run the platform/device export when this code executes as the top-level module
if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from config.json in the working directory; the encoding
# is explicit so decoding does not depend on the platform's default locale
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables: read-only Analytics scope, service-account key path, the UA
# view to export, and the BigQuery project/dataset that receive the data
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Export the key path so clients created without explicit credentials
# (e.g. bigquery.Client()) can locate the service-account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authenticated Analytics Reporting API v4 client.

    Credentials come from the service-account key at ``KEY_FILE_LOCATION``
    and are requested for the scopes in ``SCOPES_UA``.

    Returns:
        The ``analyticsreporting`` v4 service object produced by ``build``.
    """
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_creds)

def get_ua_report(analytics):
    """Request daily purchase metrics from Google Analytics UA.

    Asks for ``ga:uniquePurchases`` and ``ga:transactionRevenue`` per
    ``ga:date``.

    Args:
        analytics: Reporting API v4 service object.

    Returns:
        The result of executing the ``reports().batchGet`` request for the
        configured view and date range.
    """
    date_range = {
        'startDate': config['UA_INITIAL_FETCH_FROM_DATE'],
        'endDate': config['UA_FETCH_TO_DATE'],
    }
    purchase_metrics = [
        {'expression': 'ga:uniquePurchases'},
        {'expression': 'ga:transactionRevenue'},
    ]
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [date_range],
        'metrics': purchase_metrics,
        'dimensions': [{'name': 'ga:date'}],
        'pageSize': 10000000,
    }
    batch_request = analytics.reports().batchGet(body={'reportRequests': [report_request]})
    return batch_request.execute()

def response_to_dataframe(response):
    """Flatten a Reporting API v4 response into a daily purchases DataFrame.

    Each report row becomes one record with the columns ``date``,
    ``uniquePurchases`` and ``transactionRevenue``. Only the first metric
    set of a row (``row['metrics'][0]``) is read.

    Args:
        response: Response dict with an optional ``reports`` list.

    Returns:
        pandas.DataFrame: One row per report row; empty when the response
        carries no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            values = row['metrics'][0]['values']
            # A first dimension that is not a YYYYMMDD date becomes NaT
            try:
                date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'uniquePurchases': int(values[0]),
                'transactionRevenue': float(values[1])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load the daily purchases DataFrame into a day-partitioned BigQuery table.

    The values are coerced to the schema's types on a copy, so the caller's
    DataFrame is left untouched, and the result is written with
    WRITE_TRUNCATE to ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>``,
    partitioned by day on ``date``.

    Args:
        df: DataFrame holding at least every column named in the schema below.
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a schema column is missing from ``df``.
    """
    # Create schema for the DataFrame
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("uniquePurchases", "INTEGER"),
        bigquery.SchemaField("transactionRevenue", "FLOAT")
    ]

    # Check that all schema fields are in the DataFrame
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Build the upload frame with assign() so the caller's DataFrame is not
    # modified: 'date' is reduced to calendar dates and the metric columns
    # are cast to match the BigQuery schema
    prepared = df.assign(
        date=pd.to_datetime(df['date']).dt.date,
        uniquePurchases=df['uniquePurchases'].astype(int),
        transactionRevenue=df['transactionRevenue'].astype(float),
    )

    # Print data types for verification
    print(prepared.dtypes)

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery and wait for the load job to finish
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(prepared, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Fetch the UA daily purchases report and upload it to BigQuery.

    Any exception raised along the way is caught and reported with a
    single printed line; nothing is re-raised.
    """
    try:
        # UA processing: client -> raw report -> DataFrame
        ua_df = response_to_dataframe(get_ua_report(initialize_analyticsreporting()))

        # Nothing to upload when the report came back without rows
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        # Upload the UA DataFrame to BigQuery
        upload_to_bigquery(ua_df, 'ua_purchases_data')
    except Exception as e:
        print(f"Error occurred: {e}")

# Run the purchases export when this code executes as the top-level module
if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from config.json in the working directory; the encoding
# is explicit so decoding does not depend on the platform's default locale
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# UA Variables: read-only Analytics scope, service-account key path, the UA
# view to export, and the BigQuery project/dataset that receive the data
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
VIEW_ID = config['VIEW_ID_UA']
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Export the key path so clients created without explicit credentials
# (e.g. bigquery.Client()) can locate the service-account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Build an authenticated Analytics Reporting API v4 client.

    Credentials come from the service-account key at ``KEY_FILE_LOCATION``
    and are requested for the scopes in ``SCOPES_UA``.

    Returns:
        The ``analyticsreporting`` v4 service object produced by ``build``.
    """
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_creds)

def get_ua_report(analytics):
    """Request ``ga:sessions`` per ``ga:date`` from Google Analytics UA.

    Args:
        analytics: Reporting API v4 service object.

    Returns:
        The result of executing the ``reports().batchGet`` request for the
        configured view and date range.
    """
    date_range = {
        'startDate': config['UA_INITIAL_FETCH_FROM_DATE'],
        'endDate': config['UA_FETCH_TO_DATE'],
    }
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [date_range],
        'metrics': [{'expression': 'ga:sessions'}],
        'dimensions': [{'name': 'ga:date'}],
        'pageSize': 10000,
    }
    batch_request = analytics.reports().batchGet(body={'reportRequests': [report_request]})
    return batch_request.execute()

def response_to_dataframe(response):
    """Flatten a Reporting API v4 response into a daily sessions DataFrame.

    Each report row becomes one record with the columns ``date`` and
    ``sessions``. Only the first metric set of a row
    (``row['metrics'][0]``) is read.

    Args:
        response: Response dict with an optional ``reports`` list.

    Returns:
        pandas.DataFrame: One row per report row; empty when the response
        carries no rows.
    """
    list_rows = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            # A first dimension that is not a YYYYMMDD date becomes NaT
            try:
                date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
            except ValueError:
                date_value = pd.NaT
            list_rows.append({
                'date': date_value,
                'sessions': int(row['metrics'][0]['values'][0])
            })
    return pd.DataFrame(list_rows)

def upload_to_bigquery(df, table_name):
    """Load ``df`` into a day-partitioned BigQuery table, replacing its data.

    The target table is ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>``.
    The load job uses WRITE_TRUNCATE and partitions the table by day on the
    ``date`` column. The caller's DataFrame is left untouched; type
    normalisation is done on a copy.

    Args:
        df: DataFrame containing at least the columns ``date`` and
            ``sessions``.
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a column required by the schema is missing from ``df``.
    """
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("sessions", "INTEGER")
    ]

    # Fail early, with the missing names, before any load job is started.
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Copy first so the conversions below do not alter the caller's frame.
    df = df.copy()

    # Reduce the timestamps to plain datetime.date objects for the DATE column.
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Ensure data types match expected BigQuery types
    df['sessions'] = df['sessions'].astype(int)

    # Print data types for verification
    print(df.dtypes)

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery; result() is called on the load job before reporting.
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Run the export: fetch UA sessions, tabulate them, upload to BigQuery.

    Any exception raised along the way is caught and reported on stdout.
    """
    try:
        client = initialize_analyticsreporting()
        raw_report = get_ua_report(client)
        ua_df = response_to_dataframe(raw_report)

        # Nothing to upload when the report came back without rows.
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        upload_to_bigquery(ua_df, 'ua_sessions_data')
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file
# (config.json is opened relative to the current working directory).
with open("config.json", "r") as f:
    config = json.load(f)

# UA Variables
# Read-only OAuth scope requested when building the Analytics credentials.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
# Path of the service-account JSON key file, as given in config.json.
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
# Universal Analytics view ID, sent as 'viewId' in every report request.
VIEW_ID = config['VIEW_ID_UA']
# Destination project and dataset; upload_to_bigquery() combines them with a
# table name into "<project>.<dataset>.<table>".
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials.
# NOTE(review): bigquery.Client() is created without explicit credentials
# further down, so it presumably relies on this variable — confirm.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Create the Reporting API v4 client used to query Universal Analytics.

    The service-account key at KEY_FILE_LOCATION is loaded with the scopes
    in SCOPES_UA and passed to the discovery ``build`` call.
    """
    service_account_credentials = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION,
        SCOPES_UA,
    )
    return build('analyticsreporting', 'v4', credentials=service_account_credentials)

def get_ua_report(analytics):
    """Fetch the source/campaign/medium report from Google Analytics UA.

    Requests sessions, users, new users, unique purchases and transaction
    revenue broken down by date, source, campaign and medium, for VIEW_ID
    over the date range read from ``config``.

    The Reporting API v4 returns at most 100,000 rows per request no matter
    how large ``pageSize`` is, so the report is fetched page by page via
    ``nextPageToken`` instead of relying on one oversized page.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        The response of the first ``batchGet`` call, with the rows of every
        following page appended to ``reports[0]['data']['rows']`` so callers
        see one complete report.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:sessions'},
            {'expression': 'ga:users'},
            {'expression': 'ga:newUsers'},
            {'expression': 'ga:uniquePurchases'},
            {'expression': 'ga:transactionRevenue'}
        ],
        'dimensions': [
            {'name': 'ga:date'},
            {'name': 'ga:source'},
            {'name': 'ga:campaign'},
            {'name': 'ga:medium'}
        ],
        # Largest page the API serves in a single request.
        'pageSize': 100000
    }

    merged_response = None
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        reports = response.get('reports', [])

        if merged_response is None:
            # The first page doubles as the container for all later rows.
            merged_response = response
        elif reports:
            merged_rows = (
                merged_response['reports'][0]
                .setdefault('data', {})
                .setdefault('rows', [])
            )
            merged_rows.extend(reports[0].get('data', {}).get('rows', []))

        next_page_token = reports[0].get('nextPageToken') if reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    # The merged report is complete, so it must not advertise another page.
    if merged_response.get('reports'):
        merged_response['reports'][0].pop('nextPageToken', None)
    return merged_response

def response_to_dataframe(response):
    """Flatten a Reporting API response into a traffic-source DataFrame.

    Args:
        response: Mapping shaped like a ``reports.batchGet`` response. Each
            row's ``dimensions`` list is read positionally as date
            (``YYYYMMDD``), source, campaign, medium; the first entry of
            ``metrics`` supplies ``values`` read positionally as sessions,
            users, new users, unique purchases, transaction revenue.

    Returns:
        pandas.DataFrame with the columns ``date``, ``source``,
        ``campaign``, ``medium``, ``sessions``, ``users``, ``newUsers``,
        ``uniquePurchases`` and ``transactionRevenue``. When the response
        holds no rows the DataFrame is empty and has no columns.
    """
    records = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            dimensions = row['dimensions']
            try:
                date_value = pd.to_datetime(dimensions[0], format='%Y%m%d')
            except ValueError:
                # Keep the row but mark a date that does not parse as missing.
                date_value = pd.NaT
            values = row['metrics'][0]['values']
            records.append({
                'date': date_value,
                'source': dimensions[1],
                'campaign': dimensions[2],
                'medium': dimensions[3],
                'sessions': int(values[0]),
                'users': int(values[1]),
                'newUsers': int(values[2]),
                'uniquePurchases': int(values[3]),
                'transactionRevenue': float(values[4]),
            })
    return pd.DataFrame(records)

def upload_to_bigquery(df, table_name):
    """Load ``df`` into a day-partitioned BigQuery table, replacing its data.

    The target table is ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>``.
    The load job uses WRITE_TRUNCATE and partitions the table by day on the
    ``date`` column. The caller's DataFrame is left untouched; type
    normalisation is done on a copy.

    Args:
        df: DataFrame containing at least the columns named in the schema
            below (date, source, campaign, medium and the five metrics).
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a column required by the schema is missing from ``df``.
    """
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("source", "STRING"),
        bigquery.SchemaField("campaign", "STRING"),
        bigquery.SchemaField("medium", "STRING"),
        bigquery.SchemaField("sessions", "INTEGER"),
        bigquery.SchemaField("users", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER"),
        bigquery.SchemaField("uniquePurchases", "INTEGER"),
        bigquery.SchemaField("transactionRevenue", "FLOAT")
    ]

    # Fail early, with the missing names, before any load job is started.
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Copy first so the conversions below do not alter the caller's frame.
    df = df.copy()

    # Reduce the timestamps to plain datetime.date objects for the DATE column.
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Ensure data types match expected BigQuery types
    for int_column in ('sessions', 'users', 'newUsers', 'uniquePurchases'):
        df[int_column] = df[int_column].astype(int)
    df['transactionRevenue'] = df['transactionRevenue'].astype(float)

    # Print data types for verification
    print(df.dtypes)

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery; result() is called on the load job before reporting.
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Export the source/campaign/medium report from UA into BigQuery.

    Any exception raised along the way is caught and reported on stdout.
    """
    try:
        client = initialize_analyticsreporting()
        raw_report = get_ua_report(client)
        ua_df = response_to_dataframe(raw_report)

        # Nothing to upload when the report came back without rows.
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        upload_to_bigquery(ua_df, 'ua_source_campaign_medium_data')
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

In [None]:
import pandas as pd
from google.cloud import bigquery
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

# Load configuration from a JSON file
# (config.json is opened relative to the current working directory).
with open("config.json", "r") as f:
    config = json.load(f)

# UA Variables
# Read-only OAuth scope requested when building the Analytics credentials.
SCOPES_UA = ['https://www.googleapis.com/auth/analytics.readonly']
# Path of the service-account JSON key file, as given in config.json.
KEY_FILE_LOCATION = config['SERVICE_ACCOUNT_FILE']
# Universal Analytics view ID, sent as 'viewId' in every report request.
VIEW_ID = config['VIEW_ID_UA']
# Destination project and dataset; upload_to_bigquery() combines them with a
# table name into "<project>.<dataset>.<table>".
BIGQUERY_PROJECT = config['BIGQUERY_PROJECT']
BIGQUERY_DATASET = config['DATASET_ID']

# Setting up the environment variable for Google Application Credentials.
# NOTE(review): bigquery.Client() is created without explicit credentials
# further down, so it presumably relies on this variable — confirm.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_FILE_LOCATION

def initialize_analyticsreporting():
    """Return a ready-to-use Analytics Reporting API v4 service object.

    Authentication uses the service-account key file at KEY_FILE_LOCATION
    together with the scopes in SCOPES_UA.
    """
    creds = ServiceAccountCredentials.from_json_keyfile_name(
        KEY_FILE_LOCATION, SCOPES_UA
    )
    reporting_service = build('analyticsreporting', 'v4', credentials=creds)
    return reporting_service

def get_ua_report(analytics):
    """Fetch daily user and new-user counts from Google Analytics UA.

    Requests ``ga:users`` and ``ga:newUsers`` by ``ga:date`` for VIEW_ID
    over the date range read from ``config``.

    The Reporting API v4 returns at most 100,000 rows per request no matter
    how large ``pageSize`` is, so the request asks for that maximum and
    follows ``nextPageToken`` should the report ever span several pages.

    Args:
        analytics: Reporting API v4 client, as returned by
            ``initialize_analyticsreporting()``.

    Returns:
        The response of the first ``batchGet`` call, with the rows of every
        following page appended to ``reports[0]['data']['rows']`` so callers
        see one complete report.
    """
    report_request = {
        'viewId': VIEW_ID,
        'dateRanges': [{'startDate': config['UA_INITIAL_FETCH_FROM_DATE'], 'endDate': config['UA_FETCH_TO_DATE']}],
        'metrics': [
            {'expression': 'ga:users'},
            {'expression': 'ga:newUsers'}
        ],
        'dimensions': [{'name': 'ga:date'}],
        # Largest page the API serves in a single request.
        'pageSize': 100000
    }

    merged_response = None
    while True:
        response = analytics.reports().batchGet(
            body={'reportRequests': [report_request]}
        ).execute()
        reports = response.get('reports', [])

        if merged_response is None:
            # The first page doubles as the container for all later rows.
            merged_response = response
        elif reports:
            merged_rows = (
                merged_response['reports'][0]
                .setdefault('data', {})
                .setdefault('rows', [])
            )
            merged_rows.extend(reports[0].get('data', {}).get('rows', []))

        next_page_token = reports[0].get('nextPageToken') if reports else None
        if not next_page_token:
            break
        report_request['pageToken'] = next_page_token

    # The merged report is complete, so it must not advertise another page.
    if merged_response.get('reports'):
        merged_response['reports'][0].pop('nextPageToken', None)
    return merged_response

def response_to_dataframe(response):
    """Flatten a Reporting API response into a DataFrame of daily user counts.

    Args:
        response: Mapping shaped like a ``reports.batchGet`` response. For
            every row, ``row['dimensions'][0]`` is read as a ``YYYYMMDD``
            date string, and the first two entries of
            ``row['metrics'][0]['values']`` as total users and new users.

    Returns:
        pandas.DataFrame with the columns ``date``, ``totalUsers`` and
        ``newUsers``, one row per response row. When the response holds no
        rows the DataFrame is empty and has no columns.
    """
    records = []

    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            try:
                date_value = pd.to_datetime(row['dimensions'][0], format='%Y%m%d')
            except ValueError:
                # Keep the row but mark a date that does not parse as missing.
                date_value = pd.NaT
            values = row['metrics'][0]['values']
            records.append({
                'date': date_value,
                'totalUsers': int(values[0]),
                'newUsers': int(values[1]),
            })
    return pd.DataFrame(records)

def upload_to_bigquery(df, table_name):
    """Load ``df`` into a day-partitioned BigQuery table, replacing its data.

    The target table is ``BIGQUERY_PROJECT.BIGQUERY_DATASET.<table_name>``.
    The load job uses WRITE_TRUNCATE and partitions the table by day on the
    ``date`` column. The caller's DataFrame is left untouched; type
    normalisation is done on a copy.

    Args:
        df: DataFrame containing at least the columns ``date``,
            ``totalUsers`` and ``newUsers``.
        table_name: Name of the destination table inside the dataset.

    Raises:
        ValueError: If a column required by the schema is missing from ``df``.
    """
    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("totalUsers", "INTEGER"),
        bigquery.SchemaField("newUsers", "INTEGER")
    ]

    # Fail early, with the missing names, before any load job is started.
    missing_fields = {field.name for field in schema} - set(df.columns)
    if missing_fields:
        raise ValueError(f"Schema fields not present in DataFrame: {missing_fields}")

    # Copy first so the conversions below do not alter the caller's frame.
    df = df.copy()

    # Reduce the timestamps to plain datetime.date objects for the DATE column.
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Ensure data types match expected BigQuery types
    df['totalUsers'] = df['totalUsers'].astype(int)
    df['newUsers'] = df['newUsers'].astype(int)

    # Print data types for verification
    print(df.dtypes)

    # Configure BigQuery job to partition the table by the 'date' column
    table_ref = f"{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='date'
        )
    )

    # Upload to BigQuery; result() is called on the load job before reporting.
    bq_client = bigquery.Client()
    bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    print(f"Data uploaded and partitioned by date to {table_ref}")

def main():
    """Export daily user and new-user counts from UA into BigQuery.

    Any exception raised along the way is caught and reported on stdout.
    """
    try:
        client = initialize_analyticsreporting()
        raw_report = get_ua_report(client)
        ua_df = response_to_dataframe(raw_report)

        # Nothing to upload when the report came back without rows.
        if ua_df.empty:
            print("No data found in the UA response.")
            return

        print(f"DataFrame columns: {ua_df.columns}")
        print(f"Sample data:\n{ua_df.head()}")

        upload_to_bigquery(ua_df, 'ua_users_newusers_data')
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()