## Building the application

This step follows a similar set of commands to those in the README. It builds the Docker image from the Dockerfile in this directory and pushes it, using `build-and-push-image.sh`.

## First, log in by running `az login --use-device-code` in your terminal

In [None]:
!./build-and-push-image.sh

## Test your code and generate artifacts

You can change the parameter space of your guesses by editing the `sobol_design` lower and upper bounds in [generate_guesses_vpc.R](./app/generate_guesses_vpc.R). Next, set `numguesses` in the Python cell below.

The first command mounts the local `app` directory at `/app` in the Docker container and runs `generate_guesses_vpc.R` with `numguesses` as its input argument. This generates the guesses and the vpC pomp model object.

The second command uses the same volume mount and runs `run_mif2_guesses.R` with 1 as the input argument, which runs mif2 on the first guess. This is the same script that each Batch task will run, with each guess representing a different starting point in the input parameter space.
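
As a rough sketch of how the second command generalizes, the snippet below builds the same `docker run` invocation for an arbitrary guess index, so a few guesses can be smoke-tested locally before anything is submitted to Batch. The helper names `docker_rscript_command` and `run_guess_locally` are assumptions made for illustration and are not part of this repository; only the command layout is taken from the cell below.

```python
import subprocess
from pathlib import Path


def docker_rscript_command(image, script, *script_args, app_dir="app"):
    """Build a `docker run` argument list that mounts app_dir at /app and runs an R script."""
    host_dir = Path(app_dir).resolve()  # absolute path, equivalent to $(pwd)/app
    return [
        "docker", "run", "--rm",
        "-v", f"{host_dir}:/app",
        image,
        "Rscript", f"/app/{script}",
        *[str(arg) for arg in script_args],
    ]


def run_guess_locally(image, guess_index):
    """Run mif2 for one guess in the container; raises CalledProcessError on failure."""
    cmd = docker_rscript_command(image, "run_mif2_guesses.R", guess_index)
    return subprocess.run(cmd, check=True)
```

Calling `run_guess_locally(image, 1)`, with `image` set to your registry's `r-pomp:4.4.1` tag, should be equivalent to the second command in the cell below.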

In [None]:
numguesses = 1000 # <--- SET YOUR numguesses

!docker run --rm \
    -v "$(pwd)/app:/app" \
    <azureContainerRegistryName>.azurecr.io/r-pomp:4.4.1 Rscript /app/generate_guesses_vpc.R {numguesses}
    
!docker run --rm \
    -v "$(pwd)/app:/app" \
    <azureContainerRegistryName>.azurecr.io/r-pomp:4.4.1 Rscript /app/run_mif2_guesses.R 1

## Azure Batch Setup

The following three cells do the required work before we actually start interacting with Batch.
- Import all the required libraries. You can view those in the `requirements.txt` in this directory.
- Put the required configuration into memory, pulling any sensitive information out of environment variables.
- Create the requisite client and configuration objects.
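
The second bullet mentions environment variables, while the configuration cell below uses placeholders that you fill in by hand. A minimal sketch of reading required settings from the environment might look like the following. The variable names `BATCH_ACCOUNT_URL` and `STORAGE_ACCOUNT_NAME` and both helper functions are hypothetical; nothing in this repository defines them.

```python
import os


def require_env(name, environ=None):
    """Return the value of an environment variable, or fail with a clear message."""
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


def load_settings(environ=None):
    """Collect the settings the notebook needs (variable names are hypothetical)."""
    storage_account = require_env("STORAGE_ACCOUNT_NAME", environ)
    return {
        "batch_account_url": require_env("BATCH_ACCOUNT_URL", environ),
        "storage_account_name": storage_account,
        "account_url": f"https://{storage_account}.blob.core.windows.net",
    }
```

Failing early on a missing variable tends to give a clearer error than a malformed URL surfacing later inside a client call.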

In [None]:
%pip install -r requirements.txt

In [3]:
import os
from datetime import datetime

import azure.batch as batch
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, ContainerClient
from msrest.authentication import BasicTokenAuthentication

In [7]:
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')

# Batch configuration
_BATCH_ACCOUNT_URL = 'https://<batchAccountName>.<region>.batch.azure.com'
_BATCH_ACCOUNT_MANAGED_ID_RESOURCE_ID="/subscriptions/<subscriptionId>/resourcegroups/<resourceGroupName>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/<batchAccountManagedIdentity>"

_JOB_ID = f'r-pomp-iterative-filtering-{timestamp}'
_POOL_ID = 'default'

# Storage configuration
_STORAGE_ACCOUNT_NAME = '<storageAccountName>'
_CONTAINER_NAME = 'output'
_ACCOUNT_URL = f'https://{_STORAGE_ACCOUNT_NAME}.blob.core.windows.net'

# Alternative when SAS keys are enabled and no user-assigned managed identity is used:
# _OUTPUT_CONTAINER_URL = f'{_ACCOUNT_URL}/{_CONTAINER_NAME}{_OUTPUT_CONTAINER_SAS}'
_OUTPUT_CONTAINER_URL = f'{_ACCOUNT_URL}/{_CONTAINER_NAME}'

# ACR configuration
_ACR_SERVER='<azureContainerRegistryName>.azurecr.io'
_IMAGE = f'{_ACR_SERVER}/r-pomp:4.4.1'

In [8]:
# credentials

default_credential = DefaultAzureCredential()
identity_reference = batch.models.ComputeNodeIdentityReference(resource_id=_BATCH_ACCOUNT_MANAGED_ID_RESOURCE_ID)

token = {'access_token': default_credential.get_token('https://batch.core.windows.net/.default').token}
batch_credentials = BasicTokenAuthentication(token)

# service clients

batch_client = batch.BatchServiceClient(batch_credentials, batch_url=_BATCH_ACCOUNT_URL)
blob_service_client = BlobServiceClient(account_url=_ACCOUNT_URL, credential=default_credential)

input_container_client = blob_service_client.get_container_client('input')
output_container_client = ContainerClient(account_url=_ACCOUNT_URL, container_name=_CONTAINER_NAME, credential=default_credential)

# configuration objects
task_registry = batch.models.ContainerRegistry(registry_server=_ACR_SERVER, identity_reference=identity_reference)
task_container_settings = batch.models.TaskContainerSettings(image_name=_IMAGE, registry=task_registry)
environment_settings = [batch.models.EnvironmentSetting(name=k, value=v) for k, v in {}.items()]
user = batch.models.UserIdentity(auto_user=batch.models.AutoUserSpecification(elevation_level=batch.models.ElevationLevel.admin, scope=batch.models.AutoUserScope.task))

## Upload the files

Upload the required files to the `input` container in Azure Storage so that they are available to your Batch tasks when they run.
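
Before uploading, it can help to check that every input file exists and to see which blob name each one will receive. The sketch below mirrors the `<job id>/<file name>` naming used by the upload cell; the helper name `plan_uploads` is an assumption for illustration only.

```python
import os


def plan_uploads(file_paths, job_id):
    """Map each local file to its blob name, failing early if a file is missing."""
    missing = [p for p in file_paths if not os.path.isfile(p)]
    if missing:
        raise FileNotFoundError(f"Missing input files: {missing}")
    # Blob names are '<job_id>/<file name>'; directories in the local path are dropped.
    return {p: f"{job_id}/{os.path.basename(p)}" for p in file_paths}
```

Because only the base name is kept, two inputs with the same file name in different directories would map to the same blob, so the input file names need to be unique.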

In [None]:
import os

def upload_input_files(file_paths):

    print(f'Found {len(file_paths)} files to upload.')

    print(f'Uploading files to container input/{_JOB_ID}...')
    for file_path in file_paths:
        print(f'Uploading {file_path}...')
        with open(file_path, 'rb') as f:
            input_container_client.upload_blob(name=f'{_JOB_ID}/{os.path.basename(file_path)}', data=f, overwrite=True)

    print('Upload complete.')

args = ['./app/guesses.rds', './app/vpC.rds', './app/run_mif2_guesses.R']

upload_input_files(args)

## Creating the job

The Batch job itself is relatively simple at its core. All it needs is a pool and an id. There are more things that can be configured, such as preparation and completion tasks or behavior when a task fails.
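
Since the job ID is built from a prefix and a timestamp, it may be worth validating it before submitting. The sketch below assumes the commonly documented Batch rule that an ID may contain letters, digits, hyphens, and underscores and be at most 64 characters long; treat that rule as an assumption to verify against the current Azure documentation. The helper `make_job_id` is hypothetical.

```python
import re
from datetime import datetime

# Assumed rule: letters, digits, hyphens, underscores; at most 64 characters.
_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,64}")


def make_job_id(prefix, now=None):
    """Build '<prefix>-YYYYmmdd-HHMMSS' and validate it against the assumed ID rule."""
    now = datetime.now() if now is None else now
    job_id = f"{prefix}-{now.strftime('%Y%m%d-%H%M%S')}"
    if not _ID_PATTERN.fullmatch(job_id):
        raise ValueError(f"Invalid Batch job ID: {job_id!r}")
    return job_id
```

With the prefix used in this notebook, `make_job_id('r-pomp-iterative-filtering')` yields the same form of ID as `_JOB_ID`.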

In [10]:
job = batch.models.JobAddParameter(id=_JOB_ID, pool_info=batch.models.PoolInformation(pool_id=_POOL_ID,))
batch_client.job.add(job)

## Creating the tasks

Create the tasks and add them to the job. Each task runs mif2 for one guess and saves the results to `.RData` files.

In [None]:
import azure.batch as batch

tasks = []

for i in range(1, numguesses + 1):
    task_name = f"task_{i}"
    output_name = f'{_JOB_ID}/{task_name}'
    command = f'/bin/bash -c "cd {_JOB_ID} && chmod +x run_mif2_guesses.R && Rscript ./run_mif2_guesses.R {i}"'

    task = batch.models.TaskAddParameter(
        id=task_name,
        command_line=command,
        container_settings=task_container_settings,
        environment_settings=environment_settings,
        user_identity=user,
        resource_files=[
            batch.models.ResourceFile(
                auto_storage_container_name='input',
                blob_prefix=f'{_JOB_ID}/'
            )
        ],
        output_files=[
            batch.models.OutputFile(
                file_pattern='../std*.txt',
                destination=batch.models.OutputFileDestination(
                    container=batch.models.OutputFileBlobContainerDestination(
                        path=f'{output_name}/logs',
                        container_url=_OUTPUT_CONTAINER_URL,
                        identity_reference=identity_reference,
                        upload_headers=[batch.models.HttpHeader(name="Metadata", value="true")]
                    )
                ),
                upload_options=batch.models.OutputFileUploadOptions(
                    upload_condition=batch.models.OutputFileUploadCondition.task_completion)
            ),
            batch.models.OutputFile(
                file_pattern=f'./{_JOB_ID}/**/*',
                destination=batch.models.OutputFileDestination(
                    container=batch.models.OutputFileBlobContainerDestination(
                        path=f'{output_name}/data',
                        container_url=_OUTPUT_CONTAINER_URL,
                        identity_reference=identity_reference,
                        upload_headers=[batch.models.HttpHeader(name="Metadata", value="true")]
                    )
                ),
                upload_options=batch.models.OutputFileUploadOptions(
                    upload_condition=batch.models.OutputFileUploadCondition.task_success)
            )
        ]
    )
    
    tasks.append(task)

print(f'Firing off {len(tasks)} tasks!')

result = batch_client.task.add_collection(_JOB_ID, tasks)

batch_client.job.patch(_JOB_ID, batch.models.JobPatchParameter(on_all_tasks_complete=batch.models.OnAllTasksComplete.terminate_job))
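
The cell above stores the return value of `add_collection` in `result` but never inspects it. A small sketch for summarizing it follows. It assumes that `result.value` is a list of objects exposing `status` and `task_id`, as the SDK's task-add results are understood to do, and the helper name `summarize_add_results` is hypothetical.

```python
from collections import Counter


def summarize_add_results(results):
    """Count task-add outcomes and collect the IDs of tasks that were not accepted."""
    counts = Counter()
    failed = []
    for r in results:
        # status may be an enum or a plain string depending on how it was deserialized
        status = str(getattr(r.status, "value", r.status))
        counts[status] += 1
        if status != "success":
            failed.append(r.task_id)
    return counts, failed
```

Under that assumption, `counts, failed = summarize_add_results(result.value)` shows at a glance whether any of the submitted tasks were rejected.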


## Creating an artifact

Once all tasks have finished, their outputs are spread across many blobs in Azure Storage. The following cells list the blobs under this job's prefix, download the `.RData` files to the local environment, and analyze them in R.
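
The notebook itself does not wait for the job to finish, so the cells below should only be run once the tasks are done. One way to wait is a simple polling loop, sketched here. The function `wait_for_tasks` and its `get_states` callable are hypothetical; with the clients created earlier, the callable might be `lambda: [t.state for t in batch_client.task.list(_JOB_ID)]`.

```python
import time


def wait_for_tasks(get_states, timeout_s=3600, poll_s=30,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll until every task state is 'completed'; return True, or False on timeout."""
    deadline = clock() + timeout_s
    while True:
        # States may be enums or plain strings; normalize to strings.
        states = [str(getattr(s, "value", s)) for s in get_states()]
        pending = sum(1 for s in states if s != "completed")
        if states and pending == 0:
            return True
        if clock() >= deadline:
            return False
        print(f"{pending} of {len(states)} tasks still pending...")
        sleep(poll_s)
```

A task in the `completed` state may still have failed, so a `True` return only means that nothing is still queued or running.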

In [None]:
# List the blobs that this job wrote to the output container
blobs = [b.name for b in output_container_client.list_blobs(name_starts_with=_JOB_ID)]

print(f'Found {len(blobs)} blobs starting with {_JOB_ID}')
for s in blobs[:100]:  # print only the first 100 names to keep the output short
    print(s)

In [None]:

# List the .RData files written under this job's prefix
rdata_files = [
    b.name
    for b in output_container_client.list_blobs(name_starts_with=_JOB_ID)
    if b.name.endswith('.RData')
]

# Download the .RData files into a local directory
os.makedirs('downloaded_data', exist_ok=True)

for rdata_file in rdata_files:
    blob_client = blob_service_client.get_blob_client(container=_CONTAINER_NAME, blob=rdata_file)
    file_name = os.path.join('downloaded_data', os.path.basename(rdata_file))
    with open(file_name, "wb") as download_file:
        download_file.write(blob_client.download_blob().readall())

print(f"Downloaded {len(rdata_files)} .RData files to 'downloaded_data' directory.")
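
The R analysis further down works best when every guess has produced a result, so it can be useful to check which guesses are missing after the download. The sketch below assumes result files are named `mif_result_<index>.RData`. The `mif_result_` prefix matches the pattern used in the R cell, but the numeric suffix is an assumption, as is the helper name `missing_guess_indices`.

```python
import re


def missing_guess_indices(file_names, numguesses, pattern=r"mif_result_(\d+)\.RData$"):
    """Return the guess indices in 1..numguesses that have no matching result file."""
    found = set()
    for name in file_names:
        match = re.search(pattern, name)
        if match:
            found.add(int(match.group(1)))
    return [i for i in range(1, numguesses + 1) if i not in found]
```

If the naming assumption holds, `missing_guess_indices(rdata_files, numguesses)` returns an empty list when every task has uploaded its result.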

## Switch to R kernel

In [None]:
install.packages('pomp')
install.packages('ggplot2')
install.packages('tidyverse')
install.packages('reshape2')

In [None]:
library(pomp)
library(ggplot2)
library(tidyverse)
library(reshape2)

# Directory where the .RData files are stored
data_dir <- "downloaded_data"
guesses <- readRDS("./app/guesses.rds")


# Traces plot
# List of RData files with full paths
mif_rdata_files <- list.files(data_dir, pattern = "mif_result_.*\\.RData", full.names = TRUE)

# Initialize an empty list to store the mif2d_pomp objects
mifs_list <- list()

# Load each RData file and extract the mif2d_pomp object
for (file in mif_rdata_files) {
  load(file)
  mifs_list <- c(mifs_list, list(mif_result))  # Assuming each RData file contains an object named 'mif_result'
}

# Generate the ggplot
trace_plot <- mifs_list |>
  lapply(traces) |>
  lapply(as.data.frame) |>
  lapply(function(df) {
    df <- df %>% mutate(iteration = row_number())
    return(df)
  }) |>
  bind_rows(.id = "guess") |>
  reshape2::melt(id.vars = c("iteration", "guess")) |>
  filter(variable != "b") |>
  ggplot(aes(x = iteration, y = value, group = guess, color = factor(guess))) +
  geom_line(linewidth = 0.7, alpha = 0.7) +  # Thin, semi-transparent lines (linewidth needs ggplot2 >= 3.4.0)
  facet_wrap(~ variable, scales = "free_y") +
  guides(color = "none") +
  labs(x = "Iteration", y = "Value", title = "Traces from mif2 runs")

# Save the ggplot to a PNG file, creating the results directory if needed
dir.create("./results", showWarnings = FALSE)
ggsave("./results/trace_plot.png", plot = trace_plot, width = 15, height = 12, dpi = 300)


# Generate pair plot
pfilter_rdata_files <- list.files(data_dir, pattern = "mif_pfilter_result_.*\\.RData", full.names = TRUE)

# Initialize an empty list to store the pfilter results
mifs_pfilter_list <- list()

# Load each RData file and extract the pfilter result (assumes an object named 'pfilter_result')
for (file in pfilter_rdata_files) {
  load(file)
  mifs_pfilter_list <- c(mifs_pfilter_list, list(pfilter_result))
}

# Extract log likelihoods from each pfilter result
log_lik_values <- sapply(mifs_pfilter_list, logLik)

log_lik_df <- data.frame(
  .id = seq_along(log_lik_values),  # one row per downloaded result, even if some tasks failed
  loglik = log_lik_values
)

# Extract coefficients from each mif2d_pomp object
coef_list <- lapply(mifs_list, coef)
coef_df <- bind_rows(coef_list)
coef_df$.id <- seq_along(coef_list)

# Reshape coefficients data frame
coef_df_long <- pivot_longer(coef_df, cols = -".id")

# Combine log likelihoods and coefficients
estimates <- left_join(log_lik_df, coef_df_long, by = ".id")

# Reshape the data to wide format
estimates_wide <- pivot_wider(estimates, names_from = name, values_from = value)

# Generate the pairs plot and save it to a PNG file
png("./results/pairs_plot.png", width = 3000, height = 2400, res = 450)

estimates_wide |>
  bind_rows(guesses) |>
  filter(is.na(loglik) | loglik > max(loglik, na.rm = TRUE) - 30) |>
  mutate(col = if_else(is.na(loglik), "#99999955", "#ff0000ff")) |>
  {
    \(dat) pairs(
      ~loglik + r + sigma + K + N_0,
      data = dat,
      col = dat$col,
      pch = 16,
      cex = 0.6,
      cex.labels = 0.8
    )
  }()

dev.off()  # Ensure the device is properly closed