In [None]:
import os

# Restrict this process to one physical GPU. The variable is set before
# TensorFlow (or any package that imports it) is loaded, so it is guaranteed to
# be in place by the time the CUDA runtime enumerates devices.
# NOTE(review): index "1" is machine-specific; adjust for hosts with one GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

import argparse
import json
import tensorflow as tf
import numpy as np
import h5py
import copy
import matplotlib.pyplot as plt
import matplotlib.path as mplPath
from sklearn.manifold import TSNE

from fr3D.train.utils import setup_datasets
from fr3D.data.utils import get_normalization
from fr3D.models import ConvAutoencoder, ConvAutoencoderCGAN, ConvAutoencoderC

# Use channels-last tensors throughout the Keras backend.
tf.keras.backend.set_image_data_format('channels_last')

In [None]:
# Experiment selection: `Re` is interpolated into the weights directory name and
# `expt_variable` into the config / weights file names below.
Re = 500
expt_variable = 'U'

# Dataset to evaluate on (alternatives kept for quick switching).
#dataset_path = f'/fr3D/postprocessed/annulus_64_{Re}.h5'
dataset_path = '/fr3D/postprocessed/annulus_64.h5'
#dataset_path = f'/fr3D/postprocessed/annulus_64_plane.h5'

#CGAN
#experiment_config = f'/fr3D/configs/training/ConvAutoencoderCGAN_{expt_variable}.json'
#weights_path = f'/storage/weights/ConvAutoencoderCGAN_{expt_variable}_Annulus64/ConvAutoencoderCGAN_{expt_variable}_Annulus64.h5'

#ConvAutoencoderC
experiment_config = f'/fr3D/configs/training/ConvAutoencoderC_{expt_variable}.json'
weights_path = f'/storage/weights{Re}/ConvAutoencoderC_{expt_variable}_Annulus64/ConvAutoencoderC_{expt_variable}_Annulus64.h5'

# Read-only handle to the raw HDF5 dataset.
# NOTE(review): nothing in the visible cells reads or closes this handle; call
# datasetf.close() once it is no longer needed, or drop it if it is unused.
datasetf = h5py.File(dataset_path,'r')

shuf_buf = 1
# Load the experiment configuration with a context manager so the file handle
# is closed deterministically instead of being left to the garbage collector.
with open(experiment_config, 'r') as config_file:
    config = json.load(config_file)
train_dataset, test_dataset = setup_datasets(config, dataset_path, shuf_buf, case_names=True, evaluation=True)

# Shapes of the first two tensors of the dataset's first element component.
# NOTE(review): presumably axis 0 is the batch axis (it is dropped when the
# model is constructed) -- confirm against setup_datasets.
sensor_shape = train_dataset.element_spec[0][0].shape
full_field_shape = train_dataset.element_spec[0][1].shape

def get_normalization_type(node_configs):
    """Build the normalizer described by a dataset's node configurations.

    The first entry of ``node_configs`` whose ``'nodetype'`` equals
    ``'normalize'`` supplies the ``'normalization_spec'`` mapping; when no such
    entry exists, ``{'method': None}`` is used instead.

    Args:
        node_configs: iterable of mappings, each carrying a ``'nodetype'`` key.

    Returns:
        The object returned by ``get_normalization`` when called with the
        selected spec expanded as keyword arguments and ``batch_mode=True``.
    """
    selected_spec = next(
        (entry['normalization_spec']
         for entry in node_configs
         if entry['nodetype'] == 'normalize'),
        {'method': None},
    )
    return get_normalization(**selected_spec, batch_mode=True)


# Normalizer matching the preprocessing declared in the experiment config.
normalizer = get_normalization_type(config['dataset']['node_configurations'])

In [None]:
# Instantiate the network with input sizes taken from the dataset element
# shapes (axis 0 of each shape is skipped) and the remaining hyper-parameters
# taken from the 'model' section of the experiment config.
model_kwargs = dict(dense_input_units=sensor_shape[1],
                    autoencoder_input_shape=full_field_shape[1:])
model = ConvAutoencoderC(**model_kwargs, **config['model'])

