`FCN.ipynb`

Create a fully connected neural network that determines the slope angle of phase data.

In [1]:
# ### GPU Check
# # check available GPU in terminal: watch -d -n 0.5 nvidia-smi
# # select one GPU to train on:
# import os
# os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   # see issue #152
# os.environ["CUDA_VISIBLE_DEVICES"]="2"

### Imports

In [2]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
%config InlineBackend.figure_format = 'retina'
import io, os, sys, glob

import pyuvdata
import hera_cal as hc
import uvtools

import threading
import random

### Functions & Classes

In [3]:
def load_relevant_data(miriad_path, calfits_path):
    """TODO: docstring"""

    uvd = pyuvdata.UVData()
    uvd.read_miriad(miriad_path)

    # get the redundancies for that data
    aa = hc.utils.get_aa_from_uv(uvd)
    info = hc.omni.aa_to_info(aa)
    red_bls = np.array(info.get_reds())

    # gains for same data 
    gains, flags = hc.io.load_cal(calfits_path)
    
    return red_bls, gains, uvd

In [4]:
def get_good_red_bls(red_bls, gain_keys, min_group_len = 4):
    """Select all the good antennas from red_bls
    
    Each baseline group in red_bls has its bad separations removed. Groups with less than min_group_len are removed.
    
    Arguments:
    
        red_bls : list of lists - Each sublist is a group of redundant separations for a unique baseline.
        gain_keys : dict - gains.keys() from hc.io.load_cal()
        min_group_len: int - Minimum number of separations in a 'good' sublist.
                            (Default = 4, so that both training and testing can take two seps)
    
    Returns:
    
        list of lists - Each sublist is a len >=4 list of separations of good antennas
    
    """
    
    def ants_good(sep):
        """Returns True if both antennas are good.

        Because we are using data from firstcal (is this right? I have trouble rememebring the names and properties of the different data sources)
        we can check for known good or bad antennas by looking to see if the antenna is represented in gains.keys(). If the antenna is present, its good.

        Arguments:

            sep : tuple - antenna indices

        Returns :

            bool - True if both antennas are in gain_keys
        """

        ants = [a[0] for a in gain_keys]

        if sep[0] in ants and sep[1] in ants:
            return True

        else:
            return False

    good_redundant_baselines = []
    
    for group in red_bls:
        
        new_group = []
        
        for sep in group:
            
            # only retain seps made from good antennas
            if ants_good(sep) == True:
                new_group.append(sep)
                
        new_group_len = len(new_group)
        
        # make sure groups are large enough that both the training set and the testing set can take two seps 
        if new_group_len >= min_group_len:
            
            # Make sure groups are made from even number of seps
            #
            # I honestly dont recall why I did this. ¯\_(ツ)_/¯ 
            if new_group_len % 2 != 0:
                new_group.pop()
                
            good_redundant_baselines.append(sorted(new_group))
            
    return good_redundant_baselines

In [5]:
def train_test_split_red_bls(red_bls, training_percent = 0.80):
    """Slit a list of redundant baselines into a training set and a testing set. 
    
    Each of the two sets has at least one pair of baselines from every group from red_bls. However, separations from one set will not appear the other.
    
    Arguments:
    
        red_bls : list of lists - Each sublist is a group of redundant separations for a unique baseline. Each sublist must have at least 4 separations.
        training_percent : float - *Approximate* portion of the separations that will appear in the training set.
        
    Returns:
    
        tuple of dicts - train_red_bls_dict, test_red_bls_dict
    
    """
    
    
    training_redundant_baselines_dict = {}
    testing_redundant_baselines_dict = {}

    # make sure that each set has at least 2 seps from each group
    #
    thinned_groups_dict = {}
    for group in red_bls:

        # group key is the sep with the lowest antenna indicies
        key = sorted(group)[0]
        
        # pop off seps from group and append them to train or test groups
        random.shuffle(group)
        training_group = []
        training_group.append(group.pop())
        training_group.append(group.pop())

        testing_group = []
        testing_group.append(group.pop())
        testing_group.append(group.pop())
        
        # add the new train & test groups into the dicts
        training_redundant_baselines_dict[key] = training_group
        testing_redundant_baselines_dict[key] = testing_group
        
        # if there are still more seps in the group, save them into a dict for later assignment
        if len(group) != 0:
            thinned_groups_dict[key] = group

    # Shuffle and split the group keys into two sets using 
    #
    thinned_dict_keys = thinned_groups_dict.keys()
    random.shuffle(thinned_dict_keys)
    
    """Because we are ensuring that each set has some seps from every group, the ratio of train / test gets reduced a few percent.
    This accounts for that with an arbitrary shift found by trial and error."""
    training_percent = np.min([0.95, training_percent + 0.15])

    
    # why did i call this extra?
    # these are the keys that each set will extract seps from thinned_groups_dict with
    training_redundant_baselines_extra, testing_redundant_baselines_extra = np.split(thinned_dict_keys, [int(len(thinned_dict_keys)*training_percent)])

    # extract seps from thinned_groups_dict and apply to same key in training set
    for key in training_redundant_baselines_extra:
        key = tuple(key)
        group = thinned_groups_dict[key]
        training_group = training_redundant_baselines_dict[key]
        training_group.extend(group)
        training_redundant_baselines_dict[key] = training_group

    # extract seps from thinned_groups_dict and apply to same key in testing set
    for key in testing_redundant_baselines_extra:
        key = tuple(key)
        group = thinned_groups_dict[key]
        testing_group = testing_redundant_baselines_dict[key]
        testing_group.extend(group)
        testing_redundant_baselines_dict[key] = testing_group
        
    return training_redundant_baselines_dict, testing_redundant_baselines_dict

