In [4]:
# Importing the Libraries
import pickle
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

import scipy.io.wavfile as wavfile
import scipy.fftpack as fftpk
from matplotlib import pyplot as plt

import sounddevice as sd
import soundfile as sf
import librosa

from IPython.display import Image
import ipywidgets as widgets
from IPython.display import display

In [3]:
# Load the feature table (one row per sample) from the CSV in the working directory
data = pd.read_csv(filepath_or_buffer="training.csv")

# (Disabled) Scatter-plotting every pair of the first five columns against each other, one series per FailureID
# for i in range(5):
#     for j in range(5):
#         if i != j:
#             x1= data[data["FailureID"]==1.0].iloc[:,i]
#             y1= data[data["FailureID"]==1.0].iloc[:,j]
            
#             x2= data[data["FailureID"]==2.0].iloc[:,i]
#             y2= data[data["FailureID"]==2.0].iloc[:,j]
            
#             x3= data[data["FailureID"]==3.0].iloc[:,i]
#             y3= data[data["FailureID"]==3.0].iloc[:,j]
            
#             x4= data[data["FailureID"]==4.0].iloc[:,i]
#             y4= data[data["FailureID"]==4.0].iloc[:,j]
            
#             plt.scatter(x1,y1)
#             plt.scatter(x2,y2)
            
#             plt.scatter(x3,y3)
#             plt.scatter(x4,y4)
#             plt.show()

In [7]:
# Feature columns fed to the classifier, and the label column it learns to predict
predictors = [
    "RMS", "Mean",
    "MA1", "MA2", "MA3",
    "F1", "F2", "F3",
]
target = "FailureID"

def test_predict(train,test,predictors,target):
    """Fit a random forest on ``train`` and predict ``target`` for the rows of ``test``.

    A new model is fitted on every call; nothing is cached between calls.

    Parameters
    ----------
    train : pandas.DataFrame
        Labelled rows. Must contain every column named in ``predictors`` and
        the ``target`` column.
    test : pandas.DataFrame
        Rows to classify. When a DataFrame is given, only the ``predictors``
        columns are used, in that order, so extra or reordered columns cannot
        be misaligned with the training features.
    predictors : list of str
        Names of the feature columns.
    target : str
        Name of the label column in ``train``.

    Returns
    -------
    numpy.ndarray
        One predicted label per row of ``test``.
    """
    # Fixed random_state keeps the fitted forest reproducible between calls
    rf = RandomForestClassifier(min_samples_split=10,random_state=1)
    rf.fit(train[predictors],train[target])

    # Feed the model exactly the columns it was trained on, in the same order
    if isinstance(test, pd.DataFrame):
        test = test[predictors]
    return rf.predict(test)


# Row positions: every fifth row (0, 5, 10, ...) is held out for testing,
# all remaining rows are used for training
te = list(range(0, data.shape[0], 5))
tr = [pos for pos in range(data.shape[0]) if pos % 5 != 0]

# Splitting the data; the test frame keeps only the feature columns
train = data.iloc[tr]
test = data.iloc[te][predictors]

# Predictions for the held-out rows (kept in a list, then flattened into one array)
all_preds = [test_predict(train, test, predictors, target)]
preds = np.concatenate(all_preds)

# Share of held-out rows whose predicted label equals the actual label
accuracy = accuracy_score(data.iloc[te][target], preds)


In [5]:
# Predicted labels as a one-column frame, indexed like the held-out rows
pre = pd.DataFrame({"predicton": preds}, index=data.iloc[te].index)

# Actual labels of the held-out rows, shown side by side with the predictions
actual = data.iloc[te][target]
test_results = pd.concat([actual, pre], axis=1)
test_results

Unnamed: 0,FailureID,predicton
0,1.0,1.0
5,1.0,1.0
10,1.0,1.0
15,1.0,1.0
20,1.0,1.0
...,...,...
375,4.0,4.0
380,4.0,4.0
385,4.0,4.0
390,4.0,4.0


In [8]:
# Accuracy of the predictions on the held-out test rows (1.0 means every row was classified correctly)
accuracy

1.0

