# AD for periodic time series
## Basic statistics

Import libraries 

In [None]:
from pandas import read_csv
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import os, glob
from datetime import timedelta, datetime
from scipy.fft import fft, ifft, fftfreq
import scipy.stats

pd.set_option('display.max_rows', 500)

Parameters definition

In [None]:
# Name of the dataset
dataset_name = "badec"

# Duration of the training
train_length = "2w"

In [None]:
# Path with the .csv train set files and path for the .csv validation file
csv_filenames = "/notebooks/EIA_2022/data/" + dataset_name + "/train/clean_data/" + train_length + "/*.csv"
val_filename = "/notebooks/EIA_2022/data/" + dataset_name + "/val/val_" + dataset_name + ".csv"
test_filename = "/notebooks/EIA_2022/data/" + dataset_name + "/test/test_" + dataset_name + ".csv"

In [None]:
val_set = read_csv(val_filename)
test_set = read_csv(test_filename)

anomalies = read_csv("/notebooks/EIA_2022/data/anomalies_" + dataset_name + ".csv")

In [None]:
# The train set files need to be merged. Taking an integer number of cycles is necessary to have a periodic
# or quasi-periodic anomaly free train set time series.

def keep_cycles(df, threshold = 50): # put the train set together to have an integer number of cycles
    '''
    Merge the train set files by taking an integer number of cycles.
    A cycle starts when there is a variation greater than threshold with respect to the previous point.
    '''
    df["variation"] = df["device_consumption"].diff()
    mask = df["variation"] > threshold # see where there is the beginning of a new cycle
    beginnings = df[mask].index.values
    if len(beginnings) > 0:
        first_beginning = beginnings[0] # index at which the first cycle begins
        last_beginning = beginnings[-1] - 1 # index at which the last cycle ends
        
        if first_beginning < last_beginning:
            return df[first_beginning:last_beginning]

In [None]:
dfs = [None]*len(glob.glob(csv_filenames)) # Create an array of dataframes as long as the number of files

# For each train set file, take an integer number of cycles and use the result as an element of the array
for filename in glob.glob(csv_filenames):
    with open(os.path.join(os.getcwd(), filename), 'r') as f:    
        file_number = int(filename.split("clean_" + dataset_name + "_")[1].split(".csv")[0])
        df = read_csv(f)
        df = keep_cycles(df)
        dfs[file_number] = df
        
# Build the train set by concatenating the parts of the smaller train sets
train_set = pd.concat(dfs)  

# Define a arbitrary starting datetime
start_datetime = datetime(2020,1,1,0,0)

# Define an arbitrary end datetime by adding the train set minutes to the start datetime
# Assumption: the sampling time in the dataset is 1 minute
end_datetime = start_datetime + timedelta(minutes = len(train_set)-1)

# Define a date_range with the given dates and times
timerange = pd.date_range(start=start_datetime, end=end_datetime, freq="1min")

# Use the defined time range as a column in Pandas
train_set["ctime"] = timerange

# Reindex the dataframe
train_set.reset_index(inplace=True)

# Visualize the first 100 rows (samples) of the time series
train_set.head(100)

In [None]:
def get_sampling_time_seconds(timestamps, timestamp_format):
    '''
    timestamps is an array of timestamps
    timestamp_format is the format of the timestamp
    '''
    
    # Assumption: the timestamps are equally spaced (as defined above)
    # Compute the duration of an interval
    time_0 = timestamps[0]
    time_1 = timestamps[1]
    time_interval = time_1 - time_0
    
    # Convert the interval duration in seconds
    time_interval_seconds = pd.Timedelta(time_interval).total_seconds()
    
    return time_interval_seconds

In [None]:
def plot_fft(xf, yf):
    # Save the plot of the FFT of the test set
    fig = plt.figure(figsize=(8, 6), dpi=80)
    plt.plot(xf, np.abs(yf)) # plot only the absolute value of yf
    
    # Use a logarithmic scale
    plt.yscale('log')
    plt.xscale('log')
    
    # Set plot description
    plt.title('Power spectrum')
    plt.xlabel('Frequency')
    plt.ylabel('Power')
    
    # Save as .pdf to export without image data loss
    plt.savefig("fft.pdf", dpi=80)  

In [None]:
def get_main_period(xf, yf):
    '''
    Given the Fourier transform power spectrum, find the highest peak
    '''
    max_idx = np.argmax(np.abs(yf)) # get the maximum power in the FFT
    p = 1/xf[max_idx] # get the period (1/frequency) associated with the maximum power
    return p

