## Bi-LSTM video classification

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import tensorflow as tf
import seaborn as sns
from IPython.display import YouTubeVideo
import matplotlib.pyplot as plt
import plotly.plotly as py
import multiprocessing as mp # if we want to parallelize i/o

# keras imports
from keras.layers import Dense, Input, LSTM, Dropout, Bidirectional
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers.normalization import BatchNormalization
from keras.layers.embeddings import Embedding
from keras.layers.merge import concatenate
from keras.callbacks import TensorBoard
from keras.models import load_model
from keras.models import Model
import operator
import time 
import gc
import os

import os
from glob import glob
from tqdm import tqdm
import sys
import timeit


from pyspark import SparkConf, SparkContext
from pyspark.sql.types import *
import pyspark
import numpy as np
from elephas.spark_model import SparkModel

Using TensorFlow backend.




In [2]:
###########
#SPECIFY PARAMS FIRST
###########

# 10 class problem for now?
label_feature_size = 1000

# how many frames we will use from each video?
max_frame_rgb_sequence_length = 100
frame_rgb_embedding_size = 1024

# how many audio sequences we will use from each video?
max_frame_audio_sequence_length = 100
frame_audio_embedding_size = 128

number_dense_units = 1000
number_lstm_units = 100
rate_drop_lstm = 0.2
rate_drop_dense = 0.2
activation_function='relu'
validation_split_ratio = 0 # to use all


def create_model():
    """Create and store best model at `checkpoint` path ustilising bi-lstm layer for frame level data of videos"""
    
    # Creating 2 bi-lstm layer, one for rgb and other for audio level data
    lstm_layer_1 = Bidirectional(LSTM(number_lstm_units, dropout=rate_drop_lstm, recurrent_dropout=rate_drop_lstm))
    lstm_layer_2 = Bidirectional(LSTM(number_lstm_units, dropout=rate_drop_lstm, recurrent_dropout=rate_drop_lstm))
    
    # creating input layer for frame-level data
    frame_rgb_sequence_input = Input(shape=(max_frame_rgb_sequence_length, frame_rgb_embedding_size), dtype='float32')
    frame_audio_sequence_input = Input(shape=(max_frame_audio_sequence_length, frame_audio_embedding_size), dtype='float32')
    frame_x1 = lstm_layer_1(frame_rgb_sequence_input)
    frame_x2 = lstm_layer_2(frame_audio_sequence_input)
    
    #creating input layer for video-level data 
    vid_shape=(1024,)
    video_rgb_input = Input(shape=vid_shape)
    video_rgb_dense = Dense(int(number_dense_units/2), activation=activation_function, input_shape=vid_shape)(video_rgb_input)
    
    aud_shape=(128,)
    video_audio_input = Input(shape=aud_shape)
    video_audio_dense = Dense(int(number_dense_units/2), activation=activation_function,input_shape = aud_shape)(video_audio_input)
    
    # merging frame-level bi-lstm output and later passed to dense layer by applying batch-normalisation and dropout
    merged_frame = concatenate([frame_x1, frame_x2])
    merged_frame = BatchNormalization()(merged_frame)
    merged_frame = Dropout(rate_drop_dense)(merged_frame)
    merged_frame_dense = Dense(int(number_dense_units/2), activation=activation_function)(merged_frame)
    
    # merging video-level dense layer output
    merged_video = concatenate([video_rgb_dense, video_audio_dense])
    merged_video = BatchNormalization()(video_rgb_dense)
    merged_video = Dropout(rate_drop_dense)(merged_video)
    merged_video_dense = Dense(int(number_dense_units/2), activation=activation_function)(merged_video)

    
    # merging frame-level and video-level dense layer output
    merged = concatenate([merged_frame_dense, merged_video_dense])
    merged = BatchNormalization()(merged)
    merged = Dropout(rate_drop_dense)(merged)
     
    merged = Dense(number_dense_units, activation=activation_function)(merged)
    merged = BatchNormalization()(merged)
    merged = Dropout(rate_drop_dense)(merged)
    preds = Dense(label_feature_size, activation='sigmoid')(merged)
    
    model = Model(inputs=[frame_rgb_sequence_input, frame_audio_sequence_input, video_rgb_input, video_audio_input], outputs=preds)

    print(model.summary())
    
    model.compile(loss='categorical_crossentropy', optimizer='nadam', metrics=['acc'])

    return model

