# Random Forest Classification with Capacitive Sensing
### Created By Alexandra Gillespie
### Last updated 2/27/24


## Load imports

In [1]:
import numpy as np
import os
import pandas as pd
from IPython.display import display
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt 
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [22]:
# TODO: list the container materials and the contents that were recorded.
# File names are matched against these lists, so the spellings must agree
# with the "<content>_<container>" prefix of the data files.
containers = [
    "pp",
    "glass",
    "paper",
    "foam",
    "ceramic",
    "silicon",
    "wood",
    "pcg",
    "pet",
]
contents = [
    "water",
    "oil",
    "honey",
    "sugar",
    "starch",
    "vinegar",
    "oats",
    "rice",
    "lentils",
]
# TODO: set this to the folder that holds the individual dataset folders
current_dir = "/Users/alexandragillespie/Desktop/Clean Code"

## Data Preprocessing

In [33]:
# Functions for data preprocessing
def _replace_outliers(window, prev_val, threshold):
    '''
    Overwrite, in place, every reading of `window` that is above `threshold`.

    parameters
    window: 2-D array (observations x sensors) for one iteration; modified in place
    prev_val: the last accepted reading seen so far (0 means "none yet")
    threshold: readings strictly greater than this are treated as outliers

    returns
    prev_val: the last accepted reading after walking the whole window
    '''
    for row in window:
        last_index = len(row) - 1
        for m, value in enumerate(row):
            if value > threshold:
                if prev_val != 0:
                    row[m] = prev_val
                elif m < last_index:
                    # no accepted reading yet: borrow the next sensor's value
                    row[m] = row[m + 1]
                # otherwise there is nothing to borrow from; leave the value alone
            else:
                prev_val = value
    return prev_val


def _sensor_gradients(raw_windows):
    '''
    First-order gradient of every sensor over the observations of each iteration.

    parameters
    raw_windows: list with one (observations x sensors) array per iteration

    returns
    array with one row per iteration; the gradients of sensor 1 come first,
    then sensor 2, and so on
    '''
    stacked = np.stack(raw_windows)            # (iterations, observations, sensors)
    gradients = np.gradient(stacked, axis=1)   # differentiate along the observations
    # put the sensor axis before the observation axis so each sensor's
    # gradients are contiguous once flattened
    return gradients.transpose(0, 2, 1).reshape(len(raw_windows), -1)


