In [None]:
# Importing libraries
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler 
import joblib 
import os
import logging
import json
import time
import matplotlib.pyplot as plt
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler  


In [None]:
# Read the raw sensor readings, parsing the 'timestamp' column as datetimes
df = pd.read_csv('sensor.csv', parse_dates=['timestamp'])

# Chronological split into training, validation and test sets:
#   training   -> before 2018-07-01
#   validation -> 2018-07-01 up to (not including) 2018-08-01
#   test       -> 2018-08-01 onwards
july_start, august_start = '2018-07-01', '2018-08-01'
stamps = df['timestamp']

train_df = df[stamps < july_start]
valid_df = df[(stamps >= july_start) & (stamps < august_start)]
test_df = df[stamps >= august_start]

In [None]:
# Persist each split to its own CSV file (the row index is not written)
for split_frame, csv_name in (
    (train_df, 'train.csv'),
    (valid_df, 'valid.csv'),
    (test_df, 'test.csv'),
):
    split_frame.to_csv(csv_name, index=False)

Because this is an anomaly-detection task on time-series sensor data, one suitable method is an Isolation Forest: an unsupervised learning algorithm that needs no labelled anomalies and works well for anomaly detection.

In [None]:
# Load the training split, parsing the 'timestamp' column as datetimes
df_train = pd.read_csv('train.csv', parse_dates=['timestamp'])

# Inspect the data: number of missing values and variance of every column.
# numeric_only=True is required because the frame still contains the datetime
# 'timestamp' column at this point (and possibly other non-numeric columns);
# pandas 2.x raises a TypeError when asked for the variance of such columns.
print(df_train.isna().sum())
print(df_train.var(numeric_only=True))


In [None]:
# Data cleaning
# Remove the columns that are not used as model features
unused_columns = ['Unnamed: 0', 'timestamp', 'machine_status']
df_train.drop(columns=unused_columns, inplace=True)

# Replace every missing value with the mean of its column
column_means = df_train.mean()
df_train.fillna(column_means, inplace=True)

# Remove the 'sensor_15' column as well
df_train.drop(columns='sensor_15', inplace=True)


In [None]:
# Re-count the missing values per column to see whether any NaN is left after cleaning
missing_per_column = df_train.isna().sum()
print(missing_per_column)

### ML: Training the model

In [None]:
# Scale the features to zero mean and unit variance. The fitted scaler is kept
# in `scaler` so the same transformation can be re-applied to new data.
scaler = StandardScaler()
df_scaled = pd.DataFrame(scaler.fit_transform(df_train), columns=df_train.columns)

# Define the model.
# contamination=0.05 sets the expected proportion of anomalies in the data.
# random_state is fixed so that re-running the notebook trains the same forest;
# without it every run produces (and later saves) a different model.
model = IsolationForest(contamination=0.05, random_state=42)

# Fit the model
model.fit(df_scaled)

# Apply the trained model to the training data to obtain anomaly scores
scores = model.decision_function(df_scaled)


In [None]:
# Serialise the fitted model and the fitted scaler to disk with joblib
for artifact, target_file in ((model, 'model.joblib'), (scaler, 'scaler.joblib')):
    joblib.dump(artifact, target_file)


