# Use the `metadata-api` library to prepare inputs for pipelines in Green Box

## 1. SmartSeq2 Pipeline

1. Running the SmartSeq2 pipeline requires a `sample_id` and two FASTQ files:
    - The `sample_id` must be a valid UUID string.
    - So that Cromwell can localize the input files at runtime, the WDL accepts each file as a URL (string) pointing to a GCS bucket.

2. **Note** that although `metadata-api` provides a handy multi-threaded `dss_client` for downloading files quickly, Green Box cannot switch to it yet. During communication with Blue Box, Green Box logs and records certain steps, and its internal retry logic depends on those records. In the long term, Green Box will migrate to the DSS Python API `dss_client`; for now, it keeps using the functions in `pipeline-tools` to talk to the DSS API directly.
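
The two input requirements above can be checked before a workflow is submitted. The following is a minimal sketch using only the standard library; `is_valid_uuid` and `is_gcs_url` are hypothetical helper names introduced for illustration and are not part of `metadata-api` or `pipeline-tools`.

```python
import uuid


def is_valid_uuid(value):
    """Return True if `value` is a UUID string in canonical hyphenated form."""
    try:
        return str(uuid.UUID(value)) == value.lower()
    except (ValueError, AttributeError, TypeError):
        return False


def is_gcs_url(url):
    """Return True if `url` looks like a GCS object URL (gs://bucket/object)."""
    if not url.startswith('gs://'):
        return False
    # Split "bucket/object" and require both parts to be non-empty.
    bucket, _, obj = url[len('gs://'):].partition('/')
    return bool(bucket) and bool(obj)
```

These checks only look at the shape of the strings; they do not confirm that the sample exists or that the object is readable from the bucket.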

### 1.1 Prepare testing data

In [1]:
test_ss2_fastq1_suffix = '.c7bbee4c46bbf29432862e05830c8f39.4ef74578'
test_ss2_fastq2_suffix = '.a3a9f23d07cfc5e40a4c3a8adf3903ae.69987b3e'
test_ss2_sample_id = 'f89a7a2e-a789-495c-bf37-11e82757cc82'

### 1.2 Import libraries

In [2]:
from humancellatlas.data.metadata.helpers.dss import download_bundle_metadata, dss_client
from humancellatlas.data.metadata.api import Bundle
from humancellatlas.data.metadata.helpers.json import as_json

from concurrent.futures import ThreadPoolExecutor
import functools
import logging
import json

from pipeline_tools import input_utils, dcp_utils
from pipeline_tools.http_requests import HttpRequests

### 1.3 Define bundle data

In [3]:
ss2_bundle_id, ss2_bundle_version = '81fdd652-0820-447e-b171-c05ed6132216', '2018-08-08T120431.041734Z'

dss_staging_url = 'https://dss.staging.data.humancellatlas.org/v1'  # this won't be required after migrating to use the dss_client

num_workers = None  # None lets ThreadPoolExecutor choose its default pool size; 0 downloads serially

### 1.4 Prepare inputs

To assemble a `Bundle` object, `metadata-api` requires the following constructor parameters:
- `uuid: str`
- `version: str`
- `manifest: List[JSON]`
- `metadata_files: Mapping[str, JSON]`

#### 1.4.1 Prepare `manifest`

In [4]:
raw_manifest = dcp_utils.get_manifest(
    bundle_uuid=ss2_bundle_id,
    bundle_version=ss2_bundle_version,
    dss_url=dss_staging_url,
    http_requests=HttpRequests(write_dummy_files=False)
)  # this won't be required after migrating to use the dss_client

manifest = raw_manifest['bundle']['files']  # this won't be required after migrating to use the dss_client

#### 1.4.2 Prepare `metadata_files`

In [5]:
def download_file(item, dss_url, http_requests=HttpRequests(write_dummy_files=False)):
    """
    This function borrows a lot of existing code from the `metadata-api` code base for consistency,
    and this won't be required after migrating to use the dss_client.
    """
    file_name, manifest_entry = item
    file_uuid = manifest_entry['uuid']
    file_version = manifest_entry['version']
    logging.debug("Getting file '%s' (%s.%s) from DSS.", file_name, file_uuid, file_version)
    return file_name, dcp_utils.get_file_by_uuid(file_id=file_uuid, dss_url=dss_url, http_requests=http_requests)

raw_metadata_files = {f["name"]: f for f in manifest if f["indexed"]}

if num_workers == 0:
    metadata_files = dict(
        map(
            functools.partial(download_file, dss_url=dss_staging_url, http_requests=HttpRequests(write_dummy_files=False)), 
            raw_metadata_files.items()
        )
    )
else:
    with ThreadPoolExecutor(num_workers) as tpe:
        metadata_files = dict(
            tpe.map(
                functools.partial(download_file, dss_url=dss_staging_url, http_requests=HttpRequests(write_dummy_files=False)),
                raw_metadata_files.items()
            )
        )
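
The serial-versus-threaded download pattern in the cell above can be tried without network access. The sketch below is an illustration under stated assumptions: `fetch` is a hypothetical stand-in for `download_file` that builds a string instead of calling DSS, and the sample manifest entries are made up.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def fetch(item, source):
    """Hypothetical stand-in for download_file: returns (name, fake content)."""
    name, entry = item
    return name, f"{source}/{entry['uuid']}"


def fetch_all(entries, source, num_workers=None):
    """Fetch every entry: serially when num_workers == 0, else in a thread pool."""
    worker = functools.partial(fetch, source=source)
    if num_workers == 0:
        return dict(map(worker, entries.items()))
    # ThreadPoolExecutor(None) falls back to the library's default pool size.
    with ThreadPoolExecutor(num_workers) as tpe:
        return dict(tpe.map(worker, entries.items()))


# Made-up manifest entries; only indexed (metadata) files are kept.
sample_manifest = [
    {'name': 'a.json', 'uuid': '1', 'indexed': True},
    {'name': 'R1.fastq.gz', 'uuid': '2', 'indexed': False},
]
indexed = {f['name']: f for f in sample_manifest if f['indexed']}

serial = fetch_all(indexed, 'dss', num_workers=0)
threaded = fetch_all(indexed, 'dss', num_workers=2)
```

Both paths produce the same dictionary because each worker returns a `(name, content)` pair and `dict` collects the pairs regardless of how they were computed.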

#### 1.4.3 Assemble the `Bundle` object

In [6]:
ss2_primary_bundle = Bundle(uuid=ss2_bundle_id,
                version=ss2_bundle_version,
                manifest=manifest,
                metadata_files=metadata_files)

#### 1.4.4 Get pipeline inputs by using the assembled `Bundle` object

**Note:** A `SequenceFile` object should look like this:
```python
SequenceFile(
    document_id=UUID('1db5c87a-7577-4feb-8d5f-ff7dac3aaccf'),
    file_format='fastq.gz',
    to_processes={}, 
    manifest_entry=ManifestEntry(
        content_type='application/gzip; dcp-type=data', 
        crc32c='string', 
        indexed=False, 
        name='R1.fastq.gz', 
        s3_etag='string', 
        sha1='string', 
        sha256='string', 
        size=int, 
        url='gs://THE_TARGET_URL_WE_WANT_HERE', 
        uuid=UUID('1db5c87a-7577-4feb-8d5f-ff7dac3aaccf'), 
        version='2018-08-08T120430.594837Z'
    ), 
    read_index='read1', 
    lane_index=1
)
```

In [8]:
from humancellatlas.data.metadata import SequenceFile

ss2_sequencing_output = ss2_primary_bundle.sequencing_output

ss2_fastq_names = [b.manifest_entry.name for b in ss2_sequencing_output]

ss2_fastq_1_url = [file for file in ss2_sequencing_output if file.read_index == 'read1'][0].manifest_entry.url

ss2_fastq_2_url = [file for file in ss2_sequencing_output if file.read_index == 'read2'][0].manifest_entry.url

ss2_sample_id = str(ss2_primary_bundle.sequencing_input[0].document_id)

print(f' => ss2_fastq_names: {ss2_fastq_names}')
print(f' => ss2_fastq_1_url not None: {ss2_fastq_1_url is not None}')
print(f' => ss2_fastq_2_url not None: {ss2_fastq_2_url is not None}')
print(f' => ss2_sample_id: {ss2_sample_id}')

 => ss2_fastq_names: ['R1.fastq.gz', 'R2.fastq.gz']
 => ss2_fastq_1_url not None: True
 => ss2_fastq_2_url not None: True
 => ss2_sample_id: f89a7a2e-a789-495c-bf37-11e82757cc82
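
Indexing with `[0]` raises a bare `IndexError` when a read is missing and silently picks the first file when several match. A slightly stricter lookup could look like the sketch below. It is an assumption-laden illustration: the `namedtuple` types are simplified stand-ins for the library's `SequenceFile` and `ManifestEntry`, `url_for_read` is a hypothetical helper, and treating more than one match as an error is a design choice for a single-lane SmartSeq2 bundle, not library behavior.

```python
from collections import namedtuple

# Simplified stand-ins carrying only the fields used here.
ManifestEntry = namedtuple('ManifestEntry', ['name', 'url'])
SequenceFile = namedtuple('SequenceFile', ['read_index', 'manifest_entry'])


def url_for_read(sequence_files, read_index):
    """Return the URL of the single file with the given read_index."""
    matches = [f for f in sequence_files if f.read_index == read_index]
    if len(matches) != 1:
        raise ValueError(
            f"Expected exactly one file for {read_index!r}, found {len(matches)}"
        )
    return matches[0].manifest_entry.url


# Made-up example data with placeholder URLs.
sequencing_output = [
    SequenceFile('read1', ManifestEntry('R1.fastq.gz', 'gs://example-bucket/r1')),
    SequenceFile('read2', ManifestEntry('R2.fastq.gz', 'gs://example-bucket/r2')),
]
fastq_1_url = url_for_read(sequencing_output, 'read1')
fastq_2_url = url_for_read(sequencing_output, 'read2')
```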


### 1.5 Validation

In [9]:
assert ss2_fastq_1_url.endswith(test_ss2_fastq1_suffix)
assert ss2_fastq_2_url.endswith(test_ss2_fastq2_suffix)
assert ss2_sample_id == test_ss2_sample_id

print('Validation succeeded!!')

Validation succeeded!!


## 2. Optimus 10x Pipeline

_Coming Soon..._