# Propdesk Data Pipeline - Volatility and CPU-bound processes
### Example: Calculate volatility for a pair in a given exchange, check if everything is ok and set up a periodic job to keep it updated

#### Make sure a Tardis key is set in your OS environment, e.g.,

##### `export TARDIS_KEY=<tardis_key>`
Note: lines that actually trigger or create jobs are commented out.
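A quick way to fail early when the key is missing is to check the environment before calling any service. This is a minimal sketch: `require_env` is a hypothetical helper, not part of `propdesk_services`, and the demonstration passes an explicit mapping so it runs without a real key.

```python
import os

def require_env(name, environ=None):
    """Return the value of an environment variable or raise a clear error."""
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; run `export {name}=<value>` first")
    return value

# Demonstration with an explicit mapping so the snippet runs anywhere.
print(require_env("TARDIS_KEY", {"TARDIS_KEY": "example-key"}))
```

In a real session, calling `require_env("TARDIS_KEY")` with no second argument reads from `os.environ`.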

First, let's look up what the ADA-BRL (Cardano-BRL) pair is called on Binance.

In [1]:
%load_ext autoreload
%autoreload 2
from propdesk_services import tardis

exchange = 'binance'
items_to_search = ['ada', 'brl']

all_datasets = tardis.get_all_exchange_datasets(exchange)
print([i for i in all_datasets.keys() if all([s in i for s in items_to_search])])

['adabrl']
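The search above keeps only the dataset names that contain every fragment. The same filter can be sketched as a small standalone function; `find_symbols` and the sample names are hypothetical and only illustrate the matching logic, with case-insensitive comparison added as an assumption.

```python
def find_symbols(symbols, fragments):
    """Return the symbols that contain every fragment (case-insensitive)."""
    wanted = [f.lower() for f in fragments]
    return [s for s in symbols if all(f in s.lower() for f in wanted)]

# Hypothetical symbol names, used only to make the example runnable.
sample_symbols = ["adabrl", "adausdt", "btcbrl", "ADABTC"]
print(find_symbols(sample_symbols, ["ada", "brl"]))  # prints ['adabrl']
```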


In [2]:
pair = 'adabrl'

Let's say we want to calculate volatility from 2022-07-01 to 2022-09-12.

In [3]:
date_from = '2022-07-01'
date_to = '2022-09-12'

Then, we can define the parameters of the volatility estimation process

In [4]:
params_dict = {
    'exchange': exchange,
    'pair': pair,
    'start_date': date_from,
    'end_date': date_to,
    'bandwidth': 5,
    'lookback_seconds':60,
    'log_price': True,
    'annualized_volatility': False,
    'resampling_rule': '100mS',
    'mid_quote_change': True
}
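To get a feel for the size of this job, the sketch below counts the calendar days in the range and the number of 100 ms ticks involved. It is plain arithmetic on the parameters above; treating `end_date` as inclusive is an assumption, since the job's own convention is not shown here, and `days_in_range` is a hypothetical helper.

```python
from datetime import date, timedelta

def days_in_range(start, end):
    """List every calendar day from start to end, both inclusive."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    if last < first:
        raise ValueError("end date is before start date")
    return [first + timedelta(days=n) for n in range((last - first).days + 1)]

days = days_in_range("2022-07-01", "2022-09-12")
ticks_per_day = 24 * 60 * 60 * 10   # ten 100 ms ticks per second
ticks_per_lookback = 60 * 10        # a 60-second lookback at 100 ms resolution
print(len(days), ticks_per_day, ticks_per_lookback)  # prints 74 864000 600
```

Hundreds of thousands of ticks per day, each with a 600-tick window, is what makes the work compute-heavy relative to the data downloaded.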

We can simply run this job to have it computed. First, let's check whether the script is deployed to Databricks.

In [5]:
from propdesk_services.azure_databricks import list_databricks_src_files
deployed_files = list_databricks_src_files()
print(deployed_files)

['spark_ohlcv.py', 'spark_schedule_handler.py', 'spark_seasonality.py', 'spark_volatility_zma.py']


#### If the script is not deployed, place it in `propdesk_estimators/spark_jobs` with a name that starts with `spark_`, then run `databricks_deploy.py` FROM THE DIRECTORY `propdesk_estimators/`

Now we can define the script to run and submit the job. The call returns job and run info plus a URL to check the job's progress.

In [6]:
job_name = f'{pair}_{exchange}_vol'
job_type = 'cpu_intensive'
dataset_type = 'volatility_zma_mqc'
script_to_run = 'spark_volatility_zma.py'

from propdesk_services.azure_databricks import single_run_job

# call commented out because it would trigger a run; it returns job and run info
# single_run_job(job_name, script_to_run, params_dict, job_type=job_type)

# ⚠️ ATTENTION - job_type

Volatility is a **CPU-BOUND** job: the CPU dominates the time needed to complete it. We download a small amount of data and work heavily on it (we upsample the data to 100 ms ticks).
That means it makes sense to parallelize not only the calculation (using Spark) but also the distribution of tasks across the Spark nodes. This increases performance and avoids overload.

For jobs that have a `start_date` and an `end_date`, `single_run_job` does the balancing: it splits the job into up to 100 tasks and runs only a few of them in parallel (9-10), a ***batch*** of tasks.
When a task in a ***batch*** ends, it triggers another task scheduled for the subsequent ***batch***. That guarantees that:
```
1 - the cluster is not overloaded
2 - a failure on one particular date does not compromise the whole operation (only the smaller subset of tasks it would have triggered, in cascade, in subsequent batches)
```

Just pass the correct job type and everything will be fine :)
# ⚠️ ------------------------------------------
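The batching described above can be pictured with a small standalone sketch. It is not the implementation of `single_run_job`, whose internals are not shown here: `split_into_tasks` and `build_chains` are hypothetical helpers, and one chain per parallel slot is an assumed way to model "each finished task triggers a task in the next batch".

```python
from datetime import date, timedelta

def split_into_tasks(days, max_tasks=100):
    """Group days into at most max_tasks contiguous chunks of similar size."""
    if not days:
        return []
    n_tasks = min(max_tasks, len(days))
    size, extra = divmod(len(days), n_tasks)
    tasks, start = [], 0
    for i in range(n_tasks):
        stop = start + size + (1 if i < extra else 0)
        tasks.append(days[start:stop])
        start = stop
    return tasks

def build_chains(tasks, parallelism=10):
    """Chain j holds tasks j, j+parallelism, ...; each task triggers the next one in its chain."""
    return [tasks[j::parallelism] for j in range(min(parallelism, len(tasks)))]

first = date(2022, 7, 1)
days = [first + timedelta(days=n) for n in range(74)]  # 2022-07-01 .. 2022-09-12
tasks = split_into_tasks(days)
chains = build_chains(tasks)
print(len(tasks), [len(c) for c in chains])  # prints 74 [8, 8, 8, 8, 7, 7, 7, 7, 7, 7]
```

In this model at most ten tasks run at once, and a failed task only stops the tasks that come after it in its own chain.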

### Connecting to storage

In [7]:
from propdesk_estimators.exchange_storage import ExchangeStorage
binance = ExchangeStorage(exchange) # -- ExchangeStorage('binance')

Check for datasets that were already computed

In [8]:
query_dict = {
    'dataset_type': 'volatility_zma_mqc',
    'pair': pair, 
    'date_from': date_from, 
    'date_to': date_to, 
    'annual': False, 
    'bandwidth':5,
    'log': True,
    'lookback': 60,
    'resample': '100mS',
    'mid_quote_change': True
}
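Note that the storage query uses shorter key names than the job parameters (`lookback` vs `lookback_seconds`, `log` vs `log_price`, and so on). The sketch below translates one into the other; the mapping is inferred only from the two dictionaries shown in this notebook, and `job_params_to_query` is a hypothetical helper rather than part of the library.

```python
# Key names inferred from the two dictionaries shown in this notebook.
JOB_TO_QUERY_KEYS = {
    "pair": "pair",
    "start_date": "date_from",
    "end_date": "date_to",
    "annualized_volatility": "annual",
    "bandwidth": "bandwidth",
    "log_price": "log",
    "lookback_seconds": "lookback",
    "resampling_rule": "resample",
    "mid_quote_change": "mid_quote_change",
}

def job_params_to_query(job_params, dataset_type):
    """Translate job parameters into the key names used by storage queries."""
    query = {"dataset_type": dataset_type}
    for job_key, query_key in JOB_TO_QUERY_KEYS.items():
        if job_key in job_params:
            query[query_key] = job_params[job_key]
    return query

example_params = {
    "exchange": "binance", "pair": "adabrl",
    "start_date": "2022-07-01", "end_date": "2022-09-12",
    "bandwidth": 5, "lookback_seconds": 60, "log_price": True,
    "annualized_volatility": False, "resampling_rule": "100mS",
    "mid_quote_change": True,
}
print(job_params_to_query(example_params, "volatility_zma_mqc"))
```

Keys without a counterpart in the query, such as `exchange`, are dropped because the exchange is already chosen when the storage object is created.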

In [9]:
print('Already computed datasets in query')
binance.list_datasets_by_params(query_dict)

Already computed datasets in query


['adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-01',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-02',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-03',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-04',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-05',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-06',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-07',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-08',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-09',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-10',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-11',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-12',
 'adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-13',
 'adabrl/volatility_zma_m

## We could have used the amend method to find only the missing days and generate a dict to process them
Note: This notebook was created on 2021/12/15, so the data for this day will not be available, and the task related to this day failed. If this notebook is run after this date, the dataset will be available.

`amend_datasets_by_params` generates a `params_dict` structure that can be used to run jobs that complete the missing days, as seen below.
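The idea behind amending can be sketched without the storage layer: compare the requested days against the dates at the end of the stored dataset names. This is only an illustration, not the logic of `amend_datasets_by_params`; `missing_days` is a hypothetical helper, and it assumes every dataset name ends in a `YYYY-MM-DD` date, as in the listing above.

```python
from datetime import date, timedelta

def missing_days(existing_names, start, end):
    """Return ISO dates in [start, end] that no dataset name ends with."""
    have = {name[-10:] for name in existing_names}
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    wanted = [first + timedelta(days=n) for n in range((last - first).days + 1)]
    return [d.isoformat() for d in wanted if d.isoformat() not in have]

existing = [
    "adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-01",
    "adabrl/volatility_zma_mqc/2022/07/adabrl_volatility_zma_mqc_2022-07-03",
]
print(missing_days(existing, "2022-07-01", "2022-07-04"))  # prints ['2022-07-02', '2022-07-04']
```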

In [10]:
import datetime
query_dict = {
    'dataset_type': 'volatility_zma_mqc',
    'pair': pair, 
    'date_from': str(datetime.datetime.utcnow()), 
    'date_to': str(datetime.datetime.utcnow() + datetime.timedelta(days=2)), 
    'annual': False, 
    'bandwidth':5,
    'log': True,
    'lookback': 60,
    'resample': '100mS'
}

In [11]:
binance.amend_datasets_by_params(query_dict)


Missing datasets: ['2022-09-13', '2022-09-14', '2022-09-15']

Run appropriate job with these parameters:
{'dataset_type': 'volatility_zma_mqc', 'pair': 'adabrl', 'annual': False, 'bandwidth': 5, 'log': True, 'lookback': 60, 'resample': '100mS', 'start_date': '2022-09-13', 'end_date': '2022-09-16', 'exchange': 'binance'}


True

In [12]:

job_1 = {'dataset_type': 'volatility_zma_mqc', 'pair': 'adabrl', 'annual': False, 'bandwidth': 5, 'log': True, 'lookback': 60, 'resample': '100mS', 'start_date': '2022-09-13', 'end_date': '2022-09-16', 'exchange': 'binance'}

# with several job dicts (job_1, job_2, ...) we could just use a for loop; job_2 is not defined in this notebook
# job_1_run_info = single_run_job(job_name + '1', script_to_run, params_dict=job_1, max_tasks_in_job=100)
# job_2_run_info = single_run_job(job_name + '2', script_to_run, params_dict=job_2, max_tasks_in_job=100)

# print(job_1_run_info)
# print(job_2_run_info)

In [13]:
from propdesk_services.azure_databricks import get_run
# print(get_run(job_1_run_info['run_id'])['state'])
# print(get_run(job_2_run_info['run_id'])['state'])
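Instead of printing the state by hand, a run can be polled until it finishes. The sketch below assumes the `state` value mirrors the Databricks Jobs API run state, with `life_cycle_state` and `result_state` fields; `wait_for_run` is a hypothetical helper, and the state source is injected as a callable so the example runs without a cluster.

```python
import time

# Life-cycle states after which a Databricks run no longer changes.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}

def wait_for_run(fetch_state, poll_seconds=30.0, max_polls=120, sleep=time.sleep):
    """Poll fetch_state() until it reports a terminal life_cycle_state."""
    for _ in range(max_polls):
        state = fetch_state()
        if state.get("life_cycle_state") in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError("run did not finish within the polling budget")

# Stand-in for the real call: replays canned states instead of querying Databricks.
canned = iter([
    {"life_cycle_state": "PENDING"},
    {"life_cycle_state": "RUNNING"},
    {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"},
])
final = wait_for_run(lambda: next(canned), sleep=lambda seconds: None)
print(final["result_state"])  # prints SUCCESS
```

With the real client, the callable would presumably be something like `lambda: get_run(run_id)['state']`, if that value has the shape assumed here.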

### After jobs are done, the datasets should be uploaded to Azure Storage, and the same query performed before should return those datasets
`amend` will return a list with 2021/12/15 missing, because that task failed (data was not available at the time).

In [None]:
binance.list_datasets_by_params(query_dict, amend=True)

### With the module `propdesk_services.exchange_storage`, we can interact with those datasets. Let's straight up get a dataframe with the data

In [16]:
from propdesk_services.exchange_storage import get_dataframe_by_params

query_dict = {
    'dataset_type': 'volatility_zma_mqc',
    'pair': 'adabrl', 
    'date_from': '2022-07-01', 
    'date_to': '2022-07-02', 
    'annual': False, 
    'bandwidth':5,
    'log': True,
    'lookback': 60,
    'resample': '100mS'
}


# pass the flag keep_local to keep raw files instead of downloading them again if needed
adabrl_vol_df = get_dataframe_by_params(exchange_str='binance', params_dict=query_dict, keep_local=True)
adabrl_vol_df

files saved to: /tmp/tmplv2g94h2


Unnamed: 0,datetime,volatility_estimation
0,2022-07-01 00:00:00.000,2.448069e-07
1,2022-07-01 00:00:00.100,2.448069e-07
2,2022-07-01 00:00:00.200,2.448069e-07
3,2022-07-01 00:00:00.300,2.448069e-07
4,2022-07-01 00:00:00.400,2.448069e-07
...,...,...
858589,2022-07-01 23:59:59.500,8.223578e-08
858590,2022-07-01 23:59:59.600,7.616000e-08
858591,2022-07-01 23:59:59.700,7.616000e-08
858592,2022-07-01 23:59:59.800,7.616000e-08
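A simple sanity check on the output is to compare the row index with the position a timestamp would have if every 100 ms tick since midnight were present. The sketch below is arithmetic only and assumes the `Unnamed: 0` column is a gap-free row counter, which is not confirmed here; `expected_row_index` is a hypothetical helper.

```python
from datetime import datetime

TICK_MS = 100

def expected_row_index(timestamp):
    """Row position of a timestamp if every 100 ms tick since midnight exists."""
    t = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")
    ms_since_midnight = ((t.hour * 60 + t.minute) * 60 + t.second) * 1000 + t.microsecond // 1000
    return ms_since_midnight // TICK_MS

expected = expected_row_index("2022-07-01 23:59:59.800")
observed = 858592  # index shown for that timestamp in the output above
print(expected, expected - observed)  # prints 863998 5406
```

Under that assumption, a positive difference would suggest that some ticks are absent from the day's dataset and may be worth investigating.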


# Success :)

## Creating a periodic job
Now that we have used and checked the data, we can create a periodic (weekly or daily) job to keep this dataset updated. Pick a time when the cluster is idle, for example weekly on Mondays at 04:00 AM São Paulo time. The cell below uses `period = 'daily'` with its own cron expression.

## Ideally, **check Databricks UI to see if there are jobs scheduled for those times and day to avoid overloading the cluster**

In [None]:
from propdesk_services.azure_databricks import create_periodic_job

pair = 'btcusdt'
exchange = 'binance'

# daily job (matches `period` below)
job_name = f'daily_volatility_zma_mqc_{pair}_{exchange}'

# make sure to have a schedule that is compatible with the period
job_schedule = "0 15 1 ? * *"

period = 'daily'
# no need for start_date and end_date in periodic jobs
job_params = {
    'exchange': exchange,
    'pair': pair,
    'bandwidth': 5,
    'lookback_seconds':60,
    'log_price': True,
    'annualized_volatility': False,
    'resampling_rule': '100mS'
}

create_periodic_job(job_name_str=job_name,
                    filename_str=script_to_run,
                    params_dict=job_params,
                    period=period,
                    cron_expression_str=job_schedule)

{'job': {'job_id': 837105722299197},
 'schedule_job': {'job_id': 874249017617680}}
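The schedule string above has six fields, which matches the Quartz cron syntax that Databricks job schedules use (seconds first). Assuming `create_periodic_job` forwards the expression unchanged, the sketch below names each field; `describe_quartz` is a hypothetical helper that only splits the string and does not validate field values.

```python
QUARTZ_FIELDS = ("seconds", "minutes", "hours", "day_of_month", "month", "day_of_week")

def describe_quartz(expression):
    """Split a Quartz cron expression into named fields (optional year allowed)."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    fields = dict(zip(QUARTZ_FIELDS, parts))
    if len(parts) == 7:
        fields["year"] = parts[6]
    return fields

print(describe_quartz("0 15 1 ? * *"))   # the schedule used above: every day at 01:15:00
print(describe_quartz("0 0 4 ? * MON"))  # a weekly alternative: Mondays at 04:00:00
```

Which timezone the schedule is evaluated in depends on how the job is created, so it is worth confirming in the Databricks UI.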

## That's it. **check Databricks UI to make sure everything is ok**

### Have fun, move fast, break things, buy btc (or dcr or algorand) ⚡.
#### -- Propdesk Transfero