In [None]:
# IMPORTANT: SOME KAGGLE DATA SOURCES ARE PRIVATE
# RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES.
# Authenticates with Kaggle before the competition download in the next cell.
# NOTE(review): presumably login() prompts for credentials interactively — confirm
# before running this notebook unattended.
import kagglehub
kagglehub.login()


In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

# Download the State Farm competition data and keep the value returned by kagglehub.
# NOTE(review): the later cells read from the hard-coded
# '/kaggle/input/state-farm-distracted-driver-detection/...' path and never use
# this variable — confirm both locations match outside of Kaggle.
state_farm_distracted_driver_detection_path = kagglehub.competition_download('state-farm-distracted-driver-detection')

print('Data source import complete.')


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import itertools
import pandas as pd
import glob
import pickle
import os
import time
from keras.models import Sequential, save_model
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.utils import check_random_state
from tqdm import tqdm

In [None]:
# Common prefix of the ten class directories (c0 ... c9); the class index is appended.
main_path = '/kaggle/input/state-farm-distracted-driver-detection/imgs/train/c'
class_labels = []  # integer class index (0-9) for each loaded image
images = []        # 100x100x3 float32 arrays scaled to [0, 1], in BGR channel order

# Iterate over every class directory.
for class_index in range(10):
    class_path = main_path + str(class_index)  # Path to the current class directory
    for root, dirs, files in os.walk(class_path):
        # Process the files of the current class. Sorting makes the load order
        # deterministic (os.walk gives no ordering guarantee).
        for filename in tqdm(sorted(files), desc='Memproses kelas ' + str(class_index)):
            # Join with `root`, not `class_path`, so files found in nested
            # directories resolve to their real location.
            image_path = os.path.join(root, filename)
            img = cv2.imread(image_path)
            if img is None:
                # cv2.imread returns None for unreadable / non-image files;
                # cv2.resize would raise on it, so skip the file instead.
                print('Skipping unreadable image:', image_path)
                continue
            # float32 halves the memory of the default float64 result of `/ 255`.
            img = cv2.resize(img, (100, 100)).astype(np.float32) / 255
            images.append(img)
            class_labels.append(class_index)

In [None]:
# Hold out 20% of the images for evaluation.
# random_state makes the split reproducible across "Restart & Run All";
# stratify keeps the class proportions identical in both partitions.
label_array = np.array(class_labels)
train_images, test_images, train_labels, test_labels = train_test_split(
    np.array(images), label_array,
    test_size=0.2, shuffle=True, random_state=0, stratify=label_array,
)

In [None]:
# Custom CNN: three Conv2D + MaxPooling2D stages followed by a dense head.
# Input is a 100x100 three-channel image; output is a softmax over 10 classes.
cnn = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(100, 100, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(256, activation='relu'),
    Dense(128, activation='relu'),
    Dense(10, activation='softmax'),
])

# Categorical cross-entropy expects one-hot encoded targets.
cnn.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [None]:
# Show the layer-by-layer summary of the full classifier defined above.
cnn.summary()

In [None]:
# One-hot encode the integer labels of both partitions (10 classes).
train_labels_encoded, test_labels_encoded = (
    to_categorical(labels, num_classes=10)
    for labels in (train_labels, test_labels)
)

# Train for 10 epochs; the held-out partition is used as validation data.
cnn.fit(
    train_images,
    train_labels_encoded,
    validation_data=(test_images, test_labels_encoded),
    batch_size=32,
    epochs=10,
)

In [None]:
# Remove the last (10-way softmax) layer so the model outputs the activations
# of the preceding Dense(128) layer, which are used as features below.
# The guard makes this cell idempotent: `cnn` is rebound in place, so without
# it every re-run of the cell would strip one more layer. Only the classifier
# head has an output width of 10.
if cnn.output_shape[-1] == 10:
    cnn = Sequential(cnn.layers[:-1])

# Preventing the weights from being updated
for layer in cnn.layers:
    layer.trainable = False

