Parameters

In [60]:
# Analysis window as yyyy-mm-dd strings; both bounds are applied inclusively when filtering.
StartDate = "2019-04-01"
EndDate = "2021-08-31"

Destination = "ona/test_20210810"
hr_attributes = "FunctionType,LevelDesignation,Domain,Organization,Region"
minGroupSize = 5
log_folder = "ona/job_logs/01/"
meaningfulParticipantThreshold = 8

# One switch per metric; a section only runs when its switch is True and bad_flag is False.
metric_partition = True
metric_clustering = True
metric_fluidity = True
metric_xy = True
metric_ari = True
metric_freedom = True

bad_flag = False # becomes True if LCC = 0, LCC fails, data is less than 2 months, or modularity fails.

# Root location under which every intermediate and result parquet is written.
ResultBlobPath = 'abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/'


StatementMeta(spark1, 44, 3, Finished, Available)



Import Packages

In [61]:
from pyspark.sql.functions import to_date, date_format, ceil, year, lit
import os
import pyspark.sql.functions as F
import datetime as dt
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.window import Window

import networkx as nx
import topologic as tc
import graspologic as gc

from datetime import datetime, timedelta
import pandas as pd
from notebookutils import mssparkutils

StatementMeta(, , , Waiting, )



In [62]:
# # %%pyspark
# df = spark.read.load('abfss://users@dopsis.dfs.core.windows.net/ona/democontoso interaction table/Democontoso-DI three months_Interactions.csv', format='csv', header)
# df.write.mode("overwrite").saveAsTable("interactions")
# display(df.limit(10))

StatementMeta(spark1, 44, 5, Finished, Available)

In [63]:

# Get passed in log folder

# If the default (or an empty) log folder was passed in, derive one from run_id,
# falling back to the default "01" folder when run_id is not defined.
if log_folder == "ona/job_logs/01/" or log_folder == "":
  try:
    # NOTE(review): this path has no "ona/" prefix, unlike the fallback below - confirm intended.
    log_folder = "/job_logs/" + str(run_id) + "/"
  except NameError:
    # run_id was never defined in this session; only that case is treated as
    # "no run id" so unrelated failures are no longer hidden by a bare except.
    log_folder = "ona/job_logs/" + "01" + "/"
  # create pandas dataframe for logging and create the folder for logging
  logging_output = pd.DataFrame(columns=["time", "type", "text"])
  mssparkutils.fs.mkdirs(log_folder)
  # NOTE(review): this creates a *directory* named full_log.json - confirm that is intended.
  mssparkutils.fs.mkdirs(log_folder + 'full_log.json')
# Otherwise - load parent log folder
else:
  logging_output = sqlContext.read.json(log_folder + "full_log.json").toPandas()
print(log_folder)
 

def output_log(log, log_type, text):
  """
  Return a new log frame made of `log` plus one freshly timestamped entry.

  log: pandas DataFrame holding the existing log rows.
  log_type: value stored in the "type" column of the new row.
  text: message; stored with the "Monthly Indv CHI Metrics - " prefix.

  Both the existing rows and the new row are restricted to (and ordered by) the
  columns of the notebook-level `logging_output` frame. `log` itself is not modified.
  """
  columns = list(logging_output.columns)
  entry = pd.DataFrame(
    {"time": str(datetime.now()), "type": log_type, "text": "Monthly Indv CHI Metrics - " + text},
    index=[0],
  )
  # DataFrame.append was removed in pandas 2.0; pd.concat produces the same result.
  return pd.concat([log[columns], entry[columns]], ignore_index=True)



def df_output_blob(df, extension, outFolder):
    """
    Write `df` as a single file named ResultBlobPath + outFolder + '.' + extension.

    The frame is first written (one partition) into the work folder
    ResultBlobPath + outFolder + "/", the resulting part-00000 file is moved next to
    that folder under the final name, and the work folder is then removed.

    df: Spark DataFrame to export.
    extension: 'csv' or 'json'; any other value writes nothing.
    outFolder: name used for both the work folder and the final file.

    Move and clean-up failures are printed, not raised.
    """
    outPath = ResultBlobPath + outFolder + "/"
    if extension == 'csv':
        df.repartition(1).write.csv(outPath, header='true', mode='overwrite', escape="\"")
    elif extension == 'json':
        # Overwrite like the csv branch, so a work folder left behind by a failed
        # earlier run does not abort the write.
        df.repartition(1).write.json(outPath, mode='overwrite')

    # Copy file from outFolder to central working directory
    try:
        for entry in mssparkutils.fs.ls(outPath):
            if 'part-00000' in entry.name:
                newFileLocation = ResultBlobPath + outFolder + '.' + extension
                mssparkutils.fs.mv(entry.path, newFileLocation, overwrite=True)
                print ('File moved successfully: ', newFileLocation)
    except Exception as e:
        print ("Error moving file. Error: ", e)

    # clean up old files
    try:
        mssparkutils.fs.rm(outPath , True)
        print ('Work Folder deleted: ', outPath)
    except Exception as e:
        print ("Error Deleting work File or Folder. Error: ", e)
      

class CHIException(Exception):
    """Custom exception type for this notebook; a plain Exception subclass with no added behavior."""

StatementMeta(spark1, 44, 6, Finished, Available)

ona/job_logs/01/

Part 1: Reading Graph

In [64]:
def split_date(string_date):
  """
  Break a yyyy-mm-dd value into its (year, month, day) string parts.

  The argument is converted with str() before splitting on '-'.
  Raises ValueError unless the text has exactly three '-'-separated fields.
  """
  yyyy, mm, dd = str(string_date).split('-')
  return (yyyy, mm, dd)

### filter out 
def check_same_month(unique_months):
  """
  Find the first MonthStartDate/MonthEndDate pair that spans more than one month.

  unique_months: iterable of mappings with 'MonthStartDate' and 'MonthEndDate'
  values in yyyy-mm-dd form (e.g. 2020-01-01 and 2020-01-31).

  Returns (start, end) of the first pair whose year or month differ, after
  printing it; returns (None, None) when every pair stays within one month.
  Only the first offending pair is reported.
  """
  for pair in unique_months:
    start = pair['MonthStartDate']
    end = pair['MonthEndDate']
    start_year_month = split_date(start)[:2]
    end_year_month = split_date(end)[:2]

    if start_year_month != end_year_month:
      print(f'Not a one month period: {start, end}')
      return start, end

  return None, None
  
def get_max_min_date(df):
  """
  Return (earliest MonthStartDate, latest MonthEndDate) found in df.

  df: Spark DataFrame with MonthStartDate and MonthEndDate columns.
  Both values are computed in one aggregation and read back through pandas.
  """
  bounds = df.agg(
    F.min(F.col("MonthStartDate")),
    F.max(F.col("MonthEndDate")),
  ).toPandas()

  # Explicit positional access: the former chained .iloc[0][0] depended on pandas'
  # deprecated integer-key fallback on a label-indexed Series.
  StartDate = bounds.iloc[0, 0]
  EndDate = bounds.iloc[0, 1]

  return StartDate, EndDate

# logging_output = output_log(logging_output, "StdOut",f"Reading Interactions data.")
# logging_output.to_json(log_folder + "full_log.json", orient='records')

# Load the interactions CSV, restrict it to the requested date window, drop the
# first non-monthly period (if any), and reshape it to
# (year_month, Source, Target, weight) for the graph steps that follow.
try:
  # No schema inference is requested here.
  # NOTE(review): columns are presumably all strings as a result - confirm.
  dfraw = spark.read.load('abfss://users@dopsis.dfs.core.windows.net/ona/MGDC Interaction Table/MGDC.csv', format='csv', header=True)

  print(f'Original: {dfraw.count()}')
  # get new start and end date if default are passed
  if StartDate == '1900-01-01' and EndDate == '2100-12-31':
    StartDate, EndDate = get_max_min_date(dfraw)

  # Both bounds are inclusive.
  dfraw = dfraw.filter( (F.col("MonthStartDate") >= StartDate) & (F.col("MonthEndDate") <= EndDate) )
  # NOTE(review): interactions_ is not used anywhere in the visible cells.
  interactions_ = dfraw
  unique_months = dfraw.groupBy("MonthStartDate", "MonthEndDate").agg(F.count("WeightByCount").alias("count")).collect()
  # filter so we're only working with monthly data
  # check_same_month reports only the FIRST offending period, so at most one
  # non-monthly period is removed below.
  start, end = check_same_month(unique_months)

  if start:
    # first filter out interactions that aren't for just 1 month
    dfraw = dfraw.filter(~((F.col("MonthStartDate")==start) & (F.col("MonthEndDate")==end)) )
  print(f'Filtered: {dfraw.count()}')

  # NOTE(review): "WeightbyCount" here vs "WeightByCount" below differ in case;
  # this presumably relies on case-insensitive column resolution - confirm.
  dfraw = dfraw.select("Node1Pid","Node2Pid","WeightbyCount","MonthStartDate")\
    .withColumn('year_month',F.date_format(F.col('MonthStartDate'),'yyyy-MM').cast("string"))\
    .drop("MonthStartDate")\
    .select("year_month","Node1Pid","Node2Pid","WeightByCount")\
    .withColumnRenamed("Node1Pid","Source")\
    .withColumnRenamed("Node2Pid","Target")\
    .withColumnRenamed("WeightByCount","weight")\
    .withColumn("Source",F.col("Source").cast("string"))\
    .withColumn("Target", F.col("Target").cast("string"))

except Exception as e:
#   logging_output = output_log(logging_output, "StdErr", str(e))
#   logging_output.to_json(log_folder + "full_log.json", orient='records')
  # Any failure above is re-raised as ValueError carrying the original error.
  raise ValueError(e)

StatementMeta(spark1, 44, 7, Finished, Available)

Original: 63745
Filtered: 63745

In [65]:
# logging_output = output_log(logging_output, "StdOut",f"Checking we have at least 2 months.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

# One row per year_month present in the interactions data.
monthly = dfraw.groupBy("year_month").agg(F.count("Source").alias("monthly_entries"))

# 1 when more than one distinct month is present, 0 otherwise.
has_multiple_months = 1 if monthly.count() > 1 else 0

if has_multiple_months == 0:
  # A single month is not enough for the month-over-month metrics: flag the run.
  bad_flag = True
  e = "ERROR. CHI calculations requires at least 2 months of data."
#   logging_output = output_log(logging_output, "StdErr", str(e))
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

StatementMeta(spark1, 44, 8, Finished, Available)



In [66]:
# logging_output = output_log(logging_output, "StdOut",f"Casting weights from floats to integer by multiplying by 1000.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

# Scale weights by 1000, round to whole integers, then drop edges whose scaled weight is 0.
# NOTE(review): dfraw is rebound in place, so re-running this cell scales the weights again.
dfraw = (
  dfraw
  .withColumn("weight", F.round(F.col("weight") * 1000).cast("integer"))
  .filter(F.col("weight") != 0)
)

StatementMeta(spark1, 44, 9, Finished, Available)



Computing Largest Connected Component

In [67]:
import os
from pyspark.sql.types import *

def get_lcc(row):
  """
  Yield the edges of the largest connected component of one month's directed graph.

  row: (date, edges) pair, where edges is an iterable of (source, target, weight).
  Weights of repeated (source, target) edges are summed.

  Yields (date, source, target, weight) tuples. Any exception raised while
  building or reducing the graph is printed and ends the generator early.
  """
  import networkx as nx
  import topologic as tc
  try:
    date = row[0]
    graph = nx.DiGraph()

    for source, target, weight in row[1]:
      if graph.has_edge(source, target):
        # repeated edge: accumulate on top of the weight already stored
        weight = weight + graph[source][target]['weight']
      graph.add_edge(source, target, weight=weight)

    lcc = tc.largest_connected_component(graph)

    yield from ((date, s, t, w) for s, t, w in lcc.edges(data='weight'))
  except Exception as e:
    print(e)

# logging_output = output_log(logging_output, "StdOut",f"Calculating Longest Connected Component.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

