# Terminal Nucleotide Bias

This notebook aims to explore and compare the metrics developed to characterize the degree of terminal nucleotide bias in Ribo-Seq data. 

## Setup 

### Import Packages 

In [138]:
import numpy as np
import pandas as pd
import random
from RiboMetric.metrics import (
    terminal_nucleotide_bias_KL_metric,
    terminal_nucleotide_bias_max_absolute_metric,

    )

# from RiboMetric.plots import (
#     plot_terminal_nucleotide_bias_distribution,
#     plot_terminal_nucleotide_bias_max_proportion,
#     )

### Setup Simulated Dinucleotide Frequencies

The simulated data is generated by varying the number of favored dinucleotides and the bias degree, calculating observed dinucleotide frequencies based on an exponential bias function, and normalizing them to ensure they sum up to 1. This process is repeated across a range of parameters to capture different scenarios of dinucleotide frequency biases at read termini.

In [139]:
def generate_simulated_dinucs(nucleotide_freqs={'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25}):
    """
    Generate simulated observed dinucleotide frequencies at the read termini.
    
    Returns:
        list: List of dictionaries representing simulated data.
    """
    expected_dinuc_freqs = {}
    for n1 in ['A', 'C', 'G', 'T']:
        for n2 in ['A', 'C', 'G', 'T']:
            dinuc = n1 + n2
            expected_dinuc_freqs[dinuc] = nucleotide_freqs[n1] * nucleotide_freqs[n2]

    def bias_function(x, bias_degree, favored_dinucs):
        if x in favored_dinucs:
            return np.exp(bias_degree)
        else:
            return np.exp(-bias_degree)

    simulated_data = []
    for num_favored in range(8, 0, -1):
        for bias_degree in np.linspace(0, 3, 4):
            favored_dinucs = random.sample(list(expected_dinuc_freqs.keys()), num_favored)
            observed_dinuc_freqs = {}
            for dinuc, freq in expected_dinuc_freqs.items():
                bias = bias_function(dinuc, bias_degree, favored_dinucs)
                # observed_dinuc_freqs[dinuc] = freq * np.exp(bias)
                observed_dinuc_freqs[dinuc] = freq * bias

            total_freq = sum(observed_dinuc_freqs.values())
            for dinuc in observed_dinuc_freqs:
                observed_dinuc_freqs[dinuc] /= total_freq

            row = {
                'num_favored': num_favored,
                'bias_degree': bias_degree,
                'favored_dinucs': favored_dinucs,
                'observed_termini': observed_dinuc_freqs,
                'expected_termini': expected_dinuc_freqs,
                'read_body': expected_dinuc_freqs
            }
            simulated_data.append(row)
    
    return pd.DataFrame(simulated_data)


## Explore Simulated Data

In [140]:
simulated_data_equal_nuc_freqs = generate_simulated_dinucs(nucleotide_freqs={'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})

In [141]:
import plotly.graph_objects as go

def plot_freqs(simulated_data, num_favoured, bias_degree):
    row = simulated_data[
        (simulated_data['num_favored'] == num_favoured) &
        (simulated_data['bias_degree'] == bias_degree)
    ].iloc[0]

    diffs = []
    for dinuc in row['observed_termini']:
        print(row['observed_termini'][dinuc], row['expected_termini'][dinuc])
        diffs.append(row['observed_termini'][dinuc] - row['expected_termini'][dinuc])
    print(diffs)
    fig = go.Figure()
    fig.add_trace(go.Bar(x=list(row['observed_termini'].keys()), y=diffs))
    # y ais -1 to 1 
    fig.update_layout(
        title=f"num_favored={num_favoured}, bias_degree={bias_degree}",
        yaxis=dict(range=[-1, 1])
    )
    fig.show()

In [142]:
plot_freqs(simulated_data_equal_nuc_freqs, 3, 3.0)

0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.32979096304090905 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.32979096304090905 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.0008174700674825479 0.0625
0.32979096304090905 0.0625
[-0.06168252993251745, -0.06168252993251745, 0.26729096304090905, -0.06168252993251745, -0.06168252993251745, -0.06168252993251745, -0.06168252993251745, -0.06168252993251745, -0.06168252993251745, 0.26729096304090905, -0.06168252993251745, -0.06168252993251745, -0.06168252993251745, -0.06168252993251745, -0.06168252993251745, 0.26729096304090905]


In [143]:
def calculate_wasserstein_distance(observed, expected):
    """
    Calculate the Wasserstein distance between two probability distributions.
    
    Args:
        observed dict: The observed dinucleotide frequencies.
        expected dict: The expected dinucleotide frequencies.

    Returns:
        float: The Wasserstein distance between the observed and expected distributions.
    """
    observed = np.array(list(observed.values()))
    expected = np.array(list(expected.values()))

    return np.sum(np.abs(np.cumsum(observed) - np.cumsum(expected)))

In [144]:
def fetch_metrics(simualted_data):
    '''
    Calculate the two different metrics for the simulated data.

    Input:
        simualted_data: DataFrame, the simulated data.

    Returns:
        DataFrame: The DataFrame with the two metrics.
    '''
    metrics = []
    for _, row in simualted_data.iterrows():

        expected_freq = {
            dinuc: freq for dinuc, freq in row['expected_termini'].items()
        }
        observed_freq = { 
            'five_prime': {
                dinuc: freq for dinuc, freq in row['observed_termini'].items()
            }
        }
        metrics.append({
            'num_favored': row['num_favored'],
            'bias_degree': row['bias_degree'],
            'terminal_nucleotide_bias_KL_metric_5_prime': terminal_nucleotide_bias_KL_metric(observed_freq, expected_freq, prime="five_prime"),
            # 'terminal_nucleotide_bias_KL_metric_3_prime': terminal_nucleotide_bias_KL_metric(observed_freq, expected_freq, prime="three_prime"),
            'terminal_nucleotide_bias_max_absolute_metric_5_prime': terminal_nucleotide_bias_max_absolute_metric(observed_freq, expected_freq, prime="five_prime"),
            # 'terminal_nucleotide_bias_max_absolute_metric_3_prime': terminal_nucleotide_bias_max_absolute_metric(observed_freq, expected_freq, prime="three_prime"),

            'wasserstein_distance': calculate_wasserstein_distance(observed_freq['five_prime'], expected_freq),
        })
    return pd.DataFrame(metrics)

In [145]:
metrics = fetch_metrics(simulated_data_equal_nuc_freqs)

In [146]:
metrics['terminal_nucleotide_bias_KL_metric_5_prime'] = metrics['terminal_nucleotide_bias_KL_metric_5_prime'] * -1

In [147]:
# plot the two metrics sorted by the bias degree
import plotly.graph_objects as go

fig = go.Figure()

for metric_name in ['terminal_nucleotide_bias_KL_metric_5_prime', 'terminal_nucleotide_bias_max_absolute_metric_5_prime', 'wasserstein_distance']:
    fig.add_trace(go.Scatter(
        x=metrics['bias_degree'],
        y=metrics[metric_name],
        mode='lines',
        name=metric_name,
        text=[f"{bias_degree}_{num_favored}" for bias_degree, num_favored in zip(metrics['bias_degree'], metrics['num_favored'])]
    ))

fig.update_layout(
    title='Simulated data with equal nucleotide frequencies',
    xaxis_title='Bias degree',
    yaxis_title='Metric value',
    legend_title='Metric'
)

fig.show()

In [148]:
# plot scatter of the two metrics
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=metrics['terminal_nucleotide_bias_KL_metric_5_prime'],
    y=metrics['terminal_nucleotide_bias_max_absolute_metric_5_prime'],
    mode='markers',
    name='Metrics',
    marker=dict(
        size=10,
        color=metrics['bias_degree'],
        colorscale='Viridis',
        showscale=True
    ),
    text=[f"{bias_degree}_{num_favored}" for bias_degree, num_favored in zip(metrics['bias_degree'], metrics['num_favored'])]
))

fig.update_layout(
    title='Simulated data with equal nucleotide frequencies',
    xaxis_title='Terminal nucleotide bias distribution',
    yaxis_title='Terminal nucleotide bias max proportion',
)

fig.show()

In [149]:
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler

scaler = MinMaxScaler()

metrics['Kullback–Leibler '] = scaler.fit_transform(metrics['terminal_nucleotide_bias_KL_metric_5_prime'].values.reshape(-1, 1))
metrics['Max-Absolute']= scaler.fit_transform(metrics['terminal_nucleotide_bias_max_absolute_metric_5_prime'].values.reshape(-1, 1))
metrics['wasserstein_distance'] = scaler.fit_transform(metrics['wasserstein_distance'].values.reshape(-1, 1))

In [150]:
# plot grouped bar chart of the two metrics sorted by degree of bias and number of favored dinucleotides
metrics['bias_degree'] = metrics['bias_degree'].round(2)
metrics['num_favored'] = metrics['num_favored'].astype(str)

fig = go.Figure()

subset_metrics = metrics[
    metrics['num_favored'].isin(['1', '3', '5', '7'])
]
for metric_name in ['Kullback–Leibler ', 'Max-Absolute']: #, 'wasserstein_distance']:
    fig.add_trace(go.Bar(
        x=subset_metrics['num_favored'] + ' di-nt, ' + subset_metrics['bias_degree'].astype(str) + ' bias',
        y=subset_metrics[metric_name],
        name=metric_name
    ))

fig.update_layout(
    yaxis_title='Metric value',
    barmode='group',
    font=dict(
        family="Arial",  # Specify the font family
        size=22,         # Specify the font size
        color="black"    # Specify the font color
    )
)

fig.show()

The max proportion metric struggles to identify biases when the number of favoured nucleotides is high but works quite well when it is 1 favoured and kind of for 2. KL divergence also works better when the number of favoured dinucs is lower but correlates with degree of bias for all combinations. 

In [151]:
def kl_divergence(p, q):
    """
    Calculate the Kullback-Leibler (KL) divergence between two probability distributions.
    
    Args:
        p (numpy.array): The observed probability distribution.
        q (numpy.array): The reference probability distribution.
    
    Returns:
        float: The KL divergence between p and q.
    """
    return np.sum(p * np.log(p / q))

def calculate_max_kl_divergence(num_dinucleotides):
    """
    Calculate the maximum possible KL divergence for a given number of dinucleotides.
    
    Args:
        num_dinucleotides (int): The number of dinucleotides.
    
    Returns:
        float: The maximum possible KL divergence.
    """
    q = np.full(num_dinucleotides, 1 / num_dinucleotides)  # Uniform distribution as reference
    print(q)
    
    # Maximum KL divergence occurs when one dinucleotide has probability 1 and others have 0
    p = np.zeros(num_dinucleotides)
    p[0] = 1
    
    max_kl_divergence = kl_divergence(p, q)
    
    return max_kl_divergence

# Example usage:
num_dinucleotides = 16
max_kl_divergence = calculate_max_kl_divergence(num_dinucleotides)
print("Maximum possible KL divergence:", max_kl_divergence)

[0.0625 0.0625 0.0625 0.0625 0.0625 0.0625 0.0625 0.0625 0.0625 0.0625
 0.0625 0.0625 0.0625 0.0625 0.0625 0.0625]
Maximum possible KL divergence: nan



divide by zero encountered in log


invalid value encountered in multiply

