In [0]:
import tensorflow as tf
import numpy as np
import time
import os
import zipfile
import pandas as pd
from google.colab import drive
import matplotlib.pyplot as plt

In [0]:
#constants

# Checkpoint root directory; the per-split summary directories are nested inside it.
checkpoint_dir = 'model_ckpt'
tr_summaries_dir = os.path.join(checkpoint_dir,'training_summary')
val_summaries_dir = os.path.join(checkpoint_dir,'validation_summary')
ts_summaries_dir = os.path.join(checkpoint_dir,'testing_summary')

# Fractions of the dataset at which the validation and test partitions start.
# Used as fallbacks by extract_datazip/label_from_file when no explicit start index is given:
# with these values, train = first 90%, valid = next 5%, test = last 5%.
validation_start_partition = 0.9
test_start_partition = 0.95

## Preprocessing stuff

### TODO: Modify methods to preprocess image and label.

In [0]:
def extract_datazip(zipfile_path='gdrive/My Drive/img_align_celeba.zip', validation_start_id=None, test_start_id=None, extract_path='celebA'):
  """
  Extract dataset zip file at 'zipfile_path' to train, valid and test directory in 'extract_path'.
  Creating directory structure:
  extract_path/
    train/img_align_celeba/
    valid/img_align_celeba/
    test/img_align_celeba/

  parameters:
    zipfile_path : path to the dataset zip archive
    validation_start_id : index (into the archive's file members) of the first validation image;
                          defaults to len(members) * validation_start_partition
    test_start_id : index of the first test image;
                    defaults to len(members) * test_start_partition
    extract_path : directory under which the train/valid/test directories are created

  return:
    list, list, list : list of files in train, valid and test directories
    dict : dict of paths ('train', 'valid', 'test' -> extraction directory)

  """

  split_paths = {split: os.path.join(extract_path, split) for split in ('train', 'valid', 'test')}

  # The context manager closes the archive handle even if extraction fails.
  with zipfile.ZipFile(zipfile_path) as zfile:
    # Keep only file members. Directory entries (names ending in '/') are skipped
    # explicitly instead of assuming the first member is always the directory.
    zlist = [member for member in zfile.namelist() if not member.endswith('/')]

    if validation_start_id is None:
      validation_start_id = int(len(zlist) * validation_start_partition)
    if test_start_id is None:
      test_start_id = int(len(zlist) * test_start_partition)

    zlist_train = zlist[ : validation_start_id]
    zlist_valid = zlist[validation_start_id : test_start_id]
    zlist_test = zlist[test_start_id : ]

    print('Extracting train images at {}'.format(extract_path), end='\t')
    zfile.extractall(split_paths['train'], zlist_train)
    print('done')
    print('Extracting validation images at {}'.format(extract_path), end='\t')
    zfile.extractall(split_paths['valid'], zlist_valid)
    print('done')
    print('Extracting test images at {}'.format(extract_path), end='\t')
    zfile.extractall(split_paths['test'], zlist_test)
    print('done')

  return zlist_train, zlist_valid, zlist_test, split_paths

In [0]:
def label_from_file(filepath='gdrive/My Drive/list_attr_celeba.txt', validation_start_id=None, test_start_id=None):
  """
  decode attribute file into train, validation, test labels.

  The file is read as whitespace-separated text with its first line skipped; the
  following line supplies the column names. Every -1 in the table is replaced by 0.

  parameters:
    filepath: path to the whitespace-separated attribute file
    validation_start_id : index of start of validation label in the file;
                          defaults to row_count * validation_start_partition
    test_start_id : index of start of test label in the file;
                    defaults to row_count * test_start_partition

  return:
    dict : {'train' | 'valid' | 'test': [index_array, label_array]} where index_array
           holds the row index values and label_array the corresponding label rows.

  """

  # Read from the caller-supplied path (previously hard-coded, so `filepath` was ignored).
  attr_table = pd.read_csv(filepath, sep=r'\s+', skiprows=1)
  attr_table = attr_table.replace(to_replace=-1, value=0)
  # `.values` replaces as_matrix()/get_values(), which were removed in pandas 1.0.
  lbl_values = attr_table.values
  lbl_indexes = attr_table.index.values
  lbl_size = lbl_values.shape[0]

  if validation_start_id is None:
    validation_start_id = int(lbl_size * validation_start_partition)
  if test_start_id is None:
    test_start_id = int(lbl_size * test_start_partition)

  lbl_train = lbl_values[ : validation_start_id]
  lbl_valid = lbl_values[validation_start_id : test_start_id]
  lbl_test  = lbl_values[test_start_id : ]

  index_train = lbl_indexes[ : validation_start_id]
  index_valid = lbl_indexes[validation_start_id : test_start_id]
  index_test  = lbl_indexes[test_start_id : ]

  return {'train':[index_train, lbl_train], 'valid': [index_valid, lbl_valid], 'test': [index_test, lbl_test]}