In [None]:
def plot_timeseries(time_series, timestamps, timestamp_format, first_N_timestamps, period_samples):    
    '''
    time_series is an array containing the time series values
    timestamps is an array containing the timestamp values
    timestamp_format is the format of the datetime
    first_N_timestamps is the number of timestamps assumed to be part of an anomaly-free time series
    period_samples is the number of samples in each period
    '''
    timestamp_seconds = get_sampling_time_seconds(timestamps.values, timestamp_format)
    
    # Save the plot of the first cycles
    fig, ax = plt.subplots(figsize=(8, 6), dpi=80)
    ax.plot(np.arange(int(first_N_timestamps))*timestamp_seconds, time_series[:int(first_N_timestamps)])

    # Draw a vertical line for each period
    for ps in np.arange(period_samples*timestamp_seconds, first_N_timestamps*timestamp_seconds, period_samples*timestamp_seconds):
        ax.vlines(ps, -50, 300, color='r')

    ax.set_title('Power variation in a sequence of cycles')
    ax.set_xlabel('Seconds')
    ax.set_label('Power')
    
    plt.savefig("timeseries.pdf", dpi=80)

In [None]:
def get_fft_periodicity(time_series, timestamps, timestamp_format):
    ''' 
    time_series contains the values of the time series and is a pandas Series
    timestamps contains the values of the time stamp and is a pandas Series
    timestamp_format is the format of the datetime
    '''
    
    timestamps = timestamps.values # recover the values of timestamps
    time_series = time_series - time_series.mean() # subtract the mean from the time series values (used to perform FFT)

    # Compute the FFT of the test set
    yf = fft(time_series.values)
    xf = fftfreq(len(time_series), 1)

    # The Fourier transform is assumed to be symmetric due to the nature of the signal
    mask = [idx for idx, val in enumerate(xf) if val >= 0] # create a mask to take only the positive frequencies
    yf = yf[mask] # consider only the positive frequencies values

    time_interval_seconds = get_sampling_time_seconds(timestamps, timestamp_format)
    xf = xf[mask]/time_interval_seconds # convert to Hz (1/sample -> Hz)

    # Estimate the main period from the highest peak in the Fourier transform
    p = get_main_period(xf, yf)
    p_samples = p/time_interval_seconds # period expressed in number of samples

    plot_fft(xf, yf) # plots and saves the FFT

    # Print information about the minutes and the samples of each period
    # here, they must be the same for the presented data sets
    print("The period is %.4f minutes, or %.4f samples" %(p/60, p_samples))

    return p_samples

In [None]:
period = get_fft_periodicity(train_set["device_consumption"], train_set["ctime"], "%Y-%m-%D %H:%M:%s")
period

In [None]:
def get_Rstd(window1, window2):
    # receives two uniformly sampled arrays
    # returns the value of R_std
    # Note: the value of R_std can be infinite, for example when a window has all the values at zero
    std_current = np.std(window2)
    std_previous = np.std(window1)
    
    # print("stdc", std_current) # current std
    # print("stdp", std_previous) # previous std

    return np.abs(std_current - std_previous)/std_previous

In [None]:
def get_max_rstd(timeseries, period):
    '''
    timeseries is an array containing the values of the time series
    period is the period associated with the time series
    '''
    
    max_Rstd = 0
    
    # receives a uniformly sampled array and obtains the number of periods in the time series
    number_of_periods = int(np.ceil(len(timeseries)/period))
    
    # For each couple of subsequent windows, compute the R_std
    # Obtain the maximum R_std value in the considered part of the time series
    for idx in range(number_of_periods-1):
        array_idx = int(idx*period)
        next_array_idx = int((idx+1)*period)
        end_array_idx = next_array_idx + next_array_idx - array_idx
        
        window1 = timeseries[array_idx:next_array_idx]
        window2 = timeseries[next_array_idx:end_array_idx]
        
        if len(window1) == len(window2):
            Rstd = get_Rstd(window1, window2)
            max_Rstd = max(Rstd, max_Rstd)
        
    return max_Rstd

In [None]:
max_rstd_train = get_max_rstd(train_set["device_consumption"].values, period)
max_rstd_train

In [None]:
# Set the parameters for grid search
rstds = np.array(np.arange(1, 10, 0.1))*max_rstd_train
f1s = [None]*len(rstds)