def data_processing(directory, num_its, outlier_threshold=2000):
    '''
    Build the feature matrix and label vectors from one directory of pickled recordings.

    Each feature row is one iteration: the outlier-cleaned sensor readings laid
    end to end, followed by the per-sensor gradients of the raw readings.

    NOTE: files are loaded with pd.read_pickle, which can execute arbitrary
    code; only point this at data you recorded yourself.

    parameters
    directory: name of the directory housing the data (inside current_dir)
    num_its: the total number of iterations of each container/content combination
    outlier_threshold: readings above this value are replaced (default 2000)

    returns
    X_final: array of all sensor data
    y_content: the labels for contents
    y_container: the labels for containers
    y_overall: the label for the container/content combination
    ids_contents: the dictionary identifying the label for each content (labels start at 1)
    ids_container: the dictionary identifying the label for each container (labels start at 1)
    ids: the dictionary identifying the label for each content/container combination (labels start at 0)

    raises
    ValueError: if no file in the directory matches the configured contents/containers
    '''
    sensor_columns = [f'sensor {k}' for k in range(1, 11)]
    # the headers according to how the data was collected
    headers = ['container', 'content', 'labels', 'group', 'group_it', 'iteration', 'observation', 'python time', 'Teensy time sensor 1', 'Teensy time for sensor 2'] + sensor_columns

    # build absolute paths instead of calling os.chdir, so the caller's working
    # directory is left untouched
    dir_name = os.path.join(current_dir, directory)
    # os.listdir returns names in arbitrary order; sorting makes the label
    # numbering reproducible and identical for directories holding the same files
    read_files = sorted(os.listdir(dir_name))

    # houses which ID indicates which container/content combination
    ids = {}
    ids_container = {}
    ids_contents = {}

    # running label counters
    labels = 0
    labels_container = 0
    labels_content = 0

    # last accepted (non-outlier) reading; carried across rows, iterations and files
    prev_val = 0

    X_parts = []
    y_overall_parts = []
    y_container_parts = []
    y_content_parts = []

    # goes through each file in the directory
    for filename in read_files:
        # the file name starts with "<content>_<container>" (ex: honey_glass.pkl)
        id_name = filename.split('.')[0].split('_')[:2]
        if len(id_name) < 2:
            continue  # e.g. hidden files or names without an underscore
        content_name, container_name = id_name
        if content_name not in contents or container_name not in containers:
            continue

        unpickled = pd.read_pickle(os.path.join(dir_name, filename))

        # updates dictionaries to include the id and what it represents (ex: 'water\nglass': 0)
        ids['\n'.join(id_name)] = labels
        if container_name not in ids_container:
            labels_container += 1
            ids_container[container_name] = labels_container
        if content_name not in ids_contents:
            labels_content += 1
            ids_contents[content_name] = labels_content

        this_container_label = ids_container[container_name]
        this_content_label = ids_contents[content_name]

        # creates a dataframe, converts everything from the 'group' column on to numbers
        reading = pd.DataFrame(unpickled, columns=headers)
        reading[reading.columns[3:]] = reading[reading.columns[3:]].apply(pd.to_numeric)

        raw_windows = []
        cleaned_windows = []
        for i in range(num_its):
            in_iteration = reading['iteration'] == i
            raw_window = np.array(reading.loc[in_iteration, sensor_columns])
            # outliers are replaced on a copy; the gradients below use the raw readings
            cleaned_window = raw_window.copy()
            prev_val = _replace_outliers(cleaned_window, prev_val, outlier_threshold)
            raw_windows.append(raw_window)
            cleaned_windows.append(cleaned_window)

        # windowing: the sensor values of every observation lie end to end (flattened)
        cleaned = np.stack(cleaned_windows).reshape(num_its, -1)
        new_array = np.hstack((cleaned, _sensor_gradients(raw_windows)))

        n_rows = new_array.shape[0]
        X_parts.append(new_array)
        y_overall_parts.append(np.full((n_rows, 1), labels))
        y_container_parts.append(np.full((n_rows, 1), this_container_label))
        y_content_parts.append(np.full((n_rows, 1), this_content_label))
        labels += 1

    if not X_parts:
        raise ValueError(f"no data files matching the configured contents/containers were found in {dir_name}")

    X_final = np.vstack(X_parts)
    y_overall = np.vstack(y_overall_parts)
    y_container = np.vstack(y_container_parts)
    y_content = np.vstack(y_content_parts)

    return X_final, y_content, y_container, y_overall, ids_contents, ids_container, ids


## Create aggregate dataset

In [34]:
# if multiple datasets were used to create an aggregate dataset, use this function to combine them
def create_aggregate_dataset(datasets, iterations):
    '''
    Run data_processing on every dataset and stack the results row-wise.

    parameters
    datasets: the names of the datasets you intend to combine
    iterations: the number of iterations used for each dataset

    returns
    X_data: total dataset
    y_content: total list of contents labels
    y_container: total list of container labels
    y_total: total list of total labels
    ids_contents: the dictionary identifying the label for each content
    ids_container: the dictionary identifying the label for each container
    ids: the dictionary identifying the label for each content/container combination

    raises
    ValueError: if datasets is empty, or if two datasets do not share the same
        label dictionaries (stacking them would mix up the classes)
    '''
    X_parts = []
    content_parts = []
    container_parts = []
    total_parts = []
    reference_ids = None

    for dataset in datasets:
        (X_raw, y_content_individual, y_container_individual, y_overall_individual,
         *dataset_ids) = data_processing(dataset, iterations)

        # the numeric labels only mean the same thing across datasets when
        # every dataset produced identical id dictionaries
        if reference_ids is None:
            reference_ids = dataset_ids
        elif dataset_ids != reference_ids:
            raise ValueError(f"dataset '{dataset}' uses a different label mapping than '{datasets[0]}'; the datasets cannot be combined")

        X_parts.append(X_raw)
        content_parts.append(y_content_individual)
        container_parts.append(y_container_individual)
        total_parts.append(y_overall_individual)

    if reference_ids is None:
        raise ValueError("create_aggregate_dataset needs at least one dataset name")

    ids_contents, ids_container, ids = reference_ids
    X_data = np.vstack(X_parts)
    y_content = np.vstack(content_parts)
    y_container = np.vstack(container_parts)
    y_total = np.vstack(total_parts)
    return X_data, y_content, y_container, y_total, ids_contents, ids_container, ids

