# Evolutionary Camouflage Versus a Learning Predator
EvoCamoVsLearningPredator.ipynb

Just a copy of Evo_Camo_vs_Static_FCD.ipynb as of 20220403


In [1]:
# Shared "communication" directory on Drive.
shared_directory = '/content/drive/My Drive/PredatorEye/evo_camo_vs_static_fcd/'

# Pathname of pre-trained Keras/TensorFlow model
saved_model_directory = '/content/drive/My Drive/PredatorEye/saved_models/'
# trained_model = saved_model_directory + '20220202_1211_Find_3_Disks_complex'
# trained_model = saved_model_directory + '20220222_1747_F3D_augmented_rc4'
# trained_model = saved_model_directory + '20220227_0746_F3D2_a'
# trained_model = saved_model_directory + '20220304_1135_FCD5_a'
trained_model = saved_model_directory + '20220321_1711_FCD6_rc4'
model = []

my_prefix = "find_"
other_prefix = "camo_"

my_suffix =  ".txt"
# other_suffix = ".jpeg"
other_suffix = ".png"

fcd_image_size = 1024
fcd_disk_size = 201

import time
import PIL
from os import listdir
from os import remove
from os.path import join
from os.path import split
from os.path import isfile
from tensorflow import keras

# from PIL import Image
import numpy as np

%tensorflow_version 2.x
import tensorflow as tf
print('TensorFlow version:', tf.__version__)

from tensorflow.keras import backend as keras_backend
keras_backend.set_image_data_format('channels_last')

# Import DiskFind utilities for PredatorEye.
import sys
sys.path.append('/content/drive/My Drive/PredatorEye/shared_code/')
import DiskFind as df

TensorFlow version: 2.8.0


# Ad hoc “predator server”

In [5]:
# Top level: wait for camo_xxx.jpeg files to appear, respond with find_xxx.txt
def start_run(step = 0):
    if step == 0:
        print('Start run in', shared_directory )
        list_unexpected_files(shared_directory)
    else:
        print('Continue run at step', step, ' in', shared_directory)
    while True:
        performStep(step, shared_directory)
        step += 1

# Continue from from the last camo_xxx.jpeg file.
def restart_run():
    start_run(newest_file_from_other(shared_directory))

# Single step: wait for camo file, write response, delete previous response.
def performStep(step, directory):
    waitForReply(step, shared_directory)
    print('Write file', step)
    writeResponseFile(step, shared_directory)
    deleteMyFile(step - 1, shared_directory)

# Read image file for step, apply pre-trained model, write response file.
def writeResponseFile(step, directory):
    # Read image file and check for expected format.
    image_pathname = makeOtherPathname(step, directory)
    pixel_tensor = df.read_image_file_as_pixel_tensor(image_pathname)
    assert df.check_pixel_tensor(pixel_tensor), ('wrong file format: ' +
                                                 image_pathname)
    # Run pre-trained model on new image.
    # predict = model.predict(tf.convert_to_tensor([pixel_tensor]))[0]
    prediction = model.predict(tf.convert_to_tensor([pixel_tensor]))[0]
    # Generate response file.
    # response_string = str(predict[0]) + " " + str(predict[1])
    response_string = str(prediction[0]) + " " + str(prediction[1])
    print('response_string ' + "'" + response_string + "'")
    file = open(makeMyPathname(step, directory),"w")
    file.write(response_string)
    file.close()
    print("wrote response file", makeMyPathname(step, directory))
    ############################################################################

    # # 20220408
    # label_for_fine_tuning = center_of_nearest_prey(prediction, step, directory)

    # # 20220409
    # df.draw_image(pixel_tensor, label_for_fine_tuning, prediction)

    # 20220410
    fine_tune_predator(pixel_tensor, prediction, step, directory)

    ############################################################################

# Delete the given file, usually after having written the next one.
def deleteMyFile(step, directory):
    path = makeMyPathname(step, directory)
    if isfile(path):
        remove(path)

# From pathname for file of given step number from the "other" agent.
def makeOtherPathname(step, directory):
    return directory + other_prefix + str(step) + other_suffix

# Form pathname for file of given step number from "this" agent.
def makeMyPathname(step, directory):
    return directory + my_prefix + str(step) + my_suffix

################################################################################
def makePreyPathname(step, directory):
    return directory + 'prey_' + str(step) + '.txt'
################################################################################

