In [13]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from faker import Faker
import datetime

# Data lake account and the stage locations this notebook reads from / writes to.
storage_account = 'stoeahybriddev2'
use_test_env = False

if not use_test_env:
    # Regular layout: every stage lives in its own container.
    stage1np, stage2np, stage2p = (
        'abfss://' + stage + '@' + storage_account + '.dfs.core.windows.net'
        for stage in ('stage1np', 'stage2np', 'stage2p')
    )
else:
    # Test layout: one shared 'test-env' container with a folder per stage.
    stage1np, stage2np, stage2p = (
        'abfss://test-env@' + storage_account + '.dfs.core.windows.net/' + stage
        for stage in ('stage1np', 'stage2np', 'stage2p')
    )

StatementMeta(spark3p1sm, 58, 12, Finished, Available)

# User data

In [14]:
# Schema of the users JSON files: a top-level 'value' array whose elements are
# structs of four string fields (each declared non-nullable).
user_schema = StructType(fields=[
    StructField('value', ArrayType(StructType([
        StructField(field_name, StringType(), False)
        for field_name in ('surname', 'givenName', 'userPrincipalName', 'id')
    ])))
])

StatementMeta(spark3p1sm, 58, 13, Finished, Available)

In [39]:
# Read every users JSON file under stage1np with the explicit schema, then show it.
dfUsersRaw = (
    spark.read
    .format('json')
    .schema(user_schema)
    .load(f'{stage1np}/GraphAPI/Users/*.json')
)
dfUsersRaw.printSchema()

StatementMeta(spark3p1sm, 58, 38, Finished, Available)

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- surname: string (nullable = true)
 |    |    |-- givenName: string (nullable = true)
 |    |    |-- userPrincipalName: string (nullable = true)
 |    |    |-- id: string (nullable = true)

In [40]:
# One row per element of the 'value' array, with the struct fields promoted to columns.
dfUsers = (
    dfUsersRaw
    .select(explode('value').alias('exploded_values'))
    .select("exploded_values.*")
)
print(dfUsers.count())
# remove all unneeded users, keep contoso only:
# drop any principal name containing '#EXT#' or 'admin'
dfUsers = dfUsers.filter(
    ~(col('userPrincipalName').contains('#EXT#') | col('userPrincipalName').contains('admin'))
)
print(dfUsers.count())
display(dfUsers.limit(10))

StatementMeta(spark3p1sm, 58, 39, Finished, Available)

0
0


SynapseWidget(Synapse.DataFrame, 0d80afcb-8df6-4379-bd6b-daae34fbaa8c)

In [17]:
# Re-wrap every user row as a one-element 'value' array of structs,
# i.e. the same nesting that user_schema describes.
dfUsersStructured = dfUsers.select(
    array(struct("surname", "givenName", "userPrincipalName", "id")).alias("value")
)

StatementMeta(spark3p1sm, 58, 16, Finished, Available)

In [18]:
# Write the re-wrapped users as JSON to the temp area, replacing whatever is already there.
users_writer = dfUsersStructured.write.format("org.apache.spark.sql.json").mode("overwrite")
users_writer.save(stage1np + '/temp/Users/')

StatementMeta(spark3p1sm, 58, 17, Finished, Available)

In [19]:
# Read the temp users JSON back with the users schema and flatten it again
# to one row per user.
dfUsersGenLoad = (
    spark.read
    .format('json')
    .schema(user_schema)
    .load(f'{stage1np}/temp/Users/*.json')
)
dfUsersGenLoad.printSchema()
dfUsersGen = (
    dfUsersGenLoad
    .select(explode('value').alias('exploded_values'))
    .select("exploded_values.*")
)
print(dfUsersGen.count())
display(dfUsersGen.limit(10))

StatementMeta(spark3p1sm, 58, 18, Finished, Available)

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- surname: string (nullable = true)
 |    |    |-- givenName: string (nullable = true)
 |    |    |-- userPrincipalName: string (nullable = true)
 |    |    |-- id: string (nullable = true)

77


SynapseWidget(Synapse.DataFrame, 3f0febc1-551b-4f1f-9196-7d97a03ec639)

# M365 data

In [20]:

# Schema of the M365 app user detail JSON files: a top-level 'value' array of
# records, each holding three string fields plus a nested 'details' array of
# per-application string fields.
m365_app_user_details_schema = StructType(fields=[
    StructField('value', ArrayType(StructType(
        [
            StructField(field_name, StringType(), False)
            for field_name in ('reportRefreshDate', 'userPrincipalName', 'lastActivityDate')
        ] + [
            StructField('details', ArrayType(StructType([
                StructField(detail_name, StringType(), False)
                for detail_name in (
                    'reportPeriod',
                    'excel', 'excelWeb',
                    'outlook', 'outlookWeb',
                    'powerPoint', 'powerPointWeb',
                    'teams', 'teamsWeb',
                    'word', 'wordWeb',
                )
            ])))
        ]
    )))
])

