In [1]:
#!pip3 install google-cloud-aiplatform --upgrade
#!pip3 install kfp google-cloud-pipeline-components==0.1.1 --upgrade

In [2]:
# Importaciones 
from typing import NamedTuple
from kfp.v2 import dsl
import kfp
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud import bigquery
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip

In [3]:
PROJECT_ID = "tc-sc-bi-bigdata-corp-tsod-dev" 
BUCKET = "gs://image-recognition-pipeline"
REGION = "us-central1"

# Pipeline root
PIPELINE_ROOT = f"{BUCKET}/PIPELINES/image-recognition/"

In [4]:
query = """
SELECT DISTINCT transport_ord_id as SOC, i.url as url, shipment.plate_num as plate_num, 
provider.doc_id as provider_id, 
provider.doc_verify_digit as provider_verify_digit,
provider.name as provider_name, driver.doc_id as driver_id, 
driver.doc_verify_digit as driver_verify_digit,
driver.name as driver_name, driver.last_name as driver_last_name,
DATETIME(event_crte_tmst, 'America/Santiago') as event_crte_tmst, dfl_crte_tmst
FROM 
`tc-sc-bi-bigdata-corp-tsod-dev.image_recognition.btd_scha_fal_trmg_api_transport_order_temp`,
unnest(image) as i
 
WHERE
  i.url is not null
  and provider.name is not null
  and provider.doc_id is not null
  and DATE(event_crte_tmst, 'America/Santiago') = current_date() - 1
"""

# Components

In [5]:
# Copy tables
@component(
    packages_to_install=["google-cloud-bigquery",
                         "db-dtypes"],
    base_image = "python:3.9",
    output_component_file="load_data.yaml" 
)
def copy_tables(
    project_id:str,
):  
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client(project = project_id)
    
    # TODO(developer): Set source_table_id to the ID of the original table.
    source_table_id1 = "tc-sc-bi-bigdata-corp-tsod-dev.image_recognition.prediction_table_optimizado_final"
    source_table_id2 = "tc-sc-bi-bigdata-corp-tsod-dev.image_recognition.random_table_backup_optimizado_final"
    
    # TODO(developer): Set destination_table_id to the ID of the destination table.
    destination_table_id1 = "tc-sc-bi-bigdata-corp-tsod-dev.image_recognition.prediction_table_optimizado_final_copy"
    destination_table_id2 = "tc-sc-bi-bigdata-corp-tsod-dev.image_recognition.random_table_backup_optimizado_final_copy"
    
    # Eliminar tablas copiadas
    client.delete_table(destination_table_id1, not_found_ok=True)
    client.delete_table(destination_table_id2, not_found_ok=True)
    
    # Copiar tablas originales
    job = client.copy_table(source_table_id1, destination_table_id1)
    job.result()  # Wait for the job to complete.
    print("A copy of the table created.")
    job = client.copy_table(source_table_id2, destination_table_id2)
    job.result()  # Wait for the job to complete.
    print("A copy of the table created.")

In [6]:
# Load data
@component(
    packages_to_install=["pandas",
                         "google-cloud-bigquery",
                         "db-dtypes"],
    base_image = "python:3.9",
    output_component_file="load_data.yaml" 
)
def load_data(
    query:str,
    project_id:str,
    dataset:Output[Dataset],  
):
    import pandas as pd
    import datetime
    from google.cloud import bigquery
    
    client = bigquery.Client(project = project_id)
    df_base = client.query(query).to_dataframe().drop_duplicates(['url'])
    
    # Metadata
    FECHA = datetime.datetime.now().strftime("%Y-%m-%d")
    dataset.metadata["updated_date"] = FECHA
    dataset.metadata["len_data"] = len(df_base)
    
    # Save Artifact
    df_base.to_csv(dataset.path + ".csv", index = False, encoding = 'utf-8-sig')
    
    

In [7]:
# Data to bucket
@component(
    packages_to_install=["google-cloud-bigquery",
                         "google-cloud-storage",
                         "numpy",
                         "pandas",
                         "opencv-python-headless",
                         "db-dtypes"
    ],
    base_image="python:3.9",
    output_component_file="set_data_parallel.yaml"
)
def set_data_parallel(
    df_images:Input[Dataset],
    project_id:str
)-> NamedTuple("output", [("deploy", str)]):
    import datetime
    import numpy as np
    import pandas as pd
    import cv2
    import urllib
    import urllib.request
    from multiprocessing import cpu_count
    from multiprocessing.pool import ThreadPool

    from google.cloud import bigquery
    from google.cloud import storage
    
    def get_data(url):    
        try:
            url_str = url.split('/')[-1]
            url_open = urllib.request.urlopen(url)
            image_cv = np.asarray(bytearray(url_open.read()), dtype="uint8")
            image = cv2.imdecode(image_cv, cv2.IMREAD_COLOR)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            img_str = cv2.imencode('.png', image)[1].tostring()

            storage_client = storage.Client()
            bucket = storage_client.bucket('pod_images')
            # el cero es para distinguir 
            blob = bucket.blob(f'{date.year}{date.month}{date.day}/{url_str}')
            blob.upload_from_string(img_str)
        except:
            # ver el link que tira error
            pass
        
    date = datetime.datetime.now()
    client = bigquery.Client(project = project_id)
    df_images = pd.read_csv(df_images.path + ".csv")
    # los ultimos 30 para ver si la maquina sirve
    #urls = df_images.loc[:,'url'].iloc[:10]
    urls = df_images.loc[:,'url']
    
    cpus = cpu_count()
    results = ThreadPool(cpus-1).imap_unordered(get_data, urls)
    # verificar que proceso termine
    for _ in results:
        pass
    
    # Salida
    msg = "start"
    return(msg,)

