In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from itertools import combinations

In [0]:
# Job configuration for the Spark aggregation.
# Edit the values here, or load an equivalent dict from an external JSON file
# or gist; the cells below read their settings from this dict.
# This is the reduced KPI/dimension set used for testing.
job_config = {
    "source": "bigquery-public-data.google_analytics_sample.ga_sessions_*",
    "granularity": ["hourly", "daily", "weekly", "monthly", "quarterly", "yearly"],
    # KPIs to compute: display name paired with the Spark SQL aggregate expression.
    "kpis": [
        {"name": label, "sql": expression}
        for label, expression in (
            ("Total visits", "sum(visits)"),
            ("Total hits", "sum(hits)"),
            ("Total pageviews", "sum(pageviews)"),
            ("Revenue per visit", "sum(transactionRevenue)/sum(visits)"),
        )
    ],
    # Columns to break the KPIs down by.
    "dimensions": ["country", "browser", "deviceCategory"],
    # Maximum number of dimensions combined in a single breakdown.
    "depth": 2,
}

In [0]:
# Reference list of all KPIs and dimensions.
# NOTE: everything below is a bare triple-quoted string, not an assignment, so
# running this cell does NOT change job_config; the notebook merely echoes the
# string as the cell's output. To use this catalogue, remove the surrounding
# triple quotes so the dict literal is actually assigned.
'''job_config = {
    "source": "bigquery-public-data.google_analytics_sample.ga_sessions_*",
    "granularity": ["daily","hourly","weekly", "monthly", "quarterly", "yearly"],
    # Aggregation list
    "kpis" : [
    {"name": "Bounce count", "sql": "sum(bounces)"},
    {"name": "Conversion rate", "sql": "(sum(transaction)/count(*))*100"},
    {"name": "Total visits", "sql": "sum(visits)"},
    {"name": "Total hits", "sql": "sum(hits)"},
    {"name": "Total pageviews", "sql": "sum(pageviews)"},
    {"name": "Avg time on site", "sql": "avg(timeOnSite)"},
    {"name": "Revenue per visit", "sql": "sum(transactionRevenue)/sum(visits)"},
    {"name": "Unique visitors", "sql": "count(DISTINCT visitorId)"},
    {"name": "Avg pageviews per visit", "sql": "sum(pageviews)/sum(visits)"},
    {"name": "Bounce rate", "sql": "(sum(bounces)/sum(visits))*100"},
    {"name": "Conversion rate per visit", "sql": "(sum(transaction)/sum(visits))*100"},
    {"name": "Revenue per transaction", "sql": "sum(transactionRevenue)/sum(transaction)"},
    {"name": "Pageviews per hit", "sql": "sum(pageviews)/sum(hits)"},
    {"name": "Avg session quality", "sql": "avg(sessionQualityDim)"},
    {"name": "Bounce rate per hit", "sql": "(sum(bounces)/sum(hits))*100"},
    {"name": "Avg hits per visit", "sql": "sum(hits)/sum(visits)"},
    {"name": "Avg pageviews per hit", "sql": "sum(pageviews)/sum(hits)"},
    {"name": "Avg time per pageview", "sql": "avg(timeOnSite)/sum(pageviews)"},
    {"name": "Avg time per hit", "sql": "avg(timeOnSite)/sum(hits)"},
    {"name": "Interaction rate", "sql": "(sum(isInteraction)/count(*))*100"},
    {"name": "Unique page paths", "sql": "count(DISTINCT pagePath)"},
    {"name": "Unique hostnames", "sql": "count(DISTINCT hostname)"},
    {"name": "Unique page titles", "sql": "count(DISTINCT pageTitle)"},
    {"name": "Avg screen depth", "sql": "avg(screenDepth)"},
    {"name": "Avg steps per transaction", "sql": "sum(step)/sum(transaction)"},
    {"name": "Unique content group 1", "sql": "count(DISTINCT contentGroup1)"},
    {"name": "Unique content group 2", "sql": "count(DISTINCT contentGroup2)"},
    {"name": "Unique previous content group 1", "sql": "count(DISTINCT previousContentGroup1)"},
    {"name": "Unique previous content group 2", "sql": "count(DISTINCT previousContentGroup2)"},
    {"name": "Unique data sources", "sql": "count(DISTINCT dataSource)"},
    {"name": "Unique full visitor IDs", "sql": "count(DISTINCT fullVisitorId)"}],
    # List of columns to track
    "dimensions": [
    "country",
    "browser",
    "deviceCategory",
    "operatingSystem",
    "channelGrouping",
    "medium",
    "continent",
    "subContinent",
    "networkDomain",
    "referralPath",
    "source",
    "hostname",
    "pagePath",
    "pageTitle",
    "screenName",
    "landingScreenName",
    "exitScreenName",
    "contentGroup1",
    "contentGroup2",
    "previousContentGroup1",
    "previousContentGroup2",
    "socialNetwork",
    "dataSource",
    "socialEngagementType",
    ],
    # Depth
    "depth": 2,
}'''


