This notebook uses the anomalous resource access model saved by the training notebook to score new events. It is meant to run on a schedule: it loads the model, scores new events, and submits the top-scoring results to Log Analytics. The data used here are File Share Access events from Windows machines, loaded from a Blob Storage container.

Steps:
   0. One-time: Install the following packages on the cluster (refer: https://forums.databricks.com/questions/680/how-to-install-python-package-on-spark-cluster.html)
        - com.microsoft.ml.spark:mmlspark_2.11:1.0.0 from https://mmlspark.azureedge.net
        - azure_sentinel_utilities whl package
        - azure-storage-blob (from PyPi - latest based on Azure SDK v12)
        - plotly (from PyPi)
        
   1. One-time: Set credentials in KeyVault so the notebook can access 
        - Storage Account
        - Log Analytics
   2. Ensure the settings in the first cell below are filled in.
   3. Set the Notebook to run on a schedule to score and submit results to LA.
   
 One-time: (Setting up Storage Key & Log Analytics Key in KeyVault)
    - (Refer:- https://docs.databricks.com/spark/latest/data-sources/azure/azure-storage.html#access-azure-blob-storage-directly)
     
 Storing and retrieving secrets: 
    - Using Azure KeyVault:- https://docs.azuredatabricks.net/user-guide/secrets/secret-scopes.html#akv-ss

In [2]:
import datetime as dt

# Storage Account Connection String
storage_conn_str = dbutils.secrets.get(scope = 'YOUR_SCOPE_HERE', key = 'YOUR_KEY_HERE')

# Workspace Resource Id of your Sentinel workspace
workspaceResourceId = 'YOUR_WORKSPACE_RESOURCE_ID_HERE' # eg: /subscriptions/<sub_guid>/resourcegroups/<rg_name>/providers/microsoft.operationalinsights/workspaces/<wks_name>'
# DBFS mount path for the storage container (Databricks mount points live under /mnt/).
# NOTE(review): model_path in the next cell appends 'models' directly to this value,
# so confirm whether it is expected to end with '/' (it must match the training notebook).
mount_point_name = 'YOUR_MOUNT_POINT_HERE'
# Project name
project = 'YOUR_PROJECT_HERE' # specified in training

# Log Analytics workspace (Sentinel) to write the results to
workspace_id = 'YOUR_WORKSPACE_ID_HERE' # wks_guid
workspace_shared_key = dbutils.secrets.get(scope = 'YOUR_SCOPE_HERE', key = 'YOUR_KEY_HERE')

###
### Time range of events to score. When scoring periodically, specify the range relative
### to the current time, as in the commented lines below.
###
# test_start_time = dt.datetime.now() - dt.timedelta(hours=1)
# test_end_time = dt.datetime.now()
# NOTE(review): if the event blobs are partitioned by UTC time, dt.datetime.utcnow() is
# presumably the right reference for the relative window - confirm.
test_start_time = dt.datetime.strptime('2020-09-03 00:00', '%Y-%m-%d %H:%M')
test_end_time = dt.datetime.strptime('2020-09-04 00:00', '%Y-%m-%d %H:%M')

print(test_start_time)
print(test_end_time)

In [3]:
import re

# Extract the storage account name and key from the connection string.
# Raw string so '\w' is a regex class rather than an invalid string escape. No trailing ';'
# is required: '([^;]+)' already stops at the next ';', and this also accepts a connection
# string that ends right after AccountKey.
key_pattern = r'DefaultEndpointsProtocol=(\w+);AccountName=(\w+);AccountKey=([^;]+)'
match = re.match(key_pattern, storage_conn_str)
if match is None:
    # Deliberately do not echo the connection string: it contains the account key.
    raise ValueError('Storage connection string is not in the expected '
                     'DefaultEndpointsProtocol=...;AccountName=...;AccountKey=... format')
storage_account = match.group(2)
storage_key = match.group(3)

print(storage_account)

container = 'am-securityevent' # This name is fixed for security events

# Blob path prefix of the events exported from this workspace
test_base_path = 'WorkspaceResourceId={workspaceResourceId}'.format(workspaceResourceId=workspaceResourceId)
print(test_base_path)

# NOTE(review): 'models' is appended directly to mount_point_name (no '/' inserted), so this
# presumably expects mount_point_name to end with '/'; it must match the training notebook's path.
model_path = '{root}/{project}/model_output'.format(root=mount_point_name + 'models', project=project)
print(model_path)

In [4]:
###
### You can do this one-time in a separate Notebook, so that you don't cause accidental errors in other Notebooks mounting/unmounting the folder
###

# Mount the Storage Container
#    (Refer:- https://docs.databricks.com/spark/latest/data-sources/azure/azure-storage.html#mount-azure-blob-storage-containers-with-dbfs)
# dbutils.fs.mount fails when the mount point is already in use (e.g. a previous scheduled run
# stopped before reaching the unmount cell at the end), so only mount when it is not mounted yet.
# NOTE(review): an existing mount is reused as-is; it is not checked that it points at this container.
already_mounted = any(
    mount.mountPoint.rstrip('/') == mount_point_name.rstrip('/')
    for mount in dbutils.fs.mounts()
)
if not already_mounted:
    dbutils.fs.mount(
        source = "wasbs://" + container + "@" + storage_account + ".blob.core.windows.net",
        mount_point = mount_point_name,
        extra_configs = {"fs.azure.account.key." + storage_account + ".blob.core.windows.net":storage_key})
else:
    print('Already mounted:', mount_point_name)

In [5]:
import numpy as np
import pandas as pd

from pyspark.sql import functions as f, types as t
from pyspark.sql.functions import udf

#utils
from azure_sentinel_utilities.azure_storage import storage_blob_manager
from azure_sentinel_utilities.log_analytics import log_analytics_client

# Load saved model

In [7]:
from mmlspark.cyber.anomaly.collaborative_filtering import AccessAnomalyModel

# Load the model persisted by the training notebook from <model_path>/access_anomaly_model
saved_model_location = '{model_path}/access_anomaly_model'.format(model_path=model_path)
access_anomaly_model = AccessAnomalyModel.load(spark, saved_model_location)

# Dataset

In [9]:
#
# This class is used to process 'file share access' related events from Security Events
#
class FileShareDataset:
    """Reads Security Events from blob storage and derives file share access datasets.

    get_fs_dataset returns one row per file share access event (EventID 5140);
    get_processed_fs_dataset aggregates those rows per (tenant, user, resource).
    """

    # NOTE that there are a lot more fields for security events. Below we are picking up only
    # a subset of fields. Each one is read as a nullable string column, in this order.
    _SECURITY_EVENT_FIELDS = (
        "Account",
        "ShareName",
        "ShareLocalPath",
        "AccountType",
        "Computer",
        "EventID",
        "EventData",
        "NewProcessId",
        "NewProcessName",
        "ParentProcessName",
        "Process",
        "ProcessId",
        "SourceComputerId",
        "SourceSystem",
        "SubjectAccount",
        "SubjectDomainName",
        "SubjectLogonId",
        "SubjectUserName",
        "SubjectUserSid",
        "TargetAccount",
        "TargetDomainName",
        "TargetLogonId",
        "TargetUserName",
        "TargetUserSid",
        "TenantId",
        "TimeCollected",
        "TimeGenerated",
        "TokenElevationType",
    )

    def __init__(self, storage_conn_str):
        """
        :param storage_conn_str: Azure Storage connection string used to read the raw events.
        """
        self.storage_conn_str = storage_conn_str
        self.storage_blob_manager = storage_blob_manager(storage_conn_str)

    @staticmethod
    def _make_days_delta():
        """Return a 'double' UDF computing the inclusive day span between two timestamps.

        days_delta(d2, d1) == 1.0 + (d2 - d1).days, so two values on the same day give 1.0.
        If either input is null (e.g. a TimeGenerated value that could not be cast to a
        timestamp) the UDF returns null instead of raising and failing the Spark job.
        """
        @udf('double')
        def days_delta(d2, d1):
            if d2 is None or d1 is None:
                return None
            return 1.0 + (d2 - d1).days
        return days_delta

    @staticmethod
    def _security_event_schema():
        """Spark schema for the raw Security Events: every selected field as a nullable string."""
        return t.StructType([
            t.StructField(name = field_name, dataType = t.StringType(), nullable = True)
            for field_name in FileShareDataset._SECURITY_EVENT_FIELDS
        ])

    # Get file share access data from security events
    def get_fs_dataset(self, start_time, end_time, container, root):
        """Return file share access events (EventID 5140) between start_time and end_time.

        :param container: blob container holding the exported Security Events.
        :param root: blob path prefix inside the container (this notebook passes
                     'WorkspaceResourceId=<id>').
        :return: DataFrame with columns tenant_id (constant '0'), TimeGenerated,
                 Timestamp (TimeGenerated truncated to the day, 00:00), user (Account)
                 and res (ShareName).
        """
        raw_df = self.storage_blob_manager.get_raw_df(
                                        start_time,
                                        end_time,
                                        container,
                                        root,
                                        FileShareDataset._security_event_schema(),
                                        storage_blob_manager.get_blob_service_client(self.storage_conn_str) )
        # Get FileShare access events
        return raw_df.where(
                    f.col('EventID') == '5140'
                 ).select (
                    f.lit('0').alias('tenant_id'),
                    f.col('TimeGenerated'),
                    f.to_date(f.col('TimeGenerated').cast('timestamp')).cast('timestamp').alias('Timestamp'), # timestamp is set at day 00:00
                    f.col('Account').alias('user'),
                    f.col('ShareName').alias('res'),
                 )

    # group the file share access per day and assign an initial likelihood score
    def get_processed_fs_dataset(self, start_time, end_time, container, root):
        """Aggregate file share accesses per (tenant_id, user, res).

        :return: DataFrame with tenant_id, min_timestamp, max_timestamp, user, res and
                 likelihood = total access count / inclusive number of days between the
                 first and last day the user accessed the resource.
        """
        dd = FileShareDataset._make_days_delta()

        df_fs = self.get_fs_dataset(start_time, end_time, container, root)

        # group fileshare access events per day
        daily_fs_activity = df_fs.groupBy(
                                'tenant_id',
                                'Timestamp',
                                'user',
                                'res'
                            ).count()

        # Calculate an initial likelihood score based on count of events
        return daily_fs_activity.select(
            f.col('tenant_id'),
            f.col('Timestamp').alias('timestamp1'),
            f.col('Timestamp').alias('timestamp2'),
            'user',
            'res',
            'count'
        ).groupBy(
            'tenant_id',
            'user',
            'res'
        ).agg({
            'timestamp1': 'min',
            'timestamp2': 'max',
            'count': 'sum'
        }).select(
            f.col('tenant_id'),
            f.col('min(timestamp1)').alias('min_timestamp'),
            f.col('max(timestamp2)').alias('max_timestamp'),
            f.col('user'),
            f.col('res'),
            (f.col('sum(count)')/dd(f.col('max(timestamp2)'), f.col('min(timestamp1)'))).alias('likelihood')
        )

In [10]:
def getdataset():
    """Return the file share access events between test_start_time and test_end_time."""
    fs_dataset = FileShareDataset(storage_conn_str)
    return fs_dataset.get_fs_dataset(test_start_time, test_end_time, container, test_base_path)

In [11]:
ptesting = getdataset()  # one row per file share access event in the scoring window

In [12]:
first_event = ptesting.first()
print(first_event)

In [13]:
ptesting_summary = ptesting.describe()
ptesting_summary.show()

# Scoring

In [15]:
# Score every event with the loaded model (the result carries an 'anomaly_score' column)
pred_df = access_anomaly_model.transform(ptesting)

In [16]:
pred_df.head()  # first scored row

In [17]:
score_summary = pred_df.select('anomaly_score').describe()
score_summary.show()

In [18]:
# report results

In [19]:
# Highest anomaly scores first; cached because several cells below reuse it
full_res_df = (pred_df
               .orderBy(f.desc('anomaly_score'))
               .cache())

In [20]:
full_res_df.head()  # highest-scoring row

In [21]:
# Check the score of a simulated anomalous user access

#anomalous_user_access = full_res_df.filter(full_res_df.user.like('Domain_282/User_871048'))
#display(anomalous_user_access)

## Filter out commonly seen users (automation accounts that are known to access file shares)

In [23]:
# If there are automation user accounts that access different shares and can cause false positives then filter such users out
usersToFilter = ['Domain_346/User_870818', 'Domain_348/User_231659']
filtered_result = (
    full_res_df
    .filter(~full_res_df.user.isin(*usersToFilter))
    .where(~f.col('user').endswith('User_255625'))  # automation user in all domains
)
for counted_df in (full_res_df, filtered_result):
    print(counted_df.count())

## Rank top anomalous users

In [25]:
def print_ratio(df, thr):
    """Print how many rows of df have anomaly_score strictly above thr, as a count and a percentage.

    Rows with a null anomaly_score count toward the total but never toward "above".
    Each count is computed once (every count() is a separate Spark job), and an empty
    DataFrame reports 0% instead of raising ZeroDivisionError.
    """
    above = df.filter(f.col('anomaly_score') > thr).count()
    total = df.count()
    pct = 100.0 * above / total if total else 0.0
    print('ratio of above {0} items {1}/{2} = {3}%'.format(thr, above, total, pct))

print_ratio(full_res_df, 0)
print_ratio(full_res_df, 2.5)
print_ratio(full_res_df, 5)
print_ratio(full_res_df, 7.5)

In [26]:
display(full_res_df)  # all scored events, highest anomaly score first

In [27]:
#
# Select a subset of results to send to Log Analytics
#
from pyspark.sql.window import Window

# Only results scoring above this are reported, and at most this many are sent
la_score_threshold = 0.5
la_max_results = 25

# Rank the events of each (tenant, user, resource) pair by descending anomaly score
w = Window.partitionBy(
                  'tenant_id',
                  'user',
                  'res'
                ).orderBy(
                  f.desc('anomaly_score')
                )

# select values above threshold
results_above_threshold = filtered_result.filter(filtered_result.anomaly_score > la_score_threshold)

# Get distinct resource/user pairs with the timestamp of their highest score.
# The 'index' filter runs before the select that drops the column, so the query does not
# depend on Spark resolving a column that has already been projected away.
results_to_la = (results_above_threshold
                 .withColumn('index', f.row_number().over(w))
                 .where(f.col('index') == 1)
                 .select('tenant_id', 'user', 'res', 'timestamp', 'anomaly_score')
                 .orderBy(f.desc('anomaly_score'))
                 .limit(la_max_results)
                 .cache())

display(results_to_la)

# Write top anomalous scores to Sentinel

In [29]:
@udf
def escape_str(value):
  """Escape a string so it can be placed between double quotes in a JSON document.

  json.dumps escapes backslashes, double quotes and control characters; escaping only
  backslashes would let a '"' in a user or share name corrupt the whole payload.
  A null input yields null instead of raising; concat() then makes that row's record null
  and concat_ws() leaves it out of the payload.
  """
  import json  # local import so the UDF does not depend on a driver-side global
  if value is None:
    return None
  return json.dumps(value, ensure_ascii=False)[1:-1]

def send_results_to_log_analytics(df_to_la):
  """Post the rows of df_to_la to Log Analytics as a single JSON array.

  df_to_la must provide the columns timestamp, user, res and anomaly_score.
  Returns the result of log_analytics_client(...).post_data, or the string
  "No json data to send to LA" when no record could be built.
  """
  # The log type is the name of the event that is being submitted.  This will show up under "Custom Logs" as log_type + '_CL'
  log_type = 'AnomalousResourceAccessResult'

  # Concatenate columns to form one json record.
  # 'HH' is the 24-hour clock; 'hh' is the 12-hour clock and rendered the day buckets
  # (00:00) as 12:00.
  # NOTE(review): date_format uses the Spark session time zone while the value is labelled
  # 'Z' (UTC) - confirm spark.sql.session.timeZone is UTC on this cluster.
  json_records = df_to_la.withColumn('json_field', f.concat(f.lit('{'),
                                            f.lit(' \"TimeStamp\": \"'), f.date_format(f.col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), f.lit('\",'),
                                            f.lit(' \"User\": \"'), escape_str(f.col('user')), f.lit('\",'),
                                            f.lit(' \"Resource\": \"'), escape_str(f.col('res')), f.lit('\",'),
                                            f.lit(' \"AnomalyScore\":'), f.col('anomaly_score'),
                                            f.lit('}')
                                           )
                                         )
  # combine json record column to create the array
  json_body = json_records.agg(f.concat_ws(", ", f.collect_list('json_field')).alias('body'))

  # A global agg() always yields exactly one row, so test the concatenated body itself
  # ('' when the input is empty or every record is null) and run the query only once.
  body_row = json_body.first()
  body = body_row['body'] if body_row is not None else None
  if body:
    json_payload = '[' + body + ']'

    payload = json_payload.encode('utf-8')
    print(payload)
    return log_analytics_client(workspace_id, workspace_shared_key).post_data(payload, log_type)
  else:
    return "No json data to send to LA"

count = results_to_la.count()
if count > 0:
  print ('Results count = ', count)
  result = send_results_to_log_analytics(results_to_la)
  print("Writing to Log Analytics result: ", result)
else:
  print ('No results to send to LA')

In [30]:
# Users that were not in the training set: their events come back with a null anomaly_score
never_seen_users = (full_res_df
                    .filter(f.col('anomaly_score').isNull())
                    .select('user')
                    .distinct())

print('Count never seen users:', never_seen_users.count())
display(never_seen_users)

# Display all resource accesses by users with the highest anomaly scores

In [32]:
from plotly import __version__
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot, offline

# The Sankey cell below requires plotly >= 1.9.0
print(__version__)

# Plotly is used in offline mode; figures are rendered to HTML via plot(..., output_type='div')
offline.init_notebook_mode()

In [33]:
# Find all server accesses of users with high predicted scores.
# For display, limit to top 25 results
results_to_display = results_to_la.orderBy(
                  f.desc('anomaly_score')
                ).limit(25).cache()
# All events of the top-scoring users ...
interesting_records = filtered_result.join(results_to_display, ['user'], 'left_semi')
# ... minus the (user, resource) pairs that were flagged as anomalous
non_anomalous_records = interesting_records.join(results_to_display, ['user', 'res'], 'left_anti')

# Number of events per (tenant, user, resource) among the non-anomalous accesses
top_non_anomalous_records = non_anomalous_records.groupBy(
                          'tenant_id',
                          'user',
                          'res'
                        ).agg(
                          f.count('*').alias('count'),
                        )

# Pick only a subset of non-anomalous records for the UI: each user's most-accessed resources
max_normal_res_per_user = 5
w = Window.partitionBy(
                  'tenant_id',
                  'user',
                ).orderBy(
                  f.desc('count')
                )

# Pick the top non-anomalous set. The 'index' filter runs before the select drops the
# column, so the query does not rely on resolving an already projected-away column.
top_non_anomalous_accesses = (top_non_anomalous_records
                              .withColumn('index', f.row_number().over(w))
                              .where(f.col('index') <= max_normal_res_per_user)
                              .select('tenant_id', 'user', 'res', 'count')
                              .orderBy(f.desc('count'))
                              .limit(25))

# add back anomalous record
fileShare_accesses = (top_non_anomalous_accesses
                          .select('user', 'res', 'count')
                          .union(results_to_display.select('user', 'res', f.lit(1).alias('count'))).cache())

In [34]:
# get unique users and file shares
high_scores_df = fileShare_accesses.toPandas()
# Sankey nodes: all users first, then all resources; a node's index is its row position
unique_arr = np.append(high_scores_df.user.unique(), high_scores_df.res.unique())

unique_df = pd.DataFrame(data = unique_arr, columns = ['name'])
unique_df['index'] = range(0, len(unique_df.index))

def _add_node_indices(links):
    """Return links with 'userIndex' and 'resIndex' node indices looked up in unique_df."""
    by_user = (links
               .merge(unique_df, how='left', left_on='user', right_on='name')
               .drop(columns=['name'])
               .rename(columns={'index' : 'userIndex'}))
    return (by_user
            .merge(unique_df, how='left', left_on='res', right_on='name')
            .drop(columns=['name'])
            .rename(columns={'index' : 'resIndex'}))

# Normal accesses are drawn in light grey, anomalous ones in red
normal_line_color = 'rgba(211, 211, 211, 0.8)'
anomolous_color = 'red'

all_access_index_df = _add_node_indices(high_scores_df)
all_access_index_df['color'] = normal_line_color

# Anomalous (user, res) pairs: one red link of weight 1 each
anomalous_pairs = results_to_display.toPandas().drop(['tenant_id', 'timestamp', 'anomaly_score'], axis=1)
high_scores_index_df = _add_node_indices(anomalous_pairs)
high_scores_index_df['count'] = 1
high_scores_index_df['color'] = anomolous_color

# fileShare_accesses already contains each anomalous pair with count 1; subtract it from the
# grey links (dropping links that reach 0) so every anomalous pair is drawn once, in red
hsi_df = high_scores_index_df[['user','res', 'count']].rename(columns={'count' : 'hsiCount'})
all_access_updated_count_df = all_access_index_df.merge(hsi_df, how='left', on=['user', 'res'])
is_anomalous_pair = all_access_updated_count_df['hsiCount'] == 1
all_access_updated_count_df['count'] = np.where(is_anomalous_pair,
                                                all_access_updated_count_df['count'] - 1,
                                                all_access_updated_count_df['count'])
all_access_updated_count_df = all_access_updated_count_df.loc[
    all_access_updated_count_df['count'] > 0,
    ['user','res', 'count', 'userIndex', 'resIndex', 'color']
]

# combine the two tables
frames = [all_access_updated_count_df, high_scores_index_df]
display_df = pd.concat(frames, sort=True)

In [35]:
# Each Sankey link is one row of display_df. Drop incomplete rows as a whole so that
# source/target/value/color stay aligned; dropping NaNs per column could shift one list
# relative to the others.
link_df = display_df.dropna(subset=['userIndex', 'resIndex', 'count', 'color'])

data_trace = dict(
    type='sankey',
    domain = dict(
      x =  [0,1],
      y =  [0,1]
    ),
    orientation = "h",
    valueformat = ".0f",
    node = dict(
      pad = 10,
      thickness = 30,
      line = dict(
        color = "black",
        width = 0
      ),
      # Node index == row position in unique_df, so no label may be dropped (that would
      # shift every later label); a missing user/share name is shown as a placeholder.
      label = unique_df['name'].fillna('<unknown>')
    ),
    link = dict(
      source = link_df['userIndex'],
      target = link_df['resIndex'],
      value = link_df['count'],
      color = link_df['color'],
  )
)

layout =  dict(
    title = "All resources accessed by users with highest anomalous scores",
    height = 772,
    font = dict(
      size = 10
    ),
)

fig = dict(data=[data_trace], layout=layout)

p = plot(fig, output_type='div')

displayHTML(p)

In [36]:
# Release the blob storage mount used by this notebook
dbutils.fs.unmount(mount_point_name)