In [17]:
class FileProcessor(FileSystemEventHandler):
    '''Watchdog event handler that scores newly created data files.

    For every file-creation event the file is read as CSV, preprocessed with the
    stored scaler, scored with the stored model, written to the output directory
    and, for each configured sensor, plotted to the image directory.
    '''

    def __init__(self, configuration):
        '''Initializes the FileProcessor with the provided configuration.

        Args:
            configuration (dict): Configuration dictionary. Keys read by this class:
                'model_path', 'scaler_path', 'log_path', 'output_directory',
                'image_directory' (only when sensors are drawn) and the optional
                'sensors_to_draw'.
        '''

        super().__init__()
        self.configuration = configuration
        # Loading the pre-trained model from the file
        self.loaded_model = joblib.load(configuration['model_path'])
        # Loading the data scaler from the file
        self.data_scaler = joblib.load(configuration['scaler_path'])
        # Initializing logging
        self.initialize_logging()

    def initialize_logging(self):
        '''Establishes logging to write to the log file as defined in the configuration.'''
        logging.basicConfig(filename=self.configuration['log_path'], level=logging.INFO)
        logging.info('Application initiated.')

    def on_created(self, event):
        '''This method is invoked when a new entry is created in the input directory.

        It loads the data, processes it, makes predictions, saves the results and logs the process.
        Directory-creation events are ignored. Any error raised while handling a file is logged
        (with traceback) and swallowed so that the watcher keeps running.

        Args:
            event (FileSystemEvent): Event representing file system changes.
        '''

        # Creation events are also delivered for directories; those cannot be read as CSV.
        if event.is_directory:
            return

        file_name = event.src_path
        logging.info(f'Detected new file: {file_name}')

        try:
            # Loading the data from the new file
            loaded_data = pd.read_csv(file_name)
            # Processing the data for model input
            processed_data = self.preprocess_data(loaded_data)
            # Making predictions with the loaded model
            model_predictions = self.loaded_model.predict(processed_data)
            # Appending predictions to the processed (scaled) data
            updated_predictions = processed_data.copy()
            updated_predictions['prediction'] = model_predictions
            # Saving results in the output directory with the same filename;
            # the directory is created on demand so a missing folder is not an error
            output_directory = self.configuration['output_directory']
            os.makedirs(output_directory, exist_ok=True)
            updated_predictions.to_csv(os.path.join(output_directory, os.path.basename(file_name)))

            # Visualizing sensor data if specified in the configuration
            # (a missing 'sensors_to_draw' key simply means nothing is drawn)
            for sensor in self.configuration.get('sensors_to_draw', []):
                self.visualize_sensor(updated_predictions, sensor)

            logging.info('File processing completed.')
        except Exception as ex:
            # logging.exception keeps the original message and adds the traceback
            logging.exception(f'Error while processing file: {ex}')

    def preprocess_data(self, data):
        '''This method processes the data into a format acceptable by the model.

        When the scaler records the feature names it was fitted on, the data is first
        restricted to exactly those columns, in that order, so that extra columns in the
        raw file (for example ones removed before training) do not break scaling.
        It then fills the missing values with the mean of the column and scales the data
        using the pre-loaded scaler.

        Args:
            data (pandas.DataFrame): Raw data to be processed.

        Returns:
            data (pandas.DataFrame): Processed data ready for model input.

        Raises:
            KeyError: If a column the scaler was fitted on is missing from the data.
        '''

        # scikit-learn estimators fitted on a DataFrame expose `feature_names_in_`;
        # fall back to using the data as-is when the attribute is absent.
        expected_columns = getattr(self.data_scaler, 'feature_names_in_', None)
        if expected_columns is not None:
            data = data[list(expected_columns)]

        data = data.fillna(data.mean())
        data = pd.DataFrame(self.data_scaler.transform(data), columns=data.columns)
        return data

    def visualize_sensor(self, data, sensor):
        '''This method visualizes the sensor data and saves it as a file.

        It plots the specified sensor's data and saves the plot as '<sensor>.png' in the image
        directory specified in the configuration. The figure is always closed afterwards so
        that figures do not accumulate while the watcher keeps running.

        Args:
            data (pandas.DataFrame): Data containing the sensor values.
            sensor (str): Sensor to be visualized.
        '''

        figure, axis = plt.subplots()
        try:
            data[sensor].plot(ax=axis)
            # Saving the plot to a file, creating the image directory on demand
            image_directory = self.configuration['image_directory']
            os.makedirs(image_directory, exist_ok=True)
            figure.savefig(os.path.join(image_directory, f'{sensor}.png'))
        finally:
            # pyplot keeps a reference to every open figure until it is closed
            plt.close(figure)


def parse_config(config_file):
    '''This function parses the configuration from the given JSON file.

    The file is decoded as UTF-8 explicitly so that the result does not depend
    on the locale's default encoding.

    Args:
        config_file (str): Path to the JSON configuration file.

    Returns:
        configuration (dict): Configuration parameters as a dictionary.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    '''

    with open(config_file, encoding='utf-8') as file:
        return json.load(file)


def application(config_file='config.json'):
    '''Main application function.

    It loads the configuration, initializes the FileProcessor and starts the file system observer to watch
    the configured input directory (non-recursively) for new files.
    It keeps running until an interrupt signal is received. The observer is stopped and joined on every
    exit path, not only on an interrupt, so its thread is not left running if something else fails.

    Args:
        config_file (str): Path to the JSON configuration file. Defaults to 'config.json'.
    '''

    # Loading configuration from JSON file
    config_data = parse_config(config_file)
    # Initializing the file processor with configuration
    file_observer = FileProcessor(config_data)

    # Setting up file event observer to watch the input directory
    file_event_observer = Observer()
    file_event_observer.schedule(file_observer, config_data['input_directory'], recursive=False)
    file_event_observer.start()

    # Keep the program running until an interrupt signal is received
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info('Interrupt received, stopping the observer.')
    finally:
        file_event_observer.stop()
        file_event_observer.join()


# Start the watcher only when this code is executed directly (not when imported as a module)
if __name__ == "__main__":
    application()
