In [None]:
import msgpack
import random
import tensorflow as tf
from tensorflow import keras
import numpy as np
import json

from scenes import *
import pyngp as ngp

In [None]:
def get_scene(scene):
	for scenes in [scenes_sdf, scenes_nerf, scenes_image, scenes_volume]:
		if scene in scenes:
			return scenes[scene]
	return None

def Train_NeRF(testbed, load_snapshot, screenshot_transforms, screenshot_dir, sharpen=0, exposure=0.0, screenshot_frames=None, screenshot_spp=16, width=224, height=224, network=None):

	if load_snapshot:
		scene_info = get_scene(load_snapshot)
		if scene_info is not None:
			load_snapshot = default_snapshot_filename(scene_info)
		testbed.load_snapshot(load_snapshot)

	ref_transforms = {}
	if screenshot_transforms: # try to load the given file straight away
		print("Screenshot transforms from ", screenshot_transforms)
		with open(screenshot_transforms) as f:
			ref_transforms = json.load(f)

	if testbed.mode == ngp.TestbedMode.Sdf:
		testbed.tonemap_curve = ngp.TonemapCurve.ACES

	testbed.nerf.sharpen = float(sharpen)
	testbed.exposure = exposure
	testbed.shall_train = True


	testbed.nerf.render_with_lens_distortion = True

	if ref_transforms:
		testbed.fov_axis = 0
		testbed.fov = ref_transforms["camera_angle_x"] * 180 / np.pi
		if not screenshot_frames:
			screenshot_frames = range(len(ref_transforms["frames"]))

		for idx in screenshot_frames:
			f = ref_transforms["frames"][int(idx)]
			if 'transform_matrix' in f:
				cam_matrix = f['transform_matrix']
			elif 'transform_matrix_start' in f:
				cam_matrix = f["transform_matrix_start"]
			else:
				raise KeyError()
			testbed.set_nerf_camera_matrix(np.matrix(cam_matrix)[:-1,:])
			outname = os.path.join(screenshot_dir, os.path.basename(f["file_path"]))

			# Some NeRF datasets lack the .png suffix in the dataset metadata
			if not os.path.splitext(outname)[1]:
				outname = outname + ".png"

			image = testbed.render(width or int(ref_transforms["w"]), height or int(ref_transforms["h"]), screenshot_spp, True)
			os.makedirs(os.path.dirname(outname), exist_ok=True)
			write_image(outname, image)


In [None]:
def loadNeRFData(fileName = "../SavedModel.msgpack"):
    with open(fileName, "rb") as data_file:
        byte_data = data_file.read()
        data_loaded = msgpack.unpackb(byte_data)
        parameters = np.frombuffer(data_loaded["snapshot"]["params_binary"], dtype=np.float16).copy()
        return data_loaded, parameters
        
def SaveParameters(data_loaded, parameters, outputFileName = "../Test.msgpack"):
    data_loaded['snapshot']['params_binary'] = parameters.tobytes()

    with open(outputFileName, "wb") as file:
        file.write(msgpack.packb(data_loaded))

In [None]:
pretrained_model = tf.keras.applications.MobileNetV2()
pretrained_model.trainable = False

decode_predictions = tf.keras.applications.mobilenet_v2.decode_predictions

In [None]:
# import gymnasium as gym
# from gymnasium import spaces
# import numpy as np
# from stable_baselines3 import PPO, A2C
# from stable_baselines3.common.env_checker import check_env
# from PIL import Image
# from skimage.metrics import peak_signal_noise_ratio


# # Preprocess numpy images
# def preprocess(image, unproc=False):
#     if not unproc:
#         image = tf.keras.applications.mobilenet_v2.preprocess_input(image)
#         return np.asarray(image, dtype=np.float32)
#     else:
#         return np.asarray(image, dtype=np.uint8)
#     # Iteration ste

# # Loads images from path
# def prep_im(image_path, unproc=False):
#     image = np.asarray(Image.open(image_path), dtype=np.float32)
#     # ## print(image_raw.shape())
#     image = image[None, ...]
#     image = preprocess(image, unproc=unproc)
#     #     image_probs = pretrained_model.predict(image)
#     #     plot_fig(image, image_probs)
#     return image