In [3]:
def extract_video_files(video_files_path):    
    '''
    Extraction of Youtube tfrecords video file features.
    
    Args: path to video files (note: developed with assumption of storing on s3 bucket and assessing with glob)
    
    Assumes each video in the tfrecord has following features:
    'id' : bytes_list
    'labels' : int64_list
    'mean_rgb': float_list
    'mean_audio': float_list
    
    returns:
    numpy arrays of video ids, video multi-labels, mean rgb and mean audio
    '''
    
    vid_ids = []
    labels = []
    mean_rgb = []
    mean_audio = []

    for file in tqdm(glob(video_files_path)):
        for example in tf.python_io.tf_record_iterator(file):
            tf_example = tf.train.Example.FromString(example)

            vid_ids.append(tf_example.features.feature['id'].bytes_list.value[0].decode(encoding='UTF-8'))
            labels.append(tf_example.features.feature['labels'].int64_list.value)
            mean_rgb.append(tf_example.features.feature['mean_rgb'].float_list.value)
            mean_audio.append(tf_example.features.feature['mean_audio'].float_list.value)
            
    assert len(vid_ids) == len(labels),"The number of IDs does not match the number of labeled videos."
    return vid_ids, labels, mean_rgb, mean_audio


In [4]:
def extract_frame_level_features_per_tf_record(frame_file_path,maximum_iter = False,stop_at_iter = 10):
    '''
    Extraction of Youtube tfrecords frame file features.
    
    Args: 
    path to each tf_record (note: developed with assumption of storing on s3 bucket and assessing with glob)
    
    maximum_iter - flag- if True, will limit number of videos extracted from each TF record
    stop_at_iter - number of videos to extract
    num_tf_records - number of records to extract - WARNING!!! this is VERY slow, if bigger than 1
    
    Assumes each video in the tfrecord has following features:
    'id' : bytes_list
    'labels' : int64_list
    'audio': float arr, each frame 128
    'rgb', float arr, each frame 1024
    
    returns:
    numpy arrays of frame ids, frame multi-labels, frame audio, frame rgb
    '''
    frame_ids = []
    frame_labels = []
    feat_rgb = []
    feat_audio = []
    # ATTENTION: only use one TF record for debugging.
    print(f'There is {sum(1 for _ in tf.python_io.tf_record_iterator(frame_file_path))} videos in this TF record.')
    iter_ = 0
    for example in tf.python_io.tf_record_iterator(frame_file_path):
        if maximum_iter and iter_==stop_at_iter:
            break
        tf_example = tf.train.Example.FromString(example)

        frame_ids.append(tf_example.features.feature['id'].bytes_list.value[0].decode(encoding='UTF-8'))
        frame_labels.append(tf_example.features.feature['labels'].int64_list.value)

        tf_seq_example = tf.train.SequenceExample.FromString(example)
        n_frames = len(tf_seq_example.feature_lists.feature_list['audio'].feature)

        rgb_frame = []
        audio_frame = []

        # iterate through frames
        sys.stdout.flush()
        for i in range(n_frames):
            sess = tf.InteractiveSession()
            sys.stdout.write('\r'+'iterating video: ' + str(iter_)+ ' ,frames: ' + str(i)+'/'+str(n_frames))
            sys.stdout.flush()
            rgb_frame.append(tf.cast(tf.decode_raw(
                    tf_seq_example.feature_lists.feature_list['rgb'].feature[i].bytes_list.value[0],tf.uint8)
                           ,tf.float32).eval())
            audio_frame.append(tf.cast(tf.decode_raw(
                    tf_seq_example.feature_lists.feature_list['audio'].feature[i].bytes_list.value[0],tf.uint8)
                           ,tf.float32).eval())

            tf.reset_default_graph()
            sess.close()
        feat_rgb.append(rgb_frame)
        feat_audio.append(audio_frame)
        iter_+=1

    return frame_ids, frame_labels, feat_rgb, feat_audio

