In [6]:
from numpy import mean
from numpy import std
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers import LSTM
from keras.layers import TimeDistributed
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.utils import to_categorical
from matplotlib import pyplot
import os

In [None]:
%pip install --upgrade wandb
import wandb
from wandb.keras import WandbCallback
wandb.init(project="cnn-lstm")
wandb.login()

The Implementation below was done on accelerometer data similar to smartphone data <br>
<a href="https://github.com/guillaume-chevalier/LSTM-Human-Activity-Recognition"> The original GitHub post </a> 

https://machinelearningmastery.com/how-to-develop-rnn-models-for-human-activity-recognition-time-series-classification/

In [3]:
from keras.datasets import mnist

In [4]:
def reshape_group(data, labels, seq_length = 8):
	for i in np.unique(labels):
		index = np.where(labels == i)[0]
		labeled_data = data[index]
		rows = (labeled_data.shape[0]//seq_length) * seq_length
		labeled_data = labeled_data[:rows,:]
		labeled_data = labeled_data.reshape((-1,seq_length,labeled_data.shape[1],labeled_data.shape[2],1))
		NewLabels = np.full((labeled_data.shape[0],),fill_value = i)
		try:
			ReshapedData = np.concatenate((ReshapedData,labeled_data),axis=0)
			ReshapedLabel = np.concatenate((ReshapedLabel, NewLabels),axis=0)
		except NameError:
			ReshapedData = labeled_data
			ReshapedLabel = NewLabels

	return (ReshapedData, to_categorical(ReshapedLabel))

In [7]:
# fit and evaluate a model
def evaluate_model(trainX, trainy, testX, testy):
	# define model
	verbose, epochs, batch_size = 0, 200, 64
	# reshape data into time steps of sub-sequences
	seq_length = 8
	trainX, trainy = reshape_group(trainX, trainy)
	testX, testy = reshape_group(testX, testy)
	width, height , n_outputs = trainX.shape[2], trainX.shape[3], trainy.shape[1]
	# define model
	model = Sequential()
	model.add(TimeDistributed(Conv2D(filters=64, kernel_size=3, activation='relu'), input_shape=(None,width,height,1)))
	model.add(TimeDistributed(Conv2D(filters=64, kernel_size=3, activation='relu')))
	model.add(TimeDistributed(Dropout(0.5)))
	model.add(TimeDistributed(MaxPooling2D(pool_size=2)))
	model.add(TimeDistributed(Conv2D(filters=64, kernel_size=3, activation='relu')))
	model.add(TimeDistributed(Conv2D(filters=64, kernel_size=3, activation='relu')))
	model.add(TimeDistributed(Dropout(0.5))) 
	model.add(TimeDistributed(MaxPooling2D(pool_size=2)))
	model.add(TimeDistributed(Flatten()))
	model.add(LSTM(100))
	model.add(Dropout(0.5))
	model.add(Dense(100, activation='relu'))
	model.add(Dense(n_outputs, activation='softmax'))
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# fit network
	model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose, shuffle =True,callbacks=[WandbCallback()])
	model.save(os.path.join(wandb.run.dir, "model.h5"))
	# evaluate model
	_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
	return accuracy
 
# summarize scores
def summarize_results(scores):
	print(scores)
	m, s = mean(scores), std(scores)
	print('Accuracy: %.3f%% (+/-%.3f)' % (m, s))
 
# run an experiment
def run_experiment(repeats=1):
	# load data
	# trainX, trainy, testX, testy = load_dataset()
	(trainX,trainy),(testX,testy) = mnist.load_data()
	# repeat experiment
	scores = list()
	for r in range(repeats):
		score = evaluate_model(trainX, trainy, testX, testy)
		score = score * 100.0
		print('>#%d: %.3f' % (r+1, score))
		scores.append(score)
	# summarize results
	summarize_results(scores)
 
# run the experiment
run_experiment()

>#1: 45.703
[45.70281207561493]
Accuracy: 45.703% (+/-0.000)
