In [29]:
%%spark
// Print the identifiers of the Spark application backing this notebook session.
Seq(
  "Application Id" -> spark.sparkContext.applicationId,
  "Application Name" -> spark.sparkContext.appName
).foreach { case (label, value) => println(s"$label: $value") }

StatementMeta(sparkpoolag, 25, 30, Finished, Available)

Application Id: application_1700214569049_0001
Application Name: SitesPermsDashboard_SiteEnhancedWithPerms_sparkpoolag_1700214475


## Set up variables

These initial variable values come from the notebook parameters

In [33]:

// The window, run id and storage values are expected to be supplied as notebook
// parameters; the commented-out lines below are sample values for reference.
// var runId = "93a19c30-e8b2-4af4-883b-8752fead65e4"
// val windowStartTime  = "2023-10-31T00:00:00Z"
// val windowEndTime = "2023-10-31T00:00:00Z"
// val storageAccountName = "mgdcag" 
// val storageContainerName = "sites-permissions-dashbaord"

// When true, the final data sets are also written to the archive folders.
val retainForHistoricTrending: Boolean = true

// A window whose start and end timestamps are identical marks a full pull;
// any other window is treated as a delta pull.
val fullPull: Boolean = windowEndTime == windowStartTime




StatementMeta(sparkpoolag, 25, 34, Finished, Available)

runId: String = 93a19c30-e8b2-4af4-883b-8752fead65e4
windowStartTime: String = 2023-10-31T00:00:00Z
windowEndTime: String = 2023-10-31T00:00:00Z
storageAccountName: String = mgdcag
storageContainerName: String = sites-permissions-dashbaord
retainForHistoricTrending: Boolean = false
fullPull: Boolean = true


In [55]:
import java.text.SimpleDateFormat
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Layout of the window timestamps passed to the notebook, e.g. 2023-10-31T00:00:00Z.
val standardDatePattern: String = "yyyy-MM-dd'T'HH:mm:ss'Z'"

// Both ends of the extraction window, parsed with the pattern above.
val windowStartDateTimeLocal: LocalDateTime =
  LocalDateTime.parse(windowStartTime, DateTimeFormatter.ofPattern(standardDatePattern))
val windowEndTimeLocal: LocalDateTime =
  LocalDateTime.parse(windowEndTime, DateTimeFormatter.ofPattern(standardDatePattern))

// Date-based folder fragments (yyyy/MM/dd) for the start and end of the window.
val timeDirFormatter = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val yearMonthDayFormat = timeDirFormatter.format(windowStartDateTimeLocal).stripSuffix("/")
val yearMonthDayFormatEnd = timeDirFormatter.format(windowEndTimeLocal).stripSuffix("/")

// Root of the ADLS Gen2 container that holds every data set used below.
val adls_path = s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net"

// Raw input folders for this run, keyed by the window end date and the run id.
val spSites = s"$adls_path/raw/sites/$yearMonthDayFormatEnd/$runId/"
val spPermissions = s"$adls_path/raw/permissions/$yearMonthDayFormatEnd/$runId/"

// "Latest" folders: a single location per data set, independent of date and run.
val latestSPSitesEnhanced = s"$adls_path/latest/sites/"
val latestSPPermissions = s"$adls_path/latest/permissions/"

// Archive folders for this run, keyed the same way as the raw folders.
val sitesArchive = s"$adls_path/archive/sites/$yearMonthDayFormatEnd/$runId/"
val permsArchive = s"$adls_path/archive/permissions/$yearMonthDayFormatEnd/$runId/"

// Ask the Hadoop output committer not to create _SUCCESS marker files.
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

StatementMeta(sparkpoolag, 25, 56, Finished, Available)

import java.text.SimpleDateFormat
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
standardDatePattern: String = yyyy-MM-dd'T'HH:mm:ss'Z'
windowStartDateTimeLocal: java.time.LocalDateTime = 2023-10-31T00:00
windowEndTimeLocal: java.time.LocalDateTime = 2023-10-31T00:00
timeDirFormatter: java.time.format.DateTimeFormatter = Value(YearOfEra,4,19,EXCEEDS_PAD)'/'Value(MonthOfYear,2)'/'Value(DayOfMonth,2)
yearMonthDayFormat: String = 2023/10/31
yearMonthDayFormatEnd: String = 2023/10/31
adls_path: String = abfss://sites-permissions-dashbaord@mgdcag.dfs.core.windows.net
spSites: String = abfss://sites-permissions-dashbaord@mgdcag.dfs.core.windows.net/sites/2023/10/31/93a19c30-e8b2-4af4-883b-8752fead65e4/
spPermissions: String = abfss://sites-permissions-dashbaord@mgdcag.dfs.core.windows.net/permissions/2023/10/31/93a19c30-e8b2-4af4-883b-8

