# Run national version of the NHP model

In [0]:
import gzip
import multiprocessing as mp
import os

from datetime import datetime

from azure.identity import ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient

from nhp.databricks.national import DatabricksNational
from nhp.model.helpers import load_params
from nhp.model.run import run_all

In [None]:
# Force the "spawn" start method for any worker processes created later in this
# notebook. force=True overrides a start method that was already set.
mp.set_start_method(method="spawn", force=True)

In [None]:
# Workspace client; its dbutils handle is used below to read the notebook widgets.
w = WorkspaceClient()
dbutils = w.dbutils
# Spark session via Databricks Connect (reuses an existing session if present).
spark = DatabricksSession.builder.getOrCreate()

## Get the notebook parameters

In [0]:
# Notebook parameters, read from the Databricks widgets (values are strings).
params_path = dbutils.widgets.get("params_path")
data_path = dbutils.widgets.get("data_path")
# Parse the flag case-insensitively and ignore surrounding whitespace, so a
# job parameter passed as "true"/"TRUE" is not silently treated as False.
# "True" still parses as True, exactly as before.
save_full_model_results = (
    dbutils.widgets.get("save_full_model_results").strip().lower() == "true"
)
# TODO: this will need to be updated with each new release manually
outputs_version = "v4.1"

## Load the model run parameters

In [0]:
# Load the run parameters and stamp them with a creation timestamp
# (local time, YYYYMMDD_HHMMSS). The same stamp is used later to locate the
# full model results directory on disk.
params = load_params(params_path)
params["create_datetime"] = datetime.now().strftime("%Y%m%d_%H%M%S")

# Blob metadata: every non-nested parameter converted to a string. dict and
# list values are skipped.
# NOTE(review): Azure blob metadata keys/values have naming/ASCII restrictions;
# confirm all param keys are valid metadata names.
metadata = {
    key: str(value)
    for key, value in params.items()
    if not isinstance(value, (dict, list))
}
metadata  # displayed as the cell output

## Set up the data

In [0]:
# National data source read from data_path, seeded with the run's seed.
# sample_rate=0.01 is presumably a 1% sample; confirm in DatabricksNational.create.
nhp_data = DatabricksNational.create(
    spark,
    data_path,
    sample_rate=0.01,
    seed=params["seed"],
)

## Run the model

In [0]:
# Run the model. run_all returns the local paths of the saved result files
# (uploaded below as aggregated results) and the base name of the JSON
# results file written under results/.
saved_files, json_filename = run_all(
    params,
    nhp_data,
    save_full_model_results=save_full_model_results,
)
print("saved files:", saved_files)
print("json path:", json_filename)

## Upload results

In [None]:
# Client for the "results" container of the nhpsa storage account,
# authenticated with a managed identity credential.
blob_client = BlobServiceClient(
    account_url="https://nhpsa.blob.core.windows.net",
    credential=ManagedIdentityCredential(),
)
cont = blob_client.get_container_client(container="results")

### Upload compressed JSON results

This step should be removed in a future version of the model.

In [None]:
# Gzip the JSON results written by run_all and upload them to
# prod/<outputs_version>/ along with the run metadata, then delete the
# local copy.
json_local_path = f"results/{json_filename}.json"
with open(json_local_path, "rb") as f:
    zipped_results = gzip.compress(f.read())

cont.upload_blob(
    f"prod/{outputs_version}/{json_filename}.json.gz",
    zipped_results,
    metadata=metadata,
    overwrite=True,
)

# TODO: make the model return the objects or allow a results path to be passed in
# make sure to remove the file from databricks storage [in the asset bundle deployment]
os.unlink(json_local_path)

### Upload the parquet files

In [None]:
# Upload each saved result file to aggregated-model-results/<outputs_version>/,
# then delete the local copy.
results_prefix = "results/"
for file in saved_files:
    # Entries are assumed to start with "results/" (TODO confirm in run_all);
    # stripping that prefix gives the blob path below the version folder.
    filename = file[len(results_prefix):]
    with open(file, "rb") as f:
        cont.upload_blob(
            f"aggregated-model-results/{outputs_version}/{filename}",
            f.read(),
            overwrite=True,
        )
    # Delete only after the handle has been closed: unlinking a file that is
    # still open fails on some platforms.
    # make sure to remove the file from databricks storage [in the asset bundle deployment]
    os.unlink(file)

### Upload the full model results (only when `save_full_model_results` is set)

In [None]:
if save_full_model_results:
    from pathlib import Path

    # Save the IP full model results to storage
    # From docker_run._upload_full_model_results
    dataset = params["dataset"]
    scenario = params["scenario"]
    create_datetime = params["create_datetime"]

    path = Path(f"results/{dataset}/{scenario}/{create_datetime}")
    # Materialise the listing before deleting anything, so files are not
    # removed from the tree while the lazy glob is still walking it.
    for file in sorted(path.glob("**/*.parquet")):
        # Blob path is the file's location relative to "results/"; this is
        # equivalent to the previous as_posix()[8:] slice.
        filename = file.relative_to("results").as_posix()
        with open(file, "rb") as f:
            cont.upload_blob(
                f"full-model-results/{outputs_version}/{filename}",
                f.read(),
                overwrite=True,
            )
        # make sure to remove the file from databricks storage [in the asset bundle deployment]
        os.unlink(file)

## Remove the params file from the queue

In [None]:
# The run has completed: delete the params file so it is removed from the queue.
os.remove(params_path)