In [1]:
# This notebook computes anomalies for (max_apply_date - 1 day), using the past 90 days as the reference window.
# Terminology:
# {}_thresholds: the actual values of the 5th and 95th percentiles; any value outside this range is considered an anomaly
# {}_score: the mean score on the compute date (overall/channel: max_apply_date - 1 day, i.e. yesterday; positive/negative: max_apply_date - 1 - 21 days, assuming hirers take 21 days to update the application status)
# {}_trigger: whether the score is flagged as an anomaly (1 = yes, 0 = no)
# {}_count: the number of datapoints (applications) on the compute date (overall/channel: max_apply_date - 1 day, i.e. yesterday; positive/negative: max_apply_date - 1 - 21 days, assuming hirers take 21 days to update the application status)

In [2]:
%run ./library

In [3]:
# Notebook parameters. "country" defaults to "my"; "max_app_id" has an empty
# default, so the caller has to supply it as an integer.
dbutils.widgets.text("country", "my", "country")
dbutils.widgets.text("max_app_id", "", "max_app_id")
country = dbutils.widgets.get("country")

max_app_id_text = dbutils.widgets.get("max_app_id").strip()
if not max_app_id_text:
  # int("") would fail with an opaque "invalid literal" message; raise the same
  # exception type but say which parameter is missing.
  raise ValueError("widget 'max_app_id' is required and must be an integer")
max_app_id = int(max_app_id_text)

In [4]:
# Run the "config" notebook for this country and decode the JSON it returns.
config = dbutils.notebook.run("config", 0, {"country": country})
params = json.loads(config)

# Window lengths (in days, see their use with timedelta below) and the
# percentile bounds handed to make_threshold.
application_status_window = params['application_status_window']
application_signal_window = params['application_signal_window']
min_percentile = params['min_percentile']
max_percentile = params['max_percentile']

# Table names and data locations used by the rest of the notebook.
meta_tables = params['meta_tables']
# conversion_function_window = params['conversion_function_window']
descriptive_fields = meta_tables['descriptive_fields']
application_signals = meta_tables['application_signals']
data_mart_predictions = params['common_dirs']['data_mart_predictions']
anomaly_result_table = meta_tables['anomaly_detection_summary']


In [5]:
# Dependency check: stop here unless prerequisite_check passes for the
# upstream jobs listed below, using the state-management table from config.
required_jobs = [
  JOB_PLACEMENTS_PROXY_EXTRACTION,
  JOB_PLACEMENTS_PROXY_SCORE,
  JOB_PLACEMENTS_PROXY_DESCRIPTIVE,
  JOB_PLACEMENTS_PROXY_SIGNALS,
]
assert prerequisite_check(max_app_id, required_jobs, params["meta_tables"]["state_management"])

In [6]:
# Prediction scores, read from the parquet location given in config.
prediction = spark.read.parquet(data_mart_predictions)

# Descriptive fields, reduced to the columns used below. apply_date is cut to
# 10 characters so it can later be parsed with the "%Y-%m-%d" format.
descriptive = spark.table(descriptive_fields).select(
  'application_id',
  col('apply_date').substr(0, 10).alias('apply_date'),
  'application_channel_code',
  'application_channel_desc',
)

# Application signals, de-duplicated per application_id by the library helper
# (ordering column: action_date).
signal = drop_ordered_duplicates(
  spark.table(application_signals),
  'application_id',
  'action_date',
)

# Inner-join the three sources on application_id.
dataset = (
  prediction
  .join(descriptive, 'application_id')
  .join(signal, 'application_id')
)

# Latest apply_date present in the joined data; every window below is derived from it.
max_apply_date = dataset.select(
  max('apply_date').alias('max_apply_date')
).collect()[0]['max_apply_date']

In [7]:
# All dates are handled as "YYYY-MM-DD" strings; parse max_apply_date once and
# derive the three reference dates from the resulting datetime.
DATE_FMT = "%Y-%m-%d"
yesterday_dt = datetime.strptime(max_apply_date, DATE_FMT) - timedelta(days=1)

