In [1]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import os
import glob
from sklearn import metrics
pd.set_option('display.max_columns', None)

In [2]:
# Load the preprocessed data set from CSV.
# NOTE(review): this is an absolute, machine-specific path; the FileNotFoundError recorded
# as this cell's output shows the cell fails wherever that directory does not exist.
# Consider a configurable data directory instead.
file_path = 'C:/Users/svenj/Documents/Uni/Bachelorarbeit/Codes/data_all_reset.csv'
data_all_reset = pd.read_csv(file_path)
# Render the loaded frame as the cell output.
data_all_reset

FileNotFoundError: [Errno 2] No such file or directory: 'C:/Users/svenj/Documents/Uni/Bachelorarbeit/Codes/data_all_reset.csv'

## Remove unnecessary columns
For an explanation of the parameters, see the FlowCam documentation.

In [None]:
# Columns that are removed before modelling (identifiers, colour/channel/scatter
# measurements, capture metadata and file references). Each label is listed exactly once;
# the original list contained 'Date' twice.
columns_to_drop = [
    'Particle ID',
    'Average Blue', 'Average Green', 'Average Red',
    'Calibration Factor', 'Calibration Image', 'Camera',
    'Capture X', 'Capture Y',
    'Ch1 Area', 'Ch1 Peak', 'Ch1 Width',
    'Ch2 Area', 'Ch2 Peak', 'Ch2 Width', 'Ch2/Ch1 Ratio',
    'Date',
    'Ratio Blue/Green', 'Ratio Red/Blue', 'Ratio Red/Green',
    'Elapsed Time', 'Filter Score', 'Image File', 'Original Reference ID',
    'Scatter Area', 'Scatter Peak', 'Scatter Width',
    'Source Image',
    'Sphere Complement', 'Sphere Count', 'Sphere Unknown', 'Sphere Volume',
    'Time', 'Timestamp', 'path',
]
# New frame without those columns; data_all_reset itself is left untouched because the
# dropped 'path' and 'Particle ID' columns are read from it again later in the notebook.
data_wrong= data_all_reset.drop(columns=columns_to_drop)

In [None]:
# Print the number of rows per class label (one count per line, in this fixed order)
# for the frame that still contains the 'WRONG' rows.
for label in ('ANCR', 'NELU', 'TRSP', 'ANLO', 'TEMA', 'LUCU'):
    rows_with_label = data_wrong[data_wrong.Class == label]
    print(len(rows_with_label))

In [None]:
# NOTE: only the last expression of a cell is rendered, so the unique() result on the
# next line is computed but not shown; the visible output is the data_wrong frame.
data_wrong.Class.unique()
data_wrong

In [None]:
# Drop all samples labelled 'WRONG'. Rows labelled 'DEBRIS' are NOT removed here:
# they are still counted in the per-class tally of `data` further down.
data = data_wrong.drop(data_wrong[data_wrong.Class == 'WRONG'].index, inplace=False)
# Render the filtered frame as the cell output.
data

In [None]:
# Print the number of rows per class label (one count per line, in this fixed order)
# after the 'WRONG' rows have been removed.
for label in ('ANCR', 'NELU', 'TRSP', 'ANLO', 'TEMA', 'LUCU', 'DEBRIS'):
    rows_with_label = data[data.Class == label]
    print(len(rows_with_label))

In [None]:
# List the distinct class labels present in the filtered frame.
data.Class.unique()

In [None]:
# Preview the first rows of the filtered frame.
data.head()

In [None]:
# Print pandas' summary of the filtered frame.
data.info()

## Drop missing data

In [None]:
# Count the missing values in each column.
data.isnull().sum()

#### -> no missing values

## Handle categorical data

In [None]:
# Print the dtype of every column to spot non-numeric (categorical) features.
print(data.dtypes)

#### no categorical data apart from class label

# Train-Test split 

In [None]:
# NOTE(review): `datasets` is not referenced anywhere in the visible notebook.
from sklearn import datasets
# Feature matrix: every remaining column except the target label 'Class'.
X = data.drop(columns=["Class"])