## Run dataset preprocessing

In [35]:
# number of iterations recorded per container/content combination
num_its = 10
# the datasets that are merged into the training set
datasets = ['data_collection_1', 'data_collection_3', 'data_collection_4']

training_bundle = create_aggregate_dataset(datasets, num_its)
X_training, y_content_training, y_container_training, y_overall_training = training_bundle[:4]
ids_contents, ids_container, ids = training_bundle[4:]
print("Training data shape: ", X_training.shape)

# the remaining dataset is kept aside as the outside test set; its id
# dictionaries are not needed
testing_bundle = data_processing('data_collection_2', num_its)
X_testing, y_content_testing, y_container_testing, y_overall_testing = testing_bundle[:4]
print("Testing data shape: ", X_testing.shape)

Training data shape:  (2430, 4000)
Testing data shape:  (810, 4000)


## Model

In [36]:
# define train test split
n_splits = 10 # number of folds (each fold is 90/10 for train/validation)
# note: with random_state=None the shuffling differs on every run, which may
# lead to slightly different results for the same model architecture
skf = StratifiedKFold(n_splits=n_splits, random_state=10, shuffle=True)

# this doesn't require any edits on your part
all_accuracies = []
all_outside_accuracies = []
for i, (train_index, test_index) in enumerate(skf.split(X_training, y_overall_training.ravel())):

    print(f"Fold {i}:")
    print(f"  Train: index={train_index}")
    print(f"  Test:  index={test_index}")

    X_train, X_test = X_training[train_index], X_training[test_index]
    y_train, y_test = y_overall_training[train_index], y_overall_training[test_index]

    # scale X_train and X_test (the scaler is fitted on the training fold only)
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # train model
    model = RandomForestClassifier(random_state=10)
    model.fit(X_train, y_train.ravel())

    # evaluate model on the validation fold and generate accuracy score for held out data
    accuracy = model.score(X_test, y_test.ravel())
    print(f"  Accuracy: {accuracy}")
    print()
    all_accuracies.append(accuracy)

    # outside accuracy: the separate test dataset, scaled with this fold's scaler
    X_test2 = scaler.transform(X_testing)
    y_test2 = y_overall_testing

    # evaluate model and generate accuracy score for outside test data
    accuracy = model.score(X_test2, y_test2.ravel())
    all_outside_accuracies.append(accuracy)
    print(f"Outside test accuracy: {accuracy}")
    print()


# print out average accuracy across all folds
print(f"Average Test Accuracy: {np.mean(all_accuracies)}")
print(f"Average Outside Accuracy: {np.mean(all_outside_accuracies)}")

# THIS DOES THE SAME AS ABOVE BUT TREATS ALL OF THE TRAIN DATASET AS TRAIN AND ONLY THE TEST DATASET AS TEST
# train final model using all of the training data, with its own scaler
# rather than the one left over from the last fold
scaler = StandardScaler()
X_train = scaler.fit_transform(X_training)
X_test = scaler.transform(X_testing)