## Read the datasets into DFs (Data Frames)
These are the files created by the MGDC copy tool

In [35]:
// Load this run's raw permissions export as JSON. Recursive file lookup is
// explicitly disabled for the run folder.
val permissionsDF =
  spark.read
    .option("recursiveFileLookup", "false")
    .json(spPermissions)

StatementMeta(sparkpoolag, 25, 36, Finished, Available)

permissionsDF: org.apache.spark.sql.DataFrame = [FileExtension: string, ItemType: string ... 17 more fields]


In [36]:
// Load this run's raw sites export as JSON. Recursive file lookup is
// explicitly disabled for the run folder.
val sitesDF =
  spark.read
    .option("recursiveFileLookup", "false")
    .json(spSites)

StatementMeta(sparkpoolag, 25, 37, Finished, Available)

sitesDF: org.apache.spark.sql.DataFrame = [BlockAccessFromUnmanagedDevices: boolean, BlockDownloadOfAllFilesOnUnmanagedDevices: boolean ... 27 more fields]


In [49]:
// Row counts of the two raw data sets; each count triggers a Spark job.
val sitesCount = sitesDF.count()
println("The number of sites: " + sitesCount)

val permissionsCount = permissionsDF.count()
println("The number of permissions objects: " + permissionsCount)

StatementMeta(sparkpoolag, 25, 50, Finished, Available)

The number of sites: 235
The number of permissions objects: 1153
sitesCount: Long = 235
permissionsCount: Long = 1153


# Enrich the Data

Pretty sure this is called feature engineering 

## Add columns
### Using UDFs (User-Defined Functions):
You can define custom UDFs and use them to create new columns based on your specific logic. 

We will use this to add a boolean column for OneDrive sites.

In [51]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// UDF: true when the site URL contains "-my.sharepoint.com" (the marker used
// here to recognise OneDrive sites), false otherwise.
// Spark passes a null String to the function when Url is missing, so the value
// is wrapped in Option: a null URL yields false instead of a NullPointerException.
val isOneDrive = udf((siteUrl: String) => Option(siteUrl).exists(_.contains("-my.sharepoint.com")))

// Sites data set with the extra boolean OneDriveSite column.
val sitesDFOD = sitesDF.withColumn("OneDriveSite", isOneDrive($"Url"))

StatementMeta(sparkpoolag, 25, 52, Finished, Available)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
isOneDrive: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$6708/25582520@1f8e042b,BooleanType,List(Some(class[value[0]: string])),Some(class[value[0]: boolean]),None,false,true)
sitesDFOD: org.apache.spark.sql.DataFrame = [BlockAccessFromUnmanagedDevices: boolean, BlockDownloadOfAllFilesOnUnmanagedDevices: boolean ... 28 more fields]


## For joining permissions

We need a way to join permissions. Unfortunately there is no id, so we need to make a composite id using the fields that are available.

The working theory is that we can use `SiteId + ItemURL + RoleDefinition + LinkId`

In [None]:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame

// Value substituted for LinkId when a permission has no link id (null).
val defaultLinkValue = ""

// UDF that builds the composite key "<SiteId>-<ItemURL>-<RoleDefinition>-<LinkId>".
// Only LinkId is defaulted; a null in any other part is rendered as the text "null".
val createCompositeKey = udf { (siteId: String, itemUrl: String, roleDefinition: String, linkId: String) =>
  val linkPart = Option(linkId).getOrElse(defaultLinkValue)
  Seq(siteId, itemUrl, roleDefinition, linkPart).mkString("-")
}

// Permissions data set with the extra CompositeKey column.
val permissionsDFCK = permissionsDF.withColumn(
  "CompositeKey",
  createCompositeKey($"SiteId", $"ItemURL", $"RoleDefinition", $"LinkId")
)


## Big Value Data Points

This is where the real magic happens. With the data in the DF it's possible to work out previous version storage. What an insight, and we haven't even iterated over every object.

In the MGDC sites data set we can make the following assumption

`PreviousVersionSize = TotalSize - TotalFileStreamSize - MetadataSize`

With the additional data we now have, we can make a far better assumption. We calculate the storage used in drives by summing the size used by all the drives. We could probably even remove the metadata size.