# Day being scored for the overall / channel checks.
yesterday = yesterday_dt.strftime(DATE_FMT)
# Lower bound (exclusive, see the filter on apply_date) of the reference window.
window_start_date = (
  yesterday_dt - timedelta(days=int(application_status_window))
).strftime(DATE_FMT)
# Day being scored for the positive / negative label checks.
window_signal_end_date = (
  yesterday_dt - timedelta(days=int(application_signal_window))
).strftime(DATE_FMT)

In [8]:
# Keep applications made strictly after window_start_date and derive the
# 'label' column from application_status_code via label_application_status.
dataset = (
  dataset
  .filter(col('apply_date') > window_start_date)
  .withColumn('label', label_application_status('application_status_code'))
)

# Mark the windowed frame for caching; it is reused by every check below.
dataset.cache()

In [9]:
# Modes accepted by anomaly_detection(); each one selects a different row
# filter (and scored date) inside that function.
MODE_OVERALL = "OVERALL"    # rows with label 0, 1 or -1
MODE_POSITIVE = "POSITIVE"  # rows with label 1, up to window_signal_end_date
MODE_NEGATIVE = "NEGATIVE"  # rows with label 0, up to window_signal_end_date
MODE_CHANNEL = "CHANNEL"    # rows of one application channel

def anomaly_detection(df, mode, channel=None, min_datapoints=11):
  """Check whether one day's mean score falls outside the window's thresholds.

  The rows of ``df`` are first narrowed according to ``mode``:

  * MODE_OVERALL  - label in (0, 1, -1); the scored date is ``yesterday``.
  * MODE_CHANNEL  - application_channel_desc equal to ``channel``; the scored
                    date is ``yesterday``.
  * MODE_POSITIVE - label 1 and apply_date <= ``window_signal_end_date``; the
                    scored date is ``window_signal_end_date``.
  * MODE_NEGATIVE - label 0, same cut-off and scored date as MODE_POSITIVE.

  Args:
    df: Spark DataFrame; the columns read here are apply_date and score, plus
      label or application_channel_desc depending on the mode.
    mode: one of the MODE_* constants.
    channel: channel description to keep; only read in MODE_CHANNEL.
    min_datapoints: the check is skipped unless the narrowed frame has MORE
      than this many rows. Defaults to 11, the previously hard-coded cut-off.

  Returns:
    A tuple ``(thresholds, today_score, trigger, df_count)``:
      thresholds  - result of make_threshold, or [None, None] when skipped.
      today_score - mean score on the scored date, or None when skipped or
                    when that date has no score in the narrowed frame.
      trigger     - result of anomaly_detector, or None when no score exists.
      df_count    - number of rows on the scored date, or None when skipped.

  Raises:
    ValueError: if ``mode`` is not one of the MODE_* constants.
  """
  # Narrow the frame and pick the date whose mean is being judged.
  if mode == MODE_NEGATIVE:
    df = df.filter(
      (col('label').isin([0])) &
      (col('apply_date') <= window_signal_end_date)
    )
    target_date = window_signal_end_date
  elif mode == MODE_POSITIVE:
    df = df.filter(
      (col('label').isin([1])) &
      (col('apply_date') <= window_signal_end_date)
    )
    target_date = window_signal_end_date
  elif mode == MODE_OVERALL:
    df = df.filter(col('label').isin([0, 1, -1]))
    target_date = yesterday
  elif mode == MODE_CHANNEL:
    df = df.filter(col('application_channel_desc') == channel)
    target_date = yesterday
  else:
    # Previously an unknown mode fell through and scored the unfiltered frame.
    raise ValueError('unknown anomaly detection mode: {!r}'.format(mode))

  # Defaults written to the summary table when the check cannot be made.
  thresholds = [None, None]
  today_score = None
  trigger = None
  df_count = None

  # Too little data in the window: report the None defaults.
  if df.count() <= min_datapoints:
    return thresholds, today_score, trigger, df_count

  # Number of applications on the scored date.
  df_count = df.filter(col('apply_date') == target_date).count()

  # Mean score per apply_date across the narrowed window.
  daily_mean_df = df.groupby('apply_date').agg(
    mean('score').alias('mean_score')
  )

  # NOTE(review): make_threshold always receives `yesterday`, including in the
  # POSITIVE/NEGATIVE modes where the scored date is window_signal_end_date —
  # confirm that this is intended.
  thresholds = make_threshold(
    daily_mean_df,
    yesterday,
    min_percentile,
    max_percentile
  )

  # The scored date can be absent from the narrowed frame (for example no
  # positive-label application on that day); indexing the empty collect()
  # result used to raise IndexError.
  target_rows = daily_mean_df.filter(
    col('apply_date') == target_date
  ).select(
    'mean_score'
  ).collect()
  if target_rows:
    today_score = target_rows[0][0]

  # Only ask for a verdict when there is a score to judge.
  if today_score is not None:
    trigger = anomaly_detector(today_score, thresholds)

  return thresholds, today_score, trigger, df_count