In [None]:
from sklearn.preprocessing import LabelEncoder
# Encode the string class labels as integers for the classifier.
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(data['Class'])
# Original class names as strings; later cells use position i in this array as the
# name belonging to encoded label i.
class_names = label_encoder.classes_.astype(str)

In [None]:
from sklearn.model_selection import train_test_split, cross_val_score

# Hold out 30% of the samples as test set. The split is stratified on y and uses a
# fixed random_state so it is the same on every run.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=1, stratify=y)


In [None]:
from collections import defaultdict

# Lookup table: encoded integer label -> original class name.
label_to_class = dict(enumerate(class_names))

train_class_counts = defaultdict(int)
test_class_counts = defaultdict(int)

# Tally the samples per class name, first for the training split, then for the test split.
for tally, encoded_labels in ((train_class_counts, y_train), (test_class_counts, y_test)):
    for encoded_label in encoded_labels:
        tally[label_to_class[encoded_label]] += 1

# Report both tallies under their headings, training split first.
for heading, tally in (
    ('Number of samples in training set:', train_class_counts),
    ('\nNumber of samples in test set:', test_class_counts),
):
    print(heading)
    for class_name, count in tally.items():
        print(f"{class_name}: {count}")

# Create, train and visualize the decision tree

In [None]:
from sklearn.tree import DecisionTreeClassifier
import matplotlib.pyplot as plt

# Baseline tree: entropy criterion, depth capped at 20, fixed seed.
tree = DecisionTreeClassifier(criterion='entropy', max_depth=20, random_state=1)

# Fit the baseline tree on the training set (all feature columns).
tree.fit(X_train, y_train)

# 7-fold cross-validation on the training set; the returned scores are the cell output.
cross_val_score(tree, X_train, y_train, cv=7)


In [None]:
# NOTE: the type() result is not the last expression of the cell, so it is not displayed.
type(X_test)
# Keep a reference to the full-feature test set: X_test is rebound to the reduced
# feature set further down, while the baseline `tree` is later evaluated on X_test_orig.
X_test_orig = X_test

In [None]:
# Evaluate the baseline tree on the held-out test set.
# The test set is predicted once and the result reused for both metrics, instead of
# running the tree a second time for the classification report.
baseline_test_pred = tree.predict(X_test)
print(metrics.accuracy_score(y_test, baseline_test_pred))
print(metrics.classification_report(y_test, baseline_test_pred))

## Optimization on test sets

### Load selected features

In [None]:
import pickle
# Load the previously saved feature selection.
# NOTE(review): pickle.load can execute arbitrary code from the file; only load pickles
# that were produced by a trusted source.
with open('features/features_IMbalanced_DEBRIS.pkl', 'rb') as f:
    selected_features = pickle.load(f)

In [None]:
# Keep only the selected feature columns in the training and test sets.
# presumably selected_features holds column labels of X_train — verify against the
# notebook that wrote the pickle file.
X_train_drop = X_train[selected_features]
X_test_drop = X_test[selected_features]

# Train a new tree, with the same hyperparameters as the baseline, on the reduced training set.
cut_clf = DecisionTreeClassifier(criterion='entropy', max_depth=20, random_state=1)
cut_clf.fit(X_train_drop, y_train)

# Accuracy of the reduced-feature tree on the test set.
print(metrics.accuracy_score(y_test, cut_clf.predict(X_test_drop)))

In [None]:
# Per-class precision/recall/F1 of the reduced-feature tree: training set first, then test set.
print(metrics.classification_report(y_train, cut_clf.predict(X_train_drop)))
print(metrics.classification_report(y_test, cut_clf.predict(X_test_drop)))

#### Check for overfitting

In [None]:
# Compare training and test scores of the reduced-feature tree to spot overfitting.
# Each split is predicted once and the predictions are reused for all three metrics
# (previously the tree was re-run for every single metric).
train_pred_cut = cut_clf.predict(X_train_drop)
test_pred_cut = cut_clf.predict(X_test_drop)

print('Accuracy on the training set:', metrics.accuracy_score(y_train, train_pred_cut))
print('Accuracy on the test set:', metrics.accuracy_score(y_test, test_pred_cut))

