In [0]:
depth = dbutils.widgets.get("depth")
dimensions = dbutils.widgets.get("dimensions")
kpis = dbutils.widgets.get("kpis")
granularity = dbutils.widgets.get("granularity")
source = dbutils.widgets.get("source")

In [0]:
import json

job_config = {
    "source": json.loads(source),
    "granularity": granularity,
    "kpis": json.loads(kpis),
    "dimensions": json.loads(dimensions),
    "depth": int(depth)
}
# hourly, daily, weekly, monthly, quarterly, and yearly.
print(job_config)

In [0]:
import configparser

db_properties={}
config = configparser.ConfigParser()
config.read("db_properties.ini")
db_prop = config['postgresql']
db_properties['user']=db_prop['user']
db_properties['password']=db_prop['password']
db_properties['url']=db_prop['url']
db_properties['dbtable']=db_prop['dbtable']
db_properties['driver']=db_prop['driver']

In [0]:
from datetime import date, timedelta
from datetime import datetime

def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

def get_dataset_for_date(date):
    table = f'bigquery-public-data.google_analytics_sample.ga_sessions_{date}'
    return spark.read.format("bigquery") \
            .option("table",table) \
            .option("project", 'daniel-343806') \
            .option("parentProject", 'daniel-343806') \
            .load() \
            .limit(20)

def get_dataset_for_range(start_date, end_date):
    df = get_dataset_for_date(start_date.strftime("%Y%m%d"))
    for single_date in daterange(start_date + timedelta(days=1), end_date + timedelta(days=1)):
        df = df.union(get_dataset_for_date(single_date.strftime("%Y%m%d")))
    return df 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from itertools import combinations

# Initialize a Spark session
spark = SparkSession.builder.appName("DataAggregation").getOrCreate()

start_date = datetime.strptime(job_config["source"]["start_date"], '%d/%m/%Y')
end_date = datetime.strptime(job_config["source"]["end_date"], '%d/%m/%Y')
df = get_dataset_for_range(start_date, end_date)

# Convert the "timestamp" column to a timestamp type
df = df.withColumn("timestamp", to_timestamp(col("date"), "yyyyMMdd"))

dimensions = [com for sub in range(-1, job_config["depth"]) for com in combinations(job_config["dimensions"], sub + 1)]

for dimension in dimensions:
    for kpi in job_config["kpis"]:
        window_interval = "1 day"
        if job_config["granularity"] == "weekly":
            window_interval = "7 days"
        elif job_config["granularity"] == "monthly":
            window_interval = "30 days"
        elif job_config["granularity"] == "quarterly":
            window_interval = "91 days"
        elif job_config["granularity"] == "yearly":
            window_interval = "365 days"
        df_grouped = df.groupBy(window("timestamp", window_interval).alias("timestamp"), *dimension) \
                        .agg(
                            coalesce(expr(kpi["sql"]), lit(0)) \
                            .alias(kpi["name"])
                        ) \
                        .withColumn("dimensions", concat(
                            lit("["),
                            concat_ws(", ",
                                *[concat_ws("=", lit(d), col(d.split('.')[-1])) for d in dimension]
                            ),
                            lit("]")
                        )) \
                        .withColumn("aggregation", lit(kpi["name"])) \
                        .withColumn("metric", col(kpi["name"])) \
                        .select("timestamp", "aggregation", "metric", "dimensions")
        # df_grouped.show()
        df_grouped.write \
            .jdbc(
                url=db_properties['url'], 
                table=db_properties['dbtable'],
                mode='append',
                properties=db_properties
            )


+--------------------+-----------+------+----------+
|           timestamp|aggregation|metric|dimensions|
+--------------------+-----------+------+----------+
|{2017-07-25 00:00...|Users count|    20|        []|
|{2017-07-26 00:00...|Users count|    20|        []|
|{2017-07-27 00:00...|Users count|    19|        []|
|{2017-07-28 00:00...|Users count|    19|        []|
|{2017-07-29 00:00...|Users count|    20|        []|
|{2017-07-30 00:00...|Users count|    20|        []|
+--------------------+-----------+------+----------+

+--------------------+------------+------+----------+
|           timestamp| aggregation|metric|dimensions|
+--------------------+------------+------+----------+
|{2017-07-25 00:00...|Bounce count|     0|        []|
|{2017-07-26 00:00...|Bounce count|     0|        []|
|{2017-07-27 00:00...|Bounce count|     0|        []|
|{2017-07-28 00:00...|Bounce count|    20|        []|
|{2017-07-29 00:00...|Bounce count|     0|        []|
|{2017-07-30 00:00...|Bounce count|  

In [0]:
spark.read \
    .jdbc(
        url=db_properties['url'], 
        table=db_properties['dbtable'],
        properties=db_properties
    ) \
    .show()

+-------------------+---------------+------+--------------------+
|          timestamp|    aggregation|metric|          dimensions|
+-------------------+---------------+------+--------------------+
|2017-07-25 00:00:00|    Users count|     1|                  []|
|2017-07-26 00:00:00|    Users count|     1|                  []|
|2017-07-27 00:00:00|    Users count|     1|                  []|
|2017-07-28 00:00:00|    Users count|     1|                  []|
|2017-07-29 00:00:00|    Users count|     1|                  []|
|2017-07-30 00:00:00|    Users count|     1|                  []|
|2017-07-25 00:00:00|   Bounce count|     0|                  []|
|2017-07-26 00:00:00|   Bounce count|     0|                  []|
|2017-07-27 00:00:00|   Bounce count|     0|                  []|
|2017-07-28 00:00:00|   Bounce count|     1|                  []|
|2017-07-29 00:00:00|   Bounce count|     0|                  []|
|2017-07-30 00:00:00|   Bounce count|     1|                  []|
|2017-07-2