# Python-based automated pipeline for classifying the regulatory roles of each curated peak from ChIP-exo datasets

## Protocol A.4 - Classification of Regulatory Roles in Each Curated Peak

## Step 1. Set-up

### Install and import libraries
The cell below checks that each required package can be imported and installs any missing one with `pip`.
To install a package manually instead, run `pip3 install package_name` in a terminal.

In [None]:
import os, importlib, re

# Required libraries, keyed by importable/installable package name; the value
# is the alias the package is imported under later in the notebook.
libraries = { 'pandas': 'pd' }

# Try to import every required library and install the ones that are missing.
for lib, alias in libraries.items():
    try:
        importlib.import_module(lib)
        print(f"{lib} is installed.")
    except ImportError:
        print(f"{lib} is not installed. Installing {lib}...")
        # Install by package name (the dict key). The dict value is only the
        # import alias (e.g. 'pd'), which is not the name of the package.
        os.system(f'pip3 install {lib}')

pandas is installed.


In [None]:
import pandas as pd

### Function
Please run the cell below without modification.

In [None]:
def extract_locus_tag_number(locus_tag):
    """Return the first run of digits found in ``locus_tag`` as an int.

    Returns ``None`` when the locus tag contains no digits at all.
    """
    digits = re.search(r'\d+', locus_tag)
    return int(digits.group()) if digits else None

def read_chip_exo_peaks(peak_file):
    """Load a headerless, tab-separated peak file into a DataFrame.

    The nine columns are named chrom, source, feature, start, end, score,
    strand, frame and attribute. Two columns are derived:

    * ``midpoint``  - rounded integer centre of ``start`` and ``end``
    * ``Peak_name`` - the attribute text with surrounding double quotes
      removed, truncated at the first comma
    """
    gff_columns = ['chrom', 'source', 'feature', 'start', 'end',
                   'score', 'strand', 'frame', 'attribute']

    def first_attribute(raw):
        # Drop enclosing quotes, then keep only the part before the first comma.
        unquoted = raw.strip('"')
        return unquoted.split(',')[0]

    print("Reading ChIP-exo peak calling data...")
    peaks = pd.read_csv(peak_file, sep='\t', header=None, names=gff_columns)
    centre = (peaks['start'] + peaks['end']) / 2
    peaks['midpoint'] = centre.round().astype(int)
    peaks['Peak_name'] = peaks['attribute'].apply(first_attribute)
    print("ChIP-exo peak data successfully read.")
    return peaks

def extract_locus_tags(gff_file):
    """Collect every annotated feature that carries a ``locus_tag``.

    Comment lines (starting with ``#``) and lines with fewer than nine
    tab-separated fields are ignored. For each remaining line the attribute
    column is parsed with ``extract_attributes``; features without a
    ``locus_tag`` attribute are dropped.

    Returns a DataFrame with the columns chrom, start, end, strand, locus_tag
    and gene_type (the ``type`` attribute, or '' when absent). Any error is
    printed and whatever was collected up to that point is returned.
    """
    records = []
    try:
        print("Reading reference genome annotation data...")
        with open(gff_file, 'r') as handle:
            for raw_line in handle:
                if raw_line.startswith('#'):
                    continue
                fields = raw_line.strip().split('\t')
                if len(fields) < 9:
                    continue
                parsed = extract_attributes(fields[8])
                tag = parsed.get('locus_tag', '')
                if not tag:
                    continue
                records.append(dict(
                    chrom=fields[0],
                    start=int(fields[3]),
                    end=int(fields[4]),
                    strand=fields[6],
                    locus_tag=tag,
                    gene_type=parsed.get('type', ''),
                ))
        print("Reference genome annotation data successfully read.")
    except Exception as e:
        print(f"Error reading GFF file: {e}")
    return pd.DataFrame(records)

def extract_attributes(attributes_str):
    """Parse a ``key=value;key=value`` attribute string into a dict.

    Each ``;``-separated entry is split on its FIRST ``=`` only, so a value
    that itself contains ``=`` (for example ``Note=a=b``) is kept intact
    instead of aborting the parse and losing every later attribute. Entries
    without ``=`` are skipped.

    Returns the parsed dict. If ``attributes_str`` is not a string, the error
    is printed and an empty dict is returned.
    """
    attributes = {}
    try:
        for attribute in attributes_str.split(';'):
            key, separator, value = attribute.partition('=')
            if separator:
                attributes[key] = value
    except (AttributeError, TypeError) as e:
        # Best effort, matching the rest of the pipeline: report and carry on.
        print(f"Error extracting attributes: {e}")
    return attributes