StatementMeta(spark3p1sm, 58, 19, Finished, Available)

In [21]:
# Read every M365 app user detail JSON file under stage1np with the explicit schema.
dfM365UserActivityRaw = (
    spark.read
    .format('json')
    .schema(m365_app_user_details_schema)
    .load(f'{stage1np}/GraphAPI/M365_App_User_Detail/*.json')
)

StatementMeta(spark3p1sm, 58, 20, Finished, Available)

In [22]:
# Flatten the raw report: one row per element of the top-level 'value' array.
dfM365UserActivity = dfM365UserActivityRaw.select(explode('value').alias('exploded_values')).select("exploded_values.*")
# Explode the nested 'details' array exactly ONCE and then promote the struct's
# fields to columns. Exploding each field separately (one explode per
# withColumn) multiplies the rows at every step, so a 'details' array with
# k > 1 entries would produce k**11 rows mixing values from different entries.
# The resulting column order is unchanged: the three top-level fields followed
# by the detail fields.
dfM365UserActivity = dfM365UserActivity.withColumn('detail', explode('details')) \
                        .drop('details') \
                        .select('*', 'detail.*') \
                        .drop('detail')
display(dfM365UserActivity.limit(10))

StatementMeta(spark3p1sm, 58, 21, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4c0c5cd8-b848-4eac-bd9b-822541cd7d56)

In [23]:
# Generate synthetic M365 app-usage data: one row per (weekly report date, user)
# for the last `num_weeks` weeks, then randomise the per-application flags.
users = dfUsersGen.select('userPrincipalName').collect()
# Derive the count from the collected rows instead of running a second Spark job.
num_of_users = len(users)
num_weeks = 10

m365schema = StructType(fields=[
    StructField('reportRefreshDate', StringType(), False),
    StructField('userPrincipalName', StringType(), False),
    StructField('lastActivityDate', StringType(), False),
    StructField('reportPeriod', StringType(), False),
    StructField('excel', StringType(), False),
    StructField('excelWeb', StringType(), False),
    StructField('outlook', StringType(), False),
    StructField('outlookWeb', StringType(), False),
    StructField('powerPoint', StringType(), False),
    StructField('powerPointWeb', StringType(), False),
    StructField('teams', StringType(), False),
    StructField('teamsWeb', StringType(), False),
    StructField('word', StringType(), False),
    StructField('wordWeb', StringType(), False)
    ])

today = datetime.datetime.now()

# Collect plain tuples first and build the DataFrame once. Unioning one
# single-row DataFrame per (week, user) creates users x weeks Spark plans and
# a very deep lineage for what is a small local data set.
m365_rows = []
for m in range(num_weeks):
    date = (today - datetime.timedelta(days=7*m)).strftime('%Y-%m-%d')
    for user_row in users:
        # Use the real principal name (previously the literal string 'user'
        # was written into every row, so the data could not be joined to users).
        user = user_row[0]
        # The ten app flags start as 'true' placeholders; they are overwritten below.
        m365_rows.append((date, user, date, '7') + ('true',) * 10)

dfM365Gen = spark.createDataFrame(m365_rows, m365schema)

# Each flag is "true" when an independent uniform draw exceeds the threshold,
# i.e. a LOWER threshold means the app shows up as used MORE often.
m365_true_thresholds = {
    'excel': 0.3,
    'excelWeb': 0.7,
    'outlook': 0.9,
    'outlookWeb': 0.1,
    'powerPoint': 0.7,
    'powerPointWeb': 0.9,
    'teams': 0.1,
    'teamsWeb': 0.3,
    'word': 0.4,
    'wordWeb': 0.7,
}
for app_name, threshold in m365_true_thresholds.items():
    dfM365Gen = dfM365Gen.withColumn(app_name, when(rand() > threshold, "true").otherwise("false"))

display(dfM365Gen.limit(10))

StatementMeta(spark3p1sm, 58, 22, Finished, Available)

SynapseWidget(Synapse.DataFrame, 49336244-d97e-4e29-910a-3453cee61e1c)

In [24]:
# Re-nest each generated row into the report layout: a one-element 'value'
# array whose struct carries a one-element 'details' array of the app fields.
dfM365UserActivityStructured = dfM365Gen.select(
    array(struct(
        "reportRefreshDate",
        "userPrincipalName",
        "lastActivityDate",
        array(struct(
            "reportPeriod",
            "excel", "excelWeb",
            "outlook", "outlookWeb",
            "powerPoint", "powerPointWeb",
            "teams", "teamsWeb",
            "word", "wordWeb",
        )).alias("details"),
    )).alias("value")
)

