# Tech Stack
I used **Docker** to run local services for a **Postgres database**, **dbt**, and a **Jupyter notebook**. I already had this infrastructure set up for my own projects, so I could assemble it quickly and start the exercise.

I pushed my code to this git repo: https://github.com/danbratton/abnormal_exercise

# Loading Data
I loaded the data into Postgres using dbt **seeds**. Although seeds are meant for static or slowly changing data, they were quick and easy to use for this exercise. I put the CSV files in the `seeds` directory and then ran `dbt seed`.

# Running SQL
I developed the SQL in DBeaver. The final queries are below; I execute them with `psycopg2` and use `pandas` to display the results as tables.

In [1]:
import os

import pandas as pd
import psycopg2


def get_connection():
    # pull in db credentials from environment variables
    # note: the environment variables are set by compose.yaml from the .env-local file
    return psycopg2.connect(
        database=os.environ['DEV_PG_NAME'],
        user=os.environ['DEV_PG_USER'],
        password=os.environ['DEV_PG_PASSWORD'],
        host=os.environ['DEV_PG_HOST'],
        port=os.environ['DEV_PG_PORT'],
    )


def run_select(sql):
    # open a short-lived connection, run the query, and return the rows as a DataFrame
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
            cols = [desc[0] for desc in cursor.description]
    finally:
        # always release the connection, even if the query fails
        conn.close()
    return pd.DataFrame(results, columns=cols)


# Exercise

## 1.a. How many customers do we have ATO product data for?
* I'm assuming a customer is identified by the `customer_id`
* I am NOT going to assume all the customers with ATO data are in the `customers` table. That is why I counted all the unique `customer_id`s across all event tables instead of counting the customers in the `customers` table.
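
One way to sanity-check that second assumption is to list the `customer_id`s that appear in the event tables but not in `customers`. The sketch below uses an in-memory SQLite database as a stand-in for Postgres; the rows are made up and only two of the event tables are included. It also illustrates that `union` already removes duplicates, so the inner `distinct` keywords are optional.

```python
import sqlite3

# Toy stand-in for the Postgres tables; all rows here are made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
create table customers (customer_id integer);
create table user_events (customer_id integer);
create table detector_events (customer_id integer);
insert into customers values (1), (2);
insert into user_events values (1), (1), (2), (3);
insert into detector_events values (2), (3), (3);
""")

# `union` (unlike `union all`) de-duplicates across both selects.
all_ids = """
    select customer_id from user_events
    union
    select customer_id from detector_events
"""

customer_count = conn.execute(f"select count(*) from ({all_ids})").fetchone()[0]

# Customers that have event data but no row in `customers`.
missing_from_customers = [
    row[0]
    for row in conn.execute(
        f"""
        select customer_id from ({all_ids})
        where customer_id not in (select customer_id from customers)
        order by customer_id
        """
    )
]
conn.close()

print(customer_count)          # 3
print(missing_from_customers)  # [3]
```

In this toy data, customer 3 has events but no `customers` row, which is exactly the case that counting from `customers` alone would miss.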

In [2]:
sql = """
with customer_ids as (
	select
		distinct customer_id
	from user_events
	
	union
	
	select
		distinct customer_id
	from detector_events de 
	
	union
	
	select
		distinct customer_id
	from case_events ce 
	
	union
	
	select
		distinct customer_id
	from customer_case_events cce 
	
	union
	
	select
		distinct customer_id
	from customer_action_events cae
)

select 
	count(distinct(customer_id)) as customer_count
from customer_ids
"""
run_select(sql)

| | customer_count |
|---|---|
| 0 | 10 |


## 1.b. Over what time period does the ATO data exist?
* I'm taking the earliest and latest timestamps across the event tables queried below.

In [3]:
sql = """
with all_events_timestamps as (
	select
		event_timestamp as timestamp
	from user_events
	
	union
	
	select
		event_timestamp as timestamp
	from detector_events de 
	
	union
	
	-- I'm assuming we care about when the event happened (reported_event_timestamp) rather than when the customer reported it (report_timestamp)
	select
		reported_event_timestamp as timestamp
	from customer_case_events cce 
	
	union
	
	select
		action_timestamp as timestamp
	from case_action_events cae
	
	union
	
	select 
		customer_action_ts as timestamp
	from customer_action_events cae2
)

select 
	min(timestamp) as data_date_from, max(timestamp) as data_date_to
from all_events_timestamps
"""
run_select(sql)

| | data_date_from | data_date_to |
|---|---|---|
| 0 | 2020-01-01 03:56:31 | 2021-01-03 15:20:10 |


## 1.c. How many ATO detectors are there?
* I'm assuming the detectors are identified by the `detector_id`

In [4]:
sql = """
select 
	count(distinct(detector_id)) as detector_count
from detector_events de 
"""
run_select(sql)

| | detector_count |
|---|---|
| 0 | 6 |


## 1.d. What is the range of customer sizes by number of users?
* I'm assuming `cnt_users` is the count of the number of users in a company.

In [5]:
sql = """
select 
	min(cnt_users) as company_user_count_from,
	max(cnt_users) as company_user_count_to
from customers c
"""
run_select(sql)

| | company_user_count_from | company_user_count_to |
|---|---|---|
| 0 | 14 | 371 |


## 2.a. What is the average monthly false negative rate of the ATO product?
* False negatives occur when a customer reports an ATO event that Abnormal did not detect, so every case in `customer_case_events` is a false negative.

In [6]:
sql = """
with cases as (
	select
		de.event_timestamp as case_timestamp, 
		case_id,
		0 as false_negative
	from case_events ce
	join detector_events de on ce.triggering_event_id = de.event_id
	
	union
	
	select
		-- using the customer-provided actual time the event happened to align with detector_events.event_timestamp
		reported_event_timestamp as case_timestamp,
		reported_case_id as case_id,
		1 as false_negative
	from customer_case_events cce 
)

select 
	date_trunc('month',case_timestamp) as month,
	sum(false_negative) as false_negative_count,
	count(false_negative) as case_count,
	round(sum(false_negative)/count(false_negative)::numeric, 4)*100 as false_negative_rate_pct
from cases
group by month
order by month desc
"""
run_select(sql)

| | month | false_negative_count | case_count | false_negative_rate_pct |
|---|---|---|---|---|
| 0 | 2020-12-01 | 0 | 37 | 0.0 |
| 1 | 2020-11-01 | 1 | 30 | 3.33 |
| 2 | 2020-10-01 | 1 | 24 | 4.17 |
| 3 | 2020-09-01 | 0 | 36 | 0.0 |
| 4 | 2020-08-01 | 1 | 40 | 2.5 |
| 5 | 2020-07-01 | 0 | 32 | 0.0 |
| 6 | 2020-06-01 | 1 | 29 | 3.45 |
| 7 | 2020-05-01 | 0 | 36 | 0.0 |
| 8 | 2020-04-01 | 0 | 36 | 0.0 |
| 9 | 2020-03-01 | 0 | 33 | 0.0 |
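
The query returns one rate per month, while the question asks for a single average. A minimal sketch of that last step follows, using only the ten rows displayed above (if the full result contains more months, the numbers will differ). It shows both the unweighted mean of the monthly rates and the pooled rate across all displayed cases, since the two can diverge when monthly case counts vary.

```python
from statistics import mean

# (month, false_negative_count, case_count), copied from the rows displayed above.
monthly = [
    ("2020-12", 0, 37),
    ("2020-11", 1, 30),
    ("2020-10", 1, 24),
    ("2020-09", 0, 36),
    ("2020-08", 1, 40),
    ("2020-07", 0, 32),
    ("2020-06", 1, 29),
    ("2020-05", 0, 36),
    ("2020-04", 0, 36),
    ("2020-03", 0, 33),
]

# Unweighted: every month counts equally, regardless of its case volume.
monthly_rates_pct = [100 * fn / cases for _, fn, cases in monthly]
avg_monthly_rate_pct = mean(monthly_rates_pct)

# Pooled: total false negatives over total cases.
total_fn = sum(fn for _, fn, _ in monthly)
total_cases = sum(cases for _, _, cases in monthly)
pooled_rate_pct = 100 * total_fn / total_cases

print(round(avg_monthly_rate_pct, 2))  # 1.34
print(round(pooled_rate_pct, 2))       # 1.2
```

The same average could be computed in SQL by wrapping the monthly query in a subquery and taking `avg(false_negative_rate_pct)`.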


## 2.b. What is the average monthly false positive rate of the ATO product?
* False positives occur when Abnormal flags a breach that the customer reports as not a threat.
* These are cases in the `case_events` table that have a `customer_action_type` of `NOT_A_COMPROMISE` in the `customer_action_events` table.
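
The labeling rule in these bullets can be sketched in plain Python. This is an illustration under stated assumptions, not the query itself: the function name, the case ids, and the `OTHER_ACTION` value are hypothetical, and a case is treated as a false positive if any of its customer actions is `NOT_A_COMPROMISE`.

```python
from collections import defaultdict


def label_false_positives(detected_case_ids, customer_actions):
    """Return {case_id: 1 or 0}; 1 if any customer action on the case is NOT_A_COMPROMISE."""
    actions_by_case = defaultdict(set)
    for case_id, action_type in customer_actions:
        actions_by_case[case_id].add(action_type)
    return {
        case_id: int("NOT_A_COMPROMISE" in actions_by_case[case_id])
        for case_id in detected_case_ids
    }


# Hypothetical sample data: C1 has two customer actions, C3 has none.
detected_case_ids = ["C1", "C2", "C3"]
customer_actions = [
    ("C1", "OTHER_ACTION"),       # hypothetical action type
    ("C1", "NOT_A_COMPROMISE"),
    ("C2", "RESOLVE"),
]

flags = label_false_positives(detected_case_ids, customer_actions)
print(flags)  # {'C1': 1, 'C2': 0, 'C3': 0}
```

Cases with no customer action at all (C3 here) are counted as not false positive, which is a modeling choice worth keeping in mind when reading the rate.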

In [7]:
sql="""
with all_cases as (
	select
		case_id,
		de.event_timestamp as case_timestamp
	from case_events ce
	join detector_events de on ce.triggering_event_id = de.event_id
	
	union
	
	select
		-- using the customer-provided actual time the event happened to align with detector_events.event_timestamp
		reported_case_id as case_id,
		reported_event_timestamp as case_timestamp
	from customer_case_events cce 
)

, false_positives as (
	select
		ce.case_id, 
		de.event_timestamp,
		case 
			when customer_action_type = 'NOT_A_COMPROMISE'
			then 1 else 0 
		end as false_positive
	from case_events ce
	left join customer_action_events cae on ce.case_id = cae.case_id
	left join detector_events de on ce.triggering_event_id = de.event_id
)

, distinct_false_positives as (
	select
		distinct case_id,
		event_timestamp,
		max(false_positive) as false_positive
	from false_positives
	group by case_id, event_timestamp
)

, combined as (
	select 
		ac.case_id,
		ac.case_timestamp,
		case 
			when false_positive is not null then false_positive else 0
		end as false_positive
	from all_cases ac
	left join distinct_false_positives dfp on ac.case_id = dfp.case_id
)

select 
	date_trunc('month', case_timestamp) as month,
	sum(false_positive) as false_positive_count,
	count(false_positive) as cases_count,
	round(sum(false_positive)/count(false_positive)::numeric,4)*100 as false_positive_rate_pct
from combined c
group by month
order by month desc
"""
run_select(sql)

| | month | false_positive_count | cases_count | false_positive_rate_pct |
|---|---|---|---|---|
| 0 | 2020-12-01 | 1 | 37 | 2.7 |
| 1 | 2020-11-01 | 1 | 30 | 3.33 |
| 2 | 2020-10-01 | 3 | 24 | 12.5 |
| 3 | 2020-09-01 | 0 | 36 | 0.0 |
| 4 | 2020-08-01 | 5 | 40 | 12.5 |
| 5 | 2020-07-01 | 2 | 32 | 6.25 |
| 6 | 2020-06-01 | 0 | 29 | 0.0 |
| 7 | 2020-05-01 | 0 | 36 | 0.0 |
| 8 | 2020-04-01 | 1 | 36 | 2.78 |
| 9 | 2020-03-01 | 1 | 33 | 3.03 |


* I used this query to check my work: there should be 405 cases, so the sum of the `cases_count` column in the result above should be 405.

In [8]:
sql="""
select count(distinct case_id) as count_of_cases from 
(
	select distinct case_id 
	from case_events 
	union 
	select distinct reported_case_id as case_id 
	from customer_case_events
) all_cases
"""
run_select(sql)


| | count_of_cases |
|---|---|
| 0 | 405 |


## 2.c. What fraction of user events are flagged by a detector at any confidence? 

In [9]:
sql = """
with flagged_user_events as (
	select 
		ue.event_id,
		case
			when de.event_id is not null then 1 else 0
		end as flagged_by_detector
	from user_events ue
	left join detector_events de on ue.event_id = de.event_id
)

select
	round(sum(flagged_by_detector)/count(flagged_by_detector)::numeric,4)*100 as flagged_user_events_pct
from flagged_user_events
"""
run_select(sql)

| | flagged_user_events_pct |
|---|---|
| 0 | 17.54 |


## At high confidence?

In [10]:
sql = """
with flagged_user_events as (
	select 
		ue.event_id,
		confidence_level,
		case
			when de.event_id is not null and de.confidence_level = 'HIGH' then 1 else 0
		end as flagged_by_detector
	from user_events ue
	left join detector_events de on ue.event_id = de.event_id
)

select
	round(sum(flagged_by_detector)/count(flagged_by_detector)::numeric,4)*100 as flagged_user_events_pct
from flagged_user_events
"""
run_select(sql)

| | flagged_user_events_pct |
|---|---|
| 0 | 8.0 |
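
One caveat worth checking for both percentages: if a single user event can be flagged by more than one detector (an assumption; the results shown here do not confirm it either way), the `left join` produces one row per detector and the row-level ratio drifts from the true fraction of events. The sketch below demonstrates the effect on made-up rows in an in-memory SQLite database, used as a stand-in for Postgres, and compares it with a `count(distinct ...)` version.

```python
import sqlite3

# Toy data: 4 user events; event 1 is flagged by two detectors, event 2 by one.
conn = sqlite3.connect(":memory:")
conn.executescript("""
create table user_events (event_id integer);
create table detector_events (event_id integer, detector_id text);
insert into user_events values (1), (2), (3), (4);
insert into detector_events values
    (1, 'DETECTOR_1'), (1, 'DETECTOR_2'), (2, 'DETECTOR_1');
""")

# Row-level ratio: the join fans out to 5 rows, 3 of them flagged.
row_level_pct = conn.execute("""
select 100.0 * sum(case when de.event_id is not null then 1 else 0 end) / count(*)
from user_events ue
left join detector_events de on ue.event_id = de.event_id
""").fetchone()[0]

# Event-level ratio: 2 distinct flagged events out of 4 distinct events.
event_level_pct = conn.execute("""
select 100.0 * count(distinct de.event_id) / count(distinct ue.event_id)
from user_events ue
left join detector_events de on ue.event_id = de.event_id
""").fetchone()[0]
conn.close()

print(row_level_pct)    # 60.0
print(event_level_pct)  # 50.0
```

If every event has at most one detector row, the two versions agree and the queries above need no change.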


## 2.d. Calculate the precision of each detector based on cases surfaced to customers.
* I am assuming 'precision' means the share of detector-opened, closed cases that are true positives: true positives / (true positives + false positives).
* A true positive is a case opened by a detector that the customer closes with `customer_action_type = 'RESOLVE'`.
* I'm only going to include cases that have been closed, meaning `customer_action_type = 'RESOLVE'` or `'NOT_A_COMPROMISE'`.

In [11]:
# I'm going to use these CTEs to answer the next few questions
ctes = """
with true_positives as (
	select 
		detector_id,
		event_id,
		ce.case_id,
		customer_action_type,
		case
			when customer_action_type = 'RESOLVE' then 1 
			when customer_action_type = 'NOT_A_COMPROMISE' then 0
			else NULL
		 end as true_positive
	from detector_events de
	-- inner join to only get records with a case_id in both tables
	inner join case_events ce on de.event_id = ce.triggering_event_id
	-- inner join to only get records with a resolved case_id
	inner join (
		select 
			distinct case_id, 
			customer_action_type 
		from customer_action_events 
		where customer_action_type = 'RESOLVE' or customer_action_type = 'NOT_A_COMPROMISE'
		) cae on ce.case_id = cae.case_id
)

, detector_precision as (
select
	detector_id,
	count(true_positive) as closed_cases,
	sum(true_positive) as true_positives,
	round(sum(true_positive)/count(true_positive)::numeric,2) as detector_precision
from true_positives
group by detector_id
)

, detector_rankings as (
select
	detector_id,
	detector_precision,
	rank() over (order by detector_precision desc) as precision_rank
from detector_precision
group by detector_id, detector_precision
)

, highest_precision_detectors as (
	select 
		detector_id as detectors_with_highest_precision,
		detector_precision
	from detector_rankings where precision_rank = 1
)

, lowest_precision_detectors as (
	select 
		detector_id as detectors_with_lowest_precision,
		detector_precision
	from detector_rankings de
	inner join (
		select max(precision_rank) as max_precision_rank from detector_rankings
		) mpr on de.precision_rank = mpr.max_precision_rank
)

, true_positive_count_rank as (
	select
		detector_id,
		sum(true_positive) as true_positive_count,
		rank() over (order by sum(true_positive) desc) as count_rank
	from true_positives
	group by detector_id
)

, highest_true_positive_count_detector as (
	select 
		detector_id as detector_with_most_true_positives,
		true_positive_count
	from true_positive_count_rank
	where count_rank = 1 
)
"""
sql = ctes + """
--Calculate the precision of each detector based on cases surfaced to customers.
select * from detector_precision
"""
run_select(sql)

| | detector_id | closed_cases | true_positives | detector_precision |
|---|---|---|---|---|
| 0 | DETECTOR_6 | 12 | 8 | 0.67 |
| 1 | DETECTOR_1 | 6 | 4 | 0.67 |
| 2 | DETECTOR_5 | 23 | 16 | 0.7 |
| 3 | DETECTOR_2 | 6 | 5 | 0.83 |
| 4 | DETECTOR_4 | 3 | 3 | 1.0 |
| 5 | DETECTOR_3 | 8 | 5 | 0.63 |


* I used the query below to check my work -- there should be 58 closed cases. Meaning, the sum of the `closed_cases` column in the result above should be 58

In [12]:
sql = """
select count(distinct case_id) as count_of_closed_cases
from customer_action_events cae 
where customer_action_type = 'RESOLVE' or customer_action_type = 'NOT_A_COMPROMISE'
"""
run_select(sql)

| | count_of_closed_cases |
|---|---|
| 0 | 58 |
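
As a cross-check outside the database, the precision figures can be recomputed from the `closed_cases` and `true_positives` counts in the `detector_precision` result. This is a minimal sketch; the helper name `precision` is made up. It uses `Decimal` with half-up rounding because Postgres rounds `numeric` ties away from zero, whereas Python's built-in `round(0.625, 2)` gives 0.62.

```python
from decimal import Decimal, ROUND_HALF_UP

# detector_id -> (closed_cases, true_positives), copied from the detector_precision result.
counts = {
    "DETECTOR_1": (6, 4),
    "DETECTOR_2": (6, 5),
    "DETECTOR_3": (8, 5),
    "DETECTOR_4": (3, 3),
    "DETECTOR_5": (23, 16),
    "DETECTOR_6": (12, 8),
}


def precision(closed_cases, true_positives):
    # Precision = true positives / closed cases, rounded half up to 2 decimals.
    value = Decimal(true_positives) / Decimal(closed_cases)
    return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


precisions = {detector: precision(*pair) for detector, pair in counts.items()}
total_closed = sum(closed for closed, _ in counts.values())

for detector, value in sorted(precisions.items()):
    print(detector, value)
print(total_closed)  # 58
```

The total of 58 matches the count of closed cases from the check query, and DETECTOR_3 comes out at 0.63 and DETECTOR_4 at 1.00, in line with the table.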


## Which detector has the highest precision?

In [13]:
sql = ctes + """
select * from highest_precision_detectors
"""
run_select(sql)

| | detectors_with_highest_precision | detector_precision |
|---|---|---|
| 0 | DETECTOR_4 | 1.0 |


## Which detector has the lowest precision?

In [14]:
sql = ctes + """
select * from lowest_precision_detectors
"""
run_select(sql)

| | detectors_with_lowest_precision | detector_precision |
|---|---|---|
| 0 | DETECTOR_3 | 0.63 |


## 2.e. Which detector flags the highest number of customer-confirmed true positive cases?

In [15]:
sql = ctes + """
select * from highest_true_positive_count_detector
"""
run_select(sql)

| | detector_with_most_true_positives | true_positive_count |
|---|---|---|
| 0 | DETECTOR_5 | 16 |


# 3. Building a data pipeline
Since I already have the tables in dbt, I can build a model on top of them:
```
-- models/abnormal.sql

select 
	ue.customer_id,
	ue.user_id,
	ue.event_id,
	ue.event_type,
	ue.event_timestamp,
	c.customer_name,
	c.cnt_users,
	c.remediation_action,
	de.detector_id,
	de.confidence_level as detector_confidence_level,
	ce.case_id,
	ce.triggering_event_id,
	cce.reported_case_id,
	cce.report_timestamp,
	cae.action_id,
	cae.action_type,
	cae.action_timestamp,
	cae2.customer_action_type,
	cae2.customer_action_ts,
	cae2.customer_comment 
from {{ ref('user_events') }} ue
left join {{ ref('customers') }} c on ue.customer_id = c.customer_id 
left join {{ ref('detector_events') }} de on ue.event_id = de.event_id 
left join {{ ref('case_events') }} ce on ue.event_id = ce.triggering_event_id
left join {{ ref('customer_case_events') }} cce on ue.event_id = cce.reported_event_id 
left join {{ ref('case_action_events') }} cae on ce.case_id = cae.case_id 
left join {{ ref('customer_action_events') }} cae2 on ce.case_id = cae2.case_id 
order by ue.event_id asc
```

Because this is in dbt, I can also implement tests, such as the singular test below. It asserts that every `reported_event_id` in `customer_case_events` exists in `user_events`; dbt treats the test as passing when the query returns no rows.

```
-- assert_all_reported_event_ids_exist_in_user_events.sql
select 
	distinct reported_event_id 
from {{ ref('customer_case_events') }} cce 
where reported_event_id not in (select distinct event_id from {{ ref('user_events') }})
```
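
One subtlety with `not in`: if the subquery ever returns a NULL, the predicate evaluates to unknown for every non-matching row, so the test would return no rows and pass silently. Whether `event_id` can be NULL in `user_events` is an assumption here and may not apply to this data. The sketch below reproduces the effect with made-up rows in an in-memory SQLite database (a stand-in for Postgres) and shows a `not exists` variant that is not affected.

```python
import sqlite3

# Toy data: user_events contains a NULL event_id; reported event 2 has no match.
conn = sqlite3.connect(":memory:")
conn.executescript("""
create table user_events (event_id integer);
create table customer_case_events (reported_event_id integer);
insert into user_events values (1), (null);
insert into customer_case_events values (1), (2);
""")

# `2 not in (1, NULL)` is unknown, so the orphaned row is filtered out.
not_in_rows = conn.execute("""
select distinct reported_event_id
from customer_case_events
where reported_event_id not in (select event_id from user_events)
""").fetchall()

# `not exists` only checks for a matching row, so NULLs do not hide the orphan.
not_exists_rows = conn.execute("""
select distinct cce.reported_event_id
from customer_case_events cce
where not exists (
    select 1 from user_events ue where ue.event_id = cce.reported_event_id
)
""").fetchall()
conn.close()

print(not_in_rows)      # []
print(not_exists_rows)  # [(2,)]
```

If `event_id` is guaranteed non-null (for example via a dbt `not_null` test on that column), the original `not in` form behaves as intended.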

* After I run `dbt build` with this model, I can query it.

In [16]:
sql = """
select * from abnormal
"""
run_select(sql)

| | customer_id | user_id | event_id | event_type | event_timestamp | customer_name | cnt_users | remediation_action | detector_id | detector_confidence_level | case_id | reported_case_id | report_timestamp | action_id | action_type | action_timestamp | customer_action_type | customer_action_ts | customer_comment |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 3 | 323 | 1000 | DATA_DOWNLOAD | 2020-11-20 10:33:50 | Lumina Dynamics Inc. | 371 | REVOKE_SESSIONS | | | | | NaT | | | NaT | | NaT | |
| 1 | 7 | 255 | 1001 | MAILFILTER | 2020-12-09 10:10:21 | Phoenix Crest Construction | 347 | REVOKE_SESSIONS | | | | | NaT | | | NaT | | NaT | |
| 2 | 3 | 190 | 1004 | SIGN_IN | 2020-02-12 03:27:06 | Lumina Dynamics Inc. | 371 | REVOKE_SESSIONS | | | | | NaT | | | NaT | | NaT | |
| 3 | 10 | 12 | 1005 | DATA_DOWNLOAD | 2020-03-30 08:21:36 | Aurora Axis Enterprises | 257 | REVOKE_SESSIONS_RESET_PASSWORD | | | | | NaT | | | NaT | | NaT | |
| 4 | 8 | 182 | 1008 | POSTURE_CHANGE | 2020-01-12 23:25:23 | Nexus Auto Parts, Inc | 200 | REVOKE_SESSIONS | | | | | NaT | | | NaT | | NaT | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 5069 | 8 | 183 | 9988 | SIGN_IN | 2020-08-12 01:40:57 | Nexus Auto Parts, Inc | 200 | REVOKE_SESSIONS | | | | | NaT | | | NaT | | NaT | |
| 5070 | 6 | 137 | 9989 | MAILFILTER | 2020-01-30 11:19:47 | Elysian Capital | 211 | NOTIFICATION | | | | | NaT | | | NaT | | NaT | |
| 5071 | 4 | 246 | 9994 | SIGN_IN | 2020-11-18 11:45:57 | Titan Forge Technologies | 248 | REVOKE_SESSIONS_RESET_PASSWORD | DETECTOR_1 | LOW | | | NaT | | | NaT | | NaT | |
| 5072 | 7 | 87 | 9997 | MAILFILTER | 2020-01-12 01:58:59 | Phoenix Crest Construction | 347 | REVOKE_SESSIONS | | | | | NaT | | | NaT | | NaT | |
