In [None]:
!pip3 install wds-client --upgrade
import os
import requests
import sys
import wds_client
import json
import pandas as pd
import io
import re

In [None]:
# Set authentication credentials
# Log in through the Azure CLI using the identity attached to this machine
# (`--identity`); `--allow-no-subscriptions` lets the login succeed without a subscription.
!az login --identity --allow-no-subscriptions
# IPython captures the shell output as a list of lines; jq prints the
# accessToken field of the CLI's JSON reply (still wrapped in double quotes).
cli_token = !az account get-access-token | jq .accessToken
# Use the first output line and remove the double quotes to get the bare token.
azure_token = cli_token[0].replace('"', '')

In [None]:
# Workspace settings used by the WDS client calls in later cells.
# The workspace id is read from the WORKSPACE_ID environment variable.
current_workspaceId = os.environ['WORKSPACE_ID']
# Version string passed to the WDS client calls.
version = "v0.2"
# Paging/sort parameters for a record search.
search_request = dict(offset=0, limit=10, sort="asc")

In [None]:
def get_wds_url(workspaceId, env):
    """Fetch the Leonardo app listing for a workspace.

    Sends a GET request, authenticated with the notebook-level
    ``azure_token``, to the Leonardo ``apps/v2`` endpoint of the given
    environment.

    Args:
        workspaceId: Workspace id placed in the request path.
        env: Environment name inserted into the Leonardo host name
            (``leonardo.dsde-{env}.broadinstitute.org``).

    Returns:
        The JSON-decoded response body when the reply status is 200;
        otherwise the raw response text.
    """
    endpoint = f"https://leonardo.dsde-{env}.broadinstitute.org/api/apps/v2/{workspaceId}?includeDeleted=false"
    request_headers = {
        "Authorization": "Bearer " + azure_token,
        "accept": "application/json",
    }

    reply = requests.get(endpoint, headers=request_headers)

    if reply.status_code == 200:
        print("Successfully retrieved details.")
        return json.loads(reply.text)

    # Any other status: hand the raw body back to the caller unchanged.
    return reply.text

# The response lists every app in the workspace together with its proxy URLs.
response = get_wds_url(current_workspaceId, "prod")

# Start from None so the check below works even when no WDS app is found
# (previously `wds_url` was undefined in that case and raised NameError).
wds_url = None

# get_wds_url returns the raw error text (a str) for a non-200 reply;
# only a decoded JSON list can be searched for the WDS app.
if isinstance(response, list):
    for entries in response:
        proxy_urls = entries.get('proxyUrls') or {}
        if entries.get('appType') == 'WDS' and proxy_urls.get('wds') is not None:
            wds_url = proxy_urls['wds']
            break
else:
    print(f"Could not list workspace apps: {response}")

if wds_url is None:
    print("WDS is missing in current workspace")
else:
    print(wds_url)

In [None]:
# Shared API client: sends the Azure bearer token in the Authorization header
# and targets the WDS proxy URL discovered above.
api_client = wds_client.ApiClient(
    header_value="Bearer " + azure_token,
    header_name='Authorization',
)
api_client.configuration.host = wds_url

# Typed clients, all built on the shared api_client:
# - RecordsApi: interact with the records of a data table
records_client = wds_client.RecordsApi(api_client)
# - GeneralWDSInformationApi: version and status of the WDS service running in the workspace
generalInfo_client = wds_client.GeneralWDSInformationApi(api_client)
# - SchemaApi: schema information about a data table
schema_client = wds_client.SchemaApi(api_client)
# - InstancesApi: check how many WDS instances are active in the workspace
instance_client = wds_client.InstancesApi(api_client)

In [None]:
# List every record type in the workspace with its record count
workspace_ent_type = schema_client.describe_all_record_types(current_workspaceId, version)
for record_type in workspace_ent_type:
    print("name:", record_type.name, "count:", record_type.count)

# Same summary, but for the single 'sample' record type
ent_types = schema_client.describe_record_type(current_workspaceId, version, 'sample')
print("name:", ent_types.name, "count:", ent_types.count)