In [None]:
# Validation times and consumptions
val_ctimes = np.array([datetime.strptime(a, '%Y-%m-%d %H:%M:%S') for a in val_set["ctime"].values])
val_consumptions = val_set["device_consumption"].values

# Find the GT anomaly intervals
anomalies_starts = np.array([datetime.strptime(a, '%Y-%m-%d %H:%M:%S') for a in anomalies["start_date"].values])
anomalies_ends = np.array([datetime.strptime(a, '%Y-%m-%d %H:%M:%S') for a in anomalies["end_date"].values])

starts = [(ast - val_ctimes[0]).total_seconds()/60 for ast in anomalies_starts]
ends = [(ast - val_ctimes[0]).total_seconds()/60 for ast in anomalies_ends]

anomalies_time_intervals = np.array([[starts[i], ends[i]] for i, _ in enumerate(starts) if ends[i] > 0 and starts[i] < len(val_ctimes)])
anomalies_time_intervals[anomalies_time_intervals < 0] = 0
anomalies_time_intervals[anomalies_time_intervals > len(val_ctimes)] = len(val_ctimes)

anomalies_time_intervals

In [None]:
def get_anomalies(df_ts, period, ratio_threshold, pearson_threshold = 0.2):
    ''' amount_normal_samples is the number of samples assumed to be normal (i.e., without anomalies)
    pearson_threshold is the minimum value of the Pearson product-moment coefficient to have a periodic series
    ratio_threshold is the minimum value of RStd to have an anomaly
    '''
    anomalies = []
    
    time_series = df_ts.values
    
    p_samples = np.round(period)
    
    number_of_periods = int(len(time_series)/p_samples)

    for period_no in range(0, number_of_periods-1):
        int_period = int(p_samples)
        start_p = int(period_no*p_samples)
        end_first_p = start_p + int_period
        end_second_p = start_p + int_period*2
    
        Y = time_series[int(start_p):int(end_first_p)]
        Z = time_series[int(end_first_p):int(end_second_p)]

        pearson, _ = scipy.stats.pearsonr(Y, Z)


        if np.abs(pearson) > pearson_threshold:
            print("The series is not periodic in samples interval [%d, %d]" %(start_p, end_second_p))
            anomalies.append([int(start_p), int(end_second_p)])

        std_current = np.std(Z)
        std_previous = np.std(Y)

        Rstd = abs(std_current-std_previous)/std_previous

        if Rstd > ratio_threshold:
            anomalies.append([int(end_first_p), int(end_second_p)])
            print("Anomaly in samples interval [%d, %d]" %(end_first_p, end_second_p))
        
    return anomalies  

In [None]:
def interval_to_onehot(intervals, length):
    arr = np.array([0]*length)
    for interval in intervals:
        arr[int(interval[0]):int(interval[1]+1)] = 1
    return arr

In [None]:
def get_F1(GT, predicted_anomalies):
    # GT is an array of 0s and 1s
    # predicted_anomalies is an array of 0s and 1s
    
    assert len(GT) == len(predicted_anomalies)
    tp = np.sum(GT & predicted_anomalies)
    fp = np.sum(np.invert(GT) & predicted_anomalies)
    fn = np.sum(GT & np.invert(predicted_anomalies))
    
    precision = tp/(tp+fp)
    recall = tp/(tp+fn)

    f1 = 2/(1/precision + 1/recall)
    return f1

In [None]:
max_f1 = 0
best_rstd = None

for rstd in rstds:
    print("RSTD", rstd)
    # generate anomalies on validation
    anomalies = get_anomalies(val_set["device_consumption"], period, rstd)
    
    gt = interval_to_onehot(anomalies_time_intervals, len(val_ctimes))
    pred = interval_to_onehot(anomalies, len(val_ctimes))
        
    f1 = get_F1(gt, pred)
   
    if f1 > max_f1:
        max_f1 = f1
        best_rstd = rstd
        
print("MAX F1", max_f1, "BEST RSTD", best_rstd)

In [None]:
anomalies = get_anomalies(test_set["device_consumption"], period, best_rstd)
anomalies

In [None]:
# Produce an output file
confidences = interval_to_onehot(anomalies, len(test_set))
d = {'ctime': test_set["ctime"], 'confidence': confidences}
out_df = pd.DataFrame(d)
out_df.set_index("ctime", inplace=True)
out_df.to_csv("basic_statistics_" + dataset_name + "_" + train_length + ".csv")