In [None]:





#%%


# def read_secret(secret_name):
#     secret_path = os.getenv(secret_name)
#     try:
#         with open(secret_path, 'r') as file:
#             return file.read().strip()
#     except Exception as e:
#         print(f"Error reading {secret_name}: {e}")
#         return None

# INFLUXDB_USERNAME = read_secret('DOCKER_INFLUXDB_INIT_USERNAME_FILE')
# INFLUXDB_PASSWORD = read_secret('DOCKER_INFLUXDB_INIT_PASSWORD_FILE')
# INFLUXDB_TOKEN = read_secret('DOCKER_INFLUXDB_INIT_ADMIN_TOKEN_FILE')

# INFLUXDB_ORG = os.getenv('DOCKER_INFLUXDB_INIT_ORG')
# INFLUXDB_BUCKET = os.getenv('DOCKER_INFLUXDB_INIT_BUCKET')
# INFLUXDB_URL = os.getenv('INFLUXDB_URL', 'http://influxdb:8086') 
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

from influxdb_client.client.query_api import QueryOptions
import pandas as pd
import numpy as np
import os
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Dropout, RepeatVector, TimeDistributed
from datetime import datetime, timedelta, timezone
import plotly.graph_objects as go

import time
import pytz
INFLUXDB_USERNAME="admin"
INFLUXDB_PASSWORD="password"
INFLUXDB_TOKEN="G3UtSut5Kv-RuT32yh27StdDCrl4fu3uzxPLzdias8vFsNzyzgfw5kIX9iGvtLctAXpZjFItOUwA65YWtk_5fg=="
INFLUXDB_ORG="example_org"
INFLUXDB_BUCKET="example_bucket"
INFLUXDB_URL="http://localhost:8086"





In [27]:


