# Anomaly detection

The dataset is public; references to its license can be found in the README.md in the /data subdirectory of the repo.

In [1]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors

In [None]:
from mxnet import autograd, np, npx, gluon

In [2]:
# Entry point of the Spark application: build the session, or pick up the
# one that already exists. Every Spark DataFrame below is created through it.
spark = (
    SparkSession.builder
    .appName("2-class classification")
    .getOrCreate()
)

## Where to download the dataset

The dataset we use can be downloaded from https://raw.githubusercontent.com/alessio-proietti/dp-sgd-notebook/main/data/bank-additional-full-new-label.csv.

It is also shipped with the repo itself, in the `data/` subdirectory.

In [3]:
# Parse the CSV with pandas first, then hand the resulting frame to Spark.
df = spark.createDataFrame(
    pd.read_csv("data/bank-additional-full-new-label.csv")
)

In [24]:
# Quick look at the raw data: column names, summary statistics, class balance.
print(df.columns)

# Spark DataFrames are immutable: drop() returns a NEW DataFrame. The bare
# call used before threw that result away, so 'duration' silently stayed in
# df and ended up in the feature vector built further down. Assigning the
# result back actually removes the column; the call is a no-op when the
# column is already gone, so this cell stays safe to re-run.
# NOTE(review): 'duration' is presumably excluded because it is only known
# after the call has taken place -- confirm against the dataset description.
df = df.drop('duration')

# Summary statistics for the client/campaign and the macro-economic columns.
df.describe(['age', 'campaign', 'pdays', 'previous']).show()
df.describe(['emp_var_rate', 'cons_price_idx', 'con_conf_idx', 'euribor3m']).show()

# Class balance of the target column 'y'.
print('yes:', df.filter(df['y'] == 'yes').count())
print('no:', df.filter(df['y'] == 'no').count())

['age', 'job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'emp_var_rate', 'cons_price_idx', 'con_conf_idx', 'euribor3m', 'nr_employed', 'y']
+-------+------------------+-----------------+------------------+-------------------+
|summary|               age|         campaign|             pdays|           previous|
+-------+------------------+-----------------+------------------+-------------------+
|  count|             41188|            41188|             41188|              41188|
|   mean| 40.02406040594348|2.567592502670681| 962.4754540157328|0.17296299893172767|
| stddev|10.421249980934048|2.770013542902328|186.91090734474213| 0.4949010798392895|
|    min|                17|                1|                 0|                  0|
|    max|                98|               56|               999|                  7|
+-------+------------------+-----------------+------------------+---

In [4]:
# Here we set the stages for data processing

# Split the columns by Spark dtype. The target must never become a feature:
#   * "y" is the raw string target, so it is kept out of the categorical list;
#   * "label" is the numeric column the label-indexing cell adds to df.
#     Excluding it here keeps this cell safe to re-run after that cell;
#     otherwise the target itself would be assembled into "features".
numericCols = [
    field for (field, dataType) in df.dtypes
    if dataType != "string" and field != "label"
]
categoricalCols = [
    field for (field, dataType) in df.dtypes
    if dataType == "string" and field != "y"
]

# Intermediate (index) and final (one-hot) column names, one per categorical.
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

# string category -> numeric index; rows with invalid values are skipped
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
# numeric index -> one-hot vector
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)

# Concatenate the one-hot vectors and the numeric columns into "features".
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [5]:
# Turn the string target "y" into a numeric "label" column.
# NOTE(review): StringIndexer's default ordering is by descending frequency,
# so the most frequent class should map to 0.0 -- confirm which of
# 'yes'/'no' that is for this dataset.
stringIndexerLabel = StringIndexer(inputCol="y", outputCol="label", handleInvalid="skip")
labelModel = stringIndexerLabel.fit(df)

# transform() fails if the output column already exists, which made this cell
# crash when run a second time. Dropping a possibly left-over "label" first
# (a no-op on the first run) makes the cell idempotent.
df = labelModel.transform(df.drop("label"))

In [6]:
# 75% / 25% train/validation split; the fixed seed keeps it reproducible.
train, validation = df.randomSplit([.75, .25], seed=24)

pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler])

# Fit the preprocessing stages on the training split ONLY and reuse that one
# fitted model for both splits. Re-fitting on the validation split (as was
# done before) learns separate category->index and one-hot mappings, so the
# same position in "features" could mean different things in train_df and
# val_df, and it lets held-out data influence the preprocessing.
pipelineModel = pipeline.fit(train)

train_df = pipelineModel.transform(train).select("label", "features")
# The string indexer is configured with handleInvalid="skip", so validation
# rows holding a category never seen in the training split are dropped here.
val_df = pipelineModel.transform(validation).select("label", "features")

In [None]:
# Uncomment to shut down the Spark session once you are done with the notebook.
#spark.stop()