# Initialization

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install iv2py

Collecting iv2py
  Downloading iv2py-0.5.0-cp310-cp310-manylinux_2_28_x86_64.whl (496 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/496.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m174.1/496.9 kB[0m [31m5.0 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m491.5/496.9 kB[0m [31m7.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m496.9/496.9 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: iv2py
Successfully installed iv2py-0.5.0


In [None]:
pip install biopython

Collecting biopython
  Downloading biopython-1.83-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: biopython
Successfully installed biopython-1.83


# FM Index

In [None]:
import iv2py as iv
import os

# load index or create an index
if os.path.exists("file.fmindex"):
    # load fm index from disk
    index = iv.fmindex(path="file.fmindex")
else:
    # load fasta file - normalize sequences (e.g.: make everything capital letters, check for invalid characters)
    reference = [iv.normalize(rec.seq) for rec in iv.fasta.reader('/content/drive/MyDrive/data/hg38_partial.fasta')]

    # build fmindex
    index = iv.fmindex(reference=reference, samplingRate=16)

    index.save("file.fmindex")

In [None]:
# Check if the file imports ok
import iv2py as iv

fasta_file_path = '/content/drive/MyDrive/data/hg38_partial.fasta'

for record in iv.fasta.reader(file=fasta_file_path):
    print("ID:", record.id)
    print("First 10 letters of Sequence:", record.seq[:10])
    print("\n")


ID: NC_000001.11 Homo sapiens chromosome 1, GRCh38 Primary Assembly
First 10 letters of Sequence: TGCTCTGACC




# 1'000

In [None]:
import time
import iv2py as iv

# List the Illumina files
illumina_files = [
    'illumina_reads_100.fasta',
    'illumina_reads_40.fasta',
    'illumina_reads_60.fasta',
    'illumina_reads_80.fasta'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in illumina_files}

for illumina_file in illumina_files:
    for _ in range(num_iterations):
        # Load Illumina file
        illumina_file_path = '/content/drive/MyDrive/data/' + illumina_file
        for record in iv.fasta.reader(file=illumina_file_path):
            # Consider the first 1,000 characters in the current sequence
            reference_text_chunk = record.seq[:1000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[illumina_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")


Average time for 100 iterations in illumina_reads_100.fasta: 0.929409 seconds
Average time for 100 iterations in illumina_reads_40.fasta: 0.669361 seconds
Average time for 100 iterations in illumina_reads_60.fasta: 0.727284 seconds
Average time for 100 iterations in illumina_reads_80.fasta: 0.838563 seconds


# 10'000

In [None]:
import time
import iv2py as iv

# Assume you have your reference text loaded in the 'index' variable

# List the Illumina files
illumina_files = [
    'illumina_reads_100.fasta',
    'illumina_reads_40.fasta',
    'illumina_reads_60.fasta',
    'illumina_reads_80.fasta'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in illumina_files}

for illumina_file in illumina_files:
    for _ in range(num_iterations):
        # Load Illumina file
        illumina_file_path = '/content/drive/MyDrive/data/' + illumina_file
        for record in iv.fasta.reader(file=illumina_file_path):
            # Consider the first 10,000 characters in the current sequence
            reference_text_chunk = record.seq[:10000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[illumina_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")

Average time for 100 iterations in illumina_reads_100.fasta: 0.939037 seconds
Average time for 100 iterations in illumina_reads_40.fasta: 0.658318 seconds
Average time for 100 iterations in illumina_reads_60.fasta: 0.712422 seconds
Average time for 100 iterations in illumina_reads_80.fasta: 0.835486 seconds


# 100'000

In [None]:
import time
import iv2py as iv

# List the Illumina files
illumina_files = [
    'illumina_reads_100.fasta',
    'illumina_reads_40.fasta',
    'illumina_reads_60.fasta',
    'illumina_reads_80.fasta'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in illumina_files}

for illumina_file in illumina_files:
    for _ in range(num_iterations):
        # Load Illumina file
        illumina_file_path = '/content/drive/MyDrive/data/' + illumina_file
        for record in iv.fasta.reader(file=illumina_file_path):
            # Consider the first 100,000 characters in the current sequence
            reference_text_chunk = record.seq[:100000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[illumina_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")

Average time for 100 iterations in illumina_reads_100.fasta: 0.974272 seconds
Average time for 100 iterations in illumina_reads_40.fasta: 0.664467 seconds
Average time for 100 iterations in illumina_reads_60.fasta: 0.700332 seconds
Average time for 100 iterations in illumina_reads_80.fasta: 0.820276 seconds


# 1'000'000

In [None]:
import time
import iv2py as iv

# Assume you have your reference text loaded in the 'index' variable

# List the Illumina files
illumina_files = [
    'illumina_reads_100.fasta',
    'illumina_reads_40.fasta',
    'illumina_reads_60.fasta',
    'illumina_reads_80.fasta'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in illumina_files}

for illumina_file in illumina_files:
    for _ in range(num_iterations):
        # Load Illumina file
        illumina_file_path = '/content/drive/MyDrive/data/' + illumina_file
        for record in iv.fasta.reader(file=illumina_file_path):
            # Consider the first 1,000,000 characters in the current sequence
            reference_text_chunk = record.seq[:1000000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[illumina_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")

Average time for 100 iterations in illumina_reads_100.fasta: 0.904364 seconds
Average time for 100 iterations in illumina_reads_40.fasta: 0.647883 seconds
Average time for 100 iterations in illumina_reads_60.fasta: 0.747147 seconds
Average time for 100 iterations in illumina_reads_80.fasta: 0.832899 seconds


# Human Genome Files

# 1'000

In [None]:
import time
import iv2py as iv

# List the Human Genom files
genom_files = [
    'GCA_000001405.15_GRCh38_genomic.fna',
    'GCF_000001405.26_GRCh38_genomic.fna'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in genom_files}

for genom_file in genom_files:
    for _ in range(num_iterations):
        # Load Genom file
        genom_file_path = '/content/drive/MyDrive/data/' + genom_file
        for record in iv.fasta.reader(file=genom_file_path):
            # Consider the first 1,000 characters in the current sequence
            reference_text_chunk = record.seq[:1000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[genom_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")

Average time for 100 iterations in GCA_000001405.15_GRCh38_genomic.fna: 0.150838 seconds
Average time for 100 iterations in GCF_000001405.26_GRCh38_genomic.fna: 0.157668 seconds


# 10'000

In [None]:
import time
import iv2py as iv

# List the Human Genom files
genom_files = [
    'GCA_000001405.15_GRCh38_genomic.fna',
    'GCF_000001405.26_GRCh38_genomic.fna'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in genom_files}

for genom_file in genom_files:
    for _ in range(num_iterations):
        # Load Genom file
        genom_file_path = '/content/drive/MyDrive/data/' + genom_file
        for record in iv.fasta.reader(file=genom_file_path):
            # Consider the first 10,000 characters in the current sequence
            reference_text_chunk = record.seq[:10000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[genom_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")

Average time for 100 iterations in GCA_000001405.15_GRCh38_genomic.fna: 0.169791 seconds
Average time for 100 iterations in GCF_000001405.26_GRCh38_genomic.fna: 0.168487 seconds


# 100'000

In [None]:
import time
import iv2py as iv

# List the Human Genom files
genom_files = [
    'GCA_000001405.15_GRCh38_genomic.fna',
    'GCF_000001405.26_GRCh38_genomic.fna'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in genom_files}

for genom_file in genom_files:
    for _ in range(num_iterations):
        # Load Genom file
        genom_file_path = '/content/drive/MyDrive/data/' + genom_file
        for record in iv.fasta.reader(file=genom_file_path):
            # Consider the first 100,000 characters in the current sequence
            reference_text_chunk = record.seq[:100000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[genom_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")

Average time for 100 iterations in GCA_000001405.15_GRCh38_genomic.fna: 0.205397 seconds
Average time for 100 iterations in GCF_000001405.26_GRCh38_genomic.fna: 0.205804 seconds


# 1'000'000

In [None]:
import time
import iv2py as iv

# List the Human Genom files
genom_files = [
    'GCA_000001405.15_GRCh38_genomic.fna',
    'GCF_000001405.26_GRCh38_genomic.fna'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file
total_time_per_file = {file: 0 for file in genom_files}

for genom_file in genom_files:
    for _ in range(num_iterations):
        # Load Genom file
        genom_file_path = '/content/drive/MyDrive/data/' + genom_file
        for record in iv.fasta.reader(file=genom_file_path):
            # Consider the first 1,000,000 characters in the current sequence
            reference_text_chunk = record.seq[:1000000]

            # Measure the time for searching in the FMIndex
            start_time = time.time()
            res = index.search(reference_text_chunk)
            end_time = time.time()

            # Accumulate the total time per file
            total_time_per_file[genom_file] += end_time - start_time

# Calculate the average time per file
average_time_per_file = {file: total_time / num_iterations for file, total_time in total_time_per_file.items()}

# Print the results
for file, average_time in average_time_per_file.items():
    print(f"Average time for {num_iterations} iterations in {file}: {average_time:.6f} seconds")

Average time for 100 iterations in GCA_000001405.15_GRCh38_genomic.fna: 0.302196 seconds
Average time for 100 iterations in GCF_000001405.26_GRCh38_genomic.fna: 0.300900 seconds


# FM Index Pigeon Hole Search

# 1'000

In [None]:
import time
import iv2py as iv
import os

def k_mismatch_search(index, query, k):
    """
    Perform k-mismatch search using FMIndex.
    """
    result_positions = set()

    # Iterate through all possible k-mismatch substrings
    for i in range(k + 1):
        mismatch_positions = index.search(query)
        mismatch_positions = [(pos, errors) for pos, errors in mismatch_positions if errors <= i]

        # Add the positions to the result set
        result_positions.update(mismatch_positions)

    return list(result_positions)

def hamming_distance(s1, s2):
    """
    Calculate the Hamming distance between two strings of equal length.
    """
    return sum(c1 != c2 for c1, c2 in zip(s1, s2))

def verify_search_results_all(reference, query, search_results, k):
    """
    Verify the accuracy of k-mismatch search results using Hamming distance for all sequences.
    """
    query_length = len(query)

    # Check if each position in the search results has Hamming distance within k of the query
    for result_position, errors in search_results:
        for i in range(len(reference)):
            substring = reference[i][result_position:result_position + query_length]

            if hamming_distance(query, substring) > k:
                print(f"Error: Position {result_position} does not have Hamming distance within {k} of the query for sequence {i}.")

# List the Human Genome files
genome_files = [
    'GCA_000001405.15_GRCh38_genomic.fna',
    'GCF_000001405.26_GRCh38_genomic.fna'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file and k value
total_time_per_file_k = {file: {k: 0 for k in range(3)} for file in genome_files}

# Load FM-index or create if it doesn't exist
if os.path.exists("genome.fmindex"):
    index = iv.fmindex(path="genome.fmindex")
else:
    reference_sequences = []
    for genome_file in genome_files:
        genome_file_path = '/content/drive/MyDrive/data/' + genome_file
        for record in iv.fasta.reader(file=genome_file_path):
            reference_sequences.append(iv.normalize(record.seq))

    index = iv.fmindex(reference=reference_sequences, samplingRate=16)
    index.save("genome.fmindex")

# Perform k-mismatch search for k = 0, 1, 2
for k in range(3):
    for genome_file in genome_files:
        for _ in range(num_iterations):
            # Load Genome file
            genome_file_path = '/content/drive/MyDrive/data/' + genome_file
            for i, record in enumerate(iv.fasta.reader(file=genome_file_path)):
                # Consider the first 1,000 characters in the current sequence
                query_sequence = record.seq[:1000]

                # Measure the time for searching in the FMIndex with k-mismatch
                start_time = time.time()
                search_results = k_mismatch_search(index, query_sequence, k)
                end_time = time.time()

                # Accumulate the total time per file and k value
                total_time_per_file_k[genome_file][k] += end_time - start_time

                # Verify the search results using Hamming distance for all sequences
                verify_search_results_all(reference_sequences, query_sequence, search_results, k)

# Calculate the average time per file and k value
average_time_per_file_k = {file: {k: total_time / num_iterations for k, total_time in time_per_k.items()} for file, time_per_k in total_time_per_file_k.items()}

# Print the results
for file, time_per_k in average_time_per_file_k.items():
    for k, average_time in time_per_k.items():
        print(f"Average time for {num_iterations} iterations in {file} with k={k}: {average_time:.6f} seconds")


Average time for 100 iterations in GCA_000001405.15_GRCh38_genomic.fna with k=0: 0.156519 seconds
Average time for 100 iterations in GCA_000001405.15_GRCh38_genomic.fna with k=1: 0.158613 seconds
Average time for 100 iterations in GCA_000001405.15_GRCh38_genomic.fna with k=2: 0.167433 seconds
Average time for 100 iterations in GCF_000001405.26_GRCh38_genomic.fna with k=0: 0.154599 seconds
Average time for 100 iterations in GCF_000001405.26_GRCh38_genomic.fna with k=1: 0.160970 seconds
Average time for 100 iterations in GCF_000001405.26_GRCh38_genomic.fna with k=2: 0.160253 seconds


In [None]:
import time
import iv2py as iv
import os

def k_mismatch_search(index, query, k):
    """
    Perform k-mismatch search using FMIndex.
    """
    result_positions = set()

    # Iterate through all possible k-mismatch substrings
    for i in range(k + 1):
        mismatch_positions = index.search(query)
        mismatch_positions = [(pos, errors) for pos, errors in mismatch_positions if errors <= i]

        # Add the positions to the result set
        result_positions.update(mismatch_positions)

    return list(result_positions)

def hamming_distance(s1, s2):
    """
    Calculate the Hamming distance between two strings of equal length.
    """
    return sum(c1 != c2 for c1, c2 in zip(s1, s2))

def verify_search_results_all(reference, query, search_results, k):
    """
    Verify the accuracy of k-mismatch search results using Hamming distance for all sequences.
    """
    query_length = len(query)

    # Check if each position in the search results has Hamming distance within k of the query
    for result_position, errors in search_results:
        for i in range(len(reference)):
            substring = reference[i][result_position:result_position + query_length]

            if hamming_distance(query, substring) > k:
              pass
                #print(f"Position {result_position} does not have Hamming distance within {k} of the query for sequence {i}.")

# List the Human Genome files
genome_files = [
    'illumina_reads_40.fasta',
    'illumina_reads_60.fasta'
]

# Number of iterations
num_iterations = 100

# Dictionary to store total time per file and k value
total_time_per_file_k = {file: {k: 0 for k in range(3)} for file in genome_files}

# Load FM-index or create if it doesn't exist
if os.path.exists("genome.fmindex"):
    index = iv.fmindex(path="genome.fmindex")
else:
    reference_sequences = []
    for genome_file in genome_files:
        genome_file_path = '/content/drive/MyDrive/data/' + genome_file
        for record in iv.fasta.reader(file=genome_file_path):
            reference_sequences.append(iv.normalize(record.seq))

    index = iv.fmindex(reference=reference_sequences, samplingRate=16)
    index.save("genome.fmindex")

# Perform k-mismatch search for k = 0, 1, 2
for k in range(3):
    for genome_file in genome_files:
        for _ in range(num_iterations):
            # Load Genome file
            genome_file_path = '/content/drive/MyDrive/data/' + genome_file
            for i, record in enumerate(iv.fasta.reader(file=genome_file_path)):
                # Consider the first 1,000 characters in the current sequence
                query_sequence = record.seq[:1000]

                # Measure the time for searching in the FMIndex with k-mismatch
                start_time = time.time()
                search_results = k_mismatch_search(index, query_sequence, k)
                end_time = time.time()

                # Accumulate the total time per file and k value
                total_time_per_file_k[genome_file][k] += end_time - start_time

                # Verify the search results using Hamming distance for all sequences
                verify_search_results_all(reference_sequences, query_sequence, search_results, k)

# Calculate the average time per file and k value
average_time_per_file_k = {file: {k: total_time / num_iterations for k, total_time in time_per_k.items()} for file, time_per_k in total_time_per_file_k.items()}

# Print the results
for file, time_per_k in average_time_per_file_k.items():
    for k, average_time in time_per_k.items():
        print(f"Average time for {num_iterations} iterations in {file} with k={k}: {average_time:.6f} seconds")