<a href="https://colab.research.google.com/github/apetresc/rotman-ml/blob/master/notebooks/Rotman_AirBNB_New_User_Bookings.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MLOps Colloquium - AirBNB New User Bookings

This data set, from [a Kaggle competition](https://www.kaggle.com/competitions/airbnb-recruiting-new-user-bookings/overview) that AirBNB used for recruitment, contains demographic and summary data about a batch of new users, together with some data about their web sessions. The goal of the original exercise was to train a model that could predict which country a new user would make their very first booking in.

We, however, are going to assume this model already exists. Instead, we're going to focus on a very common operation in real-world ML workflows: resolving data issues that can affect the accuracy of the model.

In [None]:
YOUR_NAME = 'apetresc'
GCP_PROJECT_ID = ''
REGION = 'us-central1'
GCS_BUCKET = 'gs://rotman-vertex-demo'

In [None]:
!pip install kfp==1.8.14 scikit-learn google-cloud-pipeline-components==1.0.24 matplotlib
!pip install pyspark
!wget https://github.com/GoogleCloudDataproc/hadoop-connectors/releases/download/v2.2.11/gcs-connector-hadoop3-2.2.11-shaded.jar

In [None]:
import kfp
from kfp.v2 import dsl, components
from google_cloud_pipeline_components.v1.dataproc import \
    DataprocPySparkBatchOp

In [None]:
TRAINING_DATA_URI = GCS_BUCKET + "/airbnb-new-user-bookings"

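The shell commands below upload `spark_code.py` to GCS and download a local copy of the dataset. The upload only succeeds once `spark_code.py` exists, so re-run this cell after you have written that file further down. You can manually inspect the downloaded dataset (once the cell has finished running) from the Files tab on the left (in Colab).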

In [None]:
!gsutil cp spark_code.py {GCS_BUCKET}/code/{YOUR_NAME}/spark_code.py
!gsutil cp -r {GCS_BUCKET}/airbnb-new-user-bookings/ ./

In order to experiment with the data before committing it to a pipeline, we can use a local PySpark installation to grab and visualize the data.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, DateType, FloatType
import pyspark.sql.functions as F

In [None]:
spark = SparkSession.builder.getOrCreate()

Now that we have a Spark session, we can load the CSVs we pulled above into DataFrames and start exploring them.

In [None]:
users_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('./airbnb-new-user-bookings/users.csv')
users_df.show()

In [None]:
sessions_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('./airbnb-new-user-bookings/sessions.csv')
sessions_df.show()

In the course of our exploration, we detect something... unusual... about the distribution of the `age` column.

In [None]:
import matplotlib.pyplot as plt

ages = users_df.select(F.col('age').cast('int')).rdd.flatMap(lambda x: x).collect()
plt.hist([age for age in ages if age is not None])
plt.yscale('log')
plt.title('Distribution of Ages')
plt.xlabel('Age')
plt.ylabel('Count')
plt.show()

It seems like there's some corruption somewhere in our data pipeline. Unless AirBNB really has a few ~2000-year-old customers, we need to figure out what these incorrect values mean and try to fix them.
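
One way to start investigating is to bucket the raw values before deciding on a fix. The sketch below is a minimal, pure-Python illustration that works on a list shaped like the `ages` list collected above. The function name `summarize_ages` and all thresholds (15 to 100 as a plausible age, 1900 to 2015 as "looks like a calendar year") are assumptions chosen for illustration, not values taken from the dataset documentation.

```python
from collections import Counter

def summarize_ages(ages, min_age=15, max_age=100, min_year=1900, max_year=2015):
    """Count how many values fall into each (hypothetical) category."""
    counts = Counter()
    for age in ages:
        if age is None:
            counts["missing"] += 1
        elif min_age <= age <= max_age:
            counts["plausible"] += 1
        elif min_year <= age <= max_year:
            # Could be a birth year or signup year typed into the age field.
            counts["looks_like_year"] += 1
        else:
            counts["other"] += 1
    return counts

# Made-up sample values, not real rows from users.csv.
sample = [34, None, 2014, 1949, 5, 105, 28]
print(summarize_ages(sample))
```

Calling `summarize_ages(ages)` on the real list should show whether the extreme values cluster in one category, which in turn suggests what kind of fix is appropriate.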

In the cell below (with the `%%writefile` magic), write some Spark code that can pre-process the `users` table to deal with these erroneous fields.

Of course, feel free to create other exploratory cells above it to help you figure out how to investigate and solve the problem, but condense it all into the `%%writefile` cell once you're done, since our final Spark job needs to be a single, self-contained Spark application, not a bunch of Jupyter notebook cells.
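
As a possible starting point, a cleaning rule can be prototyped in plain Python before it is translated to Spark. The sketch below assumes (and this is only a guess about the cause) that values between 1900 and 2015 are birth years entered in the age field, and that anything outside 15 to 100 is unusable. The name `clean_age`, the thresholds, and the `reference_year` parameter are all hypothetical.

```python
from typing import Optional

def clean_age(raw: Optional[int], reference_year: int,
              min_age: int = 15, max_age: int = 100,
              min_year: int = 1900, max_year: int = 2015) -> Optional[int]:
    """Return a cleaned age, or None when the value cannot be trusted."""
    if raw is None:
        return None
    if min_year <= raw <= max_year:
        # Assumption: the user typed a birth year instead of an age.
        raw = reference_year - raw
    # Anything still outside the plausible range is treated as missing.
    return raw if min_age <= raw <= max_age else None

# Made-up inputs to exercise each branch.
for value in [34, 1980, 2014, 5, None]:
    print(value, "->", clean_age(value, reference_year=2015))
```

In the Spark job, the same branching could be written as a column expression with `F.when(...).otherwise(...)` and applied with `withColumn`, so that it runs on the whole `users` table rather than on a Python list.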

In [None]:
%%writefile spark_code.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Your code goes here

# Putting it all together

Now that you've identified and solved the data issue, we need to make sure this fix can be applied automatically to all future training and evaluation runs. Below is the Kubeflow Pipeline we're using to train this model in production.

Given the pipeline below, modify it so that the data preprocessing Spark job you wrote above acts on the loaded data before it reaches the training job.

Hint: Try using the `DataprocPySparkBatchOp` component we imported above!

(You may find it useful to examine [this sample notebook](https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_dataproc_tabular.ipynb) in detail; it contains more sophisticated pipelines that make use of `DataprocPySparkBatchOp`.)
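
A minimal sketch of how the arguments for such a step might be assembled. The helper name `dataproc_batch_kwargs` and the batch ID scheme are hypothetical. The keyword names (`project`, `location`, `batch_id`, `main_python_file_uri`, `args`) follow the parameters of `DataprocPySparkBatchOp` as used in the sample notebook linked above, but check them against the version you installed.

```python
import re
import uuid

def dataproc_batch_kwargs(project, region, bucket, user, data_uri):
    """Build keyword arguments for a Dataproc PySpark batch step (illustrative)."""
    # Dataproc batch IDs are restricted to lowercase letters, digits and hyphens,
    # so normalise the user name and add a short random suffix (hypothetical scheme).
    safe_user = re.sub(r"[^a-z0-9-]", "-", user.lower())
    batch_id = f"preprocess-{safe_user}-{uuid.uuid4().hex[:8]}"
    return {
        "project": project,
        "location": region,
        "batch_id": batch_id,
        # Matches the path spark_code.py is copied to with gsutil earlier.
        "main_python_file_uri": f"{bucket}/code/{user}/spark_code.py",
        "args": ["--train-data-path", data_uri],
    }

kwargs = dataproc_batch_kwargs(
    project="my-gcp-project",  # placeholder, not a real project
    region="us-central1",
    bucket="gs://rotman-vertex-demo",
    user="apetresc",
    data_uri="gs://rotman-vertex-demo/airbnb-new-user-bookings",
)
print(kwargs)

# Inside the pipeline function this could become:
#     preprocess_op = DataprocPySparkBatchOp(**kwargs)
#     train_op = train_step(...).after(preprocess_op)
```

Note that an ID generated this way is fixed when the pipeline is compiled, so consider passing it in as a pipeline parameter if you need a fresh ID for every run.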

In [None]:
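@dsl.component(base_image="python:3.8-slim")
def build_args(train_data_uri: str) -> list:
    # Component functions run in their own container, so notebook globals such as
    # TRAINING_DATA_URI are not visible inside them; pass the value in instead.
    return [
        "--train-data-path",
        train_data_uri
    ]

@dsl.pipeline(name='airbnb-pipeline', description='A pipeline that loads CSV files, preprocesses the data, and trains a model')
def airbnb_pipeline(
        model_output_path: str
):
    # Define the pipeline steps
    build_args_op = build_args(train_data_uri=TRAINING_DATA_URI)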

    # Define the training step
    train_step = kfp.components.load_component_from_text("""
    name: Training
    description: This container runs a custom training routine.
    inputs:
    - name: trainingDataPath
      type: JsonArray
    outputs:
    - name: model
    implementation:
      container:
        image: rotman/training_image:v1
        command: ['python', 'train.py', {inputValue: trainingDataPath}, {outputPath: model}]
    """)
    train_op = train_step(build_args_op.output)

    # Make the ordering explicit: training runs after the argument-building step
    train_op = train_op.after(build_args_op)


You don't need to actually run this pipeline (although if you're feeling *very* adventurous you could; the easiest way to get access to a live Kubeflow instance would be on your own GCP account).

But let's at least compile the pipeline above into the `.json` file that would be submitted for execution. I can execute this JSON file to evaluate the solution!

Feel free to inspect the generated `pipeline.json` file on the "Files" tab on the left sidebar and see if you can get a feel for what it represents.

In [None]:
import kfp.v2.compiler as compiler
compiler.Compiler().compile(airbnb_pipeline, 'pipeline.json')
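
To get a quick feel for the compiled file without reading the raw JSON, the task graph can be listed programmatically. This sketch assumes the layout produced by the KFP v2 compiler, where tasks live under `root.dag.tasks` (possibly nested inside a top-level `pipelineSpec` key) and each task may list its `dependentTasks`. The helper name `list_tasks` is hypothetical, and the sample dictionary is hand-written rather than real compiler output.

```python
import json

def list_tasks(spec: dict) -> dict:
    """Map each task name to the sorted list of tasks it depends on."""
    # Some compiler versions wrap the spec in a "pipelineSpec" key; handle both.
    pipeline_spec = spec.get("pipelineSpec", spec)
    tasks = pipeline_spec.get("root", {}).get("dag", {}).get("tasks", {})
    return {name: sorted(task.get("dependentTasks", []))
            for name, task in tasks.items()}

# Hand-written stand-in for a compiled pipeline.
example = {
    "pipelineSpec": {
        "root": {"dag": {"tasks": {
            "build-args": {},
            "training": {"dependentTasks": ["build-args"]},
        }}}
    }
}
print(json.dumps(list_tasks(example), indent=2))

# With the real file:
#     with open("pipeline.json") as f:
#         print(json.dumps(list_tasks(json.load(f)), indent=2))
```

If your preprocessing step is wired in correctly, it should appear as a task that the training task depends on.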
