# Anomaly detection and predictive maintenance for industrial devices

## Group 3

| Name   | ID |
| -------- | ------- |
| Braidi Federico  | 2122169    |
| Calandra Buonaura Lorenzo | 2107761     |
| Malagoli Pietro    |     |
| Turci Andrea  |   |

## Introduction

Nowadays, it has become of paramount importance for companies to collect a huge amount of data about their products. This data can have many applications depending on its type and quality and can empower the company itself to refine their products or to offer new services (i.e. predictive maintenance).

The dataset we work with in this project is a slice of a dataset produced by a refrigeration company by collecting the data provided by sensors present on the machines they sell. Our objective with it is divided in 2 main parts:

- Anomaly detection: We want to identify anomalies in the collected data which could correlate with problems during installation, deterioration of machine parts or bad external conditions. As we will see, here we consider as anomalies frequent turn on/off of engines and their overuse/underuse.

- Predictive Maintenance: Once anomalies are detected we want to find correlations between them and other measured metrics to learn what might produce the malfunctionings and either correct possible problems in design/installation or have a way to predict faults by focusing on the correlated features in the database and provide predictive maintenance.

## Dataset

The dataset we are provided with is a part of a much bigger dataset from a company in the field of refrigeration. Let's inspect the structure of it by looking at an example record:
<center>

| when | hwid | metric | value |
|------|------|--------|-------|
|1601510485159|SW-065|SA4|0|

</center>


- The first column, 'when', is the time of the measurement in the UNIX timestamp format, it indicates the number of milliseconds since 1st of January 1970. It is saved as an int64.

- The second column is the ID of the machine, there are four of them in our dataset (SW-065, SW-088, SW-106, SW-115).

- The third column is the name of the measured metric, there are 133 of those and we are given a table to check what every code means.

- The fourth column is the value of the measured metric, depending on what metric it is it can be an int, a float or a binary value. All the info about the number of decimals and the unit of measurement are contained in the table we are given together with the dataset.

## Tasks

Our work is divided in 3 main parts:

- Analysis of engine turn-on/turn-off

- Analysis of engine working percentage

- Analysis of machine circuit alarms

Let's look at a trasversal task first and then at these one by one:

#### Normalization of the dataset

For any of the correlation parts of the tasks we need the data from the two metrics of which we want to test the correlation to be normalized to the same sampling frequency. This is needed because otherwise we would have different length values vectors and each point in the two vectors would most likely not refer to the same time, making finding correlations very hard.

As we will see in the code explanation part, for every machine and metric we decided to group records based on which minute from the beginning of the measurements they belonged to. The aggregation function for the 'value' column is decided based on the nature of the metric, for float valued metrics we can use mean() but for integer (or binary) valued ones we have to choose between max(), min(), sum(). Finally, missing minutes are filled in with values generated by copying the last value seen.

#### Engine turn-on/turn-off

Every machine in the dataset works thanks to 4 motors. Each of these motors sends a ping to the database usually every 30/60s via the metrics S117,S118,S169,S170 for engine 1,2,3,4 respectively. A high frequency of engine turn-on/turn-off might be a sign of issues during installation, bad external conditions or deterioration of some parts of the machines. 

We need to identify when these anomalies happen by looking at the length of the periods where the motors are on or off and confronting them with the usual sampling time. For example if the time a motor is on is around 30s it generally means that we had a series of "off" pings and a single "on" ping in the middle, which is an anomaly and might be a sign of problems.

Once we identify the anomalies we need to conduct a correlation study between the motor-state metric and other metrics in the dataset. Since the metrics are many and we also want to check for time-shifted correlations, we need to restrict the amount of metrics to check to the ones we believe could have a higher chance to be linked. We decided to check correlation with **INSERT CHECKED METRICS**

#### Engine working percentage

The four motors we analyzed in the previous section, in each machine, are divided in two circuits. The motors associated with metrics S117 and S118 are part of circuit 1 and those relative to metrics S169 and S170 to circuit 2. Each of these circuits has a associated metric that describes the working percentage of its motors in the range 0-100% relative to the max work load that the motors can manage. Studying this data series can help us discover signs of deteriorated parts, bad external conditions or even design flaws (over/under dimensioned machines). 

