# Load Libraries

In [1]:
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers import ConvLSTM2D
from keras.utils import to_categorical
from keras.models import model_from_json
import pandas as pd
from hdfs import InsecureClient

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


# Connect to Hadoop

In [4]:
client_hdfs = InsecureClient('http://awscdh6-ma.sap.local:9870', user='dr.who')

In [5]:
client_hdfs.list('/tmp/tbr/BARMER/DSP')

['data_labeled_performance',
 'data_labeled_training',
 'data_unlabeled_predictions',
 'model']

# Load Model from Hadoop

Model Structure:

In [6]:
with client_hdfs.read('/tmp/tbr/BARMER/DSP/model/model_structure.json', encoding='utf-8') as reader:
    loaded_model_json = reader.read()
    reader.close()

model = model_from_json(loaded_model_json)

Instructions for updating:
Colocations handled automatically by placer.


In [7]:
loaded_model_json[:100]

'{"class_name": "Sequential", "config": {"name": "sequential_2", "layers": [{"class_name": "ConvLSTM2'

Model Weights:

In [8]:
path = "/tmp/tbr/BARMER/DSP/model/model_weights.h5"
download_path = client_hdfs.download(path, 'download', overwrite=True)
download_path

'C:\\Users\\tbraeutigam\\OneDrive\\Data Science\\Code\\ISR\\ISR - BARMER\\ISR DSP\\download'

In [9]:
model.load_weights(download_path)

Compile model:

In [10]:
# compile loaded model 
model.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])

# Load Data (labeled) from Hadoop

In [11]:
# Check Trainings Data
client_hdfs.list('/tmp/tbr/BARMER/DSP/data_labeled_training/Inertial Signals')

['body_acc_x.txt',
 'body_acc_y.txt',
 'body_acc_z.txt',
 'body_gyro_x.txt',
 'body_gyro_y.txt',
 'body_gyro_z.txt',
 'total_acc_x.txt',
 'total_acc_y.txt',
 'total_acc_z.txt',
 'y_labels.txt']

In [12]:
# load the dataset, returns train and test X and y elements
def load_dataset(prefix):
    # load data and labels
    X, y = load_dataset_group(prefix)
    
    # zero-offset class values
    y = y - 1
    
    # one hot encode y
    y = to_categorical(y)
    
    # return dataset
    return X, y

A function for loading a dataset group of files

In [13]:
# load a dataset group, such as train or test
def load_dataset_group(group):
    
    # load all 9 files as a single array
    filenames = list()
    
    # total acceleration
    filenames += ['/Inertial Signals/total_acc_x.txt',
                  '/Inertial Signals/total_acc_y.txt',
                  '/Inertial Signals/total_acc_z.txt']
    
    # body acceleration
    filenames += ['/Inertial Signals/body_acc_x.txt',
                  '/Inertial Signals/body_acc_y.txt',
                  '/Inertial Signals/body_acc_z.txt']
    
    # body gyroscope
    filenames += ['/Inertial Signals/body_gyro_x.txt',
                  '/Inertial Signals/body_gyro_y.txt',
                  '/Inertial Signals/body_gyro_z.txt']
    
    # load input data
    X = load_group(filenames, group)
    
    # load class output
    y = load_file(group+'/Inertial Signals/y_labels.txt')
    
    # return X and y
    return X, y

A function for loading a group of files

In [14]:
# load a list of files and return as a 3d numpy array
def load_group(filenames, group):
    loaded = list()
    
    for name in filenames:

        data = load_file(group+name)
        loaded.append(data)
    
    # stack group so that features are the 3rd dimension
    loaded = dstack(loaded)
    return loaded

A function for loading a single file

In [15]:
# load a single file as a numpy array
def load_file(filepath):
    path = '/tmp/tbr/BARMER/DSP/' + filepath
    
    with client_hdfs.read(path, encoding = 'utf-8') as reader:
        dataframe = pd.read_csv(reader, header=None, delim_whitespace=True)   
        
    return dataframe.values

Execute Function-Chain

In [16]:
# load training data
trainX, trainy = load_dataset('data_labeled_training')

In [17]:
trainX[0]

array([[ 1.012817e+00, -1.232167e-01,  1.029341e-01, ...,  3.019122e-02,
         6.601362e-02,  2.285864e-02],
       [ 1.022833e+00, -1.268756e-01,  1.056872e-01, ...,  4.371071e-02,
         4.269897e-02,  1.031572e-02],
       [ 1.022028e+00, -1.240037e-01,  1.021025e-01, ...,  3.568780e-02,
         7.485018e-02,  1.324969e-02],
       ...,
       [ 1.018445e+00, -1.240696e-01,  1.003852e-01, ...,  3.985177e-02,
         1.909445e-03, -2.170124e-03],
       [ 1.019372e+00, -1.227451e-01,  9.987355e-02, ...,  3.744932e-02,
        -7.982483e-05, -5.642633e-03],
       [ 1.021171e+00, -1.213260e-01,  9.498741e-02, ...,  2.881781e-02,
        -3.771800e-05, -1.446006e-03]])

In [18]:
trainy[0]

array([0., 0., 0., 0., 1., 0.], dtype=float32)

# Data Preprocessing

# Update Model

In [21]:
# fit and evaluate a model
def update_model(trainX, trainy):
     
    # define parameters
    verbose = 1
    epochs = 25
    batch_size = 64
    time_steps = 4
    rows = 1
    columns = 32
    channels = 9 #number of features
    samples = trainX.shape[0]
   
    # reshape data into subsequences (samples, time steps, rows, cols, channels)
    trainX = trainX.reshape((samples, time_steps, rows, columns, channels))
        
    # fit network
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)    

    return model

In [20]:
model = update_model(trainX, trainy)

Instructions for updating:
Use tf.cast instead.


# Store Model on Hadoop

## Save model structure

Serialize as JSON

In [22]:
model_json = model.to_json()
model_json[:100]

'{"class_name": "Sequential", "config": {"name": "sequential_2", "layers": [{"class_name": "ConvLSTM2'

Write to Hadoop

In [23]:
path = "/tmp/tbr/BARMER/DSP/model/model_structure.json"

with client_hdfs.write(path, encoding = 'utf-8', overwrite=True) as writer:
    writer.write(model_json)

Check result

In [24]:
client_hdfs.list('/tmp/tbr/BARMER/DSP/model/')

['mlflow', 'model_performance.csv', 'model_structure.json', 'model_weights.h5']

## Save model weights

Serialize as local H5 file

In [25]:
# serialize weights to HDF5
model.save_weights("model_weights.h5")

Upload File to Hadoop

In [26]:
path = "/tmp/tbr/BARMER/DSP/model/model_weights.h5"
_ = client_hdfs.upload(hdfs_path=path, local_path="model_weights.h5", overwrite=True)

Check result

In [27]:
client_hdfs.list('/tmp/tbr/BARMER/DSP/model/')

['mlflow', 'model_performance.csv', 'model_structure.json', 'model_weights.h5']