In [10]:
# Run the three dataset-wide checks.
overall_thresholds, overall_score, overall_trigger, overall_count = anomaly_detection(
  dataset, MODE_OVERALL
)
positive_thresholds, positive_score, positive_trigger, positive_count = anomaly_detection(
  dataset, MODE_POSITIVE
)
negative_thresholds, negative_score, negative_trigger, negative_count = anomaly_detection(
  dataset, MODE_NEGATIVE
)

# Tags whose trigger fired. They are only collected here; the Slack alert is
# sent at the end of the notebook. Overall (and later channel) tags go in one
# list and label tags in another, because the two groups are reported against
# different dates.
anomaly_list = ['overall_mean'] if overall_trigger == 1 else []
anomaly_label_list = [
  tag
  for tag, fired in (
    ('positive_label_mean', positive_trigger),
    ('negative_label_mean', negative_trigger),
  )
  if fired == 1
]

In [11]:
# Echo thresholds, score, trigger and count for each check, one line per check.
for summary in (
  (overall_thresholds, overall_score, overall_trigger, overall_count),
  (positive_thresholds, positive_score, positive_trigger, positive_count),
  (negative_thresholds, negative_score, negative_trigger, negative_count),
):
  print(*summary)

In [12]:
%sql USE placements_proxy;
-- Summary table definition in the placements_proxy schema
CREATE TABLE IF NOT EXISTS placements_proxy.anomaly_detection_summary (
  tag STRING COMMENT 'types of scores',
  run_date TIMESTAMP COMMENT 'the date that notebook is run',
  date STRING,
  lower_threshold DOUBLE,
  upper_threshold DOUBLE,
  score DOUBLE COMMENT 'scores for the date',
  num_app INT COMMENT 'number of application for date',
  is_anomaly BOOLEAN,
  country STRING,
  remarks STRING
) USING PARQUET
PARTITIONED BY (country, date);

-- Same definition in the placements_proxy_dev schema
CREATE TABLE IF NOT EXISTS placements_proxy_dev.anomaly_detection_summary (
  tag STRING COMMENT 'types of scores',
  run_date TIMESTAMP COMMENT 'the date that notebook is run',
  date STRING,
  lower_threshold DOUBLE,
  upper_threshold DOUBLE,
  score DOUBLE COMMENT 'scores for the date',
  num_app INT COMMENT 'number of application for date',
  is_anomaly BOOLEAN,
  country STRING,
  remarks STRING
) USING PARQUET
PARTITIONED BY (country, date);

In [13]:
# Assemble one summary entry per tag for the results table.
# anomaly_table_row arguments, in the order used here: tag, run timestamp,
# scored date, thresholds, score, application count, trigger, country, then
# two dates (window start and scored date — presumably for the remarks
# column; TODO confirm against ./library).
run_date = datetime.now()

# Overall mean is scored on `yesterday`.
overall_mean_row = anomaly_table_row(
  'overall_mean', run_date, yesterday,
  overall_thresholds, overall_score, overall_count, overall_trigger,
  country, window_start_date, yesterday,
)

