In [1]:
# Storage account / container configuration and access key
import os

STORAGE_ACCOUNT_NAME = 'nomedastorageaccount'
CONTAINER_INPUT = 'nomecontainerinput'
CONTAINER_OUTPUT = 'nomecontaineroutput'
# Do not commit the account key: it is read from the environment and falls back to the
# original empty placeholder. Consider dbutils.secrets.get(...) on Databricks instead.
KEY = os.environ.get('AZURE_STORAGE_ACCOUNT_KEY', '')

# Single source of truth for the mount point, so the "already mounted?" check and the
# mount itself can never disagree (the check previously tested '/mnt/input' while the
# container was mounted at '/mnt/<CONTAINER_INPUT>', so a re-run tried to mount twice).
MOUNT_POINT_INPUT = "/mnt/{}".format(CONTAINER_INPUT)

# Mount the input container only if it is not mounted yet, so the cell can be re-run.
if not any(mount.mountPoint == MOUNT_POINT_INPUT for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://{}@{}.blob.core.windows.net".format(CONTAINER_INPUT, STORAGE_ACCOUNT_NAME),
        mount_point=MOUNT_POINT_INPUT,
        extra_configs={"fs.azure.account.key.{}.blob.core.windows.net".format(STORAGE_ACCOUNT_NAME): KEY},
    )

In [2]:
# Build the Spark DataFrame from the CSV stored in the mounted input container.
# The path is built under the container's mount point ('/mnt/<CONTAINER_INPUT>', as in the
# mount cell); the original string had no '{}' placeholder, so .format() silently ignored it.
# NOTE(review): 'pasta/arquivo.csv' looks like a placeholder — set the real path inside the container.
FILENAME = "/mnt/{}/pasta/arquivo.csv".format(CONTAINER_INPUT)

# No schema / inferSchema is given, so every column is read as a string.
df = (
    spark.read
    .option("header", True)  # first line holds the column names
    .option("sep", ",")
    .csv(FILENAME)
)

In [3]:
#Importando as bibliotecas
from __future__ import print_function
import requests
import json
import pandas as pd
import numpy as np
import warnings
import matplotlib
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
%matplotlib inline

from bokeh.resources import CDN
from bokeh.embed import components, file_html
from bokeh.plotting import figure,output_notebook, show
from bokeh.palettes import Blues4
from bokeh.models import ColumnDataSource,Slider
import datetime
from bokeh.io import push_notebook
from dateutil import parser
from ipywidgets import interact, widgets, fixed
output_notebook()

In [4]:
# Anomaly Detector API configuration
import os

# Subscription key of the Anomaly Detector resource. Read from the environment so the
# secret is not stored in the notebook; falls back to the original placeholder.
subscription_key = os.environ.get('ANOMALY_DETECTOR_KEY', 'subscriptionkey')

# Endpoint from the resource's overview page (region-specific, e.g.
# https://westus2.api.cognitive.microsoft.com/) with
# 'anomalydetector/v1.0/timeseries/entire/detect' appended to it.
endpoint = os.environ.get('ANOMALY_DETECTOR_ENDPOINT', 'endpoint')

In [5]:
# Anomaly Detector API client
def detect(endpoint, subscription_key, request_data, timeout=30):
    """POST a detection request to the Anomaly Detector API and return the decoded JSON body.

    Args:
        endpoint: Full URL of the detect operation (region endpoint plus
            ``anomalydetector/v1.0/timeseries/entire/detect``).
        subscription_key: Sent in the ``Ocp-Apim-Subscription-Key`` header.
        request_data: JSON-serialisable request body (``series``, ``granularity``, ...).
        timeout: Seconds ``requests`` waits for the service. Without a timeout a stalled
            connection would block the notebook indefinitely.

    Returns:
        The JSON-decoded response body when the service answers HTTP 200.

    Raises:
        Exception: for any other status code; the message includes the status code and
            the response text.
    """
    headers = {'Content-Type': 'application/json', 'Ocp-Apim-Subscription-Key': subscription_key}
    response = requests.post(endpoint, data=json.dumps(request_data), headers=headers, timeout=timeout)
    if response.status_code != 200:
        raise Exception("Anomaly Detector API returned HTTP {}: {}".format(response.status_code, response.text))
    return response.json()

In [6]:
# Turning the Anomaly Detector API response into a Bokeh chart
_API_POINT_FIELDS = ('expectedValues', 'isAnomaly', 'isNegativeAnomaly',
                     'isPositiveAnomaly', 'upperMargins', 'lowerMargins')


def _response_frame(result, series):
    """Join the per-point API arrays with the request series into one DataFrame.

    One row per point: the fields in ``_API_POINT_FIELDS`` from ``result``, plus the parsed
    ``timestamp`` and numeric ``value`` of the matching point in ``series``.
    """
    columns = {field: result[field] for field in _API_POINT_FIELDS}
    columns['timestamp'] = [parser.parse(point['timestamp']) for point in series]
    # Values may arrive as strings (the CSV is read without a schema). Convert them to
    # floats instead of int() so "12.5" neither raises nor gets truncated.
    columns['value'] = pd.to_numeric([point['value'] for point in series])
    return pd.DataFrame(columns)


