In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; the raw and preprocessed data paths
# used further down in this notebook are located under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os
import pickle
import pandas as pd
from sklearn.feature_extraction import DictVectorizer

In [2]:
pip install wandb

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting wandb
  Downloading wandb-0.15.3-py3-none-any.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m20.6 MB/s[0m eta [36m0:00:00[0m
Collecting GitPython!=3.1.29,>=1.0.0 (from wandb)
  Downloading GitPython-3.1.31-py3-none-any.whl (184 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m184.3/184.3 kB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
Collecting sentry-sdk>=1.0.0 (from wandb)
  Downloading sentry_sdk-1.25.0-py2.py3-none-any.whl (206 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m206.5/206.5 kB[0m [31m18.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting docker-pycreds>=0.4.0 (from wandb)
  Downloading docker_pycreds-0.4.0-py2.py3-none-any.whl (9.0 kB)
Collecting pathtools (from wandb)
  Downloading pathtools-0.1.2.tar.gz (11 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting

In [4]:
import wandb

In [5]:
def dump_pickle(obj, filename: str):
    """Serialize ``obj`` with pickle and write it to ``filename``.

    The file is opened in binary write mode, so an existing file at that path
    is overwritten. The result of ``pickle.dump`` is passed through unchanged,
    which means this function returns ``None``.
    """
    with open(filename, "wb") as sink:
        outcome = pickle.dump(obj, sink)
    return outcome

In [6]:
def read_dataframe(filename: str):
    """Load a parquet file of trip records and prepare it for feature extraction.

    Steps performed:
      1. Read ``filename`` with ``pd.read_parquet``.
      2. Add a ``duration`` column: drop-off time minus pick-up time, in minutes.
      3. Keep only rows whose duration is between 1 and 60 minutes (inclusive).
      4. Cast ``PULocationID`` and ``DOLocationID`` to strings.

    Args:
        filename: Path of the parquet file to read. The file must contain the
            columns ``lpep_pickup_datetime``, ``lpep_dropoff_datetime``,
            ``PULocationID`` and ``DOLocationID``.

    Returns:
        The filtered DataFrame with the extra ``duration`` column.
    """
    df = pd.read_parquet(filename)

    # Vectorized conversion to minutes instead of a per-row Python lambda.
    elapsed = df["lpep_dropoff_datetime"] - df["lpep_pickup_datetime"]
    df["duration"] = elapsed.dt.total_seconds() / 60

    # Take an explicit copy of the filtered rows so the column assignment
    # below writes to an independent frame rather than to a slice of the
    # original (the pattern that triggers pandas' SettingWithCopyWarning).
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df

In [7]:
def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Turn trip records into a feature matrix using ``dv``.

    A combined ``PU_DO`` column (``PULocationID`` + "_" + ``DOLocationID``) is
    added to ``df`` in place. The ``PU_DO`` and ``trip_distance`` columns are
    then converted to a list of per-row dicts and passed to the vectorizer.

    Args:
        df: Trip records; must contain ``PULocationID``, ``DOLocationID`` and
            ``trip_distance``. Note that it gains a ``PU_DO`` column.
        dv: Vectorizer used to encode the row dicts.
        fit_dv: When true, ``dv.fit_transform`` is used; otherwise
            ``dv.transform``.

    Returns:
        A ``(X, dv)`` tuple: the vectorizer's output and the vectorizer itself.
    """
    df["PU_DO"] = df["PULocationID"] + "_" + df["DOLocationID"]

    feature_columns = ["PU_DO", "trip_distance"]
    records = df[feature_columns].to_dict(orient="records")

    # Pick the vectorizer method once, then apply it to the records.
    vectorize = dv.fit_transform if fit_dv else dv.transform
    return vectorize(records), dv

In [9]:
def run_data_prep(
    wandb_project: str,
    wandb_entity: str,
    raw_data_path: str,
    dest_path: str,
    dataset: str = "green",
):
    """Preprocess three months of trip data and log the result as a W&B artifact.

    Starts a W&B run with job type ``preprocess``, reads the 2022-01, 2022-02
    and 2022-03 parquet files for ``dataset`` from ``raw_data_path`` (used as
    train, validation and test respectively), takes ``tip_amount`` as the
    target, fits a ``DictVectorizer`` on the training frame and applies it to
    all three. The vectorizer and the three ``(X, y)`` pairs are pickled into
    ``dest_path``, and that directory is logged as the ``NYC-Taxi`` artifact.

    Args:
        wandb_project: W&B project passed to ``wandb.init``.
        wandb_entity: W&B entity passed to ``wandb.init``.
        raw_data_path: Directory containing the monthly parquet files.
        dest_path: Output directory; created if it does not exist.
        dataset: File-name prefix of the parquet files.
    """
    wandb.init(project=wandb_project, entity=wandb_entity, job_type="preprocess")

    # (source parquet, output pickle) per split, in train -> val -> test order.
    splits = [
        (f"{dataset}_tripdata_2022-01.parquet", "train.pkl"),
        (f"{dataset}_tripdata_2022-02.parquet", "val.pkl"),
        (f"{dataset}_tripdata_2022-03.parquet", "test.pkl"),
    ]

    frames = [
        read_dataframe(os.path.join(raw_data_path, source_name))
        for source_name, _ in splits
    ]

    # Pull the target out of every frame before building features.
    label_column = "tip_amount"
    labels = [frame[label_column].values for frame in frames]

    # The vectorizer is fitted on the first (training) frame only and then
    # reused unchanged for the remaining frames.
    vectorizer = DictVectorizer()
    matrices = []
    for position, frame in enumerate(frames):
        matrix, vectorizer = preprocess(frame, vectorizer, fit_dv=(position == 0))
        matrices.append(matrix)

    os.makedirs(dest_path, exist_ok=True)

    dump_pickle(vectorizer, os.path.join(dest_path, "dv.pkl"))
    for (_, output_name), matrix, label in zip(splits, matrices, labels):
        dump_pickle((matrix, label), os.path.join(dest_path, output_name))

    # Publish the whole output directory as one dataset artifact.
    dataset_artifact = wandb.Artifact("NYC-Taxi", type="preprocessed_dataset")
    dataset_artifact.add_dir(dest_path)
    wandb.log_artifact(dataset_artifact)

In [11]:
if __name__ == "__main__":
    # Preprocess the raw trip data stored on the mounted Drive and log the
    # resulting files to the "ML Preprocess" W&B project.
    run_data_prep(
        wandb_project="ML Preprocess",
        wandb_entity="abeluxer",
        raw_data_path="/content/drive/MyDrive/MLOPs Zoomcamp/Data",
        dest_path="/content/drive/MyDrive/MLOPs Zoomcamp/Data_preprocessed_wandb",
    )

VBox(children=(Label(value='0.042 MB of 0.042 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…

[34m[1mwandb[0m: Adding directory to artifact (/content/drive/MyDrive/MLOPs Zoomcamp/Data_preprocessed_wandb)... Done. 0.1s


Weights & Biases (W&B) Model Logging

In [14]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

In [15]:
def load_pickle(filename: str):
    """Read ``filename`` in binary mode and return the unpickled object.

    Unpickling can execute arbitrary code, so only use this on files that
    come from a trusted source.
    """
    with open(filename, "rb") as source:
        payload = pickle.load(source)
    return payload

In [16]:
def run_train(
    wandb_project: str,
    wandb_entity: str,
    data_artifact: str,
    max_depth: int,
    random_state: int,
):
    """Train a random forest on the preprocessed data and log it to W&B.

    Starts a W&B run with job type ``train``, downloads ``data_artifact``,
    loads ``train.pkl`` and ``val.pkl`` from it, fits a
    ``RandomForestRegressor``, logs the validation error, and uploads the
    pickled model as an artifact of type ``model``.

    Args:
        wandb_project: W&B project passed to ``wandb.init``.
        wandb_entity: W&B entity passed to ``wandb.init``.
        data_artifact: Name of the ``preprocessed_dataset`` artifact to use.
        max_depth: ``max_depth`` of the random forest (also stored in the
            run config).
        random_state: Seed of the random forest (also stored in the run
            config).
    """
    # Initialize a Weights & Biases run
    wandb.init(
        project=wandb_project,
        entity=wandb_entity,
        job_type="train",
        config={"max_depth": max_depth, "random_state": random_state},
    )

    # Fetch the preprocessed dataset from artifacts
    artifact = wandb.use_artifact(data_artifact, type="preprocessed_dataset")
    data_path = artifact.download()

    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    # Define the random forest regressor, train it and predict on validation
    rf = RandomForestRegressor(max_depth=max_depth, random_state=random_state)
    rf.fit(X_train, y_train)
    y_pred = rf.predict(X_val)

    # Root mean squared error, computed as sqrt(MSE) rather than through the
    # `squared=False` flag, which newer scikit-learn releases no longer
    # accept. The metric key stays "MSE" because the sweep configuration in
    # this notebook optimizes a metric with that name.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5
    wandb.log({"MSE": rmse})

    with open("regressor.pkl", "wb") as f:
        pickle.dump(rf, f)

    # Log `regressor.pkl` as an artifact of type `model`. Without the
    # log_artifact call the artifact is only built locally, never uploaded.
    model_artifact = wandb.Artifact(f"{wandb.run.id}-model", type="model")
    model_artifact.add_file("regressor.pkl")
    wandb.log_artifact(model_artifact)

In [17]:
if __name__ == "__main__":
    # Train a single model on version 0 of the preprocessed dataset artifact.
    run_train(
        wandb_project="ML Preprocess",
        wandb_entity="abeluxer",
        data_artifact="abeluxer/ML Preprocess/NYC-Taxi:v0",
        max_depth=10,
        random_state=42,
    )

[34m[1mwandb[0m:   4 of 4 files downloaded.  


Pick the best model (hyperparameter sweep)

In [18]:
import os
import pickle

In [19]:
from functools import partial

import wandb

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

In [23]:
def run_train(data_artifact: str):
    """Train one random forest using hyperparameters from ``wandb.config``.

    Starts a W&B run with ``wandb.init()`` and reads ``max_depth``,
    ``n_estimators``, ``min_samples_split`` and ``min_samples_leaf`` from
    ``wandb.config``. The model is trained on ``train.pkl`` and evaluated on
    ``val.pkl`` from ``data_artifact``; the validation error is logged and
    the pickled model is uploaded as an artifact of type ``model``.

    Args:
        data_artifact: Name of the ``preprocessed_dataset`` artifact to use.
    """
    wandb.init()
    config = wandb.config

    # Fetch the preprocessed dataset from artifacts
    artifact = wandb.use_artifact(data_artifact, type="preprocessed_dataset")
    data_path = artifact.download()

    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    # Define the random forest regressor with the hyperparameters taken from
    # `config`, train it and predict on the validation set. The seed is fixed
    # so that runs differ only by their hyperparameters.
    rf = RandomForestRegressor(
        max_depth=config.max_depth,
        n_estimators=config.n_estimators,
        min_samples_split=config.min_samples_split,
        min_samples_leaf=config.min_samples_leaf,
        random_state=0,
    )
    rf.fit(X_train, y_train)
    y_pred = rf.predict(X_val)

    # Root mean squared error, computed as sqrt(MSE) rather than through the
    # `squared=False` flag, which newer scikit-learn releases no longer
    # accept. The key must stay "MSE": it is the metric name the sweep
    # configuration minimizes.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5
    wandb.log({"MSE": rmse})

    with open("regressor.pkl", "wb") as f:
        pickle.dump(rf, f)

    # Upload the pickled model as a per-run artifact.
    model_artifact = wandb.Artifact(f"{wandb.run.id}-model", type="model")
    model_artifact.add_file("regressor.pkl")
    wandb.log_artifact(model_artifact)

In [24]:
# Bayesian sweep that minimizes the logged "MSE" metric. Every hyperparameter
# is sampled from an integer-uniform distribution; the (min, max) bounds are
# listed per parameter below.
SWEEP_CONFIG = {
    "method": "bayes",
    "metric": {"name": "MSE", "goal": "minimize"},
    "parameters": {
        name: {"distribution": "int_uniform", "min": low, "max": high}
        for name, (low, high) in {
            "max_depth": (1, 20),
            "n_estimators": (10, 50),
            "min_samples_split": (2, 10),
            "min_samples_leaf": (1, 4),
        }.items()
    },
}

In [25]:
def run_sweep(wandb_project: str, wandb_entity: str, data_artifact: str, count: int):
    """Create a W&B sweep from ``SWEEP_CONFIG`` and run an agent for it.

    Args:
        wandb_project: W&B project the sweep is created in.
        wandb_entity: W&B entity the sweep is created under.
        data_artifact: Dataset artifact name bound to every ``run_train`` call.
        count: Value passed to ``wandb.agent`` as ``count``.
    """
    sweep_id = wandb.sweep(
        SWEEP_CONFIG,
        project=wandb_project,
        entity=wandb_entity,
    )

    # Bind the dataset reference so the agent receives a ready-to-call function.
    train_on_artifact = partial(run_train, data_artifact=data_artifact)
    wandb.agent(sweep_id, train_on_artifact, count=count)

In [26]:
if __name__ == "__main__":
    # Run five sweep trials against version 0 of the preprocessed dataset.
    run_sweep(
        wandb_project="ML Preprocess",
        wandb_entity="abeluxer",
        data_artifact="abeluxer/ML Preprocess/NYC-Taxi:v0",
        count=5,
    )



Create sweep with ID: 53yo2sud
Sweep URL: https://wandb.ai/abeluxer/ML%20Preprocess/sweeps/53yo2sud


[34m[1mwandb[0m: Agent Starting Run: pjpygxtn with config:
[34m[1mwandb[0m: 	max_depth: 3
[34m[1mwandb[0m: 	min_samples_leaf: 3
[34m[1mwandb[0m: 	min_samples_split: 3
[34m[1mwandb[0m: 	n_estimators: 11


[34m[1mwandb[0m:   4 of 4 files downloaded.  


VBox(children=(Label(value='0.014 MB of 0.014 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…

0,1
MSE,▁

0,1
MSE,2.47105


[34m[1mwandb[0m: Agent Starting Run: huhai0j1 with config:
[34m[1mwandb[0m: 	max_depth: 10
[34m[1mwandb[0m: 	min_samples_leaf: 1
[34m[1mwandb[0m: 	min_samples_split: 8
[34m[1mwandb[0m: 	n_estimators: 47
[34m[1mwandb[0m: Currently logged in as: [33mabeluxer[0m. Use [1m`wandb login --relogin`[0m to force relogin


[34m[1mwandb[0m:   4 of 4 files downloaded.  


VBox(children=(Label(value='0.527 MB of 0.527 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…

0,1
MSE,▁

0,1
MSE,2.46278


[34m[1mwandb[0m: Agent Starting Run: 5zli6niz with config:
[34m[1mwandb[0m: 	max_depth: 11
[34m[1mwandb[0m: 	min_samples_leaf: 3
[34m[1mwandb[0m: 	min_samples_split: 2
[34m[1mwandb[0m: 	n_estimators: 47


[34m[1mwandb[0m:   4 of 4 files downloaded.  


VBox(children=(Label(value='1.636 MB of 1.636 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…

0,1
MSE,▁

0,1
MSE,2.44997


[34m[1mwandb[0m: Agent Starting Run: v6epdr0k with config:
[34m[1mwandb[0m: 	max_depth: 17
[34m[1mwandb[0m: 	min_samples_leaf: 1
[34m[1mwandb[0m: 	min_samples_split: 2
[34m[1mwandb[0m: 	n_estimators: 44


[34m[1mwandb[0m:   4 of 4 files downloaded.  


0,1
MSE,▁

0,1
MSE,2.45415


[34m[1mwandb[0m: Agent Starting Run: mtzye3qg with config:
[34m[1mwandb[0m: 	max_depth: 13
[34m[1mwandb[0m: 	min_samples_leaf: 3
[34m[1mwandb[0m: 	min_samples_split: 2
[34m[1mwandb[0m: 	n_estimators: 45


[34m[1mwandb[0m:   4 of 4 files downloaded.  


0,1
MSE,▁

0,1
MSE,2.44893


Error in callback <function _WandbInit._pause_backend at 0x7f4271f2a950> (for post_run_cell):


BrokenPipeError: ignored