# Reduce every month's graph to its largest connected component and persist the
# result as graph_lcc.parquet. Sets bad_flag when the result is empty or when
# the computation fails.
try:
  if not bad_flag:
    # key each edge by its first column (year_month); the rest is (Source, Target, weight)
    monthly_rdd = dfraw.rdd.map(lambda row: (row[0], (row[1:]))).cache()
    lcc_rdd = monthly_rdd.groupByKey().flatMap(lambda group: get_lcc(group)).cache()

    schema = StructType([
        StructField("year_month", StringType(), True),
        StructField("Source", StringType(), True),
        StructField("Target", StringType(), True),
        StructField("weight", IntegerType(), True),
    ])

    lcc_df = spark.createDataFrame(lcc_rdd, schema=schema)

    raw_count, lcc_count = dfraw.count(), lcc_df.count()
    print(f'Original raw count: {raw_count}. LCC count: {lcc_count}. Difference: {raw_count - lcc_count}')

    if lcc_count > 0:
      lcc_df.write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'graph_lcc.parquet'))
      #   logging_output = output_log(logging_output, "StdOut",f"LCC Count: {lcc_df.count()}.")
      #   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
      print("Save LCC Count")
    else:
      bad_flag = True
      e = "FATAL ERROR. LCC is equal to 0. Unable to calculate metrics."
      #   logging_output = output_log(logging_output, "StdErr", str(e))
      #   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

    print(f'Bad flag status: {bad_flag}')

except Exception as lcc_error:
  # The exception is bound to its own name: with `except ... as e`, Python
  # unbinds `e` when the clause ends, which discarded the message assigned below.
  bad_flag = True
  e = "FATAL ERROR. Calculation of Longest Connected Component failed. Unable to calculate metrics."
  # Surface both the summary and the underlying error instead of swallowing them.
  print(e)
  print(lcc_error)
#   logging_output = output_log(logging_output, "StdErr", str(e))
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

StatementMeta(spark1, 44, 10, Finished, Available)

Original raw count: 63745. LCC count: 63353. Difference: 392
Save LCC Count
Bad flag status: False

In [68]:
import os

if not bad_flag:
  # Re-read the persisted LCC edges so later cells work from the parquet copy.
  df = spark.read.parquet(os.path.join(ResultBlobPath, 'graph_lcc.parquet'))
  # registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement.
  df.createOrReplaceTempView('dftable')
  print(f'LCC count: {df.count()}')

StatementMeta(spark1, 44, 11, Finished, Available)

LCC count: 63353

Compute Partitioning + Modularity (node/network level)

In [69]:
def get_graph_and_leaf_partitions(row, max_cluster_size=250, random_seed=None):
  """
  Build one month's undirected graph and partition it with hierarchical Leiden.

  row: (date, edges) pair, where edges is an iterable of (source, target, weight).
       Weights are converted with int() and summed for repeated edges.
  max_cluster_size: forwarded to hierarchical_leiden; defaults to the previously
       hard-coded 250.
  random_seed: forwarded to hierarchical_leiden; None keeps the library default,
       pass an int for repeatable partitions.

  Returns (date, graph, leaf_partitions_inverted, root_partitions_inverted,
  leaf_modularity, root_modularity). The *_inverted values map a cluster id to
  the set of nodes in it; "root" is level 0 and "leaf" is the final cluster.
  """
  import networkx as nx
  from graspologic.partition import hierarchical_leiden
  import topologic as tc
  import collections

  date = row[0]

  print(row[0])
  # build graph - this is UNDIRECTED since it is using Leiden
  graph = nx.Graph()

  for s, t, w in row[1]:
    w = int(w)
    if graph.has_edge(s, t):
      graph[s][t]['weight'] += w
    else:
      graph.add_edge(s, t, weight=w)

  partitions = hierarchical_leiden(
    [(s, t, w) for s, t, w in graph.edges(data='weight')],
    max_cluster_size=max_cluster_size,
    random_seed=random_seed
  )

  leaf_partitions_inverted = collections.defaultdict(set) # dictionary partition_id -> set of nodes in that partition
  root_partitions_inverted = collections.defaultdict(set)
  leaf_partitions = dict()  # node -> final cluster id
  root_partitions = dict()  # node -> level-0 cluster id

  # `entry` (not `row`) so the function argument is no longer shadowed
  for entry in partitions:
    if entry.level == 0:
      root_partitions_inverted[entry.cluster].add(entry.node)
      root_partitions[entry.node] = entry.cluster
    if entry.is_final_cluster:
      leaf_partitions_inverted[entry.cluster].add(entry.node)
      leaf_partitions[entry.node] = entry.cluster

  leaf_modularity = tc.partition.modularity(graph, leaf_partitions)
  root_modularity = tc.partition.modularity(graph, root_partitions)

  return (date, graph, leaf_partitions_inverted, root_partitions_inverted, leaf_modularity, root_modularity)

StatementMeta(spark1, 44, 12, Finished, Available)



Calculating partitions

In [70]:
# logging_output = output_log(logging_output, "StdOut",f"Calculating partitions.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if metric_partition and not bad_flag:
  try:
    # group the LCC edges by their first column (year_month) and partition each month
    graphs_rdd = df.rdd.map(lambda row: (row[0], (row[1:]))).groupByKey().map(lambda x: get_graph_and_leaf_partitions(x))
    graph_with_partitions = graphs_rdd.collect()
  except Exception as partition_error:
    # `except Exception` (not a bare except) so interrupts still propagate, and the
    # cause is printed instead of being dropped.
    # logging_output = output_log(logging_output, "StdErr", "FATAL ERROR: Calculation of Partitions failed. Will produce dummy values")
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    bad_flag = True
    metric_partition = False
    print('Failed to calculate partitions.')
    print(partition_error)
else:
  print('not calculating metric_partition')

StatementMeta(spark1, 44, 13, Finished, Available)



Calculating root and leaf partitions

In [71]:
# logging_output = output_log(logging_output, "StdOut",f"Calculating root and leaf partitions.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if metric_partition and not bad_flag:
  try:
    # Flatten the collected per-month results into three row lists.
    modularities = []     # (date, leaf_modularity, root_modularity)
    leaf_partitions = []  # (date, leaf cluster id, node)
    root_partitions = []  # (date, root cluster id, node)

    for date, graph, leaf_partitions_inverted, root_partitions_inverted, leaf_modularity, root_modularity in graph_with_partitions:
      modularities.append((date, leaf_modularity, root_modularity))

      for partition, nodes in leaf_partitions_inverted.items():
        for node in nodes:
          leaf_partitions.append((date, partition, node))

      for partition, nodes in root_partitions_inverted.items():
        for node in nodes:
          root_partitions.append((date, partition, node))

    modularity_df = sc.parallelize(modularities).toDF(['date', 'leaf_modularity', 'root_modularity'])

    leaf_partitions_df = sc.parallelize(leaf_partitions).toDF(['date', 'leaf_partition_id', 'node'])
    root_partitions_df = sc.parallelize(root_partitions).toDF(['date', 'root_partition_id', 'node'])
  except Exception as partition_error:
    # `except Exception` (not a bare except) so interrupts still propagate, and the
    # cause is printed instead of being dropped.
    bad_flag = True
    metric_partition = False
    e = 'Failed to calculate root and leaf partitions.'
    print(e)
    print(partition_error)
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

else:
  print('not calculating metric_partition')

StatementMeta(spark1, 44, 14, Finished, Available)



Combining Partitions Data

In [72]:
from pyspark.sql.functions import col

# logging_output = output_log(logging_output, "StdOut","Combining partitions data.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if metric_partition and not bad_flag:
  try:
    # rename the root-side keys so they stay distinguishable after the join
    root_partitions_df = root_partitions_df.withColumnRenamed('node', 'node1').withColumnRenamed('date', 'date1')

    partitions_df = (root_partitions_df.join(
                        leaf_partitions_df,
                        (leaf_partitions_df.node == root_partitions_df.node1) & (leaf_partitions_df.date == root_partitions_df.date1),
                        how='outer'
                      )
                     .fillna({'root_partition_id' : '-1', 'leaf_partition_id' : '-1'})
                     # Outer join: a row present on only one side has null keys on the
                     # other, so take the key from whichever side supplied it rather
                     # than from the leaf side alone.
                     .select([
                         F.coalesce(col('date'), col('date1')).alias('date'),
                         F.coalesce(col('node'), col('node1')).alias('node'),
                         col('root_partition_id'),
                         col('leaf_partition_id'),
                     ])
                     # shift ids by one, so the -1 "missing" fill value becomes 0
                     .withColumn("root_partition_id", F.col("root_partition_id") + 1)
                     .withColumn("leaf_partition_id", F.col("leaf_partition_id") + 1)
                     .cache()
                    )
  except Exception as combine_error:
    # `except Exception` (not a bare except) so interrupts still propagate, and the
    # cause is printed instead of being dropped.
    metric_partition = False
    bad_flag = True
    e = 'Failed to combine root and leaf partitions.'
    print(e)
    print(combine_error)
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

else:
  print('not calculating metric_partition')

StatementMeta(spark1, 44, 15, Finished, Available)



Writing partitions and modularity data

In [73]:
# logging_output = output_log(logging_output, "StdOut","Writing partitions and modularity data.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if not metric_partition or bad_flag:
  print('not calculating metric_partition')
else:
  # Persist both frames, replacing any copy left by a previous run.
  modularity_df.write.mode('overwrite').option('header', 'true').parquet(
    os.path.join(ResultBlobPath, 'modularity.parquet')
  )
  partitions_df.write.mode('overwrite').option('header', 'true').parquet(
    os.path.join(ResultBlobPath, 'partitions.parquet')
  )

StatementMeta(spark1, 44, 16, Finished, Available)



Reading partitions and modularity data

In [74]:
# logging_output = output_log(logging_output, "StdOut","Reading partitions and modularity data.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if not metric_partition or bad_flag:
  print('not calculating metric_partition')
else:
  # Reload both frames from storage and print their row counts.
  modularity_df = spark.read.parquet(os.path.join(ResultBlobPath, 'modularity.parquet'))
  print(modularity_df.count())

  partitions_df = spark.read.parquet(os.path.join(ResultBlobPath, 'partitions.parquet'))
  print(partitions_df.count())

StatementMeta(spark1, 44, 17, Finished, Available)

29
7908

In [75]:
# modularity_df only exists when the partition metric was computed; without this
# guard the cell raises NameError on a run where it was skipped.
if metric_partition and not bad_flag:
  display(modularity_df)

StatementMeta(spark1, 44, 18, Finished, Available)

SynapseWidget(Synapse.DataFrame, cc478c04-08de-4052-8bca-348e5b4aa0fd)

Modularity Scores For Root Clusters

In [76]:
# For the last month only: rebuild the undirected graph from the persisted LCC
# edges and write each root cluster's modularity contribution to
# modularity_scores.parquet.
if metric_partition and not bad_flag:
  # calculate modularity score for each root_partition cluster for the last month
  # "last month" is EndDate with its final '-'-separated field removed.
  # NOTE(review): it is taken from the EndDate parameter, not from the data; if no
  # rows exist for that month the frames below are empty - confirm handling.
  last_month = '-'.join(x for x in EndDate.split('-')[:-1])
  # read data
  df_temp = spark.read.parquet(os.path.join(ResultBlobPath, 'graph_lcc.parquet')).filter(F.col("year_month")==last_month)
  partitions_list = (spark.read.parquet(os.path.join(ResultBlobPath, 'partitions.parquet'))
                     .withColumnRenamed("date", "year_month")
                     .filter(F.col("year_month")==last_month)
                     .select("year_month", "node", "root_partition_id").collect()
                  )

  ### Creating Partitions Dictionary
  # get dict where k, v = node_id, partition_id
  partition_dict = {}
  for node in partitions_list:
    node_id = node['node']
    partition_id = node['root_partition_id']
    partition_dict[node_id] = partition_id

  ### Creating Undirected Graph
  # Weights of repeated edges are summed; the graph is undirected, so both
  # directions of a pair land on the same edge.
  mod_graph = nx.Graph()
  for d, s, t, w in df_temp.collect():
    w = int(w)
    if mod_graph.has_edge(s, t):
      mod_graph[s][t]['weight'] += w
    else:
      mod_graph.add_edge(s, t, weight=w)

  # get each clusters contribution to modularity score
  modularity_dict = gc.partition.modularity_components(mod_graph, partition_dict)

  # One output row per key/value of modularity_dict, tagged with last_month.
  cluster_scores_df = (spark.createDataFrame(pd.DataFrame({'root_partition_id': list(modularity_dict.keys()),  'cluster_modularity': list(modularity_dict.values())}))
                       .withColumn("year_month", F.lit(last_month))
                       .select("year_month", "root_partition_id", "cluster_modularity")
                      )
  cluster_scores_df.write.mode('overwrite').option('header', 'true').parquet(os.path.join(ResultBlobPath, 'modularity_scores.parquet'))

