In [None]:
# Import necessary modules
import os
import re
import pandas as pd
from Bio import SeqIO
import time
import subprocess

In [None]:
# Function to extract a DNA sequence from a specified chromosome and range
def extract_dna_sequence(chromosome, start, end):
    fasta_file = 'CHM13.fasta'  # Name of the FASTA file containing the sequences
    for record in SeqIO.parse(fasta_file, "fasta"):
        if record.id == chromosome:
            return str(record.seq[start:end])
    # Raise an error if the chromosome is not found
    raise ValueError(f"Chromosome {chromosome} not found in the FASTA file.")

# Function to write sequences to a FASTA file
def write_fasta(file_name, sequences):
    with open(file_name, "w") as fasta_file:
        # Iterate over each sequence ID and sequence pair
        for seq_id, sequence in sequences.items():
            # Write the sequence to the file in FASTA format
            fasta_file.write(f">{seq_id}\n{sequence}\n")

In [None]:
# Read the BED file containing sequence data into a DataFrame
seqDF = pd.read_csv('CHM13_centric_transition.bed', sep='\t')
#This is an example. Change the path according to your system

In [None]:
file_path = "./input/"
# Initialize an empty DataFrame to store status information
statusDF = pd.DataFrame(
    {
        "seq_id": [],
        "region": [],
        "file": [],
        "status": [],
    }
)

# Iterate over each row in the sequence DataFrame
for index in range(0, len(seqDF)):
    start = seqDF["Start"][index]
    end = seqDF["End"][index]
    chromosome = seqDF["Chromosome"][index]
    region = seqDF["Region"][index]
    sequence_id = f"{chromosome}_{region}__{start}_{end}" # Generate a unique sequence ID
    # Extract the DNA sequence using the provided start, end, and chromosome
    sequence = extract_dna_sequence(chromosome=chromosome, start=start, end=end)
    sequences = {sequence_id: sequence}  # Create a dictionary with the sequence

    # Define the output FASTA file name
    seq_filename = f"{sequence_id}.fasta"
    # Write the extracted sequence to a FASTA file
    write_fasta(file_path + seq_filename, sequences)

    # Create a new row in the status DataFrame with the sequence information
    new_row = pd.DataFrame(
        {
            "seq_id": [sequence_id],
            "region": [region],
            "file": [seq_filename],
        }
    )

    statusDF = pd.concat([statusDF, new_row], ignore_index=True)

In [None]:
# Save the status DataFrame to a CSV file
statusDF.to_csv('status_palindrome.csv', index=False)

Running EMBOSS Palindrome

In [None]:
# Function to construct the palindrome command for a given input file
def palindrome_command(input_file: str):
    command = f"palindrome -sequence {input_file} -minpallen 5 -maxpallen 100 -gaplimit 20 -nummismatches 0 -overlap"
    return command

# Function to calculate the difference between two numbers in a string
def get_difference(input_string):
    first_double_underscore = input_string.index("__")  # Find the first double underscore
    next_underscore = input_string.index("_", first_double_underscore + 2)  # Find the next underscore
    first_number_str = input_string[first_double_underscore + 2:next_underscore]  # Extract the first number
    second_number_str = input_string[next_underscore + 1:]  # Extract the second number
    first_number = int(first_number_str)
    second_number = int(second_number_str)
    difference = second_number - first_number
    return difference

In [None]:
startIN = 0
endIN = len(statusDF)
no = endIN - startIN
# Change the current working directory to the output directory
os.chdir(f"./output/")

total_seq_length = 0
# Calculate the total sequence length from the seq_id column in the status DataFrame
for seq_str in statusDF['seq_id']:
    total_seq_length += get_difference(seq_str)

for index in range(startIN, endIN):
    # Open the log file in append mode
    with open("./log.txt", 'a') as file:
        # Write the start of the process to the log file
        file.write(str(f"{index}: {statusDF['seq_id'][index]}: {time.asctime()} [BEGIN]") + '\n')
    
    region = statusDF['region'][index]
    filename = statusDF["file"][index]
    input_seq_file = f"../input/{filename}" 

    # Generate the palindrome command and add the output file argument
    fc = palindrome_command(input_seq_file) + f" -outfile {filename.replace('.fasta', '.palindrome')}"

    # Execute the palindrome command as a subprocess
    process = subprocess.Popen(fc, shell=True)
    process.wait()  # Wait for the process to complete

    if process.returncode == 0:  # Check if the process completed successfully
        print(index, statusDF['seq_id'][index], time.asctime())
        # Write the end of the process to the log file
        with open("./log.txt", 'a') as file:
            file.write(str(f"{index}: {statusDF['seq_id'][index]}: {time.asctime()} [END]") + '\n')
    else:
        print(index, statusDF['seq_id'][index], time.asctime() + " Command failed with return code:", process.returncode)

Reading the output

In [None]:
# Function to extract two numbers from a string using regex
def extract_numbers(input_str):
    pattern = r'(\d+).*?(\d+)'
    match = re.match(pattern, input_str)
    if match:
        numbers = (int(match.group(1)), int(match.group(2)))
        return numbers
    else:
        return None

# Function to extract sequence length from a string using regex
def extract_sequence_length(input_str):
    pattern = r'Sequence length is: (\d+)'
    match = re.search(pattern, input_str)
    if match:
        sequence_length = int(match.group(1))
        return sequence_length
    else:
        return None

In [None]:
master = pd.read_csv('CHM13_centric_transition.bed', sep='\t')
master['ds_occupancy'] = ''
master['ds_nucleotides'] = ''
master['sequence_length'] = ''
master['palindrome_count'] = ''

# Iterate over each index in the status DataFrame
for ind in range(0, len(statusDF)):
    filepath = f"{statusDF['seq_id'][ind]}.palindrome"  # Define the palindrome file path
    lines = []
    # Read the contents of the palindrome file
    with open(filepath, 'r') as file:
        for line in file:
            lines.append(line.strip())

    total_palindromes = int((len(lines) - 3 - 12) / 4)  # Calculate the total number of palindromes
    sequence_length = extract_sequence_length(lines[1])  # Extract the sequence length

    palrecords = []
    for index in range(0, total_palindromes):
        palstr = lines[12 + (4 * index)]  # Get the palindrome string
        palrecords.append(extract_numbers(palstr))  # Extract the palindrome numbers

    bp_count = 0
    # Calculate the total base pair count from the palindrome records
    for pal in palrecords:
        bp_count += (pal[1] - pal[0] + 1) * 2

    occupation_percent = bp_count / sequence_length  # Calculate the ds occupancy

    # Update the master DataFrame with the calculated values
    master.at[ind, 'palindrome_count'] = total_palindromes
    master.at[ind, 'ds_nucleotides'] = bp_count
    master.at[ind, 'sequence_length'] = sequence_length
    master.at[ind, 'ds_occupancy'] = occupation_percent

In [None]:
# Save the master DataFrame to a CSV file
master.to_csv('Dyad_density_output.csv', index=False)