In [8]:
# Blob list

@component(
    packages_to_install=["google-cloud-storage",
                         "db-dtypes"],
    base_image = "python:3.9",
    output_component_file="blob_name_list.yaml" 
)
def blob_name_list(
    bucket_name:str,
    output_path: OutputPath(),
):
    from google.cloud import storage
    #from tempfile import TemporaryFile
    import datetime

    date=datetime.datetime.now()

    #nombre carpeta
    #if date == "today":
    #    folder = str(date.year) + str(date.month) + str(date.day)
    #elif date == "another day":
    #    folder = str(year) + str(month) + str(day)
    #folder = str(2022) + str(12) + str(3) 
    folder = f'{date.year}{date.month}{date.day}'

    # inicialización
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=folder)
    # generar lista de nombres de blobs
    name_blobs_list = list(blobs)
    blobs_list = list(map(lambda x: x.name, name_blobs_list))
    
    import pickle

    with open(output_path + "blobs_list.pkl", 'wb') as file:
        pickle.dump(blobs_list, file)
    
    # Metadata
    #FECHA = datetime.datetime.now().strftime("%Y-%m-%d")
    #output_path.metadata["len_list"] = len(blobs_list)
    print(len(blobs_list))
    
    # Save Artifact

In [9]:
# Open parallel bucket
@component(
    packages_to_install=["google-cloud-storage",
                         "numpy",
                         "opencv-python-headless",
                         "db-dtypes",
                         "psutil"],
    base_image = "python:3.9",
    output_component_file="open_parallel_bucket.yaml" 
)
def open_parallel_bucket(
    input_path: InputPath(),
    output_path: OutputPath()
)-> NamedTuple("output", [("time", float)]):
    import psutil

    import pickle
    file = open(input_path + "blobs_list.pkl", 'rb')
    args = pickle.load(file)
    
    # importar tiempo
    import time
    t_ini = time.time()
    
    from google.cloud import storage
    import numpy as np
    import cv2
    # función para leer imágenes desde el bucket
    def read_image_bucket(blob_name):
        imagename = blob_name.split('/')[1]
    
        client = storage.Client()
        
        bucket = client.get_bucket('pod_images')
        blob = bucket.get_blob(blob_name)
        
        try:
            b = blob.download_as_bytes()
            image_cv = np.asarray(bytearray(b), dtype="uint8")
            image = cv2.imdecode(image_cv, cv2.IMREAD_COLOR)
            area = image.shape[0]*image.shape[1]
            url = 'https://prdadessacorptrl.blob.core.windows.net/cl-images/' + imagename
        except:
            image = np.zeros((480,480,3), np.uint8)
            area = image.shape[0]*image.shape[1]
            url = 'https://prdadessacorptrl.blob.core.windows.net/cl-images/' + imagename
            
        return(image, area, url)
    
    from multiprocessing import cpu_count
    from multiprocessing.pool import ThreadPool
    
    cpus = cpu_count()
    #########################################################
    print(cpus)
    # Getting % usage of virtual_memory ( 3rd field)
    print('RAM memory % used:', psutil.virtual_memory()[2])
    # Getting usage of virtual_memory in GB ( 4th field)
    print('RAM Used (GB):', psutil.virtual_memory()[3]/(1024.0 ** 3))
    #########################################################
    
    results = ThreadPool(cpus).imap_unordered(read_image_bucket, args)
    lista_base = []

    for result in results:
        lista_base.append(result) 
    
    # calcular cuánto se demora en procesar
    t_fin = time.time()
    delta_time = t_fin-t_ini
    
    #########################################################
    print(delta_time)
    # Getting % usage of virtual_memory ( 3rd field)
    print('RAM memory % used:', psutil.virtual_memory()[2])
    # Getting usage of virtual_memory in GB ( 4th field)
    print('RAM Used (GB):', psutil.virtual_memory()[3]/(1024.0 ** 3))
    #########################################################

    import pickle

    with open(output_path + "image_bgr_list.pkl", 'wb') as file:
        pickle.dump(lista_base, file)
    
    # Metadata
    #FECHA = datetime.datetime.now().strftime("%Y-%m-%d")
    #dataset.metadata["updated_date"] = FECHA
    
    # Save Artifact
    #new_df_base_final.to_csv(dataset.path + ".csv", index = False, encoding = 'utf-8-sig')
    return(delta_time,)

