# Aggregate and save to parquet

## 1. Aggregate active tenant counts by country

In [None]:
import org.apache.spark.sql.expressions.Window
// Purpose: count Active tenants per country from the Delta table and export the
// result as a single, fixed-name Parquet file (countryForBi.parquet).

// paths: input Delta table and output folder for this export
val deltaTablesPath = "abfss://xxx.dfs.core.windows.net/deltaTables"
val parquetsPath = "abfss://xxx.dfs.core.windows.net/countryParquet"

val currentSnapshot = spark.read.format("delta").load(deltaTablesPath)

// step 01: deduplicate -- keep only the newest record (by createTime) for each id.
// row_number() hands out distinct ranks even when createTime values tie, so exactly
// one row per id survives. dense_rank() would give every tied row rank 1 and let
// duplicates through, inflating the counts below. Which tied row wins is arbitrary.
val w = Window.partitionBy("id").orderBy(desc("createTime"))
val dudupDF = currentSnapshot.withColumn("rank", row_number().over(w)).
    where(col("rank") === 1).drop(col("rank"))

// step 02: aggregation -- number of Active tenants per country
val aggregationResult = dudupDF.
    where(col("licenseStatus") === "Active").
    groupBy(col("country")).count()

// step 03: save as parquet
// coalesce(1) collapses the result to one partition so a single part file is written.
aggregationResult.coalesce(1).
    write.mode("overwrite").parquet(parquetsPath)

// Give the generated part file a fixed name: copy it, then remove the original.
// folderPath is derived from parquetsPath so the two can never drift apart.
val folderPath = parquetsPath + "/"
val files = mssparkutils.fs.ls(folderPath)

// Fail with a descriptive message instead of an index-out-of-bounds error
// if the write produced no parquet file.
val oldParquetName = folderPath + files.map(_.name).find(_.endsWith("parquet")).getOrElse(
    throw new IllegalStateException(s"No parquet file found in $folderPath after write"))
val newParquetName = folderPath + "countryForBi.parquet"

mssparkutils.fs.cp(oldParquetName, newParquetName)
mssparkutils.fs.rm(oldParquetName)

## 2. Aggregate active/disabled tenant counts

In [None]:
// Purpose: count tenants per licenseStatus from the Delta table and export the
// result as a single, fixed-name Parquet file (statusForBi.parquet).

// Imported here so this cell can run without a previous cell having imported Window.
import org.apache.spark.sql.expressions.Window

// paths: input Delta table and output folder for this export
val deltaTablesPath = "abfss://xxx.dfs.core.windows.net/deltaTables"
val parquetsPath = "abfss://xxx.dfs.core.windows.net/statusParquet"

val currentSnapshot = spark.read.format("delta").load(deltaTablesPath)

// step 01: deduplicate -- keep only the newest record (by createTime) for each id.
// row_number() hands out distinct ranks even when createTime values tie, so exactly
// one row per id survives. dense_rank() would give every tied row rank 1 and let
// duplicates through, inflating the counts below. Which tied row wins is arbitrary.
val w = Window.partitionBy("id").orderBy(desc("createTime"))
val dudupDF = currentSnapshot.withColumn("rank", row_number().over(w)).
    where(col("rank") === 1).drop(col("rank"))

// step 02: aggregation -- number of tenants for each licenseStatus value
val aggregationResult = dudupDF.groupBy(col("licenseStatus")).count()

// step 03: save as parquet
// coalesce(1) collapses the result to one partition so a single part file is written.
aggregationResult.coalesce(1).
    write.mode("overwrite").parquet(parquetsPath)

// Give the generated part file a fixed name: copy it, then remove the original.
// folderPath is derived from parquetsPath so the two can never drift apart.
val folderPath = parquetsPath + "/"
val files = mssparkutils.fs.ls(folderPath)

// Fail with a descriptive message instead of an index-out-of-bounds error
// if the write produced no parquet file.
val oldParquetName = folderPath + files.map(_.name).find(_.endsWith("parquet")).getOrElse(
    throw new IllegalStateException(s"No parquet file found in $folderPath after write"))
val newParquetName = folderPath + "statusForBi.parquet"

mssparkutils.fs.cp(oldParquetName, newParquetName)
mssparkutils.fs.rm(oldParquetName)

## 3. Aggregate onboarded tenant counts per minute

In [None]:
// Purpose: count Active records per 60-second createTime window and export the
// result as a single, fixed-name Parquet file (minuteCountForBi.parquet).

// paths: input Delta table and output folder for this export
val deltaTablesPath = "abfss://xxx.dfs.core.windows.net/deltaTables"
val parquetsPath = "abfss://xxx.dfs.core.windows.net/minuteCountParquet"

val currentSnapshot = spark.read.format("delta").load(deltaTablesPath)

// step 01: do aggregation w/o dedup
// Bucket Active records into 60-second windows on createTime and count ids per bucket.
// Each bucket is then labelled by its midpoint (window start + 30 seconds) in
// intervalTime; the raw start/end columns are dropped from the output.
val aggregationResult = currentSnapshot.where(col("licenseStatus") === "Active").
    groupBy(window(col("createTime"), "60 seconds")).
    agg(count("id") as "tenantOnboardCount").
    select("window.start", "window.end", "tenantOnboardCount").
    withColumn("intervalTime",col("start") + expr("INTERVAL 30 seconds")).
    drop("start", "end")

// step 02: save as parquet
// coalesce(1) collapses the result to one partition so a single part file is written.
aggregationResult.coalesce(1).
    write.mode("overwrite").parquet(parquetsPath)

// Give the generated part file a fixed name: copy it, then remove the original.
// folderPath is derived from parquetsPath so the two can never drift apart.
val folderPath = parquetsPath + "/"
val files = mssparkutils.fs.ls(folderPath)

// Fail with a descriptive message instead of an index-out-of-bounds error
// if the write produced no parquet file.
val oldParquetName = folderPath + files.map(_.name).find(_.endsWith("parquet")).getOrElse(
    throw new IllegalStateException(s"No parquet file found in $folderPath after write"))
val newParquetName = folderPath + "minuteCountForBi.parquet"

mssparkutils.fs.cp(oldParquetName, newParquetName)
mssparkutils.fs.rm(oldParquetName)