In [1]:
# Pipeline configuration constants (edit here to target another project/bucket).
PROJECT_ID = "tc-sc-bi-bigdata-corp-tsod-dev"  # GCP project id
BUCKET = "gs://enrollment_verification_pipeline"  # GCS bucket URI
REGION = "us-central1"  # GCP region
PIPELINE_ROOT = BUCKET + "/pipeline_root/"  # root GCS path under BUCKET for pipeline runs

In [2]:
#!pip install kfp
#!pip install google-cloud-aiplatform
#!pip install google-cloud
#!pip install google-cloud-pipeline-components

In [3]:
# Load necessary libraries
from typing import NamedTuple, List, Dict, Text
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from datetime import datetime
import pandas as pd


# Get data to predict
Descargamos el mantenedor y obtenemos las hojas del formulario y de revisión. Luego obtenemos un df con las filas del formulario que aún no estén revisadas.

In [4]:
# Primero, un componente solo para obtener los ids. Esto se hace porque Vertex AI
# arrojaba un error cuando un componente quería retornar strings y también datasets.

# Se puede mejorar haciendo que el componente get_data reciba estos ids; así
# no se replica la tarea.
@component(
    base_image="python:3.9",
    packages_to_install=[
        "numpy",
        "pandas",
        "google-cloud",
        "google-cloud-secret-manager",
        "google-api-python-client",
        "gcsfs",
        "gspread",
        "oauth2client",
    ],
    output_component_file="get_ids.yaml"
)
def get_ids(
    project_id: str,
    id_mantenedor: str,
    drive_secret_id: str,
)-> NamedTuple("outputs", [("id_revision", str), ("id_formulario", str)]):
    """Read the maintainer spreadsheet and return the review and form sheet ids.

    Args:
        project_id: GCP project that owns the Secret Manager secret.
        id_mantenedor: Spreadsheet id of the maintainer sheet. In its first
            worksheet (first row used as header), the second column of the
            first data row holds the form sheet URL and the second column of
            the second data row holds the review sheet URL.
        drive_secret_id: Secret Manager id of the service-account JSON key
            used to access Google Sheets/Drive.

    Returns:
        ``(id_revision, id_formulario)``: spreadsheet ids of the review sheet
        and of the form-responses sheet.

    Raises:
        RuntimeError: if the secret payload fails its CRC32C integrity check.
    """
    # KFP lightweight components are executed in isolation, so every import
    # and helper has to live inside the function body.
    import json

    import gspread
    import pandas as pd
    from oauth2client.service_account import ServiceAccountCredentials

    def save_secret_token(project_id, secret_id, version_id, token_file):
        """Write the JSON payload of a Secret Manager secret version to ``token_file``.

        ``version_id`` may be a version number as a string (e.g. "5") or an
        alias (e.g. "latest"). Raises RuntimeError when the payload checksum
        does not match, instead of carrying on with a missing/stale token file.
        """
        import google_crc32c
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
        response = client.access_secret_version(request={"name": name})

        # Verify payload integrity before trusting it.
        crc32c = google_crc32c.Checksum()
        crc32c.update(response.payload.data)
        if response.payload.data_crc32c != int(crc32c.hexdigest(), 16):
            raise RuntimeError("Data corruption detected.")

        # Never print the payload: it is a service-account key.
        key_info = json.loads(response.payload.data.decode("UTF-8"))
        with open(token_file, 'w', encoding='utf-8') as f:
            json.dump(key_info, f)
        return True

    def get_sheet(spreadsheet_id, creds):
        """Return the first worksheet of a spreadsheet as a DataFrame (first row = header)."""
        client = gspread.authorize(creds)
        worksheet = client.open_by_key(spreadsheet_id).get_worksheet(0)
        return pd.DataFrame(worksheet.get_all_records())

    def sheet_id_from_url(url):
        """Extract the spreadsheet id from its URL.

        Assumes the form 'https://docs.google.com/spreadsheets/d/<ID>/...',
        where the id is the sixth '/'-separated segment.
        """
        return url.split('/')[5]

    def get_id(id_mantenedor, creds):
        """Return ``(id_revision, id_formulario)`` read from the maintainer sheet."""
        mantenedor_df = get_sheet(id_mantenedor, creds)
        id_formulario = sheet_id_from_url(mantenedor_df.iloc[0, 1])
        id_revision = sheet_id_from_url(mantenedor_df.iloc[1, 1])
        return id_revision, id_formulario

    SCOPES = ['https://www.googleapis.com/auth/drive']
    TOKEN_FILE = 'token.json'
    save_secret_token(project_id, drive_secret_id, "latest", TOKEN_FILE)
    creds = ServiceAccountCredentials.from_json_keyfile_name(TOKEN_FILE, SCOPES)

    id_revision, id_formulario = get_id(id_mantenedor, creds)
    return (id_revision, id_formulario)
    