`PreviousVersionSize = storageUsedInDrives - TotalFileStreamSize`

This is just one example of what we can do with just a few extra toppings added to this marvellous MGDC flavoured pizza.

We will use one of the UDFs from the start

In [52]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// UDF estimating the storage taken by previous file versions:
//   PreviousVersionSize = TotalSize - TotalFileStreamSize - MetadataSize
// Each input is wrapped in Option so that a missing (null) metric produces a
// null result for that row instead of a NullPointerException inside the UDF.
val previousVersionSize = udf((totalSize: BigInt, totalFileStreamSize: BigInt, metadataSize: BigInt) =>
  for {
    total      <- Option(totalSize)
    fileStream <- Option(totalFileStreamSize)
    metadata   <- Option(metadataSize)
  } yield total - fileStream - metadata
)

// Sites data set with the extra PreviousVersionSize column, computed from the
// nested StorageMetrics fields.
val sitesDFODPV = sitesDFOD
  .withColumn(
    "PreviousVersionSize",
    previousVersionSize(
      $"StorageMetrics.TotalSize",
      $"StorageMetrics.TotalFileStreamSize",
      $"StorageMetrics.MetadataSize"
    )
  )

// Preview of the derived columns; truncate = false prints the full cell values.
val pvColoumns: DataFrame = sitesDFODPV.select("Id", "OneDriveSite", "PreviousVersionSize")
pvColoumns.show(20, truncate = false)

StatementMeta(sparkpoolag, 25, 53, Finished, Available)

+------------------------------------+------------+-------------------+
|Id                                  |OneDriveSite|PreviousVersionSize|
+------------------------------------+------------+-------------------+
|5b925130-3421-4b81-81cb-2b905b924ff3|false       |1599325            |
|85b545cf-cde7-4ab8-8dec-abe4cb6fa377|false       |1522786            |
|757b7c0a-7d92-4184-8c36-99fcf003bb51|false       |2124450            |
|2be31aed-85d3-47f6-bc02-869575aea623|true        |1805365            |
|a207386f-65da-47a7-ae97-0594fe1a4cae|false       |667519             |
|1949f735-9790-410b-b6ad-a8f41475da3a|false       |5822548            |
|62727fba-be2d-4ab6-903d-b79171ec9fa7|false       |1779858            |
|93c3c5f1-c973-4a67-83b3-d37a741613d2|false       |1391780            |
|c1c2d2f9-bf17-43d9-9a68-04dbd7fa8826|false       |1483342            |
|8594a5ae-161c-4c5a-9dd1-b0d7b76c03ae|false       |2124607            |
|8e1b2ba2-9df2-466b-bc8a-ca12bec60fad|false       |1477505      

## Full Pull or Delta?

Do we need to merge the new data with the existing latest?

If we are performing a full pull then we do not. If this is a delta pull then we do.

We will pull in the previous latest dataset (which will be a complete picture as of the last scan)

```scala
// Specify the join condition based on the common key(s)
val joinCondition = Seq("commonKeyColumn")

// Perform a left-join to keep all rows from existingDF and overwrite with values from deltaDF where commonKeyColumn matches
val mergedDF = existingDF.join(deltaDF, joinCondition, "left_outer")
  .select(existingDF.columns.map(colName => coalesce(deltaDF(colName), existingDF(colName)).alias(colName)): _*)

// Now, mergedDF contains the updated values from deltaDF where available, and the original values where not updated.

```


## Perms

In [None]:
/**
 * Builds the complete permissions data set for this run.
 *
 * Full pull (`fullPull` is true): the current pull (`permissionsDFCK`) is returned as is.
 *
 * Delta pull: the previous snapshot stored under `latestSPPermissions` is merged with
 * the current pull on `CompositeKey`. A full outer join is used so that permissions
 * that only exist in the delta (newly created ones) are kept as well; a left join
 * from the snapshot would silently drop them. For every column of the snapshot the
 * delta value wins when it is non-null, otherwise the snapshot value is kept.
 *
 * The result has exactly the columns of the existing snapshot, so the delta must
 * contain every one of those columns.
 * NOTE(review): because of the coalesce, a column that changed to null in the delta
 * keeps its old snapshot value - confirm this is acceptable.
 *
 * @return the full permissions DataFrame for this run
 */
