In [6]:
import sys
import subprocess
import pkg_resources
import numpy as np

required = {"scikit-image"}
installed = {pkg.key for pkg in pkg_resources.working_set}
missing = required - installed

if missing:
    python = sys.executable
    subprocess.check_call(
        [python, "-m", "pip", "install", *missing], stdout=subprocess.DEVNULL
    )


###### 1. Opis danych dla pierwszych 6 zajęć laboratoryjnych

Dane są dostępne w postaci spakowanej w pliku <a href="http://fraktal.faculty.wmi.amu.edu.pl/symulowanie_wizualne/train_test_sw.zip">train_test_sw.zip</a>. Po rozpakowaniu otrzymujmemy katalog <i>train_test_sw</i> zawierający:

<ol>
        <li>katalog <i>train_sw</i> - katalog z listą podkatalogów przechowujących obrazy treningowe (format 4-bajtowy rgba) podzielone na kategorie - nazwa katalogu odpowiada nazwie kategorii
     <li>katalog <i>test_sw</i> - katalog z listą plików zawierających obrazy testowe   
     <li>plik <i>test_labels.json</i> - zawiera etykiety obrazów testowych  w formacie JSON - pojedyncza dana to słownik  
         <p>
             <code>
                 {
                    "filename": nazwa pliku,
                    "value": nazwa_kategorii
                 }
            </code>
 </ol>


#### 2. Funkcja <code>load_train_data(input_dir, newSize=(64,64))</code>

Funkcja ta wczytuje dane teningowe, przeskalowuje je do rozmiaru podanego w drugim parametrze (z interpolacją typu cv2.INTER_AREA), przeprowadza ich normalizację i zwraca słownik w formacie:

<p>
<code>
{  "data" - lista (typ <code>list</code>) obrazów, gdzie pojedynczy obraz jest tablicą <code>numpy.array</code> zapisaną wierszowo blokami rgb
   "categories_name" - lista nazw kategorii klasyfikacyjnych, gdzie pojedyncza nazwa kategorii jest typu <code>str</code>
   "categories_count" - lista ilości przykładów z pozszczególnych kategorii w danych treningowych 
   "labels" - lista etykiet wszystkich danych treningowych
 }
 </code>
 
 Parametry:
    
 - <code>input_dir</code> przyjmuje wartość  ścieżki katalogu 'train_sw'
 - <code>newSize</code> przyjmuje wartość  krotki określającej rozmiar przeskalowanych obrazów


In [7]:
def load_train_data(input_dir, newSize=(64, 64)):
    """ """

    import numpy as np
    import pandas as pd
    import os
    from skimage.io import imread
    import cv2 as cv
    from pathlib import Path
    import random
    from shutil import copyfile, rmtree
    import json

    import seaborn as sns
    import matplotlib.pyplot as plt

    import matplotlib

    image_dir = Path(input_dir)
    categories_name = []
    for file in os.listdir(image_dir):
        d = os.path.join(image_dir, file)
        if os.path.isdir(d):
            categories_name.append(file)

    folders = [directory for directory in image_dir.iterdir() if directory.is_dir()]

    train_img = []
    categories_count = []
    labels = []
    for i, direc in enumerate(folders):
        count = 0
        for obj in direc.iterdir():
            if (
                os.path.isfile(obj)
                and os.path.basename(os.path.normpath(obj)) != "desktop.ini"
            ):
                labels.append(os.path.basename(os.path.normpath(direc)))
                count += 1
                img = imread(obj)  # zwraca ndarry postaci xSize x ySize x colorDepth
                img = cv.resize(
                    img, newSize, interpolation=cv.INTER_AREA
                )  # zwraca ndarray
                if img[0][0].size==3:
                    img =  np.dstack((img,np.ones((64,64,1))))
                img = img / 255  # normalizacja
                train_img.append(img)
        categories_count.append(count)
    X = {}
    X["values"] = np.array(train_img)
    X["categories_name"] = categories_name
    X["categories_count"] = categories_count
    X["labels"] = labels
    return X


In [8]:
data = load_train_data("./train_test_sw/train_sw")
print(data["categories_name"])
print(data["categories_count"])
print([data["labels"][50], data["labels"][200], data["labels"][300]])
print(list(data["values"][100][1][10]))


