In [1]:
import asyncio
import httpx
import sys
sys.path.append('../')

!{sys.executable} -m pip install tqdm
from tqdm import tqdm

import os
import json
import pandas as pd
from pathlib import Path

#custom imports
import config
import nab_utils

from time import perf_counter

BASE_URL='http://127.0.0.1:8000'

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
/app/nab
ROOT_PATH = /app


In [2]:
import time
def timeit(func):
    async def process(func, *args, **params):
        if asyncio.iscoroutinefunction(func):
            #print('this function is a coroutine: {}'.format(func.__name__))
            return await func(*args, **params)
        else:
            print('this is not a coroutine')
            return func(*args, **params)

    async def helper(*args, **params):
        print('{}.time'.format(func.__name__))
        start = time.time()
        result = await process(func, *args, **params)
        print('>>>', time.time() - start)
        return result

    return helper

In [3]:
datafeeds_path=Path(config.ROOT_PATH) / "static" / "nab" / "datasets"
datafeed_folders=next(os.walk(datafeeds_path))[1] #GET ALL MODEL FOLDERS

#get labels file for anomalies
LABELS_FILE=Path(config.NAB_ASSETS) / "labels" / "combined_labels.json"
with open(LABELS_FILE) as json_file:
    labels=json.load(json_file)

In [25]:
def build_datafeed_object(datafeed_name):
    datafeed_object={}
    datafeed_object["name"]=datafeed_name
    datafeed_object["group"]="nab"
    datafeed_object["type"]="demo"
    datafeed_object["url"]="/nab/"+str(datafeed_name)
    return datafeed_object

def build_dataset_object(dataset_name):
    dataset_object={}

    #load dataset and format
    dataset=pd.read_csv(config.NAB_ASSETS / "raw_datasets" / dataset_name) #load dataset
    dataset['timestamp']=pd.to_datetime(dataset['timestamp']) #set timestamps to datetime objects
    dataset=dataset.set_index("timestamp") #set index to datetime column
    dataset["anomaly_label"]=False # creates new column for labeled anomalies and fills with False
    nab_utils.add_labels_to_dataset(dataset,dataset_name)

    if 'real' in dataset_name:
        dataset_object["tags"]=['real']
    else:
        dataset_object["tags"]=['artificial']

    #build dataset model object
    dataset_object["name"]=dataset_name
    dataset_object["folder"] = dataset_name.split("/")[0]
    dataset_object["filename"] = dataset_name.split("/")[1]
    dataset_object["labeled"] = 'True'
    dataset_object["start_time"] = dataset.index[0].strftime('%Y-%m-%dT%H:%M:%S.%f%z')
    dataset_object["end_time"] = dataset.index[1].strftime('%Y-%m-%dT%H:%M:%S.%f%z')
    dataset_object["anomaly_count"] = len(dataset[dataset['anomaly_label'] == True])


    anomaly_records=[]
    #build anomaly records for this dataset
    for item in labels[dataset_name]: #for each entry in the labels for this file
        anomaly_object={}
        anomaly_object["value"] = float(dataset.at[item,"value"])
        anomaly_object["time"] = item
        anomaly_object["model"] = "Labeled Anomaly"
        anomaly_object["status"] = 'test'
        anomaly_object["severity"] = "low"
        anomaly_object["tags"] = ['label']
        anomaly_records.append(anomaly_object)

    return dataset_object, anomaly_records

async def register_original_dectector():
    detector_object={}
    detector_object["name"] = "Original Labeler"
    detector_object["description"] = "A collection of the original datasets with the provided ground truth labels for anomalies."
    detector_object["tags"] = ['Demo']

    async with httpx.AsyncClient(base_url=BASE_URL) as client:
        response = await client.post("/detector/create", json=detector_object)
    return response.json()

In [20]:
#test adding datafeed
@timeit
async def add_datafeed_test(datafeed_name):
       datafeed_object=build_datafeed_object(datafeed_name)
       async with httpx.AsyncClient(base_url=BASE_URL) as client:
              response = await client.post("/datafeed/create", json=datafeed_object)
       return response.json()
       
