In [97]:
import pandas as pd
import numpy as np
from scipy.fft import fft, rfft
from scipy.fft import fftfreq, rfftfreq
import os
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from imblearn.over_sampling import RandomOverSampler
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

### Data Preparation using Data Augmentation and Feature Extraction using Fast Fourier Transform

In [98]:
path = "./DAQ_Healthy/"
positions = ['Position-I/', 'Position-II/']
waveforms = ['Sine/', 'Square/', 'Triangle/']

df = {}
for i in range(0,32):
    df[i]=[]
df['Healthy']=[]

for position in positions:
    for waveform in waveforms:
            curpath = path + position + waveform
            for i in range(9,31):
                data = pd.read_csv(curpath+str(i)+'.csv')

                seq1 = data['Amplitude - Voltage_0']
                seq2 = np.array(seq1.rolling(
                    window=2,
                    center=True,
                    min_periods=1
                ).mean())
                seq1 = np.array(seq1)

                fourier1 = fft(seq1)
                fourier1 = np.abs(fourier1[0:32])/(len(seq1)/2)
                fourier2 = fft(seq2)
                fourier2 = np.abs(fourier2[0:32])/(len(seq2)/2)
                for j in range(0,32):
                    df[j].append(fourier1[j])
                    df[j].append(fourier2[j])

                df['Healthy'].append(1)
                df['Healthy'].append(1)

path = "./DAQ_Unhealthy"
positions = ['_1/', '_2/']
for position in positions:
    for waveform in waveforms:
            curpath = path + position + waveform
            for i in range(9,31):
                data = pd.read_csv(curpath+str(i)+'.csv')

                seq1 = data['Amplitude - Voltage_0']
                seq2 = np.array(seq1.rolling(
                    window=2,
                    center=True,
                    min_periods=1
                ).mean())
                seq1 = np.array(seq1)

                fourier1 = fft(seq1)
                fourier1 = np.abs(fourier1[0:32])/(len(seq1)/2)
                fourier2 = fft(seq2)
                fourier2 = np.abs(fourier2[0:32])/(len(seq2)/2)
                for j in range(0,32):
                    df[j].append(fourier1[j])
                    df[j].append(fourier2[j])

                df['Healthy'].append(0)
                df['Healthy'].append(0)

data = pd.DataFrame(df)
data = data.sample(frac=1).reset_index(drop=True)

In [99]:
data.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,23,24,25,26,27,28,29,30,31,Healthy
0,0.001022,5.2e-05,0.000117,9.7e-05,4.9e-05,0.00014,0.000117,0.000159,0.004758,0.000161,...,0.000317,0.007833,0.000212,0.000462,0.000828,0.000317,0.000161,0.000595,0.000234,1
1,0.000734,1.3e-05,1.9e-05,4.7e-05,0.002339,3.7e-05,2.7e-05,6.8e-05,0.002237,0.000113,...,9.7e-05,0.001866,0.000173,0.000164,0.000176,0.001736,0.000138,0.000406,0.000258,1
2,0.000676,0.000406,0.000451,0.000444,0.000476,0.000454,0.000759,0.00082,0.000837,0.001115,...,0.002484,0.000772,0.000534,0.000498,0.000199,0.001644,0.000629,0.000114,0.000462,0
3,0.000232,0.000456,0.000472,0.000548,0.000615,0.000789,0.000782,0.001219,0.001804,0.004886,...,0.000264,0.00012,0.000326,8.4e-05,0.000633,0.000985,0.005332,0.000823,0.000144,1
4,0.000725,0.000317,0.000248,0.000402,0.000287,0.000352,0.000588,0.000857,0.00174,0.010308,...,0.000354,0.00048,0.000277,0.00036,0.00134,0.002413,0.000766,0.000545,0.000585,0


In [100]:
len(data)

528

### Model Training and Evaluation without Principal Component Analysis

In [101]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

X, y = data.drop(['Healthy'], axis=1), data['Healthy']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=True, random_state=42)

model = RandomForestClassifier()
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print("Accuracy:" + str(accuracy_score(y_test, y_pred)))
print("Precision:" + str(precision_score(y_test, y_pred, average='weighted')))
print("Recall:" + str(recall_score(y_test, y_pred, average='weighted')))
print("F1-score:" + str(f1_score(y_test, y_pred, average='weighted')))

Accuracy:0.9371069182389937
Precision:0.9443360080965807
Recall:0.9371069182389937
F1-score:0.936982407069844


### Principal Component Analysis (Dimensionality Reduction)

In [102]:
from statsmodels.multivariate.pca import PCA

X, y = data.drop(['Healthy'], axis=1), data['Healthy']
X = PCA(X, ncomp=16).factors
X.head()