print('F1 Score on the training set (macro):', metrics.f1_score(y_train, train_pred_cut, average='macro'))
print('F1 Score on the test set (macro):', metrics.f1_score(y_test, test_pred_cut, average='macro'))

print('F1 Score on the training set (weighted):', metrics.f1_score(y_train, train_pred_cut, average='weighted'))
print('F1 Score on the test set (weighted):', metrics.f1_score(y_test, test_pred_cut, average='weighted'))


In [None]:
# From here on X_train / X_test refer to the reduced feature set and best_tree to the
# tree trained on it; the full-feature test set stays available as X_test_orig.
# NOTE(review): earlier cells that use X_train / X_test will see the reduced frames if
# they are re-run after this cell.
X_train = X_train_drop
X_test= X_test_drop
best_tree = cut_clf

### Load the trained tree from file

In [None]:
# NOTE(review): `dump` is not used anywhere in the visible notebook.
from joblib import dump, load
# Replace best_tree with the estimator stored on disk.
best_tree = load('./trees/tree_IMbalanced_DEBRIS.joblib')
# NOTE(review): fit() trains the loaded estimator again on the current (reduced-feature)
# training set, so any fitted state stored in the file is replaced. Confirm that only
# the stored hyperparameters are meant to be reused.
best_tree.fit(X_train, y_train)

#### Check for overfitting again

In [None]:
# Compare training and test scores of best_tree to spot overfitting.
# Each split is predicted once and the predictions are reused for all three metrics
# (previously the tree was re-run for every single metric).
train_pred_best = best_tree.predict(X_train)
test_pred_best = best_tree.predict(X_test)

print('Accuracy on the training set:', metrics.accuracy_score(y_train, train_pred_best))
print('Accuracy on the test set:', metrics.accuracy_score(y_test, test_pred_best))

print('F1 Score on the training set (macro):', metrics.f1_score(y_train, train_pred_best, average='macro'))
print('F1 Score on the test set (macro):', metrics.f1_score(y_test, test_pred_best, average='macro'))

print('F1 Score on the training set (weighted):', metrics.f1_score(y_train, train_pred_best, average='weighted'))
print('F1 Score on the test set (weighted):', metrics.f1_score(y_test, test_pred_best, average='weighted'))


# Visualisation

In [None]:
from sklearn.tree import plot_tree
import matplotlib.pyplot as plt

# Draw best_tree (fitted on the training data) and save the figure as a PNG.
# The canvas is very wide so that the nodes of a deep tree stay legible.
plt.figure(figsize=(100, 30))
plot_tree(
    best_tree,
    filled=True,
    # Plain lists instead of a pandas Index / NumPy array: some scikit-learn releases
    # validate these two arguments as lists and reject other array types.
    feature_names=list(X_train.columns),
    class_names=list(class_names),
    rounded=True,
    fontsize=7,
)
# Title spelling corrected ("Imalanced" -> "Imbalanced").
plt.title('Decision Tree "Imbalanced / Debris"', fontsize=30, pad=20)
plt.savefig('DecisionTree_IMbalanced_DEBRIS.png', dpi=300)
plt.show()

## Prediction on test set

In [None]:
# Predict the test set with best_tree. `predictions` is reused by the later cells
# (decoding, confusion matrices, result table), and also for the report printed here,
# so the tree is not run a second time.
predictions = best_tree.predict(X_test)
print(metrics.classification_report(y_test, predictions))

In [None]:
# Translate the encoded predictions back to the original class names and display them.
predictions_decoded = label_encoder.inverse_transform(predictions)
predictions_decoded

In [None]:
# Print how many test samples were assigned to each predicted class name.
unique_values, counts = np.unique(predictions_decoded, return_counts=True)
for value, count in zip(unique_values, counts):
    print(f"{value}: {count}")

In [None]:
from sklearn.metrics import classification_report

