In [None]:
from fastai.vision.all import *
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [None]:
## Configuration
## Set is_training to True to train a new model and export it to file_name,
## or to False to skip training and load an already exported model from file_name.
is_training = False

## Name of the exported learner (.pkl): written after training, or read when is_training is False.
## The combined test/prediction CSV written later in the notebook is named file_name + '_combined.csv'.
file_name = '512x512epochs.pkl'

## Set mae_loss to True to use Mean Absolute Error (L1LossFlat) for the loss function, or False to use Mean Squared Error (MSELossFlat).
mae_loss = True

## Set use_standardization to True to scale the traits with StandardScaler, or False to use MinMaxScaler (normalization).
use_standardization = True

## Batch size of 28 works for my 16GB of VRAM. However, this value will depend on your specs. Use smaller values if less VRAM is available.
batch_size = 28

## Size in pixels that the images are resized (and zero-padded) to. Larger sizes require more VRAM and take longer per epoch to train.
## However, higher values may allow the model to converge with fewer total epochs and achieve a higher accuracy.
img_size = 512

## Learning rate used for training. Any value <= 0 makes the notebook run lr_find and use its "valley" suggestion
## (this is the recommended default). Only used when is_training is True.
lr = 0

## Number of epochs to train for. Should probably be >= ~200 to yield a usable model. Only used when is_training is True.
n_epochs = 512

## Number of epochs to train while the body of the model is frozen, passed to fine_tune as freeze_epochs.
## This should not be set too high to avoid overfitting (fastai's fine_tune default is 1).
n_freeze_epochs = 2

In [None]:
## Load the phenotype measurements (the y values); the 'line' column is not used and is discarded.
df = pd.read_csv('PhenotypeDataUGA.csv').drop(columns='line')
df

In [None]:
## Every column after the first one is treated as a trait to predict.
## (The skipped first column is assumed to be the photo identifier -- verify against the CSV.)
traits = df.columns[1:].tolist()
traits

In [None]:
## Feature Scaling
## Fit the selected scaler on the trait columns and overwrite them with the scaled values.
## The fitted `scaler` is reused later to map predictions and test values back to original units.
## The columns are assigned by name rather than through a positional `.iloc` slice: this replaces
## the columns outright (so integer-typed traits simply become floats instead of triggering pandas'
## incompatible-dtype setitem deprecation) and it cannot accidentally pick up extra columns such as
## 'combined' that are appended to df further down.
## NOTE(review): the scaler is fitted on all rows before the train/test split, so test-set statistics
## influence the scaling. Fitting on the training rows only would avoid that leakage.
## NOTE(review): this cell is not idempotent -- re-running it re-fits the scaler on already scaled
## data, after which inverse_transform no longer recovers the original units. Re-run from the
## cell that reads the CSV instead.
scaler = StandardScaler() if use_standardization else MinMaxScaler()
df[traits] = scaler.fit_transform(df[traits])

df

In [None]:
## Pack each row's trait values into a single list so the model receives one multi-value target per image.
df['combined'] = df[traits].to_numpy().tolist()

In [None]:
## Hold out 20% of the rows as the test set; the fixed random_state keeps the split repeatable.
train, test = train_test_split(df, random_state=8, test_size=0.2)

In [None]:
class TitledList(list, ShowTitle):
    "A list that can be displayed as a text label through `show_title`."
    _show_args = dict(label='text')

    def show(self, ctx=None, **kwargs):
        "Show the list itself (not its string form) on `ctx`; `kwargs` are merged with `_show_args`."
        title_kwargs = merge(self._show_args, kwargs)
        return show_title(self, ctx=ctx, **title_kwargs)

class ToListTensor(DisplayedTransform):
    "Pass-through transform whose decoded output is wrapped in a `TitledList` for display."
    _show_args = dict(label='text')

    def __init__(self, split_idx=None):
        super().__init__(split_idx=split_idx)

    def encodes(self, o):
        # Encoding leaves the value untouched.
        return o

    def decodes(self, o):
        # Decoding wraps the value so that it gains a `show` method.
        return TitledList(o)

In [None]:
## Create the datablock. Of particular importance are the resize dimensions, method, and pad mode.
## The model gets better results faster when using high resolution images.
## Padding with zeros ensures that there is no loss of aspect ratio or information compared to squishing or cropping.
## The number of regression outputs is taken from `traits` so it always matches the length of the
## 'combined' target list instead of relying on a hard-coded count.
## x: image path built from the photo_id column; y: the per-row list of scaled trait values.
## NOTE(review): RandomSplitter is given no seed, so the 10% validation subset differs between runs.
plant = DataBlock(
    blocks=[ImageBlock, RegressionBlock(n_out=len(traits))],
    get_x=ColReader('photo_id', pref='fruits/IMG_', suff='.JPG'),
    get_y=Pipeline([ColReader('combined'), ToListTensor]),
    splitter=RandomSplitter(valid_pct=0.1),
    item_tfms=Resize(img_size, method=ResizeMethod.Pad, pad_mode=PadMode.Zeros),
    n_inp=1,
)


In [None]:
## Build the dataloaders from the training rows with the configured batch size,
## then place them on the CUDA device via `.cuda()`.
dls = plant.dataloaders(train, bs=batch_size)
dls = dls.cuda()

In [None]:
## Create the learner using resnet50 as the initial weights.
## The output width follows `traits` (one output per trait) rather than a hard-coded count, so the
## head always matches the targets produced by the datablock.
## The loss is chosen by the mae_loss flag; MAE, MSE, RMSE and R2 are reported as metrics either way.
learn = vision_learner(
    dls=dls,
    arch=resnet50,
    n_out=len(traits),
    loss_func=L1LossFlat() if mae_loss else MSELossFlat(),
    metrics=[mae, mse, rmse, R2Score()],
)