In [6]:
def get_or_gen_test_train_red_bls_dicts(red_bls = None, gain_keys = None, training_percent = 0.80):

    if len(glob.glob("*.npz")) == 2:
        
        training_redundant_baselines_dict = loadnpz('training_redundant_baselines_dict.npz')[()]
        testing_redundant_baselines_dict = loadnpz('testing_redundant_baselines_dict.npz')[()]
    else:
        
        assert type(red_bls) != None, "Provide a list of redundant baselines"
        assert type(gain_keys) != None, "Provide a list of gain keys"
        
        good_redundant_baselines = get_good_red_bls(red_bls, gain_keys)
        training_redundant_baselines_dict, testing_redundant_baselines_dict = train_test_split_red_bls(good_redundant_baselines, training_percent = training_percent)

        np.savez('training_redundant_baselines_dict', training_redundant_baselines_dict)
        np.savez('testing_redundant_baselines_dict', testing_redundant_baselines_dict)

    return training_redundant_baselines_dict, training_redundant_baselines_dict

In [7]:
from numpy import load
def loadnpz(filename):
    """Loads up npzs. For dicts do loadnpz(fn)[()]"""
    a = load(filename)
    d = dict(zip(("data1{}".format(k) for k in a), (a[k] for k in a)))
    return d['data1arr_0']

In [8]:
def get_seps_data(red_bls_dict, uvd):
    """Get the data and data.conjugate() for all the seps in a redundant baselines dictionary."""
    
    data = {}
    for key in red_bls_dict.keys():
        for sep in red_bls_dict[key]:
            data[sep] = uvd.get_data(sep)
    
    return data