['Beech', 'Gardenia', 'Lemon', 'Mean', 'Tomato']
[406, 206, 412, 412, 412]
['Beech', 'Beech', 'Beech']
[0.3843137254901961, 0.08627450980392157, 0.11764705882352941, 1.0]


#### 3. Funkcja <code>load_test_data(input_dir, newSize=(64,64))</code>

Funkcja ta wczytuje dane testowe, przeskalowuje je do rozmiaru podanego w drugim parametrze (z
interpolacją typu cv2.INTER_AREA), przeprowadza ich normalizację i zwraca słownik w formacie:

<p>
    
<code>
{  "data" - lista (typ <code>list</code>) obrazów, gdzie pojedynczy obraz jest tablicą <code>numpy.array</code> zapisaną wierszowo blokami rgb
   "categories_name" - lista nazw kategorii klasyfikacyjnych, gdzie pojedyncza nazwa kategorii jest typu <code>str</code>
   "categories_count" - lista ilości przykładów z pozszczególnych kategorii w danych testowych 
   "labels" - lista etykiet wszystkich danych testowych
 }
 </code>
 
 Parametry:
    
 - <code>input_dir</code> przyjmuje wartość  ścieżki katalogu 'test_sw'
 - <code>newSize</code> przyjmuje wartość  krotki określającej rozmiar przeskalowanych obrazów


In [9]:
def load_test_data(input_dir, newSize=(64, 64)):
    """ """

    import numpy as np
    import pandas as pd
    import os
    from skimage.io import imread
    import cv2 as cv
    from pathlib import Path
    import random
    from shutil import copyfile, rmtree
    import json

    import seaborn as sns
    import matplotlib.pyplot as plt

    import matplotlib

    image_path = Path(input_dir)

    labels_path = image_path.parents[0] / "test_labels.json"

    # with labels_path.open("r", encoding ="utf-8") as f:
    jsonString = labels_path.read_text()
    objects = json.loads(jsonString)

    # print(objects)

    categories_name = []
    categories_count = []
    count = 0
    c = objects[0]["value"]
    for e in objects:
        if e["value"] != c:
            # print(count)
            # print(c)
            categories_count.append(count)
            c = e["value"]
            count = 1
        else:
            count += 1
        if not e["value"] in categories_name:
            categories_name.append(e["value"])

    categories_count.append(count)

    test_img = []

    labels = []
    for e in objects:
        p = image_path / e["filename"]
        img = imread(p)  # zwraca ndarry postaci xSize x ySize x colorDepth
        img = cv.resize(img, newSize, interpolation=cv.INTER_AREA)  # zwraca ndarray
        img = img / 255  # normalizacja
        test_img.append(img)
        labels.append(e["value"])

    X = {}
    X["values"] = np.array(test_img)
    X["categories_name"] = categories_name
    X["categories_count"] = categories_count
    X["labels"] = labels
    return X


In [10]:
data = load_test_data("./train_test_sw/test_sw")
print(data["categories_name"])
print(data["categories_count"])
print([data["labels"][10], data["labels"][20], data["labels"][30]])
print(list(data["values"][10][1][10]))


['Beech', 'Gardenia', 'Lemon', 'Mean', 'Tomato']
[51, 52, 52, 52, 52]
['Beech', 'Beech', 'Beech']
[0.058823529411764705, 0.00784313725490196, 0.0, 1.0]


## Skład zespołu:
- Jakub Eichner 478874
- Mateusz Ogrodowczyk 478841
- Julian Zabłoński 478831


#### 4. Zadanie 1 (4pkt):

Napisz kod klasy <code>KNearestNeighbor</code> implementującej klasyfikator <i>knn</i>. Należy zimplementować następujące metody:

- <code>konstruktor</code> pobierający listę obrazów treningowych (zgodną zw składową 'values' wczytanego słownika) oraz listę ich etykiet
- metoda <code>l_p_metric(image1, image2, p):</code> zwracająca wartość odległości pomiędzy dwoma obrazami, mierzoną normą typu <i>l_p</i> - parametr <code>p</code> określa 'potęgę' normy
- metoda <code>predict(test_images, k,p):</code> zwracająca listę prognozowanych etykiet dla obrazów testowych (parametr <code>test_images</code>). Paramter drugi określa liczbę przeszukiwanych sąsiadów, natomiast paramter trzeci określa potęgę wybranej metryki.
- metoda <code>accuracy(test_images, k,p)</code> zwracająca dokładność klasyfikatora na zbiorze testowym. Parametr drugi i trzeci są jak w metodzie <code>predict()</code>


