<a href="https://colab.research.google.com/github/Yuki3153/diveintocode-ml/blob/master/CloudVisionAPI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install --upgrade google-cloud-vision

In [3]:
from google.cloud import vision
from google.cloud import storage
from google.colab import drive, auth
from google.oauth2 import service_account
from oauth2client.client import GoogleCredentials
from google.protobuf.json_format import MessageToJson
drive.mount('/content/drive')
auth.authenticate_user()
import json
import numpy as np
import pandas as pd
from pathlib import Path
import io
import os
import shutil
import itertools
from PIL import Image
import time
from tqdm import tqdm
import pickle
import cv2
%cd "drive/MyDrive/"


In [None]:
# credentials
# Build the Cloud Storage and Cloud Vision clients from a service-account key.

key_path = os.path.join(os.path.dirname(os.path.abspath(r'credentials/xxx.json')),
                        'xxx.json')
# Read the key inside a context manager so the file handle is closed promptly
# instead of being left to the garbage collector.
with open(key_path, encoding="utf-8") as key_file:
    service_account_info = json.load(key_file)
credentials = service_account.Credentials.from_service_account_info(service_account_info)

# The storage client takes its project id from the service-account credentials.
storage_client = storage.Client(
    credentials=credentials,
    project=credentials.project_id,
)

vision_client = vision.ImageAnnotatorClient(
    credentials=credentials
)

In [None]:
# function

def pil2cv(image):
    """Convert a Pillow image (or array-like) to a uint8 array in OpenCV channel order.

    Two-dimensional input is returned as-is after the uint8 conversion.
    Three-channel input is converted RGB -> BGR and four-channel input
    RGBA -> BGRA. Any other channel count is returned unconverted.
    """
    pixels = np.array(image, dtype=np.uint8)
    if pixels.ndim == 2:
        # Single-channel data has no channel order to swap.
        return pixels

    channel_count = pixels.shape[2]
    if channel_count == 3:
        return cv2.cvtColor(pixels, cv2.COLOR_RGB2BGR)
    if channel_count == 4:
        return cv2.cvtColor(pixels, cv2.COLOR_RGBA2BGRA)
    return pixels


def detect_faces(target):
    """Run Cloud Vision face detection on a blob and record per-image features.

    Side effects: appends one entry per image to the module-level
    ``sub_output`` lists for ``image_size``, ``center_loc`` and every column
    in ``sub_cols[3:-4]`` that this function is responsible for (the colour
    columns in that slice are only filled here when no face is found).

    Args:
        target: storage blob exposing ``download_as_string()``.

    Returns:
        A tuple ``(vision_image, image, face_image, blurred_, laplacian, box)``:
        the Vision request image, the decoded Pillow image, the crop of the
        first detected face (``None`` when no face was found), the blur
        likelihoods of at most the first two faces (``[s]`` when no face was
        found), the variance of the Laplacian of the 300x300 resized image,
        and the first face's bounding-box vertices (``None`` without a face).
    """
    # Download once and reuse the bytes for both the Vision request and Pillow.
    content = target.download_as_string()
    vision_image = vision.Image(content=content)
    response = vision_client.face_detection(image=vision_image)
    faces = response.face_annotations

    image = Image.open(io.BytesIO(content))
    cv2_image = cv2.resize(pil2cv(image), (300, 300))
    laplacian = cv2.Laplacian(cv2_image, cv2.CV_64F).var()

    width, height = image.size
    sub_output['image_size'].append(image.size)
    sub_output['center_loc'].append((int(width / 2), int(height / 2)))
    image_size = width * height

    # Plain locals instead of globals: the no-face path must not return
    # whatever a previous image happened to leave behind at module level.
    face_image = None
    box = None
    blurred_ = []

    if not faces:
        # No face: pad every face-derived column so rows stay aligned.
        blurred_.append(s)
        for col in sub_cols[3:-4]:
            sub_output[col].append(s)
        return vision_image, image, face_image, blurred_, laplacian, box

    for index, face in enumerate(faces):
        blurred_.append(face.blurred_likelihood)
        if index > 0:
            # Only the first face is measured; the second one is recorded in
            # blurred_ solely so the caller can tell that more than one face
            # was found.
            break

        box = [(vertex.x, vertex.y) for vertex in face.bounding_poly.vertices]
        sub_output['roll_angle'].append(face.roll_angle)
        sub_output['pan_angle'].append(face.pan_angle)
        sub_output['tilt_angle'].append(face.tilt_angle)

        # NOTE(review): landmarks are addressed by list position; indices 30
        # and 29 are presumably the chin and forehead landmarks -- confirm
        # against the Vision API landmark ordering.
        chin = face.landmarks[30].position
        forehead = face.landmarks[29].position
        sub_output['chin_loc'].append((chin.x, chin.y))
        sub_output['forehead_loc'].append((forehead.x, forehead.y))
        sub_output['chin_x_ratio'].append(chin.x / width)
        sub_output['chin_y_ratio'].append(chin.y / height)
        sub_output['forehead_x_ratio'].append(forehead.x / width)
        sub_output['forehead_y_ratio'].append(forehead.y / height)

        face_image = image.crop((box[0][0], box[0][1], box[2][0], box[2][1]))
        # NOTE(review): this is half the box width/height, not the box centre
        # in image coordinates (the box origin is not added). Left unchanged
        # because downstream thresholds and the model input are presumably
        # tuned on this value -- confirm before correcting.
        face_center = ((box[1][0] - box[0][0]) / 2, (box[2][1] - box[1][1]) / 2)
        sub_output['face_center_loc'].append(face_center)
        sub_output['face_image_size'].append(face_image.size)
        face_size = face_image.size[0] * face_image.size[1]
        sub_output['area_ratio'].append(face_size / image_size)

        center = sub_output['center_loc'][-1]
        dist = np.sqrt((face_center[0] - center[0])**2 +
                       (face_center[1] - center[1])**2)
        # The distance is normalised by the image width.
        sub_output['dist_ratio'].append(dist / width)

    return vision_image, image, face_image, blurred_, laplacian, box