# Compare the untuned baseline tree with the tuned tree on the test data.
# The baseline tree was trained on all features and is therefore evaluated on
# X_test_orig; best_tree is evaluated on the reduced-feature X_test.
# Each report is built once; the tuned report is reused for both the printout and
# the text file, so both are guaranteed to show the same content.
baseline_report = metrics.classification_report(y_test, tree.predict(X_test_orig))
tuned_report = metrics.classification_report(y_test, best_tree.predict(X_test))

print('Classification report for the original tree:')
print()
print(baseline_report)
print('Classification report for the tuned tree:')
print()
print(tuned_report)

# Persist the tuned tree's report next to the notebook.
with open('classification_report_IMbalanced_DEBRIS.txt', 'w') as report_file:
    report_file.write(tuned_report)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix
# Font family used for every text element of this figure.
custom_font = "Century Schoolbook"  

# Confusion matrix of absolute counts (true labels vs. best_tree predictions on the test set).
conf_matrix = confusion_matrix(y_test, predictions)

# Number of test samples per true class; written next to the matrix as a 'Sum' column below.
row_sums = conf_matrix.sum(axis=1)

# Rebinds class_names to a plain list so that 'Sum' can be appended; the isinstance
# check keeps the cell re-runnable once class_names already is a list.
class_names = class_names.tolist() if isinstance(class_names, np.ndarray) else class_names
class_names_with_sum = class_names + ['Sum']

fig, ax = plt.subplots(figsize=(7, 6.2))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False, 
            annot_kws={"fontsize": 12, "fontfamily": custom_font}, ax=ax, linewidths=.5)

plt.title('Confusion Matrix "Imbalanced / Debris"', fontsize=12, pad=20, fontfamily=custom_font)
plt.xlabel('Predicted Label', labelpad=20, fontsize=12, fontfamily=custom_font)
plt.ylabel('True Label', labelpad=20, fontsize=12, fontfamily=custom_font)

# Ticks are placed at i + 0.5 to label the cell centres. The x axis gets one tick more
# than the matrix has columns; that extra tick carries the 'Sum' label.
plt.xticks(ticks=np.arange(len(class_names_with_sum)) + 0.5, labels=class_names_with_sum, fontsize=10, fontfamily=custom_font, rotation=0)
plt.yticks(ticks=np.arange(len(class_names)) + 0.5, labels=class_names, rotation=0, fontsize=10, fontfamily=custom_font)

# Write each row total at the x position of the extra 'Sum' tick, on the row's centre line.
for i, sum_value in enumerate(row_sums):
    ax.text(len(class_names) + 0.5, i + 0.5, sum_value, ha='center', va='center', fontsize=12, fontfamily=custom_font, weight='bold', color='black')

# Vertical rule between the last matrix column and the row totals.
plt.axvline(x=len(class_names), color='black', linewidth=2)

# Emphasise the 'Sum' tick label.
for tick in ax.get_xticklabels():
    if tick.get_text() == 'Sum':
        tick.set_weight('bold')

# Hide the tick marks on both axes; the labels stay.
ax.tick_params(axis='y', which='both', length=0) 
ax.tick_params(axis='x', which='both', length=0)
plt.tight_layout() 

plt.savefig('confusion_matrix_imbalanced_DEBRIS.png', dpi=300, bbox_inches='tight')
plt.show()


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Confusion matrix of absolute counts (true labels vs. best_tree predictions on the test set).
conf_matrix = confusion_matrix(y_test, predictions)

# Row normalisation: the row sums are turned into a column vector so that every row is
# divided by its own total, i.e. each row shows how the samples of one true class are
# distributed over the predicted classes.
conf_matrix_percent = conf_matrix.astype(float) / conf_matrix.sum(axis=1)[:, np.newaxis]

# NOTE(review): set_context is a global seaborn setting and is not reset in this cell.
sns.set_context('talk')

fig, ax = plt.subplots(figsize=(7,7))
sns.heatmap(conf_matrix_percent, annot=True, fmt='.2f', cmap='Blues', cbar=False, annot_kws={"fontsize": 10}, ax=ax)

plt.title('Confusion Matrix "Imbalanced / Debris": \n'
    'Proportion of True Labels Assigned to Predicted Labels', fontsize=12, pad=20)