In [11]:
from collections import Counter


class KNearestNeighbor(object):
    def __init__(self, values, labels) -> None:
        self.values = values
        self.labels = labels
        #self.imgs_no = len(self.values)

    def l_p_metric(self, image1, image2, p, i):
        # if (i[1] + 1) % 500 == 0 or i[1] + 1 == self.imgs_no:
#         if i[1] + 1 == self.imgs_no and (
#             (i[0] + 1) % 25 == 0 or i[0] == self.currently_predicted_no
#         ):
#             print(
#                 f"Calculating distance between the {i[0]+1}/{self.currently_predicted_no} image for the {i[1]+1}/{self.imgs_no} value"
#             )
        return pow(np.sum(pow(abs(image1 - image2) + 1e-16, p)), 1 / p)

    def predict(self, test_images, K, P):
        self.currently_predicted_no = len(test_images)
        diffs = [
            [
                (
                    learned_data[1],
                    self.l_p_metric(test_img, learned_data[0], P, (img_no, i)),
                )
                for i, learned_data in enumerate(zip(self.values, self.labels))
            ]
            for img_no, test_img in enumerate(test_images)
        ]
        # we create a list of the K nearest neighbours for each test imag
        neighbours = [sorted(distances, key=lambda x: x[1])[:K] for distances in diffs]
        neigh_keys = [(x[0] for x in neighs_list) for neighs_list in neighbours]
        result_neighbours = [
            Counter(neighs).most_common(1)[0][0] for neighs in neigh_keys
        ]
        return np.array(result_neighbours)

    def accuracy(self, test_labels, pred_labels):
        comparison_results = [
            label == pred for label, pred in zip(test_labels, pred_labels)
        ]
        return sum(comparison_results) / len(comparison_results)

    def __str__(self):
        return f"Values {self.values}\nLabels: {self.labels}"


In [12]:
from sklearn.preprocessing import LabelEncoder

data_train = load_train_data("./train_test_sw/train_sw", newSize=(64, 64))
X_train = data_train["values"]
y_train = data_train["labels"]


data_test = load_test_data("./train_test_sw/test_sw", newSize=(64, 64))
X_test = data_test["values"]
y_test = data_test["labels"]


class_le = LabelEncoder()
y_train_enc = class_le.fit_transform(y_train)
y_test_enc = class_le.fit_transform(y_test)


In [13]:
Knn = KNearestNeighbor(X_train, y_train_enc)

pred = Knn.predict(test_images=X_test, K=1, P=1)
print(f'Accuracy for 1 neighbour and P=1 → {Knn.accuracy(y_test_enc, pred)}')

pred = Knn.predict(X_test, K=1, P=2)
print(f'Accuracy for 1 neighbour and P=2 → {Knn.accuracy(y_test_enc, pred)}')

pred = Knn.predict(X_test, K=5, P=1)
print(f'Accuracy for 5 neighbours and P=1 → {Knn.accuracy(y_test_enc, pred)}')

pred = Knn.predict(X_test, K=5, P=2)
print(f'Accuracy for 5 neighbour and P=2 → {Knn.accuracy(y_test_enc, pred)}')

pred = Knn.predict(X_test, K=10, P=1)
print(f'Accuracy for 10 neighbours and P=1 → {Knn.accuracy(y_test_enc, pred)}')

pred = Knn.predict(X_test, K=10, P=2)
print(f'Accuracy for 10 neighbours and P=2 → {Knn.accuracy(y_test_enc, pred)}')


Accuracy for 1 neighbour and P=1 → 0.583011583011583
Accuracy for 1 neighbour and P=2 → 0.5521235521235521
Accuracy for 5 neighbours and P=1 → 0.555984555984556
Accuracy for 5 neighbour and P=2 → 0.5444015444015444
Accuracy for 10 neighbours and P=1 → 0.5019305019305019
Accuracy for 10 neighbours and P=2 → 0.5019305019305019