In [0]:
def gen_batch(image_dir_path='celebA/train', lbl_list=None, batch_size=32):
  '''
  Returns a one shot iterator for dataset, which when run through session produces
  a batch of (img, img_file_name, img_label).

  parameters:
    image_dir_path : directory containing the 'img_align_celeba' folder of .jpg images
    lbl_list : two-element sequence [index, label]; both are sliced along their first
               axis together with the list of image files
    batch_size : number of examples per batch

  The dataset is shuffled, decoded, batched and repeated indefinitely.
  '''
  def decode(filename):
    # Read and decode a JPEG into a 3-channel image scaled by 1/255.
    raw_file = tf.io.read_file(filename)
    img = tf.divide(tf.image.decode_jpeg(raw_file, channels=3),255)
    # Pins the static shape to 218x178x3; images of another size fail here.
    return tf.reshape(img, shape=[218, 178, 3])

  # Identity check: `== None` is ambiguous for array-like inputs.
  assert lbl_list is not None, "lbl_list not referenced"
  assert len(lbl_list) ==2, "lbl_list should have index and label"

  # NOTE(review): images are paired with labels purely by position, so this relies on
  # tf.matching_files returning the files in the same order as lbl_list — confirm.
  img_list = tf.matching_files(image_dir_path+'/img_align_celeba/*.jpg')
  dataset = tf.data.Dataset.from_tensor_slices((img_list, lbl_list[0], lbl_list[1]))
  # Buffer spans all examples. len(lbl_list) is always 2 (index + label), which gave
  # almost no shuffling. Shuffling happens before decoding, so the buffer only holds
  # file names and labels, not images.
  dataset = dataset.shuffle(buffer_size=len(lbl_list[0]))
  dataset = dataset.map(lambda x,y,z : (decode(x), y, tf.cast(z, dtype= tf.uint8)))
  dataset = dataset.batch(batch_size).repeat()
  return dataset.make_one_shot_iterator()
  

## Model Definition

In [0]:
def get_model(X, output_units):
  """
  returns tensorflow model network

  Architecture: two (3x3 conv -> 2x2 max-pool -> ReLU) stages with 16 and 64 filters,
  flatten, dense layers of 256 and 128 units (ReLU), then a dense layer of
  `output_units` logits.

  parameters:
  X : tensor, input to the network
  output_units: number of output logits (one per label); coerced with int() so a
                static tensor dimension can be passed directly

  return:
  logit : output of the final dense layer (no activation applied)
  Y_ : int8 tensor named 'predictions', 1 where sigmoid(logit) > 0.5 and 0 otherwise
  """
  output_units = int(output_units)
  # Fixed decision threshold. Marked non-trainable so it is not treated as a model
  # weight by optimizers.
  sig_cond = tf.Variable([0.5]*output_units, trainable=False)

  # NOTE(review): random_uniform() is used with its default range for every kernel;
  # confirm this is the intended initialisation.
  layer = tf.layers.conv2d(inputs=X, filters=16, kernel_size=[3,3], kernel_initializer=tf.initializers.random_uniform())
  layer = tf.layers.max_pooling2d(inputs=layer, pool_size=[2,2], strides=2)
  layer = tf.nn.relu(layer)
  layer = tf.layers.conv2d(inputs=layer, filters=64, kernel_size=[3,3], kernel_initializer=tf.initializers.random_uniform())
  layer = tf.layers.max_pooling2d(inputs=layer, pool_size=[2,2], strides=2)
  layer = tf.nn.relu(layer)
  layer = tf.layers.flatten(inputs=layer)
  layer = tf.layers.dense(inputs=layer, units=256, kernel_initializer=tf.initializers.random_uniform())
  layer = tf.nn.relu(layer)
  layer = tf.layers.dense(inputs=layer, units=128, kernel_initializer=tf.initializers.random_uniform())
  layer = tf.nn.relu(layer)
  logit = tf.layers.dense(inputs=layer, units=output_units, kernel_initializer=tf.initializers.random_uniform())
  Y_ = tf.nn.sigmoid(logit)
  Y_ = tf.cast(x = tf.greater(tf.cast(Y_, tf.float32), sig_cond), dtype=tf.int8,  name = 'predictions')
  return logit, Y_