In [5]:
def create_train_dev_dataset(video_rgb, video_audio, vid_ids, frame_rgb, frame_audio, frame_labels, frame_ids):
    """
    Method to created training and validation data. 
    We need to make sure we only use video IDs for which we have frames.
    This is handled below.
    
    """
    # we have to have the same video of for both video and frame-level features
    video_rgb_matching = []
    video_audio_matching = []
    
    for idx in frame_ids: # for each ID available on frame level, find matching video-level features
        for i, idx_vid in enumerate(vid_ids): # scan through video-level ids
            if idx == idx_vid: 
                video_rgb_matching.append(video_rgb[i])
                video_audio_matching.append(video_audio[i])
                
                
    shuffle_indices = np.random.permutation(np.arange(len(frame_labels)))
        
    video_rgb_shuffled = np.array(video_rgb_matching)[shuffle_indices]
    video_audio_shuffled = np.array(video_audio_matching)[shuffle_indices]
    frame_rgb_shuffled = np.array(frame_rgb)[shuffle_indices]
    frame_audio_shuffled = np.array(frame_audio)[shuffle_indices]
    labels_shuffled = np.array(frame_labels)[shuffle_indices]

    dev_idx = max(1, int(len(labels_shuffled) * validation_split_ratio))
    
    # delete orig vars to clear some cache
    del video_rgb
    del video_audio
    del frame_rgb
    del frame_audio
    gc.collect()
    
    train_video_rgb, val_video_rgb = video_rgb_shuffled[:-dev_idx], video_rgb_shuffled[-dev_idx:]
    train_video_audio, val_video_audio = video_audio_shuffled[:-dev_idx], video_audio_shuffled[-dev_idx:]
    
    train_frame_rgb, val_frame_rgb = frame_rgb_shuffled[:-dev_idx], frame_rgb_shuffled[-dev_idx:]
    train_frame_audio, val_frame_audio = frame_audio_shuffled[:-dev_idx], frame_audio_shuffled[-dev_idx:]
    
    train_labels, val_labels = labels_shuffled[:-dev_idx], labels_shuffled[-dev_idx:]
    
    del video_rgb_shuffled, video_audio_shuffled, frame_rgb_shuffled, frame_audio_shuffled, labels_shuffled
    gc.collect()
    
    return (train_video_rgb, train_video_audio, train_frame_rgb, train_frame_audio, train_labels, val_video_rgb, val_video_audio, 
            val_frame_rgb, val_frame_audio, val_labels)

In [6]:
# transform into final input in the model
def one_hot_y(raw_labels,label_size=20):
    '''
    Helper function to transform labels into one-hot TOP 20
    Uses np.unique(return_counts=True) as implicit sorter (first K labels are the most frequent)
    '''
    all_labels = []
    for i in list(raw_labels):
        for j in list(i):
            all_labels.append(j)

    results = np.unique(all_labels,return_counts=True)
    labels_vocab,counts = results

    labels = labels_vocab[:label_size-1] #last columns will be 1 if none of those labels found in a video
    output = []
    for set_of_labels in raw_labels:
        
        # preallocate numpy arr for each set of labels
        sequence = np.zeros(label_size)
        # loop through all the labels in one video and flip them to 1s
        for this_label in set_of_labels:
            designation = np.where(labels==this_label)
            for des in designation:
                sequence[des]=1
        # done with one training points
        if sequence.sum()==0:
            sequence[-1]=1
        output.append(sequence)
    return output