# y_test keeps its column shape because later cells index it row by row
y_train = y_overall_training
y_test = y_overall_testing

model = RandomForestClassifier(random_state=10)
model.fit(X_train, y_train.ravel())
preds = model.predict(X_test)

# evaluate model on test set and report the accuracy score
accuracy = model.score(X_test, y_test.ravel())
print(f"Final model outside test accuracy: {accuracy}")

Fold 0:
  Train: index=[   0    1    2 ... 2427 2428 2429]
  Test:  index=[   3    9   29   51   56   84   89   91   98  101  117  121  142  156
  161  175  176  184  196  216  231  244  268  284  301  305  315  319
  330  332  345  357  369  373  375  403  426  429  431  435  447  472
  479  485  494  499  502  507  520  532  539  548  565  581  594  622
  632  643  654  670  678  705  707  717  718  722  751  759  772  780
  802  803  824  825  835  843  844  850  859  864  871  884  885  904
  924  929  930  941  946  951  993 1008 1015 1018 1024 1031 1033 1062
 1078 1081 1087 1092 1103 1109 1124 1136 1139 1140 1166 1178 1180 1204
 1218 1220 1225 1262 1263 1268 1270 1271 1285 1292 1298 1306 1347 1351
 1360 1367 1374 1384 1385 1414 1421 1434 1446 1449 1458 1471 1473 1497
 1499 1531 1547 1557 1579 1585 1598 1601 1606 1617 1626 1633 1643 1655
 1666 1682 1683 1697 1703 1721 1724 1747 1753 1767 1770 1777 1782 1783
 1798 1802 1818 1821 1830 1842 1850 1857 1860 1864 1872 1873 1882 1894
 19

## Confusion Matrix Generation

In [37]:
# Confusion matrix over the combined content/container labels, wrapped in a
# DataFrame whose first column and header row carry a readable class name.
# NOTE(review): this assumes the keys of `ids` are in the same order as the
# matrix rows - confirm.
cm = confusion_matrix(y_test, preds)

ids_list = []
for key in ids:
    name_parts = key.split("\n")
    # "content\ncontainer" -> "content container"
    ids_list.append(f"{name_parts[0]} {name_parts[1]}")

# blank header for the name column, then one header per class
new_list = [" "] + ids_list

df_cm = pd.DataFrame(cm)
df_cm.insert(0, " ", ids_list, allow_duplicates=True)
df_cm.columns = new_list

In [41]:
# Split every combined label (predicted and true) back into its container id
# and its content id, ready for the per-container / per-content matrices.
ids_by_num = {v: k for k, v in ids.items()}

# combined label -> container id / content id; keys look like "content\ncontainer"
container_id_of = {
    label: ids_container[name.split("\n")[1]] for label, name in ids_by_num.items()
}
content_id_of = {
    label: ids_contents[name.split("\n")[0]] for label, name in ids_by_num.items()
}

# predictions are stored as flat lists of ids
container_preds = [container_id_of[pred] for pred in preds]
content_preds = [content_id_of[pred] for pred in preds]

# true labels are stored as one-element lists, mirroring the column shape of y_test
container_ys = [[container_id_of[ys[0]]] for ys in y_test]
content_ys = [[content_id_of[ys[0]]] for ys in y_test]
    

In [42]:
# create the confusion matrices with aesthetically appealing characteristics
def produce_cm(scaled_cm, title, color_bar, labels=None):
    '''
    Draw an annotated Plotly heatmap of a confusion matrix and show it.

    parameters
    scaled_cm: 2-D numpy array to plot; its rows are drawn against the tick
        labels in reverse order, so pass the matrix with its rows already reversed
    title: figure title; when no labels are given, a title containing
        'Container' selects the container names, anything else the content names
    color_bar: whether to draw the color bar (this also selects the figure size)
    labels: optional tick labels, in the column order of scaled_cm

    returns
    nothing; the figure is displayed with fig.show()
    '''
    if labels is None:
        if 'Container' in title:
            labels = ['PP', 'Glass', 'Paper', 'Foam', 'Ceramic', 'Silicon', 'Wood', 'PC', 'PET']
        else:
            labels = ['Water', 'Oil', 'Honey', 'Sugar', 'Starch', 'Vinegar', 'Oats', 'Rice', 'Lentils']
    else:
        labels = list(labels)
    inverse_labels = labels[::-1]

    # the two variants differ only in the color bar and the figure geometry
    if color_bar:
        heatmap_options = dict(colorbar=dict())
        width, height, right_margin = 600, 560, 0
    else:
        heatmap_options = dict(showscale=False)
        width, height, right_margin = 590, 600, 10

    # Create a Plotly heatmap for the confusion matrix
    fig = go.Figure(data=go.Heatmap(
        z=scaled_cm,
        x=labels,
        y=inverse_labels,
        colorscale='tempo',
        **heatmap_options
    ))

    # Add the value to each cell; white text on cells above half the maximum, black otherwise
    half_max = scaled_cm.max() / 2
    n_rows, n_cols = scaled_cm.shape
    for i in range(n_rows):
        for j in range(n_cols):
            fig.add_annotation(
                dict(
                    x=labels[j],
                    y=inverse_labels[i],
                    text=str(int(scaled_cm[i, j])),
                    showarrow=False,
                    font=dict(color='white' if scaled_cm[i, j] > half_max else 'black')
                )
            )

    # Update layout for better visibility
    fig.update_layout(
        title=dict(
            # bold title; the tag is closed properly
            text="<b>"+title+"</b>",
            font=dict(size=25, color='black', family='Arial, sans-serif')),
        title_x=0.5,
        xaxis_title='Predicted Label',
        yaxis_title='True Label',
        xaxis=dict(tickmode='array', tickfont=dict(size=16, color='black'), title=dict(font=dict(size=18, color='black'))),
        yaxis=dict(tickmode='array', tickfont=dict(size=16, color='black'), title=dict(font=dict(size=18, color='black'))),
        width=width,
        height=height,
        margin=dict(l=0, r=right_margin, b=0, t=50)
    )

    # Show the plot
    fig.show()
    

In [43]:
# confusion_matrix orders its rows/columns by sorted label id, while produce_cm
# writes its tick labels in the order of the `containers` / `contents` lists.
# Passing the ids in list order keeps every row under the right name.
container_label_order = [ids_container[name] for name in containers if name in ids_container]
cm_container = confusion_matrix(container_ys, container_preds, labels=container_label_order)
# NOTE(review): presumably turns counts into percentages, which only holds when
# every class has len(containers)*10 test samples - confirm
scaled_container_cm = np.ceil(cm_container/len(containers)*10)
# rows are reversed because produce_cm lists the true labels in reverse order
produce_cm(scaled_container_cm[::-1], 'Container Accuracies', False)

content_label_order = [ids_contents[name] for name in contents if name in ids_contents]
cm_content = confusion_matrix(content_ys, content_preds, labels=content_label_order)
scaled_content_cm = np.ceil(cm_content/len(contents)*10)
produce_cm(scaled_content_cm[::-1], 'Substance Accuracies', True)

## Deriving Container and Content accuracies 

In [20]:
# share of outside-test samples whose container was predicted correctly
# (the true ids are stored as one-element lists, hence truth[0])
accuracy_total_container = sum(
    1 for pred, truth in zip(container_preds, container_ys) if pred == truth[0]
)
accuracy_containers = accuracy_total_container/len(container_preds)
print('container accuracy:', accuracy_containers)


# share of outside-test samples whose content was predicted correctly
accuracy_total_contents = sum(
    1 for pred, truth in zip(content_preds, content_ys) if pred == truth[0]
)
accuracy_contents = accuracy_total_contents/len(content_preds)
print('content accuracy:', accuracy_contents)

container accuracy: 0.9777777777777777
container accuracy: 0.8493827160493828