Unnamed: 0,comp_00,comp_01,comp_02,comp_03,comp_04,comp_05,comp_06,comp_07,comp_08,comp_09,comp_10,comp_11,comp_12,comp_13,comp_14,comp_15
0,0.031243,0.036777,-0.029414,0.052928,0.112867,0.017441,0.054679,0.044393,-0.023308,-0.01262,0.064634,-0.007719,-0.000639,-0.072541,0.080297,0.005182
1,0.045597,0.006299,-0.020639,0.021709,0.0058,0.011826,0.013992,0.002219,-0.001943,0.019928,0.010811,0.012767,0.00137,-0.01919,0.003335,0.028832
2,0.022289,0.002733,0.023521,0.011693,-0.020384,0.043565,0.005756,0.00339,0.039342,0.037603,-0.004972,-0.000665,-0.008117,-0.00721,-0.000579,0.023349
3,0.008641,0.014013,-0.014798,-0.000555,-0.055916,0.021757,0.045885,-0.049336,-0.047768,0.007352,0.01745,-0.074573,-0.027047,0.080245,0.019219,0.030354
4,0.022047,0.020585,-0.037268,-0.008741,-0.039036,0.027243,0.018363,-0.028632,0.000762,0.012929,0.00076,0.016936,-0.011403,-0.025269,-0.038233,0.011565


### Model Training and Evaluation using Principal Component Analysis

In [103]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=True, random_state=42)

model = RandomForestClassifier()
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print("Accuracy:" + str(accuracy_score(y_test, y_pred)))
print("Precision:" + str(precision_score(y_test, y_pred, average='weighted')))
print("Recall:" + str(recall_score(y_test, y_pred, average='weighted')))
print("F1-score:" + str(f1_score(y_test, y_pred, average='weighted')))

Accuracy:0.89937106918239
Precision:0.9002286110426548
Recall:0.89937106918239
F1-score:0.8992273135669361


### Feature Selection

In [104]:
class FeatureSelection:
    def __init__(self, data, clabel, oversample) -> None:
        X, y = data.drop(clabel, axis=1), data[clabel]
        self.clabel = clabel
        features = {_ for _ in data.columns}
        features.remove(clabel)
        self.features = features

        if oversample:
            over = RandomOverSampler()
            X, y = over.fit_resample(X,y)
        
        scaler = StandardScaler()
        X = pd.DataFrame(scaler.fit_transform(X), columns=list(features))

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X,y, test_size=0.2, shuffle=True, random_state=42)
        
    def wrapper(self, feature_subset):
        model = RandomForestClassifier(max_depth=16)
        model.fit(self.X_train[list(feature_subset)],self.y_train)
        y_pred=model.predict(self.X_test[list(feature_subset)])
        return accuracy_score(self.y_test,y_pred)
    
    def sfs(self, count, keep=set()):
        feature_subset = keep.copy()
        feature_set = self.features.copy() - keep
        
        if len(feature_set)==0:
            return (feature_subset, self.wrapper(feature_subset))
        
        accuracy = 0
        for _ in range(0,count):
            best_attribute = ""
            best_report = 0
            for attribute in feature_set:
                feature_subset.add(attribute)
                report = self.wrapper(feature_subset)*100
                if report > best_report:
                    best_attribute=attribute
                    best_report=report
                feature_subset.remove(attribute)
            
            accuracy = best_report
            feature_subset.add(best_attribute)
            feature_set.remove(best_attribute)

        return (feature_subset, accuracy)
    
    def sbr(self, count, fromset=set()):
        if len(fromset)==1:
            return (set(), 0)
        
        feature_set = self.features.copy() if len(fromset)==0 else fromset
        feature_subset = feature_set.copy()
        for _ in range(0,len(feature_set)-count):
            worst_attribute = ""
            best_report = 0
            for attribute in feature_subset:
                feature_subset.remove(attribute)
                report = self.wrapper(feature_subset)*100
                if report > best_report:
                    worst_attribute=attribute
                    best_report=report
                feature_subset.add(attribute)

            feature_subset.remove(worst_attribute)

        accuracy = self.wrapper(feature_subset)*100
        return (feature_subset, accuracy)
    
    def lrs(self, count, l, r):
        feature_set = self.features.copy()
        if l>r:
            feature_subset=set()
            while len(feature_subset)<count:
                feature_subset = self.sfs(l, keep=feature_subset)[0]
                feature_subset = self.sbr(len(feature_subset)-r, fromset=feature_subset)[0]
                feature_set=feature_set-feature_subset
            
            accuracy = self.wrapper(feature_subset)*100
            return (feature_subset, accuracy)
        else:
            feature_subset=feature_set.copy()
            while len(feature_subset)>count:
                temp_subset = self.sbr(len(feature_subset)-r)[0]
                feature_set = feature_set | (feature_subset-temp_subset)
                feature_subset = temp_subset
                temp_subset = self.sfs(l, feature_subset)[0]
                feature_set = feature_set - (temp_subset-feature_subset)
                feature_subset = temp_subset

            accuracy = self.wrapper(feature_subset)*100
            return (feature_subset, accuracy)
        
    def sffs(self, count):
        feature_subset = set()
        while len(feature_subset)<count:
            feature_subset, accuracy = self.sfs(1, keep=feature_subset)
            prev_acc = 0
            while accuracy > prev_acc:
                prev_acc = accuracy
                temp_subset, temp_acc = self.sbr(len(feature_subset)-1, fromset=feature_subset)
                if temp_acc>accuracy:
                    accuracy = temp_acc
                    feature_subset = temp_subset
            
        return (feature_subset, self.wrapper(feature_subset)*100)

    def sfbr(self, count):
        feature_subset = self.features.copy()
        while len(feature_subset)>count:
            feature_subset, accuracy = self.sbr(len(feature_subset)-1, fromset=feature_subset)
            prev_acc = 0
            while accuracy > prev_acc:
                prev_acc = accuracy
                temp_subset, temp_acc = self.sfs(1, keep=feature_subset)
                if temp_acc>accuracy:
                    accuracy = temp_acc
                    feature_subset = temp_subset
        
        return (feature_subset, self.wrapper(feature_subset)*100)

