## This notebook focus on combining the SV and SNP data to calcualte LD

In [1]:
%%configure -f
{
    "driverMemory": "30G",
    "conf":{"spark.driver.maxResultSize":"0"}
}

In [2]:
import hail as hl
hl.init(sc)

Starting Spark application


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

pip-installed Hail requires additional configuration options in Spark referring
  to the path to the Hail Python module directory HAIL_DIR,
  e.g. /path/to/python/site-packages/hail:
    spark.jars=HAIL_DIR/hail-all-spark.jar
    spark.driver.extraClassPath=HAIL_DIR/hail-all-spark.jar
    spark.executor.extraClassPath=./hail-all-spark.jarRunning on Apache Spark version 3.1.2-amzn-0
SparkUI available 
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.80-4ccfae1ff293
LOGGING: writing to

# merging of SNP and SV schema for ld_score computation

In [23]:
sg10k_health_mt_uri = "SG10K_Health_r5.3.0.n9770.genotypes.mt"
release14_hc_meta_mt_uri =   "SG10K-SV-Release-1.4-HighConfidenceSV-WithMetadata-correctrsid.mt"


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# read the snv table
snv_mt = hl.read_matrix_table(sg10k_health_mt_uri)

# annotate svtype as SNP
snv_mt = snv_mt.annotate_rows(callers_info = hl.struct())
snv_mt = snv_mt.annotate_rows(
        callers_info = snv_mt.callers_info.annotate(
            SVTYPE = "SNP" ,
            varid = hl.delimit([snv_mt.locus.contig, hl.str(snv_mt.locus.position), snv_mt.alleles[0], snv_mt.alleles[1]], ':')
))

snv_mt = snv_mt.select_entries(snv_mt.GT)

snv_mt.describe()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------

In [26]:
sv_mt = hl.read_matrix_table(release14_hc_meta_mt_uri)
sv_mt = sv_mt.annotate_rows(callers_info = sv_mt.callers_info.annotate(
            varid = sv_mt.sv_id))
sv_mt = sv_mt.select_entries(sv_mt.GT).select_rows('callers_info').drop('metadata')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
sv_mt = sv_mt.annotate_rows(callers_info = sv_mt.callers_info.drop('manta', 'melt', 'survindel2', 'AC_CHINESE', 'AN_CHINESE', 'AF_CHINESE', 'AC_MALAY', 'AF_MALAY', 'AN_MALAY', 'AC_INDIAN', 'AF_INDIAN', 'AN_INDIAN', 'SVSIZE', 'END'))
sv_mt.describe()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------

In [28]:
# now we need to rearrange the columns
def align_mt2_cols_to_mt1(mt1, mt2):
    mt1 = mt1.add_col_index()
    mt2 = mt2.add_col_index()
    new_col_order = mt2.index_cols(mt1.col_key).col_idx.collect()
    new_mt = mt2.choose_cols(new_col_order)
    new_mt = new_mt.drop('col_idx')
    return new_mt


snv_mt = snv_mt.filter_cols(hl.is_defined(sv_mt.cols()[snv_mt.col_key]))
snv_mt = align_mt2_cols_to_mt1(sv_mt, snv_mt)


sv_mt.describe()
snv_mt.describe()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------
----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
---------

In [29]:
mt = snv_mt.union_rows(sv_mt).checkpoint("test_joined_SG10K-SV_SG10K_Health.GT-only-varid.mt")
mt.describe()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------
2024-05-14 04:17:30 Hail: INFO: wrote matrix table with 179491952 rows and 5487 columns in 55371 partitions to test_joined_SG10K-SV_SG10K_Health.GT-only-varid.mt

## Compute genotypes
### Set the haploid genotypes to diploid
- if haploid, if 0 -> 0/0
- if haploid, if 1 -> 0/1
- if diploid, keep it the same

In [30]:
## load the full matrix table
mt = hl.read_matrix_table("test_joined_SG10K-SV_SG10K_Health.GT-only-varid.mt")
mt.describe()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------

