# Binary classification of commutative diagrams
## 1. Data pipeline

In [23]:
import pandas as pd
import numpy as np
#import torch
import os
import shutil
import random
import math
import tensorflow.keras as keras
import matplotlib.pyplot as plt

### 1.1 Expand working directory with folders for training images

In [5]:
## Configuration: name of the folder (inside the working directory) holding the raw samples
unsortedSamplesDirName = 'png-2021-all_contexts'
##

cwd = os.getcwd()

# The raw dataset must already provide one sub-folder per class.
unsortedSamplesDir = os.path.join(cwd, unsortedSamplesDirName)
_unsortedEntries = os.listdir(unsortedSamplesDir)
assert 'positive' in _unsortedEntries and 'negative' in _unsortedEntries
unsortedPositiveSamplesDir = os.path.join(unsortedSamplesDir, 'positive')
unsortedNegativeSamplesDir = os.path.join(unsortedSamplesDir, 'negative')

# Target layout: sortedSamples/{train,test,val}/{positive,negative}
sortedSamplesDir = os.path.join(cwd, 'sortedSamples')

trainDir = os.path.join(sortedSamplesDir, 'train')
testDir = os.path.join(sortedSamplesDir, 'test')
valDir = os.path.join(sortedSamplesDir, 'val')

trainPositiveDir = os.path.join(trainDir, 'positive')
trainNegativeDir = os.path.join(trainDir, 'negative')
testPositiveDir = os.path.join(testDir, 'positive')
testNegativeDir = os.path.join(testDir, 'negative')
valPositiveDir = os.path.join(valDir, 'positive')
valNegativeDir = os.path.join(valDir, 'negative')

# Create the folders parents-first; exist_ok=True makes re-running this cell harmless.
for _targetDir in (
        sortedSamplesDir,
        trainDir, testDir, valDir,
        trainPositiveDir, trainNegativeDir,
        testPositiveDir, testNegativeDir,
        valPositiveDir, valNegativeDir):
    os.makedirs(_targetDir, exist_ok=True)

### 1.2 Copy images into train, test and validation folders
The code currently truncates the larger of the two classes (positive or negative) at random, so that both classes end up with the same number of samples.

In [6]:
## Configuration: fraction of the balanced dataset assigned to each partition
trainRatio = 0.6
testRatio = 0.2
valRatio = 0.2
# Compare with a tolerance rather than `==`: float sums are not always exact
# (e.g. 0.7 + 0.2 + 0.1 evaluates to 0.9999999999999999), so an exact check
# would reject perfectly valid splits.
assert math.isclose(trainRatio + testRatio + valRatio, 1.)

positiveRatio = 0.5 # Desired ratio of positive samples in the sorted data
##


# Shuffle both filename lists so that the truncation below, and the later
# pops from the end of each list, select samples at random.
positiveSamplesFilenameList = os.listdir(unsortedPositiveSamplesDir)
negativeSamplesFilenameList = os.listdir(unsortedNegativeSamplesDir)
random.shuffle(positiveSamplesFilenameList)
random.shuffle(negativeSamplesFilenameList)

# Enforce class parity: keep only as many samples per class as the smaller
# class provides. The lists are already shuffled, so the discarded surplus
# is a random subset of the larger class.
numPositiveSamples = numNegativeSamples = min(len(positiveSamplesFilenameList),
                                              len(negativeSamplesFilenameList))
positiveSamplesFilenameList = positiveSamplesFilenameList[:numPositiveSamples]
negativeSamplesFilenameList = negativeSamplesFilenameList[:numNegativeSamples]
assert(numPositiveSamples == numNegativeSamples)

numSamples = numPositiveSamples + numNegativeSamples

# Partition sizes are rounded down, so a few samples may remain unassigned.
numTrainSamples = math.floor(numSamples*trainRatio)
numTestSamples = math.floor(numSamples*testRatio)
numValSamples = math.floor(numSamples*valRatio)