def read_transcription_units(tu_file):
    """Read the transcription unit table and normalise every entry.

    The file is read as headerless, tab-separated text with a single named
    column ``TU``; each value is passed through ``sort_tu``. If anything goes
    wrong the error is printed and an empty DataFrame is returned.
    """
    try:
        print("Reading transcription unit data...")
        tu_table = pd.read_csv(tu_file, sep='\t', header=None, names=['TU'])
        normalised = tu_table['TU'].apply(sort_tu)
        tu_table['TU'] = normalised
        print("Transcription unit data successfully read and sorted.")
    except Exception as e:
        print(f"Error reading TU file: {e}")
        return pd.DataFrame()
    return tu_table

def sort_tu(tu):
    """Return the comma-separated members of ``tu`` in sorted order.

    If ``tu`` cannot be processed, the error is printed and ``tu`` is
    returned unchanged.
    """
    try:
        members = tu.split(',')
        members.sort()
        return ','.join(members)
    except Exception as e:
        print(f"Error sorting TU: {e}")
        return tu

def select_largest_tu(regulatory_genes, transcription_units):
    """Return the largest transcription unit containing any regulated gene.

    Parameters:
        regulatory_genes: iterable of locus tags regulated by the peak.
        transcription_units: DataFrame whose ``TU`` column holds
            comma-separated locus tags, one transcription unit per row.

    Returns:
        The ``TU`` string with the most genes among those sharing at least
        one gene with ``regulatory_genes`` (the first one wins on ties), or
        '' when no transcription unit matches.
    """
    targets = set(regulatory_genes)
    if not targets:
        return ''
    largest_tu = ''
    # Track the size explicitly, starting at zero: ''.split(',') has length 1,
    # so measuring the empty placeholder would reject every single-gene TU.
    largest_size = 0
    for tu in transcription_units['TU']:
        tu_genes = tu.split(',')
        if len(tu_genes) > largest_size and not targets.isdisjoint(tu_genes):
            largest_tu = tu
            largest_size = len(tu_genes)
    return largest_tu

def estimate_promoter(annotation, gene_to_promoter=300):
    """Estimate the promoter window of one annotated gene.

    For a '+' strand gene the window spans ``gene_to_promoter`` positions
    before ``start`` (clamped at 0) up to ``start``. For any other strand it
    spans from ``end`` to ``end + gene_to_promoter``.

    Returns a ``(promoter_start, promoter_end)`` tuple.
    """
    if annotation['strand'] != '+':
        gene_end = annotation['end']
        return gene_end, gene_end + gene_to_promoter
    gene_start = annotation['start']
    return max(0, gene_start - gene_to_promoter), gene_start

def calculate_real_tu_by_position(regulatory_genes, tu, strand, regulatory_positions, annotations, filter_cds=True):
    """Trim a transcription unit to the genes at or beyond the regulated genes.

    Parameters:
        regulatory_genes: locus tags whose promoter window contains the peak.
        tu: comma-separated locus tags of the candidate transcription unit.
        strand: '+' keeps genes positioned at or after the smallest regulated
            position; '-' keeps genes at or before the largest one. Any other
            value yields ''.
        regulatory_positions: mapping of locus tag to its position.
        annotations: DataFrame with ``locus_tag`` and ``gene_type`` columns.
        filter_cds: when true, keep only genes whose first matching
            annotation row has gene_type 'CDS'.

    Returns:
        The retained locus tags joined by commas, or '' when none of the
        regulated genes has a known position.
    """
    members = tu.split(',')

    anchor_positions = []
    for gene in regulatory_genes:
        position = regulatory_positions.get(gene)
        if position is not None:
            anchor_positions.append(position)
    if not anchor_positions:
        return ''

    def is_cds(gene):
        # Only the first annotation row for the locus tag is consulted.
        gene_types = annotations.loc[annotations['locus_tag'] == gene, 'gene_type']
        return not gene_types.empty and gene_types.values[0] == 'CDS'

    if filter_cds:
        members = [gene for gene in members if is_cds(gene)]

    if strand == '+':
        boundary = min(anchor_positions)

        def within(position):
            return position >= boundary
    elif strand == '-':
        boundary = max(anchor_positions)

        def within(position):
            return position <= boundary
    else:
        return ''

    kept = [gene for gene in members
            if regulatory_positions.get(gene) and within(regulatory_positions[gene])]
    return ','.join(kept)

