# 匯入Tensorflow

In [1]:
import tensorflow
# Expose a TF1-style API under the name `tf` (and a `gfile` module) regardless
# of which major TensorFlow version is installed.
if tensorflow.__version__.startswith('1.'):
    import tensorflow as tf
    from tensorflow.python.platform import gfile
else:
    import tensorflow as v2
    import tensorflow.compat.v1 as tf
    # Switch off TF2 behaviors so the Session/graph based code in this notebook runs.
    tf.disable_v2_behavior()
    import tensorflow.compat.v1.gfile as gfile
print("Tensorflow version:{}".format(tf.__version__))

Instructions for updating:
non-resource variables are not supported in the long term
Tensorflow version:2.6.0


# 匯入其他套件

In [2]:
import cv2,time,PIL,io,html,os
import numpy as np

# COLAB執行串流所需套件

In [3]:
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from base64 import b64decode, b64encode

# 影像串流初始化(Colab)

In [4]:
#從Javascript 擷取出圖片
def js_to_image(js_reply):
  """Turn a base64 data-URL string coming from JavaScript into an image array.

  :param js_reply: string of the form '<header>,<base64 payload>'; only the
                   part after the first comma is decoded.
  :return: the result of cv2.imdecode in colour mode (OpenCV uses BGR order).
  """
  # The payload is the second comma-separated field of the data URL.
  encoded_payload = js_reply.split(',')[1]
  # base64 text -> raw bytes -> flat uint8 buffer that OpenCV can decode.
  raw_buffer = np.frombuffer(b64decode(encoded_payload), dtype=np.uint8)
  return cv2.imdecode(raw_buffer, flags=cv2.IMREAD_COLOR)
def bbox_to_bytes(bbox_array):
  """Encode an RGBA overlay array as a base64 PNG data URL.

  :param bbox_array: array passed to PIL.Image.fromarray with mode 'RGBA'.
  :return: string 'data:image/png;base64,<payload>'.
  """
  # `import PIL` alone does not load the Image submodule; import it explicitly
  # so this function does not depend on another library having done so.
  import PIL.Image

  overlay = PIL.Image.fromarray(bbox_array, 'RGBA')
  # The context manager releases the in-memory buffer once encoding is done.
  with io.BytesIO() as png_buffer:
    overlay.save(png_buffer, format='png')
    encoded = b64encode(png_buffer.getvalue()).decode('utf-8')

  return 'data:image/png;base64,{}'.format(encoded)