In [None]:
# Show the summary again, now for the truncated model used as feature extractor.
cnn.summary()

In [None]:
# Run both partitions through the truncated CNN to obtain their feature vectors.
train_features, test_features = (
    cnn.predict(split) for split in (train_images, test_images)
)

In [None]:
# Shapes of the extracted feature matrices, one per line (train, then test).
print(train_features.shape, test_features.shape, sep='\n')

In [None]:
class PCA:
    """Principal component analysis via eigendecomposition of the covariance matrix.

    Attributes:
        n_components: Number of leading components to keep.
        components: After fitting, array of shape (n_components, n_features)
            whose rows are unit-length eigenvectors of the covariance matrix,
            ordered by decreasing eigenvalue. ``None`` before fitting.
        mean: After fitting, per-feature mean of the training data.
            ``None`` before fitting.
    """

    def __init__(self, n_components):
        self.n_components = n_components
        self.components = None
        self.mean = None

    def fit(self, X):
        """Learn the feature means and the leading principal directions of ``X``.

        Args:
            X: Array-like of shape (n_samples, n_features).

        Returns:
            The fitted instance, so calls can be chained.
        """
        X = np.asarray(X, dtype=np.float64)

        # Calculate the mean of each feature and center the data on it.
        self.mean = np.mean(X, axis=0)
        centered = X - self.mean

        # Covariance between features; atleast_2d covers the single-feature
        # case where np.cov returns a 0-d array.
        cov = np.atleast_2d(np.cov(centered, rowvar=False))

        # A covariance matrix is symmetric, so use eigh: it always returns
        # real eigenvalues/eigenvectors. The general-purpose eig can return
        # complex results (e.g. for rank-deficient data), which then leak
        # into every downstream computation.
        eigenvalues, eigenvectors = np.linalg.eigh(cov)

        # Keep the eigenvectors (columns) with the largest eigenvalues,
        # stored as rows in descending eigenvalue order.
        order = np.argsort(eigenvalues)[::-1]
        self.components = eigenvectors[:, order[:self.n_components]].T
        return self

    def transform(self, X):
        """Project ``X`` onto the fitted components.

        Args:
            X: Array-like of shape (n_samples, n_features).

        Returns:
            Array of shape (n_samples, n_components).
        """
        # Center with the mean learned in fit, then project.
        centered = np.asarray(X) - self.mean
        return np.dot(centered, self.components.T)

    def fit_transform(self, X):
        """Fit on ``X`` and return its projection; same as fit(X) then transform(X)."""
        return self.fit(X).transform(X)

In [None]:
# Create a PCA object with 16 components
pca = PCA(n_components=16)
# The projection is learned from the training features only ...
train_features_reduced = pca.fit_transform(train_features)
# ... and the test features are projected with that same fitted transform.
test_features_reduced = pca.transform(test_features)

In [None]:
# Shapes of the PCA-reduced feature matrices, one per line (train, then test).
print(train_features_reduced.shape, test_features_reduced.shape, sep='\n')

In [None]:
# Write the reduced training features to CSV in the working directory (no index column).
train_features_reduced_save = pd.DataFrame(train_features_reduced)
train_features_reduced_save.to_csv('train_features_reduced_save.csv', index=False)

In [None]:
# Write the reduced test features to CSV in the working directory (no index column).
test_features_reduced_save = pd.DataFrame(test_features_reduced)
test_features_reduced_save.to_csv('test_features_reduced_save.csv', index=False)

In [None]:
# Write the training labels to CSV; rows are in the same order as the feature file.
train_tar_save = pd.DataFrame(train_labels)
train_tar_save.to_csv('train_tar.csv', index=False)

In [None]:
# Write the test labels to CSV; rows are in the same order as the feature file.
test_tar_save = pd.DataFrame(test_labels)
test_tar_save.to_csv('test_tar.csv', index=False)

In [None]:
# Sanity check: show the first reduced training feature vector.
print(train_features_reduced[0])