In [10]:
# Prediction
@component(
    packages_to_install=["pandas",
                         "numpy",
                         "google-cloud-bigquery",
                         "google-cloud-storage",
                         "db-dtypes",
                         "scikit-image",
                         "psutil",
                         "opencv-python-headless"],
    base_image = "gcr.io/deeplearning-platform-release/pytorch-gpu",
    output_component_file="prediction.yaml" 
)
def prediction(
    project_id:str,
    input_path:InputPath(),
    dataset:Output[Dataset] 
)-> NamedTuple("output", [("time", float)]):
    import psutil
    
    import pickle
    
    file = open(input_path + "blobs_list.pkl", 'rb')
    blobs_list = pickle.load(file)
    
    from google.cloud import storage
    
    import torch
    import torchvision
    #from torchvision.models import resnext101_32x8d
    from torchvision.models import resnext101_32x8d, ResNeXt101_32X8D_Weights
    from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
    from torch.utils.data import DataLoader
    
    #########################################################
    # Getting % usage of virtual_memory ( 3rd field)
    print('RAM memory % used:', psutil.virtual_memory()[2])
    # Getting usage of virtual_memory in GB ( 4th field)
    print('RAM Used (GB):', psutil.virtual_memory()[3]/(1024.0 ** 3))
    #########################################################

    # Definir resnext
    #resnext1 = resnext101_32x8d(pretrained=True)
    #resnext2 = resnext101_32x8d(pretrained=True)
    weights = ResNeXt101_32X8D_Weights.DEFAULT
    resnext1 = resnext101_32x8d(weights)
    resnext2 = resnext101_32x8d(weights)
    
    # Obtener clases para modelos
    def download_blob(bucket_name, source_blob_name, destination_file_name, project_id):
        """Downloads a blob from the bucket."""
        storage_client = storage.Client(project= project_id)
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        blob.download_to_filename(destination_file_name)
        print(
            "Downloaded storage object {} from bucket {} to local file {}.".format(
                source_blob_name, bucket_name, destination_file_name
            )
        )
    bucket = "librerias_pod_images" 
    download_blob(bucket, "models_import2.py", "models_import2.py", project_id)
    
    from models_import2 import MultilabelClassifier1, MultilabelClassifier2
    
    # cargar modelo para cara y n° domicilio
    download_blob(bucket, "Models/mlc_model_4.pth", "mlc_model_4.pth", project_id)
    device = 'cuda'
    PATH_1 = 'mlc_model_4.pth'
    ml_model_1 = MultilabelClassifier1(3, resnext1).to(device)
    ml_model_dict_1 = torch.load(PATH_1, map_location=torch.device(device))
    ml_model_1.load_state_dict(ml_model_dict_1['model_state_dict'])
    
    # cargar modelo para etiqueta del producto
    download_blob(bucket, 
                  "Models/mlc_model_baseline_data_revisada_1_2.pth", 
                  "mlc_model_baseline_data_revisada_1_2.pth", project_id)
    device = 'cuda'
    PATH_2 = 'mlc_model_baseline_data_revisada_1_2.pth'
    ml_model_2 = MultilabelClassifier2(3, resnext2).to(device)
    ml_model_dict_2 = torch.load(PATH_2, map_location=torch.device(device))
    ml_model_2.load_state_dict(ml_model_dict_2['model_state_dict'])
    
    def get_object_detection_model(num_classes):
        # load a model pre-trained on COCO
        model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
        # get number of input features for the classifier
        in_features = model.roi_heads.box_predictor.cls_score.in_features
        # replace the pre-trained head with a new one
        model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes) 
        return model
    
    # cargar modelo detector de objetos
    download_blob(bucket, 
                  "Models/model_cf_12y3_1.pth", 
                  "model_cf_12y3_1.pth", project_id)
    device = 'cuda'
    model_path = 'model_cf_12y3_1.pth'
    num_classes = 2
    od_model = get_object_detection_model(num_classes)
    od_model_dict = torch.load(model_path, map_location=torch.device(device))
    od_model.load_state_dict(od_model_dict['model_state_dict'])
    
    #########################################################
    # Getting % usage of virtual_memory ( 3rd field)
    print('RAM memory % used:', psutil.virtual_memory()[2])
    # Getting usage of virtual_memory in GB ( 4th field)
    print('RAM Used (GB):', psutil.virtual_memory()[3]/(1024.0 ** 3))
    #########################################################
    
    from models_import2 import PredictionDataset
    
    # se generan batch para clasificador multi-etiqueta y detector de objetos
    prediction_set_mlc = PredictionDataset(bucket_name = 'pod_images', blob_name_list = blobs_list,
                                           normalization = 'mlc', device = 'cuda')
    prediction_set_od = PredictionDataset(bucket_name = 'pod_images', blob_name_list = blobs_list,
                                          normalization = 'od', device = 'cuda')

    # para aumentar más el batch_size hay que agregar más memoria
    predictionLoader_mlc = DataLoader(prediction_set_mlc, batch_size=32, num_workers=2,
                                      shuffle = False, pin_memory = True)
    predictionLoader_od = DataLoader(prediction_set_od, batch_size=32, num_workers=2,
                                     shuffle = False, pin_memory = True)
    
    import itertools
    
    def bbox_function(b, area_img):
        l =list(range(b.shape[0]))
        # recorrer cajas y calcular inter-area para cada combinación
        indice = 0
        interArea = 0
        for i in itertools.combinations(l, r=2):
            # obtener coordenadas de la inter-area
            x0 = max(b[i[0]][0], b[i[1]][0])
            y0 = max(b[i[0]][1], b[i[1]][1])
            x1 = min(b[i[0]][2], b[i[1]][2])
            y1 = min(b[i[0]][3], b[i[1]][3])
    
            # calcular inter-area
            dif_x = x0-x1
            dif_y = y0-y1
            # se verifica que las esquinas de la interArea esten bien ubicadas 
            if dif_x < 0 and dif_y < 0:
                interArea += dif_x*dif_y
                ##interArea += abs(x0-x1)*abs(y0-y1)
        
        # sumar areas de cada bbox
        area_total_bbox = 0
        for box in b:
            # calcular area de cada bbox
            area_bbox = abs(box[0]-box[2])*abs(box[1]-box[3])
            # calcular area total de bbox
            area_total_bbox += area_bbox
    
        # calcular la union de las areas de cada bbox
        union = area_total_bbox - interArea
    
        # calcular contexto
        if torch.is_tensor(union):
            contexto = union.item()/area_img
        
        else:
            contexto = union/area_img
        
        return contexto

    import pandas as pd
    import numpy as np

    def score(predictionLoader_mlc, predictionLoader_od, classificator_1, classificator_2, 
              detector = od_model, pesos = {'w_prod': 0.4, 'w_notface': 0.1,'w_label': 0.3, 'w_num': 0.1, 
                                            'w_contx': 0.2},
              thresholds = {'t_prod': 0.5, 't_face': 0.5, 't_label': 0.5, 't_num': 0.5, 't_ctx_down': 0.2, 
                            't_ctx_up': 0.65}, device = 'cuda'):
        """
        Args:
            classificator_1 (modelo): clasificador multi-etiqueta para detectar la etiqueta del paquete
            classificator_2 (modelo): clasificador multi-etiqueta para detectar la cara y el domicilio
            detector (modelo): detector de objetos para identificar el paquete y su bbox respectivo
            pesos (dict): diccionario con valores para cada peso, es decir,
            w_prod (product), w_notface (without face), w_label (product label), 
            w_num (address number) y w_contx (context)
            thresholds = diccionario con valores de umbral para cada criterio, es decir,
            t_prod (product), t_face (face detector), t_label (product label) y 
            t_num (address number)
            
        Obs: los pesos deben sumar 1 para todos los criterios menos el del numero de domicilio. Este
            ultimo corresponde a un beneficio de +0.1 si es que aparece en la fotografía.
    
        Returns:
            result_data (dataFrame): cada una de las columnas del dataFrame corresponde a la predicción
            de cada criterio sobre cierto umbral, los scores (confianza del modelo) de cada criterio y 
            la nota de la foto (score).
        """
        # evaluar con gpu o cpu 
        device = torch.device(device)
        
        classificator_1.to(device)
        classificator_1.eval()  
        classificator_2.to(device)
        classificator_2.eval()           
        detector.to(device)
        detector.eval()
        df_base = pd.DataFrame(columns = ['url','paquete', 's_paquete', 'etiqueta_producto',
                                              's_etiqueta_producto', 'sin_rostro', 's_sin_rostro',
                                              'numero_domicilio', 's_numero_domicilio', 'contexto',
                                              'ctx_value', 'score'])
        with torch.no_grad():
            for i, image in enumerate(zip(predictionLoader_mlc, predictionLoader_od)):
                # se obtiene la imagen de cada dataLoader
                image_mlc = image[0][0]
                image_od = image[1][0]
                
                # obtener areas y urls de las imagenes
                area_img = image[0][1]
                #img_url = np.asarray(image[0][2]).reshape(-1,1)
                img_url = list(image[0][2])
        
                # CLASIFICACION MULTI-ETIQUETA
                #---------------------------clasificador 1--------------------------------
                label_base = classificator_1(image_mlc.to(device))
                #---------------------------clasificador 2--------------------------------
                label_etiqueta = classificator_2(image_mlc.to(device))
                # obtener score
                score_etiqueta = torch.sigmoid(label_etiqueta['label']).squeeze()
                score = torch.sigmoid(label_base['label']).squeeze()
                # condición para cuando se tiene un batch de 1 elemento
                if score_etiqueta.dim() == 1:
                    score_etiqueta = torch.unsqueeze(score_etiqueta, dim=0)
                    score = torch.unsqueeze(score, dim=0)
                # obtener predicción etiqueta
                s_etiqueta = score_etiqueta[:,0]
                etiqueta = torch.where(s_etiqueta >= thresholds['t_label'], 1, 0)
                # obtener predicción domicilio
                s_domicilio = score[:,1]
                domicilio = torch.where(s_domicilio >= thresholds['t_num'], 1, 0)
                # obtener predicción cara
                s_cara = score[:,2]
                s_no_cara = 1-s_cara
                no_cara = torch.where(s_no_cara >= thresholds['t_face'], 1, 0)
                
                etiqueta = etiqueta.reshape(-1,1).to('cpu').numpy()
                s_etiqueta = s_etiqueta.reshape(-1,1).to('cpu').numpy()
                domicilio = domicilio.reshape(-1,1).to('cpu').numpy()
                s_domicilio = s_domicilio.reshape(-1,1).to('cpu').numpy()
                no_cara = no_cara.reshape(-1,1).to('cpu').numpy()
                s_no_cara = s_no_cara.reshape(-1,1).to('cpu').numpy()
                
                #-----------------------------------------------------------------------------
                # DETECTOR DE OBJETOS
                od_prediction = od_model(image_od.to(device))
                dataFrame = pd.DataFrame(od_prediction).to_dict(orient="list")
                list_of_lists = list(map(lambda x: x.tolist(), dataFrame['boxes']))
                array_of_arrays1= np.array(list(map(lambda x: x.to('cpu').numpy(), dataFrame['boxes'])))
                array_of_arrays2= np.array(list(map(lambda x: x.to('cpu').numpy(), dataFrame['scores'])))
                #bla = list(map(lambda x: torch.where(x >= 0.5, 1, 0), dataFrame['scores']))
                bla2 = np.array(list(map(lambda x: np.where(x.to('cpu').numpy()>=thresholds['t_prod'],1,
                                                            0).reshape(-1,1),
                                         dataFrame['scores'])))
                producto = array_of_arrays1*bla2
                producto_wz = np.array(list(map(lambda x: x[~np.all(x == 0, axis=1)], producto)))
                contexto = np.array(list(map(lambda x,y: bbox_function(x,y), producto_wz, area_img)))
                contexto = contexto.reshape(-1,1)
                ctx_value = np.where(((contexto >= thresholds['t_ctx_down']) & (contexto <= thresholds['t_ctx_up'])),
                                     1, 0) 
                s_paquete = torch.tensor(list(map(lambda x: max(x, default=0), dataFrame['scores'])))
                s_paquete = s_paquete.reshape(-1,1)
                paquete = torch.where(s_paquete >= thresholds['t_prod'], 1, 0)
                # Obtener array de resultados
                result = np.concatenate([paquete, s_paquete, etiqueta, s_etiqueta, no_cara, s_no_cara,
                                        domicilio, s_domicilio, contexto, ctx_value], axis = 1)
                
                # LLevarlo a un dataFrame
                df = pd.DataFrame(result, columns=['paquete','s_paquete','etiqueta_producto', \
                                                               's_etiqueta_producto', 'sin_rostro', 's_sin_rostro',\
                                                               'numero_domicilio', 's_numero_domicilio', 'contexto',\
                                                               'ctx_value'])
                
                pesos = {'w_prod': 0.4, 'w_notface': 0.1,'w_label': 0.3, 'w_num': 0.1, 'w_contx': 0.2}
                df['score'] = df.paquete*pesos['w_prod'] + df.etiqueta_producto*pesos['w_label'] + \
                df.sin_rostro*pesos['w_notface'] + df.numero_domicilio*pesos['w_num'] + df.ctx_value*pesos['w_contx']
                df.insert(loc=0, column='url', value=img_url)
                # concatenar a df anterior
                df_base = pd.concat((df_base,df), ignore_index= True)
        return df_base

    # importar tiempo
    import time
    t_ini = time.time()
    
    df_prediction = score(predictionLoader_mlc, predictionLoader_od, ml_model_1 , ml_model_2, 
                          detector = od_model,pesos = {'w_prod': 0.4, 'w_notface': 0.1,
                                                       'w_label': 0.3, 'w_num': 0.1, 'w_contx': 0.2},
                          thresholds = {'t_prod': 0.5, 't_face': 0.6491, 't_label': 0.3, 't_num': 0.6, 
                                        't_ctx_down': 0.2, 't_ctx_up': 0.65}, device = 'cuda')
    
    # calcular cuánto se demora en procesar
    t_fin = time.time()
    delta_time = t_fin-t_ini
    
    #########################################################
    print(delta_time)
    # Getting % usage of virtual_memory ( 3rd field)
    print('RAM memory % used:', psutil.virtual_memory()[2])
    # Getting usage of virtual_memory in GB ( 4th field)
    print('RAM Used (GB):', psutil.virtual_memory()[3]/(1024.0 ** 3))
    #########################################################
    
    # Metadata
    import datetime
    FECHA = datetime.datetime.now().strftime("%Y-%m-%d")
    dataset.metadata["updated_date"] = FECHA
    
    # Save Artifact
    df_prediction.to_csv(dataset.path + ".csv", index = False, encoding = 'utf-8-sig')
    
    return(delta_time,)
    

