### Anomaly Detection Basic Tutorial

In [2]:
import os
# Environment for the Spark/Deequ stack, set before pyspark/pydeequ are imported.
# NOTE(review): PyDeequ presumably uses SPARK_VERSION to choose the matching
# Deequ artifact, and JAVA_HOME is a machine-specific JDK 11 path — confirm
# both for your environment.
os.environ.update({
    "SPARK_VERSION": '3.3',
    "JAVA_HOME": '/usr/lib/jvm/java-11-openjdk-amd64/',
})

In [9]:
# Use the explicit %pip magic (rather than relying on automagic) so the package
# is installed into the environment of the running kernel.
%pip install pydeequ==1.2.0

Note: you may need to restart the kernel to use updated packages.


In [10]:
# Pin PySpark to the 3.3.x line so it agrees with SPARK_VERSION = '3.3' set in
# the first cell; an unpinned install can pull a newer Spark release.
%pip install pyspark~=3.3.0

Note: you may need to restart the kernel to use updated packages.


In [11]:
# Explicit %pip magic so the install targets the running kernel's environment.
# TODO(review): pin a sagemaker_pyspark version compatible with Spark 3.3.
%pip install sagemaker_pyspark

Note: you may need to restart the kernel to use updated packages.


In [21]:
### Anomaly detection example

In [12]:
from pyspark.sql import SparkSession, Row, DataFrame
import pandas as pd
import sagemaker_pyspark

import pydeequ

# Jars reported by sagemaker_pyspark, joined into one ':'-separated classpath.
classpath = ":".join(sagemaker_pyspark.classpath_jars())

# Spark options applied in order: driver classpath, the Deequ Maven coordinate
# to fetch, and the coordinate PyDeequ asks to exclude from that resolution.
spark_options = {
    "spark.driver.extraClassPath": classpath,
    "spark.jars.packages": pydeequ.deequ_maven_coord,
    "spark.jars.excludes": pydeequ.f2j_maven_coord,
}

session_builder = SparkSession.builder
for option_name, option_value in spark_options.items():
    session_builder = session_builder.config(option_name, option_value)

# Reuse an existing session if one is already running, otherwise start one.
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [13]:
# NOTE(review): the wildcard import supplies InMemoryMetricsRepository and
# presumably also ResultKey, which is used in later cells — confirm before
# narrowing it to explicit names.
from pydeequ.repository import *
# Metrics repository bound to the Spark session. The verification runs below
# save their metrics into it, and the final cell loads them back for display.
metricsRepository = InMemoryMetricsRepository(spark)

In [14]:
# Two-row baseline dataset standing in for "yesterday's" data.
yesterdays_rows = [
    Row(a=3, b=0),
    Row(a=3, b=5),
]
yesterdaysDataset = sc.parallelize(yesterdays_rows).toDF()

In [15]:
from pydeequ.verification import *

# Key the baseline metrics with a timestamp 24 hours (in milliseconds) before now.
one_day_ms = 24 * 60 * 60 * 1000
yesterdays_timestamp = ResultKey.current_milli_time() - one_day_ms
yesterdaysKey = ResultKey(spark, yesterdays_timestamp)


In [17]:
from pydeequ.anomaly_detection import *
from pydeequ.analyzers import *

# Baseline run: evaluate the Size() analyzer on yesterday's data and save the
# result in the shared repository under yesterdaysKey. The same anomaly check
# is attached here as in today's run.
prev_Result = (
    VerificationSuite(spark)
    .onData(yesterdaysDataset)
    .useRepository(metricsRepository)
    .saveOrAppendResult(yesterdaysKey)
    .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size())
    .run()
)

                                                                                

In [18]:
# Five-row dataset standing in for "today's" data (the baseline has two rows).
todays_rows = [
    Row(a=3, b=0),
    Row(a=3, b=5),
    Row(a=100, b=5),
    Row(a=2, b=30),
    Row(a=10, b=5),
]
todaysDataset = sc.parallelize(todays_rows).toDF()

In [19]:
# Key today's metrics with the current timestamp in milliseconds.
current_timestamp = ResultKey.current_milli_time()
todaysKey = ResultKey(spark, current_timestamp)

# Today's run: evaluate Size() on today's data, save it under todaysKey in the
# same repository, and apply the relative-rate-of-change anomaly check.
# NOTE(review): maxRateIncrease=2.0 presumably flags a metric that grows by
# more than a factor of 2 versus the previous stored value — confirm in the
# PyDeequ docs.
currResult = (
    VerificationSuite(spark)
    .onData(todaysDataset)
    .useRepository(metricsRepository)
    .saveOrAppendResult(todaysKey)
    .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size())
    .run()
)

In [20]:
# Any status other than "Success" is reported as an anomaly, followed by the
# Size() metrics stored in the repository so the two runs can be compared.
anomaly_detected = currResult.status != "Success"
if anomaly_detected:
    print("Anomaly detected in the Size() metric!")
    size_metrics = (
        metricsRepository.load()
        .forAnalyzers([Size()])
        .getSuccessMetricsAsDataFrame()
    )
    size_metrics.show()

Anomaly detected in the Size() metric!
+-------+--------+----+-----+-------------+
| entity|instance|name|value| dataset_date|
+-------+--------+----+-----+-------------+
|Dataset|       *|Size|  2.0|1725761063501|
|Dataset|       *|Size|  5.0|1725847522832|
+-------+--------+----+-----+-------------+

