In [1]:
#imports:
# !pip install
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from datagenerator import *

physical_devices = tf.config.list_physical_devices('GPU') 
for device in physical_devices:
    tf.config.experimental.set_memory_growth(device, True)

In [2]:
# constants:
SIZE = 160
RESIZED_WIDTH = SIZE  # temp
RESIZED_HEIGHT = SIZE  # temp
RESIZED_NUM_PIXELS = RESIZED_WIDTH * RESIZED_HEIGHT
SHAPE = RESIZED_NUM_PIXELS + 3
TRAIN_PATH = r'R:\domain_lab\runs_processed\train'  # temp
VALID_PATH = r'R:\domain_lab\runs_processed\validation'  # temp
TEST_PATH = r'R:\domain_lab\runs_processed\test'  # temp
BACH_SIZE = 40 # temp
TRAIN_DATASET_SIZE = dataset_size(TRAIN_PATH)
VALID_DATASET_SIZE = dataset_size(VALID_PATH)
TEST_DATA_SIZE = dataset_size(TEST_PATH)
STEPS_PER_EPOCH = TRAIN_DATASET_SIZE // BACH_SIZE
EPOCHS = 10  # temp
VALIDATION_STEPS = VALID_DATASET_SIZE // BACH_SIZE
TEST_STEPS = TEST_DATA_SIZE // BACH_SIZE

In [3]:
# activating data gen:
train_generator = data_generator(TRAIN_PATH, BACH_SIZE,
                                 True, RESIZED_WIDTH, RESIZED_HEIGHT)
validation_generator = data_generator(VALID_PATH, BACH_SIZE,
                                      True, RESIZED_WIDTH, RESIZED_HEIGHT)
test_generator = data_generator(TEST_PATH, BACH_SIZE,
                                True, RESIZED_WIDTH, RESIZED_HEIGHT)

In [4]:
# model:
model = tf.keras.Sequential()
model.add(tf.keras.Input(shape=(SHAPE,)))
model.add(tf.keras.layers.Dense(3*SHAPE//4))
model.add(tf.keras.layers.Dense(SHAPE//2))
model.add(tf.keras.layers.Dense(SHAPE//4))
model.add(tf.keras.layers.Dense(SHAPE//2))
model.add(tf.keras.layers.Dense(3*SHAPE//4))
model.add(tf.keras.layers.Dense(RESIZED_NUM_PIXELS, activation="sigmoid"))

In [77]:
# Compile the Model:
model.compile(loss=keras.losses.binary_crossentropy,
              optimizer=keras.optimizers.Adam())

In [None]:
# Training the Model:
model_history = model.fit(x=train_generator,
          steps_per_epoch=STEPS_PER_EPOCH,
          epochs=EPOCHS,
          validation_data=validation_generator,
          validation_steps=VALIDATION_STEPS)


Epoch 1/7
Epoch 2/7
Epoch 3/7
 1039/12496 [=>............................] - ETA: 7:33:21 - loss: 3.3562

<keras.callbacks.History at 0x26d8aac5c40>

In [None]:
# Evaluating the Model:
test_loss = model.evaluate(test_generator, steps=TEST_STEPS)
print('Test loss:', test_loss)


In [None]:
######## functions for visualizing predictions ########

def visualize_prediction(pred_input, original_output, prediction):
    fig, axes = plt.subplots(1, 3, figsize=(9, 3))

    # Plot the input frame.
    ax = axes[0]
    ax.imshow(np.squeeze(pred_input), cmap="gray")
    ax.set_title(f"input")
    ax.axis("off")

    # Plot the original new frame.
    ax = axes[1]
    ax.imshow(np.squeeze(original_output), cmap="gray")
    ax.set_title(f"original output")
    ax.axis("off")

    # Plot the new frame.
    ax = axes[2]
    ax.imshow(np.squeeze(prediction), cmap="gray")
    ax.set_title(f"predicted output")
    ax.axis("off")

    # Display the figure.
    plt.show()


def predict_and_save(path_for_saving: str, num_pf_predictions: int = None):
    if num_pf_predictions is None:
        num_pf_predictions = TEST_DATA_SIZE
    next_test = next_data(TEST_PATH, RESIZED_WIDTH, RESIZED_HEIGHT)
    for _ in range(num_pf_predictions):
        pred_input, original_output = next(next_test)
        prediction_output = model.predict(np.array([pred_input]))
        prediction = img_from_output(prediction_output[0], RESIZED_WIDTH, RESIZED_HEIGHT)
        my_input = img_from_input(pred_input,RESIZED_WIDTH,RESIZED_HEIGHT)
        original_output = img_from_output(original_output,RESIZED_WIDTH,RESIZED_HEIGHT)
        #saving:
        join = os.path.join
        volts = pred_input[-3:]
        name = f'{volts[0]}_{volts[1]}_{volts[2]}'
        os.mkdir(join(path_for_saving, name))
        for img_name, img in zip(['prediction.png', 'input.png', 'original_output.png'],
                                 [prediction, my_input, original_output]):
            cv2.imwrite(join(path_for_saving, name, img_name), img)


def show_me_random_unsaved_predictions():
    next_test = next_data(TEST_PATH, RESIZED_WIDTH, RESIZED_HEIGHT)
    while True:
        pred_input, original_output = next(next_test)
        prediction_output = model.predict(np.array([pred_input]))
        prediction = img_from_output(prediction_output[0], RESIZED_WIDTH, RESIZED_HEIGHT)
        my_input = img_from_input(pred_input, RESIZED_WIDTH, RESIZED_HEIGHT)
        original_output = img_from_output(original_output, RESIZED_WIDTH, RESIZED_HEIGHT)
        visualize_prediction(my_input, original_output, prediction)
        que = input('press_enter_for_anther_prediction, type q to break')
        if que == 'q':
            print('breaking...')
            break


def show_me_random_saved_predictions(saved_predictions_path: str):
    predictions_list = os.listdir(saved_predictions_path)
    random.shuffle(predictions_list)
    join = os.path.join
    for pred in predictions_list:
        my_input = cv2.imread(join(saved_predictions_path, pred, 'input.png'), cv2.IMREAD_GRAYSCALE)
        original_output = cv2.imread(join(saved_predictions_path, pred, 'original_output.png'), cv2.IMREAD_GRAYSCALE)
        prediction = cv2.imread(join(saved_predictions_path, pred, 'prediction.png'), cv2.IMREAD_GRAYSCALE)
        visualize_prediction(my_input, original_output, prediction)
        que = input('press_enter_for_anther_prediction, type q to break')
        if que == 'q':
            print('breaking...')
            break
    print('There are no more predictions')




In [None]:
predict_and_save('R:\domain_lab\saved_predictions')

In [None]:
show_me_random_unsaved_predictions()