Creating integration tests for quick-start for ranking #1015

Merged: 10 commits, Jul 4, 2023
examples/quick_start/scripts/preproc/preprocessing.py: 45 changes (31 additions, 14 deletions)
@@ -1,6 +1,7 @@
 import gc
 import logging
 import os
+import shutil
 from functools import reduce
 from typing import Optional

@@ -12,6 +13,7 @@
 from .args_parsing import parse_arguments
 
 INDEX_TMP_COL = "__index"
+NVT_OUTPUT_FOLDER = "nvt_outputs"
 
 
 def filter_by_freq(df_to_filter, df_for_stats, column, min_freq=None, max_freq=None):
@@ -221,7 +223,8 @@ def generate_nvt_features(self):

         for col in args.categorical_features:
             feats[col] = [col] >> nvt_ops.Categorify(
-                freq_threshold=args.categ_min_freq_capping
+                freq_threshold=args.categ_min_freq_capping,
+                out_path=NVT_OUTPUT_FOLDER,
             )
         for col in args.continuous_features:
             feats[col] = [col]
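As an aside for readers of this diff: `out_path` tells `Categorify` where to persist its category lookup parquet files, which otherwise land under the current working directory. A minimal sketch of the op in isolation, assuming a hypothetical input file `interactions.parquet` with a categorical column `item_id`:

```python
import nvtabular as nvt
from nvtabular import ops as nvt_ops

# Hypothetical dataset with one categorical column.
ds = nvt.Dataset("interactions.parquet", cpu=True)

cats = ["item_id"] >> nvt_ops.Categorify(
    freq_threshold=5,        # categories below this count are treated as out-of-vocabulary
    out_path="nvt_outputs",  # category lookup files are written here
)

encoded = nvt.Workflow(cats).fit_transform(ds)
```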
@@ -244,14 +247,13 @@ def generate_nvt_features(self):

         if args.target_encoding_targets and args.target_encoding_features:
             for target_col in args.target_encoding_targets:
-                feats[f"{target_col}_te_features"] = (
-                    args.target_encoding_features
-                    >> nvt.ops.TargetEncoding(
-                        [target_col],
-                        kfold=args.target_encoding_kfold,
-                        p_smooth=args.target_encoding_smoothing,
-                        out_dtype="float32",
-                    )
+                feats[
+                    f"{target_col}_te_features"
+                ] = args.target_encoding_features >> nvt.ops.TargetEncoding(
+                    [target_col],
+                    kfold=args.target_encoding_kfold,
+                    p_smooth=args.target_encoding_smoothing,
+                    out_dtype="float32",
                 )
 
         for col in args.user_features:
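The change above is pure formatter churn (black-style wrapping); behavior is unchanged. For context, `TargetEncoding` replaces each category with a smoothed, out-of-fold mean of a target column. A minimal sketch with hypothetical column names `item_id` and `click`:

```python
import nvtabular as nvt

# Encode "item_id" by the smoothed, out-of-fold mean of the "click" target.
te = ["item_id"] >> nvt.ops.TargetEncoding(
    ["click"],           # target column(s) to average per category
    kfold=5,             # out-of-fold estimation, limiting target leakage
    p_smooth=20,         # smoothing toward the global mean for rare categories
    out_dtype="float32",
)
workflow = nvt.Workflow(te)
```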
@@ -322,7 +324,9 @@ def merge_dataset_features_values(
         ).excluding_by_name([INDEX_TMP_COL])
 
         dataset_joint = nvt.Dataset(
-            dataset_joint, schema=schema_joint, cpu=not self.gpu,
+            dataset_joint,
+            schema=schema_joint,
+            cpu=not self.gpu,
         )
 
         return dataset_joint
@@ -430,6 +434,16 @@ def run(self):

         output_dataset_path = args.output_path
 
+        nvt_outputs_folder = os.path.join(output_dataset_path, NVT_OUTPUT_FOLDER)
+
+        if os.path.exists(nvt_outputs_folder):
+            logging.info(
+                "The NVTabular output folder already exists and is "
+                f"being deleted: {nvt_outputs_folder}"
+            )
+            # Remove stale outputs so this run starts from a clean folder
+            shutil.rmtree(nvt_outputs_folder)
+
         train_dataset = nvt.Dataset(ddf, cpu=not self.gpu)
         # Processing features and targets in separate workflows, because
         # targets might not be available for test/predict_dataset
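This new guard makes reruns idempotent: without it, category files and the saved workflow from an earlier run would accumulate under the output path. The same pattern in isolation (names are hypothetical, not part of the PR):

```python
import logging
import os
import shutil

def reset_output_folder(base_path: str, folder_name: str = "nvt_outputs") -> str:
    """Delete a stale NVTabular output folder, if present, and return its path."""
    folder = os.path.join(base_path, folder_name)
    if os.path.exists(folder):
        logging.info(f"Deleting existing NVTabular output folder: {folder}")
        shutil.rmtree(folder)
    return folder
```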
@@ -442,7 +456,8 @@
             train_dataset_features, train_dataset_targets, "train", args
         )
         train_dataset_preproc.to_parquet(
-            output_train_dataset_path, output_files=args.output_num_partitions,
+            output_train_dataset_path,
+            output_files=args.output_num_partitions,
         )
 
         if args.eval_data_path or args.dataset_split_strategy:
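In these `to_parquet` calls, `output_files` controls how many parquet files each processed split is written as; more files allow more read parallelism downstream. In isolation, with hypothetical paths and values:

```python
import nvtabular as nvt

# Hypothetical processed split; in the script this comes from the workflow.
train_dataset_preproc = nvt.Dataset("outputs/tmp_train.parquet", cpu=True)

# Write the split as a fixed number of parquet files.
train_dataset_preproc.to_parquet(
    "outputs/train",
    output_files=10,
)
```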
@@ -459,7 +474,8 @@
             eval_dataset_features, eval_dataset_targets, "eval", args
         )
         eval_dataset_preproc.to_parquet(
-            output_eval_dataset_path, output_files=args.output_num_partitions,
+            output_eval_dataset_path,
+            output_files=args.output_num_partitions,
         )
 
         if args.predict_data_path:
@@ -484,9 +500,10 @@
logging.info(f"Saving predict/test set: {output_predict_dataset_path}")

new_predict_dataset.to_parquet(
output_predict_dataset_path, output_files=args.output_num_partitions,
output_predict_dataset_path,
output_files=args.output_num_partitions,
)
nvt_save_path = os.path.join(output_dataset_path, "workflow")
nvt_save_path = os.path.join(nvt_outputs_folder, "workflow")
logging.info(f"Saving nvtabular workflow to: {nvt_save_path}")
nvt_workflow_features.save(nvt_save_path)

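Saving the workflow under `nvt_outputs` (rather than directly under the output path, as before) keeps the serialized workflow next to the category files it references, so the folder can be cleaned up or shipped as one unit. Reloading it later might look like this, with a hypothetical base path:

```python
import os
import nvtabular as nvt

# Reload the fitted workflow that the preprocessing run saved.
workflow = nvt.Workflow.load(os.path.join("outputs", "nvt_outputs", "workflow"))

# Apply the same preprocessing to new raw data, e.g. at inference time.
raw = nvt.Dataset("new_interactions.parquet", cpu=True)
workflow.transform(raw).to_parquet("outputs/predict")
```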