In [1]:
# Necessary libraries 
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler 
import joblib 
import os
import logging
import json
import time
import matplotlib.pyplot as plt
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler  


In [5]:
# Read and loading data
df = pd.read_csv('sensor.csv', parse_dates=['timestamp'])

# Divide the data into sets for training, testing, and validation.

train_df = df[df['timestamp'] < '2018-07-01']
validation_df = df[(df['timestamp'] >= '2018-07-01') & (df['timestamp'] < '2018-08-01')]
test_df = df[df['timestamp'] >= '2018-08-01'] 

In [6]:
# Store these three portions into distinct CSV files.
train_df.to_csv('train.csv', index=False)
validation_df.to_csv('validation.csv', index=False)
test_df.to_csv('test.csv', index=False)


Considering this involves identifying anomalies in time-series data, a potential approach is to utilize an Isolation Forest. This algorithm, classified under unsupervised learning, is particularly effective for anomaly detection.

In [11]:
# Loading Train data 
df_train = pd.read_csv('train.csv', parse_dates=['timestamp'])

# Inspect data
print(df_train.isna().sum())
print(df_train.var())


Unnamed: 0             0
timestamp              0
sensor_00           6443
sensor_01             64
sensor_02              9
sensor_03              9
sensor_04              9
sensor_05              9
sensor_06           1002
sensor_07           1655
sensor_08           1365
sensor_09            793
sensor_10              9
sensor_11              9
sensor_12              9
sensor_13              9
sensor_14             16
sensor_15         131040
sensor_16             26
sensor_17             41
sensor_18             41
sensor_19             11
sensor_20             11
sensor_21             11
sensor_22             36
sensor_23             11
sensor_24             11
sensor_25             31
sensor_26             15
sensor_27             11
sensor_28             11
sensor_29             67
sensor_30            256
sensor_31             11
sensor_32             63
sensor_33             11
sensor_34             11
sensor_35             11
sensor_36             11
sensor_37             11


  print(df_train.var())


#### Data cleaning

In [12]:

# Dropping unnecessary columns

df_train.drop(['Unnamed: 0', 'timestamp', 'machine_status'], axis=1, inplace=True)

# Filling NaN values with mean
df_train.fillna(df_train.mean(), inplace=True)


df_train.drop('sensor_15', axis=1, inplace=True)


In [14]:
# check Nan values
print(df_train.isna().sum())

sensor_00    0
sensor_01    0
sensor_02    0
sensor_03    0
sensor_04    0
sensor_05    0
sensor_06    0
sensor_07    0
sensor_08    0
sensor_09    0
sensor_10    0
sensor_11    0
sensor_12    0
sensor_13    0
sensor_14    0
sensor_16    0
sensor_17    0
sensor_18    0
sensor_19    0
sensor_20    0
sensor_21    0
sensor_22    0
sensor_23    0
sensor_24    0
sensor_25    0
sensor_26    0
sensor_27    0
sensor_28    0
sensor_29    0
sensor_30    0
sensor_31    0
sensor_32    0
sensor_33    0
sensor_34    0
sensor_35    0
sensor_36    0
sensor_37    0
sensor_38    0
sensor_39    0
sensor_40    0
sensor_41    0
sensor_42    0
sensor_43    0
sensor_44    0
sensor_45    0
sensor_46    0
sensor_47    0
sensor_48    0
sensor_49    0
sensor_50    0
sensor_51    0
dtype: int64


### ML: Training the model

In [13]:
# Scale features
scaler = StandardScaler()
df_scaled = pd.DataFrame(scaler.fit_transform(df_train), columns=df_train.columns)

# Defining model
model = IsolationForest(contamination=0.05)

# Fitting model
model.fit(df_scaled)

# Utilize the trained model on the dataset
scores = model.decision_function(df_scaled)




In [15]:
# Storing model and scaler
joblib.dump(model, 'model.joblib')
joblib.dump(scaler, 'scaler.joblib')


['scaler.joblib']

