In [None]:
# Importação das Bibliotecas
import os
import cv2
import jwt
import time
import base64
import requests
import numpy as np
from PIL import Image
from random import randint
import vc_img_process as vimg
from matplotlib import pyplot as plt
from datetime import datetime, timedelta

CHAVE_AUTK_DES = os.getenv('VCDOC_CHAVE_SERVICO_AUTENTIKUS_DESENVOLVIMENTO').encode()
CHAVE_AUTK_PRO = os.getenv('VCDOC_CHAVE_SERVICO_AUTENTIKUS_PRODUCAO').encode()

In [None]:
# Variáveis globais para definir detalhes da execução, como ambiente, comportamento, ...

# SE FOR NECESSÁRIO, MUDE AQUI O AMBIENTE
AMBIENTE = 'D' # D = Desenvolvimento // H = Homologação // P = Produção // L = Local
URL_API = "https://des-vcdocgpu.app.spesp0003.estaleiro.serpro.gov.br" if AMBIENTE == 'D' else \
          "https://hom-vcdocgpu.app.spesp0003.estaleiro.serpro.gov.br" if AMBIENTE == 'H' else \
          "https://vcdocgpu.app.spesp0003.estaleiro.serpro.gov.br"     if AMBIENTE == 'P' else \
          "http://localhost:8081"

TENTE_ROTACAO = False
USE_EASYOCR = True
#CROP_HEIGHT = 80

HEADER_DIC = {}
HEADER_EXPIRATION = None

if AMBIENTE in "HD":
    # Chave de serviço do ValAutentikus
    chave_servico = CHAVE_AUTK_DES
    URL_SERVICO = "https://valautentikus.estaleiro.serpro.gov.br/autentikus-authn/api/v1/token"
else:
    # Chave de serviço do Autentikus (Produção)
    chave_servico = CHAVE_AUTK_PRO
    URL_SERVICO = "https://autentikus.estaleiro.serpro.gov.br/autentikus-authn/api/v1/token"

AUTH_BASIC = f"{jwt.utils.base64url_encode(chave_servico).decode()}="

In [None]:
# Funções utilizadas nesse notebook
def cria_header_autentikus():
    global HEADER_DIC
    global HEADER_EXPIRATION
    global AMBIENTE
    global AUTH_BASIC
    global URL_SERVICO

    data_agora = datetime.now()
    if (HEADER_EXPIRATION is not None) and ((HEADER_EXPIRATION - data_agora).total_seconds() > 3):
        nseconds = int((HEADER_EXPIRATION - data_agora).total_seconds())
        #print(f"Segundos para a expiração: {nseconds}")
        return HEADER_DIC

    if HEADER_EXPIRATION is not None:
        nseconds = int((HEADER_EXPIRATION - data_agora).total_seconds()) + 1
        if nseconds > 0:
            #print(f"Tive que fazer um sleep de {nseconds} segundos.")
            time.sleep(nseconds)

    print(f"Autorização Basic = {AUTH_BASIC}")

    while True:
        header_autk = {
            'content-type': 'application/x-www-form-urlencoded', 
            'Authorization': f'Basic {AUTH_BASIC}'
            }
        data_autk = 'grant_type=client_credentials&scope=escopo_vcdoc'
        #print("Iniciando o request do token")
        resp_token_antes = requests.post(URL_SERVICO, data=data_autk, headers=header_autk)
        if resp_token_antes.status_code != 200:
            print(f"Erro de conexão Autentikus: {resp_token_antes.content.decode()}")
            exit(-500)
            #HEADER_DIC = {}
            #HEADER_EXPIRATION = 3600
            #return HEADER_DIC

        resp_token = resp_token_antes.json()
        #print(f"RespToken = {resp_token}")
        if 'error' in resp_token:
            raise Exception("Erro na obtenção do token de autenticação")

        exp_time = resp_token['expires_in']
        if exp_time > 2:
            break
        time.sleep(exp_time)

    bearer = resp_token['access_token']

    HEADER_EXPIRATION = datetime.now() + timedelta(seconds=exp_time)

    HEADER_DIC = {'Authorization': 'Bearer ' + bearer, 'typ': 'JWT', 'alg': 'RS512'}
    return HEADER_DIC

def ShowImg(img):
    img2 = img
    if type(img) is np.ndarray:
        img2 = cv2.cvtColor(img,cv2.COLOR_BGR2RGB) 
    plt.imshow(img2)
    plt.show()

def LoadAndShow(filename):
    img = Image.open(filename)
    print(f"Carregando {filename}")
    ShowImg(img)
    img.close()
    img = cv2.imread(filename)
    return img

def imageFileToRequest(imagefilename):
    with open(imagefilename, "rb") as inputfile:
        data = inputfile.read()
    post_data = { "image": base64.b64encode(data).decode("utf-8") }
    return post_data