In [5]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "numpy",
        "pandas",
        "google-cloud",
        "google-cloud-secret-manager",
        "google-cloud-storage",
        "google-api-python-client",
        "tqdm",
        "gcsfs",
        "gspread",
        "oauth2client",
    ],
    output_component_file="pred_get_data.yaml"
)
def get_data(
    project_id: str,
    id_mantenedor: str,
    drive_secret_id: str,
    docs_df: Output[Dataset],
)-> List:
    """Export the form rows that have not been reviewed yet as a CSV dataset.

    Reads the maintainer sheet to locate the form and review spreadsheets.
    Then it keeps the form rows whose 'Cédula de Identidad' value does not
    appear in the review sheet (all rows if the review sheet is empty).

    Args:
        project_id: GCP project that owns the Secret Manager secret.
        id_mantenedor: Spreadsheet id of the maintainer sheet (see ``get_ids``).
        drive_secret_id: Secret Manager id of the service-account JSON key.
        docs_df: Output dataset; the CSV is written to ``docs_df.path + ".csv"``.

    Returns:
        Nothing. NOTE(review): the ``-> List`` annotation is never satisfied
        (the function returns None). It is kept unchanged so the compiled
        component interface stays the same; no downstream step should read
        this output.

    Raises:
        RuntimeError: if the secret payload fails its CRC32C integrity check.
    """
    # KFP lightweight components are executed in isolation, so every import
    # and helper has to live inside the function body.
    import json

    import gspread
    import pandas as pd
    from oauth2client.service_account import ServiceAccountCredentials

    def save_secret_token(project_id, secret_id, version_id, token_file):
        """Write the JSON payload of a Secret Manager secret version to ``token_file``.

        ``version_id`` may be a version number as a string (e.g. "5") or an
        alias (e.g. "latest"). Raises RuntimeError when the payload checksum
        does not match, instead of carrying on with a missing/stale token file.
        """
        import google_crc32c
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
        response = client.access_secret_version(request={"name": name})

        # Verify payload integrity before trusting it.
        crc32c = google_crc32c.Checksum()
        crc32c.update(response.payload.data)
        if response.payload.data_crc32c != int(crc32c.hexdigest(), 16):
            raise RuntimeError("Data corruption detected.")

        # Never print the payload: it is a service-account key.
        key_info = json.loads(response.payload.data.decode("UTF-8"))
        with open(token_file, 'w', encoding='utf-8') as f:
            json.dump(key_info, f)
        return True

    def get_sheet(spreadsheet_id, creds):
        """Return the first worksheet of a spreadsheet as a DataFrame (first row = header)."""
        client = gspread.authorize(creds)
        worksheet = client.open_by_key(spreadsheet_id).get_worksheet(0)
        return pd.DataFrame(worksheet.get_all_records())

    def sheet_id_from_url(url):
        """Extract the spreadsheet id, assuming '.../spreadsheets/d/<ID>/...' URLs."""
        return url.split('/')[5]

    def get_docs_to_pred(id_mantenedor, creds):
        """Return ``(id_revision, pending_df)`` with the form rows not yet reviewed."""
        mantenedor_df = get_sheet(id_mantenedor, creds)
        id_formulario = sheet_id_from_url(mantenedor_df.iloc[0, 1])
        id_revision = sheet_id_from_url(mantenedor_df.iloc[1, 1])

        revision_df = get_sheet(id_revision, creds)
        formulario_df = get_sheet(id_formulario, creds)

        if revision_df.empty:
            pending_df = formulario_df.copy()
        else:
            # A row counts as reviewed when its cédula link already appears in the review sheet.
            reviewed = formulario_df['Cédula de Identidad'].isin(revision_df['Cédula de Identidad'])
            pending_df = formulario_df[~reviewed].reset_index(drop=True)
        return id_revision, pending_df

    SCOPES = ['https://www.googleapis.com/auth/drive']
    TOKEN_FILE = 'token.json'
    save_secret_token(project_id, drive_secret_id, "latest", TOKEN_FILE)
    creds = ServiceAccountCredentials.from_json_keyfile_name(TOKEN_FILE, SCOPES)

    _id_revision, pending_df = get_docs_to_pred(id_mantenedor, creds)

    # Upload artifact: downstream components read docs_df.path + ".csv".
    pending_df.to_csv(docs_df.path + ".csv", index = False)

    
    

# Get Ground Truth

In [6]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "numpy",
        "pandas",
    ],
    output_component_file="get_ground_truth.yaml"
)
def get_ground_truth(
    docs_df: Input[Dataset],
    ground_truth: OutputPath()
    )-> None:
    """Build one ground-truth record per form row and pickle the list.

    Each record is ``{'index': <row index>, 'cedula': {'id', 'rut'},
    'licencia': {'id', 'rut'}}``. Fields are taken by position from
    ``DataFrame.itertuples()`` (element 0 is the index):

    - element 6 is the RUT of the driver;
    - element 8 is the cédula Drive link;
    - element 9 is the licencia Drive link.

    NOTE(review): the positional columns must match the form sheet layout — confirm.

    Args:
        docs_df: Dataset whose CSV lives at ``docs_df.path + ".csv"``.
        ground_truth: Output path; the pickle is written to ``ground_truth + ".pkl"``.
    """
    import pickle

    import pandas as pd

    form_rows = pd.read_csv(docs_df.path + ".csv")

    def drive_id(link):
        # Drive links are assumed to carry the file id after the '=' sign.
        return link.split('=')[1]

    records = [
        {
            'index': row[0],
            'cedula': {'id': drive_id(row[8]), 'rut': row[6]},
            'licencia': {'id': drive_id(row[9]), 'rut': row[6]},
        }
        for row in form_rows.itertuples()
    ]

    with open(f'{ground_truth}.pkl', 'wb') as handle:
        pickle.dump(records, handle)

# Get images
PDF files are downloaded from Google Drive, converted to images, cropped, and stored in GCS.