In [7]:
def transform_data_for_lstm(video_rgb,video_audio, frame_rgb, frame_audio,
                            labels,label_feature_size=10,max_frame_rgb_sequence_length = 10,\
                            max_frame_audio_sequence_length = 10):
    frames = []
    # need to transfrom to numpy (num_videos x max_frame_rgb_sequence_length x 1024)
    #print(len(frame_rgb))
    
    for frame in frame_rgb: 
        # stack the frames in each video, only allowed number of first frams
        #print(np.vstack(frame).shape)
        frames.append(np.vstack(frame)[:max_frame_rgb_sequence_length,:])
    #print(len(frames))

    frames = np.reshape(np.array(frames),(len(frame_rgb),max_frame_rgb_sequence_length,1024))

    #print(frames.shape)
    
    frames_audio = []
    # need to transfrom to numpy (num_videos x max_frame_audio_sequence_length x 128)
    for frame in frame_audio:
        # stack the frames in each video, only allowed number of first frams
        #print(np.vstack(frame).shape)
        frames_audio.append(np.vstack(frame)[:max_frame_audio_sequence_length,:])

    frames_audio = np.reshape(np.array(frames_audio),(len(frame_audio),max_frame_audio_sequence_length,128))
    #print(frames_audio.shape)
    
    # deal with videos
    
    video_rgb = np.vstack(video_rgb)
    video_audio = np.vstack(video_audio)
    
    
    # labels - need to one-hot encode TOP - K label
    labels = one_hot_y(labels,label_feature_size)
    labels = np.vstack(labels)
    return frames,frames_audio, video_rgb,video_audio, labels

# run on SPARK

In [8]:
def create_pandas(frame_rgb, frame_audio, video_rgb, video_audio, labels):
    '''
    Spark unfortunately does not work with numpy arrays - so we need to convert to traditional python types.
    '''
    return pd.DataFrame.from_dict({'frame_rgb':[[[float(k) for k in j] for j in i] for i in frame_rgb],\
                             'frame_audio':[[[float(k) for k in j] for j in i] for i in frame_audio],\
                             'mean_rgb':[[float(j) for j in i] for i in list(video_rgb)],\
                            'mean_audio':[[float(j) for j in i] for i in list(video_audio)],\
                            'labels':[[float(j) for j in i] for i in list(labels)]})

In [9]:
conf = SparkConf().setAppName('Youtube-8M') \
                  .set("spark.jars",
                       "ecosystem/spark/spark-tensorflow-connector/target/spark-tensorflow-connector_2.11-1.10.0.jar")
sc = SparkContext(conf = conf)
spark = pyspark.sql.SparkSession(sc)

In [10]:
# We just load all of the video data in memory since it is fairly small and manageable.
train_vid_ids, train_labels, train_mean_rgb, train_mean_audio \
    = extract_video_files("mys3bucket/yt8pm_100th_shard/v2/video/train*")

  0%|          | 0/41 [00:00<?, ?it/s]

Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


100%|██████████| 41/41 [00:04<00:00,  9.37it/s]


In [8]:
# the fun starts here, pull frame data per tf-record
train_frame_shards = glob('mys3bucket/yt8pm_100th_shard/v2/frame/train*')
val_frame_shards = glob('mys3bucket/yt8pm_100th_shard/v2/frame/validate*')

In [12]:
train_frame_shards += val_frame_shards

In [9]:
len(train_frame_shards)

41

In [21]:
# iterate tf records one by one and append to spark dataframe

FIRST_RECORD = True # flag whether to append to Spark SQL or create a new one
NUM_RECORDS_TO_LOAD = 2

for tf_record in tqdm(train_frame_shards[:NUM_RECORDS_TO_LOAD]):
    # pull frames in memory
    train_frame_ids, train_frame_labels,train_frame_rgb,train_frame_audio \
        = extract_frame_level_features_per_tf_record(tf_record,maximum_iter=True,\
                               stop_at_iter=10) # just pull 10 videos from each tf record for debugging
    # first transformation
    train_video_rgb, train_video_audio, train_frame_rgb, train_frame_audio, \
    train_labels, val_video_rgb, val_video_audio, val_frame_rgb, val_frame_audio, val_labels \
                = create_train_dev_dataset(train_mean_rgb, train_mean_audio, train_vid_ids, train_frame_rgb, \
                                            train_frame_audio, train_frame_labels, train_frame_ids )    
    
    # final transformation for LSTM
    train_frame_rgb, train_frame_audio, train_video_rgb, train_video_audio, train_labels = \
            transform_data_for_lstm(train_video_rgb, train_video_audio, train_frame_rgb, train_frame_audio,train_labels)
    
    val_frame_rgb, val_frame_audio, val_video_rgb, val_video_audio, val_labels = \
            transform_data_for_lstm( val_video_rgb, val_video_audio,val_frame_rgb, val_frame_audio, val_labels)
    
    #### BELOW WE ONLY USE THE TRAINING DATA AND NO VALIDATION DATA FOR SIMPLICITY
    df = create_pandas(train_frame_rgb, train_frame_audio, \
                   train_video_rgb, train_video_audio, train_labels)
    # create spark data frame
    if FIRST_RECORD:
        df_spark = spark.createDataFrame(df)
        FIRST_RECORD = False
    else:
        df_spark_new = spark.createDataFrame(df)
        df_spark = df_spark.union(df_spark_new)
        
    

  0%|          | 0/2 [00:00<?, ?it/s]

