### In this notebook we build a line-detector model using the multilayer perceptron classifier in PySpark
- BY: Abdelraouf Hawash 
- DATE: 23 / 12 / 2022

### import libraries

In [1]:
import numpy as np
import cv2
import os

from pyspark.ml.classification import MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

# Get the SparkSession used by every Spark call in this notebook
# (getOrCreate reuses an already running session instead of starting a second one).
spark = SparkSession.builder.getOrCreate()

22/12/23 17:01:55 WARN Utils: Your hostname, Raouf-PC resolves to a loopback address: 127.0.1.1; using 192.168.1.100 instead (on interface wlp2s0)
22/12/23 17:01:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/23 17:02:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### important attributes and methods

In [2]:
# Class names in label order: a name's position in this list is its integer label
# (labels are built with classes.index(...) and predictions are decoded with classes[...]),
# so reordering this list changes the meaning of every stored label.
# NOTE(review): 'lef3' is spelled differently from 'left1'/'left2'; it presumably has to
# match the strings stored in y_data.npy — confirm against the data before renaming it.
classes = ['QR','empty','horizontal','lef3','left2','left1','center','right1','right2','right3']

def preprocessing (img, dest_size = (20,20), dest_rang: int = 16):
    '''
    Turn an image into the flat, low-resolution feature vector fed to the model.

    A 3-dimensional image is first converted from BGR to gray scale; a
    2-dimensional image is used as it is. The image is then resized to
    `dest_size`, its pixel values are multiplied by `dest_rang / 255` and cast
    to uint8, and the result is flattened to one dimension.

    Parameters:
        img: image as a numpy array, either 2-D (gray scale) or 3-D (BGR).
        dest_size: target size handed to cv2.resize, i.e. (width, height).
        dest_rang: upper bound of the rescaled pixel values.

    Returns:
        1-D uint8 numpy array holding the resized image row after row.

    Raises:
        ValueError: if `img` is None, which is what cv2.imread returns for an
            unreadable file and what a failed camera read delivers.

    Original author's note: one call takes about 0.00035 s.
    '''
    # Fail with a clear message instead of an AttributeError on `None.shape`.
    if img is None:
        raise ValueError("preprocessing() received None instead of an image; "
                         "check that the file or camera frame was read successfully")
    if len(img.shape) == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    out = (cv2.resize(img, dest_size) * (dest_rang/255)).astype('uint8')
    return out.reshape( out.shape[0] * out.shape[1] )

def show_processed_img (input, dest_size = (20,20), input_range: int = 16):
    '''
    Display a flattened feature vector as an image and wait for a key press.

    The vector is reshaped to `dest_size`, its values are multiplied by
    `255 / input_range` and cast to uint8, and the result is shown in a window
    titled "source image". The call blocks until a key is pressed, then closes
    all OpenCV windows.

    Parameters:
        input: 1-D numpy array that can be reshaped to `dest_size`.
        dest_size: shape passed to numpy reshape.
        input_range: value range of `input` used for scaling.

    Returns:
        The value returned by cv2.waitKey(0).
    '''
    # NOTE(review): dest_size is used as (rows, cols) here but as (width, height) by
    # cv2.resize in preprocessing; the two agree only for square sizes — confirm intent.
    scale = 255/input_range
    restored = input.reshape(dest_size) * scale
    restored = restored.astype('uint8')

    cv2.imshow("source image", restored)
    pressed_key = cv2.waitKey(0)
    cv2.destroyAllWindows()
    return pressed_key

def draw_output (img, label):
    '''
    Draw the predicted class onto `img` and return it.

    'QR', 'empty' and 'horizontal' are written as text near the top-left
    corner. The seven position classes ('lef3' ... 'right3') are drawn as a
    vertical line whose x coordinate is 0/6 ... 6/6 of the image width.
    Any other label leaves the image untouched.

    Parameters:
        img: image as a numpy array; it is drawn on in place.
        label: class name to visualise.

    Returns:
        The same `img` object that was passed in.
    '''
    height, width = img.shape[0], img.shape[1]

    # Text classes and the colour tuple each one is written with.
    text_colors = (
        ('QR', (255, 0, 0)),
        ('empty', (0, 0, 0)),
        ('horizontal', (0, 255, 0)),
    )
    # Position classes and how many sixths of the width the line sits at.
    line_sixths = (
        ('lef3', 0),
        ('left2', 1),
        ('left1', 2),
        ('center', 3),
        ('right1', 4),
        ('right2', 5),
        ('right3', 6),
    )

    for name, color in text_colors:
        if label == name:
            cv2.putText(img, name, (20,35), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3, cv2.LINE_AA, 0)

    for name, sixths in line_sixths:
        if label == name:
            x_pos = round(width * sixths / 6)
            cv2.line(img, (x_pos, 0), (x_pos, height), (0, 0, 255), 10)

    return img
    