In [16]:
@component(
    base_image="gcr.io/deeplearning-platform-release/pytorch-gpu",
    packages_to_install=[
        "numpy",
        "pandas",
        "pdf2image",
        "opencv-python",
        "google-api-python-client",
        "google-cloud-secret-manager",
        "google-cloud",
        "oauth2client",
        "google-cloud-storage",
        "tqdm",
        "Pillow"
    ],
    output_component_file="get_images.yaml"
)
def get_images(
    bucket:str,
    project_id:str,
    drive_secret_id: str,
    document:str, 
    docs_gt: InputPath(),
    docs_ds: Input[Dataset],
    docs_gt_artifact: OutputPath()
    ):
    """Download the document PDFs from Drive, crop one document type with YOLO, and upload crops to GCS.

    Args:
        bucket: GCS bucket name that holds the helper scripts (``utils/``,
            ``models/``) and the ``models/best.pt`` weights.
        project_id: GCP project used for Secret Manager and Cloud Storage.
        drive_secret_id: Secret Manager id of the service-account JSON key for Drive.
        document: ``'cedula'`` or ``'licencia'``. Selects the form column and
            the detector class.
        docs_gt: Pickled ground-truth list (the ``.pkl`` suffix is appended).
        docs_ds: Form-rows dataset (the ``.csv`` suffix is appended).
        docs_gt_artifact: Output path (``.pkl`` appended). Receives the ground
            truth with ``record[document]['found']`` filled in.

    Crops are uploaded to ``gs://enrollment_verification/inference_<document>/<ID>.jpeg``.

    Raises:
        ValueError: if ``document`` is not ``'cedula'`` or ``'licencia'``.
        RuntimeError: if the Drive secret fails its CRC32C integrity check.
    """
    import io
    import json
    import os
    import pickle

    import cv2
    import numpy as np
    import pandas as pd
    from google.cloud import storage
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
    from googleapiclient.http import MediaIoBaseDownload
    from oauth2client.service_account import ServiceAccountCredentials
    from pdf2image import convert_from_bytes
    from PIL import Image
    from tqdm import tqdm

    # Form column holding the Drive link and detector class id, per document type.
    document_config = {
        'licencia': ('Licencia de Conducir', 1),
        'cedula': ('Cédula de Identidad', 0),
    }
    if document not in document_config:
        raise ValueError(f"document must be 'cedula' or 'licencia', got {document!r}")
    col_name, classes = document_config[document]

    # pdf2image needs the poppler binaries, which are installed at runtime.
    os.system('apt-get update')
    os.system('sudo apt-get install poppler-utils -y')

    ############################
    # Helper functions
    ############################

    def save_secret_token(project_id, secret_id, version_id, token_file):
        """Write the JSON payload of a Secret Manager secret version to ``token_file``.

        Raises RuntimeError when the payload checksum does not match.
        """
        import google_crc32c
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
        response = client.access_secret_version(request={"name": name})

        crc32c = google_crc32c.Checksum()
        crc32c.update(response.payload.data)
        if response.payload.data_crc32c != int(crc32c.hexdigest(), 16):
            raise RuntimeError("Data corruption detected.")

        key_info = json.loads(response.payload.data.decode("UTF-8"))
        with open(token_file, 'w', encoding='utf-8') as f:
            json.dump(key_info, f)
        return True

    def download_file(real_file_id, creds):
        """Download a Drive file; return its bytes, or None if the Drive API call fails."""
        try:
            service = build('drive', 'v3', credentials=creds)
            request = service.files().get_media(fileId=real_file_id)
            file = io.BytesIO()
            downloader = MediaIoBaseDownload(file, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
                print(F'Download {int(status.progress() * 100)}.')
        except HttpError as error:
            print(F'An error occurred: {error}')
            return None
        return file.getvalue()

    storage_client = storage.Client(project=project_id)

    def download_blob(bucket_name, source_blob_name, destination_file_name):
        """Download one object from GCS to a local file."""
        blob = storage_client.bucket(bucket_name).blob(source_blob_name)
        blob.download_to_filename(destination_file_name)
        print(
            "Downloaded storage object {} from bucket {} to local file {}.".format(
                source_blob_name, bucket_name, destination_file_name
            )
        )

    ############################
    # Download the YOLO crop code and weights
    ############################

    helper_files = [
        "utils/autoanchor.py",
        "utils/crop.py",
        "utils/datasets.py",
        "utils/general.py",
        "utils/google_utils.py",
        "utils/loss.py",
        "utils/metrics.py",
        "utils/plots.py",
        "utils/torch_utils.py",
        "models/common.py",
        "models/experimental.py",
        "models/yolo.py",
        "models/best.pt",
    ]
    for local_dir in ("utils", "models"):
        os.makedirs(local_dir, exist_ok=True)
    for blob_name in helper_files:
        download_blob(bucket, blob_name, blob_name)

    from utils.crop import crop

    ############################
    # Logic
    ############################

    docs_df = pd.read_csv(docs_ds.path + ".csv")
    with open(docs_gt + ".pkl", 'rb') as f:
        gt_records = pickle.load(f)

    # Working directory for the page images fed to the detector.
    crop_dir = 'crop_pipeline'
    os.makedirs(crop_dir, exist_ok=True)

    SCOPES = ['https://www.googleapis.com/auth/drive']
    TOKEN_FILE = 'token.json'
    save_secret_token(project_id, drive_secret_id, "latest", TOKEN_FILE)
    creds = ServiceAccountCredentials.from_json_keyfile_name(TOKEN_FILE, SCOPES)

    # Placeholder for unreadable PDFs. pdf2image's size=(960, 1280) is
    # (width, height), so the numpy shape is (1280, 960, 3). It is uint8 RGB
    # so that cvtColor/imwrite accept it (the old 2-D float array made
    # cvtColor raise).
    blank_page = np.full((1280, 960, 3), 255, dtype=np.uint8)

    # Save every page of every PDF as '<doc_id><page>.jpeg'.
    for link in docs_df[col_name]:
        doc_id = link.split('=')[1]
        pdf_images = None
        try:
            pdf = download_file(doc_id, creds)
            if pdf is not None:
                pdf_images = convert_from_bytes(pdf, fmt="jpeg", size=(960, 1280))
        except Exception as error:  # best effort: unreadable PDFs become a blank page
            print(f'Could not convert document {doc_id}: {error}')
        if not pdf_images:
            pdf_images = [blank_page]
        for page_number, page in enumerate(pdf_images):
            cv2.imwrite(
                os.path.join(crop_dir, doc_id + str(page_number) + '.jpeg'),
                cv2.cvtColor(np.array(page), cv2.COLOR_RGB2BGR),
            )

    # Run the detector on all pages. NOTE(review): `crop` is assumed to fill
    # `df_docs` in place (one row per page) — confirm against utils/crop.py.
    df_docs = pd.DataFrame(index=['ID_IMG' , 'ID', 'IMG', 'found', 'confidence']).T
    try:
        crop(weights="models/best.pt", source=crop_dir + '/', save_img=False,
             save_path='crop', classes=classes, df=df_docs)
    except Exception as error:  # best effort: keep whatever rows were produced
        print(f'Cropping failed: {error}')

    # Keep the most confident page per document id.
    df_docs = (
        df_docs.sort_values('confidence', ascending=False)
        .drop_duplicates(['ID'], keep='first')
        .sort_index()
        .reset_index(drop=True)
    )

    def _as_bool(value):
        # Map 1/0 flags to True/False, leave anything else untouched.
        if value == 1:
            return True
        if value == 0:
            return False
        return value

    df_docs['found'] = df_docs['found'].map(_as_bool)

    # Annotate the ground truth. A document missing from the detector output
    # is treated as not found; previously `.item()` raised here.
    found_by_id = dict(zip(df_docs['ID'].tolist(), df_docs['found'].tolist()))
    for record in gt_records:
        record[document]['found'] = found_by_id.get(record[document]['id'], False)

    # Clean the working directory.
    for name in os.listdir(crop_dir):
        if name != '.ipynb_checkpoints':
            os.remove(os.path.join(crop_dir, name))

    # Upload the crops that were found.
    project_bucket = "enrollment_verification"
    output_bucket = storage_client.get_bucket(project_bucket)
    for i in tqdm(df_docs.index):
        nombre_img = df_docs.loc[i, 'ID'] + '.jpeg'
        df_docs.loc[i, 'ID_IMG'] = nombre_img
        if df_docs.loc[i, 'found']:
            blob = output_bucket.blob(f'inference_{document}/' + nombre_img)
            pil_img = Image.fromarray(np.array(df_docs.loc[i, 'IMG'][0]))
            buffer = io.BytesIO()
            pil_img.save(buffer, 'jpeg')
            pil_img.close()
            blob.upload_from_string(buffer.getvalue(), content_type='image/jpeg')

    # Save artifact.
    with open(docs_gt_artifact + ".pkl", 'wb') as f:
        pickle.dump(gt_records, f)



# Donut

In [17]:
@component(
    base_image="gcr.io/deeplearning-platform-release/pytorch-gpu",
    packages_to_install=[
        "numpy",
        "pandas",
        "transformers",
        "opencv-python",
        "google-api-python-client",
        "google-cloud-secret-manager",
        "google-cloud",
        "oauth2client",
        "google-cloud-storage",
        "tqdm",
        "Pillow",
        "datasets"
    ],
    output_component_file="inference.yaml"
)
def inference(
    project_id: str,
    bucket:str,
    document:str,
    model:str,
    docs_gt_artifact: InputPath(),
    docs_pred_artifact: OutputPath()
             ):
    """Run the Donut model on the cropped images of one document type.

    Args:
        project_id: GCP project used for Secret Manager and Cloud Storage.
        bucket: GCS bucket name holding ``models/donut.py``.
        document: ``'cedula'`` or ``'licencia'``.
        model: Hugging Face model id passed to ``donut_inference``.
        docs_gt_artifact: Pickled ground-truth list (``.pkl`` appended).
        docs_pred_artifact: Output path (``.pkl`` appended) for the predictions
            returned by ``donut_inference``.

    Crops are read from the ``inference_<document>`` prefix of the
    ``enrollment_verification`` bucket.

    Raises:
        ValueError: if ``document`` is not ``'cedula'`` or ``'licencia'``.
        RuntimeError: if the HF token secret fails its CRC32C integrity check.
    """
    import copy
    import os
    import pickle

    from google.cloud import storage

    if document not in ('licencia', 'cedula'):
        raise ValueError(f"document must be 'cedula' or 'licencia', got {document!r}")

    ###################
    # Helper functions
    ###################

    def download_blob(bucket_name, source_blob_name, destination_file_name, project_id):
        """Download one object from GCS to a local file."""
        storage_client = storage.Client(project=project_id)
        blob = storage_client.bucket(bucket_name).blob(source_blob_name)
        blob.download_to_filename(destination_file_name)
        print(
            "Downloaded storage object {} from bucket {} to local file {}.".format(
                source_blob_name, bucket_name, destination_file_name
            )
        )

    def get_secret_token(project_id, secret_id, version_id):
        """Return the decoded payload of a Secret Manager secret version.

        Raises RuntimeError on a checksum mismatch. Previously the raw response
        object was returned and then used as the token.
        """
        import google_crc32c
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
        response = client.access_secret_version(request={"name": name})

        crc32c = google_crc32c.Checksum()
        crc32c.update(response.payload.data)
        if response.payload.data_crc32c != int(crc32c.hexdigest(), 16):
            raise RuntimeError("Data corruption detected.")

        # Never print the payload: it is an access token.
        return response.payload.data.decode("UTF-8")

    # Fetch the Donut inference code from GCS and import it.
    os.makedirs("models", exist_ok=True)
    download_blob(bucket, "models/donut.py", "models/donut.py", project_id)
    from models.donut import donut_inference

    ####################
    # Logic
    ####################

    with open(docs_gt_artifact + ".pkl", 'rb') as f:
        docs_gt = pickle.load(f)

    hf_token = get_secret_token(project_id, "HF_TOKEN", "latest")

    # Work on a copy so the ground truth itself is never mutated.
    docs_pred = copy.deepcopy(docs_gt)

    project_bucket = "enrollment_verification"
    path = f'inference_{document}'
    # donut_inference takes an output list as its last argument; it was
    # always an empty list here, so pass a fresh one.
    raw_outputs = []
    docs_pred = donut_inference(docs_pred, project_id, hf_token, document, model,
                                project_bucket, path, raw_outputs)

    with open(docs_pred_artifact + ".pkl", 'wb') as f:
        pickle.dump(docs_pred, f)
    

# Postprocess

In [18]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "numpy",
        "pandas",
        "tqdm"
    ],
    output_component_file="postprocess.yaml"
)
def postprocess(
               document: str,
               docs_gt: InputPath(),
               docs_pred: InputPath(),
               docs_gt_artifact: OutputPath(),
               docs_pred_artifact: OutputPath(),
               ):
    """Normalize RUTs and expiry dates of one document type in ground truth and predictions.

    For each record ``i``:

    - ``gt[i][document]['rut_proc']``: the RUT upper-cased with everything
      except digits and 'K' removed. It is left unset when the RUT is blank.
    - ``pred[i][document]['rut_proc']``: the same normalization, or
      'Not Found' for a blank/'NOT FOUND' RUT.
    - ``pred[i][document]['venc_proc']``: 'dd/mm/YYYY'. It is 'Not Found' when
      no date pattern is present, and 'Error de Lectura' when the pattern is
      present but is not a valid calendar date.

    Args:
        document: ``'cedula'`` or ``'licencia'``; key of the sub-dict to process.
        docs_gt, docs_pred: Pickled input lists (``.pkl`` appended); assumed
            to have the same length and order.
        docs_gt_artifact, docs_pred_artifact: Output paths (``.pkl`` appended).
    """
    import copy
    import pickle
    import re
    from datetime import datetime

    # Spanish month abbreviations (long and short forms) -> month number.
    months = {
        'ENE': '01', 'FEB': '02', 'MAR': '03', 'ABR': '04', 'MAYO': '05', 'MAY': '05',
        'JUN': '06', 'JUL': '07', 'AGO': '08', 'SEPT': '09', 'SEP': '09',
        'OCT': '10', 'NOV': '11', 'DIC': '12',
    }
    numeric_date = re.compile(r'\b(\d{2})\s(\d{2})\s(\d{4})\b')
    text_date = re.compile(
        r'\b(\d{2})\s(ENE|FEB|MAR|ABR|MAYO|MAY|JUN|JUL|AGO|SEPT|SEP|OCT|NOV|DIC)\s(\d{4})\b'
    )

    with open(docs_pred + ".pkl", 'rb') as f:
        pred_records = pickle.load(f)
    with open(docs_gt + ".pkl", 'rb') as f:
        gt_records = pickle.load(f)

    def preproc(gt):
        """Return a copy of a ground-truth dict with 'rut_proc' added when the RUT is non-blank."""
        ground_truth = copy.deepcopy(gt)
        r = str(ground_truth['rut']).upper().strip()
        if r != '':
            ground_truth['rut_proc'] = re.sub(r'[^0-9K]+', '', r)
        return ground_truth

    def normalize_date(raw):
        """Turn a free-text expiry date into 'dd/mm/YYYY', 'Not Found' or 'Error de Lectura'."""
        v = str(raw).upper().strip()
        if v == '':
            return 'Not Found'
        v = v.replace('/', ' ')
        v = re.sub(r'[^A-Za-z0-9 ]+', '', v)  # keep only alphanumerics and spaces
        # Collapse runs of spaces last, so '01 / 02 / 2025' still parses.
        v = re.sub(r'\s\s+', ' ', v)

        match = numeric_date.search(v) or text_date.search(v)
        if match is None:
            return 'Not Found'
        dia, mes, anio = match.groups()
        if not mes.isnumeric():
            mes = months[mes]
        fecha = f'{dia}/{mes}/{anio}'
        try:
            datetime.strptime(fecha, '%d/%m/%Y')
        except ValueError:
            # Matched the pattern but is not a real date (e.g. 31/02/2025).
            return 'Error de Lectura'
        return fecha

    def postproc(pred):
        """Return a copy of a prediction dict with 'rut_proc' and 'venc_proc' added."""
        dic = copy.deepcopy(pred)
        r = str(dic['rut']).upper().strip()
        if r in ('', 'NOT FOUND'):
            r = 'Not Found'
        else:
            r = re.sub(r'[^0-9K]+', '', r)
        dic['rut_proc'] = r
        dic['venc_proc'] = normalize_date(dic['venc'])
        return dic

    # Both lists come from the same rows, so they share length and order.
    for i in range(len(gt_records)):
        gt_records[i][document] = preproc(gt_records[i][document])
        pred_records[i][document] = postproc(pred_records[i][document])

    with open(docs_gt_artifact + ".pkl", 'wb') as f:
        pickle.dump(gt_records, f)
    with open(docs_pred_artifact + ".pkl", 'wb') as f:
        pickle.dump(pred_records, f)