def detect_labels(image):
    """Return the description of every label Cloud Vision reports for ``image``.

    ``image`` is passed straight to ``vision_client.label_detection``.
    """
    annotations = vision_client.label_detection(image=image).label_annotations
    return [annotation.description for annotation in annotations]


def detect_properties(face_image):
    """Summarise the colour of a face crop and record the per-channel statistics.

    With ``face_image`` set to ``None`` nothing is recorded and ``(0, 0)`` is
    returned. Otherwise the crop is converted to RGB, the median and mean of
    each channel are appended to ``sub_output``, and ``main_output['dark_white']``
    receives ``'dark_white'`` when the three channel medians are equal
    (``s`` otherwise).

    Returns:
        ``(rgb_median, rgb_mean)``: the sums of the three channel medians and
        of the three channel means.
    """
    if face_image is None:
        return 0, 0

    # One row per pixel, one column per RGB channel.
    pixels = np.asarray(face_image.convert("RGB")).reshape(-1, 3)
    red, green, blue = pixels[:, 0], pixels[:, 1], pixels[:, 2]

    red_median, green_median, blue_median = np.median(red), np.median(green), np.median(blue)
    red_mean, green_mean, blue_mean = np.mean(red), np.mean(green), np.mean(blue)

    recorded = (
        ('red_median', red_median),
        ('green_median', green_median),
        ('blue_median', blue_median),
        ('red_mean', red_mean),
        ('green_mean', green_mean),
        ('blue_mean', blue_mean),
    )
    for column, value in recorded:
        sub_output[column].append(value)

    # Identical channel medians are flagged as 'dark_white'.
    flag = 'dark_white' if red_median == green_median == blue_median else s
    main_output['dark_white'].append(flag)

    rgb_median = red_median + green_median + blue_median
    rgb_mean = red_mean + green_mean + blue_mean
    return rgb_median, rgb_mean


In [None]:
# cloud storage
# Collect the name of every object stored under the 'xxx/' prefix.
bucket_name = 'xxx'
bucket = storage_client.get_bucket(bucket_name)
buckets = list(storage_client.list_buckets())
blobs = bucket.list_blobs(prefix = 'xxx/')
lst = [blob.name for blob in tqdm(blobs)]