# Wait until other agent's file for given step appears.
def waitForReply(step, directory):
    print('start waiting for  ', makeOtherPathname(step, directory))
    start_time = time.time()
    while not isFilePresent(makeOtherPathname(step, directory)):
        time.sleep(2)  # wait 2 sec
    print('done waiting for   ', makeOtherPathname(step, directory))
    print('Elapsed time:', int(time.time() - start_time), 'seconds.')

# Like fs::exists() but for unknown reasons, that does not
# seem to work for newly created files on G Drive.
#
# TODO Why? This version works on G Drive, but it seems simply
#      calling fs::exists() should be enough.
#
def isFilePresent(file):
    result = False
    (directory, filename) = split(file)
    for i in listdir(directory):
        if i == filename:
            result = True
    return result

# Actually I guess the counterparty may have already written its first...
def list_unexpected_files(directory):
    directory_contents = listdir(directory)
    if directory_contents:
        print('Unexpected files:', directory_contents)

# Returns the step number of the newest file from "other" in given directory.
# (So if "camo_573.jpeg" is the only "other" file there, returns int 573)
def newest_file_from_other(directory):
    steps = [0]  # Default to zero in case dir is empty.
    for filename in listdir(directory):
        if other_prefix == filename[0:len(other_prefix)]:
            steps.append(int(filename.split(".")[0].split("_")[1]))
    return max(steps)

################################################################################

# TODO 20220413 experiment
# (This seems rather ad hoc, but I wonder what would happen if instead of
# training on a single training example per step, if I accumulated a new
# “training set” of all the steps seen so far during a single run? (note this
#  will newly make run “stateful” but probably OK for a test))
fine_tune_images = None
fine_tune_labels = None

# use np.concatenate or tf.stack to grow these each step


# 20220410
def fine_tune_predator(pixel_tensor, prediction, step, directory):

    # 20220408
    label_for_fine_tuning = center_of_nearest_prey(prediction, step, directory)


    # TODO 20220414
    global fine_tune_images
    global fine_tune_labels
    new_image = tf.convert_to_tensor([pixel_tensor])
    new_label = tf.convert_to_tensor([label_for_fine_tuning])
    if (fine_tune_images == None) and (fine_tune_labels == None):
        fine_tune_images = new_image
        fine_tune_labels = new_label
    else:
        fine_tune_images = tf.concat([fine_tune_images, new_image], axis=0)
        fine_tune_labels = tf.concat([fine_tune_labels, new_label], axis=0)

    # 20220409
    # TODO 20220410 turn this off for a test run.
    # df.draw_image(pixel_tensor, label_for_fine_tuning, prediction)

    # Do a training step of the predator model. Use the ground truth center of
    # the prey located nearest the current model's prediction from the current
    # "tournament image". That is, assume it was aiming at the correct prey but
    # just missed a little.
    # history = model.fit(x = tf.convert_to_tensor([pixel_tensor]),
    #                     y = tf.convert_to_tensor([label_for_fine_tuning]),
    #                     validation_data = None)
    # TODO 20220414
    # Here trying a train step, using the collection of all previous examples
    history = model.fit(x = fine_tune_images,
                        y = fine_tune_labels,
                        validation_data = None)

    # visualize change from train step:
    new_prediction = model.predict(tf.convert_to_tensor([pixel_tensor]))[0]

    # # TODO 20220410 this should be a display utility in df:
    # image_width = pixel_tensor.shape[0]
    # disk_width = image_width * df.fcd_relative_disk_size

    # df.draw_image(pixel_tensor)
    # df.draw_circle(prediction, disk_width)
    # df.draw_circle(new_prediction, disk_width)
    # # plt.show()
    # df.draw_image_plt_show()

    # TODO 20220410 very temp prototype rather than get hung up further on
    # visualization: draw with "new_prediction" as "label" and "prediction"
    # as "prediction".
    # TODO 20220410 turn this off for a test run.
    # df.draw_image(pixel_tensor, new_prediction, prediction)

    # TODO 20220412 log direction of distance error change
    print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
    # TODO 20220414
    print('fine_tune_images.shape =', fine_tune_images.shape)
    print('fine_tune_labels.shape =', fine_tune_labels.shape)

    before = df.dist2d(prediction, label_for_fine_tuning)
    after = df.dist2d(new_prediction, label_for_fine_tuning)
    d = after - before
    note = 'INCREASED'
    if before > after :
        note = 'DECREASED'
    print(note, ' --  before:', before, ' after:', after, ' diff:', d)
    print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')

