In [40]:
! pip3 install --upgrade google-cloud-aiplatform  \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components



In [15]:
from kfp import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform as aip
from typing import NamedTuple

In [2]:
# --- Google Cloud project / resource identifiers -----------------------------
PROJECT_ID = "gcp-ccai-auto-ml-contactcenter"
REGION = "europe-west3"
REPO_NAME = "repo-demo3"  # repository segment of the component image paths
SERVICE_ACCOUNT = "944308723981-compute@developer.gserviceaccount.com"
BUCKET_NAME = "ccai-storage"

# --- Pipeline naming ---------------------------------------------------------
PIPELINE_NAME = "processing_pipeline"
YAML_NAME = PIPELINE_NAME + ".yml"  # file the compiled pipeline is written to
PIPELINE_ROOT = "gs://" + BUCKET_NAME + "/pipeline_root/"
DISPLAY_NAME = "-".join(PIPELINE_NAME.split("_"))  # underscores -> hyphens

# Echo the resolved pipeline root as the cell output.
PIPELINE_ROOT

'gs://ccai-storage/pipeline_root/'

In [3]:
# Set the gcloud CLI's active project (the core/project property) to PROJECT_ID.
# `{PROJECT_ID}` is interpolated by IPython from the Python variable of that name.
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [4]:
!gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_NAME
!gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_NAME

CommandException: Incorrect public member type for binding ccai-storage:
CommandException: Incorrect public member type for binding ccai-storage:


In [5]:
# Initialise the Vertex AI SDK with this notebook's project, region and
# staging bucket.
aip.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=PIPELINE_ROOT,
)