In [11]:
# Table generator
@component(
    packages_to_install=["pandas"],
    base_image = "python:3.9",
    output_component_file="table_generation.yaml" 
)
def table_generation(
    delta_time1: float,
    df_images:Input[Dataset],
    df_base:Input[Dataset],
    dataset:Output[Dataset],  
):
    import pandas as pd
    import datetime
    
    delta_time = delta_time1
    
    df_images = pd.read_csv(df_images.path + ".csv")
    df_base = pd.read_csv(df_base.path + ".csv") # prediction
    
    inner_merged = pd.merge(df_images, df_base, on=["url"])
    df_base_copy1 = inner_merged.copy()
    # normalización
    #agregar indice
    largo_dataset = len(df_base_copy1)
    df_base_copy1['index'] = df_base_copy1.index
    # Cambiar tipo de dato de "score" a float
    df_base_copy1["score"] = pd.to_numeric(df_base_copy1["score"])
    # Seleccionar SOC que tiene mayor "score"
    idx_max_score = df_base_copy1.groupby(['SOC'])['score'].transform(max) == df_base_copy1['score']
    # Agrupar por RUT, normalizar los nombres y dejar indice
    #df_base_copy3 = df_base_copy1[['provider_name', 'provider_id']].groupby(['provider_id'], sort = False)['provider_name'].transform('first').to_frame() # primer valor
    df_base_copy1.provider_name = df_base_copy1.provider_name.str.strip() # quitar espacios al inicio y final
    df_base_copy1.provider_name = df_base_copy1.provider_name.str.upper() # dejar todo en mayuscula
    df_base_copy3 = df_base_copy1[['provider_name', 'provider_id']].groupby(['provider_id'], sort = False)['provider_name'].transform(lambda x: pd.Series.mode(x)[0]).to_frame() # valor más frecuente
    df_base_copy3['index'] = df_base_copy1['index']
    # "merge" del dataset normalizado y el dataset original, con respecto al indice
    df_base_copy4 = pd.merge(df_base_copy3, df_base_copy1, how='left', on='index')
    df_base_copy4.provider_name_y = df_base_copy4.provider_name_x
    df_base_copy4 = df_base_copy4.drop(columns=["provider_name_x"])
    df_base_copy4.rename(columns = {'provider_name_y':'provider_name'}, inplace = True)
    # "merge" del dataset agrupado por SOC y el dataset anterior, con respecto al indice
    df_base_final = df_base_copy1[idx_max_score].drop_duplicates(['SOC'])
    ##df_base_final = df_base_copy4[idx_max_score].drop_duplicates(['SOC'])
    # arreglar fechas
    ##df_base_final['dfl_crte_tmst'] = df_base_final['dfl_crte_tmst'].dt.tz_localize(None)
    ##df_base_final['event_crte_tmst'] = df_base_final['event_crte_tmst'].dt.tz_localize(None)
    #df_base_final['dfl_crte_tmst'] = pd.to_datetime(df_base_final['dfl_crte_tmst'], utc=True).dt.tz_localize(None)
    #df_base_final['event_crte_tmst'] = pd.to_datetime(df_base_final['event_crte_tmst'], utc=True).dt.tz_localize(None)
    # guardar largo del dataset
    new_df_base_final = df_base_final.assign(len_data = largo_dataset)
    # guardar tiempo de ejecución
    new_df_base_final = new_df_base_final.assign(execution_time_model = delta_time)
    
    # Metadata
    FECHA = datetime.datetime.now().strftime("%Y-%m-%d")
    dataset.metadata["updated_date"] = FECHA
    
    # Save Artifact
    new_df_base_final.to_csv(dataset.path + ".csv", index = False, encoding = 'utf-8-sig')

