In [None]:
from Bio import SeqIO
from Bio.Seq import Seq
import gzip
import matplotlib.pyplot as plt

In [None]:
# 1. Count sequences in a FASTQ file
def count_fastq_sequences(fastq_file):
    """Return the number of records in a FASTQ file.

    Paths ending in ``.gz`` are opened through ``gzip`` in text mode; every
    other path is opened as a plain text file.

    Args:
        fastq_file: Path to the FASTQ file (string).

    Returns:
        int: Number of records yielded by the FASTQ parser.
    """
    is_gzipped = fastq_file.endswith(".gz")
    source = gzip.open(fastq_file, "rt") if is_gzipped else open(fastq_file, "r")
    with source as handle:
        # Stream the records; only the tally is kept in memory.
        return sum(1 for _ in SeqIO.parse(handle, "fastq"))



In [None]:
# 2. Calculate GC content
def calculate_gc_content(fasta_file):
    """Compute the GC percentage of every record in a FASTA file.

    Sequences are upper-cased before counting, so soft-masked (lower-case)
    bases are included.

    Args:
        fasta_file: Path or handle accepted by ``SeqIO.parse``.

    Returns:
        list[float]: One GC percentage (0-100) per record, in file order.
        A record with an empty sequence contributes ``0.0`` so the list stays
        aligned with the records instead of raising ``ZeroDivisionError``.
    """
    gc_contents = []
    for record in SeqIO.parse(fasta_file, "fasta"):
        sequence = str(record.seq).upper()
        if not sequence:
            # GC fraction is undefined for a zero-length record; report 0.0
            # rather than dividing by zero.
            gc_contents.append(0.0)
            continue
        gc_count = sequence.count('G') + sequence.count('C')
        gc_contents.append(gc_count / len(sequence) * 100)
    return gc_contents


In [None]:
# 3. Convert FASTQ to FASTA
def fastq_to_fasta(fastq_file, fasta_file):
    """Convert a FASTQ file to FASTA.

    The output path is opened for writing (replacing any existing content)
    and the conversion itself is delegated to ``SeqIO.convert``.

    Args:
        fastq_file: Input FASTQ path passed straight to ``SeqIO.convert``.
        fasta_file: Output FASTA path.
    """
    with open(fasta_file, "w") as fasta_out:
        SeqIO.convert(
            fastq_file,
            "fastq",
            fasta_out,
            "fasta",
        )

In [None]:
# 4. Find longest sequence in FASTA
def find_longest_sequence(fasta_file):
    """Return the record with the longest sequence in a FASTA file.

    When several records share the maximum length, the first one in file
    order is returned. ``max`` raises ``ValueError`` if the file yields no
    records.

    Args:
        fasta_file: Path or handle accepted by ``SeqIO.parse``.

    Returns:
        The parsed record whose ``seq`` is longest.
    """
    def sequence_length(record):
        return len(record.seq)

    records = SeqIO.parse(fasta_file, "fasta")
    return max(records, key=sequence_length)

In [None]:
# 5. Calculate N50
def calculate_n50(fasta_file):
    """Return the N50 of the sequences in a FASTA file.

    Lengths are visited from longest to shortest; the N50 is the length at
    which the running total first reaches half of the summed length.

    Args:
        fasta_file: Path or handle accepted by ``SeqIO.parse``.

    Returns:
        int | None: The N50 length, or ``None`` when the file has no records.
    """
    ordered_lengths = sorted(
        (len(rec.seq) for rec in SeqIO.parse(fasta_file, "fasta")),
        reverse=True,
    )
    half_total = sum(ordered_lengths) / 2

    running_total = 0
    for seq_len in ordered_lengths:
        running_total += seq_len
        if running_total >= half_total:
            return seq_len
    # No records: the loop body never ran.
    return None

In [None]:
# 6. Filter low-quality reads
def filter_low_quality_reads(input_fastq, output_fastq, quality_threshold):
    """Write only the reads whose every base meets a Phred quality threshold.

    A read is kept when its minimum ``phred_quality`` value is at least
    ``quality_threshold``. Zero-length reads have no quality values and are
    dropped (previously ``min()`` raised ``ValueError`` on them).

    Args:
        input_fastq: Path or handle of the FASTQ file to read.
        output_fastq: Path of the FASTQ file to write (overwritten).
        quality_threshold: Minimum acceptable per-base Phred score.
    """
    def passes(record):
        qualities = record.letter_annotations["phred_quality"]
        # An empty quality list means a zero-length read: nothing to keep.
        return bool(qualities) and min(qualities) >= quality_threshold

    kept_records = (
        record for record in SeqIO.parse(input_fastq, "fastq") if passes(record)
    )
    with open(output_fastq, "w") as output_handle:
        # One streamed write instead of one SeqIO.write call per record.
        SeqIO.write(kept_records, output_handle, "fastq")


