# Define Parameters

In [None]:
import os

# Define input and output parameters
OUTPUT_SESSION_TABLE_NAME = os.getenv("SQLDB_SESSION")
OUTPUT_CHANNEL_TABLE_NAME = os.getenv("SQLDB_CHANNEL")
PROPERTY_IDs = {1:os.getenv("GA_US"), 2:os.getenv("GA_UK"), 3:os.getenv("GA_AU")}

#Define Functions

In [None]:
import os
import pandas as pd
import pyspark.sql.functions as F
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
    DateRange,
    Dimension,
    Metric,
    RunReportRequest,
)

# Set environment variables
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = dbutils.secrets.get(scope="azure_key_vault", key='GA-PW')

# Define functions to query report from GA4
def query_visits_and_sessions_report(property_id, startDate, endDate):
    client = BetaAnalyticsDataClient()

    request = RunReportRequest(
        property=f"properties/{property_id}",
        dimensions=[Dimension(name="date")],
        metrics=[
                 Metric(name="totalUsers"),
                 Metric(name="sessions"),
                 Metric(name="transactions"),
                 Metric(name="screenPageViews"),
                ],
        date_ranges=[DateRange(start_date=startDate, end_date=endDate)],
    )

    response = client.run_report(request)

    output = []
    for row in response.rows:
        output.append(
            {
                "sales_date": row.dimension_values[0].value, 
                "visits": row.metric_values[0].value, 
                "sessions": row.metric_values[1].value, 
                "checkouts": row.metric_values[2].value, 
                "page_views": row.metric_values[3].value,
            }
        )
    df = pd.DataFrame(output)
    return df

def query_channels_acquisition_report(property_id, startDate, endDate):
    client = BetaAnalyticsDataClient()

    request = RunReportRequest(
        property=f"properties/{property_id}",
        dimensions=[
                    Dimension(name="date"),
                    Dimension(name="sessionDefaultChannelGroup"),
                    Dimension(name="transactionId"),
                    ],
        metrics=[
                #  Metric(name="totalUsers"),
                 Metric(name="newUsers"),
                 Metric(name="sessions"),
                 Metric(name="bounceRate"),
                 Metric(name="screenPageViewsPerSession"),
                 Metric(name="averageSessionDuration"),
                 Metric(name="transactionsPerPurchaser"),
                 Metric(name="transactions"),
                 Metric(name="totalRevenue"),
                 Metric(name="screenPageViews"),
                ],
        date_ranges=[DateRange(start_date=startDate, end_date=endDate)],
    )

    response = client.run_report(request)

    output = []
    for row in response.rows:
        output.append(
            {
                "sales_date": row.dimension_values[0].value,
                "channel_grouping": row.dimension_values[1].value,
                "shopify_order_id": row.dimension_values[2].value,

                # "users": row.metric_values[0].value, 
                "new_users": row.metric_values[0].value, 
                "sessions": row.metric_values[1].value, 
                "bounce_rate": row.metric_values[2].value,
                "pages_per_session": row.metric_values[3].value, 
                "avg_session_duration": row.metric_values[4].value, 
                "ecomm_conversion_rate": row.metric_values[5].value, 
                "transactions": row.metric_values[6].value,
                "revenue": row.metric_values[7].value, 
                "pageviews": row.metric_values[8].value,
            }
        )
    df = pd.DataFrame(output)
    return df

# Load data from GA

In [None]:
# Concat the reports from all sales channels
visits_and_sessions_report_dfs = []
for sales_channel_id, property_id in PROPERTY_IDs.items():
    df = query_visits_and_sessions_report(property_id, "30daysAgo", "yesterday") #30daysAgo   yesterday
    df['sales_channel_id'] = sales_channel_id
    visits_and_sessions_report_dfs.append(df)

visits_and_sessions_df = pd.concat(visits_and_sessions_report_dfs)

channels_acquisition_report_dfs = []
for sales_channel_id, property_id in PROPERTY_IDs.items():
    df = query_channels_acquisition_report(property_id, "2daysAgo", "yesterday")
    df['sales_channel_id'] = sales_channel_id
    channels_acquisition_report_dfs.append(df)

channels_acquisition_df = pd.concat(channels_acquisition_report_dfs)

#Load the results to DB

In [None]:
# Define Azure SQL Database connection
jdbcHostname = os.getenv("SQLDB_HOST")
user = os.getenv("SQLDB_USER")
password = dbutils.secrets.get(scope="azure_key_vault", key='SQLDB-PW') # use Azure Key Vault to save this password. 
jdbcDatabase = os.getenv("SQLDB_DB")
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
"user" : user,
"password" : password,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [None]:
# Get the current Pacific Time
current_timestamp_pt = F.from_utc_timestamp(F.current_timestamp(), "America/Los_Angeles")

# Output the results with the timestamp
spark_visits_and_sessions_df = spark.createDataFrame(visits_and_sessions_df).withColumn('RecordCreatedDate', current_timestamp_pt)
spark_channels_acquisition_df = spark.createDataFrame(channels_acquisition_df).withColumn('RecordCreatedDate', current_timestamp_pt)

# Save the results into Azure SQL Database
spark_visits_and_sessions_df.write.jdbc(url=jdbcUrl, table=OUTPUT_SESSION_TABLE_NAME, mode = "overwrite", properties=connectionProperties)
spark_channels_acquisition_df.write.jdbc(url=jdbcUrl, table=OUTPUT_CHANNEL_TABLE_NAME, mode = "overwrite", properties=connectionProperties)

#References

https://developers.google.com/analytics/devguides/reporting/data/v1/quickstart-client-libraries#python

https://automation-help.com/google-analytics-data-api-ga4-with-python-extensive-guide/

https://www.lupagedigital.com/blog/google-analytics-api-python/