# Overview

This notebook is for creating a new [single-sample mode](https://broadinstitute.github.io/gatk-sv/docs/gs/calling_modes#single-sample-mode) reference panel.

***Prerequisites***
- GATK-SV run completed through AnnotateVcf using the joint calling workspace

Outputs from the workspace data table are mapped to the corresponding resources in the GATK-SV [input build framework](https://broadinstitute.github.io/gatk-sv/docs/advanced/build_inputs). The notebook generates a json file containing all resources required for building configuration files for the single-sample Terra workspace, as well as workflow test inputs for development.

Users should only need to modify the Inputs section. A subset of files are generated and uploaded to a user-specified GCS bucket automatically. In addition, a few input files must be created manually, such as the population assignments table. However, these files are not required for single-sample calling and are only used for testing.

Please refer to the [Advanced guide on building reference panels](https://broadinstitute.github.io/gatk-sv/docs/advanced/build_ref_panel) for further instructions. 

# Imports and definitions

In [None]:
#####################
###### Imports ######
#####################

import argparse
import io
import json
import os
import sys
import subprocess
import zipfile

from google.cloud import storage
import firecloud.api as fapi
import pandas as pd

############################
###### Attribute maps ######
############################

# Sample table column -> resource identifier.
# Entry order is preserved from the original definition.
SAMPLE_KEYS_MAP = {
    # Sample identity and input reads
    "entity:sample_id": "samples",
    "bam_or_cram_file": "bam_or_cram_files",
    # Read depth counts
    "coverage_counts": "counts",
    # Manta calls
    "manta_vcf": "manta_vcfs",
    "manta_index": "manta_vcfs_index",
    # PE / SD / SR evidence files and their indexes
    "pesr_disc": "PE_files",
    "pesr_disc_index": "PE_files_index",
    "pesr_sd": "SD_files",
    "pesr_sd_index": "SD_files_index",
    "pesr_split": "SR_files",
    "pesr_split_index": "SR_files_index",
    # Scramble calls
    "scramble_vcf": "scramble_vcfs",
    "scramble_index": "scramble_vcfs_index",
    # Wham calls
    "wham_vcf": "wham_vcfs",
    "wham_index": "wham_vcfs_index",
}

# Maps sample table columns to special "example" inputs used for workflows that run on one sample at a time.
# A single sample is selected and each column's value for that sample is stored under the example identifier.
SAMPLE_EXAMPLE_INDEX = 0  # Position of the example sample within the sorted list of sample IDs
SAMPLE_EXAMPLE_MAP = {
    "pesr_disc": "PE_example_file",
    "pesr_split": "SR_example_file",
    "pesr_sd": "SD_example_file",
    "bam_or_cram_file": "bam_or_cram_example",
    "entity:sample_id": "sample_example"
}

# Map from sample set table columns to the resource identifiers.
# Values are read from the single sample set row selected by SAMPLE_SET_ID.
# Every column listed here must exist in the table, otherwise mapping raises an error.
SAMPLE_SET_KEYS_MAP = {
    "clustered_depth_vcf": "merged_depth_vcf",
    "clustered_depth_vcf_index": "merged_depth_vcf_index",
    "clustered_manta_vcf": "merged_manta_vcf",
    "clustered_manta_vcf_index": "merged_manta_vcf_index",
    "clustered_scramble_vcf": "merged_scramble_vcf",
    "clustered_scramble_vcf_index": "merged_scramble_vcf_index",
    "clustered_wham_vcf": "merged_wham_vcf",
    "clustered_wham_vcf_index": "merged_wham_vcf_index",
    "contig_ploidy_model_tar": "contig_ploidy_model_tar",
    "cutoffs": "cutoffs",
    "filtered_batch_samples_file": "final_sample_list",
    "gcnv_model_tars": "gcnv_model_tars",
    "genotyped_depth_vcf": "genotyped_depth_vcf",
    "genotyped_depth_vcf_index": "genotyped_depth_vcf_index",
    "genotyped_pesr_vcf": "genotyped_pesr_vcf",
    "genotyped_pesr_vcf_index": "genotyped_pesr_vcf_index",
    "median_cov": "medianfile",
    "merged_BAF": "merged_baf_file",
    "merged_BAF_index": "merged_baf_file_index",
    "merged_PE": "merged_disc_file",
    "merged_PE_index": "merged_disc_file_index",
    "merged_SR": "merged_split_file",
    "merged_SR_index": "merged_split_file_index",
    "merged_bincov": "merged_coverage_file",
    "merged_bincov_index": "merged_coverage_file_index",
    "merged_dels": "del_bed",
    "merged_dups": "dup_bed",
    "metrics": "evidence_metrics",
    "metrics_common": "evidence_metrics_common",
    "outlier_filtered_depth_vcf": "filtered_depth_vcf",
    "outlier_filtered_depth_vcf_index": "filtered_depth_vcf_index",
    "outlier_filtered_pesr_vcf": "filtered_pesr_vcf",
    "outlier_filtered_pesr_vcf_index": "filtered_pesr_vcf_index",
    "regeno_coverage_medians": "regeno_coverage_medians",
    "sites_filtered_depth_vcf": "sites_filtered_depth_vcf",
    "sites_filtered_manta_vcf": "sites_filtered_manta_vcf",
    "sites_filtered_scramble_vcf": "sites_filtered_scramble_vcf",
    "sites_filtered_wham_vcf": "sites_filtered_wham_vcf",
    "sr_background_fail": "raw_sr_background_fail_file",
    "sr_bothside_pass": "raw_sr_bothside_pass_file",
    "std_manta_vcf_tar": "std_manta_vcf_tar",
    "std_scramble_vcf_tar": "std_scramble_vcf_tar",
    "std_wham_vcf_tar": "std_wham_vcf_tar",
    "trained_PE_metrics": "PE_metrics",
    "trained_SR_metrics": "SR_metrics",
    "trained_genotype_depth_depth_sepcutoff": "genotype_depth_depth_sepcutoff",
    "trained_genotype_depth_pesr_sepcutoff": "genotype_depth_pesr_sepcutoff",
    "trained_genotype_pesr_depth_sepcutoff": "genotype_pesr_depth_sepcutoff",
    "trained_genotype_pesr_pesr_sepcutoff": "genotype_pesr_pesr_sepcutoff"
}


# Map from sample set set table columns to the resource identifiers.
# Values are read from the single sample set set row selected by SAMPLE_SET_SET_ID.
# Every column listed here must exist in the table, otherwise mapping raises an error.
SAMPLE_SET_SET_KEYS_MAP = {
    "annotated_vcf": "annotated_vcf",
    "annotated_vcf_index": "annotated_vcf_index",
    "breakpoint_overlap_dropped_record_vcfs": "breakpoint_overlap_dropped_record_vcfs",
    "breakpoint_overlap_dropped_record_vcf_indexes": "breakpoint_overlap_dropped_record_vcf_indexes",
    "cleaned_vcf": "clean_vcf",
    "cleaned_vcf_index": "clean_vcf_index",
    "cluster_background_fail_lists": "cluster_background_fail_lists",
    "cluster_bothside_pass_lists": "cluster_bothside_pass_lists",
    "combined_vcfs": "combined_vcfs",
    "combined_vcf_indexes": "combined_vcf_indexes",
    "complex_genotype_vcfs": "complex_genotype_vcfs",
    "complex_genotype_vcf_indexes": "complex_genotype_vcf_indexes",
    "complex_resolve_background_fail_list": "complex_resolve_background_fail_list",
    "complex_resolve_bothside_pass_list": "complex_resolve_bothside_pass_list",
    "complex_resolve_vcfs": "complex_resolve_vcfs",
    "complex_resolve_vcf_indexes": "complex_resolve_vcf_indexes",
    "concordance_vcf": "concordance_vcf",
    "concordance_vcf_index": "concordance_vcf_index",
    "cpx_evidences": "cpx_evidences",
    "cpx_refined_vcf": "complex_refined_vcf",
    "cpx_refined_vcf_index": "complex_refined_vcf_index",
    "filtered_vcf": "genotype_filtered_vcf",
    "filtered_vcf_index": "genotype_filtered_vcf_index",
    "joined_raw_calls_vcf": "joined_raw_calls_vcf",
    "joined_raw_calls_vcf_index": "joined_raw_calls_vcf_index",
    "number_regenotyped_file": "number_regenotyped_file",
    "number_regenotyped_filtered_file": "number_regenotyped_filtered_file",
    "ploidy_table": "ploidy_table",
    "regenotyped_depth_vcfs": "regenotyped_depth_vcf",
    "regenotyped_depth_vcf_indexes": "regenotyped_depth_vcf_index",
    "main_vcf_qc_tarball": "genotype_filtered_qc_tarball",
    "unfiltered_recalibrated_vcf": "gq_filtered_vcf",
    "unfiltered_recalibrated_vcf_index": "gq_filtered_vcf_index"
}

# Map from workspace attributes to resource identifiers.
# Every attribute listed here is required; a missing one raises an error during mapping.
WORKSPACE_DATA_KEY_MAP = {
    "cohort_ped_file": "ped_file",
    "cohort_depth_vcf": "cohort_depth_vcf",
    "cohort_pesr_vcf": "cohort_pesr_vcf"
}

# Map from identifiers of list-valued resources to corresponding file list identifiers
# For example, an array of sample IDs "samples" to a file containing a list of those IDs "samples_list"
# All of these resources consist of a list of strings
# Each list is written to a local file (one entry per line), uploaded to the file lists bucket,
# and the resulting gs:// URI is stored in the output under the file list identifier
ARRAY_TO_LISTS_KEY_MAP = {
    "samples": "samples_list",
    "gcnv_model_tars": "gcnv_model_tars_list",
    "PE_files": "PE_files_list",
    "SR_files": "SR_files_list",
    "SD_files": "SD_files_list"
}


# Inputs

To be modified by the user.

In [None]:
# Sample set ID from workspace data table (row of the sample_set table to export)
SAMPLE_SET_ID = "all_samples"

# Sample set set ID from workspace data table (row of the sample_set_set table to export)
SAMPLE_SET_SET_ID = "all_batches"

# Name for output files and the desired name of the reference panel
# Used as the prefix of the generated list files and of the output json
REF_PANEL_NAME = "test_panel"

# Bucket to upload new file lists to
# Bucket name only, without the "gs://" prefix (the prefix is added when the URI is built)
FILE_LISTS_BUCKET = "gatk-sv-ref-panel-1kg-v1-0-1"

# Path within the bucket to place new file lists
# i.e. files will be uploaded to "gs://<FILE_LISTS_BUCKET>/<FILE_LISTS_DESTINATION_PATH>/"
# A trailing "/" is optional; one is appended if missing
FILE_LISTS_DESTINATION_PATH = "lists"

# These inputs must be manually generated. Defaults are populated with example data for the 1KGP reference panel.
# These files are only required for generating workflow test input files.
# This map can be left empty if you are only using this cohort as a single-sample mode reference panel.
# `clean_vcf_gatk_formatter_args` is for legacy files and usually can be left blank.
# Entries are copied into the output json unchanged.
MANUAL_DATA_MAP = {
    "sample_pop_assignments": "gs://gatk-sv-resources-public/hg38/v0/sv-resources/ref-panel/1KG/v2/populations.ref_panel_1kg.tsv",
    "outlier_cutoff_table": "gs://gatk-sv-resources-public/hg38/v0/sv-resources/ref-panel/1KG/v1/module03_outlier_cutoff_table.tsv",
    "qc_definitions": "gs://gatk-sv-ref-panel-1kg/outputs/GATKSVPipelineBatch/38c65ca4-2a07-4805-86b6-214696075fef/ref_panel_1kg.qc_definitions.tsv",
    "clean_vcf_gatk_formatter_args": ""
}

# Load data

In [None]:
def load_zipped_data(api_name, sub_entity_name, row_id, keys_map):
    """Load one row of a set-type entity table through the firecloud API.

    The API response is a zip archive holding "<api_name>_entity.tsv" and
    "<api_name>_membership.tsv"; it is saved locally as "set.zip" and read back.

    api_name: entity type to fetch, e.g. "sample_set" or "sample_set_set"
    sub_entity_name: membership table column holding the member IDs
    row_id: ID of the row to keep
    keys_map: columns whose bracketed cells are parsed into lists of strings

    Returns a pandas table indexed by "entity:<api_name>_id" containing the
    selected row, with a "samples" column listing the members of the set.
    Raises ValueError if no row matches row_id.
    """
    def parse_cell(cell):
        # Cells containing "[" hold arrays: drop brackets and quotes, split on commas
        if isinstance(cell, str) and "[" in cell:
            return cell.strip("[]").replace("\"", "").split(",")
        return cell

    id_column = f"entity:{api_name}_id"
    archive_path = 'set.zip'

    # Download the zipped tables and save the archive locally
    response = fapi.get_entities_tsv(NAMESPACE, WORKSPACE, api_name, model="flexible")
    with open(archive_path, 'wb') as archive_file:
        archive_file.write(response.content)

    with zipfile.ZipFile(archive_path, 'r') as archive:
        # Entity table: one row per set
        entity_text = archive.read(f"{api_name}_entity.tsv").decode('utf-8')
        set_tbl = pd.read_csv(
            io.StringIO(entity_text),
            sep='\t',
            converters={col: parse_cell for col in keys_map}
        )
        set_tbl = set_tbl.reset_index(drop=True)

        # Membership table: one row per (set, member) pair
        membership_text = archive.read(f"{api_name}_membership.tsv").decode('utf-8')
        membership_df = pd.read_csv(io.StringIO(membership_text), sep='\t')

    # Attach the list of unique members to each set
    members_by_set = membership_df.groupby(f"membership:{api_name}_id")[sub_entity_name].unique().apply(list)
    set_tbl['samples'] = set_tbl[id_column].map(members_by_set)

    # Keep only the requested row
    selected = set_tbl[set_tbl[id_column] == row_id].set_index(id_column)
    if selected.shape[0] == 0:
        raise ValueError(f"Table row {row_id} not found")
    return selected


# Workspace constants, taken from the notebook environment variables
TLD_PATH = 'create_reference_panel'
PROJECT = os.environ['GOOGLE_PROJECT']
WORKSPACE = os.environ['WORKSPACE_NAME']
WS_BUCKET = os.environ['WORKSPACE_BUCKET']
NAMESPACE = os.environ['WORKSPACE_NAMESPACE']

# Workspace attributes: decode the API response and pull out the attributes map
response_workspace = fapi.get_workspace(NAMESPACE, WORKSPACE, fields=None)
workspace_json = json.loads(response_workspace.content.decode('utf-8'))
workspace_attr = workspace_json['workspace']['attributes']

# Sample table: save the TSV locally, then read it back indexed by sample ID
sample_response = fapi.get_entities_tsv(
    NAMESPACE, WORKSPACE, "sample", model="flexible"
)
samples_tsv = 'samples.tsv'
with open(samples_tsv, 'w') as f:
    f.write(sample_response.content.decode('utf-8'))
sample_tbl = pd.read_csv(samples_tsv, sep='\t').set_index('entity:sample_id')

# Sample set table (single row selected by SAMPLE_SET_ID)
sample_set_tbl = load_zipped_data(
    api_name='sample_set',
    sub_entity_name='sample',
    row_id=SAMPLE_SET_ID,
    keys_map=SAMPLE_SET_KEYS_MAP
)

# Sample set set table (single row selected by SAMPLE_SET_SET_ID)
sample_set_set_tbl = load_zipped_data(
    api_name='sample_set_set',
    sub_entity_name='sample',
    row_id=SAMPLE_SET_SET_ID,
    keys_map=SAMPLE_SET_SET_KEYS_MAP
)


In [None]:
# Sanity check: show each loaded table in turn
# Expected sizes: sample table has 1 row per sample; the two set tables have only 1 row each
for loaded_tbl in (sample_tbl, sample_set_tbl, sample_set_set_tbl):
    display(loaded_tbl)

# Map attributes

In [None]:
def update_dict_with_table(table, output_map, keys_map, row_ids, unnest, name):
    """Copy selected table columns into output_map under new identifiers.

    table: pandas table; a key may name either a regular column or the index
    output_map: dictionary updated in place
    keys_map: map from table column name to output identifier
    row_ids: index labels of the rows to export, in the desired output order
    unnest: if True, store only the first selected value instead of a list
    name: table name used in error messages

    Raises ValueError if a column is missing from the table or if an output
    identifier is already present in output_map.
    """
    for key, val in keys_map.items():
        if key not in table.columns and key != table.index.name:
            raise ValueError(f"Column {key} not defined in the {name} table")
        if key == table.index.name:
            # Select the index values by row_ids as well, so they stay aligned
            # (same rows, same order) with the values taken from regular columns
            col = table.index.to_series().loc[row_ids]
        else:
            col = table.loc[row_ids, key]
        if val in output_map:
            raise ValueError(f"Output key {val} already set")
        values = col.values.tolist()
        output_map[val] = values[0] if unnest else values


def update_dict_with_dict(key_map, output_map, name):
    """Merge key_map into output_map, refusing to overwrite existing keys.

    key_map: dictionary of entries to add
    output_map: dictionary updated in place
    name: description of the source of key_map, used in error messages

    Raises ValueError if any key of key_map is already present in output_map;
    in that case output_map is left unmodified.
    """
    # Check every key before updating so a collision leaves output_map untouched
    for key, val in key_map.items():
        if key in output_map:
            raise ValueError(f"Output key:value {key}:{val} from {name} already in output")
    output_map.update(key_map)

    
# Build the dictionary for the output json, starting with the panel name
output = {"name": REF_PANEL_NAME}

# All sample IDs in sorted order; the example sample is picked from this list
sorted_sample_ids = sorted(sample_tbl.index.values.tolist())

# One entry per table export: (table, column map, row IDs, unnest, table name)
table_exports = [
    # Sample data
    (sample_tbl, SAMPLE_KEYS_MAP, sorted_sample_ids, False, "samples"),
    # Example sample data
    (sample_tbl, SAMPLE_EXAMPLE_MAP, [sorted_sample_ids[SAMPLE_EXAMPLE_INDEX]], True, "sample example"),
    # Sample set data
    (sample_set_tbl, SAMPLE_SET_KEYS_MAP, [SAMPLE_SET_ID], True, "sample sets"),
    # Sample set set data
    (sample_set_set_tbl, SAMPLE_SET_SET_KEYS_MAP, [SAMPLE_SET_SET_ID], True, "sample set sets"),
]
for src_table, src_keys_map, src_row_ids, src_unnest, src_name in table_exports:
    update_dict_with_table(
        table=src_table,
        output_map=output,
        keys_map=src_keys_map,
        row_ids=src_row_ids,
        unnest=src_unnest,
        name=src_name
    )

# Manually updated files
update_dict_with_dict(
    key_map=MANUAL_DATA_MAP,
    output_map=output,
    name="manually updated files"
)

# Workspace attributes: all mapped attributes are required
for key in WORKSPACE_DATA_KEY_MAP:
    if key not in workspace_attr:
        raise ValueError(f"Workspace attribute {key} was expected but not found")
workspace_resources = {
    resource_id: workspace_attr[attr_name]
    for attr_name, resource_id in WORKSPACE_DATA_KEY_MAP.items()
}
update_dict_with_dict(
    key_map=workspace_resources,
    output_map=output,
    name="workspace attributes"
)


In [None]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a GCS bucket and return the object's gs:// URI.

    bucket_name: ID of the GCS bucket, e.g. "your-bucket-name"
    source_file_name: path of the file to upload, e.g. "local/path/to/file"
    destination_blob_name: ID of the GCS object, e.g. "storage-object-name"
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)

    # Report and return the location of the uploaded object
    dest_uri = f"gs://{bucket_name}/{destination_blob_name}"
    print(f"File {source_file_name} uploaded to {dest_uri}")
    return dest_uri


# Write list files, copy them to bucket, and add them to the outputs dictionary

# The destination prefix is the same for every list, so normalize it once.
# An empty path means the bucket root: no "/" is added, which would otherwise
# produce object names starting with a slash.
dest_prefix = FILE_LISTS_DESTINATION_PATH
if dest_prefix and not dest_prefix.endswith("/"):
    dest_prefix += "/"

for key, list_key in ARRAY_TO_LISTS_KEY_MAP.items():
    if key not in output:
        raise ValueError(f"Expected key is unassigned in output: {key}")
    val_list = output[key]
    if not isinstance(val_list, list):
        raise ValueError(f"Expected {key} to be of type list but found {type(val_list)}")
    file_name = f"{REF_PANEL_NAME}.{key}.list"
    file_path = "./" + file_name
    with open(file_path, "w") as f:
        # One entry per line; formatting also handles non-string entries
        # (e.g. sample IDs that were parsed as numbers)
        f.writelines(f"{s}\n" for s in val_list)
    dest_path = dest_prefix + file_name
    # Store the URI of the uploaded list under the file list identifier
    output[list_key] = upload_blob(
        bucket_name=FILE_LISTS_BUCKET,
        source_file_name=file_path,
        destination_blob_name=dest_path
    )


# Outputs

In [None]:
# Save the attributes as "<REF_PANEL_NAME>.json" in the working directory
file_path = f"{REF_PANEL_NAME}.json"
with open(file_path, 'w') as f:
    # Serialize once; the same text is written to the file and printed below
    output_json = json.dumps(output, sort_keys=True, indent=4)
    f.write(output_json)

# Print for copy/paste
print(output_json)
