In [213]:
from fastai.vision.all import *
from fastbook import *
import pickle

In [214]:
torch.cuda.is_available()

True

## Image loading

In [215]:
path = untar_data(URLs.MNIST)

In [216]:
def loadImages(paths):
    images = [Image.open(p) for p in paths]
    classes = [p.parent.name for p in paths]
    return (images, classes)

In [217]:
loadImages([
    path/"training"/"5"/"0.png",
    path/"training"/"9"/"10003.png",
    path/"testing"/"4"/"1059.png"
])

([<PIL.PngImagePlugin.PngImageFile image mode=L size=28x28>,
  <PIL.PngImagePlugin.PngImageFile image mode=L size=28x28>,
  <PIL.PngImagePlugin.PngImageFile image mode=L size=28x28>],
 ['5', '9', '4'])

In [218]:
import glob

def allImages(path):
    return list(path.glob("**/*.png"))

testImages = allImages(path)
assert len(testImages) > 0
for image in testImages: assert image.name.endswith(".png") 
len(testImages), type(testImages[0])

(70000, pathlib.WindowsPath)

## Model

In [219]:
class Model:
    def __init__(self, imageSize, categoryCount):
        self.learningEnabled = True
        hiddenSize = imageSize[0] * imageSize[1]
        self.w1 = self.__initParams(imageSize[0] * imageSize[1], hiddenSize)
        self.b1 = self.__initParams(hiddenSize)
        
        self.w2 = self.__initParams(hiddenSize, categoryCount)
        self.b2 = self.__initParams(categoryCount)
        
        self.params = [self.w1, self.b1, self.w2, self.b2]
        
    def applyModel(self, batch):
        hiddenLayer1 = F.relu(batch@self.w1 + self.b1)
        return hiddenLayer1@self.w2 + self.b2
    
    def fit(self, lr):
        if not self.learningEnabled:
            raise Exception("Learning is diabled")
            
        for p in self.params:
            p.data -= p.grad.data * lr
            p.grad.zero_()
    
    def __initParams(self, *size):
        return (torch.rand(size) * 0.01).requires_grad_()
    
    def disableLearning(self):
        self.learningEnabled = False
        for p in self.params:
            p.requires_grad = False

## Learning preparation

### Loss function

In [220]:
def mse(predictedProbabilities, targetProbabilities):
    return torch.square(targetProbabilities - predictedProbabilities).mean()

In [221]:
lossFunction = nn.BCELoss()

In [222]:
test1 = lossFunction(
    tensor([
        [0.7, 0.4, 0.9], # wrong prediction
    ]),
    tensor([
        [0, 0, 1],
    ]).float()
)
test2 = lossFunction(
    tensor([
        [0.6, 0.3, 0.9], # less wrong prediction
    ]),
    tensor([
        [0, 0, 1],
    ]).float()  
)
test1, test2

(tensor(0.6067), tensor(0.4594))

In [223]:
def calculateAccuracy(targetPredictions, predictions):
    border = 0.5
    return ((predictions > border) == (targetPredictions > border)).all(dim=1).float().mean()

calculateAccuracy(
    tensor([
        [0, 0, 1],
        [0, 1, 0],
        [1, 0, 0]
    ]),
    tensor([
        [0.7, 0.4, 0.9], # wrong prediction
        [0.2, 0.61, 0.48], # correct prediction
        [0.2, 0.1, 0.3] # wrong prediction
    ])
)

tensor(0.3333)

In [224]:
class BatchLoader:
    def __init__(self, images, batchSize):
        self.images = images.copy()
        random.shuffle(self.images)
        self.batchSize = batchSize
        self.nextBatch = 0 
    def nextBatch(self):
        batchStartIndex = min(self.batchSize * self.nextBatch, len(self.images))
        batchEndIndex = min(startBatchFrom + self.batchSize, len(self.images))
        if batchStartIndex == batchEndIndex:
            return None
        else:
            return self.images[batchStartIndex:batchEndIndex]

In [225]:
class BatchLoader:
    def __init__(self, items, batchSize):
        self.items = items.copy()
        random.shuffle(self.items)
        self.batchSize = batchSize
        self.nextBatch = 0 
    def getNextBatch(self):
        batchStartIndex = min(self.batchSize * self.nextBatch, len(self.items))
        batchEndIndex = min(batchStartIndex + self.batchSize, len(self.items))
        if batchStartIndex == batchEndIndex:
            return None
        else:
            self.nextBatch += 1
            return self.items[batchStartIndex:batchEndIndex]
       
    
testItems = [1, 2, 3, 4, 5, 6, 7]
testLoader = BatchLoader(testItems, 4)
receivedItems = testLoader.getNextBatch() + testLoader.getNextBatch()
receivedItems.sort()
assert testItems == receivedItems, "received aren't the same as tests " + receivedItems
assert testLoader.getNextBatch() == None

In [226]:
def loadBatchOfImages(paths):
    images = [tensor(Image.open(p)).view(-1) for p in paths]
    return torch.stack(images).float()/255

In [227]:
def loadBatch(paths):
    images = loadBatchOfImages(paths)
    classes = [p.parent.name for p in paths]
    return (images, classes)
    