Our objective is to find any high or low workload of motors to determine the previously mentioned problems. Furthermore we want to find correlations, if there are any, between these working percentages and the external temperature, stored in metric S41.

#### Engine circuit alarms

In our dataset, two metrics are treated differently from the others: A5 and A9. These metrics are used to convey alarm signals of 16 parts of a circuit (A5 for circuit 1 and A9 for circuit 2). This technique is frequently used in industrial applications such as this one where, due to the high amount of sensors on a single machine part, it becomes much more beneficial to group all of their 0-1 values into a binary number and send only 1 signal. In our case there are 16 sensors that give information about the state of specific parts in the circuit (0 for good condition, 1 for errors), these binary values are concatenated and tranformed to decimal, obtaining a number in the range 0-65535 that is sent as the value of the A5/A9 metric.

For this part we should convert back the values to the 16 flags and consider it a malfunctioning if any of the 6,7,8 positioned bits are set to 1, this indicates overheating. Then we should try to look for correlations between the series of values for each bit (each alarm) and other variables in the dataset to possibly identify why these problems happend. Furthermore, we should check for correlations between the malfunctionings as a whole and other variables. These correlations should be time-shifted, meaning that we would like to find some metric that allows us to predict future malfunctionings.

## Code

For the code part, we created a library **aux.py** to contain all of our functions, and used notebooks to call them and visualize the results. We will now proceed at explaining the functions we created and the ones we used from other libraries, ordered by their apparence in the project.

### Cluster creation

First of all, we had to setup the cluster and connect to it. To do so, we used the `SSHCluster` function of `dask.distributed`:

```python 
cluster = SSHCluster(["localhost", "localhost", "10.67.22.61", "10.67.22.67"],connect_options={"password":"********","username":"*******"},scheduler_options={"port": 0, "dashboard_address": ":8787"})
```

This function is called from the 10.67.22.165 machine, which we adopted as the head of the cluster. It creates a cluster with localhost as its head, and 3 attached machines: localhost itself, 10.67.22.61 and 10.67.22.67. The cluster is created using one of our credentials and sets up the scheduler's dashboard on port 8787, if not already occupied.