In [105]:
fs = FeatureSelection(data, "Healthy", False)
feature_subset = fs.sffs(3)[0]

In [106]:
feature_subset

{8, 12, 20}

### Model Training and Evaluation with Feature Selection

In [107]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

X, y = data.drop(['Healthy'], axis=1), data['Healthy']
X_train, X_test, y_train, y_test = train_test_split(X[[num for num in feature_subset]], y, test_size=0.3, shuffle=True, random_state=42)

model = RandomForestClassifier()
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print("Accuracy:" + str(accuracy_score(y_test, y_pred)))
print("Precision:" + str(precision_score(y_test, y_pred, average='weighted')))
print("Recall:" + str(recall_score(y_test, y_pred, average='weighted')))
print("F1-score:" + str(f1_score(y_test, y_pred, average='weighted')))

Accuracy:0.9245283018867925
Precision:0.9254900310758745
Recall:0.9245283018867925
F1-score:0.9244204851752021


### Visualizing the Healthy and Unhealthy Waveforms using K-Means

In [108]:
from sklearn.cluster import KMeans
import plotly.express as px
import plotly.graph_objects as go

In [109]:
kmeans_model = KMeans(n_clusters=2, init='k-means++',  max_iter=500, random_state=42, n_init=10)
kmeans_model.fit_predict(data[[num for num in feature_subset]])
cluster_centers = kmeans_model.cluster_centers_
pdata = np.expm1(cluster_centers)
points = np.append(pdata, cluster_centers, axis=1)
points = np.append(points, [[0],[1]], axis=1)
data["cluster"] = kmeans_model.labels_

In [110]:
if 2*sum(data['Healthy']==data['cluster'].astype(int)) < len(data):
    data["cluster"] = [int(not num) for num in list(data['cluster'])]

In [111]:
round(sum(data['Healthy']==data['cluster'].astype(int))/len(data)*100)

70

#### Thus 70% of the classification can be explained by the 8th, 12th and 20th harmonics

### Clustering based on the Cluster Labels

In [113]:
cluster0 = go.Scatter3d(
    x=data[data['cluster']==0][8],
    y=data[data['cluster']==0][12],
    z=data[data['cluster']==0][20],
    mode='markers',
    marker=dict(
        color='red',  
        size=10,
        symbol='circle'
    ),
    name='Unhealthy'  
)

cluster1 = go.Scatter3d(
    x=data[data['cluster']==1][8],
    y=data[data['cluster']==1][12],
    z=data[data['cluster']==1][20],
    mode='markers',
    marker=dict(
        color='green',  
        size=10,
        symbol='circle'
    ),
    name='Healthy'  
)

fig = go.Figure()
fig.add_trace(cluster0)
fig.add_trace(cluster1)

fig.update_layout(
    title='Healthy vs Unhealthy',
    scene=dict(
        xaxis=dict(title='8'),
        yaxis=dict(title='12'),
        zaxis=dict(title='20')
    ),
    showlegend=True,  
    width=800, 
    height=600
)

fig.show()

### Clustering based on the Class Labels

In [114]:
cluster0 = go.Scatter3d(
    x=data[data['Healthy']==0][8],
    y=data[data['Healthy']==0][12],
    z=data[data['Healthy']==0][20],
    mode='markers',
    marker=dict(
        color='red',  
        size=10,
        symbol='circle'
    ),
    name='Unhealthy'  
)

cluster1 = go.Scatter3d(
    x=data[data['Healthy']==1][8],
    y=data[data['Healthy']==1][12],
    z=data[data['Healthy']==1][20],
    mode='markers',
    marker=dict(
        color='green',  
        size=10,
        symbol='circle'
    ),
    name='Healthy'  
)

fig = go.Figure()
fig.add_trace(cluster0)
fig.add_trace(cluster1)

fig.update_layout(
    title='Healthy vs Unhealthy',
    scene=dict(
        xaxis=dict(title='8'),
        yaxis=dict(title='12'),
        zaxis=dict(title='20')
    ),
    showlegend=True,  
    width=800, 
    height=600
)

fig.show()