In [9]:
class Data_Creator(object):
    """Creates data in an alternate thread.

    ## usage:
    ## data_maker = data_creator(num_flatnesses=250, mode = 'train')
    ## data_maker.gen_data() (before loop)
    ## inputs, targets = data_maker.get_data() (start of loop)
    ## data_maker.gen_data() (immediately after get_data())

    """
    
    """TODO:
    
    Comments, arg / return explanation & genreal cleanup
    
    """

    def __init__(self, num_flatnesses, bl_data = None, bl_dict = None, gains = None, clean = False):
        
        
        self._num = num_flatnesses
        self._clean = clean
                    
        self._bl_data = bl_data
        self._bl_data_c = None
        
        self._bl_dict = bl_dict
        
        self._gains = gains
        self._gains_c = None
        
        self._epoch_batch = []
        self._nu = np.arange(1024)
        
    def _gen_data(self):
        
        # scaling tools
        # the NN likes data in the range (0,1)
        angle_tx  = lambda x: (np.asarray(x) + np.pi) / (2. * np.pi)
        angle_itx = lambda x: np.asarray(x) * 2. * np.pi - np.pi

        abs_min_max_delay = 0.040
        delay_tx  = lambda x: (np.array(x) + abs_min_max_delay) / (2. * abs_min_max_delay)
        delay_itx = lambda x: np.array(x) * 2. * abs_min_max_delay - abs_min_max_delay

        if self._clean == True:
            
            targets = np.random.uniform(low = -.040, high = 0.040, size = (self._num * 60, 1))
            inputs = np.angle(np.exp(-2j * np.pi * (targets * self._nu + np.random.uniform())))
            
            self._epoch_batch.append((angle_tx(inputs), delay_tx(targets)))
            
        else:
            assert type(self._bl_data) != None, "Provide visibility data"
            assert type(self._bl_dict) != None, "Provide dict of baselines"
            assert type(self._gains)   != None, "Provide antenna gains"
            
            if self._bl_data_c == None:
                self._bl_data_c = {key : self._bl_data[key].conjugate() for key in self._bl_data.keys()}
                
            if self._gains_c == None:
                self._gains_c = {key : self._gains[key].conjugate() for key in self._gains.keys()}
                

            def _flatness(seps):
                """Create a flatness from a given pair of seperations, their data & their gains."""

                a, b = seps[0][0], seps[0][1]
                c, d = seps[1][0], seps[1][1]

                return self._bl_data[seps[0]]*self._bl_data_c[seps[1]] * self._gains_c[(a,'x')] * self._gains[(b,'x')] * self._gains[(c,'x')] * self._gains_c[(d,'x')]

            inputs = []
            for _ in range(self._num):

                unique_baseline = random.sample(self._bl_dict.keys(), 1)[0]
                two_seps = [random.sample(self._bl_dict[unique_baseline], 2)][0]
                
                inputs.append(_flatness(two_seps))

            targets = np.random.uniform(low = -.040, high = 0.040, size = (self._num * 60, 1))
            inputs = np.angle(np.array(inputs).reshape(-1,1024) * np.exp(-2j * np.pi * (targets * self._nu + np.random.uniform())))

            self._epoch_batch.append((angle_tx(inputs), delay_tx(targets)))

    def gen_data(self):
        
        self._thread = threading.Thread(target = self._gen_data, args=())
        self._thread.start()

    def get_data(self, timeout = 10):
        
        if len(self._epoch_batch) == 0:
            self._thread.join(timeout)
            
        return self._epoch_batch.pop(0)

