# Libraries and Spark Context

In [None]:
import glob
import cv2
import numpy as np
from os import listdir
from bigdl.nn.layer import *
from bigdl.util.common import *
import matplotlib.pyplot as plt
from os.path import isfile, join
from bigdl.nn.criterion import *
from pyspark import SparkContext
from bigdl.optim.optimizer import *
from bigdl.util.common import Sample
from bigdl.nn.keras.layer import Flatten
from bigdl.transform.vision.image import *
from pyspark.serializers import BatchedSerializer, PickleSerializer


#creates a spark context with 4 local nodes and a max 2GB runtime serialization memory
sc=SparkContext.getOrCreate(conf=create_spark_conf().setMaster("local[4]").set("spark.driver.memory","2g"))

#starts the engine
init_engine()

# Data Loading

In [2]:
#loading preprocessed data in 8 objects (each object containing a class images)
images1 = [cv2.imread(file) for file in glob.glob("data_1/*.jpg")]
images2 = [cv2.imread(file) for file in glob.glob("data_2/*.jpg")]
images3 = [cv2.imread(file) for file in glob.glob("data_3/*.jpg")]
images4 = [cv2.imread(file) for file in glob.glob("data_4/*.jpg")]
images5 = [cv2.imread(file) for file in glob.glob("data_5/*.jpg")]
images6 = [cv2.imread(file) for file in glob.glob("data_6/*.jpg")]
images7 = [cv2.imread(file) for file in glob.glob("data_7/*.jpg")]
images8 = [cv2.imread(file) for file in glob.glob("data_8/*.jpg")]

#splitting and scaling down number of images
images1_train = images1[0:8]
images1_test = images1[8:10]

images2_train = images2[0:8]
images2_test = images2[8:10]

images3_train = images3[0:8]
images3_test = images3[8:10]

images4_train = images4[0:8]
images4_test = images4[8:10]

images4_train = images4[0:8]
images4_test = images4[8:10]

images5_train = images5[0:8]
images5_test = images5[8:10]

images6_train = images6[0:8]
images6_test = images6[8:10]

images7_train = images7[0:8]
images7_test = images7[8:10]

images8_train = images8[0:8]
images8_test = images8[8:10]

In [3]:
#creating labels for train data
label_train = []

for i in range(64):
    if i <= 7:
        label_train.append(1.0)
    elif i <= 15:
        label_train.append(2.0)
    elif i <= 23:
        label_train.append(3.0)
    elif i <= 31:
        label_train.append(4.0)
    elif i <= 39:
        label_train.append(5.0)
    elif i <= 47:
        label_train.append(6.0)
    elif i <= 55:
        label_train.append(7.0)
    elif i <= 63:
        label_train.append(8.0)

In [4]:
#creating labels for test data
label_test = []

for i in range(16):
    if i <= 1:
        label_test.append(1.0)
    elif i <= 3:
        label_test.append(2.0)
    elif i <= 5:
        label_test.append(3.0)
    elif i <= 7:
        label_test.append(4.0)
    elif i <= 9:
        label_test.append(5.0)
    elif i <= 11:
        label_test.append(6.0)
    elif i <= 13:
        label_test.append(7.0)
    elif i <= 15:
        label_test.append(8.0)

In [5]:
#appending all datasets together for train
features_train = []

for i in range(8):
    features_train.append(images1_train[i])
    
for i in range(8):
    features_train.append(images2_train[i])
    
for i in range(8):
    features_train.append(images3_train[i])
    
for i in range(8):
    features_train.append(images4_train[i])
    
for i in range(8):
    features_train.append(images5_train[i])
    
for i in range(8):
    features_train.append(images6_train[i])
    
for i in range(8):
    features_train.append(images7_train[i])
    
for i in range(8):
    features_train.append(images8_train[i])

In [6]:
#appending all datasets together for test
features_test = []

for i in range(2):
    features_test.append(images1_test[i])
    
for i in range(2):
    features_test.append(images2_test[i])
    
for i in range(2):
    features_test.append(images3_test[i])
    
for i in range(2):
    features_test.append(images4_test[i])
    
for i in range(2):
    features_test.append(images5_test[i])
    
for i in range(2):
    features_test.append(images6_test[i])
    
for i in range(2):
    features_test.append(images7_test[i])
    
for i in range(2):
    features_test.append(images8_test[i])


# Transformations

In [7]:
#creating numpy arrays for the images
train_features = np.array(features_train)
test_features = np.array(features_test)

In [8]:
#creating numpy arrays for the labels
train_labels = np.array(label_train)
test_labels = np.array(label_test)

In [9]:
#creating train and test RDDs (only images)
train_rdd = sc.parallelize(train_features)
test_rdd = sc.parallelize(test_features)

In [10]:
#mapping for resize the data in images
train_rdd = train_rdd.map(lambda x: (cv2.resize(x, (28,  28))))  
test_rdd = test_rdd.map(lambda x: (cv2.resize(x, (28, 28))))  

In [11]:
#mapping for creating a sample of image and label in the RDD
train_rdd = train_rdd.map(lambda x: Sample.from_ndarray(x, train_labels))
test_rdd = test_rdd.map(lambda x: Sample.from_ndarray(x, test_labels)) #transform to grayscale

In [12]:
#testing the rdd
train_rdd.collect()