# The loss is fixed to MSE here; config['training']['loss'] is not consulted.
loss_fn = "mse"

# Compile with the two optimizers named in the training config, then restore
# the trained parameters from disk.
training_cfg = config['training']
l_opt = tf.keras.optimizers.get(training_cfg['l_optimizer'])
ae_opt = tf.keras.optimizers.get(training_cfg['ae_optimizer'])
metric_spec = training_cfg.get('metrics', None)
model.compile(l_optimizer=l_opt,
              loss=loss_fn,
              optimizer=ae_opt,
              metrics=metric_spec)
model.load_weights(weights_path)

In [None]:
ds = iter(test_dataset)

# Encode every test batch and remember which case each sample belongs to.
# Batches are gathered in a list and concatenated once after the loop; growing
# an array with np.concatenate on every iteration re-copies all earlier data
# each time (quadratic in the number of batches).
# The list is seeded with an empty default-dtype (float64) array so that the
# result has the same dtype as before and is still well-defined (shape
# (0, ...)) when the dataset yields no batches.
embedding_batches = [np.zeros((0,) + model.encoder.output_shape[1:])]
case_names = []
for (_, full_field, _), case_names_batch in ds:
    embedding_batches.append(model.encoder(full_field).numpy())
    # Case names arrive as bytes; decode them to str.
    case_names.extend(name.decode('utf-8') for name in case_names_batch.numpy())

latent_embeddings = np.concatenate(embedding_batches, 0)

In [None]:
def choose_samples(x, stride=50):
    """Subsample a sequence by keeping every ``stride``-th element.

    Args:
        x: any object supporting extended slicing (list, NumPy array, ...).
        stride: positive step between kept elements. Defaults to 50, which
            reproduces the original fixed subsampling.

    Returns:
        ``x[::stride]`` -- the elements at indices 0, stride, 2*stride, ...,
        with the same container type that slicing ``x`` produces.

    Raises:
        ValueError: if ``stride`` is smaller than 1.
    """
    if stride < 1:
        # A zero step is rejected by slicing anyway, and a negative step would
        # silently reverse the data; fail loudly for both.
        raise ValueError(f"stride must be a positive integer, got {stride}")
    return x[::stride]

# Subsample embeddings and case names with the same rule so they stay aligned.
chosen_embeddings = choose_samples(latent_embeddings)
chosen_case_names = choose_samples(case_names)

# Flatten each embedding to one row and project it with t-SNE.
# t-SNE is stochastic; the seed is fixed so the projection (and the figure
# produced from it) is reproducible across kernel restarts.
flattened_embeddings = chosen_embeddings.reshape(chosen_embeddings.shape[0], -1)
tsne_embeddings = TSNE(random_state=0).fit_transform(flattened_embeddings)

In [None]:
from matplotlib.pyplot import cm
import matplotlib.patches as mpatches

# Global figure settings for the saved figure.
plt.rcParams['figure.dpi'] = 800
plt.rcParams.update({'font.size': 18})

unique_case_names = set(chosen_case_names)

# Assign one rainbow colour per case. The names are sorted first because the
# iteration order of a set of strings can change between interpreter runs,
# which would otherwise reshuffle colours and legend order on every run.
ordered_case_names = sorted(unique_case_names)
palette = cm.rainbow(np.linspace(0, 1, len(ordered_case_names)))
case_name_colors = dict(zip(ordered_case_names, palette))

fig, ax = plt.subplots()

# Draw all points in a single scatter call (one artist instead of one per
# point), coloured by the case each sample belongs to.
point_colors = [case_name_colors[name] for name in chosen_case_names]
ax.scatter(tsne_embeddings[:, 0], tsne_embeddings[:, 1], color=point_colors)

# One legend patch per case, placed below the axes.
legend_patches = [
    mpatches.Patch(color=patch_color, label=name)
    for name, patch_color in case_name_colors.items()
]
ax.legend(handles=legend_patches, ncol = 3, loc = "lower left", fontsize=12, bbox_to_anchor = (-0.1,-0.8))
fig.savefig('/storage/paper/tsne_embeddings.pdf', bbox_inches='tight')
plt.show()
plt.close(fig)