testBatchLoader = BatchLoader(allImages(path/"training"), 100)
trainingImages, trainingClasses = loadBatch(testBatchLoader.getNextBatch())
assert len(trainingImages) == len(trainingClasses)

In [228]:
def numberToPrediction(number):
    return [1.0 if i == number else 0.0 for i in range(10)]

def targetPredictions(classes):
    return tensor([numberToPrediction(int(c)) for c in classes])

targetPredictions(["0", "5", "9"])

tensor([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.]])

### The learning loop

In [229]:
def validateOneBatch(model, batch):
    images, classes = loadBatch(batch)
    predictions = model.applyModel(images).sigmoid()
    #print("predictions")
    #print(predictions)
    target = targetPredictions(classes)
    #print("targets")
    #print(target)
    return calculateAccuracy(target, predictions)

validateOneBatch(Model((28,28), 10), allImages(path/"testing/4")[:10])

tensor(0.)

In [230]:
def validateEpoach(model, validationSet, batchSize):
    batchLoader = BatchLoader(validationSet, batchSize)
    batch = batchLoader.getNextBatch()
    accuracies = []
    while batch != None:
        accuracies.append(validateOneBatch(model, batch))
        batch = batchLoader.getNextBatch()
    return round(torch.stack(accuracies).mean().item(), 4)

validateEpoach(Model((28,28), 10), allImages(path/"testing"), 10000)

0.0

In [231]:
def fitOneBatch(model, batch, lr):
    traingingImages, trainingClasses = loadBatch(batch)
    predictions = model.applyModel(traingingImages).sigmoid()
    #print("predictions " + str(predictions))
    target = targetPredictions(trainingClasses)
    #print("target " + str(target))
    criterion = nn.BCEWithLogitsLoss()
    loss = criterion(predictions,target)
    loss.backward()
    #print("Grad w1 " + str(model.w1.grad.mean()) + ", b1 " + str(model.b1.grad.mean()))
    model.fit(lr)
    print("Loss: " + str(loss.item()))

fitOneBatch(Model((28, 28), 10), allImages(path/"training/6")[:10], 1.)

Loss: 1.1406452655792236


In [232]:
def trainEpoach(model, trainingSet, validationSet,  batchSize, lr):
    batchLoader = BatchLoader(trainingSet, batchSize)
    batch = batchLoader.getNextBatch()
    while batch != None:
        fitOneBatch(model, batch, lr)
        batch = batchLoader.getNextBatch()
    accuracy = validateEpoach(model, validationSet, batchSize)
    print("Accuracy " + str(accuracy))
    
model = Model((28, 28), 10)    
trainEpoach(model, allImages(path/"training"), allImages(path/"testing"), 10000, 1.)

Loss: 1.1336984634399414
Loss: 1.0321985483169556
Loss: 0.7815778851509094
Loss: 0.726283848285675
Loss: 0.7160937786102295
Loss: 0.7106762528419495
Accuracy 0.0


In [None]:
model = Model((28, 28), 10)
for e in range(1, 4):
    trainEpoach(model, allImages(path/"training"), allImages(path/"testing"), 10000, 1. / e)

Loss: 1.1338011026382446
Loss: 1.033186435699463
Loss: 0.7807151675224304
Loss: 0.7266725897789001
Loss: 0.7160493731498718
Loss: 0.7106031179428101
Accuracy 0.0
Loss: 0.7076739072799683
Loss: 0.7065567970275879
Loss: 0.7052990794181824
Loss: 0.7046921849250793
Loss: 0.7039493918418884
Loss: 0.7034403085708618
Accuracy 0.0
Loss: 0.702660322189331
Loss: 0.7022919654846191
Loss: 0.7021487355232239


In [None]:
model.disableLearning()
with open('model.pkl', 'wb') as output:
    pickle.dump(model, output, pickle.HIGHEST_PROTOCOL)

In [None]:
def loadModel(path):
    with open(path, 'rb') as file:
        return pickle.load(file)

In [None]:
testModel = loadModel('model.pkl')
testClassificationResult = testModel.applyModel(loadBatchOfImages([path/"testing"/"4"/"1059.png"])).sigmoid()
testMSE = mse(targetPredictions(["4"]), testClassificationResult)
testClassificationResult, testMSE

In [None]:
validateOneBatch(testModel, allImages(path/"testing"/"4")[:4])

In [None]:
import nbformat
import ipynbname

notebook_path = ipynbname.path() 
output_file = "model.py"

# Load the notebook
with open(notebook_path, "r", encoding="utf-8") as f:
    notebook = nbformat.read(f, as_version=4)

# Extract code from cells with the "export" tag
exported_code = []
for cell in notebook.cells:
    if cell.cell_type == "code" and "tags" in cell.metadata:
        if "export" in cell.metadata.tags:
            exported_code.append(cell.source)

# Write the extracted code to a .py file
with open(output_file, "w", encoding="utf-8") as f:
    f.write("\n\n".join(exported_code))

print(f"Exported {len(exported_code)} functions to {output_file}")
