# Create a Hail cluster on Enterprise Terra

<table align="left">

  <td>
    <a href="https://github.com/DataBiosphere/terra-axon-examples/blob/main/dataproc/create_hail_cluster.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/DataBiosphere/terra-axon-examples/main/dataproc/create_hail_cluster.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in a Terra notebook instance
    </a>
  </td>                                                                                               
</table>

# Overview

This notebook demonstrates how to create Hail clusters on Enterprise Terra and submit batch jobs to them. It also discusses how to use JupyterLab on the Hail cluster and access debugging consoles such as the Spark console.

<div class="alert alert-block alert-info">
<b>Note:</b> This Enterprise Terra notebook creates a Hail cluster, but Enterprise Terra users can also do this from the terminal or using the Cloud Console UI if they pass the additional configuration needed to install the Hail library.
</div>


## Objective

In this tutorial you will learn how to run [Hail](https://hail.is/) via [Dataproc](https://cloud.google.com/dataproc/docs/concepts/overview) with [autoscaling](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#what_is_autoscaling) for resource management. The steps include:

1. Gather the necessary configuration for the cluster.
1. Create the Hail cluster.
1. Access JupyterLab and the Spark console running on the cluster.
1. Submit a script to the cluster for Hail to run in batch mode.

## How to run this notebook

Run this notebook cell by cell to create and use a Hail cluster.

## Costs

This notebook takes less than a minute to run, which will typically cost less than $0.01 of compute time on your cloud environment. This estimate does not include the [cost](https://cloud.google.com/dataproc/pricing) of running the Dataproc cluster.

# Setup and Configuration



In [None]:
from datetime import datetime
import os

In [None]:
if not (os.getenv('GOOGLE_CLOUD_PROJECT')
        and os.getenv('GOOGLE_SERVICE_ACCOUNT_EMAIL')
        and os.getenv('TERRA_USER_EMAIL')):
    raise Exception('Expected environment variables are not available. Please let terra-support@verily.com know.')
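
The check above only reports that something is missing. A variant that names the missing variables can make the error easier to act on. This is a sketch. The helper name `missing_env_vars` is hypothetical, and it prints the names instead of raising so that it can be run on its own.

```python
import os

# Environment variables this notebook expects Enterprise Terra to set.
REQUIRED_ENV_VARS = ['GOOGLE_CLOUD_PROJECT', 'GOOGLE_SERVICE_ACCOUNT_EMAIL', 'TERRA_USER_EMAIL']


def missing_env_vars(names=REQUIRED_ENV_VARS):
    """Return the names of required variables that are unset or empty."""
    return [name for name in names if not os.getenv(name)]


missing = missing_env_vars()
if missing:
    print('Missing environment variables: ' + ', '.join(missing))
```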

Obtain the GCP project ID so that it can be used in the name of any buckets we create and also to tell Dataproc where to create the cluster.

In [None]:
PROJECT = os.getenv('GOOGLE_CLOUD_PROJECT')

PROJECT

Obtain the Enterprise Terra user's service account. Notebooks and batch scripts that run on the Dataproc cluster will act as this service account.

In [None]:
SERVICE_ACCOUNT = os.getenv('GOOGLE_SERVICE_ACCOUNT_EMAIL')

SERVICE_ACCOUNT

Obtain the user name so that it can become part of the Hail cluster name and staging bucket. This is useful when people collaborate in Enterprise Terra workspaces and want to differentiate their clusters from each other.

In [None]:
USER = os.getenv('TERRA_USER_EMAIL').split('@')[0].replace('.', '-')

USER
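
The cell above replaces only `.` in the email's local part. If a local part contained other characters (for example `_` or `+`), the result would not be a valid Dataproc cluster name, which allows only lowercase letters, digits, and hyphens. Below is a more defensive sketch using a hypothetical `user_slug` helper. Note that adopting it could change the names of resources this notebook derives from `USER`, such as the staging bucket.

```python
import re


def user_slug(email):
    """Hypothetical helper: turn an email's local part into a lowercase,
    hyphen-separated token for use in cluster and bucket names."""
    local_part = email.split('@')[0].lower()
    # Replace every run of characters other than a-z and 0-9 with one hyphen.
    slug = re.sub(r'[^a-z0-9]+', '-', local_part)
    # Drop leading/trailing hyphens so the token joins cleanly into names.
    return slug.strip('-')


print(user_slug('Jane.Doe@example.com'))
```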

## Check the Enterprise Terra reference to the Dataproc staging bucket

Create the user's Dataproc staging bucket, if it does not yet exist.

In [None]:
!terra resource resolve --name dataproc_staging_{USER} || terra resource create gcs-bucket \
    --name=dataproc_staging_{USER} \
    --bucket-name={PROJECT}-dataproc-staging-{USER} \
    --cloning=COPY_NOTHING \
    --description="Bucket for {USER} Dataproc staging files. See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/staging-bucket"

Resolve the reference to the staging bucket so that we can use it in subsequent commands.

In [None]:
STAGING_BUCKET_CMD_OUTPUT = !terra resolve --name=dataproc_staging_{USER}
STAGING_BUCKET = STAGING_BUCKET_CMD_OUTPUT[0]
STAGING_BUCKET
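
`STAGING_BUCKET_CMD_OUTPUT = !terra resolve ...` is IPython syntax that captures a shell command's output as a list of lines. Outside a notebook (for example, in a batch script), roughly the same thing can be done with `subprocess`. This sketch assumes the `terra` CLI is on `PATH`, and `first_line_of` is a hypothetical helper name.

```python
import subprocess


def first_line_of(cmd):
    """Run a command and return the first line of its standard output.

    Roughly mirrors `OUTPUT = !cmd` followed by `OUTPUT[0]` in IPython.
    """
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.splitlines()[0]


# Assumes the `terra` CLI is installed, as it is in this notebook's environment:
# STAGING_BUCKET = first_line_of(['terra', 'resolve', f'--name=dataproc_staging_{USER}'])
```

One design difference: `check=True` raises `CalledProcessError` when the command fails, whereas the `!` form does not stop on a failed command, so a failed `terra resolve` could leave an error message in `STAGING_BUCKET`.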

## Check the Enterprise Terra reference to the Dataproc temp bucket

<div class="alert alert-block alert-info">
<b>Note:</b> This notebook assumes that <a href="https://github.com/DataBiosphere/terra-axon-examples/blob/main/workspace_setup.ipynb">workspace_setup.ipynb</a>, which is in the parent directory of this notebook, has been run.  
</div>


In [None]:
!terra resource resolve --name ws_files_autodelete_after_two_weeks || echo Be sure to run \
  workspace_setup.ipynb before running this notebook.

If the command above gave an error, run `workspace_setup.ipynb` before continuing. It creates two Cloud Storage buckets for your workspace files, with these workspace reference names:

 - `ws_files`
 - `ws_files_autodelete_after_two_weeks`

This notebook uses the "autodelete" bucket as the Dataproc temp bucket. Any file in this bucket will be automatically deleted two weeks after it is written. This alleviates the need for you to remember to clean up temporary and example files manually.
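
An "autodelete" bucket like this can be implemented with a Cloud Storage lifecycle rule. The sketch below builds such a rule in the lifecycle configuration JSON format, where the `age` condition counts days since an object was created. It is an assumption about how the bucket could be configured, not a copy of what `workspace_setup.ipynb` does. A file in this shape could be applied with `gsutil lifecycle set`, but the workspace bucket is already set up, so this is for illustration only.

```python
import json

AUTODELETE_AFTER_DAYS = 14  # "two weeks"

# Cloud Storage lifecycle configuration: delete each object once it is
# AUTODELETE_AFTER_DAYS days old.
lifecycle_config = {
    'rule': [
        {
            'action': {'type': 'Delete'},
            'condition': {'age': AUTODELETE_AFTER_DAYS},
        }
    ]
}

print(json.dumps(lifecycle_config, indent=2))
```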

Resolve the reference to the temp bucket so that we can use it in subsequent commands.

In [None]:
TEMP_BUCKET_CMD_OUTPUT = !terra resolve --name=ws_files_autodelete_after_two_weeks
TEMP_BUCKET = TEMP_BUCKET_CMD_OUTPUT[0]
TEMP_BUCKET

## Define an autoscaling policy

Configure Dataproc [autoscaling](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling) to automatically and dynamically scale the number of worker VMs in Dataproc clusters to meet workload demands.

You may want to define several autoscaling policies, because different jobs may run best with different numbers of primary workers, which are not preempted.

In [None]:
%%writefile two_worker_autoscaling_policy.yaml

workerConfig:
  # Best practice: keep min and max values identical for primary workers
  # https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#avoid_scaling_primary_workers
  minInstances: 2
  maxInstances: 2
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h
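
To get a feel for what `scaleUpFactor` and `scaleDownFactor` do, here is a simplified sketch of the basic algorithm as described in the Dataproc autoscaling documentation: the recommended change is the difference between pending and available YARN memory, divided by the memory per worker, multiplied by the scale factor, and rounded. The sketch uses a single snapshot instead of an average over the cooldown period, and it ignores the minimum-worker-fraction thresholds and graceful decommissioning, so treat it as an approximation. The function names are hypothetical. The defaults come from the policy above, and 14592 MB per worker is the YARN memory configured for the cluster later in this notebook.

```python
def recommended_worker_delta(pending_mb, available_mb, memory_per_worker_mb,
                             scale_up_factor=0.05, scale_down_factor=1.0):
    """Simplified estimate of the worker-count change that Dataproc's basic
    autoscaling algorithm recommends for one evaluation.

    Positive values mean "add workers"; negative values mean "remove workers".
    """
    exact_delta = (pending_mb - available_mb) / memory_per_worker_mb
    factor = scale_up_factor if exact_delta > 0 else scale_down_factor
    # Python's round() rounds exact halves to even, which may differ from Dataproc.
    return round(exact_delta * factor)


def new_secondary_worker_count(current, delta, min_instances=0, max_instances=50):
    """Apply a recommended change, clamped to secondaryWorkerConfig's bounds."""
    return max(min_instances, min(max_instances, current + delta))


# 200,000 MB of pending memory is about 13.7 workers' worth at 14592 MB each.
print(recommended_worker_delta(200_000, 0, 14_592))  # 1
# Two idle workers' worth of available memory and nothing pending.
print(recommended_worker_delta(0, 29_184, 14_592))   # -2
```

With `scaleUpFactor: 0.05`, a large backlog adds workers gradually, one evaluation at a time, while `scaleDownFactor: 1.0` removes idle capacity in full. Because the policy pins primary workers at 2, these changes apply only to secondary workers.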

Import the autoscaling policy, if it does not yet exist.

In [None]:
!gcloud dataproc autoscaling-policies describe two_worker_autoscaling_policy --region=us-central1 || \
    gcloud dataproc autoscaling-policies import two_worker_autoscaling_policy \
        --source=two_worker_autoscaling_policy.yaml \
        --region=us-central1

## Define the Hail initialization script

This script is based on the logic in [hailctl](https://hail.is/docs/0.2/cloud/google_cloud.html#hailctl-dataproc) version 0.2.108.

In [None]:
%%writefile init_tvc_hail_cluster.py
#!/opt/conda/default/bin/python3

# This script is based on gs://hail-common/hailctl/dataproc/0.2.108/init_notebook.py with all 
# JupyterLab configuration removed.

import json
import os
import subprocess as sp
import sys
import errno
from subprocess import check_output

assert sys.version_info > (3, 0), sys.version_info


def safe_call(*args, **kwargs):
    try:
        sp.check_output(args, stderr=sp.STDOUT, **kwargs)
    except sp.CalledProcessError as e:
        print(e.output.decode())
        raise e


def get_metadata(key):
    return check_output(['/usr/share/google/get_metadata_value', 'attributes/{}'.format(key)]).decode()


def mkdir_if_not_exists(path):
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


# get role of machine (master or worker)
role = get_metadata('dataproc-role')

if role == 'Master':
    # Additional packages to install.
    # None of these packages should be related to Jupyter or JupyterLab because
    # Dataproc Component Gateway takes care of properly setting up those services.
    pip_pkgs = [
        'setuptools',
        'mkl<2020',
        'lxml<5',
        'https://github.com/hail-is/jgscm/archive/v0.1.12+hail.zip',
    ]

    # add user-requested packages
    try:
        user_pkgs = get_metadata('PKGS')
    except Exception:
        pass
    else:
        pip_pkgs.extend(user_pkgs.split('|'))

    print('pip packages are {}'.format(pip_pkgs))
    command = ['pip', 'install']
    command.extend(pip_pkgs)
    safe_call(*command)

    print('getting metadata')

    wheel_path = get_metadata('WHEEL')
    wheel_name = wheel_path.split('/')[-1]

    print('copying wheel')
    safe_call('gsutil', 'cp', wheel_path, f'/home/hail/{wheel_name}')

    safe_call('pip', 'install', '--no-dependencies', f'/home/hail/{wheel_name}')

    print('setting environment')

    spark_lib_base = '/usr/lib/spark/python/lib/'
    files_to_add = [os.path.join(spark_lib_base, x) for x in os.listdir(spark_lib_base) if x.endswith('.zip')]

    env_to_set = {
        'PYTHONHASHSEED': '0',
        'PYTHONPATH': ':'.join(files_to_add),
        'SPARK_HOME': '/usr/lib/spark/',
        'PYSPARK_PYTHON': '/opt/conda/default/bin/python',
        'PYSPARK_DRIVER_PYTHON': '/opt/conda/default/bin/python',
        'HAIL_LOG_DIR': '/home/hail',
        'HAIL_DATAPROC': '1',
    }

    # VEP ENV
    try:
        vep_config_uri = get_metadata('VEP_CONFIG_URI')
    except Exception:
        pass
    else:
        env_to_set["VEP_CONFIG_URI"] = vep_config_uri

    print('setting environment')

    for e, value in env_to_set.items():
        safe_call('/bin/sh', '-c',
                  'set -ex; echo "export {}={}" | tee -a /etc/environment /usr/lib/spark/conf/spark-env.sh'.format(e, value))

    hail_jar = sp.check_output([
        '/bin/sh', '-c',
        'set -ex; python3 -m pip show hail | grep Location | sed "s/Location: //"'
    ]).decode('ascii').strip() + '/hail/backend/hail-all-spark.jar'

    conf_to_set = [
        'spark.executorEnv.PYTHONHASHSEED=0',
        'spark.app.name=Hail',
        # the below are necessary to make 'submit' work
        'spark.jars={}'.format(hail_jar),
        'spark.driver.extraClassPath={}'.format(hail_jar),
        'spark.executor.extraClassPath=./hail-all-spark.jar',
    ]

    print('setting spark-defaults.conf')

    with open('/etc/spark/conf/spark-defaults.conf', 'a') as out:
        out.write('\n')
        for c in conf_to_set:
            out.write(c)
            out.write('\n')

In [None]:
!gsutil cp init_tvc_hail_cluster.py {STAGING_BUCKET}/hail/dataproc/0.2.108/

## Prepare and copy the example notebook to the staging bucket

These steps will not be necessary in the future.

Edit the output bucket in the example notebook (temporary work-around):

In [None]:
# TEMPORARY: edit the output bucket in the notebook that will be copied to the cluster.
# This is needed because the Dataproc cluster does not currently have the Terra CLI
# installed to resolve references. This will not be necessary in the future.
!cp annotate_significant_gwas_results_with_gnomad.ipynb annotate_significant_gwas_results_with_gnomad_ORIG.ipynb

with open('annotate_significant_gwas_results_with_gnomad_ORIG.ipynb', 'rt') as fin:
    with open('annotate_significant_gwas_results_with_gnomad.ipynb', 'wt') as fout:
        for line in fin:
            fout.write(line.replace("os.getenv('WORKSPACE_BUCKET')",
                                    f"'{STAGING_BUCKET}'"))
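
The cell above edits the notebook as raw text, which also rewrites any matching text in markdown cells or saved outputs. Because an `.ipynb` file is JSON, a slightly more targeted alternative is to parse it and change only code cells. This is a sketch with a hypothetical helper name, not a step the notebook requires.

```python
import json


def replace_in_code_cells(notebook, old, new):
    """Replace `old` with `new` in the source of every code cell of a parsed
    .ipynb notebook (a dict). Markdown cells and outputs are left untouched."""
    for cell in notebook.get('cells', []):
        if cell.get('cell_type') != 'code':
            continue
        source = cell.get('source', '')
        # In the .ipynb format, `source` may be a single string or a list of lines.
        if isinstance(source, str):
            cell['source'] = source.replace(old, new)
        else:
            cell['source'] = [line.replace(old, new) for line in source]
    return notebook


# Usage with the file names from the cell above:
# with open('annotate_significant_gwas_results_with_gnomad_ORIG.ipynb') as fin:
#     nb = json.load(fin)
# replace_in_code_cells(nb, "os.getenv('WORKSPACE_BUCKET')", f"'{STAGING_BUCKET}'")
# with open('annotate_significant_gwas_results_with_gnomad.ipynb', 'w') as fout:
#     json.dump(nb, fout, indent=1)
```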

Copy the notebook to the staging bucket (temporary work-around):

In [None]:
# TEMPORARY: copy the notebook from the git clone. Do this since the Dataproc cluster does 
# not currently clone the git repos referenced in this workspace.
!gsutil cp annotate_significant_gwas_results_with_gnomad.ipynb {STAGING_BUCKET}/notebooks/jupyter/

# Create a Dataproc cluster where we will run Hail

The values within `--properties` and `--metadata` in the command below are based on those generated by [hailctl dataproc](https://hail.is/docs/0.2/cloud/google_cloud.html#hailctl-dataproc), with two extra packages added to support the gnomAD notebook: `plotnine` and `openpyxl`.
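
Both `--properties` and `--metadata` take `KEY=VALUE` pairs that gcloud normally separates with commas. Because values such as `aiohttp>=3.8.1,<4` contain commas, the command uses gcloud's alternate-delimiter syntax: the `^|||^` prefix tells gcloud to split pairs on `|||` instead (see `gcloud topic escaping`). Inside `PKGS`, packages are joined with a single `|`, which the initialization script splits apart again. The sketch below builds such a value. The helper names are hypothetical, and the parser is a simplified imitation for checking the round trip, not gcloud's own parser.

```python
def gcloud_dict_arg(pairs, delimiter='|||'):
    """Join KEY=VALUE pairs into one gcloud flag value with a custom delimiter.

    The leading ^DELIM^ tells gcloud to split on DELIM instead of commas,
    so values may themselves contain commas.
    """
    return f'^{delimiter}^' + delimiter.join(f'{k}={v}' for k, v in pairs.items())


def parse_gcloud_dict_arg(value):
    """Simplified inverse of gcloud_dict_arg (illustration only, not gcloud's parser)."""
    delimiter, _, body = value[1:].partition('^')
    return dict(pair.split('=', 1) for pair in body.split(delimiter))


# A shortened, illustrative subset of the packages passed to the cluster.
pkgs = ['openpyxl==3.0.10', 'plotnine==0.10.1', 'aiohttp>=3.8.1,<4']
metadata = gcloud_dict_arg({
    'WHEEL': 'gs://hail-common/hailctl/dataproc/0.2.105/hail-0.2.105-py3-none-any.whl',
    'PKGS': '|'.join(pkgs),
})
print(metadata)

# The initialization script recovers the package list with split('|').
print(parse_gcloud_dict_arg(metadata)['PKGS'].split('|'))
```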

In [None]:
HAIL_CLUSTER_NAME = '-'.join(['hail', USER, datetime.now().strftime('%Y%m%d')])

HAIL_CLUSTER_NAME
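
Dataproc cluster names must start with a lowercase letter, contain only lowercase letters, digits, and hyphens, be at most 52 characters long, and not end with a hyphen. This is our reading of the Dataproc naming rule, so check the current documentation. A quick check like the following, with a hypothetical helper name, can catch an invalid `HAIL_CLUSTER_NAME` before the create command fails.

```python
import re

# Assumed rule: a lowercase letter, then up to 51 lowercase letters, digits,
# or hyphens, not ending with a hyphen.
CLUSTER_NAME_PATTERN = re.compile(r'[a-z]([a-z0-9-]{0,50}[a-z0-9])?')


def is_valid_cluster_name(name):
    """Return True if `name` matches the assumed Dataproc cluster naming rule."""
    return CLUSTER_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_cluster_name('hail-jane-doe-20230101'))
```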

Run the command to create the cluster. You can modify many of these options; see the [`gcloud dataproc clusters create` reference](https://cloud.google.com/sdk/gcloud/reference/dataproc/clusters/create) for details.

However, on Enterprise Terra you must keep the `--service-account`, `--subnet`, `--tags`, and `--enable-component-gateway` flags as they are, and keep `JUPYTER` in `--optional-components`.

Also note that the Hail-related values in the `--properties` and `--metadata` strings come from the `hailctl` setup described above, and the memory settings are sized for the `n1-standard-4` machine type. They may not be compatible with changes to other options, such as the machine types.
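
The memory values in `--properties` line up with simple fractions of the 15 GB of memory on an `n1-standard-4` machine. YARN gets 95% (14592 MB). The executor heap and overhead split that roughly 40/60 (5837 MB and 8755 MB). Hail's off-heap memory per core is the YARN memory divided by 4 cores (3648 MB), and the driver gets 80% of the master (12 GB). These ratios are inferred from the numbers, not taken from hailctl's source, so treat the sketch as a rough guide for estimating values if you change machine types.

```python
# n1-standard-4: 4 vCPUs and 15 GB of memory, expressed in MB as 15 * 1024.
WORKER_CORES = 4
WORKER_MEMORY_MB = 15 * 1024  # 15360
MASTER_MEMORY_MB = 15 * 1024  # the master is also n1-standard-4


def round_div(numerator, denominator):
    """Integer division rounded to the nearest integer (halves round up)."""
    return (2 * numerator + denominator) // (2 * denominator)


# Ratios inferred from the values in the cluster creation command.
yarn_memory_mb = WORKER_MEMORY_MB * 95 // 100             # 95% of the worker
executor_memory_mb = round_div(yarn_memory_mb * 4, 10)    # ~40% of YARN memory
executor_overhead_mb = round_div(yarn_memory_mb * 6, 10)  # ~60% of YARN memory
off_heap_per_core_mb = yarn_memory_mb // WORKER_CORES     # YARN memory per core
driver_memory_gb = MASTER_MEMORY_MB * 8 // 10 // 1024     # 80% of the master

print(f'yarn:yarn.nodemanager.resource.memory-mb={yarn_memory_mb}')                    # 14592
print(f'spark:spark.executor.memory={executor_memory_mb}m')                            # 5837m
print(f'spark:spark.executor.memoryOverhead={executor_overhead_mb}m')                  # 8755m
print(f'HAIL_WORKER_OFF_HEAP_MEMORY_PER_CORE_MB={off_heap_per_core_mb}')               # 3648
print(f'spark:spark.driver.memory={driver_memory_gb}g')                                # 12g
```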

In [None]:
!gcloud dataproc clusters create {HAIL_CLUSTER_NAME} \
    --region us-central1 \
    --image-version=2.0.44-debian10 \
    --properties='^|||^spark:spark.task.maxFailures=20|||spark:spark.driver.extraJavaOptions=-Xss4M|||spark:spark.executor.extraJavaOptions=-Xss4M|||spark:spark.speculation=true|||hdfs:dfs.replication=1|||dataproc:dataproc.logging.stackdriver.enable=false|||dataproc:dataproc.monitoring.stackdriver.enable=false|||spark:spark.driver.memory=12g|||yarn:yarn.nodemanager.resource.memory-mb=14592|||yarn:yarn.scheduler.maximum-allocation-mb=14592|||spark:spark.executor.cores=4|||spark:spark.executor.memory=5837m|||spark:spark.executor.memoryOverhead=8755m|||spark:spark.memory.storageFraction=0.2|||spark:spark.executorEnv.HAIL_WORKER_OFF_HEAP_MEMORY_PER_CORE_MB=3648' \
    --initialization-actions={STAGING_BUCKET}/hail/dataproc/0.2.108/init_tvc_hail_cluster.py \
    --metadata='^|||^WHEEL=gs://hail-common/hailctl/dataproc/0.2.105/hail-0.2.105-py3-none-any.whl|||PKGS=openpyxl==3.0.10|plotnine==0.10.1|aiohttp>=3.8.1,<4|aiohttp_session>=2.7,<2.8|asyncinit>=0.2.4,<0.3|avro>=1.10,<1.12|azure-identity>=1.6.0,<2|azure-storage-blob>=12.11.0,<13|bokeh>1.3,<2.0|boto3>=1.17,<2.0|botocore>=1.20,<2.0|decorator<5|Deprecated>=1.2.10,<1.3|dill>=0.3.1.1,<0.4|google-auth>=1.27.0,<2|frozenlist>=1.3.1,<2|google-cloud-storage==1.25.*|humanize>=1.0.0,<2|hurry.filesize>=0.9,<1|janus>=0.6,<1.1|Jinja2==3.0.3|nest_asyncio>=1.5.4,<2|numpy<2|orjson>=3.6.4,<4|pandas>=1.3.0,<1.5.0|parsimonious<0.9|plotly>=5.5.0,<5.11|protobuf==3.20.2|PyJWT|rich==12.6.0|python-json-logger>=2.0.2,<3|requests>=2.25.1,<3|scipy>1.2,<1.10|sortedcontainers>=2.4.0,<3|tabulate>=0.8.9,<1|uvloop>=0.16.0,<1' \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-size 100 \
    --autoscaling-policy two_worker_autoscaling_policy \
    --num-workers 2 \
    --worker-machine-type n1-standard-4 \
    --worker-boot-disk-size 100 \
    --num-secondary-workers 0 \
    --secondary-worker-machine-type n1-standard-4 \
    --secondary-worker-boot-disk-size 100 \
    --enable-component-gateway \
    --optional-components JUPYTER \
    --service-account {SERVICE_ACCOUNT} \
    --subnet projects/{PROJECT}/regions/us-central1/subnetworks/subnetwork \
    --tags leonardo \
    --bucket {STAGING_BUCKET[len('gs://'):]} \
    --temp-bucket {TEMP_BUCKET[len('gs://'):]} \
    --max-idle 30m

The cluster will take a few minutes to start up.

## Access JupyterLab and the debugging consoles

You can use the URL printed by the next cell to access JupyterLab running on the cluster. The cell also prints URLs for debugging consoles such as the Spark console.

Alternatively, to obtain these URLs from the Cloud Console:
* Go to the Cloud Console -> Dataproc -> Clusters
* Select the cluster on which you want to run the notebook
* Click the **WEB INTERFACES** tab
* Click **JupyterLab**

Lastly, CPU utilization, memory utilization, and other performance metrics for the cluster are available in the Cloud Console. Click the cluster name to see plots of these metrics.

In [None]:
!gcloud dataproc clusters describe {HAIL_CLUSTER_NAME} --region=us-central1 \
  --format="yaml(config.endpointConfig.httpPorts)"
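
The cell above prints the URLs as YAML. To use them programmatically, you can request JSON and read the same `config.endpointConfig.httpPorts` field. This is a sketch that assumes `gcloud` is on `PATH`. The helper names are hypothetical, and the `JupyterLab` key is assumed to match the name shown in the cell's output.

```python
import json
import subprocess


def describe_cluster_json(cluster_name, region='us-central1'):
    """Return `gcloud dataproc clusters describe` output as a JSON string."""
    result = subprocess.run(
        ['gcloud', 'dataproc', 'clusters', 'describe', cluster_name,
         f'--region={region}', '--format=json'],
        capture_output=True, text=True, check=True)
    return result.stdout


def component_gateway_urls(describe_json):
    """Extract the {interface name: URL} mapping from a cluster description."""
    cluster = json.loads(describe_json)
    return cluster.get('config', {}).get('endpointConfig', {}).get('httpPorts', {})


# Example usage in this notebook (requires gcloud and the running cluster):
# urls = component_gateway_urls(describe_cluster_json(HAIL_CLUSTER_NAME))
# print(urls.get('JupyterLab'))
```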

# Use Hail on the cluster

## Submit a script to Hail to run

<div class="alert alert-block alert-info">
<b>Note:</b> This section uses an Enterprise Terra notebook to run a Hail batch job, but Enterprise Terra users can also do this from the terminal or using the Cloud Console UI.
</div>

In [None]:
!jupyter nbconvert --to script annotate_significant_gwas_results_with_gnomad.ipynb

In [None]:
!gcloud dataproc jobs submit pyspark --cluster {HAIL_CLUSTER_NAME} --region us-central1 \
    annotate_significant_gwas_results_with_gnomad.py

Once the job is running (or after it has finished), you can view the [cluster dashboard](https://console.cloud.google.com/dataproc) (click a cluster name to see its details) and the [job info](https://console.cloud.google.com/dataproc/jobs), including job logs, in the Google Cloud console.

## Use Hail interactively on the cluster's JupyterLab server

In the output of the section "Access JupyterLab and the debugging consoles" above, click the **JupyterLab** link. 
You can also find this link under the **WEB INTERFACES** tab on your cluster's details page in the [Cloud Console](https://console.cloud.google.com/dataproc).

Open the `annotate_significant_gwas_results_with_gnomad.ipynb` notebook, which is available on the cluster's JupyterLab server because we copied it to `{STAGING_BUCKET}/notebooks/jupyter/`. 

In the notebook, you may want to try setting the `INTERVALS_TO_EXAMINE` constant to `['chr1-chr22']` to run at scale. This should trigger the cluster's *autoscaling*.


# Stop or delete your cluster when you are finished

You can use the cells below to stop or delete the cluster. Otherwise, the cluster is deleted automatically after `max-idle` minutes of inactivity (30 minutes, as set by `--max-idle 30m` in the cluster creation command above).

<div class="alert alert-block alert-info">
<b>Note:</b> If autoscaling has been initiated, it may not be possible to `STOP` the cluster, only to `DELETE` it.<br/>   
    Even if your cluster has been stopped, it will still delete itself after `max-idle` minutes of inactivity.
</div>

Alternatively, to stop or delete the cluster from the Cloud Console:
* Go to the Cloud Console -> Dataproc -> Clusters
* Select the cluster you want to stop or delete
* Click 'Stop' or 'Delete'

In [None]:
# Uncomment this command to STOP your cluster
# !gcloud dataproc clusters stop {HAIL_CLUSTER_NAME} --region=us-central1

In [None]:
# Uncomment this command to DELETE your cluster

#!gcloud dataproc clusters delete {HAIL_CLUSTER_NAME} --region=us-central1 --quiet

# Provenance

Generate information about this notebook environment and the packages installed.

In [None]:
!date

Conda and pip installed packages:

In [None]:
!conda env export

JupyterLab extensions:

In [None]:
!jupyter labextension list

Number of cores:

In [None]:
!grep ^processor /proc/cpuinfo | wc -l

Memory:

In [None]:
!grep "^MemTotal:" /proc/meminfo
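
The shell commands above can also be expressed in Python, which may be handy in a batch script. `os.cpu_count()` returns the number of logical CPUs, which corresponds to the `processor` entries counted above. The `/proc/meminfo` parsing is Linux-specific, and `mem_total_kb` is a hypothetical helper name.

```python
import os


def mem_total_kb(meminfo_text):
    """Return the MemTotal value, in kB, from the contents of /proc/meminfo."""
    for line in meminfo_text.splitlines():
        # Lines look like "MemTotal:       <number> kB".
        if line.startswith('MemTotal:'):
            return int(line.split()[1])
    raise ValueError('MemTotal not found in /proc/meminfo contents')


print('Number of logical CPUs:', os.cpu_count())
if os.path.exists('/proc/meminfo'):  # Linux only
    with open('/proc/meminfo') as meminfo:
        print('MemTotal (kB):', mem_total_kb(meminfo.read()))
```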

---
Copyright 2023 Verily Life Sciences LLC

Use of this source code is governed by a BSD-style   
license that can be found in the LICENSE file or at   
https://developers.google.com/open-source/licenses/bsd