In [1]:
def keras_experiment():
    """Train a sparse conv autoencoder, then per-size classifiers on its codes.

    Meant to be passed to ``hops.experiment.launch``; every import lives inside
    the function so it is self-contained when shipped to an executor.

    Steps:
      1. Load the unlabeled images ``A.npy`` and the labelled data
         (``C_train.npz`` with ``images_<n>`` / ``labels_<n>`` keys, ``C_test.npy``).
      2. Fit a convolutional autoencoder on the first 1000 images of ``A``
         (800 train / 200 validation).
      3. For each training-set size in ``num_samples``: encode the labelled
         images, train a small CNN on the codes (90/10 train/val split) and
         predict the encoded test images.
      4. Save arg-max predictions as ``pred_<n>`` arrays to
         ``eval/predictions.npz`` on HDFS.
    """
    # Imports (local so the function is self-contained when launched remotely)
    import os
    import random

    import numpy as np
    from hops import hdfs
    from hops import numpy_helper as nph
    from sklearn.preprocessing import OneHotEncoder
    from tensorflow import keras
    from tensorflow.keras import backend as K
    from tensorflow.keras import layers
    from tensorflow.keras import regularizers
    from tensorflow.keras.layers import Conv2D, Input, MaxPooling2D, UpSampling2D
    from tensorflow.keras.models import Model

    def log(*args):
        """Send the space-joined string form of ``args`` to ``hdfs.log``."""
        hdfs.log(' '.join(str(arg) for arg in args))

    def to_unit_range(images):
        """Cast to float32 and scale 0-255 pixel data into [0, 1].

        Arrays whose maximum is already <= 1 are assumed to be pre-scaled and
        are only cast, so scaling is never applied twice.
        """
        images = np.asarray(images, dtype='float32')
        if images.size and images.max() > 1.0:
            images = images / 255.
        return images

    # Constants
    dataset_path = 'hdfs:///Projects/HH19_Mentors/hh19_dataset/'
    num_samples = np.array([1500, 2000, 2500, 3000, 3500, 4000])  # training-set sizes to evaluate
    seed = 42
    ae_train_size, ae_val_size = 800, 200  # slices of A used to fit the autoencoder
    lambda_val = 0.001  # L2 weight-decay strength (applied as lambda/2)

    # Seed the Python/numpy RNGs used for the train/validation shuffles.
    random.seed(seed)
    np.random.seed(seed)

    # Load data (A is loaded once and scaled to [0, 1] for the sigmoid/BCE autoencoder)
    A = to_unit_range(nph.load(dataset_path + 'A.npy'))

    # NOTE(review): the "B" variables are read from the C_* files -- confirm intended.
    B_train_file = nph.load(dataset_path + 'C_train.npz')
    B_train = {n: (to_unit_range(B_train_file['images_' + str(n)]),
                   B_train_file['labels_' + str(n)]) for n in num_samples}
    # Same scaling as A so the encoder sees inputs on the scale it was trained on.
    test_images = to_unit_range(nph.load(dataset_path + 'C_test.npy'))

    print(A.shape)
    for num, (images, labels) in B_train.items():
        print(num, images.shape, labels.shape)

    # Pre-processing: one-hot encode the digit labels 0..9
    ohe = OneHotEncoder(categories='auto', sparse=False)
    ohe.fit(np.arange(10).reshape(-1, 1))

    # reshape(-1, 1) accepts both (n,) and (n, 1) label arrays.
    B_train_onehot = {n: (images, ohe.transform(np.asarray(labels).reshape(-1, 1)))
                      for n, (images, labels) in B_train.items()}

    for num, (_, labels_onehot) in B_train_onehot.items():
        print(num, labels_onehot.shape)

    # Autoencoder
    input_img = Input(shape=(32, 32, 3))  # adapt this if using `channels_first` image data format

    def sparse_reg(activ_matrix):
        """KL-divergence sparsity penalty ``beta * KL(p || p_hat)`` on activations.

        ``p`` is the target mean activation and ``p_hat`` the observed mean.
        """
        p = 0.01
        beta = 3
        # NOTE(review): K.mean without an axis averages over batch, spatial and
        # channel axes, so p_hat is one scalar for the whole layer (and the
        # K.sum below is a no-op). Kept to preserve the penalty's scale --
        # confirm whether a per-unit mean (axis=0) was intended.
        p_hat = K.mean(activ_matrix)
        # Keep both logs finite: ReLU activations can average 0 or exceed 1.
        p_hat = K.clip(p_hat, K.epsilon(), 1. - K.epsilon())
        # Bernoulli KL divergence; the second term is log((1-p)/(1-p_hat)).
        KLD = p * K.log(p / p_hat) + (1 - p) * K.log((1 - p) / (1 - p_hat))
        return beta * K.sum(KLD)

    x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(8, (3, 3), activation='relu', padding='same',
               kernel_regularizer=regularizers.l2(lambda_val / 2), activity_regularizer=sparse_reg)(x)
    encoded = MaxPooling2D((2, 2), padding='same')(x)

    # at this point the representation is (4, 4, 8) i.e. 128-dimensional

    x = Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same',
                     kernel_regularizer=regularizers.l2(lambda_val / 2), activity_regularizer=sparse_reg)(x)

    autoencoder = Model(input_img, decoded)
    autoencoder.compile(optimizer='adam', loss='binary_crossentropy')

    encoder = Model(input_img, encoded)

    x_train = A[:ae_train_size]
    x_test = A[ae_train_size:ae_train_size + ae_val_size]

    autoencoder.fit(x_train, x_train,
                    epochs=30,
                    batch_size=256,
                    shuffle=True,
                    validation_data=(x_test, x_test))

    # Classifier on top of the frozen encoder's codes
    code_shape = tuple(encoder.output_shape[1:])  # (4, 4, 8) for the architecture above

    def new_classifier():
        """Build and compile a small CNN mapping encoder codes to 10 softmax classes."""
        model = keras.Sequential()
        model.add(layers.Conv2D(32, 2, activation='relu', padding='same', input_shape=code_shape))
        model.add(layers.MaxPool2D(pool_size=4))
        model.add(layers.Conv2D(32, 2, activation='relu', padding='same'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation='softmax'))

        model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
        return model

    # summary() prints and returns None, so capture its lines before logging.
    summary_lines = []
    new_classifier().summary(print_fn=summary_lines.append)
    log('\n'.join(summary_lines))

    def shuffle_split(list1, list2, split_percent):
        """Jointly shuffle two equal-length arrays and split them at ``split_percent``.

        Returns ``(list1_train, list1_val, list2_train, list2_val)`` as numpy arrays.
        Raises ValueError if the inputs differ in length.
        """
        list1, list2 = np.asarray(list1), np.asarray(list2)
        if len(list1) != len(list2):
            raise ValueError('shuffle_split needs equal-length inputs')
        order = np.random.permutation(len(list1))  # seeded above
        cut = int(split_percent * len(list1))
        list1, list2 = list1[order], list2[order]
        return list1[:cut], list1[cut:], list2[:cut], list2[cut:]

    # Train
    # The test images are identical for every run, so encode them only once.
    test_codes = encoder.predict(test_images)

    results = {}
    for n in num_samples:
        images, labels_onehot = B_train_onehot[n]
        codes = encoder.predict(images)
        codes_train, codes_val, labels_train, labels_val = shuffle_split(codes, labels_onehot, 0.9)

        classifier = new_classifier()
        history = classifier.fit(codes_train, labels_train, batch_size=64, epochs=30,
                                 validation_data=(codes_val, labels_val), verbose=False)
        results[n] = classifier.predict(test_codes, batch_size=64)

        # The metric key is 'val_acc' in older Keras and 'val_accuracy' in newer releases.
        val_acc = history.history.get('val_acc', history.history.get('val_accuracy'))
        print(n, np.max(val_acc))

    predictions = {'pred_{}'.format(n): np.argmax(result, axis=-1) for n, result in results.items()}

    log(predictions)

    # TODO: score predictions once test labels are available, e.g.
    #   accuracies = np.array([np.mean(test_labels.reshape(-1) == pred.reshape(-1))
    #                          for pred in predictions.values()])
    #   weights = 0.001 * num_samples - 0.5
    #   score = np.sum(accuracies / weights)

    # Save predictions
    def savez(hdfs_filename, **data):
        """Save ``data`` to a local .npz, then copy it into the HDFS directory of ``hdfs_filename``."""
        local_file = os.path.basename(hdfs_filename)
        np.savez(local_file, **data)
        hdfs_path = hdfs._expand_path(hdfs_filename, exists=False)
        # Copy into the parent directory: strip only the trailing file name
        # (str.replace would also remove earlier occurrences in the path).
        if hdfs_path.endswith(local_file):
            hdfs_path = hdfs_path[:-len(local_file)]
        hdfs.copy_to_hdfs(local_file, hdfs_path, overwrite=True)

    savez('eval/predictions.npz', **predictions)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1206,application_1573234309149_0939,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Launch keras_experiment via Hopsworks; the value returned by launch()
# (shown as this cell's output) is the run's HDFS directory.
from hops import experiment

experiment.launch(
    keras_experiment,
    name='test_classifier',
    local_logdir=True,
)


Finished Experiment 

'hdfs://10.0.104.196:8020/Projects/HH19_Group6/Experiments/application_1573234309149_0939/launcher/run.1'
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])