# threshold
# NOTE(review): `xx` is not defined anywhere in this notebook as shown; these
# look like redacted placeholders that must be replaced with real numbers
# before the cell can run.
# face area / image area * 100 (truncated to int); at or above this -> 'frame out'
th_face_ratio_max = xx
# face area / image area * 100 (truncated to int); below this -> 'position error'
th_face_ratio_min = xx
# dist_ratio * 100 (rounded); at or above this -> 'position error'
th_dist = xx
# sum of the three channel medians of the face crop; at or below this -> 'dark_picture'
th_dark = xx
# absolute pan / roll / tilt angle (truncated to int); at or above this -> face_direction error
th_angle = xx
# variance of the Laplacian of the resized image; at or below this -> 'pint error'
th_laplacian = xx

# non_val or safe
# Placeholder written to a column when there is no value or nothing to report.
s = "_"

# detect labels
# Label descriptions looked up in the label-detection result of each image.
mask_labels = ["Medical equipment","Medical","Mask"]
hat_labels = ["Hat","Headgear"]
gesture_labels = ["Gesture","Finger"]
illust_labels = ["Cartoon","Illustration"]
picture_labels = ["Photograph"]

# output_path
# Prefix for the saved error images and the two Excel files.
# NOTE(review): Drive is mounted at '/content/drive' and the notebook then
# changes into "drive/MyDrive/"; confirm that this relative path exists.
drive_root_dir="./gdrive/My Drive/"

# main
# Column names of the per-image verdict table.
main_cols = ["image_name","detected_num","position","face_direction",
             "blurred","brightness","frameout","dark_white","illustration","equipment",
             "picture","error_judge"]

# Column names of the per-image raw-measurement table.
sub_cols = ["image_name","image_size","center_loc","face_image_size","face_center_loc",
            "dist_ratio","chin_loc","forehead_loc","chin_x_ratio","chin_y_ratio",
            "forehead_x_ratio","forehead_y_ratio","roll_angle","pan_angle","tilt_angle",
            "area_ratio","red_median","green_median","blue_median","red_mean",
            "green_mean","blue_mean","rgb_median","rgb_mean","laplacian","labels"]

# One independent, initially empty list per column. Building each dict from
# its own column list avoids the padding key that zipping two lists of
# different length would otherwise introduce.
main_output = {col: [] for col in main_cols}
sub_output = {col: [] for col in sub_cols}

# 'meishi' is appended to below but is not one of main_cols, so the column
# has to be created here; otherwise the first append raises KeyError.
main_output.setdefault('meishi', [])

