In [1]:
from dotenv import find_dotenv, load_dotenv
import os
load_dotenv(find_dotenv())

True

In [2]:
import wandb

In [None]:
import prefect
from prefect import task, Flow, Parameter
from prefect.run_configs import LocalRun

In [3]:
from kaggle.api.kaggle_api_extended import KaggleApi

In [4]:
import pandas as pd
from zipfile import ZipFile
import json
from datetime import datetime

In [None]:
# Stage toggles for running parts of the pipeline by hand from this notebook.
_DOWNLOAD = False  # when True, a later cell calls download_and_log_kaggle_data.run()
_FE = False  # when True, a later cell calls feature_engineer_log_tps_2022.run()
_TRAIN = True  # NOTE(review): not read by any visible cell; presumably meant to gate training - confirm

In [None]:
def add_convert_for_wandb(artifact, path):
    """Attach a data directory to an artifact, plus one W&B table per CSV file.

    The directory at ``path`` is added to ``artifact`` under the name
    ``"data"``. Every entry directly inside ``path`` whose name ends in
    ``.csv`` is additionally read with pandas and added as a ``wandb.Table``
    named after the file with its trailing ``.csv`` removed.

    Args:
        artifact: Object exposing ``add_dir(path, name=...)`` and
            ``add(obj, name=...)``; the callers in this notebook pass a
            ``wandb.Artifact``.
        path: Directory whose contents should be logged.

    Returns:
        None. ``artifact`` is modified in place.
    """
    csv_suffix = ".csv"

    artifact.add_dir(path, name="data")

    # os.listdir returns entries in arbitrary order; sorting makes the order in
    # which tables are added (and the printed log) reproducible.
    for file_name in sorted(os.listdir(path)):
        if not file_name.endswith(csv_suffix):
            continue

        # Strip only the trailing extension. str.replace would also remove a
        # ".csv" occurring in the middle of the name (e.g. "a.csv.v2.csv").
        tab_name = file_name[:-len(csv_suffix)]
        print(f"adding {tab_name}")
        df = pd.read_csv(os.path.join(path, file_name))
        print(f"{tab_name}:{df.shape}")
        artifact.add(wandb.Table(dataframe=df), name=tab_name)

    return None

In [None]:
@task(log_stdout=True)
def download_and_log_kaggle_data(competition: str = "tabular-playground-series-mar-2022", project_name: str = "kaggle-tps-mar-2022-odsc"):
    """Download a Kaggle competition's files and log them to W&B as the "raw" artifact.

    Steps: start a W&B run with job type ``"download"``, download the
    competition archive through the Kaggle API, extract it into ``./data/raw``,
    delete the archive, then log the extracted directory (and one table per
    CSV, via ``add_convert_for_wandb``) as an artifact named ``"raw"`` whose
    type is the competition name.

    Args:
        competition: Kaggle competition slug; also used as the artifact type
            and in the run name.
        project_name: W&B project the run and artifact are logged to.

    Returns:
        None.
    """
    # TODO: Remove hack to add data description
    # Default to None so that a competition without a hard-coded description
    # no longer fails on an unbound local when the artifact is created.
    data_description = None
    if competition == "tabular-playground-series-mar-2022":
        data_description = """
            In this competition, you'll forecast twelve-hours of traffic flow in a major U.S. metropolitan area. Time, space, and directional features give you the chance to model interactions across a network of roadways.

            Files and Field Descriptions
            -------------------------------
            train.csv - the training set, comprising measurements of traffic congestion across 65 roadways from April through September of 1991.
            row_id - a unique identifier for this instance
            time - the 20-minute period in which each measurement was taken
            x - the east-west midpoint coordinate of the roadway
            y - the north-south midpoint coordinate of the roadway
            direction - the direction of travel of the roadway. EB indicates "eastbound" travel, for example, while SW indicates a "southwest" direction of travel.
            congestion - congestion levels for the roadway during each hour; the target. The congestion measurements have been normalized to the range 0 to 100.
            test.csv - the test set; you will make hourly predictions for roadways identified by a coordinate location and a direction of travel on the day of 1991-09-30.
            sample_submission.csv - a sample submission file in the correct format
        """

    print(f"starting new run for {project_name}")
    # Using the run as a context manager guarantees it is finished even when
    # the download, extraction or upload raises.
    with wandb.init(
            project=project_name, job_type="download", name=f"log-{competition}") as run:
        api = KaggleApi()
        api.authenticate()
        api.competition_download_files(competition)

        # The archive is expected at ./<competition>.zip after the download.
        zip_path = f"{competition}.zip"
        path_to_raw = os.path.join(".", "data", "raw")
        # Close the archive handle before deleting the file.
        with ZipFile(zip_path) as archive:
            archive.extractall(path=path_to_raw)
        os.remove(zip_path)

        raw_data_artifact = wandb.Artifact(
            name="raw", type=competition, description=data_description)
        add_convert_for_wandb(raw_data_artifact, path_to_raw)

        run.log_artifact(raw_data_artifact)

    return None