else:
  print('not calculating modularity components')

StatementMeta(spark1, 44, 19, Finished, Available)

  'JavaPackage' object is not callable
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.

The Freedom metric is not processed because the person historical file is not available; its code would go here.

Calculating ARI (network level)

In [77]:
import collections

if not metric_ari or bad_flag:
  print('Not calculating ARI')
else:
  # Lookup: date -> node -> (root_partition_id, leaf_partition_id)
  partitions_by_date = collections.defaultdict(lambda: collections.defaultdict(dict))

  partitions = partitions_df.select(['node', 'root_partition_id', 'leaf_partition_id', 'date']).collect()

  for row in partitions:
    partitions_by_date[row['date']][row['node']] = (
      int(row['root_partition_id']),
      int(row['leaf_partition_id']),
    )

StatementMeta(spark1, 44, 20, Finished, Available)



In [78]:
def get_ari(partitions_by_date):
  """
  Compare the root partitions of each pair of consecutive dates.

  partitions_by_date: mapping date -> {node: (root_partition_id, leaf_partition_id)}.

  Dates are taken in sorted order. For every date after the first, the nodes
  present on both that date and the preceding one are collected, and their root
  partition ids are passed to sklearn's adjusted_rand_score.

  Returns a list of (date, previous_date, ari, shared_node_count) tuples; the
  list is empty when fewer than two dates are supplied.
  """
  from sklearn.metrics.cluster import adjusted_rand_score

  ordered_dates = sorted(partitions_by_date.keys())
  results = []

  for previous_date, current_date in zip(ordered_dates, ordered_dates[1:]):
    previous_partitions = partitions_by_date[previous_date]  # node_id -> (root, leaf)
    current_partitions = partitions_by_date[current_date]    # node_id -> (root, leaf)

    previous_nodes = set(previous_partitions.keys())
    current_nodes = set(current_partitions.keys())
    shared_nodes = [node for node in previous_nodes if node in current_nodes]

    # element 0 of each stored tuple is the root partition; the leaf id is not used
    previous_labels = [previous_partitions[node][0] for node in shared_nodes]
    current_labels = [current_partitions[node][0] for node in shared_nodes]

    ari = adjusted_rand_score(previous_labels, current_labels)
    results.append((current_date, previous_date, ari, len(previous_labels)))

  return results

# logging_output = output_log(logging_output, "StdOut","Calculating network ARI.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if metric_ari and not bad_flag:
  try:
    results = get_ari(partitions_by_date)
    ari_df = spark.createDataFrame(results, ['year_month', 'comparison_date', 'root_ari', 'root_count'])
    ari_df.write.mode('overwrite').option('header', 'true').parquet(os.path.join(ResultBlobPath, 'network_ari.parquet'))
  except Exception as ari_error:
    # `except Exception` (not a bare except) so interrupts still propagate, and the
    # cause is printed instead of being dropped. Only this metric is disabled.
    metric_ari = False
    e = 'Failed to calculate ARI.'
    print(e)
    print(ari_error)
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

else:
  print('Not calculating ARI')

StatementMeta(spark1, 44, 21, Finished, Available)



Reading ARI data

In [79]:
# logging_output = output_log(logging_output, "StdOut","Reading ARI data.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if not metric_ari or bad_flag:
  print('not calculating metric_ari')
else:
  # Reload the ARI results from storage and print the row count.
  ari_df = spark.read.parquet(os.path.join(ResultBlobPath, 'network_ari.parquet'))
  print(ari_df.count())

StatementMeta(spark1, 44, 22, Finished, Available)

28

In [80]:
# ari_df only exists when the ARI metric was computed; without this guard the
# cell raises NameError on a run where it was skipped.
if metric_ari and not bad_flag:
  display(ari_df)

StatementMeta(spark1, 44, 23, Finished, Available)

SynapseWidget(Synapse.DataFrame, 91adabfd-a2f8-4641-96f9-f29d001e6bcb)

Fluidity

In [81]:
import os
from pyspark.sql.types import *

# generate current and current - 1 distance
from pyspark.sql.functions import lit

# logging_output = output_log(logging_output, "StdOut","Preparing data for fluidity calculations.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if metric_fluidity and not bad_flag:
  try:
    dates = sorted([row.year_month for row in df.select(df.year_month).distinct().collect()])

    def get_previous_month(date_string):
      """Return the yyyy-mm string of the calendar month before `date_string` (also yyyy-mm)."""
      import datetime
      d = datetime.datetime.strptime(date_string, '%Y-%m')

      return d.replace(
            year=d.year if d.month > 1 else d.year - 1,
            month=d.month - 1 if d.month > 1 else 12,
            day=1
        ).strftime('%Y-%m')

    # Accumulates, for every month after the first, that month's edges
    # (is_previous=0) plus the preceding calendar month's edges (is_previous=1),
    # all tagged with graph_for_date = that month.
    union = None

    for date in dates[1:]:
      # use 4 week offset instead of week offset
      # use baseline average for 4-8 weeks before our current time step

      # baseline -> distance.parquet
      current_month = df.where(df.year_month == date).withColumn('graph_for_date', lit(date)).withColumn('is_previous', lit(0)) # current month
      # NOTE(review): this is the previous *calendar* month; if the data skips a
      # month, the "previous" side is empty for the month after the gap.
      previous_month = df.where(df.year_month == get_previous_month(date)).withColumn('graph_for_date', lit(date)).withColumn('is_previous', lit(1)) # current - 1

      temp = current_month.union(previous_month)

      # explicit None check instead of relying on the truthiness of a DataFrame
      if union is None:
        union = temp
      else:
        union = union.union(temp)

    # With fewer than two months `union` is still None here; the resulting
    # AttributeError is caught below and disables the metric.
    union = union.cache()
    union.count()

  except Exception as e:
    metric_fluidity = False
    m = 'Failed to pre-process data for fluidity.'
    print(m)
    print(e)
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')


else:
  print("Not calculating fluidity.")

  
# Row layout of the tuples yielded by calculate_fluidity; every field is nullable.
# NOTE(review): the clustering cell later rebinds the same `schema` name.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("year_month", StringType()),
        ("node", StringType()),
        ("prevmagnitude", StringType()),
        ("currmagnitude", StringType()),
        ("previous_vector", ArrayType(FloatType())),
        ("current_vector", ArrayType(FloatType())),
        ("cosine_distance", FloatType()),
        ("euc_distance", FloatType()),
    )
])

StatementMeta(spark1, 44, 24, Finished, Available)



In [82]:
def calculate_fluidity(row, spectype, directed):
    """
    Embed a month's graph together with its predecessor and yield per-node distances.

    row: (date, edges) pair, where edges is an iterable of
         (source, target, weight, is_previous); is_previous == 1 marks edges of
         the previous graph, anything else the current graph.
    spectype: embedding method handed to tc.embedding.omnibus_embedding.
    directed: 'di' builds directed graphs, any other value undirected ones.

    Yields, per node of the previous embedding:
    (date, node, str(norm of previous vector), str(norm of current vector),
     previous vector as list, current vector as list, cosine distance,
     euclidean distance).
    """
    import networkx as nx
    import topologic as tc
    import numpy as np
    date = row[0]

    print('fluidity: ' + str(date))
    print(spectype)
    print(directed)

    graph_type = nx.DiGraph if directed == 'di' else nx.Graph
    current = graph_type()
    previous = graph_type()

    for s, t, w, is_previous in row[1]:
      # choose the graph this edge belongs to, then add or accumulate its weight
      target_graph = previous if int(is_previous) == 1 else current
      weight = int(w)
      if target_graph.has_edge(s, t):
        target_graph[s][t]['weight'] += weight
      else:
        target_graph.add_edge(s, t, weight=weight)

    embedding_containers = tc.embedding.omnibus_embedding([current, previous], svd_seed=123, embedding_method=spectype)

    current_embedding = embedding_containers[0][0].to_dictionary() # dict node_id -> vector
    previous_embedding = embedding_containers[0][1].to_dictionary() # dict node_id -> vector

    # it is arbitrary whether we use previous or current as they must contain the same set of nodes
    for node in previous_embedding.keys():
      previous_vector = previous_embedding[node]
      current_vector = current_embedding[node]
      yield (
        date,
        node,
        str(np.linalg.norm(previous_vector)),
        str(np.linalg.norm(current_vector)),
        previous_vector.tolist(),
        current_vector.tolist(),
        tc.embedding.distance.cosine(previous_vector, current_vector).tolist(),
        tc.embedding.distance.euclidean(previous_vector, current_vector),
      )

StatementMeta(spark1, 44, 25, Finished, Available)



In [83]:
import topologic as tc

# logging_output = output_log(logging_output, "StdOut","Calculating fluidity.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if metric_fluidity and not bad_flag:
  try:
    sc.setJobDescription("Organization Insights: Calculating fluidity.")
    # (graph_for_date, (Source, Target, weight, is_previous)) - the same edge RDD
    # feeds both embeddings, so it is built once instead of twice.
    fluidity_rdd = union.select(union.graph_for_date, union.Source, union.Target, union.weight, union.is_previous).rdd.map(lambda row: (row[0], (row[1], row[2], row[3], row[4])))

    # Laplacian spectral embedding on undirected graphs
    fluidity_rdd_grouped = fluidity_rdd.groupByKey().flatMap(lambda x: calculate_fluidity(x, tc.embedding.EmbeddingMethod.LAPLACIAN_SPECTRAL_EMBEDDING, 'un')).cache()
    fluidity_df = spark.createDataFrame(fluidity_rdd_grouped, schema=schema).cache()
    fluidity_df.repartition(1000).write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'lse_ungraph_distance_monthly.parquet')) #workgroup fluidity
    print('lse_ungraph')

    # Adjacency spectral embedding on directed graphs
    fluidity_rdd_grouped = fluidity_rdd.groupByKey().flatMap(lambda x: calculate_fluidity(x, tc.embedding.EmbeddingMethod.ADJACENCY_SPECTRAL_EMBEDDING, 'di')).cache()
    fluidity_df = spark.createDataFrame(fluidity_rdd_grouped, schema=schema).cache()
    fluidity_df.repartition(1000).write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'ase_digraph_distance_monthly.parquet')) # egocentric fluidity
    print('ase_digraph')

  except Exception as fluidity_error:
    # `except Exception` (not a bare except) so interrupts still propagate, and the
    # cause is printed instead of being dropped. Only this metric is disabled.
    metric_fluidity = False
#     bad_flag = True
    print('Cannot calculate fluidity.')
    print(fluidity_error)
    e = "Cannot calculate fluidity. May not have same connections month-to-month. Try lowering interactions threshold."
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

else:
  print('not calculating fluidity')

StatementMeta(spark1, 44, 26, Finished, Available)

lse_ungraph
ase_digraph

In [84]:
if not metric_fluidity or bad_flag:
  print('not calculating fluidity')
else:
#   logging_output = output_log(logging_output, "StdOut","Writing out fluidity.")
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

  # euclidean distance from the directed ASE run -> egocentric_fluidity
  asedigraph = spark.read.parquet(os.path.join(ResultBlobPath, 'ase_digraph_distance_monthly.parquet'))

  # cosine distance from the undirected LSE run -> globally_influenced_fluidity
  undigraph = spark.read.parquet(os.path.join(ResultBlobPath, 'lse_ungraph_distance_monthly.parquet'))

  fluidity_df = (
    asedigraph
    .withColumnRenamed("euc_distance", "egocentric_fluidity")
    .join(
      undigraph
        .select("year_month", "node", "cosine_distance")
        .withColumnRenamed("cosine_distance", "globally_influenced_fluidity"),
      ['year_month', 'node'],
      'inner',
    )
    # the raw vectors and the directed run's cosine distance are not kept
    .drop('current_vector', 'previous_vector', 'cosine_distance')
  )

  # save
  fluidity_df.write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'fluidity_monthly.parquet'))

StatementMeta(spark1, 44, 27, Finished, Available)



Clustering Coefficient (Collab Cohesion)

In [85]:
from pyspark.sql.types import *

def calculate_clustering_coefficient(row):
  """
  Yield the clustering coefficient of every node in one month's undirected graph.

  row: (date, edges) pair, where edges is an iterable of (source, target, weight).
  Weights are converted with int() and summed for repeated edges.

  Yields (date, node, coefficient as float). Any exception raised while
  building or scoring the graph is printed and ends the generator early.
  """
  import networkx as nx
  import topologic as tc
  try:
    date = row[0]

    graph = nx.Graph()

    for source, target, raw_weight in row[1]:
      weight = int(raw_weight)
      if not graph.has_edge(source, target):
        graph.add_edge(source, target, weight=weight)
        continue
      graph[source][target]['weight'] += weight

    coefficients = nx.algorithms.cluster.clustering(graph)

    yield from ((date, node, float(coefficients[node])) for node in graph.nodes())
  except Exception as e:
    print(e)

