In [1]:
import writefile_run as writefile_run

In [2]:
%%writefile_run bayeschangept.py

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import preprocessing 
from matplotlib.pylab import rcParams
import datetime as dt
import cProfile
import bayesian_changepoint_detection.offline_changepoint_detection as offcd
import bayesian_changepoint_detection.online_changepoint_detection as oncd
from functools import partial
import matplotlib.cm as cm
import argparse
import time
# Importing reader and checker python files as modules
import reader_writer.reader as reader
import reader_writer.checker as checker
import reader_writer.reader_configs as read_args
import reader_writer.writer_configs as write_args
import psycopg2

import json
from pandas.io.json import json_normalize

import warnings
warnings.filterwarnings('ignore')

rcParams['figure.figsize'] = 12, 9
rcParams[ 'axes.grid']=True

Use scipy logsumexp().


In [3]:
assetno = ['1']
con = '52.173.76.89:4242'
src_type     =  'opentsdb'
param = ['FE-001.DRIVEENERGY']
from_timestamp = 1520402214
to_timestamp = 1520407294

In [24]:
%%writefile_run bayeschangept.py -a

def read(reader_kwargs):
    response_dict=reader.reader_api(**reader_kwargs)
#     print(response_dict)
    print("Getting the dataset from the reader....\n")
    entire_data = parse_dict_to_dataframe(response_dict)
    print(entire_data.head())
#     if(data[data.columns].values)
#     data[data.columns] = data[data.columns].values.astype(np.float64)
    print(entire_data.dtypes)
    print(entire_data.shape)
#     entire_data = entire_data[np.isfinite(entire_data[entire_data.columns].values)]
    return entire_data

In [25]:
def parse_dict_to_dataframe(response_dict):
#     print(response_dict)
    entire_data_set = []
    
    for data_per_asset in response_dict['body']:
        assetno = data_per_asset['assetno']
        for data_per_metric in data_per_asset['readings']:
            data = pd.DataFrame(data_per_metric['datapoints'],columns=['timestamp',data_per_metric['name']])
            data.index = data['timestamp']
            del data['timestamp']
            data['assetno']=assetno
            entire_data_set.append(data)
    
    return pd.concat(entire_data_set)

In [31]:
def call(assetno,from_timestamp,to_timestamp,con,para_list,source_type='opentsdb',table_name='',
        qry_str='',impute_fill_method='forward',down_sampling_method=None,down_sampling_window=None,freq=None,
        resample_fill_method=None,to_resample=None,to_impute=None,thres_prob=0.5,samples_to_wait=10,expected_run_length=100):

        reader_kwargs={
            'assetno':assetno,
            'from_timestamp':from_timestamp,
            'to_timestamp':to_timestamp,
            'con':con,
            'para_list':para_list,
            'source_type':source_type,
            'table_name':table_name,
            'qry_str':qry_str,
            'impute_fill_method':impute_fill_method,
            'down_sampling_method':down_sampling_method,
            'down_sampling_window':down_sampling_window,
            'freq':freq,
            'resample_fill_method':resample_fill_method,
            'to_resample':to_resample,
            'to_impute':to_impute
        }

        algo_kwargs={
            'thres_prob':thres_prob,
            'samples_to_wait':samples_to_wait,
            'expected_run_length':expected_run_length
        }
        
        entire_data = read(reader_kwargs)
#         print(original_data.head())
#         anom_indexes = anomaly_detector(original_data)

In [32]:
kwargs = {
            'assetno':assetno,
            'from_timestamp':from_timestamp,
            'to_timestamp':to_timestamp,
            'con':con,
            'para_list':param,
            'source_type':src_type,
            'table_name':'',
            'qry_str':'',
            'impute_fill_method':'forward',
            'down_sampling_method':None,
            'down_sampling_window':None,
            'freq':None,
            'resample_fill_method':None,
            'to_resample':None,
            'to_impute':None,
            'thres_prob':0.5,
            'samples_to_wait':10,
            'expected_run_length':100
        }

call(**kwargs)

Getting the dataset from the reader....

               FE-001.DRIVEENERGY assetno
timestamp                                
1520402214990          177.208099       1
1520402224990          443.687561       1
1520402234990          127.826195       1
1520402244990          167.014084       1
1520402254990          418.113342       1
FE-001.DRIVEENERGY    float64
assetno                object
dtype: object
(508, 2)
               FE-001.DRIVEENERGY assetno
timestamp                                
1520402214990          177.208099       1
1520402224990          443.687561       1
1520402234990          127.826195       1
1520402244990          167.014084       1
1520402254990          418.113342       1


In [33]:
% matplotlib inline
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
%%writefile_run bayeschangept.py -a

def ts_to_unix(t):
    return int((t - dt.datetime(1970, 1, 1)).total_seconds()*1000)