In [31]:
# recode to diploid
mt2 = mt.annotate_entries(
    GT = hl.if_else(
             mt.GT.ploidy == 1, 
             hl.call(0, mt.GT[0]),
             mt.GT)
).checkpoint("test_joined_SG10K-SV_SG10K_Health.GT-only-varid-recode_genotypes.mt", overwrite = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2024-05-14 04:27:03 Hail: INFO: wrote matrix table with 179491952 rows and 5487 columns in 55371 partitions to test_joined_SG10K-SV_SG10K_Health.GT-only-varid-recode_genotypes.mt

In [32]:
## export to plink
## export 1 chr 1 plinnk file

# Chromosome list
chroms = ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', 'chr11', 'chr12', 'chr13', 'chr14', 'chr15', 'chr16', 'chr17', 'chr18', 'chr19', 'chr20', 'chr21', 'chr22', 'chrX', 'chrY']
# Export plink for each chrom
for chr in chroms:
    # Extract chrom from mt
    mt_sub = hl.filter_intervals(mt2, [hl.parse_locus_interval(chr, reference_genome='GRCh38')])
    # Export plink
    hl.export_plink(mt_sub, 'SG10K_SV_SG10K_Health_.' + chr + '.plink', varid=mt_sub.callers_info.varid)




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2024-05-14 04:37:10 Hail: INFO: merging 4484 files totalling 18.5G...
2024-05-14 04:39:12 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_.chr1.plink.bed
  merge time: 2m2.2s
2024-05-14 04:39:13 Hail: INFO: merging 4483 files totalling 586.0M...
2024-05-14 04:39:28 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_.chr1.plink.bim
  merge time: 14.171s
2024-05-14 04:39:37 Hail: INFO: merging 960 files totalling 102.0K...
2024-05-14 04:39:39 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_.chr1.plink.fam
  merge time: 1.645s
2024-05-14 04:39:39 Hail: INFO: wrote 14497866 variants and 5487 samples to 'SG10K_SV_SG10K_Health_.chr1.plink'
2024-05-14 04:40:19 Hail: INFO: merging 4557 files totalling 19.1G...
2024-05-14 04:42:23 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_.chr2.plink.bed
  merge time: 2m3.6s
2024-05-14 04:42:24 Hail: INFO: merging 4556 files totalling 601.3M...
2024-05-14 04:42:37 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_.chr2.plink.bim
  merg

### Filter for high confidence (call rate >= 0.8) and common (MAF >= 1%) variants

In [3]:
mt = hl.read_matrix_table("test_joined_SG10K-SV_SG10K_Health.GT-only-varid-recode_genotypes.mt")
mt.describe()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------

In [4]:
# to get AF and call rate
mt = hl.variant_qc(mt)
mt.describe()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'callers_info': struct {
        SVTYPE: str, 
        varid: str
    }
    'variant_qc': struct {
        AC: array<int32>, 
        AF: array<float64>, 
        AN: int32, 
        homozygote_count: array<int32>, 
        call_rate: float64, 
        n_called: int64, 
        n_not_called: int64, 
        n_filtered: int64, 
        n_het: int64, 
        n_non_ref: int64, 
        het_freq_hwe: float64, 
        p_value_hwe: float64, 
        p_value_excess_het: float64
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------

In [6]:
# Filter those with call rate 
mt_filtered = mt.filter_rows((mt.variant_qc.call_rate >= 0.8), keep=True)
mt_filtered.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(158394558, 5487)

In [7]:
mt.aggregate_rows(hl.struct( svtype_count = hl.agg.counter(mt.callers_info.SVTYPE)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Struct(svtype_count=frozendict({'DEL': 11560, 'DUP': 32464, 'INS': 29011, 'SNP': 179418917}))

In [8]:
mt_filtered.aggregate_rows(hl.struct( svtype_count = hl.agg.counter(mt_filtered.callers_info.SVTYPE)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Struct(svtype_count=frozendict({'DEL': 8296, 'DUP': 20579, 'INS': 24778, 'SNP': 158340905}))

In [9]:
# Filter for common variants
mt_filtered = mt_filtered.filter_rows(mt_filtered.variant_qc.AF[1] >= 0.01, keep=True)
mt_filtered.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(9456956, 5487)

In [10]:
mt_filtered.aggregate_rows(hl.struct( svtype_count = hl.agg.counter(mt_filtered.callers_info.SVTYPE)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Struct(svtype_count=frozendict({'DEL': 1516, 'DUP': 2078, 'INS': 3178, 'SNP': 9450184}))

In [11]:
## export to plink
## export 1 chr 1 plinnk file

# Chromosome list
chroms = ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', 'chr11', 'chr12', 'chr13', 'chr14', 'chr15', 'chr16', 'chr17', 'chr18', 'chr19', 'chr20', 'chr21', 'chr22']
# Export plink for each chrom
for chr in chroms:
    # Extract chrom from mt
    mt_sub = hl.filter_intervals(mt_filtered, [hl.parse_locus_interval(chr, reference_genome='GRCh38')])
    # Export plink
    hl.export_plink(mt_sub, 'SG10K_SV_SG10K_Health_filtered.' + chr + '.plink', varid=mt_sub.callers_info.varid)




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2024-05-16 03:33:28 Hail: INFO: merging 4484 files totalling 942.0M...
2024-05-16 03:33:44 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_filtered.chr1.plink.bed
  merge time: 15.933s
2024-05-16 03:33:45 Hail: INFO: merging 4483 files totalling 28.8M...
2024-05-16 03:33:54 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_filtered.chr1.plink.bim
  merge time: 9.004s
2024-05-16 03:33:56 Hail: INFO: merging 960 files totalling 102.0K...
2024-05-16 03:33:58 Hail: INFO: while writing:
    SG10K_SV_SG10K_Health_filtered.chr1.plink.fam
  merge time: 1.651s
2024-05-16 03:33:58 Hail: INFO: wrote 719975 variants and 5487 samples to 'SG10K_SV_SG10K_Health_filtered.chr1.plink'
2024-05-16 03:34:41 Hail: INFO: merging 4557 files totalling 1002.7M...
2024-05-16 03:34:57 Hail: INFO: while writing:
   SG10K_SV_SG10K_Health_filtered.chr2.plink.bed
  merge time: 15.831s
2024-05-16 03:34:58 Hail: INFO: merging 4556 files totalling 30.7M...
2024-05-16 03:35:06 Hail: INFO: while writing:
    SG10K