In [None]:
# Manual trigger: call the download task's run() directly with its default arguments.
if _DOWNLOAD:
    download_and_log_kaggle_data.run()

In [None]:
def feature_engineering(data):
    """Return a copy of ``data`` with calendar and interaction features added.

    The input frame is left untouched. ``time`` is parsed to datetimes and the
    following columns are appended, in this order:

    * calendar features: ``month``, ``weekday``, ``hour``, ``minute``,
      ``is_month_start``, ``is_month_end``, ``hour+minute`` (minutes since
      midnight), ``is_weekend`` (weekday index above 4), ``is_afternoon``
      (hour above 12);
    * string concatenations of ``hour``, ``x``, ``y`` and ``direction``,
      joined without a separator: ``x+y``, ``x+y+direction``,
      ``hour+direction``, ``hour+x+y``, ``hour+direction+x``,
      ``hour+direction+y``, ``hour+direction+x+y``, ``hour+x``, ``hour+y``.

    Args:
        data: DataFrame with ``time``, ``x``, ``y`` and ``direction`` columns.

    Returns:
        A new DataFrame holding the original columns plus the features above.
    """
    # Work on a copy so the caller's frame is not mutated as a side effect.
    data = data.copy(deep=True)

    data['time'] = pd.to_datetime(data['time'])
    stamp = data['time'].dt

    data['month'] = stamp.month
    data['weekday'] = stamp.weekday
    data['hour'] = stamp.hour
    data['minute'] = stamp.minute
    data['is_month_start'] = stamp.is_month_start.astype('int')
    data['is_month_end'] = stamp.is_month_end.astype('int')
    data['hour+minute'] = stamp.hour * 60 + stamp.minute
    data['is_weekend'] = (stamp.dayofweek > 4).astype('int')
    data['is_afternoon'] = (stamp.hour > 12).astype('int')

    # Convert each building block to strings once and reuse it below.
    hour = data['hour'].astype('str')
    x = data['x'].astype('str')
    y = data['y'].astype('str')
    direction = data['direction'].astype('str')

    data['x+y'] = x + y
    data['x+y+direction'] = x + y + direction
    data['hour+direction'] = hour + direction
    data['hour+x+y'] = hour + x + y
    data['hour+direction+x'] = hour + direction + x
    data['hour+direction+y'] = hour + direction + y
    data['hour+direction+x+y'] = hour + direction + x + y
    data['hour+x'] = hour + x
    data['hour+y'] = hour + y
    return data