# Verification

In [19]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "numpy",
        "pandas",
        "tqdm",
        "google-cloud-storage",
        "nltk"
    ],
    output_component_file="verification.yaml"
)
def verification(
    project_id:str,
    bucket:str,
    docs_df_artifact: Input[Dataset],
    docs_gt_cedula_artifact: InputPath(),
    docs_pred_cedula_artifact: InputPath(),
    docs_gt_licencia_artifact: InputPath(),
    docs_pred_licencia_artifact: InputPath(),
    df_val_artifact: Output[Dataset]
):
    """Combine cédula and licencia predictions into a validation table.

    Adds the columns VERIFICADO, OBSERVACIONES GENERALES/CEDULA/LICENCIA,
    LEC RUT CED./LIC., LEC VENC CED./LIC. and CORRECCIONES RUT to the form
    rows. It writes the result as CSV to ``df_val_artifact.path + ".csv"``.
    The verification rules come from ``utils/verification.py``, which is
    downloaded from ``bucket``.

    Raises:
        ValueError: if the cédula and licencia prediction lists differ in length.
    """
    import os
    import pickle

    import numpy as np
    import pandas as pd
    from google.cloud import storage

    ###################
    # Helper functions
    ###################

    def download_blob(bucket_name, source_blob_name, destination_file_name, project_id):
        """Download one object from GCS to a local file."""
        storage_client = storage.Client(project=project_id)
        blob = storage_client.bucket(bucket_name).blob(source_blob_name)
        blob.download_to_filename(destination_file_name)
        print(
            "Downloaded storage object {} from bucket {} to local file {}.".format(
                source_blob_name, bucket_name, destination_file_name
            )
        )

    def load_pickle(path):
        """Load a pickle written by an upstream component (``.pkl`` appended)."""
        with open(path + ".pkl", 'rb') as f:
            return pickle.load(f)

    os.makedirs("utils", exist_ok=True)
    download_blob(bucket, "utils/verification.py", "utils/verification.py", project_id)

    from utils.verification import observaciones,venc_verifier, rut_verifier, proximity_check, gen_checker, length_checker, verificador, verificador_venc, correcciones

    docs_df = pd.read_csv(docs_df_artifact.path +".csv")
    docs_gt_cedula = load_pickle(docs_gt_cedula_artifact)
    docs_pred_cedula = load_pickle(docs_pred_cedula_artifact)
    docs_gt_licencia = load_pickle(docs_gt_licencia_artifact)
    docs_pred_licencia = load_pickle(docs_pred_licencia_artifact)

    # Both prediction lists are built from the same form rows; fail loudly otherwise.
    if len(docs_pred_cedula) != len(docs_pred_licencia):
        raise ValueError(
            f"cedula/licencia prediction counts differ: "
            f"{len(docs_pred_cedula)} vs {len(docs_pred_licencia)}"
        )

    df_val = docs_df.copy(deep = True)
    new_columns = ['VERIFICADO',  'OBSERVACIONES GENERALES','OBSERVACIONES CEDULA','OBSERVACIONES LICENCIA', 'LEC RUT CED.','LEC RUT LIC.', 'LEC VENC CED.','LEC VENC LIC.', 'CORRECCIONES RUT']
    for column in new_columns:
        # Use object dtype so the later bool/str cell writes need no dtype upcast.
        df_val[column] = pd.Series(np.nan, index=df_val.index, dtype=object)

    obs_gen_licencia, obs_licencia = observaciones(docs_gt_licencia, docs_pred_licencia, "licencia")
    obs_gen_cedula, obs_cedula = observaciones(docs_gt_cedula, docs_pred_cedula, "cedula")
    correcciones_ = correcciones(docs_gt_cedula, docs_pred_cedula, docs_pred_licencia)

    # Merge general observations per row. Compare the elements; the old code
    # compared the whole list to '' and dropped licencia-only observations.
    obs_gen = []
    for i in range(len(obs_gen_cedula)):
        ob_cedula, ob_licencia = obs_gen_cedula[i], obs_gen_licencia[i]
        if ob_cedula != '' and ob_licencia != '':
            obs_gen.append(f'{ob_cedula}, {ob_licencia}')
        elif ob_cedula != '':
            obs_gen.append(ob_cedula)
        else:
            obs_gen.append(ob_licencia)

    df_val['OBSERVACIONES GENERALES'] = obs_gen
    df_val['OBSERVACIONES CEDULA'] = obs_cedula
    df_val['OBSERVACIONES LICENCIA'] = obs_licencia
    df_val['CORRECCIONES RUT'] = correcciones_

    def set_cell(position, column, value):
        # Positional write without chained indexing, so it cannot silently
        # become a no-op under pandas Copy-on-Write.
        df_val.iat[position, df_val.columns.get_loc(column)] = value

    # Each record's 'index' is its row position in docs_df (see get_ground_truth).
    for i in range(len(docs_pred_cedula)):
        pred_id_cedula = docs_pred_cedula[i]['index']
        pred_id_licencia = docs_pred_licencia[i]['index']
        ced_pred = docs_pred_cedula[pred_id_cedula]['cedula']
        lic_pred = docs_pred_licencia[pred_id_licencia]['licencia']

        ruts_general = rut_verifier(
            docs_gt_cedula[pred_id_cedula]['cedula']['rut_proc'],
            docs_pred_cedula[i]['cedula']['rut_proc'],
            lic_pred['rut_proc'],
        )
        venc_general = venc_verifier(ced_pred['venc_proc'], lic_pred['venc_proc'])

        set_cell(pred_id_cedula, 'VERIFICADO', ruts_general and venc_general)
        # Raw model readings, kept for human review.
        set_cell(pred_id_cedula, 'LEC RUT CED.', ced_pred['rut'])
        set_cell(pred_id_licencia, 'LEC RUT LIC.', lic_pred['rut'])
        set_cell(pred_id_cedula, 'LEC VENC CED.', ced_pred['venc'])
        set_cell(pred_id_licencia, 'LEC VENC LIC.', lic_pred['venc'])

    df_val.to_csv(df_val_artifact.path + ".csv", index = False)