@timeit
async def add_dataset_test(datafeed_id):
       #test adding a dataset with the id of a created datafeed
       dataset_name="realKnownCause/nyc_taxi.csv"
       dataset_object,anomaly_records=build_dataset_object(dataset_name)
       async with httpx.AsyncClient(base_url=BASE_URL) as client:
              response = await client.post("/dataset/create/"+datafeed_id, json=dataset_object)
       return response.json()['id'], anomaly_records

@timeit
async def test_sequential_anomaly_insert(dataset_id, anomaly_records):
       #test adding an anomaly to a dataset
       for item in anomaly_records:
              async with httpx.AsyncClient(base_url=BASE_URL) as client:
                     anomaly_response = await client.post("/anomaly/create/"+dataset_id, json=item)

@timeit
async def test_gather_individual_anomaly_insert(dataset_id, anomaly_records):
       async with httpx.AsyncClient(base_url=BASE_URL) as client:
              anomalies = await asyncio.gather(*[client.post("/anomaly/create/"+dataset_id, json=anomaly) for anomaly in anomaly_records])
       return anomalies

@timeit
async def test_batch_insert_anomalies(dataset_id, anomaly_records):
       async with httpx.AsyncClient(base_url=BASE_URL) as client:
              anomaly_response = await client.post("/anomaly/create_many/"+dataset_id, json=anomaly_records)
       return anomaly_response

In [29]:
detector_response= await register_original_dectector()

In [30]:
detector_id=detector_response['id']

In [23]:
response = await add_datafeed_test("test_nab")
datafeed_id=response['id']

add_datafeed_test.time
>>> 0.16153311729431152


In [24]:
dataset_id, anomaly_records = await add_dataset_test(datafeed_id)

add_dataset_test.time
>>> 0.1858048439025879


In [102]:
result= await test_sequential_anomaly_insert(dataset_id, anomaly_records)
result = await test_gather_individual_anomaly_insert(dataset_id, anomaly_records)
result = await test_batch_insert_anomalies(dataset_id, anomaly_records)

test_sequential_anomaly_insert.time
>>> 0.8003561496734619
test_gather_individual_anomaly_insert.time
>>> 0.3228011131286621
test_batch_insert_anomalies.time
>>> 0.1400461196899414


## Build Datafeeds / Datasets / Anomalies

### Loops through each datafeed and adds all the datasets, with each dataset adding it's labeled anomalies

In [114]:
time_before=perf_counter()


for datafeed in tqdm(datafeed_folders): #for each datafeed
    datafeed_object=build_datafeed_object(datafeed)#build datafeed object
    async with httpx.AsyncClient(base_url=BASE_URL) as client: #add the datafeed to the database
        response = await client.post("/datafeed/datafeeds", json=datafeed_object)
    datafeed_id=response.json()['id']

    #for each dataset in the folder (ends in .csv)
    for csv_file in Path(datafeeds_path / datafeed).glob('*.csv'): 
        dataset_name=str(Path(csv_file.parent.name) / csv_file.name) #convert full Path object to a '/folder/filename.csv' string
        dataset_object, anomaly_entries=build_dataset_object(dataset_name) #build dataset object by passing in the dataset name and the datafeed id

        async with httpx.AsyncClient(base_url=BASE_URL) as client: #add the dataset to the database
            response = await client.post("/dataset/datasets/"+datafeed_id, json=dataset_object)
        
        dataset_id=response.json()['id']
        #for each anomaly for this dataset
        for anomaly in anomaly_entries:
            async with httpx.AsyncClient(base_url=BASE_URL) as client: #add the anomaly to the database
                response = await client.post("/anomaly/anomalies/"+dataset_id, json=anomaly)

total_time=perf_counter()-time_before
print(total_time)

100%|██████████| 7/7 [00:31<00:00,  4.46s/it]