# logging_output = output_log(logging_output, "StdOut","Calculating clustering coeff (cohesion).")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
  
if metric_clustering and not bad_flag:
  try:
    sc.setJobDescription("Organization Insights: Calculating clustering coefficient.")
    # key each edge by its first column (year_month); the rest is (Source, Target, weight)
    monthly_rdd = df.rdd.map(lambda row: (row[0], (row[1:]))).cache()
    coefficient_rdd = monthly_rdd.groupByKey().flatMap(lambda group: calculate_clustering_coefficient(group)).cache()

    # NOTE(review): this rebinds the notebook-level `schema` that the fluidity
    # cells also use; re-running those cells after this one picks up this layout.
    schema = StructType([
        StructField("year_month", StringType(), True),
        StructField("Source", StringType(), True),
        StructField("clustering_coefficient", FloatType(), True),
    ])

    coefficient_df = spark.createDataFrame(coefficient_rdd, schema=schema).cache()
    coefficient_df.write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'clustering_coefficient.parquet'))
  except Exception as clustering_error:
    # `except Exception` (not a bare except) so interrupts still propagate, and the
    # cause is printed instead of being dropped. Only this metric is disabled.
    metric_clustering = False
    e = "Cannot calculate clustering co-eff."
    print(e)
    print(clustering_error)
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

else:
  print('not calculating clustering')

StatementMeta(spark1, 44, 28, Finished, Available)



In [86]:
if not metric_clustering or bad_flag:
  print('not calculating clustering')
else:
  # Reload the clustering coefficients from storage.
  coefficient_df = spark.read.parquet(
    os.path.join(ResultBlobPath, 'clustering_coefficient.parquet')
  )

StatementMeta(spark1, 44, 29, Finished, Available)



Graph Layout (x, y) via UMAP

In [87]:
import numpy as np
import umap

def get_max_date(df):
  """
  Return the largest ``year_month`` value found in ``df``.

  df: Spark DataFrame with a ``year_month`` column.

  Returns the single aggregated value (None when the aggregate is null).
  """
  max_month_pdf = df.agg(F.max(col("year_month"))).toPandas()
  # Use a purely positional lookup. The aggregate column is labelled by a string,
  # so `.iloc[0][0]` relied on pandas falling back from label to position for an
  # integer key - deprecated behaviour in pandas 2.x.
  year_month = max_month_pdf.iloc[0, 0]
  return year_month


def get_latest_graph(last_month_df):
  """
  Build an undirected weighted graph from the collected rows of ``last_month_df``.

  Each row must unpack into four fields; the first is ignored and the remaining
  three (source, target, weight) are cast to int. Weights of repeated
  source/target pairs are summed onto the existing edge.
  """
  month_graph = nx.Graph()
  for record in last_month_df.collect():
    _, raw_source, raw_target, raw_weight = record
    source, target, weight = int(raw_source), int(raw_target), int(raw_weight)

    # First sighting of the pair creates the edge; later ones accumulate weight.
    if not month_graph.has_edge(source, target):
      month_graph.add_edge(source, target, weight=weight)
      continue
    month_graph[source][target]['weight'] += weight

  return month_graph

def get_embeddings(latest_nodes, last_month):
  """
  Load the ``current_vector`` embeddings for ``last_month`` from
  lse_ungraph_distance_monthly.parquet, keeping only nodes in ``latest_nodes``.

  latest_nodes: container of integer node ids to keep (membership is tested
                after casting each stored node id to int).
  last_month:   value matched against the ``year_month`` column.

  Returns (embedding, node_order): a numpy array built from the kept vectors
  and the list of integer node ids in the same order.
  """
  print('pulling embeddings')
  month_vectors = (
      spark.read.parquet(os.path.join(ResultBlobPath, 'lse_ungraph_distance_monthly.parquet'))
      .filter(F.col('year_month') == last_month)
      .select("year_month", "node", "current_vector")
  )

  node_order, vectors = [], []
  for record in month_vectors.collect():
    node_id = int(record['node'])
    if node_id not in latest_nodes:
      continue
    node_order.append(node_id)
    vectors.append(record['current_vector'])

  return np.array(vectors), node_order


def generate_points(embedding, params=None):
  """
  Reduce ``embedding`` with UMAP and return the fitted/transformed points.

  embedding: array-like passed straight to ``umap.UMAP(...).fit_transform``.
  params:    optional two-item sequence ``(min_dist, n_neighbors)``; when
             omitted the defaults 0.99 and 25 are used.

  The UMAP random_state is fixed at 42 so repeated runs use the same seed.
  """
  # Identity check: `params == None` is evaluated element-wise for numpy arrays
  # and then fails in the `if` with an ambiguous-truth-value error.
  if params is None:
    min_dist, n_neighbors = 0.99, 25
  else:
    min_dist, n_neighbors = params
  random_seed = 42

  print('running UMAP')
  points = umap.UMAP(
      min_dist=min_dist,
      n_neighbors=n_neighbors,
      random_state=random_seed).fit_transform(embedding)

  return points

def get_node_positions(graph_latest_month, node_order, points, adjust_overlaps=False):
  """
  Turn 2D points into node positions via graspologic's private
  ``layouts.auto._node_positions_from`` helper.

  graph_latest_month: graph passed as ``graph``.
  node_order:         node labels passed as ``labels``.
  points:             2D projection passed as ``down_projection_2d``.
  adjust_overlaps:    forwarded unchanged to the helper.

  The helper is always called with ``random_seed=42``.
  """
  import graspologic as gp

  print('adjusting node positions')
  layout_arguments = dict(
    graph=graph_latest_month,
    labels=node_order,
    down_projection_2d=points,
    random_seed=42,
    adjust_overlaps=adjust_overlaps,
  )
  return gp.layouts.auto._node_positions_from(**layout_arguments)

def create_layout_df(node_positions, year_month=None):
  """
  Convert node positions into a Spark DataFrame with columns
  (year_month, node, x, y).

  node_positions: re-iterable collection of objects exposing ``node_id``,
                  ``x`` and ``y`` attributes.
  year_month:     value written to the ``year_month`` column. When omitted,
                  the notebook-level ``last_month`` variable is used, which
                  keeps existing single-argument calls working.
  """
  print('converting into df')
  if year_month is None:
    # Backward-compatible fallback to the global set by the layout driver cell.
    year_month = last_month

  positions_pdf = pd.DataFrame({
      'node': [node.node_id for node in node_positions],
      'x': [node.x for node in node_positions],
      'y': [node.y for node in node_positions],
  })

  layout_df = (spark.createDataFrame(positions_pdf)
               .withColumn("year_month", F.lit(year_month))
               .select("year_month", "node", "x", "y")
              )
  return layout_df



# Driver cell for the graph layout: builds the graph of the last month found in
# graph_lcc.parquet, reduces that month's node embeddings to 2D with UMAP, and
# writes (year_month, node, x, y) to layout_UMAP.parquet.
# Any exception is printed and switches the metric off (metric_xy = False).
# NOTE: this cell rebinds the notebook-level names `df` and `last_month`;
# create_layout_df reads `last_month` as a global.
if not bad_flag and metric_xy == True:
  try:
    sqlContext.clearCache()
    # logging_output = output_log(logging_output, "StdOut","Calculating x, y coordinates via UMAP.")
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    
    # load df
    df = spark.read.parquet(os.path.join(ResultBlobPath, 'graph_lcc.parquet')).cache()
    # find last month of dataset
    last_month = get_max_date(df)
    # filter interactions to last month
    last_month_df = (df
                     .filter(F.col("year_month")==last_month)
                    )
    # get graph of latest month
    graph_latest_month = get_latest_graph(last_month_df)
    latest_nodes = set(graph_latest_month.nodes())
    num_nodes = len(latest_nodes)
    print(f'Number of nodes: {num_nodes}')
    # set params based on number of nodes
    # params is [min_dist, n_neighbors], in the order generate_points unpacks it;
    # overlap adjustment is only enabled for graphs of at most 50000 nodes.
    if num_nodes > 50000:
      adjust_overlaps = False
      params = [0.99, 20]
    else:
      adjust_overlaps = True
      params = [0.5, 50]
    # get embedding
    # logging_output = output_log(logging_output, "StdOut","UMAP - getting embeddings.")
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    embedding, node_order = get_embeddings(latest_nodes, last_month)
    # generate first x, y coordinates via umap
    # logging_output = output_log(logging_output, "StdOut","UMAP - applying UMAP.")
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    points = generate_points(embedding, params=params)
    # get adjusted node positions
    # logging_output = output_log(logging_output, "StdOut","UMAP - redjusting datapoints.")
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    node_positions = get_node_positions(graph_latest_month, node_order, points, adjust_overlaps)
    # create df
    layout_df = create_layout_df(node_positions)
    # save df
    # logging_output = output_log(logging_output, "StdOut","UMAP - saving results.")
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    layout_df.repartition(100).write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'layout_UMAP.parquet'))
    
  except Exception as e:
    print('layout failed')
    print(e)
    # logging_output = output_log(logging_output, "StdErr", 'Layout failed.')
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    # Disabling the metric makes the consolidation cell use the empty layout
    # frame and fill x/y with random placeholder values.
    metric_xy = False
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    
else:
  print('Not calculating UMAP layout.')

StatementMeta(spark1, 44, 30, Finished, Available)

Number of nodes: 200
pulling embeddings
running UMAP
adjusting node positions
converting into df
  'JavaPackage' object is not callable
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.

Preparing to Combine Results

In [88]:
# Empty, correctly-typed placeholder DataFrames. The consolidation cell swaps
# one of these in for any metric that is disabled, so the joins still resolve
# every expected column.

### create empty modularity df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("root_modularity", LongType(), True))
empty_modularity_df = spark.sparkContext.parallelize([]).toDF(schema)

### create empty partitions df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("person_id", StringType(), True)
          .add("root_partition_id", LongType(), True)
          .add("leaf_partition_id", LongType(), True))
empty_partitions_df = spark.sparkContext.parallelize([]).toDF(schema)

### create empty workgroup modularity df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("root_partition_id", LongType(), True)
          .add("workgroup_modularity_component", LongType(), True))
empty_cluster_scores_df = spark.sparkContext.parallelize([]).toDF(schema)

### create empty freedom df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("root_partition_id", LongType(), True)
          .add("freedom_to_collaborate", LongType(), True))
empty_freedom_df = spark.sparkContext.parallelize([]).toDF(schema)

### create empty ARI df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("root_ari", LongType(), True))
empty_ari_df = spark.sparkContext.parallelize([]).toDF(schema)

### create empty clustering coefficient df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("person_id", StringType(), True)
          .add("clustering_coefficient", FloatType(), True))
empty_coefficient_df = spark.sparkContext.parallelize([]).toDF(schema)

### create empty fluidity df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("person_id", StringType(), True)
          .add("prevmagnitude", StringType(), True)
          .add("currmagnitude", StringType(), True)
          .add("egocentric_fluidity", FloatType(), True)
          .add("globally_influenced_fluidity", FloatType(), True))
empty_fluidity_df = spark.sparkContext.parallelize([]).toDF(schema)

### create empty graph layout df
schema = (StructType()
          .add("year_month", StringType(), True)
          .add("person_id", IntegerType(), True)
          .add("x", FloatType(), True)
          .add("y", FloatType(), True))
empty_layout_df = spark.sparkContext.parallelize([]).toDF(schema)

StatementMeta(spark1, 44, 31, Finished, Available)



In [89]:
# Drop everything currently held in Spark's in-memory cache before the
# consolidation cell below caches the per-metric DataFrames.
sqlContext.clearCache()

StatementMeta(spark1, 44, 32, Finished, Available)



Consolidate Dataframes

In [90]:
import os

# logging_output = output_log(logging_output, "StdOut","Loading dataframes.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
# Load each metric's parquet output (or the matching empty placeholder when the
# metric is disabled or bad_flag is set) and normalise the column names the
# join cell relies on: `year_month`, `person_id`, `root_partition_id`.
# Each `.count()` is printed for inspection and also forces the cached frame to load.
sc.setJobDescription("Organization Insights: Joining calculated metrics dataframes together")

