# Import Required Libraries
Import the libraries needed to manage the Vertex AI pipeline: `json`, `datetime`, `google.cloud.aiplatform`, and the KFP compiler, together with the pipeline definition itself.

In [None]:
import json
from datetime import datetime
from google.cloud import aiplatform
from kfp.v2 import compiler
from src.pipelines.earth_data_to_bq import earth_data_pipeline  # Assuming the pipeline is importable

# Define Pipeline Parameters
Define and validate input parameters including dataset_name, dataset_version, is_scheduled, start_date, end_date, and cron_schedule.

In [None]:
# Define parameters
dataset_name = "MODIS"  # Example
dataset_version = "061"
is_scheduled = False  # True for scheduled, False for immediate run
# The date window is only used for an immediate run; the cron expression is
# only used for a scheduled run. The unused side is set to None.
start_date = "2023-01-01" if not is_scheduled else None
end_date = "2023-01-02" if not is_scheduled else None
cron_schedule = "0 0 * * *" if is_scheduled else None  # Daily at midnight

# Other required parameters
polygon = [[-122.0, 37.0], [-122.0, 38.0], [-121.0, 38.0], [-121.0, 37.0]]
polygon_str = json.dumps(polygon)  # passed to the pipeline as a JSON string
bq_dataset = "your_bq_dataset"
table_id = "your_table"

# Validate
if is_scheduled:
    if not cron_schedule:
        raise ValueError("cron_schedule must be provided if is_scheduled is True")
else:
    if not start_date or not end_date:
        raise ValueError(
            "start_date and end_date must be provided if is_scheduled is False"
        )
    # The format check mirrors the example values above (YYYY-MM-DD).
    # NOTE(review): relax this if the pipeline accepts other date formats.
    try:
        window_start = datetime.strptime(start_date, "%Y-%m-%d")
        window_end = datetime.strptime(end_date, "%Y-%m-%d")
    except ValueError as err:
        raise ValueError(
            "start_date and end_date must use the YYYY-MM-DD format"
        ) from err
    if window_start > window_end:
        raise ValueError("start_date must not be later than end_date")

# Create Pipeline Function
The pipeline function is already defined in src.pipelines.earth_data_to_bq. We use it directly.

# Compile Pipeline
Compile the pipeline into a JSON format suitable for Vertex AI.

In [None]:
# Compile the pipeline
# The compiled definition is written to this file; the cells that create
# PipelineJob objects read it back via `template_path`.
pipeline_json_path = "earth_data_pipeline.json"
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=earth_data_pipeline,
    package_path=pipeline_json_path,
)
print(f"Pipeline compiled to {pipeline_json_path}")

# Schedule Pipeline
If is_scheduled is True, use cron_schedule to create a scheduled run in Vertex AI; start_date and end_date are not passed to the pipeline in this mode.

In [None]:
# Schedule the pipeline if requested
if is_scheduled:
    aiplatform.init(project="your-gcp-project", location="us-central1")
    # The job object is only a template here: it is never run directly, the
    # schedule created below uses it to launch runs.
    pipeline_job = aiplatform.PipelineJob(
        display_name="Scheduled Earth Data Pipeline",
        template_path=pipeline_json_path,
        parameter_values={
            "dataset_name": dataset_name,
            "dataset_version": dataset_version,
            "polygon_str": polygon_str,
            "bq_dataset": bq_dataset,
            "table_id": table_id,
            # start_date and end_date are intentionally omitted for scheduled
            # runs. NOTE(review): presumably the pipeline then falls back to
            # its own defaults (e.g. current dates) - confirm against the
            # pipeline definition.
        },
    )
    # Schedules are created through PipelineJobSchedule: build the schedule
    # object around the job, then call create() with the cron expression.
    # The SDK does not expose a public `aiplatform.Schedule` class.
    schedule = aiplatform.PipelineJobSchedule(
        pipeline_job=pipeline_job,
        display_name="Earth Data Schedule",
    )
    schedule.create(cron=cron_schedule)
    print(f"Pipeline scheduled with ID: {schedule.resource_name}")
else:
    print("Pipeline not scheduled.")

# Run Pipeline Immediately
If is_scheduled is False, run the pipeline immediately in Vertex AI using start_date and end_date.

In [None]:
# Run the pipeline immediately if not scheduled
if is_scheduled:
    print("Pipeline is scheduled, not running immediately.")
else:
    aiplatform.init(project="your-gcp-project", location="us-central1")
    # Unlike the scheduled case, an immediate run passes the explicit date
    # window along with the other pipeline parameters.
    run_parameters = {
        "dataset_name": dataset_name,
        "dataset_version": dataset_version,
        "start_date": start_date,
        "end_date": end_date,
        "polygon_str": polygon_str,
        "bq_dataset": bq_dataset,
        "table_id": table_id,
    }
    job = aiplatform.PipelineJob(
        display_name="Earth Data Pipeline Run",
        template_path=pipeline_json_path,
        parameter_values=run_parameters,
    )
    job.run()
    print(f"Pipeline run started with ID: {job.resource_name}")

# Delete Pipeline
Delete an existing pipeline job from Vertex AI using its full resource name.

In [None]:
# Delete a pipeline (run this cell separately with the resource name)
# Replace the placeholder below with the actual resource name from previous runs
pipeline_resource_name = "projects/your-gcp-project/locations/us-central1/pipelineJobs/your-job-id"
# `delete` is an instance method, so the job has to be fetched by its
# resource name before it can be deleted.
aiplatform.PipelineJob.get(resource_name=pipeline_resource_name).delete()
print(f"Deleted pipeline: {pipeline_resource_name}")

# For schedules, if deleting a schedule:
# schedule_resource_name = "projects/.../schedules/..."
# aiplatform.PipelineJobSchedule.get(schedule_id=schedule_resource_name).delete()