In [None]:
# Sanity check: show the first reduced test feature vector.
print(test_features_reduced[0])

In [None]:
class SVM:
    """Linear multiclass SVM trained by coordinate descent on per-sample dual variables.

    NOTE(review): the structure (one dual variable per class and sample, a
    simplex projection per sub-problem) appears to follow the Crammer-Singer
    multiclass formulation — confirm against the original reference.

    Labels passed to ``fit`` must be integers in ``0 .. n_classes - 1``: they
    are used directly as row indices into the coefficient arrays.

    Attributes set by ``fit``:
        coef_: Array of shape (n_classes, n_features), primal weights.
        dual_coef_: Array of shape (n_classes, n_samples), dual variables.
    """

    def __init__(self, C=1, max_iter=50, tol=0.05,
                 random_state=None, verbose=0):
        """
        Args:
            C: Regularization constant (upper bound used in the sub-problems).
            max_iter: Maximum number of passes over the training samples.
            tol: Stop when the summed violation of a pass falls below
                ``tol`` times the summed violation of the first pass.
            random_state: Seed / RandomState forwarded to check_random_state
                to shuffle the sample visiting order.
            verbose: If >= 1, print "Converged" when the tolerance is reached.
        """
        self.C = C
        self.max_iter = max_iter
        # A trailing comma here previously stored a 1-tuple instead of the number.
        self.tol = tol
        self.random_state = random_state
        self.verbose = verbose

    @staticmethod
    def _as_real(X):
        # Return X as a real array. Complex input (e.g. features produced by a
        # general eigen-solver) is reduced to its real part so that all
        # arithmetic and the argmax in predict stay real-valued.
        X = np.asarray(X)
        if np.iscomplexobj(X):
            X = X.real
        return X

    def projection_simplex(self, v, z=1):
        """Project vector ``v`` onto the simplex {w : w >= 0, sum(w) = z}."""
        n_features = v.shape[0]
        u = np.sort(v)[::-1]
        cssv = np.cumsum(u) - z
        ind = np.arange(n_features) + 1
        cond = u - cssv / ind > 0
        rho = ind[cond][-1]
        theta = cssv[cond][-1] / float(rho)
        w = np.maximum(v - theta, 0)
        return w

    def _partial_gradient(self, X, y, i):
        # Partial gradient for the ith sample.
        g = np.dot(X[i], self.coef_.T) + 1
        g[y[i]] -= 1
        return g

    def _violation(self, g, y, i):
        # Optimality violation for the ith sample.
        smallest = np.inf
        for k in range(g.shape[0]):
            # Skip dual variables that already sit at their upper bound
            # (C for the true class, 0 for every other class).
            if k == y[i] and self.dual_coef_[k, i] >= self.C:
                continue
            elif k != y[i] and self.dual_coef_[k, i] >= 0:
                continue

            smallest = min(smallest, g[k])

        return g.max() - smallest

    def _solve_subproblem(self, g, y, norms, i):
        # Prepare inputs to the projection.
        Ci = np.zeros(g.shape[0])
        Ci[y[i]] = self.C
        beta_hat = norms[i] * (Ci - self.dual_coef_[:, i]) + g / norms[i]
        z = self.C * norms[i]

        # Compute projection onto the simplex.
        beta = self.projection_simplex(beta_hat, z)

        return Ci - self.dual_coef_[:, i] - beta / norms[i]

    def fit(self, X, y):
        """Train on ``X`` (n_samples, n_features) with integer labels ``y``.

        Returns:
            The fitted instance.
        """
        X = self._as_real(X)
        y = np.asarray(y)
        n_samples, n_features = X.shape

        n_classes = np.unique(y).size
        self.dual_coef_ = np.zeros((n_classes, n_samples), dtype=np.float64)
        self.coef_ = np.zeros((n_classes, n_features), dtype=np.float64)

        # Pre-compute norms.
        norms = np.sqrt(np.sum(X ** 2, axis=1))

        # Shuffle sample indices.
        rs = check_random_state(self.random_state)
        ind = np.arange(n_samples)
        rs.shuffle(ind)

        violation_init = None
        for it in range(self.max_iter):
            violation_sum = 0.0

            for i in ind:
                # All-zero samples can be safely ignored.
                if norms[i] == 0:
                    continue

                g = self._partial_gradient(X, y, i)
                v = self._violation(g, y, i)
                violation_sum += v

                if v < 1e-12:
                    continue

                # Solve subproblem for the ith sample.
                delta = self._solve_subproblem(g, y, norms, i)

                # Update primal and dual coefficients in place. Everything is
                # real-valued, so no per-sample re-cast (and full copy) of the
                # coefficient arrays is needed.
                self.coef_ += np.outer(delta, X[i])
                self.dual_coef_[:, i] += delta

            if it == 0:
                violation_init = violation_sum

            # Nothing violated optimality in the first pass: already optimal,
            # and the ratio below would divide by zero.
            if violation_init == 0:
                if self.verbose >= 1:
                    print("Converged")
                break

            vratio = violation_sum / violation_init

            if vratio < self.tol:
                if self.verbose >= 1:
                    print("Converged")
                break

        return self

    def predict(self, X):
        """Return, for each row of ``X``, the index of the class with the highest score."""
        decision = np.dot(self._as_real(X), self.coef_.T)
        return decision.argmax(axis=1)