def createFullPermissionsDataFrame(): DataFrame =
  if (fullPull) {
    println("Full pull, no need to merge deltas")
    permissionsDFCK
  } else {
    println("Delta pull, we must merge")

    // The existing data set is the previous "latest" snapshot.
    val existingPermsDF =
      spark
        .read
        .format("json")
        .option("recursiveFileLookup", "false")
        .load(latestSPPermissions)
        .as("existing")

    // The delta is the data frame produced from the current pull.
    val deltaPermsDF = permissionsDFCK.as("delta")

    // Prefer the delta value for each column, falling back to the snapshot value.
    // Aliased, back-quoted references keep the two sides unambiguous after the join.
    val mergedColumns = existingPermsDF.columns.map { name =>
      coalesce(col(s"delta.`$name`"), col(s"existing.`$name`")).alias(name)
    }

    existingPermsDF
      .join(deltaPermsDF, col("existing.CompositeKey") === col("delta.CompositeKey"), "full_outer")
      .select(mergedColumns: _*)
  }

In [None]:
/**
 * Builds the complete sites data set for this run.
 *
 * Full pull (`fullPull` is true): the enriched current pull (`sitesDFODPV`) is
 * returned as is.
 *
 * Delta pull: the previous snapshot stored under `latestSPSitesEnhanced` is merged
 * with the current pull on the site `Id`. A full outer join is used so that sites
 * that only exist in the delta (newly created ones) are kept as well; a left join
 * from the snapshot would silently drop them. For every column of the snapshot the
 * delta value wins when it is non-null, otherwise the snapshot value is kept.
 *
 * The result has exactly the columns of the existing snapshot, so the delta must
 * contain every one of those columns.
 * NOTE(review): because of the coalesce, a column that changed to null in the delta
 * keeps its old snapshot value - confirm this is acceptable.
 *
 * @return the full sites DataFrame for this run
 */
def createFullSitesDataFrame(): DataFrame =
  if (fullPull) {
    println("Full pull, no need to merge deltas")
    sitesDFODPV
  } else {
    println("Delta pull, we must merge")

    // The existing data set is the previous "latest" snapshot.
    val existingSitesDF =
      spark
        .read
        .format("json")
        .option("recursiveFileLookup", "false")
        .load(latestSPSitesEnhanced)
        .as("existing")

    // The delta is the data frame produced from the current pull.
    val deltaSitesDF = sitesDFODPV.as("delta")

    // Prefer the delta value for each column, falling back to the snapshot value.
    // Aliased, back-quoted references keep the two sides unambiguous after the join.
    val mergedColumns = existingSitesDF.columns.map { name =>
      coalesce(col(s"delta.`$name`"), col(s"existing.`$name`")).alias(name)
    }

    existingSitesDF
      .join(deltaSitesDF, col("existing.Id") === col("delta.Id"), "full_outer")
      .select(mergedColumns: _*)
  }

## Drop deletes

We need to drop any delete operations. Or they can be kept to see number of items deleted. Could be useful?

In [None]:
// Build the final sites data set (merged with the previous snapshot on a delta
// pull) and report its size.
val fullSitesDF: DataFrame = createFullSitesDataFrame()
val fullSitesCount = fullSitesDF.count()
println("The number of sites: " + fullSitesCount)

// Same for permissions.
val fullPermsDF: DataFrame = createFullPermissionsDataFrame()
val fullPermsCount = fullPermsDF.count()
println("The number of perms: " + fullPermsCount)

## Write back to blob storage

We need to write our new dataset back to the blobs. We will drop it in the latest folder, which will make the Power BI end easier.

We also need to write our blob to the archive to retain it, as the latest folder is overwritten on every new run.

In [56]:
// Writes a data frame as JSON from a single partition, replacing whatever is
// already stored at the target path.
def writeSingleJson(df: DataFrame, targetPath: String): Unit =
  df.repartition(1)
    .write
    .mode("overwrite")
    .json(targetPath)

// NOTE(review): on a delta pull the full data frames are built from a lazy read of
// these same "latest" folders, so they are overwritten while still being read from.
// Confirm this works on the target Spark version, or stage the output elsewhere first.
writeSingleJson(fullSitesDF, latestSPSitesEnhanced)
writeSingleJson(fullPermsDF, latestSPPermissions)

// Optionally keep a per-run copy, since the latest folders are replaced on every run.
if (retainForHistoricTrending) {
  writeSingleJson(fullSitesDF, sitesArchive)
  writeSingleJson(fullPermsDF, permsArchive)
}

StatementMeta(sparkpoolag, 25, 57, Finished, Available)