<img src="https://github.com/OasisLMF/Workshop2019/raw/master/images/oasis-lmf-colour.png" alt="Oasis LMF logo" width="250" align="left"/>
<br><br><br>

Exercise 3: Autoscaling  
========================



## 3.1 Autoscaling overview
Each model in the OasisAPI has two new endpoints, one for controlling `job chunking` and one for `worker autoscaling`. These correspond to the metadata files `chunking_configuration.json` and `scaling_configuration.json`, which were uploaded in exercise 2. As with `model_settings`, these endpoints are populated from the JSON files stored in the Azure file share when the model is registered, so the values fetched for the CHAZ model should match the uploaded files.


<img src="https://github.com/OasisLMF/Workshop2022/blob/main/images/m_chunking_config.png?raw=true" alt="chunking" width="600" align="center" style="float"/>
<img src="https://github.com/OasisLMF/Workshop2022/blob/main/images/m_scaling_config.png?raw=true" alt="scaling" width="600" align="center" style="float"/><br>


These two sets of options are read by separate [kubernetes pods](https://kubernetes.io/docs/concepts/workloads/pods/):

<img src="https://github.com/OasisLMF/Workshop2022/blob/main/images/controller_pods.png?raw=true" alt="controller pods" width="600" align="center" style="float"/><br>


* **oasis-task-controller** - Reads a model's `chunking_configuration` and splits `input_generation` or `loss_analyses` requests into a number of celery **sub-tasks** (chunks), which can then be distributed across multiple nodes in the cluster.   

* **oasis-worker-controller** - Manages the number of running worker pods by reading `scaling_configuration` and adjusting a model deployment's [ReplicaSet](https://kubernetes.io/docs/concepts/workloads/controllers/replicaset/) based on the current job queue status (pushed as async updates from the Oasis WebSocket).



### Sub-task concurrency
The number of concurrently running 'slots' for sub-tasks depends on both the number of running worker pods AND the number of cores available per worker. The worker node type used in this workshop is [Standard_E2_v3](https://azureprice.net/vm/Standard_E2_v3), which has **2 vCPUs** per worker node. So if 4 workers are running (the maximum allocation per workshop account), then 4 × 2 = **8 chunks can run in parallel**, for either input or loss generation.
**This limit applies across all models.**
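As a quick check of the arithmetic above, here is a minimal Python sketch. `concurrent_slots` is a hypothetical helper (not part of `oasis_workshop`), and it assumes each worker pod has a whole node to itself and each sub-task occupies one vCPU.

```python
# Hypothetical sketch: sub-task concurrency = running worker pods x vCPUs per node.
# Assumes one worker pod per node and one sub-task per vCPU.
VCPUS_PER_NODE = 2           # Standard_E2_v3, the node type used in this workshop
MAX_WORKERS_PER_ACCOUNT = 4  # workshop limit, shared by all models


def concurrent_slots(running_workers: int, vcpus_per_node: int = VCPUS_PER_NODE) -> int:
    """Return how many chunks (sub-tasks) can execute in parallel."""
    if running_workers < 0 or vcpus_per_node < 1:
        raise ValueError("running_workers must be >= 0 and vcpus_per_node >= 1")
    return running_workers * vcpus_per_node


# The limit applies across all models, so count the workers of every model together.
workers_per_model = {"CHAZ": 3, "PiWind": 1}
total_workers = sum(workers_per_model.values())  # 4, the account maximum

print(concurrent_slots(MAX_WORKERS_PER_ACCOUNT))  # 8
print(concurrent_slots(total_workers))            # 8 slots shared by CHAZ and PiWind
```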


## 3.2 Understanding the Configuration options 

In [None]:
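# Edit this value to match your workshop ID: if your username is `workshop6@oasislmfenterprise.onmicrosoft.com`, set WORKSHOP_ID = 6
WORKSHOP_ID = None
assert WORKSHOP_ID is not None, 'Set WORKSHOP_ID to your workshop number before running this notebook'

# Install the workshop package 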
!pip install oasis-workshop==1.0.0

# Clear cell output
from IPython.display import clear_output
clear_output(wait=False) 

In [None]:
# Python Imports 
from requests import get

from oasis_workshop.client import APIClient
from oasis_workshop.funcs import (
    tabulate_endpoint, 
    tabulate_json,
    tabulate_analysis,
    tabulate_portfolio,
    plot_subtasks
)
    
from IPython.display import (
    display, 
    Code,
    clear_output, 
    HTML, 
    JSON,
    Markdown 
)

In [None]:
# Start API connection 
oasis_server = APIClient(
    api_url=f'https://oasis-workshop-{WORKSHOP_ID}.northcentralus.cloudapp.azure.com/api/',
    username='admin', 
    password='password'
)

# Get current configs for the Chaz & PiWind models 
CHAZ_MODEL_ID = oasis_server.models.search({"supplier_id__contains":'columbia'}).json()[0]['id']
PIWIND_MODEL_ID = oasis_server.models.search({"supplier_id__contains":'OasisLMF'}).json()[0]['id']

display(JSON(oasis_server.models.chunking_configuration.get(CHAZ_MODEL_ID).json(), root='CHAZ - chunking_configuration'))
display(Markdown('___'))
display(JSON(oasis_server.models.scaling_configuration.get(CHAZ_MODEL_ID).json(), root='CHAZ - scaling_configuration'))
#display(Markdown('___'))
#display(JSON(oasis_server.models.chunking_configuration.get(PIWIND_MODEL_ID).json(), root='PiWind - chunking_configuration'))
#display(Markdown('___'))
#display(JSON(oasis_server.models.scaling_configuration.get(PIWIND_MODEL_ID).json(), root='PiWind - scaling_configuration'))

## 3.2.2 Scaling options
There are three modes of scaling operation, selected with the `scaling_strategy` key. 
The idle state for every strategy is 0 running worker pods; this applies whenever a model's main celery queue is empty. 

#### **Strategy 1** - FIXED_WORKERS
When one or more tasks are placed on a model queue, a fixed number of workers is started. Once all of the pending sub-tasks (or chunks) have completed, the model deployment returns to the idle state, where all workers are shut down.
The number of workers started is set by the integer value stored in `worker_count_fixed`. 

> **Note:** Control values not connected with the selected strategy have no effect when set.
So when running with `FIXED_WORKERS`, only the value in `worker_count_fixed` is used; both `worker_count_max` and `chunks_per_worker` are ignored and have no effect on the number of workers started. The same applies to the other strategies: only the values relevant to that strategy are applied. 
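The rule in the note can be pictured as a small lookup table. This is a hypothetical sketch built only from the key descriptions in this section, not the worker controller's actual code: it lists which keys each strategy reads and drops everything else from a config.

```python
# Keys read by each scaling strategy, as described in this section (hypothetical table).
STRATEGY_KEYS = {
    "FIXED_WORKERS": {"worker_count_fixed"},
    "QUEUE_LOAD": {"worker_count_max"},
    "DYNAMIC_TASKS": {"chunks_per_worker", "worker_count_max"},
}


def effective_scaling_settings(config: dict) -> dict:
    """Return only the keys the selected strategy actually uses."""
    strategy = config["scaling_strategy"]
    if strategy not in STRATEGY_KEYS:
        raise ValueError(f"Unknown scaling_strategy: {strategy!r}")
    used = STRATEGY_KEYS[strategy]
    return {key: value for key, value in config.items() if key in used}


config = {
    "scaling_strategy": "FIXED_WORKERS",
    "worker_count_fixed": 4,
    "worker_count_max": 10,  # ignored by FIXED_WORKERS
    "chunks_per_worker": 5,  # ignored by FIXED_WORKERS
}
print(effective_scaling_settings(config))  # {'worker_count_fixed': 4}
```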

In [None]:
req = oasis_server.models.scaling_configuration.post(PIWIND_MODEL_ID, {
    "scaling_strategy":    "FIXED_WORKERS",
    "worker_count_fixed": 4
})

#### **Strategy 2** - QUEUE_LOAD
The number of workers started depends on how many pending tasks are waiting on a model's queue. These are **main tasks** and not sub-tasks. So if `<m>` loss analysis requests are sent to the API via `/api/v1/analyses/{id}/run/` then `<m>` worker pods will be started, up to a maximum of `worker_count_max`.  

In [None]:
req = oasis_server.models.scaling_configuration.post(PIWIND_MODEL_ID, {
    "scaling_strategy":    "QUEUE_LOAD",
    "worker_count_max": 4
})

#### **Strategy 3** - DYNAMIC_TASKS
The number of workers started is controlled by the number of **sub-tasks** waiting on the model queue: it is the total number of sub-tasks (a.k.a. chunks) on the model queue divided by `chunks_per_worker`. For example, if three loss analysis requests are sent, each broken up into 15 chunks, and `chunks_per_worker = 5`, we'll have **(3 × 15) / 5 = 9 workers** started. 
However, the `worker_count_max` cap still applies to this strategy.
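The three strategies can be summarised in one function. This is a hedged sketch of the behaviour described above, not the `oasis-worker-controller` source: `desired_worker_count` is a hypothetical name, and rounding a partial group of chunks *up* to a whole worker under `DYNAMIC_TASKS` is an assumption.

```python
import math


def desired_worker_count(config: dict, queued_analyses: int, queued_subtasks: int) -> int:
    """Hypothetical sketch of the worker count each scaling strategy asks for.

    queued_analyses: main tasks waiting on the model queue (used by QUEUE_LOAD)
    queued_subtasks: sub-tasks/chunks waiting on the model queue (used by DYNAMIC_TASKS)
    """
    # Idle state for every strategy: nothing queued for the model -> 0 workers.
    if queued_analyses == 0 and queued_subtasks == 0:
        return 0

    strategy = config["scaling_strategy"]
    if strategy == "FIXED_WORKERS":
        return config["worker_count_fixed"]
    if strategy == "QUEUE_LOAD":
        # One worker per pending main task, capped at worker_count_max.
        return min(queued_analyses, config["worker_count_max"])
    if strategy == "DYNAMIC_TASKS":
        # Assumption: a partial group of chunks rounds up to a whole worker.
        workers = math.ceil(queued_subtasks / config["chunks_per_worker"])
        return min(workers, config["worker_count_max"])
    raise ValueError(f"Unknown scaling_strategy: {strategy!r}")


uncapped = {"scaling_strategy": "DYNAMIC_TASKS", "chunks_per_worker": 5, "worker_count_max": 10}
capped = {**uncapped, "worker_count_max": 4}
print(desired_worker_count(uncapped, queued_analyses=3, queued_subtasks=3 * 15))  # 9
print(desired_worker_count(capped, queued_analyses=3, queued_subtasks=3 * 15))    # 4
```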

In [None]:
CHUNKS_PER_WORKER = 2
req = oasis_server.models.scaling_configuration.post(PIWIND_MODEL_ID, {
    "scaling_strategy":    "DYNAMIC_TASKS",
    "chunks_per_worker": CHUNKS_PER_WORKER, 
    "worker_count_max": 4
})

## 3.2.3 Chunking options
There are two chunking modes, **FIXED_CHUNKS** and **DYNAMIC_CHUNKS**, for splitting up either the `lookup` or the `loss` stage. The two stages are controlled independently using `lookup_strategy` and `loss_strategy`.

#### **Strategy 1** - FIXED_CHUNKS
Setting a fixed value breaks every **main task** into a fixed number of chunks. For example, suppose we set the following for a model:

In [None]:
FIXED_LOOKUP_CHUNKS = 10
req = oasis_server.models.chunking_configuration.post(PIWIND_MODEL_ID, {
    "lookup_strategy":    "FIXED_CHUNKS",
    "fixed_lookup_chunks": FIXED_LOOKUP_CHUNKS
})

Every `/api/v1/analyses/{id}/generate_inputs/` request will split the given **location.csv** into **FIXED_LOOKUP_CHUNKS** even partitions and run the oasis lookup on each partition in parallel (and across multiple workers). 


In [None]:
FIXED_EVENT_BATCHES = 20
req = oasis_server.models.chunking_configuration.post(PIWIND_MODEL_ID, {
    "loss_strategy":         "FIXED_CHUNKS",
    "fixed_analysis_chunks": FIXED_EVENT_BATCHES
})

Now every `/api/v1/analyses/{id}/run/` request will be split into **FIXED_EVENT_BATCHES** event batches. 
Each event batch is analogous to a ktools execution pipe and runs in its own bash script, e.g. for the first batch:
`eve 1 <FIXED_EVENT_BATCHES> | modelpy | gulpy ... etc`


#### Notes on Fixed chunking 

* **1. Minimum chunking** - If a file has fewer lines than the requested number of chunks, say a 4-line location file is requested to be split 5 ways, then only 4 chunks will be created.

* **2. Single fixed chunk** - This is treated as a special case. If a request for a single fixed chunk is sent, the task is not split and behaves like a task on the version `1.x.x` platform: it runs on a single worker pod and parallelizes across the resources of that one node.     
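A minimal sketch of fixed chunking, assuming rows are divided into contiguous, near-even partitions (the exact split used by the platform may differ). `split_fixed_chunks` is a hypothetical helper; it reproduces both notes above: the chunk count is capped at the number of rows, and a single chunk leaves the input unsplit.

```python
def split_fixed_chunks(rows: list, n_chunks: int) -> list:
    """Hypothetical sketch: split rows into at most n_chunks contiguous, near-even parts."""
    if n_chunks < 1:
        raise ValueError("n_chunks must be >= 1")
    # Note 1: never create more chunks than there are rows (assume 1 chunk for empty input).
    n_chunks = max(1, min(n_chunks, len(rows)))
    size, extra = divmod(len(rows), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional row each.
        end = start + size + (1 if i < extra else 0)
        chunks.append(rows[start:end])
        start = end
    return chunks


locations = [f"loc_{i}" for i in range(1, 11)]  # 10 made-up location rows
print([len(c) for c in split_fixed_chunks(locations, 3)])  # [4, 3, 3]
print(len(split_fixed_chunks(locations[:4], 5)))           # 4 (Note 1)
print(split_fixed_chunks(locations, 1) == [locations])     # True (Note 2: not split)
```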

#### **Strategy 2** - DYNAMIC_CHUNKS

The `DYNAMIC_CHUNKS` strategy automatically scales the number of sub-tasks based on the size of the input given. 
For lookup, this is based on the number of lines in the **location.csv**. So if a location file has 100,000 lines and 
`dynamic_locations_per_lookup=10000` is set, this will result in **10 chunks**, each with **10,000 location lines** to process in parallel. 

The value of `dynamic_chunks_max` caps the maximum number of chunks, so setting `dynamic_locations_per_lookup=1` for that same file (with `dynamic_chunks_max=200`) will return **200 chunks** and **not 100,000 chunks**.
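The dynamic lookup chunk count can be sketched as a ceiling division followed by the `dynamic_chunks_max` cap. `dynamic_lookup_chunks` is a hypothetical helper, and the rounding-up step is an assumption (a final partial batch of rows still needs its own chunk); it matches the expected-chunk calculation used for CHAZ in section 3.4.1.

```python
import math


def dynamic_lookup_chunks(location_rows: int, locations_per_lookup: int, chunks_max: int) -> int:
    """Hypothetical sketch: number of lookup sub-tasks under DYNAMIC_CHUNKS."""
    if locations_per_lookup < 1 or chunks_max < 1:
        raise ValueError("locations_per_lookup and chunks_max must be >= 1")
    # Assumption: a final, partially filled batch of rows still gets its own chunk.
    chunks = math.ceil(location_rows / locations_per_lookup)
    # dynamic_chunks_max caps the result; at least one chunk is assumed.
    return max(1, min(chunks, chunks_max))


print(dynamic_lookup_chunks(100_000, 10_000, 200))  # 10
print(dynamic_lookup_chunks(100_000, 1, 200))       # 200, not 100,000
```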

In [None]:
LOC_LINES_PER_CHUNK = 10000
req = oasis_server.models.chunking_configuration.post(PIWIND_MODEL_ID, {
    "lookup_strategy":              "DYNAMIC_CHUNKS",
    "dynamic_locations_per_lookup": LOC_LINES_PER_CHUNK, 
    "dynamic_chunks_max":           200
})

When DYNAMIC chunking is used for loss generation, the number of chunks scales with the size of the selected event set. The selected event set in `model_settings.json` _MUST_ therefore have the value `number_of_events = <total-events-in-set>` set; otherwise calls to `/api/v1/analyses/{id}/run/` will return a 400 Bad Request response. 

<img src="https://github.com/OasisLMF/Workshop2022/blob/main/images/err_dynamic_run.png?raw=true" alt="dynamic run error" width="500" align="center" style="float"/><br>


The value `dynamic_events_per_analysis=n` works in a similar way to the lookup setting: sub-tasks are created containing batches of `n` events.
Looking at PiWind, we see that there are **1447** events in set **p**. 

If `p` is selected with `dynamic_events_per_analysis=100`, then a total of **15 sub-tasks** will be generated (1447 / 100 = 14.47, rounded up to 15).
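A hedged sketch of the loss-side calculation, including the `number_of_events` requirement. `dynamic_loss_chunks` is hypothetical, the option dict only mimics the shape of an entry in `model_settings['event_set']['options']`, and raising `ValueError` here merely stands in for the API's 400 Bad Request.

```python
import math


def dynamic_loss_chunks(event_set_option: dict, events_per_analysis: int, chunks_max: int) -> int:
    """Hypothetical sketch: number of loss sub-tasks under DYNAMIC_CHUNKS."""
    n_events = event_set_option.get("number_of_events")
    if n_events is None:
        # Stand-in for the API rejecting the run with 400 Bad Request.
        raise ValueError(f"event set {event_set_option.get('id')!r} has no 'number_of_events'")
    # Batches of `events_per_analysis` events, capped by dynamic_chunks_max.
    return min(math.ceil(n_events / events_per_analysis), chunks_max)


# Made-up option dict for PiWind's event set 'p'
piwind_p = {"id": "p", "number_of_events": 1447}
print(dynamic_loss_chunks(piwind_p, 100, 200))  # 15
```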

In [None]:
display(JSON(oasis_server.models.settings.get(PIWIND_MODEL_ID).json()['model_settings']['event_set']['options'][0], root='event_set'))

In [None]:
EVENTS_PER_BATCH = 100
req = oasis_server.models.chunking_configuration.post(PIWIND_MODEL_ID, {
    "loss_strategy":              "DYNAMIC_CHUNKS",
    "dynamic_events_per_analysis": EVENTS_PER_BATCH, 
    "dynamic_chunks_max":           200
})

## 3.3 Running a fixed size configuration 

### 3.3.1 Create PiWind Portfolio & analysis 

In [None]:
# Load exposure and create Portfolio 
portfolio_name = 'piwind_portfolio'
analysis_name = 'fixed_run_piwind'

# ---- Data Source  ---------------------- #
settings_url = 'https://raw.githubusercontent.com/OasisLMF/OasisPiWind/master/analysis_settings.json'
base_url='https://raw.githubusercontent.com/OasisLMF/OasisPiWind/master/tests/inputs'
loc_url=f'{base_url}/SourceLocOEDPiWind10.csv'
#acc_url=f'{base_url}/SourceAccOEDPiWind.csv'
#scp_url=f'{base_url}/SourceReinsScopeOEDPiWind.csv'
#inf_url=f'{base_url}/SourceReinsInfoOEDPiWind.csv'


# ---- Portfolio ---------------------- #
portfolio_list = oasis_server.portfolios.search({'name': portfolio_name}).json()
if len(portfolio_list) > 0:
    PORTFOLIO_ID = portfolio_list[-1]['id']
else:
    PORTFOLIO_ID = oasis_server.portfolios.create(portfolio_name).json()['id']

# Upload exposure data 
loc_data = get(loc_url).content
#acc_data = get(acc_url).content 
#scp_data = get(scp_url).content
#inf_data = get(inf_url).content

oasis_server.portfolios.location_file.post(PORTFOLIO_ID, loc_data, content_type='text/csv')
#oasis_server.portfolios.accounts_file.post(PORTFOLIO_ID, acc_data, content_type='text/csv')
#oasis_server.portfolios.reinsurance_scope_file.post(PORTFOLIO_ID, scp_data, content_type='text/csv')
#oasis_server.portfolios.reinsurance_info_file.post(PORTFOLIO_ID, inf_data, content_type='text/csv')


# ---- Analysis ---------------------- #
# Find or create analysis 
analysis_list = oasis_server.analyses.search({'name': analysis_name}).json()
if len(analysis_list) > 0:
    ANALYSIS = analysis_list[0]
    ANALYSIS_ID = ANALYSIS['id']
else:    
    ANALYSIS = oasis_server.analyses.create(analysis_name, portfolio_id=PORTFOLIO_ID, model_id=PIWIND_MODEL_ID).json()
    ANALYSIS_ID = ANALYSIS['id']
    
# Upload analysis Settings   
analysis_settings = get(settings_url).json()
oasis_server.analyses.settings.post(ANALYSIS['id'], analysis_settings).json()


# ---- Display State ---------------------- #
display(Markdown('#### Selected Portfolio'))
display(tabulate_portfolio([oasis_server.portfolios.get(PORTFOLIO_ID).json()]))
display(Markdown('#### Selected Analyses'))
display(tabulate_analysis([oasis_server.analyses.get(ANALYSIS_ID).json()]))

### 3.3.2 Set PiWind to Fixed Chunks
Set the number of worker pods to 4 (the maximum) and set the chunking options to fill all concurrent 'slots': 4 workers × 2 vCPUs = 8 chunks. 

In [None]:
# Set chunking to 8 for both input & loss generation
r = oasis_server.models.chunking_configuration.post(PIWIND_MODEL_ID, {
    "lookup_strategy": "FIXED_CHUNKS",
    "fixed_lookup_chunks": 8,
    "loss_strategy": "FIXED_CHUNKS",
    "fixed_analysis_chunks": 8
})

# Set scaling to 4 workers 
r = oasis_server.models.scaling_configuration.post(PIWIND_MODEL_ID, {
    "scaling_strategy":    "FIXED_WORKERS",
    "worker_count_fixed": 4
})

### 3.3.3 Generate Oasis Inputs 
At this point it's worth opening an [Azure cloud shell](https://shell.azure.com/) and monitoring the running pods.

#### Watching running pods 
<img src="https://github.com/OasisLMF/Workshop2022/blob/main/images/terminal_icon.png?raw=true" alt="terminal" width="28" align="left"/>  `watch -n 1 kubectl get pods`

#### Viewing pod logs 
<img src="https://github.com/OasisLMF/Workshop2022/blob/main/images/terminal_icon.png?raw=true" alt="terminal" width="28" align="left"/>  `kubectl logs {pod-name}`


#### Restarting a pod 
This might be needed for the `worker-controller`, which can get stuck in an inconsistent state. If model workers are not spinning up or down, try forcing a restart using this command. 

<img src="https://github.com/OasisLMF/Workshop2022/blob/main/images/terminal_icon.png?raw=true" alt="terminal" width="28" align="left"/>  `kubectl delete pod {pod-name}`


In [None]:
if oasis_server.analyses.status(ANALYSIS_ID) != 'READY':
    print('Generating Oasis Inputs')
    oasis_server.run_generate(ANALYSIS['id'])

display(tabulate_analysis([oasis_server.analyses.get(ANALYSIS_ID).json()]))

### 3.3.4 Show List of SubTasks (Optional Step)

In [None]:
input_get_subtasks = oasis_server.analyses.sub_task_list(ANALYSIS['id']).json()
JSON(input_get_subtasks, root='Input Generation SubTasks')

### 3.3.5 Display Log file from one of the SubTasks (Optional Step)

In [None]:
# Display log file from a Sub-Task
CHUNK_ID = 1
TASK_NAMES= [f"prepare-keys-file-{CHUNK_ID}", f"generate-losses-chunk-{CHUNK_ID}"]
input_get_subtasks = oasis_server.analyses.sub_task_list(ANALYSIS_ID).json()
subtask_id = [x for x in input_get_subtasks if x['slug'] in TASK_NAMES].pop()['id']
subtask_output_log = oasis_server.task_status.output_log.get(subtask_id).text
Code(subtask_output_log)

### 3.3.6 Graph the execution time of each lookup SubTask

In [None]:
## Graph SubTask Execution time
if oasis_server.analyses.status(ANALYSIS_ID) == 'READY':
    input_get_subtasks = oasis_server.analyses.sub_task_list(ANALYSIS_ID).json()
    display(plot_subtasks(input_get_subtasks))
else:
    print("Warning: Input generation has not completed")

### 3.3.7 Update PiWind to use only one worker and Generate Losses

In [None]:
oasis_server.models.scaling_configuration.post(PIWIND_MODEL_ID, {
    "scaling_strategy": "FIXED_WORKERS",
    "worker_count_fixed": 1
})

if oasis_server.analyses.status(ANALYSIS_ID) != 'RUN_COMPLETED':
    print('Starting Analysis Losses')
    oasis_server.run_analysis(ANALYSIS_ID)

display(tabulate_analysis([oasis_server.analyses.get(ANALYSIS_ID).json()]))

### 3.3.8 Graph the execution time of each loss SubTask
You should see a staggered plot: with only one worker running (2 vCPUs), only two sub-tasks can execute simultaneously, so the 8 loss chunks run in successive batches. 
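The staggering can be estimated by dividing the number of chunks by the available slots. This rough sketch assumes every chunk takes about the same time and occupies one vCPU; `execution_waves` is a hypothetical helper.

```python
import math


def execution_waves(n_chunks: int, workers: int, vcpus_per_node: int = 2) -> int:
    """Hypothetical estimate of how many back-to-back rounds of sub-tasks run."""
    if workers < 1 or vcpus_per_node < 1:
        raise ValueError("need at least one worker and one vCPU")
    slots = workers * vcpus_per_node  # sub-tasks that can run at the same time
    return math.ceil(n_chunks / slots)


print(execution_waves(8, workers=1))  # 4 rounds on 2 slots
print(execution_waves(8, workers=4))  # 1 round on 8 slots
```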

In [None]:
if oasis_server.analyses.status(ANALYSIS_ID) == 'RUN_COMPLETED':
    loss_gen_subtasks = oasis_server.analyses.sub_task_list(ANALYSIS['id']).json()
    display(plot_subtasks(loss_gen_subtasks))
else:
    print("Warning: Execution has not completed")

## 3.4 Running a dynamic configuration

In this section we run the CHAZ model, this time using dynamic chunking.
For reference the input sizes are:
* `location_file` = 85803 rows
* `event_set` = 38127 events 

### 3.4.1 Update the chaz model chunk settings

In [None]:
# Edit these values to change the generated sub-tasks
LOC_ROWS_PER_CHUNK=2000
EVENTS_PER_BATCH=1000
MAX_CHUNKS=50

# Chunks for each execution stage 
from math import ceil
location_rows=85803
event_set_size=38127
expected_lookup_chunks = min(ceil(location_rows / LOC_ROWS_PER_CHUNK), MAX_CHUNKS)
expected_loss_chunks = min(ceil(event_set_size / EVENTS_PER_BATCH), MAX_CHUNKS)

# Set dynamic chunking for both input & loss generation (capped at MAX_CHUNKS)
oasis_server.models.chunking_configuration.post(CHAZ_MODEL_ID, {
    "lookup_strategy": "DYNAMIC_CHUNKS",
    "dynamic_locations_per_lookup": LOC_ROWS_PER_CHUNK,
    "loss_strategy": "DYNAMIC_CHUNKS",
    "dynamic_events_per_analysis": EVENTS_PER_BATCH,
    "dynamic_chunks_max": MAX_CHUNKS
})

# Set scaling to 4 workers 
oasis_server.models.scaling_configuration.post(CHAZ_MODEL_ID, {
    "scaling_strategy": "FIXED_WORKERS",
    "worker_count_fixed": 4
})

print(f' lookup_chunks = {expected_lookup_chunks}')
print(f' loss_chunks   = {expected_loss_chunks}')

### 3.4.2 Create CHAZ Portfolio & analysis

In [None]:
# Load exposure and create Portfolio 
portfolio_name = 'chaz_portfolio'
analysis_name = 'chaz_dynamic_run'

# Data Source 
base_url = 'https://raw.githubusercontent.com/OasisLMF/Workshop2022/main/examples'
loc_url = f'{base_url}/oed_location_litpop.csv'
run_settings = f'{base_url}/chaz_analysis_settings.json' 


# ---- Portfolio ---------------------- #
portfolio_list = oasis_server.portfolios.search({'name': portfolio_name}).json()
if len(portfolio_list) > 0:
    PORTFOLIO_ID = portfolio_list[-1]['id']
else:
    PORTFOLIO_ID = oasis_server.portfolios.create(portfolio_name).json()['id']

# Upload exposure data 
oasis_server.portfolios.location_file.post(
    PORTFOLIO_ID, 
    get(loc_url).content, 
    content_type='text/csv'
)


# ---- Analysis ---------------------- #
# Find or create analysis 
analysis_list = oasis_server.analyses.search({'name': analysis_name}).json()
if len(analysis_list) > 0:
    ANALYSIS = analysis_list[0]
    ANALYSIS_ID = ANALYSIS['id']
else:    
    ANALYSIS = oasis_server.analyses.create(analysis_name, portfolio_id=PORTFOLIO_ID, model_id=CHAZ_MODEL_ID).json()
    ANALYSIS_ID = ANALYSIS['id']
    
# Upload analysis Settings   
oasis_server.analyses.settings.post(ANALYSIS['id'], get(run_settings).json()).json()



# ---- Display State ---------------------- #
display(Markdown('#### Selected Portfolio'))
display(tabulate_portfolio([oasis_server.portfolios.get(PORTFOLIO_ID).json()]))
display(Markdown('#### Selected Analyses'))
display(tabulate_analysis([oasis_server.analyses.get(ANALYSIS_ID).json()]))

### 3.4.3 Generate Inputs and Graph

In [None]:
ANALYSIS = oasis_server.analyses.get(ANALYSIS_ID).json()
if oasis_server.analyses.status(ANALYSIS_ID) != 'READY':
    print('Generating Oasis Inputs')
    oasis_server.run_generate(ANALYSIS['id'])

## Graph SubTask execution time (inputs)
if oasis_server.analyses.status(ANALYSIS_ID) == 'READY':
    input_get_subtasks = oasis_server.analyses.sub_task_list(ANALYSIS_ID).json()
    display(plot_subtasks(input_get_subtasks))
else:
    print("Warning: Input generation has not completed")

display(Markdown('#### Completed Analyses'))
display(tabulate_analysis([oasis_server.analyses.get(ANALYSIS_ID).json()]))

### 3.4.4 Generate Losses and Graph

In [None]:
if oasis_server.analyses.status(ANALYSIS_ID) != 'RUN_COMPLETED':
    print('Starting Analysis Losses')
    oasis_server.run_analysis(ANALYSIS_ID)

display(tabulate_analysis([oasis_server.analyses.get(ANALYSIS_ID).json()]))

if oasis_server.analyses.status(ANALYSIS_ID) == 'RUN_COMPLETED':
    loss_gen_subtasks = oasis_server.analyses.sub_task_list(ANALYSIS['id']).json()
    display(plot_subtasks(loss_gen_subtasks))
else:
    print("Warning: Execution has not completed")
    
display(Markdown('#### Selected Analyses'))
display(tabulate_analysis([oasis_server.analyses.get(ANALYSIS_ID).json()]))