# Source description shared by all partitions. The filename lists are
# consumed (popped) as images are copied, so no image is used twice.
unsortedSamplesInfo = {'posDir':unsortedPositiveSamplesDir, 'negDir':unsortedNegativeSamplesDir, 'posFilenameList':positiveSamplesFilenameList,
                       'negFilenameList':negativeSamplesFilenameList}

# Per-partition description: total number of images and the destination folder of each class.
trainCopyInfo = {'num':numTrainSamples, 'posDir':trainPositiveDir, 'negDir':trainNegativeDir}
testCopyInfo = {'num':numTestSamples, 'posDir':testPositiveDir, 'negDir':testNegativeDir}
valCopyInfo = {'num':numValSamples, 'posDir':valPositiveDir, 'negDir':valNegativeDir}

def copyImagesInPartition(unsortedSamplesInfo:dict, partitionCopyInfo:dict):
    """Fill one partition with its share of positive, then negative, images.

    unsortedSamplesInfo: dict with the source folders ('posDir', 'negDir') and
        the filename lists ('posFilenameList', 'negFilenameList'); the lists
        are consumed by copyImage as images are copied.
    partitionCopyInfo: dict with the partition size ('num') and the
        destination folders ('posDir', 'negDir').

    The number of images per class is the partition size multiplied by the
    global positiveRatio (or its complement), rounded down.
    """
    partitionSize = partitionCopyInfo['num']
    # (filename-list key, folder key, share of the partition) -- positives first.
    classPlan = (
        ('posFilenameList', 'posDir', positiveRatio),
        ('negFilenameList', 'negDir', 1. - positiveRatio),
    )
    for listKey, dirKey, classShare in classPlan:
        numCopies = math.floor(partitionSize*classShare)
        for _ in range(numCopies):
            copyImage(unsortedSamplesInfo[listKey], unsortedSamplesInfo[dirKey], partitionCopyInfo[dirKey])

def copyImage(sampleFilenameList, srcDir, dstDir):
    filename = sampleFilenameList.pop()
    src = os.path.join(srcDir, filename)
    dst = os.path.join(dstDir, filename)
    try:
        shutil.copyfile(src, dst)
    except PermissionError: # Ignores straggler files such as notebook checkpoints
        pass

# Number of entries currently present in each of the six destination folders.
# All three partitions (train, test AND val) must be inspected; checking the
# test folders twice would let leftover validation images go unnoticed.
sampleDirectorySizes = [len(os.listdir(directory)) for directory in (
    trainPositiveDir, trainNegativeDir,
    testPositiveDir, testNegativeDir,
    valPositiveDir, valNegativeDir)]

# Refuse to copy into a partially filled layout: mixing a fresh random split
# with files from an earlier run could put the same image in two partitions.
if any(size != 0 for size in sampleDirectorySizes):
    raise Exception("Sorted image directories are not empty.")

# The filename lists are consumed across these calls, so the three partitions
# receive disjoint sets of images.
copyImagesInPartition(unsortedSamplesInfo, trainCopyInfo)
copyImagesInPartition(unsortedSamplesInfo, testCopyInfo)
copyImagesInPartition(unsortedSamplesInfo, valCopyInfo)