In [17]:
class FileProcessor(FileSystemEventHandler):

    '''
    This class manages newly generated data files situated in the input directory.

    It reacts to events when new files are created by reading the file, 
    Transforming its data, utilizing a pre-trained model for predictions, and subsequently storing the results.
    '''

    def __init__(self, configuration):

        '''
        Sets up the FileProcessor using the given configuration.

        Args:
            configuration (dictionary): A dictionary that includes paths to the model, scaler, logs, and other parameters.
        '''

        self.configuration = configuration
        # Loading the pre-trained model from the file
        self.loaded_model = joblib.load(configuration['model_path'])
        # Loading the data scaler from the file
        self.data_scaler = joblib.load(configuration['scaler_path'])
        # Initializing the logging
        self.initialize_logging()

    def initialize_logging(self):

        '''
        Initiates logging to record information into the log file as specified in the configuration.
        '''
        logging.basicConfig(filename=self.configuration['log_path'], level=logging.INFO)
        logging.info('Application initiated.')

    def on_created(self, event):

        '''
        
        This function is called when a new file is generated within the input directory.

        It loads the data, processes it, makes predictions, saves the results and logs the process.

        Args:
            event (FileSystemEvent): Event representing file system changes.
        '''

        file_name = event.src_path
        logging.info(f'Detected new file: {file_name}')

        try:
            
            loaded_data = pd.read_csv(file_name)
            processed_data = self.preprocess_data(loaded_data)
            model_predictions = self.loaded_model.predict(processed_data)
            updated_predictions = processed_data.copy()
            updated_predictions['prediction'] = model_predictions

            # Storing outcomes in the output directory using the identical filename.
            updated_predictions.to_csv(os.path.join(self.configuration['output_directory'], os.path.basename(file_name)))

            # Creating visual representations of sensor data if indicated in the configuration.
            for sensor in self.configuration['sensors_to_draw']:
                self.visualize_sensor(updated_predictions, sensor)

            logging.info('File processing completed.')
        except Exception as ex:
            logging.error(f'Error while processing file: {ex}')

    def preprocess_data(self, data):

        '''
        This function converts the data into a suitable format for use by the model.

        It populates any absent values with the average of the column and normalizes the data utilizing the scaler that was preloaded.

        Args:
            data (pandas.DataFrame): Raw data to be processed.

        Returns:
            data (pandas.DataFrame): Data has been prepared and is now suitable for input to the model..
        '''

        data = data.fillna(data.mean())
        data = pd.DataFrame(self.data_scaler.transform(data), columns=data.columns)
        return data

    def visualize_sensor(self, data, sensor):
        '''
        This function visualizes the sensor data and saves it as a file.

        It plots the specified sensor's data and saves the plot in the image directory specified in the configuration.

        Args:
            data (pandas.DataFrame): Data containing the sensor values.
            sensor (str): Sensor to be visualized.
        '''

        figure, axis = plt.subplots()
        data[sensor].plot(ax=axis)
        # Saving the plot to a file
        figure.savefig(os.path.join(self.configuration['image_directory'], f'{sensor}.png'))


def parse_config(config_file):

    '''
    This function parses the configuration from the given JSON file.

    Args:
        config_file (str): Path to the JSON configuration file.

    Returns:
        configuration (dict): Configuration parameters as a dictionary.
    '''

    with open(config_file) as file:
        return json.load(file)


def application():

    '''
    Main application function.

    It reads the configuration, sets up the FileProcessor, and commences the file system observer to monitor for new files.
    This process continues until an interrupt signal is detected.
    '''

    # Loading configuration from JSON file
    config_data = parse_config('config.json')
    file_observer = FileProcessor(config_data)

    # Configuring a file event observer to monitor the input directory.
    file_event_observer = Observer()
    file_event_observer.schedule(file_observer, config_data['input_directory'], recursive=False)
    file_event_observer.start()

    # Continue program execution until an interrupt signal is encountered.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        file_event_observer.stop()

    file_event_observer.join()


if __name__ == "__main__":
    application()