StatementMeta(spark3p1sm, 58, 23, Finished, Available)

In [25]:
# Write the re-nested M365 data as JSON to the temp area, replacing whatever is already there.
m365_writer = dfM365UserActivityStructured.write.format("org.apache.spark.sql.json").mode("overwrite")
m365_writer.save(stage1np + '/temp/M365/')

StatementMeta(spark3p1sm, 58, 24, Finished, Available)

In [26]:
# Read the generated M365 JSON back from the temp area with the report schema.
dfM365GenLoad = spark.read.format('json').load(f'{stage1np}/temp/M365/*.json', schema=m365_app_user_details_schema)
dfM365GenLoad.printSchema()
# One row per element of the top-level 'value' array.
dfM365Gen = dfM365GenLoad.select(explode('value').alias('exploded_values')).select("exploded_values.*")
# Explode the nested 'details' array exactly ONCE and then promote the struct's
# fields to columns. Exploding each field separately (one explode per
# withColumn) multiplies the rows at every step, so a 'details' array with
# k > 1 entries would produce k**11 rows mixing values from different entries.
# The resulting column order is unchanged: the three top-level fields followed
# by the detail fields.
dfM365Gen = dfM365Gen.withColumn('detail', explode('details')) \
                        .drop('details') \
                        .select('*', 'detail.*') \
                        .drop('detail')
print(dfM365Gen.count())
display(dfM365Gen.limit(10))

StatementMeta(spark3p1sm, 58, 25, Finished, Available)

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- reportRefreshDate: string (nullable = true)
 |    |    |-- userPrincipalName: string (nullable = true)
 |    |    |-- lastActivityDate: string (nullable = true)
 |    |    |-- details: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- reportPeriod: string (nullable = true)
 |    |    |    |    |-- excel: string (nullable = true)
 |    |    |    |    |-- excelWeb: string (nullable = true)
 |    |    |    |    |-- outlook: string (nullable = true)
 |    |    |    |    |-- outlookWeb: string (nullable = true)
 |    |    |    |    |-- powerPoint: string (nullable = true)
 |    |    |    |    |-- powerPointWeb: string (nullable = true)
 |    |    |    |    |-- teams: string (nullable = true)
 |    |    |    |    |-- teamsWeb: string (nullable = true)
 |    |    |    |    |-- word: string (nullable = true)
 |    |    |    |    |-- wordWeb: string (nullable = true)

SynapseWidget(Synapse.DataFrame, 92903d68-6132-44a3-be59-c5762b682042)

# Teams Activity

In [27]:

# Schema of the Teams activity user detail JSON files: a top-level 'value'
# array of records with string identifiers/durations and integer counters.
teams_activity_user_details_schema = StructType(fields=[
    StructField('value', ArrayType(StructType([
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ('reportRefreshDate', StringType()),
            ('reportPeriod', StringType()),
            ('userPrincipalName', StringType()),
            ('privateChatMessageCount', IntegerType()),
            ('teamChatMessageCount', IntegerType()),
            ('meetingsAttendedCount', IntegerType()),
            ('meetingCount', IntegerType()),
            ('audioDuration', StringType()),
            ('videoDuration', StringType()),
        )
    ])))
])

StatementMeta(spark3p1sm, 58, 26, Finished, Available)

In [28]:
# Read every Teams activity user detail JSON file under stage1np with the explicit schema.
dfTeamsUserActivityRaw = (
    spark.read
    .format('json')
    .schema(teams_activity_user_details_schema)
    .load(f'{stage1np}/GraphAPI/Teams_Activity_User_Detail/*.json')
)

StatementMeta(spark3p1sm, 58, 27, Finished, Available)

In [29]:
# One row per element of the 'value' array, with the struct fields promoted to columns.
dfTeamsUserActivity = (
    dfTeamsUserActivityRaw
    .select(explode('value').alias('exploded_values'))
    .select("exploded_values.*")
)
display(dfTeamsUserActivity.limit(10))

StatementMeta(spark3p1sm, 58, 28, Finished, Available)

SynapseWidget(Synapse.DataFrame, a5148b3c-7c29-49e7-984f-f537ed23fea7)

In [30]:
# Generate synthetic Teams activity data: one row per (weekly report date, user)
# for the last `num_weeks` weeks, then randomise some of the counters.
users = dfUsersGen.select('userPrincipalName').collect()
# Derive the count from the collected rows instead of running a second Spark job.
num_of_users = len(users)
num_weeks = 10