In [54]:
def train(network,
          num_flatnesses,
          num_epochs,
          batch_size,
          log_dir,
          model_save_interval,
          train_info,
          test_info,
          gains,
          clean = False,
          pretrained_model_path = None,
          sample_keep_prob = 0.80,
          fcl_keep_prob = 0.50):
    
    """TODO: Docstring, comments, cleanup.."""
    
    
    def gen_plot(predicted_values, actual_values):
        """Create a prediction plot and save to byte string."""
        

        abs_min_max_delay = 0.040
        delay_tx  = lambda x: (np.array(x) + abs_min_max_delay) / (2. * abs_min_max_delay)
        delay_itx = lambda x: np.array(x) * 2. * abs_min_max_delay - abs_min_max_delay

        prediction_unscaled = delay_itx(predicted_values)
        actual_unscaled = delay_itx(actual_values)

        sorting_idx = np.argsort(actual_unscaled.T[0])

        fig, ax = plt.subplots(figsize = (5, 3), dpi = 144)

        ax.plot(prediction_unscaled.T[0][sorting_idx],
                linestyle = 'none', marker = '.', markersize = 1,
                color = 'darkblue')

        ax.plot(actual_unscaled.T[0][sorting_idx],
                linestyle = 'none', marker = '.', markersize = 1, alpha = 0.50,
                color = '#E50000')       

        ax.set_title('std: %.9f' %np.std(prediction_unscaled.T[0][sorting_idx] - actual_unscaled.T[0][sorting_idx]))

        buf = io.BytesIO()
        fig.savefig(buf, format='png', dpi = 144)
        plt.close(fig)
        buf.seek(0)

        return buf.getvalue()

    num = num_flatnesses
    num_entries = num * 60

    MISG = []
    MSE = []
    
    
    train_data, train_dict = train_info
    train_batcher = Data_Creator(num, bl_data = train_data, bl_dict = train_dict, gains = gains, clean = clean)
    train_batcher.gen_data()

    test_data, test_dict = test_info
    test_batcher = Data_Creator(num, bl_data = test_data, bl_dict = test_dict, gains = gains, clean = clean)

    test_batcher.gen_data()
    saver = tf.train.Saver()

    with tf.Session() as session:

        if pretrained_model_path == None:
            session.run(tf.global_variables_initializer())
        else:
            saver.restore(session, pretrained_model_path)

        training_writer = tf.summary.FileWriter(log_dir + '/training', session.graph)
        testing_writer = tf.summary.FileWriter(log_dir + '/testing', session.graph)
        model_save_location = log_dir + '/trained_model.ckpt'   

        for epoch in range(num_epochs):

            training_inputs, training_targets = train_batcher.get_data(); train_batcher.gen_data()
            testing_inputs, testing_targets = test_batcher.get_data(); test_batcher.gen_data()  

            for j in range(int(num_entries/batch_size)):

                training_inputs_batch = training_inputs[j*batch_size:(j + 1)*batch_size].reshape(-1,1024)
                training_targets_batch = training_targets[j*batch_size:(j + 1)*batch_size].reshape(-1,1)
                

                session.run([network.optimizer], feed_dict = {network.X: training_inputs_batch,
                                                              network.targets: training_targets_batch,
                                                              network.sample_keep_prob : sample_keep_prob,
                                                              network.fcl_keep_prob : fcl_keep_prob}) 
            # Prediction: Scaled Train(ing results)   
            PST = session.run(network.predictions,
                              feed_dict = {network.X: training_inputs.reshape(-1,1024),
                                           network.sample_keep_prob : 1.,
                                           network.fcl_keep_prob : 1.}) 


            training_MISG, training_MSE, training_summary = session.run([network.MISG, network.MSE, network.summary],
                                                                        feed_dict = {network.X: training_inputs.reshape(-1,1024),
                                                                        network.targets: training_targets.reshape(-1,1),
                                                                        network.sample_keep_prob : 1.,
                                                                        network.fcl_keep_prob : 1.,
                                                                        network.image_buf: gen_plot(PST,training_targets)}) 

            sys.stdout.write('\r' + "Epoch: " + str(epoch) + ". Training: MISG = {:.6f}, MSE = {:.6f}".format(training_MISG, training_MSE))

            training_writer.add_summary(training_summary, epoch)
            training_writer.flush()  

            # Prediction: Scaled test(ing results)   
            PSt = session.run(network.predictions,
                              feed_dict = {network.X: testing_inputs.reshape(-1,1024),
                                           network.sample_keep_prob : 1.,
                                           network.fcl_keep_prob : 1.}) 

            testing_MISG, testing_MSE, testing_summary = session.run([network.MISG, network.MSE, network.summary],
                                                                      feed_dict = {network.X: testing_inputs.reshape(-1,1024),
                                                                                   network.targets: testing_targets.reshape(-1,1),
                                                                                   network.sample_keep_prob : 1.,
                                                                                   network.fcl_keep_prob : 1.,
                                                                                   network.image_buf: gen_plot(PSt,testing_targets)}) 

            sys.stdout.write('\r' + "Epoch: " + str(epoch) + ". Testing: MISG = {:.6f}, MSE = {:.6f}".format(testing_MISG, testing_MSE))

            testing_writer.add_summary(testing_summary, epoch)
            testing_writer.flush()  

            MISG.append((training_MISG, testing_MISG))
            MSE.append((training_MSE, testing_MSE))

            if (epoch + 1) % model_save_interval == 0:
                saver.save(session, model_save_location, epoch)



        print('\rDone')

        training_writer.close()
        testing_writer.close()

    session.close()