In [9]:
def predict():
    """Record five seconds of audio, extract its features and show the predicted machine state.

    Steps:
      1. Record 5 s of mono audio at 48 kHz and save it as ``test.wav``.
      2. Read the file back and compute the eight features
         RMS, Mean, MA1-MA3 (three largest spectrum amplitudes) and
         F1-F3 (the frequency-axis values at those three bins).
      3. Classify the feature row with ``test_predict`` using the global
         ``train``, ``predictors`` and ``target``.
      4. Print the machine state and display the matching image.

    Returns
    -------
    None
        The result is printed / displayed, not returned.
    """
    # Recording the audio
    samplerate = 48000
    duration = 5 # seconds
    audio = 'test.wav'
    # blocking=True makes rec() return only once the recording is complete,
    # so no separate sd.wait() call is needed.
    mydata = sd.rec(int(samplerate * duration), samplerate=samplerate, channels=1, blocking=True)
    sf.write(audio, mydata, samplerate)

    # Reading the audio back from disk
    s_rate, signal = wavfile.read(audio)

    # Time-domain features
    # NOTE(review): if wavfile.read returns integer samples (likely for a PCM
    # wav file), signal**2 is evaluated in that integer dtype and can wrap
    # around for loud samples. Left unchanged because the classifier was
    # presumably trained on features computed the same way - confirm, and fix
    # together with training.csv.
    mean = abs(signal).mean()
    rms = np.sqrt(abs(signal**2).mean())

    # Magnitude spectrum, first half of the FFT bins only
    spectrum = abs(fftpk.fft(signal))
    half = len(spectrum) // 2
    FFT = spectrum[:half]

    # NOTE(review): the frequency axis is generated for `half` points rather
    # than for the full FFT length, so bin k maps to 2*k*s_rate/N (negative in
    # the upper half of the bins) instead of k*s_rate/N. Values are kept
    # exactly as before to stay consistent with the F1-F3 columns the model
    # was presumably trained on - confirm before correcting.
    freqs = fftpk.fftfreq(half, 1.0 / s_rate)

    # Bins of the three largest amplitudes, largest first. Taking indices from
    # argsort yields exactly three bins even when amplitudes are tied (e.g. a
    # silent recording), where an equality lookup would return too many.
    top_bins = np.argsort(FFT, kind="stable")[::-1][:3]
    ma1, ma2, ma3 = FFT[top_bins]
    f1, f2, f3 = freqs[top_bins]

    # Storing the features in a list, in the column order given below
    features = [rms,mean,ma1,ma2,ma3,f1,f2,f3]

    # Creating a one-row dataframe with the training column names
    input_test = pd.DataFrame(
        [features],
        columns=["RMS","Mean","MA1","MA2","MA3","F1","F2","F3"],
    )

    # Making the prediction; the result holds exactly one label
    new_preds = test_predict(train,input_test,predictors,target)
    label = new_preds[0]

    if label == 1:
        print("\033[1mMachine ID: Off condition with noice\033[0m")
        display(Image(filename="off.png", width=200))
    elif label == 2:
        print("\033[1mMachine ID: Healthy condition\033[0m")
        display(Image(filename="good_.png", width=200))
    elif label == 3:
        print("\033[1mMachine ID: Bearing Fault\033[0m")
        display(Image(filename="fault1_.png", width=200))
    else:
        # Any other label is reported as a fan fault
        print("\033[1mMachine ID: Fan Fault\033[0m")
        display(Image(filename="fault2.png", width=200))

In [10]:
import threading
import time

# Creating the record button
record_button = widgets.Button(description='Record',disabled=False,button_style='success',tooltip='Record',icon='microphone')

# Creating the stop button
stop_button = widgets.Button(description='Stop',disabled=False,button_style='warning',tooltip='Stop',icon='stop')

# Creating the output widget
output = widgets.Output()

# Flag polled by the recording loop: Stop clears it, Record sets it again
is_rec = True

# Background thread that runs the recording loop (None until Record is clicked)
_recording_thread = None


def _recording_loop():
    """Record, classify and show the result repeatedly until ``is_rec`` is false."""
    while is_rec:
        # NOTE(review): output produced from a background thread is expected to
        # be captured by the widget here, but capture behaviour in threads
        # depends on the ipykernel/ipywidgets versions - confirm in the target
        # environment.
        with output:
            output.clear_output()
            predict()
            # Keep the result visible for a while before the next recording
            time.sleep(5)


def start_recording(data):
    """Click handler of the Record button: start the recording loop in the background.

    The loop runs in a daemon thread so that the kernel stays free to process
    the Stop click; running it inside this callback would block every later
    button event. ``data`` is the argument passed by ``on_click`` and is unused.
    """
    global is_rec, _recording_thread
    # Ignore the click while a loop is still running (or finishing its last pass)
    if _recording_thread is not None and _recording_thread.is_alive():
        return
    is_rec = True
    _recording_thread = threading.Thread(target=_recording_loop, daemon=True)
    _recording_thread.start()


def stop_recording(data):
    """Click handler of the Stop button: ask the recording loop to finish.

    The loop checks the flag once per pass, so it stops after the pass that is
    currently in progress. ``data`` is the argument passed by ``on_click`` and
    is unused.
    """
    global is_rec
    is_rec = False


# Recording the audio by clicking the record button
def record():
    """Attach ``start_recording`` to the Record button."""
    record_button.on_click(start_recording)

# Stopping the audio by clicking the stop button
def stop():
    """Attach ``stop_recording`` to the Stop button."""
    stop_button.on_click(stop_recording)


# Registering a click handler returns immediately, so no threads are needed here
record()
stop()

# Displaying the output and buttons

display(record_button, stop_button ,output)

Button(button_style='success', description='Record', icon='microphone', style=ButtonStyle(), tooltip='Record')



Output()