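# Sparkling Emoticana

Speech emotion recognition: each WAV clip is converted to a 128-band mel spectrogram, center-cropped to 128x126, and classified into one of 8 emotions by a small CNN trained on Spark with TensorFlowOnSpark.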

In [1]:
import tensorflow as tf
import numpy as np
import random
import pandas as pd
import scipy.io
import os
import matplotlib.pyplot as plt
import librosa
import librosa.display
import tensorflowonspark as TFOS

%matplotlib inline

  from ._conv import register_converters as _register_converters


In [2]:
from __future__ import absolute_import
from __future__ import division
from __future__ import nested_scopes
from __future__ import print_function

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

import argparse
import os
import subprocess
import sys
from datetime import datetime
import math
import numpy
import time

import tensorflow as tf
from tensorflowonspark import dfutil
from tensorflowonspark import TFNode
from tensorflowonspark.pipeline import TFEstimator, TFModel

In [4]:
# `sc` is not created anywhere in this notebook -- presumably the SparkContext
# injected by the PySpark kernel/launcher. Displaying it confirms Spark is up.
sc

<br></br> <br></br> <br></br> <br></br>

## 0. Helper functions

In [47]:
def onehot_encoding(data, num=8) :
    """Return float64 one-hot rows for the integer class indices in ``data``.

    ``num`` is the number of classes (width of each row). Indices are used to
    pick rows of a ``num`` x ``num`` identity matrix.
    """
    identity = np.identity(num)
    return identity[data]

In [None]:
def cutting(train, valid, test, size=1025, num=276) :
    """Center-crop (or zero-pad) every spectrogram to ``size`` x ``num`` and shift labels to 0-based.

    Parameters
    ----------
    train, valid, test : list of (spectrogram, emotion) pairs, or None
        ``spectrogram`` is a 2-D array with ``size`` rows; ``emotion`` is a
        1-based integer label.
    size : int
        Number of rows of each output example.
    num : int
        Number of columns (frames) of each output example.

    Returns
    -------
    list of (ndarray, list)
        Exactly three ``(data, labels)`` pairs, always in the order train,
        valid, test, so callers can unpack them positionally. ``data`` has
        shape ``(len(split), size, num)`` and ``labels`` holds ``emotion - 1``.
        An empty (or None) split yields a zero-row array and an empty list.
    """
    result = []

    for dataset in (train, valid, test):
        rows = [] if dataset is None else dataset
        data = np.zeros([len(rows), size, num])
        labels = []

        for idx, (spectrogram, emotion) in enumerate(rows):
            width = spectrogram.shape[1]
            if width >= num:
                # Window of `num` frames starting half a window left of the middle frame.
                start = width // 2 - num // 2
                data[idx] = spectrogram[:, start:start + num]
            else:
                # Clip shorter than the window: keep all of it, zero-pad on the right.
                data[idx, :, :width] = spectrogram
            labels.append(emotion - 1)

        result.append((data, labels))

    return result

In [None]:
def load_wav_data(path) :
    """Load every WAV file in ``path`` as a 128-band mel spectrogram and split by actor.

    File names are split on "-": field 2 is the 1-based emotion label and
    field 6 (before the extension) is the actor id. (The scheme looks like
    RAVDESS naming -- NOTE(review): confirm against the dataset.) Actors 1-2
    go to the validation split, actors 3-4 to the test split, all others to
    training.

    Files that cannot be parsed or loaded are skipped, with a message printed
    for each one.

    Returns
    -------
    (file_lst, train, valid, test)
        ``file_lst`` is the shuffled directory listing; each split is a list
        of ``(melspectrogram, emotion)`` pairs.
    """
    file_lst = os.listdir(path)
    random.shuffle(file_lst)

    train = []
    valid = []
    test = []

    for file in file_lst :
        try :
            fields = file.split("-")
            emotion = int(fields[2])
            actor = int(fields[6].split(".")[0])

            # os.path.join works whether or not `path` ends with a separator
            # (plain `path + file` does not).
            y, sr = librosa.load(os.path.join(path, file))
            # `y` passed by keyword: it is keyword-only in newer librosa releases.
            melspectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
        except Exception as err :
            # Best-effort: skip unreadable / oddly named files, but say so.
            print("skipping {0}: {1}".format(file, err))
            continue

        if actor in (1, 2) :
            valid.append((melspectrogram, emotion))
        elif actor in (3, 4) :
            test.append((melspectrogram, emotion))
        else :
            train.append((melspectrogram, emotion))

    return file_lst, train, valid, test

In [None]:
def load_wav_test_data(path) :
    """Load every WAV file in ``path`` as a 128-band mel spectrogram, with no split.

    File names are split on "-": field 2 is the 1-based emotion label. Field 6
    (the actor id, before the extension) is parsed only so that files whose
    names lack it are skipped. Files that cannot be parsed or loaded are
    skipped, with a message printed for each one.

    Returns
    -------
    list of (melspectrogram, emotion) pairs, in shuffled file order.
    """
    file_lst = os.listdir(path)
    random.shuffle(file_lst)

    test = []

    for file in file_lst :
        try :
            fields = file.split("-")
            emotion = int(fields[2])
            int(fields[6].split(".")[0])  # validates the actor field; value unused

            # os.path.join works whether or not `path` ends with a separator.
            y, sr = librosa.load(os.path.join(path, file))
            # `y` passed by keyword: it is keyword-only in newer librosa releases.
            melspectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
        except Exception as err :
            print("skipping {0}: {1}".format(file, err))
            continue

        test.append((melspectrogram, emotion))

    return test

In [None]:
def get_batch_data(df):
    """Convert a batch into ``(images, labels)`` numpy arrays shaped for the CNN.

    ``df`` may be either
      * a mapping ``{'image': rows, 'label': rows}``, the form returned by
        ``TFNode.DataFeed.next_batch`` when an input mapping is configured, or
      * a Spark DataFrame with ``image`` and ``label`` columns.

    Returns
    -------
    (ndarray, ndarray)
        Images reshaped to ``(-1, 128, 126, 1)`` and labels reshaped to ``(-1, 8)``.
    """
    if hasattr(df, "select"):
        # Spark DataFrame: collect each column to the driver.
        images = df.select('image').toPandas()['image']
        labels = df.select('label').toPandas()['label']
    else:
        # Dict of named arrays from the TFoS data feed.
        images = df['image']
        labels = df['label']

    train_data = np.array(list(images)).reshape([-1, 128, 126, 1])
    train_label = np.array(list(labels)).reshape([-1, 8])

    return (train_data, train_label)



In [None]:
def print_log(worker_num, arg):
    """Print ``arg`` prefixed by the worker number, e.g. ``"3: message"``."""
    line = "{}: {}".format(worker_num, arg)
    print(line)
    

In [None]:
def np_to_df(data, label, spark_session=None) :
    """Build a Spark DataFrame with ``image`` and ``label`` columns from numpy arrays.

    ``data`` is reshaped to ``(-1, 128, 126)`` and ``label`` to ``(-1, 8)``;
    row ``i`` of the result pairs image ``i`` with label ``i`` (as nested lists).

    Parameters
    ----------
    data, label : ndarray
    spark_session : SparkSession, optional
        Session to use. Defaults to ``SparkSession.builder.getOrCreate()``,
        which returns the already-running session if there is one.

    Raises
    ------
    ValueError
        If the number of images and labels differ.
    """
    images = data.reshape([-1, 128, 126]).tolist()
    labels = label.reshape([-1, 8]).tolist()
    if len(images) != len(labels):
        raise ValueError("np_to_df: {0} images but {1} labels".format(len(images), len(labels)))

    session = spark_session if spark_session is not None else SparkSession.builder.getOrCreate()

    # Parallelize (image, label) pairs together rather than zipping two RDDs,
    # so rows can never be paired across mismatched partitions.
    pairs = session.sparkContext.parallelize(list(zip(images, labels)))
    return session.createDataFrame(pairs, ['image', 'label'])

<br></br> <br></br> <br></br> <br></br>

## 1. Model

In [12]:
class CNN(object) :
    """Convolutional classifier for 128x126x1 spectrogram patches with 8 output classes.

    Call ``build()`` to create the graph and ``set_sess()`` to attach a session
    before using ``predict``, ``get_accuracy``, ``train``, ``evaluate`` or ``save``.
    """

    def __init__(self, name):
        # Variable-scope name under which the input placeholders are created.
        self.name = name

    def convolution(self, X_input, filters, kernel_size, strides, name, padding="SAME", training=False) :
        """Batch norm -> conv2d (Xavier init) -> leaky ReLU, under variable scope ``name``.

        ``training`` (Python bool or scalar bool tensor) selects batch statistics
        (True) or the moving averages (False) inside the batch-norm layer.
        """
        with tf.variable_scope(name) :
            bn = tf.layers.batch_normalization(X_input, training=training)
            conv = tf.layers.conv2d(bn, filters=filters, kernel_size=kernel_size, strides=strides, padding=padding, kernel_initializer=tf.contrib.layers.xavier_initializer())
            relu = tf.nn.leaky_relu(conv)

            return relu

    def build(self) :
        """Create placeholders, the conv stack, loss, optimizer and accuracy ops.

        Sets ``X`` (float32 [None, 128, 126, 1]), ``Y`` (float32 [None, 8]),
        ``training`` (scalar bool, defaults to False), ``learning_rate``,
        ``flat``, ``logits``, ``cost``, ``optimizer`` and ``accuracy``.
        Intermediate tensor shapes are printed while building.
        """
        with tf.variable_scope(self.name) :
            ### Input
            #input : 128x126x1
            #output : 8
            self.X = tf.placeholder(tf.float32, [None, 128, 126, 1])
            self.Y = tf.placeholder(tf.float32, [None, 8])
            # Defaults to False (inference) so callers that feed only X do not
            # have to feed it; training steps feed True.
            self.training = tf.placeholder_with_default(False, shape=[])
            self.learning_rate = tf.placeholder(tf.float32)
            print(self.X.shape)

        ### Input Layer
        #input : 128x126x1
        #output : 32x31x8 (conv stride 2 -> 64x63x8, then 2x2 pool)
        conv1 = self.convolution(self.X, 8, [3,3], 2, "conv1", training=self.training)
        pool1 = tf.layers.max_pooling2d(conv1, pool_size=[2,2], strides=2, name="pool1")
        print(conv1.shape)
        print(pool1.shape)

        ### Hidden Layer1
        #input : 32x31x8 (pool1)
        #output : 32x31x16
        conv2 = self.convolution(pool1, 16, [3,3], 1, "conv2", training=self.training)
        print(conv2.shape)

        ### Hidden Layer2
        #input : 32x31x16
        #output : 32x31x32
        conv3 = self.convolution(conv2, 32, [3,3], 1, "conv3", training=self.training)
        print(conv3.shape)

        ### Pooling Layer2
        #input : 32x31x32
        #output : 16x15x32
        pool2 = tf.layers.max_pooling2d(conv3, pool_size=[2,2], strides=2, name="pool2")
        print(pool2.shape)

        ### Hidden Layer3
        #input : 16x15x32
        #output : 16x15x64
        conv4 = self.convolution(pool2, 64, [3,3], 1, "conv4", training=self.training)
        print(conv4.shape)

        ### Hidden Layer4
        #input : 16x15x64
        #output : 16x15x128
        conv5 = self.convolution(conv4, 128, [3,3], 1, "conv5", training=self.training)
        print(conv5.shape)

        ### Pooling Layer3
        #input : 16x15x128
        #output : 8x7x128
        pool3 = tf.layers.max_pooling2d(conv5, pool_size=[2,2], strides=2, name="pool3")
        print(pool3.shape)

        ### Hidden Layer5
        #input : 8x7x128
        #output : 8x7x32
        conv6 = self.convolution(pool3, 32, [1,1], 1, "conv6", training=self.training)
        print(conv6.shape)

        with tf.variable_scope("global_avg_pooling") :
            ### global avg pooling
            #input : 8x7x32
            #output : 1x1x32
            global_avg_pooling = tf.reduce_mean(conv6, [1, 2], keep_dims=True)
            print(global_avg_pooling.shape)

        with tf.variable_scope("fully_connected") :
            ###Output Layer
            #input : 1x1x32
            #ouput : 8
            shape = global_avg_pooling.get_shape().as_list()
            dimension = shape[1] * shape[2] * shape[3]
            self.flat = tf.reshape(global_avg_pooling, shape=[-1, dimension])

            fc = tf.layers.dense(inputs=self.flat, units=8, kernel_initializer=tf.contrib.layers.xavier_initializer())
            self.logits = fc

        self.cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.Y))
        # Batch-norm moving mean/variance are updated by ops in UPDATE_OPS; they
        # must run with each training step or the inference statistics never move.
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        with tf.control_dependencies(update_ops) :
            self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.cost)

        correct_prediction = tf.equal(tf.argmax(self.logits, 1), tf.argmax(self.Y, 1))
        self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    def set_sess(self, sess) :
        """Attach the session used by all run-time methods."""
        self.sess = sess

    def predict(self, x_test, training=False):
        """Return the raw logits for ``x_test``."""
        feed_dict={self.X: x_test, self.training: training}

        return self.sess.run(self.logits, feed_dict=feed_dict)

    def get_accuracy(self, x_test, y_test, training=False):
        """Return the mean accuracy on one batch."""
        feed_dict={self.X: x_test,self.Y: y_test, self.training: training}

        return self.sess.run(self.accuracy, feed_dict=feed_dict)

    def train(self, x_data, y_data, learning_rate, training=True):
        """Run one optimizer step; returns ``[cost, None]``."""
        feed_dict={self.X: x_data, self.Y: y_data, self.learning_rate: learning_rate, self.training: training}

        return self.sess.run([self.cost, self.optimizer], feed_dict=feed_dict)

    def evaluate(self, X_input, Y_input, batch_size=None, training=False):
        """Return example-weighted mean ``(loss, accuracy)`` over all of ``X_input``.

        ``batch_size`` of None evaluates everything in one batch.

        Raises
        ------
        ValueError
            If ``X_input`` has no rows.
        """
        N = X_input.shape[0]
        if N == 0:
            raise ValueError("evaluate() needs at least one example")
        if batch_size is None:
            batch_size = N

        total_loss = 0.0
        total_acc = 0.0

        for i in range(0, N, batch_size):
            X_batch = X_input[i:i + batch_size]
            Y_batch = Y_input[i:i + batch_size]

            feed_dict = {self.X: X_batch, self.Y: Y_batch, self.training: training}
            step_loss, step_acc = self.sess.run([self.cost, self.accuracy], feed_dict=feed_dict)

            # Weight by batch length so a short final batch is not over-counted.
            n = X_batch.shape[0]
            total_loss += step_loss * n
            total_acc += step_acc * n

        return total_loss / N, total_acc / N

    def save(self, ver) :
        """Save all variables to ``CNN_<ver>.ckpt`` in the working directory."""
        saver = tf.train.Saver()
        save_path = saver.save(self.sess, "CNN_" + str(ver) + ".ckpt")

        print("Model saved in path: %s" % save_path)

<br></br> <br></br> <br></br> <br></br>

## 2. Setting

In [None]:
# Number of Spark executors; also used as the default for --cluster_size.
num_executors = 3

In [8]:
# Output/input locations, all relative to the notebook's working directory.
cwd = os.getcwd()
model_dir = os.path.join(cwd, "CNN_ckpt", "ver_1")     # path to TensorFlow model/checkpoint
export_dir = os.path.join(cwd, "CNN_tfos", "ver_1")    # path to TensorFlow saved_model export
output = os.path.join(cwd, "CNN_result", "ver_1")      # path to output of inferencing
# Ends with a separator so plain string concatenation (data_dir + file_name)
# also yields a valid path.
data_dir = os.path.join(cwd, "data", "wav", "")

print(model_dir)
print(export_dir)
print(output)

/home/ubuntu/Conference_2018_1/CNN_ckpt/ver_1
/home/ubuntu/Conference_2018_1/CNN_tfos/ver_1
/home/ubuntu/Conference_2018_1/CNN_result/ver_1


In [None]:
# Script-style command-line options; parsed from a hard-coded list in the next cell.
parser = argparse.ArgumentParser()

parser.add_argument("--batch_size", help="number of records per batch", type=int, default=40)
parser.add_argument("--epochs", help="number of epochs", type=int, default=50)
parser.add_argument("--model_dir", help="HDFS path to save/load model during train/inference", type=str)
parser.add_argument("--export_dir", help="HDFS path to export saved_model", type=str)
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1)
parser.add_argument("--protocol", help="Tensorflow network protocol (grpc|rdma)", default="grpc")
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")

parser.add_argument("--format", help="format: wav", default="wav")
# Read by the WAV loaders via os.listdir, i.e. a local directory.
parser.add_argument("--datas", help="local path to the directory of WAV files")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")

parser.add_argument("--train", help="train a model using Estimator", action="store_true")
parser.add_argument("--inference_mode", help="type of inferencing (none|checkpoint|signature|direct)", choices=["none","signature","direct","checkpoint"], default="none")
parser.add_argument("--inference_output", help="output type for inferencing (predictions|features)", choices=["predictions","features"], default="predictions")

In [None]:
# Simulate a command line so the notebook can reuse the script-style parser.
args = parser.parse_args([
    "--model_dir", model_dir,
    "--export_dir", export_dir,
    "--output", output,
    "--datas", data_dir,
    "--train",
    "--inference_mode", "checkpoint",
    "--inference_output", "predictions",
])

<br></br> <br></br> <br></br> <br></br>

## 3. TFoS

In [None]:
def CNN_function(args, ctx):
    """TensorFlowOnSpark map function: run one PS or worker node of CNN training.

    PS nodes start their server and block in ``server.join()``. Worker nodes:
      * build the ``CNN`` graph;
      * pull ``image``/``label`` batches from the Spark feed (``args.input_mapping``);
      * train until ``args.steps`` global steps are reached or the feed ends;
      * on the chief (task 0), export a saved_model to ``args.export_dir``
        with a default predict signature and a ``featurize`` signature.

    Parameters
    ----------
    args : namespace
        Reads ``protocol``, ``batch_size``, ``model_dir``, ``input_mapping``,
        ``steps`` and ``export_dir``.
    ctx : TFoS node context
        Supplies ``worker_num``, ``job_name``, ``task_index`` and ``mgr``.
    """
    # Read the node identity first: the PS delay below needs job_name/worker_num.
    worker_num = ctx.worker_num
    job_name = ctx.job_name
    task_index = ctx.task_index

    # Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict)
    if job_name == "ps":
        time.sleep((worker_num + 1) * 5)

    # Get TF cluster and server instances
    cluster, server = TFNode.start_cluster_server(ctx, 0, args.protocol == 'rdma')

    height = 128
    width = 126
    num_classes = 8
    learning_rate = 0.008
    batch_size = args.batch_size

    if job_name == "ps":
        server.join()
    elif job_name == "worker":

        # Assigns ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
            model = CNN("CNN")
            model.build()
            global_step = tf.Variable(0, trainable=False)
            # CNN's optimizer is not tied to global_step, so advance it explicitly
            # once per training step.
            increment_step = tf.assign_add(global_step, 1)

            logit = model.logits
            loss = model.cost
            optimizer = model.optimizer
            accuracy = model.accuracy

            tf.summary.scalar("loss", loss)
            tf.summary.scalar("accuracy", accuracy)

            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()
            initializer = tf.global_variables_initializer()

        # Create a "supervisor", which oversees the training process and stores model state into HDFS
        logdir = TFNode.hdfs_path(ctx, args.model_dir)
        summary_writer = tf.summary.FileWriter("tensorboard_%d" % (worker_num), graph=tf.get_default_graph())
        print("tensorflow model path: {0}".format(logdir))

        sv = tf.train.Supervisor(is_chief=(task_index == 0),
                                 logdir=logdir,
                                 init_op=initializer,
                                 summary_op=None,
                                 saver=saver,
                                 global_step=global_step,
                                 stop_grace_secs=300,
                                 save_model_secs=5)

        # The supervisor takes care of session initialization, restoring from
        # a checkpoint, and closing when done or an error occurs.
        with sv.managed_session(server.target) as sess:
            print("{0} session ready".format(datetime.now().isoformat()))

            step = 0
            tf_feed = TFNode.DataFeed(ctx.mgr, input_mapping=args.input_mapping)

            while not sv.should_stop() and not tf_feed.should_stop() and step < args.steps:
                # With an input_mapping, next_batch returns {input name: list of rows}.
                batch = tf_feed.next_batch(batch_size)
                batch_xs = np.array(batch['image']).reshape([-1, height, width, 1])
                batch_ys = np.array(batch['label']).reshape([-1, num_classes])
                if len(batch_xs) == 0:
                    continue

                train_feed = {model.X: batch_xs, model.Y: batch_ys,
                              model.learning_rate: learning_rate, model.training: True}
                _, summary, step = sess.run([optimizer, summary_op, increment_step], feed_dict=train_feed)

                if (step % 20 == 0):
                    eval_feed = {model.X: batch_xs, model.Y: batch_ys, model.training: False}
                    print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy, feed_dict=eval_feed)))

                if sv.is_chief:
                    summary_writer.add_summary(summary, step)

            if sv.should_stop() or step >= args.steps:
                tf_feed.terminate()

            if sv.is_chief and args.export_dir:
                print("{0} exporting saved_model to: {1}".format(datetime.now().isoformat(), args.export_dir))

                signatures = {
                  tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: {
                    'inputs': {'image': model.X},
                    'outputs': {'prediction': logit},
                    'method_name': tf.saved_model.signature_constants.PREDICT_METHOD_NAME
                  },
                  'featurize': {
                    'inputs': {'image': model.X},
                    'outputs': {'features': model.flat},
                    'method_name': 'featurize'
                  }
                }
                TFNode.export_saved_model(sess,
                                          args.export_dir,
                                          tf.saved_model.tag_constants.SERVING,
                                          signatures)

            else:
                while not sv.should_stop():
                    print("Waiting for chief")
                    time.sleep(5)

        print("{0} stopping supervisor".format(datetime.now().isoformat()))
        sv.stop()


# Backward-compatible alias for the original (misspelled) name.
CNN_funtion = CNN_function

In [None]:
# Load the WAV clips and turn them into Spark DataFrames with 'image'/'label' columns.
# Training mode: df_train/df_test, plus `df` (= df_train) for the fit cell.
# Inference mode: `df` holds every clip in --datas.
if args.format == "wav" and args.train :
    file_lst, train, valid, test = load_wav_data(args.datas)
    cut_train, cut_valid, cut_test = cutting(train, valid, test, size=128, num=126)

    train_data = cut_train[0].reshape([-1, 128, 126, 1])
    train_label = onehot_encoding(cut_train[1])

    test_data = cut_test[0].reshape([-1, 128, 126, 1])
    test_label = onehot_encoding(cut_test[1])

    df_train = np_to_df(train_data, train_label)
    df_test = np_to_df(test_data, test_label)
    df = df_train

elif args.format == "wav" and (not args.train) :
    test = load_wav_test_data(args.datas)
    # The test split is the last element of cutting()'s result.
    cut_test = cutting([], [], test, size=128, num=126)[-1]

    test_data = cut_test[0].reshape([-1, 128, 126, 1])
    test_label = onehot_encoding(cut_test[1])

    df = np_to_df(test_data, test_label)

else:
    raise Exception("Unsupported format: {}".format(args.format))

In [None]:
# Train with TFEstimator when --train is set; otherwise wrap an existing
# saved_model export in a TFModel for inference.
if not args.train:
    model = (TFModel(args)
             .setExportDir(args.export_dir)
             .setBatchSize(args.batch_size))
else:
    # NOTE(review): CNN_function does not appear to read these keys (it uses a
    # fixed learning rate) -- confirm before relying on them.
    tf_args = {'initial_learning_rate': 0.01,
               'num_epochs_per_decay': 2.0,
               'learning_rate_decay_factor': 0.95}
    estimator = (
        TFEstimator(CNN_function, tf_args)
        .setInputMapping({'image': 'image', 'label': 'label'})
        .setModelDir(args.model_dir)
        .setExportDir(args.export_dir)
        .setClusterSize(args.cluster_size)
        .setNumPS(args.num_ps)
        .setProtocol(args.protocol)
        .setTensorboard(args.tensorboard)
        .setEpochs(args.epochs)
        .setBatchSize(args.batch_size)
        .setSteps(args.steps)
    )
    model = estimator.fit(df)

In [None]:
# Configure how the model is applied to a DataFrame, per --inference_mode.
# Direct tensor mappings use the default op names produced by CNN.build():
#   X placeholder      -> 'CNN/Placeholder'   (first placeholder in scope "CNN")
#   logits             -> 'fully_connected/dense/BiasAdd'
#   flattened features -> 'fully_connected/Reshape'
# NOTE(review): these follow TF1 default naming -- confirm against the saved graph,
# e.g. [n.name for n in tf.get_default_graph().as_graph_def().node].
if args.inference_mode == 'none':
    sys.exit(0)

elif args.inference_mode == 'checkpoint':
    model.setModelDir(args.model_dir)                         # load model from checkpoint at args.model_dir
    model.setExportDir(None)                                  # don't use a saved_model
    model.setInputMapping({'image': 'CNN/Placeholder'})       # map DataFrame 'image' column to the X placeholder
    if args.inference_output == 'predictions':
        model.setOutputMapping({'fully_connected/dense/BiasAdd': 'col_out'})      # logits -> 'col_out'
    else:  # args.inference_output == 'features':
        model.setOutputMapping({'fully_connected/dense/BiasAdd': 'col_out',
                                'fully_connected/Reshape': 'col_out2'})           # + flattened features -> 'col_out2'

elif args.inference_mode == 'signature':
    model.setModelDir(None)                                   # don't use the model checkpoint
    model.setExportDir(args.export_dir)                       # load saved_model from args.export_dir
    model.setTagSet(tf.saved_model.tag_constants.SERVING)     # using default SERVING tagset
    model.setInputMapping({'image':'image'})                  # map DataFrame 'image' column to the 'image' input tensor alias of signature
    if args.inference_output == 'predictions':
        model.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY)   # default signature def key, i.e. 'predict'
        model.setOutputMapping({'prediction':'col_out'})      # map 'prediction' output tensor alias to output DataFrame 'col_out' column
    else:  # args.inference_output == 'features'
        model.setSignatureDefKey('featurize')                 # custom signature def key
        model.setOutputMapping({'features':'col_out'})        # map 'features' output tensor alias to output DataFrame 'col_out' column

else:  # 'direct': saved_model graph, addressed by tensor names
    model.setModelDir(None)                                   # don't use the model checkpoint
    model.setExportDir(args.export_dir)                       # load saved_model from args.export_dir
    model.setTagSet(tf.saved_model.tag_constants.SERVING)     # using default SERVING tagset
    model.setInputMapping({'image': 'CNN/Placeholder'})       # map DataFrame 'image' column to the X placeholder
    if args.inference_output == 'predictions':
        model.setOutputMapping({'fully_connected/dense/BiasAdd': 'col_out'})      # logits -> 'col_out'
    else:  # args.inference_output == 'features'
        model.setOutputMapping({'fully_connected/dense/BiasAdd': 'col_out',
                                'fully_connected/Reshape': 'col_out2'})           # + flattened features -> 'col_out2'

In [None]:
# Apply the configured model. After training, score the held-out test split
# (df_test); in inference-only mode, score the DataFrame built for it (df).
if args.inference_mode != 'none':
    inference_df = df_test if args.train else df
    print("{0} ===== Model.transform()".format(datetime.now().isoformat()))
    preds = model.transform(inference_df)
    # Spark's default save mode errors if args.output already exists; remove it before re-running.
    preds.write.json(args.output)