# 20220408 
# Given the xy prediction from the current predator model, read the ground truth
# prey locations in the "prey_n.txt" file, return the one nearest to the current
# prediction. Effectively, assume the predator was "aiming for" that one but
# missed by a bit.
def center_of_nearest_prey(predict, step, directory):
    prey_centers = read_3_centers_from_file(step, directory)
    # print('prey_centers =\n' + str(prey_centers))
    # print('predict =', predict)
    # print('type(predict) =', type(predict))

    # SURELY there is a more "pythonic" way to do this (select min distance)
    min_distance = float('inf')
    nearest_center = []
    for prey_center in prey_centers:
        # print('prey_center =', prey_center)
        distance = df.dist2d(prey_center, predict)
        # print('distance =', distance)
        if min_distance > distance:
            min_distance = distance
            nearest_center = prey_center
        # print('min_distance =', min_distance)
        # print('nearest_center =', nearest_center)

    return nearest_center

# TODO 20220408
# Read ground truth prey center location data provided in "prey_n.txt" file.
def read_3_centers_from_file(step, directory):
    # Read contents of file as string.
    file = open(makePreyPathname(step, directory),'r')
    prey_centers_string = file.read()
    file.close()
    # Split string at whitespace, map to 6 floats, reshape into 3 xy pairs.
    return np.reshape(list(map(float, prey_centers_string.split())), (3, 2))


################################################################################

# Read pre-trained model

In [3]:
# Read pre-trained TensorFlow "predator vision" model.

print('Reading pre-trained model from:', trained_model)
# ad hoc workaround suggested on https://stackoverflow.com/q/66408995/1991373
#
# dependencies = {
#     'hamming_loss': tfa.metrics.HammingLoss(mode="multilabel", name="hamming_loss"),
#     'attention': attention(return_sequences=True)
# }
#
# dependencies = {
#     'valid_accuracy': ValidAccuracy
# }

# Calculates RELATIVE disk radius on the fly -- rewrite later.
def fcd_disk_radius():
    return (float(fcd_disk_size) / float(fcd_image_size)) / 2

# Given two tensors of 2d point coordinates, return a tensor of the Cartesian
# distance between corresponding points in the input tensors.
def corresponding_distances(y_true, y_pred):
    true_pos_x, true_pos_y = tf.split(y_true, num_or_size_splits=2, axis=1)
    pred_pos_x, pred_pos_y = tf.split(y_pred, num_or_size_splits=2, axis=1)
    dx = true_pos_x - pred_pos_x
    dy = true_pos_y - pred_pos_y
    distances = tf.sqrt(tf.square(dx) + tf.square(dy))
    return distances

# 20211231 copied from Find_Concpocuous_Disk
def in_disk(y_true, y_pred):
    distances = corresponding_distances(y_true, y_pred)
    # relative_disk_radius = (float(fcd_disk_size) / float(fcd_image_size)) / 2

    # From https://stackoverflow.com/a/42450565/1991373
    # Boolean tensor marking where distances are less than relative_disk_radius.
    # insides = tf.less(distances, relative_disk_radius)
    insides = tf.less(distances, fcd_disk_radius())
    map_to_zero_or_one = tf.cast(insides, tf.int32)
    return map_to_zero_or_one

dependencies = { 'in_disk': in_disk }

model = keras.models.load_model(trained_model, custom_objects=dependencies)

Reading pre-trained model from: /content/drive/My Drive/PredatorEye/saved_models/20220321_1711_FCD6_rc4


# Run test

In [None]:
# Normally start from step 0, or if an "other" file exists
# (eg 'camo_123.jpeg') then restart from that point.
restart_run()

Start run in /content/drive/My Drive/PredatorEye/evo_camo_vs_static_fcd/
start waiting for   /content/drive/My Drive/PredatorEye/evo_camo_vs_static_fcd/camo_0.png
done waiting for    /content/drive/My Drive/PredatorEye/evo_camo_vs_static_fcd/camo_0.png
Elapsed time: 40 seconds.
Write file 0
response_string '0.6794255 0.35817093'
wrote response file /content/drive/My Drive/PredatorEye/evo_camo_vs_static_fcd/find_0.txt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
fine_tune_images.shape = (1, 128, 128, 3)
fine_tune_labels.shape = (1, 2)
INCREASED  --  before: 0.07735971735372192  after: 0.1888302970220201  diff: 0.11147057966829817
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
start waiting for   /content/drive/My Drive/PredatorEye/evo_camo_vs_static_fcd/camo_1.png
done waiting for    /content/drive/My Drive/PredatorEye/evo_camo_vs_static_fcd/camo_1.png
Elapsed time: 40 sec