In [None]:
%%writefile_run bayeschangept.py -a

def readata_from_opentsb(reader_kwargs):
    (checker.check_global_args(**reader_kwargs))
    data=reader.reader_api(**reader_kwargs)
    print("Downloading the dataset from the opentsdb....")
    data= data.iloc[:, ::-1]
    data.index = data['timestamp']
    assetno = pd.unique(data['assetno'])
    del data['timestamp']
    del data['assetno']
    data[data.columns] = data[data.columns].values.astype(np.float64)
    print(data.dtypes)
    data = data[np.isfinite(data[data.columns].values)]
    return data

In [None]:
sample_json = '''{
"header":{
"parameter_count":12,
"asset_count":7,
"data_count":14000
},
"body":[
{ "asset": "1","readings":[{"name":"<TagName>","datapoints":[ {"<EpochInMs>":1.34}]}]}]}'''

In [None]:
time_epochs = set().union(*(d.keys() for d in data_list))

In [None]:
values = set().union(*(d.values() for d in data_list))

In [None]:
time_epochs

In [None]:
df = pd.DataFrame({'Timestamp':time_epochs,metric_name:values})

In [None]:
df.index = df['Timestamp']

In [None]:
del df['Timestamp']

In [None]:
df

In [None]:
pd.DataFrame.from_records(d['body'][0]['readings'])

In [None]:
%%writefile_run bayeschangept.py -a

def write_to_db(table_name,col_names,col_vals):
    
#     print("\n Changepoint info : \n {}".format(col_vals))
    
    conn = psycopg2.connect(**reader.db_props.db_connection)
    cur = conn.cursor()
    
    col_vals1 = [[str(val) if(type(val)!=str) else "'{}'".format(val) for val in row] for row in col_vals]
    
    joined_col_vals = ["({})".format(','.join(map(str,val))) for val in col_vals1]
    fmt_col_vals = (','.join(joined_col_vals))
    insert_query = """ INSERT INTO {} ({}) VALUES{};""".format(table_name,col_names,fmt_col_vals)
#     print(insert_query)
    cur.execute(insert_query)
    
    conn.commit()
    cur.close()
    conn.close()

In [None]:
%%writefile_run bayeschangept.py -a

def processanomalies(original_data,anom_indexes,sql_query_args,is_csv=False,window=10):
    
    col_names_list = list(sql_query_args.keys())
    col_names = ','.join(col_names_list)
    col_vals = []
    table_name = write_args.table_name
    
    for i in anom_indexes:
        
        sql_query_args['parameter_list'] = '[{}]'.format(original_data.columns[0])
        sql_query_args['event_name'] = original_data.columns[0]+write_args.anomaly_code+'_anomaly'

        if(is_csv):
            time_series = original_data.index[i-window:i+window]
            sql_query_args['event_timestamp'] =  str(original_data.index[i])
            sql_query_args['event_timestamp_epoch'] = str(ts_to_unix(original_data.index[i]))
        else:
            time_series = (pd.to_datetime(original_data.index[i-window:i+window],unit='ms',utc=True))
            sql_query_args['event_timestamp'] =  str(pd.to_datetime(original_data.index[i],unit='ms',utc=True))
            sql_query_args['event_timestamp_epoch'] = str((original_data.index[i]))

        time_around_anoms = ["''{}''".format((t)) for t in time_series]
        data_around_anoms = {'timestamp':time_around_anoms,
                            'value':(list(original_data.iloc[i-window:i+window,0].values))}
        
        pts_around_anoms = ''
        
        for key,val in data_around_anoms.items():
            pts_around_anoms += "{}:{},".format(key,val)
        
        sql_query_args['event_context_info'] = "{}".format("{"+pts_around_anoms.strip(',')+"}")
        
        col_vals.append(list(sql_query_args.values()))
    
    write_to_db(table_name,col_names,col_vals)

In [None]:
%%writefile_run bayeschangept.py -a

def analyse_data(file,date_col,time_format='%Y-%m',isweek=False):
    if(isweek!=True):
            dateparse = lambda dates: pd.to_datetime(dates,infer_datetime_format=True)
    else:
        dateparse = lambda dates: dt.datetime.strptime(dates+'-0', time_format)
    data = pd.read_csv(file, parse_dates=[date_col], index_col=date_col,date_parser=dateparse)
    return data

In [None]:
%%writefile_run bayeschangept.py -a

def normalise_standardise(data):    
    # Create a minimum and maximum processor object
    min_max_scaler = preprocessing.MinMaxScaler()
    # Create an object to transform the data to fit minmax processor
    data_norm = pd.DataFrame(min_max_scaler.fit_transform(data.values),columns=data.columns,index=data.index)
    data_standardised = (data_norm - data_norm.mean())/(data_norm.std())
    return data_standardised