In [None]:
# SVM trained on the PCA-reduced features.
svm = SVM(C=1, max_iter=1000, tol=0.001, random_state=0, verbose=1)

# Time only the call to fit.
start_time = time.time()
svm.fit(train_features_reduced, train_labels)
end_time = time.time()

training_duration = end_time - start_time
print("Training duration:", training_duration, "seconds")

# Accuracy = fraction of held-out samples whose predicted class matches the label.
predictions = svm.predict(test_features_reduced)
accuracy = (predictions == test_labels).mean()
print("Accuracy:", accuracy)

In [None]:
# Same SVM configuration, trained on the unreduced CNN features for comparison.
svm_no_pca = SVM(C=1, max_iter=1000, tol=0.001, random_state=0, verbose=1)

# Time only the call to fit.
start_time_no_pca = time.time()
svm_no_pca.fit(train_features, train_labels)
end_time_no_pca = time.time()

training_duration_no_pca = end_time_no_pca - start_time_no_pca
print("Training duration without PCA:", training_duration_no_pca, "seconds")

# Accuracy = fraction of held-out samples whose predicted class matches the label.
predictions_no_pca = svm_no_pca.predict(test_features)
accuracy_no_pca = (predictions_no_pca == test_labels).mean()
print("Accuracy without PCA:", accuracy_no_pca)

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on a new 10x10 figure.

    Args:
        cm: Square confusion matrix; rows are true labels, columns predicted labels.
        classes: Tick labels, one per class, in matrix order.
        normalize: If True, divide each row by its sum so cells show per-class
            fractions. Rows that sum to zero are shown as all zeros.
        title: Figure title.
        cmap: Matplotlib colormap for the heat map.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide only where the row has samples; a class absent from the true
        # labels would otherwise produce 0/0 = NaN cells and a runtime warning.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix')

    #print(cm)
    plt.figure(figsize = (10,10))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Cell annotations: two decimals for fractions, plain integers for counts.
    fmt = '.2f' if normalize else 'd'
    # Use white text on cells darker than the midpoint of the color scale.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

In [None]:
from sklearn.metrics import confusion_matrix
# Compute confusion matrix for the PCA + SVM predictions
cnf_matrix = confusion_matrix(test_labels, predictions)
np.set_printoptions(precision=2)

# Human-readable names for class indices 0-9, in index order.
class_names = ['safe driving', 'texting - right', 'talking on the phone - right', 'texting - left', 'talking on the phone - left', 'operating the radio', 'drinking', 'reaching behind',
               'hair and makeup', 'talking to passenger']