There is 1015 videos in this TF record.
iterating video: 9 ,frames: 122/123

 50%|█████     | 1/2 [00:34<00:34, 34.14s/it]

There is 1029 videos in this TF record.
iterating video: 9 ,frames: 127/128

100%|██████████| 2/2 [00:59<00:00, 31.56s/it]


In [27]:

def convert_data(train_frame_shards):
    
    NUM_RECORDS_TO_LOAD = 1
    
    for tf_record in tqdm(train_frame_shards[:NUM_RECORDS_TO_LOAD]):
        # pull frames in memory
        train_frame_ids, train_frame_labels,train_frame_rgb,train_frame_audio \
            = extract_frame_level_features_per_tf_record(tf_record,maximum_iter=True,\
                                   stop_at_iter=5) # just pull 10 videos from each tf record for debugging
        # first transformation
        train_video_rgb, train_video_audio, train_frame_rgb, train_frame_audio, \
        train_labels, val_video_rgb, val_video_audio, val_frame_rgb, val_frame_audio, val_labels \
                    = create_train_dev_dataset(train_mean_rgb, train_mean_audio, train_vid_ids, train_frame_rgb, \
                                                train_frame_audio, train_frame_labels, train_frame_ids )    

        # final transformation for LSTM
        train_frame_rgb, train_frame_audio, train_video_rgb, train_video_audio, train_labels = \
                transform_data_for_lstm(train_video_rgb, train_video_audio, train_frame_rgb, train_frame_audio,train_labels)

        val_frame_rgb, val_frame_audio, val_video_rgb, val_video_audio, val_labels = \
                transform_data_for_lstm( val_video_rgb, val_video_audio,val_frame_rgb, val_frame_audio, val_labels)

        #### BELOW WE ONLY USE THE TRAINING DATA AND NO VALIDATION DATA FOR SIMPLICITY
        df = create_pandas(train_frame_rgb, train_frame_audio, \
                       train_video_rgb, train_video_audio, train_labels)
        # create spark data frame

        df_spark = spark.createDataFrame(df)        
    
        path = f"{str(tf_record.split('/')[-1])}-converted.tfrecord"
        df_spark.write.format("tfrecords").option("recordType", "SequenceExample").save(path)
        del df_spark
        gc.collect()
        print(path)
        os.system(f'sudo mv {path} mys3bucket/converted_records_for_spark/')
        

In [22]:
os.system('sudo mv train0093.tfrecord-converted.tfrecord mys3bucket/converted_records_for_spark/')

0

