# Applied Machine Learning 
## Project – Stage 2 (Sorting Legos Using Engineered Features)

You have been hired by a company to develop machine learning algorithms for a sorting facility. The requirement is that the sorting device takes images of items on a conveyor belt, and then uses and machine learning algorithm to classify the items into classes. Then, the items get routed through different routes on the conveyor belt depending on their class.

For the sake of this project, you are given RGB images, and your focus will be on developing the classification algorithms. Also, for simplicity, it is assumed that items are Lego pieces of three different types with the following shapes (top view): Rectangles (2x4), squares (2x2), and circles (2x2). Examples are shown below:

<img align="center" src="Lego.JPG">

The company tested your solution from stage 1, and requested an update. Namely, the company realized they have given you an idealized data set where the images are centered and oriented in a given direction. However, the items may appear in the image off-centre, and may have various orientations as shown below:

<img align="center" src="Lego2.JPG">

Your algorithm should be able to classify these three classes with different centering and orientations. To realize this, you need to extract/engineer features from the images, and to use these features to train and test a machine learning algorithm. The company requested that you use these features as the input of a 3-class classifier (Logistic regression, as in Lecture 5), and to use a maximum of 64 features per image.


In [2]:
# The required libraries are imported here
import os
from pathlib import Path
import numpy as np

from PIL import Image, ImageFilter
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.preprocessing import MinMaxScaler
import cv2
from scipy import ndimage
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)

In [6]:
# This cell contains the functions defined to read the images from the given directories,
# separate them into classes, and preprocess them, so they can be used by thr model.

