
# Anomalous Process Execution

Reads data from the previously created table and builds a model to detect anomalous process executions in Windows Security event ID 4688 (process creation) logs.


In [None]:
from datetime import datetime, timezone, timedelta

import numpy as np

from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from sklearn.ensemble import IsolationForest


In [None]:
## Variables

# Source Delta table: database, storage format and table name
db, table_format, table_name = "deltadb", "delta", "process_execution"

# Target Log Analytics workspace
workspace_id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
# Placeholder only: the real shared key belongs in Databricks Secrets / Key Vault, e.g.
# workspace_shared_key = dbutils.secrets.get(scope = 'YOUR_SCOPE_HERE', key = 'YOUR_KEY_HERE')
workspace_shared_key = 'yyyyyyyy'

In [None]:
## Set context accordingly and list tables
# Switch the session's current database, then print the tables it contains.
use_database_stmt = f"USE {db};"
spark.sql(use_database_stmt)
tables_in_db = spark.sql("SHOW TABLES;")
tables_in_db.show()

## Process data

In [None]:
## Load data and summarize by time interval

# Bucket size passed to date_trunc: "day" (current setting) or e.g. "hour" for finer windows.
interval_unit = "day"

data = spark.read.format(table_format).table(table_name)
data = data.withColumn("interval", date_trunc(interval_unit, "TimeGenerated"))
# One row per (interval, Computer, Account, AccountType, Process) with the distinct
# command lines seen in that bucket and the bucket's row count.
data = data.groupBy("interval", "Computer", "Account", "AccountType", "Process").agg(
    collect_set("CommandLine").alias("CommandLines"),
    count("*").alias("count"),
)
# Later cells collect() this data several times and match results back to rows by
# position. Caching avoids recomputing the shuffle, whose output row order is not
# guaranteed to be identical between recomputations.
data = data.cache()

In [None]:
## Prepare data for vectorizing

# Convert the interval timestamp column to Unix seconds so it can serve as a numeric feature
data = data.withColumn("unixInterval", unix_timestamp("interval"))

# Convert categorical columns to numerical values using StringIndexer.
# handleInvalid="keep" puts NULL values into an extra index instead of failing the whole fit
# (the default "error" raises on NULLs). Indices for non-NULL values are unaffected.
computerIndexer = StringIndexer(inputCol="Computer", outputCol="computerIndex", handleInvalid="keep")
accountIndexer = StringIndexer(inputCol="Account", outputCol="accountIndex", handleInvalid="keep")
accountTypeIndexer = StringIndexer(inputCol="AccountType", outputCol="accountTypeIndex", handleInvalid="keep")
processIndexer = StringIndexer(inputCol="Process", outputCol="processIndex", handleInvalid="keep")

# Pipeline fits each indexer on the output of the previous stage, exactly like
# chaining fit(...).transform(...) by hand.
indexingPipeline = Pipeline(stages=[computerIndexer, accountIndexer, accountTypeIndexer, processIndexer])
data = indexingPipeline.fit(data).transform(data)

## Training Isolation Forest Model

In [None]:
## Create the Isolation Forest model and fit it

# Feature columns in the order the model sees them (the prediction cell selects the same list).
features = ["unixInterval", "computerIndex", "accountIndex", "accountTypeIndex", "processIndex", "count"]
# NOTE(review): unixInterval places the newest interval at the edge of the training range,
# which may make recent rows easier to isolate (i.e. flag as anomalous). Confirm this feature is wanted.
clf = IsolationForest(max_samples=100, random_state=42)

# Pull the feature columns to the driver as a 2-D float matrix (one row per aggregated group).
X_train = np.array(data.select(features).collect(), dtype=float)
if X_train.size == 0:
    # Without this, sklearn fails with a less helpful "Expected 2D array" error.
    raise ValueError(f"No rows in {db}.{table_name} to train the Isolation Forest on.")
clf.fit(X_train)

In [None]:
## Create predictions based on trained model
# IsolationForest.predict labels every row +1 (inlier) or -1 (outlier).
feature_rows = data.select(features).collect()
predicted_labels = clf.predict(feature_rows)
y_pred = predicted_labels.tolist()
# Pair each label with its position in the collected output so it can be matched back to its row.
indexed_y_pred = list(enumerate(y_pred))
y_pred_df = spark.createDataFrame(indexed_y_pred, ["index", "score"])

