In [64]:
from google.cloud import aiplatform
from google.cloud import storage
import json
import requests

# --- Vertex AI / GCP settings -------------------------------------------------
# Vertex AI project and region used throughout this notebook.
project = 'eduardom-playground'
location = 'us-central1'

# Bucket holding the pipeline's input/output files and its pipeline root.
bucket_name = 'rag-demo-bucket-oiajc'

# Base URL of the RAG Flask app the pipeline sends prompts to.
# NOTE(review): plain HTTP to a hard-coded public IP; consider HTTPS and
# reading this from configuration / an environment variable.
flask_app_url = 'http://34.41.37.220:5000'

# --- API clients ---------------------------------------------------------------
gcs_client = storage.Client()
# Regional endpoint: the Vertex AI pipeline service is served per region.
pipeline_client = aiplatform.gapic.PipelineServiceClient(
    client_options={"api_endpoint": f"{location}-aiplatform.googleapis.com"}
)


In [65]:
from kfp import dsl, compiler
from kfp.dsl import component
from typing import NamedTuple


@component(packages_to_install=["google-cloud-storage"])
def read_tasks_from_gcs(bucket_name: str, file_path: str) -> str:
    """Download a GCS object and return its contents as text.

    Args:
        bucket_name: Name of the GCS bucket that holds the task file.
        file_path: Object name of the task file inside ``bucket_name``.

    Returns:
        The object's contents decoded as text (expected to be the tasks JSON
        consumed by ``evaluate_tasks``).
    """
    # Imported inside the function: KFP runs this body in its own container.
    from google.cloud import storage

    return (
        storage.Client()
        .get_bucket(bucket_name)
        .blob(file_path)
        .download_as_text()
    )

@component(packages_to_install=["requests"])
def evaluate_tasks(flask_app_url: str, tasks_json: str, request_timeout: float = 300.0) -> list:
    """Send every prompt in ``tasks_json`` to the RAG Flask app and collect the replies.

    Args:
        flask_app_url: Base URL of the Flask app; each prompt is POSTed to
            ``{flask_app_url}/`` as ``{"query": <prompt>}``.
        tasks_json: JSON text whose top-level object has a ``"prompts"`` list.
        request_timeout: Per-request timeout in seconds, so an unresponsive app
            fails this step instead of hanging it indefinitely.

    Returns:
        The response body text for each prompt, in input order.

    Raises:
        KeyError: if ``tasks_json`` has no ``"prompts"`` key.
        requests.HTTPError: if the app answers a prompt with a 4xx/5xx status.
        requests.Timeout: if a single request exceeds ``request_timeout``.
    """
    # Imported inside the function: KFP runs this body in its own container.
    import json
    import requests

    prompts = json.loads(tasks_json)['prompts']
    results = []
    # One session reuses the connection for every prompt instead of reconnecting.
    with requests.Session() as session:
        for prompt in prompts:
            response = session.post(
                f"{flask_app_url}/", json={"query": prompt}, timeout=request_timeout
            )
            # Fail loudly rather than storing an error page as an evaluation result.
            response.raise_for_status()
            results.append(response.text)
    return results

@component(packages_to_install=["google-cloud-storage"])
def write_results_to_gcs(bucket_name: str, results_json: list, output_file_path: str) -> str:
    """Join the evaluation results into one text blob and upload it to GCS.

    Args:
        bucket_name: Destination GCS bucket.
        results_json: Response texts produced by ``evaluate_tasks``. Despite the
            name, this is a list of strings, not JSON text.
        output_file_path: Object name to write inside ``bucket_name``.

    Returns:
        A message containing the ``gs://`` URI of the written object.
    """
    # Imported inside the function: KFP runs this body in its own container.
    from google.cloud import storage

    gcs_client = storage.Client()
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.blob(output_file_path)
    # Results are separated by a banner line, so the payload is plain text.
    results_text = "=========Email==========\n".join(results_json)
    # Label the object as text: it is not valid JSON, so 'application/json'
    # would mislead anything that trusts the Content-Type.
    blob.upload_from_string(results_text, content_type='text/plain; charset=utf-8')
    return f"Results stored in: gs://{bucket_name}/{output_file_path}"

@dsl.pipeline(
    name='rag-evaluation-pipeline',
    description='A pipeline that evaluates tasks using a RAG system through a Flask app.'
)
def rag_evaluation_pipeline(
    bucket_name: str,
    input_file_path: str,
    output_file_path: str,
    # Defaults to the notebook-level `flask_app_url` captured when this cell runs;
    # exposed as a parameter so a run can target a different app instance.
    flask_app_url: str = flask_app_url,
):
    """Read prompts from GCS, query the RAG Flask app, and write the replies back to GCS.

    Args:
        bucket_name: GCS bucket used for both the input and the output file.
        input_file_path: Object name of the tasks JSON inside ``bucket_name``.
        output_file_path: Object name for the collected results inside ``bucket_name``.
        flask_app_url: Base URL of the RAG Flask app to evaluate.
    """
    read_step = read_tasks_from_gcs(bucket_name=bucket_name, file_path=input_file_path)
    evaluate_step = evaluate_tasks(flask_app_url=flask_app_url, tasks_json=read_step.output)
    write_results_to_gcs(
        bucket_name=bucket_name,
        results_json=evaluate_step.output,
        output_file_path=output_file_path,
    )


In [66]:
# Compile the pipeline to a local spec file, then submit it to Vertex AI.
PIPELINE_SPEC_PATH = 'rag_evaluation_pipeline.json'
compiler.Compiler().compile(pipeline_func=rag_evaluation_pipeline, package_path=PIPELINE_SPEC_PATH)

pipeline_job = aiplatform.PipelineJob(
    display_name="rag_evaluation_pipeline",
    template_path=PIPELINE_SPEC_PATH,
    pipeline_root=f"gs://{bucket_name}/pipeline_root",
    parameter_values={
        'bucket_name': bucket_name,
        'input_file_path': 'test/input_file.json',
        'output_file_path': 'test/output_file.json'
    },
    # Caching off so every run re-queries the Flask app instead of reusing old outputs.
    enable_caching=False,
    # Submit to the configured project/region rather than whatever defaults
    # the SDK resolves from the environment.
    project=project,
    location=location,
)

# Blocks while polling the job state until the run finishes.
pipeline_job.run()


Creating PipelineJob
PipelineJob created. Resource name: projects/170650767387/locations/us-central1/pipelineJobs/rag-evaluation-pipeline-20240403045207
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/170650767387/locations/us-central1/pipelineJobs/rag-evaluation-pipeline-20240403045207')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/rag-evaluation-pipeline-20240403045207?project=170650767387
PipelineJob projects/170650767387/locations/us-central1/pipelineJobs/rag-evaluation-pipeline-20240403045207 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/170650767387/locations/us-central1/pipelineJobs/rag-evaluation-pipeline-20240403045207 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/170650767387/locations/us-central1/pipelineJobs/rag-evaluation-pipeline-20240403045207 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/17