# Running Dask on the cluster with mlrun

The Dask framework enables you to parallelize your Python code and run it as a distributed process on an Iguazio cluster, which can dramatically accelerate its performance. <br>
In this notebook you'll create an MLRun function that runs as a Dask client. <br>
It also demonstrates how to run a parallelized query against Snowflake, using Dask Delayed to load a large data set from Snowflake in parallel.

For more information on Dask on Kubernetes, see https://kubernetes.dask.org/en/latest/

### Set up the environment

In [1]:
import mlrun
import os
import warnings
import yaml

project_name = "snowflake-dask"
dask_cluster_name = "snowflake-dask-cluster"
# set_environment returns a (project name, artifact path) tuple
_, artifact_path = mlrun.set_environment(project=project_name,
                                         artifact_path=os.path.join(os.path.abspath('/v3io/projects/'), project_name))

warnings.filterwarnings("ignore")
print(f'artifact_path = {artifact_path}')

> 2022-03-10 19:31:17,431 [info] loaded project snowflake-dask from MLRun DB
artifact_path = /v3io/projects/snowflake-dask


### Load the Snowflake configuration from a config file
This is for demo purposes only. In production code, store the Snowflake connection info as secrets and use those secrets in the running pod to connect to Snowflake.
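The paragraph above recommends reading the connection info from secrets inside the running pod instead of passing it around as a run parameter. The following is a minimal sketch of that idea, assuming a hypothetical helper `connection_info_from_secrets` and the four keys this notebook's config uses (`user`, `password`, `account`, `warehouse`). Inside the MLRun job, the `get_secret` argument would be `context.get_secret`, the same call the handler already uses to read `account`.

```python
from typing import Callable, Dict, Optional

# Keys used by this notebook's Snowflake config (assumed to be the full set needed)
REQUIRED_KEYS = ("user", "password", "account", "warehouse")


def connection_info_from_secrets(get_secret: Callable[[str], Optional[str]]) -> Dict[str, str]:
    """Hypothetical helper: build snowflake.connector.connect() kwargs from secrets.

    `get_secret` is any callable that maps a key to its secret value,
    e.g. `context.get_secret` inside an MLRun job.
    """
    info = {key: get_secret(key) for key in REQUIRED_KEYS}
    # Fail early with a clear message instead of a confusing connector error
    missing = [key for key, value in info.items() if not value]
    if missing:
        raise KeyError(f"missing Snowflake secrets: {missing}")
    return info


# Usage with a plain dict standing in for the secret store (placeholder values)
fake_secrets = {"user": "demo", "password": "***", "account": "acct", "warehouse": "wh"}
print(sorted(connection_info_from_secrets(fake_secrets.get)))
# ['account', 'password', 'user', 'warehouse']
```

With a helper like this, the handler could call `snow.connect(**connection_info_from_secrets(context.get_secret))` and drop the `connection_info` parameter, which keeps the password out of the run's logged parameters.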

In [2]:
# Load connection info
with open("config.yaml") as f:
    connection_info = yaml.safe_load(f)

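# verify the config
print(connection_info['account'])

# store the connection info as Kubernetes-backed project secrets so the
# job pod can read them with context.get_secret()
mlrun.get_run_db().create_project_secrets(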
    project_name,
    provider=mlrun.api.schemas.SecretProviderName.kubernetes,
    secrets=connection_info
)

nf77378.eu-west-2.aws


### Create a Python function

This function queries data from Snowflake using the Snowflake Python connector and processes the query results in parallel. <br>
With the Snowflake Python connector, after you execute a query, `cursor.get_result_batches()` returns the result set as a list of result batches. <br>
Dask Delayed then loads and processes these batches in parallel. <br>

#### Write the function to a .py file

In [3]:
%%writefile snowflake-dask.py
import warnings

import mlrun
import snowflake.connector as snow
from dask import delayed
from dask.dataframe import from_delayed
from dask.distributed import Client

warnings.filterwarnings("ignore")


@delayed
def load(batch):
    # Download one Snowflake result batch and convert it to a pandas DataFrame.
    # Runs lazily, once per batch, on a Dask worker.
    try:
        return batch.to_pandas()
    except Exception as e:
        # Re-raise so the failure surfaces instead of returning None,
        # which from_delayed cannot turn into a partition
        print(f"Failed on {batch} for {e}")
        raise


def load_delayed(dask_client, connection_info, query, out_dir, write_out=False, publish=False):
    context = mlrun.get_or_create_ctx('dask-cluster')
    sfAccount = context.get_secret('account')
    context.log_result('sfAccount', sfAccount)
    context.logger.info(f'sfAccount = {sfAccount}')

    # Set up the Dask client from the MLRun Dask cluster function, or fall back to a local one
    if dask_client:
        client = mlrun.import_function(dask_client).client
        context.logger.info(f'Existing dask client === >>> {client}\n')
    else:
        client = Client()
        context.logger.info(f'\nNewly created dask client === >>> {client}\n')

    # Run the query once; the connector exposes the result set as a list of batches
    conn = snow.connect(**connection_info)
    cur = conn.cursor()
    cur.execute(query)
    batches = cur.get_result_batches()
    print(f'batches len === {len(batches)}\n')

    # One delayed task per non-empty batch, assembled into a Dask DataFrame
    dfs = [load(batch) for batch in batches if batch.rowcount > 0]
    ddf = from_delayed(dfs)

    # Keep the loaded partitions in cluster memory so the computations below
    # do not download the batches again
    ddf = ddf.persist()

    # Materialize the query result set for some sample computations
    ddf_sum = ddf.sum().compute()
    ddf_mean = ddf.mean().compute()
    ddf_describe = ddf.describe().compute()
    ddf_grpby = ddf.groupby("C_CUSTKEY").count().compute()

    context.logger.info(f'sum === >>> {ddf_sum}\n')
    context.logger.info(f'mean === >>> {ddf_mean}\n')
    context.logger.info(f'ddf head === >>> {ddf.head()}\n')
    context.logger.info(f'ddf  === >>> {ddf}\n')

    context.log_result('number of rows', len(ddf.index))

    context.log_dataset('dask_data_frame', ddf)
    context.log_dataset("my_df_describe", df=ddf_describe)
    context.log_dataset("my_df_grpby", df=ddf_grpby)

    # Publish the persisted DataFrame on the scheduler so other clients can fetch it by name
    if publish and not client.list_datasets():
        client.publish_dataset(customer=ddf)

    if write_out:
        ddf.to_parquet(out_dir)
        context.log_result('parquet', out_dir)

    cur.close()
    conn.close()

Overwriting snowflake-dask.py


### Convert the code to an MLRun function

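Use `code_to_function` to convert the code into an MLRun job function. The job connects to a separately created Dask cluster function (`snowflake-dask-cluster`), so the configuration for the Dask process (e.g. replicas, memory) is set on that cluster function rather than here. <br>
Note that the resource configurations are per worker.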

In [4]:
image='.mlrun/snowflakedask'

In [5]:
fn = mlrun.code_to_function("snowflake-dask-mlrun",  
                            kind='job', 
                            filename='snowflake-dask.py',
                            image=image,
                            handler="load_delayed").apply(mlrun.platforms.auto_mount())

### Get the Dask cluster function URI

In [6]:
# function URI is db://<project>/<name>
dask_uri = f'db://{project_name}/{dask_cluster_name}'
dask_uri

'db://snowflake-dask/snowflake-dask-cluster'

### Run the function

When the function runs, its log includes a remote dashboard link (the `remote dashboard:` line). Click this link to open the Dask monitoring dashboard.

In [7]:
import uuid
parquet_path = f"/v3io/bigdata/pq_from_sf_dask/{uuid.uuid1()}"

fn.run(handler = 'load_delayed',
       params={"dask_client": dask_uri, 
               "connection_info" :connection_info, 
               "query": "SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.CUSTOMER LIMIT 3000",
               "out_dir": parquet_path,
               "write_out": True,
               "publish": True
              }
      )

> 2022-03-10 19:31:29,094 [info] starting run snowflake-dask-mlrun-load_delayed uid=7919bc3027e44f6c8794029c945eda57 DB=http://mlrun-api:8080
> 2022-03-10 19:31:29,271 [info] Job is running in the background, pod: snowflake-dask-mlrun-load-delayed-6554g
> 2022-03-10 19:31:36,289 [info] sfAccount = nf77378.eu-west-2.aws
> 2022-03-10 19:31:36,730 [info] trying dask client at: tcp://mlrun-snowflake-dask-cluster-1d25cde8-8.default-tenant:8786
> 2022-03-10 19:31:36,738 [info] using remote dask scheduler (mlrun-snowflake-dask-cluster-1d25cde8-8) at: tcp://mlrun-snowflake-dask-cluster-1d25cde8-8.default-tenant:8786
remote dashboard: default-tenant.app.us-sales-322.iguazio-cd1.com:30066
> 2022-03-10 19:31:36,738 [info] Existing dask client === >>> <Client: 'tcp://172.31.1.12:8786' processes=1 threads=1, memory=3.88 GiB>

batches len === 4

> 2022-03-10 19:31:50,522 [info] sum === >>> C_CUSTKEY                                             13603998844
C_NAME          Customer#004750001Customer#00

| Field | Value |
|---|---|
| project | snowflake-dask |
| uid | ...945eda57 |
| iter | 0 |
| start | Mar 10 19:31:35 |
| state | completed |
| name | snowflake-dask-mlrun-load_delayed |
| labels | v3io_user=xingsheng<br>kind=job<br>owner=xingsheng<br>mlrun/client_version=0.10.0<br>host=snowflake-dask-mlrun-load-delayed-6554g |
| inputs | |
| parameters | dask_client=db://snowflake-dask/snowflake-dask-cluster<br>connection_info={'user': 'xingsheng', 'password': '****', 'warehouse': 'compute_sh', 'account': 'nf77378.eu-west-2.aws'}<br>query=`SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.CUSTOMER LIMIT 3000`<br>out_dir=/v3io/bigdata/pq_from_sf_dask/aee6641a-a0a8-11ec-b79f-0788189e3eec<br>write_out=True<br>publish=True |
| results | sfAccount=nf77378.eu-west-2.aws<br>number of rows=3000<br>parquet=/v3io/bigdata/pq_from_sf_dask/aee6641a-a0a8-11ec-b79f-0788189e3eec |
| artifacts | dask_data_frame<br>my_df_describe<br>my_df_grpby |

> 2022-03-10 19:32:01,048 [info] run executed, status=completed


<mlrun.model.RunObject at 0x7f2fbd398090>

## Track the progress in the UI

You can view the progress and detailed information of the run in the MLRun UI by clicking the uid above. <br>
To track the Dask progress in the Dask UI, click the remote dashboard link in the run log, printed just above the Dask client details.