In [74]:
%%spark
println("Application Id: " + spark.sparkContext.applicationId )
println("Application Name: " + spark.sparkContext.appName)

StatementMeta(sparkpoolag, 1, 37, Finished, Available)

Application Id: application_1693565264111_0002
Application Name: scala-pg_sparkpoolag_1693571957


In [75]:
val storageAccountName = "mgdcag" // replace with your storage account name
val storageContainerName = "sites-pg" // replace with your container name

// Storage path
val adls_path = f"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net"

// Sites path
val latestSitesPath = adls_path + s"/latest/sites/"


spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
// If MSI access has not been granted for the Synapse workspace to the storage account, you might need the commands below to read the credentials and set the Spark conf
//spark.conf.set(s"fs.azure.account.key.${storageAccountName}.blob.core.windows.net",mssparkutils.credentials.getConnectionStringOrCreds("synapseworkspacename-WorkspaceDefaultStorage"))
//spark.conf.set(s"fs.azure.account.key.${storageAccountName}.blob.core.windows.net",mssparkutils.credentials.getConnectionStringOrCreds("LS_ADLSGen2"))


StatementMeta(sparkpoolag, 1, 38, Finished, Available)

storageAccountName: String = mgdcag
storageContainerName: String = sites-pg
adls_path: String = abfss://sites-pg@mgdcag.dfs.core.windows.net
latestSitesPath: String = abfss://sites-pg@mgdcag.dfs.core.windows.net/latest/sites/


## Blob Access details

https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/microsoft-spark-utilities?pivots=programming-language-scala#configure-access-to-azure-blob-storage

## Read the sites dataset
These are the files created by the MGDC copy tool.

In [76]:
val sitesDF =
    spark
      .read
      .format("json")
      .option("recursiveFileLookup", "false")
      .load(latestSitesPath)

StatementMeta(sparkpoolag, 1, 39, Finished, Available)

sitesDF: org.apache.spark.sql.DataFrame = [BlockAccessFromUnmanagedDevices: boolean, BlockDownloadOfAllFilesOnUnmanagedDevices: boolean ... 27 more fields]


## Check we have the sites

In [77]:
// This is the site Id of a known site in the tenant. Please update

display(sitesDF.filter("Id == 'd2166072-e770-418c-8486-d121f14ce21d'"))

StatementMeta(sparkpoolag, 1, 40, Finished, Available)

SynapseWidget(Synapse.DataFrame, f8049b66-6e97-4a9e-9262-5b1f0731075c)




# Enrich the Data
This is where we add value as SharePoint CSAs. The PG have provided the pizza base; now we need to add the toppings.

## Add columns
### Using UDFs (User-Defined Functions):
You can define custom UDFs and use them to create new columns based on your specific logic. Here's an example:

```scala
import org.apache.spark.sql.functions._

// Define a custom UDF. Spark infers the return type (String here) from the
// Scala function, so no explicit DataType argument is needed.
val myUDF = udf((arg1: String, arg2: Int) => {
  // Your custom logic here; the last expression is the value for the new column
  s"$arg1-$arg2" // placeholder logic
})

// Assuming you have a DataFrame called "df"
val dfWithNewColumn = df.withColumn("newColumnName", myUDF($"existingColumn1", $"existingColumn2"))
```

We will use this to add a boolean column for OneDrive sites.

In [78]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Returns true if the site is a OneDrive site
// Uses a typed Scala lambda only: passing a DataType as a second argument, as some examples do, gave me Scala errors
val isOneDrive = udf((siteUrl: String) => siteUrl.contains("-my.sharepoint.com"))

// Add the flag to the sites DataFrame
val sitesDFOD = sitesDF.withColumn("OneDriveSite", isOneDrive($"Url"))

StatementMeta(sparkpoolag, 1, 41, Finished, Available)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
isOneDrive: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($$$a5cddfc4633c5dd8aa603ddc4f9aad5$$$$w$$Lambda$7052/542511872@14df77eb,BooleanType,List(Some(class[value[0]: string])),Some(class[value[0]: boolean]),None,false,true)
sitesDFOD: org.apache.spark.sql.DataFrame = [BlockAccessFromUnmanagedDevices: boolean, BlockDownloadOfAllFilesOnUnmanagedDevices: boolean ... 28 more fields]
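
One caveat with the UDF above: if `Url` is ever null for a row, `siteUrl.contains` would throw a `NullPointerException` inside the UDF. Below is a minimal sketch of a null-safe version of the same check, written as a plain function so it can be tried without Spark. The name `isOneDriveUrl` and the contoso URL are illustrative only and not part of the dataset.

```scala
// Hypothetical helper: a null-safe version of the check used in isOneDrive.
// Option(null) is None, so a missing Url is treated as "not OneDrive".
def isOneDriveUrl(siteUrl: String): Boolean =
  Option(siteUrl).exists(_.contains("-my.sharepoint.com"))

// Try it on a made-up OneDrive URL, a regular site URL, and a null
println(isOneDriveUrl("https://contoso-my.sharepoint.com/personal/user_contoso_com"))
println(isOneDriveUrl("https://groverale.sharepoint.com/sites/CarTest2"))
println(isOneDriveUrl(null))
```

If this behaviour is what you want, the function can be wrapped in the same way as before, for example `udf(isOneDriveUrl _)`.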


## Check our site
Should be true for OneDrive sites and false for SPO sites.

In [79]:
// This is the site Id of a known site in the tenant. Please update

display(sitesDFOD.filter("Id == 'd2166072-e770-418c-8486-d121f14ce21d'"))

StatementMeta(sparkpoolag, 1, 42, Finished, Available)

SynapseWidget(Synapse.DataFrame, f447240a-7013-46c4-a510-d0faa9a36b18)




We can print the first 20 rows using `show`:

In [80]:
import org.apache.spark.sql.DataFrame

val oneDriveColoumn: DataFrame = sitesDFOD.select("OneDriveSite", "Url")
// use truncate = false to see full URLs
oneDriveColoumn.show(20, truncate = false)

StatementMeta(sparkpoolag, 1, 43, Finished, Available)

+------------+----------------------------------------------------------------------------+
|OneDriveSite|Url                                                                         |
+------------+----------------------------------------------------------------------------+
|false       |https://groverale.sharepoint.com/sites/CarTest2                             |
|false       |https://groverale.sharepoint.com/sites/dev-countries3                       |
|false       |https://groverale.sharepoint.com/sites/collaboration                        |
|false       |https://groverale.sharepoint.com/sites/archivedev                           |
|false       |https://groverale.sharepoint.com/sites/migtest10                            |
|false       |https://groverale.sharepoint.com/sites/AdoptionTeam                         |
|false       |https://groverale.sharepoint.com/sites/AchiveHub                            |
|false       |https://groverale.sharepoint.com/sites/testcoms                   

## Enrich DF with API data

We want to call an API and then append data to the DF based on the response. A high-level example is below.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.json4s._
import org.json4s.jackson.JsonMethods._

// json4s needs implicit formats in scope to extract typed values
implicit val formats: Formats = DefaultFormats

// Step 1: Make the API request and obtain JSON data (simulated)
val apiResponseJson = """{"data1": "value1", "data2": "value2"}"""

// Step 2: Parse the JSON data
val parsedJson = parse(apiResponseJson)

// Assuming you have a DataFrame called "df"
// Step 3: Add the parsed values as literal columns (the same value on every row)
// lit() needs a plain Scala value, so extract a String from each JValue first
val dfWithNewColumns: DataFrame = df
  .withColumn("data1_from_API", lit((parsedJson \ "data1").extract[String]))
  .withColumn("data2_from_API", lit((parsedJson \ "data2").extract[String]))

dfWithNewColumns.show()
```

## Call API from Scala
The function below can be used to make API calls and return the response as a string.

In [81]:
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

// Performs an HTTP GET and returns the response body as a string.
// The response and the client are closed afterwards so connections are not leaked.
def makeAPICall(apiUrl: String): String = {
  val httpClient = HttpClients.createDefault()
  try {
    val response = httpClient.execute(new HttpGet(apiUrl))
    try EntityUtils.toString(response.getEntity)
    finally response.close()
  } finally httpClient.close()
}



StatementMeta(sparkpoolag, 1, 44, Finished, Cancelled)

We will first call the last activity API to get details around file and site activity 

In [None]:
// This is an Azure function that is hooked up to my tenant. Call it if you want :)
val apiUrl = "https://site-function-ag.azurewebsites.net/api/LastUserActivity?"
val apiResponseJson = makeAPICall(apiUrl)

// Now you can parse apiResponseJson and process it as needed


StatementMeta(, , , Cancelled, )

You can create a DataFrame from a JSON string by passing an RDD (Resilient Distributed Dataset) of strings to `spark.read.json`.

With this DF we can select the columns we need, drop the rest, and join with our main dataset.
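
The `spark.read.json(RDD[String])` overload has been deprecated since Spark 2.2 in favour of the `Dataset[String]` overload. Below is a minimal sketch of the same idea using a `Dataset[String]`. The JSON string and its values are made up for illustration (only the field names match what this notebook selects), and `getOrCreate()` should simply reuse the notebook's session when one already exists.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the running session in a notebook; otherwise starts a local one
val session = SparkSession.builder().appName("json-string-to-df").master("local[*]").getOrCreate()
import session.implicits._

// Made-up response using the field names selected in this notebook
val sampleJson =
  """{"SiteId": "site-1", "lastActivityDate": "2023-08-18T00:00:00", "activeFileCount": 3, "pageViewCount": 10}"""

// Seq(...).toDS() gives a Dataset[String], which spark.read.json accepts directly
val sampleDF = session.read.json(Seq(sampleJson).toDS())
  .withColumnRenamed("SiteId", "Id")
  .select("Id", "lastActivityDate", "activeFileCount", "pageViewCount")

sampleDF.show(truncate = false)
```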

In [None]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Step 1: Above

// Step 2: Parse the JSON data
//val parsedJson = parse(apiResponseJson)

val jsonRDD = spark.sparkContext.parallelize(Seq(apiResponseJson))

// Load JSON data into a DataFrame without specifying the schema
val jsonDF = spark.read.json(jsonRDD)
    .withColumnRenamed("SiteId", "Id") // Rename "SiteId" to "Id", the column we join on
    .select("Id", "lastActivityDate", "activeFileCount", "pageViewCount")

// Join with existing dataset
val sitesDFODLA = sitesDFOD.join(jsonDF, "Id")


//sitesDFODLA.show()
display(sitesDFODLA.filter("Id == '9b88c7ff-6b3f-4df0-9f64-ec6ec52bbb54'"))


StatementMeta(, , , Cancelled, )

## I can datascience

The next step is to call the API for each item in the DF.

To call an API endpoint for each item in a DataFrame and create new columns based on the responses, you can use a combination of Spark's DataFrame transformations and User-Defined Functions (UDFs). Here's a high-level approach to achieve this:

1. Define a UDF that makes the API call, processes the response, and returns the desired result.

2. Apply the UDF to your DataFrame to create the new columns based on the API responses.

Here's a step-by-step guide:

Assuming you have a DataFrame named `df` and you want to call an API for each row and then create new columns based on the API responses:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

// Placeholder: replace with your own response processing logic
def processApiResponse(apiResponse: String): String = apiResponse

// Define the UDF to make API calls and process responses
val callApiAndProcessResponse = udf((inputColumn1: String, inputColumn2: String) => {
  val apiUrl = s"https://api.example.com/resource?param1=$inputColumn1&param2=$inputColumn2"
  val apiResponse = makeAPICall(apiUrl) // the helper defined earlier in this notebook
  processApiResponse(apiResponse) // the UDF returns this value for the new column
})

// Apply the UDF once per new column; withColumn takes exactly one name and one Column
val dfWithApiResponses: DataFrame = df
  .withColumn("NewColumn1", callApiAndProcessResponse(col("Column1"), col("Column2")))
  .withColumn("NewColumn2", callApiAndProcessResponse(col("Column3"), col("Column4")))
  // Add more columns and corresponding API calls as needed

dfWithApiResponses.show()
```

Whilst this uses similar methods to earlier, we don't want to call the API once for each new column. We want to make one API call per site.

## A new method

Extract a list of Id values from your DataFrame and then iterate through that list to perform actions for each Id. Here's a general outline of how you can do this:

1. Extract a list of Id values from your DataFrame.

2. Iterate through the list of Id values.

3. For each Id, perform the desired actions.

```scala
import org.apache.spark.sql.functions._

// Assuming you have a DataFrame named "df" with a column "Id"
val idList: Array[String] = df.select("Id").distinct().collect().map(row => row.getString(0))

// Iterate through the list of Id values
for (id <- idList) {
  // Perform actions for each Id
  println(s"Processing Id: $id")

  // You can call your API or perform other actions here
}
```
This may not be the most efficient approach, but for our use case it makes sense. Speed is not a concern at this point.
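
Because each Id triggers a network call, a single failed request would otherwise stop the whole loop. Below is a minimal sketch of wrapping each call in `scala.util.Try` so that failures are recorded and the remaining Ids are still processed. `callApi` is a hypothetical stub standing in for `makeAPICall`, and the Ids are made up.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stub standing in for makeAPICall; it fails for one Id to show the behaviour
def callApi(id: String): String =
  if (id == "bad-id") throw new RuntimeException(s"request failed for $id")
  else s"""{"siteId": "$id"}"""

val sampleIds = Seq("id-1", "bad-id", "id-2")

// Attempt every call, keeping each outcome alongside its Id
val attempts: Seq[(String, Try[String])] = sampleIds.map(id => id -> Try(callApi(id)))

// Split the outcomes into successful response bodies and the Ids that failed
val succeeded: Seq[String] = attempts.collect { case (_, Success(body)) => body }
val failedIds: Seq[String] = attempts.collect { case (id, Failure(_)) => id }

println(s"Succeeded: ${succeeded.size}, failed: ${failedIds.mkString(", ")}")
```

The failed Ids can then be retried or logged, depending on what suits the run.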



In [None]:
import org.apache.spark.sql.functions._

// Collect the distinct site Ids from the joined DataFrame to the driver
val idList: Array[String] = sitesDFODLA.select("Id").distinct().collect().map(row => row.getString(0))


// Take the first item from the list; its response is used to infer the schema
val firstItem = idList.head
//val remainingItems = idList.tail
val remainingItems = idList.tail.take(5) // using 5 as we are in dev - don't want to call the API hundreds of times

// This is another Azure function that is hooked up to my tenant. Call it if you want :)
val baseApiUrl = "https://site-function-ag.azurewebsites.net/api/GetAdditionalSiteInfo"
// Use the makeAPICall function defined above
val firstApiResponse = makeAPICall(s"$baseApiUrl?siteId=$firstItem")

// Create a DataFrame based on the schema of the first item - not using this, as we want the schema to be dynamic so the source can be updated
//val schema = StructType(firstApiResponse.keys.map(fieldName => StructField(fieldName, StringType, nullable = false)).toSeq)
//var apiResponseDF = spark.createDataFrame(Seq(Row.fromSeq(firstApiResponse.values.map(_.toString).toSeq)), schema)

println(firstApiResponse)


StatementMeta(, , , Cancelled, )
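
The call above builds the query string by interpolating the Id straight into the URL. That is fine for GUIDs, but here is a sketch of a small helper that URL-encodes parameter values, in case other kinds of values are passed later. `buildUrl` is a hypothetical name and is not part of this notebook.

```scala
import java.net.URLEncoder

// Hypothetical helper: appends URL-encoded query parameters to a base URL
def buildUrl(baseUrl: String, params: Seq[(String, String)]): String = {
  def enc(value: String): String = URLEncoder.encode(value, "UTF-8")
  val query = params.map { case (k, v) => s"${enc(k)}=${enc(v)}" }.mkString("&")
  if (query.isEmpty) baseUrl else s"$baseUrl?$query"
}

// GUIDs contain only characters that URLEncoder leaves untouched
val exampleUrl = buildUrl(
  "https://site-function-ag.azurewebsites.net/api/GetAdditionalSiteInfo",
  Seq("siteId" -> "533fbbba-b95b-4d56-9fe8-da688e047709")
)
println(exampleUrl)
```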

In [82]:
// Create an RDD from the data (Same as earlier)
val rdd = spark.sparkContext.parallelize(Seq(firstApiResponse))

// Create a DataFrame without specifying a schema (schema will be inferred)
// creating as var (mutable) so that you can update it within the loop.
var apiResponseDF = spark.read.json(rdd)

// Iterate through the list of Id values
for (id <- remainingItems) {
    // Perform actions for each Id
    println(s"Processing Id: $id")

    // You can call your API or perform other actions here
    val apiResponseJson = makeAPICall(s"$baseApiUrl?siteId=$id")

    // Create a new row with ApiResponse and append it to the DataFrame
    val newRowRDD = spark.sparkContext.parallelize(Seq(apiResponseJson))
    // Create a DataFrame from the new row
    val newRowDF = spark.read.json(newRowRDD)
    apiResponseDF = apiResponseDF.union(newRowDF)

}

// Show the DataFrame with API responses
// apiResponseDF.show()


StatementMeta(sparkpoolag, 1, 45, Finished, Available)

Processing Id: 533fbbba-b95b-4d56-9fe8-da688e047709
Processing Id: d1a1ec05-7528-4768-af43-9b9593e7470f
Processing Id: 55c23d0d-382d-41a6-92cd-3483a8415cb2
Processing Id: 81beb81b-c4e3-4451-bd4c-0fbf8ab02e21
Processing Id: 77ff9639-8e5a-4c7a-ab29-07f8fa646bd8
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[243] at parallelize at <console>:180
apiResponseDF: org.apache.spark.sql.DataFrame = [numberOfDrives: bigint, numberOfItemsInLists: bigint ... 5 more fields]
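
Growing a DataFrame with `union` inside a loop works for a handful of sites, but `union` matches columns by position and each response has its schema inferred separately, so a response with a missing or null field could misalign or fail. Below is a sketch of an alternative, assuming the responses fit comfortably in driver memory: collect the JSON strings first, then parse them in a single `read.json` call. `fetchSiteInfo` is a hypothetical stand-in for `makeAPICall(s"$baseApiUrl?siteId=$id")` and returns made-up values.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the running session in a notebook; otherwise starts a local one
val session = SparkSession.builder().appName("batch-json").master("local[*]").getOrCreate()
import session.implicits._

// Hypothetical stand-in for the real API call; returns made-up JSON
def fetchSiteInfo(id: String): String =
  s"""{"siteId": "$id", "numberOfDrives": 1}"""

val ids = Seq("id-1", "id-2", "id-3")

// One call per Id on the driver, collected as plain strings
val responses: Seq[String] = ids.map(fetchSiteInfo)

// Parse every response in one pass so the schema is inferred across all of them
val batchedDF = session.read.json(responses.toDS())

batchedDF.show(truncate = false)
```

This also removes the need for a mutable `var` and for treating the first Id separately.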


In [83]:
val siteIdColumn: DataFrame = apiResponseDF.select("siteId")
siteIdColumn.show()

StatementMeta(sparkpoolag, 1, 46, Finished, Available)

+--------------------+
|              siteId|
+--------------------+
|498a8cad-7133-4d5...|
|533fbbba-b95b-4d5...|
|d1a1ec05-7528-476...|
|55c23d0d-382d-41a...|
|81beb81b-c4e3-445...|
|77ff9639-8e5a-4c7...|
+--------------------+

siteIdColumn: org.apache.spark.sql.DataFrame = [siteId: string]


## Join back with the main dataset
We want to have one master dataset, as it will make the Power BI task easier.



In [94]:
val sitesMoreDetails = apiResponseDF
    .withColumnRenamed("SiteId", "Id")

// Join with existing dataset
// val sitesDFODLAMORE = sitesDFODLA.join(sitesMoreDetails, "Id")

// We are going to join backwards as we only have 6 items in debug - Prod would use the above
val sitesDFODLAMORE = sitesMoreDetails.join(sitesDFODLA, "Id")

val moreColoumns: DataFrame = sitesDFODLAMORE.select("Id", "OneDriveSite", "numberOfDrives", "storageUsedInDrives", "lastActivityDate", "StorageMetrics.TotalFileStreamSize")
// truncate = false shows full column values
moreColoumns.show(20, truncate = false)

StatementMeta(sparkpoolag, 1, 57, Finished, Available)

+------------------------------------+------------+--------------+-------------------+-------------------+-------------------+
|Id                                  |OneDriveSite|numberOfDrives|storageUsedInDrives|lastActivityDate   |TotalFileStreamSize|
+------------------------------------+------------+--------------+-------------------+-------------------+-------------------+
|498a8cad-7133-4d55-b283-b0864d61a49a|false       |1             |1465376            |2023-07-31T00:00:00|0                  |
|81beb81b-c4e3-4451-bd4c-0fbf8ab02e21|false       |1             |1998608            |2023-08-18T00:00:00|352031             |
|533fbbba-b95b-4d56-9fe8-da688e047709|false       |4             |69281704           |2018-10-10T00:00:00|3312213            |
|d1a1ec05-7528-4768-af43-9b9593e7470f|false       |2             |4253787586         |null               |2123799473         |
|55c23d0d-382d-41a6-92cd-3483a8415cb2|false       |3             |108583185          |2020-06-17T00:00:00|85328

## Big Value Data Points

This is where the real magic happens. With the data in the DF it's possible to work out previous version storage. What an insight, and we haven't even iterated over every object.

In the MGDC sites dataset we can make the following assumption:

`PreviousVersionSize = TotalSize - TotalFileStreamSize - MetadataSize`

With the additional data we now have, we can make a far better assumption:

`PreviousVersionSize = TotalSize - TotalFileStreamSize - MetadataSize - storageUsedPreservationHold`

In an ideal world we will also have list size and perhaps even pages library size that we can remove. This is just one example of what we can do with just a few extra toppings on this marvellous MGDC-flavoured pizza.
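
To make the refined formula concrete, here is a small worked example as a plain Scala function. The numbers are made up purely to show the arithmetic and are not taken from the tenant; `estimatePreviousVersionSize` is an illustrative name.

```scala
// Refined estimate from the formula above; all inputs must be in the same unit
def estimatePreviousVersionSize(
    totalSize: Long,
    totalFileStreamSize: Long,
    metadataSize: Long,
    storageUsedPreservationHold: Long
): Long =
  totalSize - totalFileStreamSize - metadataSize - storageUsedPreservationHold

// Made-up numbers: 1000 total, 600 current file streams, 100 metadata, 50 on preservation hold
val estimate = estimatePreviousVersionSize(1000L, 600L, 100L, 50L)
println(estimate) // 250
```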

We will use a UDF, as we did at the start.



In [95]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Returns the estimated previous version size
// Typed Scala lambda, as with isOneDrive earlier
val previousVersionSize = udf((totalSize: BigInt, totalFileStreamSize: BigInt, metadataSize: BigInt, storageUsedPreservationHold: BigInt) => 
    totalSize - totalFileStreamSize - metadataSize - storageUsedPreservationHold
)
// Add the PreviousVersionSize column to the joined DataFrame
// TotalSize - TotalFileStreamSize - MetadataSize - storageUsedPreservationHold
val sitesDFODLAMOREPV = sitesDFODLAMORE
    .withColumn("PreviousVersionSize", previousVersionSize($"StorageMetrics.TotalSize", $"StorageMetrics.TotalFileStreamSize", $"StorageMetrics.MetadataSize", $"storageUsedPreservationHold"))

val pvColoumns: DataFrame = sitesDFODLAMOREPV.select("Id", "OneDriveSite", "PreviousVersionSize", "lastActivityDate")
// truncate = false shows full column values
pvColoumns.show(20, truncate = false)

StatementMeta(sparkpoolag, 1, 58, Finished, Available)

+------------------------------------+------------+-------------------+-------------------+
|Id                                  |OneDriveSite|PreviousVersionSize|lastActivityDate   |
+------------------------------------+------------+-------------------+-------------------+
|533fbbba-b95b-4d56-9fe8-da688e047709|false       |13189018           |2018-10-10T00:00:00|
|d1a1ec05-7528-4768-af43-9b9593e7470f|false       |2390944            |null               |
|55c23d0d-382d-41a6-92cd-3483a8415cb2|false       |26523382           |2020-06-17T00:00:00|
|81beb81b-c4e3-4451-bd4c-0fbf8ab02e21|false       |1552909            |2023-08-18T00:00:00|
|498a8cad-7133-4d55-b283-b0864d61a49a|false       |1413165            |2023-07-31T00:00:00|
|77ff9639-8e5a-4c7a-ab29-07f8fa646bd8|false       |1407544            |2023-01-27T00:00:00|
+------------------------------------+------------+-------------------+-------------------+

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
pre

## Write back to blob storage

This can wait until next week