In [None]:
## Append score to dataframe
# y_pred_df.index is each row's position (0..n-1) in data.collect().
# monotonically_increasing_id() is unique but NOT consecutive: it encodes the partition id
# in its upper bits, so joining it to 0..n-1 drops or mismatches rows whenever data has
# more than one partition. rdd.zipWithIndex() numbers rows 0..n-1 in partition order,
# which is the same order collect() returns them in.
# NOTE(review): this still assumes data yields rows in the same order on every action.
# Caching data right after the aggregation makes that far more likely; confirm.
indexedSchema = StructType(data.schema.fields + [StructField("index", LongType(), False)])
indexedRows = data.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
data = spark.createDataFrame(indexedRows, indexedSchema)
# Joining on the column name leaves a single "index" column, which is then dropped.
data = data.join(y_pred_df, on="index").drop("index")

In [None]:
## Filter Anomalies

# Keep rows the model labelled as outliers (-1) whose interval starts within the last day.
# NOTE(review): with day-sized intervals (date_trunc("day") when loading), the only bucket
# start within the past 24 hours is the current day's, which may be partial. Confirm that
# this lookback is intended.
now = datetime.now(timezone.utc)
timeInterval = now - timedelta(days=1)
is_outlier = col("score") == -1
is_recent = col("interval") >= timeInterval
anomalies = data.filter(is_outlier & is_recent)

In [None]:
## Display anomalies
# Show only the identifying columns and the observed command lines for each anomalous bucket.
anomaly_columns = ["interval", "Computer", "Account", "AccountType", "Process", "CommandLines"]
display(anomalies.select(*anomaly_columns))

## Send Anomalies to Log Analytics

In [None]:
# Disabled cell: would post anomalies to a Log Analytics custom log (log_type + '_CL').
# NOTE(review): this cell will not run as-is if uncommented:
#   - `f.` is never defined; the imports cell uses `from pyspark.sql.functions import *`,
#     not `import pyspark.sql.functions as f`.
#   - `log_analytics_client` and `results_to_la` are not defined anywhere in this notebook.
#   - It reads columns `timestamp`, `user`, `res`, `anomaly_score`, which the cells above do
#     not produce (`anomalies` carries `interval`, `Computer`, `Account`, `Process`, `score`, ...).
#   - In the date pattern, `hh` is the 12-hour clock; a 24-hour ISO timestamp needs `HH`.
#     The literal 'Z' is only correct if the Spark session time zone is UTC.
#   - escape_str only doubles backslashes. Double quotes or control characters in values still
#     produce invalid JSON. Its parameter name `str` also shadows the builtin.
#   - `len(json_body.first()) > 0` is always true: the one-column aggregation returns a Row of length 1.
#   - `count = ...` rebinds the `count` function from pyspark.sql.functions, which breaks the
#     load cell's `count("*")` if that cell is re-run afterwards.
# @udf
# def escape_str(str):
#   return str.replace('\\','\\\\')

# def send_results_to_log_analytics(df_to_la):
#   # The log type is the name of the event that is being submitted.  This will show up under "Custom Logs" as log_type + '_CL'
#   log_type = 'AnomalousProcessExecutionResult'

#   # concatenate columns to form one json record
#   json_records = df_to_la.withColumn('json_field', f.concat(f.lit('{'), 
#                                             f.lit(' \"TimeStamp\": \"'), f.from_unixtime(f.unix_timestamp(f.col("timestamp")), "y-MM-dd'T'hh:mm:ss.SSS'Z'"), f.lit('\",'),
#                                             f.lit(' \"User\": \"'), escape_str(f.col('user')), f.lit('\",'),
#                                             f.lit(' \"Resource\": \"'), escape_str(f.col('res')), f.lit('\",'),
#                                             f.lit(' \"AnomalyScore\":'), f.col('anomaly_score'),
#                                             f.lit('}')
#                                            )                       
#                                          )
#   # combine json record column to create the array
#   json_body = json_records.agg(f.concat_ws(", ", f.collect_list('json_field')).alias('body'))

#   if len(json_body.first()) > 0:
#     json_payload = json_body.first()['body']
#     json_payload = '[' + json_payload + ']'

#     payload = json_payload.encode('utf-8') #json.dumps(json_payload)
#     # print(payload)
#     return log_analytics_client(workspace_id, workspace_shared_key).post_data(payload, log_type)
#   else:
#     return "No json data to send to LA"

# count = results_to_la.count()
# if count > 0:
#   print ('Results count = ', count)
#   result = send_results_to_log_analytics(results_to_la)
#   print("Writing to Log Analytics result: ", result)
# else:
#   print ('No results to send to LA')