# Skip to content
# Permalink
# Branch: master
# Find file Copy path
# Find file Copy path
# Fetching contributors…
# Cannot retrieve contributors at this time
# 1105 lines (1008 sloc) 47.1 KB
#' Interface for modeling in the Civis Platform
#'
#' @description An interface for training and scoring data on Civis Platform
#' using a set of Scikit-Learn estimators.
#'
#' @param object A \code{civis_ml} object.
#' @param model_id The \code{id} of CivisML model built previously.
#' @param run_id Optional, the \code{id} of a CivisML model run. If \code{NULL},
#' defaults to fetching the latest run.
#' @param x,newdata See the Data Sources section below.
#' @param model_type The name of the CivisML workflow. See the Workflows section
#' below.
#' @param dependent_variable The dependent variable of the training dataset.
#' For a multi-target problem, this should be a vector of column names of
#' dependent variables. Nulls in a single dependent variable will
#' automatically be dropped.
#' @param primary_key Optional, the unique ID (primary key) of the training
#' dataset. This will be used to index the out-of-sample scores. In
#' \code{predict.civis_ml}, the primary_key of the training task is used by
#' default \code{primary_key = NA}. Use \code{primary_key = NULL} to
#' explicitly indicate the data have no primary_key.
#' @param parameters Optional, parameters for the final stage estimator in a
#' predefined model, e.g. \code{list(C = 2)} for a "sparse_logistic"
#' model.
#' @param cross_validation_parameters Optional, parameter grid for learner
#' parameters, e.g. \code{list(n_estimators = c(100, 200, 500),
#' learning_rate = c(0.01, 0.1), max_depth = c(2, 3))}
#' or \code{"hyperband"} for supported models.
#' @param model_name Optional, the prefix of the Platform modeling jobs.
#' It will have \code{" Train"} or \code{" Predict"} added to become the Script title.
#' @param calibration Optional, if not \code{NULL}, calibrate output
#' probabilities with the selected method, \code{sigmoid}, or \code{isotonic}.
#' Valid only with classification models.
#' @param excluded_columns Optional, a vector of columns which will be
#' considered ineligible to be independent variables.
#' @param oos_scores_table Optional, if provided, store out-of-sample
#' predictions on training set data to this Redshift "schema.tablename".
#' @param oos_scores_db Optional, the name of the database where the
#' \code{oos_scores_table} will be created. If not provided, this will default
#' to \code{database_name}.
#' @param oos_scores_if_exists Optional, action to take if
#' \code{oos_scores_table} already exists. One of \code{"fail"}, \code{"append"}, \code{"drop"}, or \code{"truncate"}.
#' The default is \code{"fail"}.
#' @param fit_params Optional, a mapping from parameter names in the model's
#' \code{fit} method to the column names which hold the data, e.g.
#' \code{list(sample_weight = 'survey_weight_column')}.
#' @param output_table The table in which to put predictions.
#' @param output_db The database containing \code{output_table}. If not
#' provided, this will default to the \code{database_name} specified when
#' the model was built.
#' @param if_output_exists Action to take if the prediction table already exists. One of \code{"fail"}, \code{"append"}, \code{"drop"}, or \code{"truncate"}.
#' The default is \code{"fail"}.
#' @param n_jobs Number of concurrent Platform jobs to use for training and
#' validation, or multi-file / large table prediction. Defaults to
#' \code{NULL}, which allows CivisML to dynamically calculate an
#' appropriate number of workers to use (in general, as many as
#' possible without using all resources in the cluster).
#' @param cpu_requested Optional, the number of CPU shares requested in the
#' Civis Platform for training jobs or prediction child jobs.
#' 1024 shares = 1 CPU.
#' @param memory_requested Optional, the memory requested from Civis Platform
#' for training jobs or prediction child jobs, in MiB.
#' @param disk_requested Optional, the disk space requested on Civis Platform
#' for training jobs or prediction child jobs, in GB.
#' @param notifications Optional, model status notifications. See
#' \code{\link{scripts_post_custom}} for further documentation about email
#' and URL notification.
#' @param polling_interval Check for job completion every this number of seconds.
#' @param validation_data Optional, source for validation data. There are
#' currently two options: \code{train} (the default), which uses training
#' data for validation, and \code{skip}, which skips the validation step.
#' @param verbose Optional, If \code{TRUE}, supply debug outputs in Platform
#' logs and make prediction child jobs visible.
#' @param dvs_to_predict Optional, For scoring, this should be a vector of column
#' names of dependent variables to include in the output table. It must be a
#' subset of the \code{dependent_variable} vector provided for training.
#' The scores for the returned subset will be identical to the scores which
#' those outputs would have had if all outputs were written, but ignoring some
#' of the model's outputs will let predictions complete faster and use less disk space.
#' If not provided, the entire model output will be written to the output table.
#' @param \dots Unused
#'
#' @section CivisML Workflows:
#'
#' You can use the following pre-defined models with \code{civis_ml}. All models
#' start by imputing missing values with the mean of non-null values in a
#' column. The \code{"sparse_*"} models include a LASSO regression step
#' (using \code{glmnet}) to do feature selection before passing data to the
#' final model. In some models, CivisML uses default parameters from those in
#' \href{http://scikit-learn.org/stable/}{Scikit-Learn}, as indicated in the "Altered Defaults" column.
#' All models also have \code{random_state=42}.
#'
#' Specific workflows can also be called directly using the R workflow functions.
#'
#' \tabular{rrrrr}{
#' Name \tab R Workflow \tab Model Type \tab Algorithm \tab Altered Defaults \cr
#' \code{sparse_logistic} \tab \code{\link{civis_ml_sparse_logistic}} \tab classification \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html}{LogisticRegression} \tab \code{C=499999950, tol=1e-08} \cr
#' \code{gradient_boosting_classifier} \tab \code{\link{civis_ml_gradient_boosting_classifier}} \tab classification \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingClassifier.html}{GradientBoostingClassifier} \tab \code{n_estimators=500, max_depth=2} \cr
#' \code{random_forest_classifier} \tab \code{\link{civis_ml_random_forest_classifier}} \tab classification \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html}{RandomForestClassifier} \tab \code{n_estimators=500} \cr
#' \code{extra_trees_classifier} \tab \code{\link{civis_ml_extra_trees_classifier}} \tab classification \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.ExtraTreesClassifier.html}{ExtraTreesClassifier} \tab \code{n_estimators=500} \cr
#' \code{multilayer_perceptron_classifier} \tab \tab classification \tab \href{https://github.com/civisanalytics/muffnn}{muffnn.MLPClassifier} \tab \cr
#' \code{stacking_classifier} \tab \tab classification \tab \href{https://github.com/civisanalytics/civisml-extensions}{StackedClassifier}\tab \cr
#' \code{sparse_linear_regressor} \tab \code{\link{civis_ml_sparse_linear_regressor}} \tab regression \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LinearRegression.html}{LinearRegression} \tab \cr
#' \code{sparse_ridge_regressor} \tab \code{\link{civis_ml_sparse_ridge_regressor}} \tab regression \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.Ridge.html}{Ridge} \tab \cr
#' \code{gradient_boosting_regressor} \tab \code{\link{civis_ml_gradient_boosting_regressor}} \tab regression \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html}{GradientBoostingRegressor} \tab \code{n_estimators=500, max_depth=2} \cr
#' \code{random_forest_regressor} \tab \code{\link{civis_ml_random_forest_regressor}} \tab regression \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html}{RandomForestRegressor} \tab \code{n_estimators=500} \cr
#' \code{extra_trees_regressor} \tab \code{\link{civis_ml_extra_trees_regressor}} \tab regression \tab \href{http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.ExtraTreesRegressor.html}{ExtraTreesRegressor} \tab \code{n_estimators=500} \cr
#' \code{multilayer_perceptron_regressor} \tab \tab regression \tab \href{https://github.com/civisanalytics/muffnn}{muffnn.MLPRegressor} \tab \cr
#' \code{stacking_regressor} \tab \tab regression \tab \href{https://github.com/civisanalytics/civisml-extensions}{StackedRegressor}\tab \cr
#' }
#' Model names can be easily accessed using the global variables \code{CIVIS_ML_REGRESSORS} and \code{CIVIS_ML_CLASSIFIERS}.
#' @section Stacking:
#'
#' The \code{"stacking_classifier"} model stacks together the \code{"gradient_boosting_classifier"} and
#' \code{"random_forest_classifier"} predefined models together with a
#' \code{glmnet.LogitNet(alpha=0, n_splits=4, max_iter=10000, tol=1e-5, scoring='log_loss')}.
#' Defaults for the predefined models are documented in \code{?civis_ml}. Each column is first
#' \href{http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html}{standardized},
#' and then the model predictions are combined using
#' \href{http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegressionCV.html}{LogisticRegressionCV}
#' with \code{penalty='l2'} and \code{tol=1e-08}. The \code{"stacking_regressor"} works similarly, stacking together
#' the \code{"gradient_boosting_regressor"} and \code{"random_forest_regressor"} models and a
#' \code{glmnet.ElasticNet(alpha=0, n_splits=4, max_iter=10000, tol=1e-5, scoring='r2')}, combining them using
#' \href{https://github.com/civisanalytics/civisml-extensions}{NonNegativeLinearRegression}.
#' The estimators that are being stacked have the same names as the
#' associated pre-defined models, and the meta-estimator steps are named
#' "meta-estimator". Note that although default parameters are provided
#' for multilayer perceptron models, it is highly recommended that
#' multilayer perceptrons be run using hyperband.
#'
#' @section Hyperparameter Tuning:
#' You can tune hyperparameters using one of two methods: grid search or
#' hyperband. CivisML will perform grid search if you pass a list
#' of hyperparameters to the \code{cross_validation_parameters} parameter, where list elements are
#' hyperparameter names, and the values are vectors of hyperparameter
#' values to grid search over. You can run hyperparameter optimization in parallel by
#' setting the \code{n_jobs}
#' parameter to however many jobs you would like to run in
#' parallel. By default, \code{n_jobs} is dynamically calculated based on
#' the resources available on your cluster, such that a modeling job will
#' never take up more than 90\% of the cluster resources at once.
#'
#' \href{https://arxiv.org/abs/1603.06560}{Hyperband}
#' is an efficient approach to hyperparameter optimization, and
#' recommended over grid search where possible. CivisML will perform
#' hyperband optimization if you pass the string \code{"hyperband"} to
#' \code{cross_validation_parameters}. Hyperband is currently only supported for the following models:
#' \code{"gradient_boosting_classifier"}, \code{"random_forest_classifier"},
#' \code{"extra_trees_classifier"}, \code{"multilayer_perceptron_classifier"},
#' \code{"stacking_classifier"},
#' \code{"gradient_boosting_regressor"}, \code{"random_forest_regressor"},
#' \code{"extra_trees_regressor"}, \code{"multilayer_perceptron_regressor"},
#' and \code{"stacking_regressor"}.
#'
#' Hyperband cannot be used to tune GLMs. For this reason, preset GLMs do
#' not have a hyperband option. Similarly, when
#' \code{cross_validation_parameters='hyperband'} and the model is
#' \code{stacking_classifier} or \code{stacking_regressor}, only the GBT and
#' random forest steps of the stacker are tuned using hyperband. For the specific
#' distributions used in the predefined hyperband models, see
#' \href{https://civis-python.readthedocs.io/en/stable/ml.html#hyperparameter-tuning}{the detailed table in the Python client documentation}.
#'
#'
#' @section Data Sources:
#'
#' For building models with \code{civis_ml}, the training data can reside in
#' four different places, a file in the Civis Platform, a CSV or feather-format file
#' on the local disk, a \code{data.frame} resident in the local R environment, and finally,
#' a table in the Civis Platform. Use the following helpers to specify the
#' data source when calling \code{civis_ml}:
#'
#' \describe{
#' \item{\code{data.frame}}{\code{civis_ml(x = df, ...)}}
#' \item{local csv file}{\code{civis_ml(x = "path/to/data.csv", ...)}}
#' \item{file in Civis Platform}{\code{civis_ml(x = civis_file(1234))}}
#' \item{table in Civis Platform}{\code{civis_ml(x = civis_table(table_name = "schema.table", database_name = "database"))}}
#' }
#'
#' @section Out of sample scores:
#' Model outputs will always contain out-of-sample (or out of fold) scores,
#' which are accessible through \code{\link{fetch_oos_scores}}.
#' These may be stored in a Civis table on Redshift using the
#' \code{oos_scores_table}, \code{oos_scores_db}, and \code{oos_scores_if_exists} parameters.
#'
#' @section Predictions:
#'
#' A fitted model can be used to make predictions for data residing in any of
#' the sources above and a \code{\link{civis_file_manifest}}. Similar to
#' \code{civis_ml}, use the data source helpers as the \code{newdata} argument
#' to \code{predict.civis_ml}.
#'
#' A manifest file is a JSON file which specifies the location of many shards of the data to be used for prediction.
#' A manifest file is the output of a Civis export job with \code{force_multifile = TRUE} set, e.g.
#' from \code{\link{civis_to_multifile_csv}}. Large civis tables (provided using \code{table_name})
#' will automatically be exported to manifest files.
#'
#' Prediction outputs will always be stored as gzipped CSVs in one or more civis files.
#' Provide an \code{output_table} (and optionally an \code{output_db},
#' if it's different from \code{database_name}) to copy these predictions into a
#' table on Redshift.
#'
#' @return A \code{civis_ml} object, a list containing the following elements:
#' \item{job}{job metadata from \code{\link{scripts_get_custom}}.}
#' \item{run}{run metadata from \code{\link{scripts_get_custom_runs}}.}
#' \item{outputs}{CivisML metadata from \code{\link{scripts_list_custom_runs_outputs}} containing the locations of
#' files produced by CivisML e.g. files, projects, metrics, model_info, logs, predictions, and estimators.}
#' \item{metrics}{Parsed CivisML output from \code{metrics.json} containing metadata from validation.
#' A list containing the following elements:
#' \itemize{
#' \item run list, metadata about the run.
#' \item data list, metadata about the training data.
#' \item model list, the fitted scikit-learn model with CV results.
#' \item metrics list, validation metrics (accuracy, confusion, ROC, AUC, etc).
#' \item warnings list.
#' \item data_platform list, training data location.
#' }}
#' \item{model_info}{Parsed CivisML output from \code{model_info.json} containing metadata from training.
#' A list containing the following elements:
#' \itemize{
#' \item run list, metadata about the run.
#' \item data list, metadata about the training data.
#' \item model list, the fitted scikit-learn model.
#' \item metrics empty list.
#' \item warnings list.
#' \item data_platform list, training data location.
#' }}
#'
#' @examples \dontrun{
#' # From a data frame:
#' m <- civis_ml(df, model_type = "sparse_logistic",
#' dependent_variable = "Species")
#'
#' # From a table:
#' m <- civis_ml(civis_table("schema.table", "database_name"),
#' model_type = "sparse_logistic", dependent_variable = "Species",
#' oos_scores_table = "schema.scores_table",
#' oos_scores_if_exists = "drop")
#'
#' # From a local file:
#' m <- civis_ml("path/to/file.csv", model_type = "sparse_logistic",
#' dependent_variable = "Species")
#'
#' # From a Civis file:
#' file_id <- write_civis_file("path/to/file.csv", name = "file.csv")
#' m <- civis_ml(civis_file(file_id), model_type = "sparse_logistic",
#' dependent_variable = "Species")
#'
#' pred_job <- predict(m, newdata = df)
#' pred_job <- predict(m, civis_table("schema.table", "database_name"),
#' output_table = "schema.scores_table")
#' pred_job <- predict(m, civis_file(file_id),
#' output_table = "schema.scores_table")
#'
#' m <- civis_ml_fetch_existing(model_id = m$job$id, m$run$id)
#' logs <- fetch_logs(m)
#' yhat <- fetch_oos_scores(m)
#' yhat <- fetch_predictions(pred_job)
#' }
#' @name civis_ml
#' @seealso
#' \code{\link{civis_file}}, \code{\link{civis_table}}, and
#' \code{\link{civis_file_manifest}} for specifying data sources.
#'
#' \code{\link{get_metric}} to access model validation metrics.
#'
#' \code{\link{fetch_logs}} for retrieving logs for a (failed) model build,
#' \code{\link{fetch_oos_scores}} for retrieving the out of sample (fold) scores for each training observation, and
#' \code{\link{fetch_predictions}} for retrieving the predictions from a prediction job.
NULL
#' @rdname civis_ml
#' @export
civis_ml <- function(x,
                     dependent_variable,
                     model_type,
                     primary_key = NULL,
                     excluded_columns = NULL,
                     parameters = NULL,
                     fit_params = NULL,
                     cross_validation_parameters = NULL,
                     calibration = NULL,
                     oos_scores_table = NULL,
                     oos_scores_db = NULL,
                     oos_scores_if_exists = c('fail', 'append', 'drop', 'truncate'),
                     model_name = NULL,
                     cpu_requested = NULL,
                     memory_requested = NULL,
                     disk_requested = NULL,
                     notifications = NULL,
                     polling_interval = NULL,
                     validation_data = c('train', 'skip'),
                     n_jobs = NULL,
                     verbose = FALSE) {
  # S3 generic: dispatches on the class of `x` to one of the training
  # methods below (data.frame, civis_table, civis_file, or a character
  # path to a local CSV file).
  UseMethod("civis_ml", x)
}
#' @export
civis_ml.data.frame <- function(x,
                                dependent_variable,
                                model_type,
                                primary_key = NULL,
                                excluded_columns = NULL,
                                parameters = NULL,
                                fit_params = NULL,
                                cross_validation_parameters = NULL,
                                calibration = NULL,
                                oos_scores_table = NULL,
                                oos_scores_db = NULL,
                                oos_scores_if_exists = c('fail', 'append', 'drop', 'truncate'),
                                model_name = NULL,
                                cpu_requested = NULL,
                                memory_requested = NULL,
                                disk_requested = NULL,
                                notifications = NULL,
                                polling_interval = NULL,
                                validation_data = c('train', 'skip'),
                                n_jobs = NULL,
                                verbose = FALSE) {
  # Validate the enumerated arguments up front.
  oos_scores_if_exists <- match.arg(oos_scores_if_exists)
  validation_data <- match.arg(validation_data)

  # Resolve the out-of-sample scores database name to a platform id,
  # when one was supplied.
  oos_scores_db_id <- if (is.null(oos_scores_db)) {
    NULL
  } else {
    get_database_id(oos_scores_db)
  }

  # Upload the data frame to the platform (feather or CSV, depending on
  # the CivisML version), then train on the stashed file.
  stashed_file_id <- stash_local_dataframe(x)
  create_and_run_model(file_id = stashed_file_id,
                       dependent_variable = dependent_variable,
                       excluded_columns = excluded_columns,
                       primary_key = primary_key,
                       model_type = model_type,
                       parameters = parameters,
                       cross_validation_parameters = cross_validation_parameters,
                       fit_params = fit_params,
                       calibration = calibration,
                       oos_scores_table = oos_scores_table,
                       oos_scores_db_id = oos_scores_db_id,
                       oos_scores_if_exists = oos_scores_if_exists,
                       model_name = model_name,
                       cpu_requested = cpu_requested,
                       memory_requested = memory_requested,
                       disk_requested = disk_requested,
                       validation_data = validation_data,
                       n_jobs = n_jobs,
                       notifications = notifications,
                       verbose = verbose)
}
#' @export
civis_ml.civis_table <- function(x,
                                 dependent_variable,
                                 model_type,
                                 primary_key = NULL,
                                 excluded_columns = NULL,
                                 parameters = NULL,
                                 fit_params = NULL,
                                 cross_validation_parameters = NULL,
                                 calibration = NULL,
                                 oos_scores_table = NULL,
                                 oos_scores_db = NULL,
                                 oos_scores_if_exists = c('fail', 'append', 'drop', 'truncate'),
                                 model_name = NULL,
                                 cpu_requested = NULL,
                                 memory_requested = NULL,
                                 disk_requested = NULL,
                                 notifications = NULL,
                                 polling_interval = NULL,
                                 validation_data = c('train', 'skip'),
                                 n_jobs = NULL,
                                 verbose = FALSE) {
  # Validate the enumerated arguments up front.
  oos_scores_if_exists <- match.arg(oos_scores_if_exists)
  validation_data <- match.arg(validation_data)

  # Look up the platform id for the OOS scores database, if named.
  oos_db_id <- NULL
  if (!is.null(oos_scores_db)) {
    oos_db_id <- get_database_id(oos_scores_db)
  }

  # The `civis_table` helper carries the table location and any SQL
  # filters; forward those straight into the training job.
  create_and_run_model(table_name = x$table_name,
                       database_id = x$database_id,
                       sql_where = x$sql_where,
                       sql_limit = x$sql_limit,
                       dependent_variable = dependent_variable,
                       excluded_columns = excluded_columns,
                       primary_key = primary_key,
                       model_type = model_type,
                       parameters = parameters,
                       cross_validation_parameters = cross_validation_parameters,
                       fit_params = fit_params,
                       calibration = calibration,
                       oos_scores_table = oos_scores_table,
                       oos_scores_db_id = oos_db_id,
                       oos_scores_if_exists = oos_scores_if_exists,
                       model_name = model_name,
                       cpu_requested = cpu_requested,
                       memory_requested = memory_requested,
                       disk_requested = disk_requested,
                       validation_data = validation_data,
                       n_jobs = n_jobs,
                       notifications = notifications,
                       verbose = verbose)
}
#' @export
civis_ml.civis_file <- function(x,
                                dependent_variable,
                                model_type,
                                primary_key = NULL,
                                excluded_columns = NULL,
                                parameters = NULL,
                                fit_params = NULL,
                                cross_validation_parameters = NULL,
                                calibration = NULL,
                                oos_scores_table = NULL,
                                oos_scores_db = NULL,
                                oos_scores_if_exists = c('fail', 'append', 'drop', 'truncate'),
                                model_name = NULL,
                                cpu_requested = NULL,
                                memory_requested = NULL,
                                disk_requested = NULL,
                                notifications = NULL,
                                polling_interval = NULL,
                                validation_data = c('train', 'skip'),
                                n_jobs = NULL,
                                verbose = FALSE) {
  # Validate the enumerated arguments up front.
  oos_scores_if_exists <- match.arg(oos_scores_if_exists)
  validation_data <- match.arg(validation_data)

  # Resolve the OOS database name to a platform id, when given.
  oos_db_id <- if (is.null(oos_scores_db)) {
    NULL
  } else {
    get_database_id(oos_scores_db)
  }

  # The data already live in a platform file; pass the `civis_file`
  # straight through as the file id (it is unclassed downstream).
  create_and_run_model(file_id = x,
                       dependent_variable = dependent_variable,
                       excluded_columns = excluded_columns,
                       primary_key = primary_key,
                       model_type = model_type,
                       parameters = parameters,
                       cross_validation_parameters = cross_validation_parameters,
                       fit_params = fit_params,
                       calibration = calibration,
                       oos_scores_table = oos_scores_table,
                       oos_scores_db_id = oos_db_id,
                       oos_scores_if_exists = oos_scores_if_exists,
                       model_name = model_name,
                       cpu_requested = cpu_requested,
                       memory_requested = memory_requested,
                       disk_requested = disk_requested,
                       validation_data = validation_data,
                       n_jobs = n_jobs,
                       notifications = notifications,
                       verbose = verbose)
}
#' @export
civis_ml.character <- function(x,
                               dependent_variable,
                               model_type,
                               primary_key = NULL,
                               excluded_columns = NULL,
                               parameters = NULL,
                               fit_params = NULL,
                               cross_validation_parameters = NULL,
                               calibration = NULL,
                               oos_scores_table = NULL,
                               oos_scores_db = NULL,
                               oos_scores_if_exists = c('fail', 'append', 'drop', 'truncate'),
                               model_name = NULL,
                               cpu_requested = NULL,
                               memory_requested = NULL,
                               disk_requested = NULL,
                               notifications = NULL,
                               polling_interval = NULL,
                               validation_data = c('train', 'skip'),
                               n_jobs = NULL,
                               verbose = FALSE) {
  # Validate the enumerated arguments up front.
  oos_scores_if_exists <- match.arg(oos_scores_if_exists)
  validation_data <- match.arg(validation_data)

  # Resolve the OOS database name to a platform id, when given.
  oos_db_id <- NULL
  if (!is.null(oos_scores_db)) {
    oos_db_id <- get_database_id(oos_scores_db)
  }

  # `x` is a path to a local CSV; upload it to the platform first, then
  # train on the uploaded file.
  uploaded_id <- write_civis_file(x, "modelpipeline_data.csv")
  create_and_run_model(file_id = uploaded_id,
                       dependent_variable = dependent_variable,
                       excluded_columns = excluded_columns,
                       primary_key = primary_key,
                       model_type = model_type,
                       parameters = parameters,
                       cross_validation_parameters = cross_validation_parameters,
                       fit_params = fit_params,
                       calibration = calibration,
                       oos_scores_table = oos_scores_table,
                       oos_scores_db_id = oos_db_id,
                       oos_scores_if_exists = oos_scores_if_exists,
                       model_name = model_name,
                       cpu_requested = cpu_requested,
                       memory_requested = memory_requested,
                       disk_requested = disk_requested,
                       validation_data = validation_data,
                       n_jobs = n_jobs,
                       notifications = notifications,
                       verbose = verbose)
}
#' Stash a data frame in feather or csv format, depending on CivisML version.
#'
#' Newer CivisML templates (id > 9969) accept feather files, which preserve
#' column types better than CSV; older templates only accept CSV.
#'
#' @param x data.frame to stash
#'
#' @return file id where dataframe is stored
stash_local_dataframe <- function(x) {
  tmpl_id <- getOption("civis.ml_train_template_id")
  tmp_path <- tempfile()
  if (tmpl_id > 9969) {
    # Newer versions use feather. `feather` is a suggested dependency, so
    # check for it explicitly; the original code ignored the return value
    # of requireNamespace, producing a cryptic error on write_feather when
    # the package was missing.
    if (!requireNamespace("feather", quietly = TRUE)) {
      stop("Package 'feather' is required to upload a data frame to this ",
           "version of CivisML. Install it with install.packages('feather').",
           call. = FALSE)
    }
    feather::write_feather(x, tmp_path)
    civis_path <- "modelpipeline_data.feather"
  } else {
    # Older versions can't use feather.
    utils::write.csv(x, file = tmp_path, row.names = FALSE)
    civis_path <- "modelpipeline_data.csv"
  }
  file_id <- write_civis_file(tmp_path, name = civis_path)
  return(file_id)
}
# Assemble the CivisML training-template arguments and launch a training job.
#
# Each argument maps onto a parameter of the CivisML custom-script training
# template; NULL optional arguments are omitted from the payload. Blocks
# until the run finishes and returns a `civis_ml` object via
# `civis_ml_fetch_existing`.
create_and_run_model <- function(file_id = NULL,
                                 table_name = NULL,
                                 database_id = NULL,
                                 sql_where = NULL,
                                 sql_limit = NULL,
                                 dependent_variable = NULL,
                                 excluded_columns = NULL,
                                 primary_key = NULL,
                                 model_type = NULL,
                                 parameters = NULL,
                                 cross_validation_parameters = NULL,
                                 fit_params = NULL,
                                 calibration = NULL,
                                 oos_scores_table = NULL,
                                 oos_scores_db_id = NULL,
                                 oos_scores_if_exists = NULL,
                                 model_name = NULL,
                                 cpu_requested = NULL,
                                 memory_requested = NULL,
                                 disk_requested = NULL,
                                 polling_interval = NULL,
                                 validation_data = NULL,
                                 n_jobs = NULL,
                                 notifications = NULL,
                                 verbose = FALSE) {
  args <- list(
    MODEL = model_type,
    # Multiple dependent variables are passed as one space-separated string.
    TARGET_COLUMN = paste(dependent_variable, collapse = " "),
    PRIMARY_KEY = primary_key,
    PARAMS = jsonlite::toJSON(parameters, auto_unbox = TRUE, null = "null"),
    IF_EXISTS = oos_scores_if_exists,
    TABLE_NAME = table_name,
    # We unclass the file_id here b/c jsonlite::toJSON does not know how to
    # serialize our custom class. This results in the following error:
    # Error: No method asJSON S3 class: civis_file
    # It does not appear the maintainers have any interest in allowing users
    # to specify/override the asJSON method:
    # https://github.com/jeroen/jsonlite/issues/62
    CIVIS_FILE_ID = unclass(file_id),
    DEBUG = verbose
  )
  # Some workflows cannot model more than one dependent variable.
  if (length(dependent_variable) > 1) {
    mo_not_supported <- c("sparse_linear_regressor", "sparse_ridge_regressor",
                          "gradient_boosting_regressor",
                          "sparse_logistic", "gradient_boosting_classifier")
    if (model_type %in% mo_not_supported) {
      # Fixed a misplaced paren: paste0() previously wrapped only the
      # literal, and model_type rode along as a second stop() argument.
      # stop() concatenates its message parts, so pass them directly.
      stop("Multioutput is not supported for ", model_type)
    }
  }
  if (!is.null(cross_validation_parameters)) {
    hyperband <- identical(cross_validation_parameters, "hyperband")
    hyperband_not_supported <- model_type %in% c("sparse_logistic", "sparse_linear_regressor")
    # Scalar condition: use short-circuit `&&` rather than elementwise `&`.
    if (hyperband && hyperband_not_supported) {
      stop(paste0("cross_validation_parameters = \"hyperband\" not supported for ", model_type))
    }
    if (hyperband) {
      # need to escape quotes to get the string to work
      args[["CVPARAMS"]] <- '\"hyperband\"'
    } else {
      args[["CVPARAMS"]] <- jsonlite::toJSON(cross_validation_parameters, null = "null")
    }
  }
  if (!is.null(calibration)) {
    if (!(calibration %in% c("sigmoid", "isotonic"))) {
      stop("calibration must be 'sigmoid', 'isotonic', or NULL.")
    }
    args[["CALIBRATION"]] <- calibration
  }
  if (!is.null(oos_scores_table)) {
    args[["OOSTABLE"]] <- oos_scores_table
    args[["OOSDB"]] <- list(database = oos_scores_db_id)
  }
  if (!is.null(sql_where)) {
    args[["WHERESQL"]] <- sql_where
  }
  if (!is.null(sql_limit)) {
    args[["LIMITSQL"]] <- sql_limit
  }
  if (!is.null(excluded_columns)) {
    args[["EXCLUDE_COLS"]] <- paste(excluded_columns, collapse = " ")
  }
  if (!is.null(fit_params)) {
    args[["FIT_PARAMS"]] <- jsonlite::toJSON(fit_params, auto_unbox = TRUE)
  }
  if (!is.null(database_id)) {
    # Prior to v1.0, this parameter was DB_NAME
    args[["DB"]] <- list(database = database_id)
  }
  if (!is.null(cpu_requested)) {
    args[["REQUIRED_CPU"]] <- cpu_requested
  }
  if (!is.null(memory_requested)) {
    args[["REQUIRED_MEMORY"]] <- memory_requested
  }
  if (!is.null(disk_requested)) {
    args[["REQUIRED_DISK_SPACE"]] <- disk_requested
  }
  if (!is.null(validation_data)) {
    args[["VALIDATION_DATA"]] <- validation_data
  }
  if (!is.null(n_jobs)) {
    args[["N_JOBS"]] <- n_jobs
  }
  # I() marks args "as is" so toJSON does not apply further conversions.
  args <- I(args)
  job_name <- NULL
  if (!is.null(model_name)) {
    job_name <- paste0(model_name, " Train")
  }
  tmpl_id <- getOption("civis.ml_train_template_id")
  run <- run_model(template_id = tmpl_id, name = job_name, arguments = args,
                   notifications = notifications,
                   polling_interval = polling_interval,
                   verbose = verbose)
  civis_ml_fetch_existing(run$job_id, run$run_id)
}
# Create a custom script from a CivisML template, start a run, and block
# until it completes. Returns a list with the job and run ids.
run_model <- function(template_id, name, arguments, notifications,
                      polling_interval, verbose) {
  # Platform responds with a 500 error if `name` or `notifications` is
  # included as NULL, so only add them to the request when present.
  payload <- list(
    from_template_id = template_id,
    arguments = arguments
  )
  if (!is.null(name)) {
    payload$name <- name
  }
  if (!is.null(notifications)) {
    payload$notifications <- notifications
  }
  job <- do.call(scripts_post_custom, payload)
  run <- scripts_post_custom_runs(job$id)
  # Poll until the run finishes. CivisML-specific failures are wrapped in a
  # friendlier error class; anything else is re-raised untouched.
  tryCatch(await(scripts_get_custom_runs, id = job$id, run_id = run$id,
                 .interval = polling_interval, .verbose = verbose),
           civis_error = function(e) stop(civis_ml_error(e)),
           error = function(e) stop(e))
  list(job_id = job$id, run_id = run$id)
}
#' @rdname civis_ml
#' @export
civis_ml_fetch_existing <- function(model_id, run_id = NULL) {
  job <- must_fetch_civis_ml_job(model_id)
  # When no run is requested, fall back to the job's most recent run.
  if (is.null(run_id)) {
    run_id <- job$lastRun$id
  }
  if (is.null(run_id)) {
    stop("Error: invalid model task.")
  }
  run <- must_fetch_civis_ml_run(model_id, run_id)
  outputs <- NULL
  metrics <- NULL
  model_info <- NULL
  if (run$state == "succeeded") {
    outputs <- scripts_list_custom_runs_outputs(id = model_id, run_id = run_id)
    # metrics.json can legitimately be absent (e.g. validation skipped),
    # so tolerate a failed fetch; model_info.json is mandatory.
    metrics <- tryCatch(must_fetch_output_json(outputs, "metrics.json"),
                        error = function(e) NULL)
    model_info <- must_fetch_output_json(outputs, "model_info.json")
    # re-raise any CivisML warnings
    if (length(model_info$warnings) > 0) {
      warning("CivisML issued the following warnings during training:\n",
              unlist(model_info$warnings))
    }
  }
  type <- model_type(job)
  structure(
    list(job = job,
         run = run,
         outputs = outputs,
         metrics = metrics,
         model_info = model_info),
    class = c(paste0("civis_ml_", type), "civis_ml")
  )
}
# Fetch a custom script by id, translating an HTTP 404 into a friendly
# "model not found" error.
must_fetch_civis_ml_job <- function(model_id) {
  on_missing <- function(e) {
    stop(paste0("Error: model ", model_id, " not found."), call. = FALSE)
  }
  tryCatch(scripts_get_custom(model_id), "http_404" = on_missing)
}
# Fetch a model run by id, translating an HTTP 404 into a friendly error,
# and alert the caller (via warning/message) when the run did not succeed.
must_fetch_civis_ml_run <- function(model_id, run_id) {
  run <- tryCatch(scripts_get_custom_runs(model_id, run_id),
    "http_404" = function(e) stop(paste0("Error: run ", run_id, " not found."), call. = FALSE)
  )
  # User-facing status messages; fixed typo "retreive" and grammar in the
  # queued-state message.
  switch(run$state,
         "failed" = warning("The model task failed, use fetch_logs to retrieve any error messages."),
         "cancelled" = warning("The model task was cancelled."),
         "queued" = message("The model task is queued."),
         "running" = message("The model task is still running."))
  run
}
# Download a named JSON run output and parse it into an R list.
# simplifyDataFrame = FALSE keeps nested structures as plain lists.
must_fetch_output_json <- function(outputs, file_name) {
  jsonlite::fromJSON(must_fetch_output_file(outputs, file_name),
                     simplifyDataFrame = FALSE)
}
# Locate a run output of type "File" with the given name and download it.
#
# Returns the local path of the downloaded file; errors with a clear
# message when no matching File output exists.
must_fetch_output_file <- function(outputs, file_name) {
  # Use vapply rather than sapply: with an empty `outputs` list, sapply
  # returns list() and which() then fails with "argument to 'which' is not
  # logical" instead of reaching the intended "not found" error below.
  is_match <- vapply(outputs,
                     function(x) x$name == file_name && x$objectType == "File",
                     logical(1))
  out <- outputs[which(is_match)]
  if (length(out) == 0) {
    stop(paste0(file_name, " not found in model output."), call. = FALSE)
  }
  # There is no strict requirement on the file names being unique *and* we want
  # to return a non-nested list anyway.
  file_output <- out[[1]]
  download_civis(file_output$objectId, tempfile())
}
#' @rdname civis_ml
#' @export
predict.civis_ml <- function(object,
                             newdata,
                             primary_key = NA,
                             output_table = NULL,
                             output_db = NULL,
                             if_output_exists = c('fail', 'append', 'drop', 'truncate'),
                             n_jobs = NULL,
                             cpu_requested = NULL,
                             memory_requested = NULL,
                             disk_requested = NULL,
                             polling_interval = NULL,
                             verbose = FALSE,
                             dvs_to_predict = NULL,
                             ...) {
  # Score `newdata` with a trained CivisML model. `newdata` may be a
  # data.frame, a local CSV path (character), a `civis_file`, a
  # `civis_table`, or a `civis_file_manifest`; each source is translated
  # into the matching prediction-template argument below.
  output_db_id <- NULL
  if (!is.null(output_db)) {
    output_db_id <- get_database_id(output_db)
  }
  if_output_exists <- match.arg(if_output_exists)
  # TODO: normalize this...
  model_name <- object$job$name
  # `primary_key = NA` (the default) means "inherit the primary key from
  # the training job"; an explicit NULL means the data have no primary key.
  if (!is.null(primary_key) && is.na(primary_key)) {
    primary_key <- object$job$arguments$PRIMARY_KEY
  }
  pred_args <- list(
    train_job_id = object$job$id,
    train_run_id = object$run$id,
    template_id = get_predict_template_id(object),
    primary_key = primary_key,
    output_table = output_table,
    output_db_id = output_db_id,
    if_output_exists = if_output_exists,
    model_name = model_name,
    n_jobs = n_jobs,
    cpu_requested = cpu_requested,
    memory_requested = memory_requested,
    disk_requested = disk_requested,
    polling_interval = polling_interval,
    verbose = verbose
  )
  # Data-source dispatch: the supported `newdata` classes should be
  # mutually exclusive, so at most one branch below sets a source argument.
  if (inherits(newdata, "data.frame")) {
    # Data frames are written to a temp CSV and uploaded as a platform file.
    tmp_path <- tempfile()
    utils::write.csv(newdata, file = tmp_path, row.names = FALSE)
    file_id <- write_civis_file(tmp_path, "modelpipeline_data.csv")
    pred_args[["file_id"]] <- file_id
  }
  if (is.character(newdata)) {
    # A character value is treated as a path to a local CSV file.
    file_id <- write_civis_file(newdata, "modelpipeline_data.csv")
    pred_args[["file_id"]] <- file_id
  }
  if (inherits(newdata, "civis_file")) {
    # See above, we need to strip class attribute for jsonlite::toJSON.
    pred_args[["file_id"]] <- unclass(newdata)
  }
  if (inherits(newdata, "civis_file_manifest")) {
    # See above, we need to strip class attribute for jsonlite::toJSON.
    pred_args[["manifest"]] <- unclass(newdata)
  }
  if (inherits(newdata, "civis_table")) {
    # Tables carry their own location plus optional SQL filters.
    pred_args[["table_name"]] <- newdata$table_name
    pred_args[["database_id"]] <- newdata$database_id
    pred_args[["sql_where"]] <- newdata$sql_where
    pred_args[["sql_limit"]] <- newdata$sql_limit
  }
  if (!is.null(dvs_to_predict)) {
    # The template expects a space-separated string of output column names.
    pred_args[["dvs_to_predict"]] <- paste(dvs_to_predict, collapse = " ")
  }
  do.call(create_and_run_pred, pred_args)
}
# Build the argument payload for a CivisML prediction custom script, run it,
# and return the prediction results.
#
# Arguments mirror predict.civis_ml. Optional arguments that are NULL are
# simply omitted from `args` by the conditional blocks below.
create_and_run_pred <- function(train_job_id = NULL,
                                train_run_id = NULL,
                                template_id = NULL,
                                file_id = NULL,
                                table_name = NULL,
                                database_id = NULL,
                                sql_where = NULL,
                                sql_limit = NULL,
                                manifest = NULL,
                                primary_key = NULL,
                                output_table = NULL,
                                output_db_id = NULL,
                                if_output_exists = NULL,
                                model_name = NULL,
                                n_jobs = NULL,
                                cpu_requested = NULL,
                                memory_requested = NULL,
                                disk_requested = NULL,
                                polling_interval = NULL,
                                notifications = NULL,
                                verbose = FALSE,
                                dvs_to_predict = NULL) {
  # N_JOBS is set once here; a later duplicate conditional re-assignment of
  # the same value was redundant and has been removed.
  args <- list(
    TRAIN_JOB = train_job_id,
    TRAIN_RUN = train_run_id,
    PRIMARY_KEY = primary_key,
    IF_EXISTS = if_output_exists,
    N_JOBS = n_jobs,
    DEBUG = verbose,
    CIVIS_FILE_ID = file_id
  )
  if (!is.null(database_id)) {
    args[["TABLE_NAME"]] <- table_name
    args[["DB"]] <- list(database = database_id)
  }
  if (!is.null(sql_where)) {
    args[["WHERESQL"]] <- sql_where
  }
  if (!is.null(sql_limit)) {
    args[["LIMITSQL"]] <- sql_limit
  }
  if (!is.null(output_table)) {
    args[["OUTPUT_TABLE"]] <- output_table
    args[["OUTPUT_DB"]] <- list(database = output_db_id)
  }
  if (!is.null(manifest)) {
    args[["MANIFEST"]] <- manifest
  }
  if (!is.null(cpu_requested)) {
    args[["CPU"]] <- cpu_requested
  }
  if (!is.null(memory_requested)) {
    args[["MEMORY"]] <- memory_requested
  }
  if (!is.null(disk_requested)) {
    args[["DISK_SPACE"]] <- disk_requested
  }
  if (!is.null(dvs_to_predict)) {
    args[["TARGET_COLUMN"]] <- paste(dvs_to_predict, collapse = " ")
  }
  # I() marks the list "as is" -- presumably to control JSON serialization
  # downstream; NOTE(review): confirm against run_model's toJSON handling.
  args <- I(args)
  job_name <- NULL
  if (!is.null(model_name)) {
    job_name <- paste0(model_name, " Predict")
  }
  run <- run_model(template_id = template_id, name = job_name, arguments = args,
                   notifications = notifications,
                   polling_interval = polling_interval,
                   verbose = verbose)
  fetch_predict_results(run$job_id, run$run_id)
}
# Assemble the job, run, outputs, and parsed model_info.json of a completed
# prediction run into a "civis_ml_prediction" object.
fetch_predict_results <- function(job_id, run_id) {
  job <- scripts_get_custom(job_id)
  run <- scripts_get_custom_runs(job_id, run_id)
  outputs <- scripts_list_custom_runs_outputs(job_id, run_id)
  model_info <- must_fetch_output_json(outputs, "model_info.json")
  # Re-raise any CivisML warnings, one per line; bare unlist() would
  # concatenate multiple warnings with no separator.
  if (length(model_info$warnings) > 0) {
    warning("CivisML issued the following warnings during training:\n",
            paste(unlist(model_info$warnings), collapse = "\n"))
  }
  structure(
    list(
      job = job,
      run = run,
      outputs = outputs,
      model_info = model_info
    ),
    class = c("civis_ml_prediction", "civis_ml")
  )
}
#' Retrieve predictions from a CivisML prediction job
#' @param x \code{civis_ml_prediction} object from \code{predict.civis_ml}
#' @param ... arguments passed to \code{read.csv}
#' @export
#' @details Predictions can also be downloaded as a \code{csv} directly using \code{download_civis} (see examples).
#' @return A \code{data.frame} of predictions containing an additional column with
#' a primary key. For a multiclass model, a data frame is returned with one
#' column of predictions for each class.
#'
#' @examples
#' \dontrun{
#' m <- civis_ml("path/to/file.csv", model_type = "sparse_logistic",
#'               dependent_variable = "Species")
#' pred_job <- predict(m, newdata = "path/to/newdata.csv")
#' yhat <- fetch_predictions(pred_job)
#'
#' # download instead:
#' download_civis(pred_job$model_info$output_file_ids, path = "my_predictions.csv")
#'
#' }
fetch_predictions <- function(x, ...) {
  stopifnot(is(x, "civis_ml_prediction"))
  out <- fetch_predict_results(job_id = x$job$id, run_id = x$run$id)
  # there will always be one file id - this is a pointer to the url shards on S3 (src: Jamie)
  id <- out$model_info$output_file_ids
  # Register cleanup before downloading. The previous tryCatch/finally
  # version referenced `path` even when download_civis failed before it was
  # assigned, masking the real error with "object 'path' not found".
  path <- NULL
  on.exit(if (!is.null(path)) unlink(path), add = TRUE)
  path <- download_civis(id, tempfile())
  utils::read.csv(path, ...)
}
#' A file in the Civis Platform
#'
#' Use \code{civis_file} to use a file in the Civis Platform with
#' \link{civis_ml}.
#'
#' @param file_id The id of a Civis file.
#' @return A \code{civis_file}.
#' @export
civis_file <- function(file_id) {
  # Tag the id with an S3 class so civis_ml/predict can dispatch on it.
  out <- file_id
  class(out) <- "civis_file"
  out
}
#' A table in the Civis Platform
#'
#' Use \code{civis_table} to use a table in the Civis Platform with
#' \link{civis_ml}.
#'
#' @param table_name The table name, as \code{"schema.table"}.
#' @param database_name The name of the database holding \code{table_name}.
#' @param sql_where A \code{SQL WHERE} clause used to scope the rows of the
#'   training or prediction table. Note, the \code{WHERE} keyword is added at
#'   runtime.
#' @param sql_limit A \code{SQL_LIMIT} clause for querying the training or
#'   prediction set. Note, the \code{LIMIT} keyword is added at runtime.
#' @return A \code{civis_table}.
#' @export
civis_table <- function(table_name,
                        database_name = NULL,
                        sql_where = NULL,
                        sql_limit = NULL) {
  # Fall back to the session-wide default database when none is given.
  database_name <- database_name %||% get_default_database()
  if (is.null(database_name)) {
    msg <- paste0("Please supply a database_name or set a global default: ",
                  "options(civis.default_database = 'db_name')")
    stop(msg, call. = FALSE)
  }
  out <- list(
    table_name = table_name,
    database_name = database_name,
    database_id = get_database_id(database_name),
    sql_where = sql_where,
    sql_limit = sql_limit
  )
  class(out) <- "civis_table"
  out
}
#' A manifest file in the Civis Platform
#'
#' Use \code{civis_file_manifest} to use a manifest file in the Civis Platform
#' with \link{civis_ml}.
#'
#' @param file_id The id of a Civis file.
#' @return A \code{civis_file_manifest}
#' @export
civis_file_manifest <- function(file_id) {
  # Tag the id with an S3 class so predict.civis_ml can dispatch on it.
  out <- file_id
  class(out) <- "civis_file_manifest"
  out
}
#' @export
fetch_logs.civis_ml <- function(object, limit = 100, ...) {
  # Pull up to `limit` raw run logs from the Platform, then format them.
  raw_logs <- scripts_list_custom_runs_logs(
    object$job$id,
    object$run$id,
    limit = limit
  )
  format_scripts_logs(raw_logs)
}
#' @export
fetch_logs.civis_ml_error <- function(object, limit = 100, ...) {
  # The failing job/run ids are stashed on the error object's "args" attribute.
  err_args <- attr(object, "args")
  raw_logs <- scripts_list_custom_runs_logs(err_args$id, err_args$run_id,
                                            limit = limit)
  format_scripts_logs(raw_logs)
}
#' Retrieve out of sample/fold predictions
#'
#' @details Returns the out of sample (or out of fold) predictions for each
#' observation. For a multiclass model, a data frame is returned with one
#' column of predictions for each class.
#'
#' @param model A \code{civis_ml} model.
#' @param \dots Parameters passed to \code{read.csv}.
#' @return A \code{data.frame} with out of sample/fold predictions for each
#' row of the training data, and containing an additional column with
#' a primary key.
#'
#' @seealso civis_ml
#' @importFrom utils read.csv
#' @export
fetch_oos_scores <- function(model, ...) {
  stopifnot(is_civis_ml(model))
  # read.csv decompresses .gz files transparently.
  scores_path <- must_fetch_output_file(model$outputs, "predictions.csv.gz")
  read.csv(scores_path, ...)
}
# Test whether an object is a CivisML model (or prediction) object.
# Uses base inherits() rather than methods::is() -- the idiomatic S3 class
# check, with no dependency on the methods package being attached.
is_civis_ml <- function(object) {
  inherits(object, "civis_ml")
}
You can’t perform that action at this time.