# Every branch below appends exactly one value to every main_output column so
# that row i of each column always describes the same image.
# NOTE(review): lst[0] is skipped -- presumably the folder placeholder object
# returned for the prefix; confirm.
for obj in tqdm(lst[1:]):
    target = bucket.get_blob(obj)
    name = os.path.basename(obj)
    main_output["image_name"].append(name)
    sub_output["image_name"].append(name)

    if target is None:
        # get_blob() returned no object: mark the row as an error and pad
        # every other column (detected_num included) with the placeholder.
        for col in main_cols[1:]:
            main_output[col].append('ERROR' if col == "error_judge" else s)
        main_output['meishi'].append(s)
        for col in sub_cols[1:]:
            sub_output[col].append(s)
        continue

    # Reset the module-level names before the call, since detect_faces() may
    # fall back on them when no face is found.
    face_image = None
    box = None
    vision_image, image, face_image, blurred_, laplacian, box = detect_faces(target)
    label_lst = detect_labels(vision_image)
    rgb_median, rgb_mean = detect_properties(face_image)

    sub_output['rgb_median'].append(rgb_median)
    sub_output['rgb_mean'].append(rgb_mean)
    sub_output['laplacian'].append(laplacian)
    sub_output['labels'].append(label_lst)

    # NOTE(review): 1 is presumably the "very unlikely" blur likelihood of
    # the first face -- confirm against the Vision API Likelihood enum.
    if blurred_[0] == 1:
        face_ratio = int(sub_output['area_ratio'][-1]*100)
        roll_angle = int(sub_output['roll_angle'][-1])
        pan_angle = int(sub_output['pan_angle'][-1])
        tilt_angle = int(sub_output['tilt_angle'][-1])
        d = round(sub_output['dist_ratio'][-1]*100)

        main_output['illustration'].append(s)
        main_output['meishi'].append(s)
        # blurred_ holds one entry per inspected face (at least one here).
        main_output['detected_num'].append(len(blurred_))

        main_output['blurred'].append('pint error' if laplacian <= th_laplacian else s)

        # The first angle that exceeds the threshold names the error.
        if abs(pan_angle) >= th_angle:
            main_output['face_direction'].append('profile')
        elif abs(roll_angle) >= th_angle:
            main_output['face_direction'].append('roll')
        elif abs(tilt_angle) >= th_angle:
            main_output['face_direction'].append('vertical')
        else:
            main_output['face_direction'].append(s)

        if d >= th_dist or face_ratio < th_face_ratio_min:
            main_output['position'].append('position error')
        else:
            main_output['position'].append(s)

        # NOTE(review): `model` is not defined in the visible notebook; it is
        # presumably a fitted estimator loaded elsewhere -- confirm.
        X = [[sub_output["dist_ratio"][-1],sub_output["chin_x_ratio"][-1],
              sub_output["chin_y_ratio"][-1],sub_output["forehead_x_ratio"][-1],
              sub_output["forehead_y_ratio"][-1],sub_output["area_ratio"][-1]]]

        pred = model.predict(X)
        pred = np.where(pred > 0.5, 1, 0)

        if face_ratio >= th_face_ratio_max or pred == 1:
            main_output['frameout'].append('frame out')
        else:
            main_output['frameout'].append(s)

        main_output['brightness'].append('dark_picture' if rgb_median <= th_dark else s)

        if any(label in label_lst for label in mask_labels):
            main_output['equipment'].append('Mask')
        elif any(label in label_lst for label in hat_labels):
            main_output['equipment'].append('Hat')
        elif any(label in label_lst for label in gesture_labels):
            main_output['equipment'].append('Gesture')
        else:
            main_output['equipment'].append(s)

        if any(label in label_lst for label in picture_labels):
            main_output['picture'].append('picture')
        else:
            main_output['picture'].append(s)

        # The image passes only when exactly one face was found and every
        # checked column holds the same value.
        # NOTE(review): the slice starts at 'face_direction', so 'position'
        # is not part of this check -- confirm that this is intended.
        agg_lst = [main_output[col][-1] for col in main_cols[3:-1]]
        if main_output['detected_num'][-1] == 1 and len(set(agg_lst)) == 1:
            main_output['error_judge'].append(s)
        else:
            main_output['error_judge'].append('ERROR')

        if main_output['error_judge'][-1] == 'ERROR':
            image.save(drive_root_dir +str(name),'PNG')

    else:
        # No face, or the first face is not clearly sharp: the whole row is
        # an error. face_image is None exactly when no face was cropped.
        main_output['detected_num'].append(0 if face_image is None else len(blurred_))
        for col in main_cols[2:7]:
            main_output[col].append(s)
        if face_image is None:
            # detect_properties() records 'dark_white' itself whenever it is
            # given a face crop; pad only when it was not.
            main_output['dark_white'].append(s)
        main_output['equipment'].append(s)
        main_output['picture'].append(s)
        main_output['error_judge'].append('ERROR')
        image.save(drive_root_dir + str(name),'')

        if 'Text' in label_lst and 'Font' in label_lst and 'Line' in label_lst:
            main_output['meishi'].append('Meishi')
        else:
            main_output['meishi'].append('_')

        if any(label in label_lst for label in illust_labels):
            main_output['illustration'].append('illustration picture')
        else:
            main_output['illustration'].append(s)

# output
# Each dict maps a column name to its list of per-image values; building the
# frame with the names as index and transposing yields one row per image.
main_df = pd.DataFrame(main_output.values(), index=main_output.keys()).T
sub_df = pd.DataFrame(sub_output.values(), index=sub_output.keys()).T
print(main_df.shape)
print(sub_df.shape)

# `encoding` is no longer passed: it had no effect on .xlsx output and the
# argument was removed from DataFrame.to_excel in pandas 2.0, where passing
# it raises TypeError.
main_df.to_excel(drive_root_dir+'main_output.xlsx')
sub_df.to_excel(drive_root_dir+'sub_output.xlsx')

100%|██████████| 1000/1000 [14:43<00:00,  1.13it/s]