# MODULARITY
if metric_partition and not bad_flag:
  modularity_df = (spark.read.parquet(os.path.join(ResultBlobPath, 'modularity.parquet'))
                   .withColumnRenamed('date', 'year_month')
                   .select("year_month", "root_modularity")
                   .cache()
                  )
  print(f'modularity count: {modularity_df.count()}')
  
else:
  modularity_df = empty_modularity_df
  print('using dummy modularity df')

# PARTITIONS
if metric_partition and not bad_flag:
  partitions_df = (spark.read.parquet(os.path.join(ResultBlobPath, 'partitions.parquet'))
                   .withColumnRenamed('date', 'year_month')
                   .withColumnRenamed("node", "person_id")
                   .cache()
                  )
  print(f'partitions count: {partitions_df.count()}')
else:
  partitions_df = empty_partitions_df
  print('using dunmy partitions')

# Cluster (workgroup) Modularity score
if metric_partition and not bad_flag:
  cluster_scores_df = (spark.read.parquet(os.path.join(ResultBlobPath, 'modularity_scores.parquet'))
                       .withColumnRenamed("cluster_modularity", "workgroup_modularity_component")
                       .cache()
                      )
  print(f'workgroup modularity count: {cluster_scores_df.count()}')
else:
  cluster_scores_df = empty_cluster_scores_df
  print('using dummy workgroup modularity')
  
# Freedom
# NOTE: the real load is commented out, so both branches currently assign the
# empty placeholder and freedom_to_collaborate is always filled with -1 downstream.
if metric_freedom and not bad_flag:
#   freedom_df = (spark.read.parquet(os.path.join(ResultBlobPath, 'freedom_partition.parquet'))
#                    .select("year_month", "root_partition_id", "freedom_to_collaborate")
#                    .cache()
#                   )
#   print(f'freedom count: {freedom_df.count()}')
  freedom_df = empty_freedom_df
  print('using dummy freedom df')
  
else:
  freedom_df = empty_freedom_df
  print('using dummy freedom df')

# ARI
if metric_ari and not bad_flag:
  ari_df = (spark.read.parquet(os.path.join(ResultBlobPath, 'network_ari.parquet'))
                   .select("year_month", "root_ari")
                   .cache()
                  )
  print(f'ARI count: {ari_df.count()}')
  
else:
  ari_df = empty_ari_df
  print('using dummy ARI df')
  
# CLUSTERING (cohesion)
# The clustering parquet stores the node id under 'Source'; rename it for the join.
if metric_clustering and not bad_flag:
  coefficient_df = spark.read.parquet(os.path.join(ResultBlobPath, 'clustering_coefficient.parquet')).withColumnRenamed('Source', 'person_id').cache()
  print(f'clustering co-eff count: {coefficient_df.count()}')
else:
  coefficient_df = empty_coefficient_df
  print('using dummy coefficient')

# FLUIDITY (stability)
if metric_fluidity and not bad_flag:
  fluidity_df = spark.read.parquet(os.path.join(ResultBlobPath, 'fluidity_monthly.parquet')).withColumnRenamed('node', 'person_id').cache()
  print(f'fluidity count: {fluidity_df.count()}')
else:
  fluidity_df = empty_fluidity_df
  print('using dummy fluidity')

# LAYOUT
# The layout parquet is written for the last month only, so earlier months get
# no x/y match in the left join.
if metric_xy and not bad_flag:
  layout_df = (spark.read.parquet(os.path.join(ResultBlobPath, 'layout_UMAP.parquet'))
               .withColumnRenamed("node", "person_id")
               .cache()
              )
  print(f'layout count: {layout_df.count()}')
else:
  layout_df = empty_layout_df
  print('using dummy layout')


# logging_output = output_log(logging_output, "StdOut","Joining dataframes.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

# Join every per-metric frame onto the partitions frame (one row per person and
# month), fill missing metric values with -1, derive the month start/end dates
# and write the result to node_metrics.parquet.
if not bad_flag:
  try:
    node_metrics_df  = (partitions_df
                        .join(modularity_df, ['year_month'], 'left')
                        .join(cluster_scores_df, ['year_month', 'root_partition_id'], 'left')
                        .join(ari_df, ['year_month'], 'left')
                        .join(coefficient_df, ['year_month', 'person_id'], 'left')
                        .join(fluidity_df, ['year_month', 'person_id'], 'left')
                        .join(layout_df, ['year_month', 'person_id'], 'left')
                        .join(freedom_df, ['year_month', 'root_partition_id'], 'left')
                        .fillna({'egocentric_fluidity' : -1, 'globally_influenced_fluidity': -1, 'clustering_coefficient' : -1,
                                'x': -1, 'y': -1, 'root_ari': -1, 'freedom_to_collaborate': -1, 'workgroup_modularity_component':-1})
                        .select(["year_month", "person_id", 
                                 "root_partition_id", "leaf_partition_id", "root_modularity", "root_ari",
                                 "workgroup_modularity_component", "egocentric_fluidity", "globally_influenced_fluidity",
                                 "freedom_to_collaborate", "clustering_coefficient", "x", "y"])
                        .withColumn("MonthStartDate", F.concat(F.col("year_month"), F.lit("-01"))) 
                        # Reference the column with the exact case it was created with so
                        # this also resolves when spark.sql.caseSensitive is enabled.
                        .withColumn("MonthEndDate", F.last_day(F.col("MonthStartDate")))
                        .withColumnRenamed("person_id", "pid")
                        .drop("year_month")
                       )
    
    # When a metric was disabled (by configuration or by an earlier failure),
    # overwrite its -1 fill with placeholder values.
    if metric_fluidity == False:
      print('placing dummy values for fluidity')
      node_metrics_df = node_metrics_df.withColumn("egocentric_fluidity", F.lit(0.01))

    if metric_xy == False:
      print('placing dummy values for X and Y')
      node_metrics_df = node_metrics_df.withColumn("x", F.rand() * F.lit(100))
      node_metrics_df = node_metrics_df.withColumn("y", F.rand() * F.lit(100))

    if metric_clustering == False:
      print('placing dummy values for clustering coefficient')
      node_metrics_df = node_metrics_df.withColumn("clustering_coefficient", F.lit(0.01))

    node_metrics_df.repartition(1000).write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'node_metrics.parquet'))

  except Exception as e:
    print(e)
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
else:
  e = 'FATAL ERROR. Unable to calculate any Org Insight values. Will produce dummy values.'
  # With the logging calls disabled this message was never shown; print it so
  # the fatal condition is visible in the cell output.
  print(e)
#   logging_output = output_log(logging_output, "StdErr", str(e))
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

StatementMeta(spark1, 44, 33, Finished, Available)

modularity count: 29
partitions count: 7908
workgroup modularity count: 29
using dummy freedom df
ARI count: 28
clustering co-eff count: 7908
fluidity count: 7921
layout count: 200

Update metrics-personal for Download

In [91]:
# def df_output_blob(df, extension, outFolder):
#     outPath = ResultBlobPath + outFolder + "/"
#     if extension == 'csv':
#       df.repartition(1).write.csv(outPath, header='true', mode='overwrite', escape="\"")
#     elif extension == 'json':
#       df.repartition(1).write.json(outPath)
    
#     # Copy file from outFolder to central working directory
#     try:
#       fullLS = mssparkutils.fs.ls(outPath)
#       for i in fullLS:
#         if 'part-00000' in i.name:
#           outFileName = i.name
#           outFileLocation = i.path
#           newFileLocation = ResultBlobPath + outFolder + '.' + extension
#           mssparkutils.fs.mv(outFileLocation, newFileLocation, True)
#           print ('File moved successfully: ', newFileLocation)
#     except Exception as e:
#       print ("Error moving file. Error: ", e) 
  
#     # clean up old files
#     try:
#       mssparkutils.fs.rm(outPath , True)
#       print ('Work Folder deleted: ', outPath)
#     except Exception as e:
#       print ("Error Deleting work File or Folder. Error: ", e)
      

StatementMeta(spark1, 44, 34, Finished, Available)



In [92]:
### writing out metrics-personal
# Reads node_metrics.parquet back, replaces nulls in every metric column with
# -1 and hands the frame to df_output_blob as the 'metrics-personal' CSV.
if bad_flag:
#   logging_output = output_log(logging_output, "StdOut","Not appending metrics to metrics-personal as metrics could not be calculated.")
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
  print("Cannot calculate Group Organization metrics. Either LCC = 0 or less than 2 months of data.")

else:
#   logging_output = output_log(logging_output, "StdOut","Appending metrics to metrics-personal.")
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
  sc.setJobDescription("Organization Insights: Appending metrics to metrics-personal")

  node_metrics_df = spark.read.parquet(os.path.join(ResultBlobPath, 'node_metrics.parquet')).cache()

  # Same -1 sentinel for every metric column.
  metrics_personal_temp = node_metrics_df.fillna(dict.fromkeys(
      ["root_partition_id", "leaf_partition_id", "root_modularity", "workgroup_modularity_component",
       "root_ari", "egocentric_fluidity", "globally_influenced_fluidity",
       "clustering_coefficient", "x", "y", "freedom_to_collaborate"],
      -1,
  ))

  df_output_blob(metrics_personal_temp, 'csv', 'metrics-personal')

StatementMeta(spark1, 44, 35, Finished, Available)

File moved successfully:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/metrics-personal.csv
Work Folder deleted:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/metrics-personal/

Identifying groups below threshold

In [93]:
# if not bad_flag:
#   ### find any group that is below min size...
#   group_thresholds = (spark.read.format('csv').options(header='true',inferSchema='true', escape='\"').load(ResultBlobPath + "/groupmetrics.csv")
#                   .filter(F.col("monthenddate")==EndDate)
#                   .select("grouping attribute", "grouping value", "groupsize")
#                   .withColumn("belowThreshold", F.when(F.col("groupsize")<minGroupSize, lit(1)).otherwise(lit(0)))
#                   .filter(F.col("belowThreshold")== 1)
#                  )

#   below_threshold_groups = group_thresholds.select("grouping attribute", "grouping value").distinct().collect()
#   btg_count = len(below_threshold_groups)
#   message = f"groups below threshold: {btg_count}."
#   print(message)

#   logging_output = output_log(logging_output, "StdOut", message)
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
  
# else:
#   print('bad flag is set to true. not running cell.')

StatementMeta(spark1, 44, 36, Finished, Available)



Rescaling values for org_insights_personal

In [94]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# def normalize_degrees(df, p_=2):
#   df_temp = df.select("monthstartdate", "monthenddate", "pid", "phid", "degree")
#   # get min-max values
#   max_min_vals = df_temp.groupBy().agg(F.max("degree").alias('max_degree'), F.min("degree").alias("min_degree")).collect()
#   max_val = max_min_vals[0]['max_degree']
#   min_val = max_min_vals[0]['min_degree']
#   max_min_diff = max_val - min_val
  
#   df_temp2 = (df_temp
#               .withColumn("max_degree", F.lit(max_val))
#               .withColumn("min_degree", F.lit(min_val))
#               .withColumn("max_min_diff", F.lit(max_min_diff))
#               .withColumn("normalized_degree", (p_ * (F.col("degree") - F.col("min_degree")))/F.col("max_min_diff") + 1  )
#               .select("monthstartdate", "monthenddate", "pid", "phid", "degree", "normalized_degree")
#              )
#   return df_temp2


def scale_column(df, col):
  """
  Min-max scale a single metric column.

  Keeps only "monthstartdate", "monthenddate", "pid" and ``col``, drops rows
  where ``col`` equals -1 (the missing-value sentinel), and adds
  ``<col>_Scaled``: the MinMaxScaler output (default settings) rounded to
  3 decimals as a double.

  df:  Spark DataFrame containing the columns above.
  col: name of the column to scale.
  """
  # (the "phid" column used by an earlier version is intentionally not selected)
  valid_rows = (df
                .select("monthstartdate", "monthenddate", "pid", col)
                .filter(F.col(col) != -1))

  vector_name = col + "_Vect"
  scaled_name = col + "_Scaled"

  # MinMaxScaler emits a one-element vector; pull that element back out as a double.
  first_component = udf(lambda vec: round(float(list(vec)[0]),3), DoubleType())

  # Assemble the column into a vector, then scale it.
  scaling_pipeline = Pipeline(stages=[
      VectorAssembler(inputCols=[col], outputCol=vector_name),
      MinMaxScaler(inputCol=vector_name, outputCol=scaled_name),
  ])

  fitted_pipeline = scaling_pipeline.fit(valid_rows)
  return (fitted_pipeline.transform(valid_rows)
          .withColumn(scaled_name, first_component(scaled_name))
          .drop(vector_name))