# Fetch the 'sample' records as TSV and show what the client returned
records = records_client.get_records_as_tsv(current_workspaceId, version, 'sample')
print(records)

In [None]:
# Read the data table records fetched above into a pandas DataFrame.
# `records` is whatever get_records_as_tsv returned; it is parsed as
# tab-separated text. (Nothing is displayed here; use samples.head() to preview.)
samples = pd.read_csv(records, sep='\t')

def get_sample_name(i):
    """Return the ``sample_id`` at 1-based position ``i`` of ``samples``.

    Args:
        i: 1-based row position in the notebook-level ``samples`` DataFrame.

    Returns:
        The value of the ``sample_id`` column at row ``i - 1``.

    Raises:
        IndexError: if ``i`` is smaller than 1 or past the last row.
    """
    if i < 1:
        # i - 1 would be negative and silently index from the end of the array,
        # returning the wrong sample instead of failing.
        raise IndexError(f"sample index must be >= 1, got {i}")
    return samples['sample_id'].to_numpy()[i - 1]

In [None]:
# Leonardo endpoint settings used by the CBAS helpers below.
DOMAIN = 'dsde-prod.broadinstitute.org'
LEO_BASE_URL = 'https://leonardo.' + DOMAIN + '/api/apps/v2'
# Workspace id is read from the WORKSPACE_ID environment variable.
WORKSPACE_ID = os.environ['WORKSPACE_ID']

def get_token():
    """Return the Azure access token used for the CBAS/Leonardo requests.

    The returned value is a placeholder that must be replaced by hand:
    copy the token from the browser devtools
    (disks?role... request -> Headers -> "Authorization: Bearer <token>").
    """
    token = "fill_this_in"
    return token

def headers():
    """Build the HTTP headers sent with every Leonardo/CBAS request.

    Returns:
        A dict with a bearer ``Authorization`` header built from
        ``get_token()`` and an ``Accept`` header requesting JSON.
    """
    bearer = f"Bearer {get_token()}"
    request_headers = {"Authorization": bearer}
    request_headers["Accept"] = "application/json"
    return request_headers

def get_cbas_url():
    """Look up the CBAS proxy URL of the current workspace.

    Lists the workspace's apps through the Leonardo ``apps/v2`` endpoint
    and returns the ``cbas`` proxy URL of the first app whose ``appType``
    is ``WORKFLOWS_APP``.

    Returns:
        The CBAS proxy URL as returned by Leonardo.

    Raises:
        Exception: if Leonardo answers with a non-200 status (the message
            is the response text), or if the listing contains no
            ``WORKFLOWS_APP`` entry.
    """
    uri = f"{LEO_BASE_URL}/{WORKSPACE_ID}?includeDeleted=false"
    response = requests.get(uri, headers=headers())
    if response.status_code != 200:
        # Fail here instead of returning the error text, which the caller
        # would otherwise use as if it were a URL.
        raise Exception(response.text)
    for item in response.json():
        if item['appType'] == 'WORKFLOWS_APP':
            return item['proxyUrls']['cbas']
    # The listing is a JSON array, so there is no top-level 'proxyUrls' to
    # fall back on; report the missing app explicitly.
    raise Exception(f"No WORKFLOWS_APP found in workspace {WORKSPACE_ID}")

# (workflow input name, primitive type, record attribute) for each workflow
# input, in the order they appear in the request body.
_REBLOCK_INPUTS = (
    ("ReblockGVCF.gvcf", "File", "exome_gvcf_path"),
    ("ReblockGVCF.gvcf_index", "File", "exome_gvcf_index_path"),
    ("ReblockGVCF.ref_dict", "File", "ref_dict"),
    ("ReblockGVCF.ref_fasta", "File", "ref_fasta"),
    ("ReblockGVCF.ref_fasta_index", "File", "ref_fasta_index"),
    ("ReblockGVCF.annotations_to_remove_command", "String", "annotations_to_remove_command"),
    ("ReblockGVCF.calling_interval_list", "File", "calling_interval_list"),
    # NOTE(review): "gvcf_fiile_extension" (double "i") is kept exactly as it
    # was; confirm it matches the attribute name in the data table.
    ("ReblockGVCF.gvcf_file_extension", "String", "gvcf_fiile_extension"),
    ("ReblockGVCF.move_filters_to_genotypes", "Boolean", "move_filters_to_genotypes"),
)