In [69]:
class FCN(object):
    """A neural network of fully connected layers.
    
    First layer has Leaky_ReLU activation with a trainable alpha. Other layers have ReLU activation.
    Input sample has dropout at rate 1 - sample_keep_prob.
    activations have dropout at rate 1 - fcl_keep_prob.
    """
    
    def __init__(self,
                 layer_nodes,
                 learning_rate = 0.0001,
                 dtype = tf.float32):
        
        self.layer_nodes = layer_nodes
        self.learning_rate = learning_rate
        self.dtype = dtype
        
        self.layers = []
        self.layer_names = ['layer_{}'.format(i) for i in range(len(self.layer_nodes))]
        
    def create_graph(self):
        
        tf.reset_default_graph()
        
        with tf.variable_scope('keep_probs'):

            self.sample_keep_prob = tf.placeholder(self.dtype, name = 'sample_keep_prob')

            self.fcl_keep_prob = tf.placeholder(self.dtype, name = 'fcl_keep_prob')  
            
        with tf.variable_scope('sample'):

            self.X = tf.placeholder(self.dtype, shape = [None, 1024], name = 'X')
            self.X = tf.nn.dropout(self.X, self.sample_keep_prob)
            
        with tf.variable_scope('input_layer'):
            
            b = tf.get_variable(name = 'bias', shape = [layer_nodes[0]],
                                initializer = tf.contrib.layers.xavier_initializer())
            
            w = tf.get_variable(name = 'weights', shape  = [1024, layer_nodes[0]],
                                initializer = tf.contrib.layers.xavier_initializer())
            
            self.leaky_relu_alpha = tf.get_variable(name = 'leaky_relu_alpha',
                                                    shape = [1],
                                                    dtype = self.dtype)
            
            layer = tf.nn.leaky_relu(tf.matmul(self.X, w) + b, alpha = self.leaky_relu_alpha)
            layer = tf.nn.dropout(layer, self.fcl_keep_prob)
            self.layers.append(layer)
            
        for i in range(len(layer_nodes)):
            if i > 0:
                with tf.variable_scope('layer_%d' %(i)):
                    layer = tf.contrib.layers.fully_connected(self.layers[i-1], layer_nodes[i])
                    layer = tf.nn.dropout(layer, self.fcl_keep_prob)
                    self.layers.append(layer)

                    
        with tf.variable_scope('prediction'):
            self.predictions = tf.contrib.layers.fully_connected(self.layers[-1], 1)


        with tf.variable_scope('targets'):
            self.targets  = tf.placeholder(self.dtype, shape = (None, 1), name = 'targets')

            
            
        with tf.variable_scope('costs'):

            error = tf.subtract(self.targets, self.predictions, name = 'error')
            squared_error = tf.square(error, name = 'squared_difference')

            with tf.variable_scope('mean_inverse_shifted_gaussian'):
                with tf.variable_scope('normal_distribution'):
                    sigma = tf.constant(0.00625, name = 'sigma')
                    normal_dist = tf.contrib.distributions.Normal(0.0, sigma, name = 'normal_dist')
                    gaussian_prob = normal_dist.prob(error, name = 'gaussian_prob')
                    shift = tf.constant(0.01, name = 'shift')
                    shifted_gaussian = tf.add(gaussian_prob, shift, name = 'shifted_gaussian')        

                self.MISG = tf.reduce_mean(tf.divide(1.0, shifted_gaussian), name = 'mean_inverse_shifted_gaussian')

            with tf.variable_scope('mean_squared_error'):
                self.MSE = tf.reduce_mean(squared_error)
        
        with tf.variable_scope('train'):
            self.optimizer = tf.train.AdamOptimizer(self.learning_rate, epsilon=1e-08).minimize(self.MISG)
            
        with tf.variable_scope('logging'):  

            with tf.variable_scope('image'):
                self.image_buf = tf.placeholder(tf.string, shape=[])
                epoch_image = tf.expand_dims(tf.image.decode_png(self.image_buf, channels=4), 0)

            with tf.variable_scope('percent_within_threshold'):
                self.PWT = tf.reduce_mean(tf.cast(tf.less_equal(tf.abs(self.targets - self.predictions), sigma), self.dtype) )

            tf.summary.histogram(name = 'targets', values = self.targets)
            tf.summary.histogram(name = 'predictions',values =  self.predictions)
            tf.summary.scalar(name = 'MSE', tensor = self.MSE)
            tf.summary.scalar(name = 'MISG', tensor = self.MISG)
            tf.summary.scalar(name = 'PWT', tensor = self.PWT)
            tf.summary.image('prediction_vs_actual', epoch_image)
            self.summary = tf.summary.merge_all()

### Setup Data

In [12]:
# extract the redundant baselines and their gains and data from miriad and calfits files
red_bls, gains, uvd = load_relevant_data('../zen_data/zen.2458098.58037.xx.HH.uv','../zen_data/zen.2458098.58037.xx.HH.uv.abs.calfits')

# seperate trining and testing redundant baselines 
# if we have not already done this, load them from disk
training_redundant_baselines_dict, testing_redundant_baselines_dict = get_or_gen_test_train_red_bls_dicts(red_bls, gains.keys())

# seperate the visiblites
training_baselines_data = get_seps_data(training_redundant_baselines_dict, uvd)
testing_baselines_data = get_seps_data(testing_redundant_baselines_dict, uvd)

In [72]:
### Create network
layer_nodes = [1024,512,256,128,64,32,16]
network = FCN(layer_nodes, learning_rate = 0.00001)
network.create_graph()

In [None]:
train(network,
      5,
      50,
      2,
      'logs/test',
      5,
      (training_baselines_data, training_redundant_baselines_dict),
      (testing_baselines_data, testing_redundant_baselines_dict),
      gains,
      clean = True)

Epoch: 17. Testing: MISG = 92.477112, MSE = 0.127349