def scale_two_columns(df, columns = ["egocentric_fluidity", "cluster_modularity"]):
  """
  Scale each column in ``columns`` with ``scale_column`` and outer-join the
  per-column results on ("monthstartdate", "monthenddate", "pid").

  df:      Spark DataFrame holding the key columns and every column to scale.
  columns: names of the columns to scale (read only, never mutated here).

  Returns the joined frame with remaining nulls filled with -1.
  Raises ValueError when ``columns`` is empty; previously this surfaced as an
  AttributeError on None.
  """
  if not columns:
    raise ValueError("scale_two_columns requires at least one column to scale")

  df_all = None
  for c in columns:
    temp_df = scale_column(df, c)
    if df_all is None:
      df_all = temp_df
    else:
    #   df_all = df_all.join(temp_df, ["monthstartdate", "monthenddate", "pid", "phid"], "outer")
      # Outer join: scale_column drops rows whose value is -1, so each column
      # can cover a different set of people.
      df_all = df_all.join(temp_df, ["monthstartdate", "monthenddate", "pid"], "outer")
  df_all = (df_all.fillna(-1))
  return df_all

# logging_output = output_log(logging_output, "StdOut","Scaling Results for Heat Map.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

# Driver cell: rescales the last month's metrics for the heat map.
# Reads metrics-personal.csv for MonthEndDate == EndDate, attaches the cluster
# modularity scores, scales egocentric_fluidity and cluster_modularity, writes
# the original and scaled values to org_insights_scaled_original.parquet, and
# leaves `metrics_personal_temp3` (scaled values under the original column
# names) for the next cell.
if not bad_flag:
  try:
    # read metrics_personal_temp
    metrics_personal_temp = (spark.read.csv(os.path.join(ResultBlobPath, 'metrics-personal.csv'), header=True, inferSchema=True)
                             .filter(F.col("MonthEndDate")==EndDate)
                             .cache()
                            )
    # Materialise the cached frame.
    metrics_personal_temp.count()

    # replace root modularity (network) with cluster modularity
    # NOTE(review): the join key is root_partition_id only. If
    # modularity_scores.parquet holds several months this could duplicate rows - confirm.
    cluster_scores_df = spark.read.parquet(os.path.join(ResultBlobPath, 'modularity_scores.parquet'))
    metrics_personal_temp = metrics_personal_temp.join(cluster_scores_df, ['root_partition_id'], 'inner')

    # scale degrees
    # df_degree_temp = normalize_degrees(metrics_personal_temp)

    # scale egocentric_fluidity and root_modularity
    scaled_df = scale_two_columns(metrics_personal_temp)

    # metrics_personal_temp2 = (metrics_personal_temp.drop("degree", "egocentric_fluidity", "cluster_modularity", "root_modularity")
    #                           .join(df_degree_temp, ['monthstartdate', 'monthenddate', 'pid', 'phid'], 'inner')
    #                           .join(scaled_df, ['monthstartdate', 'monthenddate', 'pid', 'phid'], 'inner')
    #                           .withColumn("clustering_coefficient_Scaled", F.sqrt(F.col("clustering_coefficient")))
    #                          )

    # Drop the unscaled columns first so the join with scaled_df (which carries
    # both the raw and *_Scaled versions) does not produce duplicates.
    metrics_personal_temp2 = (metrics_personal_temp.drop("degree", "egocentric_fluidity", "cluster_modularity", "root_modularity")
                              .join(scaled_df, ['monthstartdate', 'monthenddate', 'pid'], 'inner')
                              .withColumn("clustering_coefficient_Scaled", F.sqrt(F.col("clustering_coefficient")))
                             )

    # metrics_personal_temp2_save = (metrics_personal_temp2
    #                                .select("pid", "phid", "monthstartdate", "monthenddate", "degree", "normalized_degree", "egocentric_fluidity", "egocentric_fluidity_Scaled",
    #                                        "clustering_coefficient", "clustering_coefficient_Scaled", "cluster_modularity", "cluster_modularity_Scaled")
    #                               )
    
    metrics_personal_temp2_save = (metrics_personal_temp2
                                   .select("pid", "monthstartdate", "monthenddate","egocentric_fluidity", "egocentric_fluidity_Scaled",
                                           "clustering_coefficient", "clustering_coefficient_Scaled", "cluster_modularity", "cluster_modularity_Scaled")
                                  )

    # save output with everything
    metrics_personal_temp2_save.repartition(1000).write.mode('overwrite').parquet(os.path.join(ResultBlobPath, 'org_insights_scaled_original.parquet'))

    # use fluidity, clustering coefficient and root partition id modularity
    # (An earlier, degree-aware build of metrics_personal_temp3 used to sit here;
    # it was immediately overwritten by the assignment below, so it was removed.)
    metrics_personal_temp3 = (metrics_personal_temp2
                              .drop("egocentric_fluidity", "clustering_coefficient", "cluster_modularity")
                              .withColumnRenamed("egocentric_fluidity_Scaled", "egocentric_fluidity")
                              .withColumnRenamed("clustering_coefficient_Scaled", "clustering_coefficient")
                              .withColumnRenamed("cluster_modularity_Scaled", "root_modularity")
                             )
  except Exception as e:
    print(e)
    # logging_output = output_log(logging_output, "StdErr", str(e))
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    
else:
  print('bad flag is set to true. not running cell.')

StatementMeta(spark1, 44, 37, Finished, Available)

200

Write out org_insights_personal

In [95]:
# logging_output = output_log(logging_output, "StdOut","Writing out results for graph viz - personhistorical, degrees, and x, y, and individual fluidity (of last month).")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

# Builds `org_insights_personal` and exports it as CSV through df_output_blob.
# Healthy run: scaled metrics restricted to rows that have a layout position
# and a fluidity value. Bad run: personhistorical.csv with constant/random
# placeholder metric columns.
if bad_flag:
#   logging_output = output_log(logging_output, "StdErr","WARNING - SURFACING DUMMY VALUES AS FATAL ERROR OCCURRED.")
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

  org_insights_personal = (spark.read.format('csv').options(header='true',inferSchema='true', escape='\"').load(ResultBlobPath + "/personhistorical.csv")
                           .withColumn("degree", F.lit(1))
                           .withColumn("root_partition_id", F.lit(1))
                           .withColumn("root_modularity", F.lit(0.5))
                           .withColumn("clustering_coefficient", F.lit(0.5))
                           .withColumn("egocentric_fluidity", F.lit(0.5))
                           .withColumn("x", F.rand()* 100)
                           .withColumn("y", F.rand()* 100)
                          )

else:
  ### read personhistorical for column casing
#   personhistorical_cols = spark.read.format('csv').options(header='true',inferSchema='true', escape='\"').load(ResultBlobPath + "/personhistorical.csv").columns
#   org_insight_cols = personhistorical_cols + ['degree', 'root_partition_id', 'root_modularity', 'clustering_coefficient', 'egocentric_fluidity', 'x', 'y']
  org_insight_cols = ['root_partition_id', 'root_modularity', 'clustering_coefficient', 'egocentric_fluidity', 'x', 'y']

  # x and y will only be for last month; -1 marks a value that was never computed
  has_all_values = (
      (F.col("x") != -1)
      & (F.col("y") != -1)
      & (F.col("egocentric_fluidity") != -1)
  )
  org_insights_personal = metrics_personal_temp3.filter(has_all_values).select(*org_insight_cols)

  print(org_insights_personal.count())

#   # replace values that are below threshold
#   if btg_count > 0:
#     for btg in below_threshold_groups:
#       col_ = btg['grouping attribute']
#       val_ = btg['grouping value']
#       # hr attributes can already be dropped because theyre below min group size
#       if col_ in hr_attributes:
#         org_insights_personal = (org_insights_personal
#                                  .withColumn(col_, F.when(F.col(col_)==val_, 'groupBelowThreshold').otherwise(F.col(col_)))
#                                 )

#   print(org_insights_personal.count())

df_output_blob(org_insights_personal, 'csv', 'org_insights_personal')

StatementMeta(spark1, 44, 38, Finished, Available)

200
File moved successfully:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/org_insights_personal.csv
Work Folder deleted:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/org_insights_personal/

In [96]:
# Release the DataFrames cached by the person-level cells above before the
# group-level part of the notebook starts.
sqlContext.clearCache()

StatementMeta(spark1, 44, 39, Finished, Available)



Part 2: Group Level

Enriching Graph for Induced ARI/Partitions

Induced modularity + ARI

In [97]:
def get_graph_and_leaf_partitions(row):
  """
  Partition each month's graph for one group and score the partitions.

  row: a (key, records) pair. ``row[0]`` is passed through untouched into every
       result tuple; ``row[1]`` is an iterable of (date, source, target, weight)
       records where ``date`` is a "YYYY-MM" string.

  For every month, in chronological order, the function builds an undirected
  weighted graph, keeps its largest connected component, partitions it with
  hierarchical Leiden (max_cluster_size=250, level-0 rows only) and computes
  the modularity of that partition. From the second month on it also computes
  the adjusted Rand index against the previous month's partition, restricted
  to nodes present in both months.

  Returns a list of tuples
  (date, previous_date, row[0], modularity, ari, len_intersection, partitions),
  where ``ari`` is None and ``len_intersection`` is 0 for the first month and
  ``partitions`` maps node -> cluster id.

  NOTE(review): the name says "leaf" partitions but the filter keeps
  ``level == 0`` rows - confirm which hierarchy level is intended.
  """
  import networkx as nx
  import graspologic as gc
  import topologic as tc
  import collections
  from datetime import datetime
  
  t1 = row[0]
  
  # build graph - this is UNDIRECTED since it is using Leiden
  # One graph per month, created lazily on first access.
  dated_graphs = collections.defaultdict(nx.Graph)
 
  for date, s, t, w in row[1]:
    # "YYYY-MM" -> first day of that month, so the dict keys sort chronologically
    y, m = date.split('-')
    date_typed = datetime(year=int(y), month=int(m), day=1)
    
    graph = dated_graphs[date_typed]
    
    # Repeated (s, t) pairs accumulate their weights on a single edge.
    w = int(w)
    if graph.has_edge(s, t):
      graph[s][t]['weight'] += w
    else:
      graph.add_edge(s, t, weight=w)
  
  # Reduce every monthly graph to its largest connected component.
  dated_lccs = dict()
  dates_sorted = sorted(dated_graphs.keys())
  for date in dates_sorted:
    dated_lccs[date] = tc.largest_connected_component(dated_graphs[date])
  
  previous_date = None
  previous_partitions = None
  results = []
  for i, date in enumerate(dates_sorted):
    ari = None
    len_intersection = 0
    
    current_graph = dated_lccs[date]
    
    # node -> cluster id, taken from the level-0 rows of the hierarchical result.
    # (`row` inside the comprehension is local to it and does not rebind the
    # function argument.)
    current_partitions = {
      row.node: row.cluster 
      for row in gc.partition.hierarchical_leiden([(s, t, w) for s, t, w in current_graph.edges(data='weight')], max_cluster_size=250) 
      if row.level == 0
    }
    
    modularity = tc.partition.modularity(current_graph, current_partitions)
      
    if previous_partitions is not None: # we cannot calculate ARI for the first month as we do not have a previous partitioning to compare to
      from sklearn.metrics.cluster import adjusted_rand_score

      #   Take the intersection of the sets first, then calculate ARI
      nodes_in_previous = set(previous_partitions.keys())
      nodes_in_current = set(current_partitions.keys())

      # Parallel label lists: position k in both lists refers to the same node.
      nodes_in_previous_reduced = []
      nodes_in_current_reduced = []
      
      for node in nodes_in_previous:
        if node in nodes_in_current:
          nodes_in_previous_reduced.append(previous_partitions[node])
          nodes_in_current_reduced.append(current_partitions[node])

      ari = adjusted_rand_score(nodes_in_previous_reduced, nodes_in_current_reduced)
      len_intersection = len(nodes_in_previous_reduced)
    
    previous_partitions = current_partitions
#     if ari == None:
#       ari = -1.0
    results.append((date, previous_date, t1, modularity, ari, len_intersection, current_partitions))
    previous_date = date
#     results.append((date, t1, modularity, ari, len_intersection, current_partitions))
    
  return results

