In [0]:
# Example 1: Learning Spark through anomaly detection on a temperature dataset.
# Dataset: datasets/temperature.csv
# Author: Humberto Bianchini

In [0]:
# 1) Importing all necessary libraries and Spark session creation.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id



# Obtain the Spark session for this notebook under the application name "AnomalyDetection".
spark = (
    SparkSession.builder
    .appName("AnomalyDetection")
    .getOrCreate()
)

In [0]:
# 2) Load the temperature CSV (header row present, column types inferred) and preview 5 untruncated rows.
datasetpath = "/FileStore/tables/temperature.csv"
data = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load(datasetpath)
)
data.show(5, truncate=False)

In [0]:
# 3) List the column names of the loaded dataset (the cell's last expression is displayed).
data.columns

In [0]:
# 4) Count the rows of the loaded dataset (the cell's last expression is displayed).
data.count()

In [0]:
# 5) Pick one city to analyze (Vancouver in this example): keep only its
#    readings together with the timestamp column, then preview the result.
dataAnalysis = data.select(col('datetime'), col('Vancouver'))
dataAnalysis.show(5)

In [0]:
# 6) Keep only the rows whose Vancouver reading is present (drop nulls).
has_reading = col('Vancouver').isNotNull()
dataNotNull = dataAnalysis.where(has_reading)
dataNotNull.show(5)

In [0]:
# 7) Append an "index" column of generated row IDs.
# NOTE: monotonically_increasing_id() yields unique, increasing IDs, but they
# are not guaranteed to be consecutive (0, 1, 2, ...) across partitions.
df_plots = dataNotNull.select(
    "*",
    monotonically_increasing_id().alias("index"),
)
df_plots.show(5)

In [0]:
# 8) Compute the mean and the standard deviation of the Vancouver readings
#    and pull them back to the driver as plain Python values.
from pyspark.sql.functions import avg, stddev

list_stats = dataNotNull.select(
    avg(col('Vancouver')).alias('mean'),
    stddev(col('Vancouver')).alias('stdDev'),
).collect()

# The aggregation produces a single row holding both statistics.
stats_row = list_stats[0]
mean, dev = stats_row['mean'], stats_row['stdDev']

print(f"Mean: {mean}")
print(f"Standard deviation: {dev}")

In [0]:
# 9) Build a one-row dataframe holding the same two statistics (mean, stdDev).
df_stats = dataNotNull.select(
    avg(col('Vancouver')).alias('mean'),
    stddev(col('Vancouver')).alias('stdDev'),
)
df_stats.show()

In [0]:
# 10) Print Spark's built-in summary statistics for the null-filtered dataframe.
dataNotNull.describe().show()

In [0]:
# 11) Setting two functions to calculate anomalies.
from pyspark.sql.types import DoubleType, IntegerType
def distance(x, mean=283.8626, dev=6.6401):
  """Return the standardized distance (z-score) of ``x`` from ``mean``.

  Args:
    x: A numeric reading, or ``None`` for a missing value.
    mean: Reference mean. The default is the value that was hard-coded in
      the original function (presumably the Vancouver mean computed earlier
      in the notebook; pass the freshly computed value to avoid drift).
    dev: Reference standard deviation; same caveat as ``mean``.

  Returns:
    ``(x - mean) / dev`` as a float, or ``None`` when ``x`` is ``None`` so
    that a missing reading stays missing instead of raising ``TypeError``.
  """
  if x is None:
    return None
  return (x - mean) / dev


def anomalies(x, threshold=2):
  """Flag a standardized distance as anomalous.

  Args:
    x: Distance value to test (the notebook passes the absolute z-score),
      or ``None`` for a missing value.
    threshold: Values strictly greater than this are anomalies. Defaults
      to 2, the cut-off that was hard-coded in the original function.

  Returns:
    1 if ``x > threshold``, 0 otherwise; ``None`` when ``x`` is ``None``
    (the comparison would otherwise raise ``TypeError``).
  """
  if x is None:
    return None
  return 1 if x > threshold else 0

  

# `udf` is imported explicitly: nothing else in the notebook imports it, so
# without this line the cell only works where the runtime predefines `udf`.
from pyspark.sql.functions import udf

# Wrap the plain-Python helpers as Spark UDFs with explicit return types so
# they can be applied to dataframe columns.
distance_udf_double = udf(distance, DoubleType())
anomalies_udf_int = udf(anomalies, IntegerType())

In [0]:
# 12) For every Vancouver reading, compute its standardized distance from the mean.
data_new = dataNotNull.select(
    col('Vancouver'),
    distance_udf_double(col('Vancouver')).alias('distance'),
)
data_new.show(5)

In [0]:
# Add the absolute value of the distance as "ABSdistance".
# Spark's abs is accessed through the `F` namespace instead of being imported
# by bare name, which would shadow Python's built-in abs() for every later cell.
from pyspark.sql import functions as F
data_new = data_new.select('Vancouver', 'distance', F.abs(col('distance')).alias("ABSdistance"))
data_new.show(5)

In [0]:
# Add the 0/1 "isAnomaly" flag derived from the absolute distance.
data_new = data_new.select(
    col('Vancouver'),
    col('distance'),
    col("ABSdistance"),
    anomalies_udf_int(col("ABSdistance")).alias("isAnomaly"),
)
data_new.show(10)

In [0]:
# 13) Show up to 10 of the rows that were flagged as anomalies.
flagged = col("isAnomaly") > 0
data_new.where(flagged).show(10)

In [0]:
# 14) Render the null-filtered Vancouver readings with the notebook's display() helper.
display(dataNotNull.select("Vancouver"))

Vancouver
284.63
284.62904131
284.626997923
284.624954535
284.622911147
284.620867759
284.618824371
284.616780983
284.614737595
284.612694207


In [0]:
# NOTE(review): this call is identical to the display() call in the previous cell;
# presumably kept to render the same data with a different view - confirm or remove.
display(dataNotNull.select("Vancouver"))

Vancouver
284.63
284.62904131
284.626997923
284.624954535
284.622911147
284.620867759
284.618824371
284.616780983
284.614737595
284.612694207