#### 5. Zadanie 2 (2pkt):

Napisz kod funkcji <code>crossValidation(X,y, n = 10, k=1,p=1 ):</code> obliczającą algorytm <code>knn </code> z n-krotną walidacją krzyżową.


In [14]:
from itertools import chain


def crossValidation(X, y, n=10, k=1, p=1):
    """
    :param X: train data
    :param y: encoded labels of train data
    :param n: value for n-fold cross validation
    :param k: k value  for knn
    :param p: p value for l_p metric
    :return:
    """
    # BEGIN SOLUTION 
    s_x = np.split(X, n)
    s_y = np.split(y, n)

    accs = []

    for i in range(n):
        test_x, test_y = s_x[i], s_y[i]
        train_x, train_y = chain(*s_x[:i], *s_x[i+1:]), chain(*s_y[:i], *s_y[i+1:])
        Knn  = KNearestNeighbor(train_x, train_y)
        pred = Knn.predict(test_x,k,p)
        accs.append(Knn.accuracy(test_y,pred))

    #print(accs)
    print(f'Avarage accuracy => {np.mean(accs)}')
    return accs

    #END SOLUTION 


In [15]:
res = crossValidation(X_train, y_train_enc, n=len(X_train), k=5, p=1)


Avarage accuracy => 0.479978354978355


#### 6. Zadanie 3 (4pkt):

Napisz kod klasy <code>LogisticRegression</code> implementującej klasyfikator <i>wieloklasowej regresji logistycznej</i> z funkcją <code>softmax()</code> (ze standardowymi nazwami dwóch kluczowych funkcji: <i>fit()</i>, <i>predict()</i>). Zastosuj ten kod do pobranych danych (zbiór walidacyjny losujemy ze zbioru treningowego) - oblicz następujące charakterystyki modelu dla danych walidacyjnych oraz treningowych: dokładność (accuracy), precyzję (precision), czułość(recall) oraz F1 - dla poszczególnych klas oraz globalnie (zob. np. <a href="https://medium.com/synthesio-engineering/precision-accuracy-and-f1-score-for-multi-label-classification-34ac6bdfb404">tu</a>).


In [16]:
class LogisticRegression:
    def __init__(self, data_shape, classes_no) -> None:
        self.w = np.zeros(shape=(classes_no, data_shape[0], 1))
        self.b = np.zeros(shape=(classes_no, 1))
        print(f"self.w.shape = {self.w.shape}\nself.b.shape = {self.b.shape}")

    def sigmoid(self, z):
        return 1 / (1 + np.exp(-z)) - 0.0000001

    def propagate(self, w, b, X, Y):
        data_len = X.shape[1]
        
        A = self.sigmoid((np.dot(w.T, X) + b))
        
        cost = np.sum(((-np.log(A)) * Y + (-np.log(1 - A)) * (1 - Y))) / data_len
        
        cost = np.squeeze(cost)
        
        dw = (np.dot(X, (A - Y).T)) / data_len
        
        db = (np.sum(A - Y)) / data_len

        grads = {"dw": dw, "db": db}
        return grads, cost

    def train(self, X, Y, num_iterations, lr, verbose=False):
        error_vals = [[] for i in range(self.w.shape[0])]
        for i in range(num_iterations):
            if (i + 1) % 100 == 1 or i == num_iterations - 1:
                print("Iteration:", i)
            for ent_class in range(self.w.shape[0]):
                
                grads, cost = self.propagate(
                    self.w[ent_class], self.b[ent_class], X, Y[ent_class]
                )
                # print(f"\t{i}before w update")
                self.w[ent_class] -= lr * grads["dw"]
                # print("\tbefore b update")
                self.b[ent_class] -= lr * grads["db"]
                if (i + 1) % 100 == 1 or i == num_iterations - 1:
                    error_vals[ent_class].append(cost)
                    if verbose:
                        print(f"\tClass: {ent_class} → error: {cost}")
        return {
            "parameters": {"w": self.w, "b": self.b},
            "gradients": grads,
            "error_values": error_vals,
        }

    def predict(self, X):
        data_len = X.shape[1]
        
        A = np.array(
            [
                self.sigmoid(np.dot(current_w.reshape(X.shape[0], -1).T, X) + current_b)
                for current_w, current_b in zip(self.w, self.b)
            ]
        )
        Y_pred = [(class_preds > 0.5) * 1.0 for class_preds in A]  # A * 5.0
        return Y_pred

    def validate(self, test_data, true_labels):
        preds = self.predict(test_data)
        return f"Accuracy for the given data → {100 - np.mean(np.abs(preds - true_labels))*100}"


