In [None]:
!pip install kfp
!pip install google-cloud-pipeline-components

In [19]:
import kfp
from google.cloud import aiplatform
from kfp.v2.dsl import component
from kfp.v2.dsl import pipeline
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs

In [7]:
PROJECT_ID = "sascha-playground-doit"
PIPELINE_ROOT = "gs://doit-vertex-demo/"

In [8]:
aiplatform.init(project=PROJECT_ID,
                location='us-central1')

In [None]:
@component(
    packages_to_install=["google-cloud-aiplatform"]
)
def batch_predict():
    from google.cloud import aiplatform
    model = aiplatform.Model('projects/sascha-playground-doit/locations/us-central1/models/6091379080274378752')

    model.batch_predict(
    job_display_name=f"batch_predict_churn",
    machine_type="n1-standard-4",
    starting_replica_count=2,

    instances_format="bigquery",
    predictions_format="bigquery",
    bigquery_source='bq://sascha-playground-doit.churn_prediction.preprocessed_data',
    bigquery_destination_prefix="bq://sascha-playground-doit.batch",
)

In [29]:
#you can use the following code to create a reference to a model if the model is already uploaded
model = aiplatform.Model('projects/sascha-playground-doit/locations/us-central1/models/6091379080274378752')

In [30]:
@pipeline(name="basic-pipeline",
          pipeline_root=PIPELINE_ROOT + "basic-pipeline")
def basic_pipeline():
    batch_predict_task = batch_predict()

In [31]:
compiler.Compiler().compile(
pipeline_func=basic_pipeline, package_path="basic_pipeline.json"
)

In [32]:
job = pipeline_jobs.PipelineJob(
    display_name="basic-pipeline",
    template_path="basic_pipeline.json"
)

In [33]:
job.run(sync=False)

In [26]:
    # Schedule the pipeline job
job.create_schedule(
        display_name=f"batch-churn-predict-schedule",
        cron="0 0 * * *"
    )

Creating PipelineJobSchedule
PipelineJobSchedule created. Resource name: projects/234439745674/locations/us-central1/schedules/6459591230765400064
To use this PipelineJobSchedule in another session:
schedule = aiplatform.PipelineJobSchedule.get('projects/234439745674/locations/us-central1/schedules/6459591230765400064')
View Schedule:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/schedules/6459591230765400064?project=234439745674


<google.cloud.aiplatform.pipeline_job_schedules.PipelineJobSchedule object at 0x12e75f690> 
resource name: projects/234439745674/locations/us-central1/schedules/6459591230765400064

In [11]:
batch_prediction_job = model.batch_predict(
    job_display_name=f"batch_predict_churn",
    machine_type="n1-standard-4",
    starting_replica_count=2,

    instances_format="bigquery",
    predictions_format="bigquery",
    bigquery_source='bq://sascha-playground-doit.churn_prediction.preprocessed_data',
    bigquery_destination_prefix="bq://sascha-playground-doit.batch",
)

Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/234439745674/locations/us-central1/batchPredictionJobs/8542524492423888896
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/234439745674/locations/us-central1/batchPredictionJobs/8542524492423888896')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/8542524492423888896?project=234439745674
BatchPredictionJob projects/234439745674/locations/us-central1/batchPredictionJobs/8542524492423888896 current state:
3
BatchPredictionJob projects/234439745674/locations/us-central1/batchPredictionJobs/8542524492423888896 current state:
3
BatchPredictionJob projects/234439745674/locations/us-central1/batchPredictionJobs/8542524492423888896 current state:
3
BatchPredictionJob projects/234439745674/locations/us-central1/batchPredictionJobs/8542524492423888896 current state:
3
BatchPredictionJob projects/2344397456

KeyboardInterrupt: 