In [None]:
@task(log_stdout=True)
def feature_engineer_log_tps_2022(competition: str = "tabular-playground-series-mar-2022", project_name: str = "kaggle-tps-mar-2022-odsc"):
    """Build engineered train/test CSVs from the latest "raw" artifact and log them to W&B.

    Steps: start a W&B run with job type ``"feature_engineer"``, download
    ``<project_name>/raw:latest``, apply ``feature_engineering`` to
    ``train.csv`` and ``test.csv``, write the results to ``../data/fe`` as
    ``fe_train.csv`` / ``fe_test.csv``, and log that directory (plus one table
    per CSV) as an artifact named ``"feature_engineered"``.

    Args:
        competition: Used as the artifact type and in the run name.
        project_name: W&B project to read the raw artifact from and log to.

    Returns:
        None.
    """
    # Using the run as a context manager guarantees it is finished even when a
    # step below raises.
    with wandb.init(
            project=project_name, job_type="feature_engineer", name=f"feature_engineer-{competition}") as run:
        comp_data_art = run.use_artifact(f"{project_name}/raw:latest", type=competition)
        comp_data_path = os.path.join(comp_data_art.download(), "data")

        # `time` is read as plain strings; feature_engineering parses it itself.
        train_data = pd.read_csv(os.path.join(comp_data_path, "train.csv"), dtype={'time': str})
        test_data = pd.read_csv(os.path.join(comp_data_path, "test.csv"), dtype={'time': str})

        fe_train_data = feature_engineering(train_data)
        fe_test_data = feature_engineering(test_data)

        # NOTE(review): output goes under "../data" while the download task
        # extracts to "./data/raw" - confirm the differing base directory is intended.
        fe_path = os.path.join("..", "data", "fe")
        # exist_ok avoids the check-then-create race of a separate os.path.exists test.
        os.makedirs(fe_path, exist_ok=True)

        fe_train_data.to_csv(os.path.join(fe_path, "fe_train.csv"), index=False)
        fe_test_data.to_csv(os.path.join(fe_path, "fe_test.csv"), index=False)

        fe_artifact = wandb.Artifact(
            name="feature_engineered", type=competition)
        add_convert_for_wandb(fe_artifact, fe_path)

        run.log_artifact(fe_artifact)

    return None

In [None]:
# Manual trigger: call the feature-engineering task's run() directly with its default arguments.
if _FE:
    feature_engineer_log_tps_2022.run()

In [5]:
from pycaret.regression import *

In [6]:
def perform_experiment(exp):
    """Compare model types, then tune and finalize the winner.

    Gathers the tables produced along the way so they can be logged as
    artifacts afterwards.

    Args:
        exp: Result of the preceding ``setup`` call. It is not read here; the
            functions called below take no experiment argument.

    Returns:
        A 4-tuple ``(model_comparison_results, tuned_finalized_model,
        tuned_model_results, tuner_cv_results)``.
    """
    # Pick the best-performing model type among the candidates.
    top_model = compare_models()

    # Leaderboard of metrics per tested model type, and the catalogue of
    # available model types; both get their index turned into a column.
    board = get_leaderboard().reset_index()
    catalogue = models().reset_index()

    # Attach the catalogue information to each leaderboard row by model name.
    model_comparison_results = board.merge(
        catalogue, left_on="Model Name", right_on="Name")

    # Tune the winner's hyperparameters, finalize it, and keep the tuning tables.
    finalized, tuning_table, cv_table = tune_and_finalize_model_with_metrics(top_model)
    return model_comparison_results, finalized, tuning_table, cv_table

In [7]:
def tune_and_finalize_model_with_metrics(model):
    """Tune a PyCaret model, finalize it, and return the tuning tables.

    Args:
        model: Model to pass to ``tune_model``.

    Returns:
        A 3-tuple ``(tuned_finalized_model, tuned_model_results,
        tuner_cv_results)``. Both result frames carry their former index as a
        string-typed ``"index"`` column.
    """
    def _index_as_text_column(frame):
        # Move the index into a regular column and store it as strings.
        flat = frame.reset_index()
        flat["index"] = flat["index"].astype(str)
        return flat

    tuned_model, tuner = tune_model(model, return_tuner=True)

    # pull() returns the most recently displayed results table, so it has to
    # be called right after tune_model to capture the tuning metrics.
    tuned_model_results = _index_as_text_column(pull())

    # Scores and internal details for each configuration the tuner tried.
    tuner_cv_results = _index_as_text_column(pd.DataFrame(tuner.cv_results_))

    # Finalizing refits on the whole dataset (no holdout/validation split).
    return finalize_model(tuned_model), tuned_model_results, tuner_cv_results