In [None]:
%%writefile_run bayeschangept.py -a

def findonchangepoint(data,mean_runlength=100):
    R, maxes = oncd.online_changepoint_detection(data, partial(oncd.constant_hazard, mean_runlength),
                                                 oncd.StudentT(0.1, .01, 1, 0))
    return R,maxes

In [None]:
%%writefile_run bayeschangept.py -a

def findthreshold(data):
    mu = np.mean(data)
    sigma = np.mean(data)
    inv_pt = []
    for i in range(len(data)-1):
        if((data[i+1]>mu and data[i]<=mu) or (data[i+1]<mu and data[i]>=mu)):
            inv_pt.append(i)

    return inv_pt    

In [None]:
%%writefile_run bayeschangept.py -a

def plotonchangepoints(data,R,maxes,nrow=None,ncol=0,Nw=10,pthres=0.5):
    fig,(ax1,ax2,ax3) = plt.subplots(3,figsize=[18, 16])
    ltext = 'Column : '+str(ncol+1)+' data with threshold probab = '+ str(pthres)
    
    ax1.set_title(data.columns[ncol])
    
    cp_probs = np.array(R[Nw,Nw:-1][1:-2])
    
    inversion_pts = findthreshold(cp_probs)
    
    max_indexes = []
    for i in range(len(inversion_pts)-1):
        max_indexes.append(inversion_pts[i]+np.argmax(cp_probs[inversion_pts[i]:inversion_pts[i+1]+1]))
    
    cp_mapped_probs = pd.Series(cp_probs[max_indexes],index=max_indexes)
    anom_indexes = cp_mapped_probs.index[(np.where(cp_mapped_probs.values>pthres)[0])]

    if(nrow==None):
        ax1.plot(data.values[:,ncol],label=ltext)
    else:
        ax1.plot(data.values[:nrow,ncol],label=ltext)
        
    ax1.legend()

    
    for a in anom_indexes:
        if(a):
            ax1.axvline(x=a,color='r')
        
    sparsity = 5  # only plot every fifth data for faster display
    ax2.pcolor(np.array(range(0, len(R[:,0]), sparsity)), 
              np.array(range(0, len(R[:,0]), sparsity)), 
              -np.log(R[0:-1:sparsity, 0:-1:sparsity]), 
              cmap=cm.Greys, vmin=0, vmax=30,label="Distribution of Run length")
    ax2.legend()
    
    ax3.plot(cp_probs)
   
    ax3.set_title('Change points with Probability')

    plt.show()
    print("\n No of Anomalies detected = %g"%(len(anom_indexes)))
    
    return anom_indexes

In [None]:
%%writefile_run bayeschangept.py -a

def analyse_detectchangepts(src_type,filepath,date_col,time_format='%Y-%m',ncol=0,
                            weekly_data=False,pthres=0.5,mean_runlen = 100,Nw=10):
    
#     orig_data = pd.DataFrame()
#     if(src_type==0 or src_type=='csv'):
#         orig_data = analyse_data(filepath,date_col=date_col,time_format=time_format,isweek=weekly_data)
#     else:
#         orig_data = readata_from_opentsb(read_args.reader_kwargs)
    
    
    data = normalise_standardise(orig_data)
    print("Shape of the dataset : ")
    print(data.shape)
    print("Overview of first five rows of dataset : ")
    print(data.head())
    
    ax = data[data.columns[ncol]].plot.hist(figsize=(9,7),bins=100)
    ax.set_title("Histogram of Dataset")
    
    R,maxes = findonchangepoint(data[data.columns[ncol]].values,mean_runlength=mean_runlen)
    anom_indexes = plotonchangepoints(data,R,maxes,Nw=Nw,pthres=pthres,ncol=ncol)
    sql_query_args = write_args.writer_kwargs
    if(anom_indexes.shape[0]!=0):
        is_csv = src_type==0 or src_type=='csv'
        processanomalies(orig_data,anom_indexes,sql_query_args,is_csv)
    return orig_data,anom_indexes

In [None]:
%%writefile_run bayeschangept.py -a

def add_args(parser,arg_name,arg_help,arg_optional=False,arg_def=None):
    if(arg_optional):
        arg_name = '--'+arg_name
    parser.add_argument(arg_name,help=arg_help,type=type(arg_def),default=arg_def)

In [None]:
%%writefile_run bayeschangept.py -a

arg_name_list = ['src_type','filepath','date_column_name','time_format','weekly_data','Threshold_probability',
                'samples_to_wait','expected_run_length','data_column']

