In [45]:
import sys
import random
import csv

QUALITY_VALUES = ["0","1","2","3","4","5","6","7","8","9","A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","Z","a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","z","!","$","%","&","(",")","^","#"]
BASES          = ["A","T","C","G"]

### Assignment 1: Random fasta and fastq file generator
Write a python program that generates both fasta and fastq files containing reads with the following characteristics:

- Read id contains a progressive number starting from 0. 
- Sequences have length 50 bp
- Bases are randomly generated using a A,T,C,G alphabet, but probability of each base for each read should be given from the command line as a set of numbers (probA, probT, probC, probG)
- The number of reads should be passed as an argument from the command line
- The name of the fasta/fastq file should be passed as an argument from the command line 
- For fastq files only: the quality of each base is randomly selected.

Example:
python read_generator simulatedfasta.fa 100 30 30 30 10

In [25]:
args = ["simulatedfasta.fa", 100, 30, 30, 30, 10]

BASE_PAIRS     = 50
 
OUTPUT_FILE                  =  args[0] 
READS_NUMBER                 =  args[1]
(probA, probT, probC, probG) = (args[2], args[3], args[4], args[5])

IS_FASTA = is_fasta(OUTPUT_FILE)
IS_FASTQ = is_fastq(OUTPUT_FILE)

ARE_PARAMETERS_CORRUPTED = IS_FASTA == IS_FASTQ or (probA + probT + probC + probG != 100) or READS_NUMBER <= 0

if ARE_PARAMETERS_CORRUPTED:
    sys.exit('There was an error on parameters')

probA = normalize_probability(probA)
probT = normalize_probability(probT)
probC = normalize_probability(probC)
probG = normalize_probability(probG)

probability_intervals = define_probability_intervals(probA, probT, probC, probG)

reads          = list()
quality_values = list()

for i in range(READS_NUMBER):
    read    = generate_read(BASE_PAIRS, probability_intervals)
    quality = generate_quality_value(BASE_PAIRS) if IS_FASTQ == True else ""
    
    reads         .append(read)
    quality_values.append(quality)
    
make_file(OUTPUT_FILE, reads, quality_values)

### Assignment 2: Statistics extraction

Write a python program for extracting statistics from fasta/fastq files. The program must take as a first argument from the command line the name of the input fasta file to be analyzed and write to an output text file (whose name is passed as a second argument from the command line) a summary of the computed statistics.
The following are the expected output statistics:
- Statistics of single bases across all the reads: Number of A,T,C,G
- Number of reads having at least one low complexity sequence: AAAAAA, TTTTTT, CCCCCC or GGGGGG.
- Number of reads having the number of GC couples (so called GC content) higher than a threshold GC_THRESHOLD passed as third argument from the command line
- For each read having a GC content higher than GC_THRESHOLD, report the read_id and the number of GC couples

In [5]:
args = ["simulatedfasta.fa", "analysis_simulatedfasta.txt", 3]

LOW_COMPLEXITY_THRESHOLD = 6
LOW_COMPLEXITY_SEQUENCES = [base * LOW_COMPLEXITY_THRESHOLD for base in BASES]

INPUT_FILE   = args[0]
OUTPUT_FILE  = args[1]
GC_THRESHOLD = args[2]

IS_FASTA = is_fasta(INPUT_FILE)
IS_FASTQ = is_fastq(INPUT_FILE)

ARE_PARAMETERS_CORRUPTED = IS_FASTA == IS_FASTQ 

if ARE_PARAMETERS_CORRUPTED:
    sys.exit('There was an error on parameters')

reads = read_file(INPUT_FILE)  

single_bases_occurrences       = get_occurrences_of_single_bases             (reads)
low_complexity_sequences_reads = get_number_of_low_complexity_sequences_reads(reads)
gc_content_reads               = get_number_of_gc_contents_reads             (reads)

make_analysis_file(OUTPUT_FILE, single_bases_occurrences, low_complexity_sequences_reads, gc_content_reads)

### Assignment 3: Fasta comparison

Write a python program to compare two fasta files. The two fasta files are passed as first and second argument from the command line.

The two fasta files have the following characteristics:
- The fasta format of the two files is correct (no need to check the format) 
- Each read can take up one or multiple lines
- Each input file does not contain duplicated reads (i.e. identical reads)

The program must write as output a third fasta file containing only the reads that are in common between the input files. 
The read ids in the output file should be composed by the read id of the first file concatenated with the read id of the second file.