In [8]:
#https://pycaret.readthedocs.io/en/latest/api/regression.html
#TODO: Hardcode the relationships between the features and the numeric vs categorical features
def setup_tps_2022_config(seed):
    """Build the keyword options later unpacked into PyCaret's ``setup`` call.

    Args:
        seed: Stored as ``session_id`` and appended to the experiment name.

    Returns:
        A new dict of options for the given seed.
    """
    return dict(
        target="congestion",
        fold_strategy='timeseries',
        session_id=seed,
        ignore_features=["row_id"],
        # transform_target=True,   (currently disabled)
        experiment_name=f"tps_march_2022_{seed}",
        silent=True,
        # normalize=True,          (currently disabled)
        # transformation=True,     (currently disabled)
        ignore_low_variance=True,
        remove_multicollinearity=True,
        multicollinearity_threshold=0.95,
    )

In [9]:
# Settings shared by the training cells below.
n=1  # number of iterations of the seed loop in the next cell (seeds 1..n)
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')  # timestamp taken once here; embedded in the saved model's title
competition="tabular-playground-series-mar-2022"  # used as the W&B artifact type and in run/model names
project_name = "kaggle-tps-mar-2022-odsc"  # W&B project that runs are logged to and artifacts are read from

In [10]:
# For each seed: build the PyCaret config, start a W&B run and download the
# latest feature-engineered artifact.
# NOTE(review): setup, training and artifact logging happen in later cells,
# outside this loop. Those cells only see the `seed`, `config`, `run` and
# `fe_data_path` bound by the final iteration, so with n > 1 the earlier runs
# are started but never trained - confirm this is intended.
for i in range(n):
    # Seeds are 1-based; each seed yields its own configuration and run name.
    seed = i + 1

    config = setup_tps_2022_config(seed) # keyword options later unpacked into PyCaret's setup()
    
    # Start the W&B run that records this seed's experiment; the config is attached to the run.
    run = wandb.init(project=project_name, reinit=True, config = config,
               name=f"train-seed-{seed}-{competition}")
    print(f"Seed: {seed}")
    # run.display(height=360)

    # Fetch the latest feature-engineered artifact; its files sit under the "data" sub-directory of the download.
    fe_data_art = run.use_artifact(f"{project_name}/feature_engineered:latest", type=competition)
    fe_data_path = os.path.join(fe_data_art.download(), "data")    