In [None]:
## When training without a user-supplied learning rate (lr <= 0), run the learning-rate finder
## and adopt its "valley" suggestion. Nothing happens when only loading a saved model.
if is_training:
    if lr <= 0:
        lrs = learn.lr_find(suggest_funcs=(minimum, steep, valley, slide))
        lr = lrs.valley

In [None]:
## Either train and export a new model, or replace `learn` with a previously exported one.
if is_training:
    ## Train the model and export to file.
    ## fine_tune first trains for n_freeze_epochs with the body frozen, then n_epochs unfrozen.
    learn.fine_tune(epochs=n_epochs, base_lr=lr, freeze_epochs=n_freeze_epochs)
    learn.export(file_name)
else:
    ## Load model for testing
    ## SECURITY: load_learner unpickles the file, which can execute arbitrary code --
    ## only load .pkl files that come from a trusted source.
    ## NOTE(review): the learner built in the earlier cell is discarded here; the loaded learner
    ## carries its own dataloaders and may sit on the CPU by default -- confirm if GPU inference is wanted.
    learn = load_learner(file_name)

In [None]:
## Predict on the test dataset.
## test_dl builds an inference dataloader over the held-out rows from the learner's own dataloaders.
## get_preds returns a (predictions, targets) pair; only the predictions are kept.
## The predictions are still in scaled units at this point; scaling is undone in the next cell.
dl = learn.dls.test_dl(test)
preds, _ = learn.get_preds(dl=dl)

In [None]:
## Remember the photo IDs of the test rows so they can be re-attached after un-scaling.
photo_id_col = test["photo_id"].tolist()

## Map the predictions back to the original trait units, name the columns after the traits,
## and label every row with its photo_id (added as the last column).
## NOTE(review): this rebinds `preds` from the raw prediction tensor to a DataFrame, so the cell
## should not be re-run without re-running the prediction cell first.
preds = pd.DataFrame(
    scaler.inverse_transform(preds),
    columns=traits,
).assign(photo_id=photo_id_col)
preds.head(10)

In [None]:
## The scaler was fitted on the trait columns only, so photo_id and combined are removed before
## inverting it. inverse_transform returns a plain array, so the frame is rebuilt with the trait
## names (and therefore gets a fresh 0..n-1 index, matching `preds`).
## NOTE(review): this rebinds `test` to the un-scaled values; re-running the cell would apply the
## inverse scaling a second time.
test = pd.DataFrame(
    scaler.inverse_transform(test.drop(columns=['photo_id', 'combined'], errors='ignore')),
    columns=traits,
)

## Append the photo_id column
test['photo_id'] = photo_id_col
test.head(10)

In [None]:
## Interleave test and preds: even rows are test data, the next index is the predicted value
## for the same photo.
## Both frames are stacked once and then stably sorted by row position, so each test row is
## immediately followed by its prediction. This replaces growing a DataFrame with pd.concat
## inside a loop, which re-copies the accumulated data on every iteration (quadratic cost).
## Columns also keep their own dtypes instead of each row being coerced to a single dtype,
## so numeric formatting in the exported CSV may differ slightly from the row-by-row version.
assert len(test) == len(preds), 'test and preds must contain the same number of rows'
test_preds_combined = (
    pd.concat([test.reset_index(drop=True), preds.reset_index(drop=True)])
    .sort_index(kind='mergesort')  # mergesort is stable: the test row stays ahead of its prediction
    .reset_index(drop=True)
)

## Export dataframe to a csv file.
test_preds_combined.to_csv(file_name + '_combined.csv')
test_preds_combined

In [None]:
## Overall regression metrics across all trait columns; the trailing photo_id column is excluded.
y_true, y_pred = test.iloc[:, :-1], preds.iloc[:, :-1]
print(f'Mean squared error: {mean_squared_error(y_true, y_pred)}')
print(f'Mean absolute error: {mean_absolute_error(y_true, y_pred)}')
print(f'R2 score: {r2_score(y_true, y_pred)}')

In [None]:
## Report the same three metrics separately for each trait, selecting columns by position.
for i, name in enumerate(traits):
    actual, predicted = test.iloc[:, i], preds.iloc[:, i]
    print(f'Mean squared error for column {name}: {mean_squared_error(actual, predicted)}')
    print(f'Mean absolute error for column {name}: {mean_absolute_error(actual, predicted)}')
    print(f'R2 score for column {name}: {r2_score(actual, predicted)}\n')

In [None]:
## valid_ranges maps each margin of error to the predicted rows in which EVERY trait lies within
## that fraction of the corresponding test value.
## The check is |pred - actual| <= margin * |actual|. For positive test values this is the same
## as actual*(1-margin) <= pred <= actual*(1+margin); unlike that two-sided form it also works
## when a test value is negative (there the two bounds swap order and no row could ever match).
## All traits are evaluated at once, so the frames no longer have to be re-filtered by photo_id
## after every column (which relied on photo_id values being unique).
abs_error = (preds[traits] - test[traits]).abs()
abs_actual = test[traits].abs()
valid_ranges = {
    percent: preds.loc[(abs_error <= abs_actual * percent).all(axis=1)]
    for percent in [0.1, 0.05, 0.025, 0.01]
}

In [None]:
## For each margin of error, print the share of test rows whose predictions all fall within it.
for percent, matched_rows in valid_ranges.items():
    share = len(matched_rows) / len(test)
    print(f'Accurate within {percent * 100}% of test: {share}')