In [None]:
%run 'Text Recognition 1.ipynb'

In [None]:
import matplotlib.ticker as ticker
import seaborn
import string
import os
import sys

In [None]:
# Put the local "Neural Nets Library" checkout on the import path
# (presumably where the `visualize` module imported below lives).
home_directory = os.path.expanduser('~')
nn_library_path = '/'.join((
    home_directory,
    'Documents',
    'HarveyMuddWork',
    'Neural_Nets_Research',
    'neural_nets_research',
    'Neural Nets Library',
))
sys.path.append(nn_library_path)

In [None]:
from visualize import make_dot

In [None]:
# Three residual networks that differ only in their last constructor argument
# (2, 10 and 26 -- presumably the number of output classes; confirm against ResidualNet).
# Each one is moved to the GPU and wrapped in DataParallel as soon as it is built.
resnet_type = nn.DataParallel(ResidualNet(1, 32, 32, 3, 2).cuda())
resnet_digit = nn.DataParallel(ResidualNet(1, 32, 32, 3, 10).cuda())
resnet_uppercase_char = nn.DataParallel(ResidualNet(1, 32, 32, 3, 26).cuda())

In [None]:
def train_model_with_validation(model, train_loader, validation_loader, criterion, 
                                optimizer, lr_scheduler, num_epochs=20):
    """Train `model` and return the snapshot with the best validation accuracy.

    Args:
        model: network to train; inputs are moved to the GPU before the forward pass.
        train_loader: iterable of (inputs, labels) batches used for the updates.
        validation_loader: loader handed to `test_model` after every epoch.
        criterion: loss function called as `criterion(outputs, labels)`. It is
            assumed to return the mean loss of the batch (the default of
            `nn.CrossEntropyLoss`); the progress report averages it over batches.
        optimizer: optimizer that owns the parameters of `model`.
        lr_scheduler: callable `(optimizer, epoch) -> optimizer`, invoked at the
            start of every epoch.
        num_epochs: number of passes over `train_loader`.

    Returns:
        A deep copy of `model` taken after the epoch with the highest validation
        accuracy (or `model` itself if no epoch scored above 0.0), switched out
        of training mode.
    """
    since = time.time()

    best_model = model
    best_acc = 0.0

    for epoch in range(num_epochs):
        model.train(True)
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        optimizer = lr_scheduler(optimizer, epoch)

        running_loss = 0.0
        running_corrects = 0
        # Counted explicitly so the running accuracy stays exact even when a
        # batch holds fewer than `train_loader.batch_size` samples.
        samples_seen = 0

        # Iterate over data.
        for current_batch, (inputs, labels) in enumerate(train_loader, 1):
            samples_seen += labels.size(0)

            # wrap them in Variable
            inputs, labels = Variable(inputs.cuda()), \
                             Variable(labels.cuda())

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            outputs = model(inputs)
            _, preds = torch.max(outputs.data, 1)
            loss = criterion(outputs, labels)

            # backward
            loss.backward()
            optimizer.step()

            # statistics
            running_loss += loss.data[0]
            running_corrects += torch.sum(preds == labels.data)

            if current_batch % 250 == 0:
                curr_acc = running_corrects / samples_seen
                # The loss of each batch is already a mean over its samples, so
                # the running value is averaged over batches, not over samples.
                curr_loss = running_loss / current_batch
                time_elapsed = time.time() - since

                print('Epoch Number: {}, Batch Number: {}, Loss: {:.4f}, Acc: {:.4f}'.format(
                        epoch, current_batch, curr_loss, curr_acc))
                print('Time so far is {:.0f}m {:.0f}s'.format(
                      time_elapsed // 60, time_elapsed % 60))

        validation_acc = test_model(model, validation_loader)
        print('Epoch Number: {}, Validation Accuracy: {:.4f}'.format(epoch, validation_acc))

        # deep copy the model
        if validation_acc > best_acc:
            best_acc = validation_acc
            best_model = copy.deepcopy(model)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    # The snapshot was copied from `model` in whatever mode it was in at the
    # time, so switch both the live model and the returned copy to eval mode.
    model.train(False)
    best_model.train(False)

    return best_model

In [None]:
def split_dataset(dset, batch_size=128, thread_count=4):
    """Build train / test / validation loaders over a single dataset.

    The index range of `dset` is cut into three contiguous slices: the first
    70% for training, the next 15% for testing and the remainder for
    validation. Each slice is sampled through its own SubsetRandomSampler.

    Args:
        dset: dataset supporting `len()`.
        batch_size: batch size used by all three loaders.
        thread_count: `num_workers` passed to every DataLoader.

    Returns:
        The tuple `(train_loader, test_loader, validation_loader)`.
    """
    total = len(dset)
    train_end = int(0.7 * total)
    test_end = int(0.85 * total)

    # Order matters: train, test, validation.
    index_slices = (
        range(train_end),
        range(train_end, test_end),
        range(test_end, total),
    )
    samplers = [data.sampler.SubsetRandomSampler(list(indices))
                for indices in index_slices]

    loaders = [
        torch.utils.data.DataLoader(
            dset, batch_size=batch_size, num_workers=thread_count,
            pin_memory=True, sampler=subset_sampler)
        for subset_sampler in samplers
    ]

    return tuple(loaders)

# Type dataset: entries whose p[1] (presumably the class index) is at most 35,
# relabelled 0 when the original label is below 10 and 1 otherwise.
dset_type = AdvancedImageFolder(
    'by_class', transform,
    target_transform=lambda n: 1 if n >= 10 else 0,
    loader=image_loader,
    filter_fn=lambda p: p[1] <= 35,
    shuffle=True)

# Digit dataset: entries whose p[1] is below 10, labels unchanged.
dset_digit = AdvancedImageFolder(
    'by_class', transform,
    loader=image_loader,
    filter_fn=lambda p: p[1] < 10,
    shuffle=True)

# Upper-case dataset: entries with p[1] in 10..35, labels shifted down by 10.
dset_uppercase_char = AdvancedImageFolder(
    'by_class', transform,
    target_transform=lambda n: n - 10,
    loader=image_loader,
    filter_fn=lambda p: 10 <= p[1] <= 35,
    shuffle=True)

# One (train, test, validation) loader triple per dataset.
loader_type_train, loader_type_test, loader_type_validation = split_dataset(
    dset_type, batch_size=128, thread_count=8)
loader_digit_train, loader_digit_test, loader_digit_validation = split_dataset(
    dset_digit, batch_size=128, thread_count=8)
loader_uppercase_char_train, loader_uppercase_char_test, loader_uppercase_char_validation = split_dataset(
    dset_uppercase_char, batch_size=128, thread_count=8)

In [None]:
# One shared cross-entropy loss, and one SGD optimizer (lr=0.001, momentum=0.9)
# per split network.
criterion = nn.CrossEntropyLoss()
optimizer_type = optim.SGD(resnet_type.parameters(), lr=0.001, momentum=0.9)
optimizer_digit = optim.SGD(resnet_digit.parameters(), lr=0.001, momentum=0.9)
optimizer_uppercase_char = optim.SGD(resnet_uppercase_char.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Train the digit-vs-character type network; the best-validation model that is
# returned replaces `resnet_type`.
resnet_type = train_model_with_validation(resnet_type, loader_type_train, loader_type_validation, criterion, 
                                          optimizer_type, exp_lr_scheduler)

In [None]:
# Result of `test_model` for the type network on its test split.
print(test_model(resnet_type, loader_type_test))

In [None]:
# Train the upper-case letter network; the best-validation model that is
# returned replaces `resnet_uppercase_char`.
resnet_uppercase_char = train_model_with_validation(resnet_uppercase_char, loader_uppercase_char_train, 
                                                    loader_uppercase_char_validation, criterion, 
                                                    optimizer_uppercase_char, exp_lr_scheduler)

In [None]:
# Result of `test_model` for the upper-case letter network on its test split.
print(test_model(resnet_uppercase_char, loader_uppercase_char_test))

In [None]:
# Train the digit network; the best-validation model that is returned replaces
# `resnet_digit`.
resnet_digit = train_model_with_validation(resnet_digit, loader_digit_train, loader_digit_validation, criterion, 
                                           optimizer_digit, exp_lr_scheduler)

In [None]:
# Result of `test_model` for the digit network on its test split.
print(test_model(resnet_digit, loader_digit_test))

In [None]:
def categoryFromOutput(output):
    """Return the argmax indices of `output.data` taken along dimension 1."""
    # `.data` unwraps the Variable; max(1) yields (values, indices).
    return output.data.max(1)[1]

def categoryAndProb(output):
    """Return `(max values, argmax indices)` of `output.data` along dimension 1."""
    # `.data` unwraps the Variable before the reduction.
    best = output.data.max(1)
    return best[0], best[1]

def confusion_matrix(model, data_loader, all_categories):
    """Print and plot the row-normalised confusion matrix of `model`.

    Rows are indexed by the true label and columns by the predicted label.
    Every row that received at least one sample is divided by its sum; a
    category that never occurs in `data_loader` keeps an all-zero row.

    Args:
        model: network evaluated on GPU inputs. Its train/eval mode is not
            changed here.
        data_loader: iterable of (inputs, labels) batches; labels index into
            `all_categories`.
        all_categories: sequence of tick labels, one per class.
    """
    n_categories = len(all_categories)
    # Keep track of correct guesses in a confusion matrix
    confusion = torch.zeros(n_categories, n_categories)

    for inputs, labels in data_loader:
        inputs = Variable(inputs.cuda())
        output = model(inputs)

        guesses = categoryFromOutput(output)

        for category_i, guess_i in zip(labels, guesses):
            confusion[category_i][guess_i] += 1

    # Normalize by dividing every row by its sum
    for i in range(n_categories):
        row_total = confusion[i].sum()
        # Dividing an empty row by its zero sum would fill it with NaN and
        # distort the colour scale of the plot, so such rows are left at zero.
        if row_total > 0:
            confusion[i] = confusion[i] / row_total

    # Set up plot
    fig = plt.figure(figsize = (16,16), dpi = 160)
    ax = fig.add_subplot(111)
    print(confusion)
    cax = ax.matshow(confusion.numpy())
    fig.colorbar(cax)

    # Set up axes; list() lets callers pass any sequence, not only a list.
    tick_labels = [''] + list(all_categories)
    ax.set_xticklabels(tick_labels, rotation=90)
    ax.set_yticklabels(tick_labels)

    # Force label at every tick
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    # sphinx_gallery_thumbnail_number = 2
    plt.show()
    
def accuracy_for_each_category(model, data_loader, all_categories):
    """Map every category to the fraction of its samples that `model` gets right.

    Args:
        model: network evaluated on GPU inputs.
        data_loader: iterable of (inputs, labels) batches; labels index into
            `all_categories`.
        all_categories: sequence of category names, one per class.

    Returns:
        dict pairing each entry of `all_categories` with correct / appearances
        for that class (the quotient is taken even when a class never appears).
    """
    category_count = len(all_categories)

    # Per-class tallies
    hits = torch.zeros(category_count)
    totals = torch.zeros(category_count)

    for batch_inputs, batch_labels in data_loader:
        predictions = categoryFromOutput(model(Variable(batch_inputs.cuda())))

        for true_i, predicted_i in zip(batch_labels, predictions):
            totals[true_i] += 1
            if true_i == predicted_i:
                hits[true_i] += 1

    return dict(zip(all_categories, hits / totals))

In [None]:
# Class names in label order: the ints 0-9 first, then the letters 'A'-'Z'.
digits = [number for number in range(10)]
characters = [letter for letter in string.ascii_uppercase]
all_symbols = digits + characters

In [None]:
# Confusion matrix of the type network (label 0 = digit, 1 = character).
confusion_matrix(resnet_type, loader_type_test, ['digit', 'character'])

In [None]:
# Confusion matrix of the upper-case letter network on its test split.
confusion_matrix(resnet_uppercase_char, loader_uppercase_char_test,
                 characters)

In [None]:
# Confusion matrix of the digit network on its test split.
confusion_matrix(resnet_digit, loader_digit_test, digits)

In [None]:
# Combined dataset: every entry whose p[1] is at most 35, with no target
# transform applied to the labels.
dset_all = AdvancedImageFolder('by_class', transform, 
                                loader = image_loader,
                                filter_fn = lambda p: p[1] <= 35,
                                shuffle = True)

# Uses split_dataset's default batch size.
loader_all_train, loader_all_test, loader_all_validation = split_dataset(dset_all, thread_count=8)

In [None]:
# Single network for the combined task. Compared with the split networks the
# fourth constructor argument is 5 instead of 3 and the last one is 36
# (presumably the class count, 10 digits + 26 letters -- confirm against ResidualNet).
resnet_all = ResidualNet(1, 32, 32, 5, 36)
resnet_all = nn.DataParallel(resnet_all.cuda())

In [None]:
# Loss and SGD optimizer (lr=0.001, momentum=0.9) for the combined network;
# `criterion` is re-created here with the same settings as before.
criterion = nn.CrossEntropyLoss()
optimizer_all = optim.SGD(resnet_all.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Train the combined network; the best-validation model that is returned
# replaces `resnet_all`.
resnet_all = train_model_with_validation(resnet_all, loader_all_train, loader_all_validation, criterion, 
                                         optimizer_all, exp_lr_scheduler)

In [None]:
# Result of `test_model` for the combined network on its test split.
print(test_model(resnet_all, loader_all_test))

In [None]:
# Confusion matrix of the combined network over all digit and letter classes.
confusion_matrix(resnet_all, loader_all_test, all_symbols)

In [None]:
class HandwritingClassifier(nn.Module):
    """Combine a type, a digit and a character classifier into one model.

    The output concatenates one score per digit class followed by one score per
    character class, so an argmax over dimension 1 yields a label in the
    combined numbering (digits first, characters after).

    Args:
        char_model: network scoring the character classes.
        digit_model: network scoring the digit classes.
        type_model: two-way network; column 0 scores "digit", column 1 "character".
        with_joint: if True, every class score is the joint log-probability
            log P(type) + log P(class | type). If False, the type is decided by
            a hard argmax and only the classes of the chosen type stay eligible.
    """

    # Subtracted from the log-probabilities of the type that was NOT selected in
    # non-joint mode. Finite (so no inf/NaN arithmetic) yet far below any
    # realistic log-probability, which removes those classes from the argmax.
    _MASK_PENALTY = 1e30

    def __init__(self, char_model, digit_model, type_model, with_joint = False):
        super(HandwritingClassifier, self).__init__()
        self.char_model = char_model
        self.digit_model = digit_model
        self.type_model = type_model
        self.with_joint = with_joint

    def forward(self, x):
        """Return the concatenated (digit, character) scores for the batch `x`."""
        type_values = F.log_softmax(self.type_model(x))
        char_values = F.log_softmax(self.char_model(x))
        digit_values = F.log_softmax(self.digit_model(x))

        if self.with_joint:
            # Soft gating: add the log-probability of the type to every class
            # of that type (column vectors broadcast over the class dimension).
            log_prob_digits, log_prob_chars = type_values[:, 0].unsqueeze(1), type_values[:, 1].unsqueeze(1)

            digit_log_probs = digit_values + log_prob_digits
            char_log_probs = char_values + log_prob_chars
        else:
            # Hard gating: 0.0 where the type model picked "digit", 1.0 where
            # it picked "character".
            _, top_types = type_values.max(1)
            is_char = top_types.unsqueeze(1).float()

            # Keep the log-probabilities of the chosen type untouched and push
            # the other type far down. (Multiplying by the 0/1 indicator, as was
            # done before, negated or zeroed the scores and broke the argmax.)
            digit_log_probs = digit_values - is_char * self._MASK_PENALTY
            char_log_probs = char_values - (1 - is_char) * self._MASK_PENALTY

        return torch.cat((digit_log_probs, char_log_probs), dim=1)

In [None]:
def test_handwriting_classifiers(char_model, digit_model, type_model, dset_loader, with_joint = False):
    """Return the accuracy of the three split models used together.

    Labels from `dset_loader` use the combined numbering: values below 10 are
    digits, and character labels are offset by 10 relative to `char_model`.

    Args:
        char_model: network scoring the character classes.
        digit_model: network scoring the digit classes.
        type_model: two-way network; column 0 scores "digit", column 1 "character".
        dset_loader: iterable of (inputs, labels) batches.
        with_joint: if True, compare the best joint probability
            P(type) * P(class | type) of each branch and take the larger one.
            If False, route every sample to the model chosen by the argmax of
            `type_model`.

    Returns:
        Number of correct predictions divided by the number of samples seen.
        An empty loader raises ZeroDivisionError.
    """
    char_model.train(False)
    digit_model.train(False)
    type_model.train(False)

    running_corrects = 0
    # Counted per batch: len(loader) * batch_size overstates the sample count
    # whenever the last batch is not full, which biased the accuracy downwards.
    total_samples = 0

    for inputs, labels in dset_loader:
        total_samples += labels.size(0)

        inputs, labels = Variable(inputs.cuda()), \
                         Variable(labels.cuda())
        types = type_model(inputs)

        if with_joint:
            char_values = F.softmax(char_model(inputs)) 
            digit_values = F.softmax(digit_model(inputs))
            type_values = F.softmax(types)

            prob_digits, prob_chars = type_values[:, 0].unsqueeze(1), type_values[:, 1].unsqueeze(1)

            char_probs = char_values * prob_chars
            digit_probs = digit_values * prob_digits

            top_char_prob, top_char_index = categoryAndProb(char_probs)
            top_digit_prob, top_digit_index = categoryAndProb(digit_probs)

            # Boolean masks; ties go to the character branch.
            possible_chars = top_char_prob >= top_digit_prob
            possible_digits = top_digit_prob > top_char_prob

            char_guesses, label_chars = top_char_index[possible_chars], labels[possible_chars] - 10
            digit_guesses, label_digits = top_digit_index[possible_digits], labels[possible_digits]
        else:
            top_types = categoryFromOutput(types)

            # Index tensors of the samples routed to each model.
            # NOTE(review): a batch in which one of the two types is never
            # predicted yields an empty index here -- confirm the indexing
            # below copes with that on the PyTorch version in use.
            possible_chars = top_types.nonzero().squeeze()
            possible_digits = (top_types - 1).nonzero().squeeze()

            input_chars, label_chars = inputs[possible_chars], labels[possible_chars] - 10
            input_digits, label_digits = inputs[possible_digits], labels[possible_digits]

            char_guesses = categoryFromOutput(char_model(input_chars))
            digit_guesses = categoryFromOutput(digit_model(input_digits))

        running_corrects += torch.sum(char_guesses == label_chars.data)
        running_corrects += torch.sum(digit_guesses == label_digits.data)

    return running_corrects / total_samples

In [None]:
def compare_models(model1, model2, dset_loader):
    """Return the fraction of samples on which both models predict the same class.

    Args:
        model1, model2: networks evaluated on the same GPU inputs; their
            predictions are the argmax over dimension 1 of the outputs.
        dset_loader: iterable of (inputs, labels) batches; labels are ignored.

    Returns:
        Number of agreeing predictions divided by the number of samples seen.
        An empty loader raises ZeroDivisionError.
    """
    running_matches = 0
    # Counted per batch so that a short final batch does not skew the ratio.
    total_samples = 0

    for inputs, _ in dset_loader:
        total_samples += inputs.size(0)

        # wrap them in Variable
        inputs = Variable(inputs.cuda())

        # forward
        _, preds1 = model1(inputs).data.max(1)
        _, preds2 = model2(inputs).data.max(1)

        running_matches += torch.sum(preds1 == preds2)

    return running_matches / total_samples

In [None]:
def predict_point(model, datapoint, class_names=None):
    """Classify a single datapoint with `model`.

    The datapoint is moved to the GPU and given a leading batch dimension of
    size one before the forward pass.

    Returns:
        The predicted class index, or `class_names[index]` when `class_names`
        is provided.
    """
    single_batch = Variable(datapoint.cuda()).unsqueeze(0)
    # argmax over dimension 1, then the only element of the batch
    predicted_index = model(single_batch).data.max(1)[1][0]

    if class_names is not None:
        return class_names[predicted_index]
    return predicted_index

In [None]:
# Accuracy of the three split networks with hard type routing (with_joint left at False).
split_model_acc_no_joint = test_handwriting_classifiers(resnet_uppercase_char, resnet_digit, resnet_type, loader_all_test)

In [None]:
# Show the non-joint accuracy computed in the previous cell.
print(split_model_acc_no_joint)

In [None]:
# Accuracy of the three split networks with joint probabilities (with_joint=True).
split_model_acc_with_joint = test_handwriting_classifiers(resnet_uppercase_char, resnet_digit, resnet_type, loader_all_test, True)

In [None]:
# Show the joint accuracy computed in the previous cell.
print(split_model_acc_with_joint)

In [None]:
# Two wrappers over the same three trained networks; only `with_joint` differs.
joint_model = HandwritingClassifier(resnet_uppercase_char, resnet_digit, resnet_type, with_joint=True)
nonjoint_model = HandwritingClassifier(resnet_uppercase_char, resnet_digit, resnet_type, with_joint=False)

In [None]:
# Result of `test_model` for each wrapper on the combined test split
# (non-joint first, joint second).
print(test_model(nonjoint_model, loader_all_test))
print(test_model(joint_model, loader_all_test))

In [None]:
# Confusion matrix of the joint wrapper over all digit and letter classes.
confusion_matrix(joint_model, loader_all_test, all_symbols)

In [None]:
# Per-symbol accuracy of the joint wrapper on the combined test split.
accuracy_dict = accuracy_for_each_category(joint_model, loader_all_test, all_symbols)

In [None]:
# List the symbols from lowest to highest accuracy.
for symbol, accuracy in sorted(accuracy_dict.items(), key = lambda p: p[1]):
    print('Symbol: {}, Acc: {:.4f}'.format(symbol, accuracy))

In [None]:
# Per-class accuracy of the stand-alone letter and digit networks on their own test splits.
accuracy_dict_chars = accuracy_for_each_category(resnet_uppercase_char, loader_uppercase_char_test, characters)
accuracy_dict_digits = accuracy_for_each_category(resnet_digit, loader_digit_test, digits)

In [None]:
# List the letters from lowest to highest accuracy.
for char, accuracy in sorted(accuracy_dict_chars.items(), key = lambda p: p[1]):
    print('Letter: {}, Acc: {:.4f}'.format(char, accuracy))

In [None]:
# List the digits from lowest to highest accuracy.
for digit, accuracy in sorted(accuracy_dict_digits.items(), key = lambda p: p[1]):
    # These entries are digits, so label them as such (the text was copied
    # from the letter report and still said "Letter").
    print('Digit: {}, Acc: {:.4f}'.format(digit, accuracy))

In [None]:
# Compare the predictions of the joint and non-joint wrappers on the combined test split.
print(compare_models(joint_model, nonjoint_model, loader_all_test))

In [None]:
# Preprocessing pipeline for the cropped-character images: convert to a tensor,
# apply `tightest_image_crop` and `square_padding` (both defined outside this
# notebook), convert back to a PIL image, apply Scale(32), and convert to a
# tensor again.
transform2 = transforms.Compose((transforms.ToTensor(),
                                 transforms.Lambda(tightest_image_crop),
                                 transforms.Lambda(square_padding),
                                 transforms.ToPILImage(),
                                 transforms.Scale(32),
                                 transforms.ToTensor()
                                 ))

In [None]:
# Dataset read from the 'Cropped Characters' folder through the `transform2`
# pipeline; no filter, target transform or shuffle argument is passed.
dset_cropped_images = AdvancedImageFolder('Cropped Characters', transform2, 
                                          loader = image_loader)

In [None]:
# Number of cropped-character images found.
print(len(dset_cropped_images))

In [None]:
def show(img):
    """Plot the slice at index 0 of `img.numpy()` as a grayscale image.

    `img` must offer `.numpy()`; index 0 along the first axis is presumably
    the single image channel -- confirm against the caller.
    """
    first_plane = img.numpy()[0]
    plt.figure()
    plt.imshow(first_plane, cmap='gray', interpolation='nearest')

In [None]:
# 1. Also fix non joint model.

# 2. Check whether it matters to make the non-joint model run only the appropriate
# second model. Not sure whether it will have a noticeable impact on efficiency.

# 3. Also should try training these models end to end.

# 4. Also should write a function that prints out the rank of the correct answer when the prediction is wrong.

# 5. Pad on both sides instead of just padding to left/top.

# 6. Cut out characters from street name.

In [None]:
# Run the joint wrapper on the first cropped image, given a batch dimension of one.
output = joint_model(Variable(dset_cropped_images[0][0].unsqueeze(0)).cuda())

In [None]:
# Show the scores the joint wrapper returned for that image.
print(output)

In [None]:
# Pass the output to `make_dot` from the `visualize` module.
graph_visual = make_dot(output)

In [None]:
# Display the object returned by `make_dot` as the cell output.
graph_visual

In [None]:
# Inspect which class `make_dot` returned.
type(graph_visual)

In [None]:
# Call the object's `render` method with the name 'joint_model'.
graph_visual.render('joint_model')