<a href="https://colab.research.google.com/github/hkvision/bigdl-demo/blob/main/lstm_pollution_dllib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Install jdk8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
# Set environment variable JAVA_HOME.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-8u312-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)


In [3]:
!pip install bigdl-dllib==2.0.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting bigdl-dllib==2.0.0
  Downloading bigdl_dllib-2.0.0-py3-none-manylinux1_x86_64.whl (101.0 MB)
[K     |████████████████████████████████| 101.0 MB 30 kB/s 
Collecting pyspark==2.4.6
  Downloading pyspark-2.4.6.tar.gz (218.4 MB)
[K     |████████████████████████████████| 218.4 MB 58 kB/s 
Collecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[K     |████████████████████████████████| 197 kB 54.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.6-py2.py3-none-any.whl size=218814407 sha256=a92bce117ed7e5d1247c538a0bf4fe2fea10843e00f0302b0b97e0eb56a12302
  Stored in directory: /root/.cache/pip/wheels/f1/42/b0/ba397759613f4feb1611021a2503e60e344e546671b2ae04f8
Successfully built pyspark
Installing collected packages: py4j, pyspark, bigdl-d

In [1]:
!wget https://raw.githubusercontent.com/jbrownlee/Datasets/master/pollution.csv

--2022-07-25 08:00:50--  https://raw.githubusercontent.com/jbrownlee/Datasets/master/pollution.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2010492 (1.9M) [text/plain]
Saving to: ‘pollution.csv’


2022-07-25 08:00:51 (31.5 MB/s) - ‘pollution.csv’ saved [2010492/2010492]



In [4]:
import numpy as np
from pandas import read_csv
from pandas import DataFrame
from pandas import concat
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler

In [5]:
# convert series to supervised learning
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    n_vars = 1 if type(data) is list else data.shape[1]
    df = DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j + 1, i)) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [('var%d(t)' % (j + 1)) for j in range(n_vars)]
        else:
            names += [('var%d(t+%d)' % (j + 1, i)) for j in range(n_vars)]
    # put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    if dropnan:
        agg.dropna(inplace=True)
    return agg

In [7]:
# load dataset
dataset = read_csv('pollution.csv', header=1, index_col=0)
values = dataset.values

In [8]:
# integer encode direction
encoder = LabelEncoder()
values[:, -4] = encoder.fit_transform(values[:, -4])
# ensure all data is float
values = values.astype('float32')
# normalize features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# frame as supervised learning
reframed = series_to_supervised(scaled, 1, 1)
# drop columns we don't want to predict
reframed.drop(reframed.columns[[9, 10, 11, 12, 13, 14, 15]], axis=1, inplace=True)
print(reframed.head())

    var1(t-1)  var2(t-1)  var3(t-1)  var4(t-1)  var5(t-1)  var6(t-1)  \
24        0.0        0.0   0.033333   0.000000   0.129779   0.352941   
25        0.0        0.0   0.033333   0.043478   0.148893   0.367647   
26        0.0        0.0   0.033333   0.086957   0.159960   0.426471   
27        0.0        0.0   0.033333   0.130435   0.182093   0.485294   
28        0.0        0.0   0.033333   0.173913   0.138833   0.485294   

    var7(t-1)  var8(t-1)  var9(t-1)   var5(t)   var6(t)   var7(t)   var8(t)  \
24   0.245902   0.527273   0.666667  0.148893  0.367647  0.245902  0.527273   
25   0.245902   0.527273   0.666667  0.159960  0.426471  0.229508  0.545454   
26   0.229508   0.545454   0.666667  0.182093  0.485294  0.229508  0.563637   
27   0.229508   0.563637   0.666667  0.138833  0.485294  0.229508  0.563637   
28   0.229508   0.563637   0.666667  0.109658  0.485294  0.213115  0.563637   

     var9(t)  var10(t)  var11(t)  var12(t)  
24  0.666667  0.003811  0.000000       0.0  
25

In [9]:
# split into train and test sets
values = reframed.values
n_train_hours = 365 * 24
train = values[:n_train_hours, :]
test = values[n_train_hours:, :]
# split into input and outputs
train_X, train_y = train[:, :-1], train[:, -1]
test_X, test_y = test[:, :-1], test[:, -1]
# reshape input to be 3D [samples, timesteps, features]
train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)

(8760, 1, 16) (8760,) (32783, 1, 16) (32783,)


In [10]:
# Change the imports to DLlib keras-like api
from bigdl.dllib.nncontext import init_nncontext
from bigdl.dllib.keras.models import Sequential
from bigdl.dllib.keras.layers import LSTM, Dense
from bigdl.dllib.nncontext import ZooContext

ZooContext.log_output = True

# initialize BigDL
sc = init_nncontext(cluster_mode="local")