teamsSchema = StructType(fields=[
    StructField('reportRefreshDate', StringType(), False),
    StructField('reportPeriod', StringType(), False),
    StructField('userPrincipalName', StringType(), False),
    StructField('privateChatMessageCount', IntegerType(), False),
    StructField('teamChatMessageCount', IntegerType(), False),
    StructField('meetingsAttendedCount', IntegerType(), False),
    StructField('meetingCount', IntegerType(), False),
    StructField('audioDuration', StringType(), False),
    StructField('videoDuration', StringType(), False)
    ])

today = datetime.datetime.now()

# Collect plain tuples first and build the DataFrame once. Unioning one
# single-row DataFrame per (week, user) creates users x weeks Spark plans and
# a very deep lineage for what is a small local data set.
teams_rows = []
for m in range(num_weeks):
    date = (today - datetime.timedelta(days=7*m)).strftime('%Y-%m-%d')
    for user_row in users:
        # Use the real principal name (previously the literal string 'user'
        # was written into every row, so the data could not be joined to users).
        user = user_row[0]
        teams_rows.append((date, '7', user, 0, 0, 0, 0, "PT0S", "PT0S"))

dfTeamsGen = spark.createDataFrame(teams_rows, teamsSchema)

# Randomise three of the counters with ceil(rand()*10).
# NOTE(review): teamChatMessageCount stays 0 and audioDuration/videoDuration
# stay "PT0S" - confirm whether those should be randomised as well.
dfTeamsGen = dfTeamsGen.withColumn('privateChatMessageCount', ceil(rand()*10)) \
                        .withColumn('meetingsAttendedCount', ceil(rand()*10)) \
                        .withColumn('meetingCount', ceil(rand()*10))
display(dfTeamsGen.limit(10))

StatementMeta(spark3p1sm, 58, 29, Finished, Available)

SynapseWidget(Synapse.DataFrame, bd35c9e2-4f0f-4c11-9ab0-878f9e79f378)

In [31]:
# Re-nest each generated row as a one-element 'value' array of structs,
# i.e. the same nesting that teams_activity_user_details_schema describes.
dfTeamsActivityStructured = dfTeamsGen.select(
    array(struct(
        "reportRefreshDate",
        "reportPeriod",
        "userPrincipalName",
        "privateChatMessageCount",
        "teamChatMessageCount",
        "meetingsAttendedCount",
        "meetingCount",
        "audioDuration",
        "videoDuration",
    )).alias("value")
)

StatementMeta(spark3p1sm, 58, 30, Finished, Available)

In [32]:
# Write the re-nested Teams data as JSON to the temp area, replacing whatever is already there.
teams_writer = dfTeamsActivityStructured.write.format("org.apache.spark.sql.json").mode("overwrite")
teams_writer.save(stage1np + '/temp/Teams/')

StatementMeta(spark3p1sm, 58, 31, Finished, Available)

In [33]:
# Read the generated Teams JSON back from the temp area and flatten it
# to one row per activity record.
dfTeamsGenLoad = (
    spark.read
    .format('json')
    .schema(teams_activity_user_details_schema)
    .load(f'{stage1np}/temp/Teams/*.json')
)
dfTeamsGenLoad.printSchema()
dfTeamsGen = (
    dfTeamsGenLoad
    .select(explode('value').alias('exploded_values'))
    .select("exploded_values.*")
)
print(dfTeamsGen.count())
display(dfTeamsGen.limit(10))

StatementMeta(spark3p1sm, 58, 32, Finished, Available)

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- reportRefreshDate: string (nullable = true)
 |    |    |-- reportPeriod: string (nullable = true)
 |    |    |-- userPrincipalName: string (nullable = true)
 |    |    |-- privateChatMessageCount: integer (nullable = true)
 |    |    |-- teamChatMessageCount: integer (nullable = true)
 |    |    |-- meetingsAttendedCount: integer (nullable = true)
 |    |    |-- meetingCount: integer (nullable = true)
 |    |    |-- audioDuration: string (nullable = true)
 |    |    |-- videoDuration: string (nullable = true)

770


SynapseWidget(Synapse.DataFrame, c208f65d-3e57-4c1a-bb43-58e48adfb39d)

# Move data to production

In [41]:
# Overwrite the GraphAPI folders under stage1np with the frames that were
# re-loaded from the temp area (still in their nested 'value' layout).
for generated_frame, target_folder in (
    (dfUsersGenLoad, '/GraphAPI/Users/'),
    (dfM365GenLoad, '/GraphAPI/M365_App_User_Detail/'),
    (dfTeamsGenLoad, '/GraphAPI/Teams_Activity_User_Detail/'),
):
    generated_frame.write.format("org.apache.spark.sql.json").mode("overwrite").save(stage1np + target_folder)

StatementMeta(spark3p1sm, 58, 40, Finished, Available)