def process_ari_rdd(ari_rdd, hr_attr):
  """
  Collect ARI / modularity results from an RDD and flatten them into a Spark DataFrame.

  Parameters
  ----------
  ari_rdd : object exposing ``collect()``
      Each collected element is an iterable of per-month tuples. Only the first six
      entries of each tuple are used: date, previous date, grouping value, modularity,
      ARI and intersection length. Any further entries (presumably the node -> cluster
      mapping - confirm against the producer) are ignored here.
  hr_attr : str
      Name given to the third output column (the grouping value).

  Returns
  -------
  Spark DataFrame with columns ``date``, ``previous_date``, ``<hr_attr>``,
  ``root_modularity``, ``ari`` and ``intersection_length``.
  """
  # Use the RDD that was passed in. The previous version ignored this parameter and
  # read a notebook-global `ari_modularity_rdd`, so callers could not choose the input.
  unrolled_results = [
    month_results[:6]
    for evp_result in ari_rdd.collect()
    for month_results in evp_result
  ]

  schema = StructType([
    StructField("date", DateType(), True),
    StructField("previous_date", DateType(), True),
    StructField(hr_attr, StringType(), True),
    StructField("root_modularity", FloatType(), True),
    StructField("ari", FloatType(), True),
    StructField("intersection_length", IntegerType(), True)
  ])

  return spark.createDataFrame(unrolled_results, schema=schema)

StatementMeta(spark1, 44, 40, Finished, Available)