def image_cropper(img: np.array, height_percentage: int = 40, width_percentage: int = 40) -> np.array:
    """
    the image is cropped from both sides of the height and width
    :param img: the images as an np.array
    :param width_percentage: the percentage of the pixels from the width of the image we would like to crop
    :param height_percentage: the percentage of the pixels from the height of the image we would like to crop
    :return: cropped image as np.array
    """
    height, width = img.shape[0], img.shape[1]
    cropped_height_amount = height // 100 * height_percentage
    cropped_width_amount = height // 100 * width_percentage
    img = img[cropped_height_amount // 2:height - cropped_height_amount // 2,
          cropped_width_amount // 2:width - cropped_width_amount // 2]
    return img


def dynamic_image_cropper(img: np.array, region_of_interest=500, negative_image = False) -> np.array:
    """
    The pixel value mean is calculated at the 4 corners of the image + the center.
    The region with the lowest mean (since pixels of the object are darker) is chosen as the region of interest.
    In case of negative image, we use the maximum.
    :param img: img as np.array
    :param region_of_interest: the size of the bounding box we are checking for calculating the mean
    :return: cropped image
    """
    if negative_image:
        img = abs(255-img)
    # points of interest
    height, width = img.shape[0], img.shape[1]
    center = [height // 2, width // 2]
    right_top_corner = [0, width - 1]
    left_top_corner = [0, 0]
    right_bottom_corner = [height - 1, width - 1]
    left_bottom_corner = [height - 1, 0]

    # calculating the mean
    left_bottom_corner_mean = np.mean(img[left_bottom_corner[0] - region_of_interest:left_bottom_corner[0],
                                      left_bottom_corner[1]:left_bottom_corner[1] + region_of_interest])
    left_top_corner_mean = np.mean(img[left_top_corner[0]: region_of_interest + left_bottom_corner[0],
                                   left_bottom_corner[1]:left_bottom_corner[1] + region_of_interest])
    right_top_corner_mean = np.mean(img[right_top_corner[0]:right_top_corner[0] + region_of_interest,
                                    right_top_corner[1] - region_of_interest: right_top_corner[1]])
    right_bottom_corner_mean = np.mean(img[right_bottom_corner[0] - region_of_interest:right_bottom_corner[0],
                                       right_bottom_corner[1] - region_of_interest:right_bottom_corner[1]])
    center_mean = np.mean(img[center[0] - region_of_interest // 2:center[0] + region_of_interest // 2,
                          center[1] - region_of_interest // 2:center[1] + region_of_interest // 2])


    # getting the minimum/maximum mean and associated region
    if not negative_image:
        min_mean = np.argmin(np.array(
            [center_mean, left_top_corner_mean, left_bottom_corner_mean, right_top_corner_mean, right_bottom_corner_mean]))
    else:
        min_mean = np.argmax(np.array(
            [center_mean, left_top_corner_mean, left_bottom_corner_mean, right_top_corner_mean, right_bottom_corner_mean]))
    if min_mean == 0:
        return img[center[0] - region_of_interest // 2:center[0] + region_of_interest // 2,
               center[1] - region_of_interest // 2:center[1] + region_of_interest // 2]
    elif min_mean == 1:
        return img[left_top_corner[0]: region_of_interest + left_bottom_corner[0],
               left_bottom_corner[1]:left_bottom_corner[1] + region_of_interest]
    elif min_mean == 2:
        return img[left_bottom_corner[0] - region_of_interest:left_bottom_corner[0],
               left_bottom_corner[1]:left_bottom_corner[1] + region_of_interest]
    elif min_mean == 3:
        return img[right_top_corner[0]:right_top_corner[0] + region_of_interest,
               right_top_corner[1] - region_of_interest: right_top_corner[1]]
    elif min_mean == 4:
        return img[right_bottom_corner[0] - region_of_interest:right_bottom_corner[0],
               right_bottom_corner[1] - region_of_interest:right_bottom_corner[1]]

def dynamic_image_cropper2(img):
    """
    Find the object in the image and isolates it based on the edges and contours in the image.
    The image is first turned into its negative counter part.
    The the edges on the negative image are calculated, the contours for the edge image are found,
    and the contour image with the biggest area is chosen as the final image and the object.
    :param img:
    :return:
    """
    img = np.uint8(img)
    gray_negative = abs(255-img)
    img = gray_negative
    # tested blured version of image for source, noise reduction and unsharp mask
    blur = cv2.GaussianBlur(img, (5,5), cv2.BORDER_DEFAULT)
    # img = img + (img-blur)
    edges= cv2.Canny(img, 100,200)
    contours, hierarchy= cv2.findContours(edges.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
    sorted_contours= sorted(contours, key=cv2.contourArea, reverse= True)

    objects = []
    for (i,c) in enumerate(sorted_contours):
        x,y,w,h= cv2.boundingRect(c)
        new = img[y- 10:y+h + 10, x- 10:x+w + 10]
        new[new<110] = 0
        objects.append(new)
    biggest_object = np.array([[1,1],[1,1]])
    for obj in objects:
        if obj.shape[0] * obj.shape[1] > biggest_object.shape[0] * biggest_object.shape[1]:
            biggest_object = obj
    # cv2.imshow("test", biggest_object)
    # cv2.waitKey(0)
    # cv2.destroyAllWindows()

    return biggest_object



def image_scaler(img, new_height=20, new_width=20):
    """
    :param img: the images as an np.array
    :param new_height: the new height of the scaled image
    :param new_width: the new width of the scaled image
    :return: scaled image as np.array
    """
    img = Image.fromarray(img)
    img = img.resize((new_width, new_height))
    img = np.array(img)
    return img


def image_standardization(img, max_value=255):
    """
    changes the values of the image pixels to be between 0 and max_value. The commented line is for discretizing the pixel values
    :param max_value: max_value of the new pixels
    :param img: the images as an np.array
    :return: standardized image
    """
    img = (img - np.min(img)) / (np.max(img) - np.min(img)) * max_value
    # img = (img // 16) * 16
    return img


def separate_classes(img_files, name_separation_chars=3, test_flag=False, classes_dict=None):
    """
    separates the images into classes based on their file names
    :param img_files: list of image file names
    :param name_separation_chars: number of characters in the name that should get used for separation
    :return: a list of classes for image names, a dict containing the name and the corresponding class number of images
    """
    names = list(set(img_name[:name_separation_chars] for img_name in img_files))
    if not test_flag:
        classes_dict = dict(enumerate(names))
        classes_dict = {v: k for k, v in classes_dict.items()}
    classes = []
    for img_name in img_files:
        classes.append(classes_dict[img_name[:name_separation_chars]])
    return classes, classes_dict


def get_files_list(data_dir, name_separation_chars=3, test_flag=False, classes_dict=None):
    """
    gets the list of image names from the directory
    :param data_dir: path to the image files
    :return: list of image names, their classes and the dict for classes
    """
    data = os.listdir(data_dir)
    classes, classes_dict = separate_classes(data, name_separation_chars=name_separation_chars, test_flag=test_flag,
                                             classes_dict=classes_dict)
    return data, classes, classes_dict


def image_reader(img_dir):
    """
    reads image in grayscale
    :return: img as np.array
    """
    img = cv2.imread(img_dir)
    gray= cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
    return gray


def calculate_image_edges(img, total_extracted_features, threshold=100, peak_threshold=0.3, cutoff_percentage=20):
    """
    Was supposed to be similar to the landing practice coding example in the course.
    PIL filters are used to find the edges from the image.
    Then the sum of edges are calculated for vertical and horizontal axis.
    Did not return good results for this project.
    Was changed to combining the max values in the vertical and horizontal edges
    :param img: img as PIL object
    :param total_extracted_features: number of features we want to extract from the image
    :param threshold: used for noise reduction
    :param peak_threshold: used for high-pass filter
    :param cutoff_percentage: used for image resizing and reduction of zero pixels after edge detection
    :return: engineered features from the image
    """
    # img = Image.fromarray(img)
    img_edges = img.filter(ImageFilter.FIND_EDGES)
    edge_array = np.array(img_edges)
    edge_array = edge_array[4:edge_array.shape[0] - 4, 4:edge_array.shape[1] - 4]
    edge_array[edge_array < threshold] = 0
    edges_v = np.sum(edge_array, axis=0).astype(float)
    edges_h = np.sum(edge_array, axis=1).astype(float)
    ##############################################################################################################
    ## High-pass filter
    # M = 4
    # edges_v_copy = edges_v.copy()
    # for i in range(M // 2, len(edges_v) - M // 2):
    #     edges_v[i] = edges_v[i] - np.mean(edges_v_copy[i - M // 2:i + M // 2])
    # edges_v[:M // 2] = 0
    # edges_v[-M // 2:] = 0
    # edges_h_copy = edges_h.copy()
    # for j in range(M // 2, len(edges_h) - M // 2):
    #     edges_h[j] = edges_h[j] - np.mean(edges_h_copy[j - M // 2:j + M // 2])
    # edges_h[:M // 2] = 0
    # edges_h[-M // 2:] = 0


    # cutoff = edge_array.shape[0] * cutoff_percentage // 100
    # edges_h = edges_h[cutoff:-cutoff]
    # edges_v = edges_v[cutoff:-cutoff]

    # peak_v = np.max(np.abs(edges_v))
    # peak_h = np.max(np.abs(edges_h))
    # edges_v[np.abs(edges_v) < peak_v * peak_threshold] = 0
    # edges_h[np.abs(edges_h) < peak_h * peak_threshold] = 0
    # sd_v = np.sqrt(np.var(noise_v))
    # sd_h = np.sqrt(np.var(noise_h))
    # # # sd_h, sd_v = peak_h,peak_v
    # x = np.array([edges_v / sd_v, edges_h / sd_h]).reshape(1, -1)
    ##############################################################################################################

    edges_v_indexes = np.sort(edges_h.argsort()[-total_extracted_features // 2:][::-1])

    edges_v_extracted = edges_v[edges_v_indexes]
    edges_h_indexes = np.sort(edges_h.argsort()[-total_extracted_features // 2:][::-1])
    edges_h_extracted = edges_h[edges_h_indexes]


    ## MinMax scaler was used for standardizing the values
    # scaler = MinMaxScaler()
    # x = np.array([scaler.fit_transform(edges_v_extracted.reshape(-1,1)), scaler.fit_transform(edges_h_extracted.reshape(-1,1))]).reshape(1, -1)

    x = np.array([edges_v_extracted, edges_h_extracted]).reshape(-1, 1)
    return x

def calculate_image_edges2(img, total_extracted_features):
    """
    The edge detection was changed to OpenCV2 functions.
    Sobel filter and Canny edge detection were tested.
    Canny edge detection achieved results with less noise and more accurate edges.
    Then, similar to the previous method, edge values in vertical and horizontal axis were summed up to extract the features.
    :param img: source image
    :param total_extracted_features: total number of extracted features
    :return: the extracted features
    """

    img = np.array(img)
    img = np.uint8(img)
    # Sharp mask - Enhancing the details in the image by adding the subtraction of blured image from the original image
    # img = img + (img - cv2.GaussianBlur(img, (5,5), cv2.BORDER_DEFAULT))
    # blurring the source image as the input to the Canny edge detector
    blur = cv2.GaussianBlur(img, (5,5), cv2.BORDER_DEFAULT)
    edge_array = cv2.Canny(image=blur, threshold1=100, threshold2=200, apertureSize= 3)
    edges_v = np.sum(edge_array, axis=0).astype(float)
    edges_h = np.sum(edge_array, axis=1).astype(float)
    # 32 biggest values from horizontal and vertical values are combined
    edges_v_indexes = np.sort(edges_v.argsort()[-total_extracted_features // 2:][::-1])
    edges_v_extracted = edges_v[edges_v_indexes]
    edges_h_indexes = np.sort(edges_h.argsort()[-total_extracted_features // 2:][::-1])
    edges_h_extracted = edges_h[edges_h_indexes]
    x = np.array([edges_v_extracted, edges_h_extracted]).reshape(-1, 1)
    return x

def calculate_image_edges3(img, total_extracted_features):
    """
    This method was developed in combination with the second dynamic cropper function. In this approach,
    we don't look for edges, but we sum up the actual pixel values in vertical and horizontal axis.
    That is because we used edge detection for finding the actual objects and
    using it again seemed to only cause information loss.
    :param img: input image as np.array
    :param total_extracted_features: total number of extracted features
    :return: the extracted features
    """
    vertical = np.sum(img, axis = 0)
    horizontal = np.sum(img, axis = 1)
    ## tested for noise reduction, did not have positive effects
    # vertical[vertical<np.max(vertical) * 0.05] = 0
    # horizontal[horizontal<np.max(horizontal) * 0.05] = 0

    vertical[vertical == 0] = sorted(set(vertical))[1]
    horizontal[horizontal == 0] = sorted(set(horizontal))[1]
    edges_v_indexes = np.sort(vertical.argsort()[-total_extracted_features // 2:][::-1])
    edges_v_extracted = vertical[edges_v_indexes]
    edges_h_indexes = np.sort(horizontal.argsort()[-total_extracted_features // 2:][::-1])
    edges_h_extracted = horizontal[edges_h_indexes]
    x = np.array([edges_v_extracted, edges_h_extracted])
    ## an attempt on normalizing the data - was unreliable
    # x = np.log2(x + 1)
    return x



def get_data(path_to_training_data,
             class_name_chars=3,
             test_flag=False,
             classes_dict=None,
             augment_data=False,
             augments=[90, 45]
             ):
    """
    Reads the images. Adds rotation to them if augment_data flag is True.
    This way, we can make up for the low number of training images and take the rotation in images into account.
    :param path_to_training_data:
    :param class_name_chars:
    :param test_flag:
    :param classes_dict:
    :param augment_data: Flag to augmenting data
    :param augments: The angles we want to rotate the image by
    :return:
    """
    data = []
    image_names, classes, classes_dict = get_files_list(path_to_training_data, class_name_chars, test_flag=test_flag,
                                                        classes_dict=classes_dict)
    new_classes = []
    for i, name in enumerate(image_names):
        img = image_reader(str(Path(path_to_training_data) / name))
        data.append(np.array(img))
        new_classes.append(classes[i])
        if augment_data:
            for value in augments:
                rotated = ndimage.rotate(img, value)
                rotated = np.array(rotated)
                rotated[rotated == 0] = np.mean(np.array(img)[0:20, 0:20])
                data.append(rotated)
                new_classes.append(classes[i])

    # converting classes list to a column array
    new_classes = np.array(new_classes).reshape(-1, 1)
    return np.array(data, dtype=object), new_classes, classes_dict


def preprocess_data(data,
                    standardization_flag=True,
                    standardization_max_value=255,
                    crop_flag=True,
                    crop_height_percentage=50,
                    crop_width_percentage=50,
                    scaling_flag=True,
                    scaling_height=100,
                    scaling_width=100,
                    total_extracted_features=64,
                    region_of_interest=500,
                    scaling = True,
                    scaler=None):
    """
    The actual preprocessing pipeline for the model. The images are standardized to 0-255.
    Then scaled, cropped by the chosen method, and finally the 64 features are extracted.
    For the first dynamic image cropper, first the image is cropped and then scaled,
    while for the dynamic image cropper 2, first the image is scaled and then cropped
    :param data:
    :param standardization_flag:
    :param standardization_max_value:
    :param crop_flag:
    :param crop_height_percentage:
    :param crop_width_percentage:
    :param scaling_flag:
    :param scaling_height:
    :param scaling_width:
    :param total_extracted_features:
    :param region_of_interest: used for dynamic image cropper
    :param scaling: If true, the extracted feature are scaled using a MonMaxScaler
    :param scaler: we pass in the scaler if we have a pre-fitted scaler
    :return: training and testing data
    """
    new_data = []
    for img in data:
        if standardization_flag:
            img = image_standardization(img, standardization_max_value)
        if scaling_flag:
            img = image_scaler(img, scaling_height, scaling_width)
        if crop_flag:
            # img = np.array(img)
            img = dynamic_image_cropper(img, region_of_interest=region_of_interest,negative_image=True)
            # img = dynamic_image_cropper2(img)




        # img = Image.fromarray(img).convert('L')
        # img = calculate_image_edges(img, total_extracted_features)
        # img = calculate_image_edges2(img, total_extracted_features)
        img = calculate_image_edges3(img, total_extracted_features)
        img = img.flatten()
        new_data.append(img)
    new_data = np.array(new_data)
    if scaling:
        if scaler is None:
            scaler = MinMaxScaler()
            scaler.fit(new_data)
        new_data = scaler.transform(new_data)
    return new_data, scaler

In [7]:
# The train function uses the get_data function as a base to read all the images from the given directory, create the classes, prepare the dataset, and then train a logistic regression model to classify the images. In the end, it returns the trained model and classes_dict to be passed to the test function

# control parameters for the preprocessing functions
region_of_interest = 600
scaling_height = 150
scaling_width = 150
####################################################


def train_function(path_to_data,
                   class_name_chars=3,
                   test_flag=False,
                   classes_dict=None,
                   standardization_flag=True,
                   standardization_max_value=255,
                   crop_flag=True,
                   crop_height_percentage=15,
                   crop_width_percentage=15,
                   scaling_flag=True,
                   scaling_height=scaling_height,
                   scaling_width=scaling_width,
                   region_of_interest=region_of_interest,
                   total_extracted_features=64,
                   results=None,
                   scaling = True,
                   scaler=None):
    data, y_train, classes_dict = get_data(path_to_data, class_name_chars, test_flag, classes_dict)
    X_train, scaler = preprocess_data(data, standardization_flag=standardization_flag,
                                      standardization_max_value=standardization_max_value, crop_flag=crop_flag,
                                      crop_height_percentage=crop_height_percentage,
                                      crop_width_percentage=crop_width_percentage, scaling_flag=scaling_flag,
                                      scaling_height=scaling_height, scaling_width=scaling_width,
                                      total_extracted_features=total_extracted_features,
                                      region_of_interest=region_of_interest, scaling = scaling, scaler=scaler)
    # train LogisticRegression model
    model = LogisticRegression(max_iter=5000000)
    model.fit(X_train, y_train.ravel())

    print(f'\nX_train.shape = {X_train.shape}, y_train.shape = {len(y_train)}')
    y_pred = model.predict(X_train)

    # Reporting the stats from the model
    print(f'\nConfusion Matrix = \n{confusion_matrix(y_train, y_pred)}')
    print(f'\nAccuracy Score = {accuracy_score(y_train, y_pred)}')
    print(f'Model Coefficient Shape = {model.coef_.shape}')
    if results is not None:
        results.append(accuracy_score(y_train, y_pred))
    return model, classes_dict, results, scaler


model, classes_dict, results, scaler = train_function('data/Lego_dataset_2/training/')


X_train.shape = (81, 64), y_train.shape = 81

Confusion Matrix = 
[[18  2  7]
 [ 0 25  2]
 [ 2  6 19]]

Accuracy Score = 0.7654320987654321
Model Coefficient Shape = (3, 64)


In [11]:
# The test function uses the get_data function as a base to read all the images from the given directory, get their classes based on the classes_dict from the training function, prepare the dataset, and then predict the classes of the images.

def test_function(path_to_data,
                  model,
                  class_name_chars=3,
                  test_flag=False,
                  classes_dict=None,
                   standardization_flag=True,
                  standardization_max_value=255,
                  crop_flag=True,
                  crop_height_percentage=10,
                  crop_width_percentage=10,
                  scaling_flag=True,
                  scaling_height=scaling_height,
                  scaling_width=scaling_width,
                  region_of_interest=region_of_interest,
                  total_extracted_features=64,
                  results=None,
                  scaling = True,
                  scaler = None):
    data, y_test, classes_dict = get_data(path_to_data, class_name_chars, test_flag, classes_dict, augment_data= False)
    X_test, scaler = preprocess_data(data, standardization_flag=standardization_flag,
                             standardization_max_value=standardization_max_value, crop_flag=crop_flag,
                             crop_height_percentage=crop_height_percentage,
                             crop_width_percentage=crop_width_percentage, scaling_flag=scaling_flag,
                             scaling_height=scaling_height, scaling_width=scaling_width,
                             total_extracted_features=total_extracted_features, region_of_interest=region_of_interest, scaling = scaling, scaler = scaler)
    # train LogisticRegression model
    print(f'\nX_test.shape = {X_test.shape}, y_test.shape = {len(y_test)}')
    y_pred = model.predict(X_test)

    # Reporting the stats from the model
    print(f'\nConfusion Matrix = \n{confusion_matrix(y_test, y_pred)}')
    print(f'\nAccuracy Score = {accuracy_score(y_test, y_pred)}')
    print(f'Model Coefficient Shape = {model.coef_.shape}')
    if results is not None:
        results.append(accuracy_score(y_test, y_pred))
    return results


results = test_function('data/Lego_dataset_2/testing/', model, test_flag=True, classes_dict=classes_dict, scaler=scaler)


X_test.shape = (81, 64), y_test.shape = 81

Confusion Matrix = 
[[21  0  6]
 [ 5 18  4]
 [ 2  2 23]]

Accuracy Score = 0.7654320987654321
Model Coefficient Shape = (3, 64)