def prepare_data(x_data, y_data):
    x = x_data.reshape(x_data.shape[0], -1).T
    y = np.array([[int(x == i) for i in y_data] for x in range(len(set(y_data)))])
    return x, y


In [17]:
X_train_flat, y_train_enc_multi = prepare_data(X_train, y_train_enc)

#Logistic Regression training
log_reg = LogisticRegression(X_train_flat.shape, 5)
training_result = log_reg.train(
    X_train_flat, y_train_enc_multi, num_iterations=2000, lr=0.0029, verbose=True
)


self.w.shape = (5, 16384, 1)
self.b.shape = (5, 1)
Iteration: 0
	Class: 0 → error: 0.693147068438753
	Class: 1 → error: 0.6931470251487096
	Class: 2 → error: 0.6931470697374542
	Class: 3 → error: 0.6931470697374542
	Class: 4 → error: 0.6931470697374542
Iteration: 100
	Class: 0 → error: 0.5157444236726193
	Class: 1 → error: 0.28246828140335944
	Class: 2 → error: 0.5252240196030604
	Class: 3 → error: 0.5189321738772411
	Class: 4 → error: 0.5222111819329334
Iteration: 200
	Class: 0 → error: 0.5059211686007784
	Class: 1 → error: 0.2777127183186381
	Class: 2 → error: 0.5186898143236112
	Class: 3 → error: 0.5142815434224718
	Class: 4 → error: 0.5181261463700836
Iteration: 300
	Class: 0 → error: 0.4972898240996385
	Class: 1 → error: 0.2749221120401529
	Class: 2 → error: 0.5128841297366997
	Class: 3 → error: 0.5102354194969786
	Class: 4 → error: 0.5146483236050161
Iteration: 400
	Class: 0 → error: 0.4895375243174669
	Class: 1 → error: 0.2729949006339826
	Class: 2 → error: 0.5075623954318662
	C

In [18]:
X_test_flat, y_test_enc_multi = prepare_data(X_test, y_test_enc)

preds = log_reg.predict(X_test_flat)
size = np.size(preds - y_test_enc_multi)
fp = np.sum(np.abs(preds - y_test_enc_multi))
tp = size - fp
fn = np.count_nonzero((preds - y_test_enc_multi)>0)

print(f'Accuracy for the given data → {100 - np.mean(np.abs(preds - y_test_enc_multi))*100}%')
print(f'Precision for the given data → { tp/(tp+fp) }')
print(f'Recall for the given data → { tp/(tp+fn) }')
print(f'F1 score for the given data → { 2*tp / (2*tp + fp + fn) }')



#print(preds)

Accuracy for the given data → 79.58301158301158%
Precision for the given data → 0.7958301158301159
Recall for the given data → 0.9930622470610908
F1 score for the given data → 0.8835733882030178


#### 7. Zadanie 4 (1pkt):

Oblicz ile danych z poszczególnych klas znajduje się po dodatniej/ujemnej stronie hiperpłaszczyzny klasyfikacyjnej dla danej klasy


In [19]:
# BEGIN SOLUTION (zad. 4)

classes_ids = {
    'Beech': 0,
    'Gardenia': 1,
    'Lemon': 2,
    'Mean': 3,
    'Tomato': 4
}


for k, v in classes_ids.items():
    print(f'{k} → {np.sum(np.abs(preds[v] - y_test_enc_multi[v]))} / {y_test.count(k)}')

# END SOLUTION


Beech → 44.0 / 51
Gardenia → 52.0 / 52
Lemon → 52.0 / 52
Mean → 52.0 / 52
Tomato → 54.0 / 52


#### 8. Dwa uzupełnienia

W pliku [lab1_add](lab1_add.ipynb) znajdują się minimalne podstawy teoretyczne związane z konstrukcją funkcji kosztu oraz algorytmami optymalizacji.
