# Anomaly Detection: Retail Sales


In [1]:
import os
# indicate your Spark version, here we use Spark 3.5 with pydeequ 1.4.0
os.environ["SPARK_VERSION"] = '3.2'
os.environ["HADOOP_HOME"] = "C:\\hadoop\\winutils\\hadoop-3.3.6"

In [2]:
from pyspark.sql import SparkSession, Row, DataFrame
import pandas as pd
import sagemaker_pyspark

import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())
sc = spark.sparkContext


### Initialize Metrics Repository

In [62]:
from pydeequ.repository import *
metricsRepository = InMemoryMetricsRepository(spark)

## Anomaly Detection

In [63]:
df = spark.read.parquet("Retail_sales.parquet")


In [64]:
from pydeequ.verification import VerificationSuite
from pydeequ.analyzers import Size, Mean
from pydeequ.anomaly_detection import *

key1 = ResultKey(spark, ResultKey.current_milli_time() - 24 * 60 * 60 * 1000)

Result = VerificationSuite(spark).onData(df) \
    .useRepository(metricsRepository) \
    .saveOrAppendResult(key1) \
    .addAnomalyCheck(	SimpleThresholdStrategy(lowerBound=0, upperBound=50), Mean("Units Sold")) \
    .run()

In [65]:
metricsRepository.load().forAnalyzers([Mean("Units Sold")]).getSuccessMetricsAsDataFrame().show()

+------+----------+----+-----------------+-------------+
|entity|  instance|name|            value| dataset_date|
+------+----------+----+-----------------+-------------+
|Column|Units Sold|Mean|6.161966666666666|1726505969596|
+------+----------+----+-----------------+-------------+



In [66]:
if (Result.status != "Success"):
    print("Anomaly detected in the Mean(Units Sold) metric!")
    metricsRepository.load().forAnalyzers([Mean("Units Sold")]).getSuccessMetricsAsDataFrame().show()


Anomaly detected in the Mean(Units Sold) metric!
+------+----------+----+-----------------+-------------+
|entity|  instance|name|            value| dataset_date|
+------+----------+----+-----------------+-------------+
|Column|Units Sold|Mean|6.161966666666666|1726505969596|
+------+----------+----+-----------------+-------------+

