# Đồ án cuối kỳ Big Data

## Video Stream Analytics: Detect Object and Mask

### Môn học 
- Công nghệ Dữ liệu lớn - IE212.M11
- GV: Đỗ Trọng Hợp

### Nhóm sinh viên thực hiện
- Trang Hoàng Nhựt (18520123)
- Nguyễn Ngọc Quí (18520410)
- Nguyễn Thị Phương (18520135)
- Lê Thị Minh Hiền (18520049)
  

## 1. Khởi tạo

### 1.1 Khai báo thư viện sử dụng

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as f


# Xử lý ảnh
import numpy as np
import cv2
import base64

from tensorflow.keras.models import load_model
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from timeit import default_timer as timer

### 1.2 Khai báo đường dẫn

In [2]:
ORIGIN_PATH = "D:/uit/IE212 - Bigdata/final-project"

INPUT = ORIGIN_PATH + "/streaming/input"
OUTPUT = ORIGIN_PATH + "/streaming/output"

OBJECT_DETECTOR = ORIGIN_PATH + "/model/object-detector"
FACE_MASK_DETECTOR = ORIGIN_PATH + "/model/face-mask-detector"

SERVING = ORIGIN_PATH + '/serving'

### 1.3 Khởi tạo spark

In [3]:
from pyspark.sql.types import StructType

spark = SparkSession \
    .builder \
    .appName("BIG DATA - Video Stream Analytics") \
    .getOrCreate()

### 1.4 Khai báo một số hàm dùng chung

Chuyển đổi định dạng file ảnh tương ứng:
- **byte**: lưu trữ spark
- **numpy array**: sử dụng cho các model
- **base64**: dữ liệu raw từ spark streaming

In [4]:
def convert_byte_to_nparr(img_byte):
    np_array = cv2.imdecode(np.asarray(bytearray(img_byte)), cv2.IMREAD_COLOR)
    return np_array

In [5]:
def convert_nparr_to_byte(img_np_array):
    success, img = cv2.imencode('.png', img_np_array)
    return img.tobytes()

In [6]:
def convert_base64_to_nparr(raw_base64):
    im_bytes = base64.b64decode(raw_base64)
    im_arr = np.frombuffer(im_bytes, dtype=np.uint8)
    return cv2.imdecode(im_arr, flags=cv2.IMREAD_COLOR)

Lưu hình ảnh

In [7]:
def save_image(path, file_name, image):
    cv2.imwrite(path + "/" + file_name, image)

Chuẩn hóa ***box(startX, startY, endX, endY)*** với hình ảnh có kích thước **(image_h, image_w)** tương với với hình ảnh có kích thước **(net_h, net_w)** (ảnh gốc).

In [8]:
def correct_boxes(startX, startY, endX, endY, image_h, image_w, net_h, net_w):
    startX = startX * net_h / image_h
    startY = startY * net_h / image_h
    endX = endX * net_w / image_w
    endY = endY * net_w / image_w
    return int(startX), int(startY), int(endX), int(endY)

Vẽ box với label tương ứng vào hình.

In [9]:
def draw_box(image, startX, startY, endX, endY, label):
    image = cv2.rectangle(image, (startX, startY), (endX, endY), (36,255,1))
    cv2.putText(image, label, (startX, startY-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36,255,12), 2)
    return image

## 2. Class Object Detector
Input:
- **weights_path**: đường dẫn chứa file yolov3.weights
- **configuration_path**: dường dẫn chứa file yolov3.cfg
- **labels**: tương ứng với mô hình yolov3

Mô hình sử dụng 2 thông số mặc định là **probability minimum = 0.5 và thresold = 0.3**.

Mô hình *YOLO - You Only Look Once (v3)* là một trong những mô hình phát hiện vật thể tốt nhất. Mô hình phát hiện được 80 vật thể: person, bicycle, car, motorbike, aeroplane, bus, train,... Chi tiết xem tại file: *./model/object-detector/coco.names*.

Mô hình YOLO sau khi lượt bỏ các đối tượng có độ chính xác thấp hơn **probability** (0.5), vẫn có khả năng sẽ phát hiện trùng lặp xung quanh một đối tượng. Vì thế, ta cần phải sử dụng **Non-Maximum Suppression (NMS)**, còn gọi là **Non-Maxima Suppression** để loại bỏ đi các đối tượng bị trùng lặp.


In [10]:
# load danh sách label của mô hình YOLO từ file coco.names
labels = open(OBJECT_DETECTOR + '/coco.names').read().strip().split('\n')

