In [None]:
# model definition and training
import chainer
from PIL import Image
import numpy as np
import argparse
import glob
from time import strftime
import chainer
import chainer.functions as F
import chainer.links as L
from chainer.backends import cuda
from sklearn.metrics import mean_squared_error

import matplotlib.pyplot as plt
from chainer import training
from chainer.training import extensions

class WLClassifier(chainer.Chain):
    """Training wrapper that turns a predictor link into a loss-producing link.

    Calling the wrapper runs ``predictor`` on the first positional argument,
    scores the result against the last positional argument with
    :meth:`lossfun`, reports the value under the key ``'loss'`` and returns it.
    """

    def __init__(self, predictor):
        """Register ``predictor`` as a child link of this chain."""
        super(WLClassifier, self).__init__()
        with self.init_scope():
            self.predictor = predictor

    def lossfun(self, y, t):
        """Return ``F.sigmoid_cross_entropy`` of predictions ``y`` against targets ``t``."""
        return F.sigmoid_cross_entropy(y, t)

    def __call__(self, *args):
        """Compute, report and return the loss for one batch.

        Args:
            *args: positional batch fields; the first one is fed to the
                predictor and the last one is used as the target.

        Returns:
            The value produced by :meth:`lossfun` (also stored in ``self.loss``).
        """
        inputs, targets = args[0], args[-1]
        # Drop the results of any previous call before computing new ones.
        self.y = self.loss = self.accuracy = None
        self.y = self.predictor(inputs)
        self.loss = self.lossfun(self.y, targets)
        chainer.report({'loss': self.loss}, self)
        return self.loss

class MLP(chainer.Chain):
    """Two-layer convolutional network emitting a single-channel score map.

    A 5x5 convolution (stride 1, pad 2) with ReLU is followed by a 1x1
    convolution with one output channel; no activation is applied to the
    final output.
    """

    def __init__(self, n_units1, n_units2):
        """Create the two convolution layers.

        Args:
            n_units1: number of output channels of the first convolution.
            n_units2: currently unused (the layer that consumed it is disabled).
        """
        super(MLP, self).__init__()
        he_init = chainer.initializers.HeNormal()
        with self.init_scope():
            self.l1 = L.Convolution2D(
                3, n_units1, ksize=5, stride=1, pad=2, initialW=he_init)
            self.l3 = L.Convolution2D(None, 1, ksize=1)

    def forward(self, x):
        """Apply conv -> ReLU -> 1x1 conv to ``x`` and return the result."""
        hidden = F.relu(self.l1(x))
        return self.l3(hidden)

    def __call__(self, *args):
        """Run :meth:`forward` on the first positional argument.

        The output is also cached in ``self.y`` before being returned.
        """
        self.y = self.forward(args[0])
        return self.y

    def lossfun(self, y, t):
        """Return ``y * (t - [y >= 0]) - log(1 + exp(-|y|))`` element-wise.

        No reduction is applied to the result.
        NOTE(review): this looks like the negated, unreduced form of sigmoid
        cross-entropy, and it is not called anywhere in the visible code --
        confirm the intended sign before using it as a training loss.
        NOTE(review): ``y >= 0`` requires ``y`` to support comparison; verify
        this works for ``chainer.Variable`` inputs.
        """
        target_gap = t - (y >= 0)
        linear_term = y * target_gap
        log_term = F.log1p(F.exp(-F.abs(y)))
        return linear_term - log_term
        
   
import os

# Resolution (width, height) every image and label is resized to.
imw = 320
imh = 240

# Training schedule: N_ROUNDS trainers of EPOCHS_PER_ROUND epochs each.
BATCH_SIZE = 4
N_ROUNDS = 20
EPOCHS_PER_ROUND = 500
GPU_ID = 0

model = MLP(4,2)
classifier = WLClassifier(model)

jpgfiles = glob.glob('white-line2020/*.jpg')
print(jpgfiles)
if not jpgfiles:
    # Fail early with a clear message instead of deep inside the trainer.
    raise RuntimeError("no training images match 'white-line2020/*.jpg'")