# Load to Drive


In [20]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "numpy",
        "pandas",
        "google-api-python-client",
        "google-cloud-secret-manager",
        "google-cloud",
        "oauth2client",
        "google-cloud-storage",
        "gspread",
        "gspread-formatting",
    ],
    output_component_file="upload_results.yaml"
)
def upload_results(
    project_id: str,
    drive_secret_id: str,
    id_revision: str,
    df_val_artifact: Input[Dataset]
    ):
    """Write the verification results to the review Google Sheet.

    Reads the CSV produced by the verification step, writes its header and
    rows into the first worksheet of spreadsheet ``id_revision``, and replaces
    that worksheet's conditional-format rules with a green rule for cells equal
    to TRUE and a red rule for cells equal to FALSE in column M.

    Args:
        project_id: GCP project that owns the Secret Manager secret.
        drive_secret_id: Secret Manager secret id holding the service-account
            JSON key used to access Google Sheets.
        id_revision: Spreadsheet key of the review sheet to write to.
        df_val_artifact: Dataset artifact; the CSV is read from
            ``df_val_artifact.path + ".csv"``.

    Raises:
        RuntimeError: If the secret payload fails its CRC32C integrity check.
    """
    # KFP lightweight components are executed in isolation, so every import
    # must live inside the function body.
    import json

    import gspread
    import pandas as pd
    from gspread_formatting import (BooleanCondition, BooleanRule, CellFormat,
                                    Color, ConditionalFormatRule, GridRange,
                                    get_conditional_format_rules)
    from oauth2client.service_account import ServiceAccountCredentials

    #####################
    ## Helper Functions
    #####################

    def save_secret_token(project_id, secret_id, version_id, token_file):
        """Download a secret version and write its JSON payload to ``token_file``.

        ``version_id`` can be a version number as a string (e.g. "5") or an
        alias (e.g. "latest").

        Returns:
            True once the token file has been written.

        Raises:
            RuntimeError: If the payload's CRC32C checksum does not match, so the
                caller never goes on to read a missing or stale token file.
        """
        import google_crc32c
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()

        # Resource name of the secret version.
        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
        response = client.access_secret_version(request={"name": name})

        # Verify payload checksum before trusting the bytes.
        crc32c = google_crc32c.Checksum()
        crc32c.update(response.payload.data)
        if response.payload.data_crc32c != int(crc32c.hexdigest(), 16):
            raise RuntimeError(f"Data corruption detected in secret version {name}.")

        payload = response.payload.data.decode("UTF-8")
        # Parse then re-dump so a malformed payload fails here rather than
        # inside the credentials loader.
        token = json.loads(payload)
        with open(token_file, 'w', encoding='utf-8') as f:
            json.dump(token, f)
        return True

    def append_to_sheet(spreadsheet_id, df, client):
        """Write ``df``'s header to the top of the first worksheet and append its rows."""
        spreadsheet = client.open_by_key(spreadsheet_id)
        worksheet = spreadsheet.get_worksheet(0)

        # Column names go into the first row (no range given -> starts at A1).
        worksheet.update([list(df.columns.values)], raw=True)

        # Missing values are sent as empty strings (blank cells); NaN is not
        # valid JSON for the Sheets API. fillna without inplace leaves the
        # caller's frame untouched.
        df_rows = df.fillna('').values.tolist()
        worksheet.add_rows(len(df_rows))
        worksheet.append_rows(df_rows)
        print('Google Spreadsheet updated successfully.')

    def add_color(spreadsheet_id, client):
        """Replace the first worksheet's conditional formats with TRUE/FALSE colors on column M."""
        spreadsheet = client.open_by_key(spreadsheet_id)
        worksheet = spreadsheet.get_worksheet(0)
        rules = get_conditional_format_rules(worksheet)
        # Drop any existing rules so repeated runs do not stack duplicates.
        rules.clear()

        def color_rule(text, color):
            # NOTE(review): column M is assumed to hold the VERIFICADO flag;
            # confirm against the column order of the verification output.
            return ConditionalFormatRule(
                ranges=[GridRange.from_a1_range('M:M', worksheet)],
                booleanRule=BooleanRule(
                    condition=BooleanCondition('TEXT_EQ', [text]),
                    format=CellFormat(backgroundColor=color),
                ),
            )

        rules.append(color_rule('TRUE', Color(0.15, 0.5, 0.15)))
        rules.append(color_rule('FALSE', Color(0.5, 0.15, 0.15)))
        rules.save()

    #####################
    # Logic
    #####################

    # Load the verification results artifact.
    df_val = pd.read_csv(df_val_artifact.path + ".csv")

    # Authenticate against Google Drive/Sheets with the service-account key
    # stored in Secret Manager.
    SCOPES = ['https://www.googleapis.com/auth/drive']
    TOKEN_FILE = 'token.json'
    save_secret_token(project_id, drive_secret_id, "latest", TOKEN_FILE)
    creds = ServiceAccountCredentials.from_json_keyfile_name(TOKEN_FILE, SCOPES)
    client = gspread.authorize(creds)

    append_to_sheet(id_revision, df_val, client)
    add_color(id_revision, client)