In [12]:
# Save tables
@component(
    packages_to_install=["pandas",
                         "pandas-gbq",
                         "google-cloud-bigquery",
                         "db-dtypes"],
    base_image = "python:3.9",
    output_component_file="save_tables.yaml" 
)
def save_tables(
    new_df_base_final:Input[Dataset],
    n_r:int,
    project_id:str,
    dataset:str,
    full_table:str, 
    random_table:str, 
    random_table_backup:str,  
    prediction_if_exists:str,
    random_if_exists:str, 
    random_backup_if_exists:str,
    output_dataset: Output[Dataset],  
):
    import pandas as pd
    import datetime
    
    new_df_base_final = pd.read_csv(new_df_base_final.path + ".csv")
    
    # cambiar formato de columnas
    # int to string
    new_df_base_final['SOC'] = new_df_base_final['SOC'].astype(str)
    # string to timestamp
    new_df_base_final['dfl_crte_tmst'] = pd.to_datetime(new_df_base_final['dfl_crte_tmst'], utc=True).dt.tz_localize(None)
    new_df_base_final['event_crte_tmst'] = pd.to_datetime(new_df_base_final['event_crte_tmst'], utc=True).dt.tz_localize(None)
    
    # generar tabla aleatoria
    df_copy = new_df_base_final.copy()
    #df_filtrado = df_copy[(df_copy.enlace != 'incorrecto')]
    df_copy=df_copy.assign(paquete_em="", etiqueta_em="", domicilio_em="", rostro_em="")
    import pandas_gbq
    # ubicación de destino para tabla procesada
    destination_full_table = dataset + '.' + full_table
    # anexar tabla de predicciones
    pandas_gbq.to_gbq(new_df_base_final, destination_full_table, project_id=project_id, if_exists = prediction_if_exists)
    #print("Se almacenó exitosamente tabla de predicciones")
    # generar tabla aleatoria
    try:
        # seleccionar pequeño conjunto de datos aleatorios de cada sample
        # intentar generar tabla aleatoria
        random_sample = df_copy.sample(n_r, frac=None, replace=False, weights=None, random_state=None)
        # destino de tabla aleatoria
        destination_random_table = dataset + '.' + random_table
        # anexar tabla aleatoria a tabla generada durante el dia
        pandas_gbq.to_gbq(random_sample, destination_random_table, project_id=project_id, 
                          if_exists = random_if_exists)
        print("Se almacenó exitosamente sub-tabla aleatoria")
        # anexar tabla aleatoria, con todas las tablas aleatorias generadas en dias anteriores
        # para tenerlas de respaldo
        destination_random_table_backup = dataset + '.' + random_table_backup
        pandas_gbq.to_gbq(random_sample, destination_random_table_backup, project_id=project_id, 
                          if_exists = random_backup_if_exists)  
        print("Se almacenó exitosamente sub-tabla aleatoria al backup")
    except:
        print("No se pudo generar tabla aleatoria (muy pocas filas)")
    
    # Metadata
    FECHA = datetime.datetime.now().strftime("%Y-%m-%d")
    output_dataset.metadata["updated_date"] = FECHA
    
    # Save Artifact
    new_df_base_final.to_csv(output_dataset.path + ".csv", index = False, encoding = 'utf-8-sig')