## Graph Definition

In [0]:
def get_graph(graph_type=None, summaries_dir=None, *args, **kwargs):
  """
  Build a complete tf.Graph (input pipeline, model, ops, summaries) plus a session for it.

  Reads the module-level globals `path_dict` and `lbl` (keyed by graph_type) to build
  the input pipeline, so both must exist before this is called.

  parameters:
    graph_type : one of 'train', 'valid', 'test'
    summaries_dir : directory the tf.summary.FileWriter writes to
    *args : unused
    **kwargs : 'lr' (learning rate, required when graph_type == 'train') is consumed by
               the optimizer; every other keyword is forwarded to tf.ConfigProto

  return:
    dict with keys 'logits', 'predictions', 'loss', 'train_op' (None unless
    graph_type == 'train'), 'accuracy', 'update_acc', 'acc_mat_init', 'summaries',
    'summary_writer', 'session'
  """

  # argument validation
  assert graph_type in ['train', 'valid', 'test'], 'Invalid graph type'
  assert summaries_dir is not None, 'summary directory not defined'
  if graph_type=='train':
    assert 'lr' in kwargs, 'training graph must have learing rate'

  # Split off 'lr' so it is not passed to tf.ConfigProto, which does not accept it.
  config_kwargs = dict(kwargs)
  learning_rate = config_kwargs.pop('lr', None)

  graph = tf.Graph()
  with graph.as_default():
    # The same scope name is used for every graph_type.
    with tf.variable_scope('training_graph'):
      #datasets:
      iterator = gen_batch(image_dir_path=path_dict[graph_type], lbl_list=lbl[graph_type])
      batch = iterator.get_next()
      saveable = tf.contrib.data.make_saveable_from_iterator(iterator)
      # TODO: figure out how to save iterator state
      #  ::::HERE::::
      X = batch[0]
      Y = tf.stop_gradient(batch[2])

      #model
      logit, predictions = get_model(X, Y.shape[-1])

      #ops
      loss_op = tf.reduce_mean( tf.nn.sigmoid_cross_entropy_with_logits(labels=tf.cast(Y, tf.float32), logits=logit))
      # Uses `predictions` from get_model (the original referenced an undefined `Y_`).
      accuracy_op, update_op = tf.metrics.accuracy( labels=Y, predictions=predictions, name='accuracy')
      # Initializer for the metric's local variables, so accuracy can be reset.
      acc_mat_init = tf.variables_initializer( graph.get_collection('local_variables', scope='training_graph/accuracy'))

      # Compare graph_type, not the builtin `type`, which never equals 'train'.
      if graph_type == 'train':
        #optimiser
        #TODO: implement adaptive learning rate method
        #  ::::HERE::::
        global_step = tf.Variable(0, name='global_step')
        optimizer = tf.train.AdamOptimizer( learning_rate=learning_rate )
        train_op = optimizer.minimize(loss_op, global_step=global_step)
      else:
        train_op = None


      # summary and checkpoint
      tf.summary.scalar('loss', loss_op)
      tf.summary.scalar('accuracy', accuracy_op)
      # TODO: add other summary items eg. learning rate
      #  :::HERE:::

      summaries = tf.summary.merge_all()
      summary_writer = tf.summary.FileWriter( summaries_dir, graph )
      config_proto = tf.ConfigProto(allow_soft_placement=True, **config_kwargs)
      # The session is created inside graph.as_default(), so it is bound to `graph`.
      session = tf.Session( config= config_proto )

      return {'logits':logit,
              'predictions':predictions,
              'loss':loss_op,
              'train_op':train_op,
              'accuracy':accuracy_op,
              'update_acc':update_op,
              'acc_mat_init':acc_mat_init,
              'summaries':summaries,
              'summary_writer':summary_writer,
              'session':session, }