In [11]:
class Object_Detector():
    """Object Detector 
  
    """
    def __init__(self,
                 weights_path = 'yolov3.weights',
                 configuration_path = 'file yolov3.cfg',
                 labels = []):
        
        self.weights_path = weights_path
        self.configuration_path = configuration_path
        
        self.probability_minimum = 0.5
        self.threshold = 0.3
        
        self.labels = labels
        
        # Load mạng network của model YOLO
        network = cv2.dnn.readNetFromDarknet(configuration_path, weights_path)
        layers_names_all = network.getLayerNames()
        layers_names_output = [layers_names_all[i - 1] for i in network.getUnconnectedOutLayers()]
        self.network = network
        self.layers_names_output = layers_names_output
        
        # Non-Maximum Suppression (NMS) 
        self.NMSBoxes = cv2.dnn.NMSBoxes
    def solve(self, image):
        """solve
            input: image (numpy array)
            output: Danh sách các đối tượng phát hiện tương ứng 
                [(startX, startY, endX, endY, label, confidences, obj_byte)]
        """
        input_shape = image.shape
        h, w = input_shape[:2]
        
        # Chuẩn hóa lại hình ảnh sử dụng model YOLO
        blob = cv2.dnn.blobFromImage(image, 1 / 255.0, (416, 416), swapRB=True, crop=False)
    
        self.network.setInput(blob)
   
        output_from_network = self.network.forward(self.layers_names_output)
        
        bounding_boxes = []
        confidences = []
        class_numbers = []
    
        for result in output_from_network:
            for detection in result:
                # Lấy danh sách các scores tương ứng với kết quả và tìm ra class với score lớn nhất
                scores = detection[5:]
                class_current = np.argmax(scores)
                confidence_current = scores[class_current]
                
                # probability_minimum: 0.5
                if confidence_current > self.probability_minimum:
                    # Tính toán các giá trị: x_min, y_min, box_width, box_height, confidences, class_numbers
                    # sử dụng cho Non-Maximum Suppression
                    box_current = detection[0:4] * np.array([w, h, w, h])
                    x_center, y_center, box_width, box_height = box_current.astype('int')
                    x_min = int(x_center - (box_width / 2))
                    y_min = int(y_center - (box_height / 2))
                    bounding_boxes.append([x_min, y_min, int(box_width), int(box_height)])
                    confidences.append(float(confidence_current))
                    class_numbers.append(class_current)
                    
        # Kiểm tra nếu không tồn tại đối tượng trả về []
        if len(bounding_boxes) == 0:
            return []
        
        # Tiến hành chuẩn hóa lại Boxes bằng cách sử dụng Non-Maximum Suppression
        results = self.NMSBoxes(bounding_boxes, confidences, self.probability_minimum, self.threshold)
        
        obj_detection = []
        for i in results.flatten():
            # Đối với đối tượng phát hiện được ta tiến hành tính toán các giá trị:
            # startX, startY, endX, endY, label, confidence, obj_byte
            
            x_min, y_min = bounding_boxes[i][0], bounding_boxes[i][1]
            box_width, box_height = bounding_boxes[i][2], bounding_boxes[i][3]

            startX = x_min
            startY = y_min
            endX = x_min + box_width
            endY = y_min + box_height

            (startX, startY) = (max(0, startX), max(0, startY))
            (endX, endY) = (min(w - 1, endX), min(h - 1, endY))
            
            obj_image = image[startY:endY, startX:endX]
            obj_byte = convert_nparr_to_byte(obj_image)
            
            obj_detection.append((startX, startY, endX, endY, labels[int(class_numbers[i])], confidences[i], obj_byte))
        return obj_detection

## 3. Class Face Mask Detector
Input:
- **prototxtPath**: đường dẫn chứa file deploy.prototxt (Caffe model)
- **weightsPath**: dường dẫn chứa file res10_300x300_ssd_iter_140000.caffemodel (Caffe model)
- **mask_model_path**: Đường dẫn chứa file masknet_vgg19.h5

Đối với mô hình Caffe, nhóm sử dụng thông số mặc định: **probability minimum = 0.5**.