def find_regulatory_roles(peaks, annotations, transcription_units=None, gene_to_promoter =300, filter_cds=True):
    """Assign regulated genes and transcription units to every peak.

    A gene counts as regulated by a peak when the peak midpoint falls inside
    the gene's estimated promoter window (see ``estimate_promoter``) on the
    same chromosome.

    Parameters:
        peaks: DataFrame with chrom, start, end, midpoint and Peak_name columns.
        annotations: DataFrame with chrom, start, end, strand, locus_tag and
            gene_type columns. May be empty, in which case no peak is
            regulatory.
        transcription_units: optional DataFrame with a ``TU`` column. When
            given, the largest TU containing a regulated gene is reported;
            otherwise the regulated genes themselves are joined and reported.
        gene_to_promoter: promoter window length handed to ``estimate_promoter``.
        filter_cds: forwarded to ``calculate_real_tu_by_position``.

    Returns:
        DataFrame with one row per peak and the columns Start, End, Peak_name,
        Regulatory, Bidirection, TU_plus_strand, Real_TU_plus_strand,
        TU_minus_strand and Real_TU_minus_strand. Regulatory/Bidirection hold
        '0' when true and 'x' otherwise.
    """
    print("Starting regulatory role analysis...")
    use_tu = transcription_units is not None

    if annotations.empty:
        # Nothing to match against (an empty frame may not even have the
        # expected columns), so every peak is simply non-regulatory.
        regulatory_positions = {}
        promoters_by_chrom = {}
    else:
        regulatory_positions = annotations.set_index('locus_tag')[['start', 'end']].mean(axis=1).to_dict()
        # Promoter windows depend only on the annotations, so compute them
        # once, on a copy, instead of once per peak on a filtered slice.
        promoters = annotations.copy()
        windows = [estimate_promoter(record, gene_to_promoter)
                   for record in promoters.to_dict('records')]
        promoters['promoter_start'] = [window[0] for window in windows]
        promoters['promoter_end'] = [window[1] for window in windows]
        promoters_by_chrom = {
            chrom: group for chrom, group in promoters.groupby('chrom', sort=False)
        }

    def resolve_tu(genes, strand):
        # Shared by both strands: choose the TU, then trim it by position.
        if use_tu:
            tu = select_largest_tu(genes, transcription_units)
        else:
            tu = ','.join(genes)
        real_tu = calculate_real_tu_by_position(
            genes, tu, strand, regulatory_positions, annotations, filter_cds) if tu else ''
        return tu, real_tu

    regulatory_roles = []
    total_peaks = len(peaks)
    for index, peak in peaks.iterrows():
        if index % 100 == 0:
            print(f"Processing peak {index + 1}/{total_peaks}...")

        candidates = promoters_by_chrom.get(peak['chrom'])
        if candidates is None:
            # Peak lies on a chromosome that has no annotated genes.
            regulatory_genes_plus, regulatory_genes_minus = [], []
        else:
            midpoint = peak['midpoint']
            in_window = candidates[
                (candidates['promoter_start'] <= midpoint) &
                (candidates['promoter_end'] >= midpoint)
            ]
            regulatory_genes_plus = in_window.loc[in_window['strand'] == '+', 'locus_tag'].tolist()
            regulatory_genes_minus = in_window.loc[in_window['strand'] == '-', 'locus_tag'].tolist()

        tu_plus, real_tu_plus = resolve_tu(regulatory_genes_plus, '+')
        tu_minus, real_tu_minus = resolve_tu(regulatory_genes_minus, '-')

        regulatory_roles.append({
            'Start': peak['start'],
            'End': peak['end'],
            'Peak_name': peak['Peak_name'],
            'Regulatory': '0' if regulatory_genes_plus or regulatory_genes_minus else 'x',
            'Bidirection': '0' if regulatory_genes_plus and regulatory_genes_minus else 'x',
            'TU_plus_strand': tu_plus,
            'Real_TU_plus_strand': real_tu_plus,
            'TU_minus_strand': tu_minus,
            'Real_TU_minus_strand': real_tu_minus
        })
    print("Regulatory role analysis complete.")
    return pd.DataFrame(regulatory_roles)

def save_regulatory_roles(regulatory_roles, output_file):
    """Write the result table to ``output_file`` as tab-separated text.

    The DataFrame index is not written. A failure is reported on stdout
    rather than raised.
    """
    try:
        print(f"Saving results to {output_file}...")
        regulatory_roles.to_csv(output_file, index=False, sep='\t')
    except Exception as e:
        print(f"Error saving regulatory roles: {e}")
    else:
        print(f"Results successfully saved to {output_file}.")