# Label means are scored on `window_signal_end_date`.
negative_label_mean_row = anomaly_table_row(
  'negative_label_mean', run_date, window_signal_end_date,
  negative_thresholds, negative_score, negative_count, negative_trigger,
  country, window_start_date, window_signal_end_date,
)

positive_label_mean_row = anomaly_table_row(
  'positive_label_mean', run_date, window_signal_end_date,
  positive_thresholds, positive_score, positive_count, positive_trigger,
  country, window_start_date, window_signal_end_date,
)

# Schema shared by every write to the results table.
schema = anomaly_table_schema()

In [14]:
# Combine the three per-tag results into one collection for a single write
# (assumes anomaly_table_row returns a list, so "+" concatenates — TODO confirm).
row = overall_mean_row + positive_label_mean_row + negative_label_mean_row

In [15]:
# Append the overall / positive / negative rows to the results table,
# partitioned by country and date.
result = spark.createDataFrame(row, schema)
result.write.mode('append').partitionBy('country', 'date').saveAsTable(anomaly_result_table)

##### Application Channel Anomaly Detection

In [17]:
# Build every (apply_date, application_channel_code) combination seen in the
# window. The constant 'flag' column exists only as a join key that turns the
# join into a cartesian product.
all_apply_dates = dataset.select('apply_date').distinct().withColumn('flag', lit(0))
all_channel_codes = dataset.select('application_channel_code').distinct().withColumn('flag', lit(0))
date_channel_grid = all_apply_dates.join(all_channel_codes, on = ['flag'])

# Left-join the real rows onto that grid. Combinations without any application
# come back with null columns, and their score is set to 0.
# NOTE(review): those filler rows also carry a null application_channel_desc,
# so the MODE_CHANNEL filter in anomaly_detection (which matches on
# application_channel_desc) never selects them — confirm whether the zero-fill
# is meant to influence the per-channel daily means.
dataset_patched = date_channel_grid.join(
  dataset,
  on = ['apply_date', 'application_channel_code'],
  how = 'left_outer',
).fillna(0, subset = ['score'])

In [18]:
# Cache the patched frame and run a count over it (presumably to materialise
# the cache before the per-channel loop reuses it); the count is the cell output.
dataset_patched.cache().count()

In [19]:
# Distinct, non-null channel descriptions found among yesterday's rows; only
# these channels are checked.
channel_list = (
  dataset_patched
  .filter(col('apply_date') == yesterday)
  .select('application_channel_desc')
  .dropna()
  .distinct()
  .collect()
)

In [20]:
# Run the channel check for every channel seen yesterday. The summary rows are
# collected and appended to the results table in ONE write after the loop,
# instead of building a DataFrame and running a separate write per channel.
channel_rows = []
for channel_record in channel_list:
  channel_name = channel_record.application_channel_desc
  channel_thresholds, channel_mean_score, trigger_channel, channel_number_of_application = anomaly_detection(
    dataset_patched,
    mode=MODE_CHANNEL,
    channel=channel_name
  )

  # Channel anomalies are reported together with the overall tag.
  if trigger_channel == 1:
    anomaly_list.append('channel: ' + channel_name)

  # anomaly_table_row results are concatenated elsewhere in this notebook
  # before createDataFrame, so extending a flat list keeps the same row shape.
  channel_rows.extend(
    anomaly_table_row(
      channel_name,
      run_date,
      yesterday,
      channel_thresholds,
      channel_mean_score,
      channel_number_of_application,
      trigger_channel,
      country,
      window_start_date,
      yesterday
    )
  )

# Nothing to write when no channel had data yesterday.
if channel_rows:
  result = spark.createDataFrame(channel_rows, schema)
  result.write.saveAsTable(anomaly_result_table, mode = 'append', partitionBy = ['country','date'])

In [21]:
# Send one message per non-empty tag group. Overall/channel tags are reported
# against `yesterday`, label tags against `window_signal_end_date`.
if anomaly_list:
  slack_anomaly_message(anomaly_list, yesterday, country)

if anomaly_label_list:
  slack_anomaly_message(anomaly_label_list, window_signal_end_date, country)