Sử dụng pretrain *model VGG19* của keras để train cho mô hình phát hiện khẩu trang có **accurency: 98.3%** đối với bộ dữ liệu  [Face Mask Detection ~12K Images Dataset](https://www.kaggle.com/ashishjangra27/face-mask-12k-images-dataset).

In [12]:
class Face_Mask_Detector():
    """Face Mask Detector
    """
    def __init__(self,
                 prototxtPath = "deploy.prototxt",
                 weightsPath = "res10_300x300_ssd_iter_140000.caffemodel",
                 mask_model_path = "masknet_vgg19.h5"):

        self.net = cv2.dnn.readNet(prototxtPath, weightsPath)
        self.vgg19_model = load_model(mask_model_path)
        self.probability_minimum = 0.5


    def face_mask_detection(self, face):
        """face_mask_detection: Kiểm tra xem khuôn mặt có đeo khẩu trang hay không
            input: face (numpy array)
            output: with_mask hoặc without_mask và confidence tương tứng. 
        """
        face = cv2.resize(face, (128, 128))
        face = face / 255.0

        face = np.expand_dims(face, axis=0)
        
        # model tính toán và trả về tỉ lệ tương ứng    
        (mask, withoutMask) = self.vgg19_model.predict(face)[0]
        
        label = 'with_mask'
        confidence = mask
        if mask < withoutMask:
            label = 'without_mask'
            confidence = withoutMask
            
        return label, confidence
    def solve(self, x_origin, y_origin, image):
        """solve
            input: image (numpy array)
            output: Danh sách các đối tượng phát hiện tương ứng 
                [(startX, startY, endX, endY, label, confidences, obj_byte)]
        """
        (net_h, net_w) = image.shape[:2]
        (h, w) = image.shape[:2]
        
        # Chuẩn hóa lại hình ảnh sử dụng model Caffe
        blob = cv2.dnn.blobFromImage(cv2.resize(image, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
        self.net.setInput(blob)
        detections = self.net.forward()

        extracted_face_list = []
        
        # Xử lý với từng khuôn mặt phát hiện
        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            # probability_minimum: 0.5
            if confidence > self.probability_minimum:
                # Tính toán các giá trị: startX, startY, endX, endY, label, confidence, obj_byte
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                (startX, startY) = (max(0, startX), max(0, startY))
                (endX, endY) = (min(w - 1, endX), min(h - 1, endY))
                
                # Tính toán lại các trị startX, startY, endX, endY tương ứng với kích thước ảnh gốc
                startX, startY, endX, endY = correct_boxes(startX, startY, endX, endY, h, w, net_h, net_w)
                
                # Kiểm tra xem khuôn mặt có đeo khẩu trang hay không
                face = image[startY:endY, startX:endX]
                label, confidence = self.face_mask_detection(face)
                
                obj_byte = convert_nparr_to_byte(face)
                extracted_face_list.append(
                    (startX + x_origin, startY + y_origin, endX + x_origin, endY + y_origin
                     , label, float(confidence), obj_byte))
                
        return extracted_face_list

## 4. Detection
Input:
- **raw_img_content**: hình ảnh ở định dạng base64

Output: 
- Danh sách các đối tượng phát hiện tương ứng

In [13]:
def detection(raw_img_content, id):
    """detection
        input: raw_img_content (base64)
        output: Danh sách các đối tượng phát hiện tương ứng 
            [(startX, startY, endX, endY, label, confidences, obj_byte)]
    """
#     start ...
    start = timer() 
    
    image_input = convert_base64_to_nparr(raw_img_content)
    
    # Khai báo đối tượng Object Detector
    obj_detector = Object_Detector(
        weights_path = OBJECT_DETECTOR + "/yolov3.weights",
        configuration_path = OBJECT_DETECTOR + "/yolov3.cfg",
        labels = labels,
    )
    
    # Khai báo đối tượng Face Mask Detector
    face_mask_detector = Face_Mask_Detector(
        prototxtPath = FACE_MASK_DETECTOR + "/deploy.prototxt",
        weightsPath = FACE_MASK_DETECTOR + "/res10_300x300_ssd_iter_140000.caffemodel",
        mask_model_path = FACE_MASK_DETECTOR + "/masknet_vgg19.h5"
    )
    
    # Model YOLOv3
    extracted_object_list = obj_detector.solve(image_input)
    
    result = []
    extracted_face_mask_list = []
    for x in extracted_object_list:
        result.append(x)
        
        # x = (startX, startY, endX, endY, label, confidences, obj_byte)
        # Kiểm tra đối tượng detect được có phải là person hay không.
        if x[4] == 'person':
            person = convert_byte_to_nparr(x[6])
            
            # Model Face Mask
            face_mask_list = face_mask_detector.solve(x[0], x[1], person)
            for face_mask in face_mask_list:
                extracted_face_mask_list.append(face_mask)

    for x in extracted_face_mask_list:
        result.append(x)
        
    # Lưu hình ảnh để hiện thị cho client
    save_image(SERVING, 'input.png', image_input)

    img_draw_box = image_input
    for x in result:
        startX, startY, endX, endY, label, confidence, obj_byte = x
        
        label = "%s (%.3f)" % (label, confidence)
        
        img_draw_box = draw_box(img_draw_box, startX, startY, endX, endY, label)
    
    # Lưu hình ảnh tương ứng với mỗi input
    save_image(OUTPUT, 'images_{:05n}.png'.format(id), img_draw_box)
    
    # Lưu hình ảnh để hiện thị cho client
    save_image(SERVING, 'output.png', img_draw_box)
#     end ...
    end = timer()
    delta = end - start
    print(('\nDone ' + str(id) + ' after ' + str(delta) + ' seconds.\n'))

    return result

### 4.1 Định nghĩa schema

In [14]:
object_extraction_schema = ArrayType(StructType([
    StructField("startX", IntegerType(), False),
    StructField("startY", IntegerType(), False),
    StructField("endX", IntegerType(), False),
    StructField("endY", IntegerType(), False),
    StructField("label", StringType(), False),
    StructField("confidence", FloatType(), False),
    StructField("img_content", BinaryType(), False),
]))

Object_Extraction_UDF = f.udf(lambda Image, id: detection(Image, id),
                            object_extraction_schema)

## 5. Tiến hành Streaming dữ liệu

### 5.1 Streaming dữ liệu
Dữ liệu sẽ được đọc stream từ file csv ở thư mục **./streaming/input** với schema tương ứng

In [15]:
sch = 'image STRING, timestamp STRING, src STRING, id INTEGER'
df = spark.readStream.schema(sch).csv(INPUT).withColumn('timestamp', f.to_timestamp("timestamp"))

# df.select('timestamp', 'src', 'id').show(truncate=False)

### Xử lý hình ảnh

Tiến hành quá trình detect object thông qua hàm **Object_Extraction_UDF**.

Dữ liệu sẽ được thêm vào cột **detected_object_list** có cấu trúc: **object_extraction_schema**

In [16]:
detected_object_list_df = df.withColumn("detected_object_list",
    Object_Extraction_UDF("image", "id"))

detected_object_list_df.printSchema()
# detected_object_list_df.show()

root
 |-- image: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- src: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- detected_object_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- startX: integer (nullable = false)
 |    |    |-- startY: integer (nullable = false)
 |    |    |-- endX: integer (nullable = false)
 |    |    |-- endY: integer (nullable = false)
 |    |    |-- label: string (nullable = false)
 |    |    |-- confidence: float (nullable = false)
 |    |    |-- img_content: binary (nullable = false)



In [17]:
detected_object_df = detected_object_list_df.withColumn("extracted_object",
  f.explode(f.col("detected_object_list"))).drop("detected_object_list")

detected_object_df.printSchema()
# detected_object_df.show()

root
 |-- image: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- src: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- extracted_object: struct (nullable = true)
 |    |-- startX: integer (nullable = false)
 |    |-- startY: integer (nullable = false)
 |    |-- endX: integer (nullable = false)
 |    |-- endY: integer (nullable = false)
 |    |-- label: string (nullable = false)
 |    |-- confidence: float (nullable = false)
 |    |-- img_content: binary (nullable = false)



In [18]:
flat_detected_object_df = detected_object_df.select(f.col("image"), f.col("timestamp"), f.col("src"), f.col("id"),
      f.col("extracted_object.startX").alias("startX"),
    f.col("extracted_object.startY").alias("startY"),
    f.col("extracted_object.endX").alias("endX"),
    f.col("extracted_object.endY").alias("endY"),
    f.col("extracted_object.label").alias("label"),
    f.col("extracted_object.confidence").alias("confidence"),
    f.col("extracted_object.img_content").alias("img_content"))

flat_detected_object_df.printSchema()
# flat_detected_object_df.show()

root
 |-- image: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- src: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- startX: integer (nullable = true)
 |-- startY: integer (nullable = true)
 |-- endX: integer (nullable = true)
 |-- endY: integer (nullable = true)
 |-- label: string (nullable = true)
 |-- confidence: float (nullable = true)
 |-- img_content: binary (nullable = true)



### Tổng kết lại quá trình xử lý

In [19]:
summary = flat_detected_object_df.groupBy('id').agg(f.collect_set('label').alias('detection')) \
    .withColumn('count', f.size('detection')) \
    .sort("id", ascending=False)
summary.printSchema()
# summary.show()

root
 |-- id: integer (nullable = true)
 |-- detection: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- count: integer (nullable = false)



In [20]:
summary.writeStream.format('console') \
    .outputMode('complete') \
    .option("truncate", "false") \
    .start()


<pyspark.sql.streaming.StreamingQuery at 0x1f5d107b850>