In [33]:
from kfp import dsl
from kfp.dsl import (
    component, 
    Output,
    Input,
    Model
)

from kfp import compiler
from google.cloud import aiplatform

In [34]:
# Container image shared by the pipeline components in this notebook. Component
# bodies execute inside this image, so it must ship every package they import at
# run time (e.g. `requests`, `vertex_utils`, `google.cloud.bigquery`).
BASE_IMAGE = "europe-west3-docker.pkg.dev/bda-gameon-demo/vertex/base_football_container:1.0.10"

In [35]:
@component(
    base_image=BASE_IMAGE
)
def load_and_preprocess(
    gamesweek: int,
    competitionId: int,
):
    """Fetch one gameweek of matches/events from the football API and persist aggregations to BigQuery.

    All imports live inside the function because KFP lightweight components
    ship only the function source into ``BASE_IMAGE``.

    Args:
        gamesweek: Gameweek number, sent to the API as the ``gameweek`` query parameter.
        competitionId: Competition identifier, sent to the API as ``competitionId``.

    Raises:
        requests.Timeout: If the matches endpoint does not answer within the timeout.
        Exception: If the matches endpoint answers with a non-200 status code.
        ValueError: If the matches payload is not a JSON object containing ``"matches"``.
    """
    import requests
    from vertex_utils import (
        prepare_df_from_events_api,
        prepare_df_from_matches_api,
        fetch_events_data,
        merge_events_and_matches,
        enrich_with_tags_names,
        save_historic_to_big_query,
        prepare_aggregations,
    )

    API_BASE_URL = "https://big-data-project-api-248863766350.europe-west3.run.app"
    # requests has no default timeout; without one a stalled API call would
    # block this pipeline step indefinitely.
    REQUEST_TIMEOUT_SECONDS = 60

    print(f"Fetching match info for competition: {competitionId} and gameweek: {gamesweek}...")
    # `params` produces the same query string as before, but lets requests do the encoding.
    response_matches = requests.get(
        f"{API_BASE_URL}/matches",
        params={"gameweek": gamesweek, "competitionId": competitionId},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )

    if response_matches.status_code != 200:
        raise Exception(f"API call failed with status code {response_matches.status_code}: {response_matches.text}")

    matches_data = response_matches.json()

    # Guard the type too: `in` on a JSON list would silently test element membership.
    if not isinstance(matches_data, dict) or "matches" not in matches_data:
        raise ValueError("Invalid matches data format received from API.")

    print(f"Gathering match info for competition: {competitionId} and gameweek: {gamesweek}...")
    matches_df = prepare_df_from_matches_api(matches_data)

    print(f"Fetching events data for competition: {competitionId} and gameweek: {gamesweek}...")
    # NOTE(review): fetch_events_data performs its own HTTP calls; confirm it also uses a timeout.
    events_data = fetch_events_data(matches_df, API_BASE_URL)

    print(f"Preparing events DataFrame for competition: {competitionId} and gameweek: {gamesweek}...")
    events_df = prepare_df_from_events_api(events_data)

    print("Merging events and match data...")
    df = merge_events_and_matches(events_df, matches_df)

    print("Enriching with tags names...")
    df = enrich_with_tags_names(df)

    print("Preparing aggregations...")
    aggregations = prepare_aggregations(df)

    print("Saving aggregations to BigQuery...")
    save_historic_to_big_query(aggregations)

In [36]:
@component(base_image=BASE_IMAGE)
def train_model():
    """Retrain the BigQuery ML classifier on ``football.historic_aggregations``.

    Runs a ``CREATE OR REPLACE MODEL`` statement, so every run overwrites
    ``football.lightgbm_model`` in project ``bda-gameon-demo``. All columns of the
    source table are used as features, with ``result`` as the label.

    Raises:
        Exception: Any error from the BigQuery client or the query job, re-raised
            after being logged with its traceback so the pipeline step fails.
    """
    from google.cloud import bigquery
    import logging

    # The root logger defaults to WARNING, so the INFO messages below would be
    # dropped unless logging is configured. basicConfig is a no-op if handlers
    # already exist, so the level is also set explicitly.
    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)

    # NOTE(review): despite the "lightgbm" name, this is a BigQuery ML
    # BOOSTED_TREE_CLASSIFIER model, not LightGBM.
    query = """
        CREATE OR REPLACE MODEL football.lightgbm_model
        OPTIONS(model_type='BOOSTED_TREE_CLASSIFIER',
                input_label_cols=['result'])
        AS
        SELECT
            *
        FROM
            `bda-gameon-demo.football.historic_aggregations`
    """

    try:
        logging.info("Initializing BigQuery client...")
        client = bigquery.Client(project="bda-gameon-demo")
        logging.info("Running query:\n%s", query)
        query_job = client.query(query)
        query_job.result()  # Block until training finishes; raises if the job failed.
        logging.info("Query completed successfully.")
    except Exception:
        # logging.exception keeps the traceback that str(e) alone would lose.
        logging.exception("Failed to execute query")
        raise

In [37]:
@dsl.pipeline(name="batch_processing", description="Pipeline responsible for batch processing and model training")
def batch_processing_pipeline(
    gamesweek: int = 1,
    competitionId: int = 364,
):
    """Load one gameweek of data into BigQuery, then retrain the model.

    Args:
        gamesweek: Gameweek forwarded to ``load_and_preprocess``.
        competitionId: Competition forwarded to ``load_and_preprocess``.
    """
    load_and_preprocess_step = load_and_preprocess(
        gamesweek=gamesweek,
        competitionId=competitionId
    ).set_display_name("Load and Preprocess")

    # train_model has no inputs, so its cache key never changes. With step
    # caching enabled (the Vertex default), every run after the first would be
    # a cache hit and the model would never be retrained on newly loaded data.
    train_model_step = train_model().after(load_and_preprocess_step)
    train_model_step.set_display_name("Train Model")
    train_model_step.set_caching_options(False)

In [38]:
# Path of the compiled pipeline spec; written by the compiler and read back by Vertex AI.
PIPELINE_PACKAGE = "batch_processing_pipeline.json"

# Serialize the pipeline definition into a spec file Vertex AI can execute.
compiler.Compiler().compile(
    pipeline_func=batch_processing_pipeline,
    package_path=PIPELINE_PACKAGE,
)

aiplatform.init(project="bda-gameon-demo", location="europe-west3")

# Runtime values for the pipeline's parameters.
run_parameters = {"gamesweek": 1, "competitionId": 364}

pipeline_job = aiplatform.PipelineJob(
    display_name="batch_processing_job",
    template_path=PIPELINE_PACKAGE,
    parameter_values=run_parameters,
)

# Submit and block until the run reaches a terminal state.
pipeline_job.run(sync=True)

Creating PipelineJob
PipelineJob created. Resource name: projects/248863766350/locations/europe-west3/pipelineJobs/batch-processing-20250103205135
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/248863766350/locations/europe-west3/pipelineJobs/batch-processing-20250103205135')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west3/pipelines/runs/batch-processing-20250103205135?project=248863766350
PipelineJob projects/248863766350/locations/europe-west3/pipelineJobs/batch-processing-20250103205135 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/248863766350/locations/europe-west3/pipelineJobs/batch-processing-20250103205135 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/248863766350/locations/europe-west3/pipelineJobs/batch-processing-20250103205135 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/248863766350/locations/europe-west3/pi