# Plot non-normalized confusion matrix.
# plot_confusion_matrix opens its own figure, so no plt.figure() call is made
# here; an extra call would only render an empty figure above the plot.
plot_confusion_matrix(cnf_matrix, classes=class_names,
                      title='Confusion matrix with PCA')

In [None]:
from sklearn.metrics import confusion_matrix
# Compute confusion matrix for the SVM trained without PCA
cnf_matrix = confusion_matrix(test_labels, predictions_no_pca)
np.set_printoptions(precision=2)

# Human-readable names for class indices 0-9, in index order.
class_names = ['safe driving', 'texting - right', 'talking on the phone - right', 'texting - left', 'talking on the phone - left', 'operating the radio', 'drinking', 'reaching behind',
               'hair and makeup', 'talking to passenger']

# Plot non-normalized confusion matrix.
# plot_confusion_matrix opens its own figure, so no plt.figure() call is made
# here; an extra call would only render an empty figure above the plot.
plot_confusion_matrix(cnf_matrix, classes=class_names,
                      title='Confusion matrix without PCA')

In [None]:
# Per-class precision / recall / F1 for the PCA + SVM predictions.
from sklearn.metrics import confusion_matrix, classification_report

y_true = test_labels
# Display names for class indices 0-9, in index order.
class_names = [
    'safe driving',
    'texting - right',
    'talking on the phone - right',
    'texting - left',
    'talking on the phone - left',
    'operating the radio',
    'drinking',
    'reaching behind',
    'hair and makeup',
    'talking to passenger',
]
print(classification_report(y_true, predictions, target_names=class_names))

In [None]:
# Per-class precision / recall / F1 for the SVM trained without PCA.
from sklearn.metrics import confusion_matrix, classification_report

y_true = test_labels
# Display names for class indices 0-9, in index order.
class_names = [
    'safe driving',
    'texting - right',
    'talking on the phone - right',
    'texting - left',
    'talking on the phone - left',
    'operating the radio',
    'drinking',
    'reaching behind',
    'hair and makeup',
    'talking to passenger',
]
print(classification_report(y_true, predictions_no_pca, target_names=class_names))

In [None]:
# AUC for the PCA + SVM predictions
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import roc_auc_score

def multiclass_roc_auc_score(y_test, y_pred, average="macro"):
    """ROC AUC for multiclass labels, computed on one-hot encoded labels.

    The binarizer is fitted on ``y_test`` only and then applied to both
    ``y_test`` and ``y_pred``; the result is whatever ``roc_auc_score``
    returns for those two indicator matrices with the given ``average``.
    Note that ``y_pred`` holds hard class labels, not scores.
    """
    binarizer = LabelBinarizer().fit(y_test)
    true_onehot = binarizer.transform(y_test)
    pred_onehot = binarizer.transform(y_pred)
    return roc_auc_score(true_onehot, pred_onehot, average=average)

y_true = test_labels
multiclass_roc_auc_score(y_true, predictions)

In [None]:
# AUC for the SVM trained without PCA.
# Reuses multiclass_roc_auc_score (and its sklearn imports) from the previous
# AUC cell instead of redefining an identical copy here.
y_true = test_labels
multiclass_roc_auc_score(y_true, predictions_no_pca)

In [None]:
# Folder name of each class, in class-index order.
class_names_folder = ['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9']

# Number of images to display from each class
num_images_per_class = 2