# Collect per-file arrays in lists and stack once; growing an array with
# np.append inside the loop copies the whole dataset on every iteration.
images = []
labels = []
for f in jpgfiles:
    # Force three channels so grayscale/RGBA files do not break the
    # (H, W, 3) -> (3, H, W) transpose; eval_image does the same.
    img = Image.open(f).convert('RGB').resize((imw, imh))
    plt.imshow(np.array(img))  # preview; each call draws over the previous one
    # Channel-first float image scaled to [0, 1].
    images.append(np.asarray(img).transpose(2,0,1).astype(np.float32)/255.)

    # The label image sits next to the photo as <name>_label.png.
    lfile = os.path.splitext(f)[0] + '_label.png'
    print(lfile)
    img = Image.open(lfile).resize((imw, imh))
    img = img.convert('L')
    # Dividing by 254 and truncating maps gray values >= 254 to 1, the rest to 0.
    a = np.asarray(img).astype(np.float32)/254.
    a = a.astype(np.int32)
    print(np.sum(a))
    labels.append(a[np.newaxis])  # add the channel axis

x = np.stack(images)  # (N, 3, imh, imw) float32
t = np.stack(labels)  # (N, 1, imh, imw) int32

optimizer = chainer.optimizers.Adam()
optimizer.setup(classifier)
model.to_gpu()

# The dataset does not change between rounds, so build it once.
train = chainer.datasets.TupleDataset(x, t)

for rep in range(N_ROUNDS):
    # Every round gets a fresh iterator/updater/trainer but reuses the same
    # optimizer and model objects.
    train_iter = chainer.iterators.SerialIterator(train, BATCH_SIZE)
    updater = training.updaters.StandardUpdater(
        train_iter, optimizer, device=GPU_ID)
    trainer = training.Trainer(updater, (EPOCHS_PER_ROUND, 'epoch'), out='test1')

    trainer.extend(extensions.LogReport())
    # Only 'loss' is reported by the classifier and no evaluator is attached,
    # so validation/accuracy columns would always stay empty.
    trainer.extend(extensions.PrintReport(
        ['epoch', 'main/loss', 'elapsed_time']))

    trainer.run()


# Save the trained weights under a timestamped file name.
chainer.serializers.save_npz('model_' + strftime("%Y%m%d_%H%M%S") + '.npz', model)

In [None]:
import numpy as np
def eval_image(fname, thre):
    """Run the global ``model`` on one image and plot input and mask side by side.

    The image is converted to RGB, resized to ``(imw, imh)`` and scaled to
    [0, 1] before being passed through ``model.forward``. The raw output is
    thresholded and drawn to the right of the input image.

    Side effects: moves ``model`` to the CPU, prints the forward-pass time and
    draws the combined picture with ``plt.imshow``.

    Args:
        fname: path of the image file to evaluate.
        thre: threshold on the raw network output; pixels whose score is
            strictly greater are drawn white, all others black.

    Returns:
        The 2-D array of raw output scores (first sample, first channel).
    """
    import time

    model.to_cpu()

    img = Image.open(fname).convert('RGB').resize((imw,imh))
    pixels = np.asarray(img).transpose(2,0,1).astype(np.float32)/255.
    # Batch of one; np.stack also yields a fresh contiguous array.
    testx = np.stack([pixels])

    t0 = time.time()
    # Inference only: test-mode behaviour and no graph construction.
    with chainer.using_config('train', False), \
            chainer.using_config('enable_backprop', False):
        testy = model.forward(testx)
    print('forward time [s]: ' + str(time.time()-t0))

    scores = testy.data[0][0]
    # 0/255 mask; a 2-D uint8 array is interpreted as an 8-bit grayscale image.
    mask = (scores > thre).astype(np.uint8) * 255
    thimg = Image.fromarray(mask)

    # Canvas twice as wide as the input: original on the left, mask on the right.
    imd = Image.new('RGB', (imw*2, imh))
    imd.paste(img)
    imd.paste(thimg, (imw, 0))
    plt.imshow(imd)
    return scores



In [None]:
# Restore previously saved weights into the global model.
# NOTE(review): the snapshot name is a fixed timestamp -- confirm the file exists.
chainer.serializers.load_npz(
    "model_20200107_070457.npz",
    model,
)
# Evaluate one sample image, thresholding the raw output at 0.
a = eval_image(fname='white-line2020/000206.jpg', thre=0)