arg_help_list = [
                'Enter 0 to read from csv and 1 to read from opentsdb',
                'Enter the full path of input file with extension .csv',
                'Enter the name of the date column',
                'Enter the time format of the date column',
                'Enter whether data sampled weekly or not. Give True or False',
                'Enter the threshold probability',
                'Enter no of samples to wait to detect a change point',
                'Enter the expected gap or no of samples between changepoints',
                'Enter the index of column which you want to analyse with starting index excluding date column']

arg_opt_list = [False,True,True,True,True,True,True,True,True]

arg_def_list = [1,'','Month','%Y-%m',False,0.5,10,100.0,0]

In [None]:
%%writefile_run bayeschangept.py -a

args_list = {
    'arg_name':arg_name_list,
    'arg_help':arg_help_list,
    'arg_optional':arg_opt_list,
    'arg_def':arg_def_list,
}

In [None]:
%%writefile_run bayeschangept.py -a

arg = {
    'arg_name':'',
    'arg_help':'',
    'arg_optional':False,
    'arg_def':'',
}

In [None]:
%%writefile_run bayeschangept.py -a

parser = argparse.ArgumentParser()

for i in range(len(args_list['arg_name'])):
    arg['arg_name'] = args_list['arg_name'][i]
    arg['arg_help'] = args_list['arg_help'][i]
    arg['arg_optional'] = args_list['arg_optional'][i]
    arg['arg_def'] = args_list['arg_def'][i]
    add_args(parser,**arg)
args = parser.parse_args()

In [None]:
%%writefile_run bayeschangept.py -a

if(args.src_type!=None):
    src_type = (args.src_type)
if(args.filepath!=None):
    filepath = (args.filepath)
if(args.outputfile_dir!=None):
    outputfile_dir = (args.outputfile_dir)
if(args.Threshold_probability!=None):
    pthres = (args.Threshold_probability)
if(args.date_column_name!=None):
    date_col = (args.date_column_name)
if(args.time_format!=None):
    time_format = (args.time_format)
if(args.weekly_data!=None):
    weekly_data = (args.weekly_data==True)
if(args.samples_to_wait!=None):
    Nw = int(args.samples_to_wait) 
if(args.expected_run_length!=None):
    mean_runlen = (args.expected_run_length)
if(args.data_column!=None):
    ncol = int(args.data_column)

In [None]:
src_type      = 'csv'
filepath      = ''
date_col      = ''
pthres        = 0.01
time_format   = '%Y-%m'
weekly_data   = False
mean_runlen   = 100
Nw            = 10

In [None]:
%%writefile_run bayeschangept.py -a

input_kwargs = {
    'src_type'       :src_type,
    'filepath'      :filepath,
    'date_col'      :date_col,
    'pthres'        :pthres,
    'time_format'   :time_format,
    'weekly_data'   :weekly_data,
    'mean_runlen'   :mean_runlen,
    'Nw'            :Nw
}

In [None]:
%%writefile_run bayeschangept.py -a

input_kwargs['src_type'] = 1
original_data,anom_indexes = analyse_detectchangepts(**input_kwargs)

In [None]:
original_data,anom_indexes = analyse_detectchangepts(src_type=0,filepath="./alcohol-demand-log-spirits-consu.csv",date_col='Month',
                                                     pthres=0.5)

In [None]:
temp_data,anom_indexes = analyse_detectchangepts(src_type=0,filepath="./average-annual-temperature-centr.csv",date_col='Year'
                                         ,pthres=0.4,time_format='%Y')

plt.plot(temp_data[0:200])
plt.show()

In [None]:
fisher_tempdata = analyse_detectchangepts(src_type=0,filepath="./mean-daily-temperature-fisher-ri.csv",
                                         date_col='Date',time_format='%d-%m-%Y',pthres=0.7)

In [None]:
female_unemp_data = analyse_detectchangepts(src_type=0,filepath="./monthly-us-female-20-years-and-o.csv",
                                         date_col='Month',time_format='%Y-%m',pthres=0.4)

In [None]:
weekly_close_data = analyse_detectchangepts(src_type=0,filepath="./weekly-closings-of-the-dowjones-.csv",
                                         date_col='Week',time_format='%Y-W%W-%w',weekly_data=True,pthres=0.3)

In [None]:
weekly_close_data = analyse_detectchangepts(src_type=0,filepath="methane-input-into-gas-furnace-c.csv",ncol=0,
                                         date_col='Time',pthres=0.4)

In [None]:
weekly_close_data2 = analyse_detectchangepts(src_type=0,filepath="methane-input-into-gas-furnace-c.csv",ncol=1,
                                         date_col='Time',pthres=0.4)

In [None]:
mean_tempdata = analyse_detectchangepts(src_type=0,filepath="./mean-monthly-temperature-1907-19.csv",
                                        date_col='Month',time_format='%Y-%m')

# Conclusion:
* Hence we observe that **Bayesian Changepoint Detection** works well only on level shifts or variational shift datasets over outlier or surge,sag datasets