# pred = pretrained_model.predict(prep_im("../../Slug69Percent.jpg"))
# decode_predictions(pred, top=5)

In [None]:
testbed = ngp.Testbed()

In [None]:
mse = keras.losses.MeanSquaredError()

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from stable_baselines3 import PPO, A2C
from stable_baselines3.common.env_checker import check_env
from PIL import Image
from skimage.metrics import peak_signal_noise_ratio

logPath = '../AdvOutput/rewards.log'
# Preprocess numpy images
def preprocess(image, unproc=False):
    if not unproc:
        image = tf.keras.applications.mobilenet_v2.preprocess_input(image)
        return np.asarray(image, dtype=np.float32)
    else:
        return np.asarray(image, dtype=np.uint8)
    # Iteration ste

# Loads images from path
def prep_im(image_path, unproc=False):
    image = np.asarray(Image.open(image_path), dtype=np.float32)
    # ## print(image_raw.shape())
    image = image[None, ...]
    image = preprocess(image, unproc=unproc)
    #     image_probs = pretrained_model.predict(image)
    #     plot_fig(image, image_probs)
    return image

class InstantNGPEnv(gym.Env):
    def __init__(self):
        super(InstantNGPEnv, self).__init__()
        # self.reward_range = (-1,1)
        # Load Instant-NGP and MobileNetV2 models
        # self.instant_ngp = ...  # Load Instant-NGP model
        self.data_loaded, self.feature_grid = loadNeRFData()
        self.ground_truth_img = None
        self.log = open(logPath, 'a')


        self.mobilenetv2 = pretrained_model  # Load MobileNetV2 model
        self.true_class_id = 954
        self.true_class = "banana"
        self.target = "slug" # Target tiger shark class
        self.noiseAmount = 0.15
        self.max_reward = -np.inf
        self.epochs = 0
        self.targeted_grad = None
        self.unrendered = None
        self.state = 38 + random.randint(-3,3)
        self.Num_Imgs = 172

        # Define action and observation spaces
        self.action_space = spaces.Box(low=-0.005, high=0.005, shape=(13205056,), dtype=np.float32)  # Define space of allowable feature grid modifications
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(13205056,), dtype=np.float32) # Define state representation (e.g., feature grid, image)


    def reset(self, seed=0):
        super().reset(seed=seed)
        # Reset feature grid or load new one
        self.mobilenetv2.trainable = False
        self.ground_truth_img = prep_im("../NormalOutput/0081.jpg")
        self.data_loaded, self.feature_grid = loadNeRFData()
        # self.log.close()
        return (self.feature_grid, {})
    
    def get_reward(self, originalImage, renderedImage, predicted, min_psnr=30, max_psnr=50, mse_weight = -2, psnr_weight = .5):

        returned_reward = 0
        imageLoss = mse(originalImage, renderedImage)
        psnr = (peak_signal_noise_ratio(originalImage, renderedImage, data_range=255) - min_psnr) / (max_psnr - min_psnr)
        if predicted[1] == self.target:
            returned_reward = predicted[2] * 10 + predicted[0]*.2 # psnr * psnr_weight + imageLoss * mse_weight # TODO: Commented out to test and see if it increases accuracy
        elif predicted[1] != self.true_class and predicted[0] == self.Num_Imgs:
            returned_reward += predicted[2] * 10 + predicted[0] * .2
        else:
            returned_reward = -100

        print("Psnr: " + str(psnr) + ", MSE: " + str(imageLoss) + " for class " + predicted[1])
        return returned_reward

    def step(self, action):
        # Modify feature grid based on action
        self.feature_grid +=  action[0] * self.noiseAmount  # Apply action to feature grid

        # Render image from modified feature grid
        SaveParameters(self.data_loaded, self.feature_grid)
        Train_NeRF(testbed, "../Test.msgpack", "../BananaScene/transforms.json", "../AdvOutput/")

        # TODO: Modify code to iterate through a range of image, test images on ImageNetV2, and find the common predictions between all of them.
        # TODO: use common label as the true label predicted.
        pred_classes = {}
        for i in range(1, self.Num_Imgs):
            image = prep_im(f"../AdvOutput/{i:04}.jpg")
            # Classify image using MobileNetV2
            prediction = self.mobilenetv2.predict(image, verbose=0)

            # print("Top prediction:  " + str(decode_predictions(prediction, top=1)))

            decoded_prediction = decode_predictions(prediction, top=1)[0][0]

            if decoded_prediction[1] not in pred_classes:
                pred_classes[decoded_prediction[1]] = [decoded_prediction[2], 1]
            else:
                pred_classes[decoded_prediction[1]][0] *= pred_classes[decoded_prediction[1]][1]
                pred_classes[decoded_prediction[1]][0] += decoded_prediction[2]
                pred_classes[decoded_prediction[1]][1] += 1
                pred_classes[decoded_prediction[1]][0] /= pred_classes[decoded_prediction[1]][1]
        maxKey = str(max(pred_classes, key=lambda x:pred_classes[x][1]))
        print("Max Key Found: " + maxKey + " with " + str(pred_classes[maxKey][1]) + " number of images with this label")
        print("dictionary: " + str(pred_classes))
        print("pred: " + maxKey + ", confidence: " + str(pred_classes[maxKey][0]))
        # print("true pred: " + self.true_class + ", confidence: " + str(avg_true_confidence))
        # Calculate reward based on adversarial success and image quality
        reward = self.get_reward(self.ground_truth_img, image, [pred_classes[maxKey][1], maxKey, pred_classes[maxKey][0]])  # Implement reward function
        # reward += (1 - (np.squeeze(prediction)[self.true_class_id])) # TODO: Adjust reward so model gets to slug with high accuracy.
        reward -= 0 if self.true_class not in pred_classes else pred_classes[self.true_class][0]*10
        # print("Banana: " + str(decode_predictions(prediction)) + ", conf: " + str(np.squeeze(prediction)[self.true_class_id]))
        # Determine if episode is done
        done = False

        # done = True if reward <= -100 else False  # Define termination criteria
        truncated, info = False, {}
        
        if self.epochs % 10 == 0 or reward == self.max_reward:
        # if self.epochs % 250 == 0:
            print('saving tape...')
            self.log.write(
                '\n'
                + str(self.epochs)
                + ', '
                + str(decode_predictions(prediction, top=1))
            )
        self.epochs += 1

        info = {
            "Target" : self.target,
            "Predicted" : maxKey,
            "Num_Imgs" : pred_classes[maxKey][1],
            "Confidence" : pred_classes[maxKey][0],
            "Epochs" : self.epochs
        }

        return self.feature_grid, reward, done, truncated, info

    
    def render(self):
        pass

    def close(self):
        self.log.close()

# Create environment instance
env = InstantNGPEnv()

# Create RL agent
model = PPO('MlpPolicy', env, device='cuda')

# Train the agent
# Evaluate the trained agent
episode_reward = 0
obs, info = env.reset()
_states = None

epochs = 0
while True:
    action, _states = model.predict(obs, _states)
    obs, reward, done, truncated, info = env.step(action)
    episode_reward += reward
    print("Current episode reward: " + str(episode_reward))

    # TODO: Why did this throw an error? "Target" is a defined key in the info dictionary. Literally right there.
    if (info['Target'] == info['Predicted'] and info['Confidence'] >= .65 and info["Num_Imgs"] >= 172):
        print("Episode reward:", episode_reward)
        break

    if epochs > 100 and info['Predicted'] != info['Target']:
        obs, info = env.reset()
        epochs = 0
        episode_reward = 0
        _states = None
    elif info['Predicted'] == info["Target"]:
        epochs = 0
    epochs += 1



In [None]:
# !python adversarialAttack.py --load_snapshot=../SavedModel.msgpack --screenshot_transforms=../BananaScene/transforms.json --screenshot_dir=../NormalOutput --width=224 --height=224


# Load Pretrained ImageNetV2