# Prep F1 data for ML model
Reads data from the `fct_results` dbt table and prepares it for machine learning use cases.

This notebook mimics the covariate encoding step of https://docs.getdbt.com/guides/dbt-python-snowpark?step=11#covariate-encoding, but builds the preprocessing pipeline with Snowpark ML in a notebook instead of in a pandas-based dbt Python model.

Notebooks are better suited to pipeline development because they make it easier to inspect intermediate steps and debug the pipeline. The final pipeline is saved to a Snowflake stage so downstream dbt Python models can load it to produce predictions.

### Imports

In [56]:
import os
import warnings
from pprint import pprint

import joblib
import numpy as np
import snowflake.ml.modeling.preprocessing as snowml
from dotenv import load_dotenv
from snowflake.ml.modeling.metrics.correlation import correlation
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark.version import VERSION as SNOWPARK_VERSION

warnings.simplefilter("ignore")

### Connect to Snowflake

In [57]:
load_dotenv()  # Get the environment variables from .env file

# Store SNOWFLAKE_ACCOUNT, SNOWFLAKE_ML_USER and SNOWFLAKE_ML_PASSWORD in a .env file (not committed to git)
session = Session.builder.configs(
    {
        "account": os.getenv("SNOWFLAKE_ACCOUNT"),
        "database": "FORMULA1",
        "warehouse": "TRANSFORMING",
        "role": "TRANSFORMER",
        "schema": "DBT_GREG",  # Replace with your dev schema
        "user": os.getenv("SNOWFLAKE_ML_USER"),
        "password": os.getenv("SNOWFLAKE_ML_PASSWORD"),
    }
).create()
session.sql_simplifier_enabled = True
snowflake_env = session.sql("select current_user(), current_version()").collect()

pprint("Connected to Snowflake with the following parameters:")
pprint(f"User: {snowflake_env[0][0]}")
pprint(f"Role: {session.get_current_role()}")
pprint(f"Database: {session.get_current_database()}")
pprint(f"Warehouse: {session.get_current_warehouse()}")
pprint(f"Schema: {session.get_current_schema()}")
pprint(f"Snowflake version: {snowflake_env[0][1]}")
pprint(
    f"Snowpark for Python version: {SNOWPARK_VERSION[0]}.{SNOWPARK_VERSION[1]}.{SNOWPARK_VERSION[2]}"
)

'Connected to Snowflake with the following parameters:'
'User: GREG_CLUNIES'
'Role: "TRANSFORMER"'
'Database: "FORMULA1"'
'Warehouse: "TRANSFORMING"'
'Schema: "DBT_GREG"'
'Snowflake version: 8.7.1'
'Snowpark for Python version: 1.12.1'
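The connection cell reads credentials from environment variables loaded by `load_dotenv()`. If one is missing, `os.getenv` silently returns `None` and the problem only surfaces later, when the connection attempt fails. A minimal fail-fast sketch is below; `build_connection_config` and `REQUIRED_ENV_VARS` are hypothetical names (not part of Snowpark), and the database, warehouse, role and schema values are copied from the cell above.

```python
import os

# Hypothetical list of variables the connection cell depends on
REQUIRED_ENV_VARS = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_ML_USER", "SNOWFLAKE_ML_PASSWORD")


def build_connection_config(schema: str = "DBT_GREG") -> dict:
    """Return a Snowpark connection config, raising early if credentials are missing."""
    missing = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "account": os.environ["SNOWFLAKE_ACCOUNT"],
        "database": "FORMULA1",
        "warehouse": "TRANSFORMING",
        "role": "TRANSFORMER",
        "schema": schema,  # Replace with your dev schema
        "user": os.environ["SNOWFLAKE_ML_USER"],
        "password": os.environ["SNOWFLAKE_ML_PASSWORD"],
    }
```

The returned dict can be passed directly to `Session.builder.configs(...)`.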


### Preview the data

In [58]:
features_df = session.table("F1_FEATURES")
features_df.show()

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"RESULT_ID"  |"RACE_ID"  |"RACE_YEAR"  |"RACE_ROUND"  |"CIRCUIT_ID"  |"CIRCUIT_NAME"         |"CIRCUIT_REF"  |"LOCATION"    |"COUNTRY"  |"LATITUDE"  |"LONGITUDE"  |"ALTITUDE"  |"TOTAL_PIT_STOPS_PER_RACE"  |"RACE_DATE"  |"RACE_TIME"  |"DRIVER_ID"  |"DRIVER"          |"DRIVER_

### Feature Engineering & Selection
ML models require features in a specific format, which is often not human-readable. We can use Snowpark ML's optimized preprocessing functions to transform the data into the format the model requires.

If you're lucky, you already know which features to use for your model. Often you don't, and will need to apply techniques such as recursive feature elimination or feature importance. A list of Snowpark-optimized feature selection functions can be found [here](https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling#snowflake-ml-modeling-feature-selection).

For this demo, we will only show some simple feature engineering. We will explicitly select the features we want to use.
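Recursive feature elimination and feature importance both require a trained model. A cheaper first pass is a filter method: score each candidate feature against the target and drop the weak ones. The sketch below is a plain NumPy version of that idea using absolute Pearson correlation, which is a different and simpler technique than the ones named above; `select_by_correlation` and its default threshold are illustrative assumptions. Because `POSITION_LABEL` is ordinal, Pearson correlation is only a rough screen here.

```python
import numpy as np


def select_by_correlation(X, y, names, threshold=0.1):
    """Return the names of features whose |Pearson r| with y is at least `threshold`."""
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float)
    selected = []
    for j, name in enumerate(names):
        # Constant columns yield r = NaN; NaN >= threshold is False, so they are dropped
        with np.errstate(invalid="ignore", divide="ignore"):
            r = np.corrcoef(X[:, j], y)[0, 1]
        if abs(r) >= threshold:
            selected.append(name)
    return selected


# Toy data (not from the F1 tables): the first column tracks y exactly,
# the second is constant, the third is only moderately correlated (|r| ~ 0.59)
X = [[1, 5, 7], [2, 5, 1], [3, 5, 4], [4, 5, 2]]
y = [1, 2, 3, 4]
print(select_by_correlation(X, y, ["GRID", "CONSTANT", "NOISE"], threshold=0.9))  # ['GRID']
```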

In [59]:
target = ["POSITION_LABEL"]  # 1 = podium, 2 = points, 3 = no points
features = [
    "RACE_YEAR",
    "CIRCUIT_NAME",
    "GRID",
    "CONSTRUCTOR_NAME",
    "DRIVER",
    "DRIVERS_AGE_YEARS",
    "DRIVER_CONFIDENCE",
    "CONSTRUCTOR_RELIABILITY",
    "TOTAL_PIT_STOPS_PER_RACE",
]
features_df = features_df.select(target + features)
features_df.show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"POSITION_LABEL"  |"RACE_YEAR"  |"CIRCUIT_NAME"         |"GRID"  |"CONSTRUCTOR_NAME"  |"DRIVER"          |"DRIVERS_AGE_YEARS"  |"DRIVER_CONFIDENCE"  |"CONSTRUCTOR_RELIABILITY"  |"TOTAL_PIT_STOPS_PER_RACE"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1                 |2010         |BAHRAIN_GRAND_PRIX     |4       |MCLAREN             |LEWIS_HAMILTON    |25                   |0.911215             |0.855491                   |0                           |
|2                 |2010         |BAHRAIN_GRAND_PRIX     |1       |RED_BULL            |SEBASTIAN_VETTEL  |23                   |0.902326             |0.847025     
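The target comment encodes three classes. One way to derive such a label from a finishing position is sketched below, assuming "podium" means positions 1 to 3 and "points" means positions 4 to 10 (the top-10 scoring system F1 has used since 2010). The actual rule is defined wherever `F1_FEATURES` is built, so treat `position_label` as a hypothetical illustration rather than the project's definition.

```python
def position_label(position: int) -> int:
    """Hypothetical mapping from finishing position to POSITION_LABEL."""
    if position <= 3:
        return 1  # podium
    if position <= 10:
        return 2  # points (top 10 score points since 2010)
    return 3  # no points


print([position_label(p) for p in (1, 3, 4, 10, 11, 20)])  # [1, 1, 2, 2, 3, 3]
```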

In [60]:
# Normalizing the numeric features
snowml_mms = snowml.MinMaxScaler(
    clip=True,
    input_cols=[
        "GRID",
        "DRIVER_CONFIDENCE",
        "CONSTRUCTOR_RELIABILITY",
        "TOTAL_PIT_STOPS_PER_RACE",
    ],
    output_cols=[
        "GRID_NORM",
        "DRIVER_CONFIDENCE_NORM",
        "CONSTRUCTOR_RELIABILITY_NORM",
        "TOTAL_PIT_STOPS_PER_RACE_NORM",
    ],
)
normalized_features_df = snowml_mms.fit(features_df).transform(features_df)
normalized_features_df.show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"GRID_NORM"           |"DRIVER_CONFIDENCE_NORM"  |"CONSTRUCTOR_RELIABILITY_NORM"  |"TOTAL_PIT_STOPS_PER_RACE_NORM"  |"POSITION_LABEL"  |"RACE_YEAR"  |"CIRCUIT_NAME"         |"GRID"  |"CONSTRUCTOR_NAME"  |"DRIVER"          |"DRIVERS_AGE_YEARS"  |"DRIVER_CONFIDENCE"  |"CONSTRUCTOR_RELIABILITY"  |"TOTAL_PIT_STOPS_PER_RACE"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|0.16666666666666666  
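`MinMaxScaler` learns each column's minimum and maximum during `fit` and maps values to `(x - min) / (max - min)` during `transform`, which produces the 0 to 1 range seen in the `_NORM` columns. With `clip=True`, values outside the fitted range (for example, when scoring new races) are clamped to the range bounds. The NumPy sketch below reproduces that arithmetic, assuming the default `[0, 1]` output range; `MinMaxSketch` is a hypothetical name, and treating a constant column as having a span of 1 is a choice borrowed from scikit-learn.

```python
import numpy as np


class MinMaxSketch:
    """Hypothetical min-max scaler mirroring the fit/transform split."""

    def fit(self, X):
        X = np.asarray(X, dtype=float)
        self.data_min_ = X.min(axis=0)  # per-column minimum seen in training
        self.data_max_ = X.max(axis=0)  # per-column maximum seen in training
        return self

    def transform(self, X, clip=True):
        X = np.asarray(X, dtype=float)
        span = self.data_max_ - self.data_min_
        span[span == 0] = 1.0  # constant columns map to 0 instead of dividing by zero
        scaled = (X - self.data_min_) / span
        # clip=True clamps values outside the training range to [0, 1]
        return np.clip(scaled, 0.0, 1.0) if clip else scaled


# Toy columns standing in for GRID and DRIVER_CONFIDENCE (values are made up)
scaler = MinMaxSketch().fit([[1, 0.5], [4, 0.9], [7, 0.7]])
print(scaler.transform([[4, 0.7]]))
print(scaler.transform([[10, 0.3]]))  # out-of-range values clamped
print(scaler.transform([[10, 0.3]], clip=False))  # out-of-range values kept
```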

In [61]:
# One hot encoding the categorical features
snowml_ohe = snowml.OneHotEncoder(
    input_cols=["CIRCUIT_NAME", "CONSTRUCTOR_NAME", "DRIVER"],
    output_cols=["CIRCUIT_NAME_OHE", "CONSTRUCTOR_NAME_OHE", "DRIVER_OHE"],
)
ohe_features_df = snowml_ohe.fit(normalized_features_df).transform(
    normalized_features_df
)

np.array(ohe_features_df.columns)

array(['CIRCUIT_NAME_OHE_70TH_ANNIVERSARY_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_ABU_DHABI_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_AUSTRALIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_AUSTRIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_AZERBAIJAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_BAHRAIN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_BELGIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_BRAZILIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_BRITISH_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_CANADIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_CHINESE_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_EIFEL_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_EMILIA_ROMAGNA_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_EUROPEAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_FRENCH_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_GERMAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_HUNGARIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_INDIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_ITALIAN_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_JAPANESE_GRAND_PRIX',
       'CIRCUIT_NAME_OHE_KOREAN_GRAND_PRIX',
       '
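`OneHotEncoder` learns the distinct values of each input column during `fit` and emits one 0/1 column per value, named `<output_col>_<value>` as the `CIRCUIT_NAME_OHE_...` columns above show. A pure-Python sketch of that behavior for a single column follows. `OneHotSketch` is hypothetical, and the alphabetical category order matches the output above but is not guaranteed to be Snowpark ML's internal rule. Mapping unseen categories to all-zero rows is also an assumption: scikit-learn's `OneHotEncoder` raises on unknown categories by default.

```python
class OneHotSketch:
    """Hypothetical single-column one-hot encoder."""

    def fit(self, values):
        self.categories_ = sorted(set(values))  # categories learned from training data
        return self

    def transform(self, values, output_col):
        # One 0/1 column per learned category, named <output_col>_<category>;
        # values not seen during fit get 0 in every column (assumed behavior)
        return {
            f"{output_col}_{category}": [int(value == category) for value in values]
            for category in self.categories_
        }


encoder = OneHotSketch().fit(["RED_BULL", "MCLAREN", "RED_BULL"])
print(encoder.transform(["MCLAREN", "RED_BULL", "FERRARI"], "CONSTRUCTOR_NAME_OHE"))
# {'CONSTRUCTOR_NAME_OHE_MCLAREN': [1, 0, 0], 'CONSTRUCTOR_NAME_OHE_RED_BULL': [0, 1, 0]}
```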

## Preprocessing Pipeline

Let's package the feature engineering into a pipeline that can be used in downstream dbt models, Snowpark ML models, or other notebooks.

In [62]:
NUMERIC_COLUMNS = [
    "GRID",
    "DRIVER_CONFIDENCE",
    "CONSTRUCTOR_RELIABILITY",
    "TOTAL_PIT_STOPS_PER_RACE",
]
NUMERIC_COLUMNS_NORM = [
    "GRID_NORM",
    "DRIVER_CONFIDENCE_NORM",
    "CONSTRUCTOR_RELIABILITY_NORM",
    "TOTAL_PIT_STOPS_PER_RACE_NORM",
]
CATEGORICAL_COLUMNS = [
    "CIRCUIT_NAME",
    "CONSTRUCTOR_NAME",
    "DRIVER",
]
CATEGORICAL_COLUMNS_OHE = [
    "CIRCUIT_NAME_OHE",
    "CONSTRUCTOR_NAME_OHE",
    "DRIVER_OHE",
]

In [67]:
preprocess_pipeline = Pipeline(
    steps=[
        (
            "MMS",
            snowml.MinMaxScaler(
                clip=True,
                input_cols=NUMERIC_COLUMNS,
                output_cols=NUMERIC_COLUMNS_NORM,
            ),
        ),
        (
            "OHE",
            snowml.OneHotEncoder(
                input_cols=CATEGORICAL_COLUMNS,
                output_cols=CATEGORICAL_COLUMNS_OHE,
            ),
        ),
    ]
)
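`Pipeline` chains its steps so that each one is fit on the output of the step before it, and `transform` replays the fitted steps in order, the same contract scikit-learn's `Pipeline` follows. The toy version below shows that control flow with two hypothetical numeric steps (`CenterStep`, `MaxAbsStep`) standing in for `MinMaxScaler` and `OneHotEncoder`; none of these classes are part of Snowpark ML.

```python
class CenterStep:
    """Hypothetical step: learns the mean in fit and subtracts it in transform."""

    def fit(self, X):
        self.mean_ = sum(X) / len(X)
        return self

    def transform(self, X):
        return [x - self.mean_ for x in X]


class MaxAbsStep:
    """Hypothetical step: learns the largest absolute value and divides by it."""

    def fit(self, X):
        self.max_abs_ = max(abs(x) for x in X) or 1.0  # guard against all-zero input
        return self

    def transform(self, X):
        return [x / self.max_abs_ for x in X]


class SimplePipeline:
    def __init__(self, steps):
        self.steps = steps  # list of (name, step) tuples, like the Pipeline cell above

    def fit(self, X):
        for _, step in self.steps:
            # Each step is fit on (and then transforms) the previous step's output
            X = step.fit(X).transform(X)
        return self

    def transform(self, X):
        for _, step in self.steps:
            X = step.transform(X)
        return X


pipe = SimplePipeline([("CENTER", CenterStep()), ("MAXABS", MaxAbsStep())]).fit([2, 4, 6, 8])
print(pipe.transform([2, 4, 6, 8]))  # [-1.0, -0.3333333333333333, 0.3333333333333333, 1.0]
print(pipe.transform([11]))  # [2.0]
```

Note that `MaxAbsStep` learns `3.0` from the centered values, not `8` from the raw input, which is why step order matters.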

In [68]:
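PIPELINE_FILE = "f1_preprocess_pipeline.joblib"

# Fit before saving so the file contains the learned min/max values and categories
preprocess_pipeline.fit(features_df)
joblib.dump(preprocess_pipeline, PIPELINE_FILE)  # Save the fitted pipeline to a local joblib (pickle) file

transformed_features_df = preprocess_pipeline.transform(features_df)
transformed_features_df.show()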


In [69]:
# Create a stage in Snowflake to upload the preprocess pipeline file
SCHEMA_NAME = "FORMULA1_INTERNAL_STAGES"
STAGE_NAME = "ML_F1_STAGE"
session.sql(f"create schema if not exists {SCHEMA_NAME}").collect()
session.sql(f"create or replace stage {SCHEMA_NAME}.{STAGE_NAME}").collect()
session.file.put(PIPELINE_FILE, f"@{SCHEMA_NAME}.{STAGE_NAME}", overwrite=True)

[PutResult(source='f1_preprocess_pipeline.joblib', target='f1_preprocess_pipeline.joblib.gz', source_size=1570, target_size=912, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]
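Saving a pipeline only preserves the state it had when it was written: a file dumped before `fit` carries no learned minimums, maximums or categories, so a downstream model would have to refit it on whatever data it receives. `joblib.dump` and `joblib.load` build on Python's pickle protocol, so the round trip can be sketched with the standard library alone. `ToyScaler` and `round_trip` are hypothetical stand-ins for the preprocessing pipeline and the stage upload/download; loading the file from `@FORMULA1_INTERNAL_STAGES.ML_F1_STAGE` inside a dbt Python model is not shown.

```python
import io
import pickle


class ToyScaler:
    """Hypothetical stand-in for the preprocessing pipeline."""

    def __init__(self):
        self.data_min_ = None  # learned in fit
        self.data_max_ = None

    def fit(self, values):
        self.data_min_, self.data_max_ = min(values), max(values)
        return self

    def transform(self, values):
        if self.data_min_ is None:
            raise RuntimeError("transform called before fit")
        span = (self.data_max_ - self.data_min_) or 1
        return [(v - self.data_min_) / span for v in values]


def round_trip(obj):
    """Serialize and reload an object, standing in for dump, upload, download and load."""
    buffer = io.BytesIO()
    pickle.dump(obj, buffer)
    buffer.seek(0)
    return pickle.load(buffer)


saved_before_fit = round_trip(ToyScaler())
saved_after_fit = round_trip(ToyScaler().fit([0, 10]))
print(saved_after_fit.transform([5]))  # [0.5]
# saved_before_fit.transform([5]) raises RuntimeError: the learned state was never saved
```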