In [None]:
# 7. Compare two FASTA files
def compare_fasta_files(fasta1, fasta2):
    """Return the sequences that occur in both FASTA files.

    Comparison is by exact sequence string (case-sensitive); record
    identifiers and descriptions are ignored.

    Args:
        fasta1: First FASTA path or handle.
        fasta2: Second FASTA path or handle.

    Returns:
        set[str]: Sequence strings present in both files.
    """
    def unique_sequences(path):
        return {str(rec.seq) for rec in SeqIO.parse(path, "fasta")}

    first = unique_sequences(fasta1)
    second = unique_sequences(fasta2)
    return first & second



In [None]:
# 8. Reverse complement
def reverse_complement(sequence):
    """Return the reverse complement of a nucleotide string.

    Args:
        sequence: Nucleotide sequence, wrapped in a Biopython ``Seq``.

    Returns:
        str: The reverse-complemented sequence.
    """
    revcomp = Seq(sequence).reverse_complement()
    return str(revcomp)


In [None]:
# 9. Split FASTA file
def split_fasta(input_fasta, output_prefix, sequences_per_file):
    """Split a FASTA file into numbered chunks of at most N records each.

    Output files are named ``{output_prefix}_1.fasta``,
    ``{output_prefix}_2.fasta``, ... Records are streamed, so only one chunk
    is held in memory at a time. An input with no records produces no files.

    Args:
        input_fasta: Path or handle of the FASTA file to split.
        output_prefix: Prefix for the generated file names.
        sequences_per_file: Maximum number of records per output file.

    Raises:
        ValueError: If ``sequences_per_file`` is smaller than 1.
    """
    # Validate before touching any file so bad input fails fast and clearly.
    if sequences_per_file < 1:
        raise ValueError("sequences_per_file must be a positive integer")

    def write_chunk(chunk, file_number):
        with open(f"{output_prefix}_{file_number}.fasta", "w") as output_handle:
            SeqIO.write(chunk, output_handle, "fasta")

    chunk = []
    file_number = 0
    for record in SeqIO.parse(input_fasta, "fasta"):
        chunk.append(record)
        if len(chunk) == sequences_per_file:
            file_number += 1
            write_chunk(chunk, file_number)
            chunk = []

    # Final, possibly shorter, chunk.
    if chunk:
        write_chunk(chunk, file_number + 1)



In [None]:
# 10. Calculate read length distribution
def read_length_distribution(fastq_file):
    """Plot a 50-bin histogram of read lengths from a FASTQ file.

    The figure is drawn with the pyplot interface and displayed via
    ``plt.show()``; nothing is returned.

    Args:
        fastq_file: Path or handle accepted by ``SeqIO.parse``.
    """
    read_lengths = []
    for rec in SeqIO.parse(fastq_file, "fastq"):
        read_lengths.append(len(rec.seq))

    plt.hist(read_lengths, bins=50)
    plt.xlabel("Read Length")
    plt.ylabel("Frequency")
    plt.title("Read Length Distribution")
    plt.show()



In [None]:
# 11. Implement a function to simulate long-read sequencing data, including typical error profiles
import numpy as np

def simulate_long_reads(genome_length, read_length, coverage, error_rates):
    """Simulate long reads with substitution, insertion and deletion errors.

    No reference genome is modelled: each read starts as an independent,
    uniformly random A/C/G/T string of ``read_length`` bases, and ``start`` is
    a uniformly drawn position such that the read would fit in the genome.
    Every template base then independently receives at most one event:

    * substitution - replaced by a different base,
    * insertion    - a random base is inserted in front of it,
    * deletion     - removed.

    Events are decided against the original template positions, so earlier
    insertions or deletions can neither shift later events onto the wrong
    base nor index past the end of the read.

    Args:
        genome_length: Length of the notional genome.
        read_length: Number of template bases per read (before errors).
        coverage: Target depth; the read count is
            ``int(genome_length * coverage / read_length)``.
        error_rates: Mapping with keys ``'substitution'``, ``'insertion'``
            and ``'deletion'`` giving per-base probabilities.

    Returns:
        list[tuple[int, str]]: ``(start, read)`` pairs.

    Raises:
        ValueError: If ``read_length`` is not in ``1..genome_length`` or the
            error rates are negative or sum to more than 1.
    """
    if read_length < 1 or read_length > genome_length:
        raise ValueError("read_length must be between 1 and genome_length")

    error_types = ['substitution', 'insertion', 'deletion']
    rates = [error_rates[name] for name in error_types]
    no_error_rate = 1 - sum(error_rates.values())
    if min(rates) < 0 or no_error_rate < -1e-9:
        raise ValueError("error rates must be non-negative and sum to at most 1")
    # Loop-invariant: build the probability vector once, not once per base.
    probabilities = rates + [max(no_error_rate, 0.0)]
    substitution, insertion, no_error = 0, 1, 3

    num_reads = int((genome_length * coverage) / read_length)
    bases = ['A', 'C', 'G', 'T']
    reads = []

    for _ in range(num_reads):
        # randint's upper bound is exclusive; +1 makes the last valid start
        # reachable and allows read_length == genome_length.
        start = np.random.randint(0, genome_length - read_length + 1)

        # One slot per template base; a slot ends up holding 0, 1 or 2 chars.
        pieces = np.random.choice(bases, read_length).tolist()
        events = np.random.choice(len(probabilities), size=read_length, p=probabilities)

        # Only visit positions that actually drew an error.
        for i in np.flatnonzero(events != no_error):
            template_base = pieces[i]
            if events[i] == substitution:
                alternatives = [b for b in bases if b != template_base]
                pieces[i] = str(np.random.choice(alternatives))
            elif events[i] == insertion:
                pieces[i] = str(np.random.choice(bases)) + template_base
            else:  # deletion
                pieces[i] = ''

        reads.append((start, ''.join(pieces)))

    return reads