### loading data

In [3]:
# Feature matrix: one flattened image per row (presumably produced by preprocessing).
X_data = np.load('./../data/X_data.npy')
print(X_data, X_data.shape, sep='\n')

# Preview the second sample; this opens a window and blocks until a key is pressed.
show_processed_img(X_data[1])

# Class name of every sample.
Y_data = np.load('./../data/y_data.npy')
print(Y_data, Y_data.shape, sep='\n')

# Replace every class name by its position in `classes`, i.e. its integer label.
Y_data = np.asarray(list(map(classes.index, Y_data)))
print(Y_data, Y_data.shape, sep='\n')

[[11 12 12 ... 12 12 12]
 [ 5  5  5 ...  8  9  9]
 [10 10 10 ...  8  8  7]
 ...
 [14 13 14 ...  7  8  7]
 [ 5  6  5 ... 10  8  8]
 [12 13 13 ...  9  7  7]]
(4563, 400)


qt.qpa.plugin: Could not find the Qt platform plugin "wayland" in "/home/raouf/.local/lib/python3.10/site-packages/cv2/qt/plugins"


['lef1' 'QR' 'empty' ... 'center' 'QR' 'QR']
(4563,)
[5 0 1 ... 6 0 0]
(4563,)


[Creating Spark dataframe from numpy matrix](https://stackoverflow.com/questions/45063591/creating-spark-dataframe-from-numpy-matrix)

In [4]:
# One (label, features) tuple per sample: the label as a plain Python int and the
# pixel row wrapped in a Spark dense vector.
data = [
    (int(label), Vectors.dense(pixels))
    for pixels, label in zip(X_data, Y_data)
]
print(data[1])

# Build the Spark DataFrame with the column names the classifier uses by default.
df = spark.createDataFrame(data, ['label', 'features'])
df.show()

(0, DenseVector([5.0, 5.0, 5.0, 6.0, 6.0, 7.0, 7.0, 11.0, 11.0, 13.0, 14.0, 14.0, 14.0, 14.0, 14.0, 14.0, 14.0, 14.0, 12.0, 12.0, 5.0, 6.0, 6.0, 5.0, 6.0, 7.0, 7.0, 10.0, 6.0, 3.0, 4.0, 4.0, 4.0, 7.0, 3.0, 3.0, 3.0, 14.0, 12.0, 12.0, 5.0, 6.0, 6.0, 6.0, 6.0, 7.0, 6.0, 11.0, 6.0, 11.0, 14.0, 14.0, 3.0, 14.0, 14.0, 14.0, 13.0, 14.0, 12.0, 12.0, 5.0, 5.0, 6.0, 5.0, 6.0, 7.0, 7.0, 10.0, 6.0, 2.0, 13.0, 13.0, 13.0, 5.0, 13.0, 3.0, 14.0, 14.0, 12.0, 12.0, 5.0, 5.0, 5.0, 5.0, 6.0, 7.0, 7.0, 10.0, 6.0, 2.0, 12.0, 4.0, 2.0, 4.0, 13.0, 3.0, 14.0, 14.0, 12.0, 11.0, 5.0, 5.0, 6.0, 6.0, 6.0, 7.0, 6.0, 10.0, 6.0, 11.0, 12.0, 13.0, 13.0, 13.0, 14.0, 14.0, 14.0, 14.0, 12.0, 11.0, 5.0, 5.0, 5.0, 5.0, 6.0, 7.0, 6.0, 10.0, 1.0, 1.0, 2.0, 5.0, 3.0, 4.0, 3.0, 3.0, 3.0, 14.0, 12.0, 12.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 2.0, 9.0, 5.0, 10.0, 12.0, 3.0, 3.0, 14.0, 14.0, 14.0, 3.0, 14.0, 5.0, 2.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 9.0, 5.0, 10.0, 11.0, 13.0, 3.0, 14.0, 5.0, 14.0, 14.0, 14.0, 4.0, 2.0, 0.0, 0.0, 

[Stage 0:>                                                          (0 + 1) / 1]

22/12/17 06:51:50 WARN TaskSetManager: Stage 0 contains a task of very large size (3619 KiB). The maximum recommended task size is 1000 KiB.
+-----+--------------------+
|label|            features|
+-----+--------------------+
|    5|[11.0,12.0,12.0,1...|
|    0|[5.0,5.0,5.0,6.0,...|
|    1|[10.0,10.0,10.0,1...|
|    0|[11.0,11.0,11.0,1...|
|    6|[12.0,12.0,12.0,1...|
|    0|[11.0,11.0,11.0,1...|
|    1|[10.0,10.0,10.0,1...|
|    4|[13.0,13.0,13.0,1...|
|    1|[8.0,8.0,7.0,8.0,...|
|    0|[6.0,6.0,6.0,6.0,...|
|    1|[8.0,8.0,8.0,8.0,...|
|    2|[12.0,13.0,12.0,1...|
|    2|[14.0,14.0,14.0,1...|
|    8|[14.0,14.0,14.0,1...|
|    4|[10.0,7.0,4.0,4.0...|
|    4|[2.0,2.0,2.0,2.0,...|
|    1|[7.0,7.0,7.0,8.0,...|
|    1|[11.0,11.0,11.0,1...|
|    4|[11.0,11.0,12.0,1...|
|    5|[12.0,12.0,12.0,1...|
+-----+--------------------+
only showing top 20 rows



                                                                                

train test split

In [5]:
# Random 80 % / 20 % split; the fixed seed keeps the split repeatable.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=1234)

### building and training model

build

[Multilayer perceptron classifier](https://spark.apache.org/docs/3.3.1/ml-classification-regression.html#multilayer-perceptron-classifier)

In [7]:
# Network topology:
#   input layer   : 400 units (one per feature)
#   hidden layers : four layers of 200 units each
#   output layer  : 10 units (one per class)
layers = [400] + [200] * 4 + [10]

# Configure the classifier; nothing is trained until fit() is called.
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    maxIter=2000,
    blockSize=128,
    seed=1234,
)

train

In [None]:
# Train the model on the training split.
# Original author's note: this step took about 70 minutes.
model = trainer.fit(train)


compute accuracy on the test set

In [21]:
# Run the trained model on the held-out split; transform() adds a "prediction" column.
result = model.transform(test)

# Keep only the two columns the evaluator compares.
predictionAndLabels = result.select("prediction", "label")

# Fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Test set accuracy = {evaluator.evaluate(predictionAndLabels)}")

22/12/16 22:48:06 WARN DAGScheduler: Broadcasting large task binary with size 1655.0 KiB
22/12/16 22:48:06 WARN TaskSetManager: Stage 2170 contains a task of very large size (3619 KiB). The maximum recommended task size is 1000 KiB.




Test set accuracy = 0.9022801302931596


                                                                                

### saving model

In [12]:
# Persist the trained model to the "model" directory.
# A plain model.save("model") raises once that directory exists, so this cell failed on
# every run after the first; overwrite() replaces the previously saved copy instead.
model.write().overwrite().save("model")

22/12/16 22:39:58 WARN TaskSetManager: Stage 2161 contains a task of very large size (1621 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

### loading learned model

In [3]:
# Load the model previously saved under 'model'; the prediction cells below use MLPC_model.
MLPC_model = MultilayerPerceptronClassificationModel.load('model')

                                                                                

### using model and calculating the time of processing

In [4]:
img = cv2.imread("./../prepare_data/root_data/QR/QR_code.jpg",0) # flag 0: load as a gray-scale image
# cv2.imread does not raise for a missing or unreadable file, it returns None.
if img is None:
    raise FileNotFoundError("could not read image ./../prepare_data/root_data/QR/QR_code.jpg")

# Time only preprocessing + prediction, not the file read.
e1 = cv2.getTickCount()
features = preprocessing(img)
prediction = MLPC_model.predict(Vectors.dense(features))
e2 = cv2.getTickCount()
time = (e2 - e1)/ cv2.getTickFrequency()

# predict() returns the class index as a float; convert it to index `classes`.
print("prediction = " , classes[int(prediction)])
print("time of processing = ",time," s") # the recorded run of this cell took about 1.94 s

prediction =  QR
time of processing =  1.937822578  s


### using the model live from the camera

In [5]:
# Classify frames from the default camera until 'q' is pressed or a frame cannot be read.
camera = cv2.VideoCapture(0)
try:
    # isOpened must be called: the bare attribute is a bound method and always truthy.
    while camera.isOpened():
        ret, frame = camera.read()
        # A failed read returns ret == False and no usable frame; stop instead of crashing.
        if not ret:
            break
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        features = preprocessing(img)
        prediction = MLPC_model.predict(Vectors.dense(features))

        # predict() returns the class index as a float.
        out_label = classes[round(prediction)]

        # draw the output on the original colour frame
        output = draw_output(frame, out_label)
        cv2.imshow("output", output)
        # print(out_label)

        # Mask to the low byte so the comparison also works where waitKey sets higher bits.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always close the windows and free the camera, even if a frame raised an error.
    cv2.destroyAllWindows()
    camera.release()


qt.qpa.plugin: Could not find the Qt platform plugin "wayland" in "/home/raouf/.local/lib/python3.10/site-packages/cv2/qt/plugins"