31.190025987998524





In [7]:
async def register_datafeed(datafeed_name):
    print("registering datafeed:" +str(datafeed_name) )
    datafeed_object=build_datafeed_object(datafeed_name)#build datafeed object
    async with httpx.AsyncClient(base_url=BASE_URL) as client: #add the datafeed to the database
        response = await client.post("/datafeed/create", json=datafeed_object)
    datafeed_id=response.json()['id']

    csv_files=Path(datafeeds_path / datafeed_name).glob('*.csv')
    datasets = await asyncio.gather(*[register_dataset(filename,datafeed_id) for filename in csv_files])
    return datasets


async def register_dataset(filename,datafeed_id):
    # await asyncio.sleep(10)
    dataset_name=str(Path(filename.parent.name) / filename.name)
    dataset_object, anomaly_entries=build_dataset_object(dataset_name) #build dataset object by passing in the dataset name and the datafeed id
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=15) as client: #add the dataset to the database
        response = await client.post("/dataset/create/"+datafeed_id, json=dataset_object) 
        dataset_id=response.json()['id']
        #anomalies = await asyncio.gather(*[register_anomlay(anomaly,dataset_id) for anomaly in anomaly_entries])
        #anomalies = await asyncio.gather(*[client.post("/anomaly/anomalies/"+dataset_id, json=anomaly) for anomaly in anomaly_entries])
        anomalies = await client.post("/anomaly/create_many/"+dataset_id, json=anomaly_entries)
    return anomalies

In [111]:
# SEQUENTIAL TEST
time_before=perf_counter()
datasets= await register_datafeed("realKnownCause")
datasets= await register_datafeed("realAdExchange")
print(perf_counter()-time_before)
# PARALLEL TEST
time_before=perf_counter()
await asyncio.gather(register_datafeed("artificialNoAnomaly"),register_datafeed("realAdExchange"))
print(perf_counter()-time_before)

registering datafeed:realKnownCause
registering datafeed:realAdExchange
2.205112324998481
registering datafeed:artificialNoAnomaly
registering datafeed:realAdExchange
1.8204953249987739


In [10]:
# with individual anomaly create calls
time_before=perf_counter()
datafeeds = await asyncio.gather(*[register_datafeed(datafeed_name) for datafeed_name in datafeed_folders])
print(perf_counter()-time_before)

registering datafeed:artificialNoAnomaly
registering datafeed:artificialWithAnomaly
registering datafeed:realAdExchange
registering datafeed:realAWSCloudwatch
registering datafeed:realKnownCause
registering datafeed:realTraffic
registering datafeed:realTweets
9.44293911099885


In [124]:
datafeeds[1][0][0].json()

{'id': '38bddb2a-06b0-4cc9-a434-4b33e7fc1eaa',
 'time_created': '2024-09-06T21:42:13.546260Z',
 'time_updated': '2024-09-06T21:42:14.400466Z',
 'value': 40.0,
 'time': '2014-04-11T00:00:00Z',
 'model': 'Labeled Anomaly',
 'status': 'status',
 'severity': 'low',
 'tags': ['demo'],
 'dataset_id': '77f95f3b-2951-4367-9f84-b8ddf670a9aa'}

In [72]:
datafeeds[0][0].json()

{'id': '5c19d144-9b62-4786-abf7-d0377f33b970',
 'time_created': '2024-09-06T20:53:16.793671Z',
 'time_updated': '2024-09-06T20:53:16.884305Z',
 'name': 'artificialNoAnomaly/art_daily_no_noise.csv',
 'folder': 'artificialNoAnomaly',
 'filename': 'art_daily_no_noise.csv',
 'labeled': True,
 'anomaly_count': 0,
 'tags': ['artificial'],
 'start_time': '2014-04-01T00:00:00Z',
 'end_time': '2014-04-01T00:05:00Z',
 'datafeed_id': 'c3b3d94d-8637-496c-8458-552c1a7b8e93',
 'anomalies': []}