# BUILD PIPELINE

In [21]:

# Pipeline defaults for this cell.
# NOTE: these re-assign PROJECT_ID and BUCKET from the first cell. BUCKET here
# is the bare bucket name (no "gs://" prefix) and becomes the default of the
# pipeline's `bucket` parameter; PIPELINE_ROOT was already built from the
# "gs://" form in the first cell and is not affected.

# --- GCP ---------------------------------------------------------------------
# TODO: the project id could be read from the GCP environment instead.
PROJECT_ID = "tc-sc-bi-bigdata-corp-tsod-dev"
BUCKET = "enrollment_verification_pipeline"

# --- Google Drive ------------------------------------------------------------
# Secret Manager id of the Drive credentials, and Drive id of the "mantenedor"
# sheet that points to the form and review spreadsheets.
DRIVE_SECRET_ID = "drive_token"
ID_MANTENEDOR = "1Wj-wKyzGyfLoujoUE68z1uPtVhTqLTMWgZ3KhChPvdY"

# --- Models (presumably Hugging Face Hub repo ids) ----------------------------
MODEL_LICENCIA = "AA-supply/donut-finetuned-crop-licencia-DA"
MODEL_CEDULA = "AA-supply/donut-finetuned-crop-cedula-DA"
    
@kfp.dsl.pipeline(
    # Default artifact root (e.g. 'gs://my-bucket/test_vertex/pipeline_root/');
    # can be overridden when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    name="enrollment-verification-pipeline",
)
def pipeline(
    project_id: str = PROJECT_ID,
    bucket: str = BUCKET,
    id_mantenedor: str = ID_MANTENEDOR,
    drive_secret_id: str = DRIVE_SECRET_ID,
    model_licencia: str = MODEL_LICENCIA,
    model_cedula: str = MODEL_CEDULA
):
    """Verify enrollment documents (licencia and cedula) end to end.

    Reads the pending form rows, fetches the document images, runs the Donut
    model for each document type, post-processes and cross-checks the
    predictions, and writes the results back to the review sheet.

    Args:
        project_id: GCP project id passed to the components.
        bucket: GCS bucket name used by the image and inference steps.
        id_mantenedor: Drive id of the "mantenedor" sheet listing the form and
            review spreadsheets.
        drive_secret_id: Secret Manager id of the Drive credentials.
        model_licencia: Model id used for licencia inference.
        model_cedula: Model id used for cedula inference.
    """
    # --- Data and ground truth ----------------------------------------------
    # get_ids and get_data both read the mantenedor: get_ids exists only
    # because returning strings and datasets from one component failed on
    # Vertex AI.
    get_ids_task = get_ids(
        project_id=project_id,
        id_mantenedor=id_mantenedor,
        drive_secret_id=drive_secret_id,
    )

    get_data_task = get_data(
        project_id=project_id,
        id_mantenedor=id_mantenedor,
        drive_secret_id=drive_secret_id,
    )

    ground_truth_task = get_ground_truth(
        docs_df=get_data_task.outputs["docs_df"],
    )

    # --- Images ---------------------------------------------------------------
    # Use the `bucket` pipeline parameter (like the inference steps do) so an
    # override at submission time reaches every step.
    # NOTE(review): these steps request a T4 node through a node selector;
    # confirm that image retrieval actually needs a GPU node.
    license_get_images_task = get_images(
        bucket=bucket,
        project_id=project_id,
        drive_secret_id=drive_secret_id,
        document='licencia',
        docs_gt=ground_truth_task.output,
        docs_ds=get_data_task.outputs["docs_df"],
    ).add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')

    cedula_get_images_task = get_images(
        bucket=bucket,
        project_id=project_id,
        drive_secret_id=drive_secret_id,
        document='cedula',
        docs_gt=ground_truth_task.output,
        docs_ds=get_data_task.outputs["docs_df"],
    ).add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')

    # --- Inference ------------------------------------------------------------
    license_inference_task = inference(
        project_id=project_id,
        bucket=bucket,
        document='licencia',
        model=model_licencia,
        docs_gt_artifact=license_get_images_task.outputs["docs_gt_artifact"],
    ).add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')

    cedula_inference_task = inference(
        project_id=project_id,
        bucket=bucket,
        document='cedula',
        model=model_cedula,
        docs_gt_artifact=cedula_get_images_task.outputs["docs_gt_artifact"],
    ).add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')

    # --- Postprocess ----------------------------------------------------------
    license_postprocess_task = postprocess(
        document='licencia',
        docs_gt=license_get_images_task.outputs["docs_gt_artifact"],
        docs_pred=license_inference_task.outputs["docs_pred_artifact"],
    )

    cedula_postprocess_task = postprocess(
        document='cedula',
        docs_gt=cedula_get_images_task.outputs["docs_gt_artifact"],
        docs_pred=cedula_inference_task.outputs["docs_pred_artifact"],
    )

    # --- Verification ---------------------------------------------------------
    verification_task = verification(
        project_id=project_id,
        bucket=bucket,
        docs_df_artifact=get_data_task.outputs["docs_df"],
        docs_gt_cedula_artifact=cedula_postprocess_task.outputs["docs_gt_artifact"],
        docs_pred_cedula_artifact=cedula_postprocess_task.outputs["docs_pred_artifact"],
        docs_gt_licencia_artifact=license_postprocess_task.outputs["docs_gt_artifact"],
        docs_pred_licencia_artifact=license_postprocess_task.outputs["docs_pred_artifact"],
    )

    # --- Upload results -------------------------------------------------------
    upload_results(
        project_id=project_id,
        drive_secret_id=drive_secret_id,
        id_revision=get_ids_task.outputs["id_revision"],
        df_val_artifact=verification_task.outputs["df_val_artifact"],
    )
    

In [22]:

# Compile the pipeline function into the JSON pipeline spec that the
# submission cell loads through `template_path`.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="enrollment_verification.json",
)



In [23]:
# Submit the compiled pipeline spec to Vertex AI Pipelines (`aiplatform` is
# imported in the imports cell at the top of the notebook). Caching is disabled
# so every step re-executes on each submission instead of reusing cached
# outputs.
job = aiplatform.PipelineJob(
    display_name="enrollment_verification",
    template_path="enrollment_verification.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    project=PROJECT_ID,
    location=REGION,
)

job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/1003479373544/locations/us-central1/pipelineJobs/enrollment-verification-pipeline-20230324184311
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1003479373544/locations/us-central1/pipelineJobs/enrollment-verification-pipeline-20230324184311')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/enrollment-verification-pipeline-20230324184311?project=1003479373544