plt.xlabel('Predicted Label', labelpad=20, fontsize=12)
plt.ylabel('True Label', labelpad=20, fontsize=12)

# Ticks are placed at i + 0.5 to label the cell centres.
plt.xticks(ticks=np.arange(len(class_names)) + 0.5, labels=class_names, fontsize=10)
plt.yticks(ticks=np.arange(len(class_names)) + 0.5, labels=class_names, rotation=0, fontsize=10)

# Hide the tick marks on both axes; the labels stay.
ax.tick_params(axis='y', which='both', length=0) 
ax.tick_params(axis='x', which='both', length=0)
plt.tight_layout() 

plt.savefig('confusion_matrix_proportions_imbalanced_DEBRIS.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import numpy as np

# Confusion matrix of absolute counts (true labels vs. best_tree predictions on the test set).
conf_matrix = confusion_matrix(y_test, predictions)

# Column normalisation: every column is divided by its own total, i.e. each column shows
# which true classes the samples predicted as one class actually belong to.
# NOTE(review): a class that is never predicted has a column sum of zero, which makes
# this a division by zero for that column — confirm this cannot occur for this data.
conf_matrix_column_norm = conf_matrix.astype(float) / conf_matrix.sum(axis=0)

# NOTE(review): set_context is a global seaborn setting and is not reset in this cell.
sns.set_context('talk')

fig, ax = plt.subplots(figsize=(7,7))
sns.heatmap(conf_matrix_column_norm, annot=True, fmt='.2f', cmap='Blues', cbar=False, annot_kws={"fontsize": 10}, ax=ax)

plt.title('Confusion Matrix "Imbalanced / Debris": \n'
    'Proportion of Predicted Labels Assigned to True Labels', fontsize=12, pad=20)
plt.xlabel('Predicted Label', labelpad=20, fontsize=12)
plt.ylabel('True Label', labelpad=20, fontsize=12)

# Ticks are placed at i + 0.5 to label the cell centres.
plt.xticks(ticks=np.arange(len(class_names)) + 0.5, labels=class_names, fontsize=10)
plt.yticks(ticks=np.arange(len(class_names)) + 0.5, labels=class_names, rotation=0, fontsize=10)

# Hide the tick marks on both axes; the labels stay.
ax.tick_params(axis='y', which='both', length=0)  
ax.tick_params(axis='x', which='both', length=0)
plt.tight_layout() 

plt.savefig('confusion_matrix_column_normalized_imbalanced_DEBRIS.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Translate the encoded predictions back to the original class names.
predictions_decoded = label_encoder.inverse_transform(predictions)

# Fetch the original rows of the test samples by index label: the unfiltered frame still
# has the 'path' and 'Particle ID' columns that were dropped from the feature frame.
data_results = data_all_reset.loc[X_test.index].copy() 

# Keep only the identifying columns and the true label.
data_results = data_results.rename(columns={'Class': 'True Label'})
data_results = data_results[['path','Particle ID','True Label']]

# Attach the predictions by position; they were produced from X_test, whose index was
# used to select the rows above, so the row order matches.
data_results.loc[:, 'Predictions'] = predictions_decoded

In [None]:
# Do not truncate long cell contents (e.g. the 'path' column) when frames are displayed.
pd.set_option('display.max_colwidth', None) 
# NOTE: not the last expression of the cell, so the frame is not rendered here.
data_results
# Write the result table without the index column.
# NOTE(review): absolute, machine-specific output path.
data_results.to_csv('C:/Users/svenj/Documents/Uni/Bachelorarbeit/Codes/tree_IMbalanced_DEBRIS_results.csv', index=False)

In [None]:
# NOTE(review): neither class_labels nor split_dfs is referenced anywhere else in the
# visible notebook, and 'ANLO' (counted in the class tallies above) is missing from this list.
class_labels = ['DEBRIS', 'ANCR', 'TEMA', 'NELU', 'LUCU', 'TRSP']

split_dfs = {}

In [None]:
import os
import shutil
import pandas as pd

# Load the result table (path, Particle ID, True Label, Predictions) written by the
# previous step.
# NOTE(review): all three locations below are absolute, machine-specific paths.
file_path = 'C:/Users/svenj/Documents/Uni/Bachelorarbeit/Codes/tree_IMbalanced_DEBRIS_results.csv'
data_results = pd.read_csv(file_path)

# NOTE(review): base_source_folder is not referenced anywhere else in the visible notebook.
base_source_folder = 'C:/Users/svenj/Documents/Uni/Bachelorarbeit/Codes/Daten'

# Root folder that receives the images of misclassified particles.
base_destination_folder = 'C:/Users/svenj/Documents/Uni/Bachelorarbeit/Codes/mismatched_IMbalanced_DEBRIS'

# Create the root folder if needed; exist_ok avoids the separate check-then-create step.
os.makedirs(base_destination_folder, exist_ok=True)

def generate_image_path(row):
    """Build the path of the particle image that belongs to one result row.

    The image is looked up in an ``images`` sub-folder next to the file named in
    ``row['path']`` and is called ``<stem>_<index>.png``, where ``<stem>`` is that
    file's name without extension and ``<index>`` is ``Particle ID - 1``, zero-padded
    to five digits.

    Parameters
    ----------
    row : mapping
        Must provide ``'path'`` (str; backslashes are converted to forward slashes)
        and ``'Particle ID'`` (integer-valued number).

    Returns
    -------
    str
        Path of the expected image file. The file is not checked for existence.

    Raises
    ------
    ValueError
        If ``'Particle ID'`` cannot be converted to an integer (e.g. NaN).
    """
    listing_path = row['path'].replace('\\', '/')
    # Coerce to int so that IDs read as floats (e.g. 12.0) still work with the
    # integer format spec below.
    image_index = int(row['Particle ID']) - 1

    folder, filename = os.path.split(listing_path)
    stem = os.path.splitext(filename)[0]

    return os.path.join(folder, 'images', f"{stem}_{image_index:05d}.png")

def copy_image(row, destination_folder):
    """Copy the image of a misclassified particle into a per-confusion sub-folder.

    The source image path is derived with ``generate_image_path(row)``. If that file
    does not exist, a "File not found" message is printed and nothing is copied. Rows
    whose ``'True Label'`` equals ``'Predictions'`` are skipped. Otherwise the image
    is copied (including metadata, via ``shutil.copy2``) to
    ``<destination_folder>/true_<true label>_pred_<predicted label>/``.

    Parameters
    ----------
    row : mapping
        Must provide ``'path'``, ``'Particle ID'``, ``'True Label'`` and ``'Predictions'``.
    destination_folder : str
        Root folder under which the per-confusion sub-folders are created.

    Returns
    -------
    None
    """
    src_image_path = generate_image_path(row)

    if not os.path.isfile(src_image_path):
        print(f"File not found: {src_image_path}")
        return

    true_label = row['True Label']
    pred_label = row['Predictions']

    # Correctly classified particles are not exported.
    if true_label == pred_label:
        return

    dest_folder = os.path.join(destination_folder, f"true_{true_label}_pred_{pred_label}")
    # exist_ok replaces the separate exists() check before creating the folder.
    os.makedirs(dest_folder, exist_ok=True)

    dest_image_path = os.path.join(dest_folder, os.path.basename(src_image_path))
    shutil.copy2(src_image_path, dest_image_path)
    
# Export the images of all misclassified test samples.
# missing_files counts rows whose source image does not exist; failed_rows counts rows
# that raised an exception. Previously the "missing files" total only counted
# exceptions, while genuinely missing images were printed but never counted.
missing_files = 0
failed_rows = 0
for index, row in data_results.iterrows():
    try:
        # The path is built inside the try block so that a malformed row is reported
        # below instead of aborting the whole loop.
        if not os.path.isfile(generate_image_path(row)):
            missing_files += 1
        copy_image(row, base_destination_folder)
    except Exception as e:
        # Best effort: report the row and continue with the next one.
        print(f"Error processing row {index}: {e}")
        failed_rows += 1

print(f"Total missing files: {missing_files}")
print(f"Total rows with errors: {failed_rows}")