In [1]:
pip install pyfaidx biopython tqdm requests


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import gzip
import shutil
import requests
from tqdm import tqdm
from pyfaidx import Fasta
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO

# Parameters
GENES = ["DUSP6", "ETV4", "ETV5",  "BRAF"]
ENSEMBL_REST = "https://rest.ensembl.org"
ENSEMBL_FASTA_URL = "http://ftp.ensembl.org/pub/release-114/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz"
FASTA_DIR = "genome_data"
FASTA_GZ = os.path.join(FASTA_DIR, "Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz")
FASTA_FILE = FASTA_GZ.rstrip(".gz")
UPSTREAM = 50
DOWNSTREAM = 250

# Step 1: Download genome FASTA
os.makedirs(FASTA_DIR, exist_ok=True)

def download_file(url, output_path):
    if os.path.exists(output_path):
        print(f"File already exists: {output_path}")
        return
    print(f"Downloading {url}...")
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(output_path, "wb") as f:
            for chunk in tqdm(r.iter_content(chunk_size=8192)):
                f.write(chunk)
    print("Download complete.")

# Step 2: Unzip .gz file
def unzip_gz(gz_path, output_path):
    if os.path.exists(output_path):
        print(f"Unzipped FASTA already exists: {output_path}")
        return
    print(f"Unzipping {gz_path}...")
    with gzip.open(gz_path, 'rb') as f_in:
        with open(output_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    print("Unzipping complete.")

# Step 3: Get TSS info
def get_tss(gene):
    url = f"{ENSEMBL_REST}/lookup/symbol/homo_sapiens/{gene}?expand=1"
    headers = {"Content-Type": "application/json"}
    r = requests.get(url, headers=headers)
    if not r.ok:
        print(f"Could not retrieve {gene}")
        return None
    data = r.json()
    canonical = next((t for t in data.get("Transcript", []) if t.get("is_canonical")), None)
    if canonical:
        chrom = canonical["seq_region_name"]
        strand = canonical["strand"]
        tss = canonical["start"] if strand == 1 else canonical["end"]
        return gene, chrom, tss, strand
    return None

# Step 4: Extract sequences
def extract_tss_sequences(genome_path, genes):
    genome = Fasta(genome_path)
    records = []

    for gene in genes:
        info = get_tss(gene)
        if not info:
            continue
        gene, chrom, tss, strand = info

        if chrom not in genome:
            print(f"{chrom} not in genome for {gene}")
            continue

        start = tss - UPSTREAM if strand == 1 else tss - DOWNSTREAM
        end = tss + DOWNSTREAM if strand == 1 else tss + UPSTREAM
        start = max(1, start)

        try:
            sequence = genome[chrom][start - 1:end].seq  # 0-based
        except Exception as e:
            print(f"Error fetching {gene}: {e}")
            continue

        if strand == -1:
            sequence = Seq(sequence).reverse_complement()

        header = f"{gene}_chr{chrom}_{start}-{end}_TSS±200bp"
        record = SeqRecord(Seq(sequence), id=header, description="")
        records.append(record)

    SeqIO.write(records, "tss_flanking_sequences.fasta", "fasta")
    print("FASTA file written: tss_flanking_sequences.fasta")

# Run
download_file(ENSEMBL_FASTA_URL, FASTA_GZ)
unzip_gz(FASTA_GZ, FASTA_FILE)
extract_tss_sequences(FASTA_FILE, GENES)


File already exists: genome_data/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz
Unzipped FASTA already exists: genome_data/Homo_sapiens.GRCh38.dna.primary_assembly.fa
FASTA file written: tss_flanking_sequences.fasta