def _confirmed_anomalies(frame):
    """Return the rows the API flags as anomalies whose value is also strictly outside
    the band [expectedValues - lowerMargins, expectedValues + upperMargins]."""
    upper = frame['expectedValues'] + frame['upperMargins']
    lower = frame['expectedValues'] - frame['lowerMargins']
    outside_band = (frame['value'] > upper) | (frame['value'] < lower)
    return frame[frame['isAnomaly'].astype(bool) & outside_band]


def build_figure(sample_data, sensitivity):
    """Run batch anomaly detection on ``sample_data`` and display the result as a Bokeh chart.

    Args:
        sample_data: API request dict whose ``series`` is a list of
            ``{'timestamp': <parseable str>, 'value': <number-like>}`` points.
            The dict is not modified.
        sensitivity: Sensitivity sent to the API and shown in the chart title.

    Side effects:
        Calls ``detect`` with the notebook-level ``endpoint`` and ``subscription_key``,
        then shows the chart via ``displayHTML``. Returns None.
    """
    # Copy rather than mutate: the caller's request dict keeps its original keys.
    request_data = dict(sample_data, sensitivity=sensitivity)
    result = detect(endpoint, subscription_key, request_data)

    frame = _response_frame(result, request_data['series'])
    anomalies = _confirmed_anomalies(frame)

    label = frame['timestamp']
    upperband = frame['expectedValues'] + frame['upperMargins']
    lowerband = frame['expectedValues'] - frame['lowerMargins']
    # Closed polygon for the band: lower edge left-to-right, then upper edge right-to-left.
    band_x = np.append(label, label[::-1])
    band_y = np.append(lowerband, upperband[::-1])

    p = figure(x_axis_type='datetime',
               title="Batch Anomaly Detection ({0} Sensitivity)".format(sensitivity),
               width=700, height=600)
    p.patch(band_x, band_y, color=Blues4[2], fill_alpha=0.5, line_width=1, legend_label='Boundary')
    p.line(label, frame['value'], legend_label='Value', color="#2222aa", line_width=1)
    p.line(label, frame['expectedValues'], legend_label='ExpectedValue',
           line_width=1, line_dash="dotdash", line_color='olivedrab')
    anom_source = ColumnDataSource(dict(x=anomalies['timestamp'].tolist(), y=anomalies['value'].tolist()))
    # scatter() draws circle markers by default; circle(size=...) is deprecated in recent Bokeh.
    p.scatter('x', 'y', size=5, color='tomato', source=anom_source)
    p.legend.border_line_width = 1
    p.legend.background_fill_alpha = 0.1

    # Render a standalone HTML document (Bokeh JS from the CDN) and display it in the notebook.
    html = file_html(p, CDN, "my plot1")
    displayHTML(html)

In [7]:
from pyspark.sql.functions import col, udf, date_format, asc
from pyspark.sql.types import DateType, TimestampType
from datetime import datetime

# Format of the TimeStampScript column, as understood by Python's datetime.strptime.
TIMESTAMP_SCRIPT_FORMAT = '%m/%d/%Y %H:%M:%S'

# UDF that parses TimeStampScript into a Spark timestamp. Null/empty cells stay null
# instead of raising inside strptime and failing the whole Spark job.
func = udf(lambda x: datetime.strptime(x, TIMESTAMP_SCRIPT_FORMAT) if x else None, TimestampType())

df_dt = (
    df
    .withColumn('timestamp', func(col('TimeStampScript')))
    # Render as the ISO-8601 string yyyy-MM-dd'T'HH:mm:ss'Z' sent to the API.
    # NOTE(review): 'Z' is a literal; the time is not converted to UTC — confirm the
    # source data / Spark session timezone.
    .withColumn('timestamp', date_format(col('timestamp'), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    # The CSV is read without a schema, so this column is a string. Cast it so the API
    # receives numbers (unparseable cells become null).
    .withColumn('value', col('IOOtherBytesPersec').cast('double'))
)

In [8]:
# Process whose time series is analysed (value of the CSV "Name" column).
PROCESS_NAME = "OUTLOOK"

# Keep only that process's rows and drop points with a null value or timestamp (they
# would otherwise be sent to the API as JSON null). Order the result chronologically.
# The predicate is no longer stored in a variable named `filter`, which shadowed the builtin.
df_anomaly = (
    df_dt
    .where(df_dt["Name"] == PROCESS_NAME)
    .select('value', 'timestamp')
    .dropna(subset=['value', 'timestamp'])
    .sort(asc('timestamp'))
)

# Collect to the driver as a list of {'value': ..., 'timestamp': ...} dicts (the API "series").
dic_time_series = [row.asDict() for row in df_anomaly.collect()]

In [9]:
# Request payload for the Anomaly Detector API: the collected series plus its sampling
# granularity. Presumably one point every 6 minutes ('minutely' with customInterval 6);
# confirm against the source data.
sample_data = dict(
    series=dic_time_series,
    granularity='minutely',
    customInterval=6,
)

# Plot the detection result at sensitivity 85
build_figure(sample_data, 85)

In [10]:
# Raw API result (no chart) for the same request, at sensitivity 85.
# Note: this sets 'sensitivity' on sample_data itself before sending it.
RAW_RESULT_SENSITIVITY = 85
sample_data.update({'sensitivity': RAW_RESULT_SENSITIVITY})

result = detect(endpoint, subscription_key, request_data=sample_data)
result