def send_vcdoc_request(arquivoImagem):
    global URL_API
    global TENTE_ROTACAO
    global USE_EASYOCR
    if AMBIENTE == 'L':
        header_dic = {}
    else:
        header_dic = cria_header_autentikus()

    header_dic["requester"]   = 'teste_crops'
    header_dic["vcdoc_param_avaliar_rotacao"] = '1' if TENTE_ROTACAO else '0'
    header_dic["vcdoc_param_ocr_solicitado"]  = '2' if USE_EASYOCR else '1'
    #header_dic["vcdoc_param_crop_height"]     = '48' if CROP_HEIGHT < 24 else str(CROP_HEIGHT)

    req_data = imageFileToRequest( arquivoImagem )
    resultJson = requests.post(URL_API + "/v1/detect-ocr", json=req_data, headers=header_dic).json()
    return resultJson

In [None]:
# Início do processamento propriamente dito. 
# Aqui vamos obter os arquivos de imagem da pasta raiz de imagens

userPath = os.path.expanduser("~")
PASTA_DESENV =  os.path.join(userPath,'desenv')
if not os.path.exists(PASTA_DESENV):
    PASTA_DESENV = os.path.join(userPath,'Documents','desenv')

# Quero pegar uma pasta que está no disco externo. Depois, basta comentar a linha abaixo
PASTA_DESENV = '/media/s002481737/Aux500GB/Serpro/VPN_2022/home_am/desenv'

#pasta_raiz_imagens = os.path.join(PASTA_DESENV,'python','vcdoc','CNH_Teste','goods')
#pasta_raiz_imagens = os.path.join(PASTA_DESENV,'python','vcdoc','CNH_Base')
pasta_raiz_imagens = os.path.join(PASTA_DESENV,'python','vcdoc','train_imgs')
listaExt = ['jpg','jpeg','png','webp']
arquivos_possiveis = []
print(f'Pasta Raiz = {pasta_raiz_imagens}')
for root, _, files in os.walk(pasta_raiz_imagens):
    if pasta_raiz_imagens != root: continue
    filessrt = sorted(files)
    for fi in filessrt:
        ex = os.path.splitext(fi)[1][1:].lower()
        if ex in listaExt:
            arquivos_possiveis += [fi]
print(f"Arquivos de imagens encontrados: {len(arquivos_possiveis)}")

In [None]:
# Aqui vamos escolher randomicamente um dos arquivos da lista de imagens obtidas 
# na célula anterior e enviar um request para a API do VCDOC

n = randint(0,len(arquivos_possiveis)-1)
imgdocfile = arquivos_possiveis[n]
#imgdocfile = 'thamiris.jpg'
#imgdocfile = 'douglas.jpg'
#imgdocfile = 'cnh_rodrigo3.png'
#imgdocfile = 'cnh-036.jpg'
print(f"Arquivo {n+1} de {len(arquivos_possiveis)} = {imgdocfile}")
arquivoImagem = os.path.join(pasta_raiz_imagens, imgdocfile)
img = LoadAndShow(arquivoImagem)

resultJson = send_vcdoc_request(arquivoImagem)
resultList = resultJson['resultlist']
angulo = resultJson['angle']

imgRot = img
if angulo != 0:
    print(f"ângulo de rotação aplicado à imagem: {angulo}")
    imgRot = vimg.rotateImage(img,angulo)

for element in resultList:
    print(f"Element {element['class_index']}: {element['obj_name']}")
    print(element['adjusted_ocr'])
    bbox = element['bounding_box']
    (xmin,ymin,widt,heig) = (bbox['x_min'],bbox['y_min'],bbox['width'],bbox['height'])
    crop = vimg.crop_image(imgRot,xmin,ymin,xmin+widt,ymin+heig)
    ShowImg(crop)

In [None]:
# Só precisa rodar essa 
for imgdocfile in arquivos_possiveis:
    arquivoImagem = os.path.join(pasta_raiz_imagens, imgdocfile)
    n,e = os.path.splitext(arquivoImagem)
    arquivoGroundTruth = n + '.json'
    if os.path.exists(arquivoGroundTruth): continue
    resultJson = send_vcdoc_request(arquivoImagem)
    resultList = resultJson['resultlist']

    nRG = nCNH = 0
    for element in resultList:
        nome = element['obj_name']
        if   "_RG" in nome:
            nRG += 1
        elif nome.startswith("RG"):
            nRG += 1
        elif "_CNH" in nome:
            nCNH += 1
        elif nome.startswith("CNH"):
            nCNH += 1
    tipoDoc = 'CNH' if nCNH > nRG else 'RG'

    jsonStr = '{' + f'\n  "tipo_doc" : "{tipoDoc}",'
    for element in resultList:
        nome = element['obj_name']
        valor = element['adjusted_ocr']
        jsonStr += f'\n  "{nome}" : "{valor.upper()}",'
    jsonStr = jsonStr[:-1] + '\n}'

    gt = open(arquivoGroundTruth,'w')
    gt.write(jsonStr)
    gt.close()
