Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev #19

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open

dev #19

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions models/core/fct_lap_times.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
with lap_times as (
select
{{ dbt_utils.generate_surrogate_key(['race_id', 'driver_id', 'lap']) }} as lap_times_id,
race_id as race_id,
driver_id as driver_id,
lap as lap,
driver_position as driver_position,
lap_time_formatted as lap_time_formatted,
official_laptime as official_laptime,
lap_time_milliseconds as lap_time_milliseconds
from {{ ref('stg_lap_times') }}
)
select * from lap_times
17 changes: 17 additions & 0 deletions models/marts/aggregates/agg_lap_times_moving_avg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pandas as pd

def model(dbt, session):
# dbt configuration
dbt.config(packages=["pandas"])

# get upstream data
lap_times = dbt.ref("mrt_lap_times_years").to_pandas()

# describe the data
lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"]/1000
lap_time_trends = lap_times.groupby(by="RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame()
lap_time_trends.reset_index(inplace=True)
lap_time_trends["LAP_MOVING_AVG_5_YEARS"] = lap_time_trends["LAP_TIME_SECONDS"].rolling(5).mean()
lap_time_trends.columns = lap_time_trends.columns.str.upper()

return lap_time_trends.round(1)
19 changes: 19 additions & 0 deletions models/marts/mrt_lap_times_years.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
with lap_times as (
select * from {{ ref('fct_lap_times') }}
),
races as (
select * from {{ ref('dim_races') }}
),
expanded_lap_times_by_year as (
select
lap_times.race_id,
driver_id,
race_year,
lap,
lap_time_milliseconds
from lap_times
left join races
on lap_times.race_id = races.race_id
where lap_time_milliseconds is not null
)
select * from expanded_lap_times_by_year
2 changes: 1 addition & 1 deletion models/ml/prep_encoding_splitting/covariate_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

def model(dbt, session):
# dbt configuration
dbt.config(packages=["pandas","numpy","scikit-learn"])
dbt.config(packages=["pandas==1.5.3","numpy","scikit-learn"])

# get upstream data
data = dbt.ref("ml_data_prep").to_pandas()
Expand Down
2 changes: 1 addition & 1 deletion models/ml/prep_encoding_splitting/ml_data_prep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

def model(dbt, session):
# dbt configuration
dbt.config(packages=["pandas"])
dbt.config(packages=["pandas==1.5.3"])

# get upstream data
fct_results = dbt.ref("mrt_results_circuits").to_pandas()
Expand Down
92 changes: 92 additions & 0 deletions models/ml/training_and_predictions/apply_prediction_to_position.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import logging
import joblib
import pandas as pd
import os
from snowflake.snowpark import types as T

DB_STAGE = 'MODELSTAGE'
version = '1.0'
# The name of the model file
model_file_path = 'driver_position_'+version
model_file_packaged = 'driver_position_'+version+'.joblib'

# This is a local directory, used for storing the various artifacts locally
LOCAL_TEMP_DIR = f'/tmp/driver_position'
DOWNLOAD_DIR = os.path.join(LOCAL_TEMP_DIR, 'download')
TARGET_MODEL_DIR_PATH = os.path.join(LOCAL_TEMP_DIR, 'ml_model')
TARGET_LIB_PATH = os.path.join(LOCAL_TEMP_DIR, 'lib')

# The feature columns that were used during model training
# and that will be used during prediction
FEATURE_COLS = [
"RACE_YEAR"
,"RACE_NAME"
,"GRID"
,"CONSTRUCTOR_NAME"
,"DRIVER"
,"DRIVERS_AGE_YEARS"
,"DRIVER_CONFIDENCE"
,"CONSTRUCTOR_RELAIBLITY"
,"TOTAL_PIT_STOPS_PER_RACE"]

def register_udf_for_prediction(p_predictor ,p_session ,p_dbt):

# The prediction udf

def predict_position(p_df: T.PandasDataFrame[int, int, int, int,
int, int, int, int, int]) -> T.PandasSeries[int]:
# Snowpark currently does not set the column name in the input dataframe
# The default col names are like 0,1,2,... Hence we need to reset the column
# names to the features that we initially used for training.
p_df.columns = [*FEATURE_COLS]

# Perform prediction. this returns an array object
pred_array = p_predictor.predict(p_df)
# Convert to series
df_predicted = pd.Series(pred_array)
return df_predicted

# The list of packages that will be used by UDF
udf_packages = p_dbt.config.get('packages')

predict_position_udf = p_session.udf.register(
predict_position
,name=f'predict_position'
,packages = udf_packages
)
return predict_position_udf

def download_models_and_libs_from_stage(p_session):
p_session.file.get(f'@{DB_STAGE}/{model_file_path}/{model_file_packaged}', DOWNLOAD_DIR)

def load_model(p_session):
# Load the model and initialize the predictor
model_fl_path = os.path.join(DOWNLOAD_DIR, model_file_packaged)
predictor = joblib.load(model_fl_path)
return predictor

# -------------------------------
def model(dbt, session):
dbt.config(
packages = ['snowflake-snowpark-python' ,'scipy','scikit-learn' ,'pandas' ,'numpy'],
materialized = "table",
tags = "predict",
use_anonymous_sproc=True
)
session._use_scoped_temp_objects = False
download_models_and_libs_from_stage(session)
predictor = load_model(session)
predict_position_udf = register_udf_for_prediction(predictor, session ,dbt)

# Retrieve the data, and perform the prediction
hold_out_df = (dbt.ref("hold_out_dataset_for_prediction")
.select(*FEATURE_COLS)
)
trained_model_file = dbt.ref("train_model_to_predict_position")

# Perform prediction.
new_predictions_df = hold_out_df.withColumn("position_predicted"
,predict_position_udf(*FEATURE_COLS)
)

return new_predictions_df
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import snowflake.snowpark.functions as F
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.metrics import confusion_matrix, balanced_accuracy_score
import io
from sklearn.linear_model import LogisticRegression
from joblib import dump, load
import joblib
import logging
import sys
from joblib import dump, load

logger = logging.getLogger("mylog")

def save_file(session, model, path, dest_filename):
input_stream = io.BytesIO()
joblib.dump(model, input_stream)
session._conn.upload_stream(input_stream, path, dest_filename)
return "successfully created file: " + path

def model(dbt, session):
dbt.config(
packages = ['numpy','scikit-learn','pandas','numpy','joblib','cachetools'],
materialized = "table",
tags = "train"
)
# Create a stage in Snowflake to save our model file
session.sql('create or replace stage MODELSTAGE').collect()

#session._use_scoped_temp_objects = False
version = "1.0"
logger.info('Model training version: ' + version)

# read in our training and testing upstream dataset
test_train_df = dbt.ref("training_testing_dataset")

# cast snowpark df to pandas df
test_train_pd_df = test_train_df.to_pandas()
target_col = "POSITION_LABEL"

# split out covariate predictors, x, from our target column position_label, y.
split_X = test_train_pd_df.drop([target_col], axis=1)
split_y = test_train_pd_df[target_col]

# Split out our training and test data into proportions
X_train, X_test, y_train, y_test = train_test_split(split_X, split_y, train_size=0.7, random_state=42)
train = [X_train, y_train]
test = [X_test, y_test]
# now we are only training our one model to deploy
# we are keeping the focus on the workflows and not algorithms for this lab!
model = LogisticRegression()

# fit the preprocessing pipeline and the model together
model.fit(X_train, y_train)
y_pred = model.predict_proba(X_test)[:,1]
predictions = [round(value) for value in y_pred]
balanced_accuracy = balanced_accuracy_score(y_test, predictions)

# Save the model to a stage
save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib" )
logger.info('Model artifact:' + "@MODELSTAGE/driver_position_"+version+".joblib")

# Take our pandas training and testing dataframes and put them back into snowpark dataframes
snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)

# Union our training and testing data together and add a column indicating train vs test rows
return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
3 changes: 3 additions & 0 deletions models/staging/display_moving_avg_laptimes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--select * from {{ ref('apply_prediction_to_position') }}

select * from {{ ref('apply_prediction_to_position') }} order by position_predicted
4 changes: 4 additions & 0 deletions package-lock.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
sha1_hash: efa9169fb1f1a1b2c967378c02b60e3d85ae464b