In [35]:
args = ["simulatedfasta1.fa", "simulatedfasta2.fa"]

INPUT_FILE_1 = args[0]
INPUT_FILE_2 = args[1]
OUTPUT_FILE  = "common_reads_" + INPUT_FILE_1.replace(".fa", "") + "_" + INPUT_FILE_2.replace(".fa", "") + ".txt"

reads_1 = read_fasta_file(INPUT_FILE_1)
reads_2 = read_fasta_file(INPUT_FILE_2)

common_reads = list(set(reads_1).intersection(reads_2))

make_common_reads_file(OUTPUT_FILE, common_reads, reads_1, reads_2)

### Assignment 4: Consensus region

Write a python program that reconstructs the consensus regions on a specific chromosome starting from a tab-separated file called alignments.txt made up of three columns:<br> the read ID, the sequence of the read and the alignment position of the read onto the reference genome. <br> An example of alignments.txt is available in the following. <br><br>
Exploiting the sequence and the alignment position of each read, build the consensus regions on the selected chromosome. <br>
Please note that all reads have the same length and that multiple consensus regions are allowed for the same chromosome.

In [120]:
args = ["alignment.txt", "reference_genome.txt"]

ALIGNMENT_FILE        = args[0]
REFERENCE_GENOME_FILE = args[1]

mock_alignment_file(ALIGNMENT_FILE)

reference_genome = "".join(read_file(REFERENCE_GENOME_FILE))

reads = read_alignment_file(ALIGNMENT_FILE)

regions = [Base(i) for i in range(len(reference_genome))]
regions = fill_regions(reads, regions)
regions = [region.current_base for region in regions]

produce_consensus_regions(regions)

['ATTTCGGCGGCGACACACCGATGACACTAAGCACG', 'GCATTTAAAAAATCCTTGGACACAAATGCAT']

### Code

In [119]:
def is_fasta(filename):
    return True if filename.find(".fa") != -1 else False

def is_fastq(filename):
    return True if filename.find(".fq") != -1 else False

def read_file(filename):
    global IS_FASTA, IS_FASTQ
    
    if IS_FASTA == True:
        return read_fasta_file(filename)

    if IS_FASTQ == True:
        return read_fastq_file(filename)

def read_fasta_file(filename, multiline = False):
    if multiline == True:
        return read_fasta_file_multiline(filename)
    
    reads = list()
    
    with open(filename, "r") as f:
        for line in f:
            if line.startswith(">"):
                continue
                
            reads.append(line.rstrip("\n"))
    
    f.close()
    return reads

def read_fasta_file_multiline(filename):
    sequence = ""
    
    with open(filename, "r") as f:
        for line in f:
            if line.startswith(">"):
                if len(sequence) > 0:
                    reads.append(sequence)
                    sequence = ""
                continue
                
            sequence += sequence.rstrip("\n")
    
    f.close()
    return reads
    
def read_fastq_file(filename):
    reads = list()
    skip  = False
    
    with open(filename, "r") as f:
        for line in f:
            if line.startswith(">"):
                continue
            
            if skip == False:
                reads.append(line.rstrip("\n"))
            
            skip = not skip
    
    f.close()
    return reads

def read_alignment_file(ALIGNMENT_FILE):
    f              = open(ALIGNMENT_FILE, "r") 
    alignment_file = csv.reader(f, delimiter="\t")
    
    reads = list()
    
    for row in alignment_file:
        reads.append( (row[1], int(row[2])) )

    return reads

def normalize_probability(prob):
    return prob/100

def define_probability_intervals(probA, probT, probC, probG):
    probability_intervals = dict()
    
    cumulative_probability     = probA
    probability_intervals["A"] = (0,                          cumulative_probability - 0.01)
    
    old_cumulative_probability = cumulative_probability
    cumulative_probability    += probC
    probability_intervals["C"] = (old_cumulative_probability, cumulative_probability - 0.01) 
    
    old_cumulative_probability = cumulative_probability
    cumulative_probability    += probT
    probability_intervals["T"] = (old_cumulative_probability, cumulative_probability - 0.01)
    
    old_cumulative_probability = cumulative_probability
    cumulative_probability     = 1
    probability_intervals["G"] = (old_cumulative_probability, cumulative_probability)
    
    return probability_intervals

def generate_read(BASE_PAIRS, probability_intervals):
    global BASES
    
    read = ""
    
    for i in range(BASE_PAIRS):
        base  = generate_base(BASES, probability_intervals)
        
        read += base
    
    return read