# Loop through each class
for class_label in class_names_folder:
    # Get all image file paths in the current class folder
    image_paths = glob.glob(f'/kaggle/input/state-farm-distracted-driver-detection/imgs/train/{class_label}/*.jpg')

    # np.random.choice raises when asked for more items than exist (including
    # an empty folder), so skip empty classes and cap the sample size.
    if not image_paths:
        print(f"No images found for class {class_label}; skipping.")
        continue
    sample_count = min(num_images_per_class, len(image_paths))

    # Randomly select the specified number of images from the current class
    selected_image_paths = np.random.choice(image_paths, size=sample_count, replace=False)

    # Loop through the selected images in the current class
    for image_path in selected_image_paths:
        # Load and preprocess the image
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread returns None for files it cannot decode.
            print(f"Could not read image: {image_path}")
            continue
        # Scale by 255, the same factor used when the training images were loaded.
        img_resized = cv2.resize(img, (100, 100)) / 255
        # Use a dedicated name for the one-image batch so the global `images`
        # list built by the loading cell is not overwritten.
        sample_batch = np.array([img_resized])

        # Perform prediction: CNN features -> PCA projection -> SVM
        sample_features = cnn.predict(sample_batch)
        sample_features = pca.transform(sample_features)
        result = svm.predict(sample_features)

        # Display the original image (OpenCV loads BGR; matplotlib expects RGB)
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.show()

        # Print the result
        print(f"Image: {image_path}")
        print(f"Predicted Class: {class_names[result[0]]}")
        print()

In [None]:
# Folder name of each class, in class-index order.
class_names_folder = ['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9']

# Number of images to display from each class
num_images_per_class = 2

# Loop through each class
for class_label in class_names_folder:
    # Get all image file paths in the current class folder
    image_paths = glob.glob(f'/kaggle/input/state-farm-distracted-driver-detection/imgs/train/{class_label}/*.jpg')

    # np.random.choice raises when asked for more items than exist (including
    # an empty folder), so skip empty classes and cap the sample size.
    if not image_paths:
        print(f"No images found for class {class_label}; skipping.")
        continue
    sample_count = min(num_images_per_class, len(image_paths))

    # Randomly select the specified number of images from the current class
    selected_image_paths = np.random.choice(image_paths, size=sample_count, replace=False)

    # Loop through the selected images in the current class
    for image_path in selected_image_paths:
        # Load and preprocess the image
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread returns None for files it cannot decode.
            print(f"Could not read image: {image_path}")
            continue
        # Scale by 255, the same factor used when the training images were loaded.
        img_resized = cv2.resize(img, (100, 100)) / 255
        # Use a dedicated name for the one-image batch so the global `images`
        # list built by the loading cell is not overwritten.
        sample_batch = np.array([img_resized])

        # Perform prediction: CNN features -> SVM (no PCA step)
        sample_features = cnn.predict(sample_batch)
        result = svm_no_pca.predict(sample_features)

        # Display the original image (OpenCV loads BGR; matplotlib expects RGB)
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.show()

        # Print the result
        print(f"Image: {image_path}")
        print(f"Predicted Class without PCA: {class_names[result[0]]}")
        print()

In [None]:
# # Directory path of test images
# test_dir = '/kaggle/input/state-farm-distracted-driver-detection/imgs/test/'

# # Get the list of files in the test directory
# file_list = os.listdir(test_dir)[:20]

# # Process each image and make predictions
# for file_name in file_list:
#     # Load and preprocess the image
#     img_path = os.path.join(test_dir, file_name)
#     img = cv2.imread(img_path)
#     img_c0 = cv2.resize(img, (100, 100)) / 256
#     images = np.array([img_c0])

#     # Perform prediction
#     images = cnn.predict(images)
#     images = pca.transform(images)
#     result = svm.predict(images)

#     # Convert result to class names
#     class_result = class_names[result[0]]

#     # Display the original image
#     plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
#     plt.axis('off')
#     plt.title(f"Prediction: {class_result}")
#     plt.show()

In [None]:
# Pickle the two SVMs and the fitted PCA, in this order, into the working directory.
pickled_artifacts = (
    ('svm_manual_bisa_v4.4.pkl', svm),
    ('svm_manual_bisa_v4.4_no_pca.pkl', svm_no_pca),
    ('pca_manual_bisa_v4.4.pkl', pca),
)
for artifact_path, artifact in pickled_artifacts:
    with open(artifact_path, 'wb') as f:
        pickle.dump(artifact, f)

# Save `cnn` as it stands at this point of the notebook, in HDF5 format.
cnn.save('/kaggle/working/cnn_bisa_v4.4.h5')