Calculating Induced ARI and Modularity (not included because we don't have group metrics or hr data)

In [98]:
# Remove all cached tables/DataFrames from Spark's in-memory cache before the next stage.
sqlContext.clearCache()

StatementMeta(spark1, 44, 41, Finished, Available)



Organization Insights Group + Network Metrics: Load metrics_monthly_temp and group_data

In [99]:
# logging_output = output_log(logging_output, "StdOut", "Loading group data and joining with individual chi metrics.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if bad_flag:
  print("Cannot calculate Group Organization metrics. Either LCC = 0 or less than 2 months of data.")
else:
  # Read the per-node metrics parquet and tag every node with its root partition as a "community" group.
  node_metrics_reader = spark.read.format('parquet').options(header='true', inferSchema='true', escape='\"')
  node_metrics_path = ResultBlobPath + "/node_metrics.parquet"

  # Keep only rows whose month lies inside the configured [StartDate, EndDate] window.
  inside_run_window = (F.col("monthstartdate") >= StartDate) & (F.col("monthenddate") <= EndDate)

  group_chi = (
    node_metrics_reader
    .load(node_metrics_path)
    .withColumnRenamed("pid", "pid1")
    .withColumn("pid1_attribute", col("root_partition_id"))
    .withColumn("attribute", lit("community"))
    .filter(inside_run_window)
    .cache()
  )
  # count() is an action, so it forces the cached DataFrame to be materialised here.
  group_chi.count()

StatementMeta(spark1, 44, 42, Finished, Available)

7908

Calculate group-level + network-level cohesion + stability

In [100]:
# logging_output = output_log(logging_output, "StdOut", "Calculating group level and network level metrics.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if not bad_flag:
  # Builds per-group ("community") and network-wide cohesion / fluidity per month from
  # group_chi, attaches root modularity and network ARI, and writes the result as CSV.
  # Approximate median of egocentric_fluidity (percentile_approx skips nulls, but not the -1 sentinel).
  calcMedian = F.expr('percentile_approx(egocentric_fluidity, 0.5)')

  ### GROUP LEVEL
  # cohesion: mean clustering coefficient per group/month; -1 values become null so F.mean skips them
  group_cohesion1 = (group_chi
                     .groupBy("attribute", "pid1_attribute", "monthstartdate", "monthenddate")
                     .agg(F.mean(F.when(F.col("clustering_coefficient")!= -1, F.col("clustering_coefficient"))).alias("group_cohesion"))
                    )
  # fluidity: median egocentric fluidity per group/month, excluding the -1 sentinel
  group_stability1 = (group_chi
                     .filter(F.col("egocentric_fluidity")!= -1)
                     .groupBy("attribute", "pid1_attribute", "monthstartdate", "monthenddate")
                     .agg(calcMedian.alias('group_fluidity'))
                    )
  group_chi2 = (group_cohesion1
                .join(group_stability1, ['attribute', 'pid1_attribute', 'monthstartdate', 'monthenddate'], 'left')
                .filter(F.col("pid1_attribute").isNotNull())
               )

  ### NETWORK LEVEL
  # get network cohesion by average group averages
  network_cohesion = (group_chi2
                      .groupBy("monthstartdate", "monthenddate")
                      .agg(F.mean("group_cohesion").alias("network_cohesion"))
                      )


  # get network stability by getting median of all individuals.
  # The -1 sentinel is excluded here exactly as in the group-level median above; previously
  # it was included, which pulled the network median towards -1.
  network_stability = (group_chi
                      .filter(F.col("egocentric_fluidity")!= -1)
                      .groupBy("monthstartdate", "monthenddate")
                      .agg(calcMedian.alias("network_fluidity"))
                      )

  ### Add NETWORK COHESION + STABILITY to groups
  group_chi3 = (group_chi2
                .join(network_cohesion, ["monthstartdate", "monthenddate"], "left")
                .join(network_stability, ["monthstartdate", "monthenddate"], "left")
               )
  
  ### Add NETWORK CONNECTIVITY via Root Modularity 
  connectivity = (group_chi
                  .select("MonthStartDate", "MonthEndDate", "root_modularity")
                  .withColumnRenamed("root_modularity", "network_connections")
                  .distinct()
                 )
  
  ### Add ROOT ARI + ROOT FREEDOM 
  if metric_ari:
    # network_ari.parquet is keyed by year_month; rebuild month start / end from it
    ari_df = (spark.read.format('parquet')
          .options(header='true',inferSchema='true', escape='\"')
          .load(ResultBlobPath + "/network_ari.parquet")
          .withColumn("MonthStartDate", F.concat(F.col("year_month"), F.lit("-01"))) 
          .withColumn("MonthEndDate", F.last_day(F.col("MonthstartDate")))
          .withColumnRenamed("root_ari", "network_ari")
          .select("MonthStartDate", "MonthEndDate", "network_ari")
         )
  else:
    print('Using Dummy Network ARI for Stability')
    # Empty frame with the expected columns so the join below still works
    schema = StructType([StructField("MonthStartDate", StringType(), True),
                         StructField("MonthEndDate", StringType(), True),
                         StructField("network_ari",LongType(),True)])

    ari_df = spark.sparkContext.parallelize([]).toDF(schema)

#   if metric_freedom:
#     freedom_df = (spark.read.format('parquet')
#                   .options(header='true',inferSchema='true', escape='\"')
#                   .load(ResultBlobPath + "/freedom_network.parquet")
#                   .withColumn("MonthStartDate", F.concat(F.col("year_month"), F.lit("-01"))) 
#                   .withColumn("MonthEndDate", F.last_day(F.col("MonthstartDate")))
#                   .withColumnRenamed("freedom_to_collaborate", "network_freedom")
#                   .select("MonthStartDate", "MonthEndDate", "network_freedom")
#              )
#   else:
#     schema = StructType([StructField("MonthStartDate", StringType(), True),
#                          StructField("MonthEndDate", StringType(), True),
#                          StructField("network_freedom",LongType(),True)])

#     freedom_df = spark.sparkContext.parallelize([]).toDF(schema)

  
  group_data_final = (group_chi3
                        .join(connectivity, ['monthstartdate', 'monthenddate'], "left")
                        .join(ari_df, ["monthstartdate", "monthenddate"], "left")
                        .withColumnRenamed("attribute", "grouping attribute")
                        .withColumnRenamed("pid1_attribute", "grouping value")
                       )
  
#   group_data_final = (group_chi3
#                         .join(connectivity, ['monthstartdate', 'monthenddate'], "left")
#                         .join(ari_df, ["monthstartdate", "monthenddate"], "left")
#                         .join(freedom_df, ["monthstartdate", "monthenddate"], "left")
#                         .withColumnRenamed("attribute", "grouping attribute")
#                         .withColumnRenamed("pid1_attribute", "grouping value")
#                        )
  
  if not metric_ari:
    # logging_output = output_log(logging_output, "StdErr", "Warning. Using dummy network ARI value for stability.")
    # logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')
    
    # No real ARI available: overwrite the (all-null) joined column with a constant placeholder
    group_data_final = group_data_final.withColumn("network_ari", F.lit(0.01))
  
#   if metric_freedom == False:
#     group_data_final = group_data_final.fillna({'network_freedom': -1})
  
  group_data_final.repartition(100).write.mode('overwrite').csv(os.path.join(ResultBlobPath, 'group_data_intermediate.csv'))
                       
else:
  print("Cannot calculate Group Organization metrics. Either LCC = 0 or less than 2 months of data.")

StatementMeta(spark1, 44, 43, Finished, Available)



Writing out Group Metrics for Download

Calculating Monthly Change

In [101]:
# logging_output = output_log(logging_output, "StdOut","Calculating Month over Month Changes")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

from pyspark.sql.window import *

def get_max_date2(df):
  """
  Return the latest ``monthstartdate`` value found in ``df``.

  Parameters
  ----------
  df : Spark DataFrame with a ``monthstartdate`` column.

  Returns
  -------
  The maximum ``monthstartdate``, as delivered by ``toPandas()`` for that column.
  """
  # The aggregate is a 1x1 frame. Read the cell with a single positional lookup:
  # `.iloc[0][0]` relied on integer-key positional fallback of Series.__getitem__,
  # which is deprecated in pandas 2.x.
  latest_start = df.agg(F.max(col("monthstartdate"))).toPandas().iloc[0, 0]
  return latest_start


if bad_flag:
  print("Cannot calculate Group Organization metrics. Either LCC = 0 or less than 2 months of data.")
else:
  def _pct_change(current, previous):
    """Percentage change of column `current` relative to column `previous`."""
    return ((F.col(current) - F.col(previous)) / F.col(previous)) * 100

  # look only at entries that cover 1 month at a time (drop any row spanning the whole run window)
  covers_whole_run = (col("monthstartdate")==StartDate) & (col("monthenddate")==EndDate)
  has_all_metrics = (
    F.col("network_cohesion").isNotNull()
    & F.col("network_stability").isNotNull()
    & F.col("network_connections").isNotNull()
  )

  # One distinct row per month of network-level metrics; the constant 'temp' column lets a
  # single window span every month. Missing ARI is marked with -1 before the rename.
  temp_df = (
    group_data_final
    .fillna({"network_ari": -1})
    .filter(~covers_whole_run)
    .withColumn('temp', lit('a'))
    .select("temp", "monthstartdate", "monthenddate", "network_cohesion", "network_ari", "network_connections")
    .withColumnRenamed("network_ari", "network_stability")
    .filter(has_all_metrics)
    .distinct()
  )

  month_attr = Window.partitionBy(['temp']).orderBy("monthstartdate")

  # A previous stability of -1 is replaced by the current value, giving a 0% change for that row.
  prev_stability_fixed = (
    F.when(F.col("prev_stability") == -1, F.col("network_stability"))
    .otherwise(F.col("prev_stability"))
  )

  monthly_df = (
    temp_df
    .withColumn("prev_stability", F.lag(F.col("network_stability")).over(month_attr))
    .withColumn("prev_stability", prev_stability_fixed)
    .withColumn("stabilityChangeMonthly", _pct_change("network_stability", "prev_stability"))
    .withColumn("prev_connectivity", F.lag(F.col("network_connections")).over(month_attr))
    .withColumn("connectivityChangeMonthly", _pct_change("network_connections", "prev_connectivity"))
    .withColumn("prev_cohesion", F.lag(F.col("network_cohesion")).over(month_attr))
    .withColumn("cohesionChangeMonthly", _pct_change("network_cohesion", "prev_cohesion"))
  )

  # Total-change columns stay empty on the monthly rows.
  for total_col in ("stabilityChangeTotal", "connectivityChangeTotal", "cohesionChangeTotal"):
    monthly_df = monthly_df.withColumn(total_col, lit(None))

  monthly_df = monthly_df.withColumn("isMonthly", lit(1)).fillna(0)

  # Flag the most recent month.
  latest_start = get_max_date2(monthly_df)
  is_latest_month = F.col("monthstartdate")==latest_start
  monthly_df = monthly_df.withColumn("isLastMonth", F.when(is_latest_month, F.lit(1)).otherwise(F.lit(0)))

StatementMeta(spark1, 44, 44, Finished, Available)



Pre-processing for Total Changes

In [102]:
##############################################################################################
# NOTE: requires three months for total changes, as fluidity is only calculated for the change
# between two months. So if we have fewer than three months of data we will only have 1 fluidity
# score, and the change will just be 0.
##############################################################################################
# logging_output = output_log(logging_output, "StdOut","Pre-processing for calculating total change.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if bad_flag:
  print("Cannot calculate Group Organization metrics. Either LCC = 0 or less than 2 months of data.")
else:
  # Rank the months chronologically (1 = earliest) so the first and last month can be picked out.
  month_attr = Window.partitionBy(['temp']).orderBy("monthstartdate", "monthenddate")
  monthly_df_temp = monthly_df.withColumn("month_order", F.dense_rank().over(month_attr))

  # Highest rank = the last month.
  last_month = (
    monthly_df_temp
    .groupBy(F.col("temp"))
    .agg(F.max("month_order").alias("max_month"))
    .collect()
  )
  last_month_num = last_month[0]['max_month']
  print(last_month_num)

  # Network stability of month 2 (raises IndexError when there is no second month).
  nw_stab = (
    monthly_df_temp
    .filter(F.col("month_order")==2)
    .select("network_stability")
    .collect()
  )
  nws = nw_stab[0]['network_stability']
  print(nws)

  # Keep only the first and last month; month 1 takes month 2's stability so the
  # total change is measured from that value.
  is_first_month = F.col("month_order")==1
  is_last_month = F.col("month_order") == last_month_num
  temp_df = (
    monthly_df_temp
    .filter(is_first_month | is_last_month)
    .withColumn("network_stability", F.when(is_first_month, nws).otherwise(F.col("network_stability")))
  )

StatementMeta(spark1, 44, 45, Finished, Available)

29
0.5836312788507976

Calculating Total Change

In [103]:
# logging_output = output_log(logging_output, "StdOut","Calculating Total Change - First month vs last month.")
# logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

if not bad_flag:
  month_attr = Window.partitionBy(['temp']).orderBy("monthstartdate")
  key_cols = ['monthstartdate', 'monthenddate']

  # Relative change of each metric versus the previous row of the window (scaled to % below).
  # The columns are referenced by name and resolved when the expressions are attached.
  stability_ratio = (F.col("network_stability") - F.col("prev_stability")) / F.col("prev_stability")
  connectivity_ratio = (F.col("network_connections") - F.col("prev_connectivity")) / F.col("prev_connectivity")
  cohesion_ratio = (F.col("network_cohesion") - F.col("prev_cohesion")) / F.col("prev_cohesion")

  # temp_df holds only the first and last month, so "previous row" is the first month.
  temp_df2 = (
    temp_df
    .withColumn("prev_stability", F.lag(F.col("network_stability")).over(month_attr))
    .withColumn("stabilityChangeTotal", F.round(stability_ratio * 100, 10))
    .withColumn("prev_connectivity", F.lag(F.col("network_connections")).over(month_attr))
    .withColumn("connectivityChangeTotal", connectivity_ratio * 100)
    .withColumn("prev_cohesion", F.lag(F.col("network_cohesion")).over(month_attr))
    .withColumn("cohesionChangeTotal", cohesion_ratio * 100)
    .select("monthstartdate", "monthenddate", "stabilityChangeTotal", "connectivityChangeTotal", "cohesionChangeTotal")
    .fillna(0)
  )

  # Single "whole period" row: the row ending on EndDate, relabelled to start at StartDate.
  total_df = (
    temp_df2
    .filter(F.col("monthenddate")==EndDate)
    .withColumn("monthstartdate", lit(StartDate))
  )
  for monthly_col in ("stabilityChangeMonthly", "connectivityChangeMonthly", "cohesionChangeMonthly"):
    total_df = total_df.withColumn(monthly_col, lit(None))
  total_df = total_df.withColumn("isMonthly", lit(0)).withColumn("isLastMonth", lit(0))

  ### JOINING monthly_df with total_df
  monthly_df2 = monthly_df.select(
    "monthstartdate", "monthenddate",
    "stabilityChangeMonthly", "connectivityChangeMonthly", "cohesionChangeMonthly",
    "stabilityChangeTotal", "connectivityChangeTotal", "cohesionChangeTotal",
    "isMonthly", "isLastMonth",
  )
  # union() matches columns by position, so both sides use the same sorted column order.
  cols = key_cols + [name for name in sorted(monthly_df2.columns) if name not in key_cols]

  monthly_network_metrics = monthly_df.select(
    "monthstartdate", "monthenddate", "network_cohesion", "network_stability", "network_connections"
  )
  org_insights = (
    monthly_df2
    .select(*cols)
    .union(total_df.select(*cols))
    .join(monthly_network_metrics, ["monthstartdate", "monthenddate"], "left")
    .withColumnRenamed("network_connections", "network_connectivity")
  )

  # replace first stability value of -1 with second month value for visualization
  stability_replace_val = (
    monthly_df
    .select("network_stability")
    .filter(F.col("network_stability")!= -1)
    .orderBy("monthstartdate", "monthenddate")
    .collect()
  )
  replace_val = stability_replace_val[0]['network_stability']
  stability_without_sentinel = (
    F.when(F.col("network_stability")==-1, F.lit(replace_val))
    .otherwise(F.col("network_stability"))
  )
  org_insights = org_insights.withColumn("network_stability", stability_without_sentinel)

  cols = key_cols + [name for name in sorted(org_insights.columns) if name not in key_cols]
  org_insights = org_insights.select(*cols)

  # add number of clusters for each month
  partitions_path = os.path.join(ResultBlobPath, 'partitions.parquet')
  number_of_clusters = (
    spark.read.parquet(partitions_path)
    .groupBy("date")
    .agg(F.countDistinct("root_partition_id").alias("number_of_clusters"))
    .withColumnRenamed("date", "MonthStartDate")
    .withColumn("MonthStartDate", F.concat(F.col("MonthStartDate"), F.lit("-01")))
  )
  org_insights = org_insights.join(number_of_clusters, ['monthstartdate'], 'left')

  display(org_insights)

else:
  # NOTE(review): the message is only stored in `e`; the logging calls that used it are commented out.
  e = "Cannot calculate Group Organization metrics. Either LCC = 0 or less than 2 months of data."
#   logging_output = output_log(logging_output, "StdErr", str(e))
#   logging_output.to_json("/dbfs/" + log_folder + "full_log.json", orient='records')

StatementMeta(spark1, 44, 46, Finished, Available)

SynapseWidget(Synapse.DataFrame, a0721aaa-06e9-4882-a297-7cd727c5a06f)

Saving Results

In [104]:
if not bad_flag:
  print("Success. Saving Results")
  df_output_blob(org_insights, 'csv', 'organization_insights')

else:
  print("Writing dummy results")

  is_monthly_row = F.col("isMonthly")==1

  def _monthly_only(value):
    """`value` on monthly rows, null on the whole-period row."""
    return F.when(is_monthly_row, lit(value)).otherwise(lit(None))

  def _total_only(value):
    """Null on monthly rows, `value` on the whole-period row."""
    return F.when(is_monthly_row, lit(None)).otherwise(lit(value))

  # get only MonthStart and MonthEndDate for months
  org_insights1 = (
    interactions_
    .groupBy("MonthStartDate", "MonthEndDate")
    .agg(F.count("WeightByCount").alias("count"))
    .withColumn("isMonthly", lit(1))
    .drop("count")
  )
  # get row for MonthStart and MonthEnd are Start and End
  whole_period_row = pd.DataFrame({"MonthStartDate": [StartDate], "MonthEndDate": [EndDate], "isMonthly": [0]})
  full_timeframe = spark.createDataFrame(whole_period_row)

  # join
  org_insights2 = org_insights1.union(full_timeframe)

  # place dummy values (the original spelled the two network_* fillers as lit(-0), i.e. integer 0)
  is_last_monthly_row = (F.col("MonthEndDate")==EndDate) & is_monthly_row
  dummy_columns = [
    ("cohesionChangeMonthly", _monthly_only(0)),
    ("cohesionChangeTotal", _total_only(0)),
    ("connectivityChangeMonthly", _monthly_only(0)),
    ("connectivityChangeTotal", _total_only(0)),
    ("network_cohesion", _monthly_only(-0)),
    ("network_connectivity", _monthly_only(-0)),
    ("network_stability", _monthly_only(0)),
    ("stabilityChangeMonthly", _monthly_only(0)),
    ("stabilityChangeTotal", _total_only(0)),
  ]

  org_insights = org_insights2.withColumn("isLastMonth", F.when(is_last_monthly_row, lit(1)).otherwise(lit(0)))
  for dummy_name, dummy_expr in dummy_columns:
    org_insights = org_insights.withColumn(dummy_name, dummy_expr)
  org_insights = (
    org_insights
    .withColumn("number_of_clusters", lit(1))
    .select("monthstartdate", "monthenddate", "cohesionChangeMonthly", "cohesionChangeTotal",
            "connectivityChangeMonthly", "connectivityChangeTotal", "isLastMonth", "isMonthly",
            "network_cohesion", "network_connectivity", "network_stability",
            "stabilityChangeMonthly", "stabilityChangeTotal", "number_of_clusters")
  )

  display(org_insights)

  df_output_blob(org_insights, 'csv', 'organization_insights')


StatementMeta(spark1, 44, 47, Finished, Available)

Success. Saving Results
File moved successfully:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/organization_insights.csv
Work Folder deleted:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/organization_insights/

In [105]:
# Remove all cached tables/DataFrames from Spark's in-memory cache now that the results are written.
sqlContext.clearCache()

StatementMeta(spark1, 44, 48, Finished, Available)



In [106]:
'''
=========================================================================================================

 

// Copyright (C) Microsoft Corporation. All rights reserved.

 

=========================================================================================================
'''

StatementMeta(spark1, 44, 49, Finished, Available)



In [107]:
# Replace nulls in the per-person metric columns with -1.
metric_columns_to_fill = [
  "root_partition_id", "leaf_partition_id", "root_modularity", "workgroup_modularity_component",
  "root_ari", "egocentric_fluidity", "globally_influenced_fluidity",
  "clustering_coefficient", "x", "y", "freedom_to_collaborate",
]
metrics_personal_temp = node_metrics_df.fillna(dict.fromkeys(metric_columns_to_fill, -1))
display(metrics_personal_temp)

StatementMeta(spark1, 44, 50, Finished, Available)

SynapseWidget(Synapse.DataFrame, 56793bf6-09cb-4c1a-b834-99fa0dc28cf6)

In [109]:
# Attach the personhistorical.csv columns to each person's monthly metrics (matched on pid + month).
ResultBlobPath = 'abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/'
person_history_path = ResultBlobPath + 'personhistorical.csv'
ph = spark.read.option("header", "true").csv(person_history_path)
metrics_personal_temp2 = metrics_personal_temp.join(
  ph,
  on=["pid","MonthStartDate"],
  how='left',
)
df_output_blob(metrics_personal_temp2,"csv","metrics_personal_ph")

StatementMeta(spark1, 44, 52, Finished, Available)

File moved successfully:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/metrics_personal_ph.csv
Work Folder deleted:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/metrics_personal_ph/

In [115]:
ph.columns
# Subset of personhistorical columns (plus the pid join key) carried into the next join.
other_columns = [
  'pid',
  'FunctionType',
  'LevelDesignation',
  'Layer',
  'Region',
  'Organization',
  'GroupNum',
  'Numberofdirectreports',
  'SupervisorIndicator',
]

StatementMeta(spark1, 44, 58, Finished, Available)



In [116]:
# Same enrichment as the previous export, but matched on pid only and limited to `other_columns`.
# NOTE(review): if personhistorical.csv holds several rows per pid, this join repeats each metrics row - confirm.
ResultBlobPath = 'abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/'
person_history_path = ResultBlobPath + 'personhistorical.csv'
ph = spark.read.option("header", "true").csv(person_history_path)
person_attributes = ph.select(other_columns)
metrics_personal_temp2 = metrics_personal_temp.join(person_attributes, on="pid", how='left')
df_output_blob(metrics_personal_temp2,"csv","metrics_personal_ph2")

StatementMeta(spark1, 44, 59, Finished, Available)

File moved successfully:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/metrics_personal_ph2.csv
Work Folder deleted:  abfss://users@dopsis.dfs.core.windows.net/ona/MGDC_Insights/metrics_personal_ph2/