## Train the model

In [0]:
def train_model():
  """Placeholder for the training loop; currently does nothing and returns None."""
  # TODO: write training loop
  return None

## Test the model

In [0]:
def test_model():
  """Placeholder for the testing loop; currently does nothing and returns None."""
  # TODO: write testing loop
  return None

## Stuff
### Testing area

In [0]:
# testing step
# NOTE(review): scratch cell. ts_x, batch_size, graph, saver, running_variables_initializer,
# test_batch, loss_op, update_op, accuracy_op, X and Y are not defined anywhere in the
# visible notebook, so this cell fails on a fresh kernel (Restart & Run All).
with tf.device("/device:GPU:0"):
  with tf.name_scope('test_loop'):
    # Integer division: a final partial batch is not evaluated.
    num_ts_batches = ts_x.shape[0] //batch_size
    with tf.Session(graph=graph, config=tf.ConfigProto(allow_soft_placement=True,log_device_placement=True)) as sess:
      # Restore weights from the most recent checkpoint in checkpoint_dir.
      saver.restore(sess=sess, save_path= tf.train.latest_checkpoint( checkpoint_dir=checkpoint_dir))
      sess.run([running_variables_initializer])
      for batch_inst in range(num_ts_batches):
        ts_batch= sess.run(test_batch)
        loss, _, accuracy = sess.run([loss_op, update_op, accuracy_op], feed_dict={X:ts_batch[0], Y:ts_batch[1]})
      # Printed once after the loop: `loss` is the value from the last batch only.
      print("loss: {:.6f}, accuracy:{:.6f}".format(loss,accuracy))

img_align_celeba


In [0]:
# Scratch check of the thresholding used in get_model: compares labels against 0.5
# and casts the boolean result to int8.
# NOTE(review): train_batch is not defined anywhere in the visible notebook, so this
# cell fails on a fresh kernel.
with tf.Session() as sess:
  refr = tf.Variable([0.5]*40)
  sess.run( tf.global_variables_initializer())
  # tr_l[0]: raw labels from the batch; tr_l[1]: the same labels after thresholding.
  tr_l = sess.run([train_batch[2],tf.cast(tf.greater(tf.cast(train_batch[2], tf.float32), refr), tf.int8)])
  

In [0]:
# Show the first example's raw labels next to its thresholded labels.
tr_l[0][0],tr_l[1][0] 

(array([0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1,
        0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1], dtype=uint8),
 array([0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1,
        0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1], dtype=int8))

## Run

In [0]:
# Mount Google Drive (Colab only; prompts for an authorization code).
# NOTE(review): the default paths used above ('gdrive/My Drive/...') are relative, so
# they presumably rely on the working directory being /content — confirm.
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
# NOTE(review): get_dataset_partition_index is not defined anywhere in the visible
# notebook, so this cell raises NameError on a fresh kernel. extract_datazip and
# label_from_file both accept None for these ids and then fall back to the
# validation_start_partition / test_start_partition fractions.
validation_start_id, test_start_id= get_dataset_partition_index()

In [0]:
# Extract the image archive into train/valid/test directories.
# path_dict is read as a global by get_graph.
list_train, list_valid, list_test, path_dict = extract_datazip(validation_start_id=validation_start_id, test_start_id=test_start_id)

Extracting train images at celebA	done
Extracting validation images at celebA	done
Extracting test images at celebA	done


In [0]:
# Load the attribute labels with the same partition indices as the images.
# lbl is read as a global by get_graph.
lbl = label_from_file(validation_start_id=validation_start_id, test_start_id=test_start_id)

In [0]:
# train_model is still a stub (see its TODO), so this call currently does nothing.
train_model()

In [0]:
# test_model is still a stub (see its TODO), so this call currently does nothing.
test_model()