In [5]:
def video_stream():
  """Inject the browser-side webcam helpers into the notebook output.

  Displays a Javascript object that defines `removeDom`, `onAnimationFrame`,
  `createDom` and `stream_frame`. `stream_frame(label, imgData)` builds the
  video/overlay DOM on first use, updates the status label and overlay image,
  then resolves with a dict holding timing values and the captured frame as a
  JPEG data URL under 'img'. It returns '' when a stop click was registered
  before the call; a click registered while a capture is pending yields a dict
  whose 'img' is an empty string.
  """
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;
    
    var pendingResolve = null;
    var shutdown = false;
    
    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }
    
    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }
    
    async function createDom() {
      if (div !== null) {
        return stream;
      }

      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);
      
      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>Status:</span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);
           
      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "environment"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);
      
      const instruction = document.createElement('div');
      instruction.innerHTML = 
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };
      
      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 640; //video.videoWidth;
      captureCanvas.height = 480; //video.videoHeight;
      window.requestAnimationFrame(onAnimationFrame);
      
      return stream;
    }
    async function stream_frame(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }

      var preCreate = Date.now();
      stream = await createDom();
      
      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }
            
      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }
      
      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;
      
      return {'create': preShow - preCreate, 
              'show': preCapture - preShow, 
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')

  # Rendering the object runs the script in the notebook's output frame.
  display(js)
  
def video_frame(label, bbox):
  """Call the browser-side `stream_frame(label, bbox)` and return its result.

  :param label: status text shown next to the video.
  :param bbox: overlay image as a data URL, or '' for no overlay.
  :return: whatever eval_js returns for the JavaScript call.
  """
  # Local import so the block works without touching the notebook's import cell.
  import json

  # json.dumps yields a quoted, escaped literal that is also valid JavaScript,
  # so quotes, backslashes or newlines in the arguments cannot break out of
  # the string. str() keeps the old "always passed as a string" behaviour.
  js_call = 'stream_frame({}, {})'.format(json.dumps(str(label)),
                                          json.dumps(str(bbox)))
  return eval_js(js_call)

# 恢復PB檔案函數

In [6]:
def model_restore_from_pb(pb_path,node_dict,GPU_ratio=None):
  """Load a serialized GraphDef (.pb) into a new session and look up tensors.

  :param pb_path: path of the .pb file to read.
  :param node_dict: mapping of arbitrary key -> tensor name (e.g. 'input:0').
  :param GPU_ratio: if None, GPU memory is allowed to grow on demand;
                    otherwise used as per_process_gpu_memory_fraction.
  :return: (session, dict of key -> tensor). Keys whose tensor cannot be
           resolved are reported with print() and left out of the dict.
  """
  tf_node_dict = dict()
  with tf.Graph().as_default():
    config = tf.ConfigProto(log_device_placement=True,
                            allow_soft_placement=True,
                            )
    if GPU_ratio is None:
      config.gpu_options.allow_growth = True
    else:
      config.gpu_options.per_process_gpu_memory_fraction = GPU_ratio

    # The session binds to the graph made default by the enclosing `with`.
    sess_pb = tf.Session(config=config)
    try:
      # GFile replaces the deprecated FastGFile.
      with gfile.GFile(pb_path, 'rb') as f:
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(f.read())

      # Import the computation graph into the current default graph.
      tf.import_graph_def(graph_def, name='')

      sess_pb.run(tf.global_variables_initializer())
    except Exception:
      # Do not leak the session when the file is missing or unparsable.
      sess_pb.close()
      raise

    for key,value in node_dict.items():
      try:
        tf_node_dict[key] = sess_pb.graph.get_tensor_by_name(value)
      except (KeyError, ValueError, TypeError):
        # Best-effort lookup: report and continue with the remaining names.
        print("節點名稱:{}不存在".format(key))
    return sess_pb,tf_node_dict

# 人臉偵測相關函數

In [7]:
def generate_anchors(feature_map_sizes, anchor_sizes, anchor_ratios, offset=0.5):
  '''
  Generate anchor boxes for every cell of every feature map.

  Per feature map, one anchor is produced for each scale combined with the
  first aspect ratio, plus one for the first scale combined with each
  remaining aspect ratio.

  :param feature_map_sizes: list of list, for example: [[40,40], [20,20]]
  :param anchor_sizes: list of list, for example: [[0.05, 0.075], [0.1, 0.15]]
  :param anchor_ratios: list of list, for example: [[1, 0.5], [1, 0.5]]
  :param offset: position of the anchor centre inside a cell, in cell units;
                 default to 0.5 (cell centre)
  :return: numpy array of shape [total_anchors, 4]; each row is
           [xmin, ymin, xmax, ymax] relative to the grid extent
  '''
  anchor_bboxes = list()
  for idx, feature_size in enumerate(feature_map_sizes):
    # Normalised centre coordinates; `offset` was previously ignored (fixed 0.5).
    cx = (np.arange(feature_size[0], dtype=float) + offset) / feature_size[0]
    cy = (np.arange(feature_size[1], dtype=float) + offset) / feature_size[1]
    cx_grid, cy_grid = np.meshgrid(cx, cy)
    center = np.stack((cx_grid, cy_grid), axis=-1)

    sizes = anchor_sizes[idx]
    ratios = anchor_ratios[idx]
    num_anchors = len(sizes) + len(ratios) - 1
    # Repeat (cx, cy) twice per anchor so it lines up with (xmin, ymin, xmax, ymax).
    center_tiled = np.tile(center, (1, 1, 2 * num_anchors))

    # Every scale with the first ratio, then the first scale with the other ratios.
    scale_ratio_pairs = [(scale, ratios[0]) for scale in sizes]
    scale_ratio_pairs += [(sizes[0], ratio) for ratio in ratios[1:]]

    anchor_width_heights = []
    for scale, ratio in scale_ratio_pairs:
      width = scale * np.sqrt(ratio)
      height = scale / np.sqrt(ratio)
      anchor_width_heights.extend([-width / 2.0, -height / 2.0, width / 2.0, height / 2.0])

    bbox_coords = center_tiled + np.array(anchor_width_heights)
    anchor_bboxes.append(bbox_coords.reshape((-1, 4)))
  return np.concatenate(anchor_bboxes, axis=0)

def decode_bbox(anchors, raw_outputs, variances=[0.1, 0.1, 0.2, 0.2]):
    '''
    Turn raw regression outputs into boxes, using the anchors as reference.
    Anchor values are ordered [xmin, ymin, xmax, ymax].
    :param anchors: numpy array with shape [batch, num_anchors, 4]
    :param raw_outputs: numpy array with the same shape with anchors
    :param variances: list of float multiplied element-wise onto raw_outputs,
                      default=[0.1, 0.1, 0.2, 0.2]
    :return: numpy array of [xmin, ymin, xmax, ymax] concatenated on the last axis
    '''
    # Keep the last axis (width-1 slices) so everything broadcasts per anchor.
    left, top = anchors[:, :, 0:1], anchors[:, :, 1:2]
    right, bottom = anchors[:, :, 2:3], anchors[:, :, 3:]

    anchor_cx = (left + right) / 2
    anchor_cy = (top + bottom) / 2
    anchor_w = right - left
    anchor_h = bottom - top

    scaled = raw_outputs * np.array(variances)

    # Centre offsets are relative to the anchor size; sizes are log-encoded.
    center_x = scaled[:, :, 0:1] * anchor_w + anchor_cx
    center_y = scaled[:, :, 1:2] * anchor_h + anchor_cy
    box_w = np.exp(scaled[:, :, 2:3]) * anchor_w
    box_h = np.exp(scaled[:, :, 3:]) * anchor_h

    corners = [center_x - box_w / 2,
               center_y - box_h / 2,
               center_x + box_w / 2,
               center_y + box_h / 2]
    return np.concatenate(corners, axis=-1)

def single_class_non_max_suppression(bboxes, confidences, conf_thresh=0.2, iou_thresh=0.5, keep_top_k=-1):
    '''
    Greedy non-maximum suppression for a single class.
    Boxes at or below conf_thresh are dropped first. The remaining box with the
    highest confidence is then picked repeatedly, and every other remaining
    box whose IoU with it exceeds iou_thresh is discarded, until no candidates
    are left (or keep_top_k boxes have been picked).
    :param bboxes: numpy array of 2D, [num_bboxes, 4] as [xmin, ymin, xmax, ymax]
    :param confidences: numpy array of 1D. [num_bboxes]
    :param conf_thresh: boxes need a confidence strictly above this value
    :param iou_thresh: boxes overlapping a picked box by more than this are dropped
    :param keep_top_k: maximum number of boxes to pick; -1 disables the limit
    :return: indices into the original bboxes of the picked boxes
             (an empty list when bboxes is empty)
    '''
    if len(bboxes) == 0:
        return []

    conf_keep_idx = np.where(confidences > conf_thresh)[0]
    kept_boxes = bboxes[conf_keep_idx]
    kept_scores = confidences[conf_keep_idx]

    x1, y1 = kept_boxes[:, 0], kept_boxes[:, 1]
    x2, y2 = kept_boxes[:, 2], kept_boxes[:, 3]
    # The small epsilon keeps the area positive for zero-sized boxes.
    areas = (x2 - x1 + 1e-3) * (y2 - y1 + 1e-3)

    # Ascending by score: the best remaining candidate is always the last entry.
    order = np.argsort(kept_scores)
    selected = []

    while len(order) > 0:
        best, others = order[-1], order[:-1]
        selected.append(best)

        # keep top k
        if keep_top_k != -1 and len(selected) >= keep_top_k:
            break

        inter_w = np.maximum(0, np.minimum(x2[best], x2[others]) - np.maximum(x1[best], x1[others]))
        inter_h = np.maximum(0, np.minimum(y2[best], y2[others]) - np.maximum(y1[best], y1[others]))
        inter_area = inter_w * inter_h
        iou = inter_area / (areas[others] + areas[best] - inter_area)

        # Drop the picked box and everything overlapping it too much.
        order = others[~(iou > iou_thresh)]

    return conf_keep_idx[selected]

# 人臉偵測類別

In [8]:
class face_detection():
  """Face detector backed by a frozen TensorFlow graph.

  The constructor restores the graph, builds the anchor boxes and stores the
  tensors needed for inference; infer() returns pixel-space face boxes.
  """
  def __init__(self,face_pb_path,margin = 40):
    """
    :param face_pb_path: path of the face detection .pb file.
    :param margin: total number of pixels added around each detected box
                   (half on each side) before clipping to the image.
    """
    node_dict = {'input':'data_1:0',
            'detection_bboxes':'loc_branch_concat_1/concat:0',
            'detection_scores':'cls_branch_concat_1/concat:0'}
    conf_thresh = 0.5
    iou_thresh = 0.6
    #----anchors config
    feature_map_sizes = [[33, 33], [17, 17], [9, 9], [5, 5], [3, 3]]
    anchor_sizes = [[0.04, 0.056], [0.08, 0.11], [0.16, 0.22], [0.32, 0.45], [0.64, 0.72]]
    anchor_ratios = [[1, 0.62, 0.42]] * 5

    #----generate anchors
    anchors = generate_anchors(feature_map_sizes, anchor_sizes, anchor_ratios)
    # for inference , the batch size is 1, the model output shape is [1, N, 4],
    # so we expand dim for anchors to [1, anchor_num, 4]
    anchors_exp = np.expand_dims(anchors, axis=0)
    sess,face_node_dict = model_restore_from_pb(face_pb_path, node_dict)
    tf_input = face_node_dict['input']
    # as_list() works with both v1 and v2 TensorShape behaviour (no `.value`).
    model_shape = tf_input.shape.as_list()
    model_shape[0] = None

    self.margin = margin
    self.conf_thresh = conf_thresh
    self.iou_thresh = iou_thresh
    self.anchors_exp = anchors_exp
    self.model_shape = model_shape
    self.tf_input = tf_input
    self.sess = sess
    # Use the tensors resolved from the restored graph rather than raw name
    # strings, so a missing node fails here instead of at the first infer().
    self.detection_bboxes = face_node_dict['detection_bboxes']
    self.detection_scores = face_node_dict['detection_scores']
  def infer(self,img):
    """Detect faces in an image.

    :param img: image array of shape [height, width, channels]; it is
                converted with COLOR_BGR2RGB before being fed to the model.
    :return: list of (xmin, ymin, xmax, ymax) integer pixel boxes, enlarged by
             the margin and clipped to the image; boxes that end up empty
             after clipping are omitted.
    """
    coors = list()
    height,width,_ = img.shape
    #----image processing
    img_resized = cv2.resize(img, (self.model_shape[2], self.model_shape[1]))
    img_resized = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
    img_resized = img_resized.astype('float32')
    img_resized /= 255

    #----run the detector
    feed_dict={self.tf_input: np.expand_dims(img_resized, axis=0)}
    y_bboxes_output, y_cls_output = self.sess.run([self.detection_bboxes, self.detection_scores],
                            feed_dict=feed_dict)
    y_bboxes = decode_bbox(self.anchors_exp, y_bboxes_output)[0]
    y_cls = y_cls_output[0]
    # To speed up, do single class NMS, not multiple classes NMS.
    bbox_max_scores = np.max(y_cls, axis=1)

    # keep_idxs holds the boxes that survive NMS.
    keep_idxs = single_class_non_max_suppression(y_bboxes,
                        bbox_max_scores,
                        conf_thresh=self.conf_thresh,
                        iou_thresh=self.iou_thresh,
                        )
    half_margin = self.margin // 2
    for idx in keep_idxs:
        bbox = y_bboxes[idx]
        # clip the coordinate, avoid the value exceed the image boundary.
        xmin = max(0, int(bbox[0] * width - half_margin))
        ymin = max(0, int(bbox[1] * height - half_margin))
        xmax = min(int(bbox[2] * width + half_margin), width)
        ymax = min(int(bbox[3] * height + half_margin), height)
        # A box lying outside the image collapses after clipping; cropping it
        # would give an empty array, so it is not reported.
        if xmax <= xmin or ymax <= ymin:
            continue
        coors.append((xmin,ymin,xmax,ymax))

    return coors

# 影像串流與口罩判斷的函數

In [9]:
def mask_or_not(face_pb_path,mask_pb_path,margin=40):
  """Run the webcam demo: detect faces and classify each as mask / no mask.

  Frames are pulled from the browser through video_frame(); for every frame a
  transparent RGBA overlay with boxes, class names and an FPS readout is built
  and sent back with the next request. The loop ends when the browser reports
  that the user stopped the stream. Both TensorFlow sessions are closed on exit.

  :param face_pb_path: path of the face detection .pb file.
  :param mask_pb_path: path of the mask classification .pb file.
  :param margin: margin in pixels handed to face_detection.
  """
  #----var
  frame_count = 0
  bbox = ''
  FPS = "Initialing"
  label_html = 'Capturing...'
  nodename_dict = {
            'input': 'input:0',
            'keep_prob': 'keep_prob:0',
            'prediction': 'prediction:0'
           }
  label2classname_dict = {0:'no_mask',1:"with_mask"}

  #----face detector initialisation
  find_face = face_detection(face_pb_path,margin=margin)
  sess_infer = None

  try:
    #----mask classifier initialisation
    sess_infer,tf_node_dict = model_restore_from_pb(mask_pb_path,nodename_dict,GPU_ratio=None)

    #----tensors used for inference
    pb_prediction = tf_node_dict['prediction']
    pb_input = tf_node_dict['input']
    pb_keep_prob = tf_node_dict['keep_prob']

    #----Colab video stream initialisation
    video_stream()

    while True:
      js_reply = video_frame(label_html, bbox)
      # '' means the stream was stopped before this call; an empty 'img' means
      # it was stopped while the capture was pending. Either way there is no
      # frame to decode.
      if not js_reply or not js_reply["img"]:
        break

      #----convert the JavaScript reply into an image
      img = js_to_image(js_reply["img"])
      if img is None:
        # Undecodable frame: skip it and ask for the next one.
        continue
      #----transparent overlay, same size as the captured frame
      frame_height, frame_width = img.shape[:2]
      bbox_array = np.zeros([frame_height,frame_width,4], dtype=np.uint8)

      #----face detection
      coors = find_face.infer(img)

      for xmin,ymin,xmax,ymax in coors:
        #----crop the face region
        img_face = img[ymin:ymax,xmin:xmax,:].copy()
        if img_face.size == 0:
          # cv2.resize rejects empty input.
          continue
        #----resize to 80 x 80
        img_face = cv2.resize(img_face,(80,80))
        #----add the batch dimension (3-D -> 4-D)
        img_face = np.expand_dims(img_face,axis=0)
        #----uint8 -> float32, then scale to [0, 1]
        img_face = img_face.astype(np.float32)
        img_face /= 255

        #----mask classification
        predictions = sess_infer.run(pb_prediction,
                      feed_dict={pb_input:img_face, pb_keep_prob:1})
        #----label index -> class name
        arg_predictions = np.argmax(predictions,axis=1)
        classname = label2classname_dict[arg_predictions[0]]
        #----box colour depends on the class
        if classname == 'with_mask':  # mask on: green box
            color = (0, 255, 0)  # (R,G,B)
        else:  # no mask: red box
            color = (255, 0, 0)  # (R,G,B)
        #----draw the box
        cv2.rectangle(bbox_array, (xmin, ymin), (xmax, ymax), color, 2)
        #----write the class name
        cv2.putText(bbox_array, classname, (xmin + 2, ymin - 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, color)

      #----FPS, averaged over 4 frames
      if frame_count == 0:
          t_start = time.time()
      frame_count += 1
      if frame_count >= 4:
          t_stop = time.time()
          FPS = "FPS={}".format(round(4 / (t_stop - t_start)))
          frame_count = 0

      cv2.putText(bbox_array, FPS, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 3)

      #----make every drawn pixel opaque and leave the rest transparent
      bbox_array[:,:,3] = (bbox_array.max(axis = 2) > 0 ).astype(np.uint8) * 255
      bbox = bbox_to_bytes(bbox_array)
  finally:
    # Release the TensorFlow sessions however the loop ends.
    if sess_infer is not None:
      sess_infer.close()
    find_face.sess.close()

# 函數的使用

In [10]:
# Mount Google Drive so the .pb model files can be read from it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
# Folder holding the model files. The path is relative, so it only resolves
# when the working directory is the parent of the 'drive' mount point
# (the mount above is at /content/drive).
mydrive_path = os.path.join('drive','MyDrive','Python','Code','Jupyter','Book','CH27')
face_pb_path = os.path.join(mydrive_path,"face_detection.pb")
mask_pb_path = os.path.join(mydrive_path,"infer_acc_0.99.pb")
# Passed through mask_or_not to face_detection as its margin argument.
margin = 40
# Runs until the stream is stopped from the browser.
mask_or_not(face_pb_path,mask_pb_path,margin=margin)

Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7

Instructions for updating:
Use tf.gfile.GFile.
Device mapping:
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7



<IPython.core.display.Javascript object>