In [28]:
convert_data(train_frame_shards)


  0%|          | 0/1 [00:00<?, ?it/s][A

There is 1015 videos in this TF record.
iterating video: 4 ,frames: 299/300train0093.tfrecord-converted.tfrecord



100%|██████████| 1/1 [00:18<00:00, 18.15s/it][A

In [19]:
!ls mys3bucket/converted_records_for_spark/

In [None]:
df = spark.read.format("tfrecords").option("recordType", "SequenceExample").load('mys3bucket/converted_records_for_spark/train0093.tfrecord-converted.tfrecord/')
df.show()

In [79]:
# divide list of tasks into list of lists for each core
import multiprocessing as mp
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

def chunkify_tasks(num_cores,tasks):
    """
    Split a list of tasks to chunks to enable multiprocessing.
    """
    num_tasks = len(tasks)
    step = np.int(np.ceil(num_tasks / num_cores))
    return [i for i in chunks(tasks,step)]

In [80]:
!cat /proc/cpuinfo | grep processor | wc -l

4


In [97]:
chunkify_tasks(4,train_frame_shards)[0]

['mys3bucket/yt8pm_100th_shard/v2/frame/train0093.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0111.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0208.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0274.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0276.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0352.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0434.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0477.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0503.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0580.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0637.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0667.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0830.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train0979.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train1087.tfrecord',
 'mys3bucket/yt8pm_100th_shard/v2/frame/train1110.tfrecord',
 'mys3bucket/yt8pm_100th

In [108]:
def prepare_multiprocess_conversion(num_cores = 4, files=None):
                                
    processes = []
    chunk_files = chunkify_tasks(num_cores,files)

    for i in range(num_cores): #create processes operating select image chunks (cut the list of images)
        proc = mp.Process(target=convert_data, args=([chunk_files[i]]))
                                                                             
                                                                           
        processes.append(proc)
    return processes

In [109]:
processes = prepare_multiprocess_conversion(1,files=train_frame_shards)

In [110]:
def run_multiprocessing(processes):
    
    verbose = True
    if verbose:
        start = timeit.default_timer()
    for p in processes:
        p.start()
    print('processing started...')

    for p in processes:
        p.join()
    if verbose:
        end = timeit.default_timer()
        print("The running time is ", end - start)

In [111]:
run_multiprocessing(processes)


  0%|          | 0/79 [00:00<?, ?it/s][A

processing started...
There is 1015 videos in this TF record.
iterating video: 0 ,frames: 0/287

KeyboardInterrupt: 

In [54]:
convert_data()


  0%|          | 0/2 [00:00<?, ?it/s][A

There is 1015 videos in this TF record.
iterating video: 9 ,frames: 122/123


 50%|█████     | 1/2 [00:26<00:26, 26.77s/it][A

There is 1029 videos in this TF record.
iterating video: 9 ,frames: 127/128


100%|██████████| 2/2 [00:50<00:00, 25.96s/it][A

In [55]:
df = spark.read.format("tfrecords").option("recordType", "SequenceExample").load(path)
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|          mean_audio|              labels|            mean_rgb|           frame_rgb|         frame_audio|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[-1.1013856, -0.2...|[0.0, 0.0, 0.0, 0...|[0.30566043, 0.73...|[WrappedArray(97....|[WrappedArray(103...|
|[0.9377164, -0.44...|[0.0, 0.0, 0.0, 1...|[0.2763526, -0.85...|[WrappedArray(0.0...|[WrappedArray(62....|
|[-1.4782097, -0.7...|[1.0, 1.0, 0.0, 0...|[0.6331078, -0.29...|[WrappedArray(157...|[WrappedArray(66....|
|[1.0440451, 0.883...|[0.0, 0.0, 1.0, 0...|[-0.73581076, -0....|[WrappedArray(0.0...|[WrappedArray(113...|
|[-0.8527888, 0.96...|[0.0, 0.0, 0.0, 0...|[0.19254452, 1.00...|[WrappedArray(33....|[WrappedArray(173...|
|[-1.070533, -0.29...|[1.0, 0.0, 0.0, 0...|[-1.100188, -0.95...|[WrappedArray(53....|[WrappedArray(101...|
|[-0.32618448, -1....|[0.0, 0.0, 0.0,

In [56]:
sample = df.take(1)

In [63]:
len(sample[-1])

10

In [None]:
# multiprocessing



In [None]:
## DYLAN - YOUR JOB IS TO FIGURE OUT HOW TO RUN THE ELEPHAS TRAINING NOW

In [22]:
df_spark.count()

16

In [23]:
df_spark.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|         frame_audio|           frame_rgb|              labels|          mean_audio|            mean_rgb|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[WrappedArray(173...|[WrappedArray(170...|[0.0, 1.0, 0.0, 0...|[1.12153804302215...|[-0.0249195266515...|
|[WrappedArray(155...|[WrappedArray(105...|[1.0, 0.0, 0.0, 0...|[-0.9770614504814...|[-0.5987871885299...|
|[WrappedArray(173...|[WrappedArray(0.0...|[0.0, 1.0, 1.0, 0...|[-0.2137299776077...|[-0.3027234375476...|
|[WrappedArray(196...|[WrappedArray(0.0...|[0.0, 0.0, 0.0, 0...|[0.53537881374359...|[-0.3724643290042...|
|[WrappedArray(173...|[WrappedArray(33....|[0.0, 0.0, 0.0, 0...|[-0.8527888059616...|[0.19254451990127...|
|[WrappedArray(101...|[WrappedArray(53....|[1.0, 0.0, 0.0, 0...|[-1.0705330371856...|[-1.1001880168914...|
|[WrappedArray(103...|[WrappedArray(9

In [46]:
path = "test-output-ALL1.tfrecord"
df_spark.write.format("tfrecords").option("recordType", "SequenceExample").save(path)

In [47]:
df = spark.read.format("tfrecords").option("recordType", "SequenceExample").load(path)
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|          mean_audio|              labels|            mean_rgb|           frame_rgb|         frame_audio|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[-1.1013856, -0.2...|[0.0, 0.0, 0.0, 0...|[0.30566043, 0.73...|[WrappedArray(97....|[WrappedArray(103...|
|[0.9377164, -0.44...|[0.0, 0.0, 0.0, 1...|[0.2763526, -0.85...|[WrappedArray(0.0...|[WrappedArray(62....|
|[-1.4782097, -0.7...|[1.0, 1.0, 0.0, 0...|[0.6331078, -0.29...|[WrappedArray(157...|[WrappedArray(66....|
|[1.0440451, 0.883...|[0.0, 0.0, 1.0, 0...|[-0.73581076, -0....|[WrappedArray(0.0...|[WrappedArray(113...|
|[-0.8527888, 0.96...|[0.0, 0.0, 0.0, 0...|[0.19254452, 1.00...|[WrappedArray(33....|[WrappedArray(173...|
|[-1.070533, -0.29...|[1.0, 0.0, 0.0, 0...|[-1.100188, -0.95...|[WrappedArray(53....|[WrappedArray(101...|
|[-0.32618448, -1....|[0.0, 0.0, 0.0,

In [42]:
with open('sample.json', 'w') as f:
    json.dump(json_df,f)

TypeError: Object of type 'RDD' is not JSON serializable

In [24]:
train_rdd = df_spark.rdd

In [25]:
# change the order of vars to fit the order in the model!
# train_frame_rgb, train_frame_audio, train_video_rgb, train_video_audio], train_labels
train_rdd = train_rdd.map(lambda x: (np.array(x[1]), np.array(x[0]),np.array(x[4]),np.array(x[3]),np.array(x[2])))

In [31]:
!ls

anaconda2
anaconda3
apache-maven-3.6.1-bin.tar.gz
checkpoints
Classification_Tommy_2.ipynb
create_and_train_biLSTM_Youtube_old.py
create_and_train_biLSTM_Youtube.py
derby.log
ecosystem
examples
metastore_db
mys3bucket
Nvidia_Cloud_EULA.pdf
Playground.ipynb
__pycache__
README
s3fs-fuse
spark-warehouse
src
Tensorflow-spark-connector.ipynb
test-output.tfrecord
test_save.txt
tools
train_youtube_elephas.py
tutorials
Youtube Classification - Frame - level.ipynb
Youtube Classification - Frame - level - PMY.ipynb
Youtube Classification - Frame - OLD.ipynb
Youtube_video_bi-LSTM_classification.ipynb
Youtube_video_bi-LSTM_classification-Spark.ipynb
Youtube_video_bi-LSTM_classification-SPARK_NEW.ipynb


In [37]:
!which hadoop

In [36]:
train_rdd.persist()

[(array([[170., 132.,  66., ..., 137., 135.,  40.],
         [ 93., 153.,  75., ..., 179., 243.,  73.],
         [118., 111.,  55., ..., 215., 160.,  59.],
         ...,
         [120., 106.,  73., ..., 140.,  76.,  40.],
         [122., 173.,  73., ..., 255.,  72., 101.],
         [137., 118.,  92., ..., 191.,  72., 158.]]),
  array([[173.,  27., 126., ..., 133., 130., 187.],
         [196., 101.,  42., ..., 177., 165., 255.],
         [202.,  74.,  71., ..., 154., 151., 192.],
         ...,
         [161.,  79.,  95., ...,  50.,  16., 255.],
         [130., 148.,  81., ..., 191., 152.,  52.],
         [184.,  94.,  66., ..., 102., 193., 124.]]),
  array([-0.02491953,  0.51688439, -0.46078882, ..., -0.28112867,
          0.00509355, -0.0869326 ]),
  array([ 1.12153804, -0.466645  , -0.55684108,  0.13377328,  0.21466218,
          0.24749878,  0.01759028, -0.72384763,  0.92802167,  0.18893668,
         -0.32677573, -0.45106331, -0.73749471,  0.24681903, -0.04980842,
          0.1322046

In [32]:
file = sc.textFile('test_save.txt')

In [34]:
rdd_sample = file.flatMap(lambda line: line.split())

In [35]:
rdd_sample.collect()

['(array([[',
 '97.,',
 '173.,',
 '117.,',
 '...,',
 '212.,',
 '137.,',
 '62.],',
 '[',
 '92.,',
 '173.,',
 '126.,',
 '...,',
 '227.,',
 '100.,',
 '98.],',
 '[',
 '87.,',
 '174.,',
 '120.,',
 '...,',
 '190.,',
 '98.,',
 '35.],',
 '...,',
 '[111.,',
 '173.,',
 '126.,',
 '...,',
 '166.,',
 '87.,',
 '121.],',
 '[',
 '92.,',
 '173.,',
 '124.,',
 '...,',
 '190.,',
 '28.,',
 '39.],',
 '[106.,',
 '183.,',
 '131.,',
 '...,',
 '218.,',
 '100.,',
 '78.]]),',
 'array([[103.,',
 '60.,',
 '216.,',
 '...,',
 '99.,',
 '177.,',
 '138.],',
 '[',
 '66.,',
 '82.,',
 '249.,',
 '...,',
 '44.,',
 '170.,',
 '0.],',
 '[',
 '46.,',
 '134.,',
 '233.,',
 '...,',
 '115.,',
 '0.,',
 '255.],',
 '...,',
 '[',
 '58.,',
 '130.,',
 '221.,',
 '...,',
 '255.,',
 '161.,',
 '255.],',
 '[',
 '78.,',
 '96.,',
 '240.,',
 '...,',
 '208.,',
 '8.,',
 '255.],',
 '[',
 '46.,',
 '117.,',
 '217.,',
 '...,',
 '200.,',
 '255.,',
 '187.]]),',
 'array([',
 '0.30566043,',
 '0.73250562,',
 '0.11245143,',
 '...,',
 '0.30170697,',
 '-0.5392

In [21]:
keras_model= create_model()

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 10, 1024)     0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 10, 128)      0                                            
__________________________________________________________________________________________________
bidirectional_1 (Bidirectional) (None, 200)          900000      input_1[0][0]                    
__________________________________________________________________________________________________
bidirectional_2 (Bidi

In [22]:
spark_model = SparkModel(keras_model, frequency='batch', mode='synchronous')

In [23]:
# HOW TO MAKE SURE THIS WORKS???
history = spark_model.fit(train_rdd, epochs=10, batch_size=2, verbose=0)

>>> Fit model


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 5.0 failed 1 times, most recent failure: Lost task 3.0 in stage 5.0 (TID 20, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/elephas/worker.py", line 36, in train
    x_train = np.asarray([x for x, y in feature_iterator])
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/elephas/worker.py", line 36, in <listcomp>
    x_train = np.asarray([x for x, y in feature_iterator])
ValueError: too many values to unpack (expected 2)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:467)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/elephas/worker.py", line 36, in train
    x_train = np.asarray([x for x, y in feature_iterator])
  File "/home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/elephas/worker.py", line 36, in <listcomp>
    x_train = np.asarray([x for x, y in feature_iterator])
ValueError: too many values to unpack (expected 2)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