# Example usage: 20x coverage of a 1,000,000-base genome with 10,000-base reads.
genome_length = 1_000_000
read_length = 10_000
coverage = 20
error_rates = dict(substitution=0.01, insertion=0.005, deletion=0.005)

simulated_reads = simulate_long_reads(
    genome_length,
    read_length,
    coverage,
    error_rates,
)

In [None]:
# 12. Create a simple genomic data compression algorithm using run-length encoding:
def compress_genomic_sequence(sequence):
    """Run-length encode a sequence.

    Args:
        sequence: String (or other iterable) of bases.

    Returns:
        list[tuple]: ``(base, run_length)`` pairs in order of appearance.
        An empty sequence yields an empty list (previously it raised
        ``IndexError`` on ``sequence[0]``).
    """
    compressed = []
    run_base = None
    run_length = 0  # 0 means "no run started yet"

    for base in sequence:
        if run_length and base == run_base:
            run_length += 1
        else:
            if run_length:
                compressed.append((run_base, run_length))
            run_base, run_length = base, 1

    # Flush the final run, if the sequence had any bases at all.
    if run_length:
        compressed.append((run_base, run_length))
    return compressed

def decompress_genomic_sequence(compressed):
    """Expand run-length encoded ``(base, count)`` pairs back into a string.

    Args:
        compressed: Iterable of ``(base, count)`` pairs.

    Returns:
        str: Each base repeated ``count`` times, concatenated in order.
    """
    expanded_runs = []
    for symbol, repeats in compressed:
        expanded_runs.append(symbol * repeats)
    return ''.join(expanded_runs)

# Example usage: round-trip a short sequence through the run-length codec.
sequence = "AAAATTCCCGGGGAAATTTCCCG"
compressed = compress_genomic_sequence(sequence)
decompressed = decompress_genomic_sequence(compressed)

print("Original sequence:", sequence)
print("Compressed:", compressed)
print("Decompressed:", decompressed)
# Ratio of encoded runs to original bases (lower means better compression).
print("Compression ratio: {:.2f}".format(len(compressed) / len(sequence)))

In [None]:
# 13. Write a script to anonymize personal identifiers in a genomic dataset while preserving necessary metadata:
import pandas as pd
import hashlib

def anonymize_genomic_data(data, identifiers, salt=""):
    """Replace identifier columns with salted SHA-256 hex digests.

    Works on a copy, so ``data`` itself is left untouched. Names in
    ``identifiers`` that are not columns of ``data`` are ignored. Every value
    is converted with ``str()`` and concatenated with ``salt`` before hashing.

    Note: with the default empty salt the digest is a plain SHA-256 of the
    value, so the same identifier always maps to the same digest.

    Args:
        data: pandas DataFrame to anonymize.
        identifiers: Iterable of column names to hash.
        salt: String appended to each value before hashing.

    Returns:
        DataFrame: Copy of ``data`` with the listed columns hashed.
    """
    def digest(value):
        salted = str(value) + salt
        return hashlib.sha256(salted.encode()).hexdigest()

    result = data.copy()
    columns_to_hash = [name for name in identifiers if name in result.columns]
    for name in columns_to_hash:
        result[name] = result[name].apply(digest)
    return result