# Definir Máquinas

In [13]:
from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_from_component

# Convert the above component into a custom training job
ctj_set_data = create_custom_training_job_from_component(
    set_data_parallel,
    display_name = 'set_data_parallel',
    machine_type = 'n1-highcpu-16'
)

ctj_image_bgr_list = create_custom_training_job_from_component(
    open_parallel_bucket,
    display_name = 'open_parallel_bucket',
    machine_type = 'n1-highmem-16'
)

ctj_prediction_op = create_custom_training_job_from_component(
    prediction,
    display_name = 'open_parallel_bucket',
    machine_type = 'n1-highmem-16',
    accelerator_type='NVIDIA_TESLA_T4'
)

# Build Pipeline

In [14]:
@dsl.pipeline(
    pipeline_root = PIPELINE_ROOT,
    name = "pipeline-image-recognition",
)
def pipeline(
    query:str = query,
    project_id:str = PROJECT_ID,
    bucket_name:str = 'pod_images',
    n_r:int = 200,
    dataset:str = 'image_recognition',
    full_table:str = 'prediction_table_optimizado_final', 
    random_table:str = 'random_table_optimizado_final', 
    random_table_backup:str  = 'random_table_backup_optimizado_final',  
    prediction_if_exists:str = 'append',
    random_if_exists:str = 'replace', 
    random_backup_if_exists:str = 'append'
):
    # crear backup
    backup = copy_tables(project_id)
    # obtener data
    data_op = load_data(query, project_id)
    # cargar imagenes al bucket
    set_data_parallel(data_op.output, project_id)
    set_data = ctj_set_data(project=project_id).set_args(data_op.output, project_id).set_display_name('ctj_set_data')
    ##set_data = set_data_parallel(data_op.output, project_id).set_cpu_limit('16').set_memory_limit('14G') #n1-highcpu-16 $0.453 hourly
    # una vez se terminan de cargar los datos, comienza la etapa de extracción
    with dsl.Condition(
        set_data.outputs["deploy"] == "start",
        name = "leer-imagenes",
    ):
        # obtener lista de nombres
        blob_list = blob_name_list(bucket_name)
        # obtener lista de imagenes
        image_bgr_list = ctj_image_bgr_list(project=project_id,
                                           ).set_display_name('ctj_image_bgr_list').inputs(blob_list.output)
        ##image_bgr_list = open_parallel_bucket(blob_list.output).set_cpu_limit('16').set_memory_limit('104G') #n1-highmem-16 $0.719 hourly
        # image_bgr_list.output
        # obtener DataFrame de predicciones
        #n1-standard-8 + 1 NVIDIA T4
        prediction_op = ctj_prediction_op(project=project_id,
                                           ).set_display_name('ctj_prediction_op').inputs(project_id, image_bgr_list.outputs["output_path"])
        ##prediction_op = prediction(project_id, image_bgr_list.outputs["output_path"]).set_cpu_limit('8').set_memory_limit('104G').add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
        #prediction_op.output
        # obtener tablas: prediction, random y random backup
        df_final = table_generation(image_bgr_list.outputs["time"], prediction_op.outputs["time"], 
                                    data_op.output, prediction_op.outputs["dataset"])
        # guardar tablas en BigQuery
        save_tables(df_final.output, n_r, project_id, dataset, full_table, random_table, 
                    random_table_backup, prediction_if_exists, random_if_exists, 
                    random_backup_if_exists)