Any time someone wants to run code on the cluster, they need to check wether one is already up by looking at the dashboard at port 8787 of the machine (this can be done by connecting the local computer's 8787 port to 10.67.22.165 with an ssh command). Once a cluster is up, one can connect any notebook to it by creating a dask Client object using the command:

```python
client = Client('10.67.22.165:*port*')
```

Where *port* needs to be substituted by the port shown in the INFO section of the dashboard.

### Data Import

The next step is importing the data from the provided file. We decided to work with dataframes and, because of that, we had to keep the file in .csv unzipped form to make it readable to a Dask Dataframe. To avoid storing multiple unzipped copies of a 5 Gb file and filling up the memory, we created a common folder, outside of our personal user folders on every machine, containing a copy of the .csv file. At the beginning of a notebook, we usually imported it like:

```python
path=os.path.abspath('/home/data/data.csv')

df = dd.read_csv(path)
df.head()
```

Where we used the `os` Python library and imported dask.dataframes as `dd`.

### Task 1

The first part of Task 1 was about finding the anomalies in the metrics that represent the on-off state of the 4 motors present in each hardware. As explained, we define an anomaly as a period of "on-time" or "off-time" that lasts 1 minute or less. To find the anomalies, we wrote some functions, which we split if a mid-product of the computing process is needed:

```python
def aggregate_up_down(df, hwid, metric):

    selection=df[df['hwid']==hwid]
    selection=selection[selection['metric']==metric].compute()  

    selection['switch']=selection['value'].diff().map(abs).map(lambda x: True if x==1 or math.isnan(x) else False)
    selection=selection[selection['switch']]
    selection['times']=selection['when'].diff()/1000/60
    selection['value']=selection['value'].map(lambda x: 0 if x==1 else 1)
    times=list(selection['times'])[1:]
    values=list(selection['value'])[1:]
    history=[[value,time] for value,time in zip(values,times)]

    return history
```
The `aggregate_up_down()` function takes a Dask Dataframe (`df`), an hardware id (`hwid`) and a metric label (`metric`) as inputs. It then filters `df` to only keep the rows relative to the given hardware and metric and computes to get a Pandas Dataframe. We do this here because the filtered dataframe is small enough to fit comfortably in memory and the `diff` function that we use on it later is more efficient on pandas dataframes because of the absence of partition boundaries. Later, we create a new column which is set to `True` if the absolute value of the difference between the current row's value and the previous one is 1 (i.e. if there's a switch in the motor's state), `False` otherwise.

A new column `times` is created as the result of the `diff()` of the `when` column, then converted into minutes; this way, we have a column of the periods of up-time and down-time,alternated. Due to how the `diff()` function calculates the difference, the values in the `value` column are inverted with respects to the real ones, so we flip them (invert 0-1 and viceversa). 

Finally, we get the periods (`times`) and the machine states (`values`) and combine them into a list where each element is composed of:
- machine state.
- period of time for which the motor has remained in that state.

```python
def getuniquecouples(df):
    g=df.groupby(['hwid','metric']).mean().compute()
    hwids=list(g.index.get_level_values('hwid'))
    metrics=list(g.index.get_level_values('metric'))
    combos=zip(hwids,metrics)
    combos=sorted(combos,key=lambda x:x[0])
    
    return combos
```

This function takes as input a Dask Dataframe and finds all the couples (of hardware and metric) present by grouping on those columns and extracting the grouped indexes. These two vectors are then zipped to preserve the order and sorted according to the hardware id. This is done in order to use the result in for loops and avoid checking non-existing couples that would result in empy Dataframes.

Finally, the last function we used to get the number of anomalies for each metric and hardware is:

```python
def find_anomaly_number(df):

    zipped=getuniquecouples(df)

    limitlow=(30+1)/60
    limithigh=(60+1)/60

    for hw,metric in zipped:
            print(f"hw: {hw} metric: {metric}")
            agg=aux.aggregate_up_down(df,hw,metric)
            times=[el[1] for el in agg]
            zeros=[x for x in times if x==0]
            print(f"The instances of 1 single isolated uptime or downtime reading are: {len(zeros)}")
            between=[x for x in times if (x>limitlow and x<limithigh)]
            print(f"The instances of uptimes or downtimes between 30 sec and 1 min are: {len(between)}")
            low=[x for x in times if (x>0 and x<limitlow)]
            print(f"The instances of <30 seconds uptimes or downtimes are: {len(low)}")
```

This function is given a Dask Dataframe (in our case the imported `df` filtered on metrics 'S117','S118','S169','S170'). It first calls the function `getuniquecouples()` that returns the existing hardware-metric couples. We then define two thresholds: one at 30 seconds and one at 60. These are decided on the basis of the sampling frequencies in the dataset. We define them as (30+1)/60 and (60+1)/60 to include those that should be at 30 and 60 but are actually some milliseconds longer. Finally, for each combination of hardware and metric, we call the `aggregate_up_down()` function and count how many 0s/0-30s/30-60s intervals we find. 

In the second part, we want to find correlations between the on-off variables relative to the motors and other variables in the dataset. We move on two fronts: checking global relations between the behaviour of the engines and other metrics, and checking local correlations in the neighbourhood of the anomalies. As explained in the introduction part, to test correlations we employed 3 methods: Pearson's, Spearman's and Kendall's coefficients. All of these techniques require the two compared data vectors to be of the same length and, in order to give a meaningful result, to have points relative to the same time. To do so, we need to normalize our measurements, which we do by using the following functions:

```python
def get_norm_data(df,hwid,metric,aggfunc='mean'):
    selection=df[df['hwid']==hwid]
    selection=selection[selection['metric']==metric]
    selection['minute']=selection['when'].map(lambda x: int((x - 1601510422405) / 60000),meta=('when','int64'))

    if aggfunc=='mean':
        g = selection.groupby(['hwid', 'metric', 'minute']).mean().compute().sort_values(by='minute')
    elif aggfunc=='max':
        g = selection.groupby(['hwid', 'metric', 'minute']).max().compute().sort_values(by='minute')
    elif aggfunc=='min':
        g = selection.groupby(['hwid', 'metric', 'minute']).min().compute().sort_values(by='minute')
    elif aggfunc=='sum':
        g = selection.groupby(['hwid', 'metric', 'minute']).sum().compute().sort_values(by='minute')
    else:
        raise ValueError('Non supported aggfunction')

    minutes=list(g.index.get_level_values('minute'))

    return list(g['value']),minutes
```

This function is designed to be used to normalize the values of one of the possibly correlated metrics, before checking. It takes a Dataframe, the hardware, the metric and an aggregation function. It then filters the Dataframe with the hardware and the metric and adds a column `minute` that contains the minute (from the first value of `when` present in the dataset) which the measurement belongs to. The dataframe is finally grouped by this new column (together with the hardware and the metric), the selected aggregation function is applied to `when` and `value` and it is computed and sorted with respects to `minute`. In the end, the function returns the `value` column as a list and the parallel `minute` column which will be useful in calling the next function.

```python
def fill_missing_min(values,minutes):

    tmax=max(minutes)
    lastvalue=0
    fmin=minutes.copy()
    fvals=values.copy()

    for i in range(1,tmax):
        if i not in minutes:
            fmin.append(i)
            fvals.append(lastvalue)
        else:
            index=minutes.index(i)
            lastvalue=fvals[index]

    _, filled_values = zip(*sorted(zip(fmin, fvals)))

    return list(filled_values)
```

This function is needed for filling values in the minutes that are missing from the measurements. In this case, we decided to fill values copying from the last seen value, supposing that when we have no news from a certain metric it remains constant. This function takes as inputs the outputs of the previous one and goes through them looking for missing minutes. Subsequently, it fills the minutes and the value in the corresponding vectors and sorts them with respects to the minutes.

The process is slightly different for the metric of the engine itself because we need to create a new `anomaly` column as a flag of wether or not a certain record is part of an anomaly and then normalize and fill both the `value` and `anomaly` columns. With that said, we define some new functions, similar to the previous ones:

```python
def fill_missing_anomaly(anomalies, values, minutes):

    tmax=int(max(minutes))
    lastvalue=0
    fmin=minutes.copy()
    fvals=values.copy()
    fanom=anomalies.copy()

    for i in range(1,tmax):
        if i not in minutes:
            fmin.append(i)
            fvals.append(lastvalue)
            fanom.append(0)
        else:
            index=minutes.index(i)
            lastvalue=fvals[index]

    _, filled_values, filled_anomalies = zip(*sorted(zip(fmin, fvals, fanom)))

    return list(filled_values), list(filled_anomalies)
```

This one takes an anomaly, a value and a minutes column (in list form), checks for missing minutes and, for each, inserts a 0 value (in the anomalies) and copies the last number seen for the values.The difference in the filling of the anomalies is due to our decision to avoid supposing an anomaly during a missing data period. The function finally returns both the values and anomalies filled lists.

```python
def is_within_intervals(value, intervals):
    for start, end in intervals:
        if start <= value <= end:
            return 1
    return 0
```

```python
def get_norm_val_anom(df, hwid, metric, aggfunc='mean'):
    history=aux.aggregate_up_down(df,hwid,metric)

    history_new=[int(x[1]*60000) for x in history]

    selection=df[df['hwid']==hwid]
    selection=selection[selection['metric']==metric]

    initial_time=selection['when'].min().compute()

    a=[(initial_time+sum(history_new[:i]),initial_time+sum(history_new[:i+1])) for i,x in enumerate(history_new) if x < (60+1)*1000]

    selection['anomaly']=selection['when'].apply(lambda x: aux.is_within_intervals(x, a), meta=('when', 'bool'))

    agg_dict = {'value': aggfunc, 'anomaly': 'max'}

    selection['minute']=selection['when'].map(lambda x: int((x - 1601510422405) / 60000),meta=('when','int64'))
    
    g = selection.groupby(['hwid', 'metric', 'minute']).agg(agg_dict).compute().sort_values(by='minute')

    
    minutes=list(g.index.get_level_values('minute'))
    anomaly = list(g['anomaly'])
    value = list(g['value'])

    filled_val, filled_anom = aux.fill_missing_anomaly(anomaly, value, minutes)

    time = np.arange(0,len(filled_anom))

    return time, filled_anom, filled_val
```

This function takes as input a dataframe, an hardware id, a metric and an aggregation function. It first calls the function `aggregate_up_down()`, transforms the on-off periods into milliseconds and filters the dataset on the given metric and hardware. It then calculates the initial and final time (`initial_time`,`final_time`) of the anomalies periods (i.e. for periods of lenght < (60+1)x1000 ms) and creates a new column `anomaly` by applying the `is_within_intervals()` function, which returns `True` if a recors is inside of one of the intervals, `False` otherwise.

After that, it adds a `minute` column, groupes on it (together with hardware id and metric) and applies a max aggregation function over `anomaly` and the desired aggregation function over `values`. Finally the Dataframe is computed, the columns are transformed into lists and the `filling_missing_anomaly()` function is called. In the end, the function returns the 3 vectors `time`, `filled_anom` and `filled_val`.

To complete the analysis, for each hardware and motor metric, we normalize and fill the possibly-correlated metrics listed in the Introduction section, using the `get_norm_data()` and `fill_missing_min()` functions and we append them to a list `metric_list()`. We then normalize and fill the hardware-metric of interest with `get_norm_val_anom()` and `fill_missing_anom()` functions.

We call the following function with engine being the normalized and filled hardware-metric vector, `metric_list` being the just-defined list of filled vectors and `window` being the one-sided lenght of the slice to be taken around the anomaly. 


```python
def corr_finder(engine, metric_list, window):
    
    corr_coeff_pearson = []
    corr_coeff_spearman = []
    corr_coeff_kendall = []
    tot_corr_coeff = []

    for k in range(len(metric_list)):
        corr_coeff_pearson.append([])
        corr_coeff_spearman.append([])
        corr_coeff_kendall.append([])

    for metric in metric_list:
        dftot = pd.DataFrame({'values_engine': list(engine.iloc[:,1]), 'values_metric': list(metric.iloc[:,1])})
        corrs = [dftot.corr(method='pearson').iloc[0,1],dftot.corr(method='spearman').iloc[0,1],dftot.corr(method='kendall').iloc[0,1]]
        tot_corr_coeff.append(corrs)

    for i in range(len(engine['anomaly'])):
        if engine.iloc[i,1]:
            values_engine = list(engine.iloc[i-window:i+window+1,2])
            for j,metric in enumerate(metric_list):
                values_metric = list(metric.iloc[i-window:i+window+1,1])

                df = pd.DataFrame({'values_engine': values_engine, 'values_metric': values_metric})
                corr_coeff_pearson[j].append(df.corr(method='pearson').iloc[0, 1])
                corr_coeff_spearman[j].append(df.corr(method='spearman').iloc[0, 1])
                corr_coeff_kendall[j].append(df.corr(method='kendall').iloc[0, 1])
    return corr_coeff_pearson,corr_coeff_spearman,corr_coeff_kendall,tot_corr_coeff
```

As we can see this function defines 4 lists: 1 for each of the local correlations and 1 for the global ones. A first for loop checks for global correlations between `engine` and any of the metrics in `metric_list`. The second for loop spans the whole 'anomaly' column and, when it finds a 1 (presence of an anomaly), takes the neighbouring slice of `engine` and of every `metric` in `metric_list`, puts them in couples into dataframes and uses the `.corr()` function to calculate the three types of correlations. Finally the lists are returned for further analysis and visualization.

### Task 2

The first part of this task involves checking for signs of under/over-dimensioned machines, deteriorated parts or unsuitable external conditions by analyzing the metrics 'S125' and 'S181'. Later, we will try to find correlations between them and the external temperature metric, 'S41'.

To tackle the first objective we iterate through the hardwares (`hwids=['SW-065','SW-088','SW-106','SW-115']`) and the metrics (`metrics=['S125','S181']`) using the code below. For each combination, we normalize and fill the measurements with the appropriate functions and print the active and maximum capacity hours with respects to the total measurement hours.

```python
for hw in hwids:
    for metric in metrics:
        perc_val,perc_min=aux.get_norm_data(df,hw,metric)
        perc_fill=aux.fill_missing_min(perc_val,perc_min)

        total_hours=len(perc_fill)/60
        active_hours=len([x for x in perc_fill if x!=0])/60
        max_hours=len([x for x in perc_fill if x==100])/60

        print('############')
        print(f'Analyzing the active time of metric {metric} for machine {hw}')
        print(f'The circuit is active for a total of {active_hours} over {total_hours}\
             of measurements ({active_hours/total_hours*100}% of the measurement time).')
        print(f'The circuit runs at max capacity for {max_hours} hours ({max_hours/total_hours*100}% of the measurement time).')
```

For the second step we just need to iterate through the hardwares, normalize and fill the 'S125','S181' and 'S41' metrics. We then cut them to be of the same length (it might be the case that both metrics don't have their last measurement on the same minute), arrange them in Dataframes and calculate the 3 global correlations. 

```python
for hw in hwids:
    load1_val,load1_min=get_norm_data(df,hw,'S125')
    load1_fill=fill_missing_min(load1_val,load1_min)
    load2_val,load2_min=get_norm_data(df,hw,'S181')
    load2_fill=fill_missing_min(load2_val,load2_min)
    temp_val,temp_min=get_norm_data(df,hw,'S41')
    temp_fill=fill_missing_min(temp_val,temp_min)
    temp_fill_min=np.arange(0,len(temp_fill))

    min1=min(len(temp_fill),len(load1_fill))
    min2=min(len(temp_fill),len(load2_fill))

    df1=pd.DataFrame({'S41':temp_fill[:min1],'S125':load1_fill[:min1]})
    df2=pd.DataFrame({'S41':temp_fill[:min2],'S181':load2_fill[:min2]})

    print(f"Full correlations between S125 and S41 of machine {hw}:\n")
    print(f"Pearson Correlation: {df1.corr(method='pearson').iloc[0,1]}\n")
    print(f"Spearman Correlation: {df1.corr(method='spearman').iloc[0,1]}\n")
    print(f"Kendall Correlation: {df1.corr(method='kendall').iloc[0,1]}\n")
    print(f"Full correlations between S181 and S41 of machine {hw}:\n")
    print(f"Pearson Correlation: {df2.corr(method='pearson').iloc[0,1]}\n")
    print(f"Spearman Correlation: {df2.corr(method='spearman').iloc[0,1]}\n")
    print(f"Kendall Correlation: {df2.corr(method='kendall').iloc[0,1]}\n")
```

### Task 3

The first step for this task is to convert the values of the 'A5'/'A9' metrics to extract the signals from the different parts of the 1/2 circuit, respectively. Furthermore we want to flag a record with 1 if it contains an error on one or more of the 6/7/8 positioned bits. To do this, we define the follwing functions:

```python
def check_fault(row,metric):
    return int((row[f'{metric}_6']==1) or (row[f'{metric}_7']==1) or (row[f'{metric}_8']==1))
```

```python
def separate_readings(df,hwid,metric):
    selection=df[df['hwid']==hwid]
    selection=selection[selection['metric']==metric]

    selection[f'{metric}_6']=selection['value'].apply(lambda value: int(((bin(value)[2:]).zfill(16))[-6]),meta=('value', 'int64'))

    selection[f'{metric}_7']=selection['value'].apply(lambda value: int(((bin(value)[2:]).zfill(16))[-7]),meta=('value', 'int64'))

    selection[f'{metric}_8']=selection['value'].apply(lambda value: int(((bin(value)[2:]).zfill(16))[-8]),meta=('value', 'int64'))

    selection['minute']=selection['when'].map(lambda x: int((x - 1601510422405) / 60000),meta=('when','int64'))

    agg_dict = {f'{metric}_6': 'max', f'{metric}_7': 'max', f'{metric}_8': 'max'}
    
    g = selection.groupby(['hwid', 'metric', 'minute']).agg(agg_dict).compute().sort_values(by='minute')

    minutes=list(g.index.get_level_values('minute'))
    alarm6 = list(g[f'{metric}_6'])
    alarm7 = list(g[f'{metric}_7'])
    alarm8 = list(g[f'{metric}_8'])

    tmax=int(max(minutes))
    fmin=minutes.copy()
    fvals6=alarm6.copy()
    fvals7=alarm7.copy()
    fvals8=alarm8.copy()

    for i in range(1,tmax):
        if i not in minutes:
            fmin.append(i)
            fvals6.append(0)
            fvals7.append(0)
            fvals8.append(0)

    _, filled_a6, filled_a7, filled_a8 = zip(*sorted(zip(fmin, fvals6, fvals7, fvals8)))

    time = np.arange(0,len(filled_a7))

    new_df = pd.DataFrame({'minute': time, f'{metric}_6': filled_a6, f'{metric}_7': filled_a7, f'{metric}_8': filled_a8})
    
    new_df['fault'] = new_df.apply(aux.check_fault,axis=1,args=(metric,))
    
    return new_df
```

The function `separate_readings()` takes as input a Dask Dataframe, an hardware id and a metic ('A5'/'A9'). It then filters the dataframe and adds 3 columns, 1 for each of the bits we are interested in (6,7,8) obtained by converting the `value` column into binary and extracting the corresponding digits. After that it creates the `minute` column and groups on it with a `max()` aggregation function on the newly created bit columns. The Dask Dataframe is then computed to get a Pandas one, the columns are extracted, filled with 0 when there are missing minutes (we suppose no errors when we don't have data) and put back together into a Dataframe. Finally, a new column `fault` is created calling a `check_fault()` function on the dataframe, which is basically an 'or' function between the bit columns.

Just like in Task 1, we want to find both general correlations between the error signals and other metrics in the data and local ones in the neighbourhood of the errors themselves. To reach the goal of *predictive maintenance*, however, it is not enough to find these instantaneous correlations and we should look for shifted ones. Here we consider a shifted correlation to be present if a slice of data around the error in the error column is correlated with an equal but shifted back slice in the other metric's column. To be able to check for these types of correlations, we slightly modify the `corr_finder()` function we used earlier:

```python
def corr_finder_shift(alarm, metric_list, window, shift):
    
    corr_coeff_pearson = []
    corr_coeff_spearman = []
    corr_coeff_kendall = []
    tot_corr_coeff = []

    for k in range(len(metric_list)):
        corr_coeff_pearson.append([])
        corr_coeff_spearman.append([])
        corr_coeff_kendall.append([])

    for metric in metric_list:
        max_length = min(len(alarm['minute']), len(metric['Minute']))
        dftot = pd.DataFrame({'values_alarm': list(alarm.iloc[:max_length,1]), 'values_metric': list(metric.iloc[:max_length,1])})
        corrs = [dftot.corr(method='pearson').iloc[0,1], dftot.corr(method='spearman').iloc[0,1], dftot.corr(method='kendall').iloc[0,1]]
        tot_corr_coeff.append(corrs)

    for i in range(len(alarm['minute'])):
        if alarm.iloc[i,1]:
            if i-window-shift<0 or i+window>len(alarm['minute']):
                for j in range(len(metric_list)):
                    corr_coeff_pearson[j].append(0)
                    corr_coeff_spearman[j].append(0)
                    corr_coeff_kendall[j].append(0)
            else:
                start_index = i-window
                end_index = i+window
                values_alarm = list(alarm.iloc[start_index:end_index+1,1])
                for j,metric in enumerate(metric_list):
                    start_shifted = start_index-shift
                    end_shifted = end_index-shift
                    
                    values_metric = list(metric.iloc[start_shifted:end_shifted+1,1])
        
                    df = pd.DataFrame({'values_alarm': values_alarm, 'values_metric': values_metric})
                    corr_coeff_pearson[j].append(df.corr(method='pearson').iloc[0, 1])
                    corr_coeff_spearman[j].append(df.corr(method='spearman').iloc[0, 1])
                    corr_coeff_kendall[j].append(df.corr(method='kendall').iloc[0, 1])
                    
    return tot_corr_coeff,corr_coeff_pearson,corr_coeff_spearman,corr_coeff_kendall
```

As we can see, before extracting the slices, we always check for the extremes, in order not to go out of the boundaries of the arrays, to avoid IndexOutOfBound Errors. 


## Analysis

### Choice of cluster parameters