Out[4]: 'job_config = {\n    "source": "bigquery-public-data.google_analytics_sample.ga_sessions_*",\n    "granularity": ["daily","hourly","weekly", "monthly", "quarterly", "yearly"],\n    # Aggregation list\n    "kpis" : [\n    {"name": "Bounce count", "sql": "sum(bounces)"},\n    {"name": "Conversion rate", "sql": "(sum(transaction)/count(*))*100"},\n    {"name": "Total visits", "sql": "sum(visits)"},\n    {"name": "Total hits", "sql": "sum(hits)"},\n    {"name": "Total pageviews", "sql": "sum(pageviews)"},\n    {"name": "Avg time on site", "sql": "avg(timeOnSite)"},\n    {"name": "Revenue per visit", "sql": "sum(transactionRevenue)/sum(visits)"},\n    {"name": "Unique visitors", "sql": "count(DISTINCT visitorId)"},\n    {"name": "Avg pageviews per visit", "sql": "sum(pageviews)/sum(visits)"},\n    {"name": "Bounce rate", "sql": "(sum(bounces)/sum(visits))*100"},\n    {"name": "Conversion rate per visit", "sql": "(sum(transaction)/sum(visits))*100"},\n    {"name": "Revenue per transa

In [0]:
# Read the data extracted from the BigQuery dataset back from its Delta location.
delta_path = "dbfs:/user/hive/warehouse/extract_df"
df = spark.read.load(delta_path, format="delta")

In [0]:
# Checkpoint 1 - one row per distinct timestamp_daily value, with every
# configured KPI evaluated as its own column (named after the KPI).
kpis = job_config["kpis"]
kpi_names = [entry["name"] for entry in kpis]
aggregated_df = (
    df.groupBy("timestamp_daily")
    .agg(*(F.expr(entry["sql"]).alias(entry["name"]) for entry in kpis))
)
# Uncomment to inspect the result: display(aggregated_df)

In [0]:
# CHECKPOINT 2 & 3 - evaluate every KPI at every granularity, for every
# combination of 0..job_config["depth"] dimensions, and stack the results into
# one long-format DataFrame (ts_df).
#
# Output columns:
#   Timestamp   - value of the timestamp_<granularity> column for the group
#   Granularity - granularity name from the config
#   Aggregation - KPI name from the config
#   Metric      - KPI value for the group (NULL results are reported as 0)
#   Dimensions  - array of "column=value" labels identifying the group;
#                 empty for the depth-0 (no breakdown) totals
ts_df = spark.createDataFrame([], schema="Timestamp STRING, Granularity STRING, Aggregation STRING, Metric DOUBLE, Dimensions ARRAY<STRING>")
for granularity in job_config['granularity']:
    # assumes df carries one column per configured granularity named
    # timestamp_<granularity> - TODO confirm against the extract step
    timestamp_col = f"timestamp_{granularity}"
    for kpi in job_config['kpis']:
        kpi_name = kpi['name']
        kpi_sql = kpi['sql']
        # Loop-invariant for all dimension combinations of this KPI.
        kpi_expr = F.coalesce(F.expr(kpi_sql), F.lit(0)).alias("Metric")

        for depth in range(0, job_config['depth'] + 1):
            for dimensions in combinations(job_config['dimensions'], depth):
                # Group on "column=value" labels; each label column is aliased
                # back to the plain dimension name.
                dimension_details = [F.concat(F.lit(dim), F.lit("="), F.col(dim)).alias(dim) for dim in dimensions]
                grouped_df = df.groupBy(F.col(timestamp_col), *dimension_details).agg(kpi_expr)
                # After the groupBy, the column named <dim> already holds
                # "dim=value". Reference it directly: re-applying
                # dimension_details here would prefix the name a second time
                # and yield "dim=dim=value".
                dim_cols = [F.col(dim) for dim in dimensions]
                subset_df = grouped_df.select(
                    F.col(timestamp_col).alias("Timestamp"),
                    F.lit(granularity).alias("Granularity"),
                    F.lit(kpi_name).alias("Aggregation"),
                    F.col('Metric'),
                    F.array(*dim_cols).alias("Dimensions"),
                )
                ts_df = ts_df.union(subset_df)

# Uncomment to inspect the result: display(ts_df)

In [0]:
#display checkpoint 1
#display(aggregated_df)

timestamp_daily,Total visits,Total hits,Total pageviews,Revenue per visit
2016-08-01T00:00:00,1711,13006,9843,3574552.893045003
2016-08-03T00:00:00,2890,17726,13724,
2016-08-02T00:00:00,2140,15522,11784,636070.0934579439
2017-07-01T00:00:00,2048,7975,6562,41279.296875
2017-07-02T00:00:00,1895,6618,5637,335087.07124010555


In [0]:
#display-checkpoint 2& 3
#display(ts_df)

Timestamp,Granularity,Aggregation,Metric,Dimensions
2016-08-04T02:00:00,hourly,Total visits,114.0,List()
2016-08-02T01:00:00,hourly,Total visits,61.0,List()
2016-08-01T16:00:00,hourly,Total visits,95.0,List()
2016-08-01T21:00:00,hourly,Total visits,98.0,List()
2016-08-02T05:00:00,hourly,Total visits,58.0,List()
2016-08-02T17:00:00,hourly,Total visits,143.0,List()
2016-08-03T02:00:00,hourly,Total visits,80.0,List()
2016-08-03T08:00:00,hourly,Total visits,83.0,List()
2016-08-02T06:00:00,hourly,Total visits,47.0,List()
2016-08-03T22:00:00,hourly,Total visits,131.0,List()


In [0]:
# Rebuild the timeseries_load staging table from scratch: drop any previous
# copy, then save ts_df as a Delta table ready to be loaded into the database.
spark.sql("DROP TABLE IF EXISTS timeseries_load")
ts_df.write.saveAsTable("timeseries_load", format="delta")