# Example usage: hash the patient identifier, keep the remaining metadata.
data = pd.DataFrame(
    {
        'patient_id': ['P001', 'P002', 'P003'],
        'age': [35, 42, 28],
        'gender': ['M', 'F', 'M'],
        'genotype': ['AA', 'AT', 'TT'],
    }
)

identifiers = ['patient_id']
anonymized_data = anonymize_genomic_data(data, identifiers)

print("Original data:", data, sep="\n")
print("\nAnonymized data:", anonymized_data, sep="\n")

In [None]:
# 14. Develop a function to calculate and visualize the quality scores distribution for a FASTQ file:
from Bio import SeqIO
import matplotlib.pyplot as plt

def visualize_quality_scores(fastq_file, output_file="quality_scores_distribution.png"):
    """Save a histogram of all per-base Phred quality scores in a FASTQ file.

    Args:
        fastq_file: Path to an uncompressed FASTQ file.
        output_file: Where to save the figure. Defaults to the previously
            hard-coded ``quality_scores_distribution.png``.

    Side effects:
        Writes the image to ``output_file`` and prints a confirmation line.
    """
    quality_scores = []

    with open(fastq_file, "r") as handle:
        for record in SeqIO.parse(handle, "fastq"):
            quality_scores.extend(record.letter_annotations["phred_quality"])

    fig = plt.figure(figsize=(10, 6))
    try:
        plt.hist(quality_scores, bins=40, edgecolor='black')
        plt.title("Quality Scores Distribution")
        plt.xlabel("Phred Quality Score")
        plt.ylabel("Frequency")
        fig.savefig(output_file)
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)

    print(f"Quality scores distribution saved as '{output_file}'")

# Example usage
# The path is relative, so "example.fastq" must exist in the working directory
# when this cell runs; otherwise the open() call inside the function fails.
fastq_file = "example.fastq"
visualize_quality_scores(fastq_file)

In [None]:
# 15. Implement a basic error correction algorithm for long-read sequencing data:
from collections import Counter

def correct_errors(reads, k=21):
    """Correct likely sequencing errors using k-mer frequencies.

    A k-mer table is built from all reads. For every position ``i`` at or
    beyond ``k - 1``, the k-mer ending at ``i`` (taken from the original,
    uncorrected read) is looked up. If it occurs fewer than twice, the last
    base is replaced by whichever of A/C/G/T gives the k-mer with the highest
    count, provided that count is strictly greater than the observed one.
    The first ``k - 1`` bases of each read are copied unchanged.

    Args:
        reads: Sequence of read strings.
        k: k-mer size.

    Returns:
        list[str]: Corrected reads, in input order.
    """
    # Pass 1: tally every k-mer across all reads.
    kmer_counts = Counter(
        read[pos:pos + k]
        for read in reads
        for pos in range(len(read) - k + 1)
    )
    alphabet = ('A', 'C', 'G', 'T')

    def choose_base(read, end):
        """Pick the base to emit at ``end``, given the k-mer ending there."""
        window = read[end - k + 1:end + 1]
        observed = kmer_counts[window]
        if observed >= 2:
            # Well-supported k-mer: trust the base as read.
            return read[end]

        prefix = window[:-1]
        winner, winner_count = read[end], observed
        for candidate in alphabet:
            candidate_count = kmer_counts[prefix + candidate]
            # Strict '>' keeps the original base (or earlier candidate) on ties.
            if candidate_count > winner_count:
                winner, winner_count = candidate, candidate_count
        return winner

    # Pass 2: rebuild each read base by base.
    corrected_reads = []
    for read in reads:
        rebuilt = [
            read[end] if end < k - 1 else choose_base(read, end)
            for end in range(len(read))
        ]
        corrected_reads.append(''.join(rebuilt))

    return corrected_reads

# Example usage: three reads that differ only in their final base.
reads = [
    "ACGTACGTACGTACGTACGTACGTACGTACGT",
    "ACGTACGTACGTACGTACGTACGTACGTACGA",
    "ACGTACGTACGTACGTACGTACGTACGTACGG",
]

corrected_reads = correct_errors(reads)

print("Original reads:", reads, sep="\n")
print("\nCorrected reads:", corrected_reads, sep="\n")

In [None]:
# Example usage:
# count_fastq_sequences("example.fastq")
# calculate_gc_content("example.fasta")
# fastq_to_fasta("input.fastq", "output.fasta")
# longest_seq = find_longest_sequence("example.fasta")
# n50 = calculate_n50("example.fasta")
# filter_low_quality_reads("input.fastq", "output.fastq", 20)
# common_sequences = compare_fasta_files("file1.fasta", "file2.fasta")
# rev_comp = reverse_complement("ATGCATGC")
# split_fasta("large_file.fasta", "output", 1000)
# read_length_distribution("example.fastq")