# Using AWS Glue and PyDeequ to detect anomalies in large datasets

PyDeequ allows us to persist the metrics we compute on dataframes in a so-called MetricsRepository using AWS Glue. In the following example, we show how to store metrics in a file on S3 and query them later to detect anomalous data. When an anomaly is detected, an SNS notification is sent.

This tutorial is part of a PyDeequ Glue post and uses a dataset queried from AWS Athena.

In [1]:
import sys
from awsglue.utils import getResolvedOptions 
from pyspark.context import SparkContext
from awsglue.context import GlueContext


glueContext = GlueContext(SparkContext.getOrCreate())
session = glueContext.spark_session

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
107,application_1595892420059_0108,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Extract the Data table 
Let us demo the Holt Winters Anomaly Strategy on the `jewelry_dataset`, the newly created table from AWS Athena.  

To use PyDeequ on `jewelry_dyf`, convert the DynamicFrame to a DataFrame using `.toDF()`. The `dropDuplicates` method can then remove potential duplicates within the dataset; it returns a new DataFrame rather than modifying the original in place.

In [2]:
jewelry_dyf = glueContext.create_dynamic_frame.from_catalog(database="jewelry_db", table_name="jewelry_dataset")

jewelry_df = jewelry_dyf.toDF()
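# Note: dropDuplicates() returns a new DataFrame; the result is not assigned here,
# so jewelry_df itself is left unchanged
jewelry_df.dropDuplicates()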

jewelry_df.show(5)

+-----------+----+-----------+
|total_votes|year|review_date|
+-----------+----+-----------+
|          7|2014|  2014-4-01|
|          7|2014|  2014-4-01|
|          7|2014|  2014-4-01|
|          7|2014|  2014-4-01|
|          7|2014|  2014-4-01|
+-----------+----+-----------+
only showing top 5 rows

## Transform the Data table

It looks like `jewelry_df` can be simplified further. Using the `date_format` function, we can change the `review_date` column to show only the year and month of each review. Afterwards, we can `filter` `jewelry_df2` by year and `select` only the two columns we need, `review_date` and `total_votes`.
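To make the `yyyy/M` pattern concrete, here is a minimal plain-Python sketch of the same transformation, without Spark. The helper name `month_label` is hypothetical and only mimics what the pattern produces: a four-digit year followed by the month without zero padding.

```python
from datetime import datetime


def month_label(review_date):
    """Mimic the 'yyyy/M' pattern: four-digit year, month without zero padding.

    Hypothetical helper for illustration; it is not part of PySpark or PyDeequ.
    """
    parsed = datetime.strptime(review_date, "%Y-%m-%d")
    return f"{parsed.year}/{parsed.month}"


labels = [month_label(d) for d in ["2013-09-15", "2013-10-02", "2013-12-24"]]

# The labels are plain strings, so sorting them is lexicographic, not chronological.
descending = sorted(labels, reverse=True)
```

Because the labels are strings, a descending sort places `2013/9` ahead of `2013/12` and `2013/10`. This is why September rows appear first in the ordered output, and why the later filters compare against labels such as `'2013/9'` rather than `'2013/09'`.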


In [3]:
import pyspark.sql.functions as f

jewelry_df2 = jewelry_df.withColumn('review_date', f.date_format('review_date', 'yyyy/M'))\
        .orderBy('review_date', ascending = False)


df_2013 = jewelry_df2.filter("year ='2013'").select("review_date","total_votes")
df_2014 = jewelry_df2.filter("year ='2014'").select("review_date","total_votes")
df_2015 = jewelry_df2.filter("year ='2015'").select("review_date","total_votes")


In [4]:
df_2013.show(10)

+-----------+-----------+
|review_date|total_votes|
+-----------+-----------+
|     2013/9|          2|
|     2013/9|          2|
|     2013/9|          2|
|     2013/9|          8|
|     2013/9|          2|
|     2013/9|          8|
|     2013/9|          2|
|     2013/9|          2|
|     2013/9|          2|
|     2013/9|          8|
+-----------+-----------+
only showing top 10 rows

## Let us use PyDeequ to detect anomalous data

### Initialize Metrics Repository

We will be demoing with the `FileSystemMetricsRepository` class. A metrics repository can serve as a data quality report over time.

**The metrics repository allows us to store the metrics in JSON format on S3.**

In [5]:
# S3 write path 
s3_write_path = "s3://devpydeequ/tmp/holt_winters_tutorial.json"

import pydeequ
from pydeequ.repository import *

metricsRepository = FileSystemMetricsRepository(session,s3_write_path)


## We will be running a Holt Winters Anomaly Strategy on our current datasets

Holt Winters models the average level, the trend and the seasonality of a series. To estimate these components, Deequ uses two full cycles of data before it forecasts datapoints.

Our dataset is collected monthly and follows an annual seasonal trend, so let us use `MetricInterval.Monthly` and `SeriesSeasonality.Yearly`. This selection requires us to collect at least 25 datapoints. The initial 24 datapoints are the monthly values from 2013 and 2014, which are used to fit the Holt Winters model. The values from 2015 are then compared against the forecasted points, which can reveal an anomalous value.
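The forecasting idea above can be sketched in plain Python. This is a simplified additive Holt Winters update, not Deequ's implementation: the fixed smoothing parameters `alpha`, `beta` and `gamma`, the initialization from the first cycle, and the threshold of 1.96 times the residual spread are all assumptions made for illustration.

```python
import math


def holt_winters_check(history, new_value, period, alpha=0.3, beta=0.1, gamma=0.1):
    """Forecast one step ahead and flag new_value if it is far from the forecast.

    Illustrative sketch only; the parameters and threshold are assumptions.
    """
    if len(history) < 2 * period:
        raise ValueError("need at least two full seasonal cycles of history")

    first, second = history[:period], history[period:2 * period]
    level = sum(first) / period                        # average of the first cycle
    trend = (sum(second) - sum(first)) / period ** 2   # average change per step
    seasonal = [x - level for x in first]              # offset of each slot in the cycle

    residuals = []
    for t in range(period, len(history)):
        s = seasonal[t % period]
        residuals.append(history[t] - (level + trend + s))  # one-step-ahead error
        prev_level = level
        level = alpha * (history[t] - s) + (1 - alpha) * (level + trend)
        trend = beta * (level - prev_level) + (1 - beta) * trend
        seasonal[t % period] = gamma * (history[t] - level) + (1 - gamma) * s

    forecast = level + trend + seasonal[len(history) % period]
    spread = math.sqrt(sum(r * r for r in residuals) / len(residuals))
    is_anomaly = abs(new_value - forecast) > 1.96 * spread
    return forecast, is_anomaly
```

With `period=12`, the function refuses to run on fewer than 24 values, which mirrors the requirement that two years of monthly data come first and the 25th datapoint is the first one that can be checked.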

### We will be loading the 2013 dataset into a metrics file 

First, let us create a `for` loop that iterates through the months of `df_2013`. Using `month`, we build a `date` string that lets us query the matching values from `df_2013`. The `filter` method then gives us a `df` dataframe which contains the `total_votes` values for that month (e.g. the first iteration yields a table of values from January 2013).


Next, each set of metrics that we compute needs to be indexed by a so-called `ResultKey`, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.

Finally, we create a `VerificationSuite`. We make Deequ write and store our metrics in S3 by adding the `useRepository` and `saveOrAppendResult` methods. Then we add the `HoltWinters` strategy with a `Sum` analyzer to calculate the monthly `total_votes`.
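To illustrate how metrics are indexed by a timestamp plus tags, here is a minimal in-memory sketch. The names `SketchResultKey` and `InMemoryRepository` are hypothetical stand-ins and not part of PyDeequ, and treating the cutoff in `load_before` as inclusive is a choice made for this sketch. The two sample entries reuse values that appear in the repository output of this tutorial.

```python
from dataclasses import dataclass, field


@dataclass
class SketchResultKey:
    """Hypothetical stand-in for a result key: a millisecond timestamp plus tags."""
    dataset_date: int
    tags: dict = field(default_factory=dict)


class InMemoryRepository:
    """Hypothetical stand-in for a metrics repository, kept in a Python list."""

    def __init__(self):
        self._entries = []

    def save_or_append(self, key, metrics):
        # Store a copy so later changes to the caller's dict do not leak in.
        self._entries.append((key, dict(metrics)))

    def load_before(self, timestamp_ms):
        # Return every entry recorded at or before the given timestamp.
        return [(k, m) for k, m in self._entries if k.dataset_date <= timestamp_ms]


repo = InMemoryRepository()
repo.save_or_append(SketchResultKey(1597783116602, {"tag": "'2013/1'"}), {"Sum": 41199.0})
repo.save_or_append(SketchResultKey(1597783122280, {"tag": "'2013/2'"}), {"Sum": 32570.0})

earlier = repo.load_before(1597783120000)
```

Here `earlier` holds only the January entry, because the February metric was recorded after the cutoff. The tag is what later lets us tell which month a stored value belongs to.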

In [6]:
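from pydeequ.verification import *
from pydeequ.analyzers import Sum
from pydeequ.anomaly_detection import HoltWinters, MetricInterval, SeriesSeasonality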

for month in range(1,13):
    date = "\'2013/"+str(month)+"\'"
    df = df_2013.filter("review_date =" + date)
    
    key_tags = {'tag':  date}
    result_key_2013 = ResultKey(session, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(session).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2013) \
        .addAnomalyCheck(HoltWinters(session, MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()


### We will be creating an anomaly check for 2014

This process is very similar to loading the 2013 dataset.

In [7]:
for month in range(1,13):
    date = "\'2014" +'/'+str(month)+"\'"
    df = df_2014.filter("review_date =" + date)
    key_tags = {'tag':  date}
    result_key_2014 = ResultKey(session, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(session).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2014) \
        .addAnomalyCheck(HoltWinters(session, MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()

    

### Let us load the current metrics repository
The repository now has the monthly total votes for 2013 and 2014.

In [8]:
analysisResult_metRep = metricsRepository.load() \
                            .before(ResultKey.current_milli_time()) \
                            .getSuccessMetricsAsDataFrame()

analysisResult_metRep.show()

+------+-----------+----+-------+-------------+---------+
|entity|   instance|name|  value| dataset_date|      tag|
+------+-----------+----+-------+-------------+---------+
|Column|total_votes| Sum|41199.0|1597783116602| '2013/1'|
|Column|total_votes| Sum|32570.0|1597783122280| '2013/2'|
|Column|total_votes| Sum|31366.0|1597783125923| '2013/3'|
|Column|total_votes| Sum|28408.0|1597783129935| '2013/4'|
|Column|total_votes| Sum|28395.0|1597783133643| '2013/5'|
|Column|total_votes| Sum|25896.0|1597783137074| '2013/6'|
|Column|total_votes| Sum|27524.0|1597783140544| '2013/7'|
|Column|total_votes| Sum|29689.0|1597783144126| '2013/8'|
|Column|total_votes| Sum|25246.0|1597783147704| '2013/9'|
|Column|total_votes| Sum|27364.0|1597783151084|'2013/10'|
|Column|total_votes| Sum|25710.0|1597783154598|'2013/11'|
|Column|total_votes| Sum|50431.0|1597783158101|'2013/12'|
|Column|total_votes| Sum|42801.0|1597783165759| '2014/1'|
|Column|total_votes| Sum|41202.0|1597783169756| '2014/2'|
|Column|total_

### Great! We have created the trend for the Holt Winters algorithm
Now let us detect any anomalies within 2015. If you want to receive an e-mail notification, create a variable `topicArn` containing the ARN of the SNS topic you created and an `snsClient` for your region.

Next, create another HoltWinters anomaly check similar to the ones for the 2013 and 2014 datasets, but iterate over months 1 to 8 only (the dataset ends in August 2015).

Within the `for` loop, check for an anomaly using `jewelry_result.status`. If it is not `Success`, an anomaly has been detected. Then `collect` the `constraint_message` column to see the error value.

Now let us create an SNS notification!
Use `publish` to send an SNS notification. Include the `TopicArn`, a `Message`, a `Subject` and `MessageAttributes`. If an anomaly has been detected, break out of the loop.
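The notification arguments can be assembled and inspected before anything is sent. The sketch below builds the keyword arguments only and does not call AWS. The function name `build_anomaly_notification` and the topic ARN are hypothetical placeholders, and it assumes the constraint messages have already been collected as plain strings.

```python
import json


def build_anomaly_notification(topic_arn, date_label, constraint_messages):
    """Assemble keyword arguments for an SNS publish call (hypothetical helper)."""
    subject = f"Anomaly detected in the jewelry dataset: {date_label}"
    return {
        "TopicArn": topic_arn,
        # SNS subject lines are limited to 100 characters, so truncate defensively.
        "Subject": subject[:100],
        "Message": "Anomaly detected in data frame:\n" + json.dumps(constraint_messages),
        "MessageAttributes": {
            "TransactionType": {
                "DataType": "String",
                "StringValue": "Anomaly Detected in Glue",
            }
        },
    }


payload = build_anomaly_notification(
    "arn:aws:sns:us-west-2:123456789012:example-topic",  # placeholder ARN
    "2015/2",
    ["example constraint message"],  # placeholder text, not real Deequ output
)
```

With a boto3 SNS client such as `snsClient`, the payload could then be sent with `snsClient.publish(**payload)`. Keeping the assembly separate makes it easy to check the subject and message contents without publishing anything.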



In [None]:
# Use AWS SNS 
import boto3 
import json

# Topic for AWS SNS 
topicArn = 'arn:aws:sns:us-west-2:498730894712:jewelry_hw'
snsClient = boto3.client('sns', region_name = 'us-west-2')


for month in range(1,9):
    date = "\'2015" +'/'+str(month)+"\'"
    df = df_2015.filter("review_date =" + date)
    key_tags = {'tag':  date}
    result_key_2015 = ResultKey(session, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(session).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2015) \
        .addAnomalyCheck(HoltWinters(session, MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()
    
    df = VerificationResult.checkResultsAsDataFrame(session, jewelry_result)
    
    if (jewelry_result.status != "Success"):
        print("Anomaly for total_votes has been detected")
        print(date)
        # Collect the constraint messages as plain strings so they serialize cleanly
        message = [row["constraint_message"] for row in df.select("constraint_message").collect()]
        # The attribute value is plain text, so its DataType is "String"
        response = snsClient.publish(TopicArn = topicArn,
                             Message = "anomaly detected in data frame: \n" + json.dumps(message),
                             Subject = "Anomaly Detected in the jewelry dataset:"+ date,
                             MessageAttributes = {"TransactionType":
                                            {"DataType": "String", "StringValue": "Anomaly Detected in Glue"}})
        break
    
    

We received an e-mail reporting an anomaly for February 2015. Looking back at our graphed dataset, this supports our hypothesis of a Holt Winters anomaly in February 2015.