[Sample: features: [JTensor: storage: [252. 158.  82. ...  51.  41.  34.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 1. 1. 1. 1. 1. 1. 2. 2. 2. 2. 2. 2. 2. 2. 3. 3. 3. 3. 3. 3. 3. 3.
  4. 4. 4. 4. 4. 4. 4. 4. 5. 5. 5. 5. 5. 5. 5. 5. 6. 6. 6. 6. 6. 6. 6. 6.
  7. 7. 7. 7. 7. 7. 7. 7. 8. 8. 8. 8. 8. 8. 8. 8.], shape: [64], float],
 Sample: features: [JTensor: storage: [31. 42. 34. ... 48. 53. 51.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 1. 1. 1. 1. 1. 1. 2. 2. 2. 2. 2. 2. 2. 2. 3. 3. 3. 3. 3. 3. 3. 3.
  4. 4. 4. 4. 4. 4. 4. 4. 5. 5. 5. 5. 5. 5. 5. 5. 6. 6. 6. 6. 6. 6. 6. 6.
  7. 7. 7. 7. 7. 7. 7. 7. 8. 8. 8. 8. 8. 8. 8. 8.], shape: [64], float],
 Sample: features: [JTensor: storage: [134.  64.  87. ...  53.  45.  56.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 1. 1. 1. 1. 1. 1. 2. 2. 2. 2. 2. 2. 2. 2. 3. 3. 3. 3. 3. 3. 3. 3.
  4. 4. 4. 4. 4. 4. 4. 4. 5. 5. 5. 5. 5. 5. 5. 5. 6. 6. 6. 6. 6. 6. 6. 6.
  7. 7. 7. 7. 7. 7. 7. 7. 8. 8. 

In [13]:
test_rdd.collect()

[Sample: features: [JTensor: storage: [196. 209. 223. ...  59. 141. 176.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 2. 2. 3. 3. 4. 4. 5. 5. 6. 6. 7. 7. 8. 8.], shape: [16], float],
 Sample: features: [JTensor: storage: [  0.   0.   0. ... 208. 191. 134.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 2. 2. 3. 3. 4. 4. 5. 5. 6. 6. 7. 7. 8. 8.], shape: [16], float],
 Sample: features: [JTensor: storage: [0. 0. 0. ... 0. 0. 0.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 2. 2. 3. 3. 4. 4. 5. 5. 6. 6. 7. 7. 8. 8.], shape: [16], float],
 Sample: features: [JTensor: storage: [ 46.  67.  89. ... 126. 150. 178.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 2. 2. 3. 3. 4. 4. 5. 5. 6. 6. 7. 7. 8. 8.], shape: [16], float],
 Sample: features: [JTensor: storage: [ 48. 110. 128. ... 120. 129. 246.], shape: [28 28  3], float], labels: [JTensor: storage: [1. 1. 2. 2. 3. 3. 4. 4. 5. 5. 6. 6. 7. 7. 8. 8.], shape: [16], float],
 Sample

# BigDL: Defining Model 

In [14]:
# model = Sequential()
# model.add(Reshape([3, 28, 28]))
# model.add(SpatialConvolution(3, 32, 2, 2))
# model.add(ReLU())
# #model.add(TemporalMaxPooling(4))
# model.add(Linear(3 * 32 * 3 * 3, 128))   #can't decide what values to give here
# model.add(Linear(864, 8))
# model.add(LogSoftMax())

model = Sequential()
model.add(Reshape([3, 28, 28]))
model.add(SpatialConvolution(3, 6, 5, 5))
model.add(Tanh())
model.add(SpatialMaxPooling(2, 2, 2, 2))
model.add(Tanh())
model.add(SpatialConvolution(6, 12, 5, 5))
model.add(SpatialMaxPooling(2, 2, 2, 2))
model.add(Reshape([12 * 4 * 4]))
model.add(Linear(12 * 4 * 4, 100))
model.add(Tanh())
model.add(Linear(100, 8))
model.add(LogSoftMax())

creating: createSequential
creating: createReshape
creating: createSpatialConvolution
creating: createTanh
creating: createSpatialMaxPooling
creating: createTanh
creating: createSpatialConvolution
creating: createSpatialMaxPooling
creating: createReshape
creating: createLinear
creating: createTanh
creating: createLinear
creating: createLogSoftMax


<bigdl.nn.layer.Sequential at 0x119665d30>

# BigDL: Defining Training and Testing

In [15]:
optimizer = Optimizer(
    model=model,
    training_rdd=train_rdd,
    criterion=PoissonCriterion(),
    optim_method=SGD(learningrate=0.4, learningrate_decay=0.0002),
    end_trigger=MaxEpoch(20),
    batch_size=64)

optimizer.set_validation(
    batch_size=16,
    val_rdd=test_rdd,
    trigger=EveryEpoch(),
    val_method=[Top1Accuracy()]
)

creating: createPoissonCriterion
creating: createDefault
creating: createSGD
creating: createMaxEpoch
creating: createDistriOptimizer
creating: createEveryEpoch
creating: createTop1Accuracy


# BigDL: Training

In [None]:
train_model = optimizer.optimize()

# BigDL: Evaluation

In [None]:
predictions = train_model.predict(test_rdd)

# BigDL: Visualization

In [None]:
loss = np.array(train_summary.read_scalar("Loss"))
top1 = np.array(val_summary.read_scalar("Top1Accuracy"))

plt.figure(figsize = (12,12))
plt.subplot(2,1,1)
plt.plot(loss[:,0],loss[:,1],label='loss')
plt.xlim(0,loss.shape[0]+10)
plt.grid(True)
plt.title("loss")
plt.subplot(2,1,2)
plt.plot(top1[:,0],top1[:,1],label='top1')
plt.xlim(0,loss.shape[0]+10)
plt.title("top1 accuracy")
plt.grid(True)