class AnomalyDetector():
    def __init__(self,url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG,bucket=INFLUXDB_BUCKET):
        self.url = url
        self.token=token
        self.org = org
        self.bucket=bucket
        self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
        self.query_api = self.client.query_api(query_options=QueryOptions(profilers=[]))
        
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.training_df = pd.DataFrame()
        self.bucket=bucket
        self.last_pull_time = datetime.now()
        
        self.model = None
        self.scaler = None
        self.time_steps=30
        self.current_year=2017
    def run(self):
        
        while True:
            current_year = self.current_year
            start_time = f"{current_year}-05-10T00:00:00Z"
            stop_time = f"{current_year + 1}-05-10T00:00:00Z"

            flux_query = f'''
            from(bucket: "{INFLUXDB_BUCKET}")
                |> range(start: -1y)  
                |> filter(fn: (r) => r._measurement == "heart_rate")
                |> filter(fn: (r) => r._field == "noisy" or r._field == "original_time")
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                |> filter(fn: (r) => time(v: r.original_time)>= time(v: "{start_time}") and time(v: r.original_time) < time(v: "{stop_time}"))
                |> keep(columns: ["noisy", "original_time"])
                |> sort(columns: ["original_time"])
            '''
            new_df = self.query_api.query_data_frame(query=flux_query)
            if len(new_df)<363*4:
                time.sleep(5)
                continue
            
            self.current_year+=1
            
            new_df.set_index('original_time',inplace=True)
            new_df.index = pd.to_datetime(new_df.index)
      
            anomaly_flagged_df = self.detect_anomalies(new_df)
            self.push_data_to_influxdb(anomaly_flagged_df)

    def push_data_to_influxdb(self,df):
        
        points = []
        for idx, row in df.iterrows():
            points.append(
                Point("anomaly_data")
                .tag("unit", "bpm")
                .field("noisy", row['noisy'])
                .field("anomaly", row['anomaly'])
                .field("original_time", idx.isoformat())  # Storing original time as a field
                .time(datetime.utcnow(),WritePrecision.NS)  # Use current UTC time for timestam
            )
        self.write_api.write(bucket=self.bucket, record=points)
            
    def has_one_year(self, df):
        time_span = df.index.max() - df.index.min()
        return time_span >= pd.Timedelta(days=365)
    

    def create_dataset(self, X, y, time_steps=1):
            Xs, ys = [], []
            for i in range(len(X) - time_steps):
                v = X.iloc[i:(i + time_steps)].values
                Xs.append(v)        
                ys.append(y.iloc[i + time_steps])
            return np.array(Xs), np.array(ys)
        
    def train_model(self):
        train_data = self.training_df

        scaler = StandardScaler()
        scaler = scaler.fit(train_data[['noisy']])
        self.scaler = scaler
        train_data['noisy_scl'] = scaler.transform(train_data[['noisy']])
        train_data.head()

        

        time_steps = self.time_steps
        X_train, y_train = self.create_dataset(train_data[['noisy_scl']], train_data.noisy_scl, time_steps)

        num_features=1
        model = Sequential([
            LSTM(128, input_shape=(time_steps, num_features)),
            Dropout(0.2),
            RepeatVector(time_steps),
            LSTM(128, return_sequences=True),
            Dropout(0.2),
            TimeDistributed(Dense(num_features))                 
        ])

        model.compile(loss='mae', optimizer='adam')
        history = model.fit(
            X_train, y_train,
            epochs=30,
            batch_size=32,
            validation_split=0.1,
            shuffle=False,
        )
        
        self.model = model
        
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=train_data.index, y=train_data.noisy,
                            mode='lines',
                            name='rate'))
        fig.update_layout(showlegend=True)
        fig.show()         
            
    def detect_anomalies(self, new_df):
        test_data = new_df.copy()
        model = self.model
        time_steps = self.time_steps
        test_data['noisy_scl'] = self.scaler.transform(test_data[['noisy']])
        X_test, y_test = self.create_dataset(test_data[['noisy_scl']], test_data.noisy_scl, time_steps)
        X_test_pred = model.predict(X_test)
        test_mae_loss = np.mean(np.abs(X_test_pred - X_test), axis=1)

        THRESHOLD = 0.48

        test_score_df = pd.DataFrame(test_data[time_steps:])
        test_score_df['loss'] = test_mae_loss
        test_score_df['threshold'] = THRESHOLD
        test_score_df['anomaly'] = test_score_df.loss > test_score_df.threshold
        test_score_df['noisy'] = test_data[time_steps:].noisy
        
        test_score_df['date'] = test_score_df.index.date
        anomalous_dates = test_score_df[test_score_df['anomaly']].date.unique()
        
        
        # test_score_df['anomaly'] = test_score_df['date'].isin(anomalous_dates)
        
        extended_anomalous_dates = pd.Series()
        # Create a list of dates that include 1 day before and 1 day after the anomalous dates
        extended_anomalous_dates = extended_anomalous_dates.union(anomalous_dates - pd.Timedelta(days=1))
        extended_anomalous_dates = extended_anomalous_dates.union(anomalous_dates + pd.Timedelta(days=1))

        # Set anomalies based on the extended anomalous dates
        test_score_df['anomaly'] = test_score_df['date'].isin(extended_anomalous_dates)
        test_score_df.drop(columns='date', inplace=True) 

        # print('Anomaly detection completed')
        # print(test_score_df[test_score_df['anomaly']])
        # # Plotting
        # fig = go.Figure()
        # fig.add_trace(go.Scatter(x=test_data[time_steps:].index, y=test_score_df.loss,
        #                         mode='lines',
        #                         name='Test Loss'))
        # fig.add_trace(go.Scatter(x=test_data[time_steps:].index, y=test_score_df.threshold,
        #                         mode='lines',
        #                         name='Threshold'))
        # fig.update_layout(showlegend=True)
        # fig.show()

        # fig = go.Figure()
        # fig.add_trace(go.Scatter(x=test_data.index, y=test_data.noisy,
        #                         mode='lines',
        #                         name='Rate'))
        # fig.update_layout(showlegend=True)
        # fig.show()

        return test_score_df

    def get_dataframe_starting(self, starting_time):
        print("Pulling new data starting: ",starting_time)
        query = f'''
        from(bucket: "{INFLUXDB_BUCKET}")
                |> range(start: {starting_time})
                |> filter(fn: (r) => r._measurement == "heart_rate")
                |> filter(fn: (r) => r._field == "noisy" or r._field == "original_time")
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                |> keep(columns: ["noisy", "original_time"])
        '''
        print(query)
        df = self.query_api.query_data_frame(query=query)
        if not df.empty:
            self.last_pull_time = (datetime.utcnow().replace(tzinfo=pytz.utc) -timedelta(seconds=60)).isoformat()
            df.drop(columns=['table','result'],inplace=True)
            df['original_time'] = pd.to_datetime(df['original_time'])
        return  df
    
    def get_dataframe_initial(self):
        checkpoint_df = self.get_dataframe_starting('-24h')
        earliest_time = checkpoint_df['original_time'].min()
        one_year_after_earliest = earliest_time + pd.DateOffset(years=1)
        training_df = checkpoint_df[checkpoint_df['original_time'] <= one_year_after_earliest]
        training_df['anomaly'] = False
        training_df.set_index('original_time',inplace=True)
        training_df.index = pd.to_datetime(training_df.index)

        print('Length of training dataframe',len(training_df))
        while len(training_df)<363*4:
            checkpoint_df = self.get_dataframe_starting('-24h')
            earliest_time = checkpoint_df['original_time'].min()
            one_year_after_earliest = earliest_time + pd.DateOffset(years=1)
            training_df = checkpoint_df[checkpoint_df['original_time'] <= one_year_after_earliest]
            training_df['anomaly'] = False
            training_df.set_index('original_time',inplace=True)
            training_df.index = pd.to_datetime(training_df.index)
            time.sleep(3)

        self.training_df = training_df