def Defining_regulatory_roles_of_peaks(working_directroy, peak_file, gff_file, output_file, tu_file=None, gene_to_promoter =300, filter_cds=True):
    """Run the whole pipeline: read the inputs, classify the peaks, save the table.

    Parameters:
        working_directroy: directory to change into before any file is
            touched. The misspelled name is kept so existing keyword callers
            keep working.
        peak_file: peak file, read by ``read_chip_exo_peaks``.
        gff_file: genome annotation file, read by ``extract_locus_tags``.
        output_file: destination of the tab-separated result table.
        tu_file: optional transcription unit file; when falsy, no
            transcription unit table is used.
        gene_to_promoter: promoter window length forwarded to
            ``find_regulatory_roles``.
        filter_cds: forwarded to ``find_regulatory_roles``.

    Side effects: changes the process working directory and writes
    ``output_file``.
    """
    # Use the argument that was actually passed in. The body previously read a
    # differently spelled name, which only resolved when a global variable of
    # that name happened to exist in the notebook.
    os.chdir(working_directroy)
    print(f'Current working directiory : {os.getcwd()}')

    peaks = read_chip_exo_peaks(peak_file)
    annotations = extract_locus_tags(gff_file)
    transcription_units = read_transcription_units(tu_file) if tu_file else None
    regulatory_roles = find_regulatory_roles(peaks, annotations, transcription_units, gene_to_promoter , filter_cds)
    save_regulatory_roles(regulatory_roles, output_file)
    print("Processing completed")

### (Optional) Google Drive Mount

If you are using Google Colaboratory, mount Google Drive to access files stored in your Google Drive.

In [None]:
# Google Colaboratory only: the google.colab package is not importable elsewhere,
# so skip this cell when running locally.
from google.colab import drive, files

# Mount Google Drive at /content/mnt so files stored there can be opened by path.
drive.mount('/content/mnt')

Drive already mounted at /content/mnt; to attempt to forcibly remount, call drive.mount("/content/mnt", force_remount=True).


## Step 2. Determine the appropriate distance range between gene and promoter

Set the distance between a gene and its promoter that is appropriate for the strain of interest. If the value is hard to determine, try several values and compare the results.

In [None]:
# Length of the window next to each gene that is treated as its promoter;
# it is passed to the pipeline call in Step 3.
gene_to_promoter = 300  ## Enter the value (Default : 300)

## Step 3. Run the automated pipeline

### Define the regulatory role and target genes of peaks

Enter the information of files.
* **working_directory** : Enter your working directory.
* **peak_file** : Enter the name of the visualized curated peak file (.gff) from Protocol A.3.
* **gff_file** : Enter the name of visualized reference genome file (.gff) from ChEAP.
* **output_file** : Enter the name of result file.
* **tu_file** : (optional) Enter the path of the file (.tsv) containing the transcription units. If you don't have a TU file, set this to `None`. (Default : `None`)
* **filter_cds** : Set to `True` if you want to include only CDS genes in the output file. (Default : `True`)

In [None]:
# Directory the pipeline changes into; the file names below are given relative to it.
working_directory =  '/content/mnt/My Drive/Test/ChIP-exo' ## local example: '/home/user/example'
# Peak file to classify.
peak_file = 'Curated_peak.gff'
# Reference genome annotation file.
gff_file = 'reference_NC_000913.3.gff'
# Tab-separated result table written by the pipeline.
output_file = 'output.tsv'
# Transcription unit file; set to None to run without transcription units.
tu_file = 'MG1655_tu_info.tsv'
# True keeps only genes whose gene_type is 'CDS' in the Real_TU columns.
filter_cds = True

In [None]:
# Run the full pipeline with the settings from Step 2 and the cell above.
# Arguments are positional and must stay in the order of the function signature.
Defining_regulatory_roles_of_peaks( working_directory, peak_file , gff_file, output_file, tu_file, gene_to_promoter , filter_cds )

Current working directiory : /content/mnt/My Drive/Test/ChIP-exo
Reading ChIP-exo peak calling data...
ChIP-exo peak data successfully read.
Reading reference genome annotation data...
Reference genome annotation data successfully read.
Reading transcription unit data...
Transcription unit data successfully read and sorted.
Starting regulatory role analysis...
Processing peak 1/1566...
Processing peak 101/1566...
Processing peak 201/1566...
Processing peak 301/1566...
Processing peak 401/1566...
Processing peak 501/1566...
Processing peak 601/1566...
Processing peak 701/1566...
Processing peak 801/1566...
Processing peak 901/1566...
Processing peak 1001/1566...
Processing peak 1101/1566...
Processing peak 1201/1566...
Processing peak 1301/1566...
Processing peak 1401/1566...
Processing peak 1501/1566...
Regulatory role analysis complete.
Saving results to output.tsv...
Results successfully saved to output.tsv.
Processing completed