Current pyspark location is : /usr/local/lib/python3.7/dist-packages/pyspark/__init__.py
Start to getOrCreate SparkContext
pyspark_submit_args is:  --driver-class-path /usr/local/lib/python3.7/dist-packages/bigdl/share/dllib/lib/bigdl-dllib-spark_2.4.6-2.0.0-jar-with-dependencies.jar pyspark-shell 
2022-07-25 08:18:42 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2022-07-25 08:18:46,686 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-07-25 08:18:46,690 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-07-25 08:18:46,691 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-07-25 08:18:46,692 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
22-07-25 08:18:46 [Thread-4] INFO  Engine$:121 - Auto detect executor number and executor cores number
22-07-25 08:18:46 [Thread-4] INFO  Engine$:123 - Executor number is 1 and executor cores number is 2
22-07-25 08:18:47 [Thread-4] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 14
2022-07-25 08:18:47 WARN  SparkContext:66 - Using an existing SparkContext; some configuration may not take effect.
22-07-25 08:18:47 [Thread-4] INFO  Engine$:446 - Find existing spark context. Checking the spark conf...
cls.getname: com.intel.analytics.bigdl.dllib.utils.python.api.Sample
BigDLBasePickler registe


User settings:

   KMP_AFFINITY=granularity=fine,compact,1,0
   KMP_BLOCKTIME=0
   KMP_DUPLICATE_LIB_OK=True
   KMP_INIT_AT_FORK=FALSE
   KMP_SETTINGS=1
   OMP_NUM_THREADS=1

Effective settings:

   KMP_ABORT_DELAY=0
   KMP_ADAPTIVE_LOCK_PROPS='1,1024'
   KMP_ALIGN_ALLOC=64
   KMP_ALL_THREADPRIVATE=128
   KMP_ATOMIC_MODE=2
   KMP_BLOCKTIME=0
   KMP_CPUINFO_FILE: value is not defined
   KMP_DETERMINISTIC_REDUCTION=false
   KMP_DEVICE_THREAD_LIMIT=2147483647
   KMP_DISP_HAND_THREAD=false
   KMP_DISP_NUM_BUFFERS=7
   KMP_DUPLICATE_LIB_OK=true
   KMP_FORCE_REDUCTION: value is not defined
   KMP_FOREIGN_THREADS_THREADPRIVATE=true
   KMP_FORKJOIN_BARRIER='2,2'
   KMP_FORKJOIN_BARRIER_PATTERN='hyper,hyper'
   KMP_FORKJOIN_FRAMES=true
   KMP_FORKJOIN_FRAMES_MODE=3
   KMP_GTID_MODE=3
   KMP_HANDLE_SIGNALS=false
   KMP_HOT_TEAMS_MAX_LEVEL=1
   KMP_HOT_TEAMS_MODE=0
   KMP_INIT_AT_FORK=true
   KMP_ITT_PREPARE_DELAY=0
   KMP_LIBRARY=throughput
   KMP_LOCK_KIND=queuing
   KMP_MALLOC_POOL_INCR=1M
  

In [11]:
# design network
model = Sequential()
model.add(LSTM(50, input_shape=(train_X.shape[1], train_X.shape[2])))
model.add(Dense(1))
model.compile(loss='mae', optimizer='adam')

# Arguments verbose and shuffle are removed since they are currently not supported in DLlib
# fit network
model.fit(train_X, train_y, nb_epoch=50, batch_size=72, validation_data=(test_X, test_y))


creating: createZooKerasSequential
creating: createZooKerasLSTM
creating: createZooKerasDense
creating: createAdam
creating: createZooKerasMeanAbsoluteError
2022-07-25 08:19:10,436 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-07-25 08:19:10,438 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-07-25 08:19:10,440 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-07-25 08:19:10,442 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
22-07-25 08:19:11 [Thread-4] INFO  DistriOptimizer$:826 - caching training rdd ...
2022-07-25 08:19:11 WARN  TaskSetManager:66 - Stage 0 contains a task of very large size (591 KB). The maximum recommended task size is 100 KB.




2022-07-25 08:19:13 WARN  TaskSetManager:66 - Stage 2 contains a task of very large size (2340 KB). The maximum recommended task size is 100 KB.




22-07-25 08:19:15 [Thread-4] INFO  DistriOptimizer$:652 - Cache thread models...
22-07-25 08:19:15 [Executor task launch worker for task 6] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 50
22-07-25 08:19:15 [Executor task launch worker for task 6] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 50
22-07-25 08:19:15 [Executor task launch worker for task 6] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 50
22-07-25 08:19:15 [Executor task launch worker for task 6] INFO  DistriOptimizer$:635 - model thread pool size is 1
2022-07-25 08:19:15 WARN  BlockManager:66 - Asked to remove block test_0weights0, which does not exist
2022-07-25 08:19:15 WARN  BlockManager:66 - Asked to remove block test_0gradients0, which does not exist
22-07-25 08:19:15 [Thread-4] INFO  DistriOptimizer$:654 - Cache thread models... done
22-07-25 08:19:15 [Thread-4] INFO  DistriOptimizer$:164 - Count dataset
22-07-25 08:19:16 [Thread-4] INFO  DistriOptimizer$:168 - Count dataset complete. Time el



