# CBX8 LD matrix from 1000 Genomes (GRCh38)

This notebook reads `output/CBX8_variants.vcf`, downloads the 1000G sample panel, pulls only the chr17 region spanning those variants from the 1000 Genomes GRCh38 VCF, and builds an LD correlation matrix (R) for EUR samples.

Outputs:
- `output/CBX8_LD_R.csv` (LD matrix)
- `output/CBX8_LD_R.npy` (the same LD matrix, saved in NumPy's binary `.npy` format)
- `output/CBX8_LD_variant_order.tsv` (variant order)


In [None]:
import importlib.util
import subprocess
import sys

def ensure_pkg(name):
    """Install ``name`` with pip if it cannot be found by the import system.

    Args:
        name: Module name to look up; the same string is passed to
            ``pip install`` when the module is missing.

    Raises:
        subprocess.CalledProcessError: If ``pip install`` exits non-zero.
    """
    if importlib.util.find_spec(name) is not None:
        return
    # Run pip through the current interpreter so the package is installed
    # into the environment this kernel is actually using.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', name])
    # Finders cache directory contents; refresh them so a module installed
    # just now can be imported later in the same session.
    importlib.invalidate_caches()

# Third-party packages the later cells import; install any that are missing.
required_packages = ('numpy', 'pysam')
for package_name in required_packages:
    ensure_pkg(package_name)


In [None]:
from pathlib import Path

# Resolve the project root. When the notebook is launched from inside the
# `vignettes/` directory the root is its parent; in every other case the
# current working directory is used unchanged.
root = Path.cwd()
if root.name == 'vignettes':
    root = root.parent

# Working directories live directly under the project root and are created
# on demand.
data_dir = root / 'data'
output_dir = root / 'output'
for directory in (data_dir, output_dir):
    directory.mkdir(parents=True, exist_ok=True)

# The variant list is an input to this notebook and must already exist.
vcf_path = output_dir / 'CBX8_variants.vcf'
if not vcf_path.exists():
    raise FileNotFoundError(f'Missing input VCF: {vcf_path}')

# Parse the input VCF into a list of dicts with keys
# 'chrom', 'pos' (int), 'id', 'ref' and 'alt'.
variants = []
with vcf_path.open() as f:
    for line in f:
        # Skip meta-information and the column header.
        if line.startswith('#'):
            continue
        # Strip both LF and CRLF line endings before splitting on tabs.
        fields = line.rstrip('\r\n').split('\t')
        if len(fields) < 5:
            continue
        chrom, pos, vid, ref, alt = fields[:5]
        variants.append({
            'chrom': chrom,
            'pos': int(pos),
            'id': vid,
            'ref': ref,
            # Only the first ALT allele of a comma-separated list is kept.
            'alt': alt.split(',')[0],
        })

if not variants:
    raise ValueError('No variants found in input VCF.')

chroms = sorted({v['chrom'] for v in variants})
if len(chroms) > 1:
    # Only the first chromosome is used for the region below; mixing
    # positions from different chromosomes would give a meaningless span.
    print(
        f'Warning: input spans {len(chroms)} chromosomes {chroms}; '
        f'only {chroms[0]} is used for the region.'
    )

# Region bounds are taken from the variants on the chromosome being used.
region_positions = [v['pos'] for v in variants if v['chrom'] == chroms[0]]
min_pos = min(region_positions)
max_pos = max(region_positions)

print(f'Loaded {len(variants)} variants on {chroms}')
print(f'Region: {chroms[0]}:{min_pos}-{max_pos}')


In [None]:
import urllib.request

panel_url = (
    'https://ftp.1000genomes.ebi.ac.uk/vol1/ftp/release/20130502/'
    'integrated_call_samples_v3.20130502.ALL.panel'
)
panel_path = data_dir / 'integrated_call_samples_v3.20130502.ALL.panel'

# The panel is cached in `data_dir`; it is downloaded only when absent.
if not panel_path.exists():
    print(f'Downloading panel to {panel_path}')
    # Download to a temporary sibling file and rename it only on success, so
    # an interrupted transfer never leaves a truncated file at the cache
    # path that later runs would mistake for a complete panel.
    partial_path = panel_path.with_name(panel_path.name + '.part')
    try:
        urllib.request.urlretrieve(panel_url, partial_path)
        partial_path.replace(panel_path)
    finally:
        # After a successful rename the temporary file no longer exists.
        if partial_path.exists():
            partial_path.unlink()

# Collect the sample IDs whose third column (super population) is 'EUR'.
# The first three whitespace-separated columns of each row are read as
# sample, population and super population.
with panel_path.open() as panel_file:
    next(panel_file, None)  # discard the header line
    panel_rows = (row.split() for row in panel_file)
    eur_samples = [
        columns[0]
        for columns in panel_rows
        # Blank lines split to an empty list and are dropped here as well.
        if len(columns) >= 3 and columns[2] == 'EUR'
    ]

print(f'EUR samples in panel: {len(eur_samples)}')


In [None]:
import numpy as np
import pysam

# Remote directory holding the per-chromosome 1000 Genomes VCFs.
base_url = (
    'https://ftp.1000genomes.ebi.ac.uk/vol1/ftp/'
    'data_collections/1000_genomes_project/release/'
    '20190312_biallelic_SNV_and_INDEL'
)

# Both spellings of the chromosome name (with and without the 'chr' prefix).
chrom_no_chr = chroms[0].replace('chr', '')
chrom_with_chr = f'chr{chrom_no_chr}'

vcf_url = (
    f'{base_url}/ALL.chr{chrom_no_chr}.'
    'shapeit2_integrated_snvindels_v2a_27022019.GRCh38.phased.vcf.gz'
)

vcf_in = pysam.VariantFile(vcf_url)
contigs = set(vcf_in.header.contigs)

# Pick whichever spelling of the chromosome the VCF header declares,
# preferring the name exactly as it appears in the input variants.
for candidate in (chroms[0], chrom_with_chr, chrom_no_chr):
    if candidate in contigs:
        fetch_chrom = candidate
        break
else:
    raise ValueError(f'Chromosome not found in VCF contigs: {chroms[0]}')

# Keep the EUR panel samples that are present in the VCF, in header order.
eur_set = set(eur_samples)
vcf_samples = list(vcf_in.header.samples)
sample_names = [name for name in vcf_samples if name in eur_set]
if len(sample_names) == 0:
    raise ValueError('No EUR samples from the panel were found in the VCF header.')

print(f'EUR samples used: {len(sample_names)}')

# Map (chrom, pos, ref, alt) -> row index in `variants`. Each variant is
# registered under its chromosome name as given and under the spellings with
# and without the 'chr' prefix, so the lookup works with either convention.
key_to_index = {}
for idx, v in enumerate(variants):
    chrom_key = v['chrom']
    # Loop-local names: the cell-level chromosome variables are left intact.
    bare_chrom = chrom_key.replace('chr', '')
    prefixed_chrom = f'chr{bare_chrom}'
    for ck in {chrom_key, bare_chrom, prefixed_chrom}:
        key_to_index[(ck, v['pos'], v['ref'], v['alt'])] = idx

# Genotype matrix: one row per input variant, one column per EUR sample.
# Entries stay NaN until a genotype is read for that variant/sample pair.
G = np.full((len(variants), len(sample_names)), np.nan, dtype=np.float32)

# The fetch window is derived from the 1-based input positions by shifting
# the start down by one and keeping the end as-is.
start0 = min_pos - 1
end0 = max_pos
matched_rows = set()
try:
    for rec in vcf_in.fetch(fetch_chrom, start0, end0):
        if not rec.alts:
            continue
        idx = key_to_index.get((rec.chrom, rec.pos, rec.ref, rec.alts[0]))
        if idx is None:
            continue
        # A set keeps duplicate records from inflating the match count.
        matched_rows.add(idx)
        for j, s in enumerate(sample_names):
            gt = rec.samples[s].get('GT')
            # Leave the entry as NaN when any allele call is missing.
            if gt is None or None in gt or -1 in gt:
                continue
            # Sum of allele indices; summing the whole tuple also handles
            # calls that do not have exactly two alleles.
            G[idx, j] = sum(gt)
finally:
    # Release the remote file handle even if reading fails part-way.
    vcf_in.close()
hits = len(matched_rows)

print(f'Matched variants in 1000G: {hits} / {len(variants)}')

# Drop variants whose row holds no genotype at all, keeping `G` and
# `variants` aligned row-for-row.
found_mask = ~np.isnan(G).all(axis=1)
if not found_mask.all():
    missing = [variants[i]['id'] for i, ok in enumerate(found_mask) if not ok]
    print(f'Missing variants not found in 1000G VCF: {len(missing)}')
    print('Example missing IDs:', missing[:10])
    G = G[found_mask]
    variants = [v for v, ok in zip(variants, found_mask) if ok]


In [None]:
import csv

# LD matrix R: correlation between variants (rows of G) across samples.
# The arithmetic is done in float64 so rounding error from accumulating over
# samples does not leak into the result; R is cast back to G's dtype at the
# end so the saved matrix keeps its original dtype.
G64 = G.astype(np.float64)

# Missing genotypes are replaced by the variant's mean, so they contribute
# zero after centring.
G_mean = np.nanmean(G64, axis=1, keepdims=True)
G_filled = np.where(np.isnan(G64), G_mean, G64)
X = G_filled - G_mean

# Standardise each row; sample standard deviation when there is more than
# one sample.
ddof = 1 if G.shape[1] > 1 else 0
stds = X.std(axis=1, ddof=ddof, keepdims=True)
# Rows with zero variance become NaN after division and are then zeroed, so
# they correlate 0 with every other variant.
stds[stds == 0] = np.nan
X = np.nan_to_num(X / stds, nan=0.0)

R = (X @ X.T) / max(G.shape[1] - 1, 1)
# Correlations are bounded by [-1, 1]; clip away floating-point overshoot.
np.clip(R, -1.0, 1.0, out=R)
np.fill_diagonal(R, 1.0)
R = R.astype(G.dtype, copy=False)

# Row/column labels of the LD matrix, in the same order as the rows of R.
variant_labels = [v['id'] for v in variants]

# LD matrix as CSV: a header row of variant IDs, then one labelled row per
# variant with values written to 6 significant digits.
output_csv = output_dir / 'CBX8_LD_R.csv'
with output_csv.open('w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['variant'] + variant_labels)
    writer.writerows(
        [label] + [f'{x:.6g}' for x in row]
        for label, row in zip(variant_labels, R)
    )

# The same matrix in NumPy's binary format.
output_npy = output_dir / 'CBX8_LD_R.npy'
np.save(output_npy, R)

# Variant order as TSV, so rows and columns of R can be mapped back to
# variants.
order_path = output_dir / 'CBX8_LD_variant_order.tsv'
with order_path.open('w', newline='') as f:
    writer = csv.writer(f, delimiter='\t')
    writer.writerow(['index', 'id', 'chrom', 'pos', 'ref', 'alt'])
    writer.writerows(
        [i, v['id'], v['chrom'], v['pos'], v['ref'], v['alt']]
        for i, v in enumerate(variants)
    )

print('Wrote:', output_csv)
print('Wrote:', output_npy)
print('Wrote:', order_path)