# (workflow output name, primitive type, record attribute) for each output.
_REBLOCK_OUTPUTS = (
    ("ReblockGVCF.output_vcf", "File", "output_vcf"),
    ("ReblockGVCF.output_vcf_index", "File", "output_vcf_index"),
)


def _primitive_type(type_name):
    """Return the type descriptor for a primitive of the given name."""
    return {"type": "primitive", "primitive_type": type_name}


def _input_definition(name, type_name, attribute):
    """Return one input definition that reads its value from a record attribute."""
    return {
        "input_name": name,
        "input_type": _primitive_type(type_name),
        "source": {"type": "record_lookup", "record_attribute": attribute},
    }


def _output_definition(name, type_name, attribute):
    """Return one output definition that writes its value to a record attribute."""
    return {
        "output_name": name,
        "output_type": _primitive_type(type_name),
        "destination": {"type": "record_update", "record_attribute": attribute},
    }


def get_post_data(start, end):
    """Build the run-set request body for one batch of samples.

    Args:
        start: First 1-based sample position in the batch (inclusive).
        end: Position one past the last sample of the batch (exclusive);
            ids are collected for ``range(start, end)``.

    Returns:
        A dict holding the run-set name (``Reblocking_{start}-{end}``),
        the method version id, the input and output definitions, the
        ``sample`` record ids resolved through ``get_sample_name``, and
        call caching enabled.
    """
    sample_ids = [get_sample_name(position) for position in range(start, end)]
    return {
        "run_set_name": f"Reblocking_{start}-{end}",
        "run_set_description": "",
        "method_version_id": "d46cf9c7-fd9d-481a-8419-aaaa6d6b8b24",
        "workflow_input_definitions": [
            _input_definition(*spec) for spec in _REBLOCK_INPUTS
        ],
        "workflow_output_definitions": [
            _output_definition(*spec) for spec in _REBLOCK_OUTPUTS
        ],
        "wds_records": {
            "record_type": "sample",
            "record_ids": sample_ids,
        },
        "call_caching_enabled": True,
    }

def submit_to_cbas(cbas_url, body):
    """POST a run-set request body to CBAS.

    Args:
        cbas_url: Base URL of the CBAS service.
        body: Request payload, sent as JSON.

    Returns:
        The JSON-decoded response body when the reply status is 200.

    Raises:
        Exception: for any other status; the message is the response text.
    """
    reply = requests.post(
        f"{cbas_url}/api/batch/v1/run_sets",
        json=body,
        headers=headers(),
    )
    if reply.status_code == 200:
        return reply.json()
    raise Exception(reply.text)
    

In [None]:
# Submission range for the loop in the next cell.
# Sample positions are 1-based (index starts at 1).
# 20685 total samples
# Number of samples submitted per run set
batch = 100
# First sample position to submit
start = 1
# Last sample position to submit (the loop runs while i <= end)
end = 20685

In [None]:
# Submit the samples start..end (inclusive) to CBAS, `batch` samples per run set.
cbas_url = get_cbas_url()
i = start
while i <= end:
    # get_post_data takes an exclusive upper bound. Clamp it to end + 1 so the
    # final, shorter batch does not ask for sample positions beyond `end`
    # (which raised IndexError once the positions ran past the table).
    batch_stop = min(i + batch, end + 1)
    print(f"Submitting batch {i}..{batch_stop - 1}")
    data = get_post_data(i, batch_stop)
    submit_to_cbas(cbas_url, data)
    i = i + batch
print("Done")
    