In [1]:
from math import sqrt
from numpy import concatenate
from matplotlib import pyplot
from pandas import read_csv
from pandas import DataFrame
from pandas import concat
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error

In [2]:
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """Frame a (multivariate) time series as a supervised-learning table.

    Each output row holds the lagged observations ``t-n_in .. t-1`` followed
    by the current and future observations ``t .. t+n_out-1`` of every
    variable.

    Parameters
    ----------
    data : list, 1-D/2-D array-like, Series or DataFrame
        Observations in time order, one row per time step. A flat list, a
        1-D array or a Series is treated as a single variable. A 2-D input
        (including a list of rows) has one variable per column.
    n_in : int
        Number of lag steps used as inputs (columns ``var<j>(t-<i>)``).
    n_out : int
        Number of forecast steps (columns ``var<j>(t)``, ``var<j>(t+<i>)``).
    dropnan : bool
        If True, drop rows containing NaN. These are the edge rows whose
        shifted values fall outside the series.

    Returns
    -------
    DataFrame
        Lag columns first, then current/lead columns. Each group is ordered
        by variable, with 1-based variable index ``j``.
    """
    df = DataFrame(data)
    # Count variables on the constructed frame rather than the raw input.
    # ``type(data) is list`` / ``data.shape[1]`` reported 1 variable for a
    # list of rows (column-name length mismatch) and raised IndexError for
    # 1-D arrays / Series.
    n_vars = df.shape[1]
    cols, names = [], []
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += ['var%d(t-%d)' % (j + 1, i) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    for i in range(n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += ['var%d(t)' % (j + 1) for j in range(n_vars)]
        else:
            names += ['var%d(t+%d)' % (j + 1, i) for j in range(n_vars)]
    # put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values introduced by the shifts
    if dropnan:
        agg = agg.dropna()
    return agg

In [3]:
# load dataset (first CSV column becomes the index)
dataset = read_csv('pollution.csv', header=0, index_col=0)
values = dataset.values
# integer encode direction (column 4 holds the categorical labels)
encoder = LabelEncoder()
values[:, 4] = encoder.fit_transform(values[:, 4])
# ensure all data is float
values = values.astype('float32')
# normalize features to [0, 1]
# NOTE(review): the scaler is fit on every row, including rows that later
# become the test split, so test min/max leak into training. Fit on the
# training rows only if a leak-free evaluation is required.
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# frame as supervised learning: one lag step in, one step out
reframed = series_to_supervised(scaled, 1, 1)
# Keep the n_features lagged inputs plus var1(t), the prediction target, and
# drop var2(t) .. varN(t). The indices are derived from the feature count
# instead of hard-coded [9..15], so this stays correct if columns change.
n_features = values.shape[1]
drop_positions = list(range(n_features + 1, 2 * n_features))
reframed = reframed.drop(reframed.columns[drop_positions], axis=1)
assert reframed.columns[-1] == 'var1(t)', "last column must be the target"
print(reframed.head())

   var1(t-1)  var2(t-1)  var3(t-1)  var4(t-1)  var5(t-1)  var6(t-1)  \
1   0.129779   0.352941   0.245902   0.527273   0.666667   0.002290   
2   0.148893   0.367647   0.245902   0.527273   0.666667   0.003811   
3   0.159960   0.426471   0.229508   0.545454   0.666667   0.005332   
4   0.182093   0.485294   0.229508   0.563637   0.666667   0.008391   
5   0.138833   0.485294   0.229508   0.563637   0.666667   0.009912   

   var7(t-1)  var8(t-1)   var1(t)  
1   0.000000        0.0  0.148893  
2   0.000000        0.0  0.159960  
3   0.000000        0.0  0.182093  
4   0.037037        0.0  0.138833  
5   0.074074        0.0  0.109658  


In [4]:
# Split chronologically: the first 365 * 24 rows (presumably one year of
# hourly observations) train the model, and all remaining rows test it.
values = reframed.values
n_train_hours = 365 * 24
train, test = values[:n_train_hours, :], values[n_train_hours:, :]

# Inputs are every column except the last; the last column is the target.
train_X, train_y = train[:, :-1], train[:, -1]
test_X, test_y = test[:, :-1], test[:, -1]

# The recurrent layer expects 3-D input [samples, timesteps, features];
# each sample here is a single time step.
train_X = train_X.reshape(train_X.shape[0], 1, train_X.shape[1])
test_X = test_X.reshape(test_X.shape[0], 1, test_X.shape[1])
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)

((8760, 1, 8), (8760,), (35039, 1, 8), (35039,))


In [5]:
from bigdl.util.common import *

Using /home/kai/anaconda2/lib/python2.7/site-packages/pyspark
Prepending /home/kai/anaconda2/lib/python2.7/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path


In [6]:
# Spark in local mode with 4 worker threads. The conf object comes from
# BigDL's create_spark_conf() (presumably BigDL's required Spark settings).
spark_conf = create_spark_conf()
sc = SparkContext(master="local[4]", appName="LSTM", conf=spark_conf)

In [7]:
# Configure logging, then initialize the BigDL engine. These are global side
# effects; keep this order and run once, after the SparkContext is created.
redire_spark_logs()      # presumably redirects Spark's own logs away from the notebook (name as spelled in the BigDL API)
show_bigdl_info_logs()   # surfaces BigDL INFO logs, e.g. the per-iteration DistriOptimizer progress lines
init_engine()            # must precede model construction and training

In [9]:
# Sanity-check the arrays before they are distributed. Features and labels
# must be numpy arrays with one label per sample, otherwise the
# feature/label pairing done later would misalign.
from numpy import ndarray
print(type(train_X), type(train_y))
assert all(isinstance(a, ndarray) for a in (train_X, train_y, test_X, test_y))
assert train_X.shape[0] == train_y.shape[0], "train feature/label counts differ"
assert test_X.shape[0] == test_y.shape[0], "test feature/label counts differ"

(<type 'numpy.ndarray'>, <type 'numpy.ndarray'>)


In [31]:
# Pair each input window with its label on the driver before distributing,
# so alignment holds by construction. Zipping two separately parallelized
# RDDs relies on RDD.zip's requirement that both have identical partitioning
# and per-partition element counts.
train_records = sc.parallelize(list(zip(train_X, train_y)))
train_features = train_records.keys()
train_labels = train_records.values()
# One BigDL Sample per record: feature window (1, n_features) plus scalar label.
trainingData = train_records.map(
    lambda record: Sample.from_ndarray(record[0], record[1]))

In [47]:
from bigdl.nn.layer import *
# LSTM regressor. The Recurrent container runs the LSTM over the timestep
# axis, and a time-distributed Linear head maps each hidden state to one
# output value.
input_size = train_X.shape[2]   # features per time step (8 in the run above)
hidden_size = 50                # LSTM hidden units
model = Sequential()
rec = Recurrent()
rec.add(LSTM(input_size, hidden_size))
model.add(rec)
model.add(TimeDistributed(Linear(hidden_size, 1)))

creating: createSequential
creating: createRecurrent
creating: createLSTM
creating: createLinear
creating: createTimeDistributed


<bigdl.nn.layer.Sequential at 0x7ff64c956410>

In [48]:
from bigdl.optim.optimizer import *
from bigdl.nn.criterion import *
# Training setup: absolute-error loss, Adam, mini-batches of 72 samples,
# stop after 50 epochs. The objects are created in the same order as before,
# so the "creating: ..." log lines are unchanged.
loss_criterion = AbsCriterion()
adam_method = Adam()
stop_trigger = MaxEpoch(50)
optimizer = Optimizer(
    model=model,
    training_rdd=trainingData,
    criterion=loss_criterion,
    optim_method=adam_method,
    batch_size=72,
    end_trigger=stop_trigger,
)

creating: createAbsCriterion
creating: createAdam
creating: createMaxEpoch
creating: createOptimizer


In [49]:
# Train until the MaxEpoch(50) end trigger fires. The call returns a Layer,
# which is the cell's displayed output.
# NOTE(review): later cells call `model.predict`. This assumes training
# updates `model` in place; confirm, or capture and use the returned layer.
optimizer.optimize()

<bigdl.nn.layer.Layer at 0x7ff64c956810>

2017-10-13 17:44:53 INFO  DistriOptimizer$:332 - [Epoch 50 8784/8760][Iteration 6100][Wall Clock 52.650468395s] Trained 72 records in 0.00833688 seconds. Throughput is 8636.324 records/second. Loss is 0.013799905. 


In [62]:
# Build test Samples the same way as the training data: pair features and
# labels on the driver first, so alignment does not depend on RDD.zip's
# matching-partition requirement.
test_records = sc.parallelize(list(zip(test_X, test_y)))
test_features = test_records.keys()
test_labels = test_records.values()
testData = test_records.map(
    lambda record: Sample.from_ndarray(record[0], record[1]))
# Predictions come back as an RDD, in the scaled [0, 1] target space.
# Presumably they are in the same order as testData; confirm before
# comparing against test_y.
predict_class = model.predict(testData)

In [63]:
# Pull just the first prediction to the driver for a quick look.
# NOTE(review): evaluating the model needs all predictions (collect()),
# mapped back through `scaler` before computing RMSE against test_y.
predict_labels = predict_class.take(num=1)