Regresja logistyczna napisana od zera, tylko przy użyciu pakietu numpy.
Algorytm SGD z momentum i regularyzacją.

Zbiór MNIST dostępny jest pod linkami: 

(zbiór treningowy):
 - http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
 - http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz

(zbiór walidacyjny):
 - http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
 - http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz



In [27]:
import numpy as np

class Model:
    
  def tune(self, weights, alpha=0.05, beta = 0.98, lamb = 0.9) -> None:
    '''Wagi modelu to wektor przechowujący wszystkie współczynniki odnoszące się do każdego 
    piksela plus stałą.
    '''
    self.weights = weights 
    self.alpha = alpha
    self.beta = beta
    self.lamb = lamb
    self.primes = [2,3,5,7]
    
  @staticmethod
  def _sigmoid(s):
    '''Metoda odwzorowująca sigmoid'''
    s = np.exp(-s)
    return 1.0 / (1.0 + s)

  def predict(self, X: np.ndarray) -> np.ndarray:
    '''Metoda zwracająca przewidywanie modelu'''
    x_vec = np.array(np.insert(X, 0, 1))
    return self._sigmoid(np.dot(self.weights, x_vec))

  def _cost_per_point(self, X: np.ndarray, y: np.ndarray):
    '''Metoda zwracająca koszt regresji liniowej dla pojedynczego punktu'''
    y_pred = self.predict(X)
    # Sprawdzane jest, czy etykietka należy do cyfr pierwszych, jeżeli tak
    # prawdopodobieństwo 1
    y_true = 1.0 if y in self.primes else 0.0
    cost_per_point = y_true * np.log(y_pred) + (1.0 - y_true) * np.log(1 - y_pred)
    return round(cost_per_point)

  def _total_cost(self, X: np.ndarray, y: np.ndarray):
    '''Metoda sumująca koszt regresji zbioru'''
    cost = 0.0
    # Dodawany jest koszt dla każdego punktu
    for img, label in zip(X, y):
        cost+= self._cost_per_point(img, label)
    # Dodatkowo jako część regularyzacji dodawany jest kwadrat wag
    cost -= (self.lamb*np.dot(np.transpose(self.weights), self.weights))/(2*self.weights.size)
    return cost

  def _shuffle_datasets(self, images: np.ndarray, labels: np.ndarray, proportion = 0.8, random_seed:int = -1):
    '''Metoda do ponownego przetasowania danych i podzielenia ich na zbiory - w tym wypadku treningowy 
    i walidacyjny. Do trenowania modelu postanowiłam wykorzystać metodę walidacji krzyżowej.
    '''
    if(random_seed<0):
        np.random.seed()
    else:
        np.random.seed(random_seed)
    # Aby nie pomieszać obrazków i etykietek, najpierw przetasowuję indeksy
    total = np.arange(labels.size,dtype=np.int64)
    np.random.shuffle(total)
    # Puste tablice na przetasowane etykietki i obrazki
    img = np.empty(images.size,dtype=np.object)
    lab = np.empty(images.size,dtype=np.int64)
    for i in range(0, labels.size):
        # Do tablic na obrazki i etykietki trafiają obiekty o nowym indeksie
        img[i] = images[total[i]]
        lab[i] = labels[total[i]]
    # Podział obrazków i etykietek na treningowe i testowe
    split = [round(0.8*total.size)]
    img = np.array_split(img, split, axis = 0)
    lab = np.array_split(lab, split)
    return img[0], img[1], lab[0], lab[1]
    
  def fit(self, X: np.ndarray, y: np.ndarray, epochs:int = 3, after:int = 3, verbose:bool = True) -> None:
    '''Metoda odpowiadająca za trenowanie modelu. Podczas przechodzenia w losowej 
    kolejności przez elementy zbioru treningowego szacowany jest koszt przejścia 
    przez zbiór treningowy (do późniejszego porównania z poprzednią iteracją i
    przerwania, gdyby koszt zaczął rosnąć). W podobnych celach wykorzystywany jest
    koszt na zbiorze walidacyjnym, obliczany co pełne przejście przez zbiór.
    '''
    previous_compute_cost = self._total_cost(X, y)
    best_weights = self.weights
    # Ze względu na losową naturę SGD, algorytm powtarzany jest epochs razy.
    # Na końcu zapisywane są wagi, które pozwoliły na otrzymanie najmniejszego
    # całkowitego kosztu.
    
    for e in range(0, epochs):
        # przetasowanie zbioru, podział na testowy i walidacyjny
        train, val, label_train, label_val =\
        self._shuffle_datasets(X, y, random_seed = int(abs(previous_compute_cost)))
        random_inputs = list(zip(train, label_train))
        # obliczenie całkowitego kosztu na zbiorze walidacyjnym
        # i treningowym, dla porównania później
        previous_val_loss = self._total_cost(val, label_val)
        previous_train_cost = self._total_cost(train, label_train)
        
        cost_index = 0
        val_index = 0
        momentum = 0
            
        weights_of_min = self.weights
        loss_of_min = previous_val_loss
            
        # iteracja SGD po zbiorze następuje tak długo, aż przez ilość
        # iteracji ustalonych w after koszt treningowy nie będzie maleć,
        # albo koszt na zbiorze walidacyjnym będzie większy od najmniejszego
        # (oznacza to, ze osiągnęliśmy minimum i cza przerwać funkcję)
        while(cost_index<after and val_index <after):
            train_cost = 0
                
            for i in range(train.size):
                x_vec = np.array(np.insert(random_inputs[i][0], 0, 1))
                y_pred = self.predict(random_inputs[i][0])
                y_true = 1.0 if random_inputs[i][1] in self.primes else 0.0
                # obliczany jest gradient dla każdego punktu
                gradient = (y_pred - y_true) * x_vec
                # obliczane jest momentum dla kolejnego punktu
                momentum = self.beta * momentum + (1-self.beta)* gradient
                # dodawana jest kolejna wartość do kosztu treningowego
                train_cost += self._cost_per_point(random_inputs[i][0], random_inputs[i][1])
                # wagi aktualizowane są zgodnie z obliczonym gradientem i momentum
                # oraz dodaną regularyzacją
                self.weights -= self.alpha * (momentum + self.lamb*self.weights/self.weights.size)
                
            # obliczanie kosztu zbioru walidacji
            val_loss = self._total_cost(val, label_val)
            # koszt zbioru treningowego aktualizowany jest o regularyzację
            train_cost -= (self.lamb*np.dot(np.transpose(self.weights), self.weights))/(2*self.weights.size)
             
            # porównanie kosztu treningowego obecnego i poprzedniego
            # ponieważ koszt zawsze jest ujemny, szukany jest koszt o mniejszej wartości
            # bezwzględnej
            previous_train_cost = np.around(previous_train_cost, decimals = 6)
            train_cost = np.around(train_cost, decimals = 6)
            
            cost_index = cost_index + 1 if(previous_train_cost>=train_cost) else 0
                  
            # podobnie koszt zbioru walidacyjnego     
            if(np.around(loss_of_min, decimals = 6)>= np.around(val_loss, decimals = 6)):
                val_index += 1
            else:
                # przechowywane są wagi, dla których koszt 
                # zbioru walidacyjnego był najkorzystniejszy
                val_index = 0
                weights_of_min = self.weights
                loss_of_min = val_loss
                    
            previous_train_cost = train_cost
            previous_val_loss = val_loss
            if(verbose):
                print(f'epoch: {e}')
                print(f'training cost: {train_cost}')
                print(f'validation loss: {val_loss}')
                print(f'training index: {cost_index}')
                print(f'validation index: {val_index}')
                print('-----------------------------------')
                
        self.weights = weights_of_min
        compute_cost = self._total_cost(X, y)
        # jeżeli wagi osiągnięte w tej epoce zwracają lepszy koszt od poprzednich
        # zapisz je jako najlepsze
        if(previous_compute_cost < compute_cost):
            best_params = self.weights
        previous_compute_cost = compute_cost
        self.weights = np.zeros(len(train[0]) + 1)
        
    # Uaktualnij wagi do najlepszych
    self.weights = best_weights

  @staticmethod
  def evaluate(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    # Oblicz skuteczność
    total = 0.0
    correct = 0.0
    for true, pred in zip(y_true, y_pred):
        total += 1   
        if true >= 0.5 and pred == 1:
            # Właściwa klasyfikacji cyfry pierwszej
            correct += 1
                
        if true < 0.5 and pred != 1:
            # Właściwa klasyfikacja cyfry złożonej
            correct += 1
    accuracy = correct / total
    return accuracy


In [24]:
'''Pliki z danymi zostały uprzednio rozpakowane w tym samym katalogu, w którym znajduje się notebook'''
def read_labels(filepath: str) -> np.ndarray:
    '''Metoda odczytująca etykiety obrazków z pliku zgodnie ze specyfikacją ze strony MNIST'''
    try:
        f = open(filepath, 'rb')
        # pierwsze cztery bajty to magiczna liczba
        f.read(4)
        if f.mode == 'rb':
            # kolejne cztery zawierają informację o ilości etykiet
            buf = f.read(4)
            number_of_labels = int.from_bytes(buf,'big')
            result = np.empty(number_of_labels)
            for i in range(0,number_of_labels):
                # odczytywanie etykiet bajt po bajcie
                buf = f.read(1)
                label = np.frombuffer(buf, dtype=np.uint8).astype(np.int64)
                result[i] = label
            return result
    except:
        print("MNIST files not in directory/unable to read")

In [3]:
def read_images(filepath: str) -> np.ndarray:
    '''Metoda odczytująca  z pliku zgodnie ze specyfikacją ze strony MNIST'''
    try:
        f = open(filepath, 'rb')
        # pierwsze cztery bajty to, znowu, magiczna liczba
        f.read(4)
        if f.mode == 'rb':
            # kolejne zawierają liczbę zapisanych obrazów
            buf = f.read(4)
            number_of_images = int.from_bytes(buf,'big')
            result = np.zeros(number_of_images, dtype ='object')
            # ilość rzędów w pojedynczym obrazie
            buf = f.read(4)
            number_of_rows = int.from_bytes(buf,'big')
            # ilość kolumn w pojedynczym obrazie
            buf = f.read(4)
            number_of_columns = int.from_bytes(buf,'big')
            for i in range(0, number_of_images):
                # ze względu na wygodę późniejszych obliczeń zdecydowałam się 
                # zapisać obrazy w postaci wektora o długości 28*28 i szerokości 1
                result[i] = np.zeros([number_of_rows*number_of_columns], dtype='float')
                for j in range(0, number_of_rows*number_of_columns):
                    buf = f.read(1)
                    # Ze względu na wygodę późniejszych obliczeń piksele są normalizowane
                    # ze skali 0-256 do 0-1
                    pixel = np.frombuffer(buf, dtype=np.uint8).astype(np.float64)/256
                    if(pixel!=0): result[i][j] = pixel
            return result
    except:
        print("MNIST files not in directory/unable to read")

In [4]:
def delete_elements(images: np.ndarray, labels: np.ndarray, elements: np.ndarray) -> (np.ndarray, np.ndarray):
    '''Metoda usuwająca niepasujące wartości (tutaj 0 i 1)'''
    to_delete = []
    for i in range(0, len(labels)):
        # Sprawdź, czy element należy do do usunięcia 
        # Jeżeli tak, dodaj indeks do listy
        if(labels[i] in elements):
            to_delete.append(i)
    # Usunięcie niepotrzebnych elementów
    images = np.delete(images, to_delete, axis = 0)
    labels = np.delete(labels, to_delete)
    return images, labels

In [6]:
# Odczytanie informacji z pliku
images = np.concatenate((read_images("train-images.idx3-ubyte"), read_images("t10k-images.idx3-ubyte")))
labels = np.concatenate((read_labels("train-labels.idx1-ubyte"), read_labels("t10k-labels.idx1-ubyte")))
# Usunięcie obrazków oznaczonych wybranymi etykietami (tu 0 i 1)
images, labels = delete_elements(images, labels, [0,1])

In [7]:
# Przetasowanie zbiorów (aby uniknąć redundancji, używam metody przetasowywania z modelu)
primes_train, primes_test, primes_label_train, primes_label_test = Model()._shuffle_datasets(images, labels)

In [28]:
'''Wyznaczenie odpowiednich parametrów. Aby uniknąć nadmiernego dopasowania modelu i 
eksplodujących gradientów, postanowiłam dodać do algorytmu regularyzację
'''
# Wartość kroku gradientu w SGD
alpha = 5e-3
# Wartość parametru beta, używanego w momentum
beta = 0.99
# Wartość parametru lambda, koniecznego do regularyzacji 
lamb = 0.9
# Startowe wartości wag. Ponieważ używamy znormalizowanych 
# wartości pikseli, wygodnie jest zacząć od wag równych 0
start_weights = np.zeros(len(primes_train[0]) + 1)

primes_model = Model()
primes_model.tune(start_weights, alpha = alpha, beta = beta, lamb = lamb)
primes_model.fit(primes_train, primes_label_train)

epoch: 0
training cost: -7394.00805
validation loss: -1628.0080497061883
training index: 0
validation index: 0
-----------------------------------
epoch: 0
training cost: -6726.010683
validation loss: -1597.010682783094
training index: 0
validation index: 0
-----------------------------------
epoch: 0
training cost: -6635.01217
validation loss: -1578.0121699951098
training index: 0
validation index: 0
-----------------------------------
epoch: 0
training cost: -6598.013084
validation loss: -1576.0130841821317
training index: 0
validation index: 0
-----------------------------------
epoch: 0
training cost: -6587.013678
validation loss: -1577.0136776975824
training index: 0
validation index: 1
-----------------------------------
epoch: 0
training cost: -6569.014077
validation loss: -1569.0140772913921
training index: 0
validation index: 0
-----------------------------------
epoch: 0
training cost: -6561.014353
validation loss: -1565.0143531188305
training index: 0
validation index: 0
---

In [29]:
pred = []
test = []
for image , label in zip(primes_test, primes_label_test):
    pred.append(primes_model.predict(image))
    # Przygotowanie etykietek do porównania
    right_label = 1.0 if label in [2,3,5,7] else 0.0
    test.append(right_label)
primes_accuracy = primes_model.evaluate(np.array(pred), np.array(test))
print(f'Accuracy of predicting a prime digit in test set: {primes_accuracy}')

Accuracy of predicting a prime digit in test set: 0.9154291923216226


'''Wnioski: 
- Podczas trenowania modelu osiagnięto skuteczność około 91.15%,
- Próba skorzystania z modelu zbiorowego pozwoliła na podniesienie skuteczności do 92.21%, jednak znacznie wydłużyła czas trenowania i ciężar modelu,
- Nasuwa się także pytanie dotyczące tego, jak mały błąd możemy osiagnąć w wyznaczonym problemie. Jeżeli nasz model zastosowany do wyznaczenia pojedynczej cyfry potrafi ją rozpoznać ze skutecznością x, czy skuteczność modelu tego samego typu rozpoznającego jedną z n cyfr możemy maksymalnie oszacować na x^n,
- Dodanie regularyzacji okazało się korzystną decyzją. Wydłużyło czas trenowania modelu, ale pozwoliło na uniknięcie problemu eksplodujących wag (nagle pojawiających się wartości NaN).
'''

In [None]:
'''Bibliografia:
- https://distill.pub/2017/momentum/
- https://web.stanford.edu/~jurafsky/slp3/5.pdf
- https://cilvr.cs.nyu.edu/diglib/lsml/bottou-sgd-tricks-2012.pdf
'''