# TRASH: Is this actually more readable than the functions above?
# NOTE: Everything between the triple quotes below is a bare string literal,
# not executed code. It holds an inlined variant of the copy logic (one
# explicit loop per partition and class) kept only for comparison.
"""
if all(size == 0 for size in sampleDirectorySizes):
    for _ in range(math.floor(numTrainSamples*positiveRatio)):
        filename = positiveSamplesFilenameList.pop()
        src = os.path.join(unsortedPositiveSamplesDir, filename)
        dst = os.path.join(trainPositiveDir, filename)
        shutil.copyfile(src, dst)

    for _ in range(math.floor(numTrainSamples*(1. - positiveRatio))):
        filename = negativeSamplesFilenameList.pop()
        src = os.path.join(unsortedNegativeSamplesDir, filename)
        dst = os.path.join(trainNegativeDir, filename)
        shutil.copyfile(src, dst)

    for _ in range(math.floor(numTestSamples*positiveRatio)):
        filename = positiveSamplesFilenameList.pop()
        src = os.path.join(unsortedPositiveSamplesDir, filename)
        dst = os.path.join(testPositiveDir, filename)
        shutil.copyfile(src, dst)

    for _ in range(math.floor(numTestSamples*(1. - positiveRatio))):
        filename = negativeSamplesFilenameList.pop()
        src = os.path.join(unsortedNegativeSamplesDir, filename)
        dst = os.path.join(testNegativeDir, filename)
        shutil.copyfile(src, dst)

    for _ in range(math.floor(numValSamples*positiveRatio)):
        filename = positiveSamplesFilenameList.pop()
        src = os.path.join(unsortedPositiveSamplesDir, filename)
        dst = os.path.join(valPositiveDir, filename)
        shutil.copyfile(src, dst)

    for _ in range(math.floor(numValSamples*(1. - positiveRatio))):
        filename = negativeSamplesFilenameList.pop()
        src = os.path.join(unsortedNegativeSamplesDir, filename)
        dst = os.path.join(valNegativeDir, filename)
        shutil.copyfile(src, dst)
else:
    raise Exception("Sorted image directories are not empty.")"""

Exception: Sorted image directories are not empty.

[*Optional*]: Test whether a sample handful of the images were copied to the correct folders

In [7]:
## Configuration
runImageCopyTest = False   # Set to True to run the optional sanity check below

comparisonSampleSize = 50  # Maximum number of files checked per sorted folder
imageFiletype = 'png'      # File extension (without the dot) of the sample images
##

def matchesImageFiletype(sampleFilename : str, imageFiletype : str): # Needed to ignore straggler files such as notebook checkpoints
    """Return True if sampleFilename carries the extension imageFiletype.

    The comparison is made against the END of the filename (the extension)
    and ignores case; comparing the start of the name would only match files
    whose names happen to begin with the filetype string.
    """
    return sampleFilename.lower().endswith('.' + imageFiletype.lower())

if runImageCopyTest:
    # These must be redefined here since they were popped
    positiveSamplesFilenameList = os.listdir(unsortedPositiveSamplesDir)
    negativeSamplesFilenameList = os.listdir(unsortedNegativeSamplesDir)
    # Sets give constant-time membership tests for the checks below.
    positiveSamplesFilenames = set(positiveSamplesFilenameList)
    negativeSamplesFilenames = set(negativeSamplesFilenameList)

    sortedTrainPositiveFilenameList = os.listdir(trainPositiveDir)
    sortedTrainNegativeFilenameList = os.listdir(trainNegativeDir)
    sortedTestPositiveFilenameList = os.listdir(testPositiveDir)
    sortedTestNegativeFilenameList = os.listdir(testNegativeDir)
    sortedValPositiveFilenameList = os.listdir(valPositiveDir)
    sortedValNegativeFilenameList = os.listdir(valNegativeDir)
    # Shuffle so that the first comparisonSampleSize entries form a random sample.
    for fList in (sortedTrainPositiveFilenameList, sortedTrainNegativeFilenameList,
                  sortedTestPositiveFilenameList, sortedTestNegativeFilenameList,
                  sortedValPositiveFilenameList, sortedValNegativeFilenameList):
        random.shuffle(fList)

    allPositiveFilenameLists = [sortedTrainPositiveFilenameList, sortedTestPositiveFilenameList, sortedValPositiveFilenameList]
    allNegativeFilenameLists = [sortedTrainNegativeFilenameList, sortedTestNegativeFilenameList, sortedValNegativeFilenameList]

    # Every sampled image in a sorted 'positive' folder must originate from
    # the unsorted positive folder, and likewise for the negative class.
    for fList in allPositiveFilenameLists:
        assert(all(sampleFilename in positiveSamplesFilenames
                   for sampleFilename in fList[:comparisonSampleSize]
                   if matchesImageFiletype(sampleFilename, imageFiletype)))
    for fList in allNegativeFilenameLists:
        assert(all(sampleFilename in negativeSamplesFilenames
                   for sampleFilename in fList[:comparisonSampleSize]
                   if matchesImageFiletype(sampleFilename, imageFiletype)))
    print('Test was succesful!')