In [6]:
@component(base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/data_preparation:latest")
def data_preprocessing(
    bucket_name: str,
    file_path: str,
    folder: str,
    parquet_file_name: str,
    processed_dataset: Output[Artifact],
):
    """Run ``GCSParquetLoader`` and publish its result as an artifact.

    The four string arguments are passed positionally, in order, to
    ``GCSParquetLoader``. Whatever ``save_df_to_gcs_parquet()`` returns is
    stored as the URI of ``processed_dataset``.

    Args:
        bucket_name: Forwarded to ``GCSParquetLoader``.
        file_path: Forwarded to ``GCSParquetLoader``.
        folder: Forwarded to ``GCSParquetLoader``.
        parquet_file_name: Forwarded to ``GCSParquetLoader``.
        processed_dataset: Output artifact whose ``uri`` is overwritten.
    """
    # Imports are local to the function body: ``processing`` is expected to be
    # provided by the component's base image, not by the notebook kernel.
    import logging

    from processing.data_preparation import GCSParquetLoader

    logging.basicConfig(format='%(levelname)s: %(message)s\n', level=logging.INFO)

    loader = GCSParquetLoader(bucket_name, file_path, folder, parquet_file_name)
    result_uri = loader.save_df_to_gcs_parquet()
    processed_dataset.uri = result_uri
    print("--")


In [7]:
@component(base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/data_tokenization:latest")
def data_tokenization(
    bucket_name: str,
    file_path: Input[Artifact],
    folder: str,
    parquet_file_name: str,
    tokenized_dataset: Output[Artifact],
):
    """Run ``TokenizationProcessor`` on an upstream artifact and publish the result.

    ``TokenizationProcessor`` receives, positionally, the bucket name, the URI
    of the input artifact, the folder and the parquet file name. The return
    value of ``save_df_to_gcs_parquet()`` becomes the URI of
    ``tokenized_dataset``.

    Args:
        bucket_name: Forwarded to ``TokenizationProcessor``.
        file_path: Input artifact; only its ``uri`` is used.
        folder: Forwarded to ``TokenizationProcessor``.
        parquet_file_name: Forwarded to ``TokenizationProcessor``.
        tokenized_dataset: Output artifact whose ``uri`` is overwritten.
    """
    # Local imports: ``processing`` is expected to come from the base image.
    import logging

    from processing.tokenization import TokenizationProcessor

    logging.basicConfig(format='%(levelname)s: %(message)s\n', level=logging.INFO)

    tokenizer = TokenizationProcessor(bucket_name, file_path.uri, folder, parquet_file_name)

    print("--")
    result_uri = tokenizer.save_df_to_gcs_parquet()
    tokenized_dataset.uri = result_uri


In [8]:
@component(base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/data_sentiment:latest")
def data_sentiment(
    bucket_name: str,
    file_path: Input[Artifact],
    folder: str,
    parquet_file_name: str,
    text_column: str,
    num_doc: int,
    sentiment_dataset: Output[Artifact],
):
    """Run ``GCSSentimentAnalyzer`` on an upstream artifact and publish the result.

    ``GCSSentimentAnalyzer`` receives, positionally, the bucket name, the URI
    of the input artifact, the folder, the parquet file name, the text column
    and ``num_doc``. The return value of ``process()`` becomes the URI of
    ``sentiment_dataset``.

    Args:
        bucket_name: Forwarded to ``GCSSentimentAnalyzer``.
        file_path: Input artifact; only its ``uri`` is used.
        folder: Forwarded to ``GCSSentimentAnalyzer``.
        parquet_file_name: Forwarded to ``GCSSentimentAnalyzer``.
        text_column: Forwarded to ``GCSSentimentAnalyzer``.
        num_doc: Forwarded to ``GCSSentimentAnalyzer``.
        sentiment_dataset: Output artifact whose ``uri`` is overwritten.
    """
    # Local imports: ``processing`` is expected to come from the base image.
    import logging

    from processing.sentiment import GCSSentimentAnalyzer

    logging.basicConfig(format='%(levelname)s: %(message)s\n', level=logging.INFO)

    analyzer = GCSSentimentAnalyzer(
        bucket_name,
        file_path.uri,
        folder,
        parquet_file_name,
        text_column,
        num_doc,
    )

    print("--")
    result_uri = analyzer.process()
    sentiment_dataset.uri = result_uri


In [9]:
@component(base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/data_moderate:latest")
def data_moderate(
    bucket_name: str,
    file_path: Input[Artifact],
    folder: str,
    parquet_file_name: str,
    text_column: str,
    num_doc: int,
    moderate_dataset: Output[Artifact],
):
    """Run ``GCSTextModerationLoader`` on an upstream artifact and publish the result.

    ``GCSTextModerationLoader`` receives, positionally, the bucket name, the
    URI of the input artifact, the folder, the parquet file name, the text
    column and ``num_doc``. The return value of ``process()`` becomes the URI
    of ``moderate_dataset``.

    Args:
        bucket_name: Forwarded to ``GCSTextModerationLoader``.
        file_path: Input artifact; only its ``uri`` is used.
        folder: Forwarded to ``GCSTextModerationLoader``.
        parquet_file_name: Forwarded to ``GCSTextModerationLoader``.
        text_column: Forwarded to ``GCSTextModerationLoader``.
        num_doc: Forwarded to ``GCSTextModerationLoader``.
        moderate_dataset: Output artifact whose ``uri`` is overwritten.
    """
    # Local imports: ``processing`` is expected to come from the base image.
    import logging

    from processing.moderate import GCSTextModerationLoader

    logging.basicConfig(format='%(levelname)s: %(message)s\n', level=logging.INFO)

    moderator = GCSTextModerationLoader(
        bucket_name,
        file_path.uri,
        folder,
        parquet_file_name,
        text_column,
        num_doc,
    )

    print("--")
    result_uri = moderator.process()
    moderate_dataset.uri = result_uri


In [10]:
@component(base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/data_entities:latest")
def data_entities(
    bucket_name: str,
    file_path: Input[Artifact],
    folder: str,
    parquet_file_name: str,
    text_column: str,
    num_doc: int,
    entities_dataset: Output[Artifact],
):
    """Run ``GCSCEntityAnalyzer`` on an upstream artifact and publish the result.

    ``GCSCEntityAnalyzer`` receives, positionally, the bucket name, the URI of
    the input artifact, the folder, the parquet file name, the text column and
    ``num_doc``. The return value of ``process()`` becomes the URI of
    ``entities_dataset``.

    Args:
        bucket_name: Forwarded to ``GCSCEntityAnalyzer``.
        file_path: Input artifact; only its ``uri`` is used.
        folder: Forwarded to ``GCSCEntityAnalyzer``.
        parquet_file_name: Forwarded to ``GCSCEntityAnalyzer``.
        text_column: Forwarded to ``GCSCEntityAnalyzer``.
        num_doc: Forwarded to ``GCSCEntityAnalyzer``.
        entities_dataset: Output artifact whose ``uri`` is overwritten.
    """
    # Local imports: ``processing`` is expected to come from the base image.
    import logging

    from processing.entities import GCSCEntityAnalyzer

    logging.basicConfig(format='%(levelname)s: %(message)s\n', level=logging.INFO)

    # Trace which upstream URI this step received.
    print(f"------ file_path.uri = {file_path.uri}")

    analyzer = GCSCEntityAnalyzer(
        bucket_name,
        file_path.uri,
        folder,
        parquet_file_name,
        text_column,
        num_doc,
    )

    result_uri = analyzer.process()
    entities_dataset.uri = result_uri
    print("--")


In [11]:
@component(base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/data_bigquery:latest")
def data_bigquery(
    bucket_name: str,
    file_path: Input[Artifact],
    folder: str,
    parquet_file_name: str,
    project_id: str,
    dataset_id: str,
    table_id: str,
    location: str,
    bigquery_table: Output[Artifact],
):
    """Run ``GCS_Bigquery`` on an upstream artifact and publish the result.

    ``GCS_Bigquery`` receives, positionally, the bucket name, the URI of the
    input artifact, the folder, the parquet file name, and then the project,
    dataset, table and location identifiers. The return value of
    ``upload_dataframe_to_bigquery()`` becomes the URI of ``bigquery_table``.

    Args:
        bucket_name: Forwarded to ``GCS_Bigquery``.
        file_path: Input artifact; only its ``uri`` is used.
        folder: Forwarded to ``GCS_Bigquery``.
        parquet_file_name: Forwarded to ``GCS_Bigquery``.
        project_id: Forwarded to ``GCS_Bigquery``.
        dataset_id: Forwarded to ``GCS_Bigquery``.
        table_id: Forwarded to ``GCS_Bigquery``.
        location: Forwarded to ``GCS_Bigquery``.
        bigquery_table: Output artifact whose ``uri`` is overwritten.
    """
    # Local imports: ``processing`` is expected to come from the base image.
    import logging

    from processing.bigquery import GCS_Bigquery

    logging.basicConfig(format='%(levelname)s: %(message)s\n', level=logging.INFO)

    # Trace which upstream URI this step received.
    print(f"------ file_path.uri = {file_path.uri}")
    print("--")

    uploader = GCS_Bigquery(
        bucket_name,
        file_path.uri,
        folder,
        parquet_file_name,
        project_id,
        dataset_id,
        table_id,
        location,
    )

    result_uri = uploader.upload_dataframe_to_bigquery()
    bigquery_table.uri = result_uri
    

In [12]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name=DISPLAY_NAME,
    description="Data preprocessing"
)
def pipeline():
    """Wire the six processing components into one sequential pipeline.

    Order of steps, each consuming the previous step's output artifact:
    preprocessing -> tokenization -> sentiment -> moderation -> entities ->
    BigQuery upload.

    The bucket and project id come from the notebook's configuration
    constants (``BUCKET_NAME``, ``PROJECT_ID``) rather than being repeated as
    literals here, so the pipeline cannot drift from the rest of the notebook.

    This function body runs in the notebook kernel when the pipeline is
    compiled, so it deliberately does not call ``logging.basicConfig``: doing
    so installs a root log handler in the kernel as a side effect, which can
    make later SDK log messages appear twice.
    """
    # NOTE(review): this function's name rebinds the `pipeline` name imported
    # from kfp.v2.dsl at the top of the notebook; the decorator used here is
    # `dsl.pipeline`, so nothing in this notebook depends on that import.

    # --- Input data and shared step settings ---
    source_file = 'fabio/articlesoutputv3.parquet'
    folder = 'pipeline'
    text_column = 'body_pre'
    num_doc = 10

    # --- BigQuery destination ---
    dataset_id = "datasetnlp"
    table_id = "stepfinalbq"
    location = "europe-west3"

    # --- Per-step output file names ---
    output_processing = 'step1_pipeline.parquet'
    output_tokenization = 'step2_pipeline.parquet'
    output_sentiment = 'step3_pipeline.parquet'
    output_moderate = 'step4_pipeline.parquet'
    output_entities = 'step5_pipeline.parquet'
    # NOTE(review): unlike the names above, this one carries a 'pipeline/'
    # prefix in addition to `folder` being passed — confirm that is intended.
    output_final = 'pipeline/step_final_bq.parquet'

    processing_op = data_preprocessing(
        bucket_name=BUCKET_NAME,
        file_path=source_file,
        folder=folder,
        parquet_file_name=output_processing,
    )

    tokenization_op = data_tokenization(
        bucket_name=BUCKET_NAME,
        file_path=processing_op.outputs["processed_dataset"],
        folder=folder,
        parquet_file_name=output_tokenization,
    )

    sentiment_op = data_sentiment(
        bucket_name=BUCKET_NAME,
        file_path=tokenization_op.outputs["tokenized_dataset"],
        folder=folder,
        parquet_file_name=output_sentiment,
        text_column=text_column,
        num_doc=num_doc,
    )

    moderate_op = data_moderate(
        bucket_name=BUCKET_NAME,
        file_path=sentiment_op.outputs["sentiment_dataset"],
        folder=folder,
        parquet_file_name=output_moderate,
        text_column=text_column,
        num_doc=num_doc,
    )

    entities_op = data_entities(
        bucket_name=BUCKET_NAME,
        file_path=moderate_op.outputs["moderate_dataset"],
        folder=folder,
        parquet_file_name=output_entities,
        text_column=text_column,
        num_doc=num_doc,
    )

    # Final step; its output is not consumed by any later task.
    data_bigquery(
        bucket_name=BUCKET_NAME,
        file_path=entities_op.outputs['entities_dataset'],
        folder=folder,
        parquet_file_name=output_final,
        project_id=PROJECT_ID,
        dataset_id=dataset_id,
        table_id=table_id,
        location=location,
    )
    


In [14]:
# Compile the pipeline function into a pipeline definition file on disk.
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=YAML_NAME
)

# Create the Vertex AI pipeline job from the compiled definition.
# `enable_caching` previously had no value (a syntax error). Caching is
# disabled explicitly: every step writes to GCS/BigQuery as a side effect, so
# each run should actually execute the steps instead of reusing cached results.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=YAML_NAME,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)

# Submit without blocking; the run executes as SERVICE_ACCOUNT.
job.submit(service_account=SERVICE_ACCOUNT)

Creating PipelineJob


INFO: Creating PipelineJob



PipelineJob created. Resource name: projects/944308723981/locations/europe-west3/pipelineJobs/processing-pipeline-20240323014813


INFO: PipelineJob created. Resource name: projects/944308723981/locations/europe-west3/pipelineJobs/processing-pipeline-20240323014813



To use this PipelineJob in another session:


INFO: To use this PipelineJob in another session:



pipeline_job = aiplatform.PipelineJob.get('projects/944308723981/locations/europe-west3/pipelineJobs/processing-pipeline-20240323014813')


INFO: pipeline_job = aiplatform.PipelineJob.get('projects/944308723981/locations/europe-west3/pipelineJobs/processing-pipeline-20240323014813')



View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west3/pipelines/runs/processing-pipeline-20240323014813?project=944308723981


INFO: View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west3/pipelines/runs/processing-pipeline-20240323014813?project=944308723981