In [14]:
@dsl.pipeline(
    pipeline_root = PIPELINE_ROOT,
    name = "pipeline-image-recognition2",
)
def pipeline(
    query:str = query,
    project_id:str = PROJECT_ID,
    bucket_name:str = 'pod_images',
    n_r:int = 1000,
    dataset:str = 'image_recognition',
    full_table:str = 'prediction_table_optimizado_final', 
    random_table:str = 'random_table_optimizado_final', 
    random_table_backup:str  = 'random_table_backup_optimizado_final',  
    prediction_if_exists:str = 'append',
    random_if_exists:str = 'replace', 
    random_backup_if_exists:str = 'append'
):
    # crear backup
    backup = copy_tables(project_id)
    # obtener data
    data_op = load_data(query, project_id)
    # cargar imagenes al bucket
    set_data = set_data_parallel(data_op.output, project_id).set_cpu_limit('32').set_memory_limit('15G') 
    #e2-highcpu-16
    # una vez se terminan de cargar los datos, comienza la etapa de extracción
    with dsl.Condition(
        set_data.outputs["deploy"] == "start",
        name = "leer-imagenes",
    ):
        # obtener lista de nombres
        blob_list = blob_name_list(bucket_name)
        # obtener DataFrame de predicciones
        prediction_op = prediction(project_id, blob_list.output).set_cpu_limit('4').set_memory_limit('15G').add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
        #n1-standard-8 + 1 NVIDIA T4
        # obtener tablas: prediction, random y random backup
        df_final = table_generation(prediction_op.outputs["time"], 
                                    data_op.output, prediction_op.outputs["dataset"])
        # guardar tablas en BigQuery
        save_tables(df_final.output, n_r, project_id, dataset, full_table, random_table, 
                    random_table_backup, prediction_if_exists, random_if_exists, 
                    random_backup_if_exists)    

