In [1]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Orchestrate Recurring GDELT Vector Store Data Updates with Vertex Pipelines

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/language/intro_palm_api.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/intro_palm_api.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/language/intro_palm_api.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>
</table>

## Overview

<center>
<img src="imgs/zghost_overview_pipeline_steps.png" width="1200"/>
</center>
Because the GDELT dataset is updated with new information every 15 minutes, you may want to automate adding new data to your vector store on a regularly recurring basis. In this notebook we combine the steps from the previous notebooks into a pipeline that checks whether new data is available and, if it is, generates the embeddings and uploads the new vectors to the Matching Engine instance.
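The 15-minute cadence suggests a cheap freshness check before doing any expensive work. The sketch below is a minimal illustration and is not part of the `zeitghost` codebase: it assumes GDELT batches land on quarter-hour boundaries and that the pipeline records the time of its last successful run somewhere (where and how is left open). The helper names `latest_update_boundary`, `has_new_batch`, and `date_window` are hypothetical; `date_window` returns `YYYY-MM-DD` strings in the same shape as the components' `min_date`/`max_date` arguments.

```python
from datetime import datetime
from typing import Tuple

# Assumption: GDELT publishes a new batch on every quarter-hour boundary.
UPDATE_INTERVAL_MINUTES = 15


def latest_update_boundary(ts: datetime) -> datetime:
    """Floor a timestamp to the most recent 15-minute boundary."""
    floored_minute = ts.minute - ts.minute % UPDATE_INTERVAL_MINUTES
    return ts.replace(minute=floored_minute, second=0, microsecond=0)


def has_new_batch(last_processed: datetime, now: datetime) -> bool:
    """True if at least one new batch boundary has passed since the last run."""
    return latest_update_boundary(now) > latest_update_boundary(last_processed)


def date_window(last_processed: datetime, now: datetime) -> Tuple[str, str]:
    """Hypothetical min_date/max_date pair for the extraction components."""
    return last_processed.strftime("%Y-%m-%d"), now.strftime("%Y-%m-%d")
```

A scheduler could skip the whole pipeline run when `has_new_batch` returns `False`, and otherwise pass the two `date_window` strings as pipeline arguments.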

Now that we have built the image from `Dockerfile_gdelt`, we can use it as the base image for a [Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline) that orchestrates the previous notebooks:
1. [Setup Vertex Vector Store](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/intro_palm_api.ipynb)
2. [GDELT DataOps](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/intro_palm_api.ipynb)
3. [Vector Store Index Loader](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/intro_palm_api.ipynb)
4. [Alternative document format embeddings](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/intro_palm_api.ipynb)

Note that this pipeline does not recreate resources on every run. If you have already created the Matching Engine Index, Index Endpoint, and Vector Store, and the specification stays the same, the pipeline continues to update the existing resources.

If you want to create resources for a NEW GDELT extraction and Vector Store DB, update the input parameters accordingly.
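The reuse behaviour described above is the familiar get-or-create pattern. Here is a minimal sketch, assuming a plain dictionary stands in for whatever lookup the real code performs against Vertex AI; we don't know how `MatchingEngineCRUD` implements this internally, and `get_or_create` and `make_index` are hypothetical names.

```python
from typing import Callable, Dict, List, TypeVar

T = TypeVar("T")


def get_or_create(registry: Dict[str, T], name: str, create: Callable[[], T]) -> T:
    """Return the resource registered under `name`, calling `create` only if it is missing."""
    if name in registry:
        return registry[name]
    resource = create()
    registry[name] = resource
    return resource


# Usage: the second call reuses the index instead of creating another one.
created: List[str] = []


def make_index() -> str:
    created.append("vectorstore_way_v1")
    return f"index-{len(created)}"


resources: Dict[str, str] = {}
first = get_or_create(resources, "vectorstore_way_v1", make_index)
second = get_or_create(resources, "vectorstore_way_v1", make_index)
```

Keying the lookup on a deterministic name (here, the `vectorstore_{actor_prefix}_{version}` pattern) is what makes the pattern work: changing the input parameters changes the name, so a new resource is created instead of the old one being reused.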

To get started with Vertex Pipelines, see [the Google Cloud Vertex AI Pipeline Samples](https://github.com/GoogleCloudPlatform/vertex-ai-samples/tree/main/notebooks/official/pipelines).

This type of pipeline could also be considered a data engineering pipeline, since it is focused on extracting updated data and inserting it into the Vector Store. Depending on a team's skill sets and preferences, it could also make sense to use [Cloud Composer](https://cloud.google.com/composer), which offers managed Apache Airflow for orchestrating data engineering workflows.

---

### Objectives
This notebook will:
- Use the previously built custom container image (stored in Artifact Registry) as the base image for each pipeline step
- Define a Vertex Pipeline with components that:
    - Extract all articles that match the specified criteria
    - Call the `GDELT` class to scrape the text and load it to a BigQuery table
    - Create Matching Engine resources if they don't exist
    - Load the vectors to the index
- Set the pipeline arguments
- Compile the pipeline
- Submit the pipeline job
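One way to reason about the step order is as a dependency graph. The sketch below uses Python's standard `graphlib` module (Python 3.9+) to derive one valid execution order. The component names match the functions defined later in this notebook, except `load_vectors`, a hypothetical name for the final loading step; the edges are inferred from which outputs each component consumes and are an assumption, not the compiled pipeline itself.

```python
from graphlib import TopologicalSorter

# Each key maps a step to the set of steps whose outputs it consumes.
PIPELINE_DEPS = {
    "set_config": set(),
    "create_gdelt_events_table": {"set_config"},
    "create_geg_article_table": {"set_config"},
    "create_me_vectorstore": {"set_config"},
    "deploy_index": {"create_me_vectorstore"},
    # Hypothetical final step: needs the scraped tables and a deployed index.
    "load_vectors": {
        "create_gdelt_events_table",
        "create_geg_article_table",
        "deploy_index",
    },
}

execution_order = list(TopologicalSorter(PIPELINE_DEPS).static_order())
print(execution_order)
```

Vertex AI Pipelines can run independent steps, such as the two table-creation components, in parallel; `static_order()` only returns one serialization that respects the edges.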

<center>
<img src="imgs/pipeline-complete.png" width="1000"/>
</center>

After running this pipeline, you can take advantage of Vertex AI Pipelines' automatic ML Metadata tracking, which lets you track artifacts, changes, and metadata for each pipeline run over time.

<center>
<img src="imgs/pipeline_metadata.png" width="400"/>
</center>

### Costs
This tutorial uses billable components of Google Cloud:

* Cloud Build
* Artifact Registry
* Vertex AI Pipelines
* BigQuery Storage & Compute
* Vertex AI Matching Engine

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing),
and use the [Pricing Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Getting Started
**Colab only:** Uncomment the following cell to restart the kernel. In Vertex AI Workbench, you can restart the kernel using the button at the top.

In [2]:
# # Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

### Authenticating your notebook environment
* If you are using **Colab** to run this notebook, uncomment the cell below and continue.
* If you are using **Vertex AI Workbench**, check out the setup instructions [here](https://github.com/GoogleCloudPlatform/generative-ai/tree/main/setup-env).

In [3]:
# from google.colab import auth
# auth.authenticate_user()

### Make sure you edit the values below
When you run the notebook for the first time with new values, you only need to edit the `ACTOR_PREFIX` and `VERSION` variables below. They are used to locate all the other variables in the notebook configuration.
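To see why these two values are enough, here is a sketch of how the other names are derived. It reproduces the f-string patterns used in the `set_config` component later in this notebook; `derive_names` itself is a hypothetical helper. Called with `way`, `v1`, and the project shown in the configuration output below, it yields the same names that output lists.

```python
from typing import Dict


def derive_names(actor_prefix: str, version: str, project: str) -> Dict[str, str]:
    """Rebuild the resource names from the actor prefix, version, and project ID."""
    bucket_name = f"zghost-{actor_prefix}-{version}-{project}"
    me_index_name = f"vectorstore_{actor_prefix}_{version}"
    return {
        "BUCKET_NAME": bucket_name,
        "BUCKET_URI": f"gs://{bucket_name}",
        "EMBEDDING_DIR_BUCKET": f"{bucket_name}-emd-dir",
        "ME_INDEX_NAME": me_index_name,
        "ME_INDEX_ENDPOINT_NAME": f"{me_index_name}_endpoint",
        # Same transformation as set_config: strip the project, swap '-' for '_'.
        "MY_BQ_DATASET": bucket_name.lower().replace(project, "").replace("-", "_").rstrip("_"),
    }


names = derive_names("way", "v1", "wortz-project-352116")
```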

In [4]:
# CREATE_NEW_ASSETS        = True # True | False
ACTOR_PREFIX             = "way"
VERSION                  = 'v1'

# print(f"CREATE_NEW_ASSETS  : {CREATE_NEW_ASSETS}")
print(f"ACTOR_PREFIX       : {ACTOR_PREFIX}")
print(f"VERSION            : {VERSION}")

ACTOR_PREFIX       : way
VERSION            : v1


### Load configuration settings from setup notebook
Set the constants used in this notebook and load the config settings from the `00-env-setup.ipynb` notebook.
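The cell below downloads `notebook_env.py` and runs it with `exec`, which executes whatever the file contains. If you would rather treat the file as data, a hedged alternative is to parse each `NAME = <literal>` line with `ast.literal_eval`. This sketch assumes the file contains only simple assignments of literals, as the printed output suggests; `parse_env_config` is a hypothetical name, and any line whose value is not a Python literal raises `ValueError`.

```python
import ast
from typing import Dict


def parse_env_config(text: str) -> Dict[str, object]:
    """Parse `NAME = <literal>` lines into a dict without executing any code."""
    config: Dict[str, object] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments, and anything that is not an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        name, _, value = line.partition("=")
        config[name.strip()] = ast.literal_eval(value.strip())
    return config


sample = 'PROJECT_ID               = "my-project"\nME_DIMENSIONS            = "768"\n\n# comment\n'
parsed = parse_env_config(sample)
```

In the notebook you would call `parse_env_config(config.n)` and read values from the returned dictionary instead of injecting them into the global namespace.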

In [62]:
# staging GCS
GCP_PROJECTS             = !gcloud config get-value project
PROJECT_ID               = GCP_PROJECTS[0]

BUCKET_NAME              = f'zghost-{ACTOR_PREFIX}-{VERSION}-{PROJECT_ID}'
BUCKET_URI               = f'gs://{BUCKET_NAME}'

config = !gsutil cat {BUCKET_URI}/config/notebook_env.py
print(config.n)
exec(config.n)

print(f"BUCKET_NAME        : {BUCKET_NAME}")
print(f"BUCKET_URI         : {BUCKET_URI}")


PROJECT_ID               = "wortz-project-352116"
PROJECT_NUM              = "679926387543"
LOCATION                 = "us-central1"

REGION                   = "us-central1"
BQ_LOCATION              = "US"
VPC_NETWORK_NAME         = "me-network"

CREATE_NEW_ASSETS        = "True"
ACTOR_PREFIX             = "way"
VERSION                  = "v1"
ACTOR_NAME               = "wayfair"
ACTOR_CATEGORY           = "retail"

BUCKET_NAME              = "zghost-way-v1-wortz-project-352116"
EMBEDDING_DIR_BUCKET     = "zghost-way-v1-wortz-project-352116-emd-dir"

BUCKET_URI               = "gs://zghost-way-v1-wortz-project-352116"
EMBEDDING_DIR_BUCKET_URI = "gs://zghost-way-v1-wortz-project-352116-emd-dir"

VPC_NETWORK_FULL         = "projects/679926387543/global/networks/me-network"

ME_INDEX_NAME            = "vectorstore_way_v1"
ME_INDEX_ENDPOINT_NAME   = "vectorstore_way_v1_endpoint"
ME_DIMENSIONS            = "768"

MY_BQ_DATASET            = "zghost_way_v1"
MY_BQ_TRENDS_DATASET     = "zghos

### Pipeline Base Image

Here is the base image we will use for each pipeline component (step). It allows us to access the `zeitghost` codebase within each pipeline step.
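Artifact Registry Docker image URIs follow the pattern `LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE:TAG`. If you want to fail fast on a malformed `PIPELINE_BASE_IMAGE` before compiling the pipeline, a small validator can help. This is a sketch under that pattern; `parse_image_uri` is a hypothetical helper, and the regex does not handle digest references (`@sha256:...`).

```python
import re
from typing import Dict

# LOCATION-docker.pkg.dev/PROJECT/REPOSITORY/IMAGE:TAG
_AR_IMAGE_RE = re.compile(
    r"^(?P<location>[a-z0-9-]+)-docker\.pkg\.dev/"
    r"(?P<project>[^/]+)/(?P<repository>[^/]+)/(?P<image>[^:]+):(?P<tag>[^:/]+)$"
)


def parse_image_uri(uri: str) -> Dict[str, str]:
    """Split an Artifact Registry image URI into its parts, or raise ValueError."""
    match = _AR_IMAGE_RE.match(uri)
    if match is None:
        raise ValueError(f"not an Artifact Registry image URI: {uri}")
    return match.groupdict()


parts = parse_image_uri(
    "us-central1-docker.pkg.dev/wortz-project-352116/zghost-way/gdelt-pipe-v1:latest"
)
```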

In [63]:
PIPELINE_BASE_IMAGE = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"

print(f"PIPELINE_BASE_IMAGE: {PIPELINE_BASE_IMAGE}")

PIPELINE_BASE_IMAGE: us-central1-docker.pkg.dev/wortz-project-352116/zghost-way/gdelt-pipe-v1:latest


### Import Packages

In [64]:
import os
import sys
import json
from datetime import datetime
from time import time
import pandas as pd
# disable logging at WARNING level and below (WARNING, INFO, DEBUG) everywhere
import logging

logging.disable(logging.WARNING)

from google.cloud import aiplatform as vertex_ai
from google.cloud import storage
from google.cloud import bigquery

# Pipelines
from typing import Any, Callable, Dict, NamedTuple, Optional, List

# Kubeflow SDK
import kfp
# from kfp.v2 import dsl
import kfp.v2.dsl
from kfp.v2.google import client as pipelines_client
from kfp.v2.dsl import (
    Artifact, Dataset, Input, 
    InputPath, Model, Output,
    OutputPath, component
)
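`logging.disable(level)` suppresses every logging call at `level` *and below*, so the import cell above silences warnings too, not just INFO and DEBUG messages. The standalone snippet below demonstrates this with the standard library only; the logger name is arbitrary. Run it in a separate interpreter: its last line, `logging.disable(logging.NOTSET)`, removes the override and would undo the setting above if run in this notebook.

```python
import logging

# Suppress WARNING and everything below it.
logging.disable(logging.WARNING)

logger = logging.getLogger("zeitghost-demo")
warning_enabled = logger.isEnabledFor(logging.WARNING)  # False: disabled
error_enabled = logger.isEnabledFor(logging.ERROR)      # True: above the threshold

# Remove the override so logging behaves normally again.
logging.disable(logging.NOTSET)
```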

In [65]:
print(f'kfp version: {kfp.__version__}')
print(f'vertex_ai SDK version: {vertex_ai.__version__}')
print(f'bigquery SDK version: {bigquery.__version__}')

kfp version: 1.8.22
vertex_ai SDK version: 1.25.0
bigquery SDK version: 3.10.0


Instantiate Google Cloud SDK clients

In [66]:
# cloud storage client
storage_client = storage.Client(project=PROJECT_ID)

# Vertex client
vertex_ai.init(project=PROJECT_ID, location=LOCATION)

# bigquery client
bqclient = bigquery.Client(
    project=PROJECT_ID,
    # location=LOCATION
)

## Create Pipeline Component Steps

Change the working directory to the parent (root) directory

In [67]:
root_path = '..'
os.chdir(root_path)
os.getcwd()

'/home/jupyter'

### Component for Setting Configuration

In [68]:
@kfp.v2.dsl.component(
    base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"
)
def set_config(
    project: str
    , project_num: str
    , location: str
    , version: str
    , actor_prefix: str
    , actor_name: str
    , vpc_network_name: str
    , create_new_assets: str
) -> NamedTuple('Outputs', [
    ('bucket_name', str)
    , ('bucket_uri', str)
    , ('emb_bucket', str)
    , ('emb_bucket_uri', str)
    , ('vpc_network_full', str)
    , ('me_index_name', str)
    , ('me_index_endpoint_name', str)
    , ('me_dimensions', int)
    , ('my_bq_dataset', str)
]):
    
    import numpy as np
    import logging
    import pandas as pd
    import uuid
    import json
    
    from google.cloud import aiplatform as vertex_ai
    from google.cloud import storage
    from google.cloud import bigquery
    
    # ==========================
    # set SDK clients
    # ==========================
    
    # cloud storage client
    storage_client = storage.Client(project=project)

    # Vertex client
    vertex_ai.init(project=project, location=location)

    # bigquery client
    bqclient = bigquery.Client(
        project=project,
        # location=LOCATION
    )
    
    # ==========================
    # set names and variables
    # ==========================
    
    # staging GCS
    BUCKET_NAME              = f'zghost-{actor_prefix}-{version}-{project}'
    BUCKET_URI               = f'gs://{BUCKET_NAME}'

    # bucket to stash emb files
    EMBEDDING_DIR_BUCKET     = f'{BUCKET_NAME}-emd-dir'
    EMBEDDING_DIR_BUCKET_URI = f'gs://{EMBEDDING_DIR_BUCKET}'

    # vpc network
    VPC_NETWORK_FULL         = f"projects/{project_num}/global/networks/{vpc_network_name}"

    # matching engine vector store
    ME_INDEX_NAME            = f"vectorstore_{actor_prefix}_{version}"
    ME_DIMENSIONS            = 768 # when using Vertex PaLM Embedding
    ME_INDEX_ENDPOINT_NAME   = f"{ME_INDEX_NAME}_endpoint"

    # bigquery
    MY_BQ_DATASET            = BUCKET_NAME.lower().replace(project,"").replace("-","_").rstrip("_")
    
    logging.info(f"BUCKET_NAME               : {BUCKET_NAME}")
    logging.info(f"BUCKET_URI                : {BUCKET_URI}")
    logging.info(f"EMBEDDING_DIR_BUCKET      : {EMBEDDING_DIR_BUCKET}")
    logging.info(f"EMBEDDING_DIR_BUCKET_URI  : {EMBEDDING_DIR_BUCKET_URI}\n")
    logging.info(f"VPC_NETWORK_FULL          : {VPC_NETWORK_FULL}")
    logging.info(f"ME_INDEX_NAME             : {ME_INDEX_NAME}")
    logging.info(f"ME_DIMENSIONS             : {ME_DIMENSIONS}")
    logging.info(f"ME_INDEX_ENDPOINT_NAME    : {ME_INDEX_ENDPOINT_NAME}")
    logging.info(f"MY_BQ_DATASET             : {MY_BQ_DATASET}")
    
    if create_new_assets == "True":
    
        # ==========================
        # create GCS assets
        # ==========================

        # create staging GCS bucket
        bucket = storage_client.bucket(BUCKET_NAME)
        new_bucket = storage_client.create_bucket(bucket, location=location)
        logging.info(f"Created bucket {new_bucket.name} in {new_bucket.location}")

        # create embedding dir GCS bucket
        bucket = storage_client.bucket(EMBEDDING_DIR_BUCKET)
        new_bucket = storage_client.create_bucket(bucket, location=location)
        logging.info(f"Created bucket {new_bucket.name} in {new_bucket.location}")

        # ================================
        # create initial embedding vector
        # ================================
        # dummy embedding
        init_embedding = {
            "id": str(uuid.uuid4()),
            "embedding": list(np.zeros(ME_DIMENSIONS))
        }

        LOCAL_JSON_FILE = "embeddings_0.json"

        # dump embedding to a local file
        with open(LOCAL_JSON_FILE, "w") as f:
            json.dump(init_embedding, f)

        GCS_BLOB = f'init_index/{LOCAL_JSON_FILE}'

        bucket_client = storage_client.bucket(EMBEDDING_DIR_BUCKET)
        blob = bucket_client.blob(GCS_BLOB)
        blob.upload_from_filename(LOCAL_JSON_FILE)
        logging.info(f"uploaded init embedding to gs://{EMBEDDING_DIR_BUCKET}/{GCS_BLOB}")

        # ================================
        # Create BQ dataset
        # ================================
        ds = bigquery.Dataset(f"{project}.{MY_BQ_DATASET}")
        ds.location = 'us' #Multi-region is REGION[0:2]
        ds = bqclient.create_dataset(dataset = ds, exists_ok = False)

        logging.info(f"created {ds.full_dataset_id}")
        
    else:
        logging.info("No new assets created...")
    
    return (
        BUCKET_NAME
        , BUCKET_URI
        , EMBEDDING_DIR_BUCKET
        , EMBEDDING_DIR_BUCKET_URI
        , VPC_NETWORK_FULL
        , ME_INDEX_NAME
        , ME_INDEX_ENDPOINT_NAME
        , ME_DIMENSIONS
        , MY_BQ_DATASET
    )
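`set_config` seeds the index with one dummy record containing an `id` and an all-zero `embedding`. Our understanding is that Matching Engine (Vertex AI Vector Search) reads JSON input with one such object per line, so the same shape extends naturally to many vectors. The sketch below is a hypothetical helper, `write_embeddings_jsonl`, that writes records in that format; check the Vector Search input-format documentation before relying on it, especially if you add optional fields.

```python
import json
import os
import tempfile
import uuid
from typing import Iterable, List, Tuple


def write_embeddings_jsonl(records: Iterable[Tuple[str, List[float]]], path: str) -> int:
    """Write one {"id": ..., "embedding": [...]} object per line and return the count."""
    count = 0
    with open(path, "w") as f:
        for record_id, vector in records:
            # float() turns NumPy scalars (e.g. from np.zeros) into plain Python floats.
            row = {"id": record_id, "embedding": [float(x) for x in vector]}
            f.write(json.dumps(row) + "\n")
            count += 1
    return count


# Usage: a single zero vector, like the dummy record written by set_config.
ME_DIMENSIONS = 768
local_path = os.path.join(tempfile.gettempdir(), "embeddings_0.json")
n_written = write_embeddings_jsonl([(str(uuid.uuid4()), [0.0] * ME_DIMENSIONS)], local_path)

with open(local_path) as f:
    first_record = json.loads(f.readline())
```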


### Component to Extract and Scrape GDELT Events Articles

In [69]:
@kfp.v2.dsl.component(
    base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"
)
def create_gdelt_events_table(
    project: str
    , location: str
    , version: str
    , bq_dataset: str
    , actor_prefix: str
    , actor_name: str
    , min_date: str
    , max_date: str
    , test_table_prefix: str
    , bq_location: str = 'US'
) -> NamedTuple('Outputs', [
    ('raw_articles_table', str)
    , ('scraped_articles_table', str),
]):
    import logging
    # ====================================
    # import packages
    # ====================================
    logging.info(f"importing packages...")
    
    from google.cloud import aiplatform as vertex_ai
    from google.cloud import storage
    from google.cloud import bigquery as bq
    
    from datetime import datetime
    import time
    import pandas as pd
    
    from zeitghost.gdelt.GdeltData import GdeltData
    from zeitghost.bigquery.BigQueryAccessor import BigQueryAccessor

    vertex_ai.init(
        project=project,
        location=location,
    )
    # bigquery client
    bqclient = bq.Client(
        project=project,
        # location=LOCATION
    )
    # ====================================
    # setting vars
    # ====================================
    logging.info(f"setting vars...")
    
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    
    if test_table_prefix == 'True':
        GDELT_TABLE_NAME = f'test_events_gdelt_{actor_prefix}_{version}'
    else:
        GDELT_TABLE_NAME = f'events_gdelt_{actor_prefix}_{version}'
    
    # # Define table name, in format dataset.table_name
    GDELT_TABLE_REF = f'{project}.{bq_dataset}.{GDELT_TABLE_NAME}'
    SCRAPED_GDELT_TABLE_REF = f'{project}.{bq_dataset}.scraped_{GDELT_TABLE_NAME}'
    
    logging.info(f"actor_prefix: {actor_prefix}")
    logging.info(f"GDELT_TABLE_NAME: {GDELT_TABLE_NAME}")
    logging.info(f"GDELT_TABLE_REF: {GDELT_TABLE_REF}")
    logging.info(f"SCRAPED_GDELT_TABLE_REF: {SCRAPED_GDELT_TABLE_REF}")
    
    # ====================================
    # GDELT BigQueryAccessor
    # ====================================
    logging.info(f"creating GDELT BigQueryAccessor...")
    
    geg_data_accessor = BigQueryAccessor(
        project
        , gdelt_project_id='gdelt-bq'
        , gdelt_dataset_id='gdeltv2'
        , gdelt_table_name='events' # geg_gcnlapi
    )
    
    logging.info("getting articles df...")
    start = time.time()
    gdelt_events_accessor = geg_data_accessor.get_records_from_actor_keyword_df(
        actor_name
        , min_date #= "2023-05-01"
        , max_date #= "2023-05-26"
    )
    end = time.time()
    logging.info(f"elapsed time: {end - start}")
    logging.info(f"gdelt_events_accessor.shape: {gdelt_events_accessor.shape}")
    
    # ====================================
    # Load data to BQ
    # ====================================
    logging.info(f"loading GDELT event articles df to {GDELT_TABLE_REF}")
    job = bqclient.load_table_from_dataframe(
        gdelt_events_accessor
        , GDELT_TABLE_REF
    )
    job.result()  # Wait for the job to complete.
    
    # update table description
    TABLE_DESCRIPTION = f"articles published between `{min_date}` and `{max_date}` mentioning {actor_name} from `gdelt-bq.gdeltv2.events`"
    logging.info(f"updating {GDELT_TABLE_REF} description to: {TABLE_DESCRIPTION}")
    
    table = bqclient.get_table(GDELT_TABLE_REF)  # API request
    table.description = f'{TABLE_DESCRIPTION}'
    table = bqclient.update_table(table, ["description"])  # API request
    logging.info(f"{GDELT_TABLE_REF} description updated")
    
    if gdelt_events_accessor.shape[0] > 0:
        logging.info(f"gdelt_events_accessor.shape[0] greater than zero")
    
        # ====================================
        # GDELT scraper
        # ====================================
        logging.info(f"Scraping GDELT articles for: {SCRAPED_GDELT_TABLE_REF}")
        start = time.time()

        gdelt_data_processor = GdeltData(
            gdelt_events_accessor
            , destination_table=SCRAPED_GDELT_TABLE_REF
            , destination_dataset=bq_dataset
        )

        end = time.time()
        logging.info(f"elapsed time: {end - start}")

        events_full_source_df = pd.DataFrame(gdelt_data_processor.full_source_data)
        logging.info(f"events_full_source_df.shape: {events_full_source_df.shape}")
    
    # if events_full_source_df.shape[0] > 0:
    #     logging.info(f"events_full_source_df.shape[0] greater than zero")
    
        events_full_source_df = events_full_source_df.loc[events_full_source_df['article_count'] != 0]
        logging.info(f"events_full_source_df.shape: {events_full_source_df.shape}")

        events_full_source_article_list = events_full_source_df['articles']
        logging.info(f"len(events_full_source_article_list): {len(events_full_source_article_list)}")

        # ====================================
        # format df to load to BQ
        # ====================================
        logging.info(f"format df to load to BQ...")
        article_rows = []

        for entry in events_full_source_article_list:
            for ent in entry:
                article_rows.append(ent)

        if len(article_rows) > 0:
            # article_rows[0]
            events_source_article_df = pd.DataFrame(article_rows)
            events_source_article_df.drop(columns='authors', inplace=True) # TODO - fix
            events_source_article_df['source']=events_source_article_df['url']

            logging.info(f"events_source_article_df.shape: {events_source_article_df.shape}")

            # ====================================
            # Load data to BQ
            # ====================================
            logging.info(f"loading scraped articles df to {SCRAPED_GDELT_TABLE_REF}")
            job = bqclient.load_table_from_dataframe(
                events_source_article_df
                , SCRAPED_GDELT_TABLE_REF
            )
            job.result()  # Wait for the job to complete.

            logging.info(f"scraped dataframe loaded to: {SCRAPED_GDELT_TABLE_REF}")

            # ====================================
            # update table description
            # ====================================
            TABLE_DESCRIPTION = f"scraped articles mentioning {actor_name} published between `{min_date}` and `{max_date}` from `gdelt-bq.gdeltv2.events`"
            logging.info(f"updating {SCRAPED_GDELT_TABLE_REF} description to: {TABLE_DESCRIPTION}")

            table = bqclient.get_table(SCRAPED_GDELT_TABLE_REF)  # API request
            table.description = f'{TABLE_DESCRIPTION}'
            table = bqclient.update_table(table, ["description"])  # API request
            logging.info(f"{SCRAPED_GDELT_TABLE_REF} description updated")

        else:
            print("No new articles...")
            SCRAPED_GDELT_TABLE_REF = ''

    else:
        print("No new articles...")
        SCRAPED_GDELT_TABLE_REF = ''
    
    return(
        GDELT_TABLE_REF
        , SCRAPED_GDELT_TABLE_REF
    )
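The scraping branch boils down to three transformations: drop sources with `article_count == 0`, flatten the nested `articles` lists into one row per article, and copy `url` into `source` after dropping `authors`. The sketch below restates that logic with plain dictionaries so it can be tested without BigQuery or pandas. The field names come from the component; `flatten_scraped_articles` and the sample `text` field are hypothetical. Unlike `DataFrame.drop(columns='authors')`, it does not fail when `authors` is missing.

```python
from itertools import chain
from typing import Dict, List


def flatten_scraped_articles(full_source_rows: List[Dict]) -> List[Dict]:
    """Plain-dict version of the component's pandas steps (hypothetical helper)."""
    # Keep only sources that actually produced articles.
    kept_rows = (row for row in full_source_rows if row["article_count"] != 0)
    flattened = []
    # Each kept row holds a list of article dicts; chain them into one stream.
    for article in chain.from_iterable(row["articles"] for row in kept_rows):
        cleaned = {key: value for key, value in article.items() if key != "authors"}
        cleaned["source"] = cleaned["url"]
        flattened.append(cleaned)
    return flattened


sample_rows = [
    {"article_count": 0, "articles": []},
    {"article_count": 2, "articles": [
        {"url": "https://example.com/a", "authors": ["x"], "text": "first"},
        {"url": "https://example.com/b", "authors": [], "text": "second"},
    ]},
]
article_rows = flatten_scraped_articles(sample_rows)
```

An empty result corresponds to the component's `''` fallback for `scraped_articles_table`.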

### Component to Extract and Scrape GDELT GEG (Entity) Articles

In [70]:
@kfp.v2.dsl.component(
    base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"
)
def create_geg_article_table(
    project: str
    , location: str
    , version: str
    , bq_dataset: str
    , actor_prefix: str
    , actor_name: str
    , min_date: str
    , max_date: str
    , test_table_prefix: str
    , bq_location: str = 'US'
) -> NamedTuple('Outputs', [
    ('raw_articles_table', str)
    , ('scraped_articles_table', str)
]):

    import logging
    # ====================================
    # import packages
    # ====================================
    
    logging.info(f"importing packages...")
    from google.cloud import aiplatform as vertex_ai
    from google.cloud import storage
    from google.cloud import bigquery as bq
    
    from datetime import datetime
    import time
    import pandas as pd
    
    from zeitghost.gdelt.GdeltData import GdeltData
    from zeitghost.bigquery.BigQueryAccessor import BigQueryAccessor
    
    # ======================================
    # set SDK clients & variables
    # ======================================
    vertex_ai.init(
        project=project,
        location=location,
    )
    # bigquery client
    bqclient = bq.Client(
        project=project,
        # location=LOCATION
    )
    
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    
    if test_table_prefix == 'True':
        GDELT_TABLE_NAME = f'test_geg_articles_{actor_prefix}_{version}'
    else:
        GDELT_TABLE_NAME = f'geg_articles_{actor_prefix}_{version}'
    
    # # Define table name, in format dataset.table_name
    GDELT_TABLE_REF = f'{project}.{bq_dataset}.{GDELT_TABLE_NAME}'
    SCRAPED_GDELT_TABLE_REF = f'{project}.{bq_dataset}.scraped_{GDELT_TABLE_NAME}'
    
    logging.info(f"actor_prefix: {actor_prefix}")
    logging.info(f"GDELT_TABLE_NAME: {GDELT_TABLE_NAME}")
    logging.info(f"GDELT_TABLE_REF: {GDELT_TABLE_REF}")
    logging.info(f"SCRAPED_GDELT_TABLE_REF: {SCRAPED_GDELT_TABLE_REF}")
    
    # ======================================
    # extract GDELT data
    # ======================================
    geg_data_accessor = BigQueryAccessor(
        project
        , gdelt_project_id='gdelt-bq'
        , gdelt_dataset_id='gdeltv2'
        , gdelt_table_name='geg_gcnlapi' # geg_gcnlapi
    )
    
    logging.info("getting articles df...")
    start = time.time()
    
    geg_articles_accessor = geg_data_accessor.get_geg_article_data_v2_full_df(
        actor_name
        , min_date
        , max_date
    )
    end = time.time()
    logging.info(f"elapsed time: {end - start}")
    logging.info(f"geg_articles_accessor.shape: {geg_articles_accessor.shape}")
    
    # ======================================
    # load GDELT to BQ table
    # ======================================
    logging.info(f"loading geg articles df to {GDELT_TABLE_REF}")
    job = bqclient.load_table_from_dataframe(
        geg_articles_accessor
        , GDELT_TABLE_REF
    )
    job.result()  # Wait for the job to complete.
    
    # update table description
    TABLE_DESCRIPTION = f"articles published between `{min_date}` and `{max_date}` mentioning `{actor_name}` from `gdelt-bq.gdeltv2.geg_gcnlapi`"
    logging.info(f"updating {GDELT_TABLE_REF} description to: {TABLE_DESCRIPTION}")
    
    table = bqclient.get_table(GDELT_TABLE_REF)  # API request
    table.description = f'{TABLE_DESCRIPTION}'
    table = bqclient.update_table(table, ["description"])  # API request
    logging.info(f"{GDELT_TABLE_REF} description updated")
    
    if geg_articles_accessor.shape[0] > 0:
        logging.info(f"geg_articles_accessor.shape[0] greater than zero")
    
        # ====================================
        # GDELT scraper
        # ====================================
        logging.info(f"Scraping GDELT articles for: {SCRAPED_GDELT_TABLE_REF}")
        start = time.time()

        gdelt_data_processor = GdeltData(
            geg_articles_accessor
            , destination_table=SCRAPED_GDELT_TABLE_REF
            , destination_dataset=bq_dataset
        )

        end = time.time()
        logging.info(f"elapsed time: {end - start}")

        geg_articles_full_source_df = pd.DataFrame(gdelt_data_processor.full_source_data)
        logging.info(f"geg_articles_full_source_df.shape: {geg_articles_full_source_df.shape}")
    
    # if geg_articles_full_source_df.shape[0] > 0:
    #     logging.info(f"geg_articles_full_source_df.shape[0] greater than zero")
    
        geg_articles_full_source_df = geg_articles_full_source_df.loc[geg_articles_full_source_df['article_count'] != 0]
        logging.info(f"geg_articles_full_source_df.shape: {geg_articles_full_source_df.shape}")

        geg_articles_article_list = geg_articles_full_source_df['articles']
        logging.info(f"len(geg_articles_article_list): {len(geg_articles_article_list)}")

        # format df to load to BQ
        article_rows = []

        for entry in geg_articles_article_list:
            for ent in entry:
                article_rows.append(ent)

        if len(article_rows) > 0:
            geg_articles_df = pd.DataFrame(article_rows)
            geg_articles_df.drop(columns=['authors','NumMentions'], inplace=True) # TODO - fix
            geg_articles_df['source']=geg_articles_df['url']
            logging.info(f"geg_articles_df.shape: {geg_articles_df.shape}")

            # Load data to BQ
            logging.info(f"loading scraped articles df to {SCRAPED_GDELT_TABLE_REF}")
            job = bqclient.load_table_from_dataframe(
                geg_articles_df
                , SCRAPED_GDELT_TABLE_REF
            )
            job.result()  # Wait for the job to complete.

            logging.info(f"scraped dataframe loaded to: {SCRAPED_GDELT_TABLE_REF}")

            # update table description
            TABLE_DESCRIPTION = f"scraped articles from `{min_date}` to `{max_date}` mentioning {actor_name} from `gdelt-bq.gdeltv2.geg_gcnlapi`"
            logging.info(f"updating {SCRAPED_GDELT_TABLE_REF} description to: {TABLE_DESCRIPTION}")

            table = bqclient.get_table(SCRAPED_GDELT_TABLE_REF)  # API request
            table.description = f'{TABLE_DESCRIPTION}'
            table = bqclient.update_table(table, ["description"])  # API request
            logging.info(f"{SCRAPED_GDELT_TABLE_REF} description updated")

        else:
            print("No new articles...")
            SCRAPED_GDELT_TABLE_REF = ''
            
    else:
        print("No new articles...")
        SCRAPED_GDELT_TABLE_REF = ''
    
    return(
        GDELT_TABLE_REF
        , SCRAPED_GDELT_TABLE_REF
    )
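The events and GEG components differ mainly in the source table and the accessor method they call; their table-naming logic is identical. Here is a hedged refactoring sketch, `gdelt_table_refs` (a hypothetical helper), that both components could share:

```python
from typing import Tuple


def gdelt_table_refs(project: str, bq_dataset: str, base_name: str,
                     actor_prefix: str, version: str,
                     test_table_prefix: str) -> Tuple[str, str]:
    """Return (raw_table_ref, scraped_table_ref) using the components' naming scheme."""
    # Pipeline parameters arrive as strings, so compare against "True".
    prefix = "test_" if test_table_prefix == "True" else ""
    table_name = f"{prefix}{base_name}_{actor_prefix}_{version}"
    raw_ref = f"{project}.{bq_dataset}.{table_name}"
    scraped_ref = f"{project}.{bq_dataset}.scraped_{table_name}"
    return raw_ref, scraped_ref


geg_refs = gdelt_table_refs("my-project", "zghost_way_v1", "geg_articles", "way", "v1", "True")
events_refs = gdelt_table_refs("my-project", "zghost_way_v1", "events_gdelt", "way", "v1", "False")
```

Passing `base_name="events_gdelt"` or `base_name="geg_articles"` reproduces the names each component builds today.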

### Component to Create Matching Engine Vector Store

In [71]:
@kfp.v2.dsl.component(
    base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"
)
def create_me_vectorstore(
    project: str
    , project_num: str
    , location: str
    , version: str
    , bq_dataset: str
    , me_index_name: str
    , me_index_endpoint_name: str
    , vpc_network_full: str
    , embedding_dir_bucket_uri: str
    , me_dimensions: int
) -> NamedTuple('Outputs', [
    ('me_index_resource_name', str)
    , ('me_index_endpoint_id', str)
    , ('me_index_id', str)
    , ('me_index', Artifact)
    , ('index_endpoint', Artifact)
]):
    import logging
    # ======================================
    # Import packages
    # ======================================
    logging.info(f"Importing packages...")
    
    from google.cloud import aiplatform as vertex_ai
    from google.cloud import storage
    from google.cloud import bigquery as bq
    
    from datetime import datetime
    import time
    import pandas as pd
    
    from zeitghost.vertex.MatchingEngineCRUD import MatchingEngineCRUD
    from zeitghost.vertex.MatchingEngineVectorstore import MatchingEngineVectorStore

    vertex_ai.init(
        project=project,
        location=location,
    )
    # bigquery client
    bqclient = bq.Client(
        project=project,
        # location=LOCATION
    )
    
    # ======================================
    # create mengine
    # ======================================
    logging.info(f"create mengine...")
    
    mengine = MatchingEngineCRUD(
        project_id=project 
        , project_num=project_num
        , region=location 
        , index_name=me_index_name
        , vpc_network_name=vpc_network_full
    )
    
    # ======================================
    # create or get ME index 
    # ======================================
    logging.info(f"create or get ME index...")
    
    start = time.time()
    me_index = mengine.create_index(
        f"{embedding_dir_bucket_uri}/init_index"
        , me_dimensions
    )
    end = time.time()
    logging.info(f"elapsed time: {end - start}")

    if me_index:
        logging.info(me_index.name)
        
    # ======================================
    # create or get ME index endpoint
    # ======================================
    logging.info(f"create or get ME index endpoint...")
        
    start = time.time()
    index_endpoint=mengine.create_index_endpoint(
        endpoint_name=me_index_endpoint_name
        , network=vpc_network_full
    )
    end = time.time()
    logging.info(f"elapsed time: {end - start}")
    
    if index_endpoint:
        logging.info(f"Index endpoint resource name: {index_endpoint.name}")
        logging.info(f"Index endpoint VPC network name: {index_endpoint.network}")
        logging.info(f"Deployed indexes on the index endpoint:")
        for d in index_endpoint.deployed_indexes:
            logging.info(f"    {d.id}")
            
    # get index & index endpoint IDs
    ME_INDEX_RESOURCE_NAME, ME_INDEX_ENDPOINT_ID = mengine.get_index_and_endpoint()
    ME_INDEX_ID=ME_INDEX_RESOURCE_NAME.split("/")[5]

    logging.info(f"ME_INDEX_RESOURCE_NAME  = {ME_INDEX_RESOURCE_NAME}")
    logging.info(f"ME_INDEX_ENDPOINT_ID    = {ME_INDEX_ENDPOINT_ID}")
    logging.info(f"ME_INDEX_ID             = {ME_INDEX_ID}")
    
    return (
        ME_INDEX_RESOURCE_NAME
        , ME_INDEX_ENDPOINT_ID
        , ME_INDEX_ID
        , me_index
        , index_endpoint
    )
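The component above extracts the index ID with `ME_INDEX_RESOURCE_NAME.split("/")[5]`, which relies on the resource name having exactly the shape `projects/{project}/locations/{location}/indexes/{index_id}`. A slightly more defensive alternative is to pair collection names with their IDs. The helper below is a hypothetical sketch, not part of `zeitghost` or the Vertex AI SDK, and it assumes the name alternates collection/ID segments.

```python
def parse_resource_name(resource_name: str) -> dict:
    """Map each collection segment of a resource name to its ID.

    Hypothetical helper. Assumes the name alternates collection/ID pairs, e.g.
    projects/{project}/locations/{location}/indexes/{index_id}.
    """
    parts = resource_name.strip("/").split("/")
    if len(parts) % 2 != 0:
        raise ValueError(f"not a collection/ID path: {resource_name!r}")
    # pair up ["projects", "123", "locations", "us-central1", ...]
    return dict(zip(parts[0::2], parts[1::2]))


# Illustrative resource name (not a real index)
name = "projects/123456789/locations/us-central1/indexes/987654321"
ids = parse_resource_name(name)
print(ids["indexes"])                         # 987654321
print(name.split("/")[5] == ids["indexes"])   # True
```

Looking the ID up by collection name (`"indexes"`) keeps working if the path gains or loses a prefix, whereas a fixed index silently returns the wrong segment.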

### Component to Deploy Index to Index Endpoint

In [72]:
@kfp.v2.dsl.component(
    base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"
)
def deploy_index(
    project: str
    , project_num: str
    , location: str
    , version: str
    , me_index_id: str
    , me_index_endpoint_id: str
    , me_index_name: str
    , me_index_endpoint_name: str
    , vpc_network_full: str
    
) -> NamedTuple('Outputs', [
    ('index_endpoint_deployed', Artifact)
    , ('me_index_endpoint_id', str)
    , ('me_index_name', str)
]):
    import logging
    # ======================================
    # Import packages
    # ======================================
    logging.info(f"Importing packages...")
    
    from google.cloud import aiplatform as vertex_ai
    
    from datetime import datetime
    import time
    import pandas as pd
    
    from zeitghost.vertex.MatchingEngineCRUD import MatchingEngineCRUD

    # log component args
    logging.info(f"me_index_id                : {me_index_id}")
    logging.info(f"me_index_endpoint_id       : {me_index_endpoint_id}")
    logging.info(f"me_index_name              : {me_index_name}")
    logging.info(f"me_index_endpoint_name     : {me_index_endpoint_name}")
    logging.info(f"vpc_network_full           : {vpc_network_full}")
    
    # ======================================
    # create mengine
    # ======================================
    logging.info(f"create mengine...")
    
    mengine = MatchingEngineCRUD(
        project_id=project 
        , project_num=project_num
        , region=location 
        , index_name=me_index_name
        , vpc_network_name=vpc_network_full
    )
    
    # ======================================
    # deploy ME index 
    # ======================================
    logging.info(f"deploying index...")
    index_endpoint = mengine.deploy_index(
        index_name = me_index_name
        , endpoint_name = me_index_endpoint_name
    )
    
    if index_endpoint:
        logging.info(f"Index endpoint resource name: {index_endpoint.name}")
        logging.info(f"Index endpoint VPC network name: {index_endpoint.network}")
        logging.info(f"Deployed indexes on the index endpoint:")
        for d in index_endpoint.deployed_indexes:
            logging.info(f"    {d.id}")
            
    return (
        index_endpoint
        , me_index_endpoint_id
        , me_index_name
    )
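Each component declares its outputs with `NamedTuple('Outputs', [...])` but returns a plain tuple. As far as we understand the KFP lightweight-component executor, the returned values are matched to the declared output names by position, so the order in the `return` statement must follow the declaration. The plain-Python sketch below (no KFP involved, hypothetical values, artifact output omitted) shows why a swapped order is not caught.

```python
from typing import NamedTuple

# Same declaration style as deploy_index (the Artifact output is left out here)
Outputs = NamedTuple("Outputs", [
    ("me_index_endpoint_id", str),
    ("me_index_name", str),
])


def to_outputs(result: tuple) -> Outputs:
    """Map a positional return tuple onto the declared output names."""
    if len(result) != len(Outputs._fields):
        raise ValueError(f"expected {len(Outputs._fields)} values, got {len(result)}")
    return Outputs(*result)


# Values in the declared order
ok = to_outputs(("1234567890", "my-index"))
print(ok.me_index_name)  # my-index

# Swapping the order silently swaps the names; nothing checks the values
swapped = to_outputs(("my-index", "1234567890"))
print(swapped.me_index_endpoint_id)  # my-index
```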

### Component to Load GDELT Records to Index

In [73]:
@kfp.v2.dsl.component(
    base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"
)
def load_gdelt_events(
    project: str
    , project_num: str
    , location: str
    , version: str
    , bq_dataset: str
    , vpc_network_full: str
    , embedding_dir_bucket_name: str
    , embedding_dir_bucket_uri: str
    , table_name: str
    , me_index_endpoint_id: str
    , me_index_id: str
    , me_index_artifact: Input[Artifact]
    , me_index_endpoint_artifact: Input[Artifact]
) -> NamedTuple('Outputs', [
    ('me_index_endpoint_id', str)
    , ('me_index_id', str)
    , ('uploaded_ids_list', list)
    , ('uploaded_ids', Artifact)
]):
        
    import logging
    # ====================================
    # import packages
    # ====================================
    logging.info(f"importing packages...")
    
    from datetime import datetime
    import pandas as pd
    import uuid
    import numpy as np
    import json
    import time
    import io
    import os
    
    from langchain.document_loaders import DataFrameLoader
    from langchain.docstore.document import Document
    
    from google.cloud import aiplatform as vertex_ai
    from google.cloud import storage
    from google.cloud import bigquery as bq
    
    from zeitghost.vertex.MatchingEngineVectorstore import MatchingEngineVectorStore
    from zeitghost.vertex.LLM import VertexLLM
    from zeitghost.vertex.Embeddings import VertexEmbeddings
    
    # set project ID
    CLOUD_ML_PROJECT_ID = os.environ["CLOUD_ML_PROJECT_ID"]
    
    # log component args
    logging.info(f"bq_dataset: {bq_dataset}")
    logging.info(f"vpc_network_full: {vpc_network_full}")
    logging.info(f"table_name: {table_name}")
    logging.info(f"CLOUD_ML_PROJECT_ID: {CLOUD_ML_PROJECT_ID}")
    
    # GCP SDK clients
    storage_client = storage.Client(project=project)
    
    vertex_ai.init(
        project=project,
        location=location,
    )
    # bigquery client
    bqclient = bq.Client(
        project=CLOUD_ML_PROJECT_ID,
        # location=LOCATION
    )
    # helper function
    def test_gcs_blob_metadata(blob_name, bucket_name):
        """
        Return the metadata of a blob uploaded to GCS (None if the blob does not exist).
        """
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name)
        return blob.metadata if blob is not None else None
        
    logging.info(f"table_ref: {table_name}")
    
    # ======================================
    # create Vertex LLM
    # ======================================
    logging.info(f"create Vertex LLM...")
    
    llm = VertexLLM(
        stop=None 
        , temperature=0.0
        , max_output_tokens=1000
        , top_p=0.7
        , top_k=40
    )

    REQUESTS_PER_MINUTE = 299 # stay just under the project quota of 300
    logging.info(f"REQUESTS_PER_MINUTE: {REQUESTS_PER_MINUTE}")
    
    vertex_embedding = VertexEmbeddings(requests_per_minute=REQUESTS_PER_MINUTE)
    
    # ======================================
    # create ME vector store
    # ======================================
    logging.info(f"create ME vector store...")
    me = MatchingEngineVectorStore.from_components(
        project_id=project
        # , project_num=PROJECT_NUM
        , region=location
        , gcs_bucket_name=embedding_dir_bucket_uri
        , embedding=vertex_embedding
        , index_id=me_index_id
        , endpoint_id=me_index_endpoint_id
    )
    
    # ======================================
    # load gdelt table
    # ======================================
    logging.info(f"loading gdelt table...")
    
    query = f"""
        SELECT * 
        FROM `{table_name}`
        -- LIMIT 5
    """
    logging.info(f"query: {query}")
    # read the column names from the table schema instead of downloading every row
    df_col_list = [field.name for field in bqclient.get_table(table_name).schema]
    logging.info(f"df_col_list: {df_col_list}")
    
    meta_cols_list = list(df_col_list)
    meta_cols_list.remove('text')
    logging.info(f"meta_cols_list: {meta_cols_list}")
    
    # ======================================
    # chunk text 
    # ======================================
    logging.info(f"chunk text...")
    start = time.time()

    docs = me.chunk_bq_table(
        bq_dataset_name=bq_dataset
        , bq_table_name=table_name
        , query=query
        , page_content_cols=['text']
        # , metadata_cols=['source']
        , metadata_cols=meta_cols_list
        , chunk_size=1000
        , chunk_overlap=0
    )

    end = time.time()
    logging.info(f"elapsed time: {end - start}")

    texts = [d.page_content for d in docs]
    metas = [d.metadata for d in docs]

    # add the chunked texts and their metadata to the Matching Engine vector store
    uploaded_ids = me.add_texts(
        texts=texts
        , metadatas=metas
    )
    logging.info(f"uploaded_ids: {uploaded_ids}")
    
    # convert the returned IDs to strings for the pipeline outputs
    uuid_strings = [str(uid) for uid in uploaded_ids]

    # ======================================
    # test gcs blob metadata
    # ======================================
    logging.info(f"testing gcs blob metadata...")

    # spot-check the metadata of the first uploaded document
    if uuid_strings:
        TEST_BLOB_UUID = uuid_strings[0]
        logging.info(TEST_BLOB_UUID)

        BLOB_NAME=f'documents/{TEST_BLOB_UUID}'
        logging.info(
            test_gcs_blob_metadata(
                blob_name=BLOB_NAME
                , bucket_name=embedding_dir_bucket_name
            )
        )
    else:
        logging.warning("no documents were uploaded; skipping metadata check")
    
    return (
        me_index_endpoint_id
        , me_index_id
        , uuid_strings
        , uuid_strings
    )
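Both loader components set `REQUESTS_PER_MINUTE = 299` so that embedding calls stay just under the project quota of 300. How `VertexEmbeddings` enforces this is not shown in this notebook; the sketch below illustrates one common approach, spacing request starts at least `60 / requests_per_minute` seconds apart. The `Throttle` class is hypothetical, and the clock and sleep functions are injectable only so the example runs instantly.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Space out calls so that at most `requests_per_minute` start per minute.

    Hypothetical sketch; not the VertexEmbeddings implementation.
    """

    def __init__(self, requests_per_minute: int,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        if requests_per_minute <= 0:
            raise ValueError("requests_per_minute must be positive")
        self.interval = 60.0 / requests_per_minute  # seconds between request starts
        self.clock = clock
        self.sleep = sleep
        self.next_allowed: Optional[float] = None   # earliest start time of the next call

    def wait(self) -> float:
        """Block until the next call may start; return the seconds slept."""
        now = self.clock()
        delay = 0.0 if self.next_allowed is None else max(0.0, self.next_allowed - now)
        if delay:
            self.sleep(delay)
        self.next_allowed = now + delay + self.interval
        return delay


# Fake clock: sleeping just advances the simulated time
fake_now = [0.0]
slept = []

def fake_sleep(seconds: float) -> None:
    slept.append(seconds)
    fake_now[0] += seconds

throttle = Throttle(299, clock=lambda: fake_now[0], sleep=fake_sleep)
for _ in range(3):
    throttle.wait()  # call the embedding API here
print(round(throttle.interval, 4))  # 0.2007
```

The first call goes through immediately; each later call waits out the remainder of the roughly 0.2 s interval.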

### Component to Load GDELT GEG (Entity) articles to Index

In [74]:
@kfp.v2.dsl.component(
    base_image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/zghost-{ACTOR_PREFIX}/gdelt-pipe-{VERSION}:latest"
)
def load_gdelt_geg_articles(
    project: str
    , project_num: str
    , location: str
    , version: str
    , bq_dataset: str
    , vpc_network_full: str
    , embedding_dir_bucket_name: str
    , embedding_dir_bucket_uri: str
    , table_name: str
    , me_index_endpoint_id: str
    , me_index_id: str
    , me_index_artifact: Input[Artifact]
    , me_index_endpoint_artifact: Input[Artifact]
) -> NamedTuple('Outputs', [
    ('me_index_endpoint_id', str)
    , ('me_index_id', str)
    , ('uploaded_ids_list', list)
    , ('uploaded_ids', Artifact)
]):
    
    import logging
    # ====================================
    # import packages
    # ====================================
    logging.info(f"importing packages...")
    
    from datetime import datetime
    import pandas as pd
    import uuid
    import numpy as np
    import json
    import time
    import io
    import os
    
    from langchain.document_loaders import DataFrameLoader
    from langchain.docstore.document import Document
    
    from google.cloud import aiplatform as vertex_ai
    from google.cloud import storage
    from google.cloud import bigquery as bq
    
    from zeitghost.vertex.MatchingEngineVectorstore import MatchingEngineVectorStore
    from zeitghost.vertex.LLM import VertexLLM
    from zeitghost.vertex.Embeddings import VertexEmbeddings
    
    # set project ID
    CLOUD_ML_PROJECT_ID = os.environ["CLOUD_ML_PROJECT_ID"]
    
    # log component args
    logging.info(f"bq_dataset: {bq_dataset}")
    logging.info(f"vpc_network_full: {vpc_network_full}")
    logging.info(f"table_name: {table_name}")
    logging.info(f"CLOUD_ML_PROJECT_ID: {CLOUD_ML_PROJECT_ID}")
    
    # GCP SDK clients
    storage_client = storage.Client(project=project)
    
    vertex_ai.init(
        project=project,
        location=location,
    )
    # bigquery client
    bqclient = bq.Client(
        project=CLOUD_ML_PROJECT_ID,
        # location=LOCATION
    )
    # helper function
    def test_gcs_blob_metadata(blob_name, bucket_name):
        """
        Return the metadata of a blob uploaded to GCS (None if the blob does not exist).
        """
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name)
        return blob.metadata if blob is not None else None

    logging.info(f"table_name: {table_name}")
    
    # ======================================
    # create Vertex LLM
    # ======================================
    logging.info(f"create Vertex LLM...")
    
    llm = VertexLLM(
        stop=None 
        , temperature=0.0
        , max_output_tokens=1000
        , top_p=0.7
        , top_k=40
    )

    REQUESTS_PER_MINUTE = 299 # stay just under the project quota of 300
    logging.info(f"REQUESTS_PER_MINUTE: {REQUESTS_PER_MINUTE}")
    
    vertex_embedding = VertexEmbeddings(requests_per_minute=REQUESTS_PER_MINUTE)
    
    # ======================================
    # create ME vector store
    # ======================================
    logging.info(f"create ME vector store...")
    me = MatchingEngineVectorStore.from_components(
        project_id=project
        # , project_num=PROJECT_NUM
        , region=location
        , gcs_bucket_name=embedding_dir_bucket_uri
        , embedding=vertex_embedding
        , index_id=me_index_id
        , endpoint_id=me_index_endpoint_id
    )
    
    # ======================================
    # load gdelt table
    # ======================================
    logging.info(f"loading gdelt table...")
    
    query = f"""
        SELECT * 
        FROM `{table_name}`
        -- LIMIT 5
    """
    logging.info(f"query: {query}")
    # read the column names from the table schema instead of downloading every row
    df_col_list = [field.name for field in bqclient.get_table(table_name).schema]
    logging.info(f"df_col_list: {df_col_list}")
    
    meta_cols_list = list(df_col_list)
    meta_cols_list = [
        e for e in meta_cols_list if e not in (
            'text'
            , 'language'
            , 'date'
            , 'Actor1Name'
            , 'Actor2Name'
            , 'GoldsteinScale'
        )
    ]
    logging.info(f"meta_cols_list: {meta_cols_list}")
    
    # ======================================
    # chunk text 
    # ======================================
    logging.info(f"chunk text...")
    start = time.time()

    docs = me.chunk_bq_table(
        bq_dataset_name=bq_dataset
        , bq_table_name=table_name
        , query=query
        , page_content_cols=['text']
        # , metadata_cols=['source']
        , metadata_cols=meta_cols_list
        , chunk_size=1000
        , chunk_overlap=0
    )

    end = time.time()
    logging.info(f"elapsed time: {end - start}")

    texts = [d.page_content for d in docs]
    metas = [d.metadata for d in docs]

    # add the chunked texts and their metadata to the Matching Engine vector store
    uploaded_ids = me.add_texts(
        texts=texts
        , metadatas=metas
    )
    logging.info(f"uploaded_ids: {uploaded_ids}")
    
    # convert the returned IDs to strings for the pipeline outputs
    uuid_strings = [str(uid) for uid in uploaded_ids]

    # ======================================
    # test gcs blob metadata
    # ======================================
    logging.info(f"testing gcs blob metadata...")

    # spot-check the metadata of the first uploaded document
    if uuid_strings:
        TEST_BLOB_UUID = uuid_strings[0]
        logging.info(TEST_BLOB_UUID)

        BLOB_NAME=f'documents/{TEST_BLOB_UUID}'
        logging.info(
            test_gcs_blob_metadata(
                blob_name=BLOB_NAME
                , bucket_name=embedding_dir_bucket_name
            )
        )
    else:
        logging.warning("no documents were uploaded; skipping metadata check")
    
    return (
        me_index_endpoint_id
        , me_index_id
        , uuid_strings
        , uuid_strings
    )
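The two loader components differ mainly in which columns become document metadata: `load_gdelt_events` drops only `text`, while `load_gdelt_geg_articles` also drops `language`, `date`, `Actor1Name`, `Actor2Name` and `GoldsteinScale`. A single order-preserving filter could express both cases. The helper and the column list below are hypothetical; the real columns come from the scraped BigQuery table. Note that `list.remove('text')` raises `ValueError` when the column is missing, whereas this filter does not, so the presence of the `text` column would need a separate check.

```python
from typing import Iterable, List


def select_metadata_cols(columns: Iterable[str], exclude: Iterable[str]) -> List[str]:
    """Return the columns not in `exclude`, keeping the table's column order."""
    excluded = set(exclude)
    return [c for c in columns if c not in excluded]


# Hypothetical schema for illustration
cols = ["url", "text", "language", "date", "Actor1Name", "Actor2Name",
        "GoldsteinScale", "title"]

# events component: drop only the page-content column
print(select_metadata_cols(cols, ["text"]))

# geg component: drop the same columns as load_gdelt_geg_articles
geg_exclude = ["text", "language", "date", "Actor1Name", "Actor2Name", "GoldsteinScale"]
print(select_metadata_cols(cols, geg_exclude))  # ['url', 'title']
```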

## Pipeline Definition

### Set Pipeline Arguments

In [103]:
# create new assets in pipeline
CREATE_NEW_ASSETS        = "False" # TODO: "True" | "False"
# ACTOR_PREFIX             = "way"  # TODO
# VERSION                  = "v1"   # TODO

print(f"CREATE_NEW_ASSETS  : {CREATE_NEW_ASSETS}")
print(f"ACTOR_PREFIX       : {ACTOR_PREFIX}")
print(f"VERSION            : {VERSION}")

CREATE_NEW_ASSETS  : False
ACTOR_PREFIX       : way
VERSION            : v1


In [104]:
# change PIPELINE_VERSION when you modify the pipeline; each version gets its own pipeline root
PIPELINE_VERSION='v1'

# Stores pipeline executions for each run
PIPELINE_ROOT_PATH = f'{BUCKET_URI}/pipeline_root/{PIPELINE_VERSION}'

print(f"BUCKET_URI: {BUCKET_URI}")
print(f'EMBEDDING_DIR_BUCKET_URI: {EMBEDDING_DIR_BUCKET_URI}')
print(f'PIPELINE_ROOT_PATH: {PIPELINE_ROOT_PATH}')

BUCKET_URI: gs://zghost-way-v1-wortz-project-352116
EMBEDDING_DIR_BUCKET_URI: gs://zghost-way-v1-wortz-project-352116-emd-dir
PIPELINE_ROOT_PATH: gs://zghost-way-v1-wortz-project-352116/pipeline_root/v1


In [105]:
PIPELINE_NAME = f'gdelt-pipe-{PIPELINE_VERSION}-{ACTOR_PREFIX}-{VERSION}'.replace('_', '-')
print(f"PIPELINE_NAME: {PIPELINE_NAME}")

PIPELINE_NAME: gdelt-pipe-v1-way-v1
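The cell above replaces underscores with hyphens because pipeline names are restricted. A slightly more general sanitizer is sketched below. It assumes the rule is lowercase letters, digits and hyphens, starting with a letter or digit, at most 128 characters; treat that pattern as an assumption and check it against the KFP version you compile with. `sanitize_pipeline_name` is a hypothetical helper.

```python
import re

# Assumed naming rule; verify against your KFP version
NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,127}$")


def sanitize_pipeline_name(name: str) -> str:
    """Lowercase, map disallowed characters to '-', collapse repeats, trim."""
    cleaned = re.sub(r"[^a-z0-9-]+", "-", name.lower())
    cleaned = re.sub(r"-{2,}", "-", cleaned).strip("-")[:128]
    if not NAME_RE.match(cleaned):
        raise ValueError(f"cannot derive a valid pipeline name from {name!r}")
    return cleaned


print(sanitize_pipeline_name("gdelt-pipe-v1-way_v1"))  # gdelt-pipe-v1-way-v1
print(sanitize_pipeline_name("GDELT_Pipe v1"))         # gdelt-pipe-v1
```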


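Create the pipeline definition from the previously defined components.

The pipeline adds conditional logic so that new infrastructure is only deployed when it does not already exist (see the `CREATE_NEW_ASSETS` flag). `kfp.v2.dsl.Condition` blocks also gate deploying the index to its endpoint (`create_me_endpoint`), scraping each GDELT source (`extract_gdelt_events`, `extract_geg_articles`), and indexing new articles, which only runs when the scrape step returns a non-empty `scraped_articles_table`.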
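To make the gating concrete, here is a plain-Python dry run of which steps the `kfp.v2.dsl.Condition` blocks in the pipeline definition would allow. It is a hypothetical illustration, not KFP code: the flags are the strings `"True"`/`"False"` compared exactly as in the pipeline, so `"true"` or `"TRUE"` would skip a step, and `events_table`/`geg_table` stand in for each scrape step's `scraped_articles_table` output. Vertex AI may run independent steps in parallel; the list only shows which steps would execute.

```python
from typing import List


def planned_steps(create_me_endpoint: str, extract_gdelt_events: str,
                  extract_geg_articles: str, events_table: str = "",
                  geg_table: str = "") -> List[str]:
    """Hypothetical dry run of the pipeline's conditional branches."""
    steps = ["set_config", "create_me_vectorstore"]  # always run
    if create_me_endpoint == "True":
        steps.append("deploy_index")
    if extract_gdelt_events == "True":
        steps.append("create_gdelt_events_table")
        if events_table != "":  # only index when new articles were scraped
            steps.append("load_gdelt_events")
    if extract_geg_articles == "True":
        steps.append("create_geg_article_table")
        if geg_table != "":
            steps.append("load_gdelt_geg_articles")
    return steps


# No new event articles were scraped, so the events table name is empty
print(planned_steps("False", "True", "True",
                    events_table="", geg_table="proj.ds.geg_articles"))
# ['set_config', 'create_me_vectorstore', 'create_gdelt_events_table',
#  'create_geg_article_table', 'load_gdelt_geg_articles']
```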

In [114]:
@kfp.v2.dsl.pipeline(
  name=PIPELINE_NAME
)
def pipeline(
    project: str
    , project_num: str
    , location: str
    , version: str
    , bq_dataset: str
    , min_date_events: str
    , max_date_events: str
    , min_date_geg: str
    , max_date_geg: str
    , test_table_prefix: str
    , vpc_network_full: str
    , extract_geg_articles: str
    , extract_gdelt_events: str
    , create_new_assets: str
    , actor_prefix: str
    , actor_name: str
    , vpc_network_name: str
    , bq_location: str = 'US'
    , create_me_endpoint: str = 'False'
):
    
    
    # ======================================
    # set pipeline config
    # ======================================
    set_config_op = (
        set_config(
            project=project
            , project_num=project_num
            , location=location
            , version=version
            , actor_prefix=actor_prefix
            , actor_name=actor_name
            , vpc_network_name=vpc_network_name
            , create_new_assets=create_new_assets
        )
        .set_caching_options(True)
        .set_display_name("pipeline config")
    )
    
    # ======================================
    # create new or get existing ME index
    # ======================================
    create_me_vectorstore_op = (
        create_me_vectorstore(
            project=project
            , project_num=project_num
            , location=location
            , version=version
            , bq_dataset=set_config_op.outputs['my_bq_dataset']
            , me_index_name=set_config_op.outputs['me_index_name']
            , me_index_endpoint_name=set_config_op.outputs['me_index_endpoint_name']
            , vpc_network_full=set_config_op.outputs['vpc_network_full']
            , embedding_dir_bucket_uri=set_config_op.outputs['emb_bucket_uri']
            , me_dimensions=set_config_op.outputs['me_dimensions']
        )
        .set_caching_options(True)
        .set_display_name("create or get ME index & endpoint")
        # .set_cpu_limit(XXXX)
        # .set_memory_limit(XXXX)
    )
    
    # ======================================
    # deploy index
    # ======================================
    with kfp.v2.dsl.Condition(create_me_endpoint == "True", name="deploy_index"):
        
        deploy_index_op = (
            deploy_index(
                project=project
                , project_num=project_num
                , location=location
                , version=version
                , me_index_id=create_me_vectorstore_op.outputs['me_index_id']
                , me_index_endpoint_id=create_me_vectorstore_op.outputs['me_index_endpoint_id']
                , me_index_endpoint_name=set_config_op.outputs['me_index_endpoint_name']
                , me_index_name=set_config_op.outputs['me_index_name']
                , vpc_network_full=set_config_op.outputs['vpc_network_full']
            )
            .set_display_name("deploy index to endpoint")
        )
    
    with kfp.v2.dsl.Condition(extract_gdelt_events == "True", name="extract event articles"):
        
        # ======================================
        # get GDELT events articles
        # ======================================
        create_gdelt_events_table_op = (
            create_gdelt_events_table(
                project=project
                , location=location
                , bq_location=bq_location
                , version=version
                , bq_dataset=set_config_op.outputs['my_bq_dataset']
                , actor_prefix=actor_prefix
                , actor_name=actor_name
                , min_date=min_date_events
                , max_date=max_date_events
                , test_table_prefix=test_table_prefix
            )
            .set_caching_options(True)
            .set_display_name("Scrape gdelt event articles")
            # .set_cpu_limit(XXXX)
            # .set_memory_limit(XXXX)
        )

        with kfp.v2.dsl.Condition(create_gdelt_events_table_op.outputs['scraped_articles_table'] != "", name="new event articles"):

            # ======================================
            # index GDELT events in ME index
            # ======================================
            load_gdelt_events_op = (
                load_gdelt_events(
                    project=project
                    , project_num=project_num
                    , location=location
                    , version=version
                    , bq_dataset=set_config_op.outputs['my_bq_dataset']
                    , vpc_network_full=set_config_op.outputs['vpc_network_full']
                    , embedding_dir_bucket_name=set_config_op.outputs['emb_bucket']
                    , embedding_dir_bucket_uri=set_config_op.outputs['emb_bucket_uri']
                    , table_name=create_gdelt_events_table_op.outputs['scraped_articles_table']
                    , me_index_endpoint_id=create_me_vectorstore_op.outputs['me_index_endpoint_id']
                    , me_index_id=create_me_vectorstore_op.outputs['me_index_id']
                    , me_index_artifact=create_me_vectorstore_op.outputs['me_index']
                    , me_index_endpoint_artifact=create_me_vectorstore_op.outputs['index_endpoint']
                )
                .set_caching_options(True)
                .set_display_name("Index gdelt event articles in ME")
                # .set_cpu_limit(XXXX)
                # .set_memory_limit(XXXX)
            )

    with kfp.v2.dsl.Condition(extract_geg_articles == "True", name="extract geg articles"):
        
        # ======================================
        # get GDELT geg articles
        # ======================================
        create_geg_article_table_op = (
            create_geg_article_table(
                project=project
                , location=location
                , bq_location=bq_location
                , version=version
                , bq_dataset=set_config_op.outputs['my_bq_dataset']
                , actor_prefix=actor_prefix
                , actor_name=actor_name
                , min_date=min_date_geg
                , max_date=max_date_geg
                , test_table_prefix=test_table_prefix
            )
            .set_caching_options(True)
            .set_display_name("Scrape gdelt geg articles")
            # .set_cpu_limit(XXXX)
            # .set_memory_limit(XXXX)
        )
        
        with kfp.v2.dsl.Condition(create_geg_article_table_op.outputs['scraped_articles_table'] != "", name="new geg articles"):
            # ======================================
            # index GDELT geg in ME index
            # ======================================
            load_gdelt_geg_articles_op = (
                load_gdelt_geg_articles(
                    project=project
                    , project_num=project_num
                    , location=location
                    , version=version
                    , bq_dataset=set_config_op.outputs['my_bq_dataset']
                    , vpc_network_full=set_config_op.outputs['vpc_network_full']
                    , embedding_dir_bucket_name=set_config_op.outputs['emb_bucket']
                    , embedding_dir_bucket_uri=set_config_op.outputs['emb_bucket_uri']
                    , table_name=create_geg_article_table_op.outputs['scraped_articles_table']
                    , me_index_endpoint_id=create_me_vectorstore_op.outputs['me_index_endpoint_id']
                    , me_index_id=create_me_vectorstore_op.outputs['me_index_id']
                    , me_index_artifact=create_me_vectorstore_op.outputs['me_index']
                    , me_index_endpoint_artifact=create_me_vectorstore_op.outputs['index_endpoint']
                )
                .set_caching_options(True)
                .set_display_name("Index gdelt geg articles in ME")
                # .set_cpu_limit(XXXX)
                # .set_memory_limit(XXXX)
            )

### Compile Pipeline

In [115]:
PIPELINE_JSON_SPEC_LOCAL = "custom_pipeline_spec.json"

# remove any spec left over from a previous compile (the compiler writes to the working directory)
! rm -f $PIPELINE_JSON_SPEC_LOCAL

kfp.v2.compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=PIPELINE_JSON_SPEC_LOCAL,
)
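Before uploading the spec, it can help to list the parameters the compiled pipeline actually declares. The sketch below assumes the PipelineSpec layout `root.inputDefinitions.parameters`, either at the top level or nested under `"pipelineSpec"` as in a PipelineJob-style JSON; the key holding each parameter's type depends on the spec schema version, so both `type` and `parameterType` are checked. `declared_parameters` is a hypothetical helper, and the sample spec is a minimal stand-in, not the real compiled output.

```python
import json


def declared_parameters(spec: dict) -> dict:
    """Return {name: type} for the pipeline's root input parameters."""
    pipeline_spec = spec.get("pipelineSpec", spec)
    params = (pipeline_spec.get("root", {})
                           .get("inputDefinitions", {})
                           .get("parameters", {}))
    # the type key name differs between spec schema versions
    return {name: p.get("parameterType", p.get("type")) for name, p in params.items()}


# Usage against the compiled file:
# with open(PIPELINE_JSON_SPEC_LOCAL) as f:
#     print(sorted(declared_parameters(json.load(f))))

sample = json.loads("""
{"pipelineSpec": {"root": {"inputDefinitions": {"parameters": {
    "project": {"type": "STRING"},
    "bq_location": {"type": "STRING"}
}}}}}
""")
print(declared_parameters(sample))  # {'project': 'STRING', 'bq_location': 'STRING'}
```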



In [116]:
PIPELINES_FILEPATH = f'{PIPELINE_ROOT_PATH}/pipeline_spec.json'
print("PIPELINES_FILEPATH:", PIPELINES_FILEPATH)

!gsutil cp $PIPELINE_JSON_SPEC_LOCAL $PIPELINES_FILEPATH

PIPELINES_FILEPATH: gs://zghost-way-v1-wortz-project-352116/pipeline_root/v1/pipeline_spec.json
Copying file://custom_pipeline_spec.json [Content-Type=application/json]...
/ [1 files][ 94.8 KiB/ 94.8 KiB]                                                
Operation completed over 1 objects/94.8 KiB.                                     


In [117]:
!gsutil ls $PIPELINE_ROOT_PATH

gs://zghost-way-v1-wortz-project-352116/pipeline_root/v1/pipeline_spec.json
gs://zghost-way-v1-wortz-project-352116/pipeline_root/v1/679926387543/


### Submit Pipeline Job
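All Vertex AI Pipelines run as a service account. This notebook uses the Compute Engine default service account (`PROJECT_NUM-compute@developer.gserviceaccount.com`); make sure the account you use has the permissions required by every step in the pipeline (BigQuery, Cloud Storage and Vertex AI Matching Engine).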

In [118]:
# set to "True" to run the `deploy_index` step and deploy the index to its endpoint; default is "False"
CREATE_ME_ENDPOINT = "False"

# "True" will write results to a table prefixed with `test`; useful for debugging and testing pipeline
# change to "False" when scraping all GDELT records of interest
TEST_TABLE_PREFIX    = "True" # TODO: "True" | "False"

# set time window for GDELT `events` table
MIN_DATE_EVENTS      = "2023-06-01"  # TODO
MAX_DATE_EVENTS      = "2023-06-14"  # TODO

# set time window for GDELT `global entity graph` (geg) table
MIN_DATE_GEG         = "2023-01-01"  # TODO
MAX_DATE_GEG         = "2023-06-14"  # TODO

# conditional pipeline args; useful when testing and debugging
EXTRACT_GDELT_EVENTS = "True"        # TODO: 'True' | 'False'
EXTRACT_GDELT_GEG    = "True"        # TODO: 'True' | 'False'

print(f"TEST_TABLE_PREFIX     : {TEST_TABLE_PREFIX}")
print(f"MIN_DATE_EVENTS       : {MIN_DATE_EVENTS}")
print(f"MAX_DATE_EVENTS       : {MAX_DATE_EVENTS}")
print(f"MIN_DATE_GEG          : {MIN_DATE_GEG}")
print(f"MAX_DATE_GEG          : {MAX_DATE_GEG}")
print(f"EXTRACT_GDELT_EVENTS  : {EXTRACT_GDELT_EVENTS}")
print(f"EXTRACT_GDELT_GEG     : {EXTRACT_GDELT_GEG}")

TEST_TABLE_PREFIX     : True
MIN_DATE_EVENTS       : 2023-06-01
MAX_DATE_EVENTS       : 2023-06-14
MIN_DATE_GEG          : 2023-01-01
MAX_DATE_GEG          : 2023-06-14
EXTRACT_GDELT_EVENTS  : True
EXTRACT_GDELT_GEG     : True
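Because every pipeline argument is passed as a string, typos in these values only surface once the job is running. A small pre-submission check is sketched below; `validate_window` and `validate_flag` are hypothetical helpers. The flag check mirrors the exact `== "True"` comparisons in the pipeline's conditions, and the date check assumes the `YYYY-MM-DD` format used above.

```python
from datetime import date


def validate_window(min_date: str, max_date: str) -> int:
    """Check ISO dates (YYYY-MM-DD) and return the number of days between them."""
    start, end = date.fromisoformat(min_date), date.fromisoformat(max_date)
    if start > end:
        raise ValueError(f"min date {min_date} is after max date {max_date}")
    return (end - start).days


def validate_flag(name: str, value: str) -> str:
    """Flags are compared as exact strings, so only 'True' or 'False' are accepted."""
    if value not in ("True", "False"):
        raise ValueError(f"{name} must be 'True' or 'False', got {value!r}")
    return value


print(validate_window("2023-06-01", "2023-06-14"))  # 13
print(validate_window("2023-01-01", "2023-06-14"))  # 164
validate_flag("EXTRACT_GDELT_EVENTS", "True")
```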


In [119]:
# VPC_NETWORK_FULL = f'projects/{PROJECT_NUM}/global/networks/{vpc_network_name}'
SERVICE_ACCOUNT = f'{PROJECT_NUM}-compute@developer.gserviceaccount.com'

job = vertex_ai.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINES_FILEPATH,
    pipeline_root=f'{PIPELINE_ROOT_PATH}',
    failure_policy='fast', # slow | fast
    # enable_caching=False,
    parameter_values={
        'project'                    : PROJECT_ID
        , 'project_num'              : PROJECT_NUM
        , 'location'                 : LOCATION
        , 'bq_location'              : BQ_LOCATION
        , 'version'                  : VERSION
        , 'vpc_network_full'         : VPC_NETWORK_FULL
        , 'bq_dataset'               : MY_BQ_DATASET
        , 'min_date_events'          : MIN_DATE_EVENTS
        , 'max_date_events'          : MAX_DATE_EVENTS
        , 'min_date_geg'             : MIN_DATE_GEG
        , 'max_date_geg'             : MAX_DATE_GEG
        , 'test_table_prefix'        : TEST_TABLE_PREFIX
        , 'extract_gdelt_events'     : EXTRACT_GDELT_EVENTS
        , 'extract_geg_articles'     : EXTRACT_GDELT_GEG
        , 'create_new_assets'        : CREATE_NEW_ASSETS
        , 'actor_name'               : ACTOR_NAME
        , 'actor_prefix'             : ACTOR_PREFIX
        , 'vpc_network_name'         : VPC_NETWORK_NAME
        , 'create_me_endpoint'       : CREATE_ME_ENDPOINT
    },
)

job.run(
    sync=False,
    service_account=SERVICE_ACCOUNT,
    network=VPC_NETWORK_FULL
)
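A missing or misspelled key in `parameter_values` is only reported by the service after submission. Before calling `job.run`, the keys can be compared against the pipeline function's signature with `inspect`. The sketch below uses a shortened stand-in function; passing the decorated `pipeline` itself only works if your KFP version's `@kfp.v2.dsl.pipeline` decorator preserves the Python signature, which is an assumption to verify. `check_parameter_values` is a hypothetical helper.

```python
import inspect


def check_parameter_values(pipeline_func, values: dict) -> None:
    """Raise if `values` misses a required argument or contains an unknown one.

    Required arguments are the parameters without a default value.
    """
    params = inspect.signature(pipeline_func).parameters
    required = {n for n, p in params.items() if p.default is inspect.Parameter.empty}
    missing = sorted(required - set(values))
    unknown = sorted(set(values) - set(params))
    if missing or unknown:
        raise ValueError(f"missing={missing} unknown={unknown}")


# Stand-in with a shortened version of the pipeline signature
def demo_pipeline(project: str, location: str, bq_location: str = "US",
                  create_me_endpoint: str = "False"):
    pass


check_parameter_values(demo_pipeline, {"project": "p", "location": "us-central1"})  # passes
```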

After you submit the job, open Vertex AI > Pipelines in the Cloud console to see your pipeline run.