### 1.3 Data generators

The data generators themselves can rescale the input pixel values to the [0, 1] range. Note that the pretrained baseline models expect specific kinds of input: EfficientNet expects raw float pixel values in the [0, 255] range (its Keras implementation rescales them internally), whereas ResNetV2 expects its input to be passed through the keras.applications.resnet_v2.preprocess_input() function, which rescales the pixel values to the [-1, 1] range. Thus, a separate set of data generators is made for the baseline models, which does not rescale its input.

In [8]:
# Generators for the custom model: pixel values are rescaled from [0, 255] to [0, 1].
# Both factories are taken from keras.preprocessing.image; the previously used
# alias `img` is not defined anywhere in this notebook and raised a NameError
# on a fresh kernel.
trainDataGeneratorFactory = keras.preprocessing.image.ImageDataGenerator(rescale=1./255)
testDataGeneratorFactory = keras.preprocessing.image.ImageDataGenerator(rescale=1./255)
trainDataGenerator = trainDataGeneratorFactory.flow_from_directory( # Batch generator
    trainDir,
    target_size = (150, 150), # TODO: What size do I want?
    batch_size = 75,
    class_mode = 'binary')
# NOTE(review): despite its name, this generator reads from valDir (the
# validation split), not testDir -- confirm that this is intended.
testDataGenerator = testDataGeneratorFactory.flow_from_directory(
    valDir,
    target_size = (150, 150),
    batch_size = 75,
    class_mode = 'binary')

# Generators for the pretrained baseline models: no rescaling is applied here,
# so the images are delivered with their original [0, 255] pixel values.
BaselineTrainDataGeneratorFactory = keras.preprocessing.image.ImageDataGenerator()
BaselineTestDataGeneratorFactory = keras.preprocessing.image.ImageDataGenerator()
BaselineTrainDataGenerator = BaselineTrainDataGeneratorFactory.flow_from_directory( # Batch generator
    trainDir,
    target_size = (150, 150), # TODO: What size do I want?
    batch_size = 75,
    class_mode = 'binary')
# NOTE(review): also reads from valDir, like testDataGenerator above.
BaselineTestDataGenerator = BaselineTestDataGeneratorFactory.flow_from_directory(
    valDir,
    target_size = (150, 150),
    batch_size = 75,
    class_mode = 'binary')

Found 2121 images belonging to 2 classes.
Found 706 images belonging to 2 classes.


## 2 Baseline model implementation

### 2.1 Implementation of untuned but pretrained EfficientNet and ResNet

In [22]:
#device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
#print(f'Using {device}.')

#torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_efficientnet_widese_b0', pretrained=True')

# Constructor options shared by both pretrained backbones: ImageNet weights,
# no classification head (include_top = False), and global average pooling
# over the final feature map.
_baselineBackboneOptions = dict(
    include_top = False,
    input_shape = (150, 150, 3), # Input to pretrained model must have 3 channels
    weights = 'imagenet',
    pooling = 'avg')

efficientNetB0 = keras.applications.efficientnet.EfficientNetB0(**_baselineBackboneOptions)
resNet50V2 = keras.applications.resnet_v2.ResNet50V2(**_baselineBackboneOptions)

# keras.applications.resnet_v2.preprocess_input()
# When exactly should this be implemented?

### 2.2 Training and testing of EfficientNet baseline 