In [14]:
@dsl.pipeline(
    pipeline_root = PIPELINE_ROOT,
    name = "pipeline-image-recognition2",
)
def pipeline(
    query:str = query,
    project_id:str = PROJECT_ID,
    bucket_name:str = 'pod_images',
    n_r:int = 1000,
    dataset:str = 'image_recognition',
    full_table:str = 'prediction_table_optimizado_final', 
    random_table:str = 'random_table_optimizado_final', 
    random_table_backup:str  = 'random_table_backup_optimizado_final',  
    prediction_if_exists:str = 'append',
    random_if_exists:str = 'replace', 
    random_backup_if_exists:str = 'append'
):

    # obtener data
    data_op = load_data(query, project_id)


In [15]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path='pipeline_ImageRecognition2.json')



In [16]:
start_pipeline = pipeline_jobs.PipelineJob(
    display_name = 'pipeline-image-recognition2',
    template_path = 'pipeline_ImageRecognition2.json',
    enable_caching = False, # poner False cuando se pase a producción
    location = REGION,
)

In [17]:
start_pipeline.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/1003479373544/locations/us-central1/pipelineJobs/pipeline-image-recognition2-20230117211227
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1003479373544/locations/us-central1/pipelineJobs/pipeline-image-recognition2-20230117211227')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-image-recognition2-20230117211227?project=1003479373544


# Pipeline lectura por batches

In [None]:
@kfl.dsl.pipeline(
    pipeline_root = PIPELINE_ROOT,
    name = 'pipeline-ImageReconition',
)
def pipeline(
    query:str,
    project_id:str = PROJECT_ID,
    date:str,
    bucket_name:str = 'pod_images',
    year:int,
    month:int,
    day:int,
    n_p:int = 10,
    n_r:int = 1,
    dataset:str = 'image_recognition',
    full_table:str = 'prediction_table_optimizado_final_pipeline', 
    random_table:str = 'random_table_optimizado_final_pipeline', 
    random_table_backup:str  = 'random_table_backup_optimizado_final_pipeline',  
    prediction_if_exists:str = 'append',
    random_if_exists:str = 'replace', 
    random_backup_if_exists:str = 'append'
):
    data_op = load_data(query, project_id)
    blob_list = blob_name_list(date, bucket_name, year, month, day)
    # particionar lista de nombres
    partitions = [blob_list.output[i:i + n_p] for i in range(0, len(blob_list.output[0:100]), n_p)]
    # df_base
    #df_prediction = pd.DataFrame(columns = ['url','paquete', 's_paquete', 'etiqueta_producto',
    #                                      's_etiqueta_producto', 'sin_rostro', 's_sin_rostro',
    #                                      'numero_domicilio', 's_numero_domicilio', 'contexto',
    #                                      'ctx_value', 'score'])
    # recorrer particiones (sub-listas) generadas
    for partition in partitions:
        # guardar imágenes en una lista
        image_bgr_list = open_parallel_bucket(partition, bucket_name)
        # se genera DataFrame
        prediction_op = prediction(project_id, image_bgr_list.output)
        # unión de DataFrames
        #df_prediction = pd.concat((df_prediction, prediction_op.output), ignore_index= True)
        df_prediction = prediction_op.output
        print('se ejecutó partición')
    
    df_final = table_generation(data_op.output, df_prediction)
    save_tables(df_final.output, n_r, project_id, dataset, full_table, random_table, 
                random_table_backup, prediction_if_exists, random_if_exists, 
                random_backup_if_exists)

SyntaxError: non-default argument follows default argument (1048305445.py, line 6)