def generate_quality_value(BASE_PAIRS):
    global QUALITY_VALUES
    
    quality_value = ""
    
    for i in range(BASE_PAIRS):
        quality_value_index = random.randint(0, len(QUALITY_VALUES) - 1)
        
        quality_value      += QUALITY_VALUES[quality_value_index]

    return quality_value

def generate_base(BASES, probability_intervals):
    random_value = round(random.random(), 2)
    
    for base in BASES:
        low, high = probability_intervals[base]
        if low <= random_value <= high:
            return str(base)
    
    return str(base)

def get_occurrences_of_single_bases(reads):
    sequences = "".join(reads)
    
    return {
        "A": sequences.count("A"),
        "T": sequences.count("T"),
        "C": sequences.count("C"),
        "G": sequences.count("G")
    }

def get_number_of_low_complexity_sequences_reads(reads):
    global LOW_COMPLEXITY_SEQUENCES
    
    matching_reads = 0
    
    for read in reads:
        if any(low_complexity_sequence in read for low_complexity_sequence in LOW_COMPLEXITY_SEQUENCES):
            matching_reads += 1
            
    return matching_reads
    
def get_number_of_gc_contents_reads(reads):
    global GC_THRESHOLD
    
    matching_reads = list()
    
    for i in range(len(reads)):
        read = reads[i]
        gc_contents = read.count("GC")
        
        if gc_contents >= GC_THRESHOLD:
            matching_reads.append( (i, gc_contents) )
            
    return matching_reads

def produce_consensus_regions(regions):
    
    region           = ""
    consensus_region = list()
    
    for base in regions:
        
        if base != "":
        
            region += base

        elif base == "" and region != "":
            
            consensus_region.append(region)
            region = ""
    
    consensus_region.append(region)
    
    return consensus_region
        

def fill_regions(reads, regions):
    
    for sequence, index in reads:
        
        for base in list(sequence):
            
            regions[index].fill(base)
            
            index += 1
    
    return regions

def make_file(filename, reads, quality_values):
    global IS_FASTA, IS_FASTQ
    
    f = open(filename, "w")
    
    for i in range(len(reads)):
        f.write(">read" + str(i) + "\n")
        f.write(reads[i] + "\n")
        
        if IS_FASTQ == True:
            f.write(quality_values[i] + "\n")

    f.close()

def make_analysis_file(filename, single_bases_occurrences, low_complexity_sequences_reads, gc_content_reads):
    global BASES, GC_THRESHOLD
    
    with open(filename, "w") as f:
        for base in BASES:
            f.write("Number of occurrences of base " + base + ": " + str(single_bases_occurrences[base]) + "\n")
        f.write("\n")
        
        f.write("Number of reads having at least one low complexity sequence: " + str(low_complexity_sequences_reads) + "\n")
        f.write("Number of reads having the number of GC content higher than " + str(GC_THRESHOLD) + ": " + str(len(gc_content_reads)) + "\n")
        f.write("\n")
        
        for read in gc_content_reads:
            f.write("read" + str(read[0]) + ",\t" + str(read[1]) + "\n")
    
    return
       
def make_common_reads_file(filename, common_reads, reads_1, reads_2):
    if any(common_reads):
        with open(filename, "w") as f:
            for read in common_reads:
                f.write(">read" + str(reads_1.index(read)) + "_" + str(reads_2.index(read)) + "\n" + read)
                
def mock_alignment_file(filename):
    content = [
        ("CAGCCATGACACTAAGCACG", 15),
        ("TTTAAAAAATCCGTGGACAC", 40),
        ("GCATTTAAAAAATCCTTGGA", 37),
        ("ATTTCGGCGGCGACACCCCG", 0 ),
        ("TTCGGCGGCGACACACCGAT", 2 ),
        ("ATATTTGGACACAAATGCAT", 48)
    ]

    with open(filename, "w") as f:
        for i in range(len(content)):
            f.write("read_" +  str(i) + "\t" + str(content[i][0]) + "\t" + str(content[i][1]) + "\n")

In [78]:
class Base:
    def __init__(self, index):
        self.index        = index
        self.current_base = ""
        self.counters     = {
            "A" : 0,
            "C" : 0,
            "T" : 0,
            "G" : 0
        }
        pass
    
    def fill(self, base_found):

        self.counters[base_found] += 1
        
        if self.current_base == "" or (self.current_base != base_found and self.counters[base_found] > self.counters[self.current_base]):
            self.current_base = base_found

        return
        pass