[34m[1mwandb[0m: Currently logged in as: [33ma-sh0ts[0m (use `wandb login --relogin` to force relogin)


Seed: 1


[34m[1mwandb[0m: Downloading large artifact feature_engineered:latest, 109.27MB. 4 files... Done. 0:0:0


In [11]:
# Load the engineered training set from the downloaded artifact directory.
all_train_data = pd.read_csv(os.path.join(fe_data_path, "fe_train.csv"))  # .convert_dtypes() is left disabled

In [12]:
# train_data = all_train_data.drop(["time"], axis=1)

In [13]:
# targetless_data = train_data.drop(["congestion"], axis=1)

In [14]:
# corr = pd.DataFrame(np.corrcoef(targetless_data.T))

In [15]:
# Keep only the raw competition columns (see the TODO at the setup call about
# using the generated features). The explicit .copy() makes train_data an
# independent frame, so the column assignment below no longer triggers pandas'
# SettingWithCopyWarning for writing into a slice of all_train_data.
train_data = all_train_data[["row_id", "time", "congestion", "x", "y", "direction"]].copy()
# The CSV round-trip stores timestamps as text; parse them back to datetimes.
train_data['time'] = pd.to_datetime(train_data['time'])

In [16]:
# Sanity check: column dtypes, non-null counts and memory footprint of the training frame.
train_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 848835 entries, 0 to 848834
Data columns (total 6 columns):
 #   Column      Non-Null Count   Dtype         
---  ------      --------------   -----         
 0   row_id      848835 non-null  int64         
 1   time        848835 non-null  datetime64[ns]
 2   congestion  848835 non-null  int64         
 3   x           848835 non-null  int64         
 4   y           848835 non-null  int64         
 5   direction   848835 non-null  object        
dtypes: datetime64[ns](1), int64(4), object(1)
memory usage: 38.9+ MB


In [17]:
# Set up the PyCaret experiment on the selected training columns, unpacking the
# options built by setup_tps_2022_config as keyword arguments.
#TODO: run with the proper generated features
ts_exp = setup(data=train_data, **config)
# Compare model types, tune and finalize the best one, and keep the result tables for logging.
model_comparison_results, tuned_finalized_model, tuned_model_results, tuner_cv_results = perform_experiment(ts_exp)

Unnamed: 0,MAE,MSE,RMSE,R2,RMSLE,MAPE
0,6.8013,90.1223,9.4933,0.6817,0.2616,0.1865
1,6.7184,88.7165,9.4189,0.6887,0.2625,0.1867
2,6.6852,87.1274,9.3342,0.6925,0.2561,0.1834
3,6.6679,86.7386,9.3134,0.695,0.2623,0.1842
4,6.6556,87.1913,9.3376,0.688,0.2592,0.1779
5,6.6484,87.172,9.3366,0.6901,0.2594,0.1826
6,6.6302,86.3843,9.2943,0.6917,0.257,0.1848
7,6.6286,86.4232,9.2964,0.6929,0.2609,0.1775
8,6.6765,87.8222,9.3714,0.6894,0.2649,0.1841
9,6.6374,86.5798,9.3048,0.6926,0.2535,0.1816


In [18]:
# Persist the finalized model locally; the title combines the competition name,
# the notebook's start timestamp and the seed.
model_title = "-".join((competition, now, str(seed)))
save_model(tuned_finalized_model, model_title)

# Wrap each result frame from the experiment in a W&B table. The comparison
# table is logged without its "Index", "Model" and "Name" columns.
model_comparison_results_table = wandb.Table(
    dataframe=model_comparison_results.drop(columns=["Index", "Model", "Name"]))
tuned_model_results_table = wandb.Table(dataframe=tuned_model_results)
tuner_cv_results_table = wandb.Table(dataframe=tuner_cv_results)

# Bundle the three tables and the pickled model into a single artifact.
model_artifacts = wandb.Artifact("model_artifacts", type=competition)
model_artifacts.add(model_comparison_results_table, name="model_comparison_results_table")
model_artifacts.add(tuned_model_results_table, name="tuned_model_results_table")
model_artifacts.add(tuner_cv_results_table, name="tuner_cv_results_table")
model_artifacts.add_file(f"{model_title}.pkl", name="model.pkl")

# Upload the artifact and close the run started by the seed loop.
run.log_artifact(model_artifacts)
run.finish()

Transformation Pipeline and Model Successfully Saved



VBox(children=(Label(value='1.222 MB of 1.222 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…

In [None]:
def configure_prefect_flow():
    """Build and register the end-to-end Prefect flow: download, then feature engineering.

    The flow exposes ``competition`` and ``project_name`` parameters, runs
    locally with the Kaggle and W&B credentials forwarded from the current
    environment, and is registered under the Prefect project
    ``"odsc-east-2022"``.

    Raises:
        KeyError: If ``KAGGLE_USERNAME``, ``KAGGLE_KEY`` or ``WANDB_API_KEY``
            is not set in the environment.
    """
    with Flow("run-tps-2022-e2e") as flow:
        competition = Parameter(
            "competition", default="tabular-playground-series-mar-2022")
        project_name = Parameter(
            "project_name", default="kaggle-tps-mar-2022-odsc")
        raw_logged = download_and_log_kaggle_data(
            competition=competition, project_name=project_name)
        # Feature engineering consumes the "raw" artifact logged by the
        # download task, but no data passes between the two tasks. Without an
        # explicit upstream edge Prefect is free to run them in either order.
        feature_engineer_log_tps_2022(
            competition=competition, project_name=project_name,
            upstream_tasks=[raw_logged])

    # Forward the Kaggle and W&B credentials from this process to the flow run.
    forwarded = ("KAGGLE_USERNAME", "KAGGLE_KEY", "WANDB_API_KEY")
    flow.run_config = LocalRun(
        env={variable: os.environ[variable] for variable in forwarded})

    flow.register(project_name="odsc-east-2022")
    # flow.run()