try:
    ad = AnomalyDetector(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG, bucket=INFLUXDB_BUCKET)
    ad.get_dataframe_initial()
    ad.train_model()
    #ad.run()
except KeyboardInterrupt:
    print("Interrupted by user, closing client.")
    ad.close()

Pulling new data starting:  -24h

        from(bucket: "example_bucket")
                |> range(start: -24h)
                |> filter(fn: (r) => r._measurement == "heart_rate")
                |> filter(fn: (r) => r._field == "noisy" or r._field == "original_time")
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                |> keep(columns: ["noisy", "original_time"])
        
Length of training dataframe 1461
Epoch 1/30




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead.



[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 72ms/step - loss: 0.3838 - val_loss: 0.1622
Epoch 2/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 61ms/step - loss: 0.1419 - val_loss: 0.1958
Epoch 3/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 64ms/step - loss: 0.1529 - val_loss: 0.1844
Epoch 4/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 64ms/step - loss: 0.1590 - val_loss: 0.3146
Epoch 5/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 70ms/step - loss: 0.2263 - val_loss: 0.1604
Epoch 6/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 69ms/step - loss: 0.1279 - val_loss: 0.1018
Epoch 7/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 63ms/step - loss: 0.1175 - val_loss: 0.1101
Epoch 8/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 59ms/step - loss: 0.1150 - val_loss: 0.0946
Epoch 9/30
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m

In [58]:
current_year = 2021
start_time = f"{current_year}-05-10T00:00:00Z"
stop_time = f"{current_year + 1}-05-10T00:00:00Z"

client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
        self.query_api = self.client.query_api(query_options=QueryOptions(profilers=[]))
        
flux_query = f'''
from(bucket: "{INFLUXDB_BUCKET}")
    |> range(start: -1y)  
    |> filter(fn: (r) => r._measurement == "anomaly_data")
    |> filter(fn: (r) => r._field == "noisy" or r._field == "original_time" or r._field=="anomaly" or r._field=="loss" or r._field=="threshold")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> filter(fn: (r) => time(v: r.original_time)>= time(v: "{start_time}") and time(v: r.original_time) < time(v: "{stop_time}"))
    |> keep(columns: ["noisy", "original_time","anomaly","loss","threshold"])
    |> sort(columns: ["original_time"])
'''
print(flux_query)
df = ad.query_api.query_data_frame(query=flux_query)
df.set_index('original_time',inplace=True)
df.index = pd.to_datetime(df.index)
df.head()


from(bucket: "example_bucket")
    |> range(start: -1y)  
    |> filter(fn: (r) => r._measurement == "anomaly_data")
    |> filter(fn: (r) => r._field == "noisy" or r._field == "original_time" or r._field=="anomaly" or r._field=="loss" or r._field=="threshold")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> filter(fn: (r) => time(v: r.original_time)>= time(v: "2021-05-10T00:00:00Z") and time(v: r.original_time) < time(v: "2022-05-10T00:00:00Z"))
    |> keep(columns: ["noisy", "original_time","anomaly","loss","threshold"])
    |> sort(columns: ["original_time"])



Unnamed: 0_level_0,result,table,anomaly,noisy
original_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2021-05-17 12:00:00+00:00,_result,0,False,66.291958
2021-05-17 12:00:00+00:00,_result,0,False,66.291958
2021-05-17 18:00:00+00:00,_result,0,False,65.953811
2021-05-17 18:00:00+00:00,_result,0,False,65.953811
2021-05-18 00:00:00+00:00,_result,0,False,67.272022


In [45]:
test_data = df.copy()
model = ad.model
time_steps = ad.time_steps
test_data['noisy_scl'] = ad.scaler.transform(test_data[['noisy']])
X_test, y_test = ad.create_dataset(test_data[['noisy_scl']], test_data.noisy_scl, time_steps)
X_test_pred = model.predict(X_test)
test_mae_loss = np.mean(np.abs(X_test_pred - X_test), axis=1)

THRESHOLD = 0.48

test_score_df = pd.DataFrame(test_data[time_steps:])
test_score_df['loss'] = test_mae_loss
test_score_df['threshold'] = THRESHOLD
test_score_df['anomaly'] = test_score_df.loss > test_score_df.threshold
test_score_df['noisy'] = test_data[time_steps:].noisy

test_score_df['date'] = test_score_df.index.date
# Ensure the 'date' column in test_score_df is in date-only format (no time component)
test_score_df['date'] = pd.to_datetime(test_score_df['date']).dt.date
anomalous_dates = test_score_df[test_score_df['anomaly']]['date'].unique()

# Convert anomalous_dates to a Series for further operations
anomalous_dates = pd.Series(anomalous_dates)

# Extend anomalous dates by including 1 day before and 1 day after
extended_anomalous_dates = pd.concat([anomalous_dates,
                                      anomalous_dates - pd.Timedelta(days=1),
                                      anomalous_dates + pd.Timedelta(days=1)])

# Ensure all dates are unique after concatenation
extended_anomalous_dates = extended_anomalous_dates.unique()

# Set anomalies based on the extended anomalous dates
test_score_df['anomaly'] = test_score_df['date'].isin(extended_anomalous_dates)
print(test_score_df['date'][90:])

test_score_df.drop(columns='date', inplace=True) 
print(extended_anomalous_dates)

[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 29ms/step
original_time
2021-06-09 00:00:00+00:00    2021-06-09
2021-06-09 06:00:00+00:00    2021-06-09
2021-06-09 12:00:00+00:00    2021-06-09
2021-06-09 18:00:00+00:00    2021-06-09
2021-06-10 00:00:00+00:00    2021-06-10
                                ...    
2022-05-08 18:00:00+00:00    2022-05-08
2022-05-09 00:00:00+00:00    2022-05-09
2022-05-09 06:00:00+00:00    2022-05-09
2022-05-09 12:00:00+00:00    2022-05-09
2022-05-09 18:00:00+00:00    2022-05-09
Name: date, Length: 1340, dtype: object
[datetime.date(2021, 5, 23) datetime.date(2021, 5, 24)
 datetime.date(2021, 6, 3) datetime.date(2021, 7, 9)
 datetime.date(2021, 8, 18) datetime.date(2021, 8, 19)
 datetime.date(2021, 10, 13) datetime.date(2021, 10, 18)
 datetime.date(2021, 10, 29) datetime.date(2021, 10, 30)
 datetime.date(2021, 10, 31) datetime.date(2021, 12, 7)
 datetime.date(2022, 1, 8) datetime.date(2022, 1, 9)
 datetime.date(2022, 2, 13) datetime.date(2022

In [None]:
df.head()

In [56]:
import plotly.graph_objects as go
import pandas as pd
anom_df =df
# Assuming anom_df is a DataFrame
anom_df['date'] = anom_df.index.date
anomalous_dates = anom_df[anom_df['anomaly']].date.unique()

# Marking anomalies
anom_df['anomaly'] = anom_df['date'].isin(anomalous_dates)
anom_df.drop(columns='date', inplace=True)

# Create the main figure
fig = go.Figure()

# Plot the main time series (noisy data)
fig.add_trace(go.Scatter(
    x=anom_df.index, 
    y=anom_df['noisy'], 
    mode='lines', 
    name='Data', 
    line=dict(color='blue')
))

# Highlight anomalies as red dots
anomalies = anom_df[anom_df['anomaly'] == True]
fig.add_trace(go.Scatter(
    x=anomalies.index, 
    y=anomalies['noisy'], 
    mode='markers', 
    name='Anomaly', 
    marker=dict(color='red', size=8)
))

# Formatting the x-axis to show only months
fig.update_xaxes(
    dtick="M1", 
    tickformat="%Y-%m",
    tickangle=45
)

# Set titles and labels
fig.update_layout(
    title='Time Series Plot with Anomalies',
    xaxis_title='Time',
    yaxis_title='Value',
    legend_title='Legend',
    xaxis_rangeslider_visible=False,
    height=600,
    width=1000
)

# Show the plot
fig.show()


In [57]:
offset = -60
time_steps = ad.time_steps
test_data = anom_df[240+offset:360+offset]

fig = go.Figure()
fig.add_trace(go.Scatter(x=test_data[time_steps:].index, y=test_data.loss,
                    mode='lines',
                    name='Test Loss'))
fig.add_trace(go.Scatter(x=test_data[time_steps:].index, y=test_data.threshold,
                    mode='lines',
                    name='Threshold'))
fig.update_layout(showlegend=True)
fig.show()

fig = go.Figure()
fig.add_trace(go.Scatter(x=test_data.index, y=test_data.noisy,
                    mode='lines',
                    name='rate'))
fig.update_layout(showlegend=True)
fig.show()

AttributeError: 'DataFrame' object has no attribute 'loss'

Unnamed: 0_level_0,result,table,noisy,noisy_scl,loss,threshold,anomaly
original_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2021-06-01 12:00:00+00:00,_result,0,64.002316,0.854806,0.074809,0.48,False
2021-06-01 18:00:00+00:00,_result,0,63.850586,0.847382,0.095065,0.48,False
2021-06-02 00:00:00+00:00,_result,0,64.511851,0.879735,0.113819,0.48,False
2021-06-02 06:00:00+00:00,_result,0,64.718832,0.889862,0.119458,0.48,False
2021-06-02 12:00:00+00:00,_result,0,65.028881,0.905031,0.121543,0.48,False
2021-06-02 18:00:00+00:00,_result,0,32.384485,-0.692112,0.120762,0.48,False
2021-06-03 00:00:00+00:00,_result,0,64.15796,0.862421,0.597647,0.48,True
2021-06-03 06:00:00+00:00,_result,0,64.471576,0.877764,0.463907,0.48,True
2021-06-03 12:00:00+00:00,_result,0,64.580148,0.883076,0.376701,0.48,True
2021-06-03 18:00:00+00:00,_result,0,64.592103,0.883661,0.316824,0.48,True


In [None]:
 current_year = self.current_year
            start_time = f"{current_year}-05-10T00:00:00Z"
            stop_time = f"{current_year + 1}-05-10T00:00:00Z"

            flux_query = f'''
            from(bucket: "{INFLUXDB_BUCKET}")
                |> range(start: -1y)  
                |> filter(fn: (r) => r._measurement == "heart_rate")
                |> filter(fn: (r) => r._field == "noisy" or r._field == "original_time")
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                |> filter(fn: (r) => time(v: r.original_time)>= time(v: "{start_time}") and time(v: r.original_time) < time(v: "{stop_time}"))
                |> keep(columns: ["noisy", "original_time"])
                |> sort(columns: ["original_time"])
            '''
            new_df = self.query_api.query_data_frame(query=flux_query)
            if len(new_df)<363*4:
                time.sleep(5)
                continue
            
            self.current_year+=1
            
            new_df.set_index('original_time',inplace=True)
            new_df.index = pd.to_datetime(new_df.index)
      
            anomaly_flagged_df = self.detect_anomalies(new_df)
            self.push_data_to_influxdb(anomaly_flagged_df)