This is an auto-generated notebook to perform batch inference on a Spark DataFrame using a selected model from the model registry. This feature is in preview, and we would greatly appreciate any feedback through this form: https://databricks.sjc1.qualtrics.com/jfe/form/SV_1H6Ovx38zgCKAR0.

## Instructions:
1. Run the notebook against a cluster with Databricks ML Runtime version 12.2.x-cpu, to best re-create the training environment.
2. If necessary, add data processing steps on your loaded table so that it matches the model schema (see the "Define input and output" section below).
3. "Run All" the notebook.
4. Note: If `%pip` does not work for your model (i.e. no `requirements.txt` file was logged with it), switch to `%conda` if possible.

In [0]:
model_name = "Global SST Forecasting"

## Environment Recreation
Run the notebook against a cluster with Databricks ML Runtime version 12.2.x-cpu to best re-create the training environment. The cell below downloads the model artifacts associated with your model from the remote registry, including the `conda.yaml` and `requirements.txt` files. By default, this notebook uses `pip` to reinstall dependencies.

### (Optional) Conda Instructions
Models logged with an MLflow client version earlier than 1.18.0 do not have a `requirements.txt` file. If you are using a Databricks ML runtime (versions 7.4-8.x), you can replace the `pip install` command below with the following lines to recreate your environment using `%conda` instead of `%pip`.
```
conda_yml = os.path.join(local_path, "conda.yaml")
%conda env update -f $conda_yml
```

In [0]:
from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository
import os

model_uri = f"models:/{model_name}/Production"
local_path = ModelsArtifactRepository(model_uri).download_artifacts("") # download model from remote registry

requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
  dbutils.fs.put("file:" + requirements_path, "", True)

In [0]:
%pip install -r $requirements_path

Python interpreter will be restarted.


## Define input and output
The table assigned to `input_table_name` will be used for batch inference, and the predictions will be saved to `output_table_path`. After the table has been loaded, you can perform additional data processing, such as renaming or removing columns, to ensure that the model and table schemas match.
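To decide which columns to rename or remove, it can help to compare the table's column names with the names the model expects. The following is a minimal sketch; `schema_diff` is a hypothetical helper and the column names are made up for illustration, not taken from the actual table or model.

```python
def schema_diff(table_columns, model_columns):
    """Compare table column names with the names the model expects.

    Returns (missing, extra):
      missing - expected by the model but absent from the table
      extra   - present in the table but not expected by the model
    """
    table_set, model_set = set(table_columns), set(model_columns)
    missing = [c for c in model_columns if c not in table_set]
    extra = [c for c in table_columns if c not in model_set]
    return missing, extra


# Hypothetical column names, for illustration only.
missing, extra = schema_diff(["ds", "region", "notes"], ["ds", "sst"])
print(missing)  # ['sst']
print(extra)    # ['region', 'notes']
```

In the notebook, `table.columns` would supply the first argument. Columns reported as extra could then be removed with `table.drop(*extra)`, and mismatched names fixed with `table.withColumnRenamed(old_name, new_name)`.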

In [0]:
# redefining key variables here because %pip and %conda restart the Python interpreter
model_name = "Global SST Forecasting"
input_table_name = "default.forecasting_dates"
output_table_path = "/FileStore/batch-inference/Global SST Forecasting2"

In [0]:
# load table as a Spark DataFrame
table = spark.table(input_table_name)

# optionally, perform additional data processing (may be necessary to conform the schema)


## Load model and run inference
**Note**: If the model does not return double values, override `result_type` to the desired type.
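One way to pick a `result_type` is to look at the Python type of a single sample prediction. The sketch below is an assumption-laden illustration: `RESULT_TYPES` and `result_type_for` are hypothetical names, and the mapping covers only a few scalar types (array outputs are not handled).

```python
# Assumed mapping from the Python type of one prediction to a result_type string.
RESULT_TYPES = {
    float: "double",
    int: "long",
    str: "string",
    bool: "boolean",
}


def result_type_for(sample_prediction):
    """Return a result_type string for a scalar sample prediction."""
    # Exact type lookup, so that bool is not treated as int.
    try:
        return RESULT_TYPES[type(sample_prediction)]
    except KeyError:
        raise ValueError(
            f"No result_type mapping for {type(sample_prediction).__name__}"
        )


print(result_type_for("warm"))  # string
```

The returned string would then be passed as the `result_type` argument in place of `"double"` when creating the UDF.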

In [0]:
import mlflow
from pyspark.sql.functions import struct

model_uri = f"models:/{model_name}/Production"

# create spark user-defined function for model prediction
predict = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double", env_manager="conda")

2023/04/06 01:48:28 INFO mlflow.pyfunc: This UDF will use conda to recreate the model's software environment for inference. This may take extra time during execution.
2023/04/06 01:48:28 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
2023/04/06 01:48:29 INFO mlflow.utils.conda: Conda environment /local_disk0/.ephemeral_nfs/repl_tmp_data/ReplId-22b20-a42ff-bc69e-e/mlflow/envs/conda_envs/mlflow-1b098421568e9125cc2f8cd1845d5a310d657280-96931d73dff136435a7f4a8c768701951631d919 already exists.
2023/04/06 01:48:29 INFO mlflow.utils.environment: === Running command '['bash', '-c', 'source /databricks/conda/bin/../etc/profile.d/conda.sh && conda activate mlflow-1b098421568e9125cc2f8cd1845d5a310d657280-96931d73dff136435a7f4a8c768701951631d919 1>&2 && python -c ""']'


In [0]:
output_df = table.withColumn("prediction", predict(struct(*table.columns)))

## Save predictions
**The default output path on DBFS is accessible to everyone in this Workspace. If you want to limit access to the output, you must change the path to a protected location.**

The cell below will save the output table to the specified FileStore path. `datetime.now()` is appended to the path to prevent overwriting the table in the event that this notebook is run in a batch inference job. To overwrite existing tables at the path, replace the cell below with:
```python
output_df.write.mode("overwrite").save(output_table_path)
```
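For reference, the timestamp suffix described above could be built as follows. This is a minimal sketch: `timestamped_path` is a hypothetical helper, not part of the notebook or of MLflow, and the exact suffix format is an assumption.

```python
from datetime import datetime


def timestamped_path(base_path, now=None):
    """Return base_path with an ISO-8601 timestamp suffix.

    Colons in the timestamp are replaced with dots so the suffix is
    safe to use in a file path.
    """
    if now is None:
        now = datetime.now()
    suffix = now.isoformat().replace(":", ".")
    return f"{base_path}_{suffix}"


# Hypothetical base path, for illustration only.
print(timestamped_path("/FileStore/batch-inference/example"))
```

With this helper, each run could write to its own location via `output_df.write.save(timestamped_path(output_table_path))`, so earlier results are not overwritten.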

### (Optional) Write predictions to Unity Catalog
If you have access to any UC catalogs, you can also save predictions to UC by specifying a table in the format `<catalog>.<database>.<table>`.
```python
output_table = "" # Example: "ml.batch-inference.Global SST Forecasting"
output_df.write.saveAsTable(output_table)
```

In [0]:
output_df.display()

In [0]:
output_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("long_prediction")