22-07-25 08:19:24 [Thread-4] INFO  DistriOptimizer$:178 - [Epoch 1 8784/8760][Iteration 122][Wall Clock 7.003563549s] validate model throughput is 37660.59 records/second
22-07-25 08:19:24 [Thread-4] INFO  DistriOptimizer$:181 - [Epoch 1 8784/8760][Iteration 122][Wall Clock 7.003563549s] Loss is (Loss: 413.62753, count: 32783, Average Loss: 0.0126171345)
22-07-25 08:19:24 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 2 72/8760][Iteration 123][Wall Clock 7.201282371s] Trained 72.0 records in 0.054649227 seconds. Throughput is 1317.4935 records/second. Loss is 0.0037083237. 
22-07-25 08:19:24 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 2 144/8760][Iteration 124][Wall Clock 7.23664348s] Trained 72.0 records in 0.035361109 seconds. Throughput is 2036.1354 records/second. Loss is 0.0059026135. 
22-07-25 08:19:24 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 2 216/8760][Iteration 125][Wall Clock 7.272435442s] Trained 72.0 records in 0.035791962 seconds. Throughput is 2011.6248 records/s

[Stage 2464:>                                                       (0 + 1) / 1]                                                                                

22-07-25 08:19:43 [Thread-4] INFO  DistriOptimizer$:178 - [Epoch 5 8784/8760][Iteration 610][Wall Clock 26.447140643s] validate model throughput is 59801.73 records/second
22-07-25 08:19:43 [Thread-4] INFO  DistriOptimizer$:181 - [Epoch 5 8784/8760][Iteration 610][Wall Clock 26.447140643s] Loss is (Loss: 405.07327, count: 32783, Average Loss: 0.012356199)
22-07-25 08:19:43 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 6 72/8760][Iteration 611][Wall Clock 27.123465547s] Trained 72.0 records in 0.033664283 seconds. Throughput is 2138.7654 records/second. Loss is 0.0071989177. 
22-07-25 08:19:43 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 6 144/8760][Iteration 612][Wall Clock 27.151658369s] Trained 72.0 records in 0.028192822 seconds. Throughput is 2553.8416 records/second. Loss is 0.0026137815. 
22-07-25 08:19:43 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 6 216/8760][Iteration 613][Wall Clock 27.179177712s] Trained 72.0 records in 0.027519343 seconds. Throughput is 2616.3416 reco

[Stage 3446:>                                                       (0 + 1) / 1]                                                                                

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
22-07-25 08:20:05 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 11 2952/8760][Iteration 1261][Wall Clock 49.106997158s] Trained 72.0 records in 0.024146726 seconds. Throughput is 2981.7708 records/second. Loss is 0.013547643. 
22-07-25 08:20:05 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 11 3024/8760][Iteration 1262][Wall Clock 49.130685188s] Trained 72.0 records in 0.02368803 seconds. Throughput is 3039.51 records/second. Loss is 0.0034094194. 
22-07-25 08:20:05 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 11 3096/8760][Iteration 1263][Wall Clock 49.156012132s] Trained 72.0 records in 0.025326944 seconds. Throughput is 2842.8223 records/second. Loss is 0.005785631. 
22-07-25 08:20:05 [Thread-4] INFO  DistriOptimizer$:430 - [Epoch 11 3168/8760][Iteration 1264][Wall Clock 49.177437933s] Trained 72.0 records in 0.021425801 seconds. Throughput is 3360.4346 records/second. Loss is 0.020662658. 
22-07-25 08:20:05 [Thread

In [12]:
from math import sqrt
from sklearn.metrics import mean_squared_error

# make a prediction
# will return an RDD and we collect them to form the result ndarray
yhat = model.predict(test_X).collect()
yhat = np.array([y[0] for y in yhat])
# calculate RMSE
rmse = sqrt(mean_squared_error(test_y, yhat))
print('Test RMSE: %.3f' % rmse)

2022-07-25 08:24:30 WARN  TaskSetManager:66 - Stage 24568 contains a task of very large size (2404 KB). The maximum recommended task size is 100 KB.
22-07-25 08:24:30 [Executor task launch worker for task 12263] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 372
22-07-25 08:24:30 [Executor task launch worker for task 12264] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 373


[Stage 24568:>                                                      (0 + 2) / 2]

Test RMSE: 0.036


                                                                                